This repository has been archived on 2026-02-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
shared/events/bus.py
giles 8951a62b90 Add NOTIFY/LISTEN wake-up to event processor
emit_activity() now fires NOTIFY ap_activity_pending inside the
caller's transaction (delivered on commit).  EventProcessor maintains
a dedicated asyncpg LISTEN connection and wakes the poll loop
immediately, dropping latency from ~2 s to sub-100 ms.  The fixed-
interval poll remains as a safety-net fallback.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 15:35:27 +00:00

127 lines
4.1 KiB
Python

"""
Unified activity bus.
emit_activity() writes an APActivity row with process_state='pending' within
the caller's existing DB transaction — atomic with the domain change.
register_activity_handler() registers async handler functions that the
EventProcessor dispatches when processing pending activities.
"""
from __future__ import annotations
import logging
import uuid
from collections import defaultdict
from typing import Awaitable, Callable, Dict, List, Tuple
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from shared.models.federation import APActivity
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Activity-handler registry
# ---------------------------------------------------------------------------
# Handler signature: async def handler(activity: APActivity, session: AsyncSession) -> None
ActivityHandlerFn = Callable[[APActivity, AsyncSession], Awaitable[None]]
# Keyed by (activity_type, object_type). object_type="*" is wildcard.
_activity_handlers: Dict[Tuple[str, str], List[ActivityHandlerFn]] = defaultdict(list)
def register_activity_handler(
activity_type: str,
fn: ActivityHandlerFn,
*,
object_type: str | None = None,
) -> None:
"""Register an async handler for an activity type + optional object type.
Use ``activity_type="*"`` as a wildcard that fires for every activity
(e.g. federation delivery handler).
"""
key = (activity_type, object_type or "*")
_activity_handlers[key].append(fn)
log.info("Registered activity handler %s.%s for key %s", fn.__module__, fn.__qualname__, key)
def get_activity_handlers(
activity_type: str,
object_type: str | None = None,
) -> List[ActivityHandlerFn]:
"""Return all matching handlers for an activity.
Matches in order:
1. Exact (activity_type, object_type)
2. (activity_type, "*") — type-level wildcard
3. ("*", "*") — global wildcard (e.g. delivery)
"""
handlers: List[ActivityHandlerFn] = []
ot = object_type or "*"
# Exact match
if ot != "*":
handlers.extend(_activity_handlers.get((activity_type, ot), []))
# Type-level wildcard
handlers.extend(_activity_handlers.get((activity_type, "*"), []))
# Global wildcard
if activity_type != "*":
handlers.extend(_activity_handlers.get(("*", "*"), []))
return handlers
# ---------------------------------------------------------------------------
# emit_activity — the primary way to emit events
# ---------------------------------------------------------------------------
async def emit_activity(
session: AsyncSession,
*,
activity_type: str,
actor_uri: str,
object_type: str,
object_data: dict | None = None,
source_type: str | None = None,
source_id: int | None = None,
visibility: str = "internal",
actor_profile_id: int | None = None,
origin_app: str | None = None,
) -> APActivity:
"""
Write an AP-shaped activity to ap_activities with process_state='pending'.
Called inside a service function using the same session that performs the
domain change. The activity and the change commit together.
"""
if not origin_app:
try:
from quart import current_app
origin_app = current_app.name
except (ImportError, RuntimeError):
pass
activity_uri = f"internal:{uuid.uuid4()}" if visibility == "internal" else f"urn:uuid:{uuid.uuid4()}"
activity = APActivity(
activity_id=activity_uri,
activity_type=activity_type,
actor_profile_id=actor_profile_id,
actor_uri=actor_uri,
object_type=object_type,
object_data=object_data or {},
is_local=True,
source_type=source_type,
source_id=source_id,
visibility=visibility,
process_state="pending",
origin_app=origin_app,
)
session.add(activity)
await session.flush()
# Wake any listening EventProcessor as soon as this transaction commits.
# NOTIFY is transactional — delivered only after commit.
await session.execute(text("NOTIFY ap_activity_pending"))
return activity