From ba20c2dc63f842e551f86ae94a5ce64c3f5ceea3 Mon Sep 17 00:00:00 2001 From: gilesb Date: Wed, 7 Jan 2026 10:45:30 +0000 Subject: [PATCH] feat: L1 server with persistent run storage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - FastAPI server on port 8100 - POST /runs to start rendering jobs - GET /runs/{id} to check status - Cache and run persistence in Redis - Auto-generated API docs at /docs 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- README.md | 71 ++++++++++++--- requirements.txt | 2 + server.py | 230 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 289 insertions(+), 14 deletions(-) create mode 100644 server.py diff --git a/README.md b/README.md index 28bea9d..bc2c814 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,12 @@ # Art Celery -Distributed rendering for the Art DAG system using Celery. +L1 rendering server for the Art DAG system. Manages distributed rendering jobs via Celery workers. ## Dependencies - **artdag** (GitHub): Core DAG execution engine - **artdag-effects** (rose-ash): Effect implementations -- **Redis**: Message broker and result backend +- **Redis**: Message broker, result backend, and run persistence ## Setup @@ -19,9 +19,48 @@ pip install -r requirements.txt # Start a worker celery -A celery_app worker --loglevel=info + +# Start the L1 server +python server.py ``` -## Usage +## L1 Server API + +Interactive docs: http://localhost:8100/docs + +### Endpoints + +| Method | Path | Description | +|--------|------|-------------| +| GET | `/` | Server info | +| POST | `/runs` | Start a rendering run | +| GET | `/runs` | List all runs | +| GET | `/runs/{run_id}` | Get run status | +| GET | `/cache` | List cached content hashes | +| GET | `/cache/{hash}` | Download cached content | +| POST | `/cache/import?path=` | Import local file to cache | +| GET | `/assets` | List known assets | + +### Start a run + +```bash +curl -X POST http://localhost:8100/runs \ + -H "Content-Type: application/json" \ + -d '{"recipe": "dog", "inputs": ["33268b6e..."], "output_name": "my-output"}' +``` + +### Check run status + +```bash +curl http://localhost:8100/runs/{run_id} +``` + +## Storage + +- **Cache**: `~/.artdag/cache/` (content-addressed files) +- **Runs**: Redis db 5, keys `artdag:run:*` (persists across restarts) + +## CLI Usage ```bash # Render cat through dog effect @@ -37,19 +76,23 @@ python render.py dog cat ## Architecture ``` -render.py (CLI) +server.py (L1 Server - FastAPI) │ - ▼ -celery_app.py (Celery broker) + ├── POST /runs → Submit job + │ │ + │ ▼ + │ celery_app.py (Celery broker) + │ │ + │ ▼ + │ tasks.py (render_effect task) + │ │ + │ ├── artdag (GitHub) - DAG execution + │ └── artdag-effects (rose-ash) - Effects + │ │ + │ ▼ + │ Output + Provenance │ - ▼ -tasks.py (render_effect task) - │ - ├── artdag (GitHub) - DAG execution - └── artdag-effects (rose-ash) - Effect implementations - │ - ▼ - Provenance JSON + Output file + └── GET /cache/{hash} → Retrieve output ``` ## Provenance diff --git a/requirements.txt b/requirements.txt index b5836ba..6db191d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,7 @@ celery[redis]>=5.3.0 redis>=5.0.0 requests>=2.31.0 +fastapi>=0.109.0 +uvicorn>=0.27.0 # Core artdag from GitHub git+https://github.com/gilesbradshaw/art-dag.git diff --git a/server.py b/server.py new file mode 100644 index 0000000..8d463d7 --- /dev/null +++ b/server.py @@ -0,0 +1,230 @@ +#!/usr/bin/env python3 +""" +Art DAG L1 Server + +Manages rendering runs and provides access to the cache. +- POST /runs - start a run (recipe + inputs) +- GET /runs/{run_id} - get run status/result +- GET /cache/{content_hash} - get cached content +""" + +import hashlib +import json +import os +import uuid +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional + +from fastapi import FastAPI, HTTPException +from fastapi.responses import FileResponse +from pydantic import BaseModel + +import redis + +from celery_app import app as celery_app +from tasks import render_effect + +# Cache directory +CACHE_DIR = Path.home() / ".artdag" / "cache" +CACHE_DIR.mkdir(parents=True, exist_ok=True) + +# Redis for persistent run storage +redis_client = redis.Redis(host='localhost', port=6379, db=5) +RUNS_KEY_PREFIX = "artdag:run:" + + +def save_run(run: "RunStatus"): + """Save run to Redis.""" + redis_client.set(f"{RUNS_KEY_PREFIX}{run.run_id}", run.model_dump_json()) + + +def load_run(run_id: str) -> Optional["RunStatus"]: + """Load run from Redis.""" + data = redis_client.get(f"{RUNS_KEY_PREFIX}{run_id}") + if data: + return RunStatus.model_validate_json(data) + return None + + +def list_all_runs() -> list["RunStatus"]: + """List all runs from Redis.""" + runs = [] + for key in redis_client.scan_iter(f"{RUNS_KEY_PREFIX}*"): + data = redis_client.get(key) + if data: + runs.append(RunStatus.model_validate_json(data)) + return sorted(runs, key=lambda r: r.created_at, reverse=True) + +app = FastAPI( + title="Art DAG L1 Server", + description="Distributed rendering server for Art DAG", + version="0.1.0" +) + + +class RunRequest(BaseModel): + """Request to start a run.""" + recipe: str # Recipe name (e.g., "dog", "identity") + inputs: list[str] # List of content hashes + output_name: Optional[str] = None + + +class RunStatus(BaseModel): + """Status of a run.""" + run_id: str + status: str # pending, running, completed, failed + recipe: str + inputs: list[str] + output_name: str + created_at: str + completed_at: Optional[str] = None + output_hash: Optional[str] = None + error: Optional[str] = None + celery_task_id: Optional[str] = None + + +def file_hash(path: Path) -> str: + """Compute SHA3-256 hash of a file.""" + hasher = hashlib.sha3_256() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(65536), b""): + hasher.update(chunk) + return hasher.hexdigest() + + +def cache_file(source: Path) -> str: + """Copy file to cache, return content hash.""" + content_hash = file_hash(source) + cache_path = CACHE_DIR / content_hash + if not cache_path.exists(): + import shutil + shutil.copy2(source, cache_path) + return content_hash + + +@app.get("/") +async def root(): + """Server info.""" + return { + "name": "Art DAG L1 Server", + "version": "0.1.0", + "cache_dir": str(CACHE_DIR), + "runs_count": len(list_all_runs()) + } + + +@app.post("/runs", response_model=RunStatus) +async def create_run(request: RunRequest): + """Start a new rendering run.""" + run_id = str(uuid.uuid4()) + + # Generate output name if not provided + output_name = request.output_name or f"{request.recipe}-{run_id[:8]}" + + # Create run record + run = RunStatus( + run_id=run_id, + status="pending", + recipe=request.recipe, + inputs=request.inputs, + output_name=output_name, + created_at=datetime.now(timezone.utc).isoformat() + ) + + # Submit to Celery + # For now, we only support single-input recipes + if len(request.inputs) != 1: + raise HTTPException(400, "Currently only single-input recipes supported") + + input_hash = request.inputs[0] + + task = render_effect.delay(input_hash, request.recipe, output_name) + run.celery_task_id = task.id + run.status = "running" + + save_run(run) + return run + + +@app.get("/runs/{run_id}", response_model=RunStatus) +async def get_run(run_id: str): + """Get status of a run.""" + run = load_run(run_id) + if not run: + raise HTTPException(404, f"Run {run_id} not found") + + # Check Celery task status if running + if run.status == "running" and run.celery_task_id: + task = celery_app.AsyncResult(run.celery_task_id) + + if task.ready(): + if task.successful(): + result = task.result + run.status = "completed" + run.completed_at = datetime.now(timezone.utc).isoformat() + run.output_hash = result.get("output", {}).get("content_hash") + + # Cache the output + output_path = Path(result.get("output", {}).get("local_path", "")) + if output_path.exists(): + cache_file(output_path) + else: + run.status = "failed" + run.error = str(task.result) + + # Save updated status + save_run(run) + + return run + + +@app.get("/runs") +async def list_runs(): + """List all runs.""" + return list_all_runs() + + +@app.get("/cache/{content_hash}") +async def get_cached(content_hash: str): + """Get cached content by hash.""" + cache_path = CACHE_DIR / content_hash + + if not cache_path.exists(): + raise HTTPException(404, f"Content {content_hash} not in cache") + + return FileResponse(cache_path) + + +@app.get("/cache") +async def list_cache(): + """List cached content hashes.""" + return [f.name for f in CACHE_DIR.iterdir() if f.is_file()] + + +# Known assets (bootstrap data) +KNOWN_ASSETS = { + "cat": "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b", +} + + +@app.get("/assets") +async def list_assets(): + """List known assets.""" + return KNOWN_ASSETS + + +@app.post("/cache/import") +async def import_to_cache(path: str): + """Import a local file to cache.""" + source = Path(path) + if not source.exists(): + raise HTTPException(404, f"File not found: {path}") + + content_hash = cache_file(source) + return {"content_hash": content_hash, "cached": True} + + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8100)