Files
celery/app/services/run_service.py
gilesb 92d26b2b72 Rename content_hash/output_hash to cid throughout
Refactor to use IPFS CID as the primary content identifier:
- Update database schema: content_hash -> cid, output_hash -> output_cid
- Update all services, routers, and tasks to use cid terminology
- Update HTML templates to display CID instead of hash
- Update cache_manager parameter names
- Update README documentation

This completes the transition to CID-only content addressing.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 08:02:44 +00:00

726 lines
28 KiB
Python

"""
Run Service - business logic for run management.
Runs are content-addressed (run_id computed from inputs + recipe).
Completed runs are stored in PostgreSQL, not Redis.
In-progress runs are tracked via Celery task state.
"""
import hashlib
import json
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List, Dict, Any, Tuple
def compute_run_id(input_hashes: list, recipe: str, recipe_hash: str = None) -> str:
    """
    Derive the content-addressed identifier for a run.

    The identifier is the SHA3-256 hex digest of a compact, key-sorted JSON
    document containing:
    - the input content hashes, sorted
    - the recipe identifier (``recipe_hash`` when truthy, otherwise
      ``"effect:{recipe}"``)
    - a fixed version tag ``"1"``

    Identical inputs and recipe therefore always produce the same run_id,
    whatever order the inputs were supplied in.

    Args:
        input_hashes: Input content hashes. A dict is also accepted, in which
            case only its values are used.
        recipe: Recipe/effect name, used when no ``recipe_hash`` is given.
        recipe_hash: Optional recipe hash that takes precedence over ``recipe``.

    Returns:
        The 64-character hexadecimal SHA3-256 digest.
    """
    # Dict inputs contribute their values only; keys never affect the id.
    hashes = input_hashes.values() if isinstance(input_hashes, dict) else input_hashes
    recipe_key = recipe_hash if recipe_hash else f"effect:{recipe}"
    payload = json.dumps(
        {"version": "1", "recipe": recipe_key, "inputs": sorted(hashes)},
        sort_keys=True,  # key order in the literal above is irrelevant
        separators=(",", ":"),
    )
    digest = hashlib.sha3_256()
    digest.update(payload.encode())
    return digest.hexdigest()
def detect_media_type(cache_path: Path) -> str:
    """
    Classify a file as image, video, or audio from its leading magic bytes.

    Only the first 32 bytes are read. Containers are checked first
    (Matroska/WebM, ISO-BMFF ``ftyp``, RIFF/AVI), then still images, then
    audio streams.

    Args:
        cache_path: Path of the file to inspect.

    Returns:
        ``"video"``, ``"image"``, ``"audio"``, or ``"unknown"`` when no
        signature matches or the file cannot be read.
    """
    try:
        with open(cache_path, "rb") as f:
            header = f.read(32)
    except Exception:
        # Deliberately broad: detection is best-effort and any failure to
        # read (missing file, bad path type, permissions) means "unknown".
        return "unknown"

    is_riff = header[:4] == b"RIFF" and len(header) > 12
    riff_form = header[8:12]

    # Video signatures
    if header[:4] == b"\x1a\x45\xdf\xa3":  # WebM/MKV (EBML header)
        return "video"
    if len(header) > 8 and header[4:8] == b"ftyp":  # MP4/MOV/M4A
        # The major brand distinguishes audio-only M4A from video.
        if len(header) > 11 and header[8:12] in (b"M4A ", b"m4a "):
            return "audio"
        return "video"
    if is_riff and riff_form == b"AVI ":  # AVI
        return "video"

    # Image signatures
    if header[:8] == b"\x89PNG\r\n\x1a\n":  # PNG
        return "image"
    if header[:2] == b"\xff\xd8":  # JPEG
        return "image"
    if header[:6] in (b"GIF87a", b"GIF89a"):  # GIF
        return "image"
    if is_riff and riff_form == b"WEBP":  # WebP
        return "image"

    # Audio signatures
    if header[:3] == b"ID3":  # MP3 with an ID3v2 tag
        return "audio"
    # Tag-less MP3 starts directly with a frame sync word. FF FB is
    # MPEG-1 Layer III; FF F3 / FF F2 are the MPEG-2 Layer III variants.
    if header[:2] in (b"\xff\xfb", b"\xff\xf3", b"\xff\xf2"):
        return "audio"
    if header[:4] == b"fLaC":  # FLAC
        return "audio"
    if header[:4] == b"OggS":  # Ogg (could be audio or video, assume audio)
        return "audio"
    if is_riff and riff_form == b"WAVE":  # WAV
        return "audio"
    return "unknown"
class RunService:
"""
Service for managing recipe runs.
Uses PostgreSQL for completed runs, Celery for task state.
Redis is only used for task_id mapping (ephemeral).
"""
def __init__(self, database, redis, cache):
    """
    Bind the storage backends and resolve the on-disk cache directory.

    Args:
        database: Database handle, stored as ``self.db``.
        redis: Redis client, used only for the run_id -> task_id mapping.
        cache: Content cache manager, stored as ``self.cache``.
    """
    # CACHE_DIR overrides the default cache location.
    cache_root = os.environ.get("CACHE_DIR", "/tmp/artdag-cache")
    self.cache_dir = Path(cache_root)
    # Redis key prefix for the run_id -> task_id mapping.
    self.task_key_prefix = "artdag:task:"
    self.cache = cache
    self.redis = redis
    self.db = database
def _ensure_inputs_list(self, inputs) -> list:
    """
    Coerce a stored ``inputs`` value into a list.

    A list is returned unchanged. A string is decoded as JSON and returned
    only if it decodes to a list. Everything else (``None``, other types,
    undecodable strings, JSON that is not a list) yields an empty list.
    """
    if isinstance(inputs, list):
        return inputs
    if not isinstance(inputs, str):
        return []
    try:
        decoded = json.loads(inputs)
    except json.JSONDecodeError:
        return []
    return decoded if isinstance(decoded, list) else []
async def get_run(self, run_id: str) -> Optional[Dict[str, Any]]:
    """
    Look up a run by ID.

    Sources are consulted in order, and the first hit wins:
    1. the run cache table (completed runs),
    2. the pending-run table, enriched with live Celery task state when a
       task id is recorded,
    3. the Redis task mapping (legacy fallback), also enriched with live
       Celery task state.

    Args:
        run_id: Content-addressed run identifier.

    Returns:
        A dict describing the run (always containing ``run_id``, ``status``
        and ``inputs``), or ``None`` if the run is unknown to every source.
    """
    # 1. Completed run stored in the database.
    cached = await self.db.get_run_cache(run_id)
    if cached:
        return {
            "run_id": run_id,
            "status": "completed",
            "recipe": cached.get("recipe"),
            "inputs": self._ensure_inputs_list(cached.get("inputs")),
            "output_cid": cached.get("output_cid"),
            "ipfs_cid": cached.get("ipfs_cid"),
            "provenance_cid": cached.get("provenance_cid"),
            "actor_id": cached.get("actor_id"),
            "created_at": cached.get("created_at"),
            # The cache row's creation time is reported as the completion time.
            "completed_at": cached.get("created_at"),
        }

    # 2. Pending run stored in the database.
    pending = await self.db.get_pending_run(run_id)
    if pending:
        task_id = pending.get("celery_task_id")
        if not task_id:
            # No task to query: report whatever the pending record says.
            return {
                "run_id": run_id,
                "status": pending.get("status", "pending"),
                "recipe": pending.get("recipe"),
                "inputs": self._ensure_inputs_list(pending.get("inputs")),
                "output_name": pending.get("output_name"),
                "actor_id": pending.get("actor_id"),
                "created_at": pending.get("created_at"),
                "error": pending.get("error"),
            }
        run_data = {
            "run_id": run_id,
            "status": None,  # filled in from Celery below
            "celery_task_id": task_id,
            "actor_id": pending.get("actor_id"),
            "recipe": pending.get("recipe"),
            "inputs": self._ensure_inputs_list(pending.get("inputs")),
            "output_name": pending.get("output_name"),
            "created_at": pending.get("created_at"),
            "error": pending.get("error"),
        }
        run_data.update(self._celery_task_state(task_id))
        return run_data

    # 3. Fallback: Redis task mapping kept for backwards compatibility.
    task_data = self.redis.get(f"{self.task_key_prefix}{run_id}")
    if not task_data:
        return None
    if isinstance(task_data, bytes):
        task_data = task_data.decode()

    # New format is a JSON object; old format is the bare task_id string.
    try:
        parsed = json.loads(task_data)
    except json.JSONDecodeError:
        parsed = None
    if isinstance(parsed, dict):
        task_id = parsed.get("task_id")
        task_inputs = parsed.get("inputs")
        # Inputs may themselves have been stored as a JSON string.
        if isinstance(task_inputs, str):
            try:
                task_inputs = json.loads(task_inputs)
            except json.JSONDecodeError:
                task_inputs = None
    else:
        # Anything that is not a JSON object is treated as a legacy task id.
        task_id = task_data
        task_inputs = None
        parsed = {}

    run_data = {
        "run_id": run_id,
        "status": None,  # filled in from Celery below
        "celery_task_id": task_id,
        "actor_id": parsed.get("actor_id"),
        "recipe": parsed.get("recipe"),
        "recipe_name": parsed.get("recipe_name"),
        "inputs": self._ensure_inputs_list(task_inputs),
        "output_name": parsed.get("output_name"),
        "created_at": parsed.get("created_at"),
    }
    run_data.update(self._celery_task_state(task_id))
    return run_data

def _celery_task_state(self, task_id: str) -> Dict[str, Any]:
    """
    Query Celery for a task and translate its state into run fields.

    Returns a dict that always has ``status`` and may additionally carry
    ``output_cid`` (finished successfully with a dict result) or ``error``
    (finished unsuccessfully).
    """
    # Imported lazily, as the rest of this module does for Celery.
    from celery.result import AsyncResult
    from celery_app import app as celery_app

    result = AsyncResult(task_id, app=celery_app)
    status = result.status.lower()
    # Map Celery state names to run statuses; unmapped states pass through.
    status_map = {
        "pending": "pending",
        "started": "running",
        "success": "completed",
        "failure": "failed",
        "retry": "running",
        "revoked": "failed",
    }
    state = {"status": status_map.get(status, status)}
    if result.ready():
        if result.successful():
            state["status"] = "completed"
            task_result = result.result
            if isinstance(task_result, dict):
                state["output_cid"] = task_result.get("output_cid")
        else:
            state["status"] = "failed"
            state["error"] = str(result.result)
    return state
async def list_runs(self, actor_id: str, offset: int = 0, limit: int = 20) -> list:
    """
    List a user's runs, newest first.

    Completed runs come from the run cache table; pending runs come from the
    pending table and are included only while their live status (via
    ``get_run``) is ``"pending"`` or ``"running"``. The two sets are merged,
    sorted by ``created_at`` descending, and the requested page is returned.

    Args:
        actor_id: Owner whose runs are listed.
        offset: Number of merged runs to skip.
        limit: Maximum number of runs to return.

    Returns:
        Up to ``limit`` run dicts.
    """
    # The page is sliced out of the merged list below, so enough completed
    # rows to reach the end of the requested page must be fetched. The
    # extra 50 rows of slack are kept from the previous implementation.
    completed_runs = await self.db.list_runs_by_actor(
        actor_id, offset=0, limit=offset + limit + 50
    )
    pending_db = await self.db.list_pending_runs(actor_id=actor_id)

    # Build the id set once instead of rescanning completed_runs per row.
    completed_ids = {r.get("run_id") for r in completed_runs}

    pending = []
    for pr in pending_db:
        run_id = pr.get("run_id")
        if run_id in completed_ids:
            continue  # already reported as completed
        run = await self.get_run(run_id)  # live status check
        if run and run.get("status") in ("pending", "running"):
            pending.append(run)

    def created_at_key(run):
        # Runs without a timestamp sort last under reverse=True, and a None
        # value is never compared against a real timestamp.
        stamp = run.get("created_at")
        return (stamp is not None, stamp)

    all_runs = pending + completed_runs
    all_runs.sort(key=created_at_key, reverse=True)
    return all_runs[offset:offset + limit]
async def create_run(
    self,
    recipe: str,
    inputs: list,
    output_name: str = None,
    use_dag: bool = True,
    dag_json: str = None,
    actor_id: str = None,
    l2_server: str = None,
    recipe_name: str = None,
) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
    """
    Create a new rendering run, reusing a cached result when one exists.

    Resolution order:
    1. a completed run in the database whose output is still in the local
       cache,
    2. when ``l2_server`` is given, a result known to that server that can
       be pulled back from IPFS,
    3. otherwise a new Celery task is submitted and recorded as pending.

    Args:
        recipe: Recipe/effect name; part of the run_id.
        inputs: Input content ids (a dict is accepted; its values are used).
        output_name: Output name; defaults to ``"{recipe}-{run_id[:8]}"``.
        use_dag: Submit through ``execute_dag`` (also forced when
            ``recipe == "dag"``); otherwise use legacy ``render_effect``.
        dag_json: Pre-built DAG to execute; built from the recipe if absent.
        actor_id: Owner recorded with the run.
        l2_server: Base URL of an L2 server to ask for an existing result.
        recipe_name: Display name echoed back in the run dict.

    Returns:
        ``(run_dict, None)`` on success, or ``(None, error_message)``.
    """
    import logging

    log = logging.getLogger(__name__)

    try:
        from legacy_tasks import render_effect, execute_dag, build_effect_dag
    except ImportError as e:
        return None, f"Celery tasks not available: {e}"

    # Handle both list and dict inputs
    if isinstance(inputs, dict):
        input_list = list(inputs.values())
    else:
        input_list = inputs

    # Compute content-addressable run_id
    run_id = compute_run_id(input_list, recipe)

    if not output_name:
        output_name = f"{recipe}-{run_id[:8]}"

    # 1. Completed run whose output is still present locally.
    cached_run = await self.db.get_run_cache(run_id)
    if cached_run:
        output_cid = cached_run.get("output_cid")
        if output_cid and self.cache.has_content(output_cid):
            return {
                "run_id": run_id,
                "status": "completed",
                "recipe": recipe,
                "inputs": input_list,
                "output_name": output_name,
                "output_cid": output_cid,
                "ipfs_cid": cached_run.get("ipfs_cid"),
                "provenance_cid": cached_run.get("provenance_cid"),
                "created_at": cached_run.get("created_at"),
                "completed_at": cached_run.get("created_at"),
                "actor_id": actor_id,
            }, None

    # 2. Result known to L2 that can be recovered from IPFS.
    if l2_server:
        recovered = await self._recover_run_from_l2(
            l2_server, run_id, recipe, input_list, actor_id
        )
        if recovered is not None:
            return recovered, None

    # 3. Not cached - submit to Celery.
    try:
        if use_dag or recipe == "dag":
            if dag_json:
                dag_data = dag_json
            else:
                dag = build_effect_dag(input_list, recipe)
                dag_data = dag.to_json()
            task = execute_dag.delay(dag_data, run_id)
            # Persist the DAG that was actually submitted, so that
            # recover_pending_runs can re-queue it from the pending record.
            stored_dag = dag_data
        else:
            if len(input_list) != 1:
                return None, "Legacy mode only supports single-input recipes. Use use_dag=true for multi-input."
            task = render_effect.delay(input_list[0], recipe, output_name)
            stored_dag = dag_json
    except Exception as e:
        return None, f"Failed to submit task: {e}"

    # One timestamp, so Redis and the returned dict agree on created_at.
    created_at = datetime.now(timezone.utc).isoformat()

    # The task is queued from here on. Bookkeeping failures are logged but
    # never reported as a failed submission; otherwise a caller retry would
    # queue the same work twice.
    try:
        await self.db.create_pending_run(
            run_id=run_id,
            celery_task_id=task.id,
            recipe=recipe,
            inputs=input_list,
            actor_id=actor_id,
            dag_json=stored_dag,
            output_name=output_name,
        )
    except Exception as e:
        log.error(f"Failed to save pending run: {e}")

    # Also store in Redis for backwards compatibility (shorter TTL).
    try:
        task_data = json.dumps({
            "task_id": task.id,
            "actor_id": actor_id,
            "recipe": recipe,
            "recipe_name": recipe_name,
            "inputs": input_list,
            "output_name": output_name,
            "created_at": created_at,
        })
        self.redis.setex(
            f"{self.task_key_prefix}{run_id}",
            3600 * 4,  # 4 hour TTL (database is primary now)
            task_data,
        )
    except Exception as e:
        log.error("Failed to store task mapping in Redis for run %s: %s", run_id, e)

    return {
        "run_id": run_id,
        "status": "running",
        "recipe": recipe,
        "recipe_name": recipe_name,
        "inputs": input_list,
        "output_name": output_name,
        "celery_task_id": task.id,
        "created_at": created_at,
        "actor_id": actor_id,
    }, None

async def _recover_run_from_l2(
    self,
    l2_server: str,
    run_id: str,
    recipe: str,
    input_list: list,
    actor_id: Optional[str],
) -> Optional[Dict[str, Any]]:
    """
    Try to recover a finished run from an L2 server and IPFS.

    Best-effort: every failure is logged and reported as ``None``, so the
    caller can fall through to executing the run.

    Returns:
        A completed-run dict when the output was pulled into the local cache
        directory and recorded in the run cache, otherwise ``None``.
    """
    import logging

    log = logging.getLogger(__name__)

    try:
        # Imported here because only the L2 lookup needs an HTTP client.
        import httpx

        async with httpx.AsyncClient(timeout=10) as client:
            l2_resp = await client.get(f"{l2_server}/assets/by-run-id/{run_id}")
            if l2_resp.status_code != 200:
                return None
            l2_data = l2_resp.json()
        output_cid = l2_data.get("output_cid")
        ipfs_cid = l2_data.get("ipfs_cid")
        provenance_cid = l2_data.get("provenance_cid")
    except Exception as e:
        log.warning("L2 lookup failed for run %s: %s", run_id, e)
        return None

    if not (output_cid and ipfs_cid):
        return None

    try:
        import ipfs_client

        # Pull from IPFS to local cache
        legacy_dir = self.cache_dir / "legacy"
        legacy_dir.mkdir(parents=True, exist_ok=True)
        recovery_path = legacy_dir / output_cid
        if not ipfs_client.get_file(ipfs_cid, str(recovery_path)):
            return None
        await self.db.save_run_cache(
            run_id=run_id,
            output_cid=output_cid,
            recipe=recipe,
            inputs=input_list,
            ipfs_cid=ipfs_cid,
            provenance_cid=provenance_cid,
            actor_id=actor_id,
        )
    except Exception as e:
        log.warning("IPFS recovery failed for run %s: %s", run_id, e)
        return None

    return {
        "run_id": run_id,
        "status": "completed",
        "recipe": recipe,
        "inputs": input_list,
        "output_cid": output_cid,
        "ipfs_cid": ipfs_cid,
        "provenance_cid": provenance_cid,
        "created_at": datetime.now(timezone.utc).isoformat(),
        "actor_id": actor_id,
    }
async def discard_run(
    self,
    run_id: str,
    actor_id: str,
    username: str,
) -> Tuple[bool, Optional[str]]:
    """
    Discard a run by dropping its Redis task mapping.

    Only the run_id -> task_id key in Redis is removed. The run cache record
    and the output content are left in place, as a permanent record of
    completed work.

    Args:
        run_id: Run to discard.
        actor_id: Caller's actor id, accepted as an owner match.
        username: Caller's username, accepted as an owner match.

    Returns:
        ``(True, None)`` on success, otherwise ``(False, reason)``.
    """
    record = await self.get_run(run_id)
    if record:
        owner = record.get("actor_id")
        # Runs with no recorded owner may be discarded by anyone.
        permitted = not owner or owner in (username, actor_id)
        if not permitted:
            return False, "Access denied"
        self.redis.delete("{}{}".format(self.task_key_prefix, run_id))
        return True, None
    return False, f"Run {run_id} not found"
async def get_run_plan(self, run_id: str) -> Optional[Dict[str, Any]]:
    """Return the execution plan for a run, or None if none is found.

    A plan referenced by the run's ``plan_cache_id`` is read from the content
    cache first. Failing that, the legacy ``plans`` directory under the cache
    dir is checked for ``{run_id}.sexp`` and then ``{run_id}.json``.

    S-expression plans come back as ``{"sexp": text, "format": "sexp"}``.
    JSON plans come back as the parsed object with ``"format": "json"`` added.
    """
    run = await self.get_run(run_id)
    if not run:
        return None

    # Preferred source: plan stored in the content cache.
    cached_plan_cid = run.get("plan_cache_id")
    cached_path = self.cache.get_by_cid(cached_plan_cid) if cached_plan_cid else None
    if cached_path and cached_path.exists():
        with open(cached_path) as handle:
            text = handle.read()
        # A leading "(" marks an S-expression; anything else is parsed as JSON.
        if not text.strip().startswith("("):
            parsed = json.loads(text)
            parsed["format"] = "json"
            return parsed
        return {"sexp": text, "format": "sexp"}

    # Fall back to legacy plans directory
    plans_dir = self.cache_dir / "plans"
    legacy_sexp = plans_dir / f"{run_id}.sexp"
    if legacy_sexp.exists():
        return {"sexp": legacy_sexp.read_text(), "format": "sexp"}

    legacy_json = plans_dir / f"{run_id}.json"
    if legacy_json.exists():
        parsed = json.loads(legacy_json.read_text())
        parsed["format"] = "json"
        return parsed

    return None
async def get_run_plan_sexp(self, run_id: str) -> Optional[str]:
    """Return the run's plan as S-expression text, or None if the plan is missing or not in sexp format."""
    plan = await self.get_run_plan(run_id)
    if not plan or plan.get("format") != "sexp":
        return None
    return plan.get("sexp")
async def get_run_artifacts(self, run_id: str) -> List[Dict[str, Any]]:
    """
    Describe the artifacts of a run: each input, then the output.

    Only content that the cache reports as present, and whose cached path
    exists on disk, is included. Each entry carries ``hash``, ``size_bytes``,
    ``media_type``, ``role`` (``"input"`` / ``"output"``) and ``step_name``.
    An unknown run yields an empty list.
    """
    run = await self.get_run(run_id)
    if not run:
        return []

    input_cids = run.get("inputs", [])
    if isinstance(input_cids, dict):
        input_cids = list(input_cids.values())

    # (cid, role, display name) in reporting order: inputs first, output last.
    candidates = [
        (cid, "input", f"Input {position}")
        for position, cid in enumerate(input_cids, start=1)
    ]
    output_cid = run.get("output_cid")
    if output_cid:
        candidates.append((output_cid, "output", "Output"))

    artifacts = []
    for cid, role, label in candidates:
        if not self.cache.has_content(cid):
            continue
        path = self.cache.get_by_cid(cid)
        if not (path and path.exists()):
            continue
        artifacts.append({
            "hash": cid,
            "size_bytes": path.stat().st_size,
            # Resolves to the module-level function, not the method.
            "media_type": detect_media_type(path),
            "role": role,
            "step_name": label,
        })
    return artifacts
async def get_run_analysis(self, run_id: str) -> List[Dict[str, Any]]:
    """
    Collect stored analysis data for every input of a run.

    Analysis is read from ``{cache_dir}/analysis/{input_hash}.json``. A
    missing, unreadable or malformed file is reported as "no analysis"
    rather than raising. An unknown run yields an empty list.

    Returns:
        One dict per input, in input order, with ``input_hash``,
        ``input_name``, ``has_analysis``, ``tempo``, ``beat_times`` and the
        ``raw`` parsed document (``None`` when unavailable).
    """
    run = await self.get_run(run_id)
    if not run:
        return []

    inputs = run.get("inputs", [])
    if isinstance(inputs, dict):
        inputs = list(inputs.values())

    analysis_root = self.cache_dir / "analysis"
    report = []
    for position, cid in enumerate(inputs, start=1):
        candidate = analysis_root / f"{cid}.json"
        payload = None
        if candidate.exists():
            try:
                payload = json.loads(candidate.read_text())
            except (json.JSONDecodeError, IOError):
                payload = None
        report.append({
            "input_hash": cid,
            "input_name": f"Input {position}",
            "has_analysis": payload is not None,
            # Truthiness (not "is not None") decides the derived fields, so
            # an empty document reports has_analysis=True with default values.
            "tempo": payload.get("tempo") if payload else None,
            "beat_times": payload.get("beat_times", []) if payload else [],
            "raw": payload,
        })
    return report
def detect_media_type(self, path: Path) -> str:
    """Return the media type of the file at ``path`` by delegating to the module-level detector."""
    # Inside a method the bare name resolves to the module-level function
    # of the same name, not to this method.
    media_type = detect_media_type(path)
    return media_type
async def recover_pending_runs(self) -> Dict[str, int]:
    """
    Recover pending runs after restart.

    Every pending run in the database is handled according to its Celery
    task:
    - finished successfully with an ``output_cid``: saved to the run cache
      and the pending record completed,
    - finished otherwise: marked failed,
    - still pending/started/retrying: left alone,
    - no task id, or a task in any other state: re-queued from the stored
      DAG when one exists, otherwise marked failed.

    Returns:
        Counts keyed by ``recovered``, ``completed``, ``failed`` and
        ``still_running``. When Celery or the task module cannot be imported
        the return value is ``{"error": "Celery tasks not available"}``.
    """
    try:
        from celery.result import AsyncResult
        from celery_app import app as celery_app
        from legacy_tasks import execute_dag
    except ImportError:
        return {"error": "Celery tasks not available"}

    stats = {"recovered": 0, "completed": 0, "failed": 0, "still_running": 0}

    # Get all pending/running runs from database
    pending_runs = await self.db.list_pending_runs()
    for run in pending_runs:
        run_id = run.get("run_id")
        task_id = run.get("celery_task_id")

        if not task_id:
            # Nothing to query in Celery: the stored DAG is the only way back.
            outcome = await self._requeue_or_fail(
                run, execute_dag, "No DAG data for recovery"
            )
            stats[outcome] += 1
            continue

        result = AsyncResult(task_id, app=celery_app)
        celery_status = result.status.lower()
        # NOTE(review): Celery reports PENDING for task ids it has no record
        # of, so tasks lost in a restart may be counted as still running
        # here - confirm against the configured result backend.

        if result.ready():
            if not result.successful():
                await self.db.update_pending_run_status(
                    run_id, "failed", str(result.result)
                )
                stats["failed"] += 1
                continue
            task_result = result.result
            if isinstance(task_result, dict) and task_result.get("output_cid"):
                # Task completed - move to run_cache
                await self.db.save_run_cache(
                    run_id=run_id,
                    output_cid=task_result["output_cid"],
                    recipe=run.get("recipe", "unknown"),
                    inputs=run.get("inputs", []),
                    ipfs_cid=task_result.get("ipfs_cid"),
                    provenance_cid=task_result.get("provenance_cid"),
                    actor_id=run.get("actor_id"),
                )
                await self.db.complete_pending_run(run_id)
                stats["completed"] += 1
            else:
                await self.db.update_pending_run_status(
                    run_id, "failed", "Task completed but no output hash"
                )
                stats["failed"] += 1
        elif celery_status in ("pending", "started", "retry"):
            stats["still_running"] += 1
        else:
            # Neither finished nor recognisably in progress.
            outcome = await self._requeue_or_fail(
                run, execute_dag, f"Task in unknown state: {celery_status}"
            )
            stats[outcome] += 1

    return stats

async def _requeue_or_fail(self, run: Dict[str, Any], execute_dag, no_dag_reason: str) -> str:
    """
    Re-submit a pending run from its stored DAG, or mark it failed.

    Args:
        run: Pending-run record.
        execute_dag: Celery task used to re-submit the DAG.
        no_dag_reason: Error text recorded when the record has no DAG.

    Returns:
        ``"recovered"`` when a new task was queued and recorded, otherwise
        ``"failed"`` (the matching key in the recovery stats).
    """
    run_id = run.get("run_id")
    dag_json = run.get("dag_json")
    if not dag_json:
        await self.db.update_pending_run_status(run_id, "failed", no_dag_reason)
        return "failed"
    try:
        new_task = execute_dag.delay(dag_json, run_id)
        # Record the new task id against the same run.
        await self.db.create_pending_run(
            run_id=run_id,
            celery_task_id=new_task.id,
            recipe=run.get("recipe", "unknown"),
            inputs=run.get("inputs", []),
            actor_id=run.get("actor_id"),
            dag_json=dag_json,
            output_name=run.get("output_name"),
        )
    except Exception as e:
        await self.db.update_pending_run_status(
            run_id, "failed", f"Recovery failed: {e}"
        )
        return "failed"
    return "recovered"