"""
Cache Service - business logic for cache and media management.
"""

import asyncio
import json
import logging
import os
import subprocess
from pathlib import Path
from typing import Optional, List, Dict, Any, Tuple, TYPE_CHECKING

import httpx

if TYPE_CHECKING:
    from database import Database
    from cache_manager import L1CacheManager


logger = logging.getLogger(__name__)

# Number of leading bytes read when sniffing a file's format from its magic bytes.
_HEADER_SIZE = 32

# Download file extension for each MIME type that _sniff() can return.
# Anything not listed here (e.g. application/octet-stream) is served as ".bin".
_MIME_EXTENSIONS = {
    "video/x-matroska": "mkv",
    "video/mp4": "mp4",
    "video/x-msvideo": "avi",
    "image/png": "png",
    "image/jpeg": "jpg",
    "image/gif": "gif",
    "image/webp": "webp",
    "audio/wav": "wav",
    "audio/mpeg": "mp3",
    "audio/flac": "flac",
}


def _read_header(path: Optional[Path], size: int = _HEADER_SIZE) -> Optional[bytes]:
    """Return the first ``size`` bytes of ``path``, or None if it cannot be read.

    ``path`` may be None (e.g. when a cache lookup missed); that is treated as
    unreadable rather than raising.
    """
    try:
        with open(path, "rb") as f:
            return f.read(size)
    except (OSError, TypeError, ValueError):
        return None


def _sniff(header: bytes) -> Tuple[str, str]:
    """Classify a file header by its magic bytes.

    Returns ``(media_type, mime_type)`` where media_type is one of
    "video", "image", "audio" or "unknown". An empty header is "unknown".
    Checks run in a fixed order, so the first matching signature wins.
    """
    # RIFF containers (AVI / WebP / WAV) share a prefix; the form type sits at bytes 8-12.
    riff_form = header[8:12] if header[:4] == b"RIFF" else b""

    # Video signatures
    if header[:4] == b"\x1a\x45\xdf\xa3":  # WebM/MKV (EBML)
        return "video", "video/x-matroska"
    if header[4:8] == b"ftyp":  # MP4/MOV
        return "video", "video/mp4"
    if riff_form == b"AVI ":  # AVI
        return "video", "video/x-msvideo"

    # Image signatures
    if header[:8] == b"\x89PNG\r\n\x1a\n":  # PNG
        return "image", "image/png"
    if header[:2] == b"\xff\xd8":  # JPEG
        return "image", "image/jpeg"
    if header[:6] in (b"GIF87a", b"GIF89a"):  # GIF
        return "image", "image/gif"
    if riff_form == b"WEBP":  # WebP
        return "image", "image/webp"

    # Audio signatures
    if riff_form == b"WAVE":  # WAV
        return "audio", "audio/wav"
    if header[:3] == b"ID3" or header[:2] == b"\xff\xfb":  # MP3
        return "audio", "audio/mpeg"
    if header[:4] == b"fLaC":  # FLAC
        return "audio", "audio/flac"

    return "unknown", "application/octet-stream"


def _sniff_path(path: Optional[Path]) -> Tuple[str, str]:
    """Read ``path``'s header once and return ``(media_type, mime_type)``."""
    return _sniff(_read_header(path) or b"")


def detect_media_type(cache_path: Path) -> str:
    """Detect if file is image, video, or audio based on magic bytes.

    Returns "video", "image", "audio", or "unknown" (also for unreadable files).
    """
    return _sniff_path(cache_path)[0]


def get_mime_type(path: Path) -> str:
    """Get MIME type based on file magic bytes.

    Unreadable or unrecognised files map to "application/octet-stream".
    """
    return _sniff_path(path)[1]


def _unlink_quietly(path: Path) -> None:
    """Delete ``path`` if it exists; log (rather than raise) other filesystem errors.

    Used for best-effort cleanup so a failed delete cannot mask the real result.
    """
    try:
        path.unlink()
    except FileNotFoundError:
        pass
    except OSError:
        logger.warning("Could not remove %s", path, exc_info=True)


async def _run_command(args: List[str], timeout: float) -> "subprocess.CompletedProcess[str]":
    """Run ``args`` with subprocess.run in the default executor.

    Running in a worker thread keeps long ffprobe/ffmpeg calls from blocking
    the event loop. Exceptions from subprocess.run (TimeoutExpired,
    FileNotFoundError, ...) propagate to the awaiting caller.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        lambda: subprocess.run(args, capture_output=True, text=True, timeout=timeout),
    )


class CacheService:
    """
    Service for managing cached content.

    Handles content retrieval, metadata, and media type detection.
    """

    def __init__(self, database: "Database", cache_manager: "L1CacheManager") -> None:
        self.db = database
        self.cache = cache_manager
        self.cache_dir = Path(os.environ.get("CACHE_DIR", "/tmp/artdag-cache"))

    def _cached_path(self, cid: str) -> Optional[Path]:
        """Return the on-disk path for ``cid`` if the cache has it and the file exists."""
        if not self.cache.has_content(cid):
            return None
        path = self.cache.get_by_cid(cid)
        if not path or not path.exists():
            return None
        return path

    async def get_cache_item(self, cid: str) -> Optional[Dict[str, Any]]:
        """Get cached item with full metadata for display.

        Returns None when the content is not cached or its file is missing.
        """
        path = self._cached_path(cid)
        if path is None:
            return None

        meta = await self.db.load_item_metadata(cid, None)
        cache_item = await self.db.get_cache_item(cid)
        media_type, mime_type = _sniff_path(path)

        return {
            "cid": cid,
            "path": str(path),
            "media_type": media_type,
            "mime_type": mime_type,
            "size": path.stat().st_size,
            "ipfs_cid": cache_item.get("ipfs_cid") if cache_item else None,
            "meta": meta,
        }

    async def check_access(self, cid: str, actor_id: str, username: str) -> bool:
        """Check if user has access to content."""
        user_hashes = await self._get_user_cache_hashes(username, actor_id)
        return cid in user_hashes

    async def _get_user_cache_hashes(self, username: str, actor_id: Optional[str] = None) -> set:
        """Get all cache hashes owned by or associated with a user.

        Sources: database items owned by ``actor_id``, legacy ``*.meta.json``
        files whose "uploader" is the user, and inputs/outputs of the user's runs.
        """
        # Only non-empty identities may match. Otherwise a record whose
        # "uploader" is missing (None) would match a caller with actor_id=None.
        identities = tuple(value for value in (username, actor_id) if value)
        hashes = set()

        # Items owned by the user in the database.
        if actor_id:
            try:
                db_items = await self.db.get_user_items(actor_id)
                hashes.update(item["cid"] for item in db_items)
            except Exception:
                logger.warning("Failed to load cache items for actor %s", actor_id, exc_info=True)

        # Legacy: files uploaded by the user (JSON metadata next to the content).
        meta_suffix = ".meta.json"
        if self.cache_dir.exists():
            for meta_file in self.cache_dir.iterdir():
                if not meta_file.name.endswith(meta_suffix):
                    continue
                try:
                    with open(meta_file, "r") as mf:
                        meta = json.load(mf)
                except (OSError, ValueError):
                    continue  # unreadable or corrupt legacy metadata: skip it
                if isinstance(meta, dict) and meta.get("uploader") in identities:
                    hashes.add(meta_file.name[: -len(meta_suffix)])

        # Files from the user's runs (inputs and outputs).
        for run in await self._list_user_runs(username, actor_id):
            inputs = run.get("inputs") or []
            if isinstance(inputs, dict):
                inputs = list(inputs.values())
            elif isinstance(inputs, str):
                inputs = [inputs]  # a bare string would otherwise be added char by char
            hashes.update(inputs)
            if run.get("output_cid"):
                hashes.add(run["output_cid"])

        return hashes

    async def _list_user_runs(self, username: str, actor_id: Optional[str]) -> List[Dict]:
        """List runs for a user (helper for access check).

        Scans Redis keys under "artdag:run:" and keeps runs whose "actor_id" or
        "username" equals one of the caller's non-empty identities.
        """
        from ..dependencies import get_redis_client

        identities = tuple(value for value in (username, actor_id) if value)
        if not identities:
            return []

        redis = get_redis_client()
        runs = []
        cursor = 0
        while True:
            cursor, keys = redis.scan(cursor=cursor, match="artdag:run:*", count=100)
            for key in keys:
                data = redis.get(key)
                if not data:
                    continue
                try:
                    run = json.loads(data)
                except ValueError:
                    logger.warning("Skipping run %r with invalid JSON", key)
                    continue
                if not isinstance(run, dict):
                    continue
                if run.get("actor_id") in identities or run.get("username") in identities:
                    runs.append(run)
            if cursor == 0:
                break
        return runs

    async def get_raw_file(self, cid: str) -> Tuple[Optional[Path], Optional[str], Optional[str]]:
        """Get raw file path, media type, and filename for download.

        The filename is ``"<cid>.<ext>"`` with the extension derived from the
        sniffed MIME type, or "bin" when the format is not recognised.
        Returns ``(None, None, None)`` if the content is unavailable.
        """
        path = self._cached_path(cid)
        if path is None:
            return None, None, None

        _, mime = _sniff_path(path)
        ext = _MIME_EXTENSIONS.get(mime, "bin")
        return path, mime, f"{cid}.{ext}"

    async def get_as_mp4(self, cid: str) -> Tuple[Optional[Path], Optional[str]]:
        """Get content as MP4, transcoding if necessary. Returns (path, error).

        A previously transcoded ``<cid>.mp4`` in the cache dir is reused. Files
        that ffprobe already reports as mp4/mov are returned unchanged. Otherwise
        ffmpeg transcodes to H.264/AAC in a worker thread.
        """
        path = self._cached_path(cid)
        if path is None:
            return None, f"Content {cid} not in cache"

        if detect_media_type(path) != "video":
            return None, "Content is not a video"

        # Check for cached MP4
        mp4_path = self.cache_dir / f"{cid}.mp4"
        if mp4_path.exists():
            return mp4_path, None

        # Check if already MP4 format (best effort: on probe failure we just transcode).
        try:
            probe = await _run_command(
                ["ffprobe", "-v", "error", "-select_streams", "v:0",
                 "-show_entries", "format=format_name", "-of", "csv=p=0", str(path)],
                timeout=10,
            )
            format_name = probe.stdout.lower()
            if "mp4" in format_name or "mov" in format_name:
                return path, None
        except Exception:
            logger.debug("ffprobe failed for %s; transcoding instead", path, exc_info=True)

        # Transcode to a temporary name, then rename so readers never see a partial file.
        transcode_path = self.cache_dir / f"{cid}.transcoding.mp4"
        try:
            result = await _run_command(
                ["ffmpeg", "-y", "-i", str(path),
                 "-c:v", "libx264", "-preset", "fast", "-crf", "23",
                 "-c:a", "aac", "-b:a", "128k",
                 "-movflags", "+faststart",
                 str(transcode_path)],
                timeout=600,
            )
            if result.returncode != 0:
                return None, f"Transcoding failed: {result.stderr[:200]}"
            transcode_path.rename(mp4_path)
            return mp4_path, None
        except subprocess.TimeoutExpired:
            return None, "Transcoding timed out"
        except Exception as e:
            return None, f"Transcoding failed: {e}"
        finally:
            # After a successful rename this is a no-op; on any failure it
            # removes the partial output.
            _unlink_quietly(transcode_path)

    async def get_metadata(self, cid: str, actor_id: str) -> Optional[Dict[str, Any]]:
        """Get content metadata, or None if the content is not cached."""
        if not self.cache.has_content(cid):
            return None
        return await self.db.load_item_metadata(cid, actor_id)

    async def update_metadata(
        self,
        cid: str,
        actor_id: str,
        title: Optional[str] = None,
        description: Optional[str] = None,
        tags: Optional[List[str]] = None,
        custom: Optional[Dict[str, Any]] = None,
    ) -> Tuple[bool, Optional[str]]:
        """Update content metadata. Returns (success, error).

        Only fields passed as non-None are sent to the database.
        """
        if not self.cache.has_content(cid):
            return False, "Content not found"

        candidates = (
            ("title", title),
            ("description", description),
            ("tags", tags),
            ("custom", custom),
        )
        updates = {key: value for key, value in candidates if value is not None}

        try:
            await self.db.update_item_metadata(cid, actor_id, **updates)
            return True, None
        except Exception as e:
            return False, str(e)

    async def publish_to_l2(
        self,
        cid: str,
        actor_id: str,
        l2_server: str,
        auth_token: str,
    ) -> Tuple[Optional[str], Optional[str]]:
        """Publish content to L2 and IPFS. Returns (ipfs_cid, error).

        Requires the item's metadata to have an "origin" containing "type", and
        a non-empty auth token. On success, records the L2 share and pins the
        item with pin_reason="published".
        """
        if not self.cache.has_content(cid):
            return None, "Content not found"

        # Get IPFS CID
        cache_item = await self.db.get_cache_item(cid)
        ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None

        # Get metadata for origin info
        meta = await self.db.load_item_metadata(cid, actor_id)
        origin = meta.get("origin") if meta else None

        if not origin or "type" not in origin:
            return None, "Origin must be set before publishing"

        if not auth_token:
            return None, "Authentication token required"

        asset_name = meta.get("title") or cid[:16]

        # Call L2 publish-cache endpoint
        try:
            asset_type = detect_media_type(self.cache.get_by_cid(cid))
            async with httpx.AsyncClient(timeout=30) as client:
                resp = await client.post(
                    f"{l2_server.rstrip('/')}/assets/publish-cache",
                    headers={"Authorization": f"Bearer {auth_token}"},
                    json={
                        "cid": cid,
                        "ipfs_cid": ipfs_cid,
                        "asset_name": asset_name,
                        "asset_type": asset_type,
                        "origin": origin,
                        "description": meta.get("description"),
                        "tags": meta.get("tags", []),
                    },
                )
                resp.raise_for_status()
                l2_result = resp.json()
        except httpx.HTTPStatusError as e:
            error_detail = str(e)
            try:
                # Prefer the server's structured "detail" message when available.
                error_detail = e.response.json().get("detail", str(e))
            except (ValueError, AttributeError):
                pass
            return None, f"L2 publish failed: {error_detail}"
        except Exception as e:
            return None, f"L2 publish failed: {e}"

        # Update local metadata with publish status
        await self.db.save_l2_share(
            cid=cid,
            actor_id=actor_id,
            l2_server=l2_server,
            asset_name=asset_name,
            content_type=asset_type,
        )
        await self.db.update_item_metadata(
            cid=cid,
            actor_id=actor_id,
            pinned=True,
            pin_reason="published",
        )

        published_cid = l2_result.get("ipfs_cid") if isinstance(l2_result, dict) else None
        return published_cid or ipfs_cid, None

    async def delete_content(self, cid: str, actor_id: str) -> Tuple[bool, Optional[str]]:
        """Delete content from cache. Returns (success, error).

        Refuses pinned items and anything the cache manager's deletion rules
        reject. Reports the cache manager's failure instead of claiming success.
        """
        if not self.cache.has_content(cid):
            return False, "Content not found"

        # Check if pinned
        meta = await self.db.load_item_metadata(cid, actor_id)
        if meta and meta.get("pinned"):
            pin_reason = meta.get("pin_reason", "unknown")
            return False, f"Cannot discard pinned item (reason: {pin_reason})"

        # Check deletion rules via cache_manager
        can_delete, reason = self.cache.can_delete(cid)
        if not can_delete:
            return False, f"Cannot discard: {reason}"

        # Delete via cache_manager
        success, msg = self.cache.delete_by_cid(cid)
        if not success:
            return False, msg or "Failed to delete content"

        # Clean up legacy metadata and transcoded derivatives
        _unlink_quietly(self.cache_dir / f"{cid}.meta.json")
        _unlink_quietly(self.cache_dir / f"{cid}.mp4")

        return True, None

    async def import_from_ipfs(self, ipfs_cid: str, actor_id: str) -> Tuple[Optional[str], Optional[str]]:
        """Import content from IPFS. Returns (cid, error).

        The file is downloaded into ``<cache_dir>/legacy`` and then moved into
        the cache. If any step fails before the cache takes it over, the
        downloaded temp file is removed.
        """
        tmp_path: Optional[Path] = None
        stored = False
        try:
            import ipfs_client

            # Download from IPFS
            legacy_dir = self.cache_dir / "legacy"
            legacy_dir.mkdir(parents=True, exist_ok=True)
            tmp_path = legacy_dir / f"import-{ipfs_cid[:16]}"

            if not ipfs_client.get_file(ipfs_cid, str(tmp_path)):
                return None, f"Could not fetch CID {ipfs_cid} from IPFS"

            # Detect media type before storing (the file is moved by put()).
            media_type = detect_media_type(tmp_path)

            # Store in cache
            cached, new_ipfs_cid = self.cache.put(tmp_path, node_type="import", move=True)
            stored = True
            cid = new_ipfs_cid or cached.cid  # Prefer IPFS CID

            # Save to database with detected media type
            await self.db.create_cache_item(cid, new_ipfs_cid)
            await self.db.save_item_metadata(
                cid=cid,
                actor_id=actor_id,
                item_type=media_type,  # Use detected type for filtering
                filename=f"ipfs-{ipfs_cid[:16]}",
            )

            return cid, None
        except Exception as e:
            return None, f"Import failed: {e}"
        finally:
            if not stored and tmp_path is not None:
                _unlink_quietly(tmp_path)

    async def upload_content(
        self,
        content: bytes,
        filename: str,
        actor_id: str,
    ) -> Tuple[Optional[str], Optional[str], Optional[str]]:
        """Upload content to cache. Returns (cid, ipfs_cid, error).

        The bytes are written to a temp file, which is then moved into the cache.
        If anything fails before the cache takes the file over, the temp file
        is deleted.
        """
        import tempfile

        tmp_path: Optional[Path] = None
        stored = False
        try:
            # Write to temp file (record the path first so a failed write is cleaned up)
            with tempfile.NamedTemporaryFile(delete=False) as tmp:
                tmp_path = Path(tmp.name)
                tmp.write(content)

            # Detect media type (video/image/audio) before moving file
            media_type = detect_media_type(tmp_path)

            # Store in cache (also stores in IPFS)
            cached, ipfs_cid = self.cache.put(tmp_path, node_type="upload", move=True)
            stored = True
            cid = ipfs_cid or cached.cid  # Prefer IPFS CID

            # Save to database with media category type.
            # Using media_type ("video", "image", "audio") not mime_type ("video/mp4")
            # so list_media filtering works correctly.
            await self.db.create_cache_item(cid, ipfs_cid)
            await self.db.save_item_metadata(
                cid=cid,
                actor_id=actor_id,
                item_type=media_type,  # Store media category for filtering
                filename=filename,
            )

            return cid, ipfs_cid, None
        except Exception as e:
            return None, None, f"Upload failed: {e}"
        finally:
            if not stored and tmp_path is not None:
                _unlink_quietly(tmp_path)

    async def list_media(
        self,
        actor_id: Optional[str] = None,
        username: Optional[str] = None,
        offset: int = 0,
        limit: int = 24,
        media_type: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """List media items in cache.

        ``media_type`` is "video", "image", "audio", or None for all types.
        """
        # Get items from database (uses item_types table)
        items = await self.db.get_user_items(
            actor_id=actor_id or username,
            item_type=media_type,
            limit=limit,
            offset=offset,
        )
        return items

    # Legacy compatibility methods
    def has_content(self, cid: str) -> bool:
        """Check if content exists in cache."""
        return self.cache.has_content(cid)

    def get_ipfs_cid(self, cid: str) -> Optional[str]:
        """Get IPFS CID for cached content."""
        return self.cache.get_ipfs_cid(cid)