From 647c564c47ad4a3347490f9dbbeac007ce9fa1bb Mon Sep 17 00:00:00 2001 From: gilesb Date: Fri, 9 Jan 2026 00:59:12 +0000 Subject: [PATCH] Implement atomic publishing with IPFS and DB transactions All publishing operations now use three-phase atomic approach: 1. Phase 1: Preparation - validate inputs, gather IPFS CIDs 2. Phase 2: IPFS operations - pin all content before any DB changes 3. Phase 3: DB transaction - all-or-nothing database commits Changes: ipfs_client.py: - Add IPFSError exception class - Add add_bytes() to store content on IPFS - Add add_json() to store JSON documents on IPFS - Add pin_or_raise() for synchronous pinning with error handling db.py: - Add transaction() context manager for atomic DB operations - Add create_asset_tx() for transactional asset creation - Add create_activity_tx() for transactional activity creation - Add get_asset_by_hash_tx() for lookup within transactions - Add asset_exists_by_name_tx() for existence check within transactions server.py: - Rewrite record_run: - Check L2 first for inputs, fall back to L1 - Store recipe JSON on IPFS with CID in provenance - Auto-register input assets if not already on L2 - All operations atomic - Rewrite publish_cache: - IPFS CID now required - Synchronous pinning before DB commit - Transaction for asset + activity - Rewrite _register_asset_impl: - IPFS CID now required - Synchronous pinning before DB commit - Transaction for asset + activity Co-Authored-By: Claude Opus 4.5 --- db.py | 81 +++++++++ ipfs_client.py | 79 ++++++++- server.py | 465 ++++++++++++++++++++++++++++++++++--------------- 3 files changed, 488 insertions(+), 137 deletions(-) diff --git a/db.py b/db.py index 9131dc5..f954b72 100644 --- a/db.py +++ b/db.py @@ -134,6 +134,23 @@ async def get_connection(): yield conn +@asynccontextmanager +async def transaction(): + """ + Get a connection with an active transaction. + + Usage: + async with db.transaction() as conn: + await create_asset_tx(conn, asset1) + await create_asset_tx(conn, asset2) + await create_activity_tx(conn, activity) + # Commits on exit, rolls back on exception + """ + async with get_pool().acquire() as conn: + async with conn.transaction(): + yield conn + + # ============ Users ============ async def get_user(username: str) -> Optional[dict]: @@ -329,6 +346,52 @@ def _parse_asset_row(row) -> dict: return asset +# ============ Assets (Transaction variants) ============ + +async def get_asset_by_hash_tx(conn, content_hash: str) -> Optional[dict]: + """Get asset by content hash within a transaction.""" + row = await conn.fetchrow( + """SELECT name, content_hash, ipfs_cid, asset_type, tags, metadata, url, + provenance, description, origin, owner, created_at, updated_at + FROM assets WHERE content_hash = $1""", + content_hash + ) + if row: + return _parse_asset_row(row) + return None + + +async def asset_exists_by_name_tx(conn, name: str) -> bool: + """Check if asset name exists within a transaction.""" + return await conn.fetchval( + "SELECT EXISTS(SELECT 1 FROM assets WHERE name = $1)", + name + ) + + +async def create_asset_tx(conn, asset: dict) -> dict: + """Create a new asset within a transaction.""" + row = await conn.fetchrow( + """INSERT INTO assets (name, content_hash, ipfs_cid, asset_type, tags, metadata, + url, provenance, description, origin, owner, created_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) + RETURNING *""", + asset["name"], + asset["content_hash"], + asset.get("ipfs_cid"), + asset["asset_type"], + json.dumps(asset.get("tags", [])), + json.dumps(asset.get("metadata", {})), + asset.get("url"), + json.dumps(asset.get("provenance")) if asset.get("provenance") else None, + asset.get("description"), + json.dumps(asset.get("origin")) if asset.get("origin") else None, + asset["owner"], + _parse_timestamp(asset.get("created_at")) + ) + return _parse_asset_row(row) + + # ============ Activities ============ async def get_activity(activity_id: str) -> Optional[dict]: @@ -432,6 +495,24 @@ def _parse_activity_row(row) -> dict: return activity +# ============ Activities (Transaction variants) ============ + +async def create_activity_tx(conn, activity: dict) -> dict: + """Create a new activity within a transaction.""" + row = await conn.fetchrow( + """INSERT INTO activities (activity_id, activity_type, actor_id, object_data, published, signature) + VALUES ($1, $2, $3, $4, $5, $6) + RETURNING *""", + UUID(activity["activity_id"]), + activity["activity_type"], + activity["actor_id"], + json.dumps(activity["object_data"]), + _parse_timestamp(activity["published"]), + json.dumps(activity.get("signature")) if activity.get("signature") else None + ) + return _parse_activity_row(row) + + # ============ Followers ============ async def get_followers(username: str) -> list[dict]: diff --git a/ipfs_client.py b/ipfs_client.py index 6d3117f..108327b 100644 --- a/ipfs_client.py +++ b/ipfs_client.py @@ -2,10 +2,11 @@ """ IPFS client for Art DAG L2 server. -Provides functions to fetch and pin content from IPFS. +Provides functions to fetch, pin, and add content to IPFS. Uses direct HTTP API calls for compatibility with all Kubo versions. """ +import json import logging import os import re @@ -13,6 +14,11 @@ from typing import Optional import requests + +class IPFSError(Exception): + """Raised when an IPFS operation fails.""" + pass + logger = logging.getLogger(__name__) # IPFS API multiaddr - default to local, docker uses /dns/ipfs/tcp/5001 @@ -147,3 +153,74 @@ def get_node_id() -> Optional[str]: except Exception as e: logger.error(f"Failed to get node ID: {e}") return None + + +def add_bytes(data: bytes, pin: bool = True) -> str: + """ + Add bytes data to IPFS and optionally pin it. + + Args: + data: Bytes to add + pin: Whether to pin the data (default: True) + + Returns: + IPFS CID + + Raises: + IPFSError: If adding fails + """ + try: + url = f"{IPFS_BASE_URL}/api/v0/add" + params = {"pin": str(pin).lower()} + files = {"file": ("data", data)} + + response = requests.post(url, params=params, files=files, timeout=IPFS_TIMEOUT) + response.raise_for_status() + result = response.json() + cid = result["Hash"] + + logger.info(f"Added to IPFS: {len(data)} bytes -> {cid}") + return cid + except Exception as e: + logger.error(f"Failed to add bytes to IPFS: {e}") + raise IPFSError(f"Failed to add bytes to IPFS: {e}") from e + + +def add_json(data: dict) -> str: + """ + Serialize dict to JSON and add to IPFS. + + Args: + data: Dictionary to serialize and store + + Returns: + IPFS CID + + Raises: + IPFSError: If adding fails + """ + json_bytes = json.dumps(data, indent=2, sort_keys=True).encode('utf-8') + return add_bytes(json_bytes, pin=True) + + +def pin_or_raise(cid: str) -> None: + """ + Pin a CID on IPFS. Raises exception on failure. + + Args: + cid: IPFS CID to pin + + Raises: + IPFSError: If pinning fails + """ + try: + url = f"{IPFS_BASE_URL}/api/v0/pin/add" + params = {"arg": cid} + + response = requests.post(url, params=params, timeout=IPFS_TIMEOUT) + response.raise_for_status() + + logger.info(f"Pinned on IPFS: {cid}") + except Exception as e: + logger.error(f"Failed to pin on IPFS: {e}") + raise IPFSError(f"Failed to pin {cid}: {e}") from e diff --git a/server.py b/server.py index 9300076..61a722f 100644 --- a/server.py +++ b/server.py @@ -1647,65 +1647,89 @@ def _pin_ipfs_async(cid: str): async def _register_asset_impl(req: RegisterRequest, owner: str): - """Internal implementation for registering an asset.""" + """ + Internal implementation for registering an asset atomically. + + Requires IPFS CID - content must be on IPFS before registering. + Uses a transaction for all DB operations. + """ + import ipfs_client + from ipfs_client import IPFSError + + # ===== PHASE 1: VALIDATION ===== + # IPFS CID is required + if not req.ipfs_cid: + raise HTTPException(400, "IPFS CID is required for registration") + # Check if name exists if await db.asset_exists(req.name): raise HTTPException(400, f"Asset already exists: {req.name}") - # Pin content on IPFS if CID provided (fire-and-forget, don't block) - if req.ipfs_cid: - import threading - threading.Thread(target=_pin_ipfs_async, args=(req.ipfs_cid,), daemon=True).start() + # ===== PHASE 2: IPFS OPERATIONS ===== + try: + ipfs_client.pin_or_raise(req.ipfs_cid) + except IPFSError as e: + raise HTTPException(500, f"IPFS operation failed: {e}") - # Create asset + # ===== PHASE 3: DB TRANSACTION ===== now = datetime.now(timezone.utc).isoformat() - asset = { - "name": req.name, - "content_hash": req.content_hash, - "ipfs_cid": req.ipfs_cid, - "asset_type": req.asset_type, - "tags": req.tags, - "metadata": req.metadata, - "url": req.url, - "provenance": req.provenance, - "owner": owner, - "created_at": now - } - # Save asset to database - created_asset = await db.create_asset(asset) + try: + async with db.transaction() as conn: + # Check name again inside transaction (race condition protection) + if await db.asset_exists_by_name_tx(conn, req.name): + raise HTTPException(400, f"Asset already exists: {req.name}") - # Create ownership activity - object_data = { - "type": req.asset_type.capitalize(), - "name": req.name, - "id": f"https://{DOMAIN}/objects/{req.content_hash}", - "contentHash": { - "algorithm": "sha3-256", - "value": req.content_hash - }, - "attributedTo": f"https://{DOMAIN}/users/{owner}" - } + # Create asset + asset = { + "name": req.name, + "content_hash": req.content_hash, + "ipfs_cid": req.ipfs_cid, + "asset_type": req.asset_type, + "tags": req.tags, + "metadata": req.metadata, + "url": req.url, + "provenance": req.provenance, + "owner": owner, + "created_at": now + } + created_asset = await db.create_asset_tx(conn, asset) - # Include provenance in activity object_data if present - if req.provenance: - object_data["provenance"] = req.provenance + # Create ownership activity + object_data = { + "type": req.asset_type.capitalize(), + "name": req.name, + "id": f"https://{DOMAIN}/objects/{req.content_hash}", + "contentHash": { + "algorithm": "sha3-256", + "value": req.content_hash + }, + "attributedTo": f"https://{DOMAIN}/users/{owner}" + } - activity = { - "activity_id": str(uuid.uuid4()), - "activity_type": "Create", - "actor_id": f"https://{DOMAIN}/users/{owner}", - "object_data": object_data, - "published": now - } + # Include provenance in activity object_data if present + if req.provenance: + object_data["provenance"] = req.provenance - # Sign activity with the owner's keys - activity = sign_activity(activity, owner) + activity = { + "activity_id": str(uuid.uuid4()), + "activity_type": "Create", + "actor_id": f"https://{DOMAIN}/users/{owner}", + "object_data": object_data, + "published": now + } + activity = sign_activity(activity, owner) + created_activity = await db.create_activity_tx(conn, activity) - # Save activity to database - await db.create_activity(activity) + # Transaction commits here on successful exit - return {"asset": created_asset, "activity": activity} + except HTTPException: + raise + except Exception as e: + logger.error(f"Database transaction failed: {e}") + raise HTTPException(500, f"Failed to register asset: {e}") + + return {"asset": created_asset, "activity": created_activity} @app.post("/assets") @@ -1717,11 +1741,23 @@ async def register_asset(req: RegisterRequest, user: User = Depends(get_required @app.post("/assets/record-run") @app.post("/registry/record-run") # Legacy route async def record_run(req: RecordRunRequest, user: User = Depends(get_required_user)): - """Record an L1 run and register the output. Requires authentication.""" - # Fetch run from the specified L1 server + """ + Record an L1 run and register the output atomically. + + Ensures all operations succeed or none do: + 1. All input assets registered (if not already on L2) + pinned on IPFS + 2. Output asset registered + pinned on IPFS + 3. Recipe serialized to JSON, stored on IPFS, CID saved in provenance + """ + import ipfs_client + from ipfs_client import IPFSError + + # ===== PHASE 1: PREPARATION (read-only) ===== l1_url = req.l1_server.rstrip('/') + + # Fetch run from L1 try: - resp = requests.get(f"{l1_url}/runs/{req.run_id}") + resp = requests.get(f"{l1_url}/runs/{req.run_id}", timeout=30) resp.raise_for_status() run = resp.json() except Exception as e: @@ -1734,7 +1770,7 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us if not output_hash: raise HTTPException(400, "Run has no output hash") - # Fetch media type from L1 cache + # Fetch output cache info from L1 (must exist - it's new) try: cache_resp = requests.get( f"{l1_url}/cache/{output_hash}", @@ -1743,45 +1779,187 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us ) cache_resp.raise_for_status() cache_info = cache_resp.json() - media_type = cache_info.get("media_type", "image") - ipfs_cid = cache_info.get("ipfs_cid") + output_media_type = cache_info.get("media_type", "image") + output_ipfs_cid = cache_info.get("ipfs_cid") except Exception as e: - logger.warning(f"Failed to fetch cache info from L1: {e}") - media_type = "image" # Default fallback - ipfs_cid = None + raise HTTPException(400, f"Failed to fetch output cache info: {e}") - # Build provenance from run - provenance = { - "inputs": [{"content_hash": h} for h in run.get("inputs", [])], - "recipe": run.get("recipe"), - "effect_url": run.get("effect_url"), - "effects_commit": run.get("effects_commit"), - "l1_server": l1_url, - "l1_run_id": req.run_id, - "rendered_at": run.get("completed_at"), - "infrastructure": run.get("infrastructure") - } + if not output_ipfs_cid: + raise HTTPException(400, "Output has no IPFS CID - cannot publish") - # Register the output under the authenticated user - return await _register_asset_impl(RegisterRequest( - name=req.output_name, - content_hash=output_hash, - ipfs_cid=ipfs_cid, - asset_type=media_type, # Detected from L1 cache - tags=["rendered", "l1"], - metadata={"l1_server": l1_url, "l1_run_id": req.run_id}, - provenance=provenance - ), user.username) + # Gather input info: check L2 first, then fall back to L1 + input_hashes = run.get("inputs", []) + input_infos = [] # List of {content_hash, ipfs_cid, media_type, existing_asset} + + for input_hash in input_hashes: + # Check if already on L2 + existing = await db.get_asset_by_hash(input_hash) + if existing and existing.get("ipfs_cid"): + input_infos.append({ + "content_hash": input_hash, + "ipfs_cid": existing["ipfs_cid"], + "media_type": existing.get("asset_type", "image"), + "existing_asset": existing + }) + else: + # Not on L2, try L1 + try: + inp_resp = requests.get( + f"{l1_url}/cache/{input_hash}", + headers={"Accept": "application/json"}, + timeout=10 + ) + inp_resp.raise_for_status() + inp_info = inp_resp.json() + ipfs_cid = inp_info.get("ipfs_cid") + if not ipfs_cid: + raise HTTPException(400, f"Input {input_hash[:16]}... has no IPFS CID (not on L2 or L1)") + input_infos.append({ + "content_hash": input_hash, + "ipfs_cid": ipfs_cid, + "media_type": inp_info.get("media_type", "image"), + "existing_asset": None + }) + except HTTPException: + raise + except Exception as e: + raise HTTPException(400, f"Input {input_hash[:16]}... not on L2 and failed to fetch from L1: {e}") + + # Prepare recipe data + recipe_data = run.get("recipe") + if not recipe_data: + recipe_data = { + "name": run.get("recipe_name", "unknown"), + "effect_url": run.get("effect_url"), + "effects_commit": run.get("effects_commit"), + } + + # ===== PHASE 2: IPFS OPERATIONS (blocking, before any DB changes) ===== + try: + # Pin all inputs + for inp in input_infos: + ipfs_client.pin_or_raise(inp["ipfs_cid"]) + + # Pin output + ipfs_client.pin_or_raise(output_ipfs_cid) + + # Store recipe on IPFS + recipe_cid = ipfs_client.add_json(recipe_data) + + except IPFSError as e: + raise HTTPException(500, f"IPFS operation failed: {e}") + + # ===== PHASE 3: DB TRANSACTION (all-or-nothing) ===== + now = datetime.now(timezone.utc).isoformat() + + try: + async with db.transaction() as conn: + # Register input assets (if not already on L2) + registered_inputs = [] + for inp in input_infos: + if inp["existing_asset"]: + # Already on L2 + registered_inputs.append({ + "content_hash": inp["content_hash"], + "name": inp["existing_asset"]["name"] + }) + else: + # Create new input asset + input_name = f"input-{inp['content_hash'][:16]}" + input_asset = { + "name": input_name, + "content_hash": inp["content_hash"], + "ipfs_cid": inp["ipfs_cid"], + "asset_type": inp["media_type"], + "tags": ["auto-registered", "input"], + "metadata": {"auto_registered_from_run": req.run_id}, + "owner": user.username, + "created_at": now + } + await db.create_asset_tx(conn, input_asset) + registered_inputs.append({ + "content_hash": inp["content_hash"], + "name": input_name + }) + + # Check output name doesn't exist + if await db.asset_exists_by_name_tx(conn, req.output_name): + raise HTTPException(400, f"Asset already exists: {req.output_name}") + + # Build provenance with recipe CID + provenance = { + "inputs": registered_inputs, + "recipe": recipe_data, + "recipe_cid": recipe_cid, + "effect_url": run.get("effect_url"), + "effects_commit": run.get("effects_commit"), + "l1_server": l1_url, + "l1_run_id": req.run_id, + "rendered_at": run.get("completed_at"), + "infrastructure": run.get("infrastructure") + } + + # Create output asset + output_asset = { + "name": req.output_name, + "content_hash": output_hash, + "ipfs_cid": output_ipfs_cid, + "asset_type": output_media_type, + "tags": ["rendered", "l1"], + "metadata": {"l1_server": l1_url, "l1_run_id": req.run_id}, + "provenance": provenance, + "owner": user.username, + "created_at": now + } + created_asset = await db.create_asset_tx(conn, output_asset) + + # Create activity + object_data = { + "type": output_media_type.capitalize(), + "name": req.output_name, + "id": f"https://{DOMAIN}/objects/{output_hash}", + "contentHash": { + "algorithm": "sha3-256", + "value": output_hash + }, + "attributedTo": f"https://{DOMAIN}/users/{user.username}", + "provenance": provenance + } + + activity = { + "activity_id": str(uuid.uuid4()), + "activity_type": "Create", + "actor_id": f"https://{DOMAIN}/users/{user.username}", + "object_data": object_data, + "published": now + } + activity = sign_activity(activity, user.username) + created_activity = await db.create_activity_tx(conn, activity) + + # Transaction commits here on successful exit + + except HTTPException: + raise + except Exception as e: + logger.error(f"Database transaction failed: {e}") + raise HTTPException(500, f"Failed to record run: {e}") + + return {"asset": created_asset, "activity": created_activity} @app.post("/assets/publish-cache") async def publish_cache(req: PublishCacheRequest, user: User = Depends(get_required_user)): """ - Publish a cache item from L1 with metadata. + Publish a cache item from L1 with metadata atomically. Requires origin to be set (self or external URL). - Creates a new asset and Create activity. + Requires IPFS CID - content must be on IPFS before publishing. + Creates a new asset and Create activity in a single transaction. """ + import ipfs_client + from ipfs_client import IPFSError + + # ===== PHASE 1: VALIDATION ===== # Validate origin if not req.origin or "type" not in req.origin: raise HTTPException(400, "Origin is required for publishing (type: 'self' or 'external')") @@ -1793,78 +1971,93 @@ async def publish_cache(req: PublishCacheRequest, user: User = Depends(get_requi if origin_type == "external" and not req.origin.get("url"): raise HTTPException(400, "External origin requires a URL") + # IPFS CID is now required + if not req.ipfs_cid: + raise HTTPException(400, "IPFS CID is required for publishing") + # Check if asset name already exists if await db.asset_exists(req.asset_name): raise HTTPException(400, f"Asset name already exists: {req.asset_name}") - # Pin content on IPFS if CID provided (fire-and-forget, don't block) - if req.ipfs_cid: - import threading - threading.Thread(target=_pin_ipfs_async, args=(req.ipfs_cid,), daemon=True).start() + # ===== PHASE 2: IPFS OPERATIONS ===== + try: + ipfs_client.pin_or_raise(req.ipfs_cid) + except IPFSError as e: + raise HTTPException(500, f"IPFS operation failed: {e}") - # Create asset + # ===== PHASE 3: DB TRANSACTION ===== now = datetime.now(timezone.utc).isoformat() - asset = { - "name": req.asset_name, - "content_hash": req.content_hash, - "ipfs_cid": req.ipfs_cid, - "asset_type": req.asset_type, - "tags": req.tags, - "description": req.description, - "origin": req.origin, - "metadata": req.metadata, - "owner": user.username, - "created_at": now - } - # Save asset to database - created_asset = await db.create_asset(asset) + try: + async with db.transaction() as conn: + # Check name again inside transaction (race condition protection) + if await db.asset_exists_by_name_tx(conn, req.asset_name): + raise HTTPException(400, f"Asset name already exists: {req.asset_name}") - # Create ownership activity with origin info - object_data = { - "type": req.asset_type.capitalize(), - "name": req.asset_name, - "id": f"https://{DOMAIN}/objects/{req.content_hash}", - "contentHash": { - "algorithm": "sha3-256", - "value": req.content_hash - }, - "attributedTo": f"https://{DOMAIN}/users/{user.username}", - "tag": req.tags - } + # Create asset + asset = { + "name": req.asset_name, + "content_hash": req.content_hash, + "ipfs_cid": req.ipfs_cid, + "asset_type": req.asset_type, + "tags": req.tags, + "description": req.description, + "origin": req.origin, + "metadata": req.metadata, + "owner": user.username, + "created_at": now + } + created_asset = await db.create_asset_tx(conn, asset) - if req.description: - object_data["summary"] = req.description + # Create ownership activity with origin info + object_data = { + "type": req.asset_type.capitalize(), + "name": req.asset_name, + "id": f"https://{DOMAIN}/objects/{req.content_hash}", + "contentHash": { + "algorithm": "sha3-256", + "value": req.content_hash + }, + "attributedTo": f"https://{DOMAIN}/users/{user.username}", + "tag": req.tags + } - # Include origin in ActivityPub object - if origin_type == "self": - object_data["generator"] = { - "type": "Application", - "name": "Art DAG", - "note": "Original content created by the author" - } - else: - object_data["source"] = { - "type": "Link", - "href": req.origin.get("url"), - "name": req.origin.get("note", "External source") - } + if req.description: + object_data["summary"] = req.description - activity = { - "activity_id": str(uuid.uuid4()), - "activity_type": "Create", - "actor_id": f"https://{DOMAIN}/users/{user.username}", - "object_data": object_data, - "published": now - } + # Include origin in ActivityPub object + if origin_type == "self": + object_data["generator"] = { + "type": "Application", + "name": "Art DAG", + "note": "Original content created by the author" + } + else: + object_data["source"] = { + "type": "Link", + "href": req.origin.get("url"), + "name": req.origin.get("note", "External source") + } - # Sign activity with the user's keys - activity = sign_activity(activity, user.username) + activity = { + "activity_id": str(uuid.uuid4()), + "activity_type": "Create", + "actor_id": f"https://{DOMAIN}/users/{user.username}", + "object_data": object_data, + "published": now + } + activity = sign_activity(activity, user.username) + created_activity = await db.create_activity_tx(conn, activity) - # Save activity to database - await db.create_activity(activity) + # Transaction commits here on successful exit - return {"asset": created_asset, "activity": activity} + except HTTPException: + raise + except Exception as e: + logger.error(f"Database transaction failed: {e}") + raise HTTPException(500, f"Failed to publish cache item: {e}") + + return {"asset": created_asset, "activity": created_activity} # ============ Activities Endpoints ============