Files
celery/app/services/cache_service.py
giles adc876dbd6 Add modular app structure for L1 server refactoring
Phase 2 of the full modernization:
- App factory pattern with create_app()
- Settings via dataclass with env vars
- Dependency injection container
- Router stubs for auth, storage, api, recipes, cache, runs
- Service layer stubs for run, recipe, cache
- Repository layer placeholder

Routes are stubs that import from legacy server.py during migration.
Next: Migrate each router fully with templates.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-11 07:08:08 +00:00

111 lines
3.3 KiB
Python

"""
Cache Service - business logic for cache and media management.
"""
from pathlib import Path
from typing import Optional, List, Dict, Any
from artdag_common.utils.media import detect_media_type, get_mime_type
class CacheService:
"""
Service for managing cached content.
Handles content retrieval, metadata, and media type detection.
"""
def __init__(self, cache_manager, database):
self.cache = cache_manager
self.db = database
async def get_item(self, content_hash: str) -> Optional[Dict[str, Any]]:
"""Get cached item by content hash."""
path = self.cache.get_by_content_hash(content_hash)
if not path or not path.exists():
return None
# Get metadata from database
meta = await self.db.get_cache_item(content_hash)
media_type = detect_media_type(path)
mime_type = get_mime_type(path)
size = path.stat().st_size
return {
"content_hash": content_hash,
"path": str(path),
"media_type": media_type,
"mime_type": mime_type,
"size": size,
"name": meta.get("name") if meta else None,
"description": meta.get("description") if meta else None,
"tags": meta.get("tags", []) if meta else [],
"ipfs_cid": meta.get("ipfs_cid") if meta else None,
}
async def get_path(self, content_hash: str) -> Optional[Path]:
"""Get the file path for cached content."""
return self.cache.get_by_content_hash(content_hash)
async def list_items(
self,
actor_id: str = None,
media_type: str = None,
page: int = 1,
limit: int = 20,
) -> Dict[str, Any]:
"""List cached items with filters and pagination."""
# Get items from database
items = await self.db.list_cache_items(
actor_id=actor_id,
media_type=media_type,
offset=(page - 1) * limit,
limit=limit,
)
total = await self.db.count_cache_items(actor_id=actor_id, media_type=media_type)
return {
"items": items,
"pagination": {
"page": page,
"limit": limit,
"total": total,
"has_more": page * limit < total,
}
}
async def update_metadata(
self,
content_hash: str,
name: str = None,
description: str = None,
tags: List[str] = None,
) -> bool:
"""Update item metadata."""
return await self.db.update_cache_metadata(
content_hash=content_hash,
name=name,
description=description,
tags=tags,
)
async def delete_item(self, content_hash: str) -> bool:
"""Delete a cached item."""
path = self.cache.get_by_content_hash(content_hash)
if path and path.exists():
path.unlink()
# Remove from database
await self.db.delete_cache_item(content_hash)
return True
def has_content(self, content_hash: str) -> bool:
"""Check if content exists in cache."""
return self.cache.has_content(content_hash)
def get_ipfs_cid(self, content_hash: str) -> Optional[str]:
"""Get IPFS CID for cached content."""
return self.cache.get_ipfs_cid(content_hash)