From 3bde451ce9a2e5126187489cc37ab0305d908e0d Mon Sep 17 00:00:00 2001 From: giles Date: Sun, 22 Feb 2026 07:54:14 +0000 Subject: [PATCH] Inline federation publication, remove async handlers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Federation activities are now created at write time via try_publish() instead of relying on async event handlers. Fixes race condition where multiple EventProcessors could consume post.published events in apps that couldn't meaningfully process them. AP delivery (federation.activity_created → inbox POST) stays async. Co-Authored-By: Claude Opus 4.6 --- events/handlers/__init__.py | 2 +- events/handlers/federation_handlers.py | 212 +------------------------ services/federation_publish.py | 71 +++++++++ 3 files changed, 78 insertions(+), 207 deletions(-) create mode 100644 services/federation_publish.py diff --git a/events/handlers/__init__.py b/events/handlers/__init__.py index 76e2c9e..e96d10e 100644 --- a/events/handlers/__init__.py +++ b/events/handlers/__init__.py @@ -6,5 +6,5 @@ def register_shared_handlers(): import shared.events.handlers.container_handlers # noqa: F401 import shared.events.handlers.login_handlers # noqa: F401 import shared.events.handlers.order_handlers # noqa: F401 - import shared.events.handlers.federation_handlers # noqa: F401 + # federation_handlers removed — publication is now inline at write sites import shared.events.handlers.ap_delivery_handler # noqa: F401 diff --git a/events/handlers/federation_handlers.py b/events/handlers/federation_handlers.py index 9db83f6..0a3af3b 100644 --- a/events/handlers/federation_handlers.py +++ b/events/handlers/federation_handlers.py @@ -1,208 +1,8 @@ -"""Federation event handlers — publish domain content as AP activities. +"""Federation event handlers — REMOVED. -Listens for content events and calls services.federation.publish_activity() -to create AP activities. Each handler checks: - 1. services.has("federation") — skip if federation not wired - 2. The content has a user_id — skip anonymous/system content - 3. The user has an ActorProfile — skip users who haven't chosen a username +Federation publication is now inline at the write site (ghost_sync, entries, +market routes) via shared.services.federation_publish.try_publish(). + +AP delivery (federation.activity_created → inbox POST) remains async via +ap_delivery_handler. """ -from __future__ import annotations - -import logging - -from sqlalchemy.ext.asyncio import AsyncSession - -from shared.events.bus import register_handler, DomainEvent -from shared.services.registry import services - -log = logging.getLogger(__name__) - - -async def _try_publish( - session: AsyncSession, - *, - user_id: int | None, - activity_type: str, - object_type: str, - object_data: dict, - source_type: str, - source_id: int, -) -> None: - """Publish an AP activity if federation is available and user has a profile.""" - if not services.has("federation"): - return - - if not user_id: - return - - # Check user has an ActorProfile (chose a username) - actor = await services.federation.get_actor_by_user_id(session, user_id) - if not actor: - return - - # Don't re-publish if we already have a live activity for this source - existing = await services.federation.get_activity_for_source( - session, source_type, source_id, - ) - if existing and activity_type == "Create" and existing.activity_type != "Delete": - return # Already published (allow re-Create after Delete/unpublish) - - try: - await services.federation.publish_activity( - session, - actor_user_id=user_id, - activity_type=activity_type, - object_type=object_type, - object_data=object_data, - source_type=source_type, - source_id=source_id, - ) - log.info( - "Published %s/%s for %s#%d by user %d", - activity_type, object_type, source_type, source_id, user_id, - ) - except Exception: - log.exception("Failed to publish activity for %s#%d", source_type, source_id) - - -# -- Post published/updated (from Ghost webhook sync) ------------------------- - -async def on_post_published(event: DomainEvent, session: AsyncSession) -> None: - p = event.payload - await _try_publish( - session, - user_id=p.get("user_id"), - activity_type="Create", - object_type="Article", - object_data={ - "name": p.get("title", ""), - "content": p.get("excerpt", ""), - "url": p.get("url", ""), - }, - source_type="Post", - source_id=event.aggregate_id, - ) - - -async def on_post_updated(event: DomainEvent, session: AsyncSession) -> None: - p = event.payload - await _try_publish( - session, - user_id=p.get("user_id"), - activity_type="Update", - object_type="Article", - object_data={ - "name": p.get("title", ""), - "content": p.get("excerpt", ""), - "url": p.get("url", ""), - }, - source_type="Post", - source_id=event.aggregate_id, - ) - - -# -- Calendar entry created/updated ------------------------------------------- - -async def on_calendar_entry_created(event: DomainEvent, session: AsyncSession) -> None: - p = event.payload - await _try_publish( - session, - user_id=p.get("user_id"), - activity_type="Create", - object_type="Event", - object_data={ - "name": p.get("title", ""), - "startTime": p.get("start_time", ""), - "endTime": p.get("end_time", ""), - "url": p.get("url", ""), - }, - source_type="CalendarEntry", - source_id=event.aggregate_id, - ) - - -async def on_calendar_entry_updated(event: DomainEvent, session: AsyncSession) -> None: - p = event.payload - await _try_publish( - session, - user_id=p.get("user_id"), - activity_type="Update", - object_type="Event", - object_data={ - "name": p.get("title", ""), - "startTime": p.get("start_time", ""), - "endTime": p.get("end_time", ""), - "url": p.get("url", ""), - }, - source_type="CalendarEntry", - source_id=event.aggregate_id, - ) - - -# -- Product listed/updated --------------------------------------------------- - -async def on_product_listed(event: DomainEvent, session: AsyncSession) -> None: - p = event.payload - await _try_publish( - session, - user_id=p.get("user_id"), - activity_type="Create", - object_type="Object", - object_data={ - "name": p.get("title", ""), - "summary": p.get("description", ""), - "url": p.get("url", ""), - }, - source_type="Product", - source_id=event.aggregate_id, - ) - - -# -- Post unpublished (delete from federation) --------------------------------- - -async def on_post_unpublished(event: DomainEvent, session: AsyncSession) -> None: - """Send a Delete activity when a post is unpublished.""" - p = event.payload - user_id = p.get("user_id") - post_url = p.get("url", "") - - if not services.has("federation") or not user_id: - return - - actor = await services.federation.get_actor_by_user_id(session, user_id) - if not actor: - return - - # Find the original activity for this post - existing = await services.federation.get_activity_for_source( - session, "Post", event.aggregate_id, - ) - if not existing or existing.activity_type == "Delete": - return # Never published or already deleted - - try: - await services.federation.publish_activity( - session, - actor_user_id=user_id, - activity_type="Delete", - object_type="Tombstone", - object_data={ - "id": post_url, - "formerType": "Article", - }, - source_type="Post", - source_id=event.aggregate_id, - ) - log.info("Published Delete for Post#%d", event.aggregate_id) - except Exception: - log.exception("Failed to publish Delete for Post#%d", event.aggregate_id) - - -# -- Registration -------------------------------------------------------------- - -register_handler("post.published", on_post_published) -register_handler("post.updated", on_post_updated) -register_handler("post.unpublished", on_post_unpublished) -register_handler("calendar_entry.created", on_calendar_entry_created) -register_handler("calendar_entry.updated", on_calendar_entry_updated) -register_handler("product.listed", on_product_listed) diff --git a/services/federation_publish.py b/services/federation_publish.py new file mode 100644 index 0000000..d20053d --- /dev/null +++ b/services/federation_publish.py @@ -0,0 +1,71 @@ +"""Inline federation publication — called at write time, not via async handler. + +Replaces the old pattern where emit_event("post.published") → async handler → +publish_activity(). Now the originating service calls try_publish() directly, +which creates the APActivity in the same DB transaction. AP delivery +(federation.activity_created → inbox POST) stays async. +""" +from __future__ import annotations + +import logging + +from sqlalchemy.ext.asyncio import AsyncSession + +from shared.services.registry import services + +log = logging.getLogger(__name__) + + +async def try_publish( + session: AsyncSession, + *, + user_id: int | None, + activity_type: str, + object_type: str, + object_data: dict, + source_type: str, + source_id: int, +) -> None: + """Publish an AP activity if federation is available and user has a profile. + + Safe to call from any app — returns silently if federation isn't wired + or the user has no actor profile. + """ + if not services.has("federation"): + return + + if not user_id: + return + + actor = await services.federation.get_actor_by_user_id(session, user_id) + if not actor: + return + + # Dedup: don't re-Create if already published, don't re-Delete if already deleted + existing = await services.federation.get_activity_for_source( + session, source_type, source_id, + ) + if existing: + if activity_type == "Create" and existing.activity_type != "Delete": + return # already published (allow re-Create after Delete/unpublish) + if activity_type == "Delete" and existing.activity_type == "Delete": + return # already deleted + elif activity_type == "Delete": + return # never published, nothing to delete + + try: + await services.federation.publish_activity( + session, + actor_user_id=user_id, + activity_type=activity_type, + object_type=object_type, + object_data=object_data, + source_type=source_type, + source_id=source_id, + ) + log.info( + "Published %s/%s for %s#%d by user %d", + activity_type, object_type, source_type, source_id, user_id, + ) + except Exception: + log.exception("Failed to publish activity for %s#%d", source_type, source_id)