Add external delivery handler for cross-service AP activities

Delivers rose:DeviceAuth activities to configured external inboxes
(e.g. artdag) via signed HTTP POST. Config via EXTERNAL_INBOXES env var.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
giles
2026-02-24 00:21:50 +00:00
parent 25ac3db644
commit beac1b3dab
2 changed files with 102 additions and 0 deletions

View File

@@ -7,3 +7,4 @@ def register_shared_handlers():
import shared.events.handlers.login_handlers # noqa: F401
import shared.events.handlers.order_handlers # noqa: F401
import shared.events.handlers.ap_delivery_handler # noqa: F401
import shared.events.handlers.external_delivery_handler # noqa: F401

View File

@@ -0,0 +1,101 @@
"""Deliver activities to external service inboxes via signed HTTP POST.
External services (like artdag) that don't share the coop database receive
activities via HTTP, authenticated with the same HTTP Signatures used for
ActivityPub federation.
Config via env: EXTERNAL_INBOXES=name|url,name2|url2,...
"""
from __future__ import annotations
import json
import logging
import os
from urllib.parse import urlparse
import httpx
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from shared.events.bus import register_activity_handler
from shared.models.federation import ActorProfile, APActivity
from shared.utils.http_signatures import sign_request
log = logging.getLogger(__name__)
# Activity types to deliver externally
_DELIVERABLE_TYPES = {"rose:DeviceAuth"}
def _get_external_inboxes() -> list[tuple[str, str]]:
"""Parse EXTERNAL_INBOXES env var into [(name, url), ...]."""
raw = os.environ.get("EXTERNAL_INBOXES", "")
if not raw:
return []
result = []
for entry in raw.split(","):
entry = entry.strip()
if "|" in entry:
name, url = entry.split("|", 1)
result.append((name.strip(), url.strip()))
return result
def _get_ap_domain() -> str:
return os.environ.get("AP_DOMAIN", "federation.rose-ash.com")
async def on_external_activity(activity: APActivity, session: AsyncSession) -> None:
"""Deliver matching activities to configured external inboxes."""
if activity.activity_type not in _DELIVERABLE_TYPES:
return
inboxes = _get_external_inboxes()
if not inboxes:
return
# Get the first actor profile for signing
actor = await session.scalar(select(ActorProfile).limit(1))
if not actor:
log.warning("No ActorProfile available for signing external deliveries")
return
domain = _get_ap_domain()
key_id = f"https://{domain}/users/{actor.preferred_username}#main-key"
payload = {
"@context": "https://www.w3.org/ns/activitystreams",
"type": activity.activity_type,
"actor": activity.actor_uri,
"object": activity.object_data,
}
if activity.published:
payload["published"] = activity.published.isoformat()
body_bytes = json.dumps(payload).encode()
for name, inbox_url in inboxes:
parsed = urlparse(inbox_url)
headers = sign_request(
private_key_pem=actor.private_key_pem,
key_id=key_id,
method="POST",
path=parsed.path,
host=parsed.netloc,
body=body_bytes,
)
headers["Content-Type"] = "application/activity+json"
try:
async with httpx.AsyncClient(timeout=3) as client:
resp = await client.post(inbox_url, content=body_bytes, headers=headers)
log.info(
"External delivery to %s: %d",
name, resp.status_code,
)
except Exception:
log.warning("External delivery to %s failed", name, exc_info=True)
# Register for all deliverable types
for _t in _DELIVERABLE_TYPES:
register_activity_handler(_t, on_external_activity)