From d603485d407644010265dc29747deba8955233a6 Mon Sep 17 00:00:00 2001 From: gilesb Date: Tue, 13 Jan 2026 00:27:24 +0000 Subject: [PATCH] Refactor to S-expression based execution with code-addressed cache IDs Major changes: - Add execute_recipe task that uses S-expression planner - Recipe S-expression unfolds into plan S-expression with code-addressed cache IDs - Cache IDs computed from Merkle tree of plan structure (before execution) - Add ipfs_client.add_string() for storing S-expression plans - Update run_service.create_run() to use execute_recipe when recipe_sexp available - Add _sexp_to_steps() to parse S-expression plans for UI visualization - Plan endpoint now returns both sexp content and parsed steps The code-addressed hashing means each plan step's cache_id is: sha3_256({node_type, config, sorted(input_cache_ids)}) This creates deterministic "buckets" for computation results computed entirely from the plan structure, enabling automatic cache reuse. Co-Authored-By: Claude Opus 4.5 --- app/routers/recipes.py | 1 + app/services/run_service.py | 135 ++++++++++++- ipfs_client.py | 14 ++ legacy_tasks.py | 383 ++++++++++++++++++++++++++++++++++++ 4 files changed, 529 insertions(+), 4 deletions(-) diff --git a/app/routers/recipes.py b/app/routers/recipes.py index 200f0c4..1a55397 100644 --- a/app/routers/recipes.py +++ b/app/routers/recipes.py @@ -569,6 +569,7 @@ async def run_recipe( actor_id=ctx.actor_id, l2_server=ctx.l2_server, recipe_name=recipe.get("name"), # Store name for display + recipe_sexp=recipe.get("sexp"), # S-expression for code-addressed execution ) if error: diff --git a/app/services/run_service.py b/app/services/run_service.py index 54dd168..d92acd4 100644 --- a/app/services/run_service.py +++ b/app/services/run_service.py @@ -312,15 +312,19 @@ class RunService: actor_id: Optional[str] = None, l2_server: Optional[str] = None, recipe_name: Optional[str] = None, + recipe_sexp: Optional[str] = None, ) -> Tuple[Optional[RunResult], Optional[str]]: """ Create a new rendering run. Checks cache before executing. + If recipe_sexp is provided, uses the new S-expression execution path + which generates code-addressed cache IDs before execution. + Returns (run_dict, error_message). """ import httpx try: - from legacy_tasks import render_effect, execute_dag, build_effect_dag + from legacy_tasks import render_effect, execute_dag, build_effect_dag, execute_recipe except ImportError as e: return None, f"Celery tasks not available: {e}" @@ -401,7 +405,17 @@ class RunService: # Not cached - submit to Celery try: - if use_dag or recipe == "dag": + # Prefer S-expression execution path (code-addressed cache IDs) + if recipe_sexp: + # Convert inputs to dict if needed + if isinstance(inputs, dict): + input_hashes = inputs + else: + # Legacy list format - use positional names + input_hashes = {f"input_{i}": cid for i, cid in enumerate(input_list)} + + task = execute_recipe.delay(recipe_sexp, input_hashes, run_id) + elif use_dag or recipe == "dag": if dag_json: dag_data = dag_json else: @@ -562,6 +576,118 @@ class RunService: "format": "json", } + def _sexp_to_steps(self, sexp_content: str) -> Dict[str, Any]: + """Convert S-expression plan to steps list format for UI. + + Parses the S-expression plan format: + (plan :id :recipe :recipe-hash + (inputs (input_name hash) ...) + (step step_id :cache-id :level (node-type :key val ...)) + ... + :output ) + + Returns steps list compatible with UI visualization. + """ + try: + from artdag.sexp import parse, Symbol, Keyword + except ImportError: + return {"sexp": sexp_content, "steps": [], "format": "sexp"} + + try: + parsed = parse(sexp_content) + except Exception: + return {"sexp": sexp_content, "steps": [], "format": "sexp"} + + if not isinstance(parsed, list) or not parsed: + return {"sexp": sexp_content, "steps": [], "format": "sexp"} + + steps = [] + output_step_id = None + plan_id = None + recipe_name = None + + # Parse plan structure + i = 0 + while i < len(parsed): + item = parsed[i] + + if isinstance(item, Keyword): + key = item.name + if i + 1 < len(parsed): + value = parsed[i + 1] + if key == "id": + plan_id = value + elif key == "recipe": + recipe_name = value + elif key == "output": + output_step_id = value + i += 2 + continue + + if isinstance(item, list) and item: + first = item[0] + if isinstance(first, Symbol) and first.name == "step": + # Parse step: (step step_id :cache-id :level (node-expr)) + step_id = item[1] if len(item) > 1 else None + cache_id = None + level = 0 + node_type = "EFFECT" + config = {} + inputs = [] + + j = 2 + while j < len(item): + part = item[j] + if isinstance(part, Keyword): + key = part.name + if j + 1 < len(item): + val = item[j + 1] + if key == "cache-id": + cache_id = val + elif key == "level": + level = val + j += 2 + continue + elif isinstance(part, list) and part: + # Node expression: (node-type :key val ...) + if isinstance(part[0], Symbol): + node_type = part[0].name.upper() + k = 1 + while k < len(part): + if isinstance(part[k], Keyword): + kname = part[k].name + if k + 1 < len(part): + kval = part[k + 1] + if kname == "inputs": + inputs = kval if isinstance(kval, list) else [kval] + else: + config[kname] = kval + k += 2 + continue + k += 1 + j += 1 + + steps.append({ + "id": step_id, + "step_id": step_id, + "type": node_type, + "config": config, + "inputs": inputs, + "cache_id": cache_id or step_id, + "level": level, + }) + + i += 1 + + return { + "sexp": sexp_content, + "steps": steps, + "output_id": output_step_id, + "plan_id": plan_id, + "recipe": recipe_name, + "format": "sexp", + } + async def get_run_plan(self, run_id: str) -> Optional[Dict[str, Any]]: """Get execution plan for a run. @@ -582,7 +708,8 @@ class RunService: content = f.read() # Detect format if content.strip().startswith("("): - return {"sexp": content, "format": "sexp"} + # S-expression format - parse for UI + return self._sexp_to_steps(content) else: plan = json.loads(content) return self._dag_to_steps(plan) @@ -591,7 +718,7 @@ class RunService: sexp_path = self.cache_dir / "plans" / f"{run_id}.sexp" if sexp_path.exists(): with open(sexp_path) as f: - return {"sexp": f.read(), "format": "sexp"} + return self._sexp_to_steps(f.read()) json_path = self.cache_dir / "plans" / f"{run_id}.json" if json_path.exists(): diff --git a/ipfs_client.py b/ipfs_client.py index 905e15e..5079c64 100644 --- a/ipfs_client.py +++ b/ipfs_client.py @@ -118,6 +118,20 @@ def add_json(data: dict, pin: bool = True) -> Optional[str]: return add_bytes(json_bytes, pin=pin) +def add_string(content: str, pin: bool = True) -> Optional[str]: + """ + Add a string to IPFS and optionally pin it. + + Args: + content: String content to add (e.g., S-expression) + pin: Whether to pin the data (default: True) + + Returns: + IPFS CID or None on failure + """ + return add_bytes(content.encode('utf-8'), pin=pin) + + def get_file(cid: str, dest_path: Path) -> bool: """ Retrieve a file from IPFS and save to destination. diff --git a/legacy_tasks.py b/legacy_tasks.py index 15006e7..bb0448f 100644 --- a/legacy_tasks.py +++ b/legacy_tasks.py @@ -548,6 +548,389 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict: } +@app.task(base=RenderTask, bind=True) +def execute_recipe(self, recipe_sexp: str, input_hashes: Dict[str, str], run_id: str = None) -> dict: + """ + Execute an S-expression recipe. + + The recipe S-expression unfolds into a plan S-expression with code-addressed + cache IDs computed before execution. Each plan node gets a deterministic hash + "bucket" based on the computation definition (Merkle tree), not the results. + + Phases: + 1. Parse: compile_string(recipe_sexp) -> CompiledRecipe + 2. Analyze: Extract and run analysis nodes from recipe + 3. Plan: create_plan(compiled, inputs) -> ExecutionPlanSexp with cache IDs + 4. Store: plan.to_string() -> store as S-expression + 5. Execute: Run steps level-by-level, checking cache by cache_id + 6. Return: Include plan_sexp in result + + Args: + recipe_sexp: Recipe as S-expression string + input_hashes: Mapping from input name to content hash (CID) + run_id: Optional run ID for tracking + + Returns: + Execution result with output CID, plan S-expression, and node results + """ + from cache_manager import get_cache_manager + import ipfs_client + + # Try to import S-expression modules + try: + from artdag.sexp import compile_string, CompileError, ParseError + from artdag.sexp.planner import create_plan, ExecutionPlanSexp, PlanStep + except ImportError as e: + raise ImportError(f"S-expression modules not available: {e}") + + cache_manager = get_cache_manager() + + logger.info(f"Executing recipe with {len(input_hashes)} inputs, run_id={run_id}") + + # ============ Phase 1: Parse ============ + self.update_state(state='PARSING', meta={'status': 'parsing recipe'}) + logger.info("Phase 1: Parsing recipe S-expression...") + + try: + compiled = compile_string(recipe_sexp) + except (ParseError, CompileError) as e: + raise ValueError(f"Recipe parse error: {e}") + + recipe_name = compiled.name or "unnamed" + logger.info(f"Parsed recipe: {recipe_name}") + + # ============ Phase 2: Analysis ============ + self.update_state(state='ANALYZING', meta={'status': 'running analysis'}) + logger.info("Phase 2: Running analysis nodes...") + + analysis_results = {} + # Extract analysis nodes from compiled recipe + for node in compiled.nodes: + node_type = node.get("type", "").upper() + config = node.get("config", {}) + + if node_type == "ANALYZE" or config.get("analyze"): + node_id = node.get("id") + input_ref = config.get("input") or config.get("source") + feature = config.get("feature") or config.get("analyze") + + # Resolve input reference to CID + cid = input_hashes.get(input_ref) + if not cid: + logger.warning(f"Analysis node {node_id}: input '{input_ref}' not in input_hashes") + continue + + # Get input file path + input_path = cache_manager.get_by_cid(cid) + if not input_path: + logger.warning(f"Analysis node {node_id}: content {cid[:16]}... not in cache") + continue + + # Run analysis + try: + from artdag.analysis import Analyzer + analysis_dir = CACHE_DIR / "analysis" + analysis_dir.mkdir(parents=True, exist_ok=True) + analyzer = Analyzer(cache_dir=analysis_dir) + + features = [feature] if feature else ["beats", "energy"] + result = analyzer.analyze( + input_hash=cid, + features=features, + input_path=Path(input_path), + ) + analysis_results[node_id] = result + analysis_results[cid] = result + logger.info(f"Analysis {node_id}: feature={feature}") + except Exception as e: + logger.warning(f"Analysis failed for {node_id}: {e}") + + logger.info(f"Completed {len(analysis_results)} analysis results") + + # ============ Phase 3: Generate Plan ============ + self.update_state(state='PLANNING', meta={'status': 'generating plan'}) + logger.info("Phase 3: Generating execution plan with code-addressed cache IDs...") + + plan = create_plan(compiled, inputs=input_hashes) + logger.info(f"Generated plan with {len(plan.steps)} steps, plan_id={plan.plan_id[:16]}...") + + # ============ Phase 4: Store Plan as S-expression ============ + plan_sexp = plan.to_string(pretty=True) + plan_cid = None + + try: + plan_cid = ipfs_client.add_string(plan_sexp) + if plan_cid: + logger.info(f"Stored plan to IPFS: {plan_cid}") + # Also store locally for fast retrieval + plan_path = CACHE_DIR / plan_cid + CACHE_DIR.mkdir(parents=True, exist_ok=True) + plan_path.write_text(plan_sexp) + except Exception as e: + logger.warning(f"Failed to store plan to IPFS: {e}") + + # ============ Phase 5: Execute Steps Level-by-Level ============ + self.update_state(state='EXECUTING', meta={'status': 'executing steps', 'total_steps': len(plan.steps)}) + logger.info("Phase 4: Executing plan steps...") + + # Group steps by level + steps_by_level: Dict[int, List[PlanStep]] = {} + for step in plan.steps: + level = step.level + steps_by_level.setdefault(level, []).append(step) + + max_level = max(steps_by_level.keys()) if steps_by_level else 0 + + step_results = {} # step_id -> {"status", "path", "cid", "ipfs_cid"} + cache_id_to_path = {} # cache_id -> output path (for resolving inputs) + total_cached = 0 + total_executed = 0 + + # Map input names to their cache_ids (inputs are their own cache_ids) + for name, cid in input_hashes.items(): + cache_id_to_path[cid] = cache_manager.get_by_cid(cid) + + for level in range(max_level + 1): + level_steps = steps_by_level.get(level, []) + if not level_steps: + continue + + logger.info(f"Executing level {level}: {len(level_steps)} steps") + + for step in level_steps: + self.update_state( + state='EXECUTING', + meta={ + 'step_id': step.step_id, + 'step_type': step.node_type, + 'level': level, + 'cache_id': step.cache_id[:16], + } + ) + + # Check if cached using code-addressed cache_id + cached_path = cache_manager.get_by_cid(step.cache_id) + if cached_path and cached_path.exists(): + logger.info(f"Step {step.step_id}: cached at {step.cache_id[:16]}...") + step_results[step.step_id] = { + "status": "cached", + "path": str(cached_path), + "cache_id": step.cache_id, + } + cache_id_to_path[step.cache_id] = cached_path + total_cached += 1 + continue + + # Execute the step + try: + # Resolve input paths from previous step cache_ids + input_paths = [] + for input_ref in step.inputs: + # input_ref is a step_id - find its cache_id and path + input_step = next((s for s in plan.steps if s.step_id == input_ref), None) + if input_step: + input_cache_id = input_step.cache_id + input_path = cache_id_to_path.get(input_cache_id) + if input_path: + input_paths.append(Path(input_path)) + else: + # Check if it's a source input + source_cid = step.config.get("cid") + if source_cid: + input_path = cache_manager.get_by_cid(source_cid) + if input_path: + input_paths.append(Path(input_path)) + else: + # Direct CID reference (source node) + source_cid = input_hashes.get(input_ref) or step.config.get("cid") + if source_cid: + input_path = cache_manager.get_by_cid(source_cid) + if input_path: + input_paths.append(Path(input_path)) + + # Handle SOURCE nodes + if step.node_type == "SOURCE": + source_cid = step.config.get("cid") + if source_cid: + source_path = cache_manager.get_by_cid(source_cid) + if source_path: + step_results[step.step_id] = { + "status": "source", + "path": str(source_path), + "cache_id": step.cache_id, + "cid": source_cid, + } + cache_id_to_path[step.cache_id] = source_path + total_cached += 1 + continue + else: + raise ValueError(f"Source content not found: {source_cid}") + + # Get executor for this step type + executor = get_executor(step.node_type) + if not executor: + # Try effect executor + effect_name = step.config.get("effect") + if effect_name: + executor = get_executor(f"effect:{effect_name}") + + if not executor: + raise ValueError(f"No executor for node type: {step.node_type}") + + # Determine output path + output_dir = CACHE_DIR / "nodes" / step.cache_id + output_dir.mkdir(parents=True, exist_ok=True) + output_path = output_dir / "output.mkv" + + # Execute + logger.info(f"Executing step {step.step_id} ({step.node_type}) with {len(input_paths)} inputs") + result_path = executor.execute(step.config, input_paths, output_path) + + # Store result in cache under code-addressed cache_id + cached, content_cid = cache_manager.put( + result_path, + node_type=step.node_type, + node_id=step.cache_id, # Use cache_id as node_id + ) + + step_results[step.step_id] = { + "status": "executed", + "path": str(result_path), + "cache_id": step.cache_id, + "cid": content_cid, + "ipfs_cid": content_cid if content_cid.startswith("Qm") or content_cid.startswith("bafy") else None, + } + cache_id_to_path[step.cache_id] = result_path + total_executed += 1 + + logger.info(f"Step {step.step_id}: executed -> {content_cid[:16]}...") + + except Exception as e: + logger.error(f"Step {step.step_id} failed: {e}") + return { + "success": False, + "run_id": run_id, + "error": f"Step {step.step_id} failed: {e}", + "step_results": step_results, + "plan_cid": plan_cid, + "plan_sexp": plan_sexp, + } + + # Get output from final step + output_step = next((s for s in plan.steps if s.step_id == plan.output_step_id), None) + output_cid = None + output_ipfs_cid = None + output_path = None + + if output_step: + output_result = step_results.get(output_step.step_id, {}) + output_cid = output_result.get("cid") or output_result.get("cache_id") + output_ipfs_cid = output_result.get("ipfs_cid") + output_path = output_result.get("path") + + # ============ Phase 6: Store Results ============ + logger.info("Phase 5: Storing results...") + + # Store in database + import asyncio + import database + + async def save_to_db(): + if database.pool is None: + await database.init_db() + + # Get actor_id from pending run + actor_id = None + pending = await database.get_pending_run(run_id) if run_id else None + if pending: + actor_id = pending.get("actor_id") + + await database.save_run_cache( + run_id=run_id, + output_cid=output_cid, + recipe=recipe_name, + inputs=list(input_hashes.values()), + ipfs_cid=output_ipfs_cid, + actor_id=actor_id, + plan_cid=plan_cid, + ) + + # Save output as media for user + if actor_id and output_cid: + await database.save_item_metadata( + cid=output_cid, + actor_id=actor_id, + item_type="media", + description=f"Output from recipe: {recipe_name}", + source_type="recipe", + source_note=f"run_id: {run_id}", + ) + + # Complete pending run + if pending and run_id: + await database.complete_pending_run(run_id) + + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + asyncio.ensure_future(save_to_db()) + else: + loop.run_until_complete(save_to_db()) + except RuntimeError: + asyncio.run(save_to_db()) + + # Build and store provenance + provenance = { + "task_id": self.request.id, + "run_id": run_id, + "rendered_at": datetime.now(timezone.utc).isoformat(), + "recipe": recipe_name, + "recipe_sexp": recipe_sexp, + "plan_sexp": plan_sexp, + "plan_cid": plan_cid, + "output": { + "cid": output_cid, + "ipfs_cid": output_ipfs_cid, + }, + "inputs": input_hashes, + "steps": { + step_id: { + "cache_id": result.get("cache_id"), + "cid": result.get("cid"), + "status": result.get("status"), + } + for step_id, result in step_results.items() + }, + "execution": { + "total_steps": len(plan.steps), + "cached": total_cached, + "executed": total_executed, + } + } + + provenance_cid = ipfs_client.add_json(provenance) + if provenance_cid: + logger.info(f"Stored provenance on IPFS: {provenance_cid}") + + logger.info(f"Recipe execution complete: output={output_cid[:16] if output_cid else 'none'}...") + + return { + "success": True, + "run_id": run_id, + "recipe": recipe_name, + "plan_cid": plan_cid, + "plan_sexp": plan_sexp, + "output_cid": output_cid, + "output_ipfs_cid": output_ipfs_cid, + "output_path": output_path, + "total_steps": len(plan.steps), + "cached": total_cached, + "executed": total_executed, + "step_results": step_results, + "provenance_cid": provenance_cid, + } + + def build_effect_dag(input_hashes: List[str], effect_name: str) -> DAG: """ Build a simple DAG for applying an effect to inputs.