|
|
|
|
@@ -4,6 +4,11 @@ activity handlers.
|
|
|
|
|
|
|
|
|
|
Runs as an asyncio background task within each app process.
|
|
|
|
|
Uses SELECT ... FOR UPDATE SKIP LOCKED for safe concurrent processing.
|
|
|
|
|
|
|
|
|
|
A dedicated asyncpg LISTEN connection wakes the poll loop immediately when
|
|
|
|
|
emit_activity() fires NOTIFY ap_activity_pending, so latency drops from
|
|
|
|
|
~2 seconds (poll interval) to sub-100 ms. The fixed-interval poll remains
|
|
|
|
|
as a safety-net fallback.
|
|
|
|
|
"""
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
@@ -12,10 +17,11 @@ import logging
|
|
|
|
|
import traceback
|
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
|
|
|
|
|
|
import asyncpg
|
|
|
|
|
from sqlalchemy import select
|
|
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
|
|
|
|
|
|
from shared.db.session import get_session
|
|
|
|
|
from shared.db.session import get_session, DATABASE_URL
|
|
|
|
|
from shared.models.federation import APActivity
|
|
|
|
|
from .bus import get_activity_handlers
|
|
|
|
|
|
|
|
|
|
@@ -36,18 +42,36 @@ class EventProcessor:
|
|
|
|
|
self._poll_interval = poll_interval
|
|
|
|
|
self._batch_size = batch_size
|
|
|
|
|
self._task: asyncio.Task | None = None
|
|
|
|
|
self._listen_task: asyncio.Task | None = None
|
|
|
|
|
self._listen_conn: asyncpg.Connection | None = None
|
|
|
|
|
self._wake = asyncio.Event()
|
|
|
|
|
self._running = False
|
|
|
|
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
# Lifecycle
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
async def start(self) -> None:
|
|
|
|
|
"""Start the background polling loop."""
|
|
|
|
|
if self._task is not None:
|
|
|
|
|
return
|
|
|
|
|
self._running = True
|
|
|
|
|
self._listen_task = asyncio.create_task(self._listen_for_notify())
|
|
|
|
|
self._task = asyncio.create_task(self._poll_loop())
|
|
|
|
|
|
|
|
|
|
async def stop(self) -> None:
|
|
|
|
|
"""Stop the background polling loop gracefully."""
|
|
|
|
|
self._running = False
|
|
|
|
|
if self._listen_task is not None:
|
|
|
|
|
self._listen_task.cancel()
|
|
|
|
|
try:
|
|
|
|
|
await self._listen_task
|
|
|
|
|
except asyncio.CancelledError:
|
|
|
|
|
pass
|
|
|
|
|
self._listen_task = None
|
|
|
|
|
if self._listen_conn is not None and not self._listen_conn.is_closed():
|
|
|
|
|
await self._listen_conn.close()
|
|
|
|
|
self._listen_conn = None
|
|
|
|
|
if self._task is not None:
|
|
|
|
|
self._task.cancel()
|
|
|
|
|
try:
|
|
|
|
|
@@ -56,12 +80,57 @@ class EventProcessor:
|
|
|
|
|
pass
|
|
|
|
|
self._task = None
|
|
|
|
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
# LISTEN — wake poll loop on NOTIFY
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
async def _listen_for_notify(self) -> None:
|
|
|
|
|
"""Maintain a LISTEN connection and wake the poll loop on NOTIFY."""
|
|
|
|
|
dsn = DATABASE_URL.replace("+asyncpg", "")
|
|
|
|
|
while self._running:
|
|
|
|
|
try:
|
|
|
|
|
self._listen_conn = await asyncpg.connect(dsn)
|
|
|
|
|
await self._listen_conn.add_listener(
|
|
|
|
|
"ap_activity_pending", self._on_notify
|
|
|
|
|
)
|
|
|
|
|
log.info("LISTEN ap_activity_pending active")
|
|
|
|
|
# Keep alive with periodic health check
|
|
|
|
|
while self._running:
|
|
|
|
|
await asyncio.sleep(30)
|
|
|
|
|
await self._listen_conn.execute("SELECT 1")
|
|
|
|
|
except asyncio.CancelledError:
|
|
|
|
|
break
|
|
|
|
|
except Exception:
|
|
|
|
|
log.warning("LISTEN connection lost, reconnecting…", exc_info=True)
|
|
|
|
|
await asyncio.sleep(2)
|
|
|
|
|
finally:
|
|
|
|
|
if self._listen_conn is not None and not self._listen_conn.is_closed():
|
|
|
|
|
await self._listen_conn.close()
|
|
|
|
|
self._listen_conn = None
|
|
|
|
|
|
|
|
|
|
def _on_notify(self, conn, pid, channel, payload) -> None:
|
|
|
|
|
"""Called by asyncpg when a NOTIFY arrives."""
|
|
|
|
|
self._wake.set()
|
|
|
|
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
# Poll loop
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
async def _poll_loop(self) -> None:
|
|
|
|
|
while self._running:
|
|
|
|
|
try:
|
|
|
|
|
# Clear before processing so any NOTIFY that arrives during
|
|
|
|
|
# _process_batch sets the event and we loop immediately.
|
|
|
|
|
self._wake.clear()
|
|
|
|
|
processed = await self._process_batch()
|
|
|
|
|
if processed == 0:
|
|
|
|
|
await asyncio.sleep(self._poll_interval)
|
|
|
|
|
try:
|
|
|
|
|
await asyncio.wait_for(
|
|
|
|
|
self._wake.wait(), timeout=self._poll_interval
|
|
|
|
|
)
|
|
|
|
|
except asyncio.TimeoutError:
|
|
|
|
|
pass
|
|
|
|
|
# processed > 0 → loop immediately to drain the queue
|
|
|
|
|
except asyncio.CancelledError:
|
|
|
|
|
break
|
|
|
|
|
except Exception:
|
|
|
|
|
|