mono/artdag/l1/cache_manager.py
giles 1a74d811f7 Incorporate art-dag-mono repo into artdag/ subfolder
Merges full history from art-dag/mono.git into the monorepo
under the artdag/ directory. Contains: core (DAG engine),
l1 (Celery rendering server), l2 (ActivityPub registry),
common (shared templates/middleware), client (CLI), test (e2e).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

git-subtree-dir: artdag
git-subtree-mainline: 1a179de547
git-subtree-split: 4c2e716558
2026-02-27 09:07:23 +00:00


# artdag/l1/cache_manager.py
"""
Cache management for Art DAG L1 server.
Integrates artdag's Cache, ActivityStore, and ActivityManager to provide:
- Content-addressed caching with both node_id and cid
- Activity tracking for runs (input/output/intermediate relationships)
- Deletion rules enforcement (shared items protected)
- L2 ActivityPub integration for "shared" status checks
- IPFS as durable backing store (local cache as hot storage)
- Redis-backed indexes for multi-worker consistency
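
Example (illustrative sketch; assumes a reachable IPFS daemon and L2 server):

    from pathlib import Path
    from cache_manager import get_cache_manager

    mgr = get_cache_manager()
    cached, ipfs_cid = mgr.put(Path("frame.png"), node_type="upload")
    local_path = mgr.get_by_cid(cached.cid)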
"""
import hashlib
import json
import logging
import os
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, TYPE_CHECKING
import requests
if TYPE_CHECKING:
import redis
from artdag import Cache, CacheEntry, DAG, Node, NodeType
from artdag.activities import Activity, ActivityStore, ActivityManager
import ipfs_client
logger = logging.getLogger(__name__)
def file_hash(path: Path, algorithm: str = "sha3_256") -> str:
"""Compute local content hash (fallback when IPFS unavailable)."""
hasher = hashlib.new(algorithm)
actual_path = path.resolve() if path.is_symlink() else path
with open(actual_path, "rb") as f:
for chunk in iter(lambda: f.read(65536), b""):
hasher.update(chunk)
return hasher.hexdigest()
@dataclass
class CachedFile:
"""
A cached file with both identifiers.
Provides a unified view combining:
- node_id: computation identity (for DAG caching)
- cid: file content identity (for external references)
"""
node_id: str
cid: str
path: Path
size_bytes: int
node_type: str
created_at: float
@classmethod
def from_cache_entry(cls, entry: CacheEntry) -> "CachedFile":
return cls(
node_id=entry.node_id,
cid=entry.cid,
path=entry.output_path,
size_bytes=entry.size_bytes,
node_type=entry.node_type,
created_at=entry.created_at,
)
class L2SharedChecker:
"""
Checks if content is shared (published) via L2 ActivityPub server.
Caches results to avoid repeated API calls.
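
    Example (sketch; the endpoint format matches is_shared below):

        checker = L2SharedChecker("http://localhost:8200")
        published = checker.is_shared("bafy...")  # True if L2 returns 200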
"""
def __init__(self, l2_server: str, cache_ttl: int = 300):
self.l2_server = l2_server
self.cache_ttl = cache_ttl
self._cache: Dict[str, tuple[bool, float]] = {}
def is_shared(self, cid: str) -> bool:
"""Check if cid has been published to L2."""
        now = time.time()
# Check cache
if cid in self._cache:
is_shared, cached_at = self._cache[cid]
if now - cached_at < self.cache_ttl:
logger.debug(f"L2 check (cached): {cid[:16]}... = {is_shared}")
return is_shared
# Query L2
try:
url = f"{self.l2_server}/assets/by-hash/{cid}"
logger.info(f"L2 check: GET {url}")
resp = requests.get(url, timeout=5)
logger.info(f"L2 check response: {resp.status_code}")
is_shared = resp.status_code == 200
except Exception as e:
logger.warning(f"Failed to check L2 for {cid}: {e}")
# On error, assume IS shared (safer - prevents accidental deletion)
is_shared = True
self._cache[cid] = (is_shared, now)
return is_shared
def invalidate(self, cid: str):
"""Invalidate cache for a cid (call after publishing)."""
self._cache.pop(cid, None)
def mark_shared(self, cid: str):
"""Mark as shared without querying (call after successful publish)."""
        self._cache[cid] = (True, time.time())
class L1CacheManager:
"""
Unified cache manager for Art DAG L1 server.
Combines:
- artdag Cache for file storage
- ActivityStore for run tracking
- ActivityManager for deletion rules
- L2 integration for shared status
Provides both node_id and cid based access.
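
    Example (sketch; Redis is optional, used only for the shared IPFS index):

        mgr = L1CacheManager("/var/cache/artdag", l2_server="http://localhost:8200")
        ok, reason = mgr.can_delete("bafy...")  # placeholder CID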
"""
def __init__(
self,
cache_dir: Path | str,
l2_server: str = "http://localhost:8200",
redis_client: Optional["redis.Redis"] = None,
):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
# Redis for shared state between workers
self._redis = redis_client
self._redis_content_key = "artdag:content_index"
self._redis_ipfs_key = "artdag:ipfs_index"
# artdag components
self.cache = Cache(self.cache_dir / "nodes")
self.activity_store = ActivityStore(self.cache_dir / "activities")
# L2 shared checker
self.l2_checker = L2SharedChecker(l2_server)
# Activity manager with L2-based is_shared
self.activity_manager = ActivityManager(
cache=self.cache,
activity_store=self.activity_store,
is_shared_fn=self._is_shared_by_node_id,
)
# Legacy files directory (for files uploaded directly by cid)
self.legacy_dir = self.cache_dir / "legacy"
self.legacy_dir.mkdir(parents=True, exist_ok=True)
    # ============ Shared Indexes (database + Redis, no JSON files) ============
    #
    # Content index (Postgres, cache_items table):
    #   alias (code-addressed cache_id or local content hash) -> node_id,
    #   where node_id is the IPFS CID under which the file is stored.
    # IPFS index (Redis): content hash -> IPFS CID.
    #
    # The database is the ONLY source of truth for the cache_id -> ipfs_cid
    # mapping. No fallbacks - failures raise exceptions.
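    # Assumed shape of the cache_items table used below (illustrative; the
    # real migration lives in the database module):
    #
    #   CREATE TABLE IF NOT EXISTS cache_items (
    #       cid      TEXT PRIMARY KEY,
    #       ipfs_cid TEXT NOT NULL
    #   );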
def _run_async(self, coro):
"""Run async coroutine from sync context.
Always creates a fresh event loop to avoid issues with Celery's
prefork workers where loops may be closed by previous tasks.
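
        Example (sketch; any awaitable works):

            row = self._run_async(some_coroutine())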
"""
import asyncio
# Check if we're already in an async context
try:
asyncio.get_running_loop()
# We're in an async context - use a thread with its own loop
import threading
result = [None]
error = [None]
def run_in_thread():
try:
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
try:
result[0] = new_loop.run_until_complete(coro)
finally:
new_loop.close()
except Exception as e:
error[0] = e
            thread = threading.Thread(target=run_in_thread)
            thread.start()
            thread.join(timeout=30)
            if thread.is_alive():
                # A hung coroutine would otherwise silently return None
                raise TimeoutError("async operation did not complete within 30s")
            if error[0]:
                raise error[0]
            return result[0]
except RuntimeError:
# No running loop - create a fresh one (don't reuse potentially closed loops)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(coro)
finally:
loop.close()
def _set_content_index(self, cache_id: str, ipfs_cid: str):
"""Set content index entry in database (cache_id -> ipfs_cid)."""
import database
async def save_to_db():
import asyncpg
conn = await asyncpg.connect(database.DATABASE_URL)
try:
await conn.execute(
"""
INSERT INTO cache_items (cid, ipfs_cid)
VALUES ($1, $2)
ON CONFLICT (cid) DO UPDATE SET ipfs_cid = $2
""",
cache_id, ipfs_cid
)
finally:
await conn.close()
self._run_async(save_to_db())
logger.info(f"Indexed in database: {cache_id[:16]}... -> {ipfs_cid}")
def _get_content_index(self, cache_id: str) -> Optional[str]:
"""Get content index entry (cache_id -> ipfs_cid) from database."""
import database
async def get_from_db():
import asyncpg
conn = await asyncpg.connect(database.DATABASE_URL)
try:
row = await conn.fetchrow(
"SELECT ipfs_cid FROM cache_items WHERE cid = $1",
cache_id
)
return {"ipfs_cid": row["ipfs_cid"]} if row else None
finally:
await conn.close()
result = self._run_async(get_from_db())
if result and result.get("ipfs_cid"):
return result["ipfs_cid"]
return None
def _del_content_index(self, cache_id: str):
"""Delete content index entry from database."""
import database
async def delete_from_db():
import asyncpg
conn = await asyncpg.connect(database.DATABASE_URL)
try:
await conn.execute("DELETE FROM cache_items WHERE cid = $1", cache_id)
finally:
await conn.close()
self._run_async(delete_from_db())
def _set_ipfs_index(self, cid: str, ipfs_cid: str):
"""Set IPFS index entry in Redis."""
if self._redis:
try:
self._redis.hset(self._redis_ipfs_key, cid, ipfs_cid)
except Exception as e:
logger.warning(f"Failed to set IPFS index in Redis: {e}")
def _get_ipfs_cid_from_index(self, cid: str) -> Optional[str]:
"""Get IPFS CID from Redis."""
if self._redis:
try:
val = self._redis.hget(self._redis_ipfs_key, cid)
if val:
return val.decode() if isinstance(val, bytes) else val
except Exception as e:
logger.warning(f"Failed to get IPFS CID from Redis: {e}")
return None
def get_ipfs_cid(self, cid: str) -> Optional[str]:
"""Get IPFS CID for a content hash."""
return self._get_ipfs_cid_from_index(cid)
def _is_shared_by_node_id(self, cid: str) -> bool:
"""Check if a cid is shared via L2."""
return self.l2_checker.is_shared(cid)
def _load_meta(self, cid: str) -> dict:
"""Load metadata for a cached file."""
meta_path = self.cache_dir / f"{cid}.meta.json"
if meta_path.exists():
with open(meta_path) as f:
return json.load(f)
return {}
def is_pinned(self, cid: str) -> tuple[bool, str]:
"""
Check if a cid is pinned (non-deletable).
Returns:
(is_pinned, reason) tuple
"""
meta = self._load_meta(cid)
if meta.get("pinned"):
return True, meta.get("pin_reason", "published")
return False, ""
def _save_meta(self, cid: str, **updates) -> dict:
"""Save/update metadata for a cached file."""
meta = self._load_meta(cid)
meta.update(updates)
meta_path = self.cache_dir / f"{cid}.meta.json"
with open(meta_path, "w") as f:
json.dump(meta, f, indent=2)
return meta
def pin(self, cid: str, reason: str = "published") -> None:
"""Mark an item as pinned (non-deletable)."""
self._save_meta(cid, pinned=True, pin_reason=reason)
# ============ File Storage ============
def put(
self,
source_path: Path,
node_type: str = "upload",
        node_id: Optional[str] = None,
        cache_id: Optional[str] = None,
execution_time: float = 0.0,
move: bool = False,
skip_ipfs: bool = False,
) -> tuple[CachedFile, Optional[str]]:
"""
Store a file in the cache and optionally upload to IPFS.
Files are stored by IPFS CID when skip_ipfs=False (default), or by
local content hash when skip_ipfs=True. The cache_id parameter creates
an index from cache_id -> CID for code-addressed lookups.
Args:
source_path: Path to file to cache
node_type: Type of node (e.g., "upload", "source", "effect")
node_id: DEPRECATED - ignored, always uses CID
cache_id: Optional code-addressed cache ID to index
execution_time: How long the operation took
move: If True, move instead of copy
skip_ipfs: If True, skip IPFS upload and use local hash (faster for large files)
Returns:
Tuple of (CachedFile with both node_id and cid, CID or None if skip_ipfs)
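
        Example (sketch; cache_id is any code-addressed hash the caller computed):

            cached, ipfs_cid = manager.put(
                Path("/tmp/render.png"),
                node_type="effect",
                cache_id=recipe_hash,  # hypothetical code-addressed hash
            )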
"""
if skip_ipfs:
# Use local content hash instead of IPFS CID (much faster)
cid = file_hash(source_path)
ipfs_cid = None
logger.info(f"put: Using local hash (skip_ipfs=True): {cid[:16]}...")
else:
# Upload to IPFS first to get the CID (primary identifier)
cid = ipfs_client.add_file(source_path)
if not cid:
raise RuntimeError(f"IPFS upload failed for {source_path}. IPFS is required.")
ipfs_cid = cid
# Always store by IPFS CID (node_id parameter is deprecated)
node_id = cid
# Check if already cached (by node_id)
existing = self.cache.get_entry(node_id)
if existing and existing.output_path.exists():
return CachedFile.from_cache_entry(existing), ipfs_cid
# Compute local hash BEFORE moving the file (for dual-indexing)
# Only needed if we uploaded to IPFS (to map local hash -> IPFS CID)
local_hash = None
if not skip_ipfs and self._is_ipfs_cid(cid):
local_hash = file_hash(source_path)
# Store in local cache
logger.info(f"put: Storing in cache with node_id={node_id[:16]}...")
self.cache.put(
node_id=node_id,
source_path=source_path,
node_type=node_type,
execution_time=execution_time,
move=move,
)
entry = self.cache.get_entry(node_id)
logger.info(f"put: After cache.put, get_entry(node_id={node_id[:16]}...) returned entry={entry is not None}, path={entry.output_path if entry else None}")
# Verify we can retrieve it
verify_path = self.cache.get(node_id)
logger.info(f"put: Verify cache.get(node_id={node_id[:16]}...) = {verify_path}")
# Index by cache_id if provided (code-addressed cache lookup)
# This allows get_by_cid(cache_id) to find files stored by IPFS CID
if cache_id and cache_id != cid:
self._set_content_index(cache_id, cid)
logger.info(f"put: Indexed cache_id {cache_id[:16]}... -> IPFS {cid}")
# Also index by local hash for content-based lookup
if local_hash and local_hash != cid:
self._set_content_index(local_hash, cid)
logger.debug(f"Indexed local hash {local_hash[:16]}... -> IPFS {cid}")
logger.info(f"Cached: {cid[:16]}..." + (" (local only)" if skip_ipfs else " (IPFS)"))
        return CachedFile.from_cache_entry(entry), ipfs_cid  # already None when skip_ipfs
def get_by_node_id(self, node_id: str) -> Optional[Path]:
"""Get cached file path by node_id."""
return self.cache.get(node_id)
def _is_ipfs_cid(self, identifier: str) -> bool:
"""Check if identifier looks like an IPFS CID."""
        # CIDv0 starts with "Qm"; base32 CIDv1 starts with "baf" (e.g. "bafy...")
        return identifier.startswith(("Qm", "baf"))
def get_by_cid(self, cid: str) -> Optional[Path]:
"""Get cached file path by cid or IPFS CID. Falls back to IPFS if not in local cache."""
logger.info(f"get_by_cid: Looking for cid={cid[:16]}...")
# Check index first (Redis then local)
node_id = self._get_content_index(cid)
logger.info(f"get_by_cid: Index lookup returned node_id={node_id[:16] if node_id else None}...")
if node_id:
path = self.cache.get(node_id)
logger.info(f"get_by_cid: cache.get(node_id={node_id[:16]}...) returned path={path}")
if path and path.exists():
logger.info(f"get_by_cid: Found via index: {path}")
return path
# artdag Cache doesn't know about entry - check filesystem directly
# Files are stored at {cache_dir}/nodes/{node_id}/output.*
nodes_dir = self.cache_dir / "nodes" / node_id
if nodes_dir.exists():
for f in nodes_dir.iterdir():
if f.name.startswith("output."):
logger.info(f"get_by_cid: Found on filesystem: {f}")
return f
# For uploads, node_id == cid, so try direct lookup
# This works even if cache index hasn't been reloaded
path = self.cache.get(cid)
logger.info(f"get_by_cid: Direct cache.get({cid[:16]}...) returned: {path}")
if path and path.exists():
self._set_content_index(cid, cid)
return path
# Check filesystem directly for cid as node_id
nodes_dir = self.cache_dir / "nodes" / cid
if nodes_dir.exists():
for f in nodes_dir.iterdir():
if f.name.startswith("output."):
logger.info(f"get_by_cid: Found on filesystem (direct): {f}")
self._set_content_index(cid, cid)
return f
# Scan cache entries (fallback for new structure)
entry = self.cache.find_by_cid(cid)
logger.info(f"get_by_cid: find_by_cid({cid[:16]}...) returned entry={entry}")
if entry and entry.output_path.exists():
logger.info(f"get_by_cid: Found via scan: {entry.output_path}")
self._set_content_index(cid, entry.node_id)
return entry.output_path
# Check legacy location (files stored directly as CACHE_DIR/{cid})
legacy_path = self.cache_dir / cid
logger.info(f"get_by_cid: Checking legacy path: {legacy_path} exists={legacy_path.exists()}")
if legacy_path.exists() and legacy_path.is_file():
logger.info(f"get_by_cid: Found at legacy path: {legacy_path}")
return legacy_path
# Fetch from IPFS - this is the source of truth for all content
if self._is_ipfs_cid(cid):
logger.info(f"get_by_cid: Fetching from IPFS: {cid[:16]}...")
recovery_path = self.legacy_dir / cid
recovery_path.parent.mkdir(parents=True, exist_ok=True)
if ipfs_client.get_file(cid, str(recovery_path)):
logger.info(f"get_by_cid: Fetched from IPFS: {recovery_path}")
self._set_content_index(cid, cid)
return recovery_path
else:
logger.warning(f"get_by_cid: IPFS fetch failed for {cid[:16]}...")
# Also try with a mapped IPFS CID if different from cid
ipfs_cid = self._get_ipfs_cid_from_index(cid)
if ipfs_cid and ipfs_cid != cid:
logger.info(f"get_by_cid: Fetching from IPFS via mapping: {ipfs_cid[:16]}...")
recovery_path = self.legacy_dir / cid
recovery_path.parent.mkdir(parents=True, exist_ok=True)
if ipfs_client.get_file(ipfs_cid, str(recovery_path)):
logger.info(f"get_by_cid: Fetched from IPFS: {recovery_path}")
return recovery_path
return None
def has_content(self, cid: str) -> bool:
"""Check if content exists in cache."""
return self.get_by_cid(cid) is not None
def get_entry_by_cid(self, cid: str) -> Optional[CacheEntry]:
"""Get cache entry by cid."""
node_id = self._get_content_index(cid)
if node_id:
return self.cache.get_entry(node_id)
return self.cache.find_by_cid(cid)
def list_all(self) -> List[CachedFile]:
"""List all cached files."""
files = []
seen_hashes = set()
# New cache structure entries
for entry in self.cache.list_entries():
files.append(CachedFile.from_cache_entry(entry))
if entry.cid:
seen_hashes.add(entry.cid)
# Legacy files stored directly in cache_dir (old structure)
# These are files named by cid directly in CACHE_DIR
for f in self.cache_dir.iterdir():
# Skip directories and special files
if not f.is_file():
continue
            # Skip metadata sidecars (.json) and auxiliary media (.mp4)
if f.suffix in ('.json', '.mp4'):
continue
# Skip if name doesn't look like a hash (64 hex chars)
if len(f.name) != 64 or not all(c in '0123456789abcdef' for c in f.name):
continue
# Skip if already seen via new cache
if f.name in seen_hashes:
continue
files.append(CachedFile(
node_id=f.name,
cid=f.name,
path=f,
size_bytes=f.stat().st_size,
node_type="legacy",
created_at=f.stat().st_mtime,
))
seen_hashes.add(f.name)
return files
def list_by_type(self, node_type: str) -> List[str]:
"""
List CIDs of all cached files of a specific type.
Args:
node_type: Type to filter by (e.g., "recipe", "upload", "effect")
Returns:
List of CIDs (IPFS CID if available, otherwise node_id)
"""
cids = []
for entry in self.cache.list_entries():
if entry.node_type == node_type:
# Return node_id which is the IPFS CID for uploaded content
cids.append(entry.node_id)
return cids
# ============ Activity Tracking ============
    def record_activity(self, dag: DAG, run_id: Optional[str] = None) -> Activity:
"""
Record a DAG execution as an activity.
Args:
dag: The executed DAG
run_id: Optional run ID to use as activity_id
Returns:
The created Activity
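
        Example (sketch; dag comes from the artdag execution engine):

            activity = manager.record_activity(dag, run_id=celery_task_id)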
"""
activity = Activity.from_dag(dag, activity_id=run_id)
self.activity_store.add(activity)
return activity
def record_simple_activity(
self,
input_hashes: List[str],
output_cid: str,
        run_id: Optional[str] = None,
) -> Activity:
"""
Record a simple (non-DAG) execution as an activity.
For legacy single-effect runs that don't use full DAG execution.
Uses cid as node_id.
"""
activity = Activity(
            # hashlib, not builtin hash(), so the fallback ID is stable across processes
            activity_id=run_id or hashlib.sha256(
                json.dumps([sorted(input_hashes), output_cid]).encode()
            ).hexdigest(),
input_ids=sorted(input_hashes),
output_id=output_cid,
intermediate_ids=[],
created_at=datetime.now(timezone.utc).timestamp(),
status="completed",
)
self.activity_store.add(activity)
return activity
def get_activity(self, activity_id: str) -> Optional[Activity]:
"""Get activity by ID."""
return self.activity_store.get(activity_id)
def list_activities(self) -> List[Activity]:
"""List all activities."""
return self.activity_store.list()
def find_activities_by_inputs(self, input_hashes: List[str]) -> List[Activity]:
"""Find activities with matching inputs (for UI grouping)."""
return self.activity_store.find_by_input_ids(input_hashes)
# ============ Deletion Rules ============
def can_delete(self, cid: str) -> tuple[bool, str]:
"""
Check if a cached item can be deleted.
Returns:
(can_delete, reason) tuple
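
        Example (sketch):

            ok, reason = manager.can_delete(cid)
            if not ok:
                logger.info(f"refusing delete: {reason}")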
"""
# Check if pinned (published or input to published)
pinned, reason = self.is_pinned(cid)
if pinned:
return False, f"Item is pinned ({reason})"
# Find node_id for this content
node_id = self._get_content_index(cid) or cid
# Check if it's an input or output of any activity
for activity in self.activity_store.list():
if node_id in activity.input_ids:
return False, f"Item is input to activity {activity.activity_id}"
if node_id == activity.output_id:
return False, f"Item is output of activity {activity.activity_id}"
return True, "OK"
def can_discard_activity(self, activity_id: str) -> tuple[bool, str]:
"""
Check if an activity can be discarded.
Returns:
(can_discard, reason) tuple
"""
activity = self.activity_store.get(activity_id)
if not activity:
return False, "Activity not found"
# Check if any item is pinned
for node_id in activity.all_node_ids:
entry = self.cache.get_entry(node_id)
if entry:
pinned, reason = self.is_pinned(entry.cid)
if pinned:
return False, f"Item {node_id} is pinned ({reason})"
return True, "OK"
def delete_by_cid(self, cid: str) -> tuple[bool, str]:
"""
Delete a cached item by cid.
Enforces deletion rules.
Returns:
(success, message) tuple
"""
can_delete, reason = self.can_delete(cid)
if not can_delete:
return False, reason
# Find and delete
node_id = self._get_content_index(cid)
if node_id:
self.cache.remove(node_id)
self._del_content_index(cid)
return True, "Deleted"
# Try legacy
legacy_path = self.legacy_dir / cid
if legacy_path.exists():
legacy_path.unlink()
return True, "Deleted (legacy)"
return False, "Not found"
def discard_activity(self, activity_id: str) -> tuple[bool, str]:
"""
Discard an activity and clean up its cache entries.
Enforces deletion rules.
Returns:
(success, message) tuple
"""
can_discard, reason = self.can_discard_activity(activity_id)
if not can_discard:
return False, reason
success = self.activity_manager.discard_activity(activity_id)
if success:
return True, "Activity discarded"
return False, "Failed to discard"
def _is_used_by_other_activities(self, node_id: str, exclude_activity_id: str) -> bool:
"""Check if a node is used by any activity other than the excluded one."""
for other_activity in self.activity_store.list():
if other_activity.activity_id == exclude_activity_id:
continue
# Check if used as input, output, or intermediate
if node_id in other_activity.input_ids:
return True
if node_id == other_activity.output_id:
return True
if node_id in other_activity.intermediate_ids:
return True
return False
def discard_activity_outputs_only(self, activity_id: str) -> tuple[bool, str]:
"""
Discard an activity, deleting only outputs and intermediates.
Inputs (cache items, configs) are preserved.
Outputs/intermediates used by other activities are preserved.
Returns:
(success, message) tuple
"""
activity = self.activity_store.get(activity_id)
if not activity:
return False, "Activity not found"
# Check if output is pinned
if activity.output_id:
entry = self.cache.get_entry(activity.output_id)
if entry:
pinned, reason = self.is_pinned(entry.cid)
if pinned:
return False, f"Output is pinned ({reason})"
deleted_outputs = 0
preserved_shared = 0
# Delete output (only if not used by other activities)
if activity.output_id:
if self._is_used_by_other_activities(activity.output_id, activity_id):
preserved_shared += 1
else:
entry = self.cache.get_entry(activity.output_id)
if entry:
# Remove from cache
self.cache.remove(activity.output_id)
# Remove from content index (Redis + local)
self._del_content_index(entry.cid)
# Delete from legacy dir if exists
legacy_path = self.legacy_dir / entry.cid
if legacy_path.exists():
legacy_path.unlink()
deleted_outputs += 1
# Delete intermediates (only if not used by other activities)
for node_id in activity.intermediate_ids:
if self._is_used_by_other_activities(node_id, activity_id):
preserved_shared += 1
continue
entry = self.cache.get_entry(node_id)
if entry:
self.cache.remove(node_id)
self._del_content_index(entry.cid)
legacy_path = self.legacy_dir / entry.cid
if legacy_path.exists():
legacy_path.unlink()
deleted_outputs += 1
# Remove activity record (inputs remain in cache)
self.activity_store.remove(activity_id)
msg = f"Activity discarded (deleted {deleted_outputs} outputs"
if preserved_shared > 0:
msg += f", preserved {preserved_shared} shared items"
msg += ")"
return True, msg
def cleanup_intermediates(self) -> int:
"""Delete all intermediate cache entries (reconstructible)."""
return self.activity_manager.cleanup_intermediates()
def get_deletable_items(self) -> List[CachedFile]:
"""Get all items that can be deleted."""
deletable = []
for entry in self.activity_manager.get_deletable_entries():
deletable.append(CachedFile.from_cache_entry(entry))
return deletable
# ============ L2 Integration ============
def mark_published(self, cid: str):
"""Mark a cid as published to L2."""
self.l2_checker.mark_shared(cid)
def invalidate_shared_cache(self, cid: str):
"""Invalidate shared status cache (call if item might be unpublished)."""
self.l2_checker.invalidate(cid)
# ============ Stats ============
def get_stats(self) -> dict:
"""Get cache statistics."""
stats = self.cache.get_stats()
return {
"total_entries": stats.total_entries,
"total_size_bytes": stats.total_size_bytes,
"hits": stats.hits,
"misses": stats.misses,
"hit_rate": stats.hit_rate,
"activities": len(self.activity_store),
}
# Singleton instance (initialized on first import with env vars)
_manager: Optional[L1CacheManager] = None
def get_cache_manager() -> L1CacheManager:
"""Get the singleton cache manager instance."""
global _manager
if _manager is None:
        import redis
cache_dir = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache")))
l2_server = os.environ.get("L2_SERVER", "http://localhost:8200")
# Initialize Redis client for shared cache index
redis_url = os.environ.get('REDIS_URL', 'redis://localhost:6379/5')
        # Redis.from_url honors the db number and any auth embedded in REDIS_URL
        redis_client = redis.Redis.from_url(
            redis_url,
            socket_timeout=5,
            socket_connect_timeout=5,
        )
_manager = L1CacheManager(cache_dir=cache_dir, l2_server=l2_server, redis_client=redis_client)
return _manager
def reset_cache_manager():
"""Reset the singleton (for testing)."""
global _manager
_manager = None