"""
Run Service - business logic for run management.

Runs are content-addressed (run_id computed from inputs + recipe).
Completed runs are stored in PostgreSQL, not Redis.
In-progress runs are tracked via Celery task state.
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Optional, List, Dict, Any, Tuple, Union, TYPE_CHECKING
|
|
|
|
if TYPE_CHECKING:
|
|
import redis
|
|
from cache_manager import L1CacheManager
|
|
from database import Database
|
|
|
|
from ..types import RunResult
|
|
|
|
|
|
def compute_run_id(input_hashes: Union[List[str], Dict[str, str]], recipe: str, recipe_hash: Optional[str] = None) -> str:
    """
    Compute a deterministic run_id from inputs and recipe.

    The run_id is a SHA3-256 hash over a canonical JSON document containing:
    - the input content hashes, sorted (order-insensitive)
    - a recipe identifier (recipe_hash if provided, else "effect:{recipe}")
    - a schema version tag

    This makes runs content-addressable: same inputs + recipe = same run_id.
    """
    # A dict of named inputs and a plain list hash identically: only the
    # sorted hash values participate.
    hashes = input_hashes.values() if isinstance(input_hashes, dict) else input_hashes

    payload = {
        "inputs": sorted(hashes),
        "recipe": recipe_hash if recipe_hash else f"effect:{recipe}",
        "version": "1",
    }
    # Canonical serialization: sorted keys, no whitespace.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha3_256(canonical.encode()).hexdigest()
|
|
|
|
|
|
def detect_media_type(cache_path: Path) -> str:
    """Detect if file is image, video, or audio based on magic bytes.

    Reads up to the first 32 bytes of *cache_path* and matches them against
    well-known container/codec signatures.

    Returns one of "image", "video", "audio", or "unknown" (also returned
    for missing/unreadable files instead of raising).
    """
    try:
        with open(cache_path, "rb") as f:
            header = f.read(32)
    except Exception:
        # Missing or unreadable file: report unknown rather than propagating.
        return "unknown"

    # --- Video signatures ---
    if header[:4] == b'\x1a\x45\xdf\xa3':  # EBML magic: WebM/MKV
        return "video"
    # ISO BMFF: bytes 4-8 hold the 'ftyp' box tag (MP4/MOV/M4A).
    # Length guards use >= so minimal 8/12-byte headers are accepted;
    # the previous strict > comparisons were off by one.
    if len(header) >= 8 and header[4:8] == b'ftyp':
        # Major brand at bytes 8-12 distinguishes audio-only M4A.
        if len(header) >= 12 and header[8:12] in (b'M4A ', b'm4a '):
            return "audio"
        return "video"
    if header[:4] == b'RIFF' and len(header) >= 12 and header[8:12] == b'AVI ':  # AVI
        return "video"

    # --- Image signatures ---
    if header[:8] == b'\x89PNG\r\n\x1a\n':  # PNG
        return "image"
    if header[:2] == b'\xff\xd8':  # JPEG (SOI marker)
        return "image"
    if header[:6] in (b'GIF87a', b'GIF89a'):  # GIF
        return "image"
    if header[:4] == b'RIFF' and len(header) >= 12 and header[8:12] == b'WEBP':  # WebP
        return "image"

    # --- Audio signatures ---
    # MP3: ID3v2 tag, or a bare MPEG audio frame sync. 0xFFFB/0xFFFA are
    # MPEG-1 Layer III, 0xFFF3/0xFFF2 are MPEG-2 Layer III; previously only
    # 0xFFFB was recognized, so tagless MPEG-2 files fell through to unknown.
    if header[:3] == b'ID3' or header[:2] in (b'\xff\xfb', b'\xff\xfa', b'\xff\xf3', b'\xff\xf2'):
        return "audio"
    if header[:4] == b'fLaC':  # FLAC
        return "audio"
    if header[:4] == b'OggS':  # Ogg (could be audio or video, assume audio)
        return "audio"
    if header[:4] == b'RIFF' and len(header) >= 12 and header[8:12] == b'WAVE':  # WAV
        return "audio"

    return "unknown"
|
|
|
|
|
|
class RunService:
    """
    Service for managing recipe runs.

    Uses PostgreSQL for completed runs, Celery for task state.
    Redis is only used for task_id mapping (ephemeral).
    """

    def __init__(self, database: "Database", redis: "redis.Redis[bytes]", cache: "L1CacheManager") -> None:
        """Wire up the storage backends this service operates on.

        database: PostgreSQL access layer (completed + pending runs).
        redis:    holds only the ephemeral run_id -> task_id mapping.
        cache:    L1 content-addressed cache manager.
        """
        self.db = database
        self.cache = cache
        self.redis = redis  # Only for task_id mapping
        # Redis key prefix for run_id -> task_id mapping entries.
        self.task_key_prefix = "artdag:task:"
        # On-disk content cache root; overridable via the CACHE_DIR env var.
        self.cache_dir = Path(os.environ.get("CACHE_DIR", "/tmp/artdag-cache"))
|
|
|
|
def _ensure_inputs_list(self, inputs: Any) -> List[str]:
|
|
"""Ensure inputs is a list, parsing JSON string if needed."""
|
|
if inputs is None:
|
|
return []
|
|
if isinstance(inputs, list):
|
|
return inputs
|
|
if isinstance(inputs, str):
|
|
try:
|
|
parsed = json.loads(inputs)
|
|
if isinstance(parsed, list):
|
|
return parsed
|
|
except json.JSONDecodeError:
|
|
pass
|
|
return []
|
|
return []
|
|
|
|
    async def get_run(self, run_id: str) -> Optional[RunResult]:
        """Get a run by ID. Checks database first, then Celery task state.

        Lookup order:
          1. ``run_cache`` in PostgreSQL (completed runs with an output).
          2. ``pending_runs`` in PostgreSQL, cross-checked against live
             Celery task state when a task id is recorded.
          3. Legacy Redis task mapping (backwards compatibility).

        Returns None when the run is unknown to all three stores.
        """
        # 1. Check database for a completed run.
        cached = await self.db.get_run_cache(run_id)
        if cached:
            output_cid = cached.get("output_cid")
            # Only return as completed if we have an output
            # (runs with no output should be re-executed).
            if output_cid:
                # Also fetch recipe content from pending_runs for streaming runs.
                recipe_sexp = None
                recipe_name = None
                pending = await self.db.get_pending_run(run_id)
                if pending:
                    recipe_sexp = pending.get("dag_json")

                # Extract the recipe name from streaming recipe content,
                # i.e. the first (stream "name" ...) form.
                if recipe_sexp:
                    import re
                    name_match = re.search(r'\(stream\s+"([^"]+)"', recipe_sexp)
                    if name_match:
                        recipe_name = name_match.group(1)

                return {
                    "run_id": run_id,
                    "status": "completed",
                    "recipe": cached.get("recipe"),
                    "recipe_name": recipe_name,
                    "inputs": self._ensure_inputs_list(cached.get("inputs")),
                    "output_cid": output_cid,
                    "ipfs_cid": cached.get("ipfs_cid"),
                    # Prefer the cached playlist CID; fall back to the pending
                    # record's (streaming runs write it there first).
                    "ipfs_playlist_cid": cached.get("ipfs_playlist_cid") or (pending.get("ipfs_playlist_cid") if pending else None),
                    "provenance_cid": cached.get("provenance_cid"),
                    "plan_cid": cached.get("plan_cid"),
                    "actor_id": cached.get("actor_id"),
                    "created_at": cached.get("created_at"),
                    # run_cache rows only carry a creation timestamp, so it
                    # doubles as the completion time here.
                    "completed_at": cached.get("created_at"),
                    "recipe_sexp": recipe_sexp,
                }

        # 2. Check database for a pending run.
        pending = await self.db.get_pending_run(run_id)
        if pending:
            task_id = pending.get("celery_task_id")
            if task_id:
                # Check actual Celery task state.
                from celery.result import AsyncResult
                from celery_app import app as celery_app

                result = AsyncResult(task_id, app=celery_app)
                status = result.status.lower()

                # Normalize Celery status names to this service's vocabulary.
                status_map = {
                    "pending": "pending",
                    "started": "running",
                    "rendering": "running",  # Custom status from streaming task
                    "success": "completed",
                    "failure": "failed",
                    "retry": "running",
                    "revoked": "failed",
                }
                normalized_status = status_map.get(status, status)

                run_data = {
                    "run_id": run_id,
                    "status": normalized_status,
                    "celery_task_id": task_id,
                    "actor_id": pending.get("actor_id"),
                    "recipe": pending.get("recipe"),
                    "inputs": self._ensure_inputs_list(pending.get("inputs")),
                    "output_name": pending.get("output_name"),
                    "created_at": pending.get("created_at"),
                    "error": pending.get("error"),
                    "recipe_sexp": pending.get("dag_json"),  # Recipe content for streaming runs
                    # Checkpoint fields for resumable renders
                    "checkpoint_frame": pending.get("checkpoint_frame"),
                    "checkpoint_t": pending.get("checkpoint_t"),
                    "total_frames": pending.get("total_frames"),
                    "resumable": pending.get("resumable", True),
                    # IPFS streaming info
                    "ipfs_playlist_cid": pending.get("ipfs_playlist_cid"),
                    "quality_playlists": pending.get("quality_playlists"),
                }

                # If the task already finished, fold its result into run_data.
                if result.ready():
                    if result.successful():
                        task_result = result.result
                        if isinstance(task_result, dict):
                            # Check the task's own success flag and output_cid:
                            # Celery "SUCCESS" only means the task returned.
                            task_success = task_result.get("success", True)
                            output_cid = task_result.get("output_cid")
                            if task_success and output_cid:
                                run_data["status"] = "completed"
                                run_data["output_cid"] = output_cid
                            else:
                                run_data["status"] = "failed"
                                run_data["error"] = task_result.get("error", "No output produced")
                        else:
                            # Non-dict result: trust Celery's SUCCESS state.
                            run_data["status"] = "completed"
                    else:
                        run_data["status"] = "failed"
                        run_data["error"] = str(result.result)

                return run_data

            # No task_id but we do have a pending record - return it as stored.
            return {
                "run_id": run_id,
                "status": pending.get("status", "pending"),
                "recipe": pending.get("recipe"),
                "inputs": self._ensure_inputs_list(pending.get("inputs")),
                "output_name": pending.get("output_name"),
                "actor_id": pending.get("actor_id"),
                "created_at": pending.get("created_at"),
                "error": pending.get("error"),
                "recipe_sexp": pending.get("dag_json"),  # Recipe content for streaming runs
                # Checkpoint fields for resumable renders
                "checkpoint_frame": pending.get("checkpoint_frame"),
                "checkpoint_t": pending.get("checkpoint_t"),
                "total_frames": pending.get("total_frames"),
                "resumable": pending.get("resumable", True),
                # IPFS streaming info
                "ipfs_playlist_cid": pending.get("ipfs_playlist_cid"),
                "quality_playlists": pending.get("quality_playlists"),
            }

        # 3. Fallback: check Redis for backwards compatibility.
        task_data = self.redis.get(f"{self.task_key_prefix}{run_id}")
        if task_data:
            if isinstance(task_data, bytes):
                task_data = task_data.decode()

            # Parse task data (supports both the old plain-string format
            # and the newer JSON format).
            try:
                parsed = json.loads(task_data)
                task_id = parsed.get("task_id")
                task_actor_id = parsed.get("actor_id")
                task_recipe = parsed.get("recipe")
                task_recipe_name = parsed.get("recipe_name")
                task_inputs = parsed.get("inputs")
                # Ensure inputs is a list (might be a JSON-encoded string).
                if isinstance(task_inputs, str):
                    try:
                        task_inputs = json.loads(task_inputs)
                    except json.JSONDecodeError:
                        task_inputs = None
                task_output_name = parsed.get("output_name")
                task_created_at = parsed.get("created_at")
            except json.JSONDecodeError:
                # Old format: the stored value is just the task_id string.
                task_id = task_data
                task_actor_id = None
                task_recipe = None
                task_recipe_name = None
                task_inputs = None
                task_output_name = None
                task_created_at = None

            # Get task state from Celery.
            from celery.result import AsyncResult
            from celery_app import app as celery_app

            result = AsyncResult(task_id, app=celery_app)
            status = result.status.lower()

            # Normalize Celery status names (same mapping as above).
            status_map = {
                "pending": "pending",
                "started": "running",
                "rendering": "running",  # Custom status from streaming task
                "success": "completed",
                "failure": "failed",
                "retry": "running",
                "revoked": "failed",
            }
            normalized_status = status_map.get(status, status)

            run_data = {
                "run_id": run_id,
                "status": normalized_status,
                "celery_task_id": task_id,
                "actor_id": task_actor_id,
                "recipe": task_recipe,
                "recipe_name": task_recipe_name,
                "inputs": self._ensure_inputs_list(task_inputs),
                "output_name": task_output_name,
                "created_at": task_created_at,
            }

            # If the task completed, fold its result in (same logic as the
            # pending-run branch above).
            if result.ready():
                if result.successful():
                    task_result = result.result
                    if isinstance(task_result, dict):
                        # Check the task's own success flag and output_cid.
                        task_success = task_result.get("success", True)
                        output_cid = task_result.get("output_cid")
                        if task_success and output_cid:
                            run_data["status"] = "completed"
                            run_data["output_cid"] = output_cid
                        else:
                            run_data["status"] = "failed"
                            run_data["error"] = task_result.get("error", "No output produced")
                    else:
                        run_data["status"] = "completed"
                else:
                    run_data["status"] = "failed"
                    run_data["error"] = str(result.result)

            return run_data

        # Unknown run.
        return None
|
|
|
|
async def list_runs(self, actor_id: str, offset: int = 0, limit: int = 20) -> List[RunResult]:
|
|
"""List runs for a user. Returns completed and pending runs from database."""
|
|
# Get completed runs from database
|
|
completed_runs = await self.db.list_runs_by_actor(actor_id, offset=0, limit=limit + 50)
|
|
|
|
# Get pending runs from database
|
|
pending_db = await self.db.list_pending_runs(actor_id=actor_id)
|
|
|
|
# Convert pending runs to run format with live status check
|
|
pending = []
|
|
for pr in pending_db:
|
|
run_id = pr.get("run_id")
|
|
# Skip if already in completed
|
|
if any(r.get("run_id") == run_id for r in completed_runs):
|
|
continue
|
|
|
|
# Get live status - include pending, running, rendering, and failed runs
|
|
run = await self.get_run(run_id)
|
|
if run and run.get("status") in ("pending", "running", "rendering", "failed"):
|
|
pending.append(run)
|
|
|
|
# Combine and sort
|
|
all_runs = pending + completed_runs
|
|
all_runs.sort(key=lambda r: r.get("created_at", ""), reverse=True)
|
|
|
|
return all_runs[offset:offset + limit]
|
|
|
|
    async def create_run(
        self,
        recipe: str,
        inputs: Union[List[str], Dict[str, str]],
        output_name: Optional[str] = None,
        use_dag: bool = True,
        dag_json: Optional[str] = None,
        actor_id: Optional[str] = None,
        l2_server: Optional[str] = None,
        recipe_name: Optional[str] = None,
        recipe_sexp: Optional[str] = None,
    ) -> Tuple[Optional[RunResult], Optional[str]]:
        """
        Create a new rendering run. Checks cache before executing.

        If recipe_sexp is provided, uses the new S-expression execution path
        which generates code-addressed cache IDs before execution.

        Cache lookup order: local database run_cache, then (optionally) an
        L2 server with IPFS recovery, and only then a new Celery task.

        Returns (run_dict, error_message) - exactly one of the two is None.
        """
        import httpx
        try:
            from legacy_tasks import render_effect, execute_dag, build_effect_dag, execute_recipe
        except ImportError as e:
            return None, f"Celery tasks not available: {e}"

        # Handle both list and dict inputs (dicts carry named inputs).
        if isinstance(inputs, dict):
            input_list = list(inputs.values())
        else:
            input_list = inputs

        # Compute content-addressable run_id: same inputs + recipe => same id.
        run_id = compute_run_id(input_list, recipe)

        # Generate output name if not provided.
        if not output_name:
            output_name = f"{recipe}-{run_id[:8]}"

        # Check database cache first (completed runs). Only a cache hit whose
        # content is still present locally short-circuits execution.
        cached_run = await self.db.get_run_cache(run_id)
        if cached_run:
            output_cid = cached_run.get("output_cid")
            if output_cid and self.cache.has_content(output_cid):
                return {
                    "run_id": run_id,
                    "status": "completed",
                    "recipe": recipe,
                    "inputs": input_list,
                    "output_name": output_name,
                    "output_cid": output_cid,
                    "ipfs_cid": cached_run.get("ipfs_cid"),
                    "provenance_cid": cached_run.get("provenance_cid"),
                    "created_at": cached_run.get("created_at"),
                    # run_cache only stores a creation timestamp.
                    "completed_at": cached_run.get("created_at"),
                    "actor_id": actor_id,
                }, None

        # Check L2 if not in local cache. Best-effort: any failure here just
        # falls through to executing the run.
        if l2_server:
            try:
                async with httpx.AsyncClient(timeout=10) as client:
                    l2_resp = await client.get(f"{l2_server}/assets/by-run-id/{run_id}")
                    if l2_resp.status_code == 200:
                        l2_data = l2_resp.json()
                        output_cid = l2_data.get("output_cid")
                        ipfs_cid = l2_data.get("ipfs_cid")
                        if output_cid and ipfs_cid:
                            # Pull from IPFS to local cache.
                            try:
                                import ipfs_client
                                legacy_dir = self.cache_dir / "legacy"
                                legacy_dir.mkdir(parents=True, exist_ok=True)
                                recovery_path = legacy_dir / output_cid
                                if ipfs_client.get_file(ipfs_cid, str(recovery_path)):
                                    # Save to database cache so the next
                                    # lookup hits locally.
                                    await self.db.save_run_cache(
                                        run_id=run_id,
                                        output_cid=output_cid,
                                        recipe=recipe,
                                        inputs=input_list,
                                        ipfs_cid=ipfs_cid,
                                        provenance_cid=l2_data.get("provenance_cid"),
                                        actor_id=actor_id,
                                    )
                                    return {
                                        "run_id": run_id,
                                        "status": "completed",
                                        "recipe": recipe,
                                        "inputs": input_list,
                                        "output_cid": output_cid,
                                        "ipfs_cid": ipfs_cid,
                                        "provenance_cid": l2_data.get("provenance_cid"),
                                        "created_at": datetime.now(timezone.utc).isoformat(),
                                        "actor_id": actor_id,
                                    }, None
                            except Exception:
                                pass  # IPFS recovery failed, continue to run
            except Exception:
                pass  # L2 lookup failed, continue to run

        # Not cached - submit to Celery.
        try:
            # Prefer S-expression execution path (code-addressed cache IDs).
            if recipe_sexp:
                # Convert inputs to dict if needed.
                if isinstance(inputs, dict):
                    input_hashes = inputs
                else:
                    # Legacy list format - use positional names.
                    input_hashes = {f"input_{i}": cid for i, cid in enumerate(input_list)}

                task = execute_recipe.delay(recipe_sexp, input_hashes, run_id)
            elif use_dag or recipe == "dag":
                if dag_json:
                    dag_data = dag_json
                else:
                    # Build a standard effect DAG from the inputs.
                    dag = build_effect_dag(input_list, recipe)
                    dag_data = dag.to_json()

                task = execute_dag.delay(dag_data, run_id)
            else:
                # Legacy single-input path.
                if len(input_list) != 1:
                    return None, "Legacy mode only supports single-input recipes. Use use_dag=true for multi-input."
                task = render_effect.delay(input_list[0], recipe, output_name)

            # Store pending run in database for durability (survives restarts).
            try:
                await self.db.create_pending_run(
                    run_id=run_id,
                    celery_task_id=task.id,
                    recipe=recipe,
                    inputs=input_list,
                    actor_id=actor_id,
                    dag_json=dag_json,
                    output_name=output_name,
                )
            except Exception as e:
                # A DB failure here is logged but does not abort the run:
                # the Redis mapping below still allows status lookups.
                import logging
                logging.getLogger(__name__).error(f"Failed to save pending run: {e}")

            # Also store in Redis for backwards compatibility (shorter TTL).
            task_data = json.dumps({
                "task_id": task.id,
                "actor_id": actor_id,
                "recipe": recipe,
                "recipe_name": recipe_name,
                "inputs": input_list,
                "output_name": output_name,
                "created_at": datetime.now(timezone.utc).isoformat(),
            })
            self.redis.setex(
                f"{self.task_key_prefix}{run_id}",
                3600 * 4,  # 4 hour TTL (database is primary now)
                task_data
            )

            return {
                "run_id": run_id,
                "status": "running",
                "recipe": recipe,
                "recipe_name": recipe_name,
                "inputs": input_list,
                "output_name": output_name,
                "celery_task_id": task.id,
                "created_at": datetime.now(timezone.utc).isoformat(),
                "actor_id": actor_id,
            }, None

        except Exception as e:
            return None, f"Failed to submit task: {e}"
|
|
|
|
    async def discard_run(
        self,
        run_id: str,
        actor_id: str,
        username: str,
    ) -> Tuple[bool, Optional[str]]:
        """
        Discard (delete) a run record and clean up outputs/intermediates.

        Outputs and intermediates are only deleted if not used by other runs.

        Returns (success, error_message); error_message is None on success.
        Cleanup steps after the ownership check are best-effort: failures are
        logged but do not abort the discard.
        """
        import logging
        logger = logging.getLogger(__name__)

        run = await self.get_run(run_id)
        if not run:
            return False, f"Run {run_id} not found"

        # Check ownership: the stored actor_id may hold either the username
        # or the opaque actor id, so accept a match on either.
        run_owner = run.get("actor_id")
        if run_owner and run_owner not in (username, actor_id):
            return False, "Access denied"

        # Clean up activity outputs/intermediates (only if orphaned).
        # The activity_id is the same as run_id.
        try:
            success, msg = self.cache.discard_activity_outputs_only(run_id)
            if success:
                logger.info(f"Cleaned up run {run_id}: {msg}")
            else:
                # Activity might not exist (old runs), that's OK.
                logger.debug(f"No activity cleanup for {run_id}: {msg}")
        except Exception as e:
            logger.warning(f"Failed to cleanup activity for {run_id}: {e}")

        # Remove task_id mapping from Redis.
        self.redis.delete(f"{self.task_key_prefix}{run_id}")

        # Remove from run_cache database table.
        try:
            await self.db.delete_run_cache(run_id)
        except Exception as e:
            logger.warning(f"Failed to delete run_cache for {run_id}: {e}")

        # Remove pending run if it exists (silently ignore if absent).
        try:
            await self.db.delete_pending_run(run_id)
        except Exception:
            pass

        return True, None
|
|
|
|
def _dag_to_steps(self, dag: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Convert DAG nodes dict format to steps list format.
|
|
|
|
DAG format: {"nodes": {"id": {...}}, "output_id": "..."}
|
|
Steps format: {"steps": [{"id": "...", "type": "...", ...}], "output_id": "..."}
|
|
"""
|
|
if "steps" in dag:
|
|
# Already in steps format
|
|
return dag
|
|
|
|
if "nodes" not in dag:
|
|
return dag
|
|
|
|
nodes = dag.get("nodes", {})
|
|
steps = []
|
|
|
|
# Sort by topological order (sources first, then by input dependencies)
|
|
def get_level(node_id: str, visited: set = None) -> int:
|
|
if visited is None:
|
|
visited = set()
|
|
if node_id in visited:
|
|
return 0
|
|
visited.add(node_id)
|
|
node = nodes.get(node_id, {})
|
|
inputs = node.get("inputs", [])
|
|
if not inputs:
|
|
return 0
|
|
return 1 + max(get_level(inp, visited) for inp in inputs)
|
|
|
|
sorted_ids = sorted(nodes.keys(), key=lambda nid: (get_level(nid), nid))
|
|
|
|
for node_id in sorted_ids:
|
|
node = nodes[node_id]
|
|
steps.append({
|
|
"id": node_id,
|
|
"step_id": node_id,
|
|
"type": node.get("node_type", "EFFECT"),
|
|
"config": node.get("config", {}),
|
|
"inputs": node.get("inputs", []),
|
|
"name": node.get("name"),
|
|
"cache_id": node_id, # In code-addressed system, node_id IS the cache_id
|
|
})
|
|
|
|
return {
|
|
"steps": steps,
|
|
"output_id": dag.get("output_id"),
|
|
"metadata": dag.get("metadata", {}),
|
|
"format": "json",
|
|
}
|
|
|
|
    def _sexp_to_steps(self, sexp_content: str) -> Dict[str, Any]:
        """Convert S-expression plan to steps list format for UI.

        Parses the S-expression plan format:
            (plan :id <id> :recipe <name> :recipe-hash <hash>
              (inputs (input_name hash) ...)
              (step step_id :cache-id <hash> :level <int> (node-type :key val ...))
              ...
              :output <output_step_id>)

        Returns steps list compatible with UI visualization. On any parse
        failure (missing parser, bad syntax, unexpected top-level shape) the
        raw text is returned with an empty steps list so the UI can still
        display the plan source.
        """
        try:
            from artdag.sexp import parse, Symbol, Keyword
        except ImportError:
            return {"sexp": sexp_content, "steps": [], "format": "sexp"}

        try:
            parsed = parse(sexp_content)
        except Exception:
            return {"sexp": sexp_content, "steps": [], "format": "sexp"}

        if not isinstance(parsed, list) or not parsed:
            return {"sexp": sexp_content, "steps": [], "format": "sexp"}

        steps = []
        output_step_id = None
        plan_id = None
        recipe_name = None

        # Walk the top-level plan form. Keywords come in (:key value) pairs;
        # lists are nested forms such as (inputs ...) and (step ...).
        i = 0
        while i < len(parsed):
            item = parsed[i]

            if isinstance(item, Keyword):
                key = item.name
                if i + 1 < len(parsed):
                    value = parsed[i + 1]
                    if key == "id":
                        plan_id = value
                    elif key == "recipe":
                        recipe_name = value
                    elif key == "output":
                        output_step_id = value
                # Consume the keyword and its value together.
                i += 2
                continue

            if isinstance(item, list) and item:
                first = item[0]
                if isinstance(first, Symbol) and first.name == "step":
                    # Parse step: (step step_id :cache-id <hash> :level <int> (node-expr))
                    step_id = item[1] if len(item) > 1 else None
                    cache_id = None
                    level = 0
                    node_type = "EFFECT"
                    config = {}
                    inputs = []

                    # Scan the step body: keyword pairs and one node expression.
                    j = 2
                    while j < len(item):
                        part = item[j]
                        if isinstance(part, Keyword):
                            key = part.name
                            if j + 1 < len(item):
                                val = item[j + 1]
                                if key == "cache-id":
                                    cache_id = val
                                elif key == "level":
                                    level = val
                            j += 2
                            continue
                        elif isinstance(part, list) and part:
                            # Node expression: (node-type :key val ...)
                            if isinstance(part[0], Symbol):
                                node_type = part[0].name.upper()
                                # Keyword args on the node expression become
                                # config entries, except :inputs which feeds
                                # the step's input edges.
                                k = 1
                                while k < len(part):
                                    if isinstance(part[k], Keyword):
                                        kname = part[k].name
                                        if k + 1 < len(part):
                                            kval = part[k + 1]
                                            if kname == "inputs":
                                                inputs = kval if isinstance(kval, list) else [kval]
                                            else:
                                                config[kname] = kval
                                        k += 2
                                        continue
                                    k += 1
                        j += 1

                    steps.append({
                        "id": step_id,
                        "step_id": step_id,
                        "type": node_type,
                        "config": config,
                        "inputs": inputs,
                        # Fall back to the step id when no :cache-id is given.
                        "cache_id": cache_id or step_id,
                        "level": level,
                    })

            i += 1

        return {
            "sexp": sexp_content,
            "steps": steps,
            "output_id": output_step_id,
            "plan_id": plan_id,
            "recipe": recipe_name,
            "format": "sexp",
        }
|
|
|
|
    async def get_run_plan(self, run_id: str) -> Optional[Dict[str, Any]]:
        """Get execution plan for a run.

        Plans are just node outputs - cached by content hash like everything else.
        For streaming runs, returns the recipe content as the plan.

        Returns a steps-format dict (see _dag_to_steps/_sexp_to_steps) or
        None when the run is unknown or no plan can be located.
        """
        # Get the run to find its plan reference.
        run = await self.get_run(run_id)
        if not run:
            return None

        # For streaming runs, return the recipe as the plan (single synthetic
        # STREAM step plus the raw S-expression).
        if run.get("recipe") == "streaming" and run.get("recipe_sexp"):
            return {
                "steps": [{"id": "stream", "type": "STREAM", "name": "Streaming Recipe"}],
                "sexp": run.get("recipe_sexp"),
                "format": "sexp",
            }

        # Check plan_cid (stored in database) or plan_cache_id (legacy).
        plan_cid = run.get("plan_cid") or run.get("plan_cache_id")
        if plan_cid:
            # Get plan from cache by content hash.
            plan_path = self.cache.get_by_cid(plan_cid)
            if plan_path and plan_path.exists():
                with open(plan_path) as f:
                    content = f.read()
                    # Detect format: S-expressions start with "(", else JSON.
                    if content.strip().startswith("("):
                        # S-expression format - parse for UI.
                        return self._sexp_to_steps(content)
                    else:
                        plan = json.loads(content)
                        return self._dag_to_steps(plan)

        # Fall back to legacy plans directory (keyed by run_id, not content).
        sexp_path = self.cache_dir / "plans" / f"{run_id}.sexp"
        if sexp_path.exists():
            with open(sexp_path) as f:
                return self._sexp_to_steps(f.read())

        json_path = self.cache_dir / "plans" / f"{run_id}.json"
        if json_path.exists():
            with open(json_path) as f:
                plan = json.load(f)
                return self._dag_to_steps(plan)

        return None
|
|
|
|
async def get_run_plan_sexp(self, run_id: str) -> Optional[str]:
|
|
"""Get execution plan as S-expression string."""
|
|
plan = await self.get_run_plan(run_id)
|
|
if plan and plan.get("format") == "sexp":
|
|
return plan.get("sexp")
|
|
return None
|
|
|
|
async def get_run_artifacts(self, run_id: str) -> List[Dict[str, Any]]:
|
|
"""Get all artifacts (inputs + outputs) for a run."""
|
|
run = await self.get_run(run_id)
|
|
if not run:
|
|
return []
|
|
|
|
artifacts = []
|
|
|
|
def get_artifact_info(cid: str, role: str, name: str) -> Optional[Dict]:
|
|
if self.cache.has_content(cid):
|
|
path = self.cache.get_by_cid(cid)
|
|
if path and path.exists():
|
|
return {
|
|
"cid": cid,
|
|
"size_bytes": path.stat().st_size,
|
|
"media_type": detect_media_type(path),
|
|
"role": role,
|
|
"step_name": name,
|
|
}
|
|
return None
|
|
|
|
# Add inputs
|
|
inputs = run.get("inputs", [])
|
|
if isinstance(inputs, dict):
|
|
inputs = list(inputs.values())
|
|
for i, h in enumerate(inputs):
|
|
info = get_artifact_info(h, "input", f"Input {i + 1}")
|
|
if info:
|
|
artifacts.append(info)
|
|
|
|
# Add output
|
|
if run.get("output_cid"):
|
|
info = get_artifact_info(run["output_cid"], "output", "Output")
|
|
if info:
|
|
artifacts.append(info)
|
|
|
|
return artifacts
|
|
|
|
async def get_run_analysis(self, run_id: str) -> List[Dict[str, Any]]:
|
|
"""Get analysis data for each input in a run."""
|
|
run = await self.get_run(run_id)
|
|
if not run:
|
|
return []
|
|
|
|
analysis_dir = self.cache_dir / "analysis"
|
|
results = []
|
|
|
|
inputs = run.get("inputs", [])
|
|
if isinstance(inputs, dict):
|
|
inputs = list(inputs.values())
|
|
|
|
for i, input_hash in enumerate(inputs):
|
|
analysis_path = analysis_dir / f"{input_hash}.json"
|
|
analysis_data = None
|
|
|
|
if analysis_path.exists():
|
|
try:
|
|
with open(analysis_path) as f:
|
|
analysis_data = json.load(f)
|
|
except (json.JSONDecodeError, IOError):
|
|
pass
|
|
|
|
results.append({
|
|
"input_hash": input_hash,
|
|
"input_name": f"Input {i + 1}",
|
|
"has_analysis": analysis_data is not None,
|
|
"tempo": analysis_data.get("tempo") if analysis_data else None,
|
|
"beat_times": analysis_data.get("beat_times", []) if analysis_data else [],
|
|
"raw": analysis_data,
|
|
})
|
|
|
|
return results
|
|
|
|
def detect_media_type(self, path: Path) -> str:
|
|
"""Detect media type for a file path."""
|
|
return detect_media_type(path)
|
|
|
|
    async def recover_pending_runs(self) -> Dict[str, Union[int, str]]:
        """
        Recover pending runs after restart.

        Checks all pending runs in the database and:
        - Updates status for completed tasks
        - Re-queues orphaned tasks that can be retried
        - Marks as failed if unrecoverable

        Returns counts of recovered, completed, failed runs (plus
        still_running), or {"error": ...} when Celery tasks are unavailable.
        """
        from celery.result import AsyncResult
        from celery_app import app as celery_app

        try:
            from legacy_tasks import execute_dag
        except ImportError:
            return {"error": "Celery tasks not available"}

        stats = {"recovered": 0, "completed": 0, "failed": 0, "still_running": 0}

        # Get all pending/running runs from the database (no actor filter).
        pending_runs = await self.db.list_pending_runs()

        for run in pending_runs:
            run_id = run.get("run_id")
            task_id = run.get("celery_task_id")
            status = run.get("status")  # NOTE(review): unused below

            if not task_id:
                # No task ID - try to re-queue if we have dag_json.
                dag_json = run.get("dag_json")
                if dag_json:
                    try:
                        new_task = execute_dag.delay(dag_json, run_id)
                        # Re-create the pending record pointing at the new task.
                        await self.db.create_pending_run(
                            run_id=run_id,
                            celery_task_id=new_task.id,
                            recipe=run.get("recipe", "unknown"),
                            inputs=run.get("inputs", []),
                            actor_id=run.get("actor_id"),
                            dag_json=dag_json,
                            output_name=run.get("output_name"),
                        )
                        stats["recovered"] += 1
                    except Exception as e:
                        await self.db.update_pending_run_status(
                            run_id, "failed", f"Recovery failed: {e}"
                        )
                        stats["failed"] += 1
                else:
                    # Nothing to re-queue from: give up on this run.
                    await self.db.update_pending_run_status(
                        run_id, "failed", "No DAG data for recovery"
                    )
                    stats["failed"] += 1
                continue

            # Check Celery task state.
            result = AsyncResult(task_id, app=celery_app)
            celery_status = result.status.lower()

            if result.ready():
                if result.successful():
                    # Task completed - promote it to run_cache.
                    task_result = result.result
                    if isinstance(task_result, dict) and task_result.get("output_cid"):
                        await self.db.save_run_cache(
                            run_id=run_id,
                            output_cid=task_result["output_cid"],
                            recipe=run.get("recipe", "unknown"),
                            inputs=run.get("inputs", []),
                            ipfs_cid=task_result.get("ipfs_cid"),
                            provenance_cid=task_result.get("provenance_cid"),
                            actor_id=run.get("actor_id"),
                        )
                        await self.db.complete_pending_run(run_id)
                        stats["completed"] += 1
                    else:
                        await self.db.update_pending_run_status(
                            run_id, "failed", "Task completed but no output hash"
                        )
                        stats["failed"] += 1
                else:
                    # Task failed.
                    await self.db.update_pending_run_status(
                        run_id, "failed", str(result.result)
                    )
                    stats["failed"] += 1
            elif celery_status in ("pending", "started", "retry"):
                # Still running - leave the record alone.
                stats["still_running"] += 1
            else:
                # Unknown state - try to re-queue if we have dag_json.
                dag_json = run.get("dag_json")
                if dag_json:
                    try:
                        new_task = execute_dag.delay(dag_json, run_id)
                        await self.db.create_pending_run(
                            run_id=run_id,
                            celery_task_id=new_task.id,
                            recipe=run.get("recipe", "unknown"),
                            inputs=run.get("inputs", []),
                            actor_id=run.get("actor_id"),
                            dag_json=dag_json,
                            output_name=run.get("output_name"),
                        )
                        stats["recovered"] += 1
                    except Exception as e:
                        await self.db.update_pending_run_status(
                            run_id, "failed", f"Recovery failed: {e}"
                        )
                        stats["failed"] += 1
                else:
                    await self.db.update_pending_run_status(
                        run_id, "failed", f"Task in unknown state: {celery_status}"
                    )
                    stats["failed"] += 1

        return stats
|