""" Event processor — polls the domain_events outbox table and dispatches to registered handlers. Runs as an asyncio background task within each app process. Uses SELECT ... FOR UPDATE SKIP LOCKED for safe concurrent processing. """ from __future__ import annotations import asyncio import traceback from datetime import datetime, timezone from sqlalchemy import select, update from sqlalchemy.ext.asyncio import AsyncSession from shared.db.session import get_session from shared.models.domain_event import DomainEvent from .bus import get_handlers class EventProcessor: """Background event processor that polls the outbox table.""" def __init__( self, *, poll_interval: float = 2.0, batch_size: int = 10, ): self._poll_interval = poll_interval self._batch_size = batch_size self._task: asyncio.Task | None = None self._running = False async def start(self) -> None: """Start the background polling loop.""" if self._task is not None: return self._running = True self._task = asyncio.create_task(self._poll_loop()) async def stop(self) -> None: """Stop the background polling loop gracefully.""" self._running = False if self._task is not None: self._task.cancel() try: await self._task except asyncio.CancelledError: pass self._task = None async def _poll_loop(self) -> None: while self._running: try: processed = await self._process_batch() if processed == 0: await asyncio.sleep(self._poll_interval) except asyncio.CancelledError: break except Exception: traceback.print_exc() await asyncio.sleep(self._poll_interval) async def _process_batch(self) -> int: """Fetch and process a batch of pending events. Returns count processed.""" processed = 0 async with get_session() as session: # FOR UPDATE SKIP LOCKED: safe for concurrent processors stmt = ( select(DomainEvent) .where( DomainEvent.state == "pending", DomainEvent.attempts < DomainEvent.max_attempts, ) .order_by(DomainEvent.created_at) .limit(self._batch_size) .with_for_update(skip_locked=True) ) result = await session.execute(stmt) events = result.scalars().all() for event in events: await self._process_one(session, event) processed += 1 await session.commit() return processed async def _process_one(self, session: AsyncSession, event: DomainEvent) -> None: """Run all handlers for a single event.""" handlers = get_handlers(event.event_type) now = datetime.now(timezone.utc) event.state = "processing" event.attempts += 1 await session.flush() if not handlers: # No handlers registered — mark completed (nothing to do) event.state = "completed" event.processed_at = now return try: for handler in handlers: await handler(event, session) event.state = "completed" event.processed_at = now except Exception as exc: event.last_error = f"{exc.__class__.__name__}: {exc}" if event.attempts >= event.max_attempts: event.state = "failed" event.processed_at = now else: event.state = "pending" # retry