diff --git a/events/bus.py b/events/bus.py index 41911d6..215194e 100644 --- a/events/bus.py +++ b/events/bus.py @@ -14,6 +14,7 @@ import uuid from collections import defaultdict from typing import Awaitable, Callable, Dict, List, Tuple +from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession from shared.models.federation import APActivity @@ -119,4 +120,7 @@ async def emit_activity( ) session.add(activity) await session.flush() + # Wake any listening EventProcessor as soon as this transaction commits. + # NOTIFY is transactional — delivered only after commit. + await session.execute(text("NOTIFY ap_activity_pending")) return activity diff --git a/events/processor.py b/events/processor.py index cf392e4..660ffb9 100644 --- a/events/processor.py +++ b/events/processor.py @@ -4,6 +4,11 @@ activity handlers. Runs as an asyncio background task within each app process. Uses SELECT ... FOR UPDATE SKIP LOCKED for safe concurrent processing. + +A dedicated asyncpg LISTEN connection wakes the poll loop immediately when +emit_activity() fires NOTIFY ap_activity_pending, so latency drops from +~2 seconds (poll interval) to sub-100 ms. The fixed-interval poll remains +as a safety-net fallback. """ from __future__ import annotations @@ -12,10 +17,11 @@ import logging import traceback from datetime import datetime, timezone +import asyncpg from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession -from shared.db.session import get_session +from shared.db.session import get_session, DATABASE_URL from shared.models.federation import APActivity from .bus import get_activity_handlers @@ -36,18 +42,36 @@ class EventProcessor: self._poll_interval = poll_interval self._batch_size = batch_size self._task: asyncio.Task | None = None + self._listen_task: asyncio.Task | None = None + self._listen_conn: asyncpg.Connection | None = None + self._wake = asyncio.Event() self._running = False + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + async def start(self) -> None: """Start the background polling loop.""" if self._task is not None: return self._running = True + self._listen_task = asyncio.create_task(self._listen_for_notify()) self._task = asyncio.create_task(self._poll_loop()) async def stop(self) -> None: """Stop the background polling loop gracefully.""" self._running = False + if self._listen_task is not None: + self._listen_task.cancel() + try: + await self._listen_task + except asyncio.CancelledError: + pass + self._listen_task = None + if self._listen_conn is not None and not self._listen_conn.is_closed(): + await self._listen_conn.close() + self._listen_conn = None if self._task is not None: self._task.cancel() try: @@ -56,12 +80,57 @@ class EventProcessor: pass self._task = None + # ------------------------------------------------------------------ + # LISTEN — wake poll loop on NOTIFY + # ------------------------------------------------------------------ + + async def _listen_for_notify(self) -> None: + """Maintain a LISTEN connection and wake the poll loop on NOTIFY.""" + dsn = DATABASE_URL.replace("+asyncpg", "") + while self._running: + try: + self._listen_conn = await asyncpg.connect(dsn) + await self._listen_conn.add_listener( + "ap_activity_pending", self._on_notify + ) + log.info("LISTEN ap_activity_pending active") + # Keep alive with periodic health check + while self._running: + await asyncio.sleep(30) + await self._listen_conn.execute("SELECT 1") + except asyncio.CancelledError: + break + except Exception: + log.warning("LISTEN connection lost, reconnecting…", exc_info=True) + await asyncio.sleep(2) + finally: + if self._listen_conn is not None and not self._listen_conn.is_closed(): + await self._listen_conn.close() + self._listen_conn = None + + def _on_notify(self, conn, pid, channel, payload) -> None: + """Called by asyncpg when a NOTIFY arrives.""" + self._wake.set() + + # ------------------------------------------------------------------ + # Poll loop + # ------------------------------------------------------------------ + async def _poll_loop(self) -> None: while self._running: try: + # Clear before processing so any NOTIFY that arrives during + # _process_batch sets the event and we loop immediately. + self._wake.clear() processed = await self._process_batch() if processed == 0: - await asyncio.sleep(self._poll_interval) + try: + await asyncio.wait_for( + self._wake.wait(), timeout=self._poll_interval + ) + except asyncio.TimeoutError: + pass + # processed > 0 → loop immediately to drain the queue except asyncio.CancelledError: break except Exception: