Using Celery for Async Registration Batch Processing

Symptom Profile & Operational Impact

During high-velocity ticket drops, corporate bulk uploads, or multi-track conference check-ins, registration platforms routinely exhibit a 30–120 second payment sync gap. Operators observe attendees marked as REGISTERED in the CRM while badge printers queue PAYMENT_PENDING jobs. Concurrently, webhook payloads arrive out of sequence, synchronous form API polling exhausts connection pools, and memory bottlenecks trigger OOM kills on application servers. The operational fallout is predictable: duplicate badge prints, orphaned payment records, and manual reconciliation queues that scale linearly with attendee volume.

Root Cause & Architectural Shift

The underlying failure stems from coupling form ingestion, payment verification, and badge template rendering within a single synchronous request thread. When a registration payload arrives, the system blocks on external gateway latency, holds database row locks during schema validation, and retries webhook verification without idempotency guarantees. This tight coupling creates cascading backpressure: one slow gateway call ties up a thread, a database connection, and row locks at the same time. The resolution is a decoupled, event-driven architecture in which the Registration Ingestion & Payment Reconciliation stages run as independent, retry-safe tasks. Celery provides the necessary task queue semantics, but production reliability demands explicit chunking, deterministic idempotency keys, and compensating transaction patterns rather than naive fire-and-forget dispatch.

Implementation Guide

Step 1: Idempotent Task Routing & Chunking

Batch registration payloads must be segmented to prevent broker memory exhaustion and guarantee partial failure isolation. Monolithic payloads monopolize worker threads and trigger ACK timeouts. Each chunk carries a deterministic batch_id and idempotency_key derived from the source form submission hash.

PYTHON
import os
import json
import uuid
import hashlib
import logging
from typing import List, Dict, Any
from celery import Celery
from pydantic import BaseModel, ValidationError

logger = logging.getLogger(__name__)

app = Celery(
    "event_reg",
    broker=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"),
    # Celery's SQLAlchemy result backend requires the "db+" URL prefix
    backend=os.getenv("CELERY_RESULT_BACKEND", "db+postgresql+psycopg2://user:pass@db/events"),
)

app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_acks_late=True,              # Ack only after the task body finishes
    task_reject_on_worker_lost=True,  # Requeue the message if the child process dies mid-task
    worker_prefetch_multiplier=1,     # Reserve one message per process at a time
    task_time_limit=300,
    task_soft_time_limit=240,
    # Retry delay and retry limit are per-task options (see the decorator below),
    # not app.conf settings
)

class RegistrationChunk(BaseModel):
    batch_id: str
    idempotency_key: str
    records: List[Dict[str, Any]]

def _digest(obj: Any) -> str:
    # Canonical JSON (sorted keys) so identical input always hashes identically
    return hashlib.sha256(json.dumps(obj, sort_keys=True, default=str).encode()).hexdigest()

def generate_idempotency_key(batch_id: str, index: int, records: List[Dict[str, Any]]) -> str:
    # Cover every record in the chunk (not only the first) plus its position in the batch
    return _digest({"batch_id": batch_id, "index": index, "records": records})

def chunk_payload(records: List[Dict[str, Any]], chunk_size: int = 50) -> List[RegistrationChunk]:
    # Deterministic batch_id: re-submitting the same upload yields the same id
    batch_id = str(uuid.UUID(_digest(records)[:32]))
    chunks = []
    for i in range(0, len(records), chunk_size):
        subset = records[i : i + chunk_size]
        key = generate_idempotency_key(batch_id, i, subset)
        chunks.append(RegistrationChunk(batch_id=batch_id, idempotency_key=key, records=subset))
    return chunks

@app.task(bind=True, name="tasks.process_registration_chunk", max_retries=5, default_retry_delay=15)
def process_registration_chunk(self, chunk_data: Dict[str, Any]) -> Dict[str, Any]:
    # The JSON serializer cannot encode Pydantic models, so callers pass
    # chunk.model_dump() and the task re-validates it here (Pydantic v2 API)
    chunk = RegistrationChunk.model_validate(chunk_data)
    try:
        # Idempotency guard (not shown): return {"status": "skipped"} if
        # (chunk.batch_id, chunk.idempotency_key) has already been recorded
        logger.info("Processing chunk %s | %d records", chunk.batch_id, len(chunk.records))

        # Validate each record and hand it to the payment pipeline (Step 4)
        for record in chunk.records:
            validate_and_dispatch(record)

        return {"batch_id": chunk.batch_id, "status": "completed", "processed": len(chunk.records)}
    except Exception as exc:
        logger.error("Chunk %s failed: %s", chunk.batch_id, exc)
        # Linear backoff of 15s, 30s, 45s, ... (request.retries starts at 0)
        raise self.retry(exc=exc, countdown=15 * (self.request.retries + 1))

# Dispatch one task per chunk:
# for chunk in chunk_payload(records):
#     process_registration_chunk.delay(chunk.model_dump())
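
The idempotency guard in the task is only sketched as a comment. One way to back it is a table keyed on (batch_id, idempotency_key): check it before processing and record the chunk only after it succeeds, so a failed attempt can still be retried. The example below uses the standard-library sqlite3 module purely so it runs anywhere; the processed_chunks table and the init_store, already_processed, and mark_processed helpers are hypothetical, and against PostgreSQL the insert would be written as INSERT ... ON CONFLICT DO NOTHING.

```python
import sqlite3

def init_store(conn: sqlite3.Connection) -> None:
    # Hypothetical table: the composite primary key allows one row per chunk
    conn.execute(
        """CREATE TABLE IF NOT EXISTS processed_chunks (
               batch_id TEXT NOT NULL,
               idempotency_key TEXT NOT NULL,
               PRIMARY KEY (batch_id, idempotency_key)
           )"""
    )
    conn.commit()

def already_processed(conn: sqlite3.Connection, batch_id: str, idempotency_key: str) -> bool:
    # Checked at the start of the task: a hit means this chunk already finished
    row = conn.execute(
        "SELECT 1 FROM processed_chunks WHERE batch_id = ? AND idempotency_key = ?",
        (batch_id, idempotency_key),
    ).fetchone()
    return row is not None

def mark_processed(conn: sqlite3.Connection, batch_id: str, idempotency_key: str) -> bool:
    # Called only after the chunk succeeds.
    # Returns True if this call recorded the chunk, False if another delivery got there first.
    cur = conn.execute(
        "INSERT OR IGNORE INTO processed_chunks (batch_id, idempotency_key) VALUES (?, ?)",
        (batch_id, idempotency_key),
    )
    conn.commit()
    # rowcount is 1 when a row was inserted, 0 when the primary key made the insert a no-op
    return cur.rowcount == 1
```

In the task, already_processed would run before the record loop and mark_processed just before the success return. Two concurrent deliveries of the same chunk can still both run, so downstream writes such as the payment upsert in Step 3 must stay idempotent as well.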

Step 2: Worker Concurrency & Memory Guardrails

OOM kills occur when workers accumulate unbounded ORM sessions, large JSON payloads, or unreleased file descriptors across many tasks. Cap per-process memory and recycle worker processes periodically so leaked state is discarded.

PYTHON
# celery_worker_config.py (applies to the `app` instance defined in Step 1)
app.conf.update(
    worker_max_tasks_per_child=100,  # Replace each child process after 100 tasks to shed leaked memory
    worker_max_memory_per_child=512000,  # In KiB (~500 MiB): child is replaced after the task that exceeds it
    worker_concurrency=4,  # Tune to CPU cores; avoid oversubscription
    broker_connection_retry_on_startup=True,
    result_expires=3600,  # Expire stored results after 1 hour (database backend: needs celery beat for cleanup)
)

Startup Command:

BASH
celery -A event_reg worker \
  --loglevel=INFO \
  --concurrency=4 \
  --max-memory-per-child=512000 \
  --max-tasks-per-child=100 \
  --pool=prefork \
  --without-gossip --without-mingle --without-heartbeat
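
Because worker_max_memory_per_child is measured in KiB and applies to each prefork child separately, the worst-case footprint is roughly concurrency times the limit, plus the parent process. Deriving the limit from a host memory budget, as in the hypothetical helper below, keeps the two numbers consistent when concurrency changes; the reserve figure is an assumption to tune per host.

```python
def per_child_limit_kib(host_budget_mib: int, reserve_mib: int, concurrency: int) -> int:
    """Split a host memory budget evenly across prefork child processes.

    Hypothetical helper. reserve_mib covers the parent worker process and anything
    else running on the host. Returns KiB, the unit Celery uses for
    worker_max_memory_per_child and --max-memory-per-child.
    """
    if concurrency < 1:
        raise ValueError("concurrency must be at least 1")
    usable_mib = host_budget_mib - reserve_mib
    if usable_mib <= 0:
        raise ValueError("reserve_mib leaves no memory for worker children")
    return (usable_mib // concurrency) * 1024

# 2500 MiB budget, 500 MiB reserved, 4 children -> 500 MiB each
limit_kib = per_child_limit_kib(host_budget_mib=2500, reserve_mib=500, concurrency=4)
# limit_kib == 512000, the value used in the config and startup command above
```

Celery checks this limit after each task completes, so a single large task can overshoot it. Any container or cgroup memory limit should therefore sit comfortably above concurrency times the per-child limit plus the reserve.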

Step 3: Payment Webhook Reconciliation & Retry Semantics

Webhooks arrive out-of-order due to network jitter and gateway retries. Implement database-level upserts with explicit state transitions to prevent race conditions.

PYTHON
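import os
from contextlib import closing
from typing import Any, Dict

import psycopg2
from celery.exceptions import Ignore

# `app` and `logger` come from the Step 1 module; trigger_badge_print is the
# badge-rendering task, defined elsewhere (not shown)

@app.task(bind=True, name="tasks.reconcile_payment_webhook", max_retries=8)
def reconcile_payment_webhook(self, webhook_payload: Dict[str, Any]) -> str:
    txn_id = webhook_payload.get("transaction_id")
    status = webhook_payload.get("status")

    if not txn_id or not status:
        logger.error("Malformed webhook payload: %r", webhook_payload)
        raise Ignore()

    try:
        # `with conn` only ends the transaction; closing() also releases the
        # connection so long-lived workers do not leak them
        with closing(psycopg2.connect(os.getenv("DB_URL"))) as conn:
            with conn, conn.cursor() as cur:
                # Atomic upsert with state-machine guard: a COMPLETED row is never
                # overwritten. RETURNING yields a row only if an insert or update happened.
                cur.execute("""
                    INSERT INTO payment_ledger (txn_id, status, updated_at, idempotency_hash)
                    VALUES (%s, %s, NOW(), %s)
                    ON CONFLICT (txn_id) DO UPDATE SET
                        status = EXCLUDED.status,
                        updated_at = NOW()
                    WHERE payment_ledger.status != 'COMPLETED'
                    RETURNING txn_id
                """, (txn_id, status, webhook_payload.get("idempotency_key")))
                changed = cur.fetchone() is not None
            # Leaving `with conn` has committed the transaction

        # Only the delivery that actually wrote the row triggers a print, so a
        # duplicate COMPLETED webhook cannot cause a duplicate badge
        if status == "COMPLETED" and changed:
            trigger_badge_print.delay(txn_id)
        if not changed:
            return f"Ignored {txn_id} -> {status} (already COMPLETED)"
        return f"Reconciled {txn_id} -> {status}"
    except psycopg2.OperationalError as e:
        logger.warning("DB connection lost, retrying: %s", e)
        # Exponential backoff: 1s, 2s, 4s, ...
        raise self.retry(exc=e, countdown=2 ** self.request.retries)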
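
The WHERE clause in the upsert protects only COMPLETED rows, so a late PENDING delivery can still overwrite any other state, such as a failure. Making every allowed transition explicit closes that gap. The sketch below assumes hypothetical status names (PENDING, AUTHORIZED, COMPLETED, FAILED); substitute the statuses your gateway actually sends. The same rule can be encoded in the upsert's WHERE clause so the decision stays atomic in the database.

```python
from typing import Dict, FrozenSet, Optional, Tuple

# Hypothetical gateway statuses and the forward moves each one permits.
# COMPLETED and FAILED are treated as terminal: nothing may overwrite them.
ALLOWED_TRANSITIONS: Dict[str, FrozenSet[str]] = {
    "PENDING": frozenset({"AUTHORIZED", "COMPLETED", "FAILED"}),
    "AUTHORIZED": frozenset({"COMPLETED", "FAILED"}),
    "COMPLETED": frozenset(),
    "FAILED": frozenset(),
}

def apply_webhook(current: Optional[str], incoming: str) -> Tuple[str, bool]:
    """Return (resulting_status, changed) for one webhook delivery.

    current is None when no ledger row exists yet, so the first delivery is stored
    as-is. Duplicate or stale deliveries leave the status unchanged.
    """
    if incoming not in ALLOWED_TRANSITIONS:
        raise ValueError(f"unknown payment status: {incoming!r}")
    if current is None:
        return incoming, True
    if incoming in ALLOWED_TRANSITIONS.get(current, frozenset()):
        return incoming, True
    return current, False

# Out-of-order example: COMPLETED arrives before the gateway's earlier PENDING event
status, changed = apply_webhook(None, "COMPLETED")  # ("COMPLETED", True)
status, changed = apply_webhook(status, "PENDING")  # ("COMPLETED", False): stale, ignored
```

Only a delivery that returns changed=True should trigger downstream work such as badge printing; a duplicate or stale webhook then becomes a no-op instead of a second print.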

Step 4: Schema Validation & Error Categorization

Reject malformed payloads early. Categorize failures into RETRYABLE (transient network), FATAL (schema violation), and MANUAL_REVIEW (business rule conflict). Route accordingly.

PYTHON
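import logging
from enum import Enum
from typing import Any, Dict

# EmailStr requires the optional email-validator package
from pydantic import BaseModel, EmailStr, Field, ValidationError, conint

logger = logging.getLogger(__name__)

class ErrorCategory(str, Enum):
    RETRYABLE = "RETRYABLE"
    FATAL = "FATAL"
    MANUAL_REVIEW = "MANUAL_REVIEW"

class RegistrationPayload(BaseModel):
    email: EmailStr
    ticket_type: str = Field(..., min_length=3, max_length=50)
    quantity: conint(ge=1, le=10)
    payment_method: str

def log_error(category: ErrorCategory, record: Dict[str, Any], exc: Exception) -> None:
    # Minimal stand-in: in production, persist to a per-category error table or queue
    logger.error("[%s] %s: %s", category.value, record.get("email", "<unknown>"), exc)

def validate_and_dispatch(record: Dict[str, Any]) -> None:
    try:
        validated = RegistrationPayload(**record)
        # Hand `validated` to the payment gateway call here (not shown)
    except ValidationError as e:
        # Schema violations fail identically on every attempt: do not retry
        log_error(ErrorCategory.FATAL, record, e)
    except (TimeoutError, ConnectionError) as e:
        # Transient network failures: re-raise so the chunk task retries.
        # Map your gateway client's timeout/5xx exceptions into this branch.
        log_error(ErrorCategory.RETRYABLE, record, e)
        raise
    except Exception as e:
        # Anything else (e.g. a business-rule conflict) goes to a human
        log_error(ErrorCategory.MANUAL_REVIEW, record, e)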
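
"Route accordingly" can be reduced to a small decision function. The sketch below is one possible policy, not built-in Celery behavior: RETRYABLE failures back off exponentially with full jitter until a retry budget is spent, FATAL failures and exhausted retries go to a dead-letter destination, and MANUAL_REVIEW failures go to a review queue. The destination names, the 15-second base, and the 600-second cap are assumptions.

```python
import random
from dataclasses import dataclass
from enum import Enum
from typing import Optional

class ErrorCategory(str, Enum):
    RETRYABLE = "RETRYABLE"
    FATAL = "FATAL"
    MANUAL_REVIEW = "MANUAL_REVIEW"

@dataclass(frozen=True)
class RouteDecision:
    destination: str                   # hypothetical: "retry", "dead_letter", or "manual_review"
    countdown: Optional[float] = None  # seconds to wait before retrying (retry only)

def route_failure(
    category: ErrorCategory,
    retries: int,
    max_retries: int = 5,
    base: float = 15.0,
    cap: float = 600.0,
    rng: Optional[random.Random] = None,
) -> RouteDecision:
    if category is ErrorCategory.FATAL:
        # Schema violations will fail identically on every attempt
        return RouteDecision("dead_letter")
    if category is ErrorCategory.MANUAL_REVIEW:
        return RouteDecision("manual_review")
    if retries >= max_retries:
        # Retry budget spent: park the record rather than retrying forever
        return RouteDecision("dead_letter")
    # Exponential backoff with full jitter: uniform in [0, min(cap, base * 2**retries)]
    ceiling = min(cap, base * (2 ** retries))
    return RouteDecision("retry", (rng or random).uniform(0, ceiling))
```

Full jitter spreads retries from many failed chunks across the window instead of having them hit the gateway in lockstep. For automatic retries, Celery tasks also accept autoretry_for together with retry_backoff, retry_backoff_max, and retry_jitter, which apply a similar jittered exponential backoff without hand-written countdowns.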

Incident Response & Rollback Procedures

Fast Incident Resolution

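  1. Identify Backpressure: celery -A event_reg inspect active --json lists running tasks, celery -A event_reg inspect reserved lists prefetched ones, and redis-cli LLEN celery shows how many messages are waiting in the default queue.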
  2. Purge Stale Tasks: celery -A event_reg purge (only if tasks are idempotent and safe to drop)
  3. Force Worker Restart: sudo systemctl restart celery-worker
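  4. Clear Redis Broker Memory: redis-cli MEMORY PURGE && redis-cli FLUSHDB (last resort: MEMORY PURGE only releases allocator pages, while FLUSHDB deletes every key in the selected database, including all queued and unacknowledged tasks; confirm no webhooks are in flight first)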
  5. DB Reconciliation Script:
SQL
  -- Find orphaned PAYMENT_PENDING records older than 1 hour
  SELECT id, email, created_at FROM registrations 
  WHERE payment_status = 'PAYMENT_PENDING' AND created_at < NOW() - INTERVAL '1 hour'
  ORDER BY created_at ASC;

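Re-dispatch via chunk_payload so each chunk gets its own idempotency key, passing a plain dict because the JSON serializer cannot encode Pydantic models: for chunk in chunk_payload([...]): process_registration_chunk.delay(chunk.model_dump())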

Rollback to Synchronous Fallback

If Celery broker fails or workers crash repeatedly:

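  1. Disable async routing in the API gateway: set REGISTRATION_ASYNC_MODE=false in the gateway's service environment and restart or reload it (an export in an interactive shell does not reach an already-running process).
  2. Route /register directly to the legacy sync handler.
  3. Stop Celery workers: celery -A event_reg control shutdown. This does not flush the queue; with task_acks_late=True, unacknowledged tasks stay in the broker and run again when workers return, so purge them separately only if they are safe to drop.
  4. Verify connection pool limits for the sync path, e.g. SQLAlchemy pool_size=10, max_overflow=5, or pgbouncer default_pool_size.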
  5. Monitor psql -c "SELECT count(*) FROM pg_stat_activity WHERE state = 'active';" to confirm sync load is within capacity.
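
The REGISTRATION_ASYNC_MODE switch only works if the gateway consults it when routing /register. A minimal sketch of that branch, assuming the flag comes from the gateway's process environment and that enqueue_async and handle_sync are hypothetical callables standing in for the Celery dispatch and the legacy sync handler:

```python
import os
from typing import Any, Callable, Dict, Mapping, Optional

Handler = Callable[[Dict[str, Any]], str]

def async_mode_enabled(env: Optional[Mapping[str, str]] = None) -> bool:
    # Evaluated per request; the value comes from the process environment, so
    # changing it still requires restarting or reloading the gateway process.
    # Unset means async; only an explicit false-like value selects the sync fallback.
    env = os.environ if env is None else env
    value = env.get("REGISTRATION_ASYNC_MODE", "true").strip().lower()
    return value not in {"false", "0", "no", "off"}

def register(
    payload: Dict[str, Any],
    enqueue_async: Handler,  # hypothetical: e.g. wraps process_registration_chunk.delay
    handle_sync: Handler,    # hypothetical: the legacy synchronous handler
    env: Optional[Mapping[str, str]] = None,
) -> str:
    if async_mode_enabled(env):
        return enqueue_async(payload)
    # Rollback path: bypass the broker entirely
    return handle_sync(payload)
```

Defaulting to async when the variable is unset keeps a missing setting from silently sending production traffic down the sync path; invert the default if the sync path is the safer fallback in your deployment.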

Post-Incident Validation

  • Confirm the number of COMPLETED rows in payment_ledger matches the CRM REGISTERED count.
  • Verify badge printer queue depth returns to < 5.
  • Review celery -A event_reg inspect stats for memory leaks or task rejection spikes.
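
The first check can be scripted. Bare counts can hide offsetting errors (one missing payment and one extra payment still produce equal totals), so the sketch below compares identifiers instead. It assumes both systems can be keyed by a shared identifier, called txn_id here as a hypothetical join key; feed it the IDs of the CRM's REGISTERED attendees and of the COMPLETED rows in payment_ledger.

```python
from typing import Iterable, NamedTuple, Set

class ReconciliationReport(NamedTuple):
    registered_unpaid: Set[str]  # REGISTERED in the CRM, no COMPLETED ledger row
    paid_unregistered: Set[str]  # COMPLETED in the ledger, not REGISTERED in the CRM

    @property
    def clean(self) -> bool:
        return not self.registered_unpaid and not self.paid_unregistered

def reconcile(crm_registered: Iterable[str], ledger_completed: Iterable[str]) -> ReconciliationReport:
    # Set differences in both directions expose drift that matching counts would hide
    crm = set(crm_registered)
    ledger = set(ledger_completed)
    return ReconciliationReport(
        registered_unpaid=crm - ledger,
        paid_unregistered=ledger - crm,
    )
```

A non-empty registered_unpaid set points at the same orphaned records the PAYMENT_PENDING query surfaces, while paid_unregistered lists payments whose registrations never reached the CRM.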