Inline federation publication, remove async handlers
Federation activities are now created at write time via try_publish() instead of relying on async event handlers. Fixes race condition where multiple EventProcessors could consume post.published events in apps that couldn't meaningfully process them. AP delivery (federation.activity_created → inbox POST) stays async. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -6,5 +6,5 @@ def register_shared_handlers():
|
||||
import shared.events.handlers.container_handlers # noqa: F401
|
||||
import shared.events.handlers.login_handlers # noqa: F401
|
||||
import shared.events.handlers.order_handlers # noqa: F401
|
||||
import shared.events.handlers.federation_handlers # noqa: F401
|
||||
# federation_handlers removed — publication is now inline at write sites
|
||||
import shared.events.handlers.ap_delivery_handler # noqa: F401
|
||||
|
||||
@@ -1,208 +1,8 @@
|
||||
"""Federation event handlers — publish domain content as AP activities.
|
||||
"""Federation event handlers — REMOVED.
|
||||
|
||||
Listens for content events and calls services.federation.publish_activity()
|
||||
to create AP activities. Each handler checks:
|
||||
1. services.has("federation") — skip if federation not wired
|
||||
2. The content has a user_id — skip anonymous/system content
|
||||
3. The user has an ActorProfile — skip users who haven't chosen a username
|
||||
Federation publication is now inline at the write site (ghost_sync, entries,
|
||||
market routes) via shared.services.federation_publish.try_publish().
|
||||
|
||||
AP delivery (federation.activity_created → inbox POST) remains async via
|
||||
ap_delivery_handler.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from shared.events.bus import register_handler, DomainEvent
|
||||
from shared.services.registry import services
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def _try_publish(
|
||||
session: AsyncSession,
|
||||
*,
|
||||
user_id: int | None,
|
||||
activity_type: str,
|
||||
object_type: str,
|
||||
object_data: dict,
|
||||
source_type: str,
|
||||
source_id: int,
|
||||
) -> None:
|
||||
"""Publish an AP activity if federation is available and user has a profile."""
|
||||
if not services.has("federation"):
|
||||
return
|
||||
|
||||
if not user_id:
|
||||
return
|
||||
|
||||
# Check user has an ActorProfile (chose a username)
|
||||
actor = await services.federation.get_actor_by_user_id(session, user_id)
|
||||
if not actor:
|
||||
return
|
||||
|
||||
# Don't re-publish if we already have a live activity for this source
|
||||
existing = await services.federation.get_activity_for_source(
|
||||
session, source_type, source_id,
|
||||
)
|
||||
if existing and activity_type == "Create" and existing.activity_type != "Delete":
|
||||
return # Already published (allow re-Create after Delete/unpublish)
|
||||
|
||||
try:
|
||||
await services.federation.publish_activity(
|
||||
session,
|
||||
actor_user_id=user_id,
|
||||
activity_type=activity_type,
|
||||
object_type=object_type,
|
||||
object_data=object_data,
|
||||
source_type=source_type,
|
||||
source_id=source_id,
|
||||
)
|
||||
log.info(
|
||||
"Published %s/%s for %s#%d by user %d",
|
||||
activity_type, object_type, source_type, source_id, user_id,
|
||||
)
|
||||
except Exception:
|
||||
log.exception("Failed to publish activity for %s#%d", source_type, source_id)
|
||||
|
||||
|
||||
# -- Post published/updated (from Ghost webhook sync) -------------------------
|
||||
|
||||
async def on_post_published(event: DomainEvent, session: AsyncSession) -> None:
|
||||
p = event.payload
|
||||
await _try_publish(
|
||||
session,
|
||||
user_id=p.get("user_id"),
|
||||
activity_type="Create",
|
||||
object_type="Article",
|
||||
object_data={
|
||||
"name": p.get("title", ""),
|
||||
"content": p.get("excerpt", ""),
|
||||
"url": p.get("url", ""),
|
||||
},
|
||||
source_type="Post",
|
||||
source_id=event.aggregate_id,
|
||||
)
|
||||
|
||||
|
||||
async def on_post_updated(event: DomainEvent, session: AsyncSession) -> None:
|
||||
p = event.payload
|
||||
await _try_publish(
|
||||
session,
|
||||
user_id=p.get("user_id"),
|
||||
activity_type="Update",
|
||||
object_type="Article",
|
||||
object_data={
|
||||
"name": p.get("title", ""),
|
||||
"content": p.get("excerpt", ""),
|
||||
"url": p.get("url", ""),
|
||||
},
|
||||
source_type="Post",
|
||||
source_id=event.aggregate_id,
|
||||
)
|
||||
|
||||
|
||||
# -- Calendar entry created/updated -------------------------------------------
|
||||
|
||||
async def on_calendar_entry_created(event: DomainEvent, session: AsyncSession) -> None:
|
||||
p = event.payload
|
||||
await _try_publish(
|
||||
session,
|
||||
user_id=p.get("user_id"),
|
||||
activity_type="Create",
|
||||
object_type="Event",
|
||||
object_data={
|
||||
"name": p.get("title", ""),
|
||||
"startTime": p.get("start_time", ""),
|
||||
"endTime": p.get("end_time", ""),
|
||||
"url": p.get("url", ""),
|
||||
},
|
||||
source_type="CalendarEntry",
|
||||
source_id=event.aggregate_id,
|
||||
)
|
||||
|
||||
|
||||
async def on_calendar_entry_updated(event: DomainEvent, session: AsyncSession) -> None:
|
||||
p = event.payload
|
||||
await _try_publish(
|
||||
session,
|
||||
user_id=p.get("user_id"),
|
||||
activity_type="Update",
|
||||
object_type="Event",
|
||||
object_data={
|
||||
"name": p.get("title", ""),
|
||||
"startTime": p.get("start_time", ""),
|
||||
"endTime": p.get("end_time", ""),
|
||||
"url": p.get("url", ""),
|
||||
},
|
||||
source_type="CalendarEntry",
|
||||
source_id=event.aggregate_id,
|
||||
)
|
||||
|
||||
|
||||
# -- Product listed/updated ---------------------------------------------------
|
||||
|
||||
async def on_product_listed(event: DomainEvent, session: AsyncSession) -> None:
|
||||
p = event.payload
|
||||
await _try_publish(
|
||||
session,
|
||||
user_id=p.get("user_id"),
|
||||
activity_type="Create",
|
||||
object_type="Object",
|
||||
object_data={
|
||||
"name": p.get("title", ""),
|
||||
"summary": p.get("description", ""),
|
||||
"url": p.get("url", ""),
|
||||
},
|
||||
source_type="Product",
|
||||
source_id=event.aggregate_id,
|
||||
)
|
||||
|
||||
|
||||
# -- Post unpublished (delete from federation) ---------------------------------
|
||||
|
||||
async def on_post_unpublished(event: DomainEvent, session: AsyncSession) -> None:
|
||||
"""Send a Delete activity when a post is unpublished."""
|
||||
p = event.payload
|
||||
user_id = p.get("user_id")
|
||||
post_url = p.get("url", "")
|
||||
|
||||
if not services.has("federation") or not user_id:
|
||||
return
|
||||
|
||||
actor = await services.federation.get_actor_by_user_id(session, user_id)
|
||||
if not actor:
|
||||
return
|
||||
|
||||
# Find the original activity for this post
|
||||
existing = await services.federation.get_activity_for_source(
|
||||
session, "Post", event.aggregate_id,
|
||||
)
|
||||
if not existing or existing.activity_type == "Delete":
|
||||
return # Never published or already deleted
|
||||
|
||||
try:
|
||||
await services.federation.publish_activity(
|
||||
session,
|
||||
actor_user_id=user_id,
|
||||
activity_type="Delete",
|
||||
object_type="Tombstone",
|
||||
object_data={
|
||||
"id": post_url,
|
||||
"formerType": "Article",
|
||||
},
|
||||
source_type="Post",
|
||||
source_id=event.aggregate_id,
|
||||
)
|
||||
log.info("Published Delete for Post#%d", event.aggregate_id)
|
||||
except Exception:
|
||||
log.exception("Failed to publish Delete for Post#%d", event.aggregate_id)
|
||||
|
||||
|
||||
# -- Registration --------------------------------------------------------------
|
||||
|
||||
register_handler("post.published", on_post_published)
|
||||
register_handler("post.updated", on_post_updated)
|
||||
register_handler("post.unpublished", on_post_unpublished)
|
||||
register_handler("calendar_entry.created", on_calendar_entry_created)
|
||||
register_handler("calendar_entry.updated", on_calendar_entry_updated)
|
||||
register_handler("product.listed", on_product_listed)
|
||||
|
||||
71
services/federation_publish.py
Normal file
71
services/federation_publish.py
Normal file
@@ -0,0 +1,71 @@
|
||||
"""Inline federation publication — called at write time, not via async handler.
|
||||
|
||||
Replaces the old pattern where emit_event("post.published") → async handler →
|
||||
publish_activity(). Now the originating service calls try_publish() directly,
|
||||
which creates the APActivity in the same DB transaction. AP delivery
|
||||
(federation.activity_created → inbox POST) stays async.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from shared.services.registry import services
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def try_publish(
|
||||
session: AsyncSession,
|
||||
*,
|
||||
user_id: int | None,
|
||||
activity_type: str,
|
||||
object_type: str,
|
||||
object_data: dict,
|
||||
source_type: str,
|
||||
source_id: int,
|
||||
) -> None:
|
||||
"""Publish an AP activity if federation is available and user has a profile.
|
||||
|
||||
Safe to call from any app — returns silently if federation isn't wired
|
||||
or the user has no actor profile.
|
||||
"""
|
||||
if not services.has("federation"):
|
||||
return
|
||||
|
||||
if not user_id:
|
||||
return
|
||||
|
||||
actor = await services.federation.get_actor_by_user_id(session, user_id)
|
||||
if not actor:
|
||||
return
|
||||
|
||||
# Dedup: don't re-Create if already published, don't re-Delete if already deleted
|
||||
existing = await services.federation.get_activity_for_source(
|
||||
session, source_type, source_id,
|
||||
)
|
||||
if existing:
|
||||
if activity_type == "Create" and existing.activity_type != "Delete":
|
||||
return # already published (allow re-Create after Delete/unpublish)
|
||||
if activity_type == "Delete" and existing.activity_type == "Delete":
|
||||
return # already deleted
|
||||
elif activity_type == "Delete":
|
||||
return # never published, nothing to delete
|
||||
|
||||
try:
|
||||
await services.federation.publish_activity(
|
||||
session,
|
||||
actor_user_id=user_id,
|
||||
activity_type=activity_type,
|
||||
object_type=object_type,
|
||||
object_data=object_data,
|
||||
source_type=source_type,
|
||||
source_id=source_id,
|
||||
)
|
||||
log.info(
|
||||
"Published %s/%s for %s#%d by user %d",
|
||||
activity_type, object_type, source_type, source_id, user_id,
|
||||
)
|
||||
except Exception:
|
||||
log.exception("Failed to publish activity for %s#%d", source_type, source_id)
|
||||
Reference in New Issue
Block a user