From e4fd5eb01088dda1e038b1baa2ff8af35e6f1ff3 Mon Sep 17 00:00:00 2001 From: gilesb Date: Thu, 8 Jan 2026 00:51:18 +0000 Subject: [PATCH] Integrate artdag cache with deletion rules - Add cache_manager.py with L1CacheManager wrapping artdag Cache - Add L2SharedChecker for checking published status via L2 API - Update server.py to use cache_manager for storage - Update DELETE /cache/{content_hash} to enforce deletion rules - Add DELETE /runs/{run_id} endpoint for discarding runs - Record activities when runs complete for deletion tracking - Add comprehensive tests for cache manager Deletion rules enforced: - Cannot delete items published to L2 - Cannot delete inputs/outputs of runs - Can delete orphaned items - Runs can only be discarded if no items are shared Co-Authored-By: Claude Opus 4.5 --- cache_manager.py | 494 ++++++++++++++++++++++++++++++++++++ server.py | 126 +++++++-- tests/test_cache_manager.py | 397 +++++++++++++++++++++++++++++ 3 files changed, 992 insertions(+), 25 deletions(-) create mode 100644 cache_manager.py create mode 100644 tests/test_cache_manager.py diff --git a/cache_manager.py b/cache_manager.py new file mode 100644 index 0000000..5f4f3fa --- /dev/null +++ b/cache_manager.py @@ -0,0 +1,494 @@ +# art-celery/cache_manager.py +""" +Cache management for Art DAG L1 server. 
class L2SharedChecker:
    """
    Checks if content is shared (published) via the L2 ActivityPub server.

    Definitive answers from L2 (HTTP 200 = published, HTTP 404 = not
    published) are cached in memory for ``cache_ttl`` seconds to avoid
    repeated API calls. Failed or inconclusive lookups are never cached, so a
    transient L2 problem cannot pin a "not shared" answer for a whole TTL
    window and thereby unlock deletion of a published item.
    """

    def __init__(self, l2_server: str, cache_ttl: int = 300):
        """
        Args:
            l2_server: Base URL of the L2 server, e.g. "http://localhost:8200".
            cache_ttl: Number of seconds a cached answer is considered fresh.
        """
        self.l2_server = l2_server
        self.cache_ttl = cache_ttl
        # content_hash -> (is_shared, unix time at which the answer was stored)
        self._cache: Dict[str, tuple[bool, float]] = {}

    def is_shared(self, content_hash: str) -> bool:
        """
        Check if content_hash has been published to L2.

        Returns the cached answer while it is fresh. Otherwise L2 is queried;
        if the query fails or returns an unexpected status, the last known
        answer for this hash is returned (False when there is none) and
        nothing is cached, so the next call asks L2 again.
        """
        import time

        now = time.time()
        cached = self._cache.get(content_hash)
        if cached is not None and now - cached[1] < self.cache_ttl:
            return cached[0]

        # Last known (possibly expired) answer; only used when L2 cannot give
        # a definitive one. Keeping a stale "shared" is the conservative
        # choice because this result gates deletion.
        fallback = cached[0] if cached is not None else False

        try:
            resp = requests.get(
                f"{self.l2_server}/registry/by-hash/{content_hash}",
                timeout=5,
            )
        except Exception as e:
            # Best-effort lookup: any failure is logged and treated as
            # "no new information" rather than raised to the caller.
            logger.warning(f"Failed to check L2 for {content_hash}: {e}")
            return fallback

        if resp.status_code not in (200, 404):
            # e.g. a 5xx from L2: not a statement about published status.
            logger.warning(
                f"Unexpected L2 status {resp.status_code} for {content_hash}"
            )
            return fallback

        is_shared = resp.status_code == 200
        self._cache[content_hash] = (is_shared, now)
        return is_shared

    def invalidate(self, content_hash: str):
        """Invalidate cache for a content_hash (call after publishing)."""
        self._cache.pop(content_hash, None)

    def mark_shared(self, content_hash: str):
        """Mark as shared without querying (call after successful publish)."""
        import time

        self._cache[content_hash] = (True, time.time())
def _save_content_index(self):
    """
    Save the content_hash -> node_id index to the index file.

    The JSON is first written to a temporary file next to the index and then
    moved into place with ``os.replace``. A crash or error part-way through
    therefore leaves the previous index untouched instead of a truncated,
    unparseable file.
    """
    index_path = self._index_path()
    # The pid in the name keeps two processes that share the cache directory
    # from writing to the same temporary file.
    tmp_path = index_path.with_name(f".{index_path.name}.{os.getpid()}.tmp")
    try:
        with open(tmp_path, "w") as f:
            json.dump(self._content_index, f, indent=2)
        os.replace(tmp_path, index_path)
    finally:
        # After a successful replace the temp file no longer exists; this
        # only removes leftovers when writing or replacing failed.
        tmp_path.unlink(missing_ok=True)
def get_by_content_hash(self, content_hash: str) -> Optional[Path]:
    """
    Resolve a content_hash to the path of its cached file.

    Lookup order:
      1. the in-memory content index, then the cache by the indexed node_id
      2. a file named after the hash in the legacy directory
      3. a search of the cache by content hash; a hit is written back to
         the index and the index is saved

    Returns:
        The file path, or None when none of the lookups finds the content.
    """
    indexed_id = self._content_index.get(content_hash)
    indexed_path = self.cache.get(indexed_id) if indexed_id else None
    if indexed_path:
        return indexed_path

    # An index miss (or a stale index entry) falls through to legacy storage.
    legacy_candidate = self.legacy_dir / content_hash
    if legacy_candidate.exists():
        return legacy_candidate

    found = self.cache.find_by_content_hash(content_hash)
    if not found:
        return None

    # Remember the hit so the next lookup is answered from the index.
    self._content_index[content_hash] = found.node_id
    self._save_content_index()
    return found.output_path
def record_simple_activity(
    self,
    input_hashes: List[str],
    output_hash: str,
    run_id: Optional[str] = None,
) -> Activity:
    """
    Record a simple (non-DAG) execution as an activity.

    For legacy single-effect runs that don't use full DAG execution.
    Content hashes are used directly as node ids.

    Args:
        input_hashes: Content hashes of the run's inputs (any order).
        output_hash: Content hash of the run's output.
        run_id: Activity id to use. When omitted or empty, an id is derived
            from the inputs and the output.

    Returns:
        The Activity that was added to the activity store.
    """
    ordered_inputs = sorted(input_hashes)

    if run_id:
        activity_id = run_id
    else:
        # Built-in hash() is salted per process for strings, so it would give
        # a different id for the same run after every restart. A digest of
        # the sorted inputs plus the output is stable across processes and
        # independent of the order the inputs were passed in.
        fingerprint = json.dumps([ordered_inputs, output_hash]).encode("utf-8")
        activity_id = hashlib.sha3_256(fingerprint).hexdigest()

    activity = Activity(
        activity_id=activity_id,
        input_ids=ordered_inputs,
        output_id=output_hash,
        intermediate_ids=[],
        created_at=datetime.now(timezone.utc).timestamp(),
        status="completed",
    )
    self.activity_store.add(activity)
    return activity
def delete_by_content_hash(self, content_hash: str) -> tuple[bool, str]:
    """
    Remove the cached item identified by content_hash.

    The deletion rules are checked first via ``can_delete``; a refusal is
    passed straight back to the caller. An item known to the content index
    is removed from the cache and from the index (which is then saved);
    otherwise a file of that name in the legacy directory is removed.

    Returns:
        (success, message) tuple. The message is the refusal reason,
        "Deleted", "Deleted (legacy)" or "Not found".
    """
    allowed, refusal = self.can_delete(content_hash)
    if not allowed:
        return False, refusal

    indexed_id = self._content_index.get(content_hash)
    if not indexed_id:
        # Not indexed: the only other place it can live is the legacy dir.
        legacy_file = self.legacy_dir / content_hash
        if not legacy_file.exists():
            return False, "Not found"
        legacy_file.unlink()
        return True, "Deleted (legacy)"

    self.cache.remove(indexed_id)
    del self._content_index[content_hash]
    self._save_content_index()
    return True, "Deleted"
def get_cache_manager() -> L1CacheManager:
    """
    Return the process-wide L1CacheManager, creating it on first use.

    On first use the manager is configured from the environment:
    ``CACHE_DIR`` (default ``~/.artdag/cache``) and ``L2_SERVER``
    (default ``http://localhost:8200``). Later calls return the same object.
    """
    global _manager
    if _manager is not None:
        return _manager

    fallback_dir = str(Path.home() / ".artdag" / "cache")
    _manager = L1CacheManager(
        cache_dir=Path(os.environ.get("CACHE_DIR", fallback_dir)),
        l2_server=os.environ.get("L2_SERVER", "http://localhost:8200"),
    )
    return _manager
testing).""" + global _manager + _manager = None diff --git a/server.py b/server.py index 9e8729d..de3bbe1 100644 --- a/server.py +++ b/server.py @@ -27,6 +27,7 @@ from urllib.parse import urlparse from celery_app import app as celery_app from tasks import render_effect +from cache_manager import L1CacheManager, get_cache_manager # L2 server for auth verification L2_SERVER = os.environ.get("L2_SERVER", "http://localhost:8200") @@ -37,6 +38,9 @@ L1_PUBLIC_URL = os.environ.get("L1_PUBLIC_URL", "http://localhost:8100") CACHE_DIR = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache"))) CACHE_DIR.mkdir(parents=True, exist_ok=True) +# Initialize L1 cache manager with artdag integration +cache_manager = L1CacheManager(cache_dir=CACHE_DIR, l2_server=L2_SERVER) + # Redis for persistent run storage REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/5') parsed = urlparse(REDIS_URL) @@ -152,14 +156,14 @@ def file_hash(path: Path) -> str: return hasher.hexdigest() -def cache_file(source: Path) -> str: - """Copy file to cache, return content hash.""" - content_hash = file_hash(source) - cache_path = CACHE_DIR / content_hash - if not cache_path.exists(): - import shutil - shutil.copy2(source, cache_path) - return content_hash +def cache_file(source: Path, node_type: str = "output") -> str: + """ + Copy file to cache using L1CacheManager, return content hash. + + Uses artdag's Cache internally for proper tracking. 
@app.delete("/runs/{run_id}")
async def discard_run(run_id: str, username: str = Depends(get_required_user)):
    """
    Discard (delete) a run and its intermediate cache entries.

    Enforces deletion rules:
    - Cannot discard if any item (input, output) is published to L2
    - Deletes intermediate cache entries
    - Keeps inputs (may be used by other runs)
    - Deletes orphaned outputs

    A run that has no recorded activity (an activity is only recorded when a
    run completes with an output and inputs) has no cache entries to protect
    or clean up, so only its stored run record is removed.

    Raises:
        HTTPException 404: the run does not exist.
        HTTPException 403: the run belongs to another user.
        HTTPException 400: the deletion rules forbid discarding the run.
        HTTPException 500: the activity could not be discarded.
    """
    run = load_run(run_id)
    if not run:
        raise HTTPException(404, f"Run {run_id} not found")

    # Check ownership: the stored username may be the bare name or the actor id
    actor_id = f"@{username}@{L2_DOMAIN}"
    if run.username not in (username, actor_id):
        raise HTTPException(403, "Access denied")

    # Only runs with a recorded activity are subject to the activity rules.
    # Without this guard a failed run would be rejected with
    # "Activity not found" and could never be removed.
    if cache_manager.get_activity(run_id) is not None:
        can_discard, reason = cache_manager.can_discard_activity(run_id)
        if not can_discard:
            raise HTTPException(400, f"Cannot discard run: {reason}")

        # Discard the activity (cleans up cache entries)
        success, msg = cache_manager.discard_activity(run_id)
        if not success:
            raise HTTPException(500, f"Failed to discard: {msg}")

    # Remove from Redis
    redis_client.delete(f"{RUNS_KEY_PREFIX}{run_id}")

    return {"discarded": True, "run_id": run_id}
HTML for browsers, JSON for APIs.""" @@ -1428,31 +1476,45 @@ async def discard_cache(content_hash: str, username: str = Depends(get_required_ """ Discard (delete) a cached item. - Refuses to delete pinned items. Pinned items include: - - Published items - - Inputs to published items + Enforces deletion rules: + - Cannot delete items published to L2 (shared) + - Cannot delete inputs/outputs of activities (runs) + - Cannot delete pinned items """ - cache_path = CACHE_DIR / content_hash - if not cache_path.exists(): - raise HTTPException(404, "Content not found") + # Check if content exists (in cache_manager or legacy location) + if not cache_manager.has_content(content_hash): + cache_path = CACHE_DIR / content_hash + if not cache_path.exists(): + raise HTTPException(404, "Content not found") # Check ownership user_hashes = get_user_cache_hashes(username) if content_hash not in user_hashes: raise HTTPException(403, "Access denied") - # Check if pinned + # Check if pinned (legacy metadata) meta = load_cache_meta(content_hash) if meta.get("pinned"): pin_reason = meta.get("pin_reason", "unknown") raise HTTPException(400, f"Cannot discard pinned item (reason: {pin_reason})") - # Delete the file and metadata - cache_path.unlink() + # Check deletion rules via cache_manager + can_delete, reason = cache_manager.can_delete(content_hash) + if not can_delete: + raise HTTPException(400, f"Cannot discard: {reason}") + + # Delete via cache_manager + success, msg = cache_manager.delete_by_content_hash(content_hash) + if not success: + # Fallback to legacy deletion + cache_path = CACHE_DIR / content_hash + if cache_path.exists(): + cache_path.unlink() + + # Clean up legacy metadata files meta_path = CACHE_DIR / f"{content_hash}.meta.json" if meta_path.exists(): meta_path.unlink() - # Also delete transcoded mp4 if exists mp4_path = CACHE_DIR / f"{content_hash}.mp4" if mp4_path.exists(): mp4_path.unlink() @@ -1472,18 +1534,32 @@ async def ui_discard_cache(content_hash: str, request: 
Request): if content_hash not in user_hashes: return '
Access denied
' - cache_path = CACHE_DIR / content_hash - if not cache_path.exists(): - return '
Content not found
' + # Check if content exists + if not cache_manager.has_content(content_hash): + cache_path = CACHE_DIR / content_hash + if not cache_path.exists(): + return '
Content not found
' - # Check if pinned + # Check if pinned (legacy metadata) meta = load_cache_meta(content_hash) if meta.get("pinned"): pin_reason = meta.get("pin_reason", "unknown") return f'
Cannot discard: item is pinned ({pin_reason})
' - # Delete the file and metadata - cache_path.unlink() + # Check deletion rules via cache_manager + can_delete, reason = cache_manager.can_delete(content_hash) + if not can_delete: + return f'
Cannot discard: {reason}
def create_test_file(path: Path, content: str = "test content") -> Path:
    """
    Write ``content`` to ``path`` and return ``path``.

    Missing parent directories are created first; an existing file at
    ``path`` is overwritten.
    """
    target_dir = path.parent
    target_dir.mkdir(exist_ok=True, parents=True)
    path.write_text(content)
    return path
produces different hash.""" + file1 = create_test_file(temp_dir / "f1.txt", "hello") + file2 = create_test_file(temp_dir / "f2.txt", "world") + + assert file_hash(file1) != file_hash(file2) + + def test_sha3_256_length(self, temp_dir): + """Hash is SHA3-256 (64 hex chars).""" + f = create_test_file(temp_dir / "f.txt", "test") + assert len(file_hash(f)) == 64 + + +class TestL2SharedChecker: + """Tests for L2 shared status checking.""" + + def test_not_shared_returns_false(self, mock_l2): + """Non-existent content returns False.""" + checker = L2SharedChecker("http://mock:8200") + mock_l2.get.return_value = Mock(status_code=404) + + assert checker.is_shared("abc123") is False + + def test_shared_returns_true(self, mock_l2): + """Published content returns True.""" + checker = L2SharedChecker("http://mock:8200") + mock_l2.get.return_value = Mock(status_code=200) + + assert checker.is_shared("abc123") is True + + def test_caches_result(self, mock_l2): + """Results are cached to avoid repeated API calls.""" + checker = L2SharedChecker("http://mock:8200", cache_ttl=60) + mock_l2.get.return_value = Mock(status_code=200) + + checker.is_shared("abc123") + checker.is_shared("abc123") + + # Should only call API once + assert mock_l2.get.call_count == 1 + + def test_mark_shared(self, mock_l2): + """mark_shared updates cache without API call.""" + checker = L2SharedChecker("http://mock:8200") + + checker.mark_shared("abc123") + + assert checker.is_shared("abc123") is True + assert mock_l2.get.call_count == 0 + + def test_invalidate(self, mock_l2): + """invalidate clears cache for a hash.""" + checker = L2SharedChecker("http://mock:8200") + mock_l2.get.return_value = Mock(status_code=200) + + checker.is_shared("abc123") + checker.invalidate("abc123") + + mock_l2.get.return_value = Mock(status_code=404) + assert checker.is_shared("abc123") is False + + def test_error_returns_false(self, mock_l2): + """API errors return False (safe for deletion).""" + checker = 
L2SharedChecker("http://mock:8200") + mock_l2.get.side_effect = Exception("Network error") + + assert checker.is_shared("abc123") is False + + +class TestL1CacheManagerStorage: + """Tests for cache storage operations.""" + + def test_put_and_get_by_content_hash(self, manager, temp_dir): + """Can store and retrieve by content hash.""" + test_file = create_test_file(temp_dir / "input.txt", "hello world") + + cached = manager.put(test_file, node_type="test") + + retrieved_path = manager.get_by_content_hash(cached.content_hash) + assert retrieved_path is not None + assert retrieved_path.read_text() == "hello world" + + def test_put_with_custom_node_id(self, manager, temp_dir): + """Can store with custom node_id.""" + test_file = create_test_file(temp_dir / "input.txt", "content") + + cached = manager.put(test_file, node_id="custom-node-123", node_type="test") + + assert cached.node_id == "custom-node-123" + assert manager.get_by_node_id("custom-node-123") is not None + + def test_has_content(self, manager, temp_dir): + """has_content checks existence.""" + test_file = create_test_file(temp_dir / "input.txt", "data") + + cached = manager.put(test_file, node_type="test") + + assert manager.has_content(cached.content_hash) is True + assert manager.has_content("nonexistent") is False + + def test_list_all(self, manager, temp_dir): + """list_all returns all cached files.""" + f1 = create_test_file(temp_dir / "f1.txt", "one") + f2 = create_test_file(temp_dir / "f2.txt", "two") + + manager.put(f1, node_type="test") + manager.put(f2, node_type="test") + + all_files = manager.list_all() + assert len(all_files) == 2 + + def test_deduplication(self, manager, temp_dir): + """Same content is not stored twice.""" + f1 = create_test_file(temp_dir / "f1.txt", "identical") + f2 = create_test_file(temp_dir / "f2.txt", "identical") + + cached1 = manager.put(f1, node_type="test") + cached2 = manager.put(f2, node_type="test") + + assert cached1.content_hash == cached2.content_hash + assert 
len(manager.list_all()) == 1 + + +class TestL1CacheManagerActivities: + """Tests for activity tracking.""" + + def test_record_simple_activity(self, manager, temp_dir): + """Can record a simple activity.""" + input_file = create_test_file(temp_dir / "input.txt", "input") + output_file = create_test_file(temp_dir / "output.txt", "output") + + input_cached = manager.put(input_file, node_type="source") + output_cached = manager.put(output_file, node_type="effect") + + activity = manager.record_simple_activity( + input_hashes=[input_cached.content_hash], + output_hash=output_cached.content_hash, + run_id="run-001", + ) + + assert activity.activity_id == "run-001" + assert input_cached.content_hash in activity.input_ids + assert activity.output_id == output_cached.content_hash + + def test_list_activities(self, manager, temp_dir): + """Can list all activities.""" + for i in range(3): + inp = create_test_file(temp_dir / f"in{i}.txt", f"input{i}") + out = create_test_file(temp_dir / f"out{i}.txt", f"output{i}") + inp_c = manager.put(inp, node_type="source") + out_c = manager.put(out, node_type="effect") + manager.record_simple_activity([inp_c.content_hash], out_c.content_hash) + + activities = manager.list_activities() + assert len(activities) == 3 + + def test_find_activities_by_inputs(self, manager, temp_dir): + """Can find activities with same inputs.""" + input_file = create_test_file(temp_dir / "shared_input.txt", "shared") + input_cached = manager.put(input_file, node_type="source") + + # Two activities with same input + out1 = create_test_file(temp_dir / "out1.txt", "output1") + out2 = create_test_file(temp_dir / "out2.txt", "output2") + out1_c = manager.put(out1, node_type="effect") + out2_c = manager.put(out2, node_type="effect") + + manager.record_simple_activity([input_cached.content_hash], out1_c.content_hash, "run1") + manager.record_simple_activity([input_cached.content_hash], out2_c.content_hash, "run2") + + found = 
manager.find_activities_by_inputs([input_cached.content_hash]) + assert len(found) == 2 + + +class TestL1CacheManagerDeletionRules: + """Tests for deletion rules enforcement.""" + + def test_can_delete_orphaned_item(self, manager, temp_dir): + """Orphaned items can be deleted.""" + test_file = create_test_file(temp_dir / "orphan.txt", "orphan") + cached = manager.put(test_file, node_type="test") + + can_delete, reason = manager.can_delete(cached.content_hash) + assert can_delete is True + + def test_cannot_delete_activity_input(self, manager, temp_dir): + """Activity inputs cannot be deleted.""" + input_file = create_test_file(temp_dir / "input.txt", "input") + output_file = create_test_file(temp_dir / "output.txt", "output") + + input_cached = manager.put(input_file, node_type="source") + output_cached = manager.put(output_file, node_type="effect") + + manager.record_simple_activity( + [input_cached.content_hash], + output_cached.content_hash, + ) + + can_delete, reason = manager.can_delete(input_cached.content_hash) + assert can_delete is False + assert "input" in reason.lower() + + def test_cannot_delete_activity_output(self, manager, temp_dir): + """Activity outputs cannot be deleted.""" + input_file = create_test_file(temp_dir / "input.txt", "input") + output_file = create_test_file(temp_dir / "output.txt", "output") + + input_cached = manager.put(input_file, node_type="source") + output_cached = manager.put(output_file, node_type="effect") + + manager.record_simple_activity( + [input_cached.content_hash], + output_cached.content_hash, + ) + + can_delete, reason = manager.can_delete(output_cached.content_hash) + assert can_delete is False + assert "output" in reason.lower() + + def test_cannot_delete_shared_item(self, manager, temp_dir, mock_l2): + """Published items cannot be deleted.""" + test_file = create_test_file(temp_dir / "shared.txt", "shared") + cached = manager.put(test_file, node_type="test") + + # Mark as published + mock_l2.get.return_value = 
Mock(status_code=200) + manager.l2_checker.invalidate(cached.content_hash) + + can_delete, reason = manager.can_delete(cached.content_hash) + assert can_delete is False + assert "L2" in reason + + def test_delete_orphaned_item(self, manager, temp_dir): + """Can delete orphaned items.""" + test_file = create_test_file(temp_dir / "orphan.txt", "orphan") + cached = manager.put(test_file, node_type="test") + + success, msg = manager.delete_by_content_hash(cached.content_hash) + + assert success is True + assert manager.has_content(cached.content_hash) is False + + def test_delete_protected_item_fails(self, manager, temp_dir): + """Cannot delete protected items.""" + input_file = create_test_file(temp_dir / "input.txt", "input") + output_file = create_test_file(temp_dir / "output.txt", "output") + + input_cached = manager.put(input_file, node_type="source") + output_cached = manager.put(output_file, node_type="effect") + + manager.record_simple_activity( + [input_cached.content_hash], + output_cached.content_hash, + ) + + success, msg = manager.delete_by_content_hash(input_cached.content_hash) + + assert success is False + assert manager.has_content(input_cached.content_hash) is True + + +class TestL1CacheManagerActivityDiscard: + """Tests for activity discard functionality.""" + + def test_can_discard_unshared_activity(self, manager, temp_dir): + """Activities with no shared items can be discarded.""" + input_file = create_test_file(temp_dir / "input.txt", "input") + output_file = create_test_file(temp_dir / "output.txt", "output") + + input_cached = manager.put(input_file, node_type="source") + output_cached = manager.put(output_file, node_type="effect") + + activity = manager.record_simple_activity( + [input_cached.content_hash], + output_cached.content_hash, + "run-001", + ) + + can_discard, reason = manager.can_discard_activity("run-001") + assert can_discard is True + + def test_cannot_discard_activity_with_shared_output(self, manager, temp_dir, mock_l2): + 
"""Activities with shared outputs cannot be discarded.""" + input_file = create_test_file(temp_dir / "input.txt", "input") + output_file = create_test_file(temp_dir / "output.txt", "output") + + input_cached = manager.put(input_file, node_type="source") + output_cached = manager.put(output_file, node_type="effect") + + manager.record_simple_activity( + [input_cached.content_hash], + output_cached.content_hash, + "run-001", + ) + + # Mark output as shared + manager.l2_checker.mark_shared(output_cached.content_hash) + + can_discard, reason = manager.can_discard_activity("run-001") + assert can_discard is False + assert "L2" in reason + + def test_discard_activity_cleans_up(self, manager, temp_dir): + """Discarding activity cleans up orphaned items.""" + input_file = create_test_file(temp_dir / "input.txt", "input") + output_file = create_test_file(temp_dir / "output.txt", "output") + + input_cached = manager.put(input_file, node_type="source") + output_cached = manager.put(output_file, node_type="effect") + + manager.record_simple_activity( + [input_cached.content_hash], + output_cached.content_hash, + "run-001", + ) + + success, msg = manager.discard_activity("run-001") + + assert success is True + assert manager.get_activity("run-001") is None + + +class TestL1CacheManagerStats: + """Tests for cache statistics.""" + + def test_get_stats(self, manager, temp_dir): + """get_stats returns cache statistics.""" + f1 = create_test_file(temp_dir / "f1.txt", "content1") + f2 = create_test_file(temp_dir / "f2.txt", "content2") + + manager.put(f1, node_type="test") + manager.put(f2, node_type="test") + + stats = manager.get_stats() + + assert stats["total_entries"] == 2 + assert stats["total_size_bytes"] > 0 + assert "activities" in stats