From 2e9db119252aae9db2374b948c687467081a0b85 Mon Sep 17 00:00:00 2001 From: giles Date: Sun, 22 Feb 2026 16:19:29 +0000 Subject: [PATCH] Unify domain_events + ap_activities into AP-shaped event bus All cross-service events now flow through ap_activities with a unified EventProcessor. Internal events use visibility="internal"; federation activities use visibility="public" and get delivered by a wildcard handler. - Add processing columns to APActivity (process_state, actor_uri, etc.) - New emit_activity() / register_activity_handler() API - EventProcessor polls ap_activities instead of domain_events - Rewrite all handlers to accept APActivity - Migrate all 7 emit_event call sites to emit_activity - publish_activity() sets process_state=pending directly (no emit_event bridge) - Migration to drop domain_events table Co-Authored-By: Claude Opus 4.6 --- README.md | 12 +- .../m3k1h7i9j0_add_activity_bus_columns.py | 113 ++++++++++++++++ .../n4l2i8j0k1_drop_domain_events_table.py | 46 +++++++ events/__init__.py | 9 +- events/bus.py | 127 +++++++++++++----- events/handlers/ap_delivery_handler.py | 43 +++--- events/handlers/container_handlers.py | 12 +- events/handlers/login_handlers.py | 16 +-- events/handlers/order_handlers.py | 16 +-- events/processor.py | 60 ++++----- models/__init__.py | 2 - models/federation.py | 33 ++++- services/federation_impl.py | 22 +-- services/federation_publish.py | 8 +- services/relationships.py | 38 +++--- 15 files changed, 389 insertions(+), 168 deletions(-) create mode 100644 alembic/versions/m3k1h7i9j0_add_activity_bus_columns.py create mode 100644 alembic/versions/n4l2i8j0k1_drop_domain_events_table.py diff --git a/README.md b/README.md index 4991d87..527075d 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ shared/ models/ # Canonical domain models user.py # User magic_link.py # MagicLink (auth tokens) - domain_event.py # DomainEvent (transactional outbox) + domain_event.py # DomainEvent (legacy — being removed) kv.py # KeyValue (key-value store) menu_item.py # MenuItem (deprecated — use MenuNode) menu_node.py # MenuNode (navigation tree) @@ -48,13 +48,13 @@ shared/ user_loader.py # Load current user from session http_utils.py # HTTP utility functions events/ - bus.py # emit_event(), register_handler() - processor.py # EventProcessor (polls domain_events, runs handlers) - handlers/ # Shared event handlers + bus.py # emit_activity(), register_activity_handler() + processor.py # EventProcessor (polls ap_activities, runs handlers) + handlers/ # Shared activity handlers container_handlers.py # Navigation rebuild on attach/detach login_handlers.py # Cart/entry adoption on login order_handlers.py # Order lifecycle events - ap_delivery_handler.py # AP activity delivery to follower inboxes + ap_delivery_handler.py # AP activity delivery to follower inboxes (wildcard) utils/ __init__.py calendar_helpers.py # Calendar period/entry utilities @@ -78,7 +78,7 @@ shared/ - **App factory:** All apps call `create_base_app()` which sets up DB sessions, CSRF, error handling, event processing, logging, widget registration, and domain service wiring. - **Service contracts:** Cross-domain communication via typed Protocols + frozen DTO dataclasses. Apps call `services.calendar.method()`, never import models from other domains. - **Service registry:** Typed singleton (`services.blog`, `.calendar`, `.market`, `.cart`, `.federation`). Apps wire their own domain + stubs for others via `register_domain_services()`. -- **Event bus:** `emit_event()` writes to `domain_events` table in the caller's transaction. `EventProcessor` polls and dispatches to registered handlers. +- **Activity bus:** `emit_activity()` writes to `ap_activities` table in the caller's transaction. `EventProcessor` polls pending activities and dispatches to registered handlers. Internal events use `visibility="internal"`; federation activities use `visibility="public"` and are delivered to follower inboxes by the wildcard delivery handler. - **Widget registry:** Domain services register widgets (nav, card, account); templates consume via `widgets.container_nav`, `widgets.container_cards`. - **Cart identity:** `current_cart_identity()` returns `{"user_id": int|None, "session_id": str|None}` from the request session. diff --git a/alembic/versions/m3k1h7i9j0_add_activity_bus_columns.py b/alembic/versions/m3k1h7i9j0_add_activity_bus_columns.py new file mode 100644 index 0000000..b61aa5e --- /dev/null +++ b/alembic/versions/m3k1h7i9j0_add_activity_bus_columns.py @@ -0,0 +1,113 @@ +"""add unified event bus columns to ap_activities + +Revision ID: m3k1h7i9j0 +Revises: l2j0g6h8i9 +Create Date: 2026-02-22 + +Adds processing and visibility columns so ap_activities can serve as the +unified event bus for both internal domain events and federation delivery. +""" + +revision = "m3k1h7i9j0" +down_revision = "l2j0g6h8i9" +branch_labels = None +depends_on = None + +from alembic import op +import sqlalchemy as sa + + +def upgrade() -> None: + # Add new columns with defaults so existing rows stay valid + op.add_column( + "ap_activities", + sa.Column("actor_uri", sa.String(512), nullable=True), + ) + op.add_column( + "ap_activities", + sa.Column( + "visibility", sa.String(20), + nullable=False, server_default="public", + ), + ) + op.add_column( + "ap_activities", + sa.Column( + "process_state", sa.String(20), + nullable=False, server_default="completed", + ), + ) + op.add_column( + "ap_activities", + sa.Column( + "process_attempts", sa.Integer(), + nullable=False, server_default="0", + ), + ) + op.add_column( + "ap_activities", + sa.Column( + "process_max_attempts", sa.Integer(), + nullable=False, server_default="5", + ), + ) + op.add_column( + "ap_activities", + sa.Column("process_error", sa.Text(), nullable=True), + ) + op.add_column( + "ap_activities", + sa.Column( + "processed_at", sa.DateTime(timezone=True), nullable=True, + ), + ) + + # Backfill actor_uri from the related actor_profile + op.execute( + """ + UPDATE ap_activities a + SET actor_uri = CONCAT( + 'https://', + COALESCE(current_setting('app.ap_domain', true), 'rose-ash.com'), + '/users/', + p.preferred_username + ) + FROM ap_actor_profiles p + WHERE a.actor_profile_id = p.id + AND a.actor_uri IS NULL + """ + ) + + # Make actor_profile_id nullable (internal events have no actor profile) + op.alter_column( + "ap_activities", "actor_profile_id", + existing_type=sa.Integer(), + nullable=True, + ) + + # Index for processor polling + op.create_index( + "ix_ap_activity_process", "ap_activities", ["process_state"], + ) + + +def downgrade() -> None: + op.drop_index("ix_ap_activity_process", table_name="ap_activities") + + # Restore actor_profile_id NOT NULL (remove any rows without it first) + op.execute( + "DELETE FROM ap_activities WHERE actor_profile_id IS NULL" + ) + op.alter_column( + "ap_activities", "actor_profile_id", + existing_type=sa.Integer(), + nullable=False, + ) + + op.drop_column("ap_activities", "processed_at") + op.drop_column("ap_activities", "process_error") + op.drop_column("ap_activities", "process_max_attempts") + op.drop_column("ap_activities", "process_attempts") + op.drop_column("ap_activities", "process_state") + op.drop_column("ap_activities", "visibility") + op.drop_column("ap_activities", "actor_uri") diff --git a/alembic/versions/n4l2i8j0k1_drop_domain_events_table.py b/alembic/versions/n4l2i8j0k1_drop_domain_events_table.py new file mode 100644 index 0000000..3d11dab --- /dev/null +++ b/alembic/versions/n4l2i8j0k1_drop_domain_events_table.py @@ -0,0 +1,46 @@ +"""drop domain_events table + +Revision ID: n4l2i8j0k1 +Revises: m3k1h7i9j0 +Create Date: 2026-02-22 + +The domain_events table is no longer used — all events now flow through +ap_activities with the unified activity bus. +""" + +revision = "n4l2i8j0k1" +down_revision = "m3k1h7i9j0" +branch_labels = None +depends_on = None + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import JSONB + + +def upgrade() -> None: + op.drop_index("ix_domain_events_state", table_name="domain_events") + op.drop_index("ix_domain_events_event_type", table_name="domain_events") + op.drop_table("domain_events") + + +def downgrade() -> None: + op.create_table( + "domain_events", + sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True), + sa.Column("event_type", sa.String(128), nullable=False), + sa.Column("aggregate_type", sa.String(64), nullable=False), + sa.Column("aggregate_id", sa.Integer(), nullable=False), + sa.Column("payload", JSONB(), nullable=True), + sa.Column("state", sa.String(20), nullable=False, server_default="pending"), + sa.Column("attempts", sa.Integer(), nullable=False, server_default="0"), + sa.Column("max_attempts", sa.Integer(), nullable=False, server_default="5"), + sa.Column("last_error", sa.Text(), nullable=True), + sa.Column( + "created_at", sa.DateTime(timezone=True), + nullable=False, server_default=sa.func.now(), + ), + sa.Column("processed_at", sa.DateTime(timezone=True), nullable=True), + ) + op.create_index("ix_domain_events_event_type", "domain_events", ["event_type"]) + op.create_index("ix_domain_events_state", "domain_events", ["state"]) diff --git a/events/__init__.py b/events/__init__.py index c9fd143..522cb5f 100644 --- a/events/__init__.py +++ b/events/__init__.py @@ -1,4 +1,9 @@ -from .bus import emit_event, register_handler +from .bus import emit_activity, register_activity_handler, get_activity_handlers from .processor import EventProcessor -__all__ = ["emit_event", "register_handler", "EventProcessor"] +__all__ = [ + "emit_activity", + "register_activity_handler", + "get_activity_handlers", + "EventProcessor", +] diff --git a/events/bus.py b/events/bus.py index 1a6e201..e1e90c0 100644 --- a/events/bus.py +++ b/events/bus.py @@ -1,56 +1,109 @@ """ -Transactional outbox event bus. +Unified activity bus. -emit_event() writes to the domain_events table within the caller's existing -DB transaction — atomic with whatever domain change triggered the event. +emit_activity() writes an APActivity row with process_state='pending' within +the caller's existing DB transaction — atomic with the domain change. -register_handler() registers async handler functions that the EventProcessor -will call when processing events of a given type. +register_activity_handler() registers async handler functions that the +EventProcessor dispatches when processing pending activities. """ from __future__ import annotations +import uuid from collections import defaultdict -from typing import Any, Awaitable, Callable, Dict, List +from typing import Awaitable, Callable, Dict, List, Tuple from sqlalchemy.ext.asyncio import AsyncSession -from shared.models.domain_event import DomainEvent +from shared.models.federation import APActivity -# handler signature: async def handler(event: DomainEvent, session: AsyncSession) -> None -HandlerFn = Callable[[DomainEvent, AsyncSession], Awaitable[None]] +# --------------------------------------------------------------------------- +# Activity-handler registry +# --------------------------------------------------------------------------- +# Handler signature: async def handler(activity: APActivity, session: AsyncSession) -> None +ActivityHandlerFn = Callable[[APActivity, AsyncSession], Awaitable[None]] -_handlers: Dict[str, List[HandlerFn]] = defaultdict(list) +# Keyed by (activity_type, object_type). object_type="*" is wildcard. +_activity_handlers: Dict[Tuple[str, str], List[ActivityHandlerFn]] = defaultdict(list) -async def emit_event( +def register_activity_handler( + activity_type: str, + fn: ActivityHandlerFn, + *, + object_type: str | None = None, +) -> None: + """Register an async handler for an activity type + optional object type. + + Use ``activity_type="*"`` as a wildcard that fires for every activity + (e.g. federation delivery handler). + """ + key = (activity_type, object_type or "*") + _activity_handlers[key].append(fn) + + +def get_activity_handlers( + activity_type: str, + object_type: str | None = None, +) -> List[ActivityHandlerFn]: + """Return all matching handlers for an activity. + + Matches in order: + 1. Exact (activity_type, object_type) + 2. (activity_type, "*") — type-level wildcard + 3. ("*", "*") — global wildcard (e.g. delivery) + """ + handlers: List[ActivityHandlerFn] = [] + ot = object_type or "*" + + # Exact match + if ot != "*": + handlers.extend(_activity_handlers.get((activity_type, ot), [])) + # Type-level wildcard + handlers.extend(_activity_handlers.get((activity_type, "*"), [])) + # Global wildcard + if activity_type != "*": + handlers.extend(_activity_handlers.get(("*", "*"), [])) + + return handlers + + +# --------------------------------------------------------------------------- +# emit_activity — the primary way to emit events +# --------------------------------------------------------------------------- +async def emit_activity( session: AsyncSession, - event_type: str, - aggregate_type: str, - aggregate_id: int, - payload: Dict[str, Any] | None = None, -) -> DomainEvent: + *, + activity_type: str, + actor_uri: str, + object_type: str, + object_data: dict | None = None, + source_type: str | None = None, + source_id: int | None = None, + visibility: str = "internal", + actor_profile_id: int | None = None, +) -> APActivity: """ - Write a domain event to the outbox table in the current transaction. + Write an AP-shaped activity to ap_activities with process_state='pending'. - Call this inside your service function, using the same session that - performs the domain change. The event and the change commit together. + Called inside a service function using the same session that performs the + domain change. The activity and the change commit together. """ - event = DomainEvent( - event_type=event_type, - aggregate_type=aggregate_type, - aggregate_id=aggregate_id, - payload=payload or {}, + activity_uri = f"internal:{uuid.uuid4()}" if visibility == "internal" else f"urn:uuid:{uuid.uuid4()}" + + activity = APActivity( + activity_id=activity_uri, + activity_type=activity_type, + actor_profile_id=actor_profile_id, + actor_uri=actor_uri, + object_type=object_type, + object_data=object_data or {}, + is_local=True, + source_type=source_type, + source_id=source_id, + visibility=visibility, + process_state="pending", ) - session.add(event) - await session.flush() # assign event.id - return event - - -def register_handler(event_type: str, fn: HandlerFn) -> None: - """Register an async handler for a given event type.""" - _handlers[event_type].append(fn) - - -def get_handlers(event_type: str) -> List[HandlerFn]: - """Return all registered handlers for an event type.""" - return _handlers.get(event_type, []) + session.add(activity) + await session.flush() + return activity diff --git a/events/handlers/ap_delivery_handler.py b/events/handlers/ap_delivery_handler.py index 3cbbc8f..f28b3a4 100644 --- a/events/handlers/ap_delivery_handler.py +++ b/events/handlers/ap_delivery_handler.py @@ -1,7 +1,7 @@ """Deliver AP activities to remote followers. -On ``federation.activity_created`` → load activity + actor + followers → -sign with HTTP Signatures → POST to each follower inbox. +Registered as a wildcard handler — fires for every activity. Skips +non-public activities and those without an actor profile. """ from __future__ import annotations @@ -11,7 +11,7 @@ import httpx from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession -from shared.events.bus import register_handler, DomainEvent +from shared.events.bus import register_activity_handler from shared.models.federation import ActorProfile, APActivity, APFollower from shared.services.registry import services @@ -33,12 +33,9 @@ def _build_activity_json(activity: APActivity, actor: ActorProfile, domain: str) object_id = activity.activity_id + "/object" if activity.activity_type == "Delete": - # Delete: object is a Tombstone with just id + type obj.setdefault("id", object_id) obj.setdefault("type", "Tombstone") else: - # Create/Update: full object with attribution - # Prefer stable id from object_data (set by try_publish), fall back to activity-derived obj.setdefault("id", object_id) obj.setdefault("type", activity.object_type) obj.setdefault("attributedTo", actor_url) @@ -105,30 +102,20 @@ async def _deliver_to_inbox( return False -async def on_activity_created(event: DomainEvent, session: AsyncSession) -> None: - """Deliver a newly created activity to all followers.""" +async def on_any_activity(activity: APActivity, session: AsyncSession) -> None: + """Deliver a public activity to all followers of its actor.""" import os + # Only deliver public activities that have an actor profile + if activity.visibility != "public": + return + if activity.actor_profile_id is None: + return if not services.has("federation"): return - payload = event.payload - activity_id_uri = payload.get("activity_id") - if not activity_id_uri: - return - domain = os.getenv("AP_DOMAIN", "rose-ash.com") - # Load the activity - activity = ( - await session.execute( - select(APActivity).where(APActivity.activity_id == activity_id_uri) - ) - ).scalar_one_or_none() - if not activity: - log.warning("Activity not found: %s", activity_id_uri) - return - # Load actor with private key actor = ( await session.execute( @@ -136,7 +123,7 @@ async def on_activity_created(event: DomainEvent, session: AsyncSession) -> None ) ).scalar_one_or_none() if not actor or not actor.private_key_pem: - log.warning("Actor not found or missing key for activity %s", activity_id_uri) + log.warning("Actor not found or missing key for activity %s", activity.activity_id) return # Load followers @@ -147,14 +134,13 @@ async def on_activity_created(event: DomainEvent, session: AsyncSession) -> None ).scalars().all() if not followers: - log.debug("No followers to deliver to for %s", activity_id_uri) + log.debug("No followers to deliver to for %s", activity.activity_id) return # Build activity JSON activity_json = _build_activity_json(activity, actor, domain) - # Deliver to each follower inbox - # Deduplicate inboxes (multiple followers might share a shared inbox) + # Deduplicate inboxes inboxes = {f.follower_inbox for f in followers if f.follower_inbox} log.info( @@ -167,4 +153,5 @@ async def on_activity_created(event: DomainEvent, session: AsyncSession) -> None await _deliver_to_inbox(client, inbox_url, activity_json, actor, domain) -register_handler("federation.activity_created", on_activity_created) +# Wildcard: fires for every activity +register_activity_handler("*", on_any_activity) diff --git a/events/handlers/container_handlers.py b/events/handlers/container_handlers.py index e687ef2..c405002 100644 --- a/events/handlers/container_handlers.py +++ b/events/handlers/container_handlers.py @@ -2,18 +2,18 @@ from __future__ import annotations from sqlalchemy.ext.asyncio import AsyncSession -from shared.events import register_handler -from shared.models.domain_event import DomainEvent +from shared.events import register_activity_handler +from shared.models.federation import APActivity from shared.services.navigation import rebuild_navigation -async def on_child_attached(event: DomainEvent, session: AsyncSession) -> None: +async def on_child_attached(activity: APActivity, session: AsyncSession) -> None: await rebuild_navigation(session) -async def on_child_detached(event: DomainEvent, session: AsyncSession) -> None: +async def on_child_detached(activity: APActivity, session: AsyncSession) -> None: await rebuild_navigation(session) -register_handler("container.child_attached", on_child_attached) -register_handler("container.child_detached", on_child_detached) +register_activity_handler("Add", on_child_attached, object_type="rose:ContainerRelation") +register_activity_handler("Remove", on_child_detached, object_type="rose:ContainerRelation") diff --git a/events/handlers/login_handlers.py b/events/handlers/login_handlers.py index 17ef493..d09ce23 100644 --- a/events/handlers/login_handlers.py +++ b/events/handlers/login_handlers.py @@ -2,24 +2,22 @@ from __future__ import annotations from sqlalchemy.ext.asyncio import AsyncSession -from shared.events import register_handler -from shared.models.domain_event import DomainEvent +from shared.events import register_activity_handler +from shared.models.federation import APActivity from shared.services.registry import services -async def on_user_logged_in(event: DomainEvent, session: AsyncSession) -> None: - payload = event.payload - user_id = payload["user_id"] - session_id = payload["session_id"] +async def on_user_logged_in(activity: APActivity, session: AsyncSession) -> None: + data = activity.object_data + user_id = data["user_id"] + session_id = data["session_id"] - # Adopt cart items (if cart service is registered) if services.has("cart"): await services.cart.adopt_cart_for_user(session, user_id, session_id) - # Adopt calendar entries and tickets (if calendar service is registered) if services.has("calendar"): await services.calendar.adopt_entries_for_user(session, user_id, session_id) await services.calendar.adopt_tickets_for_user(session, user_id, session_id) -register_handler("user.logged_in", on_user_logged_in) +register_activity_handler("rose:Login", on_user_logged_in) diff --git a/events/handlers/order_handlers.py b/events/handlers/order_handlers.py index 41016aa..c608ae7 100644 --- a/events/handlers/order_handlers.py +++ b/events/handlers/order_handlers.py @@ -4,19 +4,19 @@ import logging from sqlalchemy.ext.asyncio import AsyncSession -from shared.events import register_handler -from shared.models.domain_event import DomainEvent +from shared.events import register_activity_handler +from shared.models.federation import APActivity log = logging.getLogger(__name__) -async def on_order_created(event: DomainEvent, session: AsyncSession) -> None: - log.info("order.created: order_id=%s", event.payload.get("order_id")) +async def on_order_created(activity: APActivity, session: AsyncSession) -> None: + log.info("order.created: order_id=%s", activity.object_data.get("order_id")) -async def on_order_paid(event: DomainEvent, session: AsyncSession) -> None: - log.info("order.paid: order_id=%s", event.payload.get("order_id")) +async def on_order_paid(activity: APActivity, session: AsyncSession) -> None: + log.info("order.paid: order_id=%s", activity.object_data.get("order_id")) -register_handler("order.created", on_order_created) -register_handler("order.paid", on_order_paid) +register_activity_handler("Create", on_order_created, object_type="rose:Order") +register_activity_handler("rose:OrderPaid", on_order_paid) diff --git a/events/processor.py b/events/processor.py index f6bc863..e580233 100644 --- a/events/processor.py +++ b/events/processor.py @@ -1,6 +1,6 @@ """ -Event processor — polls the domain_events outbox table and dispatches -to registered handlers. +Event processor — polls the ap_activities table and dispatches to registered +activity handlers. Runs as an asyncio background task within each app process. Uses SELECT ... FOR UPDATE SKIP LOCKED for safe concurrent processing. @@ -11,16 +11,16 @@ import asyncio import traceback from datetime import datetime, timezone -from sqlalchemy import select, update +from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from shared.db.session import get_session -from shared.models.domain_event import DomainEvent -from .bus import get_handlers +from shared.models.federation import APActivity +from .bus import get_activity_handlers class EventProcessor: - """Background event processor that polls the outbox table.""" + """Background event processor that polls the ap_activities table.""" def __init__( self, @@ -64,54 +64,52 @@ class EventProcessor: await asyncio.sleep(self._poll_interval) async def _process_batch(self) -> int: - """Fetch and process a batch of pending events. Returns count processed.""" + """Fetch and process a batch of pending activities. Returns count processed.""" processed = 0 async with get_session() as session: - # FOR UPDATE SKIP LOCKED: safe for concurrent processors stmt = ( - select(DomainEvent) + select(APActivity) .where( - DomainEvent.state == "pending", - DomainEvent.attempts < DomainEvent.max_attempts, + APActivity.process_state == "pending", + APActivity.process_attempts < APActivity.process_max_attempts, ) - .order_by(DomainEvent.created_at) + .order_by(APActivity.created_at) .limit(self._batch_size) .with_for_update(skip_locked=True) ) result = await session.execute(stmt) - events = result.scalars().all() + activities = result.scalars().all() - for event in events: - await self._process_one(session, event) + for activity in activities: + await self._process_one(session, activity) processed += 1 await session.commit() return processed - async def _process_one(self, session: AsyncSession, event: DomainEvent) -> None: - """Run all handlers for a single event.""" - handlers = get_handlers(event.event_type) + async def _process_one(self, session: AsyncSession, activity: APActivity) -> None: + """Run all handlers for a single activity.""" + handlers = get_activity_handlers(activity.activity_type, activity.object_type) now = datetime.now(timezone.utc) - event.state = "processing" - event.attempts += 1 + activity.process_state = "processing" + activity.process_attempts += 1 await session.flush() if not handlers: - # No handlers registered — mark completed (nothing to do) - event.state = "completed" - event.processed_at = now + activity.process_state = "completed" + activity.processed_at = now return try: for handler in handlers: - await handler(event, session) - event.state = "completed" - event.processed_at = now + await handler(activity, session) + activity.process_state = "completed" + activity.processed_at = now except Exception as exc: - event.last_error = f"{exc.__class__.__name__}: {exc}" - if event.attempts >= event.max_attempts: - event.state = "failed" - event.processed_at = now + activity.process_error = f"{exc.__class__.__name__}: {exc}" + if activity.process_attempts >= activity.process_max_attempts: + activity.process_state = "failed" + activity.processed_at = now else: - event.state = "pending" # retry + activity.process_state = "pending" # retry diff --git a/models/__init__.py b/models/__init__.py index e696236..7a4823c 100644 --- a/models/__init__.py +++ b/models/__init__.py @@ -8,8 +8,6 @@ from .ghost_membership_entities import ( GhostNewsletter, UserNewsletter, GhostTier, GhostSubscription, ) -from .domain_event import DomainEvent - from .ghost_content import Tag, Post, Author, PostAuthor, PostTag, PostLike from .page_config import PageConfig from .order import Order, OrderItem diff --git a/models/federation.py b/models/federation.py index 422388a..8f2066e 100644 --- a/models/federation.py +++ b/models/federation.py @@ -50,14 +50,19 @@ class ActorProfile(Base): class APActivity(Base): - """An ActivityPub activity (local or remote).""" + """An ActivityPub activity (local or remote). + + Also serves as the unified event bus: internal domain events and public + federation activities both live here, distinguished by ``visibility``. + The ``EventProcessor`` polls rows with ``process_state='pending'``. + """ __tablename__ = "ap_activities" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) activity_id: Mapped[str] = mapped_column(String(512), unique=True, nullable=False) activity_type: Mapped[str] = mapped_column(String(64), nullable=False) - actor_profile_id: Mapped[int] = mapped_column( - Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False, + actor_profile_id: Mapped[int | None] = mapped_column( + Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=True, ) object_type: Mapped[str | None] = mapped_column(String(64), nullable=True) object_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True) @@ -83,6 +88,27 @@ class APActivity(Base): DateTime(timezone=True), nullable=False, server_default=func.now(), ) + # --- Unified event-bus columns --- + actor_uri: Mapped[str | None] = mapped_column( + String(512), nullable=True, + ) + visibility: Mapped[str] = mapped_column( + String(20), nullable=False, default="public", server_default="public", + ) + process_state: Mapped[str] = mapped_column( + String(20), nullable=False, default="completed", server_default="completed", + ) + process_attempts: Mapped[int] = mapped_column( + Integer, nullable=False, default=0, server_default="0", + ) + process_max_attempts: Mapped[int] = mapped_column( + Integer, nullable=False, default=5, server_default="5", + ) + process_error: Mapped[str | None] = mapped_column(Text, nullable=True) + processed_at: Mapped[datetime | None] = mapped_column( + DateTime(timezone=True), nullable=True, + ) + # Relationships actor_profile = relationship("ActorProfile", back_populates="activities") @@ -90,6 +116,7 @@ class APActivity(Base): Index("ix_ap_activity_actor", "actor_profile_id"), Index("ix_ap_activity_source", "source_type", "source_id"), Index("ix_ap_activity_published", "published"), + Index("ix_ap_activity_process", "process_state"), ) def __repr__(self) -> str: diff --git a/services/federation_impl.py b/services/federation_impl.py index de0ad94..442ed2f 100644 --- a/services/federation_impl.py +++ b/services/federation_impl.py @@ -183,16 +183,21 @@ class SqlFederationService: now = datetime.now(timezone.utc) + actor_url = f"https://{domain}/users/{username}" + activity = APActivity( activity_id=activity_uri, activity_type=activity_type, actor_profile_id=actor.id, + actor_uri=actor_url, object_type=object_type, object_data=object_data, published=now, is_local=True, source_type=source_type, source_id=source_id, + visibility="public", + process_state="pending", ) session.add(activity) await session.flush() @@ -208,7 +213,7 @@ class SqlFederationService: ], "id": activity_uri, "type": activity_type, - "actor": f"https://{domain}/users/{username}", + "actor": actor_url, "published": now.isoformat(), "object": { "type": object_type, @@ -221,21 +226,6 @@ class SqlFederationService: except Exception: pass # IPFS failure is non-fatal - # Emit domain event for downstream processing (delivery) - from shared.events import emit_event - await emit_event( - session, - "federation.activity_created", - "APActivity", - activity.id, - { - "activity_id": activity.activity_id, - "activity_type": activity_type, - "actor_username": username, - "object_type": object_type, - }, - ) - return _activity_to_dto(activity) # -- Queries -------------------------------------------------------------- diff --git a/services/federation_publish.py b/services/federation_publish.py index 965055f..da5945b 100644 --- a/services/federation_publish.py +++ b/services/federation_publish.py @@ -1,9 +1,9 @@ """Inline federation publication — called at write time, not via async handler. -Replaces the old pattern where emit_event("post.published") → async handler → -publish_activity(). Now the originating service calls try_publish() directly, -which creates the APActivity in the same DB transaction. AP delivery -(federation.activity_created → inbox POST) stays async. +The originating service calls try_publish() directly, which creates the +APActivity (with process_state='pending') in the same DB transaction. +The EventProcessor picks it up and the delivery wildcard handler POSTs +to follower inboxes. """ from __future__ import annotations diff --git a/services/relationships.py b/services/relationships.py index ddf0db4..c7ff084 100644 --- a/services/relationships.py +++ b/services/relationships.py @@ -3,7 +3,7 @@ from __future__ import annotations from sqlalchemy import select, func from sqlalchemy.ext.asyncio import AsyncSession -from shared.events import emit_event +from shared.events import emit_activity from shared.models.container_relation import ContainerRelation @@ -40,17 +40,19 @@ async def attach_child( if label is not None: existing.label = label await session.flush() - await emit_event( + await emit_activity( session, - event_type="container.child_attached", - aggregate_type="container_relation", - aggregate_id=existing.id, - payload={ + activity_type="Add", + actor_uri="internal:system", + object_type="rose:ContainerRelation", + object_data={ "parent_type": parent_type, "parent_id": parent_id, "child_type": child_type, "child_id": child_id, }, + source_type="container_relation", + source_id=existing.id, ) return existing # Already attached and active — no-op @@ -77,17 +79,19 @@ async def attach_child( session.add(rel) await session.flush() - await emit_event( + await emit_activity( session, - event_type="container.child_attached", - aggregate_type="container_relation", - aggregate_id=rel.id, - payload={ + activity_type="Add", + actor_uri="internal:system", + object_type="rose:ContainerRelation", + object_data={ "parent_type": parent_type, "parent_id": parent_id, "child_type": child_type, "child_id": child_id, }, + source_type="container_relation", + source_id=rel.id, ) return rel @@ -139,17 +143,19 @@ async def detach_child( rel.deleted_at = func.now() await session.flush() - await emit_event( + await emit_activity( session, - event_type="container.child_detached", - aggregate_type="container_relation", - aggregate_id=rel.id, - payload={ + activity_type="Remove", + actor_uri="internal:system", + object_type="rose:ContainerRelation", + object_data={ "parent_type": parent_type, "parent_id": parent_id, "child_type": child_type, "child_id": child_id, }, + source_type="container_relation", + source_id=rel.id, ) return True