Add synchronous database functions for streaming

- resolve_friendly_name_sync: for resolving friendly names in sync code
- get_ipfs_cid_sync: for looking up IPFS CIDs in sync code

These are needed for video streaming callbacks that can't use async/await.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
giles
2026-02-04 20:29:37 +00:00
parent 76e4a002a0
commit cdbc962b12

View File

@@ -1884,3 +1884,102 @@ async def update_friendly_name_cid(actor_id: str, old_cid: str, new_cid: str) ->
actor_id, old_cid, new_cid
)
return "UPDATE 1" in result
# =============================================================================
# SYNCHRONOUS DATABASE FUNCTIONS (for use from non-async contexts like video streaming)
# =============================================================================
def resolve_friendly_name_sync(
actor_id: str,
name: str,
item_type: Optional[str] = None,
) -> Optional[str]:
"""
Synchronous version of resolve_friendly_name using psycopg2.
Useful when calling from synchronous code (e.g., video streaming callbacks)
where async/await is not possible.
Returns CID or None if not found.
"""
import psycopg2
parts = name.strip().split(' ')
base_name = parts[0]
version_id = parts[1] if len(parts) > 1 else None
try:
conn = psycopg2.connect(DATABASE_URL)
cursor = conn.cursor()
if version_id:
# Exact version lookup
if item_type:
query = """
SELECT cid FROM friendly_names
WHERE actor_id = %s AND base_name = %s AND version_id = %s AND item_type = %s
"""
cursor.execute(query, (actor_id, base_name, version_id, item_type))
else:
query = """
SELECT cid FROM friendly_names
WHERE actor_id = %s AND base_name = %s AND version_id = %s
"""
cursor.execute(query, (actor_id, base_name, version_id))
else:
# Latest version lookup
if item_type:
query = """
SELECT cid FROM friendly_names
WHERE actor_id = %s AND base_name = %s AND item_type = %s
ORDER BY created_at DESC LIMIT 1
"""
cursor.execute(query, (actor_id, base_name, item_type))
else:
query = """
SELECT cid FROM friendly_names
WHERE actor_id = %s AND base_name = %s
ORDER BY created_at DESC LIMIT 1
"""
cursor.execute(query, (actor_id, base_name))
result = cursor.fetchone()
cursor.close()
conn.close()
return result[0] if result else None
except Exception as e:
import sys
print(f"resolve_friendly_name_sync ERROR: {e}", file=sys.stderr)
return None
def get_ipfs_cid_sync(cid: str) -> Optional[str]:
"""
Synchronous version of get_ipfs_cid using psycopg2.
Returns the IPFS CID for a given internal CID, or None if not found.
"""
import psycopg2
try:
conn = psycopg2.connect(DATABASE_URL)
cursor = conn.cursor()
cursor.execute(
"SELECT ipfs_cid FROM cache_items WHERE cid = %s",
(cid,)
)
result = cursor.fetchone()
cursor.close()
conn.close()
return result[0] if result else None
except Exception as e:
import sys
print(f"get_ipfs_cid_sync ERROR: {e}", file=sys.stderr)
return None