#!/usr/bin/env python3 """ Art DAG L1 Server Manages rendering runs and provides access to the cache. - POST /runs - start a run (recipe + inputs) - GET /runs/{run_id} - get run status/result - GET /cache/{content_hash} - get cached content """ import hashlib import json import os import uuid from datetime import datetime, timezone from pathlib import Path from typing import Optional from fastapi import FastAPI, HTTPException, UploadFile, File, Depends, Form, Request from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from pydantic import BaseModel import redis import requests as http_requests from urllib.parse import urlparse import yaml from celery_app import app as celery_app from tasks import render_effect, execute_dag, build_effect_dag from cache_manager import L1CacheManager, get_cache_manager # L2 server for auth verification L2_SERVER = os.environ.get("L2_SERVER", "http://localhost:8200") L2_DOMAIN = os.environ.get("L2_DOMAIN", "artdag.rose-ash.com") L1_PUBLIC_URL = os.environ.get("L1_PUBLIC_URL", "http://localhost:8100") # Cache directory (use /data/cache in Docker, ~/.artdag/cache locally) CACHE_DIR = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache"))) CACHE_DIR.mkdir(parents=True, exist_ok=True) # Initialize L1 cache manager with artdag integration cache_manager = L1CacheManager(cache_dir=CACHE_DIR, l2_server=L2_SERVER) # Redis for persistent run storage REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/5') parsed = urlparse(REDIS_URL) redis_client = redis.Redis( host=parsed.hostname or 'localhost', port=parsed.port or 6379, db=int(parsed.path.lstrip('/') or 0) ) RUNS_KEY_PREFIX = "artdag:run:" CONFIGS_KEY_PREFIX = "artdag:config:" def save_run(run: "RunStatus"): """Save run to Redis.""" redis_client.set(f"{RUNS_KEY_PREFIX}{run.run_id}", run.model_dump_json()) def load_run(run_id: str) -> Optional["RunStatus"]: """Load run from 
Redis.""" data = redis_client.get(f"{RUNS_KEY_PREFIX}{run_id}") if data: return RunStatus.model_validate_json(data) return None def list_all_runs() -> list["RunStatus"]: """List all runs from Redis.""" runs = [] for key in redis_client.scan_iter(f"{RUNS_KEY_PREFIX}*"): data = redis_client.get(key) if data: runs.append(RunStatus.model_validate_json(data)) return sorted(runs, key=lambda r: r.created_at, reverse=True) def find_runs_using_content(content_hash: str) -> list[tuple["RunStatus", str]]: """Find all runs that use a content_hash as input or output. Returns list of (run, role) tuples where role is 'input' or 'output'. """ results = [] for run in list_all_runs(): if run.inputs and content_hash in run.inputs: results.append((run, "input")) if run.output_hash == content_hash: results.append((run, "output")) return results app = FastAPI( title="Art DAG L1 Server", description="Distributed rendering server for Art DAG", version="0.1.0" ) class RunRequest(BaseModel): """Request to start a run.""" recipe: str # Recipe name (e.g., "dog", "identity") or "dag" for custom DAG inputs: list[str] # List of content hashes output_name: Optional[str] = None use_dag: bool = False # Use DAG engine instead of legacy effect runner dag_json: Optional[str] = None # Custom DAG JSON (required if recipe="dag") class RunStatus(BaseModel): """Status of a run.""" run_id: str status: str # pending, running, completed, failed recipe: str inputs: list[str] output_name: str created_at: str completed_at: Optional[str] = None output_hash: Optional[str] = None error: Optional[str] = None celery_task_id: Optional[str] = None effects_commit: Optional[str] = None effect_url: Optional[str] = None # URL to effect source code username: Optional[str] = None # Owner of the run (ActivityPub actor ID) infrastructure: Optional[dict] = None # Hardware/software used for rendering # ============ Config Models ============ class VariableInput(BaseModel): """A variable input that must be filled at run time.""" 
node_id: str name: str description: Optional[str] = None required: bool = True class FixedInput(BaseModel): """A fixed input resolved from the registry.""" node_id: str asset: str content_hash: str class ConfigStatus(BaseModel): """Status/metadata of a config.""" config_id: str # Content hash of the YAML file name: str version: str description: Optional[str] = None variable_inputs: list[VariableInput] fixed_inputs: list[FixedInput] output_node: str owner: Optional[str] = None uploaded_at: str uploader: Optional[str] = None class ConfigRunRequest(BaseModel): """Request to run a config with variable inputs.""" inputs: dict[str, str] # node_id -> content_hash def save_config(config: ConfigStatus): """Save config to Redis.""" redis_client.set(f"{CONFIGS_KEY_PREFIX}{config.config_id}", config.model_dump_json()) def load_config(config_id: str) -> Optional[ConfigStatus]: """Load config from Redis.""" data = redis_client.get(f"{CONFIGS_KEY_PREFIX}{config_id}") if data: return ConfigStatus.model_validate_json(data) return None def list_all_configs() -> list[ConfigStatus]: """List all configs from Redis.""" configs = [] for key in redis_client.scan_iter(f"{CONFIGS_KEY_PREFIX}*"): data = redis_client.get(key) if data: configs.append(ConfigStatus.model_validate_json(data)) return sorted(configs, key=lambda c: c.uploaded_at, reverse=True) def delete_config_from_redis(config_id: str) -> bool: """Delete config from Redis.""" return redis_client.delete(f"{CONFIGS_KEY_PREFIX}{config_id}") > 0 def parse_config_yaml(yaml_content: str, config_hash: str, uploader: str) -> ConfigStatus: """Parse a config YAML file and extract metadata.""" config = yaml.safe_load(yaml_content) # Extract basic info name = config.get("name", "unnamed") version = config.get("version", "1.0") description = config.get("description") owner = config.get("owner") # Parse registry registry = config.get("registry", {}) assets = registry.get("assets", {}) # Parse DAG nodes dag = config.get("dag", {}) nodes = 
dag.get("nodes", []) output_node = dag.get("output") variable_inputs = [] fixed_inputs = [] for node in nodes: node_id = node.get("id") node_type = node.get("type") node_config = node.get("config", {}) if node_type == "SOURCE": if node_config.get("input"): # Variable input variable_inputs.append(VariableInput( node_id=node_id, name=node_config.get("name", node_id), description=node_config.get("description"), required=node_config.get("required", True) )) elif "asset" in node_config: # Fixed input - resolve from registry asset_name = node_config["asset"] asset_info = assets.get(asset_name, {}) fixed_inputs.append(FixedInput( node_id=node_id, asset=asset_name, content_hash=asset_info.get("hash", "") )) return ConfigStatus( config_id=config_hash, name=name, version=version, description=description, variable_inputs=variable_inputs, fixed_inputs=fixed_inputs, output_node=output_node or "", owner=owner, uploaded_at=datetime.now(timezone.utc).isoformat(), uploader=uploader ) # ============ Auth ============ security = HTTPBearer(auto_error=False) def verify_token_with_l2(token: str) -> Optional[str]: """Verify token with L2 server, return username if valid.""" try: resp = http_requests.post( f"{L2_SERVER}/auth/verify", headers={"Authorization": f"Bearer {token}"}, timeout=5 ) if resp.status_code == 200: return resp.json().get("username") except Exception: pass return None async def get_optional_user( credentials: HTTPAuthorizationCredentials = Depends(security) ) -> Optional[str]: """Get username if authenticated, None otherwise.""" if not credentials: return None return verify_token_with_l2(credentials.credentials) async def get_required_user( credentials: HTTPAuthorizationCredentials = Depends(security) ) -> str: """Get username, raise 401 if not authenticated.""" if not credentials: raise HTTPException(401, "Not authenticated") username = verify_token_with_l2(credentials.credentials) if not username: raise HTTPException(401, "Invalid token") return username def 
file_hash(path: Path) -> str: """Compute SHA3-256 hash of a file.""" hasher = hashlib.sha3_256() with open(path, "rb") as f: for chunk in iter(lambda: f.read(65536), b""): hasher.update(chunk) return hasher.hexdigest() def cache_file(source: Path, node_type: str = "output") -> str: """ Copy file to cache using L1CacheManager, return content hash. Uses artdag's Cache internally for proper tracking. """ cached = cache_manager.put(source, node_type=node_type) return cached.content_hash def get_cache_path(content_hash: str) -> Optional[Path]: """Get the path for a cached file by content_hash.""" return cache_manager.get_by_content_hash(content_hash) @app.get("/api") async def api_info(): """Server info (JSON).""" return { "name": "Art DAG L1 Server", "version": "0.1.0", "cache_dir": str(CACHE_DIR), "runs_count": len(list_all_runs()) } HOME_HTML = """ Art DAG L1 Server

Art DAG L1 Server

L1 rendering server for the Art DAG system. Manages distributed rendering jobs via Celery workers.

Dependencies

API Endpoints

Method Path Description
GET/uiWeb UI for viewing runs
POST/runsStart a rendering run
GET/runsList all runs
GET/runs/{run_id}Get run status
GET/cacheList cached content hashes
GET/cache/{hash}Download cached content
POST/cache/uploadUpload file to cache
GET/assetsList known assets

Start a Run

curl -X POST /runs \\
  -H "Content-Type: application/json" \\
  -d '{"recipe": "dog", "inputs": ["33268b6e..."]}'

Provenance

Every render produces a provenance record linking inputs, effects, and infrastructure:

{
  "output": {"content_hash": "..."},
  "inputs": [...],
  "effects": [...],
  "infrastructure": {...}
}
""" @app.get("/", response_class=HTMLResponse) async def root(): """Home page.""" return HOME_HTML @app.post("/runs", response_model=RunStatus) async def create_run(request: RunRequest, username: str = Depends(get_required_user)): """Start a new rendering run. Requires authentication.""" run_id = str(uuid.uuid4()) # Generate output name if not provided output_name = request.output_name or f"{request.recipe}-{run_id[:8]}" # Format username as ActivityPub actor ID actor_id = f"@{username}@{L2_DOMAIN}" # Create run record run = RunStatus( run_id=run_id, status="pending", recipe=request.recipe, inputs=request.inputs, output_name=output_name, created_at=datetime.now(timezone.utc).isoformat(), username=actor_id ) # Submit to Celery if request.use_dag or request.recipe == "dag": # DAG mode - use artdag engine if request.dag_json: # Custom DAG provided dag_json = request.dag_json else: # Build simple effect DAG from recipe and inputs dag = build_effect_dag(request.inputs, request.recipe) dag_json = dag.to_json() task = execute_dag.delay(dag_json, run.run_id) else: # Legacy mode - single effect if len(request.inputs) != 1: raise HTTPException(400, "Legacy mode only supports single-input recipes. 
Use use_dag=true for multi-input.") input_hash = request.inputs[0] task = render_effect.delay(input_hash, request.recipe, output_name) run.celery_task_id = task.id run.status = "running" save_run(run) return run @app.get("/runs/{run_id}", response_model=RunStatus) async def get_run(run_id: str): """Get status of a run.""" run = load_run(run_id) if not run: raise HTTPException(404, f"Run {run_id} not found") # Check Celery task status if running if run.status == "running" and run.celery_task_id: task = celery_app.AsyncResult(run.celery_task_id) if task.ready(): if task.successful(): result = task.result run.status = "completed" run.completed_at = datetime.now(timezone.utc).isoformat() # Handle both legacy (render_effect) and new (execute_dag) result formats if "output_hash" in result: # New DAG result format run.output_hash = result.get("output_hash") output_path = Path(result.get("output_path", "")) if result.get("output_path") else None else: # Legacy render_effect format run.output_hash = result.get("output", {}).get("content_hash") output_path = Path(result.get("output", {}).get("local_path", "")) # Extract effects info from provenance (legacy only) effects = result.get("effects", []) if effects: run.effects_commit = effects[0].get("repo_commit") run.effect_url = effects[0].get("repo_url") # Extract infrastructure info (legacy only) run.infrastructure = result.get("infrastructure") # Cache the output (legacy mode - DAG already caches via cache_manager) if output_path and output_path.exists() and "output_hash" not in result: cache_file(output_path, node_type="effect_output") # Record activity for deletion tracking (legacy mode) if run.output_hash and run.inputs: cache_manager.record_simple_activity( input_hashes=run.inputs, output_hash=run.output_hash, run_id=run.run_id, ) else: run.status = "failed" run.error = str(task.result) # Save updated status save_run(run) return run @app.delete("/runs/{run_id}") async def discard_run(run_id: str, username: str = 
Depends(get_required_user)): """ Discard (delete) a run and its intermediate cache entries. Enforces deletion rules: - Cannot discard if any item (input, output) is published to L2 - Deletes intermediate cache entries - Keeps inputs (may be used by other runs) - Deletes orphaned outputs """ run = load_run(run_id) if not run: raise HTTPException(404, f"Run {run_id} not found") # Check ownership actor_id = f"@{username}@{L2_DOMAIN}" if run.username not in (username, actor_id): raise HTTPException(403, "Access denied") # Failed runs can always be deleted (no output to protect) if run.status != "failed": # Check if any items are pinned (published or input to published) items_to_check = list(run.inputs or []) if run.output_hash: items_to_check.append(run.output_hash) for content_hash in items_to_check: meta = load_cache_meta(content_hash) if meta.get("pinned"): pin_reason = meta.get("pin_reason", "published") raise HTTPException(400, f"Cannot discard run: item {content_hash[:16]}... is pinned ({pin_reason})") # Check if activity exists for this run activity = cache_manager.get_activity(run_id) if activity: # Use activity manager deletion rules can_discard, reason = cache_manager.can_discard_activity(run_id) if not can_discard: raise HTTPException(400, f"Cannot discard run: {reason}") # Discard the activity (cleans up cache entries) success, msg = cache_manager.discard_activity(run_id) if not success: raise HTTPException(500, f"Failed to discard: {msg}") # Remove from Redis redis_client.delete(f"{RUNS_KEY_PREFIX}{run_id}") return {"discarded": True, "run_id": run_id} @app.delete("/ui/runs/{run_id}/discard", response_class=HTMLResponse) async def ui_discard_run(run_id: str, request: Request): """HTMX handler: discard a run.""" current_user = get_user_from_cookie(request) if not current_user: return '
Login required
' run = load_run(run_id) if not run: return '
Run not found
' # Check ownership actor_id = f"@{current_user}@{L2_DOMAIN}" if run.username not in (current_user, actor_id): return '
Access denied
' # Failed runs can always be deleted if run.status != "failed": # Check if any items are pinned (published or input to published) items_to_check = list(run.inputs or []) if run.output_hash: items_to_check.append(run.output_hash) for content_hash in items_to_check: meta = load_cache_meta(content_hash) if meta.get("pinned"): pin_reason = meta.get("pin_reason", "published") return f'
Cannot discard: item {content_hash[:16]}... is pinned ({pin_reason})
' # Check if activity exists for this run activity = cache_manager.get_activity(run_id) if activity: can_discard, reason = cache_manager.can_discard_activity(run_id) if not can_discard: return f'
Cannot discard: {reason}
' success, msg = cache_manager.discard_activity(run_id) if not success: return f'
Failed to discard: {msg}
' # Remove from Redis redis_client.delete(f"{RUNS_KEY_PREFIX}{run_id}") return '''
Run deleted. Back to runs
''' @app.get("/run/{run_id}") async def run_detail(run_id: str, request: Request): """Run detail. HTML for browsers, JSON for APIs.""" run = load_run(run_id) if not run: if wants_html(request): content = f'

Run not found: {run_id}

' return HTMLResponse(render_page("Not Found", content, None, active_tab="runs"), status_code=404) raise HTTPException(404, f"Run {run_id} not found") # Check Celery task status if running if run.status == "running" and run.celery_task_id: task = celery_app.AsyncResult(run.celery_task_id) if task.ready(): if task.successful(): result = task.result run.status = "completed" run.completed_at = datetime.now(timezone.utc).isoformat() run.output_hash = result.get("output", {}).get("content_hash") effects = result.get("effects", []) if effects: run.effects_commit = effects[0].get("repo_commit") run.effect_url = effects[0].get("repo_url") run.infrastructure = result.get("infrastructure") output_path = Path(result.get("output", {}).get("local_path", "")) if output_path.exists(): cache_file(output_path) else: run.status = "failed" run.error = str(task.result) save_run(run) if wants_html(request): current_user = get_user_from_cookie(request) if not current_user: content = '

Login to view run details.

' return HTMLResponse(render_page("Login Required", content, current_user, active_tab="runs"), status_code=401) # Check user owns this run actor_id = f"@{current_user}@{L2_DOMAIN}" if run.username not in (current_user, actor_id): content = '

Access denied.

' return HTMLResponse(render_page("Access Denied", content, current_user, active_tab="runs"), status_code=403) # Build effect URL if run.effect_url: effect_url = run.effect_url elif run.effects_commit and run.effects_commit != "unknown": effect_url = f"https://git.rose-ash.com/art-dag/effects/src/commit/{run.effects_commit}/{run.recipe}" else: effect_url = f"https://git.rose-ash.com/art-dag/effects/src/branch/main/{run.recipe}" # Status badge colors status_colors = { "completed": "bg-green-600 text-white", "running": "bg-yellow-600 text-white", "failed": "bg-red-600 text-white", "pending": "bg-gray-600 text-white" } status_badge = status_colors.get(run.status, "bg-gray-600 text-white") # Build media HTML for input/output media_html = "" has_input = run.inputs and cache_manager.has_content(run.inputs[0]) has_output = run.status == "completed" and run.output_hash and cache_manager.has_content(run.output_hash) if has_input or has_output: media_html = '
' if has_input: input_hash = run.inputs[0] input_media_type = detect_media_type(get_cache_path(input_hash)) input_video_src = video_src_for_request(input_hash, request) if input_media_type == "video": input_elem = f'' elif input_media_type == "image": input_elem = f'input' else: input_elem = '

Unknown format

' media_html += f'''
Input
{input_hash[:24]}...
{input_elem}
''' if has_output: output_hash = run.output_hash output_media_type = detect_media_type(get_cache_path(output_hash)) output_video_src = video_src_for_request(output_hash, request) if output_media_type == "video": output_elem = f'' elif output_media_type == "image": output_elem = f'output' else: output_elem = '

Unknown format

' media_html += f'''
Output
{output_hash[:24]}...
{output_elem}
''' media_html += '
' # Build inputs list inputs_html = ''.join([f'{inp}' for inp in run.inputs]) # Infrastructure section infra_html = "" if run.infrastructure: software = run.infrastructure.get("software", {}) hardware = run.infrastructure.get("hardware", {}) infra_html = f'''
Infrastructure
Software: {software.get("name", "unknown")} ({software.get("content_hash", "unknown")[:16]}...)
Hardware: {hardware.get("name", "unknown")} ({hardware.get("content_hash", "unknown")[:16]}...)
''' # Error display error_html = "" if run.error: error_html = f'''
Error
{run.error}
''' # Publish section publish_html = "" if run.status == "completed" and run.output_hash: publish_html = f'''

Publish to L2

Register this transformation output on the L2 ActivityPub server.

''' # Delete section delete_html = f'''

Delete Run

{"This run failed and can be deleted." if run.status == "failed" else "Delete this run and its associated cache entries."}

''' output_link = "" if run.output_hash: output_link = f'''
Output
{run.output_hash}
''' completed_html = "" if run.completed_at: completed_html = f'''
Completed
{run.completed_at[:19].replace('T', ' ')}
''' content = f''' Back to runs
{run.recipe} {run.run_id[:16]}...
{run.status}
{error_html} {media_html}

Provenance

Owner
{run.username or "anonymous"}
Effect
{run.recipe}
Effects Commit
{run.effects_commit or "N/A"}
Input(s)
{inputs_html}
{output_link}
Run ID
{run.run_id}
Created
{run.created_at[:19].replace('T', ' ')}
{completed_html} {infra_html}
{publish_html} {delete_html}
''' return HTMLResponse(render_page(f"Run: {run.recipe}", content, current_user, active_tab="runs")) # JSON response return run.model_dump() @app.get("/runs") async def list_runs(request: Request, page: int = 1, limit: int = 20): """List runs. HTML for browsers (with infinite scroll), JSON for APIs (with pagination).""" current_user = get_user_from_cookie(request) all_runs = list_all_runs() total = len(all_runs) # Filter by user if logged in for HTML if wants_html(request) and current_user: actor_id = f"@{current_user}@{L2_DOMAIN}" all_runs = [r for r in all_runs if r.username in (current_user, actor_id)] total = len(all_runs) # Pagination start = (page - 1) * limit end = start + limit runs_page = all_runs[start:end] has_more = end < total if wants_html(request): if not current_user: content = '

Login to see your runs.

' return HTMLResponse(render_page("Runs", content, current_user, active_tab="runs")) if not runs_page: if page == 1: content = '

You have no runs yet. Use the CLI to start a run.

' else: return HTMLResponse("") # Empty for infinite scroll else: # Status badge colors status_colors = { "completed": "bg-green-600 text-white", "running": "bg-yellow-600 text-white", "failed": "bg-red-600 text-white", "pending": "bg-gray-600 text-white" } html_parts = [] for run in runs_page: status_badge = status_colors.get(run.status, "bg-gray-600 text-white") html_parts.append(f'''
{run.recipe}
{run.status}
Created: {run.created_at[:19].replace('T', ' ')}
''') # Show input and output thumbnails has_input = run.inputs and cache_manager.has_content(run.inputs[0]) has_output = run.status == "completed" and run.output_hash and cache_manager.has_content(run.output_hash) if has_input or has_output: html_parts.append('
') if has_input: input_hash = run.inputs[0] input_media_type = detect_media_type(get_cache_path(input_hash)) html_parts.append(f'''
Input
''') if input_media_type == "video": html_parts.append(f'') else: html_parts.append(f'input') html_parts.append('
') if has_output: output_hash = run.output_hash output_media_type = detect_media_type(get_cache_path(output_hash)) html_parts.append(f'''
Output
''') if output_media_type == "video": html_parts.append(f'') else: html_parts.append(f'output') html_parts.append('
') html_parts.append('
') if run.status == "failed" and run.error: html_parts.append(f'
Error: {run.error[:100]}
') html_parts.append('
') # For infinite scroll, just return cards if not first page if page > 1: if has_more: html_parts.append(f'''

Loading more...

''') return HTMLResponse('\n'.join(html_parts)) # First page - full content infinite_scroll_trigger = "" if has_more: infinite_scroll_trigger = f'''

Loading more...

''' content = f'''

Runs ({total} total)

{''.join(html_parts)} {infinite_scroll_trigger}
''' return HTMLResponse(render_page("Runs", content, current_user, active_tab="runs")) # JSON response for APIs return { "runs": [r.model_dump() for r in runs_page], "pagination": { "page": page, "limit": limit, "total": total, "has_more": has_more } } # ============ Config Endpoints ============ @app.post("/configs/upload") async def upload_config(file: UploadFile = File(...), username: str = Depends(get_required_user)): """Upload a config YAML file. Requires authentication.""" import tempfile # Read file content content = await file.read() try: yaml_content = content.decode('utf-8') except UnicodeDecodeError: raise HTTPException(400, "Config file must be valid UTF-8 text") # Validate YAML try: yaml.safe_load(yaml_content) except yaml.YAMLError as e: raise HTTPException(400, f"Invalid YAML: {e}") # Store YAML file in cache with tempfile.NamedTemporaryFile(delete=False, suffix=".yaml") as tmp: tmp.write(content) tmp_path = Path(tmp.name) cached = cache_manager.put(tmp_path, node_type="config", move=True) config_hash = cached.content_hash # Parse and save metadata actor_id = f"@{username}@{L2_DOMAIN}" try: config_status = parse_config_yaml(yaml_content, config_hash, actor_id) except Exception as e: raise HTTPException(400, f"Failed to parse config: {e}") save_config(config_status) # Save cache metadata save_cache_meta(config_hash, actor_id, file.filename, type="config", config_name=config_status.name) return { "config_id": config_hash, "name": config_status.name, "version": config_status.version, "variable_inputs": len(config_status.variable_inputs), "fixed_inputs": len(config_status.fixed_inputs) } @app.get("/configs") async def list_configs_api(request: Request, page: int = 1, limit: int = 20): """List configs. 
HTML for browsers, JSON for APIs.""" current_user = get_user_from_cookie(request) all_configs = list_all_configs() total = len(all_configs) # Pagination start = (page - 1) * limit end = start + limit configs_page = all_configs[start:end] has_more = end < total if wants_html(request): # HTML response - redirect to /configs page with proper UI return RedirectResponse(f"/configs?page={page}") # JSON response for APIs return { "configs": [c.model_dump() for c in configs_page], "pagination": { "page": page, "limit": limit, "total": total, "has_more": has_more } } @app.get("/configs/{config_id}") async def get_config_api(config_id: str): """Get config details.""" config = load_config(config_id) if not config: raise HTTPException(404, f"Config {config_id} not found") return config @app.delete("/configs/{config_id}") async def remove_config(config_id: str, username: str = Depends(get_required_user)): """Delete a config. Requires authentication.""" config = load_config(config_id) if not config: raise HTTPException(404, f"Config {config_id} not found") # Check ownership actor_id = f"@{username}@{L2_DOMAIN}" if config.uploader not in (username, actor_id): raise HTTPException(403, "Access denied") # Check if pinned pinned, reason = cache_manager.is_pinned(config_id) if pinned: raise HTTPException(400, f"Cannot delete pinned config: {reason}") # Delete from Redis and cache delete_config_from_redis(config_id) cache_manager.delete_by_content_hash(config_id) return {"deleted": True, "config_id": config_id} @app.post("/configs/{config_id}/run") async def run_config(config_id: str, request: ConfigRunRequest, username: str = Depends(get_required_user)): """Run a config with provided variable inputs. 
Requires authentication.""" config = load_config(config_id) if not config: raise HTTPException(404, f"Config {config_id} not found") # Validate all required inputs are provided for var_input in config.variable_inputs: if var_input.required and var_input.node_id not in request.inputs: raise HTTPException(400, f"Missing required input: {var_input.name}") # Load config YAML config_path = cache_manager.get_by_content_hash(config_id) if not config_path: raise HTTPException(500, "Config YAML not found in cache") with open(config_path) as f: yaml_config = yaml.safe_load(f) # Build DAG from config dag = build_dag_from_config(yaml_config, request.inputs, config) # Create run run_id = str(uuid.uuid4()) actor_id = f"@{username}@{L2_DOMAIN}" # Collect all input hashes all_inputs = list(request.inputs.values()) for fixed in config.fixed_inputs: if fixed.content_hash: all_inputs.append(fixed.content_hash) run = RunStatus( run_id=run_id, status="pending", recipe=f"config:{config.name}", inputs=all_inputs, output_name=f"{config.name}-{run_id[:8]}", created_at=datetime.now(timezone.utc).isoformat(), username=actor_id ) # Submit to Celery dag_json = dag.to_json() task = execute_dag.delay(dag_json, run.run_id) run.celery_task_id = task.id run.status = "running" save_run(run) return run def build_dag_from_config(yaml_config: dict, user_inputs: dict[str, str], config: ConfigStatus): """Build a DAG from config YAML with user-provided inputs.""" from artdag import DAG, Node dag = DAG() node_map = {} # node_id -> Node registry = yaml_config.get("registry", {}) assets = registry.get("assets", {}) effects = registry.get("effects", {}) dag_config = yaml_config.get("dag", {}) nodes = dag_config.get("nodes", []) for node_def in nodes: node_id = node_def.get("id") node_type = node_def.get("type") node_config = node_def.get("config", {}) input_ids = node_def.get("inputs", []) if node_type == "SOURCE": if node_config.get("input"): # Variable input - use user-provided hash content_hash = 
user_inputs.get(node_id) if not content_hash: raise HTTPException(400, f"Missing input for node {node_id}") node = Node(node_id, "SOURCE", {"content_hash": content_hash}) else: # Fixed input - use registry hash asset_name = node_config.get("asset") asset_info = assets.get(asset_name, {}) content_hash = asset_info.get("hash") if not content_hash: raise HTTPException(400, f"Asset {asset_name} not found in registry") node = Node(node_id, "SOURCE", {"content_hash": content_hash}) elif node_type == "EFFECT": effect_name = node_config.get("effect") effect_info = effects.get(effect_name, {}) effect_hash = effect_info.get("hash") node = Node(node_id, "EFFECT", {"effect": effect_name, "effect_hash": effect_hash}) else: node = Node(node_id, node_type, node_config) node_map[node_id] = node dag.add_node(node) # Connect edges for node_def in nodes: node_id = node_def.get("id") input_ids = node_def.get("inputs", []) for input_id in input_ids: dag.add_edge(input_id, node_id) return dag # ============ Config UI Pages ============ @app.get("/configs", response_class=HTMLResponse) async def configs_page(request: Request, page: int = 1): """Configs list page (HTML).""" current_user = get_user_from_cookie(request) if not current_user: return HTMLResponse(render_page( "Configs", '

Login to see configs.

', None, active_tab="configs" )) all_configs = list_all_configs() # Filter to user's configs actor_id = f"@{current_user}@{L2_DOMAIN}" user_configs = [c for c in all_configs if c.uploader in (current_user, actor_id)] total = len(user_configs) if not user_configs: content = '''

Configs (0)

No configs yet. Upload a config YAML file to get started.

''' return HTMLResponse(render_page("Configs", content, current_user, active_tab="configs")) html_parts = [] for config in user_configs: var_count = len(config.variable_inputs) fixed_count = len(config.fixed_inputs) input_info = [] if var_count: input_info.append(f"{var_count} variable") if fixed_count: input_info.append(f"{fixed_count} fixed") inputs_str = ", ".join(input_info) if input_info else "no inputs" html_parts.append(f'''
{config.name} v{config.version}
{inputs_str}
{config.description or "No description"}
{config.config_id[:24]}...
''') content = f'''

Configs ({total})

{''.join(html_parts)}
''' return HTMLResponse(render_page("Configs", content, current_user, active_tab="configs")) @app.get("/config/{config_id}", response_class=HTMLResponse) async def config_detail_page(config_id: str, request: Request): """Config detail page with run form.""" current_user = get_user_from_cookie(request) config = load_config(config_id) if not config: return HTMLResponse(render_page( "Config Not Found", f'

Config {config_id} not found.

', current_user, active_tab="configs" ), status_code=404) # Build variable inputs form var_inputs_html = "" if config.variable_inputs: var_inputs_html = '
' for var_input in config.variable_inputs: required = "required" if var_input.required else "" var_inputs_html += f'''

{var_input.description or 'Enter a content hash from your cache'}

''' var_inputs_html += '
' else: var_inputs_html = '

This config has no variable inputs - it uses fixed assets only.

' # Build fixed inputs display fixed_inputs_html = "" if config.fixed_inputs: fixed_inputs_html = '

Fixed Inputs

' # Check if pinned pinned, pin_reason = cache_manager.is_pinned(config_id) pinned_badge = "" if pinned: pinned_badge = f'Pinned: {pin_reason}' content = f'''
← Back to configs

{config.name}

v{config.version} {pinned_badge}

{config.description or 'No description'}

{config.config_id}
{fixed_inputs_html}

Run this Config

{var_inputs_html}
''' return HTMLResponse(render_page(f"Config: {config.name}", content, current_user, active_tab="configs")) @app.post("/ui/configs/{config_id}/run", response_class=HTMLResponse) async def ui_run_config(config_id: str, request: Request): """HTMX handler: run a config with form inputs.""" current_user = get_user_from_cookie(request) if not current_user: return '
Login required
' config = load_config(config_id) if not config: return '
Config not found
' # Parse form data form_data = await request.form() inputs = {} for var_input in config.variable_inputs: value = form_data.get(var_input.node_id, "").strip() if var_input.required and not value: return f'
Missing required input: {var_input.name}
' if value: inputs[var_input.node_id] = value # Load config YAML config_path = cache_manager.get_by_content_hash(config_id) if not config_path: return '
Config YAML not found in cache
' try: with open(config_path) as f: yaml_config = yaml.safe_load(f) # Build DAG from config dag = build_dag_from_config(yaml_config, inputs, config) # Create run run_id = str(uuid.uuid4()) actor_id = f"@{current_user}@{L2_DOMAIN}" # Collect all input hashes all_inputs = list(inputs.values()) for fixed in config.fixed_inputs: if fixed.content_hash: all_inputs.append(fixed.content_hash) run = RunStatus( run_id=run_id, status="pending", recipe=f"config:{config.name}", inputs=all_inputs, output_name=f"{config.name}-{run_id[:8]}", created_at=datetime.now(timezone.utc).isoformat(), username=actor_id ) # Submit to Celery dag_json = dag.to_json() task = execute_dag.delay(dag_json, run.run_id) run.celery_task_id = task.id run.status = "running" save_run(run) return f'''
Run started! View run
''' except Exception as e: return f'
Error: {str(e)}
' @app.get("/ui/configs-list", response_class=HTMLResponse) async def ui_configs_list(request: Request): """HTMX partial: list of configs.""" current_user = get_user_from_cookie(request) if not current_user: return '

Login to see configs.

' all_configs = list_all_configs() # Filter to user's configs actor_id = f"@{current_user}@{L2_DOMAIN}" user_configs = [c for c in all_configs if c.uploader in (current_user, actor_id)] if not user_configs: return '

No configs yet. Upload a config YAML file to get started.

' html_parts = ['
'] for config in user_configs: var_count = len(config.variable_inputs) fixed_count = len(config.fixed_inputs) input_info = [] if var_count: input_info.append(f"{var_count} variable") if fixed_count: input_info.append(f"{fixed_count} fixed") inputs_str = ", ".join(input_info) if input_info else "no inputs" html_parts.append(f'''
{config.name} v{config.version}
{inputs_str}
{config.description or "No description"}
{config.config_id[:24]}...
''') html_parts.append('
'
    )
    return '\n'.join(html_parts)


@app.delete("/ui/configs/{config_id}/discard", response_class=HTMLResponse)
async def ui_discard_config(config_id: str, request: Request):
    """HTMX handler: discard a config.

    Guards, in order: login, config existence, ownership (bare username or
    federated actor id), and pin status.  On success removes the config from
    both Redis and the content cache and returns an HTML fragment.
    """
    current_user = get_user_from_cookie(request)
    if not current_user:
        return '\nLogin required\n'
    config = load_config(config_id)
    if not config:
        return '\nConfig not found\n'
    # Check ownership — uploader may be stored as plain username or actor id.
    actor_id = f"@{current_user}@{L2_DOMAIN}"
    if config.uploader not in (current_user, actor_id):
        return '\nAccess denied\n'
    # Check if pinned; pinned configs must never be deleted.
    pinned, reason = cache_manager.is_pinned(config_id)
    if pinned:
        return f'\nCannot delete: config is pinned ({reason})\n'
    # Delete from Redis and cache (config_id doubles as the content hash).
    delete_config_from_redis(config_id)
    cache_manager.delete_by_content_hash(config_id)
    return '''
Config deleted. Back to configs
'''


@app.get("/cache/{content_hash}")
async def get_cached(content_hash: str):
    """Get cached content by hash.

    Raises 404 if the hash is not present in the cache.
    """
    cache_path = get_cache_path(content_hash)
    if not cache_path:
        raise HTTPException(404, f"Content {content_hash} not in cache")
    return FileResponse(cache_path)


@app.get("/cache/{content_hash}/mp4")
async def get_cached_mp4(content_hash: str):
    """Get cached content as MP4 (transcodes MKV on first request, caches result).

    Fast paths: a previously transcoded <hash>.mp4 in CACHE_DIR, or a source
    container that ffprobe already reports as mp4/mov.  Otherwise transcodes
    with ffmpeg (H.264 + AAC) into a temp file and renames it into place.

    NOTE(review): two concurrent first requests for the same hash will both
    write the same .transcoding.mp4 path — presumably acceptable (last rename
    wins), but confirm if concurrent access is expected.
    """
    cache_path = get_cache_path(content_hash)
    if not cache_path:
        raise HTTPException(404, f"Content {content_hash} not in cache")
    # MP4 transcodes stored alongside original in CACHE_DIR
    mp4_path = CACHE_DIR / f"{content_hash}.mp4"
    # If MP4 already cached, serve it
    if mp4_path.exists():
        return FileResponse(mp4_path, media_type="video/mp4")
    # Check if source is already MP4
    media_type = detect_media_type(cache_path)
    if media_type != "video":
        raise HTTPException(400, "Content is not a video")
    # Check if already MP4 format
    import subprocess
    try:
        result = subprocess.run(
            ["ffprobe", "-v", "error", "-select_streams", "v:0",
             "-show_entries", "format=format_name", "-of", "csv=p=0",
             str(cache_path)],
            capture_output=True, text=True, timeout=10
        )
        # Container-level check: mp4/mov families are browser-playable as-is.
        if "mp4" in result.stdout.lower() or "mov" in result.stdout.lower():
            # Already MP4-compatible, just serve original
            return FileResponse(cache_path, media_type="video/mp4")
    except Exception:
        pass  # Continue with transcoding
    # Transcode to MP4 (H.264 + AAC); write to a temp name, rename when done.
    transcode_path = CACHE_DIR / f"{content_hash}.transcoding.mp4"
    try:
        result = subprocess.run(
            ["ffmpeg", "-y", "-i", str(cache_path),
             "-c:v", "libx264", "-preset", "fast", "-crf", "23",
             "-c:a", "aac", "-b:a", "128k",
             "-movflags", "+faststart",
             str(transcode_path)],
            capture_output=True, text=True,
            timeout=600  # 10 min timeout
        )
        if result.returncode != 0:
            raise HTTPException(500, f"Transcoding failed: {result.stderr[:200]}")
        # Move to final location
        transcode_path.rename(mp4_path)
    except subprocess.TimeoutExpired:
        if transcode_path.exists():
transcode_path.unlink() raise HTTPException(500, "Transcoding timed out") except Exception as e: if transcode_path.exists(): transcode_path.unlink() raise HTTPException(500, f"Transcoding failed: {e}") return FileResponse(mp4_path, media_type="video/mp4") @app.get("/cache/{content_hash}/detail") async def cache_detail(content_hash: str, request: Request): """View cached content detail. HTML for browsers, JSON for APIs.""" current_user = get_user_from_cookie(request) cache_path = get_cache_path(content_hash) if not cache_path: if wants_html(request): content = f'

Content not found: {content_hash}

' return HTMLResponse(render_page("Not Found", content, current_user, active_tab="cache"), status_code=404) raise HTTPException(404, f"Content {content_hash} not in cache") if wants_html(request): if not current_user: content = '

Login to view cached content.

' return HTMLResponse(render_page("Login Required", content, current_user, active_tab="cache"), status_code=401) # Check user has access user_hashes = get_user_cache_hashes(current_user) if content_hash not in user_hashes: content = '

Access denied.

' return HTMLResponse(render_page("Access Denied", content, current_user, active_tab="cache"), status_code=403) media_type = detect_media_type(cache_path) file_size = cache_path.stat().st_size size_str = f"{file_size:,} bytes" if file_size > 1024*1024: size_str = f"{file_size/(1024*1024):.1f} MB" elif file_size > 1024: size_str = f"{file_size/1024:.1f} KB" # Build media display HTML if media_type == "video": video_src = video_src_for_request(content_hash, request) media_html = f'' elif media_type == "image": media_html = f'{content_hash}' else: media_html = f'

Unknown file type. Download file

' content = f''' Back to cache
{media_type.capitalize()} {content_hash[:24]}...
Download
{media_html}

Details

Content Hash (SHA3-256)
{content_hash}
Type
{media_type}
Size
{size_str}
Loading metadata...
''' return HTMLResponse(render_page(f"Cache: {content_hash[:16]}...", content, current_user, active_tab="cache")) # JSON response - return metadata meta = load_cache_meta(content_hash) file_size = cache_path.stat().st_size media_type = detect_media_type(cache_path) return { "content_hash": content_hash, "size": file_size, "media_type": media_type, "meta": meta } @app.get("/cache/{content_hash}/meta-form", response_class=HTMLResponse) async def cache_meta_form(content_hash: str, request: Request): """Clean URL redirect to the HTMX meta form.""" # Just redirect to the old endpoint for now from starlette.responses import RedirectResponse return RedirectResponse(f"/ui/cache/{content_hash}/meta-form", status_code=302) @app.get("/ui/cache/{content_hash}") async def ui_cache_view(content_hash: str): """Redirect to clean URL.""" return RedirectResponse(url=f"/cache/{content_hash}/detail", status_code=302) @app.get("/ui/cache/{content_hash}/meta-form", response_class=HTMLResponse) async def ui_cache_meta_form(content_hash: str, request: Request): """HTMX partial: metadata editing form for a cached item.""" current_user = get_user_from_cookie(request) if not current_user: return '
Login required to edit metadata
' # Check ownership user_hashes = get_user_cache_hashes(current_user) if content_hash not in user_hashes: return '
Access denied
' # Load metadata meta = load_cache_meta(content_hash) origin = meta.get("origin", {}) origin_type = origin.get("type", "") origin_url = origin.get("url", "") origin_note = origin.get("note", "") description = meta.get("description", "") tags = meta.get("tags", []) tags_str = ", ".join(tags) if tags else "" published = meta.get("published", {}) pinned = meta.get("pinned", False) pin_reason = meta.get("pin_reason", "") # Detect media type for publish cache_path = get_cache_path(content_hash) media_type = detect_media_type(cache_path) if cache_path else "unknown" asset_type = "video" if media_type == "video" else "image" # Origin radio checked states self_checked = 'checked' if origin_type == "self" else '' external_checked = 'checked' if origin_type == "external" else '' # Build publish section if published.get("to_l2"): asset_name = published.get("asset_name", "") published_at = published.get("published_at", "")[:10] last_synced = published.get("last_synced_at", "")[:10] publish_html = f'''
Published to L2
Asset name: {asset_name}
Published: {published_at}
Last synced: {last_synced}
''' else: # Show publish form only if origin is set if origin_type: publish_html = f'''
''' else: publish_html = '''
Set an origin (self or external URL) before publishing.
''' return f'''

Metadata

Comma-separated list

Publish to L2 (ActivityPub)

{publish_html}

Status

Pinned: {'Yes' if pinned else 'No'} {f'({pin_reason})' if pinned and pin_reason else ''}

Pinned items cannot be discarded. Items are pinned when published or used as inputs to published content.

{'

Cannot discard pinned items.

' if pinned else f""" """}
''' @app.patch("/ui/cache/{content_hash}/meta", response_class=HTMLResponse) async def ui_update_cache_meta(content_hash: str, request: Request): """HTMX handler: update cache metadata from form.""" current_user = get_user_from_cookie(request) if not current_user: return '
Login required
'
    # Check ownership — only hashes associated with this user are editable.
    user_hashes = get_user_cache_hashes(current_user)
    if content_hash not in user_hashes:
        return '\nAccess denied\n'
    # Parse form data (HTMX form submission)
    form = await request.form()
    origin_type = form.get("origin_type", "")
    origin_url = form.get("origin_url", "").strip()
    origin_note = form.get("origin_note", "").strip()
    description = form.get("description", "").strip()
    tags_str = form.get("tags", "").strip()
    # Build origin dict: {"type": "self"} or {"type": "external", "url": ..., "note"?: ...}
    origin = None
    if origin_type == "self":
        origin = {"type": "self"}
    elif origin_type == "external":
        if not origin_url:
            return '\nExternal origin requires a URL\n'
        origin = {"type": "external", "url": origin_url}
        if origin_note:
            origin["note"] = origin_note
    # Parse tags from a comma-separated string, dropping empties.
    tags = [t.strip() for t in tags_str.split(",") if t.strip()] if tags_str else []
    # Build updates.
    # NOTE(review): empty origin/description fields are skipped here, so a
    # user cannot clear a previously saved value via this form; tags ARE
    # always overwritten (an empty field clears them) — confirm intended.
    updates = {}
    if origin:
        updates["origin"] = origin
    if description:
        updates["description"] = description
    updates["tags"] = tags
    # Save (uploader/uploaded_at are preserved by save_cache_meta)
    save_cache_meta(content_hash, **updates)
    return '
Metadata saved!
' @app.post("/ui/cache/{content_hash}/publish", response_class=HTMLResponse) async def ui_publish_cache(content_hash: str, request: Request): """HTMX handler: publish cache item to L2.""" current_user = get_user_from_cookie(request) if not current_user: return '
Login required
' token = request.cookies.get("auth_token") if not token: return '
Auth token required
' # Check ownership user_hashes = get_user_cache_hashes(current_user) if content_hash not in user_hashes: return '
Access denied
' # Parse form form = await request.form() asset_name = form.get("asset_name", "").strip() asset_type = form.get("asset_type", "image") if not asset_name: return '
Asset name required
' # Load metadata meta = load_cache_meta(content_hash) origin = meta.get("origin") if not origin or "type" not in origin: return '
Set origin before publishing
' # Call L2 try: resp = http_requests.post( f"{L2_SERVER}/registry/publish-cache", headers={"Authorization": f"Bearer {token}"}, json={ "content_hash": content_hash, "asset_name": asset_name, "asset_type": asset_type, "origin": origin, "description": meta.get("description"), "tags": meta.get("tags", []), "metadata": { "filename": meta.get("filename"), "folder": meta.get("folder"), "collections": meta.get("collections", []) } }, timeout=30 ) resp.raise_for_status() except http_requests.exceptions.HTTPError as e: error_detail = "" try: error_detail = e.response.json().get("detail", str(e)) except Exception: error_detail = str(e) return f'
Error: {error_detail}
' except Exception as e: return f'
Error: {e}
' # Update local metadata publish_info = { "to_l2": True, "asset_name": asset_name, "published_at": datetime.now(timezone.utc).isoformat(), "last_synced_at": datetime.now(timezone.utc).isoformat() } save_cache_meta(content_hash, published=publish_info, pinned=True, pin_reason="published") return f'''
Published to L2 as {asset_name}! View on L2
''' @app.patch("/ui/cache/{content_hash}/republish", response_class=HTMLResponse) async def ui_republish_cache(content_hash: str, request: Request): """HTMX handler: re-publish (update) cache item on L2.""" current_user = get_user_from_cookie(request) if not current_user: return '
Login required
' token = request.cookies.get("auth_token") if not token: return '
Auth token required
' # Check ownership user_hashes = get_user_cache_hashes(current_user) if content_hash not in user_hashes: return '
Access denied
' # Load metadata meta = load_cache_meta(content_hash) published = meta.get("published", {}) if not published.get("to_l2"): return '
Item not published yet
' asset_name = published.get("asset_name") if not asset_name: return '
No asset name found
' # Call L2 update try: resp = http_requests.patch( f"{L2_SERVER}/registry/{asset_name}", headers={"Authorization": f"Bearer {token}"}, json={ "description": meta.get("description"), "tags": meta.get("tags"), "origin": meta.get("origin"), "metadata": { "filename": meta.get("filename"), "folder": meta.get("folder"), "collections": meta.get("collections", []) } }, timeout=30 ) resp.raise_for_status() except http_requests.exceptions.HTTPError as e: error_detail = "" try: error_detail = e.response.json().get("detail", str(e)) except Exception: error_detail = str(e) return f'
Error: {error_detail}
' except Exception as e: return f'
Error: {e}
' # Update local metadata published["last_synced_at"] = datetime.now(timezone.utc).isoformat() save_cache_meta(content_hash, published=published) return '
Updated on L2!
' @app.get("/cache") async def list_cache( request: Request, page: int = 1, limit: int = 20, folder: Optional[str] = None, collection: Optional[str] = None, tag: Optional[str] = None ): """List cached content. HTML for browsers (with infinite scroll), JSON for APIs (with pagination).""" current_user = get_user_from_cookie(request) if wants_html(request): # Require login for HTML cache view if not current_user: content = '

Login to see cached content.

' return HTMLResponse(render_page("Cache", content, current_user, active_tab="cache")) # Get hashes owned by/associated with this user user_hashes = get_user_cache_hashes(current_user) # Get cache items that belong to the user (from cache_manager) cache_items = [] for cached_file in cache_manager.list_all(): content_hash = cached_file.content_hash if content_hash not in user_hashes: continue meta = load_cache_meta(content_hash) # Apply folder filter if folder: item_folder = meta.get("folder", "/") if folder != "/" and not item_folder.startswith(folder): continue if folder == "/" and item_folder != "/": continue # Apply collection filter if collection: if collection not in meta.get("collections", []): continue # Apply tag filter if tag: if tag not in meta.get("tags", []): continue cache_items.append({ "hash": content_hash, "size": cached_file.size_bytes, "mtime": cached_file.created_at, "meta": meta }) # Sort by modification time (newest first) cache_items.sort(key=lambda x: x["mtime"], reverse=True) total = len(cache_items) # Pagination start = (page - 1) * limit end = start + limit items_page = cache_items[start:end] has_more = end < total if not items_page: if page == 1: filter_msg = "" if folder: filter_msg = f" in folder {folder}" elif collection: filter_msg = f" in collection '{collection}'" elif tag: filter_msg = f" with tag '{tag}'" content = f'

No cached files{filter_msg}. Upload files or run effects to see them here.

' else: return HTMLResponse("") # Empty for infinite scroll else: html_parts = [] for item in items_page: content_hash = item["hash"] cache_path = get_cache_path(content_hash) media_type = detect_media_type(cache_path) if cache_path else "unknown" # Format size size = item["size"] if size > 1024*1024: size_str = f"{size/(1024*1024):.1f} MB" elif size > 1024: size_str = f"{size/1024:.1f} KB" else: size_str = f"{size} bytes" html_parts.append(f'''
{media_type} {size_str}
{content_hash[:24]}...
''') if media_type == "video": video_src = video_src_for_request(content_hash, request) html_parts.append(f'') elif media_type == "image": html_parts.append(f'{content_hash[:16]}') else: html_parts.append('

Unknown file type

') html_parts.append('
') # For infinite scroll, just return cards if not first page if page > 1: if has_more: query_params = f"page={page + 1}" if folder: query_params += f"&folder={folder}" if collection: query_params += f"&collection={collection}" if tag: query_params += f"&tag={tag}" html_parts.append(f'''

Loading more...

''') return HTMLResponse('\n'.join(html_parts)) # First page - full content infinite_scroll_trigger = "" if has_more: query_params = "page=2" if folder: query_params += f"&folder={folder}" if collection: query_params += f"&collection={collection}" if tag: query_params += f"&tag={tag}" infinite_scroll_trigger = f'''

Loading more...

''' content = f'''

Cache ({total} items)

{''.join(html_parts)} {infinite_scroll_trigger}
''' return HTMLResponse(render_page("Cache", content, current_user, active_tab="cache")) # JSON response for APIs - list all hashes with optional pagination all_hashes = [cf.content_hash for cf in cache_manager.list_all()] total = len(all_hashes) start = (page - 1) * limit end = start + limit hashes_page = all_hashes[start:end] has_more = end < total return { "hashes": hashes_page, "pagination": { "page": page, "limit": limit, "total": total, "has_more": has_more } } @app.delete("/cache/{content_hash}") async def discard_cache(content_hash: str, username: str = Depends(get_required_user)): """ Discard (delete) a cached item. Enforces deletion rules: - Cannot delete items published to L2 (shared) - Cannot delete inputs/outputs of activities (runs) - Cannot delete pinned items """ # Check if content exists if not cache_manager.has_content(content_hash): raise HTTPException(404, "Content not found") # Check ownership user_hashes = get_user_cache_hashes(username) if content_hash not in user_hashes: raise HTTPException(403, "Access denied") # Check if pinned (legacy metadata) meta = load_cache_meta(content_hash) if meta.get("pinned"): pin_reason = meta.get("pin_reason", "unknown") raise HTTPException(400, f"Cannot discard pinned item (reason: {pin_reason})") # Check if used by any run (Redis runs, not just activity store) runs_using = find_runs_using_content(content_hash) if runs_using: run, role = runs_using[0] raise HTTPException(400, f"Cannot discard: item is {role} of run {run.run_id}") # Check deletion rules via cache_manager (L2 shared status, activity store) can_delete, reason = cache_manager.can_delete(content_hash) if not can_delete: raise HTTPException(400, f"Cannot discard: {reason}") # Delete via cache_manager success, msg = cache_manager.delete_by_content_hash(content_hash) if not success: # Fallback to legacy deletion cache_path = get_cache_path(content_hash) if cache_path and cache_path.exists(): cache_path.unlink() # Clean up legacy metadata files 
        # NOTE(review): legacy sidecar cleanup appears to belong to the
        # fallback branch above — confirm it should not run when
        # cache_manager deletion succeeded.
        meta_path = CACHE_DIR / f"{content_hash}.meta.json"
        if meta_path.exists():
            meta_path.unlink()
        mp4_path = CACHE_DIR / f"{content_hash}.mp4"
        if mp4_path.exists():
            mp4_path.unlink()
    return {"discarded": True, "content_hash": content_hash}


@app.delete("/ui/cache/{content_hash}/discard", response_class=HTMLResponse)
async def ui_discard_cache(content_hash: str, request: Request):
    """HTMX handler: discard a cached item.

    Mirrors the JSON DELETE /cache/{content_hash} endpoint's rules
    (ownership, existence, pin status, run usage, cache_manager deletion
    rules) but returns HTML fragments instead of raising HTTPException.
    """
    current_user = get_user_from_cookie(request)
    if not current_user:
        return '\nLogin required\n'
    # Check ownership
    user_hashes = get_user_cache_hashes(current_user)
    if content_hash not in user_hashes:
        return '\nAccess denied\n'
    # Check if content exists
    if not cache_manager.has_content(content_hash):
        return '\nContent not found\n'
    # Check if pinned (legacy metadata)
    meta = load_cache_meta(content_hash)
    if meta.get("pinned"):
        pin_reason = meta.get("pin_reason", "unknown")
        return f'\nCannot discard: item is pinned ({pin_reason})\n'
    # Check if used by any run (Redis runs, not just activity store)
    runs_using = find_runs_using_content(content_hash)
    if runs_using:
        run, role = runs_using[0]
        return f'\nCannot discard: item is {role} of run {run.run_id}\n'
    # Check deletion rules via cache_manager (L2 shared status, activity store)
    can_delete, reason = cache_manager.can_delete(content_hash)
    if not can_delete:
        return f'\nCannot discard: {reason}\n'
    # Delete via cache_manager
    success, msg = cache_manager.delete_by_content_hash(content_hash)
    if not success:
        # Fallback to legacy deletion
        cache_path = get_cache_path(content_hash)
        if cache_path and cache_path.exists():
            cache_path.unlink()
        # Clean up legacy metadata files
        meta_path = CACHE_DIR / f"{content_hash}.meta.json"
        if meta_path.exists():
            meta_path.unlink()
        mp4_path = CACHE_DIR / f"{content_hash}.mp4"
        if mp4_path.exists():
            mp4_path.unlink()
    return '''
Item discarded. Back to cache
''' # Known assets (bootstrap data) KNOWN_ASSETS = { "cat": "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b", } @app.get("/assets") async def list_assets(): """List known assets.""" return KNOWN_ASSETS @app.post("/cache/import") async def import_to_cache(path: str): """Import a local file to cache.""" source = Path(path) if not source.exists(): raise HTTPException(404, f"File not found: {path}") content_hash = cache_file(source) return {"content_hash": content_hash, "cached": True} def save_cache_meta(content_hash: str, uploader: str = None, filename: str = None, **updates): """Save or update metadata for a cached file.""" meta_path = CACHE_DIR / f"{content_hash}.meta.json" # Load existing or create new if meta_path.exists(): with open(meta_path) as f: meta = json.load(f) else: meta = { "uploader": uploader, "uploaded_at": datetime.now(timezone.utc).isoformat(), "filename": filename } # Apply updates (but never change uploader or uploaded_at) for key, value in updates.items(): if key not in ("uploader", "uploaded_at"): meta[key] = value with open(meta_path, "w") as f: json.dump(meta, f, indent=2) return meta def load_cache_meta(content_hash: str) -> dict: """Load metadata for a cached file.""" meta_path = CACHE_DIR / f"{content_hash}.meta.json" if meta_path.exists(): with open(meta_path) as f: return json.load(f) return {} # User data storage (folders, collections) USER_DATA_DIR = CACHE_DIR / ".user-data" def load_user_data(username: str) -> dict: """Load user's folders and collections.""" USER_DATA_DIR.mkdir(parents=True, exist_ok=True) # Normalize username (remove @ prefix if present) safe_name = username.replace("@", "").replace("/", "_") user_file = USER_DATA_DIR / f"{safe_name}.json" if user_file.exists(): with open(user_file) as f: return json.load(f) return {"folders": ["/"], "collections": []} def save_user_data(username: str, data: dict): """Save user's folders and collections.""" USER_DATA_DIR.mkdir(parents=True, exist_ok=True) safe_name 
= username.replace("@", "").replace("/", "_") user_file = USER_DATA_DIR / f"{safe_name}.json" with open(user_file, "w") as f: json.dump(data, f, indent=2) def get_user_cache_hashes(username: str) -> set: """Get all cache hashes owned by or associated with a user.""" actor_id = f"@{username}@{L2_DOMAIN}" hashes = set() # Files uploaded by user if CACHE_DIR.exists(): for f in CACHE_DIR.iterdir(): if f.name.endswith('.meta.json'): meta = load_cache_meta(f.name.replace('.meta.json', '')) if meta.get("uploader") in (username, actor_id): hashes.add(f.name.replace('.meta.json', '')) # Files from user's runs (inputs and outputs) for run in list_all_runs(): if run.username in (username, actor_id): hashes.update(run.inputs) if run.output_hash: hashes.add(run.output_hash) return hashes @app.post("/cache/upload") async def upload_to_cache(file: UploadFile = File(...), username: str = Depends(get_required_user)): """Upload a file to cache. Requires authentication.""" # Write to temp file first import tempfile with tempfile.NamedTemporaryFile(delete=False) as tmp: content = await file.read() tmp.write(content) tmp_path = Path(tmp.name) # Store in cache via cache_manager cached = cache_manager.put(tmp_path, node_type="upload", move=True) content_hash = cached.content_hash # Save uploader metadata actor_id = f"@{username}@{L2_DOMAIN}" save_cache_meta(content_hash, actor_id, file.filename) return {"content_hash": content_hash, "filename": file.filename, "size": len(content)} class CacheMetaUpdate(BaseModel): """Request to update cache metadata.""" origin: Optional[dict] = None # {"type": "self"|"external", "url": "...", "note": "..."} description: Optional[str] = None tags: Optional[list[str]] = None folder: Optional[str] = None collections: Optional[list[str]] = None class PublishRequest(BaseModel): """Request to publish a cache item to L2.""" asset_name: str asset_type: str = "image" # image, video, etc. 
# NOTE(review): the HTML markup inside the template strings below appears to
# have been stripped by a text-extraction pass (several computed values such
# as status_badge/content_url are no longer referenced by the visible
# literals, and some quoted strings were split across lines). The remaining
# literal text is preserved verbatim; restore the full markup from VCS.


@app.get("/cache/{content_hash}/meta")
async def get_cache_meta(content_hash: str, username: str = Depends(get_required_user)):
    """Get metadata for a cached file. Caller must own the content."""
    # Check file exists
    cache_path = get_cache_path(content_hash)
    if not cache_path:
        raise HTTPException(404, "Content not found")
    # Check ownership
    user_hashes = get_user_cache_hashes(username)
    if content_hash not in user_hashes:
        raise HTTPException(403, "Access denied")
    return load_cache_meta(content_hash)


@app.patch("/cache/{content_hash}/meta")
async def update_cache_meta(content_hash: str, update: CacheMetaUpdate, username: str = Depends(get_required_user)):
    """Update metadata for a cached file (only fields present in the request)."""
    # Check file exists
    cache_path = get_cache_path(content_hash)
    if not cache_path:
        raise HTTPException(404, "Content not found")
    # Check ownership
    user_hashes = get_user_cache_hashes(username)
    if content_hash not in user_hashes:
        raise HTTPException(403, "Access denied")
    # Build update dict from non-None fields
    updates = {}
    if update.origin is not None:
        updates["origin"] = update.origin
    if update.description is not None:
        updates["description"] = update.description
    if update.tags is not None:
        updates["tags"] = update.tags
    if update.folder is not None:
        # Ensure folder exists in user's folder list
        user_data = load_user_data(username)
        if update.folder not in user_data["folders"]:
            raise HTTPException(400, f"Folder does not exist: {update.folder}")
        updates["folder"] = update.folder
    if update.collections is not None:
        # Validate collections exist
        user_data = load_user_data(username)
        existing = {c["name"] for c in user_data["collections"]}
        for col in update.collections:
            if col not in existing:
                raise HTTPException(400, f"Collection does not exist: {col}")
        updates["collections"] = update.collections
    meta = save_cache_meta(content_hash, **updates)
    return meta


@app.post("/cache/{content_hash}/publish")
async def publish_cache_to_l2(
    content_hash: str,
    req: PublishRequest,
    request: Request,
    username: str = Depends(get_required_user)
):
    """
    Publish a cache item to L2 (ActivityPub).
    Requires origin to be set in metadata before publishing.
    """
    # Check file exists
    cache_path = get_cache_path(content_hash)
    if not cache_path:
        raise HTTPException(404, "Content not found")
    # Check ownership
    user_hashes = get_user_cache_hashes(username)
    if content_hash not in user_hashes:
        raise HTTPException(403, "Access denied")
    # Load metadata
    meta = load_cache_meta(content_hash)
    # Check origin is set
    origin = meta.get("origin")
    if not origin or "type" not in origin:
        raise HTTPException(400, "Origin must be set before publishing. Use --origin self or --origin-url ")
    # Get auth token to pass to L2 (cookie first, then Authorization header)
    token = request.cookies.get("auth_token")
    if not token:
        # Try from header
        auth_header = request.headers.get("Authorization", "")
        if auth_header.startswith("Bearer "):
            token = auth_header[7:]
    if not token:
        raise HTTPException(401, "Authentication token required")
    # Call L2 publish-cache endpoint
    try:
        resp = http_requests.post(
            f"{L2_SERVER}/registry/publish-cache",
            headers={"Authorization": f"Bearer {token}"},
            json={
                "content_hash": content_hash,
                "asset_name": req.asset_name,
                "asset_type": req.asset_type,
                "origin": origin,
                "description": meta.get("description"),
                "tags": meta.get("tags", []),
                "metadata": {
                    "filename": meta.get("filename"),
                    "folder": meta.get("folder"),
                    "collections": meta.get("collections", [])
                }
            },
            timeout=10
        )
        resp.raise_for_status()
        l2_result = resp.json()
    except http_requests.exceptions.HTTPError as e:
        # Surface L2's error detail when available; fall back to str(e).
        error_detail = ""
        try:
            error_detail = e.response.json().get("detail", str(e))
        except Exception:
            error_detail = str(e)
        raise HTTPException(400, f"L2 publish failed: {error_detail}")
    except Exception as e:
        raise HTTPException(500, f"L2 publish failed: {e}")
    # Update local metadata with publish status and pin
    publish_info = {
        "to_l2": True,
        "asset_name": req.asset_name,
        "published_at": datetime.now(timezone.utc).isoformat(),
        "last_synced_at": datetime.now(timezone.utc).isoformat()
    }
    save_cache_meta(content_hash, published=publish_info, pinned=True, pin_reason="published")
    return {
        "published": True,
        "asset_name": req.asset_name,
        "l2_result": l2_result
    }


@app.patch("/cache/{content_hash}/republish")
async def republish_cache_to_l2(
    content_hash: str,
    request: Request,
    username: str = Depends(get_required_user)
):
    """
    Re-publish (update) a cache item on L2 after metadata changes.
    Only works for already-published items.
    """
    # Check file exists
    cache_path = get_cache_path(content_hash)
    if not cache_path:
        raise HTTPException(404, "Content not found")
    # Check ownership
    user_hashes = get_user_cache_hashes(username)
    if content_hash not in user_hashes:
        raise HTTPException(403, "Access denied")
    # Load metadata
    meta = load_cache_meta(content_hash)
    # Check already published
    published = meta.get("published", {})
    if not published.get("to_l2"):
        raise HTTPException(400, "Item not published yet. Use publish first.")
    asset_name = published.get("asset_name")
    if not asset_name:
        raise HTTPException(400, "No asset name found in publish info")
    # Get auth token (cookie first, then Authorization header)
    token = request.cookies.get("auth_token")
    if not token:
        auth_header = request.headers.get("Authorization", "")
        if auth_header.startswith("Bearer "):
            token = auth_header[7:]
    if not token:
        raise HTTPException(401, "Authentication token required")
    # Call L2 update endpoint
    try:
        resp = http_requests.patch(
            f"{L2_SERVER}/registry/{asset_name}",
            headers={"Authorization": f"Bearer {token}"},
            json={
                "description": meta.get("description"),
                "tags": meta.get("tags"),
                "origin": meta.get("origin"),
                "metadata": {
                    "filename": meta.get("filename"),
                    "folder": meta.get("folder"),
                    "collections": meta.get("collections", [])
                }
            },
            timeout=10
        )
        resp.raise_for_status()
        l2_result = resp.json()
    except http_requests.exceptions.HTTPError as e:
        error_detail = ""
        try:
            error_detail = e.response.json().get("detail", str(e))
        except Exception:
            error_detail = str(e)
        raise HTTPException(400, f"L2 update failed: {error_detail}")
    except Exception as e:
        raise HTTPException(500, f"L2 update failed: {e}")
    # Update local metadata (refresh sync timestamp only)
    published["last_synced_at"] = datetime.now(timezone.utc).isoformat()
    save_cache_meta(content_hash, published=published)
    return {
        "updated": True,
        "asset_name": asset_name,
        "l2_result": l2_result
    }


# ============ Folder & Collection Management ============

@app.get("/user/folders")
async def list_folders(username: str = Depends(get_required_user)):
    """List user's folders."""
    user_data = load_user_data(username)
    return {"folders": user_data["folders"]}


@app.post("/user/folders")
async def create_folder(folder_path: str, username: str = Depends(get_required_user)):
    """Create a new folder. Parent must already exist."""
    user_data = load_user_data(username)
    # Validate path format
    if not folder_path.startswith("/"):
        raise HTTPException(400, "Folder path must start with /")
    # Check parent exists
    parent = "/".join(folder_path.rsplit("/", 1)[:-1]) or "/"
    if parent != "/" and parent not in user_data["folders"]:
        raise HTTPException(400, f"Parent folder does not exist: {parent}")
    # Check doesn't already exist
    if folder_path in user_data["folders"]:
        raise HTTPException(400, f"Folder already exists: {folder_path}")
    user_data["folders"].append(folder_path)
    user_data["folders"].sort()
    save_user_data(username, user_data)
    return {"folder": folder_path, "created": True}


@app.delete("/user/folders")
async def delete_folder(folder_path: str, username: str = Depends(get_required_user)):
    """Delete a folder (must be empty: no subfolders, no items)."""
    if folder_path == "/":
        raise HTTPException(400, "Cannot delete root folder")
    user_data = load_user_data(username)
    if folder_path not in user_data["folders"]:
        raise HTTPException(404, "Folder not found")
    # Check no subfolders
    for f in user_data["folders"]:
        if f.startswith(folder_path + "/"):
            raise HTTPException(400, f"Folder has subfolders: {f}")
    # Check no items in folder
    user_hashes = get_user_cache_hashes(username)
    for h in user_hashes:
        meta = load_cache_meta(h)
        if meta.get("folder") == folder_path:
            raise HTTPException(400, "Folder is not empty")
    user_data["folders"].remove(folder_path)
    save_user_data(username, user_data)
    return {"folder": folder_path, "deleted": True}


@app.get("/user/collections")
async def list_collections(username: str = Depends(get_required_user)):
    """List user's collections."""
    user_data = load_user_data(username)
    return {"collections": user_data["collections"]}


@app.post("/user/collections")
async def create_collection(name: str, username: str = Depends(get_required_user)):
    """Create a new collection."""
    user_data = load_user_data(username)
    # Check doesn't already exist
    for col in user_data["collections"]:
        if col["name"] == name:
            raise HTTPException(400, f"Collection already exists: {name}")
    user_data["collections"].append({
        "name": name,
        "created_at": datetime.now(timezone.utc).isoformat()
    })
    save_user_data(username, user_data)
    return {"collection": name, "created": True}


@app.delete("/user/collections")
async def delete_collection(name: str, username: str = Depends(get_required_user)):
    """Delete a collection and remove it from every cache item that refers to it."""
    user_data = load_user_data(username)
    # Find and remove
    for i, col in enumerate(user_data["collections"]):
        if col["name"] == name:
            user_data["collections"].pop(i)
            save_user_data(username, user_data)
            # Remove from all cache items
            user_hashes = get_user_cache_hashes(username)
            for h in user_hashes:
                meta = load_cache_meta(h)
                if name in meta.get("collections", []):
                    meta["collections"].remove(name)
                    # Re-save all mutable keys (uploader/uploaded_at are immutable anyway)
                    save_cache_meta(h, **{k: v for k, v in meta.items() if k not in ("uploader", "uploaded_at")})
            return {"collection": name, "deleted": True}
    raise HTTPException(404, "Collection not found")


def is_ios_request(request: Request) -> bool:
    """Check if request is from an iOS device (User-Agent sniff)."""
    ua = request.headers.get("user-agent", "").lower()
    return "iphone" in ua or "ipad" in ua


def video_src_for_request(content_hash: str, request: Request) -> str:
    """Get video src URL, using the MP4 transcode endpoint for iOS clients."""
    if is_ios_request(request):
        return f"/cache/{content_hash}/mp4"
    return f"/cache/{content_hash}"


def detect_media_type(cache_path: Path) -> str:
    """Detect if file is image or video based on magic bytes.

    Returns "video", "image", or "unknown". Only the first 32 bytes are read.
    """
    with open(cache_path, "rb") as f:
        header = f.read(32)
    # Video signatures
    if header[:4] == b'\x1a\x45\xdf\xa3':  # WebM/MKV (EBML header)
        return "video"
    if header[4:8] == b'ftyp':  # MP4/MOV (ftyp box at offset 4)
        return "video"
    if header[:4] == b'RIFF' and header[8:12] == b'AVI ':  # AVI
        return "video"
    # Image signatures
    if header[:8] == b'\x89PNG\r\n\x1a\n':  # PNG
        return "image"
    if header[:2] == b'\xff\xd8':  # JPEG
        return "image"
    if header[:6] in (b'GIF87a', b'GIF89a'):  # GIF
        return "image"
    if header[:4] == b'RIFF' and header[8:12] == b'WEBP':  # WebP
        return "image"
    return "unknown"


def get_user_from_cookie(request: Request) -> Optional[str]:
    """Get username from the auth cookie; None if absent or invalid."""
    token = request.cookies.get("auth_token")
    if not token:
        return None
    return verify_token_with_l2(token)


def wants_html(request: Request) -> bool:
    """Check if request wants HTML (browser) vs JSON (API)."""
    accept = request.headers.get("accept", "")
    return "text/html" in accept and "application/json" not in accept


# Tailwind CSS config for all L1 templates
# NOTE(review): literal content appears stripped by extraction — see file-top note.
TAILWIND_CONFIG = ''' '''


def render_page(title: str, content: str, username: Optional[str] = None, active_tab: str = None) -> str:
    """Render a page with nav bar and content. Used for clean URL pages."""
    user_info = ""
    if username:
        user_info = f'''
 Logged in as {username} Logout
 '''
    else:
        user_info = '''
 Login
 '''
    # Tab highlight classes (used by the full template; markup stripped here)
    runs_active = "border-b-2 border-blue-500 text-white" if active_tab == "runs" else "text-gray-400 hover:text-white"
    configs_active = "border-b-2 border-blue-500 text-white" if active_tab == "configs" else "text-gray-400 hover:text-white"
    cache_active = "border-b-2 border-blue-500 text-white" if active_tab == "cache" else "text-gray-400 hover:text-white"
    return f""" {title} | Art DAG L1 Server {TAILWIND_CONFIG}
 
 Art DAG L1 Server
 
 {user_info}
 {content}
 """


def render_ui_html(username: Optional[str] = None, tab: str = "runs") -> str:
    """Render main UI HTML with optional user context."""
    user_info = ""
    if username:
        user_info = f'''
 Logged in as {username} Logout
 '''
    else:
        user_info = '''
 Login
 '''
    # Tab highlight classes (used by the full template; markup stripped here)
    runs_active = "border-b-2 border-blue-500 text-white" if tab == "runs" else "text-gray-400 hover:text-white"
    configs_active = "border-b-2 border-blue-500 text-white" if tab == "configs" else "text-gray-400 hover:text-white"
    cache_active = "border-b-2 border-blue-500 text-white" if tab == "cache" else "text-gray-400 hover:text-white"
    # HTMX partial URL loaded into the main content area
    if tab == "runs":
        content_url = "/ui/runs"
    elif tab == "configs":
        content_url = "/ui/configs-list"
    else:
        content_url = "/ui/cache-list"
    return f""" Art DAG L1 Server {TAILWIND_CONFIG}
 
 Art DAG L1 Server
 
 {user_info}
 Loading...
 """


def get_auth_page_html(page_type: str = "login", error: str = None) -> str:
    """Generate login or register page HTML with Tailwind CSS."""
    is_login = page_type == "login"
    title = "Login" if is_login else "Register"
    login_active = "bg-blue-600 text-white" if is_login else "bg-dark-500 text-gray-400 hover:text-white"
    register_active = "bg-dark-500 text-gray-400 hover:text-white" if is_login else "bg-blue-600 text-white"
    error_html = f' {error} ' if error else ''
    form_fields = ''' '''
    if not is_login:
        form_fields += ''' '''
    return f""" {title} | Art DAG L1 Server {TAILWIND_CONFIG}
 
 Art DAG L1 Server
 
 Back
 {error_html}
 {form_fields}
 """


# Pre-rendered auth pages (no error message variant)
UI_LOGIN_HTML = get_auth_page_html("login")
UI_REGISTER_HTML = get_auth_page_html("register")


# Clean URL auth routes

@app.get("/login", response_class=HTMLResponse)
async def login_page():
    """Login page (clean URL)."""
    return UI_LOGIN_HTML


@app.post("/login")
async def login(username: str = Form(...), password: str = Form(...)):
    """Process login form (clean URL)."""
    try:
        resp = http_requests.post(
            f"{L2_SERVER}/auth/login",
            json={"username": username, "password": password},
            timeout=5
        )
        if resp.status_code == 200:
            token = resp.json().get("access_token")
            response = RedirectResponse(url="/runs", status_code=303)
            # 30-day session cookie
            response.set_cookie("auth_token", token, httponly=True, max_age=30*24*60*60)
            return response
    except Exception:
        pass
    # Any failure (bad credentials, L2 unreachable) falls through to the error page
    return HTMLResponse(get_auth_page_html("login", "Invalid username or password"))


@app.get("/register", response_class=HTMLResponse)
async def register_page():
    """Register page (clean URL)."""
    return UI_REGISTER_HTML


@app.post("/register")
async def register(
    username: str = Form(...),
    password: str = Form(...),
    email: str = Form(None)
):
    """Process registration form (clean URL).

    NOTE(review): a status code other than 200/400 falls through and returns
    None — FastAPI will respond with an empty body. Consider a generic error page.
    """
    try:
        resp = http_requests.post(
            f"{L2_SERVER}/auth/register",
            json={"username": username, "password": password, "email": email},
            timeout=5
        )
        if resp.status_code == 200:
            token = resp.json().get("access_token")
            response = RedirectResponse(url="/runs", status_code=303)
            response.set_cookie("auth_token", token, httponly=True, max_age=30*24*60*60)
            return response
        elif resp.status_code == 400:
            error = resp.json().get("detail", "Registration failed")
            return HTMLResponse(get_auth_page_html("register", error))
    except Exception as e:
        return HTMLResponse(get_auth_page_html("register", f"Registration failed: {e}"))


@app.get("/logout")
async def logout():
    """Logout - clear cookie (clean URL)."""
    response = RedirectResponse(url="/runs", status_code=303)
    response.delete_cookie("auth_token")
    return response


@app.get("/ui")
async def ui_index(tab: str = "runs"):
    """Redirect /ui to clean URLs."""
    if tab == "cache":
        return RedirectResponse(url="/cache", status_code=302)
    return RedirectResponse(url="/runs", status_code=302)


@app.get("/ui/login")
async def ui_login_page():
    """Redirect to clean URL."""
    return RedirectResponse(url="/login", status_code=302)


@app.post("/ui/login")
async def ui_login(username: str = Form(...), password: str = Form(...)):
    """Redirect POST to clean URL handler."""
    return await login(username, password)


@app.get("/ui/register")
async def ui_register_page():
    """Redirect to clean URL."""
    return RedirectResponse(url="/register", status_code=302)


@app.post("/ui/register")
async def ui_register(
    username: str = Form(...),
    password: str = Form(...),
    email: str = Form(None)
):
    """Redirect POST to clean URL handler."""
    return await register(username, password, email)


@app.get("/ui/logout")
async def ui_logout():
    """Redirect to clean URL."""
    return RedirectResponse(url="/logout", status_code=302)


@app.post("/ui/publish-run/{run_id}", response_class=HTMLResponse)
async def ui_publish_run(run_id: str, request: Request, output_name: str = Form(...)):
    """Publish a run to L2 from the web UI and pin its provenance locally."""
    token = request.cookies.get("auth_token")
    if not token:
        return HTMLResponse(' Not logged in ')
    # Get the run to pin its output and inputs
    run = load_run(run_id)
    if not run:
        return HTMLResponse(' Run not found ')
    # Call L2 to publish the run, including this L1's public URL
    # Longer timeout because L2 calls back to L1 to fetch run details
    try:
        resp = http_requests.post(
            f"{L2_SERVER}/registry/record-run",
            json={"run_id": run_id, "output_name": output_name, "l1_server": L1_PUBLIC_URL},
            headers={"Authorization": f"Bearer {token}"},
            timeout=30
        )
        if resp.status_code == 400:
            error = resp.json().get("detail", "Bad request")
            return HTMLResponse(f' Error: {error} ')
        resp.raise_for_status()
        result = resp.json()
        # Pin the output
        if run.output_hash:
            save_cache_meta(run.output_hash, pinned=True, pin_reason="published")
        # Pin the inputs (for provenance)
        for input_hash in run.inputs:
            save_cache_meta(input_hash, pinned=True, pin_reason="input_to_published")
        # If this was a config-based run, pin the config and its fixed inputs
        if run.recipe.startswith("config:"):
            config_name = run.recipe.replace("config:", "")
            for config in list_all_configs():
                if config.name == config_name:
                    # Pin the config YAML
                    cache_manager.pin(config.config_id, reason="config_for_published")
                    # Pin all fixed inputs referenced by the config
                    for fixed in config.fixed_inputs:
                        if fixed.content_hash:
                            cache_manager.pin(fixed.content_hash, reason="fixed_input_in_published_config")
                    break
        return HTMLResponse(f'''
 Published to L2 as {result["asset"]["name"]}
 ''')
    except http_requests.exceptions.HTTPError as e:
        error_detail = ""
        try:
            error_detail = e.response.json().get("detail", str(e))
        except Exception:
            error_detail = str(e)
        return HTMLResponse(f' Error: {error_detail} ')
    except Exception as e:
        return HTMLResponse(f' Error: {e} ')


@app.get("/ui/runs", response_class=HTMLResponse)
async def ui_runs(request: Request):
    """HTMX partial: list of runs (most recent 20 for the logged-in user)."""
    current_user = get_user_from_cookie(request)
    runs = list_all_runs()
    # Require login to see runs
    if not current_user:
        return ' Login to see your runs. '
    # Filter runs by user - match both plain username and ActivityPub format (@user@domain)
    actor_id = f"@{current_user}@{L2_DOMAIN}"
    runs = [r for r in runs if r.username in (current_user, actor_id)]
    if not runs:
        return ' You have no runs yet. Use the CLI to start a run. '
    # Status badge colors
    status_colors = {
        "completed": "bg-green-600 text-white",
        "running": "bg-yellow-600 text-white",
        "failed": "bg-red-600 text-white",
        "pending": "bg-gray-600 text-white"
    }
    html_parts = [' ']
    for run in runs[:20]:  # Limit to 20 most recent
        status_badge = status_colors.get(run.status, "bg-gray-600 text-white")
        html_parts.append(f'''
 {run.recipe}
 {run.status}
 Created: {run.created_at[:19].replace('T', ' ')}
 ''')
        # Show input and output side by side
        has_input = run.inputs and cache_manager.has_content(run.inputs[0])
        has_output = run.status == "completed" and run.output_hash and cache_manager.has_content(run.output_hash)
        if has_input or has_output:
            html_parts.append(' ')
            # Input box
            if has_input:
                input_hash = run.inputs[0]
                input_media_type = detect_media_type(get_cache_path(input_hash))
                html_parts.append(f'''
 Input: {input_hash[:16]}...
 ''')
                if input_media_type == "video":
                    input_video_src = video_src_for_request(input_hash, request)
                    html_parts.append(f'')
                elif input_media_type == "image":
                    html_parts.append(f'input')
                html_parts.append(' ')
            # Output box
            if has_output:
                output_hash = run.output_hash
                output_media_type = detect_media_type(get_cache_path(output_hash))
                html_parts.append(f'''
 Output: {output_hash[:16]}...
 ''')
                if output_media_type == "video":
                    output_video_src = video_src_for_request(output_hash, request)
                    html_parts.append(f'')
                elif output_media_type == "image":
                    html_parts.append(f'output')
                html_parts.append(' ')
            html_parts.append(' ')
        # Show error if failed
        if run.status == "failed" and run.error:
            html_parts.append(f' Error: {run.error} ')
        html_parts.append(' ')
    html_parts.append(' ')
    return '\n'.join(html_parts)


@app.get("/ui/cache-list", response_class=HTMLResponse)
async def ui_cache_list(
    request: Request,
    folder: Optional[str] = None,
    collection: Optional[str] = None,
    tag: Optional[str] = None
):
    """HTMX partial: list of cached items with optional filtering."""
    current_user = get_user_from_cookie(request)
    # Require login to see cache
    if not current_user:
        return ' Login to see cached content. '
    # Get hashes owned by/associated with this user
    user_hashes = get_user_cache_hashes(current_user)
    # Get cache items that belong to the user (from cache_manager)
    cache_items = []
    for cached_file in cache_manager.list_all():
        content_hash = cached_file.content_hash
        if content_hash not in user_hashes:
            continue
        # Load metadata for filtering
        meta = load_cache_meta(content_hash)
        # Apply folder filter
        if folder:
            item_folder = meta.get("folder", "/")
            if folder != "/" and not item_folder.startswith(folder):
                continue
            if folder == "/" and item_folder != "/":
                continue
        # Apply collection filter
        if collection:
            if collection not in meta.get("collections", []):
                continue
        # Apply tag filter
        if tag:
            if tag not in meta.get("tags", []):
                continue
        cache_items.append({
            "hash": content_hash,
            "size": cached_file.size_bytes,
            "mtime": cached_file.created_at,
            "meta": meta
        })
    # Sort by modification time (newest first)
    cache_items.sort(key=lambda x: x["mtime"], reverse=True)
    if not cache_items:
        filter_msg = ""
        if folder:
            filter_msg = f" in folder {folder}"
        elif collection:
            filter_msg = f" in collection '{collection}'"
        elif tag:
            filter_msg = f" with tag '{tag}'"
        return f' No cached files{filter_msg}. Upload files or run effects to see them here. '
    html_parts = [' ']
    for item in cache_items[:50]:  # Limit to 50 items
        content_hash = item["hash"]
        cache_path = get_cache_path(content_hash)
        media_type = detect_media_type(cache_path) if cache_path else "unknown"
        # Format size for display
        size = item["size"]
        if size > 1024*1024:
            size_str = f"{size/(1024*1024):.1f} MB"
        elif size > 1024:
            size_str = f"{size/1024:.1f} KB"
        else:
            size_str = f"{size} bytes"
        html_parts.append(f'''
 {media_type} {size_str}
 {content_hash[:24]}...
 ''')
        if media_type == "video":
            video_src = video_src_for_request(content_hash, request)
            html_parts.append(f'')
        elif media_type == "image":
            html_parts.append(f'{content_hash[:16]}')
        else:
            html_parts.append(' Unknown file type ')
        html_parts.append('''
 ''')
    html_parts.append(' ')
    return '\n'.join(html_parts)


@app.get("/ui/detail/{run_id}")
async def ui_detail_page(run_id: str):
    """Redirect to clean URL."""
    return RedirectResponse(url=f"/run/{run_id}", status_code=302)


@app.get("/ui/run/{run_id}", response_class=HTMLResponse)
async def ui_run_partial(run_id: str, request: Request):
    """HTMX partial: single run (for polling updates)."""
    run = load_run(run_id)
    if not run:
        return ' Run not found '
    # Check Celery task status if running; fold the result into the run record
    if run.status == "running" and run.celery_task_id:
        task = celery_app.AsyncResult(run.celery_task_id)
        if task.ready():
            if task.successful():
                result = task.result
                run.status = "completed"
                run.completed_at = datetime.now(timezone.utc).isoformat()
                run.output_hash = result.get("output", {}).get("content_hash")
                # Extract effects info from provenance
                effects = result.get("effects", [])
                if effects:
                    run.effects_commit = effects[0].get("repo_commit")
                    run.effect_url = effects[0].get("repo_url")
                # Extract infrastructure info
                run.infrastructure = result.get("infrastructure")
                output_path = Path(result.get("output", {}).get("local_path", ""))
                if output_path.exists():
                    cache_file(output_path)
            else:
                run.status = "failed"
                run.error = str(task.result)
            # Persist the status transition (placement reconstructed from
            # mangled source — TODO confirm it runs on both branches)
            save_run(run)
    # Status badge colors
    status_colors = {
        "completed": "bg-green-600 text-white",
        "running": "bg-yellow-600 text-white",
        "failed": "bg-red-600 text-white",
        "pending": "bg-gray-600 text-white"
    }
    status_badge = status_colors.get(run.status, "bg-gray-600 text-white")
    # HTMX polls every 2s while the run is still executing
    poll_attr = f'hx-get="/ui/run/{run_id}" hx-trigger="every 2s" hx-swap="outerHTML"' if run.status == "running" else ""
    html = f'''
 {run.recipe}
 {run.status}
 Created: {run.created_at[:19].replace('T', ' ')}
 '''
    # Show input and output side by side
    has_input = run.inputs and cache_manager.has_content(run.inputs[0])
    has_output = run.status == "completed" and run.output_hash and cache_manager.has_content(run.output_hash)
    if has_input or has_output:
        html += ' '
        if has_input:
            input_hash = run.inputs[0]
            input_media_type = detect_media_type(get_cache_path(input_hash))
            html += f'''
 Input: {input_hash[:16]}...
 '''
            if input_media_type == "video":
                input_video_src = video_src_for_request(input_hash, request)
                html += f''
            elif input_media_type == "image":
                html += f'input'
            html += ' '
        if has_output:
            output_hash = run.output_hash
            output_media_type = detect_media_type(get_cache_path(output_hash))
            html += f'''
 Output: {output_hash[:16]}...
 '''
            if output_media_type == "video":
                output_video_src = video_src_for_request(output_hash, request)
                html += f''
            elif output_media_type == "image":
                html += f'output'
            html += ' '
        html += ' '
    if run.status == "failed" and run.error:
        html += f' Error: {run.error} '
    html += ' '
    return html


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8100)