#!/usr/bin/env python3
"""
Art DAG L1 Server

Manages rendering runs and provides access to the cache.

- POST /runs - start a run (recipe + inputs)
- GET /runs/{run_id} - get run status/result
- GET /cache/{content_hash} - get cached content

NOTE(review): the HTML markup in this module was reconstructed from the
surviving text and the CSS class names in UI_CSS; confirm it against the
intended templates before deploying.
"""

import hashlib
import json
import os
import shutil
import tempfile
import uuid
from datetime import datetime, timezone
from html import escape
from pathlib import Path
from typing import Optional

from fastapi import FastAPI, HTTPException, UploadFile, File, Depends, Form, Request
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel

import redis
import requests as http_requests
from urllib.parse import quote, urlparse

from celery_app import app as celery_app
from tasks import render_effect

# L2 server for auth verification
L2_SERVER = os.environ.get("L2_SERVER", "http://localhost:8200")
L2_DOMAIN = os.environ.get("L2_DOMAIN", "artdag.rose-ash.com")

# Cache directory (use /data/cache in Docker, ~/.artdag/cache locally)
CACHE_DIR = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache")))
CACHE_DIR.mkdir(parents=True, exist_ok=True)

# Redis for persistent run storage
REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/5')
parsed = urlparse(REDIS_URL)
redis_client = redis.Redis(
    host=parsed.hostname or 'localhost',
    port=parsed.port or 6379,
    db=int(parsed.path.lstrip('/') or 0),
    # Honour credentials embedded in the URL (None when absent).
    password=parsed.password,
)
RUNS_KEY_PREFIX = "artdag:run:"

# Cookie lifetime for the web UI session (30 days).
AUTH_COOKIE_MAX_AGE = 30 * 24 * 60 * 60


def save_run(run: "RunStatus"):
    """Save run to Redis."""
    redis_client.set(f"{RUNS_KEY_PREFIX}{run.run_id}", run.model_dump_json())


def load_run(run_id: str) -> Optional["RunStatus"]:
    """Load run from Redis, or return None when the key does not exist."""
    data = redis_client.get(f"{RUNS_KEY_PREFIX}{run_id}")
    if data:
        return RunStatus.model_validate_json(data)
    return None


def list_all_runs() -> list["RunStatus"]:
    """List all runs from Redis, newest first (by created_at)."""
    runs = []
    for key in redis_client.scan_iter(f"{RUNS_KEY_PREFIX}*"):
        data = redis_client.get(key)
        # A key may disappear between the scan and the get; skip it.
        if data:
            runs.append(RunStatus.model_validate_json(data))
    return sorted(runs, key=lambda r: r.created_at, reverse=True)


app = FastAPI(
    title="Art DAG L1 Server",
    description="Distributed rendering server for Art DAG",
    version="0.1.0"
)


class RunRequest(BaseModel):
    """Request to start a run."""
    recipe: str  # Recipe name (e.g., "dog", "identity")
    inputs: list[str]  # List of content hashes
    output_name: Optional[str] = None


class RunStatus(BaseModel):
    """Status of a run."""
    run_id: str
    status: str  # pending, running, completed, failed
    recipe: str
    inputs: list[str]
    output_name: str
    created_at: str
    completed_at: Optional[str] = None
    output_hash: Optional[str] = None
    error: Optional[str] = None
    celery_task_id: Optional[str] = None
    effects_commit: Optional[str] = None
    effect_url: Optional[str] = None  # URL to effect source code
    username: Optional[str] = None  # Owner of the run (ActivityPub actor ID)
    infrastructure: Optional[dict] = None  # Hardware/software used for rendering


# ============ Auth ============

security = HTTPBearer(auto_error=False)


def verify_token_with_l2(token: str) -> Optional[str]:
    """Verify token with L2 server, return username if valid, else None."""
    try:
        resp = http_requests.post(
            f"{L2_SERVER}/auth/verify",
            headers={"Authorization": f"Bearer {token}"},
            timeout=5
        )
        if resp.status_code == 200:
            return resp.json().get("username")
    except (http_requests.RequestException, ValueError, AttributeError):
        # L2 unreachable or returned a malformed body: treat as unauthenticated.
        pass
    return None


async def get_optional_user(
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> Optional[str]:
    """Get username if authenticated, None otherwise."""
    if not credentials:
        return None
    return verify_token_with_l2(credentials.credentials)


async def get_required_user(
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> str:
    """Get username, raise 401 if not authenticated."""
    if not credentials:
        raise HTTPException(401, "Not authenticated")
    username = verify_token_with_l2(credentials.credentials)
    if not username:
        raise HTTPException(401, "Invalid token")
    return username


def get_user_from_cookie(request) -> Optional[str]:
    """Get username from the `auth_token` cookie, or None if absent/invalid."""
    token = request.cookies.get("auth_token")
    if not token:
        return None
    return verify_token_with_l2(token)


# ============ Cache helpers ============

def file_hash(path: Path) -> str:
    """Compute SHA3-256 hash of a file, reading it in 64 KiB chunks."""
    hasher = hashlib.sha3_256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            hasher.update(chunk)
    return hasher.hexdigest()


def cache_file(source: Path) -> str:
    """Copy file to cache (if not already present), return content hash."""
    content_hash = file_hash(source)
    cache_path = CACHE_DIR / content_hash
    if not cache_path.exists():
        shutil.copy2(source, cache_path)
    return content_hash


def _cache_path(content_hash: str) -> Optional[Path]:
    """Return the cached file for `content_hash`, or None.

    Only plain file names are accepted, so values such as ".." or names
    containing path separators can never resolve outside CACHE_DIR.
    """
    if not content_hash or content_hash in (".", ".."):
        return None
    if Path(content_hash).name != content_hash:
        return None
    path = CACHE_DIR / content_hash
    return path if path.is_file() else None


def detect_media_type(cache_path: Path) -> str:
    """Detect if file is image or video based on magic bytes.

    Returns "video", "image" or "unknown".
    """
    with open(cache_path, "rb") as f:
        header = f.read(32)

    # Video signatures
    if header[:4] == b'\x1a\x45\xdf\xa3':  # WebM/MKV
        return "video"
    if header[4:8] == b'ftyp':  # MP4/MOV
        return "video"
    if header[:4] == b'RIFF' and header[8:12] == b'AVI ':  # AVI
        return "video"

    # Image signatures
    if header[:8] == b'\x89PNG\r\n\x1a\n':  # PNG
        return "image"
    if header[:2] == b'\xff\xd8':  # JPEG
        return "image"
    if header[:6] in (b'GIF87a', b'GIF89a'):  # GIF
        return "image"
    if header[:4] == b'RIFF' and header[8:12] == b'WEBP':  # WebP
        return "image"

    return "unknown"


def _format_size(size: int) -> str:
    """Format a byte count as bytes / KB / MB for display."""
    if size > 1024 * 1024:
        return f"{size / (1024 * 1024):.1f} MB"
    if size > 1024:
        return f"{size / 1024:.1f} KB"
    return f"{size:,} bytes"


# ============ Run helpers ============

def _refresh_run_status(run: RunStatus) -> RunStatus:
    """Fold a finished Celery task's result into `run` and persist it.

    Does nothing unless the run is "running", has a task id, and the task is
    ready. On success the provenance fields are copied from the task result
    and the output file is copied into the cache; on failure the error is
    recorded.
    """
    if run.status != "running" or not run.celery_task_id:
        return run
    task = celery_app.AsyncResult(run.celery_task_id)
    if not task.ready():
        return run

    if task.successful():
        result = task.result if isinstance(task.result, dict) else {}
        output = result.get("output") or {}
        run.status = "completed"
        run.completed_at = datetime.now(timezone.utc).isoformat()
        run.output_hash = output.get("content_hash")

        # Extract effects info from provenance
        effects = result.get("effects") or []
        if effects:
            run.effects_commit = effects[0].get("repo_commit")
            run.effect_url = effects[0].get("repo_url")

        # Extract infrastructure info
        run.infrastructure = result.get("infrastructure")

        # Cache the output. An empty path must be rejected explicitly:
        # Path("") is the current directory, which "exists" but cannot be hashed.
        local_path = output.get("local_path")
        if local_path and Path(local_path).is_file():
            cache_file(Path(local_path))
    else:
        run.status = "failed"
        run.error = str(task.result)

    save_run(run)
    return run


# ============ HTML helpers ============

UI_CSS = """
* { box-sizing: border-box; }
body { font-family: system-ui, -apple-system, sans-serif; margin: 0; padding: 24px; background: #111; color: #eee; font-size: 16px; }
h1 { margin: 0 0 24px 0; color: #fff; font-size: 28px; }
h2 { color: #ccc; margin: 24px 0 12px 0; font-size: 20px; }
a { color: #60a5fa; text-decoration: none; }
a:hover { color: #93c5fd; text-decoration: underline; }
.runs { display: flex; flex-direction: column; gap: 16px; max-width: 900px; }
.run { background: #222; border-radius: 8px; padding: 20px; border: 1px solid #333; }
.run-link { display: block; text-decoration: none; color: inherit; max-width: 900px; }
.run-link:hover .run { border-color: #555; background: #282828; }
.run-header { display: flex; justify-content: space-between; align-items: center; margin-bottom: 14px; }
.run-id { font-family: monospace; font-size: 14px; color: #888; margin-left: 12px; }
.run-recipe { font-weight: bold; font-size: 22px; color: #fff; }
.status { padding: 6px 14px; border-radius: 12px; font-size: 14px; font-weight: 500; }
.status.completed { background: #1a4d1a; color: #4ade80; }
.status.running { background: #4d4d1a; color: #facc15; }
.status.failed { background: #4d1a1a; color: #f87171; }
.status.pending { background: #333; color: #888; }
.media-row { display: flex; gap: 20px; margin-top: 16px; flex-wrap: wrap; justify-content: flex-start; }
.media-box { flex: 0 1 auto; min-width: 200px; max-width: 400px; }
.media-box label { font-size: 13px; color: #888; display: block; margin-bottom: 6px; }
.media-container { }
.media-container img, .media-container video { max-width: 100%; max-height: 300px; border-radius: 4px; }
@media (max-width: 600px) {
  .media-row { flex-direction: column; }
  .media-box { min-width: 100%; max-width: 100%; }
}
.hash { font-family: monospace; font-size: 13px; color: #666; }
.hash a { color: #888; }
.hash a:hover { color: #60a5fa; }
.info { font-size: 15px; color: #aaa; }
.refresh-btn, .back-btn { background: #333; color: #fff; border: none; padding: 10px 20px; border-radius: 4px; cursor: pointer; margin-bottom: 20px; text-decoration: none; display: inline-block; font-size: 15px; }
.refresh-btn:hover, .back-btn:hover { background: #444; }
.no-runs { color: #666; font-style: italic; font-size: 16px; }
.provenance { background: #1a1a1a; border-radius: 8px; padding: 20px; margin-top: 20px; max-width: 700px; }
.prov-item { margin: 12px 0; }
.prov-label { color: #888; font-size: 14px; margin-bottom: 4px; }
.prov-value { font-family: monospace; font-size: 15px; word-break: break-all; }
code { background: #222; padding: 2px 6px; border-radius: 4px; }
pre { background: #1a1a1a; padding: 16px; border-radius: 8px; overflow-x: auto; }
table { border-collapse: collapse; }
th, td { text-align: left; padding: 6px 16px 6px 0; }
.user-info { float: right; color: #aaa; font-size: 14px; }
.tabs { margin-bottom: 20px; }
.tab { display: inline-block; padding: 8px 16px; margin-right: 8px; border-radius: 4px; background: #222; color: #ccc; }
.tab.active { background: #333; color: #fff; }
.error { color: #f87171; }
.auth-form label { display: block; margin: 12px 0; color: #aaa; }
.auth-form input { display: block; margin-top: 4px; padding: 8px; width: 280px; background: #222; color: #eee; border: 1px solid #333; border-radius: 4px; }
"""

_SITE_HEADER = '<h1><a href="/ui">Art DAG L1 Server</a></h1>'
_HTMX_SCRIPT = '<script src="https://unpkg.com/htmx.org@1.9.10"></script>'
# Placeholder in the login/register pages that is swapped for an error message.
_ERROR_SLOT = "<!--error-->"


def _page(title: str, body: str, *, htmx: bool = False) -> str:
    """Wrap `body` in a complete HTML document.

    `title` and `body` are inserted verbatim; callers must escape any
    untrusted text before passing it in.
    """
    script = _HTMX_SCRIPT if htmx else ""
    return (
        "<!DOCTYPE html>\n<html>\n<head>\n"
        '<meta charset="utf-8">\n'
        '<meta name="viewport" content="width=device-width, initial-scale=1">\n'
        f"<title>{title}</title>\n"
        f"<style>{UI_CSS}</style>\n"
        f"{script}\n"
        "</head>\n<body>\n"
        f"{body}\n"
        "</body>\n</html>\n"
    )


def _cache_url(content_hash: str) -> str:
    """Return the (attribute-safe) download URL for a cached item."""
    return escape(f"/cache/{quote(content_hash, safe='')}")


def _cache_ui_url(content_hash: str) -> str:
    """Return the (attribute-safe) UI view URL for a cached item."""
    return escape(f"/ui/cache/{quote(content_hash, safe='')}")


def _media_html(content_hash: str, alt: str) -> str:
    """Return a <video>/<img> tag for a cached item, or "" for unknown types."""
    media_type = detect_media_type(CACHE_DIR / content_hash)
    src = _cache_url(content_hash)
    if media_type == "video":
        return f'<video src="{src}" controls muted loop playsinline></video>'
    if media_type == "image":
        return f'<img src="{src}" alt="{escape(alt)}">'
    return ""


def _media_box_html(label: str, content_hash: str, *, link: bool) -> str:
    """Return one labelled media box (used for a run's input or output)."""
    short = escape(content_hash[:24]) + "..."
    if link:
        short = f'<a href="{_cache_ui_url(content_hash)}">{short}</a>'
    return (
        '<div class="media-box">'
        f'<label>{label}: <span class="hash">{short}</span></label>'
        f'<div class="media-container">{_media_html(content_hash, label.lower())}</div>'
        '</div>'
    )


def _media_row_html(run: RunStatus, *, link: bool) -> str:
    """Return the input/output preview row for a run, or "" if nothing is cached.

    `link` must be False when the row is rendered inside an <a> element,
    because anchors cannot be nested.
    """
    boxes = []
    input_hash = run.inputs[0] if run.inputs else None
    if input_hash and _cache_path(input_hash):
        boxes.append(_media_box_html("Input", input_hash, link=link))
    if run.status == "completed" and run.output_hash and _cache_path(run.output_hash):
        boxes.append(_media_box_html("Output", run.output_hash, link=link))
    if not boxes:
        return ""
    return '<div class="media-row">' + "".join(boxes) + '</div>'


def _run_card_html(run: RunStatus, *, poll: bool = False) -> str:
    """Return the summary card for a run.

    With `poll=True` a running run's card re-fetches itself from
    /ui/run/{run_id} every two seconds via HTMX.
    """
    status = escape(run.status)
    created = escape(run.created_at[:19].replace("T", " "))
    poll_attr = ""
    if poll and run.status == "running":
        poll_url = escape(f"/ui/run/{quote(run.run_id, safe='')}")
        poll_attr = f' hx-get="{poll_url}" hx-trigger="every 2s" hx-swap="outerHTML"'

    parts = [
        f'<div class="run"{poll_attr}>',
        '<div class="run-header">',
        f'<div><span class="run-recipe">{escape(run.recipe)}</span>'
        f'<span class="run-id">{escape(run.run_id)}</span></div>',
        f'<span class="status {status}">{status}</span>',
        '</div>',
        f'<div class="info">Created: {created}</div>',
        _media_row_html(run, link=False),
    ]
    if run.status == "failed" and run.error:
        parts.append(f'<div class="info error">Error: {escape(run.error)}</div>')
    parts.append('</div>')
    return "\n".join(part for part in parts if part)


def _prov_item(label: str, value_html: str) -> str:
    """Return one label/value row of the provenance panel (value is raw HTML)."""
    return (
        '<div class="prov-item">'
        f'<div class="prov-label">{label}</div>'
        f'<div class="prov-value">{value_html}</div>'
        '</div>'
    )


# ============ JSON API ============

@app.get("/api")
async def api_info():
    """Server info (JSON)."""
    return {
        "name": "Art DAG L1 Server",
        "version": "0.1.0",
        "cache_dir": str(CACHE_DIR),
        "runs_count": len(list_all_runs())
    }


_HOME_BODY = """<h1>Art DAG L1 Server</h1>
<p>L1 rendering server for the Art DAG system. Manages distributed rendering jobs via Celery workers.</p>

<h2>Dependencies</h2>
<ul>
<li>Redis (run storage)</li>
<li>Celery workers (rendering)</li>
<li>L2 server (authentication)</li>
</ul>

<h2>API Endpoints</h2>
<table>
<tr><th>Method</th><th>Path</th><th>Description</th></tr>
<tr><td>GET</td><td><a href="/ui">/ui</a></td><td>Web UI for viewing runs</td></tr>
<tr><td>POST</td><td>/runs</td><td>Start a rendering run</td></tr>
<tr><td>GET</td><td><a href="/runs">/runs</a></td><td>List all runs</td></tr>
<tr><td>GET</td><td>/runs/{run_id}</td><td>Get run status</td></tr>
<tr><td>GET</td><td><a href="/cache">/cache</a></td><td>List cached content hashes</td></tr>
<tr><td>GET</td><td>/cache/{hash}</td><td>Download cached content</td></tr>
<tr><td>POST</td><td>/cache/upload</td><td>Upload file to cache</td></tr>
<tr><td>GET</td><td><a href="/assets">/assets</a></td><td>List known assets</td></tr>
</table>

<h2>Start a Run</h2>
<pre>curl -X POST /runs \\
  -H "Content-Type: application/json" \\
  -d '{"recipe": "dog", "inputs": ["33268b6e..."]}'</pre>

<h2>Provenance</h2>
<p>Every render produces a provenance record linking inputs, effects, and infrastructure:</p>
<pre>{
  "output": {"content_hash": "..."},
  "inputs": [...],
  "effects": [...],
  "infrastructure": {...}
}</pre>"""

HOME_HTML = _page("Art DAG L1 Server", _HOME_BODY)


@app.get("/", response_class=HTMLResponse)
async def root():
    """Home page."""
    return HOME_HTML


@app.post("/runs", response_model=RunStatus)
async def create_run(request: RunRequest, username: str = Depends(get_required_user)):
    """Start a new rendering run. Requires authentication.

    Raises 400 unless exactly one input hash is given.
    """
    # For now, we only support single-input recipes. Validate before doing
    # any other work so a bad request has no side effects.
    if len(request.inputs) != 1:
        raise HTTPException(400, "Currently only single-input recipes supported")

    run_id = str(uuid.uuid4())

    # Generate output name if not provided
    output_name = request.output_name or f"{request.recipe}-{run_id[:8]}"

    # Format username as ActivityPub actor ID
    actor_id = f"@{username}@{L2_DOMAIN}"

    run = RunStatus(
        run_id=run_id,
        status="pending",
        recipe=request.recipe,
        inputs=request.inputs,
        output_name=output_name,
        created_at=datetime.now(timezone.utc).isoformat(),
        username=actor_id
    )

    # Submit to Celery
    task = render_effect.delay(request.inputs[0], request.recipe, output_name)
    run.celery_task_id = task.id
    run.status = "running"

    save_run(run)
    return run


@app.get("/runs/{run_id}", response_model=RunStatus)
async def get_run(run_id: str):
    """Get status of a run, refreshing it from Celery if still running."""
    run = load_run(run_id)
    if not run:
        raise HTTPException(404, f"Run {run_id} not found")
    return _refresh_run_status(run)


@app.get("/runs")
async def list_runs():
    """List all runs."""
    return list_all_runs()


@app.get("/cache/{content_hash}")
async def get_cached(content_hash: str):
    """Get cached content by hash."""
    cache_path = _cache_path(content_hash)
    if cache_path is None:
        raise HTTPException(404, f"Content {content_hash} not in cache")
    return FileResponse(cache_path)


@app.get("/ui/cache/{content_hash}", response_class=HTMLResponse)
async def ui_cache_view(content_hash: str):
    """View cached content with appropriate display."""
    back = '<a class="back-btn" href="/ui?tab=cache">&larr; Back to runs</a>'
    safe_hash = escape(content_hash)

    cache_path = _cache_path(content_hash)
    if cache_path is None:
        body = (
            f'{_SITE_HEADER}\n{back}\n'
            f'<p class="no-runs">Content not found: {safe_hash}</p>'
        )
        return HTMLResponse(_page("Not Found | Art DAG L1", body), status_code=404)

    media_type = detect_media_type(cache_path)
    size_str = _format_size(cache_path.stat().st_size)
    download_url = _cache_url(content_hash)

    media = _media_html(content_hash, content_hash)
    if not media:
        media = (
            '<p class="no-runs">Unknown file type. '
            f'<a href="{download_url}">Download file</a></p>'
        )

    body = "\n".join([
        _SITE_HEADER,
        back,
        '<div class="run">',
        '<div class="run-header">',
        f'<div><span class="run-recipe">{media_type.capitalize()}</span>'
        f'<span class="run-id">{escape(content_hash[:32])}...</span></div>',
        f'<a class="back-btn" href="{download_url}" download>Download</a>',
        '</div>',
        f'<div class="media-container">{media}</div>',
        '<div class="provenance">',
        '<h2>Details</h2>',
        _prov_item("Content Hash (SHA3-256)", safe_hash),
        _prov_item("Type", media_type),
        _prov_item("Size", size_str),
        '</div>',
        '</div>',
    ])
    return _page(f"{escape(content_hash[:16])}... | Art DAG L1", body)


@app.get("/cache")
async def list_cache():
    """List cached content hashes."""
    return [f.name for f in CACHE_DIR.iterdir() if f.is_file()]


# Known assets (bootstrap data)
KNOWN_ASSETS = {
    "cat": "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b",
}


@app.get("/assets")
async def list_assets():
    """List known assets."""
    return KNOWN_ASSETS


@app.post("/cache/import")
async def import_to_cache(path: str):
    """Import a local file to cache.

    NOTE(review): this endpoint is unauthenticated and copies any file the
    server process can read into the publicly downloadable cache. Consider
    requiring authentication or restricting `path` to an allow-listed root.
    """
    source = Path(path)
    # is_file() also rejects directories, which cannot be hashed.
    if not source.is_file():
        raise HTTPException(404, f"File not found: {path}")

    content_hash = cache_file(source)
    return {"content_hash": content_hash, "cached": True}


@app.post("/cache/upload")
async def upload_to_cache(file: UploadFile = File(...)):
    """Upload a file to cache.

    The upload is streamed to a temporary file and hashed chunk by chunk so
    large files are never held in memory.
    """
    hasher = hashlib.sha3_256()
    size = 0
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            tmp_path = Path(tmp.name)
            while True:
                chunk = await file.read(1024 * 1024)
                if not chunk:
                    break
                hasher.update(chunk)
                tmp.write(chunk)
                size += len(chunk)

        content_hash = hasher.hexdigest()
        cache_path = CACHE_DIR / content_hash
        if not cache_path.exists():
            shutil.move(str(tmp_path), cache_path)
    finally:
        # Remove the temp file if it was not moved (duplicate upload or error).
        if tmp_path is not None and tmp_path.exists():
            tmp_path.unlink()

    return {"content_hash": content_hash, "filename": file.filename, "size": size}


# ============ Web UI ============

def render_ui_html(username: Optional[str] = None, tab: str = "runs") -> str:
    """Render main UI HTML with optional user context.

    `tab` selects which HTMX partial is loaded: "runs" loads /ui/runs, any
    other value loads /ui/cache-list.
    """
    if username:
        user_info = (
            f'<div class="user-info">Logged in as <strong>{escape(username)}</strong> '
            '<a href="/ui/logout">Logout</a></div>'
        )
    else:
        user_info = '<div class="user-info"><a href="/ui/login">Login</a></div>'

    runs_active = "active" if tab == "runs" else ""
    cache_active = "active" if tab == "cache" else ""

    runs_content = ""
    cache_content = ""
    if tab == "runs":
        runs_content = (
            '<div id="content" hx-get="/ui/runs" hx-trigger="load" hx-swap="innerHTML">'
            'Loading...</div>'
        )
    else:
        cache_content = (
            '<div id="content" hx-get="/ui/cache-list" hx-trigger="load" hx-swap="innerHTML">'
            'Loading...</div>'
        )

    body = "\n".join([
        user_info,
        _SITE_HEADER,
        '<div class="tabs">'
        f'<a class="tab {runs_active}" href="/ui?tab=runs">Runs</a>'
        f'<a class="tab {cache_active}" href="/ui?tab=cache">Cache</a>'
        '</div>',
        runs_content,
        cache_content,
    ])
    return _page("Art DAG L1 Server", body, htmx=True)


def _auth_page_html(title: str, action: str, submit: str, *, with_email: bool) -> str:
    """Build the login or registration page, including the error placeholder."""
    login_active = "active" if action == "/ui/login" else ""
    register_active = "active" if action == "/ui/register" else ""
    email_field = (
        '<label>Email (optional)<input type="email" name="email"></label>\n'
        if with_email else ""
    )
    body = (
        f'{_SITE_HEADER}\n'
        '<a class="back-btn" href="/ui">&larr; Back</a>\n'
        '<div class="tabs">'
        f'<a class="tab {login_active}" href="/ui/login">Login</a>'
        f'<a class="tab {register_active}" href="/ui/register">Register</a>'
        '</div>\n'
        f'{_ERROR_SLOT}\n'
        f'<form class="auth-form" method="post" action="{action}">\n'
        '<label>Username<input type="text" name="username" required></label>\n'
        f'{email_field}'
        '<label>Password<input type="password" name="password" required></label>\n'
        f'<button class="refresh-btn" type="submit">{submit}</button>\n'
        '</form>'
    )
    return _page(f"{title} | Art DAG L1 Server", body)


def _with_error(page: str, message: str) -> HTMLResponse:
    """Return `page` with an escaped error message in its error placeholder."""
    return HTMLResponse(
        page.replace(_ERROR_SLOT, f'<p class="error">{escape(str(message))}</p>')
    )


def _login_redirect(token: str) -> RedirectResponse:
    """Redirect to /ui with the auth cookie set."""
    response = RedirectResponse(url="/ui", status_code=303)
    response.set_cookie(
        "auth_token", token, httponly=True, max_age=AUTH_COOKIE_MAX_AGE
    )
    return response


UI_LOGIN_HTML = _auth_page_html("Login", "/ui/login", "Login", with_email=False)

UI_REGISTER_HTML = _auth_page_html(
    "Register", "/ui/register", "Register", with_email=True
)


@app.get("/ui", response_class=HTMLResponse)
async def ui_index(request: Request, tab: str = "runs"):
    """Web UI for viewing runs and cache."""
    username = get_user_from_cookie(request)
    return render_ui_html(username, tab)


@app.get("/ui/login", response_class=HTMLResponse)
async def ui_login_page():
    """Login page."""
    return UI_LOGIN_HTML


@app.post("/ui/login")
async def ui_login(username: str = Form(...), password: str = Form(...)):
    """Process login form; redirect to /ui on success, re-show the form otherwise."""
    try:
        resp = http_requests.post(
            f"{L2_SERVER}/auth/login",
            json={"username": username, "password": password},
            timeout=5
        )
        if resp.status_code == 200:
            token = resp.json().get("access_token")
            # Never store a missing token in the cookie.
            if token:
                return _login_redirect(token)
    except (http_requests.RequestException, ValueError, AttributeError):
        pass

    return _with_error(UI_LOGIN_HTML, "Invalid username or password")


@app.get("/ui/register", response_class=HTMLResponse)
async def ui_register_page():
    """Register page."""
    return UI_REGISTER_HTML


@app.post("/ui/register")
async def ui_register(
    username: str = Form(...),
    password: str = Form(...),
    email: str = Form(None)
):
    """Process registration form; redirect to /ui on success."""
    try:
        resp = http_requests.post(
            f"{L2_SERVER}/auth/register",
            json={"username": username, "password": password, "email": email},
            timeout=5
        )
        if resp.status_code == 200:
            token = resp.json().get("access_token")
            if token:
                return _login_redirect(token)
        elif resp.status_code == 400:
            error = resp.json().get("detail", "Registration failed")
            return _with_error(UI_REGISTER_HTML, error)
    except (http_requests.RequestException, ValueError, AttributeError) as e:
        return _with_error(UI_REGISTER_HTML, f"Registration failed: {e}")

    # Any other status (or a 200 without a token) is still a failure.
    return _with_error(UI_REGISTER_HTML, "Registration failed")


@app.get("/ui/logout")
async def ui_logout():
    """Logout - clear cookie."""
    response = RedirectResponse(url="/ui", status_code=303)
    response.delete_cookie("auth_token")
    return response


@app.get("/ui/runs", response_class=HTMLResponse)
async def ui_runs(request: Request):
    """HTMX partial: list of the current user's 20 most recent runs."""
    current_user = get_user_from_cookie(request)

    # Require login to see runs
    if not current_user:
        return '<p class="no-runs"><a href="/ui/login">Login</a> to see your runs.</p>'

    # Filter runs by user - match both plain username and ActivityPub format (@user@domain)
    actor_id = f"@{current_user}@{L2_DOMAIN}"
    runs = [r for r in list_all_runs() if r.username in (current_user, actor_id)]

    if not runs:
        return '<p class="no-runs">You have no runs yet. Use the CLI to start a run.</p>'

    html_parts = ['<div class="runs">']
    for run in runs[:20]:  # Limit to 20 most recent
        detail_url = escape(f"/ui/detail/{quote(run.run_id, safe='')}")
        html_parts.append(f'<a class="run-link" href="{detail_url}">')
        html_parts.append(_run_card_html(run))
        html_parts.append('</a>')
    html_parts.append('</div>')
    return '\n'.join(html_parts)


@app.get("/ui/cache-list", response_class=HTMLResponse)
async def ui_cache_list(request: Request):
    """HTMX partial: list of the 50 most recently modified cached items."""
    current_user = get_user_from_cookie(request)

    # Require login to see cache
    if not current_user:
        return '<p class="no-runs"><a href="/ui/login">Login</a> to see cached content.</p>'

    # Get all cached files
    cache_items = []
    if CACHE_DIR.exists():
        for f in CACHE_DIR.iterdir():
            if f.is_file() and not f.name.endswith('.provenance.json'):
                stat = f.stat()
                cache_items.append({
                    "hash": f.name,
                    "size": stat.st_size,
                    "mtime": stat.st_mtime
                })

    # Sort by modification time (newest first)
    cache_items.sort(key=lambda x: x["mtime"], reverse=True)

    if not cache_items:
        return '<p class="no-runs">Cache is empty.</p>'

    html_parts = ['<div class="runs">']
    for item in cache_items[:50]:  # Limit to 50 items
        content_hash = item["hash"]
        media_type = detect_media_type(CACHE_DIR / content_hash)
        media = _media_html(content_hash, content_hash[:16])
        if not media:
            media = '<p class="no-runs">Unknown file type</p>'

        html_parts.append(
            f'<a class="run-link" href="{_cache_ui_url(content_hash)}">'
            '<div class="run">'
            '<div class="run-header">'
            f'<div><span class="run-recipe">{media_type}</span>'
            f'<span class="run-id">{escape(content_hash)}</span></div>'
            f'<span class="info">{_format_size(item["size"])}</span>'
            '</div>'
            f'<div class="media-container">{media}</div>'
            '</div>'
            '</a>'
        )
    html_parts.append('</div>')
    return '\n'.join(html_parts)


@app.get("/ui/detail/{run_id}", response_class=HTMLResponse)
async def ui_detail_page(run_id: str):
    """Full detail page for a run."""
    run = load_run(run_id)
    if not run:
        return HTMLResponse('<p class="no-runs">Run not found</p>', status_code=404)

    _refresh_run_status(run)

    # Use stored effect URL (only if it is an http(s) link) or build fallback
    recipe_path = quote(run.recipe, safe="")
    if run.effect_url and run.effect_url.startswith(("http://", "https://")):
        effect_url = run.effect_url
    elif run.effects_commit and run.effects_commit != "unknown":
        commit = quote(run.effects_commit, safe="")
        effect_url = f"https://git.rose-ash.com/art-dag/effects/src/commit/{commit}/{recipe_path}"
    else:
        effect_url = f"https://git.rose-ash.com/art-dag/effects/src/branch/main/{recipe_path}"

    status = escape(run.status)
    parts = [
        _SITE_HEADER,
        '<a class="back-btn" href="/ui">&larr; Back to runs</a>',
        '<div class="run">',
        '<div class="run-header">',
        f'<div><span class="run-recipe">{escape(run.recipe)}</span>'
        f'<span class="run-id">{escape(run.run_id)}</span></div>',
        f'<span class="status {status}">{status}</span>',
        '</div>',
        _media_row_html(run, link=True),
    ]

    # Provenance section
    inputs_html = "<br>".join(
        f'<a href="{_cache_ui_url(inp)}">{escape(inp)}</a>' for inp in run.inputs
    )
    parts.extend([
        '<div class="provenance">',
        '<h2>Provenance</h2>',
        _prov_item("Owner", escape(run.username or "anonymous")),
        _prov_item(
            "Effect",
            f'<a href="{escape(effect_url)}" target="_blank" rel="noopener">'
            f'{escape(run.recipe)}</a>',
        ),
        _prov_item("Effects Commit", escape(run.effects_commit or "N/A")),
        _prov_item("Input(s)", inputs_html),
    ])

    if run.output_hash:
        parts.append(_prov_item(
            "Output",
            f'<a href="{_cache_ui_url(run.output_hash)}">{escape(run.output_hash)}</a>',
        ))

    # Infrastructure section
    if run.infrastructure:
        software = run.infrastructure.get("software") or {}
        hardware = run.infrastructure.get("hardware") or {}
        sw_name = escape(str(software.get("name") or "unknown"))
        sw_hash = escape(str(software.get("content_hash") or "unknown")[:16])
        hw_name = escape(str(hardware.get("name") or "unknown"))
        hw_hash = escape(str(hardware.get("content_hash") or "unknown")[:16])
        parts.append(_prov_item(
            "Infrastructure",
            f'Software: {sw_name} ({sw_hash}...)<br>'
            f'Hardware: {hw_name} ({hw_hash}...)',
        ))

    parts.append(_prov_item("Run ID", escape(run.run_id)))
    parts.append(_prov_item("Created", escape(run.created_at)))
    if run.completed_at:
        parts.append(_prov_item("Completed", escape(run.completed_at)))
    if run.error:
        parts.append(_prov_item("Error", f'<span class="error">{escape(run.error)}</span>'))

    # Raw JSON provenance
    provenance_json = json.dumps({
        "run_id": run.run_id,
        "status": run.status,
        "recipe": run.recipe,
        "effects_commit": run.effects_commit,
        "effect_url": run.effect_url or effect_url,
        "inputs": run.inputs,
        "output_hash": run.output_hash,
        "output_name": run.output_name,
        "created_at": run.created_at,
        "completed_at": run.completed_at,
        "username": run.username,
        "infrastructure": run.infrastructure,
        "error": run.error
    }, indent=2)
    parts.extend([
        '<h2>Raw JSON</h2>',
        f'<pre>{escape(provenance_json)}</pre>',
        '</div>',
        '</div>',
    ])

    title = f"{escape(run.recipe)} - {escape(run.run_id[:8])} | Art DAG L1"
    return _page(title, "\n".join(part for part in parts if part))


@app.get("/ui/run/{run_id}", response_class=HTMLResponse)
async def ui_run_partial(run_id: str):
    """HTMX partial: single run (for polling updates)."""
    run = load_run(run_id)
    if not run:
        return '<div class="run">Run not found</div>'

    _refresh_run_status(run)
    return _run_card_html(run, poll=True)


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8100)