From 854396680fdf2953fea14867332d2de724f55286 Mon Sep 17 00:00:00 2001 From: giles Date: Sun, 11 Jan 2026 14:05:31 +0000 Subject: [PATCH] Refactor storage: remove Redis duplication, use proper data tiers - Recipes: Now content-addressed only (cache + IPFS), removed Redis storage - Runs: Completed runs stored in PostgreSQL, Redis only for task_id mapping - Add list_runs_by_actor() to database.py for paginated run queries - Add list_by_type() to cache_manager for filtering by node_type - Fix upload endpoint to return size and filename fields - Fix recipe run endpoint with proper DAG input binding - Fix get_run_service() dependency to pass database module Storage architecture: - Redis: Ephemeral only (sessions, task mappings with TTL) - PostgreSQL: Permanent records (completed runs, metadata) - Cache: Content-addressed files (recipes, media, outputs) Co-Authored-By: Claude Opus 4.5 --- app/dependencies.py | 3 +- app/routers/cache.py | 7 +- app/routers/recipes.py | 64 +++-- app/services/cache_service.py | 511 ++++++++++++++++++++++++++++----- app/services/recipe_service.py | 155 +++++++--- app/services/run_service.py | 445 ++++++++++++++++++---------- cache_manager.py | 16 ++ database.py | 28 ++ 8 files changed, 965 insertions(+), 264 deletions(-) diff --git a/app/dependencies.py b/app/dependencies.py index 7a685f0..c433848 100644 --- a/app/dependencies.py +++ b/app/dependencies.py @@ -112,6 +112,7 @@ def get_run_service(): """Get the run service.""" from .services.run_service import RunService return RunService( + database=get_database(), redis=get_redis_client(), cache=get_cache_manager(), ) @@ -121,7 +122,7 @@ def get_recipe_service(): """Get the recipe service.""" from .services.recipe_service import RecipeService return RecipeService( - redis=get_redis_client(), + redis=get_redis_client(), # Kept for API compatibility, not used cache=get_cache_manager(), ) diff --git a/app/routers/cache.py b/app/routers/cache.py index d2aed9f..ec0f1b0 100644 --- a/app/routers/cache.py +++ b/app/routers/cache.py @@ -219,7 +219,12 @@ async def upload_content( if error: raise HTTPException(400, error) - return {"content_hash": content_hash, "uploaded": True} + return { + "content_hash": content_hash, + "filename": file.filename, + "size": len(content), + "uploaded": True, + } # Media listing endpoint diff --git a/app/routers/recipes.py b/app/routers/recipes.py index 2ac45c6..517de4b 100644 --- a/app/routers/recipes.py +++ b/app/routers/recipes.py @@ -172,24 +172,56 @@ async def run_recipe( if not recipe: raise HTTPException(404, "Recipe not found") - # Create run using run service - run_service = RunService(database, get_redis_client(), get_cache_manager()) - run, error = await run_service.create_run( - recipe=recipe.get("name", recipe_id), - inputs=req.inputs, - use_dag=True, - actor_id=ctx.actor_id, - l2_server=ctx.l2_server, - ) + try: + import json - if error: - raise HTTPException(400, error) + # Create run using run service + run_service = RunService(database, get_redis_client(), get_cache_manager()) - return { - "run_id": run.run_id, - "status": run.status, - "message": "Recipe execution started", - } + # If recipe has a DAG definition, bind inputs and convert to JSON + recipe_dag = recipe.get("dag") + dag_json = None + if recipe_dag and isinstance(recipe_dag, dict): + # Bind inputs to the DAG's source nodes + dag_copy = json.loads(json.dumps(recipe_dag)) # Deep copy + nodes = dag_copy.get("nodes", {}) + + # Map input names to content hashes + for input_name, content_hash in req.inputs.items(): + if input_name in nodes: + node = nodes[input_name] + if node.get("type") == "SOURCE": + if "config" not in node: + node["config"] = {} + node["config"]["content_hash"] = content_hash + + dag_json = json.dumps(dag_copy) + + run, error = await run_service.create_run( + recipe=recipe.get("name", recipe_id), + inputs=req.inputs, + use_dag=True, + dag_json=dag_json, + actor_id=ctx.actor_id, + l2_server=ctx.l2_server, + ) + + if error: + raise HTTPException(400, error) + + if not run: + raise HTTPException(500, "Run creation returned no result") + + return { + "run_id": run.run_id, + "status": run.status, + "message": "Recipe execution started", + } + except HTTPException: + raise + except Exception as e: + logger.exception(f"Error running recipe {recipe_id}") + raise HTTPException(500, f"Run failed: {e}") @router.get("/{recipe_id}/dag") diff --git a/app/services/cache_service.py b/app/services/cache_service.py index 7c31ff2..bc309dc 100644 --- a/app/services/cache_service.py +++ b/app/services/cache_service.py @@ -2,10 +2,81 @@ Cache Service - business logic for cache and media management. """ +import asyncio +import json +import os +import subprocess from pathlib import Path -from typing import Optional, List, Dict, Any +from typing import Optional, List, Dict, Any, Tuple -from artdag_common.utils.media import detect_media_type, get_mime_type +import httpx + + +def detect_media_type(cache_path: Path) -> str: + """Detect if file is image, video, or audio based on magic bytes.""" + try: + with open(cache_path, "rb") as f: + header = f.read(32) + except Exception: + return "unknown" + + # Video signatures + if header[:4] == b'\x1a\x45\xdf\xa3': # WebM/MKV + return "video" + if len(header) > 8 and header[4:8] == b'ftyp': # MP4/MOV + return "video" + if header[:4] == b'RIFF' and len(header) > 12 and header[8:12] == b'AVI ': # AVI + return "video" + + # Image signatures + if header[:8] == b'\x89PNG\r\n\x1a\n': # PNG + return "image" + if header[:2] == b'\xff\xd8': # JPEG + return "image" + if header[:6] in (b'GIF87a', b'GIF89a'): # GIF + return "image" + if header[:4] == b'RIFF' and len(header) > 12 and header[8:12] == b'WEBP': # WebP + return "image" + + # Audio signatures + if header[:4] == b'RIFF' and len(header) > 12 and header[8:12] == b'WAVE': # WAV + return "audio" + if header[:3] == b'ID3' or header[:2] == b'\xff\xfb': # MP3 + return "audio" + if header[:4] == b'fLaC': # FLAC + return "audio" + + return "unknown" + + +def get_mime_type(path: Path) -> str: + """Get MIME type based on file magic bytes.""" + media_type = detect_media_type(path) + if media_type == "video": + try: + with open(path, "rb") as f: + header = f.read(12) + if header[:4] == b'\x1a\x45\xdf\xa3': + return "video/x-matroska" + return "video/mp4" + except Exception: + return "video/mp4" + elif media_type == "image": + try: + with open(path, "rb") as f: + header = f.read(8) + if header[:8] == b'\x89PNG\r\n\x1a\n': + return "image/png" + if header[:2] == b'\xff\xd8': + return "image/jpeg" + if header[:6] in (b'GIF87a', b'GIF89a'): + return "image/gif" + return "image/jpeg" + except Exception: + return "image/jpeg" + elif media_type == "audio": + return "audio/mpeg" + return "application/octet-stream" class CacheService: @@ -15,18 +86,24 @@ class CacheService: Handles content retrieval, metadata, and media type detection. """ - def __init__(self, cache_manager, database): - self.cache = cache_manager + def __init__(self, database, cache_manager): self.db = database + self.cache = cache_manager + self.cache_dir = Path(os.environ.get("CACHE_DIR", "/tmp/artdag-cache")) - async def get_item(self, content_hash: str) -> Optional[Dict[str, Any]]: - """Get cached item by content hash.""" - path = self.cache.get_by_content_hash(content_hash) + async def get_cache_item(self, content_hash: str) -> Optional[Dict[str, Any]]: + """Get cached item with full metadata for display.""" + # Check if content exists + if not self.cache.has_content(content_hash): + return None + + path = self.cache.get_content_path(content_hash) if not path or not path.exists(): return None # Get metadata from database - meta = await self.db.get_cache_item(content_hash) + meta = await self.db.load_item_metadata(content_hash, None) + cache_item = await self.db.get_cache_item(content_hash) media_type = detect_media_type(path) mime_type = get_mime_type(path) @@ -38,76 +115,370 @@ class CacheService: "media_type": media_type, "mime_type": mime_type, "size": size, - "name": meta.get("name") if meta else None, - "description": meta.get("description") if meta else None, - "tags": meta.get("tags", []) if meta else [], - "ipfs_cid": meta.get("ipfs_cid") if meta else None, + "ipfs_cid": cache_item.get("ipfs_cid") if cache_item else None, + "meta": meta, } - async def get_path(self, content_hash: str) -> Optional[Path]: - """Get the file path for cached content.""" - return self.cache.get_by_content_hash(content_hash) + async def check_access(self, content_hash: str, actor_id: str, username: str) -> bool: + """Check if user has access to content.""" + user_hashes = await self._get_user_cache_hashes(username, actor_id) + return content_hash in user_hashes - async def list_items( - self, - actor_id: str = None, - media_type: str = None, - page: int = 1, - limit: int = 20, - ) -> Dict[str, Any]: - """List cached items with filters and pagination.""" - # Get items from database - items = await self.db.list_cache_items( - actor_id=actor_id, - media_type=media_type, - offset=(page - 1) * limit, - limit=limit, - ) + async def _get_user_cache_hashes(self, username: str, actor_id: Optional[str] = None) -> set: + """Get all cache hashes owned by or associated with a user.""" + match_values = [username] + if actor_id: + match_values.append(actor_id) - total = await self.db.count_cache_items(actor_id=actor_id, media_type=media_type) + hashes = set() - return { - "items": items, - "pagination": { - "page": page, - "limit": limit, - "total": total, - "has_more": page * limit < total, - } - } + # Query database for items owned by user + if actor_id: + try: + db_items = await self.db.get_user_items(actor_id) + for item in db_items: + hashes.add(item["content_hash"]) + except Exception: + pass + + # Legacy: Files uploaded by user (JSON metadata) + if self.cache_dir.exists(): + for f in self.cache_dir.iterdir(): + if f.name.endswith('.meta.json'): + try: + with open(f, 'r') as mf: + meta = json.load(mf) + if meta.get("uploader") in match_values: + hashes.add(f.name.replace('.meta.json', '')) + except Exception: + pass + + # Files from user's runs (inputs and outputs) + runs = await self._list_user_runs(username, actor_id) + for run in runs: + inputs = run.get("inputs", []) + if isinstance(inputs, dict): + inputs = list(inputs.values()) + hashes.update(inputs) + if run.get("output_hash"): + hashes.add(run["output_hash"]) + + return hashes + + async def _list_user_runs(self, username: str, actor_id: Optional[str]) -> List[Dict]: + """List runs for a user (helper for access check).""" + from ..dependencies import get_redis_client + import json + + redis = get_redis_client() + runs = [] + cursor = 0 + prefix = "artdag:run:" + + while True: + cursor, keys = redis.scan(cursor=cursor, match=f"{prefix}*", count=100) + for key in keys: + data = redis.get(key) + if data: + run = json.loads(data) + if run.get("actor_id") in (username, actor_id) or run.get("username") in (username, actor_id): + runs.append(run) + if cursor == 0: + break + + return runs + + async def get_raw_file(self, content_hash: str) -> Tuple[Optional[Path], Optional[str], Optional[str]]: + """Get raw file path, media type, and filename for download.""" + if not self.cache.has_content(content_hash): + return None, None, None + + path = self.cache.get_content_path(content_hash) + if not path or not path.exists(): + return None, None, None + + media_type = detect_media_type(path) + mime = get_mime_type(path) + + # Determine extension + ext = "bin" + if media_type == "video": + try: + with open(path, "rb") as f: + header = f.read(12) + if header[:4] == b'\x1a\x45\xdf\xa3': + ext = "mkv" + else: + ext = "mp4" + except Exception: + ext = "mp4" + elif media_type == "image": + try: + with open(path, "rb") as f: + header = f.read(8) + if header[:8] == b'\x89PNG\r\n\x1a\n': + ext = "png" + else: + ext = "jpg" + except Exception: + ext = "jpg" + + filename = f"{content_hash}.{ext}" + return path, mime, filename + + async def get_as_mp4(self, content_hash: str) -> Tuple[Optional[Path], Optional[str]]: + """Get content as MP4, transcoding if necessary. Returns (path, error).""" + if not self.cache.has_content(content_hash): + return None, f"Content {content_hash} not in cache" + + path = self.cache.get_content_path(content_hash) + if not path or not path.exists(): + return None, f"Content {content_hash} not in cache" + + # Check if video + media_type = detect_media_type(path) + if media_type != "video": + return None, "Content is not a video" + + # Check for cached MP4 + mp4_path = self.cache_dir / f"{content_hash}.mp4" + if mp4_path.exists(): + return mp4_path, None + + # Check if already MP4 format + try: + result = subprocess.run( + ["ffprobe", "-v", "error", "-select_streams", "v:0", + "-show_entries", "format=format_name", "-of", "csv=p=0", str(path)], + capture_output=True, text=True, timeout=10 + ) + if "mp4" in result.stdout.lower() or "mov" in result.stdout.lower(): + return path, None + except Exception: + pass + + # Transcode to MP4 + transcode_path = self.cache_dir / f"{content_hash}.transcoding.mp4" + try: + result = subprocess.run( + ["ffmpeg", "-y", "-i", str(path), + "-c:v", "libx264", "-preset", "fast", "-crf", "23", + "-c:a", "aac", "-b:a", "128k", + "-movflags", "+faststart", + str(transcode_path)], + capture_output=True, text=True, timeout=600 + ) + if result.returncode != 0: + return None, f"Transcoding failed: {result.stderr[:200]}" + + transcode_path.rename(mp4_path) + return mp4_path, None + + except subprocess.TimeoutExpired: + if transcode_path.exists(): + transcode_path.unlink() + return None, "Transcoding timed out" + except Exception as e: + if transcode_path.exists(): + transcode_path.unlink() + return None, f"Transcoding failed: {e}" + + async def get_metadata(self, content_hash: str, actor_id: str) -> Optional[Dict[str, Any]]: + """Get content metadata.""" + if not self.cache.has_content(content_hash): + return None + return await self.db.load_item_metadata(content_hash, actor_id) async def update_metadata( self, content_hash: str, - name: str = None, + actor_id: str, + title: str = None, description: str = None, tags: List[str] = None, - ) -> bool: - """Update item metadata.""" - return await self.db.update_cache_metadata( + custom: Dict[str, Any] = None, + ) -> Tuple[bool, Optional[str]]: + """Update content metadata. Returns (success, error).""" + if not self.cache.has_content(content_hash): + return False, "Content not found" + + # Build update dict + updates = {} + if title is not None: + updates["title"] = title + if description is not None: + updates["description"] = description + if tags is not None: + updates["tags"] = tags + if custom is not None: + updates["custom"] = custom + + try: + await self.db.update_item_metadata(content_hash, actor_id, **updates) + return True, None + except Exception as e: + return False, str(e) + + async def publish_to_l2( + self, + content_hash: str, + actor_id: str, + l2_server: str, + auth_token: str, + ) -> Tuple[Optional[str], Optional[str]]: + """Publish content to L2 and IPFS. Returns (ipfs_cid, error).""" + if not self.cache.has_content(content_hash): + return None, "Content not found" + + # Get IPFS CID + cache_item = await self.db.get_cache_item(content_hash) + ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None + + # Get metadata for origin info + meta = await self.db.load_item_metadata(content_hash, actor_id) + origin = meta.get("origin") if meta else None + + if not origin or "type" not in origin: + return None, "Origin must be set before publishing" + + if not auth_token: + return None, "Authentication token required" + + # Call L2 publish-cache endpoint + try: + async with httpx.AsyncClient(timeout=30) as client: + resp = await client.post( + f"{l2_server}/assets/publish-cache", + headers={"Authorization": f"Bearer {auth_token}"}, + json={ + "content_hash": content_hash, + "ipfs_cid": ipfs_cid, + "asset_name": meta.get("title") or content_hash[:16], + "asset_type": detect_media_type(self.cache.get_content_path(content_hash)), + "origin": origin, + "description": meta.get("description"), + "tags": meta.get("tags", []), + } + ) + resp.raise_for_status() + l2_result = resp.json() + except httpx.HTTPStatusError as e: + error_detail = str(e) + try: + error_detail = e.response.json().get("detail", str(e)) + except Exception: + pass + return None, f"L2 publish failed: {error_detail}" + except Exception as e: + return None, f"L2 publish failed: {e}" + + # Update local metadata with publish status + await self.db.save_l2_share( content_hash=content_hash, - name=name, - description=description, - tags=tags, + actor_id=actor_id, + l2_server=l2_server, + asset_name=meta.get("title") or content_hash[:16], + content_type=detect_media_type(self.cache.get_content_path(content_hash)) + ) + await self.db.update_item_metadata( + content_hash=content_hash, + actor_id=actor_id, + pinned=True, + pin_reason="published" ) - async def delete_item(self, content_hash: str) -> bool: - """Delete a cached item.""" - path = self.cache.get_by_content_hash(content_hash) - if path and path.exists(): - path.unlink() + return l2_result.get("ipfs_cid") or ipfs_cid, None - # Remove from database - await self.db.delete_cache_item(content_hash) - return True + async def delete_content(self, content_hash: str, actor_id: str) -> Tuple[bool, Optional[str]]: + """Delete content from cache. Returns (success, error).""" + if not self.cache.has_content(content_hash): + return False, "Content not found" - def has_content(self, content_hash: str) -> bool: - """Check if content exists in cache.""" - return self.cache.has_content(content_hash) + # Check if pinned + meta = await self.db.load_item_metadata(content_hash, actor_id) + if meta and meta.get("pinned"): + pin_reason = meta.get("pin_reason", "unknown") + return False, f"Cannot discard pinned item (reason: {pin_reason})" - def get_ipfs_cid(self, content_hash: str) -> Optional[str]: - """Get IPFS CID for cached content.""" - return self.cache.get_ipfs_cid(content_hash) + # Check deletion rules via cache_manager + can_delete, reason = self.cache.can_delete(content_hash) + if not can_delete: + return False, f"Cannot discard: {reason}" + + # Delete via cache_manager + success, msg = self.cache.delete_by_content_hash(content_hash) + + # Clean up legacy metadata files + meta_path = self.cache_dir / f"{content_hash}.meta.json" + if meta_path.exists(): + meta_path.unlink() + mp4_path = self.cache_dir / f"{content_hash}.mp4" + if mp4_path.exists(): + mp4_path.unlink() + + return True, None + + async def import_from_ipfs(self, ipfs_cid: str, actor_id: str) -> Tuple[Optional[str], Optional[str]]: + """Import content from IPFS. Returns (content_hash, error).""" + try: + import ipfs_client + + # Download from IPFS + legacy_dir = self.cache_dir / "legacy" + legacy_dir.mkdir(parents=True, exist_ok=True) + tmp_path = legacy_dir / f"import-{ipfs_cid[:16]}" + + if not ipfs_client.get_file(ipfs_cid, str(tmp_path)): + return None, f"Could not fetch CID {ipfs_cid} from IPFS" + + # Store in cache + cached, _ = self.cache.put(tmp_path, node_type="import", move=True) + content_hash = cached.content_hash + + # Save to database + await self.db.create_cache_item(content_hash, ipfs_cid) + await self.db.save_item_metadata( + content_hash=content_hash, + actor_id=actor_id, + item_type="media", + filename=f"ipfs-{ipfs_cid[:16]}" + ) + + return content_hash, None + except Exception as e: + return None, f"Import failed: {e}" + + async def upload_content( + self, + content: bytes, + filename: str, + actor_id: str, + ) -> Tuple[Optional[str], Optional[str]]: + """Upload content to cache. Returns (content_hash, error).""" + import tempfile + + try: + # Write to temp file + with tempfile.NamedTemporaryFile(delete=False) as tmp: + tmp.write(content) + tmp_path = Path(tmp.name) + + # Store in cache + cached, ipfs_cid = self.cache.put(tmp_path, node_type="upload", move=True) + content_hash = cached.content_hash + + # Save to database + await self.db.create_cache_item(content_hash, ipfs_cid) + await self.db.save_item_metadata( + content_hash=content_hash, + actor_id=actor_id, + item_type="media", + filename=filename + ) + + return content_hash, None + except Exception as e: + return None, f"Upload failed: {e}" async def list_media( self, @@ -118,12 +489,20 @@ class CacheService: media_type: str = None, ) -> List[Dict[str, Any]]: """List media items in cache.""" - # Use list_items internally, converting offset to page - page = (offset // limit) + 1 if limit > 0 else 1 - result = await self.list_items( + # Get items from database + items = await self.db.list_cache_items( actor_id=actor_id or username, media_type=media_type, - page=page, + offset=offset, limit=limit, ) - return result.get("items", []) + return items + + # Legacy compatibility methods + def has_content(self, content_hash: str) -> bool: + """Check if content exists in cache.""" + return self.cache.has_content(content_hash) + + def get_ipfs_cid(self, content_hash: str) -> Optional[str]: + """Get IPFS CID for cached content.""" + return self.cache.get_ipfs_cid(content_hash) diff --git a/app/services/recipe_service.py b/app/services/recipe_service.py index 321792b..a61c39c 100644 --- a/app/services/recipe_service.py +++ b/app/services/recipe_service.py @@ -1,9 +1,14 @@ """ Recipe Service - business logic for recipe management. + +Recipes are content-addressed YAML files stored in the cache (and IPFS). +The recipe ID is the content hash of the YAML file. """ -from typing import Optional, List, Dict, Any -import json +import tempfile +from pathlib import Path +from typing import Optional, List, Dict, Any, Tuple + import yaml @@ -11,49 +16,54 @@ class RecipeService: """ Service for managing recipes. - Handles recipe parsing, validation, and DAG building. + Recipes are stored in the content-addressed cache, not Redis. """ def __init__(self, redis, cache): + # Redis kept for compatibility but not used for recipe storage self.redis = redis self.cache = cache - self.recipe_prefix = "recipe:" async def get_recipe(self, recipe_id: str) -> Optional[Dict[str, Any]]: """Get a recipe by ID (content hash).""" - # First check Redis - data = self.redis.get(f"{self.recipe_prefix}{recipe_id}") - if data: - return json.loads(data) - - # Fall back to cache + # Get from cache (content-addressed storage) path = self.cache.get_by_content_hash(recipe_id) - if path and path.exists(): - with open(path) as f: - return yaml.safe_load(f) + if not path or not path.exists(): + return None - return None + with open(path) as f: + recipe_data = yaml.safe_load(f) + + # Add the recipe_id to the data for convenience + if isinstance(recipe_data, dict): + recipe_data["recipe_id"] = recipe_id + # Get IPFS CID if available + ipfs_cid = self.cache.get_ipfs_cid(recipe_id) + if ipfs_cid: + recipe_data["ipfs_cid"] = ipfs_cid + + return recipe_data async def list_recipes(self, actor_id: str = None, offset: int = 0, limit: int = 20) -> list: - """List available recipes with pagination.""" - recipes = [] - cursor = 0 + """ + List available recipes. - while True: - cursor, keys = self.redis.scan( - cursor=cursor, - match=f"{self.recipe_prefix}*", - count=100 - ) - for key in keys: - data = self.redis.get(key) - if data: - recipe = json.loads(data) + Note: This scans the cache for recipe files. For production, + you might want a database index of recipes by owner. + """ + # Get all cached items and filter for recipes + # This is a simplified implementation - production would use a proper index + recipes = [] + + # Check if cache has a list method for recipes + if hasattr(self.cache, 'list_by_type'): + items = self.cache.list_by_type('recipe') + for content_hash in items: + recipe = await self.get_recipe(content_hash) + if recipe: # Filter by actor if specified - if actor_id is None or recipe.get("actor_id") == actor_id: + if actor_id is None or recipe.get("uploader") == actor_id: recipes.append(recipe) - if cursor == 0: - break # Sort by name recipes.sort(key=lambda r: r.get("name", "")) @@ -61,13 +71,86 @@ class RecipeService: # Paginate return recipes[offset:offset + limit] - async def save_recipe(self, recipe_id: str, recipe_data: Dict[str, Any]) -> None: - """Save a recipe to Redis.""" - self.redis.set(f"{self.recipe_prefix}{recipe_id}", json.dumps(recipe_data)) + async def upload_recipe( + self, + yaml_content: str, + uploader: str, + name: str = None, + description: str = None, + ) -> Tuple[Optional[str], Optional[str]]: + """ + Upload a recipe from YAML content. - async def delete_recipe(self, recipe_id: str) -> bool: - """Delete a recipe.""" - return self.redis.delete(f"{self.recipe_prefix}{recipe_id}") > 0 + The recipe is stored in the cache and optionally pinned to IPFS. + Returns (recipe_id, error_message). + """ + # Validate YAML + try: + recipe_data = yaml.safe_load(yaml_content) + except yaml.YAMLError as e: + return None, f"Invalid YAML: {e}" + + if not isinstance(recipe_data, dict): + return None, "Recipe must be a YAML dictionary" + + # Add uploader info to the YAML before storing + recipe_data["uploader"] = uploader + if name: + recipe_data["name"] = name + if description: + recipe_data["description"] = description + + # Serialize back to YAML (with added metadata) + final_yaml = yaml.dump(recipe_data, default_flow_style=False) + + # Write to temp file for caching + try: + with tempfile.NamedTemporaryFile(delete=False, suffix=".yaml", mode="w") as tmp: + tmp.write(final_yaml) + tmp_path = Path(tmp.name) + + # Store in cache (content-addressed, auto-pins to IPFS) + cached, ipfs_cid = self.cache.put(tmp_path, node_type="recipe", move=True) + recipe_id = cached.content_hash + + return recipe_id, None + + except Exception as e: + return None, f"Failed to cache recipe: {e}" + + async def delete_recipe(self, recipe_id: str, actor_id: str = None) -> Tuple[bool, Optional[str]]: + """ + Delete a recipe. + + Note: This only removes from local cache. IPFS copies persist. + Returns (success, error_message). + """ + # Get recipe to check ownership + recipe = await self.get_recipe(recipe_id) + if not recipe: + return False, "Recipe not found" + + # Check ownership if actor_id provided + if actor_id: + recipe_owner = recipe.get("uploader") + if recipe_owner and recipe_owner != actor_id: + return False, "Cannot delete: you don't own this recipe" + + # Delete from cache + try: + if hasattr(self.cache, 'delete_by_content_hash'): + success, msg = self.cache.delete_by_content_hash(recipe_id) + if not success: + return False, msg + else: + # Fallback: get path and delete directly + path = self.cache.get_by_content_hash(recipe_id) + if path and path.exists(): + path.unlink() + + return True, None + except Exception as e: + return False, f"Failed to delete: {e}" def parse_yaml(self, yaml_content: str) -> Dict[str, Any]: """Parse recipe YAML content.""" diff --git a/app/services/run_service.py b/app/services/run_service.py index 0b25fb7..7a34635 100644 --- a/app/services/run_service.py +++ b/app/services/run_service.py @@ -1,147 +1,338 @@ """ Run Service - business logic for run management. + +Runs are content-addressed (run_id computed from inputs + recipe). +Completed runs are stored in PostgreSQL, not Redis. +In-progress runs are tracked via Celery task state. """ -from typing import Optional, List, Dict, Any +import hashlib import json +import os +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional, List, Dict, Any, Tuple + + +def compute_run_id(input_hashes: list, recipe: str, recipe_hash: str = None) -> str: + """ + Compute a deterministic run_id from inputs and recipe. + + The run_id is a SHA3-256 hash of: + - Sorted input content hashes + - Recipe identifier (recipe_hash if provided, else "effect:{recipe}") + + This makes runs content-addressable: same inputs + recipe = same run_id. + """ + # Handle both list and dict inputs + if isinstance(input_hashes, dict): + sorted_inputs = sorted(input_hashes.values()) + else: + sorted_inputs = sorted(input_hashes) + + data = { + "inputs": sorted_inputs, + "recipe": recipe_hash or f"effect:{recipe}", + "version": "1", + } + json_str = json.dumps(data, sort_keys=True, separators=(",", ":")) + return hashlib.sha3_256(json_str.encode()).hexdigest() + + +def detect_media_type(cache_path: Path) -> str: + """Detect if file is image, video, or audio based on magic bytes.""" + try: + with open(cache_path, "rb") as f: + header = f.read(32) + except Exception: + return "unknown" + + # Video signatures + if header[:4] == b'\x1a\x45\xdf\xa3': # WebM/MKV + return "video" + if len(header) > 8 and header[4:8] == b'ftyp': # MP4/MOV + return "video" + if header[:4] == b'RIFF' and len(header) > 12 and header[8:12] == b'AVI ': # AVI + return "video" + + # Image signatures + if header[:8] == b'\x89PNG\r\n\x1a\n': # PNG + return "image" + if header[:2] == b'\xff\xd8': # JPEG + return "image" + if header[:6] in (b'GIF87a', b'GIF89a'): # GIF + return "image" + if header[:4] == b'RIFF' and len(header) > 12 and header[8:12] == b'WEBP': # WebP + return "image" + + return "unknown" class RunService: """ Service for managing recipe runs. - Handles run lifecycle, plan loading, and result aggregation. + Uses PostgreSQL for completed runs, Celery for task state. + Redis is only used for task_id mapping (ephemeral). """ def __init__(self, database, redis, cache): self.db = database - self.redis = redis + self.redis = redis # Only for task_id mapping self.cache = cache - self.run_prefix = "artdag:run:" + self.task_key_prefix = "artdag:task:" # run_id -> task_id mapping only + self.cache_dir = Path(os.environ.get("CACHE_DIR", "/tmp/artdag-cache")) async def get_run(self, run_id: str) -> Optional[Dict[str, Any]]: - """Get a run by ID.""" - data = self.redis.get(f"{self.run_prefix}{run_id}") - if not data: - return None - return json.loads(data) + """Get a run by ID. Checks database first, then Celery task state.""" + # Check database for completed run + cached = await self.db.get_run_cache(run_id) + if cached: + return { + "run_id": run_id, + "status": "completed", + "recipe": cached.get("recipe"), + "inputs": cached.get("inputs", []), + "output_hash": cached.get("output_hash"), + "ipfs_cid": cached.get("ipfs_cid"), + "provenance_cid": cached.get("provenance_cid"), + "actor_id": cached.get("actor_id"), + "created_at": cached.get("created_at"), + "completed_at": cached.get("created_at"), + } + + # Check if there's a running task + task_id = self.redis.get(f"{self.task_key_prefix}{run_id}") + if task_id: + if isinstance(task_id, bytes): + task_id = task_id.decode() + + # Get task state from Celery + from celery.result import AsyncResult + from celery_app import app as celery_app + + result = AsyncResult(task_id, app=celery_app) + status = result.status.lower() + + run_data = { + "run_id": run_id, + "status": status if status != "pending" else "pending", + "celery_task_id": task_id, + } + + # If task completed, get result + if result.ready(): + if result.successful(): + run_data["status"] = "completed" + task_result = result.result + if isinstance(task_result, dict): + run_data["output_hash"] = task_result.get("output_hash") + else: + run_data["status"] = "failed" + run_data["error"] = str(result.result) + + return run_data + + return None async def list_runs(self, actor_id: str, offset: int = 0, limit: int = 20) -> list: - """List runs for a user with pagination.""" - # Get all runs and filter by actor - # TODO: Use Redis index for efficient filtering - all_runs = [] - cursor = 0 + """List runs for a user. Returns completed runs from database.""" + # Get completed runs from database + runs = await self.db.list_runs_by_actor(actor_id, offset=offset, limit=limit) + # Also check for any pending tasks in Redis + pending = [] + cursor = 0 while True: cursor, keys = self.redis.scan( cursor=cursor, - match=f"{self.run_prefix}*", + match=f"{self.task_key_prefix}*", count=100 ) for key in keys: - data = self.redis.get(key) - if data: - run = json.loads(data) - if run.get("actor_id") == actor_id or run.get("username") == actor_id: - all_runs.append(run) + run_id = key.decode().replace(self.task_key_prefix, "") if isinstance(key, bytes) else key.replace(self.task_key_prefix, "") + # Check if this run belongs to the user and isn't already in results + if not any(r.get("run_id") == run_id for r in runs): + run = await self.get_run(run_id) + if run and run.get("status") in ("pending", "running"): + pending.append(run) if cursor == 0: break - # Sort by created_at descending + # Combine and sort + all_runs = pending + runs all_runs.sort(key=lambda r: r.get("created_at", ""), reverse=True) - # Paginate return all_runs[offset:offset + limit] async def create_run( + self, + recipe: str, + inputs: list, + output_name: str = None, + use_dag: bool = True, + dag_json: str = None, + actor_id: str = None, + l2_server: str = None, + ) -> Tuple[Optional[Dict[str, Any]], Optional[str]]: + """ + Create a new rendering run. Checks cache before executing. + + Returns (run_dict, error_message). + """ + import httpx + try: + from legacy_tasks import render_effect, execute_dag, build_effect_dag + except ImportError as e: + return None, f"Celery tasks not available: {e}" + + # Handle both list and dict inputs + if isinstance(inputs, dict): + input_list = list(inputs.values()) + else: + input_list = inputs + + # Compute content-addressable run_id + run_id = compute_run_id(input_list, recipe) + + # Generate output name if not provided + if not output_name: + output_name = f"{recipe}-{run_id[:8]}" + + # Check database cache first (completed runs) + cached_run = await self.db.get_run_cache(run_id) + if cached_run: + output_hash = cached_run.get("output_hash") + if output_hash and self.cache.has_content(output_hash): + return { + "run_id": run_id, + "status": "completed", + "recipe": recipe, + "inputs": input_list, + "output_name": output_name, + "output_hash": output_hash, + "ipfs_cid": cached_run.get("ipfs_cid"), + "provenance_cid": cached_run.get("provenance_cid"), + "created_at": cached_run.get("created_at"), + "completed_at": cached_run.get("created_at"), + "actor_id": actor_id, + }, None + + # Check L2 if not in local cache + if l2_server: + try: + async with httpx.AsyncClient(timeout=10) as client: + l2_resp = await client.get(f"{l2_server}/assets/by-run-id/{run_id}") + if l2_resp.status_code == 200: + l2_data = l2_resp.json() + output_hash = l2_data.get("output_hash") + ipfs_cid = l2_data.get("ipfs_cid") + if output_hash and ipfs_cid: + # Pull from IPFS to local cache + try: + import ipfs_client + legacy_dir = self.cache_dir / "legacy" + legacy_dir.mkdir(parents=True, exist_ok=True) + recovery_path = legacy_dir / output_hash + if ipfs_client.get_file(ipfs_cid, str(recovery_path)): + # Save to database cache + await self.db.save_run_cache( + run_id=run_id, + output_hash=output_hash, + recipe=recipe, + inputs=input_list, + ipfs_cid=ipfs_cid, + provenance_cid=l2_data.get("provenance_cid"), + actor_id=actor_id, + ) + return { + "run_id": run_id, + "status": "completed", + "recipe": recipe, + "inputs": input_list, + "output_hash": output_hash, + "ipfs_cid": ipfs_cid, + "provenance_cid": l2_data.get("provenance_cid"), + "created_at": datetime.now(timezone.utc).isoformat(), + "actor_id": actor_id, + }, None + except Exception: + pass # IPFS recovery failed, continue to run + except Exception: + pass # L2 lookup failed, continue to run + + # Not cached - submit to Celery + try: + if use_dag or recipe == "dag": + if dag_json: + dag_data = dag_json + else: + dag = build_effect_dag(input_list, recipe) + dag_data = dag.to_json() + + task = execute_dag.delay(dag_data, run_id) + else: + if len(input_list) != 1: + return None, "Legacy mode only supports single-input recipes. Use use_dag=true for multi-input." + task = render_effect.delay(input_list[0], recipe, output_name) + + # Store task_id mapping in Redis (ephemeral) + self.redis.setex( + f"{self.task_key_prefix}{run_id}", + 3600 * 24, # 24 hour TTL + task.id + ) + + return { + "run_id": run_id, + "status": "running", + "recipe": recipe, + "inputs": input_list, + "output_name": output_name, + "celery_task_id": task.id, + "created_at": datetime.now(timezone.utc).isoformat(), + "actor_id": actor_id, + }, None + + except Exception as e: + return None, f"Failed to submit task: {e}" + + async def discard_run( self, run_id: str, - recipe_id: str, - inputs: Dict[str, str], actor_id: str, - ) -> Dict[str, Any]: - """Create a new run.""" - from datetime import datetime + username: str, + ) -> Tuple[bool, Optional[str]]: + """ + Discard (delete) a run record. - run = { - "run_id": run_id, - "recipe": f"recipe:{recipe_id}", - "inputs": inputs, - "actor_id": actor_id, - "status": "pending", - "created_at": datetime.utcnow().isoformat(), - } - - self.redis.set(f"{self.run_prefix}{run_id}", json.dumps(run)) - return run - - async def update_run(self, run_id: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]: - """Update a run's fields.""" + Note: This removes the run record but not the output content. + """ run = await self.get_run(run_id) if not run: - return None + return False, f"Run {run_id} not found" - run.update(updates) - self.redis.set(f"{self.run_prefix}{run_id}", json.dumps(run)) - return run + # Check ownership + run_owner = run.get("actor_id") + if run_owner and run_owner not in (username, actor_id): + return False, "Access denied" - async def delete_run(self, run_id: str) -> bool: - """Delete a run.""" - return self.redis.delete(f"{self.run_prefix}{run_id}") > 0 + # Remove task_id mapping from Redis + self.redis.delete(f"{self.task_key_prefix}{run_id}") - async def load_plan(self, run_id: str) -> Optional[Dict[str, Any]]: - """Load execution plan for a run.""" - from pathlib import Path - import os + # Note: We don't delete from run_cache as that's a permanent record + # of completed work. The content itself remains in cache. - # Try plan cache directory - cache_dir = Path(os.environ.get("CACHE_DIR", "/tmp/artdag-cache")) - plan_path = cache_dir / "plans" / f"{run_id}.json" + return True, None + + async def get_run_plan(self, run_id: str) -> Optional[Dict[str, Any]]: + """Get execution plan for a run.""" + plan_path = self.cache_dir / "plans" / f"{run_id}.json" if plan_path.exists(): with open(plan_path) as f: return json.load(f) - - # Also check for plan_id in run data - run = await self.get_run(run_id) - if run and run.get("plan_id"): - plan_path = cache_dir / "plans" / f"{run['plan_id']}.json" - if plan_path.exists(): - with open(plan_path) as f: - return json.load(f) - return None - async def get_run_plan(self, run_id: str) -> Optional[Dict[str, Any]]: - """Get execution plan with step results merged in.""" - run = await self.get_run(run_id) - if not run: - return None - - plan = await self.load_plan(run_id) - - # If no stored plan, try to reconstruct from run data - if not plan and run.get("step_results"): - plan = { - "plan_id": run.get("plan_id"), - "recipe": run.get("recipe"), - "steps": [], - } - - if plan and run.get("step_results"): - # Merge step results into plan - step_results = run.get("step_results", {}) - for step in plan.get("steps", []): - step_id = step.get("id") or step.get("name") - if step_id and step_id in step_results: - result = step_results[step_id] - step["cache_id"] = result.get("cache_id") or result.get("output_cache_id") - step["status"] = result.get("status", "completed") - step["cached"] = result.get("cached", False) - step["outputs"] = result.get("outputs", []) - - return plan - async def get_run_artifacts(self, run_id: str) -> List[Dict[str, Any]]: """Get all artifacts (inputs + outputs) for a run.""" run = await self.get_run(run_id) @@ -150,31 +341,16 @@ class RunService: artifacts = [] - def get_artifact_info(content_hash: str, role: str, step_name: str) -> Optional[Dict]: - """Get artifact info using cache manager.""" + def get_artifact_info(content_hash: str, role: str, name: str) -> Optional[Dict]: if self.cache.has_content(content_hash): - path = self.cache.get_path(content_hash) + path = self.cache.get_by_content_hash(content_hash) if path and path.exists(): - # Detect media type - media_type = "file" - try: - with open(path, "rb") as f: - header = f.read(12) - if header[:4] == b'\x1a\x45\xdf\xa3' or header[4:8] == b'ftyp': - media_type = "video" - elif header[:8] == b'\x89PNG\r\n\x1a\n' or header[:2] == b'\xff\xd8': - media_type = "image" - elif header[:4] == b'RIFF' and header[8:12] == b'WAVE': - media_type = "audio" - except Exception: - pass - return { "hash": content_hash, "size_bytes": path.stat().st_size, - "media_type": media_type, + "media_type": detect_media_type(path), "role": role, - "step_name": step_name, + "step_name": name, } return None @@ -182,50 +358,28 @@ class RunService: inputs = run.get("inputs", []) if isinstance(inputs, dict): inputs = list(inputs.values()) - for i, content_hash in enumerate(inputs): - info = get_artifact_info(content_hash, "input", f"Input {i + 1}") + for i, h in enumerate(inputs): + info = get_artifact_info(h, "input", f"Input {i + 1}") if info: artifacts.append(info) - # Add step outputs from step_results - step_results = run.get("step_results", {}) - for step_id, result in step_results.items(): - cache_id = result.get("cache_id") or result.get("output_cache_id") - if cache_id: - info = get_artifact_info(cache_id, "step_output", step_id) - if info: - artifacts.append(info) - # Also add any additional outputs - for output in result.get("outputs", []): - if output and output != cache_id: - info = get_artifact_info(output, "step_output", step_id) - if info: - artifacts.append(info) - - # Add final output + # Add output if run.get("output_hash"): - output_hash = run["output_hash"] - # Avoid duplicates - if not any(a["hash"] == output_hash for a in artifacts): - info = get_artifact_info(output_hash, "output", "Final Output") - if info: - artifacts.append(info) + info = get_artifact_info(run["output_hash"], "output", "Output") + if info: + artifacts.append(info) return artifacts async def get_run_analysis(self, run_id: str) -> List[Dict[str, Any]]: """Get analysis data for each input in a run.""" - from pathlib import Path - import os - run = await self.get_run(run_id) if not run: return [] - cache_dir = Path(os.environ.get("CACHE_DIR", "/tmp/artdag-cache")) - analysis_dir = cache_dir / "analysis" - + analysis_dir = self.cache_dir / "analysis" results = [] + inputs = run.get("inputs", []) if isinstance(inputs, dict): inputs = list(inputs.values()) @@ -247,8 +401,11 @@ class RunService: "has_analysis": analysis_data is not None, "tempo": analysis_data.get("tempo") if analysis_data else None, "beat_times": analysis_data.get("beat_times", []) if analysis_data else [], - "energy": analysis_data.get("energy") if analysis_data else None, "raw": analysis_data, }) return results + + def detect_media_type(self, path: Path) -> str: + """Detect media type for a file path.""" + return detect_media_type(path) diff --git a/cache_manager.py b/cache_manager.py index 2940199..57f4f60 100644 --- a/cache_manager.py +++ b/cache_manager.py @@ -519,6 +519,22 @@ class L1CacheManager: return files + def list_by_type(self, node_type: str) -> List[str]: + """ + List content hashes of all cached files of a specific type. + + Args: + node_type: Type to filter by (e.g., "recipe", "upload", "effect") + + Returns: + List of content hashes + """ + hashes = [] + for entry in self.cache.list_entries(): + if entry.node_type == node_type and entry.content_hash: + hashes.append(entry.content_hash) + return hashes + # ============ Activity Tracking ============ def record_activity(self, dag: DAG, run_id: str = None) -> Activity: diff --git a/database.py b/database.py index 1d59a6a..0a456d7 100644 --- a/database.py +++ b/database.py @@ -1132,6 +1132,34 @@ async def get_run_by_output(output_hash: str) -> Optional[dict]: return None +async def list_runs_by_actor(actor_id: str, offset: int = 0, limit: int = 20) -> List[dict]: + """List completed runs for a user, ordered by creation time (newest first).""" + async with pool.acquire() as conn: + rows = await conn.fetch( + """ + SELECT run_id, output_hash, ipfs_cid, provenance_cid, recipe, inputs, actor_id, created_at + FROM run_cache + WHERE actor_id = $1 + ORDER BY created_at DESC + LIMIT $2 OFFSET $3 + """, + actor_id, limit, offset + ) + return [ + { + "run_id": row["run_id"], + "output_hash": row["output_hash"], + "ipfs_cid": row["ipfs_cid"], + "provenance_cid": row["provenance_cid"], + "recipe": row["recipe"], + "inputs": row["inputs"], + "actor_id": row["actor_id"], + "created_at": row["created_at"].isoformat() if row["created_at"] else None, + } + for row in rows + ] + + # ============ Storage Backends ============ async def get_user_storage(actor_id: str) -> List[dict]: