Use local pinned metadata for deletion checks instead of L2 API

- Add is_pinned(), pin(), _load_meta(), _save_meta() to L1CacheManager
- Update can_delete() and can_discard_activity() to check local pinned status
- Update run deletion endpoints (API and UI) to check pinned metadata
- Remove L2 shared check fallback from run deletion
- Fix L2SharedChecker to return True on error (safer - prevents accidental deletion)
- Update tests for new pinned behavior

When items are published to L2, the publish flow marks them as pinned locally.
This ensures items remain non-deletable even if L2 is unreachable, and both
outputs AND inputs of published runs are protected.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -86,19 +86,20 @@ class L2SharedChecker:
|
|||||||
if content_hash in self._cache:
|
if content_hash in self._cache:
|
||||||
is_shared, cached_at = self._cache[content_hash]
|
is_shared, cached_at = self._cache[content_hash]
|
||||||
if now - cached_at < self.cache_ttl:
|
if now - cached_at < self.cache_ttl:
|
||||||
|
logger.debug(f"L2 check (cached): {content_hash[:16]}... = {is_shared}")
|
||||||
return is_shared
|
return is_shared
|
||||||
|
|
||||||
# Query L2
|
# Query L2
|
||||||
try:
|
try:
|
||||||
resp = requests.get(
|
url = f"{self.l2_server}/registry/by-hash/{content_hash}"
|
||||||
f"{self.l2_server}/registry/by-hash/{content_hash}",
|
logger.info(f"L2 check: GET {url}")
|
||||||
timeout=5
|
resp = requests.get(url, timeout=5)
|
||||||
)
|
logger.info(f"L2 check response: {resp.status_code}")
|
||||||
is_shared = resp.status_code == 200
|
is_shared = resp.status_code == 200
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Failed to check L2 for {content_hash}: {e}")
|
logger.warning(f"Failed to check L2 for {content_hash}: {e}")
|
||||||
# On error, assume not shared (safer for deletion)
|
# On error, assume IS shared (safer - prevents accidental deletion)
|
||||||
is_shared = False
|
is_shared = True
|
||||||
|
|
||||||
self._cache[content_hash] = (is_shared, now)
|
self._cache[content_hash] = (is_shared, now)
|
||||||
return is_shared
|
return is_shared
|
||||||
@@ -184,6 +185,39 @@ class L1CacheManager:
|
|||||||
"""Check if a content_hash is shared via L2."""
|
"""Check if a content_hash is shared via L2."""
|
||||||
return self.l2_checker.is_shared(content_hash)
|
return self.l2_checker.is_shared(content_hash)
|
||||||
|
|
||||||
|
def _load_meta(self, content_hash: str) -> dict:
|
||||||
|
"""Load metadata for a cached file."""
|
||||||
|
meta_path = self.cache_dir / f"{content_hash}.meta.json"
|
||||||
|
if meta_path.exists():
|
||||||
|
with open(meta_path) as f:
|
||||||
|
return json.load(f)
|
||||||
|
return {}
|
||||||
|
|
||||||
|
def is_pinned(self, content_hash: str) -> tuple[bool, str]:
|
||||||
|
"""
|
||||||
|
Check if a content_hash is pinned (non-deletable).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(is_pinned, reason) tuple
|
||||||
|
"""
|
||||||
|
meta = self._load_meta(content_hash)
|
||||||
|
if meta.get("pinned"):
|
||||||
|
return True, meta.get("pin_reason", "published")
|
||||||
|
return False, ""
|
||||||
|
|
||||||
|
def _save_meta(self, content_hash: str, **updates) -> dict:
|
||||||
|
"""Save/update metadata for a cached file."""
|
||||||
|
meta = self._load_meta(content_hash)
|
||||||
|
meta.update(updates)
|
||||||
|
meta_path = self.cache_dir / f"{content_hash}.meta.json"
|
||||||
|
with open(meta_path, "w") as f:
|
||||||
|
json.dump(meta, f, indent=2)
|
||||||
|
return meta
|
||||||
|
|
||||||
|
def pin(self, content_hash: str, reason: str = "published") -> None:
|
||||||
|
"""Mark an item as pinned (non-deletable)."""
|
||||||
|
self._save_meta(content_hash, pinned=True, pin_reason=reason)
|
||||||
|
|
||||||
# ============ File Storage ============
|
# ============ File Storage ============
|
||||||
|
|
||||||
def put(
|
def put(
|
||||||
@@ -396,9 +430,10 @@ class L1CacheManager:
|
|||||||
Returns:
|
Returns:
|
||||||
(can_delete, reason) tuple
|
(can_delete, reason) tuple
|
||||||
"""
|
"""
|
||||||
# Check if shared via L2
|
# Check if pinned (published or input to published)
|
||||||
if self.l2_checker.is_shared(content_hash):
|
pinned, reason = self.is_pinned(content_hash)
|
||||||
return False, "Item is published to L2"
|
if pinned:
|
||||||
|
return False, f"Item is pinned ({reason})"
|
||||||
|
|
||||||
# Find node_id for this content
|
# Find node_id for this content
|
||||||
node_id = self._content_index.get(content_hash, content_hash)
|
node_id = self._content_index.get(content_hash, content_hash)
|
||||||
@@ -423,11 +458,13 @@ class L1CacheManager:
|
|||||||
if not activity:
|
if not activity:
|
||||||
return False, "Activity not found"
|
return False, "Activity not found"
|
||||||
|
|
||||||
# Check if any item is shared
|
# Check if any item is pinned
|
||||||
for node_id in activity.all_node_ids:
|
for node_id in activity.all_node_ids:
|
||||||
entry = self.cache.get_entry(node_id)
|
entry = self.cache.get_entry(node_id)
|
||||||
if entry and self.l2_checker.is_shared(entry.content_hash):
|
if entry:
|
||||||
return False, f"Item {node_id} is published to L2"
|
pinned, reason = self.is_pinned(entry.content_hash)
|
||||||
|
if pinned:
|
||||||
|
return False, f"Item {node_id} is pinned ({reason})"
|
||||||
|
|
||||||
return True, "OK"
|
return True, "OK"
|
||||||
|
|
||||||
|
|||||||
40
server.py
40
server.py
@@ -444,6 +444,17 @@ async def discard_run(run_id: str, username: str = Depends(get_required_user)):
|
|||||||
|
|
||||||
# Failed runs can always be deleted (no output to protect)
|
# Failed runs can always be deleted (no output to protect)
|
||||||
if run.status != "failed":
|
if run.status != "failed":
|
||||||
|
# Check if any items are pinned (published or input to published)
|
||||||
|
items_to_check = list(run.inputs or [])
|
||||||
|
if run.output_hash:
|
||||||
|
items_to_check.append(run.output_hash)
|
||||||
|
|
||||||
|
for content_hash in items_to_check:
|
||||||
|
meta = load_cache_meta(content_hash)
|
||||||
|
if meta.get("pinned"):
|
||||||
|
pin_reason = meta.get("pin_reason", "published")
|
||||||
|
raise HTTPException(400, f"Cannot discard run: item {content_hash[:16]}... is pinned ({pin_reason})")
|
||||||
|
|
||||||
# Check if activity exists for this run
|
# Check if activity exists for this run
|
||||||
activity = cache_manager.get_activity(run_id)
|
activity = cache_manager.get_activity(run_id)
|
||||||
|
|
||||||
@@ -457,15 +468,6 @@ async def discard_run(run_id: str, username: str = Depends(get_required_user)):
|
|||||||
success, msg = cache_manager.discard_activity(run_id)
|
success, msg = cache_manager.discard_activity(run_id)
|
||||||
if not success:
|
if not success:
|
||||||
raise HTTPException(500, f"Failed to discard: {msg}")
|
raise HTTPException(500, f"Failed to discard: {msg}")
|
||||||
else:
|
|
||||||
# Legacy run without activity record - check L2 shared status manually
|
|
||||||
items_to_check = list(run.inputs or [])
|
|
||||||
if run.output_hash:
|
|
||||||
items_to_check.append(run.output_hash)
|
|
||||||
|
|
||||||
for content_hash in items_to_check:
|
|
||||||
if cache_manager.l2_checker.is_shared(content_hash):
|
|
||||||
raise HTTPException(400, f"Cannot discard run: item {content_hash[:16]}... is published to L2")
|
|
||||||
|
|
||||||
# Remove from Redis
|
# Remove from Redis
|
||||||
redis_client.delete(f"{RUNS_KEY_PREFIX}{run_id}")
|
redis_client.delete(f"{RUNS_KEY_PREFIX}{run_id}")
|
||||||
@@ -491,6 +493,17 @@ async def ui_discard_run(run_id: str, request: Request):
|
|||||||
|
|
||||||
# Failed runs can always be deleted
|
# Failed runs can always be deleted
|
||||||
if run.status != "failed":
|
if run.status != "failed":
|
||||||
|
# Check if any items are pinned (published or input to published)
|
||||||
|
items_to_check = list(run.inputs or [])
|
||||||
|
if run.output_hash:
|
||||||
|
items_to_check.append(run.output_hash)
|
||||||
|
|
||||||
|
for content_hash in items_to_check:
|
||||||
|
meta = load_cache_meta(content_hash)
|
||||||
|
if meta.get("pinned"):
|
||||||
|
pin_reason = meta.get("pin_reason", "published")
|
||||||
|
return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Cannot discard: item {content_hash[:16]}... is pinned ({pin_reason})</div>'
|
||||||
|
|
||||||
# Check if activity exists for this run
|
# Check if activity exists for this run
|
||||||
activity = cache_manager.get_activity(run_id)
|
activity = cache_manager.get_activity(run_id)
|
||||||
|
|
||||||
@@ -502,15 +515,6 @@ async def ui_discard_run(run_id: str, request: Request):
|
|||||||
success, msg = cache_manager.discard_activity(run_id)
|
success, msg = cache_manager.discard_activity(run_id)
|
||||||
if not success:
|
if not success:
|
||||||
return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Failed to discard: {msg}</div>'
|
return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Failed to discard: {msg}</div>'
|
||||||
else:
|
|
||||||
# Legacy run - check L2 shared status
|
|
||||||
items_to_check = list(run.inputs or [])
|
|
||||||
if run.output_hash:
|
|
||||||
items_to_check.append(run.output_hash)
|
|
||||||
|
|
||||||
for content_hash in items_to_check:
|
|
||||||
if cache_manager.l2_checker.is_shared(content_hash):
|
|
||||||
return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Cannot discard: item {content_hash[:16]}... is published to L2</div>'
|
|
||||||
|
|
||||||
# Remove from Redis
|
# Remove from Redis
|
||||||
redis_client.delete(f"{RUNS_KEY_PREFIX}{run_id}")
|
redis_client.delete(f"{RUNS_KEY_PREFIX}{run_id}")
|
||||||
|
|||||||
@@ -118,12 +118,13 @@ class TestL2SharedChecker:
|
|||||||
mock_l2.get.return_value = Mock(status_code=404)
|
mock_l2.get.return_value = Mock(status_code=404)
|
||||||
assert checker.is_shared("abc123") is False
|
assert checker.is_shared("abc123") is False
|
||||||
|
|
||||||
def test_error_returns_false(self, mock_l2):
|
def test_error_returns_true(self, mock_l2):
|
||||||
"""API errors return False (safe for deletion)."""
|
"""API errors return True (safe - prevents accidental deletion)."""
|
||||||
checker = L2SharedChecker("http://mock:8200")
|
checker = L2SharedChecker("http://mock:8200")
|
||||||
mock_l2.get.side_effect = Exception("Network error")
|
mock_l2.get.side_effect = Exception("Network error")
|
||||||
|
|
||||||
assert checker.is_shared("abc123") is False
|
# On error, assume IS shared to prevent accidental deletion
|
||||||
|
assert checker.is_shared("abc123") is True
|
||||||
|
|
||||||
|
|
||||||
class TestL1CacheManagerStorage:
|
class TestL1CacheManagerStorage:
|
||||||
@@ -276,18 +277,17 @@ class TestL1CacheManagerDeletionRules:
|
|||||||
assert can_delete is False
|
assert can_delete is False
|
||||||
assert "output" in reason.lower()
|
assert "output" in reason.lower()
|
||||||
|
|
||||||
def test_cannot_delete_shared_item(self, manager, temp_dir, mock_l2):
|
def test_cannot_delete_pinned_item(self, manager, temp_dir):
|
||||||
"""Published items cannot be deleted."""
|
"""Pinned items cannot be deleted."""
|
||||||
test_file = create_test_file(temp_dir / "shared.txt", "shared")
|
test_file = create_test_file(temp_dir / "shared.txt", "shared")
|
||||||
cached = manager.put(test_file, node_type="test")
|
cached = manager.put(test_file, node_type="test")
|
||||||
|
|
||||||
# Mark as published
|
# Mark as pinned (published)
|
||||||
mock_l2.get.return_value = Mock(status_code=200)
|
manager.pin(cached.content_hash, reason="published")
|
||||||
manager.l2_checker.invalidate(cached.content_hash)
|
|
||||||
|
|
||||||
can_delete, reason = manager.can_delete(cached.content_hash)
|
can_delete, reason = manager.can_delete(cached.content_hash)
|
||||||
assert can_delete is False
|
assert can_delete is False
|
||||||
assert "L2" in reason
|
assert "pinned" in reason
|
||||||
|
|
||||||
def test_delete_orphaned_item(self, manager, temp_dir):
|
def test_delete_orphaned_item(self, manager, temp_dir):
|
||||||
"""Can delete orphaned items."""
|
"""Can delete orphaned items."""
|
||||||
@@ -338,8 +338,8 @@ class TestL1CacheManagerActivityDiscard:
|
|||||||
can_discard, reason = manager.can_discard_activity("run-001")
|
can_discard, reason = manager.can_discard_activity("run-001")
|
||||||
assert can_discard is True
|
assert can_discard is True
|
||||||
|
|
||||||
def test_cannot_discard_activity_with_shared_output(self, manager, temp_dir, mock_l2):
|
def test_cannot_discard_activity_with_pinned_output(self, manager, temp_dir):
|
||||||
"""Activities with shared outputs cannot be discarded."""
|
"""Activities with pinned outputs cannot be discarded."""
|
||||||
input_file = create_test_file(temp_dir / "input.txt", "input")
|
input_file = create_test_file(temp_dir / "input.txt", "input")
|
||||||
output_file = create_test_file(temp_dir / "output.txt", "output")
|
output_file = create_test_file(temp_dir / "output.txt", "output")
|
||||||
|
|
||||||
@@ -352,12 +352,12 @@ class TestL1CacheManagerActivityDiscard:
|
|||||||
"run-001",
|
"run-001",
|
||||||
)
|
)
|
||||||
|
|
||||||
# Mark output as shared
|
# Mark output as pinned (published)
|
||||||
manager.l2_checker.mark_shared(output_cached.content_hash)
|
manager.pin(output_cached.content_hash, reason="published")
|
||||||
|
|
||||||
can_discard, reason = manager.can_discard_activity("run-001")
|
can_discard, reason = manager.can_discard_activity("run-001")
|
||||||
assert can_discard is False
|
assert can_discard is False
|
||||||
assert "L2" in reason
|
assert "pinned" in reason
|
||||||
|
|
||||||
def test_discard_activity_cleans_up(self, manager, temp_dir):
|
def test_discard_activity_cleans_up(self, manager, temp_dir):
|
||||||
"""Discarding activity cleans up orphaned items."""
|
"""Discarding activity cleans up orphaned items."""
|
||||||
|
|||||||
Reference in New Issue
Block a user