feat: extract shared infrastructure from shared_lib
Phase 1-3 of decoupling plan: - Shared DB, models, infrastructure, browser, config, utils - Event infrastructure (domain_events outbox, bus, processor) - Structured logging - Generic container concept (container_type/container_id) - Alembic migrations for all schema changes Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
117
events/processor.py
Normal file
117
events/processor.py
Normal file
@@ -0,0 +1,117 @@
|
||||
"""
|
||||
Event processor — polls the domain_events outbox table and dispatches
|
||||
to registered handlers.
|
||||
|
||||
Runs as an asyncio background task within each app process.
|
||||
Uses SELECT ... FOR UPDATE SKIP LOCKED for safe concurrent processing.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import traceback
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from sqlalchemy import select, update
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from shared.db.session import get_session
|
||||
from shared.models.domain_event import DomainEvent
|
||||
from .bus import get_handlers
|
||||
|
||||
|
||||
class EventProcessor:
|
||||
"""Background event processor that polls the outbox table."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
poll_interval: float = 2.0,
|
||||
batch_size: int = 10,
|
||||
):
|
||||
self._poll_interval = poll_interval
|
||||
self._batch_size = batch_size
|
||||
self._task: asyncio.Task | None = None
|
||||
self._running = False
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Start the background polling loop."""
|
||||
if self._task is not None:
|
||||
return
|
||||
self._running = True
|
||||
self._task = asyncio.create_task(self._poll_loop())
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""Stop the background polling loop gracefully."""
|
||||
self._running = False
|
||||
if self._task is not None:
|
||||
self._task.cancel()
|
||||
try:
|
||||
await self._task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
self._task = None
|
||||
|
||||
async def _poll_loop(self) -> None:
|
||||
while self._running:
|
||||
try:
|
||||
processed = await self._process_batch()
|
||||
if processed == 0:
|
||||
await asyncio.sleep(self._poll_interval)
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
await asyncio.sleep(self._poll_interval)
|
||||
|
||||
async def _process_batch(self) -> int:
|
||||
"""Fetch and process a batch of pending events. Returns count processed."""
|
||||
processed = 0
|
||||
async with get_session() as session:
|
||||
# FOR UPDATE SKIP LOCKED: safe for concurrent processors
|
||||
stmt = (
|
||||
select(DomainEvent)
|
||||
.where(
|
||||
DomainEvent.state == "pending",
|
||||
DomainEvent.attempts < DomainEvent.max_attempts,
|
||||
)
|
||||
.order_by(DomainEvent.created_at)
|
||||
.limit(self._batch_size)
|
||||
.with_for_update(skip_locked=True)
|
||||
)
|
||||
result = await session.execute(stmt)
|
||||
events = result.scalars().all()
|
||||
|
||||
for event in events:
|
||||
await self._process_one(session, event)
|
||||
processed += 1
|
||||
|
||||
await session.commit()
|
||||
return processed
|
||||
|
||||
async def _process_one(self, session: AsyncSession, event: DomainEvent) -> None:
|
||||
"""Run all handlers for a single event."""
|
||||
handlers = get_handlers(event.event_type)
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
event.state = "processing"
|
||||
event.attempts += 1
|
||||
await session.flush()
|
||||
|
||||
if not handlers:
|
||||
# No handlers registered — mark completed (nothing to do)
|
||||
event.state = "completed"
|
||||
event.processed_at = now
|
||||
return
|
||||
|
||||
try:
|
||||
for handler in handlers:
|
||||
await handler(event, session)
|
||||
event.state = "completed"
|
||||
event.processed_at = now
|
||||
except Exception as exc:
|
||||
event.last_error = f"{exc.__class__.__name__}: {exc}"
|
||||
if event.attempts >= event.max_attempts:
|
||||
event.state = "failed"
|
||||
event.processed_at = now
|
||||
else:
|
||||
event.state = "pending" # retry
|
||||
Reference in New Issue
Block a user