Refactor to use IPFS CID as the primary content identifier: - Update database schema: content_hash -> cid, output_hash -> output_cid - Update all services, routers, and tasks to use cid terminology - Update HTML templates to display CID instead of hash - Update cache_manager parameter names - Update README documentation This completes the transition to CID-only content addressing. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
430 lines
14 KiB
Python
430 lines
14 KiB
Python
"""
|
|
Step execution task.
|
|
|
|
Phase 3 of the 3-phase execution model.
|
|
Executes individual steps from an execution plan with IPFS-backed caching.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import socket
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional
|
|
|
|
from celery import current_task
|
|
|
|
# Import from the Celery app
|
|
import sys
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
from celery_app import app
|
|
from claiming import (
|
|
get_claimer,
|
|
claim_task,
|
|
complete_task,
|
|
fail_task,
|
|
ClaimStatus,
|
|
)
|
|
from cache_manager import get_cache_manager, L1CacheManager
|
|
|
|
# Import artdag
|
|
try:
|
|
from artdag import Cache, NodeType
|
|
from artdag.executor import get_executor
|
|
from artdag.planning import ExecutionStep
|
|
except ImportError:
|
|
Cache = None
|
|
NodeType = None
|
|
get_executor = None
|
|
ExecutionStep = None
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def get_worker_id() -> str:
    """Return a per-process worker identifier of the form ``hostname:pid``.

    Combining the host name with the process id makes the identifier
    unique across machines and across concurrent worker processes on
    the same machine, which is what the claiming system needs.
    """
    return f"{socket.gethostname()}:{os.getpid()}"
|
|
|
|
|
|
@app.task(bind=True, name='tasks.execute_step')
def execute_step(
    self,
    step_json: str,
    plan_id: str,
    input_cache_ids: Dict[str, str],
) -> dict:
    """
    Execute a single step from an execution plan.

    Uses hash-based claiming to prevent duplicate work: if the result is
    already cached the step short-circuits; if another worker holds the
    claim this worker blocks until that worker finishes. Results are
    stored in the IPFS-backed L1 cache.

    Four step kinds are handled inline, in this order:
      SOURCE   - resolve an existing CID from the cache (no computation).
      _LIST    - virtual node that collects the cached paths of its items.
      COMPOUND - collapsed effect chain rendered with a single FFmpeg run.
      (other)  - dispatched to the artdag executor for the node type.

    Args:
        step_json: JSON-serialized ExecutionStep
        plan_id: ID of the parent execution plan (currently unused here;
            presumably kept for task-signature compatibility -- verify)
        input_cache_ids: Mapping from input step_id to their cache_id

    Returns:
        Dict with execution result. Always contains "status", "step_id"
        and "cache_id"; other keys depend on the status/step kind.

    Raises:
        ImportError: if the artdag planning module is not importable.
        (All other exceptions are caught, reported via fail_task, and
        returned as a "failed" status dict rather than re-raised.)
    """
    # artdag is an optional import at module level; bail out early if absent.
    if ExecutionStep is None:
        raise ImportError("artdag.planning not available")

    step = ExecutionStep.from_json(step_json)
    worker_id = get_worker_id()
    task_id = self.request.id

    logger.info(f"Executing step {step.step_id} ({step.node_type}) cache_id={step.cache_id[:16]}...")

    # Get L1 cache manager (IPFS-backed)
    cache_mgr = get_cache_manager()

    # Check if already cached (by cache_id as cid) -- a hit means some
    # earlier run produced this exact content, so no work is needed.
    cached_path = cache_mgr.get_by_cid(step.cache_id)
    if cached_path:
        logger.info(f"Step {step.step_id} already cached at {cached_path}")

        # Mark as cached in claiming system so waiters are released.
        claimer = get_claimer()
        claimer.mark_cached(step.cache_id, str(cached_path))

        return {
            "status": "cached",
            "step_id": step.step_id,
            "cache_id": step.cache_id,
            "output_path": str(cached_path),
        }

    # Try to claim the task
    if not claim_task(step.cache_id, worker_id, task_id):
        # Another worker is handling it
        logger.info(f"Step {step.step_id} claimed by another worker, waiting...")

        claimer = get_claimer()
        # NOTE(review): 600 s wait here vs 3600 s group timeout in
        # execute_level -- confirm the two limits are intentional.
        result = claimer.wait_for_completion(step.cache_id, timeout=600)

        # Translate the other worker's terminal claim status into our
        # own result dict; a None result means we timed out waiting.
        if result and result.status == ClaimStatus.COMPLETED:
            return {
                "status": "completed_by_other",
                "step_id": step.step_id,
                "cache_id": step.cache_id,
                "output_path": result.output_path,
            }
        elif result and result.status == ClaimStatus.CACHED:
            return {
                "status": "cached",
                "step_id": step.step_id,
                "cache_id": step.cache_id,
                "output_path": result.output_path,
            }
        elif result and result.status == ClaimStatus.FAILED:
            return {
                "status": "failed",
                "step_id": step.step_id,
                "cache_id": step.cache_id,
                "error": result.error,
            }
        else:
            return {
                "status": "timeout",
                "step_id": step.step_id,
                "cache_id": step.cache_id,
                "error": "Timeout waiting for other worker",
            }

    # We have the claim, update to running
    claimer = get_claimer()
    claimer.update_status(step.cache_id, worker_id, ClaimStatus.RUNNING)

    try:
        # Handle SOURCE nodes: pure cache lookup of pre-ingested content.
        if step.node_type == "SOURCE":
            cid = step.config.get("cid")
            if not cid:
                raise ValueError(f"SOURCE step missing cid")

            # Look up in cache
            path = cache_mgr.get_by_cid(cid)
            if not path:
                raise ValueError(f"SOURCE input not found in cache: {cid[:16]}...")

            output_path = str(path)
            complete_task(step.cache_id, worker_id, output_path)
            return {
                "status": "completed",
                "step_id": step.step_id,
                "cache_id": step.cache_id,
                "output_path": output_path,
            }

        # Handle _LIST virtual nodes: gather cached paths of the listed
        # items. Items missing from input_cache_ids or the cache are
        # silently skipped -- presumably best-effort by design; verify.
        if step.node_type == "_LIST":
            item_paths = []
            for item_id in step.config.get("items", []):
                item_cache_id = input_cache_ids.get(item_id)
                if item_cache_id:
                    path = cache_mgr.get_by_cid(item_cache_id)
                    if path:
                        item_paths.append(str(path))

            # The JSON list itself is recorded as the claim's "output path".
            complete_task(step.cache_id, worker_id, json.dumps(item_paths))
            return {
                "status": "completed",
                "step_id": step.step_id,
                "cache_id": step.cache_id,
                "output_path": None,
                "item_paths": item_paths,
            }

        # Handle COMPOUND nodes (collapsed effect chains): render all
        # chained filters in one FFmpeg invocation instead of one per step.
        if step.node_type == "COMPOUND":
            filter_chain = step.config.get("filter_chain", [])
            if not filter_chain:
                raise ValueError("COMPOUND step has empty filter_chain")

            # Resolve input paths
            input_paths = []
            for input_step_id in step.input_steps:
                input_cache_id = input_cache_ids.get(input_step_id)
                if not input_cache_id:
                    raise ValueError(f"No cache_id for input step: {input_step_id}")
                path = cache_mgr.get_by_cid(input_cache_id)
                if not path:
                    raise ValueError(f"Input not in cache: {input_cache_id[:16]}...")
                input_paths.append(Path(path))

            if not input_paths:
                raise ValueError("COMPOUND step has no inputs")

            # Build FFmpeg filter graph from chain
            import subprocess
            import tempfile

            # Map each TRANSFORM/RESIZE entry onto an ffmpeg -vf fragment.
            filters = []
            for filter_item in filter_chain:
                filter_type = filter_item.get("type", "")
                filter_config = filter_item.get("config", {})

                if filter_type == "TRANSFORM":
                    effects = filter_config.get("effects", {})
                    for eff_name, eff_value in effects.items():
                        if eff_name == "saturation":
                            filters.append(f"eq=saturation={eff_value}")
                        elif eff_name == "brightness":
                            filters.append(f"eq=brightness={eff_value}")
                        elif eff_name == "contrast":
                            filters.append(f"eq=contrast={eff_value}")
                        elif eff_name == "hue":
                            filters.append(f"hue=h={eff_value}")
                        # Unknown effect names are ignored.

                elif filter_type == "RESIZE":
                    width = filter_config.get("width", -1)
                    height = filter_config.get("height", -1)
                    mode = filter_config.get("mode", "fit")
                    if mode == "fit":
                        # Letterbox-style: shrink to fit inside WxH.
                        filters.append(f"scale={width}:{height}:force_original_aspect_ratio=decrease")
                    elif mode == "fill":
                        # Cover-style: scale up then center-crop to WxH.
                        filters.append(f"scale={width}:{height}:force_original_aspect_ratio=increase,crop={width}:{height}")
                    else:
                        # Exact scale, aspect ratio not preserved.
                        filters.append(f"scale={width}:{height}")

            output_dir = Path(tempfile.mkdtemp())
            output_path = output_dir / f"compound_{step.cache_id[:16]}.mp4"

            # NOTE(review): only the first input feeds FFmpeg even though
            # all inputs were resolved above -- confirm multi-input
            # COMPOUND steps are not expected here.
            cmd = ["ffmpeg", "-y", "-i", str(input_paths[0])]

            # Handle segment timing (-ss/-t are placed after -i, i.e.
            # output-side seeking -- slower but frame-accurate).
            for filter_item in filter_chain:
                if filter_item.get("type") == "SEGMENT":
                    seg_config = filter_item.get("config", {})
                    if "start" in seg_config:
                        cmd.extend(["-ss", str(seg_config["start"])])
                    if "end" in seg_config:
                        duration = seg_config["end"] - seg_config.get("start", 0)
                        cmd.extend(["-t", str(duration)])
                    elif "duration" in seg_config:
                        cmd.extend(["-t", str(seg_config["duration"])])

            if filters:
                cmd.extend(["-vf", ",".join(filters)])

            cmd.extend(["-c:v", "libx264", "-c:a", "aac", str(output_path)])

            logger.info(f"Running COMPOUND FFmpeg: {' '.join(cmd)}")
            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode != 0:
                # NOTE(review): output_dir leaks on this path (cleanup
                # below is only reached on success); the except handler
                # does not remove it either -- consider try/finally.
                raise RuntimeError(f"FFmpeg failed: {result.stderr}")

            # Ingest the rendered file into the IPFS-backed cache.
            cached_file, ipfs_cid = cache_mgr.put(
                source_path=output_path,
                node_type="COMPOUND",
                node_id=step.cache_id,
            )

            logger.info(f"COMPOUND step {step.step_id} completed with {len(filter_chain)} filters")
            complete_task(step.cache_id, worker_id, str(cached_file.path))

            # Drop the temp render dir; the cache now owns the content.
            import shutil
            if output_dir.exists():
                shutil.rmtree(output_dir, ignore_errors=True)

            return {
                "status": "completed",
                "step_id": step.step_id,
                "cache_id": step.cache_id,
                "output_path": str(cached_file.path),
                "cid": cached_file.cid,
                "ipfs_cid": ipfs_cid,
                "filter_count": len(filter_chain),
            }

        # Get executor for this node type -- prefer the NodeType enum
        # member, fall back to the raw string if it is not a member.
        try:
            node_type = NodeType[step.node_type]
        except KeyError:
            node_type = step.node_type

        executor = get_executor(node_type)
        if executor is None:
            raise ValueError(f"No executor for node type: {step.node_type}")

        # Resolve input paths from cache (strict: any missing input fails
        # the step, unlike the best-effort _LIST handling above).
        input_paths = []
        for input_step_id in step.input_steps:
            input_cache_id = input_cache_ids.get(input_step_id)
            if not input_cache_id:
                raise ValueError(f"No cache_id for input step: {input_step_id}")

            path = cache_mgr.get_by_cid(input_cache_id)
            if not path:
                raise ValueError(f"Input not in cache: {input_cache_id[:16]}...")

            input_paths.append(Path(path))

        # Create temp output path
        import tempfile
        output_dir = Path(tempfile.mkdtemp())
        output_path = output_dir / f"output_{step.cache_id[:16]}.mp4"

        # Execute
        logger.info(f"Running executor for {step.node_type} with {len(input_paths)} inputs")
        result_path = executor.execute(step.config, input_paths, output_path)

        # Store in IPFS-backed cache
        cached_file, ipfs_cid = cache_mgr.put(
            source_path=result_path,
            node_type=step.node_type,
            node_id=step.cache_id,
        )

        logger.info(f"Step {step.step_id} completed, IPFS CID: {ipfs_cid}")

        # Mark completed
        complete_task(step.cache_id, worker_id, str(cached_file.path))

        # Build outputs list (for multi-output support). All outputs
        # currently point at the same cached file/CID -- presumably a
        # placeholder until true multi-output storage lands; verify.
        outputs = []
        if step.outputs:
            # Use pre-defined outputs from step
            for output_def in step.outputs:
                outputs.append({
                    "name": output_def.name,
                    "cache_id": output_def.cache_id,
                    "media_type": output_def.media_type,
                    "index": output_def.index,
                    "path": str(cached_file.path),
                    "cid": cached_file.cid,
                    "ipfs_cid": ipfs_cid,
                })
        else:
            # Single output (backwards compat)
            outputs.append({
                "name": step.name or step.step_id,
                "cache_id": step.cache_id,
                "media_type": "video/mp4",
                "index": 0,
                "path": str(cached_file.path),
                "cid": cached_file.cid,
                "ipfs_cid": ipfs_cid,
            })

        # Cleanup temp (success path only; see NOTE above about leaks
        # when the executor or cache put raises)
        if output_dir.exists():
            import shutil
            shutil.rmtree(output_dir, ignore_errors=True)

        return {
            "status": "completed",
            "step_id": step.step_id,
            "name": step.name,
            "cache_id": step.cache_id,
            "output_path": str(cached_file.path),
            "cid": cached_file.cid,
            "ipfs_cid": ipfs_cid,
            "outputs": outputs,
        }

    except Exception as e:
        # Broad catch is deliberate: the claim must be released as FAILED
        # so waiting workers are unblocked, and Celery gets a result dict
        # instead of a raised exception.
        logger.error(f"Step {step.step_id} failed: {e}")
        fail_task(step.cache_id, worker_id, str(e))

        return {
            "status": "failed",
            "step_id": step.step_id,
            "cache_id": step.cache_id,
            "error": str(e),
        }
|
|
|
|
|
|
@app.task(bind=True, name='tasks.execute_level')
def execute_level(
    self,
    steps_json: List[str],
    plan_id: str,
    cache_ids: Dict[str, str],
) -> dict:
    """
    Execute every step of a single dependency level in parallel.

    All steps at one level are independent of each other, so they are
    dispatched as a Celery group and awaited together.

    Args:
        steps_json: List of JSON-serialized ExecutionSteps
        plan_id: ID of the parent execution plan
        cache_ids: Mapping from step_id to cache_id accumulated so far

    Returns:
        Dict with per-step results plus the cache_id map extended with
        this level's outputs.
    """
    from celery import group

    # Fan out: one task signature per serialized step, all sharing the
    # same plan id and the cache-id map built by earlier levels.
    signatures = [
        execute_step.s(serialized_step, plan_id, cache_ids)
        for serialized_step in steps_json
    ]

    # NOTE: blocking on .get() inside a task is a known Celery smell;
    # kept as-is because the 3-phase model relies on it here.
    async_result = group(signatures).apply_async()
    step_results = async_result.get(timeout=3600)  # 1 hour timeout

    # Layer this level's cache ids on top of the incoming map without
    # mutating the caller's dict.
    merged_cache_ids = dict(cache_ids)
    for entry in step_results:
        sid = entry.get("step_id")
        cid = entry.get("cache_id")
        if sid and cid:
            merged_cache_ids[sid] = cid

    return {
        "status": "completed",
        "results": step_results,
        "cache_ids": merged_cache_ids,
    }
|