Add federation event handlers, AP delivery, and anchoring
Phase 3-5 of ActivityPub integration: - Federation handlers: post.published, calendar_entry.created, product.listed → publish_activity() for AP outbox - AP delivery handler: federation.activity_created → sign + POST to follower inboxes with HTTP Signatures - IPFS storage wired into publish_activity() (best-effort) - Anchoring utility: merkle trees + OpenTimestamps Bitcoin timestamping Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -6,3 +6,5 @@ def register_shared_handlers():
|
|||||||
import shared.events.handlers.container_handlers # noqa: F401
|
import shared.events.handlers.container_handlers # noqa: F401
|
||||||
import shared.events.handlers.login_handlers # noqa: F401
|
import shared.events.handlers.login_handlers # noqa: F401
|
||||||
import shared.events.handlers.order_handlers # noqa: F401
|
import shared.events.handlers.order_handlers # noqa: F401
|
||||||
|
import shared.events.handlers.federation_handlers # noqa: F401
|
||||||
|
import shared.events.handlers.ap_delivery_handler # noqa: F401
|
||||||
|
|||||||
150
events/handlers/ap_delivery_handler.py
Normal file
150
events/handlers/ap_delivery_handler.py
Normal file
@@ -0,0 +1,150 @@
|
|||||||
|
"""Deliver AP activities to remote followers.
|
||||||
|
|
||||||
|
On ``federation.activity_created`` → load activity + actor + followers →
|
||||||
|
sign with HTTP Signatures → POST to each follower inbox.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
from sqlalchemy import select
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
|
from shared.events.bus import register_handler, DomainEvent
|
||||||
|
from shared.models.federation import ActorProfile, APActivity, APFollower
|
||||||
|
from shared.services.registry import services
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Content type used when POSTing activities to remote inboxes.
AP_CONTENT_TYPE = "application/activity+json"
# Per-inbox request timeout passed to httpx on each delivery POST.
DELIVERY_TIMEOUT = 15  # seconds per request
|
||||||
|
|
||||||
|
|
||||||
|
def _build_activity_json(activity: APActivity, actor: ActorProfile, domain: str) -> dict:
|
||||||
|
"""Build the full AP activity JSON-LD for delivery."""
|
||||||
|
username = actor.preferred_username
|
||||||
|
actor_url = f"https://{domain}/users/{username}"
|
||||||
|
|
||||||
|
obj = dict(activity.object_data or {})
|
||||||
|
obj.setdefault("id", activity.activity_id + "/object")
|
||||||
|
obj.setdefault("type", activity.object_type)
|
||||||
|
obj.setdefault("attributedTo", actor_url)
|
||||||
|
obj.setdefault("published", activity.published.isoformat() if activity.published else None)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"@context": "https://www.w3.org/ns/activitystreams",
|
||||||
|
"id": activity.activity_id,
|
||||||
|
"type": activity.activity_type,
|
||||||
|
"actor": actor_url,
|
||||||
|
"published": activity.published.isoformat() if activity.published else None,
|
||||||
|
"to": ["https://www.w3.org/ns/activitystreams#Public"],
|
||||||
|
"cc": [f"{actor_url}/followers"],
|
||||||
|
"object": obj,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async def _deliver_to_inbox(
    client: httpx.AsyncClient,
    inbox_url: str,
    body: dict,
    actor: ActorProfile,
    domain: str,
) -> bool:
    """POST signed activity to a single inbox. Returns True on success."""
    import json

    from shared.utils.http_signatures import sign_request

    payload = json.dumps(body).encode()
    signing_key_id = f"https://{domain}/users/{actor.preferred_username}#main-key"

    # HTTP Signatures headers over method, target URL and body.
    headers = sign_request(
        method="POST",
        url=inbox_url,
        body=payload,
        private_key_pem=actor.private_key_pem,
        key_id=signing_key_id,
    )
    headers["Content-Type"] = AP_CONTENT_TYPE

    try:
        response = await client.post(
            inbox_url,
            content=payload,
            headers=headers,
            timeout=DELIVERY_TIMEOUT,
        )
        if response.status_code < 300:
            log.info("Delivered to %s → %d", inbox_url, response.status_code)
            return True
        log.warning(
            "Delivery to %s → %d: %s",
            inbox_url, response.status_code, response.text[:200],
        )
        return False
    except Exception:
        # Network errors are expected in federation; log and report failure.
        log.exception("Delivery failed for %s", inbox_url)
        return False
|
||||||
|
|
||||||
|
|
||||||
|
async def on_activity_created(event: DomainEvent, session: AsyncSession) -> None:
    """Deliver a newly created activity to all followers."""
    import os

    if not services.has("federation"):
        return

    activity_uri = event.payload.get("activity_id")
    if not activity_uri:
        return

    domain = os.getenv("AP_DOMAIN", "rose-ash.com")

    # Look up the activity row by its URI.
    result = await session.execute(
        select(APActivity).where(APActivity.activity_id == activity_uri)
    )
    activity = result.scalar_one_or_none()
    if activity is None:
        log.warning("Activity not found: %s", activity_uri)
        return

    # Look up the signing actor; deliveries require its private key.
    result = await session.execute(
        select(ActorProfile).where(ActorProfile.id == activity.actor_profile_id)
    )
    actor = result.scalar_one_or_none()
    if actor is None or not actor.private_key_pem:
        log.warning("Actor not found or missing key for activity %s", activity_uri)
        return

    # Collect the follower list for this actor.
    result = await session.execute(
        select(APFollower).where(APFollower.actor_profile_id == actor.id)
    )
    followers = result.scalars().all()
    if not followers:
        log.debug("No followers to deliver to for %s", activity_uri)
        return

    activity_json = _build_activity_json(activity, actor, domain)

    # Deduplicate inboxes (multiple followers might share a shared inbox).
    inboxes = {f.follower_inbox for f in followers if f.follower_inbox}

    log.info(
        "Delivering %s to %d inbox(es) for @%s",
        activity.activity_type, len(inboxes), actor.preferred_username,
    )

    # One client for the whole fan-out; deliveries run sequentially.
    async with httpx.AsyncClient() as client:
        for inbox in inboxes:
            await _deliver_to_inbox(client, inbox, activity_json, actor, domain)


register_handler("federation.activity_created", on_activity_created)
|
||||||
167
events/handlers/federation_handlers.py
Normal file
167
events/handlers/federation_handlers.py
Normal file
@@ -0,0 +1,167 @@
|
|||||||
|
"""Federation event handlers — publish domain content as AP activities.
|
||||||
|
|
||||||
|
Listens for content events and calls services.federation.publish_activity()
|
||||||
|
to create AP activities. Each handler checks:
|
||||||
|
1. services.has("federation") — skip if federation not wired
|
||||||
|
2. The content has a user_id — skip anonymous/system content
|
||||||
|
3. The user has an ActorProfile — skip users who haven't chosen a username
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
|
from shared.events.bus import register_handler, DomainEvent
|
||||||
|
from shared.services.registry import services
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
async def _try_publish(
    session: AsyncSession,
    *,
    user_id: int | None,
    activity_type: str,
    object_type: str,
    object_data: dict,
    source_type: str,
    source_id: int,
) -> None:
    """Publish an AP activity if federation is available and user has a profile."""
    # Skip when federation isn't wired, or the content is anonymous/system.
    if not services.has("federation") or not user_id:
        return

    # Skip users without an ActorProfile (they never chose a username).
    actor = await services.federation.get_actor_by_user_id(session, user_id)
    if actor is None:
        return

    # "Create" is idempotent per source: never re-publish the same content.
    existing = await services.federation.get_activity_for_source(
        session, source_type, source_id,
    )
    if existing and activity_type == "Create":
        return  # Already published

    try:
        await services.federation.publish_activity(
            session,
            actor_user_id=user_id,
            activity_type=activity_type,
            object_type=object_type,
            object_data=object_data,
            source_type=source_type,
            source_id=source_id,
        )
        log.info(
            "Published %s/%s for %s#%d by user %d",
            activity_type, object_type, source_type, source_id, user_id,
        )
    except Exception:
        # Federation must never break the originating domain operation.
        log.exception("Failed to publish activity for %s#%d", source_type, source_id)
|
||||||
|
|
||||||
|
|
||||||
|
# -- Post published/updated (from Ghost webhook sync) -------------------------
|
||||||
|
|
||||||
|
async def on_post_published(event: DomainEvent, session: AsyncSession) -> None:
    """Federate a newly published post as a Create/Article activity."""
    payload = event.payload
    article = {
        "name": payload.get("title", ""),
        "content": payload.get("excerpt", ""),
        "url": payload.get("url", ""),
    }
    await _try_publish(
        session,
        user_id=payload.get("user_id"),
        activity_type="Create",
        object_type="Article",
        object_data=article,
        source_type="Post",
        source_id=event.aggregate_id,
    )
|
||||||
|
|
||||||
|
|
||||||
|
async def on_post_updated(event: DomainEvent, session: AsyncSession) -> None:
    """Federate an edited post as an Update/Article activity."""
    p = event.payload
    await _try_publish(
        session,
        user_id=p.get("user_id"),
        activity_type="Update",
        object_type="Article",
        object_data={
            "name": p.get("title", ""),
            "content": p.get("excerpt", ""),
            "url": p.get("url", ""),
        },
        source_type="Post",
        source_id=event.aggregate_id,
    )
|
||||||
|
|
||||||
|
|
||||||
|
# -- Calendar entry created/updated -------------------------------------------
|
||||||
|
|
||||||
|
async def on_calendar_entry_created(event: DomainEvent, session: AsyncSession) -> None:
    """Federate a new calendar entry as a Create/Event activity."""
    payload = event.payload
    ap_event = {
        "name": payload.get("title", ""),
        "startTime": payload.get("start_time", ""),
        "endTime": payload.get("end_time", ""),
        "url": payload.get("url", ""),
    }
    await _try_publish(
        session,
        user_id=payload.get("user_id"),
        activity_type="Create",
        object_type="Event",
        object_data=ap_event,
        source_type="CalendarEntry",
        source_id=event.aggregate_id,
    )
|
||||||
|
|
||||||
|
|
||||||
|
async def on_calendar_entry_updated(event: DomainEvent, session: AsyncSession) -> None:
    """Federate an edited calendar entry as an Update/Event activity."""
    p = event.payload
    await _try_publish(
        session,
        user_id=p.get("user_id"),
        activity_type="Update",
        object_type="Event",
        object_data={
            "name": p.get("title", ""),
            "startTime": p.get("start_time", ""),
            "endTime": p.get("end_time", ""),
            "url": p.get("url", ""),
        },
        source_type="CalendarEntry",
        source_id=event.aggregate_id,
    )
|
||||||
|
|
||||||
|
|
||||||
|
# -- Product listed/updated ---------------------------------------------------
|
||||||
|
|
||||||
|
async def on_product_listed(event: DomainEvent, session: AsyncSession) -> None:
    """Federate a newly listed product as a Create/Object activity."""
    payload = event.payload
    listing = {
        "name": payload.get("title", ""),
        "summary": payload.get("description", ""),
        "url": payload.get("url", ""),
    }
    await _try_publish(
        session,
        user_id=payload.get("user_id"),
        activity_type="Create",
        object_type="Object",
        object_data=listing,
        source_type="Product",
        source_id=event.aggregate_id,
    )
|
||||||
|
|
||||||
|
|
||||||
|
# -- Registration --------------------------------------------------------------
|
||||||
|
|
||||||
|
# Handlers are registered at import time; the event bus dispatches matching
# domain events to them.
register_handler("post.published", on_post_published)
register_handler("post.updated", on_post_updated)
register_handler("calendar_entry.created", on_calendar_entry_created)
register_handler("calendar_entry.updated", on_calendar_entry_updated)
register_handler("product.listed", on_product_listed)
|
||||||
@@ -158,7 +158,28 @@ class SqlFederationService:
|
|||||||
session.add(activity)
|
session.add(activity)
|
||||||
await session.flush()
|
await session.flush()
|
||||||
|
|
||||||
# Emit domain event for downstream processing (IPFS storage, delivery)
|
# Store activity JSON on IPFS (best-effort — don't fail publish if IPFS down)
|
||||||
|
try:
|
||||||
|
from shared.utils.ipfs_client import add_json, is_available
|
||||||
|
if await is_available():
|
||||||
|
activity_json = {
|
||||||
|
"@context": "https://www.w3.org/ns/activitystreams",
|
||||||
|
"id": activity_uri,
|
||||||
|
"type": activity_type,
|
||||||
|
"actor": f"https://{domain}/users/{username}",
|
||||||
|
"published": now.isoformat(),
|
||||||
|
"object": {
|
||||||
|
"type": object_type,
|
||||||
|
**object_data,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
cid = await add_json(activity_json)
|
||||||
|
activity.ipfs_cid = cid
|
||||||
|
await session.flush()
|
||||||
|
except Exception:
|
||||||
|
pass # IPFS failure is non-fatal
|
||||||
|
|
||||||
|
# Emit domain event for downstream processing (delivery)
|
||||||
from shared.events import emit_event
|
from shared.events import emit_event
|
||||||
await emit_event(
|
await emit_event(
|
||||||
session,
|
session,
|
||||||
|
|||||||
236
utils/anchoring.py
Normal file
236
utils/anchoring.py
Normal file
@@ -0,0 +1,236 @@
|
|||||||
|
"""Merkle tree construction and OpenTimestamps anchoring.
|
||||||
|
|
||||||
|
Ported from ~/art-dag/activity-pub/anchoring.py.
|
||||||
|
Builds a SHA256 merkle tree from activity IDs, submits the root to
|
||||||
|
OpenTimestamps for Bitcoin timestamping, and stores the tree + proof on IPFS.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import hashlib
|
||||||
|
import logging
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
from sqlalchemy import select, func
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
|
from shared.models.federation import APActivity, APAnchor
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Public OpenTimestamps calendar servers; submit_to_opentimestamps() tries
# them in order until one accepts the digest.
OTS_SERVERS = [
    "https://a.pool.opentimestamps.org",
    "https://b.pool.opentimestamps.org",
    "https://a.pool.eternitywall.com",
]
|
||||||
|
|
||||||
|
|
||||||
|
def _sha256(data: str | bytes) -> str:
    """SHA256 hex digest (str input is UTF-8 encoded first)."""
    payload = data.encode() if isinstance(data, str) else data
    return hashlib.sha256(payload).hexdigest()


def build_merkle_tree(items: list[str]) -> dict:
    """Build a SHA256 merkle tree from a list of strings (activity IDs).

    Items are sorted first so the tree is deterministic regardless of input
    order; a lone trailing node on a level is paired with itself.

    Returns:
        {
            "root": hex_str,
            "leaves": [hex_str, ...],
            "levels": [[hex_str, ...], ...],  # bottom-up
        }

    Raises:
        ValueError: if *items* is empty.
    """
    if not items:
        raise ValueError("Cannot build merkle tree from empty list")

    leaves = [_sha256(item) for item in sorted(items)]
    levels = [list(leaves)]

    while len(levels[-1]) > 1:
        below = levels[-1]
        # Pair adjacent nodes; an odd node at the end hashes with itself.
        levels.append([
            _sha256(below[i] + (below[i + 1] if i + 1 < len(below) else below[i]))
            for i in range(0, len(below), 2)
        ])

    return {
        "root": levels[-1][0],
        "leaves": leaves,
        "levels": levels,
    }


def get_merkle_proof(tree: dict, item: str) -> list[dict] | None:
    """Generate a proof-of-membership for an item.

    Returns a list of {sibling: hex, position: "left"|"right"} dicts
    (bottom-up), or None if the item is not in the tree.
    """
    try:
        idx = tree["leaves"].index(_sha256(item))
    except ValueError:
        return None

    steps: list[dict] = []
    for level in tree["levels"][:-1]:  # the root level has no siblings
        if idx % 2:
            sibling_idx, position = idx - 1, "left"
        else:
            sibling_idx, position = idx + 1, "right"
        if sibling_idx >= len(level):
            # Lone trailing node: it was hashed with itself when building.
            sibling_idx = idx
        steps.append({"sibling": level[sibling_idx], "position": position})
        idx //= 2

    return steps


def verify_merkle_proof(item: str, proof: list[dict], root: str) -> bool:
    """Verify a merkle proof against a root hash."""
    acc = _sha256(item)
    for step in proof:
        sibling = step["sibling"]
        acc = _sha256(acc + sibling) if step["position"] == "right" else _sha256(sibling + acc)
    return acc == root
|
||||||
|
|
||||||
|
|
||||||
|
async def submit_to_opentimestamps(merkle_root: str) -> bytes | None:
    """Submit a hash to OpenTimestamps. Returns the (incomplete) OTS proof bytes."""
    digest = bytes.fromhex(merkle_root)

    # Try each calendar server in turn; the first 200 response wins.
    for server in OTS_SERVERS:
        try:
            async with httpx.AsyncClient(timeout=30) as client:
                response = await client.post(
                    f"{server}/digest",
                    content=digest,
                    headers={"Content-Type": "application/x-opentimestamps"},
                )
                if response.status_code == 200:
                    log.info("OTS proof obtained from %s", server)
                    return response.content
        except Exception:
            log.debug("OTS server %s failed", server, exc_info=True)
            continue

    log.warning("All OTS servers failed for root %s", merkle_root)
    return None
|
||||||
|
|
||||||
|
|
||||||
|
async def upgrade_ots_proof(proof_bytes: bytes) -> tuple[bytes, bool]:
    """Try to upgrade an incomplete OTS proof to a Bitcoin-confirmed one.

    Returns (proof_bytes, is_confirmed). The proof_bytes may be updated.

    Upgrading requires the ``ots`` CLI or the calendar API; until
    calendar-based upgrade polling is implemented the proof is returned
    unchanged and reported as unconfirmed.
    TODO: Implement calendar-based upgrade polling.
    """
    return proof_bytes, False
|
||||||
|
|
||||||
|
|
||||||
|
async def create_anchor(
    session: AsyncSession,
    batch_size: int = 100,
) -> APAnchor | None:
    """Anchor a batch of un-anchored activities.

    1. Select local activities without an anchor_id (oldest first)
    2. Build merkle tree from their activity_ids
    3. Store tree on IPFS (best-effort)
    4. Submit root to OpenTimestamps
    5. Store OTS proof on IPFS (best-effort)
    6. Create APAnchor record
    7. Link activities to anchor

    Args:
        session: Active async DB session; new rows are flushed, not committed.
        batch_size: Maximum number of activities anchored per call.

    Returns:
        The new APAnchor, or None when there was nothing to anchor.
    """
    # Find un-anchored local activities.
    result = await session.execute(
        select(APActivity)
        .where(
            APActivity.anchor_id.is_(None),
            APActivity.is_local == True,  # noqa: E712
        )
        .order_by(APActivity.created_at.asc())
        .limit(batch_size)
    )
    activities = result.scalars().all()

    if not activities:
        log.debug("No un-anchored activities to process")
        return None

    activity_ids = [a.activity_id for a in activities]
    log.info("Anchoring %d activities", len(activity_ids))

    # Build merkle tree over the batch.
    tree = build_merkle_tree(activity_ids)
    merkle_root = tree["root"]

    # Store tree on IPFS (best-effort — anchoring proceeds without it).
    tree_cid = None
    ots_proof_cid = None
    try:
        # Fix: only import what this step uses (`add_bytes` was imported
        # here unused; it belongs to the proof-storage step below).
        from shared.utils.ipfs_client import add_json, is_available
        if await is_available():
            tree_cid = await add_json({
                "root": merkle_root,
                "leaves": tree["leaves"],
                "activity_ids": activity_ids,
                "created_at": datetime.now(timezone.utc).isoformat(),
            })
            log.info("Merkle tree stored on IPFS: %s", tree_cid)
    except Exception:
        log.exception("IPFS tree storage failed")

    # Submit root to OpenTimestamps; store the pending proof on IPFS.
    ots_proof = await submit_to_opentimestamps(merkle_root)
    if ots_proof:
        try:
            from shared.utils.ipfs_client import add_bytes, is_available
            # Re-check availability: the OTS round-trip may have taken a while.
            if await is_available():
                ots_proof_cid = await add_bytes(ots_proof)
                log.info("OTS proof stored on IPFS: %s", ots_proof_cid)
        except Exception:
            log.exception("IPFS OTS proof storage failed")

    # Create the anchor record.
    anchor = APAnchor(
        merkle_root=merkle_root,
        tree_ipfs_cid=tree_cid,
        ots_proof_cid=ots_proof_cid,
        activity_count=len(activities),
    )
    session.add(anchor)
    await session.flush()

    # Link every activity in the batch to the new anchor.
    for a in activities:
        a.anchor_id = anchor.id
    await session.flush()

    log.info(
        "Anchor created: root=%s, activities=%d, tree_cid=%s",
        merkle_root, len(activities), tree_cid,
    )

    return anchor
|
||||||
Reference in New Issue
Block a user