Add content-addressable runs - runs identified by hash of inputs + recipe
- Add run_cache table for fast run_id -> output lookup
- compute_run_id() computes a deterministic run_id from inputs + recipe
- create_run checks the L1 cache, then L2, before running Celery
- If the output exists on L2 but not L1, it is pulled from IPFS
- Run results are saved to the cache on completion

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
139
server.py
139
server.py
@@ -47,6 +47,25 @@ import database
|
||||
# L1 public URL for redirects
|
||||
L1_PUBLIC_URL = os.environ.get("L1_PUBLIC_URL", "http://localhost:8100")
|
||||
|
||||
|
||||
def compute_run_id(input_hashes: list[str], recipe: str, recipe_hash: str = None) -> str:
|
||||
"""
|
||||
Compute a deterministic run_id from inputs and recipe.
|
||||
|
||||
The run_id is a SHA3-256 hash of:
|
||||
- Sorted input content hashes
|
||||
- Recipe identifier (recipe_hash if provided, else "effect:{recipe}")
|
||||
|
||||
This makes runs content-addressable: same inputs + recipe = same run_id.
|
||||
"""
|
||||
data = {
|
||||
"inputs": sorted(input_hashes),
|
||||
"recipe": recipe_hash or f"effect:{recipe}",
|
||||
"version": "1", # For future schema changes
|
||||
}
|
||||
json_str = json.dumps(data, sort_keys=True, separators=(",", ":"))
|
||||
return hashlib.sha3_256(json_str.encode()).hexdigest()
|
||||
|
||||
# Default L2 for login redirect when not logged in (user can login to any L2)
|
||||
DEFAULT_L2_SERVER = os.environ.get("DEFAULT_L2_SERVER", "http://localhost:8200")
|
||||
|
||||
@@ -597,8 +616,9 @@ async def root(request: Request):
|
||||
|
||||
@app.post("/runs", response_model=RunStatus)
|
||||
async def create_run(request: RunRequest, ctx: UserContext = Depends(get_required_user_context)):
|
||||
"""Start a new rendering run. Requires authentication."""
|
||||
run_id = str(uuid.uuid4())
|
||||
"""Start a new rendering run. Checks cache before executing."""
|
||||
# Compute content-addressable run_id
|
||||
run_id = compute_run_id(request.inputs, request.recipe)
|
||||
|
||||
# Generate output name if not provided
|
||||
output_name = request.output_name or f"{request.recipe}-{run_id[:8]}"
|
||||
@@ -606,7 +626,78 @@ async def create_run(request: RunRequest, ctx: UserContext = Depends(get_require
|
||||
# Use actor_id from user context
|
||||
actor_id = ctx.actor_id
|
||||
|
||||
# Create run record
|
||||
# Check L1 cache first
|
||||
cached_run = await database.get_run_cache(run_id)
|
||||
if cached_run:
|
||||
output_hash = cached_run["output_hash"]
|
||||
# Verify the output file still exists in cache
|
||||
if cache_manager.has_content(output_hash):
|
||||
logger.info(f"create_run: Cache hit for run_id={run_id[:16]}... output={output_hash[:16]}...")
|
||||
return RunStatus(
|
||||
run_id=run_id,
|
||||
status="completed",
|
||||
recipe=request.recipe,
|
||||
inputs=request.inputs,
|
||||
output_name=output_name,
|
||||
created_at=cached_run.get("created_at", datetime.now(timezone.utc).isoformat()),
|
||||
completed_at=cached_run.get("created_at", datetime.now(timezone.utc).isoformat()),
|
||||
output_hash=output_hash,
|
||||
username=actor_id,
|
||||
provenance_cid=cached_run.get("provenance_cid"),
|
||||
)
|
||||
else:
|
||||
logger.info(f"create_run: Cache entry exists but output missing, will re-run")
|
||||
|
||||
# Check L2 if not in L1
|
||||
l2_server = ctx.l2_server
|
||||
try:
|
||||
l2_resp = http_requests.get(
|
||||
f"{l2_server}/assets/by-run-id/{run_id}",
|
||||
timeout=10
|
||||
)
|
||||
if l2_resp.status_code == 200:
|
||||
l2_data = l2_resp.json()
|
||||
output_hash = l2_data.get("output_hash")
|
||||
ipfs_cid = l2_data.get("ipfs_cid")
|
||||
if output_hash and ipfs_cid:
|
||||
logger.info(f"create_run: Found on L2, pulling from IPFS: {ipfs_cid}")
|
||||
# Pull from IPFS to L1 cache
|
||||
import ipfs_client
|
||||
legacy_dir = CACHE_DIR / "legacy"
|
||||
legacy_dir.mkdir(parents=True, exist_ok=True)
|
||||
recovery_path = legacy_dir / output_hash
|
||||
if ipfs_client.get_file(ipfs_cid, str(recovery_path)):
|
||||
# File retrieved - put() updates indexes, but file is already in legacy location
|
||||
# Just update the content and IPFS indexes manually
|
||||
cache_manager._set_content_index(output_hash, output_hash)
|
||||
cache_manager._set_ipfs_index(output_hash, ipfs_cid)
|
||||
# Save to run cache
|
||||
await database.save_run_cache(
|
||||
run_id=run_id,
|
||||
output_hash=output_hash,
|
||||
recipe=request.recipe,
|
||||
inputs=request.inputs,
|
||||
ipfs_cid=ipfs_cid,
|
||||
provenance_cid=l2_data.get("provenance_cid"),
|
||||
actor_id=actor_id,
|
||||
)
|
||||
logger.info(f"create_run: Recovered from L2/IPFS: {output_hash[:16]}...")
|
||||
return RunStatus(
|
||||
run_id=run_id,
|
||||
status="completed",
|
||||
recipe=request.recipe,
|
||||
inputs=request.inputs,
|
||||
output_name=output_name,
|
||||
created_at=datetime.now(timezone.utc).isoformat(),
|
||||
completed_at=datetime.now(timezone.utc).isoformat(),
|
||||
output_hash=output_hash,
|
||||
username=actor_id,
|
||||
provenance_cid=l2_data.get("provenance_cid"),
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"create_run: L2 lookup failed (will run Celery): {e}")
|
||||
|
||||
# Not cached anywhere - create run record and submit to Celery
|
||||
run = RunStatus(
|
||||
run_id=run_id,
|
||||
status="pending",
|
||||
@@ -716,6 +807,20 @@ async def get_run(run_id: str):
|
||||
output_hash=run.output_hash,
|
||||
run_id=run.run_id,
|
||||
)
|
||||
|
||||
# Save to run cache for content-addressable lookup
|
||||
if run.output_hash:
|
||||
ipfs_cid = cache_manager._get_ipfs_cid_from_index(run.output_hash)
|
||||
await database.save_run_cache(
|
||||
run_id=run.run_id,
|
||||
output_hash=run.output_hash,
|
||||
recipe=run.recipe,
|
||||
inputs=run.inputs,
|
||||
ipfs_cid=ipfs_cid,
|
||||
provenance_cid=run.provenance_cid,
|
||||
actor_id=run.username,
|
||||
)
|
||||
logger.info(f"get_run: Saved run cache for {run.run_id[:16]}...")
|
||||
else:
|
||||
run.status = "failed"
|
||||
run.error = error
|
||||
@@ -842,6 +947,18 @@ async def run_detail(run_id: str, request: Request):
|
||||
output_path = Path(result.get("output", {}).get("local_path", ""))
|
||||
if output_path.exists():
|
||||
await cache_file(output_path)
|
||||
# Save to run cache for content-addressable lookup
|
||||
if run.output_hash:
|
||||
ipfs_cid = cache_manager._get_ipfs_cid_from_index(run.output_hash)
|
||||
await database.save_run_cache(
|
||||
run_id=run.run_id,
|
||||
output_hash=run.output_hash,
|
||||
recipe=run.recipe,
|
||||
inputs=run.inputs,
|
||||
ipfs_cid=ipfs_cid,
|
||||
provenance_cid=run.provenance_cid,
|
||||
actor_id=run.username,
|
||||
)
|
||||
else:
|
||||
run.status = "failed"
|
||||
run.error = error
|
||||
@@ -4073,6 +4190,22 @@ async def ui_run_partial(run_id: str, request: Request):
|
||||
return html
|
||||
|
||||
|
||||
# ============ Client Download ============
|
||||
|
||||
CLIENT_TARBALL = Path(__file__).parent / "artdag-client.tar.gz"
|
||||
|
||||
@app.get("/download/client")
async def download_client():
    """Download the Art DAG CLI client."""
    # Serve the prebuilt tarball when it has been packaged alongside the server;
    # otherwise report that no client package is available.
    if CLIENT_TARBALL.exists():
        return FileResponse(
            CLIENT_TARBALL,
            media_type="application/gzip",
            filename="artdag-client.tar.gz",
        )
    raise HTTPException(404, "Client package not found")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
# Workers enabled - cache indexes shared via Redis
|
||||
|
||||
Reference in New Issue
Block a user