Files
rose-ash/shared/services/federation_publish.py
giles 95bd32bd71
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 6m2s
Decouple cross-domain DB queries for per-app database split
Move Ghost membership sync from blog to account service so blog no
longer queries account tables (users, ghost_labels, etc.). Account
runs membership sync at startup and exposes HTTP action/data endpoints
for webhook-triggered syncs and user lookups.

Key changes:
- account/services/ghost_membership.py: all membership sync functions
- account/bp/actions + data: ghost-sync-member, user-by-email, newsletters
- blog ghost_sync.py: stripped to content-only (posts, authors, tags)
- blog webhook member: delegates to account via call_action()
- try_publish: opens federation session when DBs differ
- oauth.py callback: uses get_account_session() for OAuthCode
- page_configs moved from db_events to db_blog in split script

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 11:32:14 +00:00

134 lines
4.3 KiB
Python

"""Inline federation publication — called at write time, not via async handler.
The originating service calls try_publish() directly, which creates the
APActivity (with process_state='pending') in the same DB transaction.
The EventProcessor picks it up and the delivery wildcard handler POSTs
to follower inboxes.
When the federation database is separate from the caller's database,
this module opens its own federation session for all AP reads/writes.
"""
from __future__ import annotations
import logging
import os
from sqlalchemy.ext.asyncio import AsyncSession
from shared.services.registry import services
log = logging.getLogger(__name__)
def _needs_federation_session() -> bool:
"""True when the federation DB differs from the app's default DB."""
from shared.db.session import DATABASE_URL, DATABASE_URL_FEDERATION
return DATABASE_URL_FEDERATION != DATABASE_URL
async def try_publish(
session: AsyncSession,
*,
user_id: int | None,
activity_type: str,
object_type: str,
object_data: dict,
source_type: str,
source_id: int,
) -> None:
"""Publish an AP activity if federation is available and user has a profile.
Safe to call from any app — returns silently if federation isn't wired
or the user has no actor profile.
"""
if not services.has("federation"):
return
if not user_id:
return
if _needs_federation_session():
from shared.db.session import get_federation_session
async with get_federation_session() as fed_s:
async with fed_s.begin():
await _publish_inner(
fed_s,
user_id=user_id,
activity_type=activity_type,
object_type=object_type,
object_data=object_data,
source_type=source_type,
source_id=source_id,
)
else:
await _publish_inner(
session,
user_id=user_id,
activity_type=activity_type,
object_type=object_type,
object_data=object_data,
source_type=source_type,
source_id=source_id,
)
async def _publish_inner(
session: AsyncSession,
*,
user_id: int,
activity_type: str,
object_type: str,
object_data: dict,
source_type: str,
source_id: int,
) -> None:
"""Core publish logic using a session bound to the federation DB."""
actor = await services.federation.get_actor_by_user_id(session, user_id)
if not actor:
return
# Dedup: don't re-Create if already published, don't re-Delete if already deleted
existing = await services.federation.get_activity_for_source(
session, source_type, source_id,
)
if existing:
if activity_type == "Create" and existing.activity_type != "Delete":
return
if activity_type == "Delete" and existing.activity_type == "Delete":
return
elif activity_type in ("Delete", "Update"):
return
# Stable object ID within a publish cycle
domain = os.getenv("AP_DOMAIN", "federation.rose-ash.com")
base_object_id = (
f"https://{domain}/users/{actor.preferred_username}"
f"/objects/{source_type.lower()}/{source_id}"
)
if activity_type == "Create" and existing and existing.activity_type == "Delete":
create_count = await services.federation.count_activities_for_source(
session, source_type, source_id, activity_type="Create",
)
object_data["id"] = f"{base_object_id}/v{create_count + 1}"
elif activity_type in ("Update", "Delete") and existing and existing.object_data:
object_data["id"] = existing.object_data.get("id", base_object_id)
else:
object_data["id"] = base_object_id
try:
await services.federation.publish_activity(
session,
actor_user_id=user_id,
activity_type=activity_type,
object_type=object_type,
object_data=object_data,
source_type=source_type,
source_id=source_id,
)
log.info(
"Published %s/%s for %s#%d by user %d",
activity_type, object_type, source_type, source_id, user_id,
)
except Exception:
log.exception("Failed to publish activity for %s#%d", source_type, source_id)