Logs which handlers are registered at startup and which handlers are found/called when processing each activity. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
130 lines
4.4 KiB
Python
130 lines
4.4 KiB
Python
"""
|
|
Event processor — polls the ap_activities table and dispatches to registered
|
|
activity handlers.
|
|
|
|
Runs as an asyncio background task within each app process.
|
|
Uses SELECT ... FOR UPDATE SKIP LOCKED for safe concurrent processing.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import traceback
|
|
from datetime import datetime, timezone
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from shared.db.session import get_session
|
|
from shared.models.federation import APActivity
|
|
from .bus import get_activity_handlers
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class EventProcessor:
|
|
"""Background event processor that polls the ap_activities table."""
|
|
|
|
def __init__(
|
|
self,
|
|
*,
|
|
poll_interval: float = 2.0,
|
|
batch_size: int = 10,
|
|
):
|
|
self._poll_interval = poll_interval
|
|
self._batch_size = batch_size
|
|
self._task: asyncio.Task | None = None
|
|
self._running = False
|
|
|
|
async def start(self) -> None:
|
|
"""Start the background polling loop."""
|
|
if self._task is not None:
|
|
return
|
|
self._running = True
|
|
self._task = asyncio.create_task(self._poll_loop())
|
|
|
|
async def stop(self) -> None:
|
|
"""Stop the background polling loop gracefully."""
|
|
self._running = False
|
|
if self._task is not None:
|
|
self._task.cancel()
|
|
try:
|
|
await self._task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
self._task = None
|
|
|
|
async def _poll_loop(self) -> None:
|
|
while self._running:
|
|
try:
|
|
processed = await self._process_batch()
|
|
if processed == 0:
|
|
await asyncio.sleep(self._poll_interval)
|
|
except asyncio.CancelledError:
|
|
break
|
|
except Exception:
|
|
traceback.print_exc()
|
|
await asyncio.sleep(self._poll_interval)
|
|
|
|
async def _process_batch(self) -> int:
|
|
"""Fetch and process a batch of pending activities. Returns count processed."""
|
|
processed = 0
|
|
async with get_session() as session:
|
|
stmt = (
|
|
select(APActivity)
|
|
.where(
|
|
APActivity.process_state == "pending",
|
|
APActivity.process_attempts < APActivity.process_max_attempts,
|
|
)
|
|
.order_by(APActivity.created_at)
|
|
.limit(self._batch_size)
|
|
.with_for_update(skip_locked=True)
|
|
)
|
|
result = await session.execute(stmt)
|
|
activities = result.scalars().all()
|
|
|
|
for activity in activities:
|
|
await self._process_one(session, activity)
|
|
processed += 1
|
|
|
|
await session.commit()
|
|
return processed
|
|
|
|
async def _process_one(self, session: AsyncSession, activity: APActivity) -> None:
|
|
"""Run all handlers for a single activity."""
|
|
handlers = get_activity_handlers(activity.activity_type, activity.object_type)
|
|
now = datetime.now(timezone.utc)
|
|
|
|
log.info(
|
|
"Processing activity %s: type=%s object_type=%s visibility=%s actor_profile_id=%s — %d handler(s) found",
|
|
activity.id, activity.activity_type, activity.object_type,
|
|
activity.visibility, activity.actor_profile_id, len(handlers),
|
|
)
|
|
for h in handlers:
|
|
log.info(" handler: %s.%s", h.__module__, h.__qualname__)
|
|
|
|
activity.process_state = "processing"
|
|
activity.process_attempts += 1
|
|
await session.flush()
|
|
|
|
if not handlers:
|
|
activity.process_state = "completed"
|
|
activity.processed_at = now
|
|
return
|
|
|
|
try:
|
|
for handler in handlers:
|
|
log.info(" calling %s.%s …", handler.__module__, handler.__qualname__)
|
|
await handler(activity, session)
|
|
log.info(" done %s.%s", handler.__module__, handler.__qualname__)
|
|
activity.process_state = "completed"
|
|
activity.processed_at = now
|
|
except Exception as exc:
|
|
log.exception("Handler failed for activity %s", activity.id)
|
|
activity.process_error = f"{exc.__class__.__name__}: {exc}"
|
|
if activity.process_attempts >= activity.process_max_attempts:
|
|
activity.process_state = "failed"
|
|
activity.processed_at = now
|
|
else:
|
|
activity.process_state = "pending" # retry
|