Event-Driven Architecture in Python: Celery, RabbitMQ, and Real-World Patterns

2025-04-05 · Python, Celery, RabbitMQ, Architecture, Backend

When your monolith starts groaning under the weight of synchronous API calls, background jobs, and webhook processing, it's time to go event-driven. After 4 years of building event-driven systems at Strobes Security, I've distilled the patterns that actually work in production.

What is Event-Driven Architecture?

Event-driven architecture (EDA) is a design pattern where system components communicate through events — notifications that something has happened — rather than direct API calls.

Traditional (Synchronous):
  Client ──▶ API ──▶ Process ──▶ Database ──▶ Response

Event-Driven (Asynchronous):
  Client ──▶ API ──▶ Publish Event ──▶ Response (immediate)
                          │
                          ▼
                    Message Broker (RabbitMQ)
                          │
                    ┌─────┴─────┐
                    ▼           ▼
               Worker 1    Worker 2
              (Process)   (Notify)
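To make the diagram concrete, here is a toy, in-process model of it that uses only the standard library. The `InProcessBroker` class is hypothetical. It gives each subscriber its own queue, which is roughly what a RabbitMQ fanout exchange does with its bound queues. The API handler publishes and returns at once, while two worker threads consume independently. It is only meant to build intuition: unlike a real broker, nothing here is persisted, acknowledged, or retried.

```python
import queue
import threading


class InProcessBroker:
    """Toy fanout broker: every subscriber gets its own copy of each event."""

    def __init__(self) -> None:
        self._queues: list[queue.Queue] = []

    def subscribe(self) -> queue.Queue:
        q: queue.Queue = queue.Queue()
        self._queues.append(q)
        return q

    def publish(self, event: dict) -> None:
        for q in self._queues:
            q.put(event)

    def close(self) -> None:
        # A None sentinel tells each worker to stop
        for q in self._queues:
            q.put(None)


def worker(name: str, inbox: queue.Queue, log: list, lock: threading.Lock) -> None:
    while (event := inbox.get()) is not None:
        with lock:
            log.append((name, event["type"], event["id"]))


broker = InProcessBroker()
log: list = []
lock = threading.Lock()
# Subscribe both workers before anything is published
workers = [
    threading.Thread(target=worker, args=(name, broker.subscribe(), log, lock))
    for name in ("process", "notify")
]
for t in workers:
    t.start()


def create_vulnerability_api(vuln_id: int) -> dict:
    # Publish the event and respond immediately; workers run in the background
    broker.publish({"type": "vulnerability.created", "id": vuln_id})
    return {"id": vuln_id, "status": 201}


response = create_vulnerability_api(42)
broker.close()
for t in workers:
    t.join()

print(response)
print(sorted(log))
```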

Why Celery + RabbitMQ?

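| Component | Role                          | Why This Choice                                 |
|-----------|-------------------------------|-------------------------------------------------|
| Celery    | Task queue + worker framework | Python-native, battle-tested, massive ecosystem |
| RabbitMQ  | Message broker                | Reliable delivery, routing, dead-letter queues  |
| Redis     | Result backend + caching      | Fast, simple result storage                     |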

Why Not Kafka?

Kafka excels at event streaming: high-throughput, durable, log-based streams that many consumers can read and replay independently. For task processing (job queues, per-task retries, rate limiting), Celery + RabbitMQ is usually the better fit, and for most Python teams it is simpler to operate.

Pattern 1: Fire-and-Forget Tasks

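The simplest pattern is to offload slow operations to background workers. Pass the record's ID rather than the model instance. Task arguments are serialized (JSON by default), and the worker should load current data from the database itself: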

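from celery import Celery
from django.db import transaction
from rest_framework.decorators import api_view
from rest_framework.response import Response

# App-specific imports (module paths are illustrative)
from .models import Vulnerability
from .notifications import send_email, post_to_slack, post_to_teams

# Broker: RabbitMQ; result backend: Redis (hostnames and credentials are placeholders)
app = Celery(
    "tasks",
    broker="amqp://user:password@rabbitmq:5672//",
    backend="redis://redis:6379/0",
)

@app.task
def send_vulnerability_notification(vuln_id: int, channels: list[str]):
    """Send notifications about a new vulnerability to all channels."""
    # Re-read the row inside the worker so it sees current data
    vuln = Vulnerability.objects.get(id=vuln_id)

    for channel in channels:
        if channel == "email":
            send_email(vuln.assignee.email, vuln.summary)
        elif channel == "slack":
            post_to_slack(vuln.slack_channel, vuln.format_slack())
        elif channel == "teams":
            post_to_teams(vuln.teams_webhook, vuln.format_teams())

# In your API view: returns immediately
@api_view(["POST"])
def create_vulnerability(request):
    vuln = Vulnerability.objects.create(**request.data)

    # Fire-and-forget, but only after the row is committed. Otherwise a fast
    # worker can run Vulnerability.objects.get() before the row is visible.
    transaction.on_commit(
        lambda: send_vulnerability_notification.delay(vuln.id, ["email", "slack", "teams"])
    )

    return Response({"id": vuln.id}, status=201)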
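The `if`/`elif` chain silently ignores a misspelled channel such as `"slak"`. One alternative is a dispatch table that validates every channel name before anything is sent. The sketch below uses hypothetical stub senders that only return a description; they are not real integrations.

```python
from typing import Callable


# Hypothetical stand-ins for the real integrations
def send_email(vuln: dict) -> str:
    return f"email:{vuln['assignee_email']}"


def post_to_slack(vuln: dict) -> str:
    return f"slack:{vuln['slack_channel']}"


def post_to_teams(vuln: dict) -> str:
    return f"teams:{vuln['teams_webhook']}"


NOTIFIERS: dict[str, Callable[[dict], str]] = {
    "email": send_email,
    "slack": post_to_slack,
    "teams": post_to_teams,
}


def notify(vuln: dict, channels: list[str]) -> list[str]:
    # Reject unknown channels before sending anything, so a typo fails loudly
    unknown = sorted(set(channels) - set(NOTIFIERS))
    if unknown:
        raise ValueError(f"unknown notification channels: {unknown}")
    return [NOTIFIERS[channel](vuln) for channel in channels]


vuln = {"assignee_email": "dev@example.com", "slack_channel": "#security", "teams_webhook": "hook-1"}
print(notify(vuln, ["email", "slack"]))
```

Because validation happens before any side effect, a bad request fails without sending a partial set of notifications.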

Pattern 2: Task Chains (Sequential Processing)

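When tasks must run in order, chain them. Celery passes each task's return value as the first argument of the next task. That is why every step after the first is a signature created with no arguments (`.s()`):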

from celery import chain

def process_security_scan(scan_id: int):
    """Chain: Scan → Deduplicate → Enrich → Notify"""
    workflow = chain(
        run_scanner.s(scan_id),
        deduplicate_findings.s(),
        enrich_with_cve_data.s(),
        notify_stakeholders.s()
    )
    workflow.apply_async()

@app.task
def run_scanner(scan_id: int) -> list[dict]:
    scan = Scan.objects.get(id=scan_id)
    findings = scanner.execute(scan.target)
    return [f.dict() for f in findings]

@app.task
def deduplicate_findings(findings: list[dict]) -> list[dict]:
    """Remove duplicate findings based on hash."""
    seen = set()
    unique = []
    for f in findings:
        key = hash_finding(f)
        if key not in seen:
            seen.add(key)
            unique.append(f)
    return unique

@app.task
def enrich_with_cve_data(findings: list[dict]) -> list[dict]:
    """Add CVE details from NVD database."""
    for f in findings:
        if f.get("cve_id"):
            f["cve_details"] = fetch_nvd(f["cve_id"])
    return findings

@app.task
def notify_stakeholders(findings: list[dict]):
    """Send summary notification with all findings."""
    summary = generate_scan_summary(findings)
    send_email_to_security_team(summary)
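`hash_finding` is not shown above. Here is one plausible sketch. It assumes a finding is identified by its asset, CVE ID, and title, and these field names are hypothetical. It builds the key with `hashlib` rather than the built-in `hash()`, because Python salts `hash()` for strings per process. That salting is harmless inside a single task call, but keys compared across worker processes or stored in a database would not match.

```python
import hashlib
import json

# Hypothetical identity fields: which fields define "the same finding" is an assumption
IDENTITY_FIELDS = ("asset", "cve_id", "title")


def hash_finding(finding: dict) -> str:
    """Stable SHA-256 key over the identity fields, identical in every process."""
    identity = {field: finding.get(field) for field in IDENTITY_FIELDS}
    canonical = json.dumps(identity, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def deduplicate(findings: list[dict]) -> list[dict]:
    # Keep the first occurrence of each key, preserving input order
    seen: set[str] = set()
    unique = []
    for finding in findings:
        key = hash_finding(finding)
        if key not in seen:
            seen.add(key)
            unique.append(finding)
    return unique


findings = [
    {"asset": "web-1", "cve_id": "CVE-2021-44228", "title": "Log4Shell", "scanner": "a"},
    {"asset": "web-1", "cve_id": "CVE-2021-44228", "title": "Log4Shell", "scanner": "b"},
    {"asset": "web-2", "cve_id": "CVE-2021-44228", "title": "Log4Shell", "scanner": "a"},
]
print(len(deduplicate(findings)))  # 2
```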

Pattern 3: Fan-Out with Groups

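Process multiple items in parallel, then aggregate. A chord runs a group of tasks in parallel and calls a callback with the list of their results once all of them have finished. Chords require a result backend (such as Redis, listed as the result backend in the table above), because Celery has to track when every task in the group completes: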

from celery import group, chord

@app.task
def scan_single_asset(asset_id: int) -> dict:
    """Scan one asset for vulnerabilities."""
    asset = Asset.objects.get(id=asset_id)
    return run_scan(asset)

@app.task
def aggregate_scan_results(results: list[dict]):
    """Merge all scan results into a single report."""
    report = ScanReport.create(findings=results)
    report.calculate_risk_score()
    report.save()
    return report.id

def scan_all_assets(organization_id: int):
    """Scan all assets in parallel, then aggregate."""
    assets = Asset.objects.filter(organization_id=organization_id)

    # chord = group (parallel) + callback (when all complete)
    workflow = chord(
        group(scan_single_asset.s(a.id) for a in assets),
        aggregate_scan_results.s()
    )
    workflow.apply_async()
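The chord's shape can be mimicked with the standard library's `concurrent.futures`, which may help when reasoning about it. The header fans out in parallel, and the callback runs once with every result. `scan_single_asset` and `aggregate` below are hypothetical stand-ins. Threads in one process replace distributed workers, so there is no retry or failure isolation.

```python
from concurrent.futures import ThreadPoolExecutor


def scan_single_asset(asset_id: int) -> dict:
    # Hypothetical stand-in for a real scan
    return {"asset_id": asset_id, "findings": asset_id // 10}


def aggregate(results: list[dict]) -> dict:
    # Runs once, after every parallel scan has returned
    return {
        "assets": sorted(r["asset_id"] for r in results),
        "total_findings": sum(r["findings"] for r in results),
    }


def scan_all(asset_ids: list[int]) -> dict:
    with ThreadPoolExecutor(max_workers=4) as pool:
        # "Header": one call per asset; list() waits for all of them
        results = list(pool.map(scan_single_asset, asset_ids))
    # "Body": the callback sees the complete list of results
    return aggregate(results)


print(scan_all([10, 25, 31]))
```

A caution that carries over to Celery: depending on the result backend and its settings, the callback's list may not follow header order. It is safer for each result to carry its own identifier, as `asset_id` does here.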

Pattern 4: Retry with Exponential Backoff

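External API calls fail, so handle transient failures with automatic retries. Note that `retry_backoff`, `retry_backoff_max`, and `retry_jitter` only apply to retries triggered through `autoretry_for`. A manual `raise self.retry(exc=exc)` waits `default_retry_delay` (or an explicit `countdown`) instead, so it would retry every 60 seconds with no backoff at all: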

@app.task(
    autoretry_for=(JiraConnectionError,),  # Retry automatically on transient connection errors
    max_retries=5,
    retry_backoff=60,          # Exponential backoff: delay cap of 60s, 120s, 240s, ...
    retry_backoff_max=600,     # Never wait more than 10 minutes between retries
    retry_jitter=True          # Randomize each delay to prevent a thundering herd
)
def sync_to_jira(vulnerability_id: int):
    vuln = Vulnerability.objects.get(id=vulnerability_id)

    try:
        # A JiraConnectionError raised here propagates to Celery, which schedules the retry
        jira_client = get_jira_client(vuln.organization)
        ticket = jira_client.create_issue(
            project=vuln.jira_project,
            summary=vuln.title,
            description=vuln.description,
            priority=map_severity_to_priority(vuln.severity)
        )
    except JiraAuthError:
        # Don't retry auth errors: they won't fix themselves
        vuln.sync_status = "auth_failed"
        vuln.save()
        return

    vuln.jira_ticket_id = ticket.key
    vuln.save()
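To see what delays those options produce, the function below mirrors the formula Celery's internal helper uses for automatic-retry backoff, as I understand it. The cap for retry *n* (counting from 0) is `min(retry_backoff_max, factor * 2**n)`. With `retry_jitter=True`, the actual delay is drawn uniformly from 0 up to that cap ("full jitter"). The name `backoff_delay` is mine, not part of Celery's public API.

```python
import random
from typing import Optional


def backoff_delay(retries: int, factor: int = 60, maximum: int = 600,
                  jitter: bool = True, rng: Optional[random.Random] = None) -> int:
    """Seconds to wait before retry number `retries` (counting from 0)."""
    cap = min(maximum, factor * 2 ** retries)
    if jitter:
        # Full jitter: any whole number of seconds from 0 up to and including the cap
        return (rng or random).randrange(cap + 1)
    return max(0, cap)


# Without jitter these are the caps for max_retries=5, retry_backoff=60, retry_backoff_max=600
print([backoff_delay(n, jitter=False) for n in range(5)])  # [60, 120, 240, 480, 600]
```

The jittered values keep a fleet of tasks that failed together from retrying in lockstep.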

Pattern 5: Dead Letter Queue (DLQ)

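When tasks fail permanently, don't lose them. By default, Celery acknowledges a task's message even when the task fails, so RabbitMQ keeps no copy and the failure has to be captured explicitly. Declaring a `dead_letter` queue only creates it; nothing is routed there automatically. In this setup, an `on_failure` hook writes each permanently failed task to a database table for manual review: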

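# celery_config.py
from celery import Task
from kombu import Queue

app.conf.task_default_queue = "default"  # Unrouted tasks would otherwise go to a queue named "celery"
app.conf.task_queues = [
    Queue("default", routing_key="default"),
    Queue("priority_high", routing_key="priority.high"),
    Queue("dead_letter", routing_key="dead_letter"),
]

app.conf.task_routes = {
    "tasks.critical_scan": {"queue": "priority_high"},
}

# Base class with a failure hook; opt tasks in with @app.task(base=DeadLetterTask)
class DeadLetterTask(Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Record permanently failed tasks for manual review.

        Celery calls this when the task finally fails (after any retries
        are exhausted), not on each intermediate retry.
        """
        DeadLetterEntry.objects.create(
            task_id=task_id,
            task_name=self.name,          # Name of the task that actually failed
            args=args,
            kwargs=kwargs,
            exception=repr(exc),
            traceback=einfo.traceback,    # Formatted traceback string
        )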
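The core idea is independent of Celery and RabbitMQ and fits in a few lines. Retry a message a bounded number of times, then park it with its error somewhere a human can inspect and replay it, instead of dropping it. The following is a minimal in-memory sketch; `process_all` and its parameters are illustrative names, not part of any library.

```python
from typing import Callable


def process_all(messages: list[dict], handler: Callable[[dict], None],
                max_attempts: int = 3) -> tuple[list[dict], list[dict]]:
    """Run handler on each message; return (succeeded, dead_lettered)."""
    succeeded, dead_letter = [], []
    for message in messages:
        for attempt in range(1, max_attempts + 1):
            try:
                handler(message)
            except Exception as exc:
                if attempt == max_attempts:
                    # Out of attempts: keep the message and the reason it failed
                    dead_letter.append({"message": message, "error": repr(exc),
                                        "attempts": attempt})
            else:
                succeeded.append(message)
                break
    return succeeded, dead_letter


def handler(message: dict) -> None:
    if message["id"] == 2:
        raise ValueError("malformed payload")


ok, dlq = process_all([{"id": 1}, {"id": 2}, {"id": 3}], handler)
print([m["id"] for m in ok])  # [1, 3]
print(dlq)
```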

Pattern 6: Rate Limiting

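Protect external APIs from being overwhelmed. Note that Celery's `rate_limit` applies per worker instance, not globally. With three workers consuming this task, the combined rate can reach 30 calls per minute. To enforce a true global limit, route the task to a dedicated queue consumed by a single worker: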

import httpx

@app.task(rate_limit="10/m")  # At most 10 per minute, per worker instance
def call_external_api(endpoint: str, payload: dict):
    """Rate-limited external API call."""
    response = httpx.post(endpoint, json=payload, timeout=30)
    response.raise_for_status()
    return response.json()
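Celery's worker implements `rate_limit` with a token bucket (kombu's `TokenBucket`). The class below is a standalone sketch of the same technique, not kombu's implementation. Tokens refill at a fixed rate up to a capacity, and each call spends one. The clock is injectable so the behavior can be checked deterministically.

```python
import time
from typing import Callable


class TokenBucket:
    """Allow `rate` calls per `per` seconds, with bursts of up to `capacity`."""

    def __init__(self, rate: float, per: float = 60.0, capacity: int = 1,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.fill_rate = rate / per  # tokens added per second
        self.capacity = capacity
        self.tokens = float(capacity)
        self.clock = clock
        self.last = clock()

    def try_acquire(self) -> bool:
        now = self.clock()
        # Refill for the time elapsed, never beyond capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.fill_rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


# Fake clock so the example is deterministic
fake_now = [0.0]
bucket = TokenBucket(rate=10, per=60, clock=lambda: fake_now[0])  # "10/m"
print(bucket.try_acquire())  # True: the bucket starts full
print(bucket.try_acquire())  # False: no token left yet
fake_now[0] = 7.0            # One token refills every 6 seconds
print(bucket.try_acquire())  # True
```

With `capacity=1`, calls are spaced evenly. A larger capacity permits short bursts while keeping the same average rate.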

Production Configuration

Here's the Celery configuration I use in production:

# celery_config.py
app.conf.update(
    # Serialization
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],

    # Timezone
    timezone="UTC",
    enable_utc=True,

    # Reliability
    task_acks_late=True,                  # Ack after the task finishes, not on receipt (tasks must be idempotent)
    worker_prefetch_multiplier=1,         # Reserve at most one message per child process
    task_reject_on_worker_lost=True,      # Re-queue if the worker process dies mid-task (a task that always crashes it will loop)

    # Performance
    worker_concurrency=4,                 # 4 child processes per worker instance
    worker_max_tasks_per_child=100,       # Replace each child after 100 tasks (memory-leak protection)

    # Monitoring
    worker_send_task_events=True,
    task_send_sent_event=True,
)
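`task_acks_late=True` combined with `task_reject_on_worker_lost=True` gives at-least-once delivery. A task that was running when its worker died is delivered again, so it can run twice, and the usual answer is to make tasks idempotent. This is a minimal sketch of one approach: check for the side effect's result before repeating it. `FakeJira` and `sync_ticket` are hypothetical stand-ins, and a dict plays the role of the database row.

```python
class FakeJira:
    """Hypothetical stand-in for a Jira client; counts created issues."""

    def __init__(self) -> None:
        self.created = 0

    def create_issue(self, summary: str) -> str:
        self.created += 1
        return f"SEC-{self.created}"


def sync_ticket(vuln: dict, jira: FakeJira) -> str:
    # Idempotency check: a redelivered task finds the ticket and stops here
    if vuln.get("jira_ticket_id"):
        return vuln["jira_ticket_id"]
    vuln["jira_ticket_id"] = jira.create_issue(vuln["title"])
    return vuln["jira_ticket_id"]


jira = FakeJira()
vuln = {"id": 7, "title": "Outdated OpenSSL"}
first = sync_ticket(vuln, jira)
second = sync_ticket(vuln, jira)  # Simulated redelivery after a worker crash
print(first, second, jira.created)  # SEC-1 SEC-1 1
```

This narrows the window but does not close it. A crash after `create_issue` but before the ID is saved still produces a duplicate ticket.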

Monitoring with Flower

Always monitor your Celery workers in production:

pip install flower
celery -A myapp flower --port=5555

This gives you a real-time dashboard showing:

  • Active/reserved/completed tasks
  • Worker health and resource usage
  • Task success/failure rates
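  • Queue depths (for RabbitMQ, Flower reads these from the management plugin's HTTP API, configured via its broker_api option)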
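Queue depth only stays flat when workers finish tasks at least as fast as tasks arrive. Here is a back-of-the-envelope check that assumes a steady arrival rate and a known average task duration; the load figures in the example are made up. The concurrent slots required equal arrival rate × average duration. Whenever that exceeds capacity, the backlog grows by the difference.

```python
import math


def required_slots(arrivals_per_sec: float, avg_task_sec: float) -> int:
    """Minimum concurrent task slots needed to keep up on average (ignores bursts)."""
    return math.ceil(arrivals_per_sec * avg_task_sec)


def backlog_growth_per_min(arrivals_per_sec: float, avg_task_sec: float,
                           slots: int) -> float:
    """Tasks added to the queue per minute; 0 when capacity keeps up."""
    throughput = slots / avg_task_sec  # tasks completed per second
    return max(0.0, (arrivals_per_sec - throughput) * 60)


# Hypothetical load: 3 tasks/s, 2 s each, one worker with worker_concurrency=4
print(required_slots(3, 2))             # 6
print(backlog_growth_per_min(3, 2, 4))  # 60.0
```

At `worker_concurrency=4`, six slots means two worker instances (ceil(6 / 4) = 2).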

Key Takeaways

  1. Start with fire-and-forget: Don't over-engineer. Most background jobs just need .delay()
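  2. Always set retry limits: max_retries=None retries forever and will fill your queue (Celery's default is 3)
  3. Use task_acks_late=True: Prevents task loss if a worker crashes mid-execution, at the cost of possibly running a task twice, so make tasks idempotent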
  4. Monitor queue depth: If queues grow faster than workers process, you need to scale
  5. Dead letter queues are essential: Failed tasks need a place to go for debugging
  6. Rate limit external calls: Be a good API citizen

Building event-driven security platforms at Strobes Security. Find me on GitHub.