diff --git a/alembic/versions/u1s9o5p7q8_add_app_domain_to_delivery_log.py b/alembic/versions/u1s9o5p7q8_add_app_domain_to_delivery_log.py new file mode 100644 index 0000000..1306c9f --- /dev/null +++ b/alembic/versions/u1s9o5p7q8_add_app_domain_to_delivery_log.py @@ -0,0 +1,33 @@ +"""Add app_domain to ap_delivery_log for per-domain idempotency + +Revision ID: u1s9o5p7q8 +Revises: t0r8n4o6p7 +""" +from alembic import op +import sqlalchemy as sa + +revision = "u1s9o5p7q8" +down_revision = "t0r8n4o6p7" + + +def upgrade() -> None: + op.add_column( + "ap_delivery_log", + sa.Column("app_domain", sa.String(128), nullable=False, server_default="federation"), + ) + op.drop_constraint("uq_delivery_activity_inbox", "ap_delivery_log", type_="unique") + op.create_unique_constraint( + "uq_delivery_activity_inbox_domain", + "ap_delivery_log", + ["activity_id", "inbox_url", "app_domain"], + ) + + +def downgrade() -> None: + op.drop_constraint("uq_delivery_activity_inbox_domain", "ap_delivery_log", type_="unique") + op.drop_column("ap_delivery_log", "app_domain") + op.create_unique_constraint( + "uq_delivery_activity_inbox", + "ap_delivery_log", + ["activity_id", "inbox_url"], + ) diff --git a/events/handlers/ap_delivery_handler.py b/events/handlers/ap_delivery_handler.py index 0ac9299..7a175bf 100644 --- a/events/handlers/ap_delivery_handler.py +++ b/events/handlers/ap_delivery_handler.py @@ -181,51 +181,49 @@ async def on_any_activity(activity: APActivity, session: AsyncSession) -> None: log.debug("No followers to deliver to for %s", activity.activity_id) return - # Check delivery log — skip inboxes we already delivered to (idempotency) + # Check delivery log — skip (inbox, domain) pairs already delivered (idempotency) existing = ( await session.execute( - select(APDeliveryLog.inbox_url).where( + select(APDeliveryLog.inbox_url, APDeliveryLog.app_domain).where( APDeliveryLog.activity_id == activity.id, APDeliveryLog.status_code < 300, ) ) - ).scalars().all() - already_delivered = set(existing) + ).all() + already_delivered: set[tuple[str, str]] = {(r[0], r[1]) for r in existing} - # Group followers by app_domain so we deliver with the correct - # actor URL and signing domain for each subscriber. - # If the same inbox appears under multiple app_domains, prefer - # the per-app domain (it's what the follower subscribed to). - inbox_to_domain: dict[str, str] = {} + # Collect all (inbox, app_domain) pairs to deliver to. + # Each follower subscription gets its own delivery with the correct + # actor identity, so followers of @user@blog and @user@federation + # both see posts on their respective actor profiles. + delivery_pairs: set[tuple[str, str]] = set() for f in followers: if not f.follower_inbox: continue - if f.follower_inbox in already_delivered: - continue app_dom = f.app_domain or "federation" - # Per-app domain wins over aggregate if both exist - if f.follower_inbox not in inbox_to_domain or app_dom != "federation": - inbox_to_domain[f.follower_inbox] = app_dom + pair = (f.follower_inbox, app_dom) + if pair not in already_delivered: + delivery_pairs.add(pair) - if not inbox_to_domain: + if not delivery_pairs: if already_delivered: - log.info("All inbox(es) already delivered for %s", activity.activity_id) + log.info("All deliveries already done for %s", activity.activity_id) return if already_delivered: log.info( - "Skipping %d already-delivered inbox(es), delivering to %d remaining", - len(already_delivered), len(inbox_to_domain), + "Skipping %d already-delivered, delivering to %d remaining", + len(already_delivered), len(delivery_pairs), ) # Group by domain to reuse activity JSON per domain domain_inboxes: dict[str, list[str]] = defaultdict(list) - for inbox_url, app_dom in inbox_to_domain.items(): + for inbox_url, app_dom in delivery_pairs: domain_inboxes[app_dom].append(inbox_url) log.info( - "Delivering %s to %d inbox(es) for @%s across %d domain(s)", - activity.activity_type, len(inbox_to_domain), + "Delivering %s to %d target(s) for @%s across %d domain(s)", + activity.activity_type, len(delivery_pairs), actor.preferred_username, len(domain_inboxes), ) @@ -242,6 +240,7 @@ async def on_any_activity(activity: APActivity, session: AsyncSession) -> None: session.add(APDeliveryLog( activity_id=activity.id, inbox_url=inbox_url, + app_domain=app_dom, status_code=status_code, )) await session.flush() diff --git a/models/federation.py b/models/federation.py index b5b780b..daef64a 100644 --- a/models/federation.py +++ b/models/federation.py @@ -454,12 +454,13 @@ class APDeliveryLog(Base): Integer, ForeignKey("ap_activities.id", ondelete="CASCADE"), nullable=False, ) inbox_url: Mapped[str] = mapped_column(String(512), nullable=False) + app_domain: Mapped[str] = mapped_column(String(128), nullable=False, server_default="federation") status_code: Mapped[int | None] = mapped_column(Integer, nullable=True) delivered_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, server_default=func.now(), ) __table_args__ = ( - UniqueConstraint("activity_id", "inbox_url", name="uq_delivery_activity_inbox"), + UniqueConstraint("activity_id", "inbox_url", "app_domain", name="uq_delivery_activity_inbox_domain"), Index("ix_ap_delivery_activity", "activity_id"), )