Files
celery/cache_manager.py

618 lines
21 KiB
Python

# art-celery/cache_manager.py
"""
Cache management for Art DAG L1 server.
Integrates artdag's Cache, ActivityStore, and ActivityManager to provide:
- Content-addressed caching with both node_id and content_hash
- Activity tracking for runs (input/output/intermediate relationships)
- Deletion rules enforcement (shared items protected)
- L2 ActivityPub integration for "shared" status checks
"""
import hashlib
import json
import logging
import os
import shutil
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, Dict, List, Optional, Set
import requests
from artdag import Cache, CacheEntry, DAG, Node, NodeType
from artdag.activities import Activity, ActivityStore, ActivityManager, make_is_shared_fn
logger = logging.getLogger(__name__)
def file_hash(path: Path, algorithm: str = "sha3_256") -> str:
"""Compute SHA3-256 hash of a file."""
hasher = hashlib.new(algorithm)
actual_path = path.resolve() if path.is_symlink() else path
with open(actual_path, "rb") as f:
for chunk in iter(lambda: f.read(65536), b""):
hasher.update(chunk)
return hasher.hexdigest()
@dataclass
class CachedFile:
"""
A cached file with both identifiers.
Provides a unified view combining:
- node_id: computation identity (for DAG caching)
- content_hash: file content identity (for external references)
"""
node_id: str
content_hash: str
path: Path
size_bytes: int
node_type: str
created_at: float
@classmethod
def from_cache_entry(cls, entry: CacheEntry) -> "CachedFile":
return cls(
node_id=entry.node_id,
content_hash=entry.content_hash,
path=entry.output_path,
size_bytes=entry.size_bytes,
node_type=entry.node_type,
created_at=entry.created_at,
)
class L2SharedChecker:
"""
Checks if content is shared (published) via L2 ActivityPub server.
Caches results to avoid repeated API calls.
"""
def __init__(self, l2_server: str, cache_ttl: int = 300):
self.l2_server = l2_server
self.cache_ttl = cache_ttl
self._cache: Dict[str, tuple[bool, float]] = {}
def is_shared(self, content_hash: str) -> bool:
"""Check if content_hash has been published to L2."""
import time
now = time.time()
# Check cache
if content_hash in self._cache:
is_shared, cached_at = self._cache[content_hash]
if now - cached_at < self.cache_ttl:
logger.debug(f"L2 check (cached): {content_hash[:16]}... = {is_shared}")
return is_shared
# Query L2
try:
url = f"{self.l2_server}/assets/by-hash/{content_hash}"
logger.info(f"L2 check: GET {url}")
resp = requests.get(url, timeout=5)
logger.info(f"L2 check response: {resp.status_code}")
is_shared = resp.status_code == 200
except Exception as e:
logger.warning(f"Failed to check L2 for {content_hash}: {e}")
# On error, assume IS shared (safer - prevents accidental deletion)
is_shared = True
self._cache[content_hash] = (is_shared, now)
return is_shared
def invalidate(self, content_hash: str):
"""Invalidate cache for a content_hash (call after publishing)."""
self._cache.pop(content_hash, None)
def mark_shared(self, content_hash: str):
"""Mark as shared without querying (call after successful publish)."""
import time
self._cache[content_hash] = (True, time.time())
class L1CacheManager:
"""
Unified cache manager for Art DAG L1 server.
Combines:
- artdag Cache for file storage
- ActivityStore for run tracking
- ActivityManager for deletion rules
- L2 integration for shared status
Provides both node_id and content_hash based access.
"""
def __init__(
self,
cache_dir: Path | str,
l2_server: str = "http://localhost:8200",
):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
# artdag components
self.cache = Cache(self.cache_dir / "nodes")
self.activity_store = ActivityStore(self.cache_dir / "activities")
# L2 shared checker
self.l2_checker = L2SharedChecker(l2_server)
# Activity manager with L2-based is_shared
self.activity_manager = ActivityManager(
cache=self.cache,
activity_store=self.activity_store,
is_shared_fn=self._is_shared_by_node_id,
)
# Content hash index: content_hash -> node_id
# This enables lookup by content_hash for API compatibility
self._content_index: Dict[str, str] = {}
self._load_content_index()
# Legacy files directory (for files uploaded directly by content_hash)
self.legacy_dir = self.cache_dir / "legacy"
self.legacy_dir.mkdir(parents=True, exist_ok=True)
def _index_path(self) -> Path:
return self.cache_dir / "content_index.json"
def _load_content_index(self):
"""Load content_hash -> node_id index."""
if self._index_path().exists():
try:
with open(self._index_path()) as f:
self._content_index = json.load(f)
except (json.JSONDecodeError, IOError) as e:
logger.warning(f"Failed to load content index: {e}")
self._content_index = {}
# Also index from existing cache entries
for entry in self.cache.list_entries():
if entry.content_hash:
self._content_index[entry.content_hash] = entry.node_id
def _save_content_index(self):
"""Save content_hash -> node_id index."""
with open(self._index_path(), "w") as f:
json.dump(self._content_index, f, indent=2)
def _is_shared_by_node_id(self, content_hash: str) -> bool:
"""Check if a content_hash is shared via L2."""
return self.l2_checker.is_shared(content_hash)
def _load_meta(self, content_hash: str) -> dict:
"""Load metadata for a cached file."""
meta_path = self.cache_dir / f"{content_hash}.meta.json"
if meta_path.exists():
with open(meta_path) as f:
return json.load(f)
return {}
def is_pinned(self, content_hash: str) -> tuple[bool, str]:
"""
Check if a content_hash is pinned (non-deletable).
Returns:
(is_pinned, reason) tuple
"""
meta = self._load_meta(content_hash)
if meta.get("pinned"):
return True, meta.get("pin_reason", "published")
return False, ""
def _save_meta(self, content_hash: str, **updates) -> dict:
"""Save/update metadata for a cached file."""
meta = self._load_meta(content_hash)
meta.update(updates)
meta_path = self.cache_dir / f"{content_hash}.meta.json"
with open(meta_path, "w") as f:
json.dump(meta, f, indent=2)
return meta
def pin(self, content_hash: str, reason: str = "published") -> None:
"""Mark an item as pinned (non-deletable)."""
self._save_meta(content_hash, pinned=True, pin_reason=reason)
# ============ File Storage ============
def put(
self,
source_path: Path,
node_type: str = "upload",
node_id: str = None,
execution_time: float = 0.0,
move: bool = False,
) -> CachedFile:
"""
Store a file in the cache.
Args:
source_path: Path to file to cache
node_type: Type of node (e.g., "upload", "source", "effect")
node_id: Optional node_id; if not provided, uses content_hash
execution_time: How long the operation took
move: If True, move instead of copy
Returns:
CachedFile with both node_id and content_hash
"""
# Compute content hash first
content_hash = file_hash(source_path)
# Use content_hash as node_id if not provided
# This is for legacy/uploaded files that don't have a DAG node
if node_id is None:
node_id = content_hash
# Check if already cached (by node_id)
existing = self.cache.get_entry(node_id)
if existing and existing.output_path.exists():
return CachedFile.from_cache_entry(existing)
# Store in cache
self.cache.put(
node_id=node_id,
source_path=source_path,
node_type=node_type,
execution_time=execution_time,
move=move,
)
entry = self.cache.get_entry(node_id)
# Update content index
self._content_index[entry.content_hash] = node_id
self._save_content_index()
return CachedFile.from_cache_entry(entry)
def get_by_node_id(self, node_id: str) -> Optional[Path]:
"""Get cached file path by node_id."""
return self.cache.get(node_id)
def get_by_content_hash(self, content_hash: str) -> Optional[Path]:
"""Get cached file path by content_hash."""
# Check index first (new cache structure)
node_id = self._content_index.get(content_hash)
if node_id:
path = self.cache.get(node_id)
if path and path.exists():
logger.info(f" Found via index: {path}")
return path
# For uploads, node_id == content_hash, so try direct lookup
# This works even if cache index hasn't been reloaded
path = self.cache.get(content_hash)
logger.info(f" cache.get({content_hash[:16]}...) returned: {path}")
if path and path.exists():
self._content_index[content_hash] = content_hash
self._save_content_index()
return path
# Scan cache entries (fallback for new structure)
entry = self.cache.find_by_content_hash(content_hash)
if entry and entry.output_path.exists():
logger.info(f" Found via scan: {entry.output_path}")
self._content_index[content_hash] = entry.node_id
self._save_content_index()
return entry.output_path
# Check legacy location (files stored directly as CACHE_DIR/{content_hash})
legacy_path = self.cache_dir / content_hash
if legacy_path.exists() and legacy_path.is_file():
return legacy_path
return None
def has_content(self, content_hash: str) -> bool:
"""Check if content exists in cache."""
return self.get_by_content_hash(content_hash) is not None
def get_entry_by_content_hash(self, content_hash: str) -> Optional[CacheEntry]:
"""Get cache entry by content_hash."""
node_id = self._content_index.get(content_hash)
if node_id:
return self.cache.get_entry(node_id)
return self.cache.find_by_content_hash(content_hash)
def list_all(self) -> List[CachedFile]:
"""List all cached files."""
files = []
seen_hashes = set()
# New cache structure entries
for entry in self.cache.list_entries():
files.append(CachedFile.from_cache_entry(entry))
if entry.content_hash:
seen_hashes.add(entry.content_hash)
# Legacy files stored directly in cache_dir (old structure)
# These are files named by content_hash directly in CACHE_DIR
for f in self.cache_dir.iterdir():
# Skip directories and special files
if not f.is_file():
continue
# Skip metadata/auxiliary files
if f.suffix in ('.json', '.mp4'):
continue
# Skip if name doesn't look like a hash (64 hex chars)
if len(f.name) != 64 or not all(c in '0123456789abcdef' for c in f.name):
continue
# Skip if already seen via new cache
if f.name in seen_hashes:
continue
files.append(CachedFile(
node_id=f.name,
content_hash=f.name,
path=f,
size_bytes=f.stat().st_size,
node_type="legacy",
created_at=f.stat().st_mtime,
))
seen_hashes.add(f.name)
return files
# ============ Activity Tracking ============
def record_activity(self, dag: DAG, run_id: str = None) -> Activity:
"""
Record a DAG execution as an activity.
Args:
dag: The executed DAG
run_id: Optional run ID to use as activity_id
Returns:
The created Activity
"""
activity = Activity.from_dag(dag, activity_id=run_id)
self.activity_store.add(activity)
return activity
def record_simple_activity(
self,
input_hashes: List[str],
output_hash: str,
run_id: str = None,
) -> Activity:
"""
Record a simple (non-DAG) execution as an activity.
For legacy single-effect runs that don't use full DAG execution.
Uses content_hash as node_id.
"""
activity = Activity(
activity_id=run_id or str(hash((tuple(input_hashes), output_hash))),
input_ids=sorted(input_hashes),
output_id=output_hash,
intermediate_ids=[],
created_at=datetime.now(timezone.utc).timestamp(),
status="completed",
)
self.activity_store.add(activity)
return activity
def get_activity(self, activity_id: str) -> Optional[Activity]:
"""Get activity by ID."""
return self.activity_store.get(activity_id)
def list_activities(self) -> List[Activity]:
"""List all activities."""
return self.activity_store.list()
def find_activities_by_inputs(self, input_hashes: List[str]) -> List[Activity]:
"""Find activities with matching inputs (for UI grouping)."""
return self.activity_store.find_by_input_ids(input_hashes)
# ============ Deletion Rules ============
def can_delete(self, content_hash: str) -> tuple[bool, str]:
"""
Check if a cached item can be deleted.
Returns:
(can_delete, reason) tuple
"""
# Check if pinned (published or input to published)
pinned, reason = self.is_pinned(content_hash)
if pinned:
return False, f"Item is pinned ({reason})"
# Find node_id for this content
node_id = self._content_index.get(content_hash, content_hash)
# Check if it's an input or output of any activity
for activity in self.activity_store.list():
if node_id in activity.input_ids:
return False, f"Item is input to activity {activity.activity_id}"
if node_id == activity.output_id:
return False, f"Item is output of activity {activity.activity_id}"
return True, "OK"
def can_discard_activity(self, activity_id: str) -> tuple[bool, str]:
"""
Check if an activity can be discarded.
Returns:
(can_discard, reason) tuple
"""
activity = self.activity_store.get(activity_id)
if not activity:
return False, "Activity not found"
# Check if any item is pinned
for node_id in activity.all_node_ids:
entry = self.cache.get_entry(node_id)
if entry:
pinned, reason = self.is_pinned(entry.content_hash)
if pinned:
return False, f"Item {node_id} is pinned ({reason})"
return True, "OK"
def delete_by_content_hash(self, content_hash: str) -> tuple[bool, str]:
"""
Delete a cached item by content_hash.
Enforces deletion rules.
Returns:
(success, message) tuple
"""
can_delete, reason = self.can_delete(content_hash)
if not can_delete:
return False, reason
# Find and delete
node_id = self._content_index.get(content_hash)
if node_id:
self.cache.remove(node_id)
del self._content_index[content_hash]
self._save_content_index()
return True, "Deleted"
# Try legacy
legacy_path = self.legacy_dir / content_hash
if legacy_path.exists():
legacy_path.unlink()
return True, "Deleted (legacy)"
return False, "Not found"
def discard_activity(self, activity_id: str) -> tuple[bool, str]:
"""
Discard an activity and clean up its cache entries.
Enforces deletion rules.
Returns:
(success, message) tuple
"""
can_discard, reason = self.can_discard_activity(activity_id)
if not can_discard:
return False, reason
success = self.activity_manager.discard_activity(activity_id)
if success:
return True, "Activity discarded"
return False, "Failed to discard"
def discard_activity_outputs_only(self, activity_id: str) -> tuple[bool, str]:
"""
Discard an activity, deleting only outputs and intermediates.
Inputs (cache items, configs) are preserved.
Returns:
(success, message) tuple
"""
activity = self.activity_store.get(activity_id)
if not activity:
return False, "Activity not found"
# Check if output is pinned
if activity.output_id:
entry = self.cache.get_entry(activity.output_id)
if entry:
pinned, reason = self.is_pinned(entry.content_hash)
if pinned:
return False, f"Output is pinned ({reason})"
# Delete output
if activity.output_id:
entry = self.cache.get_entry(activity.output_id)
if entry:
# Remove from cache
self.cache.remove(activity.output_id)
# Remove from content index
if entry.content_hash in self._content_index:
del self._content_index[entry.content_hash]
self._save_content_index()
# Delete from legacy dir if exists
legacy_path = self.legacy_dir / entry.content_hash
if legacy_path.exists():
legacy_path.unlink()
# Delete intermediates
for node_id in activity.intermediate_ids:
entry = self.cache.get_entry(node_id)
if entry:
self.cache.remove(node_id)
if entry.content_hash in self._content_index:
del self._content_index[entry.content_hash]
legacy_path = self.legacy_dir / entry.content_hash
if legacy_path.exists():
legacy_path.unlink()
if activity.intermediate_ids:
self._save_content_index()
# Remove activity record (inputs remain in cache)
self.activity_store.remove(activity_id)
return True, "Activity discarded (outputs only)"
def cleanup_intermediates(self) -> int:
"""Delete all intermediate cache entries (reconstructible)."""
return self.activity_manager.cleanup_intermediates()
def get_deletable_items(self) -> List[CachedFile]:
"""Get all items that can be deleted."""
deletable = []
for entry in self.activity_manager.get_deletable_entries():
deletable.append(CachedFile.from_cache_entry(entry))
return deletable
# ============ L2 Integration ============
def mark_published(self, content_hash: str):
"""Mark a content_hash as published to L2."""
self.l2_checker.mark_shared(content_hash)
def invalidate_shared_cache(self, content_hash: str):
"""Invalidate shared status cache (call if item might be unpublished)."""
self.l2_checker.invalidate(content_hash)
# ============ Stats ============
def get_stats(self) -> dict:
"""Get cache statistics."""
stats = self.cache.get_stats()
return {
"total_entries": stats.total_entries,
"total_size_bytes": stats.total_size_bytes,
"hits": stats.hits,
"misses": stats.misses,
"hit_rate": stats.hit_rate,
"activities": len(self.activity_store),
}
# Singleton instance (initialized on first import with env vars)
_manager: Optional[L1CacheManager] = None
def get_cache_manager() -> L1CacheManager:
"""Get the singleton cache manager instance."""
global _manager
if _manager is None:
cache_dir = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache")))
l2_server = os.environ.get("L2_SERVER", "http://localhost:8200")
_manager = L1CacheManager(cache_dir=cache_dir, l2_server=l2_server)
return _manager
def reset_cache_manager():
"""Reset the singleton (for testing)."""
global _manager
_manager = None