"""Deliver AP activities to remote followers. On ``federation.activity_created`` → load activity + actor + followers → sign with HTTP Signatures → POST to each follower inbox. """ from __future__ import annotations import logging import httpx from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from shared.events.bus import register_handler, DomainEvent from shared.models.federation import ActorProfile, APActivity, APFollower from shared.services.registry import services log = logging.getLogger(__name__) AP_CONTENT_TYPE = "application/activity+json" DELIVERY_TIMEOUT = 15 # seconds per request def _build_activity_json(activity: APActivity, actor: ActorProfile, domain: str) -> dict: """Build the full AP activity JSON-LD for delivery.""" username = actor.preferred_username actor_url = f"https://{domain}/users/{username}" obj = dict(activity.object_data or {}) # Object id MUST be on the actor's domain (Mastodon origin check). # The post URL (e.g. coop.rose-ash.com/slug/) goes in "url" only. object_id = activity.activity_id + "/object" if activity.activity_type == "Delete": # Delete: object is a Tombstone with just id + type obj.setdefault("id", object_id) obj.setdefault("type", "Tombstone") else: # Create/Update: full object with attribution # Prefer stable id from object_data (set by try_publish), fall back to activity-derived obj.setdefault("id", object_id) obj.setdefault("type", activity.object_type) obj.setdefault("attributedTo", actor_url) obj.setdefault("published", activity.published.isoformat() if activity.published else None) obj.setdefault("to", ["https://www.w3.org/ns/activitystreams#Public"]) obj.setdefault("cc", [f"{actor_url}/followers"]) return { "@context": [ "https://www.w3.org/ns/activitystreams", "https://w3id.org/security/v1", ], "id": activity.activity_id, "type": activity.activity_type, "actor": actor_url, "published": activity.published.isoformat() if activity.published else None, "to": ["https://www.w3.org/ns/activitystreams#Public"], "cc": [f"{actor_url}/followers"], "object": obj, } async def _deliver_to_inbox( client: httpx.AsyncClient, inbox_url: str, body: dict, actor: ActorProfile, domain: str, ) -> bool: """POST signed activity to a single inbox. Returns True on success.""" from shared.utils.http_signatures import sign_request from urllib.parse import urlparse import json body_bytes = json.dumps(body).encode() key_id = f"https://{domain}/users/{actor.preferred_username}#main-key" parsed = urlparse(inbox_url) headers = sign_request( private_key_pem=actor.private_key_pem, key_id=key_id, method="POST", path=parsed.path, host=parsed.netloc, body=body_bytes, ) headers["Content-Type"] = AP_CONTENT_TYPE try: resp = await client.post( inbox_url, content=body_bytes, headers=headers, timeout=DELIVERY_TIMEOUT, ) if resp.status_code < 300: log.info("Delivered to %s → %d", inbox_url, resp.status_code) return True else: log.warning("Delivery to %s → %d: %s", inbox_url, resp.status_code, resp.text[:200]) return False except Exception: log.exception("Delivery failed for %s", inbox_url) return False async def on_activity_created(event: DomainEvent, session: AsyncSession) -> None: """Deliver a newly created activity to all followers.""" import os if not services.has("federation"): return payload = event.payload activity_id_uri = payload.get("activity_id") if not activity_id_uri: return domain = os.getenv("AP_DOMAIN", "rose-ash.com") # Load the activity activity = ( await session.execute( select(APActivity).where(APActivity.activity_id == activity_id_uri) ) ).scalar_one_or_none() if not activity: log.warning("Activity not found: %s", activity_id_uri) return # Load actor with private key actor = ( await session.execute( select(ActorProfile).where(ActorProfile.id == activity.actor_profile_id) ) ).scalar_one_or_none() if not actor or not actor.private_key_pem: log.warning("Actor not found or missing key for activity %s", activity_id_uri) return # Load followers followers = ( await session.execute( select(APFollower).where(APFollower.actor_profile_id == actor.id) ) ).scalars().all() if not followers: log.debug("No followers to deliver to for %s", activity_id_uri) return # Build activity JSON activity_json = _build_activity_json(activity, actor, domain) # Deliver to each follower inbox # Deduplicate inboxes (multiple followers might share a shared inbox) inboxes = {f.follower_inbox for f in followers if f.follower_inbox} log.info( "Delivering %s to %d inbox(es) for @%s", activity.activity_type, len(inboxes), actor.preferred_username, ) async with httpx.AsyncClient() as client: for inbox_url in inboxes: await _deliver_to_inbox(client, inbox_url, activity_json, actor, domain) register_handler("federation.activity_created", on_activity_created)