Mastodon verifies the object id domain matches the actor domain.
Using the post URL (coop.rose-ash.com) as object id caused silent
rejection. Now always uses {activity_id}/object on federation domain.
Also adds to/cc on object for visibility determination.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
170 lines
5.3 KiB
Python
170 lines
5.3 KiB
Python
"""Deliver AP activities to remote followers.
|
|
|
|
On ``federation.activity_created`` → load activity + actor + followers →
|
|
sign with HTTP Signatures → POST to each follower inbox.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
|
|
import httpx
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from shared.events.bus import register_handler, DomainEvent
|
|
from shared.models.federation import ActorProfile, APActivity, APFollower
|
|
from shared.services.registry import services
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
AP_CONTENT_TYPE = "application/activity+json"
|
|
DELIVERY_TIMEOUT = 15 # seconds per request
|
|
|
|
|
|
def _build_activity_json(activity: APActivity, actor: ActorProfile, domain: str) -> dict:
|
|
"""Build the full AP activity JSON-LD for delivery."""
|
|
username = actor.preferred_username
|
|
actor_url = f"https://{domain}/users/{username}"
|
|
|
|
obj = dict(activity.object_data or {})
|
|
|
|
# Object id MUST be on the actor's domain (Mastodon origin check).
|
|
# The post URL (e.g. coop.rose-ash.com/slug/) goes in "url" only.
|
|
object_id = activity.activity_id + "/object"
|
|
|
|
if activity.activity_type == "Delete":
|
|
# Delete: object is a Tombstone with just id + type
|
|
obj.setdefault("id", object_id)
|
|
obj.setdefault("type", "Tombstone")
|
|
else:
|
|
# Create/Update: full object with attribution
|
|
obj["id"] = object_id
|
|
obj.setdefault("type", activity.object_type)
|
|
obj.setdefault("attributedTo", actor_url)
|
|
obj.setdefault("published", activity.published.isoformat() if activity.published else None)
|
|
obj.setdefault("to", ["https://www.w3.org/ns/activitystreams#Public"])
|
|
obj.setdefault("cc", [f"{actor_url}/followers"])
|
|
|
|
return {
|
|
"@context": [
|
|
"https://www.w3.org/ns/activitystreams",
|
|
"https://w3id.org/security/v1",
|
|
],
|
|
"id": activity.activity_id,
|
|
"type": activity.activity_type,
|
|
"actor": actor_url,
|
|
"published": activity.published.isoformat() if activity.published else None,
|
|
"to": ["https://www.w3.org/ns/activitystreams#Public"],
|
|
"cc": [f"{actor_url}/followers"],
|
|
"object": obj,
|
|
}
|
|
|
|
|
|
async def _deliver_to_inbox(
|
|
client: httpx.AsyncClient,
|
|
inbox_url: str,
|
|
body: dict,
|
|
actor: ActorProfile,
|
|
domain: str,
|
|
) -> bool:
|
|
"""POST signed activity to a single inbox. Returns True on success."""
|
|
from shared.utils.http_signatures import sign_request
|
|
from urllib.parse import urlparse
|
|
import json
|
|
|
|
body_bytes = json.dumps(body).encode()
|
|
key_id = f"https://{domain}/users/{actor.preferred_username}#main-key"
|
|
|
|
parsed = urlparse(inbox_url)
|
|
headers = sign_request(
|
|
private_key_pem=actor.private_key_pem,
|
|
key_id=key_id,
|
|
method="POST",
|
|
path=parsed.path,
|
|
host=parsed.netloc,
|
|
body=body_bytes,
|
|
)
|
|
headers["Content-Type"] = AP_CONTENT_TYPE
|
|
|
|
try:
|
|
resp = await client.post(
|
|
inbox_url,
|
|
content=body_bytes,
|
|
headers=headers,
|
|
timeout=DELIVERY_TIMEOUT,
|
|
)
|
|
if resp.status_code < 300:
|
|
log.info("Delivered to %s → %d", inbox_url, resp.status_code)
|
|
return True
|
|
else:
|
|
log.warning("Delivery to %s → %d: %s", inbox_url, resp.status_code, resp.text[:200])
|
|
return False
|
|
except Exception:
|
|
log.exception("Delivery failed for %s", inbox_url)
|
|
return False
|
|
|
|
|
|
async def on_activity_created(event: DomainEvent, session: AsyncSession) -> None:
|
|
"""Deliver a newly created activity to all followers."""
|
|
import os
|
|
|
|
if not services.has("federation"):
|
|
return
|
|
|
|
payload = event.payload
|
|
activity_id_uri = payload.get("activity_id")
|
|
if not activity_id_uri:
|
|
return
|
|
|
|
domain = os.getenv("AP_DOMAIN", "rose-ash.com")
|
|
|
|
# Load the activity
|
|
activity = (
|
|
await session.execute(
|
|
select(APActivity).where(APActivity.activity_id == activity_id_uri)
|
|
)
|
|
).scalar_one_or_none()
|
|
if not activity:
|
|
log.warning("Activity not found: %s", activity_id_uri)
|
|
return
|
|
|
|
# Load actor with private key
|
|
actor = (
|
|
await session.execute(
|
|
select(ActorProfile).where(ActorProfile.id == activity.actor_profile_id)
|
|
)
|
|
).scalar_one_or_none()
|
|
if not actor or not actor.private_key_pem:
|
|
log.warning("Actor not found or missing key for activity %s", activity_id_uri)
|
|
return
|
|
|
|
# Load followers
|
|
followers = (
|
|
await session.execute(
|
|
select(APFollower).where(APFollower.actor_profile_id == actor.id)
|
|
)
|
|
).scalars().all()
|
|
|
|
if not followers:
|
|
log.debug("No followers to deliver to for %s", activity_id_uri)
|
|
return
|
|
|
|
# Build activity JSON
|
|
activity_json = _build_activity_json(activity, actor, domain)
|
|
|
|
# Deliver to each follower inbox
|
|
# Deduplicate inboxes (multiple followers might share a shared inbox)
|
|
inboxes = {f.follower_inbox for f in followers if f.follower_inbox}
|
|
|
|
log.info(
|
|
"Delivering %s to %d inbox(es) for @%s",
|
|
activity.activity_type, len(inboxes), actor.preferred_username,
|
|
)
|
|
|
|
async with httpx.AsyncClient() as client:
|
|
for inbox_url in inboxes:
|
|
await _deliver_to_inbox(client, inbox_url, activity_json, actor, domain)
|
|
|
|
|
|
register_handler("federation.activity_created", on_activity_created)
|