Fix AP blueprint cross-DB queries + harden Ghost sync init
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m10s
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m10s
AP blueprints (activitypub.py, ap_social.py) were querying federation tables (ap_actor_profiles etc.) on g.s which points to the app's own DB after the per-app split. Now uses g._ap_s backed by get_federation_session() for non-federation apps. Also hardens Ghost sync before_app_serving to catch/rollback on failure instead of crashing the Hypercorn worker. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -69,6 +69,56 @@ def create_activitypub_blueprint(app_name: str) -> Blueprint:
|
||||
# For per-app outboxes, filter by origin_app; for federation, show all
|
||||
outbox_origin_app: str | None = None if aggregate else app_name
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Federation session management — AP tables live in db_federation,
|
||||
# which is separate from each app's own DB after the per-app split.
|
||||
# g._ap_s points to the federation DB; on the federation app itself
|
||||
# it's just an alias for g.s.
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
from shared.db.session import needs_federation_session, create_federation_session
|
||||
|
||||
@bp.before_request
|
||||
async def _open_ap_session():
|
||||
if needs_federation_session():
|
||||
sess = create_federation_session()
|
||||
g._ap_s = sess
|
||||
g._ap_tx = await sess.begin()
|
||||
g._ap_own = True
|
||||
else:
|
||||
g._ap_s = g.s
|
||||
g._ap_own = False
|
||||
|
||||
@bp.after_request
|
||||
async def _commit_ap_session(response):
|
||||
if getattr(g, "_ap_own", False):
|
||||
if 200 <= response.status_code < 400:
|
||||
try:
|
||||
await g._ap_tx.commit()
|
||||
except Exception:
|
||||
try:
|
||||
await g._ap_tx.rollback()
|
||||
except Exception:
|
||||
pass
|
||||
return response
|
||||
|
||||
@bp.teardown_request
|
||||
async def _close_ap_session(exc):
|
||||
if getattr(g, "_ap_own", False):
|
||||
s = getattr(g, "_ap_s", None)
|
||||
if s:
|
||||
if exc is not None or s.in_transaction():
|
||||
tx = getattr(g, "_ap_tx", None)
|
||||
if tx and tx.is_active:
|
||||
try:
|
||||
await tx.rollback()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
await s.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Well-known endpoints
|
||||
# ------------------------------------------------------------------
|
||||
@@ -87,7 +137,7 @@ def create_activitypub_blueprint(app_name: str) -> Blueprint:
|
||||
if res_domain != domain:
|
||||
abort(404, "User not on this server")
|
||||
|
||||
actor = await services.federation.get_actor_by_username(g.s, username)
|
||||
actor = await services.federation.get_actor_by_username(g._ap_s, username)
|
||||
if not actor:
|
||||
abort(404, "User not found")
|
||||
|
||||
@@ -128,7 +178,7 @@ def create_activitypub_blueprint(app_name: str) -> Blueprint:
|
||||
|
||||
@bp.get("/nodeinfo/2.0")
|
||||
async def nodeinfo():
|
||||
stats = await services.federation.get_stats(g.s)
|
||||
stats = await services.federation.get_stats(g._ap_s)
|
||||
return Response(
|
||||
response=json.dumps({
|
||||
"version": "2.0",
|
||||
@@ -170,7 +220,7 @@ def create_activitypub_blueprint(app_name: str) -> Blueprint:
|
||||
|
||||
@bp.get("/users/<username>")
|
||||
async def actor_profile(username: str):
|
||||
actor = await services.federation.get_actor_by_username(g.s, username)
|
||||
actor = await services.federation.get_actor_by_username(g._ap_s, username)
|
||||
if not actor:
|
||||
abort(404)
|
||||
|
||||
@@ -224,7 +274,7 @@ def create_activitypub_blueprint(app_name: str) -> Blueprint:
|
||||
if aggregate:
|
||||
from quart import render_template
|
||||
activities, total = await services.federation.get_outbox(
|
||||
g.s, username, page=1, per_page=20,
|
||||
g._ap_s, username, page=1, per_page=20,
|
||||
)
|
||||
return await render_template(
|
||||
"federation/profile.html",
|
||||
@@ -242,7 +292,7 @@ def create_activitypub_blueprint(app_name: str) -> Blueprint:
|
||||
@csrf_exempt
|
||||
@bp.post("/users/<username>/inbox")
|
||||
async def inbox(username: str):
|
||||
actor = await services.federation.get_actor_by_username(g.s, username)
|
||||
actor = await services.federation.get_actor_by_username(g._ap_s, username)
|
||||
if not actor:
|
||||
abort(404)
|
||||
|
||||
@@ -284,7 +334,7 @@ def create_activitypub_blueprint(app_name: str) -> Blueprint:
|
||||
|
||||
# Load actor row for DB operations
|
||||
actor_row = (
|
||||
await g.s.execute(
|
||||
await g._ap_s.execute(
|
||||
select(ActorProfile).where(
|
||||
ActorProfile.preferred_username == username
|
||||
)
|
||||
@@ -298,13 +348,13 @@ def create_activitypub_blueprint(app_name: str) -> Blueprint:
|
||||
activity_type=activity_type,
|
||||
from_actor=from_actor_url,
|
||||
)
|
||||
g.s.add(item)
|
||||
await g.s.flush()
|
||||
g._ap_s.add(item)
|
||||
await g._ap_s.flush()
|
||||
|
||||
# Dispatch to shared handlers
|
||||
from shared.infrastructure.ap_inbox_handlers import dispatch_inbox_activity
|
||||
await dispatch_inbox_activity(
|
||||
g.s, actor_row, body, from_actor_url,
|
||||
g._ap_s, actor_row, body, from_actor_url,
|
||||
domain=domain,
|
||||
app_domain=follower_app_domain,
|
||||
)
|
||||
@@ -312,7 +362,7 @@ def create_activitypub_blueprint(app_name: str) -> Blueprint:
|
||||
# Mark as processed
|
||||
item.state = "processed"
|
||||
item.processed_at = datetime.now(timezone.utc)
|
||||
await g.s.flush()
|
||||
await g._ap_s.flush()
|
||||
|
||||
return Response(status=202)
|
||||
|
||||
@@ -322,7 +372,7 @@ def create_activitypub_blueprint(app_name: str) -> Blueprint:
|
||||
|
||||
@bp.get("/users/<username>/outbox")
|
||||
async def outbox(username: str):
|
||||
actor = await services.federation.get_actor_by_username(g.s, username)
|
||||
actor = await services.federation.get_actor_by_username(g._ap_s, username)
|
||||
if not actor:
|
||||
abort(404)
|
||||
|
||||
@@ -331,7 +381,7 @@ def create_activitypub_blueprint(app_name: str) -> Blueprint:
|
||||
|
||||
if not page_param:
|
||||
_, total = await services.federation.get_outbox(
|
||||
g.s, username, page=1, per_page=1,
|
||||
g._ap_s, username, page=1, per_page=1,
|
||||
origin_app=outbox_origin_app,
|
||||
)
|
||||
return Response(
|
||||
@@ -347,7 +397,7 @@ def create_activitypub_blueprint(app_name: str) -> Blueprint:
|
||||
|
||||
page_num = int(page_param)
|
||||
activities, total = await services.federation.get_outbox(
|
||||
g.s, username, page=page_num, per_page=20,
|
||||
g._ap_s, username, page=page_num, per_page=20,
|
||||
origin_app=outbox_origin_app,
|
||||
)
|
||||
|
||||
@@ -383,13 +433,13 @@ def create_activitypub_blueprint(app_name: str) -> Blueprint:
|
||||
|
||||
@bp.get("/users/<username>/followers")
|
||||
async def followers(username: str):
|
||||
actor = await services.federation.get_actor_by_username(g.s, username)
|
||||
actor = await services.federation.get_actor_by_username(g._ap_s, username)
|
||||
if not actor:
|
||||
abort(404)
|
||||
|
||||
collection_id = f"https://{domain}/users/{username}/followers"
|
||||
follower_list = await services.federation.get_followers(
|
||||
g.s, username, app_domain=follower_app_domain,
|
||||
g._ap_s, username, app_domain=follower_app_domain,
|
||||
)
|
||||
page_param = request.args.get("page")
|
||||
|
||||
@@ -419,12 +469,12 @@ def create_activitypub_blueprint(app_name: str) -> Blueprint:
|
||||
|
||||
@bp.get("/users/<username>/following")
|
||||
async def following(username: str):
|
||||
actor = await services.federation.get_actor_by_username(g.s, username)
|
||||
actor = await services.federation.get_actor_by_username(g._ap_s, username)
|
||||
if not actor:
|
||||
abort(404)
|
||||
|
||||
collection_id = f"https://{domain}/users/{username}/following"
|
||||
following_list, total = await services.federation.get_following(g.s, username)
|
||||
following_list, total = await services.federation.get_following(g._ap_s, username)
|
||||
page_param = request.args.get("page")
|
||||
|
||||
if not page_param:
|
||||
|
||||
@@ -19,10 +19,56 @@ def create_ap_social_blueprint(app_name: str) -> Blueprint:
|
||||
"""Create a per-app social blueprint scoped to *app_name*."""
|
||||
bp = Blueprint("ap_social", __name__, url_prefix="/social")
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Federation session — AP tables live in db_federation.
|
||||
# ------------------------------------------------------------------
|
||||
from shared.db.session import needs_federation_session, create_federation_session
|
||||
|
||||
@bp.before_request
|
||||
async def _open_ap_session():
|
||||
if needs_federation_session():
|
||||
sess = create_federation_session()
|
||||
g._ap_s = sess
|
||||
g._ap_tx = await sess.begin()
|
||||
g._ap_own = True
|
||||
else:
|
||||
g._ap_s = g.s
|
||||
g._ap_own = False
|
||||
|
||||
@bp.after_request
|
||||
async def _commit_ap_session(response):
|
||||
if getattr(g, "_ap_own", False):
|
||||
if 200 <= response.status_code < 400:
|
||||
try:
|
||||
await g._ap_tx.commit()
|
||||
except Exception:
|
||||
try:
|
||||
await g._ap_tx.rollback()
|
||||
except Exception:
|
||||
pass
|
||||
return response
|
||||
|
||||
@bp.teardown_request
|
||||
async def _close_ap_session(exc):
|
||||
if getattr(g, "_ap_own", False):
|
||||
s = getattr(g, "_ap_s", None)
|
||||
if s:
|
||||
if exc is not None or s.in_transaction():
|
||||
tx = getattr(g, "_ap_tx", None)
|
||||
if tx and tx.is_active:
|
||||
try:
|
||||
await tx.rollback()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
await s.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@bp.before_request
|
||||
async def load_actor():
|
||||
if g.get("user"):
|
||||
actor = await services.federation.get_actor_by_user_id(g.s, g.user.id)
|
||||
actor = await services.federation.get_actor_by_user_id(g._ap_s, g.user.id)
|
||||
g._social_actor = actor
|
||||
|
||||
def _require_actor():
|
||||
@@ -51,10 +97,10 @@ def create_ap_social_blueprint(app_name: str) -> Blueprint:
|
||||
total = 0
|
||||
followed_urls: set[str] = set()
|
||||
if query:
|
||||
actors, total = await services.federation.search_actors(g.s, query)
|
||||
actors, total = await services.federation.search_actors(g._ap_s, query)
|
||||
if actor:
|
||||
following, _ = await services.federation.get_following(
|
||||
g.s, actor.preferred_username, page=1, per_page=1000,
|
||||
g._ap_s, actor.preferred_username, page=1, per_page=1000,
|
||||
)
|
||||
followed_urls = {a.actor_url for a in following}
|
||||
return await render_template(
|
||||
@@ -77,11 +123,11 @@ def create_ap_social_blueprint(app_name: str) -> Blueprint:
|
||||
followed_urls: set[str] = set()
|
||||
if query:
|
||||
actors, total = await services.federation.search_actors(
|
||||
g.s, query, page=page,
|
||||
g._ap_s, query, page=page,
|
||||
)
|
||||
if actor:
|
||||
following, _ = await services.federation.get_following(
|
||||
g.s, actor.preferred_username, page=1, per_page=1000,
|
||||
g._ap_s, actor.preferred_username, page=1, per_page=1000,
|
||||
)
|
||||
followed_urls = {a.actor_url for a in following}
|
||||
return await render_template(
|
||||
@@ -103,7 +149,7 @@ def create_ap_social_blueprint(app_name: str) -> Blueprint:
|
||||
remote_actor_url = form.get("actor_url", "")
|
||||
if remote_actor_url:
|
||||
await services.federation.send_follow(
|
||||
g.s, actor.preferred_username, remote_actor_url,
|
||||
g._ap_s, actor.preferred_username, remote_actor_url,
|
||||
)
|
||||
if request.headers.get("HX-Request"):
|
||||
return await _actor_card_response(actor, remote_actor_url, is_followed=True)
|
||||
@@ -116,7 +162,7 @@ def create_ap_social_blueprint(app_name: str) -> Blueprint:
|
||||
remote_actor_url = form.get("actor_url", "")
|
||||
if remote_actor_url:
|
||||
await services.federation.unfollow(
|
||||
g.s, actor.preferred_username, remote_actor_url,
|
||||
g._ap_s, actor.preferred_username, remote_actor_url,
|
||||
)
|
||||
if request.headers.get("HX-Request"):
|
||||
return await _actor_card_response(actor, remote_actor_url, is_followed=False)
|
||||
@@ -125,7 +171,7 @@ def create_ap_social_blueprint(app_name: str) -> Blueprint:
|
||||
async def _actor_card_response(actor, remote_actor_url, is_followed):
|
||||
"""Re-render a single actor card after follow/unfollow via HTMX."""
|
||||
remote_dto = await services.federation.get_or_fetch_remote_actor(
|
||||
g.s, remote_actor_url,
|
||||
g._ap_s, remote_actor_url,
|
||||
)
|
||||
if not remote_dto:
|
||||
return Response("", status=200)
|
||||
@@ -151,10 +197,10 @@ def create_ap_social_blueprint(app_name: str) -> Blueprint:
|
||||
async def followers_list():
|
||||
actor = _require_actor()
|
||||
actors, total = await services.federation.get_followers_paginated(
|
||||
g.s, actor.preferred_username, app_domain=app_name,
|
||||
g._ap_s, actor.preferred_username, app_domain=app_name,
|
||||
)
|
||||
following, _ = await services.federation.get_following(
|
||||
g.s, actor.preferred_username, page=1, per_page=1000,
|
||||
g._ap_s, actor.preferred_username, page=1, per_page=1000,
|
||||
)
|
||||
followed_urls = {a.actor_url for a in following}
|
||||
return await render_template(
|
||||
@@ -171,10 +217,10 @@ def create_ap_social_blueprint(app_name: str) -> Blueprint:
|
||||
actor = _require_actor()
|
||||
page = request.args.get("page", 1, type=int)
|
||||
actors, total = await services.federation.get_followers_paginated(
|
||||
g.s, actor.preferred_username, page=page, app_domain=app_name,
|
||||
g._ap_s, actor.preferred_username, page=page, app_domain=app_name,
|
||||
)
|
||||
following, _ = await services.federation.get_following(
|
||||
g.s, actor.preferred_username, page=1, per_page=1000,
|
||||
g._ap_s, actor.preferred_username, page=1, per_page=1000,
|
||||
)
|
||||
followed_urls = {a.actor_url for a in following}
|
||||
return await render_template(
|
||||
@@ -193,7 +239,7 @@ def create_ap_social_blueprint(app_name: str) -> Blueprint:
|
||||
async def following_list():
|
||||
actor = _require_actor()
|
||||
actors, total = await services.federation.get_following(
|
||||
g.s, actor.preferred_username,
|
||||
g._ap_s, actor.preferred_username,
|
||||
)
|
||||
return await render_template(
|
||||
"social/following.html",
|
||||
@@ -208,7 +254,7 @@ def create_ap_social_blueprint(app_name: str) -> Blueprint:
|
||||
actor = _require_actor()
|
||||
page = request.args.get("page", 1, type=int)
|
||||
actors, total = await services.federation.get_following(
|
||||
g.s, actor.preferred_username, page=page,
|
||||
g._ap_s, actor.preferred_username, page=page,
|
||||
)
|
||||
return await render_template(
|
||||
"social/_actor_list_items.html",
|
||||
@@ -228,7 +274,7 @@ def create_ap_social_blueprint(app_name: str) -> Blueprint:
|
||||
from shared.models.federation import RemoteActor
|
||||
from sqlalchemy import select as sa_select
|
||||
remote = (
|
||||
await g.s.execute(
|
||||
await g._ap_s.execute(
|
||||
sa_select(RemoteActor).where(RemoteActor.id == id)
|
||||
)
|
||||
).scalar_one_or_none()
|
||||
@@ -236,12 +282,12 @@ def create_ap_social_blueprint(app_name: str) -> Blueprint:
|
||||
abort(404)
|
||||
from shared.services.federation_impl import _remote_actor_to_dto
|
||||
remote_dto = _remote_actor_to_dto(remote)
|
||||
items = await services.federation.get_actor_timeline(g.s, id)
|
||||
items = await services.federation.get_actor_timeline(g._ap_s, id)
|
||||
is_following = False
|
||||
if actor:
|
||||
from shared.models.federation import APFollowing
|
||||
existing = (
|
||||
await g.s.execute(
|
||||
await g._ap_s.execute(
|
||||
sa_select(APFollowing).where(
|
||||
APFollowing.actor_profile_id == actor.id,
|
||||
APFollowing.remote_actor_id == id,
|
||||
@@ -268,7 +314,7 @@ def create_ap_social_blueprint(app_name: str) -> Blueprint:
|
||||
except ValueError:
|
||||
pass
|
||||
items = await services.federation.get_actor_timeline(
|
||||
g.s, id, before=before,
|
||||
g._ap_s, id, before=before,
|
||||
)
|
||||
return await render_template(
|
||||
"social/_timeline_items.html",
|
||||
|
||||
Reference in New Issue
Block a user