"""
Run Service - business logic for run management.
"""

import json
from datetime import datetime, timezone
from typing import Optional, List, Dict, Any


class RunService:
    """
    Service for managing recipe runs.

    Handles run lifecycle, plan loading, and result aggregation.

    Each run is stored as a JSON string under the key
    ``f"{run_prefix}{run_id}"`` in the injected ``redis`` client.  The
    client is called synchronously (its results are never awaited), so it
    must be a blocking client exposing ``get``, ``set``, ``delete`` and
    ``scan``.
    """

    def __init__(self, redis, cache):
        """Store the collaborators.

        Args:
            redis: Key/value client used for all run persistence.
            cache: Kept on the instance; not used by any method in this class.
        """
        self.redis = redis
        self.cache = cache
        self.run_prefix = "run:"

    def _key(self, run_id: str) -> str:
        """Return the storage key for ``run_id``."""
        return f"{self.run_prefix}{run_id}"

    async def get_run(self, run_id: str) -> Optional[Dict[str, Any]]:
        """Get a run by ID.

        Returns:
            The decoded run, or ``None`` when the key is missing or holds
            an empty value.
        """
        data = self.redis.get(self._key(run_id))
        if not data:
            return None
        return json.loads(data)

    def _collect_runs(self, actor_id: str) -> List[Dict[str, Any]]:
        """Scan every ``run_prefix`` key and return the runs owned by ``actor_id``.

        A run matches when its ``actor_id`` or its ``username`` field equals
        ``actor_id``.
        """
        # TODO: Use Redis index for efficient filtering
        owned: List[Dict[str, Any]] = []
        # Redis SCAN may hand back the same key in more than one batch;
        # remember what was already processed so no run is listed twice.
        seen = set()
        pattern = f"{self.run_prefix}*"
        cursor = 0
        while True:
            cursor, keys = self.redis.scan(cursor=cursor, match=pattern, count=100)
            for key in keys:
                if key in seen:
                    continue
                seen.add(key)
                data = self.redis.get(key)
                if not data:
                    continue
                run = json.loads(data)
                # Keys under the prefix that do not hold a JSON object
                # cannot be runs; skip them instead of failing on ``.get``.
                if not isinstance(run, dict):
                    continue
                if run.get("actor_id") == actor_id or run.get("username") == actor_id:
                    owned.append(run)
            # A cursor of 0 signals that the iteration is complete.
            if cursor == 0:
                break
        return owned

    async def list_runs(self, actor_id: str, page: int = 1, limit: int = 20) -> Dict[str, Any]:
        """List runs for a user with pagination.

        Args:
            actor_id: Value compared against each run's ``actor_id`` and
                ``username`` fields.
            page: 1-based page number; values below 1 are treated as 1.
            limit: Page size; negative values are treated as 0.

        Returns:
            ``{"runs": [...], "pagination": {...}}`` where ``runs`` is the
            requested page sorted by ``created_at`` descending and
            ``pagination`` carries ``page``, ``limit``, ``total`` and
            ``has_more``.
        """
        # Negative offsets would silently slice from the end of the list.
        page = max(page, 1)
        limit = max(limit, 0)

        matching = self._collect_runs(actor_id)

        # Sort by created_at descending; a missing or null timestamp sorts
        # as the empty string (i.e. last).
        matching.sort(key=lambda r: r.get("created_at") or "", reverse=True)

        # Paginate
        total = len(matching)
        start = (page - 1) * limit
        end = start + limit

        return {
            "runs": matching[start:end],
            "pagination": {
                "page": page,
                "limit": limit,
                "total": total,
                "has_more": end < total,
            },
        }

    async def create_run(
        self,
        run_id: str,
        recipe_id: str,
        inputs: Dict[str, str],
        actor_id: str,
    ) -> Dict[str, Any]:
        """Create a new run.

        Builds the run record with status ``"pending"`` and a UTC
        ``created_at`` timestamp, stores it, and returns it.  An existing
        record under the same ``run_id`` is overwritten.
        """
        # ``datetime.utcnow()`` is deprecated; take an aware UTC time and
        # drop the tzinfo so the stored ISO string keeps its naive format.
        created_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        run = {
            "run_id": run_id,
            "recipe": f"recipe:{recipe_id}",
            "inputs": inputs,
            "actor_id": actor_id,
            "status": "pending",
            "created_at": created_at,
        }
        self.redis.set(self._key(run_id), json.dumps(run))
        return run

    async def update_run(self, run_id: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Update a run's fields.

        Merges ``updates`` into the stored run and writes it back.

        Returns:
            The updated run, or ``None`` when the run does not exist.
        """
        run = await self.get_run(run_id)
        if not run:
            return None

        run.update(updates)
        self.redis.set(self._key(run_id), json.dumps(run))
        return run

    async def delete_run(self, run_id: str) -> bool:
        """Delete a run.

        Returns:
            ``True`` when the client reports at least one key removed.
        """
        removed = self.redis.delete(self._key(run_id))
        return removed > 0

    async def load_plan(self, run_id: str) -> Optional[Dict[str, Any]]:
        """Load execution plan for a run.

        Reads ``<settings.plan_cache_dir>/<run_id>.json``.

        Returns:
            The parsed JSON content, or ``None`` when the file does not
            exist.
        """
        # Imported here as in the original; presumably to avoid an import
        # cycle with the config module - confirm before hoisting.
        from ..config import settings

        plan_path = settings.plan_cache_dir / f"{run_id}.json"
        # Open directly rather than checking ``exists()`` first, so a file
        # removed between the check and the read cannot raise.
        try:
            with open(plan_path, encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return None