#!/usr/bin/env python3 """ Plan generator for S-expression recipes. Expands dynamic nodes (SLICE_ON) into primitives using analysis data. Outputs a plan that can be executed by execute.py. Usage: analyze.py recipe.sexp > analysis.sexp plan.py recipe.sexp --analysis analysis.sexp --sexp > plan.sexp execute.py plan.sexp --analysis analysis.sexp """ import sys import json from pathlib import Path # Add artdag to path sys.path.insert(0, str(Path(__file__).parent.parent / "artdag")) from artdag.sexp import compile_string, parse from artdag.sexp.planner import create_plan from artdag.sexp.parser import Binding, serialize as sexp_serialize, Symbol, Keyword def parse_analysis_sexp(content: str) -> dict: """Parse analysis S-expression into dict.""" sexp = parse(content) if isinstance(sexp, list) and len(sexp) == 1: sexp = sexp[0] if not isinstance(sexp, list) or not sexp: raise ValueError("Invalid analysis S-expression") # Should be (analysis (name ...) (name ...) ...) if not isinstance(sexp[0], Symbol) or sexp[0].name != "analysis": raise ValueError("Expected (analysis ...) S-expression") result = {} for item in sexp[1:]: if isinstance(item, list) and item: # Handle both Symbol names and quoted string names (node IDs) first = item[0] if isinstance(first, Symbol): name = first.name elif isinstance(first, str): name = first else: continue # Skip malformed entries data = {} i = 1 while i < len(item): if isinstance(item[i], Keyword): key = item[i].name.replace("-", "_") i += 1 if i < len(item): data[key] = item[i] i += 1 else: i += 1 result[name] = data return result def to_sexp(value, indent=0): """Convert a Python value to S-expression string.""" from artdag.sexp.parser import Lambda # Handle Binding objects if isinstance(value, Binding): # analysis_ref can be a string, node ID, or dict - serialize it properly if isinstance(value.analysis_ref, str): ref_str = f'"{value.analysis_ref}"' else: ref_str = to_sexp(value.analysis_ref, 0) s = f'(bind {ref_str} :range [{value.range_min} {value.range_max}]' if value.transform: s += f' :transform {value.transform}' return s + ')' # Handle binding dicts from compiler (convert to bind sexp format) if isinstance(value, dict) and value.get("_binding"): source = value.get("source", "") range_val = value.get("range", [0.0, 1.0]) range_min = range_val[0] if isinstance(range_val, list) else 0.0 range_max = range_val[1] if isinstance(range_val, list) and len(range_val) > 1 else 1.0 transform = value.get("transform") offset = value.get("offset") s = f'(bind "{source}" :range [{range_min} {range_max}]' if offset: s += f' :offset {offset}' if transform: s += f' :transform {transform}' return s + ')' # Handle Symbol - serialize as bare identifier if isinstance(value, Symbol): return value.name # Handle Keyword - serialize with colon prefix if isinstance(value, Keyword): return f':{value.name}' # Handle Lambda if isinstance(value, Lambda): params = " ".join(value.params) body = to_sexp(value.body, 0) return f'(fn [{params}] {body})' prefix = " " * indent if isinstance(value, dict): if not value: return "()" items = [] for k, v in value.items(): if isinstance(k, str): # Keys starting with _ are internal markers - keep underscore to avoid :-foo if k.startswith('_'): key_str = k # Keep as-is: _binding -> :_binding else: key_str = k.replace('_', '-') else: key_str = str(k) items.append(f":{key_str} {to_sexp(v, 0)}") return "(" + " ".join(items) + ")" elif isinstance(value, list): if not value: return "()" items = [to_sexp(v, 0) for v in value] return "(" + " ".join(items) + ")" elif isinstance(value, str): # Escape special characters in strings escaped = value.replace('\\', '\\\\').replace('"', '\\"').replace('\n', '\\n') return f'"{escaped}"' elif isinstance(value, bool): return "true" if value else "false" elif isinstance(value, (int, float)): return str(value) elif value is None: return "nil" else: # For any unknown type, convert to string and quote it return f'"{str(value)}"' def plan_recipe(recipe_path: Path, output_format: str = "text", output_file: Path = None, analysis_path: Path = None, params: dict = None): """Compile recipe, expand dynamic nodes using analysis, output plan. Args: recipe_path: Path to recipe file output_format: Output format (text, json, sexp) output_file: Optional output file path analysis_path: Optional pre-computed analysis file params: Optional dict of name -> value bindings to inject into compilation """ recipe_text = recipe_path.read_text() recipe_dir = recipe_path.parent print(f"Compiling: {recipe_path}", file=sys.stderr) if params: print(f"Parameters: {params}", file=sys.stderr) compiled = compile_string(recipe_text, params) print(f"Recipe: {compiled.name} v{compiled.version}", file=sys.stderr) print(f"Nodes: {len(compiled.nodes)}", file=sys.stderr) # Load pre-computed analysis if provided (file or stdin with -) pre_analysis = None if analysis_path: if str(analysis_path) == "-": print(f"Loading analysis: stdin", file=sys.stderr) analysis_text = sys.stdin.read() else: print(f"Loading analysis: {analysis_path}", file=sys.stderr) analysis_text = analysis_path.read_text() pre_analysis = parse_analysis_sexp(analysis_text) print(f" Tracks: {list(pre_analysis.keys())}", file=sys.stderr) # Track analysis results for embedding in plan analysis_data = {} def on_analysis(node_id, results): analysis_data[node_id] = results times = results.get("times", []) print(f" Analysis complete: {len(times)} beat times", file=sys.stderr) # Create plan (uses pre_analysis or runs analyzers, expands SLICE_ON) print("\n--- Planning ---", file=sys.stderr) plan = create_plan( compiled, inputs={}, recipe_dir=recipe_dir, on_analysis=on_analysis, pre_analysis=pre_analysis, ) print(f"\nPlan ID: {plan.plan_id[:16]}...", file=sys.stderr) print(f"Steps: {len(plan.steps)}", file=sys.stderr) # Generate output if output_format == "sexp": output = generate_sexp_output(compiled, plan, analysis_data) elif output_format == "json": output = generate_json_output(compiled, plan, analysis_data) else: output = generate_text_output(compiled, plan, analysis_data) # Write output if output_file: output_file.write_text(output) print(f"\nPlan written to: {output_file}", file=sys.stderr) else: print(output) class PlanJSONEncoder(json.JSONEncoder): """Custom encoder for plan objects.""" def default(self, obj): if isinstance(obj, Binding): return { "_type": "binding", "analysis_ref": obj.analysis_ref, "track": obj.track, "range_min": obj.range_min, "range_max": obj.range_max, "transform": obj.transform, } if isinstance(obj, Symbol): return {"_type": "symbol", "name": obj.name} if isinstance(obj, Keyword): return {"_type": "keyword", "name": obj.name} return super().default(obj) def generate_json_output(compiled, plan, analysis_data): """Generate JSON plan output.""" output = { "plan_id": plan.plan_id, "recipe_id": compiled.name, "recipe_hash": plan.recipe_hash, "encoding": compiled.encoding, "output_step_id": plan.output_step_id, "steps": [], } for step in plan.steps: step_dict = { "step_id": step.step_id, "node_type": step.node_type, "config": step.config, "inputs": step.inputs, "level": step.level, "cache_id": step.cache_id, } # Embed analysis results for ANALYZE steps if step.node_type == "ANALYZE" and step.step_id in analysis_data: step_dict["config"]["analysis_results"] = analysis_data[step.step_id] output["steps"].append(step_dict) return json.dumps(output, indent=2, cls=PlanJSONEncoder) def generate_sexp_output(compiled, plan, analysis_data): """Generate S-expression plan output.""" lines = [ f'(plan "{compiled.name}"', f' :version "{compiled.version}"', f' :plan-id "{plan.plan_id}"', ] if compiled.encoding: lines.append(f' :encoding {to_sexp(compiled.encoding)}') # Include analysis data for effect parameter bindings if plan.analysis: lines.append('') lines.append(' (analysis') for name, data in plan.analysis.items(): times = data.get("times", []) values = data.get("values", []) # Truncate for display but include all data times_str = " ".join(str(t) for t in times) values_str = " ".join(str(v) for v in values) lines.append(f' ({name}') lines.append(f' :times ({times_str})') lines.append(f' :values ({values_str}))') lines.append(' )') lines.append('') for step in plan.steps: lines.append(f' (step "{step.step_id}"') lines.append(f' :type {step.node_type}') lines.append(f' :level {step.level}') lines.append(f' :cache "{step.cache_id}"') if step.inputs: inputs_str = " ".join(f'"{i}"' for i in step.inputs) lines.append(f' :inputs ({inputs_str})') for key, value in step.config.items(): lines.append(f' :{key.replace("_", "-")} {to_sexp(value)}') lines.append(' )') lines.append('') lines.append(f' :output "{plan.output_step_id}")') return '\n'.join(lines) def generate_text_output(compiled, plan, analysis_data): """Generate human-readable text output.""" lines = [ f"Recipe: {compiled.name} v{compiled.version}", ] if compiled.encoding: lines.append(f"Encoding: {compiled.encoding}") lines.extend([ f"\nPlan ID: {plan.plan_id}", f"Output: {plan.output_step_id[:16]}...", f"\nSteps ({len(plan.steps)}):", "-" * 60, ]) for step in plan.steps: lines.append(f"\n[{step.level}] {step.node_type}") lines.append(f" id: {step.step_id[:16]}...") lines.append(f" cache: {step.cache_id[:16]}...") if step.inputs: lines.append(f" inputs: {[i[:16] + '...' for i in step.inputs]}") for key, value in step.config.items(): if key == "analysis_results": lines.append(f" {key}: <{len(value.get('times', []))} times>") else: lines.append(f" {key}: {value}") return '\n'.join(lines) def parse_param(param_str: str) -> tuple: """Parse a key=value parameter string. Args: param_str: String in format "key=value" Returns: Tuple of (key, parsed_value) where value is converted to int/float if possible """ if "=" not in param_str: raise ValueError(f"Invalid parameter format: {param_str} (expected key=value)") key, value = param_str.split("=", 1) key = key.strip() value = value.strip() # Try to parse as int try: return (key, int(value)) except ValueError: pass # Try to parse as float try: return (key, float(value)) except ValueError: pass # Return as string return (key, value) if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Generate execution plan from recipe") parser.add_argument("recipe", type=Path, help="Recipe file (.sexp)") parser.add_argument("-o", "--output", type=Path, help="Output file (default: stdout)") parser.add_argument("-a", "--analysis", type=Path, help="Pre-computed analysis file (.sexp)") parser.add_argument("-p", "--param", action="append", dest="params", metavar="KEY=VALUE", help="Set recipe parameter (can be used multiple times)") parser.add_argument("--json", action="store_true", help="Output JSON format") parser.add_argument("--text", action="store_true", help="Output human-readable text format") args = parser.parse_args() if not args.recipe.exists(): print(f"Recipe not found: {args.recipe}", file=sys.stderr) sys.exit(1) if args.analysis and str(args.analysis) != "-" and not args.analysis.exists(): print(f"Analysis file not found: {args.analysis}", file=sys.stderr) sys.exit(1) # Parse parameters params = {} if args.params: for param_str in args.params: try: key, value = parse_param(param_str) params[key] = value except ValueError as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) if args.json: fmt = "json" elif args.text: fmt = "text" else: fmt = "sexp" plan_recipe(args.recipe, fmt, args.output, args.analysis, params or None)