- cache_service.delete_content: remove the user's ownership link first; only delete the actual file if no other owners remain
- cache_manager.discard_activity_outputs_only: check whether outputs and intermediates are used by other activities before deleting
- run_service.discard_run: now cleans up run outputs/intermediates (only if not shared by other runs)
- home.py clear_user_data: use the ownership model for effects and media deletion instead of directly deleting files

The ownership model ensures:
1. Multiple users can "own" the same cached content
2. Deleting removes the user's ownership link (item_types entry)
3. Actual files are only deleted when no owners remain (garbage collection)
4. Shared intermediates between runs are preserved

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
586 lines
21 KiB
Python
586 lines
21 KiB
Python
"""
|
|
Cache Service - business logic for cache and media management.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import Optional, List, Dict, Any, Tuple, TYPE_CHECKING
|
|
|
|
import httpx
|
|
|
|
if TYPE_CHECKING:
|
|
from database import Database
|
|
from cache_manager import L1CacheManager
|
|
|
|
|
|
def detect_media_type(cache_path: Path) -> str:
    """Classify a file as "image", "video", "audio", or "unknown".

    Reads only the first 32 bytes and matches well-known magic numbers.
    Unreadable or unrecognized files yield "unknown" rather than raising.
    """
    try:
        with open(cache_path, "rb") as f:
            header = f.read(32)
    except Exception:
        # Missing or unreadable file: report unknown instead of propagating.
        return "unknown"

    # Video signatures
    if header[:4] == b'\x1a\x45\xdf\xa3':  # EBML header: WebM/MKV
        return "video"
    if len(header) > 8 and header[4:8] == b'ftyp':  # ISO BMFF: MP4/MOV
        return "video"
    if header[:4] == b'RIFF' and len(header) > 12 and header[8:12] == b'AVI ':  # AVI
        return "video"

    # Image signatures
    if header[:8] == b'\x89PNG\r\n\x1a\n':  # PNG
        return "image"
    if header[:2] == b'\xff\xd8':  # JPEG SOI marker
        return "image"
    if header[:6] in (b'GIF87a', b'GIF89a'):  # GIF
        return "image"
    if header[:4] == b'RIFF' and len(header) > 12 and header[8:12] == b'WEBP':  # WebP
        return "image"

    # Audio signatures
    if header[:4] == b'RIFF' and len(header) > 12 and header[8:12] == b'WAVE':  # WAV
        return "audio"
    # MP3: ID3v2 tag, or a raw MPEG audio frame sync. 0xFFFB/FA/F3/F2 cover
    # the common MPEG-1/2 Layer III variants (the original matched 0xFFFB only).
    if header[:3] == b'ID3' or header[:2] in (b'\xff\xfb', b'\xff\xfa', b'\xff\xf3', b'\xff\xf2'):
        return "audio"
    if header[:4] == b'fLaC':  # FLAC
        return "audio"

    return "unknown"


def get_mime_type(path: Path) -> str:
    """Return a MIME type for *path* based on its magic bytes.

    Falls back to a sensible default per media category (video/mp4,
    image/jpeg, audio/mpeg) and to application/octet-stream for
    unrecognized content.

    Fixes relative to the previous version: WAV and FLAC are no longer
    mislabeled audio/mpeg, WebP no longer falls through to image/jpeg,
    and the file header is read once instead of once per branch.
    """
    media_type = detect_media_type(path)
    try:
        with open(path, "rb") as f:
            header = f.read(16)
    except Exception:
        # Unreadable file: fall through to the per-category default.
        header = b""

    if media_type == "video":
        if header[:4] == b'\x1a\x45\xdf\xa3':  # WebM and MKV share the EBML magic
            return "video/x-matroska"
        return "video/mp4"
    if media_type == "image":
        if header[:8] == b'\x89PNG\r\n\x1a\n':
            return "image/png"
        if header[:2] == b'\xff\xd8':
            return "image/jpeg"
        if header[:6] in (b'GIF87a', b'GIF89a'):
            return "image/gif"
        if header[:4] == b'RIFF' and header[8:12] == b'WEBP':
            return "image/webp"
        return "image/jpeg"
    if media_type == "audio":
        if header[:4] == b'RIFF' and header[8:12] == b'WAVE':
            return "audio/wav"
        if header[:4] == b'fLaC':
            return "audio/flac"
        return "audio/mpeg"
    return "application/octet-stream"
|
|
|
|
|
|
class CacheService:
    """
    Service layer for cached content management.

    Provides content retrieval, metadata handling, and media type
    detection for items held in the local cache.
    """

    def __init__(self, database: "Database", cache_manager: "L1CacheManager") -> None:
        # Root directory for legacy sidecar metadata and transcode artifacts.
        self.cache_dir = Path(os.environ.get("CACHE_DIR", "/tmp/artdag-cache"))
        # Persistence layer and L1 cache manager used by every operation below.
        self.db = database
        self.cache = cache_manager
|
|
|
|
    async def get_cache_item(self, cid: str, actor_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Get cached item with full metadata for display.

        Returns a dict containing the local path, media/MIME type, size,
        IPFS CID, and stored metadata, or None when the content is not in
        the cache (or its backing file has gone missing). Metadata fields
        are additionally copied to the top level for template convenience,
        and a per-user friendly name is attached when actor_id is given.
        """
        # Check if content exists
        if not self.cache.has_content(cid):
            return None

        path = self.cache.get_by_cid(cid)
        # The cache index may be stale: verify the file still exists on disk.
        if not path or not path.exists():
            return None

        # Get metadata from database
        meta = await self.db.load_item_metadata(cid, actor_id)
        cache_item = await self.db.get_cache_item(cid)

        # Classification is done from magic bytes, not file extension.
        media_type = detect_media_type(path)
        mime_type = get_mime_type(path)
        size = path.stat().st_size

        result = {
            "cid": cid,
            "path": str(path),
            "media_type": media_type,
            "mime_type": mime_type,
            "size": size,
            "ipfs_cid": cache_item.get("ipfs_cid") if cache_item else None,
            "meta": meta,
        }

        # Unpack meta fields to top level for template convenience
        if meta:
            result["title"] = meta.get("title")
            result["description"] = meta.get("description")
            result["tags"] = meta.get("tags", [])
            result["source_type"] = meta.get("source_type")
            result["source_note"] = meta.get("source_note")
            result["created_at"] = meta.get("created_at")
            result["filename"] = meta.get("filename")

        # Get friendly name if actor_id provided
        if actor_id:
            # Lazy import — presumably to avoid a circular module import; confirm.
            from .naming_service import get_naming_service
            naming = get_naming_service()
            friendly = await naming.get_by_cid(actor_id, cid)
            if friendly:
                result["friendly_name"] = friendly["friendly_name"]
                result["base_name"] = friendly["base_name"]
                result["version_id"] = friendly["version_id"]

        return result
|
|
|
|
async def check_access(self, cid: str, actor_id: str, username: str) -> bool:
|
|
"""Check if user has access to content."""
|
|
user_hashes = await self._get_user_cache_hashes(username, actor_id)
|
|
return cid in user_hashes
|
|
|
|
    async def _get_user_cache_hashes(self, username: str, actor_id: Optional[str] = None) -> set:
        """Get all cache hashes owned by or associated with a user.

        The result is the union of three sources: item ownership rows in
        the database, legacy ``*.meta.json`` uploader sidecar files on
        disk, and the inputs/outputs of the user's runs. Each source is
        best-effort; failures in one do not prevent the others.
        """
        # Identifiers that may appear as "uploader" in legacy metadata.
        match_values = [username]
        if actor_id:
            match_values.append(actor_id)

        hashes = set()

        # Query database for items owned by user
        if actor_id:
            try:
                db_items = await self.db.get_user_items(actor_id)
                for item in db_items:
                    hashes.add(item["cid"])
            except Exception:
                # Best-effort: fall through to the legacy sources below.
                pass

        # Legacy: Files uploaded by user (JSON metadata)
        if self.cache_dir.exists():
            for f in self.cache_dir.iterdir():
                if f.name.endswith('.meta.json'):
                    try:
                        with open(f, 'r') as mf:
                            meta = json.load(mf)
                        if meta.get("uploader") in match_values:
                            # Sidecar name is "<cid>.meta.json" — strip the suffix.
                            hashes.add(f.name.replace('.meta.json', ''))
                    except Exception:
                        # Skip unreadable or corrupt sidecar files.
                        pass

        # Files from user's runs (inputs and outputs)
        runs = await self._list_user_runs(username, actor_id)
        for run in runs:
            inputs = run.get("inputs", [])
            # Inputs may be stored as a name->cid mapping; keep only the CIDs.
            if isinstance(inputs, dict):
                inputs = list(inputs.values())
            hashes.update(inputs)
            if run.get("output_cid"):
                hashes.add(run["output_cid"])

        return hashes
|
|
|
|
async def _list_user_runs(self, username: str, actor_id: Optional[str]) -> List[Dict]:
|
|
"""List runs for a user (helper for access check)."""
|
|
from ..dependencies import get_redis_client
|
|
import json
|
|
|
|
redis = get_redis_client()
|
|
runs = []
|
|
cursor = 0
|
|
prefix = "artdag:run:"
|
|
|
|
while True:
|
|
cursor, keys = redis.scan(cursor=cursor, match=f"{prefix}*", count=100)
|
|
for key in keys:
|
|
data = redis.get(key)
|
|
if data:
|
|
run = json.loads(data)
|
|
if run.get("actor_id") in (username, actor_id) or run.get("username") in (username, actor_id):
|
|
runs.append(run)
|
|
if cursor == 0:
|
|
break
|
|
|
|
return runs
|
|
|
|
async def get_raw_file(self, cid: str) -> Tuple[Optional[Path], Optional[str], Optional[str]]:
|
|
"""Get raw file path, media type, and filename for download."""
|
|
if not self.cache.has_content(cid):
|
|
return None, None, None
|
|
|
|
path = self.cache.get_by_cid(cid)
|
|
if not path or not path.exists():
|
|
return None, None, None
|
|
|
|
media_type = detect_media_type(path)
|
|
mime = get_mime_type(path)
|
|
|
|
# Determine extension
|
|
ext = "bin"
|
|
if media_type == "video":
|
|
try:
|
|
with open(path, "rb") as f:
|
|
header = f.read(12)
|
|
if header[:4] == b'\x1a\x45\xdf\xa3':
|
|
ext = "mkv"
|
|
else:
|
|
ext = "mp4"
|
|
except Exception:
|
|
ext = "mp4"
|
|
elif media_type == "image":
|
|
try:
|
|
with open(path, "rb") as f:
|
|
header = f.read(8)
|
|
if header[:8] == b'\x89PNG\r\n\x1a\n':
|
|
ext = "png"
|
|
else:
|
|
ext = "jpg"
|
|
except Exception:
|
|
ext = "jpg"
|
|
|
|
filename = f"{cid}.{ext}"
|
|
return path, mime, filename
|
|
|
|
async def get_as_mp4(self, cid: str) -> Tuple[Optional[Path], Optional[str]]:
|
|
"""Get content as MP4, transcoding if necessary. Returns (path, error)."""
|
|
if not self.cache.has_content(cid):
|
|
return None, f"Content {cid} not in cache"
|
|
|
|
path = self.cache.get_by_cid(cid)
|
|
if not path or not path.exists():
|
|
return None, f"Content {cid} not in cache"
|
|
|
|
# Check if video
|
|
media_type = detect_media_type(path)
|
|
if media_type != "video":
|
|
return None, "Content is not a video"
|
|
|
|
# Check for cached MP4
|
|
mp4_path = self.cache_dir / f"{cid}.mp4"
|
|
if mp4_path.exists():
|
|
return mp4_path, None
|
|
|
|
# Check if already MP4 format
|
|
try:
|
|
result = subprocess.run(
|
|
["ffprobe", "-v", "error", "-select_streams", "v:0",
|
|
"-show_entries", "format=format_name", "-of", "csv=p=0", str(path)],
|
|
capture_output=True, text=True, timeout=10
|
|
)
|
|
if "mp4" in result.stdout.lower() or "mov" in result.stdout.lower():
|
|
return path, None
|
|
except Exception:
|
|
pass
|
|
|
|
# Transcode to MP4
|
|
transcode_path = self.cache_dir / f"{cid}.transcoding.mp4"
|
|
try:
|
|
result = subprocess.run(
|
|
["ffmpeg", "-y", "-i", str(path),
|
|
"-c:v", "libx264", "-preset", "fast", "-crf", "23",
|
|
"-c:a", "aac", "-b:a", "128k",
|
|
"-movflags", "+faststart",
|
|
str(transcode_path)],
|
|
capture_output=True, text=True, timeout=600
|
|
)
|
|
if result.returncode != 0:
|
|
return None, f"Transcoding failed: {result.stderr[:200]}"
|
|
|
|
transcode_path.rename(mp4_path)
|
|
return mp4_path, None
|
|
|
|
except subprocess.TimeoutExpired:
|
|
if transcode_path.exists():
|
|
transcode_path.unlink()
|
|
return None, "Transcoding timed out"
|
|
except Exception as e:
|
|
if transcode_path.exists():
|
|
transcode_path.unlink()
|
|
return None, f"Transcoding failed: {e}"
|
|
|
|
async def get_metadata(self, cid: str, actor_id: str) -> Optional[Dict[str, Any]]:
|
|
"""Get content metadata."""
|
|
if not self.cache.has_content(cid):
|
|
return None
|
|
return await self.db.load_item_metadata(cid, actor_id)
|
|
|
|
async def update_metadata(
|
|
self,
|
|
cid: str,
|
|
actor_id: str,
|
|
title: Optional[str] = None,
|
|
description: Optional[str] = None,
|
|
tags: Optional[List[str]] = None,
|
|
custom: Optional[Dict[str, Any]] = None,
|
|
) -> Tuple[bool, Optional[str]]:
|
|
"""Update content metadata. Returns (success, error)."""
|
|
if not self.cache.has_content(cid):
|
|
return False, "Content not found"
|
|
|
|
# Build update dict
|
|
updates = {}
|
|
if title is not None:
|
|
updates["title"] = title
|
|
if description is not None:
|
|
updates["description"] = description
|
|
if tags is not None:
|
|
updates["tags"] = tags
|
|
if custom is not None:
|
|
updates["custom"] = custom
|
|
|
|
try:
|
|
await self.db.update_item_metadata(cid, actor_id, **updates)
|
|
return True, None
|
|
except Exception as e:
|
|
return False, str(e)
|
|
|
|
    async def publish_to_l2(
        self,
        cid: str,
        actor_id: str,
        l2_server: str,
        auth_token: str,
    ) -> Tuple[Optional[str], Optional[str]]:
        """Publish content to L2 and IPFS. Returns (ipfs_cid, error).

        Preconditions: the item must exist in the local cache, its stored
        metadata must carry an "origin" dict containing a "type" key, and
        auth_token must be non-empty. On success the local record gains an
        L2 share entry and is pinned with reason "published" (which makes
        delete_content refuse to discard it).
        """
        if not self.cache.has_content(cid):
            return None, "Content not found"

        # Get IPFS CID
        cache_item = await self.db.get_cache_item(cid)
        ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None

        # Get metadata for origin info
        meta = await self.db.load_item_metadata(cid, actor_id)
        origin = meta.get("origin") if meta else None

        # Provenance is mandatory before an asset can be published.
        if not origin or "type" not in origin:
            return None, "Origin must be set before publishing"

        if not auth_token:
            return None, "Authentication token required"

        # Call L2 publish-cache endpoint
        try:
            async with httpx.AsyncClient(timeout=30) as client:
                resp = await client.post(
                    f"{l2_server}/assets/publish-cache",
                    headers={"Authorization": f"Bearer {auth_token}"},
                    json={
                        "cid": cid,
                        "ipfs_cid": ipfs_cid,
                        "asset_name": meta.get("title") or cid[:16],
                        "asset_type": detect_media_type(self.cache.get_by_cid(cid)),
                        "origin": origin,
                        "description": meta.get("description"),
                        "tags": meta.get("tags", []),
                    }
                )
                resp.raise_for_status()
                l2_result = resp.json()
        except httpx.HTTPStatusError as e:
            # Prefer the server-provided "detail" message when parseable.
            error_detail = str(e)
            try:
                error_detail = e.response.json().get("detail", str(e))
            except Exception:
                pass
            return None, f"L2 publish failed: {error_detail}"
        except Exception as e:
            return None, f"L2 publish failed: {e}"

        # Update local metadata with publish status
        await self.db.save_l2_share(
            cid=cid,
            actor_id=actor_id,
            l2_server=l2_server,
            asset_name=meta.get("title") or cid[:16],
            content_type=detect_media_type(self.cache.get_by_cid(cid))
        )
        # Pin the item so it cannot be discarded (see delete_content).
        await self.db.update_item_metadata(
            cid=cid,
            actor_id=actor_id,
            pinned=True,
            pin_reason="published"
        )

        # Prefer the CID assigned by L2, falling back to the local record.
        return l2_result.get("ipfs_cid") or ipfs_cid, None
|
|
|
|
    async def delete_content(self, cid: str, actor_id: str) -> Tuple[bool, Optional[str]]:
        """
        Remove user's ownership link to cached content.

        This removes the item_types entry linking the user to the content.
        The cached file is only deleted if no other users own it — i.e.
        deletion is reference-counted garbage collection, not a direct
        file removal. Pinned items (e.g. pin_reason "published") refuse
        deletion. Returns (success, error).
        """
        import logging
        logger = logging.getLogger(__name__)

        # Check if pinned for this user
        meta = await self.db.load_item_metadata(cid, actor_id)
        if meta and meta.get("pinned"):
            pin_reason = meta.get("pin_reason", "unknown")
            return False, f"Cannot discard pinned item (reason: {pin_reason})"

        # Get the item type to delete the right ownership entry
        item_types = await self.db.get_item_types(cid, actor_id)
        if not item_types:
            return False, "You don't own this content"

        # Remove user's ownership links (all types for this user)
        for item in item_types:
            item_type = item.get("type", "media")
            await self.db.delete_item_type(cid, actor_id, item_type)

        # Remove friendly name
        await self.db.delete_friendly_name(actor_id, cid)

        # Check if anyone else still owns this content
        # (no actor filter => ownership rows across all users)
        remaining_owners = await self.db.get_item_types(cid)

        # Only delete the actual file if no one owns it anymore
        if not remaining_owners:
            # Check deletion rules via cache_manager
            can_delete, reason = self.cache.can_delete(cid)
            if can_delete:
                # Delete via cache_manager
                self.cache.delete_by_cid(cid)

                # Clean up legacy metadata files
                meta_path = self.cache_dir / f"{cid}.meta.json"
                if meta_path.exists():
                    meta_path.unlink()
                # Also drop any cached MP4 transcode (see get_as_mp4).
                mp4_path = self.cache_dir / f"{cid}.mp4"
                if mp4_path.exists():
                    mp4_path.unlink()

                # Delete from database
                await self.db.delete_cache_item(cid)

                logger.info(f"Garbage collected content {cid[:16]}... (no remaining owners)")
            else:
                # Cache manager vetoed deletion; leave the file in place.
                logger.info(f"Content {cid[:16]}... orphaned but cannot delete: {reason}")

        # Ownership removal itself succeeded even if the file was retained.
        logger.info(f"Removed content {cid[:16]}... ownership for {actor_id}")
        return True, None
|
|
|
|
async def import_from_ipfs(self, ipfs_cid: str, actor_id: str) -> Tuple[Optional[str], Optional[str]]:
|
|
"""Import content from IPFS. Returns (cid, error)."""
|
|
try:
|
|
import ipfs_client
|
|
|
|
# Download from IPFS
|
|
legacy_dir = self.cache_dir / "legacy"
|
|
legacy_dir.mkdir(parents=True, exist_ok=True)
|
|
tmp_path = legacy_dir / f"import-{ipfs_cid[:16]}"
|
|
|
|
if not ipfs_client.get_file(ipfs_cid, str(tmp_path)):
|
|
return None, f"Could not fetch CID {ipfs_cid} from IPFS"
|
|
|
|
# Detect media type before storing
|
|
media_type = detect_media_type(tmp_path)
|
|
|
|
# Store in cache
|
|
cached, new_ipfs_cid = self.cache.put(tmp_path, node_type="import", move=True)
|
|
cid = new_ipfs_cid or cached.cid # Prefer IPFS CID
|
|
|
|
# Save to database with detected media type
|
|
await self.db.create_cache_item(cid, new_ipfs_cid)
|
|
await self.db.save_item_metadata(
|
|
cid=cid,
|
|
actor_id=actor_id,
|
|
item_type=media_type, # Use detected type for filtering
|
|
filename=f"ipfs-{ipfs_cid[:16]}"
|
|
)
|
|
|
|
return cid, None
|
|
except Exception as e:
|
|
return None, f"Import failed: {e}"
|
|
|
|
async def upload_content(
|
|
self,
|
|
content: bytes,
|
|
filename: str,
|
|
actor_id: str,
|
|
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
|
|
"""Upload content to cache. Returns (cid, ipfs_cid, error)."""
|
|
import tempfile
|
|
|
|
try:
|
|
# Write to temp file
|
|
with tempfile.NamedTemporaryFile(delete=False) as tmp:
|
|
tmp.write(content)
|
|
tmp_path = Path(tmp.name)
|
|
|
|
# Detect media type (video/image/audio) before moving file
|
|
media_type = detect_media_type(tmp_path)
|
|
|
|
# Store in cache (also stores in IPFS)
|
|
cached, ipfs_cid = self.cache.put(tmp_path, node_type="upload", move=True)
|
|
cid = ipfs_cid or cached.cid # Prefer IPFS CID
|
|
|
|
# Save to database with media category type
|
|
# Using media_type ("video", "image", "audio") not mime_type ("video/mp4")
|
|
# so list_media filtering works correctly
|
|
await self.db.create_cache_item(cid, ipfs_cid)
|
|
await self.db.save_item_metadata(
|
|
cid=cid,
|
|
actor_id=actor_id,
|
|
item_type=media_type, # Store media category for filtering
|
|
filename=filename
|
|
)
|
|
|
|
return cid, ipfs_cid, None
|
|
except Exception as e:
|
|
return None, None, f"Upload failed: {e}"
|
|
|
|
async def list_media(
|
|
self,
|
|
actor_id: Optional[str] = None,
|
|
username: Optional[str] = None,
|
|
offset: int = 0,
|
|
limit: int = 24,
|
|
media_type: Optional[str] = None,
|
|
) -> List[Dict[str, Any]]:
|
|
"""List media items in cache."""
|
|
# Get items from database (uses item_types table)
|
|
items = await self.db.get_user_items(
|
|
actor_id=actor_id or username,
|
|
item_type=media_type, # "video", "image", "audio", or None for all
|
|
limit=limit,
|
|
offset=offset,
|
|
)
|
|
|
|
# Add friendly names to items
|
|
if actor_id:
|
|
from .naming_service import get_naming_service
|
|
naming = get_naming_service()
|
|
for item in items:
|
|
cid = item.get("cid")
|
|
if cid:
|
|
friendly = await naming.get_by_cid(actor_id, cid)
|
|
if friendly:
|
|
item["friendly_name"] = friendly["friendly_name"]
|
|
item["base_name"] = friendly["base_name"]
|
|
|
|
return items
|
|
|
|
# Legacy compatibility methods
|
|
def has_content(self, cid: str) -> bool:
|
|
"""Check if content exists in cache."""
|
|
return self.cache.has_content(cid)
|
|
|
|
def get_ipfs_cid(self, cid: str) -> Optional[str]:
|
|
"""Get IPFS CID for cached content."""
|
|
return self.cache.get_ipfs_cid(cid)
|