#!/usr/bin/env python3
"""
Run a staged recipe through analyze -> plan -> execute pipeline.

This script demonstrates stage-level caching: analysis stages can be
skipped on re-run if the inputs haven't changed.

Usage:
    python3 run_staged.py recipe.sexp [-o output.mp4]
    python3 run_staged.py effects/ascii_art_staged.sexp -o ascii_out.mp4

The script:
1. Compiles the recipe and extracts stage information
2. For each stage in topological order:
   - Check stage cache (skip if hit)
   - Run stage (analyze, plan, execute)
   - Cache stage outputs
3. Produce final output
"""

import os
import sys
import json
import tempfile
import shutil
import subprocess
from pathlib import Path
from typing import Dict, List, Optional, Any

# Add artdag to path
sys.path.insert(0, str(Path(__file__).parent.parent / "artdag"))

from artdag.sexp import compile_string, parse
from artdag.sexp.parser import Symbol, Keyword, serialize
from artdag.sexp.planner import create_plan

# Import unified cache
import cache as unified_cache

import hashlib


def _cache_analysis_tracks(plan):
    """Cache each analysis track individually, replace data with cache-id refs.

    Mutates ``plan.analysis`` in place: each track's data is stored in the
    unified cache under the sha256 of its canonical (sort_keys) JSON form,
    and the entry is replaced by ``{"_cache_id": <hex digest>}``.
    """
    # Local alias avoids any ambiguity with names in this function's scope;
    # the module-level `json` import would work equally well.
    import json as _json
    for name, data in plan.analysis.items():
        # Canonical serialization so identical data always hashes the same.
        json_str = _json.dumps(data, sort_keys=True)
        content_cid = hashlib.sha256(json_str.encode()).hexdigest()
        unified_cache.cache_store_json(content_cid, data)
        plan.analysis[name] = {"_cache_id": content_cid}


def _resolve_analysis_refs(analysis_dict):
    """Resolve cache-id refs back to full analysis data.

    Inverse of :func:`_cache_analysis_tracks`: entries of the form
    ``{"_cache_id": cid}`` are replaced with the JSON loaded from the
    unified cache; non-ref entries are passed through unchanged.

    NOTE(review): a ref whose cached JSON cannot be loaded (cache_get_json
    returns a falsy value) is silently dropped from the result — confirm
    that is the intended behavior rather than keeping the ref or raising.
    """
    resolved = {}
    for name, data in analysis_dict.items():
        if isinstance(data, dict) and "_cache_id" in data:
            loaded = unified_cache.cache_get_json(data["_cache_id"])
            if loaded:
                resolved[name] = loaded
        else:
            resolved[name] = data
    return resolved


def run_staged_recipe(
    recipe_path: Path,
    output_path: Optional[Path] = None,
    cache_dir: Optional[Path] = None,
    params: Optional[Dict[str, Any]] = None,
    verbose: bool = True,
    force_replan: bool = False,
) -> Path:
    """
    Run a staged recipe with stage-level caching.

    Args:
        recipe_path: Path to the .sexp recipe file
        output_path: Optional output file path
        cache_dir: Optional cache directory for stage results
            (NOTE(review): currently unused — the unified cache's content
            directory is used instead; confirm whether this parameter
            should override it)
        params: Optional parameter overrides
        verbose: Print progress information
        force_replan: Ignore any cached plan and re-plan from scratch

    Returns:
        Path to the final output file
    """
    recipe_text = recipe_path.read_text()
    recipe_dir = recipe_path.parent

    # Use unified cache
    content_cache_dir = unified_cache.get_content_dir()

    def log(msg: str):
        # All progress goes to stderr so stdout stays clean for the
        # final output path printed by main().
        if verbose:
            print(msg, file=sys.stderr)

    # Store recipe source by CID (content id) — the CID also keys the plan cache.
    recipe_cid, _ = unified_cache.content_store_string(recipe_text)
    log(f"Recipe CID: {recipe_cid[:16]}...")

    # Compile recipe
    log(f"Compiling: {recipe_path}")
    compiled = compile_string(recipe_text, params, recipe_dir=recipe_dir)
    log(f"Recipe: {compiled.name} v{compiled.version}")
    log(f"Nodes: {len(compiled.nodes)}")

    # Store effects by CID; warn when the stored content hash disagrees with
    # the CID the compiler recorded.
    for effect_name, effect_info in compiled.registry.get("effects", {}).items():
        effect_path = effect_info.get("path")
        effect_cid = effect_info.get("cid")
        if effect_path and effect_cid:
            effect_file = Path(effect_path)
            if effect_file.exists():
                stored_cid, _ = unified_cache.content_store_file(effect_file)
                if stored_cid == effect_cid:
                    log(f"Effect '{effect_name}' CID: {effect_cid[:16]}...")
                else:
                    log(f"Warning: Effect '{effect_name}' CID mismatch")

    # Store analyzers by CID. Unlike effects, analyzer paths may be relative
    # to the recipe directory, and no CID mismatch check is performed here.
    for analyzer_name, analyzer_info in compiled.registry.get("analyzers", {}).items():
        analyzer_path = analyzer_info.get("path")
        analyzer_cid = analyzer_info.get("cid")
        if analyzer_path:
            analyzer_file = Path(analyzer_path) if Path(analyzer_path).is_absolute() else recipe_dir / analyzer_path
            if analyzer_file.exists():
                stored_cid, _ = unified_cache.content_store_file(analyzer_file)
                log(f"Analyzer '{analyzer_name}' CID: {stored_cid[:16]}...")

    # Store included files by CID (same mismatch warning as effects).
    for include_path, include_cid in compiled.registry.get("includes", {}).items():
        include_file = Path(include_path)
        if include_file.exists():
            stored_cid, _ = unified_cache.content_store_file(include_file)
            if stored_cid == include_cid:
                log(f"Include '{include_file.name}' CID: {include_cid[:16]}...")
            else:
                log(f"Warning: Include '{include_file.name}' CID mismatch")

    # Check for stages
    if not compiled.stages:
        log("No stages found - running as regular recipe")
        return _run_non_staged(compiled, recipe_dir, output_path, verbose)

    log(f"\nStages: {len(compiled.stages)}")
    log(f"Stage order: {compiled.stage_order}")

    # Display stage info
    for stage in compiled.stages:
        log(f"\n Stage: {stage.name}")
        log(f" Requires: {stage.requires or '(none)'}")
        log(f" Inputs: {stage.inputs or '(none)'}")
        log(f" Outputs: {stage.outputs}")

    # Create plan with analysis
    log("\n--- Planning ---")
    analysis_data = {}

    def on_analysis(node_id: str, results: dict):
        # Planner callback: collect per-node analysis results as they finish.
        analysis_data[node_id] = results
        times = results.get("times", [])
        log(f" Analysis complete: {node_id[:16]}... ({len(times)} times)")

    # Check for cached plan using unified cache (keyed by recipe CID + params)
    plan_cid = unified_cache.plan_exists(recipe_cid, params)
    if plan_cid and not force_replan:
        # --- Fast path: reuse a previously stored plan -------------------
        plan_cache_path = unified_cache.plan_get_path(recipe_cid, params)
        log(f"\nFound cached plan: {plan_cid[:16]}...")
        plan_sexp_str = unified_cache.plan_load(recipe_cid, params)

        # Parse the cached plan
        from execute import parse_plan_input
        plan_dict = parse_plan_input(plan_sexp_str)

        # Resolve cache-id refs in plan's embedded analysis
        if "analysis" in plan_dict:
            plan_dict["analysis"] = _resolve_analysis_refs(plan_dict["analysis"])

        # Load analysis data from unified cache, keyed by ANALYZE step ids.
        # NOTE(review): execution below reads the plan from plan_cache_path,
        # not from this resolved plan_dict — plan_dict appears to be used
        # only for logging and for discovering ANALYZE steps; confirm
        # execute_plan resolves _cache_id refs on its own.
        analysis_data = {}
        for step in plan_dict.get("steps", []):
            if step.get("node_type") == "ANALYZE":
                step_id = step.get("step_id")
                cached_analysis = unified_cache.cache_get_json(step_id)
                if cached_analysis:
                    analysis_data[step_id] = cached_analysis
                    log(f" Loaded analysis: {step_id[:16]}...")

        log(f"Plan ID: {plan_dict.get('plan_id', 'unknown')[:16]}...")
        log(f"Steps: {len(plan_dict.get('steps', []))}")
        log(f"Analysis tracks: {list(analysis_data.keys())}")

        # Execute directly from cached plan
        log("\n--- Execution (from cached plan) ---")
        from execute import execute_plan
        result_path = execute_plan(
            plan_path=plan_cache_path,
            output_path=output_path,
            recipe_dir=recipe_dir,
            external_analysis=analysis_data,
            cache_dir=content_cache_dir,
        )
        log(f"\n--- Complete ---")
        log(f"Output: {result_path}")
        return result_path

    # --- Slow path: no cached plan (or force_replan) - create new one ----
    plan = create_plan(
        compiled,
        inputs={},
        recipe_dir=recipe_dir,
        on_analysis=on_analysis,
    )
    log(f"\nPlan ID: {plan.plan_id[:16]}...")
    log(f"Steps: {len(plan.steps)}")
    log(f"Analysis tracks: {list(analysis_data.keys())}")

    # Cache analysis tracks individually and replace with cache-id refs
    # (mutates plan.analysis; keeps the serialized plan small).
    _cache_analysis_tracks(plan)

    # Save plan to unified cache
    plan_sexp_str = plan.to_string(pretty=True)
    plan_cache_id, plan_cid, plan_cache_path = unified_cache.plan_store(recipe_cid, params, plan_sexp_str)
    log(f"Saved plan: {plan_cache_id[:16]}... → {plan_cid[:16]}...")

    # Execute the plan using execute.py logic
    log("\n--- Execution ---")
    from execute import execute_plan

    # Resolve cache-id refs back to full data for execution; analysis_data
    # (fresh results from on_analysis) takes precedence over resolved refs.
    resolved_analysis = _resolve_analysis_refs(plan.analysis)
    plan_dict = {
        "plan_id": plan.plan_id,
        "source_hash": plan.source_hash,
        "encoding": compiled.encoding,
        "output_step_id": plan.output_step_id,
        "analysis": {**resolved_analysis, **analysis_data},
        "effects_registry": plan.effects_registry,
        "minimal_primitives": plan.minimal_primitives,
        "steps": [],
    }
    for step in plan.steps:
        step_dict = {
            "step_id": step.step_id,
            "node_type": step.node_type,
            "config": step.config,
            "inputs": step.inputs,
            "level": step.level,
            "cache_id": step.cache_id,
        }
        # Tag with stage info if present
        if step.stage:
            step_dict["stage"] = step.stage
        plan_dict["steps"].append(step_dict)

    # Execute using unified cache (plan passed in-memory, not via file)
    result_path = execute_plan(
        plan_path=None,
        output_path=output_path,
        recipe_dir=recipe_dir,
        plan_data=plan_dict,
        external_analysis=analysis_data,
        cache_dir=content_cache_dir,
    )
    log(f"\n--- Complete ---")
    log(f"Output: {result_path}")
    return result_path


def _run_non_staged(compiled, recipe_dir: Path, output_path: Optional[Path], verbose: bool) -> Path:
    """Run a non-staged recipe using the standard pipeline.

    Raises:
        NotImplementedError: always — non-staged recipes are expected to go
        through the plan.py | execute.py pipeline instead.
    """
    from execute import execute_plan
    from plan import plan_recipe
    # This is a fallback for recipes without stages
    # Just run through regular plan -> execute
    raise NotImplementedError("Non-staged recipes should use plan.py | execute.py")
def list_cache(verbose: bool = False):
    """List all cached items using the unified cache."""
    unified_cache.print_cache_listing(verbose)


def list_params(recipe_path: Path):
    """List available parameters for a recipe and its effects.

    Parses the recipe S-expression for a top-level ``:params`` block and for
    ``(effect name :path "...")`` declarations, loads parameter definitions
    from any declared .sexp effect files, and prints a formatted table of
    every parameter plus a usage example to stdout.

    Args:
        recipe_path: Path to the .sexp recipe file.
    """
    from artdag.sexp import parse
    from artdag.sexp.parser import Symbol, Keyword
    from artdag.sexp.compiler import _parse_params
    from artdag.sexp.effect_loader import load_sexp_effect_file

    recipe_text = recipe_path.read_text()
    sexp = parse(recipe_text)
    # parse() may return a one-element list wrapping the recipe form.
    if isinstance(sexp, list) and len(sexp) == 1:
        sexp = sexp[0]

    # Find recipe name (second element, when it is a plain string).
    recipe_name = sexp[1] if len(sexp) > 1 and isinstance(sexp[1], str) else recipe_path.stem

    # Find :params block and effect declarations
    recipe_params = []
    effect_declarations = {}  # name -> path
    i = 2
    while i < len(sexp):
        item = sexp[i]
        if isinstance(item, Keyword) and item.name == "params":
            # :params is followed by its value form — consume both.
            if i + 1 < len(sexp):
                recipe_params = _parse_params(sexp[i + 1])
            i += 2
        elif isinstance(item, list) and item:
            # Check for effect declaration: (effect name :path "...")
            if isinstance(item[0], Symbol) and item[0].name == "effect":
                if len(item) >= 2:
                    effect_name = item[1].name if isinstance(item[1], Symbol) else item[1]
                    # Find :path keyword and record its value.
                    j = 2
                    while j < len(item):
                        if isinstance(item[j], Keyword) and item[j].name == "path":
                            if j + 1 < len(item):
                                effect_declarations[effect_name] = item[j + 1]
                            break
                        j += 1
            i += 1
        else:
            i += 1

    # Load effect params (best-effort: a broken effect file only warns).
    effect_params = {}  # effect_name -> list of ParamDef
    recipe_dir = recipe_path.parent
    for effect_name, effect_rel_path in effect_declarations.items():
        effect_path = recipe_dir / effect_rel_path
        if effect_path.exists() and effect_path.suffix == ".sexp":
            try:
                _, _, _, param_defs = load_sexp_effect_file(effect_path)
                if param_defs:
                    effect_params[effect_name] = param_defs
            except Exception as e:
                print(f"Warning: Could not load params from effect {effect_name}: {e}", file=sys.stderr)

    # Print results
    def print_params(params, header_prefix=""):
        # One aligned table row per parameter definition.
        print(f"{header_prefix}{'Name':<20} {'Type':<8} {'Default':<12} {'Range/Choices':<20} Description")
        print(f"{header_prefix}{'-' * 88}")
        for p in params:
            range_str = ""
            if p.range_min is not None and p.range_max is not None:
                range_str = f"[{p.range_min}, {p.range_max}]"
            elif p.choices:
                # Show at most three choices, elided with "...".
                range_str = ", ".join(p.choices[:3])
                if len(p.choices) > 3:
                    range_str += "..."
            default_str = str(p.default) if p.default is not None else "-"
            if len(default_str) > 10:
                default_str = default_str[:9] + "…"
            print(f"{header_prefix}{p.name:<20} {p.param_type:<8} {default_str:<12} {range_str:<20} {p.description}")

    if recipe_params:
        print(f"\nRecipe parameters for '{recipe_name}':\n")
        print_params(recipe_params)
    else:
        print(f"\nRecipe '{recipe_name}' has no declared parameters.")

    if effect_params:
        for effect_name, params in effect_params.items():
            print(f"\n\nEffect '{effect_name}' parameters:\n")
            print_params(params)

    if not recipe_params and not effect_params:
        # Nothing declared anywhere: show how to declare params and stop.
        print("\nParameters can be declared using :params block:")
        print("""
  :params (
    (color_mode :type string :default "color" :desc "Character color")
    (char_size :type int :default 12 :range [4 32] :desc "Cell size")
  )
""")
        return

    print("\n\nUsage:")
    # FIX: usage line previously printed "-p = [-p = ...]" (placeholders
    # lost); use the same KEY=VALUE metavar as the CLI parser.
    print(f"  python3 run_staged.py {recipe_path} -p KEY=VALUE [-p KEY=VALUE ...]")
    print(f"\nExample:")
    all_params = recipe_params + [p for params in effect_params.values() for p in params]
    if all_params:
        p = all_params[0]
        # FIX: use `is not None` so falsy defaults (0, "") are still shown,
        # consistent with print_params above.
        example_val = p.default if p.default is not None else ("value" if p.param_type == "string" else "1")
        print(f"  python3 run_staged.py {recipe_path} -p {p.name}={example_val}")
== ".sexp": try: _, _, _, param_defs = load_sexp_effect_file(effect_path) if param_defs: effect_params[effect_name] = param_defs except Exception as e: print(f"Warning: Could not load params from effect {effect_name}: {e}", file=sys.stderr) # Print results def print_params(params, header_prefix=""): print(f"{header_prefix}{'Name':<20} {'Type':<8} {'Default':<12} {'Range/Choices':<20} Description") print(f"{header_prefix}{'-' * 88}") for p in params: range_str = "" if p.range_min is not None and p.range_max is not None: range_str = f"[{p.range_min}, {p.range_max}]" elif p.choices: range_str = ", ".join(p.choices[:3]) if len(p.choices) > 3: range_str += "..." default_str = str(p.default) if p.default is not None else "-" if len(default_str) > 10: default_str = default_str[:9] + "…" print(f"{header_prefix}{p.name:<20} {p.param_type:<8} {default_str:<12} {range_str:<20} {p.description}") if recipe_params: print(f"\nRecipe parameters for '{recipe_name}':\n") print_params(recipe_params) else: print(f"\nRecipe '{recipe_name}' has no declared parameters.") if effect_params: for effect_name, params in effect_params.items(): print(f"\n\nEffect '{effect_name}' parameters:\n") print_params(params) if not recipe_params and not effect_params: print("\nParameters can be declared using :params block:") print(""" :params ( (color_mode :type string :default "color" :desc "Character color") (char_size :type int :default 12 :range [4 32] :desc "Cell size") ) """) return print("\n\nUsage:") print(f" python3 run_staged.py {recipe_path} -p = [-p = ...]") print(f"\nExample:") all_params = recipe_params + [p for params in effect_params.values() for p in params] if all_params: p = all_params[0] example_val = p.default if p.default else ("value" if p.param_type == "string" else "1") print(f" python3 run_staged.py {recipe_path} -p {p.name}={example_val}") def main(): import argparse parser = argparse.ArgumentParser( description="Run a staged recipe with stage-level caching", 
formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python3 run_staged.py effects/ascii_art_fx_staged.sexp --list-params python3 run_staged.py effects/ascii_art_fx_staged.sexp -o output.mp4 python3 run_staged.py recipe.sexp -p color_mode=lime -p char_jitter=5 """ ) parser.add_argument("recipe", type=Path, nargs="?", help="Recipe file (.sexp)") parser.add_argument("-o", "--output", type=Path, help="Output file path") parser.add_argument("-p", "--param", action="append", dest="params", metavar="KEY=VALUE", help="Set recipe parameter") parser.add_argument("-q", "--quiet", action="store_true", help="Suppress progress output") parser.add_argument("--list-params", action="store_true", help="List available parameters and exit") parser.add_argument("--list-cache", action="store_true", help="List cached items and exit") parser.add_argument("--no-cache", action="store_true", help="Ignore cached plan, force re-planning") parser.add_argument("--show-plan", action="store_true", help="Show the plan S-expression and exit (don't execute)") parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output") parser.add_argument("-j", "--jobs", type=int, default=None, help="Max parallel workers (default: 4, or ARTDAG_WORKERS env)") parser.add_argument("--pipelines", type=int, default=None, help="Max concurrent video pipelines (default: 1, or ARTDAG_VIDEO_PIPELINES env)") args = parser.parse_args() # Apply concurrency limits before any execution if args.jobs is not None: os.environ["ARTDAG_WORKERS"] = str(args.jobs) if args.pipelines is not None: os.environ["ARTDAG_VIDEO_PIPELINES"] = str(args.pipelines) from execute import set_max_video_pipelines set_max_video_pipelines(args.pipelines) # List cache mode - doesn't require recipe if args.list_cache: list_cache(verbose=args.verbose) sys.exit(0) # All other modes require a recipe if not args.recipe: print("Error: recipe file required", file=sys.stderr) sys.exit(1) if not args.recipe.exists(): 
print(f"Recipe not found: {args.recipe}", file=sys.stderr) sys.exit(1) # List params mode if args.list_params: list_params(args.recipe) sys.exit(0) # Parse parameters params = {} if args.params: for param_str in args.params: if "=" not in param_str: print(f"Invalid parameter format: {param_str}", file=sys.stderr) sys.exit(1) key, value = param_str.split("=", 1) # Try to parse as number try: value = int(value) except ValueError: try: value = float(value) except ValueError: pass # Keep as string params[key] = value # Show plan mode - generate plan and display without executing if args.show_plan: recipe_text = args.recipe.read_text() recipe_dir = args.recipe.parent # Compute recipe CID (content hash) recipe_cid, _ = unified_cache.content_store_string(recipe_text) compiled = compile_string(recipe_text, params if params else None, recipe_dir=recipe_dir) # Check for cached plan using unified cache (keyed by source CID + params) plan_cid = unified_cache.plan_exists(recipe_cid, params if params else None) if plan_cid and not args.no_cache: print(f";; Cached plan CID: {plan_cid}", file=sys.stderr) plan_sexp_str = unified_cache.plan_load(recipe_cid, params if params else None) print(plan_sexp_str) else: print(f";; Generating new plan...", file=sys.stderr) analysis_data = {} def on_analysis(node_id: str, results: dict): analysis_data[node_id] = results plan = create_plan( compiled, inputs={}, recipe_dir=recipe_dir, on_analysis=on_analysis, ) # Cache analysis tracks individually before serialization _cache_analysis_tracks(plan) plan_sexp_str = plan.to_string(pretty=True) # Save to unified cache cache_id, plan_cid, plan_path = unified_cache.plan_store(recipe_cid, params if params else None, plan_sexp_str) print(f";; Saved: {cache_id[:16]}... 
→ {plan_cid}", file=sys.stderr) print(plan_sexp_str) sys.exit(0) result = run_staged_recipe( recipe_path=args.recipe, output_path=args.output, params=params if params else None, verbose=not args.quiet, force_replan=args.no_cache, ) # Print final output path print(result) if __name__ == "__main__": main()