Files
celery/server.py
gilesb 9c158ff884 Fix media list duplicates and cache browse link
- Database: Use DISTINCT ON to deduplicate items by content_hash
- Database: Count unique content_hashes in count_user_items
- Server: Fix media card link from /ui/cache to /cache
- Server: Use /raw endpoint for image thumbnails
- Server: Add seen_hashes dedup in media list iteration

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-09 10:17:35 +00:00

4054 lines
163 KiB
Python

#!/usr/bin/env python3
"""
Art DAG L1 Server
Manages rendering runs and provides access to the cache.
- POST /runs - start a run (recipe + inputs)
- GET /runs/{run_id} - get run status/result
- GET /cache/{content_hash} - get cached content
"""
import asyncio
import base64
import hashlib
import json
import logging
import os
import time
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(levelname)s %(name)s: %(message)s'
)
logger = logging.getLogger(__name__)
from fastapi import FastAPI, HTTPException, UploadFile, File, Depends, Form, Request
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
import redis
import requests as http_requests
from urllib.parse import urlparse
import yaml
from celery_app import app as celery_app
from tasks import render_effect, execute_dag, build_effect_dag
from contextlib import asynccontextmanager
from cache_manager import L1CacheManager, get_cache_manager
import database
# L1 public URL for redirects
L1_PUBLIC_URL = os.environ.get("L1_PUBLIC_URL", "http://localhost:8100")
# Default L2 for login redirect when not logged in (user can login to any L2)
DEFAULT_L2_SERVER = os.environ.get("DEFAULT_L2_SERVER", "http://localhost:8200")
# IPFS gateway URL for public access to IPFS content
IPFS_GATEWAY_URL = os.environ.get("IPFS_GATEWAY_URL", "")
# Cache directory (use /data/cache in Docker, ~/.artdag/cache locally)
CACHE_DIR = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache")))
CACHE_DIR.mkdir(parents=True, exist_ok=True)
# Redis for persistent run storage and shared cache index (multi-worker support)
REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/5')
parsed = urlparse(REDIS_URL)
redis_client = redis.Redis(
host=parsed.hostname or 'localhost',
port=parsed.port or 6379,
db=int(parsed.path.lstrip('/') or 0),
socket_timeout=5,
socket_connect_timeout=5
)
RUNS_KEY_PREFIX = "artdag:run:"
RECIPES_KEY_PREFIX = "artdag:recipe:"
# Initialize L1 cache manager with Redis for shared state between workers
cache_manager = L1CacheManager(cache_dir=CACHE_DIR, redis_client=redis_client)
def save_run(run: "RunStatus"):
"""Save run to Redis."""
redis_client.set(f"{RUNS_KEY_PREFIX}{run.run_id}", run.model_dump_json())
def load_run(run_id: str) -> Optional["RunStatus"]:
"""Load run from Redis."""
data = redis_client.get(f"{RUNS_KEY_PREFIX}{run_id}")
if data:
return RunStatus.model_validate_json(data)
return None
def list_all_runs() -> list["RunStatus"]:
"""List all runs from Redis."""
runs = []
for key in redis_client.scan_iter(f"{RUNS_KEY_PREFIX}*"):
data = redis_client.get(key)
if data:
runs.append(RunStatus.model_validate_json(data))
return sorted(runs, key=lambda r: r.created_at, reverse=True)
def find_runs_using_content(content_hash: str) -> list[tuple["RunStatus", str]]:
"""Find all runs that use a content_hash as input or output.
Returns list of (run, role) tuples where role is 'input' or 'output'.
"""
results = []
for run in list_all_runs():
if run.inputs and content_hash in run.inputs:
results.append((run, "input"))
if run.output_hash == content_hash:
results.append((run, "output"))
return results
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Initialize and cleanup resources."""
# Startup: initialize database
await database.init_db()
yield
# Shutdown: close database
await database.close_db()
app = FastAPI(
title="Art DAG L1 Server",
description="Distributed rendering server for Art DAG",
version="0.1.0",
lifespan=lifespan
)
class RunRequest(BaseModel):
"""Request to start a run."""
recipe: str # Recipe name (e.g., "dog", "identity") or "dag" for custom DAG
inputs: list[str] # List of content hashes
output_name: Optional[str] = None
use_dag: bool = False # Use DAG engine instead of legacy effect runner
dag_json: Optional[str] = None # Custom DAG JSON (required if recipe="dag")
class RunStatus(BaseModel):
"""Status of a run."""
run_id: str
status: str # pending, running, completed, failed
recipe: str
inputs: list[str]
output_name: str
created_at: str
completed_at: Optional[str] = None
output_hash: Optional[str] = None
error: Optional[str] = None
celery_task_id: Optional[str] = None
effects_commit: Optional[str] = None
effect_url: Optional[str] = None # URL to effect source code
username: Optional[str] = None # Owner of the run (ActivityPub actor ID)
infrastructure: Optional[dict] = None # Hardware/software used for rendering
provenance_cid: Optional[str] = None # IPFS CID of provenance record
# ============ Recipe Models ============
class VariableInput(BaseModel):
"""A variable input that must be filled at run time."""
node_id: str
name: str
description: Optional[str] = None
required: bool = True
class FixedInput(BaseModel):
"""A fixed input resolved from the registry."""
node_id: str
asset: str
content_hash: str
class RecipeStatus(BaseModel):
"""Status/metadata of a recipe."""
recipe_id: str # Content hash of the YAML file
name: str
version: str
description: Optional[str] = None
variable_inputs: list[VariableInput]
fixed_inputs: list[FixedInput]
output_node: str
owner: Optional[str] = None
uploaded_at: str
uploader: Optional[str] = None
class RecipeRunRequest(BaseModel):
"""Request to run a recipe with variable inputs."""
inputs: dict[str, str] # node_id -> content_hash
def save_recipe(recipe: RecipeStatus):
"""Save recipe to Redis."""
redis_client.set(f"{RECIPES_KEY_PREFIX}{recipe.recipe_id}", recipe.model_dump_json())
def load_recipe(recipe_id: str) -> Optional[RecipeStatus]:
"""Load recipe from Redis."""
data = redis_client.get(f"{RECIPES_KEY_PREFIX}{recipe_id}")
if data:
return RecipeStatus.model_validate_json(data)
return None
def list_all_recipes() -> list[RecipeStatus]:
"""List all recipes from Redis."""
recipes = []
for key in redis_client.scan_iter(f"{RECIPES_KEY_PREFIX}*"):
data = redis_client.get(key)
if data:
recipes.append(RecipeStatus.model_validate_json(data))
return sorted(recipes, key=lambda c: c.uploaded_at, reverse=True)
def delete_recipe_from_redis(recipe_id: str) -> bool:
"""Delete recipe from Redis."""
return redis_client.delete(f"{RECIPES_KEY_PREFIX}{recipe_id}") > 0
def parse_recipe_yaml(yaml_content: str, recipe_hash: str, uploader: str) -> RecipeStatus:
"""Parse a recipe YAML file and extract metadata."""
config = yaml.safe_load(yaml_content)
# Extract basic info
name = config.get("name", "unnamed")
version = config.get("version", "1.0")
description = config.get("description")
owner = config.get("owner")
# Parse registry
registry = config.get("registry", {})
assets = registry.get("assets", {})
# Parse DAG nodes
dag = config.get("dag", {})
nodes = dag.get("nodes", [])
output_node = dag.get("output")
variable_inputs = []
fixed_inputs = []
for node in nodes:
node_id = node.get("id")
node_type = node.get("type")
node_config = node.get("config", {})
if node_type == "SOURCE":
if node_config.get("input"):
# Variable input
variable_inputs.append(VariableInput(
node_id=node_id,
name=node_config.get("name", node_id),
description=node_config.get("description"),
required=node_config.get("required", True)
))
elif "asset" in node_config:
# Fixed input - resolve from registry
asset_name = node_config["asset"]
asset_info = assets.get(asset_name, {})
fixed_inputs.append(FixedInput(
node_id=node_id,
asset=asset_name,
content_hash=asset_info.get("hash", "")
))
return RecipeStatus(
recipe_id=recipe_hash,
name=name,
version=version,
description=description,
variable_inputs=variable_inputs,
fixed_inputs=fixed_inputs,
output_node=output_node or "",
owner=owner,
uploaded_at=datetime.now(timezone.utc).isoformat(),
uploader=uploader
)
# ============ Auth ============
security = HTTPBearer(auto_error=False)
@dataclass
class UserContext:
"""User authentication context extracted from JWT token."""
username: str
l2_server: str # The L2 server that issued the token (e.g., "https://artdag.rose-ash.com")
l2_domain: str # Domain part for actor_id (e.g., "artdag.rose-ash.com")
@property
def actor_id(self) -> str:
"""ActivityPub-style actor ID: @username@domain"""
return f"@{self.username}@{self.l2_domain}"
def decode_token_claims(token: str) -> Optional[dict]:
"""Decode JWT payload without verification (L2 does verification)."""
try:
# JWT format: header.payload.signature (all base64url encoded)
parts = token.split(".")
if len(parts) != 3:
return None
# Decode payload (add padding if needed)
payload = parts[1]
padding = 4 - len(payload) % 4
if padding != 4:
payload += "=" * padding
decoded = base64.urlsafe_b64decode(payload)
return json.loads(decoded)
except Exception:
return None
def get_user_context_from_token(token: str) -> Optional[UserContext]:
"""Extract user context from JWT token. Token must contain l2_server claim."""
claims = decode_token_claims(token)
if not claims:
return None
username = claims.get("username") or claims.get("sub")
l2_server = claims.get("l2_server") # e.g., "https://artdag.rose-ash.com"
if not username or not l2_server:
return None
# Extract domain from l2_server URL for actor_id
from urllib.parse import urlparse
parsed = urlparse(l2_server)
l2_domain = parsed.netloc or l2_server
return UserContext(username=username, l2_server=l2_server, l2_domain=l2_domain)
def _verify_token_with_l2_sync(token: str, l2_server: str) -> Optional[str]:
"""Verify token with the L2 server that issued it, return username if valid. (Sync version)"""
try:
resp = http_requests.post(
f"{l2_server}/auth/verify",
headers={"Authorization": f"Bearer {token}"},
timeout=5
)
if resp.status_code == 200:
return resp.json().get("username")
except Exception:
pass
return None
async def verify_token_with_l2(token: str, l2_server: str) -> Optional[str]:
"""Verify token with the L2 server that issued it, return username if valid."""
return await asyncio.to_thread(_verify_token_with_l2_sync, token, l2_server)
async def get_verified_user_context(token: str) -> Optional[UserContext]:
"""Get verified user context from token. Verifies with the L2 that issued it."""
ctx = get_user_context_from_token(token)
if not ctx:
return None
# Verify token with the L2 server from the token (non-blocking)
verified_username = await verify_token_with_l2(token, ctx.l2_server)
if not verified_username:
return None
return ctx
async def get_optional_user(
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> Optional[str]:
"""Get username if authenticated, None otherwise."""
if not credentials:
return None
ctx = await get_verified_user_context(credentials.credentials)
return ctx.username if ctx else None
async def get_required_user(
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> str:
"""Get username, raise 401 if not authenticated."""
if not credentials:
raise HTTPException(401, "Not authenticated")
ctx = await get_verified_user_context(credentials.credentials)
if not ctx:
raise HTTPException(401, "Invalid token")
return ctx.username
async def get_required_user_context(
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> UserContext:
"""Get full user context, raise 401 if not authenticated."""
if not credentials:
raise HTTPException(401, "Not authenticated")
ctx = await get_verified_user_context(credentials.credentials)
if not ctx:
raise HTTPException(401, "Invalid token")
return ctx
def file_hash(path: Path) -> str:
"""Compute SHA3-256 hash of a file."""
hasher = hashlib.sha3_256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(65536), b""):
hasher.update(chunk)
return hasher.hexdigest()
async def cache_file(source: Path, node_type: str = "output") -> str:
"""
Copy file to cache using L1CacheManager, return content hash.
Uses artdag's Cache internally for proper tracking.
Saves IPFS CID to database.
"""
cached, ipfs_cid = cache_manager.put(source, node_type=node_type)
# Save to cache_items table (with IPFS CID)
await database.create_cache_item(cached.content_hash, ipfs_cid)
return cached.content_hash
def get_cache_path(content_hash: str) -> Optional[Path]:
"""Get the path for a cached file by content_hash."""
return cache_manager.get_by_content_hash(content_hash)
@app.get("/api")
async def api_info():
"""Server info (JSON)."""
runs = await asyncio.to_thread(list_all_runs)
return {
"name": "Art DAG L1 Server",
"version": "0.1.0",
"cache_dir": str(CACHE_DIR),
"runs_count": len(runs)
}
def render_home_html(actor_id: Optional[str] = None) -> str:
"""Render the home page HTML with optional user info."""
if actor_id:
# Extract username and domain from @username@domain format
parts = actor_id.lstrip("@").split("@")
username = parts[0] if parts else actor_id
domain = parts[1] if len(parts) > 1 else ""
l2_user_url = f"https://{domain}/users/{username}" if domain else "#"
user_section = f'''<div class="ml-auto flex items-center gap-2 text-sm text-gray-300">
Logged in as <a href="{l2_user_url}" class="text-blue-400 hover:text-blue-300">{actor_id}</a>
</div>'''
else:
user_section = '''<a href="/login" class="ml-auto px-4 py-2 bg-blue-600 hover:bg-blue-700 rounded-md text-white font-medium transition-colors">Login via L2</a>'''
return f"""
<!DOCTYPE html>
<html class="dark">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Art DAG L1 Server</title>
<script src="https://cdn.tailwindcss.com"></script>
<script>
tailwind.config = {{
darkMode: 'class',
theme: {{ extend: {{ colors: {{ dark: {{ 900: '#0a0a0a', 800: '#111', 700: '#1a1a1a', 600: '#222', 500: '#333' }} }} }} }}
}}
</script>
</head>
<body class="bg-dark-900 text-gray-100 min-h-screen">
<div class="max-w-4xl mx-auto px-4 py-8 sm:px-6 lg:px-8">
<nav class="flex flex-wrap gap-3 mb-8 p-4 bg-dark-700 rounded-lg">
<a href="/runs" class="px-4 py-2 bg-dark-500 hover:bg-dark-600 rounded-md text-blue-400 hover:text-blue-300 font-medium transition-colors">Runs</a>
<a href="/recipes" class="px-4 py-2 bg-dark-500 hover:bg-dark-600 rounded-md text-blue-400 hover:text-blue-300 font-medium transition-colors">Recipes</a>
<a href="/media" class="px-4 py-2 bg-dark-500 hover:bg-dark-600 rounded-md text-blue-400 hover:text-blue-300 font-medium transition-colors">Media</a>
<a href="/docs" class="px-4 py-2 bg-dark-500 hover:bg-dark-600 rounded-md text-blue-400 hover:text-blue-300 font-medium transition-colors">API Docs</a>
{user_section}
</nav>
<h1 class="text-3xl font-bold text-white border-b border-dark-500 pb-4 mb-6">Art DAG L1 Server</h1>
<p class="text-gray-300 mb-8">L1 rendering server for the Art DAG system. Manages distributed rendering jobs via Celery workers.</p>
<h2 class="text-xl font-semibold text-gray-200 mt-8 mb-4">Dependencies</h2>
<ul class="list-disc list-inside space-y-2 text-gray-300 mb-8">
<li><strong class="text-white">artdag</strong> (GitHub): Core DAG execution engine</li>
<li><strong class="text-white">artdag-effects</strong> (rose-ash): Effect implementations</li>
<li><strong class="text-white">Redis</strong>: Message broker, result backend, and run persistence</li>
</ul>
<h2 class="text-xl font-semibold text-gray-200 mt-8 mb-4">API Endpoints</h2>
<div class="overflow-x-auto mb-8">
<table class="w-full text-sm">
<thead>
<tr class="bg-dark-600">
<th class="px-4 py-3 text-left border border-dark-500">Method</th>
<th class="px-4 py-3 text-left border border-dark-500">Path</th>
<th class="px-4 py-3 text-left border border-dark-500">Description</th>
</tr>
</thead>
<tbody class="divide-y divide-dark-500">
<tr class="bg-dark-800"><td class="px-4 py-2 border border-dark-500">GET</td><td class="px-4 py-2 border border-dark-500"><code class="bg-dark-600 px-2 py-0.5 rounded text-blue-300">/ui</code></td><td class="px-4 py-2 border border-dark-500">Web UI for viewing runs</td></tr>
<tr class="bg-dark-800"><td class="px-4 py-2 border border-dark-500">POST</td><td class="px-4 py-2 border border-dark-500"><code class="bg-dark-600 px-2 py-0.5 rounded text-blue-300">/runs</code></td><td class="px-4 py-2 border border-dark-500">Start a rendering run</td></tr>
<tr class="bg-dark-800"><td class="px-4 py-2 border border-dark-500">GET</td><td class="px-4 py-2 border border-dark-500"><code class="bg-dark-600 px-2 py-0.5 rounded text-blue-300">/runs</code></td><td class="px-4 py-2 border border-dark-500">List all runs</td></tr>
<tr class="bg-dark-800"><td class="px-4 py-2 border border-dark-500">GET</td><td class="px-4 py-2 border border-dark-500"><code class="bg-dark-600 px-2 py-0.5 rounded text-blue-300">/runs/{{run_id}}</code></td><td class="px-4 py-2 border border-dark-500">Get run status</td></tr>
<tr class="bg-dark-800"><td class="px-4 py-2 border border-dark-500">GET</td><td class="px-4 py-2 border border-dark-500"><code class="bg-dark-600 px-2 py-0.5 rounded text-blue-300">/media</code></td><td class="px-4 py-2 border border-dark-500">List media items</td></tr>
<tr class="bg-dark-800"><td class="px-4 py-2 border border-dark-500">GET</td><td class="px-4 py-2 border border-dark-500"><code class="bg-dark-600 px-2 py-0.5 rounded text-blue-300">/recipes</code></td><td class="px-4 py-2 border border-dark-500">List recipes</td></tr>
<tr class="bg-dark-800"><td class="px-4 py-2 border border-dark-500">GET</td><td class="px-4 py-2 border border-dark-500"><code class="bg-dark-600 px-2 py-0.5 rounded text-blue-300">/cache/{{hash}}</code></td><td class="px-4 py-2 border border-dark-500">Download cached content</td></tr>
<tr class="bg-dark-800"><td class="px-4 py-2 border border-dark-500">POST</td><td class="px-4 py-2 border border-dark-500"><code class="bg-dark-600 px-2 py-0.5 rounded text-blue-300">/cache/upload</code></td><td class="px-4 py-2 border border-dark-500">Upload file to cache</td></tr>
<tr class="bg-dark-800"><td class="px-4 py-2 border border-dark-500">GET</td><td class="px-4 py-2 border border-dark-500"><code class="bg-dark-600 px-2 py-0.5 rounded text-blue-300">/assets</code></td><td class="px-4 py-2 border border-dark-500">List known assets</td></tr>
</tbody>
</table>
</div>
<h2 class="text-xl font-semibold text-gray-200 mt-8 mb-4">Start a Run</h2>
<pre class="bg-dark-700 p-4 rounded-lg overflow-x-auto border border-dark-500 mb-8"><code class="text-green-300">curl -X POST /runs \\
-H "Content-Type: application/json" \\
-d '{{"recipe": "dog", "inputs": ["33268b6e..."]}}'</code></pre>
<h2 class="text-xl font-semibold text-gray-200 mt-8 mb-4">Provenance</h2>
<p class="text-gray-300 mb-4">Every render produces a provenance record linking inputs, effects, and infrastructure:</p>
<pre class="bg-dark-700 p-4 rounded-lg overflow-x-auto border border-dark-500"><code class="text-green-300">{{
"output": {{"content_hash": "..."}},
"inputs": [...],
"effects": [...],
"infrastructure": {{...}}
}}</code></pre>
</div>
</body>
</html>
"""
@app.get("/", response_class=HTMLResponse)
async def root(request: Request):
"""Home page."""
ctx = await get_user_context_from_cookie(request)
actor_id = ctx.actor_id if ctx else None
return render_home_html(actor_id)
@app.post("/runs", response_model=RunStatus)
async def create_run(request: RunRequest, ctx: UserContext = Depends(get_required_user_context)):
"""Start a new rendering run. Requires authentication."""
run_id = str(uuid.uuid4())
# Generate output name if not provided
output_name = request.output_name or f"{request.recipe}-{run_id[:8]}"
# Use actor_id from user context
actor_id = ctx.actor_id
# Create run record
run = RunStatus(
run_id=run_id,
status="pending",
recipe=request.recipe,
inputs=request.inputs,
output_name=output_name,
created_at=datetime.now(timezone.utc).isoformat(),
username=actor_id
)
# Submit to Celery
if request.use_dag or request.recipe == "dag":
# DAG mode - use artdag engine
if request.dag_json:
# Custom DAG provided
dag_json = request.dag_json
else:
# Build simple effect DAG from recipe and inputs
dag = build_effect_dag(request.inputs, request.recipe)
dag_json = dag.to_json()
task = execute_dag.delay(dag_json, run.run_id)
else:
# Legacy mode - single effect
if len(request.inputs) != 1:
raise HTTPException(400, "Legacy mode only supports single-input recipes. Use use_dag=true for multi-input.")
input_hash = request.inputs[0]
task = render_effect.delay(input_hash, request.recipe, output_name)
run.celery_task_id = task.id
run.status = "running"
await asyncio.to_thread(save_run, run)
return run
def _check_celery_task_sync(task_id: str) -> tuple[bool, bool, Optional[dict], Optional[str]]:
"""Check Celery task status synchronously. Returns (is_ready, is_successful, result, error)."""
task = celery_app.AsyncResult(task_id)
if not task.ready():
return (False, False, None, None)
if task.successful():
return (True, True, task.result, None)
else:
return (True, False, None, str(task.result))
@app.get("/runs/{run_id}", response_model=RunStatus)
async def get_run(run_id: str):
"""Get status of a run."""
start = time.time()
logger.info(f"get_run: Starting for {run_id}")
t0 = time.time()
run = await asyncio.to_thread(load_run, run_id)
logger.info(f"get_run: load_run took {time.time()-t0:.3f}s, status={run.status if run else 'None'}")
if not run:
raise HTTPException(404, f"Run {run_id} not found")
# Check Celery task status if running
if run.status == "running" and run.celery_task_id:
t0 = time.time()
is_ready, is_successful, result, error = await asyncio.to_thread(
_check_celery_task_sync, run.celery_task_id
)
logger.info(f"get_run: Celery check took {time.time()-t0:.3f}s, ready={is_ready}")
if is_ready:
if is_successful:
run.status = "completed"
run.completed_at = datetime.now(timezone.utc).isoformat()
# Handle both legacy (render_effect) and new (execute_dag) result formats
if "output_hash" in result:
# New DAG result format
run.output_hash = result.get("output_hash")
run.provenance_cid = result.get("provenance_cid")
output_path = Path(result.get("output_path", "")) if result.get("output_path") else None
else:
# Legacy render_effect format
run.output_hash = result.get("output", {}).get("content_hash")
run.provenance_cid = result.get("provenance_cid")
output_path = Path(result.get("output", {}).get("local_path", ""))
# Extract effects info from provenance (legacy only)
effects = result.get("effects", [])
if effects:
run.effects_commit = effects[0].get("repo_commit")
run.effect_url = effects[0].get("repo_url")
# Extract infrastructure info (legacy only)
run.infrastructure = result.get("infrastructure")
# Cache the output (legacy mode - DAG already caches via cache_manager)
if output_path and output_path.exists() and "output_hash" not in result:
t0 = time.time()
await cache_file(output_path, node_type="effect_output")
logger.info(f"get_run: cache_file took {time.time()-t0:.3f}s")
# Record activity for deletion tracking (legacy mode)
if run.output_hash and run.inputs:
await asyncio.to_thread(
cache_manager.record_simple_activity,
input_hashes=run.inputs,
output_hash=run.output_hash,
run_id=run.run_id,
)
else:
run.status = "failed"
run.error = error
# Save updated status
t0 = time.time()
await asyncio.to_thread(save_run, run)
logger.info(f"get_run: save_run took {time.time()-t0:.3f}s")
logger.info(f"get_run: Total time {time.time()-start:.3f}s")
return run
@app.delete("/runs/{run_id}")
async def discard_run(run_id: str, ctx: UserContext = Depends(get_required_user_context)):
"""
Discard (delete) a run and its outputs.
Enforces deletion rules:
- Cannot discard if output is published to L2 (pinned)
- Deletes outputs and intermediate cache entries
- Preserves inputs (cache items and recipes are NOT deleted)
"""
run = await asyncio.to_thread(load_run, run_id)
if not run:
raise HTTPException(404, f"Run {run_id} not found")
# Check ownership
if run.username not in (ctx.username, ctx.actor_id):
raise HTTPException(403, "Access denied")
# Failed runs can always be deleted (no output to protect)
if run.status != "failed":
# Only check if output is pinned - inputs are preserved, not deleted
if run.output_hash:
meta = await database.load_item_metadata(run.output_hash, ctx.actor_id)
if meta.get("pinned"):
pin_reason = meta.get("pin_reason", "published")
raise HTTPException(400, f"Cannot discard run: output {run.output_hash[:16]}... is pinned ({pin_reason})")
# Check if activity exists for this run
activity = await asyncio.to_thread(cache_manager.get_activity, run_id)
if activity:
# Discard the activity - only delete outputs, preserve inputs
success, msg = await asyncio.to_thread(cache_manager.discard_activity_outputs_only, run_id)
if not success:
raise HTTPException(400, f"Cannot discard run: {msg}")
# Remove from Redis
await asyncio.to_thread(redis_client.delete, f"{RUNS_KEY_PREFIX}{run_id}")
return {"discarded": True, "run_id": run_id}
@app.delete("/ui/runs/{run_id}/discard", response_class=HTMLResponse)
async def ui_discard_run(run_id: str, request: Request):
"""HTMX handler: discard a run. Only deletes outputs, preserves inputs."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Login required</div>'
run = await asyncio.to_thread(load_run, run_id)
if not run:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Run not found</div>'
# Check ownership
if run.username not in (ctx.username, ctx.actor_id):
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Access denied</div>'
# Failed runs can always be deleted
if run.status != "failed":
# Only check if output is pinned - inputs are preserved, not deleted
if run.output_hash:
meta = await database.load_item_metadata(run.output_hash, ctx.actor_id)
if meta.get("pinned"):
pin_reason = meta.get("pin_reason", "published")
return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Cannot discard: output is pinned ({pin_reason})</div>'
# Check if activity exists for this run
activity = await asyncio.to_thread(cache_manager.get_activity, run_id)
if activity:
# Discard the activity - only delete outputs, preserve inputs
success, msg = await asyncio.to_thread(cache_manager.discard_activity_outputs_only, run_id)
if not success:
return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Cannot discard: {msg}</div>'
# Remove from Redis
await asyncio.to_thread(redis_client.delete, f"{RUNS_KEY_PREFIX}{run_id}")
return '''
<div class="bg-green-900/50 border border-green-700 text-green-300 px-4 py-3 rounded-lg mb-4">
Run deleted. <a href="/runs" class="underline">Back to runs</a>
</div>
'''
@app.get("/run/{run_id}")
async def run_detail(run_id: str, request: Request):
"""Run detail. HTML for browsers, JSON for APIs."""
run = await asyncio.to_thread(load_run, run_id)
if not run:
if wants_html(request):
content = f'<p class="text-red-400">Run not found: {run_id}</p>'
return HTMLResponse(render_page("Not Found", content, None, active_tab="runs"), status_code=404)
raise HTTPException(404, f"Run {run_id} not found")
# Check Celery task status if running
if run.status == "running" and run.celery_task_id:
is_ready, is_successful, result, error = await asyncio.to_thread(
_check_celery_task_sync, run.celery_task_id
)
if is_ready:
if is_successful:
run.status = "completed"
run.completed_at = datetime.now(timezone.utc).isoformat()
run.output_hash = result.get("output", {}).get("content_hash")
effects = result.get("effects", [])
if effects:
run.effects_commit = effects[0].get("repo_commit")
run.effect_url = effects[0].get("repo_url")
run.infrastructure = result.get("infrastructure")
output_path = Path(result.get("output", {}).get("local_path", ""))
if output_path.exists():
await cache_file(output_path)
else:
run.status = "failed"
run.error = error
await asyncio.to_thread(save_run, run)
if wants_html(request):
ctx = await get_user_context_from_cookie(request)
if not ctx:
content = '<p class="text-gray-400 py-8 text-center"><a href="/login" class="text-blue-400 hover:text-blue-300">Login via L2</a> to view run details.</p>'
return HTMLResponse(render_page("Login Required", content, None, active_tab="runs"), status_code=401)
# Check user owns this run
if run.username not in (ctx.username, ctx.actor_id):
content = '<p class="text-red-400 py-8 text-center">Access denied.</p>'
return HTMLResponse(render_page("Access Denied", content, ctx.actor_id, active_tab="runs"), status_code=403)
# Build effect URL
if run.effect_url:
effect_url = run.effect_url
elif run.effects_commit and run.effects_commit != "unknown":
effect_url = f"https://git.rose-ash.com/art-dag/effects/src/commit/{run.effects_commit}/{run.recipe}"
else:
effect_url = f"https://git.rose-ash.com/art-dag/effects/src/branch/main/{run.recipe}"
# Status badge colors
status_colors = {
"completed": "bg-green-600 text-white",
"running": "bg-yellow-600 text-white",
"failed": "bg-red-600 text-white",
"pending": "bg-gray-600 text-white"
}
status_badge = status_colors.get(run.status, "bg-gray-600 text-white")
# Build media HTML for input/output
media_html = ""
has_input = run.inputs and cache_manager.has_content(run.inputs[0])
has_output = run.status == "completed" and run.output_hash and cache_manager.has_content(run.output_hash)
if has_input or has_output:
media_html = '<div class="grid gap-6 md:grid-cols-2 mb-8">'
if has_input:
input_hash = run.inputs[0]
input_media_type = detect_media_type(get_cache_path(input_hash))
input_video_src = video_src_for_request(input_hash, request)
if input_media_type == "video":
input_elem = f'<video src="{input_video_src}" controls muted loop playsinline class="max-w-full max-h-64 rounded-lg"></video>'
elif input_media_type == "image":
input_elem = f'<img src="/cache/{input_hash}" alt="input" class="max-w-full max-h-64 rounded-lg">'
else:
input_elem = '<p class="text-gray-400">Unknown format</p>'
media_html += f'''
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-2">Input</div>
<a href="/cache/{input_hash}" class="text-blue-400 hover:text-blue-300 font-mono text-xs">{input_hash[:24]}...</a>
<div class="mt-3 flex justify-center">{input_elem}</div>
</div>
'''
if has_output:
output_hash = run.output_hash
output_media_type = detect_media_type(get_cache_path(output_hash))
output_video_src = video_src_for_request(output_hash, request)
if output_media_type == "video":
output_elem = f'<video src="{output_video_src}" controls autoplay muted loop playsinline class="max-w-full max-h-64 rounded-lg"></video>'
elif output_media_type == "image":
output_elem = f'<img src="/cache/{output_hash}" alt="output" class="max-w-full max-h-64 rounded-lg">'
else:
output_elem = '<p class="text-gray-400">Unknown format</p>'
media_html += f'''
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-2">Output</div>
<a href="/cache/{output_hash}" class="text-blue-400 hover:text-blue-300 font-mono text-xs">{output_hash[:24]}...</a>
<div class="mt-3 flex justify-center">{output_elem}</div>
</div>
'''
media_html += '</div>'
# Build inputs list
inputs_html = ''.join([f'<a href="/cache/{inp}" class="text-blue-400 hover:text-blue-300 font-mono text-xs block">{inp}</a>' for inp in run.inputs])
# Infrastructure section
infra_html = ""
if run.infrastructure:
software = run.infrastructure.get("software", {})
hardware = run.infrastructure.get("hardware", {})
infra_html = f'''
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Infrastructure</div>
<div class="text-gray-200 text-sm">
Software: {software.get("name", "unknown")} ({software.get("content_hash", "unknown")[:16]}...)<br>
Hardware: {hardware.get("name", "unknown")} ({hardware.get("content_hash", "unknown")[:16]}...)
</div>
</div>
'''
# Error display
error_html = ""
if run.error:
error_html = f'''
<div class="bg-red-900/30 border border-red-700 rounded-lg p-4 mb-6">
<div class="text-sm text-red-400 mb-1">Error</div>
<div class="text-red-300">{run.error}</div>
</div>
'''
# Publish section - check if already published to L2
publish_html = ""
if run.status == "completed" and run.output_hash:
l2_shares = await database.get_l2_shares(run.output_hash, ctx.actor_id)
if l2_shares:
# Already published - show link to L2
share = l2_shares[0]
l2_server = share.get("l2_server", "")
l2_https = l2_server.replace("http://", "https://")
asset_name = share.get("asset_name", "")
publish_html = f'''
<div class="border-t border-dark-500 pt-6 mt-6">
<h2 class="text-lg font-semibold text-white mb-3">Published to L2</h2>
<div class="bg-green-900/30 border border-green-700 rounded-lg p-4">
<p class="text-green-300">
Published as <strong>{asset_name}</strong>
<a href="{l2_https}/ui/asset/{asset_name}" target="_blank" class="underline ml-2">View on L2</a>
</p>
</div>
</div>
'''
else:
# Not published - show publish form
publish_html = f'''
<div class="border-t border-dark-500 pt-6 mt-6">
<h2 class="text-lg font-semibold text-white mb-3">Publish to L2</h2>
<p class="text-sm text-gray-400 mb-4">Register this transformation output on the L2 ActivityPub server.</p>
<div id="publish-result"></div>
<form hx-post="/ui/publish-run/{run.run_id}" hx-target="#publish-result" hx-swap="innerHTML"
class="flex flex-wrap gap-3 items-center">
<input type="text" name="output_name" value="{run.output_name}"
placeholder="Asset name" required
class="px-4 py-2 bg-dark-600 border border-dark-500 rounded-lg text-white placeholder-gray-500 focus:border-blue-500 focus:outline-none min-w-[200px]">
<button type="submit"
class="px-4 py-2 bg-blue-600 hover:bg-blue-700 text-white font-medium rounded-lg transition-colors">
Publish to L2
</button>
</form>
</div>
'''
# Delete section
delete_html = f'''
<div class="border-t border-dark-500 pt-6 mt-6">
<h2 class="text-lg font-semibold text-white mb-3">Delete Run</h2>
<p class="text-sm text-gray-400 mb-4">
{"This run failed and can be deleted." if run.status == "failed" else "Delete this run and its associated cache entries."}
</p>
<div id="delete-result"></div>
<button hx-delete="/ui/runs/{run.run_id}/discard" hx-target="#delete-result" hx-swap="innerHTML"
hx-confirm="Are you sure you want to delete this run? This cannot be undone."
class="px-4 py-2 bg-red-600 hover:bg-red-700 text-white font-medium rounded-lg transition-colors">
Delete Run
</button>
</div>
'''
output_link = ""
if run.output_hash:
output_link = f'''<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Output</div>
<a href="/cache/{run.output_hash}" class="text-blue-400 hover:text-blue-300 font-mono text-xs">{run.output_hash}</a>
</div>'''
completed_html = ""
if run.completed_at:
completed_html = f'''<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Completed</div>
<div class="text-gray-200">{run.completed_at[:19].replace('T', ' ')}</div>
</div>'''
content = f'''
<a href="/runs" class="inline-flex items-center text-blue-400 hover:text-blue-300 mb-6">
<svg class="w-4 h-4 mr-1" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M15 19l-7-7 7-7"/>
</svg>
Back to runs
</a>
<div class="bg-dark-700 rounded-lg p-6">
<div class="flex flex-wrap items-center justify-between gap-4 mb-6">
<div class="flex items-center gap-3">
<a href="{effect_url}" target="_blank"
class="px-3 py-1 bg-blue-600 hover:bg-blue-700 text-white text-sm font-medium rounded-full transition-colors">
{run.recipe}
</a>
<span class="text-gray-400 font-mono text-sm">{run.run_id[:16]}...</span>
</div>
<span class="px-3 py-1 {status_badge} text-sm font-medium rounded-full">{run.status}</span>
</div>
{error_html}
{media_html}
<div class="border-t border-dark-500 pt-6">
<h2 class="text-lg font-semibold text-white mb-4">Provenance</h2>
<div class="grid gap-4 sm:grid-cols-2">
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Owner</div>
<div class="text-gray-200">{run.username or "anonymous"}</div>
</div>
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Effect</div>
<a href="{effect_url}" target="_blank" class="text-blue-400 hover:text-blue-300">{run.recipe}</a>
</div>
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Effects Commit</div>
<div class="text-gray-200 font-mono text-xs">{run.effects_commit or "N/A"}</div>
</div>
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Input(s)</div>
<div>{inputs_html}</div>
</div>
{output_link}
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Run ID</div>
<div class="text-gray-200 font-mono text-xs">{run.run_id}</div>
</div>
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Created</div>
<div class="text-gray-200">{run.created_at[:19].replace('T', ' ')}</div>
</div>
{completed_html}
{infra_html}
</div>
</div>
{publish_html}
{delete_html}
</div>
'''
return HTMLResponse(render_page(f"Run: {run.recipe}", content, ctx.actor_id, active_tab="runs"))
# JSON response
return run.model_dump()
@app.get("/runs")
async def list_runs(request: Request, page: int = 1, limit: int = 20):
"""List runs. HTML for browsers (with infinite scroll), JSON for APIs (with pagination)."""
ctx = await get_user_context_from_cookie(request)
all_runs = await asyncio.to_thread(list_all_runs)
total = len(all_runs)
# Filter by user if logged in for HTML
if wants_html(request) and ctx:
all_runs = [r for r in all_runs if r.username in (ctx.username, ctx.actor_id)]
total = len(all_runs)
# Pagination
start = (page - 1) * limit
end = start + limit
runs_page = all_runs[start:end]
has_more = end < total
if wants_html(request):
if not ctx:
content = '<p class="text-gray-400 py-8 text-center"><a href="/login" class="text-blue-400 hover:text-blue-300">Login via L2</a> to see your runs.</p>'
return HTMLResponse(render_page("Runs", content, None, active_tab="runs"))
if not runs_page:
if page == 1:
content = '<p class="text-gray-400 py-8 text-center">You have no runs yet. Use the CLI to start a run.</p>'
else:
return HTMLResponse("") # Empty for infinite scroll
else:
# Status badge colors
status_colors = {
"completed": "bg-green-600 text-white",
"running": "bg-yellow-600 text-white",
"failed": "bg-red-600 text-white",
"pending": "bg-gray-600 text-white"
}
html_parts = []
for run in runs_page:
status_badge = status_colors.get(run.status, "bg-gray-600 text-white")
html_parts.append(f'''
<a href="/run/{run.run_id}" class="block">
<div class="bg-dark-700 rounded-lg p-4 hover:bg-dark-600 transition-colors">
<div class="flex flex-wrap items-center justify-between gap-3 mb-3">
<div class="flex items-center gap-3">
<span class="px-3 py-1 bg-blue-600 text-white text-sm font-medium rounded-full">{run.recipe}</span>
<span class="text-gray-400 font-mono text-xs hidden sm:inline">{run.run_id[:16]}...</span>
</div>
<span class="px-3 py-1 {status_badge} text-xs font-medium rounded-full">{run.status}</span>
</div>
<div class="text-sm text-gray-400 mb-3">
Created: {run.created_at[:19].replace('T', ' ')}
</div>
''')
# Show input and output thumbnails
has_input = run.inputs and cache_manager.has_content(run.inputs[0])
has_output = run.status == "completed" and run.output_hash and cache_manager.has_content(run.output_hash)
if has_input or has_output:
html_parts.append('<div class="grid gap-4 sm:grid-cols-2">')
if has_input:
input_hash = run.inputs[0]
input_media_type = detect_media_type(get_cache_path(input_hash))
html_parts.append(f'''
<div class="bg-dark-600 rounded-lg p-3">
<div class="text-xs text-gray-400 mb-2">Input</div>
<div class="flex justify-center">
''')
if input_media_type == "video":
html_parts.append(f'<video src="{video_src_for_request(input_hash, request)}" muted loop playsinline class="max-h-24 rounded"></video>')
else:
html_parts.append(f'<img src="/cache/{input_hash}" alt="input" class="max-h-24 rounded">')
html_parts.append('</div></div>')
if has_output:
output_hash = run.output_hash
output_media_type = detect_media_type(get_cache_path(output_hash))
html_parts.append(f'''
<div class="bg-dark-600 rounded-lg p-3">
<div class="text-xs text-gray-400 mb-2">Output</div>
<div class="flex justify-center">
''')
if output_media_type == "video":
html_parts.append(f'<video src="{video_src_for_request(output_hash, request)}" autoplay muted loop playsinline class="max-h-24 rounded"></video>')
else:
html_parts.append(f'<img src="/cache/{output_hash}" alt="output" class="max-h-24 rounded">')
html_parts.append('</div></div>')
html_parts.append('</div>')
if run.status == "failed" and run.error:
html_parts.append(f'<div class="mt-3 text-sm text-red-400">Error: {run.error[:100]}</div>')
html_parts.append('</div></a>')
# For infinite scroll, just return cards if not first page
if page > 1:
if has_more:
html_parts.append(f'''
<div hx-get="/runs?page={page + 1}" hx-trigger="revealed" hx-swap="afterend">
<p class="py-4 text-center text-gray-400">Loading more...</p>
</div>
''')
return HTMLResponse('\n'.join(html_parts))
# First page - full content
infinite_scroll_trigger = ""
if has_more:
infinite_scroll_trigger = f'''
<div hx-get="/runs?page=2" hx-trigger="revealed" hx-swap="afterend">
<p class="py-4 text-center text-gray-400">Loading more...</p>
</div>
'''
content = f'''
<h2 class="text-xl font-semibold text-white mb-6">Runs ({total} total)</h2>
<div class="space-y-4">
{''.join(html_parts)}
{infinite_scroll_trigger}
</div>
'''
return HTMLResponse(render_page("Runs", content, ctx.actor_id, active_tab="runs"))
# JSON response for APIs
return {
"runs": [r.model_dump() for r in runs_page],
"pagination": {
"page": page,
"limit": limit,
"total": total,
"has_more": has_more
}
}
# ============ Recipe Endpoints ============
@app.post("/recipes/upload")
async def upload_recipe(file: UploadFile = File(...), ctx: UserContext = Depends(get_required_user_context)):
"""Upload a recipe YAML file. Requires authentication."""
import tempfile
# Read file content
content = await file.read()
try:
yaml_content = content.decode('utf-8')
except UnicodeDecodeError:
raise HTTPException(400, "Recipe file must be valid UTF-8 text")
# Validate YAML
try:
yaml.safe_load(yaml_content)
except yaml.YAMLError as e:
raise HTTPException(400, f"Invalid YAML: {e}")
# Store YAML file in cache
with tempfile.NamedTemporaryFile(delete=False, suffix=".yaml") as tmp:
tmp.write(content)
tmp_path = Path(tmp.name)
cached, ipfs_cid = cache_manager.put(tmp_path, node_type="recipe", move=True)
recipe_hash = cached.content_hash
# Parse and save metadata
actor_id = ctx.actor_id
try:
recipe_status = parse_recipe_yaml(yaml_content, recipe_hash, actor_id)
except Exception as e:
raise HTTPException(400, f"Failed to parse recipe: {e}")
await asyncio.to_thread(save_recipe, recipe_status)
# Save cache metadata to database
await database.save_item_metadata(
content_hash=recipe_hash,
actor_id=actor_id,
item_type="recipe",
filename=file.filename,
description=recipe_status.name # Use recipe name as description
)
return {
"recipe_id": recipe_hash,
"name": recipe_status.name,
"version": recipe_status.version,
"variable_inputs": len(recipe_status.variable_inputs),
"fixed_inputs": len(recipe_status.fixed_inputs)
}
@app.get("/recipes")
async def list_recipes_api(request: Request, page: int = 1, limit: int = 20):
"""List recipes. HTML for browsers, JSON for APIs."""
ctx = await get_user_context_from_cookie(request)
all_recipes = await asyncio.to_thread(list_all_recipes)
if wants_html(request):
# HTML response
if not ctx:
return HTMLResponse(render_page(
"Recipes",
'<p class="text-gray-400 py-8 text-center"><a href="/login" class="text-blue-400 hover:text-blue-300">Login via L2</a> to see recipes.</p>',
None,
active_tab="recipes"
))
# Filter to user's recipes
user_recipes = [c for c in all_recipes if c.uploader in (ctx.username, ctx.actor_id)]
total = len(user_recipes)
if not user_recipes:
content = '''
<h2 class="text-xl font-semibold text-white mb-6">Recipes (0)</h2>
<p class="text-gray-400 py-8 text-center">No recipes yet. Upload a recipe YAML file to get started.</p>
'''
return HTMLResponse(render_page("Recipes", content, ctx.actor_id, active_tab="recipes"))
html_parts = []
for recipe in user_recipes:
var_count = len(recipe.variable_inputs)
fixed_count = len(recipe.fixed_inputs)
input_info = []
if var_count:
input_info.append(f"{var_count} variable")
if fixed_count:
input_info.append(f"{fixed_count} fixed")
inputs_str = ", ".join(input_info) if input_info else "no inputs"
html_parts.append(f'''
<a href="/recipe/{recipe.recipe_id}" class="block">
<div class="bg-dark-700 rounded-lg p-4 hover:bg-dark-600 transition-colors">
<div class="flex flex-wrap items-center justify-between gap-3 mb-3">
<div class="flex items-center gap-3">
<span class="px-3 py-1 bg-purple-600 text-white text-sm font-medium rounded-full">{recipe.name}</span>
<span class="text-gray-400 text-xs">v{recipe.version}</span>
</div>
<span class="text-xs text-gray-400">{inputs_str}</span>
</div>
<div class="text-sm text-gray-400 mb-2">
{recipe.description or "No description"}
</div>
<div class="text-xs text-gray-500 font-mono truncate">
{recipe.recipe_id[:24]}...
</div>
</div>
</a>
''')
content = f'''
<h2 class="text-xl font-semibold text-white mb-6">Recipes ({total})</h2>
<div class="space-y-4">
{''.join(html_parts)}
</div>
'''
return HTMLResponse(render_page("Recipes", content, ctx.actor_id, active_tab="recipes"))
# JSON response for APIs
total = len(all_recipes)
start = (page - 1) * limit
end = start + limit
recipes_page = all_recipes[start:end]
has_more = end < total
return {
"recipes": [c.model_dump() for c in recipes_page],
"pagination": {
"page": page,
"limit": limit,
"total": total,
"has_more": has_more
}
}
@app.get("/recipes/{recipe_id}")
async def get_recipe_api(recipe_id: str):
"""Get recipe details."""
recipe = load_recipe(recipe_id)
if not recipe:
raise HTTPException(404, f"Recipe {recipe_id} not found")
return recipe
@app.delete("/recipes/{recipe_id}")
async def remove_recipe(recipe_id: str, ctx: UserContext = Depends(get_required_user_context)):
"""Delete a recipe. Requires authentication."""
recipe = load_recipe(recipe_id)
if not recipe:
raise HTTPException(404, f"Recipe {recipe_id} not found")
# Check ownership
if recipe.uploader not in (ctx.username, ctx.actor_id):
raise HTTPException(403, "Access denied")
# Check if pinned
pinned, reason = cache_manager.is_pinned(recipe_id)
if pinned:
raise HTTPException(400, f"Cannot delete pinned recipe: {reason}")
# Delete from Redis and cache
delete_recipe_from_redis(recipe_id)
cache_manager.delete_by_content_hash(recipe_id)
return {"deleted": True, "recipe_id": recipe_id}
@app.post("/recipes/{recipe_id}/run")
async def run_recipe(recipe_id: str, request: RecipeRunRequest, ctx: UserContext = Depends(get_required_user_context)):
"""Run a recipe with provided variable inputs. Requires authentication."""
recipe = await asyncio.to_thread(load_recipe, recipe_id)
if not recipe:
raise HTTPException(404, f"Recipe {recipe_id} not found")
# Validate all required inputs are provided
for var_input in recipe.variable_inputs:
if var_input.required and var_input.node_id not in request.inputs:
raise HTTPException(400, f"Missing required input: {var_input.name}")
# Load recipe YAML
recipe_path = await asyncio.to_thread(cache_manager.get_by_content_hash, recipe_id)
if not recipe_path:
raise HTTPException(500, "Recipe YAML not found in cache")
with open(recipe_path) as f:
yaml_config = yaml.safe_load(f)
# Build DAG from recipe
dag = build_dag_from_recipe(yaml_config, request.inputs, recipe)
# Create run
run_id = str(uuid.uuid4())
actor_id = ctx.actor_id
# Collect all input hashes
all_inputs = list(request.inputs.values())
for fixed in recipe.fixed_inputs:
if fixed.content_hash:
all_inputs.append(fixed.content_hash)
run = RunStatus(
run_id=run_id,
status="pending",
recipe=f"recipe:{recipe.name}",
inputs=all_inputs,
output_name=f"{recipe.name}-{run_id[:8]}",
created_at=datetime.now(timezone.utc).isoformat(),
username=actor_id
)
# Submit to Celery
dag_json = dag.to_json()
task = execute_dag.delay(dag_json, run.run_id)
run.celery_task_id = task.id
run.status = "running"
await asyncio.to_thread(save_run, run)
return run
def build_dag_from_recipe(yaml_config: dict, user_inputs: dict[str, str], recipe: RecipeStatus):
"""Build a DAG from recipe YAML with user-provided inputs."""
from artdag import DAG, Node
dag = DAG()
name_to_id = {} # Map YAML node names to content-addressed IDs
registry = yaml_config.get("registry", {})
assets = registry.get("assets", {})
effects = registry.get("effects", {})
dag_config = yaml_config.get("dag", {})
nodes = dag_config.get("nodes", [])
output_node = dag_config.get("output")
# First pass: create all nodes and map names to IDs
for node_def in nodes:
node_name = node_def.get("id")
node_type = node_def.get("type")
node_config = node_def.get("config", {})
if node_type == "SOURCE":
if node_config.get("input"):
# Variable input - use user-provided hash
content_hash = user_inputs.get(node_name)
if not content_hash:
raise HTTPException(400, f"Missing input for node {node_name}")
node = Node(
node_type="SOURCE",
config={"content_hash": content_hash},
inputs=[],
name=node_name
)
else:
# Fixed input - use registry hash
asset_name = node_config.get("asset")
asset_info = assets.get(asset_name, {})
content_hash = asset_info.get("hash")
if not content_hash:
raise HTTPException(400, f"Asset {asset_name} not found in registry")
node = Node(
node_type="SOURCE",
config={"content_hash": content_hash},
inputs=[],
name=node_name
)
name_to_id[node_name] = node.node_id
dag.add_node(node)
# Second pass: create nodes with inputs (now we can resolve input names to IDs)
for node_def in nodes:
node_name = node_def.get("id")
node_type = node_def.get("type")
node_config = node_def.get("config", {})
input_names = node_def.get("inputs", [])
# Skip SOURCE nodes (already added)
if node_type == "SOURCE":
continue
# Resolve input names to content-addressed IDs
input_ids = [name_to_id[name] for name in input_names if name in name_to_id]
if node_type == "EFFECT":
effect_name = node_config.get("effect")
effect_info = effects.get(effect_name, {})
effect_hash = effect_info.get("hash")
node = Node(
node_type="EFFECT",
config={"effect": effect_name, "effect_hash": effect_hash},
inputs=input_ids,
name=node_name
)
else:
node = Node(
node_type=node_type,
config=node_config,
inputs=input_ids,
name=node_name
)
name_to_id[node_name] = node.node_id
dag.add_node(node)
# Set output node
if output_node and output_node in name_to_id:
dag.set_output(name_to_id[output_node])
return dag
# ============ Recipe UI Pages ============
@app.get("/recipe/{recipe_id}", response_class=HTMLResponse)
async def recipe_detail_page(recipe_id: str, request: Request):
"""Recipe detail page with run form."""
ctx = await get_user_context_from_cookie(request)
recipe = load_recipe(recipe_id)
if not recipe:
return HTMLResponse(render_page(
"Recipe Not Found",
f'<p class="text-red-400">Recipe {recipe_id} not found.</p>',
ctx.actor_id if ctx else None,
active_tab="recipes"
), status_code=404)
# Build variable inputs form
var_inputs_html = ""
if recipe.variable_inputs:
var_inputs_html = '<div class="space-y-4 mb-6">'
for var_input in recipe.variable_inputs:
required = "required" if var_input.required else ""
var_inputs_html += f'''
<div>
<label class="block text-sm font-medium text-gray-300 mb-2">
{var_input.name} {'*' if var_input.required else ''}
</label>
<input type="text" name="{var_input.node_id}" {required}
placeholder="Content hash..."
class="w-full px-4 py-2 bg-dark-600 border border-dark-500 rounded-lg text-white focus:ring-2 focus:ring-blue-500 focus:border-transparent">
<p class="text-xs text-gray-400 mt-1">{var_input.description or 'Enter a content hash from your cache'}</p>
</div>
'''
var_inputs_html += '</div>'
else:
var_inputs_html = '<p class="text-gray-400 mb-4">This recipe has no variable inputs - it uses fixed assets only.</p>'
# Build fixed inputs display
fixed_inputs_html = ""
if recipe.fixed_inputs:
fixed_inputs_html = '<div class="mt-4"><h4 class="text-sm font-medium text-gray-300 mb-2">Fixed Inputs</h4><ul class="text-sm text-gray-400 space-y-1">'
for fixed in recipe.fixed_inputs:
fixed_inputs_html += f'<li><span class="text-gray-500">{fixed.asset}:</span> <span class="font-mono text-xs">{fixed.content_hash[:16]}...</span></li>'
fixed_inputs_html += '</ul></div>'
# Check if pinned
pinned, pin_reason = cache_manager.is_pinned(recipe_id)
pinned_badge = ""
if pinned:
pinned_badge = f'<span class="px-2 py-1 bg-yellow-600 text-white text-xs rounded-full ml-2">Pinned: {pin_reason}</span>'
content = f'''
<div class="mb-6">
<a href="/recipes" class="text-blue-400 hover:text-blue-300 text-sm">&larr; Back to recipes</a>
</div>
<div class="bg-dark-700 rounded-lg p-6 mb-6">
<div class="flex items-center gap-3 mb-4">
<h2 class="text-2xl font-bold text-white">{recipe.name}</h2>
<span class="px-2 py-1 bg-gray-600 text-white text-xs rounded-full">v{recipe.version}</span>
{pinned_badge}
</div>
<p class="text-gray-400 mb-4">{recipe.description or 'No description'}</p>
<div class="text-xs text-gray-500 font-mono">{recipe.recipe_id}</div>
{fixed_inputs_html}
</div>
<div class="bg-dark-700 rounded-lg p-6">
<h3 class="text-lg font-semibold text-white mb-4">Run this Recipe</h3>
<form hx-post="/ui/recipes/{recipe_id}/run" hx-target="#run-result" hx-swap="innerHTML">
{var_inputs_html}
<div id="run-result"></div>
<button type="submit"
class="px-6 py-2 bg-green-600 hover:bg-green-700 text-white font-medium rounded-lg transition-colors">
Run Recipe
</button>
</form>
</div>
'''
return HTMLResponse(render_page(f"Recipe: {recipe.name}", content, ctx.actor_id if ctx else None, active_tab="recipes"))
@app.post("/ui/recipes/{recipe_id}/run", response_class=HTMLResponse)
async def ui_run_recipe(recipe_id: str, request: Request):
"""HTMX handler: run a recipe with form inputs."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Login required</div>'
recipe = load_recipe(recipe_id)
if not recipe:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Recipe not found</div>'
# Parse form data
form_data = await request.form()
inputs = {}
for var_input in recipe.variable_inputs:
value = form_data.get(var_input.node_id, "").strip()
if var_input.required and not value:
return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Missing required input: {var_input.name}</div>'
if value:
inputs[var_input.node_id] = value
# Load recipe YAML
recipe_path = cache_manager.get_by_content_hash(recipe_id)
if not recipe_path:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Recipe YAML not found in cache</div>'
try:
with open(recipe_path) as f:
yaml_config = yaml.safe_load(f)
# Build DAG from recipe
dag = build_dag_from_recipe(yaml_config, inputs, recipe)
# Create run
run_id = str(uuid.uuid4())
actor_id = ctx.actor_id
# Collect all input hashes
all_inputs = list(inputs.values())
for fixed in recipe.fixed_inputs:
if fixed.content_hash:
all_inputs.append(fixed.content_hash)
run = RunStatus(
run_id=run_id,
status="pending",
recipe=f"recipe:{recipe.name}",
inputs=all_inputs,
output_name=f"{recipe.name}-{run_id[:8]}",
created_at=datetime.now(timezone.utc).isoformat(),
username=actor_id
)
# Submit to Celery
dag_json = dag.to_json()
task = execute_dag.delay(dag_json, run.run_id)
run.celery_task_id = task.id
run.status = "running"
save_run(run)
return f'''
<div class="bg-green-900/50 border border-green-700 text-green-300 px-4 py-3 rounded-lg mb-4">
Run started! <a href="/run/{run_id}" class="underline">View run</a>
</div>
'''
except Exception as e:
return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Error: {str(e)}</div>'
@app.get("/ui/recipes-list", response_class=HTMLResponse)
async def ui_recipes_list(request: Request):
"""HTMX partial: list of recipes."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '<p class="text-gray-400 py-8 text-center"><a href="/ui/login" class="text-blue-400 hover:text-blue-300">Login via L2</a> to see recipes.</p>'
all_recipes = list_all_recipes()
# Filter to user's recipes
user_recipes = [c for c in all_recipes if c.uploader in (ctx.username, ctx.actor_id)]
if not user_recipes:
return '<p class="text-gray-400 py-8 text-center">No recipes yet. Upload a recipe YAML file to get started.</p>'
html_parts = ['<div class="space-y-4">']
for recipe in user_recipes:
var_count = len(recipe.variable_inputs)
fixed_count = len(recipe.fixed_inputs)
input_info = []
if var_count:
input_info.append(f"{var_count} variable")
if fixed_count:
input_info.append(f"{fixed_count} fixed")
inputs_str = ", ".join(input_info) if input_info else "no inputs"
html_parts.append(f'''
<a href="/recipe/{recipe.recipe_id}" class="block">
<div class="bg-dark-700 rounded-lg p-4 hover:bg-dark-600 transition-colors">
<div class="flex flex-wrap items-center justify-between gap-3 mb-3">
<div class="flex items-center gap-3">
<span class="px-3 py-1 bg-purple-600 text-white text-sm font-medium rounded-full">{recipe.name}</span>
<span class="text-gray-400 text-xs">v{recipe.version}</span>
</div>
<span class="text-xs text-gray-400">{inputs_str}</span>
</div>
<div class="text-sm text-gray-400 mb-2">
{recipe.description or "No description"}
</div>
<div class="text-xs text-gray-500 font-mono truncate">
{recipe.recipe_id[:24]}...
</div>
</div>
</a>
''')
html_parts.append('</div>')
return '\n'.join(html_parts)
@app.delete("/ui/recipes/{recipe_id}/discard", response_class=HTMLResponse)
async def ui_discard_recipe(recipe_id: str, request: Request):
"""HTMX handler: discard a recipe."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Login required</div>'
recipe = load_recipe(recipe_id)
if not recipe:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Recipe not found</div>'
# Check ownership
if recipe.uploader not in (ctx.username, ctx.actor_id):
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Access denied</div>'
# Check if pinned
pinned, reason = cache_manager.is_pinned(recipe_id)
if pinned:
return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Cannot delete: recipe is pinned ({reason})</div>'
# Delete from Redis and cache
delete_recipe_from_redis(recipe_id)
cache_manager.delete_by_content_hash(recipe_id)
return '''
<div class="bg-green-900/50 border border-green-700 text-green-300 px-4 py-3 rounded-lg mb-4">
Recipe deleted. <a href="/recipes" class="underline">Back to recipes</a>
</div>
'''
@app.get("/cache/{content_hash}")
async def get_cached(content_hash: str, request: Request):
"""Get cached content by hash. Content negotiation: HTML for browsers, JSON for APIs, file for downloads."""
start = time.time()
accept = request.headers.get("accept", "")
logger.info(f"get_cached: {content_hash[:16]}... Accept={accept[:50]}")
ctx = await get_user_context_from_cookie(request)
cache_path = get_cache_path(content_hash)
if not cache_path:
logger.info(f"get_cached: Not found, took {time.time()-start:.3f}s")
if wants_html(request):
content = f'<p class="text-red-400">Content not found: {content_hash}</p>'
return HTMLResponse(render_page("Not Found", content, ctx.actor_id if ctx else None, active_tab="media"), status_code=404)
raise HTTPException(404, f"Content {content_hash} not in cache")
# JSON response for API clients
if "application/json" in accept:
t0 = time.time()
meta = await database.load_item_metadata(content_hash, ctx.actor_id if ctx else None)
logger.debug(f"get_cached: load_item_metadata took {time.time()-t0:.3f}s")
t0 = time.time()
cache_item = await database.get_cache_item(content_hash)
logger.debug(f"get_cached: get_cache_item took {time.time()-t0:.3f}s")
ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
file_size = cache_path.stat().st_size
media_type = detect_media_type(cache_path)
logger.info(f"get_cached: JSON response, ipfs_cid={ipfs_cid[:16] if ipfs_cid else 'None'}..., took {time.time()-start:.3f}s")
return {
"content_hash": content_hash,
"size": file_size,
"media_type": media_type,
"ipfs_cid": ipfs_cid,
"meta": meta
}
# HTML response for browsers - show detail page
if wants_html(request):
if not ctx:
content = '<p class="text-gray-400 py-8 text-center"><a href="/login" class="text-blue-400 hover:text-blue-300">Login via L2</a> to view cached content.</p>'
return HTMLResponse(render_page("Login Required", content, None, active_tab="media"), status_code=401)
# Check user has access
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
content = '<p class="text-red-400 py-8 text-center">Access denied.</p>'
return HTMLResponse(render_page("Access Denied", content, ctx.actor_id, active_tab="media"), status_code=403)
media_type = detect_media_type(cache_path)
file_size = cache_path.stat().st_size
size_str = f"{file_size:,} bytes"
if file_size > 1024*1024:
size_str = f"{file_size/(1024*1024):.1f} MB"
elif file_size > 1024:
size_str = f"{file_size/1024:.1f} KB"
# Get IPFS CID from database
cache_item = await database.get_cache_item(content_hash)
ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
# Build media display HTML
if media_type == "video":
video_src = video_src_for_request(content_hash, request)
media_html = f'<video src="{video_src}" controls autoplay muted loop playsinline class="max-w-full max-h-96 rounded-lg"></video>'
elif media_type == "image":
media_html = f'<img src="/cache/{content_hash}/raw" alt="{content_hash}" class="max-w-full max-h-96 rounded-lg">'
else:
media_html = f'<p class="text-gray-400">Unknown file type. <a href="/cache/{content_hash}/raw" download class="text-blue-400 hover:text-blue-300">Download file</a></p>'
content = f'''
<a href="/media" class="inline-flex items-center text-blue-400 hover:text-blue-300 mb-6">
<svg class="w-4 h-4 mr-1" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M15 19l-7-7 7-7"/>
</svg>
Back to media
</a>
<div class="bg-dark-700 rounded-lg p-6">
<div class="flex flex-wrap items-center justify-between gap-4 mb-6">
<div class="flex items-center gap-3">
<span class="px-3 py-1 bg-blue-600 text-white text-sm font-medium rounded-full">{media_type.capitalize()}</span>
<span class="text-gray-400 font-mono text-sm">{content_hash[:24]}...</span>
</div>
<a href="/cache/{content_hash}/raw" download
class="px-4 py-2 bg-green-600 hover:bg-green-700 text-white font-medium rounded-lg transition-colors">
Download
</a>
</div>
<div class="flex justify-center mb-8">
{media_html}
</div>
<div class="border-t border-dark-500 pt-6">
<h2 class="text-lg font-semibold text-white mb-4">Details</h2>
<div class="grid gap-4 sm:grid-cols-2">
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Content Hash (SHA3-256)</div>
<div class="font-mono text-xs text-gray-200 break-all">{content_hash}</div>
</div>
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Type</div>
<div class="text-gray-200">{media_type}</div>
</div>
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Size</div>
<div class="text-gray-200">{size_str}</div>
</div>
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Raw URL</div>
<div class="text-blue-400 text-sm truncate">
<a href="/cache/{content_hash}/raw" class="hover:text-blue-300">/cache/{content_hash}/raw</a>
</div>
</div>
</div>
</div>
'''
# Add IPFS section if we have a CID
if ipfs_cid:
gateway_links = []
if IPFS_GATEWAY_URL:
gateway_links.append(f'''
<a href="{IPFS_GATEWAY_URL}/ipfs/{ipfs_cid}" target="_blank" rel="noopener"
class="px-3 py-1 bg-green-600 hover:bg-green-700 text-white text-sm rounded-lg transition-colors">
Local Gateway
</a>''')
gateway_links.extend([
f'''<a href="https://ipfs.io/ipfs/{ipfs_cid}" target="_blank" rel="noopener"
class="px-3 py-1 bg-purple-600 hover:bg-purple-700 text-white text-sm rounded-lg transition-colors">
ipfs.io
</a>''',
f'''<a href="https://dweb.link/ipfs/{ipfs_cid}" target="_blank" rel="noopener"
class="px-3 py-1 bg-purple-600 hover:bg-purple-700 text-white text-sm rounded-lg transition-colors">
dweb.link
</a>''',
f'''<a href="https://cloudflare-ipfs.com/ipfs/{ipfs_cid}" target="_blank" rel="noopener"
class="px-3 py-1 bg-purple-600 hover:bg-purple-700 text-white text-sm rounded-lg transition-colors">
Cloudflare
</a>''',
])
gateways_html = '\n'.join(gateway_links)
content += f'''
<div class="border-t border-dark-500 pt-6 mt-6">
<h2 class="text-lg font-semibold text-white mb-4">IPFS</h2>
<div class="bg-dark-600 rounded-lg p-4 mb-4">
<div class="text-sm text-gray-400 mb-1">Content Identifier (CID)</div>
<div class="font-mono text-xs text-gray-200 break-all">{ipfs_cid}</div>
</div>
<div class="text-sm text-gray-400 mb-2">Gateways:</div>
<div class="flex flex-wrap gap-2">
{gateways_html}
</div>
</div>
'''
else:
content += '''
<div class="border-t border-dark-500 pt-6 mt-6">
<h2 class="text-lg font-semibold text-white mb-4">IPFS</h2>
<div class="text-gray-400 text-sm">Not yet uploaded to IPFS</div>
</div>
'''
content += f'''
<!-- Metadata Section -->
<div class="border-t border-dark-500 pt-6 mt-6" id="metadata-section"
hx-get="/cache/{content_hash}/meta-form" hx-trigger="load" hx-swap="innerHTML">
<div class="text-gray-400">Loading metadata...</div>
</div>
</div>
'''
return HTMLResponse(render_page(f"Cache: {content_hash[:16]}...", content, ctx.actor_id, active_tab="media"))
# Default: return raw file
return FileResponse(cache_path)
@app.get("/cache/{content_hash}/raw")
async def get_cached_raw(content_hash: str):
"""Get raw cached content (file download)."""
cache_path = get_cache_path(content_hash)
if not cache_path:
raise HTTPException(404, f"Content {content_hash} not in cache")
return FileResponse(cache_path)
@app.get("/cache/{content_hash}/mp4")
async def get_cached_mp4(content_hash: str):
"""Get cached content as MP4 (transcodes MKV on first request, caches result)."""
cache_path = get_cache_path(content_hash)
if not cache_path:
raise HTTPException(404, f"Content {content_hash} not in cache")
# MP4 transcodes stored alongside original in CACHE_DIR
mp4_path = CACHE_DIR / f"{content_hash}.mp4"
# If MP4 already cached, serve it
if mp4_path.exists():
return FileResponse(mp4_path, media_type="video/mp4")
# Check if source is already MP4
media_type = detect_media_type(cache_path)
if media_type != "video":
raise HTTPException(400, "Content is not a video")
# Check if already MP4 format
import subprocess
try:
result = subprocess.run(
["ffprobe", "-v", "error", "-select_streams", "v:0",
"-show_entries", "format=format_name", "-of", "csv=p=0", str(cache_path)],
capture_output=True, text=True, timeout=10
)
if "mp4" in result.stdout.lower() or "mov" in result.stdout.lower():
# Already MP4-compatible, just serve original
return FileResponse(cache_path, media_type="video/mp4")
except Exception:
pass # Continue with transcoding
# Transcode to MP4 (H.264 + AAC)
transcode_path = CACHE_DIR / f"{content_hash}.transcoding.mp4"
try:
result = subprocess.run(
["ffmpeg", "-y", "-i", str(cache_path),
"-c:v", "libx264", "-preset", "fast", "-crf", "23",
"-c:a", "aac", "-b:a", "128k",
"-movflags", "+faststart",
str(transcode_path)],
capture_output=True, text=True, timeout=600 # 10 min timeout
)
if result.returncode != 0:
raise HTTPException(500, f"Transcoding failed: {result.stderr[:200]}")
# Move to final location
transcode_path.rename(mp4_path)
except subprocess.TimeoutExpired:
if transcode_path.exists():
transcode_path.unlink()
raise HTTPException(500, "Transcoding timed out")
except Exception as e:
if transcode_path.exists():
transcode_path.unlink()
raise HTTPException(500, f"Transcoding failed: {e}")
return FileResponse(mp4_path, media_type="video/mp4")
@app.get("/cache/{content_hash}/meta-form", response_class=HTMLResponse)
async def cache_meta_form(content_hash: str, request: Request):
"""Clean URL redirect to the HTMX meta form."""
# Just redirect to the old endpoint for now
from starlette.responses import RedirectResponse
return RedirectResponse(f"/ui/cache/{content_hash}/meta-form", status_code=302)
@app.get("/ui/cache/{content_hash}/meta-form", response_class=HTMLResponse)
async def ui_cache_meta_form(content_hash: str, request: Request):
"""HTMX partial: metadata editing form for a cached item."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '<div class="text-red-400">Login required to edit metadata</div>'
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
return '<div class="text-red-400">Access denied</div>'
# Load metadata
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
origin = meta.get("origin", {})
origin_type = origin.get("type", "")
origin_url = origin.get("url", "")
origin_note = origin.get("note", "")
description = meta.get("description", "")
tags = meta.get("tags", [])
tags_str = ", ".join(tags) if tags else ""
l2_shares = meta.get("l2_shares", [])
pinned = meta.get("pinned", False)
pin_reason = meta.get("pin_reason", "")
# Detect media type for publish
cache_path = get_cache_path(content_hash)
media_type = detect_media_type(cache_path) if cache_path else "unknown"
asset_type = "video" if media_type == "video" else "image"
# Origin radio checked states
self_checked = 'checked' if origin_type == "self" else ''
external_checked = 'checked' if origin_type == "external" else ''
# Build publish section - show list of L2 shares
if l2_shares:
shares_html = ""
for share in l2_shares:
l2_server = share.get("l2_server", "Unknown")
asset_name = share.get("asset_name", "")
published_at = share.get("published_at", "")[:10] if share.get("published_at") else ""
last_synced = share.get("last_synced_at", "")[:10] if share.get("last_synced_at") else ""
asset_url = f"{l2_server}/assets/{asset_name}"
shares_html += f'''
<div class="flex justify-between items-start py-2 border-b border-green-800 last:border-0">
<div>
<a href="{asset_url}" target="_blank" class="text-white font-medium hover:text-blue-300">{asset_name}</a>
<div class="text-xs text-gray-400">{l2_server}</div>
</div>
<div class="text-right text-xs text-gray-400">
Published: {published_at}<br>
Synced: {last_synced}
</div>
</div>
'''
publish_html = f'''
<div class="bg-green-900/30 border border-green-700 rounded-lg p-4 mb-4">
<div class="text-green-400 font-medium mb-2">Published to L2 ({len(l2_shares)} share{"s" if len(l2_shares) != 1 else ""})</div>
<div class="text-sm">
{shares_html}
</div>
</div>
<div id="republish-result"></div>
<button hx-patch="/ui/cache/{content_hash}/republish" hx-target="#republish-result" hx-swap="innerHTML"
class="px-4 py-2 bg-blue-600 hover:bg-blue-700 text-white font-medium rounded-lg transition-colors">
Update on L2
</button>
'''
else:
# Show publish form only if origin is set
if origin_type:
publish_html = f'''
<div id="publish-result"></div>
<form hx-post="/ui/cache/{content_hash}/publish" hx-target="#publish-result" hx-swap="innerHTML"
class="flex flex-wrap gap-3 items-end">
<div>
<label class="block text-sm text-gray-400 mb-1">Asset Name</label>
<input type="text" name="asset_name" placeholder="my-{asset_type}" required
class="px-4 py-2 bg-dark-600 border border-dark-500 rounded-lg text-white placeholder-gray-500 focus:border-blue-500 focus:outline-none min-w-[200px]">
</div>
<input type="hidden" name="asset_type" value="{asset_type}">
<button type="submit"
class="px-4 py-2 bg-blue-600 hover:bg-blue-700 text-white font-medium rounded-lg transition-colors">
Publish to L2
</button>
</form>
'''
else:
publish_html = '''
<div class="bg-yellow-900/30 border border-yellow-700 text-yellow-300 px-4 py-3 rounded-lg">
Set an origin (self or external URL) before publishing.
</div>
'''
return f'''
<h2 class="text-lg font-semibold text-white mb-4">Metadata</h2>
<div id="meta-save-result"></div>
<form hx-patch="/ui/cache/{content_hash}/meta" hx-target="#meta-save-result" hx-swap="innerHTML" class="space-y-4 mb-6">
<!-- Origin -->
<div class="bg-dark-600 rounded-lg p-4">
<label class="block text-sm font-medium text-gray-300 mb-3">Origin</label>
<div class="space-y-3">
<label class="flex items-center gap-3 cursor-pointer">
<input type="radio" name="origin_type" value="self" {self_checked}
class="w-4 h-4 text-blue-600 bg-dark-500 border-dark-400">
<span class="text-gray-200">Created by me (original content)</span>
</label>
<label class="flex items-center gap-3 cursor-pointer">
<input type="radio" name="origin_type" value="external" {external_checked}
class="w-4 h-4 text-blue-600 bg-dark-500 border-dark-400">
<span class="text-gray-200">External source</span>
</label>
<div class="ml-7 space-y-2">
<input type="url" name="origin_url" value="{origin_url}" placeholder="https://example.com/source"
class="w-full px-3 py-2 bg-dark-500 border border-dark-400 rounded-lg text-white placeholder-gray-500 focus:border-blue-500 focus:outline-none text-sm">
<input type="text" name="origin_note" value="{origin_note}" placeholder="Note (optional)"
class="w-full px-3 py-2 bg-dark-500 border border-dark-400 rounded-lg text-white placeholder-gray-500 focus:border-blue-500 focus:outline-none text-sm">
</div>
</div>
</div>
<!-- Description -->
<div class="bg-dark-600 rounded-lg p-4">
<label class="block text-sm font-medium text-gray-300 mb-2">Description</label>
<textarea name="description" rows="2" placeholder="Optional description..."
class="w-full px-3 py-2 bg-dark-500 border border-dark-400 rounded-lg text-white placeholder-gray-500 focus:border-blue-500 focus:outline-none text-sm">{description}</textarea>
</div>
<!-- Tags -->
<div class="bg-dark-600 rounded-lg p-4">
<label class="block text-sm font-medium text-gray-300 mb-2">Tags</label>
<input type="text" name="tags" value="{tags_str}" placeholder="tag1, tag2, tag3"
class="w-full px-3 py-2 bg-dark-500 border border-dark-400 rounded-lg text-white placeholder-gray-500 focus:border-blue-500 focus:outline-none text-sm">
<p class="text-xs text-gray-500 mt-1">Comma-separated list</p>
</div>
<button type="submit"
class="px-4 py-2 bg-green-600 hover:bg-green-700 text-white font-medium rounded-lg transition-colors">
Save Metadata
</button>
</form>
<!-- Publishing Section -->
<div class="border-t border-dark-500 pt-6">
<h3 class="text-lg font-semibold text-white mb-4">Publish to L2 (ActivityPub)</h3>
{publish_html}
</div>
<!-- Status & Actions Section -->
<div class="border-t border-dark-500 pt-6 mt-6">
<h3 class="text-lg font-semibold text-white mb-4">Status</h3>
<div class="bg-dark-600 rounded-lg p-4 mb-4">
<div class="flex items-center gap-2 mb-2">
<span class="text-sm text-gray-400">Pinned:</span>
{'<span class="text-green-400">Yes</span>' if pinned else '<span class="text-gray-500">No</span>'}
{f'<span class="text-xs text-gray-500 ml-2">({pin_reason})</span>' if pinned and pin_reason else ''}
</div>
<p class="text-xs text-gray-500">Pinned items cannot be discarded. Items are pinned when published or used as inputs to published content.</p>
</div>
<div id="discard-result"></div>
{'<p class="text-gray-500 text-sm">Cannot discard pinned items.</p>' if pinned else f"""
<button hx-delete="/ui/cache/{content_hash}/discard" hx-target="#discard-result" hx-swap="innerHTML"
hx-confirm="Are you sure you want to discard this item? This cannot be undone."
class="px-4 py-2 bg-red-600 hover:bg-red-700 text-white font-medium rounded-lg transition-colors">
Discard Item
</button>
"""}
</div>
'''
@app.patch("/ui/cache/{content_hash}/meta", response_class=HTMLResponse)
async def ui_update_cache_meta(content_hash: str, request: Request):
"""HTMX handler: update cache metadata from form."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Login required</div>'
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Access denied</div>'
# Parse form data
form = await request.form()
origin_type = form.get("origin_type", "")
origin_url = form.get("origin_url", "").strip()
origin_note = form.get("origin_note", "").strip()
description = form.get("description", "").strip()
tags_str = form.get("tags", "").strip()
# Build origin
source_type = None
if origin_type == "self":
source_type = "self"
elif origin_type == "external":
if not origin_url:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">External origin requires a URL</div>'
source_type = "external"
# Parse tags
tags = [t.strip() for t in tags_str.split(",") if t.strip()] if tags_str else []
# Save to database
await database.update_item_metadata(
content_hash=content_hash,
actor_id=ctx.actor_id,
item_type="media",
description=description if description else None,
source_type=source_type,
source_url=origin_url if origin_url else None,
source_note=origin_note if origin_note else None,
tags=tags
)
return '<div class="bg-green-900/50 border border-green-700 text-green-300 px-4 py-3 rounded-lg mb-4">Metadata saved!</div>'
@app.post("/ui/cache/{content_hash}/publish", response_class=HTMLResponse)
async def ui_publish_cache(content_hash: str, request: Request):
"""HTMX handler: publish cache item to L2."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Login required</div>'
token = request.cookies.get("auth_token")
if not token:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Auth token required</div>'
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Access denied</div>'
# Parse form
form = await request.form()
asset_name = form.get("asset_name", "").strip()
asset_type = form.get("asset_type", "image")
if not asset_name:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Asset name required</div>'
# Load metadata from database
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
origin = meta.get("origin")
if not origin or "type" not in origin:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Set origin before publishing</div>'
# Get IPFS CID from cache item
cache_item = await database.get_cache_item(content_hash)
ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
# Call the L2 server from user's context
l2_server = ctx.l2_server
try:
resp = http_requests.post(
f"{l2_server}/assets/publish-cache",
headers={"Authorization": f"Bearer {token}"},
json={
"content_hash": content_hash,
"ipfs_cid": ipfs_cid,
"asset_name": asset_name,
"asset_type": asset_type,
"origin": origin,
"description": meta.get("description"),
"tags": meta.get("tags", []),
"metadata": {
"filename": meta.get("filename"),
"folder": meta.get("folder"),
"collections": meta.get("collections", [])
}
},
timeout=30
)
resp.raise_for_status()
except http_requests.exceptions.HTTPError as e:
error_detail = ""
try:
error_detail = e.response.json().get("detail", str(e))
except Exception:
error_detail = str(e)
return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Error: {error_detail}</div>'
except Exception as e:
return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Error: {e}</div>'
# Save L2 share to database and pin the item
await database.save_l2_share(
content_hash=content_hash,
actor_id=ctx.actor_id,
l2_server=l2_server,
asset_name=asset_name,
content_type=asset_type
)
await database.update_item_metadata(
content_hash=content_hash,
actor_id=ctx.actor_id,
pinned=True,
pin_reason="published"
)
# Use HTTPS for L2 links
l2_https = l2_server.replace("http://", "https://")
return f'''
<div class="bg-green-900/50 border border-green-700 text-green-300 px-4 py-3 rounded-lg mb-4">
Published to L2 as <strong>{asset_name}</strong>!
<a href="{l2_https}/ui/asset/{asset_name}" target="_blank" class="underline">View on L2</a>
</div>
'''
@app.patch("/ui/cache/{content_hash}/republish", response_class=HTMLResponse)
async def ui_republish_cache(content_hash: str, request: Request):
"""HTMX handler: re-publish (update) cache item on L2."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Login required</div>'
token = request.cookies.get("auth_token")
if not token:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Auth token required</div>'
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Access denied</div>'
# Load metadata
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
l2_shares = meta.get("l2_shares", [])
# Find share for current L2 server (user's L2)
l2_server = ctx.l2_server
current_share = None
share_index = -1
for i, share in enumerate(l2_shares):
if share.get("l2_server") == l2_server:
current_share = share
share_index = i
break
if not current_share:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Item not published to this L2 yet</div>'
asset_name = current_share.get("asset_name")
if not asset_name:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">No asset name found</div>'
# Call L2 update
try:
resp = http_requests.patch(
f"{l2_server}/assets/{asset_name}",
headers={"Authorization": f"Bearer {token}"},
json={
"description": meta.get("description"),
"tags": meta.get("tags"),
"origin": meta.get("origin"),
"metadata": {
"filename": meta.get("filename"),
"folder": meta.get("folder"),
"collections": meta.get("collections", [])
}
},
timeout=30
)
resp.raise_for_status()
except http_requests.exceptions.HTTPError as e:
error_detail = ""
try:
error_detail = e.response.json().get("detail", str(e))
except Exception:
error_detail = str(e)
return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Error: {error_detail}</div>'
except Exception as e:
return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Error: {e}</div>'
# Update local metadata - save_l2_share updates last_synced_at on conflict
await database.save_l2_share(
content_hash=content_hash,
actor_id=ctx.actor_id,
l2_server=l2_server,
asset_name=asset_name,
content_type=current_share.get("content_type", "media")
)
return '<div class="bg-green-900/50 border border-green-700 text-green-300 px-4 py-3 rounded-lg mb-4">Updated on L2!</div>'
@app.get("/media")
async def list_media(
request: Request,
page: int = 1,
limit: int = 20,
folder: Optional[str] = None,
collection: Optional[str] = None,
tag: Optional[str] = None
):
"""List media items. HTML for browsers (with infinite scroll), JSON for APIs (with pagination)."""
ctx = await get_user_context_from_cookie(request)
if wants_html(request):
# Require login for HTML media view
if not ctx:
content = '<p class="text-gray-400 py-8 text-center"><a href="/login" class="text-blue-400 hover:text-blue-300">Login via L2</a> to see media.</p>'
return HTMLResponse(render_page("Media", content, None, active_tab="media"))
# Get hashes owned by/associated with this user
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
# Get cache items that belong to the user (from cache_manager)
cache_items = []
seen_hashes = set() # Deduplicate by content_hash
for cached_file in cache_manager.list_all():
content_hash = cached_file.content_hash
if content_hash not in user_hashes:
continue
# Skip duplicates (same content from multiple runs)
if content_hash in seen_hashes:
continue
seen_hashes.add(content_hash)
# Skip recipes - they have their own section
if cached_file.node_type == "recipe":
continue
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
# Apply folder filter
if folder:
item_folder = meta.get("folder", "/")
if folder != "/" and not item_folder.startswith(folder):
continue
if folder == "/" and item_folder != "/":
continue
# Apply collection filter
if collection:
if collection not in meta.get("collections", []):
continue
# Apply tag filter
if tag:
if tag not in meta.get("tags", []):
continue
cache_items.append({
"hash": content_hash,
"size": cached_file.size_bytes,
"mtime": cached_file.created_at,
"meta": meta
})
# Sort by modification time (newest first)
cache_items.sort(key=lambda x: x["mtime"], reverse=True)
total = len(cache_items)
# Pagination
start = (page - 1) * limit
end = start + limit
items_page = cache_items[start:end]
has_more = end < total
if not items_page:
if page == 1:
filter_msg = ""
if folder:
filter_msg = f" in folder {folder}"
elif collection:
filter_msg = f" in collection '{collection}'"
elif tag:
filter_msg = f" with tag '{tag}'"
content = f'<p class="text-gray-400 py-8 text-center">No media{filter_msg}. Upload files or run effects to see them here.</p>'
else:
return HTMLResponse("") # Empty for infinite scroll
else:
html_parts = []
for item in items_page:
content_hash = item["hash"]
cache_path = get_cache_path(content_hash)
media_type = detect_media_type(cache_path) if cache_path else "unknown"
# Format size
size = item["size"]
if size > 1024*1024:
size_str = f"{size/(1024*1024):.1f} MB"
elif size > 1024:
size_str = f"{size/1024:.1f} KB"
else:
size_str = f"{size} bytes"
html_parts.append(f'''
<a href="/cache/{content_hash}" class="block">
<div class="bg-dark-700 rounded-lg p-4 hover:bg-dark-600 transition-colors">
<div class="flex items-center justify-between gap-2 mb-3">
<span class="px-2 py-1 bg-blue-600 text-white text-xs font-medium rounded-full">{media_type}</span>
<span class="text-xs text-gray-400">{size_str}</span>
</div>
<div class="text-xs text-gray-400 font-mono mb-3 truncate">{content_hash[:24]}...</div>
<div class="flex justify-center bg-dark-600 rounded-lg p-2">
''')
if media_type == "video":
video_src = video_src_for_request(content_hash, request)
html_parts.append(f'<video src="{video_src}" controls muted loop playsinline class="max-h-32 rounded"></video>')
elif media_type == "image":
html_parts.append(f'<img src="/cache/{content_hash}" alt="{content_hash[:16]}" class="max-h-32 rounded object-contain">')
else:
html_parts.append('<p class="text-gray-400 text-sm py-4">Unknown file type</p>')
html_parts.append('</div></div></a>')
# For infinite scroll, just return cards if not first page
if page > 1:
if has_more:
query_params = f"page={page + 1}"
if folder:
query_params += f"&folder={folder}"
if collection:
query_params += f"&collection={collection}"
if tag:
query_params += f"&tag={tag}"
html_parts.append(f'''
<div hx-get="/media?{query_params}" hx-trigger="revealed" hx-swap="afterend">
<p class="py-4 text-center text-gray-400">Loading more...</p>
</div>
''')
return HTMLResponse('\n'.join(html_parts))
# First page - full content
infinite_scroll_trigger = ""
if has_more:
query_params = "page=2"
if folder:
query_params += f"&folder={folder}"
if collection:
query_params += f"&collection={collection}"
if tag:
query_params += f"&tag={tag}"
infinite_scroll_trigger = f'''
<div hx-get="/media?{query_params}" hx-trigger="revealed" hx-swap="afterend">
<p class="py-4 text-center text-gray-400">Loading more...</p>
</div>
'''
content = f'''
<div class="flex items-center justify-between mb-6">
<h2 class="text-xl font-semibold text-white">Media ({total} items)</h2>
<div class="flex items-center gap-3">
<div id="sync-result"></div>
<button hx-post="/ui/sync-l2" hx-target="#sync-result" hx-swap="innerHTML"
class="px-3 py-1.5 bg-blue-600 hover:bg-blue-700 text-white text-sm font-medium rounded-lg transition-colors">
Sync with L2
</button>
</div>
</div>
<div class="grid gap-4 sm:grid-cols-2 lg:grid-cols-3">
{''.join(html_parts)}
{infinite_scroll_trigger}
</div>
'''
return HTMLResponse(render_page("Media", content, ctx.actor_id, active_tab="media"))
# JSON response for APIs - list all hashes with optional pagination
all_hashes = [cf.content_hash for cf in cache_manager.list_all()]
total = len(all_hashes)
start = (page - 1) * limit
end = start + limit
hashes_page = all_hashes[start:end]
has_more = end < total
return {
"hashes": hashes_page,
"pagination": {
"page": page,
"limit": limit,
"total": total,
"has_more": has_more
}
}
@app.delete("/cache/{content_hash}")
async def discard_cache(content_hash: str, ctx: UserContext = Depends(get_required_user_context)):
"""
Discard (delete) a cached item.
Enforces deletion rules:
- Cannot delete items published to L2 (shared)
- Cannot delete inputs/outputs of activities (runs)
- Cannot delete pinned items
"""
# Check if content exists
if not cache_manager.has_content(content_hash):
raise HTTPException(404, "Content not found")
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
raise HTTPException(403, "Access denied")
# Check if pinned
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
if meta.get("pinned"):
pin_reason = meta.get("pin_reason", "unknown")
raise HTTPException(400, f"Cannot discard pinned item (reason: {pin_reason})")
# Check if used by any run (Redis runs, not just activity store)
runs_using = await asyncio.to_thread(find_runs_using_content, content_hash)
if runs_using:
run, role = runs_using[0]
raise HTTPException(400, f"Cannot discard: item is {role} of run {run.run_id}")
# Check deletion rules via cache_manager (L2 shared status, activity store)
can_delete, reason = await asyncio.to_thread(cache_manager.can_delete, content_hash)
if not can_delete:
raise HTTPException(400, f"Cannot discard: {reason}")
# Delete via cache_manager
success, msg = await asyncio.to_thread(cache_manager.delete_by_content_hash, content_hash)
if not success:
# Fallback to legacy deletion
cache_path = get_cache_path(content_hash)
if cache_path and cache_path.exists():
cache_path.unlink()
# Clean up legacy metadata files
meta_path = CACHE_DIR / f"{content_hash}.meta.json"
if meta_path.exists():
meta_path.unlink()
mp4_path = CACHE_DIR / f"{content_hash}.mp4"
if mp4_path.exists():
mp4_path.unlink()
return {"discarded": True, "content_hash": content_hash}
@app.delete("/ui/cache/{content_hash}/discard", response_class=HTMLResponse)
async def ui_discard_cache(content_hash: str, request: Request):
"""HTMX handler: discard a cached item."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Login required</div>'
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Access denied</div>'
# Check if content exists
has_content = await asyncio.to_thread(cache_manager.has_content, content_hash)
if not has_content:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Content not found</div>'
# Check if pinned
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
if meta.get("pinned"):
pin_reason = meta.get("pin_reason", "unknown")
return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Cannot discard: item is pinned ({pin_reason})</div>'
# Check if used by any run (Redis runs, not just activity store)
runs_using = await asyncio.to_thread(find_runs_using_content, content_hash)
if runs_using:
run, role = runs_using[0]
return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Cannot discard: item is {role} of run {run.run_id}</div>'
# Check deletion rules via cache_manager (L2 shared status, activity store)
can_delete, reason = await asyncio.to_thread(cache_manager.can_delete, content_hash)
if not can_delete:
return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Cannot discard: {reason}</div>'
# Delete via cache_manager
success, msg = await asyncio.to_thread(cache_manager.delete_by_content_hash, content_hash)
if not success:
# Fallback to legacy deletion
cache_path = get_cache_path(content_hash)
if cache_path and cache_path.exists():
cache_path.unlink()
# Clean up legacy metadata files
meta_path = CACHE_DIR / f"{content_hash}.meta.json"
if meta_path.exists():
meta_path.unlink()
mp4_path = CACHE_DIR / f"{content_hash}.mp4"
if mp4_path.exists():
mp4_path.unlink()
return '''
<div class="bg-green-900/50 border border-green-700 text-green-300 px-4 py-3 rounded-lg mb-4">
Item discarded. <a href="/media" class="underline">Back to media</a>
</div>
'''
# Known assets (bootstrap data)
KNOWN_ASSETS = {
"cat": "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b",
}
@app.get("/assets")
async def list_assets():
"""List known assets."""
return KNOWN_ASSETS
@app.post("/cache/import")
async def import_to_cache(path: str):
"""Import a local file to cache."""
source = Path(path)
if not source.exists():
raise HTTPException(404, f"File not found: {path}")
content_hash = await cache_file(source)
return {"content_hash": content_hash, "cached": True}
def save_cache_meta(content_hash: str, uploader: str = None, filename: str = None, **updates):
"""Save or update metadata for a cached file."""
meta_path = CACHE_DIR / f"{content_hash}.meta.json"
# Load existing or create new
if meta_path.exists():
with open(meta_path) as f:
meta = json.load(f)
else:
meta = {
"uploader": uploader,
"uploaded_at": datetime.now(timezone.utc).isoformat(),
"filename": filename
}
# Apply updates (but never change uploader or uploaded_at)
for key, value in updates.items():
if key not in ("uploader", "uploaded_at"):
meta[key] = value
with open(meta_path, "w") as f:
json.dump(meta, f, indent=2)
return meta
def load_cache_meta(content_hash: str) -> dict:
"""Load metadata for a cached file."""
meta_path = CACHE_DIR / f"{content_hash}.meta.json"
if meta_path.exists():
with open(meta_path) as f:
return json.load(f)
return {}
# User data storage (folders, collections)
USER_DATA_DIR = CACHE_DIR / ".user-data"
def load_user_data(username: str) -> dict:
"""Load user's folders and collections."""
USER_DATA_DIR.mkdir(parents=True, exist_ok=True)
# Normalize username (remove @ prefix if present)
safe_name = username.replace("@", "").replace("/", "_")
user_file = USER_DATA_DIR / f"{safe_name}.json"
if user_file.exists():
with open(user_file) as f:
return json.load(f)
return {"folders": ["/"], "collections": []}
def save_user_data(username: str, data: dict):
"""Save user's folders and collections."""
USER_DATA_DIR.mkdir(parents=True, exist_ok=True)
safe_name = username.replace("@", "").replace("/", "_")
user_file = USER_DATA_DIR / f"{safe_name}.json"
with open(user_file, "w") as f:
json.dump(data, f, indent=2)
async def get_user_cache_hashes(username: str, actor_id: Optional[str] = None) -> set:
"""Get all cache hashes owned by or associated with a user.
username: The plain username
actor_id: The full actor ID (@user@server), if available
"""
# Match against both formats for backwards compatibility
match_values = [username]
if actor_id:
match_values.append(actor_id)
hashes = set()
# Query database for items owned by user (new system)
if actor_id:
try:
db_items = await database.get_user_items(actor_id)
for item in db_items:
hashes.add(item["content_hash"])
except Exception:
pass # Database may not be initialized
# Legacy: Files uploaded by user (JSON metadata)
if CACHE_DIR.exists():
for f in CACHE_DIR.iterdir():
if f.name.endswith('.meta.json'):
try:
meta_path = CACHE_DIR / f.name
if meta_path.exists():
import json
with open(meta_path, 'r') as mf:
meta = json.load(mf)
if meta.get("uploader") in match_values:
hashes.add(f.name.replace('.meta.json', ''))
except Exception:
pass
# Files from user's runs (inputs and outputs)
for run in list_all_runs():
if run.username in match_values:
hashes.update(run.inputs)
if run.output_hash:
hashes.add(run.output_hash)
return hashes
@app.post("/cache/upload")
async def upload_to_cache(file: UploadFile = File(...), ctx: UserContext = Depends(get_required_user_context)):
"""Upload a file to cache. Requires authentication."""
# Write to temp file first
import tempfile
with tempfile.NamedTemporaryFile(delete=False) as tmp:
content = await file.read()
tmp.write(content)
tmp_path = Path(tmp.name)
# Store in cache via cache_manager
cached, ipfs_cid = cache_manager.put(tmp_path, node_type="upload", move=True)
content_hash = cached.content_hash
# Save to cache_items table (with IPFS CID)
await database.create_cache_item(content_hash, ipfs_cid)
# Save uploader metadata to database
await database.save_item_metadata(
content_hash=content_hash,
actor_id=ctx.actor_id,
item_type="media",
filename=file.filename
)
return {"content_hash": content_hash, "filename": file.filename, "size": len(content)}
class CacheMetaUpdate(BaseModel):
"""Request to update cache metadata."""
origin: Optional[dict] = None # {"type": "self"|"external", "url": "...", "note": "..."}
description: Optional[str] = None
tags: Optional[list[str]] = None
folder: Optional[str] = None
collections: Optional[list[str]] = None
class PublishRequest(BaseModel):
"""Request to publish a cache item to L2."""
asset_name: str
asset_type: str = "image" # image, video, etc.
@app.get("/cache/{content_hash}/meta")
async def get_cache_meta(content_hash: str, ctx: UserContext = Depends(get_required_user_context)):
"""Get metadata for a cached file."""
# Check file exists
cache_path = get_cache_path(content_hash)
if not cache_path:
raise HTTPException(404, "Content not found")
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
raise HTTPException(403, "Access denied")
return await database.load_item_metadata(content_hash, ctx.actor_id)
@app.patch("/cache/{content_hash}/meta")
async def update_cache_meta(content_hash: str, update: CacheMetaUpdate, ctx: UserContext = Depends(get_required_user_context)):
"""Update metadata for a cached file."""
# Check file exists
cache_path = get_cache_path(content_hash)
if not cache_path:
raise HTTPException(404, "Content not found")
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
raise HTTPException(403, "Access denied")
# Build update dict from non-None fields
updates = {}
if update.origin is not None:
updates["origin"] = update.origin
if update.description is not None:
updates["description"] = update.description
if update.tags is not None:
updates["tags"] = update.tags
if update.folder is not None:
# Ensure folder exists in user's folder list
user_data = load_user_data(ctx.username)
if update.folder not in user_data["folders"]:
raise HTTPException(400, f"Folder does not exist: {update.folder}")
updates["folder"] = update.folder
if update.collections is not None:
# Validate collections exist
user_data = load_user_data(ctx.username)
existing = {c["name"] for c in user_data["collections"]}
for col in update.collections:
if col not in existing:
raise HTTPException(400, f"Collection does not exist: {col}")
updates["collections"] = update.collections
meta = await database.update_item_metadata(content_hash, ctx.actor_id, **updates)
return meta
@app.post("/cache/{content_hash}/publish")
async def publish_cache_to_l2(
content_hash: str,
req: PublishRequest,
request: Request,
ctx: UserContext = Depends(get_required_user_context)
):
"""
Publish a cache item to L2 (ActivityPub).
Requires origin to be set in metadata before publishing.
"""
# Check file exists
cache_path = get_cache_path(content_hash)
if not cache_path:
raise HTTPException(404, "Content not found")
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
raise HTTPException(403, "Access denied")
# Load metadata
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
# Check origin is set
origin = meta.get("origin")
if not origin or "type" not in origin:
raise HTTPException(400, "Origin must be set before publishing. Use --origin self or --origin-url <url>")
# Get IPFS CID from cache item
cache_item = await database.get_cache_item(content_hash)
ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
# Get auth token to pass to L2
token = request.cookies.get("auth_token")
if not token:
# Try from header
auth_header = request.headers.get("Authorization", "")
if auth_header.startswith("Bearer "):
token = auth_header[7:]
if not token:
raise HTTPException(401, "Authentication token required")
# Call L2 publish-cache endpoint (use user's L2 server)
l2_server = ctx.l2_server
try:
resp = http_requests.post(
f"{l2_server}/assets/publish-cache",
headers={"Authorization": f"Bearer {token}"},
json={
"content_hash": content_hash,
"ipfs_cid": ipfs_cid,
"asset_name": req.asset_name,
"asset_type": req.asset_type,
"origin": origin,
"description": meta.get("description"),
"tags": meta.get("tags", []),
"metadata": {
"filename": meta.get("filename"),
"folder": meta.get("folder"),
"collections": meta.get("collections", [])
}
},
timeout=10
)
resp.raise_for_status()
l2_result = resp.json()
except http_requests.exceptions.HTTPError as e:
error_detail = ""
try:
error_detail = e.response.json().get("detail", str(e))
except Exception:
error_detail = str(e)
raise HTTPException(400, f"L2 publish failed: {error_detail}")
except Exception as e:
raise HTTPException(500, f"L2 publish failed: {e}")
# Update local metadata with publish status and pin
await database.save_l2_share(
content_hash=content_hash,
actor_id=ctx.actor_id,
l2_server=l2_server,
asset_name=req.asset_name,
content_type=req.asset_type
)
await database.update_item_metadata(
content_hash=content_hash,
actor_id=ctx.actor_id,
pinned=True,
pin_reason="published"
)
return {
"published": True,
"asset_name": req.asset_name,
"l2_result": l2_result
}
@app.patch("/cache/{content_hash}/republish")
async def republish_cache_to_l2(
content_hash: str,
request: Request,
ctx: UserContext = Depends(get_required_user_context)
):
"""
Re-publish (update) a cache item on L2 after metadata changes.
Only works for already-published items.
"""
# Check file exists
cache_path = get_cache_path(content_hash)
if not cache_path:
raise HTTPException(404, "Content not found")
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
raise HTTPException(403, "Access denied")
# Load metadata
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
l2_shares = meta.get("l2_shares", [])
# Find share for current L2 server (user's L2)
l2_server = ctx.l2_server
current_share = None
for share in l2_shares:
if share.get("l2_server") == l2_server:
current_share = share
break
if not current_share:
raise HTTPException(400, "Item not published yet. Use publish first.")
asset_name = current_share.get("asset_name")
if not asset_name:
raise HTTPException(400, "No asset name found in publish info")
# Get auth token
token = request.cookies.get("auth_token")
if not token:
auth_header = request.headers.get("Authorization", "")
if auth_header.startswith("Bearer "):
token = auth_header[7:]
if not token:
raise HTTPException(401, "Authentication token required")
# Get IPFS CID from cache item
cache_item = await database.get_cache_item(content_hash)
ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
# Call L2 update endpoint (use user's L2 server)
l2_server = ctx.l2_server
try:
resp = http_requests.patch(
f"{l2_server}/assets/{asset_name}",
headers={"Authorization": f"Bearer {token}"},
json={
"description": meta.get("description"),
"tags": meta.get("tags"),
"origin": meta.get("origin"),
"ipfs_cid": ipfs_cid,
"metadata": {
"filename": meta.get("filename"),
"folder": meta.get("folder"),
"collections": meta.get("collections", [])
}
},
timeout=10
)
resp.raise_for_status()
l2_result = resp.json()
except http_requests.exceptions.HTTPError as e:
error_detail = ""
try:
error_detail = e.response.json().get("detail", str(e))
except Exception:
error_detail = str(e)
raise HTTPException(400, f"L2 update failed: {error_detail}")
except Exception as e:
raise HTTPException(500, f"L2 update failed: {e}")
# Update local metadata - save_l2_share updates last_synced_at on conflict
await database.save_l2_share(
content_hash=content_hash,
actor_id=ctx.actor_id,
l2_server=l2_server,
asset_name=asset_name,
content_type=current_share.get("content_type", "media")
)
return {
"updated": True,
"asset_name": asset_name,
"l2_result": l2_result
}
# ============ L2 Sync ============
def _fetch_l2_outbox_sync(l2_server: str, username: str) -> list:
"""Fetch user's outbox from L2 (sync version for asyncio.to_thread)."""
try:
# Fetch outbox page with activities
resp = http_requests.get(
f"{l2_server}/users/{username}/outbox?page=true",
headers={"Accept": "application/activity+json"},
timeout=10
)
if resp.status_code != 200:
logger.warning(f"L2 outbox fetch failed: {resp.status_code}")
return []
data = resp.json()
return data.get("orderedItems", [])
except Exception as e:
logger.error(f"Failed to fetch L2 outbox: {e}")
return []
@app.post("/user/sync-l2")
async def sync_with_l2(ctx: UserContext = Depends(get_required_user_context)):
"""
Sync local L2 share records with user's L2 outbox.
Fetches user's published assets from their L2 server and updates local tracking.
"""
l2_server = ctx.l2_server
username = ctx.username
# Fetch outbox activities
activities = await asyncio.to_thread(_fetch_l2_outbox_sync, l2_server, username)
if not activities:
return {"synced": 0, "message": "No activities found or L2 unavailable"}
# Process Create activities for assets
synced_count = 0
for activity in activities:
if activity.get("type") != "Create":
continue
obj = activity.get("object", {})
if not isinstance(obj, dict):
continue
# Get asset info - look for content_hash in attachment or directly
content_hash = None
asset_name = obj.get("name", "")
# Check attachments for content hash
for attachment in obj.get("attachment", []):
if attachment.get("name") == "content_hash":
content_hash = attachment.get("value")
break
# Also check if there's a hash in the object URL or ID
if not content_hash:
# Try to extract from object ID like /objects/{hash}
obj_id = obj.get("id", "")
if "/objects/" in obj_id:
content_hash = obj_id.split("/objects/")[-1].split("/")[0]
if not content_hash or not asset_name:
continue
# Check if we have this content locally
cache_path = get_cache_path(content_hash)
if not cache_path:
continue # We don't have this content, skip
# Determine content type from object type
obj_type = obj.get("type", "")
if obj_type == "Video":
content_type = "video"
elif obj_type == "Image":
content_type = "image"
else:
content_type = "media"
# Update local L2 share record
await database.save_l2_share(
content_hash=content_hash,
actor_id=ctx.actor_id,
l2_server=l2_server,
asset_name=asset_name,
content_type=content_type
)
synced_count += 1
return {"synced": synced_count, "total_activities": len(activities)}
@app.post("/ui/sync-l2", response_class=HTMLResponse)
async def ui_sync_with_l2(request: Request):
"""HTMX handler: sync with L2 server."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg">Login required</div>'
try:
result = await sync_with_l2(ctx)
synced = result.get("synced", 0)
total = result.get("total_activities", 0)
if synced > 0:
return f'''
<div class="bg-green-900/50 border border-green-700 text-green-300 px-4 py-3 rounded-lg">
Synced {synced} asset(s) from L2 ({total} activities found)
</div>
'''
else:
return f'''
<div class="bg-yellow-900/50 border border-yellow-700 text-yellow-300 px-4 py-3 rounded-lg">
No new assets to sync ({total} activities found)
</div>
'''
except Exception as e:
logger.error(f"L2 sync failed: {e}")
return f'''
<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg">
Sync failed: {str(e)}
</div>
'''
# ============ Folder & Collection Management ============
@app.get("/user/folders")
async def list_folders(username: str = Depends(get_required_user)):
"""List user's folders."""
user_data = load_user_data(username)
return {"folders": user_data["folders"]}
@app.post("/user/folders")
async def create_folder(folder_path: str, username: str = Depends(get_required_user)):
"""Create a new folder."""
user_data = load_user_data(username)
# Validate path format
if not folder_path.startswith("/"):
raise HTTPException(400, "Folder path must start with /")
# Check parent exists
parent = "/".join(folder_path.rsplit("/", 1)[:-1]) or "/"
if parent != "/" and parent not in user_data["folders"]:
raise HTTPException(400, f"Parent folder does not exist: {parent}")
# Check doesn't already exist
if folder_path in user_data["folders"]:
raise HTTPException(400, f"Folder already exists: {folder_path}")
user_data["folders"].append(folder_path)
user_data["folders"].sort()
save_user_data(username, user_data)
return {"folder": folder_path, "created": True}
@app.delete("/user/folders")
async def delete_folder(folder_path: str, ctx: UserContext = Depends(get_required_user_context)):
"""Delete a folder (must be empty)."""
if folder_path == "/":
raise HTTPException(400, "Cannot delete root folder")
user_data = load_user_data(ctx.username)
if folder_path not in user_data["folders"]:
raise HTTPException(404, "Folder not found")
# Check no subfolders
for f in user_data["folders"]:
if f.startswith(folder_path + "/"):
raise HTTPException(400, f"Folder has subfolders: {f}")
# Check no items in folder
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
for h in user_hashes:
meta = await database.load_item_metadata(h, ctx.actor_id)
if meta.get("folder") == folder_path:
raise HTTPException(400, "Folder is not empty")
user_data["folders"].remove(folder_path)
save_user_data(ctx.username, user_data)
return {"folder": folder_path, "deleted": True}
@app.get("/user/collections")
async def list_collections(username: str = Depends(get_required_user)):
"""List user's collections."""
user_data = load_user_data(username)
return {"collections": user_data["collections"]}
@app.post("/user/collections")
async def create_collection(name: str, username: str = Depends(get_required_user)):
"""Create a new collection."""
user_data = load_user_data(username)
# Check doesn't already exist
for col in user_data["collections"]:
if col["name"] == name:
raise HTTPException(400, f"Collection already exists: {name}")
user_data["collections"].append({
"name": name,
"created_at": datetime.now(timezone.utc).isoformat()
})
save_user_data(username, user_data)
return {"collection": name, "created": True}
@app.delete("/user/collections")
async def delete_collection(name: str, ctx: UserContext = Depends(get_required_user_context)):
"""Delete a collection."""
user_data = load_user_data(ctx.username)
# Find and remove
for i, col in enumerate(user_data["collections"]):
if col["name"] == name:
user_data["collections"].pop(i)
save_user_data(ctx.username, user_data)
# Remove from all cache items
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
for h in user_hashes:
meta = await database.load_item_metadata(h, ctx.actor_id)
if name in meta.get("collections", []):
new_collections = [c for c in meta.get("collections", []) if c != name]
await database.update_item_metadata(h, ctx.actor_id, collections=new_collections)
return {"collection": name, "deleted": True}
raise HTTPException(404, "Collection not found")
def is_ios_request(request: Request) -> bool:
"""Check if request is from iOS device."""
ua = request.headers.get("user-agent", "").lower()
return "iphone" in ua or "ipad" in ua
def video_src_for_request(content_hash: str, request: Request) -> str:
"""Get video src URL, using MP4 endpoint for iOS."""
if is_ios_request(request):
return f"/cache/{content_hash}/mp4"
return f"/cache/{content_hash}"
def detect_media_type(cache_path: Path) -> str:
"""Detect if file is image or video based on magic bytes."""
with open(cache_path, "rb") as f:
header = f.read(32)
# Video signatures
if header[:4] == b'\x1a\x45\xdf\xa3': # WebM/MKV
return "video"
if header[4:8] == b'ftyp': # MP4/MOV
return "video"
if header[:4] == b'RIFF' and header[8:12] == b'AVI ': # AVI
return "video"
# Image signatures
if header[:8] == b'\x89PNG\r\n\x1a\n': # PNG
return "image"
if header[:2] == b'\xff\xd8': # JPEG
return "image"
if header[:6] in (b'GIF87a', b'GIF89a'): # GIF
return "image"
if header[:4] == b'RIFF' and header[8:12] == b'WEBP': # WebP
return "image"
return "unknown"
async def get_user_context_from_cookie(request) -> Optional[UserContext]:
"""Get user context from auth cookie. Returns full context with actor_id and l2_server."""
token = request.cookies.get("auth_token")
if not token:
return None
return await get_verified_user_context(token)
async def get_user_from_cookie(request) -> Optional[str]:
"""Get username from auth cookie (backwards compat - prefer get_user_context_from_cookie)."""
ctx = await get_user_context_from_cookie(request)
return ctx.username if ctx else None
def wants_html(request: Request) -> bool:
"""Check if request wants HTML (browser) vs JSON (API)."""
accept = request.headers.get("accept", "")
# Check for explicit HTML request
if "text/html" in accept and "application/json" not in accept:
return True
# Check for browser navigation (direct URL access)
fetch_mode = request.headers.get("sec-fetch-mode", "")
if fetch_mode == "navigate":
return True
return False
# Tailwind CSS config for all L1 templates
TAILWIND_CONFIG = '''
<script src="https://cdn.tailwindcss.com"></script>
<script>
tailwind.config = {
darkMode: 'class',
theme: {
extend: {
colors: {
dark: { 900: '#0a0a0a', 800: '#111', 700: '#1a1a1a', 600: '#222', 500: '#333' }
}
}
}
}
</script>
<script src="https://unpkg.com/htmx.org@1.9.10"></script>
'''
def render_page(title: str, content: str, actor_id: Optional[str] = None, active_tab: str = None) -> str:
"""Render a page with nav bar and content. Used for clean URL pages.
actor_id: The user's actor ID (@user@server) or None if not logged in.
"""
user_info = ""
if actor_id:
# Extract username and domain from @username@domain format
parts = actor_id.lstrip("@").split("@")
username = parts[0] if parts else actor_id
domain = parts[1] if len(parts) > 1 else ""
l2_user_url = f"https://{domain}/users/{username}" if domain else "#"
user_info = f'''
<div class="flex items-center gap-4 text-sm text-gray-400">
Logged in as <a href="{l2_user_url}" class="text-white hover:text-blue-300">{actor_id}</a>
</div>
'''
else:
user_info = '''
<div class="text-sm">
<a href="/login" class="text-blue-400 hover:text-blue-300">Login via L2</a>
</div>
'''
runs_active = "border-b-2 border-blue-500 text-white" if active_tab == "runs" else "text-gray-400 hover:text-white"
recipes_active = "border-b-2 border-blue-500 text-white" if active_tab == "recipes" else "text-gray-400 hover:text-white"
media_active = "border-b-2 border-blue-500 text-white" if active_tab == "media" else "text-gray-400 hover:text-white"
return f"""
<!DOCTYPE html>
<html class="dark">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{title} | Art DAG L1 Server</title>
{TAILWIND_CONFIG}
</head>
<body class="bg-dark-900 text-gray-100 min-h-screen">
<div class="max-w-6xl mx-auto px-4 py-6 sm:px-6 lg:px-8">
<header class="flex flex-wrap items-center justify-between gap-4 mb-6">
<h1 class="text-2xl font-bold">
<a href="/" class="text-white hover:text-gray-200">Art DAG L1 Server</a>
</h1>
{user_info}
</header>
<nav class="flex gap-6 mb-6 border-b border-dark-500 pb-0">
<a href="/runs" class="pb-3 px-1 font-medium transition-colors {runs_active}">Runs</a>
<a href="/recipes" class="pb-3 px-1 font-medium transition-colors {recipes_active}">Recipes</a>
<a href="/media" class="pb-3 px-1 font-medium transition-colors {media_active}">Media</a>
</nav>
<main>
{content}
</main>
</div>
</body>
</html>
"""
def render_ui_html(actor_id: Optional[str] = None, tab: str = "runs") -> str:
"""Render main UI HTML with optional user context.
actor_id: The user's actor ID (@user@server) or None if not logged in.
"""
user_info = ""
if actor_id:
# Extract username and domain from @username@domain format
parts = actor_id.lstrip("@").split("@")
username = parts[0] if parts else actor_id
domain = parts[1] if len(parts) > 1 else ""
l2_user_url = f"https://{domain}/users/{username}" if domain else "#"
user_info = f'''
<div class="flex items-center gap-4 text-sm text-gray-400">
Logged in as <a href="{l2_user_url}" class="text-white hover:text-blue-300">{actor_id}</a>
</div>
'''
else:
user_info = '''
<div class="text-sm">
<a href="/login" class="text-blue-400 hover:text-blue-300">Login via L2</a>
</div>
'''
runs_active = "border-b-2 border-blue-500 text-white" if tab == "runs" else "text-gray-400 hover:text-white"
recipes_active = "border-b-2 border-blue-500 text-white" if tab == "recipes" else "text-gray-400 hover:text-white"
media_active = "border-b-2 border-blue-500 text-white" if tab == "media" else "text-gray-400 hover:text-white"
if tab == "runs":
content_url = "/ui/runs"
elif tab == "recipes":
content_url = "/ui/recipes-list"
else:
content_url = "/ui/media-list"
return f"""
<!DOCTYPE html>
<html class="dark">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Art DAG L1 Server</title>
{TAILWIND_CONFIG}
</head>
<body class="bg-dark-900 text-gray-100 min-h-screen">
<div class="max-w-6xl mx-auto px-4 py-6 sm:px-6 lg:px-8">
<header class="flex flex-wrap items-center justify-between gap-4 mb-6">
<h1 class="text-2xl font-bold">
<a href="/runs" class="text-white hover:text-gray-200">Art DAG L1 Server</a>
</h1>
{user_info}
</header>
<nav class="flex gap-6 mb-6 border-b border-dark-500 pb-0">
<a href="/runs" class="pb-3 px-1 font-medium transition-colors {runs_active}">Runs</a>
<a href="/recipes" class="pb-3 px-1 font-medium transition-colors {recipes_active}">Recipes</a>
<a href="/media" class="pb-3 px-1 font-medium transition-colors {media_active}">Media</a>
</nav>
<div id="content" hx-get="{content_url}" hx-trigger="load" hx-swap="innerHTML">
<div class="flex items-center justify-center py-12">
<div class="animate-pulse text-gray-400">Loading...</div>
</div>
</div>
</div>
</body>
</html>
"""
# Auth routes - L1 never handles credentials, redirects to L2
# L2 sets auth_token cookie with domain=.rose-ash.com for shared auth
L1_PUBLIC_URL = os.environ.get("L1_PUBLIC_URL", "http://localhost:8100")
@app.get("/login")
async def login_page():
"""Redirect to L2 server for login. L1 never handles credentials."""
# Redirect to default L2 with return URL so L2 can redirect back after login
return_url = f"{L1_PUBLIC_URL}/runs"
return RedirectResponse(url=f"{DEFAULT_L2_SERVER}/login?return_to={return_url}", status_code=302)
@app.get("/register")
async def register_page():
"""Redirect to L2 server for registration. L1 never handles credentials."""
return_url = f"{L1_PUBLIC_URL}/runs"
return RedirectResponse(url=f"{DEFAULT_L2_SERVER}/register?return_to={return_url}", status_code=302)
@app.get("/logout")
async def logout():
"""Logout - clear cookie and redirect to L2 logout."""
# Clear local cookie and redirect to L2 to clear shared cookie
response = RedirectResponse(url=f"{DEFAULT_L2_SERVER}/logout?return_to={L1_PUBLIC_URL}/", status_code=302)
response.delete_cookie("auth_token")
return response
@app.get("/ui")
async def ui_index(tab: str = "runs"):
"""Redirect /ui to clean URLs."""
if tab == "cache":
return RedirectResponse(url="/media", status_code=302)
return RedirectResponse(url="/runs", status_code=302)
@app.get("/ui/login")
async def ui_login_page():
"""Redirect to L2 login."""
return RedirectResponse(url="/login", status_code=302)
@app.get("/ui/register")
async def ui_register_page():
"""Redirect to L2 register."""
return RedirectResponse(url="/register", status_code=302)
@app.get("/ui/logout")
async def ui_logout():
"""Redirect to logout."""
return RedirectResponse(url="/logout", status_code=302)
@app.post("/ui/publish-run/{run_id}", response_class=HTMLResponse)
async def ui_publish_run(run_id: str, request: Request, output_name: str = Form(...)):
"""Publish a run to L2 from the web UI."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return HTMLResponse('<div class="error">Not logged in</div>')
token = request.cookies.get("auth_token")
if not token:
return HTMLResponse('<div class="error">Not logged in</div>')
# Get the run to pin its output and inputs
run = load_run(run_id)
if not run:
return HTMLResponse('<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg">Run not found</div>')
# Call L2 to publish the run, including this L1's public URL
# Longer timeout because L2 calls back to L1 to fetch run details
l2_server = ctx.l2_server
try:
resp = http_requests.post(
f"{l2_server}/assets/record-run",
json={"run_id": run_id, "output_name": output_name, "l1_server": L1_PUBLIC_URL},
headers={"Authorization": f"Bearer {token}"},
timeout=30
)
if resp.status_code == 400:
error = resp.json().get("detail", "Bad request")
return HTMLResponse(f'<div class="error">Error: {error}</div>')
resp.raise_for_status()
result = resp.json()
# Pin the output and record L2 share
if run.output_hash:
await database.update_item_metadata(run.output_hash, ctx.actor_id, pinned=True, pin_reason="published")
# Record L2 share so UI shows published status
cache_path = get_cache_path(run.output_hash)
media_type = detect_media_type(cache_path) if cache_path else "image"
content_type = "video" if media_type == "video" else "image"
await database.save_l2_share(
content_hash=run.output_hash,
actor_id=ctx.actor_id,
l2_server=l2_server,
asset_name=result["asset"]["name"],
content_type=content_type
)
# Pin the inputs (for provenance)
for input_hash in run.inputs:
await database.update_item_metadata(input_hash, ctx.actor_id, pinned=True, pin_reason="input_to_published")
# If this was a recipe-based run, pin the recipe and its fixed inputs
if run.recipe.startswith("recipe:"):
config_name = run.recipe.replace("recipe:", "")
for recipe in list_all_recipes():
if recipe.name == config_name:
# Pin the recipe YAML
cache_manager.pin(recipe.recipe_id, reason="recipe_for_published")
# Pin all fixed inputs referenced by the recipe
for fixed in recipe.fixed_inputs:
if fixed.content_hash:
cache_manager.pin(fixed.content_hash, reason="fixed_input_in_published_recipe")
break
# Use HTTPS for L2 links
l2_https = l2_server.replace("http://", "https://")
return HTMLResponse(f'''
<div class="bg-green-900/50 border border-green-700 text-green-300 px-4 py-3 rounded-lg mb-4">
Published to L2 as <strong>{result["asset"]["name"]}</strong>!
<a href="{l2_https}/ui/asset/{result["asset"]["name"]}" target="_blank" class="underline">View on L2</a>
</div>
''')
except http_requests.exceptions.HTTPError as e:
error_detail = ""
try:
error_detail = e.response.json().get("detail", str(e))
except Exception:
error_detail = str(e)
return HTMLResponse(f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg">Error: {error_detail}</div>')
except Exception as e:
return HTMLResponse(f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg">Error: {e}</div>')
@app.get("/ui/runs", response_class=HTMLResponse)
async def ui_runs(request: Request):
"""HTMX partial: list of runs."""
ctx = await get_user_context_from_cookie(request)
runs = list_all_runs()
# Require login to see runs
if not ctx:
return '<p class="text-gray-400 py-8 text-center"><a href="/ui/login" class="text-blue-400 hover:text-blue-300">Login via L2</a> to see your runs.</p>'
# Filter runs by user - match both plain username and ActivityPub format (@user@domain)
runs = [r for r in runs if r.username in (ctx.username, ctx.actor_id)]
if not runs:
return '<p class="text-gray-400 py-8 text-center">You have no runs yet. Use the CLI to start a run.</p>'
# Status badge colors
status_colors = {
"completed": "bg-green-600 text-white",
"running": "bg-yellow-600 text-white",
"failed": "bg-red-600 text-white",
"pending": "bg-gray-600 text-white"
}
html_parts = ['<div class="space-y-4">']
for run in runs[:20]: # Limit to 20 most recent
status_badge = status_colors.get(run.status, "bg-gray-600 text-white")
html_parts.append(f'''
<a href="/ui/detail/{run.run_id}" class="block">
<div class="bg-dark-700 rounded-lg p-4 hover:bg-dark-600 transition-colors" hx-get="/ui/run/{run.run_id}" hx-trigger="every 2s[classList.contains('status-running')]" hx-swap="outerHTML">
<div class="flex flex-wrap items-center justify-between gap-3 mb-3">
<div class="flex items-center gap-3">
<span class="px-3 py-1 bg-blue-600 text-white text-sm font-medium rounded-full">{run.recipe}</span>
<span class="text-gray-400 font-mono text-xs hidden sm:inline">{run.run_id[:16]}...</span>
</div>
<span class="px-3 py-1 {status_badge} text-xs font-medium rounded-full">{run.status}</span>
</div>
<div class="text-sm text-gray-400 mb-3">
Created: {run.created_at[:19].replace('T', ' ')}
</div>
''')
# Show input and output side by side
has_input = run.inputs and cache_manager.has_content(run.inputs[0])
has_output = run.status == "completed" and run.output_hash and cache_manager.has_content(run.output_hash)
if has_input or has_output:
html_parts.append('<div class="grid gap-4 sm:grid-cols-2">')
# Input box
if has_input:
input_hash = run.inputs[0]
input_media_type = detect_media_type(get_cache_path(input_hash))
html_parts.append(f'''
<div class="bg-dark-600 rounded-lg p-3">
<div class="text-xs text-gray-400 mb-2">Input: {input_hash[:16]}...</div>
<div class="flex justify-center">
''')
if input_media_type == "video":
input_video_src = video_src_for_request(input_hash, request)
html_parts.append(f'<video src="{input_video_src}" controls muted loop playsinline class="max-h-32 rounded"></video>')
elif input_media_type == "image":
html_parts.append(f'<img src="/cache/{input_hash}" alt="input" class="max-h-32 rounded">')
html_parts.append('</div></div>')
# Output box
if has_output:
output_hash = run.output_hash
output_media_type = detect_media_type(get_cache_path(output_hash))
html_parts.append(f'''
<div class="bg-dark-600 rounded-lg p-3">
<div class="text-xs text-gray-400 mb-2">Output: {output_hash[:16]}...</div>
<div class="flex justify-center">
''')
if output_media_type == "video":
output_video_src = video_src_for_request(output_hash, request)
html_parts.append(f'<video src="{output_video_src}" controls autoplay muted loop playsinline class="max-h-32 rounded"></video>')
elif output_media_type == "image":
html_parts.append(f'<img src="/cache/{output_hash}" alt="output" class="max-h-32 rounded">')
html_parts.append('</div></div>')
html_parts.append('</div>')
# Show error if failed
if run.status == "failed" and run.error:
html_parts.append(f'<div class="mt-3 text-sm text-red-400">Error: {run.error}</div>')
html_parts.append('</div></a>')
html_parts.append('</div>')
return '\n'.join(html_parts)
@app.get("/ui/media-list", response_class=HTMLResponse)
async def ui_media_list(
request: Request,
folder: Optional[str] = None,
collection: Optional[str] = None,
tag: Optional[str] = None
):
"""HTMX partial: list of media items with optional filtering."""
ctx = await get_user_context_from_cookie(request)
# Require login to see media
if not ctx:
return '<p class="text-gray-400 py-8 text-center"><a href="/login" class="text-blue-400 hover:text-blue-300">Login via L2</a> to see media.</p>'
# Get hashes owned by/associated with this user
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
# Get cache items that belong to the user (from cache_manager)
cache_items = []
seen_hashes = set() # Deduplicate by content_hash
for cached_file in cache_manager.list_all():
content_hash = cached_file.content_hash
if content_hash not in user_hashes:
continue
# Skip duplicates (same content from multiple runs)
if content_hash in seen_hashes:
continue
seen_hashes.add(content_hash)
# Skip recipes - they have their own section
if cached_file.node_type == "recipe":
continue
# Load metadata for filtering
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
# Apply folder filter
if folder:
item_folder = meta.get("folder", "/")
if folder != "/" and not item_folder.startswith(folder):
continue
if folder == "/" and item_folder != "/":
continue
# Apply collection filter
if collection:
if collection not in meta.get("collections", []):
continue
# Apply tag filter
if tag:
if tag not in meta.get("tags", []):
continue
cache_items.append({
"hash": content_hash,
"size": cached_file.size_bytes,
"mtime": cached_file.created_at,
"meta": meta
})
# Sort by modification time (newest first)
cache_items.sort(key=lambda x: x["mtime"], reverse=True)
if not cache_items:
filter_msg = ""
if folder:
filter_msg = f" in folder {folder}"
elif collection:
filter_msg = f" in collection '{collection}'"
elif tag:
filter_msg = f" with tag '{tag}'"
return f'<p class="text-gray-400 py-8 text-center">No cached files{filter_msg}. Upload files or run effects to see them here.</p>'
html_parts = ['<div class="grid gap-4 sm:grid-cols-2 lg:grid-cols-3">']
for item in cache_items[:50]: # Limit to 50 items
content_hash = item["hash"]
cache_path = get_cache_path(content_hash)
media_type = detect_media_type(cache_path) if cache_path else "unknown"
# Check IPFS status
cache_item = await database.get_cache_item(content_hash)
ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
ipfs_badge = '<span class="px-2 py-1 bg-purple-600 text-white text-xs font-medium rounded-full" title="On IPFS">IPFS</span>' if ipfs_cid else ''
# Check L2 publish status
l2_shares = item["meta"].get("l2_shares", [])
if l2_shares:
first_share = l2_shares[0]
l2_server = first_share.get("l2_server", "")
asset_name = first_share.get("asset_name", "")
asset_url = f"{l2_server}/assets/{asset_name}"
published_badge = f'<span class="px-2 py-1 bg-green-600 text-white text-xs font-medium rounded-full" title="Published to L2">L2</span>'
else:
published_badge = ''
# Format size
size = item["size"]
if size > 1024*1024:
size_str = f"{size/(1024*1024):.1f} MB"
elif size > 1024:
size_str = f"{size/1024:.1f} KB"
else:
size_str = f"{size} bytes"
html_parts.append(f'''
<a href="/cache/{content_hash}" class="block">
<div class="bg-dark-700 rounded-lg p-4 hover:bg-dark-600 transition-colors">
<div class="flex items-center justify-between gap-2 mb-3">
<div class="flex items-center gap-2">
<span class="px-2 py-1 bg-blue-600 text-white text-xs font-medium rounded-full">{media_type}</span>
{ipfs_badge}
{published_badge}
</div>
<span class="text-xs text-gray-400">{size_str}</span>
</div>
<div class="text-xs text-gray-400 font-mono mb-3 truncate">{content_hash[:24]}...</div>
<div class="flex justify-center bg-dark-600 rounded-lg p-2">
''')
if media_type == "video":
video_src = video_src_for_request(content_hash, request)
html_parts.append(f'<video src="{video_src}" controls muted loop playsinline class="max-h-32 rounded"></video>')
elif media_type == "image":
html_parts.append(f'<img src="/cache/{content_hash}/raw" alt="{content_hash[:16]}" class="max-h-32 rounded object-contain">')
else:
html_parts.append('<p class="text-gray-400 text-sm py-4">Unknown file type</p>')
html_parts.append('''
</div>
</div>
</a>
''')
html_parts.append('</div>')
return '\n'.join(html_parts)
@app.get("/ui/detail/{run_id}")
async def ui_detail_page(run_id: str):
"""Redirect to clean URL."""
return RedirectResponse(url=f"/run/{run_id}", status_code=302)
@app.get("/ui/run/{run_id}", response_class=HTMLResponse)
async def ui_run_partial(run_id: str, request: Request):
"""HTMX partial: single run (for polling updates)."""
run = load_run(run_id)
if not run:
return '<div class="bg-dark-700 rounded-lg p-4 text-gray-400">Run not found</div>'
# Check Celery task status if running
if run.status == "running" and run.celery_task_id:
task = celery_app.AsyncResult(run.celery_task_id)
if task.ready():
if task.successful():
result = task.result
run.status = "completed"
run.completed_at = datetime.now(timezone.utc).isoformat()
run.output_hash = result.get("output", {}).get("content_hash")
# Extract effects info from provenance
effects = result.get("effects", [])
if effects:
run.effects_commit = effects[0].get("repo_commit")
run.effect_url = effects[0].get("repo_url")
# Extract infrastructure info
run.infrastructure = result.get("infrastructure")
output_path = Path(result.get("output", {}).get("local_path", ""))
if output_path.exists():
await cache_file(output_path)
else:
run.status = "failed"
run.error = str(task.result)
save_run(run)
# Status badge colors
status_colors = {
"completed": "bg-green-600 text-white",
"running": "bg-yellow-600 text-white",
"failed": "bg-red-600 text-white",
"pending": "bg-gray-600 text-white"
}
status_badge = status_colors.get(run.status, "bg-gray-600 text-white")
poll_attr = f'hx-get="/ui/run/{run_id}" hx-trigger="every 2s" hx-swap="outerHTML"' if run.status == "running" else ""
html = f'''
<a href="/ui/detail/{run.run_id}" class="block">
<div class="bg-dark-700 rounded-lg p-4 hover:bg-dark-600 transition-colors" {poll_attr}>
<div class="flex flex-wrap items-center justify-between gap-3 mb-3">
<div class="flex items-center gap-3">
<span class="px-3 py-1 bg-blue-600 text-white text-sm font-medium rounded-full">{run.recipe}</span>
<span class="text-gray-400 font-mono text-xs hidden sm:inline">{run.run_id[:16]}...</span>
</div>
<span class="px-3 py-1 {status_badge} text-xs font-medium rounded-full">{run.status}</span>
</div>
<div class="text-sm text-gray-400 mb-3">
Created: {run.created_at[:19].replace('T', ' ')}
</div>
'''
# Show input and output side by side
has_input = run.inputs and cache_manager.has_content(run.inputs[0])
has_output = run.status == "completed" and run.output_hash and cache_manager.has_content(run.output_hash)
if has_input or has_output:
html += '<div class="grid gap-4 sm:grid-cols-2">'
if has_input:
input_hash = run.inputs[0]
input_media_type = detect_media_type(get_cache_path(input_hash))
html += f'''
<div class="bg-dark-600 rounded-lg p-3">
<div class="text-xs text-gray-400 mb-2">Input: {input_hash[:16]}...</div>
<div class="flex justify-center">
'''
if input_media_type == "video":
input_video_src = video_src_for_request(input_hash, request)
html += f'<video src="{input_video_src}" controls muted loop playsinline class="max-h-32 rounded"></video>'
elif input_media_type == "image":
html += f'<img src="/cache/{input_hash}" alt="input" class="max-h-32 rounded">'
html += '</div></div>'
if has_output:
output_hash = run.output_hash
output_media_type = detect_media_type(get_cache_path(output_hash))
html += f'''
<div class="bg-dark-600 rounded-lg p-3">
<div class="text-xs text-gray-400 mb-2">Output: {output_hash[:16]}...</div>
<div class="flex justify-center">
'''
if output_media_type == "video":
output_video_src = video_src_for_request(output_hash, request)
html += f'<video src="{output_video_src}" controls autoplay muted loop playsinline class="max-h-32 rounded"></video>'
elif output_media_type == "image":
html += f'<img src="/cache/{output_hash}" alt="output" class="max-h-32 rounded">'
html += '</div></div>'
html += '</div>'
if run.status == "failed" and run.error:
html += f'<div class="mt-3 text-sm text-red-400">Error: {run.error}</div>'
html += '</div></a>'
return html
if __name__ == "__main__":
import uvicorn
# Workers enabled - cache indexes shared via Redis
uvicorn.run("server:app", host="0.0.0.0", port=8100, workers=4)