Use Redis for cache indexes - enables multi-worker scaling

The cache_manager now uses Redis hashes for the content_index and
ipfs_cids mappings. This allows multiple uvicorn workers to share
state, so files added by one worker are immediately visible to all
others.

- Added redis_client parameter to L1CacheManager
- Index lookups check Redis first, then fall back to in-memory
- Index updates go to both Redis and JSON file (backup)
- Migrates existing JSON indexes to Redis on first load
- Re-enabled workers=4 in uvicorn

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gilesb
2026-01-09 04:19:00 +00:00
parent 6fc3562d22
commit a0a4c08b9a
2 changed files with 142 additions and 43 deletions

View File

@@ -8,6 +8,7 @@ Integrates artdag's Cache, ActivityStore, and ActivityManager to provide:
- Deletion rules enforcement (shared items protected)
- L2 ActivityPub integration for "shared" status checks
- IPFS as durable backing store (local cache as hot storage)
- Redis-backed indexes for multi-worker consistency
"""
import hashlib
@@ -18,10 +19,13 @@ import shutil
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, Dict, List, Optional, Set
from typing import Callable, Dict, List, Optional, Set, TYPE_CHECKING
import requests
if TYPE_CHECKING:
import redis
from artdag import Cache, CacheEntry, DAG, Node, NodeType
from artdag.activities import Activity, ActivityStore, ActivityManager, make_is_shared_fn
@@ -134,10 +138,16 @@ class L1CacheManager:
self,
cache_dir: Path | str,
l2_server: str = "http://localhost:8200",
redis_client: Optional["redis.Redis"] = None,
):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
# Redis for shared state between workers
self._redis = redis_client
self._redis_content_key = "artdag:content_index"
self._redis_ipfs_key = "artdag:ipfs_index"
# artdag components
self.cache = Cache(self.cache_dir / "nodes")
self.activity_store = ActivityStore(self.cache_dir / "activities")
@@ -153,7 +163,7 @@ class L1CacheManager:
)
# Content hash index: content_hash -> node_id
# This enables lookup by content_hash for API compatibility
# Uses Redis if available, falls back to in-memory dict
self._content_index: Dict[str, str] = {}
self._load_content_index()
@@ -169,7 +179,23 @@ class L1CacheManager:
return self.cache_dir / "content_index.json"
def _load_content_index(self):
"""Load content_hash -> node_id index."""
"""Load content_hash -> node_id index from Redis or JSON file."""
# If Redis available and has data, use it
if self._redis:
try:
redis_data = self._redis.hgetall(self._redis_content_key)
if redis_data:
self._content_index = {
k.decode() if isinstance(k, bytes) else k:
v.decode() if isinstance(v, bytes) else v
for k, v in redis_data.items()
}
logger.info(f"Loaded {len(self._content_index)} content index entries from Redis")
return
except Exception as e:
logger.warning(f"Failed to load content index from Redis: {e}")
# Fall back to JSON file
if self._index_path().exists():
try:
with open(self._index_path()) as f:
@@ -183,16 +209,73 @@ class L1CacheManager:
if entry.content_hash:
self._content_index[entry.content_hash] = entry.node_id
# Migrate to Redis if available
if self._redis and self._content_index:
try:
self._redis.hset(self._redis_content_key, mapping=self._content_index)
logger.info(f"Migrated {len(self._content_index)} content index entries to Redis")
except Exception as e:
logger.warning(f"Failed to migrate content index to Redis: {e}")
def _save_content_index(self):
"""Save content_hash -> node_id index."""
"""Save content_hash -> node_id index to Redis and JSON file."""
# Always save to JSON as backup
with open(self._index_path(), "w") as f:
json.dump(self._content_index, f, indent=2)
def _set_content_index(self, content_hash: str, node_id: str):
    """Store one content_hash -> node_id mapping in every index layer.

    Pushes the entry to the shared Redis hash when a client is
    configured (best-effort: Redis errors are logged, never raised),
    updates the worker-local dict, and rewrites the JSON backup file.
    """
    if self._redis:
        try:
            self._redis.hset(self._redis_content_key, content_hash, node_id)
        except Exception as e:
            logger.warning(f"Failed to set content index in Redis: {e}")
    self._content_index[content_hash] = node_id
    self._save_content_index()
def _get_content_index(self, content_hash: str) -> Optional[str]:
"""Get a content index entry (Redis-first, then in-memory)."""
if self._redis:
try:
val = self._redis.hget(self._redis_content_key, content_hash)
if val:
return val.decode() if isinstance(val, bytes) else val
except Exception as e:
logger.warning(f"Failed to get content index from Redis: {e}")
return self._content_index.get(content_hash)
def _del_content_index(self, content_hash: str):
    """Drop *content_hash* from every index layer.

    Removes the entry from the worker-local dict (a no-op when
    absent), best-effort deletes it from the shared Redis hash, and
    rewrites the JSON backup file.
    """
    self._content_index.pop(content_hash, None)
    if self._redis:
        try:
            self._redis.hdel(self._redis_content_key, content_hash)
        except Exception as e:
            logger.warning(f"Failed to delete content index from Redis: {e}")
    self._save_content_index()
def _ipfs_index_path(self) -> Path:
return self.cache_dir / "ipfs_index.json"
def _load_ipfs_index(self):
"""Load content_hash -> ipfs_cid index."""
"""Load content_hash -> ipfs_cid index from Redis or JSON file."""
# If Redis available and has data, use it
if self._redis:
try:
redis_data = self._redis.hgetall(self._redis_ipfs_key)
if redis_data:
self._ipfs_cids = {
k.decode() if isinstance(k, bytes) else k:
v.decode() if isinstance(v, bytes) else v
for k, v in redis_data.items()
}
logger.info(f"Loaded {len(self._ipfs_cids)} IPFS index entries from Redis")
return
except Exception as e:
logger.warning(f"Failed to load IPFS index from Redis: {e}")
# Fall back to JSON file
if self._ipfs_index_path().exists():
try:
with open(self._ipfs_index_path()) as f:
@@ -201,14 +284,43 @@ class L1CacheManager:
logger.warning(f"Failed to load IPFS index: {e}")
self._ipfs_cids = {}
# Migrate to Redis if available
if self._redis and self._ipfs_cids:
try:
self._redis.hset(self._redis_ipfs_key, mapping=self._ipfs_cids)
logger.info(f"Migrated {len(self._ipfs_cids)} IPFS index entries to Redis")
except Exception as e:
logger.warning(f"Failed to migrate IPFS index to Redis: {e}")
def _save_ipfs_index(self):
"""Save content_hash -> ipfs_cid index."""
"""Save content_hash -> ipfs_cid index to JSON file (backup)."""
with open(self._ipfs_index_path(), "w") as f:
json.dump(self._ipfs_cids, f, indent=2)
def _set_ipfs_index(self, content_hash: str, ipfs_cid: str):
    """Store one content_hash -> IPFS CID mapping in every index layer.

    Pushes the entry to the shared Redis hash when a client is
    configured (best-effort: Redis errors are logged, never raised),
    updates the worker-local dict, and rewrites the JSON backup file.
    """
    if self._redis:
        try:
            self._redis.hset(self._redis_ipfs_key, content_hash, ipfs_cid)
        except Exception as e:
            logger.warning(f"Failed to set IPFS index in Redis: {e}")
    self._ipfs_cids[content_hash] = ipfs_cid
    self._save_ipfs_index()
def _get_ipfs_cid_from_index(self, content_hash: str) -> Optional[str]:
"""Get IPFS CID from index (Redis-first, then in-memory)."""
if self._redis:
try:
val = self._redis.hget(self._redis_ipfs_key, content_hash)
if val:
return val.decode() if isinstance(val, bytes) else val
except Exception as e:
logger.warning(f"Failed to get IPFS CID from Redis: {e}")
return self._ipfs_cids.get(content_hash)
def get_ipfs_cid(self, content_hash: str) -> Optional[str]:
"""Get IPFS CID for a content hash."""
return self._ipfs_cids.get(content_hash)
return self._get_ipfs_cid_from_index(content_hash)
def _is_shared_by_node_id(self, content_hash: str) -> bool:
"""Check if a content_hash is shared via L2."""
@@ -282,12 +394,11 @@ class L1CacheManager:
existing = self.cache.get_entry(node_id)
if existing and existing.output_path.exists():
# Already cached - still try to get IPFS CID if we don't have it
ipfs_cid = self._ipfs_cids.get(content_hash)
ipfs_cid = self._get_ipfs_cid_from_index(content_hash)
if not ipfs_cid:
ipfs_cid = ipfs_client.add_file(existing.output_path)
if ipfs_cid:
self._ipfs_cids[content_hash] = ipfs_cid
self._save_ipfs_index()
self._set_ipfs_index(content_hash, ipfs_cid)
return CachedFile.from_cache_entry(existing), ipfs_cid
# Store in local cache
@@ -301,15 +412,13 @@ class L1CacheManager:
entry = self.cache.get_entry(node_id)
# Update content index
self._content_index[entry.content_hash] = node_id
self._save_content_index()
# Update content index (Redis + local)
self._set_content_index(entry.content_hash, node_id)
# Upload to IPFS (async in background would be better, but sync for now)
ipfs_cid = ipfs_client.add_file(entry.output_path)
if ipfs_cid:
self._ipfs_cids[entry.content_hash] = ipfs_cid
self._save_ipfs_index()
self._set_ipfs_index(entry.content_hash, ipfs_cid)
logger.info(f"Uploaded to IPFS: {entry.content_hash[:16]}... -> {ipfs_cid}")
return CachedFile.from_cache_entry(entry), ipfs_cid
@@ -321,8 +430,8 @@ class L1CacheManager:
def get_by_content_hash(self, content_hash: str) -> Optional[Path]:
"""Get cached file path by content_hash. Falls back to IPFS if not in local cache."""
# Check index first (new cache structure)
node_id = self._content_index.get(content_hash)
# Check index first (Redis then local)
node_id = self._get_content_index(content_hash)
if node_id:
path = self.cache.get(node_id)
if path and path.exists():
@@ -334,16 +443,14 @@ class L1CacheManager:
path = self.cache.get(content_hash)
logger.debug(f" cache.get({content_hash[:16]}...) returned: {path}")
if path and path.exists():
self._content_index[content_hash] = content_hash
self._save_content_index()
self._set_content_index(content_hash, content_hash)
return path
# Scan cache entries (fallback for new structure)
entry = self.cache.find_by_content_hash(content_hash)
if entry and entry.output_path.exists():
logger.debug(f" Found via scan: {entry.output_path}")
self._content_index[content_hash] = entry.node_id
self._save_content_index()
self._set_content_index(content_hash, entry.node_id)
return entry.output_path
# Check legacy location (files stored directly as CACHE_DIR/{content_hash})
@@ -352,7 +459,7 @@ class L1CacheManager:
return legacy_path
# Try to recover from IPFS if we have a CID
ipfs_cid = self._ipfs_cids.get(content_hash)
ipfs_cid = self._get_ipfs_cid_from_index(content_hash)
if ipfs_cid:
logger.info(f"Recovering from IPFS: {content_hash[:16]}... ({ipfs_cid})")
recovery_path = self.legacy_dir / content_hash
@@ -368,7 +475,7 @@ class L1CacheManager:
def get_entry_by_content_hash(self, content_hash: str) -> Optional[CacheEntry]:
"""Get cache entry by content_hash."""
node_id = self._content_index.get(content_hash)
node_id = self._get_content_index(content_hash)
if node_id:
return self.cache.get_entry(node_id)
return self.cache.find_by_content_hash(content_hash)
@@ -479,7 +586,7 @@ class L1CacheManager:
return False, f"Item is pinned ({reason})"
# Find node_id for this content
node_id = self._content_index.get(content_hash, content_hash)
node_id = self._get_content_index(content_hash) or content_hash
# Check if it's an input or output of any activity
for activity in self.activity_store.list():
@@ -525,11 +632,10 @@ class L1CacheManager:
return False, reason
# Find and delete
node_id = self._content_index.get(content_hash)
node_id = self._get_content_index(content_hash)
if node_id:
self.cache.remove(node_id)
del self._content_index[content_hash]
self._save_content_index()
self._del_content_index(content_hash)
return True, "Deleted"
# Try legacy
@@ -585,10 +691,8 @@ class L1CacheManager:
if entry:
# Remove from cache
self.cache.remove(activity.output_id)
# Remove from content index
if entry.content_hash in self._content_index:
del self._content_index[entry.content_hash]
self._save_content_index()
# Remove from content index (Redis + local)
self._del_content_index(entry.content_hash)
# Delete from legacy dir if exists
legacy_path = self.legacy_dir / entry.content_hash
if legacy_path.exists():
@@ -599,15 +703,11 @@ class L1CacheManager:
entry = self.cache.get_entry(node_id)
if entry:
self.cache.remove(node_id)
if entry.content_hash in self._content_index:
del self._content_index[entry.content_hash]
self._del_content_index(entry.content_hash)
legacy_path = self.legacy_dir / entry.content_hash
if legacy_path.exists():
legacy_path.unlink()
if activity.intermediate_ids:
self._save_content_index()
# Remove activity record (inputs remain in cache)
self.activity_store.remove(activity_id)

View File

@@ -57,10 +57,7 @@ IPFS_GATEWAY_URL = os.environ.get("IPFS_GATEWAY_URL", "")
CACHE_DIR = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache")))
CACHE_DIR.mkdir(parents=True, exist_ok=True)
# Initialize L1 cache manager (no L2 config - determined dynamically from token)
cache_manager = L1CacheManager(cache_dir=CACHE_DIR)
# Redis for persistent run storage
# Redis for persistent run storage and shared cache index (multi-worker support)
REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/5')
parsed = urlparse(REDIS_URL)
redis_client = redis.Redis(
@@ -73,6 +70,9 @@ redis_client = redis.Redis(
RUNS_KEY_PREFIX = "artdag:run:"
RECIPES_KEY_PREFIX = "artdag:recipe:"
# Initialize L1 cache manager with Redis for shared state between workers
cache_manager = L1CacheManager(cache_dir=CACHE_DIR, redis_client=redis_client)
def save_run(run: "RunStatus"):
"""Save run to Redis."""
@@ -4014,6 +4014,5 @@ async def ui_run_partial(run_id: str, request: Request):
if __name__ == "__main__":
import uvicorn
# Note: workers disabled because cache_manager uses in-memory state
# that isn't shared between worker processes
uvicorn.run("server:app", host="0.0.0.0", port=8100)
# Workers enabled - cache indexes shared via Redis
uvicorn.run("server:app", host="0.0.0.0", port=8100, workers=4)