"""
Unified activity bus.

emit_activity() writes an APActivity row with process_state='pending' within
the caller's existing DB transaction — atomic with the domain change.

When the federation database is separate (DATABASE_URL_FEDERATION differs from
DATABASE_URL), emit_activity() opens its own federation session and commits
independently.  Atomicity is traded for domain isolation; handlers are
idempotent, so at-least-once delivery is safe.

register_activity_handler() registers async handler functions that the
EventProcessor dispatches when processing pending activities.
"""
from __future__ import annotations

import logging
import uuid
from collections import defaultdict
from typing import Awaitable, Callable, Dict, List, Tuple

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from shared.models.federation import APActivity

log = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Activity-handler registry
# ---------------------------------------------------------------------------

# Handler signature: async def handler(activity: APActivity, session: AsyncSession) -> None
ActivityHandlerFn = Callable[[APActivity, AsyncSession], Awaitable[None]]

# Keyed by (activity_type, object_type). object_type="*" is wildcard.
_activity_handlers: Dict[Tuple[str, str], List[ActivityHandlerFn]] = defaultdict(list)


def register_activity_handler(
    activity_type: str,
    fn: ActivityHandlerFn,
    *,
    object_type: str | None = None,
) -> None:
    """Register an async handler for an activity type + optional object type.

    Use ``activity_type="*"`` as a wildcard that fires for every activity
    (e.g. federation delivery handler).

    Args:
        activity_type: Activity type to handle, or ``"*"`` for all types.
        fn: Async callable invoked as ``fn(activity, session)``.
        object_type: Restrict the handler to one object type.  ``None`` (or
            any falsy value) registers under the ``"*"`` object wildcard.

    Note:
        Registering the same callable twice under the same key appends it
        twice; no de-duplication is performed here.
    """
    key = (activity_type, object_type or "*")
    _activity_handlers[key].append(fn)
    # Not every callable has __qualname__ (functools.partial objects and
    # instances with __call__ do not).  Fall back instead of letting a log
    # line raise AttributeError after the handler was already appended.
    log.info(
        "Registered activity handler %s.%s for key %s",
        getattr(fn, "__module__", "<unknown>"),
        getattr(fn, "__qualname__", None) or repr(fn),
        key,
    )


def get_activity_handlers(
    activity_type: str,
    object_type: str | None = None,
) -> List[ActivityHandlerFn]:
    """Return all matching handlers for an activity.

    Matches in order:
    1. Exact (activity_type, object_type)
    2. (activity_type, "*") — type-level wildcard
    3. ("*", "*") — global wildcard (e.g. delivery)

    A handler stored under ``("*", <specific object_type>)`` is only returned
    when this function is itself called with ``activity_type="*"``; it is not
    consulted for concrete activity types.

    Args:
        activity_type: Type of the activity being dispatched.
        object_type: Object type of the activity; ``None`` is treated as ``"*"``.

    Returns:
        A new list of handlers in the match order above.  The registry's own
        lists are never returned, so callers may mutate the result freely.
    """
    handlers: List[ActivityHandlerFn] = []
    ot = object_type or "*"

    # Exact match (skipped for the wildcard, which the next lookup covers).
    if ot != "*":
        handlers.extend(_activity_handlers.get((activity_type, ot), []))

    # Type-level wildcard.  .get() is used so the defaultdict does not grow
    # an empty entry for every key that is merely looked up.
    handlers.extend(_activity_handlers.get((activity_type, "*"), []))

    # Global wildcard — guarded so ("*", "*") is not added twice when the
    # caller passed activity_type="*".
    if activity_type != "*":
        handlers.extend(_activity_handlers.get(("*", "*"), []))

    return handlers


def _needs_federation_session() -> bool:
    """True when the federation DB differs from the app's default DB."""
    # Imported at call time rather than module import time.
    # NOTE(review): presumably to avoid an import cycle or to read the URLs
    # after configuration has loaded — confirm against shared.db.session.
    from shared.db.session import DATABASE_URL, DATABASE_URL_FEDERATION

    return DATABASE_URL_FEDERATION != DATABASE_URL


# ---------------------------------------------------------------------------
# emit_activity — the primary way to emit events
# ---------------------------------------------------------------------------

async def emit_activity(
    session: AsyncSession,
    *,
    activity_type: str,
    actor_uri: str,
    object_type: str,
    object_data: dict | None = None,
    source_type: str | None = None,
    source_id: int | None = None,
    visibility: str = "internal",
    actor_profile_id: int | None = None,
    origin_app: str | None = None,
) -> APActivity:
    """
    Write an AP-shaped activity to ap_activities with process_state='pending'.

    When all apps share one database the activity is written in the caller's
    transaction (atomic with the domain change).  When the federation DB is
    separate, a dedicated federation session is used and committed
    independently.

    Args:
        session: The caller's session.  Used only when the federation DB is
            the same as the default DB; this function adds and flushes on it
            but never commits it.
        activity_type: Stored as ``APActivity.activity_type``.
        actor_uri: Stored as ``APActivity.actor_uri``.
        object_type: Stored as ``APActivity.object_type``.
        object_data: Stored as ``APActivity.object_data``; ``None`` (or any
            falsy value) is stored as ``{}``.
        source_type: Stored as ``APActivity.source_type``.
        source_id: Stored as ``APActivity.source_id``.
        visibility: ``"internal"`` yields an ``internal:<uuid4>`` activity id;
            any other value yields ``urn:uuid:<uuid4>``.
        actor_profile_id: Stored as ``APActivity.actor_profile_id``.
        origin_app: Name of the emitting app.  When falsy, the current Quart
            app's name is used if one is available; otherwise it stays as
            passed.

    Returns:
        The newly created ``APActivity`` instance (already flushed).
    """
    if not origin_app:
        # Best effort only: Quart may not be installed (ImportError) or there
        # may be no active app context (RuntimeError).  Either way the
        # activity is still emitted, just without an origin app.
        try:
            from quart import current_app

            origin_app = current_app.name
        except (ImportError, RuntimeError):
            pass

    activity_uri = (
        f"internal:{uuid.uuid4()}"
        if visibility == "internal"
        else f"urn:uuid:{uuid.uuid4()}"
    )

    activity = APActivity(
        activity_id=activity_uri,
        activity_type=activity_type,
        actor_profile_id=actor_profile_id,
        actor_uri=actor_uri,
        object_type=object_type,
        object_data=object_data or {},
        is_local=True,
        source_type=source_type,
        source_id=source_id,
        visibility=visibility,
        process_state="pending",
        origin_app=origin_app,
    )

    # NOTE(review): on PostgreSQL a NOTIFY issued inside a transaction is
    # delivered only when that transaction commits, so in both branches a
    # listener should not see the signal before the row is visible — confirm
    # the backend is PostgreSQL.
    if _needs_federation_session():
        from shared.db.session import get_federation_session

        # Separate federation DB: own session, own transaction.  Leaving the
        # begin() block commits independently of the caller's transaction.
        # NOTE(review): the returned instance outlives fed_s.  If that session
        # expires attributes on commit, reading them afterwards may fail —
        # verify how get_federation_session() configures its sessions.
        async with get_federation_session() as fed_s:
            async with fed_s.begin():
                fed_s.add(activity)
                await fed_s.flush()
                await fed_s.execute(text("NOTIFY ap_activity_pending"))
    else:
        # Shared DB: ride along in the caller's transaction.  The caller is
        # responsible for committing (or rolling back) both the domain change
        # and this activity together.
        session.add(activity)
        await session.flush()
        await session.execute(text("NOTIFY ap_activity_pending"))

    return activity