Compare commits

...

2 Commits

Author SHA1 Message Date
gilesb
8bf6f87c2a Implement ownership model for all cached content deletion
- cache_service.delete_content: Remove user's ownership link first,
  only delete actual file if no other owners remain

- cache_manager.discard_activity_outputs_only: Check if outputs and
  intermediates are used by other activities before deleting

- run_service.discard_run: Now cleans up run outputs/intermediates
  (only if not shared by other runs)

- home.py clear_user_data: Use ownership model for effects and media
  deletion instead of directly deleting files

The ownership model ensures:
1. Multiple users can "own" the same cached content
2. Deleting removes the user's ownership link (item_types entry)
3. Actual files only deleted when no owners remain (garbage collection)
4. Shared intermediates between runs are preserved

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 20:02:27 +00:00
gilesb
abe89c9177 Fix effects router to use proper ownership model
- Upload: Create item_types entry to track user-effect relationship
- List: Query item_types for user's effects instead of scanning filesystem
- Delete: Remove ownership link, only delete files if orphaned (garbage collect)

This matches the ownership model used by recipes and media, where multiple
users can "own" the same cached content through item_types entries.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 19:56:12 +00:00
5 changed files with 200 additions and 91 deletions

View File

@@ -200,6 +200,15 @@ async def upload_effect(
# Also store metadata in IPFS for discoverability
meta_cid = ipfs_client.add_json(full_meta)
# Track ownership in item_types
import database
await database.save_item_metadata(
cid=cid,
actor_id=ctx.actor_id,
item_type="effect",
filename=file.filename,
)
# Assign friendly name
from ..services.naming_service import get_naming_service
naming = get_naming_service()
@@ -314,31 +323,33 @@ async def list_effects(
limit: int = 20,
ctx: UserContext = Depends(require_auth),
):
"""List uploaded effects with pagination."""
"""List user's effects with pagination."""
import database
effects_dir = get_effects_dir()
effects = []
# Get user's effect CIDs from item_types
user_items = await database.get_user_items(ctx.actor_id, item_type="effect", limit=1000)
effect_cids = [item["cid"] for item in user_items]
# Get naming service for friendly name lookup
from ..services.naming_service import get_naming_service
naming = get_naming_service()
if effects_dir.exists():
for effect_dir in effects_dir.iterdir():
if effect_dir.is_dir():
metadata_path = effect_dir / "metadata.json"
if metadata_path.exists():
try:
meta = json.loads(metadata_path.read_text())
# Add friendly name if available
cid = meta.get("cid")
if cid:
friendly = await naming.get_by_cid(ctx.actor_id, cid)
if friendly:
meta["friendly_name"] = friendly["friendly_name"]
meta["base_name"] = friendly["base_name"]
effects.append(meta)
except json.JSONDecodeError:
pass
for cid in effect_cids:
effect_dir = effects_dir / cid
metadata_path = effect_dir / "metadata.json"
if metadata_path.exists():
try:
meta = json.loads(metadata_path.read_text())
# Add friendly name if available
friendly = await naming.get_by_cid(ctx.actor_id, cid)
if friendly:
meta["friendly_name"] = friendly["friendly_name"]
meta["base_name"] = friendly["base_name"]
effects.append(meta)
except json.JSONDecodeError:
pass
# Sort by upload time (newest first)
effects.sort(key=lambda e: e.get("uploaded_at", ""), reverse=True)
@@ -412,25 +423,29 @@ async def delete_effect(
cid: str,
ctx: UserContext = Depends(require_auth),
):
"""Delete an effect from local cache (IPFS content is immutable)."""
effects_dir = get_effects_dir()
effect_dir = effects_dir / cid
"""Remove user's ownership link to an effect."""
import database
if not effect_dir.exists():
raise HTTPException(404, f"Effect {cid[:16]}... not found in local cache")
# Remove user's ownership link from item_types
await database.delete_item_type(cid, ctx.actor_id, "effect")
# Check ownership
metadata_path = effect_dir / "metadata.json"
if metadata_path.exists():
meta = json.loads(metadata_path.read_text())
if meta.get("uploader") != ctx.actor_id:
raise HTTPException(403, "Can only delete your own effects")
# Remove friendly name
await database.delete_friendly_name(ctx.actor_id, cid)
import shutil
shutil.rmtree(effect_dir)
# Check if anyone still owns this effect
remaining_owners = await database.get_item_types(cid)
# Unpin from IPFS (content remains available if pinned elsewhere)
ipfs_client.unpin(cid)
# Only delete local files if no one owns it anymore
if not remaining_owners:
effects_dir = get_effects_dir()
effect_dir = effects_dir / cid
if effect_dir.exists():
import shutil
shutil.rmtree(effect_dir)
logger.info(f"Deleted effect {cid[:16]}... by {ctx.actor_id}")
return {"deleted": True, "note": "Unpinned from local IPFS; content may still exist on other nodes"}
# Unpin from IPFS
ipfs_client.unpin(cid)
logger.info(f"Garbage collected effect {cid[:16]}... (no remaining owners)")
logger.info(f"Removed effect {cid[:16]}... ownership for {ctx.actor_id}")
return {"deleted": True}

View File

@@ -132,35 +132,55 @@ async def clear_user_data(request: Request):
except Exception as e:
errors.append(f"Failed to list recipes: {e}")
# Delete all effects
# Delete all effects (uses ownership model)
try:
cache_manager = get_cache_manager()
effects_dir = Path(cache_manager.cache_dir) / "_effects"
if effects_dir.exists():
import shutil
for effect_dir in effects_dir.iterdir():
if effect_dir.is_dir():
try:
shutil.rmtree(effect_dir)
deleted["effects"] += 1
except Exception as e:
errors.append(f"Effect {effect_dir.name}: {e}")
# Get user's effects from item_types
effect_items = await database.get_user_items(actor_id, item_type="effect", limit=10000)
for item in effect_items:
cid = item.get("cid")
if cid:
try:
# Remove ownership link
await database.delete_item_type(cid, actor_id, "effect")
await database.delete_friendly_name(actor_id, cid)
# Check if orphaned
remaining = await database.get_item_types(cid)
if not remaining:
# Garbage collect
effects_dir = Path(cache_manager.cache_dir) / "_effects" / cid
if effects_dir.exists():
import shutil
shutil.rmtree(effects_dir)
import ipfs_client
ipfs_client.unpin(cid)
deleted["effects"] += 1
except Exception as e:
errors.append(f"Effect {cid[:16]}...: {e}")
except Exception as e:
errors.append(f"Failed to delete effects: {e}")
# Delete all media/cache items for user
# Delete all media/cache items for user (uses ownership model)
try:
items = await database.get_user_items(actor_id, limit=10000)
for item in items:
try:
from ..services.cache_service import CacheService
cache_service = CacheService(database, cache_manager)
# Get user's media items (video, image, audio)
for media_type in ["video", "image", "audio", "unknown"]:
items = await database.get_user_items(actor_id, item_type=media_type, limit=10000)
for item in items:
cid = item.get("cid")
if cid:
await database.delete_cache_item(cid)
deleted["media"] += 1
except Exception as e:
errors.append(f"Media {item.get('cid', 'unknown')}: {e}")
try:
success, error = await cache_service.delete_content(cid, actor_id)
if success:
deleted["media"] += 1
elif error:
errors.append(f"Media {cid[:16]}...: {error}")
except Exception as e:
errors.append(f"Media {cid[:16]}...: {e}")
except Exception as e:
errors.append(f"Failed to list media: {e}")
errors.append(f"Failed to delete media: {e}")
logger.info(f"Cleared data for {actor_id}: {deleted}")
if errors:

View File

@@ -416,32 +416,62 @@ class CacheService:
return l2_result.get("ipfs_cid") or ipfs_cid, None
async def delete_content(self, cid: str, actor_id: str) -> Tuple[bool, Optional[str]]:
"""Delete content from cache. Returns (success, error)."""
if not self.cache.has_content(cid):
return False, "Content not found"
"""
Remove user's ownership link to cached content.
# Check if pinned
This removes the item_types entry linking the user to the content.
The cached file is only deleted if no other users own it.
Returns (success, error).
"""
import logging
logger = logging.getLogger(__name__)
# Check if pinned for this user
meta = await self.db.load_item_metadata(cid, actor_id)
if meta and meta.get("pinned"):
pin_reason = meta.get("pin_reason", "unknown")
return False, f"Cannot discard pinned item (reason: {pin_reason})"
# Check deletion rules via cache_manager
can_delete, reason = self.cache.can_delete(cid)
if not can_delete:
return False, f"Cannot discard: {reason}"
# Get the item type to delete the right ownership entry
item_types = await self.db.get_item_types(cid, actor_id)
if not item_types:
return False, "You don't own this content"
# Delete via cache_manager
success, msg = self.cache.delete_by_cid(cid)
# Remove user's ownership links (all types for this user)
for item in item_types:
item_type = item.get("type", "media")
await self.db.delete_item_type(cid, actor_id, item_type)
# Clean up legacy metadata files
meta_path = self.cache_dir / f"{cid}.meta.json"
if meta_path.exists():
meta_path.unlink()
mp4_path = self.cache_dir / f"{cid}.mp4"
if mp4_path.exists():
mp4_path.unlink()
# Remove friendly name
await self.db.delete_friendly_name(actor_id, cid)
# Check if anyone else still owns this content
remaining_owners = await self.db.get_item_types(cid)
# Only delete the actual file if no one owns it anymore
if not remaining_owners:
# Check deletion rules via cache_manager
can_delete, reason = self.cache.can_delete(cid)
if can_delete:
# Delete via cache_manager
self.cache.delete_by_cid(cid)
# Clean up legacy metadata files
meta_path = self.cache_dir / f"{cid}.meta.json"
if meta_path.exists():
meta_path.unlink()
mp4_path = self.cache_dir / f"{cid}.mp4"
if mp4_path.exists():
mp4_path.unlink()
# Delete from database
await self.db.delete_cache_item(cid)
logger.info(f"Garbage collected content {cid[:16]}... (no remaining owners)")
else:
logger.info(f"Content {cid[:16]}... orphaned but cannot delete: {reason}")
logger.info(f"Removed content {cid[:16]}... ownership for {actor_id}")
return True, None
async def import_from_ipfs(self, ipfs_cid: str, actor_id: str) -> Tuple[Optional[str], Optional[str]]:

View File

@@ -467,10 +467,13 @@ class RunService:
username: str,
) -> Tuple[bool, Optional[str]]:
"""
Discard (delete) a run record.
Discard (delete) a run record and clean up outputs/intermediates.
Note: This removes the run record but not the output content.
Outputs and intermediates are only deleted if not used by other runs.
"""
import logging
logger = logging.getLogger(__name__)
run = await self.get_run(run_id)
if not run:
return False, f"Run {run_id} not found"
@@ -480,6 +483,18 @@ class RunService:
if run_owner and run_owner not in (username, actor_id):
return False, "Access denied"
# Clean up activity outputs/intermediates (only if orphaned)
# The activity_id is the same as run_id
try:
success, msg = self.cache.discard_activity_outputs_only(run_id)
if success:
logger.info(f"Cleaned up run {run_id}: {msg}")
else:
# Activity might not exist (old runs), that's OK
logger.debug(f"No activity cleanup for {run_id}: {msg}")
except Exception as e:
logger.warning(f"Failed to cleanup activity for {run_id}: {e}")
# Remove task_id mapping from Redis
self.redis.delete(f"{self.task_key_prefix}{run_id}")
@@ -487,8 +502,7 @@ class RunService:
try:
await self.db.delete_run_cache(run_id)
except Exception as e:
import logging
logging.getLogger(__name__).warning(f"Failed to delete run_cache for {run_id}: {e}")
logger.warning(f"Failed to delete run_cache for {run_id}: {e}")
# Remove pending run if exists
try:

View File

@@ -690,11 +690,26 @@ class L1CacheManager:
return True, "Activity discarded"
return False, "Failed to discard"
def _is_used_by_other_activities(self, node_id: str, exclude_activity_id: str) -> bool:
"""Check if a node is used by any activity other than the excluded one."""
for other_activity in self.activity_store.list():
if other_activity.activity_id == exclude_activity_id:
continue
# Check if used as input, output, or intermediate
if node_id in other_activity.input_ids:
return True
if node_id == other_activity.output_id:
return True
if node_id in other_activity.intermediate_ids:
return True
return False
def discard_activity_outputs_only(self, activity_id: str) -> tuple[bool, str]:
"""
Discard an activity, deleting only outputs and intermediates.
Inputs (cache items, configs) are preserved.
Outputs/intermediates used by other activities are preserved.
Returns:
(success, message) tuple
@@ -711,21 +726,31 @@ class L1CacheManager:
if pinned:
return False, f"Output is pinned ({reason})"
# Delete output
if activity.output_id:
entry = self.cache.get_entry(activity.output_id)
if entry:
# Remove from cache
self.cache.remove(activity.output_id)
# Remove from content index (Redis + local)
self._del_content_index(entry.cid)
# Delete from legacy dir if exists
legacy_path = self.legacy_dir / entry.cid
if legacy_path.exists():
legacy_path.unlink()
deleted_outputs = 0
preserved_shared = 0
# Delete intermediates
# Delete output (only if not used by other activities)
if activity.output_id:
if self._is_used_by_other_activities(activity.output_id, activity_id):
preserved_shared += 1
else:
entry = self.cache.get_entry(activity.output_id)
if entry:
# Remove from cache
self.cache.remove(activity.output_id)
# Remove from content index (Redis + local)
self._del_content_index(entry.cid)
# Delete from legacy dir if exists
legacy_path = self.legacy_dir / entry.cid
if legacy_path.exists():
legacy_path.unlink()
deleted_outputs += 1
# Delete intermediates (only if not used by other activities)
for node_id in activity.intermediate_ids:
if self._is_used_by_other_activities(node_id, activity_id):
preserved_shared += 1
continue
entry = self.cache.get_entry(node_id)
if entry:
self.cache.remove(node_id)
@@ -733,11 +758,16 @@ class L1CacheManager:
legacy_path = self.legacy_dir / entry.cid
if legacy_path.exists():
legacy_path.unlink()
deleted_outputs += 1
# Remove activity record (inputs remain in cache)
self.activity_store.remove(activity_id)
return True, "Activity discarded (outputs only)"
msg = f"Activity discarded (deleted {deleted_outputs} outputs"
if preserved_shared > 0:
msg += f", preserved {preserved_shared} shared items"
msg += ")"
return True, msg
def cleanup_intermediates(self) -> int:
"""Delete all intermediate cache entries (reconstructible)."""