Unify domain_events + ap_activities into AP-shaped event bus
All cross-service events now flow through ap_activities with a unified EventProcessor. Internal events use visibility="internal"; federation activities use visibility="public" and get delivered by a wildcard handler. - Add processing columns to APActivity (process_state, actor_uri, etc.) - New emit_activity() / register_activity_handler() API - EventProcessor polls ap_activities instead of domain_events - Rewrite all handlers to accept APActivity - Migrate all 7 emit_event call sites to emit_activity - publish_activity() sets process_state=pending directly (no emit_event bridge) - Migration to drop domain_events table Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,4 +1,9 @@
|
||||
from .bus import emit_event, register_handler
|
||||
from .bus import emit_activity, register_activity_handler, get_activity_handlers
|
||||
from .processor import EventProcessor
|
||||
|
||||
__all__ = ["emit_event", "register_handler", "EventProcessor"]
|
||||
__all__ = [
|
||||
"emit_activity",
|
||||
"register_activity_handler",
|
||||
"get_activity_handlers",
|
||||
"EventProcessor",
|
||||
]
|
||||
|
||||
127
events/bus.py
127
events/bus.py
@@ -1,56 +1,109 @@
|
||||
"""
|
||||
Transactional outbox event bus.
|
||||
Unified activity bus.
|
||||
|
||||
emit_event() writes to the domain_events table within the caller's existing
|
||||
DB transaction — atomic with whatever domain change triggered the event.
|
||||
emit_activity() writes an APActivity row with process_state='pending' within
|
||||
the caller's existing DB transaction — atomic with the domain change.
|
||||
|
||||
register_handler() registers async handler functions that the EventProcessor
|
||||
will call when processing events of a given type.
|
||||
register_activity_handler() registers async handler functions that the
|
||||
EventProcessor dispatches when processing pending activities.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from collections import defaultdict
|
||||
from typing import Any, Awaitable, Callable, Dict, List
|
||||
from typing import Awaitable, Callable, Dict, List, Tuple
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from shared.models.domain_event import DomainEvent
|
||||
from shared.models.federation import APActivity
|
||||
|
||||
# handler signature: async def handler(event: DomainEvent, session: AsyncSession) -> None
|
||||
HandlerFn = Callable[[DomainEvent, AsyncSession], Awaitable[None]]
|
||||
# ---------------------------------------------------------------------------
|
||||
# Activity-handler registry
|
||||
# ---------------------------------------------------------------------------
|
||||
# Handler signature: async def handler(activity: APActivity, session: AsyncSession) -> None
|
||||
ActivityHandlerFn = Callable[[APActivity, AsyncSession], Awaitable[None]]
|
||||
|
||||
_handlers: Dict[str, List[HandlerFn]] = defaultdict(list)
|
||||
# Keyed by (activity_type, object_type). object_type="*" is wildcard.
|
||||
_activity_handlers: Dict[Tuple[str, str], List[ActivityHandlerFn]] = defaultdict(list)
|
||||
|
||||
|
||||
async def emit_event(
|
||||
def register_activity_handler(
|
||||
activity_type: str,
|
||||
fn: ActivityHandlerFn,
|
||||
*,
|
||||
object_type: str | None = None,
|
||||
) -> None:
|
||||
"""Register an async handler for an activity type + optional object type.
|
||||
|
||||
Use ``activity_type="*"`` as a wildcard that fires for every activity
|
||||
(e.g. federation delivery handler).
|
||||
"""
|
||||
key = (activity_type, object_type or "*")
|
||||
_activity_handlers[key].append(fn)
|
||||
|
||||
|
||||
def get_activity_handlers(
|
||||
activity_type: str,
|
||||
object_type: str | None = None,
|
||||
) -> List[ActivityHandlerFn]:
|
||||
"""Return all matching handlers for an activity.
|
||||
|
||||
Matches in order:
|
||||
1. Exact (activity_type, object_type)
|
||||
2. (activity_type, "*") — type-level wildcard
|
||||
3. ("*", "*") — global wildcard (e.g. delivery)
|
||||
"""
|
||||
handlers: List[ActivityHandlerFn] = []
|
||||
ot = object_type or "*"
|
||||
|
||||
# Exact match
|
||||
if ot != "*":
|
||||
handlers.extend(_activity_handlers.get((activity_type, ot), []))
|
||||
# Type-level wildcard
|
||||
handlers.extend(_activity_handlers.get((activity_type, "*"), []))
|
||||
# Global wildcard
|
||||
if activity_type != "*":
|
||||
handlers.extend(_activity_handlers.get(("*", "*"), []))
|
||||
|
||||
return handlers
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# emit_activity — the primary way to emit events
|
||||
# ---------------------------------------------------------------------------
|
||||
async def emit_activity(
|
||||
session: AsyncSession,
|
||||
event_type: str,
|
||||
aggregate_type: str,
|
||||
aggregate_id: int,
|
||||
payload: Dict[str, Any] | None = None,
|
||||
) -> DomainEvent:
|
||||
*,
|
||||
activity_type: str,
|
||||
actor_uri: str,
|
||||
object_type: str,
|
||||
object_data: dict | None = None,
|
||||
source_type: str | None = None,
|
||||
source_id: int | None = None,
|
||||
visibility: str = "internal",
|
||||
actor_profile_id: int | None = None,
|
||||
) -> APActivity:
|
||||
"""
|
||||
Write a domain event to the outbox table in the current transaction.
|
||||
Write an AP-shaped activity to ap_activities with process_state='pending'.
|
||||
|
||||
Call this inside your service function, using the same session that
|
||||
performs the domain change. The event and the change commit together.
|
||||
Called inside a service function using the same session that performs the
|
||||
domain change. The activity and the change commit together.
|
||||
"""
|
||||
event = DomainEvent(
|
||||
event_type=event_type,
|
||||
aggregate_type=aggregate_type,
|
||||
aggregate_id=aggregate_id,
|
||||
payload=payload or {},
|
||||
activity_uri = f"internal:{uuid.uuid4()}" if visibility == "internal" else f"urn:uuid:{uuid.uuid4()}"
|
||||
|
||||
activity = APActivity(
|
||||
activity_id=activity_uri,
|
||||
activity_type=activity_type,
|
||||
actor_profile_id=actor_profile_id,
|
||||
actor_uri=actor_uri,
|
||||
object_type=object_type,
|
||||
object_data=object_data or {},
|
||||
is_local=True,
|
||||
source_type=source_type,
|
||||
source_id=source_id,
|
||||
visibility=visibility,
|
||||
process_state="pending",
|
||||
)
|
||||
session.add(event)
|
||||
await session.flush() # assign event.id
|
||||
return event
|
||||
|
||||
|
||||
def register_handler(event_type: str, fn: HandlerFn) -> None:
|
||||
"""Register an async handler for a given event type."""
|
||||
_handlers[event_type].append(fn)
|
||||
|
||||
|
||||
def get_handlers(event_type: str) -> List[HandlerFn]:
|
||||
"""Return all registered handlers for an event type."""
|
||||
return _handlers.get(event_type, [])
|
||||
session.add(activity)
|
||||
await session.flush()
|
||||
return activity
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
"""Deliver AP activities to remote followers.
|
||||
|
||||
On ``federation.activity_created`` → load activity + actor + followers →
|
||||
sign with HTTP Signatures → POST to each follower inbox.
|
||||
Registered as a wildcard handler — fires for every activity. Skips
|
||||
non-public activities and those without an actor profile.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
@@ -11,7 +11,7 @@ import httpx
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from shared.events.bus import register_handler, DomainEvent
|
||||
from shared.events.bus import register_activity_handler
|
||||
from shared.models.federation import ActorProfile, APActivity, APFollower
|
||||
from shared.services.registry import services
|
||||
|
||||
@@ -33,12 +33,9 @@ def _build_activity_json(activity: APActivity, actor: ActorProfile, domain: str)
|
||||
object_id = activity.activity_id + "/object"
|
||||
|
||||
if activity.activity_type == "Delete":
|
||||
# Delete: object is a Tombstone with just id + type
|
||||
obj.setdefault("id", object_id)
|
||||
obj.setdefault("type", "Tombstone")
|
||||
else:
|
||||
# Create/Update: full object with attribution
|
||||
# Prefer stable id from object_data (set by try_publish), fall back to activity-derived
|
||||
obj.setdefault("id", object_id)
|
||||
obj.setdefault("type", activity.object_type)
|
||||
obj.setdefault("attributedTo", actor_url)
|
||||
@@ -105,30 +102,20 @@ async def _deliver_to_inbox(
|
||||
return False
|
||||
|
||||
|
||||
async def on_activity_created(event: DomainEvent, session: AsyncSession) -> None:
|
||||
"""Deliver a newly created activity to all followers."""
|
||||
async def on_any_activity(activity: APActivity, session: AsyncSession) -> None:
|
||||
"""Deliver a public activity to all followers of its actor."""
|
||||
import os
|
||||
|
||||
# Only deliver public activities that have an actor profile
|
||||
if activity.visibility != "public":
|
||||
return
|
||||
if activity.actor_profile_id is None:
|
||||
return
|
||||
if not services.has("federation"):
|
||||
return
|
||||
|
||||
payload = event.payload
|
||||
activity_id_uri = payload.get("activity_id")
|
||||
if not activity_id_uri:
|
||||
return
|
||||
|
||||
domain = os.getenv("AP_DOMAIN", "rose-ash.com")
|
||||
|
||||
# Load the activity
|
||||
activity = (
|
||||
await session.execute(
|
||||
select(APActivity).where(APActivity.activity_id == activity_id_uri)
|
||||
)
|
||||
).scalar_one_or_none()
|
||||
if not activity:
|
||||
log.warning("Activity not found: %s", activity_id_uri)
|
||||
return
|
||||
|
||||
# Load actor with private key
|
||||
actor = (
|
||||
await session.execute(
|
||||
@@ -136,7 +123,7 @@ async def on_activity_created(event: DomainEvent, session: AsyncSession) -> None
|
||||
)
|
||||
).scalar_one_or_none()
|
||||
if not actor or not actor.private_key_pem:
|
||||
log.warning("Actor not found or missing key for activity %s", activity_id_uri)
|
||||
log.warning("Actor not found or missing key for activity %s", activity.activity_id)
|
||||
return
|
||||
|
||||
# Load followers
|
||||
@@ -147,14 +134,13 @@ async def on_activity_created(event: DomainEvent, session: AsyncSession) -> None
|
||||
).scalars().all()
|
||||
|
||||
if not followers:
|
||||
log.debug("No followers to deliver to for %s", activity_id_uri)
|
||||
log.debug("No followers to deliver to for %s", activity.activity_id)
|
||||
return
|
||||
|
||||
# Build activity JSON
|
||||
activity_json = _build_activity_json(activity, actor, domain)
|
||||
|
||||
# Deliver to each follower inbox
|
||||
# Deduplicate inboxes (multiple followers might share a shared inbox)
|
||||
# Deduplicate inboxes
|
||||
inboxes = {f.follower_inbox for f in followers if f.follower_inbox}
|
||||
|
||||
log.info(
|
||||
@@ -167,4 +153,5 @@ async def on_activity_created(event: DomainEvent, session: AsyncSession) -> None
|
||||
await _deliver_to_inbox(client, inbox_url, activity_json, actor, domain)
|
||||
|
||||
|
||||
register_handler("federation.activity_created", on_activity_created)
|
||||
# Wildcard: fires for every activity
|
||||
register_activity_handler("*", on_any_activity)
|
||||
|
||||
@@ -2,18 +2,18 @@ from __future__ import annotations
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from shared.events import register_handler
|
||||
from shared.models.domain_event import DomainEvent
|
||||
from shared.events import register_activity_handler
|
||||
from shared.models.federation import APActivity
|
||||
from shared.services.navigation import rebuild_navigation
|
||||
|
||||
|
||||
async def on_child_attached(event: DomainEvent, session: AsyncSession) -> None:
|
||||
async def on_child_attached(activity: APActivity, session: AsyncSession) -> None:
|
||||
await rebuild_navigation(session)
|
||||
|
||||
|
||||
async def on_child_detached(event: DomainEvent, session: AsyncSession) -> None:
|
||||
async def on_child_detached(activity: APActivity, session: AsyncSession) -> None:
|
||||
await rebuild_navigation(session)
|
||||
|
||||
|
||||
register_handler("container.child_attached", on_child_attached)
|
||||
register_handler("container.child_detached", on_child_detached)
|
||||
register_activity_handler("Add", on_child_attached, object_type="rose:ContainerRelation")
|
||||
register_activity_handler("Remove", on_child_detached, object_type="rose:ContainerRelation")
|
||||
|
||||
@@ -2,24 +2,22 @@ from __future__ import annotations
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from shared.events import register_handler
|
||||
from shared.models.domain_event import DomainEvent
|
||||
from shared.events import register_activity_handler
|
||||
from shared.models.federation import APActivity
|
||||
from shared.services.registry import services
|
||||
|
||||
|
||||
async def on_user_logged_in(event: DomainEvent, session: AsyncSession) -> None:
|
||||
payload = event.payload
|
||||
user_id = payload["user_id"]
|
||||
session_id = payload["session_id"]
|
||||
async def on_user_logged_in(activity: APActivity, session: AsyncSession) -> None:
|
||||
data = activity.object_data
|
||||
user_id = data["user_id"]
|
||||
session_id = data["session_id"]
|
||||
|
||||
# Adopt cart items (if cart service is registered)
|
||||
if services.has("cart"):
|
||||
await services.cart.adopt_cart_for_user(session, user_id, session_id)
|
||||
|
||||
# Adopt calendar entries and tickets (if calendar service is registered)
|
||||
if services.has("calendar"):
|
||||
await services.calendar.adopt_entries_for_user(session, user_id, session_id)
|
||||
await services.calendar.adopt_tickets_for_user(session, user_id, session_id)
|
||||
|
||||
|
||||
register_handler("user.logged_in", on_user_logged_in)
|
||||
register_activity_handler("rose:Login", on_user_logged_in)
|
||||
|
||||
@@ -4,19 +4,19 @@ import logging
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from shared.events import register_handler
|
||||
from shared.models.domain_event import DomainEvent
|
||||
from shared.events import register_activity_handler
|
||||
from shared.models.federation import APActivity
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def on_order_created(event: DomainEvent, session: AsyncSession) -> None:
|
||||
log.info("order.created: order_id=%s", event.payload.get("order_id"))
|
||||
async def on_order_created(activity: APActivity, session: AsyncSession) -> None:
|
||||
log.info("order.created: order_id=%s", activity.object_data.get("order_id"))
|
||||
|
||||
|
||||
async def on_order_paid(event: DomainEvent, session: AsyncSession) -> None:
|
||||
log.info("order.paid: order_id=%s", event.payload.get("order_id"))
|
||||
async def on_order_paid(activity: APActivity, session: AsyncSession) -> None:
|
||||
log.info("order.paid: order_id=%s", activity.object_data.get("order_id"))
|
||||
|
||||
|
||||
register_handler("order.created", on_order_created)
|
||||
register_handler("order.paid", on_order_paid)
|
||||
register_activity_handler("Create", on_order_created, object_type="rose:Order")
|
||||
register_activity_handler("rose:OrderPaid", on_order_paid)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
Event processor — polls the domain_events outbox table and dispatches
|
||||
to registered handlers.
|
||||
Event processor — polls the ap_activities table and dispatches to registered
|
||||
activity handlers.
|
||||
|
||||
Runs as an asyncio background task within each app process.
|
||||
Uses SELECT ... FOR UPDATE SKIP LOCKED for safe concurrent processing.
|
||||
@@ -11,16 +11,16 @@ import asyncio
|
||||
import traceback
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from sqlalchemy import select, update
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from shared.db.session import get_session
|
||||
from shared.models.domain_event import DomainEvent
|
||||
from .bus import get_handlers
|
||||
from shared.models.federation import APActivity
|
||||
from .bus import get_activity_handlers
|
||||
|
||||
|
||||
class EventProcessor:
|
||||
"""Background event processor that polls the outbox table."""
|
||||
"""Background event processor that polls the ap_activities table."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -64,54 +64,52 @@ class EventProcessor:
|
||||
await asyncio.sleep(self._poll_interval)
|
||||
|
||||
async def _process_batch(self) -> int:
|
||||
"""Fetch and process a batch of pending events. Returns count processed."""
|
||||
"""Fetch and process a batch of pending activities. Returns count processed."""
|
||||
processed = 0
|
||||
async with get_session() as session:
|
||||
# FOR UPDATE SKIP LOCKED: safe for concurrent processors
|
||||
stmt = (
|
||||
select(DomainEvent)
|
||||
select(APActivity)
|
||||
.where(
|
||||
DomainEvent.state == "pending",
|
||||
DomainEvent.attempts < DomainEvent.max_attempts,
|
||||
APActivity.process_state == "pending",
|
||||
APActivity.process_attempts < APActivity.process_max_attempts,
|
||||
)
|
||||
.order_by(DomainEvent.created_at)
|
||||
.order_by(APActivity.created_at)
|
||||
.limit(self._batch_size)
|
||||
.with_for_update(skip_locked=True)
|
||||
)
|
||||
result = await session.execute(stmt)
|
||||
events = result.scalars().all()
|
||||
activities = result.scalars().all()
|
||||
|
||||
for event in events:
|
||||
await self._process_one(session, event)
|
||||
for activity in activities:
|
||||
await self._process_one(session, activity)
|
||||
processed += 1
|
||||
|
||||
await session.commit()
|
||||
return processed
|
||||
|
||||
async def _process_one(self, session: AsyncSession, event: DomainEvent) -> None:
|
||||
"""Run all handlers for a single event."""
|
||||
handlers = get_handlers(event.event_type)
|
||||
async def _process_one(self, session: AsyncSession, activity: APActivity) -> None:
|
||||
"""Run all handlers for a single activity."""
|
||||
handlers = get_activity_handlers(activity.activity_type, activity.object_type)
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
event.state = "processing"
|
||||
event.attempts += 1
|
||||
activity.process_state = "processing"
|
||||
activity.process_attempts += 1
|
||||
await session.flush()
|
||||
|
||||
if not handlers:
|
||||
# No handlers registered — mark completed (nothing to do)
|
||||
event.state = "completed"
|
||||
event.processed_at = now
|
||||
activity.process_state = "completed"
|
||||
activity.processed_at = now
|
||||
return
|
||||
|
||||
try:
|
||||
for handler in handlers:
|
||||
await handler(event, session)
|
||||
event.state = "completed"
|
||||
event.processed_at = now
|
||||
await handler(activity, session)
|
||||
activity.process_state = "completed"
|
||||
activity.processed_at = now
|
||||
except Exception as exc:
|
||||
event.last_error = f"{exc.__class__.__name__}: {exc}"
|
||||
if event.attempts >= event.max_attempts:
|
||||
event.state = "failed"
|
||||
event.processed_at = now
|
||||
activity.process_error = f"{exc.__class__.__name__}: {exc}"
|
||||
if activity.process_attempts >= activity.process_max_attempts:
|
||||
activity.process_state = "failed"
|
||||
activity.processed_at = now
|
||||
else:
|
||||
event.state = "pending" # retry
|
||||
activity.process_state = "pending" # retry
|
||||
|
||||
Reference in New Issue
Block a user