""" Event processor — polls the ap_activities table and dispatches to registered activity handlers. Runs as an asyncio background task within each app process. Uses SELECT ... FOR UPDATE SKIP LOCKED for safe concurrent processing. A dedicated asyncpg LISTEN connection wakes the poll loop immediately when emit_activity() fires NOTIFY ap_activity_pending, so latency drops from ~2 seconds (poll interval) to sub-100 ms. The fixed-interval poll remains as a safety-net fallback. The LISTEN connection and poll queries target the federation database (DATABASE_URL_FEDERATION) since ap_activities lives there. """ from __future__ import annotations import asyncio import logging import traceback from datetime import datetime, timedelta, timezone import asyncpg from sqlalchemy import select, update from sqlalchemy.ext.asyncio import AsyncSession from shared.db.session import get_federation_session, DATABASE_URL_FEDERATION from shared.models.federation import APActivity from .bus import get_activity_handlers log = logging.getLogger(__name__) class EventProcessor: """Background event processor that polls the ap_activities table.""" def __init__( self, *, app_name: str | None = None, poll_interval: float = 2.0, batch_size: int = 10, stuck_timeout: float = 300.0, ): self._app_name = app_name self._poll_interval = poll_interval self._batch_size = batch_size self._stuck_timeout = stuck_timeout # seconds before "processing" → "pending" self._task: asyncio.Task | None = None self._listen_task: asyncio.Task | None = None self._listen_conn: asyncpg.Connection | None = None self._wake = asyncio.Event() self._running = False self._reap_counter = 0 # ------------------------------------------------------------------ # Lifecycle # ------------------------------------------------------------------ async def start(self) -> None: """Start the background polling loop.""" if self._task is not None: return self._running = True self._listen_task = asyncio.create_task(self._listen_for_notify()) self._task = asyncio.create_task(self._poll_loop()) async def stop(self) -> None: """Stop the background polling loop gracefully.""" self._running = False if self._listen_task is not None: self._listen_task.cancel() try: await self._listen_task except asyncio.CancelledError: pass self._listen_task = None if self._listen_conn is not None and not self._listen_conn.is_closed(): await self._listen_conn.close() self._listen_conn = None if self._task is not None: self._task.cancel() try: await self._task except asyncio.CancelledError: pass self._task = None # ------------------------------------------------------------------ # LISTEN — wake poll loop on NOTIFY # ------------------------------------------------------------------ async def _listen_for_notify(self) -> None: """Maintain a LISTEN connection and wake the poll loop on NOTIFY.""" dsn = DATABASE_URL_FEDERATION.replace("+asyncpg", "") while self._running: try: self._listen_conn = await asyncpg.connect(dsn) await self._listen_conn.add_listener( "ap_activity_pending", self._on_notify ) log.info("LISTEN ap_activity_pending active") # Keep alive with periodic health check while self._running: await asyncio.sleep(30) await self._listen_conn.execute("SELECT 1") except asyncio.CancelledError: break except Exception: log.warning("LISTEN connection lost, reconnecting…", exc_info=True) await asyncio.sleep(2) finally: if self._listen_conn is not None and not self._listen_conn.is_closed(): await self._listen_conn.close() self._listen_conn = None def _on_notify(self, conn, pid, channel, payload) -> None: """Called by asyncpg when a NOTIFY arrives.""" self._wake.set() # ------------------------------------------------------------------ # Poll loop # ------------------------------------------------------------------ async def _poll_loop(self) -> None: while self._running: try: # Periodically recover stuck activities (~every 30 cycles) self._reap_counter += 1 if self._reap_counter >= 30: self._reap_counter = 0 await self._recover_stuck() # Clear before processing so any NOTIFY that arrives during # _process_batch sets the event and we loop immediately. self._wake.clear() processed = await self._process_batch() if processed == 0: try: await asyncio.wait_for( self._wake.wait(), timeout=self._poll_interval ) except asyncio.TimeoutError: pass # processed > 0 → loop immediately to drain the queue except asyncio.CancelledError: break except Exception: traceback.print_exc() await asyncio.sleep(self._poll_interval) async def _recover_stuck(self) -> None: """Reset activities stuck in 'processing' back to 'pending'. This handles the case where a process crashed mid-handler. Combined with idempotent handlers, this gives at-least-once delivery. """ cutoff = datetime.now(timezone.utc) - timedelta(seconds=self._stuck_timeout) try: async with get_federation_session() as session: filters = [ APActivity.process_state == "processing", APActivity.created_at < cutoff, ] if self._app_name: filters.append(APActivity.origin_app == self._app_name) result = await session.execute( update(APActivity) .where(*filters) .values(process_state="pending") .returning(APActivity.id) ) recovered = result.scalars().all() await session.commit() if recovered: log.warning( "Recovered %d stuck activities: %s", len(recovered), recovered, ) except Exception: log.exception("Failed to recover stuck activities") async def _process_batch(self) -> int: """Fetch and process a batch of pending activities. Returns count processed.""" processed = 0 async with get_federation_session() as session: filters = [ APActivity.process_state == "pending", APActivity.process_attempts < APActivity.process_max_attempts, ] if self._app_name: filters.append(APActivity.origin_app == self._app_name) stmt = ( select(APActivity) .where(*filters) .order_by(APActivity.created_at) .limit(self._batch_size) .with_for_update(skip_locked=True) ) result = await session.execute(stmt) activities = result.scalars().all() for activity in activities: await self._process_one(session, activity) processed += 1 await session.commit() return processed async def _process_one(self, session: AsyncSession, activity: APActivity) -> None: """Run all handlers for a single activity.""" handlers = get_activity_handlers(activity.activity_type, activity.object_type) now = datetime.now(timezone.utc) log.info( "Processing activity %s: type=%s object_type=%s visibility=%s actor_profile_id=%s — %d handler(s) found", activity.id, activity.activity_type, activity.object_type, activity.visibility, activity.actor_profile_id, len(handlers), ) for h in handlers: log.info(" handler: %s.%s", h.__module__, h.__qualname__) activity.process_state = "processing" activity.process_attempts += 1 await session.flush() if not handlers: activity.process_state = "completed" activity.processed_at = now return try: for handler in handlers: log.info(" calling %s.%s …", handler.__module__, handler.__qualname__) await handler(activity, session) log.info(" done %s.%s", handler.__module__, handler.__qualname__) activity.process_state = "completed" activity.processed_at = now except Exception as exc: log.exception("Handler failed for activity %s", activity.id) activity.process_error = f"{exc.__class__.__name__}: {exc}" if activity.process_attempts >= activity.process_max_attempts: activity.process_state = "failed" activity.processed_at = now else: activity.process_state = "pending" # retry