All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m33s
Merges full history from art-dag/mono.git into the monorepo under the artdag/ directory. Contains: core (DAG engine), l1 (Celery rendering server), l2 (ActivityPub registry), common (shared templates/middleware), client (CLI), test (e2e). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> git-subtree-dir: artdag git-subtree-mainline: 1a179de547 git-subtree-split: 4c2e716558
335 lines
9.3 KiB
Python
335 lines
9.3 KiB
Python
# art-activity-pub/anchoring.py
|
|
"""
|
|
Merkle tree anchoring to Bitcoin via OpenTimestamps.
|
|
|
|
Provides provable timestamps for ActivityPub activities without running
|
|
our own blockchain. Activities are hashed into a merkle tree, the root
|
|
is submitted to OpenTimestamps (free), and the proof is stored on IPFS.
|
|
|
|
The merkle tree + OTS proof provides cryptographic evidence that
|
|
activities existed at a specific time, anchored to Bitcoin.
|
|
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import List, Optional
|
|
|
|
import requests
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Backup file location (should be on persistent volume)
|
|
ANCHOR_BACKUP_DIR = Path(os.getenv("ANCHOR_BACKUP_DIR", "/data/anchors"))
|
|
ANCHOR_BACKUP_FILE = ANCHOR_BACKUP_DIR / "anchors.jsonl"
|
|
|
|
# OpenTimestamps calendar servers
|
|
OTS_SERVERS = [
|
|
"https://a.pool.opentimestamps.org",
|
|
"https://b.pool.opentimestamps.org",
|
|
"https://a.pool.eternitywall.com",
|
|
]
|
|
|
|
|
|
def _ensure_backup_dir():
    """Create the anchor backup directory (and any parents) if missing."""
    ANCHOR_BACKUP_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def build_merkle_tree(items: List[str]) -> Optional[dict]:
    """
    Construct a merkle tree over a list of activity IDs.

    Items are sorted first so the tree is deterministic regardless of the
    input order. Leaves are SHA256 digests of the items; each parent is the
    SHA256 of the concatenated hex digests of its two children. When a level
    has an odd number of nodes, the last node is paired with itself.

    Args:
        items: List of activity IDs to include

    Returns:
        Dict with root, tree structure, and metadata, or None if empty
    """
    if not items:
        return None

    # Deterministic ordering: the same set of IDs always yields the same root.
    ordered = sorted(items)

    # Leaf level: one SHA256 hex digest per item.
    level = [hashlib.sha256(entry.encode()).hexdigest() for entry in ordered]
    levels = [level]

    # Collapse pairs upward until a single root remains.
    while len(level) > 1:
        parents = []
        for idx in range(0, len(level), 2):
            lhs = level[idx]
            # Odd count: the trailing node is duplicated as its own partner.
            rhs = level[idx + 1] if idx + 1 < len(level) else lhs
            parents.append(hashlib.sha256((lhs + rhs).encode()).hexdigest())
        levels.append(parents)
        level = parents

    return {
        "root": level[0],
        "tree": levels,
        "items": ordered,
        "item_count": len(ordered),
        "created_at": datetime.now(timezone.utc).isoformat()
    }
|
|
|
|
|
|
def get_merkle_proof(tree: dict, item: str) -> Optional[List[dict]]:
    """
    Get merkle proof for a specific item.

    Args:
        tree: Merkle tree dict from build_merkle_tree
        item: The item to prove membership for

    Returns:
        List of proof steps ({"hash", "position"} dicts), or None if the
        item is not in the tree. "position" says which side the sibling
        hash goes on when folding toward the root.
    """
    items = tree["items"]
    if item not in items:
        return None

    # Leaves were built from the sorted item list, so locate the leaf there.
    sorted_items = sorted(items)
    current_index = sorted_items.index(item)

    proof = []
    tree_levels = tree["tree"]

    for level in tree_levels[:-1]:  # Skip root level
        sibling_index = current_index ^ 1  # XOR flips the low bit to the pair partner
        if sibling_index < len(level):
            sibling_hash = level[sibling_index]
        else:
            # BUGFIX: odd-length level. build_merkle_tree pairs the last
            # node with itself, so the node is its own sibling; previously
            # this step was skipped entirely, producing proofs that fail
            # verification for the final item of any odd level.
            sibling_hash = level[current_index]
        proof.append({
            "hash": sibling_hash,
            "position": "right" if current_index % 2 == 0 else "left"
        })
        current_index //= 2

    return proof
|
|
|
|
|
|
def verify_merkle_proof(item: str, proof: List[dict], root: str) -> bool:
    """
    Verify a merkle inclusion proof against an expected root.

    Args:
        item: The item to verify
        proof: Proof steps from get_merkle_proof
        root: Expected merkle root (hex digest)

    Returns:
        True if folding the proof over the item's hash reproduces root
    """
    running = hashlib.sha256(item.encode()).hexdigest()

    for step in proof:
        # Concatenate in the order dictated by the sibling's position.
        if step["position"] == "right":
            pair = running + step["hash"]
        else:
            pair = step["hash"] + running
        running = hashlib.sha256(pair.encode()).hexdigest()

    return running == root
|
|
|
|
|
|
def submit_to_opentimestamps(hash_hex: str) -> Optional[bytes]:
    """
    Submit a hash to OpenTimestamps for Bitcoin anchoring.

    Each calendar server in OTS_SERVERS is tried in order; the first
    HTTP 200 response wins.

    Args:
        hash_hex: Hex-encoded SHA256 hash to timestamp

    Returns:
        Incomplete .ots proof bytes, or None on failure

    Note:
        The returned proof is "incomplete" - it becomes complete
        after Bitcoin confirms (usually 1-2 hours). Use upgrade_ots_proof
        to get the complete proof later.
    """
    digest = bytes.fromhex(hash_hex)

    for server in OTS_SERVERS:
        try:
            resp = requests.post(
                f"{server}/digest",
                data=digest,
                headers={"Content-Type": "application/x-www-form-urlencoded"},
                timeout=10
            )
        except Exception as e:
            # Network/timeout failure on this calendar: fall through to the next.
            logger.warning(f"OTS server {server} failed: {e}")
            continue
        if resp.status_code == 200:
            logger.info(f"Submitted to OpenTimestamps via {server}")
            return resp.content

    logger.error("All OpenTimestamps servers failed")
    return None
|
|
|
|
|
|
def upgrade_ots_proof(ots_proof: bytes) -> Optional[bytes]:
    """
    Upgrade an incomplete OTS proof to a complete Bitcoin-anchored proof.

    Args:
        ots_proof: Incomplete .ots proof bytes

    Returns:
        Complete .ots proof bytes, or None if not yet confirmed

    Note:
        This should be called periodically (e.g., hourly) until
        the proof is complete. Bitcoin confirmation takes ~1-2 hours.
    """
    for server in OTS_SERVERS:
        try:
            resp = requests.post(
                f"{server}/upgrade",
                data=ots_proof,
                headers={"Content-Type": "application/octet-stream"},
                timeout=10
            )
        except Exception as e:
            logger.warning(f"OTS upgrade via {server} failed: {e}")
            continue
        # A payload longer than what we sent is taken to mean the calendar
        # attached new attestation data (i.e. the proof was upgraded).
        if resp.status_code == 200 and len(resp.content) > len(ots_proof):
            logger.info(f"OTS proof upgraded via {server}")
            return resp.content

    return None
|
|
|
|
|
|
def append_to_backup(anchor_record: dict):
    """
    Append anchor record to persistent JSONL backup file.

    Args:
        anchor_record: Dict with anchor metadata
    """
    _ensure_backup_dir()

    # sort_keys keeps serialized records byte-stable for identical content.
    record_line = json.dumps(anchor_record, sort_keys=True)
    with open(ANCHOR_BACKUP_FILE, "a") as f:
        f.write(record_line + "\n")

    logger.info(f"Anchor backed up to {ANCHOR_BACKUP_FILE}")
|
|
|
|
|
|
def load_backup_anchors() -> List[dict]:
    """
    Load all anchor records from the JSONL backup file.

    Returns:
        List of anchor records (empty if the backup file does not exist).
        Lines that fail to parse as JSON are logged and skipped.
    """
    if not ANCHOR_BACKUP_FILE.exists():
        return []

    records = []
    with open(ANCHOR_BACKUP_FILE, "r") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                records.append(json.loads(line))
            except json.JSONDecodeError:
                # Skip corrupt lines rather than abort the whole load.
                logger.warning(f"Invalid JSON in backup: {line[:50]}...")

    return records
|
|
|
|
|
|
def get_latest_anchor_from_backup() -> Optional[dict]:
    """Get the most recent anchor from backup (last JSONL line), or None."""
    records = load_backup_anchors()
    if not records:
        return None
    return records[-1]
|
|
|
|
|
|
async def create_anchor(
    activity_ids: List[str],
    db_module,
    ipfs_module
) -> Optional[dict]:
    """
    Create a new anchor for a batch of activities.

    Builds a merkle tree over the activity IDs, stores the tree (and the
    OpenTimestamps proof, when obtained) on IPFS, records the anchor in the
    database, and always appends the record to the local JSONL backup.

    Args:
        activity_ids: List of activity UUIDs to anchor
        db_module: Database module with anchor functions (may be None)
        ipfs_module: IPFS client module (may be None)

    Returns:
        Anchor record dict, or None on failure
    """
    if not activity_ids:
        logger.info("No activities to anchor")
        return None

    # Build merkle tree
    tree = build_merkle_tree(activity_ids)
    if not tree:
        return None

    root = tree["root"]
    logger.info(f"Built merkle tree: {len(activity_ids)} activities, root={root[:16]}...")

    # Store tree on IPFS. CONSISTENCY FIX: guard on ipfs_module like the OTS
    # section below does, instead of relying on an AttributeError being
    # caught (which logged a misleading "Failed to store tree" error when
    # IPFS was simply not configured).
    tree_cid = None
    if ipfs_module:
        try:
            tree_cid = ipfs_module.add_json(tree)
            logger.info(f"Merkle tree stored on IPFS: {tree_cid}")
        except Exception as e:
            logger.error(f"Failed to store tree on IPFS: {e}")
            tree_cid = None

    # Submit to OpenTimestamps
    ots_proof = submit_to_opentimestamps(root)

    # Store OTS proof on IPFS too
    ots_cid = None
    if ots_proof and ipfs_module:
        try:
            ots_cid = ipfs_module.add_bytes(ots_proof)
            logger.info(f"OTS proof stored on IPFS: {ots_cid}")
        except Exception as e:
            logger.warning(f"Failed to store OTS proof on IPFS: {e}")

    # Create anchor record. NOTE(review): first/last IDs reflect the
    # caller-supplied order, while tree["items"] is sorted — presumably
    # intentional (batch boundaries); confirm with callers.
    anchor_record = {
        "merkle_root": root,
        "tree_ipfs_cid": tree_cid,
        "ots_proof_cid": ots_cid,
        "activity_count": len(activity_ids),
        "first_activity_id": activity_ids[0],
        "last_activity_id": activity_ids[-1],
        "created_at": datetime.now(timezone.utc).isoformat(),
        "confirmed_at": None,
        "bitcoin_txid": None
    }

    # Save to database (best-effort: the backup append below still runs if
    # the database write fails)
    if db_module:
        try:
            await db_module.create_anchor(anchor_record)
            await db_module.mark_activities_anchored(activity_ids, root)
        except Exception as e:
            logger.error(f"Failed to save anchor to database: {e}")

    # Append to backup file (persistent)
    append_to_backup(anchor_record)

    return anchor_record
|