From ea2a2c87b63c7193125918e2e91e5412241c6f88 Mon Sep 17 00:00:00 2001 From: giles Date: Sat, 21 Feb 2026 16:00:20 +0000 Subject: [PATCH] Enhanced inbox: Follow/Accept, Undo, HTTP sig verification - Inbox POST now handles Follow (add follower + send Accept), Undo (remove follower), Like/Announce (logged) - HTTP signature verification (best-effort, logs but doesn't block) - CSRF exempt on inbox (external servers POST here) - Updated shared submodule with event handlers + delivery + anchoring Co-Authored-By: Claude Opus 4.6 --- bp/actors/routes.py | 217 +++++++++++++++++++++++++++++++++++++++----- shared | 2 +- 2 files changed, 194 insertions(+), 25 deletions(-) diff --git a/bp/actors/routes.py b/bp/actors/routes.py index b6e4f44..d045dbd 100644 --- a/bp/actors/routes.py +++ b/bp/actors/routes.py @@ -5,18 +5,84 @@ Ported from ~/art-dag/activity-pub/app/routers/users.py. from __future__ import annotations import json +import logging import os +import httpx from quart import Blueprint, request, abort, Response, g, render_template +from sqlalchemy import select from shared.services.registry import services -from shared.models.federation import APInboxItem +from shared.models.federation import ActorProfile, APInboxItem +from shared.browser.app.csrf import csrf_exempt + +log = logging.getLogger(__name__) + +AP_CONTENT_TYPE = "application/activity+json" def _domain() -> str: return os.getenv("AP_DOMAIN", "rose-ash.com") +async def _fetch_remote_actor(actor_url: str) -> dict | None: + """Fetch a remote actor's JSON-LD profile.""" + try: + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.get( + actor_url, + headers={"Accept": AP_CONTENT_TYPE}, + ) + if resp.status_code == 200: + return resp.json() + except Exception: + log.exception("Failed to fetch remote actor: %s", actor_url) + return None + + +async def _send_accept( + actor: ActorProfile, + follow_activity: dict, + follower_inbox: str, +) -> None: + """Send an Accept activity back to the follower.""" + from shared.utils.http_signatures import sign_request + + domain = _domain() + username = actor.preferred_username + actor_url = f"https://{domain}/users/{username}" + + accept = { + "@context": "https://www.w3.org/ns/activitystreams", + "type": "Accept", + "actor": actor_url, + "object": follow_activity, + } + + body_bytes = json.dumps(accept).encode() + key_id = f"{actor_url}#main-key" + + headers = sign_request( + method="POST", + url=follower_inbox, + body=body_bytes, + private_key_pem=actor.private_key_pem, + key_id=key_id, + ) + headers["Content-Type"] = AP_CONTENT_TYPE + + try: + async with httpx.AsyncClient(timeout=15) as client: + resp = await client.post( + follower_inbox, + content=body_bytes, + headers=headers, + ) + log.info("Accept → %s: %d", follower_inbox, resp.status_code) + except Exception: + log.exception("Failed to send Accept to %s", follower_inbox) + + def register(url_prefix="/users"): bp = Blueprint("actors", __name__, url_prefix=url_prefix) @@ -54,7 +120,7 @@ def register(url_prefix="/users"): } return Response( response=json.dumps(actor_json), - content_type="application/activity+json", + content_type=AP_CONTENT_TYPE, ) # HTML profile page @@ -88,7 +154,7 @@ def register(url_prefix="/users"): "totalItems": total, "first": f"{actor_id}/outbox?page=1", }), - content_type="application/activity+json", + content_type=AP_CONTENT_TYPE, ) page_num = int(page_param) @@ -119,9 +185,10 @@ def register(url_prefix="/users"): "totalItems": total, "orderedItems": items, }), - content_type="application/activity+json", + content_type=AP_CONTENT_TYPE, ) + @csrf_exempt @bp.post("//inbox") async def inbox(username: str): actor = await services.federation.get_actor_by_username(g.s, username) @@ -132,9 +199,37 @@ def register(url_prefix="/users"): if not body: abort(400, "Invalid JSON") - # Store raw inbox item for async processing - from shared.models.federation import ActorProfile - from sqlalchemy import select + activity_type = body.get("type", "") + from_actor_url = body.get("actor", "") + + # Verify HTTP signature (best-effort — log but don't block yet + # while we validate our implementation against real servers) + sig_valid = False + try: + from shared.utils.http_signatures import verify_request_signature + raw_body = await request.get_data() + req_headers = dict(request.headers) + req_headers["(request-target)"] = f"post /users/{username}/inbox" + + # Fetch remote actor to get their public key + remote_actor = await _fetch_remote_actor(from_actor_url) + if remote_actor: + pub_key_pem = (remote_actor.get("publicKey") or {}).get("publicKeyPem") + if pub_key_pem: + sig_valid = verify_request_signature( + headers=req_headers, + public_key_pem=pub_key_pem, + ) + except Exception: + log.debug("Signature verification failed for %s", from_actor_url, exc_info=True) + + if not sig_valid: + log.warning( + "Unverified inbox POST from %s (%s) — accepting anyway for now", + from_actor_url, activity_type, + ) + + # Load actor row for DB operations actor_row = ( await g.s.execute( select(ActorProfile).where( @@ -143,31 +238,105 @@ def register(url_prefix="/users"): ) ).scalar_one() + # Store raw inbox item item = APInboxItem( actor_profile_id=actor_row.id, raw_json=body, - activity_type=body.get("type"), - from_actor=body.get("actor"), + activity_type=activity_type, + from_actor=from_actor_url, ) g.s.add(item) await g.s.flush() - # Emit domain event for processing - from shared.events import emit_event - await emit_event( - g.s, - "federation.inbox_received", - "APInboxItem", - item.id, - { - "actor_username": username, - "activity_type": body.get("type"), - "from_actor": body.get("actor"), - }, - ) + # Handle activity types inline + if activity_type == "Follow": + await _handle_follow(actor_row, body, from_actor_url) + elif activity_type == "Undo": + await _handle_undo(actor_row, body, from_actor_url) + elif activity_type in ("Like", "Announce"): + log.info("Received %s from %s (noted)", activity_type, from_actor_url) + + # Mark as processed + item.state = "processed" + from datetime import datetime, timezone + item.processed_at = datetime.now(timezone.utc) + await g.s.flush() return Response(status=202) + async def _handle_follow( + actor_row: ActorProfile, + body: dict, + from_actor_url: str, + ) -> None: + """Process a Follow activity: add follower, send Accept.""" + remote_actor = await _fetch_remote_actor(from_actor_url) + if not remote_actor: + log.warning("Could not fetch remote actor for Follow: %s", from_actor_url) + return + + follower_inbox = remote_actor.get("inbox") + if not follower_inbox: + log.warning("Remote actor has no inbox: %s", from_actor_url) + return + + # Derive acct from preferredUsername@domain + remote_username = remote_actor.get("preferredUsername", "") + from urllib.parse import urlparse + remote_domain = urlparse(from_actor_url).netloc + follower_acct = f"{remote_username}@{remote_domain}" if remote_username else from_actor_url + + pub_key = (remote_actor.get("publicKey") or {}).get("publicKeyPem") + + # Add follower via service + await services.federation.add_follower( + g.s, + actor_row.preferred_username, + follower_acct=follower_acct, + follower_inbox=follower_inbox, + follower_actor_url=from_actor_url, + follower_public_key=pub_key, + ) + + log.info( + "New follower: %s → @%s", + follower_acct, actor_row.preferred_username, + ) + + # Send Accept + await _send_accept(actor_row, body, follower_inbox) + + async def _handle_undo( + actor_row: ActorProfile, + body: dict, + from_actor_url: str, + ) -> None: + """Process an Undo activity (typically Undo Follow).""" + inner = body.get("object") + if not inner: + return + + inner_type = inner.get("type") if isinstance(inner, dict) else None + if inner_type == "Follow": + # Derive acct + from urllib.parse import urlparse + remote_domain = urlparse(from_actor_url).netloc + remote_actor = await _fetch_remote_actor(from_actor_url) + remote_username = "" + if remote_actor: + remote_username = remote_actor.get("preferredUsername", "") + follower_acct = f"{remote_username}@{remote_domain}" if remote_username else from_actor_url + + removed = await services.federation.remove_follower( + g.s, actor_row.preferred_username, follower_acct, + ) + if removed: + log.info("Unfollowed: %s → @%s", follower_acct, actor_row.preferred_username) + else: + log.debug("Undo Follow: follower not found: %s", follower_acct) + else: + log.debug("Undo for %s — not handled", inner_type) + @bp.get("//followers") async def followers(username: str): actor = await services.federation.get_actor_by_username(g.s, username) @@ -185,7 +354,7 @@ def register(url_prefix="/users"): "totalItems": len(follower_list), "orderedItems": [f.follower_actor_url for f in follower_list], }), - content_type="application/activity+json", + content_type=AP_CONTENT_TYPE, ) @bp.get("//following") @@ -203,7 +372,7 @@ def register(url_prefix="/users"): "totalItems": 0, "orderedItems": [], }), - content_type="application/activity+json", + content_type=AP_CONTENT_TYPE, ) return bp diff --git a/shared b/shared index 8850a01..dd7a99e 160000 --- a/shared +++ b/shared @@ -1 +1 @@ -Subproject commit 8850a0106a51acb55d5c7b84dd45b0b012b6333e +Subproject commit dd7a99e8b73264120912bf5800eea38ce064868d