Compare commits

..

2 Commits

Author SHA1 Message Date
gilesb
8bf6f87c2a Implement ownership model for all cached content deletion
- cache_service.delete_content: Remove user's ownership link first,
  only delete actual file if no other owners remain

- cache_manager.discard_activity_outputs_only: Check if outputs and
  intermediates are used by other activities before deleting

- run_service.discard_run: Now cleans up run outputs/intermediates
  (only if not shared by other runs)

- home.py clear_user_data: Use ownership model for effects and media
  deletion instead of directly deleting files

The ownership model ensures:
1. Multiple users can "own" the same cached content
2. Deleting removes the user's ownership link (item_types entry)
3. Actual files only deleted when no owners remain (garbage collection)
4. Shared intermediates between runs are preserved

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 20:02:27 +00:00
gilesb
abe89c9177 Fix effects router to use proper ownership model
- Upload: Create item_types entry to track user-effect relationship
- List: Query item_types for user's effects instead of scanning filesystem
- Delete: Remove ownership link, only delete files if orphaned (garbage collect)

This matches the ownership model used by recipes and media, where multiple
users can "own" the same cached content through item_types entries.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 19:56:12 +00:00
5 changed files with 200 additions and 91 deletions

View File

@@ -200,6 +200,15 @@ async def upload_effect(
# Also store metadata in IPFS for discoverability # Also store metadata in IPFS for discoverability
meta_cid = ipfs_client.add_json(full_meta) meta_cid = ipfs_client.add_json(full_meta)
# Track ownership in item_types
import database
await database.save_item_metadata(
cid=cid,
actor_id=ctx.actor_id,
item_type="effect",
filename=file.filename,
)
# Assign friendly name # Assign friendly name
from ..services.naming_service import get_naming_service from ..services.naming_service import get_naming_service
naming = get_naming_service() naming = get_naming_service()
@@ -314,31 +323,33 @@ async def list_effects(
limit: int = 20, limit: int = 20,
ctx: UserContext = Depends(require_auth), ctx: UserContext = Depends(require_auth),
): ):
"""List uploaded effects with pagination.""" """List user's effects with pagination."""
import database
effects_dir = get_effects_dir() effects_dir = get_effects_dir()
effects = [] effects = []
# Get user's effect CIDs from item_types
user_items = await database.get_user_items(ctx.actor_id, item_type="effect", limit=1000)
effect_cids = [item["cid"] for item in user_items]
# Get naming service for friendly name lookup # Get naming service for friendly name lookup
from ..services.naming_service import get_naming_service from ..services.naming_service import get_naming_service
naming = get_naming_service() naming = get_naming_service()
if effects_dir.exists(): for cid in effect_cids:
for effect_dir in effects_dir.iterdir(): effect_dir = effects_dir / cid
if effect_dir.is_dir(): metadata_path = effect_dir / "metadata.json"
metadata_path = effect_dir / "metadata.json" if metadata_path.exists():
if metadata_path.exists(): try:
try: meta = json.loads(metadata_path.read_text())
meta = json.loads(metadata_path.read_text()) # Add friendly name if available
# Add friendly name if available friendly = await naming.get_by_cid(ctx.actor_id, cid)
cid = meta.get("cid") if friendly:
if cid: meta["friendly_name"] = friendly["friendly_name"]
friendly = await naming.get_by_cid(ctx.actor_id, cid) meta["base_name"] = friendly["base_name"]
if friendly: effects.append(meta)
meta["friendly_name"] = friendly["friendly_name"] except json.JSONDecodeError:
meta["base_name"] = friendly["base_name"] pass
effects.append(meta)
except json.JSONDecodeError:
pass
# Sort by upload time (newest first) # Sort by upload time (newest first)
effects.sort(key=lambda e: e.get("uploaded_at", ""), reverse=True) effects.sort(key=lambda e: e.get("uploaded_at", ""), reverse=True)
@@ -412,25 +423,29 @@ async def delete_effect(
cid: str, cid: str,
ctx: UserContext = Depends(require_auth), ctx: UserContext = Depends(require_auth),
): ):
"""Delete an effect from local cache (IPFS content is immutable).""" """Remove user's ownership link to an effect."""
effects_dir = get_effects_dir() import database
effect_dir = effects_dir / cid
if not effect_dir.exists(): # Remove user's ownership link from item_types
raise HTTPException(404, f"Effect {cid[:16]}... not found in local cache") await database.delete_item_type(cid, ctx.actor_id, "effect")
# Check ownership # Remove friendly name
metadata_path = effect_dir / "metadata.json" await database.delete_friendly_name(ctx.actor_id, cid)
if metadata_path.exists():
meta = json.loads(metadata_path.read_text())
if meta.get("uploader") != ctx.actor_id:
raise HTTPException(403, "Can only delete your own effects")
import shutil # Check if anyone still owns this effect
shutil.rmtree(effect_dir) remaining_owners = await database.get_item_types(cid)
# Unpin from IPFS (content remains available if pinned elsewhere) # Only delete local files if no one owns it anymore
ipfs_client.unpin(cid) if not remaining_owners:
effects_dir = get_effects_dir()
effect_dir = effects_dir / cid
if effect_dir.exists():
import shutil
shutil.rmtree(effect_dir)
logger.info(f"Deleted effect {cid[:16]}... by {ctx.actor_id}") # Unpin from IPFS
return {"deleted": True, "note": "Unpinned from local IPFS; content may still exist on other nodes"} ipfs_client.unpin(cid)
logger.info(f"Garbage collected effect {cid[:16]}... (no remaining owners)")
logger.info(f"Removed effect {cid[:16]}... ownership for {ctx.actor_id}")
return {"deleted": True}

View File

@@ -132,35 +132,55 @@ async def clear_user_data(request: Request):
except Exception as e: except Exception as e:
errors.append(f"Failed to list recipes: {e}") errors.append(f"Failed to list recipes: {e}")
# Delete all effects # Delete all effects (uses ownership model)
try: try:
cache_manager = get_cache_manager() # Get user's effects from item_types
effects_dir = Path(cache_manager.cache_dir) / "_effects" effect_items = await database.get_user_items(actor_id, item_type="effect", limit=10000)
if effects_dir.exists(): for item in effect_items:
import shutil cid = item.get("cid")
for effect_dir in effects_dir.iterdir(): if cid:
if effect_dir.is_dir(): try:
try: # Remove ownership link
shutil.rmtree(effect_dir) await database.delete_item_type(cid, actor_id, "effect")
deleted["effects"] += 1 await database.delete_friendly_name(actor_id, cid)
except Exception as e:
errors.append(f"Effect {effect_dir.name}: {e}") # Check if orphaned
remaining = await database.get_item_types(cid)
if not remaining:
# Garbage collect
effects_dir = Path(cache_manager.cache_dir) / "_effects" / cid
if effects_dir.exists():
import shutil
shutil.rmtree(effects_dir)
import ipfs_client
ipfs_client.unpin(cid)
deleted["effects"] += 1
except Exception as e:
errors.append(f"Effect {cid[:16]}...: {e}")
except Exception as e: except Exception as e:
errors.append(f"Failed to delete effects: {e}") errors.append(f"Failed to delete effects: {e}")
# Delete all media/cache items for user # Delete all media/cache items for user (uses ownership model)
try: try:
items = await database.get_user_items(actor_id, limit=10000) from ..services.cache_service import CacheService
for item in items: cache_service = CacheService(database, cache_manager)
try:
# Get user's media items (video, image, audio)
for media_type in ["video", "image", "audio", "unknown"]:
items = await database.get_user_items(actor_id, item_type=media_type, limit=10000)
for item in items:
cid = item.get("cid") cid = item.get("cid")
if cid: if cid:
await database.delete_cache_item(cid) try:
deleted["media"] += 1 success, error = await cache_service.delete_content(cid, actor_id)
except Exception as e: if success:
errors.append(f"Media {item.get('cid', 'unknown')}: {e}") deleted["media"] += 1
elif error:
errors.append(f"Media {cid[:16]}...: {error}")
except Exception as e:
errors.append(f"Media {cid[:16]}...: {e}")
except Exception as e: except Exception as e:
errors.append(f"Failed to list media: {e}") errors.append(f"Failed to delete media: {e}")
logger.info(f"Cleared data for {actor_id}: {deleted}") logger.info(f"Cleared data for {actor_id}: {deleted}")
if errors: if errors:

View File

@@ -416,32 +416,62 @@ class CacheService:
return l2_result.get("ipfs_cid") or ipfs_cid, None return l2_result.get("ipfs_cid") or ipfs_cid, None
async def delete_content(self, cid: str, actor_id: str) -> Tuple[bool, Optional[str]]: async def delete_content(self, cid: str, actor_id: str) -> Tuple[bool, Optional[str]]:
"""Delete content from cache. Returns (success, error).""" """
if not self.cache.has_content(cid): Remove user's ownership link to cached content.
return False, "Content not found"
# Check if pinned This removes the item_types entry linking the user to the content.
The cached file is only deleted if no other users own it.
Returns (success, error).
"""
import logging
logger = logging.getLogger(__name__)
# Check if pinned for this user
meta = await self.db.load_item_metadata(cid, actor_id) meta = await self.db.load_item_metadata(cid, actor_id)
if meta and meta.get("pinned"): if meta and meta.get("pinned"):
pin_reason = meta.get("pin_reason", "unknown") pin_reason = meta.get("pin_reason", "unknown")
return False, f"Cannot discard pinned item (reason: {pin_reason})" return False, f"Cannot discard pinned item (reason: {pin_reason})"
# Check deletion rules via cache_manager # Get the item type to delete the right ownership entry
can_delete, reason = self.cache.can_delete(cid) item_types = await self.db.get_item_types(cid, actor_id)
if not can_delete: if not item_types:
return False, f"Cannot discard: {reason}" return False, "You don't own this content"
# Delete via cache_manager # Remove user's ownership links (all types for this user)
success, msg = self.cache.delete_by_cid(cid) for item in item_types:
item_type = item.get("type", "media")
await self.db.delete_item_type(cid, actor_id, item_type)
# Clean up legacy metadata files # Remove friendly name
meta_path = self.cache_dir / f"{cid}.meta.json" await self.db.delete_friendly_name(actor_id, cid)
if meta_path.exists():
meta_path.unlink()
mp4_path = self.cache_dir / f"{cid}.mp4"
if mp4_path.exists():
mp4_path.unlink()
# Check if anyone else still owns this content
remaining_owners = await self.db.get_item_types(cid)
# Only delete the actual file if no one owns it anymore
if not remaining_owners:
# Check deletion rules via cache_manager
can_delete, reason = self.cache.can_delete(cid)
if can_delete:
# Delete via cache_manager
self.cache.delete_by_cid(cid)
# Clean up legacy metadata files
meta_path = self.cache_dir / f"{cid}.meta.json"
if meta_path.exists():
meta_path.unlink()
mp4_path = self.cache_dir / f"{cid}.mp4"
if mp4_path.exists():
mp4_path.unlink()
# Delete from database
await self.db.delete_cache_item(cid)
logger.info(f"Garbage collected content {cid[:16]}... (no remaining owners)")
else:
logger.info(f"Content {cid[:16]}... orphaned but cannot delete: {reason}")
logger.info(f"Removed content {cid[:16]}... ownership for {actor_id}")
return True, None return True, None
async def import_from_ipfs(self, ipfs_cid: str, actor_id: str) -> Tuple[Optional[str], Optional[str]]: async def import_from_ipfs(self, ipfs_cid: str, actor_id: str) -> Tuple[Optional[str], Optional[str]]:

View File

@@ -467,10 +467,13 @@ class RunService:
username: str, username: str,
) -> Tuple[bool, Optional[str]]: ) -> Tuple[bool, Optional[str]]:
""" """
Discard (delete) a run record. Discard (delete) a run record and clean up outputs/intermediates.
Note: This removes the run record but not the output content. Outputs and intermediates are only deleted if not used by other runs.
""" """
import logging
logger = logging.getLogger(__name__)
run = await self.get_run(run_id) run = await self.get_run(run_id)
if not run: if not run:
return False, f"Run {run_id} not found" return False, f"Run {run_id} not found"
@@ -480,6 +483,18 @@ class RunService:
if run_owner and run_owner not in (username, actor_id): if run_owner and run_owner not in (username, actor_id):
return False, "Access denied" return False, "Access denied"
# Clean up activity outputs/intermediates (only if orphaned)
# The activity_id is the same as run_id
try:
success, msg = self.cache.discard_activity_outputs_only(run_id)
if success:
logger.info(f"Cleaned up run {run_id}: {msg}")
else:
# Activity might not exist (old runs), that's OK
logger.debug(f"No activity cleanup for {run_id}: {msg}")
except Exception as e:
logger.warning(f"Failed to cleanup activity for {run_id}: {e}")
# Remove task_id mapping from Redis # Remove task_id mapping from Redis
self.redis.delete(f"{self.task_key_prefix}{run_id}") self.redis.delete(f"{self.task_key_prefix}{run_id}")
@@ -487,8 +502,7 @@ class RunService:
try: try:
await self.db.delete_run_cache(run_id) await self.db.delete_run_cache(run_id)
except Exception as e: except Exception as e:
import logging logger.warning(f"Failed to delete run_cache for {run_id}: {e}")
logging.getLogger(__name__).warning(f"Failed to delete run_cache for {run_id}: {e}")
# Remove pending run if exists # Remove pending run if exists
try: try:

View File

@@ -690,11 +690,26 @@ class L1CacheManager:
return True, "Activity discarded" return True, "Activity discarded"
return False, "Failed to discard" return False, "Failed to discard"
def _is_used_by_other_activities(self, node_id: str, exclude_activity_id: str) -> bool:
"""Check if a node is used by any activity other than the excluded one."""
for other_activity in self.activity_store.list():
if other_activity.activity_id == exclude_activity_id:
continue
# Check if used as input, output, or intermediate
if node_id in other_activity.input_ids:
return True
if node_id == other_activity.output_id:
return True
if node_id in other_activity.intermediate_ids:
return True
return False
def discard_activity_outputs_only(self, activity_id: str) -> tuple[bool, str]: def discard_activity_outputs_only(self, activity_id: str) -> tuple[bool, str]:
""" """
Discard an activity, deleting only outputs and intermediates. Discard an activity, deleting only outputs and intermediates.
Inputs (cache items, configs) are preserved. Inputs (cache items, configs) are preserved.
Outputs/intermediates used by other activities are preserved.
Returns: Returns:
(success, message) tuple (success, message) tuple
@@ -711,21 +726,31 @@ class L1CacheManager:
if pinned: if pinned:
return False, f"Output is pinned ({reason})" return False, f"Output is pinned ({reason})"
# Delete output deleted_outputs = 0
if activity.output_id: preserved_shared = 0
entry = self.cache.get_entry(activity.output_id)
if entry:
# Remove from cache
self.cache.remove(activity.output_id)
# Remove from content index (Redis + local)
self._del_content_index(entry.cid)
# Delete from legacy dir if exists
legacy_path = self.legacy_dir / entry.cid
if legacy_path.exists():
legacy_path.unlink()
# Delete intermediates # Delete output (only if not used by other activities)
if activity.output_id:
if self._is_used_by_other_activities(activity.output_id, activity_id):
preserved_shared += 1
else:
entry = self.cache.get_entry(activity.output_id)
if entry:
# Remove from cache
self.cache.remove(activity.output_id)
# Remove from content index (Redis + local)
self._del_content_index(entry.cid)
# Delete from legacy dir if exists
legacy_path = self.legacy_dir / entry.cid
if legacy_path.exists():
legacy_path.unlink()
deleted_outputs += 1
# Delete intermediates (only if not used by other activities)
for node_id in activity.intermediate_ids: for node_id in activity.intermediate_ids:
if self._is_used_by_other_activities(node_id, activity_id):
preserved_shared += 1
continue
entry = self.cache.get_entry(node_id) entry = self.cache.get_entry(node_id)
if entry: if entry:
self.cache.remove(node_id) self.cache.remove(node_id)
@@ -733,11 +758,16 @@ class L1CacheManager:
legacy_path = self.legacy_dir / entry.cid legacy_path = self.legacy_dir / entry.cid
if legacy_path.exists(): if legacy_path.exists():
legacy_path.unlink() legacy_path.unlink()
deleted_outputs += 1
# Remove activity record (inputs remain in cache) # Remove activity record (inputs remain in cache)
self.activity_store.remove(activity_id) self.activity_store.remove(activity_id)
return True, "Activity discarded (outputs only)" msg = f"Activity discarded (deleted {deleted_outputs} outputs"
if preserved_shared > 0:
msg += f", preserved {preserved_shared} shared items"
msg += ")"
return True, msg
def cleanup_intermediates(self) -> int: def cleanup_intermediates(self) -> int:
"""Delete all intermediate cache entries (reconstructible).""" """Delete all intermediate cache entries (reconstructible)."""