# art-activity-pub/anchoring.py
"""
Merkle tree anchoring to Bitcoin via OpenTimestamps.

Provides provable timestamps for ActivityPub activities without running
our own blockchain. Activities are hashed into a merkle tree, the root
is submitted to OpenTimestamps (free), and the proof is stored on IPFS.

The merkle tree + OTS proof provides cryptographic evidence that
activities existed at a specific time, anchored to Bitcoin.
"""

import hashlib
import json
import logging
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional

# requests is only needed by the OpenTimestamps HTTP helpers below; guard
# the import so the pure merkle functions remain usable without it.
try:
    import requests
except ImportError:  # pragma: no cover
    requests = None

logger = logging.getLogger(__name__)

# Backup file location (should be on persistent volume)
ANCHOR_BACKUP_DIR = Path(os.getenv("ANCHOR_BACKUP_DIR", "/data/anchors"))
ANCHOR_BACKUP_FILE = ANCHOR_BACKUP_DIR / "anchors.jsonl"

# OpenTimestamps calendar servers
OTS_SERVERS = [
    "https://a.pool.opentimestamps.org",
    "https://b.pool.opentimestamps.org",
    "https://a.pool.eternitywall.com",
]


def _ensure_backup_dir():
    """Ensure backup directory exists."""
    ANCHOR_BACKUP_DIR.mkdir(parents=True, exist_ok=True)


def build_merkle_tree(items: List[str]) -> Optional[dict]:
    """
    Build a merkle tree from a list of strings (activity IDs).

    Args:
        items: List of activity IDs to include

    Returns:
        Dict with root, tree structure, and metadata, or None if empty
    """
    if not items:
        return None

    # Sort for deterministic ordering
    items = sorted(items)

    # Hash each item to create leaves
    leaves = [hashlib.sha256(item.encode()).hexdigest() for item in items]

    # Build tree bottom-up
    tree_levels = [leaves]
    current_level = leaves

    while len(current_level) > 1:
        next_level = []
        for i in range(0, len(current_level), 2):
            left = current_level[i]
            # If odd number, duplicate last node
            right = current_level[i + 1] if i + 1 < len(current_level) else left
            # Hash pair together
            combined = hashlib.sha256((left + right).encode()).hexdigest()
            next_level.append(combined)
        tree_levels.append(next_level)
        current_level = next_level

    root = current_level[0]

    return {
        "root": root,
        "tree": tree_levels,
        "items": items,
        "item_count": len(items),
        "created_at": datetime.now(timezone.utc).isoformat()
    }


def get_merkle_proof(tree: dict, item: str) -> Optional[List[dict]]:
    """
    Get merkle proof for a specific item.

    Args:
        tree: Merkle tree dict from build_merkle_tree
        item: The item to prove membership for

    Returns:
        List of proof steps, or None if item not in tree.
        An empty list is a valid proof for a single-item tree.
    """
    items = tree["items"]
    if item not in items:
        return None

    # Find leaf index (build_merkle_tree stores items sorted, but re-sort
    # defensively in case the dict was round-tripped through JSON).
    sorted_items = sorted(items)
    current_index = sorted_items.index(item)

    proof = []
    tree_levels = tree["tree"]

    for level in tree_levels[:-1]:  # Skip root level
        sibling_index = current_index ^ 1  # XOR to get sibling
        if sibling_index < len(level):
            sibling_hash = level[sibling_index]
        else:
            # BUG FIX: the last node of an odd-length level was hashed with a
            # duplicate of ITSELF in build_merkle_tree. The old code skipped
            # this step entirely, so proofs for such items failed
            # verification. The proof must include the duplicated hash.
            sibling_hash = level[current_index]
        proof.append({
            "hash": sibling_hash,
            "position": "right" if current_index % 2 == 0 else "left"
        })
        current_index //= 2

    return proof


def verify_merkle_proof(item: str, proof: List[dict], root: str) -> bool:
    """
    Verify a merkle proof.

    Args:
        item: The item to verify
        proof: Proof steps from get_merkle_proof
        root: Expected merkle root

    Returns:
        True if proof is valid
    """
    current_hash = hashlib.sha256(item.encode()).hexdigest()

    for step in proof:
        sibling = step["hash"]
        if step["position"] == "right":
            combined = current_hash + sibling
        else:
            combined = sibling + current_hash
        current_hash = hashlib.sha256(combined.encode()).hexdigest()

    return current_hash == root
+ """ + hash_bytes = bytes.fromhex(hash_hex) + + for server in OTS_SERVERS: + try: + resp = requests.post( + f"{server}/digest", + data=hash_bytes, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + timeout=10 + ) + if resp.status_code == 200: + logger.info(f"Submitted to OpenTimestamps via {server}") + return resp.content + except Exception as e: + logger.warning(f"OTS server {server} failed: {e}") + continue + + logger.error("All OpenTimestamps servers failed") + return None + + +def upgrade_ots_proof(ots_proof: bytes) -> Optional[bytes]: + """ + Upgrade an incomplete OTS proof to a complete Bitcoin-anchored proof. + + Args: + ots_proof: Incomplete .ots proof bytes + + Returns: + Complete .ots proof bytes, or None if not yet confirmed + + Note: + This should be called periodically (e.g., hourly) until + the proof is complete. Bitcoin confirmation takes ~1-2 hours. + """ + for server in OTS_SERVERS: + try: + resp = requests.post( + f"{server}/upgrade", + data=ots_proof, + headers={"Content-Type": "application/octet-stream"}, + timeout=10 + ) + if resp.status_code == 200 and len(resp.content) > len(ots_proof): + logger.info(f"OTS proof upgraded via {server}") + return resp.content + except Exception as e: + logger.warning(f"OTS upgrade via {server} failed: {e}") + continue + + return None + + +def append_to_backup(anchor_record: dict): + """ + Append anchor record to persistent JSONL backup file. + + Args: + anchor_record: Dict with anchor metadata + """ + _ensure_backup_dir() + + with open(ANCHOR_BACKUP_FILE, "a") as f: + f.write(json.dumps(anchor_record, sort_keys=True) + "\n") + + logger.info(f"Anchor backed up to {ANCHOR_BACKUP_FILE}") + + +def load_backup_anchors() -> List[dict]: + """ + Load all anchors from backup file. 
+ + Returns: + List of anchor records + """ + if not ANCHOR_BACKUP_FILE.exists(): + return [] + + anchors = [] + with open(ANCHOR_BACKUP_FILE, "r") as f: + for line in f: + line = line.strip() + if line: + try: + anchors.append(json.loads(line)) + except json.JSONDecodeError: + logger.warning(f"Invalid JSON in backup: {line[:50]}...") + + return anchors + + +def get_latest_anchor_from_backup() -> Optional[dict]: + """Get the most recent anchor from backup.""" + anchors = load_backup_anchors() + return anchors[-1] if anchors else None + + +async def create_anchor( + activity_ids: List[str], + db_module, + ipfs_module +) -> Optional[dict]: + """ + Create a new anchor for a batch of activities. + + Args: + activity_ids: List of activity UUIDs to anchor + db_module: Database module with anchor functions + ipfs_module: IPFS client module + + Returns: + Anchor record dict, or None on failure + """ + if not activity_ids: + logger.info("No activities to anchor") + return None + + # Build merkle tree + tree = build_merkle_tree(activity_ids) + if not tree: + return None + + root = tree["root"] + logger.info(f"Built merkle tree: {len(activity_ids)} activities, root={root[:16]}...") + + # Store tree on IPFS + try: + tree_cid = ipfs_module.add_json(tree) + logger.info(f"Merkle tree stored on IPFS: {tree_cid}") + except Exception as e: + logger.error(f"Failed to store tree on IPFS: {e}") + tree_cid = None + + # Submit to OpenTimestamps + ots_proof = submit_to_opentimestamps(root) + + # Store OTS proof on IPFS too + ots_cid = None + if ots_proof and ipfs_module: + try: + ots_cid = ipfs_module.add_bytes(ots_proof) + logger.info(f"OTS proof stored on IPFS: {ots_cid}") + except Exception as e: + logger.warning(f"Failed to store OTS proof on IPFS: {e}") + + # Create anchor record + anchor_record = { + "merkle_root": root, + "tree_ipfs_cid": tree_cid, + "ots_proof_cid": ots_cid, + "activity_count": len(activity_ids), + "first_activity_id": activity_ids[0], + "last_activity_id": 
activity_ids[-1], + "created_at": datetime.now(timezone.utc).isoformat(), + "confirmed_at": None, + "bitcoin_txid": None + } + + # Save to database + if db_module: + try: + await db_module.create_anchor(anchor_record) + await db_module.mark_activities_anchored(activity_ids, root) + except Exception as e: + logger.error(f"Failed to save anchor to database: {e}") + + # Append to backup file (persistent) + append_to_backup(anchor_record) + + return anchor_record diff --git a/db.py b/db.py index f954b72..cce736a 100644 --- a/db.py +++ b/db.py @@ -72,7 +72,22 @@ CREATE TABLE IF NOT EXISTS activities ( actor_id TEXT NOT NULL, object_data JSONB NOT NULL, published TIMESTAMPTZ NOT NULL, - signature JSONB + signature JSONB, + anchor_root VARCHAR(64) -- Merkle root this activity is anchored to +); + +-- Anchors table (Bitcoin timestamps via OpenTimestamps) +CREATE TABLE IF NOT EXISTS anchors ( + id SERIAL PRIMARY KEY, + merkle_root VARCHAR(64) NOT NULL UNIQUE, + tree_ipfs_cid VARCHAR(128), + ots_proof_cid VARCHAR(128), + activity_count INTEGER NOT NULL, + first_activity_id UUID, + last_activity_id UUID, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + confirmed_at TIMESTAMPTZ, + bitcoin_txid VARCHAR(64) ); -- Followers table @@ -94,6 +109,8 @@ CREATE INDEX IF NOT EXISTS idx_assets_created_at ON assets(created_at DESC); CREATE INDEX IF NOT EXISTS idx_assets_tags ON assets USING GIN(tags); CREATE INDEX IF NOT EXISTS idx_activities_actor_id ON activities(actor_id); CREATE INDEX IF NOT EXISTS idx_activities_published ON activities(published DESC); +CREATE INDEX IF NOT EXISTS idx_activities_anchor ON activities(anchor_root); +CREATE INDEX IF NOT EXISTS idx_anchors_created ON anchors(created_at DESC); CREATE INDEX IF NOT EXISTS idx_followers_username ON followers(username); """ @@ -567,3 +584,120 @@ async def get_stats() -> dict: activities = await conn.fetchval("SELECT COUNT(*) FROM activities") users = await conn.fetchval("SELECT COUNT(*) FROM users") return {"assets": assets, 
"activities": activities, "users": users} + + +# ============ Anchors (Bitcoin timestamps) ============ + +async def get_unanchored_activities() -> list[dict]: + """Get all activities not yet anchored to Bitcoin.""" + async with get_connection() as conn: + rows = await conn.fetch( + """SELECT activity_id, activity_type, actor_id, object_data, published, signature + FROM activities WHERE anchor_root IS NULL ORDER BY published ASC""" + ) + return [_parse_activity_row(row) for row in rows] + + +async def create_anchor(anchor: dict) -> dict: + """Create an anchor record.""" + async with get_connection() as conn: + row = await conn.fetchrow( + """INSERT INTO anchors (merkle_root, tree_ipfs_cid, ots_proof_cid, + activity_count, first_activity_id, last_activity_id) + VALUES ($1, $2, $3, $4, $5, $6) + RETURNING *""", + anchor["merkle_root"], + anchor.get("tree_ipfs_cid"), + anchor.get("ots_proof_cid"), + anchor["activity_count"], + UUID(anchor["first_activity_id"]) if anchor.get("first_activity_id") else None, + UUID(anchor["last_activity_id"]) if anchor.get("last_activity_id") else None + ) + return dict(row) + + +async def mark_activities_anchored(activity_ids: list[str], merkle_root: str) -> int: + """Mark activities as anchored with the given merkle root.""" + async with get_connection() as conn: + result = await conn.execute( + """UPDATE activities SET anchor_root = $1 + WHERE activity_id = ANY($2::uuid[])""", + merkle_root, + [UUID(aid) for aid in activity_ids] + ) + # Returns "UPDATE N" + return int(result.split()[1]) if result else 0 + + +async def get_anchor(merkle_root: str) -> Optional[dict]: + """Get anchor by merkle root.""" + async with get_connection() as conn: + row = await conn.fetchrow( + "SELECT * FROM anchors WHERE merkle_root = $1", + merkle_root + ) + if row: + result = dict(row) + if result.get("first_activity_id"): + result["first_activity_id"] = str(result["first_activity_id"]) + if result.get("last_activity_id"): + result["last_activity_id"] = 
str(result["last_activity_id"]) + if result.get("created_at"): + result["created_at"] = result["created_at"].isoformat() + if result.get("confirmed_at"): + result["confirmed_at"] = result["confirmed_at"].isoformat() + return result + return None + + +async def get_all_anchors() -> list[dict]: + """Get all anchors, newest first.""" + async with get_connection() as conn: + rows = await conn.fetch( + "SELECT * FROM anchors ORDER BY created_at DESC" + ) + results = [] + for row in rows: + result = dict(row) + if result.get("first_activity_id"): + result["first_activity_id"] = str(result["first_activity_id"]) + if result.get("last_activity_id"): + result["last_activity_id"] = str(result["last_activity_id"]) + if result.get("created_at"): + result["created_at"] = result["created_at"].isoformat() + if result.get("confirmed_at"): + result["confirmed_at"] = result["confirmed_at"].isoformat() + results.append(result) + return results + + +async def update_anchor_confirmed(merkle_root: str, bitcoin_txid: str) -> bool: + """Mark anchor as confirmed with Bitcoin txid.""" + async with get_connection() as conn: + result = await conn.execute( + """UPDATE anchors SET confirmed_at = NOW(), bitcoin_txid = $1 + WHERE merkle_root = $2""", + bitcoin_txid, merkle_root + ) + return result == "UPDATE 1" + + +async def get_anchor_stats() -> dict: + """Get anchoring statistics.""" + async with get_connection() as conn: + total_anchors = await conn.fetchval("SELECT COUNT(*) FROM anchors") + confirmed_anchors = await conn.fetchval( + "SELECT COUNT(*) FROM anchors WHERE confirmed_at IS NOT NULL" + ) + anchored_activities = await conn.fetchval( + "SELECT COUNT(*) FROM activities WHERE anchor_root IS NOT NULL" + ) + unanchored_activities = await conn.fetchval( + "SELECT COUNT(*) FROM activities WHERE anchor_root IS NULL" + ) + return { + "total_anchors": total_anchors, + "confirmed_anchors": confirmed_anchors, + "anchored_activities": anchored_activities, + "unanchored_activities": 
unanchored_activities + } diff --git a/docker-compose.yml b/docker-compose.yml index 77ad4bb..b5ebae8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -40,9 +40,11 @@ services: - ARTDAG_DATA=/data/l2 - DATABASE_URL=postgresql://artdag:${POSTGRES_PASSWORD:-artdag}@postgres:5432/artdag - IPFS_API=/dns/ipfs/tcp/5001 + - ANCHOR_BACKUP_DIR=/data/anchors # ARTDAG_DOMAIN, ARTDAG_USER, JWT_SECRET from .env file volumes: - l2_data:/data/l2 # Still needed for RSA keys + - anchor_backup:/data/anchors # Persistent anchor proofs (survives DB wipes) networks: - internal - externalnet @@ -58,6 +60,7 @@ volumes: l2_data: postgres_data: ipfs_data: + anchor_backup: # Persistent - don't delete when resetting DB networks: internal: diff --git a/server.py b/server.py index 61a722f..d1c3415 100644 --- a/server.py +++ b/server.py @@ -297,6 +297,17 @@ def wants_html(request: Request) -> bool: return "text/html" in accept and "application/json" not in accept and "application/activity+json" not in accept +def format_date(value, length: int = 10) -> str: + """Format a date value (datetime or string) to a string, sliced to length.""" + if value is None: + return "" + if hasattr(value, 'isoformat'): + return value.isoformat()[:length] + if isinstance(value, str): + return value[:length] + return "" + + # ============ Auth UI Endpoints ============ @app.get("/login", response_class=HTMLResponse) @@ -483,7 +494,7 @@ async def ui_activity_detail(activity_index: int, request: Request): activity_id = activity.get("activity_id", "") actor_id = activity.get("actor_id", "") actor_name = actor_id.split("/")[-1] if actor_id else "unknown" - published = activity.get("published", "")[:10] + published = format_date(activity.get("published")) obj = activity.get("object_data", {}) # Object details @@ -576,7 +587,7 @@ async def ui_activity_detail(activity_index: int, request: Request): recipe = provenance.get("recipe", "") inputs = provenance.get("inputs", []) l1_run_id = 
provenance.get("l1_run_id", "") - rendered_at = provenance.get("rendered_at", "")[:10] if provenance.get("rendered_at") else "" + rendered_at = format_date(provenance.get("rendered_at")) effects_commit = provenance.get("effects_commit", "") effect_url = provenance.get("effect_url") infrastructure = provenance.get("infrastructure", {}) @@ -747,7 +758,7 @@ async def ui_asset_detail(name: str, request: Request): origin = asset.get("origin") or {} provenance = asset.get("provenance") or {} metadata = asset.get("metadata") or {} - created_at = asset.get("created_at", "")[:10] + created_at = format_date(asset.get("created_at")) type_color = "bg-blue-600" if asset_type == "image" else "bg-purple-600" if asset_type == "video" else "bg-gray-600" @@ -829,7 +840,7 @@ async def ui_asset_detail(name: str, request: Request): recipe = provenance.get("recipe", "") inputs = provenance.get("inputs", []) l1_run_id = provenance.get("l1_run_id", "") - rendered_at = provenance.get("rendered_at", "")[:10] if provenance.get("rendered_at") else "" + rendered_at = format_date(provenance.get("rendered_at")) effects_commit = provenance.get("effects_commit", "") infrastructure = provenance.get("infrastructure", {}) @@ -1273,13 +1284,14 @@ async def get_users_list(request: Request, page: int = 1, limit: int = 20): rows = "" for uname, user_data in users_page: webfinger = f"@{uname}@{DOMAIN}" + created_at = format_date(user_data.get("created_at")) rows += f''' {uname} {webfinger} - {user_data.get("created_at", "")[:10]} + {created_at} ''' @@ -2104,7 +2116,7 @@ async def get_activities(request: Request, page: int = 1, limit: int = 20): {actor_name} - {activity.get("published", "")[:10]} + {format_date(activity.get("published"))} View @@ -2231,6 +2243,137 @@ async def get_object(content_hash: str, request: Request): raise HTTPException(404, f"Object not found: {content_hash}") +# ============ Anchoring (Bitcoin timestamps) ============ + +@app.post("/anchors/create") +async def 
create_anchor_endpoint(user: User = Depends(get_required_user)): + """ + Create a new anchor for all unanchored activities. + + Builds a merkle tree, stores it on IPFS, and submits to OpenTimestamps + for Bitcoin anchoring. The anchor proof is backed up to persistent storage. + """ + import anchoring + import ipfs_client + + # Get unanchored activities + unanchored = await db.get_unanchored_activities() + if not unanchored: + return {"message": "No unanchored activities", "anchored": 0} + + activity_ids = [a["activity_id"] for a in unanchored] + + # Create anchor + anchor = await anchoring.create_anchor(activity_ids, db, ipfs_client) + + if anchor: + return { + "message": f"Anchored {len(activity_ids)} activities", + "merkle_root": anchor["merkle_root"], + "tree_ipfs_cid": anchor.get("tree_ipfs_cid"), + "activity_count": anchor["activity_count"] + } + else: + raise HTTPException(500, "Failed to create anchor") + + +@app.get("/anchors") +async def list_anchors(): + """List all anchors.""" + anchors = await db.get_all_anchors() + stats = await db.get_anchor_stats() + return { + "anchors": anchors, + "stats": stats + } + + +@app.get("/anchors/{merkle_root}") +async def get_anchor_endpoint(merkle_root: str): + """Get anchor details by merkle root.""" + anchor = await db.get_anchor(merkle_root) + if not anchor: + raise HTTPException(404, f"Anchor not found: {merkle_root}") + return anchor + + +@app.get("/anchors/{merkle_root}/tree") +async def get_anchor_tree(merkle_root: str): + """Get the full merkle tree from IPFS.""" + anchor = await db.get_anchor(merkle_root) + if not anchor: + raise HTTPException(404, f"Anchor not found: {merkle_root}") + + tree_cid = anchor.get("tree_ipfs_cid") + if not tree_cid: + raise HTTPException(404, "Anchor has no tree on IPFS") + + import ipfs_client + try: + tree_bytes = ipfs_client.get_bytes(tree_cid) + if tree_bytes: + return json.loads(tree_bytes) + except Exception as e: + raise HTTPException(500, f"Failed to fetch tree from IPFS: 
{e}") + + +@app.get("/anchors/verify/{activity_id}") +async def verify_activity_anchor(activity_id: str): + """ + Verify an activity's anchor proof. + + Returns the merkle proof showing this activity is included in an anchored batch. + """ + import anchoring + import ipfs_client + + # Get activity + activity = await db.get_activity(activity_id) + if not activity: + raise HTTPException(404, f"Activity not found: {activity_id}") + + anchor_root = activity.get("anchor_root") + if not anchor_root: + return {"verified": False, "reason": "Activity not yet anchored"} + + # Get anchor + anchor = await db.get_anchor(anchor_root) + if not anchor: + return {"verified": False, "reason": "Anchor record not found"} + + # Get tree from IPFS + tree_cid = anchor.get("tree_ipfs_cid") + if not tree_cid: + return {"verified": False, "reason": "Merkle tree not on IPFS"} + + try: + tree_bytes = ipfs_client.get_bytes(tree_cid) + tree = json.loads(tree_bytes) if tree_bytes else None + except Exception: + return {"verified": False, "reason": "Failed to fetch tree from IPFS"} + + if not tree: + return {"verified": False, "reason": "Could not load merkle tree"} + + # Get proof + proof = anchoring.get_merkle_proof(tree, activity_id) + if not proof: + return {"verified": False, "reason": "Activity not in merkle tree"} + + # Verify proof + valid = anchoring.verify_merkle_proof(activity_id, proof, anchor_root) + + return { + "verified": valid, + "activity_id": activity_id, + "merkle_root": anchor_root, + "tree_ipfs_cid": tree_cid, + "proof": proof, + "bitcoin_txid": anchor.get("bitcoin_txid"), + "confirmed_at": anchor.get("confirmed_at") + } + + if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8200)