Refactor storage: remove Redis duplication, use proper data tiers
- Recipes: Now content-addressed only (cache + IPFS), removed Redis storage
- Runs: Completed runs stored in PostgreSQL, Redis only for task_id mapping
- Add list_runs_by_actor() to database.py for paginated run queries
- Add list_by_type() to cache_manager for filtering by node_type
- Fix upload endpoint to return size and filename fields
- Fix recipe run endpoint with proper DAG input binding
- Fix get_run_service() dependency to pass database module

Storage architecture:
- Redis: Ephemeral only (sessions, task mappings with TTL)
- PostgreSQL: Permanent records (completed runs, metadata)
- Cache: Content-addressed files (recipes, media, outputs)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -1,147 +1,338 @@
|
||||
"""
|
||||
Run Service - business logic for run management.
|
||||
|
||||
Runs are content-addressed (run_id computed from inputs + recipe).
|
||||
Completed runs are stored in PostgreSQL, not Redis.
|
||||
In-progress runs are tracked via Celery task state.
|
||||
"""
|
||||
|
||||
from typing import Optional, List, Dict, Any
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Optional, List, Dict, Any, Tuple
|
||||
|
||||
|
||||
def compute_run_id(input_hashes: list, recipe: str, recipe_hash: str = None) -> str:
    """
    Derive the content-addressed identifier of a run.

    The identifier is the SHA3-256 hex digest of a canonical JSON document
    (sorted keys, no whitespace) holding:
    - the input content hashes in sorted order (for a dict, its values),
    - the recipe identifier: ``recipe_hash`` when truthy, otherwise
      ``"effect:{recipe}"``,
    - a fixed ``"version"`` field of ``"1"``.

    Identical inputs and recipe therefore always map to the same run_id,
    regardless of the order in which the inputs were supplied.
    """
    # Named inputs (dict) contribute only their hash values.
    hashes = input_hashes.values() if isinstance(input_hashes, dict) else input_hashes
    recipe_ident = recipe_hash if recipe_hash else f"effect:{recipe}"

    payload = json.dumps(
        {
            "inputs": sorted(hashes),
            "recipe": recipe_ident,
            "version": "1",
        },
        sort_keys=True,
        separators=(",", ":"),
    )
    digest = hashlib.sha3_256(payload.encode())
    return digest.hexdigest()
|
||||
|
||||
|
||||
def detect_media_type(cache_path: Path) -> str:
    """
    Classify a file as image, video, or audio from its magic bytes.

    Only the first 32 bytes are read. Checks run in this order: video
    containers, image formats, then audio formats.

    Args:
        cache_path: Path of the file to inspect.

    Returns:
        "video", "image", "audio", or "unknown" when the file cannot be
        read or no known signature matches.
    """
    try:
        with open(cache_path, "rb") as f:
            header = f.read(32)
    except (OSError, TypeError, ValueError):
        # Missing/unreadable file or an unusable path: best effort, never raise.
        return "unknown"

    # Slicing a short header just yields a shorter bytes object that compares
    # unequal, so no explicit length checks are needed below.
    riff_form = header[8:12] if header[:4] == b'RIFF' else b''

    # Video signatures
    if header[:4] == b'\x1a\x45\xdf\xa3':  # WebM/MKV (EBML)
        return "video"
    if header[4:8] == b'ftyp':  # MP4/MOV
        return "video"
    if riff_form == b'AVI ':  # AVI
        return "video"

    # Image signatures
    if header[:8] == b'\x89PNG\r\n\x1a\n':  # PNG
        return "image"
    if header[:2] == b'\xff\xd8':  # JPEG
        return "image"
    if header[:6] in (b'GIF87a', b'GIF89a'):  # GIF
        return "image"
    if riff_form == b'WEBP':  # WebP
        return "image"

    # Audio signatures
    if riff_form == b'WAVE':  # WAV
        return "audio"
    if header[:4] == b'fLaC':  # FLAC
        return "audio"
    if header[:3] == b'ID3':  # MP3 with an ID3v2 tag
        return "audio"

    return "unknown"
|
||||
|
||||
|
||||
class RunService:
|
||||
"""
|
||||
Service for managing recipe runs.
|
||||
|
||||
Handles run lifecycle, plan loading, and result aggregation.
|
||||
Uses PostgreSQL for completed runs, Celery for task state.
|
||||
Redis is only used for task_id mapping (ephemeral).
|
||||
"""
|
||||
|
||||
def __init__(self, database, redis, cache):
|
||||
self.db = database
|
||||
self.redis = redis
|
||||
self.redis = redis # Only for task_id mapping
|
||||
self.cache = cache
|
||||
self.run_prefix = "artdag:run:"
|
||||
self.task_key_prefix = "artdag:task:" # run_id -> task_id mapping only
|
||||
self.cache_dir = Path(os.environ.get("CACHE_DIR", "/tmp/artdag-cache"))
|
||||
|
||||
async def get_run(self, run_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""Get a run by ID."""
|
||||
data = self.redis.get(f"{self.run_prefix}{run_id}")
|
||||
if not data:
|
||||
return None
|
||||
return json.loads(data)
|
||||
"""Get a run by ID. Checks database first, then Celery task state."""
|
||||
# Check database for completed run
|
||||
cached = await self.db.get_run_cache(run_id)
|
||||
if cached:
|
||||
return {
|
||||
"run_id": run_id,
|
||||
"status": "completed",
|
||||
"recipe": cached.get("recipe"),
|
||||
"inputs": cached.get("inputs", []),
|
||||
"output_hash": cached.get("output_hash"),
|
||||
"ipfs_cid": cached.get("ipfs_cid"),
|
||||
"provenance_cid": cached.get("provenance_cid"),
|
||||
"actor_id": cached.get("actor_id"),
|
||||
"created_at": cached.get("created_at"),
|
||||
"completed_at": cached.get("created_at"),
|
||||
}
|
||||
|
||||
# Check if there's a running task
|
||||
task_id = self.redis.get(f"{self.task_key_prefix}{run_id}")
|
||||
if task_id:
|
||||
if isinstance(task_id, bytes):
|
||||
task_id = task_id.decode()
|
||||
|
||||
# Get task state from Celery
|
||||
from celery.result import AsyncResult
|
||||
from celery_app import app as celery_app
|
||||
|
||||
result = AsyncResult(task_id, app=celery_app)
|
||||
status = result.status.lower()
|
||||
|
||||
run_data = {
|
||||
"run_id": run_id,
|
||||
"status": status if status != "pending" else "pending",
|
||||
"celery_task_id": task_id,
|
||||
}
|
||||
|
||||
# If task completed, get result
|
||||
if result.ready():
|
||||
if result.successful():
|
||||
run_data["status"] = "completed"
|
||||
task_result = result.result
|
||||
if isinstance(task_result, dict):
|
||||
run_data["output_hash"] = task_result.get("output_hash")
|
||||
else:
|
||||
run_data["status"] = "failed"
|
||||
run_data["error"] = str(result.result)
|
||||
|
||||
return run_data
|
||||
|
||||
return None
|
||||
|
||||
async def list_runs(self, actor_id: str, offset: int = 0, limit: int = 20) -> list:
|
||||
"""List runs for a user with pagination."""
|
||||
# Get all runs and filter by actor
|
||||
# TODO: Use Redis index for efficient filtering
|
||||
all_runs = []
|
||||
cursor = 0
|
||||
"""List runs for a user. Returns completed runs from database."""
|
||||
# Get completed runs from database
|
||||
runs = await self.db.list_runs_by_actor(actor_id, offset=offset, limit=limit)
|
||||
|
||||
# Also check for any pending tasks in Redis
|
||||
pending = []
|
||||
cursor = 0
|
||||
while True:
|
||||
cursor, keys = self.redis.scan(
|
||||
cursor=cursor,
|
||||
match=f"{self.run_prefix}*",
|
||||
match=f"{self.task_key_prefix}*",
|
||||
count=100
|
||||
)
|
||||
for key in keys:
|
||||
data = self.redis.get(key)
|
||||
if data:
|
||||
run = json.loads(data)
|
||||
if run.get("actor_id") == actor_id or run.get("username") == actor_id:
|
||||
all_runs.append(run)
|
||||
run_id = key.decode().replace(self.task_key_prefix, "") if isinstance(key, bytes) else key.replace(self.task_key_prefix, "")
|
||||
# Check if this run belongs to the user and isn't already in results
|
||||
if not any(r.get("run_id") == run_id for r in runs):
|
||||
run = await self.get_run(run_id)
|
||||
if run and run.get("status") in ("pending", "running"):
|
||||
pending.append(run)
|
||||
if cursor == 0:
|
||||
break
|
||||
|
||||
# Sort by created_at descending
|
||||
# Combine and sort
|
||||
all_runs = pending + runs
|
||||
all_runs.sort(key=lambda r: r.get("created_at", ""), reverse=True)
|
||||
|
||||
# Paginate
|
||||
return all_runs[offset:offset + limit]
|
||||
|
||||
async def create_run(
    self,
    recipe: str,
    inputs: list,
    output_name: str = None,
    use_dag: bool = True,
    dag_json: str = None,
    actor_id: str = None,
    l2_server: str = None,
) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
    """
    Create a rendering run, reusing an earlier result when one is available.

    The run_id is computed from the inputs and recipe, then resolved in
    this order:
      1. A completed run recorded in the database whose output is still
         present in the local cache.
      2. If ``l2_server`` is given, an asset the L2 server reports for this
         run_id, fetched from IPFS into ``<cache_dir>/legacy`` and then
         recorded in the database.
      3. A new Celery task: ``execute_dag`` when ``use_dag`` is true or the
         recipe is ``"dag"``, otherwise the legacy ``render_effect`` (single
         input only). The run_id -> task_id mapping is stored in Redis with
         a 24 hour TTL.

    Failures of step 2 are logged and never abort the request; the run is
    simply executed instead.

    Args:
        recipe: Recipe/effect name.
        inputs: Input content hashes (a list, or a dict whose values are used).
        output_name: Name for the output; defaults to ``"{recipe}-{run_id[:8]}"``.
        use_dag: Execute through the DAG engine rather than the legacy task.
        dag_json: Pre-built DAG to execute; built from inputs and recipe if omitted.
        actor_id: Owner recorded on the run.
        l2_server: Base URL of the L2 server to consult, if any.

    Returns:
        ``(run_dict, None)`` on success or ``(None, error_message)`` on failure.
    """
    import logging

    import httpx
    try:
        from legacy_tasks import render_effect, execute_dag, build_effect_dag
    except ImportError as e:
        return None, f"Celery tasks not available: {e}"

    log = logging.getLogger(__name__)

    # Handle both list and dict inputs
    if isinstance(inputs, dict):
        input_list = list(inputs.values())
    else:
        input_list = inputs

    # Compute content-addressable run_id
    run_id = compute_run_id(input_list, recipe)

    # Generate output name if not provided
    if not output_name:
        output_name = f"{recipe}-{run_id[:8]}"

    # Check database cache first (completed runs)
    cached_run = await self.db.get_run_cache(run_id)
    if cached_run:
        output_hash = cached_run.get("output_hash")
        # A database record alone is not enough: the output must still be cached.
        if output_hash and self.cache.has_content(output_hash):
            return {
                "run_id": run_id,
                "status": "completed",
                "recipe": recipe,
                "inputs": input_list,
                "output_name": output_name,
                "output_hash": output_hash,
                "ipfs_cid": cached_run.get("ipfs_cid"),
                "provenance_cid": cached_run.get("provenance_cid"),
                "created_at": cached_run.get("created_at"),
                "completed_at": cached_run.get("created_at"),
                "actor_id": actor_id,
            }, None

    # Check L2 if not in local cache
    if l2_server:
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                l2_resp = await client.get(f"{l2_server}/assets/by-run-id/{run_id}")
                if l2_resp.status_code == 200:
                    l2_data = l2_resp.json()
                    output_hash = l2_data.get("output_hash")
                    ipfs_cid = l2_data.get("ipfs_cid")
                    if output_hash and ipfs_cid:
                        # Pull from IPFS to local cache
                        try:
                            import ipfs_client
                            legacy_dir = self.cache_dir / "legacy"
                            legacy_dir.mkdir(parents=True, exist_ok=True)
                            recovery_path = legacy_dir / output_hash
                            if ipfs_client.get_file(ipfs_cid, str(recovery_path)):
                                # Save to database cache
                                await self.db.save_run_cache(
                                    run_id=run_id,
                                    output_hash=output_hash,
                                    recipe=recipe,
                                    inputs=input_list,
                                    ipfs_cid=ipfs_cid,
                                    provenance_cid=l2_data.get("provenance_cid"),
                                    actor_id=actor_id,
                                )
                                return {
                                    "run_id": run_id,
                                    "status": "completed",
                                    "recipe": recipe,
                                    "inputs": input_list,
                                    # Included for parity with the other return paths.
                                    "output_name": output_name,
                                    "output_hash": output_hash,
                                    "ipfs_cid": ipfs_cid,
                                    "provenance_cid": l2_data.get("provenance_cid"),
                                    "created_at": datetime.now(timezone.utc).isoformat(),
                                    "actor_id": actor_id,
                                }, None
                        except Exception:
                            # IPFS recovery is best effort: record why it failed, then run.
                            log.warning(
                                "IPFS recovery failed for run %s (cid %s); executing instead",
                                run_id,
                                ipfs_cid,
                                exc_info=True,
                            )
        except Exception:
            # L2 lookup is best effort: record why it failed, then run.
            log.warning(
                "L2 lookup failed for run %s at %s; executing instead",
                run_id,
                l2_server,
                exc_info=True,
            )

    # Not cached - submit to Celery
    try:
        if use_dag or recipe == "dag":
            if dag_json:
                dag_data = dag_json
            else:
                dag = build_effect_dag(input_list, recipe)
                dag_data = dag.to_json()

            task = execute_dag.delay(dag_data, run_id)
        else:
            if len(input_list) != 1:
                return None, "Legacy mode only supports single-input recipes. Use use_dag=true for multi-input."
            task = render_effect.delay(input_list[0], recipe, output_name)

        # Store task_id mapping in Redis (ephemeral)
        self.redis.setex(
            f"{self.task_key_prefix}{run_id}",
            3600 * 24,  # 24 hour TTL
            task.id,
        )

        return {
            "run_id": run_id,
            "status": "running",
            "recipe": recipe,
            "inputs": input_list,
            "output_name": output_name,
            "celery_task_id": task.id,
            "created_at": datetime.now(timezone.utc).isoformat(),
            "actor_id": actor_id,
        }, None

    except Exception as e:
        return None, f"Failed to submit task: {e}"
|
||||
|
||||
async def discard_run(
|
||||
self,
|
||||
run_id: str,
|
||||
recipe_id: str,
|
||||
inputs: Dict[str, str],
|
||||
actor_id: str,
|
||||
) -> Dict[str, Any]:
|
||||
"""Create a new run."""
|
||||
from datetime import datetime
|
||||
username: str,
|
||||
) -> Tuple[bool, Optional[str]]:
|
||||
"""
|
||||
Discard (delete) a run record.
|
||||
|
||||
run = {
|
||||
"run_id": run_id,
|
||||
"recipe": f"recipe:{recipe_id}",
|
||||
"inputs": inputs,
|
||||
"actor_id": actor_id,
|
||||
"status": "pending",
|
||||
"created_at": datetime.utcnow().isoformat(),
|
||||
}
|
||||
|
||||
self.redis.set(f"{self.run_prefix}{run_id}", json.dumps(run))
|
||||
return run
|
||||
|
||||
async def update_run(self, run_id: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
|
||||
"""Update a run's fields."""
|
||||
Note: This removes the run record but not the output content.
|
||||
"""
|
||||
run = await self.get_run(run_id)
|
||||
if not run:
|
||||
return None
|
||||
return False, f"Run {run_id} not found"
|
||||
|
||||
run.update(updates)
|
||||
self.redis.set(f"{self.run_prefix}{run_id}", json.dumps(run))
|
||||
return run
|
||||
# Check ownership
|
||||
run_owner = run.get("actor_id")
|
||||
if run_owner and run_owner not in (username, actor_id):
|
||||
return False, "Access denied"
|
||||
|
||||
async def delete_run(self, run_id: str) -> bool:
|
||||
"""Delete a run."""
|
||||
return self.redis.delete(f"{self.run_prefix}{run_id}") > 0
|
||||
# Remove task_id mapping from Redis
|
||||
self.redis.delete(f"{self.task_key_prefix}{run_id}")
|
||||
|
||||
async def load_plan(self, run_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""Load execution plan for a run."""
|
||||
from pathlib import Path
|
||||
import os
|
||||
# Note: We don't delete from run_cache as that's a permanent record
|
||||
# of completed work. The content itself remains in cache.
|
||||
|
||||
# Try plan cache directory
|
||||
cache_dir = Path(os.environ.get("CACHE_DIR", "/tmp/artdag-cache"))
|
||||
plan_path = cache_dir / "plans" / f"{run_id}.json"
|
||||
return True, None
|
||||
|
||||
async def get_run_plan(self, run_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""Get execution plan for a run."""
|
||||
plan_path = self.cache_dir / "plans" / f"{run_id}.json"
|
||||
if plan_path.exists():
|
||||
with open(plan_path) as f:
|
||||
return json.load(f)
|
||||
|
||||
# Also check for plan_id in run data
|
||||
run = await self.get_run(run_id)
|
||||
if run and run.get("plan_id"):
|
||||
plan_path = cache_dir / "plans" / f"{run['plan_id']}.json"
|
||||
if plan_path.exists():
|
||||
with open(plan_path) as f:
|
||||
return json.load(f)
|
||||
|
||||
return None
|
||||
|
||||
async def get_run_plan(self, run_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""Get execution plan with step results merged in."""
|
||||
run = await self.get_run(run_id)
|
||||
if not run:
|
||||
return None
|
||||
|
||||
plan = await self.load_plan(run_id)
|
||||
|
||||
# If no stored plan, try to reconstruct from run data
|
||||
if not plan and run.get("step_results"):
|
||||
plan = {
|
||||
"plan_id": run.get("plan_id"),
|
||||
"recipe": run.get("recipe"),
|
||||
"steps": [],
|
||||
}
|
||||
|
||||
if plan and run.get("step_results"):
|
||||
# Merge step results into plan
|
||||
step_results = run.get("step_results", {})
|
||||
for step in plan.get("steps", []):
|
||||
step_id = step.get("id") or step.get("name")
|
||||
if step_id and step_id in step_results:
|
||||
result = step_results[step_id]
|
||||
step["cache_id"] = result.get("cache_id") or result.get("output_cache_id")
|
||||
step["status"] = result.get("status", "completed")
|
||||
step["cached"] = result.get("cached", False)
|
||||
step["outputs"] = result.get("outputs", [])
|
||||
|
||||
return plan
|
||||
|
||||
async def get_run_artifacts(self, run_id: str) -> List[Dict[str, Any]]:
|
||||
"""Get all artifacts (inputs + outputs) for a run."""
|
||||
run = await self.get_run(run_id)
|
||||
@@ -150,31 +341,16 @@ class RunService:
|
||||
|
||||
artifacts = []
|
||||
|
||||
def get_artifact_info(content_hash: str, role: str, step_name: str) -> Optional[Dict]:
|
||||
"""Get artifact info using cache manager."""
|
||||
def get_artifact_info(content_hash: str, role: str, name: str) -> Optional[Dict]:
|
||||
if self.cache.has_content(content_hash):
|
||||
path = self.cache.get_path(content_hash)
|
||||
path = self.cache.get_by_content_hash(content_hash)
|
||||
if path and path.exists():
|
||||
# Detect media type
|
||||
media_type = "file"
|
||||
try:
|
||||
with open(path, "rb") as f:
|
||||
header = f.read(12)
|
||||
if header[:4] == b'\x1a\x45\xdf\xa3' or header[4:8] == b'ftyp':
|
||||
media_type = "video"
|
||||
elif header[:8] == b'\x89PNG\r\n\x1a\n' or header[:2] == b'\xff\xd8':
|
||||
media_type = "image"
|
||||
elif header[:4] == b'RIFF' and header[8:12] == b'WAVE':
|
||||
media_type = "audio"
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return {
|
||||
"hash": content_hash,
|
||||
"size_bytes": path.stat().st_size,
|
||||
"media_type": media_type,
|
||||
"media_type": detect_media_type(path),
|
||||
"role": role,
|
||||
"step_name": step_name,
|
||||
"step_name": name,
|
||||
}
|
||||
return None
|
||||
|
||||
@@ -182,50 +358,28 @@ class RunService:
|
||||
inputs = run.get("inputs", [])
|
||||
if isinstance(inputs, dict):
|
||||
inputs = list(inputs.values())
|
||||
for i, content_hash in enumerate(inputs):
|
||||
info = get_artifact_info(content_hash, "input", f"Input {i + 1}")
|
||||
for i, h in enumerate(inputs):
|
||||
info = get_artifact_info(h, "input", f"Input {i + 1}")
|
||||
if info:
|
||||
artifacts.append(info)
|
||||
|
||||
# Add step outputs from step_results
|
||||
step_results = run.get("step_results", {})
|
||||
for step_id, result in step_results.items():
|
||||
cache_id = result.get("cache_id") or result.get("output_cache_id")
|
||||
if cache_id:
|
||||
info = get_artifact_info(cache_id, "step_output", step_id)
|
||||
if info:
|
||||
artifacts.append(info)
|
||||
# Also add any additional outputs
|
||||
for output in result.get("outputs", []):
|
||||
if output and output != cache_id:
|
||||
info = get_artifact_info(output, "step_output", step_id)
|
||||
if info:
|
||||
artifacts.append(info)
|
||||
|
||||
# Add final output
|
||||
# Add output
|
||||
if run.get("output_hash"):
|
||||
output_hash = run["output_hash"]
|
||||
# Avoid duplicates
|
||||
if not any(a["hash"] == output_hash for a in artifacts):
|
||||
info = get_artifact_info(output_hash, "output", "Final Output")
|
||||
if info:
|
||||
artifacts.append(info)
|
||||
info = get_artifact_info(run["output_hash"], "output", "Output")
|
||||
if info:
|
||||
artifacts.append(info)
|
||||
|
||||
return artifacts
|
||||
|
||||
async def get_run_analysis(self, run_id: str) -> List[Dict[str, Any]]:
|
||||
"""Get analysis data for each input in a run."""
|
||||
from pathlib import Path
|
||||
import os
|
||||
|
||||
run = await self.get_run(run_id)
|
||||
if not run:
|
||||
return []
|
||||
|
||||
cache_dir = Path(os.environ.get("CACHE_DIR", "/tmp/artdag-cache"))
|
||||
analysis_dir = cache_dir / "analysis"
|
||||
|
||||
analysis_dir = self.cache_dir / "analysis"
|
||||
results = []
|
||||
|
||||
inputs = run.get("inputs", [])
|
||||
if isinstance(inputs, dict):
|
||||
inputs = list(inputs.values())
|
||||
@@ -247,8 +401,11 @@ class RunService:
|
||||
"has_analysis": analysis_data is not None,
|
||||
"tempo": analysis_data.get("tempo") if analysis_data else None,
|
||||
"beat_times": analysis_data.get("beat_times", []) if analysis_data else [],
|
||||
"energy": analysis_data.get("energy") if analysis_data else None,
|
||||
"raw": analysis_data,
|
||||
})
|
||||
|
||||
return results
|
||||
|
||||
def detect_media_type(self, path: Path) -> str:
    """Classify the file at ``path`` by delegating to the module-level ``detect_media_type``."""
    # Inside a method body the bare name resolves to the module-level
    # function, not to this method, so this is not a recursive call.
    media_type = detect_media_type(path)
    return media_type
|
||||
|
||||
Reference in New Issue
Block a user