# art-celery/cache_manager.py
"""
Cache management for Art DAG L1 server.

Integrates artdag's Cache, ActivityStore, and ActivityManager to provide:

- Content-addressed caching with both node_id and cid
- Activity tracking for runs (input/output/intermediate relationships)
- Deletion rules enforcement (shared items protected)
- L2 ActivityPub integration for "shared" status checks
- IPFS as durable backing store (local cache as hot storage)
- Redis-backed indexes for multi-worker consistency
"""
import hashlib
import json
import logging
import os
import shutil
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, Dict, List, Optional, Set, TYPE_CHECKING

import requests

if TYPE_CHECKING:
    import redis

from artdag import Cache, CacheEntry, DAG, Node, NodeType
from artdag.activities import Activity, ActivityStore, ActivityManager, make_is_shared_fn

import ipfs_client

logger = logging.getLogger(__name__)


def file_hash(path: Path, algorithm: str = "sha3_256") -> str:
    """Compute local content hash (fallback when IPFS is unavailable).

    Args:
        path: File to hash. Symlinks are resolved so the target's bytes
            are hashed, not the link itself.
        algorithm: Any algorithm name accepted by ``hashlib.new``.

    Returns:
        Hex digest of the file contents.
    """
    hasher = hashlib.new(algorithm)
    actual_path = path.resolve() if path.is_symlink() else path
    with open(actual_path, "rb") as f:
        # Stream in 64 KiB chunks to avoid loading large files into memory.
        for chunk in iter(lambda: f.read(65536), b""):
            hasher.update(chunk)
    return hasher.hexdigest()


def _decode(value):
    """Decode a Redis value to str if it arrives as bytes."""
    return value.decode() if isinstance(value, bytes) else value


@dataclass
class CachedFile:
    """
    A cached file with both identifiers.

    Provides a unified view combining:
    - node_id: computation identity (for DAG caching)
    - cid: file content identity (for external references)
    """
    node_id: str
    cid: str
    path: Path
    size_bytes: int
    node_type: str
    created_at: float

    @classmethod
    def from_cache_entry(cls, entry: CacheEntry) -> "CachedFile":
        """Build a CachedFile view from an artdag CacheEntry."""
        return cls(
            node_id=entry.node_id,
            cid=entry.cid,
            path=entry.output_path,
            size_bytes=entry.size_bytes,
            node_type=entry.node_type,
            created_at=entry.created_at,
        )


class L2SharedChecker:
    """
    Checks if content is shared (published) via L2 ActivityPub server.

    Caches results to avoid repeated API calls.
    """

    def __init__(self, l2_server: str, cache_ttl: int = 300):
        self.l2_server = l2_server
        self.cache_ttl = cache_ttl
        # cid -> (is_shared, timestamp of the check)
        self._cache: Dict[str, tuple[bool, float]] = {}

    def is_shared(self, cid: str) -> bool:
        """Check if cid has been published to L2.

        On transport errors the item is assumed to BE shared — the safer
        default, since "shared" protects content from deletion.
        """
        now = time.time()

        # Serve from cache while within the TTL window.
        if cid in self._cache:
            is_shared, cached_at = self._cache[cid]
            if now - cached_at < self.cache_ttl:
                logger.debug(f"L2 check (cached): {cid[:16]}... = {is_shared}")
                return is_shared

        # Query L2
        try:
            url = f"{self.l2_server}/assets/by-hash/{cid}"
            logger.info(f"L2 check: GET {url}")
            resp = requests.get(url, timeout=5)
            logger.info(f"L2 check response: {resp.status_code}")
            is_shared = resp.status_code == 200
        except Exception as e:
            logger.warning(f"Failed to check L2 for {cid}: {e}")
            # On error, assume IS shared (safer - prevents accidental deletion)
            is_shared = True

        self._cache[cid] = (is_shared, now)
        return is_shared

    def invalidate(self, cid: str):
        """Invalidate cache for a cid (call after publishing)."""
        self._cache.pop(cid, None)

    def mark_shared(self, cid: str):
        """Mark as shared without querying (call after successful publish)."""
        self._cache[cid] = (True, time.time())


class L1CacheManager:
    """
    Unified cache manager for Art DAG L1 server.

    Combines:
    - artdag Cache for file storage
    - ActivityStore for run tracking
    - ActivityManager for deletion rules
    - L2 integration for shared status

    Provides both node_id and cid based access.
    """

    def __init__(
        self,
        cache_dir: Path | str,
        l2_server: str = "http://localhost:8200",
        redis_client: Optional["redis.Redis"] = None,
    ):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        # Redis for shared state between workers
        self._redis = redis_client
        self._redis_content_key = "artdag:content_index"
        self._redis_ipfs_key = "artdag:ipfs_index"

        # artdag components
        self.cache = Cache(self.cache_dir / "nodes")
        self.activity_store = ActivityStore(self.cache_dir / "activities")

        # L2 shared checker
        self.l2_checker = L2SharedChecker(l2_server)

        # Activity manager with L2-based is_shared
        self.activity_manager = ActivityManager(
            cache=self.cache,
            activity_store=self.activity_store,
            is_shared_fn=self._is_shared_by_node_id,
        )

        # Content hash index: cid -> node_id
        # Uses Redis if available, falls back to in-memory dict
        self._content_index: Dict[str, str] = {}
        self._load_content_index()

        # IPFS CID index: cid -> ipfs_cid
        self._ipfs_cids: Dict[str, str] = {}
        self._load_ipfs_index()

        # Legacy files directory (for files uploaded directly by cid)
        self.legacy_dir = self.cache_dir / "legacy"
        self.legacy_dir.mkdir(parents=True, exist_ok=True)

    # ============ Content index (cid -> node_id) ============

    def _index_path(self) -> Path:
        return self.cache_dir / "content_index.json"

    def _load_content_index(self):
        """Load cid -> node_id index from Redis or JSON file."""
        # If Redis available and has data, use it
        if self._redis:
            try:
                redis_data = self._redis.hgetall(self._redis_content_key)
                if redis_data:
                    self._content_index = {
                        _decode(k): _decode(v) for k, v in redis_data.items()
                    }
                    logger.info(
                        f"Loaded {len(self._content_index)} content index entries from Redis"
                    )
                    return
            except Exception as e:
                logger.warning(f"Failed to load content index from Redis: {e}")

        # Fall back to JSON file
        if self._index_path().exists():
            try:
                with open(self._index_path()) as f:
                    self._content_index = json.load(f)
            except (json.JSONDecodeError, IOError) as e:
                logger.warning(f"Failed to load content index: {e}")
                self._content_index = {}

        # Also index from existing cache entries
        for entry in self.cache.list_entries():
            if entry.cid:
                self._content_index[entry.cid] = entry.node_id

        # Migrate to Redis if available
        if self._redis and self._content_index:
            try:
                self._redis.hset(self._redis_content_key, mapping=self._content_index)
                logger.info(
                    f"Migrated {len(self._content_index)} content index entries to Redis"
                )
            except Exception as e:
                logger.warning(f"Failed to migrate content index to Redis: {e}")

    def _save_content_index(self):
        """Save cid -> node_id index to JSON file (Redis is updated per-key)."""
        # Always save to JSON as backup
        with open(self._index_path(), "w") as f:
            json.dump(self._content_index, f, indent=2)

    def _set_content_index(self, cid: str, node_id: str):
        """Set a single content index entry (Redis + in-memory + JSON backup)."""
        self._content_index[cid] = node_id
        if self._redis:
            try:
                self._redis.hset(self._redis_content_key, cid, node_id)
            except Exception as e:
                logger.warning(f"Failed to set content index in Redis: {e}")
        self._save_content_index()

    def _get_content_index(self, cid: str) -> Optional[str]:
        """Get a content index entry (Redis-first, then in-memory)."""
        if self._redis:
            try:
                val = self._redis.hget(self._redis_content_key, cid)
                if val:
                    return _decode(val)
            except Exception as e:
                logger.warning(f"Failed to get content index from Redis: {e}")
        return self._content_index.get(cid)

    def _del_content_index(self, cid: str):
        """Delete a content index entry from all backing stores."""
        if cid in self._content_index:
            del self._content_index[cid]
        if self._redis:
            try:
                self._redis.hdel(self._redis_content_key, cid)
            except Exception as e:
                logger.warning(f"Failed to delete content index from Redis: {e}")
        self._save_content_index()

    # ============ IPFS index (cid -> ipfs_cid) ============

    def _ipfs_index_path(self) -> Path:
        return self.cache_dir / "ipfs_index.json"

    def _load_ipfs_index(self):
        """Load cid -> ipfs_cid index from Redis or JSON file."""
        # If Redis available and has data, use it
        if self._redis:
            try:
                redis_data = self._redis.hgetall(self._redis_ipfs_key)
                if redis_data:
                    self._ipfs_cids = {
                        _decode(k): _decode(v) for k, v in redis_data.items()
                    }
                    logger.info(
                        f"Loaded {len(self._ipfs_cids)} IPFS index entries from Redis"
                    )
                    return
            except Exception as e:
                logger.warning(f"Failed to load IPFS index from Redis: {e}")

        # Fall back to JSON file
        if self._ipfs_index_path().exists():
            try:
                with open(self._ipfs_index_path()) as f:
                    self._ipfs_cids = json.load(f)
            except (json.JSONDecodeError, IOError) as e:
                logger.warning(f"Failed to load IPFS index: {e}")
                self._ipfs_cids = {}

        # Migrate to Redis if available
        if self._redis and self._ipfs_cids:
            try:
                self._redis.hset(self._redis_ipfs_key, mapping=self._ipfs_cids)
                logger.info(
                    f"Migrated {len(self._ipfs_cids)} IPFS index entries to Redis"
                )
            except Exception as e:
                logger.warning(f"Failed to migrate IPFS index to Redis: {e}")

    def _save_ipfs_index(self):
        """Save cid -> ipfs_cid index to JSON file (backup)."""
        with open(self._ipfs_index_path(), "w") as f:
            json.dump(self._ipfs_cids, f, indent=2)

    def _set_ipfs_index(self, cid: str, ipfs_cid: str):
        """Set a single IPFS index entry (Redis + in-memory + JSON backup)."""
        self._ipfs_cids[cid] = ipfs_cid
        if self._redis:
            try:
                self._redis.hset(self._redis_ipfs_key, cid, ipfs_cid)
            except Exception as e:
                logger.warning(f"Failed to set IPFS index in Redis: {e}")
        self._save_ipfs_index()

    def _get_ipfs_cid_from_index(self, cid: str) -> Optional[str]:
        """Get IPFS CID from index (Redis-first, then in-memory)."""
        if self._redis:
            try:
                val = self._redis.hget(self._redis_ipfs_key, cid)
                if val:
                    return _decode(val)
            except Exception as e:
                logger.warning(f"Failed to get IPFS CID from Redis: {e}")
        return self._ipfs_cids.get(cid)

    def get_ipfs_cid(self, cid: str) -> Optional[str]:
        """Get IPFS CID for a content hash."""
        return self._get_ipfs_cid_from_index(cid)

    # ============ Shared-status / metadata ============

    def _is_shared_by_node_id(self, cid: str) -> bool:
        """is_shared_fn hook for ActivityManager; delegates to the L2 checker.

        NOTE(review): despite the name, the value passed here is treated as a
        content hash (cid) and forwarded to L2 as such.
        """
        return self.l2_checker.is_shared(cid)

    def _load_meta(self, cid: str) -> dict:
        """Load sidecar metadata ({cid}.meta.json) for a cached file."""
        meta_path = self.cache_dir / f"{cid}.meta.json"
        if meta_path.exists():
            with open(meta_path) as f:
                return json.load(f)
        return {}

    def is_pinned(self, cid: str) -> tuple[bool, str]:
        """
        Check if a cid is pinned (non-deletable).

        Returns:
            (is_pinned, reason) tuple
        """
        meta = self._load_meta(cid)
        if meta.get("pinned"):
            return True, meta.get("pin_reason", "published")
        return False, ""

    def _save_meta(self, cid: str, **updates) -> dict:
        """Merge updates into sidecar metadata for a cached file and persist."""
        meta = self._load_meta(cid)
        meta.update(updates)
        meta_path = self.cache_dir / f"{cid}.meta.json"
        with open(meta_path, "w") as f:
            json.dump(meta, f, indent=2)
        return meta

    def pin(self, cid: str, reason: str = "published") -> None:
        """Mark an item as pinned (non-deletable)."""
        self._save_meta(cid, pinned=True, pin_reason=reason)

    # ============ File Storage ============

    def put(
        self,
        source_path: Path,
        node_type: str = "upload",
        node_id: Optional[str] = None,
        execution_time: float = 0.0,
        move: bool = False,
    ) -> tuple[CachedFile, Optional[str]]:
        """
        Store a file in the cache and upload to IPFS.

        Args:
            source_path: Path to file to cache
            node_type: Type of node (e.g., "upload", "source", "effect")
            node_id: Optional node_id; if not provided, uses CID
            execution_time: How long the operation took
            move: If True, move instead of copy

        Returns:
            Tuple of (CachedFile with both node_id and cid, CID)
        """
        # Upload to IPFS first to get the CID (primary identifier)
        cid = ipfs_client.add_file(source_path)
        if not cid:
            # Fallback to local hash if IPFS unavailable
            cid = file_hash(source_path)
            logger.warning(f"IPFS unavailable, using local hash: {cid[:16]}...")

        # Use CID as node_id if not provided
        if node_id is None:
            node_id = cid

        # Check if already cached (by node_id)
        existing = self.cache.get_entry(node_id)
        if existing and existing.output_path.exists():
            return CachedFile.from_cache_entry(existing), cid

        # Store in local cache
        self.cache.put(
            node_id=node_id,
            source_path=source_path,
            node_type=node_type,
            execution_time=execution_time,
            move=move,
        )
        entry = self.cache.get_entry(node_id)

        # Update content index (CID -> node_id mapping)
        self._set_content_index(cid, node_id)

        # Also index by local hash if cid is an IPFS CID
        # This ensures both IPFS CID and local hash can be used to find the file
        if self._is_ipfs_cid(cid):
            local_hash = file_hash(source_path)
            if local_hash != cid:
                self._set_content_index(local_hash, node_id)
                logger.debug(f"Dual-indexed: {local_hash[:16]}... -> {node_id}")

        logger.info(f"Cached: {cid[:16]}...")
        return CachedFile.from_cache_entry(entry), cid

    def get_by_node_id(self, node_id: str) -> Optional[Path]:
        """Get cached file path by node_id."""
        return self.cache.get(node_id)

    def _is_ipfs_cid(self, identifier: str) -> bool:
        """Check if identifier looks like an IPFS CID."""
        # CIDv0 starts with "Qm", CIDv1 starts with "bafy" or other multibase prefixes
        return identifier.startswith(("Qm", "bafy", "baf"))

    def get_by_cid(self, cid: str) -> Optional[Path]:
        """Get cached file path by cid or IPFS CID.

        Falls back to IPFS if not in local cache.
        """
        # Check index first (Redis then local)
        node_id = self._get_content_index(cid)
        if node_id:
            path = self.cache.get(node_id)
            if path and path.exists():
                logger.debug(f" Found via index: {path}")
                return path

        # For uploads, node_id == cid, so try direct lookup
        # This works even if cache index hasn't been reloaded
        path = self.cache.get(cid)
        logger.debug(f" cache.get({cid[:16]}...) returned: {path}")
        if path and path.exists():
            self._set_content_index(cid, cid)
            return path

        # Scan cache entries (fallback for new structure)
        entry = self.cache.find_by_cid(cid)
        if entry and entry.output_path.exists():
            logger.debug(f" Found via scan: {entry.output_path}")
            self._set_content_index(cid, entry.node_id)
            return entry.output_path

        # Check legacy location (files stored directly as CACHE_DIR/{cid})
        legacy_path = self.cache_dir / cid
        if legacy_path.exists() and legacy_path.is_file():
            return legacy_path

        # Try to recover from IPFS if we have a CID
        ipfs_cid = self._get_ipfs_cid_from_index(cid)
        if ipfs_cid:
            logger.info(f"Recovering from IPFS: {cid[:16]}... ({ipfs_cid})")
            recovery_path = self.legacy_dir / cid
            if ipfs_client.get_file(ipfs_cid, recovery_path):
                logger.info(f"Recovered from IPFS: {recovery_path}")
                return recovery_path

        return None

    def has_content(self, cid: str) -> bool:
        """Check if content exists in cache."""
        return self.get_by_cid(cid) is not None

    def get_entry_by_cid(self, cid: str) -> Optional[CacheEntry]:
        """Get cache entry by cid."""
        node_id = self._get_content_index(cid)
        if node_id:
            return self.cache.get_entry(node_id)
        return self.cache.find_by_cid(cid)

    def list_all(self) -> List[CachedFile]:
        """List all cached files (new cache structure plus legacy files)."""
        files = []
        seen_hashes = set()

        # New cache structure entries
        for entry in self.cache.list_entries():
            files.append(CachedFile.from_cache_entry(entry))
            if entry.cid:
                seen_hashes.add(entry.cid)

        # Legacy files stored directly in cache_dir (old structure)
        # These are files named by cid directly in CACHE_DIR
        for f in self.cache_dir.iterdir():
            # Skip directories and special files
            if not f.is_file():
                continue
            # Skip metadata/auxiliary files
            if f.suffix in ('.json', '.mp4'):
                continue
            # Skip if name doesn't look like a hash (64 hex chars)
            if len(f.name) != 64 or not all(c in '0123456789abcdef' for c in f.name):
                continue
            # Skip if already seen via new cache
            if f.name in seen_hashes:
                continue
            files.append(CachedFile(
                node_id=f.name,
                cid=f.name,
                path=f,
                size_bytes=f.stat().st_size,
                node_type="legacy",
                created_at=f.stat().st_mtime,
            ))
            seen_hashes.add(f.name)

        return files

    def list_by_type(self, node_type: str) -> List[str]:
        """
        List CIDs of all cached files of a specific type.

        Args:
            node_type: Type to filter by (e.g., "recipe", "upload", "effect")

        Returns:
            List of CIDs (IPFS CID if available, otherwise node_id)
        """
        # node_id is the IPFS CID for uploaded content (see put()).
        return [
            entry.node_id
            for entry in self.cache.list_entries()
            if entry.node_type == node_type
        ]

    # ============ Activity Tracking ============

    def record_activity(self, dag: DAG, run_id: Optional[str] = None) -> Activity:
        """
        Record a DAG execution as an activity.

        Args:
            dag: The executed DAG
            run_id: Optional run ID to use as activity_id

        Returns:
            The created Activity
        """
        activity = Activity.from_dag(dag, activity_id=run_id)
        self.activity_store.add(activity)
        return activity

    def record_simple_activity(
        self,
        input_hashes: List[str],
        output_cid: str,
        run_id: Optional[str] = None,
    ) -> Activity:
        """
        Record a simple (non-DAG) execution as an activity.

        For legacy single-effect runs that don't use full DAG execution.
        Uses cid as node_id.
        """
        if run_id is None:
            # Derive a deterministic id from the content hashes.  The builtin
            # hash() is randomized per process (PYTHONHASHSEED), so it would
            # produce a different activity_id for identical inputs across
            # restarts; a sha256 digest is stable.
            digest = hashlib.sha256(
                json.dumps([sorted(input_hashes), output_cid]).encode("utf-8")
            ).hexdigest()
            run_id = digest

        activity = Activity(
            activity_id=run_id,
            input_ids=sorted(input_hashes),
            output_id=output_cid,
            intermediate_ids=[],
            created_at=datetime.now(timezone.utc).timestamp(),
            status="completed",
        )
        self.activity_store.add(activity)
        return activity

    def get_activity(self, activity_id: str) -> Optional[Activity]:
        """Get activity by ID."""
        return self.activity_store.get(activity_id)

    def list_activities(self) -> List[Activity]:
        """List all activities."""
        return self.activity_store.list()

    def find_activities_by_inputs(self, input_hashes: List[str]) -> List[Activity]:
        """Find activities with matching inputs (for UI grouping)."""
        return self.activity_store.find_by_input_ids(input_hashes)

    # ============ Deletion Rules ============

    def can_delete(self, cid: str) -> tuple[bool, str]:
        """
        Check if a cached item can be deleted.

        Returns:
            (can_delete, reason) tuple
        """
        # Check if pinned (published or input to published)
        pinned, reason = self.is_pinned(cid)
        if pinned:
            return False, f"Item is pinned ({reason})"

        # Find node_id for this content
        node_id = self._get_content_index(cid) or cid

        # Check if it's an input or output of any activity
        for activity in self.activity_store.list():
            if node_id in activity.input_ids:
                return False, f"Item is input to activity {activity.activity_id}"
            if node_id == activity.output_id:
                return False, f"Item is output of activity {activity.activity_id}"

        return True, "OK"

    def can_discard_activity(self, activity_id: str) -> tuple[bool, str]:
        """
        Check if an activity can be discarded.

        Returns:
            (can_discard, reason) tuple
        """
        activity = self.activity_store.get(activity_id)
        if not activity:
            return False, "Activity not found"

        # Check if any item is pinned
        for node_id in activity.all_node_ids:
            entry = self.cache.get_entry(node_id)
            if entry:
                pinned, reason = self.is_pinned(entry.cid)
                if pinned:
                    return False, f"Item {node_id} is pinned ({reason})"

        return True, "OK"

    def delete_by_cid(self, cid: str) -> tuple[bool, str]:
        """
        Delete a cached item by cid. Enforces deletion rules.

        Returns:
            (success, message) tuple
        """
        can_delete, reason = self.can_delete(cid)
        if not can_delete:
            return False, reason

        # Find and delete
        node_id = self._get_content_index(cid)
        if node_id:
            self.cache.remove(node_id)
            self._del_content_index(cid)
            return True, "Deleted"

        # Try legacy locations.  get_by_cid() serves legacy content from both
        # legacy_dir/{cid} and cache_dir/{cid}, so both must be checked here;
        # previously cache_dir/{cid} files could never be deleted.
        for legacy_path in (self.legacy_dir / cid, self.cache_dir / cid):
            if legacy_path.exists() and legacy_path.is_file():
                legacy_path.unlink()
                return True, "Deleted (legacy)"

        return False, "Not found"

    def discard_activity(self, activity_id: str) -> tuple[bool, str]:
        """
        Discard an activity and clean up its cache entries.
        Enforces deletion rules.

        Returns:
            (success, message) tuple
        """
        can_discard, reason = self.can_discard_activity(activity_id)
        if not can_discard:
            return False, reason

        success = self.activity_manager.discard_activity(activity_id)
        if success:
            return True, "Activity discarded"
        return False, "Failed to discard"

    def discard_activity_outputs_only(self, activity_id: str) -> tuple[bool, str]:
        """
        Discard an activity, deleting only outputs and intermediates.
        Inputs (cache items, configs) are preserved.

        Returns:
            (success, message) tuple
        """
        activity = self.activity_store.get(activity_id)
        if not activity:
            return False, "Activity not found"

        # Check if output is pinned
        if activity.output_id:
            entry = self.cache.get_entry(activity.output_id)
            if entry:
                pinned, reason = self.is_pinned(entry.cid)
                if pinned:
                    return False, f"Output is pinned ({reason})"

        # Delete output
        if activity.output_id:
            self._remove_entry_everywhere(activity.output_id)

        # Delete intermediates
        for node_id in activity.intermediate_ids:
            self._remove_entry_everywhere(node_id)

        # Remove activity record (inputs remain in cache)
        self.activity_store.remove(activity_id)

        return True, "Activity discarded (outputs only)"

    def _remove_entry_everywhere(self, node_id: str) -> None:
        """Remove a node from the cache, the content index, and legacy storage."""
        entry = self.cache.get_entry(node_id)
        if not entry:
            return
        self.cache.remove(node_id)
        # Remove from content index (Redis + local)
        self._del_content_index(entry.cid)
        # Delete from legacy dir if exists
        legacy_path = self.legacy_dir / entry.cid
        if legacy_path.exists():
            legacy_path.unlink()

    def cleanup_intermediates(self) -> int:
        """Delete all intermediate cache entries (reconstructible)."""
        return self.activity_manager.cleanup_intermediates()

    def get_deletable_items(self) -> List[CachedFile]:
        """Get all items that can be deleted."""
        return [
            CachedFile.from_cache_entry(entry)
            for entry in self.activity_manager.get_deletable_entries()
        ]

    # ============ L2 Integration ============

    def mark_published(self, cid: str):
        """Mark a cid as published to L2."""
        self.l2_checker.mark_shared(cid)

    def invalidate_shared_cache(self, cid: str):
        """Invalidate shared status cache (call if item might be unpublished)."""
        self.l2_checker.invalidate(cid)

    # ============ Stats ============

    def get_stats(self) -> dict:
        """Get cache statistics."""
        stats = self.cache.get_stats()
        return {
            "total_entries": stats.total_entries,
            "total_size_bytes": stats.total_size_bytes,
            "hits": stats.hits,
            "misses": stats.misses,
            "hit_rate": stats.hit_rate,
            "activities": len(self.activity_store),
        }


# Singleton instance (initialized on first import with env vars)
_manager: Optional[L1CacheManager] = None


def get_cache_manager() -> L1CacheManager:
    """Get the singleton cache manager instance.

    Configured via environment variables: CACHE_DIR, L2_SERVER, REDIS_URL.
    """
    global _manager
    if _manager is None:
        import redis
        from urllib.parse import urlparse

        cache_dir = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache")))
        l2_server = os.environ.get("L2_SERVER", "http://localhost:8200")

        # Initialize Redis client for shared cache index
        redis_url = os.environ.get('REDIS_URL', 'redis://localhost:6379/5')
        parsed = urlparse(redis_url)
        redis_client = redis.Redis(
            host=parsed.hostname or 'localhost',
            port=parsed.port or 6379,
            db=int(parsed.path.lstrip('/') or 0),
            socket_timeout=5,
            socket_connect_timeout=5
        )

        _manager = L1CacheManager(cache_dir=cache_dir, l2_server=l2_server, redis_client=redis_client)
    return _manager


def reset_cache_manager():
    """Reset the singleton (for testing)."""
    global _manager
    _manager = None