#!/usr/bin/env python3
"""
Art DAG L1 Server

Manages rendering runs and provides access to the cache.
- POST /runs - start a run (recipe + inputs)
- GET /runs/{run_id} - get run status/result
- GET /cache/{content_hash} - get cached content
"""

import hashlib
import json
import os
import shutil
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse
from pydantic import BaseModel

import redis

from celery_app import app as celery_app
from tasks import render_effect

# Cache directory (created at import time if it does not exist yet)
CACHE_DIR = Path.home() / ".artdag" / "cache"
CACHE_DIR.mkdir(parents=True, exist_ok=True)

# Redis for persistent run storage
redis_client = redis.Redis(host='localhost', port=6379, db=5)
RUNS_KEY_PREFIX = "artdag:run:"


def save_run(run: "RunStatus"):
    """Save run to Redis.

    The run is stored as its JSON serialisation under
    ``RUNS_KEY_PREFIX + run.run_id``, overwriting any previous value.
    """
    redis_client.set(f"{RUNS_KEY_PREFIX}{run.run_id}", run.model_dump_json())


def load_run(run_id: str) -> Optional["RunStatus"]:
    """Load run from Redis.

    Returns the parsed ``RunStatus``, or ``None`` when no value is stored
    for ``run_id``.
    """
    data = redis_client.get(f"{RUNS_KEY_PREFIX}{run_id}")
    if data:
        return RunStatus.model_validate_json(data)
    return None


def list_all_runs() -> list["RunStatus"]:
    """List all runs from Redis, newest first.

    Keys are discovered with an incremental scan over the run prefix; a key
    whose value has disappeared between the scan and the read is skipped.
    Ordering is by the ``created_at`` string, descending.
    """
    runs = []
    for key in redis_client.scan_iter(f"{RUNS_KEY_PREFIX}*"):
        data = redis_client.get(key)
        if data:
            runs.append(RunStatus.model_validate_json(data))
    return sorted(runs, key=lambda r: r.created_at, reverse=True)


app = FastAPI(
    title="Art DAG L1 Server",
    description="Distributed rendering server for Art DAG",
    version="0.1.0"
)


class RunRequest(BaseModel):
    """Request to start a run."""
    recipe: str  # Recipe name (e.g., "dog", "identity")
    inputs: list[str]  # List of content hashes
    output_name: Optional[str] = None


class RunStatus(BaseModel):
    """Status of a run."""
    run_id: str
    status: str  # pending, running, completed, failed
    recipe: str
    inputs: list[str]
    output_name: str
    created_at: str
    completed_at: Optional[str] = None
    output_hash: Optional[str] = None
    error: Optional[str] = None
    celery_task_id: Optional[str] = None


def file_hash(path: Path) -> str:
    """Compute SHA3-256 hash of a file.

    The file is read in 64 KiB chunks so large files are never held in
    memory in full. Returns the lowercase hex digest.
    """
    hasher = hashlib.sha3_256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            hasher.update(chunk)
    return hasher.hexdigest()


def cache_file(source: Path) -> str:
    """Copy file to cache, return content hash.

    The cache entry is named after the file's SHA3-256 digest; the copy is
    skipped when an entry with that name is already present.
    """
    content_hash = file_hash(source)
    cache_path = CACHE_DIR / content_hash
    if not cache_path.exists():
        shutil.copy2(source, cache_path)
    return content_hash


@app.get("/")
async def root():
    """Server info."""
    # Count keys directly instead of deserialising and sorting every run
    # just to take the length of the result.
    runs_count = sum(1 for _ in redis_client.scan_iter(f"{RUNS_KEY_PREFIX}*"))
    return {
        "name": "Art DAG L1 Server",
        "version": "0.1.0",
        "cache_dir": str(CACHE_DIR),
        "runs_count": runs_count
    }


@app.post("/runs", response_model=RunStatus)
async def create_run(request: RunRequest):
    """Start a new rendering run.

    Raises:
        HTTPException: 400 when the request does not carry exactly one input.
    """
    # Validate before doing any other work.
    # For now, we only support single-input recipes
    if len(request.inputs) != 1:
        raise HTTPException(400, "Currently only single-input recipes supported")

    run_id = str(uuid.uuid4())

    # Generate output name if not provided
    output_name = request.output_name or f"{request.recipe}-{run_id[:8]}"

    # Create run record
    run = RunStatus(
        run_id=run_id,
        status="pending",
        recipe=request.recipe,
        inputs=request.inputs,
        output_name=output_name,
        created_at=datetime.now(timezone.utc).isoformat()
    )

    # Submit to Celery
    input_hash = request.inputs[0]
    task = render_effect.delay(input_hash, request.recipe, output_name)
    run.celery_task_id = task.id
    run.status = "running"

    save_run(run)
    return run


@app.get("/runs/{run_id}", response_model=RunStatus)
async def get_run(run_id: str):
    """Get status of a run.

    While a run is still marked "running", its Celery task is polled; once
    the task has finished the stored record is updated to "completed" or
    "failed" and saved back to Redis.

    Raises:
        HTTPException: 404 when no run is stored under ``run_id``.
    """
    run = load_run(run_id)
    if not run:
        raise HTTPException(404, f"Run {run_id} not found")

    # Check Celery task status if running
    if run.status == "running" and run.celery_task_id:
        task = celery_app.AsyncResult(run.celery_task_id)

        if task.ready():
            if task.successful():
                result = task.result
                # Tolerate a result that is not a dict or has no usable
                # "output" mapping instead of failing with AttributeError.
                output = result.get("output") if isinstance(result, dict) else None
                if not isinstance(output, dict):
                    output = {}

                run.status = "completed"
                run.completed_at = datetime.now(timezone.utc).isoformat()
                run.output_hash = output.get("content_hash")

                # Cache the output. A missing/empty local_path must be
                # skipped explicitly: Path("") is the current directory,
                # which exists but cannot be hashed as a file.
                local_path = output.get("local_path")
                if local_path:
                    output_path = Path(local_path)
                    if output_path.is_file():
                        cache_file(output_path)
            else:
                run.status = "failed"
                run.error = str(task.result)

            # Save updated status
            save_run(run)

    return run


@app.get("/runs")
async def list_runs():
    """List all runs."""
    return list_all_runs()


@app.get("/cache/{content_hash}")
async def get_cached(content_hash: str):
    """Get cached content by hash.

    Raises:
        HTTPException: 404 when the cache holds no regular file with that name.
    """
    cache_path = CACHE_DIR / content_hash

    # is_file() rather than exists(): a name that resolves to a directory
    # (e.g. "..") must be reported as missing, not handed to FileResponse.
    if not cache_path.is_file():
        raise HTTPException(404, f"Content {content_hash} not in cache")

    return FileResponse(cache_path)


@app.get("/cache")
async def list_cache():
    """List cached content hashes."""
    return [f.name for f in CACHE_DIR.iterdir() if f.is_file()]


# Known assets (bootstrap data)
KNOWN_ASSETS = {
    "cat": "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b",
}


@app.get("/assets")
async def list_assets():
    """List known assets."""
    return KNOWN_ASSETS


@app.post("/cache/import")
async def import_to_cache(path: str):
    """Import a local file to cache.

    NOTE(review): ``path`` is read from the server's own filesystem with no
    restriction on location - confirm this endpoint is only reachable by
    trusted callers.

    Raises:
        HTTPException: 404 when ``path`` does not name a regular file.
    """
    source = Path(path)
    # is_file() rather than exists(): a directory cannot be hashed and would
    # otherwise surface as an unhandled error.
    if not source.is_file():
        raise HTTPException(404, f"File not found: {path}")

    content_hash = cache_file(source)
    return {"content_hash": content_hash, "cached": True}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8100)