"""
@app.get("/", response_class=HTMLResponse)
async def root(request: Request):
"""Home page."""
ctx = await get_user_context_from_cookie(request)
actor_id = ctx.actor_id if ctx else None
return render_home_html(actor_id)
@app.post("/runs", response_model=RunStatus)
async def create_run(request: RunRequest, ctx: UserContext = Depends(get_required_user_context)):
"""Start a new rendering run. Checks cache before executing."""
# Compute content-addressable run_id
run_id = compute_run_id(request.inputs, request.recipe)
# Generate output name if not provided
output_name = request.output_name or f"{request.recipe}-{run_id[:8]}"
# Use actor_id from user context
actor_id = ctx.actor_id
# Check L1 cache first
cached_run = await database.get_run_cache(run_id)
if cached_run:
output_hash = cached_run["output_hash"]
# Verify the output file still exists in cache
if cache_manager.has_content(output_hash):
logger.info(f"create_run: Cache hit for run_id={run_id[:16]}... output={output_hash[:16]}...")
return RunStatus(
run_id=run_id,
status="completed",
recipe=request.recipe,
inputs=request.inputs,
output_name=output_name,
created_at=cached_run.get("created_at", datetime.now(timezone.utc).isoformat()),
completed_at=cached_run.get("created_at", datetime.now(timezone.utc).isoformat()),
output_hash=output_hash,
username=actor_id,
provenance_cid=cached_run.get("provenance_cid"),
)
else:
logger.info(f"create_run: Cache entry exists but output missing, will re-run")
# Check L2 if not in L1
l2_server = ctx.l2_server
try:
l2_resp = http_requests.get(
f"{l2_server}/assets/by-run-id/{run_id}",
timeout=10
)
if l2_resp.status_code == 200:
l2_data = l2_resp.json()
output_hash = l2_data.get("output_hash")
ipfs_cid = l2_data.get("ipfs_cid")
if output_hash and ipfs_cid:
logger.info(f"create_run: Found on L2, pulling from IPFS: {ipfs_cid}")
# Pull from IPFS to L1 cache
import ipfs_client
legacy_dir = CACHE_DIR / "legacy"
legacy_dir.mkdir(parents=True, exist_ok=True)
recovery_path = legacy_dir / output_hash
if ipfs_client.get_file(ipfs_cid, str(recovery_path)):
# File retrieved - put() updates indexes, but file is already in legacy location
# Just update the content and IPFS indexes manually
cache_manager._set_content_index(output_hash, output_hash)
cache_manager._set_ipfs_index(output_hash, ipfs_cid)
# Save to run cache
await database.save_run_cache(
run_id=run_id,
output_hash=output_hash,
recipe=request.recipe,
inputs=request.inputs,
ipfs_cid=ipfs_cid,
provenance_cid=l2_data.get("provenance_cid"),
actor_id=actor_id,
)
logger.info(f"create_run: Recovered from L2/IPFS: {output_hash[:16]}...")
return RunStatus(
run_id=run_id,
status="completed",
recipe=request.recipe,
inputs=request.inputs,
output_name=output_name,
created_at=datetime.now(timezone.utc).isoformat(),
completed_at=datetime.now(timezone.utc).isoformat(),
output_hash=output_hash,
username=actor_id,
provenance_cid=l2_data.get("provenance_cid"),
)
except Exception as e:
logger.warning(f"create_run: L2 lookup failed (will run Celery): {e}")
# Not cached anywhere - create run record and submit to Celery
run = RunStatus(
run_id=run_id,
status="pending",
recipe=request.recipe,
inputs=request.inputs,
output_name=output_name,
created_at=datetime.now(timezone.utc).isoformat(),
username=actor_id
)
# Submit to Celery
if request.use_dag or request.recipe == "dag":
# DAG mode - use artdag engine
if request.dag_json:
# Custom DAG provided
dag_json = request.dag_json
else:
# Build simple effect DAG from recipe and inputs
dag = build_effect_dag(request.inputs, request.recipe)
dag_json = dag.to_json()
task = execute_dag.delay(dag_json, run.run_id)
else:
# Legacy mode - single effect
if len(request.inputs) != 1:
raise HTTPException(400, "Legacy mode only supports single-input recipes. Use use_dag=true for multi-input.")
input_hash = request.inputs[0]
task = render_effect.delay(input_hash, request.recipe, output_name)
run.celery_task_id = task.id
run.status = "running"
await asyncio.to_thread(save_run, run)
return run
def _check_celery_task_sync(task_id: str) -> tuple[bool, bool, Optional[dict], Optional[str]]:
    """Check Celery task status synchronously.

    Returns a tuple (is_ready, is_successful, result, error):
      - not ready yet  -> (False, False, None, None)
      - succeeded      -> (True, True, <task result>, None)
      - failed         -> (True, False, None, <stringified exception>)
    """
    async_result = celery_app.AsyncResult(task_id)
    if not async_result.ready():
        return (False, False, None, None)
    if not async_result.successful():
        # On failure Celery stores the raised exception in .result.
        return (True, False, None, str(async_result.result))
    return (True, True, async_result.result, None)
@app.get("/runs/{run_id}", response_model=RunStatus)
async def get_run(run_id: str):
"""Get status of a run."""
start = time.time()
logger.info(f"get_run: Starting for {run_id}")
t0 = time.time()
run = await asyncio.to_thread(load_run, run_id)
logger.info(f"get_run: load_run took {time.time()-t0:.3f}s, status={run.status if run else 'None'}")
if not run:
raise HTTPException(404, f"Run {run_id} not found")
# Check Celery task status if running
if run.status == "running" and run.celery_task_id:
t0 = time.time()
is_ready, is_successful, result, error = await asyncio.to_thread(
_check_celery_task_sync, run.celery_task_id
)
logger.info(f"get_run: Celery check took {time.time()-t0:.3f}s, ready={is_ready}")
if is_ready:
if is_successful:
run.status = "completed"
run.completed_at = datetime.now(timezone.utc).isoformat()
# Handle both legacy (render_effect) and new (execute_dag) result formats
if "output_hash" in result:
# New DAG result format
run.output_hash = result.get("output_hash")
run.provenance_cid = result.get("provenance_cid")
output_path = Path(result.get("output_path", "")) if result.get("output_path") else None
else:
# Legacy render_effect format
run.output_hash = result.get("output", {}).get("content_hash")
run.provenance_cid = result.get("provenance_cid")
output_path = Path(result.get("output", {}).get("local_path", ""))
# Extract effects info from provenance (legacy only)
effects = result.get("effects", [])
if effects:
run.effects_commit = effects[0].get("repo_commit")
run.effect_url = effects[0].get("repo_url")
# Extract infrastructure info (legacy only)
run.infrastructure = result.get("infrastructure")
# Cache the output (legacy mode - DAG already caches via cache_manager)
if output_path and output_path.exists() and "output_hash" not in result:
t0 = time.time()
await cache_file(output_path, node_type="effect_output")
logger.info(f"get_run: cache_file took {time.time()-t0:.3f}s")
# Record activity for deletion tracking (legacy mode)
if run.output_hash and run.inputs:
await asyncio.to_thread(
cache_manager.record_simple_activity,
input_hashes=run.inputs,
output_hash=run.output_hash,
run_id=run.run_id,
)
# Save to run cache for content-addressable lookup
if run.output_hash:
ipfs_cid = cache_manager._get_ipfs_cid_from_index(run.output_hash)
await database.save_run_cache(
run_id=run.run_id,
output_hash=run.output_hash,
recipe=run.recipe,
inputs=run.inputs,
ipfs_cid=ipfs_cid,
provenance_cid=run.provenance_cid,
actor_id=run.username,
)
logger.info(f"get_run: Saved run cache for {run.run_id[:16]}...")
else:
run.status = "failed"
run.error = error
# Save updated status
t0 = time.time()
await asyncio.to_thread(save_run, run)
logger.info(f"get_run: save_run took {time.time()-t0:.3f}s")
logger.info(f"get_run: Total time {time.time()-start:.3f}s")
return run
@app.delete("/runs/{run_id}")
async def discard_run(run_id: str, ctx: UserContext = Depends(get_required_user_context)):
"""
Discard (delete) a run and its outputs.
Enforces deletion rules:
- Cannot discard if output is published to L2 (pinned)
- Deletes outputs and intermediate cache entries
- Preserves inputs (cache items and recipes are NOT deleted)
"""
run = await asyncio.to_thread(load_run, run_id)
if not run:
raise HTTPException(404, f"Run {run_id} not found")
# Check ownership
if run.username not in (ctx.username, ctx.actor_id):
raise HTTPException(403, "Access denied")
# Failed runs can always be deleted (no output to protect)
if run.status != "failed":
# Only check if output is pinned - inputs are preserved, not deleted
if run.output_hash:
meta = await database.load_item_metadata(run.output_hash, ctx.actor_id)
if meta.get("pinned"):
pin_reason = meta.get("pin_reason", "published")
raise HTTPException(400, f"Cannot discard run: output {run.output_hash[:16]}... is pinned ({pin_reason})")
# Check if activity exists for this run
activity = await asyncio.to_thread(cache_manager.get_activity, run_id)
if activity:
# Discard the activity - only delete outputs, preserve inputs
success, msg = await asyncio.to_thread(cache_manager.discard_activity_outputs_only, run_id)
if not success:
raise HTTPException(400, f"Cannot discard run: {msg}")
# Remove from Redis
await asyncio.to_thread(redis_client.delete, f"{RUNS_KEY_PREFIX}{run_id}")
return {"discarded": True, "run_id": run_id}
@app.delete("/ui/runs/{run_id}/discard", response_class=HTMLResponse)
async def ui_discard_run(run_id: str, request: Request):
"""HTMX handler: discard a run. Only deletes outputs, preserves inputs."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '
Login required
'
run = await asyncio.to_thread(load_run, run_id)
if not run:
return '
Run not found
'
# Check ownership
if run.username not in (ctx.username, ctx.actor_id):
return '
Access denied
'
# Failed runs can always be deleted
if run.status != "failed":
# Only check if output is pinned - inputs are preserved, not deleted
if run.output_hash:
meta = await database.load_item_metadata(run.output_hash, ctx.actor_id)
if meta.get("pinned"):
pin_reason = meta.get("pin_reason", "published")
return f'
Cannot discard: output is pinned ({pin_reason})
'
# Check if activity exists for this run
activity = await asyncio.to_thread(cache_manager.get_activity, run_id)
if activity:
# Discard the activity - only delete outputs, preserve inputs
success, msg = await asyncio.to_thread(cache_manager.discard_activity_outputs_only, run_id)
if not success:
return f'
Cannot discard: {msg}
'
# Remove from Redis
await asyncio.to_thread(redis_client.delete, f"{RUNS_KEY_PREFIX}{run_id}")
return '''
'''
@app.get("/run/{run_id}")
async def run_detail(run_id: str, request: Request):
"""Run detail. HTML for browsers, JSON for APIs."""
run = await asyncio.to_thread(load_run, run_id)
if not run:
if wants_html(request):
content = f'
Run not found: {run_id}
'
return HTMLResponse(render_page("Not Found", content, None, active_tab="runs"), status_code=404)
raise HTTPException(404, f"Run {run_id} not found")
# Check Celery task status if running
if run.status == "running" and run.celery_task_id:
is_ready, is_successful, result, error = await asyncio.to_thread(
_check_celery_task_sync, run.celery_task_id
)
if is_ready:
if is_successful:
run.status = "completed"
run.completed_at = datetime.now(timezone.utc).isoformat()
run.output_hash = result.get("output", {}).get("content_hash")
effects = result.get("effects", [])
if effects:
run.effects_commit = effects[0].get("repo_commit")
run.effect_url = effects[0].get("repo_url")
run.infrastructure = result.get("infrastructure")
output_path = Path(result.get("output", {}).get("local_path", ""))
if output_path.exists():
await cache_file(output_path)
# Save to run cache for content-addressable lookup
if run.output_hash:
ipfs_cid = cache_manager._get_ipfs_cid_from_index(run.output_hash)
await database.save_run_cache(
run_id=run.run_id,
output_hash=run.output_hash,
recipe=run.recipe,
inputs=run.inputs,
ipfs_cid=ipfs_cid,
provenance_cid=run.provenance_cid,
actor_id=run.username,
)
else:
run.status = "failed"
run.error = error
await asyncio.to_thread(save_run, run)
if wants_html(request):
ctx = await get_user_context_from_cookie(request)
if not ctx:
content = '
Not logged in.
'
return HTMLResponse(render_page("Login Required", content, None, active_tab="runs"), status_code=401)
# Check user owns this run
if run.username not in (ctx.username, ctx.actor_id):
content = '
Access denied.
'
return HTMLResponse(render_page("Access Denied", content, ctx.actor_id, active_tab="runs"), status_code=403)
# Build effect URL
if run.effect_url:
effect_url = run.effect_url
elif run.effects_commit and run.effects_commit != "unknown":
effect_url = f"https://git.rose-ash.com/art-dag/effects/src/commit/{run.effects_commit}/{run.recipe}"
else:
effect_url = f"https://git.rose-ash.com/art-dag/effects/src/branch/main/{run.recipe}"
# Status badge colors
status_colors = {
"completed": "bg-green-600 text-white",
"running": "bg-yellow-600 text-white",
"failed": "bg-red-600 text-white",
"pending": "bg-gray-600 text-white"
}
status_badge = status_colors.get(run.status, "bg-gray-600 text-white")
# Build media HTML for input/output
media_html = ""
has_input = run.inputs and cache_manager.has_content(run.inputs[0])
has_output = run.status == "completed" and run.output_hash and cache_manager.has_content(run.output_hash)
if has_input or has_output:
media_html = '
'''
return HTMLResponse(render_page(f"Run: {run.recipe}", content, ctx.actor_id, active_tab="runs"))
# JSON response
return run.model_dump()
@app.get("/runs")
async def list_runs(request: Request, page: int = 1, limit: int = 20):
"""List runs. HTML for browsers (with infinite scroll), JSON for APIs (with pagination)."""
ctx = await get_user_context_from_cookie(request)
all_runs = await asyncio.to_thread(list_all_runs)
total = len(all_runs)
# Filter by user if logged in for HTML
if wants_html(request) and ctx:
all_runs = [r for r in all_runs if r.username in (ctx.username, ctx.actor_id)]
total = len(all_runs)
# Pagination
start = (page - 1) * limit
end = start + limit
runs_page = all_runs[start:end]
has_more = end < total
if wants_html(request):
if not ctx:
content = '
Not logged in.
'
return HTMLResponse(render_page("Runs", content, None, active_tab="runs"))
if not runs_page:
if page == 1:
content = '
You have no runs yet. Use the CLI to start a run.
'
else:
return HTMLResponse("") # Empty for infinite scroll
else:
# Status badge colors
status_colors = {
"completed": "bg-green-600 text-white",
"running": "bg-yellow-600 text-white",
"failed": "bg-red-600 text-white",
"pending": "bg-gray-600 text-white"
}
html_parts = []
for run in runs_page:
status_badge = status_colors.get(run.status, "bg-gray-600 text-white")
html_parts.append(f'''
')
# For infinite scroll, just return cards if not first page
if page > 1:
if has_more:
html_parts.append(f'''
Loading more...
''')
return HTMLResponse('\n'.join(html_parts))
# First page - full content
infinite_scroll_trigger = ""
if has_more:
infinite_scroll_trigger = f'''
Loading more...
'''
content = f'''
Runs ({total} total)
{''.join(html_parts)}
{infinite_scroll_trigger}
'''
return HTMLResponse(render_page("Runs", content, ctx.actor_id, active_tab="runs"))
# JSON response for APIs
return {
"runs": [r.model_dump() for r in runs_page],
"pagination": {
"page": page,
"limit": limit,
"total": total,
"has_more": has_more
}
}
# ============ Recipe Endpoints ============
@app.post("/recipes/upload")
async def upload_recipe(file: UploadFile = File(...), ctx: UserContext = Depends(get_required_user_context)):
"""Upload a recipe YAML file. Requires authentication."""
import tempfile
# Read file content
content = await file.read()
try:
yaml_content = content.decode('utf-8')
except UnicodeDecodeError:
raise HTTPException(400, "Recipe file must be valid UTF-8 text")
# Validate YAML
try:
yaml.safe_load(yaml_content)
except yaml.YAMLError as e:
raise HTTPException(400, f"Invalid YAML: {e}")
# Store YAML file in cache
with tempfile.NamedTemporaryFile(delete=False, suffix=".yaml") as tmp:
tmp.write(content)
tmp_path = Path(tmp.name)
cached, ipfs_cid = cache_manager.put(tmp_path, node_type="recipe", move=True)
recipe_hash = cached.content_hash
# Parse and save metadata
actor_id = ctx.actor_id
try:
recipe_status = parse_recipe_yaml(yaml_content, recipe_hash, actor_id)
except Exception as e:
raise HTTPException(400, f"Failed to parse recipe: {e}")
await asyncio.to_thread(save_recipe, recipe_status)
# Save cache metadata to database
await database.save_item_metadata(
content_hash=recipe_hash,
actor_id=actor_id,
item_type="recipe",
filename=file.filename,
description=recipe_status.name # Use recipe name as description
)
return {
"recipe_id": recipe_hash,
"name": recipe_status.name,
"version": recipe_status.version,
"variable_inputs": len(recipe_status.variable_inputs),
"fixed_inputs": len(recipe_status.fixed_inputs)
}
@app.get("/recipes")
async def list_recipes_api(request: Request, page: int = 1, limit: int = 20):
"""List recipes. HTML for browsers, JSON for APIs."""
ctx = await get_user_context_from_cookie(request)
all_recipes = await asyncio.to_thread(list_all_recipes)
if wants_html(request):
# HTML response
if not ctx:
return HTMLResponse(render_page(
"Recipes",
'
Not logged in.
',
None,
active_tab="recipes"
))
# Filter to user's recipes
user_recipes = [c for c in all_recipes if c.uploader in (ctx.username, ctx.actor_id)]
total = len(user_recipes)
if not user_recipes:
content = '''
Recipes (0)
No recipes yet. Upload a recipe YAML file to get started.
'''
return HTMLResponse(render_page("Recipes", content, ctx.actor_id, active_tab="recipes"))
html_parts = []
for recipe in user_recipes:
var_count = len(recipe.variable_inputs)
fixed_count = len(recipe.fixed_inputs)
input_info = []
if var_count:
input_info.append(f"{var_count} variable")
if fixed_count:
input_info.append(f"{fixed_count} fixed")
inputs_str = ", ".join(input_info) if input_info else "no inputs"
html_parts.append(f'''
'''
return HTMLResponse(render_page(f"Recipe: {recipe.name}", content, ctx.actor_id if ctx else None, active_tab="recipes"))
@app.post("/ui/recipes/{recipe_id}/run", response_class=HTMLResponse)
async def ui_run_recipe(recipe_id: str, request: Request):
"""HTMX handler: run a recipe with form inputs."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '
Login required
'
recipe = load_recipe(recipe_id)
if not recipe:
return '
Recipe not found
'
# Parse form data
form_data = await request.form()
inputs = {}
for var_input in recipe.variable_inputs:
value = form_data.get(var_input.node_id, "").strip()
if var_input.required and not value:
return f'
Missing required input: {var_input.name}
'
if value:
inputs[var_input.node_id] = value
# Load recipe YAML
recipe_path = cache_manager.get_by_content_hash(recipe_id)
if not recipe_path:
return '
Recipe YAML not found in cache
'
try:
with open(recipe_path) as f:
yaml_config = yaml.safe_load(f)
# Build DAG from recipe
dag = build_dag_from_recipe(yaml_config, inputs, recipe)
# Create run
run_id = str(uuid.uuid4())
actor_id = ctx.actor_id
# Collect all input hashes
all_inputs = list(inputs.values())
for fixed in recipe.fixed_inputs:
if fixed.content_hash:
all_inputs.append(fixed.content_hash)
run = RunStatus(
run_id=run_id,
status="pending",
recipe=f"recipe:{recipe.name}",
inputs=all_inputs,
output_name=f"{recipe.name}-{run_id[:8]}",
created_at=datetime.now(timezone.utc).isoformat(),
username=actor_id
)
# Submit to Celery
dag_json = dag.to_json()
task = execute_dag.delay(dag_json, run.run_id)
run.celery_task_id = task.id
run.status = "running"
save_run(run)
return f'''
'
@app.get("/ui/recipes-list", response_class=HTMLResponse)
async def ui_recipes_list(request: Request):
"""HTMX partial: list of recipes."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '
Not logged in.
'
all_recipes = list_all_recipes()
# Filter to user's recipes
user_recipes = [c for c in all_recipes if c.uploader in (ctx.username, ctx.actor_id)]
if not user_recipes:
return '
No recipes yet. Upload a recipe YAML file to get started.
'
html_parts = ['
']
for recipe in user_recipes:
var_count = len(recipe.variable_inputs)
fixed_count = len(recipe.fixed_inputs)
input_info = []
if var_count:
input_info.append(f"{var_count} variable")
if fixed_count:
input_info.append(f"{fixed_count} fixed")
inputs_str = ", ".join(input_info) if input_info else "no inputs"
html_parts.append(f'''
'''
@app.get("/cache/{content_hash}")
async def get_cached(content_hash: str, request: Request):
"""Get cached content by hash. Content negotiation: HTML for browsers, JSON for APIs, file for downloads."""
start = time.time()
accept = request.headers.get("accept", "")
logger.info(f"get_cached: {content_hash[:16]}... Accept={accept[:50]}")
ctx = await get_user_context_from_cookie(request)
cache_path = get_cache_path(content_hash)
if not cache_path:
logger.info(f"get_cached: Not found, took {time.time()-start:.3f}s")
if wants_html(request):
content = f'
Content not found: {content_hash}
'
return HTMLResponse(render_page("Not Found", content, ctx.actor_id if ctx else None, active_tab="media"), status_code=404)
raise HTTPException(404, f"Content {content_hash} not in cache")
# JSON response only if explicitly requested
if "application/json" in accept and "text/html" not in accept:
t0 = time.time()
meta = await database.load_item_metadata(content_hash, ctx.actor_id if ctx else None)
logger.debug(f"get_cached: load_item_metadata took {time.time()-t0:.3f}s")
t0 = time.time()
cache_item = await database.get_cache_item(content_hash)
logger.debug(f"get_cached: get_cache_item took {time.time()-t0:.3f}s")
ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
file_size = cache_path.stat().st_size
# Use stored type from metadata, fall back to auto-detection
stored_type = meta.get("type") if meta else None
if stored_type == "recipe":
media_type = "recipe"
else:
media_type = detect_media_type(cache_path)
logger.info(f"get_cached: JSON response, ipfs_cid={ipfs_cid[:16] if ipfs_cid else 'None'}..., took {time.time()-start:.3f}s")
return {
"content_hash": content_hash,
"size": file_size,
"media_type": media_type,
"ipfs_cid": ipfs_cid,
"meta": meta
}
# HTML response for browsers (default for all non-JSON requests)
# Raw data is only served from /cache/{hash}/raw endpoint
if True: # Always show HTML page, raw data via /raw endpoint
if not ctx:
content = '
Not logged in.
'
return HTMLResponse(render_page("Login Required", content, None, active_tab="media"), status_code=401)
# Check user has access
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
content = '
Access denied.
'
return HTMLResponse(render_page("Access Denied", content, ctx.actor_id, active_tab="media"), status_code=403)
media_type = detect_media_type(cache_path)
file_size = cache_path.stat().st_size
size_str = f"{file_size:,} bytes"
if file_size > 1024*1024:
size_str = f"{file_size/(1024*1024):.1f} MB"
elif file_size > 1024:
size_str = f"{file_size/1024:.1f} KB"
# Get IPFS CID from database
cache_item = await database.get_cache_item(content_hash)
ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
# Build media display HTML
if media_type == "video":
video_src = video_src_for_request(content_hash, request)
media_html = f''
elif media_type == "image":
media_html = f''
else:
media_html = f'
Published to L2 ({len(l2_shares)} share{"s" if len(l2_shares) != 1 else ""})
{shares_html}
'''
else:
# Show publish form only if origin is set
if origin_type:
publish_html = f'''
'''
else:
publish_html = '''
Set an origin (self or external URL) before publishing.
'''
return f'''
Metadata
Publish to L2 (ActivityPub)
{publish_html}
Status
Pinned:
{'Yes' if pinned else 'No'}
{f'({pin_reason})' if pinned and pin_reason else ''}
Pinned items cannot be discarded. Items are pinned when published or used as inputs to published content.
{'
Cannot discard pinned items.
' if pinned else f"""
"""}
'''
@app.patch("/ui/cache/{content_hash}/meta", response_class=HTMLResponse)
async def ui_update_cache_meta(content_hash: str, request: Request):
"""HTMX handler: update cache metadata from form."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '
Login required
'
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
return '
Access denied
'
# Parse form data
form = await request.form()
origin_type = form.get("origin_type", "")
origin_url = form.get("origin_url", "").strip()
origin_note = form.get("origin_note", "").strip()
description = form.get("description", "").strip()
tags_str = form.get("tags", "").strip()
# Build origin
source_type = None
if origin_type == "self":
source_type = "self"
elif origin_type == "external":
if not origin_url:
return '
External origin requires a URL
'
source_type = "external"
# Parse tags
tags = [t.strip() for t in tags_str.split(",") if t.strip()] if tags_str else []
# Save to database
await database.update_item_metadata(
content_hash=content_hash,
actor_id=ctx.actor_id,
item_type="media",
description=description if description else None,
source_type=source_type,
source_url=origin_url if origin_url else None,
source_note=origin_note if origin_note else None,
tags=tags
)
return '
Metadata saved!
'
@app.post("/ui/cache/{content_hash}/publish", response_class=HTMLResponse)
async def ui_publish_cache(content_hash: str, request: Request):
"""HTMX handler: publish cache item to L2."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '
Login required
'
token = request.cookies.get("auth_token")
if not token:
return '
Auth token required
'
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
return '
Access denied
'
# Parse form
form = await request.form()
asset_name = form.get("asset_name", "").strip()
asset_type = form.get("asset_type", "image")
if not asset_name:
return '
Asset name required
'
# Load metadata from database
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
origin = meta.get("origin")
if not origin or "type" not in origin:
return '
'''
@app.patch("/ui/cache/{content_hash}/republish", response_class=HTMLResponse)
async def ui_republish_cache(content_hash: str, request: Request):
"""HTMX handler: re-publish (update) cache item on L2."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '
Login required
'
token = request.cookies.get("auth_token")
if not token:
return '
Auth token required
'
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
return '
Access denied
'
# Load metadata
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
l2_shares = meta.get("l2_shares", [])
# Find share for current L2 server (user's L2)
l2_server = ctx.l2_server
current_share = None
share_index = -1
for i, share in enumerate(l2_shares):
if share.get("l2_server") == l2_server:
current_share = share
share_index = i
break
if not current_share:
return '
Item not published to this L2 yet
'
asset_name = current_share.get("asset_name")
if not asset_name:
return '
'
@app.get("/media")
async def list_media(
request: Request,
page: int = 1,
limit: int = 20,
folder: Optional[str] = None,
collection: Optional[str] = None,
tag: Optional[str] = None
):
"""List media items. HTML for browsers (with infinite scroll), JSON for APIs (with pagination)."""
ctx = await get_user_context_from_cookie(request)
if wants_html(request):
# Require login for HTML media view
if not ctx:
content = '
Not logged in.
'
return HTMLResponse(render_page("Media", content, None, active_tab="media"))
# Get hashes owned by/associated with this user
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
# Get cache items that belong to the user (from cache_manager)
cache_items = []
seen_hashes = set() # Deduplicate by content_hash
for cached_file in cache_manager.list_all():
content_hash = cached_file.content_hash
if content_hash not in user_hashes:
continue
# Skip duplicates (same content from multiple runs)
if content_hash in seen_hashes:
continue
seen_hashes.add(content_hash)
# Skip recipes - they have their own section
if cached_file.node_type == "recipe":
continue
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
# Apply folder filter
if folder:
item_folder = meta.get("folder", "/")
if folder != "/" and not item_folder.startswith(folder):
continue
if folder == "/" and item_folder != "/":
continue
# Apply collection filter
if collection:
if collection not in meta.get("collections", []):
continue
# Apply tag filter
if tag:
if tag not in meta.get("tags", []):
continue
cache_items.append({
"hash": content_hash,
"size": cached_file.size_bytes,
"mtime": cached_file.created_at,
"meta": meta
})
# Sort by modification time (newest first)
cache_items.sort(key=lambda x: x["mtime"], reverse=True)
total = len(cache_items)
# Pagination
start = (page - 1) * limit
end = start + limit
items_page = cache_items[start:end]
has_more = end < total
if not items_page:
if page == 1:
filter_msg = ""
if folder:
filter_msg = f" in folder {folder}"
elif collection:
filter_msg = f" in collection '{collection}'"
elif tag:
filter_msg = f" with tag '{tag}'"
content = f'
No media{filter_msg}. Upload files or run effects to see them here.
'
else:
return HTMLResponse("") # Empty for infinite scroll
else:
html_parts = []
for item in items_page:
content_hash = item["hash"]
cache_path = get_cache_path(content_hash)
media_type = detect_media_type(cache_path) if cache_path else "unknown"
# Format size
size = item["size"]
if size > 1024*1024:
size_str = f"{size/(1024*1024):.1f} MB"
elif size > 1024:
size_str = f"{size/1024:.1f} KB"
else:
size_str = f"{size} bytes"
html_parts.append(f'''
')
# For infinite scroll, just return cards if not first page
if page > 1:
if has_more:
query_params = f"page={page + 1}"
if folder:
query_params += f"&folder={folder}"
if collection:
query_params += f"&collection={collection}"
if tag:
query_params += f"&tag={tag}"
html_parts.append(f'''
Loading more...
''')
return HTMLResponse('\n'.join(html_parts))
# First page - full content
infinite_scroll_trigger = ""
if has_more:
query_params = "page=2"
if folder:
query_params += f"&folder={folder}"
if collection:
query_params += f"&collection={collection}"
if tag:
query_params += f"&tag={tag}"
infinite_scroll_trigger = f'''
Loading more...
'''
content = f'''
Media ({total} items)
{''.join(html_parts)}
{infinite_scroll_trigger}
'''
return HTMLResponse(render_page("Media", content, ctx.actor_id, active_tab="media"))
# JSON response for APIs - list all hashes with optional pagination
all_hashes = [cf.content_hash for cf in cache_manager.list_all()]
total = len(all_hashes)
start = (page - 1) * limit
end = start + limit
hashes_page = all_hashes[start:end]
has_more = end < total
return {
"hashes": hashes_page,
"pagination": {
"page": page,
"limit": limit,
"total": total,
"has_more": has_more
}
}
@app.delete("/cache/{content_hash}")
async def discard_cache(content_hash: str, ctx: UserContext = Depends(get_required_user_context)):
"""
Discard (delete) a cached item.
Enforces deletion rules:
- Cannot delete items published to L2 (shared)
- Cannot delete inputs/outputs of activities (runs)
- Cannot delete pinned items
"""
# Check if content exists
if not cache_manager.has_content(content_hash):
raise HTTPException(404, "Content not found")
# Check ownership
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
raise HTTPException(403, "Access denied")
# Check if pinned
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
if meta.get("pinned"):
pin_reason = meta.get("pin_reason", "unknown")
raise HTTPException(400, f"Cannot discard pinned item (reason: {pin_reason})")
# Check if used by any run (Redis runs, not just activity store)
runs_using = await asyncio.to_thread(find_runs_using_content, content_hash)
if runs_using:
run, role = runs_using[0]
raise HTTPException(400, f"Cannot discard: item is {role} of run {run.run_id}")
# Check deletion rules via cache_manager (L2 shared status, activity store)
can_delete, reason = await asyncio.to_thread(cache_manager.can_delete, content_hash)
if not can_delete:
raise HTTPException(400, f"Cannot discard: {reason}")
# Delete via cache_manager
success, msg = await asyncio.to_thread(cache_manager.delete_by_content_hash, content_hash)
if not success:
# Fallback to legacy deletion
cache_path = get_cache_path(content_hash)
if cache_path and cache_path.exists():
cache_path.unlink()
# Clean up legacy metadata files
meta_path = CACHE_DIR / f"{content_hash}.meta.json"
if meta_path.exists():
meta_path.unlink()
mp4_path = CACHE_DIR / f"{content_hash}.mp4"
if mp4_path.exists():
mp4_path.unlink()
return {"discarded": True, "content_hash": content_hash}
@app.delete("/ui/cache/{content_hash}/discard", response_class=HTMLResponse)
async def ui_discard_cache(content_hash: str, request: Request):
"""HTMX handler: discard a cached item.

HTML-fragment twin of DELETE /cache/{content_hash}: applies the same guard
sequence (ownership, existence, pinned, run usage, cache_manager rules) but
auth comes from the session cookie and failures render as HTML fragments
instead of HTTP error responses.
"""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '
Login required
'
# Check ownership
# (get_user_cache_hashes matches both plain username and full actor id)
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
if content_hash not in user_hashes:
return '
Access denied
'
# Check if content exists
has_content = await asyncio.to_thread(cache_manager.has_content, content_hash)
if not has_content:
return '
Content not found
'
# Check if pinned
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
if meta.get("pinned"):
pin_reason = meta.get("pin_reason", "unknown")
return f'
Cannot discard: item is pinned ({pin_reason})
'
# Check if used by any run (Redis runs, not just activity store)
runs_using = await asyncio.to_thread(find_runs_using_content, content_hash)
if runs_using:
run, role = runs_using[0]
return f'
Cannot discard: item is {role} of run {run.run_id}
'
# Check deletion rules via cache_manager (L2 shared status, activity store)
can_delete, reason = await asyncio.to_thread(cache_manager.can_delete, content_hash)
if not can_delete:
return f'
Cannot discard: {reason}
'
# Delete via cache_manager
success, msg = await asyncio.to_thread(cache_manager.delete_by_content_hash, content_hash)
if not success:
# Fallback to legacy deletion
cache_path = get_cache_path(content_hash)
if cache_path and cache_path.exists():
cache_path.unlink()
# Clean up legacy metadata files
meta_path = CACHE_DIR / f"{content_hash}.meta.json"
if meta_path.exists():
meta_path.unlink()
mp4_path = CACHE_DIR / f"{content_hash}.mp4"
if mp4_path.exists():
mp4_path.unlink()
return '''
'''
# Known assets (bootstrap data)
# Maps a short human-readable asset name to the content hash (64-hex digest)
# under which that asset is stored in the cache. Served verbatim by GET /assets.
KNOWN_ASSETS = {
"cat": "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b",
}
@app.get("/assets")
async def list_assets():
    """Return the bootstrap mapping of known asset names to content hashes."""
    return KNOWN_ASSETS
@app.post("/cache/import")
async def import_to_cache(path: str):
    """Import a local file into the cache and return its content hash."""
    source_path = Path(path)
    # The path refers to a file on the server's local filesystem.
    if not source_path.exists():
        raise HTTPException(404, f"File not found: {path}")
    digest = await cache_file(source_path)
    return {"content_hash": digest, "cached": True}
def save_cache_meta(content_hash: str, uploader: Optional[str] = None, filename: Optional[str] = None, **updates) -> dict:
    """Save or update the legacy JSON metadata sidecar for a cached file.

    Creates ``{content_hash}.meta.json`` in CACHE_DIR on first call; on later
    calls the existing record is loaded and ``updates`` merged into it.

    Args:
        content_hash: Hash identifying the cached file.
        uploader: Recorded only when the metadata file is first created.
        filename: Recorded only when the metadata file is first created.
        **updates: Arbitrary keys merged into the record. ``uploader`` and
            ``uploaded_at`` are immutable and silently skipped.

    Returns:
        The full metadata dict as written to disk.
    """
    meta_path = CACHE_DIR / f"{content_hash}.meta.json"
    # Load existing or create new
    if meta_path.exists():
        with open(meta_path) as f:
            meta = json.load(f)
    else:
        meta = {
            "uploader": uploader,
            "uploaded_at": datetime.now(timezone.utc).isoformat(),
            "filename": filename
        }
    # Apply updates, but never overwrite the immutable creation fields.
    for key, value in updates.items():
        if key not in ("uploader", "uploaded_at"):
            meta[key] = value
    with open(meta_path, "w") as f:
        json.dump(meta, f, indent=2)
    return meta
def load_cache_meta(content_hash: str) -> dict:
    """Load the legacy JSON metadata sidecar for a cached file ({} if absent)."""
    meta_path = CACHE_DIR / f"{content_hash}.meta.json"
    if not meta_path.exists():
        return {}
    with open(meta_path) as fh:
        return json.load(fh)
# User data storage (folders, collections)
# One JSON file per user, kept in a hidden subdirectory of the cache
# (see load_user_data / save_user_data below).
USER_DATA_DIR = CACHE_DIR / ".user-data"
def load_user_data(username: str) -> dict:
    """Load a user's folders and collections, with defaults for new users."""
    USER_DATA_DIR.mkdir(parents=True, exist_ok=True)
    # Normalize username into a safe filename: drop "@" and path separators.
    sanitized = username.replace("@", "").replace("/", "_")
    user_file = USER_DATA_DIR / f"{sanitized}.json"
    if not user_file.exists():
        # Fresh user: root folder only, no collections.
        return {"folders": ["/"], "collections": []}
    with open(user_file) as fh:
        return json.load(fh)
def save_user_data(username: str, data: dict):
    """Persist a user's folders and collections to their JSON file."""
    USER_DATA_DIR.mkdir(parents=True, exist_ok=True)
    # Same filename normalization as load_user_data.
    sanitized = username.replace("@", "").replace("/", "_")
    with open(USER_DATA_DIR / f"{sanitized}.json", "w") as fh:
        json.dump(data, fh, indent=2)
async def get_user_cache_hashes(username: str, actor_id: Optional[str] = None) -> set:
    """Get all cache hashes owned by or associated with a user.

    Combines three sources:
    1. Items recorded for the actor in the database (new system).
    2. Legacy ``*.meta.json`` sidecar files whose uploader matches.
    3. Inputs and outputs of the user's runs.

    Args:
        username: The plain username.
        actor_id: The full actor ID (@user@server), if available.

    Returns:
        Set of content-hash strings.
    """
    # Match against both identity formats for backwards compatibility.
    match_values = [username]
    if actor_id:
        match_values.append(actor_id)
    hashes: set = set()

    # 1. Database items owned by the actor (new system).
    if actor_id:
        try:
            for item in await database.get_user_items(actor_id):
                hashes.add(item["content_hash"])
        except Exception:
            pass  # Database may not be initialized

    # 2. Legacy JSON sidecar metadata written next to cached files.
    #    (iterdir already yields existing paths; no need to re-derive/re-check them)
    if CACHE_DIR.exists():
        for f in CACHE_DIR.iterdir():
            if not f.name.endswith('.meta.json'):
                continue
            try:
                with open(f, 'r') as mf:
                    meta = json.load(mf)
                if meta.get("uploader") in match_values:
                    hashes.add(f.name.replace('.meta.json', ''))
            except Exception:
                pass  # Unreadable/corrupt sidecar: skip

    # 3. Files from the user's runs (inputs and outputs).
    for run in list_all_runs():
        if run.username in match_values:
            hashes.update(run.inputs)
            if run.output_hash:
                hashes.add(run.output_hash)
    return hashes
@app.post("/cache/upload")
async def upload_to_cache(file: UploadFile = File(...), ctx: UserContext = Depends(get_required_user_context)):
    """Upload a file to cache. Requires authentication.

    Spools the upload to a temp file, hands it to cache_manager (which moves
    it into content-addressed storage), then records the cache item and its
    uploader metadata in the database.
    """
    # Write to temp file first so cache_manager can hash/move a real file.
    import tempfile
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        content = await file.read()
        tmp.write(content)
        tmp_path = Path(tmp.name)
    try:
        # move=True: cache_manager takes ownership of the temp file on success.
        cached, ipfs_cid = cache_manager.put(tmp_path, node_type="upload", move=True)
    finally:
        # If put() raised (or left the file behind), don't leak the temp file;
        # missing_ok covers the normal case where the file was moved away.
        tmp_path.unlink(missing_ok=True)
    content_hash = cached.content_hash
    # Save to cache_items table (with IPFS CID)
    await database.create_cache_item(content_hash, ipfs_cid)
    # Save uploader metadata to database
    await database.save_item_metadata(
        content_hash=content_hash,
        actor_id=ctx.actor_id,
        item_type="media",
        filename=file.filename
    )
    return {"content_hash": content_hash, "filename": file.filename, "size": len(content)}
class CacheMetaUpdate(BaseModel):
    """Request to update cache metadata. Only non-None fields are applied."""
    # Provenance record: {"type": "self"|"external", "url": "...", "note": "..."}
    origin: Optional[dict] = None
    # Free-text description for the item
    description: Optional[str] = None
    # Replaces the full tag list
    tags: Optional[list[str]] = None
    # Target folder path; must already exist for the user
    folder: Optional[str] = None
    # Replaces collection memberships; each collection must already exist
    collections: Optional[list[str]] = None
class PublishRequest(BaseModel):
    """Request to publish a cache item to L2."""
    # Name under which the asset is published on L2
    asset_name: str
    asset_type: str = "image"  # image, video, etc.
class AddStorageRequest(BaseModel):
    """Request to add a storage provider.

    The provider connection is tested before the record is saved
    (see the /storage POST handler).
    """
    provider_type: str  # 'pinata', 'web3storage', 'local', etc.
    provider_name: Optional[str] = None  # User-friendly name
    config: dict  # Provider-specific config (api_key, path, etc.)
    capacity_gb: int  # Storage capacity in GB
class UpdateStorageRequest(BaseModel):
    """Request to update a storage provider. All fields optional (partial update)."""
    config: Optional[dict] = None  # Replacement provider-specific config
    capacity_gb: Optional[int] = None  # New storage capacity in GB
    is_active: Optional[bool] = None  # Enable/disable the provider
@app.get("/cache/{content_hash}/meta")
async def get_cache_meta(content_hash: str, ctx: UserContext = Depends(get_required_user_context)):
    """Return stored metadata for a cached file the caller owns."""
    # The content itself must exist ...
    if not get_cache_path(content_hash):
        raise HTTPException(404, "Content not found")
    # ... and the caller must own it.
    owned = await get_user_cache_hashes(ctx.username, ctx.actor_id)
    if content_hash not in owned:
        raise HTTPException(403, "Access denied")
    return await database.load_item_metadata(content_hash, ctx.actor_id)
@app.patch("/cache/{content_hash}/meta")
async def update_cache_meta(content_hash: str, update: CacheMetaUpdate, ctx: UserContext = Depends(get_required_user_context)):
    """Update metadata for a cached file.

    Only fields explicitly set (non-None) on the request are applied. Folder
    and collection values are validated against the user's existing
    folders/collections before being stored.

    Raises:
        HTTPException: 404 if content missing, 403 if not owned,
            400 for an unknown folder or collection.
    """
    # Check file exists
    if not get_cache_path(content_hash):
        raise HTTPException(404, "Content not found")
    # Check ownership
    user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
    if content_hash not in user_hashes:
        raise HTTPException(403, "Access denied")

    # Build update dict from non-None fields
    updates = {}
    if update.origin is not None:
        updates["origin"] = update.origin
    if update.description is not None:
        updates["description"] = update.description
    if update.tags is not None:
        updates["tags"] = update.tags

    # Load user data once for both folder and collection validation
    # (previously the JSON file was re-read for each field).
    user_data = None
    if update.folder is not None or update.collections is not None:
        user_data = load_user_data(ctx.username)
    if update.folder is not None:
        # Ensure folder exists in user's folder list
        if update.folder not in user_data["folders"]:
            raise HTTPException(400, f"Folder does not exist: {update.folder}")
        updates["folder"] = update.folder
    if update.collections is not None:
        # Validate collections exist
        existing = {c["name"] for c in user_data["collections"]}
        for col in update.collections:
            if col not in existing:
                raise HTTPException(400, f"Collection does not exist: {col}")
        updates["collections"] = update.collections

    return await database.update_item_metadata(content_hash, ctx.actor_id, **updates)
@app.post("/cache/{content_hash}/publish")
async def publish_cache_to_l2(
    content_hash: str,
    req: PublishRequest,
    request: Request,
    ctx: UserContext = Depends(get_required_user_context)
):
    """
    Publish a cache item to L2 (ActivityPub).
    Requires origin to be set in metadata before publishing.

    Flow: verify content + ownership, require an origin in metadata, forward
    the item (with optional IPFS CID) to the user's L2 server, then record
    the share locally and pin the item so it cannot be discarded.

    Raises:
        HTTPException: 404 missing content, 403 not owned, 400 missing
            origin / L2 rejected the publish, 401 no auth token,
            500 other L2 failure.
    """
    # Check file exists
    cache_path = get_cache_path(content_hash)
    if not cache_path:
        raise HTTPException(404, "Content not found")
    # Check ownership
    user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
    if content_hash not in user_hashes:
        raise HTTPException(403, "Access denied")
    # Load metadata
    meta = await database.load_item_metadata(content_hash, ctx.actor_id)
    # Check origin is set — provenance is required before publishing
    origin = meta.get("origin")
    if not origin or "type" not in origin:
        raise HTTPException(400, "Origin must be set before publishing. Use --origin self or --origin-url ")
    # Get IPFS CID from cache item (None when the item was never pinned to IPFS)
    cache_item = await database.get_cache_item(content_hash)
    ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
    # Get auth token to pass to L2: cookie first, then Authorization header
    token = request.cookies.get("auth_token")
    if not token:
        # Try from header
        auth_header = request.headers.get("Authorization", "")
        if auth_header.startswith("Bearer "):
            token = auth_header[7:]
    if not token:
        raise HTTPException(401, "Authentication token required")
    # Call L2 publish-cache endpoint (use user's L2 server)
    l2_server = ctx.l2_server
    try:
        resp = http_requests.post(
            f"{l2_server}/assets/publish-cache",
            headers={"Authorization": f"Bearer {token}"},
            json={
                "content_hash": content_hash,
                "ipfs_cid": ipfs_cid,
                "asset_name": req.asset_name,
                "asset_type": req.asset_type,
                "origin": origin,
                "description": meta.get("description"),
                "tags": meta.get("tags", []),
                "metadata": {
                    "filename": meta.get("filename"),
                    "folder": meta.get("folder"),
                    "collections": meta.get("collections", [])
                }
            },
            timeout=10
        )
        resp.raise_for_status()
        l2_result = resp.json()
    except http_requests.exceptions.HTTPError as e:
        # Surface L2's own JSON error detail when available
        error_detail = ""
        try:
            error_detail = e.response.json().get("detail", str(e))
        except Exception:
            error_detail = str(e)
        raise HTTPException(400, f"L2 publish failed: {error_detail}")
    except Exception as e:
        raise HTTPException(500, f"L2 publish failed: {e}")
    # Update local metadata with publish status and pin
    await database.save_l2_share(
        content_hash=content_hash,
        actor_id=ctx.actor_id,
        l2_server=l2_server,
        asset_name=req.asset_name,
        content_type=req.asset_type
    )
    # Pin so the published item cannot be discarded from cache
    await database.update_item_metadata(
        content_hash=content_hash,
        actor_id=ctx.actor_id,
        pinned=True,
        pin_reason="published"
    )
    return {
        "published": True,
        "asset_name": req.asset_name,
        "l2_result": l2_result
    }
@app.patch("/cache/{content_hash}/republish")
async def republish_cache_to_l2(
    content_hash: str,
    request: Request,
    ctx: UserContext = Depends(get_required_user_context)
):
    """
    Re-publish (update) a cache item on L2 after metadata changes.
    Only works for already-published items.

    Raises:
        HTTPException: 404 missing content, 403 not owned, 400 not yet
            published / L2 rejected the update, 401 no auth token,
            500 other L2 failure.
    """
    # Check file exists
    if not get_cache_path(content_hash):
        raise HTTPException(404, "Content not found")
    # Check ownership
    user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
    if content_hash not in user_hashes:
        raise HTTPException(403, "Access denied")
    # Load metadata and find the share record for the user's own L2 server.
    meta = await database.load_item_metadata(content_hash, ctx.actor_id)
    l2_server = ctx.l2_server
    current_share = next(
        (s for s in meta.get("l2_shares", []) if s.get("l2_server") == l2_server),
        None,
    )
    if not current_share:
        raise HTTPException(400, "Item not published yet. Use publish first.")
    asset_name = current_share.get("asset_name")
    if not asset_name:
        raise HTTPException(400, "No asset name found in publish info")
    # Auth token: cookie first, then Authorization: Bearer header.
    token = request.cookies.get("auth_token")
    if not token:
        auth_header = request.headers.get("Authorization", "")
        if auth_header.startswith("Bearer "):
            token = auth_header[7:]
    if not token:
        raise HTTPException(401, "Authentication token required")
    # Get IPFS CID from cache item (None when never pinned to IPFS).
    cache_item = await database.get_cache_item(content_hash)
    ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
    # Call the L2 update endpoint on the user's L2 server.
    # (l2_server was already resolved above; the original assigned it twice.)
    try:
        resp = http_requests.patch(
            f"{l2_server}/assets/{asset_name}",
            headers={"Authorization": f"Bearer {token}"},
            json={
                "description": meta.get("description"),
                "tags": meta.get("tags"),
                "origin": meta.get("origin"),
                "ipfs_cid": ipfs_cid,
                "metadata": {
                    "filename": meta.get("filename"),
                    "folder": meta.get("folder"),
                    "collections": meta.get("collections", [])
                }
            },
            timeout=10
        )
        resp.raise_for_status()
        l2_result = resp.json()
    except http_requests.exceptions.HTTPError as e:
        # Surface L2's own JSON error detail when available.
        error_detail = ""
        try:
            error_detail = e.response.json().get("detail", str(e))
        except Exception:
            error_detail = str(e)
        raise HTTPException(400, f"L2 update failed: {error_detail}")
    except Exception as e:
        raise HTTPException(500, f"L2 update failed: {e}")
    # Update local metadata - save_l2_share updates last_synced_at on conflict
    await database.save_l2_share(
        content_hash=content_hash,
        actor_id=ctx.actor_id,
        l2_server=l2_server,
        asset_name=asset_name,
        content_type=current_share.get("content_type", "media")
    )
    return {
        "updated": True,
        "asset_name": asset_name,
        "l2_result": l2_result
    }
# ============ L2 Sync ============
def _fetch_l2_outbox_sync(l2_server: str, username: str) -> list:
    """Fetch user's outbox from L2 (sync version for asyncio.to_thread)."""
    # Fetch the outbox page with activities; any failure yields [].
    try:
        response = http_requests.get(
            f"{l2_server}/users/{username}/outbox?page=true",
            headers={"Accept": "application/activity+json"},
            timeout=10
        )
        if response.status_code != 200:
            logger.warning(f"L2 outbox fetch failed: {response.status_code}")
            return []
        return response.json().get("orderedItems", [])
    except Exception as e:
        logger.error(f"Failed to fetch L2 outbox: {e}")
        return []
@app.post("/user/sync-l2")
async def sync_with_l2(ctx: UserContext = Depends(get_required_user_context)):
    """
    Sync local L2 share records with user's L2 outbox.
    Fetches user's published assets from their L2 server and updates local tracking.
    """
    l2_server = ctx.l2_server
    activities = await asyncio.to_thread(_fetch_l2_outbox_sync, l2_server, ctx.username)
    if not activities:
        return {"synced": 0, "message": "No activities found or L2 unavailable"}

    # ActivityPub object type -> local content type (anything else is "media").
    type_map = {"Video": "video", "Image": "image"}
    synced_count = 0
    for activity in activities:
        # Only Create activities with a structured object describe assets.
        if activity.get("type") != "Create":
            continue
        obj = activity.get("object", {})
        if not isinstance(obj, dict):
            continue
        asset_name = obj.get("name", "")
        # Prefer an explicit content_hash attachment ...
        content_hash = next(
            (a.get("value") for a in obj.get("attachment", [])
             if a.get("name") == "content_hash"),
            None,
        )
        # ... otherwise try to extract it from an /objects/{hash} object ID.
        if not content_hash:
            obj_id = obj.get("id", "")
            if "/objects/" in obj_id:
                content_hash = obj_id.split("/objects/")[-1].split("/")[0]
        if not content_hash or not asset_name:
            continue
        # Skip assets whose content we don't actually hold locally.
        if not get_cache_path(content_hash):
            continue
        # Record (or refresh) the local L2 share entry.
        await database.save_l2_share(
            content_hash=content_hash,
            actor_id=ctx.actor_id,
            l2_server=l2_server,
            asset_name=asset_name,
            content_type=type_map.get(obj.get("type", ""), "media"),
        )
        synced_count += 1
    return {"synced": synced_count, "total_activities": len(activities)}
@app.post("/ui/sync-l2", response_class=HTMLResponse)
async def ui_sync_with_l2(request: Request):
"""HTMX handler: sync with L2 server.

Cookie-authenticated wrapper around sync_with_l2 that renders the synced
count as an HTML fragment for HTMX swaps.
"""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '
Login required
'
# Delegate to the JSON sync endpoint, then render a summary fragment.
try:
result = await sync_with_l2(ctx)
synced = result.get("synced", 0)
total = result.get("total_activities", 0)
if synced > 0:
return f'''
Synced {synced} asset(s) from L2 ({total} activities found)
'''
# ============ Folder & Collection Management ============
@app.get("/user/folders")
async def list_folders(username: str = Depends(get_required_user)):
    """List user's folders."""
    return {"folders": load_user_data(username)["folders"]}
@app.post("/user/folders")
async def create_folder(folder_path: str, username: str = Depends(get_required_user)):
    """Create a new folder."""
    # Paths are absolute, rooted at "/".
    if not folder_path.startswith("/"):
        raise HTTPException(400, "Folder path must start with /")
    user_data = load_user_data(username)
    # The immediate parent must already exist (root always does).
    parent = folder_path.rsplit("/", 1)[0] or "/"
    if parent != "/" and parent not in user_data["folders"]:
        raise HTTPException(400, f"Parent folder does not exist: {parent}")
    if folder_path in user_data["folders"]:
        raise HTTPException(400, f"Folder already exists: {folder_path}")
    # Keep the folder list sorted for stable display.
    user_data["folders"].append(folder_path)
    user_data["folders"].sort()
    save_user_data(username, user_data)
    return {"folder": folder_path, "created": True}
@app.delete("/user/folders")
async def delete_folder(folder_path: str, ctx: UserContext = Depends(get_required_user_context)):
    """Delete a folder (must be empty)."""
    if folder_path == "/":
        raise HTTPException(400, "Cannot delete root folder")
    user_data = load_user_data(ctx.username)
    folders = user_data["folders"]
    if folder_path not in folders:
        raise HTTPException(404, "Folder not found")
    # Refuse if any subfolder is nested beneath this path.
    nested_prefix = folder_path + "/"
    for name in folders:
        if name.startswith(nested_prefix):
            raise HTTPException(400, f"Folder has subfolders: {name}")
    # Refuse if any of the user's items still lives in this folder.
    user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
    for h in user_hashes:
        meta = await database.load_item_metadata(h, ctx.actor_id)
        if meta.get("folder") == folder_path:
            raise HTTPException(400, "Folder is not empty")
    folders.remove(folder_path)
    save_user_data(ctx.username, user_data)
    return {"folder": folder_path, "deleted": True}
@app.get("/user/collections")
async def list_collections(username: str = Depends(get_required_user)):
    """List user's collections."""
    return {"collections": load_user_data(username)["collections"]}
@app.post("/user/collections")
async def create_collection(name: str, username: str = Depends(get_required_user)):
    """Create a new collection."""
    user_data = load_user_data(username)
    # Collection names are unique per user.
    if any(col["name"] == name for col in user_data["collections"]):
        raise HTTPException(400, f"Collection already exists: {name}")
    new_entry = {
        "name": name,
        "created_at": datetime.now(timezone.utc).isoformat()
    }
    user_data["collections"].append(new_entry)
    save_user_data(username, user_data)
    return {"collection": name, "created": True}
@app.delete("/user/collections")
async def delete_collection(name: str, ctx: UserContext = Depends(get_required_user_context)):
    """Delete a collection and remove it from every cache item that used it."""
    user_data = load_user_data(ctx.username)
    # Locate the collection entry by name.
    index = next(
        (i for i, col in enumerate(user_data["collections"]) if col["name"] == name),
        None,
    )
    if index is None:
        raise HTTPException(404, "Collection not found")
    user_data["collections"].pop(index)
    save_user_data(ctx.username, user_data)
    # Strip the collection from all of the user's cache items.
    user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
    for h in user_hashes:
        meta = await database.load_item_metadata(h, ctx.actor_id)
        if name in meta.get("collections", []):
            remaining = [c for c in meta.get("collections", []) if c != name]
            await database.update_item_metadata(h, ctx.actor_id, collections=remaining)
    return {"collection": name, "deleted": True}
def is_ios_request(request: Request) -> bool:
    """Check if request is from iOS device."""
    agent = request.headers.get("user-agent", "").lower()
    return any(marker in agent for marker in ("iphone", "ipad"))
def video_src_for_request(content_hash: str, request: Request) -> str:
    """Get video src URL, using MP4 endpoint for iOS, raw for others."""
    # iOS Safari needs the MP4-transcoded endpoint; everyone else gets raw bytes.
    suffix = "mp4" if is_ios_request(request) else "raw"
    return f"/cache/{content_hash}/{suffix}"
def detect_media_type(cache_path: Path) -> str:
    """Detect if file is image or video based on magic bytes.

    Reads the first 32 bytes and compares well-known signatures.
    Returns "video", "image", or "unknown".
    """
    with open(cache_path, "rb") as fh:
        head = fh.read(32)
    # Video containers, checked before image formats.
    if head.startswith(b'\x1a\x45\xdf\xa3'):  # WebM/MKV
        return "video"
    if head[4:8] == b'ftyp':  # MP4/MOV
        return "video"
    is_riff = head[:4] == b'RIFF'
    if is_riff and head[8:12] == b'AVI ':  # AVI
        return "video"
    # Still-image formats.
    if head.startswith(b'\x89PNG\r\n\x1a\n'):  # PNG
        return "image"
    if head.startswith(b'\xff\xd8'):  # JPEG
        return "image"
    if head[:6] in (b'GIF87a', b'GIF89a'):  # GIF
        return "image"
    if is_riff and head[8:12] == b'WEBP':  # WebP
        return "image"
    return "unknown"
async def get_user_context_from_cookie(request) -> Optional[UserContext]:
    """Get user context from auth cookie. Returns full context with actor_id and l2_server."""
    if token := request.cookies.get("auth_token"):
        return await get_verified_user_context(token)
    return None
async def get_user_from_cookie(request) -> Optional[str]:
    """Get username from auth cookie (backwards compat - prefer get_user_context_from_cookie)."""
    context = await get_user_context_from_cookie(request)
    if context is None:
        return None
    return context.username
def wants_html(request: Request) -> bool:
    """Check if request wants HTML (browser) vs JSON (API)."""
    accept = request.headers.get("accept", "")
    # Explicit HTML preference (with no JSON) wins outright.
    prefers_html = "text/html" in accept and "application/json" not in accept
    # Browser navigations (direct URL access) also get HTML.
    is_navigation = request.headers.get("sec-fetch-mode", "") == "navigate"
    return prefers_html or is_navigation
# Tailwind CSS config for all L1 templates
# Shared literal; presumably embedded by the page-rendering helpers below
# (render_page / render_ui_html) — confirm usage.
TAILWIND_CONFIG = '''
'''
def render_page(title: str, content: str, actor_id: Optional[str] = None, active_tab: Optional[str] = None) -> str:
"""Render a page with nav bar and content. Used for clean URL pages.

Args:
title: Page title.
content: Page body markup.
actor_id: The user's actor ID (@user@server) or None if not logged in.
active_tab: Identifier of the nav tab to mark active, if any.
"""
user_info = ""
if actor_id:
# Extract username and domain from @username@domain format
parts = actor_id.lstrip("@").split("@")
username = parts[0] if parts else actor_id
domain = parts[1] if len(parts) > 1 else ""
# Link to the user's L2 profile ("#" when the domain is unknown)
l2_user_url = f"https://{domain}/users/{username}" if domain else "#"
user_info = f'''
"""
def render_ui_html(actor_id: Optional[str] = None, tab: str = "runs") -> str:
"""Render main UI HTML with optional user context.

Args:
actor_id: The user's actor ID (@user@server) or None if not logged in.
tab: Name of the tab to show initially (defaults to "runs").
"""
user_info = ""
if actor_id:
# Extract username and domain from @username@domain format
parts = actor_id.lstrip("@").split("@")
username = parts[0] if parts else actor_id
domain = parts[1] if len(parts) > 1 else ""
# Link to the user's L2 profile ("#" when the domain is unknown)
l2_user_url = f"https://{domain}/users/{username}" if domain else "#"
user_info = f'''
"""
# Auth - L1 doesn't handle login (user logs in at their L2 server)
# Token can be passed via URL from L2 redirect, then L1 sets its own cookie
@app.get("/auth")
async def auth_callback(auth_token: str = None):
    """
    Receive auth token from L2 redirect and set local cookie.
    This enables cross-subdomain auth on iOS Safari which blocks shared cookies.
    """
    # Missing token, or one that fails verification: bounce to the home page.
    if not auth_token:
        return RedirectResponse(url="/", status_code=302)
    ctx = await get_verified_user_context(auth_token)
    if not ctx:
        return RedirectResponse(url="/", status_code=302)
    # Track the token per-user so it can be revoked by username later.
    register_user_token(ctx.username, auth_token)
    # Issue a first-party cookie and land the user on their runs page.
    redirect = RedirectResponse(url="/runs", status_code=302)
    redirect.set_cookie(
        key="auth_token",
        value=auth_token,
        httponly=True,
        max_age=60 * 60 * 24 * 30,  # 30 days
        samesite="lax",
        secure=True,
    )
    return redirect
@app.get("/logout")
async def logout():
    """Logout - clear local cookie and redirect to home."""
    redirect = RedirectResponse(url="/", status_code=302)
    redirect.delete_cookie("auth_token")
    return redirect
@app.post("/auth/revoke")
async def auth_revoke(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """
    Revoke a token. Called by L2 when user logs out.
    The token to revoke is passed in the Authorization header.
    """
    if not credentials:
        raise HTTPException(401, "No token provided")
    bearer_token = credentials.credentials
    # Only the holder of a currently-valid token may revoke it.
    if not get_user_context_from_token(bearer_token):
        raise HTTPException(401, "Invalid token")
    newly_revoked = revoke_token(bearer_token)
    return {"revoked": True, "newly_revoked": newly_revoked}
class RevokeUserRequest(BaseModel):
    """Request body for /auth/revoke-user."""
    username: str  # User whose registered tokens should all be revoked
    l2_server: str  # L2 server requesting the revocation
@app.post("/auth/revoke-user")
async def auth_revoke_user(request: RevokeUserRequest):
    """
    Revoke all tokens for a user. Called by L2 when user logs out.
    This handles the case where L2 issued scoped tokens that differ from L2's own token.
    """
    # The requesting L2 server is currently trusted without verification:
    # revocation only affects users already registered on this L1.
    revoked_count = revoke_all_user_tokens(request.username)
    return {"revoked": True, "tokens_revoked": revoked_count, "username": request.username}
@app.post("/ui/publish-run/{run_id}", response_class=HTMLResponse)
async def ui_publish_run(run_id: str, request: Request):
"""Publish a run to L2 from the web UI. Assets are named by content_hash.

Calls L2's record-run endpoint, then pins the run's output, inputs, and
(for recipe-based runs) the recipe and its fixed inputs so none of the
provenance chain can be discarded. Returns an HTML fragment for HTMX.
"""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return HTMLResponse('
Not logged in
')
token = request.cookies.get("auth_token")
if not token:
return HTMLResponse('
Not logged in
')
# Get the run to pin its output and inputs
run = load_run(run_id)
if not run:
return HTMLResponse('
Run not found
')
# Call L2 to publish the run, including this L1's public URL
# Assets are named by their content_hash - no output_name needed
l2_server = ctx.l2_server
try:
resp = http_requests.post(
f"{l2_server}/assets/record-run",
json={"run_id": run_id, "l1_server": L1_PUBLIC_URL},
headers={"Authorization": f"Bearer {token}"},
timeout=30
)
if resp.status_code == 400:
error = resp.json().get("detail", "Bad request")
return HTMLResponse(f'
Error: {error}
')
resp.raise_for_status()
result = resp.json()
# Pin the output and record L2 share
if run.output_hash and result.get("asset"):
await database.update_item_metadata(run.output_hash, ctx.actor_id, pinned=True, pin_reason="published")
# Record L2 share so UI shows published status
# (media type detected from magic bytes; defaults to "image" when missing)
cache_path = get_cache_path(run.output_hash)
media_type = detect_media_type(cache_path) if cache_path else "image"
content_type = "video" if media_type == "video" else "image"
await database.save_l2_share(
content_hash=run.output_hash,
actor_id=ctx.actor_id,
l2_server=l2_server,
asset_name=result["asset"]["name"],
content_type=content_type
)
# Pin the inputs (for provenance)
for input_hash in run.inputs:
await database.update_item_metadata(input_hash, ctx.actor_id, pinned=True, pin_reason="input_to_published")
# If this was a recipe-based run, pin the recipe and its fixed inputs
if run.recipe.startswith("recipe:"):
config_name = run.recipe.replace("recipe:", "")
for recipe in list_all_recipes():
if recipe.name == config_name:
# Pin the recipe YAML
cache_manager.pin(recipe.recipe_id, reason="recipe_for_published")
# Pin all fixed inputs referenced by the recipe
for fixed in recipe.fixed_inputs:
if fixed.content_hash:
cache_manager.pin(fixed.content_hash, reason="fixed_input_in_published_recipe")
break
# Use HTTPS for L2 links
l2_https = l2_server.replace("http://", "https://")
asset_name = result["asset"]["name"]
short_name = asset_name[:16] + "..." if len(asset_name) > 20 else asset_name
# Link to activity (the published run) rather than just the asset
activity = result.get("activity")
activity_id = activity.get("activity_id") if activity else None
l2_link = f"{l2_https}/activities/{activity_id}" if activity_id else f"{l2_https}/assets/{asset_name}"
return HTMLResponse(f'''
')
@app.get("/ui/runs", response_class=HTMLResponse)
async def ui_runs(request: Request):
"""HTMX partial: list of runs.

Requires a logged-in session cookie; shows only runs whose recorded
username matches the viewer (plain or ActivityPub form).
"""
ctx = await get_user_context_from_cookie(request)
runs = list_all_runs()
# Require login to see runs
if not ctx:
return '
Not logged in.
'
# Filter runs by user - match both plain username and ActivityPub format (@user@domain)
runs = [r for r in runs if r.username in (ctx.username, ctx.actor_id)]
if not runs:
return '
')
return '\n'.join(html_parts)
@app.get("/ui/media-list", response_class=HTMLResponse)
async def ui_media_list(
request: Request,
folder: Optional[str] = None,
collection: Optional[str] = None,
tag: Optional[str] = None
):
"""HTMX partial: list of media items with optional filtering.

Query params (all optional, applied in order): folder path prefix,
collection name, and tag. Items are deduplicated by content hash, sorted
newest first, and capped at 50 per response.
"""
ctx = await get_user_context_from_cookie(request)
# Require login to see media
if not ctx:
return '
Not logged in.
'
# Get hashes owned by/associated with this user
user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
# Get cache items that belong to the user (from cache_manager)
cache_items = []
seen_hashes = set() # Deduplicate by content_hash
for cached_file in cache_manager.list_all():
content_hash = cached_file.content_hash
if content_hash not in user_hashes:
continue
# Skip duplicates (same content from multiple runs)
if content_hash in seen_hashes:
continue
seen_hashes.add(content_hash)
# Skip recipes - they have their own section
if cached_file.node_type == "recipe":
continue
# Load metadata for filtering
meta = await database.load_item_metadata(content_hash, ctx.actor_id)
# Apply folder filter
# (non-root filters match any item whose folder starts with the prefix;
# the root filter "/" matches only items exactly at root)
if folder:
item_folder = meta.get("folder", "/")
if folder != "/" and not item_folder.startswith(folder):
continue
if folder == "/" and item_folder != "/":
continue
# Apply collection filter
if collection:
if collection not in meta.get("collections", []):
continue
# Apply tag filter
if tag:
if tag not in meta.get("tags", []):
continue
cache_items.append({
"hash": content_hash,
"size": cached_file.size_bytes,
"mtime": cached_file.created_at,
"meta": meta
})
# Sort by modification time (newest first)
cache_items.sort(key=lambda x: x["mtime"], reverse=True)
if not cache_items:
filter_msg = ""
if folder:
filter_msg = f" in folder {folder}"
elif collection:
filter_msg = f" in collection '{collection}'"
elif tag:
filter_msg = f" with tag '{tag}'"
return f'
No cached files{filter_msg}. Upload files or run effects to see them here.
'
html_parts = ['
']
for item in cache_items[:50]: # Limit to 50 items
content_hash = item["hash"]
cache_path = get_cache_path(content_hash)
media_type = detect_media_type(cache_path) if cache_path else "unknown"
# Check IPFS status
cache_item = await database.get_cache_item(content_hash)
ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
ipfs_badge = 'IPFS' if ipfs_cid else ''
# Check L2 publish status
# (only the first share record is surfaced as a badge)
l2_shares = item["meta"].get("l2_shares", [])
if l2_shares:
first_share = l2_shares[0]
l2_server = first_share.get("l2_server", "")
asset_name = first_share.get("asset_name", "")
asset_url = f"{l2_server}/assets/{asset_name}"
published_badge = f'L2'
else:
published_badge = ''
# Format size
size = item["size"]
if size > 1024*1024:
size_str = f"{size/(1024*1024):.1f} MB"
elif size > 1024:
size_str = f"{size/1024:.1f} KB"
else:
size_str = f"{size} bytes"
html_parts.append(f'''
'
return html
# ============ User Storage Configuration ============
# Catalog of supported storage providers, keyed by provider_type
# (see AddStorageRequest): display name, short description, UI accent color.
STORAGE_PROVIDERS_INFO = {
"pinata": {"name": "Pinata", "desc": "1GB free, IPFS pinning", "color": "blue"},
"web3storage": {"name": "web3.storage", "desc": "IPFS + Filecoin", "color": "green"},
"nftstorage": {"name": "NFT.Storage", "desc": "Free for NFTs", "color": "pink"},
"infura": {"name": "Infura IPFS", "desc": "5GB free", "color": "orange"},
"filebase": {"name": "Filebase", "desc": "5GB free, S3+IPFS", "color": "cyan"},
"storj": {"name": "Storj", "desc": "25GB free", "color": "indigo"},
"local": {"name": "Local Storage", "desc": "Your own disk", "color": "purple"},
}
@app.get("/storage")
async def list_storage(request: Request):
    """List user's storage providers. HTML for browsers, JSON for API.

    Content negotiation: returns JSON when the Accept header requests
    application/json without text/html; otherwise renders the HTML
    storage page. Unauthenticated browser requests are redirected to
    /auth; unauthenticated JSON requests get a 401.
    """
    accept = request.headers.get("accept", "")
    wants_json = "application/json" in accept and "text/html" not in accept
    ctx = await get_user_context_from_cookie(request)
    if not ctx:
        if wants_json:
            raise HTTPException(401, "Authentication required")
        return RedirectResponse(url="/auth", status_code=302)
    storages = await database.get_user_storage(ctx.actor_id)
    # Add usage stats to each storage
    for storage in storages:
        usage = await database.get_storage_usage(storage["id"])
        storage["used_bytes"] = usage["used_bytes"]
        storage["pin_count"] = usage["pin_count"]
        # Half of the declared capacity counts as donated.
        storage["donated_gb"] = storage["capacity_gb"] // 2
        # Mask sensitive config keys for display
        if storage.get("config"):
            config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"])
            masked = {}
            for k, v in config.items():
                if "key" in k.lower() or "token" in k.lower() or "secret" in k.lower():
                    # BUGFIX: stringify before slicing. The old code
                    # measured len(str(v)) but sliced v itself, raising
                    # TypeError for non-str values (e.g. numeric IDs).
                    s = str(v)
                    masked[k] = s[:4] + "..." + s[-4:] if len(s) > 8 else "****"
                else:
                    masked[k] = v
            storage["config_display"] = masked
    if wants_json:
        return {"storages": storages}
    return await ui_storage_page(ctx.username, storages, request)
@app.post("/storage")
async def add_storage(req: AddStorageRequest, ctx: UserContext = Depends(get_required_user_context)):
    """Add a storage provider.

    Validates the provider type, performs a live connection test with
    the supplied config, and only then persists the provider. Returns
    the new storage id and a confirmation message.
    """
    supported = {"pinata", "web3storage", "nftstorage", "infura", "filebase", "storj", "local"}
    if req.provider_type not in supported:
        raise HTTPException(400, f"Invalid provider type: {req.provider_type}")
    # Verify we can actually talk to the provider before saving anything.
    candidate = storage_providers.create_provider(
        req.provider_type,
        {**req.config, "capacity_gb": req.capacity_gb},
    )
    if not candidate:
        raise HTTPException(400, "Failed to create provider with given config")
    ok, detail = await candidate.test_connection()
    if not ok:
        raise HTTPException(400, f"Provider connection failed: {detail}")
    # Persist under a default "<type>-<username>" name when none given.
    provider_name = req.provider_name or f"{req.provider_type}-{ctx.username}"
    storage_id = await database.add_user_storage(
        actor_id=ctx.actor_id,
        provider_type=req.provider_type,
        provider_name=provider_name,
        config=req.config,
        capacity_gb=req.capacity_gb,
    )
    if not storage_id:
        raise HTTPException(500, "Failed to save storage provider")
    return {"id": storage_id, "message": f"Storage provider added: {provider_name}"}
@app.post("/storage/add")
async def add_storage_form(
    request: Request,
    provider_type: str = Form(...),
    provider_name: Optional[str] = Form(None),
    description: Optional[str] = Form(None),
    capacity_gb: int = Form(5),
    # Provider-specific credential fields; which ones are required
    # depends on provider_type (checked per-branch below).
    api_key: Optional[str] = Form(None),
    secret_key: Optional[str] = Form(None),
    api_token: Optional[str] = Form(None),
    project_id: Optional[str] = Form(None),
    project_secret: Optional[str] = Form(None),
    access_key: Optional[str] = Form(None),
    bucket: Optional[str] = Form(None),
    path: Optional[str] = Form(None),
):
    """Add a storage provider via HTML form (cookie auth).

    Browser-facing twin of POST /storage: reads credentials from form
    fields, validates the fields required by the chosen provider type,
    tests the connection, and persists the provider. Every outcome
    (including errors) is returned as an HTML fragment rather than a
    raised HTTPException, so the form UI can display it inline.
    """
    ctx = await get_user_context_from_cookie(request)
    if not ctx:
        return HTMLResponse('
Not authenticated
', status_code=401)
    valid_types = ["pinata", "web3storage", "nftstorage", "infura", "filebase", "storj", "local"]
    if provider_type not in valid_types:
        return HTMLResponse(f'
Invalid provider type: {provider_type}
')
    # Build config based on provider type; each branch enforces its own
    # required credentials and returns an inline error when missing.
    config = {}
    if provider_type == "pinata":
        if not api_key or not secret_key:
            return HTMLResponse('
Pinata requires API Key and Secret Key
')
        config = {"api_key": api_key, "secret_key": secret_key}
    elif provider_type == "web3storage":
        if not api_token:
            return HTMLResponse('
web3.storage requires API Token
')
        config = {"api_token": api_token}
    elif provider_type == "nftstorage":
        if not api_token:
            return HTMLResponse('
NFT.Storage requires API Token
')
        config = {"api_token": api_token}
    elif provider_type == "infura":
        if not project_id or not project_secret:
            return HTMLResponse('
Infura requires Project ID and Project Secret
')
        config = {"project_id": project_id, "project_secret": project_secret}
    elif provider_type == "filebase":
        if not access_key or not secret_key or not bucket:
            return HTMLResponse('
Filebase requires Access Key, Secret Key, and Bucket
')
        config = {"access_key": access_key, "secret_key": secret_key, "bucket": bucket}
    elif provider_type == "storj":
        if not access_key or not secret_key or not bucket:
            return HTMLResponse('
Storj requires Access Key, Secret Key, and Bucket
')
        config = {"access_key": access_key, "secret_key": secret_key, "bucket": bucket}
    elif provider_type == "local":
        if not path:
            return HTMLResponse('
Local storage requires a path
')
        config = {"path": path}
    # Test the provider connection before saving
    provider = storage_providers.create_provider(provider_type, {
        **config,
        "capacity_gb": capacity_gb
    })
    if not provider:
        return HTMLResponse('
Failed to create provider with given config
')
    success, message = await provider.test_connection()
    if not success:
        return HTMLResponse(f'
Provider connection failed: {message}
')
    # Save to database. Default name numbers the entry after the user's
    # existing configs of this type, e.g. "pinata-alice-2".
    name = provider_name or f"{provider_type}-{ctx.username}-{len(await database.get_user_storage_by_type(ctx.actor_id, provider_type)) + 1}"
    storage_id = await database.add_user_storage(
        actor_id=ctx.actor_id,
        provider_type=provider_type,
        provider_name=name,
        config=config,
        capacity_gb=capacity_gb,
        description=description
    )
    if not storage_id:
        return HTMLResponse('
Failed to save storage provider
')
    return HTMLResponse(f'''
Storage provider "{name}" added successfully!
''')
@app.get("/storage/{storage_id}")
async def get_storage(storage_id: int, ctx: UserContext = Depends(get_required_user_context)):
    """Get a specific storage provider.

    Enforces ownership (the record's actor must match the caller)
    before returning the row augmented with live usage statistics.
    """
    record = await database.get_storage_by_id(storage_id)
    if not record:
        raise HTTPException(404, "Storage provider not found")
    if record["actor_id"] != ctx.actor_id:
        raise HTTPException(403, "Not authorized")
    # Attach current usage numbers to the stored row.
    stats = await database.get_storage_usage(storage_id)
    record["used_bytes"] = stats["used_bytes"]
    record["pin_count"] = stats["pin_count"]
    # Half of the declared capacity counts as donated.
    record["donated_gb"] = record["capacity_gb"] // 2
    return record
@app.patch("/storage/{storage_id}")
async def update_storage(storage_id: int, req: UpdateStorageRequest, ctx: UserContext = Depends(get_required_user_context)):
    """Update a storage provider.

    When the request changes the config, the merged (existing + new)
    config is validated with a live connection test before anything is
    persisted.

    Raises:
        HTTPException: 404 unknown storage id; 403 owned by another
            actor; 400 the new config cannot be instantiated or fails
            its connection test; 500 the database update fails.
    """
    storage = await database.get_storage_by_id(storage_id)
    if not storage:
        raise HTTPException(404, "Storage provider not found")
    if storage["actor_id"] != ctx.actor_id:
        raise HTTPException(403, "Not authorized")
    # If updating config, test the new connection before persisting.
    if req.config:
        existing_config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"])
        new_config = {**existing_config, **req.config}
        provider = storage_providers.create_provider(storage["provider_type"], {
            **new_config,
            "capacity_gb": req.capacity_gb or storage["capacity_gb"]
        })
        # BUGFIX: previously a falsy provider silently skipped validation
        # and saved the unvalidated config. Reject it instead, matching
        # the behavior of POST /storage.
        if not provider:
            raise HTTPException(400, "Failed to create provider with given config")
        success, message = await provider.test_connection()
        if not success:
            raise HTTPException(400, f"Provider connection failed: {message}")
    success = await database.update_user_storage(
        storage_id,
        config=req.config,
        capacity_gb=req.capacity_gb,
        is_active=req.is_active
    )
    if not success:
        raise HTTPException(500, "Failed to update storage provider")
    return {"message": "Storage provider updated"}
@app.delete("/storage/{storage_id}")
async def remove_storage(storage_id: int, request: Request):
    """Remove a storage provider.

    Cookie-authenticated; the record must belong to the caller. HTML
    clients get an empty response body (so the UI element can be
    dropped in place), API clients get a JSON confirmation.
    """
    ctx = await get_user_context_from_cookie(request)
    if not ctx:
        raise HTTPException(401, "Not authenticated")
    target = await database.get_storage_by_id(storage_id)
    if not target:
        raise HTTPException(404, "Storage provider not found")
    if target["actor_id"] != ctx.actor_id:
        raise HTTPException(403, "Not authorized")
    removed = await database.remove_user_storage(storage_id)
    if not removed:
        raise HTTPException(500, "Failed to remove storage provider")
    return HTMLResponse("") if wants_html(request) else {"message": "Storage provider removed"}
@app.post("/storage/{storage_id}/test")
async def test_storage(storage_id: int, request: Request):
    """Test storage provider connectivity.

    Re-instantiates the provider from its stored config and runs a live
    connection test. Each outcome is returned as an HTML fragment when
    wants_html(request) is true, otherwise as JSON / HTTPException.
    """
    ctx = await get_user_context_from_cookie(request)
    if not ctx:
        if wants_html(request):
            return HTMLResponse('Not authenticated', status_code=401)
        raise HTTPException(401, "Not authenticated")
    storage = await database.get_storage_by_id(storage_id)
    if not storage:
        if wants_html(request):
            return HTMLResponse('Storage not found', status_code=404)
        raise HTTPException(404, "Storage provider not found")
    # Ownership check: only the creating actor may probe the provider.
    if storage["actor_id"] != ctx.actor_id:
        if wants_html(request):
            return HTMLResponse('Not authorized', status_code=403)
        raise HTTPException(403, "Not authorized")
    # Config may be stored as a dict or a JSON string; normalize first.
    config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"])
    provider = storage_providers.create_provider(storage["provider_type"], {
        **config,
        "capacity_gb": storage["capacity_gb"]
    })
    if not provider:
        if wants_html(request):
            return HTMLResponse('Failed to create provider')
        raise HTTPException(500, "Failed to create provider")
    success, message = await provider.test_connection()
    if wants_html(request):
        if success:
            return HTMLResponse(f'{message}')
        return HTMLResponse(f'{message}')
    return {"success": success, "message": message}
@app.get("/storage/type/{provider_type}")
async def storage_type_page(provider_type: str, request: Request):
    """Page for managing storage configs of a specific type.

    Redirects unauthenticated browsers to /auth and 404s on unknown
    provider types. Sensitive config values are masked before the page
    is rendered.
    """
    ctx = await get_user_context_from_cookie(request)
    if not ctx:
        return RedirectResponse(url="/auth", status_code=302)
    if provider_type not in STORAGE_PROVIDERS_INFO:
        raise HTTPException(404, "Invalid provider type")
    storages = await database.get_user_storage_by_type(ctx.actor_id, provider_type)
    # Add usage stats and mask config
    for storage in storages:
        usage = await database.get_storage_usage(storage["id"])
        storage["used_bytes"] = usage["used_bytes"]
        storage["pin_count"] = usage["pin_count"]
        # Mask sensitive config keys
        if storage.get("config"):
            config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"])
            masked = {}
            for k, v in config.items():
                if "key" in k.lower() or "token" in k.lower() or "secret" in k.lower():
                    # BUGFIX: stringify before slicing. The old code
                    # measured len(str(v)) but sliced v itself, raising
                    # TypeError for non-str values (e.g. numeric IDs).
                    s = str(v)
                    masked[k] = s[:4] + "..." + s[-4:] if len(s) > 8 else "****"
                else:
                    masked[k] = v
            storage["config_display"] = masked
    info = STORAGE_PROVIDERS_INFO[provider_type]
    return await ui_storage_type_page(ctx.username, provider_type, info, storages, request)
async def ui_storage_page(username: str, storages: list, request: Request) -> HTMLResponse:
"""Render the main storage management page."""
# Count by provider type
type_counts = {}
for s in storages:
ptype = s["provider_type"]
type_counts[ptype] = type_counts.get(ptype, 0) + 1
# Build provider type cards
cards = ""
for ptype, info in STORAGE_PROVIDERS_INFO.items():
count = type_counts.get(ptype, 0)
count_badge = f'{count}' if count > 0 else ""
cards += f'''
'''
# Total stats
total_capacity = sum(s["capacity_gb"] for s in storages)
total_used = sum(s["used_bytes"] for s in storages)
total_pins = sum(s["pin_count"] for s in storages)
html = f'''
Storage - Art DAG L1