diff --git a/cache_manager.py b/cache_manager.py new file mode 100644 index 0000000..5f4f3fa --- /dev/null +++ b/cache_manager.py @@ -0,0 +1,494 @@ +# art-celery/cache_manager.py +""" +Cache management for Art DAG L1 server. + +Integrates artdag's Cache, ActivityStore, and ActivityManager to provide: +- Content-addressed caching with both node_id and content_hash +- Activity tracking for runs (input/output/intermediate relationships) +- Deletion rules enforcement (shared items protected) +- L2 ActivityPub integration for "shared" status checks +""" + +import hashlib +import json +import logging +import os +import shutil +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Callable, Dict, List, Optional, Set + +import requests + +from artdag import Cache, CacheEntry, DAG, Node, NodeType +from artdag.activities import Activity, ActivityStore, ActivityManager, make_is_shared_fn + +logger = logging.getLogger(__name__) + + +def file_hash(path: Path, algorithm: str = "sha3_256") -> str: + """Compute SHA3-256 hash of a file.""" + hasher = hashlib.new(algorithm) + actual_path = path.resolve() if path.is_symlink() else path + with open(actual_path, "rb") as f: + for chunk in iter(lambda: f.read(65536), b""): + hasher.update(chunk) + return hasher.hexdigest() + + +@dataclass +class CachedFile: + """ + A cached file with both identifiers. 
class L2SharedChecker:
    """
    Checks if content is shared (published) via L2 ActivityPub server.

    Definitive answers from L2 are cached for ``cache_ttl`` seconds to avoid
    repeated API calls. Failed lookups (network errors, 5xx responses) are
    never cached, so a transient outage cannot pin a wrong answer for a
    whole TTL window.
    """

    def __init__(self, l2_server: str, cache_ttl: int = 300):
        """
        Args:
            l2_server: Base URL of the L2 server.
            cache_ttl: Seconds a cached answer stays fresh.
        """
        self.l2_server = l2_server
        self.cache_ttl = cache_ttl
        # content_hash -> (is_shared, unix time the answer was recorded)
        self._cache: Dict[str, tuple[bool, float]] = {}

    @staticmethod
    def _last_known(cached: Optional[tuple]) -> bool:
        """Return the last recorded answer, or False when there is none.

        NOTE: with no prior answer a failed lookup reports "not shared",
        which *permits* deletion. This default is kept for backward
        compatibility with existing callers.
        """
        return cached[0] if cached is not None else False

    def is_shared(self, content_hash: str) -> bool:
        """
        Check if content_hash has been published to L2.

        Returns the cached answer while it is fresh; otherwise queries
        ``{l2_server}/registry/by-hash/{content_hash}`` and treats HTTP 200
        as "shared". If the lookup fails, the last known (possibly expired)
        answer is returned instead of assuming "not shared".
        """
        import time
        now = time.time()

        cached = self._cache.get(content_hash)
        if cached is not None and now - cached[1] < self.cache_ttl:
            return cached[0]

        try:
            resp = requests.get(
                f"{self.l2_server}/registry/by-hash/{content_hash}",
                timeout=5,
            )
        except Exception as e:
            logger.warning(f"Failed to check L2 for {content_hash}: {e}")
            return self._last_known(cached)

        if resp.status_code >= 500:
            # A server-side failure says nothing about publication status.
            logger.warning(
                f"Failed to check L2 for {content_hash}: HTTP {resp.status_code}"
            )
            return self._last_known(cached)

        is_shared = resp.status_code == 200
        self._cache[content_hash] = (is_shared, now)
        return is_shared

    def invalidate(self, content_hash: str):
        """Invalidate cache for a content_hash (call after publishing)."""
        self._cache.pop(content_hash, None)

    def mark_shared(self, content_hash: str):
        """Mark as shared without querying (call after successful publish)."""
        import time
        self._cache[content_hash] = (True, time.time())
def _save_content_index(self):
    """
    Persist the content_hash -> node_id index to disk atomically.

    The index is first written to a temporary file next to the real one and
    then moved into place with ``os.replace``. A crash or a serialization
    error part-way through therefore leaves the previous index file intact
    instead of a truncated one.

    Raises:
        OSError: If the temporary file cannot be written or moved.
        TypeError: If the index holds values that JSON cannot serialize.
    """
    index_path = self._index_path()
    # Include the pid so concurrent writers never share a temp file.
    tmp_path = index_path.with_name(f"{index_path.name}.{os.getpid()}.tmp")
    try:
        with open(tmp_path, "w") as f:
            json.dump(self._content_index, f, indent=2)
        os.replace(tmp_path, index_path)
    finally:
        # No-op after a successful replace; removes the partial file on error.
        tmp_path.unlink(missing_ok=True)
def put(
    self,
    source_path: Path,
    node_type: str = "upload",
    node_id: Optional[str] = None,
    execution_time: float = 0.0,
    move: bool = False,
) -> "CachedFile":
    """
    Store a file in the cache.

    Args:
        source_path: Path to file to cache
        node_type: Type of node (e.g., "upload", "source", "effect")
        node_id: Optional node_id; if not provided, the file's content
            hash is used (legacy/uploaded files without a DAG node)
        execution_time: How long the operation took
        move: If True, move instead of copy

    Returns:
        CachedFile with both node_id and content_hash. If an entry for
        node_id already exists and its file is present, that entry is
        returned and source_path is left untouched.

    Raises:
        RuntimeError: If the underlying cache reports no entry after the
            store.
    """
    # The content hash is only needed as a fallback identifier, so the
    # file is not read when the caller already supplies a node_id.
    if node_id is None:
        node_id = file_hash(source_path)

    # Check if already cached (by node_id)
    existing = self.cache.get_entry(node_id)
    if existing and existing.output_path.exists():
        return CachedFile.from_cache_entry(existing)

    # Store in cache
    self.cache.put(
        node_id=node_id,
        source_path=source_path,
        node_type=node_type,
        execution_time=execution_time,
        move=move,
    )

    entry = self.cache.get_entry(node_id)
    if entry is None:
        # Fail with a clear message instead of an AttributeError below.
        raise RuntimeError(f"Cache did not record an entry for node {node_id}")

    # Update content index so the file is reachable by content_hash too
    self._content_index[entry.content_hash] = node_id
    self._save_content_index()

    return CachedFile.from_cache_entry(entry)
def get_entry_by_content_hash(self, content_hash: str) -> Optional["CacheEntry"]:
    """
    Get cache entry by content_hash.

    The in-memory index is consulted first. If the index has no mapping,
    or maps to a node_id the cache no longer knows, the lookup falls back
    to scanning the cache by content hash (the same fallback that
    ``get_by_content_hash`` uses).

    Returns:
        The matching entry, or whatever the cache scan returns when there
        is no usable index hit.
    """
    node_id = self._content_index.get(content_hash)
    if node_id:
        entry = self.cache.get_entry(node_id)
        if entry is not None:
            return entry
    # Index miss or stale mapping: ask the cache directly.
    return self.cache.find_by_content_hash(content_hash)
def can_delete(self, content_hash: str) -> tuple[bool, str]:
    """
    Check if a cached item can be deleted.

    An item is protected when it is published to L2, or when it is an
    input or the output of any recorded activity.

    Returns:
        (can_delete, reason) tuple; reason is "OK" when deletion is allowed
    """
    # Check if shared via L2
    if self.l2_checker.is_shared(content_hash):
        return False, "Item is published to L2"

    # Activities may reference this item by its indexed node_id or, for
    # simple activities (which use content_hash as node_id), by the hash
    # itself. Match either so neither form slips past the check.
    identifiers = {
        content_hash,
        self._content_index.get(content_hash, content_hash),
    }

    for activity in self.activity_store.list():
        if not identifiers.isdisjoint(activity.input_ids):
            return False, f"Item is input to activity {activity.activity_id}"
        if activity.output_id in identifiers:
            return False, f"Item is output of activity {activity.activity_id}"

    return True, "OK"
def delete_by_content_hash(self, content_hash: str) -> tuple[bool, str]:
    """
    Delete a cached item by content_hash.

    Enforces deletion rules via ``can_delete``. Both storage locations are
    cleaned: the indexed cache entry and any copy in the legacy directory.
    Otherwise a leftover legacy file would keep the content reachable
    after a "successful" delete.

    Returns:
        (success, message) tuple. The message is "Deleted" when an indexed
        entry was removed, "Deleted (legacy)" when only a legacy file was
        removed, "Not found" when neither existed, or the refusal reason.
    """
    can_delete, reason = self.can_delete(content_hash)
    if not can_delete:
        return False, reason

    # Remove the indexed cache entry, if any
    node_id = self._content_index.get(content_hash)
    removed_indexed = bool(node_id)
    if removed_indexed:
        self.cache.remove(node_id)
        del self._content_index[content_hash]
        self._save_content_index()

    # Remove the legacy copy, if any (may coexist with an indexed entry)
    legacy_path = self.legacy_dir / content_hash
    removed_legacy = legacy_path.exists()
    if removed_legacy:
        legacy_path.unlink()

    if removed_indexed:
        return True, "Deleted"
    if removed_legacy:
        return True, "Deleted (legacy)"
    return False, "Not found"
def get_stats(self) -> dict:
    """
    Summarize cache usage.

    Returns:
        Dict with the underlying cache's counters (total_entries,
        total_size_bytes, hits, misses, hit_rate) plus "activities",
        the number of items in the activity store.
    """
    cache_stats = self.cache.get_stats()
    counter_names = (
        "total_entries",
        "total_size_bytes",
        "hits",
        "misses",
        "hit_rate",
    )
    summary = {name: getattr(cache_stats, name) for name in counter_names}
    summary["activities"] = len(self.activity_store)
    return summary
testing).""" + global _manager + _manager = None diff --git a/server.py b/server.py index 9e8729d..de3bbe1 100644 --- a/server.py +++ b/server.py @@ -27,6 +27,7 @@ from urllib.parse import urlparse from celery_app import app as celery_app from tasks import render_effect +from cache_manager import L1CacheManager, get_cache_manager # L2 server for auth verification L2_SERVER = os.environ.get("L2_SERVER", "http://localhost:8200") @@ -37,6 +38,9 @@ L1_PUBLIC_URL = os.environ.get("L1_PUBLIC_URL", "http://localhost:8100") CACHE_DIR = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache"))) CACHE_DIR.mkdir(parents=True, exist_ok=True) +# Initialize L1 cache manager with artdag integration +cache_manager = L1CacheManager(cache_dir=CACHE_DIR, l2_server=L2_SERVER) + # Redis for persistent run storage REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/5') parsed = urlparse(REDIS_URL) @@ -152,14 +156,14 @@ def file_hash(path: Path) -> str: return hasher.hexdigest() -def cache_file(source: Path) -> str: - """Copy file to cache, return content hash.""" - content_hash = file_hash(source) - cache_path = CACHE_DIR / content_hash - if not cache_path.exists(): - import shutil - shutil.copy2(source, cache_path) - return content_hash +def cache_file(source: Path, node_type: str = "output") -> str: + """ + Copy file to cache using L1CacheManager, return content hash. + + Uses artdag's Cache internally for proper tracking. 
@app.delete("/runs/{run_id}")
async def discard_run(run_id: str, username: str = Depends(get_required_user)):
    """
    Discard (delete) a run and its intermediate cache entries.

    Enforces deletion rules:
    - Cannot discard if any item (input, output) is published to L2
    - Deletes intermediate cache entries
    - Keeps inputs (may be used by other runs)
    - Deletes orphaned outputs

    Runs that never had an activity recorded (an activity is only recorded
    for a completed run with both inputs and an output hash) have no cache
    entries to clean up; for those only the stored run record is removed.

    Raises:
        HTTPException: 404 if the run is unknown, 403 if the caller does not
            own it, 400 if a deletion rule forbids the discard, 500 if the
            cache manager fails to discard the activity.
    """
    run = load_run(run_id)
    if not run:
        raise HTTPException(404, f"Run {run_id} not found")

    # Check ownership
    actor_id = f"@{username}@{L2_DOMAIN}"
    if run.username not in (username, actor_id):
        raise HTTPException(403, "Access denied")

    # Only enforce cache rules when an activity exists for this run.
    # Without this guard, failed or unfinished runs would always be
    # rejected with "Activity not found" and could never be deleted.
    if cache_manager.get_activity(run_id) is not None:
        # Check if run can be discarded
        can_discard, reason = cache_manager.can_discard_activity(run_id)
        if not can_discard:
            raise HTTPException(400, f"Cannot discard run: {reason}")

        # Discard the activity (cleans up cache entries)
        success, msg = cache_manager.discard_activity(run_id)
        if not success:
            raise HTTPException(500, f"Failed to discard: {msg}")

    # Remove from Redis
    redis_client.delete(f"{RUNS_KEY_PREFIX}{run_id}")

    return {"discarded": True, "run_id": run_id}
HTML for browsers, JSON for APIs.""" @@ -1428,31 +1476,45 @@ async def discard_cache(content_hash: str, username: str = Depends(get_required_ """ Discard (delete) a cached item. - Refuses to delete pinned items. Pinned items include: - - Published items - - Inputs to published items + Enforces deletion rules: + - Cannot delete items published to L2 (shared) + - Cannot delete inputs/outputs of activities (runs) + - Cannot delete pinned items """ - cache_path = CACHE_DIR / content_hash - if not cache_path.exists(): - raise HTTPException(404, "Content not found") + # Check if content exists (in cache_manager or legacy location) + if not cache_manager.has_content(content_hash): + cache_path = CACHE_DIR / content_hash + if not cache_path.exists(): + raise HTTPException(404, "Content not found") # Check ownership user_hashes = get_user_cache_hashes(username) if content_hash not in user_hashes: raise HTTPException(403, "Access denied") - # Check if pinned + # Check if pinned (legacy metadata) meta = load_cache_meta(content_hash) if meta.get("pinned"): pin_reason = meta.get("pin_reason", "unknown") raise HTTPException(400, f"Cannot discard pinned item (reason: {pin_reason})") - # Delete the file and metadata - cache_path.unlink() + # Check deletion rules via cache_manager + can_delete, reason = cache_manager.can_delete(content_hash) + if not can_delete: + raise HTTPException(400, f"Cannot discard: {reason}") + + # Delete via cache_manager + success, msg = cache_manager.delete_by_content_hash(content_hash) + if not success: + # Fallback to legacy deletion + cache_path = CACHE_DIR / content_hash + if cache_path.exists(): + cache_path.unlink() + + # Clean up legacy metadata files meta_path = CACHE_DIR / f"{content_hash}.meta.json" if meta_path.exists(): meta_path.unlink() - # Also delete transcoded mp4 if exists mp4_path = CACHE_DIR / f"{content_hash}.mp4" if mp4_path.exists(): mp4_path.unlink() @@ -1472,18 +1534,32 @@ async def ui_discard_cache(content_hash: str, request: 
Request): if content_hash not in user_hashes: return '