Add COMPOUND node handling and fix cache lookups by code-addressed hash
- Add COMPOUND node handling in execute_recipe for collapsed effect chains
- Index cache entries by node_id (cache_id) when it differs from the IPFS CID
- Fix test_cache_manager.py to unpack put() tuple returns

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -771,6 +771,78 @@ def execute_recipe(self, recipe_sexp: str, input_hashes: Dict[str, str], run_id:
|
||||
else:
|
||||
raise ValueError(f"SOURCE step has no cid and no :input flag: {step.config}")
|
||||
|
||||
# Handle COMPOUND nodes (collapsed effect chains)
|
||||
if step.node_type == "COMPOUND":
|
||||
import subprocess
|
||||
import tempfile
|
||||
|
||||
filter_chain = step.config.get("filter_chain", [])
|
||||
if not filter_chain:
|
||||
raise ValueError("COMPOUND step has empty filter_chain")
|
||||
|
||||
# Get input path
|
||||
if not input_paths:
|
||||
raise ValueError("COMPOUND step has no inputs")
|
||||
|
||||
# For COMPOUND with EFFECT filters, run effects sequentially
|
||||
current_input = input_paths[0]
|
||||
temp_files = []
|
||||
|
||||
for i, filter_item in enumerate(filter_chain):
|
||||
filter_type = filter_item.get("type", "")
|
||||
filter_config = filter_item.get("config", {})
|
||||
|
||||
if filter_type == "EFFECT":
|
||||
effect_name = filter_config.get("effect")
|
||||
effect_cid = filter_config.get("cid")
|
||||
|
||||
if effect_name and effect_cid:
|
||||
# Get effect executor
|
||||
effect_executor = get_executor(f"effect:{effect_name}")
|
||||
if effect_executor:
|
||||
temp_dir = Path(tempfile.mkdtemp())
|
||||
temp_output = temp_dir / f"compound_{i}_{effect_name}.mkv"
|
||||
|
||||
logger.info(f"COMPOUND: Running effect {effect_name} (step {i+1}/{len(filter_chain)})")
|
||||
result_path = effect_executor.execute(filter_config, [current_input], temp_output)
|
||||
|
||||
current_input = result_path
|
||||
temp_files.append(temp_dir)
|
||||
else:
|
||||
logger.warning(f"COMPOUND: No executor for effect {effect_name}, skipping")
|
||||
|
||||
# Store final result
|
||||
output_dir = CACHE_DIR / "nodes" / step.cache_id
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
final_output = output_dir / "output.mkv"
|
||||
|
||||
import shutil
|
||||
shutil.copy2(current_input, final_output)
|
||||
|
||||
# Upload to IPFS
|
||||
cached, content_cid = cache_manager.put(
|
||||
final_output,
|
||||
node_type="COMPOUND",
|
||||
node_id=step.cache_id,
|
||||
)
|
||||
|
||||
# Cleanup temp files
|
||||
for temp_dir in temp_files:
|
||||
if temp_dir.exists():
|
||||
shutil.rmtree(temp_dir, ignore_errors=True)
|
||||
|
||||
step_results[step.step_id] = {
|
||||
"status": "executed",
|
||||
"path": str(final_output),
|
||||
"cache_id": step.cache_id,
|
||||
"cid": content_cid,
|
||||
"filter_count": len(filter_chain),
|
||||
}
|
||||
cache_id_to_path[step.cache_id] = final_output
|
||||
total_executed += 1
|
||||
logger.info(f"COMPOUND step {step.step_id}: {len(filter_chain)} effects -> {content_cid[:16]}...")
|
||||
continue
|
||||
|
||||
# Get executor for this step type
|
||||
executor = get_executor(step.node_type)
|
||||
if not executor:
|
||||
|
||||
Reference in New Issue
Block a user