Files
celery/app/services/cache_service.py
giles e8501de466 Fix list_media to use get_user_items instead of list_cache_items
list_cache_items doesn't accept actor_id parameter.
Use get_user_items which properly filters by actor_id and item_type.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-11 14:10:49 +00:00

509 lines
18 KiB
Python

"""
Cache Service - business logic for cache and media management.
"""
import asyncio
import json
import os
import subprocess
from pathlib import Path
from typing import Optional, List, Dict, Any, Tuple
import httpx
def detect_media_type(cache_path: Path) -> str:
"""Detect if file is image, video, or audio based on magic bytes."""
try:
with open(cache_path, "rb") as f:
header = f.read(32)
except Exception:
return "unknown"
# Video signatures
if header[:4] == b'\x1a\x45\xdf\xa3': # WebM/MKV
return "video"
if len(header) > 8 and header[4:8] == b'ftyp': # MP4/MOV
return "video"
if header[:4] == b'RIFF' and len(header) > 12 and header[8:12] == b'AVI ': # AVI
return "video"
# Image signatures
if header[:8] == b'\x89PNG\r\n\x1a\n': # PNG
return "image"
if header[:2] == b'\xff\xd8': # JPEG
return "image"
if header[:6] in (b'GIF87a', b'GIF89a'): # GIF
return "image"
if header[:4] == b'RIFF' and len(header) > 12 and header[8:12] == b'WEBP': # WebP
return "image"
# Audio signatures
if header[:4] == b'RIFF' and len(header) > 12 and header[8:12] == b'WAVE': # WAV
return "audio"
if header[:3] == b'ID3' or header[:2] == b'\xff\xfb': # MP3
return "audio"
if header[:4] == b'fLaC': # FLAC
return "audio"
return "unknown"
def get_mime_type(path: Path) -> str:
"""Get MIME type based on file magic bytes."""
media_type = detect_media_type(path)
if media_type == "video":
try:
with open(path, "rb") as f:
header = f.read(12)
if header[:4] == b'\x1a\x45\xdf\xa3':
return "video/x-matroska"
return "video/mp4"
except Exception:
return "video/mp4"
elif media_type == "image":
try:
with open(path, "rb") as f:
header = f.read(8)
if header[:8] == b'\x89PNG\r\n\x1a\n':
return "image/png"
if header[:2] == b'\xff\xd8':
return "image/jpeg"
if header[:6] in (b'GIF87a', b'GIF89a'):
return "image/gif"
return "image/jpeg"
except Exception:
return "image/jpeg"
elif media_type == "audio":
return "audio/mpeg"
return "application/octet-stream"
class CacheService:
"""
Service for managing cached content.
Handles content retrieval, metadata, and media type detection.
"""
def __init__(self, database, cache_manager):
self.db = database
self.cache = cache_manager
self.cache_dir = Path(os.environ.get("CACHE_DIR", "/tmp/artdag-cache"))
async def get_cache_item(self, content_hash: str) -> Optional[Dict[str, Any]]:
"""Get cached item with full metadata for display."""
# Check if content exists
if not self.cache.has_content(content_hash):
return None
path = self.cache.get_content_path(content_hash)
if not path or not path.exists():
return None
# Get metadata from database
meta = await self.db.load_item_metadata(content_hash, None)
cache_item = await self.db.get_cache_item(content_hash)
media_type = detect_media_type(path)
mime_type = get_mime_type(path)
size = path.stat().st_size
return {
"content_hash": content_hash,
"path": str(path),
"media_type": media_type,
"mime_type": mime_type,
"size": size,
"ipfs_cid": cache_item.get("ipfs_cid") if cache_item else None,
"meta": meta,
}
async def check_access(self, content_hash: str, actor_id: str, username: str) -> bool:
"""Check if user has access to content."""
user_hashes = await self._get_user_cache_hashes(username, actor_id)
return content_hash in user_hashes
async def _get_user_cache_hashes(self, username: str, actor_id: Optional[str] = None) -> set:
"""Get all cache hashes owned by or associated with a user."""
match_values = [username]
if actor_id:
match_values.append(actor_id)
hashes = set()
# Query database for items owned by user
if actor_id:
try:
db_items = await self.db.get_user_items(actor_id)
for item in db_items:
hashes.add(item["content_hash"])
except Exception:
pass
# Legacy: Files uploaded by user (JSON metadata)
if self.cache_dir.exists():
for f in self.cache_dir.iterdir():
if f.name.endswith('.meta.json'):
try:
with open(f, 'r') as mf:
meta = json.load(mf)
if meta.get("uploader") in match_values:
hashes.add(f.name.replace('.meta.json', ''))
except Exception:
pass
# Files from user's runs (inputs and outputs)
runs = await self._list_user_runs(username, actor_id)
for run in runs:
inputs = run.get("inputs", [])
if isinstance(inputs, dict):
inputs = list(inputs.values())
hashes.update(inputs)
if run.get("output_hash"):
hashes.add(run["output_hash"])
return hashes
async def _list_user_runs(self, username: str, actor_id: Optional[str]) -> List[Dict]:
"""List runs for a user (helper for access check)."""
from ..dependencies import get_redis_client
import json
redis = get_redis_client()
runs = []
cursor = 0
prefix = "artdag:run:"
while True:
cursor, keys = redis.scan(cursor=cursor, match=f"{prefix}*", count=100)
for key in keys:
data = redis.get(key)
if data:
run = json.loads(data)
if run.get("actor_id") in (username, actor_id) or run.get("username") in (username, actor_id):
runs.append(run)
if cursor == 0:
break
return runs
async def get_raw_file(self, content_hash: str) -> Tuple[Optional[Path], Optional[str], Optional[str]]:
"""Get raw file path, media type, and filename for download."""
if not self.cache.has_content(content_hash):
return None, None, None
path = self.cache.get_content_path(content_hash)
if not path or not path.exists():
return None, None, None
media_type = detect_media_type(path)
mime = get_mime_type(path)
# Determine extension
ext = "bin"
if media_type == "video":
try:
with open(path, "rb") as f:
header = f.read(12)
if header[:4] == b'\x1a\x45\xdf\xa3':
ext = "mkv"
else:
ext = "mp4"
except Exception:
ext = "mp4"
elif media_type == "image":
try:
with open(path, "rb") as f:
header = f.read(8)
if header[:8] == b'\x89PNG\r\n\x1a\n':
ext = "png"
else:
ext = "jpg"
except Exception:
ext = "jpg"
filename = f"{content_hash}.{ext}"
return path, mime, filename
async def get_as_mp4(self, content_hash: str) -> Tuple[Optional[Path], Optional[str]]:
"""Get content as MP4, transcoding if necessary. Returns (path, error)."""
if not self.cache.has_content(content_hash):
return None, f"Content {content_hash} not in cache"
path = self.cache.get_content_path(content_hash)
if not path or not path.exists():
return None, f"Content {content_hash} not in cache"
# Check if video
media_type = detect_media_type(path)
if media_type != "video":
return None, "Content is not a video"
# Check for cached MP4
mp4_path = self.cache_dir / f"{content_hash}.mp4"
if mp4_path.exists():
return mp4_path, None
# Check if already MP4 format
try:
result = subprocess.run(
["ffprobe", "-v", "error", "-select_streams", "v:0",
"-show_entries", "format=format_name", "-of", "csv=p=0", str(path)],
capture_output=True, text=True, timeout=10
)
if "mp4" in result.stdout.lower() or "mov" in result.stdout.lower():
return path, None
except Exception:
pass
# Transcode to MP4
transcode_path = self.cache_dir / f"{content_hash}.transcoding.mp4"
try:
result = subprocess.run(
["ffmpeg", "-y", "-i", str(path),
"-c:v", "libx264", "-preset", "fast", "-crf", "23",
"-c:a", "aac", "-b:a", "128k",
"-movflags", "+faststart",
str(transcode_path)],
capture_output=True, text=True, timeout=600
)
if result.returncode != 0:
return None, f"Transcoding failed: {result.stderr[:200]}"
transcode_path.rename(mp4_path)
return mp4_path, None
except subprocess.TimeoutExpired:
if transcode_path.exists():
transcode_path.unlink()
return None, "Transcoding timed out"
except Exception as e:
if transcode_path.exists():
transcode_path.unlink()
return None, f"Transcoding failed: {e}"
async def get_metadata(self, content_hash: str, actor_id: str) -> Optional[Dict[str, Any]]:
"""Get content metadata."""
if not self.cache.has_content(content_hash):
return None
return await self.db.load_item_metadata(content_hash, actor_id)
async def update_metadata(
self,
content_hash: str,
actor_id: str,
title: str = None,
description: str = None,
tags: List[str] = None,
custom: Dict[str, Any] = None,
) -> Tuple[bool, Optional[str]]:
"""Update content metadata. Returns (success, error)."""
if not self.cache.has_content(content_hash):
return False, "Content not found"
# Build update dict
updates = {}
if title is not None:
updates["title"] = title
if description is not None:
updates["description"] = description
if tags is not None:
updates["tags"] = tags
if custom is not None:
updates["custom"] = custom
try:
await self.db.update_item_metadata(content_hash, actor_id, **updates)
return True, None
except Exception as e:
return False, str(e)
async def publish_to_l2(
self,
content_hash: str,
actor_id: str,
l2_server: str,
auth_token: str,
) -> Tuple[Optional[str], Optional[str]]:
"""Publish content to L2 and IPFS. Returns (ipfs_cid, error)."""
if not self.cache.has_content(content_hash):
return None, "Content not found"
# Get IPFS CID
cache_item = await self.db.get_cache_item(content_hash)
ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
# Get metadata for origin info
meta = await self.db.load_item_metadata(content_hash, actor_id)
origin = meta.get("origin") if meta else None
if not origin or "type" not in origin:
return None, "Origin must be set before publishing"
if not auth_token:
return None, "Authentication token required"
# Call L2 publish-cache endpoint
try:
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.post(
f"{l2_server}/assets/publish-cache",
headers={"Authorization": f"Bearer {auth_token}"},
json={
"content_hash": content_hash,
"ipfs_cid": ipfs_cid,
"asset_name": meta.get("title") or content_hash[:16],
"asset_type": detect_media_type(self.cache.get_content_path(content_hash)),
"origin": origin,
"description": meta.get("description"),
"tags": meta.get("tags", []),
}
)
resp.raise_for_status()
l2_result = resp.json()
except httpx.HTTPStatusError as e:
error_detail = str(e)
try:
error_detail = e.response.json().get("detail", str(e))
except Exception:
pass
return None, f"L2 publish failed: {error_detail}"
except Exception as e:
return None, f"L2 publish failed: {e}"
# Update local metadata with publish status
await self.db.save_l2_share(
content_hash=content_hash,
actor_id=actor_id,
l2_server=l2_server,
asset_name=meta.get("title") or content_hash[:16],
content_type=detect_media_type(self.cache.get_content_path(content_hash))
)
await self.db.update_item_metadata(
content_hash=content_hash,
actor_id=actor_id,
pinned=True,
pin_reason="published"
)
return l2_result.get("ipfs_cid") or ipfs_cid, None
async def delete_content(self, content_hash: str, actor_id: str) -> Tuple[bool, Optional[str]]:
"""Delete content from cache. Returns (success, error)."""
if not self.cache.has_content(content_hash):
return False, "Content not found"
# Check if pinned
meta = await self.db.load_item_metadata(content_hash, actor_id)
if meta and meta.get("pinned"):
pin_reason = meta.get("pin_reason", "unknown")
return False, f"Cannot discard pinned item (reason: {pin_reason})"
# Check deletion rules via cache_manager
can_delete, reason = self.cache.can_delete(content_hash)
if not can_delete:
return False, f"Cannot discard: {reason}"
# Delete via cache_manager
success, msg = self.cache.delete_by_content_hash(content_hash)
# Clean up legacy metadata files
meta_path = self.cache_dir / f"{content_hash}.meta.json"
if meta_path.exists():
meta_path.unlink()
mp4_path = self.cache_dir / f"{content_hash}.mp4"
if mp4_path.exists():
mp4_path.unlink()
return True, None
async def import_from_ipfs(self, ipfs_cid: str, actor_id: str) -> Tuple[Optional[str], Optional[str]]:
"""Import content from IPFS. Returns (content_hash, error)."""
try:
import ipfs_client
# Download from IPFS
legacy_dir = self.cache_dir / "legacy"
legacy_dir.mkdir(parents=True, exist_ok=True)
tmp_path = legacy_dir / f"import-{ipfs_cid[:16]}"
if not ipfs_client.get_file(ipfs_cid, str(tmp_path)):
return None, f"Could not fetch CID {ipfs_cid} from IPFS"
# Store in cache
cached, _ = self.cache.put(tmp_path, node_type="import", move=True)
content_hash = cached.content_hash
# Save to database
await self.db.create_cache_item(content_hash, ipfs_cid)
await self.db.save_item_metadata(
content_hash=content_hash,
actor_id=actor_id,
item_type="media",
filename=f"ipfs-{ipfs_cid[:16]}"
)
return content_hash, None
except Exception as e:
return None, f"Import failed: {e}"
async def upload_content(
self,
content: bytes,
filename: str,
actor_id: str,
) -> Tuple[Optional[str], Optional[str]]:
"""Upload content to cache. Returns (content_hash, error)."""
import tempfile
try:
# Write to temp file
with tempfile.NamedTemporaryFile(delete=False) as tmp:
tmp.write(content)
tmp_path = Path(tmp.name)
# Store in cache
cached, ipfs_cid = self.cache.put(tmp_path, node_type="upload", move=True)
content_hash = cached.content_hash
# Save to database
await self.db.create_cache_item(content_hash, ipfs_cid)
await self.db.save_item_metadata(
content_hash=content_hash,
actor_id=actor_id,
item_type="media",
filename=filename
)
return content_hash, None
except Exception as e:
return None, f"Upload failed: {e}"
async def list_media(
self,
actor_id: str = None,
username: str = None,
offset: int = 0,
limit: int = 24,
media_type: str = None,
) -> List[Dict[str, Any]]:
"""List media items in cache."""
# Get items from database (uses item_types table)
items = await self.db.get_user_items(
actor_id=actor_id or username,
item_type=media_type, # "video", "image", "audio", or None for all
limit=limit,
offset=offset,
)
return items
# Legacy compatibility methods
def has_content(self, content_hash: str) -> bool:
"""Check if content exists in cache."""
return self.cache.has_content(content_hash)
def get_ipfs_cid(self, content_hash: str) -> Optional[str]:
"""Get IPFS CID for cached content."""
return self.cache.get_ipfs_cid(content_hash)