#!/usr/bin/env python3 """ Art DAG L1 Server Manages rendering runs and provides access to the cache. - POST /runs - start a run (recipe + inputs) - GET /runs/{run_id} - get run status/result - GET /cache/{content_hash} - get cached content """ import asyncio import base64 import hashlib import json import logging import os import time import uuid from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Optional # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s %(levelname)s %(name)s: %(message)s' ) logger = logging.getLogger(__name__) from fastapi import FastAPI, HTTPException, UploadFile, File, Depends, Form, Request from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from pydantic import BaseModel import redis import requests as http_requests from urllib.parse import urlparse import yaml from celery_app import app as celery_app from legacy_tasks import render_effect, execute_dag, build_effect_dag from contextlib import asynccontextmanager from cache_manager import L1CacheManager, get_cache_manager import database import storage_providers # L1 public URL for redirects L1_PUBLIC_URL = os.environ.get("L1_PUBLIC_URL", "http://localhost:8100") def compute_run_id(input_hashes: list[str], recipe: str, recipe_hash: str = None) -> str: """ Compute a deterministic run_id from inputs and recipe. The run_id is a SHA3-256 hash of: - Sorted input content hashes - Recipe identifier (recipe_hash if provided, else "effect:{recipe}") This makes runs content-addressable: same inputs + recipe = same run_id. 
""" data = { "inputs": sorted(input_hashes), "recipe": recipe_hash or f"effect:{recipe}", "version": "1", # For future schema changes } json_str = json.dumps(data, sort_keys=True, separators=(",", ":")) return hashlib.sha3_256(json_str.encode()).hexdigest() # IPFS gateway URL for public access to IPFS content IPFS_GATEWAY_URL = os.environ.get("IPFS_GATEWAY_URL", "") # Cache directory (use /data/cache in Docker, ~/.artdag/cache locally) CACHE_DIR = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache"))) CACHE_DIR.mkdir(parents=True, exist_ok=True) # Redis for persistent run storage and shared cache index (multi-worker support) REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/5') parsed = urlparse(REDIS_URL) redis_client = redis.Redis( host=parsed.hostname or 'localhost', port=parsed.port or 6379, db=int(parsed.path.lstrip('/') or 0), socket_timeout=5, socket_connect_timeout=5 ) RUNS_KEY_PREFIX = "artdag:run:" RECIPES_KEY_PREFIX = "artdag:recipe:" REVOKED_KEY_PREFIX = "artdag:revoked:" USER_TOKENS_PREFIX = "artdag:user_tokens:" # Token revocation (30 day expiry to match token lifetime) TOKEN_EXPIRY_SECONDS = 60 * 60 * 24 * 30 def register_user_token(username: str, token: str) -> None: """Track a token for a user (for later revocation by username).""" token_hash = hashlib.sha256(token.encode()).hexdigest() key = f"{USER_TOKENS_PREFIX}{username}" redis_client.sadd(key, token_hash) redis_client.expire(key, TOKEN_EXPIRY_SECONDS) def revoke_token(token: str) -> bool: """Add token to revocation set. Returns True if newly revoked.""" token_hash = hashlib.sha256(token.encode()).hexdigest() key = f"{REVOKED_KEY_PREFIX}{token_hash}" result = redis_client.set(key, "1", ex=TOKEN_EXPIRY_SECONDS, nx=True) return result is not None def revoke_token_hash(token_hash: str) -> bool: """Add token hash to revocation set. 
Returns True if newly revoked.""" key = f"{REVOKED_KEY_PREFIX}{token_hash}" result = redis_client.set(key, "1", ex=TOKEN_EXPIRY_SECONDS, nx=True) return result is not None def revoke_all_user_tokens(username: str) -> int: """Revoke all tokens for a user. Returns count revoked.""" key = f"{USER_TOKENS_PREFIX}{username}" token_hashes = redis_client.smembers(key) count = 0 for token_hash in token_hashes: if revoke_token_hash(token_hash.decode() if isinstance(token_hash, bytes) else token_hash): count += 1 # Clear the user's token set redis_client.delete(key) return count def is_token_revoked(token: str) -> bool: """Check if token has been revoked.""" token_hash = hashlib.sha256(token.encode()).hexdigest() key = f"{REVOKED_KEY_PREFIX}{token_hash}" return redis_client.exists(key) > 0 # Initialize L1 cache manager with Redis for shared state between workers cache_manager = L1CacheManager(cache_dir=CACHE_DIR, redis_client=redis_client) def save_run(run: "RunStatus"): """Save run to Redis.""" redis_client.set(f"{RUNS_KEY_PREFIX}{run.run_id}", run.model_dump_json()) def load_run(run_id: str) -> Optional["RunStatus"]: """Load run from Redis.""" data = redis_client.get(f"{RUNS_KEY_PREFIX}{run_id}") if data: return RunStatus.model_validate_json(data) return None def list_all_runs() -> list["RunStatus"]: """List all runs from Redis.""" runs = [] for key in redis_client.scan_iter(f"{RUNS_KEY_PREFIX}*"): data = redis_client.get(key) if data: runs.append(RunStatus.model_validate_json(data)) return sorted(runs, key=lambda r: r.created_at, reverse=True) def find_runs_using_content(content_hash: str) -> list[tuple["RunStatus", str]]: """Find all runs that use a content_hash as input or output. Returns list of (run, role) tuples where role is 'input' or 'output'. 
""" results = [] for run in list_all_runs(): if run.inputs and content_hash in run.inputs: results.append((run, "input")) if run.output_hash == content_hash: results.append((run, "output")) return results @asynccontextmanager async def lifespan(app: FastAPI): """Initialize and cleanup resources.""" # Startup: initialize database await database.init_db() yield # Shutdown: close database await database.close_db() app = FastAPI( title="Art DAG L1 Server", description="Distributed rendering server for Art DAG", version="0.1.0", lifespan=lifespan ) @app.exception_handler(404) async def not_found_handler(request: Request, exc): """Custom 404 page.""" from fastapi.responses import JSONResponse accept = request.headers.get("accept", "") if "text/html" in accept: content = '''

404

Page not found

Go to home page
''' # Import render_page at runtime to avoid circular dependency html = f""" Not Found | Art DAG L1 Server

Art DAG L1 Server

{content}
""" return HTMLResponse(html, status_code=404) return JSONResponse({"detail": "Not found"}, status_code=404) class RunRequest(BaseModel): """Request to start a run.""" recipe: str # Recipe name (e.g., "dog", "identity") or "dag" for custom DAG inputs: list[str] # List of content hashes output_name: Optional[str] = None use_dag: bool = False # Use DAG engine instead of legacy effect runner dag_json: Optional[str] = None # Custom DAG JSON (required if recipe="dag") class RunStatus(BaseModel): """Status of a run.""" run_id: str status: str # pending, running, completed, failed recipe: str inputs: list[str] output_name: str created_at: str completed_at: Optional[str] = None output_hash: Optional[str] = None error: Optional[str] = None celery_task_id: Optional[str] = None effects_commit: Optional[str] = None effect_url: Optional[str] = None # URL to effect source code username: Optional[str] = None # Owner of the run (ActivityPub actor ID) infrastructure: Optional[dict] = None # Hardware/software used for rendering provenance_cid: Optional[str] = None # IPFS CID of provenance record # ============ Recipe Models ============ class VariableInput(BaseModel): """A variable input that must be filled at run time.""" node_id: str name: str description: Optional[str] = None required: bool = True class FixedInput(BaseModel): """A fixed input resolved from the registry.""" node_id: str asset: str content_hash: str class RecipeStatus(BaseModel): """Status/metadata of a recipe.""" recipe_id: str # Content hash of the YAML file name: str version: str description: Optional[str] = None variable_inputs: list[VariableInput] fixed_inputs: list[FixedInput] output_node: str owner: Optional[str] = None uploaded_at: str uploader: Optional[str] = None class RecipeRunRequest(BaseModel): """Request to run a recipe with variable inputs.""" inputs: dict[str, str] # node_id -> content_hash def save_recipe(recipe: RecipeStatus): """Save recipe to Redis.""" 
redis_client.set(f"{RECIPES_KEY_PREFIX}{recipe.recipe_id}", recipe.model_dump_json()) def load_recipe(recipe_id: str) -> Optional[RecipeStatus]: """Load recipe from Redis.""" data = redis_client.get(f"{RECIPES_KEY_PREFIX}{recipe_id}") if data: return RecipeStatus.model_validate_json(data) return None def list_all_recipes() -> list[RecipeStatus]: """List all recipes from Redis.""" recipes = [] for key in redis_client.scan_iter(f"{RECIPES_KEY_PREFIX}*"): data = redis_client.get(key) if data: recipes.append(RecipeStatus.model_validate_json(data)) return sorted(recipes, key=lambda c: c.uploaded_at, reverse=True) def delete_recipe_from_redis(recipe_id: str) -> bool: """Delete recipe from Redis.""" return redis_client.delete(f"{RECIPES_KEY_PREFIX}{recipe_id}") > 0 def parse_recipe_yaml(yaml_content: str, recipe_hash: str, uploader: str) -> RecipeStatus: """Parse a recipe YAML file and extract metadata.""" config = yaml.safe_load(yaml_content) # Extract basic info name = config.get("name", "unnamed") version = config.get("version", "1.0") description = config.get("description") owner = config.get("owner") # Parse registry registry = config.get("registry", {}) assets = registry.get("assets", {}) # Parse DAG nodes dag = config.get("dag", {}) nodes = dag.get("nodes", []) output_node = dag.get("output") variable_inputs = [] fixed_inputs = [] for node in nodes: node_id = node.get("id") node_type = node.get("type") node_config = node.get("config", {}) if node_type == "SOURCE": if node_config.get("input"): # Variable input variable_inputs.append(VariableInput( node_id=node_id, name=node_config.get("name", node_id), description=node_config.get("description"), required=node_config.get("required", True) )) elif "asset" in node_config: # Fixed input - resolve from registry asset_name = node_config["asset"] asset_info = assets.get(asset_name, {}) fixed_inputs.append(FixedInput( node_id=node_id, asset=asset_name, content_hash=asset_info.get("hash", "") )) return RecipeStatus( 
recipe_id=recipe_hash, name=name, version=version, description=description, variable_inputs=variable_inputs, fixed_inputs=fixed_inputs, output_node=output_node or "", owner=owner, uploaded_at=datetime.now(timezone.utc).isoformat(), uploader=uploader ) # ============ Auth ============ security = HTTPBearer(auto_error=False) @dataclass class UserContext: """User authentication context extracted from JWT token.""" username: str l2_server: str # The L2 server that issued the token (e.g., "https://artdag.rose-ash.com") l2_domain: str # Domain part for actor_id (e.g., "artdag.rose-ash.com") @property def actor_id(self) -> str: """ActivityPub-style actor ID: @username@domain""" return f"@{self.username}@{self.l2_domain}" def decode_token_claims(token: str) -> Optional[dict]: """Decode JWT payload without verification (L2 does verification).""" try: # JWT format: header.payload.signature (all base64url encoded) parts = token.split(".") if len(parts) != 3: return None # Decode payload (add padding if needed) payload = parts[1] padding = 4 - len(payload) % 4 if padding != 4: payload += "=" * padding decoded = base64.urlsafe_b64decode(payload) return json.loads(decoded) except Exception: return None def get_user_context_from_token(token: str) -> Optional[UserContext]: """Extract user context from JWT token. Token must contain l2_server claim.""" claims = decode_token_claims(token) if not claims: return None username = claims.get("username") or claims.get("sub") l2_server = claims.get("l2_server") # e.g., "https://artdag.rose-ash.com" if not username or not l2_server: return None # Extract domain from l2_server URL for actor_id from urllib.parse import urlparse parsed = urlparse(l2_server) l2_domain = parsed.netloc or l2_server return UserContext(username=username, l2_server=l2_server, l2_domain=l2_domain) def _verify_token_with_l2_sync(token: str, l2_server: str) -> Optional[str]: """Verify token with the L2 server that issued it, return username if valid. 
(Sync version)""" try: resp = http_requests.post( f"{l2_server}/auth/verify", headers={"Authorization": f"Bearer {token}"}, json={"l1_server": L1_PUBLIC_URL}, # Identify ourselves to L2 timeout=5 ) if resp.status_code == 200: return resp.json().get("username") except Exception: pass return None async def verify_token_with_l2(token: str, l2_server: str) -> Optional[str]: """Verify token with the L2 server that issued it, return username if valid.""" return await asyncio.to_thread(_verify_token_with_l2_sync, token, l2_server) async def get_verified_user_context(token: str) -> Optional[UserContext]: """Get verified user context from token. Verifies with the L2 that issued it.""" # Check if token has been revoked if is_token_revoked(token): return None ctx = get_user_context_from_token(token) if not ctx: return None # Verify token with the L2 server from the token (non-blocking) verified_username = await verify_token_with_l2(token, ctx.l2_server) if not verified_username: return None return ctx async def get_optional_user( credentials: HTTPAuthorizationCredentials = Depends(security) ) -> Optional[str]: """Get username if authenticated, None otherwise.""" if not credentials: return None ctx = await get_verified_user_context(credentials.credentials) return ctx.username if ctx else None async def get_required_user( credentials: HTTPAuthorizationCredentials = Depends(security) ) -> str: """Get username, raise 401 if not authenticated.""" if not credentials: raise HTTPException(401, "Not authenticated") ctx = await get_verified_user_context(credentials.credentials) if not ctx: raise HTTPException(401, "Invalid token") return ctx.username async def get_required_user_context( credentials: HTTPAuthorizationCredentials = Depends(security) ) -> UserContext: """Get full user context, raise 401 if not authenticated.""" if not credentials: raise HTTPException(401, "Not authenticated") ctx = await get_verified_user_context(credentials.credentials) if not ctx: raise 
HTTPException(401, "Invalid token") return ctx def file_hash(path: Path) -> str: """Compute SHA3-256 hash of a file.""" hasher = hashlib.sha3_256() with open(path, "rb") as f: for chunk in iter(lambda: f.read(65536), b""): hasher.update(chunk) return hasher.hexdigest() async def cache_file(source: Path, node_type: str = "output") -> str: """ Copy file to cache using L1CacheManager, return content hash. Uses artdag's Cache internally for proper tracking. Saves IPFS CID to database. """ cached, ipfs_cid = cache_manager.put(source, node_type=node_type) # Save to cache_items table (with IPFS CID) await database.create_cache_item(cached.content_hash, ipfs_cid) return cached.content_hash def get_cache_path(content_hash: str) -> Optional[Path]: """Get the path for a cached file by content_hash.""" return cache_manager.get_by_content_hash(content_hash) @app.get("/api") async def api_info(): """Server info (JSON).""" runs = await asyncio.to_thread(list_all_runs) return { "name": "Art DAG L1 Server", "version": "0.1.0", "cache_dir": str(CACHE_DIR), "runs_count": len(runs) } def render_home_html(actor_id: Optional[str] = None) -> str: """Render the home page HTML with optional user info.""" if actor_id: # Extract username and domain from @username@domain format parts = actor_id.lstrip("@").split("@") username = parts[0] if parts else actor_id domain = parts[1] if len(parts) > 1 else "" l2_user_url = f"https://{domain}/users/{username}" if domain else "#" user_section = f'''
Logged in as {actor_id}
''' else: user_section = '''Not logged in''' return f""" Art DAG L1 Server

Art DAG L1 Server

L1 rendering server for the Art DAG system. Manages distributed rendering jobs via Celery workers.

Dependencies

API Endpoints

Method Path Description
GET/uiWeb UI for viewing runs
POST/runsStart a rendering run
GET/runsList all runs
GET/runs/{{run_id}}Get run status
GET/mediaList media items
GET/recipesList recipes
GET/cache/{{hash}}Download cached content
POST/cache/uploadUpload file to cache
GET/assetsList known assets

Start a Run

curl -X POST /runs \\
  -H "Content-Type: application/json" \\
  -d '{{"recipe": "dog", "inputs": ["33268b6e..."]}}'

Provenance

Every render produces a provenance record linking inputs, effects, and infrastructure:

{{
  "output": {{"content_hash": "..."}},
  "inputs": [...],
  "effects": [...],
  "infrastructure": {{...}}
}}
""" @app.get("/", response_class=HTMLResponse) async def root(request: Request): """Home page.""" ctx = await get_user_context_from_cookie(request) actor_id = ctx.actor_id if ctx else None return render_home_html(actor_id) @app.post("/runs", response_model=RunStatus) async def create_run(request: RunRequest, ctx: UserContext = Depends(get_required_user_context)): """Start a new rendering run. Checks cache before executing.""" # Compute content-addressable run_id run_id = compute_run_id(request.inputs, request.recipe) # Generate output name if not provided output_name = request.output_name or f"{request.recipe}-{run_id[:8]}" # Use actor_id from user context actor_id = ctx.actor_id # Check L1 cache first cached_run = await database.get_run_cache(run_id) if cached_run: output_hash = cached_run["output_hash"] # Verify the output file still exists in cache if cache_manager.has_content(output_hash): logger.info(f"create_run: Cache hit for run_id={run_id[:16]}... output={output_hash[:16]}...") return RunStatus( run_id=run_id, status="completed", recipe=request.recipe, inputs=request.inputs, output_name=output_name, created_at=cached_run.get("created_at", datetime.now(timezone.utc).isoformat()), completed_at=cached_run.get("created_at", datetime.now(timezone.utc).isoformat()), output_hash=output_hash, username=actor_id, provenance_cid=cached_run.get("provenance_cid"), ) else: logger.info(f"create_run: Cache entry exists but output missing, will re-run") # Check L2 if not in L1 l2_server = ctx.l2_server try: l2_resp = http_requests.get( f"{l2_server}/assets/by-run-id/{run_id}", timeout=10 ) if l2_resp.status_code == 200: l2_data = l2_resp.json() output_hash = l2_data.get("output_hash") ipfs_cid = l2_data.get("ipfs_cid") if output_hash and ipfs_cid: logger.info(f"create_run: Found on L2, pulling from IPFS: {ipfs_cid}") # Pull from IPFS to L1 cache import ipfs_client legacy_dir = CACHE_DIR / "legacy" legacy_dir.mkdir(parents=True, exist_ok=True) recovery_path = legacy_dir / 
output_hash if ipfs_client.get_file(ipfs_cid, str(recovery_path)): # File retrieved - put() updates indexes, but file is already in legacy location # Just update the content and IPFS indexes manually cache_manager._set_content_index(output_hash, output_hash) cache_manager._set_ipfs_index(output_hash, ipfs_cid) # Save to run cache await database.save_run_cache( run_id=run_id, output_hash=output_hash, recipe=request.recipe, inputs=request.inputs, ipfs_cid=ipfs_cid, provenance_cid=l2_data.get("provenance_cid"), actor_id=actor_id, ) logger.info(f"create_run: Recovered from L2/IPFS: {output_hash[:16]}...") return RunStatus( run_id=run_id, status="completed", recipe=request.recipe, inputs=request.inputs, output_name=output_name, created_at=datetime.now(timezone.utc).isoformat(), completed_at=datetime.now(timezone.utc).isoformat(), output_hash=output_hash, username=actor_id, provenance_cid=l2_data.get("provenance_cid"), ) except Exception as e: logger.warning(f"create_run: L2 lookup failed (will run Celery): {e}") # Not cached anywhere - create run record and submit to Celery run = RunStatus( run_id=run_id, status="pending", recipe=request.recipe, inputs=request.inputs, output_name=output_name, created_at=datetime.now(timezone.utc).isoformat(), username=actor_id ) # Submit to Celery if request.use_dag or request.recipe == "dag": # DAG mode - use artdag engine if request.dag_json: # Custom DAG provided dag_json = request.dag_json else: # Build simple effect DAG from recipe and inputs dag = build_effect_dag(request.inputs, request.recipe) dag_json = dag.to_json() task = execute_dag.delay(dag_json, run.run_id) else: # Legacy mode - single effect if len(request.inputs) != 1: raise HTTPException(400, "Legacy mode only supports single-input recipes. 
Use use_dag=true for multi-input.") input_hash = request.inputs[0] task = render_effect.delay(input_hash, request.recipe, output_name) run.celery_task_id = task.id run.status = "running" await asyncio.to_thread(save_run, run) return run def _check_celery_task_sync(task_id: str) -> tuple[bool, bool, Optional[dict], Optional[str]]: """Check Celery task status synchronously. Returns (is_ready, is_successful, result, error).""" task = celery_app.AsyncResult(task_id) if not task.ready(): return (False, False, None, None) if task.successful(): return (True, True, task.result, None) else: return (True, False, None, str(task.result)) @app.get("/runs/{run_id}", response_model=RunStatus) async def get_run(run_id: str): """Get status of a run.""" start = time.time() logger.info(f"get_run: Starting for {run_id}") t0 = time.time() run = await asyncio.to_thread(load_run, run_id) logger.info(f"get_run: load_run took {time.time()-t0:.3f}s, status={run.status if run else 'None'}") if not run: raise HTTPException(404, f"Run {run_id} not found") # Check Celery task status if running if run.status == "running" and run.celery_task_id: t0 = time.time() is_ready, is_successful, result, error = await asyncio.to_thread( _check_celery_task_sync, run.celery_task_id ) logger.info(f"get_run: Celery check took {time.time()-t0:.3f}s, ready={is_ready}") if is_ready: if is_successful: run.status = "completed" run.completed_at = datetime.now(timezone.utc).isoformat() # Handle both legacy (render_effect) and new (execute_dag) result formats if "output_hash" in result: # New DAG result format run.output_hash = result.get("output_hash") run.provenance_cid = result.get("provenance_cid") output_path = Path(result.get("output_path", "")) if result.get("output_path") else None else: # Legacy render_effect format run.output_hash = result.get("output", {}).get("content_hash") run.provenance_cid = result.get("provenance_cid") output_path = Path(result.get("output", {}).get("local_path", "")) # Extract 
effects info from provenance (legacy only) effects = result.get("effects", []) if effects: run.effects_commit = effects[0].get("repo_commit") run.effect_url = effects[0].get("repo_url") # Extract infrastructure info (legacy only) run.infrastructure = result.get("infrastructure") # Cache the output (legacy mode - DAG already caches via cache_manager) if output_path and output_path.exists() and "output_hash" not in result: t0 = time.time() await cache_file(output_path, node_type="effect_output") logger.info(f"get_run: cache_file took {time.time()-t0:.3f}s") # Record activity for deletion tracking (legacy mode) if run.output_hash and run.inputs: await asyncio.to_thread( cache_manager.record_simple_activity, input_hashes=run.inputs, output_hash=run.output_hash, run_id=run.run_id, ) # Save to run cache for content-addressable lookup if run.output_hash: ipfs_cid = cache_manager._get_ipfs_cid_from_index(run.output_hash) await database.save_run_cache( run_id=run.run_id, output_hash=run.output_hash, recipe=run.recipe, inputs=run.inputs, ipfs_cid=ipfs_cid, provenance_cid=run.provenance_cid, actor_id=run.username, ) logger.info(f"get_run: Saved run cache for {run.run_id[:16]}...") else: run.status = "failed" run.error = error # Save updated status t0 = time.time() await asyncio.to_thread(save_run, run) logger.info(f"get_run: save_run took {time.time()-t0:.3f}s") logger.info(f"get_run: Total time {time.time()-start:.3f}s") return run @app.delete("/runs/{run_id}") async def discard_run(run_id: str, ctx: UserContext = Depends(get_required_user_context)): """ Discard (delete) a run and its outputs. 
Enforces deletion rules:
    - Cannot discard if output is published to L2 (pinned)
    - Deletes outputs and intermediate cache entries
    - Preserves inputs (cache items and recipes are NOT deleted)
    """
    run = await asyncio.to_thread(load_run, run_id)
    if not run:
        raise HTTPException(404, f"Run {run_id} not found")

    # Check ownership (stored owner may be a bare username or a full actor id)
    if run.username not in (ctx.username, ctx.actor_id):
        raise HTTPException(403, "Access denied")

    # Failed runs can always be deleted (no output to protect)
    if run.status != "failed":
        # Only check if output is pinned - inputs are preserved, not deleted
        if run.output_hash:
            meta = await database.load_item_metadata(run.output_hash, ctx.actor_id)
            if meta.get("pinned"):
                pin_reason = meta.get("pin_reason", "published")
                raise HTTPException(400, f"Cannot discard run: output {run.output_hash[:16]}... is pinned ({pin_reason})")

    # Check if activity exists for this run
    activity = await asyncio.to_thread(cache_manager.get_activity, run_id)
    if activity:
        # Discard the activity - only delete outputs, preserve inputs
        success, msg = await asyncio.to_thread(cache_manager.discard_activity_outputs_only, run_id)
        if not success:
            raise HTTPException(400, f"Cannot discard run: {msg}")

    # Remove from Redis
    await asyncio.to_thread(redis_client.delete, f"{RUNS_KEY_PREFIX}{run_id}")
    return {"discarded": True, "run_id": run_id}


@app.delete("/ui/runs/{run_id}/discard", response_class=HTMLResponse)
async def ui_discard_run(run_id: str, request: Request):
    """HTMX handler: discard a run. Only deletes outputs, preserves inputs."""
    ctx = await get_user_context_from_cookie(request)
    if not ctx:
        return '
Login required
' run = await asyncio.to_thread(load_run, run_id) if not run: return '
Run not found
' # Check ownership if run.username not in (ctx.username, ctx.actor_id): return '
Access denied
' # Failed runs can always be deleted if run.status != "failed": # Only check if output is pinned - inputs are preserved, not deleted if run.output_hash: meta = await database.load_item_metadata(run.output_hash, ctx.actor_id) if meta.get("pinned"): pin_reason = meta.get("pin_reason", "published") return f'
Cannot discard: output is pinned ({pin_reason})
' # Check if activity exists for this run activity = await asyncio.to_thread(cache_manager.get_activity, run_id) if activity: # Discard the activity - only delete outputs, preserve inputs success, msg = await asyncio.to_thread(cache_manager.discard_activity_outputs_only, run_id) if not success: return f'
Cannot discard: {msg}
' # Remove from Redis await asyncio.to_thread(redis_client.delete, f"{RUNS_KEY_PREFIX}{run_id}") return '''
Run deleted. Back to runs
''' @app.get("/run/{run_id}") async def run_detail(run_id: str, request: Request): """Run detail. HTML for browsers, JSON for APIs.""" run = await asyncio.to_thread(load_run, run_id) if not run: if wants_html(request): content = f'

Run not found: {run_id}

' return HTMLResponse(render_page("Not Found", content, None, active_tab="runs"), status_code=404) raise HTTPException(404, f"Run {run_id} not found") # Check Celery task status if running if run.status == "running" and run.celery_task_id: is_ready, is_successful, result, error = await asyncio.to_thread( _check_celery_task_sync, run.celery_task_id ) if is_ready: if is_successful: run.status = "completed" run.completed_at = datetime.now(timezone.utc).isoformat() run.output_hash = result.get("output", {}).get("content_hash") effects = result.get("effects", []) if effects: run.effects_commit = effects[0].get("repo_commit") run.effect_url = effects[0].get("repo_url") run.infrastructure = result.get("infrastructure") output_path = Path(result.get("output", {}).get("local_path", "")) if output_path.exists(): await cache_file(output_path) # Save to run cache for content-addressable lookup if run.output_hash: ipfs_cid = cache_manager._get_ipfs_cid_from_index(run.output_hash) await database.save_run_cache( run_id=run.run_id, output_hash=run.output_hash, recipe=run.recipe, inputs=run.inputs, ipfs_cid=ipfs_cid, provenance_cid=run.provenance_cid, actor_id=run.username, ) else: run.status = "failed" run.error = error await asyncio.to_thread(save_run, run) if wants_html(request): ctx = await get_user_context_from_cookie(request) if not ctx: content = '

Not logged in.

' return HTMLResponse(render_page("Login Required", content, None, active_tab="runs"), status_code=401) # Check user owns this run if run.username not in (ctx.username, ctx.actor_id): content = '

Access denied.

' return HTMLResponse(render_page("Access Denied", content, ctx.actor_id, active_tab="runs"), status_code=403) # Build effect URL if run.effect_url: effect_url = run.effect_url elif run.effects_commit and run.effects_commit != "unknown": effect_url = f"https://git.rose-ash.com/art-dag/effects/src/commit/{run.effects_commit}/{run.recipe}" else: effect_url = f"https://git.rose-ash.com/art-dag/effects/src/branch/main/{run.recipe}" # Status badge colors status_colors = { "completed": "bg-green-600 text-white", "running": "bg-yellow-600 text-white", "failed": "bg-red-600 text-white", "pending": "bg-gray-600 text-white" } status_badge = status_colors.get(run.status, "bg-gray-600 text-white") # Try to get input names from recipe input_names = {} recipe_name = run.recipe.replace("recipe:", "") if run.recipe.startswith("recipe:") else run.recipe for recipe in list_all_recipes(): if recipe.name == recipe_name: # Match variable inputs first, then fixed inputs for i, var_input in enumerate(recipe.variable_inputs): if i < len(run.inputs): input_names[run.inputs[i]] = var_input.name # Fixed inputs follow variable inputs offset = len(recipe.variable_inputs) for i, fixed_input in enumerate(recipe.fixed_inputs): idx = offset + i if idx < len(run.inputs): input_names[run.inputs[idx]] = fixed_input.asset break # Build media HTML for inputs and output media_html = "" available_inputs = [inp for inp in run.inputs if cache_manager.has_content(inp)] has_output = run.status == "completed" and run.output_hash and cache_manager.has_content(run.output_hash) if available_inputs or has_output: # Flexible grid - more columns for more items num_items = len(available_inputs) + (1 if has_output else 0) grid_cols = min(num_items, 3) # Max 3 columns media_html = f'
' for idx, input_hash in enumerate(available_inputs): input_media_type = detect_media_type(get_cache_path(input_hash)) input_video_src = video_src_for_request(input_hash, request) if input_media_type == "video": input_elem = f'' elif input_media_type == "image": input_elem = f'input' else: input_elem = '

Unknown format

' # Get input name or fall back to "Input N" input_name = input_names.get(input_hash, f"Input {idx + 1}") media_html += f'''
{input_name}
{input_hash[:24]}...
{input_elem}
''' if has_output: output_hash = run.output_hash output_media_type = detect_media_type(get_cache_path(output_hash)) output_video_src = video_src_for_request(output_hash, request) if output_media_type == "video": output_elem = f'' elif output_media_type == "image": output_elem = f'output' else: output_elem = '

Unknown format

' media_html += f'''
Output
{output_hash[:24]}...
{output_elem}
''' media_html += '
' # Build inputs list with names inputs_html = ''.join([ f'
{input_names.get(inp, f"Input {i+1}")}: {inp}
' for i, inp in enumerate(run.inputs) ]) # Infrastructure section infra_html = "" if run.infrastructure: software = run.infrastructure.get("software", {}) hardware = run.infrastructure.get("hardware", {}) infra_html = f'''
Infrastructure
Software: {software.get("name", "unknown")} ({software.get("content_hash", "unknown")[:16]}...)
Hardware: {hardware.get("name", "unknown")} ({hardware.get("content_hash", "unknown")[:16]}...)
''' # Error display error_html = "" if run.error: error_html = f'''
Error
{run.error}
''' # Publish section - check if already published to L2 publish_html = "" if run.status == "completed" and run.output_hash: l2_shares = await database.get_l2_shares(run.output_hash, ctx.actor_id) if l2_shares: # Already published - show link to L2 share = l2_shares[0] l2_server = share.get("l2_server", "") l2_https = l2_server.replace("http://", "https://") asset_name = share.get("asset_name", "") activity_id = share.get("activity_id") # Link to activity if available, otherwise fall back to asset l2_link = f"{l2_https}/activities/{activity_id}" if activity_id else f"{l2_https}/assets/{asset_name}" publish_html = f'''

Published to L2

Published as {asset_name[:16]}... View on L2

''' else: # Not published - show publish form publish_html = f'''

Publish to L2

Register this run (inputs, recipe, output) on the L2 ActivityPub server. Assets are identified by their content hash.

''' # Delete section delete_html = f'''

Delete Run

{"This run failed and can be deleted." if run.status == "failed" else "Delete this run and its associated cache entries."}

''' output_link = "" if run.output_hash: output_link = f'''
Output
{run.output_hash}
''' completed_html = "" if run.completed_at: completed_html = f'''
Completed
{run.completed_at[:19].replace('T', ' ')}
''' # Sub-navigation tabs for run detail pages sub_tabs_html = render_run_sub_tabs(run_id, active="overview") content = f''' Back to runs {sub_tabs_html}
{run.recipe} {run.run_id[:16]}...
{run.status}
{error_html} {media_html}

Provenance

Owner
{run.username or "anonymous"}
Effect
{run.recipe}
Effects Commit
{run.effects_commit or "N/A"}
Input(s)
{inputs_html}
{output_link}
Run ID
{run.run_id}
Created
{run.created_at[:19].replace('T', ' ')}
{completed_html} {infra_html}
{publish_html} {delete_html}
''' return HTMLResponse(render_page(f"Run: {run.recipe}", content, ctx.actor_id, active_tab="runs")) # JSON response return run.model_dump() # Plan/Analysis cache directories (match tasks/orchestrate.py) PLAN_CACHE_DIR = CACHE_DIR / 'plans' ANALYSIS_CACHE_DIR = CACHE_DIR / 'analysis' @app.get("/run/{run_id}/plan", response_class=HTMLResponse) async def run_plan_visualization(run_id: str, request: Request): """Visualize execution plan as interactive DAG.""" ctx = await get_user_context_from_cookie(request) if not ctx: content = '

Not logged in.

' return HTMLResponse(render_page("Login Required", content, None, active_tab="runs"), status_code=401) run = await asyncio.to_thread(load_run, run_id) if not run: content = f'

Run not found: {run_id}

' return HTMLResponse(render_page("Not Found", content, ctx.actor_id, active_tab="runs"), status_code=404) # Check user owns this run if run.username not in (ctx.username, ctx.actor_id): content = '

Access denied.

' return HTMLResponse(render_page("Access Denied", content, ctx.actor_id, active_tab="runs"), status_code=403) # Try to load existing plan from cache plan_data = None PLAN_CACHE_DIR.mkdir(parents=True, exist_ok=True) # Look for plan file matching this run for plan_file in PLAN_CACHE_DIR.glob("*.json"): try: with open(plan_file) as f: data = json.load(f) # Check if this plan matches our run inputs plan_inputs = data.get("input_hashes", {}) if set(plan_inputs.values()) == set(run.inputs): plan_data = data break except (json.JSONDecodeError, IOError): continue # Build sub-navigation tabs tabs_html = render_run_sub_tabs(run_id, active="plan") if not plan_data: content = f''' Back to runs {tabs_html}

Execution Plan

No execution plan available for this run.

Plans are generated when using recipe-based runs with the v2 API.

''' return HTMLResponse(render_page_with_cytoscape(f"Plan: {run_id[:16]}...", content, ctx.actor_id, active_tab="runs")) # Build Cytoscape nodes and edges from plan nodes = [] edges = [] steps = plan_data.get("steps", []) for step in steps: node_type = step.get("node_type", "EFFECT") color = NODE_COLORS.get(node_type, NODE_COLORS["default"]) cached = step.get("cached", False) status = "cached" if cached else "pending" # Shorter label for display step_id = step.get("step_id", "") label = step_id[:12] + "..." if len(step_id) > 12 else step_id nodes.append({ "data": { "id": step_id, "label": label, "nodeType": node_type, "level": step.get("level", 0), "cacheId": step.get("cache_id", ""), "status": status, "color": color, "config": step.get("config") } }) # Build edges from the full plan JSON if available if "plan_json" in plan_data: try: full_plan = json.loads(plan_data["plan_json"]) for step in full_plan.get("steps", []): step_id = step.get("step_id", "") for input_step in step.get("input_steps", []): edges.append({ "data": { "source": input_step, "target": step_id } }) except json.JSONDecodeError: pass nodes_json = json.dumps(nodes) edges_json = json.dumps(edges) dag_html = render_dag_cytoscape(nodes_json, edges_json) # Stats summary total = plan_data.get("total_steps", len(steps)) cached = plan_data.get("cached_steps", sum(1 for s in steps if s.get("cached"))) pending = plan_data.get("pending_steps", total - cached) content = f''' Back to runs {tabs_html}

Execution Plan

{total}
Total Steps
{cached}
Cached
{pending}
Executed
SOURCE EFFECT _LIST Cached
{dag_html}
''' return HTMLResponse(render_page_with_cytoscape(f"Plan: {run_id[:16]}...", content, ctx.actor_id, active_tab="runs")) @app.get("/run/{run_id}/analysis", response_class=HTMLResponse) async def run_analysis_page(run_id: str, request: Request): """Show analysis results for run inputs.""" ctx = await get_user_context_from_cookie(request) if not ctx: content = '

Not logged in.

' return HTMLResponse(render_page("Login Required", content, None, active_tab="runs"), status_code=401) run = await asyncio.to_thread(load_run, run_id) if not run: content = f'

Run not found: {run_id}

' return HTMLResponse(render_page("Not Found", content, ctx.actor_id, active_tab="runs"), status_code=404) # Check user owns this run if run.username not in (ctx.username, ctx.actor_id): content = '

Access denied.

' return HTMLResponse(render_page("Access Denied", content, ctx.actor_id, active_tab="runs"), status_code=403) tabs_html = render_run_sub_tabs(run_id, active="analysis") # Load analysis results for each input analysis_html = "" ANALYSIS_CACHE_DIR.mkdir(parents=True, exist_ok=True) for i, input_hash in enumerate(run.inputs): analysis_path = ANALYSIS_CACHE_DIR / f"{input_hash}.json" analysis_data = None if analysis_path.exists(): try: with open(analysis_path) as f: analysis_data = json.load(f) except (json.JSONDecodeError, IOError): pass input_name = f"Input {i + 1}" if analysis_data: tempo = analysis_data.get("tempo", "N/A") if isinstance(tempo, float): tempo = f"{tempo:.1f}" beat_times = analysis_data.get("beat_times", []) beat_count = len(beat_times) energy = analysis_data.get("energy") # Beat visualization (simple bar chart showing beat positions) beat_bars = "" if beat_times and len(beat_times) > 0: # Show first 50 beats as vertical bars display_beats = beat_times[:50] max_time = max(display_beats) if display_beats else 1 for bt in display_beats: # Normalize to percentage pos = (bt / max_time) * 100 if max_time > 0 else 0 beat_bars += f'
' energy_bar = "" if energy is not None: try: energy_pct = min(float(energy) * 100, 100) energy_bar = f'''
Energy Level
{energy_pct:.1f}%
''' except (TypeError, ValueError): pass analysis_html += f'''

{input_name}

{input_hash[:24]}...
Analyzed
{tempo}
BPM (Tempo)
{beat_count}
Beats Detected
{energy_bar}
Beat Timeline (first 50 beats)
{beat_bars if beat_bars else 'No beats detected'}
''' else: analysis_html += f'''

{input_name}

{input_hash[:24]}...
Not Analyzed

No analysis data available for this input.

Analysis is performed when using recipe-based runs.

''' if not run.inputs: analysis_html = '

No inputs found for this run.

' content = f''' Back to runs {tabs_html}

Analysis Results

{analysis_html} ''' return HTMLResponse(render_page(f"Analysis: {run_id[:16]}...", content, ctx.actor_id, active_tab="runs")) @app.get("/run/{run_id}/artifacts", response_class=HTMLResponse) async def run_artifacts_page(run_id: str, request: Request): """Show all cached artifacts produced by this run.""" ctx = await get_user_context_from_cookie(request) if not ctx: content = '

Not logged in.

' return HTMLResponse(render_page("Login Required", content, None, active_tab="runs"), status_code=401) run = await asyncio.to_thread(load_run, run_id) if not run: content = f'

Run not found: {run_id}

' return HTMLResponse(render_page("Not Found", content, ctx.actor_id, active_tab="runs"), status_code=404) # Check user owns this run if run.username not in (ctx.username, ctx.actor_id): content = '

Access denied.

' return HTMLResponse(render_page("Access Denied", content, ctx.actor_id, active_tab="runs"), status_code=403) tabs_html = render_run_sub_tabs(run_id, active="artifacts") # Collect all artifacts: inputs + output artifacts = [] # Add inputs for i, content_hash in enumerate(run.inputs): cache_path = get_cache_path(content_hash) if cache_path and cache_path.exists(): size = cache_path.stat().st_size media_type = detect_media_type(cache_path) artifacts.append({ "hash": content_hash, "path": cache_path, "size": size, "media_type": media_type, "role": "input", "role_color": "blue", "name": f"Input {i + 1}", }) # Add output if run.output_hash: cache_path = get_cache_path(run.output_hash) if cache_path and cache_path.exists(): size = cache_path.stat().st_size media_type = detect_media_type(cache_path) artifacts.append({ "hash": run.output_hash, "path": cache_path, "size": size, "media_type": media_type, "role": "output", "role_color": "green", "name": "Output", }) # Build artifacts HTML artifacts_html = "" for artifact in artifacts: size_kb = artifact["size"] / 1024 if size_kb < 1024: size_str = f"{size_kb:.1f} KB" else: size_str = f"{size_kb/1024:.1f} MB" # Thumbnail for media thumb = "" if artifact["media_type"] == "video": thumb = f'' elif artifact["media_type"] == "image": thumb = f'' else: thumb = '
File
' role_color = artifact["role_color"] artifacts_html += f'''
{thumb}
{artifact["name"]}
{artifact["hash"][:32]}...
{size_str} {artifact["media_type"]}
{artifact["role"]}
''' if not artifacts: artifacts_html = '

No cached artifacts found for this run.

' content = f''' Back to runs {tabs_html}

Cached Artifacts

{artifacts_html}
''' return HTMLResponse(render_page(f"Artifacts: {run_id[:16]}...", content, ctx.actor_id, active_tab="runs")) # JSON API endpoints for future WebSocket support @app.get("/api/run/{run_id}/plan") async def api_run_plan(run_id: str, request: Request): """Get execution plan data as JSON for programmatic access.""" ctx = await get_user_context_from_cookie(request) if not ctx: raise HTTPException(401, "Not logged in") run = await asyncio.to_thread(load_run, run_id) if not run: raise HTTPException(404, f"Run {run_id} not found") if run.username not in (ctx.username, ctx.actor_id): raise HTTPException(403, "Access denied") # Look for plan in cache PLAN_CACHE_DIR.mkdir(parents=True, exist_ok=True) for plan_file in PLAN_CACHE_DIR.glob("*.json"): try: with open(plan_file) as f: data = json.load(f) plan_inputs = data.get("input_hashes", {}) if set(plan_inputs.values()) == set(run.inputs): return data except (json.JSONDecodeError, IOError): continue return {"status": "not_found", "message": "No plan found for this run"} @app.get("/api/run/{run_id}/analysis") async def api_run_analysis(run_id: str, request: Request): """Get analysis data as JSON for programmatic access.""" ctx = await get_user_context_from_cookie(request) if not ctx: raise HTTPException(401, "Not logged in") run = await asyncio.to_thread(load_run, run_id) if not run: raise HTTPException(404, f"Run {run_id} not found") if run.username not in (ctx.username, ctx.actor_id): raise HTTPException(403, "Access denied") ANALYSIS_CACHE_DIR.mkdir(parents=True, exist_ok=True) results = {} for input_hash in run.inputs: analysis_path = ANALYSIS_CACHE_DIR / f"{input_hash}.json" if analysis_path.exists(): try: with open(analysis_path) as f: results[input_hash] = json.load(f) except (json.JSONDecodeError, IOError): results[input_hash] = None else: results[input_hash] = None return {"run_id": run_id, "inputs": run.inputs, "analysis": results} @app.get("/runs") async def list_runs(request: Request, page: int = 1, limit: int = 
20): """List runs. HTML for browsers (with infinite scroll), JSON for APIs (with pagination).""" ctx = await get_user_context_from_cookie(request) all_runs = await asyncio.to_thread(list_all_runs) total = len(all_runs) # Filter by user if logged in for HTML if wants_html(request) and ctx: all_runs = [r for r in all_runs if r.username in (ctx.username, ctx.actor_id)] total = len(all_runs) # Pagination start = (page - 1) * limit end = start + limit runs_page = all_runs[start:end] has_more = end < total if wants_html(request): if not ctx: content = '

Not logged in.

' return HTMLResponse(render_page("Runs", content, None, active_tab="runs")) if not runs_page: if page == 1: content = '

You have no runs yet. Use the CLI to start a run.

' else: return HTMLResponse("") # Empty for infinite scroll else: # Status badge colors status_colors = { "completed": "bg-green-600 text-white", "running": "bg-yellow-600 text-white", "failed": "bg-red-600 text-white", "pending": "bg-gray-600 text-white" } html_parts = [] for run in runs_page: status_badge = status_colors.get(run.status, "bg-gray-600 text-white") html_parts.append(f'''
{run.recipe}
{run.status}
Created: {run.created_at[:19].replace('T', ' ')}
''') # Show input and output thumbnails has_input = run.inputs and cache_manager.has_content(run.inputs[0]) has_output = run.status == "completed" and run.output_hash and cache_manager.has_content(run.output_hash) if has_input or has_output: html_parts.append('
') if has_input: input_hash = run.inputs[0] input_media_type = detect_media_type(get_cache_path(input_hash)) html_parts.append(f'''
Input
''') if input_media_type == "video": html_parts.append(f'') else: html_parts.append(f'input') html_parts.append('
') if has_output: output_hash = run.output_hash output_media_type = detect_media_type(get_cache_path(output_hash)) html_parts.append(f'''
Output
''') if output_media_type == "video": html_parts.append(f'') else: html_parts.append(f'output') html_parts.append('
') html_parts.append('
') if run.status == "failed" and run.error: html_parts.append(f'
Error: {run.error[:100]}
') html_parts.append('
') # For infinite scroll, just return cards if not first page if page > 1: if has_more: html_parts.append(f'''

Loading more...

''') return HTMLResponse('\n'.join(html_parts)) # First page - full content infinite_scroll_trigger = "" if has_more: infinite_scroll_trigger = f'''

Loading more...

''' content = f'''

Runs ({total} total)

{''.join(html_parts)} {infinite_scroll_trigger}
''' return HTMLResponse(render_page("Runs", content, ctx.actor_id, active_tab="runs")) # JSON response for APIs return { "runs": [r.model_dump() for r in runs_page], "pagination": { "page": page, "limit": limit, "total": total, "has_more": has_more } } # ============ Recipe Endpoints ============ @app.post("/recipes/upload") async def upload_recipe(file: UploadFile = File(...), ctx: UserContext = Depends(get_required_user_context)): """Upload a recipe YAML file. Requires authentication.""" import tempfile # Read file content content = await file.read() try: yaml_content = content.decode('utf-8') except UnicodeDecodeError: raise HTTPException(400, "Recipe file must be valid UTF-8 text") # Validate YAML try: yaml.safe_load(yaml_content) except yaml.YAMLError as e: raise HTTPException(400, f"Invalid YAML: {e}") # Store YAML file in cache with tempfile.NamedTemporaryFile(delete=False, suffix=".yaml") as tmp: tmp.write(content) tmp_path = Path(tmp.name) cached, ipfs_cid = cache_manager.put(tmp_path, node_type="recipe", move=True) recipe_hash = cached.content_hash # Parse and save metadata actor_id = ctx.actor_id try: recipe_status = parse_recipe_yaml(yaml_content, recipe_hash, actor_id) except Exception as e: raise HTTPException(400, f"Failed to parse recipe: {e}") await asyncio.to_thread(save_recipe, recipe_status) # Save cache metadata to database await database.save_item_metadata( content_hash=recipe_hash, actor_id=actor_id, item_type="recipe", filename=file.filename, description=recipe_status.name # Use recipe name as description ) return { "recipe_id": recipe_hash, "name": recipe_status.name, "version": recipe_status.version, "variable_inputs": len(recipe_status.variable_inputs), "fixed_inputs": len(recipe_status.fixed_inputs) } @app.get("/recipes") async def list_recipes_api(request: Request, page: int = 1, limit: int = 20): """List recipes. 
HTML for browsers, JSON for APIs.""" ctx = await get_user_context_from_cookie(request) all_recipes = await asyncio.to_thread(list_all_recipes) if wants_html(request): # HTML response if not ctx: return HTMLResponse(render_page( "Recipes", '

Not logged in.

', None, active_tab="recipes" )) # Filter to user's recipes user_recipes = [c for c in all_recipes if c.uploader in (ctx.username, ctx.actor_id)] total = len(user_recipes) if not user_recipes: content = '''

Recipes (0)

No recipes yet. Upload a recipe YAML file to get started.

''' return HTMLResponse(render_page("Recipes", content, ctx.actor_id, active_tab="recipes")) html_parts = [] for recipe in user_recipes: var_count = len(recipe.variable_inputs) fixed_count = len(recipe.fixed_inputs) input_info = [] if var_count: input_info.append(f"{var_count} variable") if fixed_count: input_info.append(f"{fixed_count} fixed") inputs_str = ", ".join(input_info) if input_info else "no inputs" html_parts.append(f'''
{recipe.name} v{recipe.version}
{inputs_str}
{recipe.description or "No description"}
{recipe.recipe_id[:24]}...
''') content = f'''

Recipes ({total})

{''.join(html_parts)}
''' return HTMLResponse(render_page("Recipes", content, ctx.actor_id, active_tab="recipes")) # JSON response for APIs total = len(all_recipes) start = (page - 1) * limit end = start + limit recipes_page = all_recipes[start:end] has_more = end < total return { "recipes": [c.model_dump() for c in recipes_page], "pagination": { "page": page, "limit": limit, "total": total, "has_more": has_more } } @app.get("/recipes/{recipe_id}") async def get_recipe_api(recipe_id: str): """Get recipe details.""" recipe = load_recipe(recipe_id) if not recipe: raise HTTPException(404, f"Recipe {recipe_id} not found") return recipe @app.delete("/recipes/{recipe_id}") async def remove_recipe(recipe_id: str, ctx: UserContext = Depends(get_required_user_context)): """Delete a recipe. Requires authentication.""" recipe = load_recipe(recipe_id) if not recipe: raise HTTPException(404, f"Recipe {recipe_id} not found") # Check ownership if recipe.uploader not in (ctx.username, ctx.actor_id): raise HTTPException(403, "Access denied") # Check if pinned pinned, reason = cache_manager.is_pinned(recipe_id) if pinned: raise HTTPException(400, f"Cannot delete pinned recipe: {reason}") # Delete from Redis and cache delete_recipe_from_redis(recipe_id) cache_manager.delete_by_content_hash(recipe_id) return {"deleted": True, "recipe_id": recipe_id} @app.post("/recipes/{recipe_id}/run") async def run_recipe(recipe_id: str, request: RecipeRunRequest, ctx: UserContext = Depends(get_required_user_context)): """Run a recipe with provided variable inputs. 
Requires authentication.""" recipe = await asyncio.to_thread(load_recipe, recipe_id) if not recipe: raise HTTPException(404, f"Recipe {recipe_id} not found") # Validate all required inputs are provided for var_input in recipe.variable_inputs: if var_input.required and var_input.node_id not in request.inputs: raise HTTPException(400, f"Missing required input: {var_input.name}") # Load recipe YAML recipe_path = await asyncio.to_thread(cache_manager.get_by_content_hash, recipe_id) if not recipe_path: raise HTTPException(500, "Recipe YAML not found in cache") with open(recipe_path) as f: yaml_config = yaml.safe_load(f) # Build DAG from recipe dag = build_dag_from_recipe(yaml_config, request.inputs, recipe) actor_id = ctx.actor_id # Collect all input hashes all_inputs = list(request.inputs.values()) for fixed in recipe.fixed_inputs: if fixed.content_hash: all_inputs.append(fixed.content_hash) # Compute content-addressable run_id run_id = compute_run_id(all_inputs, f"recipe:{recipe.name}") output_name = f"{recipe.name}-{run_id[:8]}" # Check L1 cache first cached_run = await database.get_run_cache(run_id) if cached_run: output_hash = cached_run["output_hash"] if cache_manager.has_content(output_hash): logger.info(f"run_recipe: Cache hit for run_id={run_id[:16]}...") return RunStatus( run_id=run_id, status="completed", recipe=f"recipe:{recipe.name}", inputs=all_inputs, output_name=output_name, created_at=cached_run.get("created_at", datetime.now(timezone.utc).isoformat()), completed_at=cached_run.get("created_at", datetime.now(timezone.utc).isoformat()), output_hash=output_hash, username=actor_id, provenance_cid=cached_run.get("provenance_cid"), ) # Check L2 if not in L1 l2_server = ctx.l2_server try: l2_resp = http_requests.get(f"{l2_server}/assets/by-run-id/{run_id}", timeout=10) if l2_resp.status_code == 200: l2_data = l2_resp.json() output_hash = l2_data.get("output_hash") ipfs_cid = l2_data.get("ipfs_cid") if output_hash and ipfs_cid: logger.info(f"run_recipe: Found 
on L2, pulling from IPFS") import ipfs_client legacy_dir = CACHE_DIR / "legacy" legacy_dir.mkdir(parents=True, exist_ok=True) recovery_path = legacy_dir / output_hash if ipfs_client.get_file(ipfs_cid, str(recovery_path)): cache_manager._set_content_index(output_hash, output_hash) cache_manager._set_ipfs_index(output_hash, ipfs_cid) await database.save_run_cache( run_id=run_id, output_hash=output_hash, recipe=f"recipe:{recipe.name}", inputs=all_inputs, ipfs_cid=ipfs_cid, provenance_cid=l2_data.get("provenance_cid"), actor_id=actor_id, ) return RunStatus( run_id=run_id, status="completed", recipe=f"recipe:{recipe.name}", inputs=all_inputs, output_name=output_name, created_at=datetime.now(timezone.utc).isoformat(), completed_at=datetime.now(timezone.utc).isoformat(), output_hash=output_hash, username=actor_id, provenance_cid=l2_data.get("provenance_cid"), ) except Exception as e: logger.warning(f"run_recipe: L2 lookup failed: {e}") # Not cached - run Celery run = RunStatus( run_id=run_id, status="pending", recipe=f"recipe:{recipe.name}", inputs=all_inputs, output_name=output_name, created_at=datetime.now(timezone.utc).isoformat(), username=actor_id ) # Submit to Celery dag_json = dag.to_json() task = execute_dag.delay(dag_json, run.run_id) run.celery_task_id = task.id run.status = "running" await asyncio.to_thread(save_run, run) return run def build_dag_from_recipe(yaml_config: dict, user_inputs: dict[str, str], recipe: RecipeStatus): """Build a DAG from recipe YAML with user-provided inputs.""" from artdag import DAG, Node dag = DAG() name_to_id = {} # Map YAML node names to content-addressed IDs registry = yaml_config.get("registry", {}) assets = registry.get("assets", {}) effects = registry.get("effects", {}) dag_config = yaml_config.get("dag", {}) nodes = dag_config.get("nodes", []) output_node = dag_config.get("output") # First pass: create all nodes and map names to IDs for node_def in nodes: node_name = node_def.get("id") node_type = node_def.get("type") 
node_config = node_def.get("config", {}) if node_type == "SOURCE": if node_config.get("input"): # Variable input - use user-provided hash content_hash = user_inputs.get(node_name) if not content_hash: raise HTTPException(400, f"Missing input for node {node_name}") node = Node( node_type="SOURCE", config={"content_hash": content_hash}, inputs=[], name=node_name ) else: # Fixed input - use registry hash asset_name = node_config.get("asset") asset_info = assets.get(asset_name, {}) content_hash = asset_info.get("hash") if not content_hash: raise HTTPException(400, f"Asset {asset_name} not found in registry") node = Node( node_type="SOURCE", config={"content_hash": content_hash}, inputs=[], name=node_name ) name_to_id[node_name] = node.node_id dag.add_node(node) # Second pass: create nodes with inputs (now we can resolve input names to IDs) for node_def in nodes: node_name = node_def.get("id") node_type = node_def.get("type") node_config = node_def.get("config", {}) input_names = node_def.get("inputs", []) # Skip SOURCE nodes (already added) if node_type == "SOURCE": continue # Resolve input names to content-addressed IDs input_ids = [name_to_id[name] for name in input_names if name in name_to_id] if node_type == "EFFECT": effect_name = node_config.get("effect") effect_info = effects.get(effect_name, {}) effect_hash = effect_info.get("hash") node = Node( node_type="EFFECT", config={"effect": effect_name, "effect_hash": effect_hash}, inputs=input_ids, name=node_name ) else: node = Node( node_type=node_type, config=node_config, inputs=input_ids, name=node_name ) name_to_id[node_name] = node.node_id dag.add_node(node) # Set output node if output_node and output_node in name_to_id: dag.set_output(name_to_id[output_node]) return dag # ============ Recipe UI Pages ============ @app.get("/recipe/{recipe_id}", response_class=HTMLResponse) async def recipe_detail_page(recipe_id: str, request: Request): """Recipe detail page with run form.""" ctx = await 
get_user_context_from_cookie(request) recipe = load_recipe(recipe_id) if not recipe: return HTMLResponse(render_page( "Recipe Not Found", f'

Recipe {recipe_id} not found.

', ctx.actor_id if ctx else None, active_tab="recipes" ), status_code=404) # Build variable inputs form var_inputs_html = "" if recipe.variable_inputs: var_inputs_html = '
' for var_input in recipe.variable_inputs: required = "required" if var_input.required else "" var_inputs_html += f'''

{var_input.description or 'Enter a content hash from your cache'}

''' var_inputs_html += '
' else: var_inputs_html = '

This recipe has no variable inputs - it uses fixed assets only.

' # Build fixed inputs display fixed_inputs_html = "" if recipe.fixed_inputs: fixed_inputs_html = '

Fixed Inputs

' # Check if pinned pinned, pin_reason = cache_manager.is_pinned(recipe_id) pinned_badge = "" if pinned: pinned_badge = f'Pinned: {pin_reason}' # Check if shared to L2 l2_shares = await database.get_l2_shares(recipe_id, ctx.actor_id if ctx else None) l2_link_html = "" if l2_shares: share = l2_shares[0] l2_server = share.get("l2_server", "").replace("http://", "https://") asset_name = share.get("asset_name", "") l2_link_html = f'View on L2' # Load recipe source YAML recipe_path = cache_manager.get_by_content_hash(recipe_id) recipe_source = "" if recipe_path and recipe_path.exists(): try: recipe_source = recipe_path.read_text() except Exception: recipe_source = "(Could not load recipe source)" # Escape HTML in source for display import html recipe_source_escaped = html.escape(recipe_source) content = f'''
← Back to recipes

{recipe.name}

v{recipe.version} {pinned_badge} {l2_link_html} View DAG

{recipe.description or 'No description'}

{recipe.recipe_id}
{fixed_inputs_html}

Recipe Source

{recipe_source_escaped}

Run this Recipe

{var_inputs_html}
''' return HTMLResponse(render_page(f"Recipe: {recipe.name}", content, ctx.actor_id if ctx else None, active_tab="recipes")) @app.get("/recipe/{recipe_id}/dag", response_class=HTMLResponse) async def recipe_dag_visualization(recipe_id: str, request: Request): """Visualize recipe structure as DAG.""" ctx = await get_user_context_from_cookie(request) recipe = load_recipe(recipe_id) if not recipe: return HTMLResponse(render_page_with_cytoscape( "Recipe Not Found", f'

Recipe {recipe_id} not found.

', ctx.actor_id if ctx else None, active_tab="recipes" ), status_code=404) # Load recipe YAML recipe_path = cache_manager.get_by_content_hash(recipe_id) if not recipe_path or not recipe_path.exists(): return HTMLResponse(render_page_with_cytoscape( "Recipe Not Found", '

Recipe file not found in cache.

', ctx.actor_id if ctx else None, active_tab="recipes" ), status_code=404) try: recipe_yaml = recipe_path.read_text() config = yaml.safe_load(recipe_yaml) except Exception as e: return HTMLResponse(render_page_with_cytoscape( "Error", f'

Failed to parse recipe: {e}

', ctx.actor_id if ctx else None, active_tab="recipes" ), status_code=500) dag_config = config.get("dag", {}) dag_nodes = dag_config.get("nodes", []) output_node = dag_config.get("output") # Build Cytoscape nodes and edges nodes = [] edges = [] for node_def in dag_nodes: node_id = node_def.get("id", "") node_type = node_def.get("type", "EFFECT") node_config = node_def.get("config", {}) input_names = node_def.get("inputs", []) # Determine if this is the output node is_output = node_id == output_node if is_output: color = NODE_COLORS.get("OUTPUT", NODE_COLORS["default"]) else: color = NODE_COLORS.get(node_type, NODE_COLORS["default"]) # Get effect name if it's an effect node label = node_id if node_type == "EFFECT" and "effect" in node_config: label = node_config["effect"] nodes.append({ "data": { "id": node_id, "label": label, "nodeType": node_type, "isOutput": is_output, "color": color, "config": node_config } }) # Create edges from inputs for input_name in input_names: edges.append({ "data": { "source": input_name, "target": node_id } }) nodes_json = json.dumps(nodes) edges_json = json.dumps(edges) dag_html = render_dag_cytoscape(nodes_json, edges_json) content = f'''
← Back to recipe

{recipe.name}

v{recipe.version}

{recipe.description or 'No description'}

DAG Structure

SOURCE EFFECT OUTPUT

Click on a node to see its configuration. The purple-bordered node is the output.

{dag_html}
''' return HTMLResponse(render_page_with_cytoscape(f"DAG: {recipe.name}", content, ctx.actor_id if ctx else None, active_tab="recipes")) @app.post("/ui/recipes/{recipe_id}/run", response_class=HTMLResponse) async def ui_run_recipe(recipe_id: str, request: Request): """HTMX handler: run a recipe with form inputs.""" ctx = await get_user_context_from_cookie(request) if not ctx: return '
Login required
' recipe = load_recipe(recipe_id) if not recipe: return '
Recipe not found
' # Parse form data form_data = await request.form() inputs = {} for var_input in recipe.variable_inputs: value = form_data.get(var_input.node_id, "").strip() if var_input.required and not value: return f'
Missing required input: {var_input.name}
' if value: inputs[var_input.node_id] = value # Load recipe YAML recipe_path = cache_manager.get_by_content_hash(recipe_id) if not recipe_path: return '
Recipe YAML not found in cache
' try: with open(recipe_path) as f: yaml_config = yaml.safe_load(f) # Build DAG from recipe dag = build_dag_from_recipe(yaml_config, inputs, recipe) # Create run run_id = str(uuid.uuid4()) actor_id = ctx.actor_id # Collect all input hashes all_inputs = list(inputs.values()) for fixed in recipe.fixed_inputs: if fixed.content_hash: all_inputs.append(fixed.content_hash) run = RunStatus( run_id=run_id, status="pending", recipe=f"recipe:{recipe.name}", inputs=all_inputs, output_name=f"{recipe.name}-{run_id[:8]}", created_at=datetime.now(timezone.utc).isoformat(), username=actor_id ) # Submit to Celery dag_json = dag.to_json() task = execute_dag.delay(dag_json, run.run_id) run.celery_task_id = task.id run.status = "running" save_run(run) return f'''
Run started! View run
''' except Exception as e: return f'
Error: {str(e)}
'


@app.get("/ui/recipes-list", response_class=HTMLResponse)
async def ui_recipes_list(request: Request):
    """HTMX partial: list of recipes.

    Returns an HTML fragment listing only the recipes uploaded by the
    logged-in user (matched by username or actor_id).
    NOTE(review): markup appears stripped from the string literals below —
    confirm original fragments against version control.
    """
    ctx = await get_user_context_from_cookie(request)
    if not ctx:
        return '\n\nNot logged in.\n\n'
    all_recipes = list_all_recipes()
    # Filter to user's recipes
    user_recipes = [c for c in all_recipes if c.uploader in (ctx.username, ctx.actor_id)]
    if not user_recipes:
        return '\n\nNo recipes yet. Upload a recipe YAML file to get started.\n\n'
    html_parts = ['\n']
    for recipe in user_recipes:
        var_count = len(recipe.variable_inputs)
        fixed_count = len(recipe.fixed_inputs)
        input_info = []
        if var_count:
            input_info.append(f"{var_count} variable")
        if fixed_count:
            input_info.append(f"{fixed_count} fixed")
        inputs_str = ", ".join(input_info) if input_info else "no inputs"
        html_parts.append(f'''
{recipe.name} v{recipe.version}
{inputs_str}
{recipe.description or "No description"}
{recipe.recipe_id[:24]}...
''')
    html_parts.append('\n')
    return '\n'.join(html_parts)


@app.delete("/ui/recipes/{recipe_id}/discard", response_class=HTMLResponse)
async def ui_discard_recipe(recipe_id: str, request: Request):
    """HTMX handler: discard a recipe.

    Deletes the recipe from Redis and the content cache, after checking
    login, ownership and pin status. Returns an HTML fragment either way.
    """
    ctx = await get_user_context_from_cookie(request)
    if not ctx:
        return '\nLogin required\n'
    recipe = load_recipe(recipe_id)
    if not recipe:
        return '\nRecipe not found\n'
    # Check ownership
    if recipe.uploader not in (ctx.username, ctx.actor_id):
        return '\nAccess denied\n'
    # Check if pinned — pinned recipes must never be removed
    pinned, reason = cache_manager.is_pinned(recipe_id)
    if pinned:
        return f'\nCannot delete: recipe is pinned ({reason})\n'
    # Delete from Redis and cache
    delete_recipe_from_redis(recipe_id)
    cache_manager.delete_by_content_hash(recipe_id)
    return '''
Recipe deleted. Back to recipes
'''


@app.get("/cache/{content_hash}")
async def get_cached(content_hash: str, request: Request):
    """Get cached content by hash. Content negotiation: HTML for browsers, JSON for APIs, file for downloads.

    - 404 (HTML or JSON depending on Accept) when the hash is not cached.
    - JSON body (hash, size, media_type, ipfs_cid, meta) only when the client
      explicitly asks for application/json and not text/html.
    - Otherwise an HTML detail page; raw bytes are only served from
      /cache/{hash}/raw. Requires login and ownership for the HTML view.
    NOTE(review): markup appears stripped from the literals below — confirm
    against version control.
    """
    start = time.time()
    accept = request.headers.get("accept", "")
    logger.info(f"get_cached: {content_hash[:16]}... Accept={accept[:50]}")
    ctx = await get_user_context_from_cookie(request)
    cache_path = get_cache_path(content_hash)
    if not cache_path:
        logger.info(f"get_cached: Not found, took {time.time()-start:.3f}s")
        if wants_html(request):
            content = f'\n\nContent not found: {content_hash}\n\n'
            return HTMLResponse(render_page("Not Found", content, ctx.actor_id if ctx else None, active_tab="media"), status_code=404)
        raise HTTPException(404, f"Content {content_hash} not in cache")
    # JSON response only if explicitly requested
    if "application/json" in accept and "text/html" not in accept:
        t0 = time.time()
        meta = await database.load_item_metadata(content_hash, ctx.actor_id if ctx else None)
        logger.debug(f"get_cached: load_item_metadata took {time.time()-t0:.3f}s")
        t0 = time.time()
        cache_item = await database.get_cache_item(content_hash)
        logger.debug(f"get_cached: get_cache_item took {time.time()-t0:.3f}s")
        ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
        file_size = cache_path.stat().st_size
        # Use stored type from metadata, fall back to auto-detection
        stored_type = meta.get("type") if meta else None
        if stored_type == "recipe":
            media_type = "recipe"
        else:
            media_type = detect_media_type(cache_path)
        logger.info(f"get_cached: JSON response, ipfs_cid={ipfs_cid[:16] if ipfs_cid else 'None'}..., took {time.time()-start:.3f}s")
        return {
            "content_hash": content_hash,
            "size": file_size,
            "media_type": media_type,
            "ipfs_cid": ipfs_cid,
            "meta": meta
        }
    # HTML response for browsers (default for all non-JSON requests)
    # Raw data is only served from /cache/{hash}/raw endpoint
    if True:  # Always show HTML page, raw data via /raw endpoint
        if not ctx:
            content = '\n\nNot logged in.\n\n'
            return HTMLResponse(render_page("Login Required", content, None, active_tab="media"), status_code=401)
        # Check user has access
        user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
        if content_hash not in user_hashes:
            content = '\n\nAccess denied.\n\n'
            return HTMLResponse(render_page("Access Denied", content, ctx.actor_id, active_tab="media"), status_code=403)
        media_type = detect_media_type(cache_path)
        file_size = cache_path.stat().st_size
        size_str = f"{file_size:,} bytes"
        if file_size > 1024*1024:
            size_str = f"{file_size/(1024*1024):.1f} MB"
        elif file_size > 1024:
            size_str = f"{file_size/1024:.1f} KB"
        # Get IPFS CID from database
        cache_item = await database.get_cache_item(content_hash)
        ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
        # Build media display HTML
        if media_type == "video":
            video_src = video_src_for_request(content_hash, request)
            media_html = f''
        elif media_type == "image":
            media_html = f'{content_hash}'
        else:
            media_html = f'\n\nUnknown file type. Download file\n\n'
        content = f''' Back to media
{media_type.capitalize()} {content_hash[:24]}...
Download
{media_html}

Details

Content Hash (SHA3-256)
{content_hash}
Type
{media_type}
Size
{size_str}
'''
        # Add IPFS section if we have a CID
        if ipfs_cid:
            gateway_links = []
            if IPFS_GATEWAY_URL:
                gateway_links.append(f''' Local Gateway ''')
            gateway_links.extend([
                f''' ipfs.io ''',
                f''' dweb.link ''',
                f''' Cloudflare ''',
            ])
            gateways_html = '\n'.join(gateway_links)
            content += f'''

IPFS

Content Identifier (CID)
{ipfs_cid}
Gateways:
{gateways_html}
'''
        else:
            content += '''

IPFS

Not yet uploaded to IPFS
'''
        content += f'''
Loading metadata...
'''
        return HTMLResponse(render_page(f"Cache: {content_hash[:16]}...", content, ctx.actor_id, active_tab="media"))


@app.get("/cache/{content_hash}/raw")
async def get_cached_raw(content_hash: str):
    """Get raw cached content (file download).

    Sniffs magic bytes to pick a Content-Type and download filename:
    EBML header -> MKV, 'ftyp' -> MP4, PNG/JPEG signatures for images;
    anything unrecognised falls back to generic defaults.
    """
    cache_path = get_cache_path(content_hash)
    if not cache_path:
        raise HTTPException(404, f"Content {content_hash} not in cache")
    # Detect media type and set appropriate content-type and filename
    media_type_name = detect_media_type(cache_path)
    if media_type_name == "video":
        # Check actual format
        with open(cache_path, "rb") as f:
            header = f.read(12)
        if header[:4] == b'\x1a\x45\xdf\xa3':  # WebM/MKV (EBML magic)
            return FileResponse(cache_path, media_type="video/x-matroska", filename=f"{content_hash}.mkv")
        elif header[4:8] == b'ftyp':  # MP4
            return FileResponse(cache_path, media_type="video/mp4", filename=f"{content_hash}.mp4")
        # Unknown video container: default to MP4
        return FileResponse(cache_path, media_type="video/mp4", filename=f"{content_hash}.mp4")
    elif media_type_name == "image":
        with open(cache_path, "rb") as f:
            header = f.read(8)
        if header[:8] == b'\x89PNG\r\n\x1a\n':
            return FileResponse(cache_path, media_type="image/png", filename=f"{content_hash}.png")
        elif header[:2] == b'\xff\xd8':
            return FileResponse(cache_path, media_type="image/jpeg", filename=f"{content_hash}.jpg")
        # Unknown image format: default to JPEG
        return FileResponse(cache_path, media_type="image/jpeg", filename=f"{content_hash}.jpg")
    return FileResponse(cache_path, filename=f"{content_hash}.bin")


@app.get("/cache/{content_hash}/mp4")
async def get_cached_mp4(content_hash: str):
    """Get cached content as MP4 (transcodes MKV on first request, caches result)."""
    cache_path = get_cache_path(content_hash)
    if not cache_path:
        raise HTTPException(404, f"Content {content_hash} not in cache")
    # MP4 transcodes stored alongside original in CACHE_DIR
    mp4_path = CACHE_DIR / f"{content_hash}.mp4"
    # If MP4 already cached, serve it
    if mp4_path.exists():
        return FileResponse(mp4_path, media_type="video/mp4")
    # Check if source is already MP4
    media_type = \
detect_media_type(cache_path)
    if media_type != "video":
        raise HTTPException(400, "Content is not a video")
    # Check if already MP4 format
    import subprocess
    try:
        # ffprobe prints the container format; mp4/mov need no transcode
        result = subprocess.run(
            ["ffprobe", "-v", "error", "-select_streams", "v:0",
             "-show_entries", "format=format_name", "-of", "csv=p=0", str(cache_path)],
            capture_output=True, text=True, timeout=10
        )
        if "mp4" in result.stdout.lower() or "mov" in result.stdout.lower():
            # Already MP4-compatible, just serve original
            return FileResponse(cache_path, media_type="video/mp4")
    except Exception:
        pass  # Continue with transcoding
    # Transcode to MP4 (H.264 + AAC); write to a temp name, rename on success
    transcode_path = CACHE_DIR / f"{content_hash}.transcoding.mp4"
    try:
        result = subprocess.run(
            ["ffmpeg", "-y", "-i", str(cache_path),
             "-c:v", "libx264", "-preset", "fast", "-crf", "23",
             "-c:a", "aac", "-b:a", "128k",
             "-movflags", "+faststart",
             str(transcode_path)],
            capture_output=True, text=True,
            timeout=600  # 10 min timeout
        )
        if result.returncode != 0:
            raise HTTPException(500, f"Transcoding failed: {result.stderr[:200]}")
        # Move to final location
        transcode_path.rename(mp4_path)
    except subprocess.TimeoutExpired:
        if transcode_path.exists():
            transcode_path.unlink()
        raise HTTPException(500, "Transcoding timed out")
    except Exception as e:
        if transcode_path.exists():
            transcode_path.unlink()
        raise HTTPException(500, f"Transcoding failed: {e}")
    return FileResponse(mp4_path, media_type="video/mp4")


@app.get("/cache/{content_hash}/meta-form", response_class=HTMLResponse)
async def cache_meta_form(content_hash: str, request: Request):
    """Clean URL redirect to the HTMX meta form."""
    # Just redirect to the old endpoint for now
    from starlette.responses import RedirectResponse
    return RedirectResponse(f"/ui/cache/{content_hash}/meta-form", status_code=302)


@app.get("/ui/cache/{content_hash}/meta-form", response_class=HTMLResponse)
async def ui_cache_meta_form(content_hash: str, request: Request):
    """HTMX partial: metadata editing form for a cached item.

    Renders origin/description/tags fields plus a publish-to-L2 section that
    varies by state: existing L2 shares -> share list; origin set -> publish
    form; otherwise a prompt to set the origin first.
    NOTE(review): markup appears stripped from the literals below — confirm
    against version control.
    """
    ctx = await get_user_context_from_cookie(request)
    if not ctx:
        return '\nLogin required to edit metadata\n'
    # Check ownership
    user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
    if content_hash not in user_hashes:
        return '\nAccess denied\n'
    # Load metadata
    meta = await database.load_item_metadata(content_hash, ctx.actor_id)
    origin = meta.get("origin", {})
    origin_type = origin.get("type", "")
    origin_url = origin.get("url", "")
    origin_note = origin.get("note", "")
    description = meta.get("description", "")
    tags = meta.get("tags", [])
    tags_str = ", ".join(tags) if tags else ""
    l2_shares = meta.get("l2_shares", [])
    pinned = meta.get("pinned", False)
    pin_reason = meta.get("pin_reason", "")
    # Detect media type for publish
    cache_path = get_cache_path(content_hash)
    media_type = detect_media_type(cache_path) if cache_path else "unknown"
    asset_type = "video" if media_type == "video" else "image"
    # Origin radio checked states
    self_checked = 'checked' if origin_type == "self" else ''
    external_checked = 'checked' if origin_type == "external" else ''
    # Build publish section - show list of L2 shares
    if l2_shares:
        shares_html = ""
        for share in l2_shares:
            l2_server = share.get("l2_server", "Unknown")
            asset_name = share.get("asset_name", "")
            # Dates trimmed to the YYYY-MM-DD prefix of the ISO timestamp
            published_at = share.get("published_at", "")[:10] if share.get("published_at") else ""
            last_synced = share.get("last_synced_at", "")[:10] if share.get("last_synced_at") else ""
            asset_url = f"{l2_server}/assets/{asset_name}"
            shares_html += f'''
{asset_name}
{l2_server}
Published: {published_at}
Synced: {last_synced}
'''
        publish_html = f'''
Published to L2 ({len(l2_shares)} share{"s" if len(l2_shares) != 1 else ""})
{shares_html}
'''
    else:
        # Show publish form only if origin is set
        if origin_type:
            publish_html = f'''
'''
        else:
            publish_html = '''
Set an origin (self or external URL) before publishing.
'''
    return f'''

Metadata

Comma-separated list

Publish to L2 (ActivityPub)

{publish_html}

Status

Pinned: {'Yes' if pinned else 'No'} {f'({pin_reason})' if pinned and pin_reason else ''}

Pinned items cannot be discarded. Items are pinned when published or used as inputs to published content.

{'

Cannot discard pinned items.

' if pinned else f""" """}
'''


@app.patch("/ui/cache/{content_hash}/meta", response_class=HTMLResponse)
async def ui_update_cache_meta(content_hash: str, request: Request):
    """HTMX handler: update cache metadata from form.

    Parses origin/description/tags form fields and persists them via
    database.update_item_metadata. Returns a short HTML status fragment.
    NOTE(review): markup appears stripped from the literals — confirm
    against version control.
    """
    ctx = await get_user_context_from_cookie(request)
    if not ctx:
        return '\nLogin required\n'
    # Check ownership
    user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
    if content_hash not in user_hashes:
        return '\nAccess denied\n'
    # Parse form data
    form = await request.form()
    origin_type = form.get("origin_type", "")
    origin_url = form.get("origin_url", "").strip()
    origin_note = form.get("origin_note", "").strip()
    description = form.get("description", "").strip()
    tags_str = form.get("tags", "").strip()
    # Build origin
    source_type = None
    if origin_type == "self":
        source_type = "self"
    elif origin_type == "external":
        # An external origin is meaningless without its URL
        if not origin_url:
            return '\nExternal origin requires a URL\n'
        source_type = "external"
    # Parse tags (comma separated, empty entries dropped)
    tags = [t.strip() for t in tags_str.split(",") if t.strip()] if tags_str else []
    # Save to database
    await database.update_item_metadata(
        content_hash=content_hash,
        actor_id=ctx.actor_id,
        item_type="media",
        description=description if description else None,
        source_type=source_type,
        source_url=origin_url if origin_url else None,
        source_note=origin_note if origin_note else None,
        tags=tags
    )
    return '\nMetadata saved!\n'


@app.post("/ui/cache/{content_hash}/publish", response_class=HTMLResponse)
async def ui_publish_cache(content_hash: str, request: Request):
    """HTMX handler: publish cache item to L2.

    Forwards the item (content hash, IPFS CID, metadata) to the user's L2
    server, then records the share locally and pins the item.
    """
    ctx = await get_user_context_from_cookie(request)
    if not ctx:
        return '\nLogin required\n'
    token = request.cookies.get("auth_token")
    if not token:
        return '\nAuth token required\n'
    # Check ownership
    user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
    if content_hash not in user_hashes:
        return '\nAccess denied\n'
    # Parse form
    form = await request.form()
    asset_name = form.get("asset_name", "").strip()
    asset_type = form.get("asset_type", "image")
    if not asset_name:
        return '\nAsset name required\n'
    # Load metadata from database
    meta = await database.load_item_metadata(content_hash, ctx.actor_id)
    origin = meta.get("origin")
    # Publishing requires a declared origin
    if not origin or "type" not in origin:
        return '\nSet origin before publishing\n'
    # Get IPFS CID from cache item
    cache_item = await database.get_cache_item(content_hash)
    ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
    # Call the L2 server from user's context
    l2_server = ctx.l2_server
    try:
        resp = http_requests.post(
            f"{l2_server}/assets/publish-cache",
            headers={"Authorization": f"Bearer {token}"},
            json={
                "content_hash": content_hash,
                "ipfs_cid": ipfs_cid,
                "asset_name": asset_name,
                "asset_type": asset_type,
                "origin": origin,
                "description": meta.get("description"),
                "tags": meta.get("tags", []),
                "metadata": {
                    "filename": meta.get("filename"),
                    "folder": meta.get("folder"),
                    "collections": meta.get("collections", [])
                }
            },
            timeout=30
        )
        resp.raise_for_status()
    except http_requests.exceptions.HTTPError as e:
        error_detail = ""
        try:
            error_detail = e.response.json().get("detail", str(e))
        except Exception:
            error_detail = str(e)
        return f'\nError: {error_detail}\n'
    except Exception as e:
        return f'\nError: {e}\n'
    # Save L2 share to database and pin the item
    await database.save_l2_share(
        content_hash=content_hash,
        actor_id=ctx.actor_id,
        l2_server=l2_server,
        asset_name=asset_name,
        content_type=asset_type
    )
    await database.update_item_metadata(
        content_hash=content_hash,
        actor_id=ctx.actor_id,
        pinned=True,
        pin_reason="published"
    )
    # Use HTTPS for L2 links
    l2_https = l2_server.replace("http://", "https://")
    return f'''
Published to L2 as {asset_name}! View on L2
'''


@app.patch("/ui/cache/{content_hash}/republish", response_class=HTMLResponse)
async def ui_republish_cache(content_hash: str, request: Request):
    """HTMX handler: re-publish (update) cache item on L2.

    Finds the existing share for the user's L2 server and PATCHes the asset
    there, then refreshes the local share record (bumps last_synced_at).
    """
    ctx = await get_user_context_from_cookie(request)
    if not ctx:
        return '\nLogin required\n'
    token = request.cookies.get("auth_token")
    if not token:
        return '\nAuth token required\n'
    # Check ownership
    user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
    if content_hash not in user_hashes:
        return '\nAccess denied\n'
    # Load metadata
    meta = await database.load_item_metadata(content_hash, ctx.actor_id)
    l2_shares = meta.get("l2_shares", [])
    # Find share for current L2 server (user's L2)
    l2_server = ctx.l2_server
    current_share = None
    share_index = -1
    for i, share in enumerate(l2_shares):
        if share.get("l2_server") == l2_server:
            current_share = share
            share_index = i
            break
    if not current_share:
        return '\nItem not published to this L2 yet\n'
    asset_name = current_share.get("asset_name")
    if not asset_name:
        return '\nNo asset name found\n'
    # Call L2 update
    try:
        resp = http_requests.patch(
            f"{l2_server}/assets/{asset_name}",
            headers={"Authorization": f"Bearer {token}"},
            json={
                "description": meta.get("description"),
                "tags": meta.get("tags"),
                "origin": meta.get("origin"),
                "metadata": {
                    "filename": meta.get("filename"),
                    "folder": meta.get("folder"),
                    "collections": meta.get("collections", [])
                }
            },
            timeout=30
        )
        resp.raise_for_status()
    except http_requests.exceptions.HTTPError as e:
        error_detail = ""
        try:
            error_detail = e.response.json().get("detail", str(e))
        except Exception:
            error_detail = str(e)
        return f'\nError: {error_detail}\n'
    except Exception as e:
        return f'\nError: {e}\n'
    # Update local metadata - save_l2_share updates last_synced_at on conflict
    await database.save_l2_share(
        content_hash=content_hash,
        actor_id=ctx.actor_id,
        l2_server=l2_server,
        asset_name=asset_name,
        content_type=current_share.get("content_type", "media")
    )
    return '
Updated on L2!
'


@app.get("/media")
async def list_media(
    request: Request,
    page: int = 1,
    limit: int = 20,
    folder: Optional[str] = None,
    collection: Optional[str] = None,
    tag: Optional[str] = None
):
    """List media items. HTML for browsers (with infinite scroll), JSON for APIs (with pagination).

    HTML view requires login; items are filtered to the user's hashes,
    deduplicated, recipe entries skipped, then optionally filtered by
    folder/collection/tag, sorted newest-first and paginated.
    NOTE(review): markup appears stripped from the literals — confirm
    against version control.
    """
    ctx = await get_user_context_from_cookie(request)
    if wants_html(request):
        # Require login for HTML media view
        if not ctx:
            content = '\n\nNot logged in.\n\n'
            return HTMLResponse(render_page("Media", content, None, active_tab="media"))
        # Get hashes owned by/associated with this user
        user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
        # Get cache items that belong to the user (from cache_manager)
        cache_items = []
        seen_hashes = set()  # Deduplicate by content_hash
        for cached_file in cache_manager.list_all():
            content_hash = cached_file.content_hash
            if content_hash not in user_hashes:
                continue
            # Skip duplicates (same content from multiple runs)
            if content_hash in seen_hashes:
                continue
            seen_hashes.add(content_hash)
            # Skip recipes - they have their own section
            if cached_file.node_type == "recipe":
                continue
            meta = await database.load_item_metadata(content_hash, ctx.actor_id)
            # Apply folder filter
            if folder:
                item_folder = meta.get("folder", "/")
                if folder != "/" and not item_folder.startswith(folder):
                    continue
                if folder == "/" and item_folder != "/":
                    continue
            # Apply collection filter
            if collection:
                if collection not in meta.get("collections", []):
                    continue
            # Apply tag filter
            if tag:
                if tag not in meta.get("tags", []):
                    continue
            cache_items.append({
                "hash": content_hash,
                "size": cached_file.size_bytes,
                "mtime": cached_file.created_at,
                "meta": meta
            })
        # Sort by modification time (newest first)
        cache_items.sort(key=lambda x: x["mtime"], reverse=True)
        total = len(cache_items)
        # Pagination
        start = (page - 1) * limit
        end = start + limit
        items_page = cache_items[start:end]
        has_more = end < total
        if not items_page:
            if page == 1:
                filter_msg = ""
                if folder:
                    filter_msg = f" in folder {folder}"
                elif collection:
                    filter_msg = f" in collection '{collection}'"
                elif tag:
                    filter_msg = f" with tag '{tag}'"
                content = f'\n\nNo media{filter_msg}. Upload files or run effects to see them here.\n\n'
            else:
                return HTMLResponse("")  # Empty for infinite scroll
        else:
            html_parts = []
            for item in items_page:
                content_hash = item["hash"]
                cache_path = get_cache_path(content_hash)
                media_type = detect_media_type(cache_path) if cache_path else "unknown"
                # Format size
                size = item["size"]
                if size > 1024*1024:
                    size_str = f"{size/(1024*1024):.1f} MB"
                elif size > 1024:
                    size_str = f"{size/1024:.1f} KB"
                else:
                    size_str = f"{size} bytes"
                html_parts.append(f'''
{media_type} {size_str}
{content_hash[:24]}...
''')
                if media_type == "video":
                    video_src = video_src_for_request(content_hash, request)
                    html_parts.append(f'')
                elif media_type == "image":
                    html_parts.append(f'{content_hash[:16]}')
                else:
                    html_parts.append('\n\nUnknown file type\n\n')
                html_parts.append('\n')
            # For infinite scroll, just return cards if not first page
            if page > 1:
                if has_more:
                    query_params = f"page={page + 1}"
                    if folder:
                        query_params += f"&folder={folder}"
                    if collection:
                        query_params += f"&collection={collection}"
                    if tag:
                        query_params += f"&tag={tag}"
                    html_parts.append(f'''

Loading more...

''')
                return HTMLResponse('\n'.join(html_parts))
            # First page - full content
            infinite_scroll_trigger = ""
            if has_more:
                query_params = "page=2"
                if folder:
                    query_params += f"&folder={folder}"
                if collection:
                    query_params += f"&collection={collection}"
                if tag:
                    query_params += f"&tag={tag}"
                infinite_scroll_trigger = f'''

Loading more...

'''
            content = f'''

Media ({total} items)

{''.join(html_parts)} {infinite_scroll_trigger}
'''
        return HTMLResponse(render_page("Media", content, ctx.actor_id, active_tab="media"))
    # JSON response for APIs - list all hashes with optional pagination
    all_hashes = [cf.content_hash for cf in cache_manager.list_all()]
    total = len(all_hashes)
    start = (page - 1) * limit
    end = start + limit
    hashes_page = all_hashes[start:end]
    has_more = end < total
    return {
        "hashes": hashes_page,
        "pagination": {
            "page": page,
            "limit": limit,
            "total": total,
            "has_more": has_more
        }
    }


@app.delete("/cache/{content_hash}")
async def discard_cache(content_hash: str, ctx: UserContext = Depends(get_required_user_context)):
    """
    Discard (delete) a cached item.
    Enforces deletion rules:
    - Cannot delete items published to L2 (shared)
    - Cannot delete inputs/outputs of activities (runs)
    - Cannot delete pinned items
    """
    # Check if content exists
    if not cache_manager.has_content(content_hash):
        raise HTTPException(404, "Content not found")
    # Check ownership
    user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
    if content_hash not in user_hashes:
        raise HTTPException(403, "Access denied")
    # Check if pinned
    meta = await database.load_item_metadata(content_hash, ctx.actor_id)
    if meta.get("pinned"):
        pin_reason = meta.get("pin_reason", "unknown")
        raise HTTPException(400, f"Cannot discard pinned item (reason: {pin_reason})")
    # Check if used by any run (Redis runs, not just activity store)
    runs_using = await asyncio.to_thread(find_runs_using_content, content_hash)
    if runs_using:
        run, role = runs_using[0]
        raise HTTPException(400, f"Cannot discard: item is {role} of run {run.run_id}")
    # Check deletion rules via cache_manager (L2 shared status, activity store)
    can_delete, reason = await asyncio.to_thread(cache_manager.can_delete, content_hash)
    if not can_delete:
        raise HTTPException(400, f"Cannot discard: {reason}")
    # Delete via cache_manager
    success, msg = await asyncio.to_thread(cache_manager.delete_by_content_hash, content_hash)
    if not success:
        # Fallback to legacy deletion
        cache_path = \
get_cache_path(content_hash)
        if cache_path and cache_path.exists():
            cache_path.unlink()
    # Clean up legacy metadata files
    # NOTE(review): source formatting is ambiguous here — presumably this
    # cleanup runs unconditionally (transcodes/meta exist even when the
    # cache_manager delete succeeded); confirm against version control.
    meta_path = CACHE_DIR / f"{content_hash}.meta.json"
    if meta_path.exists():
        meta_path.unlink()
    mp4_path = CACHE_DIR / f"{content_hash}.mp4"
    if mp4_path.exists():
        mp4_path.unlink()
    return {"discarded": True, "content_hash": content_hash}


@app.delete("/ui/cache/{content_hash}/discard", response_class=HTMLResponse)
async def ui_discard_cache(content_hash: str, request: Request):
    """HTMX handler: discard a cached item.

    Same deletion rules as the JSON DELETE /cache/{hash} endpoint (pinned,
    run usage, cache_manager.can_delete), but answers with HTML fragments.
    """
    ctx = await get_user_context_from_cookie(request)
    if not ctx:
        return '\nLogin required\n'
    # Check ownership
    user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
    if content_hash not in user_hashes:
        return '\nAccess denied\n'
    # Check if content exists
    has_content = await asyncio.to_thread(cache_manager.has_content, content_hash)
    if not has_content:
        return '\nContent not found\n'
    # Check if pinned
    meta = await database.load_item_metadata(content_hash, ctx.actor_id)
    if meta.get("pinned"):
        pin_reason = meta.get("pin_reason", "unknown")
        return f'\nCannot discard: item is pinned ({pin_reason})\n'
    # Check if used by any run (Redis runs, not just activity store)
    runs_using = await asyncio.to_thread(find_runs_using_content, content_hash)
    if runs_using:
        run, role = runs_using[0]
        return f'\nCannot discard: item is {role} of run {run.run_id}\n'
    # Check deletion rules via cache_manager (L2 shared status, activity store)
    can_delete, reason = await asyncio.to_thread(cache_manager.can_delete, content_hash)
    if not can_delete:
        return f'\nCannot discard: {reason}\n'
    # Delete via cache_manager
    success, msg = await asyncio.to_thread(cache_manager.delete_by_content_hash, content_hash)
    if not success:
        # Fallback to legacy deletion
        cache_path = get_cache_path(content_hash)
        if cache_path and cache_path.exists():
            cache_path.unlink()
    # Clean up legacy metadata files
    meta_path = CACHE_DIR / f"{content_hash}.meta.json"
    if meta_path.exists():
        meta_path.unlink()
    mp4_path = CACHE_DIR / f"{content_hash}.mp4"
    if mp4_path.exists():
        mp4_path.unlink()
    return '''
Item discarded. Back to media
'''


# Known assets (bootstrap data)
KNOWN_ASSETS = {
    "cat": "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b",
}


@app.get("/assets")
async def list_assets():
    """List known assets."""
    return KNOWN_ASSETS


@app.post("/cache/import")
async def import_to_cache(path: str):
    """Import a local file to cache.

    404s if the path does not exist; returns the resulting content hash.
    """
    source = Path(path)
    if not source.exists():
        raise HTTPException(404, f"File not found: {path}")
    content_hash = await cache_file(source)
    return {"content_hash": content_hash, "cached": True}


def save_cache_meta(content_hash: str, uploader: str = None, filename: str = None, **updates):
    """Save or update metadata for a cached file.

    Creates the sidecar {hash}.meta.json on first write; subsequent calls
    merge `updates` but never overwrite uploader/uploaded_at.
    Returns the resulting metadata dict.
    """
    meta_path = CACHE_DIR / f"{content_hash}.meta.json"
    # Load existing or create new
    if meta_path.exists():
        with open(meta_path) as f:
            meta = json.load(f)
    else:
        meta = {
            "uploader": uploader,
            "uploaded_at": datetime.now(timezone.utc).isoformat(),
            "filename": filename
        }
    # Apply updates (but never change uploader or uploaded_at)
    for key, value in updates.items():
        if key not in ("uploader", "uploaded_at"):
            meta[key] = value
    with open(meta_path, "w") as f:
        json.dump(meta, f, indent=2)
    return meta


def load_cache_meta(content_hash: str) -> dict:
    """Load metadata for a cached file. Empty dict when no sidecar exists."""
    meta_path = CACHE_DIR / f"{content_hash}.meta.json"
    if meta_path.exists():
        with open(meta_path) as f:
            return json.load(f)
    return {}


# User data storage (folders, collections)
USER_DATA_DIR = CACHE_DIR / ".user-data"


def load_user_data(username: str) -> dict:
    """Load user's folders and collections."""
    USER_DATA_DIR.mkdir(parents=True, exist_ok=True)
    # Normalize username (remove @ prefix if present)
    safe_name = username.replace("@", "").replace("/", "_")
    user_file = USER_DATA_DIR / f"{safe_name}.json"
    if user_file.exists():
        with open(user_file) as f:
            return json.load(f)
    return {"folders": ["/"], "collections": []}


def save_user_data(username: str, data: dict):
    """Save user's folders and collections."""
    USER_DATA_DIR.mkdir(parents=True, exist_ok=True)
safe_name = username.replace("@", "").replace("/", "_") user_file = USER_DATA_DIR / f"{safe_name}.json" with open(user_file, "w") as f: json.dump(data, f, indent=2) async def get_user_cache_hashes(username: str, actor_id: Optional[str] = None) -> set: """Get all cache hashes owned by or associated with a user. username: The plain username actor_id: The full actor ID (@user@server), if available """ # Match against both formats for backwards compatibility match_values = [username] if actor_id: match_values.append(actor_id) hashes = set() # Query database for items owned by user (new system) if actor_id: try: db_items = await database.get_user_items(actor_id) for item in db_items: hashes.add(item["content_hash"]) except Exception: pass # Database may not be initialized # Legacy: Files uploaded by user (JSON metadata) if CACHE_DIR.exists(): for f in CACHE_DIR.iterdir(): if f.name.endswith('.meta.json'): try: meta_path = CACHE_DIR / f.name if meta_path.exists(): import json with open(meta_path, 'r') as mf: meta = json.load(mf) if meta.get("uploader") in match_values: hashes.add(f.name.replace('.meta.json', '')) except Exception: pass # Files from user's runs (inputs and outputs) for run in list_all_runs(): if run.username in match_values: hashes.update(run.inputs) if run.output_hash: hashes.add(run.output_hash) return hashes @app.post("/cache/upload") async def upload_to_cache(file: UploadFile = File(...), ctx: UserContext = Depends(get_required_user_context)): """Upload a file to cache. 
Requires authentication."""
    # Write to temp file first
    import tempfile
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        content = await file.read()
        tmp.write(content)
        tmp_path = Path(tmp.name)
    # Store in cache via cache_manager (move=True consumes the temp file)
    cached, ipfs_cid = cache_manager.put(tmp_path, node_type="upload", move=True)
    content_hash = cached.content_hash
    # Save to cache_items table (with IPFS CID)
    await database.create_cache_item(content_hash, ipfs_cid)
    # Save uploader metadata to database
    await database.save_item_metadata(
        content_hash=content_hash,
        actor_id=ctx.actor_id,
        item_type="media",
        filename=file.filename
    )
    return {"content_hash": content_hash, "filename": file.filename, "size": len(content)}


class CacheMetaUpdate(BaseModel):
    """Request to update cache metadata."""
    origin: Optional[dict] = None  # {"type": "self"|"external", "url": "...", "note": "..."}
    description: Optional[str] = None
    tags: Optional[list[str]] = None
    folder: Optional[str] = None
    collections: Optional[list[str]] = None


class PublishRequest(BaseModel):
    """Request to publish a cache item to L2."""
    asset_name: str
    asset_type: str = "image"  # image, video, etc.


class AddStorageRequest(BaseModel):
    """Request to add a storage provider."""
    provider_type: str  # 'pinata', 'web3storage', 'local', etc.
    provider_name: Optional[str] = None  # User-friendly name
    config: dict  # Provider-specific config (api_key, path, etc.)
capacity_gb: int # Storage capacity in GB class UpdateStorageRequest(BaseModel): """Request to update a storage provider.""" config: Optional[dict] = None capacity_gb: Optional[int] = None is_active: Optional[bool] = None @app.get("/cache/{content_hash}/meta") async def get_cache_meta(content_hash: str, ctx: UserContext = Depends(get_required_user_context)): """Get metadata for a cached file.""" # Check file exists cache_path = get_cache_path(content_hash) if not cache_path: raise HTTPException(404, "Content not found") # Check ownership user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id) if content_hash not in user_hashes: raise HTTPException(403, "Access denied") return await database.load_item_metadata(content_hash, ctx.actor_id) @app.patch("/cache/{content_hash}/meta") async def update_cache_meta(content_hash: str, update: CacheMetaUpdate, ctx: UserContext = Depends(get_required_user_context)): """Update metadata for a cached file.""" # Check file exists cache_path = get_cache_path(content_hash) if not cache_path: raise HTTPException(404, "Content not found") # Check ownership user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id) if content_hash not in user_hashes: raise HTTPException(403, "Access denied") # Build update dict from non-None fields updates = {} if update.origin is not None: updates["origin"] = update.origin if update.description is not None: updates["description"] = update.description if update.tags is not None: updates["tags"] = update.tags if update.folder is not None: # Ensure folder exists in user's folder list user_data = load_user_data(ctx.username) if update.folder not in user_data["folders"]: raise HTTPException(400, f"Folder does not exist: {update.folder}") updates["folder"] = update.folder if update.collections is not None: # Validate collections exist user_data = load_user_data(ctx.username) existing = {c["name"] for c in user_data["collections"]} for col in update.collections: if col not in existing: 
                raise HTTPException(400, f"Collection does not exist: {col}")
        updates["collections"] = update.collections
    meta = await database.update_item_metadata(content_hash, ctx.actor_id, **updates)
    return meta


@app.post("/cache/{content_hash}/publish")
async def publish_cache_to_l2(
    content_hash: str,
    req: PublishRequest,
    request: Request,
    ctx: UserContext = Depends(get_required_user_context)
):
    """
    Publish a cache item to L2 (ActivityPub).
    Requires origin to be set in metadata before publishing.
    """
    # Check file exists
    cache_path = get_cache_path(content_hash)
    if not cache_path:
        raise HTTPException(404, "Content not found")
    # Check ownership
    user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
    if content_hash not in user_hashes:
        raise HTTPException(403, "Access denied")
    # Load metadata
    meta = await database.load_item_metadata(content_hash, ctx.actor_id)
    # Check origin is set
    origin = meta.get("origin")
    if not origin or "type" not in origin:
        raise HTTPException(400, "Origin must be set before publishing. 
Use --origin self or --origin-url ") # Get IPFS CID from cache item cache_item = await database.get_cache_item(content_hash) ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None # Get auth token to pass to L2 token = request.cookies.get("auth_token") if not token: # Try from header auth_header = request.headers.get("Authorization", "") if auth_header.startswith("Bearer "): token = auth_header[7:] if not token: raise HTTPException(401, "Authentication token required") # Call L2 publish-cache endpoint (use user's L2 server) l2_server = ctx.l2_server try: resp = http_requests.post( f"{l2_server}/assets/publish-cache", headers={"Authorization": f"Bearer {token}"}, json={ "content_hash": content_hash, "ipfs_cid": ipfs_cid, "asset_name": req.asset_name, "asset_type": req.asset_type, "origin": origin, "description": meta.get("description"), "tags": meta.get("tags", []), "metadata": { "filename": meta.get("filename"), "folder": meta.get("folder"), "collections": meta.get("collections", []) } }, timeout=10 ) resp.raise_for_status() l2_result = resp.json() except http_requests.exceptions.HTTPError as e: error_detail = "" try: error_detail = e.response.json().get("detail", str(e)) except Exception: error_detail = str(e) raise HTTPException(400, f"L2 publish failed: {error_detail}") except Exception as e: raise HTTPException(500, f"L2 publish failed: {e}") # Update local metadata with publish status and pin await database.save_l2_share( content_hash=content_hash, actor_id=ctx.actor_id, l2_server=l2_server, asset_name=req.asset_name, content_type=req.asset_type ) await database.update_item_metadata( content_hash=content_hash, actor_id=ctx.actor_id, pinned=True, pin_reason="published" ) return { "published": True, "asset_name": req.asset_name, "l2_result": l2_result } @app.patch("/cache/{content_hash}/republish") async def republish_cache_to_l2( content_hash: str, request: Request, ctx: UserContext = Depends(get_required_user_context) ): """ Re-publish (update) a cache 
    item on L2 after metadata changes. Only works for already-published items.

    Flow: verify local file + ownership -> locate the existing share record for
    the user's L2 server -> PATCH the asset on L2 -> refresh the local share row.
    Raises 404/403/400/401/500 HTTPException on the corresponding failure.
    """
    # Check file exists locally before talking to L2
    cache_path = get_cache_path(content_hash)
    if not cache_path:
        raise HTTPException(404, "Content not found")
    # Check ownership
    user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
    if content_hash not in user_hashes:
        raise HTTPException(403, "Access denied")
    # Load metadata
    meta = await database.load_item_metadata(content_hash, ctx.actor_id)
    l2_shares = meta.get("l2_shares", [])
    # Find share for current L2 server (user's L2)
    l2_server = ctx.l2_server
    current_share = None
    for share in l2_shares:
        if share.get("l2_server") == l2_server:
            current_share = share
            break
    if not current_share:
        raise HTTPException(400, "Item not published yet. Use publish first.")
    asset_name = current_share.get("asset_name")
    if not asset_name:
        raise HTTPException(400, "No asset name found in publish info")
    # Get auth token (cookie first, then Authorization header)
    token = request.cookies.get("auth_token")
    if not token:
        auth_header = request.headers.get("Authorization", "")
        if auth_header.startswith("Bearer "):
            token = auth_header[7:]
    if not token:
        raise HTTPException(401, "Authentication token required")
    # Get IPFS CID from cache item
    cache_item = await database.get_cache_item(content_hash)
    ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
    # Call L2 update endpoint (use user's L2 server)
    l2_server = ctx.l2_server
    try:
        resp = http_requests.patch(
            f"{l2_server}/assets/{asset_name}",
            headers={"Authorization": f"Bearer {token}"},
            json={
                "description": meta.get("description"),
                "tags": meta.get("tags"),
                "origin": meta.get("origin"),
                "ipfs_cid": ipfs_cid,
                "metadata": {
                    "filename": meta.get("filename"),
                    "folder": meta.get("folder"),
                    "collections": meta.get("collections", [])
                }
            },
            timeout=10
        )
        resp.raise_for_status()
        l2_result = resp.json()
    except http_requests.exceptions.HTTPError as e:
        # Surface L2's own error detail when its response body is JSON
        error_detail = ""
        try:
            error_detail = e.response.json().get("detail", str(e))
        except Exception:
            error_detail = str(e)
        raise HTTPException(400, f"L2 update failed: {error_detail}")
    except Exception as e:
        raise HTTPException(500, f"L2 update failed: {e}")
    # Update local metadata - save_l2_share updates last_synced_at on conflict
    await database.save_l2_share(
        content_hash=content_hash,
        actor_id=ctx.actor_id,
        l2_server=l2_server,
        asset_name=asset_name,
        content_type=current_share.get("content_type", "media")
    )
    return {
        "updated": True,
        "asset_name": asset_name,
        "l2_result": l2_result
    }


# ============ L2 Sync ============

def _fetch_l2_outbox_sync(l2_server: str, username: str) -> list:
    """Fetch user's outbox from L2 (sync version for asyncio.to_thread)."""
    try:
        # Fetch outbox page with activities
        resp = http_requests.get(
            f"{l2_server}/users/{username}/outbox?page=true",
            headers={"Accept": "application/activity+json"},
            timeout=10
        )
        if resp.status_code != 200:
            logger.warning(f"L2 outbox fetch failed: {resp.status_code}")
            return []
        data = resp.json()
        return data.get("orderedItems", [])
    except Exception as e:
        # Best-effort: sync is optional, so swallow network errors and log
        logger.error(f"Failed to fetch L2 outbox: {e}")
        return []


@app.post("/user/sync-l2")
async def sync_with_l2(ctx: UserContext = Depends(get_required_user_context)):
    """
    Sync local L2 share records with user's L2 outbox.

    Fetches user's published assets from their L2 server and updates local tracking.
""" l2_server = ctx.l2_server username = ctx.username # Fetch outbox activities activities = await asyncio.to_thread(_fetch_l2_outbox_sync, l2_server, username) if not activities: return {"synced": 0, "message": "No activities found or L2 unavailable"} # Process Create activities for assets synced_count = 0 for activity in activities: if activity.get("type") != "Create": continue obj = activity.get("object", {}) if not isinstance(obj, dict): continue # Get asset info - look for content_hash in attachment or directly content_hash = None asset_name = obj.get("name", "") # Check attachments for content hash for attachment in obj.get("attachment", []): if attachment.get("name") == "content_hash": content_hash = attachment.get("value") break # Also check if there's a hash in the object URL or ID if not content_hash: # Try to extract from object ID like /objects/{hash} obj_id = obj.get("id", "") if "/objects/" in obj_id: content_hash = obj_id.split("/objects/")[-1].split("/")[0] if not content_hash or not asset_name: continue # Check if we have this content locally cache_path = get_cache_path(content_hash) if not cache_path: continue # We don't have this content, skip # Determine content type from object type obj_type = obj.get("type", "") if obj_type == "Video": content_type = "video" elif obj_type == "Image": content_type = "image" else: content_type = "media" # Update local L2 share record await database.save_l2_share( content_hash=content_hash, actor_id=ctx.actor_id, l2_server=l2_server, asset_name=asset_name, content_type=content_type ) synced_count += 1 return {"synced": synced_count, "total_activities": len(activities)} @app.post("/ui/sync-l2", response_class=HTMLResponse) async def ui_sync_with_l2(request: Request): """HTMX handler: sync with L2 server.""" ctx = await get_user_context_from_cookie(request) if not ctx: return '
Login required
' try: result = await sync_with_l2(ctx) synced = result.get("synced", 0) total = result.get("total_activities", 0) if synced > 0: return f'''
Synced {synced} asset(s) from L2 ({total} activities found)
''' else: return f'''
No new assets to sync ({total} activities found)
''' except Exception as e: logger.error(f"L2 sync failed: {e}") return f'''
Sync failed: {str(e)}
''' # ============ Folder & Collection Management ============ @app.get("/user/folders") async def list_folders(username: str = Depends(get_required_user)): """List user's folders.""" user_data = load_user_data(username) return {"folders": user_data["folders"]} @app.post("/user/folders") async def create_folder(folder_path: str, username: str = Depends(get_required_user)): """Create a new folder.""" user_data = load_user_data(username) # Validate path format if not folder_path.startswith("/"): raise HTTPException(400, "Folder path must start with /") # Check parent exists parent = "/".join(folder_path.rsplit("/", 1)[:-1]) or "/" if parent != "/" and parent not in user_data["folders"]: raise HTTPException(400, f"Parent folder does not exist: {parent}") # Check doesn't already exist if folder_path in user_data["folders"]: raise HTTPException(400, f"Folder already exists: {folder_path}") user_data["folders"].append(folder_path) user_data["folders"].sort() save_user_data(username, user_data) return {"folder": folder_path, "created": True} @app.delete("/user/folders") async def delete_folder(folder_path: str, ctx: UserContext = Depends(get_required_user_context)): """Delete a folder (must be empty).""" if folder_path == "/": raise HTTPException(400, "Cannot delete root folder") user_data = load_user_data(ctx.username) if folder_path not in user_data["folders"]: raise HTTPException(404, "Folder not found") # Check no subfolders for f in user_data["folders"]: if f.startswith(folder_path + "/"): raise HTTPException(400, f"Folder has subfolders: {f}") # Check no items in folder user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id) for h in user_hashes: meta = await database.load_item_metadata(h, ctx.actor_id) if meta.get("folder") == folder_path: raise HTTPException(400, "Folder is not empty") user_data["folders"].remove(folder_path) save_user_data(ctx.username, user_data) return {"folder": folder_path, "deleted": True} @app.get("/user/collections") async 
def list_collections(username: str = Depends(get_required_user)):
    """List user's collections."""
    user_data = load_user_data(username)
    return {"collections": user_data["collections"]}


@app.post("/user/collections")
async def create_collection(name: str, username: str = Depends(get_required_user)):
    """Create a new collection."""
    user_data = load_user_data(username)

    # Check doesn't already exist
    for col in user_data["collections"]:
        if col["name"] == name:
            raise HTTPException(400, f"Collection already exists: {name}")

    user_data["collections"].append({
        "name": name,
        "created_at": datetime.now(timezone.utc).isoformat()
    })
    save_user_data(username, user_data)
    return {"collection": name, "created": True}


@app.delete("/user/collections")
async def delete_collection(name: str, ctx: UserContext = Depends(get_required_user_context)):
    """Delete a collection and remove it from every cache item that references it."""
    user_data = load_user_data(ctx.username)

    # Find and remove
    for i, col in enumerate(user_data["collections"]):
        if col["name"] == name:
            user_data["collections"].pop(i)
            save_user_data(ctx.username, user_data)

            # Remove from all cache items
            user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
            for h in user_hashes:
                meta = await database.load_item_metadata(h, ctx.actor_id)
                if name in meta.get("collections", []):
                    new_collections = [c for c in meta.get("collections", []) if c != name]
                    await database.update_item_metadata(h, ctx.actor_id, collections=new_collections)

            return {"collection": name, "deleted": True}

    raise HTTPException(404, "Collection not found")


def is_ios_request(request: Request) -> bool:
    """Check if request is from iOS device.

    NOTE(review): iPadOS 13+ often sends a macOS user agent, so iPads may not
    be detected here — confirm this is acceptable for the MP4 fallback.
    """
    ua = request.headers.get("user-agent", "").lower()
    return "iphone" in ua or "ipad" in ua


def video_src_for_request(content_hash: str, request: Request) -> str:
    """Get video src URL, using MP4 endpoint for iOS, raw for others."""
    if is_ios_request(request):
        return f"/cache/{content_hash}/mp4"
    return f"/cache/{content_hash}/raw"


def detect_media_type(cache_path: Path) -> str:
"""Detect if file is image or video based on magic bytes.""" with open(cache_path, "rb") as f: header = f.read(32) # Video signatures if header[:4] == b'\x1a\x45\xdf\xa3': # WebM/MKV return "video" if header[4:8] == b'ftyp': # MP4/MOV return "video" if header[:4] == b'RIFF' and header[8:12] == b'AVI ': # AVI return "video" # Image signatures if header[:8] == b'\x89PNG\r\n\x1a\n': # PNG return "image" if header[:2] == b'\xff\xd8': # JPEG return "image" if header[:6] in (b'GIF87a', b'GIF89a'): # GIF return "image" if header[:4] == b'RIFF' and header[8:12] == b'WEBP': # WebP return "image" return "unknown" async def get_user_context_from_cookie(request) -> Optional[UserContext]: """Get user context from auth cookie. Returns full context with actor_id and l2_server.""" token = request.cookies.get("auth_token") if not token: return None return await get_verified_user_context(token) async def get_user_from_cookie(request) -> Optional[str]: """Get username from auth cookie (backwards compat - prefer get_user_context_from_cookie).""" ctx = await get_user_context_from_cookie(request) return ctx.username if ctx else None def wants_html(request: Request) -> bool: """Check if request wants HTML (browser) vs JSON (API).""" accept = request.headers.get("accept", "") # Check for explicit HTML request if "text/html" in accept and "application/json" not in accept: return True # Check for browser navigation (direct URL access) fetch_mode = request.headers.get("sec-fetch-mode", "") if fetch_mode == "navigate": return True return False # Tailwind CSS config for all L1 templates TAILWIND_CONFIG = ''' ''' # Cytoscape.js for DAG visualization (extends TAILWIND_CONFIG) CYTOSCAPE_CONFIG = TAILWIND_CONFIG + ''' ''' # Node colors for DAG visualization NODE_COLORS = { "SOURCE": "#3b82f6", # Blue "EFFECT": "#22c55e", # Green "OUTPUT": "#a855f7", # Purple "ANALYSIS": "#f59e0b", # Amber "_LIST": "#6366f1", # Indigo "default": "#6b7280" # Gray } def render_run_sub_tabs(run_id: str, active: str = 
"overview") -> str: """Render sub-navigation tabs for run detail pages.""" tabs = [ ("overview", "Overview", f"/run/{run_id}"), ("plan", "Plan", f"/run/{run_id}/plan"), ("analysis", "Analysis", f"/run/{run_id}/analysis"), ("artifacts", "Artifacts", f"/run/{run_id}/artifacts"), ] html = '
' for tab_id, label, url in tabs: if tab_id == active: active_class = "border-b-2 border-blue-500 text-white" else: active_class = "text-gray-400 hover:text-white" html += f'{label}' html += '
' return html def render_dag_cytoscape(nodes_json: str, edges_json: str, container_id: str = "cy") -> str: """Render Cytoscape.js DAG visualization HTML with WebSocket-ready architecture.""" return f'''
''' def render_page_with_cytoscape(title: str, content: str, actor_id: Optional[str] = None, active_tab: str = None) -> str: """Render a page with Cytoscape.js support for DAG visualization.""" user_info = "" if actor_id: parts = actor_id.lstrip("@").split("@") username = parts[0] if parts else actor_id domain = parts[1] if len(parts) > 1 else "" l2_user_url = f"https://{domain}/users/{username}" if domain else "#" user_info = f'''
Logged in as {actor_id}
''' else: user_info = '''
Not logged in
''' runs_active = "border-b-2 border-blue-500 text-white" if active_tab == "runs" else "text-gray-400 hover:text-white" recipes_active = "border-b-2 border-blue-500 text-white" if active_tab == "recipes" else "text-gray-400 hover:text-white" media_active = "border-b-2 border-blue-500 text-white" if active_tab == "media" else "text-gray-400 hover:text-white" storage_active = "border-b-2 border-blue-500 text-white" if active_tab == "storage" else "text-gray-400 hover:text-white" return f""" {title} | Art DAG L1 Server {CYTOSCAPE_CONFIG}

Art DAG L1 Server

{user_info}
{content}
""" def render_page(title: str, content: str, actor_id: Optional[str] = None, active_tab: str = None) -> str: """Render a page with nav bar and content. Used for clean URL pages. actor_id: The user's actor ID (@user@server) or None if not logged in. """ user_info = "" if actor_id: # Extract username and domain from @username@domain format parts = actor_id.lstrip("@").split("@") username = parts[0] if parts else actor_id domain = parts[1] if len(parts) > 1 else "" l2_user_url = f"https://{domain}/users/{username}" if domain else "#" user_info = f'''
Logged in as {actor_id}
''' else: user_info = '''
Not logged in
''' runs_active = "border-b-2 border-blue-500 text-white" if active_tab == "runs" else "text-gray-400 hover:text-white" recipes_active = "border-b-2 border-blue-500 text-white" if active_tab == "recipes" else "text-gray-400 hover:text-white" media_active = "border-b-2 border-blue-500 text-white" if active_tab == "media" else "text-gray-400 hover:text-white" storage_active = "border-b-2 border-blue-500 text-white" if active_tab == "storage" else "text-gray-400 hover:text-white" return f""" {title} | Art DAG L1 Server {TAILWIND_CONFIG}

Art DAG L1 Server

{user_info}
{content}
""" def render_ui_html(actor_id: Optional[str] = None, tab: str = "runs") -> str: """Render main UI HTML with optional user context. actor_id: The user's actor ID (@user@server) or None if not logged in. """ user_info = "" if actor_id: # Extract username and domain from @username@domain format parts = actor_id.lstrip("@").split("@") username = parts[0] if parts else actor_id domain = parts[1] if len(parts) > 1 else "" l2_user_url = f"https://{domain}/users/{username}" if domain else "#" user_info = f'''
Logged in as {actor_id}
''' else: user_info = '''
Not logged in
''' runs_active = "border-b-2 border-blue-500 text-white" if tab == "runs" else "text-gray-400 hover:text-white" recipes_active = "border-b-2 border-blue-500 text-white" if tab == "recipes" else "text-gray-400 hover:text-white" media_active = "border-b-2 border-blue-500 text-white" if tab == "media" else "text-gray-400 hover:text-white" storage_active = "border-b-2 border-blue-500 text-white" if tab == "storage" else "text-gray-400 hover:text-white" if tab == "runs": content_url = "/ui/runs" elif tab == "recipes": content_url = "/ui/recipes-list" else: content_url = "/ui/media-list" return f""" Art DAG L1 Server {TAILWIND_CONFIG}

Art DAG L1 Server

{user_info}
Loading...
""" # Auth - L1 doesn't handle login (user logs in at their L2 server) # Token can be passed via URL from L2 redirect, then L1 sets its own cookie @app.get("/auth") async def auth_callback(auth_token: str = None): """ Receive auth token from L2 redirect and set local cookie. This enables cross-subdomain auth on iOS Safari which blocks shared cookies. """ if not auth_token: return RedirectResponse(url="/", status_code=302) # Verify the token is valid ctx = await get_verified_user_context(auth_token) if not ctx: return RedirectResponse(url="/", status_code=302) # Register token for this user (for revocation by username later) register_user_token(ctx.username, auth_token) # Set local first-party cookie and redirect to home response = RedirectResponse(url="/runs", status_code=302) response.set_cookie( key="auth_token", value=auth_token, httponly=True, max_age=60 * 60 * 24 * 30, # 30 days samesite="lax", secure=True ) return response @app.get("/logout") async def logout(): """Logout - clear local cookie and redirect to home.""" response = RedirectResponse(url="/", status_code=302) response.delete_cookie("auth_token") return response @app.post("/auth/revoke") async def auth_revoke(credentials: HTTPAuthorizationCredentials = Depends(security)): """ Revoke a token. Called by L2 when user logs out. The token to revoke is passed in the Authorization header. """ if not credentials: raise HTTPException(401, "No token provided") token = credentials.credentials # Verify token is valid before revoking (ensures caller has the token) ctx = get_user_context_from_token(token) if not ctx: raise HTTPException(401, "Invalid token") # Revoke the token newly_revoked = revoke_token(token) return {"revoked": True, "newly_revoked": newly_revoked} class RevokeUserRequest(BaseModel): username: str l2_server: str # L2 server requesting the revocation @app.post("/auth/revoke-user") async def auth_revoke_user(request: RevokeUserRequest): """ Revoke all tokens for a user. 
Called by L2 when user logs out. This handles the case where L2 issued scoped tokens that differ from L2's own token. """ # Verify the L2 server is authorized (must be in L1's known list or match token's l2_server) # For now, we trust any request since this only affects users already on this L1 # Revoke all tokens registered for this user count = revoke_all_user_tokens(request.username) return {"revoked": True, "tokens_revoked": count, "username": request.username} @app.post("/ui/publish-run/{run_id}", response_class=HTMLResponse) async def ui_publish_run(run_id: str, request: Request): """Publish a run to L2 from the web UI. Assets are named by content_hash.""" ctx = await get_user_context_from_cookie(request) if not ctx: return HTMLResponse('
Not logged in
') token = request.cookies.get("auth_token") if not token: return HTMLResponse('
Not logged in
') # Get the run to pin its output and inputs run = load_run(run_id) if not run: return HTMLResponse('
Run not found
') # Call L2 to publish the run, including this L1's public URL # Assets are named by their content_hash - no output_name needed l2_server = ctx.l2_server try: resp = http_requests.post( f"{l2_server}/assets/record-run", json={"run_id": run_id, "l1_server": L1_PUBLIC_URL}, headers={"Authorization": f"Bearer {token}"}, timeout=30 ) if resp.status_code == 400: error = resp.json().get("detail", "Bad request") return HTMLResponse(f'
Error: {error}
') resp.raise_for_status() result = resp.json() # Pin the output and record L2 share if run.output_hash and result.get("asset"): await database.update_item_metadata(run.output_hash, ctx.actor_id, pinned=True, pin_reason="published") # Record L2 share so UI shows published status cache_path = get_cache_path(run.output_hash) media_type = detect_media_type(cache_path) if cache_path else "image" content_type = "video" if media_type == "video" else "image" # Get activity_id for linking to the published run activity = result.get("activity") activity_id = activity.get("activity_id") if activity else None await database.save_l2_share( content_hash=run.output_hash, actor_id=ctx.actor_id, l2_server=l2_server, asset_name=result["asset"]["name"], content_type=content_type, activity_id=activity_id ) # Pin the inputs (for provenance) for input_hash in run.inputs: await database.update_item_metadata(input_hash, ctx.actor_id, pinned=True, pin_reason="input_to_published") # If this was a recipe-based run, pin the recipe and its fixed inputs if run.recipe.startswith("recipe:"): config_name = run.recipe.replace("recipe:", "") for recipe in list_all_recipes(): if recipe.name == config_name: # Pin the recipe YAML cache_manager.pin(recipe.recipe_id, reason="recipe_for_published") # Pin all fixed inputs referenced by the recipe for fixed in recipe.fixed_inputs: if fixed.content_hash: cache_manager.pin(fixed.content_hash, reason="fixed_input_in_published_recipe") break # Use HTTPS for L2 links l2_https = l2_server.replace("http://", "https://") asset_name = result["asset"]["name"] short_name = asset_name[:16] + "..." if len(asset_name) > 20 else asset_name # Link to activity (the published run) rather than just the asset activity = result.get("activity") activity_id = activity.get("activity_id") if activity else None l2_link = f"{l2_https}/activities/{activity_id}" if activity_id else f"{l2_https}/assets/{asset_name}" return HTMLResponse(f'''
Published to L2 as {short_name}! View on L2
''') except http_requests.exceptions.HTTPError as e: error_detail = "" try: error_detail = e.response.json().get("detail", str(e)) except Exception: error_detail = str(e) return HTMLResponse(f'
Error: {error_detail}
') except Exception as e: return HTMLResponse(f'
Error: {e}
') @app.get("/ui/runs", response_class=HTMLResponse) async def ui_runs(request: Request): """HTMX partial: list of runs.""" ctx = await get_user_context_from_cookie(request) runs = list_all_runs() # Require login to see runs if not ctx: return '

Not logged in.

' # Filter runs by user - match both plain username and ActivityPub format (@user@domain) runs = [r for r in runs if r.username in (ctx.username, ctx.actor_id)] if not runs: return '

You have no runs yet. Use the CLI to start a run.

' # Status badge colors status_colors = { "completed": "bg-green-600 text-white", "running": "bg-yellow-600 text-white", "failed": "bg-red-600 text-white", "pending": "bg-gray-600 text-white" } html_parts = ['
'] for run in runs[:20]: # Limit to 20 most recent status_badge = status_colors.get(run.status, "bg-gray-600 text-white") html_parts.append(f'''
{run.recipe}
{run.status}
Created: {run.created_at[:19].replace('T', ' ')}
''') # Show input and output side by side has_input = run.inputs and cache_manager.has_content(run.inputs[0]) has_output = run.status == "completed" and run.output_hash and cache_manager.has_content(run.output_hash) if has_input or has_output: html_parts.append('
') # Input box if has_input: input_hash = run.inputs[0] input_media_type = detect_media_type(get_cache_path(input_hash)) html_parts.append(f'''
Input: {input_hash[:16]}...
''') if input_media_type == "video": input_video_src = video_src_for_request(input_hash, request) html_parts.append(f'') elif input_media_type == "image": html_parts.append(f'input') html_parts.append('
') # Output box if has_output: output_hash = run.output_hash output_media_type = detect_media_type(get_cache_path(output_hash)) html_parts.append(f'''
Output: {output_hash[:16]}...
''') if output_media_type == "video": output_video_src = video_src_for_request(output_hash, request) html_parts.append(f'') elif output_media_type == "image": html_parts.append(f'output') html_parts.append('
') html_parts.append('
') # Show error if failed if run.status == "failed" and run.error: html_parts.append(f'
Error: {run.error}
') html_parts.append('
') html_parts.append('
') return '\n'.join(html_parts) @app.get("/ui/media-list", response_class=HTMLResponse) async def ui_media_list( request: Request, folder: Optional[str] = None, collection: Optional[str] = None, tag: Optional[str] = None ): """HTMX partial: list of media items with optional filtering.""" ctx = await get_user_context_from_cookie(request) # Require login to see media if not ctx: return '

Not logged in.

' # Get hashes owned by/associated with this user user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id) # Get cache items that belong to the user (from cache_manager) cache_items = [] seen_hashes = set() # Deduplicate by content_hash for cached_file in cache_manager.list_all(): content_hash = cached_file.content_hash if content_hash not in user_hashes: continue # Skip duplicates (same content from multiple runs) if content_hash in seen_hashes: continue seen_hashes.add(content_hash) # Skip recipes - they have their own section if cached_file.node_type == "recipe": continue # Load metadata for filtering meta = await database.load_item_metadata(content_hash, ctx.actor_id) # Apply folder filter if folder: item_folder = meta.get("folder", "/") if folder != "/" and not item_folder.startswith(folder): continue if folder == "/" and item_folder != "/": continue # Apply collection filter if collection: if collection not in meta.get("collections", []): continue # Apply tag filter if tag: if tag not in meta.get("tags", []): continue cache_items.append({ "hash": content_hash, "size": cached_file.size_bytes, "mtime": cached_file.created_at, "meta": meta }) # Sort by modification time (newest first) cache_items.sort(key=lambda x: x["mtime"], reverse=True) if not cache_items: filter_msg = "" if folder: filter_msg = f" in folder {folder}" elif collection: filter_msg = f" in collection '{collection}'" elif tag: filter_msg = f" with tag '{tag}'" return f'

No cached files{filter_msg}. Upload files or run effects to see them here.

' html_parts = ['
'] for item in cache_items[:50]: # Limit to 50 items content_hash = item["hash"] cache_path = get_cache_path(content_hash) media_type = detect_media_type(cache_path) if cache_path else "unknown" # Check IPFS status cache_item = await database.get_cache_item(content_hash) ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None ipfs_badge = 'IPFS' if ipfs_cid else '' # Check L2 publish status l2_shares = item["meta"].get("l2_shares", []) if l2_shares: first_share = l2_shares[0] l2_server = first_share.get("l2_server", "") asset_name = first_share.get("asset_name", "") asset_url = f"{l2_server}/assets/{asset_name}" published_badge = f'L2' else: published_badge = '' # Format size size = item["size"] if size > 1024*1024: size_str = f"{size/(1024*1024):.1f} MB" elif size > 1024: size_str = f"{size/1024:.1f} KB" else: size_str = f"{size} bytes" html_parts.append(f'''
{media_type} {ipfs_badge} {published_badge}
{size_str}
{content_hash[:24]}...
''') if media_type == "video": video_src = video_src_for_request(content_hash, request) html_parts.append(f'') elif media_type == "image": html_parts.append(f'{content_hash[:16]}') else: html_parts.append('

Unknown file type

') html_parts.append('''
''') html_parts.append('
') return '\n'.join(html_parts) @app.get("/ui/detail/{run_id}") async def ui_detail_page(run_id: str): """Redirect to clean URL.""" return RedirectResponse(url=f"/run/{run_id}", status_code=302) @app.get("/ui/run/{run_id}", response_class=HTMLResponse) async def ui_run_partial(run_id: str, request: Request): """HTMX partial: single run (for polling updates).""" run = load_run(run_id) if not run: return '
Run not found
' # Check Celery task status if running if run.status == "running" and run.celery_task_id: task = celery_app.AsyncResult(run.celery_task_id) if task.ready(): if task.successful(): result = task.result run.status = "completed" run.completed_at = datetime.now(timezone.utc).isoformat() run.output_hash = result.get("output", {}).get("content_hash") # Extract effects info from provenance effects = result.get("effects", []) if effects: run.effects_commit = effects[0].get("repo_commit") run.effect_url = effects[0].get("repo_url") # Extract infrastructure info run.infrastructure = result.get("infrastructure") output_path = Path(result.get("output", {}).get("local_path", "")) if output_path.exists(): await cache_file(output_path) else: run.status = "failed" run.error = str(task.result) save_run(run) # Status badge colors status_colors = { "completed": "bg-green-600 text-white", "running": "bg-yellow-600 text-white", "failed": "bg-red-600 text-white", "pending": "bg-gray-600 text-white" } status_badge = status_colors.get(run.status, "bg-gray-600 text-white") poll_attr = f'hx-get="/ui/run/{run_id}" hx-trigger="every 2s" hx-swap="outerHTML"' if run.status == "running" else "" html = f'''
{run.recipe}
{run.status}
Created: {run.created_at[:19].replace('T', ' ')}
''' # Show input and output side by side has_input = run.inputs and cache_manager.has_content(run.inputs[0]) has_output = run.status == "completed" and run.output_hash and cache_manager.has_content(run.output_hash) if has_input or has_output: html += '
' if has_input: input_hash = run.inputs[0] input_media_type = detect_media_type(get_cache_path(input_hash)) html += f'''
Input: {input_hash[:16]}...
''' if input_media_type == "video": input_video_src = video_src_for_request(input_hash, request) html += f'' elif input_media_type == "image": html += f'input' html += '
' if has_output: output_hash = run.output_hash output_media_type = detect_media_type(get_cache_path(output_hash)) html += f'''
Output: {output_hash[:16]}...
''' if output_media_type == "video": output_video_src = video_src_for_request(output_hash, request) html += f'' elif output_media_type == "image": html += f'output' html += '
' html += '
' if run.status == "failed" and run.error: html += f'
Error: {run.error}
' html += '
    '''
    return html


# ============ User Storage Configuration ============
# NOTE(review): the HTML string literals throughout this section appear to have
# had their markup stripped by the extraction that produced this file; the text
# content below is preserved, but the original tag structure is lost — confirm
# against the canonical source before shipping.

# Display metadata for each supported storage provider type; keys double as the
# valid `provider_type` values accepted by the add/update endpoints below.
STORAGE_PROVIDERS_INFO = {
    "pinata": {"name": "Pinata", "desc": "1GB free, IPFS pinning", "color": "blue"},
    "web3storage": {"name": "web3.storage", "desc": "IPFS + Filecoin", "color": "green"},
    "nftstorage": {"name": "NFT.Storage", "desc": "Free for NFTs", "color": "pink"},
    "infura": {"name": "Infura IPFS", "desc": "5GB free", "color": "orange"},
    "filebase": {"name": "Filebase", "desc": "5GB free, S3+IPFS", "color": "cyan"},
    "storj": {"name": "Storj", "desc": "25GB free", "color": "indigo"},
    "local": {"name": "Local Storage", "desc": "Your own disk", "color": "purple"},
}


@app.get("/storage")
async def list_storage(request: Request):
    """List user's storage providers. HTML for browsers, JSON for API.

    Content negotiation: returns JSON only when the Accept header asks for
    application/json and not text/html; otherwise renders the HTML page.
    Unauthenticated browser requests are redirected to /auth.
    """
    accept = request.headers.get("accept", "")
    wants_json = "application/json" in accept and "text/html" not in accept
    ctx = await get_user_context_from_cookie(request)
    if not ctx:
        if wants_json:
            raise HTTPException(401, "Authentication required")
        return RedirectResponse(url="/auth", status_code=302)
    storages = await database.get_user_storage(ctx.actor_id)
    # Add usage stats to each storage
    for storage in storages:
        usage = await database.get_storage_usage(storage["id"])
        storage["used_bytes"] = usage["used_bytes"]
        storage["pin_count"] = usage["pin_count"]
        # Half of each provider's capacity counts as "donated" — integer GB.
        storage["donated_gb"] = storage["capacity_gb"] // 2
        # Mask sensitive config keys for display
        if storage.get("config"):
            # config may arrive as a dict or as a JSON string from the DB.
            config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"])
            masked = {}
            for k, v in config.items():
                if "key" in k.lower() or "token" in k.lower() or "secret" in k.lower():
                    # Show first/last 4 chars only for long secrets.
                    # NOTE(review): v[:4] / v[-4:] assume v is a str even though
                    # the length guard coerces with str(v) — a non-str secret
                    # value would raise TypeError here; confirm config values
                    # are always strings.
                    masked[k] = v[:4] + "..." + v[-4:] if len(str(v)) > 8 else "****"
                else:
                    masked[k] = v
            storage["config_display"] = masked
    if wants_json:
        return {"storages": storages}
    return await ui_storage_page(ctx.username, storages, request)


@app.post("/storage")
async def add_storage(req: AddStorageRequest, ctx: UserContext = Depends(get_required_user_context)):
    """Add a storage provider (JSON API, bearer-token auth).

    Validates the provider type, test-connects the provider with the supplied
    config before persisting, then saves it under the caller's actor_id.
    Raises 400 on bad type/config/connection, 500 if the DB insert fails.
    """
    valid_types = ["pinata", "web3storage", "nftstorage", "infura", "filebase", "storj", "local"]
    if req.provider_type not in valid_types:
        raise HTTPException(400, f"Invalid provider type: {req.provider_type}")
    # Test the provider connection before saving
    provider = storage_providers.create_provider(req.provider_type, {
        **req.config,
        "capacity_gb": req.capacity_gb
    })
    if not provider:
        raise HTTPException(400, "Failed to create provider with given config")
    success, message = await provider.test_connection()
    if not success:
        raise HTTPException(400, f"Provider connection failed: {message}")
    # Save to database
    provider_name = req.provider_name or f"{req.provider_type}-{ctx.username}"
    storage_id = await database.add_user_storage(
        actor_id=ctx.actor_id,
        provider_type=req.provider_type,
        provider_name=provider_name,
        config=req.config,
        capacity_gb=req.capacity_gb
    )
    if not storage_id:
        raise HTTPException(500, "Failed to save storage provider")
    return {"id": storage_id, "message": f"Storage provider added: {provider_name}"}


@app.post("/storage/add")
async def add_storage_form(
    request: Request,
    provider_type: str = Form(...),
    provider_name: Optional[str] = Form(None),
    description: Optional[str] = Form(None),
    capacity_gb: int = Form(5),
    # Provider-specific credentials — only the subset relevant to
    # provider_type is required; the rest stay None.
    api_key: Optional[str] = Form(None),
    secret_key: Optional[str] = Form(None),
    api_token: Optional[str] = Form(None),
    project_id: Optional[str] = Form(None),
    project_secret: Optional[str] = Form(None),
    access_key: Optional[str] = Form(None),
    bucket: Optional[str] = Form(None),
    path: Optional[str] = Form(None),
):
    """Add a storage provider via HTML form (cookie auth).

    HTML-form twin of POST /storage: builds the provider config from the
    per-type form fields, test-connects, persists, and answers with HTML
    fragments (success or error text) instead of JSON.
    """
    ctx = await get_user_context_from_cookie(request)
    if not ctx:
        return HTMLResponse('Not authenticated', status_code=401)
    valid_types = ["pinata", "web3storage", "nftstorage", "infura", "filebase", "storj", "local"]
    if provider_type not in valid_types:
        # NOTE(review): error responses below return HTTP 200 with error text;
        # consider explicit 4xx status codes.
        return HTMLResponse(f'Invalid provider type: {provider_type}')
    # Build config based on provider type — each branch enforces the fields
    # that provider requires and assembles its credential dict.
    config = {}
    if provider_type == "pinata":
        if not api_key or not secret_key:
            return HTMLResponse('Pinata requires API Key and Secret Key')
        config = {"api_key": api_key, "secret_key": secret_key}
    elif provider_type == "web3storage":
        if not api_token:
            return HTMLResponse('web3.storage requires API Token')
        config = {"api_token": api_token}
    elif provider_type == "nftstorage":
        if not api_token:
            return HTMLResponse('NFT.Storage requires API Token')
        config = {"api_token": api_token}
    elif provider_type == "infura":
        if not project_id or not project_secret:
            return HTMLResponse('Infura requires Project ID and Project Secret')
        config = {"project_id": project_id, "project_secret": project_secret}
    elif provider_type == "filebase":
        if not access_key or not secret_key or not bucket:
            return HTMLResponse('Filebase requires Access Key, Secret Key, and Bucket')
        config = {"access_key": access_key, "secret_key": secret_key, "bucket": bucket}
    elif provider_type == "storj":
        if not access_key or not secret_key or not bucket:
            return HTMLResponse('Storj requires Access Key, Secret Key, and Bucket')
        config = {"access_key": access_key, "secret_key": secret_key, "bucket": bucket}
    elif provider_type == "local":
        if not path:
            return HTMLResponse('Local storage requires a path')
        config = {"path": path}
    # Test the provider connection before saving
    provider = storage_providers.create_provider(provider_type, {
        **config,
        "capacity_gb": capacity_gb
    })
    if not provider:
        return HTMLResponse('Failed to create provider with given config')
    success, message = await provider.test_connection()
    if not success:
        return HTMLResponse(f'Provider connection failed: {message}')
    # Save to database; auto-name appends a 1-based index among the user's
    # existing configs of this type.
    name = provider_name or f"{provider_type}-{ctx.username}-{len(await database.get_user_storage_by_type(ctx.actor_id, provider_type)) + 1}"
    storage_id = await database.add_user_storage(
        actor_id=ctx.actor_id,
        provider_type=provider_type,
        provider_name=name,
        config=config,
        capacity_gb=capacity_gb,
        description=description
    )
    if not storage_id:
        return HTMLResponse('Failed to save storage provider')
    return HTMLResponse(f'''Storage provider "{name}" added successfully!''')


@app.get("/storage/{storage_id}")
async def get_storage(storage_id: int, ctx: UserContext = Depends(get_required_user_context)):
    """Get a specific storage provider (owner only), enriched with usage stats."""
    storage = await database.get_storage_by_id(storage_id)
    if not storage:
        raise HTTPException(404, "Storage provider not found")
    # Ownership check: only the creating actor may read it.
    if storage["actor_id"] != ctx.actor_id:
        raise HTTPException(403, "Not authorized")
    usage = await database.get_storage_usage(storage_id)
    storage["used_bytes"] = usage["used_bytes"]
    storage["pin_count"] = usage["pin_count"]
    storage["donated_gb"] = storage["capacity_gb"] // 2
    # NOTE(review): unlike list_storage, this returns the raw config without
    # masking secrets — confirm that is intentional for the owner-only path.
    return storage


@app.patch("/storage/{storage_id}")
async def update_storage(storage_id: int, req: UpdateStorageRequest, ctx: UserContext = Depends(get_required_user_context)):
    """Update a storage provider (owner only).

    If the request changes the config, the merged (existing + new) config is
    test-connected first; connection failure aborts with 400.
    """
    storage = await database.get_storage_by_id(storage_id)
    if not storage:
        raise HTTPException(404, "Storage provider not found")
    if storage["actor_id"] != ctx.actor_id:
        raise HTTPException(403, "Not authorized")
    # If updating config, test the new connection
    if req.config:
        existing_config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"])
        # New keys override existing ones; untouched keys are preserved.
        new_config = {**existing_config, **req.config}
        provider = storage_providers.create_provider(storage["provider_type"], {
            **new_config,
            "capacity_gb": req.capacity_gb or storage["capacity_gb"]
        })
        # NOTE(review): if create_provider returns None here the connection
        # test is silently skipped and the update proceeds — confirm intended.
        if provider:
            success, message = await provider.test_connection()
            if not success:
                raise HTTPException(400, f"Provider connection failed: {message}")
    # NOTE(review): only req.config (the partial dict) is persisted, not the
    # merged new_config — verify database.update_user_storage merges, else
    # unmentioned keys may be dropped.
    success = await database.update_user_storage(
        storage_id,
        config=req.config,
        capacity_gb=req.capacity_gb,
        is_active=req.is_active
    )
    if not success:
        raise HTTPException(500, "Failed to update storage provider")
    return {"message": "Storage provider updated"}


@app.delete("/storage/{storage_id}")
async def remove_storage(storage_id: int, request: Request):
    """Remove a storage provider (cookie auth, owner only).

    Returns an empty HTML fragment for browser clients (so the row disappears
    in-place) and a JSON message otherwise.
    """
    ctx = await get_user_context_from_cookie(request)
    if not ctx:
        raise HTTPException(401, "Not authenticated")
    storage = await database.get_storage_by_id(storage_id)
    if not storage:
        raise HTTPException(404, "Storage provider not found")
    if storage["actor_id"] != ctx.actor_id:
        raise HTTPException(403, "Not authorized")
    success = await database.remove_user_storage(storage_id)
    if not success:
        raise HTTPException(500, "Failed to remove storage provider")
    if wants_html(request):
        return HTMLResponse("")
    return {"message": "Storage provider removed"}


@app.post("/storage/{storage_id}/test")
async def test_storage(storage_id: int, request: Request):
    """Test storage provider connectivity (cookie auth, owner only).

    Every outcome is dual-rendered: an HTML fragment for browsers (via
    wants_html) or an HTTPException/JSON body for API clients.
    """
    ctx = await get_user_context_from_cookie(request)
    if not ctx:
        if wants_html(request):
            return HTMLResponse('Not authenticated', status_code=401)
        raise HTTPException(401, "Not authenticated")
    storage = await database.get_storage_by_id(storage_id)
    if not storage:
        if wants_html(request):
            return HTMLResponse('Storage not found', status_code=404)
        raise HTTPException(404, "Storage provider not found")
    if storage["actor_id"] != ctx.actor_id:
        if wants_html(request):
            return HTMLResponse('Not authorized', status_code=403)
        raise HTTPException(403, "Not authorized")
    config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"])
    provider = storage_providers.create_provider(storage["provider_type"], {
        **config,
        "capacity_gb": storage["capacity_gb"]
    })
    if not provider:
        if wants_html(request):
            return HTMLResponse('Failed to create provider')
        raise HTTPException(500, "Failed to create provider")
    success, message = await provider.test_connection()
    if wants_html(request):
        # Success and failure fragments both echo the provider message; the
        # original markup (presumably styled differently per outcome) was
        # stripped by extraction.
        if success:
            return HTMLResponse(f'{message}')
        return HTMLResponse(f'{message}')
    return {"success": success, "message": message}


@app.get("/storage/type/{provider_type}")
async def storage_type_page(provider_type: str, request: Request):
    """Page for managing storage configs of a specific type.

    Redirects unauthenticated browsers to /auth; 404s unknown provider types.
    """
    ctx = await get_user_context_from_cookie(request)
    if not ctx:
        return RedirectResponse(url="/auth", status_code=302)
    if provider_type not in STORAGE_PROVIDERS_INFO:
        raise HTTPException(404, "Invalid provider type")
    storages = await database.get_user_storage_by_type(ctx.actor_id, provider_type)
    # Add usage stats and mask config (same masking scheme as list_storage;
    # consider extracting a shared helper).
    for storage in storages:
        usage = await database.get_storage_usage(storage["id"])
        storage["used_bytes"] = usage["used_bytes"]
        storage["pin_count"] = usage["pin_count"]
        # Mask sensitive config keys
        if storage.get("config"):
            config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"])
            masked = {}
            for k, v in config.items():
                if "key" in k.lower() or "token" in k.lower() or "secret" in k.lower():
                    masked[k] = v[:4] + "..." + v[-4:] if len(str(v)) > 8 else "****"
                else:
                    masked[k] = v
            storage["config_display"] = masked
    info = STORAGE_PROVIDERS_INFO[provider_type]
    return await ui_storage_type_page(ctx.username, provider_type, info, storages, request)


async def ui_storage_page(username: str, storages: list, request: Request) -> HTMLResponse:
    """Render the main storage management page.

    Builds one card per known provider type (with a count badge when the user
    has configs of that type) plus aggregate capacity/usage/pin totals.
    NOTE(review): template markup was stripped by extraction; only the
    interpolated text survives below.
    """
    # Count by provider type
    type_counts = {}
    for s in storages:
        ptype = s["provider_type"]
        type_counts[ptype] = type_counts.get(ptype, 0) + 1
    # Build provider type cards
    cards = ""
    for ptype, info in STORAGE_PROVIDERS_INFO.items():
        count = type_counts.get(ptype, 0)
        count_badge = f'{count}' if count > 0 else ""
        cards += f'''
{info["name"]}
{info["desc"]}
{count_badge}
'''
    # Total stats across all of the user's providers.
    total_capacity = sum(s["capacity_gb"] for s in storages)
    total_used = sum(s["used_bytes"] for s in storages)
    total_pins = sum(s["pin_count"] for s in storages)
    html = f'''Storage - Art DAG L1
Storage Providers
Total Storage
{len(storages)} providers configured
{total_used / (1024**3):.1f} / {total_capacity} GB
{total_pins} items pinned
{cards}
'''
    return HTMLResponse(html)


async def ui_storage_type_page(username: str, provider_type: str, info: dict, storages: list, request: Request) -> HTMLResponse:
    """Render storage management page for a specific provider type.

    Shows one row per existing config (status, usage, masked config) and an
    add-new form whose fields depend on provider_type.
    NOTE(review): markup stripped by extraction — the form field literals in
    particular survive only as empty triple-quoted strings.
    """
    # Build storage list
    storage_rows = ""
    for s in storages:
        used_gb = s["used_bytes"] / (1024**3)
        # Missing is_active defaults to active.
        status_class = "bg-green-600" if s.get("is_active", True) else "bg-gray-600"
        status_text = "Active" if s.get("is_active", True) else "Inactive"
        config_display = ""
        if s.get("config_display"):
            for k, v in s["config_display"].items():
                config_display += f'{k}: {v}
'
        # NOTE(review): status_class is unused in the surviving text — it was
        # presumably interpolated into the stripped markup.
        storage_rows += f'''
{s.get("provider_name", "Unnamed")}
{status_text}
{used_gb:.2f} / {s["capacity_gb"]} GB used ({s["pin_count"]} items)
{config_display}
'''
    if not storages:
        storage_rows = f'''
No {info["name"]} configs yet. Add one below.
'''
    # Build form fields based on provider type (markup lost to extraction).
    form_fields = ""
    if provider_type == "pinata":
        form_fields = ''' '''
    elif provider_type in ["web3storage", "nftstorage"]:
        form_fields = ''' '''
    elif provider_type == "infura":
        form_fields = ''' '''
    elif provider_type in ["filebase", "storj"]:
        form_fields = ''' '''
    elif provider_type == "local":
        form_fields = ''' '''
    html = f'''{info["name"]} Storage - Art DAG L1
← Back
{info["name"]}
{storage_rows}
Add New {info["name"]} Config
{form_fields}
'''
    return HTMLResponse(html)


# ============ Client Download ============

# Prebuilt CLI tarball shipped alongside this module.
CLIENT_TARBALL = Path(__file__).parent / "artdag-client.tar.gz"


@app.get("/download/client")
async def download_client():
    """Download the Art DAG CLI client tarball; 404 if not packaged."""
    if not CLIENT_TARBALL.exists():
        raise HTTPException(404, "Client package not found")
    return FileResponse(
        CLIENT_TARBALL,
        media_type="application/gzip",
        filename="artdag-client.tar.gz"
    )


# ============================================================================
# 3-Phase Execution API (Analyze → Plan → Execute)
# ============================================================================

class RecipeRunRequest(BaseModel):
    """Request to run a recipe with the 3-phase execution model."""
    recipe_yaml: str  # Recipe YAML content
    input_hashes: dict  # Mapping from input name to content hash
    features: Optional[list[str]] = None  # Features to extract (default: beats, energy)


class PlanRequest(BaseModel):
    """Request to generate an execution plan."""
    recipe_yaml: str
    input_hashes: dict
    features: Optional[list[str]] = None


class ExecutePlanRequest(BaseModel):
    """Request to execute a pre-generated plan."""
    plan_json: str  # JSON-serialized ExecutionPlan


@app.post("/api/v2/plan")
async def generate_plan_endpoint(
    request: PlanRequest,
    ctx: UserContext = Depends(get_required_user_context)
):
    """
    Generate an execution plan without executing it.

    Phase 1 (Analyze) + Phase 2 (Plan) of the 3-phase model.
    Returns the plan with cache status for each step.
    """
    from tasks.orchestrate import generate_plan
    try:
        # Submit to Celery
        task = generate_plan.delay(
            recipe_yaml=request.recipe_yaml,
            input_hashes=request.input_hashes,
            features=request.features,
        )
        # Wait for result (plan generation is usually fast)
        # NOTE(review): task.get() is a blocking call inside an async handler —
        # it stalls the event loop for up to 60s; consider run_in_executor or
        # an async polling pattern.
        result = task.get(timeout=60)
        return {
            "status": result.get("status"),
            "recipe": result.get("recipe"),
            "plan_id": result.get("plan_id"),
            "total_steps": result.get("total_steps"),
            "cached_steps": result.get("cached_steps"),
            "pending_steps": result.get("pending_steps"),
            "steps": result.get("steps"),
        }
    except Exception as e:
        logger.error(f"Plan generation failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/v2/execute")
async def execute_plan_endpoint(
    request: ExecutePlanRequest,
    ctx: UserContext = Depends(get_required_user_context)
):
    """
    Execute a pre-generated execution plan.

    Phase 3 (Execute) of the 3-phase model.
    Submits the plan to Celery for parallel execution.
    """
    from tasks.orchestrate import run_plan
    # NOTE(review): unlike run_recipe_endpoint, the run_id here is a random
    # UUID rather than the deterministic compute_run_id — confirm that replays
    # of the same plan are meant to get distinct run_ids.
    run_id = str(uuid.uuid4())
    try:
        # Submit to Celery (async)
        task = run_plan.delay(
            plan_json=request.plan_json,
            run_id=run_id,
        )
        return {
            "status": "submitted",
            "run_id": run_id,
            "celery_task_id": task.id,
        }
    except Exception as e:
        logger.error(f"Plan execution failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/v2/run-recipe")
async def run_recipe_endpoint(
    request: RecipeRunRequest,
    ctx: UserContext = Depends(get_required_user_context)
):
    """
    Run a complete recipe through all 3 phases.

    1. Analyze: Extract features from inputs
    2. Plan: Generate execution plan with cache IDs
    3. Execute: Run steps with parallel execution

    Returns immediately with run_id. Poll /api/v2/run/{run_id} for status.
    """
    from tasks.orchestrate import run_recipe
    # Compute run_id from inputs and recipe
    try:
        recipe_data = yaml.safe_load(request.recipe_yaml)
        recipe_name = recipe_data.get("name", "unknown")
    except Exception:
        # Unparseable YAML still gets a run_id — the recipe-content hash below
        # keeps it deterministic.
        recipe_name = "unknown"
    # Deterministic, content-addressed run id: same inputs + recipe bytes =
    # same run_id (see compute_run_id at top of file).
    run_id = compute_run_id(
        list(request.input_hashes.values()),
        recipe_name,
        hashlib.sha3_256(request.recipe_yaml.encode()).hexdigest()
    )
    # Check if already completed — serve the cached output without re-running.
    cached = await database.get_run_cache(run_id)
    if cached:
        output_hash = cached.get("output_hash")
        if cache_manager.has_content(output_hash):
            return {
                "status": "completed",
                "run_id": run_id,
                "output_hash": output_hash,
                "output_ipfs_cid": cache_manager.get_ipfs_cid(output_hash),
                "cached": True,
            }
    # Submit to Celery
    try:
        task = run_recipe.delay(
            recipe_yaml=request.recipe_yaml,
            input_hashes=request.input_hashes,
            features=request.features,
            run_id=run_id,
        )
        # Store run status in Redis
        run_data = {
            "run_id": run_id,
            "status": "pending",
            "recipe": recipe_name,
            "inputs": list(request.input_hashes.values()),
            "celery_task_id": task.id,
            "created_at": datetime.now(timezone.utc).isoformat(),
            # NOTE(review): key says "username" but the value is actor_id —
            # confirm which identifier consumers of this record expect.
            "username": ctx.actor_id,
        }
        redis_client.setex(
            f"{RUNS_KEY_PREFIX}{run_id}",
            86400,  # 24 hour expiry
            json.dumps(run_data)
        )
        return {
            "status": "submitted",
            "run_id": run_id,
            "celery_task_id": task.id,
            "recipe": recipe_name,
        }
    except Exception as e:
        logger.error(f"Recipe run failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/v2/run/{run_id}")
async def get_run_v2(run_id: str, ctx: UserContext = Depends(get_required_user_context)):
    """
    Get status of a 3-phase execution run.

    Resolution order: Redis run record (refreshing it from Celery when the
    task has finished) → database run cache → 404.
    """
    # Check Redis for run status
    run_data = redis_client.get(f"{RUNS_KEY_PREFIX}{run_id}")
    if run_data:
        data = json.loads(run_data)
        # If pending, check Celery task status
        if data.get("status") == "pending" and data.get("celery_task_id"):
            from celery.result import AsyncResult
            result = AsyncResult(data["celery_task_id"])
            if result.ready():
                if result.successful():
                    # result.get() is non-blocking here since ready() is True.
                    task_result = result.get()
                    data["status"] = task_result.get("status", "completed")
                    data["output_hash"] = task_result.get("output_cache_id")
                    data["output_ipfs_cid"] = task_result.get("output_ipfs_cid")
                    data["total_steps"] = task_result.get("total_steps")
                    data["cached"] = task_result.get("cached")
                    data["executed"] = task_result.get("executed")
                    # Update Redis so later polls skip the Celery lookup; this
                    # also resets the 24h TTL.
                    redis_client.setex(
                        f"{RUNS_KEY_PREFIX}{run_id}",
                        86400,
                        json.dumps(data)
                    )
                else:
                    # NOTE(review): failure state is returned but not written
                    # back to Redis, so every poll re-queries Celery.
                    data["status"] = "failed"
                    data["error"] = str(result.result)
            else:
                data["celery_status"] = result.status
        return data
    # Check database cache (Redis record may have expired after 24h).
    cached = await database.get_run_cache(run_id)
    if cached:
        return {
            "run_id": run_id,
            "status": "completed",
            "output_hash": cached.get("output_hash"),
            "cached": True,
        }
    raise HTTPException(status_code=404, detail="Run not found")


if __name__ == "__main__":
    import uvicorn
    # Workers enabled - cache indexes shared via Redis
    # (module passed as an import string, as uvicorn requires for workers > 1)
    uvicorn.run("server:app", host="0.0.0.0", port=8100, workers=4)