diff --git a/cache_manager.py b/cache_manager.py index e62180b..839278c 100644 --- a/cache_manager.py +++ b/cache_manager.py @@ -325,6 +325,12 @@ class L1CacheManager: self._set_content_index(cid, node_id) logger.info(f"put: Set content index {cid[:16]}... -> {node_id[:16]}...") + # Also index by node_id itself (for code-addressed cache lookups) + # This allows get_by_cid(cache_id) to work when cache_id != IPFS CID + if node_id != cid: + self._set_content_index(node_id, node_id) + logger.debug(f"Self-indexed: {node_id[:16]}... -> {node_id[:16]}...") + # Also index by local hash if cid is an IPFS CID # This ensures both IPFS CID and local hash can be used to find the file if local_hash and local_hash != cid: diff --git a/legacy_tasks.py b/legacy_tasks.py index bc45082..72d1d6d 100644 --- a/legacy_tasks.py +++ b/legacy_tasks.py @@ -771,6 +771,78 @@ def execute_recipe(self, recipe_sexp: str, input_hashes: Dict[str, str], run_id: else: raise ValueError(f"SOURCE step has no cid and no :input flag: {step.config}") + # Handle COMPOUND nodes (collapsed effect chains) + if step.node_type == "COMPOUND": + import subprocess + import tempfile + + filter_chain = step.config.get("filter_chain", []) + if not filter_chain: + raise ValueError("COMPOUND step has empty filter_chain") + + # Get input path + if not input_paths: + raise ValueError("COMPOUND step has no inputs") + + # For COMPOUND with EFFECT filters, run effects sequentially + current_input = input_paths[0] + temp_files = [] + + for i, filter_item in enumerate(filter_chain): + filter_type = filter_item.get("type", "") + filter_config = filter_item.get("config", {}) + + if filter_type == "EFFECT": + effect_name = filter_config.get("effect") + effect_cid = filter_config.get("cid") + + if effect_name and effect_cid: + # Get effect executor + effect_executor = get_executor(f"effect:{effect_name}") + if effect_executor: + temp_dir = Path(tempfile.mkdtemp()) + temp_output = temp_dir / f"compound_{i}_{effect_name}.mkv" + + logger.info(f"COMPOUND: Running effect {effect_name} (step {i+1}/{len(filter_chain)})") + result_path = effect_executor.execute(filter_config, [current_input], temp_output) + + current_input = result_path + temp_files.append(temp_dir) + else: + logger.warning(f"COMPOUND: No executor for effect {effect_name}, skipping") + + # Store final result + output_dir = CACHE_DIR / "nodes" / step.cache_id + output_dir.mkdir(parents=True, exist_ok=True) + final_output = output_dir / "output.mkv" + + import shutil + shutil.copy2(current_input, final_output) + + # Upload to IPFS + cached, content_cid = cache_manager.put( + final_output, + node_type="COMPOUND", + node_id=step.cache_id, + ) + + # Cleanup temp files + for temp_dir in temp_files: + if temp_dir.exists(): + shutil.rmtree(temp_dir, ignore_errors=True) + + step_results[step.step_id] = { + "status": "executed", + "path": str(final_output), + "cache_id": step.cache_id, + "cid": content_cid, + "filter_count": len(filter_chain), + } + cache_id_to_path[step.cache_id] = final_output + total_executed += 1 + logger.info(f"COMPOUND step {step.step_id}: {len(filter_chain)} effects -> {content_cid[:16]}...") + continue + # Get executor for this step type executor = get_executor(step.node_type) if not executor: diff --git a/tests/test_cache_manager.py b/tests/test_cache_manager.py index f9e22ea..da2b5ab 100644 --- a/tests/test_cache_manager.py +++ b/tests/test_cache_manager.py @@ -134,7 +134,7 @@ class TestL1CacheManagerStorage: """Can store and retrieve by content hash.""" test_file = create_test_file(temp_dir / "input.txt", "hello world") - cached = manager.put(test_file, node_type="test") + cached, cid = manager.put(test_file, node_type="test") retrieved_path = manager.get_by_cid(cached.cid) assert retrieved_path is not None @@ -144,7 +144,7 @@ class TestL1CacheManagerStorage: """Can store with custom node_id.""" test_file = create_test_file(temp_dir / "input.txt", "content") - cached = manager.put(test_file, node_id="custom-node-123", node_type="test") + cached, cid = manager.put(test_file, node_id="custom-node-123", node_type="test") assert cached.node_id == "custom-node-123" assert manager.get_by_node_id("custom-node-123") is not None @@ -153,7 +153,7 @@ class TestL1CacheManagerStorage: """has_content checks existence.""" test_file = create_test_file(temp_dir / "input.txt", "data") - cached = manager.put(test_file, node_type="test") + cached, cid = manager.put(test_file, node_type="test") assert manager.has_content(cached.cid) is True assert manager.has_content("nonexistent") is False @@ -174,8 +174,8 @@ class TestL1CacheManagerStorage: f1 = create_test_file(temp_dir / "f1.txt", "identical") f2 = create_test_file(temp_dir / "f2.txt", "identical") - cached1 = manager.put(f1, node_type="test") - cached2 = manager.put(f2, node_type="test") + cached1, cid1 = manager.put(f1, node_type="test") + cached2, cid2 = manager.put(f2, node_type="test") assert cached1.cid == cached2.cid assert len(manager.list_all()) == 1 @@ -189,8 +189,8 @@ class TestL1CacheManagerActivities: input_file = create_test_file(temp_dir / "input.txt", "input") output_file = create_test_file(temp_dir / "output.txt", "output") - input_cached = manager.put(input_file, node_type="source") - output_cached = manager.put(output_file, node_type="effect") + input_cached, _ = manager.put(input_file, node_type="source") + output_cached, _ = manager.put(output_file, node_type="effect") activity = manager.record_simple_activity( input_hashes=[input_cached.cid], @@ -207,8 +207,8 @@ class TestL1CacheManagerActivities: for i in range(3): inp = create_test_file(temp_dir / f"in{i}.txt", f"input{i}") out = create_test_file(temp_dir / f"out{i}.txt", f"output{i}") - inp_c = manager.put(inp, node_type="source") - out_c = manager.put(out, node_type="effect") + inp_c, _ = manager.put(inp, node_type="source") + out_c, _ = manager.put(out, node_type="effect") manager.record_simple_activity([inp_c.cid], out_c.cid) activities = manager.list_activities() @@ -217,13 +217,13 @@ class TestL1CacheManagerActivities: def test_find_activities_by_inputs(self, manager, temp_dir): """Can find activities with same inputs.""" input_file = create_test_file(temp_dir / "shared_input.txt", "shared") - input_cached = manager.put(input_file, node_type="source") + input_cached, _ = manager.put(input_file, node_type="source") # Two activities with same input out1 = create_test_file(temp_dir / "out1.txt", "output1") out2 = create_test_file(temp_dir / "out2.txt", "output2") - out1_c = manager.put(out1, node_type="effect") - out2_c = manager.put(out2, node_type="effect") + out1_c, _ = manager.put(out1, node_type="effect") + out2_c, _ = manager.put(out2, node_type="effect") manager.record_simple_activity([input_cached.cid], out1_c.cid, "run1") manager.record_simple_activity([input_cached.cid], out2_c.cid, "run2") @@ -238,7 +238,7 @@ class TestL1CacheManagerDeletionRules: def test_can_delete_orphaned_item(self, manager, temp_dir): """Orphaned items can be deleted.""" test_file = create_test_file(temp_dir / "orphan.txt", "orphan") - cached = manager.put(test_file, node_type="test") + cached, _ = manager.put(test_file, node_type="test") can_delete, reason = manager.can_delete(cached.cid) assert can_delete is True @@ -248,8 +248,8 @@ class TestL1CacheManagerDeletionRules: input_file = create_test_file(temp_dir / "input.txt", "input") output_file = create_test_file(temp_dir / "output.txt", "output") - input_cached = manager.put(input_file, node_type="source") - output_cached = manager.put(output_file, node_type="effect") + input_cached, _ = manager.put(input_file, node_type="source") + output_cached, _ = manager.put(output_file, node_type="effect") manager.record_simple_activity( [input_cached.cid], @@ -265,8 +265,8 @@ class TestL1CacheManagerDeletionRules: input_file = create_test_file(temp_dir / "input.txt", "input") output_file = create_test_file(temp_dir / "output.txt", "output") - input_cached = manager.put(input_file, node_type="source") - output_cached = manager.put(output_file, node_type="effect") + input_cached, _ = manager.put(input_file, node_type="source") + output_cached, _ = manager.put(output_file, node_type="effect") manager.record_simple_activity( [input_cached.cid], @@ -280,7 +280,7 @@ class TestL1CacheManagerDeletionRules: def test_cannot_delete_pinned_item(self, manager, temp_dir): """Pinned items cannot be deleted.""" test_file = create_test_file(temp_dir / "shared.txt", "shared") - cached = manager.put(test_file, node_type="test") + cached, _ = manager.put(test_file, node_type="test") # Mark as pinned (published) manager.pin(cached.cid, reason="published") @@ -292,7 +292,7 @@ class TestL1CacheManagerDeletionRules: def test_delete_orphaned_item(self, manager, temp_dir): """Can delete orphaned items.""" test_file = create_test_file(temp_dir / "orphan.txt", "orphan") - cached = manager.put(test_file, node_type="test") + cached, _ = manager.put(test_file, node_type="test") success, msg = manager.delete_by_cid(cached.cid) @@ -304,8 +304,8 @@ class TestL1CacheManagerDeletionRules: input_file = create_test_file(temp_dir / "input.txt", "input") output_file = create_test_file(temp_dir / "output.txt", "output") - input_cached = manager.put(input_file, node_type="source") - output_cached = manager.put(output_file, node_type="effect") + input_cached, _ = manager.put(input_file, node_type="source") + output_cached, _ = manager.put(output_file, node_type="effect") manager.record_simple_activity( [input_cached.cid], @@ -326,8 +326,8 @@ class TestL1CacheManagerActivityDiscard: input_file = create_test_file(temp_dir / "input.txt", "input") output_file = create_test_file(temp_dir / "output.txt", "output") - input_cached = manager.put(input_file, node_type="source") - output_cached = manager.put(output_file, node_type="effect") + input_cached, _ = manager.put(input_file, node_type="source") + output_cached, _ = manager.put(output_file, node_type="effect") activity = manager.record_simple_activity( [input_cached.cid], @@ -343,8 +343,8 @@ class TestL1CacheManagerActivityDiscard: input_file = create_test_file(temp_dir / "input.txt", "input") output_file = create_test_file(temp_dir / "output.txt", "output") - input_cached = manager.put(input_file, node_type="source") - output_cached = manager.put(output_file, node_type="effect") + input_cached, _ = manager.put(input_file, node_type="source") + output_cached, _ = manager.put(output_file, node_type="effect") manager.record_simple_activity( [input_cached.cid], @@ -364,8 +364,8 @@ class TestL1CacheManagerActivityDiscard: input_file = create_test_file(temp_dir / "input.txt", "input") output_file = create_test_file(temp_dir / "output.txt", "output") - input_cached = manager.put(input_file, node_type="source") - output_cached = manager.put(output_file, node_type="effect") + input_cached, _ = manager.put(input_file, node_type="source") + output_cached, _ = manager.put(output_file, node_type="effect") manager.record_simple_activity( [input_cached.cid],