Index all DAG node outputs by content_hash and upload to IPFS

- Process all node_results after DAG execution
- Store each intermediate/effect output in cache_manager
- Upload all node outputs to IPFS (not just final output)
- Track node_hashes and node_ipfs_cids mappings
- Save run result to database with run_id
- Include nodes with content_hash + ipfs_cid in provenance
- Return node_hashes and node_ipfs_cids in task result

All DAG node outputs are now content-addressable via /cache/{content_hash}

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
giles
2026-01-11 14:08:41 +00:00
parent 854396680f
commit 4b22fb6588

View File

@@ -327,23 +327,69 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
if not result.success:
raise RuntimeError(f"DAG execution failed: {result.error}")
# Get output hash
# Index all node outputs by content_hash and upload to IPFS
cache_manager = get_cache_manager()
output_hash = None
node_hashes = {} # node_id -> content_hash mapping
node_ipfs_cids = {} # node_id -> ipfs_cid mapping
# Process all node results (intermediates + output)
for node_id, node_path in result.node_results.items():
if node_path and Path(node_path).exists():
node = dag.nodes.get(node_id)
# Skip SOURCE nodes - they're already in cache
if node and (node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE"):
content_hash = node.config.get("content_hash")
if content_hash:
node_hashes[node_id] = content_hash
continue
# Determine node type for cache metadata
node_type_str = str(node.node_type) if node else "intermediate"
if "effect" in node_type_str.lower():
cache_node_type = "effect_output"
else:
cache_node_type = "dag_intermediate"
# Store in cache_manager (indexes by content_hash, uploads to IPFS)
cached, ipfs_cid = cache_manager.put(
Path(node_path),
node_type=cache_node_type,
node_id=node_id,
)
node_hashes[node_id] = cached.content_hash
if ipfs_cid:
node_ipfs_cids[node_id] = ipfs_cid
logger.info(f"Cached node {node_id}: {cached.content_hash[:16]}... -> {ipfs_cid or 'no IPFS'}")
# Get output hash from the output node
if result.output_path and result.output_path.exists():
output_hash = file_hash(result.output_path)
output_ipfs_cid = node_ipfs_cids.get(dag.output)
# Store in cache_manager for proper tracking (returns tuple)
cached, ipfs_cid = cache_manager.put(result.output_path, node_type="dag_output")
# Store in database (for L2 to query IPFS CID)
# Store output in database (for L2 to query IPFS CID)
import asyncio
import database
async def save_to_db():
if database.pool is None:
await database.init_db()
await database.create_cache_item(output_hash, ipfs_cid)
await database.create_cache_item(output_hash, output_ipfs_cid)
# Also save the run result
if run_id:
input_hashes_for_db = [
node.config.get("content_hash")
for node in dag.nodes.values()
if (node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE")
and node.config.get("content_hash")
]
await database.save_run_cache(
run_id=run_id,
output_hash=output_hash,
recipe="dag",
inputs=input_hashes_for_db,
ipfs_cid=output_ipfs_cid,
)
try:
loop = asyncio.get_event_loop()
@@ -356,18 +402,27 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
# Record activity for deletion tracking
input_hashes = []
intermediate_hashes = []
for node_id, node in dag.nodes.items():
if node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE":
content_hash = node.config.get("content_hash")
if content_hash:
input_hashes.append(content_hash)
elif node_id != dag.output and node_id in node_hashes:
intermediate_hashes.append(node_hashes[node_id])
if input_hashes:
cache_manager.record_simple_activity(
input_hashes=input_hashes,
output_hash=output_hash,
run_id=run_id,
from artdag.activities import Activity
from datetime import datetime, timezone
activity = Activity(
activity_id=run_id or f"dag-{output_hash[:16]}",
input_ids=sorted(input_hashes),
output_id=output_hash,
intermediate_ids=intermediate_hashes,
created_at=datetime.now(timezone.utc).timestamp(),
status="completed",
)
cache_manager.activity_store.add(activity)
# Build provenance
input_hashes_for_provenance = []
@@ -383,9 +438,18 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
"rendered_at": datetime.now(timezone.utc).isoformat(),
"output": {
"content_hash": output_hash,
"ipfs_cid": node_ipfs_cids.get(dag.output) if dag.output else None,
},
"inputs": input_hashes_for_provenance,
"dag": dag_json, # Full DAG definition
"nodes": {
node_id: {
"content_hash": node_hashes.get(node_id),
"ipfs_cid": node_ipfs_cids.get(node_id),
}
for node_id in dag.nodes.keys()
if node_id in node_hashes
},
"execution": {
"execution_time": result.execution_time,
"nodes_executed": result.nodes_executed,
@@ -407,6 +471,7 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
"success": True,
"run_id": run_id,
"output_hash": output_hash,
"output_ipfs_cid": node_ipfs_cids.get(dag.output) if dag.output else None,
"output_path": str(result.output_path) if result.output_path else None,
"execution_time": result.execution_time,
"nodes_executed": result.nodes_executed,
@@ -414,6 +479,8 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
"node_results": {
node_id: str(path) for node_id, path in result.node_results.items()
},
"node_hashes": node_hashes, # node_id -> content_hash
"node_ipfs_cids": node_ipfs_cids, # node_id -> ipfs_cid
"provenance_cid": provenance_cid,
}