Phase 1-3 of decoupling plan: - Shared DB, models, infrastructure, browser, config, utils - Event infrastructure (domain_events outbox, bus, processor) - Structured logging - Generic container concept (container_type/container_id) - Alembic migrations for all schema changes Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
57 lines
1.7 KiB
Python
57 lines
1.7 KiB
Python
"""
|
|
Transactional outbox event bus.
|
|
|
|
emit_event() writes to the domain_events table within the caller's existing
|
|
DB transaction — atomic with whatever domain change triggered the event.
|
|
|
|
register_handler() registers async handler functions that the EventProcessor
|
|
will call when processing events of a given type.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from collections import defaultdict
|
|
from typing import Any, Awaitable, Callable, Dict, List
|
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from shared.models.domain_event import DomainEvent
|
|
|
|
# Every handler is a coroutine function with the shape
#     async def handler(event: DomainEvent, session: AsyncSession) -> None
HandlerFn = Callable[
    [DomainEvent, AsyncSession],
    Awaitable[None],
]

# Module-level registry: event type name -> handlers in registration order.
# A defaultdict so that registering the first handler for a type needs no
# explicit initialisation of its list.
_handlers: Dict[str, List[HandlerFn]] = defaultdict(list)
|
|
|
|
|
|
async def emit_event(
    session: AsyncSession,
    event_type: str,
    aggregate_type: str,
    aggregate_id: int,
    payload: Dict[str, Any] | None = None,
) -> DomainEvent:
    """Record a domain event in the outbox using the caller's session.

    The event row is added to ``session`` and flushed, but this function
    never commits: the caller's transaction decides whether the event and
    the domain change that triggered it are persisted together.

    Args:
        session: The session already being used for the domain change.
        event_type: Name of the event; handlers are looked up by this value.
        aggregate_type: Kind of aggregate the event refers to.
        aggregate_id: Identifier of that aggregate.
        payload: Optional event data. ``None`` (or any falsy value) is
            stored as a new empty dict.

    Returns:
        The ``DomainEvent`` instance that was added to the session.
    """
    # Same truthiness rule as ``payload or {}``: falsy payloads become {}.
    if payload:
        event_payload = payload
    else:
        event_payload = {}

    outbox_row = DomainEvent(
        event_type=event_type,
        aggregate_type=aggregate_type,
        aggregate_id=aggregate_id,
        payload=event_payload,
    )
    session.add(outbox_row)
    # Flush (not commit) so the row's id is assigned before returning.
    await session.flush()
    return outbox_row
|
|
|
|
|
|
def register_handler(event_type: str, fn: HandlerFn) -> None:
    """Add ``fn`` to the handlers registered for ``event_type``.

    Handlers are kept in registration order. No de-duplication is done:
    registering the same function twice stores it twice.
    """
    # Indexing the defaultdict registry creates the list on first use.
    registered = _handlers[event_type]
    registered.append(fn)
|
|
|
|
|
|
def get_handlers(event_type: str) -> List[HandlerFn]:
    """Return the handlers registered for ``event_type``.

    The result is a new list holding the handlers in registration order,
    or an empty list when none are registered. Because it is a copy,
    callers may mutate it freely without touching the registry, and
    handlers registered while a caller is iterating over the result do
    not change that iteration.
    """
    # Use .get() rather than indexing: indexing the defaultdict registry
    # would insert an empty entry for every unknown event type queried.
    return list(_handlers.get(event_type, ()))
|