Add NOTIFY/LISTEN wake-up to event processor

emit_activity() now fires NOTIFY ap_activity_pending inside the
caller's transaction (delivered on commit).  EventProcessor maintains
a dedicated asyncpg LISTEN connection and wakes the poll loop
immediately, dropping latency from ~2 s to sub-100 ms.  The fixed-
interval poll remains as a safety-net fallback.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
giles
2026-02-23 15:35:27 +00:00
parent 7b878a501b
commit 8951a62b90
2 changed files with 75 additions and 2 deletions

View File

@@ -14,6 +14,7 @@ import uuid
from collections import defaultdict from collections import defaultdict
from typing import Awaitable, Callable, Dict, List, Tuple from typing import Awaitable, Callable, Dict, List, Tuple
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from shared.models.federation import APActivity from shared.models.federation import APActivity
@@ -119,4 +120,7 @@ async def emit_activity(
) )
session.add(activity) session.add(activity)
await session.flush() await session.flush()
# Wake any listening EventProcessor as soon as this transaction commits.
# NOTIFY is transactional — delivered only after commit.
await session.execute(text("NOTIFY ap_activity_pending"))
return activity return activity

View File

@@ -4,6 +4,11 @@ activity handlers.
Runs as an asyncio background task within each app process. Runs as an asyncio background task within each app process.
Uses SELECT ... FOR UPDATE SKIP LOCKED for safe concurrent processing. Uses SELECT ... FOR UPDATE SKIP LOCKED for safe concurrent processing.
A dedicated asyncpg LISTEN connection wakes the poll loop immediately when
emit_activity() fires NOTIFY ap_activity_pending, so latency drops from
~2 seconds (poll interval) to sub-100 ms. The fixed-interval poll remains
as a safety-net fallback.
""" """
from __future__ import annotations from __future__ import annotations
@@ -12,10 +17,11 @@ import logging
import traceback import traceback
from datetime import datetime, timezone from datetime import datetime, timezone
import asyncpg
from sqlalchemy import select from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from shared.db.session import get_session from shared.db.session import get_session, DATABASE_URL
from shared.models.federation import APActivity from shared.models.federation import APActivity
from .bus import get_activity_handlers from .bus import get_activity_handlers
@@ -36,18 +42,36 @@ class EventProcessor:
self._poll_interval = poll_interval self._poll_interval = poll_interval
self._batch_size = batch_size self._batch_size = batch_size
self._task: asyncio.Task | None = None self._task: asyncio.Task | None = None
self._listen_task: asyncio.Task | None = None
self._listen_conn: asyncpg.Connection | None = None
self._wake = asyncio.Event()
self._running = False self._running = False
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
async def start(self) -> None: async def start(self) -> None:
"""Start the background polling loop.""" """Start the background polling loop."""
if self._task is not None: if self._task is not None:
return return
self._running = True self._running = True
self._listen_task = asyncio.create_task(self._listen_for_notify())
self._task = asyncio.create_task(self._poll_loop()) self._task = asyncio.create_task(self._poll_loop())
async def stop(self) -> None: async def stop(self) -> None:
"""Stop the background polling loop gracefully.""" """Stop the background polling loop gracefully."""
self._running = False self._running = False
if self._listen_task is not None:
self._listen_task.cancel()
try:
await self._listen_task
except asyncio.CancelledError:
pass
self._listen_task = None
if self._listen_conn is not None and not self._listen_conn.is_closed():
await self._listen_conn.close()
self._listen_conn = None
if self._task is not None: if self._task is not None:
self._task.cancel() self._task.cancel()
try: try:
@@ -56,12 +80,57 @@ class EventProcessor:
pass pass
self._task = None self._task = None
# ------------------------------------------------------------------
# LISTEN — wake poll loop on NOTIFY
# ------------------------------------------------------------------
async def _listen_for_notify(self) -> None:
"""Maintain a LISTEN connection and wake the poll loop on NOTIFY."""
dsn = DATABASE_URL.replace("+asyncpg", "")
while self._running:
try:
self._listen_conn = await asyncpg.connect(dsn)
await self._listen_conn.add_listener(
"ap_activity_pending", self._on_notify
)
log.info("LISTEN ap_activity_pending active")
# Keep alive with periodic health check
while self._running:
await asyncio.sleep(30)
await self._listen_conn.execute("SELECT 1")
except asyncio.CancelledError:
break
except Exception:
log.warning("LISTEN connection lost, reconnecting…", exc_info=True)
await asyncio.sleep(2)
finally:
if self._listen_conn is not None and not self._listen_conn.is_closed():
await self._listen_conn.close()
self._listen_conn = None
def _on_notify(self, conn, pid, channel, payload) -> None:
"""Called by asyncpg when a NOTIFY arrives."""
self._wake.set()
# ------------------------------------------------------------------
# Poll loop
# ------------------------------------------------------------------
async def _poll_loop(self) -> None: async def _poll_loop(self) -> None:
while self._running: while self._running:
try: try:
# Clear before processing so any NOTIFY that arrives during
# _process_batch sets the event and we loop immediately.
self._wake.clear()
processed = await self._process_batch() processed = await self._process_batch()
if processed == 0: if processed == 0:
await asyncio.sleep(self._poll_interval) try:
await asyncio.wait_for(
self._wake.wait(), timeout=self._poll_interval
)
except asyncio.TimeoutError:
pass
# processed > 0 → loop immediately to drain the queue
except asyncio.CancelledError: except asyncio.CancelledError:
break break
except Exception: except Exception: