""" Tests for DAG transformation and input binding. These tests verify the critical path that was causing bugs: - Node transformation from compiled format to artdag format - Asset/effect CID resolution from registry - Variable input name mapping - Input binding """ import json import logging import pytest from typing import Any, Dict, List, Optional, Tuple # Standalone implementations of the functions for testing # This avoids importing the full app which requires external dependencies logger = logging.getLogger(__name__) def is_variable_input(config: Dict[str, Any]) -> bool: """Check if a SOURCE node config represents a variable input.""" return bool(config.get("input")) def transform_node( node: Dict[str, Any], assets: Dict[str, Dict[str, Any]], effects: Dict[str, Dict[str, Any]], ) -> Dict[str, Any]: """Transform a compiled node to artdag execution format.""" node_id = node.get("id", "") config = dict(node.get("config", {})) if node.get("type") == "SOURCE" and "asset" in config: asset_name = config["asset"] if asset_name in assets: config["cid"] = assets[asset_name].get("cid") if node.get("type") == "EFFECT" and "effect" in config: effect_name = config["effect"] if effect_name in effects: config["cid"] = effects[effect_name].get("cid") return { "node_id": node_id, "node_type": node.get("type", "EFFECT"), "config": config, "inputs": node.get("inputs", []), "name": node.get("name"), } def build_input_name_mapping(nodes: Dict[str, Dict[str, Any]]) -> Dict[str, str]: """Build a mapping from input names to node IDs for variable inputs.""" input_name_to_node: Dict[str, str] = {} for node_id, node in nodes.items(): if node.get("node_type") != "SOURCE": continue config = node.get("config", {}) if not is_variable_input(config): continue input_name_to_node[node_id] = node_id name = config.get("name") if name: input_name_to_node[name] = node_id input_name_to_node[name.lower().replace(" ", "_")] = node_id input_name_to_node[name.lower().replace(" ", "-")] = node_id node_name = node.get("name") if node_name: input_name_to_node[node_name] = node_id input_name_to_node[node_name.replace("-", "_")] = node_id return input_name_to_node def bind_inputs( nodes: Dict[str, Dict[str, Any]], input_name_to_node: Dict[str, str], user_inputs: Dict[str, str], ) -> List[str]: """Bind user-provided input CIDs to source nodes.""" warnings: List[str] = [] for input_name, cid in user_inputs.items(): if input_name in nodes: node = nodes[input_name] if node.get("node_type") == "SOURCE": node["config"]["cid"] = cid continue if input_name in input_name_to_node: node_id = input_name_to_node[input_name] node = nodes[node_id] node["config"]["cid"] = cid continue warnings.append(f"Input '{input_name}' not found in recipe") return warnings def prepare_dag_for_execution( recipe: Dict[str, Any], user_inputs: Dict[str, str], ) -> Tuple[str, List[str]]: """Prepare a recipe DAG for execution.""" recipe_dag = recipe.get("dag") if not recipe_dag or not isinstance(recipe_dag, dict): raise ValueError("Recipe has no DAG definition") dag_copy = json.loads(json.dumps(recipe_dag)) nodes = dag_copy.get("nodes", {}) registry = recipe.get("registry", {}) assets = registry.get("assets", {}) if registry else {} effects = registry.get("effects", {}) if registry else {} if isinstance(nodes, list): nodes_dict: Dict[str, Dict[str, Any]] = {} for node in nodes: node_id = node.get("id") if node_id: nodes_dict[node_id] = transform_node(node, assets, effects) nodes = nodes_dict dag_copy["nodes"] = nodes input_name_to_node = build_input_name_mapping(nodes) warnings = bind_inputs(nodes, input_name_to_node, user_inputs) if "output" in dag_copy: dag_copy["output_id"] = dag_copy.pop("output") if "metadata" not in dag_copy: dag_copy["metadata"] = {} return json.dumps(dag_copy), warnings class TestTransformNode: """Tests for transform_node function.""" def test_source_node_with_asset_resolves_cid( self, sample_registry: Dict[str, Dict[str, Any]], ) -> None: """SOURCE nodes with asset reference should get CID from registry.""" node = { "id": "source_1", "type": "SOURCE", "config": {"asset": "cat"}, "inputs": [], } assets = sample_registry["assets"] effects = sample_registry["effects"] result = transform_node(node, assets, effects) assert result["node_id"] == "source_1" assert result["node_type"] == "SOURCE" assert result["config"]["cid"] == "QmXrj6tSSn1vQXxxEY2Tyoudvt4CeeqR9gGQwSt7WFrhMZ" def test_effect_node_resolves_cid( self, sample_registry: Dict[str, Dict[str, Any]], ) -> None: """EFFECT nodes should get CID from registry.""" node = { "id": "effect_1", "type": "EFFECT", "config": {"effect": "invert", "intensity": 1.0}, "inputs": ["source_1"], } assets = sample_registry["assets"] effects = sample_registry["effects"] result = transform_node(node, assets, effects) assert result["node_id"] == "effect_1" assert result["node_type"] == "EFFECT" assert result["config"]["effect"] == "invert" assert result["config"]["cid"] == "QmPWaW5E5WFrmDjT6w8enqvtJhM8c5jvQu7XN1doHA3Z7J" assert result["config"]["intensity"] == 1.0 def test_variable_input_node_preserves_config( self, sample_registry: Dict[str, Dict[str, Any]], ) -> None: """Variable input SOURCE nodes should preserve their config.""" node = { "id": "source_2", "type": "SOURCE", "config": { "input": True, "name": "Second Video", "description": "User-provided video", }, "inputs": [], "name": "second-video", } assets = sample_registry["assets"] effects = sample_registry["effects"] result = transform_node(node, assets, effects) assert result["config"]["input"] is True assert result["config"]["name"] == "Second Video" assert result["name"] == "second-video" # No CID yet - will be bound at runtime assert "cid" not in result["config"] def test_unknown_asset_not_resolved( self, sample_registry: Dict[str, Dict[str, Any]], ) -> None: """Unknown assets should not crash, just not get CID.""" node = { "id": "source_1", "type": "SOURCE", "config": {"asset": "unknown_asset"}, "inputs": [], } assets = sample_registry["assets"] effects = sample_registry["effects"] result = transform_node(node, assets, effects) assert "cid" not in result["config"] class TestBuildInputNameMapping: """Tests for build_input_name_mapping function.""" def test_maps_by_node_id(self) -> None: """Should map by node_id.""" nodes = { "source_2": { "node_id": "source_2", "node_type": "SOURCE", "config": {"input": True, "name": "Test"}, "inputs": [], } } mapping = build_input_name_mapping(nodes) assert mapping["source_2"] == "source_2" def test_maps_by_config_name(self) -> None: """Should map by config.name with various formats.""" nodes = { "source_2": { "node_id": "source_2", "node_type": "SOURCE", "config": {"input": True, "name": "Second Video"}, "inputs": [], } } mapping = build_input_name_mapping(nodes) assert mapping["Second Video"] == "source_2" assert mapping["second_video"] == "source_2" assert mapping["second-video"] == "source_2" def test_maps_by_def_name(self) -> None: """Should map by node.name (def binding name).""" nodes = { "source_2": { "node_id": "source_2", "node_type": "SOURCE", "config": {"input": True, "name": "Second Video"}, "inputs": [], "name": "inverted-video", } } mapping = build_input_name_mapping(nodes) assert mapping["inverted-video"] == "source_2" assert mapping["inverted_video"] == "source_2" def test_ignores_non_source_nodes(self) -> None: """Should not include non-SOURCE nodes.""" nodes = { "effect_1": { "node_id": "effect_1", "node_type": "EFFECT", "config": {"effect": "invert"}, "inputs": [], } } mapping = build_input_name_mapping(nodes) assert "effect_1" not in mapping def test_ignores_fixed_sources(self) -> None: """Should not include SOURCE nodes without 'input' flag.""" nodes = { "source_1": { "node_id": "source_1", "node_type": "SOURCE", "config": {"asset": "cat", "cid": "Qm123"}, "inputs": [], } } mapping = build_input_name_mapping(nodes) assert "source_1" not in mapping class TestBindInputs: """Tests for bind_inputs function.""" def test_binds_by_direct_node_id(self) -> None: """Should bind when using node_id directly.""" nodes = { "source_2": { "node_id": "source_2", "node_type": "SOURCE", "config": {"input": True, "name": "Test"}, "inputs": [], } } mapping = {"source_2": "source_2"} user_inputs = {"source_2": "QmUserInput123"} warnings = bind_inputs(nodes, mapping, user_inputs) assert nodes["source_2"]["config"]["cid"] == "QmUserInput123" assert len(warnings) == 0 def test_binds_by_name_lookup(self) -> None: """Should bind when using input name.""" nodes = { "source_2": { "node_id": "source_2", "node_type": "SOURCE", "config": {"input": True, "name": "Second Video"}, "inputs": [], } } mapping = { "source_2": "source_2", "Second Video": "source_2", "second-video": "source_2", } user_inputs = {"second-video": "QmUserInput123"} warnings = bind_inputs(nodes, mapping, user_inputs) assert nodes["source_2"]["config"]["cid"] == "QmUserInput123" assert len(warnings) == 0 def test_warns_on_unknown_input(self) -> None: """Should warn when input name not found.""" nodes = { "source_2": { "node_id": "source_2", "node_type": "SOURCE", "config": {"input": True, "name": "Test"}, "inputs": [], } } mapping = {"source_2": "source_2", "Test": "source_2"} user_inputs = {"unknown-input": "QmUserInput123"} warnings = bind_inputs(nodes, mapping, user_inputs) assert len(warnings) == 1 assert "unknown-input" in warnings[0] class TestRegressions: """Tests for specific bugs that were found in production.""" def test_effect_cid_key_not_effect_hash( self, sample_registry: Dict[str, Dict[str, Any]], ) -> None: """ Regression test: Effect CID must use 'cid' key, not 'effect_hash'. Bug found 2026-01-12: The transform_node function was setting config["effect_hash"] but the executor looks for config["cid"]. This caused "Unknown effect: invert" errors. """ node = { "id": "effect_1", "type": "EFFECT", "config": {"effect": "invert"}, "inputs": ["source_1"], } assets = sample_registry["assets"] effects = sample_registry["effects"] result = transform_node(node, assets, effects) # MUST use 'cid' key - executor checks config.get("cid") assert "cid" in result["config"], "Effect CID must be stored as 'cid'" assert "effect_hash" not in result["config"], "Must not use 'effect_hash' key" assert result["config"]["cid"] == "QmPWaW5E5WFrmDjT6w8enqvtJhM8c5jvQu7XN1doHA3Z7J" def test_source_cid_binding_persists( self, sample_recipe: Dict[str, Any], ) -> None: """ Regression test: Bound CIDs must appear in final DAG JSON. Bug found 2026-01-12: Variable input CIDs were being bound but not appearing in the serialized DAG sent to the executor. """ user_inputs = {"second-video": "QmTestUserInput123"} dag_json, _ = prepare_dag_for_execution(sample_recipe, user_inputs) dag = json.loads(dag_json) # The bound CID must be in the final JSON source_2 = dag["nodes"]["source_2"] assert source_2["config"]["cid"] == "QmTestUserInput123" class TestPrepareDagForExecution: """Integration tests for prepare_dag_for_execution.""" def test_full_pipeline(self, sample_recipe: Dict[str, Any]) -> None: """Test the full DAG preparation pipeline.""" user_inputs = { "second-video": "QmS4885aRikrjDB4yHPg9yTiPcBFWadZKVfAEvUy7B32zS" } dag_json, warnings = prepare_dag_for_execution(sample_recipe, user_inputs) # Parse result dag = json.loads(dag_json) # Check structure assert "nodes" in dag assert "output_id" in dag assert dag["output_id"] == "sequence_1" # Check fixed source has CID source_1 = dag["nodes"]["source_1"] assert source_1["config"]["cid"] == "QmXrj6tSSn1vQXxxEY2Tyoudvt4CeeqR9gGQwSt7WFrhMZ" # Check variable input was bound source_2 = dag["nodes"]["source_2"] assert source_2["config"]["cid"] == "QmS4885aRikrjDB4yHPg9yTiPcBFWadZKVfAEvUy7B32zS" # Check effects have CIDs effect_1 = dag["nodes"]["effect_1"] assert effect_1["config"]["cid"] == "QmcWhw6wbHr1GDmorM2KDz8S3yCGTfjuyPR6y8khS2tvko" effect_2 = dag["nodes"]["effect_2"] assert effect_2["config"]["cid"] == "QmPWaW5E5WFrmDjT6w8enqvtJhM8c5jvQu7XN1doHA3Z7J" assert effect_2["config"]["intensity"] == 1.0 # No warnings assert len(warnings) == 0 def test_missing_input_produces_warning( self, sample_recipe: Dict[str, Any], ) -> None: """Missing inputs should produce warnings but not fail.""" user_inputs = {} # No inputs provided dag_json, warnings = prepare_dag_for_execution(sample_recipe, user_inputs) # Should still produce valid JSON dag = json.loads(dag_json) assert "nodes" in dag # Variable input should not have CID source_2 = dag["nodes"]["source_2"] assert "cid" not in source_2["config"] def test_raises_on_missing_dag(self) -> None: """Should raise ValueError if recipe has no DAG.""" recipe = {"name": "broken", "registry": {}} with pytest.raises(ValueError, match="no DAG"): prepare_dag_for_execution(recipe, {})