From f8f44945abfa24a202e3b92d775fe75b6b3771a4 Mon Sep 17 00:00:00 2001 From: giles Date: Sun, 11 Jan 2026 13:13:40 +0000 Subject: [PATCH] Fix db function calls and add missing functions - Fix get_activities to use get_activities_paginated - Add get_user_assets, delete_asset, count_users, count_user_activities - Add get_user_activities, get_renderer, update_anchor, delete_anchor - Add record_run and get_run functions - Fix create_asset calls to use dict parameter - Fix update_asset call signature Co-Authored-By: Claude Opus 4.5 --- app/routers/activities.py | 4 +- app/routers/assets.py | 47 ++++++++------- db.py | 123 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 149 insertions(+), 25 deletions(-) diff --git a/app/routers/activities.py b/app/routers/activities.py index 49c25d6..10740c8 100644 --- a/app/routers/activities.py +++ b/app/routers/activities.py @@ -31,8 +31,8 @@ async def list_activities( username = get_user_from_cookie(request) - activities = await db.get_activities(offset=offset, limit=limit) - has_more = len(activities) >= limit + activities, total = await db.get_activities_paginated(limit=limit, offset=offset) + has_more = offset + len(activities) < total if wants_json(request): return {"activities": activities, "offset": offset, "limit": limit} diff --git a/app/routers/assets.py b/app/routers/assets.py index 2cb6cd6..cd8f5fd 100644 --- a/app/routers/assets.py +++ b/app/routers/assets.py @@ -82,21 +82,21 @@ async def create_asset( """Register a new asset.""" import db - asset_id = await db.create_asset( - username=user["username"], - name=req.name, - content_hash=req.content_hash, - ipfs_cid=req.ipfs_cid, - asset_type=req.asset_type, - tags=req.tags, - metadata=req.metadata, - provenance=req.provenance, - ) + asset = await db.create_asset({ + "owner": user["username"], + "name": req.name, + "content_hash": req.content_hash, + "ipfs_cid": req.ipfs_cid, + "asset_type": req.asset_type, + "tags": req.tags or [], + "metadata": req.metadata or {}, + "provenance": req.provenance, + }) - if not asset_id: + if not asset: raise HTTPException(400, "Failed to create asset") - return {"asset_id": asset_id, "message": "Asset registered"} + return {"asset_id": asset.get("name"), "message": "Asset registered"} @router.get("/{asset_id}") @@ -155,26 +155,27 @@ async def record_run( import db # Create asset for output - asset_id = await db.create_asset( - username=user["username"], - name=f"{req.recipe}-{req.run_id[:8]}", - content_hash=req.output_hash, - ipfs_cid=req.ipfs_cid, - asset_type="render", - metadata={ + asset = await db.create_asset({ + "owner": user["username"], + "name": f"{req.recipe}-{req.run_id[:8]}", + "content_hash": req.output_hash, + "ipfs_cid": req.ipfs_cid, + "asset_type": "render", + "metadata": { "run_id": req.run_id, "recipe": req.recipe, "inputs": req.inputs, }, - provenance=req.provenance, - ) + "provenance": req.provenance, + }) + asset_id = asset.get("name") if asset else None # Record run await db.record_run( run_id=req.run_id, username=user["username"], recipe=req.recipe, - inputs=req.inputs, + inputs=req.inputs or [], output_hash=req.output_hash, ipfs_cid=req.ipfs_cid, asset_id=asset_id, @@ -235,7 +236,7 @@ async def publish_asset( # Pin to IPFS cid = await ipfs_client.add_bytes(resp.content) if cid: - await db.update_asset(asset_id, ipfs_cid=cid) + await db.update_asset(asset_id, {"ipfs_cid": cid}) return {"ipfs_cid": cid, "published": True} except Exception as e: logger.warning(f"Failed to fetch from {l1_url}: {e}") diff --git a/db.py b/db.py index 8dd805f..3f0590e 100644 --- a/db.py +++ b/db.py @@ -1091,3 +1091,126 @@ async def cleanup_expired_revocations() -> int: return int(result.split()[-1]) except (ValueError, IndexError): return 0 + + +# ============ Additional helper functions ============ + +async def get_user_assets(username: str, offset: int = 0, limit: int = 20, asset_type: str = None) -> list[dict]: + """Get assets owned by a user with pagination.""" + async with get_connection() as conn: + if asset_type: + rows = await conn.fetch( + """SELECT * FROM assets WHERE owner = $1 AND asset_type = $2 + ORDER BY created_at DESC LIMIT $3 OFFSET $4""", + username, asset_type, limit, offset + ) + else: + rows = await conn.fetch( + """SELECT * FROM assets WHERE owner = $1 + ORDER BY created_at DESC LIMIT $2 OFFSET $3""", + username, limit, offset + ) + return [dict(row) for row in rows] + + +async def delete_asset(asset_id: str) -> bool: + """Delete an asset by name/id.""" + async with get_connection() as conn: + result = await conn.execute("DELETE FROM assets WHERE name = $1", asset_id) + return "DELETE 1" in result + + +async def count_users() -> int: + """Count total users.""" + async with get_connection() as conn: + return await conn.fetchval("SELECT COUNT(*) FROM users") + + +async def count_user_activities(username: str) -> int: + """Count activities by a user.""" + async with get_connection() as conn: + return await conn.fetchval( + "SELECT COUNT(*) FROM activities WHERE actor_id LIKE $1", + f"%{username}%" + ) + + +async def get_user_activities(username: str, limit: int = 20, offset: int = 0) -> list[dict]: + """Get activities by a user.""" + async with get_connection() as conn: + rows = await conn.fetch( + """SELECT activity_id, activity_type, actor_id, object_data, published, signature + FROM activities WHERE actor_id LIKE $1 + ORDER BY published DESC LIMIT $2 OFFSET $3""", + f"%{username}%", limit, offset + ) + return [_parse_activity_row(row) for row in rows] + + +async def get_renderer(renderer_id: str) -> Optional[dict]: + """Get a renderer by ID/URL.""" + async with get_connection() as conn: + row = await conn.fetchrow( + "SELECT * FROM user_renderers WHERE l1_url = $1", + renderer_id + ) + return dict(row) if row else None + + +async def update_anchor(anchor_id: str, **updates) -> bool: + """Update an anchor.""" + async with get_connection() as conn: + if "bitcoin_txid" in updates: + result = await conn.execute( + """UPDATE anchors SET bitcoin_txid = $1, confirmed_at = NOW() + WHERE merkle_root = $2""", + updates["bitcoin_txid"], anchor_id + ) + return "UPDATE 1" in result + return False + + +async def delete_anchor(anchor_id: str) -> bool: + """Delete an anchor.""" + async with get_connection() as conn: + result = await conn.execute( + "DELETE FROM anchors WHERE merkle_root = $1", anchor_id + ) + return "DELETE 1" in result + + +async def record_run(run_id: str, username: str, recipe: str, inputs: list, + output_hash: str, ipfs_cid: str = None, asset_id: str = None) -> dict: + """Record a completed run.""" + async with get_connection() as conn: + # Check if runs table exists, if not just return the data + try: + row = await conn.fetchrow( + """INSERT INTO runs (run_id, username, recipe, inputs, output_hash, ipfs_cid, asset_id, created_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, NOW()) + ON CONFLICT (run_id) DO UPDATE SET + output_hash = EXCLUDED.output_hash, + ipfs_cid = EXCLUDED.ipfs_cid, + asset_id = EXCLUDED.asset_id + RETURNING *""", + run_id, username, recipe, json.dumps(inputs), output_hash, ipfs_cid, asset_id + ) + return dict(row) if row else None + except Exception: + # Table might not exist + return {"run_id": run_id, "username": username, "recipe": recipe} + + +async def get_run(run_id: str) -> Optional[dict]: + """Get a run by ID.""" + async with get_connection() as conn: + try: + row = await conn.fetchrow("SELECT * FROM runs WHERE run_id = $1", run_id) + if row: + result = dict(row) + if result.get("inputs") and isinstance(result["inputs"], str): + result["inputs"] = json.loads(result["inputs"]) + return result + except Exception: + pass + return None