Rename content_hash/output_hash to cid throughout

Refactor to use IPFS CID as the primary content identifier:
- Update database schema: content_hash -> cid, output_hash -> output_cid
- Update all services, routers, and tasks to use cid terminology
- Update HTML templates to display CID instead of hash
- Update cache_manager parameter names
- Update README documentation

This completes the transition to CID-only content addressing.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gilesb
2026-01-12 08:02:44 +00:00
parent 494a2a8650
commit 92d26b2b72
22 changed files with 981 additions and 988 deletions

View File

@@ -19,7 +19,7 @@ SCHEMA_SQL = """
-- Core cache: just the content CID and IPFS CID
-- Physical file storage - shared by all users
CREATE TABLE IF NOT EXISTS cache_items (
content_hash VARCHAR(64) PRIMARY KEY,
cid VARCHAR(64) PRIMARY KEY,
ipfs_cid VARCHAR(128),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
@@ -28,7 +28,7 @@ CREATE TABLE IF NOT EXISTS cache_items (
-- actor_id format: @username@server (ActivityPub style)
CREATE TABLE IF NOT EXISTS item_types (
id SERIAL PRIMARY KEY,
content_hash VARCHAR(64) REFERENCES cache_items(content_hash) ON DELETE CASCADE,
cid VARCHAR(64) REFERENCES cache_items(cid) ON DELETE CASCADE,
actor_id VARCHAR(255) NOT NULL,
type VARCHAR(50) NOT NULL,
path VARCHAR(255),
@@ -40,7 +40,7 @@ CREATE TABLE IF NOT EXISTS item_types (
filename VARCHAR(255),
metadata JSONB DEFAULT '{}',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
UNIQUE(content_hash, actor_id, type, path)
UNIQUE(cid, actor_id, type, path)
);
-- Add columns if they don't exist (for existing databases)
@@ -61,7 +61,7 @@ CREATE TABLE IF NOT EXISTS pin_reasons (
-- L2 shares: per-user shares (includes content_type for role when shared)
CREATE TABLE IF NOT EXISTS l2_shares (
id SERIAL PRIMARY KEY,
content_hash VARCHAR(64) REFERENCES cache_items(content_hash) ON DELETE CASCADE,
cid VARCHAR(64) REFERENCES cache_items(cid) ON DELETE CASCADE,
actor_id VARCHAR(255) NOT NULL,
l2_server VARCHAR(255) NOT NULL,
asset_name VARCHAR(255) NOT NULL,
@@ -69,7 +69,7 @@ CREATE TABLE IF NOT EXISTS l2_shares (
content_type VARCHAR(50) NOT NULL,
published_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
last_synced_at TIMESTAMP WITH TIME ZONE,
UNIQUE(content_hash, actor_id, l2_server, content_type)
UNIQUE(cid, actor_id, l2_server, content_type)
);
-- Add activity_id column if it doesn't exist (for existing databases)
@@ -82,7 +82,7 @@ END $$;
-- run_id is a hash of (sorted inputs + recipe), making runs deterministic
CREATE TABLE IF NOT EXISTS run_cache (
run_id VARCHAR(64) PRIMARY KEY,
output_hash VARCHAR(64) NOT NULL,
output_cid VARCHAR(64) NOT NULL,
ipfs_cid VARCHAR(128),
provenance_cid VARCHAR(128),
recipe VARCHAR(255) NOT NULL,
@@ -128,27 +128,27 @@ CREATE TABLE IF NOT EXISTS storage_backends (
-- Storage pins tracking (what's pinned where)
CREATE TABLE IF NOT EXISTS storage_pins (
id SERIAL PRIMARY KEY,
content_hash VARCHAR(64) NOT NULL,
cid VARCHAR(64) NOT NULL,
storage_id INTEGER NOT NULL REFERENCES storage_backends(id) ON DELETE CASCADE,
ipfs_cid VARCHAR(128),
pin_type VARCHAR(20) NOT NULL, -- 'user_content', 'donated', 'system'
size_bytes BIGINT,
pinned_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
UNIQUE(content_hash, storage_id)
UNIQUE(cid, storage_id)
);
-- Indexes
CREATE INDEX IF NOT EXISTS idx_item_types_content_hash ON item_types(content_hash);
CREATE INDEX IF NOT EXISTS idx_item_types_cid ON item_types(cid);
CREATE INDEX IF NOT EXISTS idx_item_types_actor_id ON item_types(actor_id);
CREATE INDEX IF NOT EXISTS idx_item_types_type ON item_types(type);
CREATE INDEX IF NOT EXISTS idx_item_types_path ON item_types(path);
CREATE INDEX IF NOT EXISTS idx_pin_reasons_item_type ON pin_reasons(item_type_id);
CREATE INDEX IF NOT EXISTS idx_l2_shares_content_hash ON l2_shares(content_hash);
CREATE INDEX IF NOT EXISTS idx_l2_shares_cid ON l2_shares(cid);
CREATE INDEX IF NOT EXISTS idx_l2_shares_actor_id ON l2_shares(actor_id);
CREATE INDEX IF NOT EXISTS idx_run_cache_output ON run_cache(output_hash);
CREATE INDEX IF NOT EXISTS idx_run_cache_output ON run_cache(output_cid);
CREATE INDEX IF NOT EXISTS idx_storage_backends_actor ON storage_backends(actor_id);
CREATE INDEX IF NOT EXISTS idx_storage_backends_type ON storage_backends(provider_type);
CREATE INDEX IF NOT EXISTS idx_storage_pins_hash ON storage_pins(content_hash);
CREATE INDEX IF NOT EXISTS idx_storage_pins_hash ON storage_pins(cid);
CREATE INDEX IF NOT EXISTS idx_storage_pins_storage ON storage_pins(storage_id);
"""
@@ -171,47 +171,47 @@ async def close_db():
# ============ Cache Items ============
async def create_cache_item(content_hash: str, ipfs_cid: Optional[str] = None) -> dict:
async def create_cache_item(cid: str, ipfs_cid: Optional[str] = None) -> dict:
"""Create a cache item. Returns the created item."""
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO cache_items (content_hash, ipfs_cid)
INSERT INTO cache_items (cid, ipfs_cid)
VALUES ($1, $2)
ON CONFLICT (content_hash) DO UPDATE SET ipfs_cid = COALESCE($2, cache_items.ipfs_cid)
RETURNING content_hash, ipfs_cid, created_at
ON CONFLICT (cid) DO UPDATE SET ipfs_cid = COALESCE($2, cache_items.ipfs_cid)
RETURNING cid, ipfs_cid, created_at
""",
content_hash, ipfs_cid
cid, ipfs_cid
)
return dict(row)
async def get_cache_item(content_hash: str) -> Optional[dict]:
async def get_cache_item(cid: str) -> Optional[dict]:
"""Get a cache item by content hash."""
async with pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT content_hash, ipfs_cid, created_at FROM cache_items WHERE content_hash = $1",
content_hash
"SELECT cid, ipfs_cid, created_at FROM cache_items WHERE cid = $1",
cid
)
return dict(row) if row else None
async def update_cache_item_ipfs_cid(content_hash: str, ipfs_cid: str) -> bool:
async def update_cache_item_ipfs_cid(cid: str, ipfs_cid: str) -> bool:
"""Update the IPFS CID for a cache item."""
async with pool.acquire() as conn:
result = await conn.execute(
"UPDATE cache_items SET ipfs_cid = $2 WHERE content_hash = $1",
content_hash, ipfs_cid
"UPDATE cache_items SET ipfs_cid = $2 WHERE cid = $1",
cid, ipfs_cid
)
return result == "UPDATE 1"
async def delete_cache_item(content_hash: str) -> bool:
async def delete_cache_item(cid: str) -> bool:
"""Delete a cache item and all associated data (cascades)."""
async with pool.acquire() as conn:
result = await conn.execute(
"DELETE FROM cache_items WHERE content_hash = $1",
content_hash
"DELETE FROM cache_items WHERE cid = $1",
cid
)
return result == "DELETE 1"
@@ -221,7 +221,7 @@ async def list_cache_items(limit: int = 100, offset: int = 0) -> List[dict]:
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT content_hash, ipfs_cid, created_at
SELECT cid, ipfs_cid, created_at
FROM cache_items
ORDER BY created_at DESC
LIMIT $1 OFFSET $2
@@ -234,7 +234,7 @@ async def list_cache_items(limit: int = 100, offset: int = 0) -> List[dict]:
# ============ Item Types ============
async def add_item_type(
content_hash: str,
cid: str,
actor_id: str,
item_type: str,
path: Optional[str] = None,
@@ -247,72 +247,72 @@ async def add_item_type(
async with pool.acquire() as conn:
# Ensure cache_item exists
await conn.execute(
"INSERT INTO cache_items (content_hash) VALUES ($1) ON CONFLICT DO NOTHING",
content_hash
"INSERT INTO cache_items (cid) VALUES ($1) ON CONFLICT DO NOTHING",
cid
)
# Insert or update item_type
row = await conn.fetchrow(
"""
INSERT INTO item_types (content_hash, actor_id, type, path, description, source_type, source_url, source_note)
INSERT INTO item_types (cid, actor_id, type, path, description, source_type, source_url, source_note)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (content_hash, actor_id, type, path) DO UPDATE SET
ON CONFLICT (cid, actor_id, type, path) DO UPDATE SET
description = COALESCE($5, item_types.description),
source_type = COALESCE($6, item_types.source_type),
source_url = COALESCE($7, item_types.source_url),
source_note = COALESCE($8, item_types.source_note)
RETURNING id, content_hash, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
RETURNING id, cid, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
""",
content_hash, actor_id, item_type, path, description, source_type, source_url, source_note
cid, actor_id, item_type, path, description, source_type, source_url, source_note
)
return dict(row)
async def get_item_types(content_hash: str, actor_id: Optional[str] = None) -> List[dict]:
async def get_item_types(cid: str, actor_id: Optional[str] = None) -> List[dict]:
"""Get types for a cache item, optionally filtered by user."""
async with pool.acquire() as conn:
if actor_id:
rows = await conn.fetch(
"""
SELECT id, content_hash, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
SELECT id, cid, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
FROM item_types
WHERE content_hash = $1 AND actor_id = $2
WHERE cid = $1 AND actor_id = $2
ORDER BY created_at
""",
content_hash, actor_id
cid, actor_id
)
else:
rows = await conn.fetch(
"""
SELECT id, content_hash, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
SELECT id, cid, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
FROM item_types
WHERE content_hash = $1
WHERE cid = $1
ORDER BY created_at
""",
content_hash
cid
)
return [dict(row) for row in rows]
async def get_item_type(content_hash: str, actor_id: str, item_type: str, path: Optional[str] = None) -> Optional[dict]:
async def get_item_type(cid: str, actor_id: str, item_type: str, path: Optional[str] = None) -> Optional[dict]:
"""Get a specific type for a cache item and user."""
async with pool.acquire() as conn:
if path is None:
row = await conn.fetchrow(
"""
SELECT id, content_hash, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
SELECT id, cid, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
FROM item_types
WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path IS NULL
WHERE cid = $1 AND actor_id = $2 AND type = $3 AND path IS NULL
""",
content_hash, actor_id, item_type
cid, actor_id, item_type
)
else:
row = await conn.fetchrow(
"""
SELECT id, content_hash, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
SELECT id, cid, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
FROM item_types
WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path = $4
WHERE cid = $1 AND actor_id = $2 AND type = $3 AND path = $4
""",
content_hash, actor_id, item_type, path
cid, actor_id, item_type, path
)
return dict(row) if row else None
@@ -340,18 +340,18 @@ async def update_item_type(
return result == "UPDATE 1"
async def delete_item_type(content_hash: str, actor_id: str, item_type: str, path: Optional[str] = None) -> bool:
async def delete_item_type(cid: str, actor_id: str, item_type: str, path: Optional[str] = None) -> bool:
"""Delete a specific type from a cache item for a user."""
async with pool.acquire() as conn:
if path is None:
result = await conn.execute(
"DELETE FROM item_types WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path IS NULL",
content_hash, actor_id, item_type
"DELETE FROM item_types WHERE cid = $1 AND actor_id = $2 AND type = $3 AND path IS NULL",
cid, actor_id, item_type
)
else:
result = await conn.execute(
"DELETE FROM item_types WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path = $4",
content_hash, actor_id, item_type, path
"DELETE FROM item_types WHERE cid = $1 AND actor_id = $2 AND type = $3 AND path = $4",
cid, actor_id, item_type, path
)
return result == "DELETE 1"
@@ -362,11 +362,11 @@ async def list_items_by_type(item_type: str, actor_id: Optional[str] = None, lim
if actor_id:
rows = await conn.fetch(
"""
SELECT it.id, it.content_hash, it.actor_id, it.type, it.path, it.description,
SELECT it.id, it.cid, it.actor_id, it.type, it.path, it.description,
it.source_type, it.source_url, it.source_note, it.pinned, it.created_at,
ci.ipfs_cid
FROM item_types it
JOIN cache_items ci ON it.content_hash = ci.content_hash
JOIN cache_items ci ON it.cid = ci.cid
WHERE it.type = $1 AND it.actor_id = $2
ORDER BY it.created_at DESC
LIMIT $3 OFFSET $4
@@ -376,11 +376,11 @@ async def list_items_by_type(item_type: str, actor_id: Optional[str] = None, lim
else:
rows = await conn.fetch(
"""
SELECT it.id, it.content_hash, it.actor_id, it.type, it.path, it.description,
SELECT it.id, it.cid, it.actor_id, it.type, it.path, it.description,
it.source_type, it.source_url, it.source_note, it.pinned, it.created_at,
ci.ipfs_cid
FROM item_types it
JOIN cache_items ci ON it.content_hash = ci.content_hash
JOIN cache_items ci ON it.cid = ci.cid
WHERE it.type = $1
ORDER BY it.created_at DESC
LIMIT $2 OFFSET $3
@@ -396,11 +396,11 @@ async def get_item_by_path(item_type: str, path: str, actor_id: Optional[str] =
if actor_id:
row = await conn.fetchrow(
"""
SELECT it.id, it.content_hash, it.actor_id, it.type, it.path, it.description,
SELECT it.id, it.cid, it.actor_id, it.type, it.path, it.description,
it.source_type, it.source_url, it.source_note, it.pinned, it.created_at,
ci.ipfs_cid
FROM item_types it
JOIN cache_items ci ON it.content_hash = ci.content_hash
JOIN cache_items ci ON it.cid = ci.cid
WHERE it.type = $1 AND it.path = $2 AND it.actor_id = $3
""",
item_type, path, actor_id
@@ -408,11 +408,11 @@ async def get_item_by_path(item_type: str, path: str, actor_id: Optional[str] =
else:
row = await conn.fetchrow(
"""
SELECT it.id, it.content_hash, it.actor_id, it.type, it.path, it.description,
SELECT it.id, it.cid, it.actor_id, it.type, it.path, it.description,
it.source_type, it.source_url, it.source_note, it.pinned, it.created_at,
ci.ipfs_cid
FROM item_types it
JOIN cache_items ci ON it.content_hash = ci.content_hash
JOIN cache_items ci ON it.cid = ci.cid
WHERE it.type = $1 AND it.path = $2
""",
item_type, path
@@ -480,7 +480,7 @@ async def get_pin_reasons(item_type_id: int) -> List[dict]:
return [dict(row) for row in rows]
async def is_item_pinned(content_hash: str, item_type: Optional[str] = None) -> tuple[bool, List[str]]:
async def is_item_pinned(cid: str, item_type: Optional[str] = None) -> tuple[bool, List[str]]:
"""Check if any type of a cache item is pinned. Returns (is_pinned, reasons)."""
async with pool.acquire() as conn:
if item_type:
@@ -489,9 +489,9 @@ async def is_item_pinned(content_hash: str, item_type: Optional[str] = None) ->
SELECT pr.reason
FROM pin_reasons pr
JOIN item_types it ON pr.item_type_id = it.id
WHERE it.content_hash = $1 AND it.type = $2 AND it.pinned = TRUE
WHERE it.cid = $1 AND it.type = $2 AND it.pinned = TRUE
""",
content_hash, item_type
cid, item_type
)
else:
rows = await conn.fetch(
@@ -499,9 +499,9 @@ async def is_item_pinned(content_hash: str, item_type: Optional[str] = None) ->
SELECT pr.reason
FROM pin_reasons pr
JOIN item_types it ON pr.item_type_id = it.id
WHERE it.content_hash = $1 AND it.pinned = TRUE
WHERE it.cid = $1 AND it.pinned = TRUE
""",
content_hash
cid
)
reasons = [row["reason"] for row in rows]
return len(reasons) > 0, reasons
@@ -510,7 +510,7 @@ async def is_item_pinned(content_hash: str, item_type: Optional[str] = None) ->
# ============ L2 Shares ============
async def add_l2_share(
content_hash: str,
cid: str,
actor_id: str,
l2_server: str,
asset_name: str,
@@ -520,85 +520,85 @@ async def add_l2_share(
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO l2_shares (content_hash, actor_id, l2_server, asset_name, content_type, last_synced_at)
INSERT INTO l2_shares (cid, actor_id, l2_server, asset_name, content_type, last_synced_at)
VALUES ($1, $2, $3, $4, $5, NOW())
ON CONFLICT (content_hash, actor_id, l2_server, content_type) DO UPDATE SET
ON CONFLICT (cid, actor_id, l2_server, content_type) DO UPDATE SET
asset_name = $4,
last_synced_at = NOW()
RETURNING id, content_hash, actor_id, l2_server, asset_name, content_type, published_at, last_synced_at
RETURNING id, cid, actor_id, l2_server, asset_name, content_type, published_at, last_synced_at
""",
content_hash, actor_id, l2_server, asset_name, content_type
cid, actor_id, l2_server, asset_name, content_type
)
return dict(row)
async def get_l2_shares(content_hash: str, actor_id: Optional[str] = None) -> List[dict]:
async def get_l2_shares(cid: str, actor_id: Optional[str] = None) -> List[dict]:
"""Get L2 shares for a cache item, optionally filtered by user."""
async with pool.acquire() as conn:
if actor_id:
rows = await conn.fetch(
"""
SELECT id, content_hash, actor_id, l2_server, asset_name, activity_id, content_type, published_at, last_synced_at
SELECT id, cid, actor_id, l2_server, asset_name, activity_id, content_type, published_at, last_synced_at
FROM l2_shares
WHERE content_hash = $1 AND actor_id = $2
WHERE cid = $1 AND actor_id = $2
ORDER BY published_at
""",
content_hash, actor_id
cid, actor_id
)
else:
rows = await conn.fetch(
"""
SELECT id, content_hash, actor_id, l2_server, asset_name, activity_id, content_type, published_at, last_synced_at
SELECT id, cid, actor_id, l2_server, asset_name, activity_id, content_type, published_at, last_synced_at
FROM l2_shares
WHERE content_hash = $1
WHERE cid = $1
ORDER BY published_at
""",
content_hash
cid
)
return [dict(row) for row in rows]
async def delete_l2_share(content_hash: str, actor_id: str, l2_server: str, content_type: str) -> bool:
async def delete_l2_share(cid: str, actor_id: str, l2_server: str, content_type: str) -> bool:
"""Delete an L2 share for a user."""
async with pool.acquire() as conn:
result = await conn.execute(
"DELETE FROM l2_shares WHERE content_hash = $1 AND actor_id = $2 AND l2_server = $3 AND content_type = $4",
content_hash, actor_id, l2_server, content_type
"DELETE FROM l2_shares WHERE cid = $1 AND actor_id = $2 AND l2_server = $3 AND content_type = $4",
cid, actor_id, l2_server, content_type
)
return result == "DELETE 1"
# ============ Cache Item Cleanup ============
async def has_remaining_references(content_hash: str) -> bool:
async def has_remaining_references(cid: str) -> bool:
"""Check if a cache item has any remaining item_types or l2_shares."""
async with pool.acquire() as conn:
item_types_count = await conn.fetchval(
"SELECT COUNT(*) FROM item_types WHERE content_hash = $1",
content_hash
"SELECT COUNT(*) FROM item_types WHERE cid = $1",
cid
)
if item_types_count > 0:
return True
l2_shares_count = await conn.fetchval(
"SELECT COUNT(*) FROM l2_shares WHERE content_hash = $1",
content_hash
"SELECT COUNT(*) FROM l2_shares WHERE cid = $1",
cid
)
return l2_shares_count > 0
async def cleanup_orphaned_cache_item(content_hash: str) -> bool:
async def cleanup_orphaned_cache_item(cid: str) -> bool:
"""Delete a cache item if it has no remaining references. Returns True if deleted."""
async with pool.acquire() as conn:
# Only delete if no item_types or l2_shares reference it
result = await conn.execute(
"""
DELETE FROM cache_items
WHERE content_hash = $1
AND NOT EXISTS (SELECT 1 FROM item_types WHERE content_hash = $1)
AND NOT EXISTS (SELECT 1 FROM l2_shares WHERE content_hash = $1)
WHERE cid = $1
AND NOT EXISTS (SELECT 1 FROM item_types WHERE cid = $1)
AND NOT EXISTS (SELECT 1 FROM l2_shares WHERE cid = $1)
""",
content_hash
cid
)
return result == "DELETE 1"
@@ -610,7 +610,7 @@ import json as _json
async def save_item_metadata(
content_hash: str,
cid: str,
actor_id: str,
item_type: str = "media",
filename: Optional[str] = None,
@@ -643,16 +643,16 @@ async def save_item_metadata(
async with pool.acquire() as conn:
# Ensure cache_item exists
await conn.execute(
"INSERT INTO cache_items (content_hash) VALUES ($1) ON CONFLICT DO NOTHING",
content_hash
"INSERT INTO cache_items (cid) VALUES ($1) ON CONFLICT DO NOTHING",
cid
)
# Upsert item_type
row = await conn.fetchrow(
"""
INSERT INTO item_types (content_hash, actor_id, type, description, source_type, source_url, source_note, pinned, filename, metadata)
INSERT INTO item_types (cid, actor_id, type, description, source_type, source_url, source_note, pinned, filename, metadata)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (content_hash, actor_id, type, path) DO UPDATE SET
ON CONFLICT (cid, actor_id, type, path) DO UPDATE SET
description = COALESCE(EXCLUDED.description, item_types.description),
source_type = COALESCE(EXCLUDED.source_type, item_types.source_type),
source_url = COALESCE(EXCLUDED.source_url, item_types.source_url),
@@ -660,9 +660,9 @@ async def save_item_metadata(
pinned = EXCLUDED.pinned,
filename = COALESCE(EXCLUDED.filename, item_types.filename),
metadata = item_types.metadata || EXCLUDED.metadata
RETURNING id, content_hash, actor_id, type, path, description, source_type, source_url, source_note, pinned, filename, metadata, created_at
RETURNING id, cid, actor_id, type, path, description, source_type, source_url, source_note, pinned, filename, metadata, created_at
""",
content_hash, actor_id, item_type, description, source_type, source_url, source_note, pinned, filename, _json.dumps(metadata)
cid, actor_id, item_type, description, source_type, source_url, source_note, pinned, filename, _json.dumps(metadata)
)
item_type_id = row["id"]
@@ -719,7 +719,7 @@ async def save_item_metadata(
return result
async def load_item_metadata(content_hash: str, actor_id: Optional[str] = None) -> dict:
async def load_item_metadata(cid: str, actor_id: Optional[str] = None) -> dict:
"""
Load item metadata from the database.
@@ -731,8 +731,8 @@ async def load_item_metadata(content_hash: str, actor_id: Optional[str] = None)
async with pool.acquire() as conn:
# Get cache item
cache_item = await conn.fetchrow(
"SELECT content_hash, ipfs_cid, created_at FROM cache_items WHERE content_hash = $1",
content_hash
"SELECT cid, ipfs_cid, created_at FROM cache_items WHERE cid = $1",
cid
)
if not cache_item:
@@ -743,19 +743,19 @@ async def load_item_metadata(content_hash: str, actor_id: Optional[str] = None)
item_types = await conn.fetch(
"""
SELECT id, actor_id, type, path, description, source_type, source_url, source_note, pinned, filename, metadata, created_at
FROM item_types WHERE content_hash = $1 AND actor_id = $2
FROM item_types WHERE cid = $1 AND actor_id = $2
ORDER BY created_at
""",
content_hash, actor_id
cid, actor_id
)
else:
item_types = await conn.fetch(
"""
SELECT id, actor_id, type, path, description, source_type, source_url, source_note, pinned, filename, metadata, created_at
FROM item_types WHERE content_hash = $1
FROM item_types WHERE cid = $1
ORDER BY created_at
""",
content_hash
cid
)
if not item_types:
@@ -807,17 +807,17 @@ async def load_item_metadata(content_hash: str, actor_id: Optional[str] = None)
shares = await conn.fetch(
"""
SELECT l2_server, asset_name, activity_id, content_type, published_at, last_synced_at
FROM l2_shares WHERE content_hash = $1 AND actor_id = $2
FROM l2_shares WHERE cid = $1 AND actor_id = $2
""",
content_hash, actor_id
cid, actor_id
)
else:
shares = await conn.fetch(
"""
SELECT l2_server, asset_name, activity_id, content_type, published_at, last_synced_at
FROM l2_shares WHERE content_hash = $1
FROM l2_shares WHERE cid = $1
""",
content_hash
cid
)
if shares:
@@ -845,7 +845,7 @@ async def load_item_metadata(content_hash: str, actor_id: Optional[str] = None)
async def update_item_metadata(
content_hash: str,
cid: str,
actor_id: str,
item_type: str = "media",
**updates
@@ -880,15 +880,15 @@ async def update_item_metadata(
existing = await conn.fetchrow(
"""
SELECT id, metadata FROM item_types
WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path IS NULL
WHERE cid = $1 AND actor_id = $2 AND type = $3 AND path IS NULL
""",
content_hash, actor_id, item_type
cid, actor_id, item_type
)
if not existing:
# Create new entry
return await save_item_metadata(
content_hash, actor_id, item_type,
cid, actor_id, item_type,
filename=filename, description=description,
source_type=source_type, source_url=source_url, source_note=source_note,
pinned=pinned or False, pin_reason=pin_reason,
@@ -898,7 +898,7 @@ async def update_item_metadata(
# Build update query dynamically
set_parts = []
params = [content_hash, actor_id, item_type]
params = [cid, actor_id, item_type]
param_idx = 4
if description is not None:
@@ -949,7 +949,7 @@ async def update_item_metadata(
if set_parts:
query = f"""
UPDATE item_types SET {', '.join(set_parts)}
WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path IS NULL
WHERE cid = $1 AND actor_id = $2 AND type = $3 AND path IS NULL
"""
await conn.execute(query, *params)
@@ -964,11 +964,11 @@ async def update_item_metadata(
existing["id"], pin_reason
)
return await load_item_metadata(content_hash, actor_id)
return await load_item_metadata(cid, actor_id)
async def save_l2_share(
content_hash: str,
cid: str,
actor_id: str,
l2_server: str,
asset_name: str,
@@ -979,15 +979,15 @@ async def save_l2_share(
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO l2_shares (content_hash, actor_id, l2_server, asset_name, activity_id, content_type, last_synced_at)
INSERT INTO l2_shares (cid, actor_id, l2_server, asset_name, activity_id, content_type, last_synced_at)
VALUES ($1, $2, $3, $4, $5, $6, NOW())
ON CONFLICT (content_hash, actor_id, l2_server, content_type) DO UPDATE SET
ON CONFLICT (cid, actor_id, l2_server, content_type) DO UPDATE SET
asset_name = EXCLUDED.asset_name,
activity_id = COALESCE(EXCLUDED.activity_id, l2_shares.activity_id),
last_synced_at = NOW()
RETURNING l2_server, asset_name, activity_id, content_type, published_at, last_synced_at
""",
content_hash, actor_id, l2_server, asset_name, activity_id, content_type
cid, actor_id, l2_server, asset_name, activity_id, content_type
)
return {
"l2_server": row["l2_server"],
@@ -1000,19 +1000,19 @@ async def save_l2_share(
async def get_user_items(actor_id: str, item_type: Optional[str] = None, limit: int = 100, offset: int = 0) -> List[dict]:
"""Get all items for a user, optionally filtered by type. Deduplicates by content_hash."""
"""Get all items for a user, optionally filtered by type. Deduplicates by cid."""
async with pool.acquire() as conn:
if item_type:
rows = await conn.fetch(
"""
SELECT * FROM (
SELECT DISTINCT ON (it.content_hash)
it.content_hash, it.type, it.description, it.filename, it.pinned, it.created_at,
SELECT DISTINCT ON (it.cid)
it.cid, it.type, it.description, it.filename, it.pinned, it.created_at,
ci.ipfs_cid
FROM item_types it
JOIN cache_items ci ON it.content_hash = ci.content_hash
JOIN cache_items ci ON it.cid = ci.cid
WHERE it.actor_id = $1 AND it.type = $2
ORDER BY it.content_hash, it.created_at DESC
ORDER BY it.cid, it.created_at DESC
) deduped
ORDER BY created_at DESC
LIMIT $3 OFFSET $4
@@ -1023,13 +1023,13 @@ async def get_user_items(actor_id: str, item_type: Optional[str] = None, limit:
rows = await conn.fetch(
"""
SELECT * FROM (
SELECT DISTINCT ON (it.content_hash)
it.content_hash, it.type, it.description, it.filename, it.pinned, it.created_at,
SELECT DISTINCT ON (it.cid)
it.cid, it.type, it.description, it.filename, it.pinned, it.created_at,
ci.ipfs_cid
FROM item_types it
JOIN cache_items ci ON it.content_hash = ci.content_hash
JOIN cache_items ci ON it.cid = ci.cid
WHERE it.actor_id = $1
ORDER BY it.content_hash, it.created_at DESC
ORDER BY it.cid, it.created_at DESC
) deduped
ORDER BY created_at DESC
LIMIT $2 OFFSET $3
@@ -1039,7 +1039,7 @@ async def get_user_items(actor_id: str, item_type: Optional[str] = None, limit:
return [
{
"content_hash": r["content_hash"],
"cid": r["cid"],
"type": r["type"],
"description": r["description"],
"filename": r["filename"],
@@ -1052,16 +1052,16 @@ async def get_user_items(actor_id: str, item_type: Optional[str] = None, limit:
async def count_user_items(actor_id: str, item_type: Optional[str] = None) -> int:
"""Count unique items (by content_hash) for a user."""
"""Count unique items (by cid) for a user."""
async with pool.acquire() as conn:
if item_type:
return await conn.fetchval(
"SELECT COUNT(DISTINCT content_hash) FROM item_types WHERE actor_id = $1 AND type = $2",
"SELECT COUNT(DISTINCT cid) FROM item_types WHERE actor_id = $1 AND type = $2",
actor_id, item_type
)
else:
return await conn.fetchval(
"SELECT COUNT(DISTINCT content_hash) FROM item_types WHERE actor_id = $1",
"SELECT COUNT(DISTINCT cid) FROM item_types WHERE actor_id = $1",
actor_id
)
@@ -1073,7 +1073,7 @@ async def get_run_cache(run_id: str) -> Optional[dict]:
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT run_id, output_hash, ipfs_cid, provenance_cid, recipe, inputs, actor_id, created_at
SELECT run_id, output_cid, ipfs_cid, provenance_cid, recipe, inputs, actor_id, created_at
FROM run_cache WHERE run_id = $1
""",
run_id
@@ -1081,7 +1081,7 @@ async def get_run_cache(run_id: str) -> Optional[dict]:
if row:
return {
"run_id": row["run_id"],
"output_hash": row["output_hash"],
"output_cid": row["output_cid"],
"ipfs_cid": row["ipfs_cid"],
"provenance_cid": row["provenance_cid"],
"recipe": row["recipe"],
@@ -1094,7 +1094,7 @@ async def get_run_cache(run_id: str) -> Optional[dict]:
async def save_run_cache(
run_id: str,
output_hash: str,
output_cid: str,
recipe: str,
inputs: List[str],
ipfs_cid: Optional[str] = None,
@@ -1105,19 +1105,19 @@ async def save_run_cache(
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO run_cache (run_id, output_hash, ipfs_cid, provenance_cid, recipe, inputs, actor_id)
INSERT INTO run_cache (run_id, output_cid, ipfs_cid, provenance_cid, recipe, inputs, actor_id)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (run_id) DO UPDATE SET
output_hash = EXCLUDED.output_hash,
output_cid = EXCLUDED.output_cid,
ipfs_cid = COALESCE(EXCLUDED.ipfs_cid, run_cache.ipfs_cid),
provenance_cid = COALESCE(EXCLUDED.provenance_cid, run_cache.provenance_cid)
RETURNING run_id, output_hash, ipfs_cid, provenance_cid, recipe, inputs, actor_id, created_at
RETURNING run_id, output_cid, ipfs_cid, provenance_cid, recipe, inputs, actor_id, created_at
""",
run_id, output_hash, ipfs_cid, provenance_cid, recipe, _json.dumps(inputs), actor_id
run_id, output_cid, ipfs_cid, provenance_cid, recipe, _json.dumps(inputs), actor_id
)
return {
"run_id": row["run_id"],
"output_hash": row["output_hash"],
"output_cid": row["output_cid"],
"ipfs_cid": row["ipfs_cid"],
"provenance_cid": row["provenance_cid"],
"recipe": row["recipe"],
@@ -1127,20 +1127,20 @@ async def save_run_cache(
}
async def get_run_by_output(output_hash: str) -> Optional[dict]:
async def get_run_by_output(output_cid: str) -> Optional[dict]:
"""Get run cache entry by output hash."""
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT run_id, output_hash, ipfs_cid, provenance_cid, recipe, inputs, actor_id, created_at
FROM run_cache WHERE output_hash = $1
SELECT run_id, output_cid, ipfs_cid, provenance_cid, recipe, inputs, actor_id, created_at
FROM run_cache WHERE output_cid = $1
""",
output_hash
output_cid
)
if row:
return {
"run_id": row["run_id"],
"output_hash": row["output_hash"],
"output_cid": row["output_cid"],
"ipfs_cid": row["ipfs_cid"],
"provenance_cid": row["provenance_cid"],
"recipe": row["recipe"],
@@ -1173,7 +1173,7 @@ async def list_runs_by_actor(actor_id: str, offset: int = 0, limit: int = 20) ->
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT run_id, output_hash, ipfs_cid, provenance_cid, recipe, inputs, actor_id, created_at
SELECT run_id, output_cid, ipfs_cid, provenance_cid, recipe, inputs, actor_id, created_at
FROM run_cache
WHERE actor_id = $1
ORDER BY created_at DESC
@@ -1184,7 +1184,7 @@ async def list_runs_by_actor(actor_id: str, offset: int = 0, limit: int = 20) ->
return [
{
"run_id": row["run_id"],
"output_hash": row["output_hash"],
"output_cid": row["output_cid"],
"ipfs_cid": row["ipfs_cid"],
"provenance_cid": row["provenance_cid"],
"recipe": row["recipe"],
@@ -1348,7 +1348,7 @@ async def get_all_active_storage() -> List[dict]:
async def add_storage_pin(
content_hash: str,
cid: str,
storage_id: int,
ipfs_cid: Optional[str],
pin_type: str,
@@ -1358,40 +1358,40 @@ async def add_storage_pin(
async with pool.acquire() as conn:
try:
row = await conn.fetchrow(
"""INSERT INTO storage_pins (content_hash, storage_id, ipfs_cid, pin_type, size_bytes)
"""INSERT INTO storage_pins (cid, storage_id, ipfs_cid, pin_type, size_bytes)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (content_hash, storage_id) DO UPDATE SET
ON CONFLICT (cid, storage_id) DO UPDATE SET
ipfs_cid = EXCLUDED.ipfs_cid,
pin_type = EXCLUDED.pin_type,
size_bytes = EXCLUDED.size_bytes,
pinned_at = NOW()
RETURNING id""",
content_hash, storage_id, ipfs_cid, pin_type, size_bytes
cid, storage_id, ipfs_cid, pin_type, size_bytes
)
return row["id"] if row else None
except Exception:
return None
async def remove_storage_pin(content_hash: str, storage_id: int) -> bool:
async def remove_storage_pin(cid: str, storage_id: int) -> bool:
"""Remove a pin record."""
async with pool.acquire() as conn:
result = await conn.execute(
"DELETE FROM storage_pins WHERE content_hash = $1 AND storage_id = $2",
content_hash, storage_id
"DELETE FROM storage_pins WHERE cid = $1 AND storage_id = $2",
cid, storage_id
)
return "DELETE 1" in result
async def get_pins_for_content(content_hash: str) -> List[dict]:
async def get_pins_for_content(cid: str) -> List[dict]:
"""Get all storage locations where content is pinned."""
async with pool.acquire() as conn:
rows = await conn.fetch(
"""SELECT sp.*, sb.provider_type, sb.provider_name, sb.actor_id
FROM storage_pins sp
JOIN storage_backends sb ON sp.storage_id = sb.id
WHERE sp.content_hash = $1""",
content_hash
WHERE sp.cid = $1""",
cid
)
return [dict(row) for row in rows]