Files
celery/server.py
gilesb a97c6309d5 Fix deletion rules: runs deletable, cache items protected
- Run deletion: Handle legacy runs without activity records by
  checking L2 shared status directly (instead of failing)
- Cache deletion: Check Redis runs in addition to activity store
  to prevent deleting inputs/outputs that belong to runs
- Add find_runs_using_content() helper to check if content_hash
  is used as input or output of any run

This fixes the inverted deletion logic where runs couldn't be
deleted but their cache items could.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-08 01:29:03 +00:00

2827 lines
115 KiB
Python

#!/usr/bin/env python3
"""
Art DAG L1 Server
Manages rendering runs and provides access to the cache.
- POST /runs - start a run (recipe + inputs)
- GET /runs/{run_id} - get run status/result
- GET /cache/{content_hash} - get cached content
"""
import hashlib
import json
import os
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from fastapi import FastAPI, HTTPException, UploadFile, File, Depends, Form, Request
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
import redis
import requests as http_requests
from urllib.parse import urlparse
from celery_app import app as celery_app
from tasks import render_effect
from cache_manager import L1CacheManager, get_cache_manager
# L2 server for auth verification. All three values can be overridden through
# environment variables of the same name; the literals are local-dev defaults.
L2_SERVER = os.environ.get("L2_SERVER", "http://localhost:8200")
# Domain appended to usernames when building "@user@domain" actor IDs.
L2_DOMAIN = os.environ.get("L2_DOMAIN", "artdag.rose-ash.com")
L1_PUBLIC_URL = os.environ.get("L1_PUBLIC_URL", "http://localhost:8100")
# Cache directory (use /data/cache in Docker, ~/.artdag/cache locally)
CACHE_DIR = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache")))
# Import-time side effect: the directory (and any missing parents) is created.
CACHE_DIR.mkdir(parents=True, exist_ok=True)
# Initialize L1 cache manager with artdag integration
cache_manager = L1CacheManager(cache_dir=CACHE_DIR, l2_server=L2_SERVER)
# Redis for persistent run storage
REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/5')
# NOTE(review): `parsed` is no longer needed to build the client; it is kept
# as a module-level name in case code outside this view still reads it.
parsed = urlparse(REDIS_URL)
# Build the client straight from the URL so that everything the URL carries is
# honoured: host, port, db, and also credentials ("redis://user:pass@host") and
# TLS ("rediss://"), which the previous hand-rolled host/port/db parsing
# silently dropped.
redis_client = redis.Redis.from_url(REDIS_URL)
# Every run record is stored under "<prefix><run_id>".
RUNS_KEY_PREFIX = "artdag:run:"
def save_run(run: "RunStatus"):
    """Persist *run* in Redis as JSON under its prefixed run-ID key."""
    redis_key = f"{RUNS_KEY_PREFIX}{run.run_id}"
    serialized = run.model_dump_json()
    redis_client.set(redis_key, serialized)
def load_run(run_id: str) -> Optional["RunStatus"]:
    """Fetch one run from Redis.

    Returns the parsed ``RunStatus``, or ``None`` when the key is absent
    (or holds an empty value).
    """
    raw = redis_client.get(f"{RUNS_KEY_PREFIX}{run_id}")
    return RunStatus.model_validate_json(raw) if raw else None
def list_all_runs() -> list["RunStatus"]:
    """Return every stored run, newest first.

    Scans Redis for keys carrying the run prefix, parses each non-empty
    value, and orders the result by ``created_at`` descending.
    """
    records = []
    for redis_key in redis_client.scan_iter(f"{RUNS_KEY_PREFIX}*"):
        raw = redis_client.get(redis_key)
        if not raw:
            # Key vanished (or is empty) between the scan and the read.
            continue
        records.append(RunStatus.model_validate_json(raw))
    records.sort(key=lambda record: record.created_at, reverse=True)
    return records
def find_runs_using_content(content_hash: str) -> list[tuple["RunStatus", str]]:
    """Locate the runs that reference *content_hash*.

    Each match is reported as a ``(run, role)`` pair, where role is
    ``'input'`` or ``'output'``. A run that both consumes and produces the
    hash appears twice, input entry first.
    """
    matches = []
    for candidate in list_all_runs():
        consumed = bool(candidate.inputs) and content_hash in candidate.inputs
        if consumed:
            matches.append((candidate, "input"))
        produced = candidate.output_hash == content_hash
        if produced:
            matches.append((candidate, "output"))
    return matches
# The FastAPI application object; every route below is registered on it via
# the @app.get / @app.post / @app.delete decorators.
app = FastAPI(
    title="Art DAG L1 Server",
    description="Distributed rendering server for Art DAG",
    version="0.1.0"
)
class RunRequest(BaseModel):
    """Request to start a run.

    This is the request body accepted by ``POST /runs``.
    """
    recipe: str  # Recipe name (e.g., "dog", "identity")
    # List of content hashes. create_run currently rejects anything other
    # than exactly one entry.
    inputs: list[str]
    # Optional name for the output; when omitted, create_run derives
    # "<recipe>-<first 8 chars of the run id>".
    output_name: Optional[str] = None
class RunStatus(BaseModel):
    """Status of a run.

    This is the record that save_run / load_run serialize to and from Redis
    as JSON, and the response model of the run endpoints.
    """
    run_id: str  # UUID4 string generated by create_run
    status: str  # pending, running, completed, failed
    recipe: str
    inputs: list[str]  # Content hashes the run consumes
    output_name: str
    created_at: str  # ISO-8601 timestamp (UTC) set when the run is created
    completed_at: Optional[str] = None  # ISO-8601 timestamp set on success
    output_hash: Optional[str] = None  # Content hash reported by the task result
    error: Optional[str] = None  # str() of the task result when the task failed
    celery_task_id: Optional[str] = None  # Used to poll the Celery result
    effects_commit: Optional[str] = None  # "repo_commit" of the first effect
    effect_url: Optional[str] = None  # URL to effect source code
    username: Optional[str] = None  # Owner of the run (ActivityPub actor ID)
    infrastructure: Optional[dict] = None  # Hardware/software used for rendering
# ============ Auth ============
# Bearer-token scheme shared by the auth dependencies below. With
# auto_error=False a request without credentials is not rejected by the scheme
# itself; the dependencies receive a falsy value and decide what to do
# (get_optional_user returns None, get_required_user raises 401).
security = HTTPBearer(auto_error=False)
def verify_token_with_l2(token: str) -> Optional[str]:
    """Verify *token* with the L2 server and return the username if valid.

    Posts the token as a Bearer credential to ``{L2_SERVER}/auth/verify``
    with a 5 second timeout.

    Returns:
        The ``"username"`` field of the JSON response on HTTP 200, otherwise
        ``None``. Transport failures and malformed responses also yield
        ``None`` (fail closed).
    """
    try:
        resp = http_requests.post(
            f"{L2_SERVER}/auth/verify",
            headers={"Authorization": f"Bearer {token}"},
            timeout=5,
        )
        if resp.status_code != 200:
            return None
        payload = resp.json()
    except (http_requests.RequestException, ValueError):
        # RequestException: connection errors, timeouts, invalid headers.
        # ValueError: body is not valid JSON, or the token cannot be encoded
        # into a header. Anything else is a programming error and should
        # surface instead of being silently treated as "invalid token".
        return None
    # A JSON body that is not an object carries no username.
    if not isinstance(payload, dict):
        return None
    return payload.get("username")
async def get_optional_user(
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> Optional[str]:
    """Resolve the caller's username when a bearer token is supplied.

    Yields ``None`` both when no credentials were sent and when the token
    fails verification against L2.
    """
    if credentials:
        return verify_token_with_l2(credentials.credentials)
    return None
async def get_required_user(
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> str:
    """Resolve the caller's username or reject the request.

    Raises:
        HTTPException: 401 "Not authenticated" when no credentials were sent,
            401 "Invalid token" when L2 does not accept the token.
    """
    if not credentials:
        raise HTTPException(401, "Not authenticated")
    verified_name = verify_token_with_l2(credentials.credentials)
    if verified_name:
        return verified_name
    raise HTTPException(401, "Invalid token")
def file_hash(path: Path) -> str:
    """Return the hex SHA3-256 digest of the file at *path*.

    The file is streamed in 64 KiB blocks so large files are never held in
    memory at once.
    """
    digest = hashlib.sha3_256()
    with open(path, "rb") as stream:
        while block := stream.read(65536):
            digest.update(block)
    return digest.hexdigest()
def cache_file(source: Path, node_type: str = "output") -> str:
    """
    Store *source* through the L1CacheManager and return its content hash.
    *node_type* is forwarded unchanged to the manager's ``put``.
    """
    return cache_manager.put(source, node_type=node_type).content_hash
def get_cache_path(content_hash: str) -> Optional[Path]:
    """Look up a cached file by content hash via the cache manager.

    Callers treat a falsy result as "not in cache".
    """
    located = cache_manager.get_by_content_hash(content_hash)
    return located
@app.get("/api")
async def api_info():
    """Report basic server metadata as JSON: name, version, cache dir, run count."""
    info = {
        "name": "Art DAG L1 Server",
        "version": "0.1.0",
        "cache_dir": str(CACHE_DIR),
    }
    info["runs_count"] = len(list_all_runs())
    return info
# Static HTML for the home page served by root() at "/". It is a plain
# (non-f) string, so the braces in the embedded JavaScript/JSON samples are
# literal, and "\\" in the curl sample renders as a single backslash.
HOME_HTML = """
<!DOCTYPE html>
<html class="dark">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Art DAG L1 Server</title>
<script src="https://cdn.tailwindcss.com"></script>
<script>
tailwind.config = {
darkMode: 'class',
theme: { extend: { colors: { dark: { 900: '#0a0a0a', 800: '#111', 700: '#1a1a1a', 600: '#222', 500: '#333' } } } }
}
</script>
</head>
<body class="bg-dark-900 text-gray-100 min-h-screen">
<div class="max-w-4xl mx-auto px-4 py-8 sm:px-6 lg:px-8">
<nav class="flex flex-wrap gap-3 mb-8 p-4 bg-dark-700 rounded-lg">
<a href="/runs" class="px-4 py-2 bg-dark-500 hover:bg-dark-600 rounded-md text-blue-400 hover:text-blue-300 font-medium transition-colors">Runs</a>
<a href="/cache" class="px-4 py-2 bg-dark-500 hover:bg-dark-600 rounded-md text-blue-400 hover:text-blue-300 font-medium transition-colors">Cache</a>
<a href="/docs" class="px-4 py-2 bg-dark-500 hover:bg-dark-600 rounded-md text-blue-400 hover:text-blue-300 font-medium transition-colors">API Docs</a>
</nav>
<h1 class="text-3xl font-bold text-white border-b border-dark-500 pb-4 mb-6">Art DAG L1 Server</h1>
<p class="text-gray-300 mb-8">L1 rendering server for the Art DAG system. Manages distributed rendering jobs via Celery workers.</p>
<h2 class="text-xl font-semibold text-gray-200 mt-8 mb-4">Dependencies</h2>
<ul class="list-disc list-inside space-y-2 text-gray-300 mb-8">
<li><strong class="text-white">artdag</strong> (GitHub): Core DAG execution engine</li>
<li><strong class="text-white">artdag-effects</strong> (rose-ash): Effect implementations</li>
<li><strong class="text-white">Redis</strong>: Message broker, result backend, and run persistence</li>
</ul>
<h2 class="text-xl font-semibold text-gray-200 mt-8 mb-4">API Endpoints</h2>
<div class="overflow-x-auto mb-8">
<table class="w-full text-sm">
<thead>
<tr class="bg-dark-600">
<th class="px-4 py-3 text-left border border-dark-500">Method</th>
<th class="px-4 py-3 text-left border border-dark-500">Path</th>
<th class="px-4 py-3 text-left border border-dark-500">Description</th>
</tr>
</thead>
<tbody class="divide-y divide-dark-500">
<tr class="bg-dark-800"><td class="px-4 py-2 border border-dark-500">GET</td><td class="px-4 py-2 border border-dark-500"><code class="bg-dark-600 px-2 py-0.5 rounded text-blue-300">/ui</code></td><td class="px-4 py-2 border border-dark-500">Web UI for viewing runs</td></tr>
<tr class="bg-dark-800"><td class="px-4 py-2 border border-dark-500">POST</td><td class="px-4 py-2 border border-dark-500"><code class="bg-dark-600 px-2 py-0.5 rounded text-blue-300">/runs</code></td><td class="px-4 py-2 border border-dark-500">Start a rendering run</td></tr>
<tr class="bg-dark-800"><td class="px-4 py-2 border border-dark-500">GET</td><td class="px-4 py-2 border border-dark-500"><code class="bg-dark-600 px-2 py-0.5 rounded text-blue-300">/runs</code></td><td class="px-4 py-2 border border-dark-500">List all runs</td></tr>
<tr class="bg-dark-800"><td class="px-4 py-2 border border-dark-500">GET</td><td class="px-4 py-2 border border-dark-500"><code class="bg-dark-600 px-2 py-0.5 rounded text-blue-300">/runs/{run_id}</code></td><td class="px-4 py-2 border border-dark-500">Get run status</td></tr>
<tr class="bg-dark-800"><td class="px-4 py-2 border border-dark-500">GET</td><td class="px-4 py-2 border border-dark-500"><code class="bg-dark-600 px-2 py-0.5 rounded text-blue-300">/cache</code></td><td class="px-4 py-2 border border-dark-500">List cached content hashes</td></tr>
<tr class="bg-dark-800"><td class="px-4 py-2 border border-dark-500">GET</td><td class="px-4 py-2 border border-dark-500"><code class="bg-dark-600 px-2 py-0.5 rounded text-blue-300">/cache/{hash}</code></td><td class="px-4 py-2 border border-dark-500">Download cached content</td></tr>
<tr class="bg-dark-800"><td class="px-4 py-2 border border-dark-500">POST</td><td class="px-4 py-2 border border-dark-500"><code class="bg-dark-600 px-2 py-0.5 rounded text-blue-300">/cache/upload</code></td><td class="px-4 py-2 border border-dark-500">Upload file to cache</td></tr>
<tr class="bg-dark-800"><td class="px-4 py-2 border border-dark-500">GET</td><td class="px-4 py-2 border border-dark-500"><code class="bg-dark-600 px-2 py-0.5 rounded text-blue-300">/assets</code></td><td class="px-4 py-2 border border-dark-500">List known assets</td></tr>
</tbody>
</table>
</div>
<h2 class="text-xl font-semibold text-gray-200 mt-8 mb-4">Start a Run</h2>
<pre class="bg-dark-700 p-4 rounded-lg overflow-x-auto border border-dark-500 mb-8"><code class="text-green-300">curl -X POST /runs \\
-H "Content-Type: application/json" \\
-d '{"recipe": "dog", "inputs": ["33268b6e..."]}'</code></pre>
<h2 class="text-xl font-semibold text-gray-200 mt-8 mb-4">Provenance</h2>
<p class="text-gray-300 mb-4">Every render produces a provenance record linking inputs, effects, and infrastructure:</p>
<pre class="bg-dark-700 p-4 rounded-lg overflow-x-auto border border-dark-500"><code class="text-green-300">{
"output": {"content_hash": "..."},
"inputs": [...],
"effects": [...],
"infrastructure": {...}
}</code></pre>
</div>
</body>
</html>
"""
@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve the static landing page held in HOME_HTML."""
    page = HOME_HTML
    return page
@app.post("/runs", response_model=RunStatus)
async def create_run(request: RunRequest, username: str = Depends(get_required_user)):
    """Start a new rendering run. Requires authentication.

    Submits the render task to Celery, stores the run in Redis with status
    "running", and returns the stored record.

    Raises:
        HTTPException: 400 unless exactly one input hash is supplied
            (only single-input recipes are supported for now).
    """
    # Reject unsupported requests up front; nothing has been created yet.
    if len(request.inputs) != 1:
        raise HTTPException(400, "Currently only single-input recipes supported")
    input_hash = request.inputs[0]

    run_id = str(uuid.uuid4())
    # Fall back to "<recipe>-<short run id>" when the caller gave no name.
    output_name = request.output_name or f"{request.recipe}-{run_id[:8]}"

    run = RunStatus(
        run_id=run_id,
        status="pending",
        recipe=request.recipe,
        inputs=request.inputs,
        output_name=output_name,
        created_at=datetime.now(timezone.utc).isoformat(),
        # Owner is recorded as an ActivityPub actor ID.
        username=f"@{username}@{L2_DOMAIN}",
    )

    # Hand the work to Celery, then persist the run with its task handle.
    task = render_effect.delay(input_hash, request.recipe, output_name)
    run.celery_task_id = task.id
    run.status = "running"
    save_run(run)
    return run
@app.get("/runs/{run_id}", response_model=RunStatus)
async def get_run(run_id: str):
    """Get status of a run.

    While the stored status is "running", the Celery task is polled. Once it
    has finished, the run is updated (completed or failed), the output is
    cached, an activity is recorded for deletion tracking, and the run is
    saved back to Redis.

    Raises:
        HTTPException: 404 when no run with this ID is stored.
    """
    run = load_run(run_id)
    if not run:
        raise HTTPException(404, f"Run {run_id} not found")

    # Check Celery task status if running
    if run.status == "running" and run.celery_task_id:
        task = celery_app.AsyncResult(run.celery_task_id)
        if task.ready():
            if task.successful():
                # Tolerate a task that returned something other than a
                # provenance dict, and a null "output" entry.
                result = task.result if isinstance(task.result, dict) else {}
                output_info = result.get("output") or {}

                run.status = "completed"
                run.completed_at = datetime.now(timezone.utc).isoformat()
                run.output_hash = output_info.get("content_hash")

                # Extract effects info from provenance
                effects = result.get("effects") or []
                if effects:
                    run.effects_commit = effects[0].get("repo_commit")
                    run.effect_url = effects[0].get("repo_url")

                # Extract infrastructure info
                run.infrastructure = result.get("infrastructure")

                # Cache the output. A missing local_path must be skipped
                # explicitly: Path("") is the current directory, which
                # exists, so the old unconditional check tried to cache ".".
                local_path = output_info.get("local_path")
                if local_path:
                    output_path = Path(local_path)
                    if output_path.exists():
                        cache_file(output_path, node_type="effect_output")

                # Record activity for deletion tracking
                if run.output_hash and run.inputs:
                    cache_manager.record_simple_activity(
                        input_hashes=run.inputs,
                        output_hash=run.output_hash,
                        run_id=run.run_id,
                    )
            else:
                run.status = "failed"
                run.error = str(task.result)

            # Save updated status
            save_run(run)

    return run
@app.delete("/runs/{run_id}")
async def discard_run(run_id: str, username: str = Depends(get_required_user)):
    """
    Discard (delete) a run. Only the run's owner may do so.

    Two paths:
    - The run has an activity record: the cache manager decides whether it
      may be discarded and performs the discard (cache cleanup is delegated
      to ``cache_manager.discard_activity``).
    - Legacy run without an activity record: refused if any input or the
      output is published to L2; otherwise only the Redis record is removed.

    Raises:
        HTTPException: 404 unknown run, 403 not the owner, 400 discard not
            allowed, 500 when the cache manager reports a failed discard.
    """
    run = load_run(run_id)
    if not run:
        raise HTTPException(404, f"Run {run_id} not found")

    # The owner may be stored as a bare username or as an actor ID.
    owner_aliases = (username, f"@{username}@{L2_DOMAIN}")
    if run.username not in owner_aliases:
        raise HTTPException(403, "Access denied")

    if cache_manager.get_activity(run_id):
        # Activity-backed run: apply the activity manager's deletion rules.
        can_discard, reason = cache_manager.can_discard_activity(run_id)
        if not can_discard:
            raise HTTPException(400, f"Cannot discard run: {reason}")
        success, msg = cache_manager.discard_activity(run_id)
        if not success:
            raise HTTPException(500, f"Failed to discard: {msg}")
    else:
        # Legacy run: check the L2 shared status of every referenced item.
        referenced = list(run.inputs or [])
        if run.output_hash:
            referenced.append(run.output_hash)
        published = next(
            (item for item in referenced if cache_manager.l2_checker.is_shared(item)),
            None,
        )
        if published is not None:
            raise HTTPException(400, f"Cannot discard run: item {published[:16]}... is published to L2")

    # Remove from Redis
    redis_client.delete(f"{RUNS_KEY_PREFIX}{run_id}")
    return {"discarded": True, "run_id": run_id}
@app.get("/run/{run_id}")
async def run_detail(run_id: str, request: Request):
    """Run detail. HTML for browsers, JSON for APIs.

    While the stored status is "running", the Celery task is polled and the
    run is updated and saved once the task has finished. Browser requests
    (per ``wants_html``) then require a logged-in user who owns the run and
    receive a rendered page; every other request receives the run as JSON.

    All run-derived values are HTML-escaped before being placed in markup,
    because recipe, output name, inputs and the run ID in the URL are
    caller-supplied, and error text comes from the worker.

    Raises:
        HTTPException: 404 for an unknown run on non-HTML requests.
    """
    # Function-scope import keeps this block self-contained.
    from html import escape

    run = load_run(run_id)
    if not run:
        if wants_html(request):
            # run_id comes straight from the URL: escape it (reflected XSS).
            content = f'<p class="text-red-400">Run not found: {escape(run_id)}</p>'
            return HTMLResponse(render_page("Not Found", content, None, active_tab="runs"), status_code=404)
        raise HTTPException(404, f"Run {run_id} not found")

    # Check Celery task status if running
    # NOTE(review): unlike get_run, this path caches the output with the
    # default node_type and records no activity - confirm whether the two
    # should agree.
    if run.status == "running" and run.celery_task_id:
        task = celery_app.AsyncResult(run.celery_task_id)
        if task.ready():
            if task.successful():
                # Tolerate a non-dict task result and a null "output" entry.
                result = task.result if isinstance(task.result, dict) else {}
                output_info = result.get("output") or {}
                run.status = "completed"
                run.completed_at = datetime.now(timezone.utc).isoformat()
                run.output_hash = output_info.get("content_hash")
                effects = result.get("effects") or []
                if effects:
                    run.effects_commit = effects[0].get("repo_commit")
                    run.effect_url = effects[0].get("repo_url")
                run.infrastructure = result.get("infrastructure")
                # Path("") is the current directory and "exists", so a
                # missing local_path has to be skipped explicitly.
                local_path = output_info.get("local_path")
                if local_path:
                    output_path = Path(local_path)
                    if output_path.exists():
                        cache_file(output_path)
            else:
                run.status = "failed"
                run.error = str(task.result)
            save_run(run)

    if wants_html(request):
        current_user = get_user_from_cookie(request)
        if not current_user:
            content = '<p class="text-gray-400 py-8 text-center"><a href="/login" class="text-blue-400 hover:text-blue-300">Login</a> to view run details.</p>'
            return HTMLResponse(render_page("Login Required", content, current_user, active_tab="runs"), status_code=401)

        # Check user owns this run
        actor_id = f"@{current_user}@{L2_DOMAIN}"
        if run.username not in (current_user, actor_id):
            content = '<p class="text-red-400 py-8 text-center">Access denied.</p>'
            return HTMLResponse(render_page("Access Denied", content, current_user, active_tab="runs"), status_code=403)

        # Build effect URL
        if run.effect_url:
            effect_url = run.effect_url
        elif run.effects_commit and run.effects_commit != "unknown":
            effect_url = f"https://git.rose-ash.com/art-dag/effects/src/commit/{run.effects_commit}/{run.recipe}"
        else:
            effect_url = f"https://git.rose-ash.com/art-dag/effects/src/branch/main/{run.recipe}"

        # Escaped copies of everything interpolated into the page below.
        safe_effect_url = escape(effect_url)
        safe_recipe = escape(run.recipe)
        safe_run_id = escape(run.run_id)
        safe_run_id_short = escape(run.run_id[:16])
        safe_status = escape(run.status)
        safe_owner = escape(run.username or "anonymous")
        safe_commit = escape(run.effects_commit or "N/A")
        safe_created = escape(run.created_at[:19].replace('T', ' '))

        # Status badge colors
        status_colors = {
            "completed": "bg-green-600 text-white",
            "running": "bg-yellow-600 text-white",
            "failed": "bg-red-600 text-white",
            "pending": "bg-gray-600 text-white"
        }
        status_badge = status_colors.get(run.status, "bg-gray-600 text-white")

        # Build media HTML for input/output
        media_html = ""
        has_input = run.inputs and cache_manager.has_content(run.inputs[0])
        has_output = run.status == "completed" and run.output_hash and cache_manager.has_content(run.output_hash)
        if has_input or has_output:
            media_html = '<div class="grid gap-6 md:grid-cols-2 mb-8">'
            if has_input:
                input_hash = run.inputs[0]
                safe_input = escape(input_hash)
                input_media_type = detect_media_type(get_cache_path(input_hash))
                input_video_src = escape(str(video_src_for_request(input_hash, request)))
                if input_media_type == "video":
                    input_elem = f'<video src="{input_video_src}" controls muted loop playsinline class="max-w-full max-h-64 rounded-lg"></video>'
                elif input_media_type == "image":
                    input_elem = f'<img src="/cache/{safe_input}" alt="input" class="max-w-full max-h-64 rounded-lg">'
                else:
                    input_elem = '<p class="text-gray-400">Unknown format</p>'
                media_html += f'''
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-2">Input</div>
<a href="/cache/{safe_input}/detail" class="text-blue-400 hover:text-blue-300 font-mono text-xs">{escape(input_hash[:24])}...</a>
<div class="mt-3 flex justify-center">{input_elem}</div>
</div>
'''
            if has_output:
                output_hash = run.output_hash
                safe_output = escape(output_hash)
                output_media_type = detect_media_type(get_cache_path(output_hash))
                output_video_src = escape(str(video_src_for_request(output_hash, request)))
                if output_media_type == "video":
                    output_elem = f'<video src="{output_video_src}" controls autoplay muted loop playsinline class="max-w-full max-h-64 rounded-lg"></video>'
                elif output_media_type == "image":
                    output_elem = f'<img src="/cache/{safe_output}" alt="output" class="max-w-full max-h-64 rounded-lg">'
                else:
                    output_elem = '<p class="text-gray-400">Unknown format</p>'
                media_html += f'''
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-2">Output</div>
<a href="/cache/{safe_output}/detail" class="text-blue-400 hover:text-blue-300 font-mono text-xs">{escape(output_hash[:24])}...</a>
<div class="mt-3 flex justify-center">{output_elem}</div>
</div>
'''
            media_html += '</div>'

        # Build inputs list (input hashes are caller-supplied strings)
        inputs_html = ''.join(
            f'<a href="/cache/{escape(inp)}/detail" class="text-blue-400 hover:text-blue-300 font-mono text-xs block">{escape(inp)}</a>'
            for inp in run.inputs
        )

        # Infrastructure section
        infra_html = ""
        if run.infrastructure:
            software = run.infrastructure.get("software", {})
            hardware = run.infrastructure.get("hardware", {})
            # str() before slicing so a null content_hash cannot raise.
            sw_name = escape(str(software.get("name", "unknown")))
            sw_hash = escape(str(software.get("content_hash", "unknown"))[:16])
            hw_name = escape(str(hardware.get("name", "unknown")))
            hw_hash = escape(str(hardware.get("content_hash", "unknown"))[:16])
            infra_html = f'''
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Infrastructure</div>
<div class="text-gray-200 text-sm">
Software: {sw_name} ({sw_hash}...)<br>
Hardware: {hw_name} ({hw_hash}...)
</div>
</div>
'''

        # Error display
        error_html = ""
        if run.error:
            error_html = f'''
<div class="bg-red-900/30 border border-red-700 rounded-lg p-4 mb-6">
<div class="text-sm text-red-400 mb-1">Error</div>
<div class="text-red-300">{escape(run.error)}</div>
</div>
'''

        # Publish section
        publish_html = ""
        if run.status == "completed" and run.output_hash:
            publish_html = f'''
<div class="border-t border-dark-500 pt-6 mt-6">
<h2 class="text-lg font-semibold text-white mb-3">Publish to L2</h2>
<p class="text-sm text-gray-400 mb-4">Register this transformation output on the L2 ActivityPub server.</p>
<div id="publish-result"></div>
<form hx-post="/ui/publish-run/{safe_run_id}" hx-target="#publish-result" hx-swap="innerHTML"
class="flex flex-wrap gap-3 items-center">
<input type="text" name="output_name" value="{escape(run.output_name)}"
placeholder="Asset name" required
class="px-4 py-2 bg-dark-600 border border-dark-500 rounded-lg text-white placeholder-gray-500 focus:border-blue-500 focus:outline-none min-w-[200px]">
<button type="submit"
class="px-4 py-2 bg-blue-600 hover:bg-blue-700 text-white font-medium rounded-lg transition-colors">
Publish to L2
</button>
</form>
</div>
'''

        output_link = ""
        if run.output_hash:
            safe_output_hash = escape(run.output_hash)
            output_link = f'''<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Output</div>
<a href="/cache/{safe_output_hash}/detail" class="text-blue-400 hover:text-blue-300 font-mono text-xs">{safe_output_hash}</a>
</div>'''

        completed_html = ""
        if run.completed_at:
            safe_completed = escape(run.completed_at[:19].replace('T', ' '))
            completed_html = f'''<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Completed</div>
<div class="text-gray-200">{safe_completed}</div>
</div>'''

        content = f'''
<a href="/runs" class="inline-flex items-center text-blue-400 hover:text-blue-300 mb-6">
<svg class="w-4 h-4 mr-1" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M15 19l-7-7 7-7"/>
</svg>
Back to runs
</a>
<div class="bg-dark-700 rounded-lg p-6">
<div class="flex flex-wrap items-center justify-between gap-4 mb-6">
<div class="flex items-center gap-3">
<a href="{safe_effect_url}" target="_blank"
class="px-3 py-1 bg-blue-600 hover:bg-blue-700 text-white text-sm font-medium rounded-full transition-colors">
{safe_recipe}
</a>
<span class="text-gray-400 font-mono text-sm">{safe_run_id_short}...</span>
</div>
<span class="px-3 py-1 {status_badge} text-sm font-medium rounded-full">{safe_status}</span>
</div>
{error_html}
{media_html}
<div class="border-t border-dark-500 pt-6">
<h2 class="text-lg font-semibold text-white mb-4">Provenance</h2>
<div class="grid gap-4 sm:grid-cols-2">
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Owner</div>
<div class="text-gray-200">{safe_owner}</div>
</div>
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Effect</div>
<a href="{safe_effect_url}" target="_blank" class="text-blue-400 hover:text-blue-300">{safe_recipe}</a>
</div>
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Effects Commit</div>
<div class="text-gray-200 font-mono text-xs">{safe_commit}</div>
</div>
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Input(s)</div>
<div>{inputs_html}</div>
</div>
{output_link}
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Run ID</div>
<div class="text-gray-200 font-mono text-xs">{safe_run_id}</div>
</div>
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Created</div>
<div class="text-gray-200">{safe_created}</div>
</div>
{completed_html}
{infra_html}
</div>
</div>
{publish_html}
</div>
'''
        # NOTE(review): the page title is passed unescaped because it is not
        # visible here whether render_page escapes it - confirm.
        return HTMLResponse(render_page(f"Run: {run.recipe}", content, current_user, active_tab="runs"))

    # JSON response
    return run.model_dump()
@app.get("/runs")
async def list_runs(request: Request, page: int = 1, limit: int = 20):
    """List runs. HTML for browsers (with infinite scroll), JSON for APIs (with pagination).

    Browser requests (per ``wants_html``) need a logged-in user and only
    show that user's runs: page 1 is a full page, later pages are bare card
    fragments for htmx to append. Other requests get a JSON object with
    ``runs`` and ``pagination`` keys.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        limit: Page size; values below 1 are treated as 1.
    """
    # Function-scope import keeps this block self-contained.
    from html import escape

    # A page or limit below 1 would produce negative slice bounds and return
    # the wrong runs, so clamp both.
    page = max(page, 1)
    limit = max(limit, 1)

    current_user = get_user_from_cookie(request)
    all_runs = list_all_runs()
    total = len(all_runs)

    # Filter by user if logged in for HTML
    if wants_html(request) and current_user:
        actor_id = f"@{current_user}@{L2_DOMAIN}"
        all_runs = [r for r in all_runs if r.username in (current_user, actor_id)]
        total = len(all_runs)

    # Pagination
    start = (page - 1) * limit
    end = start + limit
    runs_page = all_runs[start:end]
    has_more = end < total

    if wants_html(request):
        if not current_user:
            content = '<p class="text-gray-400 py-8 text-center"><a href="/login" class="text-blue-400 hover:text-blue-300">Login</a> to see your runs.</p>'
            return HTMLResponse(render_page("Runs", content, current_user, active_tab="runs"))

        if not runs_page:
            if page == 1:
                content = '<p class="text-gray-400 py-8 text-center">You have no runs yet. Use the CLI to start a run.</p>'
            else:
                return HTMLResponse("")  # Empty for infinite scroll
        else:
            # Status badge colors
            status_colors = {
                "completed": "bg-green-600 text-white",
                "running": "bg-yellow-600 text-white",
                "failed": "bg-red-600 text-white",
                "pending": "bg-gray-600 text-white"
            }
            html_parts = []
            for run in runs_page:
                status_badge = status_colors.get(run.status, "bg-gray-600 text-white")
                # Run-derived text is escaped: recipe is caller-supplied and
                # error text comes from the worker.
                safe_run_id = escape(run.run_id)
                safe_run_id_short = escape(run.run_id[:16])
                safe_recipe = escape(run.recipe)
                safe_status = escape(run.status)
                safe_created = escape(run.created_at[:19].replace('T', ' '))
                html_parts.append(f'''
<a href="/run/{safe_run_id}" class="block">
<div class="bg-dark-700 rounded-lg p-4 hover:bg-dark-600 transition-colors">
<div class="flex flex-wrap items-center justify-between gap-3 mb-3">
<div class="flex items-center gap-3">
<span class="px-3 py-1 bg-blue-600 text-white text-sm font-medium rounded-full">{safe_recipe}</span>
<span class="text-gray-400 font-mono text-xs hidden sm:inline">{safe_run_id_short}...</span>
</div>
<span class="px-3 py-1 {status_badge} text-xs font-medium rounded-full">{safe_status}</span>
</div>
<div class="text-sm text-gray-400 mb-3">
Created: {safe_created}
</div>
''')

                # Show input and output thumbnails
                has_input = run.inputs and cache_manager.has_content(run.inputs[0])
                has_output = run.status == "completed" and run.output_hash and cache_manager.has_content(run.output_hash)
                if has_input or has_output:
                    html_parts.append('<div class="grid gap-4 sm:grid-cols-2">')
                    if has_input:
                        input_hash = run.inputs[0]
                        input_media_type = detect_media_type(get_cache_path(input_hash))
                        html_parts.append(f'''
<div class="bg-dark-600 rounded-lg p-3">
<div class="text-xs text-gray-400 mb-2">Input</div>
<div class="flex justify-center">
''')
                        if input_media_type == "video":
                            input_src = escape(str(video_src_for_request(input_hash, request)))
                            html_parts.append(f'<video src="{input_src}" muted loop playsinline class="max-h-24 rounded"></video>')
                        else:
                            html_parts.append(f'<img src="/cache/{escape(input_hash)}" alt="input" class="max-h-24 rounded">')
                        html_parts.append('</div></div>')
                    if has_output:
                        output_hash = run.output_hash
                        output_media_type = detect_media_type(get_cache_path(output_hash))
                        html_parts.append(f'''
<div class="bg-dark-600 rounded-lg p-3">
<div class="text-xs text-gray-400 mb-2">Output</div>
<div class="flex justify-center">
''')
                        if output_media_type == "video":
                            output_src = escape(str(video_src_for_request(output_hash, request)))
                            html_parts.append(f'<video src="{output_src}" autoplay muted loop playsinline class="max-h-24 rounded"></video>')
                        else:
                            html_parts.append(f'<img src="/cache/{escape(output_hash)}" alt="output" class="max-h-24 rounded">')
                        html_parts.append('</div></div>')
                    html_parts.append('</div>')

                if run.status == "failed" and run.error:
                    # Truncate first, then escape, so no entity is cut in half.
                    html_parts.append(f'<div class="mt-3 text-sm text-red-400">Error: {escape(run.error[:100])}</div>')
                html_parts.append('</div></a>')

            # For infinite scroll, just return cards if not first page
            if page > 1:
                if has_more:
                    html_parts.append(f'''
<div hx-get="/runs?page={page + 1}" hx-trigger="revealed" hx-swap="afterend">
<p class="py-4 text-center text-gray-400">Loading more...</p>
</div>
''')
                return HTMLResponse('\n'.join(html_parts))

            # First page - full content
            infinite_scroll_trigger = ""
            if has_more:
                infinite_scroll_trigger = f'''
<div hx-get="/runs?page=2" hx-trigger="revealed" hx-swap="afterend">
<p class="py-4 text-center text-gray-400">Loading more...</p>
</div>
'''
            content = f'''
<h2 class="text-xl font-semibold text-white mb-6">Runs ({total} total)</h2>
<div class="space-y-4">
{''.join(html_parts)}
{infinite_scroll_trigger}
</div>
'''
        return HTMLResponse(render_page("Runs", content, current_user, active_tab="runs"))

    # JSON response for APIs
    return {
        "runs": [r.model_dump() for r in runs_page],
        "pagination": {
            "page": page,
            "limit": limit,
            "total": total,
            "has_more": has_more
        }
    }
@app.get("/cache/{content_hash}")
async def get_cached(content_hash: str):
    """Serve the cached file for *content_hash*.

    Raises:
        HTTPException: 404 when the hash is not in the cache.
    """
    located = get_cache_path(content_hash)
    if located:
        return FileResponse(located)
    raise HTTPException(404, f"Content {content_hash} not in cache")
@app.get("/cache/{content_hash}/mp4")
async def get_cached_mp4(content_hash: str):
    """Get cached content as MP4 (transcodes MKV on first request, caches result).

    Order of attempts:
    1. Serve ``CACHE_DIR/<hash>.mp4`` if a previous transcode left it there.
    2. If ffprobe reports an mp4/mov container, serve the original file.
    3. Otherwise transcode with ffmpeg (H.264 + AAC) into a temporary
       ``<hash>.transcoding.mp4`` and move it into place on success.

    Raises:
        HTTPException: 404 when the hash is not cached, 400 when the content
            is not a video, 500 when transcoding fails or times out.
    """
    import subprocess

    cache_path = get_cache_path(content_hash)
    if not cache_path:
        raise HTTPException(404, f"Content {content_hash} not in cache")

    # MP4 transcodes stored alongside original in CACHE_DIR
    mp4_path = CACHE_DIR / f"{content_hash}.mp4"

    # If MP4 already cached, serve it
    if mp4_path.exists():
        return FileResponse(mp4_path, media_type="video/mp4")

    media_type = detect_media_type(cache_path)
    if media_type != "video":
        raise HTTPException(400, "Content is not a video")

    # Check if already MP4 format. The probe is best-effort: if ffprobe is
    # missing, times out, or prints undecodable output, fall through to
    # transcoding.
    already_mp4 = False
    try:
        probe = subprocess.run(
            ["ffprobe", "-v", "error", "-select_streams", "v:0",
             "-show_entries", "format=format_name", "-of", "csv=p=0", str(cache_path)],
            capture_output=True, text=True, timeout=10
        )
        container = probe.stdout.lower()
        already_mp4 = "mp4" in container or "mov" in container
    except (OSError, subprocess.SubprocessError, ValueError):
        pass  # Continue with transcoding
    if already_mp4:
        # Already MP4-compatible, just serve original
        return FileResponse(cache_path, media_type="video/mp4")

    # Transcode to MP4 (H.264 + AAC)
    transcode_path = CACHE_DIR / f"{content_hash}.transcoding.mp4"
    try:
        result = subprocess.run(
            ["ffmpeg", "-y", "-i", str(cache_path),
             "-c:v", "libx264", "-preset", "fast", "-crf", "23",
             "-c:a", "aac", "-b:a", "128k",
             "-movflags", "+faststart",
             str(transcode_path)],
            capture_output=True, text=True, timeout=600  # 10 min timeout
        )
        if result.returncode != 0:
            raise HTTPException(500, f"Transcoding failed: {result.stderr[:200]}")
        # Move to final location (replace overwrites on every platform)
        transcode_path.replace(mp4_path)
    except HTTPException:
        # Our own error from the return-code check: clean up and let it
        # through unchanged. Previously the generic handler below caught and
        # re-wrapped it, losing the ffmpeg stderr detail.
        transcode_path.unlink(missing_ok=True)
        raise
    except subprocess.TimeoutExpired:
        transcode_path.unlink(missing_ok=True)
        raise HTTPException(500, "Transcoding timed out")
    except Exception as e:
        transcode_path.unlink(missing_ok=True)
        raise HTTPException(500, f"Transcoding failed: {e}") from e

    return FileResponse(mp4_path, media_type="video/mp4")
@app.get("/cache/{content_hash}/detail")
async def cache_detail(content_hash: str, request: Request):
    """View cached content detail. HTML for browsers, JSON for APIs.

    HTML clients must be logged in and have the hash among their cache hashes
    (401 / 403 pages otherwise). JSON clients receive the hash, file size,
    detected media type and stored metadata.

    Raises:
        HTTPException: 404 (JSON clients only) when the content is not cached.
    """
    from html import escape as html_escape

    current_user = get_user_from_cookie(request)
    cache_path = get_cache_path(content_hash)
    if not cache_path:
        if wants_html(request):
            # The hash did not match any cached item, so it is arbitrary
            # client-supplied text: escape it before echoing it into HTML.
            content = f'<p class="text-red-400">Content not found: {html_escape(content_hash)}</p>'
            return HTMLResponse(render_page("Not Found", content, current_user, active_tab="cache"), status_code=404)
        raise HTTPException(404, f"Content {content_hash} not in cache")

    if wants_html(request):
        if not current_user:
            content = '<p class="text-gray-400 py-8 text-center"><a href="/login" class="text-blue-400 hover:text-blue-300">Login</a> to view cached content.</p>'
            return HTMLResponse(render_page("Login Required", content, current_user, active_tab="cache"), status_code=401)

        # Check user has access
        user_hashes = get_user_cache_hashes(current_user)
        if content_hash not in user_hashes:
            content = '<p class="text-red-400 py-8 text-center">Access denied.</p>'
            return HTMLResponse(render_page("Access Denied", content, current_user, active_tab="cache"), status_code=403)

        media_type = detect_media_type(cache_path)
        file_size = cache_path.stat().st_size
        size_str = f"{file_size:,} bytes"
        if file_size > 1024*1024:
            size_str = f"{file_size/(1024*1024):.1f} MB"
        elif file_size > 1024:
            size_str = f"{file_size/1024:.1f} KB"

        # Build media display HTML
        if media_type == "video":
            video_src = video_src_for_request(content_hash, request)
            media_html = f'<video src="{video_src}" controls autoplay muted loop playsinline class="max-w-full max-h-96 rounded-lg"></video>'
        elif media_type == "image":
            media_html = f'<img src="/cache/{content_hash}" alt="{content_hash}" class="max-w-full max-h-96 rounded-lg">'
        else:
            media_html = f'<p class="text-gray-400">Unknown file type. <a href="/cache/{content_hash}" download class="text-blue-400 hover:text-blue-300">Download file</a></p>'

        content = f'''
<a href="/cache" class="inline-flex items-center text-blue-400 hover:text-blue-300 mb-6">
<svg class="w-4 h-4 mr-1" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M15 19l-7-7 7-7"/>
</svg>
Back to cache
</a>
<div class="bg-dark-700 rounded-lg p-6">
<div class="flex flex-wrap items-center justify-between gap-4 mb-6">
<div class="flex items-center gap-3">
<span class="px-3 py-1 bg-blue-600 text-white text-sm font-medium rounded-full">{media_type.capitalize()}</span>
<span class="text-gray-400 font-mono text-sm">{content_hash[:24]}...</span>
</div>
<a href="/cache/{content_hash}" download
class="px-4 py-2 bg-green-600 hover:bg-green-700 text-white font-medium rounded-lg transition-colors">
Download
</a>
</div>
<div class="flex justify-center mb-8">
{media_html}
</div>
<div class="border-t border-dark-500 pt-6">
<h2 class="text-lg font-semibold text-white mb-4">Details</h2>
<div class="grid gap-4 sm:grid-cols-2">
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Content Hash (SHA3-256)</div>
<div class="font-mono text-xs text-gray-200 break-all">{content_hash}</div>
</div>
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Type</div>
<div class="text-gray-200">{media_type}</div>
</div>
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Size</div>
<div class="text-gray-200">{size_str}</div>
</div>
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Raw URL</div>
<div class="text-blue-400 text-sm truncate">
<a href="/cache/{content_hash}" class="hover:text-blue-300">/cache/{content_hash}</a>
</div>
</div>
</div>
</div>
<!-- Metadata Section -->
<div class="border-t border-dark-500 pt-6 mt-6" id="metadata-section"
hx-get="/cache/{content_hash}/meta-form" hx-trigger="load" hx-swap="innerHTML">
<div class="text-gray-400">Loading metadata...</div>
</div>
</div>
'''
        return HTMLResponse(render_page(f"Cache: {content_hash[:16]}...", content, current_user, active_tab="cache"))

    # JSON response - return metadata
    meta = load_cache_meta(content_hash)
    file_size = cache_path.stat().st_size
    media_type = detect_media_type(cache_path)
    return {
        "content_hash": content_hash,
        "size": file_size,
        "media_type": media_type,
        "meta": meta
    }
@app.get("/cache/{content_hash}/meta-form", response_class=HTMLResponse)
async def cache_meta_form(content_hash: str, request: Request):
    """Clean-URL alias: answer with a 302 to the HTMX meta form under /ui."""
    from starlette.responses import RedirectResponse

    # The form itself is still rendered by the older /ui endpoint.
    legacy_url = f"/ui/cache/{content_hash}/meta-form"
    return RedirectResponse(url=legacy_url, status_code=302)
@app.get("/ui/cache/{content_hash}")
async def ui_cache_view(content_hash: str):
    """Send /ui/cache/<hash> requests to the clean detail URL with a 302."""
    detail_url = f"/cache/{content_hash}/detail"
    return RedirectResponse(url=detail_url, status_code=302)
@app.get("/ui/cache/{content_hash}/meta-form", response_class=HTMLResponse)
async def ui_cache_meta_form(content_hash: str, request: Request):
    """HTMX partial: metadata editing form for a cached item.

    Returns an HTML fragment containing the metadata form, the publish section
    (publish form, "already published" panel, or a hint to set an origin first)
    and the pin status / discard button. Users who are not logged in, or who do
    not have the hash among their cache hashes, get a short error fragment.
    """
    from html import escape as html_escape

    def esc(value) -> str:
        """Escape a stored metadata value for use in HTML text or attributes."""
        return html_escape(str(value), quote=True)

    current_user = get_user_from_cookie(request)
    if not current_user:
        return '<div class="text-red-400">Login required to edit metadata</div>'

    # Check ownership
    user_hashes = get_user_cache_hashes(current_user)
    if content_hash not in user_hashes:
        return '<div class="text-red-400">Access denied</div>'

    # Load metadata. Everything read from it is free text that was stored
    # earlier, so it is escaped before being placed into the markup below;
    # unescaped, a quote or "</textarea>" would break out of the form field.
    meta = load_cache_meta(content_hash)
    origin = meta.get("origin") or {}
    origin_type = origin.get("type", "")
    origin_url = esc(origin.get("url") or "")
    origin_note = esc(origin.get("note") or "")
    description = esc(meta.get("description") or "")
    tags = meta.get("tags") or []
    tags_str = esc(", ".join(str(t) for t in tags))
    published = meta.get("published") or {}
    pinned = meta.get("pinned", False)
    pin_reason = esc(meta.get("pin_reason") or "")

    # Detect media type for publish
    cache_path = get_cache_path(content_hash)
    media_type = detect_media_type(cache_path) if cache_path else "unknown"
    asset_type = "video" if media_type == "video" else "image"

    # Origin radio checked states
    self_checked = 'checked' if origin_type == "self" else ''
    external_checked = 'checked' if origin_type == "external" else ''

    # Build publish section
    if published.get("to_l2"):
        asset_name = esc(published.get("asset_name") or "")
        # Timestamps are ISO strings; the first 10 characters are the date.
        published_at = esc(str(published.get("published_at") or "")[:10])
        last_synced = esc(str(published.get("last_synced_at") or "")[:10])
        publish_html = f'''
<div class="bg-green-900/30 border border-green-700 rounded-lg p-4 mb-4">
<div class="text-green-400 font-medium mb-2">Published to L2</div>
<div class="text-sm text-gray-300">
Asset name: <strong>{asset_name}</strong><br>
Published: {published_at}<br>
Last synced: {last_synced}
</div>
</div>
<div id="republish-result"></div>
<button hx-patch="/ui/cache/{content_hash}/republish" hx-target="#republish-result" hx-swap="innerHTML"
class="px-4 py-2 bg-blue-600 hover:bg-blue-700 text-white font-medium rounded-lg transition-colors">
Update on L2
</button>
'''
    elif origin_type:
        # Show publish form only if origin is set
        publish_html = f'''
<div id="publish-result"></div>
<form hx-post="/ui/cache/{content_hash}/publish" hx-target="#publish-result" hx-swap="innerHTML"
class="flex flex-wrap gap-3 items-end">
<div>
<label class="block text-sm text-gray-400 mb-1">Asset Name</label>
<input type="text" name="asset_name" placeholder="my-{asset_type}" required
class="px-4 py-2 bg-dark-600 border border-dark-500 rounded-lg text-white placeholder-gray-500 focus:border-blue-500 focus:outline-none min-w-[200px]">
</div>
<input type="hidden" name="asset_type" value="{asset_type}">
<button type="submit"
class="px-4 py-2 bg-blue-600 hover:bg-blue-700 text-white font-medium rounded-lg transition-colors">
Publish to L2
</button>
</form>
'''
    else:
        publish_html = '''
<div class="bg-yellow-900/30 border border-yellow-700 text-yellow-300 px-4 py-3 rounded-lg">
Set an origin (self or external URL) before publishing.
</div>
'''

    # Pin status and discard control
    pinned_html = '<span class="text-green-400">Yes</span>' if pinned else '<span class="text-gray-500">No</span>'
    pin_reason_html = f'<span class="text-xs text-gray-500 ml-2">({pin_reason})</span>' if pinned and pin_reason else ''
    if pinned:
        discard_html = '<p class="text-gray-500 text-sm">Cannot discard pinned items.</p>'
    else:
        discard_html = f"""
<button hx-delete="/ui/cache/{content_hash}/discard" hx-target="#discard-result" hx-swap="innerHTML"
hx-confirm="Are you sure you want to discard this item? This cannot be undone."
class="px-4 py-2 bg-red-600 hover:bg-red-700 text-white font-medium rounded-lg transition-colors">
Discard Item
</button>
"""

    return f'''
<h2 class="text-lg font-semibold text-white mb-4">Metadata</h2>
<div id="meta-save-result"></div>
<form hx-patch="/ui/cache/{content_hash}/meta" hx-target="#meta-save-result" hx-swap="innerHTML" class="space-y-4 mb-6">
<!-- Origin -->
<div class="bg-dark-600 rounded-lg p-4">
<label class="block text-sm font-medium text-gray-300 mb-3">Origin</label>
<div class="space-y-3">
<label class="flex items-center gap-3 cursor-pointer">
<input type="radio" name="origin_type" value="self" {self_checked}
class="w-4 h-4 text-blue-600 bg-dark-500 border-dark-400">
<span class="text-gray-200">Created by me (original content)</span>
</label>
<label class="flex items-center gap-3 cursor-pointer">
<input type="radio" name="origin_type" value="external" {external_checked}
class="w-4 h-4 text-blue-600 bg-dark-500 border-dark-400">
<span class="text-gray-200">External source</span>
</label>
<div class="ml-7 space-y-2">
<input type="url" name="origin_url" value="{origin_url}" placeholder="https://example.com/source"
class="w-full px-3 py-2 bg-dark-500 border border-dark-400 rounded-lg text-white placeholder-gray-500 focus:border-blue-500 focus:outline-none text-sm">
<input type="text" name="origin_note" value="{origin_note}" placeholder="Note (optional)"
class="w-full px-3 py-2 bg-dark-500 border border-dark-400 rounded-lg text-white placeholder-gray-500 focus:border-blue-500 focus:outline-none text-sm">
</div>
</div>
</div>
<!-- Description -->
<div class="bg-dark-600 rounded-lg p-4">
<label class="block text-sm font-medium text-gray-300 mb-2">Description</label>
<textarea name="description" rows="2" placeholder="Optional description..."
class="w-full px-3 py-2 bg-dark-500 border border-dark-400 rounded-lg text-white placeholder-gray-500 focus:border-blue-500 focus:outline-none text-sm">{description}</textarea>
</div>
<!-- Tags -->
<div class="bg-dark-600 rounded-lg p-4">
<label class="block text-sm font-medium text-gray-300 mb-2">Tags</label>
<input type="text" name="tags" value="{tags_str}" placeholder="tag1, tag2, tag3"
class="w-full px-3 py-2 bg-dark-500 border border-dark-400 rounded-lg text-white placeholder-gray-500 focus:border-blue-500 focus:outline-none text-sm">
<p class="text-xs text-gray-500 mt-1">Comma-separated list</p>
</div>
<button type="submit"
class="px-4 py-2 bg-green-600 hover:bg-green-700 text-white font-medium rounded-lg transition-colors">
Save Metadata
</button>
</form>
<!-- Publishing Section -->
<div class="border-t border-dark-500 pt-6">
<h3 class="text-lg font-semibold text-white mb-4">Publish to L2 (ActivityPub)</h3>
{publish_html}
</div>
<!-- Status & Actions Section -->
<div class="border-t border-dark-500 pt-6 mt-6">
<h3 class="text-lg font-semibold text-white mb-4">Status</h3>
<div class="bg-dark-600 rounded-lg p-4 mb-4">
<div class="flex items-center gap-2 mb-2">
<span class="text-sm text-gray-400">Pinned:</span>
{pinned_html}
{pin_reason_html}
</div>
<p class="text-xs text-gray-500">Pinned items cannot be discarded. Items are pinned when published or used as inputs to published content.</p>
</div>
<div id="discard-result"></div>
{discard_html}
</div>
'''
@app.patch("/ui/cache/{content_hash}/meta", response_class=HTMLResponse)
async def ui_update_cache_meta(content_hash: str, request: Request):
    """HTMX handler: update cache metadata from form.

    Reads ``origin_type``, ``origin_url``, ``origin_note``, ``description`` and
    ``tags`` from the submitted form, stores them with save_cache_meta and
    returns a success or error HTML fragment.
    """
    current_user = get_user_from_cookie(request)
    if not current_user:
        return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Login required</div>'

    # Check ownership
    user_hashes = get_user_cache_hashes(current_user)
    if content_hash not in user_hashes:
        return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Access denied</div>'

    # Parse form data
    form = await request.form()
    origin_type = form.get("origin_type", "")
    origin_url = form.get("origin_url", "").strip()
    origin_note = form.get("origin_note", "").strip()
    description = form.get("description", "").strip()
    tags_str = form.get("tags", "").strip()

    # Build origin
    origin = None
    if origin_type == "self":
        origin = {"type": "self"}
    elif origin_type == "external":
        if not origin_url:
            return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">External origin requires a URL</div>'
        origin = {"type": "external", "url": origin_url}
        if origin_note:
            origin["note"] = origin_note

    # Parse tags
    tags = [t.strip() for t in tags_str.split(",") if t.strip()] if tags_str else []

    # Build updates
    updates = {}
    if origin:
        updates["origin"] = origin
    # Store the description whenever the field was submitted, including when
    # it is empty: otherwise a user who blanks the textarea keeps the old
    # description forever (tags, by contrast, could already be cleared).
    if "description" in form:
        updates["description"] = description
    updates["tags"] = tags

    # Save
    save_cache_meta(content_hash, **updates)
    return '<div class="bg-green-900/50 border border-green-700 text-green-300 px-4 py-3 rounded-lg mb-4">Metadata saved!</div>'
@app.post("/ui/cache/{content_hash}/publish", response_class=HTMLResponse)
async def ui_publish_cache(content_hash: str, request: Request):
    """HTMX handler: publish cache item to L2.

    Requires a logged-in user with an ``auth_token`` cookie, ownership of the
    hash, an ``asset_name`` form field and an origin already stored in the
    item's metadata. On success the item is marked published and pinned
    locally. Always returns an HTML fragment (success or error).
    """
    from html import escape as html_escape

    current_user = get_user_from_cookie(request)
    if not current_user:
        return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Login required</div>'

    token = request.cookies.get("auth_token")
    if not token:
        return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Auth token required</div>'

    # Check ownership
    user_hashes = get_user_cache_hashes(current_user)
    if content_hash not in user_hashes:
        return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Access denied</div>'

    # Parse form
    form = await request.form()
    asset_name = form.get("asset_name", "").strip()
    asset_type = form.get("asset_type", "image")
    if not asset_name:
        return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Asset name required</div>'

    # Load metadata
    meta = load_cache_meta(content_hash)
    origin = meta.get("origin")
    if not origin or "type" not in origin:
        return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Set origin before publishing</div>'

    # Call L2
    try:
        resp = http_requests.post(
            f"{L2_SERVER}/registry/publish-cache",
            headers={"Authorization": f"Bearer {token}"},
            json={
                "content_hash": content_hash,
                "asset_name": asset_name,
                "asset_type": asset_type,
                "origin": origin,
                "description": meta.get("description"),
                "tags": meta.get("tags", []),
                "metadata": {
                    "filename": meta.get("filename"),
                    "folder": meta.get("folder"),
                    "collections": meta.get("collections", [])
                }
            },
            timeout=30
        )
        resp.raise_for_status()
    except http_requests.exceptions.HTTPError as e:
        try:
            error_detail = e.response.json().get("detail", str(e))
        except Exception:
            error_detail = str(e)
        # The detail comes from another server and may echo user input.
        return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Error: {html_escape(str(error_detail))}</div>'
    except Exception as e:
        return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Error: {html_escape(str(e))}</div>'

    # Update local metadata
    now = datetime.now(timezone.utc).isoformat()
    publish_info = {
        "to_l2": True,
        "asset_name": asset_name,
        "published_at": now,
        "last_synced_at": now
    }
    save_cache_meta(content_hash, published=publish_info, pinned=True, pin_reason="published")

    # asset_name is form input: escape it for both the text and the href.
    safe_name = html_escape(asset_name)
    l2_base = L2_SERVER.replace("http://", "https://")
    return f'''
<div class="bg-green-900/50 border border-green-700 text-green-300 px-4 py-3 rounded-lg mb-4">
Published to L2 as <strong>{safe_name}</strong>!
<a href="{l2_base}/ui/asset/{safe_name}" target="_blank" class="underline">View on L2</a>
</div>
'''
@app.patch("/ui/cache/{content_hash}/republish", response_class=HTMLResponse)
async def ui_republish_cache(content_hash: str, request: Request):
    """HTMX handler: re-publish (update) cache item on L2.

    Sends the item's current description, tags, origin and file metadata to
    the L2 registry entry named in the stored publish record, then refreshes
    ``last_synced_at`` locally. Always returns an HTML fragment.
    """
    from html import escape as html_escape

    current_user = get_user_from_cookie(request)
    if not current_user:
        return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Login required</div>'

    token = request.cookies.get("auth_token")
    if not token:
        return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Auth token required</div>'

    # Check ownership
    user_hashes = get_user_cache_hashes(current_user)
    if content_hash not in user_hashes:
        return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Access denied</div>'

    # Load metadata
    meta = load_cache_meta(content_hash)
    published = meta.get("published", {})
    if not published.get("to_l2"):
        return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Item not published yet</div>'
    asset_name = published.get("asset_name")
    if not asset_name:
        return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">No asset name found</div>'

    # Call L2 update
    try:
        resp = http_requests.patch(
            f"{L2_SERVER}/registry/{asset_name}",
            headers={"Authorization": f"Bearer {token}"},
            json={
                "description": meta.get("description"),
                "tags": meta.get("tags"),
                "origin": meta.get("origin"),
                "metadata": {
                    "filename": meta.get("filename"),
                    "folder": meta.get("folder"),
                    "collections": meta.get("collections", [])
                }
            },
            timeout=30
        )
        resp.raise_for_status()
    except http_requests.exceptions.HTTPError as e:
        try:
            error_detail = e.response.json().get("detail", str(e))
        except Exception:
            error_detail = str(e)
        # The detail comes from another server and may echo user input.
        return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Error: {html_escape(str(error_detail))}</div>'
    except Exception as e:
        return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Error: {html_escape(str(e))}</div>'

    # Update local metadata
    published["last_synced_at"] = datetime.now(timezone.utc).isoformat()
    save_cache_meta(content_hash, published=published)
    return '<div class="bg-green-900/50 border border-green-700 text-green-300 px-4 py-3 rounded-lg mb-4">Updated on L2!</div>'
def _cache_list_size_str(size: int) -> str:
    """Format a byte count the way the cache listing cards display it."""
    if size > 1024*1024:
        return f"{size/(1024*1024):.1f} MB"
    if size > 1024:
        return f"{size/1024:.1f} KB"
    return f"{size} bytes"


def _cache_list_in_folder(item_folder: str, folder: str) -> bool:
    """Return True if *item_folder* is *folder* itself or nested below it.

    The root folder "/" matches only items stored directly in "/".
    """
    if folder == "/":
        return item_folder == "/"
    base = folder.rstrip("/")
    # Compare on a path-segment boundary so "/a" does not match "/ab".
    return item_folder == base or item_folder.startswith(base + "/")


def _cache_list_scroll_trigger(next_page: int, folder, collection, tag) -> str:
    """Build the HTMX element that loads *next_page* when scrolled into view."""
    from html import escape
    from urllib.parse import urlencode

    params = {"page": next_page}
    if folder:
        params["folder"] = folder
    if collection:
        params["collection"] = collection
    if tag:
        params["tag"] = tag
    # URL-encode the values, then HTML-escape the result for the attribute.
    query_params = escape(urlencode(params), quote=True)
    return f'''
<div hx-get="/cache?{query_params}" hx-trigger="revealed" hx-swap="afterend">
<p class="py-4 text-center text-gray-400">Loading more...</p>
</div>
'''


@app.get("/cache")
async def list_cache(
    request: Request,
    page: int = 1,
    limit: int = 20,
    folder: Optional[str] = None,
    collection: Optional[str] = None,
    tag: Optional[str] = None
):
    """List cached content. HTML for browsers (with infinite scroll), JSON for APIs (with pagination).

    Args:
        request: Incoming request; decides between the HTML and JSON variants.
        page: 1-based page number (values below 1 are treated as 1).
        limit: Page size (values below 1 are treated as 1).
        folder: HTML only - show items in this folder or below it.
        collection: HTML only - show items that belong to this collection.
        tag: HTML only - show items that carry this tag.

    Returns:
        HTML: the full page for page 1, bare cards for later pages, restricted
        to the logged-in user's hashes. JSON: every hash in CACHE_DIR plus
        pagination info.
    """
    from html import escape as html_escape

    # A zero or negative page/limit would turn into negative slice indices
    # and silently return the wrong window.
    page = max(page, 1)
    limit = max(limit, 1)
    start = (page - 1) * limit
    end = start + limit

    # Side files that live next to the cached content and are not items.
    skip_suffixes = ('.provenance.json', '.meta.json', '.mp4')

    current_user = get_user_from_cookie(request)

    if wants_html(request):
        # Require login for HTML cache view
        if not current_user:
            content = '<p class="text-gray-400 py-8 text-center"><a href="/login" class="text-blue-400 hover:text-blue-300">Login</a> to see cached content.</p>'
            return HTMLResponse(render_page("Cache", content, current_user, active_tab="cache"))

        # Get hashes owned by/associated with this user
        user_hashes = get_user_cache_hashes(current_user)

        # Get cache items that belong to the user
        cache_items = []
        if CACHE_DIR.exists():
            for f in CACHE_DIR.iterdir():
                if not f.is_file() or f.name.endswith(skip_suffixes):
                    continue
                if f.name not in user_hashes:
                    continue
                meta = load_cache_meta(f.name)
                if folder and not _cache_list_in_folder(meta.get("folder") or "/", folder):
                    continue
                if collection and collection not in meta.get("collections", []):
                    continue
                if tag and tag not in meta.get("tags", []):
                    continue
                stat = f.stat()
                cache_items.append({
                    "hash": f.name,
                    "size": stat.st_size,
                    "mtime": stat.st_mtime,
                    "meta": meta
                })

        # Sort by modification time (newest first)
        cache_items.sort(key=lambda x: x["mtime"], reverse=True)
        total = len(cache_items)

        # Pagination
        items_page = cache_items[start:end]
        has_more = end < total

        if not items_page:
            if page > 1:
                return HTMLResponse("")  # Empty for infinite scroll
            # The filter values come straight from the query string, so they
            # are escaped before being echoed back into the page.
            filter_msg = ""
            if folder:
                filter_msg = f" in folder {html_escape(folder)}"
            elif collection:
                filter_msg = f" in collection '{html_escape(collection)}'"
            elif tag:
                filter_msg = f" with tag '{html_escape(tag)}'"
            content = f'<p class="text-gray-400 py-8 text-center">No cached files{filter_msg}. Upload files or run effects to see them here.</p>'
            return HTMLResponse(render_page("Cache", content, current_user, active_tab="cache"))

        html_parts = []
        for item in items_page:
            content_hash = item["hash"]
            cache_path = get_cache_path(content_hash)
            media_type = detect_media_type(cache_path) if cache_path else "unknown"
            size_str = _cache_list_size_str(item["size"])
            html_parts.append(f'''
<a href="/cache/{content_hash}/detail" class="block">
<div class="bg-dark-700 rounded-lg p-4 hover:bg-dark-600 transition-colors">
<div class="flex items-center justify-between gap-2 mb-3">
<span class="px-2 py-1 bg-blue-600 text-white text-xs font-medium rounded-full">{media_type}</span>
<span class="text-xs text-gray-400">{size_str}</span>
</div>
<div class="text-xs text-gray-400 font-mono mb-3 truncate">{content_hash[:24]}...</div>
<div class="flex justify-center bg-dark-600 rounded-lg p-2">
''')
            if media_type == "video":
                video_src = video_src_for_request(content_hash, request)
                html_parts.append(f'<video src="{video_src}" controls muted loop playsinline class="max-h-32 rounded"></video>')
            elif media_type == "image":
                html_parts.append(f'<img src="/cache/{content_hash}" alt="{content_hash[:16]}" class="max-h-32 rounded object-contain">')
            else:
                html_parts.append('<p class="text-gray-400 text-sm py-4">Unknown file type</p>')
            html_parts.append('</div></div></a>')

        # For infinite scroll, just return cards if not first page
        if page > 1:
            if has_more:
                html_parts.append(_cache_list_scroll_trigger(page + 1, folder, collection, tag))
            return HTMLResponse('\n'.join(html_parts))

        # First page - full content
        infinite_scroll_trigger = ""
        if has_more:
            infinite_scroll_trigger = _cache_list_scroll_trigger(2, folder, collection, tag)
        content = f'''
<h2 class="text-xl font-semibold text-white mb-6">Cache ({total} items)</h2>
<div class="grid gap-4 sm:grid-cols-2 lg:grid-cols-3">
{''.join(html_parts)}
{infinite_scroll_trigger}
</div>
'''
        return HTMLResponse(render_page("Cache", content, current_user, active_tab="cache"))

    # JSON response for APIs - list all hashes with optional pagination
    all_hashes = [
        f.name for f in CACHE_DIR.iterdir()
        if f.is_file() and not f.name.endswith(skip_suffixes)
    ]
    total = len(all_hashes)
    hashes_page = all_hashes[start:end]
    has_more = end < total
    return {
        "hashes": hashes_page,
        "pagination": {
            "page": page,
            "limit": limit,
            "total": total,
            "has_more": has_more
        }
    }
@app.delete("/cache/{content_hash}")
async def discard_cache(content_hash: str, username: str = Depends(get_required_user)):
    """
    Discard (delete) a cached item.

    Enforces deletion rules:
    - Cannot delete items published to L2 (shared)
    - Cannot delete inputs/outputs of activities (runs)
    - Cannot delete pinned items

    Returns:
        ``{"discarded": True, "content_hash": ...}`` once the item is removed.

    Raises:
        HTTPException: 404 if the content is unknown, 403 if the caller does
            not own it, 400 if one of the deletion rules forbids removal.
    """
    # Check if content exists
    if not cache_manager.has_content(content_hash):
        raise HTTPException(404, "Content not found")

    # Check ownership
    user_hashes = get_user_cache_hashes(username)
    if content_hash not in user_hashes:
        raise HTTPException(403, "Access denied")

    # Check if pinned (legacy metadata)
    meta = load_cache_meta(content_hash)
    if meta.get("pinned"):
        pin_reason = meta.get("pin_reason", "unknown")
        raise HTTPException(400, f"Cannot discard pinned item (reason: {pin_reason})")

    # Check if used by any run (Redis runs, not just activity store)
    runs_using = find_runs_using_content(content_hash)
    if runs_using:
        run, role = runs_using[0]
        raise HTTPException(400, f"Cannot discard: item is {role} of run {run.run_id}")

    # Check deletion rules via cache_manager (L2 shared status, activity store)
    can_delete, reason = cache_manager.can_delete(content_hash)
    if not can_delete:
        raise HTTPException(400, f"Cannot discard: {reason}")

    # Delete via cache_manager
    success, _ = cache_manager.delete_by_content_hash(content_hash)
    if not success:
        # Fallback to legacy deletion
        cache_path = get_cache_path(content_hash)
        if cache_path:
            cache_path.unlink(missing_ok=True)

    # Clean up legacy metadata files. missing_ok avoids the exists()/unlink()
    # race: a concurrent discard of the same item must not become a 500.
    for side_file in (
        CACHE_DIR / f"{content_hash}.meta.json",
        CACHE_DIR / f"{content_hash}.mp4",
    ):
        side_file.unlink(missing_ok=True)

    return {"discarded": True, "content_hash": content_hash}
@app.delete("/ui/cache/{content_hash}/discard", response_class=HTMLResponse)
async def ui_discard_cache(content_hash: str, request: Request):
    """HTMX handler: discard a cached item.

    Applies the same checks as DELETE /cache/{content_hash} (ownership,
    existence, pinned flag, use by a run, cache_manager deletion rules) but
    reports every outcome as an HTML fragment instead of raising.
    """
    from html import escape as html_escape

    current_user = get_user_from_cookie(request)
    if not current_user:
        return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Login required</div>'

    # Check ownership
    user_hashes = get_user_cache_hashes(current_user)
    if content_hash not in user_hashes:
        return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Access denied</div>'

    # Check if content exists
    if not cache_manager.has_content(content_hash):
        return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Content not found</div>'

    # Check if pinned (legacy metadata)
    meta = load_cache_meta(content_hash)
    if meta.get("pinned"):
        # Stored text: escape before placing it into markup.
        pin_reason = html_escape(str(meta.get("pin_reason", "unknown")))
        return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Cannot discard: item is pinned ({pin_reason})</div>'

    # Check if used by any run (Redis runs, not just activity store)
    runs_using = find_runs_using_content(content_hash)
    if runs_using:
        run, role = runs_using[0]
        role_text = html_escape(str(role))
        run_text = html_escape(str(run.run_id))
        return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Cannot discard: item is {role_text} of run {run_text}</div>'

    # Check deletion rules via cache_manager (L2 shared status, activity store)
    can_delete, reason = cache_manager.can_delete(content_hash)
    if not can_delete:
        return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Cannot discard: {html_escape(str(reason))}</div>'

    # Delete via cache_manager
    success, _ = cache_manager.delete_by_content_hash(content_hash)
    if not success:
        # Fallback to legacy deletion
        cache_path = get_cache_path(content_hash)
        if cache_path:
            cache_path.unlink(missing_ok=True)

    # Clean up legacy metadata files. missing_ok avoids the exists()/unlink()
    # race when the same item is discarded twice at once.
    for side_file in (
        CACHE_DIR / f"{content_hash}.meta.json",
        CACHE_DIR / f"{content_hash}.mp4",
    ):
        side_file.unlink(missing_ok=True)

    return '''
<div class="bg-green-900/50 border border-green-700 text-green-300 px-4 py-3 rounded-lg mb-4">
Item discarded. <a href="/cache" class="underline">Back to cache</a>
</div>
'''
# Known assets (bootstrap data).
# Maps a short asset name to a 64-character hex string; presumably the content
# hash under which that asset is cached (TODO confirm). Served unchanged by
# GET /assets.
KNOWN_ASSETS = {
    "cat": "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b",
}
@app.get("/assets")
async def list_assets():
    """Return the KNOWN_ASSETS mapping unchanged."""
    known = KNOWN_ASSETS
    return known
@app.post("/cache/import")
async def import_to_cache(path: str):
    """Import a local file to cache.

    Args:
        path: Path of a file on the server's filesystem.

    Returns:
        ``{"content_hash": ..., "cached": True}`` for the imported file.

    Raises:
        HTTPException: 404 if nothing exists at *path*, 400 if *path* exists
            but is not a regular file (e.g. a directory).
    """
    # NOTE(review): this endpoint takes no credentials and accepts any path
    # the server process can read, so any client can copy server-side files
    # into the cache. Consider requiring get_required_user and restricting
    # *path* to an allow-listed directory - confirm against the CLI callers.
    source = Path(path)
    if not source.exists():
        raise HTTPException(404, f"File not found: {path}")
    # exists() is also true for directories, which cannot be cached as a file.
    if not source.is_file():
        raise HTTPException(400, f"Not a regular file: {path}")
    content_hash = cache_file(source)
    return {"content_hash": content_hash, "cached": True}
def save_cache_meta(content_hash: str, uploader: Optional[str] = None, filename: Optional[str] = None, **updates) -> dict:
    """Save or update metadata for a cached file.

    The metadata lives in ``<content_hash>.meta.json`` inside CACHE_DIR.

    Args:
        content_hash: Hash identifying the cached file.
        uploader: Owner to record. Used when the record is created, or when an
            existing record has no uploader yet; an established uploader is
            never replaced.
        filename: Original file name, recorded under the same rule as
            *uploader*.
        **updates: Further fields to set. ``uploader`` and ``uploaded_at``
            are ignored here so they cannot be overwritten.

    Returns:
        The complete metadata dict as written to disk.
    """
    meta_path = CACHE_DIR / f"{content_hash}.meta.json"

    # Load existing or create new
    if meta_path.exists():
        with open(meta_path) as f:
            meta = json.load(f)
        # A record can exist without an owner, e.g. when it was created by a
        # metadata edit before anyone uploaded the file. Fill the gap instead
        # of dropping the argument, otherwise the uploader never owns the file.
        if uploader is not None and meta.get("uploader") is None:
            meta["uploader"] = uploader
        if filename is not None and meta.get("filename") is None:
            meta["filename"] = filename
    else:
        meta = {
            "uploader": uploader,
            "uploaded_at": datetime.now(timezone.utc).isoformat(),
            "filename": filename
        }

    # Apply updates (but never change uploader or uploaded_at)
    for key, value in updates.items():
        if key not in ("uploader", "uploaded_at"):
            meta[key] = value

    with open(meta_path, "w") as f:
        json.dump(meta, f, indent=2)
    return meta
def load_cache_meta(content_hash: str) -> dict:
    """Read the ``<content_hash>.meta.json`` record from CACHE_DIR.

    Returns the parsed JSON, or an empty dict when no record exists.
    """
    meta_file = CACHE_DIR / f"{content_hash}.meta.json"
    if not meta_file.exists():
        return {}
    with open(meta_file) as handle:
        raw = handle.read()
    return json.loads(raw)
# User data storage (folders, collections).
# Directory inside CACHE_DIR that holds one JSON file per user; it is created
# on demand by load_user_data() and save_user_data().
USER_DATA_DIR = CACHE_DIR / ".user-data"
def load_user_data(username: str) -> dict:
    """Return the stored folders/collections record for *username*.

    Creates USER_DATA_DIR if needed. When the user has no file yet, a default
    record with only the root folder and no collections is returned.
    """
    USER_DATA_DIR.mkdir(parents=True, exist_ok=True)

    # File name derived from the username: "@" is dropped, "/" becomes "_".
    safe_name = username.translate(str.maketrans({"@": None, "/": "_"}))
    record_path = USER_DATA_DIR / f"{safe_name}.json"

    if not record_path.exists():
        return {"folders": ["/"], "collections": []}
    with open(record_path) as handle:
        return json.load(handle)
def save_user_data(username: str, data: dict):
    """Write *data* as the folders/collections record for *username*.

    Creates USER_DATA_DIR if needed and overwrites any existing record.
    """
    USER_DATA_DIR.mkdir(parents=True, exist_ok=True)

    # File name derived from the username: "@" is dropped, "/" becomes "_".
    safe_name = username.translate(str.maketrans({"@": None, "/": "_"}))
    record_path = USER_DATA_DIR / f"{safe_name}.json"

    with open(record_path, "w") as handle:
        json.dump(data, handle, indent=2)
def get_user_cache_hashes(username: str) -> set:
    """Collect every cache hash that belongs to *username*.

    A hash belongs to the user when its metadata record names the user as
    uploader, or when it is an input or the output of one of the user's runs.
    The user is matched both by bare name and as ``@name@L2_DOMAIN``.
    """
    identities = (username, f"@{username}@{L2_DOMAIN}")
    owned = set()

    # Files uploaded by user
    if CACHE_DIR.exists():
        for entry in CACHE_DIR.iterdir():
            if not entry.name.endswith('.meta.json'):
                continue
            item_hash = entry.name.replace('.meta.json', '')
            if load_cache_meta(item_hash).get("uploader") in identities:
                owned.add(item_hash)

    # Files from user's runs (inputs and outputs)
    for run in list_all_runs():
        if run.username not in identities:
            continue
        owned.update(run.inputs)
        if run.output_hash:
            owned.add(run.output_hash)

    return owned
@app.post("/cache/upload")
async def upload_to_cache(file: UploadFile = File(...), username: str = Depends(get_required_user)):
    """Upload a file to cache. Requires authentication.

    The upload is spooled to a temporary file, handed to cache_manager, and
    the uploader is recorded in the item's metadata as ``@user@L2_DOMAIN``.

    Returns:
        ``{"content_hash": ..., "filename": ..., "size": ...}`` where size is
        the number of bytes received.
    """
    import tempfile

    chunk_size = 1024 * 1024  # Copy in 1 MiB pieces rather than holding the whole upload in memory
    size = 0
    tmp_path = None
    try:
        # Write to temp file first
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            tmp_path = Path(tmp.name)
            while chunk := await file.read(chunk_size):
                tmp.write(chunk)
                size += len(chunk)

        # Store in cache via cache_manager
        cached = cache_manager.put(tmp_path, node_type="upload", move=True)
    except BaseException:
        # delete=False means nobody else removes the temp file; without this
        # a failed read or put would leave it behind.
        if tmp_path is not None:
            tmp_path.unlink(missing_ok=True)
        raise

    content_hash = cached.content_hash

    # Save uploader metadata
    actor_id = f"@{username}@{L2_DOMAIN}"
    save_cache_meta(content_hash, actor_id, file.filename)

    return {"content_hash": content_hash, "filename": file.filename, "size": size}
class CacheMetaUpdate(BaseModel):
    """Request to update cache metadata.

    Body of PATCH /cache/{content_hash}/meta. Every field is optional; the
    handler only applies fields that are not None.
    """
    # Where the content came from: {"type": "self"|"external", "url": "...", "note": "..."}
    origin: Optional[dict] = None
    # Free-text description of the item.
    description: Optional[str] = None
    # Tags attached to the item.
    tags: Optional[list[str]] = None
    # Folder to file the item under; the handler rejects folders that are not
    # in the user's folder list.
    folder: Optional[str] = None
    # Collection names; the handler rejects names that are not among the
    # user's collections.
    collections: Optional[list[str]] = None
class PublishRequest(BaseModel):
    """Request to publish a cache item to L2."""
    # Name under which the asset is published (required).
    asset_name: str
    # Kind of asset: image, video, etc. Defaults to "image".
    asset_type: str = "image"
@app.get("/cache/{content_hash}/meta")
async def get_cache_meta(content_hash: str, username: str = Depends(get_required_user)):
    """Return the stored metadata of a cached file the caller owns.

    Raises:
        HTTPException: 404 if the content is not cached, 403 if it is not
            among the caller's hashes.
    """
    # The file must be in the cache
    if not get_cache_path(content_hash):
        raise HTTPException(404, "Content not found")

    # ...and it must belong to the caller
    owned = get_user_cache_hashes(username)
    if content_hash not in owned:
        raise HTTPException(403, "Access denied")

    metadata = load_cache_meta(content_hash)
    return metadata
@app.patch("/cache/{content_hash}/meta")
async def update_cache_meta(content_hash: str, update: CacheMetaUpdate, username: str = Depends(get_required_user)):
    """Apply the non-None fields of *update* to a cached file's metadata.

    Returns the metadata as saved.

    Raises:
        HTTPException: 404 if the content is not cached, 403 if the caller
            does not own it, 400 if the folder or a collection is unknown to
            the caller's user data.
    """
    # The file must be in the cache
    if not get_cache_path(content_hash):
        raise HTTPException(404, "Content not found")

    # ...and it must belong to the caller
    if content_hash not in get_user_cache_hashes(username):
        raise HTTPException(403, "Access denied")

    # Fields that are copied as-is when present
    changes = {}
    for field_name in ("origin", "description", "tags"):
        field_value = getattr(update, field_name)
        if field_value is not None:
            changes[field_name] = field_value

    new_folder = update.folder
    if new_folder is not None:
        # The folder must already be in the user's folder list
        known_folders = load_user_data(username)["folders"]
        if new_folder not in known_folders:
            raise HTTPException(400, f"Folder does not exist: {new_folder}")
        changes["folder"] = new_folder

    new_collections = update.collections
    if new_collections is not None:
        # Every named collection must already exist for the user
        known_names = {entry["name"] for entry in load_user_data(username)["collections"]}
        for name in new_collections:
            if name not in known_names:
                raise HTTPException(400, f"Collection does not exist: {name}")
        changes["collections"] = new_collections

    return save_cache_meta(content_hash, **changes)
@app.post("/cache/{content_hash}/publish")
async def publish_cache_to_l2(
    content_hash: str,
    req: PublishRequest,
    request: Request,
    username: str = Depends(get_required_user)
):
    """
    Publish a cache item to L2 (ActivityPub).

    Requires origin to be set in metadata before publishing. The caller's
    auth token (cookie or ``Authorization: Bearer`` header) is forwarded to
    L2. On success the item's metadata is marked as published and pinned.

    Raises:
        HTTPException: 404 unknown content, 403 not owned by the caller,
            400 origin missing or L2 rejected the request, 401 no token,
            500 any other failure while talking to L2.
    """
    # Check file exists
    cache_path = get_cache_path(content_hash)
    if not cache_path:
        raise HTTPException(404, "Content not found")

    # Check ownership
    user_hashes = get_user_cache_hashes(username)
    if content_hash not in user_hashes:
        raise HTTPException(403, "Access denied")

    # Load metadata
    meta = load_cache_meta(content_hash)

    # Check origin is set
    origin = meta.get("origin")
    if not origin or "type" not in origin:
        raise HTTPException(400, "Origin must be set before publishing. Use --origin self or --origin-url <url>")

    # Get auth token to pass to L2: cookie first, then the bearer header
    token = request.cookies.get("auth_token")
    if not token:
        auth_header = request.headers.get("Authorization", "")
        if auth_header.startswith("Bearer "):
            token = auth_header[7:]
    if not token:
        raise HTTPException(401, "Authentication token required")

    # Call L2 publish-cache endpoint
    # NOTE(review): this is a blocking HTTP call inside an async handler.
    try:
        resp = http_requests.post(
            f"{L2_SERVER}/registry/publish-cache",
            headers={"Authorization": f"Bearer {token}"},
            json={
                "content_hash": content_hash,
                "asset_name": req.asset_name,
                "asset_type": req.asset_type,
                "origin": origin,
                "description": meta.get("description"),
                "tags": meta.get("tags", []),
                "metadata": {
                    "filename": meta.get("filename"),
                    "folder": meta.get("folder"),
                    "collections": meta.get("collections", [])
                }
            },
            timeout=10
        )
        resp.raise_for_status()
        l2_result = resp.json()
    except http_requests.exceptions.HTTPError as e:
        # Prefer the "detail" field of L2's JSON error body when there is one
        try:
            error_detail = e.response.json().get("detail", str(e))
        except (ValueError, AttributeError):
            error_detail = str(e)
        raise HTTPException(400, f"L2 publish failed: {error_detail}") from e
    except Exception as e:
        raise HTTPException(500, f"L2 publish failed: {e}") from e

    # Update local metadata with publish status and pin.
    # One clock read so published_at and last_synced_at agree on first publish.
    now = datetime.now(timezone.utc).isoformat()
    publish_info = {
        "to_l2": True,
        "asset_name": req.asset_name,
        "published_at": now,
        "last_synced_at": now
    }
    save_cache_meta(content_hash, published=publish_info, pinned=True, pin_reason="published")

    return {
        "published": True,
        "asset_name": req.asset_name,
        "l2_result": l2_result
    }
@app.patch("/cache/{content_hash}/republish")
async def republish_cache_to_l2(
    content_hash: str,
    request: Request,
    username: str = Depends(get_required_user)
):
    """
    Re-publish (update) a cache item on L2 after metadata changes.

    Only works for already-published items. The asset name recorded at
    publish time selects the L2 registry entry to patch; on success the
    local ``last_synced_at`` timestamp is refreshed.

    Raises:
        HTTPException: 404 unknown content, 403 not owned by the caller,
            400 not published yet / no asset name / L2 rejected the request,
            401 no token, 500 any other failure while talking to L2.
    """
    from urllib.parse import quote

    # Check file exists
    cache_path = get_cache_path(content_hash)
    if not cache_path:
        raise HTTPException(404, "Content not found")

    # Check ownership
    user_hashes = get_user_cache_hashes(username)
    if content_hash not in user_hashes:
        raise HTTPException(403, "Access denied")

    # Load metadata
    meta = load_cache_meta(content_hash)

    # Check already published ("or {}" also covers a stored null value)
    published = meta.get("published") or {}
    if not published.get("to_l2"):
        raise HTTPException(400, "Item not published yet. Use publish first.")

    asset_name = published.get("asset_name")
    if not asset_name:
        raise HTTPException(400, "No asset name found in publish info")

    # Get auth token: cookie first, then the bearer header
    token = request.cookies.get("auth_token")
    if not token:
        auth_header = request.headers.get("Authorization", "")
        if auth_header.startswith("Bearer "):
            token = auth_header[7:]
    if not token:
        raise HTTPException(401, "Authentication token required")

    # Call L2 update endpoint.
    # The asset name is user-chosen, so percent-encode it; otherwise "/", "?"
    # or "#" in the name would change which L2 URL is addressed.
    try:
        resp = http_requests.patch(
            f"{L2_SERVER}/registry/{quote(asset_name, safe='')}",
            headers={"Authorization": f"Bearer {token}"},
            json={
                "description": meta.get("description"),
                "tags": meta.get("tags"),
                "origin": meta.get("origin"),
                "metadata": {
                    "filename": meta.get("filename"),
                    "folder": meta.get("folder"),
                    "collections": meta.get("collections", [])
                }
            },
            timeout=10
        )
        resp.raise_for_status()
        l2_result = resp.json()
    except http_requests.exceptions.HTTPError as e:
        # Prefer the "detail" field of L2's JSON error body when there is one
        try:
            error_detail = e.response.json().get("detail", str(e))
        except (ValueError, AttributeError):
            error_detail = str(e)
        raise HTTPException(400, f"L2 update failed: {error_detail}") from e
    except Exception as e:
        raise HTTPException(500, f"L2 update failed: {e}") from e

    # Update local metadata
    published["last_synced_at"] = datetime.now(timezone.utc).isoformat()
    save_cache_meta(content_hash, published=published)

    return {
        "updated": True,
        "asset_name": asset_name,
        "l2_result": l2_result
    }
# ============ Folder & Collection Management ============
@app.get("/user/folders")
async def list_folders(username: str = Depends(get_required_user)):
    """Return the authenticated user's folder list."""
    folders = load_user_data(username)["folders"]
    return {"folders": folders}
@app.post("/user/folders")
async def create_folder(folder_path: str, username: str = Depends(get_required_user)):
    """Create a new folder.

    ``folder_path`` must be absolute (start with ``/``), must not contain
    empty segments or a trailing slash, and its parent must already exist
    unless the parent is the root.

    Raises:
        HTTPException: 400 for a malformed path, a missing parent, or a
            folder that already exists.
    """
    user_data = load_user_data(username)
    folders = user_data["folders"]

    # Validate path format
    if not folder_path.startswith("/"):
        raise HTTPException(400, "Folder path must start with /")
    # "/a/" or "/a//b" would be stored verbatim and never match the
    # "<parent>/<name>" form the parent and subfolder checks rely on.
    if folder_path != "/" and "" in folder_path.split("/")[1:]:
        raise HTTPException(400, "Folder path must not contain empty segments or a trailing /")

    # Check parent exists (everything before the last "/"; root for "/name")
    parent = folder_path.rsplit("/", 1)[0] or "/"
    if parent != "/" and parent not in folders:
        raise HTTPException(400, f"Parent folder does not exist: {parent}")

    # Check doesn't already exist
    if folder_path in folders:
        raise HTTPException(400, f"Folder already exists: {folder_path}")

    folders.append(folder_path)
    folders.sort()
    save_user_data(username, user_data)

    return {"folder": folder_path, "created": True}
@app.delete("/user/folders")
async def delete_folder(folder_path: str, username: str = Depends(get_required_user)):
    """Remove a folder that has neither subfolders nor items.

    Raises:
        HTTPException: 400 for the root folder, a folder with subfolders or a
            non-empty folder; 404 when the folder is unknown.
    """
    if folder_path == "/":
        raise HTTPException(400, "Cannot delete root folder")

    user_data = load_user_data(username)
    folders = user_data["folders"]
    if folder_path not in folders:
        raise HTTPException(404, "Folder not found")

    # Any folder nested below this one blocks deletion
    child_prefix = folder_path + "/"
    first_child = next((path for path in folders if path.startswith(child_prefix)), None)
    if first_child is not None:
        raise HTTPException(400, f"Folder has subfolders: {first_child}")

    # So does any of the user's cache items filed in it
    in_use = any(
        load_cache_meta(item_hash).get("folder") == folder_path
        for item_hash in get_user_cache_hashes(username)
    )
    if in_use:
        raise HTTPException(400, "Folder is not empty")

    folders.remove(folder_path)
    save_user_data(username, user_data)
    return {"folder": folder_path, "deleted": True}
@app.get("/user/collections")
async def list_collections(username: str = Depends(get_required_user)):
    """Return the authenticated user's collections."""
    collections = load_user_data(username)["collections"]
    return {"collections": collections}
@app.post("/user/collections")
async def create_collection(name: str, username: str = Depends(get_required_user)):
    """Add a collection with the given name to the user's data.

    Raises:
        HTTPException: 400 when a collection with that name already exists.
    """
    user_data = load_user_data(username)
    collections = user_data["collections"]

    # Names are unique per user
    if any(entry["name"] == name for entry in collections):
        raise HTTPException(400, f"Collection already exists: {name}")

    created_at = datetime.now(timezone.utc).isoformat()
    collections.append({"name": name, "created_at": created_at})
    save_user_data(username, user_data)
    return {"collection": name, "created": True}
@app.delete("/user/collections")
async def delete_collection(name: str, username: str = Depends(get_required_user)):
    """Remove a collection and drop it from the user's cache items.

    Raises:
        HTTPException: 404 when no collection has that name.
    """
    user_data = load_user_data(username)
    collections = user_data["collections"]

    # Locate the first collection carrying this name
    position = next(
        (idx for idx, entry in enumerate(collections) if entry["name"] == name),
        None,
    )
    if position is None:
        raise HTTPException(404, "Collection not found")

    collections.pop(position)
    save_user_data(username, user_data)

    # Strip the name from every cache item that references it
    for item_hash in get_user_cache_hashes(username):
        meta = load_cache_meta(item_hash)
        if name not in meta.get("collections", []):
            continue
        meta["collections"].remove(name)
        # uploader / uploaded_at are left out of the keyword arguments
        rewritten = {key: value for key, value in meta.items() if key not in ("uploader", "uploaded_at")}
        save_cache_meta(item_hash, **rewritten)

    return {"collection": name, "deleted": True}
def is_ios_request(request: Request) -> bool:
    """Tell whether the User-Agent header mentions an iPhone or iPad."""
    agent = request.headers.get("user-agent", "").lower()
    return any(marker in agent for marker in ("iphone", "ipad"))
def video_src_for_request(content_hash: str, request: Request) -> str:
    """Build the video src URL; iOS clients are pointed at the MP4 endpoint."""
    suffix = "/mp4" if is_ios_request(request) else ""
    return f"/cache/{content_hash}{suffix}"
def detect_media_type(cache_path: Path) -> str:
    """Classify a file as "video", "image" or "unknown" from its magic bytes.

    Only the first 32 bytes of the file are read.
    """
    with open(cache_path, "rb") as stream:
        magic = stream.read(32)

    # RIFF is a container; bytes 8-12 name the payload format
    riff_kind = magic[8:12] if magic[:4] == b'RIFF' else None

    is_video = (
        magic.startswith(b'\x1a\x45\xdf\xa3')  # WebM/MKV
        or magic[4:8] == b'ftyp'  # MP4/MOV
        or riff_kind == b'AVI '  # AVI
    )
    if is_video:
        return "video"

    is_image = (
        magic.startswith(b'\x89PNG\r\n\x1a\n')  # PNG
        or magic.startswith(b'\xff\xd8')  # JPEG
        or magic.startswith((b'GIF87a', b'GIF89a'))  # GIF
        or riff_kind == b'WEBP'  # WebP
    )
    return "image" if is_image else "unknown"
def get_user_from_cookie(request) -> Optional[str]:
    """Resolve the auth_token cookie to a username via L2; None without a cookie."""
    token = request.cookies.get("auth_token")
    return verify_token_with_l2(token) if token else None
def wants_html(request: Request) -> bool:
    """Decide whether the client asked for HTML (browser) rather than JSON (API)."""
    accept = request.headers.get("accept", "")
    # An explicit JSON preference wins even when text/html is also listed
    if "application/json" in accept:
        return False
    return "text/html" in accept
# Tailwind CSS config for all L1 templates.
# HTML snippet interpolated into the <head> of every page template in this
# file: it loads Tailwind from its CDN, registers the custom "dark" colour
# palette (dark-500 .. dark-900) used by the templates' class names, and loads
# htmx 1.9.10 from unpkg.
TAILWIND_CONFIG = '''
<script src="https://cdn.tailwindcss.com"></script>
<script>
tailwind.config = {
darkMode: 'class',
theme: {
extend: {
colors: {
dark: { 900: '#0a0a0a', 800: '#111', 700: '#1a1a1a', 600: '#222', 500: '#333' }
}
}
}
}
</script>
<script src="https://unpkg.com/htmx.org@1.9.10"></script>
'''
def render_page(title: str, content: str, username: Optional[str] = None, active_tab: Optional[str] = None) -> str:
    """Render a page with nav bar and content. Used for clean URL pages.

    Args:
        title: Text placed in the ``<title>`` element (inserted as given).
        content: HTML fragment placed inside ``<main>`` (inserted as given,
            so callers are responsible for escaping it).
        username: Logged-in user shown in the header; a login link is shown
            when it is falsy.
        active_tab: ``"runs"`` or ``"cache"`` to highlight that nav entry.

    Returns:
        The complete HTML document as a string.
    """
    from html import escape

    if username:
        # Usernames are user-chosen; escape before embedding in markup.
        user_info = f'''
<div class="flex items-center gap-4 text-sm text-gray-400">
Logged in as <strong class="text-white">{escape(username)}</strong>
<a href="/logout" class="text-blue-400 hover:text-blue-300">Logout</a>
</div>
'''
    else:
        user_info = '''
<div class="text-sm">
<a href="/login" class="text-blue-400 hover:text-blue-300">Login</a>
</div>
'''

    runs_active = "border-b-2 border-blue-500 text-white" if active_tab == "runs" else "text-gray-400 hover:text-white"
    cache_active = "border-b-2 border-blue-500 text-white" if active_tab == "cache" else "text-gray-400 hover:text-white"

    return f"""
<!DOCTYPE html>
<html class="dark">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{title} | Art DAG L1 Server</title>
{TAILWIND_CONFIG}
</head>
<body class="bg-dark-900 text-gray-100 min-h-screen">
<div class="max-w-6xl mx-auto px-4 py-6 sm:px-6 lg:px-8">
<header class="flex flex-wrap items-center justify-between gap-4 mb-6">
<h1 class="text-2xl font-bold">
<a href="/" class="text-white hover:text-gray-200">Art DAG L1 Server</a>
</h1>
{user_info}
</header>
<nav class="flex gap-6 mb-6 border-b border-dark-500 pb-0">
<a href="/runs" class="pb-3 px-1 font-medium transition-colors {runs_active}">Runs</a>
<a href="/cache" class="pb-3 px-1 font-medium transition-colors {cache_active}">Cache</a>
</nav>
<main>
{content}
</main>
</div>
</body>
</html>
"""
def render_ui_html(username: Optional[str] = None, tab: str = "runs") -> str:
    """Render main UI HTML with optional user context.

    The page body is an htmx placeholder that loads ``/ui/runs`` when ``tab``
    is ``"runs"`` and ``/ui/cache-list`` for any other value.

    Args:
        username: Logged-in user shown in the header; a login link is shown
            when it is falsy.
        tab: Which nav entry to highlight and which partial to load.

    Returns:
        The complete HTML document as a string.
    """
    from html import escape

    if username:
        # Usernames are user-chosen; escape before embedding in markup.
        user_info = f'''
<div class="flex items-center gap-4 text-sm text-gray-400">
Logged in as <strong class="text-white">{escape(username)}</strong>
<a href="/ui/logout" class="text-blue-400 hover:text-blue-300">Logout</a>
</div>
'''
    else:
        user_info = '''
<div class="text-sm">
<a href="/ui/login" class="text-blue-400 hover:text-blue-300">Login</a>
</div>
'''

    runs_active = "border-b-2 border-blue-500 text-white" if tab == "runs" else "text-gray-400 hover:text-white"
    cache_active = "border-b-2 border-blue-500 text-white" if tab == "cache" else "text-gray-400 hover:text-white"
    content_url = "/ui/runs" if tab == "runs" else "/ui/cache-list"

    return f"""
<!DOCTYPE html>
<html class="dark">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Art DAG L1 Server</title>
{TAILWIND_CONFIG}
</head>
<body class="bg-dark-900 text-gray-100 min-h-screen">
<div class="max-w-6xl mx-auto px-4 py-6 sm:px-6 lg:px-8">
<header class="flex flex-wrap items-center justify-between gap-4 mb-6">
<h1 class="text-2xl font-bold">
<a href="/ui" class="text-white hover:text-gray-200">Art DAG L1 Server</a>
</h1>
{user_info}
</header>
<nav class="flex gap-6 mb-6 border-b border-dark-500 pb-0">
<a href="/ui" class="pb-3 px-1 font-medium transition-colors {runs_active}">Runs</a>
<a href="/ui?tab=cache" class="pb-3 px-1 font-medium transition-colors {cache_active}">Cache</a>
</nav>
<div id="content" hx-get="{content_url}" hx-trigger="load" hx-swap="innerHTML">
<div class="flex items-center justify-center py-12">
<div class="animate-pulse text-gray-400">Loading...</div>
</div>
</div>
</div>
</body>
</html>
"""
def get_auth_page_html(page_type: str = "login", error: Optional[str] = None) -> str:
    """Generate login or register page HTML with Tailwind CSS.

    Args:
        page_type: ``"login"`` renders the login form; any other value
            renders the register form (with the extra email field). The value
            is also used as the form's POST target (``/{page_type}``).
        error: Optional message shown in a red banner above the form. It is
            HTML-escaped here, so callers pass plain text.

    Returns:
        The complete HTML document as a string.
    """
    from html import escape

    is_login = page_type == "login"
    title = "Login" if is_login else "Register"

    login_active = "bg-blue-600 text-white" if is_login else "bg-dark-500 text-gray-400 hover:text-white"
    register_active = "bg-dark-500 text-gray-400 hover:text-white" if is_login else "bg-blue-600 text-white"

    # Error text can originate from L2 responses or exception messages, so it
    # must never reach the page as raw markup.
    error_html = f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">{escape(str(error))}</div>' if error else ''

    form_fields = '''
<input type="text" name="username" placeholder="Username" required
class="w-full px-4 py-3 bg-dark-600 border border-dark-500 rounded-lg text-white placeholder-gray-500 focus:border-blue-500 focus:outline-none focus:ring-1 focus:ring-blue-500">
<input type="password" name="password" placeholder="Password" required
class="w-full px-4 py-3 bg-dark-600 border border-dark-500 rounded-lg text-white placeholder-gray-500 focus:border-blue-500 focus:outline-none focus:ring-1 focus:ring-blue-500">
'''

    if not is_login:
        form_fields += '''
<input type="email" name="email" placeholder="Email (optional)"
class="w-full px-4 py-3 bg-dark-600 border border-dark-500 rounded-lg text-white placeholder-gray-500 focus:border-blue-500 focus:outline-none focus:ring-1 focus:ring-blue-500">
'''

    return f"""
<!DOCTYPE html>
<html class="dark">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{title} | Art DAG L1 Server</title>
{TAILWIND_CONFIG}
</head>
<body class="bg-dark-900 text-gray-100 min-h-screen">
<div class="max-w-md mx-auto px-4 py-8 sm:px-6">
<h1 class="text-2xl font-bold mb-6">
<a href="/" class="text-white hover:text-gray-200">Art DAG L1 Server</a>
</h1>
<a href="/runs" class="inline-flex items-center text-blue-400 hover:text-blue-300 mb-6">
<svg class="w-4 h-4 mr-1" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M15 19l-7-7 7-7"/>
</svg>
Back
</a>
<div class="bg-dark-700 rounded-lg p-6">
<div class="flex gap-2 mb-6">
<a href="/login" class="px-4 py-2 rounded-lg font-medium transition-colors {login_active}">Login</a>
<a href="/register" class="px-4 py-2 rounded-lg font-medium transition-colors {register_active}">Register</a>
</div>
{error_html}
<form method="POST" action="/{page_type}" class="space-y-4">
{form_fields}
<button type="submit"
class="w-full px-4 py-3 bg-blue-600 hover:bg-blue-700 text-white font-medium rounded-lg transition-colors">
{title}
</button>
</form>
</div>
</div>
</body>
</html>
"""
# Auth pages without an error banner, rendered once at import time and
# returned as-is by the GET /login and GET /register routes.
UI_LOGIN_HTML = get_auth_page_html("login")
UI_REGISTER_HTML = get_auth_page_html("register")
# Clean URL auth routes
@app.get("/login", response_class=HTMLResponse)
async def login_page():
    """Serve the pre-rendered login page (clean URL)."""
    page = UI_LOGIN_HTML
    return page
@app.post("/login")
async def login(username: str = Form(...), password: str = Form(...)):
    """Process login form (clean URL).

    Forwards the credentials to L2's ``/auth/login``. On success the returned
    access token is stored in an HTTP-only ``auth_token`` cookie (30 days) and
    the browser is redirected to ``/runs``. Otherwise the login page is
    re-rendered with an error banner.
    """
    try:
        resp = http_requests.post(
            f"{L2_SERVER}/auth/login",
            json={"username": username, "password": password},
            timeout=5
        )
    except http_requests.exceptions.RequestException:
        # L2 unreachable or timed out: that is not a credentials problem, so
        # do not tell the user their password is wrong.
        return HTMLResponse(get_auth_page_html("login", "Login service unavailable. Please try again later."))

    if resp.status_code >= 500:
        return HTMLResponse(get_auth_page_html("login", "Login service unavailable. Please try again later."))

    token = None
    if resp.status_code == 200:
        try:
            token = resp.json().get("access_token")
        except (ValueError, AttributeError):
            # Body was not a JSON object; treat as a failed login
            token = None

    # Only set the cookie when L2 actually returned a token
    if token:
        response = RedirectResponse(url="/runs", status_code=303)
        response.set_cookie("auth_token", token, httponly=True, max_age=30*24*60*60)
        return response

    return HTMLResponse(get_auth_page_html("login", "Invalid username or password"))
@app.get("/register", response_class=HTMLResponse)
async def register_page():
    """Serve the pre-rendered registration page (clean URL)."""
    page = UI_REGISTER_HTML
    return page
@app.post("/register")
async def register(
    username: str = Form(...),
    password: str = Form(...),
    email: str = Form(None)
):
    """Process registration form (clean URL).

    Forwards the form to L2's ``/auth/register``. On success the returned
    access token is stored in an HTTP-only ``auth_token`` cookie (30 days) and
    the browser is redirected to ``/runs``. Every other outcome re-renders
    the register page with an error banner, so the handler always returns a
    response.
    """
    try:
        resp = http_requests.post(
            f"{L2_SERVER}/auth/register",
            json={"username": username, "password": password, "email": email},
            timeout=5
        )
    except http_requests.exceptions.RequestException as e:
        return HTMLResponse(get_auth_page_html("register", f"Registration failed: {e}"))

    # Tolerate empty or non-JSON bodies (e.g. a proxy error page)
    try:
        payload = resp.json()
    except ValueError:
        payload = {}
    if not isinstance(payload, dict):
        payload = {}

    if resp.status_code == 200:
        token = payload.get("access_token")
        if token:
            response = RedirectResponse(url="/runs", status_code=303)
            response.set_cookie("auth_token", token, httponly=True, max_age=30*24*60*60)
            return response
        return HTMLResponse(get_auth_page_html("register", "Registration failed"))

    # 400 and every other status: show L2's detail when it sent one.
    # Previously only 400 was handled and other statuses returned None.
    error = str(payload.get("detail", "Registration failed"))
    return HTMLResponse(get_auth_page_html("register", error))
@app.get("/logout")
async def logout():
    """Drop the auth_token cookie and send the browser to /runs (clean URL)."""
    redirect = RedirectResponse(url="/runs", status_code=303)
    redirect.delete_cookie("auth_token")
    return redirect
@app.get("/ui")
async def ui_index(tab: str = "runs"):
    """Redirect the legacy /ui entry point to /cache or /runs."""
    target = "/cache" if tab == "cache" else "/runs"
    return RedirectResponse(url=target, status_code=302)
@app.get("/ui/login")
async def ui_login_page():
    """Legacy login URL: redirect to /login."""
    destination = "/login"
    return RedirectResponse(url=destination, status_code=302)
@app.post("/ui/login")
async def ui_login(username: str = Form(...), password: str = Form(...)):
    """Legacy login POST: delegate to the clean URL login handler."""
    response = await login(username, password)
    return response
@app.get("/ui/register")
async def ui_register_page():
    """Legacy register URL: redirect to /register."""
    destination = "/register"
    return RedirectResponse(url=destination, status_code=302)
@app.post("/ui/register")
async def ui_register(
    username: str = Form(...),
    password: str = Form(...),
    email: str = Form(None)
):
    """Legacy register POST: delegate to the clean URL register handler."""
    response = await register(username, password, email)
    return response
@app.get("/ui/logout")
async def ui_logout():
    """Legacy logout URL: redirect to /logout."""
    destination = "/logout"
    return RedirectResponse(url=destination, status_code=302)
@app.post("/ui/publish-run/{run_id}", response_class=HTMLResponse)
async def ui_publish_run(run_id: str, request: Request, output_name: str = Form(...)):
    """Publish a run to L2 from the web UI.

    Sends the run id, the requested output name and this server's public URL
    to L2's ``/registry/record-run`` using the ``auth_token`` cookie. On
    success the run's output and inputs are pinned in the cache metadata.
    Always answers with an HTML fragment: a green confirmation or a red
    error box.
    """
    from html import escape

    def error_box(message) -> HTMLResponse:
        # All errors share one styled box; the text may come from L2 or from
        # an exception, so it is escaped before being embedded.
        return HTMLResponse(
            '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg">'
            f'{escape(str(message))}</div>'
        )

    token = request.cookies.get("auth_token")
    if not token:
        return error_box("Not logged in")

    # Get the run to pin its output and inputs
    # NOTE(review): no check here that the run belongs to the logged-in
    # user - confirm whether L2 enforces that.
    run = load_run(run_id)
    if not run:
        return error_box("Run not found")

    # Call L2 to publish the run, including this L1's public URL
    # Longer timeout because L2 calls back to L1 to fetch run details
    try:
        resp = http_requests.post(
            f"{L2_SERVER}/registry/record-run",
            json={"run_id": run_id, "output_name": output_name, "l1_server": L1_PUBLIC_URL},
            headers={"Authorization": f"Bearer {token}"},
            timeout=30
        )
        if resp.status_code == 400:
            error = resp.json().get("detail", "Bad request")
            return error_box(f"Error: {error}")
        resp.raise_for_status()
        result = resp.json()

        # Pin the output
        if run.output_hash:
            save_cache_meta(run.output_hash, pinned=True, pin_reason="published")
        # Pin the inputs (for provenance)
        for input_hash in run.inputs:
            save_cache_meta(input_hash, pinned=True, pin_reason="input_to_published")

        # The publish already succeeded at this point; an unexpected response
        # shape must not turn it into an error message, so fall back to the
        # requested output name.
        asset = result.get("asset") if isinstance(result, dict) else None
        asset_name = asset.get("name") if isinstance(asset, dict) else None
        shown_name = escape(str(asset_name or output_name))

        return HTMLResponse(f'''
<div class="bg-green-900/50 border border-green-700 text-green-300 px-4 py-3 rounded-lg mb-4">
Published to L2 as <strong>{shown_name}</strong>
</div>
''')
    except http_requests.exceptions.HTTPError as e:
        # Prefer the "detail" field of L2's JSON error body when there is one
        try:
            error_detail = e.response.json().get("detail", str(e))
        except Exception:
            error_detail = str(e)
        return error_box(f"Error: {error_detail}")
    except Exception as e:
        return error_box(f"Error: {e}")
@app.get("/ui/runs", response_class=HTMLResponse)
async def ui_runs(request: Request):
    """HTMX partial: list of runs.

    Shows up to 20 of the logged-in user's runs, each as a card with recipe,
    status, creation time and, where the content is cached, a preview of the
    first input and the output. Anonymous visitors get a login prompt.
    """
    from html import escape

    current_user = get_user_from_cookie(request)

    # Require login to see runs (checked before loading any run data)
    if not current_user:
        return '<p class="text-gray-400 py-8 text-center"><a href="/ui/login" class="text-blue-400 hover:text-blue-300">Login</a> to see your runs.</p>'

    # Filter runs by user - match both plain username and ActivityPub format (@user@domain)
    actor_id = f"@{current_user}@{L2_DOMAIN}"
    runs = [r for r in list_all_runs() if r.username in (current_user, actor_id)]

    if not runs:
        return '<p class="text-gray-400 py-8 text-center">You have no runs yet. Use the CLI to start a run.</p>'

    # Status badge colors
    status_colors = {
        "completed": "bg-green-600 text-white",
        "running": "bg-yellow-600 text-white",
        "failed": "bg-red-600 text-white",
        "pending": "bg-gray-600 text-white"
    }

    html_parts = ['<div class="space-y-4">']

    def add_media_box(label: str, alt: str, content_hash: str, autoplay: bool) -> None:
        # Append one preview box (input or output) to html_parts.
        # A missing cache path is treated as unknown media instead of crashing.
        path = get_cache_path(content_hash)
        media_type = detect_media_type(path) if path else "unknown"
        html_parts.append(f'''
<div class="bg-dark-600 rounded-lg p-3">
<div class="text-xs text-gray-400 mb-2">{label}: {content_hash[:16]}...</div>
<div class="flex justify-center">
''')
        if media_type == "video":
            video_src = video_src_for_request(content_hash, request)
            autoplay_attr = " autoplay" if autoplay else ""
            html_parts.append(f'<video src="{video_src}" controls{autoplay_attr} muted loop playsinline class="max-h-32 rounded"></video>')
        elif media_type == "image":
            html_parts.append(f'<img src="/cache/{content_hash}" alt="{alt}" class="max-h-32 rounded">')
        html_parts.append('</div></div>')

    for run in runs[:20]:  # Limit to 20 most recent
        status_badge = status_colors.get(run.status, "bg-gray-600 text-white")
        # Run fields end up inside markup, so escape the free-text ones
        recipe = escape(str(run.recipe))
        status = escape(str(run.status))
        created = run.created_at[:19].replace('T', ' ')
        # NOTE(review): the hx-trigger filter below tests for a
        # 'status-running' class that this markup never sets - confirm
        # whether polling from the list is meant to fire.
        html_parts.append(f'''
<a href="/ui/detail/{run.run_id}" class="block">
<div class="bg-dark-700 rounded-lg p-4 hover:bg-dark-600 transition-colors" hx-get="/ui/run/{run.run_id}" hx-trigger="every 2s[classList.contains('status-running')]" hx-swap="outerHTML">
<div class="flex flex-wrap items-center justify-between gap-3 mb-3">
<div class="flex items-center gap-3">
<span class="px-3 py-1 bg-blue-600 text-white text-sm font-medium rounded-full">{recipe}</span>
<span class="text-gray-400 font-mono text-xs hidden sm:inline">{run.run_id[:16]}...</span>
</div>
<span class="px-3 py-1 {status_badge} text-xs font-medium rounded-full">{status}</span>
</div>
<div class="text-sm text-gray-400 mb-3">
Created: {created}
</div>
''')

        # Show input and output side by side
        has_input = run.inputs and cache_manager.has_content(run.inputs[0])
        has_output = run.status == "completed" and run.output_hash and cache_manager.has_content(run.output_hash)

        if has_input or has_output:
            html_parts.append('<div class="grid gap-4 sm:grid-cols-2">')
            # Input box (only the first input is previewed)
            if has_input:
                add_media_box("Input", "input", run.inputs[0], autoplay=False)
            # Output box
            if has_output:
                add_media_box("Output", "output", run.output_hash, autoplay=True)
            html_parts.append('</div>')

        # Show error if failed
        if run.status == "failed" and run.error:
            html_parts.append(f'<div class="mt-3 text-sm text-red-400">Error: {escape(str(run.error))}</div>')

        html_parts.append('</div></a>')

    html_parts.append('</div>')
    return '\n'.join(html_parts)
@app.get("/ui/cache-list", response_class=HTMLResponse)
async def ui_cache_list(
    request: Request,
    folder: Optional[str] = None,
    collection: Optional[str] = None,
    tag: Optional[str] = None
):
    """HTMX partial: list of cached items with optional filtering.

    Lists up to 50 of the logged-in user's cache files, newest first, as
    preview cards. Filters:

    - ``folder``: ``/`` matches only items filed at the root; any other value
      matches that folder and everything nested below it.
    - ``collection`` / ``tag``: the item must carry that collection / tag.

    Anonymous visitors get a login prompt.
    """
    from html import escape

    current_user = get_user_from_cookie(request)

    # Require login to see cache
    if not current_user:
        return '<p class="text-gray-400 py-8 text-center"><a href="/ui/login" class="text-blue-400 hover:text-blue-300">Login</a> to see cached content.</p>'

    # Get hashes owned by/associated with this user
    user_hashes = get_user_cache_hashes(current_user)

    def folder_matches(item_folder: str) -> bool:
        # Root shows only items filed directly at the root.
        if folder == "/":
            return item_folder == "/"
        # Compare on a path boundary so "/foo" does not also match "/foobar".
        base = folder.rstrip("/")
        return item_folder == base or item_folder.startswith(base + "/")

    # Sidecar files and transcoded MP4s living next to the content files
    skipped_suffixes = ('.provenance.json', '.meta.json', '.mp4')

    # Get cache items that belong to the user
    cache_items = []
    if CACHE_DIR.exists():
        for f in CACHE_DIR.iterdir():
            if not f.is_file() or f.name.endswith(skipped_suffixes):
                continue
            if f.name not in user_hashes:
                continue

            # Load metadata for filtering
            meta = load_cache_meta(f.name)

            # Apply folder filter (a missing or null folder counts as root)
            if folder and not folder_matches(meta.get("folder") or "/"):
                continue
            # Apply collection filter
            if collection and collection not in meta.get("collections", []):
                continue
            # Apply tag filter
            if tag and tag not in meta.get("tags", []):
                continue

            stat = f.stat()
            cache_items.append({
                "hash": f.name,
                "size": stat.st_size,
                "mtime": stat.st_mtime,
                "meta": meta
            })

    # Sort by modification time (newest first)
    cache_items.sort(key=lambda x: x["mtime"], reverse=True)

    if not cache_items:
        # Filter values come straight from the query string: escape them
        # before echoing them back into the page.
        filter_msg = ""
        if folder:
            filter_msg = f" in folder {escape(folder)}"
        elif collection:
            filter_msg = f" in collection '{escape(collection)}'"
        elif tag:
            filter_msg = f" with tag '{escape(tag)}'"
        return f'<p class="text-gray-400 py-8 text-center">No cached files{filter_msg}. Upload files or run effects to see them here.</p>'

    html_parts = ['<div class="grid gap-4 sm:grid-cols-2 lg:grid-cols-3">']

    for item in cache_items[:50]:  # Limit to 50 items
        content_hash = item["hash"]
        cache_path = get_cache_path(content_hash)
        media_type = detect_media_type(cache_path) if cache_path else "unknown"

        # Format size
        size = item["size"]
        if size > 1024*1024:
            size_str = f"{size/(1024*1024):.1f} MB"
        elif size > 1024:
            size_str = f"{size/1024:.1f} KB"
        else:
            size_str = f"{size} bytes"

        html_parts.append(f'''
<a href="/ui/cache/{content_hash}" class="block">
<div class="bg-dark-700 rounded-lg p-4 hover:bg-dark-600 transition-colors">
<div class="flex items-center justify-between gap-2 mb-3">
<span class="px-2 py-1 bg-blue-600 text-white text-xs font-medium rounded-full">{media_type}</span>
<span class="text-xs text-gray-400">{size_str}</span>
</div>
<div class="text-xs text-gray-400 font-mono mb-3 truncate">{content_hash[:24]}...</div>
<div class="flex justify-center bg-dark-600 rounded-lg p-2">
''')

        if media_type == "video":
            video_src = video_src_for_request(content_hash, request)
            html_parts.append(f'<video src="{video_src}" controls muted loop playsinline class="max-h-32 rounded"></video>')
        elif media_type == "image":
            html_parts.append(f'<img src="/cache/{content_hash}" alt="{content_hash[:16]}" class="max-h-32 rounded object-contain">')
        else:
            html_parts.append('<p class="text-gray-400 text-sm py-4">Unknown file type</p>')

        html_parts.append('''
</div>
</div>
</a>
''')

    html_parts.append('</div>')
    return '\n'.join(html_parts)
@app.get("/ui/detail/{run_id}")
async def ui_detail_page(run_id: str):
    """Legacy run detail URL: redirect to /run/{run_id}."""
    destination = f"/run/{run_id}"
    return RedirectResponse(url=destination, status_code=302)
@app.get("/ui/run/{run_id}", response_class=HTMLResponse)
async def ui_run_partial(run_id: str, request: Request):
    """HTMX partial: single run (for polling updates).

    If the run is still marked running and has a Celery task id, the task is
    checked; a finished task updates the run (status, output hash, effect and
    infrastructure info, or the error) and the run is saved. The rendered
    card keeps polling itself every 2s for as long as the run is running.
    """
    from html import escape

    run = load_run(run_id)
    if not run:
        return '<div class="bg-dark-700 rounded-lg p-4 text-gray-400">Run not found</div>'

    # Check Celery task status if running
    if run.status == "running" and run.celery_task_id:
        task = celery_app.AsyncResult(run.celery_task_id)
        if task.ready():
            if task.successful():
                result = task.result
                output_info = result.get("output", {})
                run.status = "completed"
                run.completed_at = datetime.now(timezone.utc).isoformat()
                run.output_hash = output_info.get("content_hash")
                # Extract effects info from provenance
                effects = result.get("effects", [])
                if effects:
                    run.effects_commit = effects[0].get("repo_commit")
                    run.effect_url = effects[0].get("repo_url")
                # Extract infrastructure info
                run.infrastructure = result.get("infrastructure")
                # Path("") is the current directory and always "exists", so a
                # missing local_path must be ruled out before the check.
                local_path = output_info.get("local_path")
                if local_path:
                    output_path = Path(local_path)
                    if output_path.exists():
                        cache_file(output_path)
            else:
                run.status = "failed"
                run.error = str(task.result)
            save_run(run)

    # Status badge colors
    status_colors = {
        "completed": "bg-green-600 text-white",
        "running": "bg-yellow-600 text-white",
        "failed": "bg-red-600 text-white",
        "pending": "bg-gray-600 text-white"
    }
    status_badge = status_colors.get(run.status, "bg-gray-600 text-white")
    # Keep polling only while the run is still in progress
    poll_attr = f'hx-get="/ui/run/{run_id}" hx-trigger="every 2s" hx-swap="outerHTML"' if run.status == "running" else ""

    # Run fields end up inside markup, so escape the free-text ones
    recipe = escape(str(run.recipe))
    status = escape(str(run.status))
    created = run.created_at[:19].replace('T', ' ')

    parts = [f'''
<a href="/ui/detail/{run.run_id}" class="block">
<div class="bg-dark-700 rounded-lg p-4 hover:bg-dark-600 transition-colors" {poll_attr}>
<div class="flex flex-wrap items-center justify-between gap-3 mb-3">
<div class="flex items-center gap-3">
<span class="px-3 py-1 bg-blue-600 text-white text-sm font-medium rounded-full">{recipe}</span>
<span class="text-gray-400 font-mono text-xs hidden sm:inline">{run.run_id[:16]}...</span>
</div>
<span class="px-3 py-1 {status_badge} text-xs font-medium rounded-full">{status}</span>
</div>
<div class="text-sm text-gray-400 mb-3">
Created: {created}
</div>
''']

    def add_media_box(label: str, alt: str, content_hash: str, autoplay: bool) -> None:
        # Append one preview box (input or output) to parts.
        # A missing cache path is treated as unknown media instead of crashing.
        path = get_cache_path(content_hash)
        media_type = detect_media_type(path) if path else "unknown"
        parts.append(f'''
<div class="bg-dark-600 rounded-lg p-3">
<div class="text-xs text-gray-400 mb-2">{label}: {content_hash[:16]}...</div>
<div class="flex justify-center">
''')
        if media_type == "video":
            video_src = video_src_for_request(content_hash, request)
            autoplay_attr = " autoplay" if autoplay else ""
            parts.append(f'<video src="{video_src}" controls{autoplay_attr} muted loop playsinline class="max-h-32 rounded"></video>')
        elif media_type == "image":
            parts.append(f'<img src="/cache/{content_hash}" alt="{alt}" class="max-h-32 rounded">')
        parts.append('</div></div>')

    # Show input and output side by side
    has_input = run.inputs and cache_manager.has_content(run.inputs[0])
    has_output = run.status == "completed" and run.output_hash and cache_manager.has_content(run.output_hash)

    if has_input or has_output:
        parts.append('<div class="grid gap-4 sm:grid-cols-2">')
        # Only the first input is previewed
        if has_input:
            add_media_box("Input", "input", run.inputs[0], autoplay=False)
        if has_output:
            add_media_box("Output", "output", run.output_hash, autoplay=True)
        parts.append('</div>')

    if run.status == "failed" and run.error:
        parts.append(f'<div class="mt-3 text-sm text-red-400">Error: {escape(str(run.error))}</div>')

    parts.append('</div></a>')
    return ''.join(parts)
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces, port 8100.
    # uvicorn is imported here so it is only needed when run as a script.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8100)