From f23a721816e27f654761574325e241e9072b8e71 Mon Sep 17 00:00:00 2001 From: gilesb Date: Thu, 8 Jan 2026 02:44:18 +0000 Subject: [PATCH] Use local pinned metadata for deletion checks instead of L2 API - Add is_pinned(), pin(), _load_meta(), _save_meta() to L1CacheManager - Update can_delete() and can_discard_activity() to check local pinned status - Update run deletion endpoints (API and UI) to check pinned metadata - Remove L2 shared check fallback from run deletion - Fix L2SharedChecker to return True on error (safer - prevents accidental deletion) - Update tests for new pinned behavior When items are published to L2, the publish flow marks them as pinned locally. This ensures items remain non-deletable even if L2 is unreachable, and both outputs AND inputs of published runs are protected. Co-Authored-By: Claude Opus 4.5 --- cache_manager.py | 61 +++++++++++++++++++++++++++++-------- server.py | 40 +++++++++++++----------- tests/test_cache_manager.py | 28 ++++++++--------- 3 files changed, 85 insertions(+), 44 deletions(-) diff --git a/cache_manager.py b/cache_manager.py index d7b425b..04b3942 100644 --- a/cache_manager.py +++ b/cache_manager.py @@ -86,19 +86,20 @@ class L2SharedChecker: if content_hash in self._cache: is_shared, cached_at = self._cache[content_hash] if now - cached_at < self.cache_ttl: + logger.debug(f"L2 check (cached): {content_hash[:16]}... = {is_shared}") return is_shared # Query L2 try: - resp = requests.get( - f"{self.l2_server}/registry/by-hash/{content_hash}", - timeout=5 - ) + url = f"{self.l2_server}/registry/by-hash/{content_hash}" + logger.info(f"L2 check: GET {url}") + resp = requests.get(url, timeout=5) + logger.info(f"L2 check response: {resp.status_code}") is_shared = resp.status_code == 200 except Exception as e: logger.warning(f"Failed to check L2 for {content_hash}: {e}") - # On error, assume not shared (safer for deletion) - is_shared = False + # On error, assume IS shared (safer - prevents accidental deletion) + is_shared = True self._cache[content_hash] = (is_shared, now) return is_shared @@ -184,6 +185,39 @@ class L1CacheManager: """Check if a content_hash is shared via L2.""" return self.l2_checker.is_shared(content_hash) + def _load_meta(self, content_hash: str) -> dict: + """Load metadata for a cached file.""" + meta_path = self.cache_dir / f"{content_hash}.meta.json" + if meta_path.exists(): + with open(meta_path) as f: + return json.load(f) + return {} + + def is_pinned(self, content_hash: str) -> tuple[bool, str]: + """ + Check if a content_hash is pinned (non-deletable). + + Returns: + (is_pinned, reason) tuple + """ + meta = self._load_meta(content_hash) + if meta.get("pinned"): + return True, meta.get("pin_reason", "published") + return False, "" + + def _save_meta(self, content_hash: str, **updates) -> dict: + """Save/update metadata for a cached file.""" + meta = self._load_meta(content_hash) + meta.update(updates) + meta_path = self.cache_dir / f"{content_hash}.meta.json" + with open(meta_path, "w") as f: + json.dump(meta, f, indent=2) + return meta + + def pin(self, content_hash: str, reason: str = "published") -> None: + """Mark an item as pinned (non-deletable).""" + self._save_meta(content_hash, pinned=True, pin_reason=reason) + # ============ File Storage ============ def put( @@ -396,9 +430,10 @@ class L1CacheManager: Returns: (can_delete, reason) tuple """ - # Check if shared via L2 - if self.l2_checker.is_shared(content_hash): - return False, "Item is published to L2" + # Check if pinned (published or input to published) + pinned, reason = self.is_pinned(content_hash) + if pinned: + return False, f"Item is pinned ({reason})" # Find node_id for this content node_id = self._content_index.get(content_hash, content_hash) @@ -423,11 +458,13 @@ class L1CacheManager: if not activity: return False, "Activity not found" - # Check if any item is shared + # Check if any item is pinned for node_id in activity.all_node_ids: entry = self.cache.get_entry(node_id) - if entry and self.l2_checker.is_shared(entry.content_hash): - return False, f"Item {node_id} is published to L2" + if entry: + pinned, reason = self.is_pinned(entry.content_hash) + if pinned: + return False, f"Item {node_id} is pinned ({reason})" return True, "OK" diff --git a/server.py b/server.py index d7191da..78b54d3 100644 --- a/server.py +++ b/server.py @@ -444,6 +444,17 @@ async def discard_run(run_id: str, username: str = Depends(get_required_user)): # Failed runs can always be deleted (no output to protect) if run.status != "failed": + # Check if any items are pinned (published or input to published) + items_to_check = list(run.inputs or []) + if run.output_hash: + items_to_check.append(run.output_hash) + + for content_hash in items_to_check: + meta = load_cache_meta(content_hash) + if meta.get("pinned"): + pin_reason = meta.get("pin_reason", "published") + raise HTTPException(400, f"Cannot discard run: item {content_hash[:16]}... is pinned ({pin_reason})") + # Check if activity exists for this run activity = cache_manager.get_activity(run_id) @@ -457,15 +468,6 @@ async def discard_run(run_id: str, username: str = Depends(get_required_user)): success, msg = cache_manager.discard_activity(run_id) if not success: raise HTTPException(500, f"Failed to discard: {msg}") - else: - # Legacy run without activity record - check L2 shared status manually - items_to_check = list(run.inputs or []) - if run.output_hash: - items_to_check.append(run.output_hash) - - for content_hash in items_to_check: - if cache_manager.l2_checker.is_shared(content_hash): - raise HTTPException(400, f"Cannot discard run: item {content_hash[:16]}... is published to L2") # Remove from Redis redis_client.delete(f"{RUNS_KEY_PREFIX}{run_id}") @@ -491,6 +493,17 @@ async def ui_discard_run(run_id: str, request: Request): # Failed runs can always be deleted if run.status != "failed": + # Check if any items are pinned (published or input to published) + items_to_check = list(run.inputs or []) + if run.output_hash: + items_to_check.append(run.output_hash) + + for content_hash in items_to_check: + meta = load_cache_meta(content_hash) + if meta.get("pinned"): + pin_reason = meta.get("pin_reason", "published") + return f'
Cannot discard: item {content_hash[:16]}... is pinned ({pin_reason})
' + # Check if activity exists for this run activity = cache_manager.get_activity(run_id) @@ -502,15 +515,6 @@ async def ui_discard_run(run_id: str, request: Request): success, msg = cache_manager.discard_activity(run_id) if not success: return f'
Failed to discard: {msg}
' - else: - # Legacy run - check L2 shared status - items_to_check = list(run.inputs or []) - if run.output_hash: - items_to_check.append(run.output_hash) - - for content_hash in items_to_check: - if cache_manager.l2_checker.is_shared(content_hash): - return f'
Cannot discard: item {content_hash[:16]}... is published to L2
' # Remove from Redis redis_client.delete(f"{RUNS_KEY_PREFIX}{run_id}") diff --git a/tests/test_cache_manager.py b/tests/test_cache_manager.py index 76446f0..e8cd5fc 100644 --- a/tests/test_cache_manager.py +++ b/tests/test_cache_manager.py @@ -118,12 +118,13 @@ class TestL2SharedChecker: mock_l2.get.return_value = Mock(status_code=404) assert checker.is_shared("abc123") is False - def test_error_returns_false(self, mock_l2): - """API errors return False (safe for deletion).""" + def test_error_returns_true(self, mock_l2): + """API errors return True (safe - prevents accidental deletion).""" checker = L2SharedChecker("http://mock:8200") mock_l2.get.side_effect = Exception("Network error") - assert checker.is_shared("abc123") is False + # On error, assume IS shared to prevent accidental deletion + assert checker.is_shared("abc123") is True class TestL1CacheManagerStorage: @@ -276,18 +277,17 @@ class TestL1CacheManagerDeletionRules: assert can_delete is False assert "output" in reason.lower() - def test_cannot_delete_shared_item(self, manager, temp_dir, mock_l2): - """Published items cannot be deleted.""" + def test_cannot_delete_pinned_item(self, manager, temp_dir): + """Pinned items cannot be deleted.""" test_file = create_test_file(temp_dir / "shared.txt", "shared") cached = manager.put(test_file, node_type="test") - # Mark as published - mock_l2.get.return_value = Mock(status_code=200) - manager.l2_checker.invalidate(cached.content_hash) + # Mark as pinned (published) + manager.pin(cached.content_hash, reason="published") can_delete, reason = manager.can_delete(cached.content_hash) assert can_delete is False - assert "L2" in reason + assert "pinned" in reason def test_delete_orphaned_item(self, manager, temp_dir): """Can delete orphaned items.""" @@ -338,8 +338,8 @@ class TestL1CacheManagerActivityDiscard: can_discard, reason = manager.can_discard_activity("run-001") assert can_discard is True - def test_cannot_discard_activity_with_shared_output(self, manager, temp_dir, mock_l2): - """Activities with shared outputs cannot be discarded.""" + def test_cannot_discard_activity_with_pinned_output(self, manager, temp_dir): + """Activities with pinned outputs cannot be discarded.""" input_file = create_test_file(temp_dir / "input.txt", "input") output_file = create_test_file(temp_dir / "output.txt", "output") @@ -352,12 +352,12 @@ class TestL1CacheManagerActivityDiscard: "run-001", ) - # Mark output as shared - manager.l2_checker.mark_shared(output_cached.content_hash) + # Mark output as pinned (published) + manager.pin(output_cached.content_hash, reason="published") can_discard, reason = manager.can_discard_activity("run-001") assert can_discard is False - assert "L2" in reason + assert "pinned" in reason def test_discard_activity_cleans_up(self, manager, temp_dir): """Discarding activity cleans up orphaned items."""