Files
rose-ash/shared/events/bus.py
giles f42042ccb7
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m5s
Monorepo: consolidate 7 repos into one
Combines shared, blog, market, cart, events, federation, and account
into a single repository. Eliminates submodule sync, sibling model
copying at build time, and per-app CI orchestration.

Changes:
- Remove per-app .git, .gitmodules, .gitea, submodule shared/ dirs
- Remove stale sibling model copies from each app
- Update all 6 Dockerfiles for monorepo build context (root = .)
- Add build directives to docker-compose.yml
- Add single .gitea/workflows/ci.yml with change detection
- Add .dockerignore for monorepo build context
- Create __init__.py for federation and account (cross-app imports)
2026-02-24 19:44:17 +00:00

127 lines
4.1 KiB
Python

"""
Unified activity bus.
emit_activity() writes an APActivity row with process_state='pending' within
the caller's existing DB transaction — atomic with the domain change.
register_activity_handler() registers async handler functions that the
EventProcessor dispatches when processing pending activities.
"""
from __future__ import annotations
import logging
import uuid
from collections import defaultdict
from typing import Awaitable, Callable, Dict, List, Tuple
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from shared.models.federation import APActivity
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Activity-handler registry
# ---------------------------------------------------------------------------
# Handler signature: async def handler(activity: APActivity, session: AsyncSession) -> None
# Handlers are awaited for their side effects only; no return value is used.
ActivityHandlerFn = Callable[[APActivity, AsyncSession], Awaitable[None]]
# Process-local registry of handlers, keyed by (activity_type, object_type).
# object_type="*" is the wildcard; ("*", "*") holds the global handlers.
# A defaultdict(list) lets registration append under a new key without setup.
_activity_handlers: Dict[Tuple[str, str], List[ActivityHandlerFn]] = defaultdict(list)
def register_activity_handler(
    activity_type: str,
    fn: ActivityHandlerFn,
    *,
    object_type: str | None = None,
) -> None:
    """Register an async handler for an activity type + optional object type.

    The handler is appended to the module-level registry under the key
    ``(activity_type, object_type or "*")``, so omitting ``object_type`` (or
    passing an empty string) registers a type-level wildcard. Registering the
    same callable twice stores it twice.

    Use ``activity_type="*"`` as a wildcard that fires for every activity
    (e.g. federation delivery handler). Note that the lookup in
    ``get_activity_handlers`` only consults the ``("*", "*")`` key for this,
    so leave ``object_type`` unset when registering a global handler.

    Args:
        activity_type: Activity type to handle, or ``"*"`` for all types.
        fn: Async callable invoked as ``fn(activity, session)``.
        object_type: Optional object type narrowing the match.
    """
    key = (activity_type, object_type or "*")
    _activity_handlers[key].append(fn)

    # functools.partial objects and callable instances are valid handlers but
    # do not carry __qualname__; fall back to printable substitutes so that
    # registering them cannot raise AttributeError at the log line.
    module_name = getattr(fn, "__module__", "<unknown>")
    qual_name = getattr(fn, "__qualname__", None)
    if qual_name is None:
        qual_name = repr(fn)
    log.info("Registered activity handler %s.%s for key %s", module_name, qual_name, key)
def get_activity_handlers(
    activity_type: str,
    object_type: str | None = None,
) -> List[ActivityHandlerFn]:
    """Collect every registered handler that applies to an activity.

    Registry keys are consulted in this order, and the handlers found under
    each are concatenated into a fresh list:

    1. ``(activity_type, object_type)`` — exact match, skipped when no
       specific object type is given
    2. ``(activity_type, "*")`` — type-level wildcard
    3. ``("*", "*")`` — global wildcard (e.g. delivery), skipped when
       ``activity_type`` is itself ``"*"`` because step 2 already covered it

    Keys of the form ``("*", <specific object_type>)`` are not consulted.
    A handler registered under several matching keys appears once per key.
    """
    wanted_object = object_type or "*"

    # Build the ordered list of registry keys to consult.
    lookup_keys = []
    if wanted_object != "*":
        lookup_keys.append((activity_type, wanted_object))
    lookup_keys.append((activity_type, "*"))
    if activity_type != "*":
        lookup_keys.append(("*", "*"))

    # .get() keeps the defaultdict from growing empty entries on lookup.
    return [
        handler
        for key in lookup_keys
        for handler in _activity_handlers.get(key, ())
    ]
# ---------------------------------------------------------------------------
# emit_activity — the primary way to emit events
# ---------------------------------------------------------------------------
async def emit_activity(
    session: AsyncSession,
    *,
    activity_type: str,
    actor_uri: str,
    object_type: str,
    object_data: dict | None = None,
    source_type: str | None = None,
    source_id: int | None = None,
    visibility: str = "internal",
    actor_profile_id: int | None = None,
    origin_app: str | None = None,
) -> APActivity:
    """
    Record an AP-shaped activity in ap_activities with process_state='pending'.

    Intended to be called from a service function with the same session that
    performs the domain change, so the activity and the change commit
    together. The row is added and flushed but never committed here.

    Args:
        session: Session the new row is added to and flushed through.
        activity_type: Stored as the activity's type.
        actor_uri: Stored as the acting party's URI.
        object_type: Stored as the type of the activity's object.
        object_data: Payload for the object; a falsy value is stored as ``{}``.
        source_type: Optional type of the originating record.
        source_id: Optional id of the originating record.
        visibility: ``"internal"`` yields an ``internal:<uuid4>`` activity id;
            any other value yields ``urn:uuid:<uuid4>``.
        actor_profile_id: Optional id of the actor's profile.
        origin_app: Name of the emitting app. When falsy, the name of the
            current Quart app is used if it can be resolved; otherwise the
            value is left unchanged.

    Returns:
        The newly added (and flushed) APActivity, always marked ``is_local``.
    """
    if not origin_app:
        # Best effort only: quart may not be importable here, and accessing
        # current_app can raise RuntimeError (presumably when no application
        # context is active). Either way we simply go without an origin.
        try:
            from quart import current_app

            origin_app = current_app.name
        except (ImportError, RuntimeError):
            pass

    if visibility == "internal":
        activity_uri = f"internal:{uuid.uuid4()}"
    else:
        activity_uri = f"urn:uuid:{uuid.uuid4()}"

    payload = object_data or {}

    record = APActivity(
        activity_id=activity_uri,
        activity_type=activity_type,
        actor_profile_id=actor_profile_id,
        actor_uri=actor_uri,
        object_type=object_type,
        object_data=payload,
        is_local=True,
        source_type=source_type,
        source_id=source_id,
        visibility=visibility,
        process_state="pending",
        origin_app=origin_app,
    )
    session.add(record)
    await session.flush()

    # Wake any listening EventProcessor as soon as this transaction commits.
    # NOTIFY is transactional — delivered only after commit.
    await session.execute(text("NOTIFY ap_activity_pending"))
    return record