diff --git a/database.py b/database.py index b0b21e8..00fe672 100644 --- a/database.py +++ b/database.py @@ -37,10 +37,19 @@ CREATE TABLE IF NOT EXISTS item_types ( source_url TEXT, source_note TEXT, pinned BOOLEAN DEFAULT FALSE, + filename VARCHAR(255), + metadata JSONB DEFAULT '{}', created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), UNIQUE(content_hash, actor_id, type, path) ); +-- Add columns if they don't exist (for existing databases) +DO $$ BEGIN + ALTER TABLE item_types ADD COLUMN IF NOT EXISTS filename VARCHAR(255); + ALTER TABLE item_types ADD COLUMN IF NOT EXISTS metadata JSONB DEFAULT '{}'; +EXCEPTION WHEN others THEN NULL; +END $$; + -- Pin reasons: one-to-many from item_types CREATE TABLE IF NOT EXISTS pin_reasons ( id SERIAL PRIMARY KEY, @@ -521,3 +530,453 @@ async def cleanup_orphaned_cache_item(content_hash: str) -> bool: content_hash ) return result == "DELETE 1" + + +# ============ High-Level Metadata Functions ============ +# These provide a compatible interface to the old JSON-based save_cache_meta/load_cache_meta + +import json as _json + + +async def save_item_metadata( + content_hash: str, + actor_id: str, + item_type: str = "media", + filename: Optional[str] = None, + description: Optional[str] = None, + source_type: Optional[str] = None, + source_url: Optional[str] = None, + source_note: Optional[str] = None, + pinned: bool = False, + pin_reason: Optional[str] = None, + tags: Optional[List[str]] = None, + folder: Optional[str] = None, + collections: Optional[List[str]] = None, + **extra_metadata +) -> dict: + """ + Save or update item metadata in the database. + + Returns a dict with the item metadata (compatible with old JSON format). + """ + # Build metadata JSONB for extra fields + metadata = {} + if tags: + metadata["tags"] = tags + if folder: + metadata["folder"] = folder + if collections: + metadata["collections"] = collections + metadata.update(extra_metadata) + + async with pool.acquire() as conn: + # Ensure cache_item exists + await conn.execute( + "INSERT INTO cache_items (content_hash) VALUES ($1) ON CONFLICT DO NOTHING", + content_hash + ) + + # Upsert item_type + row = await conn.fetchrow( + """ + INSERT INTO item_types (content_hash, actor_id, type, description, source_type, source_url, source_note, pinned, filename, metadata) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + ON CONFLICT (content_hash, actor_id, type, path) DO UPDATE SET + description = COALESCE(EXCLUDED.description, item_types.description), + source_type = COALESCE(EXCLUDED.source_type, item_types.source_type), + source_url = COALESCE(EXCLUDED.source_url, item_types.source_url), + source_note = COALESCE(EXCLUDED.source_note, item_types.source_note), + pinned = EXCLUDED.pinned, + filename = COALESCE(EXCLUDED.filename, item_types.filename), + metadata = item_types.metadata || EXCLUDED.metadata + RETURNING id, content_hash, actor_id, type, path, description, source_type, source_url, source_note, pinned, filename, metadata, created_at + """, + content_hash, actor_id, item_type, description, source_type, source_url, source_note, pinned, filename, _json.dumps(metadata) + ) + + item_type_id = row["id"] + + # Handle pinning + if pinned and pin_reason: + # Add pin reason if not exists + await conn.execute( + """ + INSERT INTO pin_reasons (item_type_id, reason) + VALUES ($1, $2) + ON CONFLICT DO NOTHING + """, + item_type_id, pin_reason + ) + + # Build response dict (compatible with old format) + result = { + "uploader": actor_id, + "uploaded_at": row["created_at"].isoformat() if row["created_at"] else None, + "filename": row["filename"], + "type": row["type"], + "description": row["description"], + "pinned": row["pinned"], + } + + # Add origin if present + if row["source_type"] or row["source_url"] or row["source_note"]: + result["origin"] = { + "type": row["source_type"], + "url": row["source_url"], + "note": row["source_note"] + } + + # Add metadata fields + if row["metadata"]: + meta = row["metadata"] if isinstance(row["metadata"], dict) else _json.loads(row["metadata"]) + if meta.get("tags"): + result["tags"] = meta["tags"] + if meta.get("folder"): + result["folder"] = meta["folder"] + if meta.get("collections"): + result["collections"] = meta["collections"] + + # Get pin reasons + if row["pinned"]: + reasons = await conn.fetch( + "SELECT reason FROM pin_reasons WHERE item_type_id = $1", + item_type_id + ) + if reasons: + result["pin_reason"] = reasons[0]["reason"] + + return result + + +async def load_item_metadata(content_hash: str, actor_id: Optional[str] = None) -> dict: + """ + Load item metadata from the database. + + If actor_id is provided, returns metadata for that user's view of the item. + Otherwise, returns combined metadata from all users (for backwards compat). + + Returns a dict compatible with old JSON format. + """ + async with pool.acquire() as conn: + # Get cache item + cache_item = await conn.fetchrow( + "SELECT content_hash, ipfs_cid, created_at FROM cache_items WHERE content_hash = $1", + content_hash + ) + + if not cache_item: + return {} + + # Get item types + if actor_id: + item_types = await conn.fetch( + """ + SELECT id, actor_id, type, path, description, source_type, source_url, source_note, pinned, filename, metadata, created_at + FROM item_types WHERE content_hash = $1 AND actor_id = $2 + ORDER BY created_at + """, + content_hash, actor_id + ) + else: + item_types = await conn.fetch( + """ + SELECT id, actor_id, type, path, description, source_type, source_url, source_note, pinned, filename, metadata, created_at + FROM item_types WHERE content_hash = $1 + ORDER BY created_at + """, + content_hash + ) + + if not item_types: + return {"uploaded_at": cache_item["created_at"].isoformat() if cache_item["created_at"] else None} + + # Use first item type as primary (for backwards compat) + primary = item_types[0] + + result = { + "uploader": primary["actor_id"], + "uploaded_at": primary["created_at"].isoformat() if primary["created_at"] else None, + "filename": primary["filename"], + "type": primary["type"], + "description": primary["description"], + "pinned": any(it["pinned"] for it in item_types), + } + + # Add origin if present + if primary["source_type"] or primary["source_url"] or primary["source_note"]: + result["origin"] = { + "type": primary["source_type"], + "url": primary["source_url"], + "note": primary["source_note"] + } + + # Add metadata fields + if primary["metadata"]: + meta = primary["metadata"] if isinstance(primary["metadata"], dict) else _json.loads(primary["metadata"]) + if meta.get("tags"): + result["tags"] = meta["tags"] + if meta.get("folder"): + result["folder"] = meta["folder"] + if meta.get("collections"): + result["collections"] = meta["collections"] + + # Get pin reasons for pinned items + for it in item_types: + if it["pinned"]: + reasons = await conn.fetch( + "SELECT reason FROM pin_reasons WHERE item_type_id = $1", + it["id"] + ) + if reasons: + result["pin_reason"] = reasons[0]["reason"] + break + + # Get L2 shares + if actor_id: + shares = await conn.fetch( + """ + SELECT l2_server, asset_name, content_type, published_at, last_synced_at + FROM l2_shares WHERE content_hash = $1 AND actor_id = $2 + """, + content_hash, actor_id + ) + else: + shares = await conn.fetch( + """ + SELECT l2_server, asset_name, content_type, published_at, last_synced_at + FROM l2_shares WHERE content_hash = $1 + """, + content_hash + ) + + if shares: + result["l2_shares"] = [ + { + "l2_server": s["l2_server"], + "asset_name": s["asset_name"], + "content_type": s["content_type"], + "published_at": s["published_at"].isoformat() if s["published_at"] else None, + "last_synced_at": s["last_synced_at"].isoformat() if s["last_synced_at"] else None, + } + for s in shares + ] + + # For backwards compat, also set "published" if shared + result["published"] = { + "to_l2": True, + "asset_name": shares[0]["asset_name"], + "l2_server": shares[0]["l2_server"], + } + + return result + + +async def update_item_metadata( + content_hash: str, + actor_id: str, + item_type: str = "media", + **updates +) -> dict: + """ + Update specific fields of item metadata. + + Returns updated metadata dict. + """ + # Extract known fields from updates + description = updates.pop("description", None) + source_type = updates.pop("source_type", None) + source_url = updates.pop("source_url", None) + source_note = updates.pop("source_note", None) + + # Handle origin dict format + origin = updates.pop("origin", None) + if origin: + source_type = origin.get("type", source_type) + source_url = origin.get("url", source_url) + source_note = origin.get("note", source_note) + + pinned = updates.pop("pinned", None) + pin_reason = updates.pop("pin_reason", None) + filename = updates.pop("filename", None) + tags = updates.pop("tags", None) + folder = updates.pop("folder", None) + collections = updates.pop("collections", None) + + async with pool.acquire() as conn: + # Get existing item_type + existing = await conn.fetchrow( + """ + SELECT id, metadata FROM item_types + WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path IS NULL + """, + content_hash, actor_id, item_type + ) + + if not existing: + # Create new entry + return await save_item_metadata( + content_hash, actor_id, item_type, + filename=filename, description=description, + source_type=source_type, source_url=source_url, source_note=source_note, + pinned=pinned or False, pin_reason=pin_reason, + tags=tags, folder=folder, collections=collections, + **updates + ) + + # Build update query dynamically + set_parts = [] + params = [content_hash, actor_id, item_type] + param_idx = 4 + + if description is not None: + set_parts.append(f"description = ${param_idx}") + params.append(description) + param_idx += 1 + + if source_type is not None: + set_parts.append(f"source_type = ${param_idx}") + params.append(source_type) + param_idx += 1 + + if source_url is not None: + set_parts.append(f"source_url = ${param_idx}") + params.append(source_url) + param_idx += 1 + + if source_note is not None: + set_parts.append(f"source_note = ${param_idx}") + params.append(source_note) + param_idx += 1 + + if pinned is not None: + set_parts.append(f"pinned = ${param_idx}") + params.append(pinned) + param_idx += 1 + + if filename is not None: + set_parts.append(f"filename = ${param_idx}") + params.append(filename) + param_idx += 1 + + # Handle metadata updates + current_metadata = existing["metadata"] if isinstance(existing["metadata"], dict) else (_json.loads(existing["metadata"]) if existing["metadata"] else {}) + if tags is not None: + current_metadata["tags"] = tags + if folder is not None: + current_metadata["folder"] = folder + if collections is not None: + current_metadata["collections"] = collections + current_metadata.update(updates) + + if current_metadata: + set_parts.append(f"metadata = ${param_idx}") + params.append(_json.dumps(current_metadata)) + param_idx += 1 + + if set_parts: + query = f""" + UPDATE item_types SET {', '.join(set_parts)} + WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path IS NULL + """ + await conn.execute(query, *params) + + # Handle pin reason + if pinned and pin_reason: + await conn.execute( + """ + INSERT INTO pin_reasons (item_type_id, reason) + VALUES ($1, $2) + ON CONFLICT DO NOTHING + """, + existing["id"], pin_reason + ) + + return await load_item_metadata(content_hash, actor_id) + + +async def save_l2_share( + content_hash: str, + actor_id: str, + l2_server: str, + asset_name: str, + content_type: str = "media" +) -> dict: + """Save an L2 share and return share info.""" + async with pool.acquire() as conn: + row = await conn.fetchrow( + """ + INSERT INTO l2_shares (content_hash, actor_id, l2_server, asset_name, content_type, last_synced_at) + VALUES ($1, $2, $3, $4, $5, NOW()) + ON CONFLICT (content_hash, actor_id, l2_server, content_type) DO UPDATE SET + asset_name = EXCLUDED.asset_name, + last_synced_at = NOW() + RETURNING l2_server, asset_name, content_type, published_at, last_synced_at + """, + content_hash, actor_id, l2_server, asset_name, content_type + ) + return { + "l2_server": row["l2_server"], + "asset_name": row["asset_name"], + "content_type": row["content_type"], + "published_at": row["published_at"].isoformat() if row["published_at"] else None, + "last_synced_at": row["last_synced_at"].isoformat() if row["last_synced_at"] else None, + } + + +async def get_user_items(actor_id: str, item_type: Optional[str] = None, limit: int = 100, offset: int = 0) -> List[dict]: + """Get all items for a user, optionally filtered by type.""" + async with pool.acquire() as conn: + if item_type: + rows = await conn.fetch( + """ + SELECT it.content_hash, it.type, it.description, it.filename, it.pinned, it.created_at, + ci.ipfs_cid + FROM item_types it + JOIN cache_items ci ON it.content_hash = ci.content_hash + WHERE it.actor_id = $1 AND it.type = $2 + ORDER BY it.created_at DESC + LIMIT $3 OFFSET $4 + """, + actor_id, item_type, limit, offset + ) + else: + rows = await conn.fetch( + """ + SELECT it.content_hash, it.type, it.description, it.filename, it.pinned, it.created_at, + ci.ipfs_cid + FROM item_types it + JOIN cache_items ci ON it.content_hash = ci.content_hash + WHERE it.actor_id = $1 + ORDER BY it.created_at DESC + LIMIT $2 OFFSET $3 + """, + actor_id, limit, offset + ) + + return [ + { + "content_hash": r["content_hash"], + "type": r["type"], + "description": r["description"], + "filename": r["filename"], + "pinned": r["pinned"], + "created_at": r["created_at"].isoformat() if r["created_at"] else None, + "ipfs_cid": r["ipfs_cid"], + } + for r in rows + ] + + +async def count_user_items(actor_id: str, item_type: Optional[str] = None) -> int: + """Count items for a user.""" + async with pool.acquire() as conn: + if item_type: + return await conn.fetchval( + "SELECT COUNT(*) FROM item_types WHERE actor_id = $1 AND type = $2", + actor_id, item_type + ) + else: + return await conn.fetchval( + "SELECT COUNT(*) FROM item_types WHERE actor_id = $1", + actor_id + ) diff --git a/server.py b/server.py index fa35911..065b157 100644 --- a/server.py +++ b/server.py @@ -8,10 +8,12 @@ Manages rendering runs and provides access to the cache. - GET /cache/{content_hash} - get cached content """ +import base64 import hashlib import json import os import uuid +from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Optional @@ -32,17 +34,18 @@ from contextlib import asynccontextmanager from cache_manager import L1CacheManager, get_cache_manager import database -# L2 server for auth verification -L2_SERVER = os.environ.get("L2_SERVER", "http://localhost:8200") -L2_DOMAIN = os.environ.get("L2_DOMAIN", "artdag.rose-ash.com") +# L1 public URL for redirects L1_PUBLIC_URL = os.environ.get("L1_PUBLIC_URL", "http://localhost:8100") +# Default L2 for login redirect when not logged in (user can login to any L2) +DEFAULT_L2_SERVER = os.environ.get("DEFAULT_L2_SERVER", "http://localhost:8200") + # Cache directory (use /data/cache in Docker, ~/.artdag/cache locally) CACHE_DIR = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache"))) CACHE_DIR.mkdir(parents=True, exist_ok=True) -# Initialize L1 cache manager with artdag integration -cache_manager = L1CacheManager(cache_dir=CACHE_DIR, l2_server=L2_SERVER) +# Initialize L1 cache manager (no L2 config - determined dynamically from token) +cache_manager = L1CacheManager(cache_dir=CACHE_DIR) # Redis for persistent run storage REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/5') @@ -267,11 +270,62 @@ def parse_recipe_yaml(yaml_content: str, recipe_hash: str, uploader: str) -> Rec security = HTTPBearer(auto_error=False) -def verify_token_with_l2(token: str) -> Optional[str]: - """Verify token with L2 server, return username if valid.""" +@dataclass +class UserContext: + """User authentication context extracted from JWT token.""" + username: str + l2_server: str # The L2 server that issued the token (e.g., "https://artdag.rose-ash.com") + l2_domain: str # Domain part for actor_id (e.g., "artdag.rose-ash.com") + + @property + def actor_id(self) -> str: + """ActivityPub-style actor ID: @username@domain""" + return f"@{self.username}@{self.l2_domain}" + + +def decode_token_claims(token: str) -> Optional[dict]: + """Decode JWT payload without verification (L2 does verification).""" + try: + # JWT format: header.payload.signature (all base64url encoded) + parts = token.split(".") + if len(parts) != 3: + return None + # Decode payload (add padding if needed) + payload = parts[1] + padding = 4 - len(payload) % 4 + if padding != 4: + payload += "=" * padding + decoded = base64.urlsafe_b64decode(payload) + return json.loads(decoded) + except Exception: + return None + + +def get_user_context_from_token(token: str) -> Optional[UserContext]: + """Extract user context from JWT token. Token must contain l2_server claim.""" + claims = decode_token_claims(token) + if not claims: + return None + + username = claims.get("username") or claims.get("sub") + l2_server = claims.get("l2_server") # e.g., "https://artdag.rose-ash.com" + + if not username or not l2_server: + return None + + # Extract domain from l2_server URL for actor_id + from urllib.parse import urlparse + parsed = urlparse(l2_server) + l2_domain = parsed.netloc or l2_server + + return UserContext(username=username, l2_server=l2_server, l2_domain=l2_domain) + + +def verify_token_with_l2(token: str, l2_server: str) -> Optional[str]: + """Verify token with the L2 server that issued it, return username if valid.""" try: resp = http_requests.post( - f"{L2_SERVER}/auth/verify", + f"{l2_server}/auth/verify", headers={"Authorization": f"Bearer {token}"}, timeout=5 ) @@ -282,13 +336,28 @@ def verify_token_with_l2(token: str) -> Optional[str]: return None +def get_verified_user_context(token: str) -> Optional[UserContext]: + """Get verified user context from token. Verifies with the L2 that issued it.""" + ctx = get_user_context_from_token(token) + if not ctx: + return None + + # Verify token with the L2 server from the token + verified_username = verify_token_with_l2(token, ctx.l2_server) + if not verified_username: + return None + + return ctx + + async def get_optional_user( credentials: HTTPAuthorizationCredentials = Depends(security) ) -> Optional[str]: """Get username if authenticated, None otherwise.""" if not credentials: return None - return verify_token_with_l2(credentials.credentials) + ctx = get_verified_user_context(credentials.credentials) + return ctx.username if ctx else None async def get_required_user( @@ -297,10 +366,22 @@ async def get_required_user( """Get username, raise 401 if not authenticated.""" if not credentials: raise HTTPException(401, "Not authenticated") - username = verify_token_with_l2(credentials.credentials) - if not username: + ctx = get_verified_user_context(credentials.credentials) + if not ctx: raise HTTPException(401, "Invalid token") - return username + return ctx.username + + +async def get_required_user_context( + credentials: HTTPAuthorizationCredentials = Depends(security) +) -> UserContext: + """Get full user context, raise 401 if not authenticated.""" + if not credentials: + raise HTTPException(401, "Not authenticated") + ctx = get_verified_user_context(credentials.credentials) + if not ctx: + raise HTTPException(401, "Invalid token") + return ctx def file_hash(path: Path) -> str: @@ -423,15 +504,15 @@ async def root(): @app.post("/runs", response_model=RunStatus) -async def create_run(request: RunRequest, username: str = Depends(get_required_user)): +async def create_run(request: RunRequest, ctx: UserContext = Depends(get_required_user_context)): """Start a new rendering run. Requires authentication.""" run_id = str(uuid.uuid4()) # Generate output name if not provided output_name = request.output_name or f"{request.recipe}-{run_id[:8]}" - # Format username as ActivityPub actor ID - actor_id = f"@{username}@{L2_DOMAIN}" + # Use actor_id from user context + actor_id = ctx.actor_id # Create run record run = RunStatus( @@ -529,7 +610,7 @@ async def get_run(run_id: str): @app.delete("/runs/{run_id}") -async def discard_run(run_id: str, username: str = Depends(get_required_user)): +async def discard_run(run_id: str, ctx: UserContext = Depends(get_required_user_context)): """ Discard (delete) a run and its outputs. @@ -543,15 +624,14 @@ async def discard_run(run_id: str, username: str = Depends(get_required_user)): raise HTTPException(404, f"Run {run_id} not found") # Check ownership - actor_id = f"@{username}@{L2_DOMAIN}" - if run.username not in (username, actor_id): + if run.username not in (ctx.username, ctx.actor_id): raise HTTPException(403, "Access denied") # Failed runs can always be deleted (no output to protect) if run.status != "failed": # Only check if output is pinned - inputs are preserved, not deleted if run.output_hash: - meta = load_cache_meta(run.output_hash) + meta = await database.load_item_metadata(run.output_hash, ctx.actor_id) if meta.get("pinned"): pin_reason = meta.get("pin_reason", "published") raise HTTPException(400, f"Cannot discard run: output {run.output_hash[:16]}... is pinned ({pin_reason})") @@ -574,8 +654,8 @@ async def discard_run(run_id: str, username: str = Depends(get_required_user)): @app.delete("/ui/runs/{run_id}/discard", response_class=HTMLResponse) async def ui_discard_run(run_id: str, request: Request): """HTMX handler: discard a run. Only deletes outputs, preserves inputs.""" - current_user = get_user_from_cookie(request) - if not current_user: + ctx = get_user_context_from_cookie(request) + if not ctx: return '
Login required
' run = load_run(run_id) @@ -583,15 +663,14 @@ async def ui_discard_run(run_id: str, request: Request): return '
Run not found
' # Check ownership - actor_id = f"@{current_user}@{L2_DOMAIN}" - if run.username not in (current_user, actor_id): + if run.username not in (ctx.username, ctx.actor_id): return '
Access denied
' # Failed runs can always be deleted if run.status != "failed": # Only check if output is pinned - inputs are preserved, not deleted if run.output_hash: - meta = load_cache_meta(run.output_hash) + meta = await database.load_item_metadata(run.output_hash, ctx.actor_id) if meta.get("pinned"): pin_reason = meta.get("pin_reason", "published") return f'
Cannot discard: output is pinned ({pin_reason})
' @@ -648,16 +727,15 @@ async def run_detail(run_id: str, request: Request): save_run(run) if wants_html(request): - current_user = get_user_from_cookie(request) - if not current_user: + ctx = get_user_context_from_cookie(request) + if not ctx: content = '

Login to view run details.

' - return HTMLResponse(render_page("Login Required", content, current_user, active_tab="runs"), status_code=401) + return HTMLResponse(render_page("Login Required", content, None, active_tab="runs"), status_code=401) # Check user owns this run - actor_id = f"@{current_user}@{L2_DOMAIN}" - if run.username not in (current_user, actor_id): + if run.username not in (ctx.username, ctx.actor_id): content = '

Access denied.

' - return HTMLResponse(render_page("Access Denied", content, current_user, active_tab="runs"), status_code=403) + return HTMLResponse(render_page("Access Denied", content, ctx.actor_id, active_tab="runs"), status_code=403) # Build effect URL if run.effect_url: @@ -859,7 +937,7 @@ async def run_detail(run_id: str, request: Request): ''' - return HTMLResponse(render_page(f"Run: {run.recipe}", content, current_user, active_tab="runs")) + return HTMLResponse(render_page(f"Run: {run.recipe}", content, ctx.actor_id, active_tab="runs")) # JSON response return run.model_dump() @@ -868,15 +946,14 @@ async def run_detail(run_id: str, request: Request): @app.get("/runs") async def list_runs(request: Request, page: int = 1, limit: int = 20): """List runs. HTML for browsers (with infinite scroll), JSON for APIs (with pagination).""" - current_user = get_user_from_cookie(request) + ctx = get_user_context_from_cookie(request) all_runs = list_all_runs() total = len(all_runs) # Filter by user if logged in for HTML - if wants_html(request) and current_user: - actor_id = f"@{current_user}@{L2_DOMAIN}" - all_runs = [r for r in all_runs if r.username in (current_user, actor_id)] + if wants_html(request) and ctx: + all_runs = [r for r in all_runs if r.username in (ctx.username, ctx.actor_id)] total = len(all_runs) # Pagination @@ -886,9 +963,9 @@ async def list_runs(request: Request, page: int = 1, limit: int = 20): has_more = end < total if wants_html(request): - if not current_user: + if not ctx: content = '

Login to see your runs.

' - return HTMLResponse(render_page("Runs", content, current_user, active_tab="runs")) + return HTMLResponse(render_page("Runs", content, None, active_tab="runs")) if not runs_page: if page == 1: @@ -989,7 +1066,7 @@ async def list_runs(request: Request, page: int = 1, limit: int = 20): ''' - return HTMLResponse(render_page("Runs", content, current_user, active_tab="runs")) + return HTMLResponse(render_page("Runs", content, ctx.actor_id, active_tab="runs")) # JSON response for APIs return { @@ -1006,7 +1083,7 @@ async def list_runs(request: Request, page: int = 1, limit: int = 20): # ============ Recipe Endpoints ============ @app.post("/recipes/upload") -async def upload_recipe(file: UploadFile = File(...), username: str = Depends(get_required_user)): +async def upload_recipe(file: UploadFile = File(...), ctx: UserContext = Depends(get_required_user_context)): """Upload a recipe YAML file. Requires authentication.""" import tempfile @@ -1032,7 +1109,7 @@ async def upload_recipe(file: UploadFile = File(...), username: str = Depends(ge recipe_hash = cached.content_hash # Parse and save metadata - actor_id = f"@{username}@{L2_DOMAIN}" + actor_id = ctx.actor_id try: recipe_status = parse_recipe_yaml(yaml_content, recipe_hash, actor_id) except Exception as e: @@ -1040,8 +1117,14 @@ async def upload_recipe(file: UploadFile = File(...), username: str = Depends(ge save_recipe(recipe_status) - # Save cache metadata - save_cache_meta(recipe_hash, actor_id, file.filename, type="recipe", recipe_name=recipe_status.name) + # Save cache metadata to database + await database.save_item_metadata( + content_hash=recipe_hash, + actor_id=actor_id, + item_type="recipe", + filename=file.filename, + description=recipe_status.name # Use recipe name as description + ) return { "recipe_id": recipe_hash, @@ -1055,13 +1138,13 @@ async def upload_recipe(file: UploadFile = File(...), username: str = Depends(ge @app.get("/recipes") async def list_recipes_api(request: Request, page: int = 1, limit: int = 20): """List recipes. HTML for browsers, JSON for APIs.""" - current_user = get_user_from_cookie(request) + ctx = get_user_context_from_cookie(request) all_recipes = list_all_recipes() if wants_html(request): # HTML response - if not current_user: + if not ctx: return HTMLResponse(render_page( "Recipes", '

Login to see recipes.

', @@ -1070,8 +1153,7 @@ async def list_recipes_api(request: Request, page: int = 1, limit: int = 20): )) # Filter to user's recipes - actor_id = f"@{current_user}@{L2_DOMAIN}" - user_recipes = [c for c in all_recipes if c.uploader in (current_user, actor_id)] + user_recipes = [c for c in all_recipes if c.uploader in (ctx.username, ctx.actor_id)] total = len(user_recipes) if not user_recipes: @@ -1079,7 +1161,7 @@ async def list_recipes_api(request: Request, page: int = 1, limit: int = 20):

Recipes (0)

No recipes yet. Upload a recipe YAML file to get started.

''' - return HTMLResponse(render_page("Recipes", content, current_user, active_tab="recipes")) + return HTMLResponse(render_page("Recipes", content, ctx.actor_id, active_tab="recipes")) html_parts = [] for recipe in user_recipes: @@ -1119,7 +1201,7 @@ async def list_recipes_api(request: Request, page: int = 1, limit: int = 20): ''' - return HTMLResponse(render_page("Recipes", content, current_user, active_tab="recipes")) + return HTMLResponse(render_page("Recipes", content, ctx.actor_id, active_tab="recipes")) # JSON response for APIs total = len(all_recipes) @@ -1149,15 +1231,14 @@ async def get_recipe_api(recipe_id: str): @app.delete("/recipes/{recipe_id}") -async def remove_recipe(recipe_id: str, username: str = Depends(get_required_user)): +async def remove_recipe(recipe_id: str, ctx: UserContext = Depends(get_required_user_context)): """Delete a recipe. Requires authentication.""" recipe = load_recipe(recipe_id) if not recipe: raise HTTPException(404, f"Recipe {recipe_id} not found") # Check ownership - actor_id = f"@{username}@{L2_DOMAIN}" - if recipe.uploader not in (username, actor_id): + if recipe.uploader not in (ctx.username, ctx.actor_id): raise HTTPException(403, "Access denied") # Check if pinned @@ -1173,7 +1254,7 @@ async def remove_recipe(recipe_id: str, username: str = Depends(get_required_use @app.post("/recipes/{recipe_id}/run") -async def run_recipe(recipe_id: str, request: RecipeRunRequest, username: str = Depends(get_required_user)): +async def run_recipe(recipe_id: str, request: RecipeRunRequest, ctx: UserContext = Depends(get_required_user_context)): """Run a recipe with provided variable inputs. Requires authentication.""" recipe = load_recipe(recipe_id) if not recipe: @@ -1197,7 +1278,7 @@ async def run_recipe(recipe_id: str, request: RecipeRunRequest, username: str = # Create run run_id = str(uuid.uuid4()) - actor_id = f"@{username}@{L2_DOMAIN}" + actor_id = ctx.actor_id # Collect all input hashes all_inputs = list(request.inputs.values()) @@ -1285,14 +1366,14 @@ def build_dag_from_recipe(yaml_config: dict, user_inputs: dict[str, str], recipe @app.get("/recipe/{recipe_id}", response_class=HTMLResponse) async def recipe_detail_page(recipe_id: str, request: Request): """Recipe detail page with run form.""" - current_user = get_user_from_cookie(request) + ctx = get_user_context_from_cookie(request) recipe = load_recipe(recipe_id) if not recipe: return HTMLResponse(render_page( "Recipe Not Found", f'

Recipe {recipe_id} not found.

', - current_user, + ctx.actor_id if ctx else None, active_tab="recipes" ), status_code=404) @@ -1360,14 +1441,14 @@ async def recipe_detail_page(recipe_id: str, request: Request): ''' - return HTMLResponse(render_page(f"Recipe: {recipe.name}", content, current_user, active_tab="recipes")) + return HTMLResponse(render_page(f"Recipe: {recipe.name}", content, ctx.actor_id if ctx else None, active_tab="recipes")) @app.post("/ui/recipes/{recipe_id}/run", response_class=HTMLResponse) async def ui_run_recipe(recipe_id: str, request: Request): """HTMX handler: run a recipe with form inputs.""" - current_user = get_user_from_cookie(request) - if not current_user: + ctx = get_user_context_from_cookie(request) + if not ctx: return '
Login required
' recipe = load_recipe(recipe_id) @@ -1398,7 +1479,7 @@ async def ui_run_recipe(recipe_id: str, request: Request): # Create run run_id = str(uuid.uuid4()) - actor_id = f"@{current_user}@{L2_DOMAIN}" + actor_id = ctx.actor_id # Collect all input hashes all_inputs = list(inputs.values()) @@ -1436,16 +1517,15 @@ async def ui_run_recipe(recipe_id: str, request: Request): @app.get("/ui/recipes-list", response_class=HTMLResponse) async def ui_recipes_list(request: Request): """HTMX partial: list of recipes.""" - current_user = get_user_from_cookie(request) + ctx = get_user_context_from_cookie(request) - if not current_user: + if not ctx: return '

Login to see recipes.

' all_recipes = list_all_recipes() - # Filter to user's configs - actor_id = f"@{current_user}@{L2_DOMAIN}" - user_recipes = [c for c in all_recipes if c.uploader in (current_user, actor_id)] + # Filter to user's recipes + user_recipes = [c for c in all_recipes if c.uploader in (ctx.username, ctx.actor_id)] if not user_recipes: return '

No recipes yet. Upload a recipe YAML file to get started.

' @@ -1488,8 +1568,8 @@ async def ui_recipes_list(request: Request): @app.delete("/ui/recipes/{recipe_id}/discard", response_class=HTMLResponse) async def ui_discard_recipe(recipe_id: str, request: Request): """HTMX handler: discard a recipe.""" - current_user = get_user_from_cookie(request) - if not current_user: + ctx = get_user_context_from_cookie(request) + if not ctx: return '
Login required
' recipe = load_recipe(recipe_id) @@ -1497,8 +1577,7 @@ async def ui_discard_recipe(recipe_id: str, request: Request): return '
Recipe not found
' # Check ownership - actor_id = f"@{current_user}@{L2_DOMAIN}" - if recipe.uploader not in (current_user, actor_id): + if recipe.uploader not in (ctx.username, ctx.actor_id): return '
Access denied
' # Check if pinned @@ -1591,25 +1670,25 @@ async def get_cached_mp4(content_hash: str): @app.get("/cache/{content_hash}/detail") async def cache_detail(content_hash: str, request: Request): """View cached content detail. HTML for browsers, JSON for APIs.""" - current_user = get_user_from_cookie(request) + ctx = get_user_context_from_cookie(request) cache_path = get_cache_path(content_hash) if not cache_path: if wants_html(request): content = f'

Content not found: {content_hash}

' - return HTMLResponse(render_page("Not Found", content, current_user, active_tab="media"), status_code=404) + return HTMLResponse(render_page("Not Found", content, ctx.actor_id if ctx else None, active_tab="media"), status_code=404) raise HTTPException(404, f"Content {content_hash} not in cache") if wants_html(request): - if not current_user: + if not ctx: content = '

Login to view cached content.

' - return HTMLResponse(render_page("Login Required", content, current_user, active_tab="media"), status_code=401) + return HTMLResponse(render_page("Login Required", content, None, active_tab="media"), status_code=401) # Check user has access - user_hashes = get_user_cache_hashes(current_user) + user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id) if content_hash not in user_hashes: content = '

Access denied.

' - return HTMLResponse(render_page("Access Denied", content, current_user, active_tab="media"), status_code=403) + return HTMLResponse(render_page("Access Denied", content, ctx.actor_id, active_tab="media"), status_code=403) media_type = detect_media_type(cache_path) file_size = cache_path.stat().st_size @@ -1684,10 +1763,10 @@ async def cache_detail(content_hash: str, request: Request): ''' - return HTMLResponse(render_page(f"Cache: {content_hash[:16]}...", content, current_user, active_tab="media")) + return HTMLResponse(render_page(f"Cache: {content_hash[:16]}...", content, ctx.actor_id, active_tab="media")) # JSON response - return metadata - meta = load_cache_meta(content_hash) + meta = await database.load_item_metadata(content_hash, ctx.actor_id if ctx else None) file_size = cache_path.stat().st_size media_type = detect_media_type(cache_path) return { @@ -1715,17 +1794,17 @@ async def ui_cache_view(content_hash: str): @app.get("/ui/cache/{content_hash}/meta-form", response_class=HTMLResponse) async def ui_cache_meta_form(content_hash: str, request: Request): """HTMX partial: metadata editing form for a cached item.""" - current_user = get_user_from_cookie(request) - if not current_user: + ctx = get_user_context_from_cookie(request) + if not ctx: return '
Login required to edit metadata
' # Check ownership - user_hashes = get_user_cache_hashes(current_user) + user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id) if content_hash not in user_hashes: return '
Access denied
' # Load metadata - meta = load_cache_meta(content_hash) + meta = await database.load_item_metadata(content_hash, ctx.actor_id) origin = meta.get("origin", {}) origin_type = origin.get("type", "") origin_url = origin.get("url", "") @@ -1887,12 +1966,12 @@ async def ui_cache_meta_form(content_hash: str, request: Request): @app.patch("/ui/cache/{content_hash}/meta", response_class=HTMLResponse) async def ui_update_cache_meta(content_hash: str, request: Request): """HTMX handler: update cache metadata from form.""" - current_user = get_user_from_cookie(request) - if not current_user: + ctx = get_user_context_from_cookie(request) + if not ctx: return '
Login required
' # Check ownership - user_hashes = get_user_cache_hashes(current_user) + user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id) if content_hash not in user_hashes: return '
Access denied
' @@ -1905,29 +1984,28 @@ async def ui_update_cache_meta(content_hash: str, request: Request): tags_str = form.get("tags", "").strip() # Build origin - origin = None + source_type = None if origin_type == "self": - origin = {"type": "self"} + source_type = "self" elif origin_type == "external": if not origin_url: return '
External origin requires a URL
' - origin = {"type": "external", "url": origin_url} - if origin_note: - origin["note"] = origin_note + source_type = "external" # Parse tags tags = [t.strip() for t in tags_str.split(",") if t.strip()] if tags_str else [] - # Build updates - updates = {} - if origin: - updates["origin"] = origin - if description: - updates["description"] = description - updates["tags"] = tags - - # Save - save_cache_meta(content_hash, **updates) + # Save to database + await database.update_item_metadata( + content_hash=content_hash, + actor_id=ctx.actor_id, + item_type="media", + description=description if description else None, + source_type=source_type, + source_url=origin_url if origin_url else None, + source_note=origin_note if origin_note else None, + tags=tags + ) return '
Metadata saved!
' @@ -1935,8 +2013,8 @@ async def ui_update_cache_meta(content_hash: str, request: Request): @app.post("/ui/cache/{content_hash}/publish", response_class=HTMLResponse) async def ui_publish_cache(content_hash: str, request: Request): """HTMX handler: publish cache item to L2.""" - current_user = get_user_from_cookie(request) - if not current_user: + ctx = get_user_context_from_cookie(request) + if not ctx: return '
Login required
' token = request.cookies.get("auth_token") @@ -1944,7 +2022,7 @@ async def ui_publish_cache(content_hash: str, request: Request): return '
Auth token required
' # Check ownership - user_hashes = get_user_cache_hashes(current_user) + user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id) if content_hash not in user_hashes: return '
Access denied
' @@ -1956,17 +2034,18 @@ async def ui_publish_cache(content_hash: str, request: Request): if not asset_name: return '
Asset name required
' - # Load metadata - meta = load_cache_meta(content_hash) + # Load metadata from database + meta = await database.load_item_metadata(content_hash, ctx.actor_id) origin = meta.get("origin") if not origin or "type" not in origin: return '
Set origin before publishing
' - # Call L2 + # Call the L2 server from user's context + l2_server = ctx.l2_server try: resp = http_requests.post( - f"{L2_SERVER}/assets/publish-cache", + f"{l2_server}/assets/publish-cache", headers={"Authorization": f"Bearer {token}"}, json={ "content_hash": content_hash, @@ -1994,31 +2073,27 @@ async def ui_publish_cache(content_hash: str, request: Request): except Exception as e: return f'
Error: {e}
' - # Update local metadata - add to l2_shares list - share_info = { - "l2_server": L2_SERVER, - "asset_name": asset_name, - "published_at": datetime.now(timezone.utc).isoformat(), - "last_synced_at": datetime.now(timezone.utc).isoformat() - } - # Load existing shares and append - existing_meta = load_cache_meta(content_hash) - l2_shares = existing_meta.get("l2_shares", []) - # Update if already shared to this L2, otherwise append - updated = False - for i, share in enumerate(l2_shares): - if share.get("l2_server") == L2_SERVER: - l2_shares[i] = share_info - updated = True - break - if not updated: - l2_shares.append(share_info) - save_cache_meta(content_hash, l2_shares=l2_shares, pinned=True, pin_reason="published") + # Save L2 share to database and pin the item + await database.save_l2_share( + content_hash=content_hash, + actor_id=ctx.actor_id, + l2_server=l2_server, + asset_name=asset_name, + content_type=asset_type + ) + await database.update_item_metadata( + content_hash=content_hash, + actor_id=ctx.actor_id, + pinned=True, + pin_reason="published" + ) + # Use HTTPS for L2 links + l2_https = l2_server.replace("http://", "https://") return f'''
Published to L2 as {asset_name}! - View on L2 + View on L2
''' @@ -2026,8 +2101,8 @@ async def ui_publish_cache(content_hash: str, request: Request): @app.patch("/ui/cache/{content_hash}/republish", response_class=HTMLResponse) async def ui_republish_cache(content_hash: str, request: Request): """HTMX handler: re-publish (update) cache item on L2.""" - current_user = get_user_from_cookie(request) - if not current_user: + ctx = get_user_context_from_cookie(request) + if not ctx: return '
Login required
' token = request.cookies.get("auth_token") @@ -2035,19 +2110,20 @@ async def ui_republish_cache(content_hash: str, request: Request): return '
Auth token required
' # Check ownership - user_hashes = get_user_cache_hashes(current_user) + user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id) if content_hash not in user_hashes: return '
Access denied
' # Load metadata - meta = load_cache_meta(content_hash) + meta = await database.load_item_metadata(content_hash, ctx.actor_id) l2_shares = meta.get("l2_shares", []) - # Find share for current L2 server + # Find share for current L2 server (user's L2) + l2_server = ctx.l2_server current_share = None share_index = -1 for i, share in enumerate(l2_shares): - if share.get("l2_server") == L2_SERVER: + if share.get("l2_server") == l2_server: current_share = share share_index = i break @@ -2062,7 +2138,7 @@ async def ui_republish_cache(content_hash: str, request: Request): # Call L2 update try: resp = http_requests.patch( - f"{L2_SERVER}/assets/{asset_name}", + f"{l2_server}/assets/{asset_name}", headers={"Authorization": f"Bearer {token}"}, json={ "description": meta.get("description"), @@ -2087,9 +2163,14 @@ async def ui_republish_cache(content_hash: str, request: Request): except Exception as e: return f'
Error: {e}
' - # Update local metadata - l2_shares[share_index]["last_synced_at"] = datetime.now(timezone.utc).isoformat() - save_cache_meta(content_hash, l2_shares=l2_shares) + # Update local metadata - save_l2_share updates last_synced_at on conflict + await database.save_l2_share( + content_hash=content_hash, + actor_id=ctx.actor_id, + l2_server=l2_server, + asset_name=asset_name, + content_type=current_share.get("content_type", "media") + ) return '
Updated on L2!
' @@ -2104,16 +2185,16 @@ async def list_media( tag: Optional[str] = None ): """List media items. HTML for browsers (with infinite scroll), JSON for APIs (with pagination).""" - current_user = get_user_from_cookie(request) + ctx = get_user_context_from_cookie(request) if wants_html(request): # Require login for HTML media view - if not current_user: + if not ctx: content = '

Login to see media.

' - return HTMLResponse(render_page("Media", content, current_user, active_tab="media")) + return HTMLResponse(render_page("Media", content, None, active_tab="media")) # Get hashes owned by/associated with this user - user_hashes = get_user_cache_hashes(current_user) + user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id) # Get cache items that belong to the user (from cache_manager) cache_items = [] @@ -2122,7 +2203,7 @@ async def list_media( if content_hash not in user_hashes: continue - meta = load_cache_meta(content_hash) + meta = await database.load_item_metadata(content_hash, ctx.actor_id) # Apply folder filter if folder: @@ -2249,7 +2330,7 @@ async def list_media( ''' - return HTMLResponse(render_page("Media", content, current_user, active_tab="media")) + return HTMLResponse(render_page("Media", content, ctx.actor_id, active_tab="media")) # JSON response for APIs - list all hashes with optional pagination all_hashes = [cf.content_hash for cf in cache_manager.list_all()] @@ -2271,7 +2352,7 @@ async def list_media( @app.delete("/cache/{content_hash}") -async def discard_cache(content_hash: str, username: str = Depends(get_required_user)): +async def discard_cache(content_hash: str, ctx: UserContext = Depends(get_required_user_context)): """ Discard (delete) a cached item. @@ -2285,12 +2366,12 @@ async def discard_cache(content_hash: str, username: str = Depends(get_required_ raise HTTPException(404, "Content not found") # Check ownership - user_hashes = get_user_cache_hashes(username) + user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id) if content_hash not in user_hashes: raise HTTPException(403, "Access denied") - # Check if pinned (legacy metadata) - meta = load_cache_meta(content_hash) + # Check if pinned + meta = await database.load_item_metadata(content_hash, ctx.actor_id) if meta.get("pinned"): pin_reason = meta.get("pin_reason", "unknown") raise HTTPException(400, f"Cannot discard pinned item (reason: {pin_reason})") @@ -2328,12 +2409,12 @@ async def discard_cache(content_hash: str, username: str = Depends(get_required_ @app.delete("/ui/cache/{content_hash}/discard", response_class=HTMLResponse) async def ui_discard_cache(content_hash: str, request: Request): """HTMX handler: discard a cached item.""" - current_user = get_user_from_cookie(request) - if not current_user: + ctx = get_user_context_from_cookie(request) + if not ctx: return '
Login required
' # Check ownership - user_hashes = get_user_cache_hashes(current_user) + user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id) if content_hash not in user_hashes: return '
Access denied
' @@ -2341,8 +2422,8 @@ async def ui_discard_cache(content_hash: str, request: Request): if not cache_manager.has_content(content_hash): return '
Content not found
' - # Check if pinned (legacy metadata) - meta = load_cache_meta(content_hash) + # Check if pinned + meta = await database.load_item_metadata(content_hash, ctx.actor_id) if meta.get("pinned"): pin_reason = meta.get("pin_reason", "unknown") return f'
Cannot discard: item is pinned ({pin_reason})
' @@ -2464,22 +2545,46 @@ def save_user_data(username: str, data: dict): json.dump(data, f, indent=2) -def get_user_cache_hashes(username: str) -> set: - """Get all cache hashes owned by or associated with a user.""" - actor_id = f"@{username}@{L2_DOMAIN}" +async def get_user_cache_hashes(username: str, actor_id: Optional[str] = None) -> set: + """Get all cache hashes owned by or associated with a user. + + username: The plain username + actor_id: The full actor ID (@user@server), if available + """ + # Match against both formats for backwards compatibility + match_values = [username] + if actor_id: + match_values.append(actor_id) + hashes = set() - # Files uploaded by user + # Query database for items owned by user (new system) + if actor_id: + try: + db_items = await database.get_user_items(actor_id) + for item in db_items: + hashes.add(item["content_hash"]) + except Exception: + pass # Database may not be initialized + + # Legacy: Files uploaded by user (JSON metadata) if CACHE_DIR.exists(): for f in CACHE_DIR.iterdir(): if f.name.endswith('.meta.json'): - meta = load_cache_meta(f.name.replace('.meta.json', '')) - if meta.get("uploader") in (username, actor_id): - hashes.add(f.name.replace('.meta.json', '')) + try: + meta_path = CACHE_DIR / f.name + if meta_path.exists(): + import json + with open(meta_path, 'r') as mf: + meta = json.load(mf) + if meta.get("uploader") in match_values: + hashes.add(f.name.replace('.meta.json', '')) + except Exception: + pass # Files from user's runs (inputs and outputs) for run in list_all_runs(): - if run.username in (username, actor_id): + if run.username in match_values: hashes.update(run.inputs) if run.output_hash: hashes.add(run.output_hash) @@ -2488,7 +2593,7 @@ def get_user_cache_hashes(username: str) -> set: @app.post("/cache/upload") -async def upload_to_cache(file: UploadFile = File(...), username: str = Depends(get_required_user)): +async def upload_to_cache(file: UploadFile = File(...), ctx: UserContext = Depends(get_required_user_context)): """Upload a file to cache. Requires authentication.""" # Write to temp file first import tempfile @@ -2501,9 +2606,13 @@ async def upload_to_cache(file: UploadFile = File(...), username: str = Depends( cached, ipfs_cid = cache_manager.put(tmp_path, node_type="upload", move=True) content_hash = cached.content_hash - # Save uploader metadata - actor_id = f"@{username}@{L2_DOMAIN}" - save_cache_meta(content_hash, actor_id, file.filename) + # Save uploader metadata to database + await database.save_item_metadata( + content_hash=content_hash, + actor_id=ctx.actor_id, + item_type="media", + filename=file.filename + ) return {"content_hash": content_hash, "filename": file.filename, "size": len(content)} @@ -2524,7 +2633,7 @@ class PublishRequest(BaseModel): @app.get("/cache/{content_hash}/meta") -async def get_cache_meta(content_hash: str, username: str = Depends(get_required_user)): +async def get_cache_meta(content_hash: str, ctx: UserContext = Depends(get_required_user_context)): """Get metadata for a cached file.""" # Check file exists cache_path = get_cache_path(content_hash) @@ -2532,15 +2641,15 @@ async def get_cache_meta(content_hash: str, username: str = Depends(get_required raise HTTPException(404, "Content not found") # Check ownership - user_hashes = get_user_cache_hashes(username) + user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id) if content_hash not in user_hashes: raise HTTPException(403, "Access denied") - return load_cache_meta(content_hash) + return await database.load_item_metadata(content_hash, ctx.actor_id) @app.patch("/cache/{content_hash}/meta") -async def update_cache_meta(content_hash: str, update: CacheMetaUpdate, username: str = Depends(get_required_user)): +async def update_cache_meta(content_hash: str, update: CacheMetaUpdate, ctx: UserContext = Depends(get_required_user_context)): """Update metadata for a cached file.""" # Check file exists cache_path = get_cache_path(content_hash) @@ -2548,7 +2657,7 @@ async def update_cache_meta(content_hash: str, update: CacheMetaUpdate, username raise HTTPException(404, "Content not found") # Check ownership - user_hashes = get_user_cache_hashes(username) + user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id) if content_hash not in user_hashes: raise HTTPException(403, "Access denied") @@ -2562,20 +2671,20 @@ async def update_cache_meta(content_hash: str, update: CacheMetaUpdate, username updates["tags"] = update.tags if update.folder is not None: # Ensure folder exists in user's folder list - user_data = load_user_data(username) + user_data = load_user_data(ctx.username) if update.folder not in user_data["folders"]: raise HTTPException(400, f"Folder does not exist: {update.folder}") updates["folder"] = update.folder if update.collections is not None: # Validate collections exist - user_data = load_user_data(username) + user_data = load_user_data(ctx.username) existing = {c["name"] for c in user_data["collections"]} for col in update.collections: if col not in existing: raise HTTPException(400, f"Collection does not exist: {col}") updates["collections"] = update.collections - meta = save_cache_meta(content_hash, **updates) + meta = await database.update_item_metadata(content_hash, ctx.actor_id, **updates) return meta @@ -2584,7 +2693,7 @@ async def publish_cache_to_l2( content_hash: str, req: PublishRequest, request: Request, - username: str = Depends(get_required_user) + ctx: UserContext = Depends(get_required_user_context) ): """ Publish a cache item to L2 (ActivityPub). @@ -2597,12 +2706,12 @@ async def publish_cache_to_l2( raise HTTPException(404, "Content not found") # Check ownership - user_hashes = get_user_cache_hashes(username) + user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id) if content_hash not in user_hashes: raise HTTPException(403, "Access denied") # Load metadata - meta = load_cache_meta(content_hash) + meta = await database.load_item_metadata(content_hash, ctx.actor_id) # Check origin is set origin = meta.get("origin") @@ -2620,10 +2729,11 @@ async def publish_cache_to_l2( if not token: raise HTTPException(401, "Authentication token required") - # Call L2 publish-cache endpoint + # Call L2 publish-cache endpoint (use user's L2 server) + l2_server = ctx.l2_server try: resp = http_requests.post( - f"{L2_SERVER}/assets/publish-cache", + f"{l2_server}/assets/publish-cache", headers={"Authorization": f"Bearer {token}"}, json={ "content_hash": content_hash, @@ -2653,13 +2763,19 @@ async def publish_cache_to_l2( raise HTTPException(500, f"L2 publish failed: {e}") # Update local metadata with publish status and pin - publish_info = { - "to_l2": True, - "asset_name": req.asset_name, - "published_at": datetime.now(timezone.utc).isoformat(), - "last_synced_at": datetime.now(timezone.utc).isoformat() - } - save_cache_meta(content_hash, published=publish_info, pinned=True, pin_reason="published") + await database.save_l2_share( + content_hash=content_hash, + actor_id=ctx.actor_id, + l2_server=l2_server, + asset_name=req.asset_name, + content_type=req.asset_type + ) + await database.update_item_metadata( + content_hash=content_hash, + actor_id=ctx.actor_id, + pinned=True, + pin_reason="published" + ) return { "published": True, @@ -2672,7 +2788,7 @@ async def publish_cache_to_l2( async def republish_cache_to_l2( content_hash: str, request: Request, - username: str = Depends(get_required_user) + ctx: UserContext = Depends(get_required_user_context) ): """ Re-publish (update) a cache item on L2 after metadata changes. @@ -2685,19 +2801,26 @@ async def republish_cache_to_l2( raise HTTPException(404, "Content not found") # Check ownership - user_hashes = get_user_cache_hashes(username) + user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id) if content_hash not in user_hashes: raise HTTPException(403, "Access denied") # Load metadata - meta = load_cache_meta(content_hash) + meta = await database.load_item_metadata(content_hash, ctx.actor_id) + l2_shares = meta.get("l2_shares", []) - # Check already published - published = meta.get("published", {}) - if not published.get("to_l2"): + # Find share for current L2 server (user's L2) + l2_server = ctx.l2_server + current_share = None + for share in l2_shares: + if share.get("l2_server") == l2_server: + current_share = share + break + + if not current_share: raise HTTPException(400, "Item not published yet. Use publish first.") - asset_name = published.get("asset_name") + asset_name = current_share.get("asset_name") if not asset_name: raise HTTPException(400, "No asset name found in publish info") @@ -2711,10 +2834,11 @@ async def republish_cache_to_l2( if not token: raise HTTPException(401, "Authentication token required") - # Call L2 update endpoint + # Call L2 update endpoint (use user's L2 server) + l2_server = ctx.l2_server try: resp = http_requests.patch( - f"{L2_SERVER}/assets/{asset_name}", + f"{l2_server}/assets/{asset_name}", headers={"Authorization": f"Bearer {token}"}, json={ "description": meta.get("description"), @@ -2740,9 +2864,14 @@ async def republish_cache_to_l2( except Exception as e: raise HTTPException(500, f"L2 update failed: {e}") - # Update local metadata - published["last_synced_at"] = datetime.now(timezone.utc).isoformat() - save_cache_meta(content_hash, published=published) + # Update local metadata - save_l2_share updates last_synced_at on conflict + await database.save_l2_share( + content_hash=content_hash, + actor_id=ctx.actor_id, + l2_server=l2_server, + asset_name=asset_name, + content_type=current_share.get("content_type", "media") + ) return { "updated": True, @@ -2786,12 +2915,12 @@ async def create_folder(folder_path: str, username: str = Depends(get_required_u @app.delete("/user/folders") -async def delete_folder(folder_path: str, username: str = Depends(get_required_user)): +async def delete_folder(folder_path: str, ctx: UserContext = Depends(get_required_user_context)): """Delete a folder (must be empty).""" if folder_path == "/": raise HTTPException(400, "Cannot delete root folder") - user_data = load_user_data(username) + user_data = load_user_data(ctx.username) if folder_path not in user_data["folders"]: raise HTTPException(404, "Folder not found") @@ -2802,14 +2931,14 @@ async def delete_folder(folder_path: str, username: str = Depends(get_required_u raise HTTPException(400, f"Folder has subfolders: {f}") # Check no items in folder - user_hashes = get_user_cache_hashes(username) + user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id) for h in user_hashes: - meta = load_cache_meta(h) + meta = await database.load_item_metadata(h, ctx.actor_id) if meta.get("folder") == folder_path: raise HTTPException(400, "Folder is not empty") user_data["folders"].remove(folder_path) - save_user_data(username, user_data) + save_user_data(ctx.username, user_data) return {"folder": folder_path, "deleted": True} @@ -2841,23 +2970,23 @@ async def create_collection(name: str, username: str = Depends(get_required_user @app.delete("/user/collections") -async def delete_collection(name: str, username: str = Depends(get_required_user)): +async def delete_collection(name: str, ctx: UserContext = Depends(get_required_user_context)): """Delete a collection.""" - user_data = load_user_data(username) + user_data = load_user_data(ctx.username) # Find and remove for i, col in enumerate(user_data["collections"]): if col["name"] == name: user_data["collections"].pop(i) - save_user_data(username, user_data) + save_user_data(ctx.username, user_data) # Remove from all cache items - user_hashes = get_user_cache_hashes(username) + user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id) for h in user_hashes: - meta = load_cache_meta(h) + meta = await database.load_item_metadata(h, ctx.actor_id) if name in meta.get("collections", []): - meta["collections"].remove(name) - save_cache_meta(h, **{k: v for k, v in meta.items() if k not in ("uploader", "uploaded_at")}) + new_collections = [c for c in meta.get("collections", []) if c != name] + await database.update_item_metadata(h, ctx.actor_id, collections=new_collections) return {"collection": name, "deleted": True} @@ -2903,12 +3032,18 @@ def detect_media_type(cache_path: Path) -> str: return "unknown" -def get_user_from_cookie(request) -> Optional[str]: - """Get username from auth cookie.""" +def get_user_context_from_cookie(request) -> Optional[UserContext]: + """Get user context from auth cookie. Returns full context with actor_id and l2_server.""" token = request.cookies.get("auth_token") if not token: return None - return verify_token_with_l2(token) + return get_verified_user_context(token) + + +def get_user_from_cookie(request) -> Optional[str]: + """Get username from auth cookie (backwards compat - prefer get_user_context_from_cookie).""" + ctx = get_user_context_from_cookie(request) + return ctx.username if ctx else None def wants_html(request: Request) -> bool: @@ -2936,12 +3071,13 @@ TAILWIND_CONFIG = ''' ''' -def render_page(title: str, content: str, username: Optional[str] = None, active_tab: str = None) -> str: - """Render a page with nav bar and content. Used for clean URL pages.""" +def render_page(title: str, content: str, actor_id: Optional[str] = None, active_tab: str = None) -> str: + """Render a page with nav bar and content. Used for clean URL pages. + + actor_id: The user's actor ID (@user@server) or None if not logged in. + """ user_info = "" - if username: - # Display as @user@server format - actor_id = f"@{username}@{L2_DOMAIN}" if not username.startswith("@") else username + if actor_id: user_info = f'''
Logged in as {actor_id} @@ -2992,12 +3128,13 @@ def render_page(title: str, content: str, username: Optional[str] = None, active """ -def render_ui_html(username: Optional[str] = None, tab: str = "runs") -> str: - """Render main UI HTML with optional user context.""" +def render_ui_html(actor_id: Optional[str] = None, tab: str = "runs") -> str: + """Render main UI HTML with optional user context. + + actor_id: The user's actor ID (@user@server) or None if not logged in. + """ user_info = "" - if username: - # Display as @user@server format - actor_id = f"@{username}@{L2_DOMAIN}" if not username.startswith("@") else username + if actor_id: user_info = f'''
Logged in as {actor_id} @@ -3066,23 +3203,23 @@ L1_PUBLIC_URL = os.environ.get("L1_PUBLIC_URL", "http://localhost:8100") @app.get("/login") async def login_page(): """Redirect to L2 server for login. L1 never handles credentials.""" - # Redirect to L2 with return URL so L2 can redirect back after login + # Redirect to default L2 with return URL so L2 can redirect back after login return_url = f"{L1_PUBLIC_URL}/runs" - return RedirectResponse(url=f"{L2_SERVER}/login?return_to={return_url}", status_code=302) + return RedirectResponse(url=f"{DEFAULT_L2_SERVER}/login?return_to={return_url}", status_code=302) @app.get("/register") async def register_page(): """Redirect to L2 server for registration. L1 never handles credentials.""" return_url = f"{L1_PUBLIC_URL}/runs" - return RedirectResponse(url=f"{L2_SERVER}/register?return_to={return_url}", status_code=302) + return RedirectResponse(url=f"{DEFAULT_L2_SERVER}/register?return_to={return_url}", status_code=302) @app.get("/logout") async def logout(): """Logout - clear cookie and redirect to L2 logout.""" # Clear local cookie and redirect to L2 to clear shared cookie - response = RedirectResponse(url=f"{L2_SERVER}/logout?return_to={L1_PUBLIC_URL}/", status_code=302) + response = RedirectResponse(url=f"{DEFAULT_L2_SERVER}/logout?return_to={L1_PUBLIC_URL}/", status_code=302) response.delete_cookie("auth_token") return response @@ -3116,6 +3253,10 @@ async def ui_logout(): @app.post("/ui/publish-run/{run_id}", response_class=HTMLResponse) async def ui_publish_run(run_id: str, request: Request, output_name: str = Form(...)): """Publish a run to L2 from the web UI.""" + ctx = get_user_context_from_cookie(request) + if not ctx: + return HTMLResponse('
Not logged in
') + token = request.cookies.get("auth_token") if not token: return HTMLResponse('
Not logged in
') @@ -3127,9 +3268,10 @@ async def ui_publish_run(run_id: str, request: Request, output_name: str = Form( # Call L2 to publish the run, including this L1's public URL # Longer timeout because L2 calls back to L1 to fetch run details + l2_server = ctx.l2_server try: resp = http_requests.post( - f"{L2_SERVER}/assets/record-run", + f"{l2_server}/assets/record-run", json={"run_id": run_id, "output_name": output_name, "l1_server": L1_PUBLIC_URL}, headers={"Authorization": f"Bearer {token}"}, timeout=30 @@ -3142,11 +3284,11 @@ async def ui_publish_run(run_id: str, request: Request, output_name: str = Form( # Pin the output if run.output_hash: - save_cache_meta(run.output_hash, pinned=True, pin_reason="published") + await database.update_item_metadata(run.output_hash, ctx.actor_id, pinned=True, pin_reason="published") # Pin the inputs (for provenance) for input_hash in run.inputs: - save_cache_meta(input_hash, pinned=True, pin_reason="input_to_published") + await database.update_item_metadata(input_hash, ctx.actor_id, pinned=True, pin_reason="input_to_published") # If this was a recipe-based run, pin the recipe and its fixed inputs if run.recipe.startswith("recipe:"): @@ -3180,16 +3322,15 @@ async def ui_publish_run(run_id: str, request: Request, output_name: str = Form( @app.get("/ui/runs", response_class=HTMLResponse) async def ui_runs(request: Request): """HTMX partial: list of runs.""" - current_user = get_user_from_cookie(request) + ctx = get_user_context_from_cookie(request) runs = list_all_runs() # Require login to see runs - if not current_user: + if not ctx: return '

Login to see your runs.

' # Filter runs by user - match both plain username and ActivityPub format (@user@domain) - actor_id = f"@{current_user}@{L2_DOMAIN}" - runs = [r for r in runs if r.username in (current_user, actor_id)] + runs = [r for r in runs if r.username in (ctx.username, ctx.actor_id)] if not runs: return '

You have no runs yet. Use the CLI to start a run.

' @@ -3281,14 +3422,14 @@ async def ui_media_list( tag: Optional[str] = None ): """HTMX partial: list of media items with optional filtering.""" - current_user = get_user_from_cookie(request) + ctx = get_user_context_from_cookie(request) # Require login to see media - if not current_user: + if not ctx: return '

Login to see media.

' # Get hashes owned by/associated with this user - user_hashes = get_user_cache_hashes(current_user) + user_hashes = await get_user_cache_hashes(ctx.username, ctx.actor_id) # Get cache items that belong to the user (from cache_manager) cache_items = [] @@ -3298,7 +3439,7 @@ async def ui_media_list( continue # Load metadata for filtering - meta = load_cache_meta(content_hash) + meta = await database.load_item_metadata(content_hash, ctx.actor_id) # Apply folder filter if folder: