Files
rose-ash/artdag/test/plan.py
giles 1a74d811f7
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m33s
Incorporate art-dag-mono repo into artdag/ subfolder
Merges full history from art-dag/mono.git into the monorepo
under the artdag/ directory. Contains: core (DAG engine),
l1 (Celery rendering server), l2 (ActivityPub registry),
common (shared templates/middleware), client (CLI), test (e2e).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

git-subtree-dir: artdag
git-subtree-mainline: 1a179de547
git-subtree-split: 4c2e716558
2026-02-27 09:07:23 +00:00

416 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Plan generator for S-expression recipes.
Expands dynamic nodes (SLICE_ON) into primitives using analysis data.
Outputs a plan that can be executed by execute.py.
Usage:
analyze.py recipe.sexp > analysis.sexp
plan.py recipe.sexp --analysis analysis.sexp --sexp > plan.sexp
execute.py plan.sexp --analysis analysis.sexp
"""
import sys
import json
from pathlib import Path
# Add artdag to path
sys.path.insert(0, str(Path(__file__).parent.parent / "artdag"))
from artdag.sexp import compile_string, parse
from artdag.sexp.planner import create_plan
from artdag.sexp.parser import Binding, serialize as sexp_serialize, Symbol, Keyword
def parse_analysis_sexp(content: str) -> dict:
    """Turn an ``(analysis ...)`` S-expression into a nested dict.

    Each list entry after the leading ``analysis`` symbol contributes one
    key to the result: the entry's first element (a Symbol's name, or a
    plain string) is the key, and the remaining elements are read as
    ``:keyword value`` pairs. Hyphens in keyword names become underscores.

    Args:
        content: Text of the analysis S-expression.

    Returns:
        Dict mapping each entry name to a dict of its keyword values.

    Raises:
        ValueError: If the parsed text is not a non-empty list headed by
            the ``analysis`` symbol.
    """
    tree = parse(content)
    # A parse result wrapping exactly one expression is unwrapped first.
    if isinstance(tree, list) and len(tree) == 1:
        tree = tree[0]

    if not (isinstance(tree, list) and tree):
        raise ValueError("Invalid analysis S-expression")

    # Expected shape: (analysis (name ...) (name ...) ...)
    head = tree[0]
    if not (isinstance(head, Symbol) and head.name == "analysis"):
        raise ValueError("Expected (analysis ...) S-expression")

    absent = object()  # marks "keyword had no following value"
    tracks = {}
    for entry in tree[1:]:
        if not isinstance(entry, list) or not entry:
            continue

        # Entry names may be Symbols or quoted strings (node IDs).
        label = entry[0]
        if isinstance(label, Symbol):
            track_name = label.name
        elif isinstance(label, str):
            track_name = label
        else:
            continue  # Malformed entry: ignore it

        fields = {}
        tokens = iter(entry[1:])
        for token in tokens:
            if not isinstance(token, Keyword):
                continue  # Stray non-keyword element: skip it
            field_name = token.name.replace("-", "_")
            # The element right after a keyword is always taken as its value.
            payload = next(tokens, absent)
            if payload is not absent:
                fields[field_name] = payload
        tracks[track_name] = fields

    return tracks
def _quote_sexp_string(text: str) -> str:
    """Return *text* as a double-quoted S-expression string literal.

    Backslashes, double quotes and newlines are backslash-escaped.
    """
    escaped = text.replace('\\', '\\\\').replace('"', '\\"').replace('\n', '\\n')
    return f'"{escaped}"'


def to_sexp(value, indent=0):
    """Convert a Python value to an S-expression string.

    Handled inputs, in the order they are checked:
    Binding objects and ``_binding`` dicts (emitted as ``(bind ...)``),
    Symbol (bare name), Keyword (``:name``), Lambda (``(fn [params] body)``),
    dict (``(:key value ...)``), list (``(a b ...)``), str (quoted and
    escaped), bool (``true``/``false``), int/float, ``None`` (``nil``).
    Any other type is converted with ``str()`` and emitted as a quoted,
    escaped string.

    Args:
        value: The value to serialize.
        indent: Accepted for backward compatibility; it does not affect
            the output.

    Returns:
        The S-expression text for ``value``.
    """
    from artdag.sexp.parser import Lambda

    # Binding objects
    if isinstance(value, Binding):
        # analysis_ref can be a string, node ID, or dict - serialize it properly.
        # String refs go through the same escaping as ordinary strings so an
        # embedded quote or backslash cannot break the emitted form.
        if isinstance(value.analysis_ref, str):
            ref_str = _quote_sexp_string(value.analysis_ref)
        else:
            ref_str = to_sexp(value.analysis_ref, 0)
        s = f'(bind {ref_str} :range [{value.range_min} {value.range_max}]'
        if value.transform:
            s += f' :transform {value.transform}'
        return s + ')'

    # Binding dicts from the compiler (converted to the bind sexp format)
    if isinstance(value, dict) and value.get("_binding"):
        source = value.get("source", "")
        range_val = value.get("range", [0.0, 1.0])
        is_range_list = isinstance(range_val, list)
        # Fall back to the defaults when the list is too short, instead of
        # raising IndexError on an empty range.
        range_min = range_val[0] if is_range_list and len(range_val) > 0 else 0.0
        range_max = range_val[1] if is_range_list and len(range_val) > 1 else 1.0
        transform = value.get("transform")
        offset = value.get("offset")
        s = f'(bind {_quote_sexp_string(str(source))} :range [{range_min} {range_max}]'
        if offset:
            s += f' :offset {offset}'
        if transform:
            s += f' :transform {transform}'
        return s + ')'

    # Symbol - bare identifier
    if isinstance(value, Symbol):
        return value.name

    # Keyword - colon prefix
    if isinstance(value, Keyword):
        return f':{value.name}'

    # Lambda
    if isinstance(value, Lambda):
        params = " ".join(value.params)
        body = to_sexp(value.body, 0)
        return f'(fn [{params}] {body})'

    if isinstance(value, dict):
        if not value:
            return "()"
        items = []
        for k, v in value.items():
            if isinstance(k, str):
                # Keys starting with _ are internal markers - keep the
                # underscore to avoid producing :-foo
                if k.startswith('_'):
                    key_str = k  # Keep as-is: _binding -> :_binding
                else:
                    key_str = k.replace('_', '-')
            else:
                key_str = str(k)
            items.append(f":{key_str} {to_sexp(v, 0)}")
        return "(" + " ".join(items) + ")"

    if isinstance(value, list):
        if not value:
            return "()"
        return "(" + " ".join(to_sexp(v, 0) for v in value) + ")"

    if isinstance(value, str):
        return _quote_sexp_string(value)

    # bool must be tested before int/float because bool is a subclass of int
    if isinstance(value, bool):
        return "true" if value else "false"

    if isinstance(value, (int, float)):
        return str(value)

    if value is None:
        return "nil"

    # Any unknown type: stringify, then quote with the same escaping as str
    return _quote_sexp_string(str(value))
def plan_recipe(recipe_path: Path, output_format: str = "text", output_file: Path = None, analysis_path: Path = None, params: dict = None):
    """Compile a recipe, build its plan and emit the plan in the chosen format.

    Progress messages go to stderr; the plan itself goes to ``output_file``
    when given, otherwise to stdout.

    Args:
        recipe_path: Path to recipe file
        output_format: Output format ("sexp", "json"; anything else is text)
        output_file: Optional output file path
        analysis_path: Optional pre-computed analysis file; a path whose
            string form is "-" means the analysis is read from stdin
        params: Optional dict of name -> value bindings to inject into compilation
    """
    source_text = recipe_path.read_text()
    base_dir = recipe_path.parent

    print(f"Compiling: {recipe_path}", file=sys.stderr)
    if params:
        print(f"Parameters: {params}", file=sys.stderr)

    compiled = compile_string(source_text, params)
    print(f"Recipe: {compiled.name} v{compiled.version}", file=sys.stderr)
    print(f"Nodes: {len(compiled.nodes)}", file=sys.stderr)

    # Optional pre-computed analysis, from a file or from stdin ("-")
    pre_analysis = None
    if analysis_path:
        from_stdin = str(analysis_path) == "-"
        origin = "stdin" if from_stdin else analysis_path
        print(f"Loading analysis: {origin}", file=sys.stderr)
        analysis_text = sys.stdin.read() if from_stdin else analysis_path.read_text()
        pre_analysis = parse_analysis_sexp(analysis_text)
        print(f" Tracks: {list(pre_analysis)}", file=sys.stderr)

    # Results reported by the planner's analysis callback, keyed by node id
    analysis_data = {}

    def record_analysis(node_id, results):
        analysis_data[node_id] = results
        beat_count = len(results.get("times", []))
        print(f" Analysis complete: {beat_count} beat times", file=sys.stderr)

    # Build the plan (uses pre_analysis or runs analyzers, expands SLICE_ON)
    print("\n--- Planning ---", file=sys.stderr)
    plan = create_plan(
        compiled,
        inputs={},
        recipe_dir=base_dir,
        on_analysis=record_analysis,
        pre_analysis=pre_analysis,
    )
    print(f"\nPlan ID: {plan.plan_id[:16]}...", file=sys.stderr)
    print(f"Steps: {len(plan.steps)}", file=sys.stderr)

    # Pick the renderer; unrecognised formats fall back to plain text
    renderers = {
        "sexp": generate_sexp_output,
        "json": generate_json_output,
    }
    render = renderers.get(output_format, generate_text_output)
    output = render(compiled, plan, analysis_data)

    if not output_file:
        print(output)
        return

    output_file.write_text(output)
    print(f"\nPlan written to: {output_file}", file=sys.stderr)
class PlanJSONEncoder(json.JSONEncoder):
    """JSON encoder that knows how to serialize Binding, Symbol and Keyword."""

    def default(self, obj):
        """Return a JSON-friendly dict for parser objects.

        Each supported object becomes a dict tagged with a ``_type`` entry;
        anything else is delegated to the base encoder.
        """
        if isinstance(obj, Binding):
            encoded = {"_type": "binding"}
            # Attributes are copied across in the order they appear in the output.
            for attr in ("analysis_ref", "track", "range_min", "range_max", "transform"):
                encoded[attr] = getattr(obj, attr)
            return encoded

        for kind, tag in ((Symbol, "symbol"), (Keyword, "keyword")):
            if isinstance(obj, kind):
                return {"_type": tag, "name": obj.name}

        return super().default(obj)
def generate_json_output(compiled, plan, analysis_data):
    """Generate JSON plan output.

    Args:
        compiled: Compiled recipe; its ``name`` and ``encoding`` are emitted.
        plan: Plan whose ids, recipe hash and steps are emitted.
        analysis_data: Dict of step id -> analysis results. Results are
            embedded as ``config["analysis_results"]`` for steps whose
            ``node_type`` is "ANALYZE" and whose id is present here.

    Returns:
        The plan as a JSON string indented by two spaces, encoded with
        PlanJSONEncoder.
    """
    steps = []
    for step in plan.steps:
        config = step.config
        # Embed analysis results for ANALYZE steps. A new dict is built so
        # that producing output never modifies the plan's own step config.
        if step.node_type == "ANALYZE" and step.step_id in analysis_data:
            config = {**config, "analysis_results": analysis_data[step.step_id]}
        steps.append({
            "step_id": step.step_id,
            "node_type": step.node_type,
            "config": config,
            "inputs": step.inputs,
            "level": step.level,
            "cache_id": step.cache_id,
        })

    output = {
        "plan_id": plan.plan_id,
        "recipe_id": compiled.name,
        "recipe_hash": plan.recipe_hash,
        "encoding": compiled.encoding,
        "output_step_id": plan.output_step_id,
        "steps": steps,
    }
    return json.dumps(output, indent=2, cls=PlanJSONEncoder)
def generate_sexp_output(compiled, plan, analysis_data):
    """Render the plan as a ``(plan ...)`` S-expression.

    The output contains the recipe name, version and plan id, the encoding
    when set, an ``(analysis ...)`` section when ``plan.analysis`` is
    non-empty, one ``(step ...)`` form per plan step, and the output step id.
    ``analysis_data`` is accepted but not read by this function.
    """
    out = [
        f'(plan "{compiled.name}"',
        f' :version "{compiled.version}"',
        f' :plan-id "{plan.plan_id}"',
    ]

    if compiled.encoding:
        out.append(f' :encoding {to_sexp(compiled.encoding)}')

    # Analysis data for effect parameter bindings; every time/value is written out
    if plan.analysis:
        out.extend(['', ' (analysis'])
        for name, data in plan.analysis.items():
            times = data.get("times", [])
            values = data.get("values", [])
            times_str = " ".join(map(str, times))
            values_str = " ".join(map(str, values))
            out.extend([
                f' ({name}',
                f' :times ({times_str})',
                f' :values ({values_str}))',
            ])
        out.append(' )')

    out.append('')

    for step in plan.steps:
        out.extend([
            f' (step "{step.step_id}"',
            f' :type {step.node_type}',
            f' :level {step.level}',
            f' :cache "{step.cache_id}"',
        ])
        if step.inputs:
            inputs_str = " ".join(f'"{i}"' for i in step.inputs)
            out.append(f' :inputs ({inputs_str})')
        # Config keys are emitted as keywords, underscores turned into hyphens
        out.extend(
            f' :{key.replace("_", "-")} {to_sexp(value)}'
            for key, value in step.config.items()
        )
        out.append(' )')

    out.extend(['', f' :output "{plan.output_step_id}")'])
    return '\n'.join(out)
def generate_text_output(compiled, plan, analysis_data):
    """Render the plan as a human-readable text report.

    Ids are shortened to their first 16 characters followed by "...".
    A config entry named "analysis_results" is summarised by the number of
    entries under its "times" key instead of being printed in full.
    ``analysis_data`` is accepted but not read by this function.
    """
    report = [f"Recipe: {compiled.name} v{compiled.version}"]
    if compiled.encoding:
        report.append(f"Encoding: {compiled.encoding}")

    report.append(f"\nPlan ID: {plan.plan_id}")
    report.append(f"Output: {plan.output_step_id[:16]}...")
    report.append(f"\nSteps ({len(plan.steps)}):")
    report.append("-" * 60)

    for step in plan.steps:
        report.extend([
            f"\n[{step.level}] {step.node_type}",
            f" id: {step.step_id[:16]}...",
            f" cache: {step.cache_id[:16]}...",
        ])
        if step.inputs:
            report.append(f" inputs: {[i[:16] + '...' for i in step.inputs]}")
        for key, value in step.config.items():
            if key != "analysis_results":
                report.append(f" {key}: {value}")
                continue
            report.append(f" {key}: <{len(value.get('times', []))} times>")

    return '\n'.join(report)
def parse_param(param_str: str) -> tuple:
    """Parse a key=value parameter string.

    The string is split on the first "="; both sides are stripped of
    surrounding whitespace. The value is converted to int if possible,
    otherwise to float if possible, otherwise kept as a string.

    Args:
        param_str: String in format "key=value"

    Returns:
        Tuple of (key, parsed_value)

    Raises:
        ValueError: If there is no "=" in ``param_str`` or the key is empty.
    """
    key, separator, value = param_str.partition("=")
    if not separator:
        raise ValueError(f"Invalid parameter format: {param_str} (expected key=value)")

    key = key.strip()
    value = value.strip()

    # A parameter needs a name; "=5" would otherwise bind the value to "".
    if not key:
        raise ValueError(f"Invalid parameter format: {param_str} (expected key=value)")

    # Try the numeric conversions in order of strictness: int first, then float
    for convert in (int, float):
        try:
            return (key, convert(value))
        except ValueError:
            continue

    # Not numeric: keep the raw string
    return (key, value)
if __name__ == "__main__":
    import argparse

    # Command-line entry point: validate the paths, collect -p parameters,
    # pick the output format and hand everything to plan_recipe().
    parser = argparse.ArgumentParser(description="Generate execution plan from recipe")
    parser.add_argument("recipe", type=Path, help="Recipe file (.sexp)")
    parser.add_argument("-o", "--output", type=Path, help="Output file (default: stdout)")
    parser.add_argument("-a", "--analysis", type=Path, help="Pre-computed analysis file (.sexp)")
    parser.add_argument("-p", "--param", action="append", dest="params", metavar="KEY=VALUE",
                        help="Set recipe parameter (can be used multiple times)")
    parser.add_argument("--json", action="store_true", help="Output JSON format")
    parser.add_argument("--text", action="store_true", help="Output human-readable text format")
    # The usage text at the top of this file invokes the script with --sexp,
    # so the flag must be accepted even though S-expression output is
    # already the default.
    parser.add_argument("--sexp", action="store_true", help="Output S-expression format (default)")
    args = parser.parse_args()

    if not args.recipe.exists():
        print(f"Recipe not found: {args.recipe}", file=sys.stderr)
        sys.exit(1)

    # "-" means the analysis is read from stdin, so there is no file to check
    if args.analysis and str(args.analysis) != "-" and not args.analysis.exists():
        print(f"Analysis file not found: {args.analysis}", file=sys.stderr)
        sys.exit(1)

    # Parse parameters
    params = {}
    if args.params:
        for param_str in args.params:
            try:
                key, value = parse_param(param_str)
                params[key] = value
            except ValueError as e:
                print(f"Error: {e}", file=sys.stderr)
                sys.exit(1)

    # Format precedence is unchanged: --json, then --text, otherwise sexp
    if args.json:
        fmt = "json"
    elif args.text:
        fmt = "text"
    else:
        fmt = "sexp"

    plan_recipe(args.recipe, fmt, args.output, args.analysis, params or None)