This repository has been archived on 2026-02-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
shared/events/processor.py
giles 2e9db11925 Unify domain_events + ap_activities into AP-shaped event bus
All cross-service events now flow through ap_activities with a unified
EventProcessor. Internal events use visibility="internal"; federation
activities use visibility="public" and get delivered by a wildcard handler.

- Add processing columns to APActivity (process_state, actor_uri, etc.)
- New emit_activity() / register_activity_handler() API
- EventProcessor polls ap_activities instead of domain_events
- Rewrite all handlers to accept APActivity
- Migrate all 7 emit_event call sites to emit_activity
- publish_activity() sets process_state=pending directly (no emit_event bridge)
- Migration to drop domain_events table

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 16:19:29 +00:00

116 lines
3.8 KiB
Python

"""
Event processor — polls the ap_activities table and dispatches to registered
activity handlers.
Runs as an asyncio background task within each app process.
Uses SELECT ... FOR UPDATE SKIP LOCKED for safe concurrent processing.
"""
from __future__ import annotations

import asyncio
import logging
import traceback
from datetime import datetime, timezone

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from shared.db.session import get_session
from shared.models.federation import APActivity

from .bus import get_activity_handlers
class EventProcessor:
    """Background event processor that polls the ap_activities table.

    Claims APActivity rows with ``process_state == "pending"`` and dispatches
    each to the handlers registered on the activity bus. Rows are claimed with
    SELECT ... FOR UPDATE SKIP LOCKED, so multiple app processes can run a
    processor against the same table without double-processing a row.
    """

    def __init__(
        self,
        *,
        poll_interval: float = 2.0,
        batch_size: int = 10,
    ) -> None:
        """
        Args:
            poll_interval: Seconds to sleep when a poll finds no pending rows
                (also the back-off delay after an unexpected poll error).
            batch_size: Maximum number of activities claimed per poll.
        """
        self._poll_interval = poll_interval
        self._batch_size = batch_size
        self._task: asyncio.Task | None = None
        self._running = False

    async def start(self) -> None:
        """Start the background polling loop (idempotent)."""
        if self._task is not None:
            return  # already started
        self._running = True
        self._task = asyncio.create_task(self._poll_loop())

    async def stop(self) -> None:
        """Stop the background polling loop gracefully."""
        self._running = False
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass  # expected: we cancelled it ourselves
            self._task = None

    async def _poll_loop(self) -> None:
        # Drain continuously while work is available; sleep only when the
        # table had no pending rows or an unexpected error occurred.
        while self._running:
            try:
                processed = await self._process_batch()
                if processed == 0:
                    await asyncio.sleep(self._poll_interval)
            except asyncio.CancelledError:
                break
            except Exception:
                # Keep the loop alive on any poll failure; log with full
                # traceback instead of printing raw text to stderr.
                logging.getLogger(__name__).exception(
                    "EventProcessor: error while processing batch"
                )
                await asyncio.sleep(self._poll_interval)

    async def _process_batch(self) -> int:
        """Claim and process one batch of pending activities.

        Returns:
            The number of activities processed (0 means nothing to do, and
            tells the poll loop to sleep before the next poll).
        """
        async with get_session() as session:
            # SKIP LOCKED lets concurrent workers each claim a disjoint batch
            # while skipping rows another worker is currently processing.
            stmt = (
                select(APActivity)
                .where(
                    APActivity.process_state == "pending",
                    APActivity.process_attempts < APActivity.process_max_attempts,
                )
                .order_by(APActivity.created_at)
                .limit(self._batch_size)
                .with_for_update(skip_locked=True)
            )
            result = await session.execute(stmt)
            activities = result.scalars().all()
            for activity in activities:
                await self._process_one(session, activity)
            # NOTE(review): one commit covers the whole batch, so any DB
            # writes a *failed* handler made before raising are committed
            # alongside the retry state. Handlers that need atomicity should
            # use a savepoint (session.begin_nested()) internally.
            await session.commit()
            return len(activities)

    async def _process_one(self, session: AsyncSession, activity: APActivity) -> None:
        """Run all registered handlers for a single activity.

        Marks the activity completed on success, failed once attempts are
        exhausted, or pending again for a later retry. Never raises: handler
        errors are logged and recorded on the row instead.
        """
        handlers = get_activity_handlers(activity.activity_type, activity.object_type)
        activity.process_state = "processing"
        activity.process_attempts += 1
        await session.flush()
        if not handlers:
            # No handler registered for this (activity_type, object_type):
            # treat as a no-op so the row is not retried forever.
            activity.process_state = "completed"
            activity.processed_at = datetime.now(timezone.utc)
            return
        try:
            # Every attempt re-runs ALL handlers, including ones that
            # succeeded on a previous attempt — handlers must be idempotent.
            for handler in handlers:
                await handler(activity, session)
        except Exception as exc:
            # Surface the failure in the logs as well as on the row.
            logging.getLogger(__name__).exception(
                "EventProcessor: handler failed for %s/%s",
                activity.activity_type,
                activity.object_type,
            )
            activity.process_error = f"{exc.__class__.__name__}: {exc}"
            if activity.process_attempts >= activity.process_max_attempts:
                activity.process_state = "failed"
                activity.processed_at = datetime.now(timezone.utc)
            else:
                activity.process_state = "pending"  # eligible for retry
        else:
            activity.process_state = "completed"
            # Stamp completion time after the handlers actually finished,
            # not before they started.
            activity.processed_at = datetime.now(timezone.utc)