Fix db function calls and add missing functions

- Fix get_activities to use get_activities_paginated
- Add get_user_assets, delete_asset, count_users, count_user_activities
- Add get_user_activities, get_renderer, update_anchor, delete_anchor
- Add record_run and get_run functions
- Fix create_asset calls to use dict parameter
- Fix update_asset call signature

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
giles
2026-01-11 13:13:40 +00:00
parent 65994ac107
commit f8f44945ab
3 changed files with 149 additions and 25 deletions

View File

@@ -31,8 +31,8 @@ async def list_activities(
username = get_user_from_cookie(request)
activities = await db.get_activities(offset=offset, limit=limit)
has_more = len(activities) >= limit
activities, total = await db.get_activities_paginated(limit=limit, offset=offset)
has_more = offset + len(activities) < total
if wants_json(request):
return {"activities": activities, "offset": offset, "limit": limit}

View File

@@ -82,21 +82,21 @@ async def create_asset(
"""Register a new asset."""
import db
asset_id = await db.create_asset(
username=user["username"],
name=req.name,
content_hash=req.content_hash,
ipfs_cid=req.ipfs_cid,
asset_type=req.asset_type,
tags=req.tags,
metadata=req.metadata,
provenance=req.provenance,
)
asset = await db.create_asset({
"owner": user["username"],
"name": req.name,
"content_hash": req.content_hash,
"ipfs_cid": req.ipfs_cid,
"asset_type": req.asset_type,
"tags": req.tags or [],
"metadata": req.metadata or {},
"provenance": req.provenance,
})
if not asset_id:
if not asset:
raise HTTPException(400, "Failed to create asset")
return {"asset_id": asset_id, "message": "Asset registered"}
return {"asset_id": asset.get("name"), "message": "Asset registered"}
@router.get("/{asset_id}")
@@ -155,26 +155,27 @@ async def record_run(
import db
# Create asset for output
asset_id = await db.create_asset(
username=user["username"],
name=f"{req.recipe}-{req.run_id[:8]}",
content_hash=req.output_hash,
ipfs_cid=req.ipfs_cid,
asset_type="render",
metadata={
asset = await db.create_asset({
"owner": user["username"],
"name": f"{req.recipe}-{req.run_id[:8]}",
"content_hash": req.output_hash,
"ipfs_cid": req.ipfs_cid,
"asset_type": "render",
"metadata": {
"run_id": req.run_id,
"recipe": req.recipe,
"inputs": req.inputs,
},
provenance=req.provenance,
)
"provenance": req.provenance,
})
asset_id = asset.get("name") if asset else None
# Record run
await db.record_run(
run_id=req.run_id,
username=user["username"],
recipe=req.recipe,
inputs=req.inputs,
inputs=req.inputs or [],
output_hash=req.output_hash,
ipfs_cid=req.ipfs_cid,
asset_id=asset_id,
@@ -235,7 +236,7 @@ async def publish_asset(
# Pin to IPFS
cid = await ipfs_client.add_bytes(resp.content)
if cid:
await db.update_asset(asset_id, ipfs_cid=cid)
await db.update_asset(asset_id, {"ipfs_cid": cid})
return {"ipfs_cid": cid, "published": True}
except Exception as e:
logger.warning(f"Failed to fetch from {l1_url}: {e}")

123
db.py
View File

@@ -1091,3 +1091,126 @@ async def cleanup_expired_revocations() -> int:
return int(result.split()[-1])
except (ValueError, IndexError):
return 0
# ============ Additional helper functions ============
async def get_user_assets(username: str, offset: int = 0, limit: int = 20, asset_type: str = None) -> list[dict]:
"""Get assets owned by a user with pagination."""
async with get_connection() as conn:
if asset_type:
rows = await conn.fetch(
"""SELECT * FROM assets WHERE owner = $1 AND asset_type = $2
ORDER BY created_at DESC LIMIT $3 OFFSET $4""",
username, asset_type, limit, offset
)
else:
rows = await conn.fetch(
"""SELECT * FROM assets WHERE owner = $1
ORDER BY created_at DESC LIMIT $2 OFFSET $3""",
username, limit, offset
)
return [dict(row) for row in rows]
async def delete_asset(asset_id: str) -> bool:
"""Delete an asset by name/id."""
async with get_connection() as conn:
result = await conn.execute("DELETE FROM assets WHERE name = $1", asset_id)
return "DELETE 1" in result
async def count_users() -> int:
"""Count total users."""
async with get_connection() as conn:
return await conn.fetchval("SELECT COUNT(*) FROM users")
async def count_user_activities(username: str) -> int:
"""Count activities by a user."""
async with get_connection() as conn:
return await conn.fetchval(
"SELECT COUNT(*) FROM activities WHERE actor_id LIKE $1",
f"%{username}%"
)
async def get_user_activities(username: str, limit: int = 20, offset: int = 0) -> list[dict]:
"""Get activities by a user."""
async with get_connection() as conn:
rows = await conn.fetch(
"""SELECT activity_id, activity_type, actor_id, object_data, published, signature
FROM activities WHERE actor_id LIKE $1
ORDER BY published DESC LIMIT $2 OFFSET $3""",
f"%{username}%", limit, offset
)
return [_parse_activity_row(row) for row in rows]
async def get_renderer(renderer_id: str) -> Optional[dict]:
"""Get a renderer by ID/URL."""
async with get_connection() as conn:
row = await conn.fetchrow(
"SELECT * FROM user_renderers WHERE l1_url = $1",
renderer_id
)
return dict(row) if row else None
async def update_anchor(anchor_id: str, **updates) -> bool:
"""Update an anchor."""
async with get_connection() as conn:
if "bitcoin_txid" in updates:
result = await conn.execute(
"""UPDATE anchors SET bitcoin_txid = $1, confirmed_at = NOW()
WHERE merkle_root = $2""",
updates["bitcoin_txid"], anchor_id
)
return "UPDATE 1" in result
return False
async def delete_anchor(anchor_id: str) -> bool:
"""Delete an anchor."""
async with get_connection() as conn:
result = await conn.execute(
"DELETE FROM anchors WHERE merkle_root = $1", anchor_id
)
return "DELETE 1" in result
async def record_run(run_id: str, username: str, recipe: str, inputs: list,
output_hash: str, ipfs_cid: str = None, asset_id: str = None) -> dict:
"""Record a completed run."""
async with get_connection() as conn:
# Check if runs table exists, if not just return the data
try:
row = await conn.fetchrow(
"""INSERT INTO runs (run_id, username, recipe, inputs, output_hash, ipfs_cid, asset_id, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
ON CONFLICT (run_id) DO UPDATE SET
output_hash = EXCLUDED.output_hash,
ipfs_cid = EXCLUDED.ipfs_cid,
asset_id = EXCLUDED.asset_id
RETURNING *""",
run_id, username, recipe, json.dumps(inputs), output_hash, ipfs_cid, asset_id
)
return dict(row) if row else None
except Exception:
# Table might not exist
return {"run_id": run_id, "username": username, "recipe": recipe}
async def get_run(run_id: str) -> Optional[dict]:
"""Get a run by ID."""
async with get_connection() as conn:
try:
row = await conn.fetchrow("SELECT * FROM runs WHERE run_id = $1", run_id)
if row:
result = dict(row)
if result.get("inputs") and isinstance(result["inputs"], str):
result["inputs"] = json.loads(result["inputs"])
return result
except Exception:
pass
return None