Fix per-app AP delivery, NULL uniqueness, and reverse discovery

- Delivery handler now signs/delivers using the per-app domain that
  matches the follower's subscription (not always federation domain)
- app_domain is NOT NULL with default 'federation' (sentinel replaces
  NULL to avoid uniqueness constraint edge case)
- Aggregate actor advertises per-app actors via alsoKnownAs
- Migration backfills existing NULL rows to 'federation'

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
giles
2026-02-23 19:25:24 +00:00
parent f2262f702b
commit 1bb19c96ed
9 changed files with 117 additions and 80 deletions

View File

@@ -13,10 +13,18 @@ depends_on = None
def upgrade(): def upgrade():
# Add column as nullable first so we can backfill
op.add_column( op.add_column(
"ap_followers", "ap_followers",
sa.Column("app_domain", sa.String(64), nullable=True), sa.Column("app_domain", sa.String(64), nullable=True),
) )
# Backfill existing rows: all current followers are aggregate
op.execute("UPDATE ap_followers SET app_domain = 'federation' WHERE app_domain IS NULL")
# Now make it NOT NULL with a default
op.alter_column(
"ap_followers", "app_domain",
nullable=False, server_default="federation",
)
# Replace old unique constraint with one that includes app_domain # Replace old unique constraint with one that includes app_domain
op.drop_constraint("uq_follower_acct", "ap_followers", type_="unique") op.drop_constraint("uq_follower_acct", "ap_followers", type_="unique")
op.create_unique_constraint( op.create_unique_constraint(
@@ -39,4 +47,5 @@ def downgrade():
"ap_followers", "ap_followers",
["actor_profile_id", "follower_acct"], ["actor_profile_id", "follower_acct"],
) )
op.alter_column("ap_followers", "app_domain", nullable=True, server_default=None)
op.drop_column("ap_followers", "app_domain") op.drop_column("ap_followers", "app_domain")

View File

@@ -176,7 +176,7 @@ class APFollowerDTO:
follower_inbox: str follower_inbox: str
follower_actor_url: str follower_actor_url: str
created_at: datetime | None = None created_at: datetime | None = None
app_domain: str | None = None app_domain: str = "federation"
@dataclass(frozen=True, slots=True) @dataclass(frozen=True, slots=True)

View File

@@ -249,12 +249,12 @@ class FederationService(Protocol):
self, session: AsyncSession, username: str, self, session: AsyncSession, username: str,
follower_acct: str, follower_inbox: str, follower_actor_url: str, follower_acct: str, follower_inbox: str, follower_actor_url: str,
follower_public_key: str | None = None, follower_public_key: str | None = None,
app_domain: str | None = None, app_domain: str = "federation",
) -> APFollowerDTO: ... ) -> APFollowerDTO: ...
async def remove_follower( async def remove_follower(
self, session: AsyncSession, username: str, follower_acct: str, self, session: AsyncSession, username: str, follower_acct: str,
app_domain: str | None = None, app_domain: str = "federation",
) -> bool: ... ) -> bool: ...
# -- Remote actors -------------------------------------------------------- # -- Remote actors --------------------------------------------------------

View File

@@ -3,15 +3,22 @@
Registered as a wildcard handler — fires for every activity. Skips Registered as a wildcard handler — fires for every activity. Skips
non-public activities and those without an actor profile. non-public activities and those without an actor profile.
Per-app delivery: activities are delivered using the domain that matches
the follower's subscription. A follower of ``@alice@blog.rose-ash.com``
receives activities with ``actor: https://blog.rose-ash.com/users/alice``
and signatures using that domain's key_id. Aggregate followers
(``app_domain='federation'``) receive the federation domain identity.
Idempotent: successful deliveries are recorded in ap_delivery_log. Idempotent: successful deliveries are recorded in ap_delivery_log.
On retry (at-least-once reaper), already-delivered inboxes are skipped. On retry (at-least-once reaper), already-delivered inboxes are skipped.
""" """
from __future__ import annotations from __future__ import annotations
import logging import logging
from collections import defaultdict
import httpx import httpx
from sqlalchemy import select from sqlalchemy import select, or_
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from shared.events.bus import register_activity_handler from shared.events.bus import register_activity_handler
@@ -24,6 +31,12 @@ AP_CONTENT_TYPE = "application/activity+json"
DELIVERY_TIMEOUT = 15 # seconds per request DELIVERY_TIMEOUT = 15 # seconds per request
def _domain_for_app(app_name: str) -> str:
"""Resolve the public AP domain for an app name."""
from shared.infrastructure.activitypub import _ap_domain
return _ap_domain(app_name)
def _build_activity_json(activity: APActivity, actor: ActorProfile, domain: str) -> dict: def _build_activity_json(activity: APActivity, actor: ActorProfile, domain: str) -> dict:
"""Build the full AP activity JSON-LD for delivery.""" """Build the full AP activity JSON-LD for delivery."""
username = actor.preferred_username username = actor.preferred_username
@@ -108,8 +121,7 @@ async def _deliver_to_inbox(
async def on_any_activity(activity: APActivity, session: AsyncSession) -> None: async def on_any_activity(activity: APActivity, session: AsyncSession) -> None:
"""Deliver a public activity to all followers of its actor.""" """Deliver a public activity to all matching followers of its actor."""
import os
# Only deliver public activities that have an actor profile # Only deliver public activities that have an actor profile
if activity.visibility != "public": if activity.visibility != "public":
@@ -119,8 +131,6 @@ async def on_any_activity(activity: APActivity, session: AsyncSession) -> None:
if not services.has("federation"): if not services.has("federation"):
return return
domain = os.getenv("AP_DOMAIN", "federation.rose-ash.com")
# Load actor with private key # Load actor with private key
actor = ( actor = (
await session.execute( await session.execute(
@@ -131,26 +141,19 @@ async def on_any_activity(activity: APActivity, session: AsyncSession) -> None:
log.warning("Actor not found or missing key for activity %s", activity.activity_id) log.warning("Actor not found or missing key for activity %s", activity.activity_id)
return return
# Load followers: aggregate (app_domain IS NULL) always get everything. # Load matching followers.
# If the activity has an origin_app, also include app-specific followers. # Aggregate followers (app_domain='federation') always get everything.
from sqlalchemy import or_ # Per-app followers only get activities from their app.
follower_filters = [
APFollower.actor_profile_id == actor.id,
]
origin_app = activity.origin_app origin_app = activity.origin_app
follower_filters = [APFollower.actor_profile_id == actor.id]
if origin_app and origin_app != "federation": if origin_app and origin_app != "federation":
# Aggregate followers (NULL) + followers of this specific app
follower_filters.append( follower_filters.append(
or_( or_(
APFollower.app_domain.is_(None), APFollower.app_domain == "federation",
APFollower.app_domain == origin_app, APFollower.app_domain == origin_app,
) )
) )
else:
# Federation / no origin_app: deliver to all followers
pass
followers = ( followers = (
await session.execute( await session.execute(
@@ -162,9 +165,6 @@ async def on_any_activity(activity: APActivity, session: AsyncSession) -> None:
log.debug("No followers to deliver to for %s", activity.activity_id) log.debug("No followers to deliver to for %s", activity.activity_id)
return return
# Deduplicate inboxes (same remote actor may follow both aggregate + app)
all_inboxes = {f.follower_inbox for f in followers if f.follower_inbox}
# Check delivery log — skip inboxes we already delivered to (idempotency) # Check delivery log — skip inboxes we already delivered to (idempotency)
existing = ( existing = (
await session.execute( await session.execute(
@@ -176,38 +176,59 @@ async def on_any_activity(activity: APActivity, session: AsyncSession) -> None:
).scalars().all() ).scalars().all()
already_delivered = set(existing) already_delivered = set(existing)
inboxes = all_inboxes - already_delivered # Group followers by app_domain so we deliver with the correct
if not inboxes: # actor URL and signing domain for each subscriber.
log.info("All %d inbox(es) already delivered for %s", len(all_inboxes), activity.activity_id) # If the same inbox appears under multiple app_domains, prefer
# the per-app domain (it's what the follower subscribed to).
inbox_to_domain: dict[str, str] = {}
for f in followers:
if not f.follower_inbox:
continue
if f.follower_inbox in already_delivered:
continue
app_dom = f.app_domain or "federation"
# Per-app domain wins over aggregate if both exist
if f.follower_inbox not in inbox_to_domain or app_dom != "federation":
inbox_to_domain[f.follower_inbox] = app_dom
if not inbox_to_domain:
if already_delivered:
log.info("All inbox(es) already delivered for %s", activity.activity_id)
return return
if already_delivered: if already_delivered:
log.info( log.info(
"Skipping %d already-delivered inbox(es), delivering to %d remaining", "Skipping %d already-delivered inbox(es), delivering to %d remaining",
len(already_delivered), len(inboxes), len(already_delivered), len(inbox_to_domain),
) )
# Build activity JSON # Group by domain to reuse activity JSON per domain
activity_json = _build_activity_json(activity, actor, domain) domain_inboxes: dict[str, list[str]] = defaultdict(list)
for inbox_url, app_dom in inbox_to_domain.items():
domain_inboxes[app_dom].append(inbox_url)
log.info( log.info(
"Delivering %s to %d inbox(es) for @%s", "Delivering %s to %d inbox(es) for @%s across %d domain(s)",
activity.activity_type, len(inboxes), actor.preferred_username, activity.activity_type, len(inbox_to_domain),
actor.preferred_username, len(domain_inboxes),
) )
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
for inbox_url in inboxes: for app_dom, inboxes in domain_inboxes.items():
status_code = await _deliver_to_inbox( domain = _domain_for_app(app_dom)
client, inbox_url, activity_json, actor, domain activity_json = _build_activity_json(activity, actor, domain)
)
# Log successful deliveries for idempotency for inbox_url in inboxes:
if status_code is not None and status_code < 300: status_code = await _deliver_to_inbox(
session.add(APDeliveryLog( client, inbox_url, activity_json, actor, domain
activity_id=activity.id, )
inbox_url=inbox_url, if status_code is not None and status_code < 300:
status_code=status_code, session.add(APDeliveryLog(
)) activity_id=activity.id,
await session.flush() inbox_url=inbox_url,
status_code=status_code,
))
await session.flush()
# Wildcard: fires for every activity # Wildcard: fires for every activity

View File

@@ -64,8 +64,8 @@ def create_activitypub_blueprint(app_name: str) -> Blueprint:
domain = _ap_domain(app_name) domain = _ap_domain(app_name)
fed_domain = _federation_domain() fed_domain = _federation_domain()
aggregate = _is_aggregate(app_name) aggregate = _is_aggregate(app_name)
# For per-app follows, store app_domain; for federation aggregate, NULL # For per-app follows, store app_domain; for federation, "federation"
follower_app_domain: str | None = None if aggregate else app_name follower_app_domain: str = app_name
# For per-app outboxes, filter by origin_app; for federation, show all # For per-app outboxes, filter by origin_app; for federation, show all
outbox_origin_app: str | None = None if aggregate else app_name outbox_origin_app: str | None = None if aggregate else app_name
@@ -201,8 +201,16 @@ def create_activitypub_blueprint(app_name: str) -> Blueprint:
"url": actor_url, "url": actor_url,
} }
# Per-app actors link back to the aggregate federation actor if aggregate:
if not aggregate and domain != fed_domain: # Aggregate actor advertises all per-app actors
also_known = [
f"https://{_ap_domain(a)}/users/{username}"
for a in AP_APPS if a != "federation"
]
if also_known:
actor_json["alsoKnownAs"] = also_known
else:
# Per-app actors link back to the aggregate federation actor
actor_json["alsoKnownAs"] = [ actor_json["alsoKnownAs"] = [
f"https://{fed_domain}/users/{username}", f"https://{fed_domain}/users/{username}",
] ]

View File

@@ -144,7 +144,7 @@ async def handle_follow(
body: dict, body: dict,
from_actor_url: str, from_actor_url: str,
domain: str, domain: str,
app_domain: str | None = None, app_domain: str = "federation",
) -> None: ) -> None:
"""Process a Follow activity: add follower, send Accept, backfill.""" """Process a Follow activity: add follower, send Accept, backfill."""
remote_actor = await fetch_remote_actor(from_actor_url) remote_actor = await fetch_remote_actor(from_actor_url)
@@ -204,8 +204,8 @@ async def handle_follow(
await send_accept(actor_row, body, follower_inbox, domain) await send_accept(actor_row, body, follower_inbox, domain)
# Backfill: deliver recent posts (filtered by origin_app for per-app follows) # Backfill: deliver recent posts (filtered by origin_app for per-app follows)
origin_app = app_domain if app_domain and app_domain != "federation" else None backfill_origin = app_domain if app_domain != "federation" else None
await backfill_follower(session, actor_row, follower_inbox, domain, origin_app=origin_app) await backfill_follower(session, actor_row, follower_inbox, domain, origin_app=backfill_origin)
async def handle_undo( async def handle_undo(
@@ -213,7 +213,7 @@ async def handle_undo(
actor_row: ActorProfile, actor_row: ActorProfile,
body: dict, body: dict,
from_actor_url: str, from_actor_url: str,
app_domain: str | None = None, app_domain: str = "federation",
) -> None: ) -> None:
"""Process an Undo activity (typically Undo Follow).""" """Process an Undo activity (typically Undo Follow)."""
inner = body.get("object") inner = body.get("object")
@@ -485,7 +485,7 @@ async def dispatch_inbox_activity(
body: dict, body: dict,
from_actor_url: str, from_actor_url: str,
domain: str, domain: str,
app_domain: str | None = None, app_domain: str = "federation",
) -> None: ) -> None:
"""Route an inbox activity to the correct handler.""" """Route an inbox activity to the correct handler."""
activity_type = body.get("type", "") activity_type = body.get("type", "")

View File

@@ -129,8 +129,9 @@ class APActivity(Base):
class APFollower(Base): class APFollower(Base):
"""A remote follower of a local actor. """A remote follower of a local actor.
``app_domain`` scopes the follow to a specific app (e.g. "blog"). ``app_domain`` scopes the follow to a specific app (e.g. "blog",
NULL means the follower subscribes to the aggregate (all activities). "market", "events"). "federation" means the aggregate — the
follower subscribes to all activities.
""" """
__tablename__ = "ap_followers" __tablename__ = "ap_followers"
@@ -142,7 +143,9 @@ class APFollower(Base):
follower_inbox: Mapped[str] = mapped_column(String(512), nullable=False) follower_inbox: Mapped[str] = mapped_column(String(512), nullable=False)
follower_actor_url: Mapped[str] = mapped_column(String(512), nullable=False) follower_actor_url: Mapped[str] = mapped_column(String(512), nullable=False)
follower_public_key: Mapped[str | None] = mapped_column(Text, nullable=True) follower_public_key: Mapped[str | None] = mapped_column(Text, nullable=True)
app_domain: Mapped[str | None] = mapped_column(String(64), nullable=True) app_domain: Mapped[str] = mapped_column(
String(64), nullable=False, default="federation", server_default="federation",
)
created_at: Mapped[datetime] = mapped_column( created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(), DateTime(timezone=True), nullable=False, server_default=func.now(),
) )

View File

@@ -339,7 +339,7 @@ class SqlFederationService:
self, session: AsyncSession, username: str, self, session: AsyncSession, username: str,
follower_acct: str, follower_inbox: str, follower_actor_url: str, follower_acct: str, follower_inbox: str, follower_actor_url: str,
follower_public_key: str | None = None, follower_public_key: str | None = None,
app_domain: str | None = None, app_domain: str = "federation",
) -> APFollowerDTO: ) -> APFollowerDTO:
actor = ( actor = (
await session.execute( await session.execute(
@@ -350,16 +350,15 @@ class SqlFederationService:
raise ValueError(f"Actor not found: {username}") raise ValueError(f"Actor not found: {username}")
# Upsert: update if already following this (actor, acct, app_domain) # Upsert: update if already following this (actor, acct, app_domain)
q = select(APFollower).where( existing = (
APFollower.actor_profile_id == actor.id, await session.execute(
APFollower.follower_acct == follower_acct, select(APFollower).where(
) APFollower.actor_profile_id == actor.id,
if app_domain is not None: APFollower.follower_acct == follower_acct,
q = q.where(APFollower.app_domain == app_domain) APFollower.app_domain == app_domain,
else: )
q = q.where(APFollower.app_domain.is_(None)) )
).scalar_one_or_none()
existing = (await session.execute(q)).scalar_one_or_none()
if existing: if existing:
existing.follower_inbox = follower_inbox existing.follower_inbox = follower_inbox
@@ -382,7 +381,7 @@ class SqlFederationService:
async def remove_follower( async def remove_follower(
self, session: AsyncSession, username: str, follower_acct: str, self, session: AsyncSession, username: str, follower_acct: str,
app_domain: str | None = None, app_domain: str = "federation",
) -> bool: ) -> bool:
actor = ( actor = (
await session.execute( await session.execute(
@@ -392,16 +391,13 @@ class SqlFederationService:
if actor is None: if actor is None:
return False return False
filters = [ result = await session.execute(
APFollower.actor_profile_id == actor.id, delete(APFollower).where(
APFollower.follower_acct == follower_acct, APFollower.actor_profile_id == actor.id,
] APFollower.follower_acct == follower_acct,
if app_domain is not None: APFollower.app_domain == app_domain,
filters.append(APFollower.app_domain == app_domain) )
else: )
filters.append(APFollower.app_domain.is_(None))
result = await session.execute(delete(APFollower).where(*filters))
return result.rowcount > 0 return result.rowcount > 0
async def get_followers_paginated( async def get_followers_paginated(

View File

@@ -235,10 +235,10 @@ class StubFederationService:
async def add_follower(self, session, username, follower_acct, follower_inbox, async def add_follower(self, session, username, follower_acct, follower_inbox,
follower_actor_url, follower_public_key=None, follower_actor_url, follower_public_key=None,
app_domain=None): app_domain="federation"):
raise RuntimeError("FederationService not available") raise RuntimeError("FederationService not available")
async def remove_follower(self, session, username, follower_acct, app_domain=None): async def remove_follower(self, session, username, follower_acct, app_domain="federation"):
return False return False
async def get_or_fetch_remote_actor(self, session, actor_url): async def get_or_fetch_remote_actor(self, session, actor_url):