""" Transactional outbox event bus. emit_event() writes to the domain_events table within the caller's existing DB transaction — atomic with whatever domain change triggered the event. register_handler() registers async handler functions that the EventProcessor will call when processing events of a given type. """ from __future__ import annotations from collections import defaultdict from typing import Any, Awaitable, Callable, Dict, List from sqlalchemy.ext.asyncio import AsyncSession from shared.models.domain_event import DomainEvent # handler signature: async def handler(event: DomainEvent, session: AsyncSession) -> None HandlerFn = Callable[[DomainEvent, AsyncSession], Awaitable[None]] _handlers: Dict[str, List[HandlerFn]] = defaultdict(list) async def emit_event( session: AsyncSession, event_type: str, aggregate_type: str, aggregate_id: int, payload: Dict[str, Any] | None = None, ) -> DomainEvent: """ Write a domain event to the outbox table in the current transaction. Call this inside your service function, using the same session that performs the domain change. The event and the change commit together. """ event = DomainEvent( event_type=event_type, aggregate_type=aggregate_type, aggregate_id=aggregate_id, payload=payload or {}, ) session.add(event) await session.flush() # assign event.id return event def register_handler(event_type: str, fn: HandlerFn) -> None: """Register an async handler for a given event type.""" _handlers[event_type].append(fn) def get_handlers(event_type: str) -> List[HandlerFn]: """Return all registered handlers for an event type.""" return _handlers.get(event_type, [])