Implement atomic publishing with IPFS and DB transactions

All publishing operations now use three-phase atomic approach:
1. Phase 1: Preparation - validate inputs, gather IPFS CIDs
2. Phase 2: IPFS operations - pin all content before any DB changes
3. Phase 3: DB transaction - all-or-nothing database commits

Changes:

ipfs_client.py:
- Add IPFSError exception class
- Add add_bytes() to store content on IPFS
- Add add_json() to store JSON documents on IPFS
- Add pin_or_raise() for synchronous pinning with error handling

db.py:
- Add transaction() context manager for atomic DB operations
- Add create_asset_tx() for transactional asset creation
- Add create_activity_tx() for transactional activity creation
- Add get_asset_by_hash_tx() for lookup within transactions
- Add asset_exists_by_name_tx() for existence check within transactions

server.py:
- Rewrite record_run:
  - Check L2 first for inputs, fall back to L1
  - Store recipe JSON on IPFS with CID in provenance
  - Auto-register input assets if not already on L2
  - All operations atomic
- Rewrite publish_cache:
  - IPFS CID now required
  - Synchronous pinning before DB commit
  - Transaction for asset + activity
- Rewrite _register_asset_impl:
  - IPFS CID now required
  - Synchronous pinning before DB commit
  - Transaction for asset + activity

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gilesb
2026-01-09 00:59:12 +00:00
parent a0ed1ae5ae
commit 647c564c47
3 changed files with 488 additions and 137 deletions

81
db.py
View File

@@ -134,6 +134,23 @@ async def get_connection():
yield conn
@asynccontextmanager
async def transaction():
    """
    Yield a pooled connection with an open transaction.

    Example:
        async with db.transaction() as conn:
            await create_asset_tx(conn, asset1)
            await create_asset_tx(conn, asset2)
            await create_activity_tx(conn, activity)
        # commits when the block exits cleanly, rolls back on any exception
    """
    # A single `async with` stacks both managers: acquire from the pool,
    # then open the transaction on that connection.
    async with get_pool().acquire() as conn, conn.transaction():
        yield conn
# ============ Users ============
async def get_user(username: str) -> Optional[dict]:
@@ -329,6 +346,52 @@ def _parse_asset_row(row) -> dict:
return asset
# ============ Assets (Transaction variants) ============
async def get_asset_by_hash_tx(conn, content_hash: str) -> Optional[dict]:
    """
    Look up one asset by its content hash on the given transaction connection.

    Returns the parsed asset dict, or None when no row matches.
    """
    query = """SELECT name, content_hash, ipfs_cid, asset_type, tags, metadata, url,
                      provenance, description, origin, owner, created_at, updated_at
               FROM assets WHERE content_hash = $1"""
    row = await conn.fetchrow(query, content_hash)
    return _parse_asset_row(row) if row else None
async def asset_exists_by_name_tx(conn, name: str) -> bool:
    """Return True if an asset with this name exists (transaction-scoped check)."""
    query = "SELECT EXISTS(SELECT 1 FROM assets WHERE name = $1)"
    return await conn.fetchval(query, name)
async def create_asset_tx(conn, asset: dict) -> dict:
    """
    Insert a new asset row using the given transaction connection.

    Args:
        conn: asyncpg connection with an active transaction (see transaction()).
        asset: Asset fields. "name", "content_hash", "asset_type" and "owner"
            are required; the remaining keys are optional.

    Returns:
        The created asset as a parsed dict.
    """
    def _optional_json(key: str):
        # Serialize optional JSON columns. Checking `is not None` (rather than
        # truthiness) preserves an explicitly-provided empty {} instead of
        # collapsing it to NULL, consistent with how "metadata" serializes {}.
        value = asset.get(key)
        return None if value is None else json.dumps(value)

    row = await conn.fetchrow(
        """INSERT INTO assets (name, content_hash, ipfs_cid, asset_type, tags, metadata,
                               url, provenance, description, origin, owner, created_at)
           VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
           RETURNING *""",
        asset["name"],
        asset["content_hash"],
        asset.get("ipfs_cid"),
        asset["asset_type"],
        json.dumps(asset.get("tags", [])),
        json.dumps(asset.get("metadata", {})),
        asset.get("url"),
        _optional_json("provenance"),
        asset.get("description"),
        _optional_json("origin"),
        asset["owner"],
        _parse_timestamp(asset.get("created_at"))
    )
    return _parse_asset_row(row)
# ============ Activities ============
async def get_activity(activity_id: str) -> Optional[dict]:
@@ -432,6 +495,24 @@ def _parse_activity_row(row) -> dict:
return activity
# ============ Activities (Transaction variants) ============
async def create_activity_tx(conn, activity: dict) -> dict:
    """
    Insert a new activity row using the given transaction connection.

    Returns the created activity as a parsed dict.
    """
    signature = activity.get("signature")
    row = await conn.fetchrow(
        """INSERT INTO activities (activity_id, activity_type, actor_id, object_data, published, signature)
           VALUES ($1, $2, $3, $4, $5, $6)
           RETURNING *""",
        UUID(activity["activity_id"]),
        activity["activity_type"],
        activity["actor_id"],
        json.dumps(activity["object_data"]),
        _parse_timestamp(activity["published"]),
        json.dumps(signature) if signature else None,
    )
    return _parse_activity_row(row)
# ============ Followers ============
async def get_followers(username: str) -> list[dict]:

View File

@@ -2,10 +2,11 @@
"""
IPFS client for Art DAG L2 server.
Provides functions to fetch and pin content from IPFS.
Provides functions to fetch, pin, and add content to IPFS.
Uses direct HTTP API calls for compatibility with all Kubo versions.
"""
import json
import logging
import os
import re
@@ -13,6 +14,11 @@ from typing import Optional
import requests
class IPFSError(Exception):
    """Raised when an IPFS operation fails."""
logger = logging.getLogger(__name__)
# IPFS API multiaddr - default to local, docker uses /dns/ipfs/tcp/5001
@@ -147,3 +153,74 @@ def get_node_id() -> Optional[str]:
except Exception as e:
logger.error(f"Failed to get node ID: {e}")
return None
def add_bytes(data: bytes, pin: bool = True) -> str:
    """
    Add raw bytes to IPFS via the /api/v0/add HTTP endpoint.

    Args:
        data: Bytes to add
        pin: Whether to pin the data (default: True)

    Returns:
        IPFS CID

    Raises:
        IPFSError: If adding fails
    """
    try:
        resp = requests.post(
            f"{IPFS_BASE_URL}/api/v0/add",
            params={"pin": str(pin).lower()},
            files={"file": ("data", data)},
            timeout=IPFS_TIMEOUT,
        )
        resp.raise_for_status()
        cid = resp.json()["Hash"]
        logger.info(f"Added to IPFS: {len(data)} bytes -> {cid}")
        return cid
    except Exception as e:
        # Any failure (network, HTTP status, malformed JSON) is surfaced
        # uniformly as IPFSError with the original cause chained.
        logger.error(f"Failed to add bytes to IPFS: {e}")
        raise IPFSError(f"Failed to add bytes to IPFS: {e}") from e
def add_json(data: dict) -> str:
    """
    Store a dict on IPFS as deterministic JSON (sorted keys, 2-space indent).

    Args:
        data: Dictionary to serialize and store

    Returns:
        IPFS CID

    Raises:
        IPFSError: If adding fails
    """
    # Deterministic serialization means identical dicts always yield the
    # same bytes, and therefore the same CID.
    payload = json.dumps(data, indent=2, sort_keys=True)
    return add_bytes(payload.encode('utf-8'), pin=True)
def pin_or_raise(cid: str) -> None:
    """
    Pin a CID on IPFS synchronously, raising on any failure.

    Args:
        cid: IPFS CID to pin

    Raises:
        IPFSError: If pinning fails
    """
    try:
        resp = requests.post(
            f"{IPFS_BASE_URL}/api/v0/pin/add",
            params={"arg": cid},
            timeout=IPFS_TIMEOUT,
        )
        resp.raise_for_status()
    except Exception as e:
        logger.error(f"Failed to pin on IPFS: {e}")
        raise IPFSError(f"Failed to pin {cid}: {e}") from e
    logger.info(f"Pinned on IPFS: {cid}")

465
server.py
View File

@@ -1647,65 +1647,89 @@ def _pin_ipfs_async(cid: str):
async def _register_asset_impl(req: RegisterRequest, owner: str):
"""Internal implementation for registering an asset."""
"""
Internal implementation for registering an asset atomically.
Requires IPFS CID - content must be on IPFS before registering.
Uses a transaction for all DB operations.
"""
import ipfs_client
from ipfs_client import IPFSError
# ===== PHASE 1: VALIDATION =====
# IPFS CID is required
if not req.ipfs_cid:
raise HTTPException(400, "IPFS CID is required for registration")
# Check if name exists
if await db.asset_exists(req.name):
raise HTTPException(400, f"Asset already exists: {req.name}")
# Pin content on IPFS if CID provided (fire-and-forget, don't block)
if req.ipfs_cid:
import threading
threading.Thread(target=_pin_ipfs_async, args=(req.ipfs_cid,), daemon=True).start()
# ===== PHASE 2: IPFS OPERATIONS =====
try:
ipfs_client.pin_or_raise(req.ipfs_cid)
except IPFSError as e:
raise HTTPException(500, f"IPFS operation failed: {e}")
# Create asset
# ===== PHASE 3: DB TRANSACTION =====
now = datetime.now(timezone.utc).isoformat()
asset = {
"name": req.name,
"content_hash": req.content_hash,
"ipfs_cid": req.ipfs_cid,
"asset_type": req.asset_type,
"tags": req.tags,
"metadata": req.metadata,
"url": req.url,
"provenance": req.provenance,
"owner": owner,
"created_at": now
}
# Save asset to database
created_asset = await db.create_asset(asset)
try:
async with db.transaction() as conn:
# Check name again inside transaction (race condition protection)
if await db.asset_exists_by_name_tx(conn, req.name):
raise HTTPException(400, f"Asset already exists: {req.name}")
# Create ownership activity
object_data = {
"type": req.asset_type.capitalize(),
"name": req.name,
"id": f"https://{DOMAIN}/objects/{req.content_hash}",
"contentHash": {
"algorithm": "sha3-256",
"value": req.content_hash
},
"attributedTo": f"https://{DOMAIN}/users/{owner}"
}
# Create asset
asset = {
"name": req.name,
"content_hash": req.content_hash,
"ipfs_cid": req.ipfs_cid,
"asset_type": req.asset_type,
"tags": req.tags,
"metadata": req.metadata,
"url": req.url,
"provenance": req.provenance,
"owner": owner,
"created_at": now
}
created_asset = await db.create_asset_tx(conn, asset)
# Include provenance in activity object_data if present
if req.provenance:
object_data["provenance"] = req.provenance
# Create ownership activity
object_data = {
"type": req.asset_type.capitalize(),
"name": req.name,
"id": f"https://{DOMAIN}/objects/{req.content_hash}",
"contentHash": {
"algorithm": "sha3-256",
"value": req.content_hash
},
"attributedTo": f"https://{DOMAIN}/users/{owner}"
}
activity = {
"activity_id": str(uuid.uuid4()),
"activity_type": "Create",
"actor_id": f"https://{DOMAIN}/users/{owner}",
"object_data": object_data,
"published": now
}
# Include provenance in activity object_data if present
if req.provenance:
object_data["provenance"] = req.provenance
# Sign activity with the owner's keys
activity = sign_activity(activity, owner)
activity = {
"activity_id": str(uuid.uuid4()),
"activity_type": "Create",
"actor_id": f"https://{DOMAIN}/users/{owner}",
"object_data": object_data,
"published": now
}
activity = sign_activity(activity, owner)
created_activity = await db.create_activity_tx(conn, activity)
# Save activity to database
await db.create_activity(activity)
# Transaction commits here on successful exit
return {"asset": created_asset, "activity": activity}
except HTTPException:
raise
except Exception as e:
logger.error(f"Database transaction failed: {e}")
raise HTTPException(500, f"Failed to register asset: {e}")
return {"asset": created_asset, "activity": created_activity}
@app.post("/assets")
@@ -1717,11 +1741,23 @@ async def register_asset(req: RegisterRequest, user: User = Depends(get_required
@app.post("/assets/record-run")
@app.post("/registry/record-run") # Legacy route
async def record_run(req: RecordRunRequest, user: User = Depends(get_required_user)):
"""Record an L1 run and register the output. Requires authentication."""
# Fetch run from the specified L1 server
"""
Record an L1 run and register the output atomically.
Ensures all operations succeed or none do:
1. All input assets registered (if not already on L2) + pinned on IPFS
2. Output asset registered + pinned on IPFS
3. Recipe serialized to JSON, stored on IPFS, CID saved in provenance
"""
import ipfs_client
from ipfs_client import IPFSError
# ===== PHASE 1: PREPARATION (read-only) =====
l1_url = req.l1_server.rstrip('/')
# Fetch run from L1
try:
resp = requests.get(f"{l1_url}/runs/{req.run_id}")
resp = requests.get(f"{l1_url}/runs/{req.run_id}", timeout=30)
resp.raise_for_status()
run = resp.json()
except Exception as e:
@@ -1734,7 +1770,7 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us
if not output_hash:
raise HTTPException(400, "Run has no output hash")
# Fetch media type from L1 cache
# Fetch output cache info from L1 (must exist - it's new)
try:
cache_resp = requests.get(
f"{l1_url}/cache/{output_hash}",
@@ -1743,45 +1779,187 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us
)
cache_resp.raise_for_status()
cache_info = cache_resp.json()
media_type = cache_info.get("media_type", "image")
ipfs_cid = cache_info.get("ipfs_cid")
output_media_type = cache_info.get("media_type", "image")
output_ipfs_cid = cache_info.get("ipfs_cid")
except Exception as e:
logger.warning(f"Failed to fetch cache info from L1: {e}")
media_type = "image" # Default fallback
ipfs_cid = None
raise HTTPException(400, f"Failed to fetch output cache info: {e}")
# Build provenance from run
provenance = {
"inputs": [{"content_hash": h} for h in run.get("inputs", [])],
"recipe": run.get("recipe"),
"effect_url": run.get("effect_url"),
"effects_commit": run.get("effects_commit"),
"l1_server": l1_url,
"l1_run_id": req.run_id,
"rendered_at": run.get("completed_at"),
"infrastructure": run.get("infrastructure")
}
if not output_ipfs_cid:
raise HTTPException(400, "Output has no IPFS CID - cannot publish")
# Register the output under the authenticated user
return await _register_asset_impl(RegisterRequest(
name=req.output_name,
content_hash=output_hash,
ipfs_cid=ipfs_cid,
asset_type=media_type, # Detected from L1 cache
tags=["rendered", "l1"],
metadata={"l1_server": l1_url, "l1_run_id": req.run_id},
provenance=provenance
), user.username)
# Gather input info: check L2 first, then fall back to L1
input_hashes = run.get("inputs", [])
input_infos = [] # List of {content_hash, ipfs_cid, media_type, existing_asset}
for input_hash in input_hashes:
# Check if already on L2
existing = await db.get_asset_by_hash(input_hash)
if existing and existing.get("ipfs_cid"):
input_infos.append({
"content_hash": input_hash,
"ipfs_cid": existing["ipfs_cid"],
"media_type": existing.get("asset_type", "image"),
"existing_asset": existing
})
else:
# Not on L2, try L1
try:
inp_resp = requests.get(
f"{l1_url}/cache/{input_hash}",
headers={"Accept": "application/json"},
timeout=10
)
inp_resp.raise_for_status()
inp_info = inp_resp.json()
ipfs_cid = inp_info.get("ipfs_cid")
if not ipfs_cid:
raise HTTPException(400, f"Input {input_hash[:16]}... has no IPFS CID (not on L2 or L1)")
input_infos.append({
"content_hash": input_hash,
"ipfs_cid": ipfs_cid,
"media_type": inp_info.get("media_type", "image"),
"existing_asset": None
})
except HTTPException:
raise
except Exception as e:
raise HTTPException(400, f"Input {input_hash[:16]}... not on L2 and failed to fetch from L1: {e}")
# Prepare recipe data
recipe_data = run.get("recipe")
if not recipe_data:
recipe_data = {
"name": run.get("recipe_name", "unknown"),
"effect_url": run.get("effect_url"),
"effects_commit": run.get("effects_commit"),
}
# ===== PHASE 2: IPFS OPERATIONS (blocking, before any DB changes) =====
try:
# Pin all inputs
for inp in input_infos:
ipfs_client.pin_or_raise(inp["ipfs_cid"])
# Pin output
ipfs_client.pin_or_raise(output_ipfs_cid)
# Store recipe on IPFS
recipe_cid = ipfs_client.add_json(recipe_data)
except IPFSError as e:
raise HTTPException(500, f"IPFS operation failed: {e}")
# ===== PHASE 3: DB TRANSACTION (all-or-nothing) =====
now = datetime.now(timezone.utc).isoformat()
try:
async with db.transaction() as conn:
# Register input assets (if not already on L2)
registered_inputs = []
for inp in input_infos:
if inp["existing_asset"]:
# Already on L2
registered_inputs.append({
"content_hash": inp["content_hash"],
"name": inp["existing_asset"]["name"]
})
else:
# Create new input asset
input_name = f"input-{inp['content_hash'][:16]}"
input_asset = {
"name": input_name,
"content_hash": inp["content_hash"],
"ipfs_cid": inp["ipfs_cid"],
"asset_type": inp["media_type"],
"tags": ["auto-registered", "input"],
"metadata": {"auto_registered_from_run": req.run_id},
"owner": user.username,
"created_at": now
}
await db.create_asset_tx(conn, input_asset)
registered_inputs.append({
"content_hash": inp["content_hash"],
"name": input_name
})
# Check output name doesn't exist
if await db.asset_exists_by_name_tx(conn, req.output_name):
raise HTTPException(400, f"Asset already exists: {req.output_name}")
# Build provenance with recipe CID
provenance = {
"inputs": registered_inputs,
"recipe": recipe_data,
"recipe_cid": recipe_cid,
"effect_url": run.get("effect_url"),
"effects_commit": run.get("effects_commit"),
"l1_server": l1_url,
"l1_run_id": req.run_id,
"rendered_at": run.get("completed_at"),
"infrastructure": run.get("infrastructure")
}
# Create output asset
output_asset = {
"name": req.output_name,
"content_hash": output_hash,
"ipfs_cid": output_ipfs_cid,
"asset_type": output_media_type,
"tags": ["rendered", "l1"],
"metadata": {"l1_server": l1_url, "l1_run_id": req.run_id},
"provenance": provenance,
"owner": user.username,
"created_at": now
}
created_asset = await db.create_asset_tx(conn, output_asset)
# Create activity
object_data = {
"type": output_media_type.capitalize(),
"name": req.output_name,
"id": f"https://{DOMAIN}/objects/{output_hash}",
"contentHash": {
"algorithm": "sha3-256",
"value": output_hash
},
"attributedTo": f"https://{DOMAIN}/users/{user.username}",
"provenance": provenance
}
activity = {
"activity_id": str(uuid.uuid4()),
"activity_type": "Create",
"actor_id": f"https://{DOMAIN}/users/{user.username}",
"object_data": object_data,
"published": now
}
activity = sign_activity(activity, user.username)
created_activity = await db.create_activity_tx(conn, activity)
# Transaction commits here on successful exit
except HTTPException:
raise
except Exception as e:
logger.error(f"Database transaction failed: {e}")
raise HTTPException(500, f"Failed to record run: {e}")
return {"asset": created_asset, "activity": created_activity}
@app.post("/assets/publish-cache")
async def publish_cache(req: PublishCacheRequest, user: User = Depends(get_required_user)):
"""
Publish a cache item from L1 with metadata.
Publish a cache item from L1 with metadata atomically.
Requires origin to be set (self or external URL).
Creates a new asset and Create activity.
Requires IPFS CID - content must be on IPFS before publishing.
Creates a new asset and Create activity in a single transaction.
"""
import ipfs_client
from ipfs_client import IPFSError
# ===== PHASE 1: VALIDATION =====
# Validate origin
if not req.origin or "type" not in req.origin:
raise HTTPException(400, "Origin is required for publishing (type: 'self' or 'external')")
@@ -1793,78 +1971,93 @@ async def publish_cache(req: PublishCacheRequest, user: User = Depends(get_requi
if origin_type == "external" and not req.origin.get("url"):
raise HTTPException(400, "External origin requires a URL")
# IPFS CID is now required
if not req.ipfs_cid:
raise HTTPException(400, "IPFS CID is required for publishing")
# Check if asset name already exists
if await db.asset_exists(req.asset_name):
raise HTTPException(400, f"Asset name already exists: {req.asset_name}")
# Pin content on IPFS if CID provided (fire-and-forget, don't block)
if req.ipfs_cid:
import threading
threading.Thread(target=_pin_ipfs_async, args=(req.ipfs_cid,), daemon=True).start()
# ===== PHASE 2: IPFS OPERATIONS =====
try:
ipfs_client.pin_or_raise(req.ipfs_cid)
except IPFSError as e:
raise HTTPException(500, f"IPFS operation failed: {e}")
# Create asset
# ===== PHASE 3: DB TRANSACTION =====
now = datetime.now(timezone.utc).isoformat()
asset = {
"name": req.asset_name,
"content_hash": req.content_hash,
"ipfs_cid": req.ipfs_cid,
"asset_type": req.asset_type,
"tags": req.tags,
"description": req.description,
"origin": req.origin,
"metadata": req.metadata,
"owner": user.username,
"created_at": now
}
# Save asset to database
created_asset = await db.create_asset(asset)
try:
async with db.transaction() as conn:
# Check name again inside transaction (race condition protection)
if await db.asset_exists_by_name_tx(conn, req.asset_name):
raise HTTPException(400, f"Asset name already exists: {req.asset_name}")
# Create ownership activity with origin info
object_data = {
"type": req.asset_type.capitalize(),
"name": req.asset_name,
"id": f"https://{DOMAIN}/objects/{req.content_hash}",
"contentHash": {
"algorithm": "sha3-256",
"value": req.content_hash
},
"attributedTo": f"https://{DOMAIN}/users/{user.username}",
"tag": req.tags
}
# Create asset
asset = {
"name": req.asset_name,
"content_hash": req.content_hash,
"ipfs_cid": req.ipfs_cid,
"asset_type": req.asset_type,
"tags": req.tags,
"description": req.description,
"origin": req.origin,
"metadata": req.metadata,
"owner": user.username,
"created_at": now
}
created_asset = await db.create_asset_tx(conn, asset)
if req.description:
object_data["summary"] = req.description
# Create ownership activity with origin info
object_data = {
"type": req.asset_type.capitalize(),
"name": req.asset_name,
"id": f"https://{DOMAIN}/objects/{req.content_hash}",
"contentHash": {
"algorithm": "sha3-256",
"value": req.content_hash
},
"attributedTo": f"https://{DOMAIN}/users/{user.username}",
"tag": req.tags
}
# Include origin in ActivityPub object
if origin_type == "self":
object_data["generator"] = {
"type": "Application",
"name": "Art DAG",
"note": "Original content created by the author"
}
else:
object_data["source"] = {
"type": "Link",
"href": req.origin.get("url"),
"name": req.origin.get("note", "External source")
}
if req.description:
object_data["summary"] = req.description
activity = {
"activity_id": str(uuid.uuid4()),
"activity_type": "Create",
"actor_id": f"https://{DOMAIN}/users/{user.username}",
"object_data": object_data,
"published": now
}
# Include origin in ActivityPub object
if origin_type == "self":
object_data["generator"] = {
"type": "Application",
"name": "Art DAG",
"note": "Original content created by the author"
}
else:
object_data["source"] = {
"type": "Link",
"href": req.origin.get("url"),
"name": req.origin.get("note", "External source")
}
# Sign activity with the user's keys
activity = sign_activity(activity, user.username)
activity = {
"activity_id": str(uuid.uuid4()),
"activity_type": "Create",
"actor_id": f"https://{DOMAIN}/users/{user.username}",
"object_data": object_data,
"published": now
}
activity = sign_activity(activity, user.username)
created_activity = await db.create_activity_tx(conn, activity)
# Save activity to database
await db.create_activity(activity)
# Transaction commits here on successful exit
return {"asset": created_asset, "activity": activity}
except HTTPException:
raise
except Exception as e:
logger.error(f"Database transaction failed: {e}")
raise HTTPException(500, f"Failed to publish cache item: {e}")
return {"asset": created_asset, "activity": created_activity}
# ============ Activities Endpoints ============