Add app_domain to APDeliveryLog so the same activity can be delivered to the same inbox under different actor identities (blog + federation).
251 lines
8.7 KiB
Python
251 lines
8.7 KiB
Python
"""Deliver AP activities to remote followers.
|
|
|
|
Registered as a wildcard handler — fires for every activity. Skips
|
|
non-public activities and those without an actor profile.
|
|
|
|
Per-app delivery: activities are delivered using the domain that matches
|
|
the follower's subscription. A follower of ``@alice@blog.rose-ash.com``
|
|
receives activities with ``actor: https://blog.rose-ash.com/users/alice``
|
|
and signatures using that domain's key_id. Aggregate followers
|
|
(``app_domain='federation'``) receive the federation domain identity.
|
|
|
|
Idempotent: successful deliveries are recorded in ap_delivery_log.
|
|
On retry (at-least-once reaper), already-delivered inboxes are skipped.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
from collections import defaultdict
|
|
|
|
import httpx
|
|
from sqlalchemy import select, or_
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from shared.events.bus import register_activity_handler
|
|
from shared.models.federation import ActorProfile, APActivity, APFollower, APDeliveryLog
|
|
from shared.services.registry import services
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
AP_CONTENT_TYPE = "application/activity+json"
|
|
DELIVERY_TIMEOUT = 15 # seconds per request
|
|
|
|
|
|
def _domain_for_app(app_name: str) -> str:
|
|
"""Resolve the public AP domain for an app name."""
|
|
from shared.infrastructure.activitypub import _ap_domain
|
|
return _ap_domain(app_name)
|
|
|
|
|
|
def _build_activity_json(activity: APActivity, actor: ActorProfile, domain: str) -> dict:
|
|
"""Build the full AP activity JSON-LD for delivery."""
|
|
username = actor.preferred_username
|
|
actor_url = f"https://{domain}/users/{username}"
|
|
|
|
obj = dict(activity.object_data or {})
|
|
|
|
# Rewrite all URLs from the federation domain to the delivery domain
|
|
# so Mastodon's origin check passes (all IDs must match actor host).
|
|
import re
|
|
fed_domain = os.getenv("AP_DOMAIN", "federation.rose-ash.com")
|
|
|
|
def _rewrite(url: str) -> str:
|
|
if isinstance(url, str) and fed_domain in url:
|
|
return url.replace(f"https://{fed_domain}", f"https://{domain}")
|
|
return url
|
|
|
|
activity_id = _rewrite(activity.activity_id)
|
|
object_id = activity_id + "/object"
|
|
|
|
# Rewrite any federation-domain URLs in object_data
|
|
if "id" in obj:
|
|
obj["id"] = _rewrite(obj["id"])
|
|
if "attributedTo" in obj:
|
|
obj["attributedTo"] = _rewrite(obj["attributedTo"])
|
|
|
|
if activity.activity_type == "Delete":
|
|
obj.setdefault("id", object_id)
|
|
obj.setdefault("type", "Tombstone")
|
|
else:
|
|
obj.setdefault("id", object_id)
|
|
obj.setdefault("type", activity.object_type)
|
|
obj.setdefault("attributedTo", actor_url)
|
|
obj.setdefault("published", activity.published.isoformat() if activity.published else None)
|
|
obj.setdefault("to", ["https://www.w3.org/ns/activitystreams#Public"])
|
|
obj.setdefault("cc", [f"{actor_url}/followers"])
|
|
if activity.activity_type == "Update":
|
|
from datetime import datetime, timezone
|
|
obj["updated"] = datetime.now(timezone.utc).isoformat()
|
|
|
|
return {
|
|
"@context": [
|
|
"https://www.w3.org/ns/activitystreams",
|
|
"https://w3id.org/security/v1",
|
|
],
|
|
"id": activity_id,
|
|
"type": activity.activity_type,
|
|
"actor": actor_url,
|
|
"published": activity.published.isoformat() if activity.published else None,
|
|
"to": ["https://www.w3.org/ns/activitystreams#Public"],
|
|
"cc": [f"{actor_url}/followers"],
|
|
"object": obj,
|
|
}
|
|
|
|
|
|
async def _deliver_to_inbox(
|
|
client: httpx.AsyncClient,
|
|
inbox_url: str,
|
|
body: dict,
|
|
actor: ActorProfile,
|
|
domain: str,
|
|
) -> int | None:
|
|
"""POST signed activity to a single inbox. Returns status code or None on error."""
|
|
from shared.utils.http_signatures import sign_request
|
|
from urllib.parse import urlparse
|
|
import json
|
|
|
|
body_bytes = json.dumps(body).encode()
|
|
key_id = f"https://{domain}/users/{actor.preferred_username}#main-key"
|
|
|
|
parsed = urlparse(inbox_url)
|
|
headers = sign_request(
|
|
private_key_pem=actor.private_key_pem,
|
|
key_id=key_id,
|
|
method="POST",
|
|
path=parsed.path,
|
|
host=parsed.netloc,
|
|
body=body_bytes,
|
|
)
|
|
headers["Content-Type"] = AP_CONTENT_TYPE
|
|
|
|
try:
|
|
resp = await client.post(
|
|
inbox_url,
|
|
content=body_bytes,
|
|
headers=headers,
|
|
timeout=DELIVERY_TIMEOUT,
|
|
)
|
|
if resp.status_code < 300:
|
|
log.info("Delivered to %s → %d", inbox_url, resp.status_code)
|
|
else:
|
|
log.warning("Delivery to %s → %d: %s", inbox_url, resp.status_code, resp.text[:200])
|
|
return resp.status_code
|
|
except Exception:
|
|
log.exception("Delivery failed for %s", inbox_url)
|
|
return None
|
|
|
|
|
|
async def on_any_activity(activity: APActivity, session: AsyncSession) -> None:
|
|
"""Deliver a public activity to all matching followers of its actor."""
|
|
|
|
# Only deliver public activities that have an actor profile
|
|
if activity.visibility != "public":
|
|
return
|
|
if activity.actor_profile_id is None:
|
|
return
|
|
if not services.has("federation"):
|
|
return
|
|
|
|
# Load actor with private key
|
|
actor = (
|
|
await session.execute(
|
|
select(ActorProfile).where(ActorProfile.id == activity.actor_profile_id)
|
|
)
|
|
).scalar_one_or_none()
|
|
if not actor or not actor.private_key_pem:
|
|
log.warning("Actor not found or missing key for activity %s", activity.activity_id)
|
|
return
|
|
|
|
# Load matching followers.
|
|
# Aggregate followers (app_domain='federation') always get everything.
|
|
# Per-app followers only get activities from their app.
|
|
origin_app = activity.origin_app
|
|
follower_filters = [APFollower.actor_profile_id == actor.id]
|
|
|
|
if origin_app and origin_app != "federation":
|
|
follower_filters.append(
|
|
or_(
|
|
APFollower.app_domain == "federation",
|
|
APFollower.app_domain == origin_app,
|
|
)
|
|
)
|
|
|
|
followers = (
|
|
await session.execute(
|
|
select(APFollower).where(*follower_filters)
|
|
)
|
|
).scalars().all()
|
|
|
|
if not followers:
|
|
log.debug("No followers to deliver to for %s", activity.activity_id)
|
|
return
|
|
|
|
# Check delivery log — skip (inbox, domain) pairs already delivered (idempotency)
|
|
existing = (
|
|
await session.execute(
|
|
select(APDeliveryLog.inbox_url, APDeliveryLog.app_domain).where(
|
|
APDeliveryLog.activity_id == activity.id,
|
|
APDeliveryLog.status_code < 300,
|
|
)
|
|
)
|
|
).all()
|
|
already_delivered: set[tuple[str, str]] = {(r[0], r[1]) for r in existing}
|
|
|
|
# Collect all (inbox, app_domain) pairs to deliver to.
|
|
# Each follower subscription gets its own delivery with the correct
|
|
# actor identity, so followers of @user@blog and @user@federation
|
|
# both see posts on their respective actor profiles.
|
|
delivery_pairs: set[tuple[str, str]] = set()
|
|
for f in followers:
|
|
if not f.follower_inbox:
|
|
continue
|
|
app_dom = f.app_domain or "federation"
|
|
pair = (f.follower_inbox, app_dom)
|
|
if pair not in already_delivered:
|
|
delivery_pairs.add(pair)
|
|
|
|
if not delivery_pairs:
|
|
if already_delivered:
|
|
log.info("All deliveries already done for %s", activity.activity_id)
|
|
return
|
|
|
|
if already_delivered:
|
|
log.info(
|
|
"Skipping %d already-delivered, delivering to %d remaining",
|
|
len(already_delivered), len(delivery_pairs),
|
|
)
|
|
|
|
# Group by domain to reuse activity JSON per domain
|
|
domain_inboxes: dict[str, list[str]] = defaultdict(list)
|
|
for inbox_url, app_dom in delivery_pairs:
|
|
domain_inboxes[app_dom].append(inbox_url)
|
|
|
|
log.info(
|
|
"Delivering %s to %d target(s) for @%s across %d domain(s)",
|
|
activity.activity_type, len(delivery_pairs),
|
|
actor.preferred_username, len(domain_inboxes),
|
|
)
|
|
|
|
async with httpx.AsyncClient() as client:
|
|
for app_dom, inboxes in domain_inboxes.items():
|
|
domain = _domain_for_app(app_dom)
|
|
activity_json = _build_activity_json(activity, actor, domain)
|
|
|
|
for inbox_url in inboxes:
|
|
status_code = await _deliver_to_inbox(
|
|
client, inbox_url, activity_json, actor, domain
|
|
)
|
|
if status_code is not None and status_code < 300:
|
|
session.add(APDeliveryLog(
|
|
activity_id=activity.id,
|
|
inbox_url=inbox_url,
|
|
app_domain=app_dom,
|
|
status_code=status_code,
|
|
))
|
|
await session.flush()
|
|
|
|
|
|
# Wildcard: fires for every activity
|
|
register_activity_handler("*", on_any_activity)
|