#!/usr/bin/env python3
"""
Art DAG L1 Server
Manages rendering runs and provides access to the cache.
- POST /runs - start a run (recipe + inputs)
- GET /runs/{run_id} - get run status/result
- GET /cache/{content_hash} - get cached content
"""
import asyncio
import base64
import hashlib
import json
import logging
import os
import time
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(levelname)s %(name)s: %(message)s'
)
logger = logging.getLogger(__name__)
from fastapi import FastAPI, HTTPException, UploadFile, File, Depends, Form, Request
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
import redis
import requests as http_requests
from urllib.parse import urlparse
import yaml
from celery_app import app as celery_app
from tasks import render_effect, execute_dag, build_effect_dag
from contextlib import asynccontextmanager
from cache_manager import L1CacheManager, get_cache_manager
import database
# L1 public URL for redirects
L1_PUBLIC_URL = os.environ.get("L1_PUBLIC_URL", "http://localhost:8100")
def compute_run_id(input_hashes: list[str], recipe: str, recipe_hash: Optional[str] = None) -> str:
"""
Compute a deterministic run_id from inputs and recipe.
The run_id is a SHA3-256 hash of:
- Sorted input content hashes
- Recipe identifier (recipe_hash if provided, else "effect:{recipe}")
This makes runs content-addressable: same inputs + recipe = same run_id.
"""
data = {
"inputs": sorted(input_hashes),
"recipe": recipe_hash or f"effect:{recipe}",
"version": "1", # For future schema changes
}
json_str = json.dumps(data, sort_keys=True, separators=(",", ":"))
return hashlib.sha3_256(json_str.encode()).hexdigest()
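# Illustrative property (hashes are hypothetical): inputs are sorted before hashing,
# so argument order never changes the run_id and reruns deduplicate by key alone.
#   compute_run_id(["bbb...", "aaa..."], "blur") == compute_run_id(["aaa...", "bbb..."], "blur")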
# Default L2 for login redirect when not logged in (user can login to any L2)
DEFAULT_L2_SERVER = os.environ.get("DEFAULT_L2_SERVER", "http://localhost:8200")
# IPFS gateway URL for public access to IPFS content
IPFS_GATEWAY_URL = os.environ.get("IPFS_GATEWAY_URL", "")
# Cache directory (use /data/cache in Docker, ~/.artdag/cache locally)
CACHE_DIR = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache")))
CACHE_DIR.mkdir(parents=True, exist_ok=True)
# Redis for persistent run storage and shared cache index (multi-worker support)
REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/5')
parsed = urlparse(REDIS_URL)
redis_client = redis.Redis(
host=parsed.hostname or 'localhost',
port=parsed.port or 6379,
db=int(parsed.path.lstrip('/') or 0),
socket_timeout=5,
socket_connect_timeout=5
)
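# Illustrative URL mapping (hypothetical host): REDIS_URL="redis://redis:6379/5"
# parses to host="redis", port=6379, db=5; an empty path falls back to db 0.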
RUNS_KEY_PREFIX = "artdag:run:"
RECIPES_KEY_PREFIX = "artdag:recipe:"
# Initialize L1 cache manager with Redis for shared state between workers
cache_manager = L1CacheManager(cache_dir=CACHE_DIR, redis_client=redis_client)
def save_run(run: "RunStatus"):
"""Save run to Redis."""
redis_client.set(f"{RUNS_KEY_PREFIX}{run.run_id}", run.model_dump_json())
def load_run(run_id: str) -> Optional["RunStatus"]:
"""Load run from Redis."""
data = redis_client.get(f"{RUNS_KEY_PREFIX}{run_id}")
if data:
return RunStatus.model_validate_json(data)
return None
def list_all_runs() -> list["RunStatus"]:
"""List all runs from Redis."""
runs = []
for key in redis_client.scan_iter(f"{RUNS_KEY_PREFIX}*"):
data = redis_client.get(key)
if data:
runs.append(RunStatus.model_validate_json(data))
return sorted(runs, key=lambda r: r.created_at, reverse=True)
def find_runs_using_content(content_hash: str) -> list[tuple["RunStatus", str]]:
"""Find all runs that use a content_hash as input or output.
Returns list of (run, role) tuples where role is 'input' or 'output'.
"""
results = []
for run in list_all_runs():
if run.inputs and content_hash in run.inputs:
results.append((run, "input"))
if run.output_hash == content_hash:
results.append((run, "output"))
return results
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Initialize and cleanup resources."""
# Startup: initialize database
await database.init_db()
yield
# Shutdown: close database
await database.close_db()
app = FastAPI(
title="Art DAG L1 Server",
description="Distributed rendering server for Art DAG",
version="0.1.0",
lifespan=lifespan
)
@app.exception_handler(404)
async def not_found_handler(request: Request, exc):
"""Custom 404 page."""
from fastapi.responses import JSONResponse
accept = request.headers.get("accept", "")
if "text/html" in accept:
        content = f'<h1>Not Found</h1><p>{request.url.path}</p>'  # minimal body; original template elided
        return HTMLResponse(render_page("Not Found", content, None), status_code=404)
    return JSONResponse(status_code=404, content={"detail": "Not found"})
@app.get("/", response_class=HTMLResponse)
async def root(request: Request):
"""Home page."""
ctx = await get_user_context_from_cookie(request)
actor_id = ctx.actor_id if ctx else None
return render_home_html(actor_id)
@app.post("/runs", response_model=RunStatus)
async def create_run(request: RunRequest, ctx: UserContext = Depends(get_required_user_context)):
"""Start a new rendering run. Checks cache before executing."""
# Compute content-addressable run_id
run_id = compute_run_id(request.inputs, request.recipe)
# Generate output name if not provided
output_name = request.output_name or f"{request.recipe}-{run_id[:8]}"
# Use actor_id from user context
actor_id = ctx.actor_id
# Check L1 cache first
cached_run = await database.get_run_cache(run_id)
if cached_run:
output_hash = cached_run["output_hash"]
# Verify the output file still exists in cache
if cache_manager.has_content(output_hash):
logger.info(f"create_run: Cache hit for run_id={run_id[:16]}... output={output_hash[:16]}...")
return RunStatus(
run_id=run_id,
status="completed",
recipe=request.recipe,
inputs=request.inputs,
output_name=output_name,
created_at=cached_run.get("created_at", datetime.now(timezone.utc).isoformat()),
completed_at=cached_run.get("created_at", datetime.now(timezone.utc).isoformat()),
output_hash=output_hash,
username=actor_id,
provenance_cid=cached_run.get("provenance_cid"),
)
else:
logger.info(f"create_run: Cache entry exists but output missing, will re-run")
# Check L2 if not in L1
l2_server = ctx.l2_server
try:
l2_resp = http_requests.get(
f"{l2_server}/assets/by-run-id/{run_id}",
timeout=10
)
if l2_resp.status_code == 200:
l2_data = l2_resp.json()
output_hash = l2_data.get("output_hash")
ipfs_cid = l2_data.get("ipfs_cid")
if output_hash and ipfs_cid:
logger.info(f"create_run: Found on L2, pulling from IPFS: {ipfs_cid}")
# Pull from IPFS to L1 cache
import ipfs_client
legacy_dir = CACHE_DIR / "legacy"
legacy_dir.mkdir(parents=True, exist_ok=True)
recovery_path = legacy_dir / output_hash
if ipfs_client.get_file(ipfs_cid, str(recovery_path)):
# File retrieved - put() updates indexes, but file is already in legacy location
# Just update the content and IPFS indexes manually
cache_manager._set_content_index(output_hash, output_hash)
cache_manager._set_ipfs_index(output_hash, ipfs_cid)
# Save to run cache
await database.save_run_cache(
run_id=run_id,
output_hash=output_hash,
recipe=request.recipe,
inputs=request.inputs,
ipfs_cid=ipfs_cid,
provenance_cid=l2_data.get("provenance_cid"),
actor_id=actor_id,
)
logger.info(f"create_run: Recovered from L2/IPFS: {output_hash[:16]}...")
return RunStatus(
run_id=run_id,
status="completed",
recipe=request.recipe,
inputs=request.inputs,
output_name=output_name,
created_at=datetime.now(timezone.utc).isoformat(),
completed_at=datetime.now(timezone.utc).isoformat(),
output_hash=output_hash,
username=actor_id,
provenance_cid=l2_data.get("provenance_cid"),
)
except Exception as e:
logger.warning(f"create_run: L2 lookup failed (will run Celery): {e}")
# Not cached anywhere - create run record and submit to Celery
run = RunStatus(
run_id=run_id,
status="pending",
recipe=request.recipe,
inputs=request.inputs,
output_name=output_name,
created_at=datetime.now(timezone.utc).isoformat(),
username=actor_id
)
# Submit to Celery
if request.use_dag or request.recipe == "dag":
# DAG mode - use artdag engine
if request.dag_json:
# Custom DAG provided
dag_json = request.dag_json
else:
# Build simple effect DAG from recipe and inputs
dag = build_effect_dag(request.inputs, request.recipe)
dag_json = dag.to_json()
task = execute_dag.delay(dag_json, run.run_id)
else:
# Legacy mode - single effect
if len(request.inputs) != 1:
raise HTTPException(400, "Legacy mode only supports single-input recipes. Use use_dag=true for multi-input.")
input_hash = request.inputs[0]
task = render_effect.delay(input_hash, request.recipe, output_name)
run.celery_task_id = task.id
run.status = "running"
await asyncio.to_thread(save_run, run)
return run
def _check_celery_task_sync(task_id: str) -> tuple[bool, bool, Optional[dict], Optional[str]]:
"""Check Celery task status synchronously. Returns (is_ready, is_successful, result, error)."""
task = celery_app.AsyncResult(task_id)
if not task.ready():
return (False, False, None, None)
if task.successful():
return (True, True, task.result, None)
else:
return (True, False, None, str(task.result))
@app.get("/runs/{run_id}", response_model=RunStatus)
async def get_run(run_id: str):
"""Get status of a run."""
start = time.time()
logger.info(f"get_run: Starting for {run_id}")
t0 = time.time()
run = await asyncio.to_thread(load_run, run_id)
logger.info(f"get_run: load_run took {time.time()-t0:.3f}s, status={run.status if run else 'None'}")
if not run:
raise HTTPException(404, f"Run {run_id} not found")
# Check Celery task status if running
if run.status == "running" and run.celery_task_id:
t0 = time.time()
is_ready, is_successful, result, error = await asyncio.to_thread(
_check_celery_task_sync, run.celery_task_id
)
logger.info(f"get_run: Celery check took {time.time()-t0:.3f}s, ready={is_ready}")
if is_ready:
if is_successful:
run.status = "completed"
run.completed_at = datetime.now(timezone.utc).isoformat()
# Handle both legacy (render_effect) and new (execute_dag) result formats
if "output_hash" in result:
# New DAG result format
run.output_hash = result.get("output_hash")
run.provenance_cid = result.get("provenance_cid")
output_path = Path(result.get("output_path", "")) if result.get("output_path") else None
else:
# Legacy render_effect format
run.output_hash = result.get("output", {}).get("content_hash")
run.provenance_cid = result.get("provenance_cid")
output_path = Path(result.get("output", {}).get("local_path", ""))
# Extract effects info from provenance (legacy only)
effects = result.get("effects", [])
if effects:
run.effects_commit = effects[0].get("repo_commit")
run.effect_url = effects[0].get("repo_url")
# Extract infrastructure info (legacy only)
run.infrastructure = result.get("infrastructure")
# Cache the output (legacy mode - DAG already caches via cache_manager)
if output_path and output_path.exists() and "output_hash" not in result:
t0 = time.time()
await cache_file(output_path, node_type="effect_output")
logger.info(f"get_run: cache_file took {time.time()-t0:.3f}s")
# Record activity for deletion tracking (legacy mode)
if run.output_hash and run.inputs:
await asyncio.to_thread(
cache_manager.record_simple_activity,
input_hashes=run.inputs,
output_hash=run.output_hash,
run_id=run.run_id,
)
# Save to run cache for content-addressable lookup
if run.output_hash:
ipfs_cid = cache_manager._get_ipfs_cid_from_index(run.output_hash)
await database.save_run_cache(
run_id=run.run_id,
output_hash=run.output_hash,
recipe=run.recipe,
inputs=run.inputs,
ipfs_cid=ipfs_cid,
provenance_cid=run.provenance_cid,
actor_id=run.username,
)
logger.info(f"get_run: Saved run cache for {run.run_id[:16]}...")
else:
run.status = "failed"
run.error = error
# Save updated status
t0 = time.time()
await asyncio.to_thread(save_run, run)
logger.info(f"get_run: save_run took {time.time()-t0:.3f}s")
logger.info(f"get_run: Total time {time.time()-start:.3f}s")
return run
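# Client-side sketch (not used by the server): poll GET /runs/{run_id} until the run
# settles. The endpoint shape matches get_run above; base_url, interval, and timeout
# values are illustrative assumptions.
def _example_poll_run(base_url: str, run_id: str, interval: float = 2.0, timeout: float = 300.0) -> dict:
    deadline = time.time() + timeout
    while time.time() < deadline:
        # get_run re-checks the Celery task and persists status transitions
        status = http_requests.get(f"{base_url}/runs/{run_id}", timeout=10).json()
        if status.get("status") in ("completed", "failed"):
            return status  # settled; output is then available at /cache/{output_hash}
        time.sleep(interval)
    raise TimeoutError(f"run {run_id} did not settle within {timeout}s")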
@app.delete("/runs/{run_id}")
async def discard_run(run_id: str, ctx: UserContext = Depends(get_required_user_context)):
"""
Discard (delete) a run and its outputs.
Enforces deletion rules:
- Cannot discard if output is published to L2 (pinned)
- Deletes outputs and intermediate cache entries
- Preserves inputs (cache items and recipes are NOT deleted)
"""
run = await asyncio.to_thread(load_run, run_id)
if not run:
raise HTTPException(404, f"Run {run_id} not found")
# Check ownership
if run.username not in (ctx.username, ctx.actor_id):
raise HTTPException(403, "Access denied")
# Failed runs can always be deleted (no output to protect)
if run.status != "failed":
# Only check if output is pinned - inputs are preserved, not deleted
if run.output_hash:
meta = await database.load_item_metadata(run.output_hash, ctx.actor_id)
if meta.get("pinned"):
pin_reason = meta.get("pin_reason", "published")
raise HTTPException(400, f"Cannot discard run: output {run.output_hash[:16]}... is pinned ({pin_reason})")
# Check if activity exists for this run
activity = await asyncio.to_thread(cache_manager.get_activity, run_id)
if activity:
# Discard the activity - only delete outputs, preserve inputs
success, msg = await asyncio.to_thread(cache_manager.discard_activity_outputs_only, run_id)
if not success:
raise HTTPException(400, f"Cannot discard run: {msg}")
# Remove from Redis
await asyncio.to_thread(redis_client.delete, f"{RUNS_KEY_PREFIX}{run_id}")
return {"discarded": True, "run_id": run_id}
@app.delete("/ui/runs/{run_id}/discard", response_class=HTMLResponse)
async def ui_discard_run(run_id: str, request: Request):
"""HTMX handler: discard a run. Only deletes outputs, preserves inputs."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
        return 'Login required'
run = await asyncio.to_thread(load_run, run_id)
if not run:
        return 'Run not found'
# Check ownership
if run.username not in (ctx.username, ctx.actor_id):
        return 'Access denied'
# Failed runs can always be deleted
if run.status != "failed":
# Only check if output is pinned - inputs are preserved, not deleted
if run.output_hash:
meta = await database.load_item_metadata(run.output_hash, ctx.actor_id)
if meta.get("pinned"):
pin_reason = meta.get("pin_reason", "published")
                return f'Cannot discard: output is pinned ({pin_reason})'
# Check if activity exists for this run
activity = await asyncio.to_thread(cache_manager.get_activity, run_id)
if activity:
# Discard the activity - only delete outputs, preserve inputs
success, msg = await asyncio.to_thread(cache_manager.discard_activity_outputs_only, run_id)
if not success:
            return f'Cannot discard: {msg}'
# Remove from Redis
await asyncio.to_thread(redis_client.delete, f"{RUNS_KEY_PREFIX}{run_id}")
return '''
'''
@app.get("/run/{run_id}")
async def run_detail(run_id: str, request: Request):
"""Run detail. HTML for browsers, JSON for APIs."""
run = await asyncio.to_thread(load_run, run_id)
if not run:
if wants_html(request):
            content = f'Run not found: {run_id}'
return HTMLResponse(render_page("Not Found", content, None, active_tab="runs"), status_code=404)
raise HTTPException(404, f"Run {run_id} not found")
# Check Celery task status if running
if run.status == "running" and run.celery_task_id:
is_ready, is_successful, result, error = await asyncio.to_thread(
_check_celery_task_sync, run.celery_task_id
)
if is_ready:
if is_successful:
run.status = "completed"
run.completed_at = datetime.now(timezone.utc).isoformat()
run.output_hash = result.get("output", {}).get("content_hash")
effects = result.get("effects", [])
if effects:
run.effects_commit = effects[0].get("repo_commit")
run.effect_url = effects[0].get("repo_url")
run.infrastructure = result.get("infrastructure")
output_path = Path(result.get("output", {}).get("local_path", ""))
if output_path.exists():
await cache_file(output_path)
# Save to run cache for content-addressable lookup
if run.output_hash:
ipfs_cid = cache_manager._get_ipfs_cid_from_index(run.output_hash)
await database.save_run_cache(
run_id=run.run_id,
output_hash=run.output_hash,
recipe=run.recipe,
inputs=run.inputs,
ipfs_cid=ipfs_cid,
provenance_cid=run.provenance_cid,
actor_id=run.username,
)
else:
run.status = "failed"
run.error = error
await asyncio.to_thread(save_run, run)
if wants_html(request):
ctx = await get_user_context_from_cookie(request)
if not ctx:
            content = ''  # login prompt (template elided)
return HTMLResponse(render_page("Login Required", content, None, active_tab="runs"), status_code=401)
# Check user owns this run
if run.username not in (ctx.username, ctx.actor_id):
            content = 'Access denied.'
return HTMLResponse(render_page("Access Denied", content, ctx.actor_id, active_tab="runs"), status_code=403)
# Build effect URL
if run.effect_url:
effect_url = run.effect_url
elif run.effects_commit and run.effects_commit != "unknown":
effect_url = f"https://git.rose-ash.com/art-dag/effects/src/commit/{run.effects_commit}/{run.recipe}"
else:
effect_url = f"https://git.rose-ash.com/art-dag/effects/src/branch/main/{run.recipe}"
# Status badge colors
status_colors = {
"completed": "bg-green-600 text-white",
"running": "bg-yellow-600 text-white",
"failed": "bg-red-600 text-white",
"pending": "bg-gray-600 text-white"
}
status_badge = status_colors.get(run.status, "bg-gray-600 text-white")
# Build media HTML for input/output
media_html = ""
has_input = run.inputs and cache_manager.has_content(run.inputs[0])
has_output = run.status == "completed" and run.output_hash and cache_manager.has_content(run.output_hash)
        if has_input or has_output:
            media_html = '...'  # input/output preview markup elided
        content = f'<span class="{status_badge}">{run.status}</span> effect: {effect_url} {media_html}'  # detail template reconstructed minimally
return HTMLResponse(render_page(f"Run: {run.recipe}", content, ctx.actor_id, active_tab="runs"))
# JSON response
return run.model_dump()
@app.get("/runs")
async def list_runs(request: Request, page: int = 1, limit: int = 20):
"""List runs. HTML for browsers (with infinite scroll), JSON for APIs (with pagination)."""
ctx = await get_user_context_from_cookie(request)
all_runs = await asyncio.to_thread(list_all_runs)
total = len(all_runs)
# Filter by user if logged in for HTML
if wants_html(request) and ctx:
all_runs = [r for r in all_runs if r.username in (ctx.username, ctx.actor_id)]
total = len(all_runs)
# Pagination
start = (page - 1) * limit
end = start + limit
runs_page = all_runs[start:end]
has_more = end < total
    if wants_html(request):
        if not ctx:
            content = ''  # login prompt (template elided)
            return HTMLResponse(render_page("Runs", content, None, active_tab="runs"))
        ...  # runs listing markup and its HTMLResponse return elided
    # JSON response (reconstructed to mirror the listing endpoints below)
    return {
        "runs": [r.model_dump() for r in runs_page],
        "pagination": {"page": page, "limit": limit, "total": total, "has_more": has_more},
    }


@app.get("/recipes")  # route assumed; handler header reconstructed
async def recipes_page(request: Request):
    """Recipes listing. HTML for browsers."""
    ctx = await get_user_context_from_cookie(request)
    if not ctx:
        return HTMLResponse(render_page(
            "Recipes",
            '',  # login prompt (template elided)
            None,
            active_tab="recipes"
        ))
    all_recipes = list_all_recipes()
    # Filter to user's recipes
    user_recipes = [c for c in all_recipes if c.uploader in (ctx.username, ctx.actor_id)]
total = len(user_recipes)
if not user_recipes:
content = '''
Recipes (0)
No recipes yet. Upload a recipe YAML file to get started.
'''
return HTMLResponse(render_page("Recipes", content, ctx.actor_id, active_tab="recipes"))
html_parts = []
for recipe in user_recipes:
var_count = len(recipe.variable_inputs)
fixed_count = len(recipe.fixed_inputs)
input_info = []
if var_count:
input_info.append(f"{var_count} variable")
if fixed_count:
input_info.append(f"{fixed_count} fixed")
inputs_str = ", ".join(input_info) if input_info else "no inputs"
        html_parts.append('...')  # recipe card markup elided
    content = "".join(html_parts)
    return HTMLResponse(render_page("Recipes", content, ctx.actor_id, active_tab="recipes"))
@app.post("/ui/recipes/{recipe_id}/run", response_class=HTMLResponse)
async def ui_run_recipe(recipe_id: str, request: Request):
"""HTMX handler: run a recipe with form inputs."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
        return 'Login required'
recipe = load_recipe(recipe_id)
if not recipe:
        return 'Recipe not found'
# Parse form data
form_data = await request.form()
inputs = {}
for var_input in recipe.variable_inputs:
value = form_data.get(var_input.node_id, "").strip()
if var_input.required and not value:
            return f'Missing required input: {var_input.name}'
if value:
inputs[var_input.node_id] = value
# Load recipe YAML
recipe_path = cache_manager.get_by_content_hash(recipe_id)
if not recipe_path:
        return 'Recipe YAML not found in cache'
try:
with open(recipe_path) as f:
yaml_config = yaml.safe_load(f)
# Build DAG from recipe
dag = build_dag_from_recipe(yaml_config, inputs, recipe)
# Create run
run_id = str(uuid.uuid4())
actor_id = ctx.actor_id
# Collect all input hashes
all_inputs = list(inputs.values())
for fixed in recipe.fixed_inputs:
if fixed.content_hash:
all_inputs.append(fixed.content_hash)
run = RunStatus(
run_id=run_id,
status="pending",
recipe=f"recipe:{recipe.name}",
inputs=all_inputs,
output_name=f"{recipe.name}-{run_id[:8]}",
created_at=datetime.now(timezone.utc).isoformat(),
username=actor_id
)
# Submit to Celery
dag_json = dag.to_json()
task = execute_dag.delay(dag_json, run.run_id)
run.celery_task_id = task.id
run.status = "running"
        await asyncio.to_thread(save_run, run)
        return f'''...'''  # success fragment (template elided)
    except Exception as e:  # reconstructed: closes the try opened above
        return f'Run failed: {e}'


@app.get("/ui/recipes", response_class=HTMLResponse)  # route assumed; handler header reconstructed
async def ui_recipes(request: Request):
    """HTMX partial: recipes list."""
    ctx = await get_user_context_from_cookie(request)
    if not ctx:
        return 'Login required'
    all_recipes = list_all_recipes()
    # Filter to user's recipes
    user_recipes = [c for c in all_recipes if c.uploader in (ctx.username, ctx.actor_id)]
    if not user_recipes:
        return 'No recipes yet. Upload a recipe YAML file to get started.'
    html_parts = []
for recipe in user_recipes:
var_count = len(recipe.variable_inputs)
fixed_count = len(recipe.fixed_inputs)
input_info = []
if var_count:
input_info.append(f"{var_count} variable")
if fixed_count:
input_info.append(f"{fixed_count} fixed")
inputs_str = ", ".join(input_info) if input_info else "no inputs"
        html_parts.append('...')  # recipe card markup elided
    return "".join(html_parts)
@app.get("/cache/{content_hash}")
async def get_cached(content_hash: str, request: Request):
"""Get cached content by hash. Content negotiation: HTML for browsers, JSON for APIs, file for downloads."""
start = time.time()
accept = request.headers.get("accept", "")
logger.info(f"get_cached: {content_hash[:16]}... Accept={accept[:50]}")
ctx = await get_user_context_from_cookie(request)
cache_path = get_cache_path(content_hash)
if not cache_path:
logger.info(f"get_cached: Not found, took {time.time()-start:.3f}s")
if wants_html(request):
content = f'
Content not found: {content_hash}
'
return HTMLResponse(render_page("Not Found", content, ctx.actor_id if ctx else None, active_tab="media"), status_code=404)
raise HTTPException(404, f"Content {content_hash} not in cache")
# JSON response only if explicitly requested
if "application/json" in accept and "text/html" not in accept:
t0 = time.time()
meta = await database.load_item_metadata(content_hash, ctx.actor_id if ctx else None)
logger.debug(f"get_cached: load_item_metadata took {time.time()-t0:.3f}s")
t0 = time.time()
cache_item = await database.get_cache_item(content_hash)
logger.debug(f"get_cached: get_cache_item took {time.time()-t0:.3f}s")
ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
file_size = cache_path.stat().st_size
# Use stored type from metadata, fall back to auto-detection
stored_type = meta.get("type") if meta else None
if stored_type == "recipe":
media_type = "recipe"
else:
media_type = detect_media_type(cache_path)
logger.info(f"get_cached: JSON response, ipfs_cid={ipfs_cid[:16] if ipfs_cid else 'None'}..., took {time.time()-start:.3f}s")
return {
"content_hash": content_hash,
"size": file_size,
"media_type": media_type,
"ipfs_cid": ipfs_cid,
"meta": meta
}
    # HTML response for browsers (default for all non-JSON requests).
    # Raw data is only served from the /cache/{hash}/raw endpoint.
    if not ctx:
        content = ''  # login prompt (template elided)
        return HTMLResponse(render_page("Login Required", content, None, active_tab="media"), status_code=401)
    meta = await database.load_item_metadata(content_hash, ctx.actor_id)
    l2_shares = meta.get("l2_shares", [])
    origin_type = (meta.get("origin") or {}).get("type")
    pinned = meta.get("pinned", False)
    pin_reason = meta.get("pin_reason")
    if l2_shares:
        shares_html = '...'  # per-share link markup elided
        publish_html = f'''
        Published to L2 ({len(l2_shares)} share{"s" if len(l2_shares) != 1 else ""})
        {shares_html}
        '''
    else:
        # Show publish form only if origin is set
        if origin_type:
            publish_html = '...'  # publish form markup elided
        else:
            publish_html = 'Set an origin (self or external URL) before publishing.'
    content = f'''
    Metadata
    Publish to L2 (ActivityPub)
    {publish_html}
    Status
    Pinned: {'Yes' if pinned else 'No'} {f'({pin_reason})' if pinned and pin_reason else ''}
    Pinned items cannot be discarded. Items are pinned when published or used as inputs to published content.
    {'Cannot discard pinned items.' if pinned else '...'}
    '''  # detail template reconstructed from surviving fragments; most markup elided
    return HTMLResponse(render_page("Media", content, ctx.actor_id, active_tab="media"))
@app.patch("/ui/cache/{content_hash}/meta", response_class=HTMLResponse)
async def ui_update_cache_meta(content_hash: str, request: Request):
"""HTMX handler: update cache metadata from form."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
        return 'Login required'
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
        return 'Access denied'
# Parse form data
form = await request.form()
origin_type = form.get("origin_type", "")
origin_url = form.get("origin_url", "").strip()
origin_note = form.get("origin_note", "").strip()
description = form.get("description", "").strip()
tags_str = form.get("tags", "").strip()
# Build origin
source_type = None
if origin_type == "self":
source_type = "self"
elif origin_type == "external":
if not origin_url:
            return 'External origin requires a URL'
source_type = "external"
# Parse tags
tags = [t.strip() for t in tags_str.split(",") if t.strip()] if tags_str else []
# Save to database
await database.update_item_metadata(
content_hash=content_hash,
actor_id=ctx.actor_id,
item_type="media",
description=description if description else None,
source_type=source_type,
source_url=origin_url if origin_url else None,
source_note=origin_note if origin_note else None,
tags=tags
)
    return 'Metadata saved!'
@app.post("/ui/cache/{content_hash}/publish", response_class=HTMLResponse)
async def ui_publish_cache(content_hash: str, request: Request):
"""HTMX handler: publish cache item to L2."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
        return 'Login required'
token = request.cookies.get("auth_token")
if not token:
        return 'Auth token required'
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
        return 'Access denied'
# Parse form
form = await request.form()
asset_name = form.get("asset_name", "").strip()
asset_type = form.get("asset_type", "image")
if not asset_name:
        return 'Asset name required'
# Load metadata from database
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
origin = meta.get("origin")
if not origin or "type" not in origin:
        return 'Set an origin (self or external URL) before publishing'
    ...  # remainder of the publish flow elided; publish_cache_to_l2 below performs the same steps
@app.patch("/ui/cache/{content_hash}/republish", response_class=HTMLResponse)
async def ui_republish_cache(content_hash: str, request: Request):
"""HTMX handler: re-publish (update) cache item on L2."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
        return 'Login required'
token = request.cookies.get("auth_token")
if not token:
        return 'Auth token required'
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
        return 'Access denied'
# Load metadata
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
l2_shares = meta.get("l2_shares", [])
# Find share for current L2 server (user's L2)
l2_server = ctx.l2_server
current_share = None
share_index = -1
for i, share in enumerate(l2_shares):
if share.get("l2_server") == l2_server:
current_share = share
share_index = i
break
if not current_share:
        return 'Item not published to this L2 yet'
asset_name = current_share.get("asset_name")
if not asset_name:
        return 'No asset name found in publish info'
    ...  # remainder of the republish flow elided; republish_cache_to_l2 below performs the same steps


@app.get("/media")  # route assumed; handler header reconstructed (the original wants_html branching was elided)
async def media_page(request: Request, page: int = 1, limit: int = 20,
                     folder: str = None, collection: str = None, tag: str = None):
    """Media listing. HTML for browsers (infinite scroll), JSON for APIs."""
    ctx = await get_user_context_from_cookie(request)
    if not ctx:
        content = ''  # login prompt (template elided)
        return HTMLResponse(render_page("Media", content, None, active_tab="media"))
# Get hashes owned by/associated with this user
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
# Get cache items that belong to the user (from cache_manager)
cache_items = []
seen_hashes = set() # Deduplicate by content_hash
for cached_file in cache_manager.list_all():
content_hash = cached_file.content_hash
if content_hash not in user_hashes:
continue
# Skip duplicates (same content from multiple runs)
if content_hash in seen_hashes:
continue
seen_hashes.add(content_hash)
# Skip recipes - they have their own section
if cached_file.node_type == "recipe":
continue
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
# Apply folder filter
if folder:
item_folder = meta.get("folder", "/")
if folder != "/" and not item_folder.startswith(folder):
continue
if folder == "/" and item_folder != "/":
continue
# Apply collection filter
if collection:
if collection not in meta.get("collections", []):
continue
# Apply tag filter
if tag:
if tag not in meta.get("tags", []):
continue
cache_items.append({
"hash": content_hash,
"size": cached_file.size_bytes,
"mtime": cached_file.created_at,
"meta": meta
})
# Sort by modification time (newest first)
cache_items.sort(key=lambda x: x["mtime"], reverse=True)
total = len(cache_items)
# Pagination
start = (page - 1) * limit
end = start + limit
items_page = cache_items[start:end]
has_more = end < total
if not items_page:
if page == 1:
filter_msg = ""
if folder:
filter_msg = f" in folder {folder}"
elif collection:
filter_msg = f" in collection '{collection}'"
elif tag:
filter_msg = f" with tag '{tag}'"
            content = f'No media{filter_msg}. Upload files or run effects to see them here.'
else:
return HTMLResponse("") # Empty for infinite scroll
else:
html_parts = []
for item in items_page:
content_hash = item["hash"]
cache_path = get_cache_path(content_hash)
media_type = detect_media_type(cache_path) if cache_path else "unknown"
# Format size
size = item["size"]
if size > 1024*1024:
size_str = f"{size/(1024*1024):.1f} MB"
elif size > 1024:
size_str = f"{size/1024:.1f} KB"
else:
size_str = f"{size} bytes"
            html_parts.append('...')  # media card markup elided
# For infinite scroll, just return cards if not first page
if page > 1:
if has_more:
query_params = f"page={page + 1}"
if folder:
query_params += f"&folder={folder}"
if collection:
query_params += f"&collection={collection}"
if tag:
query_params += f"&tag={tag}"
html_parts.append(f'''
Loading more...
''')
return HTMLResponse('\n'.join(html_parts))
# First page - full content
infinite_scroll_trigger = ""
if has_more:
query_params = "page=2"
if folder:
query_params += f"&folder={folder}"
if collection:
query_params += f"&collection={collection}"
if tag:
query_params += f"&tag={tag}"
infinite_scroll_trigger = f'''
Loading more...
'''
content = f'''
Media ({total} items)
{''.join(html_parts)}
{infinite_scroll_trigger}
'''
return HTMLResponse(render_page("Media", content, ctx.actor_id, active_tab="media"))
# JSON response for APIs - list all hashes with optional pagination
all_hashes = [cf.content_hash for cf in cache_manager.list_all()]
total = len(all_hashes)
start = (page - 1) * limit
end = start + limit
hashes_page = all_hashes[start:end]
has_more = end < total
return {
"hashes": hashes_page,
"pagination": {
"page": page,
"limit": limit,
"total": total,
"has_more": has_more
}
}
@app.delete("/cache/{content_hash}")
async def discard_cache(content_hash: str, ctx: UserContext = Depends(get_required_user_context)):
"""
Discard (delete) a cached item.
Enforces deletion rules:
- Cannot delete items published to L2 (shared)
- Cannot delete inputs/outputs of activities (runs)
- Cannot delete pinned items
"""
# Check if content exists
if not cache_manager.has_content(content_hash):
raise HTTPException(404, "Content not found")
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
raise HTTPException(403, "Access denied")
# Check if pinned
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
if meta.get("pinned"):
pin_reason = meta.get("pin_reason", "unknown")
raise HTTPException(400, f"Cannot discard pinned item (reason: {pin_reason})")
# Check if used by any run (Redis runs, not just activity store)
runs_using = await asyncio.to_thread(find_runs_using_content, content_hash)
if runs_using:
run, role = runs_using[0]
raise HTTPException(400, f"Cannot discard: item is {role} of run {run.run_id}")
# Check deletion rules via cache_manager (L2 shared status, activity store)
can_delete, reason = await asyncio.to_thread(cache_manager.can_delete, content_hash)
if not can_delete:
raise HTTPException(400, f"Cannot discard: {reason}")
# Delete via cache_manager
success, msg = await asyncio.to_thread(cache_manager.delete_by_content_hash, content_hash)
if not success:
# Fallback to legacy deletion
cache_path = get_cache_path(content_hash)
if cache_path and cache_path.exists():
cache_path.unlink()
# Clean up legacy metadata files
meta_path = CACHE_DIR / f"{content_hash}.meta.json"
if meta_path.exists():
meta_path.unlink()
mp4_path = CACHE_DIR / f"{content_hash}.mp4"
if mp4_path.exists():
mp4_path.unlink()
return {"discarded": True, "content_hash": content_hash}
@app.delete("/ui/cache/{content_hash}/discard", response_class=HTMLResponse)
async def ui_discard_cache(content_hash: str, request: Request):
"""HTMX handler: discard a cached item."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
        return 'Login required'
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
        return 'Access denied'
# Check if content exists
has_content = await asyncio.to_thread(cache_manager.has_content, content_hash)
if not has_content:
        return 'Content not found'
# Check if pinned
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
if meta.get("pinned"):
pin_reason = meta.get("pin_reason", "unknown")
        return f'Cannot discard: item is pinned ({pin_reason})'
# Check if used by any run (Redis runs, not just activity store)
runs_using = await asyncio.to_thread(find_runs_using_content, content_hash)
if runs_using:
run, role = runs_using[0]
        return f'Cannot discard: item is {role} of run {run.run_id}'
# Check deletion rules via cache_manager (L2 shared status, activity store)
can_delete, reason = await asyncio.to_thread(cache_manager.can_delete, content_hash)
if not can_delete:
        return f'Cannot discard: {reason}'
# Delete via cache_manager
success, msg = await asyncio.to_thread(cache_manager.delete_by_content_hash, content_hash)
if not success:
# Fallback to legacy deletion
cache_path = get_cache_path(content_hash)
if cache_path and cache_path.exists():
cache_path.unlink()
# Clean up legacy metadata files
meta_path = CACHE_DIR / f"{content_hash}.meta.json"
if meta_path.exists():
meta_path.unlink()
mp4_path = CACHE_DIR / f"{content_hash}.mp4"
if mp4_path.exists():
mp4_path.unlink()
return '''
'''
# Known assets (bootstrap data)
KNOWN_ASSETS = {
"cat": "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b",
}
@app.get("/assets")
async def list_assets():
"""List known assets."""
return KNOWN_ASSETS
@app.post("/cache/import")
async def import_to_cache(path: str):
"""Import a local file to cache."""
source = Path(path)
if not source.exists():
raise HTTPException(404, f"File not found: {path}")
content_hash = await cache_file(source)
return {"content_hash": content_hash, "cached": True}
def save_cache_meta(content_hash: str, uploader: Optional[str] = None, filename: Optional[str] = None, **updates):
"""Save or update metadata for a cached file."""
meta_path = CACHE_DIR / f"{content_hash}.meta.json"
# Load existing or create new
if meta_path.exists():
with open(meta_path) as f:
meta = json.load(f)
else:
meta = {
"uploader": uploader,
"uploaded_at": datetime.now(timezone.utc).isoformat(),
"filename": filename
}
# Apply updates (but never change uploader or uploaded_at)
for key, value in updates.items():
if key not in ("uploader", "uploaded_at"):
meta[key] = value
with open(meta_path, "w") as f:
json.dump(meta, f, indent=2)
return meta
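# Illustrative merge semantics (hypothetical hash and values): provenance fields are
# write-once, everything else is merged in.
#   save_cache_meta("abc...", uploader="alice", filename="a.png")   # creates the record
#   save_cache_meta("abc...", uploader="bob", description="cat")    # uploader stays "alice"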
def load_cache_meta(content_hash: str) -> dict:
"""Load metadata for a cached file."""
meta_path = CACHE_DIR / f"{content_hash}.meta.json"
if meta_path.exists():
with open(meta_path) as f:
return json.load(f)
return {}
# User data storage (folders, collections)
USER_DATA_DIR = CACHE_DIR / ".user-data"
def load_user_data(username: str) -> dict:
"""Load user's folders and collections."""
USER_DATA_DIR.mkdir(parents=True, exist_ok=True)
# Normalize username (remove @ prefix if present)
safe_name = username.replace("@", "").replace("/", "_")
user_file = USER_DATA_DIR / f"{safe_name}.json"
if user_file.exists():
with open(user_file) as f:
return json.load(f)
return {"folders": ["/"], "collections": []}
def save_user_data(username: str, data: dict):
"""Save user's folders and collections."""
USER_DATA_DIR.mkdir(parents=True, exist_ok=True)
safe_name = username.replace("@", "").replace("/", "_")
user_file = USER_DATA_DIR / f"{safe_name}.json"
with open(user_file, "w") as f:
json.dump(data, f, indent=2)
async def get_user_cache_hashes(username: str, actor_id: Optional[str] = None) -> set:
"""Get all cache hashes owned by or associated with a user.
username: The plain username
actor_id: The full actor ID (@user@server), if available
"""
# Match against both formats for backwards compatibility
match_values = [username]
if actor_id:
match_values.append(actor_id)
hashes = set()
# Query database for items owned by user (new system)
if actor_id:
try:
db_items = await database.get_user_items(actor_id)
for item in db_items:
hashes.add(item["content_hash"])
except Exception:
pass # Database may not be initialized
# Legacy: Files uploaded by user (JSON metadata)
if CACHE_DIR.exists():
for f in CACHE_DIR.iterdir():
if f.name.endswith('.meta.json'):
                try:
                    # json is already imported at module scope; f is the Path from iterdir()
                    with open(f, 'r') as mf:
                        meta = json.load(mf)
                    if meta.get("uploader") in match_values:
                        hashes.add(f.name.replace('.meta.json', ''))
                except Exception:
                    pass
# Files from user's runs (inputs and outputs)
for run in list_all_runs():
if run.username in match_values:
hashes.update(run.inputs)
if run.output_hash:
hashes.add(run.output_hash)
return hashes
@app.post("/cache/upload")
async def upload_to_cache(file: UploadFile = File(...), ctx: UserContext = Depends(get_required_user_context)):
"""Upload a file to cache. Requires authentication."""
# Write to temp file first
import tempfile
with tempfile.NamedTemporaryFile(delete=False) as tmp:
content = await file.read()
tmp.write(content)
tmp_path = Path(tmp.name)
# Store in cache via cache_manager
cached, ipfs_cid = cache_manager.put(tmp_path, node_type="upload", move=True)
content_hash = cached.content_hash
# Save to cache_items table (with IPFS CID)
await database.create_cache_item(content_hash, ipfs_cid)
# Save uploader metadata to database
await database.save_item_metadata(
content_hash=content_hash,
actor_id=ctx.actor_id,
item_type="media",
filename=file.filename
)
return {"content_hash": content_hash, "filename": file.filename, "size": len(content)}
class CacheMetaUpdate(BaseModel):
"""Request to update cache metadata."""
origin: Optional[dict] = None # {"type": "self"|"external", "url": "...", "note": "..."}
description: Optional[str] = None
tags: Optional[list[str]] = None
folder: Optional[str] = None
collections: Optional[list[str]] = None
class PublishRequest(BaseModel):
"""Request to publish a cache item to L2."""
asset_name: str
asset_type: str = "image" # image, video, etc.
@app.get("/cache/{content_hash}/meta")
async def get_cache_meta(content_hash: str, ctx: UserContext = Depends(get_required_user_context)):
"""Get metadata for a cached file."""
# Check file exists
cache_path = get_cache_path(content_hash)
if not cache_path:
raise HTTPException(404, "Content not found")
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
raise HTTPException(403, "Access denied")
return await database.load_item_metadata(content_hash, ctx.actor_id)
@app.patch("/cache/{content_hash}/meta")
async def update_cache_meta(content_hash: str, update: CacheMetaUpdate, ctx: UserContext = Depends(get_required_user_context)):
"""Update metadata for a cached file."""
# Check file exists
cache_path = get_cache_path(content_hash)
if not cache_path:
raise HTTPException(404, "Content not found")
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
raise HTTPException(403, "Access denied")
# Build update dict from non-None fields
updates = {}
if update.origin is not None:
updates["origin"] = update.origin
if update.description is not None:
updates["description"] = update.description
if update.tags is not None:
updates["tags"] = update.tags
if update.folder is not None:
# Ensure folder exists in user's folder list
user_data = load_user_data(ctx.username)
if update.folder not in user_data["folders"]:
raise HTTPException(400, f"Folder does not exist: {update.folder}")
updates["folder"] = update.folder
if update.collections is not None:
# Validate collections exist
user_data = load_user_data(ctx.username)
existing = {c["name"] for c in user_data["collections"]}
for col in update.collections:
if col not in existing:
raise HTTPException(400, f"Collection does not exist: {col}")
updates["collections"] = update.collections
meta = await database.update_item_metadata(content_hash, ctx.actor_id, **updates)
return meta
@app.post("/cache/{content_hash}/publish")
async def publish_cache_to_l2(
content_hash: str,
req: PublishRequest,
request: Request,
ctx: UserContext = Depends(get_required_user_context)
):
"""
Publish a cache item to L2 (ActivityPub).
Requires origin to be set in metadata before publishing.
"""
# Check file exists
cache_path = get_cache_path(content_hash)
if not cache_path:
raise HTTPException(404, "Content not found")
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
raise HTTPException(403, "Access denied")
# Load metadata
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
# Check origin is set
origin = meta.get("origin")
if not origin or "type" not in origin:
raise HTTPException(400, "Origin must be set before publishing. Use --origin self or --origin-url ")
# Get IPFS CID from cache item
cache_item = await database.get_cache_item(content_hash)
ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
# Get auth token to pass to L2
token = request.cookies.get("auth_token")
if not token:
# Try from header
auth_header = request.headers.get("Authorization", "")
if auth_header.startswith("Bearer "):
token = auth_header[7:]
if not token:
raise HTTPException(401, "Authentication token required")
# Call L2 publish-cache endpoint (use user's L2 server)
l2_server = ctx.l2_server
try:
resp = http_requests.post(
f"{l2_server}/assets/publish-cache",
headers={"Authorization": f"Bearer {token}"},
json={
"content_hash": content_hash,
"ipfs_cid": ipfs_cid,
"asset_name": req.asset_name,
"asset_type": req.asset_type,
"origin": origin,
"description": meta.get("description"),
"tags": meta.get("tags", []),
"metadata": {
"filename": meta.get("filename"),
"folder": meta.get("folder"),
"collections": meta.get("collections", [])
}
},
timeout=10
)
resp.raise_for_status()
l2_result = resp.json()
except http_requests.exceptions.HTTPError as e:
error_detail = ""
try:
error_detail = e.response.json().get("detail", str(e))
except Exception:
error_detail = str(e)
raise HTTPException(400, f"L2 publish failed: {error_detail}")
except Exception as e:
raise HTTPException(500, f"L2 publish failed: {e}")
# Update local metadata with publish status and pin
await database.save_l2_share(
content_hash=content_hash,
actor_id=ctx.actor_id,
l2_server=l2_server,
asset_name=req.asset_name,
content_type=req.asset_type
)
await database.update_item_metadata(
content_hash=content_hash,
actor_id=ctx.actor_id,
pinned=True,
pin_reason="published"
)
return {
"published": True,
"asset_name": req.asset_name,
"l2_result": l2_result
}
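# Illustrative publish sequence from a client (hypothetical hash and names; the
# endpoints and pinning side effect match the handlers in this module):
#   PATCH /cache/<hash>/meta     {"origin": {"type": "self"}}
#   POST  /cache/<hash>/publish  {"asset_name": "sunset", "asset_type": "image"}
# On success the item is pinned locally with pin_reason="published", which blocks
# DELETE /cache/<hash> until the pin is removed.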
@app.patch("/cache/{content_hash}/republish")
async def republish_cache_to_l2(
content_hash: str,
request: Request,
ctx: UserContext = Depends(get_required_user_context)
):
"""
Re-publish (update) a cache item on L2 after metadata changes.
Only works for already-published items.
"""
# Check file exists
cache_path = get_cache_path(content_hash)
if not cache_path:
raise HTTPException(404, "Content not found")
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
raise HTTPException(403, "Access denied")
# Load metadata
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
l2_shares = meta.get("l2_shares", [])
# Find share for current L2 server (user's L2)
l2_server = ctx.l2_server
current_share = None
for share in l2_shares:
if share.get("l2_server") == l2_server:
current_share = share
break
if not current_share:
raise HTTPException(400, "Item not published yet. Use publish first.")
asset_name = current_share.get("asset_name")
if not asset_name:
raise HTTPException(400, "No asset name found in publish info")
# Get auth token
token = request.cookies.get("auth_token")
if not token:
auth_header = request.headers.get("Authorization", "")
if auth_header.startswith("Bearer "):
token = auth_header[7:]
if not token:
raise HTTPException(401, "Authentication token required")
# Get IPFS CID from cache item
cache_item = await database.get_cache_item(content_hash)
ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
# Call L2 update endpoint (use user's L2 server)
l2_server = ctx.l2_server
try:
resp = http_requests.patch(
f"{l2_server}/assets/{asset_name}",
headers={"Authorization": f"Bearer {token}"},
json={
"description": meta.get("description"),
"tags": meta.get("tags"),
"origin": meta.get("origin"),
"ipfs_cid": ipfs_cid,
"metadata": {
"filename": meta.get("filename"),
"folder": meta.get("folder"),
"collections": meta.get("collections", [])
}
},
timeout=10
)
resp.raise_for_status()
l2_result = resp.json()
except http_requests.exceptions.HTTPError as e:
error_detail = ""
try:
error_detail = e.response.json().get("detail", str(e))
except Exception:
error_detail = str(e)
raise HTTPException(400, f"L2 update failed: {error_detail}")
except Exception as e:
raise HTTPException(500, f"L2 update failed: {e}")
# Update local metadata - save_l2_share updates last_synced_at on conflict
await database.save_l2_share(
content_hash=content_hash,
actor_id=ctx.actor_id,
l2_server=l2_server,
asset_name=asset_name,
content_type=current_share.get("content_type", "media")
)
return {
"updated": True,
"asset_name": asset_name,
"l2_result": l2_result
}
# ============ L2 Sync ============
def _fetch_l2_outbox_sync(l2_server: str, username: str) -> list:
"""Fetch user's outbox from L2 (sync version for asyncio.to_thread)."""
try:
# Fetch outbox page with activities
resp = http_requests.get(
f"{l2_server}/users/{username}/outbox?page=true",
headers={"Accept": "application/activity+json"},
timeout=10
)
if resp.status_code != 200:
logger.warning(f"L2 outbox fetch failed: {resp.status_code}")
return []
data = resp.json()
return data.get("orderedItems", [])
except Exception as e:
logger.error(f"Failed to fetch L2 outbox: {e}")
return []
@app.post("/user/sync-l2")
async def sync_with_l2(ctx: UserContext = Depends(get_required_user_context)):
"""
Sync local L2 share records with user's L2 outbox.
Fetches user's published assets from their L2 server and updates local tracking.
"""
l2_server = ctx.l2_server
username = ctx.username
# Fetch outbox activities
activities = await asyncio.to_thread(_fetch_l2_outbox_sync, l2_server, username)
if not activities:
return {"synced": 0, "message": "No activities found or L2 unavailable"}
# Process Create activities for assets
synced_count = 0
for activity in activities:
if activity.get("type") != "Create":
continue
obj = activity.get("object", {})
if not isinstance(obj, dict):
continue
# Get asset info - look for content_hash in attachment or directly
content_hash = None
asset_name = obj.get("name", "")
# Check attachments for content hash
for attachment in obj.get("attachment", []):
if attachment.get("name") == "content_hash":
content_hash = attachment.get("value")
break
# Also check if there's a hash in the object URL or ID
if not content_hash:
# Try to extract from object ID like /objects/{hash}
obj_id = obj.get("id", "")
if "/objects/" in obj_id:
content_hash = obj_id.split("/objects/")[-1].split("/")[0]
if not content_hash or not asset_name:
continue
# Check if we have this content locally
cache_path = get_cache_path(content_hash)
if not cache_path:
continue # We don't have this content, skip
# Determine content type from object type
obj_type = obj.get("type", "")
if obj_type == "Video":
content_type = "video"
elif obj_type == "Image":
content_type = "image"
else:
content_type = "media"
# Update local L2 share record
await database.save_l2_share(
content_hash=content_hash,
actor_id=ctx.actor_id,
l2_server=l2_server,
asset_name=asset_name,
content_type=content_type
)
synced_count += 1
return {"synced": synced_count, "total_activities": len(activities)}
@app.post("/ui/sync-l2", response_class=HTMLResponse)
async def ui_sync_with_l2(request: Request):
"""HTMX handler: sync with L2 server."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
        return 'Login required'
try:
result = await sync_with_l2(ctx)
synced = result.get("synced", 0)
total = result.get("total_activities", 0)
if synced > 0:
            return f'Synced {synced} asset(s) from L2 ({total} activities found)'
        return f'Nothing new to sync ({total} activities found)'  # reconstructed fallthrough
    except Exception as e:  # reconstructed: closes the try opened above
        return f'Sync failed: {e}'
# ============ Folder & Collection Management ============
@app.get("/user/folders")
async def list_folders(username: str = Depends(get_required_user)):
"""List user's folders."""
user_data = load_user_data(username)
return {"folders": user_data["folders"]}
@app.post("/user/folders")
async def create_folder(folder_path: str, username: str = Depends(get_required_user)):
"""Create a new folder."""
user_data = load_user_data(username)
# Validate path format
if not folder_path.startswith("/"):
raise HTTPException(400, "Folder path must start with /")
# Check parent exists
parent = "/".join(folder_path.rsplit("/", 1)[:-1]) or "/"
if parent != "/" and parent not in user_data["folders"]:
raise HTTPException(400, f"Parent folder does not exist: {parent}")
# Check doesn't already exist
if folder_path in user_data["folders"]:
raise HTTPException(400, f"Folder already exists: {folder_path}")
user_data["folders"].append(folder_path)
user_data["folders"].sort()
save_user_data(username, user_data)
return {"folder": folder_path, "created": True}
@app.delete("/user/folders")
async def delete_folder(folder_path: str, ctx: UserContext = Depends(get_required_user_context)):
"""Delete a folder (must be empty)."""
if folder_path == "/":
raise HTTPException(400, "Cannot delete root folder")
user_data = load_user_data(ctx.username)
if folder_path not in user_data["folders"]:
raise HTTPException(404, "Folder not found")
# Check no subfolders
for f in user_data["folders"]:
if f.startswith(folder_path + "/"):
raise HTTPException(400, f"Folder has subfolders: {f}")
# Check no items in folder
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
for h in user_hashes:
meta = await database.load_item_metadata(h, ctx.actor_id)
if meta.get("folder") == folder_path:
raise HTTPException(400, "Folder is not empty")
user_data["folders"].remove(folder_path)
save_user_data(ctx.username, user_data)
return {"folder": folder_path, "deleted": True}
@app.get("/user/collections")
async def list_collections(username: str = Depends(get_required_user)):
"""List user's collections."""
user_data = load_user_data(username)
return {"collections": user_data["collections"]}
@app.post("/user/collections")
async def create_collection(name: str, username: str = Depends(get_required_user)):
"""Create a new collection."""
user_data = load_user_data(username)
# Check doesn't already exist
for col in user_data["collections"]:
if col["name"] == name:
raise HTTPException(400, f"Collection already exists: {name}")
user_data["collections"].append({
"name": name,
"created_at": datetime.now(timezone.utc).isoformat()
})
save_user_data(username, user_data)
return {"collection": name, "created": True}
@app.delete("/user/collections")
async def delete_collection(name: str, ctx: UserContext = Depends(get_required_user_context)):
"""Delete a collection."""
user_data = load_user_data(ctx.username)
# Find and remove
for i, col in enumerate(user_data["collections"]):
if col["name"] == name:
user_data["collections"].pop(i)
save_user_data(ctx.username, user_data)
# Remove from all cache items
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
for h in user_hashes:
meta = await database.load_item_metadata(h, ctx.actor_id)
if name in meta.get("collections", []):
new_collections = [c for c in meta.get("collections", []) if c != name]
await database.update_item_metadata(h, ctx.actor_id, collections=new_collections)
return {"collection": name, "deleted": True}
raise HTTPException(404, "Collection not found")
def is_ios_request(request: Request) -> bool:
"""Check if request is from iOS device."""
ua = request.headers.get("user-agent", "").lower()
return "iphone" in ua or "ipad" in ua
def video_src_for_request(content_hash: str, request: Request) -> str:
"""Get video src URL, using MP4 endpoint for iOS."""
if is_ios_request(request):
return f"/cache/{content_hash}/mp4"
return f"/cache/{content_hash}"
def detect_media_type(cache_path: Path) -> str:
"""Detect if file is image or video based on magic bytes."""
with open(cache_path, "rb") as f:
header = f.read(32)
# Video signatures
if header[:4] == b'\x1a\x45\xdf\xa3': # WebM/MKV
return "video"
if header[4:8] == b'ftyp': # MP4/MOV
return "video"
if header[:4] == b'RIFF' and header[8:12] == b'AVI ': # AVI
return "video"
# Image signatures
if header[:8] == b'\x89PNG\r\n\x1a\n': # PNG
return "image"
if header[:2] == b'\xff\xd8': # JPEG
return "image"
if header[:6] in (b'GIF87a', b'GIF89a'): # GIF
return "image"
if header[:4] == b'RIFF' and header[8:12] == b'WEBP': # WebP
return "image"
return "unknown"
async def get_user_context_from_cookie(request) -> Optional[UserContext]:
"""Get user context from auth cookie. Returns full context with actor_id and l2_server."""
token = request.cookies.get("auth_token")
if not token:
return None
return await get_verified_user_context(token)
async def get_user_from_cookie(request) -> Optional[str]:
"""Get username from auth cookie (backwards compat - prefer get_user_context_from_cookie)."""
ctx = await get_user_context_from_cookie(request)
return ctx.username if ctx else None
def wants_html(request: Request) -> bool:
"""Check if request wants HTML (browser) vs JSON (API)."""
accept = request.headers.get("accept", "")
# Check for explicit HTML request
if "text/html" in accept and "application/json" not in accept:
return True
# Check for browser navigation (direct URL access)
fetch_mode = request.headers.get("sec-fetch-mode", "")
if fetch_mode == "navigate":
return True
return False
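# Illustrative decisions (header values as checked above):
#   Accept: text/html                                    -> True  (browser)
#   Accept: application/json                             -> False (API client)
#   Accept: */*, Sec-Fetch-Mode: navigate (URL access)   -> True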
# Tailwind CSS config for all L1 templates
TAILWIND_CONFIG = '''
'''
def render_page(title: str, content: str, actor_id: Optional[str] = None, active_tab: str = None) -> str:
"""Render a page with nav bar and content. Used for clean URL pages.
actor_id: The user's actor ID (@user@server) or None if not logged in.
"""
user_info = ""
if actor_id:
# Extract username and domain from @username@domain format
parts = actor_id.lstrip("@").split("@")
username = parts[0] if parts else actor_id
domain = parts[1] if len(parts) > 1 else ""
l2_user_url = f"https://{domain}/users/{username}" if domain else "#"
        user_info = f'''...'''  # user badge markup elided (links to l2_user_url)
    return f'<!doctype html><html><head><title>{title}</title>{TAILWIND_CONFIG}</head><body>{user_info}{content}</body></html>'  # page shell reconstructed minimally; nav and active_tab markup elided
def render_ui_html(actor_id: Optional[str] = None, tab: str = "runs") -> str:
"""Render main UI HTML with optional user context.
actor_id: The user's actor ID (@user@server) or None if not logged in.
"""
user_info = ""
if actor_id:
# Extract username and domain from @username@domain format
parts = actor_id.lstrip("@").split("@")
username = parts[0] if parts else actor_id
domain = parts[1] if len(parts) > 1 else ""
l2_user_url = f"https://{domain}/users/{username}" if domain else "#"
        user_info = f'''...'''  # user badge markup elided
    return f'<!doctype html><html><head>{TAILWIND_CONFIG}</head><body>{user_info}</body></html>'  # UI shell reconstructed minimally; tab markup elided
# Auth routes - L1 never handles credentials, redirects to L2
# L2 sets auth_token cookie with domain=.rose-ash.com for shared auth
@app.get("/login")
async def login_page():
"""Redirect to L2 server for login. L1 never handles credentials."""
# Redirect to default L2 with return URL so L2 can redirect back after login
return_url = f"{L1_PUBLIC_URL}/runs"
return RedirectResponse(url=f"{DEFAULT_L2_SERVER}/login?return_to={return_url}", status_code=302)
@app.get("/register")
async def register_page():
"""Redirect to L2 server for registration. L1 never handles credentials."""
return_url = f"{L1_PUBLIC_URL}/runs"
return RedirectResponse(url=f"{DEFAULT_L2_SERVER}/register?return_to={return_url}", status_code=302)
@app.get("/logout")
async def logout():
"""Logout - clear cookie and redirect to L2 logout."""
# Clear local cookie and redirect to L2 to clear shared cookie
response = RedirectResponse(url=f"{DEFAULT_L2_SERVER}/logout?return_to={L1_PUBLIC_URL}/", status_code=302)
response.delete_cookie("auth_token")
return response
@app.post("/ui/publish-run/{run_id}", response_class=HTMLResponse)
async def ui_publish_run(run_id: str, request: Request):
"""Publish a run to L2 from the web UI. Assets are named by content_hash."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
        return HTMLResponse('Not logged in')
token = request.cookies.get("auth_token")
if not token:
        return HTMLResponse('Not logged in')
# Get the run to pin its output and inputs
    run = await asyncio.to_thread(load_run, run_id)
if not run:
        return HTMLResponse('Run not found')
# Call L2 to publish the run, including this L1's public URL
# Assets are named by their content_hash - no output_name needed
l2_server = ctx.l2_server
try:
resp = http_requests.post(
f"{l2_server}/assets/record-run",
json={"run_id": run_id, "l1_server": L1_PUBLIC_URL},
headers={"Authorization": f"Bearer {token}"},
timeout=30
)
if resp.status_code == 400:
error = resp.json().get("detail", "Bad request")
            return HTMLResponse(f'Error: {error}')
resp.raise_for_status()
result = resp.json()
# Pin the output and record L2 share
if run.output_hash:
await database.update_item_metadata(run.output_hash, ctx.actor_id, pinned=True, pin_reason="published")
# Record L2 share so UI shows published status
cache_path = get_cache_path(run.output_hash)
media_type = detect_media_type(cache_path) if cache_path else "image"
content_type = "video" if media_type == "video" else "image"
await database.save_l2_share(
content_hash=run.output_hash,
actor_id=ctx.actor_id,
l2_server=l2_server,
asset_name=result["asset"]["name"],
content_type=content_type
)
# Pin the inputs (for provenance)
for input_hash in run.inputs:
await database.update_item_metadata(input_hash, ctx.actor_id, pinned=True, pin_reason="input_to_published")
# If this was a recipe-based run, pin the recipe and its fixed inputs
if run.recipe.startswith("recipe:"):
config_name = run.recipe.replace("recipe:", "")
for recipe in list_all_recipes():
if recipe.name == config_name:
# Pin the recipe YAML
cache_manager.pin(recipe.recipe_id, reason="recipe_for_published")
# Pin all fixed inputs referenced by the recipe
for fixed in recipe.fixed_inputs:
if fixed.content_hash:
cache_manager.pin(fixed.content_hash, reason="fixed_input_in_published_recipe")
break
# Use HTTPS for L2 links
l2_https = l2_server.replace("http://", "https://")
asset_name = result["asset"]["name"]
short_name = asset_name[:16] + "..." if len(asset_name) > 20 else asset_name
        return HTMLResponse(f'Published as {short_name} to {l2_https}')  # success fragment (template elided)
    except Exception as e:  # reconstructed: closes the try opened above
        return HTMLResponse(f'Publish failed: {e}')
@app.get("/ui/runs", response_class=HTMLResponse)
async def ui_runs(request: Request):
"""HTMX partial: list of runs."""
ctx = await get_user_context_from_cookie(request)
    runs = await asyncio.to_thread(list_all_runs)
# Require login to see runs
if not ctx:
        return ''  # login prompt (template elided)
# Filter runs by user - match both plain username and ActivityPub format (@user@domain)
runs = [r for r in runs if r.username in (ctx.username, ctx.actor_id)]
    if not runs:
        return ''  # empty-state message (template elided)
    ...  # runs table markup elided


@app.get("/ui/cache", response_class=HTMLResponse)  # route assumed; handler header reconstructed
async def ui_cache(request: Request, folder: str = None, collection: str = None, tag: str = None):
    """HTMX partial: user's cached media with folder/collection/tag filters."""
    ctx = await get_user_context_from_cookie(request)
    if not ctx:
        return ''  # login prompt (template elided)
# Get hashes owned by/associated with this user
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
# Get cache items that belong to the user (from cache_manager)
cache_items = []
seen_hashes = set() # Deduplicate by content_hash
for cached_file in cache_manager.list_all():
content_hash = cached_file.content_hash
if content_hash not in user_hashes:
continue
# Skip duplicates (same content from multiple runs)
if content_hash in seen_hashes:
continue
seen_hashes.add(content_hash)
# Skip recipes - they have their own section
if cached_file.node_type == "recipe":
continue
# Load metadata for filtering
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
# Apply folder filter
if folder:
item_folder = meta.get("folder", "/")
if folder != "/" and not item_folder.startswith(folder):
continue
if folder == "/" and item_folder != "/":
continue
# Apply collection filter
if collection:
if collection not in meta.get("collections", []):
continue
# Apply tag filter
if tag:
if tag not in meta.get("tags", []):
continue
cache_items.append({
"hash": content_hash,
"size": cached_file.size_bytes,
"mtime": cached_file.created_at,
"meta": meta
})
# Sort by modification time (newest first)
cache_items.sort(key=lambda x: x["mtime"], reverse=True)
if not cache_items:
filter_msg = ""
if folder:
filter_msg = f" in folder {folder}"
elif collection:
filter_msg = f" in collection '{collection}'"
elif tag:
filter_msg = f" with tag '{tag}'"
        return f'No cached files{filter_msg}. Upload files or run effects to see them here.'
    html_parts = []  # grid wrapper markup elided
for item in cache_items[:50]: # Limit to 50 items
content_hash = item["hash"]
cache_path = get_cache_path(content_hash)
media_type = detect_media_type(cache_path) if cache_path else "unknown"
# Check IPFS status
cache_item = await database.get_cache_item(content_hash)
ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
ipfs_badge = 'IPFS' if ipfs_cid else ''
# Check L2 publish status
l2_shares = item["meta"].get("l2_shares", [])
if l2_shares:
first_share = l2_shares[0]
l2_server = first_share.get("l2_server", "")
asset_name = first_share.get("asset_name", "")
asset_url = f"{l2_server}/assets/{asset_name}"
published_badge = f'L2'
else:
published_badge = ''
# Format size
size = item["size"]
if size > 1024*1024:
size_str = f"{size/(1024*1024):.1f} MB"
elif size > 1024:
size_str = f"{size/1024:.1f} KB"
else:
size_str = f"{size} bytes"
html_parts.append(f'''