diff --git a/events/bus.py b/events/bus.py index e1e90c0..07cc958 100644 --- a/events/bus.py +++ b/events/bus.py @@ -9,6 +9,7 @@ EventProcessor dispatches when processing pending activities. """ from __future__ import annotations +import logging import uuid from collections import defaultdict from typing import Awaitable, Callable, Dict, List, Tuple @@ -17,6 +18,8 @@ from sqlalchemy.ext.asyncio import AsyncSession from shared.models.federation import APActivity +log = logging.getLogger(__name__) + # --------------------------------------------------------------------------- # Activity-handler registry # --------------------------------------------------------------------------- @@ -40,6 +43,7 @@ def register_activity_handler( """ key = (activity_type, object_type or "*") _activity_handlers[key].append(fn) + log.info("Registered activity handler %s.%s for key %s", fn.__module__, fn.__qualname__, key) def get_activity_handlers( diff --git a/events/processor.py b/events/processor.py index e580233..e4423a7 100644 --- a/events/processor.py +++ b/events/processor.py @@ -8,6 +8,7 @@ Uses SELECT ... FOR UPDATE SKIP LOCKED for safe concurrent processing. from __future__ import annotations import asyncio +import logging import traceback from datetime import datetime, timezone @@ -18,6 +19,8 @@ from shared.db.session import get_session from shared.models.federation import APActivity from .bus import get_activity_handlers +log = logging.getLogger(__name__) + class EventProcessor: """Background event processor that polls the ap_activities table.""" @@ -92,6 +95,14 @@ class EventProcessor: handlers = get_activity_handlers(activity.activity_type, activity.object_type) now = datetime.now(timezone.utc) + log.info( + "Processing activity %s: type=%s object_type=%s visibility=%s actor_profile_id=%s — %d handler(s) found", + activity.id, activity.activity_type, activity.object_type, + activity.visibility, activity.actor_profile_id, len(handlers), + ) + for h in handlers: + log.info(" handler: %s.%s", h.__module__, h.__qualname__) + activity.process_state = "processing" activity.process_attempts += 1 await session.flush() @@ -103,10 +114,13 @@ class EventProcessor: try: for handler in handlers: + log.info(" calling %s.%s …", handler.__module__, handler.__qualname__) await handler(activity, session) + log.info(" done %s.%s", handler.__module__, handler.__qualname__) activity.process_state = "completed" activity.processed_at = now except Exception as exc: + log.exception("Handler failed for activity %s", activity.id) activity.process_error = f"{exc.__class__.__name__}: {exc}" if activity.process_attempts >= activity.process_max_attempts: activity.process_state = "failed"