diff --git a/alembic/versions/s9q7n3o5p6_add_ap_delivery_log_table.py b/alembic/versions/s9q7n3o5p6_add_ap_delivery_log_table.py new file mode 100644 index 0000000..0635431 --- /dev/null +++ b/alembic/versions/s9q7n3o5p6_add_ap_delivery_log_table.py @@ -0,0 +1,30 @@ +"""Add ap_delivery_log table for idempotent federation delivery + +Revision ID: s9q7n3o5p6 +Revises: r8p6m2n4o5 +""" +from alembic import op +import sqlalchemy as sa + +revision = "s9q7n3o5p6" +down_revision = "r8p6m2n4o5" +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table( + "ap_delivery_log", + sa.Column("id", sa.Integer, primary_key=True, autoincrement=True), + sa.Column("activity_id", sa.Integer, sa.ForeignKey("ap_activities.id", ondelete="CASCADE"), nullable=False), + sa.Column("inbox_url", sa.String(512), nullable=False), + sa.Column("status_code", sa.Integer, nullable=True), + sa.Column("delivered_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()), + sa.UniqueConstraint("activity_id", "inbox_url", name="uq_delivery_activity_inbox"), + ) + op.create_index("ix_ap_delivery_activity", "ap_delivery_log", ["activity_id"]) + + +def downgrade(): + op.drop_index("ix_ap_delivery_activity", table_name="ap_delivery_log") + op.drop_table("ap_delivery_log") diff --git a/events/handlers/ap_delivery_handler.py b/events/handlers/ap_delivery_handler.py index e9b07da..a0aea93 100644 --- a/events/handlers/ap_delivery_handler.py +++ b/events/handlers/ap_delivery_handler.py @@ -2,6 +2,9 @@ Registered as a wildcard handler — fires for every activity. Skips non-public activities and those without an actor profile. + +Idempotent: successful deliveries are recorded in ap_delivery_log. +On retry (at-least-once reaper), already-delivered inboxes are skipped. """ from __future__ import annotations @@ -12,7 +15,7 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from shared.events.bus import register_activity_handler -from shared.models.federation import ActorProfile, APActivity, APFollower +from shared.models.federation import ActorProfile, APActivity, APFollower, APDeliveryLog from shared.services.registry import services log = logging.getLogger(__name__) @@ -67,8 +70,8 @@ async def _deliver_to_inbox( body: dict, actor: ActorProfile, domain: str, -) -> bool: - """POST signed activity to a single inbox. Returns True on success.""" +) -> int | None: + """POST signed activity to a single inbox. Returns status code or None on error.""" from shared.utils.http_signatures import sign_request from urllib.parse import urlparse import json @@ -96,13 +99,12 @@ async def _deliver_to_inbox( ) if resp.status_code < 300: log.info("Delivered to %s → %d", inbox_url, resp.status_code) - return True else: log.warning("Delivery to %s → %d: %s", inbox_url, resp.status_code, resp.text[:200]) - return False + return resp.status_code except Exception: log.exception("Delivery failed for %s", inbox_url) - return False + return None async def on_any_activity(activity: APActivity, session: AsyncSession) -> None: @@ -140,12 +142,34 @@ async def on_any_activity(activity: APActivity, session: AsyncSession) -> None: log.debug("No followers to deliver to for %s", activity.activity_id) return + # Deduplicate inboxes + all_inboxes = {f.follower_inbox for f in followers if f.follower_inbox} + + # Check delivery log — skip inboxes we already delivered to (idempotency) + existing = ( + await session.execute( + select(APDeliveryLog.inbox_url).where( + APDeliveryLog.activity_id == activity.id, + APDeliveryLog.status_code < 300, + ) + ) + ).scalars().all() + already_delivered = set(existing) + + inboxes = all_inboxes - already_delivered + if not inboxes: + log.info("All %d inbox(es) already delivered for %s", len(all_inboxes), activity.activity_id) + return + + if already_delivered: + log.info( + "Skipping %d already-delivered inbox(es), delivering to %d remaining", + len(already_delivered), len(inboxes), + ) + # Build activity JSON activity_json = _build_activity_json(activity, actor, domain) - # Deduplicate inboxes - inboxes = {f.follower_inbox for f in followers if f.follower_inbox} - log.info( "Delivering %s to %d inbox(es) for @%s", activity.activity_type, len(inboxes), actor.preferred_username, @@ -153,7 +177,17 @@ async def on_any_activity(activity: APActivity, session: AsyncSession) -> None: async with httpx.AsyncClient() as client: for inbox_url in inboxes: - await _deliver_to_inbox(client, inbox_url, activity_json, actor, domain) + status_code = await _deliver_to_inbox( + client, inbox_url, activity_json, actor, domain + ) + # Log successful deliveries for idempotency + if status_code is not None and status_code < 300: + session.add(APDeliveryLog( + activity_id=activity.id, + inbox_url=inbox_url, + status_code=status_code, + )) + await session.flush() # Wildcard: fires for every activity diff --git a/events/processor.py b/events/processor.py index 660ffb9..935309b 100644 --- a/events/processor.py +++ b/events/processor.py @@ -15,10 +15,10 @@ from __future__ import annotations import asyncio import logging import traceback -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone import asyncpg -from sqlalchemy import select +from sqlalchemy import select, update from sqlalchemy.ext.asyncio import AsyncSession from shared.db.session import get_session, DATABASE_URL @@ -37,15 +37,18 @@ class EventProcessor: app_name: str | None = None, poll_interval: float = 2.0, batch_size: int = 10, + stuck_timeout: float = 300.0, ): self._app_name = app_name self._poll_interval = poll_interval self._batch_size = batch_size + self._stuck_timeout = stuck_timeout # seconds before "processing" → "pending" self._task: asyncio.Task | None = None self._listen_task: asyncio.Task | None = None self._listen_conn: asyncpg.Connection | None = None self._wake = asyncio.Event() self._running = False + self._reap_counter = 0 # ------------------------------------------------------------------ # Lifecycle @@ -119,6 +122,12 @@ class EventProcessor: async def _poll_loop(self) -> None: while self._running: try: + # Periodically recover stuck activities (~every 30 cycles) + self._reap_counter += 1 + if self._reap_counter >= 30: + self._reap_counter = 0 + await self._recover_stuck() + # Clear before processing so any NOTIFY that arrives during # _process_batch sets the event and we loop immediately. self._wake.clear() @@ -137,6 +146,37 @@ class EventProcessor: traceback.print_exc() await asyncio.sleep(self._poll_interval) + async def _recover_stuck(self) -> None: + """Reset activities stuck in 'processing' back to 'pending'. + + This handles the case where a process crashed mid-handler. + Combined with idempotent handlers, this gives at-least-once delivery. + """ + cutoff = datetime.now(timezone.utc) - timedelta(seconds=self._stuck_timeout) + try: + async with get_session() as session: + filters = [ + APActivity.process_state == "processing", + APActivity.created_at < cutoff, + ] + if self._app_name: + filters.append(APActivity.origin_app == self._app_name) + result = await session.execute( + update(APActivity) + .where(*filters) + .values(process_state="pending") + .returning(APActivity.id) + ) + recovered = result.scalars().all() + await session.commit() + if recovered: + log.warning( + "Recovered %d stuck activities: %s", + len(recovered), recovered, + ) + except Exception: + log.exception("Failed to recover stuck activities") + async def _process_batch(self) -> int: """Fetch and process a batch of pending activities. Returns count processed.""" processed = 0 diff --git a/models/federation.py b/models/federation.py index 4b3f2a4..a6148b9 100644 --- a/models/federation.py +++ b/models/federation.py @@ -427,3 +427,27 @@ class APNotification(Base): Index("ix_ap_notification_read", "actor_profile_id", "read"), Index("ix_ap_notification_created", "created_at"), ) + + +class APDeliveryLog(Base): + """Tracks successful deliveries of activities to remote inboxes. + + Used for idempotency: the delivery handler skips inboxes that already + have a success row, so retries after a crash never send duplicates. + """ + __tablename__ = "ap_delivery_log" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + activity_id: Mapped[int] = mapped_column( + Integer, ForeignKey("ap_activities.id", ondelete="CASCADE"), nullable=False, + ) + inbox_url: Mapped[str] = mapped_column(String(512), nullable=False) + status_code: Mapped[int | None] = mapped_column(Integer, nullable=True) + delivered_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now(), + ) + + __table_args__ = ( + UniqueConstraint("activity_id", "inbox_url", name="uq_delivery_activity_inbox"), + Index("ix_ap_delivery_activity", "activity_id"), + )