Compare commits

..

3 Commits

Author SHA1 Message Date
giles
0e89dbee55 Make origin_app migration idempotent
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 20:59:19 +00:00
giles
86ccfd25c5 Add origin_app to APActivity — apps only process their own activities
Each app's EventProcessor now filters by origin_app so apps don't steal
each other's pending activities. emit_activity() and publish_activity()
auto-detect the app name from Quart's current_app.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 20:57:46 +00:00
giles
b42f5d63db Add debug logging to EventProcessor and activity handler registry
Logs which handlers are registered at startup and which handlers are
found/called when processing each activity.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 20:49:37 +00:00
6 changed files with 84 additions and 5 deletions

View File

@@ -0,0 +1,35 @@
"""Add origin_app column to ap_activities
Revision ID: o5m3j9k1l2
Revises: n4l2i8j0k1
Create Date: 2026-02-22
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy import inspect as sa_inspect
revision = "o5m3j9k1l2"
down_revision = "n4l2i8j0k1"
branch_labels = None
depends_on = None
def upgrade() -> None:
conn = op.get_bind()
inspector = sa_inspect(conn)
columns = [c["name"] for c in inspector.get_columns("ap_activities")]
if "origin_app" not in columns:
op.add_column(
"ap_activities",
sa.Column("origin_app", sa.String(64), nullable=True),
)
# Index is idempotent with if_not_exists
op.create_index(
"ix_ap_activity_origin_app", "ap_activities", ["origin_app"],
if_not_exists=True,
)
def downgrade() -> None:
op.drop_index("ix_ap_activity_origin_app", table_name="ap_activities")
op.drop_column("ap_activities", "origin_app")

View File

@@ -9,6 +9,7 @@ EventProcessor dispatches when processing pending activities.
""" """
from __future__ import annotations from __future__ import annotations
import logging
import uuid import uuid
from collections import defaultdict from collections import defaultdict
from typing import Awaitable, Callable, Dict, List, Tuple from typing import Awaitable, Callable, Dict, List, Tuple
@@ -17,6 +18,8 @@ from sqlalchemy.ext.asyncio import AsyncSession
from shared.models.federation import APActivity from shared.models.federation import APActivity
log = logging.getLogger(__name__)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Activity-handler registry # Activity-handler registry
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -40,6 +43,7 @@ def register_activity_handler(
""" """
key = (activity_type, object_type or "*") key = (activity_type, object_type or "*")
_activity_handlers[key].append(fn) _activity_handlers[key].append(fn)
log.info("Registered activity handler %s.%s for key %s", fn.__module__, fn.__qualname__, key)
def get_activity_handlers( def get_activity_handlers(
@@ -82,6 +86,7 @@ async def emit_activity(
source_id: int | None = None, source_id: int | None = None,
visibility: str = "internal", visibility: str = "internal",
actor_profile_id: int | None = None, actor_profile_id: int | None = None,
origin_app: str | None = None,
) -> APActivity: ) -> APActivity:
""" """
Write an AP-shaped activity to ap_activities with process_state='pending'. Write an AP-shaped activity to ap_activities with process_state='pending'.
@@ -89,6 +94,13 @@ async def emit_activity(
Called inside a service function using the same session that performs the Called inside a service function using the same session that performs the
domain change. The activity and the change commit together. domain change. The activity and the change commit together.
""" """
if not origin_app:
try:
from quart import current_app
origin_app = current_app.name
except (ImportError, RuntimeError):
pass
activity_uri = f"internal:{uuid.uuid4()}" if visibility == "internal" else f"urn:uuid:{uuid.uuid4()}" activity_uri = f"internal:{uuid.uuid4()}" if visibility == "internal" else f"urn:uuid:{uuid.uuid4()}"
activity = APActivity( activity = APActivity(
@@ -103,6 +115,7 @@ async def emit_activity(
source_id=source_id, source_id=source_id,
visibility=visibility, visibility=visibility,
process_state="pending", process_state="pending",
origin_app=origin_app,
) )
session.add(activity) session.add(activity)
await session.flush() await session.flush()

View File

@@ -8,6 +8,7 @@ Uses SELECT ... FOR UPDATE SKIP LOCKED for safe concurrent processing.
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import logging
import traceback import traceback
from datetime import datetime, timezone from datetime import datetime, timezone
@@ -18,6 +19,8 @@ from shared.db.session import get_session
from shared.models.federation import APActivity from shared.models.federation import APActivity
from .bus import get_activity_handlers from .bus import get_activity_handlers
log = logging.getLogger(__name__)
class EventProcessor: class EventProcessor:
"""Background event processor that polls the ap_activities table.""" """Background event processor that polls the ap_activities table."""
@@ -25,9 +28,11 @@ class EventProcessor:
def __init__( def __init__(
self, self,
*, *,
app_name: str | None = None,
poll_interval: float = 2.0, poll_interval: float = 2.0,
batch_size: int = 10, batch_size: int = 10,
): ):
self._app_name = app_name
self._poll_interval = poll_interval self._poll_interval = poll_interval
self._batch_size = batch_size self._batch_size = batch_size
self._task: asyncio.Task | None = None self._task: asyncio.Task | None = None
@@ -67,12 +72,15 @@ class EventProcessor:
"""Fetch and process a batch of pending activities. Returns count processed.""" """Fetch and process a batch of pending activities. Returns count processed."""
processed = 0 processed = 0
async with get_session() as session: async with get_session() as session:
stmt = ( filters = [
select(APActivity)
.where(
APActivity.process_state == "pending", APActivity.process_state == "pending",
APActivity.process_attempts < APActivity.process_max_attempts, APActivity.process_attempts < APActivity.process_max_attempts,
) ]
if self._app_name:
filters.append(APActivity.origin_app == self._app_name)
stmt = (
select(APActivity)
.where(*filters)
.order_by(APActivity.created_at) .order_by(APActivity.created_at)
.limit(self._batch_size) .limit(self._batch_size)
.with_for_update(skip_locked=True) .with_for_update(skip_locked=True)
@@ -92,6 +100,14 @@ class EventProcessor:
handlers = get_activity_handlers(activity.activity_type, activity.object_type) handlers = get_activity_handlers(activity.activity_type, activity.object_type)
now = datetime.now(timezone.utc) now = datetime.now(timezone.utc)
log.info(
"Processing activity %s: type=%s object_type=%s visibility=%s actor_profile_id=%s%d handler(s) found",
activity.id, activity.activity_type, activity.object_type,
activity.visibility, activity.actor_profile_id, len(handlers),
)
for h in handlers:
log.info(" handler: %s.%s", h.__module__, h.__qualname__)
activity.process_state = "processing" activity.process_state = "processing"
activity.process_attempts += 1 activity.process_attempts += 1
await session.flush() await session.flush()
@@ -103,10 +119,13 @@ class EventProcessor:
try: try:
for handler in handlers: for handler in handlers:
log.info(" calling %s.%s", handler.__module__, handler.__qualname__)
await handler(activity, session) await handler(activity, session)
log.info(" done %s.%s", handler.__module__, handler.__qualname__)
activity.process_state = "completed" activity.process_state = "completed"
activity.processed_at = now activity.processed_at = now
except Exception as exc: except Exception as exc:
log.exception("Handler failed for activity %s", activity.id)
activity.process_error = f"{exc.__class__.__name__}: {exc}" activity.process_error = f"{exc.__class__.__name__}: {exc}"
if activity.process_attempts >= activity.process_max_attempts: if activity.process_attempts >= activity.process_max_attempts:
activity.process_state = "failed" activity.process_state = "failed"

View File

@@ -144,7 +144,7 @@ def create_base_app(
return await base_context() return await base_context()
# --- event processor --- # --- event processor ---
_event_processor = EventProcessor() _event_processor = EventProcessor(app_name=name)
# --- startup --- # --- startup ---
@app.before_serving @app.before_serving

View File

@@ -108,6 +108,9 @@ class APActivity(Base):
processed_at: Mapped[datetime | None] = mapped_column( processed_at: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), nullable=True, DateTime(timezone=True), nullable=True,
) )
origin_app: Mapped[str | None] = mapped_column(
String(64), nullable=True,
)
# Relationships # Relationships
actor_profile = relationship("ActorProfile", back_populates="activities") actor_profile = relationship("ActorProfile", back_populates="activities")

View File

@@ -27,6 +27,14 @@ def _domain() -> str:
return os.getenv("AP_DOMAIN", "rose-ash.com") return os.getenv("AP_DOMAIN", "rose-ash.com")
def _get_origin_app() -> str | None:
try:
from quart import current_app
return current_app.name
except (ImportError, RuntimeError):
return None
def _actor_to_dto(actor: ActorProfile) -> ActorProfileDTO: def _actor_to_dto(actor: ActorProfile) -> ActorProfileDTO:
domain = _domain() domain = _domain()
username = actor.preferred_username username = actor.preferred_username
@@ -198,6 +206,7 @@ class SqlFederationService:
source_id=source_id, source_id=source_id,
visibility="public", visibility="public",
process_state="pending", process_state="pending",
origin_app=_get_origin_app(),
) )
session.add(activity) session.add(activity)
await session.flush() await session.flush()