diff --git a/alembic/versions/t0r8n4o6p7_add_app_domain_to_ap_followers.py b/alembic/versions/t0r8n4o6p7_add_app_domain_to_ap_followers.py index 1bda1b9..3f1c8d0 100644 --- a/alembic/versions/t0r8n4o6p7_add_app_domain_to_ap_followers.py +++ b/alembic/versions/t0r8n4o6p7_add_app_domain_to_ap_followers.py @@ -13,10 +13,18 @@ depends_on = None def upgrade(): + # Add column as nullable first so we can backfill op.add_column( "ap_followers", sa.Column("app_domain", sa.String(64), nullable=True), ) + # Backfill existing rows: all current followers are aggregate + op.execute("UPDATE ap_followers SET app_domain = 'federation' WHERE app_domain IS NULL") + # Now make it NOT NULL with a default + op.alter_column( + "ap_followers", "app_domain", + nullable=False, server_default="federation", + ) # Replace old unique constraint with one that includes app_domain op.drop_constraint("uq_follower_acct", "ap_followers", type_="unique") op.create_unique_constraint( @@ -39,4 +47,5 @@ def downgrade(): "ap_followers", ["actor_profile_id", "follower_acct"], ) + op.alter_column("ap_followers", "app_domain", nullable=True, server_default=None) op.drop_column("ap_followers", "app_domain") diff --git a/contracts/dtos.py b/contracts/dtos.py index b27f9d1..cd5c50d 100644 --- a/contracts/dtos.py +++ b/contracts/dtos.py @@ -176,7 +176,7 @@ class APFollowerDTO: follower_inbox: str follower_actor_url: str created_at: datetime | None = None - app_domain: str | None = None + app_domain: str = "federation" @dataclass(frozen=True, slots=True) diff --git a/contracts/protocols.py b/contracts/protocols.py index dfb32da..e806b8a 100644 --- a/contracts/protocols.py +++ b/contracts/protocols.py @@ -249,12 +249,12 @@ class FederationService(Protocol): self, session: AsyncSession, username: str, follower_acct: str, follower_inbox: str, follower_actor_url: str, follower_public_key: str | None = None, - app_domain: str | None = None, + app_domain: str = "federation", ) -> APFollowerDTO: ... async def remove_follower( self, session: AsyncSession, username: str, follower_acct: str, - app_domain: str | None = None, + app_domain: str = "federation", ) -> bool: ... # -- Remote actors -------------------------------------------------------- diff --git a/events/handlers/ap_delivery_handler.py b/events/handlers/ap_delivery_handler.py index 4b2a837..3c0bbe5 100644 --- a/events/handlers/ap_delivery_handler.py +++ b/events/handlers/ap_delivery_handler.py @@ -3,15 +3,22 @@ Registered as a wildcard handler — fires for every activity. Skips non-public activities and those without an actor profile. +Per-app delivery: activities are delivered using the domain that matches +the follower's subscription. A follower of ``@alice@blog.rose-ash.com`` +receives activities with ``actor: https://blog.rose-ash.com/users/alice`` +and signatures using that domain's key_id. Aggregate followers +(``app_domain='federation'``) receive the federation domain identity. + Idempotent: successful deliveries are recorded in ap_delivery_log. On retry (at-least-once reaper), already-delivered inboxes are skipped. """ from __future__ import annotations import logging +from collections import defaultdict import httpx -from sqlalchemy import select +from sqlalchemy import select, or_ from sqlalchemy.ext.asyncio import AsyncSession from shared.events.bus import register_activity_handler @@ -24,6 +31,12 @@ AP_CONTENT_TYPE = "application/activity+json" DELIVERY_TIMEOUT = 15 # seconds per request +def _domain_for_app(app_name: str) -> str: + """Resolve the public AP domain for an app name.""" + from shared.infrastructure.activitypub import _ap_domain + return _ap_domain(app_name) + + def _build_activity_json(activity: APActivity, actor: ActorProfile, domain: str) -> dict: """Build the full AP activity JSON-LD for delivery.""" username = actor.preferred_username @@ -108,8 +121,7 @@ async def _deliver_to_inbox( async def on_any_activity(activity: APActivity, session: AsyncSession) -> None: - """Deliver a public activity to all followers of its actor.""" - import os + """Deliver a public activity to all matching followers of its actor.""" # Only deliver public activities that have an actor profile if activity.visibility != "public": @@ -119,8 +131,6 @@ async def on_any_activity(activity: APActivity, session: AsyncSession) -> None: if not services.has("federation"): return - domain = os.getenv("AP_DOMAIN", "federation.rose-ash.com") - # Load actor with private key actor = ( await session.execute( @@ -131,26 +141,19 @@ async def on_any_activity(activity: APActivity, session: AsyncSession) -> None: log.warning("Actor not found or missing key for activity %s", activity.activity_id) return - # Load followers: aggregate (app_domain IS NULL) always get everything. - # If the activity has an origin_app, also include app-specific followers. - from sqlalchemy import or_ - - follower_filters = [ - APFollower.actor_profile_id == actor.id, - ] - + # Load matching followers. + # Aggregate followers (app_domain='federation') always get everything. + # Per-app followers only get activities from their app. origin_app = activity.origin_app + follower_filters = [APFollower.actor_profile_id == actor.id] + if origin_app and origin_app != "federation": - # Aggregate followers (NULL) + followers of this specific app follower_filters.append( or_( - APFollower.app_domain.is_(None), + APFollower.app_domain == "federation", APFollower.app_domain == origin_app, ) ) - else: - # Federation / no origin_app: deliver to all followers - pass followers = ( await session.execute( @@ -162,9 +165,6 @@ async def on_any_activity(activity: APActivity, session: AsyncSession) -> None: log.debug("No followers to deliver to for %s", activity.activity_id) return - # Deduplicate inboxes (same remote actor may follow both aggregate + app) - all_inboxes = {f.follower_inbox for f in followers if f.follower_inbox} - # Check delivery log — skip inboxes we already delivered to (idempotency) existing = ( await session.execute( @@ -176,38 +176,59 @@ async def on_any_activity(activity: APActivity, session: AsyncSession) -> None: ).scalars().all() already_delivered = set(existing) - inboxes = all_inboxes - already_delivered - if not inboxes: - log.info("All %d inbox(es) already delivered for %s", len(all_inboxes), activity.activity_id) + # Group followers by app_domain so we deliver with the correct + # actor URL and signing domain for each subscriber. + # If the same inbox appears under multiple app_domains, prefer + # the per-app domain (it's what the follower subscribed to). + inbox_to_domain: dict[str, str] = {} + for f in followers: + if not f.follower_inbox: + continue + if f.follower_inbox in already_delivered: + continue + app_dom = f.app_domain or "federation" + # Per-app domain wins over aggregate if both exist + if f.follower_inbox not in inbox_to_domain or app_dom != "federation": + inbox_to_domain[f.follower_inbox] = app_dom + + if not inbox_to_domain: + if already_delivered: + log.info("All inbox(es) already delivered for %s", activity.activity_id) return if already_delivered: log.info( "Skipping %d already-delivered inbox(es), delivering to %d remaining", - len(already_delivered), len(inboxes), + len(already_delivered), len(inbox_to_domain), ) - # Build activity JSON - activity_json = _build_activity_json(activity, actor, domain) + # Group by domain to reuse activity JSON per domain + domain_inboxes: dict[str, list[str]] = defaultdict(list) + for inbox_url, app_dom in inbox_to_domain.items(): + domain_inboxes[app_dom].append(inbox_url) log.info( - "Delivering %s to %d inbox(es) for @%s", - activity.activity_type, len(inboxes), actor.preferred_username, + "Delivering %s to %d inbox(es) for @%s across %d domain(s)", + activity.activity_type, len(inbox_to_domain), + actor.preferred_username, len(domain_inboxes), ) async with httpx.AsyncClient() as client: - for inbox_url in inboxes: - status_code = await _deliver_to_inbox( - client, inbox_url, activity_json, actor, domain - ) - # Log successful deliveries for idempotency - if status_code is not None and status_code < 300: - session.add(APDeliveryLog( - activity_id=activity.id, - inbox_url=inbox_url, - status_code=status_code, - )) - await session.flush() + for app_dom, inboxes in domain_inboxes.items(): + domain = _domain_for_app(app_dom) + activity_json = _build_activity_json(activity, actor, domain) + + for inbox_url in inboxes: + status_code = await _deliver_to_inbox( + client, inbox_url, activity_json, actor, domain + ) + if status_code is not None and status_code < 300: + session.add(APDeliveryLog( + activity_id=activity.id, + inbox_url=inbox_url, + status_code=status_code, + )) + await session.flush() # Wildcard: fires for every activity diff --git a/infrastructure/activitypub.py b/infrastructure/activitypub.py index 2fa2020..b7d6d7e 100644 --- a/infrastructure/activitypub.py +++ b/infrastructure/activitypub.py @@ -64,8 +64,8 @@ def create_activitypub_blueprint(app_name: str) -> Blueprint: domain = _ap_domain(app_name) fed_domain = _federation_domain() aggregate = _is_aggregate(app_name) - # For per-app follows, store app_domain; for federation aggregate, NULL - follower_app_domain: str | None = None if aggregate else app_name + # For per-app follows, store app_domain; for federation, "federation" + follower_app_domain: str = app_name # For per-app outboxes, filter by origin_app; for federation, show all outbox_origin_app: str | None = None if aggregate else app_name @@ -201,8 +201,16 @@ def create_activitypub_blueprint(app_name: str) -> Blueprint: "url": actor_url, } - # Per-app actors link back to the aggregate federation actor - if not aggregate and domain != fed_domain: + if aggregate: + # Aggregate actor advertises all per-app actors + also_known = [ + f"https://{_ap_domain(a)}/users/{username}" + for a in AP_APPS if a != "federation" + ] + if also_known: + actor_json["alsoKnownAs"] = also_known + else: + # Per-app actors link back to the aggregate federation actor actor_json["alsoKnownAs"] = [ f"https://{fed_domain}/users/{username}", ] diff --git a/infrastructure/ap_inbox_handlers.py b/infrastructure/ap_inbox_handlers.py index 0db49b1..49a9732 100644 --- a/infrastructure/ap_inbox_handlers.py +++ b/infrastructure/ap_inbox_handlers.py @@ -144,7 +144,7 @@ async def handle_follow( body: dict, from_actor_url: str, domain: str, - app_domain: str | None = None, + app_domain: str = "federation", ) -> None: """Process a Follow activity: add follower, send Accept, backfill.""" remote_actor = await fetch_remote_actor(from_actor_url) @@ -204,8 +204,8 @@ async def handle_follow( await send_accept(actor_row, body, follower_inbox, domain) # Backfill: deliver recent posts (filtered by origin_app for per-app follows) - origin_app = app_domain if app_domain and app_domain != "federation" else None - await backfill_follower(session, actor_row, follower_inbox, domain, origin_app=origin_app) + backfill_origin = app_domain if app_domain != "federation" else None + await backfill_follower(session, actor_row, follower_inbox, domain, origin_app=backfill_origin) async def handle_undo( @@ -213,7 +213,7 @@ async def handle_undo( actor_row: ActorProfile, body: dict, from_actor_url: str, - app_domain: str | None = None, + app_domain: str = "federation", ) -> None: """Process an Undo activity (typically Undo Follow).""" inner = body.get("object") @@ -485,7 +485,7 @@ async def dispatch_inbox_activity( body: dict, from_actor_url: str, domain: str, - app_domain: str | None = None, + app_domain: str = "federation", ) -> None: """Route an inbox activity to the correct handler.""" activity_type = body.get("type", "") diff --git a/models/federation.py b/models/federation.py index 83ba84b..b5b780b 100644 --- a/models/federation.py +++ b/models/federation.py @@ -129,8 +129,9 @@ class APActivity(Base): class APFollower(Base): """A remote follower of a local actor. - ``app_domain`` scopes the follow to a specific app (e.g. "blog"). - NULL means the follower subscribes to the aggregate (all activities). + ``app_domain`` scopes the follow to a specific app (e.g. "blog", + "market", "events"). "federation" means the aggregate — the + follower subscribes to all activities. """ __tablename__ = "ap_followers" @@ -142,7 +143,9 @@ class APFollower(Base): follower_inbox: Mapped[str] = mapped_column(String(512), nullable=False) follower_actor_url: Mapped[str] = mapped_column(String(512), nullable=False) follower_public_key: Mapped[str | None] = mapped_column(Text, nullable=True) - app_domain: Mapped[str | None] = mapped_column(String(64), nullable=True) + app_domain: Mapped[str] = mapped_column( + String(64), nullable=False, default="federation", server_default="federation", + ) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, server_default=func.now(), ) diff --git a/services/federation_impl.py b/services/federation_impl.py index cbd2c54..fa33d7d 100644 --- a/services/federation_impl.py +++ b/services/federation_impl.py @@ -339,7 +339,7 @@ class SqlFederationService: self, session: AsyncSession, username: str, follower_acct: str, follower_inbox: str, follower_actor_url: str, follower_public_key: str | None = None, - app_domain: str | None = None, + app_domain: str = "federation", ) -> APFollowerDTO: actor = ( await session.execute( @@ -350,16 +350,15 @@ class SqlFederationService: raise ValueError(f"Actor not found: {username}") # Upsert: update if already following this (actor, acct, app_domain) - q = select(APFollower).where( - APFollower.actor_profile_id == actor.id, - APFollower.follower_acct == follower_acct, - ) - if app_domain is not None: - q = q.where(APFollower.app_domain == app_domain) - else: - q = q.where(APFollower.app_domain.is_(None)) - - existing = (await session.execute(q)).scalar_one_or_none() + existing = ( + await session.execute( + select(APFollower).where( + APFollower.actor_profile_id == actor.id, + APFollower.follower_acct == follower_acct, + APFollower.app_domain == app_domain, + ) + ) + ).scalar_one_or_none() if existing: existing.follower_inbox = follower_inbox @@ -382,7 +381,7 @@ class SqlFederationService: async def remove_follower( self, session: AsyncSession, username: str, follower_acct: str, - app_domain: str | None = None, + app_domain: str = "federation", ) -> bool: actor = ( await session.execute( @@ -392,16 +391,13 @@ class SqlFederationService: if actor is None: return False - filters = [ - APFollower.actor_profile_id == actor.id, - APFollower.follower_acct == follower_acct, - ] - if app_domain is not None: - filters.append(APFollower.app_domain == app_domain) - else: - filters.append(APFollower.app_domain.is_(None)) - - result = await session.execute(delete(APFollower).where(*filters)) + result = await session.execute( + delete(APFollower).where( + APFollower.actor_profile_id == actor.id, + APFollower.follower_acct == follower_acct, + APFollower.app_domain == app_domain, + ) + ) return result.rowcount > 0 async def get_followers_paginated( diff --git a/services/stubs.py b/services/stubs.py index 5074954..eb46cca 100644 --- a/services/stubs.py +++ b/services/stubs.py @@ -235,10 +235,10 @@ class StubFederationService: async def add_follower(self, session, username, follower_acct, follower_inbox, follower_actor_url, follower_public_key=None, - app_domain=None): + app_domain="federation"): raise RuntimeError("FederationService not available") - async def remove_follower(self, session, username, follower_acct, app_domain=None): + async def remove_follower(self, session, username, follower_acct, app_domain="federation"): return False async def get_or_fetch_remote_actor(self, session, actor_url):