Add origin_app to APActivity — apps only process their own activities

Each app's EventProcessor now filters by origin_app so apps don't steal
each other's pending activities. emit_activity() and publish_activity()
auto-detect the app name from Quart's current_app.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
giles
2026-02-22 20:57:46 +00:00
parent b42f5d63db
commit 86ccfd25c5
6 changed files with 57 additions and 5 deletions

View File

@@ -0,0 +1,26 @@
"""Add origin_app column to ap_activities
Revision ID: o5m3j9k1l2
Revises: n4l2i8j0k1
Create Date: 2026-02-22
"""
from alembic import op
import sqlalchemy as sa
revision = "o5m3j9k1l2"
down_revision = "n4l2i8j0k1"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.add_column(
"ap_activities",
sa.Column("origin_app", sa.String(64), nullable=True),
)
op.create_index("ix_ap_activity_origin_app", "ap_activities", ["origin_app"])
def downgrade() -> None:
op.drop_index("ix_ap_activity_origin_app", table_name="ap_activities")
op.drop_column("ap_activities", "origin_app")

View File

@@ -86,6 +86,7 @@ async def emit_activity(
source_id: int | None = None, source_id: int | None = None,
visibility: str = "internal", visibility: str = "internal",
actor_profile_id: int | None = None, actor_profile_id: int | None = None,
origin_app: str | None = None,
) -> APActivity: ) -> APActivity:
""" """
Write an AP-shaped activity to ap_activities with process_state='pending'. Write an AP-shaped activity to ap_activities with process_state='pending'.
@@ -93,6 +94,13 @@ async def emit_activity(
Called inside a service function using the same session that performs the Called inside a service function using the same session that performs the
domain change. The activity and the change commit together. domain change. The activity and the change commit together.
""" """
if not origin_app:
try:
from quart import current_app
origin_app = current_app.name
except (ImportError, RuntimeError):
pass
activity_uri = f"internal:{uuid.uuid4()}" if visibility == "internal" else f"urn:uuid:{uuid.uuid4()}" activity_uri = f"internal:{uuid.uuid4()}" if visibility == "internal" else f"urn:uuid:{uuid.uuid4()}"
activity = APActivity( activity = APActivity(
@@ -107,6 +115,7 @@ async def emit_activity(
source_id=source_id, source_id=source_id,
visibility=visibility, visibility=visibility,
process_state="pending", process_state="pending",
origin_app=origin_app,
) )
session.add(activity) session.add(activity)
await session.flush() await session.flush()

View File

@@ -28,9 +28,11 @@ class EventProcessor:
def __init__( def __init__(
self, self,
*, *,
app_name: str | None = None,
poll_interval: float = 2.0, poll_interval: float = 2.0,
batch_size: int = 10, batch_size: int = 10,
): ):
self._app_name = app_name
self._poll_interval = poll_interval self._poll_interval = poll_interval
self._batch_size = batch_size self._batch_size = batch_size
self._task: asyncio.Task | None = None self._task: asyncio.Task | None = None
@@ -70,12 +72,15 @@ class EventProcessor:
"""Fetch and process a batch of pending activities. Returns count processed.""" """Fetch and process a batch of pending activities. Returns count processed."""
processed = 0 processed = 0
async with get_session() as session: async with get_session() as session:
filters = [
APActivity.process_state == "pending",
APActivity.process_attempts < APActivity.process_max_attempts,
]
if self._app_name:
filters.append(APActivity.origin_app == self._app_name)
stmt = ( stmt = (
select(APActivity) select(APActivity)
.where( .where(*filters)
APActivity.process_state == "pending",
APActivity.process_attempts < APActivity.process_max_attempts,
)
.order_by(APActivity.created_at) .order_by(APActivity.created_at)
.limit(self._batch_size) .limit(self._batch_size)
.with_for_update(skip_locked=True) .with_for_update(skip_locked=True)

View File

@@ -144,7 +144,7 @@ def create_base_app(
return await base_context() return await base_context()
# --- event processor --- # --- event processor ---
_event_processor = EventProcessor() _event_processor = EventProcessor(app_name=name)
# --- startup --- # --- startup ---
@app.before_serving @app.before_serving

View File

@@ -108,6 +108,9 @@ class APActivity(Base):
processed_at: Mapped[datetime | None] = mapped_column( processed_at: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), nullable=True, DateTime(timezone=True), nullable=True,
) )
origin_app: Mapped[str | None] = mapped_column(
String(64), nullable=True,
)
# Relationships # Relationships
actor_profile = relationship("ActorProfile", back_populates="activities") actor_profile = relationship("ActorProfile", back_populates="activities")

View File

@@ -27,6 +27,14 @@ def _domain() -> str:
return os.getenv("AP_DOMAIN", "rose-ash.com") return os.getenv("AP_DOMAIN", "rose-ash.com")
def _get_origin_app() -> str | None:
try:
from quart import current_app
return current_app.name
except (ImportError, RuntimeError):
return None
def _actor_to_dto(actor: ActorProfile) -> ActorProfileDTO: def _actor_to_dto(actor: ActorProfile) -> ActorProfileDTO:
domain = _domain() domain = _domain()
username = actor.preferred_username username = actor.preferred_username
@@ -198,6 +206,7 @@ class SqlFederationService:
source_id=source_id, source_id=source_id,
visibility="public", visibility="public",
process_state="pending", process_state="pending",
origin_app=_get_origin_app(),
) )
session.add(activity) session.add(activity)
await session.flush() await session.flush()