"""ActivityPub actor endpoints: profiles, outbox, inbox. Ported from ~/art-dag/activity-pub/app/routers/users.py. """ from __future__ import annotations import json import logging import os import uuid import httpx from quart import Blueprint, request, abort, Response, g, render_template from sqlalchemy import select from shared.services.registry import services from shared.models.federation import ActorProfile, APInboxItem from shared.browser.app.csrf import csrf_exempt log = logging.getLogger(__name__) AP_CONTENT_TYPE = "application/activity+json" def _domain() -> str: return os.getenv("AP_DOMAIN", "rose-ash.com") async def _fetch_remote_actor(actor_url: str) -> dict | None: """Fetch a remote actor's JSON-LD profile.""" try: async with httpx.AsyncClient(timeout=10) as client: resp = await client.get( actor_url, headers={"Accept": AP_CONTENT_TYPE}, ) if resp.status_code == 200: return resp.json() except Exception: log.exception("Failed to fetch remote actor: %s", actor_url) return None async def _send_accept( actor: ActorProfile, follow_activity: dict, follower_inbox: str, ) -> None: """Send an Accept activity back to the follower.""" from shared.utils.http_signatures import sign_request domain = _domain() username = actor.preferred_username actor_url = f"https://{domain}/users/{username}" accept_id = f"{actor_url}/activities/{uuid.uuid4()}" accept = { "@context": "https://www.w3.org/ns/activitystreams", "id": accept_id, "type": "Accept", "actor": actor_url, "object": follow_activity, } body_bytes = json.dumps(accept).encode() key_id = f"{actor_url}#main-key" from urllib.parse import urlparse parsed = urlparse(follower_inbox) headers = sign_request( private_key_pem=actor.private_key_pem, key_id=key_id, method="POST", path=parsed.path, host=parsed.netloc, body=body_bytes, ) headers["Content-Type"] = AP_CONTENT_TYPE try: async with httpx.AsyncClient(timeout=15) as client: resp = await client.post( follower_inbox, content=body_bytes, headers=headers, ) log.info("Accept → %s: %d", follower_inbox, resp.status_code) except Exception: log.exception("Failed to send Accept to %s", follower_inbox) def register(url_prefix="/users"): bp = Blueprint("actors", __name__, url_prefix=url_prefix) @bp.get("/") async def profile(username: str): actor = await services.federation.get_actor_by_username(g.s, username) if not actor: abort(404) domain = _domain() accept = request.headers.get("accept", "") # AP JSON-LD response if "application/activity+json" in accept or "application/ld+json" in accept: actor_json = { "@context": [ "https://www.w3.org/ns/activitystreams", "https://w3id.org/security/v1", ], "type": "Person", "id": f"https://{domain}/users/{username}", "name": actor.display_name or username, "preferredUsername": username, "summary": actor.summary or "", "manuallyApprovesFollowers": False, "inbox": f"https://{domain}/users/{username}/inbox", "outbox": f"https://{domain}/users/{username}/outbox", "followers": f"https://{domain}/users/{username}/followers", "following": f"https://{domain}/users/{username}/following", "publicKey": { "id": f"https://{domain}/users/{username}#main-key", "owner": f"https://{domain}/users/{username}", "publicKeyPem": actor.public_key_pem, }, "url": f"https://{domain}/users/{username}", } return Response( response=json.dumps(actor_json), content_type=AP_CONTENT_TYPE, ) # HTML profile page activities, total = await services.federation.get_outbox( g.s, username, page=1, per_page=20, ) return await render_template( "federation/profile.html", actor=actor, activities=activities, total=total, ) @bp.get("//outbox") async def outbox(username: str): actor = await services.federation.get_actor_by_username(g.s, username) if not actor: abort(404) domain = _domain() actor_id = f"https://{domain}/users/{username}" page_param = request.args.get("page") if not page_param: _, total = await services.federation.get_outbox(g.s, username, page=1, per_page=1) return Response( response=json.dumps({ "@context": "https://www.w3.org/ns/activitystreams", "type": "OrderedCollection", "id": f"{actor_id}/outbox", "totalItems": total, "first": f"{actor_id}/outbox?page=1", }), content_type=AP_CONTENT_TYPE, ) page_num = int(page_param) activities, total = await services.federation.get_outbox( g.s, username, page=page_num, per_page=20, ) items = [] for a in activities: items.append({ "@context": "https://www.w3.org/ns/activitystreams", "type": a.activity_type, "id": a.activity_id, "actor": actor_id, "published": a.published.isoformat() if a.published else None, "object": { "type": a.object_type, **(a.object_data or {}), }, }) return Response( response=json.dumps({ "@context": "https://www.w3.org/ns/activitystreams", "type": "OrderedCollectionPage", "id": f"{actor_id}/outbox?page={page_num}", "partOf": f"{actor_id}/outbox", "totalItems": total, "orderedItems": items, }), content_type=AP_CONTENT_TYPE, ) @csrf_exempt @bp.post("//inbox") async def inbox(username: str): actor = await services.federation.get_actor_by_username(g.s, username) if not actor: abort(404) body = await request.get_json() if not body: abort(400, "Invalid JSON") activity_type = body.get("type", "") from_actor_url = body.get("actor", "") # Verify HTTP signature (best-effort — log but don't block yet # while we validate our implementation against real servers) sig_valid = False try: from shared.utils.http_signatures import verify_request_signature req_headers = dict(request.headers) sig_header = req_headers.get("Signature", "") # Fetch remote actor to get their public key remote_actor = await _fetch_remote_actor(from_actor_url) if remote_actor and sig_header: pub_key_pem = (remote_actor.get("publicKey") or {}).get("publicKeyPem") if pub_key_pem: sig_valid = verify_request_signature( public_key_pem=pub_key_pem, signature_header=sig_header, method="POST", path=f"/users/{username}/inbox", headers=req_headers, ) except Exception: log.debug("Signature verification failed for %s", from_actor_url, exc_info=True) if not sig_valid: log.warning( "Unverified inbox POST from %s (%s) — accepting anyway for now", from_actor_url, activity_type, ) # Load actor row for DB operations actor_row = ( await g.s.execute( select(ActorProfile).where( ActorProfile.preferred_username == username ) ) ).scalar_one() # Store raw inbox item item = APInboxItem( actor_profile_id=actor_row.id, raw_json=body, activity_type=activity_type, from_actor=from_actor_url, ) g.s.add(item) await g.s.flush() # Handle activity types inline if activity_type == "Follow": await _handle_follow(actor_row, body, from_actor_url) elif activity_type == "Undo": await _handle_undo(actor_row, body, from_actor_url) elif activity_type in ("Like", "Announce"): log.info("Received %s from %s (noted)", activity_type, from_actor_url) # Mark as processed item.state = "processed" from datetime import datetime, timezone item.processed_at = datetime.now(timezone.utc) await g.s.flush() return Response(status=202) async def _handle_follow( actor_row: ActorProfile, body: dict, from_actor_url: str, ) -> None: """Process a Follow activity: add follower, send Accept.""" remote_actor = await _fetch_remote_actor(from_actor_url) if not remote_actor: log.warning("Could not fetch remote actor for Follow: %s", from_actor_url) return follower_inbox = remote_actor.get("inbox") if not follower_inbox: log.warning("Remote actor has no inbox: %s", from_actor_url) return # Derive acct from preferredUsername@domain remote_username = remote_actor.get("preferredUsername", "") from urllib.parse import urlparse remote_domain = urlparse(from_actor_url).netloc follower_acct = f"{remote_username}@{remote_domain}" if remote_username else from_actor_url pub_key = (remote_actor.get("publicKey") or {}).get("publicKeyPem") # Add follower via service await services.federation.add_follower( g.s, actor_row.preferred_username, follower_acct=follower_acct, follower_inbox=follower_inbox, follower_actor_url=from_actor_url, follower_public_key=pub_key, ) log.info( "New follower: %s → @%s", follower_acct, actor_row.preferred_username, ) # Send Accept await _send_accept(actor_row, body, follower_inbox) async def _handle_undo( actor_row: ActorProfile, body: dict, from_actor_url: str, ) -> None: """Process an Undo activity (typically Undo Follow).""" inner = body.get("object") if not inner: return inner_type = inner.get("type") if isinstance(inner, dict) else None if inner_type == "Follow": # Derive acct from urllib.parse import urlparse remote_domain = urlparse(from_actor_url).netloc remote_actor = await _fetch_remote_actor(from_actor_url) remote_username = "" if remote_actor: remote_username = remote_actor.get("preferredUsername", "") follower_acct = f"{remote_username}@{remote_domain}" if remote_username else from_actor_url removed = await services.federation.remove_follower( g.s, actor_row.preferred_username, follower_acct, ) if removed: log.info("Unfollowed: %s → @%s", follower_acct, actor_row.preferred_username) else: log.debug("Undo Follow: follower not found: %s", follower_acct) else: log.debug("Undo for %s — not handled", inner_type) @bp.get("//followers") async def followers(username: str): actor = await services.federation.get_actor_by_username(g.s, username) if not actor: abort(404) domain = _domain() collection_id = f"https://{domain}/users/{username}/followers" follower_list = await services.federation.get_followers(g.s, username) page_param = request.args.get("page") if not page_param: return Response( response=json.dumps({ "@context": "https://www.w3.org/ns/activitystreams", "type": "OrderedCollection", "id": collection_id, "totalItems": len(follower_list), "first": f"{collection_id}?page=1", }), content_type=AP_CONTENT_TYPE, ) return Response( response=json.dumps({ "@context": "https://www.w3.org/ns/activitystreams", "type": "OrderedCollectionPage", "id": f"{collection_id}?page=1", "partOf": collection_id, "totalItems": len(follower_list), "orderedItems": [f.follower_actor_url for f in follower_list], }), content_type=AP_CONTENT_TYPE, ) @bp.get("//following") async def following(username: str): actor = await services.federation.get_actor_by_username(g.s, username) if not actor: abort(404) domain = _domain() collection_id = f"https://{domain}/users/{username}/following" page_param = request.args.get("page") if not page_param: return Response( response=json.dumps({ "@context": "https://www.w3.org/ns/activitystreams", "type": "OrderedCollection", "id": collection_id, "totalItems": 0, "first": f"{collection_id}?page=1", }), content_type=AP_CONTENT_TYPE, ) return Response( response=json.dumps({ "@context": "https://www.w3.org/ns/activitystreams", "type": "OrderedCollectionPage", "id": f"{collection_id}?page=1", "partOf": collection_id, "totalItems": 0, "orderedItems": [], }), content_type=AP_CONTENT_TYPE, ) return bp