"""Deliver AP activities to remote followers. Registered as a wildcard handler — fires for every activity. Skips non-public activities and those without an actor profile. Per-app delivery: activities are delivered using the domain that matches the follower's subscription. A follower of ``@alice@blog.rose-ash.com`` receives activities with ``actor: https://blog.rose-ash.com/users/alice`` and signatures using that domain's key_id. Aggregate followers (``app_domain='federation'``) receive the federation domain identity. Idempotent: successful deliveries are recorded in ap_delivery_log. On retry (at-least-once reaper), already-delivered inboxes are skipped. """ from __future__ import annotations import logging import os from collections import defaultdict import httpx from sqlalchemy import select, or_ from sqlalchemy.ext.asyncio import AsyncSession from shared.events.bus import register_activity_handler from shared.models.federation import ActorProfile, APActivity, APFollower, APDeliveryLog from shared.services.registry import services log = logging.getLogger(__name__) AP_CONTENT_TYPE = "application/activity+json" DELIVERY_TIMEOUT = 15 # seconds per request def _domain_for_app(app_name: str) -> str: """Resolve the public AP domain for an app name.""" from shared.infrastructure.activitypub import _ap_domain return _ap_domain(app_name) def _build_activity_json(activity: APActivity, actor: ActorProfile, domain: str) -> dict: """Build the full AP activity JSON-LD for delivery.""" username = actor.preferred_username actor_url = f"https://{domain}/users/{username}" obj = dict(activity.object_data or {}) # Rewrite all URLs from the federation domain to the delivery domain # so Mastodon's origin check passes (all IDs must match actor host). import re fed_domain = os.getenv("AP_DOMAIN", "federation.rose-ash.com") def _rewrite(url: str) -> str: if isinstance(url, str) and fed_domain in url: return url.replace(f"https://{fed_domain}", f"https://{domain}") return url activity_id = _rewrite(activity.activity_id) object_id = activity_id + "/object" # Rewrite any federation-domain URLs in object_data if "id" in obj: obj["id"] = _rewrite(obj["id"]) if "attributedTo" in obj: obj["attributedTo"] = _rewrite(obj["attributedTo"]) if activity.activity_type == "Delete": obj.setdefault("id", object_id) obj.setdefault("type", "Tombstone") else: obj.setdefault("id", object_id) obj.setdefault("type", activity.object_type) obj.setdefault("attributedTo", actor_url) obj.setdefault("published", activity.published.isoformat() if activity.published else None) obj.setdefault("to", ["https://www.w3.org/ns/activitystreams#Public"]) obj.setdefault("cc", [f"{actor_url}/followers"]) if activity.activity_type == "Update": from datetime import datetime, timezone obj["updated"] = datetime.now(timezone.utc).isoformat() return { "@context": [ "https://www.w3.org/ns/activitystreams", "https://w3id.org/security/v1", ], "id": activity_id, "type": activity.activity_type, "actor": actor_url, "published": activity.published.isoformat() if activity.published else None, "to": ["https://www.w3.org/ns/activitystreams#Public"], "cc": [f"{actor_url}/followers"], "object": obj, } async def _deliver_to_inbox( client: httpx.AsyncClient, inbox_url: str, body: dict, actor: ActorProfile, domain: str, ) -> int | None: """POST signed activity to a single inbox. Returns status code or None on error.""" from shared.utils.http_signatures import sign_request from urllib.parse import urlparse import json body_bytes = json.dumps(body).encode() key_id = f"https://{domain}/users/{actor.preferred_username}#main-key" parsed = urlparse(inbox_url) headers = sign_request( private_key_pem=actor.private_key_pem, key_id=key_id, method="POST", path=parsed.path, host=parsed.netloc, body=body_bytes, ) headers["Content-Type"] = AP_CONTENT_TYPE try: resp = await client.post( inbox_url, content=body_bytes, headers=headers, timeout=DELIVERY_TIMEOUT, ) if resp.status_code < 300: log.info("Delivered to %s → %d", inbox_url, resp.status_code) else: log.warning("Delivery to %s → %d: %s", inbox_url, resp.status_code, resp.text[:200]) return resp.status_code except Exception: log.exception("Delivery failed for %s", inbox_url) return None async def on_any_activity(activity: APActivity, session: AsyncSession) -> None: """Deliver a public activity to all matching followers of its actor.""" # Only deliver public activities that have an actor profile if activity.visibility != "public": return if activity.actor_profile_id is None: return if not services.has("federation"): return # Load actor with private key actor = ( await session.execute( select(ActorProfile).where(ActorProfile.id == activity.actor_profile_id) ) ).scalar_one_or_none() if not actor or not actor.private_key_pem: log.warning("Actor not found or missing key for activity %s", activity.activity_id) return # Load matching followers. # Aggregate followers (app_domain='federation') always get everything. # Per-app followers only get activities from their app. origin_app = activity.origin_app follower_filters = [APFollower.actor_profile_id == actor.id] if origin_app and origin_app != "federation": follower_filters.append( or_( APFollower.app_domain == "federation", APFollower.app_domain == origin_app, ) ) followers = ( await session.execute( select(APFollower).where(*follower_filters) ) ).scalars().all() if not followers: log.debug("No followers to deliver to for %s", activity.activity_id) return # Check delivery log — skip inboxes we already delivered to (idempotency) existing = ( await session.execute( select(APDeliveryLog.inbox_url).where( APDeliveryLog.activity_id == activity.id, APDeliveryLog.status_code < 300, ) ) ).scalars().all() already_delivered = set(existing) # Group followers by app_domain so we deliver with the correct # actor URL and signing domain for each subscriber. # If the same inbox appears under multiple app_domains, prefer # the per-app domain (it's what the follower subscribed to). inbox_to_domain: dict[str, str] = {} for f in followers: if not f.follower_inbox: continue if f.follower_inbox in already_delivered: continue app_dom = f.app_domain or "federation" # Per-app domain wins over aggregate if both exist if f.follower_inbox not in inbox_to_domain or app_dom != "federation": inbox_to_domain[f.follower_inbox] = app_dom if not inbox_to_domain: if already_delivered: log.info("All inbox(es) already delivered for %s", activity.activity_id) return if already_delivered: log.info( "Skipping %d already-delivered inbox(es), delivering to %d remaining", len(already_delivered), len(inbox_to_domain), ) # Group by domain to reuse activity JSON per domain domain_inboxes: dict[str, list[str]] = defaultdict(list) for inbox_url, app_dom in inbox_to_domain.items(): domain_inboxes[app_dom].append(inbox_url) log.info( "Delivering %s to %d inbox(es) for @%s across %d domain(s)", activity.activity_type, len(inbox_to_domain), actor.preferred_username, len(domain_inboxes), ) async with httpx.AsyncClient() as client: for app_dom, inboxes in domain_inboxes.items(): domain = _domain_for_app(app_dom) activity_json = _build_activity_json(activity, actor, domain) for inbox_url in inboxes: status_code = await _deliver_to_inbox( client, inbox_url, activity_json, actor, domain ) if status_code is not None and status_code < 300: session.add(APDeliveryLog( activity_id=activity.id, inbox_url=inbox_url, status_code=status_code, )) await session.flush() # Wildcard: fires for every activity register_activity_handler("*", on_any_activity)