# Changelog note (from commit message):
# - Registry lookups now use the "cid" key instead of "hash"
#   (app/routers/recipes.py asset/effect resolution, tasks/execute_sexp.py
#   effect config lookups; server_legacy.py references removed).
# - Prefer IPFS CID over local hash in cache operations
#   (cache_service.py import_from_ipfs/upload_content, orchestrate.py plan
#   caching, legacy_tasks.py node hash tracking).
# - Removed ~7800 lines of dead code: server_legacy.py (replaced by the
#   modular app/ structure) and tasks/*_cid.py (only imported by it).
"""
|
|
S-expression step execution task.
|
|
|
|
Executes individual steps received as S-expressions.
|
|
The S-expression is the canonical format - workers verify
|
|
cache_ids by hashing the received S-expression.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import socket
|
|
from pathlib import Path
|
|
from typing import Dict, Optional
|
|
|
|
from celery import current_task
|
|
|
|
# Import from the Celery app
|
|
import sys
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
from celery_app import app
|
|
from claiming import (
|
|
get_claimer,
|
|
claim_task,
|
|
complete_task,
|
|
fail_task,
|
|
ClaimStatus,
|
|
)
|
|
from cache_manager import get_cache_manager
|
|
|
|
# Import artdag S-expression modules
|
|
try:
|
|
from artdag.sexp import parse, Symbol, Keyword
|
|
from artdag import NodeType
|
|
from artdag.executor import get_executor
|
|
except ImportError:
|
|
parse = None
|
|
Symbol = None
|
|
Keyword = None
|
|
NodeType = None
|
|
get_executor = None
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def get_worker_id() -> str:
    """Return an identifier unique to this worker process ("host:pid")."""
    return "{}:{}".format(socket.gethostname(), os.getpid())
|
|
|
|
|
|
def sexp_to_config(sexp) -> Dict:
    """
    Translate a parsed step S-expression into a plain config dict.

    Input: (effect :hash "abc123" :inputs ["step1"])
    Output: {"node_type": "EFFECT", "hash": "abc123", "inputs": ["step1"]}
    """
    if not isinstance(sexp, list) or len(sexp) < 1:
        raise ValueError(f"Invalid step S-expression: {sexp}")

    # The head symbol names the node type.
    head = sexp[0]
    raw_type = head.name if isinstance(head, Symbol) else str(head)
    config = {"node_type": raw_type.upper()}

    # Walk the tail collecting :keyword value pairs. Bare positional
    # items — and a trailing keyword with no value — are skipped.
    idx = 1
    while idx < len(sexp):
        token = sexp[idx]
        if isinstance(token, Keyword) and idx + 1 < len(sexp):
            # Keywords use dashes; dict keys use underscores.
            config[token.name.replace('-', '_')] = sexp[idx + 1]
            idx += 2
        else:
            idx += 1

    return config
|
|
|
|
|
|
@app.task(bind=True, name='tasks.execute_step_sexp')
def execute_step_sexp(
    self,
    step_sexp: str,
    step_id: str,
    cache_id: str,
    plan_id: str,
    input_cache_ids: Dict[str, str],
) -> dict:
    """
    Execute a single step from an S-expression.

    The step is received as a serialized S-expression string.
    Workers can verify the cache_id by hashing the S-expression.

    Flow: parse -> cache check -> claim (or wait on the worker that
    holds the claim) -> dispatch on node type (SOURCE / EFFECT /
    COMPOUND / generic executor) -> store the output in the cache and
    complete the claim.

    Args:
        step_sexp: Serialized S-expression for the step
        step_id: Human-readable step identifier
        cache_id: Expected cache_id (SHA3-256 of step_sexp)
        plan_id: ID of the parent execution plan
        input_cache_ids: Mapping from input step_id to their cache_id

    Returns:
        Dict with execution result; "status" is one of "completed",
        "cached", "completed_by_other", "timeout", or "failed".
    """
    if parse is None:
        raise ImportError("artdag.sexp not available")

    worker_id = get_worker_id()
    task_id = self.request.id

    logger.info(f"Executing step {step_id} cache_id={cache_id[:16]}...")
    logger.debug(f"Step S-expression: {step_sexp[:100]}...")

    # Parse the S-expression. A malformed step is reported as a failed
    # result (not raised) so the scheduler sees an ordinary failure.
    try:
        parsed = parse(step_sexp)
        config = sexp_to_config(parsed)
        node_type = config.pop("node_type")
    except Exception as e:
        logger.error(f"Failed to parse step S-expression: {e}")
        return {
            "status": "failed",
            "step_id": step_id,
            "cache_id": cache_id,
            "error": f"Parse error: {e}",
        }

    # Get cache manager
    cache_mgr = get_cache_manager()

    # Fast path: output is already cached under this cache_id.
    cached_path = cache_mgr.get_by_cid(cache_id)
    if cached_path:
        logger.info(f"Step {step_id} already cached at {cached_path}")

        claimer = get_claimer()
        claimer.mark_cached(cache_id, str(cached_path))

        return {
            "status": "cached",
            "step_id": step_id,
            "cache_id": cache_id,
            "output_path": str(cached_path),
        }

    # Try to claim the task. If another worker holds the claim, wait
    # for its outcome and relay it rather than recomputing.
    if not claim_task(cache_id, worker_id, task_id):
        logger.info(f"Step {step_id} claimed by another worker, waiting...")

        claimer = get_claimer()
        result = claimer.wait_for_completion(cache_id, timeout=600)

        if result and result.status == ClaimStatus.COMPLETED:
            return {
                "status": "completed_by_other",
                "step_id": step_id,
                "cache_id": cache_id,
                "output_path": result.output_path,
            }
        elif result and result.status == ClaimStatus.CACHED:
            return {
                "status": "cached",
                "step_id": step_id,
                "cache_id": cache_id,
                "output_path": result.output_path,
            }
        elif result and result.status == ClaimStatus.FAILED:
            return {
                "status": "failed",
                "step_id": step_id,
                "cache_id": cache_id,
                "error": result.error,
            }
        else:
            return {
                "status": "timeout",
                "step_id": step_id,
                "cache_id": cache_id,
                "error": "Timeout waiting for other worker",
            }

    # We have the claim, update to running
    claimer = get_claimer()
    claimer.update_status(cache_id, worker_id, ClaimStatus.RUNNING)

    try:
        # SOURCE nodes resolve to already-cached content; nothing runs.
        if node_type == "SOURCE":
            # Support both :cid (new IPFS) and :hash (legacy)
            content_id = config.get("cid") or config.get("hash")
            if not content_id:
                raise ValueError("SOURCE step missing :cid or :hash")

            path = cache_mgr.get_by_cid(content_id)
            if not path:
                raise ValueError(f"SOURCE input not found: {content_id[:16]}...")

            output_path = str(path)
            complete_task(cache_id, worker_id, output_path)
            return {
                "status": "completed",
                "step_id": step_id,
                "cache_id": cache_id,
                "output_path": output_path,
            }

        # EFFECT nodes: validate the config and resolve input paths.
        if node_type == "EFFECT":
            effect_hash = config.get("cid") or config.get("hash")
            if not effect_hash:
                raise ValueError("EFFECT step missing :cid")

            # Resolve each input step to its cached file path.
            inputs = config.get("inputs", [])
            input_paths = []
            for inp in inputs:
                inp_cache_id = input_cache_ids.get(inp, inp)
                path = cache_mgr.get_by_cid(inp_cache_id)
                if not path:
                    raise ValueError(f"Input not found: {inp_cache_id[:16]}...")
                input_paths.append(Path(path))

            # Probe for an executor; any failure means "none available".
            # (Was a bare `except:` — narrowed so KeyboardInterrupt and
            # SystemExit are not swallowed.)
            try:
                executor = get_executor(NodeType.SOURCE)  # Effects use SOURCE executor for now
            except Exception:
                executor = None

            if executor is None:
                # Fallback: copy input to output (identity-like behavior)
                if input_paths:
                    output_path = str(input_paths[0])
                    complete_task(cache_id, worker_id, output_path)
                    return {
                        "status": "completed",
                        "step_id": step_id,
                        "cache_id": cache_id,
                        "output_path": output_path,
                    }
                raise ValueError("No executor for EFFECT and no inputs")
            # NOTE(review): when an executor IS available, control falls
            # through to the generic dispatch below (original behavior,
            # preserved) — confirm this is intentional.

        # COMPOUND nodes: a collapsed chain of filters executed as one
        # FFmpeg invocation.
        if node_type == "COMPOUND":
            filter_chain = config.get("filter_chain", [])
            if not filter_chain:
                raise ValueError("COMPOUND step has empty filter_chain")

            # Resolve each input step to its cached file path.
            inputs = config.get("inputs", [])
            input_paths = []
            for inp in inputs:
                inp_cache_id = input_cache_ids.get(inp, inp)
                path = cache_mgr.get_by_cid(inp_cache_id)
                if not path:
                    raise ValueError(f"Input not found: {inp_cache_id[:16]}...")
                input_paths.append(Path(path))

            if not input_paths:
                raise ValueError("COMPOUND step has no inputs")

            # Build FFmpeg filter graph from chain
            filters = []
            for filter_item in filter_chain:
                filter_type = filter_item.get("type", "")
                filter_config = filter_item.get("config", {})

                if filter_type == "EFFECT":
                    # Effect - for now identity-like, can be extended
                    effect_hash = filter_config.get("cid") or filter_config.get("hash") or filter_config.get("effect")
                    # TODO: resolve effect to actual FFmpeg filter
                    # For now, skip identity-like effects
                    pass

                elif filter_type == "TRANSFORM":
                    # Transform effects map to FFmpeg eq/hue filters
                    effects = filter_config.get("effects", {})
                    for eff_name, eff_value in effects.items():
                        if eff_name == "saturation":
                            filters.append(f"eq=saturation={eff_value}")
                        elif eff_name == "brightness":
                            filters.append(f"eq=brightness={eff_value}")
                        elif eff_name == "contrast":
                            filters.append(f"eq=contrast={eff_value}")
                        elif eff_name == "hue":
                            filters.append(f"hue=h={eff_value}")

                elif filter_type == "RESIZE":
                    width = filter_config.get("width", -1)
                    height = filter_config.get("height", -1)
                    mode = filter_config.get("mode", "fit")
                    if mode == "fit":
                        filters.append(f"scale={width}:{height}:force_original_aspect_ratio=decrease")
                    elif mode == "fill":
                        filters.append(f"scale={width}:{height}:force_original_aspect_ratio=increase,crop={width}:{height}")
                    else:
                        filters.append(f"scale={width}:{height}")

                elif filter_type == "SEGMENT":
                    # Segment handled via -ss and -t, not filter
                    pass

            # Create temp output
            import tempfile
            import subprocess

            output_dir = Path(tempfile.mkdtemp())
            output_path = output_dir / f"compound_{cache_id[:16]}.mp4"

            # Build FFmpeg command
            input_path = input_paths[0]
            cmd = ["ffmpeg", "-y", "-i", str(input_path)]

            # Handle segment timing if present (-ss/-t, not a filter)
            for filter_item in filter_chain:
                if filter_item.get("type") == "SEGMENT":
                    seg_config = filter_item.get("config", {})
                    if "start" in seg_config:
                        cmd.extend(["-ss", str(seg_config["start"])])
                    if "end" in seg_config:
                        duration = seg_config["end"] - seg_config.get("start", 0)
                        cmd.extend(["-t", str(duration)])
                    elif "duration" in seg_config:
                        cmd.extend(["-t", str(seg_config["duration"])])

            # Add filter graph if any
            if filters:
                cmd.extend(["-vf", ",".join(filters)])

            # Output options
            cmd.extend(["-c:v", "libx264", "-c:a", "aac", str(output_path)])

            logger.info(f"Running COMPOUND FFmpeg: {' '.join(cmd)}")
            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode != 0:
                raise RuntimeError(f"FFmpeg failed: {result.stderr}")

            # Store in cache
            cached_file, ipfs_cid = cache_mgr.put(
                source_path=output_path,
                node_type="COMPOUND",
                node_id=cache_id,
            )

            logger.info(f"COMPOUND step {step_id} completed with {len(filter_chain)} filters, IPFS CID: {ipfs_cid}")
            complete_task(cache_id, worker_id, str(cached_file.path))

            # Cleanup temp
            if output_dir.exists():
                import shutil
                shutil.rmtree(output_dir, ignore_errors=True)

            return {
                "status": "completed",
                "step_id": step_id,
                "cache_id": cache_id,
                "output_path": str(cached_file.path),
                "cid": cached_file.cid,
                "ipfs_cid": ipfs_cid,
                "filter_count": len(filter_chain),
            }

        # Generic path: look up an executor by node type and run it.
        try:
            node_type_enum = NodeType[node_type]
        except (KeyError, TypeError):
            # Unknown enum name (or NodeType unavailable): pass the raw
            # string through and let get_executor decide.
            node_type_enum = node_type

        executor = get_executor(node_type_enum)
        if executor is None:
            raise ValueError(f"No executor for node type: {node_type}")

        # Resolve input paths
        inputs = config.pop("inputs", [])
        input_paths = []
        for inp in inputs:
            inp_cache_id = input_cache_ids.get(inp, inp)
            path = cache_mgr.get_by_cid(inp_cache_id)
            if not path:
                raise ValueError(f"Input not found: {inp_cache_id[:16]}...")
            input_paths.append(Path(path))

        # Create temp output
        import tempfile
        output_dir = Path(tempfile.mkdtemp())
        output_path = output_dir / f"output_{cache_id[:16]}.mp4"

        # Execute
        logger.info(f"Running executor for {node_type} with {len(input_paths)} inputs")
        result_path = executor.execute(config, input_paths, output_path)

        # Store in cache
        cached_file, ipfs_cid = cache_mgr.put(
            source_path=result_path,
            node_type=node_type,
            node_id=cache_id,
        )

        logger.info(f"Step {step_id} completed, IPFS CID: {ipfs_cid}")
        complete_task(cache_id, worker_id, str(cached_file.path))

        # Cleanup temp
        if output_dir.exists():
            import shutil
            shutil.rmtree(output_dir, ignore_errors=True)

        return {
            "status": "completed",
            "step_id": step_id,
            "cache_id": cache_id,
            "output_path": str(cached_file.path),
            "cid": cached_file.cid,
            "ipfs_cid": ipfs_cid,
        }

    except Exception as e:
        # Any execution failure releases the claim as failed so other
        # workers don't wait forever.
        logger.error(f"Step {step_id} failed: {e}")
        fail_task(cache_id, worker_id, str(e))

        return {
            "status": "failed",
            "step_id": step_id,
            "cache_id": cache_id,
            "error": str(e),
        }
|
|
|
|
|
|
@app.task(bind=True, name='tasks.run_plan_sexp')
def run_plan_sexp(
    self,
    plan_sexp: str,
    run_id: Optional[str] = None,
) -> dict:
    """
    Execute a complete S-expression execution plan.

    Parses the serialized plan into metadata (:id, :recipe,
    :recipe-hash, :output), an (inputs ...) block, and (step ...)
    forms, rebuilds an ExecutionPlanSexp, and hands it to
    PlanScheduler, which dispatches each step as a
    'tasks.execute_step_sexp' task.

    Args:
        plan_sexp: Serialized S-expression plan
        run_id: Optional run ID for tracking

    Returns:
        Dict with execution results (status, output cache id/path,
        per-step completion counts, error if any)

    Raises:
        ImportError: If the artdag.sexp package is unavailable.
    """
    if parse is None:
        raise ImportError("artdag.sexp not available")

    # Imported lazily so this module still loads without artdag.
    from artdag.sexp.scheduler import PlanScheduler
    from artdag.sexp.planner import ExecutionPlanSexp, PlanStep

    logger.info(f"Running plan from S-expression (run_id={run_id})")

    # Parse the plan S-expression
    parsed = parse(plan_sexp)

    # Extract plan metadata and steps
    plan_id = None
    recipe_id = None
    recipe_hash = None
    inputs = {}  # input name -> value, from the (inputs ...) block
    steps = []  # reconstructed PlanStep objects
    output_step_id = None

    # Walk the plan's tail (element 0 is the plan head symbol).
    i = 1
    while i < len(parsed):
        item = parsed[i]

        if isinstance(item, Keyword):
            # Top-level :keyword value pair carrying plan metadata.
            key = item.name
            if i + 1 < len(parsed):
                value = parsed[i + 1]

                if key == "id":
                    plan_id = value
                elif key == "recipe":
                    recipe_id = value
                elif key == "recipe-hash":
                    recipe_hash = value
                elif key == "output":
                    output_step_id = value

                i += 2
            else:
                # Trailing keyword with no value; skip it.
                i += 1

        elif isinstance(item, list) and len(item) > 0:
            head = item[0]
            if isinstance(head, Symbol):
                if head.name == "inputs":
                    # Parse inputs block: each entry is (name value).
                    for j in range(1, len(item)):
                        inp = item[j]
                        if isinstance(inp, list) and len(inp) >= 2:
                            name = inp[0].name if isinstance(inp[0], Symbol) else str(inp[0])
                            value = inp[1]
                            inputs[name] = value

                elif head.name == "step":
                    # Parse step: (step <id> :cache-id <cid> :level <n> (<node> ...))
                    step_id = item[1] if len(item) > 1 else None
                    step_cache_id = None
                    step_level = 0
                    step_node = None  # nested node S-expression

                    j = 2
                    while j < len(item):
                        sub = item[j]
                        if isinstance(sub, Keyword):
                            if sub.name == "cache-id" and j + 1 < len(item):
                                step_cache_id = item[j + 1]
                                j += 2
                            elif sub.name == "level" and j + 1 < len(item):
                                step_level = item[j + 1]
                                j += 2
                            else:
                                j += 1
                        elif isinstance(sub, list):
                            # First nested list is taken as the node form.
                            step_node = sub
                            j += 1
                        else:
                            j += 1

                    # Only well-formed steps (id + cache-id + node) are kept.
                    if step_id and step_cache_id and step_node:
                        # Convert step_node to config
                        config = sexp_to_config(step_node)
                        node_type = config.pop("node_type")
                        step_inputs = config.pop("inputs", [])

                        steps.append(PlanStep(
                            step_id=step_id,
                            node_type=node_type,
                            config=config,
                            inputs=step_inputs if isinstance(step_inputs, list) else [],
                            cache_id=step_cache_id,
                            level=step_level,
                        ))
            i += 1
        else:
            i += 1

    # Create plan object; when :output was absent, fall back to the
    # last parsed step as the plan output.
    plan = ExecutionPlanSexp(
        plan_id=plan_id or "unknown",
        recipe_id=recipe_id or "unknown",
        recipe_hash=recipe_hash or "",
        steps=steps,
        output_step_id=output_step_id or (steps[-1].step_id if steps else ""),
        inputs=inputs,
    )

    # Create scheduler and run
    cache_mgr = get_cache_manager()
    scheduler = PlanScheduler(
        cache_manager=cache_mgr,
        celery_app=app,
        execute_task_name='tasks.execute_step_sexp',
    )

    result = scheduler.schedule(plan)

    return {
        "status": result.status,
        "run_id": run_id,
        "plan_id": result.plan_id,
        "output_cache_id": result.output_cache_id,
        "output_path": result.output_path,
        "output_ipfs_cid": result.output_ipfs_cid,
        "steps_completed": result.steps_completed,
        "steps_cached": result.steps_cached,
        "steps_failed": result.steps_failed,
        "error": result.error,
    }