# art-celery/cache_manager.py
"""
Cache management for Art DAG L1 server.

Integrates artdag's Cache, ActivityStore, and ActivityManager to provide:
- Content-addressed caching with both node_id and cid
- Activity tracking for runs (input/output/intermediate relationships)
- Deletion rules enforcement (shared items protected)
- L2 ActivityPub integration for "shared" status checks
- IPFS as durable backing store (local cache as hot storage)
- Redis-backed indexes for multi-worker consistency
"""

import asyncio
import hashlib
import json
import logging
import os
import shutil
import threading
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, Dict, List, Optional, Set, TYPE_CHECKING

import requests

if TYPE_CHECKING:
    import redis

from artdag import Cache, CacheEntry, DAG, Node, NodeType
from artdag.activities import Activity, ActivityStore, ActivityManager, make_is_shared_fn

import ipfs_client

logger = logging.getLogger(__name__)

# Characters produced by hashlib's hexdigest().
_HEX_DIGITS = frozenset("0123456789abcdef")


def _is_hex_digest(name: str) -> bool:
    """Return True if ``name`` is exactly 64 lowercase hex characters."""
    return len(name) == 64 and all(c in _HEX_DIGITS for c in name)


def file_hash(path: Path, algorithm: str = "sha3_256") -> str:
    """Compute local content hash (fallback when IPFS unavailable).

    Args:
        path: File to hash (a ``str`` is accepted and converted).
        algorithm: Any algorithm name understood by ``hashlib.new``.

    Returns:
        Hex digest of the file contents, read in 64 KiB chunks.
    """
    path = Path(path)
    hasher = hashlib.new(algorithm)
    actual_path = path.resolve() if path.is_symlink() else path
    with open(actual_path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            hasher.update(chunk)
    return hasher.hexdigest()


@dataclass
class CachedFile:
    """
    A cached file with both identifiers.

    Provides a unified view combining:
    - node_id: computation identity (for DAG caching)
    - cid: file content identity (for external references)
    """

    node_id: str
    cid: str
    path: Path
    size_bytes: int
    node_type: str
    created_at: float

    @classmethod
    def from_cache_entry(cls, entry: CacheEntry) -> "CachedFile":
        """Build a CachedFile by copying the corresponding CacheEntry fields."""
        return cls(
            node_id=entry.node_id,
            cid=entry.cid,
            path=entry.output_path,
            size_bytes=entry.size_bytes,
            node_type=entry.node_type,
            created_at=entry.created_at,
        )


class L2SharedChecker:
    """
    Checks if content is shared (published) via L2 ActivityPub server.

    Caches results to avoid repeated API calls.
    """

    def __init__(self, l2_server: str, cache_ttl: int = 300):
        self.l2_server = l2_server
        self.cache_ttl = cache_ttl
        # cid -> (is_shared, time the answer was recorded)
        self._cache: Dict[str, tuple[bool, float]] = {}

    def is_shared(self, cid: str) -> bool:
        """Check if cid has been published to L2.

        A cached answer younger than ``cache_ttl`` seconds is returned without
        a request. Otherwise L2 is queried; HTTP 200 means shared. Any failure
        is treated as "shared" so that an unreachable L2 never permits deletion.
        """
        now = time.time()

        # Check cache
        if cid in self._cache:
            is_shared, cached_at = self._cache[cid]
            if now - cached_at < self.cache_ttl:
                logger.debug(f"L2 check (cached): {cid[:16]}... = {is_shared}")
                return is_shared

        # Query L2
        try:
            # Strip a trailing slash so the request path never starts with "//".
            url = f"{self.l2_server.rstrip('/')}/assets/by-hash/{cid}"
            logger.info(f"L2 check: GET {url}")
            resp = requests.get(url, timeout=5)
            logger.info(f"L2 check response: {resp.status_code}")
            is_shared = resp.status_code == 200
        except Exception as e:
            logger.warning(f"Failed to check L2 for {cid}: {e}")
            # On error, assume IS shared (safer - prevents accidental deletion)
            is_shared = True

        self._cache[cid] = (is_shared, now)
        return is_shared

    def invalidate(self, cid: str):
        """Invalidate cache for a cid (call after publishing)."""
        self._cache.pop(cid, None)

    def mark_shared(self, cid: str):
        """Mark as shared without querying (call after successful publish)."""
        self._cache[cid] = (True, time.time())


class L1CacheManager:
    """
    Unified cache manager for Art DAG L1 server.

    Combines:
    - artdag Cache for file storage
    - ActivityStore for run tracking
    - ActivityManager for deletion rules
    - L2 integration for shared status

    Provides both node_id and cid based access.
    """

    # Seconds to wait for the helper thread used when _run_async is called
    # from inside a running event loop.
    _ASYNC_THREAD_TIMEOUT = 30

    def __init__(
        self,
        cache_dir: Path | str,
        l2_server: str = "http://localhost:8200",
        redis_client: Optional["redis.Redis"] = None,
    ):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        # Redis for shared state between workers
        self._redis = redis_client
        self._redis_content_key = "artdag:content_index"
        self._redis_ipfs_key = "artdag:ipfs_index"

        # artdag components
        self.cache = Cache(self.cache_dir / "nodes")
        self.activity_store = ActivityStore(self.cache_dir / "activities")

        # L2 shared checker
        self.l2_checker = L2SharedChecker(l2_server)

        # Activity manager with L2-based is_shared
        self.activity_manager = ActivityManager(
            cache=self.cache,
            activity_store=self.activity_store,
            is_shared_fn=self._is_shared_by_node_id,
        )

        # Legacy files directory (for files uploaded directly by cid)
        self.legacy_dir = self.cache_dir / "legacy"
        self.legacy_dir.mkdir(parents=True, exist_ok=True)

    # ============ Indexes (no JSON files) ============
    #
    # Content index: identifier (cache_id, local content hash, or CID) -> the
    #   value stored in cache_items.ipfs_cid. Lives in the database.
    # IPFS index: cid -> IPFS CID. Lives in a Redis hash (best effort).
    #
    # Database is the ONLY source of truth for cache_id -> ipfs_cid mapping.
    # No fallbacks - failures raise exceptions.

    @staticmethod
    def _run_in_fresh_loop(coro):
        """Run ``coro`` to completion on a brand-new event loop, then close it."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(coro)
        finally:
            loop.close()

    def _run_async(self, coro):
        """Run async coroutine from sync context.

        Always creates a fresh event loop to avoid issues with Celery's
        prefork workers where loops may be closed by previous tasks.

        Raises:
            TimeoutError: if called from inside a running loop and the helper
                thread does not finish within ``_ASYNC_THREAD_TIMEOUT`` seconds.
            Exception: whatever the coroutine itself raises.
        """
        # Only the probe is guarded: a RuntimeError raised by the coroutine
        # must propagate rather than be mistaken for "no running loop".
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            in_async_context = False
        else:
            in_async_context = True

        if not in_async_context:
            # No running loop - create a fresh one (don't reuse potentially closed loops)
            return self._run_in_fresh_loop(coro)

        # We're in an async context - use a thread with its own loop
        result = [None]
        error = [None]

        def run_in_thread():
            try:
                result[0] = self._run_in_fresh_loop(coro)
            except Exception as e:
                error[0] = e

        thread = threading.Thread(target=run_in_thread)
        thread.start()
        thread.join(timeout=self._ASYNC_THREAD_TIMEOUT)
        if thread.is_alive():
            # Returning None here would look like "no row" / "write succeeded".
            raise TimeoutError(
                f"Async operation did not finish within {self._ASYNC_THREAD_TIMEOUT}s"
            )
        if error[0]:
            raise error[0]
        return result[0]

    def _with_db_connection(self, operation):
        """Open a database connection, await ``operation(conn)``, and close it.

        Returns whatever ``operation`` returns. Imports are kept local, as in
        the rest of this module's database access.
        """
        import database

        async def run():
            import asyncpg

            conn = await asyncpg.connect(database.DATABASE_URL)
            try:
                return await operation(conn)
            finally:
                await conn.close()

        return self._run_async(run())

    def _set_content_index(self, cache_id: str, ipfs_cid: str):
        """Set content index entry in database (cache_id -> ipfs_cid)."""

        async def upsert(conn):
            await conn.execute(
                """
                INSERT INTO cache_items (cid, ipfs_cid)
                VALUES ($1, $2)
                ON CONFLICT (cid) DO UPDATE SET ipfs_cid = $2
                """,
                cache_id,
                ipfs_cid,
            )

        self._with_db_connection(upsert)
        logger.info(f"Indexed in database: {cache_id[:16]}... -> {ipfs_cid}")

    def _get_content_index(self, cache_id: str) -> Optional[str]:
        """Get content index entry (cache_id -> ipfs_cid) from database.

        Returns None when there is no row or the stored value is empty.
        """

        async def fetch(conn):
            row = await conn.fetchrow(
                "SELECT ipfs_cid FROM cache_items WHERE cid = $1",
                cache_id,
            )
            return row["ipfs_cid"] if row else None

        return self._with_db_connection(fetch) or None

    def _del_content_index(self, cache_id: str):
        """Delete content index entry from database."""

        async def delete(conn):
            await conn.execute("DELETE FROM cache_items WHERE cid = $1", cache_id)

        self._with_db_connection(delete)

    def _set_ipfs_index(self, cid: str, ipfs_cid: str):
        """Set IPFS index entry in Redis (best effort; failures are logged)."""
        if self._redis:
            try:
                self._redis.hset(self._redis_ipfs_key, cid, ipfs_cid)
            except Exception as e:
                logger.warning(f"Failed to set IPFS index in Redis: {e}")

    def _get_ipfs_cid_from_index(self, cid: str) -> Optional[str]:
        """Get IPFS CID from Redis, or None if absent or Redis is unavailable."""
        if self._redis:
            try:
                val = self._redis.hget(self._redis_ipfs_key, cid)
                if val:
                    return val.decode() if isinstance(val, bytes) else val
            except Exception as e:
                logger.warning(f"Failed to get IPFS CID from Redis: {e}")
        return None

    def get_ipfs_cid(self, cid: str) -> Optional[str]:
        """Get IPFS CID for a content hash."""
        return self._get_ipfs_cid_from_index(cid)

    def _is_shared_by_node_id(self, cid: str) -> bool:
        """Check if a cid is shared via L2.

        Passed to ActivityManager as its ``is_shared_fn``; the argument is
        forwarded unchanged to the L2 checker.
        """
        return self.l2_checker.is_shared(cid)

    def _load_meta(self, cid: str) -> dict:
        """Load metadata for a cached file ({} if no metadata file exists)."""
        meta_path = self.cache_dir / f"{cid}.meta.json"
        if meta_path.exists():
            with open(meta_path) as f:
                return json.load(f)
        return {}

    def is_pinned(self, cid: str) -> tuple[bool, str]:
        """
        Check if a cid is pinned (non-deletable).

        Returns:
            (is_pinned, reason) tuple
        """
        meta = self._load_meta(cid)
        if meta.get("pinned"):
            return True, meta.get("pin_reason", "published")
        return False, ""

    def _save_meta(self, cid: str, **updates) -> dict:
        """Save/update metadata for a cached file and return the merged dict."""
        meta = self._load_meta(cid)
        meta.update(updates)
        meta_path = self.cache_dir / f"{cid}.meta.json"
        with open(meta_path, "w") as f:
            json.dump(meta, f, indent=2)
        return meta

    def pin(self, cid: str, reason: str = "published") -> None:
        """Mark an item as pinned (non-deletable)."""
        self._save_meta(cid, pinned=True, pin_reason=reason)

    # ============ File Storage ============

    def _index_cache_id(self, cache_id: Optional[str], cid: str) -> None:
        """Record cache_id -> cid in the content index when the two differ."""
        if cache_id and cache_id != cid:
            self._set_content_index(cache_id, cid)
            logger.info(f"put: Indexed cache_id {cache_id[:16]}... -> IPFS {cid}")

    def put(
        self,
        source_path: Path,
        node_type: str = "upload",
        node_id: str = None,
        cache_id: str = None,
        execution_time: float = 0.0,
        move: bool = False,
        skip_ipfs: bool = False,
    ) -> tuple[CachedFile, Optional[str]]:
        """
        Store a file in the cache and optionally upload to IPFS.

        Files are stored by IPFS CID when skip_ipfs=False (default), or by
        local content hash when skip_ipfs=True. The cache_id parameter creates
        an index from cache_id -> CID for code-addressed lookups.

        Args:
            source_path: Path to file to cache
            node_type: Type of node (e.g., "upload", "source", "effect")
            node_id: DEPRECATED - ignored, always uses CID
            cache_id: Optional code-addressed cache ID to index
            execution_time: How long the operation took
            move: If True, move instead of copy
            skip_ipfs: If True, skip IPFS upload and use local hash (faster for large files)

        Returns:
            Tuple of (CachedFile with both node_id and cid, CID or None if skip_ipfs)

        Raises:
            RuntimeError: if the IPFS upload fails, or if the entry cannot be
                read back from the cache after storing.
        """
        if skip_ipfs:
            # Use local content hash instead of IPFS CID (much faster)
            cid = file_hash(source_path)
            ipfs_cid = None
            logger.info(f"put: Using local hash (skip_ipfs=True): {cid[:16]}...")
        else:
            # Upload to IPFS first to get the CID (primary identifier)
            cid = ipfs_client.add_file(source_path)
            if not cid:
                raise RuntimeError(f"IPFS upload failed for {source_path}. IPFS is required.")
            ipfs_cid = cid

        # Always store by IPFS CID (node_id parameter is deprecated)
        node_id = cid

        # Check if already cached (by node_id)
        existing = self.cache.get_entry(node_id)
        if existing and existing.output_path.exists():
            # The content is already stored, but this cache_id may be new
            # (a different computation yielding identical bytes).
            self._index_cache_id(cache_id, cid)
            return CachedFile.from_cache_entry(existing), ipfs_cid

        # Compute local hash BEFORE moving the file (for dual-indexing)
        # Only needed if we uploaded to IPFS (to map local hash -> IPFS CID)
        local_hash = None
        if not skip_ipfs and self._is_ipfs_cid(cid):
            local_hash = file_hash(source_path)

        # Store in local cache
        logger.info(f"put: Storing in cache with node_id={node_id[:16]}...")
        self.cache.put(
            node_id=node_id,
            source_path=source_path,
            node_type=node_type,
            execution_time=execution_time,
            move=move,
        )

        entry = self.cache.get_entry(node_id)
        logger.info(f"put: After cache.put, get_entry(node_id={node_id[:16]}...) returned entry={entry is not None}, path={entry.output_path if entry else None}")

        # Verify we can retrieve it
        verify_path = self.cache.get(node_id)
        logger.info(f"put: Verify cache.get(node_id={node_id[:16]}...) = {verify_path}")

        if entry is None:
            # Fail with a clear message instead of an AttributeError below.
            raise RuntimeError(f"Cache entry missing after put for node_id={node_id}")

        # Index by cache_id if provided (code-addressed cache lookup)
        # This allows get_by_cid(cache_id) to find files stored by IPFS CID
        self._index_cache_id(cache_id, cid)

        # Also index by local hash for content-based lookup
        if local_hash and local_hash != cid:
            self._set_content_index(local_hash, cid)
            logger.debug(f"Indexed local hash {local_hash[:16]}... -> IPFS {cid}")

        logger.info(f"Cached: {cid[:16]}..." + (" (local only)" if skip_ipfs else " (IPFS)"))

        # ipfs_cid is already None when skip_ipfs is set.
        return CachedFile.from_cache_entry(entry), ipfs_cid

    def get_by_node_id(self, node_id: str) -> Optional[Path]:
        """Get cached file path by node_id."""
        return self.cache.get(node_id)

    def _is_ipfs_cid(self, identifier: str) -> bool:
        """Check if identifier looks like an IPFS CID.

        Matches the "Qm" prefix (CIDv0) and the "baf" prefix (covers "bafy" and
        similar CIDv1 forms). A 64-character lowercase hex string is a local
        content hash even when it happens to begin with "baf", so it is
        rejected first.
        """
        if _is_hex_digest(identifier):
            return False
        return identifier.startswith(("Qm", "baf"))

    def _find_output_on_disk(self, node_id: str) -> Optional[Path]:
        """Return the first ``output.*`` file in {cache_dir}/nodes/{node_id}, if any."""
        nodes_dir = self.cache_dir / "nodes" / node_id
        if nodes_dir.exists():
            for f in nodes_dir.iterdir():
                if f.name.startswith("output."):
                    return f
        return None

    def get_by_cid(self, cid: str) -> Optional[Path]:
        """Get cached file path by cid or IPFS CID. Falls back to IPFS if not in local cache.

        Lookup order: database content index, direct cache lookup with cid as
        node_id, cache scan by cid, legacy file at {cache_dir}/{cid}, IPFS
        fetch by cid, IPFS fetch via the Redis IPFS index.

        Returns:
            Path to the file, or None if it could not be found or fetched.
        """
        logger.info(f"get_by_cid: Looking for cid={cid[:16]}...")

        # Check the database content index first
        node_id = self._get_content_index(cid)
        logger.info(f"get_by_cid: Index lookup returned node_id={node_id[:16] if node_id else None}...")
        if node_id:
            path = self.cache.get(node_id)
            logger.info(f"get_by_cid: cache.get(node_id={node_id[:16]}...) returned path={path}")
            if path and path.exists():
                logger.info(f"get_by_cid: Found via index: {path}")
                return path

            # artdag Cache doesn't know about entry - check filesystem directly
            # Files are stored at {cache_dir}/nodes/{node_id}/output.*
            found = self._find_output_on_disk(node_id)
            if found is not None:
                logger.info(f"get_by_cid: Found on filesystem: {found}")
                return found

        # For uploads, node_id == cid, so try direct lookup
        # This works even if cache index hasn't been reloaded
        path = self.cache.get(cid)
        logger.info(f"get_by_cid: Direct cache.get({cid[:16]}...) returned: {path}")
        if path and path.exists():
            self._set_content_index(cid, cid)
            return path

        # Check filesystem directly for cid as node_id
        found = self._find_output_on_disk(cid)
        if found is not None:
            logger.info(f"get_by_cid: Found on filesystem (direct): {found}")
            self._set_content_index(cid, cid)
            return found

        # Scan cache entries (fallback for new structure)
        entry = self.cache.find_by_cid(cid)
        logger.info(f"get_by_cid: find_by_cid({cid[:16]}...) returned entry={entry}")
        if entry and entry.output_path.exists():
            logger.info(f"get_by_cid: Found via scan: {entry.output_path}")
            self._set_content_index(cid, entry.node_id)
            return entry.output_path

        # Check legacy location (files stored directly as CACHE_DIR/{cid})
        legacy_path = self.cache_dir / cid
        logger.info(f"get_by_cid: Checking legacy path: {legacy_path} exists={legacy_path.exists()}")
        if legacy_path.exists() and legacy_path.is_file():
            logger.info(f"get_by_cid: Found at legacy path: {legacy_path}")
            return legacy_path

        # Fetch from IPFS - this is the source of truth for all content
        if self._is_ipfs_cid(cid):
            logger.info(f"get_by_cid: Fetching from IPFS: {cid[:16]}...")
            recovery_path = self.legacy_dir / cid
            recovery_path.parent.mkdir(parents=True, exist_ok=True)
            if ipfs_client.get_file(cid, str(recovery_path)):
                logger.info(f"get_by_cid: Fetched from IPFS: {recovery_path}")
                self._set_content_index(cid, cid)
                return recovery_path
            else:
                logger.warning(f"get_by_cid: IPFS fetch failed for {cid[:16]}...")

        # Also try with a mapped IPFS CID if different from cid
        ipfs_cid = self._get_ipfs_cid_from_index(cid)
        if ipfs_cid and ipfs_cid != cid:
            logger.info(f"get_by_cid: Fetching from IPFS via mapping: {ipfs_cid[:16]}...")
            recovery_path = self.legacy_dir / cid
            recovery_path.parent.mkdir(parents=True, exist_ok=True)
            if ipfs_client.get_file(ipfs_cid, str(recovery_path)):
                logger.info(f"get_by_cid: Fetched from IPFS: {recovery_path}")
                return recovery_path

        return None

    def has_content(self, cid: str) -> bool:
        """Check if content exists in cache.

        Delegates to get_by_cid, so a miss may trigger an IPFS fetch.
        """
        return self.get_by_cid(cid) is not None

    def get_entry_by_cid(self, cid: str) -> Optional[CacheEntry]:
        """Get cache entry by cid (content index first, then a cache scan)."""
        node_id = self._get_content_index(cid)
        if node_id:
            return self.cache.get_entry(node_id)
        return self.cache.find_by_cid(cid)

    def list_all(self) -> List[CachedFile]:
        """List all cached files (cache entries plus legacy hash-named files)."""
        files = []
        seen_hashes = set()

        # New cache structure entries
        for entry in self.cache.list_entries():
            files.append(CachedFile.from_cache_entry(entry))
            if entry.cid:
                seen_hashes.add(entry.cid)

        # Legacy files stored directly in cache_dir (old structure)
        # These are files named by cid directly in CACHE_DIR
        for f in self.cache_dir.iterdir():
            # Skip directories and special files
            if not f.is_file():
                continue
            # Skip metadata/auxiliary files
            if f.suffix in ('.json', '.mp4'):
                continue
            # Skip if name doesn't look like a hash (64 hex chars)
            if not _is_hex_digest(f.name):
                continue
            # Skip if already seen via new cache
            if f.name in seen_hashes:
                continue

            stat = f.stat()  # one stat call supplies both size and mtime
            files.append(CachedFile(
                node_id=f.name,
                cid=f.name,
                path=f,
                size_bytes=stat.st_size,
                node_type="legacy",
                created_at=stat.st_mtime,
            ))
            seen_hashes.add(f.name)

        return files

    def list_by_type(self, node_type: str) -> List[str]:
        """
        List CIDs of all cached files of a specific type.

        Args:
            node_type: Type to filter by (e.g., "recipe", "upload", "effect")

        Returns:
            List of CIDs (IPFS CID if available, otherwise node_id)
        """
        # node_id is returned because put() stores content under its CID.
        return [
            entry.node_id
            for entry in self.cache.list_entries()
            if entry.node_type == node_type
        ]

    # ============ Activity Tracking ============

    def record_activity(self, dag: DAG, run_id: str = None) -> Activity:
        """
        Record a DAG execution as an activity.

        Args:
            dag: The executed DAG
            run_id: Optional run ID to use as activity_id

        Returns:
            The created Activity
        """
        activity = Activity.from_dag(dag, activity_id=run_id)
        self.activity_store.add(activity)
        return activity

    def record_simple_activity(
        self,
        input_hashes: List[str],
        output_cid: str,
        run_id: str = None,
    ) -> Activity:
        """
        Record a simple (non-DAG) execution as an activity.

        For legacy single-effect runs that don't use full DAG execution.
        Uses cid as node_id.

        Args:
            input_hashes: Identifiers of the inputs (stored sorted).
            output_cid: Identifier of the output.
            run_id: Optional activity ID. When omitted, an ID is derived from
                the inputs (in the order given) and the output.

        Returns:
            The created Activity
        """
        if run_id:
            activity_id = run_id
        else:
            # Builtin hash() is salted per process for strings, so it would
            # yield a different ID in every worker. Use a stable digest.
            payload = json.dumps([list(input_hashes), output_cid])
            activity_id = hashlib.sha3_256(payload.encode("utf-8")).hexdigest()

        activity = Activity(
            activity_id=activity_id,
            input_ids=sorted(input_hashes),
            output_id=output_cid,
            intermediate_ids=[],
            created_at=datetime.now(timezone.utc).timestamp(),
            status="completed",
        )
        self.activity_store.add(activity)
        return activity

    def get_activity(self, activity_id: str) -> Optional[Activity]:
        """Get activity by ID."""
        return self.activity_store.get(activity_id)

    def list_activities(self) -> List[Activity]:
        """List all activities."""
        return self.activity_store.list()

    def find_activities_by_inputs(self, input_hashes: List[str]) -> List[Activity]:
        """Find activities with matching inputs (for UI grouping)."""
        return self.activity_store.find_by_input_ids(input_hashes)

    # ============ Deletion Rules ============

    def can_delete(self, cid: str) -> tuple[bool, str]:
        """
        Check if a cached item can be deleted.

        An item is protected when it is pinned, or when it is an input or the
        output of any recorded activity.

        Returns:
            (can_delete, reason) tuple
        """
        # Check if pinned (published or input to published)
        pinned, reason = self.is_pinned(cid)
        if pinned:
            return False, f"Item is pinned ({reason})"

        # Find node_id for this content
        node_id = self._get_content_index(cid) or cid

        # Check if it's an input or output of any activity
        for activity in self.activity_store.list():
            if node_id in activity.input_ids:
                return False, f"Item is input to activity {activity.activity_id}"
            if node_id == activity.output_id:
                return False, f"Item is output of activity {activity.activity_id}"

        return True, "OK"

    def can_discard_activity(self, activity_id: str) -> tuple[bool, str]:
        """
        Check if an activity can be discarded.

        Returns:
            (can_discard, reason) tuple
        """
        activity = self.activity_store.get(activity_id)
        if not activity:
            return False, "Activity not found"

        # Check if any item is pinned
        for node_id in activity.all_node_ids:
            entry = self.cache.get_entry(node_id)
            if entry:
                pinned, reason = self.is_pinned(entry.cid)
                if pinned:
                    return False, f"Item {node_id} is pinned ({reason})"

        return True, "OK"

    def delete_by_cid(self, cid: str) -> tuple[bool, str]:
        """
        Delete a cached item by cid.

        Enforces deletion rules.

        Returns:
            (success, message) tuple
        """
        can_delete, reason = self.can_delete(cid)
        if not can_delete:
            return False, reason

        # Find and delete
        node_id = self._get_content_index(cid)
        if not node_id and self.cache.get_entry(cid):
            # put() stores content with node_id == cid and writes no index row
            # for it, so fall back to cid (as can_delete does).
            node_id = cid
        if node_id:
            self.cache.remove(node_id)
            self._del_content_index(cid)
            return True, "Deleted"

        # Try legacy
        legacy_path = self.legacy_dir / cid
        if legacy_path.exists():
            legacy_path.unlink()
            return True, "Deleted (legacy)"

        return False, "Not found"

    def discard_activity(self, activity_id: str) -> tuple[bool, str]:
        """
        Discard an activity and clean up its cache entries.

        Enforces deletion rules.

        Returns:
            (success, message) tuple
        """
        can_discard, reason = self.can_discard_activity(activity_id)
        if not can_discard:
            return False, reason

        success = self.activity_manager.discard_activity(activity_id)
        if success:
            return True, "Activity discarded"
        return False, "Failed to discard"

    def _is_used_by_other_activities(self, node_id: str, exclude_activity_id: str) -> bool:
        """Check if a node is used by any activity other than the excluded one."""
        for other_activity in self.activity_store.list():
            if other_activity.activity_id == exclude_activity_id:
                continue
            # Check if used as input, output, or intermediate
            if node_id in other_activity.input_ids:
                return True
            if node_id == other_activity.output_id:
                return True
            if node_id in other_activity.intermediate_ids:
                return True
        return False

    def _remove_cached_node(self, node_id: str) -> bool:
        """Remove a node's cache entry, its content index row, and any legacy copy.

        Returns:
            True if a cache entry existed and was removed, False otherwise.
        """
        entry = self.cache.get_entry(node_id)
        if not entry:
            return False
        # Remove from cache
        self.cache.remove(node_id)
        # Remove from the database content index
        self._del_content_index(entry.cid)
        # Delete from legacy dir if exists
        legacy_path = self.legacy_dir / entry.cid
        if legacy_path.exists():
            legacy_path.unlink()
        return True

    def discard_activity_outputs_only(self, activity_id: str) -> tuple[bool, str]:
        """
        Discard an activity, deleting only outputs and intermediates.

        Inputs (cache items, configs) are preserved.
        Outputs/intermediates used by other activities are preserved.

        Returns:
            (success, message) tuple
        """
        activity = self.activity_store.get(activity_id)
        if not activity:
            return False, "Activity not found"

        # Check if output is pinned
        if activity.output_id:
            entry = self.cache.get_entry(activity.output_id)
            if entry:
                pinned, reason = self.is_pinned(entry.cid)
                if pinned:
                    return False, f"Output is pinned ({reason})"

        deleted_outputs = 0
        preserved_shared = 0

        # Delete output, then intermediates (only if not used by other activities)
        candidates = []
        if activity.output_id:
            candidates.append(activity.output_id)
        candidates.extend(activity.intermediate_ids)

        for node_id in candidates:
            if self._is_used_by_other_activities(node_id, activity_id):
                preserved_shared += 1
            elif self._remove_cached_node(node_id):
                deleted_outputs += 1

        # Remove activity record (inputs remain in cache)
        self.activity_store.remove(activity_id)

        msg = f"Activity discarded (deleted {deleted_outputs} outputs"
        if preserved_shared > 0:
            msg += f", preserved {preserved_shared} shared items"
        msg += ")"
        return True, msg

    def cleanup_intermediates(self) -> int:
        """Delete all intermediate cache entries (reconstructible)."""
        return self.activity_manager.cleanup_intermediates()

    def get_deletable_items(self) -> List[CachedFile]:
        """Get all items that can be deleted."""
        return [
            CachedFile.from_cache_entry(entry)
            for entry in self.activity_manager.get_deletable_entries()
        ]

    # ============ L2 Integration ============

    def mark_published(self, cid: str):
        """Mark a cid as published to L2."""
        self.l2_checker.mark_shared(cid)

    def invalidate_shared_cache(self, cid: str):
        """Invalidate shared status cache (call if item might be unpublished)."""
        self.l2_checker.invalidate(cid)

    # ============ Stats ============

    def get_stats(self) -> dict:
        """Get cache statistics."""
        stats = self.cache.get_stats()
        return {
            "total_entries": stats.total_entries,
            "total_size_bytes": stats.total_size_bytes,
            "hits": stats.hits,
            "misses": stats.misses,
            "hit_rate": stats.hit_rate,
            "activities": len(self.activity_store),
        }


# Singleton instance (created lazily by get_cache_manager from env vars)
_manager: Optional[L1CacheManager] = None


def get_cache_manager() -> L1CacheManager:
    """Get the singleton cache manager instance.

    On first call, builds the manager from the CACHE_DIR, L2_SERVER and
    REDIS_URL environment variables (each has a default).
    """
    global _manager
    if _manager is None:
        import redis
        from urllib.parse import urlparse

        cache_dir = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache")))
        l2_server = os.environ.get("L2_SERVER", "http://localhost:8200")

        # Initialize Redis client for shared cache index
        redis_url = os.environ.get('REDIS_URL', 'redis://localhost:6379/5')
        parsed = urlparse(redis_url)
        redis_client = redis.Redis(
            host=parsed.hostname or 'localhost',
            port=parsed.port or 6379,
            db=int(parsed.path.lstrip('/') or 0),
            # Forward the URL's password; None (no auth) when the URL has none.
            password=parsed.password,
            socket_timeout=5,
            socket_connect_timeout=5
        )

        _manager = L1CacheManager(cache_dir=cache_dir, l2_server=l2_server, redis_client=redis_client)
    return _manager


def reset_cache_manager():
    """Reset the singleton (for testing)."""
    global _manager
    _manager = None