"""Merkle tree construction and OpenTimestamps anchoring.

Ported from ~/art-dag/activity-pub/anchoring.py.

Builds a SHA256 merkle tree from activity IDs, submits the root to
OpenTimestamps for Bitcoin timestamping, and stores the tree + proof on IPFS.
"""
from __future__ import annotations

import hashlib
import logging
from datetime import datetime, timezone

import httpx
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession

from shared.models.federation import APActivity, APAnchor

log = logging.getLogger(__name__)

# OpenTimestamps calendar servers, tried in order until one returns HTTP 200.
OTS_SERVERS = [
    "https://a.pool.opentimestamps.org",
    "https://b.pool.opentimestamps.org",
    "https://a.pool.eternitywall.com",
]


def _sha256(data: str | bytes) -> str:
    """Return the SHA256 hex digest of *data*.

    ``str`` input is encoded with ``str.encode()`` (UTF-8) before hashing.
    """
    if isinstance(data, str):
        data = data.encode()
    return hashlib.sha256(data).hexdigest()


def build_merkle_tree(items: list[str]) -> dict:
    """Build a SHA256 merkle tree from a list of strings (activity IDs).

    Items are sorted first, so the same set of IDs always yields the same
    root regardless of input order. Each leaf is ``sha256(item)``. Each
    parent is ``sha256(left_hex + right_hex)``, i.e. the hash of the
    concatenated *hex strings*, not of the raw digest bytes. External
    verifiers must replicate this exactly. An odd node at the end of a level
    is paired with itself.

    NOTE: because odd nodes are duplicated and leaves and internal nodes are
    hashed the same way, distinct item lists can share a root (e.g. sorted
    ``[a, b, c]`` and ``[a, b, c, c]``). Changing the scheme would change
    every previously anchored root, so it is intentionally left as-is.

    Returns:
        {
            "root": hex_str,
            "leaves": [hex_str, ...],         # in sorted-item order
            "levels": [[hex_str, ...], ...],  # bottom-up; levels[0] == leaves
        }

    Raises:
        ValueError: if *items* is empty.
    """
    if not items:
        raise ValueError("Cannot build merkle tree from empty list")

    # Sort for deterministic ordering
    items = sorted(items)

    # Leaf hashes
    leaves = [_sha256(item) for item in items]
    levels = [leaves[:]]

    current = leaves[:]
    while len(current) > 1:
        next_level = []
        for i in range(0, len(current), 2):
            left = current[i]
            # Odd node out: pair it with itself.
            right = current[i + 1] if i + 1 < len(current) else left
            combined = _sha256(left + right)
            next_level.append(combined)
        levels.append(next_level)
        current = next_level

    return {
        "root": current[0],
        "leaves": leaves,
        "levels": levels,
    }


def get_merkle_proof(tree: dict, item: str) -> list[dict] | None:
    """Generate a proof-of-membership for *item*.

    Args:
        tree: a dict as returned by :func:`build_merkle_tree`.
        item: the original (unhashed) string to prove.

    Returns:
        A list of ``{"sibling": hex, "position": "left"|"right"}`` dicts,
        ordered from the leaf level upward. ``position`` is the side the
        sibling sits on relative to the running hash (see
        :func:`verify_merkle_proof`). A single-leaf tree yields ``[]``.
        Returns None if the item is not in the tree.
    """
    item_hash = _sha256(item)
    leaves = tree["leaves"]

    try:
        idx = leaves.index(item_hash)
    except ValueError:
        return None

    proof = []
    for level in tree["levels"][:-1]:  # skip root level
        if idx % 2 == 0:
            sibling_idx = idx + 1
            position = "right"
        else:
            sibling_idx = idx - 1
            position = "left"

        if sibling_idx < len(level):
            proof.append({"sibling": level[sibling_idx], "position": position})
        else:
            # Last node of an odd-length level: build_merkle_tree paired it
            # with itself, so the "sibling" is the node's own hash.
            proof.append({"sibling": level[idx], "position": position})

        idx = idx // 2

    return proof


def verify_merkle_proof(item: str, proof: list[dict], root: str) -> bool:
    """Verify a merkle proof against a root hash.

    Recomputes the root by folding each proof step into ``sha256(item)``:
    a ``"right"`` sibling is appended, and a ``"left"`` sibling is
    prepended. The hex strings are concatenated, matching
    :func:`build_merkle_tree`. Returns True iff the result equals *root*.
    """
    current = _sha256(item)
    for step in proof:
        sibling = step["sibling"]
        if step["position"] == "right":
            current = _sha256(current + sibling)
        else:
            current = _sha256(sibling + current)
    return current == root


async def submit_to_opentimestamps(merkle_root: str) -> bytes | None:
    """Submit a hash to OpenTimestamps. Returns the (incomplete) OTS proof bytes.

    Posts the raw root bytes to ``<server>/digest`` on each server in
    ``OTS_SERVERS`` in turn, and returns the body of the first HTTP 200
    response. Network errors and non-200 responses are logged at debug
    level and the next server is tried. Returns None if every server fails.

    Raises:
        ValueError: if *merkle_root* is not valid hex (from ``bytes.fromhex``).
    """
    root_bytes = bytes.fromhex(merkle_root)

    for server in OTS_SERVERS:
        try:
            async with httpx.AsyncClient(timeout=30) as client:
                resp = await client.post(
                    f"{server}/digest",
                    content=root_bytes,
                    headers={"Content-Type": "application/x-opentimestamps"},
                )
                if resp.status_code == 200:
                    log.info("OTS proof obtained from %s", server)
                    return resp.content
                # Previously a non-200 reply fell through without a trace.
                log.debug(
                    "OTS server %s returned HTTP %s", server, resp.status_code
                )
        except Exception:
            log.debug("OTS server %s failed", server, exc_info=True)
            continue

    log.warning("All OTS servers failed for root %s", merkle_root)
    return None


async def upgrade_ots_proof(proof_bytes: bytes) -> tuple[bytes, bool]:
    """Try to upgrade an incomplete OTS proof to a Bitcoin-confirmed one.

    Returns (proof_bytes, is_confirmed). The proof_bytes may be updated.

    Currently a stub: always returns the input unchanged with
    ``is_confirmed=False``.
    """
    # OpenTimestamps upgrade is done via the `ots` CLI or their calendar API.
    # For now, return the proof as-is with is_confirmed=False.
    # Calendar-based upgrade polling not yet implemented.
    return proof_bytes, False


async def create_anchor(
    session: AsyncSession,
    batch_size: int = 100,
) -> APAnchor | None:
    """Anchor a batch of un-anchored activities.

    1. Select up to *batch_size* local activities without an anchor_id,
       oldest first
    2. Build merkle tree from their activity_ids
    3. Store tree on IPFS (best effort)
    4. Submit root to OpenTimestamps (best effort)
    5. Store OTS proof on IPFS (best effort)
    6. Create APAnchor record
    7. Link activities to anchor

    The session is flushed but not committed; committing is left to the
    caller.

    Returns:
        The new APAnchor, or None if there was nothing to anchor.
    """
    # Find un-anchored activities.
    # NOTE(review): nothing here locks the selected rows; if several workers
    # can run this concurrently they may anchor the same activities. Confirm
    # that only one worker runs this per database.
    result = await session.execute(
        select(APActivity)
        .where(
            APActivity.anchor_id.is_(None),
            APActivity.is_local == True,  # noqa: E712
        )
        .order_by(APActivity.created_at.asc())
        .limit(batch_size)
    )
    activities = result.scalars().all()

    if not activities:
        log.debug("No un-anchored activities to process")
        return None

    activity_ids = [a.activity_id for a in activities]
    log.info("Anchoring %d activities", len(activity_ids))

    # Build merkle tree. build_merkle_tree sorts internally; keep a sorted
    # copy too, so that the IPFS document's activity_ids line up
    # index-for-index with its leaves (leaves[i] == sha256(activity_ids[i])).
    tree = build_merkle_tree(activity_ids)
    merkle_root = tree["root"]
    sorted_ids = sorted(activity_ids)

    # Store tree on IPFS
    tree_cid = None
    ots_proof_cid = None
    try:
        from shared.utils.ipfs_client import add_json, is_available

        if await is_available():
            tree_cid = await add_json({
                "root": merkle_root,
                "leaves": tree["leaves"],
                "activity_ids": sorted_ids,
                "created_at": datetime.now(timezone.utc).isoformat(),
            })
            log.info("Merkle tree stored on IPFS: %s", tree_cid)
    except Exception:
        log.exception("IPFS tree storage failed")

    # Submit to OpenTimestamps
    ots_proof = await submit_to_opentimestamps(merkle_root)
    if ots_proof:
        try:
            from shared.utils.ipfs_client import add_bytes, is_available

            # Re-check: the OTS round-trip above can take a while.
            if await is_available():
                ots_proof_cid = await add_bytes(ots_proof)
                log.info("OTS proof stored on IPFS: %s", ots_proof_cid)
        except Exception:
            log.exception("IPFS OTS proof storage failed")

    # Create anchor record. This happens even if IPFS/OTS steps failed (CIDs
    # stay None); since the activities are linked below, the selection query
    # above will not pick them up again.
    anchor = APAnchor(
        merkle_root=merkle_root,
        tree_ipfs_cid=tree_cid,
        ots_proof_cid=ots_proof_cid,
        activity_count=len(activities),
    )
    session.add(anchor)
    # Flush so anchor.id is populated before linking activities to it.
    await session.flush()

    # Link activities to anchor
    for a in activities:
        a.anchor_id = anchor.id
    await session.flush()

    log.info(
        "Anchor created: root=%s, activities=%d, tree_cid=%s",
        merkle_root, len(activities), tree_cid,
    )

    return anchor