"""
@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve the landing page.

    Returns the module-level ``HOME_HTML`` markup unchanged; the route's
    ``response_class`` makes FastAPI send it as HTML.
    """
    return HOME_HTML
@app.get("/debug/cache/{content_hash}")
async def debug_cache(content_hash: str):
    """Report where (and whether) a content hash can be found in the cache.

    Diagnostic endpoint. Returns a flat dict with:

    - the configured cache directories,
    - whether the hash is present in the manager's content index,
    - for each candidate on-disk location: its path, whether it exists and,
      for directories, the names of the entries inside,
    - the results of the lookups through ``cache_manager.cache.get``,
      ``cache_manager.get_by_content_hash`` and ``cache_manager.has_content``.
    """
    result = {
        "content_hash": content_hash,
        "cache_dir": str(cache_manager.cache_dir),
        "nodes_dir": str(cache_manager.cache.cache_dir),
        # NOTE(review): reads the manager's private index directly; acceptable
        # for a debug view, but there is no public accessor to rely on.
        "in_content_index": content_hash in cache_manager._content_index,
        "node_id_from_index": cache_manager._content_index.get(content_hash),
    }

    # Candidate on-disk locations for the hash.
    locations = {
        "legacy_direct": cache_manager.cache_dir / content_hash,
        "nodes_dir": cache_manager.cache.cache_dir / content_hash,
    }
    for name, path in locations.items():
        result[f"{name}_path"] = str(path)
        result[f"{name}_exists"] = path.exists()
        # is_dir() is already False for a missing path, so no separate
        # exists() check is needed before listing.
        if path.is_dir():
            result[f"{name}_contents"] = [entry.name for entry in path.iterdir()]

    # Lookup through the underlying artdag cache.
    result["artdag_cache_get"] = str(cache_manager.cache.get(content_hash))

    # Lookup through the cache manager.
    found_path = cache_manager.get_by_content_hash(content_hash)
    result["cache_manager_path"] = str(found_path) if found_path else None
    result["has_content"] = cache_manager.has_content(content_hash)
    return result
@app.post("/runs", response_model=RunStatus)
async def create_run(request: RunRequest, username: str = Depends(get_required_user)):
    """Queue a rendering run for the authenticated user and return its record.

    The run is dispatched to Celery either through the DAG engine (when
    ``use_dag`` is set or the recipe is ``"dag"``) or through the legacy
    single-effect task. The record is saved with status ``"running"`` once
    the task has been submitted.

    Raises:
        HTTPException: 400 when legacy mode is requested with anything other
            than exactly one input.
    """
    run_id = str(uuid.uuid4())

    # Fall back to "<recipe>-<first 8 chars of run id>" when no name is given.
    output_name = request.output_name or f"{request.recipe}-{run_id[:8]}"

    # The owner is stored in ActivityPub actor form.
    run = RunStatus(
        run_id=run_id,
        status="pending",
        recipe=request.recipe,
        inputs=request.inputs,
        output_name=output_name,
        created_at=datetime.now(timezone.utc).isoformat(),
        username=f"@{username}@{L2_DOMAIN}",
    )

    dag_mode = request.use_dag or request.recipe == "dag"
    if dag_mode:
        # Use the caller's DAG when supplied, otherwise derive a simple
        # effect DAG from the recipe and inputs.
        if request.dag_json:
            dag_json = request.dag_json
        else:
            dag_json = build_effect_dag(request.inputs, request.recipe).to_json()
        task = execute_dag.delay(dag_json, run.run_id)
    else:
        # Legacy single-effect path.
        if len(request.inputs) != 1:
            raise HTTPException(400, "Legacy mode only supports single-input recipes. Use use_dag=true for multi-input.")
        task = render_effect.delay(request.inputs[0], request.recipe, output_name)

    run.celery_task_id = task.id
    run.status = "running"
    save_run(run)
    return run
@app.get("/runs/{run_id}", response_model=RunStatus)
async def get_run(run_id: str):
    """Return a run record, refreshing it from Celery if it is still running.

    When the stored status is ``"running"`` and the Celery task has finished,
    the record is updated (completed or failed), the legacy output file is
    cached, an activity is recorded, and the record is saved before being
    returned.

    Raises:
        HTTPException: 404 when no run with ``run_id`` exists.
    """
    run = load_run(run_id)
    if not run:
        raise HTTPException(404, f"Run {run_id} not found")

    # Only poll Celery for runs that have not been finalised yet.
    if run.status == "running" and run.celery_task_id:
        task = celery_app.AsyncResult(run.celery_task_id)
        if task.ready():
            if task.successful():
                # A task that returned nothing is treated as an empty result
                # instead of crashing on the membership test below.
                result = task.result or {}
                run.status = "completed"
                run.completed_at = datetime.now(timezone.utc).isoformat()

                if "output_hash" in result:
                    # DAG result format: the output is already cached by the
                    # DAG path, so only the hash is recorded here.
                    run.output_hash = result.get("output_hash")
                else:
                    # Legacy render_effect result format.
                    legacy_output = result.get("output") or {}
                    run.output_hash = legacy_output.get("content_hash")

                    # Effect provenance (legacy results only).
                    effects = result.get("effects") or []
                    if effects:
                        run.effects_commit = effects[0].get("repo_commit")
                        run.effect_url = effects[0].get("repo_url")

                    # Infrastructure info (legacy results only).
                    run.infrastructure = result.get("infrastructure")

                    # Cache the rendered file. A missing local_path must be
                    # skipped explicitly: Path("") is Path("."), which always
                    # exists and would send the working directory to
                    # cache_file().
                    local_path = legacy_output.get("local_path")
                    if local_path:
                        output_path = Path(local_path)
                        if output_path.exists():
                            cache_file(output_path, node_type="effect_output")

                # Record the activity used by the deletion rules.
                if run.output_hash and run.inputs:
                    cache_manager.record_simple_activity(
                        input_hashes=run.inputs,
                        output_hash=run.output_hash,
                        run_id=run.run_id,
                    )
            else:
                run.status = "failed"
                run.error = str(task.result)

            # Persist the refreshed status.
            save_run(run)

    return run
@app.delete("/runs/{run_id}")
async def discard_run(run_id: str, username: str = Depends(get_required_user)):
    """Delete a run record owned by the caller, honouring the deletion rules.

    Failed runs are removed unconditionally. For other runs:

    - if the cache manager has an activity for the run, its own rules decide
      (``can_discard_activity``) and cache cleanup is delegated to
      ``discard_activity``;
    - otherwise (legacy run) the run is refused when any input or the output
      is reported as shared by the L2 checker.

    Raises:
        HTTPException: 404 unknown run, 403 not the owner, 400 when a rule
            forbids the discard, 500 when the activity discard fails.
    """
    run = load_run(run_id)
    if not run:
        raise HTTPException(404, f"Run {run_id} not found")

    # The stored owner may be either the bare username or the actor ID.
    owner_aliases = (username, f"@{username}@{L2_DOMAIN}")
    if run.username not in owner_aliases:
        raise HTTPException(403, "Access denied")

    # A failed run has no output to protect, so no rule checks apply.
    if run.status != "failed":
        if cache_manager.get_activity(run_id):
            # Activity-backed run: defer to the activity manager.
            allowed, why_not = cache_manager.can_discard_activity(run_id)
            if not allowed:
                raise HTTPException(400, f"Cannot discard run: {why_not}")

            removed, detail = cache_manager.discard_activity(run_id)
            if not removed:
                raise HTTPException(500, f"Failed to discard: {detail}")
        else:
            # Legacy run without an activity record: check L2 sharing by hand.
            hashes = list(run.inputs or [])
            if run.output_hash:
                hashes.append(run.output_hash)
            for item_hash in hashes:
                if cache_manager.l2_checker.is_shared(item_hash):
                    raise HTTPException(400, f"Cannot discard run: item {item_hash[:16]}... is published to L2")

    # Drop the run record from Redis.
    redis_client.delete(f"{RUNS_KEY_PREFIX}{run_id}")
    return {"discarded": True, "run_id": run_id}
# HTMX variant of the run-discard endpoint: every outcome, including each
# refusal, is returned as a string for the HTML response; nothing is raised
# as an HTTPException in this handler.
@app.delete("/ui/runs/{run_id}/discard", response_class=HTMLResponse)
async def ui_discard_run(run_id: str, request: Request):
"""HTMX handler: discard a run."""
# The caller is identified from the request cookie rather than a Depends() guard
current_user = get_user_from_cookie(request)
if not current_user:
return '
Login required
'
run = load_run(run_id)
if not run:
return '
Run not found
'
# Check ownership
# (the stored owner may be the bare username or the "@user@domain" actor ID)
actor_id = f"@{current_user}@{L2_DOMAIN}"
if run.username not in (current_user, actor_id):
return '
Access denied
'
# Failed runs can always be deleted
if run.status != "failed":
# Check if activity exists for this run
activity = cache_manager.get_activity(run_id)
if activity:
# Activity-backed run: the cache manager decides and performs the discard
can_discard, reason = cache_manager.can_discard_activity(run_id)
if not can_discard:
return f'
Cannot discard: {reason}
'
success, msg = cache_manager.discard_activity(run_id)
if not success:
return f'
Failed to discard: {msg}
'
else:
# Legacy run - check L2 shared status
# (inputs plus the output, when there is one, must all be unshared)
items_to_check = list(run.inputs or [])
if run.output_hash:
items_to_check.append(run.output_hash)
for content_hash in items_to_check:
if cache_manager.l2_checker.is_shared(content_hash):
return f'
Cannot discard: item {content_hash[:16]}... is published to L2
'
# Remove from Redis
redis_client.delete(f"{RUNS_KEY_PREFIX}{run_id}")
return '''
'''
@app.get("/run/{run_id}")
async def run_detail(run_id: str, request: Request):
"""Run detail. HTML for browsers, JSON for APIs."""
run = load_run(run_id)
if not run:
if wants_html(request):
content = f'
Run not found: {run_id}
'
return HTMLResponse(render_page("Not Found", content, None, active_tab="runs"), status_code=404)
raise HTTPException(404, f"Run {run_id} not found")
# Check Celery task status if running
if run.status == "running" and run.celery_task_id:
task = celery_app.AsyncResult(run.celery_task_id)
if task.ready():
if task.successful():
result = task.result
run.status = "completed"
run.completed_at = datetime.now(timezone.utc).isoformat()
run.output_hash = result.get("output", {}).get("content_hash")
effects = result.get("effects", [])
if effects:
run.effects_commit = effects[0].get("repo_commit")
run.effect_url = effects[0].get("repo_url")
run.infrastructure = result.get("infrastructure")
output_path = Path(result.get("output", {}).get("local_path", ""))
if output_path.exists():
cache_file(output_path)
else:
run.status = "failed"
run.error = str(task.result)
save_run(run)
if wants_html(request):
current_user = get_user_from_cookie(request)
if not current_user:
content = '
'
return HTMLResponse(render_page("Login Required", content, current_user, active_tab="runs"), status_code=401)
# Check user owns this run
actor_id = f"@{current_user}@{L2_DOMAIN}"
if run.username not in (current_user, actor_id):
content = '
Access denied.
'
return HTMLResponse(render_page("Access Denied", content, current_user, active_tab="runs"), status_code=403)
# Build effect URL
if run.effect_url:
effect_url = run.effect_url
elif run.effects_commit and run.effects_commit != "unknown":
effect_url = f"https://git.rose-ash.com/art-dag/effects/src/commit/{run.effects_commit}/{run.recipe}"
else:
effect_url = f"https://git.rose-ash.com/art-dag/effects/src/branch/main/{run.recipe}"
# Status badge colors
status_colors = {
"completed": "bg-green-600 text-white",
"running": "bg-yellow-600 text-white",
"failed": "bg-red-600 text-white",
"pending": "bg-gray-600 text-white"
}
status_badge = status_colors.get(run.status, "bg-gray-600 text-white")
# Build media HTML for input/output
media_html = ""
has_input = run.inputs and cache_manager.has_content(run.inputs[0])
has_output = run.status == "completed" and run.output_hash and cache_manager.has_content(run.output_hash)
if has_input or has_output:
media_html = '
'''
return HTMLResponse(render_page(f"Run: {run.recipe}", content, current_user, active_tab="runs"))
# JSON response
return run.model_dump()
@app.get("/runs")
async def list_runs(request: Request, page: int = 1, limit: int = 20):
"""List runs. HTML for browsers (with infinite scroll), JSON for APIs (with pagination)."""
current_user = get_user_from_cookie(request)
all_runs = list_all_runs()
total = len(all_runs)
# Filter by user if logged in for HTML
if wants_html(request) and current_user:
actor_id = f"@{current_user}@{L2_DOMAIN}"
all_runs = [r for r in all_runs if r.username in (current_user, actor_id)]
total = len(all_runs)
# Pagination
start = (page - 1) * limit
end = start + limit
runs_page = all_runs[start:end]
has_more = end < total
if wants_html(request):
if not current_user:
content = '
')
# For infinite scroll, just return cards if not first page
if page > 1:
if has_more:
html_parts.append(f'''
Loading more...
''')
return HTMLResponse('\n'.join(html_parts))
# First page - full content
infinite_scroll_trigger = ""
if has_more:
infinite_scroll_trigger = f'''
Loading more...
'''
content = f'''
Runs ({total} total)
{''.join(html_parts)}
{infinite_scroll_trigger}
'''
return HTMLResponse(render_page("Runs", content, current_user, active_tab="runs"))
# JSON response for APIs
return {
"runs": [r.model_dump() for r in runs_page],
"pagination": {
"page": page,
"limit": limit,
"total": total,
"has_more": has_more
}
}
@app.get("/cache/{content_hash}")
async def get_cached(content_hash: str):
    """Stream the cached file stored under ``content_hash``.

    Raises:
        HTTPException: 404 when ``get_cache_path`` finds nothing for the hash.
    """
    cache_path = get_cache_path(content_hash)
    if cache_path:
        return FileResponse(cache_path)
    raise HTTPException(404, f"Content {content_hash} not in cache")
@app.get("/cache/{content_hash}/mp4")
async def get_cached_mp4(content_hash: str):
    """Serve cached video content as MP4, transcoding on first request.

    Resolution order:

    1. a previously transcoded ``<hash>.mp4`` in ``CACHE_DIR`` is served as is;
    2. if ffprobe reports an MP4/MOV container, the original file is served;
    3. otherwise the file is transcoded with ffmpeg (H.264 + AAC) into a
       temporary ``<hash>.transcoding.mp4`` which is renamed into place only
       after ffmpeg succeeded.

    The external tools run in a worker thread so a long transcode does not
    block the event loop.

    Raises:
        HTTPException: 404 when the hash is not cached, 400 when the content
            is not a video, 500 when transcoding fails or times out.
    """
    import asyncio
    import subprocess

    cache_path = get_cache_path(content_hash)
    if not cache_path:
        raise HTTPException(404, f"Content {content_hash} not in cache")

    # MP4 transcodes are stored in CACHE_DIR, keyed by the source hash.
    mp4_path = CACHE_DIR / f"{content_hash}.mp4"
    if mp4_path.exists():
        return FileResponse(mp4_path, media_type="video/mp4")

    if detect_media_type(cache_path) != "video":
        raise HTTPException(400, "Content is not a video")

    # Probe the container format. This is best effort: if ffprobe is missing,
    # fails to start or times out, fall through to transcoding.
    try:
        probe = await asyncio.to_thread(
            subprocess.run,
            ["ffprobe", "-v", "error", "-select_streams", "v:0",
             "-show_entries", "format=format_name", "-of", "csv=p=0", str(cache_path)],
            capture_output=True, text=True, timeout=10,
        )
    except (OSError, subprocess.SubprocessError):
        probe = None
    if probe is not None:
        container = probe.stdout.lower()
        if "mp4" in container or "mov" in container:
            # Already MP4-compatible, serve the original.
            return FileResponse(cache_path, media_type="video/mp4")

    # Transcode to MP4 (H.264 + AAC) under a temporary name so a partial file
    # is never served from mp4_path.
    transcode_path = CACHE_DIR / f"{content_hash}.transcoding.mp4"
    try:
        result = await asyncio.to_thread(
            subprocess.run,
            ["ffmpeg", "-y", "-i", str(cache_path),
             "-c:v", "libx264", "-preset", "fast", "-crf", "23",
             "-c:a", "aac", "-b:a", "128k",
             "-movflags", "+faststart",
             str(transcode_path)],
            capture_output=True, text=True, timeout=600,  # 10 min timeout
        )
        if result.returncode != 0:
            raise HTTPException(500, f"Transcoding failed: {result.stderr[:200]}")
        # Move to final location.
        transcode_path.rename(mp4_path)
    except HTTPException:
        # Our own error from above: clean up and pass it on untouched instead
        # of letting the generic handler wrap its text a second time.
        transcode_path.unlink(missing_ok=True)
        raise
    except subprocess.TimeoutExpired:
        transcode_path.unlink(missing_ok=True)
        raise HTTPException(500, "Transcoding timed out")
    except Exception as e:
        transcode_path.unlink(missing_ok=True)
        raise HTTPException(500, f"Transcoding failed: {e}") from e

    return FileResponse(mp4_path, media_type="video/mp4")
@app.get("/cache/{content_hash}/detail")
async def cache_detail(content_hash: str, request: Request):
"""View cached content detail. HTML for browsers, JSON for APIs."""
current_user = get_user_from_cookie(request)
cache_path = get_cache_path(content_hash)
if not cache_path:
if wants_html(request):
content = f'
Content not found: {content_hash}
'
return HTMLResponse(render_page("Not Found", content, current_user, active_tab="cache"), status_code=404)
raise HTTPException(404, f"Content {content_hash} not in cache")
if wants_html(request):
if not current_user:
content = '
'
return HTMLResponse(render_page("Login Required", content, current_user, active_tab="cache"), status_code=401)
# Check user has access
user_hashes = get_user_cache_hashes(current_user)
if content_hash not in user_hashes:
content = '
'
return HTMLResponse(render_page("Cache", content, current_user, active_tab="cache"))
# Get hashes owned by/associated with this user
user_hashes = get_user_cache_hashes(current_user)
# Get cache items that belong to the user (from cache_manager)
cache_items = []
for cached_file in cache_manager.list_all():
content_hash = cached_file.content_hash
if content_hash not in user_hashes:
continue
meta = load_cache_meta(content_hash)
# Apply folder filter
if folder:
item_folder = meta.get("folder", "/")
if folder != "/" and not item_folder.startswith(folder):
continue
if folder == "/" and item_folder != "/":
continue
# Apply collection filter
if collection:
if collection not in meta.get("collections", []):
continue
# Apply tag filter
if tag:
if tag not in meta.get("tags", []):
continue
cache_items.append({
"hash": content_hash,
"size": cached_file.size_bytes,
"mtime": cached_file.created_at,
"meta": meta
})
# Sort by modification time (newest first)
cache_items.sort(key=lambda x: x["mtime"], reverse=True)
total = len(cache_items)
# Pagination
start = (page - 1) * limit
end = start + limit
items_page = cache_items[start:end]
has_more = end < total
if not items_page:
if page == 1:
filter_msg = ""
if folder:
filter_msg = f" in folder {folder}"
elif collection:
filter_msg = f" in collection '{collection}'"
elif tag:
filter_msg = f" with tag '{tag}'"
content = f'
No cached files{filter_msg}. Upload files or run effects to see them here.
'
else:
return HTMLResponse("") # Empty for infinite scroll
else:
html_parts = []
for item in items_page:
content_hash = item["hash"]
cache_path = get_cache_path(content_hash)
media_type = detect_media_type(cache_path) if cache_path else "unknown"
# Format size
size = item["size"]
if size > 1024*1024:
size_str = f"{size/(1024*1024):.1f} MB"
elif size > 1024:
size_str = f"{size/1024:.1f} KB"
else:
size_str = f"{size} bytes"
html_parts.append(f'''
')
# For infinite scroll, just return cards if not first page
if page > 1:
if has_more:
query_params = f"page={page + 1}"
if folder:
query_params += f"&folder={folder}"
if collection:
query_params += f"&collection={collection}"
if tag:
query_params += f"&tag={tag}"
html_parts.append(f'''
Loading more...
''')
return HTMLResponse('\n'.join(html_parts))
# First page - full content
infinite_scroll_trigger = ""
if has_more:
query_params = "page=2"
if folder:
query_params += f"&folder={folder}"
if collection:
query_params += f"&collection={collection}"
if tag:
query_params += f"&tag={tag}"
infinite_scroll_trigger = f'''
Loading more...
'''
content = f'''
Cache ({total} items)
{''.join(html_parts)}
{infinite_scroll_trigger}
'''
return HTMLResponse(render_page("Cache", content, current_user, active_tab="cache"))
# JSON response for APIs - list all hashes with optional pagination
all_hashes = [cf.content_hash for cf in cache_manager.list_all()]
total = len(all_hashes)
start = (page - 1) * limit
end = start + limit
hashes_page = all_hashes[start:end]
has_more = end < total
return {
"hashes": hashes_page,
"pagination": {
"page": page,
"limit": limit,
"total": total,
"has_more": has_more
}
}
@app.delete("/cache/{content_hash}")
async def discard_cache(content_hash: str, username: str = Depends(get_required_user)):
    """Delete a cached item owned by the caller, honouring the deletion rules.

    Checks run in this order, and the first failure wins:

    1. the content must exist (404);
    2. the hash must be among the caller's hashes (403);
    3. the item must not be marked ``pinned`` in its metadata (400);
    4. the item must not be referenced by any run (400);
    5. ``cache_manager.can_delete`` must allow it (400).

    Deletion goes through the cache manager, falling back to unlinking the
    legacy cache path; the legacy ``.meta.json`` and ``.mp4`` side files are
    then removed if present.
    """
    if not cache_manager.has_content(content_hash):
        raise HTTPException(404, "Content not found")

    if content_hash not in get_user_cache_hashes(username):
        raise HTTPException(403, "Access denied")

    # Pin flag kept in the legacy metadata file.
    meta = load_cache_meta(content_hash)
    if meta.get("pinned"):
        pin_reason = meta.get("pin_reason", "unknown")
        raise HTTPException(400, f"Cannot discard pinned item (reason: {pin_reason})")

    # Runs stored in Redis, not only those known to the activity store.
    referencing_runs = find_runs_using_content(content_hash)
    if referencing_runs:
        run, role = referencing_runs[0]
        raise HTTPException(400, f"Cannot discard: item is {role} of run {run.run_id}")

    # Remaining rules are evaluated by the cache manager.
    deletable, why_not = cache_manager.can_delete(content_hash)
    if not deletable:
        raise HTTPException(400, f"Cannot discard: {why_not}")

    deleted, _detail = cache_manager.delete_by_content_hash(content_hash)
    if not deleted:
        # Fallback to legacy deletion.
        legacy_path = get_cache_path(content_hash)
        if legacy_path and legacy_path.exists():
            legacy_path.unlink()

    # Clean up the legacy side files stored next to the content.
    for suffix in (".meta.json", ".mp4"):
        side_file = CACHE_DIR / f"{content_hash}{suffix}"
        if side_file.exists():
            side_file.unlink()

    return {"discarded": True, "content_hash": content_hash}
# HTMX variant of the cache-discard endpoint: every outcome, including each
# refusal, is returned as a string for the HTML response; nothing is raised
# as an HTTPException in this handler.
@app.delete("/ui/cache/{content_hash}/discard", response_class=HTMLResponse)
async def ui_discard_cache(content_hash: str, request: Request):
"""HTMX handler: discard a cached item."""
# The caller is identified from the request cookie rather than a Depends() guard
current_user = get_user_from_cookie(request)
if not current_user:
return '
Login required
'
# Check ownership
# (done before the existence check in this handler)
user_hashes = get_user_cache_hashes(current_user)
if content_hash not in user_hashes:
return '
Access denied
'
# Check if content exists
if not cache_manager.has_content(content_hash):
return '
Content not found
'
# Check if pinned (legacy metadata)
meta = load_cache_meta(content_hash)
if meta.get("pinned"):
pin_reason = meta.get("pin_reason", "unknown")
return f'
Cannot discard: item is pinned ({pin_reason})
'
# Check if used by any run (Redis runs, not just activity store)
runs_using = find_runs_using_content(content_hash)
if runs_using:
# Only the first referencing run is reported back to the user
run, role = runs_using[0]
return f'
Cannot discard: item is {role} of run {run.run_id}
'
# Check deletion rules via cache_manager (L2 shared status, activity store)
can_delete, reason = cache_manager.can_delete(content_hash)
if not can_delete:
return f'
Cannot discard: {reason}
'
# Delete via cache_manager
success, msg = cache_manager.delete_by_content_hash(content_hash)
if not success:
# Fallback to legacy deletion
cache_path = get_cache_path(content_hash)
if cache_path and cache_path.exists():
cache_path.unlink()
# Clean up legacy metadata files
# (the metadata JSON and any MP4 transcode stored under the same hash)
meta_path = CACHE_DIR / f"{content_hash}.meta.json"
if meta_path.exists():
meta_path.unlink()
mp4_path = CACHE_DIR / f"{content_hash}.mp4"
if mp4_path.exists():
mp4_path.unlink()
return '''
'''
# Known assets (bootstrap data): short asset name -> 64-character hex string,
# presumably the asset's content hash (TODO confirm). Returned as is by the
# /assets endpoint.
KNOWN_ASSETS = {
"cat": "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b",
}
@app.get("/assets")
async def list_assets():
    """Return the built-in asset table (``KNOWN_ASSETS``) unchanged."""
    return KNOWN_ASSETS
@app.post("/cache/import")
async def import_to_cache(path: str):
    """Import a file from the server's local filesystem into the cache.

    Args:
        path: Filesystem path of the file to import, as seen by the server.

    Returns:
        ``{"content_hash": <hash>, "cached": True}``.

    Raises:
        HTTPException: 404 when the path does not exist, 400 when it exists
            but is not a regular file.
    """
    # NOTE(review): unlike the other mutating endpoints this one takes no
    # authenticated user, and the path is caller-controlled, so any file the
    # server can read can be pulled into the cache. Consider requiring
    # get_required_user and restricting the allowed source directory.
    source = Path(path)
    if not source.exists():
        raise HTTPException(404, f"File not found: {path}")
    # exists() is also true for directories, which cannot be cached as a file.
    if not source.is_file():
        raise HTTPException(400, f"Not a regular file: {path}")
    content_hash = cache_file(source)
    return {"content_hash": content_hash, "cached": True}
def save_cache_meta(content_hash: str, uploader: Optional[str] = None, filename: Optional[str] = None, **updates):
    """Create or update the metadata file of a cached item.

    The metadata lives in ``CACHE_DIR/<content_hash>.meta.json``. If the file
    does not exist yet it is created from ``uploader``, ``filename`` and the
    current UTC time; if it exists, ``uploader`` and ``filename`` are ignored
    and only ``updates`` are applied.

    Args:
        content_hash: Hash identifying the cached item.
        uploader: Owner recorded when the metadata is first created.
        filename: Original filename recorded when the metadata is first created.
        **updates: Keys to set. ``uploader`` and ``uploaded_at`` are never
            overwritten.

    Returns:
        The complete metadata dict as written to disk.
    """
    meta_path = CACHE_DIR / f"{content_hash}.meta.json"

    # Load existing or create new. Opening directly avoids failing when the
    # file is removed between an existence check and the read.
    try:
        with open(meta_path, encoding="utf-8") as f:
            meta = json.load(f)
    except FileNotFoundError:
        meta = {
            "uploader": uploader,
            "uploaded_at": datetime.now(timezone.utc).isoformat(),
            "filename": filename,
        }

    # Apply updates (but never change uploader or uploaded_at).
    for key, value in updates.items():
        if key not in ("uploader", "uploaded_at"):
            meta[key] = value

    # Write to a sibling temp file and rename it into place, so an interrupted
    # write cannot leave a truncated metadata file. The ".tmp" suffix keeps
    # the temp file from matching the "*.meta.json" naming.
    tmp_path = meta_path.with_name(meta_path.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(meta, f, indent=2)
    tmp_path.replace(meta_path)
    return meta
def load_cache_meta(content_hash: str) -> dict:
    """Load the metadata of a cached item.

    Reads ``CACHE_DIR/<content_hash>.meta.json``.

    Returns:
        The parsed metadata, or an empty dict when no metadata file exists.
    """
    meta_path = CACHE_DIR / f"{content_hash}.meta.json"
    # Open directly instead of checking exists() first: the discard handlers
    # unlink these files, so the file can vanish between check and open.
    try:
        with open(meta_path, encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return {}
# User data storage (folders, collections): a ".user-data" directory inside
# the cache directory, holding one JSON file per user.
USER_DATA_DIR = CACHE_DIR / ".user-data"
def load_user_data(username: str) -> dict:
    """Read a user's folder and collection data.

    The storage directory is created on demand. The file name is derived
    from ``username`` with every ``@`` removed and every ``/`` replaced by
    ``_``.

    Returns:
        The stored dict, or ``{"folders": ["/"], "collections": []}`` when the
        user has no data file yet.
    """
    USER_DATA_DIR.mkdir(parents=True, exist_ok=True)

    file_stem = username.replace("@", "").replace("/", "_")
    data_file = USER_DATA_DIR / f"{file_stem}.json"

    if not data_file.exists():
        return {"folders": ["/"], "collections": []}
    with open(data_file) as fh:
        return json.load(fh)
def save_user_data(username: str, data: dict):
    """Write a user's folder and collection data as indented JSON.

    The storage directory is created on demand and the file name is derived
    from ``username`` with every ``@`` removed and every ``/`` replaced by
    ``_``.
    """
    USER_DATA_DIR.mkdir(parents=True, exist_ok=True)

    file_stem = username.replace("@", "").replace("/", "_")
    data_file = USER_DATA_DIR / f"{file_stem}.json"

    with open(data_file, "w") as fh:
        json.dump(data, fh, indent=2)
def get_user_cache_hashes(username: str) -> set:
    """Collect every cache hash owned by or associated with a user.

    A hash belongs to the user when either:

    - its ``.meta.json`` file in ``CACHE_DIR`` names the user as uploader, or
    - it is an input or the output of one of the user's runs.

    The user is matched both by bare ``username`` and by the
    ``@username@L2_DOMAIN`` actor ID.
    """
    actor_id = f"@{username}@{L2_DOMAIN}"
    owner_aliases = (username, actor_id)
    hashes = set()

    # Files uploaded by the user.
    meta_suffix = ".meta.json"
    if CACHE_DIR.exists():
        for entry in CACHE_DIR.iterdir():
            if not entry.name.endswith(meta_suffix):
                continue
            # Strip only the trailing suffix; str.replace() would also remove
            # an occurrence in the middle of the name.
            content_hash = entry.name[: -len(meta_suffix)]
            if load_cache_meta(content_hash).get("uploader") in owner_aliases:
                hashes.add(content_hash)

    # Files from the user's runs (inputs and outputs).
    for run in list_all_runs():
        if run.username in owner_aliases:
            # inputs may be unset on a run record; the discard handlers guard
            # it the same way.
            hashes.update(run.inputs or [])
            if run.output_hash:
                hashes.add(run.output_hash)

    return hashes
@app.post("/cache/upload")
async def upload_to_cache(file: UploadFile = File(...), username: str = Depends(get_required_user)):
    """Store an uploaded file in the cache on behalf of the authenticated user.

    The upload is written to a temporary file, handed to the cache manager
    (which moves it into the cache), and the uploader is recorded in the
    item's metadata as an ActivityPub actor ID.

    Returns:
        ``{"content_hash": ..., "filename": ..., "size": <bytes read>}``.
    """
    import tempfile

    # Write to a temp file first. delete=False because the cache manager is
    # asked to move the file after the handle is closed.
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        content = await file.read()
        tmp.write(content)
        tmp_path = Path(tmp.name)

    # Store in cache via cache_manager.
    try:
        cached = cache_manager.put(tmp_path, node_type="upload", move=True)
    except Exception:
        # Nothing else removes the temp file if the cache rejects it.
        tmp_path.unlink(missing_ok=True)
        raise
    content_hash = cached.content_hash

    # Save uploader metadata.
    actor_id = f"@{username}@{L2_DOMAIN}"
    save_cache_meta(content_hash, actor_id, file.filename)

    return {"content_hash": content_hash, "filename": file.filename, "size": len(content)}
class CacheMetaUpdate(BaseModel):
    """Body of a metadata PATCH request.

    Every field is optional and defaults to ``None``.
    """

    # Provenance record, e.g. {"type": "self"|"external", "url": "...", "note": "..."}
    origin: Optional[dict] = None
    # Free-text description of the item
    description: Optional[str] = None
    # Tag names attached to the item
    tags: Optional[list[str]] = None
    # Folder path the item is filed under
    folder: Optional[str] = None
    # Names of the collections the item belongs to
    collections: Optional[list[str]] = None
class PublishRequest(BaseModel):
    """Body of a request that publishes a cached item to L2."""

    # Name under which the asset is published
    asset_name: str
    # Kind of asset (image, video, etc.); defaults to "image"
    asset_type: str = "image"
@app.get("/cache/{content_hash}/meta")
async def get_cache_meta(content_hash: str, username: str = Depends(get_required_user)):
    """Return the stored metadata of one of the caller's cached items.

    Raises:
        HTTPException: 404 when the content is not cached, 403 when the hash
            is not among the caller's hashes.
    """
    # The content itself must be present.
    if not get_cache_path(content_hash):
        raise HTTPException(404, "Content not found")

    # The caller must own or be associated with the item.
    if content_hash not in get_user_cache_hashes(username):
        raise HTTPException(403, "Access denied")

    return load_cache_meta(content_hash)
@app.patch("/cache/{content_hash}/meta")
async def update_cache_meta(content_hash: str, update: CacheMetaUpdate, username: str = Depends(get_required_user)):
    """Apply a partial metadata update to one of the caller's cached items.

    Only fields that are not ``None`` in ``update`` are written. A folder
    must already be in the user's folder list, and every collection must
    already exist for the user.

    Returns:
        The full metadata dict after the update.

    Raises:
        HTTPException: 404 content not cached, 403 not the caller's item,
            400 unknown folder or collection.
    """
    if not get_cache_path(content_hash):
        raise HTTPException(404, "Content not found")

    if content_hash not in get_user_cache_hashes(username):
        raise HTTPException(403, "Access denied")

    updates = {}

    # Fields that are copied through without validation.
    for field_name in ("origin", "description", "tags"):
        field_value = getattr(update, field_name)
        if field_value is not None:
            updates[field_name] = field_value

    if update.folder is not None:
        # The target folder must be one the user has created.
        known_folders = load_user_data(username)["folders"]
        if update.folder not in known_folders:
            raise HTTPException(400, f"Folder does not exist: {update.folder}")
        updates["folder"] = update.folder

    if update.collections is not None:
        # Every referenced collection must exist for the user.
        known_collections = {entry["name"] for entry in load_user_data(username)["collections"]}
        for collection_name in update.collections:
            if collection_name not in known_collections:
                raise HTTPException(400, f"Collection does not exist: {collection_name}")
        updates["collections"] = update.collections

    return save_cache_meta(content_hash, **updates)
@app.post("/cache/{content_hash}/publish")
async def publish_cache_to_l2(
    content_hash: str,
    req: PublishRequest,
    request: Request,
    username: str = Depends(get_required_user)
):
    """Publish one of the caller's cached items to the L2 server.

    The item's metadata must already contain an ``origin`` with a ``type``.
    The caller's token (``auth_token`` cookie, else ``Authorization: Bearer``
    header) is forwarded to L2. On success the local metadata gets a
    ``published`` record and the item is pinned with reason ``"published"``.

    Raises:
        HTTPException: 404 content not cached, 403 not the caller's item,
            400 missing origin or L2 rejected the request, 401 no token,
            500 any other failure while calling L2.
    """
    if not get_cache_path(content_hash):
        raise HTTPException(404, "Content not found")

    if content_hash not in get_user_cache_hashes(username):
        raise HTTPException(403, "Access denied")

    meta = load_cache_meta(content_hash)

    # Publishing requires provenance to be recorded first.
    origin = meta.get("origin")
    if not origin or "type" not in origin:
        raise HTTPException(400, "Origin must be set before publishing. Use --origin self or --origin-url ")

    # Token to forward to L2: cookie first, then bearer header.
    token = request.cookies.get("auth_token")
    if not token:
        auth_header = request.headers.get("Authorization", "")
        if auth_header.startswith("Bearer "):
            token = auth_header[7:]
    if not token:
        raise HTTPException(401, "Authentication token required")

    payload = {
        "content_hash": content_hash,
        "asset_name": req.asset_name,
        "asset_type": req.asset_type,
        "origin": origin,
        "description": meta.get("description"),
        "tags": meta.get("tags", []),
        "metadata": {
            "filename": meta.get("filename"),
            "folder": meta.get("folder"),
            "collections": meta.get("collections", [])
        }
    }

    # Call the L2 publish-cache endpoint.
    try:
        resp = http_requests.post(
            f"{L2_SERVER}/registry/publish-cache",
            headers={"Authorization": f"Bearer {token}"},
            json=payload,
            timeout=10
        )
        resp.raise_for_status()
        l2_result = resp.json()
    except http_requests.exceptions.HTTPError as e:
        # Prefer the "detail" field of L2's JSON error body when there is one.
        try:
            error_detail = e.response.json().get("detail", str(e))
        except Exception:
            error_detail = str(e)
        raise HTTPException(400, f"L2 publish failed: {error_detail}")
    except Exception as e:
        raise HTTPException(500, f"L2 publish failed: {e}")

    # Record the publish locally and pin the item.
    publish_info = {
        "to_l2": True,
        "asset_name": req.asset_name,
        "published_at": datetime.now(timezone.utc).isoformat(),
        "last_synced_at": datetime.now(timezone.utc).isoformat()
    }
    save_cache_meta(content_hash, published=publish_info, pinned=True, pin_reason="published")

    return {
        "published": True,
        "asset_name": req.asset_name,
        "l2_result": l2_result
    }
@app.patch("/cache/{content_hash}/republish")
async def republish_cache_to_l2(
    content_hash: str,
    request: Request,
    username: str = Depends(get_required_user)
):
    """Push the current metadata of an already-published item to L2.

    The asset name is taken from the item's ``published`` metadata record.
    The caller's token (``auth_token`` cookie, else ``Authorization: Bearer``
    header) is forwarded to L2. On success ``last_synced_at`` in the
    ``published`` record is refreshed.

    Raises:
        HTTPException: 404 content not cached, 403 not the caller's item,
            400 not published yet, no stored asset name, or L2 rejected the
            request, 401 no token, 500 any other failure while calling L2.
    """
    if not get_cache_path(content_hash):
        raise HTTPException(404, "Content not found")

    if content_hash not in get_user_cache_hashes(username):
        raise HTTPException(403, "Access denied")

    meta = load_cache_meta(content_hash)

    # Only items with a publish record can be updated on L2.
    published = meta.get("published", {})
    if not published.get("to_l2"):
        raise HTTPException(400, "Item not published yet. Use publish first.")

    asset_name = published.get("asset_name")
    if not asset_name:
        raise HTTPException(400, "No asset name found in publish info")

    # Token to forward to L2: cookie first, then bearer header.
    token = request.cookies.get("auth_token")
    if not token:
        auth_header = request.headers.get("Authorization", "")
        if auth_header.startswith("Bearer "):
            token = auth_header[7:]
    if not token:
        raise HTTPException(401, "Authentication token required")

    payload = {
        "description": meta.get("description"),
        "tags": meta.get("tags"),
        "origin": meta.get("origin"),
        "metadata": {
            "filename": meta.get("filename"),
            "folder": meta.get("folder"),
            "collections": meta.get("collections", [])
        }
    }

    # Call the L2 update endpoint for this asset.
    try:
        resp = http_requests.patch(
            f"{L2_SERVER}/registry/{asset_name}",
            headers={"Authorization": f"Bearer {token}"},
            json=payload,
            timeout=10
        )
        resp.raise_for_status()
        l2_result = resp.json()
    except http_requests.exceptions.HTTPError as e:
        # Prefer the "detail" field of L2's JSON error body when there is one.
        try:
            error_detail = e.response.json().get("detail", str(e))
        except Exception:
            error_detail = str(e)
        raise HTTPException(400, f"L2 update failed: {error_detail}")
    except Exception as e:
        raise HTTPException(500, f"L2 update failed: {e}")

    # Refresh the sync timestamp in the local publish record.
    published["last_synced_at"] = datetime.now(timezone.utc).isoformat()
    save_cache_meta(content_hash, published=published)

    return {
        "updated": True,
        "asset_name": asset_name,
        "l2_result": l2_result
    }
# ============ Folder & Collection Management ============
@app.get("/user/folders")
async def list_folders(username: str = Depends(get_required_user)):
    """Return the authenticated user's folder list as ``{"folders": [...]}``."""
    return {"folders": load_user_data(username)["folders"]}
@app.post("/user/folders")
async def create_folder(folder_path: str, username: str = Depends(get_required_user)):
    """Add a folder to the authenticated user's folder list.

    The path must start with ``/``, its parent (everything before the last
    ``/``) must already exist unless it is the root, and the folder itself
    must not exist yet. The folder list is kept sorted.

    Raises:
        HTTPException: 400 on any of the validation failures above.
    """
    user_data = load_user_data(username)
    folders = user_data["folders"]

    if not folder_path.startswith("/"):
        raise HTTPException(400, "Folder path must start with /")

    # Parent is the path up to the last "/"; an empty result means root.
    head = folder_path.rsplit("/", 1)[:-1]
    parent = "/".join(head) or "/"
    if parent != "/" and parent not in folders:
        raise HTTPException(400, f"Parent folder does not exist: {parent}")

    if folder_path in folders:
        raise HTTPException(400, f"Folder already exists: {folder_path}")

    folders.append(folder_path)
    folders.sort()
    save_user_data(username, user_data)
    return {"folder": folder_path, "created": True}
@app.delete("/user/folders")
async def delete_folder(folder_path: str, username: str = Depends(get_required_user)):
    """Remove an empty, non-root folder from the user's folder list.

    A folder counts as empty when no other folder path starts with
    ``folder_path + "/"`` and none of the user's cached items has it as its
    ``folder`` metadata.

    Raises:
        HTTPException: 400 for the root folder, a folder with subfolders, or
            a folder that still holds items; 404 when the folder is unknown.
    """
    if folder_path == "/":
        raise HTTPException(400, "Cannot delete root folder")

    user_data = load_user_data(username)
    folders = user_data["folders"]
    if folder_path not in folders:
        raise HTTPException(404, "Folder not found")

    # Refuse while any subfolder exists; the first one found is reported.
    child_prefix = folder_path + "/"
    for existing in folders:
        if existing.startswith(child_prefix):
            raise HTTPException(400, f"Folder has subfolders: {existing}")

    # Refuse while any of the user's items is filed in this folder.
    for item_hash in get_user_cache_hashes(username):
        if load_cache_meta(item_hash).get("folder") == folder_path:
            raise HTTPException(400, "Folder is not empty")

    folders.remove(folder_path)
    save_user_data(username, user_data)
    return {"folder": folder_path, "deleted": True}
@app.get("/user/collections")
async def list_collections(username: str = Depends(get_required_user)):
    """Return the authenticated user's collections.

    Returns:
        ``{"collections": [...]}`` with the ``"collections"`` entry taken
        from the user's stored data.
    """
    collections = load_user_data(username)["collections"]
    return {"collections": collections}
@app.post("/user/collections")
async def create_collection(name: str, username: str = Depends(get_required_user)):
    """Create a named collection for the authenticated user.

    Args:
        name: Name of the collection to create.
        username: Authenticated user, injected by ``get_required_user``.

    Returns:
        ``{"collection": name, "created": True}`` after the new entry (name
        plus UTC ISO-8601 creation timestamp) has been saved.

    Raises:
        HTTPException: 400 if a collection with this name already exists.
    """
    user_data = load_user_data(username)
    collections = user_data["collections"]

    if any(existing["name"] == name for existing in collections):
        raise HTTPException(400, f"Collection already exists: {name}")

    created_at = datetime.now(timezone.utc).isoformat()
    collections.append({"name": name, "created_at": created_at})
    save_user_data(username, user_data)
    return {"collection": name, "created": True}
@app.delete("/user/collections")
async def delete_collection(name: str, username: str = Depends(get_required_user)):
    """Delete a collection and detach it from the user's cache items.

    Args:
        name: Name of the collection to delete.
        username: Authenticated user, injected by ``get_required_user``.

    Returns:
        ``{"collection": name, "deleted": True}`` once the collection has
        been removed from the user data and from each cache item listing it.

    Raises:
        HTTPException: 404 if the user has no collection with this name.
    """
    user_data = load_user_data(username)
    collections = user_data["collections"]

    # Only the first collection with a matching name is removed.
    index = next(
        (i for i, entry in enumerate(collections) if entry["name"] == name),
        None,
    )
    if index is None:
        raise HTTPException(404, "Collection not found")

    del collections[index]
    save_user_data(username, user_data)

    # Re-save every cache item that referenced the collection. The uploader
    # fields are left out of the keyword arguments (presumably because
    # save_cache_meta manages them itself; confirm against its signature).
    withheld = ("uploader", "uploaded_at")
    for content_hash in get_user_cache_hashes(username):
        meta = load_cache_meta(content_hash)
        if name not in meta.get("collections", []):
            continue
        meta["collections"].remove(name)
        updates = {key: value for key, value in meta.items() if key not in withheld}
        save_cache_meta(content_hash, **updates)

    return {"collection": name, "deleted": True}
def is_ios_request(request: Request) -> bool:
    """Return True when the request's User-Agent mentions an iPhone or iPad.

    The match is a case-insensitive substring test; a missing header is
    treated as an empty string.
    """
    user_agent = request.headers.get("user-agent", "").lower()
    return any(device in user_agent for device in ("iphone", "ipad"))
def video_src_for_request(content_hash: str, request: Request) -> str:
    """Build the video source URL for a cached item.

    iOS clients (as decided by ``is_ios_request``) are sent to the ``/mp4``
    endpoint; every other client gets the plain cache URL.
    """
    return (
        f"/cache/{content_hash}/mp4"
        if is_ios_request(request)
        else f"/cache/{content_hash}"
    )
def detect_media_type(cache_path: Path) -> str:
    """Classify a file as image or video from its leading magic bytes.

    Only the first 32 bytes of the file are read.

    Args:
        cache_path: Path of the file to inspect.

    Returns:
        ``"video"`` for WebM/MKV, MP4/MOV and AVI; ``"image"`` for PNG, JPEG,
        GIF, WebP and HEIF/AVIF still images; ``"unknown"`` otherwise
        (including empty files).

    Raises:
        OSError: If the file cannot be opened for reading.
    """
    with open(cache_path, "rb") as f:
        header = f.read(32)

    # HEIF and AVIF stills share the ISO-BMFF "ftyp" box with MP4/MOV, so the
    # box alone cannot tell image from video; the major brand that follows it
    # can. Only still-image brands are listed; sequence brands stay "video".
    still_image_brands = (b"heic", b"heix", b"heim", b"heis", b"mif1", b"avif")

    if header[:4] == b"\x1a\x45\xdf\xa3":  # WebM/MKV (EBML)
        return "video"

    if header[4:8] == b"ftyp":  # ISO-BMFF: MP4/MOV, or a HEIF/AVIF image
        return "image" if header[8:12] in still_image_brands else "video"

    if header[:4] == b"RIFF":  # RIFF container: form type decides
        form_type = header[8:12]
        if form_type == b"AVI ":
            return "video"
        if form_type == b"WEBP":
            return "image"

    if header[:8] == b"\x89PNG\r\n\x1a\n":  # PNG
        return "image"
    if header[:2] == b"\xff\xd8":  # JPEG
        return "image"
    if header[:6] in (b"GIF87a", b"GIF89a"):  # GIF
        return "image"

    return "unknown"
def get_user_from_cookie(request) -> Optional[str]:
    """Resolve the username from the request's ``auth_token`` cookie.

    Returns ``None`` when the cookie is absent or empty; otherwise returns
    whatever ``verify_token_with_l2`` yields for the token.
    """
    auth_token = request.cookies.get("auth_token")
    return verify_token_with_l2(auth_token) if auth_token else None
def wants_html(request: Request) -> bool:
    """Tell browser requests (HTML) apart from API requests (JSON).

    True only when the Accept header contains ``text/html`` and does not
    also contain ``application/json``; a missing header counts as empty.
    """
    accept_header = request.headers.get("accept", "")
    if "application/json" in accept_header:
        return False
    return "text/html" in accept_header
# Tailwind CSS config for all L1 templates
TAILWIND_CONFIG = '''
'''
def render_page(title: str, content: str, username: Optional[str] = None, active_tab: str = None) -> str:
"""Render a page with nav bar and content. Used for clean URL pages."""
user_info = ""
if username:
user_info = f'''
')
# Get the run to pin its output and inputs
run = load_run(run_id)
if not run:
return HTMLResponse('
Run not found
')
# Call L2 to publish the run, including this L1's public URL
# Longer timeout because L2 calls back to L1 to fetch run details
try:
resp = http_requests.post(
f"{L2_SERVER}/registry/record-run",
json={"run_id": run_id, "output_name": output_name, "l1_server": L1_PUBLIC_URL},
headers={"Authorization": f"Bearer {token}"},
timeout=30
)
if resp.status_code == 400:
error = resp.json().get("detail", "Bad request")
return HTMLResponse(f'
Error: {error}
')
resp.raise_for_status()
result = resp.json()
# Pin the output
if run.output_hash:
save_cache_meta(run.output_hash, pinned=True, pin_reason="published")
# Pin the inputs (for provenance)
for input_hash in run.inputs:
save_cache_meta(input_hash, pinned=True, pin_reason="input_to_published")
return HTMLResponse(f'''
')
@app.get("/ui/runs", response_class=HTMLResponse)
async def ui_runs(request: Request):
"""HTMX partial: list of runs."""
current_user = get_user_from_cookie(request)
runs = list_all_runs()
# Require login to see runs
if not current_user:
return '
'
# Filter runs by user - match both plain username and ActivityPub format (@user@domain)
actor_id = f"@{current_user}@{L2_DOMAIN}"
runs = [r for r in runs if r.username in (current_user, actor_id)]
if not runs:
return '
'
# Get hashes owned by/associated with this user
user_hashes = get_user_cache_hashes(current_user)
# Get cache items that belong to the user (from cache_manager)
cache_items = []
for cached_file in cache_manager.list_all():
content_hash = cached_file.content_hash
if content_hash not in user_hashes:
continue
# Load metadata for filtering
meta = load_cache_meta(content_hash)
# Apply folder filter
if folder:
item_folder = meta.get("folder", "/")
if folder != "/" and not item_folder.startswith(folder):
continue
if folder == "/" and item_folder != "/":
continue
# Apply collection filter
if collection:
if collection not in meta.get("collections", []):
continue
# Apply tag filter
if tag:
if tag not in meta.get("tags", []):
continue
cache_items.append({
"hash": content_hash,
"size": cached_file.size_bytes,
"mtime": cached_file.created_at,
"meta": meta
})
# Sort by modification time (newest first)
cache_items.sort(key=lambda x: x["mtime"], reverse=True)
if not cache_items:
filter_msg = ""
if folder:
filter_msg = f" in folder {folder}"
elif collection:
filter_msg = f" in collection '{collection}'"
elif tag:
filter_msg = f" with tag '{tag}'"
return f'
No cached files{filter_msg}. Upload files or run effects to see them here.
'
html_parts = ['
']
for item in cache_items[:50]: # Limit to 50 items
content_hash = item["hash"]
cache_path = get_cache_path(content_hash)
media_type = detect_media_type(cache_path) if cache_path else "unknown"
# Format size
size = item["size"]
if size > 1024*1024:
size_str = f"{size/(1024*1024):.1f} MB"
elif size > 1024:
size_str = f"{size/1024:.1f} KB"
else:
size_str = f"{size} bytes"
html_parts.append(f'''