Files
rose-ash/sx/sxc/pages/pub_helpers.py
giles d12f38a9d5 sx-pub Phase 4: anchoring — Merkle trees, OpenTimestamps, verification
New endpoints:
- POST /pub/anchor — batch unanchored Publish activities into Merkle tree,
  pin tree to IPFS, submit root to OpenTimestamps, store OTS proof on IPFS
- GET /pub/verify/<cid> — verify a CID's Merkle proof against anchored tree

Uses existing shared/utils/anchoring.py infrastructure:
- build_merkle_tree (SHA256, deterministic sort)
- get_merkle_proof / verify_merkle_proof (inclusion proofs)
- submit_to_opentimestamps (3 calendar servers with fallback)

Tested: anchored 1 activity, Merkle tree + OTS proof pinned to IPFS,
verification returns :verified true with full proof chain.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-25 02:12:40 +00:00

942 lines
30 KiB
Python

"""sx-pub Python IO helpers — actor, IPFS, collections, publishing, federation.
These are called from SX defhandlers via (helper "pub-..." args...).
All DB access uses g.s (per-request async session from register_db).
"""
from __future__ import annotations
import hashlib
import logging
import os
from datetime import datetime, timezone
from typing import Any
logger = logging.getLogger("sx.pub")
SX_PUB_DOMAIN = os.getenv("SX_PUB_DOMAIN", "pub.sx-web.org")
async def get_or_create_actor() -> dict[str, Any]:
"""Get or create the singleton sx-pub actor. Auto-generates RSA keypair."""
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubActor
result = await g.s.execute(
select(SxPubActor).where(SxPubActor.preferred_username == "sx")
)
actor = result.scalar_one_or_none()
if actor is None:
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
private_key = rsa.generate_private_key(
public_exponent=65537, key_size=2048,
)
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
).decode("utf-8")
public_pem = private_key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode("utf-8")
actor = SxPubActor(
preferred_username="sx",
display_name="SX Language",
summary="Federated SX specification publisher",
public_key_pem=public_pem,
private_key_pem=private_pem,
domain=SX_PUB_DOMAIN,
)
g.s.add(actor)
await g.s.flush()
logger.info("Created sx-pub actor id=%d domain=%s", actor.id, SX_PUB_DOMAIN)
# Seed default collections on first run
await seed_default_collections()
return {
"preferred-username": actor.preferred_username,
"display-name": actor.display_name or actor.preferred_username,
"summary": actor.summary or "",
"public-key-pem": actor.public_key_pem,
"domain": actor.domain,
}
async def seed_default_collections() -> None:
"""Create default collections if they don't exist."""
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubCollection
defaults = [
("core-specs", "Core Specifications", "Language spec files — evaluator, parser, primitives, render", 0),
("platforms", "Platforms", "Host platform implementations — JavaScript, Python, OCaml", 1),
("components", "Components", "Reusable UI components published as content-addressed SX", 2),
("libraries", "Libraries", "SX library modules — stdlib, signals, freeze, web forms", 3),
]
for slug, name, description, order in defaults:
exists = await g.s.execute(
select(SxPubCollection).where(SxPubCollection.slug == slug)
)
if exists.scalar_one_or_none() is None:
g.s.add(SxPubCollection(
slug=slug, name=name, description=description, sort_order=order,
))
await g.s.flush()
logger.info("Seeded %d default collections", len(defaults))
async def list_collections() -> list[dict[str, Any]]:
"""List all pub collections."""
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubCollection
result = await g.s.execute(
select(SxPubCollection).order_by(SxPubCollection.sort_order)
)
return [
{
"slug": c.slug,
"name": c.name,
"description": c.description or "",
}
for c in result.scalars().all()
]
async def check_status() -> dict[str, Any]:
"""Health check — DB, IPFS, actor."""
status: dict[str, Any] = {"healthy": "true"}
# DB
try:
from quart import g
from sqlalchemy import text
await g.s.execute(text("SELECT 1"))
status["db"] = "connected"
except Exception as e:
status["db"] = f"error: {e}"
status["healthy"] = "false"
# IPFS
try:
from shared.utils.ipfs_client import is_available
ok = await is_available()
status["ipfs"] = "available" if ok else "unavailable"
except Exception as e:
status["ipfs"] = f"error: {e}"
# Actor
try:
actor = await get_or_create_actor()
status["actor"] = actor["preferred-username"]
status["domain"] = actor["domain"]
except Exception as e:
status["actor"] = f"error: {e}"
status["healthy"] = "false"
return status
# ---------------------------------------------------------------------------
# Phase 2: Publishing + Browsing
# ---------------------------------------------------------------------------
async def publish_document(collection_slug: str, slug: str, content: str,
title: str = "", summary: str = "") -> dict[str, Any]:
"""Pin SX content to IPFS and store in DB. Returns doc info dict."""
import hashlib
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubCollection, SxPubDocument
from shared.utils.ipfs_client import add_bytes
# Resolve collection
result = await g.s.execute(
select(SxPubCollection).where(SxPubCollection.slug == collection_slug)
)
collection = result.scalar_one_or_none()
if collection is None:
return {"error": f"Collection not found: {collection_slug}"}
# Hash content
content_bytes = content.encode("utf-8")
content_hash = hashlib.sha3_256(content_bytes).hexdigest()
# Pin to IPFS
try:
cid = await add_bytes(content_bytes, pin=True)
except Exception as e:
logger.error("IPFS pin failed for %s/%s: %s", collection_slug, slug, e)
return {"error": f"IPFS pin failed: {e}"}
# Upsert document
result = await g.s.execute(
select(SxPubDocument).where(
SxPubDocument.collection_id == collection.id,
SxPubDocument.slug == slug,
)
)
doc = result.scalar_one_or_none()
if doc is None:
doc = SxPubDocument(
collection_id=collection.id,
slug=slug,
title=title or slug,
summary=summary,
content_hash=content_hash,
ipfs_cid=cid,
size_bytes=len(content_bytes),
status="published",
)
g.s.add(doc)
else:
doc.content_hash = content_hash
doc.ipfs_cid = cid
doc.size_bytes = len(content_bytes)
doc.status = "published"
if title:
doc.title = title
if summary:
doc.summary = summary
await g.s.flush()
logger.info("Published %s/%s%s (%d bytes)", collection_slug, slug, cid, len(content_bytes))
# Record Publish activity in outbox
from shared.models.sx_pub import SxPubActivity
g.s.add(SxPubActivity(
activity_type="Publish",
object_type="SxDocument",
object_data={
"path": f"/pub/{collection_slug}/{slug}",
"cid": cid,
"collection": collection_slug,
"slug": slug,
"title": title or slug,
"summary": summary,
},
ipfs_cid=cid,
))
await g.s.flush()
return {
"path": f"/pub/{collection_slug}/{slug}",
"cid": cid,
"hash": content_hash,
"size": len(content_bytes),
"collection": collection_slug,
"slug": slug,
"title": doc.title or slug,
}
async def collection_items(collection_slug: str) -> dict[str, Any]:
"""List published documents in a collection."""
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubCollection, SxPubDocument
result = await g.s.execute(
select(SxPubCollection).where(SxPubCollection.slug == collection_slug)
)
collection = result.scalar_one_or_none()
if collection is None:
return {"error": f"Collection not found: {collection_slug}"}
result = await g.s.execute(
select(SxPubDocument).where(
SxPubDocument.collection_id == collection.id,
SxPubDocument.status == "published",
).order_by(SxPubDocument.slug)
)
docs = result.scalars().all()
return {
"collection": collection_slug,
"name": collection.name,
"description": collection.description or "",
"items": [
{
"slug": d.slug,
"title": d.title or d.slug,
"summary": d.summary or "",
"cid": d.ipfs_cid or "",
"size": d.size_bytes or 0,
}
for d in docs
],
}
async def resolve_document(collection_slug: str, slug: str) -> dict[str, Any]:
"""Resolve a document path to its content via IPFS."""
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubCollection, SxPubDocument
from shared.utils.ipfs_client import get_bytes
result = await g.s.execute(
select(SxPubCollection).where(SxPubCollection.slug == collection_slug)
)
collection = result.scalar_one_or_none()
if collection is None:
return {"error": "not-found"}
result = await g.s.execute(
select(SxPubDocument).where(
SxPubDocument.collection_id == collection.id,
SxPubDocument.slug == slug,
)
)
doc = result.scalar_one_or_none()
if doc is None or not doc.ipfs_cid:
return {"error": "not-found"}
content_bytes = await get_bytes(doc.ipfs_cid)
if content_bytes is None:
return {"error": "ipfs-unavailable"}
return {
"slug": doc.slug,
"title": doc.title or doc.slug,
"summary": doc.summary or "",
"cid": doc.ipfs_cid,
"collection": collection_slug,
"content": content_bytes.decode("utf-8", errors="replace"),
}
async def fetch_cid(cid: str) -> dict[str, Any]:
"""Fetch raw content from IPFS by CID."""
from shared.utils.ipfs_client import get_bytes
content_bytes = await get_bytes(cid)
if content_bytes is None:
return {"error": "not-found"}
return {
"cid": cid,
"content": content_bytes.decode("utf-8", errors="replace"),
"size": len(content_bytes),
}
# ---------------------------------------------------------------------------
# Phase 3: Federation — outbox, inbox, follow, delivery, signatures
# ---------------------------------------------------------------------------
async def outbox_data(page: str = "") -> dict[str, Any]:
"""List published activities (outbox)."""
from quart import g
from sqlalchemy import select, func as sa_func
from shared.models.sx_pub import SxPubActivity
page_num = int(page) if page else 1
per_page = 20
offset = (page_num - 1) * per_page
total_result = await g.s.execute(select(sa_func.count(SxPubActivity.id)))
total = total_result.scalar() or 0
result = await g.s.execute(
select(SxPubActivity)
.order_by(SxPubActivity.published.desc())
.offset(offset).limit(per_page)
)
activities = result.scalars().all()
return {
"total": total,
"page": page_num,
"items": [
{
"type": a.activity_type,
"object-type": a.object_type or "",
"published": a.published.isoformat() if a.published else "",
"cid": a.ipfs_cid or "",
"data": a.object_data or {},
}
for a in activities
],
}
async def followers_data() -> list[dict[str, Any]]:
"""List followers."""
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubFollower
result = await g.s.execute(
select(SxPubFollower).where(SxPubFollower.state == "accepted")
.order_by(SxPubFollower.created_at.desc())
)
return [
{
"acct": f.follower_acct,
"actor-url": f.follower_actor_url,
"inbox": f.follower_inbox,
}
for f in result.scalars().all()
]
async def following_data() -> list[dict[str, Any]]:
"""List who we follow."""
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubFollowing
result = await g.s.execute(
select(SxPubFollowing).where(SxPubFollowing.state == "accepted")
.order_by(SxPubFollowing.created_at.desc())
)
return [
{
"actor-url": f.remote_actor_url,
"inbox": f.remote_inbox,
}
for f in result.scalars().all()
]
def _sign_request(method: str, url: str, body: str, private_key_pem: str,
key_id: str) -> dict[str, str]:
"""Generate HTTP Signature headers for an outgoing request."""
from urllib.parse import urlparse
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
import base64
parsed = urlparse(url)
path = parsed.path
host = parsed.netloc
date = datetime.now(timezone.utc).strftime("%a, %d %b %Y %H:%M:%S GMT")
# Build signature string
digest = "SHA-256=" + base64.b64encode(
hashlib.sha256(body.encode("utf-8")).digest()
).decode("ascii")
signed_string = (
f"(request-target): {method.lower()} {path}\n"
f"host: {host}\n"
f"date: {date}\n"
f"digest: {digest}"
)
private_key = serialization.load_pem_private_key(
private_key_pem.encode("utf-8"), password=None,
)
signature = base64.b64encode(
private_key.sign(
signed_string.encode("utf-8"),
padding.PKCS1v15(),
hashes.SHA256(),
)
).decode("ascii")
sig_header = (
f'keyId="{key_id}",'
f'algorithm="rsa-sha256",'
f'headers="(request-target) host date digest",'
f'signature="{signature}"'
)
return {
"Host": host,
"Date": date,
"Digest": digest,
"Signature": sig_header,
"Content-Type": "text/sx; charset=utf-8",
}
def _verify_signature(headers: dict, method: str, path: str,
body: str, public_key_pem: str) -> bool:
"""Verify HTTP Signature on an incoming request."""
import base64
import re
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
sig_header = headers.get("Signature", "")
if not sig_header:
return False
# Parse signature header
parts = {}
for match in re.finditer(r'(\w+)="([^"]*)"', sig_header):
parts[match.group(1)] = match.group(2)
if "signature" not in parts or "headers" not in parts:
return False
# Reconstruct signed string
signed_headers = parts["headers"].split()
lines = []
for h in signed_headers:
if h == "(request-target)":
lines.append(f"(request-target): {method.lower()} {path}")
else:
val = headers.get(h.title(), headers.get(h, ""))
lines.append(f"{h}: {val}")
signed_string = "\n".join(lines)
try:
public_key = serialization.load_pem_public_key(
public_key_pem.encode("utf-8"),
)
public_key.verify(
base64.b64decode(parts["signature"]),
signed_string.encode("utf-8"),
padding.PKCS1v15(),
hashes.SHA256(),
)
return True
except Exception as e:
logger.warning("Signature verification failed: %s", e)
return False
async def follow_remote(actor_url: str) -> dict[str, Any]:
"""Send a Follow activity to a remote sx-pub server."""
import httpx
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubFollowing, SxPubActor
# Check not already following
result = await g.s.execute(
select(SxPubFollowing).where(SxPubFollowing.remote_actor_url == actor_url)
)
existing = result.scalar_one_or_none()
if existing:
return {"status": existing.state, "actor-url": actor_url}
# Fetch remote actor document
try:
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.get(actor_url, headers={"Accept": "text/sx"})
resp.raise_for_status()
remote_actor_sx = resp.text
except Exception as e:
return {"error": f"Failed to fetch remote actor: {e}"}
# Parse inbox URL from the SX actor document
# Simple extraction — look for :inbox "..."
import re
inbox_match = re.search(r':inbox\s+"([^"]+)"', remote_actor_sx)
if not inbox_match:
return {"error": "Could not find inbox in remote actor document"}
# Build absolute inbox URL
from urllib.parse import urljoin
remote_inbox = urljoin(actor_url, inbox_match.group(1))
# Get our actor for signing
actor_result = await g.s.execute(
select(SxPubActor).where(SxPubActor.preferred_username == "sx")
)
our_actor = actor_result.scalar_one_or_none()
if not our_actor:
return {"error": "Local actor not initialized"}
our_id = f"https://{our_actor.domain}/pub/actor"
# Build Follow activity
follow_sx = (
f'(Follow\n'
f' :actor "{our_id}"\n'
f' :object "{actor_url}")'
)
# Sign and send
key_id = f"{our_id}#main-key"
headers = _sign_request("POST", remote_inbox, follow_sx,
our_actor.private_key_pem, key_id)
try:
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.post(remote_inbox, content=follow_sx, headers=headers)
logger.info("Sent Follow to %s%d", remote_inbox, resp.status_code)
except Exception as e:
logger.error("Follow delivery failed to %s: %s", remote_inbox, e)
# Store the following record
following = SxPubFollowing(
remote_actor_url=actor_url,
remote_inbox=remote_inbox,
state="pending",
)
g.s.add(following)
await g.s.flush()
return {"status": "pending", "actor-url": actor_url, "inbox": remote_inbox}
async def process_inbox(body_sx: str) -> dict[str, Any]:
"""Process an incoming activity from a remote sx-pub server."""
import re
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import (
SxPubFollower, SxPubFollowing, SxPubActivity, SxPubActor,
)
from shared.utils.ipfs_client import pin_cid
# Parse activity type and actor
type_match = re.match(r'\((\w+)', body_sx.strip())
actor_match = re.search(r':actor\s+"([^"]+)"', body_sx)
object_match = re.search(r':object\s+"([^"]+)"', body_sx)
if not type_match:
return {"error": "Could not parse activity type"}
activity_type = type_match.group(1)
remote_actor = actor_match.group(1) if actor_match else ""
object_val = object_match.group(1) if object_match else ""
logger.info("Inbox received: %s from %s", activity_type, remote_actor)
if activity_type == "Follow":
# Someone wants to follow us — auto-accept
inbox_match = re.search(r':inbox\s+"([^"]+)"', body_sx)
remote_inbox = inbox_match.group(1) if inbox_match else ""
# If no inbox in activity, try fetching the remote actor
if not remote_inbox and remote_actor:
import httpx
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(remote_actor, headers={"Accept": "text/sx"})
im = re.search(r':inbox\s+"([^"]+)"', resp.text)
if im:
from urllib.parse import urljoin
remote_inbox = urljoin(remote_actor, im.group(1))
except Exception:
pass
# Store follower
result = await g.s.execute(
select(SxPubFollower).where(SxPubFollower.follower_actor_url == remote_actor)
)
follower = result.scalar_one_or_none()
if follower is None:
follower = SxPubFollower(
follower_acct=remote_actor,
follower_inbox=remote_inbox or remote_actor,
follower_actor_url=remote_actor,
state="accepted",
)
g.s.add(follower)
await g.s.flush()
logger.info("Accepted follow from %s", remote_actor)
# Send Accept back
actor_result = await g.s.execute(
select(SxPubActor).where(SxPubActor.preferred_username == "sx")
)
our_actor = actor_result.scalar_one_or_none()
if our_actor and remote_inbox:
our_id = f"https://{our_actor.domain}/pub/actor"
accept_sx = (
f'(Accept\n'
f' :actor "{our_id}"\n'
f' :object {body_sx})'
)
key_id = f"{our_id}#main-key"
headers = _sign_request("POST", remote_inbox, accept_sx,
our_actor.private_key_pem, key_id)
import httpx
try:
async with httpx.AsyncClient(timeout=10) as client:
await client.post(remote_inbox, content=accept_sx, headers=headers)
except Exception as e:
logger.warning("Accept delivery failed: %s", e)
return {"accepted": remote_actor}
elif activity_type == "Accept":
# Our follow was accepted — update state
if object_val:
result = await g.s.execute(
select(SxPubFollowing).where(
SxPubFollowing.remote_actor_url == remote_actor
)
)
following = result.scalar_one_or_none()
if following:
following.state = "accepted"
following.accepted_at = datetime.now(timezone.utc)
await g.s.flush()
logger.info("Follow accepted by %s", remote_actor)
return {"accepted-by": remote_actor}
elif activity_type == "Publish":
# Pin the published CID locally
cid_match = re.search(r':cid\s+"([^"]+)"', body_sx)
if cid_match:
cid = cid_match.group(1)
pinned = await pin_cid(cid)
logger.info("Mirrored CID %s from %s (pinned=%s)", cid, remote_actor, pinned)
# Record the activity
g.s.add(SxPubActivity(
activity_type="Publish",
object_type="SxDocument",
object_data={"remote_actor": remote_actor, "body": body_sx},
ipfs_cid=cid_match.group(1) if cid_match else None,
))
await g.s.flush()
return {"mirrored": cid_match.group(1) if cid_match else ""}
return {"ignored": activity_type}
async def deliver_to_followers(activity_sx: str) -> dict[str, Any]:
"""Deliver an activity to all follower inboxes."""
import httpx
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubFollower, SxPubActor
actor_result = await g.s.execute(
select(SxPubActor).where(SxPubActor.preferred_username == "sx")
)
our_actor = actor_result.scalar_one_or_none()
if not our_actor:
return {"error": "Actor not initialized", "delivered": 0}
our_id = f"https://{our_actor.domain}/pub/actor"
key_id = f"{our_id}#main-key"
result = await g.s.execute(
select(SxPubFollower).where(SxPubFollower.state == "accepted")
)
followers = result.scalars().all()
delivered = 0
failed = 0
for follower in followers:
headers = _sign_request("POST", follower.follower_inbox, activity_sx,
our_actor.private_key_pem, key_id)
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
follower.follower_inbox,
content=activity_sx,
headers=headers,
)
if resp.status_code < 300:
delivered += 1
else:
failed += 1
logger.warning("Delivery to %s returned %d",
follower.follower_inbox, resp.status_code)
except Exception as e:
failed += 1
logger.error("Delivery to %s failed: %s", follower.follower_inbox, e)
logger.info("Delivered to %d/%d followers (%d failed)",
delivered, len(followers), failed)
return {"delivered": delivered, "failed": failed, "total": len(followers)}
async def get_request_body() -> str:
"""Get the raw request body as text."""
from quart import request
data = await request.get_data(as_text=True)
return data
async def get_request_headers() -> dict[str, str]:
"""Get request headers as a dict."""
from quart import request
return dict(request.headers)
async def get_request_method() -> str:
"""Get the HTTP method."""
from quart import request
return request.method
async def get_request_path() -> str:
"""Get the request path."""
from quart import request
return request.path
# ---------------------------------------------------------------------------
# Phase 4: Anchoring — Merkle trees, OTS, verification
# ---------------------------------------------------------------------------
async def anchor_pending() -> dict[str, Any]:
"""Anchor all unanchored Publish activities into a Merkle tree.
1. Collect unanchored activities (by CID)
2. Build Merkle tree
3. Pin tree to IPFS
4. Submit root to OpenTimestamps
5. Store proof on IPFS
6. Record anchor in DB
"""
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubActivity
from shared.utils.anchoring import (
build_merkle_tree, submit_to_opentimestamps,
)
from shared.utils.ipfs_client import add_json, add_bytes, is_available
# Find unanchored Publish activities with CIDs
result = await g.s.execute(
select(SxPubActivity).where(
SxPubActivity.activity_type == "Publish",
SxPubActivity.ipfs_cid.isnot(None),
SxPubActivity.object_data["anchor_tree_cid"].is_(None),
).order_by(SxPubActivity.created_at.asc())
.limit(100)
)
activities = result.scalars().all()
if not activities:
return {"status": "nothing-to-anchor", "count": 0}
# Build Merkle tree from CIDs
cids = [a.ipfs_cid for a in activities if a.ipfs_cid]
if not cids:
return {"status": "no-cids", "count": 0}
tree = build_merkle_tree(cids)
merkle_root = tree["root"]
# Pin tree to IPFS
tree_cid = None
ots_proof_cid = None
if await is_available():
try:
tree_data = {
"root": merkle_root,
"leaves": tree["leaves"],
"cids": cids,
"created_at": datetime.now(timezone.utc).isoformat(),
}
tree_cid = await add_json(tree_data)
logger.info("Merkle tree pinned: %s (%d leaves)", tree_cid, len(cids))
except Exception as e:
logger.error("IPFS tree storage failed: %s", e)
# Submit to OpenTimestamps
ots_proof = await submit_to_opentimestamps(merkle_root)
if ots_proof and await is_available():
try:
ots_proof_cid = await add_bytes(ots_proof)
logger.info("OTS proof pinned: %s", ots_proof_cid)
except Exception as e:
logger.error("IPFS OTS proof storage failed: %s", e)
# Record anchor in activities (store in object_data)
anchor_info = {
"merkle_root": merkle_root,
"tree_cid": tree_cid,
"ots_proof_cid": ots_proof_cid,
"activity_count": len(activities),
"anchored_at": datetime.now(timezone.utc).isoformat(),
}
for a in activities:
data = dict(a.object_data or {})
data["anchor_tree_cid"] = tree_cid
data["anchor_merkle_root"] = merkle_root
data["anchor_ots_cid"] = ots_proof_cid
a.object_data = data
# Also record anchor as its own activity
from shared.models.sx_pub import SxPubActivity as SPA
g.s.add(SPA(
activity_type="Anchor",
object_type="MerkleTree",
object_data=anchor_info,
ipfs_cid=tree_cid,
))
await g.s.flush()
logger.info("Anchored %d activities, root=%s, tree=%s, ots=%s",
len(activities), merkle_root, tree_cid, ots_proof_cid)
return {
"status": "anchored",
"count": len(activities),
"merkle-root": merkle_root,
"tree-cid": tree_cid or "",
"ots-proof-cid": ots_proof_cid or "",
}
async def verify_cid_anchor(cid: str) -> dict[str, Any]:
"""Verify the anchor proof for a specific CID."""
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubActivity
from shared.utils.anchoring import build_merkle_tree, verify_merkle_proof
from shared.utils.ipfs_client import get_bytes
# Find the Publish activity for this CID
result = await g.s.execute(
select(SxPubActivity).where(
SxPubActivity.activity_type == "Publish",
SxPubActivity.ipfs_cid == cid,
)
)
activity = result.scalar_one_or_none()
if not activity:
return {"error": "not-found", "cid": cid}
data = activity.object_data or {}
tree_cid = data.get("anchor_tree_cid")
merkle_root = data.get("anchor_merkle_root")
ots_cid = data.get("anchor_ots_cid")
if not tree_cid:
return {"status": "not-anchored", "cid": cid}
# Fetch the Merkle tree from IPFS to verify
verified = False
if tree_cid:
tree_bytes = await get_bytes(tree_cid)
if tree_bytes:
import json
try:
tree_data = json.loads(tree_bytes)
tree = build_merkle_tree(tree_data["cids"])
from shared.utils.anchoring import get_merkle_proof
proof = get_merkle_proof(tree, cid)
if proof is not None:
verified = verify_merkle_proof(cid, proof, tree["root"])
except Exception as e:
logger.warning("Merkle verification failed: %s", e)
return {
"status": "anchored" if verified else "unverified",
"cid": cid,
"merkle-root": merkle_root or "",
"tree-cid": tree_cid or "",
"ots-proof-cid": ots_cid or "",
"verified": "true" if verified else "false",
"published": activity.published.isoformat() if activity.published else "",
}