Merges full history from art-dag/mono.git into the monorepo under the artdag/ directory. Contains: core (DAG engine), l1 (Celery rendering server), l2 (ActivityPub registry), common (shared templates/middleware), client (CLI), test (e2e).
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
git-subtree-dir: artdag
git-subtree-mainline: 1a179de547
git-subtree-split: 4c2e716558
493 lines · 16 KiB · Python
"""
Tests for DAG transformation and input binding.

These tests cover the code paths that previously caused bugs:
- Node transformation from compiled format to artdag format
- Asset/effect CID resolution from the registry
- Variable input name mapping
- Input binding
"""
|
|
|
|
import json
|
|
import logging
|
|
import pytest
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
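# Standalone copies of the functions under test. Defining them here avoids
# importing the full app, which requires external dependencies.
# NOTE(review): these mirror the app's implementation; keep them in sync so
# the tests stay meaningful.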
|
|
|
|
# Logger named after this module.
logger: logging.Logger = logging.getLogger(name=__name__)
|
|
|
|
|
|
def is_variable_input(config: Dict[str, Any]) -> bool:
    """Tell whether a SOURCE node's config describes a user-supplied input.

    Args:
        config: The node's ``config`` mapping.

    Returns:
        True when ``config["input"]`` exists and is truthy, otherwise False.
    """
    return "input" in config and bool(config["input"])
|
|
|
|
|
|
def transform_node(
    node: Dict[str, Any],
    assets: Dict[str, Dict[str, Any]],
    effects: Dict[str, Dict[str, Any]],
) -> Dict[str, Any]:
    """Transform a compiled node to artdag execution format.

    Args:
        node: Compiled node dict; reads ``id``, ``type``, ``config``,
            ``inputs`` and ``name``.
        assets: Asset registry keyed by asset name. The entry's ``cid`` is
            copied into SOURCE nodes whose ``config["asset"]`` names it.
        effects: Effect registry keyed by effect name. The entry's ``cid`` is
            copied into EFFECT nodes whose ``config["effect"]`` names it.

    Returns:
        A new dict with keys ``node_id`` (``""`` if the node has no id),
        ``node_type`` (``"EFFECT"`` if the node has no type), ``config``,
        ``inputs`` (``[]`` if absent) and ``name`` (``None`` if absent).
        Names missing from the registry are left unresolved (no ``cid`` key);
        a registry entry without a ``cid`` yields ``config["cid"] = None``.
    """
    node_type = node.get("type")
    # Shallow-copy so adding "cid" never mutates the caller's config.
    # ``or {}`` also tolerates an explicit ``"config": null`` (JSON null),
    # where dict(None) would raise TypeError.
    config = dict(node.get("config") or {})

    # The executor reads resolved references from config["cid"].
    if node_type == "SOURCE" and "asset" in config:
        asset_name = config["asset"]
        if asset_name in assets:
            config["cid"] = assets[asset_name].get("cid")

    if node_type == "EFFECT" and "effect" in config:
        effect_name = config["effect"]
        if effect_name in effects:
            config["cid"] = effects[effect_name].get("cid")

    return {
        "node_id": node.get("id", ""),
        "node_type": node.get("type", "EFFECT"),
        "config": config,
        "inputs": node.get("inputs", []),
        "name": node.get("name"),
    }
|
|
|
|
|
|
def build_input_name_mapping(nodes: Dict[str, Dict[str, Any]]) -> Dict[str, str]:
    """Map every accepted spelling of a variable input to its node ID.

    Only SOURCE nodes whose config passes ``is_variable_input`` are indexed.
    Each such node is reachable through these aliases, in this order:

    * its node ID;
    * ``config["name"]`` verbatim, lower-cased with spaces turned into
      underscores, and lower-cased with spaces turned into hyphens;
    * the node-level ``name`` verbatim and with hyphens turned into
      underscores.

    When two nodes produce the same alias, the later node wins.
    """
    lookup: Dict[str, str] = {}

    for node_id, node in nodes.items():
        if node.get("node_type") != "SOURCE":
            continue
        config = node.get("config", {})
        if not is_variable_input(config):
            continue

        aliases: List[str] = [node_id]

        display_name = config.get("name")
        if display_name:
            folded = display_name.lower()
            aliases += [
                display_name,
                folded.replace(" ", "_"),
                folded.replace(" ", "-"),
            ]

        binding_name = node.get("name")
        if binding_name:
            aliases += [binding_name, binding_name.replace("-", "_")]

        # Every alias of this node resolves to the same ID.
        lookup.update(dict.fromkeys(aliases, node_id))

    return lookup
|
|
|
|
|
|
def bind_inputs(
    nodes: Dict[str, Dict[str, Any]],
    input_name_to_node: Dict[str, str],
    user_inputs: Dict[str, str],
) -> List[str]:
    """Write user-supplied CIDs into the config of the matching nodes.

    Each key of ``user_inputs`` is resolved in two steps:

    1. If it is the ID of a SOURCE node in ``nodes``, that node is bound.
    2. Otherwise, if it appears in ``input_name_to_node``, the node it maps
       to is bound.

    Binding mutates the node in place by setting ``config["cid"]``.

    Returns:
        One warning message per input name that resolved by neither route,
        in the order the inputs were given.
    """
    unmatched: List[str] = []

    for key, supplied_cid in user_inputs.items():
        # NOTE(review): the direct-ID route does not check is_variable_input,
        # so an input keyed by a fixed SOURCE's ID overrides its registry
        # CID. Confirm that this is intended.
        if key in nodes and nodes[key].get("node_type") == "SOURCE":
            target = nodes[key]
        elif key in input_name_to_node:
            target = nodes[input_name_to_node[key]]
        else:
            unmatched.append(f"Input '{key}' not found in recipe")
            continue

        target["config"]["cid"] = supplied_cid

    return unmatched
|
|
|
|
|
|
def prepare_dag_for_execution(
    recipe: Dict[str, Any],
    user_inputs: Dict[str, str],
) -> Tuple[str, List[str]]:
    """Turn a recipe's DAG into the JSON document handed to the executor.

    Steps:
      1. Copy ``recipe["dag"]`` through a JSON round trip so the recipe
         itself is never modified.
      2. When ``nodes`` is a list (compiled format), key it by node ID and
         pass each node through ``transform_node`` to resolve registry CIDs.
         Nodes without a truthy ``id`` are dropped.
      3. Bind ``user_inputs`` to variable-input SOURCE nodes.
      4. Rename ``output`` to ``output_id`` and make sure ``metadata`` exists.

    Returns:
        ``(dag_json, warnings)``: the serialized DAG and any binding warnings.

    Raises:
        ValueError: if the recipe's ``dag`` is missing, empty, or not a dict.
    """
    source_dag = recipe.get("dag")
    if not (source_dag and isinstance(source_dag, dict)):
        raise ValueError("Recipe has no DAG definition")

    dag = json.loads(json.dumps(source_dag))
    nodes = dag.get("nodes", {})

    registry = recipe.get("registry", {})
    if registry:
        assets = registry.get("assets", {})
        effects = registry.get("effects", {})
    else:
        assets, effects = {}, {}

    if isinstance(nodes, list):
        nodes = {
            compiled["id"]: transform_node(compiled, assets, effects)
            for compiled in nodes
            if compiled.get("id")
        }
        dag["nodes"] = nodes

    bind_warnings = bind_inputs(nodes, build_input_name_mapping(nodes), user_inputs)

    if "output" in dag:
        dag["output_id"] = dag.pop("output")
    dag.setdefault("metadata", {})

    return json.dumps(dag), bind_warnings
|
|
|
|
|
|
class TestTransformNode:
    """Unit tests for ``transform_node``."""

    # Registry CIDs expected from the ``sample_registry`` fixture.
    CAT_CID = "QmXrj6tSSn1vQXxxEY2Tyoudvt4CeeqR9gGQwSt7WFrhMZ"
    INVERT_CID = "QmPWaW5E5WFrmDjT6w8enqvtJhM8c5jvQu7XN1doHA3Z7J"

    @staticmethod
    def _transform(
        node: Dict[str, Any],
        registry: Dict[str, Dict[str, Any]],
    ) -> Dict[str, Any]:
        """Run ``transform_node`` against the registry's assets and effects."""
        return transform_node(node, registry["assets"], registry["effects"])

    def test_source_node_with_asset_resolves_cid(
        self,
        sample_registry: Dict[str, Dict[str, Any]],
    ) -> None:
        """A SOURCE node that names a registry asset receives that asset's CID."""
        source = {
            "id": "source_1",
            "type": "SOURCE",
            "config": {"asset": "cat"},
            "inputs": [],
        }

        out = self._transform(source, sample_registry)

        assert out["node_id"] == "source_1"
        assert out["node_type"] == "SOURCE"
        assert out["config"]["cid"] == self.CAT_CID

    def test_effect_node_resolves_cid(
        self,
        sample_registry: Dict[str, Dict[str, Any]],
    ) -> None:
        """An EFFECT node gets its CID from the registry and keeps its params."""
        effect = {
            "id": "effect_1",
            "type": "EFFECT",
            "config": {"effect": "invert", "intensity": 1.0},
            "inputs": ["source_1"],
        }

        out = self._transform(effect, sample_registry)
        cfg = out["config"]

        assert out["node_id"] == "effect_1"
        assert out["node_type"] == "EFFECT"
        assert cfg["effect"] == "invert"
        assert cfg["cid"] == self.INVERT_CID
        assert cfg["intensity"] == 1.0

    def test_variable_input_node_preserves_config(
        self,
        sample_registry: Dict[str, Dict[str, Any]],
    ) -> None:
        """A variable-input SOURCE node passes through with its config intact."""
        variable_source = {
            "id": "source_2",
            "type": "SOURCE",
            "config": {
                "input": True,
                "name": "Second Video",
                "description": "User-provided video",
            },
            "inputs": [],
            "name": "second-video",
        }

        out = self._transform(variable_source, sample_registry)
        cfg = out["config"]

        assert cfg["input"] is True
        assert cfg["name"] == "Second Video"
        assert out["name"] == "second-video"
        # The CID is only bound later, from user inputs.
        assert "cid" not in cfg

    def test_unknown_asset_not_resolved(
        self,
        sample_registry: Dict[str, Dict[str, Any]],
    ) -> None:
        """An asset missing from the registry is left unresolved instead of raising."""
        orphan = {
            "id": "source_1",
            "type": "SOURCE",
            "config": {"asset": "unknown_asset"},
            "inputs": [],
        }

        out = self._transform(orphan, sample_registry)

        assert "cid" not in out["config"]
|
|
|
|
|
|
class TestBuildInputNameMapping:
    """Tests for build_input_name_mapping function."""

    def test_maps_by_node_id(self) -> None:
        """Should map by node_id."""
        nodes = {
            "source_2": {
                "node_id": "source_2",
                "node_type": "SOURCE",
                "config": {"input": True, "name": "Test"},
                "inputs": [],
            }
        }

        mapping = build_input_name_mapping(nodes)

        assert mapping["source_2"] == "source_2"

    def test_maps_by_config_name(self) -> None:
        """Should map by config.name verbatim, snake_case and kebab-case."""
        nodes = {
            "source_2": {
                "node_id": "source_2",
                "node_type": "SOURCE",
                "config": {"input": True, "name": "Second Video"},
                "inputs": [],
            }
        }

        mapping = build_input_name_mapping(nodes)

        assert mapping["Second Video"] == "source_2"
        assert mapping["second_video"] == "source_2"
        assert mapping["second-video"] == "source_2"

    def test_maps_by_def_name(self) -> None:
        """Should map by node.name (def binding name) and its underscore form."""
        nodes = {
            "source_2": {
                "node_id": "source_2",
                "node_type": "SOURCE",
                "config": {"input": True, "name": "Second Video"},
                "inputs": [],
                "name": "inverted-video",
            }
        }

        mapping = build_input_name_mapping(nodes)

        assert mapping["inverted-video"] == "source_2"
        assert mapping["inverted_video"] == "source_2"

    def test_ignores_non_source_nodes(self) -> None:
        """Should not include non-SOURCE nodes, even ones flagged as inputs.

        The effect node deliberately carries ``"input": True`` and names.
        Without the flag it would already be rejected by the input-flag check,
        and the test would pass even if the SOURCE-type check were removed.
        """
        nodes = {
            "effect_1": {
                "node_id": "effect_1",
                "node_type": "EFFECT",
                "config": {"effect": "invert", "input": True, "name": "Invert"},
                "inputs": [],
                "name": "invert-effect",
            }
        }

        mapping = build_input_name_mapping(nodes)

        # Check the whole mapping so that no alias (id, config name or
        # def name) can leak through.
        assert mapping == {}

    def test_ignores_fixed_sources(self) -> None:
        """Should not include SOURCE nodes without 'input' flag, under any alias."""
        nodes = {
            "source_1": {
                "node_id": "source_1",
                "node_type": "SOURCE",
                "config": {"asset": "cat", "cid": "Qm123"},
                "inputs": [],
                "name": "cat-clip",
            }
        }

        mapping = build_input_name_mapping(nodes)

        assert mapping == {}
|
|
|
|
|
|
class TestBindInputs:
    """Unit tests for ``bind_inputs``."""

    @staticmethod
    def _variable_source(label: str) -> Dict[str, Dict[str, Any]]:
        """Build a fresh ``nodes`` dict with one variable-input SOURCE, ``source_2``."""
        return {
            "source_2": {
                "node_id": "source_2",
                "node_type": "SOURCE",
                "config": {"input": True, "name": label},
                "inputs": [],
            }
        }

    def test_binds_by_direct_node_id(self) -> None:
        """A user input keyed by the node ID binds directly to that node."""
        graph = self._variable_source("Test")
        lookup = {"source_2": "source_2"}

        problems = bind_inputs(graph, lookup, {"source_2": "QmUserInput123"})

        assert graph["source_2"]["config"]["cid"] == "QmUserInput123"
        assert not problems

    def test_binds_by_name_lookup(self) -> None:
        """A user input keyed by an alias binds to the node the alias maps to."""
        graph = self._variable_source("Second Video")
        lookup = {
            "source_2": "source_2",
            "Second Video": "source_2",
            "second-video": "source_2",
        }

        problems = bind_inputs(graph, lookup, {"second-video": "QmUserInput123"})

        assert graph["source_2"]["config"]["cid"] == "QmUserInput123"
        assert not problems

    def test_warns_on_unknown_input(self) -> None:
        """An input name that matches neither a node nor an alias yields a warning."""
        graph = self._variable_source("Test")
        lookup = {"source_2": "source_2", "Test": "source_2"}

        problems = bind_inputs(graph, lookup, {"unknown-input": "QmUserInput123"})

        assert len(problems) == 1, problems
        assert "unknown-input" in problems[0]
|
|
|
|
|
|
class TestRegressions:
    """Regression tests pinned to bugs observed in production."""

    def test_effect_cid_key_not_effect_hash(
        self,
        sample_registry: Dict[str, Dict[str, Any]],
    ) -> None:
        """
        Regression: an effect's CID must be stored under 'cid', not 'effect_hash'.

        Bug found 2026-01-12: transform_node wrote config["effect_hash"] while
        the executor reads config["cid"], which surfaced as
        "Unknown effect: invert" errors.
        """
        effect = {
            "id": "effect_1",
            "type": "EFFECT",
            "config": {"effect": "invert"},
            "inputs": ["source_1"],
        }

        config = transform_node(
            effect,
            sample_registry["assets"],
            sample_registry["effects"],
        )["config"]

        # The executor looks up config.get("cid"); any other key is invisible to it.
        assert "cid" in config, "Effect CID must be stored as 'cid'"
        assert "effect_hash" not in config, "Must not use 'effect_hash' key"
        assert config["cid"] == "QmPWaW5E5WFrmDjT6w8enqvtJhM8c5jvQu7XN1doHA3Z7J"

    def test_source_cid_binding_persists(
        self,
        sample_recipe: Dict[str, Any],
    ) -> None:
        """
        Regression: a bound input CID must survive into the final DAG JSON.

        Bug found 2026-01-12: variable input CIDs were bound, but they did not
        appear in the serialized DAG sent to the executor.
        """
        serialized, _unused_warnings = prepare_dag_for_execution(
            sample_recipe,
            {"second-video": "QmTestUserInput123"},
        )

        # Inspect the serialized output, not any in-memory object.
        bound_config = json.loads(serialized)["nodes"]["source_2"]["config"]
        assert bound_config["cid"] == "QmTestUserInput123"
|
|
|
|
|
|
class TestPrepareDagForExecution:
    """Integration tests for prepare_dag_for_execution."""

    def test_full_pipeline(self, sample_recipe: Dict[str, Any]) -> None:
        """Run the whole preparation pipeline: transform, resolve, bind, serialize."""
        user_inputs = {
            "second-video": "QmS4885aRikrjDB4yHPg9yTiPcBFWadZKVfAEvUy7B32zS"
        }

        dag_json, warnings = prepare_dag_for_execution(sample_recipe, user_inputs)

        # Parse result
        dag = json.loads(dag_json)

        # Check structure: "output" is renamed to "output_id"
        assert "nodes" in dag
        assert "output_id" in dag
        assert dag["output_id"] == "sequence_1"

        # Check fixed source has CID from the registry
        source_1 = dag["nodes"]["source_1"]
        assert source_1["config"]["cid"] == "QmXrj6tSSn1vQXxxEY2Tyoudvt4CeeqR9gGQwSt7WFrhMZ"

        # Check variable input was bound from user_inputs
        source_2 = dag["nodes"]["source_2"]
        assert source_2["config"]["cid"] == "QmS4885aRikrjDB4yHPg9yTiPcBFWadZKVfAEvUy7B32zS"

        # Check effects have CIDs and keep their parameters
        effect_1 = dag["nodes"]["effect_1"]
        assert effect_1["config"]["cid"] == "QmcWhw6wbHr1GDmorM2KDz8S3yCGTfjuyPR6y8khS2tvko"

        effect_2 = dag["nodes"]["effect_2"]
        assert effect_2["config"]["cid"] == "QmPWaW5E5WFrmDjT6w8enqvtJhM8c5jvQu7XN1doHA3Z7J"
        assert effect_2["config"]["intensity"] == 1.0

        # No warnings
        assert len(warnings) == 0

    def test_missing_input_produces_warning(
        self,
        sample_recipe: Dict[str, Any],
    ) -> None:
        """Omitting a variable input must not make preparation fail.

        Despite this test's name, no warning is produced. ``bind_inputs`` only
        warns about *provided* input names it cannot match, so an omitted
        input is simply left without a CID. The final assertion pins this
        current behavior; update it if unbound inputs start being reported.
        """
        user_inputs: Dict[str, str] = {}  # No inputs provided

        dag_json, warnings = prepare_dag_for_execution(sample_recipe, user_inputs)

        # Should still produce valid JSON
        dag = json.loads(dag_json)
        assert "nodes" in dag

        # Variable input should not have CID
        source_2 = dag["nodes"]["source_2"]
        assert "cid" not in source_2["config"]

        # Omitted inputs are not reported (see docstring).
        assert warnings == []

    def test_raises_on_missing_dag(self) -> None:
        """Should raise ValueError if recipe has no DAG."""
        recipe = {"name": "broken", "registry": {}}

        with pytest.raises(ValueError, match="no DAG"):
            prepare_dag_for_execution(recipe, {})