From 86ccfd25c5bf70e944177c40e52c626da78254ff Mon Sep 17 00:00:00 2001 From: giles Date: Sun, 22 Feb 2026 20:57:46 +0000 Subject: [PATCH] =?UTF-8?q?Add=20origin=5Fapp=20to=20APActivity=20?= =?UTF-8?q?=E2=80=94=20apps=20only=20process=20their=20own=20activities?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Each app's EventProcessor now filters by origin_app so apps don't steal each other's pending activities. emit_activity() and publish_activity() auto-detect the app name from Quart's current_app. Co-Authored-By: Claude Opus 4.6 --- .../o5m3j9k1l2_add_origin_app_column.py | 26 +++++++++++++++++++ events/bus.py | 9 +++++++ events/processor.py | 13 +++++++--- infrastructure/factory.py | 2 +- models/federation.py | 3 +++ services/federation_impl.py | 9 +++++++ 6 files changed, 57 insertions(+), 5 deletions(-) create mode 100644 alembic/versions/o5m3j9k1l2_add_origin_app_column.py diff --git a/alembic/versions/o5m3j9k1l2_add_origin_app_column.py b/alembic/versions/o5m3j9k1l2_add_origin_app_column.py new file mode 100644 index 0000000..cd83123 --- /dev/null +++ b/alembic/versions/o5m3j9k1l2_add_origin_app_column.py @@ -0,0 +1,26 @@ +"""Add origin_app column to ap_activities + +Revision ID: o5m3j9k1l2 +Revises: n4l2i8j0k1 +Create Date: 2026-02-22 +""" +from alembic import op +import sqlalchemy as sa + +revision = "o5m3j9k1l2" +down_revision = "n4l2i8j0k1" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column( + "ap_activities", + sa.Column("origin_app", sa.String(64), nullable=True), + ) + op.create_index("ix_ap_activity_origin_app", "ap_activities", ["origin_app"]) + + +def downgrade() -> None: + op.drop_index("ix_ap_activity_origin_app", table_name="ap_activities") + op.drop_column("ap_activities", "origin_app") diff --git a/events/bus.py b/events/bus.py index 07cc958..41911d6 100644 --- a/events/bus.py +++ b/events/bus.py @@ -86,6 +86,7 @@ async def emit_activity( source_id: int | None = None, visibility: str = "internal", actor_profile_id: int | None = None, + origin_app: str | None = None, ) -> APActivity: """ Write an AP-shaped activity to ap_activities with process_state='pending'. @@ -93,6 +94,13 @@ async def emit_activity( Called inside a service function using the same session that performs the domain change. The activity and the change commit together. """ + if not origin_app: + try: + from quart import current_app + origin_app = current_app.name + except (ImportError, RuntimeError): + pass + activity_uri = f"internal:{uuid.uuid4()}" if visibility == "internal" else f"urn:uuid:{uuid.uuid4()}" activity = APActivity( @@ -107,6 +115,7 @@ async def emit_activity( source_id=source_id, visibility=visibility, process_state="pending", + origin_app=origin_app, ) session.add(activity) await session.flush() diff --git a/events/processor.py b/events/processor.py index e4423a7..cf392e4 100644 --- a/events/processor.py +++ b/events/processor.py @@ -28,9 +28,11 @@ class EventProcessor: def __init__( self, *, + app_name: str | None = None, poll_interval: float = 2.0, batch_size: int = 10, ): + self._app_name = app_name self._poll_interval = poll_interval self._batch_size = batch_size self._task: asyncio.Task | None = None @@ -70,12 +72,15 @@ class EventProcessor: """Fetch and process a batch of pending activities. Returns count processed.""" processed = 0 async with get_session() as session: + filters = [ + APActivity.process_state == "pending", + APActivity.process_attempts < APActivity.process_max_attempts, + ] + if self._app_name: + filters.append(APActivity.origin_app == self._app_name) stmt = ( select(APActivity) - .where( - APActivity.process_state == "pending", - APActivity.process_attempts < APActivity.process_max_attempts, - ) + .where(*filters) .order_by(APActivity.created_at) .limit(self._batch_size) .with_for_update(skip_locked=True) diff --git a/infrastructure/factory.py b/infrastructure/factory.py index 8eadda9..1bd7e4e 100644 --- a/infrastructure/factory.py +++ b/infrastructure/factory.py @@ -144,7 +144,7 @@ def create_base_app( return await base_context() # --- event processor --- - _event_processor = EventProcessor() + _event_processor = EventProcessor(app_name=name) # --- startup --- @app.before_serving diff --git a/models/federation.py b/models/federation.py index 8f2066e..4b3f2a4 100644 --- a/models/federation.py +++ b/models/federation.py @@ -108,6 +108,9 @@ class APActivity(Base): processed_at: Mapped[datetime | None] = mapped_column( DateTime(timezone=True), nullable=True, ) + origin_app: Mapped[str | None] = mapped_column( + String(64), nullable=True, + ) # Relationships actor_profile = relationship("ActorProfile", back_populates="activities") diff --git a/services/federation_impl.py b/services/federation_impl.py index f1eac78..40f0998 100644 --- a/services/federation_impl.py +++ b/services/federation_impl.py @@ -27,6 +27,14 @@ def _domain() -> str: return os.getenv("AP_DOMAIN", "rose-ash.com") +def _get_origin_app() -> str | None: + try: + from quart import current_app + return current_app.name + except (ImportError, RuntimeError): + return None + + def _actor_to_dto(actor: ActorProfile) -> ActorProfileDTO: domain = _domain() username = actor.preferred_username @@ -198,6 +206,7 @@ class SqlFederationService: source_id=source_id, visibility="public", process_state="pending", + origin_app=_get_origin_app(), ) session.add(activity) await session.flush()