# Files
# rose-ash/test/run_staged.py
# 2026-02-24 23:10:04 +00:00
#
# 529 lines
# 19 KiB
# Python
#!/usr/bin/env python3
"""
Run a staged recipe through analyze -> plan -> execute pipeline.
This script demonstrates stage-level caching: analysis stages can be
skipped on re-run if the inputs haven't changed.
Usage:
python3 run_staged.py recipe.sexp [-o output.mp4]
python3 run_staged.py effects/ascii_art_staged.sexp -o ascii_out.mp4
The script:
1. Compiles the recipe and extracts stage information
2. For each stage in topological order:
- Check stage cache (skip if hit)
- Run stage (analyze, plan, execute)
- Cache stage outputs
3. Produce final output
"""
import os
import sys
import json
import tempfile
import shutil
import subprocess
from pathlib import Path
from typing import Dict, List, Optional, Any
# Add artdag to path
sys.path.insert(0, str(Path(__file__).parent.parent / "artdag"))
from artdag.sexp import compile_string, parse
from artdag.sexp.parser import Symbol, Keyword, serialize
from artdag.sexp.planner import create_plan
# Import unified cache
import cache as unified_cache
import hashlib
def _cache_analysis_tracks(plan):
    """Cache each analysis track individually and replace its in-plan data
    with a small ``{"_cache_id": <cid>}`` reference.

    Mutates ``plan.analysis`` in place. The content id is the SHA-256 of the
    canonical (``sort_keys=True``) JSON serialization, so identical tracks
    deduplicate to the same cache entry. ``_resolve_analysis_refs`` reverses
    this transformation.
    """
    # Use the module-level `json` directly; the previous local
    # `import json as _json` merely shadowed it.
    for name, data in plan.analysis.items():
        # Canonical serialization -> stable content id across runs.
        json_str = json.dumps(data, sort_keys=True)
        content_cid = hashlib.sha256(json_str.encode()).hexdigest()
        unified_cache.cache_store_json(content_cid, data)
        plan.analysis[name] = {"_cache_id": content_cid}
def _resolve_analysis_refs(analysis_dict):
    """Resolve ``{"_cache_id": cid}`` references back to full analysis data.

    Returns a new dict. Entries that are not references are passed through
    unchanged; entries whose cached payload cannot be loaded keep the raw
    reference instead of being silently dropped (the original code omitted
    the key entirely on a cache miss).
    """
    resolved = {}
    for name, data in analysis_dict.items():
        if isinstance(data, dict) and "_cache_id" in data:
            loaded = unified_cache.cache_get_json(data["_cache_id"])
            # On a miss, keep the reference so no track disappears.
            resolved[name] = loaded if loaded is not None else data
        else:
            resolved[name] = data
    return resolved
def run_staged_recipe(
    recipe_path: Path,
    output_path: Optional[Path] = None,
    cache_dir: Optional[Path] = None,
    params: Optional[Dict[str, Any]] = None,
    verbose: bool = True,
    force_replan: bool = False,
) -> Path:
    """
    Run a staged recipe with stage-level caching.

    Args:
        recipe_path: Path to the .sexp recipe file
        output_path: Optional output file path
        cache_dir: Optional cache directory for stage results.
            NOTE(review): currently unused — the unified cache's content
            dir is used instead; confirm intent before removing.
        params: Optional parameter overrides
        verbose: Print progress information
        force_replan: Ignore any cached plan and re-plan from scratch
    Returns:
        Path to the final output file
    """
    recipe_text = recipe_path.read_text()
    recipe_dir = recipe_path.parent
    # Use unified cache
    content_cache_dir = unified_cache.get_content_dir()

    def log(msg: str):
        # Progress goes to stderr so stdout can carry the final output path.
        if verbose:
            print(msg, file=sys.stderr)

    # Store recipe source by CID; this content id keys the plan cache below.
    recipe_cid, _ = unified_cache.content_store_string(recipe_text)
    log(f"Recipe CID: {recipe_cid[:16]}...")
    # Compile recipe
    log(f"Compiling: {recipe_path}")
    compiled = compile_string(recipe_text, params, recipe_dir=recipe_dir)
    log(f"Recipe: {compiled.name} v{compiled.version}")
    log(f"Nodes: {len(compiled.nodes)}")
    # Store effects by CID
    for effect_name, effect_info in compiled.registry.get("effects", {}).items():
        effect_path = effect_info.get("path")
        effect_cid = effect_info.get("cid")
        if effect_path and effect_cid:
            effect_file = Path(effect_path)
            if effect_file.exists():
                stored_cid, _ = unified_cache.content_store_file(effect_file)
                # A mismatch means the file changed since the registry CID was recorded.
                if stored_cid == effect_cid:
                    log(f"Effect '{effect_name}' CID: {effect_cid[:16]}...")
                else:
                    log(f"Warning: Effect '{effect_name}' CID mismatch")
    # Store analyzers by CID
    for analyzer_name, analyzer_info in compiled.registry.get("analyzers", {}).items():
        analyzer_path = analyzer_info.get("path")
        analyzer_cid = analyzer_info.get("cid")
        if analyzer_path:
            # Analyzer paths may be relative to the recipe directory.
            analyzer_file = Path(analyzer_path) if Path(analyzer_path).is_absolute() else recipe_dir / analyzer_path
            if analyzer_file.exists():
                stored_cid, _ = unified_cache.content_store_file(analyzer_file)
                log(f"Analyzer '{analyzer_name}' CID: {stored_cid[:16]}...")
    # Store included files by CID
    for include_path, include_cid in compiled.registry.get("includes", {}).items():
        include_file = Path(include_path)
        if include_file.exists():
            stored_cid, _ = unified_cache.content_store_file(include_file)
            if stored_cid == include_cid:
                log(f"Include '{include_file.name}' CID: {include_cid[:16]}...")
            else:
                log(f"Warning: Include '{include_file.name}' CID mismatch")
    # Check for stages; recipes without stages fall back to the regular pipeline.
    if not compiled.stages:
        log("No stages found - running as regular recipe")
        return _run_non_staged(compiled, recipe_dir, output_path, verbose)
    log(f"\nStages: {len(compiled.stages)}")
    log(f"Stage order: {compiled.stage_order}")
    # Display stage info
    for stage in compiled.stages:
        log(f"\n Stage: {stage.name}")
        log(f" Requires: {stage.requires or '(none)'}")
        log(f" Inputs: {stage.inputs or '(none)'}")
        log(f" Outputs: {stage.outputs}")
    # Create plan with analysis
    log("\n--- Planning ---")
    analysis_data = {}

    def on_analysis(node_id: str, results: dict):
        # Planner callback: collects results as each ANALYZE node completes.
        analysis_data[node_id] = results
        times = results.get("times", [])
        log(f" Analysis complete: {node_id[:16]}... ({len(times)} times)")

    # Check for cached plan using unified cache (keyed by recipe CID + params)
    plan_cid = unified_cache.plan_exists(recipe_cid, params)
    if plan_cid and not force_replan:
        # --- Cached-plan path: skip planning/analysis entirely ---
        plan_cache_path = unified_cache.plan_get_path(recipe_cid, params)
        log(f"\nFound cached plan: {plan_cid[:16]}...")
        plan_sexp_str = unified_cache.plan_load(recipe_cid, params)
        # Parse the cached plan
        from execute import parse_plan_input
        plan_dict = parse_plan_input(plan_sexp_str)
        # Resolve cache-id refs in plan's embedded analysis
        if "analysis" in plan_dict:
            plan_dict["analysis"] = _resolve_analysis_refs(plan_dict["analysis"])
        # Load analysis data from unified cache, keyed by ANALYZE step ids.
        analysis_data = {}
        for step in plan_dict.get("steps", []):
            if step.get("node_type") == "ANALYZE":
                step_id = step.get("step_id")
                cached_analysis = unified_cache.cache_get_json(step_id)
                if cached_analysis:
                    analysis_data[step_id] = cached_analysis
                    log(f" Loaded analysis: {step_id[:16]}...")
        log(f"Plan ID: {plan_dict.get('plan_id', 'unknown')[:16]}...")
        log(f"Steps: {len(plan_dict.get('steps', []))}")
        log(f"Analysis tracks: {list(analysis_data.keys())}")
        # Execute directly from cached plan
        log("\n--- Execution (from cached plan) ---")
        from execute import execute_plan
        result_path = execute_plan(
            plan_path=plan_cache_path,
            output_path=output_path,
            recipe_dir=recipe_dir,
            external_analysis=analysis_data,
            cache_dir=content_cache_dir,
        )
        log(f"\n--- Complete ---")
        log(f"Output: {result_path}")
        return result_path
    # No cached plan - create new one (runs analysis via on_analysis)
    plan = create_plan(
        compiled,
        inputs={},
        recipe_dir=recipe_dir,
        on_analysis=on_analysis,
    )
    log(f"\nPlan ID: {plan.plan_id[:16]}...")
    log(f"Steps: {len(plan.steps)}")
    log(f"Analysis tracks: {list(analysis_data.keys())}")
    # Cache analysis tracks individually and replace with cache-id refs
    _cache_analysis_tracks(plan)
    # Save plan to unified cache so later runs can take the cached-plan path.
    plan_sexp_str = plan.to_string(pretty=True)
    plan_cache_id, plan_cid, plan_cache_path = unified_cache.plan_store(recipe_cid, params, plan_sexp_str)
    log(f"Saved plan: {plan_cache_id[:16]}... → {plan_cid[:16]}...")
    # Execute the plan using execute.py logic
    log("\n--- Execution ---")
    from execute import execute_plan
    # Resolve cache-id refs back to full data for execution
    resolved_analysis = _resolve_analysis_refs(plan.analysis)
    plan_dict = {
        "plan_id": plan.plan_id,
        "source_hash": plan.source_hash,
        "encoding": compiled.encoding,
        "output_step_id": plan.output_step_id,
        # Merge order: fresh in-memory analysis wins over resolved refs.
        "analysis": {**resolved_analysis, **analysis_data},
        "effects_registry": plan.effects_registry,
        "minimal_primitives": plan.minimal_primitives,
        "steps": [],
    }
    for step in plan.steps:
        step_dict = {
            "step_id": step.step_id,
            "node_type": step.node_type,
            "config": step.config,
            "inputs": step.inputs,
            "level": step.level,
            "cache_id": step.cache_id,
        }
        # Tag with stage info if present
        if step.stage:
            step_dict["stage"] = step.stage
        plan_dict["steps"].append(step_dict)
    # Execute using unified cache (plan passed in-memory, not via file)
    result_path = execute_plan(
        plan_path=None,
        output_path=output_path,
        recipe_dir=recipe_dir,
        plan_data=plan_dict,
        external_analysis=analysis_data,
        cache_dir=content_cache_dir,
    )
    log(f"\n--- Complete ---")
    log(f"Output: {result_path}")
    return result_path
def _run_non_staged(compiled, recipe_dir: Path, output_path: Optional[Path], verbose: bool) -> Path:
    """Fallback entry point for recipes without stages.

    Not implemented here: non-staged recipes are expected to go through the
    regular ``plan.py | execute.py`` pipeline.

    Raises:
        NotImplementedError: always.
    """
    # The unused `from execute import execute_plan` / `from plan import
    # plan_recipe` statements were removed: they executed (and could fail)
    # before the raise without ever being used.
    raise NotImplementedError("Non-staged recipes should use plan.py | execute.py")
def list_cache(verbose: bool = False):
    """Print a listing of every cached item.

    Thin wrapper: all formatting and traversal is delegated to the
    unified cache's own reporting helper.
    """
    unified_cache.print_cache_listing(verbose)
def list_params(recipe_path: Path):
    """List available parameters for a recipe and its declared effects.

    Parses the recipe S-expression to find the ``:params`` block and any
    ``(effect name :path "...")`` declarations, loads parameter definitions
    from each ``.sexp`` effect file, and prints a formatted table plus a
    usage example.

    Args:
        recipe_path: Path to the .sexp recipe file.
    """
    from artdag.sexp import parse
    from artdag.sexp.parser import Symbol, Keyword
    from artdag.sexp.compiler import _parse_params
    from artdag.sexp.effect_loader import load_sexp_effect_file
    recipe_text = recipe_path.read_text()
    sexp = parse(recipe_text)
    # A file containing a single top-level form parses to a one-element list.
    if isinstance(sexp, list) and len(sexp) == 1:
        sexp = sexp[0]
    # Find recipe name
    recipe_name = sexp[1] if len(sexp) > 1 and isinstance(sexp[1], str) else recipe_path.stem
    # Find :params block and effect declarations
    recipe_params = []
    effect_declarations = {}  # name -> path
    i = 2
    while i < len(sexp):
        item = sexp[i]
        if isinstance(item, Keyword) and item.name == "params":
            if i + 1 < len(sexp):
                recipe_params = _parse_params(sexp[i + 1])
            i += 2  # skip the keyword and its value
        elif isinstance(item, list) and item:
            # Check for effect declaration: (effect name :path "...")
            if isinstance(item[0], Symbol) and item[0].name == "effect":
                if len(item) >= 2:
                    effect_name = item[1].name if isinstance(item[1], Symbol) else item[1]
                    # Find :path
                    j = 2
                    while j < len(item):
                        if isinstance(item[j], Keyword) and item[j].name == "path":
                            if j + 1 < len(item):
                                effect_declarations[effect_name] = item[j + 1]
                            break
                        j += 1
            i += 1
        else:
            i += 1
    # Load effect params (best-effort: a broken effect file only warns)
    effect_params = {}  # effect_name -> list of ParamDef
    recipe_dir = recipe_path.parent
    for effect_name, effect_rel_path in effect_declarations.items():
        effect_path = recipe_dir / effect_rel_path
        if effect_path.exists() and effect_path.suffix == ".sexp":
            try:
                _, _, _, param_defs = load_sexp_effect_file(effect_path)
                if param_defs:
                    effect_params[effect_name] = param_defs
            except Exception as e:
                print(f"Warning: Could not load params from effect {effect_name}: {e}", file=sys.stderr)

    # Print results
    def print_params(params, header_prefix=""):
        # Fixed-width table: name, type, default, range/choices, description.
        print(f"{header_prefix}{'Name':<20} {'Type':<8} {'Default':<12} {'Range/Choices':<20} Description")
        print(f"{header_prefix}{'-' * 88}")
        for p in params:
            range_str = ""
            if p.range_min is not None and p.range_max is not None:
                range_str = f"[{p.range_min}, {p.range_max}]"
            elif p.choices:
                range_str = ", ".join(p.choices[:3])
                if len(p.choices) > 3:
                    range_str += "..."
            default_str = str(p.default) if p.default is not None else "-"
            if len(default_str) > 10:
                # Fix: the truncation marker was lost (was `+ ""`); restore it.
                default_str = default_str[:9] + "…"
            print(f"{header_prefix}{p.name:<20} {p.param_type:<8} {default_str:<12} {range_str:<20} {p.description}")

    if recipe_params:
        print(f"\nRecipe parameters for '{recipe_name}':\n")
        print_params(recipe_params)
    else:
        print(f"\nRecipe '{recipe_name}' has no declared parameters.")
    if effect_params:
        for effect_name, params in effect_params.items():
            print(f"\n\nEffect '{effect_name}' parameters:\n")
            print_params(params)
    if not recipe_params and not effect_params:
        print("\nParameters can be declared using :params block:")
        print("""
  :params (
    (color_mode :type string :default "color" :desc "Character color")
    (char_size :type int :default 12 :range [4 32] :desc "Cell size")
  )
""")
        return
    print("\n\nUsage:")
    print(f" python3 run_staged.py {recipe_path} -p <name>=<value> [-p <name>=<value> ...]")
    print("\nExample:")
    all_params = recipe_params + [p for params in effect_params.values() for p in params]
    if all_params:
        p = all_params[0]
        # Fix: compare against None so falsy defaults (0, "", False) are kept.
        example_val = p.default if p.default is not None else ("value" if p.param_type == "string" else "1")
        print(f" python3 run_staged.py {recipe_path} -p {p.name}={example_val}")
def main():
    """CLI entry point: parse arguments and dispatch to the requested mode
    (list cache, list params, show plan, or run the staged recipe)."""
    import argparse
    parser = argparse.ArgumentParser(
        description="Run a staged recipe with stage-level caching",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python3 run_staged.py effects/ascii_art_fx_staged.sexp --list-params
python3 run_staged.py effects/ascii_art_fx_staged.sexp -o output.mp4
python3 run_staged.py recipe.sexp -p color_mode=lime -p char_jitter=5
"""
    )
    parser.add_argument("recipe", type=Path, nargs="?", help="Recipe file (.sexp)")
    parser.add_argument("-o", "--output", type=Path, help="Output file path")
    parser.add_argument("-p", "--param", action="append", dest="params",
                        metavar="KEY=VALUE", help="Set recipe parameter")
    parser.add_argument("-q", "--quiet", action="store_true", help="Suppress progress output")
    parser.add_argument("--list-params", action="store_true", help="List available parameters and exit")
    parser.add_argument("--list-cache", action="store_true", help="List cached items and exit")
    parser.add_argument("--no-cache", action="store_true", help="Ignore cached plan, force re-planning")
    parser.add_argument("--show-plan", action="store_true", help="Show the plan S-expression and exit (don't execute)")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
    parser.add_argument("-j", "--jobs", type=int, default=None,
                        help="Max parallel workers (default: 4, or ARTDAG_WORKERS env)")
    parser.add_argument("--pipelines", type=int, default=None,
                        help="Max concurrent video pipelines (default: 1, or ARTDAG_VIDEO_PIPELINES env)")
    args = parser.parse_args()
    # Apply concurrency limits before any execution (env vars are read later
    # by the execution machinery).
    if args.jobs is not None:
        os.environ["ARTDAG_WORKERS"] = str(args.jobs)
    if args.pipelines is not None:
        os.environ["ARTDAG_VIDEO_PIPELINES"] = str(args.pipelines)
        from execute import set_max_video_pipelines
        set_max_video_pipelines(args.pipelines)
    # List cache mode - doesn't require recipe
    if args.list_cache:
        list_cache(verbose=args.verbose)
        sys.exit(0)
    # All other modes require a recipe
    if not args.recipe:
        print("Error: recipe file required", file=sys.stderr)
        sys.exit(1)
    if not args.recipe.exists():
        print(f"Recipe not found: {args.recipe}", file=sys.stderr)
        sys.exit(1)
    # List params mode
    if args.list_params:
        list_params(args.recipe)
        sys.exit(0)
    # Parse parameters: KEY=VALUE strings, coercing values to int, then
    # float, falling back to string.
    params = {}
    if args.params:
        for param_str in args.params:
            if "=" not in param_str:
                print(f"Invalid parameter format: {param_str}", file=sys.stderr)
                sys.exit(1)
            key, value = param_str.split("=", 1)
            # Try to parse as number
            try:
                value = int(value)
            except ValueError:
                try:
                    value = float(value)
                except ValueError:
                    pass  # Keep as string
            params[key] = value
    # Show plan mode - generate plan and display without executing
    if args.show_plan:
        recipe_text = args.recipe.read_text()
        recipe_dir = args.recipe.parent
        # Compute recipe CID (content hash)
        recipe_cid, _ = unified_cache.content_store_string(recipe_text)
        # NOTE(review): `compiled` is unused below when a cached plan is
        # shown; presumably compiling validates the recipe — confirm intent.
        compiled = compile_string(recipe_text, params if params else None, recipe_dir=recipe_dir)
        # Check for cached plan using unified cache (keyed by source CID + params)
        plan_cid = unified_cache.plan_exists(recipe_cid, params if params else None)
        if plan_cid and not args.no_cache:
            print(f";; Cached plan CID: {plan_cid}", file=sys.stderr)
            plan_sexp_str = unified_cache.plan_load(recipe_cid, params if params else None)
            print(plan_sexp_str)
        else:
            print(f";; Generating new plan...", file=sys.stderr)
            analysis_data = {}

            def on_analysis(node_id: str, results: dict):
                # Collect analysis results as the planner produces them.
                analysis_data[node_id] = results

            plan = create_plan(
                compiled,
                inputs={},
                recipe_dir=recipe_dir,
                on_analysis=on_analysis,
            )
            # Cache analysis tracks individually before serialization
            _cache_analysis_tracks(plan)
            plan_sexp_str = plan.to_string(pretty=True)
            # Save to unified cache
            cache_id, plan_cid, plan_path = unified_cache.plan_store(recipe_cid, params if params else None, plan_sexp_str)
            print(f";; Saved: {cache_id[:16]}... → {plan_cid}", file=sys.stderr)
            print(plan_sexp_str)
        sys.exit(0)
    # Default mode: run the full analyze -> plan -> execute pipeline.
    result = run_staged_recipe(
        recipe_path=args.recipe,
        output_path=args.output,
        params=params if params else None,
        verbose=not args.quiet,
        force_replan=args.no_cache,
    )
    # Print final output path (the only thing written to stdout)
    print(result)
# Script entry point: only run the CLI when executed directly, not on import.
if __name__ == "__main__":
    main()