This repository has been archived on 2026-02-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
shared/events/handlers/ap_delivery_handler.py
giles fd163b577f Inline federation publication + fix AP delivery
- Replace async federation_handlers with inline try_publish() at write sites
- Fix ap_delivery_handler: urlparse for signature path/host, @context array
  with security vocab, Delete/Tombstone object handling
- Fix federation_impl: @context array for IPFS, .limit(1) + upsert follower

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 08:24:29 +00:00

163 lines
5.0 KiB
Python

"""Deliver AP activities to remote followers.
On ``federation.activity_created`` → load activity + actor + followers →
sign with HTTP Signatures → POST to each follower inbox.
"""
from __future__ import annotations
import logging
import httpx
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from shared.events.bus import register_handler, DomainEvent
from shared.models.federation import ActorProfile, APActivity, APFollower
from shared.services.registry import services
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Sent as the Content-Type header on every delivery POST.
AP_CONTENT_TYPE = "application/activity+json"
# Passed as the per-request ``timeout`` when POSTing to an inbox.
DELIVERY_TIMEOUT = 15 # seconds per request
def _build_activity_json(activity: APActivity, actor: ActorProfile, domain: str) -> dict:
    """Assemble the JSON-LD document that is delivered for *activity*.

    The embedded ``object`` starts as a copy of ``activity.object_data`` and
    is then completed with fallback values; keys that are already present are
    never overwritten.

    Args:
        activity: The stored activity; its id, type, object type, object data
            and publication timestamp are read.
        actor: The sending actor; only ``preferred_username`` is read.
        domain: Host name used to build the actor URL.

    Returns:
        A dict holding the addressed activity with its ``@context``.
    """
    actor_url = f"https://{domain}/users/{actor.preferred_username}"
    published_at = activity.published.isoformat() if activity.published else None

    # Copy so the fallbacks below never leak into the stored object data.
    embedded = dict(activity.object_data or {})

    if activity.activity_type == "Delete":
        # Delete: object is a Tombstone with just id + type
        fallbacks = {"type": "Tombstone"}
    else:
        # Create/Update: full object with attribution
        fallbacks = {
            "id": embedded.get("url") or (activity.activity_id + "/object"),
            "type": activity.object_type,
            "attributedTo": actor_url,
            "published": published_at,
        }

    # setdefault keeps existing values and appends missing keys in order.
    for key, value in fallbacks.items():
        embedded.setdefault(key, value)

    document = {
        "@context": [
            "https://www.w3.org/ns/activitystreams",
            "https://w3id.org/security/v1",
        ],
    }
    document["id"] = activity.activity_id
    document["type"] = activity.activity_type
    document["actor"] = actor_url
    document["published"] = published_at
    document["to"] = ["https://www.w3.org/ns/activitystreams#Public"]
    document["cc"] = [f"{actor_url}/followers"]
    document["object"] = embedded
    return document
async def _deliver_to_inbox(
    client: httpx.AsyncClient,
    inbox_url: str,
    body: dict,
    actor: ActorProfile,
    domain: str,
) -> bool:
    """POST a signed activity to a single inbox.

    The body is serialised once and the same bytes are both signed and sent.

    Args:
        client: HTTP client used for the POST.
        inbox_url: Absolute URL of the remote inbox.
        body: Activity document to serialise as JSON.
        actor: Sending actor; ``private_key_pem`` and ``preferred_username``
            are read for signing.
        domain: Host name used to build the signing key id.

    Returns:
        True when the remote answered with a 2xx status. False on any other
        status or when the POST itself raised (the error is logged).

    Raises:
        Whatever ``sign_request`` raises; signing happens outside the
        best-effort ``try`` so signing errors are not swallowed here.
    """
    from shared.utils.http_signatures import sign_request
    from urllib.parse import urlparse
    import json

    body_bytes = json.dumps(body).encode()
    key_id = f"https://{domain}/users/{actor.preferred_username}#main-key"

    parsed = urlparse(inbox_url)
    # The signed request target must match what is sent on the wire:
    # the path (never empty) plus the query string when the URL has one.
    request_target = parsed.path or "/"
    if parsed.query:
        request_target = f"{request_target}?{parsed.query}"

    headers = sign_request(
        private_key_pem=actor.private_key_pem,
        key_id=key_id,
        method="POST",
        path=request_target,
        host=parsed.netloc,
        body=body_bytes,
    )
    headers["Content-Type"] = AP_CONTENT_TYPE

    try:
        resp = await client.post(
            inbox_url,
            content=body_bytes,
            headers=headers,
            timeout=DELIVERY_TIMEOUT,
        )
    except Exception:
        # Best effort: one unreachable inbox must not abort the others.
        log.exception("Delivery failed for %s", inbox_url)
        return False

    if 200 <= resp.status_code < 300:
        log.info("Delivered to %s -> %d", inbox_url, resp.status_code)
        return True

    log.warning(
        "Delivery to %s -> %d: %s", inbox_url, resp.status_code, resp.text[:200]
    )
    return False
async def on_activity_created(event: DomainEvent, session: AsyncSession) -> None:
    """Deliver a newly created activity to all followers.

    Loads the activity named by ``event.payload["activity_id"]``, its actor
    and the actor's followers, then POSTs the signed activity to each
    distinct follower inbox, one after another.

    Returns without delivering when the federation service is not
    registered, the payload has no activity id, the activity or actor (with
    private key) cannot be found, or there is no inbox to deliver to.

    Args:
        event: Domain event whose payload carries ``activity_id``.
        session: Async database session used for the lookups.
    """
    import os

    if not services.has("federation"):
        return

    payload = event.payload
    activity_id_uri = payload.get("activity_id")
    if not activity_id_uri:
        return

    domain = os.getenv("AP_DOMAIN", "rose-ash.com")

    # Load the activity
    activity = (
        await session.execute(
            select(APActivity).where(APActivity.activity_id == activity_id_uri)
        )
    ).scalar_one_or_none()
    if not activity:
        log.warning("Activity not found: %s", activity_id_uri)
        return

    # Load actor with private key
    actor = (
        await session.execute(
            select(ActorProfile).where(ActorProfile.id == activity.actor_profile_id)
        )
    ).scalar_one_or_none()
    if not actor or not actor.private_key_pem:
        log.warning("Actor not found or missing key for activity %s", activity_id_uri)
        return

    # Load followers
    followers = (
        await session.execute(
            select(APFollower).where(APFollower.actor_profile_id == actor.id)
        )
    ).scalars().all()
    if not followers:
        log.debug("No followers to deliver to for %s", activity_id_uri)
        return

    # Deduplicate inboxes (multiple followers might share a shared inbox)
    # and drop followers that have no inbox URL recorded.
    inboxes = {f.follower_inbox for f in followers if f.follower_inbox}
    if not inboxes:
        # Followers exist but none is reachable: skip building the payload
        # and opening an HTTP client for nothing.
        log.debug("No follower inboxes to deliver to for %s", activity_id_uri)
        return

    # Build activity JSON
    activity_json = _build_activity_json(activity, actor, domain)

    log.info(
        "Delivering %s to %d inbox(es) for @%s",
        activity.activity_type, len(inboxes), actor.preferred_username,
    )

    # Deliver to each follower inbox over one shared client
    async with httpx.AsyncClient() as client:
        for inbox_url in inboxes:
            await _deliver_to_inbox(client, inbox_url, activity_json, actor, domain)
# Runs at import time: hooks the delivery handler up to the
# ``federation.activity_created`` event via the shared event bus.
register_handler("federation.activity_created", on_activity_created)