The background IPFS upload task was running on workers that don't have the file locally, causing uploads to fail silently. Now uploads go to IPFS synchronously so the IPFS CID is available immediately. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
619 lines
22 KiB
Python
619 lines
22 KiB
Python
"""
|
|
Cache Service - business logic for cache and media management.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import Optional, List, Dict, Any, Tuple, TYPE_CHECKING
|
|
|
|
import httpx
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
if TYPE_CHECKING:
|
|
from database import Database
|
|
from cache_manager import L1CacheManager
|
|
|
|
|
|
def detect_media_type(cache_path: Path) -> str:
    """Detect whether a file is image, video, or audio from its magic bytes.

    Reads the first 32 bytes of *cache_path* and matches them against known
    container signatures. Returns "video", "image", "audio", or "unknown"
    (also returned when the file cannot be read).
    """
    try:
        with open(cache_path, "rb") as f:
            header = f.read(32)
    except Exception:
        # Missing/unreadable file: classify as unknown rather than raising.
        return "unknown"

    # Video signatures
    if header[:4] == b'\x1a\x45\xdf\xa3':  # EBML magic: WebM/MKV
        return "video"
    if len(header) >= 8 and header[4:8] == b'ftyp':  # ISO BMFF: MP4/MOV
        return "video"
    if header[:4] == b'RIFF' and len(header) >= 12 and header[8:12] == b'AVI ':  # AVI
        return "video"

    # Image signatures
    if header[:8] == b'\x89PNG\r\n\x1a\n':  # PNG
        return "image"
    if header[:2] == b'\xff\xd8':  # JPEG (SOI marker)
        return "image"
    if header[:6] in (b'GIF87a', b'GIF89a'):  # GIF
        return "image"
    if header[:4] == b'RIFF' and len(header) >= 12 and header[8:12] == b'WEBP':  # WebP
        return "image"

    # Audio signatures
    if header[:4] == b'RIFF' and len(header) >= 12 and header[8:12] == b'WAVE':  # WAV
        return "audio"
    # MP3: ID3v2 tag, or a raw MPEG frame-sync pair (0xFFFB/0xFFF3/0xFFF2
    # cover the common MPEG-1/2 Layer III variants).
    if header[:3] == b'ID3' or header[:2] in (b'\xff\xfb', b'\xff\xf3', b'\xff\xf2'):
        return "audio"
    if header[:4] == b'fLaC':  # FLAC
        return "audio"

    return "unknown"


def get_mime_type(path: Path) -> str:
    """Return a MIME type for *path* based on its magic bytes.

    Uses :func:`detect_media_type` for the category, then reads the header
    once to pick the concrete container. Falls back to a representative
    default per category ("video/mp4", "image/jpeg", "audio/mpeg") and to
    "application/octet-stream" for unknown files.
    """
    media_type = detect_media_type(path)
    if media_type == "unknown":
        return "application/octet-stream"

    # Single header read shared by all branches (the original re-opened the
    # file per category). Guard in case the file vanished in between.
    try:
        with open(path, "rb") as f:
            header = f.read(12)
    except Exception:
        header = b""

    if media_type == "video":
        if header[:4] == b'\x1a\x45\xdf\xa3':  # EBML: WebM/MKV
            return "video/x-matroska"
        return "video/mp4"

    if media_type == "image":
        if header[:8] == b'\x89PNG\r\n\x1a\n':
            return "image/png"
        if header[:6] in (b'GIF87a', b'GIF89a'):
            return "image/gif"
        if header[:4] == b'RIFF' and header[8:12] == b'WEBP':
            return "image/webp"
        # JPEG itself, or unrecognized image fallback.
        return "image/jpeg"

    if media_type == "audio":
        if header[:4] == b'RIFF' and header[8:12] == b'WAVE':
            return "audio/wav"
        if header[:4] == b'fLaC':
            return "audio/flac"
        return "audio/mpeg"

    return "application/octet-stream"
|
|
|
|
|
|
class CacheService:
|
|
"""
|
|
Service for managing cached content.
|
|
|
|
Handles content retrieval, metadata, and media type detection.
|
|
"""
|
|
|
|
def __init__(self, database: "Database", cache_manager: "L1CacheManager") -> None:
|
|
self.db = database
|
|
self.cache = cache_manager
|
|
self.cache_dir = Path(os.environ.get("CACHE_DIR", "/tmp/artdag-cache"))
|
|
|
|
    async def get_cache_item(self, cid: str, actor_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Get cached item with full metadata for display.

        Combines database metadata, on-disk media-type detection, and
        friendly-name lookup into one template-ready dict. Returns None when
        the cid has no cache_items row at all. ``remote_only`` is True when
        the bytes are not on this node (the item may still be reachable via
        its ``ipfs_cid``).
        """
        # Get metadata from database first
        meta = await self.db.load_item_metadata(cid, actor_id)
        cache_item = await self.db.get_cache_item(cid)

        # Check if content exists locally
        path = self.cache.get_by_cid(cid) if self.cache.has_content(cid) else None

        if path and path.exists():
            # Local file exists - detect type from its magic bytes
            media_type = detect_media_type(path)
            mime_type = get_mime_type(path)
            size = path.stat().st_size
        else:
            # File not local - fall back to type info recorded in the database
            # (item_types table); size is unknowable without the bytes.
            media_type = "unknown"
            mime_type = "application/octet-stream"
            size = 0

            if actor_id:
                try:
                    item_types = await self.db.get_item_types(cid, actor_id)
                    if item_types:
                        media_type = item_types[0].get("type", "unknown")
                        # Representative MIME per category; the exact container
                        # cannot be known without the local file.
                        if media_type == "video":
                            mime_type = "video/mp4"
                        elif media_type == "image":
                            mime_type = "image/png"
                        elif media_type == "audio":
                            mime_type = "audio/mpeg"
                except Exception:
                    # Best-effort lookup; keep the generic defaults on failure.
                    pass

        # No cache_items row means this cid is unknown to the node entirely.
        if not cache_item:
            return None

        result = {
            "cid": cid,
            "path": str(path) if path else None,
            "media_type": media_type,
            "mime_type": mime_type,
            "size": size,
            "ipfs_cid": cache_item.get("ipfs_cid") if cache_item else None,
            "meta": meta,
            "remote_only": path is None or not path.exists(),
        }

        # Unpack meta fields to top level for template convenience
        if meta:
            result["title"] = meta.get("title")
            result["description"] = meta.get("description")
            result["tags"] = meta.get("tags", [])
            result["source_type"] = meta.get("source_type")
            result["source_note"] = meta.get("source_note")
            result["created_at"] = meta.get("created_at")
            result["filename"] = meta.get("filename")

        # Get friendly name if actor_id provided
        if actor_id:
            from .naming_service import get_naming_service
            naming = get_naming_service()
            friendly = await naming.get_by_cid(actor_id, cid)
            if friendly:
                result["friendly_name"] = friendly["friendly_name"]
                result["base_name"] = friendly["base_name"]
                result["version_id"] = friendly["version_id"]

        return result
|
|
|
|
async def check_access(self, cid: str, actor_id: str, username: str) -> bool:
|
|
"""Check if user has access to content."""
|
|
user_hashes = await self._get_user_cache_hashes(username, actor_id)
|
|
return cid in user_hashes
|
|
|
|
    async def _get_user_cache_hashes(self, username: str, actor_id: Optional[str] = None) -> set:
        """Get all cache hashes owned by or associated with a user.

        Aggregates three sources: the item_types DB table, legacy
        ``*.meta.json`` uploader sidecar files on disk, and the inputs and
        outputs of the user's runs. Each source is best-effort; failures in
        one source are skipped rather than aborting the whole check.
        """
        # Identifiers the legacy "uploader" field may have been recorded as.
        match_values = [username]
        if actor_id:
            match_values.append(actor_id)

        hashes = set()

        # Query database for items owned by user
        if actor_id:
            try:
                db_items = await self.db.get_user_items(actor_id)
                for item in db_items:
                    hashes.add(item["cid"])
            except Exception:
                # DB unavailability should not block the remaining sources.
                pass

        # Legacy: Files uploaded by user (JSON metadata sidecar files)
        if self.cache_dir.exists():
            for f in self.cache_dir.iterdir():
                if f.name.endswith('.meta.json'):
                    try:
                        with open(f, 'r') as mf:
                            meta = json.load(mf)
                            if meta.get("uploader") in match_values:
                                # The hash is the sidecar name minus its suffix.
                                hashes.add(f.name.replace('.meta.json', ''))
                    except Exception:
                        # Ignore unreadable or corrupt sidecar files.
                        pass

        # Files from user's runs (inputs and outputs)
        runs = await self._list_user_runs(username, actor_id)
        for run in runs:
            inputs = run.get("inputs", [])
            if isinstance(inputs, dict):
                # Named inputs: only the cid values matter for access.
                inputs = list(inputs.values())
            hashes.update(inputs)
            if run.get("output_cid"):
                hashes.add(run["output_cid"])

        return hashes
|
|
|
|
async def _list_user_runs(self, username: str, actor_id: Optional[str]) -> List[Dict]:
|
|
"""List runs for a user (helper for access check)."""
|
|
from ..dependencies import get_redis_client
|
|
import json
|
|
|
|
redis = get_redis_client()
|
|
runs = []
|
|
cursor = 0
|
|
prefix = "artdag:run:"
|
|
|
|
while True:
|
|
cursor, keys = redis.scan(cursor=cursor, match=f"{prefix}*", count=100)
|
|
for key in keys:
|
|
data = redis.get(key)
|
|
if data:
|
|
run = json.loads(data)
|
|
if run.get("actor_id") in (username, actor_id) or run.get("username") in (username, actor_id):
|
|
runs.append(run)
|
|
if cursor == 0:
|
|
break
|
|
|
|
return runs
|
|
|
|
async def get_raw_file(self, cid: str) -> Tuple[Optional[Path], Optional[str], Optional[str]]:
|
|
"""Get raw file path, media type, and filename for download."""
|
|
if not self.cache.has_content(cid):
|
|
return None, None, None
|
|
|
|
path = self.cache.get_by_cid(cid)
|
|
if not path or not path.exists():
|
|
return None, None, None
|
|
|
|
media_type = detect_media_type(path)
|
|
mime = get_mime_type(path)
|
|
|
|
# Determine extension
|
|
ext = "bin"
|
|
if media_type == "video":
|
|
try:
|
|
with open(path, "rb") as f:
|
|
header = f.read(12)
|
|
if header[:4] == b'\x1a\x45\xdf\xa3':
|
|
ext = "mkv"
|
|
else:
|
|
ext = "mp4"
|
|
except Exception:
|
|
ext = "mp4"
|
|
elif media_type == "image":
|
|
try:
|
|
with open(path, "rb") as f:
|
|
header = f.read(8)
|
|
if header[:8] == b'\x89PNG\r\n\x1a\n':
|
|
ext = "png"
|
|
else:
|
|
ext = "jpg"
|
|
except Exception:
|
|
ext = "jpg"
|
|
|
|
filename = f"{cid}.{ext}"
|
|
return path, mime, filename
|
|
|
|
    async def get_as_mp4(self, cid: str) -> Tuple[Optional[Path], Optional[str]]:
        """Get content as MP4, transcoding if necessary. Returns (path, error).

        Resolution order: cached transcode (``<cid>.mp4``) -> the original file
        when ffprobe reports it is already MP4/MOV -> an ffmpeg transcode
        (H.264/AAC, 10-minute cap). Transcodes are written to a temporary
        ``.transcoding.mp4`` name and renamed into place only on success.
        """
        if not self.cache.has_content(cid):
            return None, f"Content {cid} not in cache"

        path = self.cache.get_by_cid(cid)
        if not path or not path.exists():
            return None, f"Content {cid} not in cache"

        # Check if video
        media_type = detect_media_type(path)
        if media_type != "video":
            return None, "Content is not a video"

        # Reuse a previously transcoded MP4 if one is already on disk
        mp4_path = self.cache_dir / f"{cid}.mp4"
        if mp4_path.exists():
            return mp4_path, None

        # Check if already MP4 format (then no transcode is needed)
        try:
            result = subprocess.run(
                ["ffprobe", "-v", "error", "-select_streams", "v:0",
                 "-show_entries", "format=format_name", "-of", "csv=p=0", str(path)],
                capture_output=True, text=True, timeout=10
            )
            if "mp4" in result.stdout.lower() or "mov" in result.stdout.lower():
                return path, None
        except Exception:
            # ffprobe missing or failing: fall through and try to transcode.
            pass

        # Transcode to MP4 (H.264 video, AAC audio, faststart for streaming)
        transcode_path = self.cache_dir / f"{cid}.transcoding.mp4"
        try:
            result = subprocess.run(
                ["ffmpeg", "-y", "-i", str(path),
                 "-c:v", "libx264", "-preset", "fast", "-crf", "23",
                 "-c:a", "aac", "-b:a", "128k",
                 "-movflags", "+faststart",
                 str(transcode_path)],
                capture_output=True, text=True, timeout=600
            )
            if result.returncode != 0:
                return None, f"Transcoding failed: {result.stderr[:200]}"

            # Publish via rename so a cached MP4 only appears once complete.
            transcode_path.rename(mp4_path)
            return mp4_path, None

        except subprocess.TimeoutExpired:
            # Remove the partial output before reporting the timeout.
            if transcode_path.exists():
                transcode_path.unlink()
            return None, "Transcoding timed out"
        except Exception as e:
            if transcode_path.exists():
                transcode_path.unlink()
            return None, f"Transcoding failed: {e}"
|
|
|
|
async def get_metadata(self, cid: str, actor_id: str) -> Optional[Dict[str, Any]]:
|
|
"""Get content metadata."""
|
|
if not self.cache.has_content(cid):
|
|
return None
|
|
return await self.db.load_item_metadata(cid, actor_id)
|
|
|
|
async def update_metadata(
|
|
self,
|
|
cid: str,
|
|
actor_id: str,
|
|
title: Optional[str] = None,
|
|
description: Optional[str] = None,
|
|
tags: Optional[List[str]] = None,
|
|
custom: Optional[Dict[str, Any]] = None,
|
|
) -> Tuple[bool, Optional[str]]:
|
|
"""Update content metadata. Returns (success, error)."""
|
|
if not self.cache.has_content(cid):
|
|
return False, "Content not found"
|
|
|
|
# Build update dict
|
|
updates = {}
|
|
if title is not None:
|
|
updates["title"] = title
|
|
if description is not None:
|
|
updates["description"] = description
|
|
if tags is not None:
|
|
updates["tags"] = tags
|
|
if custom is not None:
|
|
updates["custom"] = custom
|
|
|
|
try:
|
|
await self.db.update_item_metadata(cid, actor_id, **updates)
|
|
return True, None
|
|
except Exception as e:
|
|
return False, str(e)
|
|
|
|
    async def publish_to_l2(
        self,
        cid: str,
        actor_id: str,
        l2_server: str,
        auth_token: str,
    ) -> Tuple[Optional[str], Optional[str]]:
        """Publish content to L2 and IPFS. Returns (ipfs_cid, error).

        Requires the item's metadata to carry an ``origin`` with a ``type``
        and a bearer *auth_token*. On success the item is recorded as shared
        to *l2_server* and pinned locally with reason "published".
        """
        if not self.cache.has_content(cid):
            return None, "Content not found"

        # Get IPFS CID (may be None if the item was never pushed to IPFS)
        cache_item = await self.db.get_cache_item(cid)
        ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None

        # Get metadata for origin info
        meta = await self.db.load_item_metadata(cid, actor_id)
        origin = meta.get("origin") if meta else None

        if not origin or "type" not in origin:
            return None, "Origin must be set before publishing"

        if not auth_token:
            return None, "Authentication token required"

        # Call L2 publish-cache endpoint
        try:
            async with httpx.AsyncClient(timeout=30) as client:
                resp = await client.post(
                    f"{l2_server}/assets/publish-cache",
                    headers={"Authorization": f"Bearer {auth_token}"},
                    json={
                        "cid": cid,
                        "ipfs_cid": ipfs_cid,
                        "asset_name": meta.get("title") or cid[:16],
                        "asset_type": detect_media_type(self.cache.get_by_cid(cid)),
                        "origin": origin,
                        "description": meta.get("description"),
                        "tags": meta.get("tags", []),
                    }
                )
                resp.raise_for_status()
                l2_result = resp.json()
        except httpx.HTTPStatusError as e:
            # Prefer the server's own "detail" message when the body is JSON.
            error_detail = str(e)
            try:
                error_detail = e.response.json().get("detail", str(e))
            except Exception:
                pass
            return None, f"L2 publish failed: {error_detail}"
        except Exception as e:
            return None, f"L2 publish failed: {e}"

        # Update local metadata with publish status
        await self.db.save_l2_share(
            cid=cid,
            actor_id=actor_id,
            l2_server=l2_server,
            asset_name=meta.get("title") or cid[:16],
            content_type=detect_media_type(self.cache.get_by_cid(cid))
        )
        # Pin locally so the published bytes cannot be garbage collected.
        await self.db.update_item_metadata(
            cid=cid,
            actor_id=actor_id,
            pinned=True,
            pin_reason="published"
        )

        # The L2 may have assigned/confirmed an IPFS CID; prefer its answer.
        return l2_result.get("ipfs_cid") or ipfs_cid, None
|
|
|
|
async def delete_content(self, cid: str, actor_id: str) -> Tuple[bool, Optional[str]]:
|
|
"""
|
|
Remove user's ownership link to cached content.
|
|
|
|
This removes the item_types entry linking the user to the content.
|
|
The cached file is only deleted if no other users own it.
|
|
Returns (success, error).
|
|
"""
|
|
import logging
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Check if pinned for this user
|
|
meta = await self.db.load_item_metadata(cid, actor_id)
|
|
if meta and meta.get("pinned"):
|
|
pin_reason = meta.get("pin_reason", "unknown")
|
|
return False, f"Cannot discard pinned item (reason: {pin_reason})"
|
|
|
|
# Get the item type to delete the right ownership entry
|
|
item_types = await self.db.get_item_types(cid, actor_id)
|
|
if not item_types:
|
|
return False, "You don't own this content"
|
|
|
|
# Remove user's ownership links (all types for this user)
|
|
for item in item_types:
|
|
item_type = item.get("type", "media")
|
|
await self.db.delete_item_type(cid, actor_id, item_type)
|
|
|
|
# Remove friendly name
|
|
await self.db.delete_friendly_name(actor_id, cid)
|
|
|
|
# Check if anyone else still owns this content
|
|
remaining_owners = await self.db.get_item_types(cid)
|
|
|
|
# Only delete the actual file if no one owns it anymore
|
|
if not remaining_owners:
|
|
# Check deletion rules via cache_manager
|
|
can_delete, reason = self.cache.can_delete(cid)
|
|
if can_delete:
|
|
# Delete via cache_manager
|
|
self.cache.delete_by_cid(cid)
|
|
|
|
# Clean up legacy metadata files
|
|
meta_path = self.cache_dir / f"{cid}.meta.json"
|
|
if meta_path.exists():
|
|
meta_path.unlink()
|
|
mp4_path = self.cache_dir / f"{cid}.mp4"
|
|
if mp4_path.exists():
|
|
mp4_path.unlink()
|
|
|
|
# Delete from database
|
|
await self.db.delete_cache_item(cid)
|
|
|
|
logger.info(f"Garbage collected content {cid[:16]}... (no remaining owners)")
|
|
else:
|
|
logger.info(f"Content {cid[:16]}... orphaned but cannot delete: {reason}")
|
|
|
|
logger.info(f"Removed content {cid[:16]}... ownership for {actor_id}")
|
|
return True, None
|
|
|
|
async def import_from_ipfs(self, ipfs_cid: str, actor_id: str) -> Tuple[Optional[str], Optional[str]]:
|
|
"""Import content from IPFS. Returns (cid, error)."""
|
|
try:
|
|
import ipfs_client
|
|
|
|
# Download from IPFS
|
|
legacy_dir = self.cache_dir / "legacy"
|
|
legacy_dir.mkdir(parents=True, exist_ok=True)
|
|
tmp_path = legacy_dir / f"import-{ipfs_cid[:16]}"
|
|
|
|
if not ipfs_client.get_file(ipfs_cid, str(tmp_path)):
|
|
return None, f"Could not fetch CID {ipfs_cid} from IPFS"
|
|
|
|
# Detect media type before storing
|
|
media_type = detect_media_type(tmp_path)
|
|
|
|
# Store in cache
|
|
cached, new_ipfs_cid = self.cache.put(tmp_path, node_type="import", move=True)
|
|
cid = new_ipfs_cid or cached.cid # Prefer IPFS CID
|
|
|
|
# Save to database with detected media type
|
|
await self.db.create_cache_item(cid, new_ipfs_cid)
|
|
await self.db.save_item_metadata(
|
|
cid=cid,
|
|
actor_id=actor_id,
|
|
item_type=media_type, # Use detected type for filtering
|
|
filename=f"ipfs-{ipfs_cid[:16]}"
|
|
)
|
|
|
|
return cid, None
|
|
except Exception as e:
|
|
return None, f"Import failed: {e}"
|
|
|
|
async def upload_content(
|
|
self,
|
|
content: bytes,
|
|
filename: str,
|
|
actor_id: str,
|
|
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
|
|
"""Upload content to cache. Returns (cid, ipfs_cid, error).
|
|
|
|
Files are stored locally first for fast response, then uploaded
|
|
to IPFS in the background.
|
|
"""
|
|
import tempfile
|
|
|
|
try:
|
|
# Write to temp file
|
|
with tempfile.NamedTemporaryFile(delete=False) as tmp:
|
|
tmp.write(content)
|
|
tmp_path = Path(tmp.name)
|
|
|
|
# Detect media type (video/image/audio) before moving file
|
|
media_type = detect_media_type(tmp_path)
|
|
|
|
# Store locally AND upload to IPFS synchronously
|
|
# This ensures the IPFS CID is available immediately for distributed access
|
|
cached, ipfs_cid = self.cache.put(tmp_path, node_type="upload", move=True, skip_ipfs=False)
|
|
cid = ipfs_cid or cached.cid # Prefer IPFS CID, fall back to local hash
|
|
|
|
# Save to database with media category type
|
|
await self.db.create_cache_item(cached.cid, ipfs_cid)
|
|
await self.db.save_item_metadata(
|
|
cid=cid,
|
|
actor_id=actor_id,
|
|
item_type=media_type,
|
|
filename=filename
|
|
)
|
|
|
|
if ipfs_cid:
|
|
logger.info(f"Uploaded to IPFS: {ipfs_cid[:16]}...")
|
|
else:
|
|
logger.warning(f"IPFS upload failed, using local hash: {cid[:16]}...")
|
|
|
|
return cid, ipfs_cid, None
|
|
except Exception as e:
|
|
return None, None, f"Upload failed: {e}"
|
|
|
|
async def list_media(
|
|
self,
|
|
actor_id: Optional[str] = None,
|
|
username: Optional[str] = None,
|
|
offset: int = 0,
|
|
limit: int = 24,
|
|
media_type: Optional[str] = None,
|
|
) -> List[Dict[str, Any]]:
|
|
"""List media items in cache."""
|
|
# Get items from database (uses item_types table)
|
|
items = await self.db.get_user_items(
|
|
actor_id=actor_id or username,
|
|
item_type=media_type, # "video", "image", "audio", or None for all
|
|
limit=limit,
|
|
offset=offset,
|
|
)
|
|
|
|
# Add friendly names to items
|
|
if actor_id:
|
|
from .naming_service import get_naming_service
|
|
naming = get_naming_service()
|
|
for item in items:
|
|
cid = item.get("cid")
|
|
if cid:
|
|
friendly = await naming.get_by_cid(actor_id, cid)
|
|
if friendly:
|
|
item["friendly_name"] = friendly["friendly_name"]
|
|
item["base_name"] = friendly["base_name"]
|
|
|
|
return items
|
|
|
|
# Legacy compatibility methods
|
|
def has_content(self, cid: str) -> bool:
|
|
"""Check if content exists in cache."""
|
|
return self.cache.has_content(cid)
|
|
|
|
def get_ipfs_cid(self, cid: str) -> Optional[str]:
|
|
"""Get IPFS CID for cached content."""
|
|
return self.cache.get_ipfs_cid(cid)
|