#!/usr/bin/env python3 """ Art DAG L2 Server - ActivityPub Manages ownership registry, activities, and federation. - Registry of owned assets - ActivityPub actor endpoints - Sign and publish Create activities - Federation with other servers """ import hashlib import json import logging import os import uuid from contextlib import asynccontextmanager from datetime import datetime, timezone from pathlib import Path from typing import Optional from urllib.parse import urlparse # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s %(levelname)s %(name)s: %(message)s' ) logger = logging.getLogger(__name__) from fastapi import FastAPI, HTTPException, Request, Response, Depends, Cookie, Form from fastapi.responses import JSONResponse, HTMLResponse, RedirectResponse, FileResponse from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from pydantic import BaseModel import requests import markdown import db from auth import ( UserCreate, UserLogin, Token, User, create_user, authenticate_user, create_access_token, verify_token, get_token_claims, get_current_user ) # Configuration DOMAIN = os.environ.get("ARTDAG_DOMAIN", "artdag.rose-ash.com") DATA_DIR = Path(os.environ.get("ARTDAG_DATA", str(Path.home() / ".artdag" / "l2"))) L1_PUBLIC_URL = os.environ.get("L1_PUBLIC_URL", "https://celery-artdag.rose-ash.com") EFFECTS_REPO_URL = os.environ.get("EFFECTS_REPO_URL", "https://git.rose-ash.com/art-dag/effects") IPFS_GATEWAY_URL = os.environ.get("IPFS_GATEWAY_URL", "") # Known L1 renderers (comma-separated URLs) L1_SERVERS_STR = os.environ.get("L1_SERVERS", "https://celery-artdag.rose-ash.com") L1_SERVERS = [s.strip() for s in L1_SERVERS_STR.split(",") if s.strip()] # Cookie domain for sharing auth across subdomains (e.g., ".rose-ash.com") # If not set, derives from DOMAIN (strips first subdomain, adds leading dot) def _get_cookie_domain(): env_val = os.environ.get("COOKIE_DOMAIN") if env_val: return env_val # Derive from DOMAIN: artdag.rose-ash.com -> .rose-ash.com parts = DOMAIN.split(".") if len(parts) >= 2: return "." + ".".join(parts[-2:]) return None COOKIE_DOMAIN = _get_cookie_domain() # Ensure data directory exists DATA_DIR.mkdir(parents=True, exist_ok=True) (DATA_DIR / "assets").mkdir(exist_ok=True) def compute_run_id(input_hashes: list[str], recipe: str, recipe_hash: str = None) -> str: """ Compute a deterministic run_id from inputs and recipe. The run_id is a SHA3-256 hash of: - Sorted input content hashes - Recipe identifier (recipe_hash if provided, else "effect:{recipe}") This makes runs content-addressable: same inputs + recipe = same run_id. Must match the L1 implementation exactly. """ data = { "inputs": sorted(input_hashes), "recipe": recipe_hash or f"effect:{recipe}", "version": "1", # For future schema changes } json_str = json.dumps(data, sort_keys=True, separators=(",", ":")) return hashlib.sha3_256(json_str.encode()).hexdigest() # Load README README_PATH = Path(__file__).parent / "README.md" README_CONTENT = "" if README_PATH.exists(): README_CONTENT = README_PATH.read_text() @asynccontextmanager async def lifespan(app: FastAPI): """Manage database connection pool lifecycle.""" await db.init_pool() yield await db.close_pool() app = FastAPI( title="Art DAG L2 Server", description="ActivityPub server for Art DAG ownership and federation", version="0.1.0", lifespan=lifespan ) @app.exception_handler(404) async def not_found_handler(request: Request, exc): """Custom 404 page.""" accept = request.headers.get("accept", "") if "text/html" in accept and "application/json" not in accept: content = '''

404

Page not found

Go to home page
''' username = get_user_from_cookie(request) return HTMLResponse(base_html("Not Found", content, username), status_code=404) return JSONResponse({"detail": "Not found"}, status_code=404) # ============ Data Models ============ class Asset(BaseModel): """An owned asset.""" name: str content_hash: str ipfs_cid: Optional[str] = None # IPFS content identifier asset_type: str # image, video, effect, recipe, infrastructure tags: list[str] = [] metadata: dict = {} url: Optional[str] = None provenance: Optional[dict] = None created_at: str = "" class Activity(BaseModel): """An ActivityPub activity.""" activity_id: str activity_type: str # Create, Update, Delete, Announce actor_id: str object_data: dict published: str signature: Optional[dict] = None class RegisterRequest(BaseModel): """Request to register an asset.""" name: str content_hash: str ipfs_cid: Optional[str] = None # IPFS content identifier asset_type: str tags: list[str] = [] metadata: dict = {} url: Optional[str] = None provenance: Optional[dict] = None class RecordRunRequest(BaseModel): """Request to record an L1 run.""" run_id: str l1_server: str # URL of the L1 server that has this run output_name: Optional[str] = None # Deprecated - assets now named by content_hash class PublishCacheRequest(BaseModel): """Request to publish a cache item from L1.""" content_hash: str ipfs_cid: Optional[str] = None # IPFS content identifier asset_name: str asset_type: str = "image" origin: dict # {type: "self"|"external", url?: str, note?: str} description: Optional[str] = None tags: list[str] = [] metadata: dict = {} class UpdateAssetRequest(BaseModel): """Request to update an existing asset.""" description: Optional[str] = None tags: Optional[list[str]] = None metadata: Optional[dict] = None origin: Optional[dict] = None ipfs_cid: Optional[str] = None # IPFS content identifier class AddStorageRequest(BaseModel): """Request to add a storage provider.""" provider_type: str # 'pinata', 'web3storage', 'local' provider_name: Optional[str] = None # User-friendly name config: dict # Provider-specific config (api_key, path, etc.) capacity_gb: int # Storage capacity in GB class UpdateStorageRequest(BaseModel): """Request to update a storage provider.""" config: Optional[dict] = None capacity_gb: Optional[int] = None is_active: Optional[bool] = None class SetAssetSourceRequest(BaseModel): """Request to set source URL for an asset.""" source_url: str source_type: str # 'youtube', 'local', 'url' # ============ Storage (Database) ============ async def load_registry() -> dict: """Load registry from database.""" assets = await db.get_all_assets() return {"version": "1.0", "assets": assets} async def load_activities() -> list: """Load activities from database.""" return await db.get_all_activities() def load_actor(username: str) -> dict: """Load actor data for a specific user with public key if available.""" actor = { "id": f"https://{DOMAIN}/users/{username}", "type": "Person", "preferredUsername": username, "name": username, "inbox": f"https://{DOMAIN}/users/{username}/inbox", "outbox": f"https://{DOMAIN}/users/{username}/outbox", "followers": f"https://{DOMAIN}/users/{username}/followers", "following": f"https://{DOMAIN}/users/{username}/following", } # Add public key if available from keys import has_keys, load_public_key_pem if has_keys(DATA_DIR, username): actor["publicKey"] = { "id": f"https://{DOMAIN}/users/{username}#main-key", "owner": f"https://{DOMAIN}/users/{username}", "publicKeyPem": load_public_key_pem(DATA_DIR, username) } return actor async def user_exists(username: str) -> bool: """Check if a user exists.""" return await db.user_exists(username) async def load_followers() -> list: """Load followers list from database.""" return await db.get_all_followers() # ============ Signing ============ from keys import has_keys, load_public_key_pem, create_signature def sign_activity(activity: dict, username: str) -> dict: """Sign an activity with the user's RSA private key.""" if not has_keys(DATA_DIR, username): # No keys - use placeholder (for testing) activity["signature"] = { "type": "RsaSignature2017", "creator": f"https://{DOMAIN}/users/{username}#main-key", "created": datetime.now(timezone.utc).isoformat(), "signatureValue": "NO_KEYS_CONFIGURED" } else: activity["signature"] = create_signature(DATA_DIR, username, DOMAIN, activity) return activity # ============ HTML Templates ============ # Tailwind CSS config for L2 - dark theme to match L1 TAILWIND_CONFIG = ''' ''' def base_html(title: str, content: str, username: str = None) -> str: """Base HTML template with Tailwind CSS dark theme.""" user_section = f'''
Logged in as {username} Logout
''' if username else '''
Login | Register
''' return f''' {title} - Art DAG L2 {TAILWIND_CONFIG}

Art DAG L2

{user_section}
{content}
''' def get_user_from_cookie(request: Request) -> Optional[str]: """Get username from auth cookie.""" token = request.cookies.get("auth_token") if token: return verify_token(token) return None def wants_html(request: Request) -> bool: """Check if request wants HTML (browser) vs JSON (API).""" accept = request.headers.get("accept", "") return "text/html" in accept and "application/json" not in accept and "application/activity+json" not in accept def format_date(value, length: int = 10) -> str: """Format a date value (datetime or string) to a string, sliced to length.""" if value is None: return "" if hasattr(value, 'isoformat'): return value.isoformat()[:length] if isinstance(value, str): return value[:length] return "" # ============ Auth UI Endpoints ============ @app.get("/login", response_class=HTMLResponse) async def ui_login_page(request: Request, return_to: str = None): """Login page. Accepts optional return_to URL for redirect after login.""" username = get_user_from_cookie(request) if username: return HTMLResponse(base_html("Already Logged In", f'''
You are already logged in as {username}

Go to home page

''', username)) # Hidden field for return_to URL return_to_field = f'' if return_to else '' content = f'''

Login

{return_to_field}

Don't have an account? Register

''' return HTMLResponse(base_html("Login", content)) @app.post("/login", response_class=HTMLResponse) async def ui_login_submit(request: Request): """Handle login form submission.""" form = await request.form() username = form.get("username", "").strip() password = form.get("password", "") return_to = form.get("return_to", "").strip() if not username or not password: return HTMLResponse('
Username and password are required
') user = await authenticate_user(DATA_DIR, username, password) if not user: return HTMLResponse('
Invalid username or password
') token = create_access_token(user.username, l2_server=f"https://{DOMAIN}") # If return_to is specified, redirect there with token for the other site to set its own cookie if return_to and return_to.startswith("http"): # Append token to return_to URL for the target site to set its own cookie separator = "&" if "?" in return_to else "?" redirect_url = f"{return_to}{separator}auth_token={token.access_token}" response = HTMLResponse(f'''
Login successful! Redirecting...
''') else: response = HTMLResponse(f'''
Login successful! Redirecting...
''') # Set cookie for L2 only (L1 servers set their own cookies via /auth endpoint) response.set_cookie( key="auth_token", value=token.access_token, httponly=True, max_age=60 * 60 * 24 * 30, # 30 days samesite="lax", secure=True ) return response @app.get("/register", response_class=HTMLResponse) async def ui_register_page(request: Request): """Register page.""" username = get_user_from_cookie(request) if username: return HTMLResponse(base_html("Already Logged In", f'''
You are already logged in as {username}

Go to home page

''', username)) content = '''

Register

Already have an account? Login

''' return HTMLResponse(base_html("Register", content)) @app.post("/register", response_class=HTMLResponse) async def ui_register_submit(request: Request): """Handle register form submission.""" form = await request.form() username = form.get("username", "").strip() email = form.get("email", "").strip() or None password = form.get("password", "") password2 = form.get("password2", "") if not username or not password: return HTMLResponse('
Username and password are required
') if password != password2: return HTMLResponse('
Passwords do not match
') if len(password) < 6: return HTMLResponse('
Password must be at least 6 characters
') try: user = await create_user(DATA_DIR, username, password, email) except ValueError as e: return HTMLResponse(f'
{str(e)}
') token = create_access_token(user.username, l2_server=f"https://{DOMAIN}") response = HTMLResponse(f'''
Registration successful! Redirecting...
''') response.set_cookie( key="auth_token", value=token.access_token, httponly=True, max_age=60 * 60 * 24 * 30, # 30 days samesite="lax", secure=True ) return response @app.get("/logout") async def logout(request: Request): """Handle logout - clear cookie, revoke token on L2 and attached L1s, and redirect to home.""" token = request.cookies.get("auth_token") claims = get_token_claims(token) if token else None username = claims.get("sub") if claims else None if username and token and claims: # Revoke token in L2 database (so even if L1 ignores revoke, token won't verify) token_hash = hashlib.sha256(token.encode()).hexdigest() expires_at = datetime.fromtimestamp(claims.get("exp", 0), tz=timezone.utc) await db.revoke_token(token_hash, username, expires_at) # Revoke ALL tokens for this user on attached L1 renderers # (L1 may have scoped tokens different from L2's token) attached = await db.get_user_renderers(username) for l1_url in attached: try: requests.post( f"{l1_url}/auth/revoke-user", json={"username": username, "l2_server": f"https://{DOMAIN}"}, timeout=5 ) except Exception as e: logger.warning(f"Failed to revoke user tokens on {l1_url}: {e}") # Remove all attachments for this user for l1_url in attached: await db.detach_renderer(username, l1_url) response = RedirectResponse(url="/", status_code=302) # Delete both legacy (no domain) and new (shared domain) cookies response.delete_cookie("auth_token") if COOKIE_DOMAIN: response.delete_cookie("auth_token", domain=COOKIE_DOMAIN) return response # ============ HTML Rendering Helpers ============ async def ui_activity_detail(activity_index: int, request: Request): """Activity detail page with full content display. Helper function for HTML rendering.""" username = get_user_from_cookie(request) activities = await load_activities() if activity_index < 0 or activity_index >= len(activities): content = '''

Activity Not Found

This activity does not exist.

← Back to Activities

''' return HTMLResponse(base_html("Activity Not Found", content, username)) activity = activities[activity_index] return await _render_activity_detail(activity, request) async def ui_activity_detail_by_data(activity: dict, request: Request): """Activity detail page taking activity data directly.""" return await _render_activity_detail(activity, request) async def _render_activity_detail(activity: dict, request: Request): """Core activity detail rendering logic.""" username = get_user_from_cookie(request) activity_type = activity.get("activity_type", "") activity_id = activity.get("activity_id", "") actor_id = activity.get("actor_id", "") actor_name = actor_id.split("/")[-1] if actor_id else "unknown" published = format_date(activity.get("published")) obj = activity.get("object_data", {}) # Object details obj_name = obj.get("name", "Untitled") obj_type = obj.get("type", "") content_hash_obj = obj.get("contentHash", {}) content_hash = content_hash_obj.get("value", "") if isinstance(content_hash_obj, dict) else "" media_type = obj.get("mediaType", "") description = obj.get("summary", "") or obj.get("content", "") # Provenance from object - or fallback to registry asset provenance = obj.get("provenance", {}) origin = obj.get("origin", {}) # Fallback: if activity doesn't have provenance, look up the asset from registry if not provenance or not origin: registry = await load_registry() assets = registry.get("assets", {}) # Find asset by content_hash or name for asset_name, asset_data in assets.items(): if asset_data.get("content_hash") == content_hash or asset_data.get("name") == obj_name: if not provenance: provenance = asset_data.get("provenance", {}) if not origin: origin = asset_data.get("origin", {}) break # Type colors type_color = "bg-green-600" if activity_type == "Create" else "bg-yellow-600" if activity_type == "Update" else "bg-gray-600" obj_type_color = "bg-blue-600" if "Image" in obj_type else "bg-purple-600" if "Video" in obj_type else "bg-gray-600" # Determine L1 server and asset type l1_server = provenance.get("l1_server", L1_PUBLIC_URL).rstrip("/") if provenance else L1_PUBLIC_URL.rstrip("/") is_video = "Video" in obj_type or "video" in media_type # Content display if is_video: content_html = f'''
Download Original
''' elif "Image" in obj_type or "image" in media_type: content_html = f'''
{obj_name}
Download
''' else: content_html = f'''

Content type: {media_type or obj_type}

Download
''' # Origin display origin_html = 'Not specified' if origin: origin_type = origin.get("type", "") if origin_type == "self": origin_html = 'Original content by author' elif origin_type == "external": origin_url = origin.get("url", "") origin_note = origin.get("note", "") origin_html = f'{origin_url}' if origin_note: origin_html += f'

{origin_note}

' # Provenance section provenance_html = "" if provenance and provenance.get("recipe"): recipe = provenance.get("recipe", "") inputs = provenance.get("inputs", []) l1_run_id = provenance.get("l1_run_id", "") rendered_at = format_date(provenance.get("rendered_at")) effects_commit = provenance.get("effects_commit", "") effect_url = provenance.get("effect_url") infrastructure = provenance.get("infrastructure", {}) if not effect_url: if effects_commit and effects_commit != "unknown": effect_url = f"{EFFECTS_REPO_URL}/src/commit/{effects_commit}/{recipe}" else: effect_url = f"{EFFECTS_REPO_URL}/src/branch/main/{recipe}" # Build inputs display - show actual content as thumbnails inputs_html = "" for inp in inputs: inp_hash = inp.get("content_hash", "") if isinstance(inp, dict) else inp if inp_hash: inputs_html += f'''
Input
{inp_hash[:16]}... view
''' # Infrastructure display infra_html = "" if infrastructure: software = infrastructure.get("software", {}) hardware = infrastructure.get("hardware", {}) if software or hardware: infra_parts = [] if software: infra_parts.append(f"Software: {software.get('name', 'unknown')}") if hardware: infra_parts.append(f"Hardware: {hardware.get('name', 'unknown')}") infra_html = f'

{" | ".join(infra_parts)}

' provenance_html = f'''

Provenance

This content was created by applying an effect to input content.

Effect

{recipe} {f'
Commit: {effects_commit[:12]}...
' if effects_commit else ''}

Input(s)

{inputs_html if inputs_html else 'No inputs recorded'}

Rendered

{rendered_at if rendered_at else 'Unknown'} {infra_html}
''' content = f'''

← Back to Activities

{activity_type}

{obj_name}

{obj_type}
{content_html}

Actor

{actor_name}

Description

{description if description else 'No description'}

Origin

{origin_html}

Content Hash

{content_hash}

Published

{published}

Activity ID

{activity_id}
{provenance_html}

ActivityPub

Object URL: https://{DOMAIN}/objects/{content_hash}

Actor: {actor_id}

''' return HTMLResponse(base_html(f"Activity: {obj_name}", content, username)) async def ui_asset_detail(name: str, request: Request): """Asset detail page with content preview and provenance. Helper function for HTML rendering.""" username = get_user_from_cookie(request) registry = await load_registry() assets = registry.get("assets", {}) if name not in assets: content = f'''

Asset Not Found

No asset named "{name}" exists.

← Back to Assets

''' return HTMLResponse(base_html("Asset Not Found", content, username)) asset = assets[name] owner = asset.get("owner", "unknown") content_hash = asset.get("content_hash", "") ipfs_cid = asset.get("ipfs_cid", "") asset_type = asset.get("asset_type", "") tags = asset.get("tags", []) description = asset.get("description", "") origin = asset.get("origin") or {} provenance = asset.get("provenance") or {} metadata = asset.get("metadata") or {} created_at = format_date(asset.get("created_at")) type_color = "bg-blue-600" if asset_type == "image" else "bg-purple-600" if asset_type == "video" else "bg-gray-600" # Determine L1 server URL for content l1_server = provenance.get("l1_server", L1_PUBLIC_URL).rstrip("/") # Content display - image or video from L1 if asset_type == "video": # Use iOS-compatible MP4 endpoint content_html = f'''
Download Original
''' elif asset_type == "image": content_html = f'''
{name}
Download
''' elif asset_type == "recipe": # Fetch recipe source from L1 or IPFS recipe_source = "" try: resp = requests.get(f"{l1_server}/cache/{content_hash}", timeout=10, headers={"Accept": "text/plain"}) if resp.status_code == 200: recipe_source = resp.text except Exception: pass if not recipe_source and ipfs_cid: # Try IPFS try: import ipfs_client recipe_bytes = ipfs_client.get_bytes(ipfs_cid) if recipe_bytes: recipe_source = recipe_bytes.decode('utf-8') except Exception: pass import html as html_module recipe_source_escaped = html_module.escape(recipe_source) if recipe_source else "(Could not load recipe source)" content_html = f'''

Recipe Source

{recipe_source_escaped}
Download YAML
''' else: content_html = f'''

Content type: {asset_type}

Download
''' # Origin display origin_html = 'Not specified' if origin: origin_type = origin.get("type", "unknown") if origin_type == "self": origin_html = 'Original content by author' elif origin_type == "external": origin_url = origin.get("url", "") origin_note = origin.get("note", "") origin_html = f'{origin_url}' if origin_note: origin_html += f'

{origin_note}

' # Tags display tags_html = 'No tags' if tags: tags_html = " ".join([f'{t}' for t in tags]) # IPFS display if ipfs_cid: local_gateway = f'Local' if IPFS_GATEWAY_URL else '' ipfs_html = f'''{ipfs_cid}
{local_gateway} ipfs.io dweb.link
''' else: ipfs_html = 'Not on IPFS' # Provenance section - for rendered outputs provenance_html = "" if provenance: recipe = provenance.get("recipe", "") inputs = provenance.get("inputs", []) l1_run_id = provenance.get("l1_run_id", "") rendered_at = format_date(provenance.get("rendered_at")) effects_commit = provenance.get("effects_commit", "") infrastructure = provenance.get("infrastructure", {}) # Use stored effect_url or build fallback effect_url = provenance.get("effect_url") if not effect_url: # Fallback for older records if effects_commit and effects_commit != "unknown": effect_url = f"{EFFECTS_REPO_URL}/src/commit/{effects_commit}/{recipe}" else: effect_url = f"{EFFECTS_REPO_URL}/src/branch/main/{recipe}" # Build inputs display - show actual content as thumbnails inputs_html = "" for inp in inputs: inp_hash = inp.get("content_hash", "") if isinstance(inp, dict) else inp if inp_hash: inputs_html += f'''
Input
{inp_hash[:16]}... view
''' # Infrastructure display infra_html = "" if infrastructure: software = infrastructure.get("software", {}) hardware = infrastructure.get("hardware", {}) if software or hardware: infra_html = f'''

Infrastructure

{f"Software: {software.get('name', 'unknown')}" if software else ""} {f" ({software.get('content_hash', '')[:16]}...)" if software.get('content_hash') else ""} {" | " if software and hardware else ""} {f"Hardware: {hardware.get('name', 'unknown')}" if hardware else ""}
''' provenance_html = f'''

Provenance

This asset was created by applying an effect to input content.

Effect

{recipe} {f'
Commit: {effects_commit[:12]}...
' if effects_commit else ''}

Input(s)

{inputs_html if inputs_html else 'No inputs recorded'}

Rendered

{rendered_at if rendered_at else 'Unknown'}
{infra_html}
''' content = f'''

← Back to Assets

{name}

{asset_type}
{content_html}

Owner

{owner}

Description

{description if description else 'No description'}

Origin

{origin_html}

Content Hash

{content_hash}

IPFS

{ipfs_html}

Created

{created_at}

Tags

{tags_html}
{provenance_html}

ActivityPub

Object URL: https://{DOMAIN}/objects/{content_hash}

Owner Actor: https://{DOMAIN}/users/{owner}

''' return HTMLResponse(base_html(f"Asset: {name}", content, username)) async def ui_user_detail(username: str, request: Request): """User detail page showing their published assets. Helper function for HTML rendering.""" current_user = get_user_from_cookie(request) if not await user_exists(username): content = f'''

User Not Found

No user named "{username}" exists.

← Back to Users

''' return HTMLResponse(base_html("User Not Found", content, current_user)) # Get user's assets registry = await load_registry() all_assets = registry.get("assets", {}) user_assets = {name: asset for name, asset in all_assets.items() if asset.get("owner") == username} # Get user's activities all_activities = await load_activities() actor_id = f"https://{DOMAIN}/users/{username}" user_activities = [a for a in all_activities if a.get("actor_id") == actor_id] webfinger = f"@{username}@{DOMAIN}" # Assets table if user_assets: rows = "" for name, asset in sorted(user_assets.items(), key=lambda x: x[1].get("created_at", ""), reverse=True): hash_short = asset.get("content_hash", "")[:16] + "..." asset_type = asset.get("asset_type", "") type_color = "bg-blue-600" if asset_type == "image" else "bg-purple-600" if asset_type == "video" else "bg-gray-600" rows += f''' {name} {asset_type} {hash_short} {", ".join(asset.get("tags", []))} ''' assets_html = f'''
{rows}
Name Type Content Hash Tags
''' else: assets_html = '

No published assets yet.

' content = f'''

← Back to Users

{username}

{webfinger}
{len(user_assets)}
Published Assets
{len(user_activities)}
Activities

ActivityPub

Actor URL: https://{DOMAIN}/users/{username}

Outbox: https://{DOMAIN}/users/{username}/outbox

Published Assets ({len(user_assets)})

{assets_html} ''' return HTMLResponse(base_html(f"User: {username}", content, current_user)) # ============ API Endpoints ============ @app.get("/") async def root(request: Request): """Server info. HTML shows home page with counts, JSON returns stats.""" registry = await load_registry() activities = await load_activities() users = await db.get_all_users() assets_count = len(registry.get("assets", {})) activities_count = len(activities) users_count = len(users) if wants_html(request): username = get_user_from_cookie(request) readme_html = markdown.markdown(README_CONTENT, extensions=['tables', 'fenced_code']) content = f'''
{assets_count}
Assets
{activities_count}
Activities
{users_count}
Users
{readme_html}
''' return HTMLResponse(base_html("Home", content, username)) return { "name": "Art DAG L2 Server", "version": "0.1.0", "domain": DOMAIN, "assets_count": assets_count, "activities_count": activities_count, "users_count": users_count } # ============ Auth Endpoints ============ security = HTTPBearer(auto_error=False) async def get_optional_user( credentials: HTTPAuthorizationCredentials = Depends(security) ) -> Optional[User]: """Get current user if authenticated, None otherwise.""" if not credentials: return None return await get_current_user(DATA_DIR, credentials.credentials) async def get_required_user( credentials: HTTPAuthorizationCredentials = Depends(security) ) -> User: """Get current user, raise 401 if not authenticated.""" if not credentials: raise HTTPException(401, "Not authenticated") user = await get_current_user(DATA_DIR, credentials.credentials) if not user: raise HTTPException(401, "Invalid token") return user @app.post("/auth/register", response_model=Token) async def register(req: UserCreate): """Register a new user.""" try: user = await create_user(DATA_DIR, req.username, req.password, req.email) except ValueError as e: raise HTTPException(400, str(e)) return create_access_token(user.username, l2_server=f"https://{DOMAIN}") @app.post("/auth/login", response_model=Token) async def login(req: UserLogin): """Login and get access token.""" user = await authenticate_user(DATA_DIR, req.username, req.password) if not user: raise HTTPException(401, "Invalid username or password") return create_access_token(user.username, l2_server=f"https://{DOMAIN}") @app.get("/auth/me") async def get_me(user: User = Depends(get_required_user)): """Get current user info.""" return { "username": user.username, "email": user.email, "created_at": user.created_at } class VerifyRequest(BaseModel): l1_server: str # URL of the L1 server requesting verification @app.post("/auth/verify") async def verify_auth( request: VerifyRequest, credentials: HTTPAuthorizationCredentials = Depends(security) ): """Verify a token and return username. Only authorized L1 servers can call this.""" if not credentials: raise HTTPException(401, "No token provided") token = credentials.credentials # Check L1 is authorized l1_normalized = request.l1_server.rstrip("/") authorized = any(l1_normalized == s.rstrip("/") for s in L1_SERVERS) if not authorized: raise HTTPException(403, f"L1 server not authorized: {request.l1_server}") # Check if token is revoked (L2-side revocation) token_hash = hashlib.sha256(token.encode()).hexdigest() if await db.is_token_revoked(token_hash): raise HTTPException(401, "Token has been revoked") # Verify token and get claims claims = get_token_claims(token) if not claims: raise HTTPException(401, "Invalid token") username = claims.get("sub") if not username: raise HTTPException(401, "Invalid token") # Check token scope - if token is scoped to an L1, it must match token_l1_server = claims.get("l1_server") if token_l1_server: token_l1_normalized = token_l1_server.rstrip("/") if token_l1_normalized != l1_normalized: raise HTTPException(403, f"Token is scoped to {token_l1_server}, not {request.l1_server}") # Record the attachment (L1 successfully verified user's token) await db.attach_renderer(username, l1_normalized) return {"username": username, "valid": True, "l1_server": request.l1_server} @app.get("/.well-known/webfinger") async def webfinger(resource: str): """WebFinger endpoint for actor discovery.""" # Parse acct:username@domain if not resource.startswith("acct:"): raise HTTPException(400, "Resource must be acct: URI") acct = resource[5:] # Remove "acct:" if "@" not in acct: raise HTTPException(400, "Invalid acct format") username, domain = acct.split("@", 1) if domain != DOMAIN: raise HTTPException(404, f"Unknown domain: {domain}") if not await user_exists(username): raise HTTPException(404, f"Unknown user: {username}") return JSONResponse( content={ "subject": resource, "links": [ { "rel": "self", "type": "application/activity+json", "href": f"https://{DOMAIN}/users/{username}" } ] }, media_type="application/jrd+json" ) @app.get("/users") async def get_users_list(request: Request, page: int = 1, limit: int = 20): """Get all users. HTML for browsers (with infinite scroll), JSON for APIs (with pagination).""" all_users = list((await db.get_all_users()).items()) total = len(all_users) # Sort by username all_users.sort(key=lambda x: x[0]) # Pagination start = (page - 1) * limit end = start + limit users_page = all_users[start:end] has_more = end < total if wants_html(request): username = get_user_from_cookie(request) if not users_page: if page == 1: content = '''

Users

No users registered yet.

''' else: return HTMLResponse("") # Empty for infinite scroll else: rows = "" for uname, user_data in users_page: webfinger = f"@{uname}@{DOMAIN}" created_at = format_date(user_data.get("created_at")) rows += f''' {uname} {webfinger} {created_at} ''' # For infinite scroll, just return rows if not first page if page > 1: if has_more: rows += f''' Loading more... ''' return HTMLResponse(rows) # First page - full content infinite_scroll_trigger = "" if has_more: infinite_scroll_trigger = f''' Loading more... ''' content = f'''

Users ({total} total)

{rows} {infinite_scroll_trigger}
Username WebFinger Created
''' return HTMLResponse(base_html("Users", content, username)) # JSON response for APIs return { "users": [{"username": uname, **data} for uname, data in users_page], "pagination": { "page": page, "limit": limit, "total": total, "has_more": has_more } } @app.get("/users/{username}") async def get_actor(username: str, request: Request): """Get actor profile for any registered user. Content negotiation: HTML for browsers, JSON for APIs.""" if not await user_exists(username): if wants_html(request): content = f'''

User Not Found

No user named "{username}" exists.

← Back to Users

''' return HTMLResponse(base_html("User Not Found", content, get_user_from_cookie(request))) raise HTTPException(404, f"Unknown user: {username}") if wants_html(request): # Render user detail page return await ui_user_detail(username, request) actor = load_actor(username) # Add ActivityPub context actor["@context"] = [ "https://www.w3.org/ns/activitystreams", "https://w3id.org/security/v1" ] return JSONResponse( content=actor, media_type="application/activity+json" ) @app.get("/users/{username}/outbox") async def get_outbox(username: str, page: bool = False): """Get actor's outbox (activities they created).""" if not await user_exists(username): raise HTTPException(404, f"Unknown user: {username}") # Filter activities by this user's actor_id all_activities = await load_activities() actor_id = f"https://{DOMAIN}/users/{username}" user_activities = [a for a in all_activities if a.get("actor_id") == actor_id] if not page: return JSONResponse( content={ "@context": "https://www.w3.org/ns/activitystreams", "id": f"https://{DOMAIN}/users/{username}/outbox", "type": "OrderedCollection", "totalItems": len(user_activities), "first": f"https://{DOMAIN}/users/{username}/outbox?page=true" }, media_type="application/activity+json" ) # Return activities page return JSONResponse( content={ "@context": "https://www.w3.org/ns/activitystreams", "id": f"https://{DOMAIN}/users/{username}/outbox?page=true", "type": "OrderedCollectionPage", "partOf": f"https://{DOMAIN}/users/{username}/outbox", "orderedItems": user_activities }, media_type="application/activity+json" ) @app.post("/users/{username}/inbox") async def post_inbox(username: str, request: Request): """Receive activities from other servers.""" if not await user_exists(username): raise HTTPException(404, f"Unknown user: {username}") body = await request.json() activity_type = body.get("type") # Handle Follow requests if activity_type == "Follow": follower_url = body.get("actor") # Add follower to database await db.add_follower(username, follower_url, follower_url) # Send Accept (in production, do this async) # For now just acknowledge return {"status": "accepted"} # Handle other activity types return {"status": "received"} @app.get("/users/{username}/followers") async def get_followers(username: str): """Get actor's followers.""" if not await user_exists(username): raise HTTPException(404, f"Unknown user: {username}") # TODO: Per-user followers - for now use global followers followers = await load_followers() return JSONResponse( content={ "@context": "https://www.w3.org/ns/activitystreams", "id": f"https://{DOMAIN}/users/{username}/followers", "type": "OrderedCollection", "totalItems": len(followers), "orderedItems": followers }, media_type="application/activity+json" ) # ============ Assets Endpoints ============ @app.get("/assets") async def get_registry(request: Request, page: int = 1, limit: int = 20): """Get registry. HTML for browsers (with infinite scroll), JSON for APIs (with pagination).""" registry = await load_registry() all_assets = list(registry.get("assets", {}).items()) total = len(all_assets) # Sort by created_at descending all_assets.sort(key=lambda x: x[1].get("created_at", ""), reverse=True) # Pagination start = (page - 1) * limit end = start + limit assets_page = all_assets[start:end] has_more = end < total if wants_html(request): username = get_user_from_cookie(request) if not assets_page: if page == 1: content = '''

Registry

No assets registered yet.

''' else: return HTMLResponse("") # Empty for infinite scroll else: rows = "" for name, asset in assets_page: asset_type = asset.get("asset_type", "") type_color = "bg-blue-600" if asset_type == "image" else "bg-purple-600" if asset_type == "video" else "bg-gray-600" owner = asset.get("owner", "unknown") content_hash = asset.get("content_hash", "")[:16] + "..." rows += f''' {name} {asset_type} {owner} {content_hash} View ''' # For infinite scroll, just return rows if not first page if page > 1: if has_more: rows += f''' Loading more... ''' return HTMLResponse(rows) # First page - full content infinite_scroll_trigger = "" if has_more: infinite_scroll_trigger = f''' Loading more... ''' content = f'''

Registry ({total} assets)

{rows} {infinite_scroll_trigger}
Name Type Owner Hash
''' return HTMLResponse(base_html("Registry", content, username)) # JSON response for APIs return { "assets": {name: asset for name, asset in assets_page}, "pagination": { "page": page, "limit": limit, "total": total, "has_more": has_more } } @app.get("/asset/{name}") async def get_asset_by_name_legacy(name: str): """Legacy route - redirect to /assets/{name}.""" return RedirectResponse(url=f"/assets/{name}", status_code=301) @app.get("/assets/{name}") async def get_asset(name: str, request: Request): """Get asset by name. HTML for browsers (default), JSON only if explicitly requested.""" registry = await load_registry() # Check if JSON explicitly requested accept = request.headers.get("accept", "") wants_json = "application/json" in accept and "text/html" not in accept if name not in registry.get("assets", {}): if wants_json: raise HTTPException(404, f"Asset not found: {name}") content = f'''

Asset Not Found

No asset named "{name}" exists.

← Back to Assets

''' return HTMLResponse(base_html("Asset Not Found", content, get_user_from_cookie(request))) if wants_json: return registry["assets"][name] # Default to HTML for browsers return await ui_asset_detail(name, request) @app.get("/assets/by-run-id/{run_id}") async def get_asset_by_run_id(run_id: str): """ Get asset by content-addressable run_id. Returns the asset info including output_hash and ipfs_cid for L1 recovery. The run_id is stored in the asset's provenance when the run is recorded. """ asset = await db.get_asset_by_run_id(run_id) if not asset: raise HTTPException(404, f"No asset found for run_id: {run_id}") return { "run_id": run_id, "asset_name": asset.get("name"), "output_hash": asset.get("content_hash"), "ipfs_cid": asset.get("ipfs_cid"), "provenance_cid": asset.get("provenance", {}).get("provenance_cid") if asset.get("provenance") else None, } @app.patch("/assets/{name}") async def update_asset(name: str, req: UpdateAssetRequest, user: User = Depends(get_required_user)): """Update an existing asset's metadata. Creates an Update activity.""" asset = await db.get_asset(name) if not asset: raise HTTPException(404, f"Asset not found: {name}") # Check ownership if asset.get("owner") != user.username: raise HTTPException(403, f"Not authorized to update asset owned by {asset.get('owner')}") # Build updates dict updates = {} if req.description is not None: updates["description"] = req.description if req.tags is not None: updates["tags"] = req.tags if req.metadata is not None: updates["metadata"] = {**asset.get("metadata", {}), **req.metadata} if req.origin is not None: updates["origin"] = req.origin if req.ipfs_cid is not None: updates["ipfs_cid"] = req.ipfs_cid # Pin on IPFS (fire-and-forget, don't block) import threading threading.Thread(target=_pin_ipfs_async, args=(req.ipfs_cid,), daemon=True).start() # Update asset in database updated_asset = await db.update_asset(name, updates) # Create Update activity activity = { "activity_id": str(uuid.uuid4()), "activity_type": "Update", "actor_id": f"https://{DOMAIN}/users/{user.username}", "object_data": { "type": updated_asset.get("asset_type", "Object").capitalize(), "name": name, "id": f"https://{DOMAIN}/objects/{updated_asset['content_hash']}", "contentHash": { "algorithm": "sha3-256", "value": updated_asset["content_hash"] }, "attributedTo": f"https://{DOMAIN}/users/{user.username}", "summary": req.description, "tag": req.tags or updated_asset.get("tags", []) }, "published": updated_asset.get("updated_at", datetime.now(timezone.utc).isoformat()) } # Sign activity with the user's keys activity = sign_activity(activity, user.username) # Save activity to database await db.create_activity(activity) return {"asset": updated_asset, "activity": activity} def _pin_ipfs_async(cid: str): """Pin IPFS content in background thread.""" try: import ipfs_client if ipfs_client.is_available(): ipfs_client.pin(cid) logger.info(f"Pinned IPFS content: {cid}") except Exception as e: logger.warning(f"Failed to pin IPFS content {cid}: {e}") async def _register_asset_impl(req: RegisterRequest, owner: str): """ Internal implementation for registering an asset atomically. Requires IPFS CID - content must be on IPFS before registering. Uses a transaction for all DB operations. """ import ipfs_client from ipfs_client import IPFSError logger.info(f"register_asset: Starting for {req.name} (hash={req.content_hash[:16]}...)") # ===== PHASE 1: VALIDATION ===== # IPFS CID is required if not req.ipfs_cid: raise HTTPException(400, "IPFS CID is required for registration") # Check if name exists - return existing asset if so existing = await db.get_asset(req.name) if existing: logger.info(f"register_asset: Asset {req.name} already exists, returning existing") return {"asset": existing, "activity": None, "existing": True} # ===== PHASE 2: IPFS OPERATIONS (non-blocking) ===== import asyncio logger.info(f"register_asset: Pinning CID {req.ipfs_cid[:16]}... on IPFS") try: await asyncio.to_thread(ipfs_client.pin_or_raise, req.ipfs_cid) logger.info("register_asset: CID pinned successfully") except IPFSError as e: logger.error(f"register_asset: IPFS pin failed: {e}") raise HTTPException(500, f"IPFS operation failed: {e}") # ===== PHASE 3: DB TRANSACTION ===== now = datetime.now(timezone.utc).isoformat() try: async with db.transaction() as conn: # Check name again inside transaction (race condition protection) if await db.asset_exists_by_name_tx(conn, req.name): # Race condition - another request created it first, return existing existing = await db.get_asset(req.name) logger.info(f"register_asset: Asset {req.name} created by concurrent request") return {"asset": existing, "activity": None, "existing": True} # Create asset asset = { "name": req.name, "content_hash": req.content_hash, "ipfs_cid": req.ipfs_cid, "asset_type": req.asset_type, "tags": req.tags, "metadata": req.metadata, "url": req.url, "provenance": req.provenance, "owner": owner, "created_at": now } created_asset = await db.create_asset_tx(conn, asset) # Create ownership activity object_data = { "type": req.asset_type.capitalize(), "name": req.name, "id": f"https://{DOMAIN}/objects/{req.content_hash}", "contentHash": { "algorithm": "sha3-256", "value": req.content_hash }, "attributedTo": f"https://{DOMAIN}/users/{owner}" } # Include provenance in activity object_data if present if req.provenance: object_data["provenance"] = req.provenance activity = { "activity_id": req.content_hash, # Content-addressable by content hash "activity_type": "Create", "actor_id": f"https://{DOMAIN}/users/{owner}", "object_data": object_data, "published": now } activity = sign_activity(activity, owner) created_activity = await db.create_activity_tx(conn, activity) # Transaction commits here on successful exit except HTTPException: raise except Exception as e: logger.error(f"register_asset: Database transaction failed: {e}") raise HTTPException(500, f"Failed to register asset: {e}") logger.info(f"register_asset: Successfully registered {req.name}") return {"asset": created_asset, "activity": created_activity} @app.post("/assets") async def register_asset(req: RegisterRequest, user: User = Depends(get_required_user)): """Register a new asset and create ownership activity. Requires authentication.""" return await _register_asset_impl(req, user.username) @app.post("/assets/record-run") @app.post("/registry/record-run") # Legacy route async def record_run(req: RecordRunRequest, user: User = Depends(get_required_user)): """ Record an L1 run and register the output atomically. Ensures all operations succeed or none do: 1. All input assets registered (if not already on L2) + pinned on IPFS 2. Output asset registered + pinned on IPFS 3. Recipe serialized to JSON, stored on IPFS, CID saved in provenance """ import ipfs_client from ipfs_client import IPFSError # ===== PHASE 1: PREPARATION (read-only, non-blocking) ===== import asyncio l1_url = req.l1_server.rstrip('/') logger.info(f"record_run: Starting for run_id={req.run_id} from {l1_url}") # Helper to fetch from L1 without blocking event loop def fetch_l1_run(): import time as _time url = f"{l1_url}/runs/{req.run_id}" logger.info(f"record_run: Fetching run from L1: {url}") t0 = _time.time() resp = requests.get(url, timeout=30) logger.info(f"record_run: L1 request took {_time.time()-t0:.3f}s, status={resp.status_code}") if resp.status_code == 404: raise ValueError(f"Run not found on L1: {req.run_id}") resp.raise_for_status() try: return resp.json() except Exception: body_preview = resp.text[:200] if resp.text else "(empty)" logger.error(f"L1 returned non-JSON for {url}: status={resp.status_code}, body={body_preview}") raise ValueError(f"L1 returned invalid response: {body_preview[:100]}") def fetch_l1_cache(content_hash): logger.debug(f"record_run: Fetching cache {content_hash[:16]}... from L1") url = f"{l1_url}/cache/{content_hash}" resp = requests.get(url, headers={"Accept": "application/json"}, timeout=10) if resp.status_code == 404: raise ValueError(f"Cache item not found on L1: {content_hash[:16]}...") resp.raise_for_status() try: return resp.json() except Exception as e: # Log what we actually got back body_preview = resp.text[:200] if resp.text else "(empty)" logger.error(f"L1 returned non-JSON for {url}: status={resp.status_code}, body={body_preview}") raise ValueError(f"L1 returned invalid response (status={resp.status_code}): {body_preview[:100]}") # Fetch run from L1 try: run = await asyncio.to_thread(fetch_l1_run) logger.info(f"record_run: Fetched run, status={run.get('status')}, inputs={len(run.get('inputs', []))}") except Exception as e: logger.error(f"record_run: Failed to fetch run from L1: {e}") raise HTTPException(400, f"Failed to fetch run from L1 ({l1_url}): {e}") if run.get("status") != "completed": raise HTTPException(400, f"Run not completed: {run.get('status')}") output_hash = run.get("output_hash") if not output_hash: raise HTTPException(400, "Run has no output hash") # Fetch output cache info from L1 (must exist - it's new) logger.info(f"record_run: Fetching output cache {output_hash[:16]}... from L1") try: cache_info = await asyncio.to_thread(fetch_l1_cache, output_hash) output_media_type = cache_info.get("media_type", "image") output_ipfs_cid = cache_info.get("ipfs_cid") logger.info(f"record_run: Output has IPFS CID: {output_ipfs_cid[:16] if output_ipfs_cid else 'None'}...") except Exception as e: logger.error(f"record_run: Failed to fetch output cache info: {e}") raise HTTPException(400, f"Failed to fetch output cache info: {e}") if not output_ipfs_cid: logger.error("record_run: Output has no IPFS CID") raise HTTPException(400, "Output has no IPFS CID - cannot publish") # Gather input info: check L2 first, then fall back to L1 input_hashes = run.get("inputs", []) input_infos = [] # List of {content_hash, ipfs_cid, media_type, existing_asset} logger.info(f"record_run: Gathering info for {len(input_hashes)} inputs") for input_hash in input_hashes: # Check if already on L2 existing = await db.get_asset_by_hash(input_hash) if existing and existing.get("ipfs_cid"): logger.info(f"record_run: Input {input_hash[:16]}... found on L2") input_infos.append({ "content_hash": input_hash, "ipfs_cid": existing["ipfs_cid"], "media_type": existing.get("asset_type", "image"), "existing_asset": existing }) else: # Not on L2, try L1 logger.info(f"record_run: Input {input_hash[:16]}... not on L2, fetching from L1") try: inp_info = await asyncio.to_thread(fetch_l1_cache, input_hash) ipfs_cid = inp_info.get("ipfs_cid") if not ipfs_cid: logger.error(f"record_run: Input {input_hash[:16]}... has no IPFS CID") raise HTTPException(400, f"Input {input_hash[:16]}... has no IPFS CID (not on L2 or L1)") input_infos.append({ "content_hash": input_hash, "ipfs_cid": ipfs_cid, "media_type": inp_info.get("media_type", "image"), "existing_asset": None }) except HTTPException: raise except Exception as e: logger.error(f"record_run: Failed to fetch input {input_hash[:16]}... from L1: {e}") raise HTTPException(400, f"Input {input_hash[:16]}... not on L2 and failed to fetch from L1: {e}") # Prepare recipe data recipe_data = run.get("recipe") if not recipe_data: recipe_data = { "name": run.get("recipe_name", "unknown"), "effect_url": run.get("effect_url"), "effects_commit": run.get("effects_commit"), } # Build registered_inputs list - all referenced by content_hash registered_inputs = [] for inp in input_infos: registered_inputs.append({ "content_hash": inp["content_hash"], "ipfs_cid": inp["ipfs_cid"] }) # ===== PHASE 2: IPFS OPERATIONS (non-blocking for event loop) ===== def do_ipfs_operations(): """Run IPFS operations in thread pool to not block event loop.""" from concurrent.futures import ThreadPoolExecutor, as_completed # Collect all CIDs to pin (inputs + output) cids_to_pin = [inp["ipfs_cid"] for inp in input_infos] + [output_ipfs_cid] logger.info(f"record_run: Pinning {len(cids_to_pin)} CIDs on IPFS") # Pin all in parallel with ThreadPoolExecutor(max_workers=5) as executor: futures = {executor.submit(ipfs_client.pin_or_raise, cid): cid for cid in cids_to_pin} for future in as_completed(futures): future.result() # Raises IPFSError if failed logger.info("record_run: All CIDs pinned successfully") # Store recipe on IPFS logger.info("record_run: Storing recipe on IPFS") recipe_cid = ipfs_client.add_json(recipe_data) # Build and store full provenance on IPFS # Compute content-addressable run_id from inputs + recipe recipe_name = recipe_data.get("name", "unknown") if isinstance(recipe_data, dict) else str(recipe_data) run_id = compute_run_id(input_hashes, recipe_name) provenance = { "run_id": run_id, # Content-addressable run identifier "inputs": registered_inputs, "output": { "content_hash": output_hash, "ipfs_cid": output_ipfs_cid }, "recipe": recipe_data, "recipe_cid": recipe_cid, "effect_url": run.get("effect_url"), "effects_commit": run.get("effects_commit"), "l1_server": l1_url, "l1_run_id": req.run_id, "rendered_at": run.get("completed_at"), "infrastructure": run.get("infrastructure") } logger.info("record_run: Storing provenance on IPFS") provenance_cid = ipfs_client.add_json(provenance) return recipe_cid, provenance_cid, provenance try: import asyncio recipe_cid, provenance_cid, provenance = await asyncio.to_thread(do_ipfs_operations) logger.info(f"record_run: Recipe CID: {recipe_cid[:16]}..., Provenance CID: {provenance_cid[:16]}...") except IPFSError as e: logger.error(f"record_run: IPFS operation failed: {e}") raise HTTPException(500, f"IPFS operation failed: {e}") # ===== PHASE 3: DB TRANSACTION (all-or-nothing) ===== logger.info("record_run: Starting DB transaction") now = datetime.now(timezone.utc).isoformat() # Add provenance_cid to provenance for storage in DB provenance["provenance_cid"] = provenance_cid try: async with db.transaction() as conn: # Register input assets (if not already on L2) - named by content_hash for inp in input_infos: if not inp["existing_asset"]: media_type = inp["media_type"] tags = ["auto-registered", "input"] if media_type == "recipe": tags.append("recipe") input_asset = { "name": inp["content_hash"], # Use content_hash as name "content_hash": inp["content_hash"], "ipfs_cid": inp["ipfs_cid"], "asset_type": media_type, "tags": tags, "metadata": {"auto_registered_from_run": req.run_id}, "owner": user.username, "created_at": now } await db.create_asset_tx(conn, input_asset) # Check if output already exists (by content_hash) - return existing if so existing = await db.get_asset_by_name_tx(conn, output_hash) if existing: logger.info(f"record_run: Output {output_hash[:16]}... already exists") # Check if activity already exists for this run existing_activity = await db.get_activity(provenance["run_id"]) if existing_activity: logger.info(f"record_run: Activity {provenance['run_id'][:16]}... also exists") return {"asset": existing, "activity": existing_activity, "existing": True} # Asset exists but no activity - create one logger.info(f"record_run: Creating activity for existing asset") object_data = { "type": existing.get("asset_type", "image").capitalize(), "name": output_hash, "id": f"https://{DOMAIN}/objects/{output_hash}", "contentHash": { "algorithm": "sha3-256", "value": output_hash }, "attributedTo": f"https://{DOMAIN}/users/{user.username}", "provenance": provenance } activity = { "activity_id": provenance["run_id"], "activity_type": "Create", "actor_id": f"https://{DOMAIN}/users/{user.username}", "object_data": object_data, "published": now } activity = sign_activity(activity, user.username) created_activity = await db.create_activity_tx(conn, activity) return {"asset": existing, "activity": created_activity, "existing": True} # Create output asset with provenance - named by content_hash output_asset = { "name": output_hash, # Use content_hash as name "content_hash": output_hash, "ipfs_cid": output_ipfs_cid, "asset_type": output_media_type, "tags": ["rendered", "l1"], "metadata": {"l1_server": l1_url, "l1_run_id": req.run_id}, "provenance": provenance, "owner": user.username, "created_at": now } created_asset = await db.create_asset_tx(conn, output_asset) # Create activity - all referenced by content_hash object_data = { "type": output_media_type.capitalize(), "name": output_hash, # Use content_hash as name "id": f"https://{DOMAIN}/objects/{output_hash}", "contentHash": { "algorithm": "sha3-256", "value": output_hash }, "attributedTo": f"https://{DOMAIN}/users/{user.username}", "provenance": provenance } activity = { "activity_id": provenance["run_id"], # Content-addressable run_id "activity_type": "Create", "actor_id": f"https://{DOMAIN}/users/{user.username}", "object_data": object_data, "published": now } activity = sign_activity(activity, user.username) created_activity = await db.create_activity_tx(conn, activity) # Transaction commits here on successful exit except HTTPException: raise except Exception as e: logger.error(f"record_run: Database transaction failed: {e}") raise HTTPException(500, f"Failed to record run: {e}") logger.info(f"record_run: Successfully published {output_hash[:16]}... with {len(registered_inputs)} inputs") return {"asset": created_asset, "activity": created_activity} @app.post("/assets/publish-cache") async def publish_cache(req: PublishCacheRequest, user: User = Depends(get_required_user)): """ Publish a cache item from L1 with metadata atomically. Requires origin to be set (self or external URL). Requires IPFS CID - content must be on IPFS before publishing. Creates a new asset and Create activity in a single transaction. """ import ipfs_client from ipfs_client import IPFSError logger.info(f"publish_cache: Starting for {req.asset_name} (hash={req.content_hash[:16]}...)") # ===== PHASE 1: VALIDATION ===== # Validate origin if not req.origin or "type" not in req.origin: raise HTTPException(400, "Origin is required for publishing (type: 'self' or 'external')") origin_type = req.origin.get("type") if origin_type not in ("self", "external"): raise HTTPException(400, "Origin type must be 'self' or 'external'") if origin_type == "external" and not req.origin.get("url"): raise HTTPException(400, "External origin requires a URL") # IPFS CID is now required if not req.ipfs_cid: raise HTTPException(400, "IPFS CID is required for publishing") # Check if asset name already exists if await db.asset_exists(req.asset_name): raise HTTPException(400, f"Asset name already exists: {req.asset_name}") # ===== PHASE 2: IPFS OPERATIONS (non-blocking) ===== import asyncio logger.info(f"publish_cache: Pinning CID {req.ipfs_cid[:16]}... on IPFS") try: await asyncio.to_thread(ipfs_client.pin_or_raise, req.ipfs_cid) logger.info("publish_cache: CID pinned successfully") except IPFSError as e: logger.error(f"publish_cache: IPFS pin failed: {e}") raise HTTPException(500, f"IPFS operation failed: {e}") # ===== PHASE 3: DB TRANSACTION ===== logger.info("publish_cache: Starting DB transaction") now = datetime.now(timezone.utc).isoformat() try: async with db.transaction() as conn: # Check name again inside transaction (race condition protection) if await db.asset_exists_by_name_tx(conn, req.asset_name): raise HTTPException(400, f"Asset name already exists: {req.asset_name}") # Create asset asset = { "name": req.asset_name, "content_hash": req.content_hash, "ipfs_cid": req.ipfs_cid, "asset_type": req.asset_type, "tags": req.tags, "description": req.description, "origin": req.origin, "metadata": req.metadata, "owner": user.username, "created_at": now } created_asset = await db.create_asset_tx(conn, asset) # Create ownership activity with origin info object_data = { "type": req.asset_type.capitalize(), "name": req.asset_name, "id": f"https://{DOMAIN}/objects/{req.content_hash}", "contentHash": { "algorithm": "sha3-256", "value": req.content_hash }, "attributedTo": f"https://{DOMAIN}/users/{user.username}", "tag": req.tags } if req.description: object_data["summary"] = req.description # Include origin in ActivityPub object if origin_type == "self": object_data["generator"] = { "type": "Application", "name": "Art DAG", "note": "Original content created by the author" } else: object_data["source"] = { "type": "Link", "href": req.origin.get("url"), "name": req.origin.get("note", "External source") } activity = { "activity_id": req.content_hash, # Content-addressable by content hash "activity_type": "Create", "actor_id": f"https://{DOMAIN}/users/{user.username}", "object_data": object_data, "published": now } activity = sign_activity(activity, user.username) created_activity = await db.create_activity_tx(conn, activity) # Transaction commits here on successful exit except HTTPException: raise except Exception as e: logger.error(f"publish_cache: Database transaction failed: {e}") raise HTTPException(500, f"Failed to publish cache item: {e}") logger.info(f"publish_cache: Successfully published {req.asset_name}") return {"asset": created_asset, "activity": created_activity} # ============ Activities Endpoints ============ @app.get("/activities") async def get_activities(request: Request, page: int = 1, limit: int = 20): """Get activities. HTML for browsers (with infinite scroll), JSON for APIs (with pagination).""" all_activities = await load_activities() total = len(all_activities) # Reverse for newest first all_activities = list(reversed(all_activities)) # Pagination start = (page - 1) * limit end = start + limit activities_page = all_activities[start:end] has_more = end < total if wants_html(request): username = get_user_from_cookie(request) if not activities_page: if page == 1: content = '''

Activities

No activities yet.

''' else: return HTMLResponse("") # Empty for infinite scroll else: rows = "" for i, activity in enumerate(activities_page): activity_index = total - 1 - (start + i) # Original index obj = activity.get("object_data", {}) activity_type = activity.get("activity_type", "") type_color = "bg-green-600" if activity_type == "Create" else "bg-yellow-600" if activity_type == "Update" else "bg-gray-600" actor_id = activity.get("actor_id", "") actor_name = actor_id.split("/")[-1] if actor_id else "unknown" rows += f''' {activity_type} {obj.get("name", "Untitled")} {actor_name} {format_date(activity.get("published"))} View ''' # For infinite scroll, just return rows if not first page if page > 1: if has_more: rows += f''' Loading more... ''' return HTMLResponse(rows) # First page - full content with header infinite_scroll_trigger = "" if has_more: infinite_scroll_trigger = f''' Loading more... ''' content = f'''

Activities ({total} total)

{rows} {infinite_scroll_trigger}
Type Object Actor Published
''' return HTMLResponse(base_html("Activities", content, username)) # JSON response for APIs return { "activities": activities_page, "pagination": { "page": page, "limit": limit, "total": total, "has_more": has_more } } @app.get("/activities/{activity_ref}") async def get_activity_detail(activity_ref: str, request: Request): """Get single activity by index or activity_id. HTML for browsers (default), JSON only if explicitly requested.""" # Check if JSON explicitly requested accept = request.headers.get("accept", "") wants_json = ("application/json" in accept or "application/activity+json" in accept) and "text/html" not in accept activity = None activity_index = None # Check if it's a numeric index or an activity_id (hash) if activity_ref.isdigit(): # Numeric index (legacy) activity_index = int(activity_ref) activities = await load_activities() if 0 <= activity_index < len(activities): activity = activities[activity_index] else: # Activity ID (hash) - look up directly activity = await db.get_activity(activity_ref) if activity: # Find index for UI rendering activities = await load_activities() for i, a in enumerate(activities): if a.get("activity_id") == activity_ref: activity_index = i break if not activity: if wants_json: raise HTTPException(404, "Activity not found") content = '''

Activity Not Found

This activity does not exist.

← Back to Activities

''' return HTMLResponse(base_html("Activity Not Found", content, get_user_from_cookie(request))) if wants_json: return activity # Default to HTML for browsers if activity_index is not None: return await ui_activity_detail(activity_index, request) else: # Render activity directly if no index found return await ui_activity_detail_by_data(activity, request) @app.get("/activity/{activity_index}") async def get_activity_legacy(activity_index: int): """Legacy route - redirect to /activities/{activity_index}.""" return RedirectResponse(url=f"/activities/{activity_index}", status_code=301) @app.get("/objects/{content_hash}") async def get_object(content_hash: str, request: Request): """Get object by content hash. Content negotiation: HTML for browsers, JSON for APIs.""" registry = await load_registry() # Find asset by hash for name, asset in registry.get("assets", {}).items(): if asset.get("content_hash") == content_hash: # Check Accept header - only return JSON if explicitly requested accept = request.headers.get("accept", "") wants_json = ("application/json" in accept or "application/activity+json" in accept) and "text/html" not in accept if not wants_json: # Default: redirect to detail page for browsers return RedirectResponse(url=f"/assets/{name}", status_code=303) owner = asset.get("owner", "unknown") return JSONResponse( content={ "@context": "https://www.w3.org/ns/activitystreams", "id": f"https://{DOMAIN}/objects/{content_hash}", "type": asset.get("asset_type", "Object").capitalize(), "name": name, "contentHash": { "algorithm": "sha3-256", "value": content_hash }, "attributedTo": f"https://{DOMAIN}/users/{owner}", "published": asset.get("created_at") }, media_type="application/activity+json" ) raise HTTPException(404, f"Object not found: {content_hash}") # ============ Anchoring (Bitcoin timestamps) ============ @app.post("/anchors/create") async def create_anchor_endpoint(request: Request): """ Create a new anchor for all unanchored activities. Builds a merkle tree, stores it on IPFS, and submits to OpenTimestamps for Bitcoin anchoring. The anchor proof is backed up to persistent storage. """ import anchoring import ipfs_client # Check auth (cookie or header) username = get_user_from_cookie(request) if not username: if wants_html(request): return HTMLResponse('''
Error: Login required
''') raise HTTPException(401, "Authentication required") # Get unanchored activities unanchored = await db.get_unanchored_activities() if not unanchored: if wants_html(request): return HTMLResponse('''
No unanchored activities to anchor.
''') return {"message": "No unanchored activities", "anchored": 0} activity_ids = [a["activity_id"] for a in unanchored] # Create anchor anchor = await anchoring.create_anchor(activity_ids, db, ipfs_client) if anchor: if wants_html(request): return HTMLResponse(f'''
Success! Anchored {len(activity_ids)} activities.
Merkle root: {anchor["merkle_root"][:32]}...
Refresh page to see the new anchor.
''') return { "message": f"Anchored {len(activity_ids)} activities", "merkle_root": anchor["merkle_root"], "tree_ipfs_cid": anchor.get("tree_ipfs_cid"), "activity_count": anchor["activity_count"] } else: if wants_html(request): return HTMLResponse('''
Failed! Could not create anchor.
''') raise HTTPException(500, "Failed to create anchor") @app.get("/anchors") async def list_anchors(): """List all anchors.""" anchors = await db.get_all_anchors() stats = await db.get_anchor_stats() return { "anchors": anchors, "stats": stats } @app.get("/anchors/{merkle_root}") async def get_anchor_endpoint(merkle_root: str): """Get anchor details by merkle root.""" anchor = await db.get_anchor(merkle_root) if not anchor: raise HTTPException(404, f"Anchor not found: {merkle_root}") return anchor @app.get("/anchors/{merkle_root}/tree") async def get_anchor_tree(merkle_root: str): """Get the full merkle tree from IPFS.""" import asyncio import ipfs_client anchor = await db.get_anchor(merkle_root) if not anchor: raise HTTPException(404, f"Anchor not found: {merkle_root}") tree_cid = anchor.get("tree_ipfs_cid") if not tree_cid: raise HTTPException(404, "Anchor has no tree on IPFS") try: tree_bytes = await asyncio.to_thread(ipfs_client.get_bytes, tree_cid) if tree_bytes: return json.loads(tree_bytes) except Exception as e: raise HTTPException(500, f"Failed to fetch tree from IPFS: {e}") @app.get("/anchors/verify/{activity_id}") async def verify_activity_anchor(activity_id: str): """ Verify an activity's anchor proof. Returns the merkle proof showing this activity is included in an anchored batch. """ import anchoring import ipfs_client # Get activity activity = await db.get_activity(activity_id) if not activity: raise HTTPException(404, f"Activity not found: {activity_id}") anchor_root = activity.get("anchor_root") if not anchor_root: return {"verified": False, "reason": "Activity not yet anchored"} # Get anchor anchor = await db.get_anchor(anchor_root) if not anchor: return {"verified": False, "reason": "Anchor record not found"} # Get tree from IPFS (non-blocking) import asyncio tree_cid = anchor.get("tree_ipfs_cid") if not tree_cid: return {"verified": False, "reason": "Merkle tree not on IPFS"} try: tree_bytes = await asyncio.to_thread(ipfs_client.get_bytes, tree_cid) tree = json.loads(tree_bytes) if tree_bytes else None except Exception: return {"verified": False, "reason": "Failed to fetch tree from IPFS"} if not tree: return {"verified": False, "reason": "Could not load merkle tree"} # Get proof proof = anchoring.get_merkle_proof(tree, activity_id) if not proof: return {"verified": False, "reason": "Activity not in merkle tree"} # Verify proof valid = anchoring.verify_merkle_proof(activity_id, proof, anchor_root) return { "verified": valid, "activity_id": activity_id, "merkle_root": anchor_root, "tree_ipfs_cid": tree_cid, "proof": proof, "bitcoin_txid": anchor.get("bitcoin_txid"), "confirmed_at": anchor.get("confirmed_at") } @app.post("/anchors/{merkle_root}/upgrade") async def upgrade_anchor_proof(merkle_root: str): """ Try to upgrade an OTS proof from pending to confirmed. Bitcoin confirmation typically takes 1-2 hours. Call this periodically to check if the proof has been included in a Bitcoin block. """ import anchoring import ipfs_client import asyncio anchor = await db.get_anchor(merkle_root) if not anchor: raise HTTPException(404, f"Anchor not found: {merkle_root}") if anchor.get("confirmed_at"): return {"status": "already_confirmed", "bitcoin_txid": anchor.get("bitcoin_txid")} # Get current OTS proof from IPFS ots_cid = anchor.get("ots_proof_cid") if not ots_cid: return {"status": "no_proof", "message": "No OTS proof stored"} try: ots_proof = await asyncio.to_thread(ipfs_client.get_bytes, ots_cid) if not ots_proof: return {"status": "error", "message": "Could not fetch OTS proof from IPFS"} except Exception as e: return {"status": "error", "message": f"IPFS error: {e}"} # Try to upgrade upgraded = await asyncio.to_thread(anchoring.upgrade_ots_proof, ots_proof) if upgraded and len(upgraded) > len(ots_proof): # Store upgraded proof on IPFS try: new_cid = await asyncio.to_thread(ipfs_client.add_bytes, upgraded) # TODO: Update anchor record with new CID and confirmed status return { "status": "upgraded", "message": "Proof upgraded - Bitcoin confirmation received", "new_ots_cid": new_cid, "proof_size": len(upgraded) } except Exception as e: return {"status": "error", "message": f"Failed to store upgraded proof: {e}"} else: return { "status": "pending", "message": "Not yet confirmed on Bitcoin. Try again in ~1 hour.", "proof_size": len(ots_proof) if ots_proof else 0 } @app.get("/anchors/ui", response_class=HTMLResponse) async def anchors_ui(request: Request): """Anchors UI page - view and test OpenTimestamps anchoring.""" username = get_user_from_cookie(request) anchors = await db.get_all_anchors() stats = await db.get_anchor_stats() # Build anchors table rows rows = "" for anchor in anchors: status = "confirmed" if anchor.get("confirmed_at") else "pending" status_class = "text-green-400" if status == "confirmed" else "text-yellow-400" merkle_root = anchor.get("merkle_root", "")[:16] + "..." rows += f''' {merkle_root} {anchor.get("activity_count", 0)} {status} {format_date(anchor.get("created_at"), 16)} ''' if not rows: rows = 'No anchors yet' content = f'''

Bitcoin Anchoring via OpenTimestamps

{stats.get("total_anchors", 0)}
Total Anchors
{stats.get("confirmed_anchors", 0)}
Confirmed
{stats.get("pending_anchors", 0)}
Pending

Test Anchoring

Create a test anchor for unanchored activities, or test the OTS connection.

Anchors

{rows}
Merkle Root Activities Status Created Actions

How it works:

  1. Activities are batched and hashed into a merkle tree
  2. The merkle root is submitted to OpenTimestamps
  3. OTS aggregates hashes and anchors to Bitcoin (~1-2 hours)
  4. Once confirmed, anyone can verify the timestamp
''' return HTMLResponse(base_html("Anchors", content, username)) @app.post("/anchors/test-ots", response_class=HTMLResponse) async def test_ots_connection(): """Test OpenTimestamps connection by submitting a test hash.""" import anchoring import hashlib import asyncio # Create a test hash test_data = f"test-{datetime.now(timezone.utc).isoformat()}" test_hash = hashlib.sha256(test_data.encode()).hexdigest() # Try to submit try: ots_proof = await asyncio.to_thread(anchoring.submit_to_opentimestamps, test_hash) if ots_proof: return HTMLResponse(f'''
Success! OpenTimestamps is working.
Test hash: {test_hash[:32]}...
Proof size: {len(ots_proof)} bytes
''') else: return HTMLResponse('''
Failed! Could not reach OpenTimestamps servers.
''') except Exception as e: return HTMLResponse(f'''
Error: {str(e)}
''') # ============ Renderers (L1 servers) ============ @app.get("/renderers", response_class=HTMLResponse) async def renderers_page(request: Request): """Page to manage L1 renderer attachments.""" username = get_user_from_cookie(request) if not username: content = '''

Renderers

Log in to manage your renderer connections.

''' return HTMLResponse(base_html("Renderers", content)) # Get user's attached renderers attached = await db.get_user_renderers(username) from urllib.parse import quote # Build renderer list rows = [] for l1_url in L1_SERVERS: is_attached = l1_url in attached # Extract display name from URL display_name = l1_url.replace("https://", "").replace("http://", "") if is_attached: status = 'Attached' action = f''' Open ''' else: status = 'Not attached' # Attach via endpoint that creates scoped token (not raw token in URL) attach_url = f"/renderers/attach?l1_url={quote(l1_url, safe='')}" action = f''' Attach ''' row_id = l1_url.replace("://", "-").replace("/", "-").replace(".", "-") rows.append(f'''
{display_name}
{l1_url}
{status} {action}
''') content = f'''

Renderers

Connect to L1 rendering servers. After attaching, you can run effects and manage media on that renderer.

{"".join(rows) if rows else '

No renderers configured.

'}
''' return HTMLResponse(base_html("Renderers", content, username)) @app.get("/renderers/attach") async def attach_renderer_redirect(request: Request, l1_url: str): """Create a scoped token and redirect to L1 for attachment.""" username = get_user_from_cookie(request) if not username: return RedirectResponse(url="/login", status_code=302) # Verify L1 is in our allowed list l1_normalized = l1_url.rstrip("/") if not any(l1_normalized == s.rstrip("/") for s in L1_SERVERS): raise HTTPException(403, f"L1 server not authorized: {l1_url}") # Create a scoped token that only works for this specific L1 scoped_token = create_access_token( username, l2_server=f"https://{DOMAIN}", l1_server=l1_normalized ) # Redirect to L1 with scoped token redirect_url = f"{l1_normalized}/auth?auth_token={scoped_token.access_token}" return RedirectResponse(url=redirect_url, status_code=302) @app.post("/renderers/detach", response_class=HTMLResponse) async def detach_renderer(request: Request): """Detach from an L1 renderer.""" username = get_user_from_cookie(request) if not username: return HTMLResponse('
Not logged in
') form = await request.form() l1_url = form.get("l1_url", "") await db.detach_renderer(username, l1_url) # Return updated row with link to attach endpoint (not raw token) display_name = l1_url.replace("https://", "").replace("http://", "") from urllib.parse import quote attach_url = f"/renderers/attach?l1_url={quote(l1_url, safe='')}" row_id = l1_url.replace("://", "-").replace("/", "-").replace(".", "-") return HTMLResponse(f'''
{display_name}
{l1_url}
Not attached Attach
''') # ============ User Storage ============ import storage_providers @app.get("/storage") async def list_storage(request: Request, user: User = Depends(get_optional_user)): """List user's storage providers. HTML for browsers (default), JSON only if explicitly requested.""" # Check if JSON explicitly requested accept = request.headers.get("accept", "") wants_json = "application/json" in accept and "text/html" not in accept # For browser sessions, also check cookie authentication username = user.username if user else get_user_from_cookie(request) if not username: if wants_json: raise HTTPException(401, "Authentication required") return RedirectResponse(url="/login", status_code=302) storages = await db.get_user_storage(username) # Add usage stats to each storage for storage in storages: usage = await db.get_storage_usage(storage["id"]) storage["used_bytes"] = usage["used_bytes"] storage["pin_count"] = usage["pin_count"] storage["donated_gb"] = storage["capacity_gb"] // 2 # Mask sensitive config keys for display if storage.get("config"): config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"]) masked = {} for k, v in config.items(): if "key" in k.lower() or "token" in k.lower() or "secret" in k.lower(): masked[k] = v[:4] + "..." + v[-4:] if len(str(v)) > 8 else "****" else: masked[k] = v storage["config_display"] = masked if wants_json: return {"storages": storages} # Default to HTML for browsers return await ui_storage_page(username, storages, request) @app.post("/storage") async def add_storage(req: AddStorageRequest, user: User = Depends(get_required_user)): """Add a storage provider.""" # Validate provider type valid_types = ["pinata", "web3storage", "nftstorage", "infura", "filebase", "storj", "local"] if req.provider_type not in valid_types: raise HTTPException(400, f"Invalid provider type: {req.provider_type}") # Test the provider connection before saving provider = storage_providers.create_provider(req.provider_type, { **req.config, "capacity_gb": req.capacity_gb }) if not provider: raise HTTPException(400, "Failed to create provider with given config") success, message = await provider.test_connection() if not success: raise HTTPException(400, f"Provider connection failed: {message}") # Save to database provider_name = req.provider_name or f"{req.provider_type}-{user.username}" storage_id = await db.add_user_storage( username=user.username, provider_type=req.provider_type, provider_name=provider_name, config=req.config, capacity_gb=req.capacity_gb ) if not storage_id: raise HTTPException(500, "Failed to save storage provider") return {"id": storage_id, "message": f"Storage provider added: {provider_name}"} @app.post("/storage/add") async def add_storage_form( request: Request, provider_type: str = Form(...), provider_name: Optional[str] = Form(None), description: Optional[str] = Form(None), capacity_gb: int = Form(5), api_key: Optional[str] = Form(None), secret_key: Optional[str] = Form(None), api_token: Optional[str] = Form(None), project_id: Optional[str] = Form(None), project_secret: Optional[str] = Form(None), access_key: Optional[str] = Form(None), bucket: Optional[str] = Form(None), path: Optional[str] = Form(None), ): """Add a storage provider via HTML form (cookie auth).""" username = get_user_from_cookie(request) if not username: return HTMLResponse('
Not authenticated
', status_code=401) # Validate provider type valid_types = ["pinata", "web3storage", "nftstorage", "infura", "filebase", "storj", "local"] if provider_type not in valid_types: return HTMLResponse(f'
Invalid provider type: {provider_type}
') # Build config based on provider type config = {} if provider_type == "pinata": if not api_key or not secret_key: return HTMLResponse('
Pinata requires API Key and Secret Key
') config = {"api_key": api_key, "secret_key": secret_key} elif provider_type == "web3storage": if not api_token: return HTMLResponse('
web3.storage requires API Token
') config = {"api_token": api_token} elif provider_type == "nftstorage": if not api_token: return HTMLResponse('
NFT.Storage requires API Token
') config = {"api_token": api_token} elif provider_type == "infura": if not project_id or not project_secret: return HTMLResponse('
Infura requires Project ID and Project Secret
') config = {"project_id": project_id, "project_secret": project_secret} elif provider_type == "filebase": if not access_key or not secret_key or not bucket: return HTMLResponse('
Filebase requires Access Key, Secret Key, and Bucket
') config = {"access_key": access_key, "secret_key": secret_key, "bucket": bucket} elif provider_type == "storj": if not access_key or not secret_key or not bucket: return HTMLResponse('
Storj requires Access Key, Secret Key, and Bucket
') config = {"access_key": access_key, "secret_key": secret_key, "bucket": bucket} elif provider_type == "local": if not path: return HTMLResponse('
Local storage requires a path
') config = {"path": path} # Test the provider connection before saving provider = storage_providers.create_provider(provider_type, { **config, "capacity_gb": capacity_gb }) if not provider: return HTMLResponse('
Failed to create provider with given config
') success, message = await provider.test_connection() if not success: return HTMLResponse(f'
Provider connection failed: {message}
') # Save to database name = provider_name or f"{provider_type}-{username}-{len(await db.get_user_storage_by_type(username, provider_type)) + 1}" storage_id = await db.add_user_storage( username=username, provider_type=provider_type, provider_name=name, config=config, capacity_gb=capacity_gb, description=description ) if not storage_id: return HTMLResponse('
Failed to save storage provider
') return HTMLResponse(f'''
Storage provider "{name}" added successfully!
''') @app.get("/storage/{storage_id}") async def get_storage(storage_id: int, user: User = Depends(get_required_user)): """Get a specific storage provider.""" storage = await db.get_storage_by_id(storage_id) if not storage: raise HTTPException(404, "Storage provider not found") if storage["username"] != user.username: raise HTTPException(403, "Not authorized") usage = await db.get_storage_usage(storage_id) storage["used_bytes"] = usage["used_bytes"] storage["pin_count"] = usage["pin_count"] storage["donated_gb"] = storage["capacity_gb"] // 2 return storage @app.patch("/storage/{storage_id}") async def update_storage(storage_id: int, req: UpdateStorageRequest, user: User = Depends(get_required_user)): """Update a storage provider.""" storage = await db.get_storage_by_id(storage_id) if not storage: raise HTTPException(404, "Storage provider not found") if storage["username"] != user.username: raise HTTPException(403, "Not authorized") # If updating config, test the new connection if req.config: existing_config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"]) new_config = {**existing_config, **req.config} provider = storage_providers.create_provider(storage["provider_type"], { **new_config, "capacity_gb": req.capacity_gb or storage["capacity_gb"] }) if provider: success, message = await provider.test_connection() if not success: raise HTTPException(400, f"Provider connection failed: {message}") success = await db.update_user_storage( storage_id, config=req.config, capacity_gb=req.capacity_gb, is_active=req.is_active ) if not success: raise HTTPException(500, "Failed to update storage provider") return {"message": "Storage provider updated"} @app.delete("/storage/{storage_id}") async def remove_storage(storage_id: int, request: Request, user: User = Depends(get_optional_user)): """Remove a storage provider.""" # Support both Bearer token and cookie auth username = user.username if user else get_user_from_cookie(request) if not username: raise HTTPException(401, "Not authenticated") storage = await db.get_storage_by_id(storage_id) if not storage: raise HTTPException(404, "Storage provider not found") if storage["username"] != username: raise HTTPException(403, "Not authorized") success = await db.remove_user_storage(storage_id) if not success: raise HTTPException(500, "Failed to remove storage provider") # Return empty string for HTMX to remove the element if wants_html(request): return HTMLResponse("") return {"message": "Storage provider removed"} @app.post("/storage/{storage_id}/test") async def test_storage(storage_id: int, request: Request, user: User = Depends(get_optional_user)): """Test storage provider connectivity.""" # Support both Bearer token and cookie auth username = user.username if user else get_user_from_cookie(request) if not username: if wants_html(request): return HTMLResponse('Not authenticated', status_code=401) raise HTTPException(401, "Not authenticated") storage = await db.get_storage_by_id(storage_id) if not storage: if wants_html(request): return HTMLResponse('Storage not found', status_code=404) raise HTTPException(404, "Storage provider not found") if storage["username"] != username: if wants_html(request): return HTMLResponse('Not authorized', status_code=403) raise HTTPException(403, "Not authorized") config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"]) provider = storage_providers.create_provider(storage["provider_type"], { **config, "capacity_gb": storage["capacity_gb"] }) if not provider: if wants_html(request): return HTMLResponse('Failed to create provider') raise HTTPException(500, "Failed to create provider") success, message = await provider.test_connection() if wants_html(request): if success: return HTMLResponse(f'{message}') return HTMLResponse(f'{message}') return {"success": success, "message": message} STORAGE_PROVIDERS_INFO = { "pinata": {"name": "Pinata", "desc": "1GB free, IPFS pinning", "color": "blue"}, "web3storage": {"name": "web3.storage", "desc": "IPFS + Filecoin", "color": "green"}, "nftstorage": {"name": "NFT.Storage", "desc": "Free for NFTs", "color": "pink"}, "infura": {"name": "Infura IPFS", "desc": "5GB free", "color": "orange"}, "filebase": {"name": "Filebase", "desc": "5GB free, S3+IPFS", "color": "cyan"}, "storj": {"name": "Storj", "desc": "25GB free", "color": "indigo"}, "local": {"name": "Local Storage", "desc": "Your own disk", "color": "purple"}, } async def ui_storage_page(username: str, storages: list, request: Request) -> HTMLResponse: """Render main storage settings page showing provider types.""" # Count configs per type type_counts = {} for s in storages: pt = s["provider_type"] type_counts[pt] = type_counts.get(pt, 0) + 1 # Build provider type cards cards = "" for ptype, info in STORAGE_PROVIDERS_INFO.items(): count = type_counts.get(ptype, 0) count_badge = f'{count}' if count > 0 else "" cards += f'''
{info["name"]} {count_badge}
{info["desc"]}
''' # Total stats total_capacity = sum(s["capacity_gb"] for s in storages) total_used = sum(s.get("used_bytes", 0) for s in storages) total_pins = sum(s.get("pin_count", 0) for s in storages) content = f'''

Storage Providers

Attach your own storage to help power the network. 50% of your capacity is donated to store shared content, making popular assets more resilient.

{len(storages)}
Total Configs
{total_capacity} GB
Total Capacity
{total_used / (1024**3):.1f} GB
Used
{total_pins}
Total Pins

Select Provider Type

{cards}
''' return HTMLResponse(base_html("Storage", content, username)) @app.get("/storage/type/{provider_type}") async def storage_type_page(provider_type: str, request: Request, user: User = Depends(get_optional_user)): """Page for managing storage configs of a specific type.""" username = user.username if user else get_user_from_cookie(request) if not username: return RedirectResponse(url="/login", status_code=302) if provider_type not in STORAGE_PROVIDERS_INFO: raise HTTPException(404, "Invalid provider type") storages = await db.get_user_storage_by_type(username, provider_type) # Add usage stats for storage in storages: usage = await db.get_storage_usage(storage["id"]) storage["used_bytes"] = usage["used_bytes"] storage["pin_count"] = usage["pin_count"] # Mask sensitive config keys if storage.get("config"): config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"]) masked = {} for k, v in config.items(): if "key" in k.lower() or "token" in k.lower() or "secret" in k.lower(): masked[k] = v[:4] + "..." + v[-4:] if len(str(v)) > 8 else "****" else: masked[k] = v storage["config_display"] = masked info = STORAGE_PROVIDERS_INFO[provider_type] return await ui_storage_type_page(username, provider_type, info, storages, request) async def ui_storage_type_page(username: str, provider_type: str, info: dict, storages: list, request: Request) -> HTMLResponse: """Render per-type storage management page.""" def format_bytes(b): if b > 1024**3: return f"{b / 1024**3:.1f} GB" if b > 1024**2: return f"{b / 1024**2:.1f} MB" if b > 1024: return f"{b / 1024:.1f} KB" return f"{b} bytes" # Build storage rows storage_rows = "" for s in storages: status_class = "bg-green-600" if s["is_active"] else "bg-gray-600" status_text = "Active" if s["is_active"] else "Inactive" config_display = s.get("config_display", {}) config_html = ", ".join(f"{k}: {v}" for k, v in config_display.items() if k != "path") desc = s.get("description") or "" desc_html = f'
{desc}
' if desc else "" storage_rows += f'''

{s["provider_name"] or provider_type}

{desc_html}
{status_text}
Capacity
{s["capacity_gb"]} GB
Donated
{s["capacity_gb"] // 2} GB
Used
{format_bytes(s["used_bytes"])}
Pins
{s["pin_count"]}
{config_html}
''' if not storages: storage_rows = f'

No {info["name"]} configs yet. Add one below.

' # Build form fields based on provider type form_fields = "" if provider_type == "pinata": form_fields = '''
''' elif provider_type in ("web3storage", "nftstorage"): form_fields = '''
''' elif provider_type == "infura": form_fields = '''
''' elif provider_type in ("filebase", "storj"): form_fields = '''
''' elif provider_type == "local": form_fields = '''
''' content = f'''
← Back

{info["name"]} Storage

Your {info["name"]} Configs

{storage_rows}

Add New {info["name"]} Config

{form_fields}
''' return HTMLResponse(base_html(f"{info['name']} Storage", content, username)) # ============ Client Download ============ CLIENT_TARBALL = Path(__file__).parent / "artdag-client.tar.gz" @app.get("/download/client") async def download_client(): """Download the Art DAG CLI client.""" if not CLIENT_TARBALL.exists(): raise HTTPException(404, "Client package not found") return FileResponse( CLIENT_TARBALL, media_type="application/gzip", filename="artdag-client.tar.gz" ) # ============================================================================ # Documentation Routes # ============================================================================ # Documentation paths L2_DOCS_DIR = Path(__file__).parent COMMON_DOCS_DIR = Path(__file__).parent.parent / "common" L2_DOCS_MAP = { "l2": L2_DOCS_DIR / "README.md", "common": COMMON_DOCS_DIR / "README.md", } def render_markdown(content: str) -> str: """Convert markdown to HTML with basic styling.""" import re # Escape HTML first content = content.replace("&", "&").replace("<", "<").replace(">", ">") # Code blocks (``` ... ```) def code_block_replace(match): lang = match.group(1) or "" code = match.group(2) return f'
{code}
' content = re.sub(r'```(\w*)\n(.*?)```', code_block_replace, content, flags=re.DOTALL) # Inline code content = re.sub(r'`([^`]+)`', r'\1', content) # Headers content = re.sub(r'^### (.+)$', r'

\1

', content, flags=re.MULTILINE) content = re.sub(r'^## (.+)$', r'

\1

', content, flags=re.MULTILINE) content = re.sub(r'^# (.+)$', r'

\1

', content, flags=re.MULTILINE) # Bold and italic content = re.sub(r'\*\*([^*]+)\*\*', r'\1', content) content = re.sub(r'\*([^*]+)\*', r'\1', content) # Links content = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'\1', content) # Tables def table_replace(match): lines = match.group(0).strip().split('\n') if len(lines) < 2: return match.group(0) header = lines[0] rows = lines[2:] if len(lines) > 2 else [] header_cells = [cell.strip() for cell in header.split('|')[1:-1]] header_html = ''.join(f'{cell}' for cell in header_cells) rows_html = '' for row in rows: cells = [cell.strip() for cell in row.split('|')[1:-1]] cells_html = ''.join(f'{cell}' for cell in cells) rows_html += f'{cells_html}' return f'{header_html}{rows_html}
' content = re.sub(r'(\|[^\n]+\|\n)+', table_replace, content) # Bullet points content = re.sub(r'^- (.+)$', r'
  • \1
  • ', content, flags=re.MULTILINE) content = re.sub(r'(]*>.*\n?)+', r'', content) # Paragraphs (lines not starting with < or whitespace) lines = content.split('\n') result = [] in_paragraph = False for line in lines: stripped = line.strip() if not stripped: if in_paragraph: result.append('

    ') in_paragraph = False result.append('') elif stripped.startswith('<'): if in_paragraph: result.append('

    ') in_paragraph = False result.append(line) else: if not in_paragraph: result.append('

    ') in_paragraph = True result.append(line) if in_paragraph: result.append('

    ') content = '\n'.join(result) return content @app.get("/docs", response_class=HTMLResponse) async def docs_index(request: Request): """Documentation index page.""" user = await get_optional_user(request) html = f""" Documentation - Art DAG L2

    Documentation

    """ return HTMLResponse(html) @app.get("/docs/{doc_name}", response_class=HTMLResponse) async def docs_page(doc_name: str, request: Request): """Render a markdown documentation file as HTML.""" if doc_name not in L2_DOCS_MAP: raise HTTPException(404, f"Documentation '{doc_name}' not found") doc_path = L2_DOCS_MAP[doc_name] if not doc_path.exists(): raise HTTPException(404, f"Documentation file not found: {doc_path}") content = doc_path.read_text() html_content = render_markdown(content) html = f""" {doc_name.upper()} - Art DAG Documentation
    {html_content}
    """ return HTMLResponse(html) if __name__ == "__main__": import uvicorn uvicorn.run("server:app", host="0.0.0.0", port=8200, workers=4)