All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 3m20s
Redis: per-app DB index (0-5) with shared auth DB 15 for SSO keys; flushdb replaces flushall so deploys don't wipe cross-app auth state. Postgres: drop 13 cross-domain FK constraints (migration v2t0p8q9r0), remove dead ORM relationships, add explicit joins for 4 live ones. Multi-engine sessions (account + federation) ready for per-domain DBs via DATABASE_URL_ACCOUNT / DATABASE_URL_FEDERATION env vars. All URLs initially point to the same appdb — zero behaviour change until split-databases.sh is run to migrate data to per-domain DBs. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
148 lines
4.9 KiB
Python
148 lines
4.9 KiB
Python
"""
|
|
Unified activity bus.

When the federation tables live in the app's default database,
emit_activity() writes an APActivity row with process_state='pending' within
the caller's existing DB transaction — atomic with the domain change.

When the federation database is separate (DATABASE_URL_FEDERATION differs from
DATABASE_URL), emit_activity() opens its own federation session and commits
independently. Atomicity is traded for domain isolation; handlers are
idempotent, so at-least-once delivery is safe.

register_activity_handler() registers async handler functions that the
EventProcessor dispatches when processing pending activities.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import uuid
|
|
from collections import defaultdict
|
|
from typing import Awaitable, Callable, Dict, List, Tuple
|
|
|
|
from sqlalchemy import text
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from shared.models.federation import APActivity
|
|
|
|
log = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Activity-handler registry
# ---------------------------------------------------------------------------
# A handler is an async callable invoked as ``handler(activity, session)``
# with an APActivity and an AsyncSession; its awaited result is None.
ActivityHandlerFn = Callable[[APActivity, AsyncSession], Awaitable[None]]

# Registry mapping (activity_type, object_type) -> handlers, kept in
# registration order. An object_type of "*" is the wildcard slot.
_activity_handlers: Dict[Tuple[str, str], List[ActivityHandlerFn]] = defaultdict(list)
|
|
|
|
|
|
def register_activity_handler(
    activity_type: str,
    fn: ActivityHandlerFn,
    *,
    object_type: str | None = None,
) -> None:
    """Add *fn* to the handler registry for an activity type.

    The handler is stored under the key ``(activity_type, object_type)``.
    When ``object_type`` is omitted (or empty) the key uses ``"*"`` in its
    place, making the handler apply to every object type of that activity.

    Pass ``activity_type="*"`` (without an ``object_type``) to register a
    global wildcard that fires for every activity, e.g. the federation
    delivery handler. Note that get_activity_handlers() only consults the
    ``("*", "*")`` key as a global wildcard, so a ``"*"`` activity type
    combined with a specific object type is matched only by a lookup whose
    activity type is itself ``"*"``.

    Args:
        activity_type: Activity type to listen for, or ``"*"``.
        fn: Async handler called with the activity and a session.
        object_type: Optional object type to narrow the registration.
    """
    registry_key = (activity_type, object_type if object_type else "*")
    _activity_handlers[registry_key].append(fn)
    log.info(
        "Registered activity handler %s.%s for key %s",
        fn.__module__,
        fn.__qualname__,
        registry_key,
    )
|
|
|
|
|
|
def get_activity_handlers(
    activity_type: str,
    object_type: str | None = None,
) -> List[ActivityHandlerFn]:
    """Collect every registered handler that applies to an activity.

    Registry keys are consulted in this order, and handlers are returned in
    that same order (registration order within each key):

    1. ``(activity_type, object_type)`` — exact match, skipped when no
       object type is given (or it is ``"*"``)
    2. ``(activity_type, "*")`` — type-level wildcard
    3. ``("*", "*")`` — global wildcard (e.g. delivery), skipped when
       ``activity_type`` is itself ``"*"`` because step 2 already covered it

    A handler registered under more than one matching key appears once per
    key. The returned list is a fresh list; the registry is not modified.
    """
    wanted_object = object_type if object_type else "*"

    lookup_keys: List[Tuple[str, str]] = []
    if wanted_object != "*":
        lookup_keys.append((activity_type, wanted_object))
    lookup_keys.append((activity_type, "*"))
    if activity_type != "*":
        lookup_keys.append(("*", "*"))

    # .get() rather than indexing: the registry is a defaultdict and a plain
    # lookup would create empty entries for keys nobody registered.
    return [
        handler
        for key in lookup_keys
        for handler in _activity_handlers.get(key, [])
    ]
|
|
|
|
|
|
def _needs_federation_session() -> bool:
    """Report whether federation data lives in a different database.

    Returns True when ``DATABASE_URL_FEDERATION`` is not equal to the app's
    default ``DATABASE_URL``, False when both settings are equal.
    """
    # Imported at call time rather than at module import.
    from shared.db.session import DATABASE_URL as default_url
    from shared.db.session import DATABASE_URL_FEDERATION as federation_url

    return federation_url != default_url
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# emit_activity — the primary way to emit events
|
|
# ---------------------------------------------------------------------------
|
|
async def emit_activity(
    session: AsyncSession,
    *,
    activity_type: str,
    actor_uri: str,
    object_type: str,
    object_data: dict | None = None,
    source_type: str | None = None,
    source_id: int | None = None,
    visibility: str = "internal",
    actor_profile_id: int | None = None,
    origin_app: str | None = None,
) -> APActivity:
    """
    Record an AP-shaped activity in ap_activities with process_state='pending'.

    If the federation data shares the app's default database, the row is
    added and flushed on the caller's ``session``, so it becomes part of the
    caller's transaction (atomic with the domain change). If the federation
    DB is separate, ``session`` is not used: a dedicated federation session
    is opened, and the row is written and committed there independently.

    In both cases a ``NOTIFY ap_activity_pending`` is issued on the same
    session after the flush.

    Args:
        session: The caller's session; used only in the shared-database case.
        activity_type: Stored as the activity's type.
        actor_uri: Stored as the acting party's URI.
        object_type: Stored as the type of the activity's object.
        object_data: Payload for the object; ``None`` or empty is stored
            as ``{}``.
        source_type: Optional source kind stored on the row.
        source_id: Optional source identifier stored on the row.
        visibility: ``"internal"`` yields an ``internal:<uuid>`` activity id;
            any other value yields ``urn:uuid:<uuid>``.
        actor_profile_id: Optional actor profile id stored on the row.
        origin_app: Name of the emitting app. When falsy, the current Quart
            app's name is used if one can be resolved; otherwise the value
            is stored as given.

    Returns:
        The APActivity instance that was added and flushed.
    """
    if not origin_app:
        # Best effort only: Quart may be missing or there may be no active
        # app context, in which case origin_app is left as passed.
        try:
            from quart import current_app

            origin_app = current_app.name
        except (ImportError, RuntimeError):
            pass

    uri_prefix = "internal:" if visibility == "internal" else "urn:uuid:"
    activity_uri = f"{uri_prefix}{uuid.uuid4()}"

    activity = APActivity(
        activity_id=activity_uri,
        activity_type=activity_type,
        actor_profile_id=actor_profile_id,
        actor_uri=actor_uri,
        object_type=object_type,
        object_data=object_data or {},
        is_local=True,
        source_type=source_type,
        source_id=source_id,
        visibility=visibility,
        process_state="pending",
        origin_app=origin_app,
    )

    if not _needs_federation_session():
        # Shared database: piggy-back on the caller's transaction. Nothing is
        # committed here; that remains the caller's responsibility.
        session.add(activity)
        await session.flush()
        await session.execute(text("NOTIFY ap_activity_pending"))
        return activity

    from shared.db.session import get_federation_session

    # Separate federation database: leaving the begin() block commits the
    # insert and the NOTIFY together, independently of the caller's work.
    # NOTE(review): the returned instance is tied to this short-lived
    # session — confirm the session factory's expire_on_commit setting if
    # callers read attributes from the result afterwards.
    async with get_federation_session() as federation_session, federation_session.begin():
        federation_session.add(activity)
        await federation_session.flush()
        await federation_session.execute(text("NOTIFY ap_activity_pending"))

    return activity
|