Compare commits

...

1 Commits

Author SHA1 Message Date
giles
8951a62b90 Add NOTIFY/LISTEN wake-up to event processor
emit_activity() now fires NOTIFY ap_activity_pending inside the
caller's transaction (delivered on commit).  EventProcessor maintains
a dedicated asyncpg LISTEN connection and wakes the poll loop
immediately, dropping latency from ~2 s to sub-100 ms.  The fixed-
interval poll remains as a safety-net fallback.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 15:35:27 +00:00
2 changed files with 75 additions and 2 deletions

View File

@@ -14,6 +14,7 @@ import uuid
from collections import defaultdict
from typing import Awaitable, Callable, Dict, List, Tuple
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from shared.models.federation import APActivity
@@ -119,4 +120,7 @@ async def emit_activity(
)
session.add(activity)
await session.flush()
# Wake any listening EventProcessor as soon as this transaction commits.
# NOTIFY is transactional — delivered only after commit.
await session.execute(text("NOTIFY ap_activity_pending"))
return activity

View File

@@ -4,6 +4,11 @@ activity handlers.
Runs as an asyncio background task within each app process.
Uses SELECT ... FOR UPDATE SKIP LOCKED for safe concurrent processing.
A dedicated asyncpg LISTEN connection wakes the poll loop immediately when
emit_activity() fires NOTIFY ap_activity_pending, so latency drops from
~2 seconds (poll interval) to sub-100 ms. The fixed-interval poll remains
as a safety-net fallback.
"""
from __future__ import annotations
@@ -12,10 +17,11 @@ import logging
import traceback
from datetime import datetime, timezone
import asyncpg
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from shared.db.session import get_session
from shared.db.session import get_session, DATABASE_URL
from shared.models.federation import APActivity
from .bus import get_activity_handlers
@@ -36,18 +42,36 @@ class EventProcessor:
self._poll_interval = poll_interval
self._batch_size = batch_size
self._task: asyncio.Task | None = None
self._listen_task: asyncio.Task | None = None
self._listen_conn: asyncpg.Connection | None = None
self._wake = asyncio.Event()
self._running = False
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
async def start(self) -> None:
"""Start the background polling loop."""
if self._task is not None:
return
self._running = True
self._listen_task = asyncio.create_task(self._listen_for_notify())
self._task = asyncio.create_task(self._poll_loop())
async def stop(self) -> None:
"""Stop the background polling loop gracefully."""
self._running = False
if self._listen_task is not None:
self._listen_task.cancel()
try:
await self._listen_task
except asyncio.CancelledError:
pass
self._listen_task = None
if self._listen_conn is not None and not self._listen_conn.is_closed():
await self._listen_conn.close()
self._listen_conn = None
if self._task is not None:
self._task.cancel()
try:
@@ -56,12 +80,57 @@ class EventProcessor:
pass
self._task = None
# ------------------------------------------------------------------
# LISTEN — wake poll loop on NOTIFY
# ------------------------------------------------------------------
async def _listen_for_notify(self) -> None:
"""Maintain a LISTEN connection and wake the poll loop on NOTIFY."""
dsn = DATABASE_URL.replace("+asyncpg", "")
while self._running:
try:
self._listen_conn = await asyncpg.connect(dsn)
await self._listen_conn.add_listener(
"ap_activity_pending", self._on_notify
)
log.info("LISTEN ap_activity_pending active")
# Keep alive with periodic health check
while self._running:
await asyncio.sleep(30)
await self._listen_conn.execute("SELECT 1")
except asyncio.CancelledError:
break
except Exception:
log.warning("LISTEN connection lost, reconnecting…", exc_info=True)
await asyncio.sleep(2)
finally:
if self._listen_conn is not None and not self._listen_conn.is_closed():
await self._listen_conn.close()
self._listen_conn = None
def _on_notify(self, conn, pid, channel, payload) -> None:
"""Called by asyncpg when a NOTIFY arrives."""
self._wake.set()
# ------------------------------------------------------------------
# Poll loop
# ------------------------------------------------------------------
async def _poll_loop(self) -> None:
while self._running:
try:
# Clear before processing so any NOTIFY that arrives during
# _process_batch sets the event and we loop immediately.
self._wake.clear()
processed = await self._process_batch()
if processed == 0:
await asyncio.sleep(self._poll_interval)
try:
await asyncio.wait_for(
self._wake.wait(), timeout=self._poll_interval
)
except asyncio.TimeoutError:
pass
# processed > 0 → loop immediately to drain the queue
except asyncio.CancelledError:
break
except Exception: