"""Inline federation publication — called at write time, not via async handler. The originating service calls try_publish() directly, which creates the APActivity (with process_state='pending') in the same DB transaction. The EventProcessor picks it up and the delivery wildcard handler POSTs to follower inboxes. """ from __future__ import annotations import logging import os from sqlalchemy.ext.asyncio import AsyncSession from shared.services.registry import services log = logging.getLogger(__name__) async def try_publish( session: AsyncSession, *, user_id: int | None, activity_type: str, object_type: str, object_data: dict, source_type: str, source_id: int, ) -> None: """Publish an AP activity if federation is available and user has a profile. Safe to call from any app — returns silently if federation isn't wired or the user has no actor profile. """ if not services.has("federation"): return if not user_id: return actor = await services.federation.get_actor_by_user_id(session, user_id) if not actor: return # Dedup: don't re-Create if already published, don't re-Delete if already deleted existing = await services.federation.get_activity_for_source( session, source_type, source_id, ) if existing: if activity_type == "Create" and existing.activity_type != "Delete": return # already published (allow re-Create after Delete/unpublish) if activity_type == "Delete" and existing.activity_type == "Delete": return # already deleted elif activity_type in ("Delete", "Update"): return # never published, nothing to delete/update # Stable object ID within a publish cycle. After Delete + re-Create # we append a version suffix so remote servers (Mastodon) treat it as # a brand-new post rather than ignoring the tombstoned ID. domain = os.getenv("AP_DOMAIN", "federation.rose-ash.com") base_object_id = ( f"https://{domain}/users/{actor.preferred_username}" f"/objects/{source_type.lower()}/{source_id}" ) if activity_type == "Create" and existing and existing.activity_type == "Delete": # Count prior Creates to derive a version number create_count = await services.federation.count_activities_for_source( session, source_type, source_id, activity_type="Create", ) object_data["id"] = f"{base_object_id}/v{create_count + 1}" elif activity_type in ("Update", "Delete") and existing and existing.object_data: # Use the same object ID as the most recent activity object_data["id"] = existing.object_data.get("id", base_object_id) else: object_data["id"] = base_object_id try: await services.federation.publish_activity( session, actor_user_id=user_id, activity_type=activity_type, object_type=object_type, object_data=object_data, source_type=source_type, source_id=source_id, ) log.info( "Published %s/%s for %s#%d by user %d", activity_type, object_type, source_type, source_id, user_id, ) except Exception: log.exception("Failed to publish activity for %s#%d", source_type, source_id)