Event-Driven Architecture in Python: Celery, RabbitMQ, and Real-World Patterns
When your monolith starts groaning under the weight of synchronous API calls, background jobs, and webhook processing, it's time to go event-driven. After 4 years of building event-driven systems at Strobes Security, I've distilled the patterns that actually work in production.
What is Event-Driven Architecture?
Event-driven architecture (EDA) is a design pattern where system components communicate through events — notifications that something has happened — rather than direct API calls.
Traditional (Synchronous):

    Client ──▶ API ──▶ Process ──▶ Database ──▶ Response

Event-Driven (Asynchronous):

    Client ──▶ API ──▶ Publish Event ──▶ Response (immediate)
                             │
                             ▼
                 Message Broker (RabbitMQ)
                             │
                       ┌─────┴─────┐
                       ▼           ▼
                    Worker 1    Worker 2
                   (Process)    (Notify)

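The asynchronous flow can be mimicked in-process with the standard library. This is only an analogy, assuming a `queue.Queue` in place of RabbitMQ and a thread in place of a Celery worker; `publish_event`, `handle_request`, and the event shape are hypothetical names, not part of any library.

```python
import queue
import threading

# Stand-in for the message broker (assumption: a real system would use RabbitMQ).
broker = queue.Queue()
processed = []


def publish_event(event: dict) -> None:
    """Enqueue an event and return without waiting for it to be handled."""
    broker.put(event)


def worker() -> None:
    """Consume events until a None sentinel arrives."""
    while True:
        event = broker.get()
        if event is None:
            break
        processed.append(event)  # the slow work would happen here


def handle_request(payload: dict) -> dict:
    """API handler: publish the event, then respond immediately."""
    publish_event({"type": "vulnerability.created", "payload": payload})
    return {"status": "accepted"}


thread = threading.Thread(target=worker)
thread.start()
response = handle_request({"id": 1})
broker.put(None)  # tell the worker to stop
thread.join()
```

The handler's response does not depend on the worker having finished, which is the property the diagram is illustrating.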
Why Celery + RabbitMQ?
| Component | Role | Why This Choice |
|---|---|---|
| Celery | Task queue + worker framework | Python-native, battle-tested, massive ecosystem |
| RabbitMQ | Message broker | Reliable delivery, routing, dead-letter queues |
| Redis | Result backend + caching | Fast, simple result storage |
Why Not Kafka?
Kafka is excellent for event streaming (high-throughput, log-based), but for task processing (job queues, retries, rate limiting), Celery + RabbitMQ is more appropriate and simpler to operate for most Python teams.
Pattern 1: Fire-and-Forget Tasks
The simplest pattern — offload slow operations to background workers:

    from celery import Celery

    app = Celery("tasks", broker="amqp://rabbitmq:5672")

    @app.task
    def send_vulnerability_notification(vuln_id: int, channels: list[str]):
        """Send notifications about a new vulnerability to all channels."""
        vuln = Vulnerability.objects.get(id=vuln_id)
        for channel in channels:
            if channel == "email":
                send_email(vuln.assignee.email, vuln.summary)
            elif channel == "slack":
                post_to_slack(vuln.slack_channel, vuln.format_slack())
            elif channel == "teams":
                post_to_teams(vuln.teams_webhook, vuln.format_teams())

    # In your API view: returns immediately
    def create_vulnerability(request):
        vuln = Vulnerability.objects.create(**request.data)
        # Fire-and-forget: notification happens in background
        send_vulnerability_notification.delay(vuln.id, ["email", "slack", "teams"])
        return Response({"id": vuln.id}, status=201)

Pattern 2: Task Chains (Sequential Processing)
When tasks must execute in order:

    from celery import chain

    def process_security_scan(scan_id: int):
        """Chain: Scan → Deduplicate → Enrich → Notify"""
        workflow = chain(
            run_scanner.s(scan_id),
            deduplicate_findings.s(),
            enrich_with_cve_data.s(),
            notify_stakeholders.s(),
        )
        workflow.apply_async()

    @app.task
    def run_scanner(scan_id: int) -> list[dict]:
        scan = Scan.objects.get(id=scan_id)
        findings = scanner.execute(scan.target)
        return [f.dict() for f in findings]

    @app.task
    def deduplicate_findings(findings: list[dict]) -> list[dict]:
        """Remove duplicate findings based on hash."""
        seen = set()
        unique = []
        for f in findings:
            key = hash_finding(f)
            if key not in seen:
                seen.add(key)
                unique.append(f)
        return unique

    @app.task
    def enrich_with_cve_data(findings: list[dict]) -> list[dict]:
        """Add CVE details from NVD database."""
        for f in findings:
            if f.get("cve_id"):
                f["cve_details"] = fetch_nvd(f["cve_id"])
        return findings

    @app.task
    def notify_stakeholders(findings: list[dict]):
        """Send summary notification with all findings."""
        summary = generate_scan_summary(findings)
        send_email_to_security_team(summary)

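The deduplication step relies on a `hash_finding` helper that is not shown. Here is one way it might look, as a sketch only: the identity fields (`rule_id`, `asset`, `location`) are my assumption about what makes two findings "the same" and will differ per scanner. It uses a SHA-256 digest rather than Python's built-in `hash()`, because string hashes from `hash()` vary between processes by default.

```python
import hashlib
import json

# Fields assumed to identify a finding (hypothetical; adjust to your scanner's schema).
IDENTITY_FIELDS = ("rule_id", "asset", "location")


def hash_finding(finding: dict) -> str:
    """Return a stable SHA-256 digest of a finding's identity fields."""
    identity = {field: finding.get(field) for field in IDENTITY_FIELDS}
    canonical = json.dumps(identity, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def deduplicate(findings: list[dict]) -> list[dict]:
    """Keep the first finding seen for each identity hash."""
    seen = set()
    unique = []
    for finding in findings:
        key = hash_finding(finding)
        if key not in seen:
            seen.add(key)
            unique.append(finding)
    return unique


findings = [
    {"rule_id": "SQLI-1", "asset": "api", "location": "/login", "scanner": "a"},
    {"rule_id": "SQLI-1", "asset": "api", "location": "/login", "scanner": "b"},
    {"rule_id": "XSS-2", "asset": "web", "location": "/search", "scanner": "a"},
]
unique = deduplicate(findings)
```

The second finding is dropped because it differs from the first only in the `scanner` field, which is not part of the assumed identity.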
Pattern 3: Fan-Out with Groups
Process multiple items in parallel, then aggregate:

    from celery import group, chord

    @app.task
    def scan_single_asset(asset_id: int) -> dict:
        """Scan one asset for vulnerabilities."""
        asset = Asset.objects.get(id=asset_id)
        return run_scan(asset)

    @app.task
    def aggregate_scan_results(results: list[dict]):
        """Merge all scan results into a single report."""
        report = ScanReport.create(findings=results)
        report.calculate_risk_score()
        report.save()
        return report.id

    def scan_all_assets(organization_id: int):
        """Scan all assets in parallel, then aggregate."""
        assets = Asset.objects.filter(organization_id=organization_id)
        # chord = group (parallel) + callback (when all complete)
        workflow = chord(
            group(scan_single_asset.s(a.id) for a in assets),
            aggregate_scan_results.s(),
        )
        workflow.apply_async()

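The fan-out/fan-in shape of a chord can be illustrated without a broker. The sketch below is a standard-library analogy, not Celery code: a thread pool plays the parallel group and a plain function plays the callback. `scan_asset` and `aggregate` are hypothetical stand-ins with made-up results. A real chord also needs a result backend so the callback can collect the group's results, which this analogy does not model.

```python
from concurrent.futures import ThreadPoolExecutor


def scan_asset(asset_id: int) -> dict:
    """Hypothetical stand-in for scan_single_asset; returns fake finding counts."""
    return {"asset_id": asset_id, "findings": asset_id % 3}


def aggregate(results: list[dict]) -> dict:
    """Hypothetical stand-in for the chord callback: receives every result at once."""
    return {
        "assets_scanned": len(results),
        "total_findings": sum(r["findings"] for r in results),
    }


def scan_all(asset_ids: list[int]) -> dict:
    """Fan out one scan per asset, then fan in to a single report."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        results = list(pool.map(scan_asset, asset_ids))  # map preserves input order
    return aggregate(results)


report = scan_all([1, 2, 3, 4, 5])
```

The key point carried over from the chord is that the aggregation step runs once, after every parallel unit has returned, and sees all results together.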
Pattern 4: Retry with Exponential Backoff
External API calls fail. Handle it gracefully:

    @app.task(
        bind=True,
        autoretry_for=(JiraConnectionError,),  # retry_backoff only applies to automatic retries
        max_retries=5,
        retry_backoff=60,       # Exponential backoff starting at 60 seconds
        retry_backoff_max=600,  # Max 10 minutes between retries
        retry_jitter=True,      # Add randomness to prevent thundering herd
    )
    def sync_to_jira(self, vulnerability_id: int):
        vuln = Vulnerability.objects.get(id=vulnerability_id)
        try:
            jira_client = get_jira_client(vuln.organization)
            ticket = jira_client.create_issue(
                project=vuln.jira_project,
                summary=vuln.title,
                description=vuln.description,
                priority=map_severity_to_priority(vuln.severity),
            )
        except JiraAuthError:
            # Don't retry auth errors: they won't fix themselves
            vuln.sync_status = "auth_failed"
            vuln.save()
            return
        # A JiraConnectionError is not caught here, so it propagates
        # and Celery retries the task with exponential backoff.
        vuln.jira_ticket_id = ticket.key
        vuln.save()

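To see what capped exponential backoff looks like in numbers, here is a small sketch of the usual formula: double the delay on each retry, cap it, and optionally pick a random value up to that cap ("full jitter"). The `backoff_delay` function is a hypothetical helper for illustration, assuming a 60-second base and a 600-second cap; it is not a Celery API.

```python
import random


def backoff_delay(retries: int, factor: int = 60, maximum: int = 600,
                  jitter: bool = False) -> int:
    """Delay in seconds before retry number `retries` (0-based)."""
    countdown = min(maximum, factor * (2 ** retries))
    if jitter:
        # Full jitter: any whole number of seconds from 0 up to the capped delay.
        countdown = random.randrange(countdown + 1)
    return countdown


# Upper bounds for five retries, without jitter.
schedule = [backoff_delay(n) for n in range(5)]

# With jitter, each delay falls somewhere between 0 and the bound above.
jittered = [backoff_delay(n, jitter=True) for n in range(5)]
```

Jitter matters when many tasks fail at the same moment, for example during an outage of the external service: without it they would all retry in lockstep.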
Pattern 5: Dead Letter Queue (DLQ)
When tasks fail permanently, don't lose them:

    # celery_config.py
    from kombu import Queue

    app.conf.task_queues = [
        Queue("default", routing_key="default"),
        Queue("priority_high", routing_key="priority.high"),
        Queue("dead_letter", routing_key="dead_letter"),
    ]
    app.conf.task_routes = {
        "tasks.critical_scan": {"queue": "priority_high"},
    }

    # Custom error handler. Attach it when calling a task, e.g.
    # critical_scan.apply_async(args=[...], link_error=error_handler.s())
    @app.task
    def error_handler(request, exc, traceback):
        """Record permanently failed tasks for manual review."""
        DeadLetterEntry.objects.create(
            task_id=request.id,
            task_name=getattr(request, "task", None),  # name of the failed task, if present
            args=request.args,
            kwargs=request.kwargs,
            exception=str(exc),
            traceback=str(traceback),
        )

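The underlying idea, retry a few times and then park the job somewhere visible instead of dropping it, can be shown with an in-memory sketch. Everything here is a stand-in under stated assumptions: a `deque` replaces the queue, a list replaces the dead-letter store, and `MAX_ATTEMPTS`, `run_with_dead_letter`, and `flaky_handler` are hypothetical names.

```python
from collections import deque

MAX_ATTEMPTS = 3  # hypothetical retry limit


def run_with_dead_letter(jobs, handler):
    """Process jobs; after MAX_ATTEMPTS failures, park the job in a dead-letter list."""
    pending = deque((job, 1) for job in jobs)
    done, dead_letter = [], []
    while pending:
        job, attempt = pending.popleft()
        try:
            done.append(handler(job))
        except Exception as exc:
            if attempt >= MAX_ATTEMPTS:
                # Keep enough context to debug and replay the job later.
                dead_letter.append(
                    {"job": job, "attempts": attempt, "error": repr(exc)}
                )
            else:
                pending.append((job, attempt + 1))  # re-queue for another try
    return done, dead_letter


def flaky_handler(job: int) -> int:
    """Fails for odd job numbers, succeeds for even ones."""
    if job % 2:
        raise ValueError(f"cannot process {job}")
    return job * 10


done, dead = run_with_dead_letter([1, 2, 3, 4], flaky_handler)
```

Each dead-letter entry keeps the original job, the attempt count, and the error, which is the same information the handler above stores for manual review.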
Pattern 6: Rate Limiting
Protect external APIs from being overwhelmed:

    import httpx

    # Max 10 per minute. Celery applies this per worker instance, not globally.
    @app.task(rate_limit="10/m")
    def call_external_api(endpoint: str, payload: dict):
        """Rate-limited external API call."""
        response = httpx.post(endpoint, json=payload, timeout=30)
        response.raise_for_status()
        return response.json()

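A token bucket is one common way to enforce a limit like "10 per minute". The sketch below is a minimal, single-threaded illustration with an injectable clock so the behaviour can be checked deterministically; the `TokenBucket` class is written for this post and is not the implementation any particular library uses.

```python
import time


class TokenBucket:
    """Allow up to `capacity` calls in a burst, refilled at `rate_per_sec`."""

    def __init__(self, rate_per_sec: float, capacity: int, clock=time.monotonic):
        self.rate = rate_per_sec
        self.capacity = capacity
        self.clock = clock
        self.tokens = float(capacity)
        self.updated = clock()

    def try_acquire(self) -> bool:
        """Take one token if available; return False when the caller must wait."""
        now = self.clock()
        elapsed = now - self.updated
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


# Deterministic demo with a fake clock: 10 calls per minute, burst of 1.
fake_now = [0.0]
bucket = TokenBucket(rate_per_sec=10 / 60, capacity=1, clock=lambda: fake_now[0])
first = bucket.try_acquire()   # a token is available at the start
second = bucket.try_acquire()  # the bucket is now empty
fake_now[0] = 7.0              # seven seconds later a token has been refilled
third = bucket.try_acquire()
```

At 10 per minute a token is refilled roughly every six seconds, so the second immediate call is refused and the call after the wait succeeds.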
Production Configuration
Here's the Celery configuration I use in production:

    # celery_config.py
    app.conf.update(
        # Serialization
        task_serializer="json",
        result_serializer="json",
        accept_content=["json"],
        # Timezone
        timezone="UTC",
        enable_utc=True,
        # Reliability
        task_acks_late=True,              # Ack after completion, not receipt
        worker_prefetch_multiplier=1,     # Each worker process reserves one task at a time
        task_reject_on_worker_lost=True,  # Re-queue if the worker process dies
        # Performance
        worker_concurrency=4,             # 4 worker processes per worker instance
        worker_max_tasks_per_child=100,   # Recycle each process after 100 tasks (memory leak protection)
        # Monitoring
        worker_send_task_events=True,
        task_send_sent_event=True,
    )

Monitoring with Flower
Always monitor your Celery workers in production:

    pip install flower
    celery -A myapp flower --port=5555

This gives you a real-time dashboard showing:
- Active/reserved/completed tasks
- Worker health and resource usage
- Task success/failure rates
- Queue depths
Key Takeaways
- Start with fire-and-forget: Don't over-engineer. Most background jobs just need `.delay()`
- Always set retry limits: Infinite retries will fill your queue
- Use `task_acks_late=True`: Prevents task loss if a worker crashes mid-execution
- Monitor queue depth: If queues grow faster than workers process, you need to scale
- Dead letter queues are essential: Failed tasks need a place to go for debugging
- Rate limit external calls: Be a good API citizen
Building event-driven security platforms at Strobes Security. Find me on GitHub.