Files
celery/server.py
gilesb dc9c13ffd9 feat: add HTML home page with nav to UI and docs
🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-07 14:13:56 +00:00

582 lines
19 KiB
Python

#!/usr/bin/env python3
"""
Art DAG L1 Server
Manages rendering runs and provides access to the cache.
- POST /runs - start a run (recipe + inputs)
- GET /runs/{run_id} - get run status/result
- GET /cache/{content_hash} - get cached content
"""
import hashlib
import json
import os
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.responses import FileResponse, HTMLResponse
from pydantic import BaseModel
import redis
from urllib.parse import urlparse
from celery_app import app as celery_app
from tasks import render_effect
# Cache directory (use /data/cache in Docker, ~/.artdag/cache locally)
CACHE_DIR = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache")))
CACHE_DIR.mkdir(parents=True, exist_ok=True)
# Redis for persistent run storage
REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/5')
parsed = urlparse(REDIS_URL)
redis_client = redis.Redis(
host=parsed.hostname or 'localhost',
port=parsed.port or 6379,
db=int(parsed.path.lstrip('/') or 0)
)
RUNS_KEY_PREFIX = "artdag:run:"
def save_run(run: "RunStatus"):
"""Save run to Redis."""
redis_client.set(f"{RUNS_KEY_PREFIX}{run.run_id}", run.model_dump_json())
def load_run(run_id: str) -> Optional["RunStatus"]:
"""Load run from Redis."""
data = redis_client.get(f"{RUNS_KEY_PREFIX}{run_id}")
if data:
return RunStatus.model_validate_json(data)
return None
def list_all_runs() -> list["RunStatus"]:
"""List all runs from Redis."""
runs = []
for key in redis_client.scan_iter(f"{RUNS_KEY_PREFIX}*"):
data = redis_client.get(key)
if data:
runs.append(RunStatus.model_validate_json(data))
return sorted(runs, key=lambda r: r.created_at, reverse=True)
app = FastAPI(
title="Art DAG L1 Server",
description="Distributed rendering server for Art DAG",
version="0.1.0"
)
class RunRequest(BaseModel):
"""Request to start a run."""
recipe: str # Recipe name (e.g., "dog", "identity")
inputs: list[str] # List of content hashes
output_name: Optional[str] = None
class RunStatus(BaseModel):
"""Status of a run."""
run_id: str
status: str # pending, running, completed, failed
recipe: str
inputs: list[str]
output_name: str
created_at: str
completed_at: Optional[str] = None
output_hash: Optional[str] = None
error: Optional[str] = None
celery_task_id: Optional[str] = None
def file_hash(path: Path) -> str:
"""Compute SHA3-256 hash of a file."""
hasher = hashlib.sha3_256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(65536), b""):
hasher.update(chunk)
return hasher.hexdigest()
def cache_file(source: Path) -> str:
"""Copy file to cache, return content hash."""
content_hash = file_hash(source)
cache_path = CACHE_DIR / content_hash
if not cache_path.exists():
import shutil
shutil.copy2(source, cache_path)
return content_hash
@app.get("/api")
async def api_info():
"""Server info (JSON)."""
return {
"name": "Art DAG L1 Server",
"version": "0.1.0",
"cache_dir": str(CACHE_DIR),
"runs_count": len(list_all_runs())
}
HOME_HTML = """
<!DOCTYPE html>
<html>
<head>
<title>Art DAG L1 Server</title>
<style>
* { box-sizing: border-box; }
body {
font-family: system-ui, -apple-system, sans-serif;
margin: 0; padding: 40px;
background: #111; color: #eee;
line-height: 1.6;
}
.container { max-width: 800px; margin: 0 auto; }
h1 { color: #fff; border-bottom: 1px solid #333; padding-bottom: 10px; }
h2 { color: #ccc; margin-top: 30px; }
a { color: #60a5fa; }
a:hover { color: #93c5fd; }
code {
background: #222; padding: 2px 6px; border-radius: 4px;
font-family: 'SF Mono', Monaco, monospace;
}
pre {
background: #1a1a1a; padding: 16px; border-radius: 8px;
overflow-x: auto; border: 1px solid #333;
}
pre code { background: none; padding: 0; }
table { border-collapse: collapse; width: 100%; margin: 16px 0; }
th, td { border: 1px solid #333; padding: 8px 12px; text-align: left; }
th { background: #222; }
.nav {
background: #1a1a1a; padding: 16px; border-radius: 8px;
margin-bottom: 30px; display: flex; gap: 20px;
}
.nav a {
font-weight: 500; text-decoration: none;
padding: 8px 16px; background: #333; border-radius: 4px;
}
.nav a:hover { background: #444; }
</style>
</head>
<body>
<div class="container">
<div class="nav">
<a href="/ui">Runs UI</a>
<a href="/docs">API Docs</a>
<a href="/api">API Info</a>
</div>
<h1>Art DAG L1 Server</h1>
<p>L1 rendering server for the Art DAG system. Manages distributed rendering jobs via Celery workers.</p>
<h2>Dependencies</h2>
<ul>
<li><strong>artdag</strong> (GitHub): Core DAG execution engine</li>
<li><strong>artdag-effects</strong> (rose-ash): Effect implementations</li>
<li><strong>Redis</strong>: Message broker, result backend, and run persistence</li>
</ul>
<h2>API Endpoints</h2>
<table>
<tr><th>Method</th><th>Path</th><th>Description</th></tr>
<tr><td>GET</td><td><code>/ui</code></td><td>Web UI for viewing runs</td></tr>
<tr><td>POST</td><td><code>/runs</code></td><td>Start a rendering run</td></tr>
<tr><td>GET</td><td><code>/runs</code></td><td>List all runs</td></tr>
<tr><td>GET</td><td><code>/runs/{run_id}</code></td><td>Get run status</td></tr>
<tr><td>GET</td><td><code>/cache</code></td><td>List cached content hashes</td></tr>
<tr><td>GET</td><td><code>/cache/{hash}</code></td><td>Download cached content</td></tr>
<tr><td>POST</td><td><code>/cache/upload</code></td><td>Upload file to cache</td></tr>
<tr><td>GET</td><td><code>/assets</code></td><td>List known assets</td></tr>
</table>
<h2>Start a Run</h2>
<pre><code>curl -X POST /runs \\
-H "Content-Type: application/json" \\
-d '{"recipe": "dog", "inputs": ["33268b6e..."]}'</code></pre>
<h2>Provenance</h2>
<p>Every render produces a provenance record linking inputs, effects, and infrastructure:</p>
<pre><code>{
"output": {"content_hash": "..."},
"inputs": [...],
"effects": [...],
"infrastructure": {...}
}</code></pre>
</div>
</body>
</html>
"""
@app.get("/", response_class=HTMLResponse)
async def root():
"""Home page."""
return HOME_HTML
@app.post("/runs", response_model=RunStatus)
async def create_run(request: RunRequest):
"""Start a new rendering run."""
run_id = str(uuid.uuid4())
# Generate output name if not provided
output_name = request.output_name or f"{request.recipe}-{run_id[:8]}"
# Create run record
run = RunStatus(
run_id=run_id,
status="pending",
recipe=request.recipe,
inputs=request.inputs,
output_name=output_name,
created_at=datetime.now(timezone.utc).isoformat()
)
# Submit to Celery
# For now, we only support single-input recipes
if len(request.inputs) != 1:
raise HTTPException(400, "Currently only single-input recipes supported")
input_hash = request.inputs[0]
task = render_effect.delay(input_hash, request.recipe, output_name)
run.celery_task_id = task.id
run.status = "running"
save_run(run)
return run
@app.get("/runs/{run_id}", response_model=RunStatus)
async def get_run(run_id: str):
"""Get status of a run."""
run = load_run(run_id)
if not run:
raise HTTPException(404, f"Run {run_id} not found")
# Check Celery task status if running
if run.status == "running" and run.celery_task_id:
task = celery_app.AsyncResult(run.celery_task_id)
if task.ready():
if task.successful():
result = task.result
run.status = "completed"
run.completed_at = datetime.now(timezone.utc).isoformat()
run.output_hash = result.get("output", {}).get("content_hash")
# Cache the output
output_path = Path(result.get("output", {}).get("local_path", ""))
if output_path.exists():
cache_file(output_path)
else:
run.status = "failed"
run.error = str(task.result)
# Save updated status
save_run(run)
return run
@app.get("/runs")
async def list_runs():
"""List all runs."""
return list_all_runs()
@app.get("/cache/{content_hash}")
async def get_cached(content_hash: str):
"""Get cached content by hash."""
cache_path = CACHE_DIR / content_hash
if not cache_path.exists():
raise HTTPException(404, f"Content {content_hash} not in cache")
return FileResponse(cache_path)
@app.get("/cache")
async def list_cache():
"""List cached content hashes."""
return [f.name for f in CACHE_DIR.iterdir() if f.is_file()]
# Known assets (bootstrap data)
KNOWN_ASSETS = {
"cat": "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b",
}
@app.get("/assets")
async def list_assets():
"""List known assets."""
return KNOWN_ASSETS
@app.post("/cache/import")
async def import_to_cache(path: str):
"""Import a local file to cache."""
source = Path(path)
if not source.exists():
raise HTTPException(404, f"File not found: {path}")
content_hash = cache_file(source)
return {"content_hash": content_hash, "cached": True}
@app.post("/cache/upload")
async def upload_to_cache(file: UploadFile = File(...)):
"""Upload a file to cache."""
# Write to temp file first
import tempfile
with tempfile.NamedTemporaryFile(delete=False) as tmp:
content = await file.read()
tmp.write(content)
tmp_path = Path(tmp.name)
# Hash and move to cache
content_hash = file_hash(tmp_path)
cache_path = CACHE_DIR / content_hash
if not cache_path.exists():
import shutil
shutil.move(str(tmp_path), cache_path)
else:
tmp_path.unlink()
return {"content_hash": content_hash, "filename": file.filename, "size": len(content)}
def detect_media_type(cache_path: Path) -> str:
"""Detect if file is image or video based on magic bytes."""
with open(cache_path, "rb") as f:
header = f.read(32)
# Video signatures
if header[:4] == b'\x1a\x45\xdf\xa3': # WebM/MKV
return "video"
if header[4:8] == b'ftyp': # MP4/MOV
return "video"
if header[:4] == b'RIFF' and header[8:12] == b'AVI ': # AVI
return "video"
# Image signatures
if header[:8] == b'\x89PNG\r\n\x1a\n': # PNG
return "image"
if header[:2] == b'\xff\xd8': # JPEG
return "image"
if header[:6] in (b'GIF87a', b'GIF89a'): # GIF
return "image"
if header[:4] == b'RIFF' and header[8:12] == b'WEBP': # WebP
return "image"
return "unknown"
UI_HTML = """
<!DOCTYPE html>
<html>
<head>
<title>Art DAG L1 Server</title>
<script src="https://unpkg.com/htmx.org@1.9.10"></script>
<style>
* { box-sizing: border-box; }
body {
font-family: system-ui, -apple-system, sans-serif;
margin: 0; padding: 20px;
background: #111; color: #eee;
}
h1 { margin: 0 0 20px 0; color: #fff; }
.runs { display: flex; flex-direction: column; gap: 12px; }
.run {
background: #222; border-radius: 8px; padding: 16px;
border: 1px solid #333;
}
.run-header { display: flex; justify-content: space-between; align-items: center; margin-bottom: 12px; }
.run-id { font-family: monospace; font-size: 12px; color: #888; }
.run-recipe { font-weight: bold; font-size: 18px; }
.status {
padding: 4px 12px; border-radius: 12px; font-size: 12px; font-weight: 500;
}
.status.completed { background: #1a4d1a; color: #4ade80; }
.status.running { background: #4d4d1a; color: #facc15; }
.status.failed { background: #4d1a1a; color: #f87171; }
.status.pending { background: #333; color: #888; }
.media-container { margin-top: 12px; }
.media-container img, .media-container video {
max-width: 100%; max-height: 400px; border-radius: 4px;
}
.hash { font-family: monospace; font-size: 11px; color: #666; }
.info { font-size: 13px; color: #aaa; }
.refresh-btn {
background: #333; color: #fff; border: none; padding: 8px 16px;
border-radius: 4px; cursor: pointer; margin-bottom: 16px;
}
.refresh-btn:hover { background: #444; }
.no-runs { color: #666; font-style: italic; }
</style>
</head>
<body>
<h1>Art DAG L1 Server</h1>
<button class="refresh-btn" hx-get="/ui/runs" hx-target="#runs" hx-swap="innerHTML">
Refresh
</button>
<div id="runs" hx-get="/ui/runs" hx-trigger="load" hx-swap="innerHTML">
Loading...
</div>
</body>
</html>
"""
@app.get("/ui", response_class=HTMLResponse)
async def ui_index():
"""Web UI for viewing runs."""
return UI_HTML
@app.get("/ui/runs", response_class=HTMLResponse)
async def ui_runs():
"""HTMX partial: list of runs."""
runs = list_all_runs()
if not runs:
return '<p class="no-runs">No runs yet.</p>'
html_parts = ['<div class="runs">']
for run in runs[:20]: # Limit to 20 most recent
status_class = run.status
html_parts.append(f'''
<div class="run" hx-get="/ui/run/{run.run_id}" hx-trigger="every 2s[this.querySelector('.status.running')]" hx-swap="outerHTML">
<div class="run-header">
<div>
<span class="run-recipe">{run.recipe}</span>
<span class="run-id">{run.run_id}</span>
</div>
<span class="status {status_class}">{run.status}</span>
</div>
<div class="info">
Created: {run.created_at[:19].replace('T', ' ')}
</div>
''')
# Show input
if run.inputs:
input_hash = run.inputs[0]
html_parts.append(f'<div class="hash">Input: {input_hash[:32]}...</div>')
input_cache_path = CACHE_DIR / input_hash
if input_cache_path.exists():
input_media_type = detect_media_type(input_cache_path)
html_parts.append('<div class="media-container">')
if input_media_type == "video":
html_parts.append(f'<video src="/cache/{input_hash}" controls muted loop style="max-height:200px;"></video>')
elif input_media_type == "image":
html_parts.append(f'<img src="/cache/{input_hash}" alt="input" style="max-height:200px;">')
html_parts.append('</div>')
# Show output if completed
if run.status == "completed" and run.output_hash:
cache_path = CACHE_DIR / run.output_hash
if cache_path.exists():
media_type = detect_media_type(cache_path)
html_parts.append(f'<div class="hash">Output: {run.output_hash[:32]}...</div>')
html_parts.append('<div class="media-container">')
if media_type == "video":
html_parts.append(f'<video src="/cache/{run.output_hash}" controls autoplay muted loop></video>')
elif media_type == "image":
html_parts.append(f'<img src="/cache/{run.output_hash}" alt="{run.output_name}">')
html_parts.append('</div>')
# Show error if failed
if run.status == "failed" and run.error:
html_parts.append(f'<div class="info" style="color: #f87171;">Error: {run.error}</div>')
html_parts.append('</div>')
html_parts.append('</div>')
return '\n'.join(html_parts)
@app.get("/ui/run/{run_id}", response_class=HTMLResponse)
async def ui_run_detail(run_id: str):
"""HTMX partial: single run (for polling updates)."""
run = load_run(run_id)
if not run:
return '<div class="run">Run not found</div>'
# Check Celery task status if running
if run.status == "running" and run.celery_task_id:
task = celery_app.AsyncResult(run.celery_task_id)
if task.ready():
if task.successful():
result = task.result
run.status = "completed"
run.completed_at = datetime.now(timezone.utc).isoformat()
run.output_hash = result.get("output", {}).get("content_hash")
output_path = Path(result.get("output", {}).get("local_path", ""))
if output_path.exists():
cache_file(output_path)
else:
run.status = "failed"
run.error = str(task.result)
save_run(run)
status_class = run.status
poll_attr = 'hx-get="/ui/run/{}" hx-trigger="every 2s" hx-swap="outerHTML"'.format(run_id) if run.status == "running" else ""
html = f'''
<div class="run" {poll_attr}>
<div class="run-header">
<div>
<span class="run-recipe">{run.recipe}</span>
<span class="run-id">{run.run_id}</span>
</div>
<span class="status {status_class}">{run.status}</span>
</div>
<div class="info">
Created: {run.created_at[:19].replace('T', ' ')}
</div>
'''
if run.inputs:
input_hash = run.inputs[0]
html += f'<div class="hash">Input: {input_hash[:32]}...</div>'
input_cache_path = CACHE_DIR / input_hash
if input_cache_path.exists():
input_media_type = detect_media_type(input_cache_path)
html += '<div class="media-container">'
if input_media_type == "video":
html += f'<video src="/cache/{input_hash}" controls muted loop style="max-height:200px;"></video>'
elif input_media_type == "image":
html += f'<img src="/cache/{input_hash}" alt="input" style="max-height:200px;">'
html += '</div>'
if run.status == "completed" and run.output_hash:
cache_path = CACHE_DIR / run.output_hash
if cache_path.exists():
media_type = detect_media_type(cache_path)
html += f'<div class="hash">Output: {run.output_hash[:32]}...</div>'
html += '<div class="media-container">'
if media_type == "video":
html += f'<video src="/cache/{run.output_hash}" controls autoplay muted loop></video>'
elif media_type == "image":
html += f'<img src="/cache/{run.output_hash}" alt="{run.output_name}">'
html += '</div>'
if run.status == "failed" and run.error:
html += f'<div class="info" style="color: #f87171;">Error: {run.error}</div>'
html += '</div>'
return html
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8100)