"""Deliver outbound Follow activities to remote inboxes. When send_follow() emits a Follow activity via emit_activity(), this handler picks it up and POSTs the signed Follow to the remote actor's inbox. On failure the EventProcessor retries automatically. object_data layout: { "target_inbox": "https://remote.example/inbox", "target_actor_url": "https://remote.example/users/alice", "following_id": 42, # APFollowing row id } """ from __future__ import annotations import json import logging import os from urllib.parse import urlparse import httpx from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from shared.events.bus import register_activity_handler from shared.models.federation import ActorProfile, APActivity log = logging.getLogger(__name__) DELIVERY_TIMEOUT = 15 async def on_follow_activity(activity: APActivity, session: AsyncSession) -> None: """Deliver a Follow activity to the remote actor's inbox.""" if activity.visibility != "public": return if activity.actor_profile_id is None: return obj = activity.object_data or {} target_inbox = obj.get("target_inbox") target_actor_url = obj.get("target_actor_url") if not target_inbox or not target_actor_url: log.warning("Follow activity %s missing target_inbox or target_actor_url", activity.id) return actor = ( await session.execute( select(ActorProfile).where(ActorProfile.id == activity.actor_profile_id) ) ).scalar_one_or_none() if not actor or not actor.private_key_pem: log.warning("Actor not found or missing key for Follow activity %s", activity.id) return # Build the Follow activity JSON from shared.infrastructure.activitypub import _ap_domain origin_app = activity.origin_app or "federation" domain = _ap_domain(origin_app) actor_url = f"https://{domain}/users/{actor.preferred_username}" follow_json = { "@context": "https://www.w3.org/ns/activitystreams", "id": activity.activity_id, "type": "Follow", "actor": actor_url, "object": target_actor_url, } # Sign and deliver from shared.utils.http_signatures import sign_request body_bytes = json.dumps(follow_json).encode() parsed = urlparse(target_inbox) headers = sign_request( private_key_pem=actor.private_key_pem, key_id=f"{actor_url}#main-key", method="POST", path=parsed.path, host=parsed.netloc, body=body_bytes, ) headers["Content-Type"] = "application/activity+json" async with httpx.AsyncClient(timeout=DELIVERY_TIMEOUT) as client: resp = await client.post(target_inbox, content=body_bytes, headers=headers) if resp.status_code >= 300: raise RuntimeError( f"Follow delivery to {target_inbox} failed: {resp.status_code} {resp.text[:200]}" ) log.info("Follow delivered to %s → %d", target_inbox, resp.status_code) register_activity_handler("Follow", on_follow_activity)