diff --git a/app/routers/home.py b/app/routers/home.py index 7a62d0f..f1f2535 100644 --- a/app/routers/home.py +++ b/app/routers/home.py @@ -132,35 +132,55 @@ async def clear_user_data(request: Request): except Exception as e: errors.append(f"Failed to list recipes: {e}") - # Delete all effects + # Delete all effects (uses ownership model) try: - cache_manager = get_cache_manager() - effects_dir = Path(cache_manager.cache_dir) / "_effects" - if effects_dir.exists(): - import shutil - for effect_dir in effects_dir.iterdir(): - if effect_dir.is_dir(): - try: - shutil.rmtree(effect_dir) - deleted["effects"] += 1 - except Exception as e: - errors.append(f"Effect {effect_dir.name}: {e}") + # Get user's effects from item_types + effect_items = await database.get_user_items(actor_id, item_type="effect", limit=10000) + for item in effect_items: + cid = item.get("cid") + if cid: + try: + # Remove ownership link + await database.delete_item_type(cid, actor_id, "effect") + await database.delete_friendly_name(actor_id, cid) + + # Check if orphaned + remaining = await database.get_item_types(cid) + if not remaining: + # Garbage collect + effects_dir = Path(cache_manager.cache_dir) / "_effects" / cid + if effects_dir.exists(): + import shutil + shutil.rmtree(effects_dir) + import ipfs_client + ipfs_client.unpin(cid) + deleted["effects"] += 1 + except Exception as e: + errors.append(f"Effect {cid[:16]}...: {e}") except Exception as e: errors.append(f"Failed to delete effects: {e}") - # Delete all media/cache items for user + # Delete all media/cache items for user (uses ownership model) try: - items = await database.get_user_items(actor_id, limit=10000) - for item in items: - try: + from ..services.cache_service import CacheService + cache_service = CacheService(database, cache_manager) + + # Get user's media items (video, image, audio) + for media_type in ["video", "image", "audio", "unknown"]: + items = await database.get_user_items(actor_id, item_type=media_type, limit=10000) + for item in items: cid = item.get("cid") if cid: - await database.delete_cache_item(cid) - deleted["media"] += 1 - except Exception as e: - errors.append(f"Media {item.get('cid', 'unknown')}: {e}") + try: + success, error = await cache_service.delete_content(cid, actor_id) + if success: + deleted["media"] += 1 + elif error: + errors.append(f"Media {cid[:16]}...: {error}") + except Exception as e: + errors.append(f"Media {cid[:16]}...: {e}") except Exception as e: - errors.append(f"Failed to list media: {e}") + errors.append(f"Failed to delete media: {e}") logger.info(f"Cleared data for {actor_id}: {deleted}") if errors: diff --git a/app/services/cache_service.py b/app/services/cache_service.py index 516509c..1faed97 100644 --- a/app/services/cache_service.py +++ b/app/services/cache_service.py @@ -416,32 +416,62 @@ class CacheService: return l2_result.get("ipfs_cid") or ipfs_cid, None async def delete_content(self, cid: str, actor_id: str) -> Tuple[bool, Optional[str]]: - """Delete content from cache. Returns (success, error).""" - if not self.cache.has_content(cid): - return False, "Content not found" + """ + Remove user's ownership link to cached content. - # Check if pinned + This removes the item_types entry linking the user to the content. + The cached file is only deleted if no other users own it. + Returns (success, error). + """ + import logging + logger = logging.getLogger(__name__) + + # Check if pinned for this user meta = await self.db.load_item_metadata(cid, actor_id) if meta and meta.get("pinned"): pin_reason = meta.get("pin_reason", "unknown") return False, f"Cannot discard pinned item (reason: {pin_reason})" - # Check deletion rules via cache_manager - can_delete, reason = self.cache.can_delete(cid) - if not can_delete: - return False, f"Cannot discard: {reason}" + # Get the item type to delete the right ownership entry + item_types = await self.db.get_item_types(cid, actor_id) + if not item_types: + return False, "You don't own this content" - # Delete via cache_manager - success, msg = self.cache.delete_by_cid(cid) + # Remove user's ownership links (all types for this user) + for item in item_types: + item_type = item.get("type", "media") + await self.db.delete_item_type(cid, actor_id, item_type) - # Clean up legacy metadata files - meta_path = self.cache_dir / f"{cid}.meta.json" - if meta_path.exists(): - meta_path.unlink() - mp4_path = self.cache_dir / f"{cid}.mp4" - if mp4_path.exists(): - mp4_path.unlink() + # Remove friendly name + await self.db.delete_friendly_name(actor_id, cid) + # Check if anyone else still owns this content + remaining_owners = await self.db.get_item_types(cid) + + # Only delete the actual file if no one owns it anymore + if not remaining_owners: + # Check deletion rules via cache_manager + can_delete, reason = self.cache.can_delete(cid) + if can_delete: + # Delete via cache_manager + self.cache.delete_by_cid(cid) + + # Clean up legacy metadata files + meta_path = self.cache_dir / f"{cid}.meta.json" + if meta_path.exists(): + meta_path.unlink() + mp4_path = self.cache_dir / f"{cid}.mp4" + if mp4_path.exists(): + mp4_path.unlink() + + # Delete from database + await self.db.delete_cache_item(cid) + + logger.info(f"Garbage collected content {cid[:16]}... (no remaining owners)") + else: + logger.info(f"Content {cid[:16]}... orphaned but cannot delete: {reason}") + + logger.info(f"Removed content {cid[:16]}... ownership for {actor_id}") return True, None async def import_from_ipfs(self, ipfs_cid: str, actor_id: str) -> Tuple[Optional[str], Optional[str]]: diff --git a/app/services/run_service.py b/app/services/run_service.py index af8cd03..9193d4f 100644 --- a/app/services/run_service.py +++ b/app/services/run_service.py @@ -467,10 +467,13 @@ class RunService: username: str, ) -> Tuple[bool, Optional[str]]: """ - Discard (delete) a run record. + Discard (delete) a run record and clean up outputs/intermediates. - Note: This removes the run record but not the output content. + Outputs and intermediates are only deleted if not used by other runs. """ + import logging + logger = logging.getLogger(__name__) + run = await self.get_run(run_id) if not run: return False, f"Run {run_id} not found" @@ -480,6 +483,18 @@ class RunService: if run_owner and run_owner not in (username, actor_id): return False, "Access denied" + # Clean up activity outputs/intermediates (only if orphaned) + # The activity_id is the same as run_id + try: + success, msg = self.cache.discard_activity_outputs_only(run_id) + if success: + logger.info(f"Cleaned up run {run_id}: {msg}") + else: + # Activity might not exist (old runs), that's OK + logger.debug(f"No activity cleanup for {run_id}: {msg}") + except Exception as e: + logger.warning(f"Failed to cleanup activity for {run_id}: {e}") + # Remove task_id mapping from Redis self.redis.delete(f"{self.task_key_prefix}{run_id}") @@ -487,8 +502,7 @@ class RunService: try: await self.db.delete_run_cache(run_id) except Exception as e: - import logging - logging.getLogger(__name__).warning(f"Failed to delete run_cache for {run_id}: {e}") + logger.warning(f"Failed to delete run_cache for {run_id}: {e}") # Remove pending run if exists try: diff --git a/cache_manager.py b/cache_manager.py index d754e30..a0a7bde 100644 --- a/cache_manager.py +++ b/cache_manager.py @@ -690,11 +690,26 @@ class L1CacheManager: return True, "Activity discarded" return False, "Failed to discard" + def _is_used_by_other_activities(self, node_id: str, exclude_activity_id: str) -> bool: + """Check if a node is used by any activity other than the excluded one.""" + for other_activity in self.activity_store.list(): + if other_activity.activity_id == exclude_activity_id: + continue + # Check if used as input, output, or intermediate + if node_id in other_activity.input_ids: + return True + if node_id == other_activity.output_id: + return True + if node_id in other_activity.intermediate_ids: + return True + return False + def discard_activity_outputs_only(self, activity_id: str) -> tuple[bool, str]: """ Discard an activity, deleting only outputs and intermediates. Inputs (cache items, configs) are preserved. + Outputs/intermediates used by other activities are preserved. Returns: (success, message) tuple @@ -711,21 +726,31 @@ class L1CacheManager: if pinned: return False, f"Output is pinned ({reason})" - # Delete output - if activity.output_id: - entry = self.cache.get_entry(activity.output_id) - if entry: - # Remove from cache - self.cache.remove(activity.output_id) - # Remove from content index (Redis + local) - self._del_content_index(entry.cid) - # Delete from legacy dir if exists - legacy_path = self.legacy_dir / entry.cid - if legacy_path.exists(): - legacy_path.unlink() + deleted_outputs = 0 + preserved_shared = 0 - # Delete intermediates + # Delete output (only if not used by other activities) + if activity.output_id: + if self._is_used_by_other_activities(activity.output_id, activity_id): + preserved_shared += 1 + else: + entry = self.cache.get_entry(activity.output_id) + if entry: + # Remove from cache + self.cache.remove(activity.output_id) + # Remove from content index (Redis + local) + self._del_content_index(entry.cid) + # Delete from legacy dir if exists + legacy_path = self.legacy_dir / entry.cid + if legacy_path.exists(): + legacy_path.unlink() + deleted_outputs += 1 + + # Delete intermediates (only if not used by other activities) for node_id in activity.intermediate_ids: + if self._is_used_by_other_activities(node_id, activity_id): + preserved_shared += 1 + continue entry = self.cache.get_entry(node_id) if entry: self.cache.remove(node_id) @@ -733,11 +758,16 @@ class L1CacheManager: legacy_path = self.legacy_dir / entry.cid if legacy_path.exists(): legacy_path.unlink() + deleted_outputs += 1 # Remove activity record (inputs remain in cache) self.activity_store.remove(activity_id) - return True, "Activity discarded (outputs only)" + msg = f"Activity discarded (deleted {deleted_outputs} outputs" + if preserved_shared > 0: + msg += f", preserved {preserved_shared} shared items" + msg += ")" + return True, msg def cleanup_intermediates(self) -> int: """Delete all intermediate cache entries (reconstructible)."""