Compare commits

...

2 Commits

Author SHA1 Message Date
gilesb
8bf6f87c2a Implement ownership model for all cached content deletion
- cache_service.delete_content: Remove user's ownership link first,
  only delete actual file if no other owners remain

- cache_manager.discard_activity_outputs_only: Check if outputs and
  intermediates are used by other activities before deleting

- run_service.discard_run: Now cleans up run outputs/intermediates
  (only if not shared by other runs)

- home.py clear_user_data: Use ownership model for effects and media
  deletion instead of directly deleting files

The ownership model ensures:
1. Multiple users can "own" the same cached content
2. Deleting removes the user's ownership link (item_types entry)
3. Actual files only deleted when no owners remain (garbage collection)
4. Shared intermediates between runs are preserved

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 20:02:27 +00:00
gilesb
abe89c9177 Fix effects router to use proper ownership model
- Upload: Create item_types entry to track user-effect relationship
- List: Query item_types for user's effects instead of scanning filesystem
- Delete: Remove ownership link, only delete files if orphaned (garbage collect)

This matches the ownership model used by recipes and media, where multiple
users can "own" the same cached content through item_types entries.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 19:56:12 +00:00
5 changed files with 200 additions and 91 deletions

View File

@@ -200,6 +200,15 @@ async def upload_effect(
# Also store metadata in IPFS for discoverability # Also store metadata in IPFS for discoverability
meta_cid = ipfs_client.add_json(full_meta) meta_cid = ipfs_client.add_json(full_meta)
# Track ownership in item_types
import database
await database.save_item_metadata(
cid=cid,
actor_id=ctx.actor_id,
item_type="effect",
filename=file.filename,
)
# Assign friendly name # Assign friendly name
from ..services.naming_service import get_naming_service from ..services.naming_service import get_naming_service
naming = get_naming_service() naming = get_naming_service()
@@ -314,24 +323,26 @@ async def list_effects(
limit: int = 20, limit: int = 20,
ctx: UserContext = Depends(require_auth), ctx: UserContext = Depends(require_auth),
): ):
"""List uploaded effects with pagination.""" """List user's effects with pagination."""
import database
effects_dir = get_effects_dir() effects_dir = get_effects_dir()
effects = [] effects = []
# Get user's effect CIDs from item_types
user_items = await database.get_user_items(ctx.actor_id, item_type="effect", limit=1000)
effect_cids = [item["cid"] for item in user_items]
# Get naming service for friendly name lookup # Get naming service for friendly name lookup
from ..services.naming_service import get_naming_service from ..services.naming_service import get_naming_service
naming = get_naming_service() naming = get_naming_service()
if effects_dir.exists(): for cid in effect_cids:
for effect_dir in effects_dir.iterdir(): effect_dir = effects_dir / cid
if effect_dir.is_dir():
metadata_path = effect_dir / "metadata.json" metadata_path = effect_dir / "metadata.json"
if metadata_path.exists(): if metadata_path.exists():
try: try:
meta = json.loads(metadata_path.read_text()) meta = json.loads(metadata_path.read_text())
# Add friendly name if available # Add friendly name if available
cid = meta.get("cid")
if cid:
friendly = await naming.get_by_cid(ctx.actor_id, cid) friendly = await naming.get_by_cid(ctx.actor_id, cid)
if friendly: if friendly:
meta["friendly_name"] = friendly["friendly_name"] meta["friendly_name"] = friendly["friendly_name"]
@@ -412,25 +423,29 @@ async def delete_effect(
cid: str, cid: str,
ctx: UserContext = Depends(require_auth), ctx: UserContext = Depends(require_auth),
): ):
"""Delete an effect from local cache (IPFS content is immutable).""" """Remove user's ownership link to an effect."""
import database
# Remove user's ownership link from item_types
await database.delete_item_type(cid, ctx.actor_id, "effect")
# Remove friendly name
await database.delete_friendly_name(ctx.actor_id, cid)
# Check if anyone still owns this effect
remaining_owners = await database.get_item_types(cid)
# Only delete local files if no one owns it anymore
if not remaining_owners:
effects_dir = get_effects_dir() effects_dir = get_effects_dir()
effect_dir = effects_dir / cid effect_dir = effects_dir / cid
if effect_dir.exists():
if not effect_dir.exists():
raise HTTPException(404, f"Effect {cid[:16]}... not found in local cache")
# Check ownership
metadata_path = effect_dir / "metadata.json"
if metadata_path.exists():
meta = json.loads(metadata_path.read_text())
if meta.get("uploader") != ctx.actor_id:
raise HTTPException(403, "Can only delete your own effects")
import shutil import shutil
shutil.rmtree(effect_dir) shutil.rmtree(effect_dir)
# Unpin from IPFS (content remains available if pinned elsewhere) # Unpin from IPFS
ipfs_client.unpin(cid) ipfs_client.unpin(cid)
logger.info(f"Garbage collected effect {cid[:16]}... (no remaining owners)")
logger.info(f"Deleted effect {cid[:16]}... by {ctx.actor_id}") logger.info(f"Removed effect {cid[:16]}... ownership for {ctx.actor_id}")
return {"deleted": True, "note": "Unpinned from local IPFS; content may still exist on other nodes"} return {"deleted": True}

View File

@@ -132,35 +132,55 @@ async def clear_user_data(request: Request):
except Exception as e: except Exception as e:
errors.append(f"Failed to list recipes: {e}") errors.append(f"Failed to list recipes: {e}")
# Delete all effects # Delete all effects (uses ownership model)
try: try:
cache_manager = get_cache_manager() # Get user's effects from item_types
effects_dir = Path(cache_manager.cache_dir) / "_effects" effect_items = await database.get_user_items(actor_id, item_type="effect", limit=10000)
for item in effect_items:
cid = item.get("cid")
if cid:
try:
# Remove ownership link
await database.delete_item_type(cid, actor_id, "effect")
await database.delete_friendly_name(actor_id, cid)
# Check if orphaned
remaining = await database.get_item_types(cid)
if not remaining:
# Garbage collect
effects_dir = Path(cache_manager.cache_dir) / "_effects" / cid
if effects_dir.exists(): if effects_dir.exists():
import shutil import shutil
for effect_dir in effects_dir.iterdir(): shutil.rmtree(effects_dir)
if effect_dir.is_dir(): import ipfs_client
try: ipfs_client.unpin(cid)
shutil.rmtree(effect_dir)
deleted["effects"] += 1 deleted["effects"] += 1
except Exception as e: except Exception as e:
errors.append(f"Effect {effect_dir.name}: {e}") errors.append(f"Effect {cid[:16]}...: {e}")
except Exception as e: except Exception as e:
errors.append(f"Failed to delete effects: {e}") errors.append(f"Failed to delete effects: {e}")
# Delete all media/cache items for user # Delete all media/cache items for user (uses ownership model)
try: try:
items = await database.get_user_items(actor_id, limit=10000) from ..services.cache_service import CacheService
cache_service = CacheService(database, cache_manager)
# Get user's media items (video, image, audio)
for media_type in ["video", "image", "audio", "unknown"]:
items = await database.get_user_items(actor_id, item_type=media_type, limit=10000)
for item in items: for item in items:
try:
cid = item.get("cid") cid = item.get("cid")
if cid: if cid:
await database.delete_cache_item(cid) try:
success, error = await cache_service.delete_content(cid, actor_id)
if success:
deleted["media"] += 1 deleted["media"] += 1
elif error:
errors.append(f"Media {cid[:16]}...: {error}")
except Exception as e: except Exception as e:
errors.append(f"Media {item.get('cid', 'unknown')}: {e}") errors.append(f"Media {cid[:16]}...: {e}")
except Exception as e: except Exception as e:
errors.append(f"Failed to list media: {e}") errors.append(f"Failed to delete media: {e}")
logger.info(f"Cleared data for {actor_id}: {deleted}") logger.info(f"Cleared data for {actor_id}: {deleted}")
if errors: if errors:

View File

@@ -416,23 +416,45 @@ class CacheService:
return l2_result.get("ipfs_cid") or ipfs_cid, None return l2_result.get("ipfs_cid") or ipfs_cid, None
async def delete_content(self, cid: str, actor_id: str) -> Tuple[bool, Optional[str]]: async def delete_content(self, cid: str, actor_id: str) -> Tuple[bool, Optional[str]]:
"""Delete content from cache. Returns (success, error).""" """
if not self.cache.has_content(cid): Remove user's ownership link to cached content.
return False, "Content not found"
# Check if pinned This removes the item_types entry linking the user to the content.
The cached file is only deleted if no other users own it.
Returns (success, error).
"""
import logging
logger = logging.getLogger(__name__)
# Check if pinned for this user
meta = await self.db.load_item_metadata(cid, actor_id) meta = await self.db.load_item_metadata(cid, actor_id)
if meta and meta.get("pinned"): if meta and meta.get("pinned"):
pin_reason = meta.get("pin_reason", "unknown") pin_reason = meta.get("pin_reason", "unknown")
return False, f"Cannot discard pinned item (reason: {pin_reason})" return False, f"Cannot discard pinned item (reason: {pin_reason})"
# Get the item type to delete the right ownership entry
item_types = await self.db.get_item_types(cid, actor_id)
if not item_types:
return False, "You don't own this content"
# Remove user's ownership links (all types for this user)
for item in item_types:
item_type = item.get("type", "media")
await self.db.delete_item_type(cid, actor_id, item_type)
# Remove friendly name
await self.db.delete_friendly_name(actor_id, cid)
# Check if anyone else still owns this content
remaining_owners = await self.db.get_item_types(cid)
# Only delete the actual file if no one owns it anymore
if not remaining_owners:
# Check deletion rules via cache_manager # Check deletion rules via cache_manager
can_delete, reason = self.cache.can_delete(cid) can_delete, reason = self.cache.can_delete(cid)
if not can_delete: if can_delete:
return False, f"Cannot discard: {reason}"
# Delete via cache_manager # Delete via cache_manager
success, msg = self.cache.delete_by_cid(cid) self.cache.delete_by_cid(cid)
# Clean up legacy metadata files # Clean up legacy metadata files
meta_path = self.cache_dir / f"{cid}.meta.json" meta_path = self.cache_dir / f"{cid}.meta.json"
@@ -442,6 +464,14 @@ class CacheService:
if mp4_path.exists(): if mp4_path.exists():
mp4_path.unlink() mp4_path.unlink()
# Delete from database
await self.db.delete_cache_item(cid)
logger.info(f"Garbage collected content {cid[:16]}... (no remaining owners)")
else:
logger.info(f"Content {cid[:16]}... orphaned but cannot delete: {reason}")
logger.info(f"Removed content {cid[:16]}... ownership for {actor_id}")
return True, None return True, None
async def import_from_ipfs(self, ipfs_cid: str, actor_id: str) -> Tuple[Optional[str], Optional[str]]: async def import_from_ipfs(self, ipfs_cid: str, actor_id: str) -> Tuple[Optional[str], Optional[str]]:

View File

@@ -467,10 +467,13 @@ class RunService:
username: str, username: str,
) -> Tuple[bool, Optional[str]]: ) -> Tuple[bool, Optional[str]]:
""" """
Discard (delete) a run record. Discard (delete) a run record and clean up outputs/intermediates.
Note: This removes the run record but not the output content. Outputs and intermediates are only deleted if not used by other runs.
""" """
import logging
logger = logging.getLogger(__name__)
run = await self.get_run(run_id) run = await self.get_run(run_id)
if not run: if not run:
return False, f"Run {run_id} not found" return False, f"Run {run_id} not found"
@@ -480,6 +483,18 @@ class RunService:
if run_owner and run_owner not in (username, actor_id): if run_owner and run_owner not in (username, actor_id):
return False, "Access denied" return False, "Access denied"
# Clean up activity outputs/intermediates (only if orphaned)
# The activity_id is the same as run_id
try:
success, msg = self.cache.discard_activity_outputs_only(run_id)
if success:
logger.info(f"Cleaned up run {run_id}: {msg}")
else:
# Activity might not exist (old runs), that's OK
logger.debug(f"No activity cleanup for {run_id}: {msg}")
except Exception as e:
logger.warning(f"Failed to cleanup activity for {run_id}: {e}")
# Remove task_id mapping from Redis # Remove task_id mapping from Redis
self.redis.delete(f"{self.task_key_prefix}{run_id}") self.redis.delete(f"{self.task_key_prefix}{run_id}")
@@ -487,8 +502,7 @@ class RunService:
try: try:
await self.db.delete_run_cache(run_id) await self.db.delete_run_cache(run_id)
except Exception as e: except Exception as e:
import logging logger.warning(f"Failed to delete run_cache for {run_id}: {e}")
logging.getLogger(__name__).warning(f"Failed to delete run_cache for {run_id}: {e}")
# Remove pending run if exists # Remove pending run if exists
try: try:

View File

@@ -690,11 +690,26 @@ class L1CacheManager:
return True, "Activity discarded" return True, "Activity discarded"
return False, "Failed to discard" return False, "Failed to discard"
def _is_used_by_other_activities(self, node_id: str, exclude_activity_id: str) -> bool:
"""Check if a node is used by any activity other than the excluded one."""
for other_activity in self.activity_store.list():
if other_activity.activity_id == exclude_activity_id:
continue
# Check if used as input, output, or intermediate
if node_id in other_activity.input_ids:
return True
if node_id == other_activity.output_id:
return True
if node_id in other_activity.intermediate_ids:
return True
return False
def discard_activity_outputs_only(self, activity_id: str) -> tuple[bool, str]: def discard_activity_outputs_only(self, activity_id: str) -> tuple[bool, str]:
""" """
Discard an activity, deleting only outputs and intermediates. Discard an activity, deleting only outputs and intermediates.
Inputs (cache items, configs) are preserved. Inputs (cache items, configs) are preserved.
Outputs/intermediates used by other activities are preserved.
Returns: Returns:
(success, message) tuple (success, message) tuple
@@ -711,8 +726,14 @@ class L1CacheManager:
if pinned: if pinned:
return False, f"Output is pinned ({reason})" return False, f"Output is pinned ({reason})"
# Delete output deleted_outputs = 0
preserved_shared = 0
# Delete output (only if not used by other activities)
if activity.output_id: if activity.output_id:
if self._is_used_by_other_activities(activity.output_id, activity_id):
preserved_shared += 1
else:
entry = self.cache.get_entry(activity.output_id) entry = self.cache.get_entry(activity.output_id)
if entry: if entry:
# Remove from cache # Remove from cache
@@ -723,9 +744,13 @@ class L1CacheManager:
legacy_path = self.legacy_dir / entry.cid legacy_path = self.legacy_dir / entry.cid
if legacy_path.exists(): if legacy_path.exists():
legacy_path.unlink() legacy_path.unlink()
deleted_outputs += 1
# Delete intermediates # Delete intermediates (only if not used by other activities)
for node_id in activity.intermediate_ids: for node_id in activity.intermediate_ids:
if self._is_used_by_other_activities(node_id, activity_id):
preserved_shared += 1
continue
entry = self.cache.get_entry(node_id) entry = self.cache.get_entry(node_id)
if entry: if entry:
self.cache.remove(node_id) self.cache.remove(node_id)
@@ -733,11 +758,16 @@ class L1CacheManager:
legacy_path = self.legacy_dir / entry.cid legacy_path = self.legacy_dir / entry.cid
if legacy_path.exists(): if legacy_path.exists():
legacy_path.unlink() legacy_path.unlink()
deleted_outputs += 1
# Remove activity record (inputs remain in cache) # Remove activity record (inputs remain in cache)
self.activity_store.remove(activity_id) self.activity_store.remove(activity_id)
return True, "Activity discarded (outputs only)" msg = f"Activity discarded (deleted {deleted_outputs} outputs"
if preserved_shared > 0:
msg += f", preserved {preserved_shared} shared items"
msg += ")"
return True, msg
def cleanup_intermediates(self) -> int: def cleanup_intermediates(self) -> int:
"""Delete all intermediate cache entries (reconstructible).""" """Delete all intermediate cache entries (reconstructible)."""