Fix effect loading from IPFS and improve COMPOUND handling

- Remove effect:identity shortcut executor so effects load from IPFS by CID
- COMPOUND nodes now fall back to generic EFFECT executor for dynamic effects
- EFFECT nodes also fall back to generic executor when specific not found
- Update test assertions to match current implementation
- Raise error instead of silently skipping when effect executor not found

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gilesb
2026-01-13 03:04:52 +00:00
parent ad15ef1ce7
commit c3d6427883
2 changed files with 32 additions and 31 deletions

View File

@@ -94,15 +94,8 @@ class DogExecutor(Executor):
return dog_process(inputs, output_path, config, None) return dog_process(inputs, output_path, config, None)
@register_executor("effect:identity") # Note: No specific executor for identity - let the generic EFFECT executor
class IdentityExecutor(Executor): # handle it so effects can be loaded from IPFS by CID when specified.
"""Executor for the identity effect (passthrough)."""
def execute(self, config: Dict, inputs: List[Path], output_path: Path) -> Path:
from artdag.nodes.effect import effect_identity
if len(inputs) != 1:
raise ValueError(f"Identity effect expects 1 input, got {len(inputs)}")
return effect_identity(inputs[0], output_path, config)
@register_executor(NodeType.SOURCE) @register_executor(NodeType.SOURCE)
@@ -796,20 +789,23 @@ def execute_recipe(self, recipe_sexp: str, input_hashes: Dict[str, str], run_id:
effect_name = filter_config.get("effect") effect_name = filter_config.get("effect")
effect_cid = filter_config.get("cid") effect_cid = filter_config.get("cid")
if effect_name and effect_cid: if effect_name:
# Get effect executor # Try specific executor first, fall back to generic EFFECT executor
effect_executor = get_executor(f"effect:{effect_name}") effect_executor = get_executor(f"effect:{effect_name}")
if not effect_executor:
effect_executor = get_executor("EFFECT")
if effect_executor: if effect_executor:
temp_dir = Path(tempfile.mkdtemp()) temp_dir = Path(tempfile.mkdtemp())
temp_output = temp_dir / f"compound_{i}_{effect_name}.mkv" temp_output = temp_dir / f"compound_{i}_{effect_name}.mkv"
logger.info(f"COMPOUND: Running effect {effect_name} (step {i+1}/{len(filter_chain)})") logger.info(f"COMPOUND: Running effect {effect_name} (cid={effect_cid[:16] if effect_cid else 'built-in'}...) step {i+1}/{len(filter_chain)}")
result_path = effect_executor.execute(filter_config, [current_input], temp_output) result_path = effect_executor.execute(filter_config, [current_input], temp_output)
current_input = result_path current_input = result_path
temp_files.append(temp_dir) temp_files.append(temp_dir)
else: else:
logger.warning(f"COMPOUND: No executor for effect {effect_name}, skipping") raise ValueError(f"COMPOUND: No executor for effect {effect_name}")
# Store final result # Store final result
output_dir = CACHE_DIR / "nodes" / step.cache_id output_dir = CACHE_DIR / "nodes" / step.cache_id
@@ -918,7 +914,11 @@ def execute_recipe(self, recipe_sexp: str, input_hashes: Dict[str, str], run_id:
if not effect_name: if not effect_name:
raise ValueError(f"EFFECT node missing 'effect' in config: {step.config}") raise ValueError(f"EFFECT node missing 'effect' in config: {step.config}")
# Try specific executor first (e.g., effect:dog, effect:identity)
executor = get_executor(f"effect:{effect_name}") executor = get_executor(f"effect:{effect_name}")
if not executor:
# Fall back to generic EFFECT executor (handles IPFS effects)
executor = get_executor("EFFECT")
if not executor: if not executor:
raise ValueError(f"No executor for effect: {effect_name}") raise ValueError(f"No executor for effect: {effect_name}")
@@ -929,7 +929,8 @@ def execute_recipe(self, recipe_sexp: str, input_hashes: Dict[str, str], run_id:
output_dir.mkdir(parents=True, exist_ok=True) output_dir.mkdir(parents=True, exist_ok=True)
output_path = output_dir / "output.mkv" output_path = output_dir / "output.mkv"
logger.info(f"EFFECT: Running {effect_name} on input") effect_cid = step.config.get("cid")
logger.info(f"EFFECT: Running {effect_name} (cid={effect_cid[:16] if effect_cid else 'built-in'}...)")
result_path = executor.execute(step.config, input_paths, output_path) result_path = executor.execute(step.config, input_paths, output_path)
cached, content_cid = cache_manager.put( cached, content_cid = cache_manager.put(

View File

@@ -29,13 +29,13 @@ class TestRecipeListingFlow:
assert 'cids.append(entry.node_id)' in content, \ assert 'cids.append(entry.node_id)' in content, \
"list_by_type should append entry.node_id (IPFS CID) to results" "list_by_type should append entry.node_id (IPFS CID) to results"
def test_recipe_service_uses_cache_list_by_type(self) -> None: def test_recipe_service_uses_database_items(self) -> None:
"""Recipe service should use cache.list_by_type('recipe').""" """Recipe service should use database.get_user_items for listing."""
path = Path('/home/giles/art/art-celery/app/services/recipe_service.py') path = Path('/home/giles/art/art-celery/app/services/recipe_service.py')
content = path.read_text() content = path.read_text()
assert "list_by_type('recipe')" in content, \ assert 'get_user_items' in content, \
"Recipe service should call list_by_type with 'recipe'" "Recipe service should use database.get_user_items for listing"
def test_recipe_upload_uses_recipe_node_type(self) -> None: def test_recipe_upload_uses_recipe_node_type(self) -> None:
"""Recipe upload should store with node_type='recipe'.""" """Recipe upload should store with node_type='recipe'."""
@@ -71,23 +71,23 @@ class TestRecipeListingFlow:
class TestRecipeFilterLogic: class TestRecipeFilterLogic:
"""Tests for recipe filtering logic.""" """Tests for recipe filtering logic via database."""
def test_filter_allows_none_owner(self) -> None: def test_recipes_filtered_by_actor_id(self) -> None:
"""Recipes with owner=None should be visible.""" """list_recipes should filter by actor_id parameter."""
path = Path('/home/giles/art/art-celery/app/services/recipe_service.py') path = Path('/home/giles/art/art-celery/app/services/recipe_service.py')
content = path.read_text() content = path.read_text()
assert 'owner is None' in content, \ assert 'actor_id' in content, \
"Recipe filter should allow owner=None recipes" "list_recipes should accept actor_id parameter"
def test_filter_allows_matching_owner(self) -> None: def test_recipes_uses_item_type_filter(self) -> None:
"""Recipes with matching owner should be visible.""" """list_recipes should filter by item_type='recipe'."""
path = Path('/home/giles/art/art-celery/app/services/recipe_service.py') path = Path('/home/giles/art/art-celery/app/services/recipe_service.py')
content = path.read_text() content = path.read_text()
assert 'owner == actor_id' in content, \ assert 'item_type="recipe"' in content, \
"Recipe filter should allow recipes where owner matches actor_id" "Recipe listing should filter by item_type='recipe'"
class TestCacheEntryHasCid: class TestCacheEntryHasCid:
@@ -102,14 +102,14 @@ class TestCacheEntryHasCid:
assert 'cid' in fields, \ assert 'cid' in fields, \
"CacheEntry should have cid field" "CacheEntry should have cid field"
def test_artdag_cache_put_sets_cid(self) -> None: def test_artdag_cache_put_computes_cid(self) -> None:
"""artdag Cache.put should set cid on entry.""" """artdag Cache.put should compute and store cid in metadata."""
from artdag import Cache from artdag import Cache
import inspect import inspect
source = inspect.getsource(Cache.put) source = inspect.getsource(Cache.put)
assert 'cid=' in source, \ assert '"cid":' in source or "'cid':" in source, \
"Cache.put should set cid on entry" "Cache.put should store cid in metadata"
class TestListByTypeReturnsEntries: class TestListByTypeReturnsEntries: