Move Ghost membership sync from blog to account service so blog no longer queries account tables (users, ghost_labels, etc.). Account runs membership sync at startup and exposes HTTP action/data endpoints for webhook-triggered syncs and user lookups. Key changes: - account/services/ghost_membership.py: all membership sync functions - account/bp/actions + data: ghost-sync-member, user-by-email, newsletters - blog ghost_sync.py: stripped to content-only (posts, authors, tags) - blog webhook member: delegates to account via call_action() - try_publish: opens federation session when DBs differ - oauth.py callback: uses get_account_session() for OAuthCode - page_configs moved from db_events to db_blog in split script Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
134 lines
4.3 KiB
Python
134 lines
4.3 KiB
Python
"""Inline federation publication — called at write time, not via async handler.

The originating service calls try_publish() directly, which creates the
APActivity (with process_state='pending') in the same DB transaction as the
originating write when federation shares the caller's database.
The EventProcessor picks it up and the delivery wildcard handler POSTs
to follower inboxes.

When the federation database is separate from the caller's database,
this module opens its own federation session (with its own transaction)
for all AP reads/writes.
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from shared.services.registry import services
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
def _needs_federation_session() -> bool:
    """Tell whether federation data lives in a different DB than the app's default.

    Compares ``DATABASE_URL_FEDERATION`` with ``DATABASE_URL`` (both taken
    from ``shared.db.session``) and returns True when the two differ.
    """
    # Resolved at call time rather than at module import.
    from shared.db.session import DATABASE_URL, DATABASE_URL_FEDERATION

    federation_url = DATABASE_URL_FEDERATION
    default_url = DATABASE_URL
    return federation_url != default_url
|
|
|
|
|
|
async def try_publish(
    session: AsyncSession,
    *,
    user_id: int | None,
    activity_type: str,
    object_type: str,
    object_data: dict,
    source_type: str,
    source_id: int,
) -> None:
    """Publish an AP activity when federation is wired up and a user is given.

    Does nothing when the ``federation`` service is not registered or when
    ``user_id`` is falsy. Otherwise hands every keyword argument unchanged to
    ``_publish_inner``.

    ``session`` is the caller's session. It is used for the publish only when
    ``_needs_federation_session()`` is False; when it is True, a dedicated
    federation session is opened, a transaction is begun on it, and that
    session is used instead.
    """
    # Same short-circuit order as two separate guards: the registry is
    # consulted first, ``user_id`` only if federation is registered.
    if not (services.has("federation") and user_id):
        return

    publish_kwargs = {
        "user_id": user_id,
        "activity_type": activity_type,
        "object_type": object_type,
        "object_data": object_data,
        "source_type": source_type,
        "source_id": source_id,
    }

    if not _needs_federation_session():
        # Federation shares the default DB: reuse the caller's session.
        await _publish_inner(session, **publish_kwargs)
        return

    # Only imported on the separate-database path.
    from shared.db.session import get_federation_session

    async with get_federation_session() as federation_session, federation_session.begin():
        await _publish_inner(federation_session, **publish_kwargs)
|
|
|
|
|
|
async def _publish_inner(
    session: AsyncSession,
    *,
    user_id: int,
    activity_type: str,
    object_type: str,
    object_data: dict,
    source_type: str,
    source_id: int,
) -> None:
    """Run the publish steps against ``session`` (expected to reach the federation DB).

    Steps, in order:

    1. Look up the actor for ``user_id``; return if there is none.
    2. Look up an existing activity for ``(source_type, source_id)`` and skip
       redundant work:
       * nothing recorded  -> ``Delete``/``Update`` are dropped;
       * ``Create`` is dropped unless the recorded activity is a ``Delete``;
       * ``Delete`` is dropped when the recorded activity is a ``Delete``.
    3. Choose the object id and store it in ``object_data["id"]``. The
       caller's dict is modified in place.
    4. Call ``publish_activity``. An exception raised by that call is logged
       and swallowed; exceptions from the earlier lookups are not caught here.
    """
    federation = services.federation

    actor = await federation.get_actor_by_user_id(session, user_id)
    if not actor:
        return

    # Presumably the most recent activity for this source -- confirm against
    # the federation service implementation.
    previous = await federation.get_activity_for_source(
        session, source_type, source_id,
    )

    # Dedup: don't re-Create if already published, don't re-Delete if already
    # deleted, and don't Update/Delete something with no recorded activity.
    if not previous:
        if activity_type in ("Delete", "Update"):
            return
    elif activity_type == "Create":
        if previous.activity_type != "Delete":
            return
    elif activity_type == "Delete":
        if previous.activity_type == "Delete":
            return

    # Stable object ID within a publish cycle.
    ap_domain = os.getenv("AP_DOMAIN", "federation.rose-ash.com")
    canonical_id = (
        f"https://{ap_domain}/users/{actor.preferred_username}"
        f"/objects/{source_type.lower()}/{source_id}"
    )

    object_id = canonical_id
    if previous:
        if activity_type == "Create":
            if previous.activity_type == "Delete":
                # Re-Create after a Delete: suffix the id with a version
                # derived from the number of "Create" activities counted
                # for this source.
                prior_creates = await federation.count_activities_for_source(
                    session, source_type, source_id, activity_type="Create",
                )
                object_id = f"{canonical_id}/v{prior_creates + 1}"
        elif activity_type in ("Update", "Delete") and previous.object_data:
            # Reuse the id stored on the existing activity, if it has one.
            object_id = previous.object_data.get("id", canonical_id)
    object_data["id"] = object_id

    try:
        await federation.publish_activity(
            session,
            actor_user_id=user_id,
            activity_type=activity_type,
            object_type=object_type,
            object_data=object_data,
            source_type=source_type,
            source_id=source_id,
        )
        log.info(
            "Published %s/%s for %s#%d by user %d",
            activity_type, object_type, source_type, source_id, user_id,
        )
    except Exception:
        # Best effort: record the failure instead of raising to the caller.
        log.exception("Failed to publish activity for %s#%d", source_type, source_id)
|