"""Deliver activities to external service inboxes via signed HTTP POST. External services (like artdag) that don't share the coop database receive activities via HTTP, authenticated with the same HTTP Signatures used for ActivityPub federation. Config via env: EXTERNAL_INBOXES=name|url,name2|url2,... """ from __future__ import annotations import json import logging import os from urllib.parse import urlparse import httpx from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from shared.events.bus import register_activity_handler from shared.models.federation import ActorProfile, APActivity from shared.utils.http_signatures import sign_request log = logging.getLogger(__name__) # Activity types to deliver externally _DELIVERABLE_TYPES = {"rose:DeviceAuth"} def _get_external_inboxes() -> list[tuple[str, str]]: """Parse EXTERNAL_INBOXES env var into [(name, url), ...].""" raw = os.environ.get("EXTERNAL_INBOXES", "") if not raw: return [] result = [] for entry in raw.split(","): entry = entry.strip() if "|" in entry: name, url = entry.split("|", 1) result.append((name.strip(), url.strip())) return result def _get_ap_domain() -> str: return os.environ.get("AP_DOMAIN", "federation.rose-ash.com") async def on_external_activity(activity: APActivity, session: AsyncSession) -> None: """Deliver matching activities to configured external inboxes.""" if activity.activity_type not in _DELIVERABLE_TYPES: return inboxes = _get_external_inboxes() if not inboxes: return # Get the first actor profile for signing actor = await session.scalar(select(ActorProfile).limit(1)) if not actor: log.warning("No ActorProfile available for signing external deliveries") return domain = _get_ap_domain() key_id = f"https://{domain}/users/{actor.preferred_username}#main-key" payload = { "@context": "https://www.w3.org/ns/activitystreams", "type": activity.activity_type, "actor": activity.actor_uri, "object": activity.object_data, } if activity.published: payload["published"] = activity.published.isoformat() body_bytes = json.dumps(payload).encode() for name, inbox_url in inboxes: parsed = urlparse(inbox_url) headers = sign_request( private_key_pem=actor.private_key_pem, key_id=key_id, method="POST", path=parsed.path, host=parsed.netloc, body=body_bytes, ) headers["Content-Type"] = "application/activity+json" try: async with httpx.AsyncClient(timeout=3) as client: resp = await client.post(inbox_url, content=body_bytes, headers=headers) log.info( "External delivery to %s: %d", name, resp.status_code, ) except Exception: log.warning("External delivery to %s failed", name, exc_info=True) # Register for all deliverable types for _t in _DELIVERABLE_TYPES: register_activity_handler(_t, on_external_activity)