Split databases and Redis — prepare infrastructure for per-domain isolation
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 3m20s
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 3m20s
Redis: per-app DB index (0-5) with shared auth DB 15 for SSO keys; flushdb replaces flushall so deploys don't wipe cross-app auth state. Postgres: drop 13 cross-domain FK constraints (migration v2t0p8q9r0), remove dead ORM relationships, add explicit joins for 4 live ones. Multi-engine sessions (account + federation) ready for per-domain DBs via DATABASE_URL_ACCOUNT / DATABASE_URL_FEDERATION env vars. All URLs initially point to the same appdb — zero behaviour change until split-databases.sh is run to migrate data to per-domain DBs. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -4,6 +4,11 @@ Unified activity bus.
|
||||
emit_activity() writes an APActivity row with process_state='pending' within
|
||||
the caller's existing DB transaction — atomic with the domain change.
|
||||
|
||||
When the federation database is separate (DATABASE_URL_FEDERATION differs from
|
||||
DATABASE_URL), emit_activity() opens its own federation session and commits
|
||||
independently. Atomicity is traded for domain isolation; handlers are
|
||||
idempotent, so at-least-once delivery is safe.
|
||||
|
||||
register_activity_handler() registers async handler functions that the
|
||||
EventProcessor dispatches when processing pending activities.
|
||||
"""
|
||||
@@ -73,6 +78,12 @@ def get_activity_handlers(
|
||||
return handlers
|
||||
|
||||
|
||||
def _needs_federation_session() -> bool:
|
||||
"""True when the federation DB differs from the app's default DB."""
|
||||
from shared.db.session import DATABASE_URL, DATABASE_URL_FEDERATION
|
||||
return DATABASE_URL_FEDERATION != DATABASE_URL
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# emit_activity — the primary way to emit events
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -92,8 +103,10 @@ async def emit_activity(
|
||||
"""
|
||||
Write an AP-shaped activity to ap_activities with process_state='pending'.
|
||||
|
||||
Called inside a service function using the same session that performs the
|
||||
domain change. The activity and the change commit together.
|
||||
When all apps share one database the activity is written in the caller's
|
||||
transaction (atomic with the domain change). When the federation DB is
|
||||
separate, a dedicated federation session is used and committed
|
||||
independently.
|
||||
"""
|
||||
if not origin_app:
|
||||
try:
|
||||
@@ -118,9 +131,17 @@ async def emit_activity(
|
||||
process_state="pending",
|
||||
origin_app=origin_app,
|
||||
)
|
||||
session.add(activity)
|
||||
await session.flush()
|
||||
# Wake any listening EventProcessor as soon as this transaction commits.
|
||||
# NOTIFY is transactional — delivered only after commit.
|
||||
await session.execute(text("NOTIFY ap_activity_pending"))
|
||||
|
||||
if _needs_federation_session():
|
||||
from shared.db.session import get_federation_session
|
||||
async with get_federation_session() as fed_s:
|
||||
async with fed_s.begin():
|
||||
fed_s.add(activity)
|
||||
await fed_s.flush()
|
||||
await fed_s.execute(text("NOTIFY ap_activity_pending"))
|
||||
else:
|
||||
session.add(activity)
|
||||
await session.flush()
|
||||
await session.execute(text("NOTIFY ap_activity_pending"))
|
||||
|
||||
return activity
|
||||
|
||||
@@ -9,6 +9,9 @@ A dedicated asyncpg LISTEN connection wakes the poll loop immediately when
|
||||
emit_activity() fires NOTIFY ap_activity_pending, so latency drops from
|
||||
~2 seconds (poll interval) to sub-100 ms. The fixed-interval poll remains
|
||||
as a safety-net fallback.
|
||||
|
||||
The LISTEN connection and poll queries target the federation database
|
||||
(DATABASE_URL_FEDERATION) since ap_activities lives there.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
@@ -21,7 +24,7 @@ import asyncpg
|
||||
from sqlalchemy import select, update
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from shared.db.session import get_session, DATABASE_URL
|
||||
from shared.db.session import get_federation_session, DATABASE_URL_FEDERATION
|
||||
from shared.models.federation import APActivity
|
||||
from .bus import get_activity_handlers
|
||||
|
||||
@@ -89,7 +92,7 @@ class EventProcessor:
|
||||
|
||||
async def _listen_for_notify(self) -> None:
|
||||
"""Maintain a LISTEN connection and wake the poll loop on NOTIFY."""
|
||||
dsn = DATABASE_URL.replace("+asyncpg", "")
|
||||
dsn = DATABASE_URL_FEDERATION.replace("+asyncpg", "")
|
||||
while self._running:
|
||||
try:
|
||||
self._listen_conn = await asyncpg.connect(dsn)
|
||||
@@ -154,7 +157,7 @@ class EventProcessor:
|
||||
"""
|
||||
cutoff = datetime.now(timezone.utc) - timedelta(seconds=self._stuck_timeout)
|
||||
try:
|
||||
async with get_session() as session:
|
||||
async with get_federation_session() as session:
|
||||
filters = [
|
||||
APActivity.process_state == "processing",
|
||||
APActivity.created_at < cutoff,
|
||||
@@ -180,7 +183,7 @@ class EventProcessor:
|
||||
async def _process_batch(self) -> int:
|
||||
"""Fetch and process a batch of pending activities. Returns count processed."""
|
||||
processed = 0
|
||||
async with get_session() as session:
|
||||
async with get_federation_session() as session:
|
||||
filters = [
|
||||
APActivity.process_state == "pending",
|
||||
APActivity.process_attempts < APActivity.process_max_attempts,
|
||||
|
||||
Reference in New Issue
Block a user