Integrate artdag cache with deletion rules
- Add cache_manager.py with L1CacheManager wrapping artdag Cache
- Add L2SharedChecker for checking published status via L2 API
- Update server.py to use cache_manager for storage
- Update DELETE /cache/{content_hash} to enforce deletion rules
- Add DELETE /runs/{run_id} endpoint for discarding runs
- Record activities when runs complete for deletion tracking
- Add comprehensive tests for cache manager
Deletion rules enforced:
- Cannot delete items published to L2
- Cannot delete inputs/outputs of runs
- Can delete orphaned items
- Runs can only be discarded if no items are shared
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
494
cache_manager.py
Normal file
494
cache_manager.py
Normal file
@@ -0,0 +1,494 @@
|
||||
# art-celery/cache_manager.py
|
||||
"""
|
||||
Cache management for Art DAG L1 server.
|
||||
|
||||
Integrates artdag's Cache, ActivityStore, and ActivityManager to provide:
|
||||
- Content-addressed caching with both node_id and content_hash
|
||||
- Activity tracking for runs (input/output/intermediate relationships)
|
||||
- Deletion rules enforcement (shared items protected)
|
||||
- L2 ActivityPub integration for "shared" status checks
|
||||
"""
|
||||
|
||||
import hashlib
import json
import logging
import os
import shutil
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, Dict, List, Optional, Set

import requests

from artdag import Cache, CacheEntry, DAG, Node, NodeType
from artdag.activities import Activity, ActivityStore, ActivityManager, make_is_shared_fn
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def file_hash(path: Path, algorithm: str = "sha3_256") -> str:
    """Return the hex digest of the file's contents using *algorithm*."""
    digest = hashlib.new(algorithm)
    # Hash the symlink target rather than the link itself.
    target = path.resolve() if path.is_symlink() else path
    with open(target, "rb") as fh:
        while chunk := fh.read(65536):
            digest.update(chunk)
    return digest.hexdigest()
|
||||
|
||||
|
||||
@dataclass
class CachedFile:
    """
    A cached file with both identifiers.

    Unifies the two ways a cached item can be addressed:
    - node_id: computation identity (for DAG caching)
    - content_hash: file content identity (for external references)
    """
    node_id: str
    content_hash: str
    path: Path
    size_bytes: int
    node_type: str
    created_at: float

    @classmethod
    def from_cache_entry(cls, entry: CacheEntry) -> "CachedFile":
        """Build a CachedFile view over an artdag CacheEntry."""
        return cls(
            entry.node_id,
            entry.content_hash,
            entry.output_path,
            entry.size_bytes,
            entry.node_type,
            entry.created_at,
        )
|
||||
|
||||
|
||||
class L2SharedChecker:
    """
    Checks if content is shared (published) via L2 ActivityPub server.

    Caches results for ``cache_ttl`` seconds to avoid repeated API calls
    for the same content hash.
    """

    def __init__(self, l2_server: str, cache_ttl: int = 300):
        """
        Args:
            l2_server: Base URL of the L2 server, e.g. "http://localhost:8200".
            cache_ttl: Seconds a cached shared/not-shared answer stays valid.
        """
        self.l2_server = l2_server
        self.cache_ttl = cache_ttl
        # content_hash -> (is_shared, cached_at as unix seconds)
        self._cache: Dict[str, tuple[bool, float]] = {}

    def is_shared(self, content_hash: str) -> bool:
        """Check if content_hash has been published to L2."""
        now = time.time()

        # Check cache: serve the stored answer while it is still fresh.
        if content_hash in self._cache:
            is_shared, cached_at = self._cache[content_hash]
            if now - cached_at < self.cache_ttl:
                return is_shared

        # Query L2; a 200 response for the hash means it is published.
        try:
            resp = requests.get(
                f"{self.l2_server}/registry/by-hash/{content_hash}",
                timeout=5
            )
            is_shared = resp.status_code == 200
        except Exception as e:
            # Deliberately broad: any lookup failure is treated as "unknown".
            logger.warning("Failed to check L2 for %s: %s", content_hash, e)
            # On error, assume not shared (safer for deletion)
            is_shared = False

        self._cache[content_hash] = (is_shared, now)
        return is_shared

    def invalidate(self, content_hash: str):
        """Invalidate cache for a content_hash (call after publishing)."""
        self._cache.pop(content_hash, None)

    def mark_shared(self, content_hash: str):
        """Mark as shared without querying (call after successful publish)."""
        self._cache[content_hash] = (True, time.time())
|
||||
|
||||
|
||||
class L1CacheManager:
    """
    Unified cache manager for Art DAG L1 server.

    Combines:
    - artdag Cache for file storage
    - ActivityStore for run tracking
    - ActivityManager for deletion rules
    - L2 integration for shared status

    Provides both node_id and content_hash based access.
    """

    def __init__(
        self,
        cache_dir: Path | str,
        l2_server: str = "http://localhost:8200",
    ):
        """
        Args:
            cache_dir: Root directory for all cache state (created if missing).
            l2_server: Base URL of the L2 ActivityPub server.
        """
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        # artdag components
        self.cache = Cache(self.cache_dir / "nodes")
        self.activity_store = ActivityStore(self.cache_dir / "activities")

        # L2 shared checker
        self.l2_checker = L2SharedChecker(l2_server)

        # Activity manager with L2-based is_shared
        self.activity_manager = ActivityManager(
            cache=self.cache,
            activity_store=self.activity_store,
            is_shared_fn=self._is_shared_by_node_id,
        )

        # Content hash index: content_hash -> node_id
        # This enables lookup by content_hash for API compatibility
        self._content_index: Dict[str, str] = {}
        self._load_content_index()

        # Legacy files directory (for files uploaded directly by content_hash)
        self.legacy_dir = self.cache_dir / "legacy"
        self.legacy_dir.mkdir(parents=True, exist_ok=True)

    def _index_path(self) -> Path:
        """Path of the persisted content_hash -> node_id index file."""
        return self.cache_dir / "content_index.json"

    def _load_content_index(self):
        """Load content_hash -> node_id index."""
        if self._index_path().exists():
            try:
                with open(self._index_path()) as f:
                    self._content_index = json.load(f)
            except (json.JSONDecodeError, OSError) as e:
                logger.warning(f"Failed to load content index: {e}")
                self._content_index = {}

        # Also index from existing cache entries, so entries created before
        # the index file existed (or by other writers) remain discoverable.
        for entry in self.cache.list_entries():
            if entry.content_hash:
                self._content_index[entry.content_hash] = entry.node_id

    def _save_content_index(self):
        """Save content_hash -> node_id index."""
        with open(self._index_path(), "w") as f:
            json.dump(self._content_index, f, indent=2)

    def _is_shared_by_node_id(self, content_hash: str) -> bool:
        """Check if a content_hash is shared via L2.

        NOTE(review): despite the name, this receives and forwards a
        content_hash; it assumes ActivityManager calls is_shared_fn with
        content hashes — confirm against artdag's is_shared_fn contract.
        """
        return self.l2_checker.is_shared(content_hash)

    # ============ File Storage ============

    def put(
        self,
        source_path: Path,
        node_type: str = "upload",
        node_id: Optional[str] = None,
        execution_time: float = 0.0,
        move: bool = False,
    ) -> CachedFile:
        """
        Store a file in the cache.

        Args:
            source_path: Path to file to cache
            node_type: Type of node (e.g., "upload", "source", "effect")
            node_id: Optional node_id; if not provided, uses content_hash
            execution_time: How long the operation took
            move: If True, move instead of copy

        Returns:
            CachedFile with both node_id and content_hash
        """
        # Compute content hash first
        content_hash = file_hash(source_path)

        # Use content_hash as node_id if not provided
        # This is for legacy/uploaded files that don't have a DAG node
        if node_id is None:
            node_id = content_hash

        # Check if already cached (by node_id); dedupe on existing entries.
        existing = self.cache.get_entry(node_id)
        if existing and existing.output_path.exists():
            return CachedFile.from_cache_entry(existing)

        # Store in cache
        self.cache.put(
            node_id=node_id,
            source_path=source_path,
            node_type=node_type,
            execution_time=execution_time,
            move=move,
        )

        entry = self.cache.get_entry(node_id)

        # Update content index so content_hash lookups find this node.
        self._content_index[entry.content_hash] = node_id
        self._save_content_index()

        return CachedFile.from_cache_entry(entry)

    def get_by_node_id(self, node_id: str) -> Optional[Path]:
        """Get cached file path by node_id."""
        return self.cache.get(node_id)

    def get_by_content_hash(self, content_hash: str) -> Optional[Path]:
        """Get cached file path by content_hash.

        Resolution order: content index, legacy directory, then a full
        scan of cache entries (which also repairs the index on a hit).
        """
        # Check index first
        node_id = self._content_index.get(content_hash)
        if node_id:
            path = self.cache.get(node_id)
            if path:
                return path

        # Check legacy directory
        legacy_path = self.legacy_dir / content_hash
        if legacy_path.exists():
            return legacy_path

        # Scan cache entries (fallback)
        entry = self.cache.find_by_content_hash(content_hash)
        if entry:
            self._content_index[content_hash] = entry.node_id
            self._save_content_index()
            return entry.output_path

        return None

    def has_content(self, content_hash: str) -> bool:
        """Check if content exists in cache."""
        return self.get_by_content_hash(content_hash) is not None

    def get_entry_by_content_hash(self, content_hash: str) -> Optional[CacheEntry]:
        """Get cache entry by content_hash."""
        node_id = self._content_index.get(content_hash)
        if node_id:
            return self.cache.get_entry(node_id)
        return self.cache.find_by_content_hash(content_hash)

    def list_all(self) -> List[CachedFile]:
        """List all cached files (regular cache entries plus legacy files)."""
        files = []
        for entry in self.cache.list_entries():
            files.append(CachedFile.from_cache_entry(entry))

        # Include legacy files; their filename doubles as both identifiers.
        for f in self.legacy_dir.iterdir():
            if f.is_file():
                files.append(CachedFile(
                    node_id=f.name,
                    content_hash=f.name,
                    path=f,
                    size_bytes=f.stat().st_size,
                    node_type="legacy",
                    created_at=f.stat().st_mtime,
                ))

        return files

    # ============ Activity Tracking ============

    def record_activity(self, dag: DAG, run_id: Optional[str] = None) -> Activity:
        """
        Record a DAG execution as an activity.

        Args:
            dag: The executed DAG
            run_id: Optional run ID to use as activity_id

        Returns:
            The created Activity
        """
        activity = Activity.from_dag(dag, activity_id=run_id)
        self.activity_store.add(activity)
        return activity

    def record_simple_activity(
        self,
        input_hashes: List[str],
        output_hash: str,
        run_id: Optional[str] = None,
    ) -> Activity:
        """
        Record a simple (non-DAG) execution as an activity.

        For legacy single-effect runs that don't use full DAG execution.
        Uses content_hash as node_id.
        """
        if run_id is None:
            # Derive a deterministic id from the input/output hashes.  The
            # builtin hash() is salted per process (PYTHONHASHSEED), so it
            # must never be used to generate persisted identifiers.
            payload = json.dumps([sorted(input_hashes), output_hash])
            run_id = hashlib.sha3_256(payload.encode()).hexdigest()
        activity = Activity(
            activity_id=run_id,
            input_ids=sorted(input_hashes),
            output_id=output_hash,
            intermediate_ids=[],
            created_at=datetime.now(timezone.utc).timestamp(),
            status="completed",
        )
        self.activity_store.add(activity)
        return activity

    def get_activity(self, activity_id: str) -> Optional[Activity]:
        """Get activity by ID."""
        return self.activity_store.get(activity_id)

    def list_activities(self) -> List[Activity]:
        """List all activities."""
        return self.activity_store.list()

    def find_activities_by_inputs(self, input_hashes: List[str]) -> List[Activity]:
        """Find activities with matching inputs (for UI grouping)."""
        return self.activity_store.find_by_input_ids(input_hashes)

    # ============ Deletion Rules ============

    def can_delete(self, content_hash: str) -> tuple[bool, str]:
        """
        Check if a cached item can be deleted.

        An item is protected when it is published to L2 or when it is an
        input/output of any recorded activity.  Intermediates and orphans
        are deletable.

        Returns:
            (can_delete, reason) tuple
        """
        # Check if shared via L2
        if self.l2_checker.is_shared(content_hash):
            return False, "Item is published to L2"

        # Find node_id for this content; for uploads the hash itself is
        # the node_id, hence the fallback default.
        node_id = self._content_index.get(content_hash, content_hash)

        # Check if it's an input or output of any activity
        for activity in self.activity_store.list():
            if node_id in activity.input_ids:
                return False, f"Item is input to activity {activity.activity_id}"
            if node_id == activity.output_id:
                return False, f"Item is output of activity {activity.activity_id}"

        return True, "OK"

    def can_discard_activity(self, activity_id: str) -> tuple[bool, str]:
        """
        Check if an activity can be discarded.

        Discarding is refused if any of the activity's items has been
        published to L2.

        Returns:
            (can_discard, reason) tuple
        """
        activity = self.activity_store.get(activity_id)
        if not activity:
            return False, "Activity not found"

        # Check if any item is shared
        for node_id in activity.all_node_ids:
            entry = self.cache.get_entry(node_id)
            if entry and self.l2_checker.is_shared(entry.content_hash):
                return False, f"Item {node_id} is published to L2"

        return True, "OK"

    def delete_by_content_hash(self, content_hash: str) -> tuple[bool, str]:
        """
        Delete a cached item by content_hash.

        Enforces deletion rules.

        Returns:
            (success, message) tuple
        """
        can_delete, reason = self.can_delete(content_hash)
        if not can_delete:
            return False, reason

        # Find and delete
        node_id = self._content_index.get(content_hash)
        if node_id:
            self.cache.remove(node_id)
            del self._content_index[content_hash]
            self._save_content_index()
            return True, "Deleted"

        # Try legacy
        legacy_path = self.legacy_dir / content_hash
        if legacy_path.exists():
            legacy_path.unlink()
            return True, "Deleted (legacy)"

        # Fall back to scanning cache entries, mirroring get_by_content_hash;
        # otherwise an item retrievable by hash could never be deleted.
        entry = self.cache.find_by_content_hash(content_hash)
        if entry:
            self.cache.remove(entry.node_id)
            self._content_index.pop(content_hash, None)
            self._save_content_index()
            return True, "Deleted"

        return False, "Not found"

    def discard_activity(self, activity_id: str) -> tuple[bool, str]:
        """
        Discard an activity and clean up its cache entries.

        Enforces deletion rules.

        Returns:
            (success, message) tuple
        """
        can_discard, reason = self.can_discard_activity(activity_id)
        if not can_discard:
            return False, reason

        success = self.activity_manager.discard_activity(activity_id)
        if success:
            return True, "Activity discarded"
        return False, "Failed to discard"

    def cleanup_intermediates(self) -> int:
        """Delete all intermediate cache entries (reconstructible)."""
        return self.activity_manager.cleanup_intermediates()

    def get_deletable_items(self) -> List[CachedFile]:
        """Get all items that can be deleted."""
        deletable = []
        for entry in self.activity_manager.get_deletable_entries():
            deletable.append(CachedFile.from_cache_entry(entry))
        return deletable

    # ============ L2 Integration ============

    def mark_published(self, content_hash: str):
        """Mark a content_hash as published to L2."""
        self.l2_checker.mark_shared(content_hash)

    def invalidate_shared_cache(self, content_hash: str):
        """Invalidate shared status cache (call if item might be unpublished)."""
        self.l2_checker.invalidate(content_hash)

    # ============ Stats ============

    def get_stats(self) -> dict:
        """Get cache statistics as a plain dict (JSON-serializable)."""
        stats = self.cache.get_stats()
        return {
            "total_entries": stats.total_entries,
            "total_size_bytes": stats.total_size_bytes,
            "hits": stats.hits,
            "misses": stats.misses,
            "hit_rate": stats.hit_rate,
            "activities": len(self.activity_store),
        }
|
||||
|
||||
|
||||
# Singleton instance (initialized on first import with env vars)
_manager: Optional[L1CacheManager] = None


def get_cache_manager() -> L1CacheManager:
    """Return the process-wide cache manager, creating it on first call."""
    global _manager
    if _manager is not None:
        return _manager
    default_dir = str(Path.home() / ".artdag" / "cache")
    _manager = L1CacheManager(
        cache_dir=Path(os.environ.get("CACHE_DIR", default_dir)),
        l2_server=os.environ.get("L2_SERVER", "http://localhost:8200"),
    )
    return _manager
|
||||
|
||||
|
||||
def reset_cache_manager():
    """Clear the module-level singleton so tests start from a fresh state."""
    global _manager
    _manager = None
|
||||
Reference in New Issue
Block a user