Phase 1-3 of decoupling plan: - Shared DB, models, infrastructure, browser, config, utils - Event infrastructure (domain_events outbox, bus, processor) - Structured logging - Generic container concept (container_type/container_id) - Alembic migrations for all schema changes Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
118 lines
3.7 KiB
Python
"""
|
|
Event processor — polls the domain_events outbox table and dispatches
|
|
to registered handlers.
|
|
|
|
Runs as an asyncio background task within each app process.
|
|
Uses SELECT ... FOR UPDATE SKIP LOCKED for safe concurrent processing.
|
|
"""
|
|
from __future__ import annotations

import asyncio
import logging
import traceback
from datetime import datetime, timezone

from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession

from shared.db.session import get_session
from shared.models.domain_event import DomainEvent

from .bus import get_handlers
|
|
|
|
|
|
logger = logging.getLogger(__name__)


class EventProcessor:
    """Background event processor that polls the ``domain_events`` outbox.

    Pending events are claimed in small batches with
    ``SELECT ... FOR UPDATE SKIP LOCKED`` so multiple processor instances
    (one per app process) can run concurrently without double-processing.
    """

    def __init__(
        self,
        *,
        poll_interval: float = 2.0,
        batch_size: int = 10,
    ):
        # Seconds to sleep when a poll finds nothing to do (or errors out).
        self._poll_interval = poll_interval
        # Maximum number of events claimed per polling transaction.
        self._batch_size = batch_size
        self._task: asyncio.Task | None = None
        self._running = False

    async def start(self) -> None:
        """Start the background polling loop (idempotent)."""
        if self._task is not None:
            return
        self._running = True
        self._task = asyncio.create_task(self._poll_loop())

    async def stop(self) -> None:
        """Stop the background polling loop gracefully (idempotent)."""
        self._running = False
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None

    async def _poll_loop(self) -> None:
        """Poll until stopped; back off by ``poll_interval`` when idle."""
        while self._running:
            try:
                processed = await self._process_batch()
                if processed == 0:
                    # Outbox was empty — sleep so we don't hot-spin.
                    await asyncio.sleep(self._poll_interval)
            except asyncio.CancelledError:
                break
            except Exception:
                # Top-level boundary: record the full traceback via the
                # logging system (not bare stderr) and keep polling rather
                # than letting the background task die silently.
                logger.exception("event processor: batch processing failed")
                await asyncio.sleep(self._poll_interval)

    async def _process_batch(self) -> int:
        """Fetch and process one batch of pending events.

        Returns:
            Number of events processed (0 when nothing was pending).
        """
        processed = 0
        async with get_session() as session:
            # FOR UPDATE SKIP LOCKED: rows claimed here are skipped by
            # concurrent processors, so each event is handled by exactly
            # one processor at a time.
            stmt = (
                select(DomainEvent)
                .where(
                    DomainEvent.state == "pending",
                    DomainEvent.attempts < DomainEvent.max_attempts,
                )
                .order_by(DomainEvent.created_at)
                .limit(self._batch_size)
                .with_for_update(skip_locked=True)
            )
            result = await session.execute(stmt)
            events = result.scalars().all()

            for event in events:
                await self._process_one(session, event)
                processed += 1

            # Single commit covers every event's state transition.
            await session.commit()
        return processed

    async def _process_one(self, session: AsyncSession, event: DomainEvent) -> None:
        """Run all registered handlers for a single event.

        Handler failures are recorded on the event; the event stays
        ``pending`` for retry until ``max_attempts`` is exhausted, then is
        marked ``failed``.
        """
        handlers = get_handlers(event.event_type)

        event.state = "processing"
        event.attempts += 1
        await session.flush()

        if not handlers:
            # No handlers registered — mark completed (nothing to do)
            event.state = "completed"
            event.processed_at = datetime.now(timezone.utc)
            return

        try:
            for handler in handlers:
                await handler(event, session)
            event.state = "completed"
            # Stamp completion time after the handlers actually ran.
            event.processed_at = datetime.now(timezone.utc)
        except Exception as exc:
            # NOTE(review): partial writes a failed handler made to this
            # session are NOT rolled back here and will be committed with
            # the batch. If handlers must be atomic, wrap the handler loop
            # in a SAVEPOINT (session.begin_nested()) — confirm intended
            # semantics before changing.
            logger.exception(
                "event handler failed for event type %s", event.event_type
            )
            event.last_error = f"{exc.__class__.__name__}: {exc}"
            if event.attempts >= event.max_attempts:
                event.state = "failed"
                event.processed_at = datetime.now(timezone.utc)
            else:
                event.state = "pending"  # retry