Files
rose-ash/l2/anchoring.py
2026-02-24 23:07:31 +00:00

335 lines
9.3 KiB
Python

# art-activity-pub/anchoring.py
"""
Merkle tree anchoring to Bitcoin via OpenTimestamps.
Provides provable timestamps for ActivityPub activities without running
our own blockchain. Activities are hashed into a merkle tree, the root
is submitted to OpenTimestamps (free), and the proof is stored on IPFS.
The merkle tree + OTS proof provides cryptographic evidence that
activities existed at a specific time, anchored to Bitcoin.
"""
import hashlib
import json
import logging
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional
import requests
logger = logging.getLogger(__name__)
# Backup file location (should be on persistent volume)
ANCHOR_BACKUP_DIR = Path(os.getenv("ANCHOR_BACKUP_DIR", "/data/anchors"))
ANCHOR_BACKUP_FILE = ANCHOR_BACKUP_DIR / "anchors.jsonl"
# OpenTimestamps calendar servers
OTS_SERVERS = [
"https://a.pool.opentimestamps.org",
"https://b.pool.opentimestamps.org",
"https://a.pool.eternitywall.com",
]
def _ensure_backup_dir() -> None:
    """Create the anchor backup directory (and any parents) if it is missing."""
    ANCHOR_BACKUP_DIR.mkdir(parents=True, exist_ok=True)
def build_merkle_tree(items: List[str]) -> Optional[dict]:
    """
    Build a merkle tree from a list of strings (activity IDs).

    IDs are sorted first so the tree is deterministic regardless of input
    order. When a level has an odd number of nodes, the last node is paired
    with itself.

    Args:
        items: List of activity IDs to include

    Returns:
        Dict with root, tree structure, and metadata, or None if empty
    """
    if not items:
        return None
    # Deterministic ordering: the tree must not depend on caller order.
    ordered = sorted(items)
    # Leaves are the SHA-256 digests of the individual items.
    level = [hashlib.sha256(s.encode()).hexdigest() for s in ordered]
    levels = [level]
    # Collapse pairs upward until a single root remains.
    while len(level) > 1:
        pairs = [
            (level[i], level[i + 1] if i + 1 < len(level) else level[i])
            for i in range(0, len(level), 2)
        ]
        level = [hashlib.sha256((left + right).encode()).hexdigest()
                 for left, right in pairs]
        levels.append(level)
    return {
        "root": level[0],
        "tree": levels,
        "items": ordered,
        "item_count": len(ordered),
        "created_at": datetime.now(timezone.utc).isoformat()
    }
def get_merkle_proof(tree: dict, item: str) -> Optional[List[dict]]:
    """
    Get merkle proof for a specific item.

    Args:
        tree: Merkle tree dict from build_merkle_tree
        item: The item to prove membership for

    Returns:
        List of proof steps ({"hash", "position"}), or None if item not in tree

    Note:
        build_merkle_tree pairs the last node of an odd-sized level with
        itself (hash(left + left)). The proof must include that
        self-duplication step; the previous version skipped it, so proofs
        for items on a duplicated path failed verify_merkle_proof.
    """
    items = tree["items"]
    if item not in items:
        return None
    # build_merkle_tree stores items sorted; re-sort defensively so the
    # leaf index matches the leaf level of the stored tree.
    sorted_items = sorted(items)
    current_index = sorted_items.index(item)
    proof = []
    for level in tree["tree"][:-1]:  # Skip root level
        sibling_index = current_index ^ 1  # XOR flips the low bit: pair partner
        if sibling_index < len(level):
            sibling_hash = level[sibling_index]
        else:
            # Odd level: the last node was hashed with itself during tree
            # construction, so the sibling is the node's own hash.
            sibling_hash = level[current_index]
        proof.append({
            "hash": sibling_hash,
            "position": "right" if current_index % 2 == 0 else "left"
        })
        current_index //= 2
    return proof
def verify_merkle_proof(item: str, proof: List[dict], root: str) -> bool:
    """
    Verify a merkle proof.

    Args:
        item: The item to verify
        proof: Proof steps from get_merkle_proof
        root: Expected merkle root

    Returns:
        True if proof is valid
    """
    # Start from the item's leaf hash and fold in each sibling in order.
    digest = hashlib.sha256(item.encode()).hexdigest()
    for step in proof:
        sibling = step["hash"]
        # "right" means the sibling sits to the right of the running hash.
        ordered = (digest, sibling) if step["position"] == "right" else (sibling, digest)
        digest = hashlib.sha256("".join(ordered).encode()).hexdigest()
    return digest == root
def submit_to_opentimestamps(hash_hex: str) -> Optional[bytes]:
    """
    Submit a hash to OpenTimestamps for Bitcoin anchoring.

    Tries each calendar server in OTS_SERVERS, returning the first
    successful response.

    Args:
        hash_hex: Hex-encoded SHA256 hash to timestamp

    Returns:
        Incomplete .ots proof bytes, or None on failure

    Note:
        The returned proof is "incomplete" - it becomes complete
        after Bitcoin confirms (usually 1-2 hours). Use upgrade_ots_proof
        to get the complete proof later.
    """
    digest = bytes.fromhex(hash_hex)
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    for server in OTS_SERVERS:
        try:
            response = requests.post(
                f"{server}/digest",
                data=digest,
                headers=headers,
                timeout=10
            )
        except Exception as e:
            logger.warning(f"OTS server {server} failed: {e}")
            continue
        if response.status_code == 200:
            logger.info(f"Submitted to OpenTimestamps via {server}")
            return response.content
    logger.error("All OpenTimestamps servers failed")
    return None
def upgrade_ots_proof(ots_proof: bytes) -> Optional[bytes]:
    """
    Upgrade an incomplete OTS proof to a complete Bitcoin-anchored proof.

    Args:
        ots_proof: Incomplete .ots proof bytes

    Returns:
        Complete .ots proof bytes, or None if not yet confirmed

    Note:
        This should be called periodically (e.g., hourly) until
        the proof is complete. Bitcoin confirmation takes ~1-2 hours.
    """
    headers = {"Content-Type": "application/octet-stream"}
    for server in OTS_SERVERS:
        try:
            response = requests.post(
                f"{server}/upgrade",
                data=ots_proof,
                headers=headers,
                timeout=10
            )
        except Exception as e:
            logger.warning(f"OTS upgrade via {server} failed: {e}")
            continue
        # A longer response means the server attached the Bitcoin attestation.
        if response.status_code == 200 and len(response.content) > len(ots_proof):
            logger.info(f"OTS proof upgraded via {server}")
            return response.content
    return None
def append_to_backup(anchor_record: dict) -> None:
    """
    Append anchor record to persistent JSONL backup file.

    Args:
        anchor_record: Dict with anchor metadata
    """
    _ensure_backup_dir()
    # sort_keys keeps lines byte-stable for identical records.
    serialized = json.dumps(anchor_record, sort_keys=True)
    with ANCHOR_BACKUP_FILE.open("a") as backup:
        backup.write(serialized + "\n")
    logger.info(f"Anchor backed up to {ANCHOR_BACKUP_FILE}")
def load_backup_anchors() -> List[dict]:
    """
    Load all anchors from backup file.

    Malformed lines are skipped with a warning rather than aborting the load.

    Returns:
        List of anchor records
    """
    if not ANCHOR_BACKUP_FILE.exists():
        return []
    records: List[dict] = []
    with ANCHOR_BACKUP_FILE.open("r") as backup:
        for line in backup:
            line = line.strip()
            if not line:
                continue
            try:
                records.append(json.loads(line))
            except json.JSONDecodeError:
                logger.warning(f"Invalid JSON in backup: {line[:50]}...")
    return records
def get_latest_anchor_from_backup() -> Optional[dict]:
    """Return the most recent anchor record, or None if the backup is empty."""
    history = load_backup_anchors()
    if not history:
        return None
    return history[-1]
async def create_anchor(
    activity_ids: List[str],
    db_module,
    ipfs_module
) -> Optional[dict]:
    """
    Create a new anchor for a batch of activities.

    Builds a merkle tree over the activity IDs, stores the tree and the
    OpenTimestamps proof on IPFS (when an IPFS client is provided), records
    the anchor in the database (when a DB module is provided), and always
    appends the record to the persistent JSONL backup.

    Args:
        activity_ids: List of activity UUIDs to anchor
        db_module: Database module with anchor functions (may be None)
        ipfs_module: IPFS client module (may be None)

    Returns:
        Anchor record dict, or None on failure
    """
    if not activity_ids:
        logger.info("No activities to anchor")
        return None
    # Build merkle tree
    tree = build_merkle_tree(activity_ids)
    if not tree:
        return None
    root = tree["root"]
    logger.info(f"Built merkle tree: {len(activity_ids)} activities, root={root[:16]}...")
    # Store tree on IPFS. Guard on ipfs_module being present, matching the
    # OTS-proof branch below (previously a None module raised AttributeError
    # that was logged as an IPFS failure).
    tree_cid = None
    if ipfs_module:
        try:
            tree_cid = ipfs_module.add_json(tree)
            logger.info(f"Merkle tree stored on IPFS: {tree_cid}")
        except Exception as e:
            logger.error(f"Failed to store tree on IPFS: {e}")
    # Submit to OpenTimestamps (proof stays incomplete until Bitcoin confirms)
    ots_proof = submit_to_opentimestamps(root)
    # Store OTS proof on IPFS too
    ots_cid = None
    if ots_proof and ipfs_module:
        try:
            ots_cid = ipfs_module.add_bytes(ots_proof)
            logger.info(f"OTS proof stored on IPFS: {ots_cid}")
        except Exception as e:
            logger.warning(f"Failed to store OTS proof on IPFS: {e}")
    # Create anchor record. NOTE: first/last IDs reflect the caller-supplied
    # order; build_merkle_tree sorts IDs internally.
    anchor_record = {
        "merkle_root": root,
        "tree_ipfs_cid": tree_cid,
        "ots_proof_cid": ots_cid,
        "activity_count": len(activity_ids),
        "first_activity_id": activity_ids[0],
        "last_activity_id": activity_ids[-1],
        "created_at": datetime.now(timezone.utc).isoformat(),
        "confirmed_at": None,
        "bitcoin_txid": None
    }
    # Save to database (best-effort: backup below still happens on failure)
    if db_module:
        try:
            await db_module.create_anchor(anchor_record)
            await db_module.mark_activities_anchored(activity_ids, root)
        except Exception as e:
            logger.error(f"Failed to save anchor to database: {e}")
    # Append to backup file (persistent) so anchors survive DB loss
    append_to_backup(anchor_record)
    return anchor_record