From 4639a9823146d2125efb69cb6921d8d9edfa16a6 Mon Sep 17 00:00:00 2001 From: gilesb Date: Thu, 8 Jan 2026 03:38:14 +0000 Subject: [PATCH] lists of shares. job deletion only deletes outputs --- cache_manager.py | 57 +++++++++++++++++++- server.py | 137 ++++++++++++++++++++++++++--------------------- 2 files changed, 133 insertions(+), 61 deletions(-) diff --git a/cache_manager.py b/cache_manager.py index a0f0d3c..82fd32a 100644 --- a/cache_manager.py +++ b/cache_manager.py @@ -91,7 +91,7 @@ class L2SharedChecker: # Query L2 try: - url = f"{self.l2_server}/registry/by-hash/{content_hash}" + url = f"{self.l2_server}/assets/by-hash/{content_hash}" logger.info(f"L2 check: GET {url}") resp = requests.get(url, timeout=5) logger.info(f"L2 check response: {resp.status_code}") @@ -506,6 +506,61 @@ class L1CacheManager: return True, "Activity discarded" return False, "Failed to discard" + def discard_activity_outputs_only(self, activity_id: str) -> tuple[bool, str]: + """ + Discard an activity, deleting only outputs and intermediates. + + Inputs (cache items, configs) are preserved. 
+ + Returns: + (success, message) tuple + """ + activity = self.activity_store.get(activity_id) + if not activity: + return False, "Activity not found" + + # Check if output is pinned + if activity.output_id: + entry = self.cache.get_entry(activity.output_id) + if entry: + pinned, reason = self.is_pinned(entry.content_hash) + if pinned: + return False, f"Output is pinned ({reason})" + + # Delete output + if activity.output_id: + entry = self.cache.get_entry(activity.output_id) + if entry: + # Remove from cache + self.cache.remove(activity.output_id) + # Remove from content index + if entry.content_hash in self._content_index: + del self._content_index[entry.content_hash] + self._save_content_index() + # Delete from legacy dir if exists + legacy_path = self.legacy_dir / entry.content_hash + if legacy_path.exists(): + legacy_path.unlink() + + # Delete intermediates + for node_id in activity.intermediate_ids: + entry = self.cache.get_entry(node_id) + if entry: + self.cache.remove(node_id) + if entry.content_hash in self._content_index: + del self._content_index[entry.content_hash] + legacy_path = self.legacy_dir / entry.content_hash + if legacy_path.exists(): + legacy_path.unlink() + + if activity.intermediate_ids: + self._save_content_index() + + # Remove activity record (inputs remain in cache) + self.activity_store.remove(activity_id) + + return True, "Activity discarded (outputs only)" + def cleanup_intermediates(self) -> int: """Delete all intermediate cache entries (reconstructible).""" return self.activity_manager.cleanup_intermediates() diff --git a/server.py b/server.py index ef9909c..2fc3a84 100644 --- a/server.py +++ b/server.py @@ -515,13 +515,12 @@ async def get_run(run_id: str): @app.delete("/runs/{run_id}") async def discard_run(run_id: str, username: str = Depends(get_required_user)): """ - Discard (delete) a run and its intermediate cache entries. + Discard (delete) a run and its outputs. 
Enforces deletion rules: - - Cannot discard if any item (input, output) is published to L2 - - Deletes intermediate cache entries - - Keeps inputs (may be used by other runs) - - Deletes orphaned outputs + - Cannot discard if output is published to L2 (pinned) + - Deletes outputs and intermediate cache entries + - Preserves inputs (cache items and configs are NOT deleted) """ run = load_run(run_id) if not run: @@ -534,30 +533,21 @@ async def discard_run(run_id: str, username: str = Depends(get_required_user)): # Failed runs can always be deleted (no output to protect) if run.status != "failed": - # Check if any items are pinned (published or input to published) - items_to_check = list(run.inputs or []) + # Only check if output is pinned - inputs are preserved, not deleted if run.output_hash: - items_to_check.append(run.output_hash) - - for content_hash in items_to_check: - meta = load_cache_meta(content_hash) + meta = load_cache_meta(run.output_hash) if meta.get("pinned"): pin_reason = meta.get("pin_reason", "published") - raise HTTPException(400, f"Cannot discard run: item {content_hash[:16]}... is pinned ({pin_reason})") + raise HTTPException(400, f"Cannot discard run: output {run.output_hash[:16]}... 
is pinned ({pin_reason})") # Check if activity exists for this run activity = cache_manager.get_activity(run_id) if activity: - # Use activity manager deletion rules - can_discard, reason = cache_manager.can_discard_activity(run_id) - if not can_discard: - raise HTTPException(400, f"Cannot discard run: {reason}") - - # Discard the activity (cleans up cache entries) - success, msg = cache_manager.discard_activity(run_id) + # Discard the activity - only delete outputs, preserve inputs + success, msg = cache_manager.discard_activity_outputs_only(run_id) if not success: - raise HTTPException(500, f"Failed to discard: {msg}") + raise HTTPException(400, f"Cannot discard run: {msg}") # Remove from Redis redis_client.delete(f"{RUNS_KEY_PREFIX}{run_id}") @@ -567,7 +557,7 @@ async def discard_run(run_id: str, username: str = Depends(get_required_user)): @app.delete("/ui/runs/{run_id}/discard", response_class=HTMLResponse) async def ui_discard_run(run_id: str, request: Request): - """HTMX handler: discard a run.""" + """HTMX handler: discard a run. Only deletes outputs, preserves inputs.""" current_user = get_user_from_cookie(request) if not current_user: return '
Login required
' @@ -583,28 +573,21 @@ async def ui_discard_run(run_id: str, request: Request): # Failed runs can always be deleted if run.status != "failed": - # Check if any items are pinned (published or input to published) - items_to_check = list(run.inputs or []) + # Only check if output is pinned - inputs are preserved, not deleted if run.output_hash: - items_to_check.append(run.output_hash) - - for content_hash in items_to_check: - meta = load_cache_meta(content_hash) + meta = load_cache_meta(run.output_hash) if meta.get("pinned"): pin_reason = meta.get("pin_reason", "published") - return f'
Cannot discard: item {content_hash[:16]}... is pinned ({pin_reason})
' + return f'
Cannot discard: output is pinned ({pin_reason})
' # Check if activity exists for this run activity = cache_manager.get_activity(run_id) if activity: - can_discard, reason = cache_manager.can_discard_activity(run_id) - if not can_discard: - return f'
Cannot discard: {reason}
' - - success, msg = cache_manager.discard_activity(run_id) + # Discard the activity - only delete outputs, preserve inputs + success, msg = cache_manager.discard_activity_outputs_only(run_id) if not success: - return f'
Failed to discard: {msg}
' + return f'
Cannot discard: {msg}
' # Remove from Redis redis_client.delete(f"{RUNS_KEY_PREFIX}{run_id}") @@ -1745,7 +1728,7 @@ async def ui_cache_meta_form(content_hash: str, request: Request): description = meta.get("description", "") tags = meta.get("tags", []) tags_str = ", ".join(tags) if tags else "" - published = meta.get("published", {}) + l2_shares = meta.get("l2_shares", []) pinned = meta.get("pinned", False) pin_reason = meta.get("pin_reason", "") @@ -1758,18 +1741,31 @@ async def ui_cache_meta_form(content_hash: str, request: Request): self_checked = 'checked' if origin_type == "self" else '' external_checked = 'checked' if origin_type == "external" else '' - # Build publish section - if published.get("to_l2"): - asset_name = published.get("asset_name", "") - published_at = published.get("published_at", "")[:10] - last_synced = published.get("last_synced_at", "")[:10] + # Build publish section - show list of L2 shares + if l2_shares: + shares_html = "" + for share in l2_shares: + l2_server = share.get("l2_server", "Unknown") + asset_name = share.get("asset_name", "") + published_at = share.get("published_at", "")[:10] if share.get("published_at") else "" + last_synced = share.get("last_synced_at", "")[:10] if share.get("last_synced_at") else "" + shares_html += f''' +
+
+
{asset_name}
+
{l2_server}
+
+
+ Published: {published_at}
+ Synced: {last_synced} +
+
+ ''' publish_html = f'''
-
Published to L2
-
- Asset name: {asset_name}
- Published: {published_at}
- Last synced: {last_synced} +
Published to L2 ({len(l2_shares)} share{"s" if len(l2_shares) != 1 else ""})
+
+ {shares_html}
@@ -1965,7 +1961,7 @@ async def ui_publish_cache(content_hash: str, request: Request): # Call L2 try: resp = http_requests.post( - f"{L2_SERVER}/registry/publish-cache", + f"{L2_SERVER}/assets/publish-cache", headers={"Authorization": f"Bearer {token}"}, json={ "content_hash": content_hash, @@ -1993,14 +1989,26 @@ async def ui_publish_cache(content_hash: str, request: Request): except Exception as e: return f'
Error: {e}
' - # Update local metadata - publish_info = { - "to_l2": True, + # Update local metadata - add to l2_shares list + share_info = { + "l2_server": L2_SERVER, "asset_name": asset_name, "published_at": datetime.now(timezone.utc).isoformat(), "last_synced_at": datetime.now(timezone.utc).isoformat() } - save_cache_meta(content_hash, published=publish_info, pinned=True, pin_reason="published") + # Load existing shares and append + existing_meta = load_cache_meta(content_hash) + l2_shares = existing_meta.get("l2_shares", []) + # Update if already shared to this L2, otherwise append + updated = False + for i, share in enumerate(l2_shares): + if share.get("l2_server") == L2_SERVER: + l2_shares[i] = share_info + updated = True + break + if not updated: + l2_shares.append(share_info) + save_cache_meta(content_hash, l2_shares=l2_shares, pinned=True, pin_reason="published") return f'''
@@ -2028,19 +2036,28 @@ async def ui_republish_cache(content_hash: str, request: Request): # Load metadata meta = load_cache_meta(content_hash) - published = meta.get("published", {}) + l2_shares = meta.get("l2_shares", []) - if not published.get("to_l2"): - return '
Item not published yet
' + # Find share for current L2 server + current_share = None + share_index = -1 + for i, share in enumerate(l2_shares): + if share.get("l2_server") == L2_SERVER: + current_share = share + share_index = i + break - asset_name = published.get("asset_name") + if not current_share: + return '
Item not published to this L2 yet
' + + asset_name = current_share.get("asset_name") if not asset_name: return '
No asset name found
' # Call L2 update try: resp = http_requests.patch( - f"{L2_SERVER}/registry/{asset_name}", + f"{L2_SERVER}/assets/{asset_name}", headers={"Authorization": f"Bearer {token}"}, json={ "description": meta.get("description"), @@ -2066,8 +2083,8 @@ async def ui_republish_cache(content_hash: str, request: Request): return f'
Error: {e}
' # Update local metadata - published["last_synced_at"] = datetime.now(timezone.utc).isoformat() - save_cache_meta(content_hash, published=published) + l2_shares[share_index]["last_synced_at"] = datetime.now(timezone.utc).isoformat() + save_cache_meta(content_hash, l2_shares=l2_shares) return '
Updated on L2!
' @@ -2601,7 +2618,7 @@ async def publish_cache_to_l2( # Call L2 publish-cache endpoint try: resp = http_requests.post( - f"{L2_SERVER}/registry/publish-cache", + f"{L2_SERVER}/assets/publish-cache", headers={"Authorization": f"Bearer {token}"}, json={ "content_hash": content_hash, @@ -2692,7 +2709,7 @@ async def republish_cache_to_l2( # Call L2 update endpoint try: resp = http_requests.patch( - f"{L2_SERVER}/registry/{asset_name}", + f"{L2_SERVER}/assets/{asset_name}", headers={"Authorization": f"Bearer {token}"}, json={ "description": meta.get("description"), @@ -3226,7 +3243,7 @@ async def ui_publish_run(run_id: str, request: Request, output_name: str = Form( # Longer timeout because L2 calls back to L1 to fetch run details try: resp = http_requests.post( - f"{L2_SERVER}/registry/record-run", + f"{L2_SERVER}/assets/record-run", json={"run_id": run_id, "output_name": output_name, "l1_server": L1_PUBLIC_URL}, headers={"Authorization": f"Bearer {token}"}, timeout=30