Add get_run_analysis() to RunService to load per-input analysis from
CACHE_DIR/analysis/{hash}.json files. Update runs router and template
to display tempo, beats, energy, and beat timeline visualization.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
268 lines
9.2 KiB
Python
268 lines
9.2 KiB
Python
"""
|
|
Run Service - business logic for run management.
|
|
"""
|
|
|
|
from typing import Optional, List, Dict, Any
|
|
import json
|
|
|
|
|
|
class RunService:
    """
    Service for managing recipe runs.

    Handles run lifecycle, plan loading, and result aggregation.

    Runs are stored as JSON strings in Redis under ``artdag:run:<run_id>``.
    Execution plans and per-input analysis are read from JSON files below
    ``$CACHE_DIR`` (default ``/tmp/artdag-cache``), in the ``plans`` and
    ``analysis`` sub-directories respectively.

    NOTE(review): every ``self.redis`` call is made without ``await``, so the
    client is presumably a blocking one -- confirm; if so these coroutines
    block the event loop for the duration of each Redis round-trip.
    """

    def __init__(self, database, redis, cache):
        self.db = database
        self.redis = redis
        self.cache = cache
        self.run_prefix = "artdag:run:"

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _cache_json_path(subdir: str, name: Any):
        """Return ``CACHE_DIR/<subdir>/<name>.json`` or None if *name* is unsafe.

        *name* comes from run ids / run records, so anything containing a
        path separator or NUL is rejected to keep the lookup inside *subdir*.
        """
        import os
        from pathlib import Path

        name = str(name)
        if not name or any(ch in name for ch in ("/", "\\", "\x00")):
            return None
        cache_dir = Path(os.environ.get("CACHE_DIR", "/tmp/artdag-cache"))
        return cache_dir / subdir / f"{name}.json"

    @staticmethod
    def _load_cached_json(subdir: str, name: Any):
        """Load ``CACHE_DIR/<subdir>/<name>.json``.

        Returns a ``(found, data)`` tuple so that a file containing JSON
        ``null`` can be told apart from a missing file.

        Raises:
            ValueError: the file exists but is not valid UTF-8 JSON
                (``json.JSONDecodeError`` / ``UnicodeDecodeError``).
            OSError: the file exists but cannot be read.
        """
        path = RunService._cache_json_path(subdir, name)
        if path is None:
            return False, None
        try:
            # EAFP: avoids the exists()/open() race of a look-before-you-leap check.
            with open(path, encoding="utf-8") as f:
                return True, json.load(f)
        except (FileNotFoundError, NotADirectoryError):
            return False, None

    @staticmethod
    def _detect_media_type(path) -> str:
        """Classify a file as "video", "image", "audio" or "file" from its first 12 bytes."""
        try:
            with open(path, "rb") as f:
                header = f.read(12)
        except OSError:
            # Best effort: an unreadable file is reported as a generic file.
            return "file"

        if header[:4] == b'\x1a\x45\xdf\xa3' or header[4:8] == b'ftyp':
            return "video"
        if header[:8] == b'\x89PNG\r\n\x1a\n' or header[:2] == b'\xff\xd8':
            return "image"
        if header[:4] == b'RIFF' and header[8:12] == b'WAVE':
            return "audio"
        return "file"

    def _artifact_info(self, content_hash: str, role: str, step_name: str) -> Optional[Dict[str, Any]]:
        """Describe one cached artifact, or return None if the cache has no file for it."""
        if not self.cache.has_content(content_hash):
            return None
        path = self.cache.get_path(content_hash)
        if not path or not path.exists():
            return None

        media_type = self._detect_media_type(path)
        try:
            size_bytes = path.stat().st_size
        except OSError:
            # File disappeared between exists() and stat(); treat as absent.
            return None

        return {
            "hash": content_hash,
            "size_bytes": size_bytes,
            "media_type": media_type,
            "role": role,
            "step_name": step_name,
        }

    # ------------------------------------------------------------------
    # Run CRUD
    # ------------------------------------------------------------------

    async def get_run(self, run_id: str) -> Optional[Dict[str, Any]]:
        """Get a run by ID.

        Returns the decoded run record, or None when no value is stored
        under the run's key.
        """
        data = self.redis.get(f"{self.run_prefix}{run_id}")
        if not data:
            return None
        return json.loads(data)

    async def list_runs(self, actor_id: str, page: int = 1, limit: int = 20) -> Dict[str, Any]:
        """List runs for a user with pagination.

        A run belongs to the user when its ``actor_id`` or ``username``
        equals *actor_id*. Results are sorted by ``created_at`` descending.

        Args:
            actor_id: Identifier to match against each run.
            page: 1-based page number; values below 1 are treated as 1.
            limit: Page size; negative values are treated as 0.

        Returns:
            ``{"runs": [...], "pagination": {"page", "limit", "total", "has_more"}}``
        """
        page = max(page, 1)
        limit = max(limit, 0)

        # Get all runs and filter by actor
        # TODO: Use Redis index for efficient filtering
        all_runs = []
        seen_keys = set()
        cursor = 0

        while True:
            cursor, keys = self.redis.scan(
                cursor=cursor,
                match=f"{self.run_prefix}*",
                count=100
            )
            for key in keys:
                # SCAN may hand back the same key more than once during a
                # full iteration, so de-duplicate to avoid listing a run twice.
                if key in seen_keys:
                    continue
                seen_keys.add(key)

                data = self.redis.get(key)
                if not data:
                    continue
                try:
                    run = json.loads(data)
                except ValueError:
                    # One corrupt record must not break the whole listing.
                    continue
                if not isinstance(run, dict):
                    continue
                if run.get("actor_id") == actor_id or run.get("username") == actor_id:
                    all_runs.append(run)
            if cursor == 0:
                break

        # Sort by created_at descending ("or" also covers a stored null)
        all_runs.sort(key=lambda r: r.get("created_at") or "", reverse=True)

        # Paginate
        total = len(all_runs)
        start = (page - 1) * limit
        end = start + limit
        runs = all_runs[start:end]

        return {
            "runs": runs,
            "pagination": {
                "page": page,
                "limit": limit,
                "total": total,
                "has_more": end < total,
            }
        }

    async def create_run(
        self,
        run_id: str,
        recipe_id: str,
        inputs: Dict[str, str],
        actor_id: str,
    ) -> Dict[str, Any]:
        """Create a new run in ``pending`` status, store it and return it."""
        from datetime import datetime, timezone

        # Same naive-UTC ISO format that datetime.utcnow() produced, without
        # relying on the deprecated utcnow().
        created_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

        run = {
            "run_id": run_id,
            "recipe": f"recipe:{recipe_id}",
            "inputs": inputs,
            "actor_id": actor_id,
            "status": "pending",
            "created_at": created_at,
        }

        self.redis.set(f"{self.run_prefix}{run_id}", json.dumps(run))
        return run

    async def update_run(self, run_id: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Update a run's fields.

        Merges *updates* into the stored record and writes it back.
        Returns the updated record, or None if the run does not exist.
        """
        run = await self.get_run(run_id)
        if not run:
            return None

        run.update(updates)
        self.redis.set(f"{self.run_prefix}{run_id}", json.dumps(run))
        return run

    async def delete_run(self, run_id: str) -> bool:
        """Delete a run. Returns True if a key was actually removed."""
        return self.redis.delete(f"{self.run_prefix}{run_id}") > 0

    # ------------------------------------------------------------------
    # Plans
    # ------------------------------------------------------------------

    async def load_plan(self, run_id: str) -> Optional[Dict[str, Any]]:
        """Load execution plan for a run.

        Looks for ``plans/<run_id>.json`` first, then for
        ``plans/<plan_id>.json`` using the ``plan_id`` stored on the run.
        Returns None when neither file exists. A plan file that exists but
        holds invalid JSON raises ``json.JSONDecodeError``.
        """
        # Try plan cache directory
        found, plan = self._load_cached_json("plans", run_id)
        if found:
            return plan

        # Also check for plan_id in run data
        run = await self.get_run(run_id)
        if run and run.get("plan_id"):
            found, plan = self._load_cached_json("plans", run["plan_id"])
            if found:
                return plan

        return None

    async def get_run_plan(self, run_id: str) -> Optional[Dict[str, Any]]:
        """Get execution plan with step results merged in.

        Each plan step whose ``id`` (or ``name``) appears in the run's
        ``step_results`` gets ``cache_id``, ``status``, ``cached`` and
        ``outputs`` copied onto it. Returns None if the run does not exist.
        """
        run = await self.get_run(run_id)
        if not run:
            return None

        plan = await self.load_plan(run_id)
        step_results = run.get("step_results") or {}

        # If no stored plan, try to reconstruct from run data
        if not plan and step_results:
            plan = {
                "plan_id": run.get("plan_id"),
                "recipe": run.get("recipe"),
                "steps": [],
            }

        if plan and step_results:
            # Merge step results into plan
            for step in plan.get("steps", []):
                step_id = step.get("id") or step.get("name")
                if not step_id or step_id not in step_results:
                    continue
                result = step_results[step_id]
                if not isinstance(result, dict):
                    continue
                step["cache_id"] = result.get("cache_id") or result.get("output_cache_id")
                step["status"] = result.get("status", "completed")
                step["cached"] = result.get("cached", False)
                step["outputs"] = result.get("outputs", [])

        return plan

    # ------------------------------------------------------------------
    # Artifacts and analysis
    # ------------------------------------------------------------------

    async def get_run_artifacts(self, run_id: str) -> List[Dict[str, Any]]:
        """Get all artifacts (inputs + outputs) for a run.

        Returns one dict per artifact found in the cache, in the order
        inputs, step outputs, final output. The final output is omitted
        when its hash is already listed. Unknown runs yield ``[]``.
        """
        run = await self.get_run(run_id)
        if not run:
            return []

        artifacts = []

        def add(content_hash, role, step_name):
            info = self._artifact_info(content_hash, role, step_name)
            if info:
                artifacts.append(info)

        # Add inputs
        inputs = run.get("inputs") or []
        if isinstance(inputs, dict):
            inputs = list(inputs.values())
        for i, content_hash in enumerate(inputs):
            add(content_hash, "input", f"Input {i + 1}")

        # Add step outputs from step_results
        step_results = run.get("step_results") or {}
        for step_id, result in step_results.items():
            if not isinstance(result, dict):
                continue
            cache_id = result.get("cache_id") or result.get("output_cache_id")
            if cache_id:
                add(cache_id, "step_output", step_id)
            # Also add any additional outputs
            for output in result.get("outputs") or []:
                if output and output != cache_id:
                    add(output, "step_output", step_id)

        # Add final output
        output_hash = run.get("output_hash")
        if output_hash:
            # Avoid duplicates
            if not any(a["hash"] == output_hash for a in artifacts):
                add(output_hash, "output", "Final Output")

        return artifacts

    async def get_run_analysis(self, run_id: str) -> List[Dict[str, Any]]:
        """Get analysis data for each input in a run.

        For every input hash, reads ``analysis/<hash>.json`` from the cache
        directory. Missing, unreadable, malformed or non-object files are
        reported with ``has_analysis`` False rather than raising.

        Returns:
            One dict per input with keys ``input_hash``, ``input_name``,
            ``has_analysis``, ``tempo``, ``beat_times``, ``energy``, ``raw``.
            Unknown runs yield ``[]``.
        """
        run = await self.get_run(run_id)
        if not run:
            return []

        results = []
        inputs = run.get("inputs") or []
        if isinstance(inputs, dict):
            inputs = list(inputs.values())

        for i, input_hash in enumerate(inputs):
            analysis_data = None
            try:
                found, loaded = self._load_cached_json("analysis", input_hash)
            except (ValueError, OSError):
                # Deliberate best effort: a bad analysis file only hides
                # the analysis for that one input.
                found, loaded = False, None

            # The fields below are read with .get(), so only a JSON object
            # counts as usable analysis.
            if found and isinstance(loaded, dict):
                analysis_data = loaded

            results.append({
                "input_hash": input_hash,
                "input_name": f"Input {i + 1}",
                "has_analysis": analysis_data is not None,
                "tempo": analysis_data.get("tempo") if analysis_data else None,
                "beat_times": analysis_data.get("beat_times", []) if analysis_data else [],
                "energy": analysis_data.get("energy") if analysis_data else None,
                "raw": analysis_data,
            })

        return results
|