This repository has been archived on 2026-02-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
shared/events/processor.py
giles 86ccfd25c5 Add origin_app to APActivity — apps only process their own activities
Each app's EventProcessor now filters by origin_app so apps don't steal
each other's pending activities. emit_activity() and publish_activity()
auto-detect the app name from Quart's current_app.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 20:57:46 +00:00

135 lines
4.6 KiB
Python

"""
Event processor — polls the ap_activities table and dispatches to registered
activity handlers.
Runs as an asyncio background task within each app process.
Uses SELECT ... FOR UPDATE SKIP LOCKED for safe concurrent processing.
"""
from __future__ import annotations
import asyncio
import logging
import traceback
from datetime import datetime, timezone
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from shared.db.session import get_session
from shared.models.federation import APActivity
from .bus import get_activity_handlers
log = logging.getLogger(__name__)
class EventProcessor:
    """Background event processor that polls the ap_activities table.

    Pending ``APActivity`` rows are fetched in batches, handed to the
    handlers returned by ``get_activity_handlers`` for their activity/object
    type, and then marked ``completed``, ``failed`` or (to be retried)
    ``pending`` again.  When ``app_name`` is set, only rows whose
    ``origin_app`` equals it are selected.
    """

    def __init__(
        self,
        *,
        app_name: str | None = None,
        poll_interval: float = 2.0,
        batch_size: int = 10,
    ):
        """Configure the processor; no task is created until ``start()``.

        Args:
            app_name: If truthy, restrict polling to activities whose
                ``origin_app`` matches this value; otherwise no origin
                filter is applied.
            poll_interval: Seconds to sleep after an empty batch or after a
                failed batch.
            batch_size: Maximum number of activities selected per batch.
        """
        self._app_name = app_name
        self._poll_interval = poll_interval
        self._batch_size = batch_size
        self._task: asyncio.Task | None = None
        self._running = False

    async def start(self) -> None:
        """Start the background polling loop.

        Calling this while the loop task is still alive is a no-op.  A task
        that has already finished (e.g. it was cancelled from outside
        ``stop()``) is replaced, so the processor can be restarted.
        """
        if self._task is not None and not self._task.done():
            return
        self._running = True
        self._task = asyncio.create_task(self._poll_loop())

    async def stop(self) -> None:
        """Stop the background polling loop and wait for its task to end."""
        self._running = False
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                # Expected: we cancelled the task ourselves just above.
                pass
            self._task = None

    async def _poll_loop(self) -> None:
        """Process batches until stopped, sleeping when there is no work."""
        while self._running:
            try:
                processed = await self._process_batch()
                if processed == 0:
                    # Nothing pending: back off instead of hammering the DB.
                    await asyncio.sleep(self._poll_interval)
            except asyncio.CancelledError:
                # Propagate cancellation so the task ends as cancelled;
                # stop() awaits the task and absorbs this exception.
                raise
            except Exception:
                # Keep the loop alive on any batch failure, but report it
                # through the module logger like the rest of this class.
                log.exception(
                    "Event processor batch failed; retrying in %ss",
                    self._poll_interval,
                )
                await asyncio.sleep(self._poll_interval)

    async def _process_batch(self) -> int:
        """Fetch and process a batch of pending activities.

        Returns:
            The number of activities that were processed in this batch.
        """
        async with get_session() as session:
            filters = [
                APActivity.process_state == "pending",
                APActivity.process_attempts < APActivity.process_max_attempts,
            ]
            if self._app_name:
                # Only pick up activities that originated from this app.
                filters.append(APActivity.origin_app == self._app_name)

            stmt = (
                select(APActivity)
                .where(*filters)
                .order_by(APActivity.created_at)
                .limit(self._batch_size)
                # Row locks with SKIP LOCKED so concurrent processors do not
                # select the same rows.
                .with_for_update(skip_locked=True)
            )
            result = await session.execute(stmt)
            activities = result.scalars().all()

            processed = 0
            for activity in activities:
                await self._process_one(session, activity)
                processed += 1

            # One commit for the whole batch, after every row was handled.
            await session.commit()
        return processed

    async def _process_one(self, session: AsyncSession, activity: APActivity) -> None:
        """Run all handlers for a single activity and record the outcome.

        The activity is flushed as ``processing`` with its attempt counter
        incremented, then every handler is awaited in order.  On success (or
        when no handler is registered) it becomes ``completed``.  If a
        handler raises, the error text is stored and the activity becomes
        ``failed`` once its attempts reach ``process_max_attempts``, else
        ``pending`` again.  Nothing is committed here.
        """
        handlers = get_activity_handlers(activity.activity_type, activity.object_type)
        # Captured before the handlers run; this value is what ends up in
        # ``processed_at``.
        now = datetime.now(timezone.utc)
        log.info(
            "Processing activity %s: type=%s object_type=%s visibility=%s "
            "actor_profile_id=%s - %d handler(s) found",
            activity.id, activity.activity_type, activity.object_type,
            activity.visibility, activity.actor_profile_id, len(handlers),
        )
        for h in handlers:
            log.info("  handler: %s.%s", h.__module__, h.__qualname__)

        activity.process_state = "processing"
        activity.process_attempts += 1
        await session.flush()

        if not handlers:
            # Nothing to run: treat the activity as done.
            activity.process_state = "completed"
            activity.processed_at = now
            return

        try:
            for handler in handlers:
                log.info("  calling %s.%s", handler.__module__, handler.__qualname__)
                await handler(activity, session)
                log.info("  done %s.%s", handler.__module__, handler.__qualname__)
            activity.process_state = "completed"
            activity.processed_at = now
        except Exception as exc:
            log.exception("Handler failed for activity %s", activity.id)
            activity.process_error = f"{exc.__class__.__name__}: {exc}"
            if activity.process_attempts >= activity.process_max_attempts:
                activity.process_state = "failed"
                activity.processed_at = now
            else:
                activity.process_state = "pending"  # retry