Files
celery/tasks/execute.py
gilesb bf188f4671 Remove dead code: execute_level, render_dog_from_cat, duplicate file_hash
- Remove execute_level() from tasks/execute.py (defined but never called)
- Remove render_dog_from_cat() from legacy_tasks.py (test convenience, never used)
- Remove duplicate file_hash() from legacy_tasks.py, import from cache_manager
- Remove unused hashlib import

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-13 01:21:55 +00:00

382 lines
13 KiB
Python

"""
Step execution task.
Phase 3 of the 3-phase execution model.
Executes individual steps from an execution plan with IPFS-backed caching.
"""
import json
import logging
import os
import socket
from pathlib import Path
from typing import Dict, List, Optional
from celery import current_task
# Import from the Celery app
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from celery_app import app
from claiming import (
get_claimer,
claim_task,
complete_task,
fail_task,
ClaimStatus,
)
from cache_manager import get_cache_manager, L1CacheManager
# Import artdag
try:
from artdag import Cache, NodeType
from artdag.executor import get_executor
from artdag.planning import ExecutionStep
except ImportError:
Cache = None
NodeType = None
get_executor = None
ExecutionStep = None
logger = logging.getLogger(__name__)
def get_worker_id() -> str:
"""Get a unique identifier for this worker."""
hostname = socket.gethostname()
pid = os.getpid()
return f"{hostname}:{pid}"
@app.task(bind=True, name='tasks.execute_step')
def execute_step(
self,
step_json: str,
plan_id: str,
input_cache_ids: Dict[str, str],
) -> dict:
"""
Execute a single step from an execution plan.
Uses hash-based claiming to prevent duplicate work.
Results are stored in IPFS-backed cache.
Args:
step_json: JSON-serialized ExecutionStep
plan_id: ID of the parent execution plan
input_cache_ids: Mapping from input step_id to their cache_id
Returns:
Dict with execution result
"""
if ExecutionStep is None:
raise ImportError("artdag.planning not available")
step = ExecutionStep.from_json(step_json)
worker_id = get_worker_id()
task_id = self.request.id
logger.info(f"Executing step {step.step_id} ({step.node_type}) cache_id={step.cache_id[:16]}...")
# Get L1 cache manager (IPFS-backed)
cache_mgr = get_cache_manager()
# Check if already cached (by cache_id as cid)
cached_path = cache_mgr.get_by_cid(step.cache_id)
if cached_path:
logger.info(f"Step {step.step_id} already cached at {cached_path}")
# Mark as cached in claiming system
claimer = get_claimer()
claimer.mark_cached(step.cache_id, str(cached_path))
return {
"status": "cached",
"step_id": step.step_id,
"cache_id": step.cache_id,
"output_path": str(cached_path),
}
# Try to claim the task
if not claim_task(step.cache_id, worker_id, task_id):
# Another worker is handling it
logger.info(f"Step {step.step_id} claimed by another worker, waiting...")
claimer = get_claimer()
result = claimer.wait_for_completion(step.cache_id, timeout=600)
if result and result.status == ClaimStatus.COMPLETED:
return {
"status": "completed_by_other",
"step_id": step.step_id,
"cache_id": step.cache_id,
"output_path": result.output_path,
}
elif result and result.status == ClaimStatus.CACHED:
return {
"status": "cached",
"step_id": step.step_id,
"cache_id": step.cache_id,
"output_path": result.output_path,
}
elif result and result.status == ClaimStatus.FAILED:
return {
"status": "failed",
"step_id": step.step_id,
"cache_id": step.cache_id,
"error": result.error,
}
else:
return {
"status": "timeout",
"step_id": step.step_id,
"cache_id": step.cache_id,
"error": "Timeout waiting for other worker",
}
# We have the claim, update to running
claimer = get_claimer()
claimer.update_status(step.cache_id, worker_id, ClaimStatus.RUNNING)
try:
# Handle SOURCE nodes
if step.node_type == "SOURCE":
cid = step.config.get("cid")
if not cid:
raise ValueError(f"SOURCE step missing cid")
# Look up in cache
path = cache_mgr.get_by_cid(cid)
if not path:
raise ValueError(f"SOURCE input not found in cache: {cid[:16]}...")
output_path = str(path)
complete_task(step.cache_id, worker_id, output_path)
return {
"status": "completed",
"step_id": step.step_id,
"cache_id": step.cache_id,
"output_path": output_path,
}
# Handle _LIST virtual nodes
if step.node_type == "_LIST":
item_paths = []
for item_id in step.config.get("items", []):
item_cache_id = input_cache_ids.get(item_id)
if item_cache_id:
path = cache_mgr.get_by_cid(item_cache_id)
if path:
item_paths.append(str(path))
complete_task(step.cache_id, worker_id, json.dumps(item_paths))
return {
"status": "completed",
"step_id": step.step_id,
"cache_id": step.cache_id,
"output_path": None,
"item_paths": item_paths,
}
# Handle COMPOUND nodes (collapsed effect chains)
if step.node_type == "COMPOUND":
filter_chain = step.config.get("filter_chain", [])
if not filter_chain:
raise ValueError("COMPOUND step has empty filter_chain")
# Resolve input paths
input_paths = []
for input_step_id in step.input_steps:
input_cache_id = input_cache_ids.get(input_step_id)
if not input_cache_id:
raise ValueError(f"No cache_id for input step: {input_step_id}")
path = cache_mgr.get_by_cid(input_cache_id)
if not path:
raise ValueError(f"Input not in cache: {input_cache_id[:16]}...")
input_paths.append(Path(path))
if not input_paths:
raise ValueError("COMPOUND step has no inputs")
# Build FFmpeg filter graph from chain
import subprocess
import tempfile
filters = []
for filter_item in filter_chain:
filter_type = filter_item.get("type", "")
filter_config = filter_item.get("config", {})
if filter_type == "TRANSFORM":
effects = filter_config.get("effects", {})
for eff_name, eff_value in effects.items():
if eff_name == "saturation":
filters.append(f"eq=saturation={eff_value}")
elif eff_name == "brightness":
filters.append(f"eq=brightness={eff_value}")
elif eff_name == "contrast":
filters.append(f"eq=contrast={eff_value}")
elif eff_name == "hue":
filters.append(f"hue=h={eff_value}")
elif filter_type == "RESIZE":
width = filter_config.get("width", -1)
height = filter_config.get("height", -1)
mode = filter_config.get("mode", "fit")
if mode == "fit":
filters.append(f"scale={width}:{height}:force_original_aspect_ratio=decrease")
elif mode == "fill":
filters.append(f"scale={width}:{height}:force_original_aspect_ratio=increase,crop={width}:{height}")
else:
filters.append(f"scale={width}:{height}")
output_dir = Path(tempfile.mkdtemp())
output_path = output_dir / f"compound_{step.cache_id[:16]}.mp4"
cmd = ["ffmpeg", "-y", "-i", str(input_paths[0])]
# Handle segment timing
for filter_item in filter_chain:
if filter_item.get("type") == "SEGMENT":
seg_config = filter_item.get("config", {})
if "start" in seg_config:
cmd.extend(["-ss", str(seg_config["start"])])
if "end" in seg_config:
duration = seg_config["end"] - seg_config.get("start", 0)
cmd.extend(["-t", str(duration)])
elif "duration" in seg_config:
cmd.extend(["-t", str(seg_config["duration"])])
if filters:
cmd.extend(["-vf", ",".join(filters)])
cmd.extend(["-c:v", "libx264", "-c:a", "aac", str(output_path)])
logger.info(f"Running COMPOUND FFmpeg: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f"FFmpeg failed: {result.stderr}")
cached_file, ipfs_cid = cache_mgr.put(
source_path=output_path,
node_type="COMPOUND",
node_id=step.cache_id,
)
logger.info(f"COMPOUND step {step.step_id} completed with {len(filter_chain)} filters")
complete_task(step.cache_id, worker_id, str(cached_file.path))
import shutil
if output_dir.exists():
shutil.rmtree(output_dir, ignore_errors=True)
return {
"status": "completed",
"step_id": step.step_id,
"cache_id": step.cache_id,
"output_path": str(cached_file.path),
"cid": cached_file.cid,
"ipfs_cid": ipfs_cid,
"filter_count": len(filter_chain),
}
# Get executor for this node type
try:
node_type = NodeType[step.node_type]
except KeyError:
node_type = step.node_type
executor = get_executor(node_type)
if executor is None:
raise ValueError(f"No executor for node type: {step.node_type}")
# Resolve input paths from cache
input_paths = []
for input_step_id in step.input_steps:
input_cache_id = input_cache_ids.get(input_step_id)
if not input_cache_id:
raise ValueError(f"No cache_id for input step: {input_step_id}")
path = cache_mgr.get_by_cid(input_cache_id)
if not path:
raise ValueError(f"Input not in cache: {input_cache_id[:16]}...")
input_paths.append(Path(path))
# Create temp output path
import tempfile
output_dir = Path(tempfile.mkdtemp())
output_path = output_dir / f"output_{step.cache_id[:16]}.mp4"
# Execute
logger.info(f"Running executor for {step.node_type} with {len(input_paths)} inputs")
result_path = executor.execute(step.config, input_paths, output_path)
# Store in IPFS-backed cache
cached_file, ipfs_cid = cache_mgr.put(
source_path=result_path,
node_type=step.node_type,
node_id=step.cache_id,
)
logger.info(f"Step {step.step_id} completed, IPFS CID: {ipfs_cid}")
# Mark completed
complete_task(step.cache_id, worker_id, str(cached_file.path))
# Build outputs list (for multi-output support)
outputs = []
if step.outputs:
# Use pre-defined outputs from step
for output_def in step.outputs:
outputs.append({
"name": output_def.name,
"cache_id": output_def.cache_id,
"media_type": output_def.media_type,
"index": output_def.index,
"path": str(cached_file.path),
"cid": cached_file.cid,
"ipfs_cid": ipfs_cid,
})
else:
# Single output (backwards compat)
outputs.append({
"name": step.name or step.step_id,
"cache_id": step.cache_id,
"media_type": "video/mp4",
"index": 0,
"path": str(cached_file.path),
"cid": cached_file.cid,
"ipfs_cid": ipfs_cid,
})
# Cleanup temp
if output_dir.exists():
import shutil
shutil.rmtree(output_dir, ignore_errors=True)
return {
"status": "completed",
"step_id": step.step_id,
"name": step.name,
"cache_id": step.cache_id,
"output_path": str(cached_file.path),
"cid": cached_file.cid,
"ipfs_cid": ipfs_cid,
"outputs": outputs,
}
except Exception as e:
logger.error(f"Step {step.step_id} failed: {e}")
fail_task(step.cache_id, worker_id, str(e))
return {
"status": "failed",
"step_id": step.step_id,
"cache_id": step.cache_id,
"error": str(e),
}