Implement ownership model for all cached content deletion

- cache_service.delete_content: Remove user's ownership link first,
  only delete actual file if no other owners remain

- cache_manager.discard_activity_outputs_only: Check if outputs and
  intermediates are used by other activities before deleting

- run_service.discard_run: Now cleans up run outputs/intermediates
  (only if not shared by other runs)

- home.py clear_user_data: Use ownership model for effects and media
  deletion instead of directly deleting files

The ownership model ensures:
1. Multiple users can "own" the same cached content
2. Deleting removes the user's ownership link (item_types entry)
3. Actual files only deleted when no owners remain (garbage collection)
4. Shared intermediates between runs are preserved

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gilesb
2026-01-12 20:02:27 +00:00
parent abe89c9177
commit 8bf6f87c2a
4 changed files with 150 additions and 56 deletions

View File

@@ -132,35 +132,55 @@ async def clear_user_data(request: Request):
except Exception as e: except Exception as e:
errors.append(f"Failed to list recipes: {e}") errors.append(f"Failed to list recipes: {e}")
# Delete all effects # Delete all effects (uses ownership model)
try: try:
cache_manager = get_cache_manager() # Get user's effects from item_types
effects_dir = Path(cache_manager.cache_dir) / "_effects" effect_items = await database.get_user_items(actor_id, item_type="effect", limit=10000)
if effects_dir.exists(): for item in effect_items:
import shutil cid = item.get("cid")
for effect_dir in effects_dir.iterdir(): if cid:
if effect_dir.is_dir(): try:
try: # Remove ownership link
shutil.rmtree(effect_dir) await database.delete_item_type(cid, actor_id, "effect")
deleted["effects"] += 1 await database.delete_friendly_name(actor_id, cid)
except Exception as e:
errors.append(f"Effect {effect_dir.name}: {e}") # Check if orphaned
remaining = await database.get_item_types(cid)
if not remaining:
# Garbage collect
effects_dir = Path(cache_manager.cache_dir) / "_effects" / cid
if effects_dir.exists():
import shutil
shutil.rmtree(effects_dir)
import ipfs_client
ipfs_client.unpin(cid)
deleted["effects"] += 1
except Exception as e:
errors.append(f"Effect {cid[:16]}...: {e}")
except Exception as e: except Exception as e:
errors.append(f"Failed to delete effects: {e}") errors.append(f"Failed to delete effects: {e}")
# Delete all media/cache items for user # Delete all media/cache items for user (uses ownership model)
try: try:
items = await database.get_user_items(actor_id, limit=10000) from ..services.cache_service import CacheService
for item in items: cache_service = CacheService(database, cache_manager)
try:
# Get user's media items (video, image, audio)
for media_type in ["video", "image", "audio", "unknown"]:
items = await database.get_user_items(actor_id, item_type=media_type, limit=10000)
for item in items:
cid = item.get("cid") cid = item.get("cid")
if cid: if cid:
await database.delete_cache_item(cid) try:
deleted["media"] += 1 success, error = await cache_service.delete_content(cid, actor_id)
except Exception as e: if success:
errors.append(f"Media {item.get('cid', 'unknown')}: {e}") deleted["media"] += 1
elif error:
errors.append(f"Media {cid[:16]}...: {error}")
except Exception as e:
errors.append(f"Media {cid[:16]}...: {e}")
except Exception as e: except Exception as e:
errors.append(f"Failed to list media: {e}") errors.append(f"Failed to delete media: {e}")
logger.info(f"Cleared data for {actor_id}: {deleted}") logger.info(f"Cleared data for {actor_id}: {deleted}")
if errors: if errors:

View File

@@ -416,32 +416,62 @@ class CacheService:
return l2_result.get("ipfs_cid") or ipfs_cid, None return l2_result.get("ipfs_cid") or ipfs_cid, None
async def delete_content(self, cid: str, actor_id: str) -> Tuple[bool, Optional[str]]: async def delete_content(self, cid: str, actor_id: str) -> Tuple[bool, Optional[str]]:
"""Delete content from cache. Returns (success, error).""" """
if not self.cache.has_content(cid): Remove user's ownership link to cached content.
return False, "Content not found"
# Check if pinned This removes the item_types entry linking the user to the content.
The cached file is only deleted if no other users own it.
Returns (success, error).
"""
import logging
logger = logging.getLogger(__name__)
# Check if pinned for this user
meta = await self.db.load_item_metadata(cid, actor_id) meta = await self.db.load_item_metadata(cid, actor_id)
if meta and meta.get("pinned"): if meta and meta.get("pinned"):
pin_reason = meta.get("pin_reason", "unknown") pin_reason = meta.get("pin_reason", "unknown")
return False, f"Cannot discard pinned item (reason: {pin_reason})" return False, f"Cannot discard pinned item (reason: {pin_reason})"
# Check deletion rules via cache_manager # Get the item type to delete the right ownership entry
can_delete, reason = self.cache.can_delete(cid) item_types = await self.db.get_item_types(cid, actor_id)
if not can_delete: if not item_types:
return False, f"Cannot discard: {reason}" return False, "You don't own this content"
# Delete via cache_manager # Remove user's ownership links (all types for this user)
success, msg = self.cache.delete_by_cid(cid) for item in item_types:
item_type = item.get("type", "media")
await self.db.delete_item_type(cid, actor_id, item_type)
# Clean up legacy metadata files # Remove friendly name
meta_path = self.cache_dir / f"{cid}.meta.json" await self.db.delete_friendly_name(actor_id, cid)
if meta_path.exists():
meta_path.unlink()
mp4_path = self.cache_dir / f"{cid}.mp4"
if mp4_path.exists():
mp4_path.unlink()
# Check if anyone else still owns this content
remaining_owners = await self.db.get_item_types(cid)
# Only delete the actual file if no one owns it anymore
if not remaining_owners:
# Check deletion rules via cache_manager
can_delete, reason = self.cache.can_delete(cid)
if can_delete:
# Delete via cache_manager
self.cache.delete_by_cid(cid)
# Clean up legacy metadata files
meta_path = self.cache_dir / f"{cid}.meta.json"
if meta_path.exists():
meta_path.unlink()
mp4_path = self.cache_dir / f"{cid}.mp4"
if mp4_path.exists():
mp4_path.unlink()
# Delete from database
await self.db.delete_cache_item(cid)
logger.info(f"Garbage collected content {cid[:16]}... (no remaining owners)")
else:
logger.info(f"Content {cid[:16]}... orphaned but cannot delete: {reason}")
logger.info(f"Removed content {cid[:16]}... ownership for {actor_id}")
return True, None return True, None
async def import_from_ipfs(self, ipfs_cid: str, actor_id: str) -> Tuple[Optional[str], Optional[str]]: async def import_from_ipfs(self, ipfs_cid: str, actor_id: str) -> Tuple[Optional[str], Optional[str]]:

View File

@@ -467,10 +467,13 @@ class RunService:
username: str, username: str,
) -> Tuple[bool, Optional[str]]: ) -> Tuple[bool, Optional[str]]:
""" """
Discard (delete) a run record. Discard (delete) a run record and clean up outputs/intermediates.
Note: This removes the run record but not the output content. Outputs and intermediates are only deleted if not used by other runs.
""" """
import logging
logger = logging.getLogger(__name__)
run = await self.get_run(run_id) run = await self.get_run(run_id)
if not run: if not run:
return False, f"Run {run_id} not found" return False, f"Run {run_id} not found"
@@ -480,6 +483,18 @@ class RunService:
if run_owner and run_owner not in (username, actor_id): if run_owner and run_owner not in (username, actor_id):
return False, "Access denied" return False, "Access denied"
# Clean up activity outputs/intermediates (only if orphaned)
# The activity_id is the same as run_id
try:
success, msg = self.cache.discard_activity_outputs_only(run_id)
if success:
logger.info(f"Cleaned up run {run_id}: {msg}")
else:
# Activity might not exist (old runs), that's OK
logger.debug(f"No activity cleanup for {run_id}: {msg}")
except Exception as e:
logger.warning(f"Failed to cleanup activity for {run_id}: {e}")
# Remove task_id mapping from Redis # Remove task_id mapping from Redis
self.redis.delete(f"{self.task_key_prefix}{run_id}") self.redis.delete(f"{self.task_key_prefix}{run_id}")
@@ -487,8 +502,7 @@ class RunService:
try: try:
await self.db.delete_run_cache(run_id) await self.db.delete_run_cache(run_id)
except Exception as e: except Exception as e:
import logging logger.warning(f"Failed to delete run_cache for {run_id}: {e}")
logging.getLogger(__name__).warning(f"Failed to delete run_cache for {run_id}: {e}")
# Remove pending run if exists # Remove pending run if exists
try: try:

View File

@@ -690,11 +690,26 @@ class L1CacheManager:
return True, "Activity discarded" return True, "Activity discarded"
return False, "Failed to discard" return False, "Failed to discard"
def _is_used_by_other_activities(self, node_id: str, exclude_activity_id: str) -> bool:
"""Check if a node is used by any activity other than the excluded one."""
for other_activity in self.activity_store.list():
if other_activity.activity_id == exclude_activity_id:
continue
# Check if used as input, output, or intermediate
if node_id in other_activity.input_ids:
return True
if node_id == other_activity.output_id:
return True
if node_id in other_activity.intermediate_ids:
return True
return False
def discard_activity_outputs_only(self, activity_id: str) -> tuple[bool, str]: def discard_activity_outputs_only(self, activity_id: str) -> tuple[bool, str]:
""" """
Discard an activity, deleting only outputs and intermediates. Discard an activity, deleting only outputs and intermediates.
Inputs (cache items, configs) are preserved. Inputs (cache items, configs) are preserved.
Outputs/intermediates used by other activities are preserved.
Returns: Returns:
(success, message) tuple (success, message) tuple
@@ -711,21 +726,31 @@ class L1CacheManager:
if pinned: if pinned:
return False, f"Output is pinned ({reason})" return False, f"Output is pinned ({reason})"
# Delete output deleted_outputs = 0
if activity.output_id: preserved_shared = 0
entry = self.cache.get_entry(activity.output_id)
if entry:
# Remove from cache
self.cache.remove(activity.output_id)
# Remove from content index (Redis + local)
self._del_content_index(entry.cid)
# Delete from legacy dir if exists
legacy_path = self.legacy_dir / entry.cid
if legacy_path.exists():
legacy_path.unlink()
# Delete intermediates # Delete output (only if not used by other activities)
if activity.output_id:
if self._is_used_by_other_activities(activity.output_id, activity_id):
preserved_shared += 1
else:
entry = self.cache.get_entry(activity.output_id)
if entry:
# Remove from cache
self.cache.remove(activity.output_id)
# Remove from content index (Redis + local)
self._del_content_index(entry.cid)
# Delete from legacy dir if exists
legacy_path = self.legacy_dir / entry.cid
if legacy_path.exists():
legacy_path.unlink()
deleted_outputs += 1
# Delete intermediates (only if not used by other activities)
for node_id in activity.intermediate_ids: for node_id in activity.intermediate_ids:
if self._is_used_by_other_activities(node_id, activity_id):
preserved_shared += 1
continue
entry = self.cache.get_entry(node_id) entry = self.cache.get_entry(node_id)
if entry: if entry:
self.cache.remove(node_id) self.cache.remove(node_id)
@@ -733,11 +758,16 @@ class L1CacheManager:
legacy_path = self.legacy_dir / entry.cid legacy_path = self.legacy_dir / entry.cid
if legacy_path.exists(): if legacy_path.exists():
legacy_path.unlink() legacy_path.unlink()
deleted_outputs += 1
# Remove activity record (inputs remain in cache) # Remove activity record (inputs remain in cache)
self.activity_store.remove(activity_id) self.activity_store.remove(activity_id)
return True, "Activity discarded (outputs only)" msg = f"Activity discarded (deleted {deleted_outputs} outputs"
if preserved_shared > 0:
msg += f", preserved {preserved_shared} shared items"
msg += ")"
return True, msg
def cleanup_intermediates(self) -> int: def cleanup_intermediates(self) -> int:
"""Delete all intermediate cache entries (reconstructible).""" """Delete all intermediate cache entries (reconstructible)."""