From 56009c391deb78b3e16e6dce4151bc9dd583efd7 Mon Sep 17 00:00:00 2001 From: gilesb Date: Mon, 12 Jan 2026 09:37:06 +0000 Subject: [PATCH] Add testing infrastructure and refactor DAG transformation Testing setup: - Add pyproject.toml with mypy and pytest configuration - Add requirements-dev.txt for development dependencies - Create tests/ directory with test fixtures - Add 17 unit tests for DAG transformation pipeline Type annotations: - Add app/types.py with TypedDict definitions for node configs - Add typed helper functions: transform_node, build_input_name_mapping, bind_inputs, prepare_dag_for_execution - Refactor run_recipe to use the new typed helpers Regression tests for today's bugs: - test_effect_cid_key_not_effect_hash: Verifies CID uses 'cid' key - test_source_cid_binding_persists: Verifies bound CIDs in final DAG Run tests with: pytest tests/ -v Co-Authored-By: Claude Opus 4.5 --- app/routers/recipes.py | 293 +++++++++++++-------- app/types.py | 160 ++++++++++++ pyproject.toml | 51 ++++ requirements-dev.txt | 16 ++ tests/__init__.py | 1 + tests/conftest.py | 93 +++++++ tests/test_dag_transform.py | 492 ++++++++++++++++++++++++++++++++++++ 7 files changed, 996 insertions(+), 110 deletions(-) create mode 100644 app/types.py create mode 100644 pyproject.toml create mode 100644 requirements-dev.txt create mode 100644 tests/__init__.py create mode 100644 tests/conftest.py create mode 100644 tests/test_dag_transform.py diff --git a/app/routers/recipes.py b/app/routers/recipes.py index 34578d7..45b3b4a 100644 --- a/app/routers/recipes.py +++ b/app/routers/recipes.py @@ -4,8 +4,9 @@ Recipe management routes for L1 server. Handles recipe upload, listing, viewing, and execution. 
""" +import json import logging -from typing import List, Optional +from typing import Any, Dict, List, Optional, Tuple from fastapi import APIRouter, Request, Depends, HTTPException, UploadFile, File from fastapi.responses import HTMLResponse @@ -18,6 +19,10 @@ from artdag_common.middleware.auth import UserContext from ..dependencies import require_auth, get_templates, get_redis_client, get_cache_manager from ..services.auth_service import AuthService from ..services.recipe_service import RecipeService +from ..types import ( + CompiledNode, TransformedNode, Registry, Recipe, + is_variable_input, get_effect_cid, +) router = APIRouter() logger = logging.getLogger(__name__) @@ -30,14 +35,184 @@ class RecipeUploadRequest(BaseModel): class RecipeRunRequest(BaseModel): - inputs: dict = {} + """Request to run a recipe with variable inputs.""" + inputs: Dict[str, str] = {} # Map input names to CIDs -def get_recipe_service(): +def get_recipe_service() -> RecipeService: """Get recipe service instance.""" return RecipeService(get_redis_client(), get_cache_manager()) +def transform_node( + node: CompiledNode, + assets: Dict[str, Dict[str, Any]], + effects: Dict[str, Dict[str, Any]], +) -> TransformedNode: + """ + Transform a compiled node to artdag execution format. 
+ + - Resolves asset references to CIDs for SOURCE nodes + - Resolves effect references to CIDs for EFFECT nodes + - Renames 'type' to 'node_type', 'id' to 'node_id' + """ + node_id = node.get("id", "") + config = dict(node.get("config", {})) # Copy to avoid mutation + + # Resolve asset references for SOURCE nodes + if node.get("type") == "SOURCE" and "asset" in config: + asset_name = config["asset"] + if asset_name in assets: + config["cid"] = assets[asset_name].get("cid") + + # Resolve effect references for EFFECT nodes + if node.get("type") == "EFFECT" and "effect" in config: + effect_name = config["effect"] + if effect_name in effects: + config["cid"] = effects[effect_name].get("cid") + + return { + "node_id": node_id, + "node_type": node.get("type", "EFFECT"), + "config": config, + "inputs": node.get("inputs", []), + "name": node.get("name"), + } + + +def build_input_name_mapping( + nodes: Dict[str, TransformedNode], +) -> Dict[str, str]: + """ + Build a mapping from input names to node IDs for variable inputs. 
+ + Variable inputs can be referenced by: + - node_id directly + - config.name (e.g., "Second Video") + - snake_case version (e.g., "second_video") + - kebab-case version (e.g., "second-video") + - node.name (def binding name) + """ + input_name_to_node: Dict[str, str] = {} + + for node_id, node in nodes.items(): + if node.get("node_type") != "SOURCE": + continue + + config = node.get("config", {}) + if not is_variable_input(config): + continue + + # Map by node_id + input_name_to_node[node_id] = node_id + + # Map by config.name + name = config.get("name") + if name: + input_name_to_node[name] = node_id + input_name_to_node[name.lower().replace(" ", "_")] = node_id + input_name_to_node[name.lower().replace(" ", "-")] = node_id + + # Map by node.name (def binding) + node_name = node.get("name") + if node_name: + input_name_to_node[node_name] = node_id + input_name_to_node[node_name.replace("-", "_")] = node_id + + return input_name_to_node + + +def bind_inputs( + nodes: Dict[str, TransformedNode], + input_name_to_node: Dict[str, str], + user_inputs: Dict[str, str], +) -> List[str]: + """ + Bind user-provided input CIDs to source nodes. + + Returns list of warnings for inputs that couldn't be bound. 
+ """ + warnings: List[str] = [] + + for input_name, cid in user_inputs.items(): + # Try direct node ID match first + if input_name in nodes: + node = nodes[input_name] + if node.get("node_type") == "SOURCE": + node["config"]["cid"] = cid + logger.info(f"Bound input {input_name} directly to node, cid={cid[:16]}...") + continue + + # Try input name lookup + if input_name in input_name_to_node: + node_id = input_name_to_node[input_name] + node = nodes[node_id] + node["config"]["cid"] = cid + logger.info(f"Bound input {input_name} via lookup to node {node_id}, cid={cid[:16]}...") + continue + + # Input not found + warnings.append(f"Input '{input_name}' not found in recipe") + logger.warning(f"Input {input_name} not found in nodes or input_name_to_node") + + return warnings + + +def prepare_dag_for_execution( + recipe: Recipe, + user_inputs: Dict[str, str], +) -> Tuple[str, List[str]]: + """ + Prepare a recipe DAG for execution by transforming nodes and binding inputs. + + Returns (dag_json, warnings). 
+ """ + recipe_dag = recipe.get("dag") + if not recipe_dag or not isinstance(recipe_dag, dict): + raise ValueError("Recipe has no DAG definition") + + # Deep copy to avoid mutating original + dag_copy = json.loads(json.dumps(recipe_dag)) + nodes = dag_copy.get("nodes", {}) + + # Get registry for resolving references + registry = recipe.get("registry", {}) + assets = registry.get("assets", {}) if registry else {} + effects = registry.get("effects", {}) if registry else {} + + # Transform nodes from list to dict if needed + if isinstance(nodes, list): + nodes_dict: Dict[str, TransformedNode] = {} + for node in nodes: + node_id = node.get("id") + if node_id: + nodes_dict[node_id] = transform_node(node, assets, effects) + nodes = nodes_dict + dag_copy["nodes"] = nodes + + # Build input name mapping and bind user inputs + input_name_to_node = build_input_name_mapping(nodes) + logger.info(f"Input name to node mapping: {input_name_to_node}") + logger.info(f"User-provided inputs: {user_inputs}") + + warnings = bind_inputs(nodes, input_name_to_node, user_inputs) + + # Log final SOURCE node configs for debugging + for nid, n in nodes.items(): + if n.get("node_type") == "SOURCE": + logger.info(f"Final SOURCE node {nid}: config={n.get('config')}") + + # Transform output to output_id + if "output" in dag_copy: + dag_copy["output_id"] = dag_copy.pop("output") + + # Add metadata if not present + if "metadata" not in dag_copy: + dag_copy["metadata"] = {} + + return json.dumps(dag_copy), warnings + + @router.post("/upload") async def upload_recipe( file: UploadFile = File(...), @@ -320,117 +495,15 @@ async def run_recipe( raise HTTPException(404, "Recipe not found") try: - import json - # Create run using run service run_service = RunService(database, get_redis_client(), get_cache_manager()) - # If recipe has a DAG definition, bind inputs and convert to JSON - recipe_dag = recipe.get("dag") + # Prepare DAG for execution (transform nodes, bind inputs) dag_json = None - if recipe_dag 
and isinstance(recipe_dag, dict): - # Bind inputs to the DAG's source nodes - dag_copy = json.loads(json.dumps(recipe_dag)) # Deep copy - nodes = dag_copy.get("nodes", {}) - - # Get registry for resolving asset/effect references - registry = recipe.get("registry", {}) - assets = registry.get("assets", {}) - effects = registry.get("effects", {}) - - # Convert nodes from list to dict if needed, and transform to artdag format - if isinstance(nodes, list): - nodes_dict = {} - for node in nodes: - node_id = node.get("id") - if node_id: - config = node.get("config", {}) - - # Resolve asset references for SOURCE nodes - if node.get("type") == "SOURCE" and "asset" in config: - asset_name = config["asset"] - if asset_name in assets: - config["cid"] = assets[asset_name].get("cid") - - # Resolve effect references for EFFECT nodes - if node.get("type") == "EFFECT" and "effect" in config: - effect_name = config["effect"] - if effect_name in effects: - # Use "cid" - the executor looks for this field - config["cid"] = effects[effect_name].get("cid") - - # Transform to artdag format: type->node_type, id->node_id - transformed = { - "node_id": node_id, - "node_type": node.get("type", "EFFECT"), - "config": config, - "inputs": node.get("inputs", []), - "name": node.get("name"), - } - nodes_dict[node_id] = transformed - nodes = nodes_dict - dag_copy["nodes"] = nodes - - # Build lookup for variable inputs: map input names to node IDs - # Variable inputs can be referenced by: node_id, config.name, config.input (if string) - input_name_to_node = {} - for node_id, node in nodes.items(): - logger.debug(f"Checking node {node_id}: type={node.get('node_type')}, config={node.get('config')}") - if node.get("node_type") == "SOURCE": - config = node.get("config", {}) - # Only variable inputs (those with 'input' in config, not fixed assets) - if config.get("input"): - input_name_to_node[node_id] = node_id - # Map by config.name (e.g., "Second Video") - if config.get("name"): - name = 
config["name"] - input_name_to_node[name] = node_id - # Also allow snake_case version - input_name_to_node[name.lower().replace(" ", "_")] = node_id - input_name_to_node[name.lower().replace(" ", "-")] = node_id - # Map by node.name if available (def binding) - if node.get("name"): - input_name_to_node[node["name"]] = node_id - input_name_to_node[node["name"].replace("-", "_")] = node_id - - logger.info(f"Input name to node mapping: {input_name_to_node}") - logger.info(f"User-provided inputs: {req.inputs}") - - # Map user-provided input names to content hashes (for variable inputs) - for input_name, cid in req.inputs.items(): - # Try direct node ID match first - if input_name in nodes: - node = nodes[input_name] - if node.get("node_type") == "SOURCE": - if "config" not in node: - node["config"] = {} - node["config"]["cid"] = cid - logger.info(f"Bound input {input_name} directly to node, cid={cid[:16]}...") - # Try input name lookup - elif input_name in input_name_to_node: - node_id = input_name_to_node[input_name] - node = nodes[node_id] - if "config" not in node: - node["config"] = {} - node["config"]["cid"] = cid - logger.info(f"Bound input {input_name} via lookup to node {node_id}, cid={cid[:16]}...") - else: - logger.warning(f"Input {input_name} not found in nodes or input_name_to_node") - - # Log final DAG nodes for debugging - for nid, n in nodes.items(): - if n.get("node_type") == "SOURCE": - logger.info(f"Final SOURCE node {nid}: config={n.get('config')}") - - # Transform output to output_id - if "output" in dag_copy: - dag_copy["output_id"] = dag_copy.pop("output") - - # Add metadata if not present - if "metadata" not in dag_copy: - dag_copy["metadata"] = {} - - dag_json = json.dumps(dag_copy) + if recipe.get("dag"): + dag_json, warnings = prepare_dag_for_execution(recipe, req.inputs) + for warning in warnings: + logger.warning(warning) run, error = await run_service.create_run( recipe=recipe_id, # Use recipe hash as primary identifier diff --git 
"""
Type definitions for Art DAG L1 server.

Uses TypedDict for configuration structures to enable mypy checking.
"""

from typing import Any, Dict, List, Optional, TypedDict, Union

try:
    # PEP 655: NotRequired lives in the stdlib from Python 3.11, which is
    # this project's floor (pyproject.toml requires-python >= 3.11).
    # typing_extensions is kept only as a fallback for older interpreters.
    from typing import NotRequired
except ImportError:  # pragma: no cover - Python < 3.11 only
    from typing_extensions import NotRequired


# === Node Config Types ===

class SourceConfig(TypedDict, total=False):
    """Config for SOURCE nodes."""
    cid: str          # Content ID (IPFS CID or SHA3-256 hash)
    asset: str        # Asset name from registry
    input: bool       # True if this is a variable input
    name: str         # Human-readable name for variable inputs
    description: str  # Description for variable inputs


class EffectConfig(TypedDict, total=False):
    """Config for EFFECT nodes."""
    effect: str  # Effect name
    cid: str     # Effect CID (for cached/IPFS effects)
    # Effect parameters are additional keys
    intensity: float
    level: float


class SequenceConfig(TypedDict, total=False):
    """Config for SEQUENCE nodes."""
    transition: Dict[str, Any]  # Transition config


class SegmentConfig(TypedDict, total=False):
    """Config for SEGMENT nodes."""
    start: float
    end: float
    duration: float


# Union of all config types
NodeConfig = Union[SourceConfig, EffectConfig, SequenceConfig, SegmentConfig, Dict[str, Any]]


# === Node Types ===

class CompiledNode(TypedDict):
    """Node as produced by the S-expression compiler."""
    id: str
    type: str  # "SOURCE", "EFFECT", "SEQUENCE", etc.
    config: Dict[str, Any]
    inputs: List[str]
    # The compiler emits name=None for nodes without a def binding (see the
    # test fixtures), so the value type must be Optional — a plain `str`
    # here fails the strict mypy settings this repo enables.
    name: NotRequired[Optional[str]]


class TransformedNode(TypedDict):
    """Node after transformation for artdag execution."""
    node_id: str
    node_type: str
    config: Dict[str, Any]
    inputs: List[str]
    # transform_node copies node.get("name"), which may be None.
    name: NotRequired[Optional[str]]


# === DAG Types ===

class CompiledDAG(TypedDict):
    """DAG as produced by the S-expression compiler."""
    nodes: List[CompiledNode]
    output: str


class TransformedDAG(TypedDict):
    """DAG after transformation for artdag execution."""
    nodes: Dict[str, TransformedNode]
    output_id: str
    metadata: NotRequired[Dict[str, Any]]


# === Registry Types ===

class AssetEntry(TypedDict, total=False):
    """Asset in the recipe registry."""
    cid: str
    url: str


class EffectEntry(TypedDict, total=False):
    """Effect in the recipe registry."""
    cid: str
    url: str
    temporal: bool


class Registry(TypedDict):
    """Recipe registry containing assets and effects."""
    assets: Dict[str, AssetEntry]
    effects: Dict[str, EffectEntry]


# === Recipe Types ===

class Recipe(TypedDict, total=False):
    """Compiled recipe structure."""
    name: str
    version: str
    description: str
    owner: str
    registry: Registry
    dag: CompiledDAG
    recipe_id: str
    ipfs_cid: str
    sexp: str
    step_count: int
    error: str


# === API Request/Response Types ===

# Mapping of input names to CIDs for recipe execution. This was previously
# declared as an empty TypedDict (which types as "no keys allowed"); it is
# really an arbitrary str->str mapping, so a type alias is the correct form.
RecipeRunInputs = Dict[str, str]


class RunResult(TypedDict, total=False):
    """Result of a recipe run."""
    run_id: str
    status: str  # "pending", "running", "completed", "failed"
    recipe: str
    inputs: List[str]
    output_cid: str
    ipfs_cid: str
    error: str
    created_at: str
    completed_at: str


# === Helper functions for type narrowing ===

def is_source_node(node: TransformedNode) -> bool:
    """Check if node is a SOURCE node."""
    return node.get("node_type") == "SOURCE"


def is_effect_node(node: TransformedNode) -> bool:
    """Check if node is an EFFECT node."""
    return node.get("node_type") == "EFFECT"


def is_variable_input(config: Dict[str, Any]) -> bool:
    """Check if a SOURCE node config represents a variable input."""
    return bool(config.get("input"))


def get_effect_cid(config: Dict[str, Any]) -> Optional[str]:
    """Get effect CID from config, checking both 'cid' and 'hash' keys."""
    return config.get("cid") or config.get("hash")
"""
Pytest fixtures for art-celery tests.
"""

from typing import Any, Dict, List

import pytest


def _node(
    node_id: str,
    node_type: str,
    config: Dict[str, Any],
    inputs: List[str],
    name: Any = None,
) -> Dict[str, Any]:
    # Local helper: every compiled-node dict carries the same five keys.
    return {
        "id": node_id,
        "type": node_type,
        "config": config,
        "inputs": inputs,
        "name": name,
    }


@pytest.fixture
def sample_compiled_nodes() -> List[Dict[str, Any]]:
    """Sample nodes as produced by the S-expression compiler."""
    return [
        _node("source_1", "SOURCE", {"asset": "cat"}, []),
        _node(
            "source_2",
            "SOURCE",
            {
                "input": True,
                "name": "Second Video",
                "description": "A user-provided video",
            },
            [],
            name="second-video",
        ),
        _node("effect_1", "EFFECT", {"effect": "identity"}, ["source_1"]),
        _node("effect_2", "EFFECT", {"effect": "invert", "intensity": 1.0}, ["source_2"]),
        _node("sequence_1", "SEQUENCE", {}, ["effect_1", "effect_2"]),
    ]


@pytest.fixture
def sample_registry() -> Dict[str, Dict[str, Any]]:
    """Sample registry with assets and effects."""
    assets = {
        "cat": {
            "cid": "QmXrj6tSSn1vQXxxEY2Tyoudvt4CeeqR9gGQwSt7WFrhMZ",
            "url": "https://example.com/cat.jpg",
        },
    }
    effects = {
        "identity": {
            "cid": "QmcWhw6wbHr1GDmorM2KDz8S3yCGTfjuyPR6y8khS2tvko",
        },
        "invert": {
            "cid": "QmPWaW5E5WFrmDjT6w8enqvtJhM8c5jvQu7XN1doHA3Z7J",
        },
    }
    return {"assets": assets, "effects": effects}


@pytest.fixture
def sample_recipe(
    sample_compiled_nodes: List[Dict[str, Any]],
    sample_registry: Dict[str, Dict[str, Any]],
) -> Dict[str, Any]:
    """Sample compiled recipe."""
    return {
        "name": "test-recipe",
        "version": "1.0",
        "description": "A test recipe",
        "owner": "@test@example.com",
        "registry": sample_registry,
        "dag": {
            "nodes": sample_compiled_nodes,
            "output": "sequence_1",
        },
        "recipe_id": "Qmtest123",
    }
def is_variable_input(config: Dict[str, Any]) -> bool:
    """Return True when a SOURCE node's config marks it as a variable input."""
    return bool(config.get("input"))


def transform_node(
    node: Dict[str, Any],
    assets: Dict[str, Dict[str, Any]],
    effects: Dict[str, Dict[str, Any]],
) -> Dict[str, Any]:
    """Transform a compiled node into artdag execution format.

    Resolves registry references to CIDs and renames 'id'/'type' to
    'node_id'/'node_type'. The input node is left unmodified.
    """
    cfg = dict(node.get("config", {}))
    kind = node.get("type")

    if kind == "SOURCE" and cfg.get("asset") in assets:
        cfg["cid"] = assets[cfg["asset"]].get("cid")
    if kind == "EFFECT" and cfg.get("effect") in effects:
        cfg["cid"] = effects[cfg["effect"]].get("cid")

    return {
        "node_id": node.get("id", ""),
        "node_type": node.get("type", "EFFECT"),
        "config": cfg,
        "inputs": node.get("inputs", []),
        "name": node.get("name"),
    }


def build_input_name_mapping(nodes: Dict[str, Dict[str, Any]]) -> Dict[str, str]:
    """Map each accepted spelling of a variable input's name to its node ID."""
    lookup: Dict[str, str] = {}

    for node_id, node in nodes.items():
        cfg = node.get("config", {})
        if node.get("node_type") != "SOURCE" or not is_variable_input(cfg):
            continue

        lookup[node_id] = node_id

        display = cfg.get("name")
        if display:
            lookup[display] = node_id
            lookup[display.lower().replace(" ", "_")] = node_id
            lookup[display.lower().replace(" ", "-")] = node_id

        binding = node.get("name")
        if binding:
            lookup[binding] = node_id
            lookup[binding.replace("-", "_")] = node_id

    return lookup


def bind_inputs(
    nodes: Dict[str, Dict[str, Any]],
    input_name_to_node: Dict[str, str],
    user_inputs: Dict[str, str],
) -> List[str]:
    """Bind user-provided CIDs to source nodes; return warnings for misses."""
    unmatched: List[str] = []

    for input_name, cid in user_inputs.items():
        # A literal node ID wins, but only when it names a SOURCE node;
        # otherwise fall back to the name mapping.
        if input_name in nodes and nodes[input_name].get("node_type") == "SOURCE":
            nodes[input_name]["config"]["cid"] = cid
        elif input_name in input_name_to_node:
            nodes[input_name_to_node[input_name]]["config"]["cid"] = cid
        else:
            unmatched.append(f"Input '{input_name}' not found in recipe")

    return unmatched


def prepare_dag_for_execution(
    recipe: Dict[str, Any],
    user_inputs: Dict[str, str],
) -> Tuple[str, List[str]]:
    """Transform a recipe DAG to artdag format, bind inputs, and serialize it."""
    recipe_dag = recipe.get("dag")
    if not recipe_dag or not isinstance(recipe_dag, dict):
        raise ValueError("Recipe has no DAG definition")

    # JSON round-trip gives a cheap deep copy of plain recipe data.
    dag_copy = json.loads(json.dumps(recipe_dag))
    nodes = dag_copy.get("nodes", {})

    registry = recipe.get("registry", {}) or {}
    assets = registry.get("assets", {})
    effects = registry.get("effects", {})

    if isinstance(nodes, list):
        nodes = {
            entry["id"]: transform_node(entry, assets, effects)
            for entry in nodes
            if entry.get("id")
        }
    dag_copy["nodes"] = nodes

    warnings = bind_inputs(nodes, build_input_name_mapping(nodes), user_inputs)

    if "output" in dag_copy:
        dag_copy["output_id"] = dag_copy.pop("output")
    dag_copy.setdefault("metadata", {})

    return json.dumps(dag_copy), warnings
class TestTransformNode:
    """Unit tests for transform_node."""

    def test_source_node_with_asset_resolves_cid(
        self,
        sample_registry: Dict[str, Dict[str, Any]],
    ) -> None:
        """A SOURCE node referencing a registry asset picks up that asset's CID."""
        spec = {"id": "source_1", "type": "SOURCE", "config": {"asset": "cat"}, "inputs": []}

        out = transform_node(spec, sample_registry["assets"], sample_registry["effects"])

        assert out["node_id"] == "source_1"
        assert out["node_type"] == "SOURCE"
        assert out["config"]["cid"] == "QmXrj6tSSn1vQXxxEY2Tyoudvt4CeeqR9gGQwSt7WFrhMZ"

    def test_effect_node_resolves_cid(
        self,
        sample_registry: Dict[str, Dict[str, Any]],
    ) -> None:
        """An EFFECT node gets its CID from the registry, keeping its params."""
        spec = {
            "id": "effect_1",
            "type": "EFFECT",
            "config": {"effect": "invert", "intensity": 1.0},
            "inputs": ["source_1"],
        }

        out = transform_node(spec, sample_registry["assets"], sample_registry["effects"])

        assert out["node_id"] == "effect_1"
        assert out["node_type"] == "EFFECT"
        assert out["config"]["effect"] == "invert"
        assert out["config"]["cid"] == "QmPWaW5E5WFrmDjT6w8enqvtJhM8c5jvQu7XN1doHA3Z7J"
        assert out["config"]["intensity"] == 1.0

    def test_variable_input_node_preserves_config(
        self,
        sample_registry: Dict[str, Dict[str, Any]],
    ) -> None:
        """Variable-input SOURCE nodes keep their config and stay CID-less."""
        spec = {
            "id": "source_2",
            "type": "SOURCE",
            "config": {
                "input": True,
                "name": "Second Video",
                "description": "User-provided video",
            },
            "inputs": [],
            "name": "second-video",
        }

        out = transform_node(spec, sample_registry["assets"], sample_registry["effects"])

        assert out["config"]["input"] is True
        assert out["config"]["name"] == "Second Video"
        assert out["name"] == "second-video"
        # The CID is bound at run time, not during transformation.
        assert "cid" not in out["config"]

    def test_unknown_asset_not_resolved(
        self,
        sample_registry: Dict[str, Dict[str, Any]],
    ) -> None:
        """An asset missing from the registry is left unresolved, not an error."""
        spec = {"id": "source_1", "type": "SOURCE", "config": {"asset": "unknown_asset"}, "inputs": []}

        out = transform_node(spec, sample_registry["assets"], sample_registry["effects"])

        assert "cid" not in out["config"]


class TestBuildInputNameMapping:
    """Unit tests for build_input_name_mapping."""

    def test_maps_by_node_id(self) -> None:
        """The node ID itself is always a valid input name."""
        graph = {
            "source_2": {
                "node_id": "source_2",
                "node_type": "SOURCE",
                "config": {"input": True, "name": "Test"},
                "inputs": [],
            }
        }

        lookup = build_input_name_mapping(graph)

        assert lookup["source_2"] == "source_2"

    def test_maps_by_config_name(self) -> None:
        """config.name is accepted verbatim plus snake_case and kebab-case."""
        graph = {
            "source_2": {
                "node_id": "source_2",
                "node_type": "SOURCE",
                "config": {"input": True, "name": "Second Video"},
                "inputs": [],
            }
        }

        lookup = build_input_name_mapping(graph)

        assert lookup["Second Video"] == "source_2"
        assert lookup["second_video"] == "source_2"
        assert lookup["second-video"] == "source_2"

    def test_maps_by_def_name(self) -> None:
        """The node's def-binding name maps too, in both dash and underscore form."""
        graph = {
            "source_2": {
                "node_id": "source_2",
                "node_type": "SOURCE",
                "config": {"input": True, "name": "Second Video"},
                "inputs": [],
                "name": "inverted-video",
            }
        }

        lookup = build_input_name_mapping(graph)

        assert lookup["inverted-video"] == "source_2"
        assert lookup["inverted_video"] == "source_2"

    def test_ignores_non_source_nodes(self) -> None:
        """EFFECT (and other non-SOURCE) nodes never appear in the mapping."""
        graph = {
            "effect_1": {
                "node_id": "effect_1",
                "node_type": "EFFECT",
                "config": {"effect": "invert"},
                "inputs": [],
            }
        }

        lookup = build_input_name_mapping(graph)

        assert "effect_1" not in lookup

    def test_ignores_fixed_sources(self) -> None:
        """SOURCE nodes without the 'input' flag are fixed assets, not inputs."""
        graph = {
            "source_1": {
                "node_id": "source_1",
                "node_type": "SOURCE",
                "config": {"asset": "cat", "cid": "Qm123"},
                "inputs": [],
            }
        }

        lookup = build_input_name_mapping(graph)

        assert "source_1" not in lookup


class TestBindInputs:
    """Unit tests for bind_inputs."""

    def test_binds_by_direct_node_id(self) -> None:
        """Passing the node ID itself binds the CID with no warnings."""
        graph = {
            "source_2": {
                "node_id": "source_2",
                "node_type": "SOURCE",
                "config": {"input": True, "name": "Test"},
                "inputs": [],
            }
        }

        produced = bind_inputs(graph, {"source_2": "source_2"}, {"source_2": "QmUserInput123"})

        assert graph["source_2"]["config"]["cid"] == "QmUserInput123"
        assert len(produced) == 0

    def test_binds_by_name_lookup(self) -> None:
        """Passing a mapped input name binds the CID with no warnings."""
        graph = {
            "source_2": {
                "node_id": "source_2",
                "node_type": "SOURCE",
                "config": {"input": True, "name": "Second Video"},
                "inputs": [],
            }
        }
        lookup = {
            "source_2": "source_2",
            "Second Video": "source_2",
            "second-video": "source_2",
        }

        produced = bind_inputs(graph, lookup, {"second-video": "QmUserInput123"})

        assert graph["source_2"]["config"]["cid"] == "QmUserInput123"
        assert len(produced) == 0

    def test_warns_on_unknown_input(self) -> None:
        """An unrecognized input name yields a warning instead of an error."""
        graph = {
            "source_2": {
                "node_id": "source_2",
                "node_type": "SOURCE",
                "config": {"input": True, "name": "Test"},
                "inputs": [],
            }
        }
        lookup = {"source_2": "source_2", "Test": "source_2"}

        produced = bind_inputs(graph, lookup, {"unknown-input": "QmUserInput123"})

        assert len(produced) == 1
        assert "unknown-input" in produced[0]


class TestRegressions:
    """Tests for specific bugs that were found in production."""

    def test_effect_cid_key_not_effect_hash(
        self,
        sample_registry: Dict[str, Dict[str, Any]],
    ) -> None:
        """
        Regression test: Effect CID must use 'cid' key, not 'effect_hash'.

        Bug found 2026-01-12: transform_node set config["effect_hash"] while
        the executor reads config["cid"], producing "Unknown effect: invert".
        """
        spec = {"id": "effect_1", "type": "EFFECT", "config": {"effect": "invert"}, "inputs": ["source_1"]}

        out = transform_node(spec, sample_registry["assets"], sample_registry["effects"])

        # MUST use 'cid' key - executor checks config.get("cid")
        assert "cid" in out["config"], "Effect CID must be stored as 'cid'"
        assert "effect_hash" not in out["config"], "Must not use 'effect_hash' key"
        assert out["config"]["cid"] == "QmPWaW5E5WFrmDjT6w8enqvtJhM8c5jvQu7XN1doHA3Z7J"

    def test_source_cid_binding_persists(
        self,
        sample_recipe: Dict[str, Any],
    ) -> None:
        """
        Regression test: Bound CIDs must appear in final DAG JSON.

        Bug found 2026-01-12: variable-input CIDs were bound in memory but
        dropped from the serialized DAG handed to the executor.
        """
        dag_json, _ = prepare_dag_for_execution(sample_recipe, {"second-video": "QmTestUserInput123"})

        dag = json.loads(dag_json)

        # The bound CID must survive serialization.
        assert dag["nodes"]["source_2"]["config"]["cid"] == "QmTestUserInput123"


class TestPrepareDagForExecution:
    """Integration tests for prepare_dag_for_execution."""

    def test_full_pipeline(self, sample_recipe: Dict[str, Any]) -> None:
        """End-to-end: transform, resolve registry CIDs, bind the user input."""
        provided = {"second-video": "QmS4885aRikrjDB4yHPg9yTiPcBFWadZKVfAEvUy7B32zS"}

        dag_json, produced = prepare_dag_for_execution(sample_recipe, provided)
        dag = json.loads(dag_json)

        # Overall structure
        assert "nodes" in dag
        assert "output_id" in dag
        assert dag["output_id"] == "sequence_1"

        # Fixed source resolved from the registry
        assert dag["nodes"]["source_1"]["config"]["cid"] == "QmXrj6tSSn1vQXxxEY2Tyoudvt4CeeqR9gGQwSt7WFrhMZ"

        # Variable input bound from user input
        assert dag["nodes"]["source_2"]["config"]["cid"] == "QmS4885aRikrjDB4yHPg9yTiPcBFWadZKVfAEvUy7B32zS"

        # Effects resolved from the registry
        assert dag["nodes"]["effect_1"]["config"]["cid"] == "QmcWhw6wbHr1GDmorM2KDz8S3yCGTfjuyPR6y8khS2tvko"
        assert dag["nodes"]["effect_2"]["config"]["cid"] == "QmPWaW5E5WFrmDjT6w8enqvtJhM8c5jvQu7XN1doHA3Z7J"
        assert dag["nodes"]["effect_2"]["config"]["intensity"] == 1.0

        assert len(produced) == 0

    def test_missing_input_produces_warning(
        self,
        sample_recipe: Dict[str, Any],
    ) -> None:
        """With no inputs the DAG still builds and the variable input stays unbound.

        (Unprovided inputs are not themselves warned about — only names the
        caller passes that match nothing produce warnings.)
        """
        dag_json, _ = prepare_dag_for_execution(sample_recipe, {})

        dag = json.loads(dag_json)
        assert "nodes" in dag

        # The variable input was never bound, so it has no CID.
        assert "cid" not in dag["nodes"]["source_2"]["config"]

    def test_raises_on_missing_dag(self) -> None:
        """A recipe without a DAG is rejected with ValueError."""
        bad_recipe = {"name": "broken", "registry": {}}

        with pytest.raises(ValueError, match="no DAG"):
            prepare_dag_for_execution(bad_recipe, {})