Add at-least-once delivery + idempotent federation handler
- EventProcessor now recovers stuck "processing" activities back to "pending" after 5 minutes (handles process crashes) - New ap_delivery_log table records successful inbox deliveries - Federation delivery handler checks the log before sending, so retries skip already-delivered inboxes - Together these give at-least-once + idempotent semantics Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -2,6 +2,9 @@
|
||||
|
||||
Registered as a wildcard handler — fires for every activity. Skips
|
||||
non-public activities and those without an actor profile.
|
||||
|
||||
Idempotent: successful deliveries are recorded in ap_delivery_log.
|
||||
On retry (at-least-once reaper), already-delivered inboxes are skipped.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
@@ -12,7 +15,7 @@ from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from shared.events.bus import register_activity_handler
|
||||
from shared.models.federation import ActorProfile, APActivity, APFollower
|
||||
from shared.models.federation import ActorProfile, APActivity, APFollower, APDeliveryLog
|
||||
from shared.services.registry import services
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -67,8 +70,8 @@ async def _deliver_to_inbox(
|
||||
body: dict,
|
||||
actor: ActorProfile,
|
||||
domain: str,
|
||||
) -> bool:
|
||||
"""POST signed activity to a single inbox. Returns True on success."""
|
||||
) -> int | None:
|
||||
"""POST signed activity to a single inbox. Returns status code or None on error."""
|
||||
from shared.utils.http_signatures import sign_request
|
||||
from urllib.parse import urlparse
|
||||
import json
|
||||
@@ -96,13 +99,12 @@ async def _deliver_to_inbox(
|
||||
)
|
||||
if resp.status_code < 300:
|
||||
log.info("Delivered to %s → %d", inbox_url, resp.status_code)
|
||||
return True
|
||||
else:
|
||||
log.warning("Delivery to %s → %d: %s", inbox_url, resp.status_code, resp.text[:200])
|
||||
return False
|
||||
return resp.status_code
|
||||
except Exception:
|
||||
log.exception("Delivery failed for %s", inbox_url)
|
||||
return False
|
||||
return None
|
||||
|
||||
|
||||
async def on_any_activity(activity: APActivity, session: AsyncSession) -> None:
|
||||
@@ -140,12 +142,34 @@ async def on_any_activity(activity: APActivity, session: AsyncSession) -> None:
|
||||
log.debug("No followers to deliver to for %s", activity.activity_id)
|
||||
return
|
||||
|
||||
# Deduplicate inboxes
|
||||
all_inboxes = {f.follower_inbox for f in followers if f.follower_inbox}
|
||||
|
||||
# Check delivery log — skip inboxes we already delivered to (idempotency)
|
||||
existing = (
|
||||
await session.execute(
|
||||
select(APDeliveryLog.inbox_url).where(
|
||||
APDeliveryLog.activity_id == activity.id,
|
||||
APDeliveryLog.status_code < 300,
|
||||
)
|
||||
)
|
||||
).scalars().all()
|
||||
already_delivered = set(existing)
|
||||
|
||||
inboxes = all_inboxes - already_delivered
|
||||
if not inboxes:
|
||||
log.info("All %d inbox(es) already delivered for %s", len(all_inboxes), activity.activity_id)
|
||||
return
|
||||
|
||||
if already_delivered:
|
||||
log.info(
|
||||
"Skipping %d already-delivered inbox(es), delivering to %d remaining",
|
||||
len(already_delivered), len(inboxes),
|
||||
)
|
||||
|
||||
# Build activity JSON
|
||||
activity_json = _build_activity_json(activity, actor, domain)
|
||||
|
||||
# Deduplicate inboxes
|
||||
inboxes = {f.follower_inbox for f in followers if f.follower_inbox}
|
||||
|
||||
log.info(
|
||||
"Delivering %s to %d inbox(es) for @%s",
|
||||
activity.activity_type, len(inboxes), actor.preferred_username,
|
||||
@@ -153,7 +177,17 @@ async def on_any_activity(activity: APActivity, session: AsyncSession) -> None:
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
for inbox_url in inboxes:
|
||||
await _deliver_to_inbox(client, inbox_url, activity_json, actor, domain)
|
||||
status_code = await _deliver_to_inbox(
|
||||
client, inbox_url, activity_json, actor, domain
|
||||
)
|
||||
# Log successful deliveries for idempotency
|
||||
if status_code is not None and status_code < 300:
|
||||
session.add(APDeliveryLog(
|
||||
activity_id=activity.id,
|
||||
inbox_url=inbox_url,
|
||||
status_code=status_code,
|
||||
))
|
||||
await session.flush()
|
||||
|
||||
|
||||
# Wildcard: fires for every activity
|
||||
|
||||
@@ -15,10 +15,10 @@ from __future__ import annotations
|
||||
import asyncio
|
||||
import logging
|
||||
import traceback
|
||||
from datetime import datetime, timezone
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
import asyncpg
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy import select, update
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from shared.db.session import get_session, DATABASE_URL
|
||||
@@ -37,15 +37,18 @@ class EventProcessor:
|
||||
app_name: str | None = None,
|
||||
poll_interval: float = 2.0,
|
||||
batch_size: int = 10,
|
||||
stuck_timeout: float = 300.0,
|
||||
):
|
||||
self._app_name = app_name
|
||||
self._poll_interval = poll_interval
|
||||
self._batch_size = batch_size
|
||||
self._stuck_timeout = stuck_timeout # seconds before "processing" → "pending"
|
||||
self._task: asyncio.Task | None = None
|
||||
self._listen_task: asyncio.Task | None = None
|
||||
self._listen_conn: asyncpg.Connection | None = None
|
||||
self._wake = asyncio.Event()
|
||||
self._running = False
|
||||
self._reap_counter = 0
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Lifecycle
|
||||
@@ -119,6 +122,12 @@ class EventProcessor:
|
||||
async def _poll_loop(self) -> None:
|
||||
while self._running:
|
||||
try:
|
||||
# Periodically recover stuck activities (~every 30 cycles)
|
||||
self._reap_counter += 1
|
||||
if self._reap_counter >= 30:
|
||||
self._reap_counter = 0
|
||||
await self._recover_stuck()
|
||||
|
||||
# Clear before processing so any NOTIFY that arrives during
|
||||
# _process_batch sets the event and we loop immediately.
|
||||
self._wake.clear()
|
||||
@@ -137,6 +146,37 @@ class EventProcessor:
|
||||
traceback.print_exc()
|
||||
await asyncio.sleep(self._poll_interval)
|
||||
|
||||
async def _recover_stuck(self) -> None:
|
||||
"""Reset activities stuck in 'processing' back to 'pending'.
|
||||
|
||||
This handles the case where a process crashed mid-handler.
|
||||
Combined with idempotent handlers, this gives at-least-once delivery.
|
||||
"""
|
||||
cutoff = datetime.now(timezone.utc) - timedelta(seconds=self._stuck_timeout)
|
||||
try:
|
||||
async with get_session() as session:
|
||||
filters = [
|
||||
APActivity.process_state == "processing",
|
||||
APActivity.created_at < cutoff,
|
||||
]
|
||||
if self._app_name:
|
||||
filters.append(APActivity.origin_app == self._app_name)
|
||||
result = await session.execute(
|
||||
update(APActivity)
|
||||
.where(*filters)
|
||||
.values(process_state="pending")
|
||||
.returning(APActivity.id)
|
||||
)
|
||||
recovered = result.scalars().all()
|
||||
await session.commit()
|
||||
if recovered:
|
||||
log.warning(
|
||||
"Recovered %d stuck activities: %s",
|
||||
len(recovered), recovered,
|
||||
)
|
||||
except Exception:
|
||||
log.exception("Failed to recover stuck activities")
|
||||
|
||||
async def _process_batch(self) -> int:
|
||||
"""Fetch and process a batch of pending activities. Returns count processed."""
|
||||
processed = 0
|
||||
|
||||
Reference in New Issue
Block a user