"""ActivityPub actor endpoints: profiles, outbox, inbox. Ported from ~/art-dag/activity-pub/app/routers/users.py. """ from __future__ import annotations import json import logging import os import uuid import httpx from quart import Blueprint, request, abort, Response, g, render_template from sqlalchemy import select from shared.services.registry import services from shared.models.federation import ActorProfile, APInboxItem from shared.browser.app.csrf import csrf_exempt log = logging.getLogger(__name__) AP_CONTENT_TYPE = "application/activity+json" def _domain() -> str: return os.getenv("AP_DOMAIN", "rose-ash.com") async def _fetch_remote_actor(actor_url: str) -> dict | None: """Fetch a remote actor's JSON-LD profile.""" try: async with httpx.AsyncClient(timeout=10) as client: resp = await client.get( actor_url, headers={"Accept": AP_CONTENT_TYPE}, ) if resp.status_code == 200: return resp.json() except Exception: log.exception("Failed to fetch remote actor: %s", actor_url) return None async def _send_accept( actor: ActorProfile, follow_activity: dict, follower_inbox: str, ) -> None: """Send an Accept activity back to the follower.""" from shared.utils.http_signatures import sign_request domain = _domain() username = actor.preferred_username actor_url = f"https://{domain}/users/{username}" accept_id = f"{actor_url}/activities/{uuid.uuid4()}" accept = { "@context": "https://www.w3.org/ns/activitystreams", "id": accept_id, "type": "Accept", "actor": actor_url, "object": follow_activity, } body_bytes = json.dumps(accept).encode() key_id = f"{actor_url}#main-key" from urllib.parse import urlparse parsed = urlparse(follower_inbox) headers = sign_request( private_key_pem=actor.private_key_pem, key_id=key_id, method="POST", path=parsed.path, host=parsed.netloc, body=body_bytes, ) headers["Content-Type"] = AP_CONTENT_TYPE try: async with httpx.AsyncClient(timeout=15) as client: resp = await client.post( follower_inbox, content=body_bytes, headers=headers, ) log.info("Accept → %s: %d", follower_inbox, resp.status_code) except Exception: log.exception("Failed to send Accept to %s", follower_inbox) async def _backfill_follower( actor: ActorProfile, follower_inbox: str, ) -> None: """Deliver recent Create activities to a new follower's inbox.""" from shared.events.handlers.ap_delivery_handler import ( _build_activity_json, _deliver_to_inbox, ) from shared.models.federation import APActivity domain = _domain() # Fetch recent Create activities (most recent first, limit 20) activities = ( await g.s.execute( select(APActivity).where( APActivity.actor_profile_id == actor.id, APActivity.is_local == True, # noqa: E712 APActivity.activity_type == "Create", ) .order_by(APActivity.published.desc()) .limit(20) ) ).scalars().all() if not activities: return log.info( "Backfilling %d posts to %s for @%s", len(activities), follower_inbox, actor.preferred_username, ) async with httpx.AsyncClient() as client: for activity in reversed(activities): # oldest first activity_json = _build_activity_json(activity, actor, domain) await _deliver_to_inbox(client, follower_inbox, activity_json, actor, domain) def register(url_prefix="/users"): bp = Blueprint("actors", __name__, url_prefix=url_prefix) @bp.get("/") async def profile(username: str): actor = await services.federation.get_actor_by_username(g.s, username) if not actor: abort(404) domain = _domain() accept = request.headers.get("accept", "") # AP JSON-LD response if "application/activity+json" in accept or "application/ld+json" in accept: actor_json = { "@context": [ "https://www.w3.org/ns/activitystreams", "https://w3id.org/security/v1", ], "type": "Person", "id": f"https://{domain}/users/{username}", "name": actor.display_name or username, "preferredUsername": username, "summary": actor.summary or "", "manuallyApprovesFollowers": False, "inbox": f"https://{domain}/users/{username}/inbox", "outbox": f"https://{domain}/users/{username}/outbox", "followers": f"https://{domain}/users/{username}/followers", "following": f"https://{domain}/users/{username}/following", "publicKey": { "id": f"https://{domain}/users/{username}#main-key", "owner": f"https://{domain}/users/{username}", "publicKeyPem": actor.public_key_pem, }, "url": f"https://{domain}/users/{username}", } return Response( response=json.dumps(actor_json), content_type=AP_CONTENT_TYPE, ) # HTML profile page activities, total = await services.federation.get_outbox( g.s, username, page=1, per_page=20, ) return await render_template( "federation/profile.html", actor=actor, activities=activities, total=total, ) @bp.get("//outbox") async def outbox(username: str): actor = await services.federation.get_actor_by_username(g.s, username) if not actor: abort(404) domain = _domain() actor_id = f"https://{domain}/users/{username}" page_param = request.args.get("page") if not page_param: _, total = await services.federation.get_outbox(g.s, username, page=1, per_page=1) return Response( response=json.dumps({ "@context": "https://www.w3.org/ns/activitystreams", "type": "OrderedCollection", "id": f"{actor_id}/outbox", "totalItems": total, "first": f"{actor_id}/outbox?page=1", }), content_type=AP_CONTENT_TYPE, ) page_num = int(page_param) activities, total = await services.federation.get_outbox( g.s, username, page=page_num, per_page=20, ) items = [] for a in activities: items.append({ "@context": "https://www.w3.org/ns/activitystreams", "type": a.activity_type, "id": a.activity_id, "actor": actor_id, "published": a.published.isoformat() if a.published else None, "object": { "type": a.object_type, **(a.object_data or {}), }, }) return Response( response=json.dumps({ "@context": "https://www.w3.org/ns/activitystreams", "type": "OrderedCollectionPage", "id": f"{actor_id}/outbox?page={page_num}", "partOf": f"{actor_id}/outbox", "totalItems": total, "orderedItems": items, }), content_type=AP_CONTENT_TYPE, ) @csrf_exempt @bp.post("//inbox") async def inbox(username: str): actor = await services.federation.get_actor_by_username(g.s, username) if not actor: abort(404) body = await request.get_json() if not body: abort(400, "Invalid JSON") activity_type = body.get("type", "") from_actor_url = body.get("actor", "") # Verify HTTP signature (best-effort — log but don't block yet # while we validate our implementation against real servers) sig_valid = False try: from shared.utils.http_signatures import verify_request_signature req_headers = dict(request.headers) sig_header = req_headers.get("Signature", "") # Fetch remote actor to get their public key remote_actor = await _fetch_remote_actor(from_actor_url) if remote_actor and sig_header: pub_key_pem = (remote_actor.get("publicKey") or {}).get("publicKeyPem") if pub_key_pem: sig_valid = verify_request_signature( public_key_pem=pub_key_pem, signature_header=sig_header, method="POST", path=f"/users/{username}/inbox", headers=req_headers, ) except Exception: log.debug("Signature verification failed for %s", from_actor_url, exc_info=True) if not sig_valid: log.warning( "Unverified inbox POST from %s (%s) — accepting anyway for now", from_actor_url, activity_type, ) # Load actor row for DB operations actor_row = ( await g.s.execute( select(ActorProfile).where( ActorProfile.preferred_username == username ) ) ).scalar_one() # Store raw inbox item item = APInboxItem( actor_profile_id=actor_row.id, raw_json=body, activity_type=activity_type, from_actor=from_actor_url, ) g.s.add(item) await g.s.flush() # Handle activity types inline if activity_type == "Follow": await _handle_follow(actor_row, body, from_actor_url) elif activity_type == "Undo": await _handle_undo(actor_row, body, from_actor_url) elif activity_type == "Accept": await _handle_accept(actor_row, body, from_actor_url) elif activity_type == "Create": await _handle_create(actor_row, body, from_actor_url) elif activity_type == "Update": await _handle_update(actor_row, body, from_actor_url) elif activity_type == "Delete": await _handle_delete(actor_row, body, from_actor_url) elif activity_type == "Like": await _handle_like(actor_row, body, from_actor_url) elif activity_type == "Announce": await _handle_announce(actor_row, body, from_actor_url) # Mark as processed item.state = "processed" from datetime import datetime, timezone item.processed_at = datetime.now(timezone.utc) await g.s.flush() return Response(status=202) async def _handle_follow( actor_row: ActorProfile, body: dict, from_actor_url: str, ) -> None: """Process a Follow activity: add follower, send Accept.""" remote_actor = await _fetch_remote_actor(from_actor_url) if not remote_actor: log.warning("Could not fetch remote actor for Follow: %s", from_actor_url) return follower_inbox = remote_actor.get("inbox") if not follower_inbox: log.warning("Remote actor has no inbox: %s", from_actor_url) return # Derive acct from preferredUsername@domain remote_username = remote_actor.get("preferredUsername", "") from urllib.parse import urlparse remote_domain = urlparse(from_actor_url).netloc follower_acct = f"{remote_username}@{remote_domain}" if remote_username else from_actor_url pub_key = (remote_actor.get("publicKey") or {}).get("publicKeyPem") # Add follower via service await services.federation.add_follower( g.s, actor_row.preferred_username, follower_acct=follower_acct, follower_inbox=follower_inbox, follower_actor_url=from_actor_url, follower_public_key=pub_key, ) log.info( "New follower: %s → @%s", follower_acct, actor_row.preferred_username, ) # Notification from shared.models.federation import APNotification, RemoteActor ra = ( await g.s.execute( select(RemoteActor).where(RemoteActor.actor_url == from_actor_url) ) ).scalar_one_or_none() if not ra: # Store this remote actor ra_dto = await services.federation.get_or_fetch_remote_actor(g.s, from_actor_url) if ra_dto: ra = (await g.s.execute( select(RemoteActor).where(RemoteActor.actor_url == from_actor_url) )).scalar_one_or_none() if ra: notif = APNotification( actor_profile_id=actor_row.id, notification_type="follow", from_remote_actor_id=ra.id, ) g.s.add(notif) # Send Accept await _send_accept(actor_row, body, follower_inbox) # Backfill: deliver recent posts to new follower's inbox await _backfill_follower(actor_row, follower_inbox) async def _handle_undo( actor_row: ActorProfile, body: dict, from_actor_url: str, ) -> None: """Process an Undo activity (typically Undo Follow).""" inner = body.get("object") if not inner: return inner_type = inner.get("type") if isinstance(inner, dict) else None if inner_type == "Follow": # Derive acct from urllib.parse import urlparse remote_domain = urlparse(from_actor_url).netloc remote_actor = await _fetch_remote_actor(from_actor_url) remote_username = "" if remote_actor: remote_username = remote_actor.get("preferredUsername", "") follower_acct = f"{remote_username}@{remote_domain}" if remote_username else from_actor_url removed = await services.federation.remove_follower( g.s, actor_row.preferred_username, follower_acct, ) if removed: log.info("Unfollowed: %s → @%s", follower_acct, actor_row.preferred_username) else: log.debug("Undo Follow: follower not found: %s", follower_acct) else: log.debug("Undo for %s — not handled", inner_type) async def _handle_accept( actor_row: ActorProfile, body: dict, from_actor_url: str, ) -> None: """Process Accept activity — update outbound follow state.""" inner = body.get("object") if not inner: return inner_type = inner.get("type") if isinstance(inner, dict) else None if inner_type == "Follow": await services.federation.accept_follow_response( g.s, actor_row.preferred_username, from_actor_url, ) log.info("Follow accepted by %s for @%s", from_actor_url, actor_row.preferred_username) async def _handle_create( actor_row: ActorProfile, body: dict, from_actor_url: str, ) -> None: """Process Create(Note/Article) — ingest remote post.""" obj = body.get("object") if not obj or not isinstance(obj, dict): return obj_type = obj.get("type", "") if obj_type not in ("Note", "Article"): log.debug("Create with type %s — skipping", obj_type) return # Get or create remote actor remote = await services.federation.get_or_fetch_remote_actor(g.s, from_actor_url) if not remote: log.warning("Could not resolve remote actor for Create: %s", from_actor_url) return await services.federation.ingest_remote_post(g.s, remote.id, body, obj) log.info("Ingested %s from %s", obj_type, from_actor_url) # Create notification if mentions a local actor from shared.models.federation import APNotification, APRemotePost, RemoteActor tags = obj.get("tag", []) if isinstance(tags, list): domain = _domain() for tag in tags: if not isinstance(tag, dict): continue if tag.get("type") != "Mention": continue href = tag.get("href", "") if f"https://{domain}/users/" in href: mentioned_username = href.rsplit("/", 1)[-1] mentioned = await services.federation.get_actor_by_username( g.s, mentioned_username, ) if mentioned: # Find the remote post we just created rp = (await g.s.execute( select(APRemotePost).where( APRemotePost.object_id == obj.get("id") ) )).scalar_one_or_none() ra = (await g.s.execute( select(RemoteActor).where(RemoteActor.actor_url == from_actor_url) )).scalar_one_or_none() notif = APNotification( actor_profile_id=mentioned.id, notification_type="mention", from_remote_actor_id=ra.id if ra else None, target_remote_post_id=rp.id if rp else None, ) g.s.add(notif) # Also check if it's a reply to one of our posts in_reply_to = obj.get("inReplyTo") if in_reply_to and f"https://{domain}/users/" in str(in_reply_to): # It's a reply to one of our local posts from shared.models.federation import APActivity local_activity = (await g.s.execute( select(APActivity).where( APActivity.activity_id == in_reply_to, ) )).scalar_one_or_none() if local_activity: ra = (await g.s.execute( select(RemoteActor).where(RemoteActor.actor_url == from_actor_url) )).scalar_one_or_none() rp = (await g.s.execute( select(APRemotePost).where( APRemotePost.object_id == obj.get("id") ) )).scalar_one_or_none() notif = APNotification( actor_profile_id=local_activity.actor_profile_id, notification_type="reply", from_remote_actor_id=ra.id if ra else None, target_remote_post_id=rp.id if rp else None, ) g.s.add(notif) async def _handle_update( actor_row: ActorProfile, body: dict, from_actor_url: str, ) -> None: """Process Update — re-ingest remote post.""" obj = body.get("object") if not obj or not isinstance(obj, dict): return obj_type = obj.get("type", "") if obj_type in ("Note", "Article"): remote = await services.federation.get_or_fetch_remote_actor(g.s, from_actor_url) if remote: await services.federation.ingest_remote_post(g.s, remote.id, body, obj) log.info("Updated %s from %s", obj_type, from_actor_url) async def _handle_delete( actor_row: ActorProfile, body: dict, from_actor_url: str, ) -> None: """Process Delete — remove remote post.""" obj = body.get("object") if isinstance(obj, str): object_id = obj elif isinstance(obj, dict): object_id = obj.get("id", "") else: return if object_id: await services.federation.delete_remote_post(g.s, object_id) log.info("Deleted remote post %s from %s", object_id, from_actor_url) async def _handle_like( actor_row: ActorProfile, body: dict, from_actor_url: str, ) -> None: """Process incoming Like — record interaction + notify.""" from shared.models.federation import APInteraction, APNotification, RemoteActor object_id = body.get("object", "") if isinstance(object_id, dict): object_id = object_id.get("id", "") if not object_id: return remote = await services.federation.get_or_fetch_remote_actor(g.s, from_actor_url) if not remote: return ra = (await g.s.execute( select(RemoteActor).where(RemoteActor.actor_url == from_actor_url) )).scalar_one_or_none() # Find the local activity this Like targets from shared.models.federation import APActivity target = (await g.s.execute( select(APActivity).where(APActivity.activity_id == object_id) )).scalar_one_or_none() if not target: # Try matching object_data.id log.info("Like from %s for %s (target not found locally)", from_actor_url, object_id) return # Record interaction interaction = APInteraction( remote_actor_id=ra.id if ra else None, post_type="local", post_id=target.id, interaction_type="like", activity_id=body.get("id"), ) g.s.add(interaction) # Notification notif = APNotification( actor_profile_id=target.actor_profile_id, notification_type="like", from_remote_actor_id=ra.id if ra else None, target_activity_id=target.id, ) g.s.add(notif) log.info("Like from %s on activity %s", from_actor_url, object_id) async def _handle_announce( actor_row: ActorProfile, body: dict, from_actor_url: str, ) -> None: """Process incoming Announce (boost) — record interaction + notify.""" from shared.models.federation import APInteraction, APNotification, RemoteActor object_id = body.get("object", "") if isinstance(object_id, dict): object_id = object_id.get("id", "") if not object_id: return remote = await services.federation.get_or_fetch_remote_actor(g.s, from_actor_url) if not remote: return ra = (await g.s.execute( select(RemoteActor).where(RemoteActor.actor_url == from_actor_url) )).scalar_one_or_none() from shared.models.federation import APActivity target = (await g.s.execute( select(APActivity).where(APActivity.activity_id == object_id) )).scalar_one_or_none() if not target: log.info("Announce from %s for %s (target not found locally)", from_actor_url, object_id) return interaction = APInteraction( remote_actor_id=ra.id if ra else None, post_type="local", post_id=target.id, interaction_type="boost", activity_id=body.get("id"), ) g.s.add(interaction) notif = APNotification( actor_profile_id=target.actor_profile_id, notification_type="boost", from_remote_actor_id=ra.id if ra else None, target_activity_id=target.id, ) g.s.add(notif) log.info("Announce from %s on activity %s", from_actor_url, object_id) @bp.get("//followers") async def followers(username: str): actor = await services.federation.get_actor_by_username(g.s, username) if not actor: abort(404) domain = _domain() collection_id = f"https://{domain}/users/{username}/followers" follower_list = await services.federation.get_followers(g.s, username) page_param = request.args.get("page") if not page_param: return Response( response=json.dumps({ "@context": "https://www.w3.org/ns/activitystreams", "type": "OrderedCollection", "id": collection_id, "totalItems": len(follower_list), "first": f"{collection_id}?page=1", }), content_type=AP_CONTENT_TYPE, ) return Response( response=json.dumps({ "@context": "https://www.w3.org/ns/activitystreams", "type": "OrderedCollectionPage", "id": f"{collection_id}?page=1", "partOf": collection_id, "totalItems": len(follower_list), "orderedItems": [f.follower_actor_url for f in follower_list], }), content_type=AP_CONTENT_TYPE, ) @bp.get("//following") async def following(username: str): actor = await services.federation.get_actor_by_username(g.s, username) if not actor: abort(404) domain = _domain() collection_id = f"https://{domain}/users/{username}/following" following_list, total = await services.federation.get_following(g.s, username) page_param = request.args.get("page") if not page_param: return Response( response=json.dumps({ "@context": "https://www.w3.org/ns/activitystreams", "type": "OrderedCollection", "id": collection_id, "totalItems": total, "first": f"{collection_id}?page=1", }), content_type=AP_CONTENT_TYPE, ) return Response( response=json.dumps({ "@context": "https://www.w3.org/ns/activitystreams", "type": "OrderedCollectionPage", "id": f"{collection_id}?page=1", "partOf": collection_id, "totalItems": total, "orderedItems": [f.actor_url for f in following_list], }), content_type=AP_CONTENT_TYPE, ) return bp