"""Inline federation publication — called at write time, not via async handler. The originating service calls try_publish() directly, which creates the APActivity (with process_state='pending') in the same DB transaction. The EventProcessor picks it up and the delivery wildcard handler POSTs to follower inboxes. When the federation database is separate from the caller's database, this module opens its own federation session for all AP reads/writes. """ from __future__ import annotations import logging import os from sqlalchemy.ext.asyncio import AsyncSession from shared.services.registry import services log = logging.getLogger(__name__) def _needs_federation_session() -> bool: """True when the federation DB differs from the app's default DB.""" from shared.db.session import DATABASE_URL, DATABASE_URL_FEDERATION return DATABASE_URL_FEDERATION != DATABASE_URL async def try_publish( session: AsyncSession, *, user_id: int | None, activity_type: str, object_type: str, object_data: dict, source_type: str, source_id: int, ) -> None: """Publish an AP activity if federation is available and user has a profile. Safe to call from any app — returns silently if federation isn't wired or the user has no actor profile. """ if not services.has("federation"): return if not user_id: return if _needs_federation_session(): from shared.db.session import get_federation_session async with get_federation_session() as fed_s: async with fed_s.begin(): await _publish_inner( fed_s, user_id=user_id, activity_type=activity_type, object_type=object_type, object_data=object_data, source_type=source_type, source_id=source_id, ) else: await _publish_inner( session, user_id=user_id, activity_type=activity_type, object_type=object_type, object_data=object_data, source_type=source_type, source_id=source_id, ) async def _publish_inner( session: AsyncSession, *, user_id: int, activity_type: str, object_type: str, object_data: dict, source_type: str, source_id: int, ) -> None: """Core publish logic using a session bound to the federation DB.""" actor = await services.federation.get_actor_by_user_id(session, user_id) if not actor: return # Dedup: don't re-Create if already published, don't re-Delete if already deleted existing = await services.federation.get_activity_for_source( session, source_type, source_id, ) if existing: if activity_type == "Create" and existing.activity_type != "Delete": return if activity_type == "Delete" and existing.activity_type == "Delete": return elif activity_type in ("Delete", "Update"): return # Stable object ID within a publish cycle domain = os.getenv("AP_DOMAIN", "federation.rose-ash.com") base_object_id = ( f"https://{domain}/users/{actor.preferred_username}" f"/objects/{source_type.lower()}/{source_id}" ) if activity_type == "Create" and existing and existing.activity_type == "Delete": create_count = await services.federation.count_activities_for_source( session, source_type, source_id, activity_type="Create", ) object_data["id"] = f"{base_object_id}/v{create_count + 1}" elif activity_type in ("Update", "Delete") and existing and existing.object_data: object_data["id"] = existing.object_data.get("id", base_object_id) else: object_data["id"] = base_object_id try: await services.federation.publish_activity( session, actor_user_id=user_id, activity_type=activity_type, object_type=object_type, object_data=object_data, source_type=source_type, source_id=source_id, ) log.info( "Published %s/%s for %s#%d by user %d", activity_type, object_type, source_type, source_id, user_id, ) except Exception: log.exception("Failed to publish activity for %s#%d", source_type, source_id)