Files
mono/shared/events/bus.py
giles 580f551700 Split databases and Redis — prepare infrastructure for per-domain isolation
Redis: per-app DB index (0-5) with shared auth DB 15 for SSO keys;
flushdb replaces flushall so deploys don't wipe cross-app auth state.

Postgres: drop 13 cross-domain FK constraints (migration v2t0p8q9r0),
remove dead ORM relationships, add explicit joins for 4 live ones.
Multi-engine sessions (account + federation) ready for per-domain DBs
via DATABASE_URL_ACCOUNT / DATABASE_URL_FEDERATION env vars.

All URLs initially point to the same appdb — zero behaviour change
until split-databases.sh is run to migrate data to per-domain DBs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 02:20:34 +00:00

148 lines
4.9 KiB
Python

"""
Unified activity bus.
emit_activity() writes an APActivity row with process_state='pending'. When all
apps share one database, the row is written within the caller's existing DB
transaction — atomic with the domain change.
When the federation database is separate (DATABASE_URL_FEDERATION differs from
DATABASE_URL), emit_activity() opens its own federation session and commits
independently. Atomicity is traded for domain isolation; handlers are
idempotent, so at-least-once delivery is safe.
register_activity_handler() registers async handler functions that the
EventProcessor dispatches when processing pending activities.
"""
from __future__ import annotations
import logging
import uuid
from collections import defaultdict
from typing import Awaitable, Callable, Dict, List, Tuple
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from shared.models.federation import APActivity
# Module-level logger; used below to record each handler registration.
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Activity-handler registry
# ---------------------------------------------------------------------------
# Handler signature: async def handler(activity: APActivity, session: AsyncSession) -> None
ActivityHandlerFn = Callable[[APActivity, AsyncSession], Awaitable[None]]
# Registry of handlers, keyed by (activity_type, object_type).
# An object_type of "*" matches any object type for that activity type, and
# the key ("*", "*") matches every activity (see get_activity_handlers()).
# A defaultdict lets registration append without initialising the list first;
# lookups go through .get() so they never create empty entries.
_activity_handlers: Dict[Tuple[str, str], List[ActivityHandlerFn]] = defaultdict(list)
def register_activity_handler(
    activity_type: str,
    fn: ActivityHandlerFn,
    *,
    object_type: str | None = None,
) -> None:
    """Add *fn* to the handler registry for an activity type.

    Args:
        activity_type: Activity type the handler applies to. Pass ``"*"``
            (with no ``object_type``) to register a global handler that
            fires for every activity, e.g. a federation delivery handler.
        fn: Async callable taking ``(activity, session)``.
        object_type: Optional object type to narrow the match. When omitted
            or empty it is stored as ``"*"``, meaning any object type.
    """
    registry_key = (activity_type, object_type if object_type else "*")
    bucket = _activity_handlers[registry_key]
    bucket.append(fn)
    log.info(
        "Registered activity handler %s.%s for key %s",
        fn.__module__,
        fn.__qualname__,
        registry_key,
    )
def get_activity_handlers(
    activity_type: str,
    object_type: str | None = None,
) -> List[ActivityHandlerFn]:
    """Collect every registered handler that applies to an activity.

    Registry keys are consulted in this order, and handlers are returned in
    that same order (registration order within each key):

    1. ``(activity_type, object_type)`` — exact match, skipped when no
       concrete object type is given
    2. ``(activity_type, "*")`` — any object type of this activity type
    3. ``("*", "*")`` — global handlers, skipped when ``activity_type`` is
       itself ``"*"`` because step 2 already covers that key

    Returns:
        A new list of handler callables; empty when nothing matches.
    """
    wildcard = "*"
    resolved_object_type = object_type or wildcard

    # Build the ordered list of registry keys to consult.
    lookup_keys = []
    if resolved_object_type != wildcard:
        lookup_keys.append((activity_type, resolved_object_type))
    lookup_keys.append((activity_type, wildcard))
    if activity_type != wildcard:
        lookup_keys.append((wildcard, wildcard))

    matched: List[ActivityHandlerFn] = []
    for key in lookup_keys:
        # .get() avoids inserting empty lists into the defaultdict registry.
        matched.extend(_activity_handlers.get(key, ()))
    return matched
def _needs_federation_session() -> bool:
    """Report whether the federation DB URL differs from the default DB URL.

    Returns:
        ``True`` when ``DATABASE_URL_FEDERATION`` is not equal to
        ``DATABASE_URL``, ``False`` when the two are the same.
    """
    # Resolved at call time rather than at module import
    # (presumably to avoid an import cycle with shared.db.session — confirm).
    from shared.db.session import DATABASE_URL as default_url
    from shared.db.session import DATABASE_URL_FEDERATION as federation_url

    uses_separate_db = federation_url != default_url
    return uses_separate_db
# ---------------------------------------------------------------------------
# emit_activity — the primary way to emit events
# ---------------------------------------------------------------------------
async def emit_activity(
    session: AsyncSession,
    *,
    activity_type: str,
    actor_uri: str,
    object_type: str,
    object_data: dict | None = None,
    source_type: str | None = None,
    source_id: int | None = None,
    visibility: str = "internal",
    actor_profile_id: int | None = None,
    origin_app: str | None = None,
) -> APActivity:
    """
    Record an AP-shaped activity in ap_activities with process_state='pending'.

    If the federation DB is the same as the default DB, the row is added and
    flushed on the caller's ``session``, so it commits or rolls back together
    with the caller's domain change. Otherwise a dedicated federation session
    is opened and the row is written in that session's own transaction,
    independently of the caller's.

    In both cases a ``NOTIFY ap_activity_pending`` is issued on the same
    session that wrote the row.

    Args:
        session: The caller's session; only used on the shared-database path.
        activity_type: Activity type stored on the row.
        actor_uri: URI of the acting party.
        object_type: Type of the activity's object.
        object_data: Payload for the object; ``None`` or empty becomes ``{}``.
        source_type: Optional type of the originating domain record.
        source_id: Optional id of the originating domain record.
        visibility: ``"internal"`` yields an ``internal:<uuid>`` activity id;
            any other value yields ``urn:uuid:<uuid>``.
        actor_profile_id: Optional profile id of the actor.
        origin_app: Name of the emitting app. When falsy, the current Quart
            app's name is used if it can be resolved.

    Returns:
        The newly created ``APActivity`` instance.
    """
    # Fall back to the running Quart app's name when no origin was supplied.
    if not origin_app:
        try:
            from quart import current_app

            origin_app = current_app.name
        except (ImportError, RuntimeError):
            # Best effort: Quart is unavailable or the lookup failed
            # (presumably no app context) — keep origin_app as passed.
            pass

    if visibility == "internal":
        activity_uri = f"internal:{uuid.uuid4()}"
    else:
        activity_uri = f"urn:uuid:{uuid.uuid4()}"

    payload = object_data or {}
    activity = APActivity(
        activity_id=activity_uri,
        activity_type=activity_type,
        actor_profile_id=actor_profile_id,
        actor_uri=actor_uri,
        object_type=object_type,
        object_data=payload,
        is_local=True,
        source_type=source_type,
        source_id=source_id,
        visibility=visibility,
        process_state="pending",
        origin_app=origin_app,
    )

    # Shared database: piggy-back on the caller's transaction.
    if not _needs_federation_session():
        session.add(activity)
        await session.flush()
        await session.execute(text("NOTIFY ap_activity_pending"))
        return activity

    # Separate federation database: write through a dedicated session whose
    # transaction is scoped to this block.
    from shared.db.session import get_federation_session

    async with get_federation_session() as fed_session, fed_session.begin():
        fed_session.add(activity)
        await fed_session.flush()
        await fed_session.execute(text("NOTIFY ap_activity_pending"))
    return activity