#!/usr/bin/env python3
"""
Art DAG L1 Server
Manages rendering runs and provides access to the cache.
- POST /runs - start a run (recipe + inputs)
- GET /runs/{run_id} - get run status/result
- GET /cache/{content_hash} - get cached content
"""
import asyncio
import base64
import hashlib
import json
import logging
import os
import time
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(levelname)s %(name)s: %(message)s'
)
logger = logging.getLogger(__name__)
from fastapi import FastAPI, HTTPException, UploadFile, File, Depends, Form, Request
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
import redis
import requests as http_requests
from urllib.parse import urlparse
import yaml
from celery_app import app as celery_app
from legacy_tasks import render_effect, execute_dag, build_effect_dag
from contextlib import asynccontextmanager
from cache_manager import L1CacheManager, get_cache_manager
import database
import storage_providers
# L1 public URL for redirects
L1_PUBLIC_URL = os.environ.get("L1_PUBLIC_URL", "http://localhost:8100")
def compute_run_id(input_hashes: list[str], recipe: str, recipe_hash: str = None) -> str:
"""
Compute a deterministic run_id from inputs and recipe.
The run_id is a SHA3-256 hash of:
- Sorted input content hashes
- Recipe identifier (recipe_hash if provided, else "effect:{recipe}")
This makes runs content-addressable: same inputs + recipe = same run_id.
"""
data = {
"inputs": sorted(input_hashes),
"recipe": recipe_hash or f"effect:{recipe}",
"version": "1", # For future schema changes
}
json_str = json.dumps(data, sort_keys=True, separators=(",", ":"))
return hashlib.sha3_256(json_str.encode()).hexdigest()
# IPFS gateway URL for public access to IPFS content
IPFS_GATEWAY_URL = os.environ.get("IPFS_GATEWAY_URL", "")
# IPFS-primary mode: everything stored on IPFS, no local cache
# Set to "true" to enable
IPFS_PRIMARY = os.environ.get("IPFS_PRIMARY", "").lower() in ("true", "1", "yes")
# Cache directory (use /data/cache in Docker, ~/.artdag/cache locally)
CACHE_DIR = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache")))
CACHE_DIR.mkdir(parents=True, exist_ok=True)
# Redis for persistent run storage and shared cache index (multi-worker support)
REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/5')
parsed = urlparse(REDIS_URL)
redis_client = redis.Redis(
host=parsed.hostname or 'localhost',
port=parsed.port or 6379,
db=int(parsed.path.lstrip('/') or 0),
socket_timeout=5,
socket_connect_timeout=5
)
RUNS_KEY_PREFIX = "artdag:run:"
RECIPES_KEY_PREFIX = "artdag:recipe:"
REVOKED_KEY_PREFIX = "artdag:revoked:"
USER_TOKENS_PREFIX = "artdag:user_tokens:"
# Token revocation (30 day expiry to match token lifetime)
TOKEN_EXPIRY_SECONDS = 60 * 60 * 24 * 30
def register_user_token(username: str, token: str) -> None:
"""Track a token for a user (for later revocation by username)."""
token_hash = hashlib.sha256(token.encode()).hexdigest()
key = f"{USER_TOKENS_PREFIX}{username}"
redis_client.sadd(key, token_hash)
redis_client.expire(key, TOKEN_EXPIRY_SECONDS)
def revoke_token(token: str) -> bool:
"""Add token to revocation set. Returns True if newly revoked."""
token_hash = hashlib.sha256(token.encode()).hexdigest()
key = f"{REVOKED_KEY_PREFIX}{token_hash}"
result = redis_client.set(key, "1", ex=TOKEN_EXPIRY_SECONDS, nx=True)
return result is not None
def revoke_token_hash(token_hash: str) -> bool:
"""Add token hash to revocation set. Returns True if newly revoked."""
key = f"{REVOKED_KEY_PREFIX}{token_hash}"
result = redis_client.set(key, "1", ex=TOKEN_EXPIRY_SECONDS, nx=True)
return result is not None
def revoke_all_user_tokens(username: str) -> int:
"""Revoke all tokens for a user. Returns count revoked."""
key = f"{USER_TOKENS_PREFIX}{username}"
token_hashes = redis_client.smembers(key)
count = 0
for token_hash in token_hashes:
if revoke_token_hash(token_hash.decode() if isinstance(token_hash, bytes) else token_hash):
count += 1
# Clear the user's token set
redis_client.delete(key)
return count
def is_token_revoked(token: str) -> bool:
"""Check if token has been revoked."""
token_hash = hashlib.sha256(token.encode()).hexdigest()
key = f"{REVOKED_KEY_PREFIX}{token_hash}"
return redis_client.exists(key) > 0
# Initialize L1 cache manager with Redis for shared state between workers
cache_manager = L1CacheManager(cache_dir=CACHE_DIR, redis_client=redis_client)
def save_run(run: "RunStatus"):
"""Save run to Redis."""
redis_client.set(f"{RUNS_KEY_PREFIX}{run.run_id}", run.model_dump_json())
def load_run(run_id: str) -> Optional["RunStatus"]:
"""Load run from Redis."""
data = redis_client.get(f"{RUNS_KEY_PREFIX}{run_id}")
if data:
return RunStatus.model_validate_json(data)
return None
def list_all_runs() -> list["RunStatus"]:
"""List all runs from Redis."""
runs = []
for key in redis_client.scan_iter(f"{RUNS_KEY_PREFIX}*"):
data = redis_client.get(key)
if data:
runs.append(RunStatus.model_validate_json(data))
return sorted(runs, key=lambda r: r.created_at, reverse=True)
def find_runs_using_content(content_hash: str) -> list[tuple["RunStatus", str]]:
"""Find all runs that use a content_hash as input or output.
Returns list of (run, role) tuples where role is 'input' or 'output'.
"""
results = []
for run in list_all_runs():
if run.inputs and content_hash in run.inputs:
results.append((run, "input"))
if run.output_hash == content_hash:
results.append((run, "output"))
return results
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Initialize and cleanup resources."""
# Startup: initialize database
await database.init_db()
yield
# Shutdown: close database
await database.close_db()
app = FastAPI(
title="Art DAG L1 Server",
description="Distributed rendering server for Art DAG",
version="0.1.0",
lifespan=lifespan
)
@app.exception_handler(404)
async def not_found_handler(request: Request, exc):
"""Custom 404 page."""
from fastapi.responses import JSONResponse
accept = request.headers.get("accept", "")
if "text/html" in accept:
content = '''
"""
@app.get("/", response_class=HTMLResponse)
async def root(request: Request):
"""Home page."""
ctx = await get_user_context_from_cookie(request)
actor_id = ctx.actor_id if ctx else None
return render_home_html(actor_id)
@app.post("/runs", response_model=RunStatus)
async def create_run(request: RunRequest, ctx: UserContext = Depends(get_required_user_context)):
"""Start a new rendering run. Checks cache before executing."""
# Compute content-addressable run_id
run_id = compute_run_id(request.inputs, request.recipe)
# Generate output name if not provided
output_name = request.output_name or f"{request.recipe}-{run_id[:8]}"
# Use actor_id from user context
actor_id = ctx.actor_id
# Check L1 cache first
cached_run = await database.get_run_cache(run_id)
if cached_run:
output_hash = cached_run["output_hash"]
# Verify the output file still exists in cache
if cache_manager.has_content(output_hash):
logger.info(f"create_run: Cache hit for run_id={run_id[:16]}... output={output_hash[:16]}...")
return RunStatus(
run_id=run_id,
status="completed",
recipe=request.recipe,
inputs=request.inputs,
output_name=output_name,
created_at=cached_run.get("created_at", datetime.now(timezone.utc).isoformat()),
completed_at=cached_run.get("created_at", datetime.now(timezone.utc).isoformat()),
output_hash=output_hash,
username=actor_id,
provenance_cid=cached_run.get("provenance_cid"),
)
else:
logger.info(f"create_run: Cache entry exists but output missing, will re-run")
# Check L2 if not in L1
l2_server = ctx.l2_server
try:
l2_resp = http_requests.get(
f"{l2_server}/assets/by-run-id/{run_id}",
timeout=10
)
if l2_resp.status_code == 200:
l2_data = l2_resp.json()
output_hash = l2_data.get("output_hash")
ipfs_cid = l2_data.get("ipfs_cid")
if output_hash and ipfs_cid:
logger.info(f"create_run: Found on L2, pulling from IPFS: {ipfs_cid}")
# Pull from IPFS to L1 cache
import ipfs_client
legacy_dir = CACHE_DIR / "legacy"
legacy_dir.mkdir(parents=True, exist_ok=True)
recovery_path = legacy_dir / output_hash
if ipfs_client.get_file(ipfs_cid, str(recovery_path)):
# File retrieved - put() updates indexes, but file is already in legacy location
# Just update the content and IPFS indexes manually
cache_manager._set_content_index(output_hash, output_hash)
cache_manager._set_ipfs_index(output_hash, ipfs_cid)
# Save to run cache
await database.save_run_cache(
run_id=run_id,
output_hash=output_hash,
recipe=request.recipe,
inputs=request.inputs,
ipfs_cid=ipfs_cid,
provenance_cid=l2_data.get("provenance_cid"),
actor_id=actor_id,
)
logger.info(f"create_run: Recovered from L2/IPFS: {output_hash[:16]}...")
return RunStatus(
run_id=run_id,
status="completed",
recipe=request.recipe,
inputs=request.inputs,
output_name=output_name,
created_at=datetime.now(timezone.utc).isoformat(),
completed_at=datetime.now(timezone.utc).isoformat(),
output_hash=output_hash,
username=actor_id,
provenance_cid=l2_data.get("provenance_cid"),
)
except Exception as e:
logger.warning(f"create_run: L2 lookup failed (will run Celery): {e}")
# Not cached anywhere - create run record and submit to Celery
run = RunStatus(
run_id=run_id,
status="pending",
recipe=request.recipe,
inputs=request.inputs,
output_name=output_name,
created_at=datetime.now(timezone.utc).isoformat(),
username=actor_id
)
# Submit to Celery
if request.use_dag or request.recipe == "dag":
# DAG mode - use artdag engine
if request.dag_json:
# Custom DAG provided
dag_json = request.dag_json
else:
# Build simple effect DAG from recipe and inputs
dag = build_effect_dag(request.inputs, request.recipe)
dag_json = dag.to_json()
task = execute_dag.delay(dag_json, run.run_id)
else:
# Legacy mode - single effect
if len(request.inputs) != 1:
raise HTTPException(400, "Legacy mode only supports single-input recipes. Use use_dag=true for multi-input.")
input_hash = request.inputs[0]
task = render_effect.delay(input_hash, request.recipe, output_name)
run.celery_task_id = task.id
run.status = "running"
await asyncio.to_thread(save_run, run)
return run
def _check_celery_task_sync(task_id: str) -> tuple[bool, bool, Optional[dict], Optional[str]]:
"""Check Celery task status synchronously. Returns (is_ready, is_successful, result, error)."""
task = celery_app.AsyncResult(task_id)
if not task.ready():
return (False, False, None, None)
if task.successful():
return (True, True, task.result, None)
else:
return (True, False, None, str(task.result))
@app.get("/runs/{run_id}", response_model=RunStatus)
async def get_run(run_id: str):
"""Get status of a run."""
start = time.time()
logger.info(f"get_run: Starting for {run_id}")
t0 = time.time()
run = await asyncio.to_thread(load_run, run_id)
logger.info(f"get_run: load_run took {time.time()-t0:.3f}s, status={run.status if run else 'None'}")
if not run:
raise HTTPException(404, f"Run {run_id} not found")
# Check Celery task status if running
if run.status == "running" and run.celery_task_id:
t0 = time.time()
is_ready, is_successful, result, error = await asyncio.to_thread(
_check_celery_task_sync, run.celery_task_id
)
logger.info(f"get_run: Celery check took {time.time()-t0:.3f}s, ready={is_ready}")
if is_ready:
if is_successful:
run.status = "completed"
run.completed_at = datetime.now(timezone.utc).isoformat()
# Handle both legacy (render_effect) and new (execute_dag/run_plan) result formats
if "output_hash" in result or "output_cache_id" in result:
# New DAG/plan result format
run.output_hash = result.get("output_hash") or result.get("output_cache_id")
run.provenance_cid = result.get("provenance_cid")
output_path = Path(result.get("output_path", "")) if result.get("output_path") else None
# Store plan execution data
run.plan_id = result.get("plan_id")
run.plan_name = result.get("plan_name")
run.step_results = result.get("results") # step_id -> result dict
run.all_outputs = result.get("outputs") # All outputs from all steps
elif "output" in result:
# Legacy render_effect format
run.output_hash = result.get("output", {}).get("content_hash")
run.provenance_cid = result.get("provenance_cid")
output_path = Path(result.get("output", {}).get("local_path", ""))
# Extract effects info from provenance (legacy only)
effects = result.get("effects", [])
if effects:
run.effects_commit = effects[0].get("repo_commit")
run.effect_url = effects[0].get("repo_url")
# Extract infrastructure info (legacy only)
run.infrastructure = result.get("infrastructure")
# Cache the output (legacy mode - DAG/plan already caches via cache_manager)
is_plan_result = "output_hash" in result or "output_cache_id" in result
if output_path and output_path.exists() and not is_plan_result:
t0 = time.time()
await cache_file(output_path, node_type="effect_output")
logger.info(f"get_run: cache_file took {time.time()-t0:.3f}s")
# Record activity for deletion tracking (legacy mode)
if run.output_hash and run.inputs:
await asyncio.to_thread(
cache_manager.record_simple_activity,
input_hashes=run.inputs,
output_hash=run.output_hash,
run_id=run.run_id,
)
# Save to run cache for content-addressable lookup
if run.output_hash:
ipfs_cid = cache_manager._get_ipfs_cid_from_index(run.output_hash)
await database.save_run_cache(
run_id=run.run_id,
output_hash=run.output_hash,
recipe=run.recipe,
inputs=run.inputs,
ipfs_cid=ipfs_cid,
provenance_cid=run.provenance_cid,
actor_id=run.username,
)
logger.info(f"get_run: Saved run cache for {run.run_id[:16]}...")
else:
run.status = "failed"
run.error = error
# Save updated status
t0 = time.time()
await asyncio.to_thread(save_run, run)
logger.info(f"get_run: save_run took {time.time()-t0:.3f}s")
logger.info(f"get_run: Total time {time.time()-start:.3f}s")
return run
@app.delete("/runs/{run_id}")
async def discard_run(run_id: str, ctx: UserContext = Depends(get_required_user_context)):
"""
Discard (delete) a run and its outputs.
Enforces deletion rules:
- Cannot discard if output is published to L2 (pinned)
- Deletes outputs and intermediate cache entries
- Preserves inputs (cache items and recipes are NOT deleted)
"""
run = await asyncio.to_thread(load_run, run_id)
if not run:
raise HTTPException(404, f"Run {run_id} not found")
# Check ownership
if run.username not in (ctx.username, ctx.actor_id):
raise HTTPException(403, "Access denied")
# Failed runs can always be deleted (no output to protect)
if run.status != "failed":
# Only check if output is pinned - inputs are preserved, not deleted
if run.output_hash:
meta = await database.load_item_metadata(run.output_hash, ctx.actor_id)
if meta.get("pinned"):
pin_reason = meta.get("pin_reason", "published")
raise HTTPException(400, f"Cannot discard run: output {run.output_hash[:16]}... is pinned ({pin_reason})")
# Check if activity exists for this run
activity = await asyncio.to_thread(cache_manager.get_activity, run_id)
if activity:
# Discard the activity - only delete outputs, preserve inputs
success, msg = await asyncio.to_thread(cache_manager.discard_activity_outputs_only, run_id)
if not success:
raise HTTPException(400, f"Cannot discard run: {msg}")
# Remove from Redis
await asyncio.to_thread(redis_client.delete, f"{RUNS_KEY_PREFIX}{run_id}")
return {"discarded": True, "run_id": run_id}
@app.delete("/ui/runs/{run_id}/discard", response_class=HTMLResponse)
async def ui_discard_run(run_id: str, request: Request):
"""HTMX handler: discard a run. Only deletes outputs, preserves inputs."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '
Login required
'
run = await asyncio.to_thread(load_run, run_id)
if not run:
return '
Run not found
'
# Check ownership
if run.username not in (ctx.username, ctx.actor_id):
return '
Access denied
'
# Failed runs can always be deleted
if run.status != "failed":
# Only check if output is pinned - inputs are preserved, not deleted
if run.output_hash:
meta = await database.load_item_metadata(run.output_hash, ctx.actor_id)
if meta.get("pinned"):
pin_reason = meta.get("pin_reason", "published")
return f'
Cannot discard: output is pinned ({pin_reason})
'
# Check if activity exists for this run
activity = await asyncio.to_thread(cache_manager.get_activity, run_id)
if activity:
# Discard the activity - only delete outputs, preserve inputs
success, msg = await asyncio.to_thread(cache_manager.discard_activity_outputs_only, run_id)
if not success:
return f'
Cannot discard: {msg}
'
# Remove from Redis
await asyncio.to_thread(redis_client.delete, f"{RUNS_KEY_PREFIX}{run_id}")
return '''
'''
@app.get("/run/{run_id}")
async def run_detail(run_id: str, request: Request):
"""Run detail. HTML for browsers, JSON for APIs."""
run = await asyncio.to_thread(load_run, run_id)
if not run:
if wants_html(request):
content = f'
Run not found: {run_id}
'
return HTMLResponse(render_page("Not Found", content, None, active_tab="runs"), status_code=404)
raise HTTPException(404, f"Run {run_id} not found")
# Check Celery task status if running
if run.status == "running" and run.celery_task_id:
is_ready, is_successful, result, error = await asyncio.to_thread(
_check_celery_task_sync, run.celery_task_id
)
if is_ready:
if is_successful:
run.status = "completed"
run.completed_at = datetime.now(timezone.utc).isoformat()
run.output_hash = result.get("output", {}).get("content_hash")
effects = result.get("effects", [])
if effects:
run.effects_commit = effects[0].get("repo_commit")
run.effect_url = effects[0].get("repo_url")
run.infrastructure = result.get("infrastructure")
output_path = Path(result.get("output", {}).get("local_path", ""))
if output_path.exists():
await cache_file(output_path)
# Save to run cache for content-addressable lookup
if run.output_hash:
ipfs_cid = cache_manager._get_ipfs_cid_from_index(run.output_hash)
await database.save_run_cache(
run_id=run.run_id,
output_hash=run.output_hash,
recipe=run.recipe,
inputs=run.inputs,
ipfs_cid=ipfs_cid,
provenance_cid=run.provenance_cid,
actor_id=run.username,
)
else:
run.status = "failed"
run.error = error
await asyncio.to_thread(save_run, run)
if wants_html(request):
ctx = await get_user_context_from_cookie(request)
if not ctx:
content = '
Not logged in.
'
return HTMLResponse(render_page("Login Required", content, None, active_tab="runs"), status_code=401)
# Check user owns this run
if run.username not in (ctx.username, ctx.actor_id):
content = '
Access denied.
'
return HTMLResponse(render_page("Access Denied", content, ctx.actor_id, active_tab="runs"), status_code=403)
# Build effect URL
if run.effect_url:
effect_url = run.effect_url
elif run.effects_commit and run.effects_commit != "unknown":
effect_url = f"https://git.rose-ash.com/art-dag/effects/src/commit/{run.effects_commit}/{run.recipe}"
else:
effect_url = f"https://git.rose-ash.com/art-dag/effects/src/branch/main/{run.recipe}"
# Status badge colors
status_colors = {
"completed": "bg-green-600 text-white",
"running": "bg-yellow-600 text-white",
"failed": "bg-red-600 text-white",
"pending": "bg-gray-600 text-white"
}
status_badge = status_colors.get(run.status, "bg-gray-600 text-white")
# Try to get input names from recipe
input_names = {}
recipe_name = run.recipe.replace("recipe:", "") if run.recipe.startswith("recipe:") else run.recipe
for recipe in list_all_recipes():
if recipe.name == recipe_name:
# Match variable inputs first, then fixed inputs
for i, var_input in enumerate(recipe.variable_inputs):
if i < len(run.inputs):
input_names[run.inputs[i]] = var_input.name
# Fixed inputs follow variable inputs
offset = len(recipe.variable_inputs)
for i, fixed_input in enumerate(recipe.fixed_inputs):
idx = offset + i
if idx < len(run.inputs):
input_names[run.inputs[idx]] = fixed_input.asset
break
# Build media HTML for inputs and output
media_html = ""
available_inputs = [inp for inp in run.inputs if cache_manager.has_content(inp)]
has_output = run.status == "completed" and run.output_hash and cache_manager.has_content(run.output_hash)
if available_inputs or has_output:
# Flexible grid - more columns for more items
num_items = len(available_inputs) + (1 if has_output else 0)
grid_cols = min(num_items, 3) # Max 3 columns
media_html = f'
'''
# Publish section - check if already published to L2
publish_html = ""
if run.status == "completed" and run.output_hash:
l2_shares = await database.get_l2_shares(run.output_hash, ctx.actor_id)
if l2_shares:
# Already published - show link to L2
share = l2_shares[0]
l2_server = share.get("l2_server", "")
l2_https = l2_server.replace("http://", "https://")
asset_name = share.get("asset_name", "")
activity_id = share.get("activity_id")
# Link to activity if available, otherwise fall back to asset
l2_link = f"{l2_https}/activities/{activity_id}" if activity_id else f"{l2_https}/assets/{asset_name}"
publish_html = f'''
'''
completed_html = ""
if run.completed_at:
completed_html = f'''
Completed
{run.completed_at[:19].replace('T', ' ')}
'''
# Sub-navigation tabs for run detail pages
sub_tabs_html = render_run_sub_tabs(run_id, active="overview")
content = f'''
Back to runs
{sub_tabs_html}
'
return HTMLResponse(render_page("Login Required", content, None, active_tab="runs"), status_code=401)
run = await asyncio.to_thread(load_run, run_id)
if not run:
content = f'
Run not found: {run_id}
'
return HTMLResponse(render_page("Not Found", content, ctx.actor_id, active_tab="runs"), status_code=404)
# Check user owns this run
if run.username not in (ctx.username, ctx.actor_id):
content = '
Access denied.
'
return HTMLResponse(render_page("Access Denied", content, ctx.actor_id, active_tab="runs"), status_code=403)
# Try to load existing plan from cache
plan_data = None
PLAN_CACHE_DIR.mkdir(parents=True, exist_ok=True)
# Look for plan file matching this run
for plan_file in PLAN_CACHE_DIR.glob("*.json"):
try:
with open(plan_file) as f:
data = json.load(f)
# Check if this plan matches our run inputs
plan_inputs = data.get("input_hashes", {})
if set(plan_inputs.values()) == set(run.inputs):
plan_data = data
break
except (json.JSONDecodeError, IOError):
continue
# If no cached plan, try to generate one from the recipe
if not plan_data:
recipe_name = run.recipe.replace("recipe:", "") if run.recipe.startswith("recipe:") else run.recipe
recipe_status = None
for recipe in list_all_recipes():
if recipe.name == recipe_name:
recipe_status = recipe
break
if recipe_status:
recipe_path = cache_manager.get_by_content_hash(recipe_status.recipe_id)
if recipe_path and recipe_path.exists():
try:
recipe_yaml = recipe_path.read_text()
# Build input_hashes mapping from run inputs
input_hashes = {}
for i, var_input in enumerate(recipe_status.variable_inputs):
if i < len(run.inputs):
input_hashes[var_input.node_id] = run.inputs[i]
# Try to generate plan using the orchestrate module
try:
from tasks.orchestrate import generate_plan as gen_plan_task
# Call synchronously (it's fast for just planning)
plan_result = gen_plan_task(recipe_yaml, input_hashes)
if plan_result and plan_result.get("status") == "planned":
plan_data = plan_result
except ImportError:
pass
except Exception as e:
logger.warning(f"Failed to generate plan for run {run_id}: {e}")
# Build sub-navigation tabs
tabs_html = render_run_sub_tabs(run_id, active="plan")
if not plan_data:
# Show a simpler visualization based on the run's recipe structure
content = f'''
Back to runs
{tabs_html}
Execution Plan
Could not generate execution plan for this run.
This may be a legacy effect-based run without a recipe, or the recipe is no longer available.
'''
return HTMLResponse(render_page_with_cytoscape(f"Plan: {run_id[:16]}...", content, ctx.actor_id, active_tab="runs"))
# Build Cytoscape nodes and edges from plan
nodes = []
edges = []
steps = plan_data.get("steps", [])
for step in steps:
node_type = step.get("node_type", "EFFECT")
color = NODE_COLORS.get(node_type, NODE_COLORS["default"])
step_id = step.get("step_id", "")
cache_id = step.get("cache_id", "")
# Check if this step's output exists in cache (completed)
# For completed runs, check the actual cache
has_cached = cache_manager.has_content(cache_id) if cache_id else False
if has_cached:
status = "cached"
elif run.status == "completed":
# Run completed but this step not in cache - still mark as done
status = "cached"
elif run.status == "running":
status = "running"
else:
status = "pending"
# Use human-readable name if available, otherwise short step_id
step_name = step.get("name", "")
if step_name:
# Use last part of dotted name for label
label_parts = step_name.split(".")
label = label_parts[-1] if label_parts else step_name
else:
label = step_id[:12] + "..." if len(step_id) > 12 else step_id
nodes.append({
"data": {
"id": step_id,
"label": label,
"name": step_name,
"nodeType": node_type,
"level": step.get("level", 0),
"cacheId": cache_id,
"status": status,
"color": color,
"config": step.get("config"),
"hasCached": has_cached,
}
})
# Build edges from the full plan JSON if available
if "plan_json" in plan_data:
try:
full_plan = json.loads(plan_data["plan_json"])
for step in full_plan.get("steps", []):
step_id = step.get("step_id", "")
for input_step in step.get("input_steps", []):
edges.append({
"data": {
"source": input_step,
"target": step_id
}
})
except json.JSONDecodeError:
pass
else:
# Build edges directly from steps
for step in steps:
step_id = step.get("step_id", "")
for input_step in step.get("input_steps", []):
edges.append({
"data": {
"source": input_step,
"target": step_id
}
})
nodes_json = json.dumps(nodes)
edges_json = json.dumps(edges)
dag_html = render_dag_cytoscape(nodes_json, edges_json)
# Stats summary - count from built nodes to reflect actual execution status
total = len(nodes)
cached_count = sum(1 for n in nodes if n["data"]["status"] == "cached")
completed_count = sum(1 for n in nodes if n["data"]["status"] == "completed")
running_count = sum(1 for n in nodes if n["data"]["status"] == "running")
pending_count = total - cached_count - completed_count - running_count
# Plan name for display
plan_name = plan_data.get("recipe", run.recipe.replace("recipe:", ""))
content = f'''
Back to runs
{tabs_html}
Execution Plan: {plan_name}
{total}
Total Steps
{completed_count}
Completed
{cached_count}
Cached
{pending_count}
Pending
SOURCE
EFFECT
_LIST
Cached
{dag_html}
Execution Steps
'''
# Build steps list with cache_id links
for i, step in enumerate(steps):
step_id = step.get("step_id", "")
step_name = step.get("name", step_id[:20])
node_type = step.get("node_type", "EFFECT")
cache_id = step.get("cache_id", "")
has_cached = cache_manager.has_content(cache_id) if cache_id else False
color = NODE_COLORS.get(node_type, NODE_COLORS["default"])
status_badge = ""
if has_cached:
status_badge = 'cached'
elif run.status == "completed":
status_badge = 'completed'
cache_link = ""
if cache_id:
if has_cached:
cache_link = f'''
'''
# Add collapsible Plan JSON section
# Parse nested plan_json if present (it's double-encoded as a string)
display_plan = plan_data.copy()
if "plan_json" in display_plan and isinstance(display_plan["plan_json"], str):
try:
display_plan["plan_json"] = json.loads(display_plan["plan_json"])
except json.JSONDecodeError:
pass
plan_json_str = json.dumps(display_plan, indent=2)
# Escape HTML entities in JSON
plan_json_str = plan_json_str.replace("&", "&").replace("<", "<").replace(">", ">")
content += f'''
Show Plan JSON
{plan_json_str}
'''
return HTMLResponse(render_page_with_cytoscape(f"Plan: {run_id[:16]}...", content, ctx.actor_id, active_tab="runs"))
@app.get("/run/{run_id}/analysis", response_class=HTMLResponse)
async def run_analysis_page(run_id: str, request: Request):
"""Show analysis results for run inputs."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
content = '
Not logged in.
'
return HTMLResponse(render_page("Login Required", content, None, active_tab="runs"), status_code=401)
run = await asyncio.to_thread(load_run, run_id)
if not run:
content = f'
Run not found: {run_id}
'
return HTMLResponse(render_page("Not Found", content, ctx.actor_id, active_tab="runs"), status_code=404)
# Check user owns this run
if run.username not in (ctx.username, ctx.actor_id):
content = '
Access denied.
'
return HTMLResponse(render_page("Access Denied", content, ctx.actor_id, active_tab="runs"), status_code=403)
tabs_html = render_run_sub_tabs(run_id, active="analysis")
# Load analysis results for each input
analysis_html = ""
ANALYSIS_CACHE_DIR.mkdir(parents=True, exist_ok=True)
for i, input_hash in enumerate(run.inputs):
analysis_path = ANALYSIS_CACHE_DIR / f"{input_hash}.json"
analysis_data = None
if analysis_path.exists():
try:
with open(analysis_path) as f:
analysis_data = json.load(f)
except (json.JSONDecodeError, IOError):
pass
input_name = f"Input {i + 1}"
if analysis_data:
tempo = analysis_data.get("tempo", "N/A")
if isinstance(tempo, float):
tempo = f"{tempo:.1f}"
beat_times = analysis_data.get("beat_times", [])
beat_count = len(beat_times)
energy = analysis_data.get("energy")
# Beat visualization (simple bar chart showing beat positions)
beat_bars = ""
if beat_times and len(beat_times) > 0:
# Show first 50 beats as vertical bars
display_beats = beat_times[:50]
max_time = max(display_beats) if display_beats else 1
for bt in display_beats:
# Normalize to percentage
pos = (bt / max_time) * 100 if max_time > 0 else 0
beat_bars += f''
energy_bar = ""
if energy is not None:
try:
energy_pct = min(float(energy) * 100, 100)
energy_bar = f'''
{analysis_html}
'''
return HTMLResponse(render_page(f"Analysis: {run_id[:16]}...", content, ctx.actor_id, active_tab="runs"))
@app.get("/run/{run_id}/artifacts", response_class=HTMLResponse)
async def run_artifacts_page(run_id: str, request: Request):
"""Show all cached artifacts produced by this run."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
content = '
Not logged in.
'
return HTMLResponse(render_page("Login Required", content, None, active_tab="runs"), status_code=401)
run = await asyncio.to_thread(load_run, run_id)
if not run:
content = f'
Run not found: {run_id}
'
return HTMLResponse(render_page("Not Found", content, ctx.actor_id, active_tab="runs"), status_code=404)
# Check user owns this run
if run.username not in (ctx.username, ctx.actor_id):
content = '
Access denied.
'
return HTMLResponse(render_page("Access Denied", content, ctx.actor_id, active_tab="runs"), status_code=403)
tabs_html = render_run_sub_tabs(run_id, active="artifacts")
# Collect all artifacts: inputs + output
artifacts = []
# Add inputs
for i, content_hash in enumerate(run.inputs):
cache_path = get_cache_path(content_hash)
if cache_path and cache_path.exists():
size = cache_path.stat().st_size
media_type = detect_media_type(cache_path)
artifacts.append({
"hash": content_hash,
"path": cache_path,
"size": size,
"media_type": media_type,
"role": "input",
"role_color": "blue",
"name": f"Input {i + 1}",
})
# Add output
if run.output_hash:
cache_path = get_cache_path(run.output_hash)
if cache_path and cache_path.exists():
size = cache_path.stat().st_size
media_type = detect_media_type(cache_path)
artifacts.append({
"hash": run.output_hash,
"path": cache_path,
"size": size,
"media_type": media_type,
"role": "output",
"role_color": "green",
"name": "Output",
})
# Build artifacts HTML
artifacts_html = ""
for artifact in artifacts:
size_kb = artifact["size"] / 1024
if size_kb < 1024:
size_str = f"{size_kb:.1f} KB"
else:
size_str = f"{size_kb/1024:.1f} MB"
# Thumbnail for media
thumb = ""
if artifact["media_type"] == "video":
thumb = f''
elif artifact["media_type"] == "image":
thumb = f''
else:
thumb = '
'''
return HTMLResponse(render_page(f"Artifacts: {run_id[:16]}...", content, ctx.actor_id, active_tab="runs"))
# JSON API endpoints for future WebSocket support
@app.get("/api/run/{run_id}/plan")
async def api_run_plan(run_id: str, request: Request):
"""Get execution plan data as JSON for programmatic access."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
raise HTTPException(401, "Not logged in")
run = await asyncio.to_thread(load_run, run_id)
if not run:
raise HTTPException(404, f"Run {run_id} not found")
if run.username not in (ctx.username, ctx.actor_id):
raise HTTPException(403, "Access denied")
# Look for plan in cache
PLAN_CACHE_DIR.mkdir(parents=True, exist_ok=True)
for plan_file in PLAN_CACHE_DIR.glob("*.json"):
try:
with open(plan_file) as f:
data = json.load(f)
plan_inputs = data.get("input_hashes", {})
if set(plan_inputs.values()) == set(run.inputs):
return data
except (json.JSONDecodeError, IOError):
continue
return {"status": "not_found", "message": "No plan found for this run"}
@app.get("/api/run/{run_id}/analysis")
async def api_run_analysis(run_id: str, request: Request):
"""Get analysis data as JSON for programmatic access."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
raise HTTPException(401, "Not logged in")
run = await asyncio.to_thread(load_run, run_id)
if not run:
raise HTTPException(404, f"Run {run_id} not found")
if run.username not in (ctx.username, ctx.actor_id):
raise HTTPException(403, "Access denied")
ANALYSIS_CACHE_DIR.mkdir(parents=True, exist_ok=True)
results = {}
for input_hash in run.inputs:
analysis_path = ANALYSIS_CACHE_DIR / f"{input_hash}.json"
if analysis_path.exists():
try:
with open(analysis_path) as f:
results[input_hash] = json.load(f)
except (json.JSONDecodeError, IOError):
results[input_hash] = None
else:
results[input_hash] = None
return {"run_id": run_id, "inputs": run.inputs, "analysis": results}
@app.get("/runs")
async def list_runs(request: Request, page: int = 1, limit: int = 20):
"""List runs. HTML for browsers (with infinite scroll), JSON for APIs (with pagination)."""
ctx = await get_user_context_from_cookie(request)
all_runs = await asyncio.to_thread(list_all_runs)
total = len(all_runs)
# Filter by user if logged in for HTML
if wants_html(request) and ctx:
all_runs = [r for r in all_runs if r.username in (ctx.username, ctx.actor_id)]
total = len(all_runs)
# Pagination
start = (page - 1) * limit
end = start + limit
runs_page = all_runs[start:end]
has_more = end < total
if wants_html(request):
if not ctx:
content = '
Not logged in.
'
return HTMLResponse(render_page("Runs", content, None, active_tab="runs"))
if not runs_page:
if page == 1:
content = '
You have no runs yet. Use the CLI to start a run.
'
else:
return HTMLResponse("") # Empty for infinite scroll
else:
# Status badge colors
status_colors = {
"completed": "bg-green-600 text-white",
"running": "bg-yellow-600 text-white",
"failed": "bg-red-600 text-white",
"pending": "bg-gray-600 text-white"
}
html_parts = []
for run in runs_page:
status_badge = status_colors.get(run.status, "bg-gray-600 text-white")
html_parts.append(f'''
')
# For infinite scroll, just return cards if not first page
if page > 1:
if has_more:
html_parts.append(f'''
Loading more...
''')
return HTMLResponse('\n'.join(html_parts))
# First page - full content
infinite_scroll_trigger = ""
if has_more:
infinite_scroll_trigger = f'''
Loading more...
'''
content = f'''
Runs ({total} total)
{''.join(html_parts)}
{infinite_scroll_trigger}
'''
return HTMLResponse(render_page("Runs", content, ctx.actor_id, active_tab="runs"))
# JSON response for APIs
return {
"runs": [r.model_dump() for r in runs_page],
"pagination": {
"page": page,
"limit": limit,
"total": total,
"has_more": has_more
}
}
# ============ Recipe Endpoints ============
@app.post("/recipes/upload")
async def upload_recipe(file: UploadFile = File(...), ctx: UserContext = Depends(get_required_user_context)):
"""Upload a recipe YAML file. Requires authentication."""
import tempfile
# Read file content
content = await file.read()
try:
yaml_content = content.decode('utf-8')
except UnicodeDecodeError:
raise HTTPException(400, "Recipe file must be valid UTF-8 text")
# Validate YAML
try:
yaml.safe_load(yaml_content)
except yaml.YAMLError as e:
raise HTTPException(400, f"Invalid YAML: {e}")
# Store YAML file in cache
with tempfile.NamedTemporaryFile(delete=False, suffix=".yaml") as tmp:
tmp.write(content)
tmp_path = Path(tmp.name)
cached, ipfs_cid = cache_manager.put(tmp_path, node_type="recipe", move=True)
recipe_hash = cached.content_hash
# Parse and save metadata
actor_id = ctx.actor_id
try:
recipe_status = parse_recipe_yaml(yaml_content, recipe_hash, actor_id)
except Exception as e:
raise HTTPException(400, f"Failed to parse recipe: {e}")
await asyncio.to_thread(save_recipe, recipe_status)
# Save cache metadata to database
await database.save_item_metadata(
content_hash=recipe_hash,
actor_id=actor_id,
item_type="recipe",
filename=file.filename,
description=recipe_status.name # Use recipe name as description
)
return {
"recipe_id": recipe_hash,
"name": recipe_status.name,
"version": recipe_status.version,
"variable_inputs": len(recipe_status.variable_inputs),
"fixed_inputs": len(recipe_status.fixed_inputs)
}
@app.get("/recipes")
async def list_recipes_api(request: Request, page: int = 1, limit: int = 20):
"""List recipes. HTML for browsers, JSON for APIs."""
ctx = await get_user_context_from_cookie(request)
all_recipes = await asyncio.to_thread(list_all_recipes)
if wants_html(request):
# HTML response
if not ctx:
return HTMLResponse(render_page(
"Recipes",
'
Not logged in.
',
None,
active_tab="recipes"
))
# Filter to user's recipes
user_recipes = [c for c in all_recipes if c.uploader in (ctx.username, ctx.actor_id)]
total = len(user_recipes)
if not user_recipes:
content = '''
Recipes (0)
No recipes yet. Upload a recipe YAML file to get started.
'''
return HTMLResponse(render_page("Recipes", content, ctx.actor_id, active_tab="recipes"))
html_parts = []
for recipe in user_recipes:
var_count = len(recipe.variable_inputs)
fixed_count = len(recipe.fixed_inputs)
input_info = []
if var_count:
input_info.append(f"{var_count} variable")
if fixed_count:
input_info.append(f"{fixed_count} fixed")
inputs_str = ", ".join(input_info) if input_info else "no inputs"
html_parts.append(f'''
v{recipe.version}
{pinned_badge}
{l2_link_html}
View DAG
{recipe.description or 'No description'}
{recipe.recipe_id}
{fixed_inputs_html}
Recipe Source
{recipe_source_escaped}
Run this Recipe
'''
return HTMLResponse(render_page(f"Recipe: {recipe.name}", content, ctx.actor_id if ctx else None, active_tab="recipes"))
@app.get("/recipe/{recipe_id}/dag", response_class=HTMLResponse)
async def recipe_dag_visualization(recipe_id: str, request: Request):
"""Visualize recipe structure as DAG."""
ctx = await get_user_context_from_cookie(request)
recipe = load_recipe(recipe_id)
if not recipe:
return HTMLResponse(render_page_with_cytoscape(
"Recipe Not Found",
f'
Recipe {recipe_id} not found.
',
ctx.actor_id if ctx else None,
active_tab="recipes"
), status_code=404)
# Load recipe YAML
recipe_path = cache_manager.get_by_content_hash(recipe_id)
if not recipe_path or not recipe_path.exists():
return HTMLResponse(render_page_with_cytoscape(
"Recipe Not Found",
'
Recipe file not found in cache.
',
ctx.actor_id if ctx else None,
active_tab="recipes"
), status_code=404)
try:
recipe_yaml = recipe_path.read_text()
config = yaml.safe_load(recipe_yaml)
except Exception as e:
return HTMLResponse(render_page_with_cytoscape(
"Error",
f'
Failed to parse recipe: {e}
',
ctx.actor_id if ctx else None,
active_tab="recipes"
), status_code=500)
dag_config = config.get("dag", {})
dag_nodes = dag_config.get("nodes", [])
output_node = dag_config.get("output")
# Build Cytoscape nodes and edges
nodes = []
edges = []
for node_def in dag_nodes:
node_id = node_def.get("id", "")
node_type = node_def.get("type", "EFFECT")
node_config = node_def.get("config", {})
input_names = node_def.get("inputs", [])
# Determine if this is the output node
is_output = node_id == output_node
if is_output:
color = NODE_COLORS.get("OUTPUT", NODE_COLORS["default"])
else:
color = NODE_COLORS.get(node_type, NODE_COLORS["default"])
# Get effect name if it's an effect node
label = node_id
if node_type == "EFFECT" and "effect" in node_config:
label = node_config["effect"]
nodes.append({
"data": {
"id": node_id,
"label": label,
"nodeType": node_type,
"isOutput": is_output,
"color": color,
"config": node_config
}
})
# Create edges from inputs
for input_name in input_names:
edges.append({
"data": {
"source": input_name,
"target": node_id
}
})
nodes_json = json.dumps(nodes)
edges_json = json.dumps(edges)
dag_html = render_dag_cytoscape(nodes_json, edges_json)
content = f'''
Click on a node to see its configuration. The purple-bordered node is the output.
{dag_html}
'''
return HTMLResponse(render_page_with_cytoscape(f"DAG: {recipe.name}", content, ctx.actor_id if ctx else None, active_tab="recipes"))
@app.post("/ui/recipes/{recipe_id}/run", response_class=HTMLResponse)
async def ui_run_recipe(recipe_id: str, request: Request):
"""HTMX handler: run a recipe with form inputs using 3-phase orchestration."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '
Login required
'
recipe = load_recipe(recipe_id)
if not recipe:
return '
Recipe not found
'
# Parse form data
form_data = await request.form()
input_hashes = {}
for var_input in recipe.variable_inputs:
value = form_data.get(var_input.node_id, "").strip()
if var_input.required and not value:
return f'
Missing required input: {var_input.name}
'
if value:
input_hashes[var_input.node_id] = value
# Load recipe YAML
recipe_path = cache_manager.get_by_content_hash(recipe_id)
if not recipe_path:
return '
Recipe YAML not found in cache
'
try:
recipe_yaml = recipe_path.read_text()
# Compute deterministic run_id
run_id = compute_run_id(
list(input_hashes.values()),
recipe.name,
recipe_id # recipe_id is already the content hash
)
# Check if already completed
cached = await database.get_run_cache(run_id)
if cached:
output_hash = cached.get("output_hash")
if cache_manager.has_content(output_hash):
return f'''
'''
except Exception as e:
logger.error(f"Recipe run failed: {e}")
return f'
Error: {str(e)}
'
@app.get("/ui/recipes-list", response_class=HTMLResponse)
async def ui_recipes_list(request: Request):
"""HTMX partial: list of recipes."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '
Not logged in.
'
all_recipes = list_all_recipes()
# Filter to user's recipes
user_recipes = [c for c in all_recipes if c.uploader in (ctx.username, ctx.actor_id)]
if not user_recipes:
return '
No recipes yet. Upload a recipe YAML file to get started.
'
html_parts = ['
']
for recipe in user_recipes:
var_count = len(recipe.variable_inputs)
fixed_count = len(recipe.fixed_inputs)
input_info = []
if var_count:
input_info.append(f"{var_count} variable")
if fixed_count:
input_info.append(f"{fixed_count} fixed")
inputs_str = ", ".join(input_info) if input_info else "no inputs"
html_parts.append(f'''
'''
@app.get("/cache/{content_hash}")
async def get_cached(content_hash: str, request: Request):
"""Get cached content by hash. Content negotiation: HTML for browsers, JSON for APIs, file for downloads."""
start = time.time()
accept = request.headers.get("accept", "")
logger.info(f"get_cached: {content_hash[:16]}... Accept={accept[:50]}")
ctx = await get_user_context_from_cookie(request)
cache_path = get_cache_path(content_hash)
if not cache_path:
logger.info(f"get_cached: Not found, took {time.time()-start:.3f}s")
if wants_html(request):
content = f'
Content not found: {content_hash}
'
return HTMLResponse(render_page("Not Found", content, ctx.actor_id if ctx else None, active_tab="media"), status_code=404)
raise HTTPException(404, f"Content {content_hash} not in cache")
# JSON response only if explicitly requested
if "application/json" in accept and "text/html" not in accept:
t0 = time.time()
meta = await database.load_item_metadata(content_hash, ctx.actor_id if ctx else None)
logger.debug(f"get_cached: load_item_metadata took {time.time()-t0:.3f}s")
t0 = time.time()
cache_item = await database.get_cache_item(content_hash)
logger.debug(f"get_cached: get_cache_item took {time.time()-t0:.3f}s")
ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
file_size = cache_path.stat().st_size
# Use stored type from metadata, fall back to auto-detection
stored_type = meta.get("type") if meta else None
if stored_type == "recipe":
media_type = "recipe"
else:
media_type = detect_media_type(cache_path)
logger.info(f"get_cached: JSON response, ipfs_cid={ipfs_cid[:16] if ipfs_cid else 'None'}..., took {time.time()-start:.3f}s")
return {
"content_hash": content_hash,
"size": file_size,
"media_type": media_type,
"ipfs_cid": ipfs_cid,
"meta": meta
}
# HTML response for browsers (default for all non-JSON requests)
# Raw data is only served from /cache/{hash}/raw endpoint
if True: # Always show HTML page, raw data via /raw endpoint
if not ctx:
content = '
Not logged in.
'
return HTMLResponse(render_page("Login Required", content, None, active_tab="media"), status_code=401)
# Check user has access
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
content = '
Access denied.
'
return HTMLResponse(render_page("Access Denied", content, ctx.actor_id, active_tab="media"), status_code=403)
media_type = detect_media_type(cache_path)
file_size = cache_path.stat().st_size
size_str = f"{file_size:,} bytes"
if file_size > 1024*1024:
size_str = f"{file_size/(1024*1024):.1f} MB"
elif file_size > 1024:
size_str = f"{file_size/1024:.1f} KB"
# Get IPFS CID from database
cache_item = await database.get_cache_item(content_hash)
ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
# Build media display HTML
if media_type == "video":
video_src = video_src_for_request(content_hash, request)
media_html = f''
elif media_type == "image":
media_html = f''
else:
media_html = f'
Published to L2 ({len(l2_shares)} share{"s" if len(l2_shares) != 1 else ""})
{shares_html}
'''
else:
# Show publish form only if origin is set
if origin_type:
publish_html = f'''
'''
else:
publish_html = '''
Set an origin (self or external URL) before publishing.
'''
return f'''
Metadata
Publish to L2 (ActivityPub)
{publish_html}
Status
Pinned:
{'Yes' if pinned else 'No'}
{f'({pin_reason})' if pinned and pin_reason else ''}
Pinned items cannot be discarded. Items are pinned when published or used as inputs to published content.
{'
Cannot discard pinned items.
' if pinned else f"""
"""}
'''
@app.patch("/ui/cache/{content_hash}/meta", response_class=HTMLResponse)
async def ui_update_cache_meta(content_hash: str, request: Request):
"""HTMX handler: update cache metadata from form."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '
Login required
'
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
return '
Access denied
'
# Parse form data
form = await request.form()
origin_type = form.get("origin_type", "")
origin_url = form.get("origin_url", "").strip()
origin_note = form.get("origin_note", "").strip()
description = form.get("description", "").strip()
tags_str = form.get("tags", "").strip()
# Build origin
source_type = None
if origin_type == "self":
source_type = "self"
elif origin_type == "external":
if not origin_url:
return '
External origin requires a URL
'
source_type = "external"
# Parse tags
tags = [t.strip() for t in tags_str.split(",") if t.strip()] if tags_str else []
# Save to database
await database.update_item_metadata(
content_hash=content_hash,
actor_id=ctx.actor_id,
item_type="media",
description=description if description else None,
source_type=source_type,
source_url=origin_url if origin_url else None,
source_note=origin_note if origin_note else None,
tags=tags
)
return '
Metadata saved!
'
@app.post("/ui/cache/{content_hash}/publish", response_class=HTMLResponse)
async def ui_publish_cache(content_hash: str, request: Request):
"""HTMX handler: publish cache item to L2."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '
Login required
'
token = request.cookies.get("auth_token")
if not token:
return '
Auth token required
'
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
return '
Access denied
'
# Parse form
form = await request.form()
asset_name = form.get("asset_name", "").strip()
asset_type = form.get("asset_type", "image")
if not asset_name:
return '
Asset name required
'
# Load metadata from database
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
origin = meta.get("origin")
if not origin or "type" not in origin:
return '
'''
@app.patch("/ui/cache/{content_hash}/republish", response_class=HTMLResponse)
async def ui_republish_cache(content_hash: str, request: Request):
"""HTMX handler: re-publish (update) cache item on L2."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '
Login required
'
token = request.cookies.get("auth_token")
if not token:
return '
Auth token required
'
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
return '
Access denied
'
# Load metadata
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
l2_shares = meta.get("l2_shares", [])
# Find share for current L2 server (user's L2)
l2_server = ctx.l2_server
current_share = None
share_index = -1
for i, share in enumerate(l2_shares):
if share.get("l2_server") == l2_server:
current_share = share
share_index = i
break
if not current_share:
return '
Item not published to this L2 yet
'
asset_name = current_share.get("asset_name")
if not asset_name:
return '
'
@app.get("/media")
async def list_media(
request: Request,
page: int = 1,
limit: int = 20,
folder: Optional[str] = None,
collection: Optional[str] = None,
tag: Optional[str] = None
):
"""List media items. HTML for browsers (with infinite scroll), JSON for APIs (with pagination)."""
ctx = await get_user_context_from_cookie(request)
if wants_html(request):
# Require login for HTML media view
if not ctx:
content = '
Not logged in.
'
return HTMLResponse(render_page("Media", content, None, active_tab="media"))
# Get hashes owned by/associated with this user
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
# Get cache items that belong to the user (from cache_manager)
cache_items = []
seen_hashes = set() # Deduplicate by content_hash
for cached_file in cache_manager.list_all():
content_hash = cached_file.content_hash
if content_hash not in user_hashes:
continue
# Skip duplicates (same content from multiple runs)
if content_hash in seen_hashes:
continue
seen_hashes.add(content_hash)
# Skip recipes - they have their own section
if cached_file.node_type == "recipe":
continue
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
# Apply folder filter
if folder:
item_folder = meta.get("folder", "/")
if folder != "/" and not item_folder.startswith(folder):
continue
if folder == "/" and item_folder != "/":
continue
# Apply collection filter
if collection:
if collection not in meta.get("collections", []):
continue
# Apply tag filter
if tag:
if tag not in meta.get("tags", []):
continue
cache_items.append({
"hash": content_hash,
"size": cached_file.size_bytes,
"mtime": cached_file.created_at,
"meta": meta
})
# Sort by modification time (newest first)
cache_items.sort(key=lambda x: x["mtime"], reverse=True)
total = len(cache_items)
# Pagination
start = (page - 1) * limit
end = start + limit
items_page = cache_items[start:end]
has_more = end < total
if not items_page:
if page == 1:
filter_msg = ""
if folder:
filter_msg = f" in folder {folder}"
elif collection:
filter_msg = f" in collection '{collection}'"
elif tag:
filter_msg = f" with tag '{tag}'"
content = f'
No media{filter_msg}. Upload files or run effects to see them here.
'
else:
return HTMLResponse("") # Empty for infinite scroll
else:
html_parts = []
for item in items_page:
content_hash = item["hash"]
cache_path = get_cache_path(content_hash)
media_type = detect_media_type(cache_path) if cache_path else "unknown"
# Format size
size = item["size"]
if size > 1024*1024:
size_str = f"{size/(1024*1024):.1f} MB"
elif size > 1024:
size_str = f"{size/1024:.1f} KB"
else:
size_str = f"{size} bytes"
html_parts.append(f'''
')
# For infinite scroll, just return cards if not first page
if page > 1:
if has_more:
query_params = f"page={page + 1}"
if folder:
query_params += f"&folder={folder}"
if collection:
query_params += f"&collection={collection}"
if tag:
query_params += f"&tag={tag}"
html_parts.append(f'''
Loading more...
''')
return HTMLResponse('\n'.join(html_parts))
# First page - full content
infinite_scroll_trigger = ""
if has_more:
query_params = "page=2"
if folder:
query_params += f"&folder={folder}"
if collection:
query_params += f"&collection={collection}"
if tag:
query_params += f"&tag={tag}"
infinite_scroll_trigger = f'''
Loading more...
'''
content = f'''
Media ({total} items)
{''.join(html_parts)}
{infinite_scroll_trigger}
'''
return HTMLResponse(render_page("Media", content, ctx.actor_id, active_tab="media"))
# JSON response for APIs - list all hashes with optional pagination
all_hashes = [cf.content_hash for cf in cache_manager.list_all()]
total = len(all_hashes)
start = (page - 1) * limit
end = start + limit
hashes_page = all_hashes[start:end]
has_more = end < total
return {
"hashes": hashes_page,
"pagination": {
"page": page,
"limit": limit,
"total": total,
"has_more": has_more
}
}
@app.delete("/cache/{content_hash}")
async def discard_cache(content_hash: str, ctx: UserContext = Depends(get_required_user_context)):
"""
Discard (delete) a cached item.
Enforces deletion rules:
- Cannot delete items published to L2 (shared)
- Cannot delete inputs/outputs of activities (runs)
- Cannot delete pinned items
"""
# Check if content exists
if not cache_manager.has_content(content_hash):
raise HTTPException(404, "Content not found")
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
raise HTTPException(403, "Access denied")
# Check if pinned
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
if meta.get("pinned"):
pin_reason = meta.get("pin_reason", "unknown")
raise HTTPException(400, f"Cannot discard pinned item (reason: {pin_reason})")
# Check if used by any run (Redis runs, not just activity store)
runs_using = await asyncio.to_thread(find_runs_using_content, content_hash)
if runs_using:
run, role = runs_using[0]
raise HTTPException(400, f"Cannot discard: item is {role} of run {run.run_id}")
# Check deletion rules via cache_manager (L2 shared status, activity store)
can_delete, reason = await asyncio.to_thread(cache_manager.can_delete, content_hash)
if not can_delete:
raise HTTPException(400, f"Cannot discard: {reason}")
# Delete via cache_manager
success, msg = await asyncio.to_thread(cache_manager.delete_by_content_hash, content_hash)
if not success:
# Fallback to legacy deletion
cache_path = get_cache_path(content_hash)
if cache_path and cache_path.exists():
cache_path.unlink()
# Clean up legacy metadata files
meta_path = CACHE_DIR / f"{content_hash}.meta.json"
if meta_path.exists():
meta_path.unlink()
mp4_path = CACHE_DIR / f"{content_hash}.mp4"
if mp4_path.exists():
mp4_path.unlink()
return {"discarded": True, "content_hash": content_hash}
@app.delete("/ui/cache/{content_hash}/discard", response_class=HTMLResponse)
async def ui_discard_cache(content_hash: str, request: Request):
"""HTMX handler: discard a cached item."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '
Login required
'
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
return '
Access denied
'
# Check if content exists
has_content = await asyncio.to_thread(cache_manager.has_content, content_hash)
if not has_content:
return '
Content not found
'
# Check if pinned
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
if meta.get("pinned"):
pin_reason = meta.get("pin_reason", "unknown")
return f'
Cannot discard: item is pinned ({pin_reason})
'
# Check if used by any run (Redis runs, not just activity store)
runs_using = await asyncio.to_thread(find_runs_using_content, content_hash)
if runs_using:
run, role = runs_using[0]
return f'
Cannot discard: item is {role} of run {run.run_id}
'
# Check deletion rules via cache_manager (L2 shared status, activity store)
can_delete, reason = await asyncio.to_thread(cache_manager.can_delete, content_hash)
if not can_delete:
return f'
Cannot discard: {reason}
'
# Delete via cache_manager
success, msg = await asyncio.to_thread(cache_manager.delete_by_content_hash, content_hash)
if not success:
# Fallback to legacy deletion
cache_path = get_cache_path(content_hash)
if cache_path and cache_path.exists():
cache_path.unlink()
# Clean up legacy metadata files
meta_path = CACHE_DIR / f"{content_hash}.meta.json"
if meta_path.exists():
meta_path.unlink()
mp4_path = CACHE_DIR / f"{content_hash}.mp4"
if mp4_path.exists():
mp4_path.unlink()
return '''
'''
# Known assets (bootstrap data)
KNOWN_ASSETS = {
"cat": "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b",
}
@app.get("/assets")
async def list_assets():
"""List known assets."""
return KNOWN_ASSETS
@app.post("/cache/import")
async def import_to_cache(path: str):
"""Import a local file to cache."""
source = Path(path)
if not source.exists():
raise HTTPException(404, f"File not found: {path}")
content_hash = await cache_file(source)
return {"content_hash": content_hash, "cached": True}
def save_cache_meta(content_hash: str, uploader: str = None, filename: str = None, **updates):
"""Save or update metadata for a cached file."""
meta_path = CACHE_DIR / f"{content_hash}.meta.json"
# Load existing or create new
if meta_path.exists():
with open(meta_path) as f:
meta = json.load(f)
else:
meta = {
"uploader": uploader,
"uploaded_at": datetime.now(timezone.utc).isoformat(),
"filename": filename
}
# Apply updates (but never change uploader or uploaded_at)
for key, value in updates.items():
if key not in ("uploader", "uploaded_at"):
meta[key] = value
with open(meta_path, "w") as f:
json.dump(meta, f, indent=2)
return meta
def load_cache_meta(content_hash: str) -> dict:
"""Load metadata for a cached file."""
meta_path = CACHE_DIR / f"{content_hash}.meta.json"
if meta_path.exists():
with open(meta_path) as f:
return json.load(f)
return {}
# User data storage (folders, collections)
USER_DATA_DIR = CACHE_DIR / ".user-data"
def load_user_data(username: str) -> dict:
"""Load user's folders and collections."""
USER_DATA_DIR.mkdir(parents=True, exist_ok=True)
# Normalize username (remove @ prefix if present)
safe_name = username.replace("@", "").replace("/", "_")
user_file = USER_DATA_DIR / f"{safe_name}.json"
if user_file.exists():
with open(user_file) as f:
return json.load(f)
return {"folders": ["/"], "collections": []}
def save_user_data(username: str, data: dict):
"""Save user's folders and collections."""
USER_DATA_DIR.mkdir(parents=True, exist_ok=True)
safe_name = username.replace("@", "").replace("/", "_")
user_file = USER_DATA_DIR / f"{safe_name}.json"
with open(user_file, "w") as f:
json.dump(data, f, indent=2)
async def get_user_cache_hashes(username: str, actor_id: Optional[str] = None) -> set:
"""Get all cache hashes owned by or associated with a user.
username: The plain username
actor_id: The full actor ID (@user@server), if available
"""
# Match against both formats for backwards compatibility
match_values = [username]
if actor_id:
match_values.append(actor_id)
hashes = set()
# Query database for items owned by user (new system)
if actor_id:
try:
db_items = await database.get_user_items(actor_id)
for item in db_items:
hashes.add(item["content_hash"])
except Exception:
pass # Database may not be initialized
# Legacy: Files uploaded by user (JSON metadata)
if CACHE_DIR.exists():
for f in CACHE_DIR.iterdir():
if f.name.endswith('.meta.json'):
try:
meta_path = CACHE_DIR / f.name
if meta_path.exists():
import json
with open(meta_path, 'r') as mf:
meta = json.load(mf)
if meta.get("uploader") in match_values:
hashes.add(f.name.replace('.meta.json', ''))
except Exception:
pass
# Files from user's runs (inputs and outputs)
for run in list_all_runs():
if run.username in match_values:
hashes.update(run.inputs)
if run.output_hash:
hashes.add(run.output_hash)
return hashes
@app.post("/cache/upload")
async def upload_to_cache(file: UploadFile = File(...), ctx: UserContext = Depends(get_required_user_context)):
"""Upload a file to cache. Requires authentication."""
# Write to temp file first
import tempfile
with tempfile.NamedTemporaryFile(delete=False) as tmp:
content = await file.read()
tmp.write(content)
tmp_path = Path(tmp.name)
# Store in cache via cache_manager
cached, ipfs_cid = cache_manager.put(tmp_path, node_type="upload", move=True)
content_hash = cached.content_hash
# Save to cache_items table (with IPFS CID)
await database.create_cache_item(content_hash, ipfs_cid)
# Save uploader metadata to database
await database.save_item_metadata(
content_hash=content_hash,
actor_id=ctx.actor_id,
item_type="media",
filename=file.filename
)
return {"content_hash": content_hash, "filename": file.filename, "size": len(content)}
class CacheMetaUpdate(BaseModel):
"""Request to update cache metadata."""
origin: Optional[dict] = None # {"type": "self"|"external", "url": "...", "note": "..."}
description: Optional[str] = None
tags: Optional[list[str]] = None
folder: Optional[str] = None
collections: Optional[list[str]] = None
class PublishRequest(BaseModel):
"""Request to publish a cache item to L2."""
asset_name: str
asset_type: str = "image" # image, video, etc.
class AddStorageRequest(BaseModel):
"""Request to add a storage provider."""
provider_type: str # 'pinata', 'web3storage', 'local', etc.
provider_name: Optional[str] = None # User-friendly name
config: dict # Provider-specific config (api_key, path, etc.)
capacity_gb: int # Storage capacity in GB
class UpdateStorageRequest(BaseModel):
"""Request to update a storage provider."""
config: Optional[dict] = None
capacity_gb: Optional[int] = None
is_active: Optional[bool] = None
@app.get("/cache/{content_hash}/meta")
async def get_cache_meta(content_hash: str, ctx: UserContext = Depends(get_required_user_context)):
"""Get metadata for a cached file."""
# Check file exists
cache_path = get_cache_path(content_hash)
if not cache_path:
raise HTTPException(404, "Content not found")
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
raise HTTPException(403, "Access denied")
return await database.load_item_metadata(content_hash, ctx.actor_id)
@app.patch("/cache/{content_hash}/meta")
async def update_cache_meta(content_hash: str, update: CacheMetaUpdate, ctx: UserContext = Depends(get_required_user_context)):
"""Update metadata for a cached file."""
# Check file exists
cache_path = get_cache_path(content_hash)
if not cache_path:
raise HTTPException(404, "Content not found")
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
raise HTTPException(403, "Access denied")
# Build update dict from non-None fields
updates = {}
if update.origin is not None:
updates["origin"] = update.origin
if update.description is not None:
updates["description"] = update.description
if update.tags is not None:
updates["tags"] = update.tags
if update.folder is not None:
# Ensure folder exists in user's folder list
user_data = load_user_data(ctx.username)
if update.folder not in user_data["folders"]:
raise HTTPException(400, f"Folder does not exist: {update.folder}")
updates["folder"] = update.folder
if update.collections is not None:
# Validate collections exist
user_data = load_user_data(ctx.username)
existing = {c["name"] for c in user_data["collections"]}
for col in update.collections:
if col not in existing:
raise HTTPException(400, f"Collection does not exist: {col}")
updates["collections"] = update.collections
meta = await database.update_item_metadata(content_hash, ctx.actor_id, **updates)
return meta
@app.post("/cache/{content_hash}/publish")
async def publish_cache_to_l2(
content_hash: str,
req: PublishRequest,
request: Request,
ctx: UserContext = Depends(get_required_user_context)
):
"""
Publish a cache item to L2 (ActivityPub).
Requires origin to be set in metadata before publishing.
"""
# Check file exists
cache_path = get_cache_path(content_hash)
if not cache_path:
raise HTTPException(404, "Content not found")
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
raise HTTPException(403, "Access denied")
# Load metadata
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
# Check origin is set
origin = meta.get("origin")
if not origin or "type" not in origin:
raise HTTPException(400, "Origin must be set before publishing. Use --origin self or --origin-url ")
# Get IPFS CID from cache item
cache_item = await database.get_cache_item(content_hash)
ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
# Get auth token to pass to L2
token = request.cookies.get("auth_token")
if not token:
# Try from header
auth_header = request.headers.get("Authorization", "")
if auth_header.startswith("Bearer "):
token = auth_header[7:]
if not token:
raise HTTPException(401, "Authentication token required")
# Call L2 publish-cache endpoint (use user's L2 server)
l2_server = ctx.l2_server
try:
resp = http_requests.post(
f"{l2_server}/assets/publish-cache",
headers={"Authorization": f"Bearer {token}"},
json={
"content_hash": content_hash,
"ipfs_cid": ipfs_cid,
"asset_name": req.asset_name,
"asset_type": req.asset_type,
"origin": origin,
"description": meta.get("description"),
"tags": meta.get("tags", []),
"metadata": {
"filename": meta.get("filename"),
"folder": meta.get("folder"),
"collections": meta.get("collections", [])
}
},
timeout=10
)
resp.raise_for_status()
l2_result = resp.json()
except http_requests.exceptions.HTTPError as e:
error_detail = ""
try:
error_detail = e.response.json().get("detail", str(e))
except Exception:
error_detail = str(e)
raise HTTPException(400, f"L2 publish failed: {error_detail}")
except Exception as e:
raise HTTPException(500, f"L2 publish failed: {e}")
# Update local metadata with publish status and pin
await database.save_l2_share(
content_hash=content_hash,
actor_id=ctx.actor_id,
l2_server=l2_server,
asset_name=req.asset_name,
content_type=req.asset_type
)
await database.update_item_metadata(
content_hash=content_hash,
actor_id=ctx.actor_id,
pinned=True,
pin_reason="published"
)
return {
"published": True,
"asset_name": req.asset_name,
"l2_result": l2_result
}
@app.patch("/cache/{content_hash}/republish")
async def republish_cache_to_l2(
content_hash: str,
request: Request,
ctx: UserContext = Depends(get_required_user_context)
):
"""
Re-publish (update) a cache item on L2 after metadata changes.
Only works for already-published items.
"""
# Check file exists
cache_path = get_cache_path(content_hash)
if not cache_path:
raise HTTPException(404, "Content not found")
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
raise HTTPException(403, "Access denied")
# Load metadata
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
l2_shares = meta.get("l2_shares", [])
# Find share for current L2 server (user's L2)
l2_server = ctx.l2_server
current_share = None
for share in l2_shares:
if share.get("l2_server") == l2_server:
current_share = share
break
if not current_share:
raise HTTPException(400, "Item not published yet. Use publish first.")
asset_name = current_share.get("asset_name")
if not asset_name:
raise HTTPException(400, "No asset name found in publish info")
# Get auth token
token = request.cookies.get("auth_token")
if not token:
auth_header = request.headers.get("Authorization", "")
if auth_header.startswith("Bearer "):
token = auth_header[7:]
if not token:
raise HTTPException(401, "Authentication token required")
# Get IPFS CID from cache item
cache_item = await database.get_cache_item(content_hash)
ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
# Call L2 update endpoint (use user's L2 server)
l2_server = ctx.l2_server
try:
resp = http_requests.patch(
f"{l2_server}/assets/{asset_name}",
headers={"Authorization": f"Bearer {token}"},
json={
"description": meta.get("description"),
"tags": meta.get("tags"),
"origin": meta.get("origin"),
"ipfs_cid": ipfs_cid,
"metadata": {
"filename": meta.get("filename"),
"folder": meta.get("folder"),
"collections": meta.get("collections", [])
}
},
timeout=10
)
resp.raise_for_status()
l2_result = resp.json()
except http_requests.exceptions.HTTPError as e:
error_detail = ""
try:
error_detail = e.response.json().get("detail", str(e))
except Exception:
error_detail = str(e)
raise HTTPException(400, f"L2 update failed: {error_detail}")
except Exception as e:
raise HTTPException(500, f"L2 update failed: {e}")
# Update local metadata - save_l2_share updates last_synced_at on conflict
await database.save_l2_share(
content_hash=content_hash,
actor_id=ctx.actor_id,
l2_server=l2_server,
asset_name=asset_name,
content_type=current_share.get("content_type", "media")
)
return {
"updated": True,
"asset_name": asset_name,
"l2_result": l2_result
}
# ============ L2 Sync ============
def _fetch_l2_outbox_sync(l2_server: str, username: str) -> list:
"""Fetch user's outbox from L2 (sync version for asyncio.to_thread)."""
try:
# Fetch outbox page with activities
resp = http_requests.get(
f"{l2_server}/users/{username}/outbox?page=true",
headers={"Accept": "application/activity+json"},
timeout=10
)
if resp.status_code != 200:
logger.warning(f"L2 outbox fetch failed: {resp.status_code}")
return []
data = resp.json()
return data.get("orderedItems", [])
except Exception as e:
logger.error(f"Failed to fetch L2 outbox: {e}")
return []
@app.post("/user/sync-l2")
async def sync_with_l2(ctx: UserContext = Depends(get_required_user_context)):
"""
Sync local L2 share records with user's L2 outbox.
Fetches user's published assets from their L2 server and updates local tracking.
"""
l2_server = ctx.l2_server
username = ctx.username
# Fetch outbox activities
activities = await asyncio.to_thread(_fetch_l2_outbox_sync, l2_server, username)
if not activities:
return {"synced": 0, "message": "No activities found or L2 unavailable"}
# Process Create activities for assets
synced_count = 0
for activity in activities:
if activity.get("type") != "Create":
continue
obj = activity.get("object", {})
if not isinstance(obj, dict):
continue
# Get asset info - look for content_hash in attachment or directly
content_hash = None
asset_name = obj.get("name", "")
# Check attachments for content hash
for attachment in obj.get("attachment", []):
if attachment.get("name") == "content_hash":
content_hash = attachment.get("value")
break
# Also check if there's a hash in the object URL or ID
if not content_hash:
# Try to extract from object ID like /objects/{hash}
obj_id = obj.get("id", "")
if "/objects/" in obj_id:
content_hash = obj_id.split("/objects/")[-1].split("/")[0]
if not content_hash or not asset_name:
continue
# Check if we have this content locally
cache_path = get_cache_path(content_hash)
if not cache_path:
continue # We don't have this content, skip
# Determine content type from object type
obj_type = obj.get("type", "")
if obj_type == "Video":
content_type = "video"
elif obj_type == "Image":
content_type = "image"
else:
content_type = "media"
# Update local L2 share record
await database.save_l2_share(
content_hash=content_hash,
actor_id=ctx.actor_id,
l2_server=l2_server,
asset_name=asset_name,
content_type=content_type
)
synced_count += 1
return {"synced": synced_count, "total_activities": len(activities)}
@app.post("/ui/sync-l2", response_class=HTMLResponse)
async def ui_sync_with_l2(request: Request):
"""HTMX handler: sync with L2 server."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '
Login required
'
try:
result = await sync_with_l2(ctx)
synced = result.get("synced", 0)
total = result.get("total_activities", 0)
if synced > 0:
return f'''
Synced {synced} asset(s) from L2 ({total} activities found)
"""
def render_page(title: str, content: str, actor_id: Optional[str] = None, active_tab: str = None) -> str:
"""Render a page with nav bar and content. Used for clean URL pages.
actor_id: The user's actor ID (@user@server) or None if not logged in.
"""
user_info = ""
if actor_id:
# Extract username and domain from @username@domain format
parts = actor_id.lstrip("@").split("@")
username = parts[0] if parts else actor_id
domain = parts[1] if len(parts) > 1 else ""
l2_user_url = f"https://{domain}/users/{username}" if domain else "#"
user_info = f'''
"""
def render_ui_html(actor_id: Optional[str] = None, tab: str = "runs") -> str:
"""Render main UI HTML with optional user context.
actor_id: The user's actor ID (@user@server) or None if not logged in.
"""
user_info = ""
if actor_id:
# Extract username and domain from @username@domain format
parts = actor_id.lstrip("@").split("@")
username = parts[0] if parts else actor_id
domain = parts[1] if len(parts) > 1 else ""
l2_user_url = f"https://{domain}/users/{username}" if domain else "#"
user_info = f'''
"""
# Auth - L1 doesn't handle login (user logs in at their L2 server)
# Token can be passed via URL from L2 redirect, then L1 sets its own cookie
@app.get("/auth")
async def auth_callback(auth_token: str = None):
"""
Receive auth token from L2 redirect and set local cookie.
This enables cross-subdomain auth on iOS Safari which blocks shared cookies.
"""
if not auth_token:
return RedirectResponse(url="/", status_code=302)
# Verify the token is valid
ctx = await get_verified_user_context(auth_token)
if not ctx:
return RedirectResponse(url="/", status_code=302)
# Register token for this user (for revocation by username later)
register_user_token(ctx.username, auth_token)
# Set local first-party cookie and redirect to home
response = RedirectResponse(url="/runs", status_code=302)
response.set_cookie(
key="auth_token",
value=auth_token,
httponly=True,
max_age=60 * 60 * 24 * 30, # 30 days
samesite="lax",
secure=True
)
return response
@app.get("/logout")
async def logout():
"""Logout - clear local cookie and redirect to home."""
response = RedirectResponse(url="/", status_code=302)
response.delete_cookie("auth_token")
return response
@app.post("/auth/revoke")
async def auth_revoke(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""
Revoke a token. Called by L2 when user logs out.
The token to revoke is passed in the Authorization header.
"""
if not credentials:
raise HTTPException(401, "No token provided")
token = credentials.credentials
# Verify token is valid before revoking (ensures caller has the token)
ctx = get_user_context_from_token(token)
if not ctx:
raise HTTPException(401, "Invalid token")
# Revoke the token
newly_revoked = revoke_token(token)
return {"revoked": True, "newly_revoked": newly_revoked}
class RevokeUserRequest(BaseModel):
username: str
l2_server: str # L2 server requesting the revocation
@app.post("/auth/revoke-user")
async def auth_revoke_user(request: RevokeUserRequest):
"""
Revoke all tokens for a user. Called by L2 when user logs out.
This handles the case where L2 issued scoped tokens that differ from L2's own token.
"""
# Verify the L2 server is authorized (must be in L1's known list or match token's l2_server)
# For now, we trust any request since this only affects users already on this L1
# Revoke all tokens registered for this user
count = revoke_all_user_tokens(request.username)
return {"revoked": True, "tokens_revoked": count, "username": request.username}
@app.post("/ui/publish-run/{run_id}", response_class=HTMLResponse)
async def ui_publish_run(run_id: str, request: Request):
"""Publish a run to L2 from the web UI. Assets are named by content_hash."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return HTMLResponse('
Not logged in
')
token = request.cookies.get("auth_token")
if not token:
return HTMLResponse('
Not logged in
')
# Get the run to pin its output and inputs
run = load_run(run_id)
if not run:
return HTMLResponse('
Run not found
')
# Call L2 to publish the run, including this L1's public URL
# Assets are named by their content_hash - no output_name needed
l2_server = ctx.l2_server
try:
resp = http_requests.post(
f"{l2_server}/assets/record-run",
json={"run_id": run_id, "l1_server": L1_PUBLIC_URL},
headers={"Authorization": f"Bearer {token}"},
timeout=30
)
if resp.status_code == 400:
error = resp.json().get("detail", "Bad request")
return HTMLResponse(f'
Error: {error}
')
resp.raise_for_status()
result = resp.json()
# Pin the output and record L2 share
if run.output_hash and result.get("asset"):
await database.update_item_metadata(run.output_hash, ctx.actor_id, pinned=True, pin_reason="published")
# Record L2 share so UI shows published status
cache_path = get_cache_path(run.output_hash)
media_type = detect_media_type(cache_path) if cache_path else "image"
content_type = "video" if media_type == "video" else "image"
# Get activity_id for linking to the published run
activity = result.get("activity")
activity_id = activity.get("activity_id") if activity else None
await database.save_l2_share(
content_hash=run.output_hash,
actor_id=ctx.actor_id,
l2_server=l2_server,
asset_name=result["asset"]["name"],
content_type=content_type,
activity_id=activity_id
)
# Pin the inputs (for provenance)
for input_hash in run.inputs:
await database.update_item_metadata(input_hash, ctx.actor_id, pinned=True, pin_reason="input_to_published")
# If this was a recipe-based run, pin the recipe and its fixed inputs
if run.recipe.startswith("recipe:"):
config_name = run.recipe.replace("recipe:", "")
for recipe in list_all_recipes():
if recipe.name == config_name:
# Pin the recipe YAML
cache_manager.pin(recipe.recipe_id, reason="recipe_for_published")
# Pin all fixed inputs referenced by the recipe
for fixed in recipe.fixed_inputs:
if fixed.content_hash:
cache_manager.pin(fixed.content_hash, reason="fixed_input_in_published_recipe")
break
# Use HTTPS for L2 links
l2_https = l2_server.replace("http://", "https://")
asset_name = result["asset"]["name"]
short_name = asset_name[:16] + "..." if len(asset_name) > 20 else asset_name
# Link to activity (the published run) rather than just the asset
activity = result.get("activity")
activity_id = activity.get("activity_id") if activity else None
l2_link = f"{l2_https}/activities/{activity_id}" if activity_id else f"{l2_https}/assets/{asset_name}"
return HTMLResponse(f'''
')
@app.get("/ui/runs", response_class=HTMLResponse)
async def ui_runs(request: Request):
"""HTMX partial: list of runs."""
ctx = await get_user_context_from_cookie(request)
runs = list_all_runs()
# Require login to see runs
if not ctx:
return '
Not logged in.
'
# Filter runs by user - match both plain username and ActivityPub format (@user@domain)
runs = [r for r in runs if r.username in (ctx.username, ctx.actor_id)]
if not runs:
return '
')
return '\n'.join(html_parts)
@app.get("/ui/media-list", response_class=HTMLResponse)
async def ui_media_list(
request: Request,
folder: Optional[str] = None,
collection: Optional[str] = None,
tag: Optional[str] = None
):
"""HTMX partial: list of media items with optional filtering."""
ctx = await get_user_context_from_cookie(request)
# Require login to see media
if not ctx:
return '
Not logged in.
'
# Get hashes owned by/associated with this user
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
# Get cache items that belong to the user (from cache_manager)
cache_items = []
seen_hashes = set() # Deduplicate by content_hash
for cached_file in cache_manager.list_all():
content_hash = cached_file.content_hash
if content_hash not in user_hashes:
continue
# Skip duplicates (same content from multiple runs)
if content_hash in seen_hashes:
continue
seen_hashes.add(content_hash)
# Skip recipes - they have their own section
if cached_file.node_type == "recipe":
continue
# Load metadata for filtering
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
# Apply folder filter
if folder:
item_folder = meta.get("folder", "/")
if folder != "/" and not item_folder.startswith(folder):
continue
if folder == "/" and item_folder != "/":
continue
# Apply collection filter
if collection:
if collection not in meta.get("collections", []):
continue
# Apply tag filter
if tag:
if tag not in meta.get("tags", []):
continue
cache_items.append({
"hash": content_hash,
"size": cached_file.size_bytes,
"mtime": cached_file.created_at,
"meta": meta
})
# Sort by modification time (newest first)
cache_items.sort(key=lambda x: x["mtime"], reverse=True)
if not cache_items:
filter_msg = ""
if folder:
filter_msg = f" in folder {folder}"
elif collection:
filter_msg = f" in collection '{collection}'"
elif tag:
filter_msg = f" with tag '{tag}'"
return f'
No cached files{filter_msg}. Upload files or run effects to see them here.
'
html_parts = ['
']
for item in cache_items[:50]: # Limit to 50 items
content_hash = item["hash"]
cache_path = get_cache_path(content_hash)
media_type = detect_media_type(cache_path) if cache_path else "unknown"
# Check IPFS status
cache_item = await database.get_cache_item(content_hash)
ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
ipfs_badge = 'IPFS' if ipfs_cid else ''
# Check L2 publish status
l2_shares = item["meta"].get("l2_shares", [])
if l2_shares:
first_share = l2_shares[0]
l2_server = first_share.get("l2_server", "")
asset_name = first_share.get("asset_name", "")
asset_url = f"{l2_server}/assets/{asset_name}"
published_badge = f'L2'
else:
published_badge = ''
# Format size
size = item["size"]
if size > 1024*1024:
size_str = f"{size/(1024*1024):.1f} MB"
elif size > 1024:
size_str = f"{size/1024:.1f} KB"
else:
size_str = f"{size} bytes"
html_parts.append(f'''
'
return html
# ============ User Storage Configuration ============
STORAGE_PROVIDERS_INFO = {
"pinata": {"name": "Pinata", "desc": "1GB free, IPFS pinning", "color": "blue"},
"web3storage": {"name": "web3.storage", "desc": "IPFS + Filecoin", "color": "green"},
"nftstorage": {"name": "NFT.Storage", "desc": "Free for NFTs", "color": "pink"},
"infura": {"name": "Infura IPFS", "desc": "5GB free", "color": "orange"},
"filebase": {"name": "Filebase", "desc": "5GB free, S3+IPFS", "color": "cyan"},
"storj": {"name": "Storj", "desc": "25GB free", "color": "indigo"},
"local": {"name": "Local Storage", "desc": "Your own disk", "color": "purple"},
}
@app.get("/storage")
async def list_storage(request: Request):
"""List user's storage providers. HTML for browsers, JSON for API."""
accept = request.headers.get("accept", "")
wants_json = "application/json" in accept and "text/html" not in accept
ctx = await get_user_context_from_cookie(request)
if not ctx:
if wants_json:
raise HTTPException(401, "Authentication required")
return RedirectResponse(url="/auth", status_code=302)
storages = await database.get_user_storage(ctx.actor_id)
# Add usage stats to each storage
for storage in storages:
usage = await database.get_storage_usage(storage["id"])
storage["used_bytes"] = usage["used_bytes"]
storage["pin_count"] = usage["pin_count"]
storage["donated_gb"] = storage["capacity_gb"] // 2
# Mask sensitive config keys for display
if storage.get("config"):
config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"])
masked = {}
for k, v in config.items():
if "key" in k.lower() or "token" in k.lower() or "secret" in k.lower():
masked[k] = v[:4] + "..." + v[-4:] if len(str(v)) > 8 else "****"
else:
masked[k] = v
storage["config_display"] = masked
if wants_json:
return {"storages": storages}
return await ui_storage_page(ctx.username, storages, request)
@app.post("/storage")
async def add_storage(req: AddStorageRequest, ctx: UserContext = Depends(get_required_user_context)):
"""Add a storage provider."""
valid_types = ["pinata", "web3storage", "nftstorage", "infura", "filebase", "storj", "local"]
if req.provider_type not in valid_types:
raise HTTPException(400, f"Invalid provider type: {req.provider_type}")
# Test the provider connection before saving
provider = storage_providers.create_provider(req.provider_type, {
**req.config,
"capacity_gb": req.capacity_gb
})
if not provider:
raise HTTPException(400, "Failed to create provider with given config")
success, message = await provider.test_connection()
if not success:
raise HTTPException(400, f"Provider connection failed: {message}")
# Save to database
provider_name = req.provider_name or f"{req.provider_type}-{ctx.username}"
storage_id = await database.add_user_storage(
actor_id=ctx.actor_id,
provider_type=req.provider_type,
provider_name=provider_name,
config=req.config,
capacity_gb=req.capacity_gb
)
if not storage_id:
raise HTTPException(500, "Failed to save storage provider")
return {"id": storage_id, "message": f"Storage provider added: {provider_name}"}
@app.post("/storage/add")
async def add_storage_form(
request: Request,
provider_type: str = Form(...),
provider_name: Optional[str] = Form(None),
description: Optional[str] = Form(None),
capacity_gb: int = Form(5),
api_key: Optional[str] = Form(None),
secret_key: Optional[str] = Form(None),
api_token: Optional[str] = Form(None),
project_id: Optional[str] = Form(None),
project_secret: Optional[str] = Form(None),
access_key: Optional[str] = Form(None),
bucket: Optional[str] = Form(None),
path: Optional[str] = Form(None),
):
"""Add a storage provider via HTML form (cookie auth)."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return HTMLResponse('
Not authenticated
', status_code=401)
valid_types = ["pinata", "web3storage", "nftstorage", "infura", "filebase", "storj", "local"]
if provider_type not in valid_types:
return HTMLResponse(f'
Invalid provider type: {provider_type}
')
# Build config based on provider type
config = {}
if provider_type == "pinata":
if not api_key or not secret_key:
return HTMLResponse('
Pinata requires API Key and Secret Key
')
config = {"api_key": api_key, "secret_key": secret_key}
elif provider_type == "web3storage":
if not api_token:
return HTMLResponse('
web3.storage requires API Token
')
config = {"api_token": api_token}
elif provider_type == "nftstorage":
if not api_token:
return HTMLResponse('
NFT.Storage requires API Token
')
config = {"api_token": api_token}
elif provider_type == "infura":
if not project_id or not project_secret:
return HTMLResponse('
Infura requires Project ID and Project Secret
')
config = {"project_id": project_id, "project_secret": project_secret}
elif provider_type == "filebase":
if not access_key or not secret_key or not bucket:
return HTMLResponse('
Filebase requires Access Key, Secret Key, and Bucket
')
config = {"access_key": access_key, "secret_key": secret_key, "bucket": bucket}
elif provider_type == "storj":
if not access_key or not secret_key or not bucket:
return HTMLResponse('
Storj requires Access Key, Secret Key, and Bucket
')
config = {"access_key": access_key, "secret_key": secret_key, "bucket": bucket}
elif provider_type == "local":
if not path:
return HTMLResponse('
Local storage requires a path
')
config = {"path": path}
# Test the provider connection before saving
provider = storage_providers.create_provider(provider_type, {
**config,
"capacity_gb": capacity_gb
})
if not provider:
return HTMLResponse('
Failed to create provider with given config
')
success, message = await provider.test_connection()
if not success:
return HTMLResponse(f'
Provider connection failed: {message}
')
# Save to database
name = provider_name or f"{provider_type}-{ctx.username}-{len(await database.get_user_storage_by_type(ctx.actor_id, provider_type)) + 1}"
storage_id = await database.add_user_storage(
actor_id=ctx.actor_id,
provider_type=provider_type,
provider_name=name,
config=config,
capacity_gb=capacity_gb,
description=description
)
if not storage_id:
return HTMLResponse('
Failed to save storage provider
')
return HTMLResponse(f'''
Storage provider "{name}" added successfully!
''')
@app.get("/storage/{storage_id}")
async def get_storage(storage_id: int, ctx: UserContext = Depends(get_required_user_context)):
"""Get a specific storage provider."""
storage = await database.get_storage_by_id(storage_id)
if not storage:
raise HTTPException(404, "Storage provider not found")
if storage["actor_id"] != ctx.actor_id:
raise HTTPException(403, "Not authorized")
usage = await database.get_storage_usage(storage_id)
storage["used_bytes"] = usage["used_bytes"]
storage["pin_count"] = usage["pin_count"]
storage["donated_gb"] = storage["capacity_gb"] // 2
return storage
@app.patch("/storage/{storage_id}")
async def update_storage(storage_id: int, req: UpdateStorageRequest, ctx: UserContext = Depends(get_required_user_context)):
"""Update a storage provider."""
storage = await database.get_storage_by_id(storage_id)
if not storage:
raise HTTPException(404, "Storage provider not found")
if storage["actor_id"] != ctx.actor_id:
raise HTTPException(403, "Not authorized")
# If updating config, test the new connection
if req.config:
existing_config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"])
new_config = {**existing_config, **req.config}
provider = storage_providers.create_provider(storage["provider_type"], {
**new_config,
"capacity_gb": req.capacity_gb or storage["capacity_gb"]
})
if provider:
success, message = await provider.test_connection()
if not success:
raise HTTPException(400, f"Provider connection failed: {message}")
success = await database.update_user_storage(
storage_id,
config=req.config,
capacity_gb=req.capacity_gb,
is_active=req.is_active
)
if not success:
raise HTTPException(500, "Failed to update storage provider")
return {"message": "Storage provider updated"}
@app.delete("/storage/{storage_id}")
async def remove_storage(storage_id: int, request: Request):
"""Remove a storage provider."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
raise HTTPException(401, "Not authenticated")
storage = await database.get_storage_by_id(storage_id)
if not storage:
raise HTTPException(404, "Storage provider not found")
if storage["actor_id"] != ctx.actor_id:
raise HTTPException(403, "Not authorized")
success = await database.remove_user_storage(storage_id)
if not success:
raise HTTPException(500, "Failed to remove storage provider")
if wants_html(request):
return HTMLResponse("")
return {"message": "Storage provider removed"}
@app.post("/storage/{storage_id}/test")
async def test_storage(storage_id: int, request: Request):
"""Test storage provider connectivity."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
if wants_html(request):
return HTMLResponse('Not authenticated', status_code=401)
raise HTTPException(401, "Not authenticated")
storage = await database.get_storage_by_id(storage_id)
if not storage:
if wants_html(request):
return HTMLResponse('Storage not found', status_code=404)
raise HTTPException(404, "Storage provider not found")
if storage["actor_id"] != ctx.actor_id:
if wants_html(request):
return HTMLResponse('Not authorized', status_code=403)
raise HTTPException(403, "Not authorized")
config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"])
provider = storage_providers.create_provider(storage["provider_type"], {
**config,
"capacity_gb": storage["capacity_gb"]
})
if not provider:
if wants_html(request):
return HTMLResponse('Failed to create provider')
raise HTTPException(500, "Failed to create provider")
success, message = await provider.test_connection()
if wants_html(request):
if success:
return HTMLResponse(f'{message}')
return HTMLResponse(f'{message}')
return {"success": success, "message": message}
@app.get("/storage/type/{provider_type}")
async def storage_type_page(provider_type: str, request: Request):
"""Page for managing storage configs of a specific type."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return RedirectResponse(url="/auth", status_code=302)
if provider_type not in STORAGE_PROVIDERS_INFO:
raise HTTPException(404, "Invalid provider type")
storages = await database.get_user_storage_by_type(ctx.actor_id, provider_type)
# Add usage stats and mask config
for storage in storages:
usage = await database.get_storage_usage(storage["id"])
storage["used_bytes"] = usage["used_bytes"]
storage["pin_count"] = usage["pin_count"]
# Mask sensitive config keys
if storage.get("config"):
config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"])
masked = {}
for k, v in config.items():
if "key" in k.lower() or "token" in k.lower() or "secret" in k.lower():
masked[k] = v[:4] + "..." + v[-4:] if len(str(v)) > 8 else "****"
else:
masked[k] = v
storage["config_display"] = masked
info = STORAGE_PROVIDERS_INFO[provider_type]
return await ui_storage_type_page(ctx.username, provider_type, info, storages, request)
async def ui_storage_page(username: str, storages: list, request: Request) -> HTMLResponse:
"""Render the main storage management page."""
# Count by provider type
type_counts = {}
for s in storages:
ptype = s["provider_type"]
type_counts[ptype] = type_counts.get(ptype, 0) + 1
# Build provider type cards
cards = ""
for ptype, info in STORAGE_PROVIDERS_INFO.items():
count = type_counts.get(ptype, 0)
count_badge = f'{count}' if count > 0 else ""
cards += f'''
'''
# Total stats
total_capacity = sum(s["capacity_gb"] for s in storages)
total_used = sum(s["used_bytes"] for s in storages)
total_pins = sum(s["pin_count"] for s in storages)
html = f'''
Storage - Art DAG L1