This repository has been archived on 2026-02-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
shared/events/bus.py
giles ef806f8fbb feat: extract shared infrastructure from shared_lib
Phase 1-3 of decoupling plan:
- Shared DB, models, infrastructure, browser, config, utils
- Event infrastructure (domain_events outbox, bus, processor)
- Structured logging
- Generic container concept (container_type/container_id)
- Alembic migrations for all schema changes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 12:45:56 +00:00

57 lines
1.7 KiB
Python

"""
Transactional outbox event bus.
emit_event() writes to the domain_events table within the caller's existing
DB transaction — atomic with whatever domain change triggered the event.
register_handler() registers async handler functions that the EventProcessor
will call when processing events of a given type.
"""
from __future__ import annotations
from collections import defaultdict
from typing import Any, Awaitable, Callable, Dict, List
from sqlalchemy.ext.asyncio import AsyncSession
from shared.models.domain_event import DomainEvent
# Signature every event handler must satisfy:
#     async def handler(event: DomainEvent, session: AsyncSession) -> None
HandlerFn = Callable[[DomainEvent, AsyncSession], Awaitable[None]]
# Module-level handler registry keyed by event type; each value holds that
# type's handlers in the order they were registered. It is a defaultdict so
# that indexing an unseen event type yields a new empty list to append to.
_handlers: Dict[str, List[HandlerFn]] = defaultdict(list)
async def emit_event(
    session: AsyncSession,
    event_type: str,
    aggregate_type: str,
    aggregate_id: int,
    payload: Dict[str, Any] | None = None,
) -> DomainEvent:
    """
    Record a domain event in the outbox using the caller's session.

    The event is added to ``session`` and flushed, but never committed
    here. Pass the same session that carries out the domain change so that
    the event and the change are committed (or rolled back) together.

    Args:
        session: Session the event row is added to and flushed through.
        event_type: Stored on the event as ``event_type``.
        aggregate_type: Stored on the event as ``aggregate_type``.
        aggregate_id: Stored on the event as ``aggregate_id``.
        payload: Optional event data; ``None`` or an empty dict is stored
            as a new empty dict.

    Returns:
        The ``DomainEvent`` instance that was added, after the flush.
    """
    # Any falsy payload (None or an empty dict) is replaced by a fresh dict.
    body = payload if payload else {}
    record = DomainEvent(
        event_type=event_type,
        aggregate_type=aggregate_type,
        aggregate_id=aggregate_id,
        payload=body,
    )
    session.add(record)
    # Flush rather than commit: this assigns the event's id while leaving
    # the decision to commit with the caller.
    await session.flush()
    return record
def register_handler(event_type: str, fn: HandlerFn) -> None:
    """Add ``fn`` to the handlers registered under ``event_type``."""
    # Indexing the registry creates the list on first use for an event type.
    # No de-duplication is done: registering the same function twice stores
    # it twice.
    registered = _handlers[event_type]
    registered.append(fn)
def get_handlers(event_type: str) -> List[HandlerFn]:
    """
    Return the handlers registered for an event type, in registration order.

    The result is always a new list, independent of the registry: callers
    may mutate it without affecting registered handlers, and a handler
    registered while the result is being iterated does not appear in it.
    An event type with no registrations yields an empty list.
    """
    # Use .get() rather than indexing: indexing the defaultdict registry
    # would insert an empty list for every event type that is only queried.
    # list(...) copies, so the internal list is never handed out directly.
    return list(_handlers.get(event_type, ()))