""" Unified activity bus. emit_activity() writes an APActivity row with process_state='pending' within the caller's existing DB transaction — atomic with the domain change. register_activity_handler() registers async handler functions that the EventProcessor dispatches when processing pending activities. """ from __future__ import annotations import logging import uuid from collections import defaultdict from typing import Awaitable, Callable, Dict, List, Tuple from sqlalchemy.ext.asyncio import AsyncSession from shared.models.federation import APActivity log = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Activity-handler registry # --------------------------------------------------------------------------- # Handler signature: async def handler(activity: APActivity, session: AsyncSession) -> None ActivityHandlerFn = Callable[[APActivity, AsyncSession], Awaitable[None]] # Keyed by (activity_type, object_type). object_type="*" is wildcard. _activity_handlers: Dict[Tuple[str, str], List[ActivityHandlerFn]] = defaultdict(list) def register_activity_handler( activity_type: str, fn: ActivityHandlerFn, *, object_type: str | None = None, ) -> None: """Register an async handler for an activity type + optional object type. Use ``activity_type="*"`` as a wildcard that fires for every activity (e.g. federation delivery handler). """ key = (activity_type, object_type or "*") _activity_handlers[key].append(fn) log.info("Registered activity handler %s.%s for key %s", fn.__module__, fn.__qualname__, key) def get_activity_handlers( activity_type: str, object_type: str | None = None, ) -> List[ActivityHandlerFn]: """Return all matching handlers for an activity. Matches in order: 1. Exact (activity_type, object_type) 2. (activity_type, "*") — type-level wildcard 3. ("*", "*") — global wildcard (e.g. delivery) """ handlers: List[ActivityHandlerFn] = [] ot = object_type or "*" # Exact match if ot != "*": handlers.extend(_activity_handlers.get((activity_type, ot), [])) # Type-level wildcard handlers.extend(_activity_handlers.get((activity_type, "*"), [])) # Global wildcard if activity_type != "*": handlers.extend(_activity_handlers.get(("*", "*"), [])) return handlers # --------------------------------------------------------------------------- # emit_activity — the primary way to emit events # --------------------------------------------------------------------------- async def emit_activity( session: AsyncSession, *, activity_type: str, actor_uri: str, object_type: str, object_data: dict | None = None, source_type: str | None = None, source_id: int | None = None, visibility: str = "internal", actor_profile_id: int | None = None, origin_app: str | None = None, ) -> APActivity: """ Write an AP-shaped activity to ap_activities with process_state='pending'. Called inside a service function using the same session that performs the domain change. The activity and the change commit together. """ if not origin_app: try: from quart import current_app origin_app = current_app.name except (ImportError, RuntimeError): pass activity_uri = f"internal:{uuid.uuid4()}" if visibility == "internal" else f"urn:uuid:{uuid.uuid4()}" activity = APActivity( activity_id=activity_uri, activity_type=activity_type, actor_profile_id=actor_profile_id, actor_uri=actor_uri, object_type=object_type, object_data=object_data or {}, is_local=True, source_type=source_type, source_id=source_id, visibility=visibility, process_state="pending", origin_app=origin_app, ) session.add(activity) await session.flush() return activity