#!/usr/bin/env python3
"""
Run analyzers from a recipe and output analysis data as S-expressions.

Usage:
    analyze.py recipe.sexp [-o analysis.sexp]

Output format (one entry per ANALYZE node, keyed by its quoted node ID):
    (analysis
      ("<node-id>"
        :tempo 120.5
        :times (0.0 0.5 1.0 1.5 ...)
        :duration 10.0)
      ("<node-id>"
        :times (0.0 0.1 0.2 ...)
        :values (0.5 0.8 0.3 ...)))
"""

import argparse
import importlib.util
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Optional

# Add artdag to path
sys.path.insert(0, str(Path(__file__).parent.parent / "artdag"))

from artdag.sexp import compile_string, parse
from artdag.sexp.parser import Symbol, Keyword, serialize

# File suffixes treated as audio-only sources when cutting segments.
_AUDIO_SUFFIXES = ('.mp3', '.wav', '.flac', '.aac', '.ogg', '.m4a')


def load_analyzer(analyzer_path: Path):
    """Load an analyzer module from a file path.

    Args:
        analyzer_path: Path to the Python source file of the analyzer.

    Returns:
        The executed module object.

    Raises:
        ImportError: If no import spec/loader can be built for the path
            (for example, an unrecognised file extension).
    """
    spec = importlib.util.spec_from_file_location("analyzer", analyzer_path)
    if spec is None or spec.loader is None:
        # Without this guard the failure would surface as an opaque
        # AttributeError on None.
        raise ImportError(f"Cannot load analyzer module from {analyzer_path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


def run_analyzer(analyzer_path: Path, input_path: Path, params: dict) -> dict:
    """Load the analyzer at ``analyzer_path`` and run it on ``input_path``.

    Args:
        analyzer_path: Path to the analyzer's Python source file.
        input_path: Media file handed to the analyzer.
        params: Extra configuration passed through to the analyzer.

    Returns:
        Whatever the module's ``analyze(input_path, params)`` returns.
    """
    analyzer = load_analyzer(analyzer_path)
    return analyzer.analyze(input_path, params)


def pre_execute_segment(source_path: Path, start: float, duration: float, work_dir: Path) -> Path:
    """Cut a segment out of ``source_path`` with ffmpeg so it can be analyzed.

    Args:
        source_path: Input media file.
        start: Offset in seconds; a falsy value means "from the beginning".
        duration: Length in seconds; a falsy value means "to the end".
        work_dir: Existing directory that receives the output file.

    Returns:
        Path of the written segment: ``segment.m4a`` for audio sources,
        ``segment.mp4`` otherwise.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    is_audio = source_path.suffix.lower() in _AUDIO_SUFFIXES
    output_ext = ".m4a" if is_audio else ".mp4"
    output_path = work_dir / f"segment{output_ext}"

    cmd = ["ffmpeg", "-y", "-i", str(source_path)]
    if start:
        cmd.extend(["-ss", str(start)])
    if duration:
        cmd.extend(["-t", str(duration)])

    if is_audio:
        cmd.extend(["-c:a", "aac", str(output_path)])
    else:
        cmd.extend(["-c:v", "libx264", "-preset", "fast", "-crf", "18",
                    "-c:a", "aac", str(output_path)])

    try:
        subprocess.run(cmd, check=True, capture_output=True)
    except subprocess.CalledProcessError as err:
        # Output is captured, so surface ffmpeg's diagnostics before
        # propagating; otherwise the reason for the failure is lost.
        if err.stderr:
            print(err.stderr.decode(errors="replace"), file=sys.stderr)
        raise
    return output_path


def to_sexp(value, indent=0):
    """Convert a Python value to an S-expression string.

    Dicts become ``(:key value ...)`` with underscores in keys replaced by
    dashes; lists and tuples become ``(item ...)``; strings are double-quoted
    with backslashes and double quotes escaped; booleans become
    ``true``/``false``; ``None`` becomes ``nil``; floats are formatted with
    six significant digits; anything else goes through ``str()``.

    Args:
        value: The value to serialise.
        indent: Unused; kept for interface compatibility.

    Returns:
        The S-expression text.
    """
    if isinstance(value, dict):
        if not value:
            return "()"
        items = []
        for k, v in value.items():
            key = str(k).replace('_', '-')
            items.append(f":{key} {to_sexp(v)}")
        return "(" + " ".join(items) + ")"
    if isinstance(value, (list, tuple)):
        if not value:
            return "()"
        return "(" + " ".join(to_sexp(v) for v in value) + ")"
    if isinstance(value, str):
        # Escape backslash first so the escapes added for quotes survive.
        # NOTE(review): assumes the consuming parser understands \\ and \"
        # escapes inside string literals -- confirm against artdag.sexp.
        escaped = value.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'
    if isinstance(value, bool):
        # Must precede the generic fallback: str(True) would give "True".
        return "true" if value else "false"
    if value is None:
        return "nil"
    if isinstance(value, float):
        return f"{value:.6g}"
    return str(value)


def _format_analysis(analysis_results: dict) -> str:
    """Render ``{node_id: {key: value}}`` as the top-level ``(analysis ...)`` form."""
    lines = ["(analysis"]
    for name, data in analysis_results.items():
        # Quote node IDs to prevent parser treating hex like "0e42..." as scientific notation
        lines.append(f' ("{name}"')
        for key, value in data.items():
            sexp_key = key.replace('_', '-')
            sexp_value = to_sexp(value)
            lines.append(f" :{sexp_key} {sexp_value}")
        lines.append(" )")
    lines.append(")")
    return "\n".join(lines)


def analyze_recipe(recipe_path: Path, output_file: Optional[Path] = None):
    """Run all analyzers in a recipe and output S-expression analysis data.

    The recipe is compiled, every ANALYZE node is located, its first input is
    resolved to a file (SOURCE nodes relative to the recipe directory, SEGMENT
    nodes by cutting the input with ffmpeg into a temp directory), and the
    analyzer module named by ``analyzer_path`` is executed on it.

    Args:
        recipe_path: Path to the recipe file.
        output_file: Where to write the result; stdout when ``None``.
    """
    recipe_text = recipe_path.read_text()
    recipe_dir = recipe_path.parent

    print(f"Compiling: {recipe_path}", file=sys.stderr)
    compiled = compile_string(recipe_text)
    print(f"Recipe: {compiled.name} v{compiled.version}", file=sys.stderr)

    # Find all ANALYZE nodes and their dependencies
    nodes_by_id = {n["id"]: n for n in compiled.nodes}

    # Track source paths and segment outputs
    source_paths = {}
    segment_outputs = {}
    analysis_results = {}

    # Intentionally left on disk for debugging; its location is printed at the end.
    work_dir = Path(tempfile.mkdtemp(prefix="artdag_analyze_"))

    def get_input_path(node_id: str) -> Optional[Path]:
        """Resolve the input path for a node, or None if it cannot be resolved."""
        if node_id in segment_outputs:
            return segment_outputs[node_id]
        if node_id in source_paths:
            return source_paths[node_id]

        node = nodes_by_id.get(node_id)
        if not node:
            return None

        if node["type"] == "SOURCE":
            path = recipe_dir / node["config"].get("path", "")
            source_paths[node_id] = path.resolve()
            return source_paths[node_id]

        if node["type"] == "SEGMENT":
            inputs = node.get("inputs", [])
            if inputs:
                input_path = get_input_path(inputs[0])
                if input_path:
                    config = node.get("config", {})
                    start = config.get("start", 0)
                    duration = config.get("duration")
                    # pre_execute_segment always writes "segment.<ext>", so
                    # each segment needs its own directory: otherwise later
                    # segments overwrite cached earlier ones, and a segment
                    # of a segment would read and write the same file.
                    segment_dir = work_dir / f"segment_{len(segment_outputs):03d}"
                    segment_dir.mkdir(parents=True, exist_ok=True)
                    output = pre_execute_segment(input_path, start, duration, segment_dir)
                    segment_outputs[node_id] = output
                    return output

        return None

    # Find and run all analyzers
    for node in compiled.nodes:
        if node["type"] != "ANALYZE":
            continue

        config = node.get("config", {})
        analyzer_name = config.get("analyzer", "unknown")
        analyzer_path = config.get("analyzer_path")

        if not analyzer_path:
            print(f" Skipping {analyzer_name}: no path", file=sys.stderr)
            continue

        # Get input
        inputs = node.get("inputs", [])
        if not inputs:
            print(f" Skipping {analyzer_name}: no inputs", file=sys.stderr)
            continue

        input_path = get_input_path(inputs[0])
        if not input_path or not input_path.exists():
            print(f" Skipping {analyzer_name}: input not found", file=sys.stderr)
            continue

        # Run analyzer
        full_path = recipe_dir / analyzer_path
        params = {k: v for k, v in config.items()
                  if k not in ("analyzer", "analyzer_path", "cid")}

        print(f" Running analyzer: {analyzer_name}", file=sys.stderr)
        results = run_analyzer(full_path, input_path, params)

        # Store by node ID for uniqueness (multiple analyzers may have same type)
        node_id = node.get("id")
        analysis_results[node_id] = results

        times = results.get("times", [])
        # An explicit None tempo must not crash the progress line.
        tempo = results.get('tempo') or 0
        print(f" {len(times)} times @ {tempo:.1f} BPM", file=sys.stderr)

    # Generate S-expression output
    output = _format_analysis(analysis_results)

    if output_file:
        output_file.write_text(output)
        print(f"\nAnalysis written to: {output_file}", file=sys.stderr)
    else:
        print(output)

    print(f"Debug: temp files in {work_dir}", file=sys.stderr)


def main():
    """Parse command-line arguments and run the recipe analysis."""
    parser = argparse.ArgumentParser(description="Run analyzers from recipe")
    parser.add_argument("recipe", type=Path, help="Recipe file (.sexp)")
    parser.add_argument("-o", "--output", type=Path, help="Output file (default: stdout)")
    args = parser.parse_args()

    if not args.recipe.exists():
        print(f"Recipe not found: {args.recipe}", file=sys.stderr)
        sys.exit(1)

    analyze_recipe(args.recipe, args.output)


if __name__ == "__main__":
    main()