Deliver per (inbox, domain) — federation actor gets all posts too
Add app_domain to APDeliveryLog so the same activity can be delivered to the same inbox under different actor identities (blog + federation).
This commit is contained in:
@@ -0,0 +1,33 @@
|
|||||||
|
"""Add app_domain to ap_delivery_log for per-domain idempotency
|
||||||
|
|
||||||
|
Revision ID: u1s9o5p7q8
|
||||||
|
Revises: t0r8n4o6p7
|
||||||
|
"""
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
revision = "u1s9o5p7q8"
|
||||||
|
down_revision = "t0r8n4o6p7"
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
|
||||||
|
op.add_column(
|
||||||
|
"ap_delivery_log",
|
||||||
|
sa.Column("app_domain", sa.String(128), nullable=False, server_default="federation"),
|
||||||
|
)
|
||||||
|
op.drop_constraint("uq_delivery_activity_inbox", "ap_delivery_log", type_="unique")
|
||||||
|
op.create_unique_constraint(
|
||||||
|
"uq_delivery_activity_inbox_domain",
|
||||||
|
"ap_delivery_log",
|
||||||
|
["activity_id", "inbox_url", "app_domain"],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
|
||||||
|
op.drop_constraint("uq_delivery_activity_inbox_domain", "ap_delivery_log", type_="unique")
|
||||||
|
op.drop_column("ap_delivery_log", "app_domain")
|
||||||
|
op.create_unique_constraint(
|
||||||
|
"uq_delivery_activity_inbox",
|
||||||
|
"ap_delivery_log",
|
||||||
|
["activity_id", "inbox_url"],
|
||||||
|
)
|
||||||
@@ -181,51 +181,49 @@ async def on_any_activity(activity: APActivity, session: AsyncSession) -> None:
|
|||||||
log.debug("No followers to deliver to for %s", activity.activity_id)
|
log.debug("No followers to deliver to for %s", activity.activity_id)
|
||||||
return
|
return
|
||||||
|
|
||||||
# Check delivery log — skip inboxes we already delivered to (idempotency)
|
# Check delivery log — skip (inbox, domain) pairs already delivered (idempotency)
|
||||||
existing = (
|
existing = (
|
||||||
await session.execute(
|
await session.execute(
|
||||||
select(APDeliveryLog.inbox_url).where(
|
select(APDeliveryLog.inbox_url, APDeliveryLog.app_domain).where(
|
||||||
APDeliveryLog.activity_id == activity.id,
|
APDeliveryLog.activity_id == activity.id,
|
||||||
APDeliveryLog.status_code < 300,
|
APDeliveryLog.status_code < 300,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
).scalars().all()
|
).all()
|
||||||
already_delivered = set(existing)
|
already_delivered: set[tuple[str, str]] = {(r[0], r[1]) for r in existing}
|
||||||
|
|
||||||
# Group followers by app_domain so we deliver with the correct
|
# Collect all (inbox, app_domain) pairs to deliver to.
|
||||||
# actor URL and signing domain for each subscriber.
|
# Each follower subscription gets its own delivery with the correct
|
||||||
# If the same inbox appears under multiple app_domains, prefer
|
# actor identity, so followers of @user@blog and @user@federation
|
||||||
# the per-app domain (it's what the follower subscribed to).
|
# both see posts on their respective actor profiles.
|
||||||
inbox_to_domain: dict[str, str] = {}
|
delivery_pairs: set[tuple[str, str]] = set()
|
||||||
for f in followers:
|
for f in followers:
|
||||||
if not f.follower_inbox:
|
if not f.follower_inbox:
|
||||||
continue
|
continue
|
||||||
if f.follower_inbox in already_delivered:
|
|
||||||
continue
|
|
||||||
app_dom = f.app_domain or "federation"
|
app_dom = f.app_domain or "federation"
|
||||||
# Per-app domain wins over aggregate if both exist
|
pair = (f.follower_inbox, app_dom)
|
||||||
if f.follower_inbox not in inbox_to_domain or app_dom != "federation":
|
if pair not in already_delivered:
|
||||||
inbox_to_domain[f.follower_inbox] = app_dom
|
delivery_pairs.add(pair)
|
||||||
|
|
||||||
if not inbox_to_domain:
|
if not delivery_pairs:
|
||||||
if already_delivered:
|
if already_delivered:
|
||||||
log.info("All inbox(es) already delivered for %s", activity.activity_id)
|
log.info("All deliveries already done for %s", activity.activity_id)
|
||||||
return
|
return
|
||||||
|
|
||||||
if already_delivered:
|
if already_delivered:
|
||||||
log.info(
|
log.info(
|
||||||
"Skipping %d already-delivered inbox(es), delivering to %d remaining",
|
"Skipping %d already-delivered, delivering to %d remaining",
|
||||||
len(already_delivered), len(inbox_to_domain),
|
len(already_delivered), len(delivery_pairs),
|
||||||
)
|
)
|
||||||
|
|
||||||
# Group by domain to reuse activity JSON per domain
|
# Group by domain to reuse activity JSON per domain
|
||||||
domain_inboxes: dict[str, list[str]] = defaultdict(list)
|
domain_inboxes: dict[str, list[str]] = defaultdict(list)
|
||||||
for inbox_url, app_dom in inbox_to_domain.items():
|
for inbox_url, app_dom in delivery_pairs:
|
||||||
domain_inboxes[app_dom].append(inbox_url)
|
domain_inboxes[app_dom].append(inbox_url)
|
||||||
|
|
||||||
log.info(
|
log.info(
|
||||||
"Delivering %s to %d inbox(es) for @%s across %d domain(s)",
|
"Delivering %s to %d target(s) for @%s across %d domain(s)",
|
||||||
activity.activity_type, len(inbox_to_domain),
|
activity.activity_type, len(delivery_pairs),
|
||||||
actor.preferred_username, len(domain_inboxes),
|
actor.preferred_username, len(domain_inboxes),
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -242,6 +240,7 @@ async def on_any_activity(activity: APActivity, session: AsyncSession) -> None:
|
|||||||
session.add(APDeliveryLog(
|
session.add(APDeliveryLog(
|
||||||
activity_id=activity.id,
|
activity_id=activity.id,
|
||||||
inbox_url=inbox_url,
|
inbox_url=inbox_url,
|
||||||
|
app_domain=app_dom,
|
||||||
status_code=status_code,
|
status_code=status_code,
|
||||||
))
|
))
|
||||||
await session.flush()
|
await session.flush()
|
||||||
|
|||||||
@@ -454,12 +454,13 @@ class APDeliveryLog(Base):
|
|||||||
Integer, ForeignKey("ap_activities.id", ondelete="CASCADE"), nullable=False,
|
Integer, ForeignKey("ap_activities.id", ondelete="CASCADE"), nullable=False,
|
||||||
)
|
)
|
||||||
inbox_url: Mapped[str] = mapped_column(String(512), nullable=False)
|
inbox_url: Mapped[str] = mapped_column(String(512), nullable=False)
|
||||||
|
app_domain: Mapped[str] = mapped_column(String(128), nullable=False, server_default="federation")
|
||||||
status_code: Mapped[int | None] = mapped_column(Integer, nullable=True)
|
status_code: Mapped[int | None] = mapped_column(Integer, nullable=True)
|
||||||
delivered_at: Mapped[datetime] = mapped_column(
|
delivered_at: Mapped[datetime] = mapped_column(
|
||||||
DateTime(timezone=True), nullable=False, server_default=func.now(),
|
DateTime(timezone=True), nullable=False, server_default=func.now(),
|
||||||
)
|
)
|
||||||
|
|
||||||
__table_args__ = (
|
__table_args__ = (
|
||||||
UniqueConstraint("activity_id", "inbox_url", name="uq_delivery_activity_inbox"),
|
UniqueConstraint("activity_id", "inbox_url", "app_domain", name="uq_delivery_activity_inbox_domain"),
|
||||||
Index("ix_ap_delivery_activity", "activity_id"),
|
Index("ix_ap_delivery_activity", "activity_id"),
|
||||||
)
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user