"""
@app.get("/", response_class=HTMLResponse)
async def root(request: Request):
"""Home page."""
ctx = get_user_context_from_cookie(request)
actor_id = ctx.actor_id if ctx else None
return render_home_html(actor_id)
@app.post("/runs", response_model=RunStatus)
async def create_run(request: RunRequest, ctx: UserContext = Depends(get_required_user_context)):
"""Start a new rendering run. Requires authentication."""
run_id = str(uuid.uuid4())
# Generate output name if not provided
output_name = request.output_name or f"{request.recipe}-{run_id[:8]}"
# Use actor_id from user context
actor_id = ctx.actor_id
# Create run record
run = RunStatus(
run_id=run_id,
status="pending",
recipe=request.recipe,
inputs=request.inputs,
output_name=output_name,
created_at=datetime.now(timezone.utc).isoformat(),
username=actor_id
)
# Submit to Celery
if request.use_dag or request.recipe == "dag":
# DAG mode - use artdag engine
if request.dag_json:
# Custom DAG provided
dag_json = request.dag_json
else:
# Build simple effect DAG from recipe and inputs
dag = build_effect_dag(request.inputs, request.recipe)
dag_json = dag.to_json()
task = execute_dag.delay(dag_json, run.run_id)
else:
# Legacy mode - single effect
if len(request.inputs) != 1:
raise HTTPException(400, "Legacy mode only supports single-input recipes. Use use_dag=true for multi-input.")
input_hash = request.inputs[0]
task = render_effect.delay(input_hash, request.recipe, output_name)
run.celery_task_id = task.id
run.status = "running"
save_run(run)
return run
@app.get("/runs/{run_id}", response_model=RunStatus)
async def get_run(run_id: str):
"""Get status of a run."""
run = load_run(run_id)
if not run:
raise HTTPException(404, f"Run {run_id} not found")
# Check Celery task status if running
if run.status == "running" and run.celery_task_id:
task = celery_app.AsyncResult(run.celery_task_id)
if task.ready():
if task.successful():
result = task.result
run.status = "completed"
run.completed_at = datetime.now(timezone.utc).isoformat()
# Handle both legacy (render_effect) and new (execute_dag) result formats
if "output_hash" in result:
# New DAG result format
run.output_hash = result.get("output_hash")
output_path = Path(result.get("output_path", "")) if result.get("output_path") else None
else:
# Legacy render_effect format
run.output_hash = result.get("output", {}).get("content_hash")
output_path = Path(result.get("output", {}).get("local_path", ""))
# Extract effects info from provenance (legacy only)
effects = result.get("effects", [])
if effects:
run.effects_commit = effects[0].get("repo_commit")
run.effect_url = effects[0].get("repo_url")
# Extract infrastructure info (legacy only)
run.infrastructure = result.get("infrastructure")
# Cache the output (legacy mode - DAG already caches via cache_manager)
if output_path and output_path.exists() and "output_hash" not in result:
cache_file(output_path, node_type="effect_output")
# Record activity for deletion tracking (legacy mode)
if run.output_hash and run.inputs:
cache_manager.record_simple_activity(
input_hashes=run.inputs,
output_hash=run.output_hash,
run_id=run.run_id,
)
else:
run.status = "failed"
run.error = str(task.result)
# Save updated status
save_run(run)
return run
@app.delete("/runs/{run_id}")
async def discard_run(run_id: str, ctx: UserContext = Depends(get_required_user_context)):
"""
Discard (delete) a run and its outputs.
Enforces deletion rules:
- Cannot discard if output is published to L2 (pinned)
- Deletes outputs and intermediate cache entries
- Preserves inputs (cache items and recipes are NOT deleted)
"""
run = load_run(run_id)
if not run:
raise HTTPException(404, f"Run {run_id} not found")
# Check ownership
if run.username not in (ctx.username, ctx.actor_id):
raise HTTPException(403, "Access denied")
# Failed runs can always be deleted (no output to protect)
if run.status != "failed":
# Only check if output is pinned - inputs are preserved, not deleted
if run.output_hash:
meta = await database.load_item_metadata(run.output_hash, ctx.actor_id)
if meta.get("pinned"):
pin_reason = meta.get("pin_reason", "published")
raise HTTPException(400, f"Cannot discard run: output {run.output_hash[:16]}... is pinned ({pin_reason})")
# Check if activity exists for this run
activity = cache_manager.get_activity(run_id)
if activity:
# Discard the activity - only delete outputs, preserve inputs
success, msg = cache_manager.discard_activity_outputs_only(run_id)
if not success:
raise HTTPException(400, f"Cannot discard run: {msg}")
# Remove from Redis
redis_client.delete(f"{RUNS_KEY_PREFIX}{run_id}")
return {"discarded": True, "run_id": run_id}
@app.delete("/ui/runs/{run_id}/discard", response_class=HTMLResponse)
async def ui_discard_run(run_id: str, request: Request):
"""HTMX handler: discard a run. Only deletes outputs, preserves inputs."""
ctx = get_user_context_from_cookie(request)
if not ctx:
return '
Login required
'
run = load_run(run_id)
if not run:
return '
Run not found
'
# Check ownership
if run.username not in (ctx.username, ctx.actor_id):
return '
Access denied
'
# Failed runs can always be deleted
if run.status != "failed":
# Only check if output is pinned - inputs are preserved, not deleted
if run.output_hash:
meta = await database.load_item_metadata(run.output_hash, ctx.actor_id)
if meta.get("pinned"):
pin_reason = meta.get("pin_reason", "published")
return f'
Cannot discard: output is pinned ({pin_reason})
'
# Check if activity exists for this run
activity = cache_manager.get_activity(run_id)
if activity:
# Discard the activity - only delete outputs, preserve inputs
success, msg = cache_manager.discard_activity_outputs_only(run_id)
if not success:
return f'
Cannot discard: {msg}
'
# Remove from Redis
redis_client.delete(f"{RUNS_KEY_PREFIX}{run_id}")
return '''
'''
@app.get("/run/{run_id}")
async def run_detail(run_id: str, request: Request):
"""Run detail. HTML for browsers, JSON for APIs."""
run = load_run(run_id)
if not run:
if wants_html(request):
content = f'
Run not found: {run_id}
'
return HTMLResponse(render_page("Not Found", content, None, active_tab="runs"), status_code=404)
raise HTTPException(404, f"Run {run_id} not found")
# Check Celery task status if running
if run.status == "running" and run.celery_task_id:
task = celery_app.AsyncResult(run.celery_task_id)
if task.ready():
if task.successful():
result = task.result
run.status = "completed"
run.completed_at = datetime.now(timezone.utc).isoformat()
run.output_hash = result.get("output", {}).get("content_hash")
effects = result.get("effects", [])
if effects:
run.effects_commit = effects[0].get("repo_commit")
run.effect_url = effects[0].get("repo_url")
run.infrastructure = result.get("infrastructure")
output_path = Path(result.get("output", {}).get("local_path", ""))
if output_path.exists():
cache_file(output_path)
else:
run.status = "failed"
run.error = str(task.result)
save_run(run)
if wants_html(request):
ctx = get_user_context_from_cookie(request)
if not ctx:
content = '
'
return HTMLResponse(render_page("Login Required", content, None, active_tab="runs"), status_code=401)
# Check user owns this run
if run.username not in (ctx.username, ctx.actor_id):
content = '
Access denied.
'
return HTMLResponse(render_page("Access Denied", content, ctx.actor_id, active_tab="runs"), status_code=403)
# Build effect URL
if run.effect_url:
effect_url = run.effect_url
elif run.effects_commit and run.effects_commit != "unknown":
effect_url = f"https://git.rose-ash.com/art-dag/effects/src/commit/{run.effects_commit}/{run.recipe}"
else:
effect_url = f"https://git.rose-ash.com/art-dag/effects/src/branch/main/{run.recipe}"
# Status badge colors
status_colors = {
"completed": "bg-green-600 text-white",
"running": "bg-yellow-600 text-white",
"failed": "bg-red-600 text-white",
"pending": "bg-gray-600 text-white"
}
status_badge = status_colors.get(run.status, "bg-gray-600 text-white")
# Build media HTML for input/output
media_html = ""
has_input = run.inputs and cache_manager.has_content(run.inputs[0])
has_output = run.status == "completed" and run.output_hash and cache_manager.has_content(run.output_hash)
if has_input or has_output:
media_html = '
'''
return HTMLResponse(render_page(f"Run: {run.recipe}", content, ctx.actor_id, active_tab="runs"))
# JSON response
return run.model_dump()
@app.get("/runs")
async def list_runs(request: Request, page: int = 1, limit: int = 20):
"""List runs. HTML for browsers (with infinite scroll), JSON for APIs (with pagination)."""
ctx = get_user_context_from_cookie(request)
all_runs = list_all_runs()
total = len(all_runs)
# Filter by user if logged in for HTML
if wants_html(request) and ctx:
all_runs = [r for r in all_runs if r.username in (ctx.username, ctx.actor_id)]
total = len(all_runs)
# Pagination
start = (page - 1) * limit
end = start + limit
runs_page = all_runs[start:end]
has_more = end < total
if wants_html(request):
if not ctx:
content = '
',
None,
active_tab="recipes"
))
# Filter to user's recipes
user_recipes = [c for c in all_recipes if c.uploader in (ctx.username, ctx.actor_id)]
total = len(user_recipes)
if not user_recipes:
content = '''
Recipes (0)
No recipes yet. Upload a recipe YAML file to get started.
'''
return HTMLResponse(render_page("Recipes", content, ctx.actor_id, active_tab="recipes"))
html_parts = []
for recipe in user_recipes:
var_count = len(recipe.variable_inputs)
fixed_count = len(recipe.fixed_inputs)
input_info = []
if var_count:
input_info.append(f"{var_count} variable")
if fixed_count:
input_info.append(f"{fixed_count} fixed")
inputs_str = ", ".join(input_info) if input_info else "no inputs"
html_parts.append(f'''
'''
return HTMLResponse(render_page(f"Recipe: {recipe.name}", content, ctx.actor_id if ctx else None, active_tab="recipes"))
@app.post("/ui/recipes/{recipe_id}/run", response_class=HTMLResponse)
async def ui_run_recipe(recipe_id: str, request: Request):
"""HTMX handler: run a recipe with form inputs."""
ctx = get_user_context_from_cookie(request)
if not ctx:
return '
Login required
'
recipe = load_recipe(recipe_id)
if not recipe:
return '
Recipe not found
'
# Parse form data
form_data = await request.form()
inputs = {}
for var_input in recipe.variable_inputs:
value = form_data.get(var_input.node_id, "").strip()
if var_input.required and not value:
return f'
Missing required input: {var_input.name}
'
if value:
inputs[var_input.node_id] = value
# Load recipe YAML
recipe_path = cache_manager.get_by_content_hash(recipe_id)
if not recipe_path:
return '
Recipe YAML not found in cache
'
try:
with open(recipe_path) as f:
yaml_config = yaml.safe_load(f)
# Build DAG from recipe
dag = build_dag_from_recipe(yaml_config, inputs, recipe)
# Create run
run_id = str(uuid.uuid4())
actor_id = ctx.actor_id
# Collect all input hashes
all_inputs = list(inputs.values())
for fixed in recipe.fixed_inputs:
if fixed.content_hash:
all_inputs.append(fixed.content_hash)
run = RunStatus(
run_id=run_id,
status="pending",
recipe=f"recipe:{recipe.name}",
inputs=all_inputs,
output_name=f"{recipe.name}-{run_id[:8]}",
created_at=datetime.now(timezone.utc).isoformat(),
username=actor_id
)
# Submit to Celery
dag_json = dag.to_json()
task = execute_dag.delay(dag_json, run.run_id)
run.celery_task_id = task.id
run.status = "running"
save_run(run)
return f'''
'
@app.get("/ui/recipes-list", response_class=HTMLResponse)
async def ui_recipes_list(request: Request):
"""HTMX partial: list of recipes."""
ctx = get_user_context_from_cookie(request)
if not ctx:
return '
'
all_recipes = list_all_recipes()
# Filter to user's recipes
user_recipes = [c for c in all_recipes if c.uploader in (ctx.username, ctx.actor_id)]
if not user_recipes:
return '
No recipes yet. Upload a recipe YAML file to get started.
'
html_parts = ['
']
for recipe in user_recipes:
var_count = len(recipe.variable_inputs)
fixed_count = len(recipe.fixed_inputs)
input_info = []
if var_count:
input_info.append(f"{var_count} variable")
if fixed_count:
input_info.append(f"{fixed_count} fixed")
inputs_str = ", ".join(input_info) if input_info else "no inputs"
html_parts.append(f'''
'''
@app.get("/cache/{content_hash}")
async def get_cached(content_hash: str):
"""Get cached content by hash."""
cache_path = get_cache_path(content_hash)
if not cache_path:
raise HTTPException(404, f"Content {content_hash} not in cache")
return FileResponse(cache_path)
@app.get("/cache/{content_hash}/mp4")
async def get_cached_mp4(content_hash: str):
"""Get cached content as MP4 (transcodes MKV on first request, caches result)."""
cache_path = get_cache_path(content_hash)
if not cache_path:
raise HTTPException(404, f"Content {content_hash} not in cache")
# MP4 transcodes stored alongside original in CACHE_DIR
mp4_path = CACHE_DIR / f"{content_hash}.mp4"
# If MP4 already cached, serve it
if mp4_path.exists():
return FileResponse(mp4_path, media_type="video/mp4")
# Check if source is already MP4
media_type = detect_media_type(cache_path)
if media_type != "video":
raise HTTPException(400, "Content is not a video")
# Check if already MP4 format
import subprocess
try:
result = subprocess.run(
["ffprobe", "-v", "error", "-select_streams", "v:0",
"-show_entries", "format=format_name", "-of", "csv=p=0", str(cache_path)],
capture_output=True, text=True, timeout=10
)
if "mp4" in result.stdout.lower() or "mov" in result.stdout.lower():
# Already MP4-compatible, just serve original
return FileResponse(cache_path, media_type="video/mp4")
except Exception:
pass # Continue with transcoding
# Transcode to MP4 (H.264 + AAC)
transcode_path = CACHE_DIR / f"{content_hash}.transcoding.mp4"
try:
result = subprocess.run(
["ffmpeg", "-y", "-i", str(cache_path),
"-c:v", "libx264", "-preset", "fast", "-crf", "23",
"-c:a", "aac", "-b:a", "128k",
"-movflags", "+faststart",
str(transcode_path)],
capture_output=True, text=True, timeout=600 # 10 min timeout
)
if result.returncode != 0:
raise HTTPException(500, f"Transcoding failed: {result.stderr[:200]}")
# Move to final location
transcode_path.rename(mp4_path)
except subprocess.TimeoutExpired:
if transcode_path.exists():
transcode_path.unlink()
raise HTTPException(500, "Transcoding timed out")
except Exception as e:
if transcode_path.exists():
transcode_path.unlink()
raise HTTPException(500, f"Transcoding failed: {e}")
return FileResponse(mp4_path, media_type="video/mp4")
@app.get("/cache/{content_hash}/detail")
async def cache_detail(content_hash: str, request: Request):
"""View cached content detail. HTML for browsers, JSON for APIs."""
ctx = get_user_context_from_cookie(request)
cache_path = get_cache_path(content_hash)
if not cache_path:
if wants_html(request):
content = f'
Content not found: {content_hash}
'
return HTMLResponse(render_page("Not Found", content, ctx.actor_id if ctx else None, active_tab="media"), status_code=404)
raise HTTPException(404, f"Content {content_hash} not in cache")
if wants_html(request):
if not ctx:
content = '
'''
# Add IPFS section if we have a CID
if ipfs_cid:
# Build gateway links - local gateway first if configured
gateway_links = []
if IPFS_GATEWAY_URL:
gateway_links.append(f'''
Local Gateway
''')
gateway_links.extend([
f'''
ipfs.io
''',
f'''
dweb.link
''',
f'''
Cloudflare
''',
])
gateways_html = '\n'.join(gateway_links)
content += f'''
IPFS
Content Identifier (CID)
{ipfs_cid}
Gateways:
{gateways_html}
'''
else:
content += '''
IPFS
Not yet uploaded to IPFS
'''
content += f'''
Loading metadata...
'''
return HTMLResponse(render_page(f"Cache: {content_hash[:16]}...", content, ctx.actor_id, active_tab="media"))
# JSON response - return metadata
meta = await database.load_item_metadata(content_hash, ctx.actor_id if ctx else None)
cache_item = await database.get_cache_item(content_hash)
ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
file_size = cache_path.stat().st_size
media_type = detect_media_type(cache_path)
return {
"content_hash": content_hash,
"size": file_size,
"media_type": media_type,
"ipfs_cid": ipfs_cid,
"meta": meta
}
@app.get("/cache/{content_hash}/meta-form", response_class=HTMLResponse)
async def cache_meta_form(content_hash: str, request: Request):
"""Clean URL redirect to the HTMX meta form."""
# Just redirect to the old endpoint for now
from starlette.responses import RedirectResponse
return RedirectResponse(f"/ui/cache/{content_hash}/meta-form", status_code=302)
@app.get("/ui/cache/{content_hash}")
async def ui_cache_view(content_hash: str):
"""Redirect to clean URL."""
return RedirectResponse(url=f"/cache/{content_hash}/detail", status_code=302)
@app.get("/ui/cache/{content_hash}/meta-form", response_class=HTMLResponse)
async def ui_cache_meta_form(content_hash: str, request: Request):
"""HTMX partial: metadata editing form for a cached item."""
ctx = get_user_context_from_cookie(request)
if not ctx:
return '
Login required to edit metadata
'
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
return '
Access denied
'
# Load metadata
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
origin = meta.get("origin", {})
origin_type = origin.get("type", "")
origin_url = origin.get("url", "")
origin_note = origin.get("note", "")
description = meta.get("description", "")
tags = meta.get("tags", [])
tags_str = ", ".join(tags) if tags else ""
l2_shares = meta.get("l2_shares", [])
pinned = meta.get("pinned", False)
pin_reason = meta.get("pin_reason", "")
# Detect media type for publish
cache_path = get_cache_path(content_hash)
media_type = detect_media_type(cache_path) if cache_path else "unknown"
asset_type = "video" if media_type == "video" else "image"
# Origin radio checked states
self_checked = 'checked' if origin_type == "self" else ''
external_checked = 'checked' if origin_type == "external" else ''
# Build publish section - show list of L2 shares
if l2_shares:
shares_html = ""
for share in l2_shares:
l2_server = share.get("l2_server", "Unknown")
asset_name = share.get("asset_name", "")
published_at = share.get("published_at", "")[:10] if share.get("published_at") else ""
last_synced = share.get("last_synced_at", "")[:10] if share.get("last_synced_at") else ""
shares_html += f'''
{asset_name}
{l2_server}
Published: {published_at}
Synced: {last_synced}
'''
publish_html = f'''
Published to L2 ({len(l2_shares)} share{"s" if len(l2_shares) != 1 else ""})
{shares_html}
'''
else:
# Show publish form only if origin is set
if origin_type:
publish_html = f'''
'''
else:
publish_html = '''
Set an origin (self or external URL) before publishing.
'''
return f'''
Metadata
Publish to L2 (ActivityPub)
{publish_html}
Status
Pinned:
{'Yes' if pinned else 'No'}
{f'({pin_reason})' if pinned and pin_reason else ''}
Pinned items cannot be discarded. Items are pinned when published or used as inputs to published content.
{'
Cannot discard pinned items.
' if pinned else f"""
"""}
'''
@app.patch("/ui/cache/{content_hash}/meta", response_class=HTMLResponse)
async def ui_update_cache_meta(content_hash: str, request: Request):
"""HTMX handler: update cache metadata from form."""
ctx = get_user_context_from_cookie(request)
if not ctx:
return '
Login required
'
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
return '
Access denied
'
# Parse form data
form = await request.form()
origin_type = form.get("origin_type", "")
origin_url = form.get("origin_url", "").strip()
origin_note = form.get("origin_note", "").strip()
description = form.get("description", "").strip()
tags_str = form.get("tags", "").strip()
# Build origin
source_type = None
if origin_type == "self":
source_type = "self"
elif origin_type == "external":
if not origin_url:
return '
External origin requires a URL
'
source_type = "external"
# Parse tags
tags = [t.strip() for t in tags_str.split(",") if t.strip()] if tags_str else []
# Save to database
await database.update_item_metadata(
content_hash=content_hash,
actor_id=ctx.actor_id,
item_type="media",
description=description if description else None,
source_type=source_type,
source_url=origin_url if origin_url else None,
source_note=origin_note if origin_note else None,
tags=tags
)
return '
Metadata saved!
'
@app.post("/ui/cache/{content_hash}/publish", response_class=HTMLResponse)
async def ui_publish_cache(content_hash: str, request: Request):
"""HTMX handler: publish cache item to L2."""
ctx = get_user_context_from_cookie(request)
if not ctx:
return '
Login required
'
token = request.cookies.get("auth_token")
if not token:
return '
Auth token required
'
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
return '
Access denied
'
# Parse form
form = await request.form()
asset_name = form.get("asset_name", "").strip()
asset_type = form.get("asset_type", "image")
if not asset_name:
return '
Asset name required
'
# Load metadata from database
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
origin = meta.get("origin")
if not origin or "type" not in origin:
return '
'''
@app.patch("/ui/cache/{content_hash}/republish", response_class=HTMLResponse)
async def ui_republish_cache(content_hash: str, request: Request):
"""HTMX handler: re-publish (update) cache item on L2."""
ctx = get_user_context_from_cookie(request)
if not ctx:
return '
Login required
'
token = request.cookies.get("auth_token")
if not token:
return '
Auth token required
'
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
return '
Access denied
'
# Load metadata
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
l2_shares = meta.get("l2_shares", [])
# Find share for current L2 server (user's L2)
l2_server = ctx.l2_server
current_share = None
share_index = -1
for i, share in enumerate(l2_shares):
if share.get("l2_server") == l2_server:
current_share = share
share_index = i
break
if not current_share:
return '
Item not published to this L2 yet
'
asset_name = current_share.get("asset_name")
if not asset_name:
return '
'
@app.get("/media")
async def list_media(
request: Request,
page: int = 1,
limit: int = 20,
folder: Optional[str] = None,
collection: Optional[str] = None,
tag: Optional[str] = None
):
"""List media items. HTML for browsers (with infinite scroll), JSON for APIs (with pagination)."""
ctx = get_user_context_from_cookie(request)
if wants_html(request):
# Require login for HTML media view
if not ctx:
content = '
'
return HTMLResponse(render_page("Media", content, None, active_tab="media"))
# Get hashes owned by/associated with this user
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
# Get cache items that belong to the user (from cache_manager)
cache_items = []
for cached_file in cache_manager.list_all():
content_hash = cached_file.content_hash
if content_hash not in user_hashes:
continue
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
# Apply folder filter
if folder:
item_folder = meta.get("folder", "/")
if folder != "/" and not item_folder.startswith(folder):
continue
if folder == "/" and item_folder != "/":
continue
# Apply collection filter
if collection:
if collection not in meta.get("collections", []):
continue
# Apply tag filter
if tag:
if tag not in meta.get("tags", []):
continue
cache_items.append({
"hash": content_hash,
"size": cached_file.size_bytes,
"mtime": cached_file.created_at,
"meta": meta
})
# Sort by modification time (newest first)
cache_items.sort(key=lambda x: x["mtime"], reverse=True)
total = len(cache_items)
# Pagination
start = (page - 1) * limit
end = start + limit
items_page = cache_items[start:end]
has_more = end < total
if not items_page:
if page == 1:
filter_msg = ""
if folder:
filter_msg = f" in folder {folder}"
elif collection:
filter_msg = f" in collection '{collection}'"
elif tag:
filter_msg = f" with tag '{tag}'"
content = f'
No media{filter_msg}. Upload files or run effects to see them here.
'
else:
return HTMLResponse("") # Empty for infinite scroll
else:
html_parts = []
for item in items_page:
content_hash = item["hash"]
cache_path = get_cache_path(content_hash)
media_type = detect_media_type(cache_path) if cache_path else "unknown"
# Format size
size = item["size"]
if size > 1024*1024:
size_str = f"{size/(1024*1024):.1f} MB"
elif size > 1024:
size_str = f"{size/1024:.1f} KB"
else:
size_str = f"{size} bytes"
html_parts.append(f'''
')
# For infinite scroll, just return cards if not first page
if page > 1:
if has_more:
query_params = f"page={page + 1}"
if folder:
query_params += f"&folder={folder}"
if collection:
query_params += f"&collection={collection}"
if tag:
query_params += f"&tag={tag}"
html_parts.append(f'''
Loading more...
''')
return HTMLResponse('\n'.join(html_parts))
# First page - full content
infinite_scroll_trigger = ""
if has_more:
query_params = "page=2"
if folder:
query_params += f"&folder={folder}"
if collection:
query_params += f"&collection={collection}"
if tag:
query_params += f"&tag={tag}"
infinite_scroll_trigger = f'''
Loading more...
'''
content = f'''
Media ({total} items)
{''.join(html_parts)}
{infinite_scroll_trigger}
'''
return HTMLResponse(render_page("Media", content, ctx.actor_id, active_tab="media"))
# JSON response for APIs - list all hashes with optional pagination
all_hashes = [cf.content_hash for cf in cache_manager.list_all()]
total = len(all_hashes)
start = (page - 1) * limit
end = start + limit
hashes_page = all_hashes[start:end]
has_more = end < total
return {
"hashes": hashes_page,
"pagination": {
"page": page,
"limit": limit,
"total": total,
"has_more": has_more
}
}
@app.delete("/cache/{content_hash}")
async def discard_cache(content_hash: str, ctx: UserContext = Depends(get_required_user_context)):
"""
Discard (delete) a cached item.
Enforces deletion rules:
- Cannot delete items published to L2 (shared)
- Cannot delete inputs/outputs of activities (runs)
- Cannot delete pinned items
"""
# Check if content exists
if not cache_manager.has_content(content_hash):
raise HTTPException(404, "Content not found")
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
raise HTTPException(403, "Access denied")
# Check if pinned
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
if meta.get("pinned"):
pin_reason = meta.get("pin_reason", "unknown")
raise HTTPException(400, f"Cannot discard pinned item (reason: {pin_reason})")
# Check if used by any run (Redis runs, not just activity store)
runs_using = find_runs_using_content(content_hash)
if runs_using:
run, role = runs_using[0]
raise HTTPException(400, f"Cannot discard: item is {role} of run {run.run_id}")
# Check deletion rules via cache_manager (L2 shared status, activity store)
can_delete, reason = cache_manager.can_delete(content_hash)
if not can_delete:
raise HTTPException(400, f"Cannot discard: {reason}")
# Delete via cache_manager
success, msg = cache_manager.delete_by_content_hash(content_hash)
if not success:
# Fallback to legacy deletion
cache_path = get_cache_path(content_hash)
if cache_path and cache_path.exists():
cache_path.unlink()
# Clean up legacy metadata files
meta_path = CACHE_DIR / f"{content_hash}.meta.json"
if meta_path.exists():
meta_path.unlink()
mp4_path = CACHE_DIR / f"{content_hash}.mp4"
if mp4_path.exists():
mp4_path.unlink()
return {"discarded": True, "content_hash": content_hash}
@app.delete("/ui/cache/{content_hash}/discard", response_class=HTMLResponse)
async def ui_discard_cache(content_hash: str, request: Request):
"""HTMX handler: discard a cached item."""
ctx = get_user_context_from_cookie(request)
if not ctx:
return '
Login required
'
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
return '
Access denied
'
# Check if content exists
if not cache_manager.has_content(content_hash):
return '
Content not found
'
# Check if pinned
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
if meta.get("pinned"):
pin_reason = meta.get("pin_reason", "unknown")
return f'
Cannot discard: item is pinned ({pin_reason})
'
# Check if used by any run (Redis runs, not just activity store)
runs_using = find_runs_using_content(content_hash)
if runs_using:
run, role = runs_using[0]
return f'
Cannot discard: item is {role} of run {run.run_id}
'
# Check deletion rules via cache_manager (L2 shared status, activity store)
can_delete, reason = cache_manager.can_delete(content_hash)
if not can_delete:
return f'
Cannot discard: {reason}
'
# Delete via cache_manager
success, msg = cache_manager.delete_by_content_hash(content_hash)
if not success:
# Fallback to legacy deletion
cache_path = get_cache_path(content_hash)
if cache_path and cache_path.exists():
cache_path.unlink()
# Clean up legacy metadata files
meta_path = CACHE_DIR / f"{content_hash}.meta.json"
if meta_path.exists():
meta_path.unlink()
mp4_path = CACHE_DIR / f"{content_hash}.mp4"
if mp4_path.exists():
mp4_path.unlink()
return '''
'''
# Known assets (bootstrap data)
# Maps a friendly asset name to its content hash; served verbatim by /assets.
KNOWN_ASSETS = {
"cat": "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b",
}
@app.get("/assets")
async def list_assets():
"""List known assets."""
return KNOWN_ASSETS
@app.post("/cache/import")
async def import_to_cache(path: str):
"""Import a local file to cache."""
source = Path(path)
if not source.exists():
raise HTTPException(404, f"File not found: {path}")
content_hash = cache_file(source)
return {"content_hash": content_hash, "cached": True}
def save_cache_meta(content_hash: str, uploader: Optional[str] = None, filename: Optional[str] = None, **updates) -> dict:
"""Save or update metadata for a cached file.

Creates `<hash>.meta.json` in CACHE_DIR on first call, recording the
uploader, upload time and original filename; later calls merge
`updates` in. The `uploader` and `uploaded_at` keys are never changed
once written; `uploader`/`filename` arguments are ignored when the
file already exists.

Returns:
    The metadata dict as written to disk.
"""
meta_path = CACHE_DIR / f"{content_hash}.meta.json"
# Load existing or create new
if meta_path.exists():
with open(meta_path) as f:
meta = json.load(f)
else:
meta = {
"uploader": uploader,
"uploaded_at": datetime.now(timezone.utc).isoformat(),
"filename": filename
}
# Apply updates (but never change uploader or uploaded_at)
for key, value in updates.items():
if key not in ("uploader", "uploaded_at"):
meta[key] = value
with open(meta_path, "w") as f:
json.dump(meta, f, indent=2)
return meta
def load_cache_meta(content_hash: str) -> dict:
    """Return the sidecar metadata for a cached file, or {} if none exists."""
    path = CACHE_DIR / f"{content_hash}.meta.json"
    if not path.exists():
        return {}
    with open(path) as fh:
        return json.load(fh)
# User data storage (folders, collections)
# Hidden directory inside the cache dir; one JSON file per user.
USER_DATA_DIR = CACHE_DIR / ".user-data"
def load_user_data(username: str) -> dict:
    """Load a user's folders and collections from their JSON file."""
    USER_DATA_DIR.mkdir(parents=True, exist_ok=True)
    # Normalise the username into a safe filename: drop '@', map '/' to '_'.
    safe_name = username.replace("@", "").replace("/", "_")
    user_file = USER_DATA_DIR / f"{safe_name}.json"
    if not user_file.exists():
        # Fresh user: root folder only, no collections.
        return {"folders": ["/"], "collections": []}
    with open(user_file) as fh:
        return json.load(fh)
def save_user_data(username: str, data: dict):
    """Persist a user's folders and collections to their JSON file."""
    USER_DATA_DIR.mkdir(parents=True, exist_ok=True)
    # Same normalisation as load_user_data: drop '@', map '/' to '_'.
    safe_name = username.replace("@", "").replace("/", "_")
    target = USER_DATA_DIR / f"{safe_name}.json"
    with open(target, "w") as fh:
        json.dump(data, fh, indent=2)
async def get_user_cache_hashes(username: str, actor_id: Optional[str] = None) -> set:
    """Get all cache hashes owned by or associated with a user.

    Combines three sources:
      1. the metadata database (new system, keyed by actor id),
      2. legacy `<hash>.meta.json` sidecar files whose uploader matches,
      3. inputs/outputs of the user's runs.

    Args:
        username: The plain username.
        actor_id: The full actor ID (@user@server), if available.

    Returns:
        Set of content-hash strings.
    """
    # Match against both formats for backwards compatibility.
    match_values = [username]
    if actor_id:
        match_values.append(actor_id)
    hashes = set()

    # 1. Query database for items owned by user (new system).
    if actor_id:
        try:
            for item in await database.get_user_items(actor_id):
                hashes.add(item["content_hash"])
        except Exception:
            pass  # Database may not be initialized

    # 2. Legacy: files uploaded by user (JSON sidecar metadata).
    if CACHE_DIR.exists():
        for f in CACHE_DIR.iterdir():
            if not f.name.endswith('.meta.json'):
                continue
            try:
                # f comes straight from iterdir(); the old code re-derived
                # the path, re-checked existence and re-imported json on
                # every iteration — all redundant.
                with open(f, 'r') as mf:
                    meta = json.load(mf)
                if meta.get("uploader") in match_values:
                    hashes.add(f.name.replace('.meta.json', ''))
            except Exception:
                pass  # Unreadable/corrupt sidecar: skip it.

    # 3. Files from user's runs (inputs and outputs).
    for run in list_all_runs():
        if run.username in match_values:
            hashes.update(run.inputs)
            if run.output_hash:
                hashes.add(run.output_hash)
    return hashes
@app.post("/cache/upload")
async def upload_to_cache(file: UploadFile = File(...), ctx: UserContext = Depends(get_required_user_context)):
    """Upload a file to cache. Requires authentication.

    Spools the upload to a temp file, moves it into the content-addressed
    cache, and records uploader metadata in the database.

    Returns:
        dict with the content hash, original filename, and size in bytes.
    """
    import tempfile
    # Write to a temp file first so cache_manager can move it into place.
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        content = await file.read()
        tmp.write(content)
        tmp_path = Path(tmp.name)
    try:
        # Store in cache via cache_manager (move=True consumes the temp file).
        cached, ipfs_cid = cache_manager.put(tmp_path, node_type="upload", move=True)
    finally:
        # If the move failed (or never happened), don't leak the temp file.
        # No-op when the file was already moved into the cache.
        tmp_path.unlink(missing_ok=True)
    content_hash = cached.content_hash
    # Save uploader metadata to database.
    await database.save_item_metadata(
        content_hash=content_hash,
        actor_id=ctx.actor_id,
        item_type="media",
        filename=file.filename
    )
    return {"content_hash": content_hash, "filename": file.filename, "size": len(content)}
class CacheMetaUpdate(BaseModel):
    """Request to update cache metadata.

    All fields are optional; only non-None fields are applied by the
    PATCH /cache/{content_hash}/meta endpoint.
    """
    # Provenance info: {"type": "self"|"external", "url": "...", "note": "..."}
    origin: Optional[dict] = None
    description: Optional[str] = None
    tags: Optional[list[str]] = None
    # Must already exist in the user's folder list.
    folder: Optional[str] = None
    # Each name must already exist in the user's collections.
    collections: Optional[list[str]] = None
class PublishRequest(BaseModel):
    """Request to publish a cache item to L2."""
    asset_name: str
    # Media kind forwarded to L2: image, video, etc.
    asset_type: str = "image"
@app.get("/cache/{content_hash}/meta")
async def get_cache_meta(content_hash: str, ctx: UserContext = Depends(get_required_user_context)):
    """Return metadata for a cached file the caller owns.

    Raises 404 if the content is not cached, 403 if the user does not own it.
    """
    # The file must exist in the cache.
    if not get_cache_path(content_hash):
        raise HTTPException(404, "Content not found")
    # The caller must own (or be associated with) the hash.
    owned = await get_user_cache_hashes(ctx.username, ctx.actor_id)
    if content_hash not in owned:
        raise HTTPException(403, "Access denied")
    return await database.load_item_metadata(content_hash, ctx.actor_id)
@app.patch("/cache/{content_hash}/meta")
async def update_cache_meta(content_hash: str, update: CacheMetaUpdate, ctx: UserContext = Depends(get_required_user_context)):
    """Update metadata for a cached file.

    Only non-None fields of the request are applied. Folder and collection
    values are validated against the user's folder/collection lists.

    Raises:
        HTTPException: 404 if the content is not cached, 403 if the user
            does not own it, 400 if a folder/collection does not exist.
    """
    # Check file exists
    if not get_cache_path(content_hash):
        raise HTTPException(404, "Content not found")
    # Check ownership
    user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
    if content_hash not in user_hashes:
        raise HTTPException(403, "Access denied")
    # Build update dict from non-None fields
    updates = {}
    if update.origin is not None:
        updates["origin"] = update.origin
    if update.description is not None:
        updates["description"] = update.description
    if update.tags is not None:
        updates["tags"] = update.tags
    # Load user data once: both folder and collections validate against it.
    user_data = None
    if update.folder is not None or update.collections is not None:
        user_data = load_user_data(ctx.username)
    if update.folder is not None:
        # Ensure folder exists in user's folder list
        if update.folder not in user_data["folders"]:
            raise HTTPException(400, f"Folder does not exist: {update.folder}")
        updates["folder"] = update.folder
    if update.collections is not None:
        # Validate collections exist
        existing = {c["name"] for c in user_data["collections"]}
        for col in update.collections:
            if col not in existing:
                raise HTTPException(400, f"Collection does not exist: {col}")
        updates["collections"] = update.collections
    meta = await database.update_item_metadata(content_hash, ctx.actor_id, **updates)
    return meta
@app.post("/cache/{content_hash}/publish")
async def publish_cache_to_l2(
    content_hash: str,
    req: PublishRequest,
    request: Request,
    ctx: UserContext = Depends(get_required_user_context)
):
    """
    Publish a cache item to L2 (ActivityPub).

    Requires origin to be set in metadata before publishing. On success the
    share is recorded locally and the item is pinned so the published bytes
    stay in the cache.

    Raises:
        HTTPException: 404 unknown content, 403 not the owner, 400 missing
            origin or L2 rejected the publish, 401 no auth token,
            500 unexpected L2 failure.
    """
    # Check file exists
    cache_path = get_cache_path(content_hash)
    if not cache_path:
        raise HTTPException(404, "Content not found")
    # Check ownership
    user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
    if content_hash not in user_hashes:
        raise HTTPException(403, "Access denied")
    # Load metadata
    meta = await database.load_item_metadata(content_hash, ctx.actor_id)
    # Provenance (origin) is mandatory before anything goes to L2.
    origin = meta.get("origin")
    if not origin or "type" not in origin:
        raise HTTPException(400, "Origin must be set before publishing. Use --origin self or --origin-url <url>")
    # Forward the caller's auth token to L2: cookie first, then bearer header.
    token = request.cookies.get("auth_token")
    if not token:
        auth_header = request.headers.get("Authorization", "")
        if auth_header.startswith("Bearer "):
            token = auth_header[7:]
    if not token:
        raise HTTPException(401, "Authentication token required")
    # Call L2 publish-cache endpoint (use user's L2 server)
    l2_server = ctx.l2_server
    try:
        resp = http_requests.post(
            f"{l2_server}/assets/publish-cache",
            headers={"Authorization": f"Bearer {token}"},
            json={
                "content_hash": content_hash,
                "asset_name": req.asset_name,
                "asset_type": req.asset_type,
                "origin": origin,
                "description": meta.get("description"),
                "tags": meta.get("tags", []),
                "metadata": {
                    "filename": meta.get("filename"),
                    "folder": meta.get("folder"),
                    "collections": meta.get("collections", [])
                }
            },
            timeout=10
        )
        resp.raise_for_status()
        l2_result = resp.json()
    except http_requests.exceptions.HTTPError as e:
        # Surface L2's error detail when the response body is JSON.
        try:
            error_detail = e.response.json().get("detail", str(e))
        except Exception:
            error_detail = str(e)
        raise HTTPException(400, f"L2 publish failed: {error_detail}")
    except Exception as e:
        raise HTTPException(500, f"L2 publish failed: {e}")
    # Record the share and pin locally so the published content stays cached.
    await database.save_l2_share(
        content_hash=content_hash,
        actor_id=ctx.actor_id,
        l2_server=l2_server,
        asset_name=req.asset_name,
        content_type=req.asset_type
    )
    await database.update_item_metadata(
        content_hash=content_hash,
        actor_id=ctx.actor_id,
        pinned=True,
        pin_reason="published"
    )
    return {
        "published": True,
        "asset_name": req.asset_name,
        "l2_result": l2_result
    }
@app.patch("/cache/{content_hash}/republish")
async def republish_cache_to_l2(
    content_hash: str,
    request: Request,
    ctx: UserContext = Depends(get_required_user_context)
):
    """
    Re-publish (update) a cache item on L2 after metadata changes.

    Only works for already-published items: the item must have an existing
    share record for the user's L2 server.

    Raises:
        HTTPException: 404 unknown content, 403 not the owner, 400 not yet
            published / missing asset name / L2 rejected the update,
            401 no auth token, 500 unexpected L2 failure.
    """
    # Check file exists
    cache_path = get_cache_path(content_hash)
    if not cache_path:
        raise HTTPException(404, "Content not found")
    # Check ownership
    user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
    if content_hash not in user_hashes:
        raise HTTPException(403, "Access denied")
    # Load metadata
    meta = await database.load_item_metadata(content_hash, ctx.actor_id)
    l2_shares = meta.get("l2_shares", [])
    # Find the existing share for the user's L2 server.
    l2_server = ctx.l2_server
    current_share = None
    for share in l2_shares:
        if share.get("l2_server") == l2_server:
            current_share = share
            break
    if not current_share:
        raise HTTPException(400, "Item not published yet. Use publish first.")
    asset_name = current_share.get("asset_name")
    if not asset_name:
        raise HTTPException(400, "No asset name found in publish info")
    # Forward the caller's auth token to L2: cookie first, then bearer header.
    token = request.cookies.get("auth_token")
    if not token:
        auth_header = request.headers.get("Authorization", "")
        if auth_header.startswith("Bearer "):
            token = auth_header[7:]
    if not token:
        raise HTTPException(401, "Authentication token required")
    # Call L2's asset-update endpoint. l2_server already holds the user's
    # L2 server from the share lookup above.
    try:
        resp = http_requests.patch(
            f"{l2_server}/assets/{asset_name}",
            headers={"Authorization": f"Bearer {token}"},
            json={
                "description": meta.get("description"),
                "tags": meta.get("tags"),
                "origin": meta.get("origin"),
                "metadata": {
                    "filename": meta.get("filename"),
                    "folder": meta.get("folder"),
                    "collections": meta.get("collections", [])
                }
            },
            timeout=10
        )
        resp.raise_for_status()
        l2_result = resp.json()
    except http_requests.exceptions.HTTPError as e:
        # Surface L2's error detail when the response body is JSON.
        try:
            error_detail = e.response.json().get("detail", str(e))
        except Exception:
            error_detail = str(e)
        raise HTTPException(400, f"L2 update failed: {error_detail}")
    except Exception as e:
        raise HTTPException(500, f"L2 update failed: {e}")
    # Update local metadata - save_l2_share updates last_synced_at on conflict
    await database.save_l2_share(
        content_hash=content_hash,
        actor_id=ctx.actor_id,
        l2_server=l2_server,
        asset_name=asset_name,
        content_type=current_share.get("content_type", "media")
    )
    return {
        "updated": True,
        "asset_name": asset_name,
        "l2_result": l2_result
    }
# ============ Folder & Collection Management ============
@app.get("/user/folders")
async def list_folders(username: str = Depends(get_required_user)):
    """List user's folders."""
    return {"folders": load_user_data(username)["folders"]}
@app.post("/user/folders")
async def create_folder(folder_path: str, username: str = Depends(get_required_user)):
    """Create a new folder for the user.

    Paths are absolute ("/a/b"); the parent folder must already exist.
    """
    data = load_user_data(username)
    # Folder paths are rooted at "/".
    if not folder_path.startswith("/"):
        raise HTTPException(400, "Folder path must start with /")
    # The parent must already exist ("/" always does).
    parent = folder_path.rpartition("/")[0] or "/"
    if parent != "/" and parent not in data["folders"]:
        raise HTTPException(400, f"Parent folder does not exist: {parent}")
    # Reject duplicates.
    if folder_path in data["folders"]:
        raise HTTPException(400, f"Folder already exists: {folder_path}")
    data["folders"].append(folder_path)
    data["folders"].sort()
    save_user_data(username, data)
    return {"folder": folder_path, "created": True}
@app.delete("/user/folders")
async def delete_folder(folder_path: str, ctx: UserContext = Depends(get_required_user_context)):
    """Delete a folder (must be empty)."""
    if folder_path == "/":
        raise HTTPException(400, "Cannot delete root folder")
    data = load_user_data(ctx.username)
    if folder_path not in data["folders"]:
        raise HTTPException(404, "Folder not found")
    # Refuse while any subfolder exists.
    prefix = folder_path + "/"
    for sub in data["folders"]:
        if sub.startswith(prefix):
            raise HTTPException(400, f"Folder has subfolders: {sub}")
    # Refuse while any of the user's items still lives in the folder.
    owned = await get_user_cache_hashes(ctx.username, ctx.actor_id)
    for item_hash in owned:
        item_meta = await database.load_item_metadata(item_hash, ctx.actor_id)
        if item_meta.get("folder") == folder_path:
            raise HTTPException(400, "Folder is not empty")
    data["folders"].remove(folder_path)
    save_user_data(ctx.username, data)
    return {"folder": folder_path, "deleted": True}
@app.get("/user/collections")
async def list_collections(username: str = Depends(get_required_user)):
    """List user's collections."""
    return {"collections": load_user_data(username)["collections"]}
@app.post("/user/collections")
async def create_collection(name: str, username: str = Depends(get_required_user)):
    """Create a new collection."""
    data = load_user_data(username)
    # Collection names are unique per user.
    if any(entry["name"] == name for entry in data["collections"]):
        raise HTTPException(400, f"Collection already exists: {name}")
    record = {
        "name": name,
        "created_at": datetime.now(timezone.utc).isoformat()
    }
    data["collections"].append(record)
    save_user_data(username, data)
    return {"collection": name, "created": True}
@app.delete("/user/collections")
async def delete_collection(name: str, ctx: UserContext = Depends(get_required_user_context)):
    """Delete a collection and remove it from every cache item."""
    data = load_user_data(ctx.username)
    # Locate the collection by name.
    index = next(
        (i for i, entry in enumerate(data["collections"]) if entry["name"] == name),
        None,
    )
    if index is None:
        raise HTTPException(404, "Collection not found")
    data["collections"].pop(index)
    save_user_data(ctx.username, data)
    # Strip the collection from every cache item that references it.
    owned = await get_user_cache_hashes(ctx.username, ctx.actor_id)
    for item_hash in owned:
        item_meta = await database.load_item_metadata(item_hash, ctx.actor_id)
        tagged = item_meta.get("collections", [])
        if name in tagged:
            remaining = [c for c in tagged if c != name]
            await database.update_item_metadata(item_hash, ctx.actor_id, collections=remaining)
    return {"collection": name, "deleted": True}
def is_ios_request(request: Request) -> bool:
    """Check if request is from iOS device."""
    agent = request.headers.get("user-agent", "").lower()
    return any(marker in agent for marker in ("iphone", "ipad"))
def video_src_for_request(content_hash: str, request: Request) -> str:
    """Get video src URL, using MP4 endpoint for iOS."""
    # iOS Safari needs the MP4 transcode endpoint.
    suffix = "/mp4" if is_ios_request(request) else ""
    return f"/cache/{content_hash}{suffix}"
def detect_media_type(cache_path: Path) -> str:
    """Detect if file is image or video based on magic bytes.

    Returns "video", "image", or "unknown".
    """
    with open(cache_path, "rb") as fh:
        head = fh.read(32)
    # Fixed-offset signature table: (byte range, magic, media type).
    signatures = (
        ((0, 4), b'\x1a\x45\xdf\xa3', "video"),    # WebM/MKV (EBML)
        ((4, 8), b'ftyp', "video"),                # MP4/MOV
        ((0, 8), b'\x89PNG\r\n\x1a\n', "image"),   # PNG
        ((0, 2), b'\xff\xd8', "image"),            # JPEG
        ((0, 6), b'GIF87a', "image"),              # GIF (87a)
        ((0, 6), b'GIF89a', "image"),              # GIF (89a)
    )
    for (start, end), magic, kind in signatures:
        if head[start:end] == magic:
            return kind
    # RIFF containers share a prefix; bytes 8-12 name the subtype.
    if head[:4] == b'RIFF':
        if head[8:12] == b'AVI ':
            return "video"
        if head[8:12] == b'WEBP':
            return "image"
    return "unknown"
def get_user_context_from_cookie(request) -> Optional[UserContext]:
    """Get user context from auth cookie. Returns full context with actor_id and l2_server."""
    token = request.cookies.get("auth_token")
    return get_verified_user_context(token) if token else None
def get_user_from_cookie(request) -> Optional[str]:
    """Get username from auth cookie (backwards compat - prefer get_user_context_from_cookie)."""
    ctx = get_user_context_from_cookie(request)
    if ctx is None:
        return None
    return ctx.username
def wants_html(request: Request) -> bool:
    """Check if request wants HTML (browser) vs JSON (API)."""
    accept_header = request.headers.get("accept", "")
    asks_for_html = "text/html" in accept_header
    asks_for_json = "application/json" in accept_header
    return asks_for_html and not asks_for_json
# Tailwind CSS config for all L1 templates
# NOTE(review): this constant appears empty in this view — confirm the
# config markup was not stripped during extraction.
TAILWIND_CONFIG = '''
'''
def render_page(title: str, content: str, actor_id: Optional[str] = None, active_tab: Optional[str] = None) -> str:
    """Render a page with nav bar and content. Used for clean URL pages.

    Args:
        title: Page title.
        content: Page body to embed.
        actor_id: The user's actor ID (@user@server) or None if not logged in.
        active_tab: Nav tab to mark active, if any.
    """
    user_info = ""
    if actor_id:
        # Extract username and domain from @username@domain format
        parts = actor_id.lstrip("@").split("@")
        username = parts[0] if parts else actor_id
        domain = parts[1] if len(parts) > 1 else ""
        # Link back to the user's profile on their L2 server.
        l2_user_url = f"https://{domain}/users/{username}" if domain else "#"
        # NOTE(review): the HTML template content appears stripped in this
        # view — confirm against the full source before editing further.
        user_info = f'''
"""
def render_ui_html(actor_id: Optional[str] = None, tab: str = "runs") -> str:
    """Render main UI HTML with optional user context.

    Args:
        actor_id: The user's actor ID (@user@server) or None if not logged in.
        tab: Which UI tab is active (defaults to "runs").
    """
    user_info = ""
    if actor_id:
        # Extract username and domain from @username@domain format
        parts = actor_id.lstrip("@").split("@")
        username = parts[0] if parts else actor_id
        domain = parts[1] if len(parts) > 1 else ""
        # Link back to the user's profile on their L2 server.
        l2_user_url = f"https://{domain}/users/{username}" if domain else "#"
        # NOTE(review): the HTML template content appears stripped in this
        # view — confirm against the full source before editing further.
        user_info = f'''
"""
# Auth routes - L1 never handles credentials, redirects to L2
# L2 sets auth_token cookie with domain=.rose-ash.com for shared auth
# Publicly reachable base URL of this L1 instance; used to build the
# return_to URLs handed to L2 for login/register/logout redirects.
L1_PUBLIC_URL = os.environ.get("L1_PUBLIC_URL", "http://localhost:8100")
@app.get("/login")
async def login_page():
    """Redirect to L2 server for login. L1 never handles credentials."""
    # L2 redirects back to this URL after a successful login.
    return_url = f"{L1_PUBLIC_URL}/runs"
    target = f"{DEFAULT_L2_SERVER}/login?return_to={return_url}"
    return RedirectResponse(url=target, status_code=302)
@app.get("/register")
async def register_page():
    """Redirect to L2 server for registration. L1 never handles credentials."""
    # L2 redirects back to this URL after a successful registration.
    return_url = f"{L1_PUBLIC_URL}/runs"
    target = f"{DEFAULT_L2_SERVER}/register?return_to={return_url}"
    return RedirectResponse(url=target, status_code=302)
@app.get("/logout")
async def logout():
    """Logout - clear cookie and redirect to L2 logout."""
    # L2 clears the shared cookie; we drop the local one here.
    target = f"{DEFAULT_L2_SERVER}/logout?return_to={L1_PUBLIC_URL}/"
    response = RedirectResponse(url=target, status_code=302)
    response.delete_cookie("auth_token")
    return response
@app.get("/ui")
async def ui_index(tab: str = "runs"):
    """Redirect /ui to clean URLs."""
    destination = "/media" if tab == "cache" else "/runs"
    return RedirectResponse(url=destination, status_code=302)
@app.get("/ui/login")
async def ui_login_page():
    """Redirect legacy /ui/login to the L2 login redirect."""
    return RedirectResponse("/login", status_code=302)
@app.get("/ui/register")
async def ui_register_page():
    """Redirect legacy /ui/register to the L2 register redirect."""
    return RedirectResponse("/register", status_code=302)
@app.get("/ui/logout")
async def ui_logout():
    """Redirect legacy /ui/logout to the logout route."""
    return RedirectResponse("/logout", status_code=302)
@app.post("/ui/publish-run/{run_id}", response_class=HTMLResponse)
async def ui_publish_run(run_id: str, request: Request, output_name: str = Form(...)):
    """Publish a run to L2 from the web UI.

    Returns an HTML fragment describing success or failure.
    NOTE(review): the inline HTML strings (and apparently the except clause
    of the try below) appear stripped in this view — confirm against the
    full source before editing this handler.
    """
    # Caller must be logged in (cookie-based session).
    ctx = get_user_context_from_cookie(request)
    if not ctx:
        return HTMLResponse('
Not logged in
')
    # The raw token is forwarded to L2 for the publish call.
    token = request.cookies.get("auth_token")
    if not token:
        return HTMLResponse('
Not logged in
')
    # Get the run to pin its output and inputs
    run = load_run(run_id)
    if not run:
        return HTMLResponse('
Run not found
')
    # Call L2 to publish the run, including this L1's public URL
    # Longer timeout because L2 calls back to L1 to fetch run details
    l2_server = ctx.l2_server
    try:
        resp = http_requests.post(
            f"{l2_server}/assets/record-run",
            json={"run_id": run_id, "output_name": output_name, "l1_server": L1_PUBLIC_URL},
            headers={"Authorization": f"Bearer {token}"},
            timeout=30
        )
        # L2 reports validation problems as 400 with a JSON detail.
        if resp.status_code == 400:
            error = resp.json().get("detail", "Bad request")
            return HTMLResponse(f'
Error: {error}
')
        resp.raise_for_status()
        result = resp.json()
        # Pin the output
        if run.output_hash:
            await database.update_item_metadata(run.output_hash, ctx.actor_id, pinned=True, pin_reason="published")
        # Pin the inputs (for provenance)
        for input_hash in run.inputs:
            await database.update_item_metadata(input_hash, ctx.actor_id, pinned=True, pin_reason="input_to_published")
        # If this was a recipe-based run, pin the recipe and its fixed inputs
        if run.recipe.startswith("recipe:"):
            config_name = run.recipe.replace("recipe:", "")
            for recipe in list_all_recipes():
                if recipe.name == config_name:
                    # Pin the recipe YAML
                    cache_manager.pin(recipe.recipe_id, reason="recipe_for_published")
                    # Pin all fixed inputs referenced by the recipe
                    for fixed in recipe.fixed_inputs:
                        if fixed.content_hash:
                            cache_manager.pin(fixed.content_hash, reason="fixed_input_in_published_recipe")
                    break
        return HTMLResponse(f'''
')
@app.get("/ui/runs", response_class=HTMLResponse)
async def ui_runs(request: Request):
"""HTMX partial: list of runs."""
ctx = get_user_context_from_cookie(request)
runs = list_all_runs()
# Require login to see runs
if not ctx:
return '
'
# Filter runs by user - match both plain username and ActivityPub format (@user@domain)
runs = [r for r in runs if r.username in (ctx.username, ctx.actor_id)]
if not runs:
return '
'
# Get hashes owned by/associated with this user
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
# Get cache items that belong to the user (from cache_manager)
cache_items = []
for cached_file in cache_manager.list_all():
content_hash = cached_file.content_hash
if content_hash not in user_hashes:
continue
# Load metadata for filtering
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
# Apply folder filter
if folder:
item_folder = meta.get("folder", "/")
if folder != "/" and not item_folder.startswith(folder):
continue
if folder == "/" and item_folder != "/":
continue
# Apply collection filter
if collection:
if collection not in meta.get("collections", []):
continue
# Apply tag filter
if tag:
if tag not in meta.get("tags", []):
continue
cache_items.append({
"hash": content_hash,
"size": cached_file.size_bytes,
"mtime": cached_file.created_at,
"meta": meta
})
# Sort by modification time (newest first)
cache_items.sort(key=lambda x: x["mtime"], reverse=True)
if not cache_items:
filter_msg = ""
if folder:
filter_msg = f" in folder {folder}"
elif collection:
filter_msg = f" in collection '{collection}'"
elif tag:
filter_msg = f" with tag '{tag}'"
return f'
No cached files{filter_msg}. Upload files or run effects to see them here.
'
html_parts = ['
']
for item in cache_items[:50]: # Limit to 50 items
content_hash = item["hash"]
cache_path = get_cache_path(content_hash)
media_type = detect_media_type(cache_path) if cache_path else "unknown"
# Check IPFS status
cache_item = await database.get_cache_item(content_hash)
ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
ipfs_badge = 'IPFS' if ipfs_cid else ''
# Format size
size = item["size"]
if size > 1024*1024:
size_str = f"{size/(1024*1024):.1f} MB"
elif size > 1024:
size_str = f"{size/1024:.1f} KB"
else:
size_str = f"{size} bytes"
html_parts.append(f'''