diff --git a/server.py b/server.py index 0363573..a8b64f4 100644 --- a/server.py +++ b/server.py @@ -1212,6 +1212,9 @@ async def run_detail(run_id: str, request: Request):
{run.completed_at[:19].replace('T', ' ')}
''' + # Sub-navigation tabs for run detail pages + sub_tabs_html = render_run_sub_tabs(run_id, active="overview") + content = f''' @@ -1220,6 +1223,8 @@ async def run_detail(run_id: str, request: Request): Back to runs + {sub_tabs_html} +
@@ -1279,6 +1284,485 @@ async def run_detail(run_id: str, request: Request): return run.model_dump() +# Plan/Analysis cache directories (match tasks/orchestrate.py) +PLAN_CACHE_DIR = CACHE_DIR / 'plans' +ANALYSIS_CACHE_DIR = CACHE_DIR / 'analysis' + + +@app.get("/run/{run_id}/plan", response_class=HTMLResponse) +async def run_plan_visualization(run_id: str, request: Request): + """Visualize execution plan as interactive DAG.""" + ctx = await get_user_context_from_cookie(request) + if not ctx: + content = '

Not logged in.

' + return HTMLResponse(render_page("Login Required", content, None, active_tab="runs"), status_code=401) + + run = await asyncio.to_thread(load_run, run_id) + if not run: + content = f'

Run not found: {run_id}

' + return HTMLResponse(render_page("Not Found", content, ctx.actor_id, active_tab="runs"), status_code=404) + + # Check user owns this run + if run.username not in (ctx.username, ctx.actor_id): + content = '

Access denied.

' + return HTMLResponse(render_page("Access Denied", content, ctx.actor_id, active_tab="runs"), status_code=403) + + # Try to load existing plan from cache + plan_data = None + PLAN_CACHE_DIR.mkdir(parents=True, exist_ok=True) + + # Look for plan file matching this run + for plan_file in PLAN_CACHE_DIR.glob("*.json"): + try: + with open(plan_file) as f: + data = json.load(f) + # Check if this plan matches our run inputs + plan_inputs = data.get("input_hashes", {}) + if set(plan_inputs.values()) == set(run.inputs): + plan_data = data + break + except (json.JSONDecodeError, IOError): + continue + + # Build sub-navigation tabs + tabs_html = render_run_sub_tabs(run_id, active="plan") + + if not plan_data: + content = f''' + + + + + Back to runs + + + {tabs_html} + +
+

Execution Plan

+

No execution plan available for this run.

+

Plans are generated when using recipe-based runs with the v2 API.

+
+ ''' + return HTMLResponse(render_page_with_cytoscape(f"Plan: {run_id[:16]}...", content, ctx.actor_id, active_tab="runs")) + + # Build Cytoscape nodes and edges from plan + nodes = [] + edges = [] + steps = plan_data.get("steps", []) + + for step in steps: + node_type = step.get("node_type", "EFFECT") + color = NODE_COLORS.get(node_type, NODE_COLORS["default"]) + cached = step.get("cached", False) + status = "cached" if cached else "pending" + + # Shorter label for display + step_id = step.get("step_id", "") + label = step_id[:12] + "..." if len(step_id) > 12 else step_id + + nodes.append({ + "data": { + "id": step_id, + "label": label, + "nodeType": node_type, + "level": step.get("level", 0), + "cacheId": step.get("cache_id", ""), + "status": status, + "color": color, + "config": step.get("config") + } + }) + + # Build edges from the full plan JSON if available + if "plan_json" in plan_data: + try: + full_plan = json.loads(plan_data["plan_json"]) + for step in full_plan.get("steps", []): + step_id = step.get("step_id", "") + for input_step in step.get("input_steps", []): + edges.append({ + "data": { + "source": input_step, + "target": step_id + } + }) + except json.JSONDecodeError: + pass + + nodes_json = json.dumps(nodes) + edges_json = json.dumps(edges) + + dag_html = render_dag_cytoscape(nodes_json, edges_json) + + # Stats summary + total = plan_data.get("total_steps", len(steps)) + cached = plan_data.get("cached_steps", sum(1 for s in steps if s.get("cached"))) + pending = plan_data.get("pending_steps", total - cached) + + content = f''' + + + + + Back to runs + + + {tabs_html} + +
+

Execution Plan

+ +
+
+
{total}
+
Total Steps
+
+
+
{cached}
+
Cached
+
+
+
{pending}
+
Executed
+
+
+ +
+
+ + SOURCE + + + EFFECT + + + _LIST + + + Cached + +
+
+ + {dag_html} +
+ ''' + + return HTMLResponse(render_page_with_cytoscape(f"Plan: {run_id[:16]}...", content, ctx.actor_id, active_tab="runs")) + + +@app.get("/run/{run_id}/analysis", response_class=HTMLResponse) +async def run_analysis_page(run_id: str, request: Request): + """Show analysis results for run inputs.""" + ctx = await get_user_context_from_cookie(request) + if not ctx: + content = '

Not logged in.

' + return HTMLResponse(render_page("Login Required", content, None, active_tab="runs"), status_code=401) + + run = await asyncio.to_thread(load_run, run_id) + if not run: + content = f'

Run not found: {run_id}

' + return HTMLResponse(render_page("Not Found", content, ctx.actor_id, active_tab="runs"), status_code=404) + + # Check user owns this run + if run.username not in (ctx.username, ctx.actor_id): + content = '

Access denied.

' + return HTMLResponse(render_page("Access Denied", content, ctx.actor_id, active_tab="runs"), status_code=403) + + tabs_html = render_run_sub_tabs(run_id, active="analysis") + + # Load analysis results for each input + analysis_html = "" + ANALYSIS_CACHE_DIR.mkdir(parents=True, exist_ok=True) + + for i, input_hash in enumerate(run.inputs): + analysis_path = ANALYSIS_CACHE_DIR / f"{input_hash}.json" + analysis_data = None + + if analysis_path.exists(): + try: + with open(analysis_path) as f: + analysis_data = json.load(f) + except (json.JSONDecodeError, IOError): + pass + + input_name = f"Input {i + 1}" + + if analysis_data: + tempo = analysis_data.get("tempo", "N/A") + if isinstance(tempo, float): + tempo = f"{tempo:.1f}" + beat_times = analysis_data.get("beat_times", []) + beat_count = len(beat_times) + energy = analysis_data.get("energy") + + # Beat visualization (simple bar chart showing beat positions) + beat_bars = "" + if beat_times and len(beat_times) > 0: + # Show first 50 beats as vertical bars + display_beats = beat_times[:50] + max_time = max(display_beats) if display_beats else 1 + for bt in display_beats: + # Normalize to percentage + pos = (bt / max_time) * 100 if max_time > 0 else 0 + beat_bars += f'
' + + energy_bar = "" + if energy is not None: + try: + energy_pct = min(float(energy) * 100, 100) + energy_bar = f''' +
+
Energy Level
+
+
+
+
{energy_pct:.1f}%
+
+ ''' + except (TypeError, ValueError): + pass + + analysis_html += f''' +
+
+
+

{input_name}

+ {input_hash[:24]}... +
+ Analyzed +
+ +
+
+
{tempo}
+
BPM (Tempo)
+
+
+
{beat_count}
+
Beats Detected
+
+
+ + {energy_bar} + +
+
Beat Timeline (first 50 beats)
+
+
+ {beat_bars if beat_bars else 'No beats detected'} +
+
+
+
+ ''' + else: + analysis_html += f''' +
+
+
+

{input_name}

+ {input_hash[:24]}... +
+ Not Analyzed +
+

No analysis data available for this input.

+

Analysis is performed when using recipe-based runs.

+
+ ''' + + if not run.inputs: + analysis_html = '

No inputs found for this run.

' + + content = f''' + + + + + Back to runs + + + {tabs_html} + +

Analysis Results

+ {analysis_html} + ''' + + return HTMLResponse(render_page(f"Analysis: {run_id[:16]}...", content, ctx.actor_id, active_tab="runs")) + + +@app.get("/run/{run_id}/artifacts", response_class=HTMLResponse) +async def run_artifacts_page(run_id: str, request: Request): + """Show all cached artifacts produced by this run.""" + ctx = await get_user_context_from_cookie(request) + if not ctx: + content = '

Not logged in.

' + return HTMLResponse(render_page("Login Required", content, None, active_tab="runs"), status_code=401) + + run = await asyncio.to_thread(load_run, run_id) + if not run: + content = f'

Run not found: {run_id}

' + return HTMLResponse(render_page("Not Found", content, ctx.actor_id, active_tab="runs"), status_code=404) + + # Check user owns this run + if run.username not in (ctx.username, ctx.actor_id): + content = '

Access denied.

' + return HTMLResponse(render_page("Access Denied", content, ctx.actor_id, active_tab="runs"), status_code=403) + + tabs_html = render_run_sub_tabs(run_id, active="artifacts") + + # Collect all artifacts: inputs + output + artifacts = [] + + # Add inputs + for i, content_hash in enumerate(run.inputs): + cache_path = get_cache_path(content_hash) + if cache_path and cache_path.exists(): + size = cache_path.stat().st_size + media_type = detect_media_type(cache_path) + artifacts.append({ + "hash": content_hash, + "path": cache_path, + "size": size, + "media_type": media_type, + "role": "input", + "role_color": "blue", + "name": f"Input {i + 1}", + }) + + # Add output + if run.output_hash: + cache_path = get_cache_path(run.output_hash) + if cache_path and cache_path.exists(): + size = cache_path.stat().st_size + media_type = detect_media_type(cache_path) + artifacts.append({ + "hash": run.output_hash, + "path": cache_path, + "size": size, + "media_type": media_type, + "role": "output", + "role_color": "green", + "name": "Output", + }) + + # Build artifacts HTML + artifacts_html = "" + for artifact in artifacts: + size_kb = artifact["size"] / 1024 + if size_kb < 1024: + size_str = f"{size_kb:.1f} KB" + else: + size_str = f"{size_kb/1024:.1f} MB" + + # Thumbnail for media + thumb = "" + if artifact["media_type"] == "video": + thumb = f'' + elif artifact["media_type"] == "image": + thumb = f'' + else: + thumb = '
File
' + + role_color = artifact["role_color"] + artifacts_html += f''' +
+ {thumb} +
+
{artifact["name"]}
+ {artifact["hash"][:32]}... +
+ {size_str} + {artifact["media_type"]} +
+
+ {artifact["role"]} +
+ ''' + + if not artifacts: + artifacts_html = '

No cached artifacts found for this run.

' + + content = f''' + + + + + Back to runs + + + {tabs_html} + +

Cached Artifacts

+
+ {artifacts_html} +
+ ''' + + return HTMLResponse(render_page(f"Artifacts: {run_id[:16]}...", content, ctx.actor_id, active_tab="runs")) + + +# JSON API endpoints for future WebSocket support +@app.get("/api/run/{run_id}/plan") +async def api_run_plan(run_id: str, request: Request): + """Get execution plan data as JSON for programmatic access.""" + ctx = await get_user_context_from_cookie(request) + if not ctx: + raise HTTPException(401, "Not logged in") + + run = await asyncio.to_thread(load_run, run_id) + if not run: + raise HTTPException(404, f"Run {run_id} not found") + + if run.username not in (ctx.username, ctx.actor_id): + raise HTTPException(403, "Access denied") + + # Look for plan in cache + PLAN_CACHE_DIR.mkdir(parents=True, exist_ok=True) + for plan_file in PLAN_CACHE_DIR.glob("*.json"): + try: + with open(plan_file) as f: + data = json.load(f) + plan_inputs = data.get("input_hashes", {}) + if set(plan_inputs.values()) == set(run.inputs): + return data + except (json.JSONDecodeError, IOError): + continue + + return {"status": "not_found", "message": "No plan found for this run"} + + +@app.get("/api/run/{run_id}/analysis") +async def api_run_analysis(run_id: str, request: Request): + """Get analysis data as JSON for programmatic access.""" + ctx = await get_user_context_from_cookie(request) + if not ctx: + raise HTTPException(401, "Not logged in") + + run = await asyncio.to_thread(load_run, run_id) + if not run: + raise HTTPException(404, f"Run {run_id} not found") + + if run.username not in (ctx.username, ctx.actor_id): + raise HTTPException(403, "Access denied") + + ANALYSIS_CACHE_DIR.mkdir(parents=True, exist_ok=True) + results = {} + for input_hash in run.inputs: + analysis_path = ANALYSIS_CACHE_DIR / f"{input_hash}.json" + if analysis_path.exists(): + try: + with open(analysis_path) as f: + results[input_hash] = json.load(f) + except (json.JSONDecodeError, IOError): + results[input_hash] = None + else: + results[input_hash] = None + + return {"run_id": run_id, "inputs": run.inputs, "analysis": results} + + @app.get("/runs") async def list_runs(request: Request, page: int = 1, limit: int = 20): """List runs. HTML for browsers (with infinite scroll), JSON for APIs (with pagination).""" @@ -1868,11 +2352,12 @@ async def recipe_detail_page(recipe_id: str, request: Request):
-
+

{recipe.name}

v{recipe.version} {pinned_badge} {l2_link_html} + View DAG

{recipe.description or 'No description'}

{recipe.recipe_id}
@@ -1900,6 +2385,131 @@ async def recipe_detail_page(recipe_id: str, request: Request): return HTMLResponse(render_page(f"Recipe: {recipe.name}", content, ctx.actor_id if ctx else None, active_tab="recipes")) +@app.get("/recipe/{recipe_id}/dag", response_class=HTMLResponse) +async def recipe_dag_visualization(recipe_id: str, request: Request): + """Visualize recipe structure as DAG.""" + ctx = await get_user_context_from_cookie(request) + recipe = load_recipe(recipe_id) + + if not recipe: + return HTMLResponse(render_page_with_cytoscape( + "Recipe Not Found", + f'

Recipe {recipe_id} not found.

', + ctx.actor_id if ctx else None, + active_tab="recipes" + ), status_code=404) + + # Load recipe YAML + recipe_path = cache_manager.get_by_content_hash(recipe_id) + if not recipe_path or not recipe_path.exists(): + return HTMLResponse(render_page_with_cytoscape( + "Recipe Not Found", + '

Recipe file not found in cache.

', + ctx.actor_id if ctx else None, + active_tab="recipes" + ), status_code=404) + + try: + recipe_yaml = recipe_path.read_text() + config = yaml.safe_load(recipe_yaml) + except Exception as e: + return HTMLResponse(render_page_with_cytoscape( + "Error", + f'

Failed to parse recipe: {e}

', + ctx.actor_id if ctx else None, + active_tab="recipes" + ), status_code=500) + + dag_config = config.get("dag", {}) + dag_nodes = dag_config.get("nodes", []) + output_node = dag_config.get("output") + + # Build Cytoscape nodes and edges + nodes = [] + edges = [] + + for node_def in dag_nodes: + node_id = node_def.get("id", "") + node_type = node_def.get("type", "EFFECT") + node_config = node_def.get("config", {}) + input_names = node_def.get("inputs", []) + + # Determine if this is the output node + is_output = node_id == output_node + if is_output: + color = NODE_COLORS.get("OUTPUT", NODE_COLORS["default"]) + else: + color = NODE_COLORS.get(node_type, NODE_COLORS["default"]) + + # Get effect name if it's an effect node + label = node_id + if node_type == "EFFECT" and "effect" in node_config: + label = node_config["effect"] + + nodes.append({ + "data": { + "id": node_id, + "label": label, + "nodeType": node_type, + "isOutput": is_output, + "color": color, + "config": node_config + } + }) + + # Create edges from inputs + for input_name in input_names: + edges.append({ + "data": { + "source": input_name, + "target": node_id + } + }) + + nodes_json = json.dumps(nodes) + edges_json = json.dumps(edges) + + dag_html = render_dag_cytoscape(nodes_json, edges_json) + + content = f''' + + +
+
+

{recipe.name}

+ v{recipe.version} +
+

{recipe.description or 'No description'}

+
+ +
+

DAG Structure

+ +
+
+ + SOURCE + + + EFFECT + + + OUTPUT + +
+
+ +

Click on a node to see its configuration. The purple-bordered node is the output.

+ + {dag_html} +
+ ''' + + return HTMLResponse(render_page_with_cytoscape(f"DAG: {recipe.name}", content, ctx.actor_id if ctx else None, active_tab="recipes")) + + @app.post("/ui/recipes/{recipe_id}/run", response_class=HTMLResponse) async def ui_run_recipe(recipe_id: str, request: Request): """HTMX handler: run a recipe with form inputs.""" @@ -3803,6 +4413,228 @@ TAILWIND_CONFIG = ''' ''' +# Cytoscape.js for DAG visualization (extends TAILWIND_CONFIG) +CYTOSCAPE_CONFIG = TAILWIND_CONFIG + ''' + + + +''' + +# Node colors for DAG visualization +NODE_COLORS = { + "SOURCE": "#3b82f6", # Blue + "EFFECT": "#22c55e", # Green + "OUTPUT": "#a855f7", # Purple + "ANALYSIS": "#f59e0b", # Amber + "_LIST": "#6366f1", # Indigo + "default": "#6b7280" # Gray +} + + +def render_run_sub_tabs(run_id: str, active: str = "overview") -> str: + """Render sub-navigation tabs for run detail pages.""" + tabs = [ + ("overview", "Overview", f"/run/{run_id}"), + ("plan", "Plan", f"/run/{run_id}/plan"), + ("analysis", "Analysis", f"/run/{run_id}/analysis"), + ("artifacts", "Artifacts", f"/run/{run_id}/artifacts"), + ] + + html = '
' + for tab_id, label, url in tabs: + if tab_id == active: + active_class = "border-b-2 border-blue-500 text-white" + else: + active_class = "text-gray-400 hover:text-white" + html += f'{label}' + html += '
' + return html + + +def render_dag_cytoscape(nodes_json: str, edges_json: str, container_id: str = "cy") -> str: + """Render Cytoscape.js DAG visualization HTML with WebSocket-ready architecture.""" + return f''' +
+ + + ''' + + +def render_page_with_cytoscape(title: str, content: str, actor_id: Optional[str] = None, active_tab: str = None) -> str: + """Render a page with Cytoscape.js support for DAG visualization.""" + user_info = "" + if actor_id: + parts = actor_id.lstrip("@").split("@") + username = parts[0] if parts else actor_id + domain = parts[1] if len(parts) > 1 else "" + l2_user_url = f"https://{domain}/users/{username}" if domain else "#" + user_info = f''' +
+ Logged in as {actor_id} +
+ ''' + else: + user_info = ''' +
+ Not logged in +
+ ''' + + runs_active = "border-b-2 border-blue-500 text-white" if active_tab == "runs" else "text-gray-400 hover:text-white" + recipes_active = "border-b-2 border-blue-500 text-white" if active_tab == "recipes" else "text-gray-400 hover:text-white" + media_active = "border-b-2 border-blue-500 text-white" if active_tab == "media" else "text-gray-400 hover:text-white" + storage_active = "border-b-2 border-blue-500 text-white" if active_tab == "storage" else "text-gray-400 hover:text-white" + + return f""" + + + + + + {title} | Art DAG L1 Server + {CYTOSCAPE_CONFIG} + + +
+
+

+ Art DAG L1 Server +

+ {user_info} +
+ + + +
+ {content} +
+
+ + +""" + def render_page(title: str, content: str, actor_id: Optional[str] = None, active_tab: str = None) -> str: """Render a page with nav bar and content. Used for clean URL pages.