diff --git a/bp/actors/__init__.py b/bp/actors/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/bp/actors/routes.py b/bp/actors/routes.py deleted file mode 100644 index 403c134..0000000 --- a/bp/actors/routes.py +++ /dev/null @@ -1,734 +0,0 @@ -"""ActivityPub actor endpoints: profiles, outbox, inbox. - -Ported from ~/art-dag/activity-pub/app/routers/users.py. -""" -from __future__ import annotations - -import json -import logging -import os -import uuid - -import httpx -from quart import Blueprint, request, abort, Response, g, render_template -from sqlalchemy import select - -from shared.services.registry import services -from shared.models.federation import ActorProfile, APInboxItem -from shared.browser.app.csrf import csrf_exempt - -log = logging.getLogger(__name__) - -AP_CONTENT_TYPE = "application/activity+json" - - -def _domain() -> str: - return os.getenv("AP_DOMAIN", "rose-ash.com") - - -async def _fetch_remote_actor(actor_url: str) -> dict | None: - """Fetch a remote actor's JSON-LD profile.""" - try: - async with httpx.AsyncClient(timeout=10) as client: - resp = await client.get( - actor_url, - headers={"Accept": AP_CONTENT_TYPE}, - ) - if resp.status_code == 200: - return resp.json() - except Exception: - log.exception("Failed to fetch remote actor: %s", actor_url) - return None - - -async def _send_accept( - actor: ActorProfile, - follow_activity: dict, - follower_inbox: str, -) -> None: - """Send an Accept activity back to the follower.""" - from shared.utils.http_signatures import sign_request - - domain = _domain() - username = actor.preferred_username - actor_url = f"https://{domain}/users/{username}" - - accept_id = f"{actor_url}/activities/{uuid.uuid4()}" - accept = { - "@context": "https://www.w3.org/ns/activitystreams", - "id": accept_id, - "type": "Accept", - "actor": actor_url, - "object": follow_activity, - } - - body_bytes = json.dumps(accept).encode() - key_id = f"{actor_url}#main-key" - - from urllib.parse import urlparse - parsed = urlparse(follower_inbox) - headers = sign_request( - private_key_pem=actor.private_key_pem, - key_id=key_id, - method="POST", - path=parsed.path, - host=parsed.netloc, - body=body_bytes, - ) - headers["Content-Type"] = AP_CONTENT_TYPE - - try: - async with httpx.AsyncClient(timeout=15) as client: - resp = await client.post( - follower_inbox, - content=body_bytes, - headers=headers, - ) - log.info("Accept → %s: %d", follower_inbox, resp.status_code) - except Exception: - log.exception("Failed to send Accept to %s", follower_inbox) - - -async def _backfill_follower( - actor: ActorProfile, - follower_inbox: str, -) -> None: - """Deliver recent Create activities to a new follower's inbox.""" - from shared.events.handlers.ap_delivery_handler import ( - _build_activity_json, _deliver_to_inbox, - ) - from shared.models.federation import APActivity - - domain = _domain() - - # Fetch recent Create activities (most recent first, limit 20) - activities = ( - await g.s.execute( - select(APActivity).where( - APActivity.actor_profile_id == actor.id, - APActivity.is_local == True, # noqa: E712 - APActivity.activity_type == "Create", - ) - .order_by(APActivity.published.desc()) - .limit(20) - ) - ).scalars().all() - - if not activities: - return - - log.info( - "Backfilling %d posts to %s for @%s", - len(activities), follower_inbox, actor.preferred_username, - ) - - async with httpx.AsyncClient() as client: - for activity in reversed(activities): # oldest first - activity_json = _build_activity_json(activity, actor, domain) - await _deliver_to_inbox(client, follower_inbox, activity_json, actor, domain) - - -def register(url_prefix="/users"): - bp = Blueprint("actors", __name__, url_prefix=url_prefix) - - @bp.get("/") - async def profile(username: str): - actor = await services.federation.get_actor_by_username(g.s, username) - if not actor: - abort(404) - - domain = _domain() - accept = request.headers.get("accept", "") - - # AP JSON-LD response - if "application/activity+json" in accept or "application/ld+json" in accept: - actor_json = { - "@context": [ - "https://www.w3.org/ns/activitystreams", - "https://w3id.org/security/v1", - ], - "type": "Person", - "id": f"https://{domain}/users/{username}", - "name": actor.display_name or username, - "preferredUsername": username, - "summary": actor.summary or "", - "manuallyApprovesFollowers": False, - "inbox": f"https://{domain}/users/{username}/inbox", - "outbox": f"https://{domain}/users/{username}/outbox", - "followers": f"https://{domain}/users/{username}/followers", - "following": f"https://{domain}/users/{username}/following", - "publicKey": { - "id": f"https://{domain}/users/{username}#main-key", - "owner": f"https://{domain}/users/{username}", - "publicKeyPem": actor.public_key_pem, - }, - "url": f"https://{domain}/users/{username}", - } - return Response( - response=json.dumps(actor_json), - content_type=AP_CONTENT_TYPE, - ) - - # HTML profile page - activities, total = await services.federation.get_outbox( - g.s, username, page=1, per_page=20, - ) - return await render_template( - "federation/profile.html", - actor=actor, - activities=activities, - total=total, - ) - - @bp.get("//outbox") - async def outbox(username: str): - actor = await services.federation.get_actor_by_username(g.s, username) - if not actor: - abort(404) - - domain = _domain() - actor_id = f"https://{domain}/users/{username}" - page_param = request.args.get("page") - - if not page_param: - _, total = await services.federation.get_outbox(g.s, username, page=1, per_page=1) - return Response( - response=json.dumps({ - "@context": "https://www.w3.org/ns/activitystreams", - "type": "OrderedCollection", - "id": f"{actor_id}/outbox", - "totalItems": total, - "first": f"{actor_id}/outbox?page=1", - }), - content_type=AP_CONTENT_TYPE, - ) - - page_num = int(page_param) - activities, total = await services.federation.get_outbox( - g.s, username, page=page_num, per_page=20, - ) - - items = [] - for a in activities: - items.append({ - "@context": "https://www.w3.org/ns/activitystreams", - "type": a.activity_type, - "id": a.activity_id, - "actor": actor_id, - "published": a.published.isoformat() if a.published else None, - "object": { - "type": a.object_type, - **(a.object_data or {}), - }, - }) - - return Response( - response=json.dumps({ - "@context": "https://www.w3.org/ns/activitystreams", - "type": "OrderedCollectionPage", - "id": f"{actor_id}/outbox?page={page_num}", - "partOf": f"{actor_id}/outbox", - "totalItems": total, - "orderedItems": items, - }), - content_type=AP_CONTENT_TYPE, - ) - - @csrf_exempt - @bp.post("//inbox") - async def inbox(username: str): - actor = await services.federation.get_actor_by_username(g.s, username) - if not actor: - abort(404) - - body = await request.get_json() - if not body: - abort(400, "Invalid JSON") - - activity_type = body.get("type", "") - from_actor_url = body.get("actor", "") - - # Verify HTTP signature (best-effort — log but don't block yet - # while we validate our implementation against real servers) - sig_valid = False - try: - from shared.utils.http_signatures import verify_request_signature - req_headers = dict(request.headers) - sig_header = req_headers.get("Signature", "") - - # Fetch remote actor to get their public key - remote_actor = await _fetch_remote_actor(from_actor_url) - if remote_actor and sig_header: - pub_key_pem = (remote_actor.get("publicKey") or {}).get("publicKeyPem") - if pub_key_pem: - sig_valid = verify_request_signature( - public_key_pem=pub_key_pem, - signature_header=sig_header, - method="POST", - path=f"/users/{username}/inbox", - headers=req_headers, - ) - except Exception: - log.debug("Signature verification failed for %s", from_actor_url, exc_info=True) - - if not sig_valid: - log.warning( - "Unverified inbox POST from %s (%s) — accepting anyway for now", - from_actor_url, activity_type, - ) - - # Load actor row for DB operations - actor_row = ( - await g.s.execute( - select(ActorProfile).where( - ActorProfile.preferred_username == username - ) - ) - ).scalar_one() - - # Store raw inbox item - item = APInboxItem( - actor_profile_id=actor_row.id, - raw_json=body, - activity_type=activity_type, - from_actor=from_actor_url, - ) - g.s.add(item) - await g.s.flush() - - # Handle activity types inline - if activity_type == "Follow": - await _handle_follow(actor_row, body, from_actor_url) - elif activity_type == "Undo": - await _handle_undo(actor_row, body, from_actor_url) - elif activity_type == "Accept": - await _handle_accept(actor_row, body, from_actor_url) - elif activity_type == "Create": - await _handle_create(actor_row, body, from_actor_url) - elif activity_type == "Update": - await _handle_update(actor_row, body, from_actor_url) - elif activity_type == "Delete": - await _handle_delete(actor_row, body, from_actor_url) - elif activity_type == "Like": - await _handle_like(actor_row, body, from_actor_url) - elif activity_type == "Announce": - await _handle_announce(actor_row, body, from_actor_url) - - # Mark as processed - item.state = "processed" - from datetime import datetime, timezone - item.processed_at = datetime.now(timezone.utc) - await g.s.flush() - - return Response(status=202) - - async def _handle_follow( - actor_row: ActorProfile, - body: dict, - from_actor_url: str, - ) -> None: - """Process a Follow activity: add follower, send Accept.""" - remote_actor = await _fetch_remote_actor(from_actor_url) - if not remote_actor: - log.warning("Could not fetch remote actor for Follow: %s", from_actor_url) - return - - follower_inbox = remote_actor.get("inbox") - if not follower_inbox: - log.warning("Remote actor has no inbox: %s", from_actor_url) - return - - # Derive acct from preferredUsername@domain - remote_username = remote_actor.get("preferredUsername", "") - from urllib.parse import urlparse - remote_domain = urlparse(from_actor_url).netloc - follower_acct = f"{remote_username}@{remote_domain}" if remote_username else from_actor_url - - pub_key = (remote_actor.get("publicKey") or {}).get("publicKeyPem") - - # Add follower via service - await services.federation.add_follower( - g.s, - actor_row.preferred_username, - follower_acct=follower_acct, - follower_inbox=follower_inbox, - follower_actor_url=from_actor_url, - follower_public_key=pub_key, - ) - - log.info( - "New follower: %s → @%s", - follower_acct, actor_row.preferred_username, - ) - - # Notification - from shared.models.federation import APNotification, RemoteActor - ra = ( - await g.s.execute( - select(RemoteActor).where(RemoteActor.actor_url == from_actor_url) - ) - ).scalar_one_or_none() - if not ra: - # Store this remote actor - ra_dto = await services.federation.get_or_fetch_remote_actor(g.s, from_actor_url) - if ra_dto: - ra = (await g.s.execute( - select(RemoteActor).where(RemoteActor.actor_url == from_actor_url) - )).scalar_one_or_none() - - if ra: - notif = APNotification( - actor_profile_id=actor_row.id, - notification_type="follow", - from_remote_actor_id=ra.id, - ) - g.s.add(notif) - - # Send Accept - await _send_accept(actor_row, body, follower_inbox) - - # Backfill: deliver recent posts to new follower's inbox - await _backfill_follower(actor_row, follower_inbox) - - async def _handle_undo( - actor_row: ActorProfile, - body: dict, - from_actor_url: str, - ) -> None: - """Process an Undo activity (typically Undo Follow).""" - inner = body.get("object") - if not inner: - return - - inner_type = inner.get("type") if isinstance(inner, dict) else None - if inner_type == "Follow": - # Derive acct - from urllib.parse import urlparse - remote_domain = urlparse(from_actor_url).netloc - remote_actor = await _fetch_remote_actor(from_actor_url) - remote_username = "" - if remote_actor: - remote_username = remote_actor.get("preferredUsername", "") - follower_acct = f"{remote_username}@{remote_domain}" if remote_username else from_actor_url - - removed = await services.federation.remove_follower( - g.s, actor_row.preferred_username, follower_acct, - ) - if removed: - log.info("Unfollowed: %s → @%s", follower_acct, actor_row.preferred_username) - else: - log.debug("Undo Follow: follower not found: %s", follower_acct) - else: - log.debug("Undo for %s — not handled", inner_type) - - async def _handle_accept( - actor_row: ActorProfile, - body: dict, - from_actor_url: str, - ) -> None: - """Process Accept activity — update outbound follow state.""" - inner = body.get("object") - if not inner: - return - - inner_type = inner.get("type") if isinstance(inner, dict) else None - if inner_type == "Follow": - await services.federation.accept_follow_response( - g.s, actor_row.preferred_username, from_actor_url, - ) - log.info("Follow accepted by %s for @%s", from_actor_url, actor_row.preferred_username) - - async def _handle_create( - actor_row: ActorProfile, - body: dict, - from_actor_url: str, - ) -> None: - """Process Create(Note/Article) — ingest remote post.""" - obj = body.get("object") - if not obj or not isinstance(obj, dict): - return - - obj_type = obj.get("type", "") - if obj_type not in ("Note", "Article"): - log.debug("Create with type %s — skipping", obj_type) - return - - # Get or create remote actor - remote = await services.federation.get_or_fetch_remote_actor(g.s, from_actor_url) - if not remote: - log.warning("Could not resolve remote actor for Create: %s", from_actor_url) - return - - await services.federation.ingest_remote_post(g.s, remote.id, body, obj) - log.info("Ingested %s from %s", obj_type, from_actor_url) - - # Create notification if mentions a local actor - from shared.models.federation import APNotification, APRemotePost, RemoteActor - tags = obj.get("tag", []) - if isinstance(tags, list): - domain = _domain() - for tag in tags: - if not isinstance(tag, dict): - continue - if tag.get("type") != "Mention": - continue - href = tag.get("href", "") - if f"https://{domain}/users/" in href: - mentioned_username = href.rsplit("/", 1)[-1] - mentioned = await services.federation.get_actor_by_username( - g.s, mentioned_username, - ) - if mentioned: - # Find the remote post we just created - rp = (await g.s.execute( - select(APRemotePost).where( - APRemotePost.object_id == obj.get("id") - ) - )).scalar_one_or_none() - - ra = (await g.s.execute( - select(RemoteActor).where(RemoteActor.actor_url == from_actor_url) - )).scalar_one_or_none() - - notif = APNotification( - actor_profile_id=mentioned.id, - notification_type="mention", - from_remote_actor_id=ra.id if ra else None, - target_remote_post_id=rp.id if rp else None, - ) - g.s.add(notif) - - # Also check if it's a reply to one of our posts - in_reply_to = obj.get("inReplyTo") - if in_reply_to and f"https://{domain}/users/" in str(in_reply_to): - # It's a reply to one of our local posts - from shared.models.federation import APActivity - local_activity = (await g.s.execute( - select(APActivity).where( - APActivity.activity_id == in_reply_to, - ) - )).scalar_one_or_none() - if local_activity: - ra = (await g.s.execute( - select(RemoteActor).where(RemoteActor.actor_url == from_actor_url) - )).scalar_one_or_none() - rp = (await g.s.execute( - select(APRemotePost).where( - APRemotePost.object_id == obj.get("id") - ) - )).scalar_one_or_none() - - notif = APNotification( - actor_profile_id=local_activity.actor_profile_id, - notification_type="reply", - from_remote_actor_id=ra.id if ra else None, - target_remote_post_id=rp.id if rp else None, - ) - g.s.add(notif) - - async def _handle_update( - actor_row: ActorProfile, - body: dict, - from_actor_url: str, - ) -> None: - """Process Update — re-ingest remote post.""" - obj = body.get("object") - if not obj or not isinstance(obj, dict): - return - obj_type = obj.get("type", "") - if obj_type in ("Note", "Article"): - remote = await services.federation.get_or_fetch_remote_actor(g.s, from_actor_url) - if remote: - await services.federation.ingest_remote_post(g.s, remote.id, body, obj) - log.info("Updated %s from %s", obj_type, from_actor_url) - - async def _handle_delete( - actor_row: ActorProfile, - body: dict, - from_actor_url: str, - ) -> None: - """Process Delete — remove remote post.""" - obj = body.get("object") - if isinstance(obj, str): - object_id = obj - elif isinstance(obj, dict): - object_id = obj.get("id", "") - else: - return - if object_id: - await services.federation.delete_remote_post(g.s, object_id) - log.info("Deleted remote post %s from %s", object_id, from_actor_url) - - async def _handle_like( - actor_row: ActorProfile, - body: dict, - from_actor_url: str, - ) -> None: - """Process incoming Like — record interaction + notify.""" - from shared.models.federation import APInteraction, APNotification, RemoteActor - - object_id = body.get("object", "") - if isinstance(object_id, dict): - object_id = object_id.get("id", "") - if not object_id: - return - - remote = await services.federation.get_or_fetch_remote_actor(g.s, from_actor_url) - if not remote: - return - - ra = (await g.s.execute( - select(RemoteActor).where(RemoteActor.actor_url == from_actor_url) - )).scalar_one_or_none() - - # Find the local activity this Like targets - from shared.models.federation import APActivity - target = (await g.s.execute( - select(APActivity).where(APActivity.activity_id == object_id) - )).scalar_one_or_none() - - if not target: - # Try matching object_data.id - log.info("Like from %s for %s (target not found locally)", from_actor_url, object_id) - return - - # Record interaction - interaction = APInteraction( - remote_actor_id=ra.id if ra else None, - post_type="local", - post_id=target.id, - interaction_type="like", - activity_id=body.get("id"), - ) - g.s.add(interaction) - - # Notification - notif = APNotification( - actor_profile_id=target.actor_profile_id, - notification_type="like", - from_remote_actor_id=ra.id if ra else None, - target_activity_id=target.id, - ) - g.s.add(notif) - log.info("Like from %s on activity %s", from_actor_url, object_id) - - async def _handle_announce( - actor_row: ActorProfile, - body: dict, - from_actor_url: str, - ) -> None: - """Process incoming Announce (boost) — record interaction + notify.""" - from shared.models.federation import APInteraction, APNotification, RemoteActor - - object_id = body.get("object", "") - if isinstance(object_id, dict): - object_id = object_id.get("id", "") - if not object_id: - return - - remote = await services.federation.get_or_fetch_remote_actor(g.s, from_actor_url) - if not remote: - return - - ra = (await g.s.execute( - select(RemoteActor).where(RemoteActor.actor_url == from_actor_url) - )).scalar_one_or_none() - - from shared.models.federation import APActivity - target = (await g.s.execute( - select(APActivity).where(APActivity.activity_id == object_id) - )).scalar_one_or_none() - - if not target: - log.info("Announce from %s for %s (target not found locally)", from_actor_url, object_id) - return - - interaction = APInteraction( - remote_actor_id=ra.id if ra else None, - post_type="local", - post_id=target.id, - interaction_type="boost", - activity_id=body.get("id"), - ) - g.s.add(interaction) - - notif = APNotification( - actor_profile_id=target.actor_profile_id, - notification_type="boost", - from_remote_actor_id=ra.id if ra else None, - target_activity_id=target.id, - ) - g.s.add(notif) - log.info("Announce from %s on activity %s", from_actor_url, object_id) - - @bp.get("//followers") - async def followers(username: str): - actor = await services.federation.get_actor_by_username(g.s, username) - if not actor: - abort(404) - - domain = _domain() - collection_id = f"https://{domain}/users/{username}/followers" - follower_list = await services.federation.get_followers(g.s, username) - page_param = request.args.get("page") - - if not page_param: - return Response( - response=json.dumps({ - "@context": "https://www.w3.org/ns/activitystreams", - "type": "OrderedCollection", - "id": collection_id, - "totalItems": len(follower_list), - "first": f"{collection_id}?page=1", - }), - content_type=AP_CONTENT_TYPE, - ) - - return Response( - response=json.dumps({ - "@context": "https://www.w3.org/ns/activitystreams", - "type": "OrderedCollectionPage", - "id": f"{collection_id}?page=1", - "partOf": collection_id, - "totalItems": len(follower_list), - "orderedItems": [f.follower_actor_url for f in follower_list], - }), - content_type=AP_CONTENT_TYPE, - ) - - @bp.get("//following") - async def following(username: str): - actor = await services.federation.get_actor_by_username(g.s, username) - if not actor: - abort(404) - - domain = _domain() - collection_id = f"https://{domain}/users/{username}/following" - following_list, total = await services.federation.get_following(g.s, username) - page_param = request.args.get("page") - - if not page_param: - return Response( - response=json.dumps({ - "@context": "https://www.w3.org/ns/activitystreams", - "type": "OrderedCollection", - "id": collection_id, - "totalItems": total, - "first": f"{collection_id}?page=1", - }), - content_type=AP_CONTENT_TYPE, - ) - - return Response( - response=json.dumps({ - "@context": "https://www.w3.org/ns/activitystreams", - "type": "OrderedCollectionPage", - "id": f"{collection_id}?page=1", - "partOf": collection_id, - "totalItems": total, - "orderedItems": [f.actor_url for f in following_list], - }), - content_type=AP_CONTENT_TYPE, - ) - - return bp diff --git a/bp/wellknown/__init__.py b/bp/wellknown/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/bp/wellknown/routes.py b/bp/wellknown/routes.py deleted file mode 100644 index f36e32b..0000000 --- a/bp/wellknown/routes.py +++ /dev/null @@ -1,114 +0,0 @@ -"""Well-known federation endpoints: WebFinger, NodeInfo, host-meta. - -Ported from ~/art-dag/activity-pub/app/routers/federation.py. -""" -from __future__ import annotations - -import os - -from quart import Blueprint, request, abort, Response, g - -from shared.services.registry import services - - -def _domain() -> str: - return os.getenv("AP_DOMAIN", "rose-ash.com") - - -def register(url_prefix=""): - bp = Blueprint("wellknown", __name__, url_prefix=url_prefix) - - @bp.get("/.well-known/webfinger") - async def webfinger(): - resource = request.args.get("resource", "") - if not resource.startswith("acct:"): - abort(400, "Invalid resource format") - - parts = resource[5:].split("@") - if len(parts) != 2: - abort(400, "Invalid resource format") - - username, domain = parts - if domain != _domain(): - abort(404, "User not on this server") - - actor = await services.federation.get_actor_by_username(g.s, username) - if not actor: - abort(404, "User not found") - - domain = _domain() - return Response( - response=__import__("json").dumps({ - "subject": resource, - "aliases": [f"https://{domain}/users/{username}"], - "links": [ - { - "rel": "self", - "type": "application/activity+json", - "href": f"https://{domain}/users/{username}", - }, - { - "rel": "http://webfinger.net/rel/profile-page", - "type": "text/html", - "href": f"https://{domain}/users/{username}", - }, - ], - }), - content_type="application/jrd+json", - ) - - @bp.get("/.well-known/nodeinfo") - async def nodeinfo_index(): - domain = _domain() - return Response( - response=__import__("json").dumps({ - "links": [ - { - "rel": "http://nodeinfo.diaspora.software/ns/schema/2.0", - "href": f"https://{domain}/nodeinfo/2.0", - } - ] - }), - content_type="application/json", - ) - - @bp.get("/nodeinfo/2.0") - async def nodeinfo(): - stats = await services.federation.get_stats(g.s) - return Response( - response=__import__("json").dumps({ - "version": "2.0", - "software": { - "name": "rose-ash", - "version": "1.0.0", - }, - "protocols": ["activitypub"], - "usage": { - "users": { - "total": stats.get("actors", 0), - "activeMonth": stats.get("actors", 0), - }, - "localPosts": stats.get("activities", 0), - }, - "openRegistrations": False, - "metadata": { - "nodeName": "Rose Ash", - "nodeDescription": "Cooperative platform with ActivityPub federation", - }, - }), - content_type="application/json", - ) - - @bp.get("/.well-known/host-meta") - async def host_meta(): - domain = _domain() - xml = ( - '\n' - '\n' - f' \n' - '' - ) - return Response(response=xml, content_type="application/xrd+xml") - - return bp diff --git a/shared b/shared index f2262f7..1bb19c9 160000 --- a/shared +++ b/shared @@ -1 +1 @@ -Subproject commit f2262f702b82df1f7b03ed3808dceb693f9cff96 +Subproject commit 1bb19c96ed35a404aa093f04e4457880f37d4b07