"""Per-app ActivityPub blueprint. Factory function ``create_activitypub_blueprint(app_name)`` returns a Blueprint with WebFinger, host-meta, nodeinfo, actor profile, inbox, outbox, and followers endpoints. Per-app actors are *virtual projections* of the same ``ActorProfile``. Same keypair, same ``preferred_username`` — the only differences are: - the domain in URLs (e.g. blog.rose-ash.com vs federation.rose-ash.com) - which activities are served in the outbox (filtered by ``origin_app``) - which followers are returned (filtered by ``app_domain``) - Follow requests create ``APFollower(app_domain=app_name)`` Federation app acts as the aggregate: no origin_app filter, app_domain=NULL. """ from __future__ import annotations import json import logging import os from datetime import datetime, timezone from quart import Blueprint, request, abort, Response, g from sqlalchemy import select from shared.services.registry import services from shared.models.federation import ActorProfile, APInboxItem from shared.browser.app.csrf import csrf_exempt log = logging.getLogger(__name__) AP_CONTENT_TYPE = "application/activity+json" # Apps that serve per-app AP actors AP_APPS = {"blog", "market", "events", "federation"} def _ap_domain(app_name: str) -> str: """Return the public domain for this app's AP identity.""" env_key = f"AP_DOMAIN_{app_name.upper()}" env_val = os.getenv(env_key) if env_val: return env_val # Default: {app}.rose-ash.com, except federation uses AP_DOMAIN if app_name == "federation": return os.getenv("AP_DOMAIN", "federation.rose-ash.com") return f"{app_name}.rose-ash.com" def _federation_domain() -> str: """The aggregate federation domain (for alsoKnownAs links).""" return os.getenv("AP_DOMAIN", "federation.rose-ash.com") def _is_aggregate(app_name: str) -> bool: """Federation serves the aggregate actor (no per-app filter).""" return app_name == "federation" async def _render_profile_sx(actor, activities, total): """Render the federation actor profile page using SX.""" from markupsafe import escape from shared.sx.page import get_template_context from shared.sx.helpers import full_page_sx, oob_page_sx, sx_response from shared.browser.app.utils.htmx import is_htmx_request from shared.config import config def _e(v): s = str(v) if v else "" return str(escape(s)).replace('"', '\\"') username = _e(actor.preferred_username) display_name = _e(actor.display_name or actor.preferred_username) ap_domain = config().get("ap_domain", "rose-ash.com") summary_el = "" if actor.summary: summary_el = f'(p :class "mt-2" "{_e(actor.summary)}")' activity_items = [] for a in activities: ts = "" if a.published: ts = a.published.strftime("%Y-%m-%d %H:%M") obj_el = "" if a.object_type: obj_el = f'(span :class "text-sm text-stone-500" "{_e(a.object_type)}")' activity_items.append( f'(div :class "bg-white rounded-lg shadow p-4"' f' (div :class "flex justify-between items-start"' f' (span :class "font-medium" "{_e(a.activity_type)}")' f' (span :class "text-sm text-stone-400" "{_e(ts)}"))' f' {obj_el})') if activities: activities_el = ('(div :class "space-y-4" ' + " ".join(activity_items) + ")") else: activities_el = '(p :class "text-stone-500" "No activities yet.")' content = ( f'(div :id "main-panel"' f' (div :class "py-8"' f' (div :class "bg-white rounded-lg shadow p-6 mb-6"' f' (h1 :class "text-2xl font-bold" "{display_name}")' f' (p :class "text-stone-500" "@{username}@{_e(ap_domain)}")' f' {summary_el})' f' (h2 :class "text-xl font-bold mb-4" "Activities ({total})")' f' {activities_el}))') tctx = await get_template_context() if is_htmx_request(): # Import federation layout for OOB headers try: from federation.sxc.pages import _social_oob oob_headers = _social_oob(tctx) except ImportError: oob_headers = "" return sx_response(oob_page_sx(oobs=oob_headers, content=content)) else: try: from federation.sxc.pages import _social_full header_rows = _social_full(tctx) except ImportError: from shared.sx.helpers import root_header_sx header_rows = root_header_sx(tctx) return full_page_sx(tctx, header_rows=header_rows, content=content) def create_activitypub_blueprint(app_name: str) -> Blueprint: """Return a Blueprint with AP endpoints for *app_name*.""" bp = Blueprint("activitypub", __name__) domain = _ap_domain(app_name) fed_domain = _federation_domain() aggregate = _is_aggregate(app_name) # For per-app follows, store app_domain; for federation, "federation" follower_app_domain: str = app_name # For per-app outboxes, filter by origin_app; for federation, show all outbox_origin_app: str | None = None if aggregate else app_name # ------------------------------------------------------------------ # Federation session management — AP tables live in db_federation, # which is separate from each app's own DB after the per-app split. # g._ap_s points to the federation DB; on the federation app itself # it's just an alias for g.s. # ------------------------------------------------------------------ from shared.db.session import needs_federation_session, create_federation_session @bp.before_request async def _open_ap_session(): if needs_federation_session(): sess = create_federation_session() g._ap_s = sess g._ap_tx = await sess.begin() g._ap_own = True else: g._ap_s = g.s g._ap_own = False @bp.after_request async def _commit_ap_session(response): if getattr(g, "_ap_own", False): if 200 <= response.status_code < 400: try: await g._ap_tx.commit() except Exception: try: await g._ap_tx.rollback() except Exception: pass return response @bp.teardown_request async def _close_ap_session(exc): if getattr(g, "_ap_own", False): s = getattr(g, "_ap_s", None) if s: if exc is not None or s.in_transaction(): tx = getattr(g, "_ap_tx", None) if tx and tx.is_active: try: await tx.rollback() except Exception: pass try: await s.close() except Exception: pass # ------------------------------------------------------------------ # Well-known endpoints # ------------------------------------------------------------------ @bp.get("/.well-known/webfinger") async def webfinger(): resource = request.args.get("resource", "") if not resource.startswith("acct:"): abort(400, "Invalid resource format") parts = resource[5:].split("@") if len(parts) != 2: abort(400, "Invalid resource format") username, res_domain = parts if res_domain != domain: abort(404, "User not on this server") actor = await services.federation.get_actor_by_username(g._ap_s, username) if not actor: abort(404, "User not found") actor_url = f"https://{domain}/users/{username}" return Response( response=json.dumps({ "subject": resource, "aliases": [actor_url], "links": [ { "rel": "self", "type": AP_CONTENT_TYPE, "href": actor_url, }, { "rel": "http://webfinger.net/rel/profile-page", "type": "text/html", "href": actor_url, }, ], }), content_type="application/jrd+json", ) @bp.get("/.well-known/nodeinfo") async def nodeinfo_index(): return Response( response=json.dumps({ "links": [ { "rel": "http://nodeinfo.diaspora.software/ns/schema/2.0", "href": f"https://{domain}/nodeinfo/2.0", } ] }), content_type="application/json", ) @bp.get("/nodeinfo/2.0") async def nodeinfo(): stats = await services.federation.get_stats(g._ap_s) return Response( response=json.dumps({ "version": "2.0", "software": { "name": "rose-ash", "version": "1.0.0", }, "protocols": ["activitypub"], "usage": { "users": { "total": stats.get("actors", 0), "activeMonth": stats.get("actors", 0), }, "localPosts": stats.get("activities", 0), }, "openRegistrations": False, "metadata": { "nodeName": f"Rose Ash ({app_name})", "nodeDescription": f"Rose Ash {app_name} — ActivityPub federation", }, }), content_type="application/json", ) @bp.get("/.well-known/host-meta") async def host_meta(): xml = ( '\n' '\n' f' \n' '' ) return Response(response=xml, content_type="application/xrd+xml") # ------------------------------------------------------------------ # Actor profile # ------------------------------------------------------------------ @bp.get("/users/") async def actor_profile(username: str): actor = await services.federation.get_actor_by_username(g._ap_s, username) if not actor: abort(404) accept_header = request.headers.get("accept", "") if "application/activity+json" in accept_header or "application/ld+json" in accept_header: actor_url = f"https://{domain}/users/{username}" actor_json = { "@context": [ "https://www.w3.org/ns/activitystreams", "https://w3id.org/security/v1", ], "type": "Person", "id": actor_url, "name": actor.display_name or username, "preferredUsername": username, "summary": actor.summary or "", "manuallyApprovesFollowers": False, "inbox": f"{actor_url}/inbox", "outbox": f"{actor_url}/outbox", "followers": f"{actor_url}/followers", "following": f"{actor_url}/following", "publicKey": { "id": f"{actor_url}#main-key", "owner": actor_url, "publicKeyPem": actor.public_key_pem, }, "url": actor_url, } if aggregate: # Aggregate actor advertises all per-app actors also_known = [ f"https://{_ap_domain(a)}/users/{username}" for a in AP_APPS if a != "federation" ] if also_known: actor_json["alsoKnownAs"] = also_known else: # Per-app actors link back to the aggregate federation actor actor_json["alsoKnownAs"] = [ f"https://{fed_domain}/users/{username}", ] return Response( response=json.dumps(actor_json), content_type=AP_CONTENT_TYPE, ) # HTML: federation renders its own profile; other apps redirect there if aggregate: activities, total = await services.federation.get_outbox( g._ap_s, username, page=1, per_page=20, ) return await _render_profile_sx(actor, activities, total) from quart import redirect return redirect(f"https://{fed_domain}/users/{username}") # ------------------------------------------------------------------ # Inbox # ------------------------------------------------------------------ @csrf_exempt @bp.post("/users//inbox") async def inbox(username: str): actor = await services.federation.get_actor_by_username(g._ap_s, username) if not actor: abort(404) body = await request.get_json() if not body: abort(400, "Invalid JSON") activity_type = body.get("type", "") from_actor_url = body.get("actor", "") # Verify HTTP signature (best-effort) sig_valid = False try: from shared.utils.http_signatures import verify_request_signature from shared.infrastructure.ap_inbox_handlers import fetch_remote_actor req_headers = dict(request.headers) sig_header = req_headers.get("Signature", "") remote_actor = await fetch_remote_actor(from_actor_url) if remote_actor and sig_header: pub_key_pem = (remote_actor.get("publicKey") or {}).get("publicKeyPem") if pub_key_pem: sig_valid = verify_request_signature( public_key_pem=pub_key_pem, signature_header=sig_header, method="POST", path=f"/users/{username}/inbox", headers=req_headers, ) except Exception: log.debug("Signature verification failed for %s", from_actor_url, exc_info=True) if not sig_valid: log.warning( "Unverified inbox POST from %s (%s) on %s — rejecting", from_actor_url, activity_type, domain, ) abort(401, "Invalid or missing HTTP signature") # Load actor row for DB operations actor_row = ( await g._ap_s.execute( select(ActorProfile).where( ActorProfile.preferred_username == username ) ) ).scalar_one() # Store raw inbox item item = APInboxItem( actor_profile_id=actor_row.id, raw_json=body, activity_type=activity_type, from_actor=from_actor_url, ) g._ap_s.add(item) await g._ap_s.flush() # Dispatch to shared handlers from shared.infrastructure.ap_inbox_handlers import dispatch_inbox_activity await dispatch_inbox_activity( g._ap_s, actor_row, body, from_actor_url, domain=domain, app_domain=follower_app_domain, ) # Mark as processed item.state = "processed" item.processed_at = datetime.now(timezone.utc) await g._ap_s.flush() return Response(status=202) # ------------------------------------------------------------------ # Outbox # ------------------------------------------------------------------ @bp.get("/users//outbox") async def outbox(username: str): actor = await services.federation.get_actor_by_username(g._ap_s, username) if not actor: abort(404) actor_url = f"https://{domain}/users/{username}" page_param = request.args.get("page") if not page_param: _, total = await services.federation.get_outbox( g._ap_s, username, page=1, per_page=1, origin_app=outbox_origin_app, ) return Response( response=json.dumps({ "@context": "https://www.w3.org/ns/activitystreams", "type": "OrderedCollection", "id": f"{actor_url}/outbox", "totalItems": total, "first": f"{actor_url}/outbox?page=1", }), content_type=AP_CONTENT_TYPE, ) page_num = int(page_param) activities, total = await services.federation.get_outbox( g._ap_s, username, page=page_num, per_page=20, origin_app=outbox_origin_app, ) items = [] for a in activities: items.append({ "@context": "https://www.w3.org/ns/activitystreams", "type": a.activity_type, "id": a.activity_id, "actor": actor_url, "published": a.published.isoformat() if a.published else None, "object": { "type": a.object_type, **(a.object_data or {}), }, }) return Response( response=json.dumps({ "@context": "https://www.w3.org/ns/activitystreams", "type": "OrderedCollectionPage", "id": f"{actor_url}/outbox?page={page_num}", "partOf": f"{actor_url}/outbox", "totalItems": total, "orderedItems": items, }), content_type=AP_CONTENT_TYPE, ) # ------------------------------------------------------------------ # Followers / following collections # ------------------------------------------------------------------ @bp.get("/users//followers") async def followers(username: str): actor = await services.federation.get_actor_by_username(g._ap_s, username) if not actor: abort(404) collection_id = f"https://{domain}/users/{username}/followers" follower_list = await services.federation.get_followers( g._ap_s, username, app_domain=follower_app_domain, ) page_param = request.args.get("page") if not page_param: return Response( response=json.dumps({ "@context": "https://www.w3.org/ns/activitystreams", "type": "OrderedCollection", "id": collection_id, "totalItems": len(follower_list), "first": f"{collection_id}?page=1", }), content_type=AP_CONTENT_TYPE, ) return Response( response=json.dumps({ "@context": "https://www.w3.org/ns/activitystreams", "type": "OrderedCollectionPage", "id": f"{collection_id}?page=1", "partOf": collection_id, "totalItems": len(follower_list), "orderedItems": [f.follower_actor_url for f in follower_list], }), content_type=AP_CONTENT_TYPE, ) @bp.get("/users//following") async def following(username: str): actor = await services.federation.get_actor_by_username(g._ap_s, username) if not actor: abort(404) collection_id = f"https://{domain}/users/{username}/following" following_list, total = await services.federation.get_following(g._ap_s, username) page_param = request.args.get("page") if not page_param: return Response( response=json.dumps({ "@context": "https://www.w3.org/ns/activitystreams", "type": "OrderedCollection", "id": collection_id, "totalItems": total, "first": f"{collection_id}?page=1", }), content_type=AP_CONTENT_TYPE, ) return Response( response=json.dumps({ "@context": "https://www.w3.org/ns/activitystreams", "type": "OrderedCollectionPage", "id": f"{collection_id}?page=1", "partOf": collection_id, "totalItems": total, "orderedItems": [f.actor_url for f in following_list], }), content_type=AP_CONTENT_TYPE, ) return bp