From a0a4c08b9a4ac370a791fd19758c908e65d8ab41 Mon Sep 17 00:00:00 2001 From: gilesb Date: Fri, 9 Jan 2026 04:19:00 +0000 Subject: [PATCH] Use Redis for cache indexes - enables multi-worker scaling The cache_manager now uses Redis hashes for the content_index and ipfs_cids mappings. This allows multiple uvicorn workers to share state, so files added by one worker are immediately visible to all others. - Added redis_client parameter to L1CacheManager - Index lookups check Redis first, then fall back to in-memory - Index updates go to both Redis and JSON file (backup) - Migrates existing JSON indexes to Redis on first load - Re-enabled workers=4 in uvicorn Co-Authored-By: Claude Opus 4.5 --- cache_manager.py | 172 +++++++++++++++++++++++++++++++++++++---------- server.py | 13 ++-- 2 files changed, 142 insertions(+), 43 deletions(-) diff --git a/cache_manager.py b/cache_manager.py index a1b01c4..0d9718c 100644 --- a/cache_manager.py +++ b/cache_manager.py @@ -8,6 +8,7 @@ Integrates artdag's Cache, ActivityStore, and ActivityManager to provide: - Deletion rules enforcement (shared items protected) - L2 ActivityPub integration for "shared" status checks - IPFS as durable backing store (local cache as hot storage) +- Redis-backed indexes for multi-worker consistency """ import hashlib @@ -18,10 +19,13 @@ import shutil from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path -from typing import Callable, Dict, List, Optional, Set +from typing import Callable, Dict, List, Optional, Set, TYPE_CHECKING import requests +if TYPE_CHECKING: + import redis + from artdag import Cache, CacheEntry, DAG, Node, NodeType from artdag.activities import Activity, ActivityStore, ActivityManager, make_is_shared_fn @@ -134,10 +138,16 @@ class L1CacheManager: self, cache_dir: Path | str, l2_server: str = "http://localhost:8200", + redis_client: Optional["redis.Redis"] = None, ): self.cache_dir = Path(cache_dir) self.cache_dir.mkdir(parents=True, exist_ok=True) + # Redis for shared state between workers + self._redis = redis_client + self._redis_content_key = "artdag:content_index" + self._redis_ipfs_key = "artdag:ipfs_index" + # artdag components self.cache = Cache(self.cache_dir / "nodes") self.activity_store = ActivityStore(self.cache_dir / "activities") @@ -153,7 +163,7 @@ class L1CacheManager: ) # Content hash index: content_hash -> node_id - # This enables lookup by content_hash for API compatibility + # Uses Redis if available, falls back to in-memory dict self._content_index: Dict[str, str] = {} self._load_content_index() @@ -169,7 +179,23 @@ class L1CacheManager: return self.cache_dir / "content_index.json" def _load_content_index(self): - """Load content_hash -> node_id index.""" + """Load content_hash -> node_id index from Redis or JSON file.""" + # If Redis available and has data, use it + if self._redis: + try: + redis_data = self._redis.hgetall(self._redis_content_key) + if redis_data: + self._content_index = { + k.decode() if isinstance(k, bytes) else k: + v.decode() if isinstance(v, bytes) else v + for k, v in redis_data.items() + } + logger.info(f"Loaded {len(self._content_index)} content index entries from Redis") + return + except Exception as e: + logger.warning(f"Failed to load content index from Redis: {e}") + + # Fall back to JSON file if self._index_path().exists(): try: with open(self._index_path()) as f: @@ -183,16 +209,73 @@ class L1CacheManager: if entry.content_hash: self._content_index[entry.content_hash] = entry.node_id + # Migrate to Redis if available + if self._redis and self._content_index: + try: + self._redis.hset(self._redis_content_key, mapping=self._content_index) + logger.info(f"Migrated {len(self._content_index)} content index entries to Redis") + except Exception as e: + logger.warning(f"Failed to migrate content index to Redis: {e}") + def _save_content_index(self): - """Save content_hash -> node_id index.""" + """Save content_hash -> node_id index to Redis and JSON file.""" + # Always save to JSON as backup with open(self._index_path(), "w") as f: json.dump(self._content_index, f, indent=2) + def _set_content_index(self, content_hash: str, node_id: str): + """Set a single content index entry (Redis + in-memory).""" + self._content_index[content_hash] = node_id + if self._redis: + try: + self._redis.hset(self._redis_content_key, content_hash, node_id) + except Exception as e: + logger.warning(f"Failed to set content index in Redis: {e}") + self._save_content_index() + + def _get_content_index(self, content_hash: str) -> Optional[str]: + """Get a content index entry (Redis-first, then in-memory).""" + if self._redis: + try: + val = self._redis.hget(self._redis_content_key, content_hash) + if val: + return val.decode() if isinstance(val, bytes) else val + except Exception as e: + logger.warning(f"Failed to get content index from Redis: {e}") + return self._content_index.get(content_hash) + + def _del_content_index(self, content_hash: str): + """Delete a content index entry.""" + if content_hash in self._content_index: + del self._content_index[content_hash] + if self._redis: + try: + self._redis.hdel(self._redis_content_key, content_hash) + except Exception as e: + logger.warning(f"Failed to delete content index from Redis: {e}") + self._save_content_index() + def _ipfs_index_path(self) -> Path: return self.cache_dir / "ipfs_index.json" def _load_ipfs_index(self): - """Load content_hash -> ipfs_cid index.""" + """Load content_hash -> ipfs_cid index from Redis or JSON file.""" + # If Redis available and has data, use it + if self._redis: + try: + redis_data = self._redis.hgetall(self._redis_ipfs_key) + if redis_data: + self._ipfs_cids = { + k.decode() if isinstance(k, bytes) else k: + v.decode() if isinstance(v, bytes) else v + for k, v in redis_data.items() + } + logger.info(f"Loaded {len(self._ipfs_cids)} IPFS index entries from Redis") + return + except Exception as e: + logger.warning(f"Failed to load IPFS index from Redis: {e}") + + # Fall back to JSON file if self._ipfs_index_path().exists(): try: with open(self._ipfs_index_path()) as f: @@ -201,14 +284,43 @@ class L1CacheManager: logger.warning(f"Failed to load IPFS index: {e}") self._ipfs_cids = {} + # Migrate to Redis if available + if self._redis and self._ipfs_cids: + try: + self._redis.hset(self._redis_ipfs_key, mapping=self._ipfs_cids) + logger.info(f"Migrated {len(self._ipfs_cids)} IPFS index entries to Redis") + except Exception as e: + logger.warning(f"Failed to migrate IPFS index to Redis: {e}") + def _save_ipfs_index(self): - """Save content_hash -> ipfs_cid index.""" + """Save content_hash -> ipfs_cid index to JSON file (backup).""" with open(self._ipfs_index_path(), "w") as f: json.dump(self._ipfs_cids, f, indent=2) + def _set_ipfs_index(self, content_hash: str, ipfs_cid: str): + """Set a single IPFS index entry (Redis + in-memory).""" + self._ipfs_cids[content_hash] = ipfs_cid + if self._redis: + try: + self._redis.hset(self._redis_ipfs_key, content_hash, ipfs_cid) + except Exception as e: + logger.warning(f"Failed to set IPFS index in Redis: {e}") + self._save_ipfs_index() + + def _get_ipfs_cid_from_index(self, content_hash: str) -> Optional[str]: + """Get IPFS CID from index (Redis-first, then in-memory).""" + if self._redis: + try: + val = self._redis.hget(self._redis_ipfs_key, content_hash) + if val: + return val.decode() if isinstance(val, bytes) else val + except Exception as e: + logger.warning(f"Failed to get IPFS CID from Redis: {e}") + return self._ipfs_cids.get(content_hash) + def get_ipfs_cid(self, content_hash: str) -> Optional[str]: """Get IPFS CID for a content hash.""" - return self._ipfs_cids.get(content_hash) + return self._get_ipfs_cid_from_index(content_hash) def _is_shared_by_node_id(self, content_hash: str) -> bool: """Check if a content_hash is shared via L2.""" @@ -282,12 +394,11 @@ class L1CacheManager: existing = self.cache.get_entry(node_id) if existing and existing.output_path.exists(): # Already cached - still try to get IPFS CID if we don't have it - ipfs_cid = self._ipfs_cids.get(content_hash) + ipfs_cid = self._get_ipfs_cid_from_index(content_hash) if not ipfs_cid: ipfs_cid = ipfs_client.add_file(existing.output_path) if ipfs_cid: - self._ipfs_cids[content_hash] = ipfs_cid - self._save_ipfs_index() + self._set_ipfs_index(content_hash, ipfs_cid) return CachedFile.from_cache_entry(existing), ipfs_cid # Store in local cache @@ -301,15 +412,13 @@ class L1CacheManager: entry = self.cache.get_entry(node_id) - # Update content index - self._content_index[entry.content_hash] = node_id - self._save_content_index() + # Update content index (Redis + local) + self._set_content_index(entry.content_hash, node_id) # Upload to IPFS (async in background would be better, but sync for now) ipfs_cid = ipfs_client.add_file(entry.output_path) if ipfs_cid: - self._ipfs_cids[entry.content_hash] = ipfs_cid - self._save_ipfs_index() + self._set_ipfs_index(entry.content_hash, ipfs_cid) logger.info(f"Uploaded to IPFS: {entry.content_hash[:16]}... -> {ipfs_cid}") return CachedFile.from_cache_entry(entry), ipfs_cid @@ -321,8 +430,8 @@ class L1CacheManager: def get_by_content_hash(self, content_hash: str) -> Optional[Path]: """Get cached file path by content_hash. Falls back to IPFS if not in local cache.""" - # Check index first (new cache structure) - node_id = self._content_index.get(content_hash) + # Check index first (Redis then local) + node_id = self._get_content_index(content_hash) if node_id: path = self.cache.get(node_id) if path and path.exists(): @@ -334,16 +443,14 @@ class L1CacheManager: path = self.cache.get(content_hash) logger.debug(f" cache.get({content_hash[:16]}...) returned: {path}") if path and path.exists(): - self._content_index[content_hash] = content_hash - self._save_content_index() + self._set_content_index(content_hash, content_hash) return path # Scan cache entries (fallback for new structure) entry = self.cache.find_by_content_hash(content_hash) if entry and entry.output_path.exists(): logger.debug(f" Found via scan: {entry.output_path}") - self._content_index[content_hash] = entry.node_id - self._save_content_index() + self._set_content_index(content_hash, entry.node_id) return entry.output_path # Check legacy location (files stored directly as CACHE_DIR/{content_hash}) @@ -352,7 +459,7 @@ class L1CacheManager: return legacy_path # Try to recover from IPFS if we have a CID - ipfs_cid = self._ipfs_cids.get(content_hash) + ipfs_cid = self._get_ipfs_cid_from_index(content_hash) if ipfs_cid: logger.info(f"Recovering from IPFS: {content_hash[:16]}... ({ipfs_cid})") recovery_path = self.legacy_dir / content_hash @@ -368,7 +475,7 @@ class L1CacheManager: def get_entry_by_content_hash(self, content_hash: str) -> Optional[CacheEntry]: """Get cache entry by content_hash.""" - node_id = self._content_index.get(content_hash) + node_id = self._get_content_index(content_hash) if node_id: return self.cache.get_entry(node_id) return self.cache.find_by_content_hash(content_hash) @@ -479,7 +586,7 @@ class L1CacheManager: return False, f"Item is pinned ({reason})" # Find node_id for this content - node_id = self._content_index.get(content_hash, content_hash) + node_id = self._get_content_index(content_hash) or content_hash # Check if it's an input or output of any activity for activity in self.activity_store.list(): @@ -525,11 +632,10 @@ class L1CacheManager: return False, reason # Find and delete - node_id = self._content_index.get(content_hash) + node_id = self._get_content_index(content_hash) if node_id: self.cache.remove(node_id) - del self._content_index[content_hash] - self._save_content_index() + self._del_content_index(content_hash) return True, "Deleted" # Try legacy @@ -585,10 +691,8 @@ class L1CacheManager: if entry: # Remove from cache self.cache.remove(activity.output_id) - # Remove from content index - if entry.content_hash in self._content_index: - del self._content_index[entry.content_hash] - self._save_content_index() + # Remove from content index (Redis + local) + self._del_content_index(entry.content_hash) # Delete from legacy dir if exists legacy_path = self.legacy_dir / entry.content_hash if legacy_path.exists(): @@ -599,15 +703,11 @@ class L1CacheManager: entry = self.cache.get_entry(node_id) if entry: self.cache.remove(node_id) - if entry.content_hash in self._content_index: - del self._content_index[entry.content_hash] + self._del_content_index(entry.content_hash) legacy_path = self.legacy_dir / entry.content_hash if legacy_path.exists(): legacy_path.unlink() - if activity.intermediate_ids: - self._save_content_index() - # Remove activity record (inputs remain in cache) self.activity_store.remove(activity_id) diff --git a/server.py b/server.py index cbd5053..bde8948 100644 --- a/server.py +++ b/server.py @@ -57,10 +57,7 @@ IPFS_GATEWAY_URL = os.environ.get("IPFS_GATEWAY_URL", "") CACHE_DIR = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache"))) CACHE_DIR.mkdir(parents=True, exist_ok=True) -# Initialize L1 cache manager (no L2 config - determined dynamically from token) -cache_manager = L1CacheManager(cache_dir=CACHE_DIR) - -# Redis for persistent run storage +# Redis for persistent run storage and shared cache index (multi-worker support) REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/5') parsed = urlparse(REDIS_URL) redis_client = redis.Redis( @@ -73,6 +70,9 @@ redis_client = redis.Redis( RUNS_KEY_PREFIX = "artdag:run:" RECIPES_KEY_PREFIX = "artdag:recipe:" +# Initialize L1 cache manager with Redis for shared state between workers +cache_manager = L1CacheManager(cache_dir=CACHE_DIR, redis_client=redis_client) + def save_run(run: "RunStatus"): """Save run to Redis.""" @@ -4014,6 +4014,5 @@ async def ui_run_partial(run_id: str, request: Request): if __name__ == "__main__": import uvicorn - # Note: workers disabled because cache_manager uses in-memory state - # that isn't shared between worker processes - uvicorn.run("server:app", host="0.0.0.0", port=8100) + # Workers enabled - cache indexes shared via Redis + uvicorn.run("server:app", host="0.0.0.0", port=8100, workers=4)