Files
celery/server.py
gilesb 0cbfd87711 feat: per-user cache visibility and login protection
- Run detail page requires login and ownership check
- Cache uploads require login and track uploader metadata
- Cache list filtered to show only user's own files (uploaded or from runs)
- Cache detail view requires login and ownership check
- Add helper functions for cache metadata and user hash lookup

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-07 18:18:16 +00:00

1377 lines
47 KiB
Python

#!/usr/bin/env python3
"""
Art DAG L1 Server
Manages rendering runs and provides access to the cache.
- POST /runs - start a run (recipe + inputs)
- GET /runs/{run_id} - get run status/result
- GET /cache/{content_hash} - get cached content
"""
import hashlib
import json
import os
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from fastapi import FastAPI, HTTPException, UploadFile, File, Depends, Form, Request
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
import redis
import requests as http_requests
from urllib.parse import urlparse
from celery_app import app as celery_app
from tasks import render_effect
# L2 server for auth verification
L2_SERVER = os.environ.get("L2_SERVER", "http://localhost:8200")
L2_DOMAIN = os.environ.get("L2_DOMAIN", "artdag.rose-ash.com")
# Cache directory (use /data/cache in Docker, ~/.artdag/cache locally)
CACHE_DIR = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache")))
CACHE_DIR.mkdir(parents=True, exist_ok=True)
# Redis for persistent run storage
REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/5')
parsed = urlparse(REDIS_URL)
redis_client = redis.Redis(
host=parsed.hostname or 'localhost',
port=parsed.port or 6379,
db=int(parsed.path.lstrip('/') or 0)
)
RUNS_KEY_PREFIX = "artdag:run:"
def save_run(run: "RunStatus"):
"""Save run to Redis."""
redis_client.set(f"{RUNS_KEY_PREFIX}{run.run_id}", run.model_dump_json())
def load_run(run_id: str) -> Optional["RunStatus"]:
"""Load run from Redis."""
data = redis_client.get(f"{RUNS_KEY_PREFIX}{run_id}")
if data:
return RunStatus.model_validate_json(data)
return None
def list_all_runs() -> list["RunStatus"]:
"""List all runs from Redis."""
runs = []
for key in redis_client.scan_iter(f"{RUNS_KEY_PREFIX}*"):
data = redis_client.get(key)
if data:
runs.append(RunStatus.model_validate_json(data))
return sorted(runs, key=lambda r: r.created_at, reverse=True)
app = FastAPI(
title="Art DAG L1 Server",
description="Distributed rendering server for Art DAG",
version="0.1.0"
)
class RunRequest(BaseModel):
"""Request to start a run."""
recipe: str # Recipe name (e.g., "dog", "identity")
inputs: list[str] # List of content hashes
output_name: Optional[str] = None
class RunStatus(BaseModel):
"""Status of a run."""
run_id: str
status: str # pending, running, completed, failed
recipe: str
inputs: list[str]
output_name: str
created_at: str
completed_at: Optional[str] = None
output_hash: Optional[str] = None
error: Optional[str] = None
celery_task_id: Optional[str] = None
effects_commit: Optional[str] = None
effect_url: Optional[str] = None # URL to effect source code
username: Optional[str] = None # Owner of the run (ActivityPub actor ID)
infrastructure: Optional[dict] = None # Hardware/software used for rendering
# ============ Auth ============
security = HTTPBearer(auto_error=False)
def verify_token_with_l2(token: str) -> Optional[str]:
"""Verify token with L2 server, return username if valid."""
try:
resp = http_requests.post(
f"{L2_SERVER}/auth/verify",
headers={"Authorization": f"Bearer {token}"},
timeout=5
)
if resp.status_code == 200:
return resp.json().get("username")
except Exception:
pass
return None
async def get_optional_user(
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> Optional[str]:
"""Get username if authenticated, None otherwise."""
if not credentials:
return None
return verify_token_with_l2(credentials.credentials)
async def get_required_user(
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> str:
"""Get username, raise 401 if not authenticated."""
if not credentials:
raise HTTPException(401, "Not authenticated")
username = verify_token_with_l2(credentials.credentials)
if not username:
raise HTTPException(401, "Invalid token")
return username
def file_hash(path: Path) -> str:
"""Compute SHA3-256 hash of a file."""
hasher = hashlib.sha3_256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(65536), b""):
hasher.update(chunk)
return hasher.hexdigest()
def cache_file(source: Path) -> str:
"""Copy file to cache, return content hash."""
content_hash = file_hash(source)
cache_path = CACHE_DIR / content_hash
if not cache_path.exists():
import shutil
shutil.copy2(source, cache_path)
return content_hash
@app.get("/api")
async def api_info():
"""Server info (JSON)."""
return {
"name": "Art DAG L1 Server",
"version": "0.1.0",
"cache_dir": str(CACHE_DIR),
"runs_count": len(list_all_runs())
}
HOME_HTML = """
<!DOCTYPE html>
<html>
<head>
<title>Art DAG L1 Server</title>
<style>
* { box-sizing: border-box; }
body {
font-family: system-ui, -apple-system, sans-serif;
margin: 0; padding: 40px;
background: #111; color: #eee;
line-height: 1.6;
}
.container { max-width: 800px; margin: 0 auto; }
h1 { color: #fff; border-bottom: 1px solid #333; padding-bottom: 10px; }
h2 { color: #ccc; margin-top: 30px; }
a { color: #60a5fa; }
a:hover { color: #93c5fd; }
code {
background: #222; padding: 2px 6px; border-radius: 4px;
font-family: 'SF Mono', Monaco, monospace;
}
pre {
background: #1a1a1a; padding: 16px; border-radius: 8px;
overflow-x: auto; border: 1px solid #333;
}
pre code { background: none; padding: 0; }
table { border-collapse: collapse; width: 100%; margin: 16px 0; }
th, td { border: 1px solid #333; padding: 8px 12px; text-align: left; }
th { background: #222; }
.nav {
background: #1a1a1a; padding: 16px; border-radius: 8px;
margin-bottom: 30px; display: flex; gap: 20px;
}
.nav a {
font-weight: 500; text-decoration: none;
padding: 8px 16px; background: #333; border-radius: 4px;
}
.nav a:hover { background: #444; }
</style>
</head>
<body>
<div class="container">
<div class="nav">
<a href="/ui">Runs UI</a>
<a href="/docs">API Docs</a>
<a href="/api">API Info</a>
</div>
<h1>Art DAG L1 Server</h1>
<p>L1 rendering server for the Art DAG system. Manages distributed rendering jobs via Celery workers.</p>
<h2>Dependencies</h2>
<ul>
<li><strong>artdag</strong> (GitHub): Core DAG execution engine</li>
<li><strong>artdag-effects</strong> (rose-ash): Effect implementations</li>
<li><strong>Redis</strong>: Message broker, result backend, and run persistence</li>
</ul>
<h2>API Endpoints</h2>
<table>
<tr><th>Method</th><th>Path</th><th>Description</th></tr>
<tr><td>GET</td><td><code>/ui</code></td><td>Web UI for viewing runs</td></tr>
<tr><td>POST</td><td><code>/runs</code></td><td>Start a rendering run</td></tr>
<tr><td>GET</td><td><code>/runs</code></td><td>List all runs</td></tr>
<tr><td>GET</td><td><code>/runs/{run_id}</code></td><td>Get run status</td></tr>
<tr><td>GET</td><td><code>/cache</code></td><td>List cached content hashes</td></tr>
<tr><td>GET</td><td><code>/cache/{hash}</code></td><td>Download cached content</td></tr>
<tr><td>POST</td><td><code>/cache/upload</code></td><td>Upload file to cache</td></tr>
<tr><td>GET</td><td><code>/assets</code></td><td>List known assets</td></tr>
</table>
<h2>Start a Run</h2>
<pre><code>curl -X POST /runs \\
-H "Content-Type: application/json" \\
-d '{"recipe": "dog", "inputs": ["33268b6e..."]}'</code></pre>
<h2>Provenance</h2>
<p>Every render produces a provenance record linking inputs, effects, and infrastructure:</p>
<pre><code>{
"output": {"content_hash": "..."},
"inputs": [...],
"effects": [...],
"infrastructure": {...}
}</code></pre>
</div>
</body>
</html>
"""
@app.get("/", response_class=HTMLResponse)
async def root():
"""Home page."""
return HOME_HTML
@app.post("/runs", response_model=RunStatus)
async def create_run(request: RunRequest, username: str = Depends(get_required_user)):
"""Start a new rendering run. Requires authentication."""
run_id = str(uuid.uuid4())
# Generate output name if not provided
output_name = request.output_name or f"{request.recipe}-{run_id[:8]}"
# Format username as ActivityPub actor ID
actor_id = f"@{username}@{L2_DOMAIN}"
# Create run record
run = RunStatus(
run_id=run_id,
status="pending",
recipe=request.recipe,
inputs=request.inputs,
output_name=output_name,
created_at=datetime.now(timezone.utc).isoformat(),
username=actor_id
)
# Submit to Celery
# For now, we only support single-input recipes
if len(request.inputs) != 1:
raise HTTPException(400, "Currently only single-input recipes supported")
input_hash = request.inputs[0]
task = render_effect.delay(input_hash, request.recipe, output_name)
run.celery_task_id = task.id
run.status = "running"
save_run(run)
return run
@app.get("/runs/{run_id}", response_model=RunStatus)
async def get_run(run_id: str):
"""Get status of a run."""
run = load_run(run_id)
if not run:
raise HTTPException(404, f"Run {run_id} not found")
# Check Celery task status if running
if run.status == "running" and run.celery_task_id:
task = celery_app.AsyncResult(run.celery_task_id)
if task.ready():
if task.successful():
result = task.result
run.status = "completed"
run.completed_at = datetime.now(timezone.utc).isoformat()
run.output_hash = result.get("output", {}).get("content_hash")
# Extract effects info from provenance
effects = result.get("effects", [])
if effects:
run.effects_commit = effects[0].get("repo_commit")
run.effect_url = effects[0].get("repo_url")
# Extract infrastructure info
run.infrastructure = result.get("infrastructure")
# Cache the output
output_path = Path(result.get("output", {}).get("local_path", ""))
if output_path.exists():
cache_file(output_path)
else:
run.status = "failed"
run.error = str(task.result)
# Save updated status
save_run(run)
return run
@app.get("/runs")
async def list_runs():
"""List all runs."""
return list_all_runs()
@app.get("/cache/{content_hash}")
async def get_cached(content_hash: str):
"""Get cached content by hash."""
cache_path = CACHE_DIR / content_hash
if not cache_path.exists():
raise HTTPException(404, f"Content {content_hash} not in cache")
return FileResponse(cache_path)
@app.get("/ui/cache/{content_hash}", response_class=HTMLResponse)
async def ui_cache_view(content_hash: str, request: Request):
"""View cached content with appropriate display."""
current_user = get_user_from_cookie(request)
if not current_user:
return HTMLResponse(f'''
<!DOCTYPE html>
<html>
<head><title>Login Required | Art DAG L1</title><style>{UI_CSS}</style></head>
<body>
<h1><a href="/ui" style="color: inherit;">Art DAG L1 Server</a></h1>
<div class="run"><p><a href="/ui/login">Login</a> to view cached content.</p></div>
</body>
</html>
''', status_code=401)
# Check user has access to this file
user_hashes = get_user_cache_hashes(current_user)
if content_hash not in user_hashes:
return HTMLResponse('<h1>Access denied</h1>', status_code=403)
cache_path = CACHE_DIR / content_hash
if not cache_path.exists():
return HTMLResponse(f"""
<!DOCTYPE html>
<html>
<head><title>Not Found | Art DAG L1</title><style>{UI_CSS}</style></head>
<body>
<h1><a href="/ui" style="color: inherit;">Art DAG L1 Server</a></h1>
<a href="/ui" class="back-btn">&larr; Back to runs</a>
<div class="run"><p>Content not found: {content_hash}</p></div>
</body>
</html>
""", status_code=404)
media_type = detect_media_type(cache_path)
file_size = cache_path.stat().st_size
size_str = f"{file_size:,} bytes"
if file_size > 1024*1024:
size_str = f"{file_size/(1024*1024):.1f} MB"
elif file_size > 1024:
size_str = f"{file_size/1024:.1f} KB"
html = f"""
<!DOCTYPE html>
<html>
<head>
<title>{content_hash[:16]}... | Art DAG L1</title>
<style>{UI_CSS}</style>
</head>
<body>
<h1><a href="/ui" style="color: inherit;">Art DAG L1 Server</a></h1>
<a href="/ui" class="back-btn">&larr; Back to runs</a>
<div class="run">
<div class="run-header">
<div>
<span class="run-recipe">{media_type.capitalize()}</span>
<span class="run-id">{content_hash[:32]}...</span>
</div>
<a href="/cache/{content_hash}" download class="status completed" style="text-decoration:none;">Download</a>
</div>
<div class="media-container" style="margin: 20px 0;">
"""
if media_type == "video":
html += f'<video src="/cache/{content_hash}" controls autoplay muted loop style="max-width:100%;max-height:500px;"></video>'
elif media_type == "image":
html += f'<img src="/cache/{content_hash}" alt="{content_hash}" style="max-width:100%;max-height:500px;">'
else:
html += f'<p>Unknown file type. <a href="/cache/{content_hash}" download>Download file</a></p>'
html += f"""
</div>
<div class="provenance">
<h2>Details</h2>
<div class="prov-item">
<div class="prov-label">Content Hash (SHA3-256)</div>
<div class="prov-value">{content_hash}</div>
</div>
<div class="prov-item">
<div class="prov-label">Type</div>
<div class="prov-value">{media_type}</div>
</div>
<div class="prov-item">
<div class="prov-label">Size</div>
<div class="prov-value">{size_str}</div>
</div>
<div class="prov-item">
<div class="prov-label">Raw URL</div>
<div class="prov-value"><a href="/cache/{content_hash}">/cache/{content_hash}</a></div>
</div>
</div>
</div>
</body>
</html>
"""
return html
@app.get("/cache")
async def list_cache():
"""List cached content hashes."""
return [f.name for f in CACHE_DIR.iterdir() if f.is_file()]
# Known assets (bootstrap data)
KNOWN_ASSETS = {
"cat": "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b",
}
@app.get("/assets")
async def list_assets():
"""List known assets."""
return KNOWN_ASSETS
@app.post("/cache/import")
async def import_to_cache(path: str):
"""Import a local file to cache."""
source = Path(path)
if not source.exists():
raise HTTPException(404, f"File not found: {path}")
content_hash = cache_file(source)
return {"content_hash": content_hash, "cached": True}
def save_cache_meta(content_hash: str, uploader: str, filename: str = None):
"""Save metadata for a cached file."""
meta_path = CACHE_DIR / f"{content_hash}.meta.json"
meta = {
"uploader": uploader,
"uploaded_at": datetime.now(timezone.utc).isoformat(),
"filename": filename
}
# Don't overwrite existing metadata (preserve original uploader)
if not meta_path.exists():
with open(meta_path, "w") as f:
json.dump(meta, f)
def load_cache_meta(content_hash: str) -> dict:
"""Load metadata for a cached file."""
meta_path = CACHE_DIR / f"{content_hash}.meta.json"
if meta_path.exists():
with open(meta_path) as f:
return json.load(f)
return {}
def get_user_cache_hashes(username: str) -> set:
"""Get all cache hashes owned by or associated with a user."""
actor_id = f"@{username}@{L2_DOMAIN}"
hashes = set()
# Files uploaded by user
if CACHE_DIR.exists():
for f in CACHE_DIR.iterdir():
if f.name.endswith('.meta.json'):
meta = load_cache_meta(f.name.replace('.meta.json', ''))
if meta.get("uploader") in (username, actor_id):
hashes.add(f.name.replace('.meta.json', ''))
# Files from user's runs (inputs and outputs)
for run in list_all_runs():
if run.username in (username, actor_id):
hashes.update(run.inputs)
if run.output_hash:
hashes.add(run.output_hash)
return hashes
@app.post("/cache/upload")
async def upload_to_cache(file: UploadFile = File(...), username: str = Depends(get_required_user)):
"""Upload a file to cache. Requires authentication."""
# Write to temp file first
import tempfile
with tempfile.NamedTemporaryFile(delete=False) as tmp:
content = await file.read()
tmp.write(content)
tmp_path = Path(tmp.name)
# Hash and move to cache
content_hash = file_hash(tmp_path)
cache_path = CACHE_DIR / content_hash
if not cache_path.exists():
import shutil
shutil.move(str(tmp_path), cache_path)
else:
tmp_path.unlink()
# Save uploader metadata
actor_id = f"@{username}@{L2_DOMAIN}"
save_cache_meta(content_hash, actor_id, file.filename)
return {"content_hash": content_hash, "filename": file.filename, "size": len(content)}
def detect_media_type(cache_path: Path) -> str:
"""Detect if file is image or video based on magic bytes."""
with open(cache_path, "rb") as f:
header = f.read(32)
# Video signatures
if header[:4] == b'\x1a\x45\xdf\xa3': # WebM/MKV
return "video"
if header[4:8] == b'ftyp': # MP4/MOV
return "video"
if header[:4] == b'RIFF' and header[8:12] == b'AVI ': # AVI
return "video"
# Image signatures
if header[:8] == b'\x89PNG\r\n\x1a\n': # PNG
return "image"
if header[:2] == b'\xff\xd8': # JPEG
return "image"
if header[:6] in (b'GIF87a', b'GIF89a'): # GIF
return "image"
if header[:4] == b'RIFF' and header[8:12] == b'WEBP': # WebP
return "image"
return "unknown"
def get_user_from_cookie(request) -> Optional[str]:
"""Get username from auth cookie."""
token = request.cookies.get("auth_token")
if not token:
return None
return verify_token_with_l2(token)
UI_CSS = """
* { box-sizing: border-box; }
body {
font-family: system-ui, -apple-system, sans-serif;
margin: 0; padding: 24px;
background: #111; color: #eee;
font-size: 16px;
}
h1 { margin: 0 0 24px 0; color: #fff; font-size: 28px; }
h2 { color: #ccc; margin: 24px 0 12px 0; font-size: 20px; }
a { color: #60a5fa; text-decoration: none; }
a:hover { color: #93c5fd; text-decoration: underline; }
.runs { display: flex; flex-direction: column; gap: 16px; max-width: 900px; }
.run {
background: #222; border-radius: 8px; padding: 20px;
border: 1px solid #333;
}
.run-link { display: block; text-decoration: none; color: inherit; max-width: 900px; }
.run-link:hover .run { border-color: #555; background: #282828; }
.run-header { display: flex; justify-content: space-between; align-items: center; margin-bottom: 14px; }
.run-id { font-family: monospace; font-size: 14px; color: #888; margin-left: 12px; }
.run-recipe { font-weight: bold; font-size: 22px; color: #fff; }
.status {
padding: 6px 14px; border-radius: 12px; font-size: 14px; font-weight: 500;
}
.status.completed { background: #1a4d1a; color: #4ade80; }
.status.running { background: #4d4d1a; color: #facc15; }
.status.failed { background: #4d1a1a; color: #f87171; }
.status.pending { background: #333; color: #888; }
.media-row { display: flex; gap: 20px; margin-top: 16px; flex-wrap: wrap; justify-content: flex-start; }
.media-box { flex: 0 1 auto; min-width: 200px; max-width: 400px; }
.media-box label { font-size: 13px; color: #888; display: block; margin-bottom: 6px; }
.media-container { }
.media-container img, .media-container video {
max-width: 100%; max-height: 300px; border-radius: 4px;
}
@media (max-width: 600px) {
.media-row { flex-direction: column; }
.media-box { min-width: 100%; max-width: 100%; }
}
.hash { font-family: monospace; font-size: 13px; color: #666; }
.hash a { color: #888; }
.hash a:hover { color: #60a5fa; }
.info { font-size: 15px; color: #aaa; }
.refresh-btn, .back-btn {
background: #333; color: #fff; border: none; padding: 10px 20px;
border-radius: 4px; cursor: pointer; margin-bottom: 20px;
text-decoration: none; display: inline-block; font-size: 15px;
}
.refresh-btn:hover, .back-btn:hover { background: #444; }
.no-runs { color: #666; font-style: italic; font-size: 16px; }
.provenance { background: #1a1a1a; border-radius: 8px; padding: 20px; margin-top: 20px; max-width: 700px; }
.prov-item { margin: 12px 0; }
.prov-label { color: #888; font-size: 14px; margin-bottom: 4px; }
.prov-value { font-family: monospace; font-size: 15px; word-break: break-all; }
code { background: #222; padding: 2px 6px; border-radius: 4px; }
"""
def render_ui_html(username: Optional[str] = None, tab: str = "runs") -> str:
"""Render main UI HTML with optional user context."""
user_info = ""
if username:
user_info = f'''
<div style="float:right;font-size:14px;color:#888;">
Logged in as <strong>{username}</strong>
<a href="/ui/logout" style="margin-left:12px;">Logout</a>
</div>
'''
else:
user_info = '''
<div style="float:right;font-size:14px;">
<a href="/ui/login">Login</a>
</div>
'''
runs_active = "active" if tab == "runs" else ""
cache_active = "active" if tab == "cache" else ""
runs_content = ""
cache_content = ""
if tab == "runs":
runs_content = '''
<div id="content" hx-get="/ui/runs" hx-trigger="load" hx-swap="innerHTML">
Loading...
</div>
'''
else:
cache_content = '''
<div id="content" hx-get="/ui/cache-list" hx-trigger="load" hx-swap="innerHTML">
Loading...
</div>
'''
return f"""
<!DOCTYPE html>
<html>
<head>
<title>Art DAG L1 Server</title>
<script src="https://unpkg.com/htmx.org@1.9.10"></script>
<style>{UI_CSS}
.nav-tabs {{ margin-bottom: 20px; }}
.nav-tabs a {{
padding: 10px 20px;
margin-right: 8px;
background: #333;
border-radius: 4px 4px 0 0;
color: #888;
}}
.nav-tabs a.active {{ background: #1a1a1a; color: #fff; }}
.nav-tabs a:hover {{ color: #fff; }}
</style>
</head>
<body>
{user_info}
<h1><a href="/ui" style="color: inherit;">Art DAG L1 Server</a></h1>
<div class="nav-tabs">
<a href="/ui" class="{runs_active}">Runs</a>
<a href="/ui?tab=cache" class="{cache_active}">Cache</a>
</div>
{runs_content}
{cache_content}
</body>
</html>
"""
UI_LOGIN_HTML = """
<!DOCTYPE html>
<html>
<head>
<title>Login | Art DAG L1 Server</title>
<style>""" + UI_CSS + """
.login-form { max-width: 400px; }
.login-form input {
width: 100%; padding: 12px; margin: 8px 0;
background: #222; border: 1px solid #333; border-radius: 4px;
color: #eee; font-size: 15px;
}
.login-form button {
width: 100%; padding: 12px; margin-top: 12px;
background: #2563eb; color: #fff; border: none; border-radius: 4px;
font-size: 15px; cursor: pointer;
}
.login-form button:hover { background: #1d4ed8; }
.error { color: #f87171; margin-top: 12px; }
.tabs { margin-bottom: 20px; }
.tabs a { padding: 8px 16px; margin-right: 8px; background: #333; border-radius: 4px; }
.tabs a.active { background: #2563eb; }
</style>
</head>
<body>
<h1><a href="/ui" style="color: inherit;">Art DAG L1 Server</a></h1>
<a href="/ui" class="back-btn">&larr; Back</a>
<div class="run login-form">
<div class="tabs">
<a href="/ui/login" class="active">Login</a>
<a href="/ui/register">Register</a>
</div>
<form method="POST" action="/ui/login">
<input type="text" name="username" placeholder="Username" required>
<input type="password" name="password" placeholder="Password" required>
<button type="submit">Login</button>
</form>
</div>
</body>
</html>
"""
UI_REGISTER_HTML = """
<!DOCTYPE html>
<html>
<head>
<title>Register | Art DAG L1 Server</title>
<style>""" + UI_CSS + """
.login-form { max-width: 400px; }
.login-form input {
width: 100%; padding: 12px; margin: 8px 0;
background: #222; border: 1px solid #333; border-radius: 4px;
color: #eee; font-size: 15px;
}
.login-form button {
width: 100%; padding: 12px; margin-top: 12px;
background: #2563eb; color: #fff; border: none; border-radius: 4px;
font-size: 15px; cursor: pointer;
}
.login-form button:hover { background: #1d4ed8; }
.error { color: #f87171; margin-top: 12px; }
.tabs { margin-bottom: 20px; }
.tabs a { padding: 8px 16px; margin-right: 8px; background: #333; border-radius: 4px; }
.tabs a.active { background: #2563eb; }
</style>
</head>
<body>
<h1><a href="/ui" style="color: inherit;">Art DAG L1 Server</a></h1>
<a href="/ui" class="back-btn">&larr; Back</a>
<div class="run login-form">
<div class="tabs">
<a href="/ui/login">Login</a>
<a href="/ui/register" class="active">Register</a>
</div>
<form method="POST" action="/ui/register">
<input type="text" name="username" placeholder="Username" required>
<input type="password" name="password" placeholder="Password" required>
<input type="email" name="email" placeholder="Email (optional)">
<button type="submit">Register</button>
</form>
</div>
</body>
</html>
"""
@app.get("/ui", response_class=HTMLResponse)
async def ui_index(request: Request, tab: str = "runs"):
"""Web UI for viewing runs and cache."""
username = get_user_from_cookie(request)
return render_ui_html(username, tab)
@app.get("/ui/login", response_class=HTMLResponse)
async def ui_login_page():
"""Login page."""
return UI_LOGIN_HTML
@app.post("/ui/login")
async def ui_login(username: str = Form(...), password: str = Form(...)):
"""Process login form."""
try:
resp = http_requests.post(
f"{L2_SERVER}/auth/login",
json={"username": username, "password": password},
timeout=5
)
if resp.status_code == 200:
token = resp.json().get("access_token")
response = RedirectResponse(url="/ui", status_code=303)
response.set_cookie("auth_token", token, httponly=True, max_age=30*24*60*60)
return response
except Exception:
pass
return HTMLResponse(UI_LOGIN_HTML.replace(
'</form>',
'<p class="error">Invalid username or password</p></form>'
))
@app.get("/ui/register", response_class=HTMLResponse)
async def ui_register_page():
"""Register page."""
return UI_REGISTER_HTML
@app.post("/ui/register")
async def ui_register(
username: str = Form(...),
password: str = Form(...),
email: str = Form(None)
):
"""Process registration form."""
try:
resp = http_requests.post(
f"{L2_SERVER}/auth/register",
json={"username": username, "password": password, "email": email},
timeout=5
)
if resp.status_code == 200:
token = resp.json().get("access_token")
response = RedirectResponse(url="/ui", status_code=303)
response.set_cookie("auth_token", token, httponly=True, max_age=30*24*60*60)
return response
elif resp.status_code == 400:
error = resp.json().get("detail", "Registration failed")
return HTMLResponse(UI_REGISTER_HTML.replace(
'</form>',
f'<p class="error">{error}</p></form>'
))
except Exception as e:
return HTMLResponse(UI_REGISTER_HTML.replace(
'</form>',
f'<p class="error">Registration failed: {e}</p></form>'
))
@app.get("/ui/logout")
async def ui_logout():
"""Logout - clear cookie."""
response = RedirectResponse(url="/ui", status_code=303)
response.delete_cookie("auth_token")
return response
@app.get("/ui/runs", response_class=HTMLResponse)
async def ui_runs(request: Request):
"""HTMX partial: list of runs."""
current_user = get_user_from_cookie(request)
runs = list_all_runs()
# Require login to see runs
if not current_user:
return '<p class="no-runs"><a href="/ui/login">Login</a> to see your runs.</p>'
# Filter runs by user - match both plain username and ActivityPub format (@user@domain)
actor_id = f"@{current_user}@{L2_DOMAIN}"
runs = [r for r in runs if r.username in (current_user, actor_id)]
if not runs:
return '<p class="no-runs">You have no runs yet. Use the CLI to start a run.</p>'
html_parts = ['<div class="runs">']
for run in runs[:20]: # Limit to 20 most recent
status_class = run.status
effect_url = f"https://git.rose-ash.com/art-dag/effects/src/branch/main/{run.recipe}"
owner_badge = f'<span style="font-size:11px;color:#666;margin-left:8px;">by {run.username or "anonymous"}</span>' if not current_user else ''
html_parts.append(f'''
<a href="/ui/detail/{run.run_id}" class="run-link">
<div class="run" hx-get="/ui/run/{run.run_id}" hx-trigger="every 2s[this.querySelector('.status.running')]" hx-swap="outerHTML">
<div class="run-header">
<div>
<span class="run-recipe">{run.recipe}</span>
<span class="run-id">{run.run_id}</span>{owner_badge}
</div>
<span class="status {status_class}">{run.status}</span>
</div>
<div class="info">
Created: {run.created_at[:19].replace('T', ' ')}
</div>
''')
# Show input and output side by side
has_input = run.inputs and (CACHE_DIR / run.inputs[0]).exists()
has_output = run.status == "completed" and run.output_hash and (CACHE_DIR / run.output_hash).exists()
if has_input or has_output:
html_parts.append('<div class="media-row">')
# Input box
if has_input:
input_hash = run.inputs[0]
input_media_type = detect_media_type(CACHE_DIR / input_hash)
html_parts.append(f'''
<div class="media-box">
<label>Input: {input_hash[:24]}...</label>
<div class="media-container">
''')
if input_media_type == "video":
html_parts.append(f'<video src="/cache/{input_hash}" controls muted loop></video>')
elif input_media_type == "image":
html_parts.append(f'<img src="/cache/{input_hash}" alt="input">')
html_parts.append('</div></div>')
# Output box
if has_output:
output_hash = run.output_hash
output_media_type = detect_media_type(CACHE_DIR / output_hash)
html_parts.append(f'''
<div class="media-box">
<label>Output: {output_hash[:24]}...</label>
<div class="media-container">
''')
if output_media_type == "video":
html_parts.append(f'<video src="/cache/{output_hash}" controls autoplay muted loop></video>')
elif output_media_type == "image":
html_parts.append(f'<img src="/cache/{output_hash}" alt="output">')
html_parts.append('</div></div>')
html_parts.append('</div>')
# Show error if failed
if run.status == "failed" and run.error:
html_parts.append(f'<div class="info" style="color: #f87171;">Error: {run.error}</div>')
html_parts.append('</div></a>')
html_parts.append('</div>')
return '\n'.join(html_parts)
@app.get("/ui/cache-list", response_class=HTMLResponse)
async def ui_cache_list(request: Request):
"""HTMX partial: list of cached items."""
current_user = get_user_from_cookie(request)
# Require login to see cache
if not current_user:
return '<p class="no-runs"><a href="/ui/login">Login</a> to see cached content.</p>'
# Get hashes owned by/associated with this user
user_hashes = get_user_cache_hashes(current_user)
# Get cache items that belong to the user
cache_items = []
if CACHE_DIR.exists():
for f in CACHE_DIR.iterdir():
if f.is_file() and not f.name.endswith('.provenance.json') and not f.name.endswith('.meta.json'):
if f.name in user_hashes:
stat = f.stat()
cache_items.append({
"hash": f.name,
"size": stat.st_size,
"mtime": stat.st_mtime
})
# Sort by modification time (newest first)
cache_items.sort(key=lambda x: x["mtime"], reverse=True)
if not cache_items:
return '<p class="no-runs">No cached files. Upload files or run effects to see them here.</p>'
html_parts = ['<div class="runs">']
for item in cache_items[:50]: # Limit to 50 items
content_hash = item["hash"]
cache_path = CACHE_DIR / content_hash
media_type = detect_media_type(cache_path)
# Format size
size = item["size"]
if size > 1024*1024:
size_str = f"{size/(1024*1024):.1f} MB"
elif size > 1024:
size_str = f"{size/1024:.1f} KB"
else:
size_str = f"{size} bytes"
html_parts.append(f'''
<a href="/ui/cache/{content_hash}" class="run-link">
<div class="run">
<div class="run-header">
<div>
<span class="run-recipe">{media_type}</span>
<span class="run-id">{content_hash}</span>
</div>
<span class="status completed">{size_str}</span>
</div>
<div class="media-row">
<div class="media-box" style="max-width: 300px;">
<div class="media-container">
''')
if media_type == "video":
html_parts.append(f'<video src="/cache/{content_hash}" controls muted loop style="max-height:150px;"></video>')
elif media_type == "image":
html_parts.append(f'<img src="/cache/{content_hash}" alt="{content_hash[:16]}" style="max-height:150px;">')
else:
html_parts.append(f'<p>Unknown file type</p>')
html_parts.append('''
</div>
</div>
</div>
</div>
</a>
''')
html_parts.append('</div>')
return '\n'.join(html_parts)
@app.get("/ui/detail/{run_id}", response_class=HTMLResponse)
async def ui_detail_page(run_id: str, request: Request):
"""Full detail page for a run."""
current_user = get_user_from_cookie(request)
if not current_user:
return HTMLResponse(f'''
<!DOCTYPE html>
<html>
<head><title>Login Required | Art DAG L1</title><style>{UI_CSS}</style></head>
<body>
<h1><a href="/ui" style="color: inherit;">Art DAG L1 Server</a></h1>
<div class="run"><p><a href="/ui/login">Login</a> to view run details.</p></div>
</body>
</html>
''', status_code=401)
run = load_run(run_id)
if not run:
return HTMLResponse('<h1>Run not found</h1>', status_code=404)
# Check user owns this run
actor_id = f"@{current_user}@{L2_DOMAIN}"
if run.username not in (current_user, actor_id):
return HTMLResponse('<h1>Access denied</h1>', status_code=403)
# Check Celery task status if running
if run.status == "running" and run.celery_task_id:
task = celery_app.AsyncResult(run.celery_task_id)
if task.ready():
if task.successful():
result = task.result
run.status = "completed"
run.completed_at = datetime.now(timezone.utc).isoformat()
run.output_hash = result.get("output", {}).get("content_hash")
# Extract effects info from provenance
effects = result.get("effects", [])
if effects:
run.effects_commit = effects[0].get("repo_commit")
run.effect_url = effects[0].get("repo_url")
# Extract infrastructure info
run.infrastructure = result.get("infrastructure")
output_path = Path(result.get("output", {}).get("local_path", ""))
if output_path.exists():
cache_file(output_path)
else:
run.status = "failed"
run.error = str(task.result)
save_run(run)
# Use stored effect URL or build fallback
if run.effect_url:
effect_url = run.effect_url
elif run.effects_commit and run.effects_commit != "unknown":
effect_url = f"https://git.rose-ash.com/art-dag/effects/src/commit/{run.effects_commit}/{run.recipe}"
else:
effect_url = f"https://git.rose-ash.com/art-dag/effects/src/branch/main/{run.recipe}"
status_class = run.status
html = f"""
<!DOCTYPE html>
<html>
<head>
<title>{run.recipe} - {run.run_id[:8]} | Art DAG L1</title>
<style>{UI_CSS}</style>
</head>
<body>
<h1><a href="/ui" style="color: inherit;">Art DAG L1 Server</a></h1>
<a href="/ui" class="back-btn">&larr; Back to runs</a>
<div class="run">
<div class="run-header">
<div>
<a href="{effect_url}" target="_blank" class="run-recipe">{run.recipe}</a>
<span class="run-id">{run.run_id}</span>
</div>
<span class="status {status_class}">{run.status}</span>
</div>
"""
# Media row
has_input = run.inputs and (CACHE_DIR / run.inputs[0]).exists()
has_output = run.status == "completed" and run.output_hash and (CACHE_DIR / run.output_hash).exists()
if has_input or has_output:
html += '<div class="media-row">'
if has_input:
input_hash = run.inputs[0]
input_media_type = detect_media_type(CACHE_DIR / input_hash)
html += f'''
<div class="media-box">
<label>Input: <a href="/ui/cache/{input_hash}">{input_hash[:24]}...</a></label>
<div class="media-container">
'''
if input_media_type == "video":
html += f'<video src="/cache/{input_hash}" controls muted loop></video>'
elif input_media_type == "image":
html += f'<img src="/cache/{input_hash}" alt="input">'
html += '</div></div>'
if has_output:
output_hash = run.output_hash
output_media_type = detect_media_type(CACHE_DIR / output_hash)
html += f'''
<div class="media-box">
<label>Output: <a href="/ui/cache/{output_hash}">{output_hash[:24]}...</a></label>
<div class="media-container">
'''
if output_media_type == "video":
html += f'<video src="/cache/{output_hash}" controls autoplay muted loop></video>'
elif output_media_type == "image":
html += f'<img src="/cache/{output_hash}" alt="output">'
html += '</div></div>'
html += '</div>'
# Provenance section
html += f'''
<div class="provenance">
<h2>Provenance</h2>
<div class="prov-item">
<div class="prov-label">Owner</div>
<div class="prov-value">{run.username or "anonymous"}</div>
</div>
<div class="prov-item">
<div class="prov-label">Effect</div>
<div class="prov-value"><a href="{effect_url}" target="_blank">{run.recipe}</a></div>
</div>
<div class="prov-item">
<div class="prov-label">Effects Commit</div>
<div class="prov-value">{run.effects_commit or "N/A"}</div>
</div>
<div class="prov-item">
<div class="prov-label">Input(s)</div>
<div class="prov-value">
'''
for inp in run.inputs:
html += f'<a href="/ui/cache/{inp}">{inp}</a><br>'
html += f'''
</div>
</div>
'''
if run.output_hash:
html += f'''
<div class="prov-item">
<div class="prov-label">Output</div>
<div class="prov-value"><a href="/ui/cache/{run.output_hash}">{run.output_hash}</a></div>
</div>
'''
# Infrastructure section
if run.infrastructure:
software = run.infrastructure.get("software", {})
hardware = run.infrastructure.get("hardware", {})
html += f'''
<div class="prov-item">
<div class="prov-label">Infrastructure</div>
<div class="prov-value">
Software: {software.get("name", "unknown")} ({software.get("content_hash", "unknown")[:16]}...)<br>
Hardware: {hardware.get("name", "unknown")} ({hardware.get("content_hash", "unknown")[:16]}...)
</div>
</div>
'''
html += f'''
<div class="prov-item">
<div class="prov-label">Run ID</div>
<div class="prov-value">{run.run_id}</div>
</div>
<div class="prov-item">
<div class="prov-label">Created</div>
<div class="prov-value">{run.created_at}</div>
</div>
'''
if run.completed_at:
html += f'''
<div class="prov-item">
<div class="prov-label">Completed</div>
<div class="prov-value">{run.completed_at}</div>
</div>
'''
if run.error:
html += f'''
<div class="prov-item">
<div class="prov-label">Error</div>
<div class="prov-value" style="color: #f87171;">{run.error}</div>
</div>
'''
# Raw JSON provenance
provenance_json = json.dumps({
"run_id": run.run_id,
"status": run.status,
"recipe": run.recipe,
"effects_commit": run.effects_commit,
"effect_url": run.effect_url or effect_url,
"inputs": run.inputs,
"output_hash": run.output_hash,
"output_name": run.output_name,
"created_at": run.created_at,
"completed_at": run.completed_at,
"username": run.username,
"infrastructure": run.infrastructure,
"error": run.error
}, indent=2)
html += f'''
<h2>Raw JSON</h2>
<pre style="background:#0a0a0a;padding:16px;border-radius:8px;overflow-x:auto;font-size:13px;"><code>{provenance_json}</code></pre>
</div>
</div>
</body>
</html>
'''
return html
@app.get("/ui/run/{run_id}", response_class=HTMLResponse)
async def ui_run_partial(run_id: str):
"""HTMX partial: single run (for polling updates)."""
run = load_run(run_id)
if not run:
return '<div class="run">Run not found</div>'
# Check Celery task status if running
if run.status == "running" and run.celery_task_id:
task = celery_app.AsyncResult(run.celery_task_id)
if task.ready():
if task.successful():
result = task.result
run.status = "completed"
run.completed_at = datetime.now(timezone.utc).isoformat()
run.output_hash = result.get("output", {}).get("content_hash")
# Extract effects info from provenance
effects = result.get("effects", [])
if effects:
run.effects_commit = effects[0].get("repo_commit")
run.effect_url = effects[0].get("repo_url")
# Extract infrastructure info
run.infrastructure = result.get("infrastructure")
output_path = Path(result.get("output", {}).get("local_path", ""))
if output_path.exists():
cache_file(output_path)
else:
run.status = "failed"
run.error = str(task.result)
save_run(run)
status_class = run.status
poll_attr = 'hx-get="/ui/run/{}" hx-trigger="every 2s" hx-swap="outerHTML"'.format(run_id) if run.status == "running" else ""
html = f'''
<div class="run" {poll_attr}>
<div class="run-header">
<div>
<span class="run-recipe">{run.recipe}</span>
<span class="run-id">{run.run_id}</span>
</div>
<span class="status {status_class}">{run.status}</span>
</div>
<div class="info">
Created: {run.created_at[:19].replace('T', ' ')}
</div>
'''
# Show input and output side by side
has_input = run.inputs and (CACHE_DIR / run.inputs[0]).exists()
has_output = run.status == "completed" and run.output_hash and (CACHE_DIR / run.output_hash).exists()
if has_input or has_output:
html += '<div class="media-row">'
if has_input:
input_hash = run.inputs[0]
input_media_type = detect_media_type(CACHE_DIR / input_hash)
html += f'<div class="media-box"><label>Input: <a href="/ui/cache/{input_hash}">{input_hash[:24]}...</a></label><div class="media-container">'
if input_media_type == "video":
html += f'<video src="/cache/{input_hash}" controls muted loop></video>'
elif input_media_type == "image":
html += f'<img src="/cache/{input_hash}" alt="input">'
html += '</div></div>'
if has_output:
output_hash = run.output_hash
output_media_type = detect_media_type(CACHE_DIR / output_hash)
html += f'<div class="media-box"><label>Output: <a href="/ui/cache/{output_hash}">{output_hash[:24]}...</a></label><div class="media-container">'
if output_media_type == "video":
html += f'<video src="/cache/{output_hash}" controls autoplay muted loop></video>'
elif output_media_type == "image":
html += f'<img src="/cache/{output_hash}" alt="output">'
html += '</div></div>'
html += '</div>'
if run.status == "failed" and run.error:
html += f'<div class="info" style="color: #f87171;">Error: {run.error}</div>'
html += '</div>'
return html
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8100)