""" Event processor — polls the ap_activities table and dispatches to registered activity handlers. Runs as an asyncio background task within each app process. Uses SELECT ... FOR UPDATE SKIP LOCKED for safe concurrent processing. """ from __future__ import annotations import asyncio import logging import traceback from datetime import datetime, timezone from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from shared.db.session import get_session from shared.models.federation import APActivity from .bus import get_activity_handlers log = logging.getLogger(__name__) class EventProcessor: """Background event processor that polls the ap_activities table.""" def __init__( self, *, app_name: str | None = None, poll_interval: float = 2.0, batch_size: int = 10, ): self._app_name = app_name self._poll_interval = poll_interval self._batch_size = batch_size self._task: asyncio.Task | None = None self._running = False async def start(self) -> None: """Start the background polling loop.""" if self._task is not None: return self._running = True self._task = asyncio.create_task(self._poll_loop()) async def stop(self) -> None: """Stop the background polling loop gracefully.""" self._running = False if self._task is not None: self._task.cancel() try: await self._task except asyncio.CancelledError: pass self._task = None async def _poll_loop(self) -> None: while self._running: try: processed = await self._process_batch() if processed == 0: await asyncio.sleep(self._poll_interval) except asyncio.CancelledError: break except Exception: traceback.print_exc() await asyncio.sleep(self._poll_interval) async def _process_batch(self) -> int: """Fetch and process a batch of pending activities. Returns count processed.""" processed = 0 async with get_session() as session: filters = [ APActivity.process_state == "pending", APActivity.process_attempts < APActivity.process_max_attempts, ] if self._app_name: filters.append(APActivity.origin_app == self._app_name) stmt = ( select(APActivity) .where(*filters) .order_by(APActivity.created_at) .limit(self._batch_size) .with_for_update(skip_locked=True) ) result = await session.execute(stmt) activities = result.scalars().all() for activity in activities: await self._process_one(session, activity) processed += 1 await session.commit() return processed async def _process_one(self, session: AsyncSession, activity: APActivity) -> None: """Run all handlers for a single activity.""" handlers = get_activity_handlers(activity.activity_type, activity.object_type) now = datetime.now(timezone.utc) log.info( "Processing activity %s: type=%s object_type=%s visibility=%s actor_profile_id=%s — %d handler(s) found", activity.id, activity.activity_type, activity.object_type, activity.visibility, activity.actor_profile_id, len(handlers), ) for h in handlers: log.info(" handler: %s.%s", h.__module__, h.__qualname__) activity.process_state = "processing" activity.process_attempts += 1 await session.flush() if not handlers: activity.process_state = "completed" activity.processed_at = now return try: for handler in handlers: log.info(" calling %s.%s …", handler.__module__, handler.__qualname__) await handler(activity, session) log.info(" done %s.%s", handler.__module__, handler.__qualname__) activity.process_state = "completed" activity.processed_at = now except Exception as exc: log.exception("Handler failed for activity %s", activity.id) activity.process_error = f"{exc.__class__.__name__}: {exc}" if activity.process_attempts >= activity.process_max_attempts: activity.process_state = "failed" activity.processed_at = now else: activity.process_state = "pending" # retry