-
-
-"""
-
-
@app.get("/", response_class=HTMLResponse)
async def root(request: Request):
    """Home page."""
    # Resolve the (optional) logged-in user from the session cookie so the
    # page can be personalized; anonymous visitors still get the home page.
    user_ctx = await get_user_context_from_cookie(request)
    if user_ctx is None:
        return render_home_html(None)
    return render_home_html(user_ctx.actor_id)
-
-
@app.post("/runs", response_model=RunStatus)
async def create_run(request: RunRequest, ctx: UserContext = Depends(get_required_user_context)):
    """Start a new rendering run. Checks cache before executing.

    Resolution order:
      1. L1 run cache (local): return the completed run immediately.
      2. L2 server: pull the output back from IPFS into the L1 cache.
      3. Submit a Celery task (DAG engine or legacy single-effect mode).

    The run_id is content-addressable (derived from inputs + recipe), which
    is what makes the cache lookups sound: identical requests map to the
    same run_id.
    """
    # Compute content-addressable run_id
    run_id = compute_run_id(request.inputs, request.recipe)

    # Generate output name if not provided
    output_name = request.output_name or f"{request.recipe}-{run_id[:8]}"

    # Use actor_id from user context
    actor_id = ctx.actor_id

    # --- 1. Check L1 cache first ---
    cached_run = await database.get_run_cache(run_id)
    if cached_run:
        output_cid = cached_run["output_cid"]
        # The cache row may outlive the file on disk; verify before trusting it.
        if cache_manager.has_content(output_cid):
            logger.info(f"create_run: Cache hit for run_id={run_id[:16]}... output={output_cid[:16]}...")
            return RunStatus(
                run_id=run_id,
                status="completed",
                recipe=request.recipe,
                inputs=request.inputs,
                output_name=output_name,
                created_at=cached_run.get("created_at", datetime.now(timezone.utc).isoformat()),
                completed_at=cached_run.get("created_at", datetime.now(timezone.utc).isoformat()),
                output_cid=output_cid,
                username=actor_id,
                provenance_cid=cached_run.get("provenance_cid"),
            )
        else:
            # No f-string needed here: the message has no placeholders.
            logger.info("create_run: Cache entry exists but output missing, will re-run")

    # --- 2. Check L2 if not in L1 ---
    l2_server = ctx.l2_server
    try:
        # requests is synchronous; run it off the event loop (matching the
        # asyncio.to_thread pattern used elsewhere in this module) so one
        # slow L2 lookup cannot stall every other request being served.
        l2_resp = await asyncio.to_thread(
            http_requests.get,
            f"{l2_server}/assets/by-run-id/{run_id}",
            timeout=10,
        )
        if l2_resp.status_code == 200:
            l2_data = l2_resp.json()
            output_cid = l2_data.get("output_cid")
            ipfs_cid = l2_data.get("ipfs_cid")
            if output_cid and ipfs_cid:
                logger.info(f"create_run: Found on L2, pulling from IPFS: {ipfs_cid}")
                # Pull from IPFS into the L1 cache's legacy area.
                import ipfs_client
                legacy_dir = CACHE_DIR / "legacy"
                legacy_dir.mkdir(parents=True, exist_ok=True)
                recovery_path = legacy_dir / output_cid
                if ipfs_client.get_file(ipfs_cid, str(recovery_path)):
                    # File retrieved - put() updates indexes, but the file is
                    # already in its legacy location, so update the content
                    # and IPFS indexes directly instead.
                    cache_manager._set_content_index(output_cid, output_cid)
                    cache_manager._set_ipfs_index(output_cid, ipfs_cid)
                    # Record in the run cache so the next call is an L1 hit.
                    await database.save_run_cache(
                        run_id=run_id,
                        output_cid=output_cid,
                        recipe=request.recipe,
                        inputs=request.inputs,
                        ipfs_cid=ipfs_cid,
                        provenance_cid=l2_data.get("provenance_cid"),
                        actor_id=actor_id,
                    )
                    logger.info(f"create_run: Recovered from L2/IPFS: {output_cid[:16]}...")
                    return RunStatus(
                        run_id=run_id,
                        status="completed",
                        recipe=request.recipe,
                        inputs=request.inputs,
                        output_name=output_name,
                        created_at=datetime.now(timezone.utc).isoformat(),
                        completed_at=datetime.now(timezone.utc).isoformat(),
                        output_cid=output_cid,
                        username=actor_id,
                        provenance_cid=l2_data.get("provenance_cid"),
                    )
    except Exception as e:
        # L2 is best-effort: any failure just falls through to Celery.
        logger.warning(f"create_run: L2 lookup failed (will run Celery): {e}")

    # --- 3. Not cached anywhere: create run record and submit to Celery ---
    run = RunStatus(
        run_id=run_id,
        status="pending",
        recipe=request.recipe,
        inputs=request.inputs,
        output_name=output_name,
        created_at=datetime.now(timezone.utc).isoformat(),
        username=actor_id
    )

    # Submit to Celery
    if request.use_dag or request.recipe == "dag":
        # DAG mode - use artdag engine
        if request.dag_json:
            # Custom DAG provided by the caller
            dag_json = request.dag_json
        else:
            # Build simple effect DAG from recipe and inputs
            dag = build_effect_dag(request.inputs, request.recipe)
            dag_json = dag.to_json()

        task = execute_dag.delay(dag_json, run.run_id)
    else:
        # Legacy mode - single effect, exactly one input allowed.
        if len(request.inputs) != 1:
            raise HTTPException(400, "Legacy mode only supports single-input recipes. Use use_dag=true for multi-input.")

        input_hash = request.inputs[0]
        task = render_effect.delay(input_hash, request.recipe, output_name)

    run.celery_task_id = task.id
    run.status = "running"

    await asyncio.to_thread(save_run, run)
    return run
-
-
def _check_celery_task_sync(task_id: str) -> tuple[bool, bool, Optional[dict], Optional[str]]:
    """Check Celery task status synchronously.

    Returns an (is_ready, is_successful, result, error) tuple so that async
    callers can poll the task via asyncio.to_thread without blocking.
    """
    async_result = celery_app.AsyncResult(task_id)
    if not async_result.ready():
        # Still queued or executing: nothing else to report yet.
        return (False, False, None, None)
    if async_result.successful():
        return (True, True, async_result.result, None)
    # Failed: the result attribute holds the raised exception; stringify it.
    return (True, False, None, str(async_result.result))
-
-
@app.get("/runs/{run_id}", response_model=RunStatus)
async def get_run(run_id: str):
    """Get status of a run.

    If the run is still marked "running", polls its Celery task and, on
    completion, folds the task result into the RunStatus (output CIDs,
    provenance, step results), caches the output, and persists the updated
    run. Each stage's duration is logged for latency debugging.
    """
    start = time.time()
    logger.info(f"get_run: Starting for {run_id}")

    t0 = time.time()
    # load_run is synchronous storage access; keep it off the event loop.
    run = await asyncio.to_thread(load_run, run_id)
    logger.info(f"get_run: load_run took {time.time()-t0:.3f}s, status={run.status if run else 'None'}")

    if not run:
        raise HTTPException(404, f"Run {run_id} not found")

    # Check Celery task status if running
    if run.status == "running" and run.celery_task_id:
        t0 = time.time()
        is_ready, is_successful, result, error = await asyncio.to_thread(
            _check_celery_task_sync, run.celery_task_id
        )
        logger.info(f"get_run: Celery check took {time.time()-t0:.3f}s, ready={is_ready}")

        if is_ready:
            if is_successful:
                run.status = "completed"
                run.completed_at = datetime.now(timezone.utc).isoformat()

                # Handle both legacy (render_effect) and new (execute_dag/run_plan) result formats
                # NOTE(review): this branch and the elif below both test
                # `"output_cid" in result`, so the elif's first condition can
                # never fire - only "output_cache_id" reaches it. The first
                # branch was presumably meant to be gated on an IPFS-primary
                # indicator instead; confirm against the task result schema.
                if "output_cid" in result:
                    # IPFS-primary mode: everything on IPFS
                    run.output_ipfs_cid = result.get("output_cid")
                    run.plan_id = result.get("plan_id")
                    # Store step CIDs for UI
                    run.step_results = {
                        step_id: {"cid": cid, "status": "completed"}
                        for step_id, cid in result.get("step_cids", {}).items()
                    }
                    # Try to get cid from cache_id mapping in Redis
                    # (cache_id is often the same as cid)
                    output_path = None
                elif "output_cid" in result or "output_cache_id" in result:
                    # New DAG/plan result format
                    run.output_cid = result.get("output_cid") or result.get("output_cache_id")
                    run.provenance_cid = result.get("provenance_cid")
                    output_path = Path(result.get("output_path", "")) if result.get("output_path") else None
                    # Store plan execution data
                    run.plan_id = result.get("plan_id")
                    run.plan_name = result.get("plan_name")
                    run.step_results = result.get("results")  # step_id -> result dict
                    run.all_outputs = result.get("outputs")  # All outputs from all steps
                elif "output" in result:
                    # Legacy render_effect format
                    run.output_cid = result.get("output", {}).get("cid")
                    run.provenance_cid = result.get("provenance_cid")
                    output_path = Path(result.get("output", {}).get("local_path", ""))

                    # Extract effects info from provenance (legacy only)
                    effects = result.get("effects", [])
                    if effects:
                        run.effects_commit = effects[0].get("repo_commit")
                        run.effect_url = effects[0].get("repo_url")

                    # Extract infrastructure info (legacy only)
                    run.infrastructure = result.get("infrastructure")

                # Cache the output (legacy mode - DAG/plan already caches via cache_manager)
                # NOTE(review): if the task result matches none of the three
                # branches above, output_path is never bound and the check
                # below raises NameError - confirm every result format sets
                # at least one of the keys handled above.
                is_plan_result = "output_cid" in result or "output_cache_id" in result
                if output_path and output_path.exists() and not is_plan_result:
                    t0 = time.time()
                    await cache_file(output_path, node_type="effect_output")
                    logger.info(f"get_run: cache_file took {time.time()-t0:.3f}s")

                # Record activity for deletion tracking (legacy mode)
                if run.output_cid and run.inputs:
                    await asyncio.to_thread(
                        cache_manager.record_simple_activity,
                        input_hashes=run.inputs,
                        output_cid=run.output_cid,
                        run_id=run.run_id,
                    )

                # Save to run cache for content-addressable lookup
                if run.output_cid:
                    ipfs_cid = cache_manager._get_ipfs_cid_from_index(run.output_cid)
                    await database.save_run_cache(
                        run_id=run.run_id,
                        output_cid=run.output_cid,
                        recipe=run.recipe,
                        inputs=run.inputs,
                        ipfs_cid=ipfs_cid,
                        provenance_cid=run.provenance_cid,
                        actor_id=run.username,
                    )
                    logger.info(f"get_run: Saved run cache for {run.run_id[:16]}...")
            else:
                run.status = "failed"
                run.error = error

            # Save updated status
            t0 = time.time()
            await asyncio.to_thread(save_run, run)
            logger.info(f"get_run: save_run took {time.time()-t0:.3f}s")

    logger.info(f"get_run: Total time {time.time()-start:.3f}s")
    return run
-
-
@app.delete("/runs/{run_id}")
async def discard_run(run_id: str, ctx: UserContext = Depends(get_required_user_context)):
    """
    Discard (delete) a run and its outputs.

    Enforces deletion rules:
    - Cannot discard if output is published to L2 (pinned)
    - Deletes outputs and intermediate cache entries
    - Preserves inputs (cache items and recipes are NOT deleted)
    """
    run = await asyncio.to_thread(load_run, run_id)
    if not run:
        raise HTTPException(404, f"Run {run_id} not found")

    # Only the owner (matched by username or actor id) may discard.
    if run.username not in (ctx.username, ctx.actor_id):
        raise HTTPException(403, "Access denied")

    # Failed runs have no output worth protecting; any other run must not
    # have a pinned (published) output. Inputs are preserved either way.
    if run.status != "failed" and run.output_cid:
        meta = await database.load_item_metadata(run.output_cid, ctx.actor_id)
        if meta.get("pinned"):
            pin_reason = meta.get("pin_reason", "published")
            raise HTTPException(400, f"Cannot discard run: output {run.output_cid[:16]}... is pinned ({pin_reason})")

    # If an activity record exists, discard its outputs only (inputs kept).
    activity = await asyncio.to_thread(cache_manager.get_activity, run_id)
    if activity:
        success, msg = await asyncio.to_thread(cache_manager.discard_activity_outputs_only, run_id)
        if not success:
            raise HTTPException(400, f"Cannot discard run: {msg}")

    # Finally drop the run record itself from Redis.
    await asyncio.to_thread(redis_client.delete, f"{RUNS_KEY_PREFIX}{run_id}")

    return {"discarded": True, "run_id": run_id}
-
-
-@app.delete("/ui/runs/{run_id}/discard", response_class=HTMLResponse)
-async def ui_discard_run(run_id: str, request: Request):
- """HTMX handler: discard a run. Only deletes outputs, preserves inputs."""
- ctx = await get_user_context_from_cookie(request)
- if not ctx:
- return '
Login required
'
-
- run = await asyncio.to_thread(load_run, run_id)
- if not run:
- return '
Run not found
'
-
- # Check ownership
- if run.username not in (ctx.username, ctx.actor_id):
- return '
Access denied
'
-
- # Failed runs can always be deleted
- if run.status != "failed":
- # Only check if output is pinned - inputs are preserved, not deleted
- if run.output_cid:
- meta = await database.load_item_metadata(run.output_cid, ctx.actor_id)
- if meta.get("pinned"):
- pin_reason = meta.get("pin_reason", "published")
- return f'
Cannot discard: output is pinned ({pin_reason})
'
-
- # Check if activity exists for this run
- activity = await asyncio.to_thread(cache_manager.get_activity, run_id)
-
- if activity:
- # Discard the activity - only delete outputs, preserve inputs
- success, msg = await asyncio.to_thread(cache_manager.discard_activity_outputs_only, run_id)
- if not success:
- return f'
- '''
-
-
-@app.get("/run/{run_id}")
-async def run_detail(run_id: str, request: Request):
- """Run detail. HTML for browsers, JSON for APIs."""
- run = await asyncio.to_thread(load_run, run_id)
- if not run:
- if wants_html(request):
- content = f'
Run not found: {run_id}
'
- return HTMLResponse(render_page("Not Found", content, None, active_tab="runs"), status_code=404)
- raise HTTPException(404, f"Run {run_id} not found")
-
- # Check Celery task status if running
- if run.status == "running" and run.celery_task_id:
- is_ready, is_successful, result, error = await asyncio.to_thread(
- _check_celery_task_sync, run.celery_task_id
- )
- if is_ready:
- if is_successful:
- run.status = "completed"
- run.completed_at = datetime.now(timezone.utc).isoformat()
- run.output_cid = result.get("output", {}).get("cid")
- effects = result.get("effects", [])
- if effects:
- run.effects_commit = effects[0].get("repo_commit")
- run.effect_url = effects[0].get("repo_url")
- run.infrastructure = result.get("infrastructure")
- output_path = Path(result.get("output", {}).get("local_path", ""))
- if output_path.exists():
- await cache_file(output_path)
- # Save to run cache for content-addressable lookup
- if run.output_cid:
- ipfs_cid = cache_manager._get_ipfs_cid_from_index(run.output_cid)
- await database.save_run_cache(
- run_id=run.run_id,
- output_cid=run.output_cid,
- recipe=run.recipe,
- inputs=run.inputs,
- ipfs_cid=ipfs_cid,
- provenance_cid=run.provenance_cid,
- actor_id=run.username,
- )
- else:
- run.status = "failed"
- run.error = error
- await asyncio.to_thread(save_run, run)
-
- if wants_html(request):
- ctx = await get_user_context_from_cookie(request)
- if not ctx:
- content = '
Not logged in.
'
- return HTMLResponse(render_page("Login Required", content, None, active_tab="runs"), status_code=401)
-
- # Check user owns this run
- if run.username not in (ctx.username, ctx.actor_id):
- content = '
Access denied.
'
- return HTMLResponse(render_page("Access Denied", content, ctx.actor_id, active_tab="runs"), status_code=403)
-
- # Build effect URL
- if run.effect_url:
- effect_url = run.effect_url
- elif run.effects_commit and run.effects_commit != "unknown":
- effect_url = f"https://git.rose-ash.com/art-dag/effects/src/commit/{run.effects_commit}/{run.recipe}"
- else:
- effect_url = f"https://git.rose-ash.com/art-dag/effects/src/branch/main/{run.recipe}"
-
- # Status badge colors
- status_colors = {
- "completed": "bg-green-600 text-white",
- "running": "bg-yellow-600 text-white",
- "failed": "bg-red-600 text-white",
- "pending": "bg-gray-600 text-white"
- }
- status_badge = status_colors.get(run.status, "bg-gray-600 text-white")
-
- # Try to get input names from recipe
- input_names = {}
- recipe_name = run.recipe.replace("recipe:", "") if run.recipe.startswith("recipe:") else run.recipe
- for recipe in list_all_recipes():
- if recipe.name == recipe_name:
- # Match variable inputs first, then fixed inputs
- for i, var_input in enumerate(recipe.variable_inputs):
- if i < len(run.inputs):
- input_names[run.inputs[i]] = var_input.name
- # Fixed inputs follow variable inputs
- offset = len(recipe.variable_inputs)
- for i, fixed_input in enumerate(recipe.fixed_inputs):
- idx = offset + i
- if idx < len(run.inputs):
- input_names[run.inputs[idx]] = fixed_input.asset
- break
-
- # Build media HTML for inputs and output
- media_html = ""
- available_inputs = [inp for inp in run.inputs if cache_manager.has_content(inp)]
- has_output = run.status == "completed" and run.output_cid and cache_manager.has_content(run.output_cid)
- has_ipfs_output = run.status == "completed" and run.output_ipfs_cid and not has_output
-
- if available_inputs or has_output or has_ipfs_output:
- # Flexible grid - more columns for more items
- num_items = len(available_inputs) + (1 if has_output else 0)
- grid_cols = min(num_items, 3) # Max 3 columns
- media_html = f'
- '''
-
- return HTMLResponse(render_page(f"Run: {run.recipe}", content, ctx.actor_id, active_tab="runs"))
-
- # JSON response
- return run.model_dump()
-
-
# Plan/Analysis cache directories (match tasks/orchestrate.py)
# Both directories hold one <id>.json file per cached plan / analysis result;
# load_plan_for_run and the analysis page below read from them.
PLAN_CACHE_DIR = CACHE_DIR / 'plans'
ANALYSIS_CACHE_DIR = CACHE_DIR / 'analysis'
-
-
def load_plan_for_run(run: RunStatus) -> Optional[dict]:
    """Load plan data for a run, trying plan_id first, then matching by inputs.

    Lookup order:
      1. the plan file named by run.plan_id,
      2. any cached plan whose input hashes equal the run's inputs,
      3. a plan_json blob embedded in run.step_results (IPFS_PRIMARY mode).
    Returns None when every strategy fails.
    """
    PLAN_CACHE_DIR.mkdir(parents=True, exist_ok=True)

    logger.info(f"[load_plan] run_id={run.run_id[:16]}, plan_id={run.plan_id}, inputs={run.inputs}")
    logger.info(f"[load_plan] PLAN_CACHE_DIR={PLAN_CACHE_DIR}")

    # 1. Direct lookup by plan_id.
    if run.plan_id:
        plan_file = PLAN_CACHE_DIR / f"{run.plan_id}.json"
        logger.info(f"[load_plan] Trying plan_id file: {plan_file}, exists={plan_file.exists()}")
        if plan_file.exists():
            try:
                with open(plan_file) as fh:
                    return json.load(fh)
            except (json.JSONDecodeError, IOError) as e:
                logger.warning(f"[load_plan] Failed to load plan file: {e}")

    plan_files = list(PLAN_CACHE_DIR.glob("*.json"))
    logger.info(f"[load_plan] Available plan files: {len(plan_files)}")

    # 2. Scan every cached plan for one whose input set matches this run.
    for plan_file in plan_files:
        try:
            with open(plan_file) as fh:
                data = json.load(fh)
        except (json.JSONDecodeError, IOError):
            # Unreadable/corrupt plan files are simply skipped.
            continue
        plan_inputs = data.get("input_hashes", {})
        if run.inputs and set(plan_inputs.values()) == set(run.inputs):
            logger.info(f"[load_plan] Found matching plan by inputs: {plan_file}")
            return data

    # 3. IPFS_PRIMARY mode may embed the plan JSON inside a step result.
    if run.step_results:
        logger.info(f"[load_plan] Checking step_results for embedded plan, keys={list(run.step_results.keys())[:5]}")
        for step_id, result in run.step_results.items():
            if isinstance(result, dict) and "plan_json" in result:
                logger.info(f"[load_plan] Found embedded plan_json in step {step_id}")
                try:
                    return json.loads(result["plan_json"])
                except (json.JSONDecodeError, TypeError):
                    pass

    logger.warning(f"[load_plan] No plan found for run {run.run_id[:16]}")
    return None
-
-
async def load_plan_for_run_with_fallback(run: RunStatus) -> Optional[dict]:
    """Load plan data for a run, with fallback to generate from recipe."""
    # Prefer any cached/embedded plan.
    cached_plan = load_plan_for_run(run)
    if cached_plan:
        return cached_plan

    # Fallback: locate the run's recipe and regenerate the plan from its YAML.
    recipe_name = run.recipe.replace("recipe:", "") if run.recipe.startswith("recipe:") else run.recipe
    recipe_status = next((r for r in list_all_recipes() if r.name == recipe_name), None)

    if recipe_status:
        recipe_path = cache_manager.get_by_cid(recipe_status.recipe_id)
        if recipe_path and recipe_path.exists():
            try:
                recipe_yaml = recipe_path.read_text()
                # Map recipe variable-input node ids onto the run's inputs
                # (positional: variable inputs come first in run.inputs).
                input_hashes = {
                    var_input.node_id: run.inputs[i]
                    for i, var_input in enumerate(recipe_status.variable_inputs)
                    if i < len(run.inputs)
                }

                # Try to generate plan
                try:
                    from tasks.orchestrate import generate_plan as gen_plan_task
                    plan_result = gen_plan_task(recipe_yaml, input_hashes)
                    if plan_result and plan_result.get("status") == "planned":
                        return plan_result
                except ImportError:
                    # Orchestration module unavailable in this deployment.
                    pass
            except Exception as e:
                logger.warning(f"Failed to generate plan for run {run.run_id}: {e}")

    return None
-
-
-@app.get("/run/{run_id}/plan/node/{cache_id}", response_class=HTMLResponse)
-async def run_plan_node_detail(run_id: str, cache_id: str, request: Request):
- """HTMX partial: Get node detail HTML fragment by cache_id."""
- ctx = await get_user_context_from_cookie(request)
- if not ctx:
- return HTMLResponse('
Login required
', status_code=401)
-
- run = await asyncio.to_thread(load_run, run_id)
- if not run:
- return HTMLResponse(f'
Run not found
', status_code=404)
-
- # Load plan data (with fallback to generate from recipe)
- plan_data = await load_plan_for_run_with_fallback(run)
-
- if not plan_data:
- return HTMLResponse('
Plan not found
')
-
- # Build a lookup from cache_id to step and step_id to step
- steps_by_cache_id = {}
- steps_by_step_id = {}
- for s in plan_data.get("steps", []):
- if s.get("cache_id"):
- steps_by_cache_id[s["cache_id"]] = s
- if s.get("step_id"):
- steps_by_step_id[s["step_id"]] = s
-
- # Find the step by cache_id
- step = steps_by_cache_id.get(cache_id)
- if not step:
- return HTMLResponse(f'
Step with cache {cache_id[:16]}... not found
')
-
- # Get step info
- step_id = step.get("step_id", "")
- step_name = step.get("name", step_id[:20])
- node_type = step.get("node_type", "EFFECT")
- config = step.get("config", {})
- level = step.get("level", 0)
- input_steps = step.get("input_steps", [])
- input_hashes = step.get("input_hashes", {})
-
- # Check for IPFS CID
- step_cid = None
- if run.step_results:
- res = run.step_results.get(step_id)
- if isinstance(res, dict) and res.get("cid"):
- step_cid = res["cid"]
-
- has_cached = cache_manager.has_content(cache_id) if cache_id else False
- color = NODE_COLORS.get(node_type, NODE_COLORS["default"])
-
- # Build INPUT media previews - show each input's cached content
- inputs_html = ""
- if input_steps:
- input_items = ""
- for inp_step_id in input_steps:
- inp_step = steps_by_step_id.get(inp_step_id)
- if inp_step:
- inp_cache_id = inp_step.get("cache_id", "")
- inp_name = inp_step.get("name", inp_step_id[:12])
- inp_has_cached = cache_manager.has_content(inp_cache_id) if inp_cache_id else False
-
- # Build preview thumbnail for input
- inp_preview = ""
- if inp_has_cached and inp_cache_id:
- inp_media_type = detect_media_type(get_cache_path(inp_cache_id))
- if inp_media_type == "video":
- inp_preview = f''
- elif inp_media_type == "image":
- inp_preview = f''
- else:
- inp_preview = f'
'''
-
- # Build output link
- output_link = ""
- if step_cid:
- output_link = f'''
-
- {step_cid[:24]}...
- View
- '''
- elif has_cached and cache_id:
- output_link = f'''
-
- {cache_id[:24]}...
- View
- '''
-
- # Config/parameters display
- config_html = ""
- if config:
- config_items = ""
- for key, value in config.items():
- if isinstance(value, (dict, list)):
- value_str = json.dumps(value)
- else:
- value_str = str(value)
- config_items += f'
{key}:{value_str[:30]}
'
- config_html = f'''
-
-
Parameters
-
{config_items}
-
'''
-
- status = "cached" if (has_cached or step_cid) else ("completed" if run.status == "completed" else "pending")
- status_color = "green" if status in ("cached", "completed") else "yellow"
-
- return HTMLResponse(f'''
-
- ''')
-
-
-@app.get("/run/{run_id}/plan", response_class=HTMLResponse)
-async def run_plan_visualization(run_id: str, request: Request, node: Optional[str] = None):
- """Visualize execution plan as interactive DAG."""
- ctx = await get_user_context_from_cookie(request)
- if not ctx:
- content = '
Not logged in.
'
- return HTMLResponse(render_page("Login Required", content, None, active_tab="runs"), status_code=401)
-
- run = await asyncio.to_thread(load_run, run_id)
- if not run:
- content = f'
Run not found: {run_id}
'
- return HTMLResponse(render_page("Not Found", content, ctx.actor_id, active_tab="runs"), status_code=404)
-
- # Check user owns this run
- if run.username not in (ctx.username, ctx.actor_id):
- content = '
Access denied.
'
- return HTMLResponse(render_page("Access Denied", content, ctx.actor_id, active_tab="runs"), status_code=403)
-
- # Load plan data (with fallback to generate from recipe)
- plan_data = await load_plan_for_run_with_fallback(run)
-
- # Build sub-navigation tabs
- tabs_html = render_run_sub_tabs(run_id, active="plan")
-
- if not plan_data:
- # Show a simpler visualization based on the run's recipe structure
- content = f'''
-
-
- Back to runs
-
-
- {tabs_html}
-
-
-
Execution Plan
-
Could not generate execution plan for this run.
-
This may be a legacy effect-based run without a recipe, or the recipe is no longer available.
-
- '''
- return HTMLResponse(render_page_with_cytoscape(f"Plan: {run_id[:16]}...", content, ctx.actor_id, active_tab="runs"))
-
- # Build Cytoscape nodes and edges from plan
- nodes = []
- edges = []
- steps = plan_data.get("steps", [])
-
- for step in steps:
- node_type = step.get("node_type", "EFFECT")
- color = NODE_COLORS.get(node_type, NODE_COLORS["default"])
- step_id = step.get("step_id", "")
- cache_id = step.get("cache_id", "")
-
- # Check if this step's output exists in cache (completed)
- # For completed runs, check the actual cache
- has_cached = cache_manager.has_content(cache_id) if cache_id else False
-
- if has_cached:
- status = "cached"
- elif run.status == "completed":
- # Run completed but this step not in cache - still mark as done
- status = "cached"
- elif run.status == "running":
- status = "running"
- else:
- status = "pending"
-
- # Use human-readable name if available, otherwise short step_id
- step_name = step.get("name", "")
- if step_name:
- # Use last part of dotted name for label
- label_parts = step_name.split(".")
- label = label_parts[-1] if label_parts else step_name
- else:
- label = step_id[:12] + "..." if len(step_id) > 12 else step_id
-
- nodes.append({
- "data": {
- "id": step_id,
- "label": label,
- "name": step_name,
- "nodeType": node_type,
- "level": step.get("level", 0),
- "cacheId": cache_id,
- "status": status,
- "color": color,
- "config": step.get("config"),
- "hasCached": has_cached,
- }
- })
-
- # Build edges from the full plan JSON if available
- if "plan_json" in plan_data:
- try:
- full_plan = json.loads(plan_data["plan_json"])
- for step in full_plan.get("steps", []):
- step_id = step.get("step_id", "")
- for input_step in step.get("input_steps", []):
- edges.append({
- "data": {
- "source": input_step,
- "target": step_id
- }
- })
- except json.JSONDecodeError:
- pass
- else:
- # Build edges directly from steps
- for step in steps:
- step_id = step.get("step_id", "")
- for input_step in step.get("input_steps", []):
- edges.append({
- "data": {
- "source": input_step,
- "target": step_id
- }
- })
-
- nodes_json = json.dumps(nodes)
- edges_json = json.dumps(edges)
-
- dag_html = render_dag_cytoscape(nodes_json, edges_json, run_id=run_id, initial_node=node or "")
-
- # Stats summary - count from built nodes to reflect actual execution status
- total = len(nodes)
- cached_count = sum(1 for n in nodes if n["data"]["status"] == "cached")
- completed_count = sum(1 for n in nodes if n["data"]["status"] == "completed")
- running_count = sum(1 for n in nodes if n["data"]["status"] == "running")
- pending_count = total - cached_count - completed_count - running_count
-
- # Plan name for display
- plan_name = plan_data.get("recipe", run.recipe.replace("recipe:", ""))
-
- content = f'''
-
-
- Back to runs
-
-
- {tabs_html}
-
-
- '''
-
- # Add collapsible Plan JSON section
- # Parse nested plan_json if present (it's double-encoded as a string)
- display_plan = plan_data.copy()
- if "plan_json" in display_plan and isinstance(display_plan["plan_json"], str):
- try:
- display_plan["plan_json"] = json.loads(display_plan["plan_json"])
- except json.JSONDecodeError:
- pass
- plan_json_str = json.dumps(display_plan, indent=2)
- # Escape HTML entities in JSON
- plan_json_str = plan_json_str.replace("&", "&").replace("<", "<").replace(">", ">")
- content += f'''
-
-
-
- Show Plan JSON
-
-
-
{plan_json_str}
-
-
-
- '''
-
- return HTMLResponse(render_page_with_cytoscape(f"Plan: {run_id[:16]}...", content, ctx.actor_id, active_tab="runs"))
-
-
-@app.get("/run/{run_id}/analysis", response_class=HTMLResponse)
-async def run_analysis_page(run_id: str, request: Request):
- """Show analysis results for run inputs."""
- ctx = await get_user_context_from_cookie(request)
- if not ctx:
- content = '
Not logged in.
'
- return HTMLResponse(render_page("Login Required", content, None, active_tab="runs"), status_code=401)
-
- run = await asyncio.to_thread(load_run, run_id)
- if not run:
- content = f'
Run not found: {run_id}
'
- return HTMLResponse(render_page("Not Found", content, ctx.actor_id, active_tab="runs"), status_code=404)
-
- # Check user owns this run
- if run.username not in (ctx.username, ctx.actor_id):
- content = '
Access denied.
'
- return HTMLResponse(render_page("Access Denied", content, ctx.actor_id, active_tab="runs"), status_code=403)
-
- tabs_html = render_run_sub_tabs(run_id, active="analysis")
-
- # Load analysis results for each input
- analysis_html = ""
- ANALYSIS_CACHE_DIR.mkdir(parents=True, exist_ok=True)
-
- for i, input_hash in enumerate(run.inputs):
- analysis_path = ANALYSIS_CACHE_DIR / f"{input_hash}.json"
- analysis_data = None
-
- if analysis_path.exists():
- try:
- with open(analysis_path) as f:
- analysis_data = json.load(f)
- except (json.JSONDecodeError, IOError):
- pass
-
- input_name = f"Input {i + 1}"
-
- if analysis_data:
- tempo = analysis_data.get("tempo", "N/A")
- if isinstance(tempo, float):
- tempo = f"{tempo:.1f}"
- beat_times = analysis_data.get("beat_times", [])
- beat_count = len(beat_times)
- energy = analysis_data.get("energy")
-
- # Beat visualization (simple bar chart showing beat positions)
- beat_bars = ""
- if beat_times and len(beat_times) > 0:
- # Show first 50 beats as vertical bars
- display_beats = beat_times[:50]
- max_time = max(display_beats) if display_beats else 1
- for bt in display_beats:
- # Normalize to percentage
- pos = (bt / max_time) * 100 if max_time > 0 else 0
- beat_bars += f''
-
- energy_bar = ""
- if energy is not None:
- try:
- energy_pct = min(float(energy) * 100, 100)
- energy_bar = f'''
-
- {analysis_html}
- '''
-
- return HTMLResponse(render_page(f"Analysis: {run_id[:16]}...", content, ctx.actor_id, active_tab="runs"))
-
-
-@app.get("/run/{run_id}/artifacts", response_class=HTMLResponse)
-async def run_artifacts_page(run_id: str, request: Request):
- """Show all cached artifacts produced by this run."""
- ctx = await get_user_context_from_cookie(request)
- if not ctx:
- content = '
Not logged in.
'
- return HTMLResponse(render_page("Login Required", content, None, active_tab="runs"), status_code=401)
-
- run = await asyncio.to_thread(load_run, run_id)
- if not run:
- content = f'
Run not found: {run_id}
'
- return HTMLResponse(render_page("Not Found", content, ctx.actor_id, active_tab="runs"), status_code=404)
-
- # Check user owns this run
- if run.username not in (ctx.username, ctx.actor_id):
- content = '
- '''
-
- return HTMLResponse(render_page("Runs", content, ctx.actor_id, active_tab="runs"))
-
- # JSON response for APIs
- return {
- "runs": [r.model_dump() for r in runs_page],
- "pagination": {
- "page": page,
- "limit": limit,
- "total": total,
- "has_more": has_more
- }
- }
-
-
-# ============ Recipe Endpoints ============
-
@app.post("/recipes/upload")
async def upload_recipe(file: UploadFile = File(...), ctx: UserContext = Depends(get_required_user_context)):
    """Upload a recipe YAML file. Requires authentication."""
    import tempfile

    # Read the upload and insist on UTF-8 text.
    content = await file.read()
    try:
        yaml_content = content.decode('utf-8')
    except UnicodeDecodeError:
        raise HTTPException(400, "Recipe file must be valid UTF-8 text")

    # Reject payloads that are not even parseable YAML before caching anything.
    try:
        yaml.safe_load(yaml_content)
    except yaml.YAMLError as e:
        raise HTTPException(400, f"Invalid YAML: {e}")

    # Stage the raw bytes in a temp file, then move it into the content cache.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".yaml") as tmp:
        tmp.write(content)
        tmp_path = Path(tmp.name)

    cached, ipfs_cid = cache_manager.put(tmp_path, node_type="recipe", move=True)
    recipe_hash = cached.cid

    # Parse the recipe structure and persist its metadata under the uploader.
    actor_id = ctx.actor_id
    try:
        recipe_status = parse_recipe_yaml(yaml_content, recipe_hash, actor_id)
    except Exception as e:
        raise HTTPException(400, f"Failed to parse recipe: {e}")

    await asyncio.to_thread(save_recipe, recipe_status)

    # Save cache metadata to database
    await database.save_item_metadata(
        cid=recipe_hash,
        actor_id=actor_id,
        item_type="recipe",
        filename=file.filename,
        description=recipe_status.name  # Use recipe name as description
    )

    # Summary payload for the client; counts let the UI build input forms.
    return {
        "recipe_id": recipe_hash,
        "name": recipe_status.name,
        "version": recipe_status.version,
        "variable_inputs": len(recipe_status.variable_inputs),
        "fixed_inputs": len(recipe_status.fixed_inputs)
    }
-
-
-@app.get("/recipes")
-async def list_recipes_api(request: Request, page: int = 1, limit: int = 20):
- """List recipes. HTML for browsers, JSON for APIs."""
- ctx = await get_user_context_from_cookie(request)
-
- all_recipes = await asyncio.to_thread(list_all_recipes)
-
- if wants_html(request):
- # HTML response
- if not ctx:
- return HTMLResponse(render_page(
- "Recipes",
- '
Not logged in.
',
- None,
- active_tab="recipes"
- ))
-
- # Filter to user's recipes
- user_recipes = [c for c in all_recipes if c.uploader in (ctx.username, ctx.actor_id)]
- total = len(user_recipes)
-
- if not user_recipes:
- content = '''
-
Recipes (0)
-
No recipes yet. Upload a recipe YAML file to get started.
- '''
- return HTMLResponse(render_page("Recipes", content, ctx.actor_id, active_tab="recipes"))
-
- html_parts = []
- for recipe in user_recipes:
- var_count = len(recipe.variable_inputs)
- fixed_count = len(recipe.fixed_inputs)
- input_info = []
- if var_count:
- input_info.append(f"{var_count} variable")
- if fixed_count:
- input_info.append(f"{fixed_count} fixed")
- inputs_str = ", ".join(input_info) if input_info else "no inputs"
-
- html_parts.append(f'''
-
-
Click on a node to see its configuration. The purple-bordered node is the output.
-
- {dag_html}
-
- '''
-
- return HTMLResponse(render_page_with_cytoscape(f"DAG: {recipe.name}", content, ctx.actor_id if ctx else None, active_tab="recipes"))
-
-
-@app.post("/ui/recipes/{recipe_id}/run", response_class=HTMLResponse)
-async def ui_run_recipe(recipe_id: str, request: Request):
- """HTMX handler: run a recipe with form inputs using 3-phase orchestration."""
- ctx = await get_user_context_from_cookie(request)
- if not ctx:
- return '
Login required
'
-
- recipe = load_recipe(recipe_id)
- if not recipe:
- return '
Recipe not found
'
-
- # Parse form data
- form_data = await request.form()
- input_hashes = {}
- for var_input in recipe.variable_inputs:
- value = form_data.get(var_input.node_id, "").strip()
- if var_input.required and not value:
- return f'
Missing required input: {var_input.name}
'
- if value:
- input_hashes[var_input.node_id] = value
-
- # Load recipe YAML
- recipe_path = cache_manager.get_by_cid(recipe_id)
- if not recipe_path:
- return '
- '''
- except Exception as e:
- logger.error(f"Recipe run failed: {e}")
- return f'
Error: {str(e)}
'
-
-
-@app.get("/ui/recipes-list", response_class=HTMLResponse)
-async def ui_recipes_list(request: Request):
- """HTMX partial: list of recipes."""
- ctx = await get_user_context_from_cookie(request)
-
- if not ctx:
- return '
Not logged in.
'
-
- all_recipes = list_all_recipes()
-
- # Filter to user's recipes
- user_recipes = [c for c in all_recipes if c.uploader in (ctx.username, ctx.actor_id)]
-
- if not user_recipes:
- return '
No recipes yet. Upload a recipe YAML file to get started.
'
-
- html_parts = ['
']
- for recipe in user_recipes:
- var_count = len(recipe.variable_inputs)
- fixed_count = len(recipe.fixed_inputs)
- input_info = []
- if var_count:
- input_info.append(f"{var_count} variable")
- if fixed_count:
- input_info.append(f"{fixed_count} fixed")
- inputs_str = ", ".join(input_info) if input_info else "no inputs"
-
- html_parts.append(f'''
-
-
- '''
-
-
-@app.get("/cache/{cid}")
-async def get_cached(cid: str, request: Request):
- """Get cached content by hash. Content negotiation: HTML for browsers, JSON for APIs, file for downloads."""
- start = time.time()
- accept = request.headers.get("accept", "")
- logger.info(f"get_cached: {cid[:16]}... Accept={accept[:50]}")
-
- ctx = await get_user_context_from_cookie(request)
- cache_path = get_cache_path(cid)
-
- if not cache_path:
- logger.info(f"get_cached: Not found, took {time.time()-start:.3f}s")
- if wants_html(request):
- content = f'
Content not found: {cid}
'
- return HTMLResponse(render_page("Not Found", content, ctx.actor_id if ctx else None, active_tab="media"), status_code=404)
- raise HTTPException(404, f"Content {cid} not in cache")
-
- # JSON response only if explicitly requested
- if "application/json" in accept and "text/html" not in accept:
- t0 = time.time()
- meta = await database.load_item_metadata(cid, ctx.actor_id if ctx else None)
- logger.debug(f"get_cached: load_item_metadata took {time.time()-t0:.3f}s")
-
- t0 = time.time()
- cache_item = await database.get_cache_item(cid)
- logger.debug(f"get_cached: get_cache_item took {time.time()-t0:.3f}s")
-
- ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
- file_size = cache_path.stat().st_size
- # Use stored type from metadata, fall back to auto-detection
- stored_type = meta.get("type") if meta else None
- if stored_type == "recipe":
- media_type = "recipe"
- else:
- media_type = detect_media_type(cache_path)
- logger.info(f"get_cached: JSON response, ipfs_cid={ipfs_cid[:16] if ipfs_cid else 'None'}..., took {time.time()-start:.3f}s")
- return {
- "cid": cid,
- "size": file_size,
- "media_type": media_type,
- "ipfs_cid": ipfs_cid,
- "meta": meta
- }
-
- # HTML response for browsers (default for all non-JSON requests)
- # Raw data is only served from /cache/{hash}/raw endpoint
- if True: # Always show HTML page, raw data via /raw endpoint
- if not ctx:
- content = '
Not logged in.
'
- return HTMLResponse(render_page("Login Required", content, None, active_tab="media"), status_code=401)
-
- # Check user has access
- user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
- if cid not in user_hashes:
- content = '
'
-
- token = request.cookies.get("auth_token")
- if not token:
- return '
Auth token required
'
-
- # Check ownership
- user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
- if cid not in user_hashes:
- return '
Access denied
'
-
- # Parse form
- form = await request.form()
- asset_name = form.get("asset_name", "").strip()
- asset_type = form.get("asset_type", "image")
-
- if not asset_name:
- return '
Asset name required
'
-
- # Load metadata from database
- meta = await database.load_item_metadata(cid, ctx.actor_id)
- origin = meta.get("origin")
-
- if not origin or "type" not in origin:
- return '
- '''
-
- return HTMLResponse(render_page("Media", content, ctx.actor_id, active_tab="media"))
-
- # JSON response for APIs - list all hashes with optional pagination
- all_hashes = [cf.cid for cf in cache_manager.list_all()]
- total = len(all_hashes)
- start = (page - 1) * limit
- end = start + limit
- hashes_page = all_hashes[start:end]
- has_more = end < total
-
- return {
- "hashes": hashes_page,
- "pagination": {
- "page": page,
- "limit": limit,
- "total": total,
- "has_more": has_more
- }
- }
-
-
@app.delete("/cache/{cid}")
async def discard_cache(cid: str, ctx: UserContext = Depends(get_required_user_context)):
    """
    Discard (delete) a cached item.

    Enforces deletion rules:
    - Cannot delete items published to L2 (shared)
    - Cannot delete inputs/outputs of activities (runs)
    - Cannot delete pinned items

    Raises:
        HTTPException(404): content is not in the cache.
        HTTPException(403): caller does not own the item.
        HTTPException(400): item is pinned, used by a run, or blocked by
            cache_manager's deletion rules.
    """
    # Check if content exists
    if not cache_manager.has_content(cid):
        raise HTTPException(404, "Content not found")

    # Check ownership
    user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
    if cid not in user_hashes:
        raise HTTPException(403, "Access denied")

    # Check if pinned
    # NOTE(review): assumes load_item_metadata always returns a dict (never
    # None) — confirm; meta.get would raise AttributeError otherwise.
    meta = await database.load_item_metadata(cid, ctx.actor_id)
    if meta.get("pinned"):
        pin_reason = meta.get("pin_reason", "unknown")
        raise HTTPException(400, f"Cannot discard pinned item (reason: {pin_reason})")

    # Check if used by any run (Redis runs, not just activity store)
    runs_using = await asyncio.to_thread(find_runs_using_content, cid)
    if runs_using:
        run, role = runs_using[0]
        raise HTTPException(400, f"Cannot discard: item is {role} of run {run.run_id}")

    # Check deletion rules via cache_manager (L2 shared status, activity store)
    can_delete, reason = await asyncio.to_thread(cache_manager.can_delete, cid)
    if not can_delete:
        raise HTTPException(400, f"Cannot discard: {reason}")

    # Delete via cache_manager
    success, msg = await asyncio.to_thread(cache_manager.delete_by_cid, cid)
    if not success:
        # Fallback to legacy deletion: remove the raw cache file directly.
        cache_path = get_cache_path(cid)
        if cache_path and cache_path.exists():
            cache_path.unlink()

        # Clean up legacy metadata files (sidecar .meta.json and .mp4 preview)
        meta_path = CACHE_DIR / f"{cid}.meta.json"
        if meta_path.exists():
            meta_path.unlink()
        mp4_path = CACHE_DIR / f"{cid}.mp4"
        if mp4_path.exists():
            mp4_path.unlink()

    return {"discarded": True, "cid": cid}
-
-
-@app.delete("/ui/cache/{cid}/discard", response_class=HTMLResponse)
-async def ui_discard_cache(cid: str, request: Request):
- """HTMX handler: discard a cached item."""
- ctx = await get_user_context_from_cookie(request)
- if not ctx:
- return '
Login required
'
-
- # Check ownership
- user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
- if cid not in user_hashes:
- return '
Access denied
'
-
- # Check if content exists
- has_content = await asyncio.to_thread(cache_manager.has_content, cid)
- if not has_content:
- return '
Content not found
'
-
- # Check if pinned
- meta = await database.load_item_metadata(cid, ctx.actor_id)
- if meta.get("pinned"):
- pin_reason = meta.get("pin_reason", "unknown")
- return f'
Cannot discard: item is pinned ({pin_reason})
'
-
- # Check if used by any run (Redis runs, not just activity store)
- runs_using = await asyncio.to_thread(find_runs_using_content, cid)
- if runs_using:
- run, role = runs_using[0]
- return f'
Cannot discard: item is {role} of run {run.run_id}
'
-
- # Check deletion rules via cache_manager (L2 shared status, activity store)
- can_delete, reason = await asyncio.to_thread(cache_manager.can_delete, cid)
- if not can_delete:
- return f'
Cannot discard: {reason}
'
-
- # Delete via cache_manager
- success, msg = await asyncio.to_thread(cache_manager.delete_by_cid, cid)
- if not success:
- # Fallback to legacy deletion
- cache_path = get_cache_path(cid)
- if cache_path and cache_path.exists():
- cache_path.unlink()
-
- # Clean up legacy metadata files
- meta_path = CACHE_DIR / f"{cid}.meta.json"
- if meta_path.exists():
- meta_path.unlink()
- mp4_path = CACHE_DIR / f"{cid}.mp4"
- if mp4_path.exists():
- mp4_path.unlink()
-
- return '''
-
-
-
-"""
-
-
-def render_page(title: str, content: str, actor_id: Optional[str] = None, active_tab: str = None) -> str:
- """Render a page with nav bar and content. Used for clean URL pages.
-
- actor_id: The user's actor ID (@user@server) or None if not logged in.
- """
- user_info = ""
- if actor_id:
- # Extract username and domain from @username@domain format
- parts = actor_id.lstrip("@").split("@")
- username = parts[0] if parts else actor_id
- domain = parts[1] if len(parts) > 1 else ""
- l2_user_url = f"https://{domain}/users/{username}" if domain else "#"
- user_info = f'''
-
-
-
-"""
-
-
-def render_ui_html(actor_id: Optional[str] = None, tab: str = "runs") -> str:
- """Render main UI HTML with optional user context.
-
- actor_id: The user's actor ID (@user@server) or None if not logged in.
- """
- user_info = ""
- if actor_id:
- # Extract username and domain from @username@domain format
- parts = actor_id.lstrip("@").split("@")
- username = parts[0] if parts else actor_id
- domain = parts[1] if len(parts) > 1 else ""
- l2_user_url = f"https://{domain}/users/{username}" if domain else "#"
- user_info = f'''
-
-
-
-"""
-
-
-# Auth - L1 doesn't handle login (user logs in at their L2 server)
-# Token can be passed via URL from L2 redirect, then L1 sets its own cookie
-
@app.get("/auth")
async def auth_callback(auth_token: str = None):
    """
    Receive auth token from L2 redirect and set local cookie.
    This enables cross-subdomain auth on iOS Safari which blocks shared cookies.
    """
    # A missing token and an unverifiable token both land back on home.
    if not auth_token:
        return RedirectResponse(url="/", status_code=302)

    user_ctx = await get_verified_user_context(auth_token)
    if not user_ctx:
        return RedirectResponse(url="/", status_code=302)

    # Remember the token under this username so it can be revoked later.
    register_user_token(user_ctx.username, auth_token)

    # Drop a first-party cookie, then send the user to the runs page.
    redirect = RedirectResponse(url="/runs", status_code=302)
    redirect.set_cookie(
        key="auth_token",
        value=auth_token,
        httponly=True,
        max_age=30 * 24 * 60 * 60,  # 30 days
        samesite="lax",
        secure=True,
    )
    return redirect
-
-
@app.get("/logout")
async def logout():
    """Logout - clear local cookie and redirect to home."""
    # Dropping the first-party auth cookie is all a local logout requires.
    redirect = RedirectResponse(url="/", status_code=302)
    redirect.delete_cookie("auth_token")
    return redirect
-
-
@app.post("/auth/revoke")
async def auth_revoke(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """
    Revoke a token. Called by L2 when user logs out.
    The token to revoke is passed in the Authorization header.
    """
    # Reject requests that carry no Authorization header at all.
    if not credentials:
        raise HTTPException(401, "No token provided")

    token = credentials.credentials

    # Only the holder of a currently-valid token may revoke it.
    if not get_user_context_from_token(token):
        raise HTTPException(401, "Invalid token")

    return {"revoked": True, "newly_revoked": revoke_token(token)}
-
-
class RevokeUserRequest(BaseModel):
    """Request body for POST /auth/revoke-user: revoke all tokens for a user."""
    username: str
    l2_server: str  # L2 server requesting the revocation
-
-
@app.post("/auth/revoke-user")
async def auth_revoke_user(request: RevokeUserRequest):
    """
    Revoke all tokens for a user. Called by L2 when user logs out.
    This handles the case where L2 issued scoped tokens that differ from L2's own token.
    """
    # Verify the L2 server is authorized (must be in L1's known list or match token's l2_server)
    # For now, we trust any request since this only affects users already on this L1
    revoked_count = revoke_all_user_tokens(request.username)

    return {
        "revoked": True,
        "tokens_revoked": revoked_count,
        "username": request.username,
    }
-
-
-@app.post("/ui/publish-run/{run_id}", response_class=HTMLResponse)
-async def ui_publish_run(run_id: str, request: Request):
- """Publish a run to L2 from the web UI. Assets are named by cid."""
- ctx = await get_user_context_from_cookie(request)
- if not ctx:
- return HTMLResponse('
Not logged in
')
-
- token = request.cookies.get("auth_token")
- if not token:
- return HTMLResponse('
Not logged in
')
-
- # Get the run to pin its output and inputs
- run = load_run(run_id)
- if not run:
- return HTMLResponse('
Run not found
')
-
- # Call L2 to publish the run, including this L1's public URL
- # Assets are named by their cid - no output_name needed
- l2_server = ctx.l2_server
- try:
- resp = http_requests.post(
- f"{l2_server}/assets/record-run",
- json={"run_id": run_id, "l1_server": L1_PUBLIC_URL},
- headers={"Authorization": f"Bearer {token}"},
- timeout=30
- )
- if resp.status_code == 400:
- error = resp.json().get("detail", "Bad request")
- return HTMLResponse(f'
Error: {error}
')
- resp.raise_for_status()
- result = resp.json()
-
- # Pin the output and record L2 share
- if run.output_cid and result.get("asset"):
- await database.update_item_metadata(run.output_cid, ctx.actor_id, pinned=True, pin_reason="published")
- # Record L2 share so UI shows published status
- cache_path = get_cache_path(run.output_cid)
- media_type = detect_media_type(cache_path) if cache_path else "image"
- content_type = "video" if media_type == "video" else "image"
- # Get activity_id for linking to the published run
- activity = result.get("activity")
- activity_id = activity.get("activity_id") if activity else None
- await database.save_l2_share(
- cid=run.output_cid,
- actor_id=ctx.actor_id,
- l2_server=l2_server,
- asset_name=result["asset"]["name"],
- content_type=content_type,
- activity_id=activity_id
- )
-
- # Pin the inputs (for provenance)
- for input_hash in run.inputs:
- await database.update_item_metadata(input_hash, ctx.actor_id, pinned=True, pin_reason="input_to_published")
-
- # If this was a recipe-based run, pin the recipe and its fixed inputs
- if run.recipe.startswith("recipe:"):
- config_name = run.recipe.replace("recipe:", "")
- for recipe in list_all_recipes():
- if recipe.name == config_name:
- # Pin the recipe YAML
- cache_manager.pin(recipe.recipe_id, reason="recipe_for_published")
- # Pin all fixed inputs referenced by the recipe
- for fixed in recipe.fixed_inputs:
- if fixed.cid:
- cache_manager.pin(fixed.cid, reason="fixed_input_in_published_recipe")
- break
-
- # Use HTTPS for L2 links
- l2_https = l2_server.replace("http://", "https://")
- asset_name = result["asset"]["name"]
- short_name = asset_name[:16] + "..." if len(asset_name) > 20 else asset_name
- # Link to activity (the published run) rather than just the asset
- activity = result.get("activity")
- activity_id = activity.get("activity_id") if activity else None
- l2_link = f"{l2_https}/activities/{activity_id}" if activity_id else f"{l2_https}/assets/{asset_name}"
- return HTMLResponse(f'''
-
')
- except Exception as e:
- return HTMLResponse(f'
Error: {e}
')
-
-
-@app.get("/ui/runs", response_class=HTMLResponse)
-async def ui_runs(request: Request):
- """HTMX partial: list of runs."""
- ctx = await get_user_context_from_cookie(request)
- runs = list_all_runs()
-
- # Require login to see runs
- if not ctx:
- return '
Not logged in.
'
-
- # Filter runs by user - match both plain username and ActivityPub format (@user@domain)
- runs = [r for r in runs if r.username in (ctx.username, ctx.actor_id)]
-
- if not runs:
- return '
']
-
- for run in runs[:20]: # Limit to 20 most recent
- status_badge = status_colors.get(run.status, "bg-gray-600 text-white")
-
- html_parts.append(f'''
-
-
')
- return '\n'.join(html_parts)
-
-
-@app.get("/ui/media-list", response_class=HTMLResponse)
-async def ui_media_list(
- request: Request,
- folder: Optional[str] = None,
- collection: Optional[str] = None,
- tag: Optional[str] = None
-):
- """HTMX partial: list of media items with optional filtering."""
- ctx = await get_user_context_from_cookie(request)
-
- # Require login to see media
- if not ctx:
- return '
Not logged in.
'
-
- # Get hashes owned by/associated with this user
- user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id)
-
- # Get cache items that belong to the user (from cache_manager)
- cache_items = []
- seen_hashes = set() # Deduplicate by cid
- for cached_file in cache_manager.list_all():
- cid = cached_file.cid
- if cid not in user_hashes:
- continue
-
- # Skip duplicates (same content from multiple runs)
- if cid in seen_hashes:
- continue
- seen_hashes.add(cid)
-
- # Skip recipes - they have their own section
- if cached_file.node_type == "recipe":
- continue
-
- # Load metadata for filtering
- meta = await database.load_item_metadata(cid, ctx.actor_id)
-
- # Apply folder filter
- if folder:
- item_folder = meta.get("folder", "/")
- if folder != "/" and not item_folder.startswith(folder):
- continue
- if folder == "/" and item_folder != "/":
- continue
-
- # Apply collection filter
- if collection:
- if collection not in meta.get("collections", []):
- continue
-
- # Apply tag filter
- if tag:
- if tag not in meta.get("tags", []):
- continue
-
- cache_items.append({
- "hash": cid,
- "size": cached_file.size_bytes,
- "mtime": cached_file.created_at,
- "meta": meta
- })
-
- # Sort by modification time (newest first)
- cache_items.sort(key=lambda x: x["mtime"], reverse=True)
-
- if not cache_items:
- filter_msg = ""
- if folder:
- filter_msg = f" in folder {folder}"
- elif collection:
- filter_msg = f" in collection '{collection}'"
- elif tag:
- filter_msg = f" with tag '{tag}'"
- return f'
No cached files{filter_msg}. Upload files or run effects to see them here.
'
- return html
-
-
-# ============ User Storage Configuration ============
-
# Display metadata for each supported storage provider type: human-readable
# name, a one-line description, and the UI accent color for its card.
# Keys are the canonical provider_type identifiers accepted by the
# /storage endpoints.
STORAGE_PROVIDERS_INFO = {
    "pinata": {"name": "Pinata", "desc": "1GB free, IPFS pinning", "color": "blue"},
    "web3storage": {"name": "web3.storage", "desc": "IPFS + Filecoin", "color": "green"},
    "nftstorage": {"name": "NFT.Storage", "desc": "Free for NFTs", "color": "pink"},
    "infura": {"name": "Infura IPFS", "desc": "5GB free", "color": "orange"},
    "filebase": {"name": "Filebase", "desc": "5GB free, S3+IPFS", "color": "cyan"},
    "storj": {"name": "Storj", "desc": "25GB free", "color": "indigo"},
    "local": {"name": "Local Storage", "desc": "Your own disk", "color": "purple"},
}
-
-
@app.get("/storage")
async def list_storage(request: Request):
    """List user's storage providers. HTML for browsers, JSON for API.

    Augments each provider record with usage stats and a masked copy of its
    config (API keys/tokens/secrets truncated) for safe display.

    Raises:
        HTTPException(401): JSON client without a valid session cookie.
    """
    accept = request.headers.get("accept", "")
    wants_json = "application/json" in accept and "text/html" not in accept

    ctx = await get_user_context_from_cookie(request)
    if not ctx:
        if wants_json:
            raise HTTPException(401, "Authentication required")
        return RedirectResponse(url="/auth", status_code=302)

    storages = await database.get_user_storage(ctx.actor_id)

    # Add usage stats to each storage
    for storage in storages:
        usage = await database.get_storage_usage(storage["id"])
        storage["used_bytes"] = usage["used_bytes"]
        storage["pin_count"] = usage["pin_count"]
        storage["donated_gb"] = storage["capacity_gb"] // 2
        # Mask sensitive config keys for display
        if storage.get("config"):
            config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"])
            masked = {}
            for k, v in config.items():
                if "key" in k.lower() or "token" in k.lower() or "secret" in k.lower():
                    # Bug fix: mask on the string form. The old code checked
                    # len(str(v)) but sliced v itself, so a non-str secret
                    # value longer than 8 chars raised TypeError.
                    s = str(v)
                    masked[k] = s[:4] + "..." + s[-4:] if len(s) > 8 else "****"
                else:
                    masked[k] = v
            storage["config_display"] = masked

    if wants_json:
        return {"storages": storages}

    return await ui_storage_page(ctx.username, storages, request)
-
-
@app.post("/storage")
async def add_storage(req: AddStorageRequest, ctx: UserContext = Depends(get_required_user_context)):
    """Add a storage provider.

    Validates the provider type, tests the connection with the supplied
    config, and persists the provider for this user.

    Raises:
        HTTPException(400): unknown provider type, bad config, or failed
            connection test.
        HTTPException(500): database failed to persist the provider.
    """
    # Consistency fix: validate against STORAGE_PROVIDERS_INFO instead of
    # maintaining a duplicate hard-coded list that can drift out of sync.
    if req.provider_type not in STORAGE_PROVIDERS_INFO:
        raise HTTPException(400, f"Invalid provider type: {req.provider_type}")

    # Test the provider connection before saving
    provider = storage_providers.create_provider(req.provider_type, {
        **req.config,
        "capacity_gb": req.capacity_gb
    })
    if not provider:
        raise HTTPException(400, "Failed to create provider with given config")

    success, message = await provider.test_connection()
    if not success:
        raise HTTPException(400, f"Provider connection failed: {message}")

    # Save to database
    provider_name = req.provider_name or f"{req.provider_type}-{ctx.username}"
    storage_id = await database.add_user_storage(
        actor_id=ctx.actor_id,
        provider_type=req.provider_type,
        provider_name=provider_name,
        config=req.config,
        capacity_gb=req.capacity_gb
    )

    if not storage_id:
        raise HTTPException(500, "Failed to save storage provider")

    return {"id": storage_id, "message": f"Storage provider added: {provider_name}"}
-
-
-@app.post("/storage/add")
-async def add_storage_form(
- request: Request,
- provider_type: str = Form(...),
- provider_name: Optional[str] = Form(None),
- description: Optional[str] = Form(None),
- capacity_gb: int = Form(5),
- api_key: Optional[str] = Form(None),
- secret_key: Optional[str] = Form(None),
- api_token: Optional[str] = Form(None),
- project_id: Optional[str] = Form(None),
- project_secret: Optional[str] = Form(None),
- access_key: Optional[str] = Form(None),
- bucket: Optional[str] = Form(None),
- path: Optional[str] = Form(None),
-):
- """Add a storage provider via HTML form (cookie auth)."""
- ctx = await get_user_context_from_cookie(request)
- if not ctx:
- return HTMLResponse('
Not authenticated
', status_code=401)
-
- valid_types = ["pinata", "web3storage", "nftstorage", "infura", "filebase", "storj", "local"]
- if provider_type not in valid_types:
- return HTMLResponse(f'
Invalid provider type: {provider_type}
')
-
- # Build config based on provider type
- config = {}
- if provider_type == "pinata":
- if not api_key or not secret_key:
- return HTMLResponse('
Pinata requires API Key and Secret Key
')
- config = {"api_key": api_key, "secret_key": secret_key}
- elif provider_type == "web3storage":
- if not api_token:
- return HTMLResponse('
web3.storage requires API Token
')
- config = {"api_token": api_token}
- elif provider_type == "nftstorage":
- if not api_token:
- return HTMLResponse('
NFT.Storage requires API Token
')
- config = {"api_token": api_token}
- elif provider_type == "infura":
- if not project_id or not project_secret:
- return HTMLResponse('
Infura requires Project ID and Project Secret
')
- config = {"project_id": project_id, "project_secret": project_secret}
- elif provider_type == "filebase":
- if not access_key or not secret_key or not bucket:
- return HTMLResponse('
Filebase requires Access Key, Secret Key, and Bucket
')
- config = {"access_key": access_key, "secret_key": secret_key, "bucket": bucket}
- elif provider_type == "storj":
- if not access_key or not secret_key or not bucket:
- return HTMLResponse('
-
- '''
-
- # Total stats
- total_capacity = sum(s["capacity_gb"] for s in storages)
- total_used = sum(s["used_bytes"] for s in storages)
- total_pins = sum(s["pin_count"] for s in storages)
-
- html = f'''
-
-
-
- Storage - Art DAG L1
-
-
-
-
-
-
-
-