Microservices Example¶
Complete example of using Amebo for microservices communication in an e-commerce platform.
Architecture¶
graph TB
A[API Gateway] --> B[User Service]
A --> C[Order Service]
A --> D[Payment Service]
A --> E[Inventory Service]
B --> F[Amebo]
C --> F
D --> F
E --> F
F --> G[Email Service]
F --> H[Analytics Service]
F --> I[Notification Service]
Services Setup¶
User Service Events¶
# Register user service
curl -X POST http://localhost/v1/applications \
-d '{
"application": "user-service",
"address": "https://users.myapp.com",
"secret": "user-service-secret"
}'
# Define user events
curl -X POST http://localhost/v1/actions \
-d '{
"action": "user.created",
"application": "user-service",
"schemata": {
"type": "object",
"properties": {
"user_id": {"type": "string"},
"email": {"type": "string", "format": "email"},
"name": {"type": "string"},
"plan": {"type": "string", "enum": ["free", "premium"]}
},
"required": ["user_id", "email", "name"]
}
}'
Order Service Events¶
# Register order service
curl -X POST http://localhost/v1/applications \
-d '{
"application": "order-service",
"address": "https://orders.myapp.com",
"secret": "order-service-secret"
}'
# Define order events
curl -X POST http://localhost/v1/actions \
-d '{
"action": "order.placed",
"application": "order-service",
"schemata": {
"type": "object",
"properties": {
"order_id": {"type": "string"},
"user_id": {"type": "string"},
"items": {"type": "array"},
"total": {"type": "number"},
"currency": {"type": "string"}
},
"required": ["order_id", "user_id", "items", "total"]
}
}'
Event Flow Examples¶
User Registration Flow¶
# User Service - Handle registration
@app.route('/users', methods=['POST'])
def create_user():
user_data = request.json
# Create user in database
user = create_user_in_db(user_data)
# Publish event
amebo_client.publish_event('user.created', {
'user_id': user.id,
'email': user.email,
'name': user.name,
'plan': user.plan,
'created_at': user.created_at.isoformat()
})
return jsonify(user.to_dict()), 201
Order Processing Flow¶
# Order Service - Handle order placement
@app.route('/orders', methods=['POST'])
def place_order():
order_data = request.json
# Validate and create order
order = create_order_in_db(order_data)
# Publish order placed event
amebo_client.publish_event('order.placed', {
'order_id': order.id,
'user_id': order.user_id,
'items': order.items,
'total': float(order.total),
'currency': order.currency,
'placed_at': order.created_at.isoformat()
})
return jsonify(order.to_dict()), 201
Service Integrations¶
Email Service¶
# Email Service - Handle user events
@app.route('/webhooks/user-created', methods=['POST'])
def handle_user_created():
event = request.json
user_data = event['payload']
# Send welcome email
send_welcome_email(
email=user_data['email'],
name=user_data['name'],
plan=user_data.get('plan', 'free')
)
return jsonify({'status': 'processed'}), 200
# Subscription setup
curl -X POST http://localhost/v1/subscriptions \
-d '{
"application": "email-service",
"subscription": "welcome-emails",
"action": "user.created",
"handler": "https://email.myapp.com/webhooks/user-created"
}'
Analytics Service¶
# Analytics Service - Track events
@app.route('/webhooks/events', methods=['POST'])
def handle_analytics_event():
event = request.json
action = event['action']
payload = event['payload']
if action == 'user.created':
track_user_registration(payload)
elif action == 'order.placed':
track_order_placement(payload)
return jsonify({'status': 'tracked'}), 200
# Multiple subscriptions
curl -X POST http://localhost/v1/subscriptions \
-d '{
"application": "analytics-service",
"subscription": "user-analytics",
"action": "user.created",
"handler": "https://analytics.myapp.com/webhooks/events"
}'
curl -X POST http://localhost/v1/subscriptions \
-d '{
"application": "analytics-service",
"subscription": "order-analytics",
"action": "order.placed",
"handler": "https://analytics.myapp.com/webhooks/events"
}'
Advanced Patterns¶
Saga Pattern¶
# Order Service - Orchestrate order processing
def process_order(order_id):
try:
# Step 1: Reserve inventory
amebo_client.publish_event('inventory.reserve_requested', {
'order_id': order_id,
'items': order.items
})
# Step 2: Process payment
amebo_client.publish_event('payment.process_requested', {
'order_id': order_id,
'amount': order.total,
'user_id': order.user_id
})
except Exception as e:
# Compensate on failure
amebo_client.publish_event('order.processing_failed', {
'order_id': order_id,
'error': str(e)
})
Event Sourcing¶
# User Service - Event sourced user aggregate
class User:
def __init__(self, user_id):
self.user_id = user_id
self.events = []
def create(self, email, name):
event = {
'action': 'user.created',
'payload': {
'user_id': self.user_id,
'email': email,
'name': name,
'created_at': datetime.utcnow().isoformat()
}
}
self.apply_event(event)
return event
def update_email(self, new_email):
event = {
'action': 'user.email_updated',
'payload': {
'user_id': self.user_id,
'old_email': self.email,
'new_email': new_email,
'updated_at': datetime.utcnow().isoformat()
}
}
self.apply_event(event)
return event
Testing¶
Integration Tests¶
def test_user_registration_flow():
# Create user
response = requests.post('http://user-service/users', json={
'email': 'test@example.com',
'name': 'Test User'
})
assert response.status_code == 201
# Wait for event processing
time.sleep(1)
# Verify email was sent
emails = get_sent_emails()
assert any(email['to'] == 'test@example.com' for email in emails)
# Verify analytics tracking
analytics_events = get_analytics_events()
assert any(event['action'] == 'user.created' for event in analytics_events)
Event Replay¶
def replay_events_for_service(service_name, from_date, to_date):
"""Replay events for a specific service"""
events = amebo_client.get_events(
from_date=from_date,
to_date=to_date
)
for event in events:
# Send to service webhook
requests.post(f'https://{service_name}/webhooks/replay', json=event)
Monitoring¶
Service Health¶
# Health check endpoint
@app.route('/health')
def health_check():
checks = {
'database': check_database_connection(),
'amebo': check_amebo_connection(),
'external_apis': check_external_apis()
}
status = 'healthy' if all(checks.values()) else 'unhealthy'
return jsonify({'status': status, 'checks': checks})
Event Metrics¶
# Track event publishing
def publish_event_with_metrics(action, payload):
start_time = time.time()
try:
result = amebo_client.publish_event(action, payload)
metrics.increment('events.published', tags=[f'action:{action}'])
return result
except Exception as e:
metrics.increment('events.failed', tags=[f'action:{action}'])
raise
finally:
duration = time.time() - start_time
metrics.histogram('events.publish_duration', duration)
Best Practices¶
- Service boundaries: Clear ownership of events
- Schema evolution: Plan for backward compatibility
- Error handling: Graceful degradation
- Monitoring: Comprehensive observability
- Testing: End-to-end integration tests