str:
- """Compute SHA3-256 hash of a file."""
+ """Compute local content hash (fallback when IPFS unavailable)."""
hasher = hashlib.new(algorithm)
actual_path = path.resolve() if path.is_symlink() else path
with open(actual_path, "rb") as f:
@@ -51,10 +51,10 @@ class CachedFile:
Provides a unified view combining:
- node_id: computation identity (for DAG caching)
- - content_hash: file content identity (for external references)
+ - cid: file content identity (for external references)
"""
node_id: str
- content_hash: str
+ cid: str
path: Path
size_bytes: int
node_type: str
@@ -64,7 +64,7 @@ class CachedFile:
def from_cache_entry(cls, entry: CacheEntry) -> "CachedFile":
return cls(
node_id=entry.node_id,
- content_hash=entry.content_hash,
+ cid=entry.cid,
path=entry.output_path,
size_bytes=entry.size_bytes,
node_type=entry.node_type,
@@ -84,41 +84,41 @@ class L2SharedChecker:
self.cache_ttl = cache_ttl
self._cache: Dict[str, tuple[bool, float]] = {}
- def is_shared(self, content_hash: str) -> bool:
- """Check if content_hash has been published to L2."""
+ def is_shared(self, cid: str) -> bool:
+ """Check if cid has been published to L2."""
import time
now = time.time()
# Check cache
- if content_hash in self._cache:
- is_shared, cached_at = self._cache[content_hash]
+ if cid in self._cache:
+ is_shared, cached_at = self._cache[cid]
if now - cached_at < self.cache_ttl:
- logger.debug(f"L2 check (cached): {content_hash[:16]}... = {is_shared}")
+ logger.debug(f"L2 check (cached): {cid[:16]}... = {is_shared}")
return is_shared
# Query L2
try:
- url = f"{self.l2_server}/assets/by-hash/{content_hash}"
+ url = f"{self.l2_server}/assets/by-hash/{cid}"
logger.info(f"L2 check: GET {url}")
resp = requests.get(url, timeout=5)
logger.info(f"L2 check response: {resp.status_code}")
is_shared = resp.status_code == 200
except Exception as e:
- logger.warning(f"Failed to check L2 for {content_hash}: {e}")
+ logger.warning(f"Failed to check L2 for {cid}: {e}")
# On error, assume IS shared (safer - prevents accidental deletion)
is_shared = True
- self._cache[content_hash] = (is_shared, now)
+ self._cache[cid] = (is_shared, now)
return is_shared
- def invalidate(self, content_hash: str):
- """Invalidate cache for a content_hash (call after publishing)."""
- self._cache.pop(content_hash, None)
+ def invalidate(self, cid: str):
+ """Invalidate cache for a cid (call after publishing)."""
+ self._cache.pop(cid, None)
- def mark_shared(self, content_hash: str):
+ def mark_shared(self, cid: str):
"""Mark as shared without querying (call after successful publish)."""
import time
- self._cache[content_hash] = (True, time.time())
+ self._cache[cid] = (True, time.time())
class L1CacheManager:
@@ -131,7 +131,7 @@ class L1CacheManager:
- ActivityManager for deletion rules
- L2 integration for shared status
- Provides both node_id and content_hash based access.
+ Provides both node_id and cid based access.
"""
def __init__(
@@ -162,16 +162,16 @@ class L1CacheManager:
is_shared_fn=self._is_shared_by_node_id,
)
- # Content hash index: content_hash -> node_id
+ # Content hash index: cid -> node_id
# Uses Redis if available, falls back to in-memory dict
self._content_index: Dict[str, str] = {}
self._load_content_index()
- # IPFS CID index: content_hash -> ipfs_cid
+ # IPFS CID index: cid -> ipfs_cid
self._ipfs_cids: Dict[str, str] = {}
self._load_ipfs_index()
- # Legacy files directory (for files uploaded directly by content_hash)
+ # Legacy files directory (for files uploaded directly by cid)
self.legacy_dir = self.cache_dir / "legacy"
self.legacy_dir.mkdir(parents=True, exist_ok=True)
@@ -179,7 +179,7 @@ class L1CacheManager:
return self.cache_dir / "content_index.json"
def _load_content_index(self):
- """Load content_hash -> node_id index from Redis or JSON file."""
+ """Load cid -> node_id index from Redis or JSON file."""
# If Redis available and has data, use it
if self._redis:
try:
@@ -206,8 +206,8 @@ class L1CacheManager:
# Also index from existing cache entries
for entry in self.cache.list_entries():
- if entry.content_hash:
- self._content_index[entry.content_hash] = entry.node_id
+ if entry.cid:
+ self._content_index[entry.cid] = entry.node_id
# Migrate to Redis if available
if self._redis and self._content_index:
@@ -218,39 +218,39 @@ class L1CacheManager:
logger.warning(f"Failed to migrate content index to Redis: {e}")
def _save_content_index(self):
- """Save content_hash -> node_id index to Redis and JSON file."""
+ """Save cid -> node_id index to Redis and JSON file."""
# Always save to JSON as backup
with open(self._index_path(), "w") as f:
json.dump(self._content_index, f, indent=2)
- def _set_content_index(self, content_hash: str, node_id: str):
+ def _set_content_index(self, cid: str, node_id: str):
"""Set a single content index entry (Redis + in-memory)."""
- self._content_index[content_hash] = node_id
+ self._content_index[cid] = node_id
if self._redis:
try:
- self._redis.hset(self._redis_content_key, content_hash, node_id)
+ self._redis.hset(self._redis_content_key, cid, node_id)
except Exception as e:
logger.warning(f"Failed to set content index in Redis: {e}")
self._save_content_index()
- def _get_content_index(self, content_hash: str) -> Optional[str]:
+ def _get_content_index(self, cid: str) -> Optional[str]:
"""Get a content index entry (Redis-first, then in-memory)."""
if self._redis:
try:
- val = self._redis.hget(self._redis_content_key, content_hash)
+ val = self._redis.hget(self._redis_content_key, cid)
if val:
return val.decode() if isinstance(val, bytes) else val
except Exception as e:
logger.warning(f"Failed to get content index from Redis: {e}")
- return self._content_index.get(content_hash)
+ return self._content_index.get(cid)
- def _del_content_index(self, content_hash: str):
+ def _del_content_index(self, cid: str):
"""Delete a content index entry."""
- if content_hash in self._content_index:
- del self._content_index[content_hash]
+ if cid in self._content_index:
+ del self._content_index[cid]
if self._redis:
try:
- self._redis.hdel(self._redis_content_key, content_hash)
+ self._redis.hdel(self._redis_content_key, cid)
except Exception as e:
logger.warning(f"Failed to delete content index from Redis: {e}")
self._save_content_index()
@@ -259,7 +259,7 @@ class L1CacheManager:
return self.cache_dir / "ipfs_index.json"
def _load_ipfs_index(self):
- """Load content_hash -> ipfs_cid index from Redis or JSON file."""
+ """Load cid -> ipfs_cid index from Redis or JSON file."""
# If Redis available and has data, use it
if self._redis:
try:
@@ -293,71 +293,71 @@ class L1CacheManager:
logger.warning(f"Failed to migrate IPFS index to Redis: {e}")
def _save_ipfs_index(self):
- """Save content_hash -> ipfs_cid index to JSON file (backup)."""
+ """Save cid -> ipfs_cid index to JSON file (backup)."""
with open(self._ipfs_index_path(), "w") as f:
json.dump(self._ipfs_cids, f, indent=2)
- def _set_ipfs_index(self, content_hash: str, ipfs_cid: str):
+ def _set_ipfs_index(self, cid: str, ipfs_cid: str):
"""Set a single IPFS index entry (Redis + in-memory)."""
- self._ipfs_cids[content_hash] = ipfs_cid
+ self._ipfs_cids[cid] = ipfs_cid
if self._redis:
try:
- self._redis.hset(self._redis_ipfs_key, content_hash, ipfs_cid)
+ self._redis.hset(self._redis_ipfs_key, cid, ipfs_cid)
except Exception as e:
logger.warning(f"Failed to set IPFS index in Redis: {e}")
self._save_ipfs_index()
- def _get_ipfs_cid_from_index(self, content_hash: str) -> Optional[str]:
+ def _get_ipfs_cid_from_index(self, cid: str) -> Optional[str]:
"""Get IPFS CID from index (Redis-first, then in-memory)."""
if self._redis:
try:
- val = self._redis.hget(self._redis_ipfs_key, content_hash)
+ val = self._redis.hget(self._redis_ipfs_key, cid)
if val:
return val.decode() if isinstance(val, bytes) else val
except Exception as e:
logger.warning(f"Failed to get IPFS CID from Redis: {e}")
- return self._ipfs_cids.get(content_hash)
+ return self._ipfs_cids.get(cid)
- def get_ipfs_cid(self, content_hash: str) -> Optional[str]:
+ def get_ipfs_cid(self, cid: str) -> Optional[str]:
"""Get IPFS CID for a content hash."""
- return self._get_ipfs_cid_from_index(content_hash)
+ return self._get_ipfs_cid_from_index(cid)
- def _is_shared_by_node_id(self, content_hash: str) -> bool:
- """Check if a content_hash is shared via L2."""
- return self.l2_checker.is_shared(content_hash)
+ def _is_shared_by_node_id(self, cid: str) -> bool:
+ """Check if a cid is shared via L2."""
+ return self.l2_checker.is_shared(cid)
- def _load_meta(self, content_hash: str) -> dict:
+ def _load_meta(self, cid: str) -> dict:
"""Load metadata for a cached file."""
- meta_path = self.cache_dir / f"{content_hash}.meta.json"
+ meta_path = self.cache_dir / f"{cid}.meta.json"
if meta_path.exists():
with open(meta_path) as f:
return json.load(f)
return {}
- def is_pinned(self, content_hash: str) -> tuple[bool, str]:
+ def is_pinned(self, cid: str) -> tuple[bool, str]:
"""
- Check if a content_hash is pinned (non-deletable).
+ Check if a cid is pinned (non-deletable).
Returns:
(is_pinned, reason) tuple
"""
- meta = self._load_meta(content_hash)
+ meta = self._load_meta(cid)
if meta.get("pinned"):
return True, meta.get("pin_reason", "published")
return False, ""
- def _save_meta(self, content_hash: str, **updates) -> dict:
+ def _save_meta(self, cid: str, **updates) -> dict:
"""Save/update metadata for a cached file."""
- meta = self._load_meta(content_hash)
+ meta = self._load_meta(cid)
meta.update(updates)
- meta_path = self.cache_dir / f"{content_hash}.meta.json"
+ meta_path = self.cache_dir / f"{cid}.meta.json"
with open(meta_path, "w") as f:
json.dump(meta, f, indent=2)
return meta
- def pin(self, content_hash: str, reason: str = "published") -> None:
+ def pin(self, cid: str, reason: str = "published") -> None:
"""Mark an item as pinned (non-deletable)."""
- self._save_meta(content_hash, pinned=True, pin_reason=reason)
+ self._save_meta(cid, pinned=True, pin_reason=reason)
# ============ File Storage ============
@@ -375,31 +375,28 @@ class L1CacheManager:
Args:
source_path: Path to file to cache
node_type: Type of node (e.g., "upload", "source", "effect")
- node_id: Optional node_id; if not provided, uses content_hash
+ node_id: Optional node_id; if not provided, uses CID
execution_time: How long the operation took
move: If True, move instead of copy
Returns:
- Tuple of (CachedFile with both node_id and content_hash, IPFS CID or None)
+            Tuple of (CachedFile with both node_id and cid, the content id: an IPFS CID, or a local hash when IPFS is unavailable)
"""
- # Compute content hash first
- content_hash = file_hash(source_path)
+ # Upload to IPFS first to get the CID (primary identifier)
+ cid = ipfs_client.add_file(source_path)
+ if not cid:
+ # Fallback to local hash if IPFS unavailable
+ cid = file_hash(source_path)
+ logger.warning(f"IPFS unavailable, using local hash: {cid[:16]}...")
- # Use content_hash as node_id if not provided
- # This is for legacy/uploaded files that don't have a DAG node
+ # Use CID as node_id if not provided
if node_id is None:
- node_id = content_hash
+ node_id = cid
# Check if already cached (by node_id)
existing = self.cache.get_entry(node_id)
if existing and existing.output_path.exists():
- # Already cached - still try to get IPFS CID if we don't have it
- ipfs_cid = self._get_ipfs_cid_from_index(content_hash)
- if not ipfs_cid:
- ipfs_cid = ipfs_client.add_file(existing.output_path)
- if ipfs_cid:
- self._set_ipfs_index(content_hash, ipfs_cid)
- return CachedFile.from_cache_entry(existing), ipfs_cid
+ return CachedFile.from_cache_entry(existing), cid
# Store in local cache
self.cache.put(
@@ -412,16 +409,12 @@ class L1CacheManager:
entry = self.cache.get_entry(node_id)
- # Update content index (Redis + local)
- self._set_content_index(entry.content_hash, node_id)
+ # Update content index (CID -> node_id mapping)
+ self._set_content_index(cid, node_id)
- # Upload to IPFS (async in background would be better, but sync for now)
- ipfs_cid = ipfs_client.add_file(entry.output_path)
- if ipfs_cid:
- self._set_ipfs_index(entry.content_hash, ipfs_cid)
- logger.info(f"Uploaded to IPFS: {entry.content_hash[:16]}... -> {ipfs_cid}")
+ logger.info(f"Cached: {cid[:16]}...")
- return CachedFile.from_cache_entry(entry), ipfs_cid
+ return CachedFile.from_cache_entry(entry), cid
def get_by_node_id(self, node_id: str) -> Optional[Path]:
"""Get cached file path by node_id."""
@@ -432,46 +425,46 @@ class L1CacheManager:
# CIDv0 starts with "Qm", CIDv1 starts with "bafy" or other multibase prefixes
return identifier.startswith("Qm") or identifier.startswith("bafy") or identifier.startswith("baf")
- def get_by_content_hash(self, content_hash: str) -> Optional[Path]:
- """Get cached file path by content_hash or IPFS CID. Falls back to IPFS if not in local cache."""
+ def get_by_cid(self, cid: str) -> Optional[Path]:
+        """Get cached file path by content id (IPFS CID, or local-hash fallback). Falls back to IPFS retrieval if not in local cache."""
# If it looks like an IPFS CID, use get_by_cid instead
- if self._is_ipfs_cid(content_hash):
- return self.get_by_cid(content_hash)
+        # NOTE(review): the pre-rename code dispatched here to a *different*
+        # method named get_by_cid; after this rename a self.get_by_cid(cid)
+        # call would recurse forever. Raw IPFS CIDs instead fall through to
+        # the direct cache lookup and the IPFS recovery path below.
# Check index first (Redis then local)
- node_id = self._get_content_index(content_hash)
+ node_id = self._get_content_index(cid)
if node_id:
path = self.cache.get(node_id)
if path and path.exists():
logger.debug(f" Found via index: {path}")
return path
- # For uploads, node_id == content_hash, so try direct lookup
+ # For uploads, node_id == cid, so try direct lookup
# This works even if cache index hasn't been reloaded
- path = self.cache.get(content_hash)
- logger.debug(f" cache.get({content_hash[:16]}...) returned: {path}")
+ path = self.cache.get(cid)
+ logger.debug(f" cache.get({cid[:16]}...) returned: {path}")
if path and path.exists():
- self._set_content_index(content_hash, content_hash)
+ self._set_content_index(cid, cid)
return path
# Scan cache entries (fallback for new structure)
- entry = self.cache.find_by_content_hash(content_hash)
+ entry = self.cache.find_by_cid(cid)
if entry and entry.output_path.exists():
logger.debug(f" Found via scan: {entry.output_path}")
- self._set_content_index(content_hash, entry.node_id)
+ self._set_content_index(cid, entry.node_id)
return entry.output_path
- # Check legacy location (files stored directly as CACHE_DIR/{content_hash})
- legacy_path = self.cache_dir / content_hash
+ # Check legacy location (files stored directly as CACHE_DIR/{cid})
+ legacy_path = self.cache_dir / cid
if legacy_path.exists() and legacy_path.is_file():
return legacy_path
# Try to recover from IPFS if we have a CID
- ipfs_cid = self._get_ipfs_cid_from_index(content_hash)
+        ipfs_cid = self._get_ipfs_cid_from_index(cid) or (cid if self._is_ipfs_cid(cid) else None)
if ipfs_cid:
- logger.info(f"Recovering from IPFS: {content_hash[:16]}... ({ipfs_cid})")
- recovery_path = self.legacy_dir / content_hash
+ logger.info(f"Recovering from IPFS: {cid[:16]}... ({ipfs_cid})")
+ recovery_path = self.legacy_dir / cid
if ipfs_client.get_file(ipfs_cid, recovery_path):
logger.info(f"Recovered from IPFS: {recovery_path}")
return recovery_path
@@ -504,16 +497,16 @@ class L1CacheManager:
return None
- def has_content(self, content_hash: str) -> bool:
+ def has_content(self, cid: str) -> bool:
"""Check if content exists in cache."""
- return self.get_by_content_hash(content_hash) is not None
+ return self.get_by_cid(cid) is not None
- def get_entry_by_content_hash(self, content_hash: str) -> Optional[CacheEntry]:
- """Get cache entry by content_hash."""
- node_id = self._get_content_index(content_hash)
+ def get_entry_by_cid(self, cid: str) -> Optional[CacheEntry]:
+ """Get cache entry by cid."""
+ node_id = self._get_content_index(cid)
if node_id:
return self.cache.get_entry(node_id)
- return self.cache.find_by_content_hash(content_hash)
+ return self.cache.find_by_cid(cid)
def list_all(self) -> List[CachedFile]:
"""List all cached files."""
@@ -523,11 +516,11 @@ class L1CacheManager:
# New cache structure entries
for entry in self.cache.list_entries():
files.append(CachedFile.from_cache_entry(entry))
- if entry.content_hash:
- seen_hashes.add(entry.content_hash)
+ if entry.cid:
+ seen_hashes.add(entry.cid)
# Legacy files stored directly in cache_dir (old structure)
- # These are files named by content_hash directly in CACHE_DIR
+ # These are files named by cid directly in CACHE_DIR
for f in self.cache_dir.iterdir():
# Skip directories and special files
if not f.is_file():
@@ -544,7 +537,7 @@ class L1CacheManager:
files.append(CachedFile(
node_id=f.name,
- content_hash=f.name,
+ cid=f.name,
path=f,
size_bytes=f.stat().st_size,
node_type="legacy",
@@ -566,8 +559,8 @@ class L1CacheManager:
"""
hashes = []
for entry in self.cache.list_entries():
- if entry.node_type == node_type and entry.content_hash:
- hashes.append(entry.content_hash)
+ if entry.node_type == node_type and entry.cid:
+ hashes.append(entry.cid)
return hashes
# ============ Activity Tracking ============
@@ -590,19 +583,19 @@ class L1CacheManager:
def record_simple_activity(
self,
input_hashes: List[str],
- output_hash: str,
+ output_cid: str,
run_id: str = None,
) -> Activity:
"""
Record a simple (non-DAG) execution as an activity.
For legacy single-effect runs that don't use full DAG execution.
- Uses content_hash as node_id.
+ Uses cid as node_id.
"""
activity = Activity(
- activity_id=run_id or str(hash((tuple(input_hashes), output_hash))),
+ activity_id=run_id or str(hash((tuple(input_hashes), output_cid))),
input_ids=sorted(input_hashes),
- output_id=output_hash,
+ output_id=output_cid,
intermediate_ids=[],
created_at=datetime.now(timezone.utc).timestamp(),
status="completed",
@@ -624,7 +617,7 @@ class L1CacheManager:
# ============ Deletion Rules ============
- def can_delete(self, content_hash: str) -> tuple[bool, str]:
+ def can_delete(self, cid: str) -> tuple[bool, str]:
"""
Check if a cached item can be deleted.
@@ -632,12 +625,12 @@ class L1CacheManager:
(can_delete, reason) tuple
"""
# Check if pinned (published or input to published)
- pinned, reason = self.is_pinned(content_hash)
+ pinned, reason = self.is_pinned(cid)
if pinned:
return False, f"Item is pinned ({reason})"
# Find node_id for this content
- node_id = self._get_content_index(content_hash) or content_hash
+ node_id = self._get_content_index(cid) or cid
# Check if it's an input or output of any activity
for activity in self.activity_store.list():
@@ -663,34 +656,34 @@ class L1CacheManager:
for node_id in activity.all_node_ids:
entry = self.cache.get_entry(node_id)
if entry:
- pinned, reason = self.is_pinned(entry.content_hash)
+ pinned, reason = self.is_pinned(entry.cid)
if pinned:
return False, f"Item {node_id} is pinned ({reason})"
return True, "OK"
- def delete_by_content_hash(self, content_hash: str) -> tuple[bool, str]:
+ def delete_by_cid(self, cid: str) -> tuple[bool, str]:
"""
- Delete a cached item by content_hash.
+ Delete a cached item by cid.
Enforces deletion rules.
Returns:
(success, message) tuple
"""
- can_delete, reason = self.can_delete(content_hash)
+ can_delete, reason = self.can_delete(cid)
if not can_delete:
return False, reason
# Find and delete
- node_id = self._get_content_index(content_hash)
+ node_id = self._get_content_index(cid)
if node_id:
self.cache.remove(node_id)
- self._del_content_index(content_hash)
+ self._del_content_index(cid)
return True, "Deleted"
# Try legacy
- legacy_path = self.legacy_dir / content_hash
+ legacy_path = self.legacy_dir / cid
if legacy_path.exists():
legacy_path.unlink()
return True, "Deleted (legacy)"
@@ -732,7 +725,7 @@ class L1CacheManager:
if activity.output_id:
entry = self.cache.get_entry(activity.output_id)
if entry:
- pinned, reason = self.is_pinned(entry.content_hash)
+ pinned, reason = self.is_pinned(entry.cid)
if pinned:
return False, f"Output is pinned ({reason})"
@@ -743,9 +736,9 @@ class L1CacheManager:
# Remove from cache
self.cache.remove(activity.output_id)
# Remove from content index (Redis + local)
- self._del_content_index(entry.content_hash)
+ self._del_content_index(entry.cid)
# Delete from legacy dir if exists
- legacy_path = self.legacy_dir / entry.content_hash
+ legacy_path = self.legacy_dir / entry.cid
if legacy_path.exists():
legacy_path.unlink()
@@ -754,8 +747,8 @@ class L1CacheManager:
entry = self.cache.get_entry(node_id)
if entry:
self.cache.remove(node_id)
- self._del_content_index(entry.content_hash)
- legacy_path = self.legacy_dir / entry.content_hash
+ self._del_content_index(entry.cid)
+ legacy_path = self.legacy_dir / entry.cid
if legacy_path.exists():
legacy_path.unlink()
@@ -777,13 +770,13 @@ class L1CacheManager:
# ============ L2 Integration ============
- def mark_published(self, content_hash: str):
- """Mark a content_hash as published to L2."""
- self.l2_checker.mark_shared(content_hash)
+ def mark_published(self, cid: str):
+ """Mark a cid as published to L2."""
+ self.l2_checker.mark_shared(cid)
- def invalidate_shared_cache(self, content_hash: str):
+ def invalidate_shared_cache(self, cid: str):
"""Invalidate shared status cache (call if item might be unpublished)."""
- self.l2_checker.invalidate(content_hash)
+ self.l2_checker.invalidate(cid)
# ============ Stats ============
diff --git a/database.py b/database.py
index 0cf1bad..21b5818 100644
--- a/database.py
+++ b/database.py
@@ -19,7 +19,7 @@ SCHEMA_SQL = """
-- Core cache: just content hash and IPFS CID
-- Physical file storage - shared by all users
CREATE TABLE IF NOT EXISTS cache_items (
- content_hash VARCHAR(64) PRIMARY KEY,
+    cid VARCHAR(128) PRIMARY KEY,
ipfs_cid VARCHAR(128),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
@@ -28,7 +28,7 @@ CREATE TABLE IF NOT EXISTS cache_items (
-- actor_id format: @username@server (ActivityPub style)
CREATE TABLE IF NOT EXISTS item_types (
id SERIAL PRIMARY KEY,
- content_hash VARCHAR(64) REFERENCES cache_items(content_hash) ON DELETE CASCADE,
+    cid VARCHAR(128) REFERENCES cache_items(cid) ON DELETE CASCADE,
actor_id VARCHAR(255) NOT NULL,
type VARCHAR(50) NOT NULL,
path VARCHAR(255),
@@ -40,7 +40,7 @@ CREATE TABLE IF NOT EXISTS item_types (
filename VARCHAR(255),
metadata JSONB DEFAULT '{}',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
- UNIQUE(content_hash, actor_id, type, path)
+ UNIQUE(cid, actor_id, type, path)
);
-- Add columns if they don't exist (for existing databases)
@@ -61,7 +61,7 @@ CREATE TABLE IF NOT EXISTS pin_reasons (
-- L2 shares: per-user shares (includes content_type for role when shared)
CREATE TABLE IF NOT EXISTS l2_shares (
id SERIAL PRIMARY KEY,
- content_hash VARCHAR(64) REFERENCES cache_items(content_hash) ON DELETE CASCADE,
+    cid VARCHAR(128) REFERENCES cache_items(cid) ON DELETE CASCADE,
actor_id VARCHAR(255) NOT NULL,
l2_server VARCHAR(255) NOT NULL,
asset_name VARCHAR(255) NOT NULL,
@@ -69,7 +69,7 @@ CREATE TABLE IF NOT EXISTS l2_shares (
content_type VARCHAR(50) NOT NULL,
published_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
last_synced_at TIMESTAMP WITH TIME ZONE,
- UNIQUE(content_hash, actor_id, l2_server, content_type)
+ UNIQUE(cid, actor_id, l2_server, content_type)
);
-- Add activity_id column if it doesn't exist (for existing databases)
@@ -82,7 +82,7 @@ END $$;
-- run_id is a hash of (sorted inputs + recipe), making runs deterministic
CREATE TABLE IF NOT EXISTS run_cache (
run_id VARCHAR(64) PRIMARY KEY,
- output_hash VARCHAR(64) NOT NULL,
+    output_cid VARCHAR(128) NOT NULL,
ipfs_cid VARCHAR(128),
provenance_cid VARCHAR(128),
recipe VARCHAR(255) NOT NULL,
@@ -128,27 +128,27 @@ CREATE TABLE IF NOT EXISTS storage_backends (
-- Storage pins tracking (what's pinned where)
CREATE TABLE IF NOT EXISTS storage_pins (
id SERIAL PRIMARY KEY,
- content_hash VARCHAR(64) NOT NULL,
+    cid VARCHAR(128) NOT NULL,
storage_id INTEGER NOT NULL REFERENCES storage_backends(id) ON DELETE CASCADE,
ipfs_cid VARCHAR(128),
pin_type VARCHAR(20) NOT NULL, -- 'user_content', 'donated', 'system'
size_bytes BIGINT,
pinned_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
- UNIQUE(content_hash, storage_id)
+ UNIQUE(cid, storage_id)
);
-- Indexes
-CREATE INDEX IF NOT EXISTS idx_item_types_content_hash ON item_types(content_hash);
+CREATE INDEX IF NOT EXISTS idx_item_types_cid ON item_types(cid);
CREATE INDEX IF NOT EXISTS idx_item_types_actor_id ON item_types(actor_id);
CREATE INDEX IF NOT EXISTS idx_item_types_type ON item_types(type);
CREATE INDEX IF NOT EXISTS idx_item_types_path ON item_types(path);
CREATE INDEX IF NOT EXISTS idx_pin_reasons_item_type ON pin_reasons(item_type_id);
-CREATE INDEX IF NOT EXISTS idx_l2_shares_content_hash ON l2_shares(content_hash);
+CREATE INDEX IF NOT EXISTS idx_l2_shares_cid ON l2_shares(cid);
CREATE INDEX IF NOT EXISTS idx_l2_shares_actor_id ON l2_shares(actor_id);
-CREATE INDEX IF NOT EXISTS idx_run_cache_output ON run_cache(output_hash);
+CREATE INDEX IF NOT EXISTS idx_run_cache_output ON run_cache(output_cid);
CREATE INDEX IF NOT EXISTS idx_storage_backends_actor ON storage_backends(actor_id);
CREATE INDEX IF NOT EXISTS idx_storage_backends_type ON storage_backends(provider_type);
-CREATE INDEX IF NOT EXISTS idx_storage_pins_hash ON storage_pins(content_hash);
+CREATE INDEX IF NOT EXISTS idx_storage_pins_hash ON storage_pins(cid);
CREATE INDEX IF NOT EXISTS idx_storage_pins_storage ON storage_pins(storage_id);
"""
@@ -171,47 +171,47 @@ async def close_db():
# ============ Cache Items ============
-async def create_cache_item(content_hash: str, ipfs_cid: Optional[str] = None) -> dict:
+async def create_cache_item(cid: str, ipfs_cid: Optional[str] = None) -> dict:
"""Create a cache item. Returns the created item."""
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
- INSERT INTO cache_items (content_hash, ipfs_cid)
+ INSERT INTO cache_items (cid, ipfs_cid)
VALUES ($1, $2)
- ON CONFLICT (content_hash) DO UPDATE SET ipfs_cid = COALESCE($2, cache_items.ipfs_cid)
- RETURNING content_hash, ipfs_cid, created_at
+ ON CONFLICT (cid) DO UPDATE SET ipfs_cid = COALESCE($2, cache_items.ipfs_cid)
+ RETURNING cid, ipfs_cid, created_at
""",
- content_hash, ipfs_cid
+ cid, ipfs_cid
)
return dict(row)
-async def get_cache_item(content_hash: str) -> Optional[dict]:
+async def get_cache_item(cid: str) -> Optional[dict]:
"""Get a cache item by content hash."""
async with pool.acquire() as conn:
row = await conn.fetchrow(
- "SELECT content_hash, ipfs_cid, created_at FROM cache_items WHERE content_hash = $1",
- content_hash
+ "SELECT cid, ipfs_cid, created_at FROM cache_items WHERE cid = $1",
+ cid
)
return dict(row) if row else None
-async def update_cache_item_ipfs_cid(content_hash: str, ipfs_cid: str) -> bool:
+async def update_cache_item_ipfs_cid(cid: str, ipfs_cid: str) -> bool:
"""Update the IPFS CID for a cache item."""
async with pool.acquire() as conn:
result = await conn.execute(
- "UPDATE cache_items SET ipfs_cid = $2 WHERE content_hash = $1",
- content_hash, ipfs_cid
+ "UPDATE cache_items SET ipfs_cid = $2 WHERE cid = $1",
+ cid, ipfs_cid
)
return result == "UPDATE 1"
-async def delete_cache_item(content_hash: str) -> bool:
+async def delete_cache_item(cid: str) -> bool:
"""Delete a cache item and all associated data (cascades)."""
async with pool.acquire() as conn:
result = await conn.execute(
- "DELETE FROM cache_items WHERE content_hash = $1",
- content_hash
+ "DELETE FROM cache_items WHERE cid = $1",
+ cid
)
return result == "DELETE 1"
@@ -221,7 +221,7 @@ async def list_cache_items(limit: int = 100, offset: int = 0) -> List[dict]:
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
- SELECT content_hash, ipfs_cid, created_at
+ SELECT cid, ipfs_cid, created_at
FROM cache_items
ORDER BY created_at DESC
LIMIT $1 OFFSET $2
@@ -234,7 +234,7 @@ async def list_cache_items(limit: int = 100, offset: int = 0) -> List[dict]:
# ============ Item Types ============
async def add_item_type(
- content_hash: str,
+ cid: str,
actor_id: str,
item_type: str,
path: Optional[str] = None,
@@ -247,72 +247,72 @@ async def add_item_type(
async with pool.acquire() as conn:
# Ensure cache_item exists
await conn.execute(
- "INSERT INTO cache_items (content_hash) VALUES ($1) ON CONFLICT DO NOTHING",
- content_hash
+ "INSERT INTO cache_items (cid) VALUES ($1) ON CONFLICT DO NOTHING",
+ cid
)
# Insert or update item_type
row = await conn.fetchrow(
"""
- INSERT INTO item_types (content_hash, actor_id, type, path, description, source_type, source_url, source_note)
+ INSERT INTO item_types (cid, actor_id, type, path, description, source_type, source_url, source_note)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
- ON CONFLICT (content_hash, actor_id, type, path) DO UPDATE SET
+ ON CONFLICT (cid, actor_id, type, path) DO UPDATE SET
description = COALESCE($5, item_types.description),
source_type = COALESCE($6, item_types.source_type),
source_url = COALESCE($7, item_types.source_url),
source_note = COALESCE($8, item_types.source_note)
- RETURNING id, content_hash, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
+ RETURNING id, cid, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
""",
- content_hash, actor_id, item_type, path, description, source_type, source_url, source_note
+ cid, actor_id, item_type, path, description, source_type, source_url, source_note
)
return dict(row)
-async def get_item_types(content_hash: str, actor_id: Optional[str] = None) -> List[dict]:
+async def get_item_types(cid: str, actor_id: Optional[str] = None) -> List[dict]:
"""Get types for a cache item, optionally filtered by user."""
async with pool.acquire() as conn:
if actor_id:
rows = await conn.fetch(
"""
- SELECT id, content_hash, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
+ SELECT id, cid, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
FROM item_types
- WHERE content_hash = $1 AND actor_id = $2
+ WHERE cid = $1 AND actor_id = $2
ORDER BY created_at
""",
- content_hash, actor_id
+ cid, actor_id
)
else:
rows = await conn.fetch(
"""
- SELECT id, content_hash, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
+ SELECT id, cid, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
FROM item_types
- WHERE content_hash = $1
+ WHERE cid = $1
ORDER BY created_at
""",
- content_hash
+ cid
)
return [dict(row) for row in rows]
-async def get_item_type(content_hash: str, actor_id: str, item_type: str, path: Optional[str] = None) -> Optional[dict]:
+async def get_item_type(cid: str, actor_id: str, item_type: str, path: Optional[str] = None) -> Optional[dict]:
"""Get a specific type for a cache item and user."""
async with pool.acquire() as conn:
if path is None:
row = await conn.fetchrow(
"""
- SELECT id, content_hash, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
+ SELECT id, cid, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
FROM item_types
- WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path IS NULL
+ WHERE cid = $1 AND actor_id = $2 AND type = $3 AND path IS NULL
""",
- content_hash, actor_id, item_type
+ cid, actor_id, item_type
)
else:
row = await conn.fetchrow(
"""
- SELECT id, content_hash, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
+ SELECT id, cid, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
FROM item_types
- WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path = $4
+ WHERE cid = $1 AND actor_id = $2 AND type = $3 AND path = $4
""",
- content_hash, actor_id, item_type, path
+ cid, actor_id, item_type, path
)
return dict(row) if row else None
@@ -340,18 +340,18 @@ async def update_item_type(
return result == "UPDATE 1"
-async def delete_item_type(content_hash: str, actor_id: str, item_type: str, path: Optional[str] = None) -> bool:
+async def delete_item_type(cid: str, actor_id: str, item_type: str, path: Optional[str] = None) -> bool:
"""Delete a specific type from a cache item for a user."""
async with pool.acquire() as conn:
if path is None:
result = await conn.execute(
- "DELETE FROM item_types WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path IS NULL",
- content_hash, actor_id, item_type
+ "DELETE FROM item_types WHERE cid = $1 AND actor_id = $2 AND type = $3 AND path IS NULL",
+ cid, actor_id, item_type
)
else:
result = await conn.execute(
- "DELETE FROM item_types WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path = $4",
- content_hash, actor_id, item_type, path
+ "DELETE FROM item_types WHERE cid = $1 AND actor_id = $2 AND type = $3 AND path = $4",
+ cid, actor_id, item_type, path
)
return result == "DELETE 1"
@@ -362,11 +362,11 @@ async def list_items_by_type(item_type: str, actor_id: Optional[str] = None, lim
if actor_id:
rows = await conn.fetch(
"""
- SELECT it.id, it.content_hash, it.actor_id, it.type, it.path, it.description,
+ SELECT it.id, it.cid, it.actor_id, it.type, it.path, it.description,
it.source_type, it.source_url, it.source_note, it.pinned, it.created_at,
ci.ipfs_cid
FROM item_types it
- JOIN cache_items ci ON it.content_hash = ci.content_hash
+ JOIN cache_items ci ON it.cid = ci.cid
WHERE it.type = $1 AND it.actor_id = $2
ORDER BY it.created_at DESC
LIMIT $3 OFFSET $4
@@ -376,11 +376,11 @@ async def list_items_by_type(item_type: str, actor_id: Optional[str] = None, lim
else:
rows = await conn.fetch(
"""
- SELECT it.id, it.content_hash, it.actor_id, it.type, it.path, it.description,
+ SELECT it.id, it.cid, it.actor_id, it.type, it.path, it.description,
it.source_type, it.source_url, it.source_note, it.pinned, it.created_at,
ci.ipfs_cid
FROM item_types it
- JOIN cache_items ci ON it.content_hash = ci.content_hash
+ JOIN cache_items ci ON it.cid = ci.cid
WHERE it.type = $1
ORDER BY it.created_at DESC
LIMIT $2 OFFSET $3
@@ -396,11 +396,11 @@ async def get_item_by_path(item_type: str, path: str, actor_id: Optional[str] =
if actor_id:
row = await conn.fetchrow(
"""
- SELECT it.id, it.content_hash, it.actor_id, it.type, it.path, it.description,
+ SELECT it.id, it.cid, it.actor_id, it.type, it.path, it.description,
it.source_type, it.source_url, it.source_note, it.pinned, it.created_at,
ci.ipfs_cid
FROM item_types it
- JOIN cache_items ci ON it.content_hash = ci.content_hash
+ JOIN cache_items ci ON it.cid = ci.cid
WHERE it.type = $1 AND it.path = $2 AND it.actor_id = $3
""",
item_type, path, actor_id
@@ -408,11 +408,11 @@ async def get_item_by_path(item_type: str, path: str, actor_id: Optional[str] =
else:
row = await conn.fetchrow(
"""
- SELECT it.id, it.content_hash, it.actor_id, it.type, it.path, it.description,
+ SELECT it.id, it.cid, it.actor_id, it.type, it.path, it.description,
it.source_type, it.source_url, it.source_note, it.pinned, it.created_at,
ci.ipfs_cid
FROM item_types it
- JOIN cache_items ci ON it.content_hash = ci.content_hash
+ JOIN cache_items ci ON it.cid = ci.cid
WHERE it.type = $1 AND it.path = $2
""",
item_type, path
@@ -480,7 +480,7 @@ async def get_pin_reasons(item_type_id: int) -> List[dict]:
return [dict(row) for row in rows]
-async def is_item_pinned(content_hash: str, item_type: Optional[str] = None) -> tuple[bool, List[str]]:
+async def is_item_pinned(cid: str, item_type: Optional[str] = None) -> tuple[bool, List[str]]:
"""Check if any type of a cache item is pinned. Returns (is_pinned, reasons)."""
async with pool.acquire() as conn:
if item_type:
@@ -489,9 +489,9 @@ async def is_item_pinned(content_hash: str, item_type: Optional[str] = None) ->
SELECT pr.reason
FROM pin_reasons pr
JOIN item_types it ON pr.item_type_id = it.id
- WHERE it.content_hash = $1 AND it.type = $2 AND it.pinned = TRUE
+ WHERE it.cid = $1 AND it.type = $2 AND it.pinned = TRUE
""",
- content_hash, item_type
+ cid, item_type
)
else:
rows = await conn.fetch(
@@ -499,9 +499,9 @@ async def is_item_pinned(content_hash: str, item_type: Optional[str] = None) ->
SELECT pr.reason
FROM pin_reasons pr
JOIN item_types it ON pr.item_type_id = it.id
- WHERE it.content_hash = $1 AND it.pinned = TRUE
+ WHERE it.cid = $1 AND it.pinned = TRUE
""",
- content_hash
+ cid
)
reasons = [row["reason"] for row in rows]
return len(reasons) > 0, reasons
@@ -510,7 +510,7 @@ async def is_item_pinned(content_hash: str, item_type: Optional[str] = None) ->
# ============ L2 Shares ============
async def add_l2_share(
- content_hash: str,
+ cid: str,
actor_id: str,
l2_server: str,
asset_name: str,
@@ -520,85 +520,85 @@ async def add_l2_share(
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
- INSERT INTO l2_shares (content_hash, actor_id, l2_server, asset_name, content_type, last_synced_at)
+ INSERT INTO l2_shares (cid, actor_id, l2_server, asset_name, content_type, last_synced_at)
VALUES ($1, $2, $3, $4, $5, NOW())
- ON CONFLICT (content_hash, actor_id, l2_server, content_type) DO UPDATE SET
+ ON CONFLICT (cid, actor_id, l2_server, content_type) DO UPDATE SET
asset_name = $4,
last_synced_at = NOW()
- RETURNING id, content_hash, actor_id, l2_server, asset_name, content_type, published_at, last_synced_at
+ RETURNING id, cid, actor_id, l2_server, asset_name, content_type, published_at, last_synced_at
""",
- content_hash, actor_id, l2_server, asset_name, content_type
+ cid, actor_id, l2_server, asset_name, content_type
)
return dict(row)
-async def get_l2_shares(content_hash: str, actor_id: Optional[str] = None) -> List[dict]:
+async def get_l2_shares(cid: str, actor_id: Optional[str] = None) -> List[dict]:
"""Get L2 shares for a cache item, optionally filtered by user."""
async with pool.acquire() as conn:
if actor_id:
rows = await conn.fetch(
"""
- SELECT id, content_hash, actor_id, l2_server, asset_name, activity_id, content_type, published_at, last_synced_at
+ SELECT id, cid, actor_id, l2_server, asset_name, activity_id, content_type, published_at, last_synced_at
FROM l2_shares
- WHERE content_hash = $1 AND actor_id = $2
+ WHERE cid = $1 AND actor_id = $2
ORDER BY published_at
""",
- content_hash, actor_id
+ cid, actor_id
)
else:
rows = await conn.fetch(
"""
- SELECT id, content_hash, actor_id, l2_server, asset_name, activity_id, content_type, published_at, last_synced_at
+ SELECT id, cid, actor_id, l2_server, asset_name, activity_id, content_type, published_at, last_synced_at
FROM l2_shares
- WHERE content_hash = $1
+ WHERE cid = $1
ORDER BY published_at
""",
- content_hash
+ cid
)
return [dict(row) for row in rows]
-async def delete_l2_share(content_hash: str, actor_id: str, l2_server: str, content_type: str) -> bool:
+async def delete_l2_share(cid: str, actor_id: str, l2_server: str, content_type: str) -> bool:
"""Delete an L2 share for a user."""
async with pool.acquire() as conn:
result = await conn.execute(
- "DELETE FROM l2_shares WHERE content_hash = $1 AND actor_id = $2 AND l2_server = $3 AND content_type = $4",
- content_hash, actor_id, l2_server, content_type
+ "DELETE FROM l2_shares WHERE cid = $1 AND actor_id = $2 AND l2_server = $3 AND content_type = $4",
+ cid, actor_id, l2_server, content_type
)
return result == "DELETE 1"
# ============ Cache Item Cleanup ============
-async def has_remaining_references(content_hash: str) -> bool:
+async def has_remaining_references(cid: str) -> bool:
"""Check if a cache item has any remaining item_types or l2_shares."""
async with pool.acquire() as conn:
item_types_count = await conn.fetchval(
- "SELECT COUNT(*) FROM item_types WHERE content_hash = $1",
- content_hash
+ "SELECT COUNT(*) FROM item_types WHERE cid = $1",
+ cid
)
if item_types_count > 0:
return True
l2_shares_count = await conn.fetchval(
- "SELECT COUNT(*) FROM l2_shares WHERE content_hash = $1",
- content_hash
+ "SELECT COUNT(*) FROM l2_shares WHERE cid = $1",
+ cid
)
return l2_shares_count > 0
-async def cleanup_orphaned_cache_item(content_hash: str) -> bool:
+async def cleanup_orphaned_cache_item(cid: str) -> bool:
"""Delete a cache item if it has no remaining references. Returns True if deleted."""
async with pool.acquire() as conn:
# Only delete if no item_types or l2_shares reference it
result = await conn.execute(
"""
DELETE FROM cache_items
- WHERE content_hash = $1
- AND NOT EXISTS (SELECT 1 FROM item_types WHERE content_hash = $1)
- AND NOT EXISTS (SELECT 1 FROM l2_shares WHERE content_hash = $1)
+ WHERE cid = $1
+ AND NOT EXISTS (SELECT 1 FROM item_types WHERE cid = $1)
+ AND NOT EXISTS (SELECT 1 FROM l2_shares WHERE cid = $1)
""",
- content_hash
+ cid
)
return result == "DELETE 1"
@@ -610,7 +610,7 @@ import json as _json
async def save_item_metadata(
- content_hash: str,
+ cid: str,
actor_id: str,
item_type: str = "media",
filename: Optional[str] = None,
@@ -643,16 +643,16 @@ async def save_item_metadata(
async with pool.acquire() as conn:
# Ensure cache_item exists
await conn.execute(
- "INSERT INTO cache_items (content_hash) VALUES ($1) ON CONFLICT DO NOTHING",
- content_hash
+ "INSERT INTO cache_items (cid) VALUES ($1) ON CONFLICT DO NOTHING",
+ cid
)
# Upsert item_type
row = await conn.fetchrow(
"""
- INSERT INTO item_types (content_hash, actor_id, type, description, source_type, source_url, source_note, pinned, filename, metadata)
+ INSERT INTO item_types (cid, actor_id, type, description, source_type, source_url, source_note, pinned, filename, metadata)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
- ON CONFLICT (content_hash, actor_id, type, path) DO UPDATE SET
+ ON CONFLICT (cid, actor_id, type, path) DO UPDATE SET
description = COALESCE(EXCLUDED.description, item_types.description),
source_type = COALESCE(EXCLUDED.source_type, item_types.source_type),
source_url = COALESCE(EXCLUDED.source_url, item_types.source_url),
@@ -660,9 +660,9 @@ async def save_item_metadata(
pinned = EXCLUDED.pinned,
filename = COALESCE(EXCLUDED.filename, item_types.filename),
metadata = item_types.metadata || EXCLUDED.metadata
- RETURNING id, content_hash, actor_id, type, path, description, source_type, source_url, source_note, pinned, filename, metadata, created_at
+ RETURNING id, cid, actor_id, type, path, description, source_type, source_url, source_note, pinned, filename, metadata, created_at
""",
- content_hash, actor_id, item_type, description, source_type, source_url, source_note, pinned, filename, _json.dumps(metadata)
+ cid, actor_id, item_type, description, source_type, source_url, source_note, pinned, filename, _json.dumps(metadata)
)
item_type_id = row["id"]
@@ -719,7 +719,7 @@ async def save_item_metadata(
return result
-async def load_item_metadata(content_hash: str, actor_id: Optional[str] = None) -> dict:
+async def load_item_metadata(cid: str, actor_id: Optional[str] = None) -> dict:
"""
Load item metadata from the database.
@@ -731,8 +731,8 @@ async def load_item_metadata(content_hash: str, actor_id: Optional[str] = None)
async with pool.acquire() as conn:
# Get cache item
cache_item = await conn.fetchrow(
- "SELECT content_hash, ipfs_cid, created_at FROM cache_items WHERE content_hash = $1",
- content_hash
+ "SELECT cid, ipfs_cid, created_at FROM cache_items WHERE cid = $1",
+ cid
)
if not cache_item:
@@ -743,19 +743,19 @@ async def load_item_metadata(content_hash: str, actor_id: Optional[str] = None)
item_types = await conn.fetch(
"""
SELECT id, actor_id, type, path, description, source_type, source_url, source_note, pinned, filename, metadata, created_at
- FROM item_types WHERE content_hash = $1 AND actor_id = $2
+ FROM item_types WHERE cid = $1 AND actor_id = $2
ORDER BY created_at
""",
- content_hash, actor_id
+ cid, actor_id
)
else:
item_types = await conn.fetch(
"""
SELECT id, actor_id, type, path, description, source_type, source_url, source_note, pinned, filename, metadata, created_at
- FROM item_types WHERE content_hash = $1
+ FROM item_types WHERE cid = $1
ORDER BY created_at
""",
- content_hash
+ cid
)
if not item_types:
@@ -807,17 +807,17 @@ async def load_item_metadata(content_hash: str, actor_id: Optional[str] = None)
shares = await conn.fetch(
"""
SELECT l2_server, asset_name, activity_id, content_type, published_at, last_synced_at
- FROM l2_shares WHERE content_hash = $1 AND actor_id = $2
+ FROM l2_shares WHERE cid = $1 AND actor_id = $2
""",
- content_hash, actor_id
+ cid, actor_id
)
else:
shares = await conn.fetch(
"""
SELECT l2_server, asset_name, activity_id, content_type, published_at, last_synced_at
- FROM l2_shares WHERE content_hash = $1
+ FROM l2_shares WHERE cid = $1
""",
- content_hash
+ cid
)
if shares:
@@ -845,7 +845,7 @@ async def load_item_metadata(content_hash: str, actor_id: Optional[str] = None)
async def update_item_metadata(
- content_hash: str,
+ cid: str,
actor_id: str,
item_type: str = "media",
**updates
@@ -880,15 +880,15 @@ async def update_item_metadata(
existing = await conn.fetchrow(
"""
SELECT id, metadata FROM item_types
- WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path IS NULL
+ WHERE cid = $1 AND actor_id = $2 AND type = $3 AND path IS NULL
""",
- content_hash, actor_id, item_type
+ cid, actor_id, item_type
)
if not existing:
# Create new entry
return await save_item_metadata(
- content_hash, actor_id, item_type,
+ cid, actor_id, item_type,
filename=filename, description=description,
source_type=source_type, source_url=source_url, source_note=source_note,
pinned=pinned or False, pin_reason=pin_reason,
@@ -898,7 +898,7 @@ async def update_item_metadata(
# Build update query dynamically
set_parts = []
- params = [content_hash, actor_id, item_type]
+ params = [cid, actor_id, item_type]
param_idx = 4
if description is not None:
@@ -949,7 +949,7 @@ async def update_item_metadata(
if set_parts:
query = f"""
UPDATE item_types SET {', '.join(set_parts)}
- WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path IS NULL
+ WHERE cid = $1 AND actor_id = $2 AND type = $3 AND path IS NULL
"""
await conn.execute(query, *params)
@@ -964,11 +964,11 @@ async def update_item_metadata(
existing["id"], pin_reason
)
- return await load_item_metadata(content_hash, actor_id)
+ return await load_item_metadata(cid, actor_id)
async def save_l2_share(
- content_hash: str,
+ cid: str,
actor_id: str,
l2_server: str,
asset_name: str,
@@ -979,15 +979,15 @@ async def save_l2_share(
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
- INSERT INTO l2_shares (content_hash, actor_id, l2_server, asset_name, activity_id, content_type, last_synced_at)
+ INSERT INTO l2_shares (cid, actor_id, l2_server, asset_name, activity_id, content_type, last_synced_at)
VALUES ($1, $2, $3, $4, $5, $6, NOW())
- ON CONFLICT (content_hash, actor_id, l2_server, content_type) DO UPDATE SET
+ ON CONFLICT (cid, actor_id, l2_server, content_type) DO UPDATE SET
asset_name = EXCLUDED.asset_name,
activity_id = COALESCE(EXCLUDED.activity_id, l2_shares.activity_id),
last_synced_at = NOW()
RETURNING l2_server, asset_name, activity_id, content_type, published_at, last_synced_at
""",
- content_hash, actor_id, l2_server, asset_name, activity_id, content_type
+ cid, actor_id, l2_server, asset_name, activity_id, content_type
)
return {
"l2_server": row["l2_server"],
@@ -1000,19 +1000,19 @@ async def save_l2_share(
async def get_user_items(actor_id: str, item_type: Optional[str] = None, limit: int = 100, offset: int = 0) -> List[dict]:
- """Get all items for a user, optionally filtered by type. Deduplicates by content_hash."""
+ """Get all items for a user, optionally filtered by type. Deduplicates by cid."""
async with pool.acquire() as conn:
if item_type:
rows = await conn.fetch(
"""
SELECT * FROM (
- SELECT DISTINCT ON (it.content_hash)
- it.content_hash, it.type, it.description, it.filename, it.pinned, it.created_at,
+ SELECT DISTINCT ON (it.cid)
+ it.cid, it.type, it.description, it.filename, it.pinned, it.created_at,
ci.ipfs_cid
FROM item_types it
- JOIN cache_items ci ON it.content_hash = ci.content_hash
+ JOIN cache_items ci ON it.cid = ci.cid
WHERE it.actor_id = $1 AND it.type = $2
- ORDER BY it.content_hash, it.created_at DESC
+ ORDER BY it.cid, it.created_at DESC
) deduped
ORDER BY created_at DESC
LIMIT $3 OFFSET $4
@@ -1023,13 +1023,13 @@ async def get_user_items(actor_id: str, item_type: Optional[str] = None, limit:
rows = await conn.fetch(
"""
SELECT * FROM (
- SELECT DISTINCT ON (it.content_hash)
- it.content_hash, it.type, it.description, it.filename, it.pinned, it.created_at,
+ SELECT DISTINCT ON (it.cid)
+ it.cid, it.type, it.description, it.filename, it.pinned, it.created_at,
ci.ipfs_cid
FROM item_types it
- JOIN cache_items ci ON it.content_hash = ci.content_hash
+ JOIN cache_items ci ON it.cid = ci.cid
WHERE it.actor_id = $1
- ORDER BY it.content_hash, it.created_at DESC
+ ORDER BY it.cid, it.created_at DESC
) deduped
ORDER BY created_at DESC
LIMIT $2 OFFSET $3
@@ -1039,7 +1039,7 @@ async def get_user_items(actor_id: str, item_type: Optional[str] = None, limit:
return [
{
- "content_hash": r["content_hash"],
+ "cid": r["cid"],
"type": r["type"],
"description": r["description"],
"filename": r["filename"],
@@ -1052,16 +1052,16 @@ async def get_user_items(actor_id: str, item_type: Optional[str] = None, limit:
async def count_user_items(actor_id: str, item_type: Optional[str] = None) -> int:
- """Count unique items (by content_hash) for a user."""
+ """Count unique items (by cid) for a user."""
async with pool.acquire() as conn:
if item_type:
return await conn.fetchval(
- "SELECT COUNT(DISTINCT content_hash) FROM item_types WHERE actor_id = $1 AND type = $2",
+ "SELECT COUNT(DISTINCT cid) FROM item_types WHERE actor_id = $1 AND type = $2",
actor_id, item_type
)
else:
return await conn.fetchval(
- "SELECT COUNT(DISTINCT content_hash) FROM item_types WHERE actor_id = $1",
+ "SELECT COUNT(DISTINCT cid) FROM item_types WHERE actor_id = $1",
actor_id
)
@@ -1073,7 +1073,7 @@ async def get_run_cache(run_id: str) -> Optional[dict]:
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
- SELECT run_id, output_hash, ipfs_cid, provenance_cid, recipe, inputs, actor_id, created_at
+ SELECT run_id, output_cid, ipfs_cid, provenance_cid, recipe, inputs, actor_id, created_at
FROM run_cache WHERE run_id = $1
""",
run_id
@@ -1081,7 +1081,7 @@ async def get_run_cache(run_id: str) -> Optional[dict]:
if row:
return {
"run_id": row["run_id"],
- "output_hash": row["output_hash"],
+ "output_cid": row["output_cid"],
"ipfs_cid": row["ipfs_cid"],
"provenance_cid": row["provenance_cid"],
"recipe": row["recipe"],
@@ -1094,7 +1094,7 @@ async def get_run_cache(run_id: str) -> Optional[dict]:
async def save_run_cache(
run_id: str,
- output_hash: str,
+ output_cid: str,
recipe: str,
inputs: List[str],
ipfs_cid: Optional[str] = None,
@@ -1105,19 +1105,19 @@ async def save_run_cache(
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
- INSERT INTO run_cache (run_id, output_hash, ipfs_cid, provenance_cid, recipe, inputs, actor_id)
+ INSERT INTO run_cache (run_id, output_cid, ipfs_cid, provenance_cid, recipe, inputs, actor_id)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (run_id) DO UPDATE SET
- output_hash = EXCLUDED.output_hash,
+ output_cid = EXCLUDED.output_cid,
ipfs_cid = COALESCE(EXCLUDED.ipfs_cid, run_cache.ipfs_cid),
provenance_cid = COALESCE(EXCLUDED.provenance_cid, run_cache.provenance_cid)
- RETURNING run_id, output_hash, ipfs_cid, provenance_cid, recipe, inputs, actor_id, created_at
+ RETURNING run_id, output_cid, ipfs_cid, provenance_cid, recipe, inputs, actor_id, created_at
""",
- run_id, output_hash, ipfs_cid, provenance_cid, recipe, _json.dumps(inputs), actor_id
+ run_id, output_cid, ipfs_cid, provenance_cid, recipe, _json.dumps(inputs), actor_id
)
return {
"run_id": row["run_id"],
- "output_hash": row["output_hash"],
+ "output_cid": row["output_cid"],
"ipfs_cid": row["ipfs_cid"],
"provenance_cid": row["provenance_cid"],
"recipe": row["recipe"],
@@ -1127,20 +1127,20 @@ async def save_run_cache(
}
-async def get_run_by_output(output_hash: str) -> Optional[dict]:
+async def get_run_by_output(output_cid: str) -> Optional[dict]:
"""Get run cache entry by output hash."""
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
- SELECT run_id, output_hash, ipfs_cid, provenance_cid, recipe, inputs, actor_id, created_at
- FROM run_cache WHERE output_hash = $1
+ SELECT run_id, output_cid, ipfs_cid, provenance_cid, recipe, inputs, actor_id, created_at
+ FROM run_cache WHERE output_cid = $1
""",
- output_hash
+ output_cid
)
if row:
return {
"run_id": row["run_id"],
- "output_hash": row["output_hash"],
+ "output_cid": row["output_cid"],
"ipfs_cid": row["ipfs_cid"],
"provenance_cid": row["provenance_cid"],
"recipe": row["recipe"],
@@ -1173,7 +1173,7 @@ async def list_runs_by_actor(actor_id: str, offset: int = 0, limit: int = 20) ->
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
- SELECT run_id, output_hash, ipfs_cid, provenance_cid, recipe, inputs, actor_id, created_at
+ SELECT run_id, output_cid, ipfs_cid, provenance_cid, recipe, inputs, actor_id, created_at
FROM run_cache
WHERE actor_id = $1
ORDER BY created_at DESC
@@ -1184,7 +1184,7 @@ async def list_runs_by_actor(actor_id: str, offset: int = 0, limit: int = 20) ->
return [
{
"run_id": row["run_id"],
- "output_hash": row["output_hash"],
+ "output_cid": row["output_cid"],
"ipfs_cid": row["ipfs_cid"],
"provenance_cid": row["provenance_cid"],
"recipe": row["recipe"],
@@ -1348,7 +1348,7 @@ async def get_all_active_storage() -> List[dict]:
async def add_storage_pin(
- content_hash: str,
+ cid: str,
storage_id: int,
ipfs_cid: Optional[str],
pin_type: str,
@@ -1358,40 +1358,40 @@ async def add_storage_pin(
async with pool.acquire() as conn:
try:
row = await conn.fetchrow(
- """INSERT INTO storage_pins (content_hash, storage_id, ipfs_cid, pin_type, size_bytes)
+ """INSERT INTO storage_pins (cid, storage_id, ipfs_cid, pin_type, size_bytes)
VALUES ($1, $2, $3, $4, $5)
- ON CONFLICT (content_hash, storage_id) DO UPDATE SET
+ ON CONFLICT (cid, storage_id) DO UPDATE SET
ipfs_cid = EXCLUDED.ipfs_cid,
pin_type = EXCLUDED.pin_type,
size_bytes = EXCLUDED.size_bytes,
pinned_at = NOW()
RETURNING id""",
- content_hash, storage_id, ipfs_cid, pin_type, size_bytes
+ cid, storage_id, ipfs_cid, pin_type, size_bytes
)
return row["id"] if row else None
except Exception:
return None
-async def remove_storage_pin(content_hash: str, storage_id: int) -> bool:
+async def remove_storage_pin(cid: str, storage_id: int) -> bool:
"""Remove a pin record."""
async with pool.acquire() as conn:
result = await conn.execute(
- "DELETE FROM storage_pins WHERE content_hash = $1 AND storage_id = $2",
- content_hash, storage_id
+ "DELETE FROM storage_pins WHERE cid = $1 AND storage_id = $2",
+ cid, storage_id
)
return "DELETE 1" in result
-async def get_pins_for_content(content_hash: str) -> List[dict]:
+async def get_pins_for_content(cid: str) -> List[dict]:
"""Get all storage locations where content is pinned."""
async with pool.acquire() as conn:
rows = await conn.fetch(
"""SELECT sp.*, sb.provider_type, sb.provider_name, sb.actor_id
FROM storage_pins sp
JOIN storage_backends sb ON sp.storage_id = sb.id
- WHERE sp.content_hash = $1""",
- content_hash
+ WHERE sp.cid = $1""",
+ cid
)
return [dict(row) for row in rows]
diff --git a/legacy_tasks.py b/legacy_tasks.py
index abdc3b8..986034c 100644
--- a/legacy_tasks.py
+++ b/legacy_tasks.py
@@ -120,21 +120,21 @@ class SourceExecutor(Executor):
"""Executor for SOURCE nodes - loads content from cache by hash."""
def execute(self, config: Dict, inputs: List[Path], output_path: Path) -> Path:
- # Source nodes load from cache by content_hash
- content_hash = config.get("content_hash")
- if not content_hash:
- raise ValueError("SOURCE node requires content_hash in config")
+ # Source nodes load from cache by cid
+ cid = config.get("cid")
+ if not cid:
+ raise ValueError("SOURCE node requires cid in config")
# Look up in cache
- source_path = CACHE_DIR / content_hash
+ source_path = CACHE_DIR / cid
if not source_path.exists():
# Try nodes directory
from cache_manager import get_cache_manager
cache_manager = get_cache_manager()
- source_path = cache_manager.get_by_content_hash(content_hash)
+ source_path = cache_manager.get_by_cid(cid)
if not source_path or not source_path.exists():
- raise ValueError(f"Source content not in cache: {content_hash}")
+ raise ValueError(f"Source content not in cache: {cid}")
# For source nodes, we just return the path (no transformation)
# The engine will use this as input to subsequent nodes
@@ -186,7 +186,7 @@ def render_effect(self, input_hash: str, effect_name: str, output_name: str) ->
# Input comes from cache by hash (supports both legacy and new cache locations)
cache_manager = get_cache_manager()
- input_path = cache_manager.get_by_content_hash(input_hash)
+ input_path = cache_manager.get_by_cid(input_hash)
if not input_path or not input_path.exists():
raise ValueError(f"Input not in cache: {input_hash}")
@@ -214,9 +214,9 @@ def render_effect(self, input_hash: str, effect_name: str, output_name: str) ->
raise ValueError(f"Unknown effect: {effect_name}")
# Verify output
- output_hash = file_hash(result)
- if output_hash != expected_hash:
- raise ValueError(f"Output hash mismatch: expected {expected_hash}, got {output_hash}")
+ output_cid = file_hash(result)
+ if output_cid != expected_hash:
+ raise ValueError(f"Output hash mismatch: expected {expected_hash}, got {output_cid}")
# Build effect info based on source
if effect_name == "identity":
@@ -224,7 +224,7 @@ def render_effect(self, input_hash: str, effect_name: str, output_name: str) ->
artdag_commit = get_artdag_commit()
effect_info = {
"name": f"effect:{effect_name}",
- "content_hash": REGISTRY[f"effect:{effect_name}"]["hash"],
+ "cid": REGISTRY[f"effect:{effect_name}"]["hash"],
"repo": "github",
"repo_commit": artdag_commit,
"repo_url": f"https://github.com/gilesbradshaw/art-dag/blob/{artdag_commit}/artdag/nodes/effect.py"
@@ -234,7 +234,7 @@ def render_effect(self, input_hash: str, effect_name: str, output_name: str) ->
effects_commit = get_effects_commit()
effect_info = {
"name": f"effect:{effect_name}",
- "content_hash": REGISTRY[f"effect:{effect_name}"]["hash"],
+ "cid": REGISTRY[f"effect:{effect_name}"]["hash"],
"repo": "rose-ash",
"repo_commit": effects_commit,
"repo_url": f"https://git.rose-ash.com/art-dag/effects/src/commit/{effects_commit}/{effect_name}"
@@ -247,15 +247,15 @@ def render_effect(self, input_hash: str, effect_name: str, output_name: str) ->
"rendered_by": "@giles@artdag.rose-ash.com",
"output": {
"name": output_name,
- "content_hash": output_hash,
+ "cid": output_cid,
},
"inputs": [
- {"content_hash": input_hash}
+ {"cid": input_hash}
],
"effects": [effect_info],
"infrastructure": {
- "software": {"name": "infra:artdag", "content_hash": REGISTRY["infra:artdag"]["hash"]},
- "hardware": {"name": "infra:giles-hp", "content_hash": REGISTRY["infra:giles-hp"]["hash"]}
+ "software": {"name": "infra:artdag", "cid": REGISTRY["infra:artdag"]["hash"]},
+ "hardware": {"name": "infra:giles-hp", "cid": REGISTRY["infra:giles-hp"]["hash"]}
}
}
@@ -329,10 +329,10 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
if not result.success:
raise RuntimeError(f"DAG execution failed: {result.error}")
- # Index all node outputs by content_hash and upload to IPFS
+ # Index all node outputs by cid and upload to IPFS
cache_manager = get_cache_manager()
- output_hash = None
- node_hashes = {} # node_id -> content_hash mapping
+ output_cid = None
+ node_hashes = {} # node_id -> cid mapping
node_ipfs_cids = {} # node_id -> ipfs_cid mapping
# Process all node results (intermediates + output)
@@ -341,9 +341,9 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
node = dag.nodes.get(node_id)
# Skip SOURCE nodes - they're already in cache
if node and (node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE"):
- content_hash = node.config.get("content_hash")
- if content_hash:
- node_hashes[node_id] = content_hash
+ cid = node.config.get("cid")
+ if cid:
+ node_hashes[node_id] = cid
continue
# Determine node type for cache metadata
@@ -353,20 +353,20 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
else:
cache_node_type = "dag_intermediate"
- # Store in cache_manager (indexes by content_hash, uploads to IPFS)
+ # Store in cache_manager (indexes by cid, uploads to IPFS)
cached, ipfs_cid = cache_manager.put(
Path(node_path),
node_type=cache_node_type,
node_id=node_id,
)
- node_hashes[node_id] = cached.content_hash
+ node_hashes[node_id] = cached.cid
if ipfs_cid:
node_ipfs_cids[node_id] = ipfs_cid
- logger.info(f"Cached node {node_id}: {cached.content_hash[:16]}... -> {ipfs_cid or 'no IPFS'}")
+ logger.info(f"Cached node {node_id}: {cached.cid[:16]}... -> {ipfs_cid or 'no IPFS'}")
# Get output hash from the output node
if result.output_path and result.output_path.exists():
- output_hash = file_hash(result.output_path)
+ output_cid = file_hash(result.output_path)
output_ipfs_cid = node_ipfs_cids.get(dag.output_id)
# Store output in database (for L2 to query IPFS CID)
@@ -376,14 +376,14 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
async def save_to_db():
if database.pool is None:
await database.init_db()
- await database.create_cache_item(output_hash, output_ipfs_cid)
+ await database.create_cache_item(output_cid, output_ipfs_cid)
# Also save the run result
if run_id:
input_hashes_for_db = [
- node.config.get("content_hash")
+ node.config.get("cid")
for node in dag.nodes.values()
if (node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE")
- and node.config.get("content_hash")
+ and node.config.get("cid")
]
# Get actor_id and recipe from pending_runs (saved when run started)
actor_id = None
@@ -395,7 +395,7 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
await database.save_run_cache(
run_id=run_id,
- output_hash=output_hash,
+ output_cid=output_cid,
recipe=recipe_name,
inputs=input_hashes_for_db,
ipfs_cid=output_ipfs_cid,
@@ -405,7 +405,7 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
# Save output as media for the user
if actor_id:
await database.save_item_metadata(
- content_hash=output_hash,
+ cid=output_cid,
actor_id=actor_id,
item_type="media",
description=f"Output from recipe: {recipe_name}",
@@ -431,9 +431,9 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
intermediate_hashes = []
for node_id, node in dag.nodes.items():
if node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE":
- content_hash = node.config.get("content_hash")
- if content_hash:
- input_hashes.append(content_hash)
+ cid = node.config.get("cid")
+ if cid:
+ input_hashes.append(cid)
elif node_id != dag.output_id and node_id in node_hashes:
intermediate_hashes.append(node_hashes[node_id])
@@ -441,9 +441,9 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
from artdag.activities import Activity
from datetime import datetime, timezone
activity = Activity(
- activity_id=run_id or f"dag-{output_hash[:16]}",
+ activity_id=run_id or f"dag-{output_cid[:16]}",
input_ids=sorted(input_hashes),
- output_id=output_hash,
+ output_id=output_cid,
intermediate_ids=intermediate_hashes,
created_at=datetime.now(timezone.utc).timestamp(),
status="completed",
@@ -454,23 +454,23 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
input_hashes_for_provenance = []
for node_id, node in dag.nodes.items():
if node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE":
- content_hash = node.config.get("content_hash")
- if content_hash:
- input_hashes_for_provenance.append({"content_hash": content_hash})
+ cid = node.config.get("cid")
+ if cid:
+ input_hashes_for_provenance.append({"cid": cid})
provenance = {
"task_id": self.request.id,
"run_id": run_id,
"rendered_at": datetime.now(timezone.utc).isoformat(),
"output": {
- "content_hash": output_hash,
+ "cid": output_cid,
"ipfs_cid": node_ipfs_cids.get(dag.output_id) if dag.output_id else None,
},
"inputs": input_hashes_for_provenance,
"dag": dag_json, # Full DAG definition
"nodes": {
node_id: {
- "content_hash": node_hashes.get(node_id),
+ "cid": node_hashes.get(node_id),
"ipfs_cid": node_ipfs_cids.get(node_id),
}
for node_id in dag.nodes.keys()
@@ -496,7 +496,7 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
return {
"success": True,
"run_id": run_id,
- "output_hash": output_hash,
+ "output_cid": output_cid,
"output_ipfs_cid": node_ipfs_cids.get(dag.output_id) if dag.output_id else None,
"output_path": str(result.output_path) if result.output_path else None,
"execution_time": result.execution_time,
@@ -505,7 +505,7 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
"node_results": {
node_id: str(path) for node_id, path in result.node_results.items()
},
- "node_hashes": node_hashes, # node_id -> content_hash
+ "node_hashes": node_hashes, # node_id -> cid
"node_ipfs_cids": node_ipfs_cids, # node_id -> ipfs_cid
"provenance_cid": provenance_cid,
}
@@ -526,10 +526,10 @@ def build_effect_dag(input_hashes: List[str], effect_name: str) -> DAG:
# Add source nodes for each input
source_ids = []
- for i, content_hash in enumerate(input_hashes):
+ for i, cid in enumerate(input_hashes):
source_node = Node(
node_type=NodeType.SOURCE,
- config={"content_hash": content_hash},
+ config={"cid": cid},
name=f"source_{i}",
)
dag.add_node(source_node)
diff --git a/server_legacy.py b/server_legacy.py
index f493ab0..1f76f37 100644
--- a/server_legacy.py
+++ b/server_legacy.py
@@ -5,7 +5,7 @@ Art DAG L1 Server
Manages rendering runs and provides access to the cache.
- POST /runs - start a run (recipe + inputs)
- GET /runs/{run_id} - get run status/result
-- GET /cache/{content_hash} - get cached content
+- GET /cache/{cid} - get cached content
"""
import asyncio
@@ -167,16 +167,16 @@ def list_all_runs() -> list["RunStatus"]:
return sorted(runs, key=lambda r: r.created_at, reverse=True)
-def find_runs_using_content(content_hash: str) -> list[tuple["RunStatus", str]]:
- """Find all runs that use a content_hash as input or output.
+def find_runs_using_content(cid: str) -> list[tuple["RunStatus", str]]:
+ """Find all runs that use a cid as input or output.
Returns list of (run, role) tuples where role is 'input' or 'output'.
"""
results = []
for run in list_all_runs():
- if run.inputs and content_hash in run.inputs:
+ if run.inputs and cid in run.inputs:
results.append((run, "input"))
- if run.output_hash == content_hash:
+ if run.output_cid == cid:
results.append((run, "output"))
return results
@@ -268,7 +268,7 @@ class RunStatus(BaseModel):
output_name: str
created_at: str
completed_at: Optional[str] = None
- output_hash: Optional[str] = None
+ output_cid: Optional[str] = None
output_ipfs_cid: Optional[str] = None # IPFS CID of output (IPFS_PRIMARY mode)
error: Optional[str] = None
celery_task_id: Optional[str] = None
@@ -298,7 +298,7 @@ class FixedInput(BaseModel):
"""A fixed input resolved from the registry."""
node_id: str
asset: str
- content_hash: str
+ cid: str
class RecipeStatus(BaseModel):
@@ -317,7 +317,7 @@ class RecipeStatus(BaseModel):
class RecipeRunRequest(BaseModel):
"""Request to run a recipe with variable inputs."""
- inputs: dict[str, str] # node_id -> content_hash
+ inputs: dict[str, str] # node_id -> cid
def save_recipe(recipe: RecipeStatus):
@@ -391,7 +391,7 @@ def parse_recipe_yaml(yaml_content: str, recipe_hash: str, uploader: str) -> Rec
fixed_inputs.append(FixedInput(
node_id=node_id,
asset=asset_name,
- content_hash=asset_info.get("hash", "")
+ cid=asset_info.get("hash", "")
))
return RecipeStatus(
@@ -555,13 +555,13 @@ async def cache_file(source: Path, node_type: str = "output") -> str:
"""
cached, ipfs_cid = cache_manager.put(source, node_type=node_type)
# Save to cache_items table (with IPFS CID)
- await database.create_cache_item(cached.content_hash, ipfs_cid)
- return cached.content_hash
+ await database.create_cache_item(cached.cid, ipfs_cid)
+ return cached.cid
-def get_cache_path(content_hash: str) -> Optional[Path]:
- """Get the path for a cached file by content_hash."""
- return cache_manager.get_by_content_hash(content_hash)
+def get_cache_path(cid: str) -> Optional[Path]:
+ """Get the path for a cached file by cid."""
+ return cache_manager.get_by_cid(cid)
@app.get("/api")
@@ -658,7 +658,7 @@ def render_home_html(actor_id: Optional[str] = None) -> str:
Provenance
Every render produces a provenance record linking inputs, effects, and infrastructure:
{{
- "output": {{"content_hash": "..."}},
+ "output": {{"cid": "..."}},
"inputs": [...],
"effects": [...],
"infrastructure": {{...}}
@@ -692,10 +692,10 @@ async def create_run(request: RunRequest, ctx: UserContext = Depends(get_require
# Check L1 cache first
cached_run = await database.get_run_cache(run_id)
if cached_run:
- output_hash = cached_run["output_hash"]
+ output_cid = cached_run["output_cid"]
# Verify the output file still exists in cache
- if cache_manager.has_content(output_hash):
- logger.info(f"create_run: Cache hit for run_id={run_id[:16]}... output={output_hash[:16]}...")
+ if cache_manager.has_content(output_cid):
+ logger.info(f"create_run: Cache hit for run_id={run_id[:16]}... output={output_cid[:16]}...")
return RunStatus(
run_id=run_id,
status="completed",
@@ -704,7 +704,7 @@ async def create_run(request: RunRequest, ctx: UserContext = Depends(get_require
output_name=output_name,
created_at=cached_run.get("created_at", datetime.now(timezone.utc).isoformat()),
completed_at=cached_run.get("created_at", datetime.now(timezone.utc).isoformat()),
- output_hash=output_hash,
+ output_cid=output_cid,
username=actor_id,
provenance_cid=cached_run.get("provenance_cid"),
)
@@ -720,31 +720,31 @@ async def create_run(request: RunRequest, ctx: UserContext = Depends(get_require
)
if l2_resp.status_code == 200:
l2_data = l2_resp.json()
- output_hash = l2_data.get("output_hash")
+ output_cid = l2_data.get("output_cid")
ipfs_cid = l2_data.get("ipfs_cid")
- if output_hash and ipfs_cid:
+ if output_cid and ipfs_cid:
logger.info(f"create_run: Found on L2, pulling from IPFS: {ipfs_cid}")
# Pull from IPFS to L1 cache
import ipfs_client
legacy_dir = CACHE_DIR / "legacy"
legacy_dir.mkdir(parents=True, exist_ok=True)
- recovery_path = legacy_dir / output_hash
+ recovery_path = legacy_dir / output_cid
if ipfs_client.get_file(ipfs_cid, str(recovery_path)):
# File retrieved - put() updates indexes, but file is already in legacy location
# Just update the content and IPFS indexes manually
- cache_manager._set_content_index(output_hash, output_hash)
- cache_manager._set_ipfs_index(output_hash, ipfs_cid)
+ cache_manager._set_content_index(output_cid, output_cid)
+ cache_manager._set_ipfs_index(output_cid, ipfs_cid)
# Save to run cache
await database.save_run_cache(
run_id=run_id,
- output_hash=output_hash,
+ output_cid=output_cid,
recipe=request.recipe,
inputs=request.inputs,
ipfs_cid=ipfs_cid,
provenance_cid=l2_data.get("provenance_cid"),
actor_id=actor_id,
)
- logger.info(f"create_run: Recovered from L2/IPFS: {output_hash[:16]}...")
+ logger.info(f"create_run: Recovered from L2/IPFS: {output_cid[:16]}...")
return RunStatus(
run_id=run_id,
status="completed",
@@ -753,7 +753,7 @@ async def create_run(request: RunRequest, ctx: UserContext = Depends(get_require
output_name=output_name,
created_at=datetime.now(timezone.utc).isoformat(),
completed_at=datetime.now(timezone.utc).isoformat(),
- output_hash=output_hash,
+ output_cid=output_cid,
username=actor_id,
provenance_cid=l2_data.get("provenance_cid"),
)
@@ -845,12 +845,12 @@ async def get_run(run_id: str):
step_id: {"cid": cid, "status": "completed"}
for step_id, cid in result.get("step_cids", {}).items()
}
- # Try to get content_hash from cache_id mapping in Redis
- # (cache_id is often the same as content_hash)
+ # Try to get cid from cache_id mapping in Redis
+ # (cache_id is often the same as cid)
output_path = None
- elif "output_hash" in result or "output_cache_id" in result:
+ elif "output_cid" in result or "output_cache_id" in result:
# New DAG/plan result format
- run.output_hash = result.get("output_hash") or result.get("output_cache_id")
+ run.output_cid = result.get("output_cid") or result.get("output_cache_id")
run.provenance_cid = result.get("provenance_cid")
output_path = Path(result.get("output_path", "")) if result.get("output_path") else None
# Store plan execution data
@@ -860,7 +860,7 @@ async def get_run(run_id: str):
run.all_outputs = result.get("outputs") # All outputs from all steps
elif "output" in result:
# Legacy render_effect format
- run.output_hash = result.get("output", {}).get("content_hash")
+ run.output_cid = result.get("output", {}).get("cid")
run.provenance_cid = result.get("provenance_cid")
output_path = Path(result.get("output", {}).get("local_path", ""))
@@ -874,27 +874,27 @@ async def get_run(run_id: str):
run.infrastructure = result.get("infrastructure")
# Cache the output (legacy mode - DAG/plan already caches via cache_manager)
- is_plan_result = "output_hash" in result or "output_cache_id" in result
+ is_plan_result = "output_cid" in result or "output_cache_id" in result
if output_path and output_path.exists() and not is_plan_result:
t0 = time.time()
await cache_file(output_path, node_type="effect_output")
logger.info(f"get_run: cache_file took {time.time()-t0:.3f}s")
# Record activity for deletion tracking (legacy mode)
- if run.output_hash and run.inputs:
+ if run.output_cid and run.inputs:
await asyncio.to_thread(
cache_manager.record_simple_activity,
input_hashes=run.inputs,
- output_hash=run.output_hash,
+ output_cid=run.output_cid,
run_id=run.run_id,
)
# Save to run cache for content-addressable lookup
- if run.output_hash:
- ipfs_cid = cache_manager._get_ipfs_cid_from_index(run.output_hash)
+ if run.output_cid:
+ ipfs_cid = cache_manager._get_ipfs_cid_from_index(run.output_cid)
await database.save_run_cache(
run_id=run.run_id,
- output_hash=run.output_hash,
+ output_cid=run.output_cid,
recipe=run.recipe,
inputs=run.inputs,
ipfs_cid=ipfs_cid,
@@ -936,11 +936,11 @@ async def discard_run(run_id: str, ctx: UserContext = Depends(get_required_user_
# Failed runs can always be deleted (no output to protect)
if run.status != "failed":
# Only check if output is pinned - inputs are preserved, not deleted
- if run.output_hash:
- meta = await database.load_item_metadata(run.output_hash, ctx.actor_id)
+ if run.output_cid:
+ meta = await database.load_item_metadata(run.output_cid, ctx.actor_id)
if meta.get("pinned"):
pin_reason = meta.get("pin_reason", "published")
- raise HTTPException(400, f"Cannot discard run: output {run.output_hash[:16]}... is pinned ({pin_reason})")
+ raise HTTPException(400, f"Cannot discard run: output {run.output_cid[:16]}... is pinned ({pin_reason})")
# Check if activity exists for this run
activity = await asyncio.to_thread(cache_manager.get_activity, run_id)
@@ -975,8 +975,8 @@ async def ui_discard_run(run_id: str, request: Request):
# Failed runs can always be deleted
if run.status != "failed":
# Only check if output is pinned - inputs are preserved, not deleted
- if run.output_hash:
- meta = await database.load_item_metadata(run.output_hash, ctx.actor_id)
+ if run.output_cid:
+ meta = await database.load_item_metadata(run.output_cid, ctx.actor_id)
if meta.get("pinned"):
pin_reason = meta.get("pin_reason", "published")
return f'Cannot discard: output is pinned ({pin_reason})
'
@@ -1019,7 +1019,7 @@ async def run_detail(run_id: str, request: Request):
if is_successful:
run.status = "completed"
run.completed_at = datetime.now(timezone.utc).isoformat()
- run.output_hash = result.get("output", {}).get("content_hash")
+ run.output_cid = result.get("output", {}).get("cid")
effects = result.get("effects", [])
if effects:
run.effects_commit = effects[0].get("repo_commit")
@@ -1029,11 +1029,11 @@ async def run_detail(run_id: str, request: Request):
if output_path.exists():
await cache_file(output_path)
# Save to run cache for content-addressable lookup
- if run.output_hash:
- ipfs_cid = cache_manager._get_ipfs_cid_from_index(run.output_hash)
+ if run.output_cid:
+ ipfs_cid = cache_manager._get_ipfs_cid_from_index(run.output_cid)
await database.save_run_cache(
run_id=run.run_id,
- output_hash=run.output_hash,
+ output_cid=run.output_cid,
recipe=run.recipe,
inputs=run.inputs,
ipfs_cid=ipfs_cid,
@@ -1093,7 +1093,7 @@ async def run_detail(run_id: str, request: Request):
# Build media HTML for inputs and output
media_html = ""
available_inputs = [inp for inp in run.inputs if cache_manager.has_content(inp)]
- has_output = run.status == "completed" and run.output_hash and cache_manager.has_content(run.output_hash)
+ has_output = run.status == "completed" and run.output_cid and cache_manager.has_content(run.output_cid)
has_ipfs_output = run.status == "completed" and run.output_ipfs_cid and not has_output
if available_inputs or has_output or has_ipfs_output:
@@ -1121,19 +1121,19 @@ async def run_detail(run_id: str, request: Request):