This repository has been archived on 2026-02-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
shared/events/handlers/federation_handlers.py
giles 18410c4b16 Add unpublish (Delete) support + improve object IDs
- on_post_unpublished handler sends Delete/Tombstone activity
- Create/Update objects use post URL as id (for Delete reference)
- Delete objects use Tombstone type

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 23:26:56 +00:00

214 lines
6.8 KiB
Python

"""Federation event handlers — publish domain content as AP activities.
Listens for content events and calls services.federation.publish_activity()
to create AP activities. Each handler checks:
1. services.has("federation") — skip if federation not wired
2. The content has a user_id — skip anonymous/system content
3. The user has an ActorProfile — skip users who haven't chosen a username
"""
from __future__ import annotations
import logging
from sqlalchemy.ext.asyncio import AsyncSession
from shared.events.bus import register_handler, DomainEvent
from shared.services.registry import services
log = logging.getLogger(__name__)
async def _try_publish(
session: AsyncSession,
*,
user_id: int | None,
activity_type: str,
object_type: str,
object_data: dict,
source_type: str,
source_id: int,
) -> None:
"""Publish an AP activity if federation is available and user has a profile."""
if not services.has("federation"):
log.warning("_try_publish: no federation service")
return
if not user_id:
log.warning("_try_publish: no user_id for %s#%s", source_type, source_id)
return
# Check user has an ActorProfile (chose a username)
actor = await services.federation.get_actor_by_user_id(session, user_id)
if not actor:
log.warning("_try_publish: no ActorProfile for user_id=%s", user_id)
return
# Don't re-publish if we already have an activity for this source
existing = await services.federation.get_activity_for_source(
session, source_type, source_id,
)
if existing and activity_type == "Create":
log.warning("_try_publish: already published %s#%s", source_type, source_id)
return # Already published
log.warning("_try_publish: publishing %s/%s for %s#%d user=%s", activity_type, object_type, source_type, source_id, user_id)
try:
await services.federation.publish_activity(
session,
actor_user_id=user_id,
activity_type=activity_type,
object_type=object_type,
object_data=object_data,
source_type=source_type,
source_id=source_id,
)
log.warning(
"Published %s/%s for %s#%d by user %d",
activity_type, object_type, source_type, source_id, user_id,
)
except Exception:
log.exception("Failed to publish activity for %s#%d", source_type, source_id)
# -- Post published/updated (from Ghost webhook sync) -------------------------
async def on_post_published(event: DomainEvent, session: AsyncSession) -> None:
p = event.payload
await _try_publish(
session,
user_id=p.get("user_id"),
activity_type="Create",
object_type="Article",
object_data={
"name": p.get("title", ""),
"content": p.get("excerpt", ""),
"url": p.get("url", ""),
},
source_type="Post",
source_id=event.aggregate_id,
)
async def on_post_updated(event: DomainEvent, session: AsyncSession) -> None:
p = event.payload
await _try_publish(
session,
user_id=p.get("user_id"),
activity_type="Update",
object_type="Article",
object_data={
"name": p.get("title", ""),
"content": p.get("excerpt", ""),
"url": p.get("url", ""),
},
source_type="Post",
source_id=event.aggregate_id,
)
# -- Calendar entry created/updated -------------------------------------------
async def on_calendar_entry_created(event: DomainEvent, session: AsyncSession) -> None:
p = event.payload
await _try_publish(
session,
user_id=p.get("user_id"),
activity_type="Create",
object_type="Event",
object_data={
"name": p.get("title", ""),
"startTime": p.get("start_time", ""),
"endTime": p.get("end_time", ""),
"url": p.get("url", ""),
},
source_type="CalendarEntry",
source_id=event.aggregate_id,
)
async def on_calendar_entry_updated(event: DomainEvent, session: AsyncSession) -> None:
p = event.payload
await _try_publish(
session,
user_id=p.get("user_id"),
activity_type="Update",
object_type="Event",
object_data={
"name": p.get("title", ""),
"startTime": p.get("start_time", ""),
"endTime": p.get("end_time", ""),
"url": p.get("url", ""),
},
source_type="CalendarEntry",
source_id=event.aggregate_id,
)
# -- Product listed/updated ---------------------------------------------------
async def on_product_listed(event: DomainEvent, session: AsyncSession) -> None:
p = event.payload
await _try_publish(
session,
user_id=p.get("user_id"),
activity_type="Create",
object_type="Object",
object_data={
"name": p.get("title", ""),
"summary": p.get("description", ""),
"url": p.get("url", ""),
},
source_type="Product",
source_id=event.aggregate_id,
)
# -- Post unpublished (delete from federation) ---------------------------------
async def on_post_unpublished(event: DomainEvent, session: AsyncSession) -> None:
"""Send a Delete activity when a post is unpublished."""
p = event.payload
user_id = p.get("user_id")
post_url = p.get("url", "")
if not services.has("federation") or not user_id:
return
actor = await services.federation.get_actor_by_user_id(session, user_id)
if not actor:
return
# Find the original Create activity for this post
existing = await services.federation.get_activity_for_source(
session, "Post", event.aggregate_id,
)
if not existing:
return # Never published to federation, nothing to delete
try:
await services.federation.publish_activity(
session,
actor_user_id=user_id,
activity_type="Delete",
object_type="Tombstone",
object_data={
"id": post_url,
"formerType": "Article",
},
source_type="Post",
source_id=event.aggregate_id,
)
log.warning("Published Delete for Post#%d", event.aggregate_id)
except Exception:
log.exception("Failed to publish Delete for Post#%d", event.aggregate_id)
# -- Registration --------------------------------------------------------------
register_handler("post.published", on_post_published)
register_handler("post.updated", on_post_updated)
register_handler("post.unpublished", on_post_unpublished)
register_handler("calendar_entry.created", on_calendar_entry_created)
register_handler("calendar_entry.updated", on_calendar_entry_updated)
register_handler("product.listed", on_product_listed)