diff --git a/alembic/versions/t0r8n4o6p7_add_app_domain_to_ap_followers.py b/alembic/versions/t0r8n4o6p7_add_app_domain_to_ap_followers.py new file mode 100644 index 0000000..1bda1b9 --- /dev/null +++ b/alembic/versions/t0r8n4o6p7_add_app_domain_to_ap_followers.py @@ -0,0 +1,42 @@ +"""Add app_domain to ap_followers for per-app AP actors + +Revision ID: t0r8n4o6p7 +Revises: s9q7n3o5p6 +""" +from alembic import op +import sqlalchemy as sa + +revision = "t0r8n4o6p7" +down_revision = "s9q7n3o5p6" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column( + "ap_followers", + sa.Column("app_domain", sa.String(64), nullable=True), + ) + # Replace old unique constraint with one that includes app_domain + op.drop_constraint("uq_follower_acct", "ap_followers", type_="unique") + op.create_unique_constraint( + "uq_follower_acct_app", + "ap_followers", + ["actor_profile_id", "follower_acct", "app_domain"], + ) + op.create_index( + "ix_ap_follower_app_domain", + "ap_followers", + ["actor_profile_id", "app_domain"], + ) + + +def downgrade(): + op.drop_index("ix_ap_follower_app_domain", table_name="ap_followers") + op.drop_constraint("uq_follower_acct_app", "ap_followers", type_="unique") + op.create_unique_constraint( + "uq_follower_acct", + "ap_followers", + ["actor_profile_id", "follower_acct"], + ) + op.drop_column("ap_followers", "app_domain") diff --git a/contracts/dtos.py b/contracts/dtos.py index e7c560b..b27f9d1 100644 --- a/contracts/dtos.py +++ b/contracts/dtos.py @@ -176,6 +176,7 @@ class APFollowerDTO: follower_inbox: str follower_actor_url: str created_at: datetime | None = None + app_domain: str | None = None @dataclass(frozen=True, slots=True) diff --git a/contracts/protocols.py b/contracts/protocols.py index e320b31..dfb32da 100644 --- a/contracts/protocols.py +++ b/contracts/protocols.py @@ -222,6 +222,7 @@ class FederationService(Protocol): async def get_outbox( self, session: AsyncSession, username: str, page: int = 1, per_page: int = 20, + origin_app: str | None = None, ) -> tuple[list[APActivityDTO], int]: ... async def get_activity_for_source( @@ -236,6 +237,7 @@ class FederationService(Protocol): # -- Followers ------------------------------------------------------------ async def get_followers( self, session: AsyncSession, username: str, + app_domain: str | None = None, ) -> list[APFollowerDTO]: ... async def get_followers_paginated( @@ -247,10 +249,12 @@ class FederationService(Protocol): self, session: AsyncSession, username: str, follower_acct: str, follower_inbox: str, follower_actor_url: str, follower_public_key: str | None = None, + app_domain: str | None = None, ) -> APFollowerDTO: ... async def remove_follower( self, session: AsyncSession, username: str, follower_acct: str, + app_domain: str | None = None, ) -> bool: ... # -- Remote actors -------------------------------------------------------- diff --git a/events/handlers/ap_delivery_handler.py b/events/handlers/ap_delivery_handler.py index a0aea93..4b2a837 100644 --- a/events/handlers/ap_delivery_handler.py +++ b/events/handlers/ap_delivery_handler.py @@ -131,10 +131,30 @@ async def on_any_activity(activity: APActivity, session: AsyncSession) -> None: log.warning("Actor not found or missing key for activity %s", activity.activity_id) return - # Load followers + # Load followers: aggregate (app_domain IS NULL) always get everything. + # If the activity has an origin_app, also include app-specific followers. + from sqlalchemy import or_ + + follower_filters = [ + APFollower.actor_profile_id == actor.id, + ] + + origin_app = activity.origin_app + if origin_app and origin_app != "federation": + # Aggregate followers (NULL) + followers of this specific app + follower_filters.append( + or_( + APFollower.app_domain.is_(None), + APFollower.app_domain == origin_app, + ) + ) + else: + # Federation / no origin_app: deliver to all followers + pass + followers = ( await session.execute( - select(APFollower).where(APFollower.actor_profile_id == actor.id) + select(APFollower).where(*follower_filters) ) ).scalars().all() @@ -142,7 +162,7 @@ async def on_any_activity(activity: APActivity, session: AsyncSession) -> None: log.debug("No followers to deliver to for %s", activity.activity_id) return - # Deduplicate inboxes + # Deduplicate inboxes (same remote actor may follow both aggregate + app) all_inboxes = {f.follower_inbox for f in followers if f.follower_inbox} # Check delivery log — skip inboxes we already delivered to (idempotency) diff --git a/infrastructure/activitypub.py b/infrastructure/activitypub.py new file mode 100644 index 0000000..2fa2020 --- /dev/null +++ b/infrastructure/activitypub.py @@ -0,0 +1,446 @@ +"""Per-app ActivityPub blueprint. + +Factory function ``create_activitypub_blueprint(app_name)`` returns a +Blueprint with WebFinger, host-meta, nodeinfo, actor profile, inbox, +outbox, and followers endpoints. + +Per-app actors are *virtual projections* of the same ``ActorProfile``. +Same keypair, same ``preferred_username`` — the only differences are: +- the domain in URLs (e.g. blog.rose-ash.com vs federation.rose-ash.com) +- which activities are served in the outbox (filtered by ``origin_app``) +- which followers are returned (filtered by ``app_domain``) +- Follow requests create ``APFollower(app_domain=app_name)`` + +Federation app acts as the aggregate: no origin_app filter, app_domain=NULL. +""" +from __future__ import annotations + +import json +import logging +import os +from datetime import datetime, timezone + +from quart import Blueprint, request, abort, Response, g +from sqlalchemy import select + +from shared.services.registry import services +from shared.models.federation import ActorProfile, APInboxItem +from shared.browser.app.csrf import csrf_exempt + +log = logging.getLogger(__name__) + +AP_CONTENT_TYPE = "application/activity+json" + +# Apps that serve per-app AP actors +AP_APPS = {"blog", "market", "events", "federation"} + + +def _ap_domain(app_name: str) -> str: + """Return the public domain for this app's AP identity.""" + env_key = f"AP_DOMAIN_{app_name.upper()}" + env_val = os.getenv(env_key) + if env_val: + return env_val + # Default: {app}.rose-ash.com, except federation uses AP_DOMAIN + if app_name == "federation": + return os.getenv("AP_DOMAIN", "federation.rose-ash.com") + return f"{app_name}.rose-ash.com" + + +def _federation_domain() -> str: + """The aggregate federation domain (for alsoKnownAs links).""" + return os.getenv("AP_DOMAIN", "federation.rose-ash.com") + + +def _is_aggregate(app_name: str) -> bool: + """Federation serves the aggregate actor (no per-app filter).""" + return app_name == "federation" + + +def create_activitypub_blueprint(app_name: str) -> Blueprint: + """Return a Blueprint with AP endpoints for *app_name*.""" + bp = Blueprint("activitypub", __name__) + + domain = _ap_domain(app_name) + fed_domain = _federation_domain() + aggregate = _is_aggregate(app_name) + # For per-app follows, store app_domain; for federation aggregate, NULL + follower_app_domain: str | None = None if aggregate else app_name + # For per-app outboxes, filter by origin_app; for federation, show all + outbox_origin_app: str | None = None if aggregate else app_name + + # ------------------------------------------------------------------ + # Well-known endpoints + # ------------------------------------------------------------------ + + @bp.get("/.well-known/webfinger") + async def webfinger(): + resource = request.args.get("resource", "") + if not resource.startswith("acct:"): + abort(400, "Invalid resource format") + + parts = resource[5:].split("@") + if len(parts) != 2: + abort(400, "Invalid resource format") + + username, res_domain = parts + if res_domain != domain: + abort(404, "User not on this server") + + actor = await services.federation.get_actor_by_username(g.s, username) + if not actor: + abort(404, "User not found") + + actor_url = f"https://{domain}/users/{username}" + return Response( + response=json.dumps({ + "subject": resource, + "aliases": [actor_url], + "links": [ + { + "rel": "self", + "type": AP_CONTENT_TYPE, + "href": actor_url, + }, + { + "rel": "http://webfinger.net/rel/profile-page", + "type": "text/html", + "href": actor_url, + }, + ], + }), + content_type="application/jrd+json", + ) + + @bp.get("/.well-known/nodeinfo") + async def nodeinfo_index(): + return Response( + response=json.dumps({ + "links": [ + { + "rel": "http://nodeinfo.diaspora.software/ns/schema/2.0", + "href": f"https://{domain}/nodeinfo/2.0", + } + ] + }), + content_type="application/json", + ) + + @bp.get("/nodeinfo/2.0") + async def nodeinfo(): + stats = await services.federation.get_stats(g.s) + return Response( + response=json.dumps({ + "version": "2.0", + "software": { + "name": "rose-ash", + "version": "1.0.0", + }, + "protocols": ["activitypub"], + "usage": { + "users": { + "total": stats.get("actors", 0), + "activeMonth": stats.get("actors", 0), + }, + "localPosts": stats.get("activities", 0), + }, + "openRegistrations": False, + "metadata": { + "nodeName": f"Rose Ash ({app_name})", + "nodeDescription": f"Rose Ash {app_name} — ActivityPub federation", + }, + }), + content_type="application/json", + ) + + @bp.get("/.well-known/host-meta") + async def host_meta(): + xml = ( + '\n' + '\n' + f' \n' + '' + ) + return Response(response=xml, content_type="application/xrd+xml") + + # ------------------------------------------------------------------ + # Actor profile + # ------------------------------------------------------------------ + + @bp.get("/users/") + async def actor_profile(username: str): + actor = await services.federation.get_actor_by_username(g.s, username) + if not actor: + abort(404) + + accept_header = request.headers.get("accept", "") + + if "application/activity+json" in accept_header or "application/ld+json" in accept_header: + actor_url = f"https://{domain}/users/{username}" + actor_json = { + "@context": [ + "https://www.w3.org/ns/activitystreams", + "https://w3id.org/security/v1", + ], + "type": "Person", + "id": actor_url, + "name": actor.display_name or username, + "preferredUsername": username, + "summary": actor.summary or "", + "manuallyApprovesFollowers": False, + "inbox": f"{actor_url}/inbox", + "outbox": f"{actor_url}/outbox", + "followers": f"{actor_url}/followers", + "following": f"{actor_url}/following", + "publicKey": { + "id": f"{actor_url}#main-key", + "owner": actor_url, + "publicKeyPem": actor.public_key_pem, + }, + "url": actor_url, + } + + # Per-app actors link back to the aggregate federation actor + if not aggregate and domain != fed_domain: + actor_json["alsoKnownAs"] = [ + f"https://{fed_domain}/users/{username}", + ] + + return Response( + response=json.dumps(actor_json), + content_type=AP_CONTENT_TYPE, + ) + + # HTML: federation renders its own profile; other apps redirect there + if aggregate: + from quart import render_template + activities, total = await services.federation.get_outbox( + g.s, username, page=1, per_page=20, + ) + return await render_template( + "federation/profile.html", + actor=actor, + activities=activities, + total=total, + ) + from quart import redirect + return redirect(f"https://{fed_domain}/users/{username}") + + # ------------------------------------------------------------------ + # Inbox + # ------------------------------------------------------------------ + + @csrf_exempt + @bp.post("/users//inbox") + async def inbox(username: str): + actor = await services.federation.get_actor_by_username(g.s, username) + if not actor: + abort(404) + + body = await request.get_json() + if not body: + abort(400, "Invalid JSON") + + activity_type = body.get("type", "") + from_actor_url = body.get("actor", "") + + # Verify HTTP signature (best-effort) + sig_valid = False + try: + from shared.utils.http_signatures import verify_request_signature + from shared.infrastructure.ap_inbox_handlers import fetch_remote_actor + + req_headers = dict(request.headers) + sig_header = req_headers.get("Signature", "") + + remote_actor = await fetch_remote_actor(from_actor_url) + if remote_actor and sig_header: + pub_key_pem = (remote_actor.get("publicKey") or {}).get("publicKeyPem") + if pub_key_pem: + sig_valid = verify_request_signature( + public_key_pem=pub_key_pem, + signature_header=sig_header, + method="POST", + path=f"/users/{username}/inbox", + headers=req_headers, + ) + except Exception: + log.debug("Signature verification failed for %s", from_actor_url, exc_info=True) + + if not sig_valid: + log.warning( + "Unverified inbox POST from %s (%s) on %s — accepting anyway for now", + from_actor_url, activity_type, domain, + ) + + # Load actor row for DB operations + actor_row = ( + await g.s.execute( + select(ActorProfile).where( + ActorProfile.preferred_username == username + ) + ) + ).scalar_one() + + # Store raw inbox item + item = APInboxItem( + actor_profile_id=actor_row.id, + raw_json=body, + activity_type=activity_type, + from_actor=from_actor_url, + ) + g.s.add(item) + await g.s.flush() + + # Dispatch to shared handlers + from shared.infrastructure.ap_inbox_handlers import dispatch_inbox_activity + await dispatch_inbox_activity( + g.s, actor_row, body, from_actor_url, + domain=domain, + app_domain=follower_app_domain, + ) + + # Mark as processed + item.state = "processed" + item.processed_at = datetime.now(timezone.utc) + await g.s.flush() + + return Response(status=202) + + # ------------------------------------------------------------------ + # Outbox + # ------------------------------------------------------------------ + + @bp.get("/users//outbox") + async def outbox(username: str): + actor = await services.federation.get_actor_by_username(g.s, username) + if not actor: + abort(404) + + actor_url = f"https://{domain}/users/{username}" + page_param = request.args.get("page") + + if not page_param: + _, total = await services.federation.get_outbox( + g.s, username, page=1, per_page=1, + origin_app=outbox_origin_app, + ) + return Response( + response=json.dumps({ + "@context": "https://www.w3.org/ns/activitystreams", + "type": "OrderedCollection", + "id": f"{actor_url}/outbox", + "totalItems": total, + "first": f"{actor_url}/outbox?page=1", + }), + content_type=AP_CONTENT_TYPE, + ) + + page_num = int(page_param) + activities, total = await services.federation.get_outbox( + g.s, username, page=page_num, per_page=20, + origin_app=outbox_origin_app, + ) + + items = [] + for a in activities: + items.append({ + "@context": "https://www.w3.org/ns/activitystreams", + "type": a.activity_type, + "id": a.activity_id, + "actor": actor_url, + "published": a.published.isoformat() if a.published else None, + "object": { + "type": a.object_type, + **(a.object_data or {}), + }, + }) + + return Response( + response=json.dumps({ + "@context": "https://www.w3.org/ns/activitystreams", + "type": "OrderedCollectionPage", + "id": f"{actor_url}/outbox?page={page_num}", + "partOf": f"{actor_url}/outbox", + "totalItems": total, + "orderedItems": items, + }), + content_type=AP_CONTENT_TYPE, + ) + + # ------------------------------------------------------------------ + # Followers / following collections + # ------------------------------------------------------------------ + + @bp.get("/users//followers") + async def followers(username: str): + actor = await services.federation.get_actor_by_username(g.s, username) + if not actor: + abort(404) + + collection_id = f"https://{domain}/users/{username}/followers" + follower_list = await services.federation.get_followers( + g.s, username, app_domain=follower_app_domain, + ) + page_param = request.args.get("page") + + if not page_param: + return Response( + response=json.dumps({ + "@context": "https://www.w3.org/ns/activitystreams", + "type": "OrderedCollection", + "id": collection_id, + "totalItems": len(follower_list), + "first": f"{collection_id}?page=1", + }), + content_type=AP_CONTENT_TYPE, + ) + + return Response( + response=json.dumps({ + "@context": "https://www.w3.org/ns/activitystreams", + "type": "OrderedCollectionPage", + "id": f"{collection_id}?page=1", + "partOf": collection_id, + "totalItems": len(follower_list), + "orderedItems": [f.follower_actor_url for f in follower_list], + }), + content_type=AP_CONTENT_TYPE, + ) + + @bp.get("/users//following") + async def following(username: str): + actor = await services.federation.get_actor_by_username(g.s, username) + if not actor: + abort(404) + + collection_id = f"https://{domain}/users/{username}/following" + following_list, total = await services.federation.get_following(g.s, username) + page_param = request.args.get("page") + + if not page_param: + return Response( + response=json.dumps({ + "@context": "https://www.w3.org/ns/activitystreams", + "type": "OrderedCollection", + "id": collection_id, + "totalItems": total, + "first": f"{collection_id}?page=1", + }), + content_type=AP_CONTENT_TYPE, + ) + + return Response( + response=json.dumps({ + "@context": "https://www.w3.org/ns/activitystreams", + "type": "OrderedCollectionPage", + "id": f"{collection_id}?page=1", + "partOf": collection_id, + "totalItems": total, + "orderedItems": [f.actor_url for f in following_list], + }), + content_type=AP_CONTENT_TYPE, + ) + + return bp diff --git a/infrastructure/ap_inbox_handlers.py b/infrastructure/ap_inbox_handlers.py new file mode 100644 index 0000000..0db49b1 --- /dev/null +++ b/infrastructure/ap_inbox_handlers.py @@ -0,0 +1,508 @@ +"""Reusable AP inbox handlers for all apps. + +Extracted from federation/bp/actors/routes.py so that every app's +shared AP blueprint can process Follow, Undo, Accept, Create, etc. +""" +from __future__ import annotations + +import json +import logging +import uuid +from datetime import datetime, timezone + +import httpx +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from shared.models.federation import ( + ActorProfile, APInboxItem, APInteraction, APNotification, + APRemotePost, APActivity, RemoteActor, +) +from shared.services.registry import services + +log = logging.getLogger(__name__) + +AP_CONTENT_TYPE = "application/activity+json" + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +async def fetch_remote_actor(actor_url: str) -> dict | None: + """Fetch a remote actor's JSON-LD profile.""" + try: + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.get( + actor_url, + headers={"Accept": AP_CONTENT_TYPE}, + ) + if resp.status_code == 200: + return resp.json() + except Exception: + log.exception("Failed to fetch remote actor: %s", actor_url) + return None + + +async def send_accept( + actor: ActorProfile, + follow_activity: dict, + follower_inbox: str, + domain: str, +) -> None: + """Send an Accept activity back to the follower.""" + from shared.utils.http_signatures import sign_request + from urllib.parse import urlparse + + username = actor.preferred_username + actor_url = f"https://{domain}/users/{username}" + + accept_id = f"{actor_url}/activities/{uuid.uuid4()}" + accept = { + "@context": "https://www.w3.org/ns/activitystreams", + "id": accept_id, + "type": "Accept", + "actor": actor_url, + "object": follow_activity, + } + + body_bytes = json.dumps(accept).encode() + key_id = f"{actor_url}#main-key" + + parsed = urlparse(follower_inbox) + headers = sign_request( + private_key_pem=actor.private_key_pem, + key_id=key_id, + method="POST", + path=parsed.path, + host=parsed.netloc, + body=body_bytes, + ) + headers["Content-Type"] = AP_CONTENT_TYPE + + try: + async with httpx.AsyncClient(timeout=15) as client: + resp = await client.post( + follower_inbox, + content=body_bytes, + headers=headers, + ) + log.info("Accept → %s: %d", follower_inbox, resp.status_code) + except Exception: + log.exception("Failed to send Accept to %s", follower_inbox) + + +async def backfill_follower( + session: AsyncSession, + actor: ActorProfile, + follower_inbox: str, + domain: str, + origin_app: str | None = None, +) -> None: + """Deliver recent Create activities to a new follower's inbox.""" + from shared.events.handlers.ap_delivery_handler import ( + _build_activity_json, _deliver_to_inbox, + ) + + filters = [ + APActivity.actor_profile_id == actor.id, + APActivity.is_local == True, # noqa: E712 + APActivity.activity_type == "Create", + ] + if origin_app is not None: + filters.append(APActivity.origin_app == origin_app) + + activities = ( + await session.execute( + select(APActivity).where(*filters) + .order_by(APActivity.published.desc()) + .limit(20) + ) + ).scalars().all() + + if not activities: + return + + log.info( + "Backfilling %d posts to %s for @%s", + len(activities), follower_inbox, actor.preferred_username, + ) + + async with httpx.AsyncClient() as client: + for activity in reversed(activities): # oldest first + activity_json = _build_activity_json(activity, actor, domain) + await _deliver_to_inbox(client, follower_inbox, activity_json, actor, domain) + + +# --------------------------------------------------------------------------- +# Inbox activity handlers +# --------------------------------------------------------------------------- + +async def handle_follow( + session: AsyncSession, + actor_row: ActorProfile, + body: dict, + from_actor_url: str, + domain: str, + app_domain: str | None = None, +) -> None: + """Process a Follow activity: add follower, send Accept, backfill.""" + remote_actor = await fetch_remote_actor(from_actor_url) + if not remote_actor: + log.warning("Could not fetch remote actor for Follow: %s", from_actor_url) + return + + follower_inbox = remote_actor.get("inbox") + if not follower_inbox: + log.warning("Remote actor has no inbox: %s", from_actor_url) + return + + remote_username = remote_actor.get("preferredUsername", "") + from urllib.parse import urlparse + remote_domain = urlparse(from_actor_url).netloc + follower_acct = f"{remote_username}@{remote_domain}" if remote_username else from_actor_url + + pub_key = (remote_actor.get("publicKey") or {}).get("publicKeyPem") + + await services.federation.add_follower( + session, + actor_row.preferred_username, + follower_acct=follower_acct, + follower_inbox=follower_inbox, + follower_actor_url=from_actor_url, + follower_public_key=pub_key, + app_domain=app_domain, + ) + + log.info( + "New follower: %s → @%s (app_domain=%s)", + follower_acct, actor_row.preferred_username, app_domain, + ) + + # Notification + ra = ( + await session.execute( + select(RemoteActor).where(RemoteActor.actor_url == from_actor_url) + ) + ).scalar_one_or_none() + if not ra: + ra_dto = await services.federation.get_or_fetch_remote_actor(session, from_actor_url) + if ra_dto: + ra = (await session.execute( + select(RemoteActor).where(RemoteActor.actor_url == from_actor_url) + )).scalar_one_or_none() + + if ra: + notif = APNotification( + actor_profile_id=actor_row.id, + notification_type="follow", + from_remote_actor_id=ra.id, + ) + session.add(notif) + + # Send Accept + await send_accept(actor_row, body, follower_inbox, domain) + + # Backfill: deliver recent posts (filtered by origin_app for per-app follows) + origin_app = app_domain if app_domain and app_domain != "federation" else None + await backfill_follower(session, actor_row, follower_inbox, domain, origin_app=origin_app) + + +async def handle_undo( + session: AsyncSession, + actor_row: ActorProfile, + body: dict, + from_actor_url: str, + app_domain: str | None = None, +) -> None: + """Process an Undo activity (typically Undo Follow).""" + inner = body.get("object") + if not inner: + return + + inner_type = inner.get("type") if isinstance(inner, dict) else None + if inner_type == "Follow": + from urllib.parse import urlparse + remote_domain = urlparse(from_actor_url).netloc + remote_actor = await fetch_remote_actor(from_actor_url) + remote_username = "" + if remote_actor: + remote_username = remote_actor.get("preferredUsername", "") + follower_acct = f"{remote_username}@{remote_domain}" if remote_username else from_actor_url + + removed = await services.federation.remove_follower( + session, actor_row.preferred_username, follower_acct, + app_domain=app_domain, + ) + if removed: + log.info("Unfollowed: %s → @%s (app_domain=%s)", follower_acct, actor_row.preferred_username, app_domain) + else: + log.debug("Undo Follow: follower not found: %s", follower_acct) + else: + log.debug("Undo for %s — not handled", inner_type) + + +async def handle_accept( + session: AsyncSession, + actor_row: ActorProfile, + body: dict, + from_actor_url: str, +) -> None: + """Process Accept activity — update outbound follow state.""" + inner = body.get("object") + if not inner: + return + + inner_type = inner.get("type") if isinstance(inner, dict) else None + if inner_type == "Follow": + await services.federation.accept_follow_response( + session, actor_row.preferred_username, from_actor_url, + ) + log.info("Follow accepted by %s for @%s", from_actor_url, actor_row.preferred_username) + + +async def handle_create( + session: AsyncSession, + actor_row: ActorProfile, + body: dict, + from_actor_url: str, + federation_domain: str, +) -> None: + """Process Create(Note/Article) — ingest remote post.""" + obj = body.get("object") + if not obj or not isinstance(obj, dict): + return + + obj_type = obj.get("type", "") + if obj_type not in ("Note", "Article"): + log.debug("Create with type %s — skipping", obj_type) + return + + remote = await services.federation.get_or_fetch_remote_actor(session, from_actor_url) + if not remote: + log.warning("Could not resolve remote actor for Create: %s", from_actor_url) + return + + await services.federation.ingest_remote_post(session, remote.id, body, obj) + log.info("Ingested %s from %s", obj_type, from_actor_url) + + # Mention notification + tags = obj.get("tag", []) + if isinstance(tags, list): + for tag in tags: + if not isinstance(tag, dict): + continue + if tag.get("type") != "Mention": + continue + href = tag.get("href", "") + if f"https://{federation_domain}/users/" in href: + mentioned_username = href.rsplit("/", 1)[-1] + mentioned = await services.federation.get_actor_by_username( + session, mentioned_username, + ) + if mentioned: + rp = (await session.execute( + select(APRemotePost).where( + APRemotePost.object_id == obj.get("id") + ) + )).scalar_one_or_none() + + ra = (await session.execute( + select(RemoteActor).where(RemoteActor.actor_url == from_actor_url) + )).scalar_one_or_none() + + notif = APNotification( + actor_profile_id=mentioned.id, + notification_type="mention", + from_remote_actor_id=ra.id if ra else None, + target_remote_post_id=rp.id if rp else None, + ) + session.add(notif) + + # Reply notification + in_reply_to = obj.get("inReplyTo") + if in_reply_to and f"https://{federation_domain}/users/" in str(in_reply_to): + local_activity = (await session.execute( + select(APActivity).where( + APActivity.activity_id == in_reply_to, + ) + )).scalar_one_or_none() + if local_activity: + ra = (await session.execute( + select(RemoteActor).where(RemoteActor.actor_url == from_actor_url) + )).scalar_one_or_none() + rp = (await session.execute( + select(APRemotePost).where( + APRemotePost.object_id == obj.get("id") + ) + )).scalar_one_or_none() + + notif = APNotification( + actor_profile_id=local_activity.actor_profile_id, + notification_type="reply", + from_remote_actor_id=ra.id if ra else None, + target_remote_post_id=rp.id if rp else None, + ) + session.add(notif) + + +async def handle_update( + session: AsyncSession, + actor_row: ActorProfile, + body: dict, + from_actor_url: str, +) -> None: + """Process Update — re-ingest remote post.""" + obj = body.get("object") + if not obj or not isinstance(obj, dict): + return + obj_type = obj.get("type", "") + if obj_type in ("Note", "Article"): + remote = await services.federation.get_or_fetch_remote_actor(session, from_actor_url) + if remote: + await services.federation.ingest_remote_post(session, remote.id, body, obj) + log.info("Updated %s from %s", obj_type, from_actor_url) + + +async def handle_delete( + session: AsyncSession, + actor_row: ActorProfile, + body: dict, + from_actor_url: str, +) -> None: + """Process Delete — remove remote post.""" + obj = body.get("object") + if isinstance(obj, str): + object_id = obj + elif isinstance(obj, dict): + object_id = obj.get("id", "") + else: + return + if object_id: + await services.federation.delete_remote_post(session, object_id) + log.info("Deleted remote post %s from %s", object_id, from_actor_url) + + +async def handle_like( + session: AsyncSession, + actor_row: ActorProfile, + body: dict, + from_actor_url: str, +) -> None: + """Process incoming Like — record interaction + notify.""" + object_id = body.get("object", "") + if isinstance(object_id, dict): + object_id = object_id.get("id", "") + if not object_id: + return + + remote = await services.federation.get_or_fetch_remote_actor(session, from_actor_url) + if not remote: + return + + ra = (await session.execute( + select(RemoteActor).where(RemoteActor.actor_url == from_actor_url) + )).scalar_one_or_none() + + target = (await session.execute( + select(APActivity).where(APActivity.activity_id == object_id) + )).scalar_one_or_none() + + if not target: + log.info("Like from %s for %s (target not found locally)", from_actor_url, object_id) + return + + interaction = APInteraction( + remote_actor_id=ra.id if ra else None, + post_type="local", + post_id=target.id, + interaction_type="like", + activity_id=body.get("id"), + ) + session.add(interaction) + + notif = APNotification( + actor_profile_id=target.actor_profile_id, + notification_type="like", + from_remote_actor_id=ra.id if ra else None, + target_activity_id=target.id, + ) + session.add(notif) + log.info("Like from %s on activity %s", from_actor_url, object_id) + + +async def handle_announce( + session: AsyncSession, + actor_row: ActorProfile, + body: dict, + from_actor_url: str, +) -> None: + """Process incoming Announce (boost) — record interaction + notify.""" + object_id = body.get("object", "") + if isinstance(object_id, dict): + object_id = object_id.get("id", "") + if not object_id: + return + + remote = await services.federation.get_or_fetch_remote_actor(session, from_actor_url) + if not remote: + return + + ra = (await session.execute( + select(RemoteActor).where(RemoteActor.actor_url == from_actor_url) + )).scalar_one_or_none() + + target = (await session.execute( + select(APActivity).where(APActivity.activity_id == object_id) + )).scalar_one_or_none() + + if not target: + log.info("Announce from %s for %s (target not found locally)", from_actor_url, object_id) + return + + interaction = APInteraction( + remote_actor_id=ra.id if ra else None, + post_type="local", + post_id=target.id, + interaction_type="boost", + activity_id=body.get("id"), + ) + session.add(interaction) + + notif = APNotification( + actor_profile_id=target.actor_profile_id, + notification_type="boost", + from_remote_actor_id=ra.id if ra else None, + target_activity_id=target.id, + ) + session.add(notif) + log.info("Announce from %s on activity %s", from_actor_url, object_id) + + +async def dispatch_inbox_activity( + session: AsyncSession, + actor_row: ActorProfile, + body: dict, + from_actor_url: str, + domain: str, + app_domain: str | None = None, +) -> None: + """Route an inbox activity to the correct handler.""" + activity_type = body.get("type", "") + + if activity_type == "Follow": + await handle_follow(session, actor_row, body, from_actor_url, domain, app_domain=app_domain) + elif activity_type == "Undo": + await handle_undo(session, actor_row, body, from_actor_url, app_domain=app_domain) + elif activity_type == "Accept": + await handle_accept(session, actor_row, body, from_actor_url) + elif activity_type == "Create": + await handle_create(session, actor_row, body, from_actor_url, domain) + elif activity_type == "Update": + await handle_update(session, actor_row, body, from_actor_url) + elif activity_type == "Delete": + await handle_delete(session, actor_row, body, from_actor_url) + elif activity_type == "Like": + await handle_like(session, actor_row, body, from_actor_url) + elif activity_type == "Announce": + await handle_announce(session, actor_row, body, from_actor_url) diff --git a/infrastructure/factory.py b/infrastructure/factory.py index 6635685..9692ca8 100644 --- a/infrastructure/factory.py +++ b/infrastructure/factory.py @@ -108,6 +108,12 @@ def create_base_app( from shared.infrastructure.oauth import create_oauth_blueprint app.register_blueprint(create_oauth_blueprint(name)) + # Auto-register ActivityPub blueprint for AP-enabled apps + from shared.infrastructure.activitypub import AP_APPS + if name in AP_APPS: + from shared.infrastructure.activitypub import create_activitypub_blueprint + app.register_blueprint(create_activitypub_blueprint(name)) + # --- device id (all apps, including account) --- _did_cookie = f"{name}_did" diff --git a/models/federation.py b/models/federation.py index a6148b9..83ba84b 100644 --- a/models/federation.py +++ b/models/federation.py @@ -127,7 +127,11 @@ class APActivity(Base): class APFollower(Base): - """A remote follower of a local actor.""" + """A remote follower of a local actor. + + ``app_domain`` scopes the follow to a specific app (e.g. "blog"). + NULL means the follower subscribes to the aggregate (all activities). + """ __tablename__ = "ap_followers" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) @@ -138,6 +142,7 @@ class APFollower(Base): follower_inbox: Mapped[str] = mapped_column(String(512), nullable=False) follower_actor_url: Mapped[str] = mapped_column(String(512), nullable=False) follower_public_key: Mapped[str | None] = mapped_column(Text, nullable=True) + app_domain: Mapped[str | None] = mapped_column(String(64), nullable=True) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, server_default=func.now(), ) @@ -146,8 +151,12 @@ class APFollower(Base): actor_profile = relationship("ActorProfile", back_populates="followers") __table_args__ = ( - UniqueConstraint("actor_profile_id", "follower_acct", name="uq_follower_acct"), + UniqueConstraint( + "actor_profile_id", "follower_acct", "app_domain", + name="uq_follower_acct_app", + ), Index("ix_ap_follower_actor", "actor_profile_id"), + Index("ix_ap_follower_app_domain", "actor_profile_id", "app_domain"), ) def __repr__(self) -> str: diff --git a/services/federation_impl.py b/services/federation_impl.py index 0937908..cbd2c54 100644 --- a/services/federation_impl.py +++ b/services/federation_impl.py @@ -75,6 +75,7 @@ def _follower_to_dto(f: APFollower) -> APFollowerDTO: follower_inbox=f.follower_inbox, follower_actor_url=f.follower_actor_url, created_at=f.created_at, + app_domain=f.app_domain, ) @@ -252,6 +253,7 @@ class SqlFederationService: async def get_outbox( self, session: AsyncSession, username: str, page: int = 1, per_page: int = 20, + origin_app: str | None = None, ) -> tuple[list[APActivityDTO], int]: actor = ( await session.execute( @@ -261,22 +263,23 @@ class SqlFederationService: if actor is None: return [], 0 + filters = [ + APActivity.actor_profile_id == actor.id, + APActivity.is_local == True, # noqa: E712 + ] + if origin_app is not None: + filters.append(APActivity.origin_app == origin_app) + total = ( await session.execute( - select(func.count(APActivity.id)).where( - APActivity.actor_profile_id == actor.id, - APActivity.is_local == True, # noqa: E712 - ) + select(func.count(APActivity.id)).where(*filters) ) ).scalar() or 0 offset = (page - 1) * per_page result = await session.execute( select(APActivity) - .where( - APActivity.actor_profile_id == actor.id, - APActivity.is_local == True, # noqa: E712 - ) + .where(*filters) .order_by(APActivity.published.desc()) .limit(per_page) .offset(offset) @@ -315,6 +318,7 @@ class SqlFederationService: async def get_followers( self, session: AsyncSession, username: str, + app_domain: str | None = None, ) -> list[APFollowerDTO]: actor = ( await session.execute( @@ -324,15 +328,18 @@ class SqlFederationService: if actor is None: return [] - result = await session.execute( - select(APFollower).where(APFollower.actor_profile_id == actor.id) - ) + q = select(APFollower).where(APFollower.actor_profile_id == actor.id) + if app_domain is not None: + q = q.where(APFollower.app_domain == app_domain) + + result = await session.execute(q) return [_follower_to_dto(f) for f in result.scalars().all()] async def add_follower( self, session: AsyncSession, username: str, follower_acct: str, follower_inbox: str, follower_actor_url: str, follower_public_key: str | None = None, + app_domain: str | None = None, ) -> APFollowerDTO: actor = ( await session.execute( @@ -342,15 +349,17 @@ class SqlFederationService: if actor is None: raise ValueError(f"Actor not found: {username}") - # Upsert: update if already following, insert if new - existing = ( - await session.execute( - select(APFollower).where( - APFollower.actor_profile_id == actor.id, - APFollower.follower_acct == follower_acct, - ) - ) - ).scalar_one_or_none() + # Upsert: update if already following this (actor, acct, app_domain) + q = select(APFollower).where( + APFollower.actor_profile_id == actor.id, + APFollower.follower_acct == follower_acct, + ) + if app_domain is not None: + q = q.where(APFollower.app_domain == app_domain) + else: + q = q.where(APFollower.app_domain.is_(None)) + + existing = (await session.execute(q)).scalar_one_or_none() if existing: existing.follower_inbox = follower_inbox @@ -365,6 +374,7 @@ class SqlFederationService: follower_inbox=follower_inbox, follower_actor_url=follower_actor_url, follower_public_key=follower_public_key, + app_domain=app_domain, ) session.add(follower) await session.flush() @@ -372,6 +382,7 @@ class SqlFederationService: async def remove_follower( self, session: AsyncSession, username: str, follower_acct: str, + app_domain: str | None = None, ) -> bool: actor = ( await session.execute( @@ -381,12 +392,16 @@ class SqlFederationService: if actor is None: return False - result = await session.execute( - delete(APFollower).where( - APFollower.actor_profile_id == actor.id, - APFollower.follower_acct == follower_acct, - ) - ) + filters = [ + APFollower.actor_profile_id == actor.id, + APFollower.follower_acct == follower_acct, + ] + if app_domain is not None: + filters.append(APFollower.app_domain == app_domain) + else: + filters.append(APFollower.app_domain.is_(None)) + + result = await session.execute(delete(APFollower).where(*filters)) return result.rowcount > 0 async def get_followers_paginated( diff --git a/services/stubs.py b/services/stubs.py index a46d5ab..5074954 100644 --- a/services/stubs.py +++ b/services/stubs.py @@ -221,7 +221,7 @@ class StubFederationService: async def get_activity(self, session, activity_id): return None - async def get_outbox(self, session, username, page=1, per_page=20): + async def get_outbox(self, session, username, page=1, per_page=20, origin_app=None): return [], 0 async def get_activity_for_source(self, session, source_type, source_id): @@ -230,14 +230,15 @@ class StubFederationService: async def count_activities_for_source(self, session, source_type, source_id, *, activity_type): return 0 - async def get_followers(self, session, username): + async def get_followers(self, session, username, app_domain=None): return [] async def add_follower(self, session, username, follower_acct, follower_inbox, - follower_actor_url, follower_public_key=None): + follower_actor_url, follower_public_key=None, + app_domain=None): raise RuntimeError("FederationService not available") - async def remove_follower(self, session, username, follower_acct): + async def remove_follower(self, session, username, follower_acct, app_domain=None): return False async def get_or_fetch_remote_actor(self, session, actor_url):