emit_activity() now fires NOTIFY ap_activity_pending inside the caller's transaction (delivered on commit). EventProcessor maintains a dedicated asyncpg LISTEN connection and wakes the poll loop immediately, dropping latency from ~2 s to sub-100 ms. The fixed-interval poll remains as a safety-net fallback. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
127 lines
4.1 KiB
Python
127 lines
4.1 KiB
Python
"""
|
|
Unified activity bus.
|
|
|
|
emit_activity() writes an APActivity row with process_state='pending' within
|
|
the caller's existing DB transaction — atomic with the domain change.
|
|
|
|
register_activity_handler() registers async handler functions that the
|
|
EventProcessor dispatches when processing pending activities.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import uuid
|
|
from collections import defaultdict
|
|
from typing import Awaitable, Callable, Dict, List, Tuple
|
|
|
|
from sqlalchemy import text
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from shared.models.federation import APActivity
|
|
|
|
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Activity-handler registry
|
|
# ---------------------------------------------------------------------------
|
|
# Handler signature: async def handler(activity: APActivity, session: AsyncSession) -> None
|
|
# Type alias for handler callables: called with (APActivity, AsyncSession) and
# returning an awaitable that resolves to None.
ActivityHandlerFn = Callable[[APActivity, AsyncSession], Awaitable[None]]
|
|
|
|
# Keyed by (activity_type, object_type). object_type="*" is wildcard.
|
|
# Process-wide registry. defaultdict(list) creates the handler list the first
# time a key is registered; handlers are kept in registration order.
_activity_handlers: Dict[Tuple[str, str], List[ActivityHandlerFn]] = defaultdict(list)
|
|
|
|
|
|
def register_activity_handler(
    activity_type: str,
    fn: ActivityHandlerFn,
    *,
    object_type: str | None = None,
) -> None:
    """Register an async handler for an activity type + optional object type.

    The handler is appended to the module-level registry under the key
    ``(activity_type, object_type or "*")``. Registering the same callable
    twice stores it twice.

    Args:
        activity_type: Activity type to match. Use ``"*"`` together with the
            default ``object_type`` to register a global wildcard that
            ``get_activity_handlers`` returns for every activity (e.g. a
            federation delivery handler).
        fn: Async callable invoked as ``fn(activity, session)``.
        object_type: Optional object type that narrows the match. ``None`` or
            an empty string is stored as the ``"*"`` wildcard.

    NOTE(review): ``get_activity_handlers`` only consults ``("*", "*")`` for
    the global wildcard, so ``activity_type="*"`` combined with a specific
    ``object_type`` is only returned when the lookup itself passes
    ``activity_type="*"`` — confirm that is intended.
    """
    key = (activity_type, object_type or "*")
    _activity_handlers[key].append(fn)

    # Not every callable carries function metadata: functools.partial objects
    # and callable instances have no __qualname__. Fall back to repr() rather
    # than raising AttributeError after the handler is already registered.
    handler_name = getattr(fn, "__qualname__", None)
    if handler_name is None:
        handler_name = repr(fn)
    log.info(
        "Registered activity handler %s.%s for key %s",
        getattr(fn, "__module__", None),
        handler_name,
        key,
    )
|
|
|
|
|
|
def get_activity_handlers(
    activity_type: str,
    object_type: str | None = None,
) -> List[ActivityHandlerFn]:
    """Collect every registered handler that applies to an activity.

    Registry keys are consulted in this order, and handlers are returned in
    that order (registration order within each key):

    1. ``(activity_type, object_type)`` — exact match
    2. ``(activity_type, "*")`` — type-level wildcard
    3. ``("*", "*")`` — global wildcard (e.g. delivery)

    A missing or empty ``object_type`` is treated as ``"*"``. When a wildcard
    argument makes two of the keys above identical, that key is consulted
    only once.
    """
    wanted_object = object_type if object_type else "*"

    # dict.fromkeys() drops repeated keys while keeping first-seen order, so a
    # wildcard lookup never reads the same registry bucket twice.
    lookup_order = dict.fromkeys(
        (
            (activity_type, wanted_object),
            (activity_type, "*"),
            ("*", "*"),
        )
    )

    return [
        handler
        for registry_key in lookup_order
        for handler in _activity_handlers.get(registry_key, [])
    ]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# emit_activity — the primary way to emit events
|
|
# ---------------------------------------------------------------------------
|
|
async def emit_activity(
    session: AsyncSession,
    *,
    activity_type: str,
    actor_uri: str,
    object_type: str,
    object_data: dict | None = None,
    source_type: str | None = None,
    source_id: int | None = None,
    visibility: str = "internal",
    actor_profile_id: int | None = None,
    origin_app: str | None = None,
) -> APActivity:
    """
    Record an AP-shaped activity in ap_activities with process_state='pending'.

    Intended to be called from a service function with the same session that
    performs the domain change, so the activity row and the change commit
    together.

    The row is added and flushed on ``session``, then a
    ``NOTIFY ap_activity_pending`` statement is executed on the same session.
    When ``origin_app`` is not supplied, the current Quart app name is used if
    it can be resolved; otherwise the value is left as passed.

    Returns the flushed ``APActivity`` instance.
    """
    if not origin_app:
        # Best effort only: quart may not be installed (ImportError) or there
        # may be no active app context (RuntimeError).
        try:
            from quart import current_app

            origin_app = current_app.name
        except (ImportError, RuntimeError):
            pass

    # Internal activities get an "internal:" id; everything else a URN.
    uri_prefix = "internal:" if visibility == "internal" else "urn:uuid:"
    activity_uri = f"{uri_prefix}{uuid.uuid4()}"

    fields = {
        "activity_id": activity_uri,
        "activity_type": activity_type,
        "actor_profile_id": actor_profile_id,
        "actor_uri": actor_uri,
        "object_type": object_type,
        "object_data": object_data or {},
        "is_local": True,
        "source_type": source_type,
        "source_id": source_id,
        "visibility": visibility,
        "process_state": "pending",
        "origin_app": origin_app,
    }
    activity = APActivity(**fields)

    session.add(activity)
    await session.flush()

    # NOTIFY is transactional: listeners are only signalled once this
    # transaction commits, which wakes any listening EventProcessor.
    wake_listeners = text("NOTIFY ap_activity_pending")
    await session.execute(wake_listeners)

    return activity
|