Fix MultipleResultsFound crash in get_activity_for_source

- Use .scalars().first() + LIMIT 1 instead of scalar_one_or_none()
  which crashes when multiple activities exist for the same source
- Allow re-Create after Delete (re-publish after unpublish)
- Add missing on_post_unpublished handler to root shared copy
- Sync add_follower upsert fix to root shared

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
giles
2026-02-21 23:49:19 +00:00
parent 18410c4b16
commit 798fe56165
2 changed files with 10 additions and 14 deletions

View File

@@ -30,28 +30,23 @@ async def _try_publish(
) -> None: ) -> None:
"""Publish an AP activity if federation is available and user has a profile.""" """Publish an AP activity if federation is available and user has a profile."""
if not services.has("federation"): if not services.has("federation"):
log.warning("_try_publish: no federation service")
return return
if not user_id: if not user_id:
log.warning("_try_publish: no user_id for %s#%s", source_type, source_id)
return return
# Check user has an ActorProfile (chose a username) # Check user has an ActorProfile (chose a username)
actor = await services.federation.get_actor_by_user_id(session, user_id) actor = await services.federation.get_actor_by_user_id(session, user_id)
if not actor: if not actor:
log.warning("_try_publish: no ActorProfile for user_id=%s", user_id)
return return
# Don't re-publish if we already have an activity for this source # Don't re-publish if we already have a live activity for this source
existing = await services.federation.get_activity_for_source( existing = await services.federation.get_activity_for_source(
session, source_type, source_id, session, source_type, source_id,
) )
if existing and activity_type == "Create": if existing and activity_type == "Create" and existing.activity_type != "Delete":
log.warning("_try_publish: already published %s#%s", source_type, source_id) return # Already published (allow re-Create after Delete/unpublish)
return # Already published
log.warning("_try_publish: publishing %s/%s for %s#%d user=%s", activity_type, object_type, source_type, source_id, user_id)
try: try:
await services.federation.publish_activity( await services.federation.publish_activity(
session, session,
@@ -62,7 +57,7 @@ async def _try_publish(
source_type=source_type, source_type=source_type,
source_id=source_id, source_id=source_id,
) )
log.warning( log.info(
"Published %s/%s for %s#%d by user %d", "Published %s/%s for %s#%d by user %d",
activity_type, object_type, source_type, source_id, user_id, activity_type, object_type, source_type, source_id, user_id,
) )
@@ -178,12 +173,12 @@ async def on_post_unpublished(event: DomainEvent, session: AsyncSession) -> None
if not actor: if not actor:
return return
# Find the original Create activity for this post # Find the original activity for this post
existing = await services.federation.get_activity_for_source( existing = await services.federation.get_activity_for_source(
session, "Post", event.aggregate_id, session, "Post", event.aggregate_id,
) )
if not existing: if not existing or existing.activity_type == "Delete":
return # Never published to federation, nothing to delete return # Never published or already deleted
try: try:
await services.federation.publish_activity( await services.federation.publish_activity(
@@ -198,7 +193,7 @@ async def on_post_unpublished(event: DomainEvent, session: AsyncSession) -> None
source_type="Post", source_type="Post",
source_id=event.aggregate_id, source_id=event.aggregate_id,
) )
log.warning("Published Delete for Post#%d", event.aggregate_id) log.info("Published Delete for Post#%d", event.aggregate_id)
except Exception: except Exception:
log.exception("Failed to publish Delete for Post#%d", event.aggregate_id) log.exception("Failed to publish Delete for Post#%d", event.aggregate_id)

View File

@@ -251,8 +251,9 @@ class SqlFederationService:
APActivity.source_type == source_type, APActivity.source_type == source_type,
APActivity.source_id == source_id, APActivity.source_id == source_id,
).order_by(APActivity.created_at.desc()) ).order_by(APActivity.created_at.desc())
.limit(1)
) )
).scalar_one_or_none() ).scalars().first()
return _activity_to_dto(a) if a else None return _activity_to_dto(a) if a else None
# -- Followers ------------------------------------------------------------ # -- Followers ------------------------------------------------------------