Compare commits
2 Commits
bbc376aebc
...
61ad2db2f3
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
61ad2db2f3 | ||
|
|
dd9cb9f5f2 |
@@ -80,6 +80,8 @@ async def send_accept(
|
||||
)
|
||||
headers["Content-Type"] = AP_CONTENT_TYPE
|
||||
|
||||
log.info("Accept payload → %s: %s", follower_inbox, json.dumps(accept)[:500])
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=15) as client:
|
||||
resp = await client.post(
|
||||
@@ -87,7 +89,7 @@ async def send_accept(
|
||||
content=body_bytes,
|
||||
headers=headers,
|
||||
)
|
||||
log.info("Accept → %s: %d", follower_inbox, resp.status_code)
|
||||
log.info("Accept → %s: %d %s", follower_inbox, resp.status_code, resp.text[:200])
|
||||
except Exception:
|
||||
log.exception("Failed to send Accept to %s", follower_inbox)
|
||||
|
||||
@@ -99,7 +101,11 @@ async def backfill_follower(
|
||||
domain: str,
|
||||
origin_app: str | None = None,
|
||||
) -> None:
|
||||
"""Deliver recent Create activities to a new follower's inbox."""
|
||||
"""Deliver recent *current* Create activities to a new follower's inbox.
|
||||
|
||||
Skips Creates whose source was later Deleted, and uses the latest
|
||||
Update data when available (so the follower sees the current version).
|
||||
"""
|
||||
from shared.events.handlers.ap_delivery_handler import (
|
||||
_build_activity_json, _deliver_to_inbox,
|
||||
)
|
||||
@@ -108,18 +114,68 @@ async def backfill_follower(
|
||||
APActivity.actor_profile_id == actor.id,
|
||||
APActivity.is_local == True, # noqa: E712
|
||||
APActivity.activity_type == "Create",
|
||||
APActivity.source_type.isnot(None),
|
||||
APActivity.source_id.isnot(None),
|
||||
]
|
||||
if origin_app is not None:
|
||||
filters.append(APActivity.origin_app == origin_app)
|
||||
|
||||
activities = (
|
||||
creates = (
|
||||
await session.execute(
|
||||
select(APActivity).where(*filters)
|
||||
.order_by(APActivity.published.desc())
|
||||
.limit(20)
|
||||
.limit(40)
|
||||
)
|
||||
).scalars().all()
|
||||
|
||||
if not creates:
|
||||
return
|
||||
|
||||
# Collect source keys that have been Deleted
|
||||
source_keys = {(c.source_type, c.source_id) for c in creates}
|
||||
deleted_keys: set[tuple[str | None, int | None]] = set()
|
||||
if source_keys:
|
||||
deletes = (
|
||||
await session.execute(
|
||||
select(APActivity.source_type, APActivity.source_id).where(
|
||||
APActivity.actor_profile_id == actor.id,
|
||||
APActivity.activity_type == "Delete",
|
||||
APActivity.is_local == True, # noqa: E712
|
||||
)
|
||||
)
|
||||
).all()
|
||||
deleted_keys = {(d[0], d[1]) for d in deletes}
|
||||
|
||||
# For sources with Updates, grab the latest Update's object_data
|
||||
updated_data: dict[tuple[str | None, int | None], dict] = {}
|
||||
if source_keys:
|
||||
updates = (
|
||||
await session.execute(
|
||||
select(APActivity).where(
|
||||
APActivity.actor_profile_id == actor.id,
|
||||
APActivity.activity_type == "Update",
|
||||
APActivity.is_local == True, # noqa: E712
|
||||
).order_by(APActivity.published.desc())
|
||||
)
|
||||
).scalars().all()
|
||||
for u in updates:
|
||||
key = (u.source_type, u.source_id)
|
||||
if key not in updated_data and key in source_keys:
|
||||
updated_data[key] = u.object_data or {}
|
||||
|
||||
# Filter to current, non-deleted Creates (limit 20)
|
||||
activities = []
|
||||
for c in creates:
|
||||
key = (c.source_type, c.source_id)
|
||||
if key in deleted_keys:
|
||||
continue
|
||||
# Apply latest Update data if available
|
||||
if key in updated_data:
|
||||
c.object_data = updated_data[key]
|
||||
activities.append(c)
|
||||
if len(activities) >= 20:
|
||||
break
|
||||
|
||||
if not activities:
|
||||
return
|
||||
|
||||
|
||||
Reference in New Issue
Block a user