Files
celery/app/services/cache_service.py
giles 8591faf0fc Fix CacheService list_media and recipe inputs type
- Add list_media method to CacheService
- Change recipe run inputs from List to dict

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-11 13:28:27 +00:00

130 lines
3.9 KiB
Python

"""
Cache Service - business logic for cache and media management.
"""
from pathlib import Path
from typing import Optional, List, Dict, Any
from artdag_common.utils.media import detect_media_type, get_mime_type
class CacheService:
"""
Service for managing cached content.
Handles content retrieval, metadata, and media type detection.
"""
def __init__(self, cache_manager, database):
self.cache = cache_manager
self.db = database
async def get_item(self, content_hash: str) -> Optional[Dict[str, Any]]:
"""Get cached item by content hash."""
path = self.cache.get_by_content_hash(content_hash)
if not path or not path.exists():
return None
# Get metadata from database
meta = await self.db.get_cache_item(content_hash)
media_type = detect_media_type(path)
mime_type = get_mime_type(path)
size = path.stat().st_size
return {
"content_hash": content_hash,
"path": str(path),
"media_type": media_type,
"mime_type": mime_type,
"size": size,
"name": meta.get("name") if meta else None,
"description": meta.get("description") if meta else None,
"tags": meta.get("tags", []) if meta else [],
"ipfs_cid": meta.get("ipfs_cid") if meta else None,
}
async def get_path(self, content_hash: str) -> Optional[Path]:
"""Get the file path for cached content."""
return self.cache.get_by_content_hash(content_hash)
async def list_items(
self,
actor_id: str = None,
media_type: str = None,
page: int = 1,
limit: int = 20,
) -> Dict[str, Any]:
"""List cached items with filters and pagination."""
# Get items from database
items = await self.db.list_cache_items(
actor_id=actor_id,
media_type=media_type,
offset=(page - 1) * limit,
limit=limit,
)
total = await self.db.count_cache_items(actor_id=actor_id, media_type=media_type)
return {
"items": items,
"pagination": {
"page": page,
"limit": limit,
"total": total,
"has_more": page * limit < total,
}
}
async def update_metadata(
self,
content_hash: str,
name: str = None,
description: str = None,
tags: List[str] = None,
) -> bool:
"""Update item metadata."""
return await self.db.update_cache_metadata(
content_hash=content_hash,
name=name,
description=description,
tags=tags,
)
async def delete_item(self, content_hash: str) -> bool:
"""Delete a cached item."""
path = self.cache.get_by_content_hash(content_hash)
if path and path.exists():
path.unlink()
# Remove from database
await self.db.delete_cache_item(content_hash)
return True
def has_content(self, content_hash: str) -> bool:
"""Check if content exists in cache."""
return self.cache.has_content(content_hash)
def get_ipfs_cid(self, content_hash: str) -> Optional[str]:
"""Get IPFS CID for cached content."""
return self.cache.get_ipfs_cid(content_hash)
async def list_media(
self,
actor_id: str = None,
username: str = None,
offset: int = 0,
limit: int = 24,
media_type: str = None,
) -> List[Dict[str, Any]]:
"""List media items in cache."""
# Use list_items internally, converting offset to page
page = (offset // limit) + 1 if limit > 0 else 1
result = await self.list_items(
actor_id=actor_id or username,
media_type=media_type,
page=page,
limit=limit,
)
return result.get("items", [])