diff --git a/legacy_tasks.py b/legacy_tasks.py
index a7877cc..e9d71f9 100644
--- a/legacy_tasks.py
+++ b/legacy_tasks.py
@@ -327,23 +327,69 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
     if not result.success:
         raise RuntimeError(f"DAG execution failed: {result.error}")
 
-    # Get output hash
+    # Index all node outputs by content_hash and upload to IPFS
     cache_manager = get_cache_manager()
     output_hash = None
+    node_hashes = {}  # node_id -> content_hash mapping
+    node_ipfs_cids = {}  # node_id -> ipfs_cid mapping
+
+    # Process all node results (intermediates + output)
+    for node_id, node_path in result.node_results.items():
+        if node_path and Path(node_path).exists():
+            node = dag.nodes.get(node_id)
+            # Skip SOURCE nodes - they're already in cache
+            if node and (node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE"):
+                content_hash = node.config.get("content_hash")
+                if content_hash:
+                    node_hashes[node_id] = content_hash
+                continue
+
+            # Determine node type for cache metadata
+            node_type_str = str(node.node_type) if node else "intermediate"
+            if "effect" in node_type_str.lower():
+                cache_node_type = "effect_output"
+            else:
+                cache_node_type = "dag_intermediate"
+
+            # Store in cache_manager (indexes by content_hash, uploads to IPFS)
+            cached, ipfs_cid = cache_manager.put(
+                Path(node_path),
+                node_type=cache_node_type,
+                node_id=node_id,
+            )
+            node_hashes[node_id] = cached.content_hash
+            if ipfs_cid:
+                node_ipfs_cids[node_id] = ipfs_cid
+            logger.info(f"Cached node {node_id}: {cached.content_hash[:16]}... -> {ipfs_cid or 'no IPFS'}")
+
+    # Get output hash from the output node
     if result.output_path and result.output_path.exists():
         output_hash = file_hash(result.output_path)
+        output_ipfs_cid = node_ipfs_cids.get(dag.output)
 
-        # Store in cache_manager for proper tracking (returns tuple)
-        cached, ipfs_cid = cache_manager.put(result.output_path, node_type="dag_output")
-
-        # Store in database (for L2 to query IPFS CID)
+        # Store output in database (for L2 to query IPFS CID)
         import asyncio
         import database
 
         async def save_to_db():
             if database.pool is None:
                 await database.init_db()
-            await database.create_cache_item(output_hash, ipfs_cid)
+            await database.create_cache_item(output_hash, output_ipfs_cid)
+            # Also save the run result
+            if run_id:
+                input_hashes_for_db = [
+                    node.config.get("content_hash")
+                    for node in dag.nodes.values()
+                    if (node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE")
+                    and node.config.get("content_hash")
+                ]
+                await database.save_run_cache(
+                    run_id=run_id,
+                    output_hash=output_hash,
+                    recipe="dag",
+                    inputs=input_hashes_for_db,
+                    ipfs_cid=output_ipfs_cid,
+                )
 
         try:
             loop = asyncio.get_event_loop()
@@ -356,18 +402,27 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
 
     # Record activity for deletion tracking
     input_hashes = []
+    intermediate_hashes = []
     for node_id, node in dag.nodes.items():
         if node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE":
             content_hash = node.config.get("content_hash")
             if content_hash:
                 input_hashes.append(content_hash)
+        elif node_id != dag.output and node_id in node_hashes:
+            intermediate_hashes.append(node_hashes[node_id])
 
     if input_hashes:
-        cache_manager.record_simple_activity(
-            input_hashes=input_hashes,
-            output_hash=output_hash,
-            run_id=run_id,
+        from artdag.activities import Activity
+        from datetime import datetime, timezone
+        activity = Activity(
+            activity_id=run_id or f"dag-{output_hash[:16]}",
+            input_ids=sorted(input_hashes),
+            output_id=output_hash,
+            intermediate_ids=intermediate_hashes,
+            created_at=datetime.now(timezone.utc).timestamp(),
+            status="completed",
         )
+        cache_manager.activity_store.add(activity)
 
     # Build provenance
     input_hashes_for_provenance = []
@@ -383,9 +438,18 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
         "rendered_at": datetime.now(timezone.utc).isoformat(),
         "output": {
             "content_hash": output_hash,
+            "ipfs_cid": node_ipfs_cids.get(dag.output) if dag.output else None,
         },
         "inputs": input_hashes_for_provenance,
         "dag": dag_json,  # Full DAG definition
+        "nodes": {
+            node_id: {
+                "content_hash": node_hashes.get(node_id),
+                "ipfs_cid": node_ipfs_cids.get(node_id),
+            }
+            for node_id in dag.nodes.keys()
+            if node_id in node_hashes
+        },
         "execution": {
             "execution_time": result.execution_time,
             "nodes_executed": result.nodes_executed,
@@ -407,6 +471,7 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
         "success": True,
         "run_id": run_id,
         "output_hash": output_hash,
+        "output_ipfs_cid": node_ipfs_cids.get(dag.output) if dag.output else None,
         "output_path": str(result.output_path) if result.output_path else None,
         "execution_time": result.execution_time,
         "nodes_executed": result.nodes_executed,
@@ -414,6 +479,8 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
         "node_results": {
             node_id: str(path) for node_id, path in result.node_results.items()
         },
+        "node_hashes": node_hashes,  # node_id -> content_hash
+        "node_ipfs_cids": node_ipfs_cids,  # node_id -> ipfs_cid
         "provenance_cid": provenance_cid,
     }
 