From d20eef76ad9d2c640169c3685e4aaf820f3d7223 Mon Sep 17 00:00:00 2001 From: giles Date: Mon, 2 Feb 2026 23:24:39 +0000 Subject: [PATCH] Fix completed runs not appearing in list + add purge-failed endpoint - Update save_run_cache to also update actor_id, recipe, inputs on conflict - Add logging for actor_id when saving runs to run_cache - Add admin endpoint DELETE /runs/admin/purge-failed to delete all failed runs Co-Authored-By: Claude Opus 4.5 --- app/routers/cache.py | 173 +++++++- app/routers/effects.py | 168 +++----- app/routers/runs.py | 128 +++++- app/services/cache_service.py | 30 +- app/services/recipe_service.py | 44 +- app/services/run_service.py | 27 ++ app/templates/cache/detail.html | 20 +- app/templates/cache/media_list.html | 206 ++++++++++ app/templates/effects/detail.html | 39 +- app/templates/effects/list.html | 171 +++++--- app/templates/recipes/detail.html | 22 +- app/templates/runs/detail.html | 9 +- cache_manager.py | 63 +-- celery_app.py | 2 +- database.py | 5 +- ipfs_client.py | 4 +- recipes/woods-recipe.sexp | 134 +++++++ sexp_effects/parser.py | 489 +++++++++++++++++------ sexp_effects/primitive_libs/streaming.py | 25 +- streaming/stream_sexp_generic.py | 148 +++++-- tasks/__init__.py | 3 + tasks/ipfs_upload.py | 83 ++++ tasks/streaming.py | 119 +++++- templates/standard-effects.sexp | 12 +- 24 files changed, 1671 insertions(+), 453 deletions(-) create mode 100644 recipes/woods-recipe.sexp create mode 100644 tasks/ipfs_upload.py diff --git a/app/routers/cache.py b/app/routers/cache.py index 1a491d7..dc03d44 100644 --- a/app/routers/cache.py +++ b/app/routers/cache.py @@ -8,7 +8,7 @@ import logging from pathlib import Path from typing import Optional, Dict, Any -from fastapi import APIRouter, Request, Depends, HTTPException, UploadFile, File +from fastapi import APIRouter, Request, Depends, HTTPException, UploadFile, File, Form from fastapi.responses import HTMLResponse, FileResponse from pydantic import BaseModel @@ -208,13 +208,95 @@ async def import_from_ipfs( return {"cid": cid, "imported": True} -@router.post("/upload") -async def upload_content( - file: UploadFile = File(...), +@router.post("/upload/chunk") +async def upload_chunk( + request: Request, + chunk: UploadFile = File(...), + upload_id: str = Form(...), + chunk_index: int = Form(...), + total_chunks: int = Form(...), + filename: str = Form(...), + display_name: Optional[str] = Form(None), ctx: UserContext = Depends(require_auth), cache_service: CacheService = Depends(get_cache_service), ): - """Upload content to cache and IPFS.""" + """Upload a file chunk. Assembles file when all chunks received.""" + import tempfile + import os + + # Create temp dir for this upload + chunk_dir = Path(tempfile.gettempdir()) / "uploads" / upload_id + chunk_dir.mkdir(parents=True, exist_ok=True) + + # Save this chunk + chunk_path = chunk_dir / f"chunk_{chunk_index:05d}" + chunk_data = await chunk.read() + chunk_path.write_bytes(chunk_data) + + # Check if all chunks received + received = len(list(chunk_dir.glob("chunk_*"))) + + if received < total_chunks: + return {"status": "partial", "received": received, "total": total_chunks} + + # All chunks received - assemble file + final_path = chunk_dir / filename + with open(final_path, 'wb') as f: + for i in range(total_chunks): + cp = chunk_dir / f"chunk_{i:05d}" + f.write(cp.read_bytes()) + cp.unlink() # Clean up chunk + + # Read assembled file + content = final_path.read_bytes() + final_path.unlink() + chunk_dir.rmdir() + + # Now do the normal upload flow + cid, ipfs_cid, error = await cache_service.upload_content( + content=content, + filename=filename, + actor_id=ctx.actor_id, + ) + + if error: + raise HTTPException(400, error) + + # Assign friendly name + final_cid = ipfs_cid or cid + from ..services.naming_service import get_naming_service + naming = get_naming_service() + friendly_entry = await naming.assign_name( + cid=final_cid, + actor_id=ctx.actor_id, + item_type="media", + display_name=display_name, + filename=filename, + ) + + return { + "status": "complete", + "cid": final_cid, + "friendly_name": friendly_entry["friendly_name"], + "filename": filename, + "size": len(content), + "uploaded": True, + } + + +@router.post("/upload") +async def upload_content( + file: UploadFile = File(...), + display_name: Optional[str] = Form(None), + ctx: UserContext = Depends(require_auth), + cache_service: CacheService = Depends(get_cache_service), +): + """Upload content to cache and IPFS. + + Args: + file: The file to upload + display_name: Optional custom name for the media (used as friendly name) + """ content = await file.read() cid, ipfs_cid, error = await cache_service.upload_content( content=content, @@ -233,6 +315,7 @@ async def upload_content( cid=final_cid, actor_id=ctx.actor_id, item_type="media", + display_name=display_name, # Use custom name if provided filename=file.filename, ) @@ -350,3 +433,83 @@ async def update_metadata_htmx(
Metadata saved!
''') + + +# Friendly name editing +@router.get("/{cid}/name-form", response_class=HTMLResponse) +async def get_name_form( + cid: str, + request: Request, + cache_service: CacheService = Depends(get_cache_service), +): + """Get friendly name editing form (HTMX).""" + ctx = await get_current_user(request) + if not ctx: + return HTMLResponse('
Login required
') + + # Get current friendly name + from ..services.naming_service import get_naming_service + naming = get_naming_service() + entry = await naming.get_by_cid(ctx.actor_id, cid) + current_name = entry.get("base_name", "") if entry else "" + + return HTMLResponse(f''' +
+
+ + +

A name to reference this media in recipes

+
+
+ + +
+
+ ''') + + +@router.post("/{cid}/name", response_class=HTMLResponse) +async def update_friendly_name( + cid: str, + request: Request, +): + """Update friendly name (HTMX form handler).""" + ctx = await get_current_user(request) + if not ctx: + return HTMLResponse('
Login required
') + + form_data = await request.form() + display_name = form_data.get("display_name", "").strip() + + if not display_name: + return HTMLResponse('
Name cannot be empty
') + + from ..services.naming_service import get_naming_service + naming = get_naming_service() + + try: + entry = await naming.assign_name( + cid=cid, + actor_id=ctx.actor_id, + item_type="media", + display_name=display_name, + ) + + return HTMLResponse(f''' +
Name updated!
+ + ''') + except Exception as e: + return HTMLResponse(f'
Error: {e}
') diff --git a/app/routers/effects.py b/app/routers/effects.py index 6bb4f0a..994a925 100644 --- a/app/routers/effects.py +++ b/app/routers/effects.py @@ -2,17 +2,17 @@ Effects routes for L1 server. Handles effect upload, listing, and metadata. -Effects are stored in IPFS like all other content-addressed data. +Effects are S-expression files stored in IPFS like all other content-addressed data. """ -import hashlib import json import logging +import re import time from pathlib import Path from typing import Optional -from fastapi import APIRouter, Request, Depends, HTTPException, UploadFile, File +from fastapi import APIRouter, Request, Depends, HTTPException, UploadFile, File, Form from fastapi.responses import HTMLResponse, PlainTextResponse from artdag_common import render @@ -40,12 +40,11 @@ def get_effects_dir() -> Path: def parse_effect_metadata(source: str) -> dict: """ - Parse effect metadata from source code. + Parse effect metadata from S-expression source code. - Extracts PEP 723 dependencies and @-tag metadata from docstring. + Extracts metadata from comment headers (;; @key value format) + or from (defeffect name ...) form. """ - import re - metadata = { "name": "", "version": "1.0.0", @@ -53,97 +52,54 @@ def parse_effect_metadata(source: str) -> dict: "temporal": False, "description": "", "params": [], - "dependencies": [], - "requires_python": ">=3.10", } - # Parse PEP 723 dependencies - pep723_match = re.search(r"# /// script\n(.*?)# ///", source, re.DOTALL) - if pep723_match: - block = pep723_match.group(1) - deps_match = re.search(r'# dependencies = \[(.*?)\]', block, re.DOTALL) - if deps_match: - metadata["dependencies"] = re.findall(r'"([^"]+)"', deps_match.group(1)) - python_match = re.search(r'# requires-python = "([^"]+)"', block) - if python_match: - metadata["requires_python"] = python_match.group(1) + # Parse comment-based metadata (;; @key value) + for line in source.split("\n"): + stripped = line.strip() + if not stripped.startswith(";"): + # Stop parsing metadata at first non-comment line + if stripped and not stripped.startswith("("): + continue + if stripped.startswith("("): + break - # Parse docstring @-tags - docstring_match = re.search(r'"""(.*?)"""', source, re.DOTALL) - if not docstring_match: - docstring_match = re.search(r"'''(.*?)'''", source, re.DOTALL) + # Remove comment prefix + comment = stripped.lstrip(";").strip() - if docstring_match: - docstring = docstring_match.group(1) - lines = docstring.split("\n") + if comment.startswith("@effect "): + metadata["name"] = comment[8:].strip() + elif comment.startswith("@name "): + metadata["name"] = comment[6:].strip() + elif comment.startswith("@version "): + metadata["version"] = comment[9:].strip() + elif comment.startswith("@author "): + metadata["author"] = comment[8:].strip() + elif comment.startswith("@temporal"): + val = comment[9:].strip().lower() if len(comment) > 9 else "true" + metadata["temporal"] = val in ("true", "yes", "1", "") + elif comment.startswith("@description "): + metadata["description"] = comment[13:].strip() + elif comment.startswith("@param "): + # Format: @param name type [description] + parts = comment[7:].split(None, 2) + if len(parts) >= 2: + param = {"name": parts[0], "type": parts[1]} + if len(parts) > 2: + param["description"] = parts[2] + metadata["params"].append(param) - current_param = None - desc_lines = [] - in_description = False + # Also try to extract name from (defeffect "name" ...) or (effect "name" ...) + if not metadata["name"]: + name_match = re.search(r'\((defeffect|effect)\s+"([^"]+)"', source) + if name_match: + metadata["name"] = name_match.group(2) - for line in lines: - stripped = line.strip() - - if stripped.startswith("@effect "): - metadata["name"] = stripped[8:].strip() - in_description = False - - elif stripped.startswith("@version "): - metadata["version"] = stripped[9:].strip() - - elif stripped.startswith("@author "): - metadata["author"] = stripped[8:].strip() - - elif stripped.startswith("@temporal "): - val = stripped[10:].strip().lower() - metadata["temporal"] = val in ("true", "yes", "1") - - elif stripped.startswith("@description"): - in_description = True - desc_lines = [] - - elif stripped.startswith("@param "): - in_description = False - if current_param: - metadata["params"].append(current_param) - parts = stripped[7:].split() - if len(parts) >= 2: - current_param = { - "name": parts[0], - "type": parts[1], - "description": "", - } - else: - current_param = None - - elif stripped.startswith("@range ") and current_param: - range_parts = stripped[7:].split() - if len(range_parts) >= 2: - try: - current_param["range"] = [float(range_parts[0]), float(range_parts[1])] - except ValueError: - pass - - elif stripped.startswith("@default ") and current_param: - current_param["default"] = stripped[9:].strip() - - elif stripped.startswith("@example"): - in_description = False - if current_param: - metadata["params"].append(current_param) - current_param = None - - elif in_description and stripped: - desc_lines.append(stripped) - - elif current_param and stripped and not stripped.startswith("@"): - current_param["description"] = stripped - - if in_description: - metadata["description"] = " ".join(desc_lines) - - if current_param: - metadata["params"].append(current_param) + # Try to extract name from first (define ...) form + if not metadata["name"]: + define_match = re.search(r'\(define\s+(\w+)', source) + if define_match: + metadata["name"] = define_match.group(1) return metadata @@ -151,13 +107,18 @@ def parse_effect_metadata(source: str) -> dict: @router.post("/upload") async def upload_effect( file: UploadFile = File(...), + display_name: Optional[str] = Form(None), ctx: UserContext = Depends(require_auth), ): """ - Upload an effect to IPFS. + Upload an S-expression effect to IPFS. - Parses PEP 723 metadata and @-tag docstring. + Parses metadata from comment headers. Returns IPFS CID for use in recipes. + + Args: + file: The .sexp effect file + display_name: Optional custom friendly name for the effect """ content = await file.read() @@ -166,7 +127,7 @@ async def upload_effect( except UnicodeDecodeError: raise HTTPException(400, "Effect must be valid UTF-8 text") - # Parse metadata + # Parse metadata from sexp source try: meta = parse_effect_metadata(source) except Exception as e: @@ -185,7 +146,7 @@ async def upload_effect( effects_dir = get_effects_dir() effect_dir = effects_dir / cid effect_dir.mkdir(parents=True, exist_ok=True) - (effect_dir / "effect.py").write_text(source, encoding="utf-8") + (effect_dir / "effect.sexp").write_text(source, encoding="utf-8") # Store metadata (locally and in IPFS) full_meta = { @@ -209,14 +170,14 @@ async def upload_effect( filename=file.filename, ) - # Assign friendly name + # Assign friendly name (use custom display_name if provided, else from metadata) from ..services.naming_service import get_naming_service naming = get_naming_service() friendly_entry = await naming.assign_name( cid=cid, actor_id=ctx.actor_id, item_type="effect", - display_name=meta.get("name"), + display_name=display_name or meta.get("name"), filename=file.filename, ) @@ -230,7 +191,6 @@ async def upload_effect( "version": meta.get("version"), "temporal": meta.get("temporal", False), "params": meta.get("params", []), - "dependencies": meta.get("dependencies", []), "uploaded": True, } @@ -258,7 +218,7 @@ async def get_effect( # Cache locally effect_dir.mkdir(parents=True, exist_ok=True) source = source_bytes.decode("utf-8") - (effect_dir / "effect.py").write_text(source) + (effect_dir / "effect.sexp").write_text(source) # Parse metadata from source parsed_meta = parse_effect_metadata(source) @@ -297,12 +257,16 @@ async def get_effect_source( ): """Get effect source code.""" effects_dir = get_effects_dir() - source_path = effects_dir / cid / "effect.py" + source_path = effects_dir / cid / "effect.sexp" - # Try local cache first + # Try local cache first (check both .sexp and legacy .py) if source_path.exists(): return PlainTextResponse(source_path.read_text()) + legacy_path = effects_dir / cid / "effect.py" + if legacy_path.exists(): + return PlainTextResponse(legacy_path.read_text()) + # Fetch from IPFS source_bytes = ipfs_client.get_bytes(cid) if not source_bytes: diff --git a/app/routers/runs.py b/app/routers/runs.py index f3cfefc..33bdc44 100644 --- a/app/routers/runs.py +++ b/app/routers/runs.py @@ -156,7 +156,6 @@ async def create_run( async def create_stream_run( request: StreamRequest, ctx: UserContext = Depends(require_auth), - redis = Depends(get_redis_client), ): """Start a streaming video render. @@ -166,13 +165,57 @@ async def create_stream_run( Assets can be referenced by CID or friendly name in the recipe. """ import uuid + import tempfile + from pathlib import Path + import database from tasks.streaming import run_stream + # Generate run ID run_id = str(uuid.uuid4()) - created_at = datetime.now(timezone.utc).isoformat() + + # Store recipe in cache so it appears on /recipes page + recipe_id = None + try: + cache_manager = get_cache_manager() + with tempfile.NamedTemporaryFile(delete=False, suffix=".sexp", mode="w") as tmp: + tmp.write(request.recipe_sexp) + tmp_path = Path(tmp.name) + + cached, ipfs_cid = cache_manager.put(tmp_path, node_type="recipe", move=True) + recipe_id = cached.cid + + # Extract recipe name from S-expression (look for (stream "name" ...) pattern) + import re + name_match = re.search(r'\(stream\s+"([^"]+)"', request.recipe_sexp) + recipe_name = name_match.group(1) if name_match else f"stream-{run_id[:8]}" + + # Track ownership in item_types + await database.save_item_metadata( + cid=recipe_id, + actor_id=ctx.actor_id, + item_type="recipe", + description=f"Streaming recipe: {recipe_name}", + filename=f"{recipe_name}.sexp", + ) + + # Assign friendly name + from ..services.naming_service import get_naming_service + naming = get_naming_service() + await naming.assign_name( + cid=recipe_id, + actor_id=ctx.actor_id, + item_type="recipe", + display_name=recipe_name, + ) + + logger.info(f"Stored streaming recipe {recipe_id[:16]}... as '{recipe_name}'") + except Exception as e: + logger.warning(f"Failed to store recipe in cache: {e}") + # Continue anyway - run will still work, just won't appear in /recipes # Submit Celery task task = run_stream.delay( + run_id=run_id, recipe_sexp=request.recipe_sexp, output_name=request.output_name, duration=request.duration, @@ -182,21 +225,15 @@ async def create_stream_run( audio_sexp=request.audio_sexp, ) - # Store run metadata in Redis - run_data = { - "run_id": run_id, - "status": "pending", - "recipe": "streaming", - "actor_id": ctx.actor_id, - "created_at": created_at, - "celery_task_id": task.id, - "output_name": request.output_name, - } - - await redis.set( - f"{RUNS_KEY_PREFIX}{run_id}", - json.dumps(run_data), - ex=86400 * 7 # 7 days + # Store in database for durability + pending = await database.create_pending_run( + run_id=run_id, + celery_task_id=task.id, + recipe=recipe_id or "streaming", # Use recipe CID if available + inputs=[], # Streaming recipes don't have traditional inputs + actor_id=ctx.actor_id, + dag_json=request.recipe_sexp, # Store recipe content for viewing + output_name=request.output_name, ) logger.info(f"Started stream run {run_id} with task {task.id}") @@ -204,8 +241,8 @@ async def create_stream_run( return RunStatus( run_id=run_id, status="pending", - recipe="streaming", - created_at=created_at, + recipe=recipe_id or "streaming", + created_at=pending.get("created_at"), celery_task_id=task.id, ) @@ -305,6 +342,32 @@ async def get_run( except Exception as e: logger.warning(f"Failed to load recipe for plan: {e}") + # Handle streaming runs - detect by recipe_sexp content or legacy "streaming" marker + recipe_sexp_content = run.get("recipe_sexp") + is_streaming = run.get("recipe") == "streaming" # Legacy marker + if not is_streaming and recipe_sexp_content: + # Check if content starts with (stream after skipping comments + for line in recipe_sexp_content.split('\n'): + stripped = line.strip() + if not stripped or stripped.startswith(';'): + continue + is_streaming = stripped.startswith('(stream') + break + if is_streaming and recipe_sexp_content and not plan: + plan_sexp = recipe_sexp_content + plan = { + "steps": [{ + "id": "stream", + "type": "STREAM", + "name": "Streaming Recipe", + "inputs": [], + "config": {}, + "status": "completed" if run.get("status") == "completed" else "pending", + }] + } + run["total_steps"] = 1 + run["executed"] = 1 if run.get("status") == "completed" else 0 + # Helper to convert simple type to MIME type prefix for template def type_to_mime(simple_type: str) -> str: if simple_type == "video": @@ -564,10 +627,14 @@ async def run_detail( "analysis": analysis, } + # Extract plan_sexp for streaming runs + plan_sexp = plan.get("sexp") if plan else None + templates = get_templates(request) return render(templates, "runs/detail.html", request, run=run, plan=plan, + plan_sexp=plan_sexp, artifacts=artifacts, analysis=analysis, dag_elements=dag_elements, @@ -824,3 +891,26 @@ async def publish_run( return HTMLResponse(f'Shared: {ipfs_cid[:16]}...') return {"ipfs_cid": ipfs_cid, "output_cid": output_cid, "published": True} + + +@router.delete("/admin/purge-failed") +async def purge_failed_runs( + ctx: UserContext = Depends(require_auth), +): + """Delete all failed runs from pending_runs table.""" + import database + + # Get all failed runs + failed_runs = await database.list_pending_runs(status="failed") + + deleted = [] + for run in failed_runs: + run_id = run.get("run_id") + try: + await database.delete_pending_run(run_id) + deleted.append(run_id) + except Exception as e: + logger.warning(f"Failed to delete run {run_id}: {e}") + + logger.info(f"Purged {len(deleted)} failed runs") + return {"purged": len(deleted), "run_ids": deleted} diff --git a/app/services/cache_service.py b/app/services/cache_service.py index 1faed97..a4d54f6 100644 --- a/app/services/cache_service.py +++ b/app/services/cache_service.py @@ -4,6 +4,7 @@ Cache Service - business logic for cache and media management. import asyncio import json +import logging import os import subprocess from pathlib import Path @@ -11,6 +12,8 @@ from typing import Optional, List, Dict, Any, Tuple, TYPE_CHECKING import httpx +logger = logging.getLogger(__name__) + if TYPE_CHECKING: from database import Database from cache_manager import L1CacheManager @@ -513,7 +516,11 @@ class CacheService: filename: str, actor_id: str, ) -> Tuple[Optional[str], Optional[str], Optional[str]]: - """Upload content to cache. Returns (cid, ipfs_cid, error).""" + """Upload content to cache. Returns (cid, ipfs_cid, error). + + Files are stored locally first for fast response, then uploaded + to IPFS in the background. + """ import tempfile try: @@ -525,21 +532,28 @@ class CacheService: # Detect media type (video/image/audio) before moving file media_type = detect_media_type(tmp_path) - # Store in cache (also stores in IPFS) - cached, ipfs_cid = self.cache.put(tmp_path, node_type="upload", move=True) - cid = ipfs_cid or cached.cid # Prefer IPFS CID + # Store locally first (skip_ipfs=True for fast response) + # IPFS upload happens in background + cached, ipfs_cid = self.cache.put(tmp_path, node_type="upload", move=True, skip_ipfs=True) + cid = cached.cid # Use local hash since we skipped IPFS # Save to database with media category type - # Using media_type ("video", "image", "audio") not mime_type ("video/mp4") - # so list_media filtering works correctly - await self.db.create_cache_item(cid, ipfs_cid) + await self.db.create_cache_item(cid, ipfs_cid) # ipfs_cid is None initially await self.db.save_item_metadata( cid=cid, actor_id=actor_id, - item_type=media_type, # Store media category for filtering + item_type=media_type, filename=filename ) + # Queue background IPFS upload + try: + from tasks.ipfs_upload import upload_to_ipfs + upload_to_ipfs.delay(cid, actor_id) + logger.info(f"Queued background IPFS upload for {cid[:16]}...") + except Exception as e: + logger.warning(f"Failed to queue IPFS upload (will retry manually): {e}") + return cid, ipfs_cid, None except Exception as e: return None, None, f"Upload failed: {e}" diff --git a/app/services/recipe_service.py b/app/services/recipe_service.py index f9be0c6..6b0a70d 100644 --- a/app/services/recipe_service.py +++ b/app/services/recipe_service.py @@ -60,16 +60,40 @@ class RecipeService: logger = logging.getLogger(__name__) if is_sexp_format(content): - # Parse S-expression - try: - compiled = compile_string(content) - recipe_data = compiled.to_dict() - recipe_data["sexp"] = content - recipe_data["format"] = "sexp" - logger.info(f"Parsed sexp recipe {recipe_id[:16]}..., keys: {list(recipe_data.keys())}") - except (ParseError, CompileError) as e: - logger.warning(f"Failed to parse sexp recipe {recipe_id[:16]}...: {e}") - return {"error": str(e), "recipe_id": recipe_id} + # Detect if this is a streaming recipe (starts with (stream ...)) + def is_streaming_recipe(text): + for line in text.split('\n'): + stripped = line.strip() + if not stripped or stripped.startswith(';'): + continue + return stripped.startswith('(stream') + return False + + if is_streaming_recipe(content): + # Streaming recipes have different format - parse manually + import re + name_match = re.search(r'\(stream\s+"([^"]+)"', content) + recipe_name = name_match.group(1) if name_match else "streaming" + + recipe_data = { + "name": recipe_name, + "sexp": content, + "format": "sexp", + "type": "streaming", + "dag": {"nodes": []}, # Streaming recipes don't have traditional DAG + } + logger.info(f"Parsed streaming recipe {recipe_id[:16]}..., name: {recipe_name}") + else: + # Parse traditional (recipe ...) S-expression + try: + compiled = compile_string(content) + recipe_data = compiled.to_dict() + recipe_data["sexp"] = content + recipe_data["format"] = "sexp" + logger.info(f"Parsed sexp recipe {recipe_id[:16]}..., keys: {list(recipe_data.keys())}") + except (ParseError, CompileError) as e: + logger.warning(f"Failed to parse sexp recipe {recipe_id[:16]}...: {e}") + return {"error": str(e), "recipe_id": recipe_id} else: # Parse YAML try: diff --git a/app/services/run_service.py b/app/services/run_service.py index d1aa838..4e2feb6 100644 --- a/app/services/run_service.py +++ b/app/services/run_service.py @@ -128,10 +128,25 @@ class RunService: # Only return as completed if we have an output # (runs with no output should be re-executed) if output_cid: + # Also fetch recipe content from pending_runs for streaming runs + recipe_sexp = None + recipe_name = None + pending = await self.db.get_pending_run(run_id) + if pending: + recipe_sexp = pending.get("dag_json") + + # Extract recipe name from streaming recipe content + if recipe_sexp: + import re + name_match = re.search(r'\(stream\s+"([^"]+)"', recipe_sexp) + if name_match: + recipe_name = name_match.group(1) + return { "run_id": run_id, "status": "completed", "recipe": cached.get("recipe"), + "recipe_name": recipe_name, "inputs": self._ensure_inputs_list(cached.get("inputs")), "output_cid": output_cid, "ipfs_cid": cached.get("ipfs_cid"), @@ -140,6 +155,7 @@ class RunService: "actor_id": cached.get("actor_id"), "created_at": cached.get("created_at"), "completed_at": cached.get("created_at"), + "recipe_sexp": recipe_sexp, } # Check database for pending run @@ -175,6 +191,7 @@ class RunService: "output_name": pending.get("output_name"), "created_at": pending.get("created_at"), "error": pending.get("error"), + "recipe_sexp": pending.get("dag_json"), # Recipe content for streaming runs } # If task completed, get result @@ -209,6 +226,7 @@ class RunService: "actor_id": pending.get("actor_id"), "created_at": pending.get("created_at"), "error": pending.get("error"), + "recipe_sexp": pending.get("dag_json"), # Recipe content for streaming runs } # Fallback: Check Redis for backwards compatibility @@ -714,12 +732,21 @@ class RunService: """Get execution plan for a run. Plans are just node outputs - cached by content hash like everything else. + For streaming runs, returns the recipe content as the plan. """ # Get run to find plan_cache_id run = await self.get_run(run_id) if not run: return None + # For streaming runs, return the recipe as the plan + if run.get("recipe") == "streaming" and run.get("recipe_sexp"): + return { + "steps": [{"id": "stream", "type": "STREAM", "name": "Streaming Recipe"}], + "sexp": run.get("recipe_sexp"), + "format": "sexp", + } + # Check plan_cid (stored in database) or plan_cache_id (legacy) plan_cid = run.get("plan_cid") or run.get("plan_cache_id") if plan_cid: diff --git a/app/templates/cache/detail.html b/app/templates/cache/detail.html index da68606..c4f7915 100644 --- a/app/templates/cache/detail.html +++ b/app/templates/cache/detail.html @@ -40,15 +40,23 @@ - {% if cache.friendly_name %} -
-
+
+
Friendly Name -

{{ cache.friendly_name }}

+
-

Use in recipes: {{ cache.base_name }}

+ {% if cache.friendly_name %} +

{{ cache.friendly_name }}

+

Use in recipes: {{ cache.base_name }}

+ {% else %} +

No friendly name assigned. Click Edit to add one.

+ {% endif %}
- {% endif %}
diff --git a/app/templates/cache/media_list.html b/app/templates/cache/media_list.html index 55a11c2..0a436aa 100644 --- a/app/templates/cache/media_list.html +++ b/app/templates/cache/media_list.html @@ -7,6 +7,10 @@

Media

+ +

Select one or more files to upload

+
+ +
+ + +

A friendly name to reference this media in recipes

+
+ + + + + +
+ + +
+ +
+
+ {% if items %}
{% for item in items %} @@ -115,5 +171,155 @@ function filterMedia() { } }); } + +// Show/hide name field based on file count +document.getElementById('upload-file').addEventListener('change', function(e) { + const nameField = document.getElementById('single-name-field'); + if (e.target.files.length > 1) { + nameField.style.display = 'none'; + } else { + nameField.style.display = 'block'; + } +}); + +// Handle upload form +document.getElementById('upload-form').addEventListener('submit', async function(e) { + e.preventDefault(); + + const form = e.target; + const fileInput = document.getElementById('upload-file'); + const files = fileInput.files; + const displayName = document.getElementById('upload-name').value; + const progressDiv = document.getElementById('upload-progress'); + const progressBar = document.getElementById('progress-bar'); + const progressText = document.getElementById('progress-text'); + const resultDiv = document.getElementById('upload-result'); + const uploadBtn = document.getElementById('upload-btn'); + + // Show progress + progressDiv.classList.remove('hidden'); + resultDiv.classList.add('hidden'); + uploadBtn.disabled = true; + + const results = []; + const errors = []; + + const CHUNK_SIZE = 1024 * 1024; // 1MB chunks + + for (let i = 0; i < files.length; i++) { + const file = files[i]; + const totalChunks = Math.ceil(file.size / CHUNK_SIZE); + const uploadId = crypto.randomUUID(); + const useChunked = file.size > CHUNK_SIZE * 2; // Use chunked for files > 2MB + + progressText.textContent = `Uploading ${i + 1} of ${files.length}: ${file.name}`; + + try { + let data; + + if (useChunked && totalChunks > 1) { + // Chunked upload for large files + for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex++) { + const start = chunkIndex * CHUNK_SIZE; + const end = Math.min(start + CHUNK_SIZE, file.size); + const chunk = file.slice(start, end); + + const chunkForm = new FormData(); + chunkForm.append('chunk', chunk); + chunkForm.append('upload_id', uploadId); + chunkForm.append('chunk_index', chunkIndex); + chunkForm.append('total_chunks', totalChunks); + chunkForm.append('filename', file.name); + if (files.length === 1 && displayName) { + chunkForm.append('display_name', displayName); + } + + const chunkProgress = ((i + (chunkIndex + 1) / totalChunks) / files.length) * 100; + progressBar.style.width = `${chunkProgress}%`; + progressText.textContent = `Uploading ${i + 1} of ${files.length}: ${file.name} (${chunkIndex + 1}/${totalChunks} chunks)`; + + const response = await fetch('/media/upload/chunk', { + method: 'POST', + body: chunkForm, + }); + + const contentType = response.headers.get('content-type') || ''; + if (!contentType.includes('application/json')) { + const text = await response.text(); + throw new Error(`Server error (${response.status}): ${text.substring(0, 100)}`); + } + + data = await response.json(); + if (!response.ok) { + throw new Error(data.detail || 'Chunk upload failed'); + } + } + } else { + // Regular upload for small files + const formData = new FormData(); + formData.append('file', file); + if (files.length === 1 && displayName) { + formData.append('display_name', displayName); + } + + progressBar.style.width = `${((i + 0.5) / files.length) * 100}%`; + + const response = await fetch('/media/upload', { + method: 'POST', + body: formData, + }); + + const contentType = response.headers.get('content-type') || ''; + if (!contentType.includes('application/json')) { + const text = await response.text(); + throw new Error(`Server error (${response.status}): ${text.substring(0, 100)}`); + } + + data = await response.json(); + if (!response.ok) { + throw new Error(data.detail || 'Upload failed'); + } + } + + results.push({ filename: file.name, friendly_name: data.friendly_name, cid: data.cid }); + } catch (err) { + errors.push({ filename: file.name, error: err.message }); + } + + progressBar.style.width = `${((i + 1) / files.length) * 100}%`; + } + + progressText.textContent = 'Upload complete!'; + + // Show results + let html = ''; + if (results.length > 0) { + html += '
'; + html += `

${results.length} file(s) uploaded successfully!

`; + for (const r of results) { + html += `

${r.filename} → ${r.friendly_name}

`; + } + html += '
'; + } + if (errors.length > 0) { + html += '
'; + html += `

${errors.length} file(s) failed:

`; + for (const e of errors) { + html += `

${e.filename}: ${e.error}

`; + } + html += '
'; + } + + resultDiv.innerHTML = html; + resultDiv.classList.remove('hidden'); + + if (results.length > 0) { + // Reload page after 2 seconds + setTimeout(() => location.reload(), 2000); + } else { + uploadBtn.disabled = false; + uploadBtn.textContent = 'Upload'; + } +}); {% endblock %} diff --git a/app/templates/effects/detail.html b/app/templates/effects/detail.html index a7d9403..c94d914 100644 --- a/app/templates/effects/detail.html +++ b/app/templates/effects/detail.html @@ -8,7 +8,8 @@ {{ super() }} - + + {% endblock %} {% block content %} @@ -93,35 +94,23 @@
{% endif %} - - {% if meta.dependencies %} -
-
- Dependencies -
-
-
- {% for dep in meta.dependencies %} - {{ dep }} - {% endfor %} -
- {% if meta.requires_python %} -

Python {{ meta.requires_python }}

- {% endif %} -
-
- {% endif %} -
Usage in Recipe
-
(effect {{ meta.name or 'effect' }} :cid "{{ effect.cid }}")
+ {% if effect.base_name %} +
({{ effect.base_name }} ...)

- Reference this effect in your recipe S-expression. + Use the friendly name to reference this effect.

+ {% else %} +
(effect :cid "{{ effect.cid }}")
+

+ Reference this effect by CID in your recipe. +

+ {% endif %}
@@ -130,17 +119,17 @@
- Source Code + Source Code (S-expression)
-
Loading...
+
Loading...
diff --git a/app/templates/effects/list.html b/app/templates/effects/list.html index 74b7696..065d2bb 100644 --- a/app/templates/effects/list.html +++ b/app/templates/effects/list.html @@ -6,15 +6,59 @@

Effects

-
+ + +

- Effects are Python scripts that process video frames or whole videos. - Each effect is stored in IPFS and can be referenced by CID in recipes. + Effects are S-expression files that define video processing operations. + Each effect is stored in IPFS and can be referenced by name in recipes.

{% if effects %} @@ -49,17 +93,6 @@
{% endif %} - {% if meta.dependencies %} -
- {% for dep in meta.dependencies[:3] %} - {{ dep }} - {% endfor %} - {% if meta.dependencies | length > 3 %} - +{{ meta.dependencies | length - 3 }} more - {% endif %} -
- {% endif %} -
{% if effect.friendly_name %} {{ effect.friendly_name }} @@ -83,67 +116,85 @@ {% else %}
+ + +

No effects uploaded yet.

- Effects are Python files with @effect metadata in a docstring. + Effects are S-expression files with metadata in comment headers.

-
{% endif %}
-
- {% endblock %} diff --git a/app/templates/recipes/detail.html b/app/templates/recipes/detail.html index be6bc95..b96051d 100644 --- a/app/templates/recipes/detail.html +++ b/app/templates/recipes/detail.html @@ -50,6 +50,17 @@
+ {% if recipe.type == 'streaming' %} + +
+
+ Streaming Recipe +
+

+ This recipe uses frame-by-frame streaming rendering. The pipeline is defined as an S-expression that generates frames dynamically. +

+
+ {% else %}
@@ -99,11 +110,16 @@
{% endfor %}
+ {% endif %} - -

Source

+ +

Recipe (S-expression)

-
{{ recipe.yaml }}
+ {% if recipe.sexp %} +
{{ recipe.sexp }}
+ {% else %} +

No source available

+ {% endif %}
diff --git a/app/templates/runs/detail.html b/app/templates/runs/detail.html index 916fc6f..989319b 100644 --- a/app/templates/runs/detail.html +++ b/app/templates/runs/detail.html @@ -24,6 +24,9 @@ {% if run.cached %} Cached {% endif %} + {% if run.error %} + {{ run.error }} + {% endif %}