#!/usr/bin/env python3
"""
Art DAG L1 Server
Manages rendering runs and provides access to the cache.
- POST /runs - start a run (recipe + inputs)
- GET /runs/{run_id} - get run status/result
- GET /cache/{content_hash} - get cached content
"""
import hashlib
import json
import os
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from fastapi import FastAPI, HTTPException, UploadFile, File, Depends, Form, Request
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
import redis
import requests as http_requests
from urllib.parse import urlparse
from celery_app import app as celery_app
from tasks import render_effect
# L2 server for auth verification
L2_SERVER = os.environ.get("L2_SERVER", "http://localhost:8200")
L2_DOMAIN = os.environ.get("L2_DOMAIN", "artdag.rose-ash.com")
L1_PUBLIC_URL = os.environ.get("L1_PUBLIC_URL", "http://localhost:8100")
# Cache directory (use /data/cache in Docker, ~/.artdag/cache locally)
CACHE_DIR = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache")))
CACHE_DIR.mkdir(parents=True, exist_ok=True)
# Redis for persistent run storage
REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/5')
parsed = urlparse(REDIS_URL)
redis_client = redis.Redis(
host=parsed.hostname or 'localhost',
port=parsed.port or 6379,
db=int(parsed.path.lstrip('/') or 0)
)
RUNS_KEY_PREFIX = "artdag:run:"
def save_run(run: "RunStatus"):
"""Save run to Redis."""
redis_client.set(f"{RUNS_KEY_PREFIX}{run.run_id}", run.model_dump_json())
def load_run(run_id: str) -> Optional["RunStatus"]:
"""Load run from Redis."""
data = redis_client.get(f"{RUNS_KEY_PREFIX}{run_id}")
if data:
return RunStatus.model_validate_json(data)
return None
def list_all_runs() -> list["RunStatus"]:
"""List all runs from Redis."""
runs = []
for key in redis_client.scan_iter(f"{RUNS_KEY_PREFIX}*"):
data = redis_client.get(key)
if data:
runs.append(RunStatus.model_validate_json(data))
return sorted(runs, key=lambda r: r.created_at, reverse=True)
app = FastAPI(
title="Art DAG L1 Server",
description="Distributed rendering server for Art DAG",
version="0.1.0"
)
class RunRequest(BaseModel):
"""Request to start a run."""
recipe: str # Recipe name (e.g., "dog", "identity")
inputs: list[str] # List of content hashes
output_name: Optional[str] = None
class RunStatus(BaseModel):
"""Status of a run."""
run_id: str
status: str # pending, running, completed, failed
recipe: str
inputs: list[str]
output_name: str
created_at: str
completed_at: Optional[str] = None
output_hash: Optional[str] = None
error: Optional[str] = None
celery_task_id: Optional[str] = None
effects_commit: Optional[str] = None
effect_url: Optional[str] = None # URL to effect source code
username: Optional[str] = None # Owner of the run (ActivityPub actor ID)
infrastructure: Optional[dict] = None # Hardware/software used for rendering
# ============ Auth ============
security = HTTPBearer(auto_error=False)
def verify_token_with_l2(token: str) -> Optional[str]:
"""Verify token with L2 server, return username if valid."""
try:
resp = http_requests.post(
f"{L2_SERVER}/auth/verify",
headers={"Authorization": f"Bearer {token}"},
timeout=5
)
if resp.status_code == 200:
return resp.json().get("username")
except Exception:
pass
return None
async def get_optional_user(
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> Optional[str]:
"""Get username if authenticated, None otherwise."""
if not credentials:
return None
return verify_token_with_l2(credentials.credentials)
async def get_required_user(
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> str:
"""Get username, raise 401 if not authenticated."""
if not credentials:
raise HTTPException(401, "Not authenticated")
username = verify_token_with_l2(credentials.credentials)
if not username:
raise HTTPException(401, "Invalid token")
return username
def file_hash(path: Path) -> str:
"""Compute SHA3-256 hash of a file."""
hasher = hashlib.sha3_256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(65536), b""):
hasher.update(chunk)
return hasher.hexdigest()
def cache_file(source: Path) -> str:
"""Copy file to cache, return content hash."""
content_hash = file_hash(source)
cache_path = CACHE_DIR / content_hash
if not cache_path.exists():
import shutil
shutil.copy2(source, cache_path)
return content_hash
@app.get("/api")
async def api_info():
"""Server info (JSON)."""
return {
"name": "Art DAG L1 Server",
"version": "0.1.0",
"cache_dir": str(CACHE_DIR),
"runs_count": len(list_all_runs())
}
HOME_HTML = """
Art DAG L1 Server
Runs UI
API Docs
API Info
Art DAG L1 Server
L1 rendering server for the Art DAG system. Manages distributed rendering jobs via Celery workers.
Dependencies
artdag (GitHub): Core DAG execution engine
artdag-effects (rose-ash): Effect implementations
Redis : Message broker, result backend, and run persistence
API Endpoints
Method
Path
Description
GET /uiWeb UI for viewing runs
POST /runsStart a rendering run
GET /runsList all runs
GET /runs/{run_id}Get run status
GET /cacheList cached content hashes
GET /cache/{hash}Download cached content
POST /cache/uploadUpload file to cache
GET /assetsList known assets
Start a Run
curl -X POST /runs \\
-H "Content-Type: application/json" \\
-d '{"recipe": "dog", "inputs": ["33268b6e..."]}'
Provenance
Every render produces a provenance record linking inputs, effects, and infrastructure:
{
"output": {"content_hash": "..."},
"inputs": [...],
"effects": [...],
"infrastructure": {...}
}
"""
@app.get("/", response_class=HTMLResponse)
async def root():
"""Home page."""
return HOME_HTML
@app.post("/runs", response_model=RunStatus)
async def create_run(request: RunRequest, username: str = Depends(get_required_user)):
"""Start a new rendering run. Requires authentication."""
run_id = str(uuid.uuid4())
# Generate output name if not provided
output_name = request.output_name or f"{request.recipe}-{run_id[:8]}"
# Format username as ActivityPub actor ID
actor_id = f"@{username}@{L2_DOMAIN}"
# Create run record
run = RunStatus(
run_id=run_id,
status="pending",
recipe=request.recipe,
inputs=request.inputs,
output_name=output_name,
created_at=datetime.now(timezone.utc).isoformat(),
username=actor_id
)
# Submit to Celery
# For now, we only support single-input recipes
if len(request.inputs) != 1:
raise HTTPException(400, "Currently only single-input recipes supported")
input_hash = request.inputs[0]
task = render_effect.delay(input_hash, request.recipe, output_name)
run.celery_task_id = task.id
run.status = "running"
save_run(run)
return run
@app.get("/runs/{run_id}", response_model=RunStatus)
async def get_run(run_id: str):
"""Get status of a run."""
run = load_run(run_id)
if not run:
raise HTTPException(404, f"Run {run_id} not found")
# Check Celery task status if running
if run.status == "running" and run.celery_task_id:
task = celery_app.AsyncResult(run.celery_task_id)
if task.ready():
if task.successful():
result = task.result
run.status = "completed"
run.completed_at = datetime.now(timezone.utc).isoformat()
run.output_hash = result.get("output", {}).get("content_hash")
# Extract effects info from provenance
effects = result.get("effects", [])
if effects:
run.effects_commit = effects[0].get("repo_commit")
run.effect_url = effects[0].get("repo_url")
# Extract infrastructure info
run.infrastructure = result.get("infrastructure")
# Cache the output
output_path = Path(result.get("output", {}).get("local_path", ""))
if output_path.exists():
cache_file(output_path)
else:
run.status = "failed"
run.error = str(task.result)
# Save updated status
save_run(run)
return run
@app.get("/runs")
async def list_runs():
"""List all runs."""
return list_all_runs()
@app.get("/cache/{content_hash}")
async def get_cached(content_hash: str):
"""Get cached content by hash."""
cache_path = CACHE_DIR / content_hash
if not cache_path.exists():
raise HTTPException(404, f"Content {content_hash} not in cache")
return FileResponse(cache_path)
@app.get("/cache/{content_hash}/mp4")
async def get_cached_mp4(content_hash: str):
"""Get cached content as MP4 (transcodes MKV on first request, caches result)."""
cache_path = CACHE_DIR / content_hash
mp4_path = CACHE_DIR / f"{content_hash}.mp4"
if not cache_path.exists():
raise HTTPException(404, f"Content {content_hash} not in cache")
# If MP4 already cached, serve it
if mp4_path.exists():
return FileResponse(mp4_path, media_type="video/mp4")
# Check if source is already MP4
media_type = detect_media_type(cache_path)
if media_type != "video":
raise HTTPException(400, "Content is not a video")
# Check if already MP4 format
import subprocess
try:
result = subprocess.run(
["ffprobe", "-v", "error", "-select_streams", "v:0",
"-show_entries", "format=format_name", "-of", "csv=p=0", str(cache_path)],
capture_output=True, text=True, timeout=10
)
if "mp4" in result.stdout.lower() or "mov" in result.stdout.lower():
# Already MP4-compatible, just serve original
return FileResponse(cache_path, media_type="video/mp4")
except Exception:
pass # Continue with transcoding
# Transcode to MP4 (H.264 + AAC)
transcode_path = CACHE_DIR / f"{content_hash}.transcoding.mp4"
try:
result = subprocess.run(
["ffmpeg", "-y", "-i", str(cache_path),
"-c:v", "libx264", "-preset", "fast", "-crf", "23",
"-c:a", "aac", "-b:a", "128k",
"-movflags", "+faststart",
str(transcode_path)],
capture_output=True, text=True, timeout=600 # 10 min timeout
)
if result.returncode != 0:
raise HTTPException(500, f"Transcoding failed: {result.stderr[:200]}")
# Move to final location
transcode_path.rename(mp4_path)
except subprocess.TimeoutExpired:
if transcode_path.exists():
transcode_path.unlink()
raise HTTPException(500, "Transcoding timed out")
except Exception as e:
if transcode_path.exists():
transcode_path.unlink()
raise HTTPException(500, f"Transcoding failed: {e}")
return FileResponse(mp4_path, media_type="video/mp4")
@app.get("/ui/cache/{content_hash}", response_class=HTMLResponse)
async def ui_cache_view(content_hash: str, request: Request):
"""View cached content with appropriate display."""
current_user = get_user_from_cookie(request)
if not current_user:
return HTMLResponse(f'''
Login Required | Art DAG L1
{TAILWIND_CONFIG}
Login to view cached content.
''', status_code=401)
# Check user has access to this file
user_hashes = get_user_cache_hashes(current_user)
if content_hash not in user_hashes:
return HTMLResponse(f'''
Access Denied | Art DAG L1
{TAILWIND_CONFIG}
Access denied
''', status_code=403)
cache_path = CACHE_DIR / content_hash
if not cache_path.exists():
return HTMLResponse(f'''
Not Found | Art DAG L1
{TAILWIND_CONFIG}
Back
Content not found: {content_hash}
''', status_code=404)
media_type = detect_media_type(cache_path)
file_size = cache_path.stat().st_size
size_str = f"{file_size:,} bytes"
if file_size > 1024*1024:
size_str = f"{file_size/(1024*1024):.1f} MB"
elif file_size > 1024:
size_str = f"{file_size/1024:.1f} KB"
# Build media display HTML
if media_type == "video":
video_src = video_src_for_request(content_hash, request)
media_html = f' '
elif media_type == "image":
media_html = f' '
else:
media_html = f'Unknown file type. Download file
'
html = f"""
{content_hash[:16]}... | Art DAG L1
{TAILWIND_CONFIG}
Back to cache
{media_type.capitalize()}
{content_hash[:24]}...
Download
{media_html}
Details
Content Hash (SHA3-256)
{content_hash}
"""
return html
@app.get("/cache")
async def list_cache():
"""List cached content hashes."""
return [f.name for f in CACHE_DIR.iterdir() if f.is_file()]
# Known assets (bootstrap data)
KNOWN_ASSETS = {
"cat": "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b",
}
@app.get("/assets")
async def list_assets():
"""List known assets."""
return KNOWN_ASSETS
@app.post("/cache/import")
async def import_to_cache(path: str):
"""Import a local file to cache."""
source = Path(path)
if not source.exists():
raise HTTPException(404, f"File not found: {path}")
content_hash = cache_file(source)
return {"content_hash": content_hash, "cached": True}
def save_cache_meta(content_hash: str, uploader: str = None, filename: str = None, **updates):
"""Save or update metadata for a cached file."""
meta_path = CACHE_DIR / f"{content_hash}.meta.json"
# Load existing or create new
if meta_path.exists():
with open(meta_path) as f:
meta = json.load(f)
else:
meta = {
"uploader": uploader,
"uploaded_at": datetime.now(timezone.utc).isoformat(),
"filename": filename
}
# Apply updates (but never change uploader or uploaded_at)
for key, value in updates.items():
if key not in ("uploader", "uploaded_at"):
meta[key] = value
with open(meta_path, "w") as f:
json.dump(meta, f, indent=2)
return meta
def load_cache_meta(content_hash: str) -> dict:
"""Load metadata for a cached file."""
meta_path = CACHE_DIR / f"{content_hash}.meta.json"
if meta_path.exists():
with open(meta_path) as f:
return json.load(f)
return {}
# User data storage (folders, collections)
USER_DATA_DIR = CACHE_DIR / ".user-data"
def load_user_data(username: str) -> dict:
"""Load user's folders and collections."""
USER_DATA_DIR.mkdir(parents=True, exist_ok=True)
# Normalize username (remove @ prefix if present)
safe_name = username.replace("@", "").replace("/", "_")
user_file = USER_DATA_DIR / f"{safe_name}.json"
if user_file.exists():
with open(user_file) as f:
return json.load(f)
return {"folders": ["/"], "collections": []}
def save_user_data(username: str, data: dict):
"""Save user's folders and collections."""
USER_DATA_DIR.mkdir(parents=True, exist_ok=True)
safe_name = username.replace("@", "").replace("/", "_")
user_file = USER_DATA_DIR / f"{safe_name}.json"
with open(user_file, "w") as f:
json.dump(data, f, indent=2)
def get_user_cache_hashes(username: str) -> set:
"""Get all cache hashes owned by or associated with a user."""
actor_id = f"@{username}@{L2_DOMAIN}"
hashes = set()
# Files uploaded by user
if CACHE_DIR.exists():
for f in CACHE_DIR.iterdir():
if f.name.endswith('.meta.json'):
meta = load_cache_meta(f.name.replace('.meta.json', ''))
if meta.get("uploader") in (username, actor_id):
hashes.add(f.name.replace('.meta.json', ''))
# Files from user's runs (inputs and outputs)
for run in list_all_runs():
if run.username in (username, actor_id):
hashes.update(run.inputs)
if run.output_hash:
hashes.add(run.output_hash)
return hashes
@app.post("/cache/upload")
async def upload_to_cache(file: UploadFile = File(...), username: str = Depends(get_required_user)):
"""Upload a file to cache. Requires authentication."""
# Write to temp file first
import tempfile
with tempfile.NamedTemporaryFile(delete=False) as tmp:
content = await file.read()
tmp.write(content)
tmp_path = Path(tmp.name)
# Hash and move to cache
content_hash = file_hash(tmp_path)
cache_path = CACHE_DIR / content_hash
if not cache_path.exists():
import shutil
shutil.move(str(tmp_path), cache_path)
else:
tmp_path.unlink()
# Save uploader metadata
actor_id = f"@{username}@{L2_DOMAIN}"
save_cache_meta(content_hash, actor_id, file.filename)
return {"content_hash": content_hash, "filename": file.filename, "size": len(content)}
class CacheMetaUpdate(BaseModel):
"""Request to update cache metadata."""
origin: Optional[dict] = None # {"type": "self"|"external", "url": "...", "note": "..."}
description: Optional[str] = None
tags: Optional[list[str]] = None
folder: Optional[str] = None
collections: Optional[list[str]] = None
class PublishRequest(BaseModel):
"""Request to publish a cache item to L2."""
asset_name: str
asset_type: str = "image" # image, video, etc.
@app.get("/cache/{content_hash}/meta")
async def get_cache_meta(content_hash: str, username: str = Depends(get_required_user)):
"""Get metadata for a cached file."""
# Check file exists
cache_path = CACHE_DIR / content_hash
if not cache_path.exists():
raise HTTPException(404, "Content not found")
# Check ownership
user_hashes = get_user_cache_hashes(username)
if content_hash not in user_hashes:
raise HTTPException(403, "Access denied")
return load_cache_meta(content_hash)
@app.patch("/cache/{content_hash}/meta")
async def update_cache_meta(content_hash: str, update: CacheMetaUpdate, username: str = Depends(get_required_user)):
"""Update metadata for a cached file."""
# Check file exists
cache_path = CACHE_DIR / content_hash
if not cache_path.exists():
raise HTTPException(404, "Content not found")
# Check ownership
user_hashes = get_user_cache_hashes(username)
if content_hash not in user_hashes:
raise HTTPException(403, "Access denied")
# Build update dict from non-None fields
updates = {}
if update.origin is not None:
updates["origin"] = update.origin
if update.description is not None:
updates["description"] = update.description
if update.tags is not None:
updates["tags"] = update.tags
if update.folder is not None:
# Ensure folder exists in user's folder list
user_data = load_user_data(username)
if update.folder not in user_data["folders"]:
raise HTTPException(400, f"Folder does not exist: {update.folder}")
updates["folder"] = update.folder
if update.collections is not None:
# Validate collections exist
user_data = load_user_data(username)
existing = {c["name"] for c in user_data["collections"]}
for col in update.collections:
if col not in existing:
raise HTTPException(400, f"Collection does not exist: {col}")
updates["collections"] = update.collections
meta = save_cache_meta(content_hash, **updates)
return meta
@app.post("/cache/{content_hash}/publish")
async def publish_cache_to_l2(
content_hash: str,
req: PublishRequest,
request: Request,
username: str = Depends(get_required_user)
):
"""
Publish a cache item to L2 (ActivityPub).
Requires origin to be set in metadata before publishing.
"""
# Check file exists
cache_path = CACHE_DIR / content_hash
if not cache_path.exists():
raise HTTPException(404, "Content not found")
# Check ownership
user_hashes = get_user_cache_hashes(username)
if content_hash not in user_hashes:
raise HTTPException(403, "Access denied")
# Load metadata
meta = load_cache_meta(content_hash)
# Check origin is set
origin = meta.get("origin")
if not origin or "type" not in origin:
raise HTTPException(400, "Origin must be set before publishing. Use --origin self or --origin-url ")
# Get auth token to pass to L2
token = request.cookies.get("auth_token")
if not token:
# Try from header
auth_header = request.headers.get("Authorization", "")
if auth_header.startswith("Bearer "):
token = auth_header[7:]
if not token:
raise HTTPException(401, "Authentication token required")
# Call L2 publish-cache endpoint
try:
resp = http_requests.post(
f"{L2_SERVER}/registry/publish-cache",
headers={"Authorization": f"Bearer {token}"},
json={
"content_hash": content_hash,
"asset_name": req.asset_name,
"asset_type": req.asset_type,
"origin": origin,
"description": meta.get("description"),
"tags": meta.get("tags", []),
"metadata": {
"filename": meta.get("filename"),
"folder": meta.get("folder"),
"collections": meta.get("collections", [])
}
},
timeout=10
)
resp.raise_for_status()
l2_result = resp.json()
except http_requests.exceptions.HTTPError as e:
error_detail = ""
try:
error_detail = e.response.json().get("detail", str(e))
except Exception:
error_detail = str(e)
raise HTTPException(400, f"L2 publish failed: {error_detail}")
except Exception as e:
raise HTTPException(500, f"L2 publish failed: {e}")
# Update local metadata with publish status
publish_info = {
"to_l2": True,
"asset_name": req.asset_name,
"published_at": datetime.now(timezone.utc).isoformat(),
"last_synced_at": datetime.now(timezone.utc).isoformat()
}
save_cache_meta(content_hash, published=publish_info)
return {
"published": True,
"asset_name": req.asset_name,
"l2_result": l2_result
}
@app.patch("/cache/{content_hash}/republish")
async def republish_cache_to_l2(
content_hash: str,
request: Request,
username: str = Depends(get_required_user)
):
"""
Re-publish (update) a cache item on L2 after metadata changes.
Only works for already-published items.
"""
# Check file exists
cache_path = CACHE_DIR / content_hash
if not cache_path.exists():
raise HTTPException(404, "Content not found")
# Check ownership
user_hashes = get_user_cache_hashes(username)
if content_hash not in user_hashes:
raise HTTPException(403, "Access denied")
# Load metadata
meta = load_cache_meta(content_hash)
# Check already published
published = meta.get("published", {})
if not published.get("to_l2"):
raise HTTPException(400, "Item not published yet. Use publish first.")
asset_name = published.get("asset_name")
if not asset_name:
raise HTTPException(400, "No asset name found in publish info")
# Get auth token
token = request.cookies.get("auth_token")
if not token:
auth_header = request.headers.get("Authorization", "")
if auth_header.startswith("Bearer "):
token = auth_header[7:]
if not token:
raise HTTPException(401, "Authentication token required")
# Call L2 update endpoint
try:
resp = http_requests.patch(
f"{L2_SERVER}/registry/{asset_name}",
headers={"Authorization": f"Bearer {token}"},
json={
"description": meta.get("description"),
"tags": meta.get("tags"),
"origin": meta.get("origin"),
"metadata": {
"filename": meta.get("filename"),
"folder": meta.get("folder"),
"collections": meta.get("collections", [])
}
},
timeout=10
)
resp.raise_for_status()
l2_result = resp.json()
except http_requests.exceptions.HTTPError as e:
error_detail = ""
try:
error_detail = e.response.json().get("detail", str(e))
except Exception:
error_detail = str(e)
raise HTTPException(400, f"L2 update failed: {error_detail}")
except Exception as e:
raise HTTPException(500, f"L2 update failed: {e}")
# Update local metadata
published["last_synced_at"] = datetime.now(timezone.utc).isoformat()
save_cache_meta(content_hash, published=published)
return {
"updated": True,
"asset_name": asset_name,
"l2_result": l2_result
}
# ============ Folder & Collection Management ============
@app.get("/user/folders")
async def list_folders(username: str = Depends(get_required_user)):
"""List user's folders."""
user_data = load_user_data(username)
return {"folders": user_data["folders"]}
@app.post("/user/folders")
async def create_folder(folder_path: str, username: str = Depends(get_required_user)):
"""Create a new folder."""
user_data = load_user_data(username)
# Validate path format
if not folder_path.startswith("/"):
raise HTTPException(400, "Folder path must start with /")
# Check parent exists
parent = "/".join(folder_path.rsplit("/", 1)[:-1]) or "/"
if parent != "/" and parent not in user_data["folders"]:
raise HTTPException(400, f"Parent folder does not exist: {parent}")
# Check doesn't already exist
if folder_path in user_data["folders"]:
raise HTTPException(400, f"Folder already exists: {folder_path}")
user_data["folders"].append(folder_path)
user_data["folders"].sort()
save_user_data(username, user_data)
return {"folder": folder_path, "created": True}
@app.delete("/user/folders")
async def delete_folder(folder_path: str, username: str = Depends(get_required_user)):
"""Delete a folder (must be empty)."""
if folder_path == "/":
raise HTTPException(400, "Cannot delete root folder")
user_data = load_user_data(username)
if folder_path not in user_data["folders"]:
raise HTTPException(404, "Folder not found")
# Check no subfolders
for f in user_data["folders"]:
if f.startswith(folder_path + "/"):
raise HTTPException(400, f"Folder has subfolders: {f}")
# Check no items in folder
user_hashes = get_user_cache_hashes(username)
for h in user_hashes:
meta = load_cache_meta(h)
if meta.get("folder") == folder_path:
raise HTTPException(400, "Folder is not empty")
user_data["folders"].remove(folder_path)
save_user_data(username, user_data)
return {"folder": folder_path, "deleted": True}
@app.get("/user/collections")
async def list_collections(username: str = Depends(get_required_user)):
"""List user's collections."""
user_data = load_user_data(username)
return {"collections": user_data["collections"]}
@app.post("/user/collections")
async def create_collection(name: str, username: str = Depends(get_required_user)):
"""Create a new collection."""
user_data = load_user_data(username)
# Check doesn't already exist
for col in user_data["collections"]:
if col["name"] == name:
raise HTTPException(400, f"Collection already exists: {name}")
user_data["collections"].append({
"name": name,
"created_at": datetime.now(timezone.utc).isoformat()
})
save_user_data(username, user_data)
return {"collection": name, "created": True}
@app.delete("/user/collections")
async def delete_collection(name: str, username: str = Depends(get_required_user)):
"""Delete a collection."""
user_data = load_user_data(username)
# Find and remove
for i, col in enumerate(user_data["collections"]):
if col["name"] == name:
user_data["collections"].pop(i)
save_user_data(username, user_data)
# Remove from all cache items
user_hashes = get_user_cache_hashes(username)
for h in user_hashes:
meta = load_cache_meta(h)
if name in meta.get("collections", []):
meta["collections"].remove(name)
save_cache_meta(h, **{k: v for k, v in meta.items() if k not in ("uploader", "uploaded_at")})
return {"collection": name, "deleted": True}
raise HTTPException(404, "Collection not found")
def is_ios_request(request: Request) -> bool:
"""Check if request is from iOS device."""
ua = request.headers.get("user-agent", "").lower()
return "iphone" in ua or "ipad" in ua
def video_src_for_request(content_hash: str, request: Request) -> str:
"""Get video src URL, using MP4 endpoint for iOS."""
if is_ios_request(request):
return f"/cache/{content_hash}/mp4"
return f"/cache/{content_hash}"
def detect_media_type(cache_path: Path) -> str:
"""Detect if file is image or video based on magic bytes."""
with open(cache_path, "rb") as f:
header = f.read(32)
# Video signatures
if header[:4] == b'\x1a\x45\xdf\xa3': # WebM/MKV
return "video"
if header[4:8] == b'ftyp': # MP4/MOV
return "video"
if header[:4] == b'RIFF' and header[8:12] == b'AVI ': # AVI
return "video"
# Image signatures
if header[:8] == b'\x89PNG\r\n\x1a\n': # PNG
return "image"
if header[:2] == b'\xff\xd8': # JPEG
return "image"
if header[:6] in (b'GIF87a', b'GIF89a'): # GIF
return "image"
if header[:4] == b'RIFF' and header[8:12] == b'WEBP': # WebP
return "image"
return "unknown"
def get_user_from_cookie(request) -> Optional[str]:
"""Get username from auth cookie."""
token = request.cookies.get("auth_token")
if not token:
return None
return verify_token_with_l2(token)
# Tailwind CSS config for all L1 templates
TAILWIND_CONFIG = '''
'''
def render_ui_html(username: Optional[str] = None, tab: str = "runs") -> str:
"""Render main UI HTML with optional user context."""
user_info = ""
if username:
user_info = f'''
Logged in as
{username}
Logout
'''
else:
user_info = '''
'''
runs_active = "border-b-2 border-blue-500 text-white" if tab == "runs" else "text-gray-400 hover:text-white"
cache_active = "border-b-2 border-blue-500 text-white" if tab == "cache" else "text-gray-400 hover:text-white"
content_url = "/ui/runs" if tab == "runs" else "/ui/cache-list"
return f"""
Art DAG L1 Server
{TAILWIND_CONFIG}
"""
def get_auth_page_html(page_type: str = "login", error: str = None) -> str:
"""Generate login or register page HTML with Tailwind CSS."""
is_login = page_type == "login"
title = "Login" if is_login else "Register"
login_active = "bg-blue-600 text-white" if is_login else "bg-dark-500 text-gray-400 hover:text-white"
register_active = "bg-dark-500 text-gray-400 hover:text-white" if is_login else "bg-blue-600 text-white"
error_html = f'{error}
' if error else ''
form_fields = '''
'''
if not is_login:
form_fields += '''
'''
return f"""
{title} | Art DAG L1 Server
{TAILWIND_CONFIG}
"""
UI_LOGIN_HTML = get_auth_page_html("login")
UI_REGISTER_HTML = get_auth_page_html("register")
@app.get("/ui", response_class=HTMLResponse)
async def ui_index(request: Request, tab: str = "runs"):
"""Web UI for viewing runs and cache."""
username = get_user_from_cookie(request)
return render_ui_html(username, tab)
@app.get("/ui/login", response_class=HTMLResponse)
async def ui_login_page():
"""Login page."""
return UI_LOGIN_HTML
@app.post("/ui/login")
async def ui_login(username: str = Form(...), password: str = Form(...)):
"""Process login form."""
try:
resp = http_requests.post(
f"{L2_SERVER}/auth/login",
json={"username": username, "password": password},
timeout=5
)
if resp.status_code == 200:
token = resp.json().get("access_token")
response = RedirectResponse(url="/ui", status_code=303)
response.set_cookie("auth_token", token, httponly=True, max_age=30*24*60*60)
return response
except Exception:
pass
return HTMLResponse(get_auth_page_html("login", "Invalid username or password"))
@app.get("/ui/register", response_class=HTMLResponse)
async def ui_register_page():
"""Register page."""
return UI_REGISTER_HTML
@app.post("/ui/register")
async def ui_register(
username: str = Form(...),
password: str = Form(...),
email: str = Form(None)
):
"""Process registration form."""
try:
resp = http_requests.post(
f"{L2_SERVER}/auth/register",
json={"username": username, "password": password, "email": email},
timeout=5
)
if resp.status_code == 200:
token = resp.json().get("access_token")
response = RedirectResponse(url="/ui", status_code=303)
response.set_cookie("auth_token", token, httponly=True, max_age=30*24*60*60)
return response
elif resp.status_code == 400:
error = resp.json().get("detail", "Registration failed")
return HTMLResponse(get_auth_page_html("register", error))
except Exception as e:
return HTMLResponse(get_auth_page_html("register", f"Registration failed: {e}"))
@app.get("/ui/logout")
async def ui_logout():
"""Logout - clear cookie."""
response = RedirectResponse(url="/ui", status_code=303)
response.delete_cookie("auth_token")
return response
@app.post("/ui/publish-run/{run_id}", response_class=HTMLResponse)
async def ui_publish_run(run_id: str, request: Request, output_name: str = Form(...)):
"""Publish a run to L2 from the web UI."""
token = request.cookies.get("auth_token")
if not token:
return HTMLResponse('Not logged in
')
# Call L2 to publish the run, including this L1's public URL
try:
resp = http_requests.post(
f"{L2_SERVER}/registry/record-run",
json={"run_id": run_id, "output_name": output_name, "l1_server": L1_PUBLIC_URL},
headers={"Authorization": f"Bearer {token}"},
timeout=10
)
if resp.status_code == 400:
error = resp.json().get("detail", "Bad request")
return HTMLResponse(f'Error: {error}
')
resp.raise_for_status()
result = resp.json()
return HTMLResponse(f'''
Published to L2 as {result["asset"]["name"]}
''')
except http_requests.exceptions.HTTPError as e:
error_detail = ""
try:
error_detail = e.response.json().get("detail", str(e))
except Exception:
error_detail = str(e)
return HTMLResponse(f'Error: {error_detail}
')
except Exception as e:
return HTMLResponse(f'Error: {e}
')
@app.get("/ui/runs", response_class=HTMLResponse)
async def ui_runs(request: Request):
"""HTMX partial: list of runs."""
current_user = get_user_from_cookie(request)
runs = list_all_runs()
# Require login to see runs
if not current_user:
return 'Login to see your runs.
'
# Filter runs by user - match both plain username and ActivityPub format (@user@domain)
actor_id = f"@{current_user}@{L2_DOMAIN}"
runs = [r for r in runs if r.username in (current_user, actor_id)]
if not runs:
return 'You have no runs yet. Use the CLI to start a run.
'
# Status badge colors
status_colors = {
"completed": "bg-green-600 text-white",
"running": "bg-yellow-600 text-white",
"failed": "bg-red-600 text-white",
"pending": "bg-gray-600 text-white"
}
html_parts = ['')
return '\n'.join(html_parts)
@app.get("/ui/cache-list", response_class=HTMLResponse)
async def ui_cache_list(
request: Request,
folder: Optional[str] = None,
collection: Optional[str] = None,
tag: Optional[str] = None
):
"""HTMX partial: list of cached items with optional filtering."""
current_user = get_user_from_cookie(request)
# Require login to see cache
if not current_user:
return 'Login to see cached content.
'
# Get hashes owned by/associated with this user
user_hashes = get_user_cache_hashes(current_user)
# Get cache items that belong to the user
cache_items = []
if CACHE_DIR.exists():
for f in CACHE_DIR.iterdir():
if f.is_file() and not f.name.endswith('.provenance.json') and not f.name.endswith('.meta.json') and not f.name.endswith('.mp4'):
if f.name in user_hashes:
# Load metadata for filtering
meta = load_cache_meta(f.name)
# Apply folder filter
if folder:
item_folder = meta.get("folder", "/")
if folder != "/" and not item_folder.startswith(folder):
continue
if folder == "/" and item_folder != "/":
continue
# Apply collection filter
if collection:
if collection not in meta.get("collections", []):
continue
# Apply tag filter
if tag:
if tag not in meta.get("tags", []):
continue
stat = f.stat()
cache_items.append({
"hash": f.name,
"size": stat.st_size,
"mtime": stat.st_mtime,
"meta": meta
})
# Sort by modification time (newest first)
cache_items.sort(key=lambda x: x["mtime"], reverse=True)
if not cache_items:
filter_msg = ""
if folder:
filter_msg = f" in folder {folder}"
elif collection:
filter_msg = f" in collection '{collection}'"
elif tag:
filter_msg = f" with tag '{tag}'"
return f'No cached files{filter_msg}. Upload files or run effects to see them here.
'
html_parts = ['')
return '\n'.join(html_parts)
@app.get("/ui/detail/{run_id}", response_class=HTMLResponse)
async def ui_detail_page(run_id: str, request: Request):
"""Full detail page for a run."""
current_user = get_user_from_cookie(request)
if not current_user:
return HTMLResponse(f'''
Login Required | Art DAG L1
{TAILWIND_CONFIG}
Login to view run details.
''', status_code=401)
run = load_run(run_id)
if not run:
return HTMLResponse(f'''
Not Found | Art DAG L1
{TAILWIND_CONFIG}
Run not found
''', status_code=404)
# Check user owns this run
actor_id = f"@{current_user}@{L2_DOMAIN}"
if run.username not in (current_user, actor_id):
return HTMLResponse(f'''
Access Denied | Art DAG L1
{TAILWIND_CONFIG}
Access denied
''', status_code=403)
# Check Celery task status if running
if run.status == "running" and run.celery_task_id:
task = celery_app.AsyncResult(run.celery_task_id)
if task.ready():
if task.successful():
result = task.result
run.status = "completed"
run.completed_at = datetime.now(timezone.utc).isoformat()
run.output_hash = result.get("output", {}).get("content_hash")
# Extract effects info from provenance
effects = result.get("effects", [])
if effects:
run.effects_commit = effects[0].get("repo_commit")
run.effect_url = effects[0].get("repo_url")
# Extract infrastructure info
run.infrastructure = result.get("infrastructure")
output_path = Path(result.get("output", {}).get("local_path", ""))
if output_path.exists():
cache_file(output_path)
else:
run.status = "failed"
run.error = str(task.result)
save_run(run)
# Use stored effect URL or build fallback
if run.effect_url:
effect_url = run.effect_url
elif run.effects_commit and run.effects_commit != "unknown":
effect_url = f"https://git.rose-ash.com/art-dag/effects/src/commit/{run.effects_commit}/{run.recipe}"
else:
effect_url = f"https://git.rose-ash.com/art-dag/effects/src/branch/main/{run.recipe}"
# Status badge colors
status_colors = {
"completed": "bg-green-600 text-white",
"running": "bg-yellow-600 text-white",
"failed": "bg-red-600 text-white",
"pending": "bg-gray-600 text-white"
}
status_badge = status_colors.get(run.status, "bg-gray-600 text-white")
# Build media HTML for input/output
media_html = ""
has_input = run.inputs and (CACHE_DIR / run.inputs[0]).exists()
has_output = run.status == "completed" and run.output_hash and (CACHE_DIR / run.output_hash).exists()
if has_input or has_output:
media_html = ''
if has_input:
input_hash = run.inputs[0]
input_media_type = detect_media_type(CACHE_DIR / input_hash)
input_video_src = video_src_for_request(input_hash, request)
if input_media_type == "video":
input_elem = f'
'
elif input_media_type == "image":
input_elem = f'
'
else:
input_elem = '
Unknown format
'
media_html += f'''
'''
if has_output:
output_hash = run.output_hash
output_media_type = detect_media_type(CACHE_DIR / output_hash)
output_video_src = video_src_for_request(output_hash, request)
if output_media_type == "video":
output_elem = f'
'
elif output_media_type == "image":
output_elem = f'
'
else:
output_elem = '
Unknown format
'
media_html += f'''
'''
media_html += '
'
# Build inputs list
inputs_html = ''.join([f'{inp} ' for inp in run.inputs])
# Infrastructure section
infra_html = ""
if run.infrastructure:
software = run.infrastructure.get("software", {})
hardware = run.infrastructure.get("hardware", {})
infra_html = f'''
Infrastructure
Software: {software.get("name", "unknown")} ({software.get("content_hash", "unknown")[:16]}...)
Hardware: {hardware.get("name", "unknown")} ({hardware.get("content_hash", "unknown")[:16]}...)
'''
# Error display
error_html = ""
if run.error:
error_html = f'''
'''
# Raw JSON provenance
provenance_json = json.dumps({
"run_id": run.run_id,
"status": run.status,
"recipe": run.recipe,
"effects_commit": run.effects_commit,
"effect_url": run.effect_url or effect_url,
"inputs": run.inputs,
"output_hash": run.output_hash,
"output_name": run.output_name,
"created_at": run.created_at,
"completed_at": run.completed_at,
"username": run.username,
"infrastructure": run.infrastructure,
"error": run.error
}, indent=2)
# Publish section
publish_html = ""
if run.status == "completed" and run.output_hash:
publish_html = f'''
Publish to L2
Register this transformation output on the L2 ActivityPub server.
'''
html = f"""
{run.recipe} - {run.run_id[:8]} | Art DAG L1
{TAILWIND_CONFIG}
Back to runs
{error_html}
{media_html}
Provenance
Owner
{run.username or "anonymous"}
Effects Commit
{run.effects_commit or "N/A"}
{"
" if run.output_hash else ""}
Created
{run.created_at[:19].replace('T', ' ')}
{"
Completed
" + run.completed_at[:19].replace('T', ' ') + "
" if run.completed_at else ""}
{infra_html}
Raw JSON
{provenance_json}
{publish_html}
"""
return html
@app.get("/ui/run/{run_id}", response_class=HTMLResponse)
async def ui_run_partial(run_id: str):
"""HTMX partial: single run (for polling updates)."""
run = load_run(run_id)
if not run:
return 'Run not found
'
# Check Celery task status if running
if run.status == "running" and run.celery_task_id:
task = celery_app.AsyncResult(run.celery_task_id)
if task.ready():
if task.successful():
result = task.result
run.status = "completed"
run.completed_at = datetime.now(timezone.utc).isoformat()
run.output_hash = result.get("output", {}).get("content_hash")
# Extract effects info from provenance
effects = result.get("effects", [])
if effects:
run.effects_commit = effects[0].get("repo_commit")
run.effect_url = effects[0].get("repo_url")
# Extract infrastructure info
run.infrastructure = result.get("infrastructure")
output_path = Path(result.get("output", {}).get("local_path", ""))
if output_path.exists():
cache_file(output_path)
else:
run.status = "failed"
run.error = str(task.result)
save_run(run)
# Status badge colors
status_colors = {
"completed": "bg-green-600 text-white",
"running": "bg-yellow-600 text-white",
"failed": "bg-red-600 text-white",
"pending": "bg-gray-600 text-white"
}
status_badge = status_colors.get(run.status, "bg-gray-600 text-white")
poll_attr = f'hx-get="/ui/run/{run_id}" hx-trigger="every 2s" hx-swap="outerHTML"' if run.status == "running" else ""
html = f'''
{run.recipe}
{run.run_id[:16]}...
{run.status}
Created: {run.created_at[:19].replace('T', ' ')}
'''
# Show input and output side by side
has_input = run.inputs and (CACHE_DIR / run.inputs[0]).exists()
has_output = run.status == "completed" and run.output_hash and (CACHE_DIR / run.output_hash).exists()
if has_input or has_output:
html += '
'
if has_input:
input_hash = run.inputs[0]
input_media_type = detect_media_type(CACHE_DIR / input_hash)
html += f'''
Input: {input_hash[:16]}...
'''
if input_media_type == "video":
html += f'
'
elif input_media_type == "image":
html += f'
'
html += '
'
if has_output:
output_hash = run.output_hash
output_media_type = detect_media_type(CACHE_DIR / output_hash)
html += f'''
Output: {output_hash[:16]}...
'''
if output_media_type == "video":
html += f'
'
elif output_media_type == "image":
html += f'
'
html += '
'
html += '
'
if run.status == "failed" and run.error:
html += f'
Error: {run.error}
'
html += '
'
return html
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8100)