Files
activity-pub/server.py
gilesb e5d1c93034 Remove cross-subdomain cookie sharing, use lax SameSite
L1 servers now verify tokens by calling L2's /auth/verify endpoint,
so L2 no longer needs to share cookies across subdomains. Each L1
sets its own first-party cookie via its /auth endpoint.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-09 17:40:14 +00:00

2955 lines
121 KiB
Python

#!/usr/bin/env python3
"""
Art DAG L2 Server - ActivityPub
Manages ownership registry, activities, and federation.
- Registry of owned assets
- ActivityPub actor endpoints
- Sign and publish Create activities
- Federation with other servers
"""
import hashlib
import json
import logging
import os
import uuid
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(levelname)s %(name)s: %(message)s'
)
logger = logging.getLogger(__name__)
from fastapi import FastAPI, HTTPException, Request, Response, Depends, Cookie
from fastapi.responses import JSONResponse, HTMLResponse, RedirectResponse, FileResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
import requests
import markdown
import db
from auth import (
UserCreate, UserLogin, Token, User,
create_user, authenticate_user, create_access_token,
verify_token, get_current_user
)
# Configuration
DOMAIN = os.environ.get("ARTDAG_DOMAIN", "artdag.rose-ash.com")
DATA_DIR = Path(os.environ.get("ARTDAG_DATA", str(Path.home() / ".artdag" / "l2")))
L1_PUBLIC_URL = os.environ.get("L1_PUBLIC_URL", "https://celery-artdag.rose-ash.com")
EFFECTS_REPO_URL = os.environ.get("EFFECTS_REPO_URL", "https://git.rose-ash.com/art-dag/effects")
IPFS_GATEWAY_URL = os.environ.get("IPFS_GATEWAY_URL", "")
# Known L1 renderers (comma-separated URLs)
L1_SERVERS_STR = os.environ.get("L1_SERVERS", "https://celery-artdag.rose-ash.com")
L1_SERVERS = [s.strip() for s in L1_SERVERS_STR.split(",") if s.strip()]
# Cookie domain for sharing auth across subdomains (e.g., ".rose-ash.com")
# If not set, derives from DOMAIN (strips first subdomain, adds leading dot)
def _get_cookie_domain():
env_val = os.environ.get("COOKIE_DOMAIN")
if env_val:
return env_val
# Derive from DOMAIN: artdag.rose-ash.com -> .rose-ash.com
parts = DOMAIN.split(".")
if len(parts) >= 2:
return "." + ".".join(parts[-2:])
return None
COOKIE_DOMAIN = _get_cookie_domain()
# Ensure data directory exists
DATA_DIR.mkdir(parents=True, exist_ok=True)
(DATA_DIR / "assets").mkdir(exist_ok=True)
def compute_run_id(input_hashes: list[str], recipe: str, recipe_hash: str = None) -> str:
"""
Compute a deterministic run_id from inputs and recipe.
The run_id is a SHA3-256 hash of:
- Sorted input content hashes
- Recipe identifier (recipe_hash if provided, else "effect:{recipe}")
This makes runs content-addressable: same inputs + recipe = same run_id.
Must match the L1 implementation exactly.
"""
data = {
"inputs": sorted(input_hashes),
"recipe": recipe_hash or f"effect:{recipe}",
"version": "1", # For future schema changes
}
json_str = json.dumps(data, sort_keys=True, separators=(",", ":"))
return hashlib.sha3_256(json_str.encode()).hexdigest()
# Load README
README_PATH = Path(__file__).parent / "README.md"
README_CONTENT = ""
if README_PATH.exists():
README_CONTENT = README_PATH.read_text()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage database connection pool lifecycle."""
await db.init_pool()
yield
await db.close_pool()
app = FastAPI(
title="Art DAG L2 Server",
description="ActivityPub server for Art DAG ownership and federation",
version="0.1.0",
lifespan=lifespan
)
@app.exception_handler(404)
async def not_found_handler(request: Request, exc):
"""Custom 404 page."""
accept = request.headers.get("accept", "")
if "text/html" in accept and "application/json" not in accept:
content = '''
<div class="text-center py-16">
<h2 class="text-6xl font-bold text-gray-600 mb-4">404</h2>
<p class="text-xl text-gray-400 mb-8">Page not found</p>
<a href="/" class="text-blue-400 hover:text-blue-300">Go to home page</a>
</div>
'''
username = get_user_from_cookie(request)
return HTMLResponse(base_html("Not Found", content, username), status_code=404)
return JSONResponse({"detail": "Not found"}, status_code=404)
# ============ Data Models ============
class Asset(BaseModel):
"""An owned asset."""
name: str
content_hash: str
ipfs_cid: Optional[str] = None # IPFS content identifier
asset_type: str # image, video, effect, recipe, infrastructure
tags: list[str] = []
metadata: dict = {}
url: Optional[str] = None
provenance: Optional[dict] = None
created_at: str = ""
class Activity(BaseModel):
"""An ActivityPub activity."""
activity_id: str
activity_type: str # Create, Update, Delete, Announce
actor_id: str
object_data: dict
published: str
signature: Optional[dict] = None
class RegisterRequest(BaseModel):
"""Request to register an asset."""
name: str
content_hash: str
ipfs_cid: Optional[str] = None # IPFS content identifier
asset_type: str
tags: list[str] = []
metadata: dict = {}
url: Optional[str] = None
provenance: Optional[dict] = None
class RecordRunRequest(BaseModel):
"""Request to record an L1 run."""
run_id: str
l1_server: str # URL of the L1 server that has this run
output_name: Optional[str] = None # Deprecated - assets now named by content_hash
class PublishCacheRequest(BaseModel):
"""Request to publish a cache item from L1."""
content_hash: str
ipfs_cid: Optional[str] = None # IPFS content identifier
asset_name: str
asset_type: str = "image"
origin: dict # {type: "self"|"external", url?: str, note?: str}
description: Optional[str] = None
tags: list[str] = []
metadata: dict = {}
class UpdateAssetRequest(BaseModel):
"""Request to update an existing asset."""
description: Optional[str] = None
tags: Optional[list[str]] = None
metadata: Optional[dict] = None
origin: Optional[dict] = None
ipfs_cid: Optional[str] = None # IPFS content identifier
# ============ Storage (Database) ============
async def load_registry() -> dict:
"""Load registry from database."""
assets = await db.get_all_assets()
return {"version": "1.0", "assets": assets}
async def load_activities() -> list:
"""Load activities from database."""
return await db.get_all_activities()
def load_actor(username: str) -> dict:
"""Load actor data for a specific user with public key if available."""
actor = {
"id": f"https://{DOMAIN}/users/{username}",
"type": "Person",
"preferredUsername": username,
"name": username,
"inbox": f"https://{DOMAIN}/users/{username}/inbox",
"outbox": f"https://{DOMAIN}/users/{username}/outbox",
"followers": f"https://{DOMAIN}/users/{username}/followers",
"following": f"https://{DOMAIN}/users/{username}/following",
}
# Add public key if available
from keys import has_keys, load_public_key_pem
if has_keys(DATA_DIR, username):
actor["publicKey"] = {
"id": f"https://{DOMAIN}/users/{username}#main-key",
"owner": f"https://{DOMAIN}/users/{username}",
"publicKeyPem": load_public_key_pem(DATA_DIR, username)
}
return actor
async def user_exists(username: str) -> bool:
"""Check if a user exists."""
return await db.user_exists(username)
async def load_followers() -> list:
"""Load followers list from database."""
return await db.get_all_followers()
# ============ Signing ============
from keys import has_keys, load_public_key_pem, create_signature
def sign_activity(activity: dict, username: str) -> dict:
"""Sign an activity with the user's RSA private key."""
if not has_keys(DATA_DIR, username):
# No keys - use placeholder (for testing)
activity["signature"] = {
"type": "RsaSignature2017",
"creator": f"https://{DOMAIN}/users/{username}#main-key",
"created": datetime.now(timezone.utc).isoformat(),
"signatureValue": "NO_KEYS_CONFIGURED"
}
else:
activity["signature"] = create_signature(DATA_DIR, username, DOMAIN, activity)
return activity
# ============ HTML Templates ============
# Tailwind CSS config for L2 - dark theme to match L1
TAILWIND_CONFIG = '''
<script src="https://cdn.tailwindcss.com?plugins=typography"></script>
<script>
tailwind.config = {
darkMode: 'class',
theme: {
extend: {
colors: {
dark: { 900: '#0a0a0a', 800: '#111', 700: '#1a1a1a', 600: '#222', 500: '#333' }
}
}
}
}
</script>
<script src="https://unpkg.com/htmx.org@1.9.10"></script>
'''
def base_html(title: str, content: str, username: str = None) -> str:
"""Base HTML template with Tailwind CSS dark theme."""
user_section = f'''
<div class="flex items-center gap-4 text-sm text-gray-400">
Logged in as <strong class="text-white">{username}</strong>
<a href="/logout" class="px-3 py-1 bg-dark-500 hover:bg-dark-600 rounded text-blue-400 hover:text-blue-300 transition-colors">
Logout
</a>
</div>
''' if username else '''
<div class="flex items-center gap-4 text-sm">
<a href="/login" class="text-blue-400 hover:text-blue-300">Login</a>
<span class="text-gray-500">|</span>
<a href="/register" class="text-blue-400 hover:text-blue-300">Register</a>
</div>
'''
return f'''<!DOCTYPE html>
<html lang="en" class="dark">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{title} - Art DAG L2</title>
{TAILWIND_CONFIG}
</head>
<body class="bg-dark-900 text-gray-100 min-h-screen">
<div class="max-w-5xl mx-auto px-4 py-6 sm:px-6 lg:px-8">
<header class="flex flex-wrap items-center justify-between gap-4 mb-6 pb-4 border-b border-dark-500">
<h1 class="text-2xl font-bold">
<a href="/" class="text-white hover:text-gray-200">Art DAG L2</a>
</h1>
{user_section}
</header>
<nav class="flex flex-wrap gap-6 mb-6 pb-4 border-b border-dark-500">
<a href="/assets" class="text-gray-400 hover:text-white transition-colors">Assets</a>
<a href="/activities" class="text-gray-400 hover:text-white transition-colors">Activities</a>
<a href="/users" class="text-gray-400 hover:text-white transition-colors">Users</a>
<a href="/anchors/ui" class="text-gray-400 hover:text-white transition-colors">Anchors</a>
<a href="/renderers" class="text-gray-400 hover:text-white transition-colors">Renderers</a>
<a href="/download/client" class="text-gray-400 hover:text-white transition-colors ml-auto" title="Download CLI client">Download Client</a>
</nav>
<main class="bg-dark-700 rounded-lg p-6">
{content}
</main>
</div>
</body>
</html>'''
def get_user_from_cookie(request: Request) -> Optional[str]:
"""Get username from auth cookie."""
token = request.cookies.get("auth_token")
if token:
return verify_token(token)
return None
def wants_html(request: Request) -> bool:
"""Check if request wants HTML (browser) vs JSON (API)."""
accept = request.headers.get("accept", "")
return "text/html" in accept and "application/json" not in accept and "application/activity+json" not in accept
def format_date(value, length: int = 10) -> str:
"""Format a date value (datetime or string) to a string, sliced to length."""
if value is None:
return ""
if hasattr(value, 'isoformat'):
return value.isoformat()[:length]
if isinstance(value, str):
return value[:length]
return ""
# ============ Auth UI Endpoints ============
@app.get("/login", response_class=HTMLResponse)
async def ui_login_page(request: Request, return_to: str = None):
"""Login page. Accepts optional return_to URL for redirect after login."""
username = get_user_from_cookie(request)
if username:
return HTMLResponse(base_html("Already Logged In", f'''
<div class="bg-green-900/50 border border-green-700 text-green-300 px-4 py-3 rounded-lg mb-4">
You are already logged in as <strong>{username}</strong>
</div>
<p><a href="/" class="text-blue-400 hover:text-blue-300">Go to home page</a></p>
''', username))
# Hidden field for return_to URL
return_to_field = f'<input type="hidden" name="return_to" value="{return_to}">' if return_to else ''
content = f'''
<h2 class="text-xl font-semibold text-white mb-6">Login</h2>
<div id="login-result"></div>
<form hx-post="/login" hx-target="#login-result" hx-swap="innerHTML" class="space-y-4 max-w-md">
{return_to_field}
<div>
<label for="username" class="block text-sm font-medium text-gray-300 mb-2">Username</label>
<input type="text" id="username" name="username" required
class="w-full px-4 py-3 bg-dark-600 border border-dark-500 rounded-lg text-white placeholder-gray-500 focus:border-blue-500 focus:outline-none focus:ring-1 focus:ring-blue-500">
</div>
<div>
<label for="password" class="block text-sm font-medium text-gray-300 mb-2">Password</label>
<input type="password" id="password" name="password" required
class="w-full px-4 py-3 bg-dark-600 border border-dark-500 rounded-lg text-white placeholder-gray-500 focus:border-blue-500 focus:outline-none focus:ring-1 focus:ring-blue-500">
</div>
<button type="submit" class="w-full px-4 py-3 bg-blue-600 hover:bg-blue-700 text-white font-medium rounded-lg transition-colors">
Login
</button>
</form>
<p class="mt-6 text-gray-400">Don't have an account? <a href="/register" class="text-blue-400 hover:text-blue-300">Register</a></p>
'''
return HTMLResponse(base_html("Login", content))
@app.post("/login", response_class=HTMLResponse)
async def ui_login_submit(request: Request):
"""Handle login form submission."""
form = await request.form()
username = form.get("username", "").strip()
password = form.get("password", "")
return_to = form.get("return_to", "").strip()
if not username or not password:
return HTMLResponse('<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Username and password are required</div>')
user = await authenticate_user(DATA_DIR, username, password)
if not user:
return HTMLResponse('<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Invalid username or password</div>')
token = create_access_token(user.username, l2_server=f"https://{DOMAIN}")
# If return_to is specified, redirect there with token for the other site to set its own cookie
if return_to and return_to.startswith("http"):
# Append token to return_to URL for the target site to set its own cookie
separator = "&" if "?" in return_to else "?"
redirect_url = f"{return_to}{separator}auth_token={token.access_token}"
response = HTMLResponse(f'''
<div class="bg-green-900/50 border border-green-700 text-green-300 px-4 py-3 rounded-lg mb-4">Login successful! Redirecting...</div>
<script>window.location.href = "{redirect_url}";</script>
''')
else:
response = HTMLResponse(f'''
<div class="bg-green-900/50 border border-green-700 text-green-300 px-4 py-3 rounded-lg mb-4">Login successful! Redirecting...</div>
<script>window.location.href = "/";</script>
''')
# Set cookie for L2 only (L1 servers set their own cookies via /auth endpoint)
response.set_cookie(
key="auth_token",
value=token.access_token,
httponly=True,
max_age=60 * 60 * 24 * 30, # 30 days
samesite="lax",
secure=True
)
return response
@app.get("/register", response_class=HTMLResponse)
async def ui_register_page(request: Request):
"""Register page."""
username = get_user_from_cookie(request)
if username:
return HTMLResponse(base_html("Already Logged In", f'''
<div class="bg-green-900/50 border border-green-700 text-green-300 px-4 py-3 rounded-lg mb-4">
You are already logged in as <strong>{username}</strong>
</div>
<p><a href="/" class="text-blue-400 hover:text-blue-300">Go to home page</a></p>
''', username))
content = '''
<h2 class="text-xl font-semibold text-white mb-6">Register</h2>
<div id="register-result"></div>
<form hx-post="/register" hx-target="#register-result" hx-swap="innerHTML" class="space-y-4 max-w-md">
<div>
<label for="username" class="block text-sm font-medium text-gray-300 mb-2">Username</label>
<input type="text" id="username" name="username" required pattern="[a-zA-Z0-9_-]+" title="Letters, numbers, underscore, dash only"
class="w-full px-4 py-3 bg-dark-600 border border-dark-500 rounded-lg text-white placeholder-gray-500 focus:border-blue-500 focus:outline-none focus:ring-1 focus:ring-blue-500">
</div>
<div>
<label for="email" class="block text-sm font-medium text-gray-300 mb-2">Email (optional)</label>
<input type="email" id="email" name="email"
class="w-full px-4 py-3 bg-dark-600 border border-dark-500 rounded-lg text-white placeholder-gray-500 focus:border-blue-500 focus:outline-none focus:ring-1 focus:ring-blue-500">
</div>
<div>
<label for="password" class="block text-sm font-medium text-gray-300 mb-2">Password</label>
<input type="password" id="password" name="password" required minlength="6"
class="w-full px-4 py-3 bg-dark-600 border border-dark-500 rounded-lg text-white placeholder-gray-500 focus:border-blue-500 focus:outline-none focus:ring-1 focus:ring-blue-500">
</div>
<div>
<label for="password2" class="block text-sm font-medium text-gray-300 mb-2">Confirm Password</label>
<input type="password" id="password2" name="password2" required minlength="6"
class="w-full px-4 py-3 bg-dark-600 border border-dark-500 rounded-lg text-white placeholder-gray-500 focus:border-blue-500 focus:outline-none focus:ring-1 focus:ring-blue-500">
</div>
<button type="submit" class="w-full px-4 py-3 bg-blue-600 hover:bg-blue-700 text-white font-medium rounded-lg transition-colors">
Register
</button>
</form>
<p class="mt-6 text-gray-400">Already have an account? <a href="/login" class="text-blue-400 hover:text-blue-300">Login</a></p>
'''
return HTMLResponse(base_html("Register", content))
@app.post("/register", response_class=HTMLResponse)
async def ui_register_submit(request: Request):
"""Handle register form submission."""
form = await request.form()
username = form.get("username", "").strip()
email = form.get("email", "").strip() or None
password = form.get("password", "")
password2 = form.get("password2", "")
if not username or not password:
return HTMLResponse('<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Username and password are required</div>')
if password != password2:
return HTMLResponse('<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Passwords do not match</div>')
if len(password) < 6:
return HTMLResponse('<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Password must be at least 6 characters</div>')
try:
user = await create_user(DATA_DIR, username, password, email)
except ValueError as e:
return HTMLResponse(f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">{str(e)}</div>')
token = create_access_token(user.username, l2_server=f"https://{DOMAIN}")
response = HTMLResponse(f'''
<div class="bg-green-900/50 border border-green-700 text-green-300 px-4 py-3 rounded-lg mb-4">Registration successful! Redirecting...</div>
<script>window.location.href = "/";</script>
''')
response.set_cookie(
key="auth_token",
value=token.access_token,
httponly=True,
max_age=60 * 60 * 24 * 30, # 30 days
samesite="lax",
secure=True
)
return response
@app.get("/logout")
async def logout():
"""Handle logout - clear cookie and redirect to home."""
response = RedirectResponse(url="/", status_code=302)
# Delete both legacy (no domain) and new (shared domain) cookies
response.delete_cookie("auth_token")
if COOKIE_DOMAIN:
response.delete_cookie("auth_token", domain=COOKIE_DOMAIN)
return response
# ============ HTML Rendering Helpers ============
async def ui_activity_detail(activity_index: int, request: Request):
"""Activity detail page with full content display. Helper function for HTML rendering."""
username = get_user_from_cookie(request)
activities = await load_activities()
if activity_index < 0 or activity_index >= len(activities):
content = '''
<h2 class="text-xl font-semibold text-white mb-4">Activity Not Found</h2>
<p class="text-gray-400">This activity does not exist.</p>
<p class="mt-4"><a href="/activities" class="text-blue-400 hover:text-blue-300">&larr; Back to Activities</a></p>
'''
return HTMLResponse(base_html("Activity Not Found", content, username))
activity = activities[activity_index]
activity_type = activity.get("activity_type", "")
activity_id = activity.get("activity_id", "")
actor_id = activity.get("actor_id", "")
actor_name = actor_id.split("/")[-1] if actor_id else "unknown"
published = format_date(activity.get("published"))
obj = activity.get("object_data", {})
# Object details
obj_name = obj.get("name", "Untitled")
obj_type = obj.get("type", "")
content_hash_obj = obj.get("contentHash", {})
content_hash = content_hash_obj.get("value", "") if isinstance(content_hash_obj, dict) else ""
media_type = obj.get("mediaType", "")
description = obj.get("summary", "") or obj.get("content", "")
# Provenance from object - or fallback to registry asset
provenance = obj.get("provenance", {})
origin = obj.get("origin", {})
# Fallback: if activity doesn't have provenance, look up the asset from registry
if not provenance or not origin:
registry = await load_registry()
assets = registry.get("assets", {})
# Find asset by content_hash or name
for asset_name, asset_data in assets.items():
if asset_data.get("content_hash") == content_hash or asset_data.get("name") == obj_name:
if not provenance:
provenance = asset_data.get("provenance", {})
if not origin:
origin = asset_data.get("origin", {})
break
# Type colors
type_color = "bg-green-600" if activity_type == "Create" else "bg-yellow-600" if activity_type == "Update" else "bg-gray-600"
obj_type_color = "bg-blue-600" if "Image" in obj_type else "bg-purple-600" if "Video" in obj_type else "bg-gray-600"
# Determine L1 server and asset type
l1_server = provenance.get("l1_server", L1_PUBLIC_URL).rstrip("/") if provenance else L1_PUBLIC_URL.rstrip("/")
is_video = "Video" in obj_type or "video" in media_type
# Content display
if is_video:
content_html = f'''
<div class="bg-dark-600 rounded-lg p-4 mb-6">
<video src="{l1_server}/cache/{content_hash}/mp4" controls autoplay muted loop playsinline
class="max-w-full max-h-96 mx-auto rounded-lg"></video>
<div class="mt-3 text-center">
<a href="{l1_server}/cache/{content_hash}/raw" download
class="inline-block px-4 py-2 bg-green-600 hover:bg-green-700 text-white font-medium rounded-lg transition-colors">
Download Original
</a>
</div>
</div>
'''
elif "Image" in obj_type or "image" in media_type:
content_html = f'''
<div class="bg-dark-600 rounded-lg p-4 mb-6">
<img src="{l1_server}/cache/{content_hash}/raw" alt="{obj_name}"
class="max-w-full max-h-96 mx-auto rounded-lg">
<div class="mt-3 text-center">
<a href="{l1_server}/cache/{content_hash}/raw" download
class="inline-block px-4 py-2 bg-green-600 hover:bg-green-700 text-white font-medium rounded-lg transition-colors">
Download
</a>
</div>
</div>
'''
else:
content_html = f'''
<div class="bg-dark-600 rounded-lg p-4 mb-6 text-center">
<p class="text-gray-400 mb-3">Content type: {media_type or obj_type}</p>
<a href="{l1_server}/cache/{content_hash}/raw" download
class="inline-block px-4 py-2 bg-green-600 hover:bg-green-700 text-white font-medium rounded-lg transition-colors">
Download
</a>
</div>
'''
# Origin display
origin_html = '<span class="text-gray-500">Not specified</span>'
if origin:
origin_type = origin.get("type", "")
if origin_type == "self":
origin_html = '<span class="text-green-400">Original content by author</span>'
elif origin_type == "external":
origin_url = origin.get("url", "")
origin_note = origin.get("note", "")
origin_html = f'<a href="{origin_url}" target="_blank" class="text-blue-400 hover:text-blue-300 break-all">{origin_url}</a>'
if origin_note:
origin_html += f'<p class="text-gray-500 text-sm mt-1">{origin_note}</p>'
# Provenance section
provenance_html = ""
if provenance and provenance.get("recipe"):
recipe = provenance.get("recipe", "")
inputs = provenance.get("inputs", [])
l1_run_id = provenance.get("l1_run_id", "")
rendered_at = format_date(provenance.get("rendered_at"))
effects_commit = provenance.get("effects_commit", "")
effect_url = provenance.get("effect_url")
infrastructure = provenance.get("infrastructure", {})
if not effect_url:
if effects_commit and effects_commit != "unknown":
effect_url = f"{EFFECTS_REPO_URL}/src/commit/{effects_commit}/{recipe}"
else:
effect_url = f"{EFFECTS_REPO_URL}/src/branch/main/{recipe}"
# Build inputs display - show actual content as thumbnails
inputs_html = ""
for inp in inputs:
inp_hash = inp.get("content_hash", "") if isinstance(inp, dict) else inp
if inp_hash:
inputs_html += f'''
<div class="bg-dark-500 rounded-lg p-3 mb-3">
<div class="flex justify-center mb-2">
<video src="{l1_server}/cache/{inp_hash}/mp4"
class="max-h-40 rounded hidden" muted loop playsinline
onloadeddata="this.classList.remove('hidden'); this.nextElementSibling.classList.add('hidden'); this.play();">
</video>
<img src="{l1_server}/cache/{inp_hash}/raw" alt="Input"
class="max-h-40 rounded">
</div>
<div class="text-center">
<code class="text-xs text-gray-400">{inp_hash[:16]}...</code>
<a href="{l1_server}/cache/{inp_hash}" target="_blank"
class="text-blue-400 hover:text-blue-300 text-xs ml-2">view</a>
</div>
</div>
'''
# Infrastructure display
infra_html = ""
if infrastructure:
software = infrastructure.get("software", {})
hardware = infrastructure.get("hardware", {})
if software or hardware:
infra_parts = []
if software:
infra_parts.append(f"Software: {software.get('name', 'unknown')}")
if hardware:
infra_parts.append(f"Hardware: {hardware.get('name', 'unknown')}")
infra_html = f'<p class="text-sm text-gray-400 mt-2">{" | ".join(infra_parts)}</p>'
provenance_html = f'''
<div class="border-t border-dark-500 pt-6 mt-6">
<h3 class="text-lg font-semibold text-white mb-4">Provenance</h3>
<p class="text-sm text-gray-400 mb-4">This content was created by applying an effect to input content.</p>
<div class="grid gap-4 sm:grid-cols-2">
<div class="bg-dark-600 rounded-lg p-4">
<h4 class="text-sm font-medium text-gray-400 mb-2">Effect</h4>
<a href="{effect_url}" target="_blank"
class="inline-flex items-center gap-2 px-3 py-2 bg-blue-600 hover:bg-blue-700 text-white rounded-lg transition-colors">
<svg class="w-4 h-4" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M10 20l4-16m4 4l4 4-4 4M6 16l-4-4 4-4"/>
</svg>
{recipe}
</a>
{f'<div class="mt-2 text-xs text-gray-500">Commit: {effects_commit[:12]}...</div>' if effects_commit else ''}
</div>
<div class="bg-dark-600 rounded-lg p-4">
<h4 class="text-sm font-medium text-gray-400 mb-2">Input(s)</h4>
{inputs_html if inputs_html else '<span class="text-gray-500">No inputs recorded</span>'}
</div>
<div class="bg-dark-600 rounded-lg p-4">
<h4 class="text-sm font-medium text-gray-400 mb-2">L1 Run</h4>
<a href="{l1_server}/ui/detail/{l1_run_id}" target="_blank"
class="text-blue-400 hover:text-blue-300 font-mono text-xs">{l1_run_id[:20]}...</a>
</div>
<div class="bg-dark-600 rounded-lg p-4">
<h4 class="text-sm font-medium text-gray-400 mb-2">Rendered</h4>
<span class="text-white">{rendered_at if rendered_at else 'Unknown'}</span>
{infra_html}
</div>
</div>
</div>
'''
content = f'''
<p class="mb-4"><a href="/activities" class="text-blue-400 hover:text-blue-300">&larr; Back to Activities</a></p>
<div class="flex flex-wrap items-center gap-3 mb-6">
<span class="px-3 py-1 {type_color} text-white text-sm font-medium rounded">{activity_type}</span>
<h2 class="text-2xl font-bold text-white">{obj_name}</h2>
<span class="px-3 py-1 {obj_type_color} text-white text-sm rounded">{obj_type}</span>
</div>
{content_html}
<div class="grid gap-6 md:grid-cols-2">
<div class="space-y-4">
<div class="bg-dark-600 rounded-lg p-4">
<h3 class="text-sm font-medium text-gray-400 mb-2">Actor</h3>
<a href="/users/{actor_name}" class="text-blue-400 hover:text-blue-300 text-lg">{actor_name}</a>
</div>
<div class="bg-dark-600 rounded-lg p-4">
<h3 class="text-sm font-medium text-gray-400 mb-2">Description</h3>
<p class="text-white">{description if description else '<span class="text-gray-500">No description</span>'}</p>
</div>
<div class="bg-dark-600 rounded-lg p-4">
<h3 class="text-sm font-medium text-gray-400 mb-2">Origin</h3>
{origin_html}
</div>
</div>
<div class="space-y-4">
<div class="bg-dark-600 rounded-lg p-4">
<h3 class="text-sm font-medium text-gray-400 mb-2">Content Hash</h3>
<code class="text-green-300 text-sm break-all">{content_hash}</code>
</div>
<div class="bg-dark-600 rounded-lg p-4">
<h3 class="text-sm font-medium text-gray-400 mb-2">Published</h3>
<span class="text-white">{published}</span>
</div>
<div class="bg-dark-600 rounded-lg p-4">
<h3 class="text-sm font-medium text-gray-400 mb-2">Activity ID</h3>
<code class="text-gray-300 text-xs break-all">{activity_id}</code>
</div>
</div>
</div>
{provenance_html}
<div class="border-t border-dark-500 pt-6 mt-6">
<h3 class="text-lg font-semibold text-white mb-4">ActivityPub</h3>
<div class="bg-dark-600 rounded-lg p-4">
<p class="text-sm text-gray-300 mb-2">
<span class="text-gray-400">Object URL:</span>
<a href="https://{DOMAIN}/objects/{content_hash}" target="_blank" class="text-blue-400 hover:text-blue-300 ml-2">https://{DOMAIN}/objects/{content_hash}</a>
</p>
<p class="text-sm text-gray-300">
<span class="text-gray-400">Actor:</span>
<a href="{actor_id}" target="_blank" class="text-blue-400 hover:text-blue-300 ml-2">{actor_id}</a>
</p>
</div>
</div>
'''
return HTMLResponse(base_html(f"Activity: {obj_name}", content, username))
async def ui_asset_detail(name: str, request: Request):
"""Asset detail page with content preview and provenance. Helper function for HTML rendering."""
username = get_user_from_cookie(request)
registry = await load_registry()
assets = registry.get("assets", {})
if name not in assets:
content = f'''
<h2 class="text-xl font-semibold text-white mb-4">Asset Not Found</h2>
<p class="text-gray-400">No asset named "{name}" exists.</p>
<p class="mt-4"><a href="/assets" class="text-blue-400 hover:text-blue-300">&larr; Back to Assets</a></p>
'''
return HTMLResponse(base_html("Asset Not Found", content, username))
asset = assets[name]
owner = asset.get("owner", "unknown")
content_hash = asset.get("content_hash", "")
ipfs_cid = asset.get("ipfs_cid", "")
asset_type = asset.get("asset_type", "")
tags = asset.get("tags", [])
description = asset.get("description", "")
origin = asset.get("origin") or {}
provenance = asset.get("provenance") or {}
metadata = asset.get("metadata") or {}
created_at = format_date(asset.get("created_at"))
type_color = "bg-blue-600" if asset_type == "image" else "bg-purple-600" if asset_type == "video" else "bg-gray-600"
# Determine L1 server URL for content
l1_server = provenance.get("l1_server", L1_PUBLIC_URL).rstrip("/")
# Content display - image or video from L1
if asset_type == "video":
# Use iOS-compatible MP4 endpoint
content_html = f'''
<div class="bg-dark-600 rounded-lg p-4 mb-6">
<video src="{l1_server}/cache/{content_hash}/mp4" controls autoplay muted loop playsinline
class="max-w-full max-h-96 mx-auto rounded-lg"></video>
<div class="mt-3 text-center">
<a href="{l1_server}/cache/{content_hash}/raw" download
class="inline-block px-4 py-2 bg-green-600 hover:bg-green-700 text-white font-medium rounded-lg transition-colors">
Download Original
</a>
</div>
</div>
'''
elif asset_type == "image":
content_html = f'''
<div class="bg-dark-600 rounded-lg p-4 mb-6">
<img src="{l1_server}/cache/{content_hash}/raw" alt="{name}"
class="max-w-full max-h-96 mx-auto rounded-lg">
<div class="mt-3 text-center">
<a href="{l1_server}/cache/{content_hash}/raw" download
class="inline-block px-4 py-2 bg-green-600 hover:bg-green-700 text-white font-medium rounded-lg transition-colors">
Download
</a>
</div>
</div>
'''
elif asset_type == "recipe":
# Fetch recipe source from L1 or IPFS
recipe_source = ""
try:
resp = requests.get(f"{l1_server}/cache/{content_hash}", timeout=10, headers={"Accept": "text/plain"})
if resp.status_code == 200:
recipe_source = resp.text
except Exception:
pass
if not recipe_source and ipfs_cid:
# Try IPFS
try:
import ipfs_client
recipe_bytes = ipfs_client.get_bytes(ipfs_cid)
if recipe_bytes:
recipe_source = recipe_bytes.decode('utf-8')
except Exception:
pass
import html as html_module
recipe_source_escaped = html_module.escape(recipe_source) if recipe_source else "(Could not load recipe source)"
content_html = f'''
<div class="bg-dark-600 rounded-lg p-4 mb-6">
<h3 class="text-lg font-semibold text-white mb-3">Recipe Source</h3>
<pre class="bg-dark-900 p-4 rounded-lg overflow-x-auto text-sm text-gray-300 font-mono whitespace-pre-wrap"><code>{recipe_source_escaped}</code></pre>
<div class="mt-3 text-center">
<a href="{l1_server}/cache/{content_hash}/raw" download
class="inline-block px-4 py-2 bg-green-600 hover:bg-green-700 text-white font-medium rounded-lg transition-colors">
Download YAML
</a>
</div>
</div>
'''
else:
content_html = f'''
<div class="bg-dark-600 rounded-lg p-4 mb-6 text-center">
<p class="text-gray-400 mb-3">Content type: {asset_type}</p>
<a href="{l1_server}/cache/{content_hash}/raw" download
class="inline-block px-4 py-2 bg-green-600 hover:bg-green-700 text-white font-medium rounded-lg transition-colors">
Download
</a>
</div>
'''
# Origin display
origin_html = '<span class="text-gray-500">Not specified</span>'
if origin:
origin_type = origin.get("type", "unknown")
if origin_type == "self":
origin_html = '<span class="text-green-400">Original content by author</span>'
elif origin_type == "external":
origin_url = origin.get("url", "")
origin_note = origin.get("note", "")
origin_html = f'<a href="{origin_url}" target="_blank" class="text-blue-400 hover:text-blue-300 break-all">{origin_url}</a>'
if origin_note:
origin_html += f'<p class="text-gray-500 text-sm mt-1">{origin_note}</p>'
# Tags display
tags_html = '<span class="text-gray-500">No tags</span>'
if tags:
tags_html = " ".join([f'<span class="px-2 py-1 bg-dark-500 text-gray-300 text-xs rounded">{t}</span>' for t in tags])
# IPFS display
if ipfs_cid:
local_gateway = f'<a href="{IPFS_GATEWAY_URL}/ipfs/{ipfs_cid}" target="_blank" class="px-2 py-1 bg-green-600 hover:bg-green-700 text-white text-xs rounded">Local</a>' if IPFS_GATEWAY_URL else ''
ipfs_html = f'''<code class="text-purple-300 text-sm break-all mb-2 block">{ipfs_cid}</code>
<div class="flex flex-wrap gap-1 mt-2">
{local_gateway}
<a href="https://ipfs.io/ipfs/{ipfs_cid}" target="_blank" class="px-2 py-1 bg-purple-600 hover:bg-purple-700 text-white text-xs rounded">ipfs.io</a>
<a href="https://dweb.link/ipfs/{ipfs_cid}" target="_blank" class="px-2 py-1 bg-purple-600 hover:bg-purple-700 text-white text-xs rounded">dweb.link</a>
</div>'''
else:
ipfs_html = '<span class="text-gray-500">Not on IPFS</span>'
# Provenance section - for rendered outputs
provenance_html = ""
if provenance:
recipe = provenance.get("recipe", "")
inputs = provenance.get("inputs", [])
l1_run_id = provenance.get("l1_run_id", "")
rendered_at = format_date(provenance.get("rendered_at"))
effects_commit = provenance.get("effects_commit", "")
infrastructure = provenance.get("infrastructure", {})
# Use stored effect_url or build fallback
effect_url = provenance.get("effect_url")
if not effect_url:
# Fallback for older records
if effects_commit and effects_commit != "unknown":
effect_url = f"{EFFECTS_REPO_URL}/src/commit/{effects_commit}/{recipe}"
else:
effect_url = f"{EFFECTS_REPO_URL}/src/branch/main/{recipe}"
# Build inputs display - show actual content as thumbnails
inputs_html = ""
for inp in inputs:
inp_hash = inp.get("content_hash", "") if isinstance(inp, dict) else inp
if inp_hash:
inputs_html += f'''
<div class="bg-dark-500 rounded-lg p-3 mb-3">
<div class="flex justify-center mb-2">
<video src="{l1_server}/cache/{inp_hash}/mp4"
class="max-h-40 rounded hidden" muted loop playsinline
onloadeddata="this.classList.remove('hidden'); this.nextElementSibling.classList.add('hidden'); this.play();">
</video>
<img src="{l1_server}/cache/{inp_hash}/raw" alt="Input"
class="max-h-40 rounded">
</div>
<div class="text-center">
<code class="text-xs text-gray-400">{inp_hash[:16]}...</code>
<a href="{l1_server}/cache/{inp_hash}" target="_blank"
class="text-blue-400 hover:text-blue-300 text-xs ml-2">view</a>
</div>
</div>
'''
# Infrastructure display
infra_html = ""
if infrastructure:
software = infrastructure.get("software", {})
hardware = infrastructure.get("hardware", {})
if software or hardware:
infra_html = f'''
<div class="bg-dark-600 rounded-lg p-4 sm:col-span-2">
<h4 class="text-sm font-medium text-gray-400 mb-2">Infrastructure</h4>
<div class="text-sm text-gray-300">
{f"Software: {software.get('name', 'unknown')}" if software else ""}
{f" ({software.get('content_hash', '')[:16]}...)" if software.get('content_hash') else ""}
{" | " if software and hardware else ""}
{f"Hardware: {hardware.get('name', 'unknown')}" if hardware else ""}
</div>
</div>
'''
provenance_html = f'''
<div class="border-t border-dark-500 pt-6 mt-6">
<h3 class="text-lg font-semibold text-white mb-4">Provenance</h3>
<p class="text-sm text-gray-400 mb-4">This asset was created by applying an effect to input content.</p>
<div class="grid gap-4 sm:grid-cols-2">
<div class="bg-dark-600 rounded-lg p-4">
<h4 class="text-sm font-medium text-gray-400 mb-2">Effect</h4>
<a href="{effect_url}" target="_blank"
class="inline-flex items-center gap-2 px-3 py-2 bg-blue-600 hover:bg-blue-700 text-white rounded-lg transition-colors">
<svg class="w-4 h-4" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M10 20l4-16m4 4l4 4-4 4M6 16l-4-4 4-4"/>
</svg>
{recipe}
</a>
{f'<div class="mt-2 text-xs text-gray-500">Commit: {effects_commit[:12]}...</div>' if effects_commit else ''}
</div>
<div class="bg-dark-600 rounded-lg p-4">
<h4 class="text-sm font-medium text-gray-400 mb-2">Input(s)</h4>
{inputs_html if inputs_html else '<span class="text-gray-500">No inputs recorded</span>'}
</div>
<div class="bg-dark-600 rounded-lg p-4">
<h4 class="text-sm font-medium text-gray-400 mb-2">L1 Run</h4>
<a href="{l1_server}/ui/detail/{l1_run_id}" target="_blank"
class="text-blue-400 hover:text-blue-300 font-mono text-xs">{l1_run_id[:16]}...</a>
</div>
<div class="bg-dark-600 rounded-lg p-4">
<h4 class="text-sm font-medium text-gray-400 mb-2">Rendered</h4>
<span class="text-white">{rendered_at if rendered_at else 'Unknown'}</span>
</div>
{infra_html}
</div>
</div>
'''
content = f'''
<p class="mb-4"><a href="/assets" class="text-blue-400 hover:text-blue-300">&larr; Back to Assets</a></p>
<div class="flex flex-wrap items-center gap-4 mb-6">
<h2 class="text-2xl font-bold text-white">{name}</h2>
<span class="px-3 py-1 {type_color} text-white text-sm rounded">{asset_type}</span>
</div>
{content_html}
<div class="grid gap-6 md:grid-cols-2">
<div class="space-y-4">
<div class="bg-dark-600 rounded-lg p-4">
<h3 class="text-sm font-medium text-gray-400 mb-2">Owner</h3>
<a href="/users/{owner}" class="text-blue-400 hover:text-blue-300 text-lg">{owner}</a>
</div>
<div class="bg-dark-600 rounded-lg p-4">
<h3 class="text-sm font-medium text-gray-400 mb-2">Description</h3>
<p class="text-white">{description if description else '<span class="text-gray-500">No description</span>'}</p>
</div>
<div class="bg-dark-600 rounded-lg p-4">
<h3 class="text-sm font-medium text-gray-400 mb-2">Origin</h3>
{origin_html}
</div>
</div>
<div class="space-y-4">
<div class="bg-dark-600 rounded-lg p-4">
<h3 class="text-sm font-medium text-gray-400 mb-2">Content Hash</h3>
<code class="text-green-300 text-sm break-all">{content_hash}</code>
</div>
<div class="bg-dark-600 rounded-lg p-4">
<h3 class="text-sm font-medium text-gray-400 mb-2">IPFS</h3>
{ipfs_html}
</div>
<div class="bg-dark-600 rounded-lg p-4">
<h3 class="text-sm font-medium text-gray-400 mb-2">Created</h3>
<span class="text-white">{created_at}</span>
</div>
<div class="bg-dark-600 rounded-lg p-4">
<h3 class="text-sm font-medium text-gray-400 mb-2">Tags</h3>
<div class="flex flex-wrap gap-2">{tags_html}</div>
</div>
</div>
</div>
{provenance_html}
<div class="border-t border-dark-500 pt-6 mt-6">
<h3 class="text-lg font-semibold text-white mb-4">ActivityPub</h3>
<div class="bg-dark-600 rounded-lg p-4">
<p class="text-sm text-gray-300 mb-2">
<span class="text-gray-400">Object URL:</span>
<a href="https://{DOMAIN}/objects/{content_hash}" target="_blank" class="text-blue-400 hover:text-blue-300 ml-2">https://{DOMAIN}/objects/{content_hash}</a>
</p>
<p class="text-sm text-gray-300">
<span class="text-gray-400">Owner Actor:</span>
<a href="https://{DOMAIN}/users/{owner}" target="_blank" class="text-blue-400 hover:text-blue-300 ml-2">https://{DOMAIN}/users/{owner}</a>
</p>
</div>
</div>
'''
return HTMLResponse(base_html(f"Asset: {name}", content, username))
async def ui_user_detail(username: str, request: Request):
"""User detail page showing their published assets. Helper function for HTML rendering."""
current_user = get_user_from_cookie(request)
if not await user_exists(username):
content = f'''
<h2 class="text-xl font-semibold text-white mb-4">User Not Found</h2>
<p class="text-gray-400">No user named "{username}" exists.</p>
<p class="mt-4"><a href="/users" class="text-blue-400 hover:text-blue-300">&larr; Back to Users</a></p>
'''
return HTMLResponse(base_html("User Not Found", content, current_user))
# Get user's assets
registry = await load_registry()
all_assets = registry.get("assets", {})
user_assets = {name: asset for name, asset in all_assets.items() if asset.get("owner") == username}
# Get user's activities
all_activities = await load_activities()
actor_id = f"https://{DOMAIN}/users/{username}"
user_activities = [a for a in all_activities if a.get("actor_id") == actor_id]
webfinger = f"@{username}@{DOMAIN}"
# Assets table
if user_assets:
rows = ""
for name, asset in sorted(user_assets.items(), key=lambda x: x[1].get("created_at", ""), reverse=True):
hash_short = asset.get("content_hash", "")[:16] + "..."
asset_type = asset.get("asset_type", "")
type_color = "bg-blue-600" if asset_type == "image" else "bg-purple-600" if asset_type == "video" else "bg-gray-600"
rows += f'''
<tr class="border-b border-dark-500 hover:bg-dark-600 transition-colors">
<td class="py-3 px-4">
<a href="/assets/{name}" class="text-blue-400 hover:text-blue-300 font-medium">{name}</a>
</td>
<td class="py-3 px-4"><span class="px-2 py-1 {type_color} text-white text-xs rounded">{asset_type}</span></td>
<td class="py-3 px-4"><code class="text-xs bg-dark-600 px-2 py-1 rounded text-green-300">{hash_short}</code></td>
<td class="py-3 px-4 text-gray-400">{", ".join(asset.get("tags", []))}</td>
</tr>
'''
assets_html = f'''
<div class="overflow-x-auto">
<table class="w-full text-sm">
<thead>
<tr class="bg-dark-600 text-left">
<th class="py-3 px-4 font-medium text-gray-300">Name</th>
<th class="py-3 px-4 font-medium text-gray-300">Type</th>
<th class="py-3 px-4 font-medium text-gray-300">Content Hash</th>
<th class="py-3 px-4 font-medium text-gray-300">Tags</th>
</tr>
</thead>
<tbody>
{rows}
</tbody>
</table>
</div>
'''
else:
assets_html = '<p class="text-gray-400">No published assets yet.</p>'
content = f'''
<p class="mb-4"><a href="/users" class="text-blue-400 hover:text-blue-300">&larr; Back to Users</a></p>
<div class="flex flex-wrap items-center gap-4 mb-6">
<h2 class="text-2xl font-bold text-white">{username}</h2>
<code class="px-3 py-1 bg-dark-600 text-gray-300 text-sm rounded">{webfinger}</code>
</div>
<div class="grid gap-4 sm:grid-cols-2 mb-8">
<div class="bg-dark-600 rounded-lg p-4 text-center">
<div class="text-2xl font-bold text-blue-400">{len(user_assets)}</div>
<div class="text-sm text-gray-400 mt-1">Published Assets</div>
</div>
<div class="bg-dark-600 rounded-lg p-4 text-center">
<div class="text-2xl font-bold text-blue-400">{len(user_activities)}</div>
<div class="text-sm text-gray-400 mt-1">Activities</div>
</div>
</div>
<div class="mb-6 p-4 bg-dark-600 rounded-lg">
<h3 class="text-sm font-medium text-gray-400 mb-2">ActivityPub</h3>
<p class="text-sm text-gray-300 mb-1">
Actor URL: <a href="https://{DOMAIN}/users/{username}" target="_blank" class="text-blue-400 hover:text-blue-300">https://{DOMAIN}/users/{username}</a>
</p>
<p class="text-sm text-gray-300">
Outbox: <a href="https://{DOMAIN}/users/{username}/outbox" target="_blank" class="text-blue-400 hover:text-blue-300">https://{DOMAIN}/users/{username}/outbox</a>
</p>
</div>
<h3 class="text-xl font-semibold text-white mb-4">Published Assets ({len(user_assets)})</h3>
{assets_html}
'''
return HTMLResponse(base_html(f"User: {username}", content, current_user))
# ============ API Endpoints ============
@app.get("/")
async def root(request: Request):
"""Server info. HTML shows home page with counts, JSON returns stats."""
registry = await load_registry()
activities = await load_activities()
users = await db.get_all_users()
assets_count = len(registry.get("assets", {}))
activities_count = len(activities)
users_count = len(users)
if wants_html(request):
username = get_user_from_cookie(request)
readme_html = markdown.markdown(README_CONTENT, extensions=['tables', 'fenced_code'])
content = f'''
<div class="grid gap-4 sm:grid-cols-3 mb-8">
<a href="/assets" class="block bg-dark-600 rounded-lg p-6 text-center hover:bg-dark-500 transition-colors">
<div class="text-3xl font-bold text-blue-400">{assets_count}</div>
<div class="text-sm text-gray-400 mt-1">Assets</div>
</a>
<a href="/activities" class="block bg-dark-600 rounded-lg p-6 text-center hover:bg-dark-500 transition-colors">
<div class="text-3xl font-bold text-blue-400">{activities_count}</div>
<div class="text-sm text-gray-400 mt-1">Activities</div>
</a>
<a href="/users" class="block bg-dark-600 rounded-lg p-6 text-center hover:bg-dark-500 transition-colors">
<div class="text-3xl font-bold text-blue-400">{users_count}</div>
<div class="text-sm text-gray-400 mt-1">Users</div>
</a>
</div>
<div class="prose prose-invert max-w-none
prose-headings:text-white prose-headings:border-b prose-headings:border-dark-500 prose-headings:pb-2
prose-h1:text-2xl prose-h1:mt-0
prose-h2:text-xl prose-h2:mt-6
prose-a:text-blue-400 hover:prose-a:text-blue-300
prose-pre:bg-dark-600 prose-pre:border prose-pre:border-dark-500
prose-code:bg-dark-600 prose-code:px-1 prose-code:py-0.5 prose-code:rounded prose-code:text-green-300
prose-table:border-collapse
prose-th:bg-dark-600 prose-th:border prose-th:border-dark-500 prose-th:px-4 prose-th:py-2
prose-td:border prose-td:border-dark-500 prose-td:px-4 prose-td:py-2">
{readme_html}
</div>
'''
return HTMLResponse(base_html("Home", content, username))
return {
"name": "Art DAG L2 Server",
"version": "0.1.0",
"domain": DOMAIN,
"assets_count": assets_count,
"activities_count": activities_count,
"users_count": users_count
}
# ============ Auth Endpoints ============
security = HTTPBearer(auto_error=False)
async def get_optional_user(
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> Optional[User]:
"""Get current user if authenticated, None otherwise."""
if not credentials:
return None
return await get_current_user(DATA_DIR, credentials.credentials)
async def get_required_user(
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> User:
"""Get current user, raise 401 if not authenticated."""
if not credentials:
raise HTTPException(401, "Not authenticated")
user = await get_current_user(DATA_DIR, credentials.credentials)
if not user:
raise HTTPException(401, "Invalid token")
return user
@app.post("/auth/register", response_model=Token)
async def register(req: UserCreate):
"""Register a new user."""
try:
user = await create_user(DATA_DIR, req.username, req.password, req.email)
except ValueError as e:
raise HTTPException(400, str(e))
return create_access_token(user.username, l2_server=f"https://{DOMAIN}")
@app.post("/auth/login", response_model=Token)
async def login(req: UserLogin):
"""Login and get access token."""
user = await authenticate_user(DATA_DIR, req.username, req.password)
if not user:
raise HTTPException(401, "Invalid username or password")
return create_access_token(user.username, l2_server=f"https://{DOMAIN}")
@app.get("/auth/me")
async def get_me(user: User = Depends(get_required_user)):
"""Get current user info."""
return {
"username": user.username,
"email": user.email,
"created_at": user.created_at
}
@app.post("/auth/verify")
async def verify_auth(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""Verify a token and return username. Used by L1 server."""
if not credentials:
raise HTTPException(401, "No token provided")
username = verify_token(credentials.credentials)
if not username:
raise HTTPException(401, "Invalid token")
return {"username": username, "valid": True}
@app.get("/.well-known/webfinger")
async def webfinger(resource: str):
"""WebFinger endpoint for actor discovery."""
# Parse acct:username@domain
if not resource.startswith("acct:"):
raise HTTPException(400, "Resource must be acct: URI")
acct = resource[5:] # Remove "acct:"
if "@" not in acct:
raise HTTPException(400, "Invalid acct format")
username, domain = acct.split("@", 1)
if domain != DOMAIN:
raise HTTPException(404, f"Unknown domain: {domain}")
if not await user_exists(username):
raise HTTPException(404, f"Unknown user: {username}")
return JSONResponse(
content={
"subject": resource,
"links": [
{
"rel": "self",
"type": "application/activity+json",
"href": f"https://{DOMAIN}/users/{username}"
}
]
},
media_type="application/jrd+json"
)
@app.get("/users")
async def get_users_list(request: Request, page: int = 1, limit: int = 20):
"""Get all users. HTML for browsers (with infinite scroll), JSON for APIs (with pagination)."""
all_users = list((await db.get_all_users()).items())
total = len(all_users)
# Sort by username
all_users.sort(key=lambda x: x[0])
# Pagination
start = (page - 1) * limit
end = start + limit
users_page = all_users[start:end]
has_more = end < total
if wants_html(request):
username = get_user_from_cookie(request)
if not users_page:
if page == 1:
content = '''
<h2 class="text-xl font-semibold text-white mb-4">Users</h2>
<p class="text-gray-400">No users registered yet.</p>
'''
else:
return HTMLResponse("") # Empty for infinite scroll
else:
rows = ""
for uname, user_data in users_page:
webfinger = f"@{uname}@{DOMAIN}"
created_at = format_date(user_data.get("created_at"))
rows += f'''
<tr class="border-b border-dark-500 hover:bg-dark-600 transition-colors">
<td class="py-3 px-4">
<a href="/users/{uname}" class="text-blue-400 hover:text-blue-300 font-medium">{uname}</a>
</td>
<td class="py-3 px-4"><code class="text-xs bg-dark-600 px-2 py-1 rounded text-gray-300">{webfinger}</code></td>
<td class="py-3 px-4 text-gray-400">{created_at}</td>
</tr>
'''
# For infinite scroll, just return rows if not first page
if page > 1:
if has_more:
rows += f'''
<tr hx-get="/users?page={page + 1}" hx-trigger="revealed" hx-swap="afterend">
<td colspan="3" class="py-4 text-center text-gray-400">Loading more...</td>
</tr>
'''
return HTMLResponse(rows)
# First page - full content
infinite_scroll_trigger = ""
if has_more:
infinite_scroll_trigger = f'''
<tr hx-get="/users?page=2" hx-trigger="revealed" hx-swap="afterend">
<td colspan="3" class="py-4 text-center text-gray-400">Loading more...</td>
</tr>
'''
content = f'''
<h2 class="text-xl font-semibold text-white mb-6">Users ({total} total)</h2>
<div class="overflow-x-auto">
<table class="w-full text-sm">
<thead>
<tr class="bg-dark-600 text-left">
<th class="py-3 px-4 font-medium text-gray-300">Username</th>
<th class="py-3 px-4 font-medium text-gray-300">WebFinger</th>
<th class="py-3 px-4 font-medium text-gray-300">Created</th>
</tr>
</thead>
<tbody>
{rows}
{infinite_scroll_trigger}
</tbody>
</table>
</div>
'''
return HTMLResponse(base_html("Users", content, username))
# JSON response for APIs
return {
"users": [{"username": uname, **data} for uname, data in users_page],
"pagination": {
"page": page,
"limit": limit,
"total": total,
"has_more": has_more
}
}
@app.get("/users/{username}")
async def get_actor(username: str, request: Request):
"""Get actor profile for any registered user. Content negotiation: HTML for browsers, JSON for APIs."""
if not await user_exists(username):
if wants_html(request):
content = f'''
<h2 class="text-xl font-semibold text-white mb-4">User Not Found</h2>
<p class="text-gray-400">No user named "{username}" exists.</p>
<p class="mt-4"><a href="/users" class="text-blue-400 hover:text-blue-300">&larr; Back to Users</a></p>
'''
return HTMLResponse(base_html("User Not Found", content, get_user_from_cookie(request)))
raise HTTPException(404, f"Unknown user: {username}")
if wants_html(request):
# Render user detail page
return await ui_user_detail(username, request)
actor = load_actor(username)
# Add ActivityPub context
actor["@context"] = [
"https://www.w3.org/ns/activitystreams",
"https://w3id.org/security/v1"
]
return JSONResponse(
content=actor,
media_type="application/activity+json"
)
@app.get("/users/{username}/outbox")
async def get_outbox(username: str, page: bool = False):
"""Get actor's outbox (activities they created)."""
if not await user_exists(username):
raise HTTPException(404, f"Unknown user: {username}")
# Filter activities by this user's actor_id
all_activities = await load_activities()
actor_id = f"https://{DOMAIN}/users/{username}"
user_activities = [a for a in all_activities if a.get("actor_id") == actor_id]
if not page:
return JSONResponse(
content={
"@context": "https://www.w3.org/ns/activitystreams",
"id": f"https://{DOMAIN}/users/{username}/outbox",
"type": "OrderedCollection",
"totalItems": len(user_activities),
"first": f"https://{DOMAIN}/users/{username}/outbox?page=true"
},
media_type="application/activity+json"
)
# Return activities page
return JSONResponse(
content={
"@context": "https://www.w3.org/ns/activitystreams",
"id": f"https://{DOMAIN}/users/{username}/outbox?page=true",
"type": "OrderedCollectionPage",
"partOf": f"https://{DOMAIN}/users/{username}/outbox",
"orderedItems": user_activities
},
media_type="application/activity+json"
)
@app.post("/users/{username}/inbox")
async def post_inbox(username: str, request: Request):
"""Receive activities from other servers."""
if not await user_exists(username):
raise HTTPException(404, f"Unknown user: {username}")
body = await request.json()
activity_type = body.get("type")
# Handle Follow requests
if activity_type == "Follow":
follower_url = body.get("actor")
# Add follower to database
await db.add_follower(username, follower_url, follower_url)
# Send Accept (in production, do this async)
# For now just acknowledge
return {"status": "accepted"}
# Handle other activity types
return {"status": "received"}
@app.get("/users/{username}/followers")
async def get_followers(username: str):
"""Get actor's followers."""
if not await user_exists(username):
raise HTTPException(404, f"Unknown user: {username}")
# TODO: Per-user followers - for now use global followers
followers = await load_followers()
return JSONResponse(
content={
"@context": "https://www.w3.org/ns/activitystreams",
"id": f"https://{DOMAIN}/users/{username}/followers",
"type": "OrderedCollection",
"totalItems": len(followers),
"orderedItems": followers
},
media_type="application/activity+json"
)
# ============ Assets Endpoints ============
@app.get("/assets")
async def get_registry(request: Request, page: int = 1, limit: int = 20):
"""Get registry. HTML for browsers (with infinite scroll), JSON for APIs (with pagination)."""
registry = await load_registry()
all_assets = list(registry.get("assets", {}).items())
total = len(all_assets)
# Sort by created_at descending
all_assets.sort(key=lambda x: x[1].get("created_at", ""), reverse=True)
# Pagination
start = (page - 1) * limit
end = start + limit
assets_page = all_assets[start:end]
has_more = end < total
if wants_html(request):
username = get_user_from_cookie(request)
if not assets_page:
if page == 1:
content = '''
<h2 class="text-xl font-semibold text-white mb-4">Registry</h2>
<p class="text-gray-400">No assets registered yet.</p>
'''
else:
return HTMLResponse("") # Empty for infinite scroll
else:
rows = ""
for name, asset in assets_page:
asset_type = asset.get("asset_type", "")
type_color = "bg-blue-600" if asset_type == "image" else "bg-purple-600" if asset_type == "video" else "bg-gray-600"
owner = asset.get("owner", "unknown")
content_hash = asset.get("content_hash", "")[:16] + "..."
rows += f'''
<tr class="border-b border-dark-500 hover:bg-dark-600 transition-colors">
<td class="py-3 px-4 text-white font-medium">{name}</td>
<td class="py-3 px-4"><span class="px-2 py-1 {type_color} text-white text-xs rounded">{asset_type}</span></td>
<td class="py-3 px-4">
<a href="/users/{owner}" class="text-gray-400 hover:text-blue-300">{owner}</a>
</td>
<td class="py-3 px-4"><code class="text-xs bg-dark-600 px-2 py-1 rounded text-green-300">{content_hash}</code></td>
<td class="py-3 px-4">
<a href="/assets/{name}" class="px-3 py-1 bg-blue-600 hover:bg-blue-700 text-white text-xs font-medium rounded transition-colors">View</a>
</td>
</tr>
'''
# For infinite scroll, just return rows if not first page
if page > 1:
if has_more:
rows += f'''
<tr hx-get="/assets?page={page + 1}" hx-trigger="revealed" hx-swap="afterend">
<td colspan="5" class="py-4 text-center text-gray-400">Loading more...</td>
</tr>
'''
return HTMLResponse(rows)
# First page - full content
infinite_scroll_trigger = ""
if has_more:
infinite_scroll_trigger = f'''
<tr hx-get="/assets?page=2" hx-trigger="revealed" hx-swap="afterend">
<td colspan="5" class="py-4 text-center text-gray-400">Loading more...</td>
</tr>
'''
content = f'''
<h2 class="text-xl font-semibold text-white mb-6">Registry ({total} assets)</h2>
<div class="overflow-x-auto">
<table class="w-full text-sm">
<thead>
<tr class="bg-dark-600 text-left">
<th class="py-3 px-4 font-medium text-gray-300">Name</th>
<th class="py-3 px-4 font-medium text-gray-300">Type</th>
<th class="py-3 px-4 font-medium text-gray-300">Owner</th>
<th class="py-3 px-4 font-medium text-gray-300">Hash</th>
<th class="py-3 px-4 font-medium text-gray-300"></th>
</tr>
</thead>
<tbody>
{rows}
{infinite_scroll_trigger}
</tbody>
</table>
</div>
'''
return HTMLResponse(base_html("Registry", content, username))
# JSON response for APIs
return {
"assets": {name: asset for name, asset in assets_page},
"pagination": {
"page": page,
"limit": limit,
"total": total,
"has_more": has_more
}
}
@app.get("/asset/{name}")
async def get_asset_by_name_legacy(name: str):
"""Legacy route - redirect to /assets/{name}."""
return RedirectResponse(url=f"/assets/{name}", status_code=301)
@app.get("/assets/{name}")
async def get_asset(name: str, request: Request):
"""Get asset by name. HTML for browsers (default), JSON only if explicitly requested."""
registry = await load_registry()
# Check if JSON explicitly requested
accept = request.headers.get("accept", "")
wants_json = "application/json" in accept and "text/html" not in accept
if name not in registry.get("assets", {}):
if wants_json:
raise HTTPException(404, f"Asset not found: {name}")
content = f'''
<h2 class="text-xl font-semibold text-white mb-4">Asset Not Found</h2>
<p class="text-gray-400">No asset named "{name}" exists.</p>
<p class="mt-4"><a href="/assets" class="text-blue-400 hover:text-blue-300">&larr; Back to Assets</a></p>
'''
return HTMLResponse(base_html("Asset Not Found", content, get_user_from_cookie(request)))
if wants_json:
return registry["assets"][name]
# Default to HTML for browsers
return await ui_asset_detail(name, request)
@app.get("/assets/by-run-id/{run_id}")
async def get_asset_by_run_id(run_id: str):
"""
Get asset by content-addressable run_id.
Returns the asset info including output_hash and ipfs_cid for L1 recovery.
The run_id is stored in the asset's provenance when the run is recorded.
"""
asset = await db.get_asset_by_run_id(run_id)
if not asset:
raise HTTPException(404, f"No asset found for run_id: {run_id}")
return {
"run_id": run_id,
"asset_name": asset.get("name"),
"output_hash": asset.get("content_hash"),
"ipfs_cid": asset.get("ipfs_cid"),
"provenance_cid": asset.get("provenance", {}).get("provenance_cid") if asset.get("provenance") else None,
}
@app.patch("/assets/{name}")
async def update_asset(name: str, req: UpdateAssetRequest, user: User = Depends(get_required_user)):
"""Update an existing asset's metadata. Creates an Update activity."""
asset = await db.get_asset(name)
if not asset:
raise HTTPException(404, f"Asset not found: {name}")
# Check ownership
if asset.get("owner") != user.username:
raise HTTPException(403, f"Not authorized to update asset owned by {asset.get('owner')}")
# Build updates dict
updates = {}
if req.description is not None:
updates["description"] = req.description
if req.tags is not None:
updates["tags"] = req.tags
if req.metadata is not None:
updates["metadata"] = {**asset.get("metadata", {}), **req.metadata}
if req.origin is not None:
updates["origin"] = req.origin
if req.ipfs_cid is not None:
updates["ipfs_cid"] = req.ipfs_cid
# Pin on IPFS (fire-and-forget, don't block)
import threading
threading.Thread(target=_pin_ipfs_async, args=(req.ipfs_cid,), daemon=True).start()
# Update asset in database
updated_asset = await db.update_asset(name, updates)
# Create Update activity
activity = {
"activity_id": str(uuid.uuid4()),
"activity_type": "Update",
"actor_id": f"https://{DOMAIN}/users/{user.username}",
"object_data": {
"type": updated_asset.get("asset_type", "Object").capitalize(),
"name": name,
"id": f"https://{DOMAIN}/objects/{updated_asset['content_hash']}",
"contentHash": {
"algorithm": "sha3-256",
"value": updated_asset["content_hash"]
},
"attributedTo": f"https://{DOMAIN}/users/{user.username}",
"summary": req.description,
"tag": req.tags or updated_asset.get("tags", [])
},
"published": updated_asset.get("updated_at", datetime.now(timezone.utc).isoformat())
}
# Sign activity with the user's keys
activity = sign_activity(activity, user.username)
# Save activity to database
await db.create_activity(activity)
return {"asset": updated_asset, "activity": activity}
def _pin_ipfs_async(cid: str):
"""Pin IPFS content in background thread."""
try:
import ipfs_client
if ipfs_client.is_available():
ipfs_client.pin(cid)
logger.info(f"Pinned IPFS content: {cid}")
except Exception as e:
logger.warning(f"Failed to pin IPFS content {cid}: {e}")
async def _register_asset_impl(req: RegisterRequest, owner: str):
"""
Internal implementation for registering an asset atomically.
Requires IPFS CID - content must be on IPFS before registering.
Uses a transaction for all DB operations.
"""
import ipfs_client
from ipfs_client import IPFSError
logger.info(f"register_asset: Starting for {req.name} (hash={req.content_hash[:16]}...)")
# ===== PHASE 1: VALIDATION =====
# IPFS CID is required
if not req.ipfs_cid:
raise HTTPException(400, "IPFS CID is required for registration")
# Check if name exists - return existing asset if so
existing = await db.get_asset_by_name(req.name)
if existing:
logger.info(f"register_asset: Asset {req.name} already exists, returning existing")
return {"asset": existing, "activity": None, "existing": True}
# ===== PHASE 2: IPFS OPERATIONS (non-blocking) =====
import asyncio
logger.info(f"register_asset: Pinning CID {req.ipfs_cid[:16]}... on IPFS")
try:
await asyncio.to_thread(ipfs_client.pin_or_raise, req.ipfs_cid)
logger.info("register_asset: CID pinned successfully")
except IPFSError as e:
logger.error(f"register_asset: IPFS pin failed: {e}")
raise HTTPException(500, f"IPFS operation failed: {e}")
# ===== PHASE 3: DB TRANSACTION =====
now = datetime.now(timezone.utc).isoformat()
try:
async with db.transaction() as conn:
# Check name again inside transaction (race condition protection)
if await db.asset_exists_by_name_tx(conn, req.name):
# Race condition - another request created it first, return existing
existing = await db.get_asset_by_name(req.name)
logger.info(f"register_asset: Asset {req.name} created by concurrent request")
return {"asset": existing, "activity": None, "existing": True}
# Create asset
asset = {
"name": req.name,
"content_hash": req.content_hash,
"ipfs_cid": req.ipfs_cid,
"asset_type": req.asset_type,
"tags": req.tags,
"metadata": req.metadata,
"url": req.url,
"provenance": req.provenance,
"owner": owner,
"created_at": now
}
created_asset = await db.create_asset_tx(conn, asset)
# Create ownership activity
object_data = {
"type": req.asset_type.capitalize(),
"name": req.name,
"id": f"https://{DOMAIN}/objects/{req.content_hash}",
"contentHash": {
"algorithm": "sha3-256",
"value": req.content_hash
},
"attributedTo": f"https://{DOMAIN}/users/{owner}"
}
# Include provenance in activity object_data if present
if req.provenance:
object_data["provenance"] = req.provenance
activity = {
"activity_id": req.content_hash, # Content-addressable by content hash
"activity_type": "Create",
"actor_id": f"https://{DOMAIN}/users/{owner}",
"object_data": object_data,
"published": now
}
activity = sign_activity(activity, owner)
created_activity = await db.create_activity_tx(conn, activity)
# Transaction commits here on successful exit
except HTTPException:
raise
except Exception as e:
logger.error(f"register_asset: Database transaction failed: {e}")
raise HTTPException(500, f"Failed to register asset: {e}")
logger.info(f"register_asset: Successfully registered {req.name}")
return {"asset": created_asset, "activity": created_activity}
@app.post("/assets")
async def register_asset(req: RegisterRequest, user: User = Depends(get_required_user)):
"""Register a new asset and create ownership activity. Requires authentication."""
return await _register_asset_impl(req, user.username)
@app.post("/assets/record-run")
@app.post("/registry/record-run") # Legacy route
async def record_run(req: RecordRunRequest, user: User = Depends(get_required_user)):
"""
Record an L1 run and register the output atomically.
Ensures all operations succeed or none do:
1. All input assets registered (if not already on L2) + pinned on IPFS
2. Output asset registered + pinned on IPFS
3. Recipe serialized to JSON, stored on IPFS, CID saved in provenance
"""
import ipfs_client
from ipfs_client import IPFSError
# ===== PHASE 1: PREPARATION (read-only, non-blocking) =====
import asyncio
l1_url = req.l1_server.rstrip('/')
logger.info(f"record_run: Starting for run_id={req.run_id} from {l1_url}")
# Helper to fetch from L1 without blocking event loop
def fetch_l1_run():
import time as _time
url = f"{l1_url}/runs/{req.run_id}"
logger.info(f"record_run: Fetching run from L1: {url}")
t0 = _time.time()
resp = requests.get(url, timeout=30)
logger.info(f"record_run: L1 request took {_time.time()-t0:.3f}s, status={resp.status_code}")
if resp.status_code == 404:
raise ValueError(f"Run not found on L1: {req.run_id}")
resp.raise_for_status()
try:
return resp.json()
except Exception:
body_preview = resp.text[:200] if resp.text else "(empty)"
logger.error(f"L1 returned non-JSON for {url}: status={resp.status_code}, body={body_preview}")
raise ValueError(f"L1 returned invalid response: {body_preview[:100]}")
def fetch_l1_cache(content_hash):
logger.debug(f"record_run: Fetching cache {content_hash[:16]}... from L1")
url = f"{l1_url}/cache/{content_hash}"
resp = requests.get(url, headers={"Accept": "application/json"}, timeout=10)
if resp.status_code == 404:
raise ValueError(f"Cache item not found on L1: {content_hash[:16]}...")
resp.raise_for_status()
try:
return resp.json()
except Exception as e:
# Log what we actually got back
body_preview = resp.text[:200] if resp.text else "(empty)"
logger.error(f"L1 returned non-JSON for {url}: status={resp.status_code}, body={body_preview}")
raise ValueError(f"L1 returned invalid response (status={resp.status_code}): {body_preview[:100]}")
# Fetch run from L1
try:
run = await asyncio.to_thread(fetch_l1_run)
logger.info(f"record_run: Fetched run, status={run.get('status')}, inputs={len(run.get('inputs', []))}")
except Exception as e:
logger.error(f"record_run: Failed to fetch run from L1: {e}")
raise HTTPException(400, f"Failed to fetch run from L1 ({l1_url}): {e}")
if run.get("status") != "completed":
raise HTTPException(400, f"Run not completed: {run.get('status')}")
output_hash = run.get("output_hash")
if not output_hash:
raise HTTPException(400, "Run has no output hash")
# Fetch output cache info from L1 (must exist - it's new)
logger.info(f"record_run: Fetching output cache {output_hash[:16]}... from L1")
try:
cache_info = await asyncio.to_thread(fetch_l1_cache, output_hash)
output_media_type = cache_info.get("media_type", "image")
output_ipfs_cid = cache_info.get("ipfs_cid")
logger.info(f"record_run: Output has IPFS CID: {output_ipfs_cid[:16] if output_ipfs_cid else 'None'}...")
except Exception as e:
logger.error(f"record_run: Failed to fetch output cache info: {e}")
raise HTTPException(400, f"Failed to fetch output cache info: {e}")
if not output_ipfs_cid:
logger.error("record_run: Output has no IPFS CID")
raise HTTPException(400, "Output has no IPFS CID - cannot publish")
# Gather input info: check L2 first, then fall back to L1
input_hashes = run.get("inputs", [])
input_infos = [] # List of {content_hash, ipfs_cid, media_type, existing_asset}
logger.info(f"record_run: Gathering info for {len(input_hashes)} inputs")
for input_hash in input_hashes:
# Check if already on L2
existing = await db.get_asset_by_hash(input_hash)
if existing and existing.get("ipfs_cid"):
logger.info(f"record_run: Input {input_hash[:16]}... found on L2")
input_infos.append({
"content_hash": input_hash,
"ipfs_cid": existing["ipfs_cid"],
"media_type": existing.get("asset_type", "image"),
"existing_asset": existing
})
else:
# Not on L2, try L1
logger.info(f"record_run: Input {input_hash[:16]}... not on L2, fetching from L1")
try:
inp_info = await asyncio.to_thread(fetch_l1_cache, input_hash)
ipfs_cid = inp_info.get("ipfs_cid")
if not ipfs_cid:
logger.error(f"record_run: Input {input_hash[:16]}... has no IPFS CID")
raise HTTPException(400, f"Input {input_hash[:16]}... has no IPFS CID (not on L2 or L1)")
input_infos.append({
"content_hash": input_hash,
"ipfs_cid": ipfs_cid,
"media_type": inp_info.get("media_type", "image"),
"existing_asset": None
})
except HTTPException:
raise
except Exception as e:
logger.error(f"record_run: Failed to fetch input {input_hash[:16]}... from L1: {e}")
raise HTTPException(400, f"Input {input_hash[:16]}... not on L2 and failed to fetch from L1: {e}")
# Prepare recipe data
recipe_data = run.get("recipe")
if not recipe_data:
recipe_data = {
"name": run.get("recipe_name", "unknown"),
"effect_url": run.get("effect_url"),
"effects_commit": run.get("effects_commit"),
}
# Build registered_inputs list - all referenced by content_hash
registered_inputs = []
for inp in input_infos:
registered_inputs.append({
"content_hash": inp["content_hash"],
"ipfs_cid": inp["ipfs_cid"]
})
# ===== PHASE 2: IPFS OPERATIONS (non-blocking for event loop) =====
def do_ipfs_operations():
"""Run IPFS operations in thread pool to not block event loop."""
from concurrent.futures import ThreadPoolExecutor, as_completed
# Collect all CIDs to pin (inputs + output)
cids_to_pin = [inp["ipfs_cid"] for inp in input_infos] + [output_ipfs_cid]
logger.info(f"record_run: Pinning {len(cids_to_pin)} CIDs on IPFS")
# Pin all in parallel
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {executor.submit(ipfs_client.pin_or_raise, cid): cid for cid in cids_to_pin}
for future in as_completed(futures):
future.result() # Raises IPFSError if failed
logger.info("record_run: All CIDs pinned successfully")
# Store recipe on IPFS
logger.info("record_run: Storing recipe on IPFS")
recipe_cid = ipfs_client.add_json(recipe_data)
# Build and store full provenance on IPFS
# Compute content-addressable run_id from inputs + recipe
recipe_name = recipe_data.get("name", "unknown") if isinstance(recipe_data, dict) else str(recipe_data)
run_id = compute_run_id(input_hashes, recipe_name)
provenance = {
"run_id": run_id, # Content-addressable run identifier
"inputs": registered_inputs,
"output": {
"content_hash": output_hash,
"ipfs_cid": output_ipfs_cid
},
"recipe": recipe_data,
"recipe_cid": recipe_cid,
"effect_url": run.get("effect_url"),
"effects_commit": run.get("effects_commit"),
"l1_server": l1_url,
"l1_run_id": req.run_id,
"rendered_at": run.get("completed_at"),
"infrastructure": run.get("infrastructure")
}
logger.info("record_run: Storing provenance on IPFS")
provenance_cid = ipfs_client.add_json(provenance)
return recipe_cid, provenance_cid, provenance
try:
import asyncio
recipe_cid, provenance_cid, provenance = await asyncio.to_thread(do_ipfs_operations)
logger.info(f"record_run: Recipe CID: {recipe_cid[:16]}..., Provenance CID: {provenance_cid[:16]}...")
except IPFSError as e:
logger.error(f"record_run: IPFS operation failed: {e}")
raise HTTPException(500, f"IPFS operation failed: {e}")
# ===== PHASE 3: DB TRANSACTION (all-or-nothing) =====
logger.info("record_run: Starting DB transaction")
now = datetime.now(timezone.utc).isoformat()
# Add provenance_cid to provenance for storage in DB
provenance["provenance_cid"] = provenance_cid
try:
async with db.transaction() as conn:
# Register input assets (if not already on L2) - named by content_hash
for inp in input_infos:
if not inp["existing_asset"]:
media_type = inp["media_type"]
tags = ["auto-registered", "input"]
if media_type == "recipe":
tags.append("recipe")
input_asset = {
"name": inp["content_hash"], # Use content_hash as name
"content_hash": inp["content_hash"],
"ipfs_cid": inp["ipfs_cid"],
"asset_type": media_type,
"tags": tags,
"metadata": {"auto_registered_from_run": req.run_id},
"owner": user.username,
"created_at": now
}
await db.create_asset_tx(conn, input_asset)
# Check if output already exists (by content_hash) - return existing if so
if await db.asset_exists_by_name_tx(conn, output_hash):
existing = await db.get_asset_by_name(output_hash)
logger.info(f"record_run: Output {output_hash[:16]}... already exists")
return {"asset": existing, "activity": None, "existing": True}
# Create output asset with provenance - named by content_hash
output_asset = {
"name": output_hash, # Use content_hash as name
"content_hash": output_hash,
"ipfs_cid": output_ipfs_cid,
"asset_type": output_media_type,
"tags": ["rendered", "l1"],
"metadata": {"l1_server": l1_url, "l1_run_id": req.run_id},
"provenance": provenance,
"owner": user.username,
"created_at": now
}
created_asset = await db.create_asset_tx(conn, output_asset)
# Create activity - all referenced by content_hash
object_data = {
"type": output_media_type.capitalize(),
"name": output_hash, # Use content_hash as name
"id": f"https://{DOMAIN}/objects/{output_hash}",
"contentHash": {
"algorithm": "sha3-256",
"value": output_hash
},
"attributedTo": f"https://{DOMAIN}/users/{user.username}",
"provenance": provenance
}
activity = {
"activity_id": provenance["run_id"], # Content-addressable run_id
"activity_type": "Create",
"actor_id": f"https://{DOMAIN}/users/{user.username}",
"object_data": object_data,
"published": now
}
activity = sign_activity(activity, user.username)
created_activity = await db.create_activity_tx(conn, activity)
# Transaction commits here on successful exit
except HTTPException:
raise
except Exception as e:
logger.error(f"record_run: Database transaction failed: {e}")
raise HTTPException(500, f"Failed to record run: {e}")
logger.info(f"record_run: Successfully published {output_hash[:16]}... with {len(registered_inputs)} inputs")
return {"asset": created_asset, "activity": created_activity}
@app.post("/assets/publish-cache")
async def publish_cache(req: PublishCacheRequest, user: User = Depends(get_required_user)):
"""
Publish a cache item from L1 with metadata atomically.
Requires origin to be set (self or external URL).
Requires IPFS CID - content must be on IPFS before publishing.
Creates a new asset and Create activity in a single transaction.
"""
import ipfs_client
from ipfs_client import IPFSError
logger.info(f"publish_cache: Starting for {req.asset_name} (hash={req.content_hash[:16]}...)")
# ===== PHASE 1: VALIDATION =====
# Validate origin
if not req.origin or "type" not in req.origin:
raise HTTPException(400, "Origin is required for publishing (type: 'self' or 'external')")
origin_type = req.origin.get("type")
if origin_type not in ("self", "external"):
raise HTTPException(400, "Origin type must be 'self' or 'external'")
if origin_type == "external" and not req.origin.get("url"):
raise HTTPException(400, "External origin requires a URL")
# IPFS CID is now required
if not req.ipfs_cid:
raise HTTPException(400, "IPFS CID is required for publishing")
# Check if asset name already exists
if await db.asset_exists(req.asset_name):
raise HTTPException(400, f"Asset name already exists: {req.asset_name}")
# ===== PHASE 2: IPFS OPERATIONS (non-blocking) =====
import asyncio
logger.info(f"publish_cache: Pinning CID {req.ipfs_cid[:16]}... on IPFS")
try:
await asyncio.to_thread(ipfs_client.pin_or_raise, req.ipfs_cid)
logger.info("publish_cache: CID pinned successfully")
except IPFSError as e:
logger.error(f"publish_cache: IPFS pin failed: {e}")
raise HTTPException(500, f"IPFS operation failed: {e}")
# ===== PHASE 3: DB TRANSACTION =====
logger.info("publish_cache: Starting DB transaction")
now = datetime.now(timezone.utc).isoformat()
try:
async with db.transaction() as conn:
# Check name again inside transaction (race condition protection)
if await db.asset_exists_by_name_tx(conn, req.asset_name):
raise HTTPException(400, f"Asset name already exists: {req.asset_name}")
# Create asset
asset = {
"name": req.asset_name,
"content_hash": req.content_hash,
"ipfs_cid": req.ipfs_cid,
"asset_type": req.asset_type,
"tags": req.tags,
"description": req.description,
"origin": req.origin,
"metadata": req.metadata,
"owner": user.username,
"created_at": now
}
created_asset = await db.create_asset_tx(conn, asset)
# Create ownership activity with origin info
object_data = {
"type": req.asset_type.capitalize(),
"name": req.asset_name,
"id": f"https://{DOMAIN}/objects/{req.content_hash}",
"contentHash": {
"algorithm": "sha3-256",
"value": req.content_hash
},
"attributedTo": f"https://{DOMAIN}/users/{user.username}",
"tag": req.tags
}
if req.description:
object_data["summary"] = req.description
# Include origin in ActivityPub object
if origin_type == "self":
object_data["generator"] = {
"type": "Application",
"name": "Art DAG",
"note": "Original content created by the author"
}
else:
object_data["source"] = {
"type": "Link",
"href": req.origin.get("url"),
"name": req.origin.get("note", "External source")
}
activity = {
"activity_id": req.content_hash, # Content-addressable by content hash
"activity_type": "Create",
"actor_id": f"https://{DOMAIN}/users/{user.username}",
"object_data": object_data,
"published": now
}
activity = sign_activity(activity, user.username)
created_activity = await db.create_activity_tx(conn, activity)
# Transaction commits here on successful exit
except HTTPException:
raise
except Exception as e:
logger.error(f"publish_cache: Database transaction failed: {e}")
raise HTTPException(500, f"Failed to publish cache item: {e}")
logger.info(f"publish_cache: Successfully published {req.asset_name}")
return {"asset": created_asset, "activity": created_activity}
# ============ Activities Endpoints ============
@app.get("/activities")
async def get_activities(request: Request, page: int = 1, limit: int = 20):
"""Get activities. HTML for browsers (with infinite scroll), JSON for APIs (with pagination)."""
all_activities = await load_activities()
total = len(all_activities)
# Reverse for newest first
all_activities = list(reversed(all_activities))
# Pagination
start = (page - 1) * limit
end = start + limit
activities_page = all_activities[start:end]
has_more = end < total
if wants_html(request):
username = get_user_from_cookie(request)
if not activities_page:
if page == 1:
content = '''
<h2 class="text-xl font-semibold text-white mb-4">Activities</h2>
<p class="text-gray-400">No activities yet.</p>
'''
else:
return HTMLResponse("") # Empty for infinite scroll
else:
rows = ""
for i, activity in enumerate(activities_page):
activity_index = total - 1 - (start + i) # Original index
obj = activity.get("object_data", {})
activity_type = activity.get("activity_type", "")
type_color = "bg-green-600" if activity_type == "Create" else "bg-yellow-600" if activity_type == "Update" else "bg-gray-600"
actor_id = activity.get("actor_id", "")
actor_name = actor_id.split("/")[-1] if actor_id else "unknown"
rows += f'''
<tr class="border-b border-dark-500 hover:bg-dark-600 transition-colors">
<td class="py-3 px-4"><span class="px-2 py-1 {type_color} text-white text-xs font-medium rounded">{activity_type}</span></td>
<td class="py-3 px-4 text-white">{obj.get("name", "Untitled")}</td>
<td class="py-3 px-4">
<a href="/users/{actor_name}" class="text-gray-400 hover:text-blue-300">{actor_name}</a>
</td>
<td class="py-3 px-4 text-gray-400">{format_date(activity.get("published"))}</td>
<td class="py-3 px-4">
<a href="/activities/{activity_index}" class="px-3 py-1 bg-blue-600 hover:bg-blue-700 text-white text-xs font-medium rounded transition-colors">View</a>
</td>
</tr>
'''
# For infinite scroll, just return rows if not first page
if page > 1:
if has_more:
rows += f'''
<tr hx-get="/activities?page={page + 1}" hx-trigger="revealed" hx-swap="afterend">
<td colspan="5" class="py-4 text-center text-gray-400">Loading more...</td>
</tr>
'''
return HTMLResponse(rows)
# First page - full content with header
infinite_scroll_trigger = ""
if has_more:
infinite_scroll_trigger = f'''
<tr hx-get="/activities?page=2" hx-trigger="revealed" hx-swap="afterend">
<td colspan="5" class="py-4 text-center text-gray-400">Loading more...</td>
</tr>
'''
content = f'''
<h2 class="text-xl font-semibold text-white mb-6">Activities ({total} total)</h2>
<div class="overflow-x-auto">
<table class="w-full text-sm">
<thead>
<tr class="bg-dark-600 text-left">
<th class="py-3 px-4 font-medium text-gray-300">Type</th>
<th class="py-3 px-4 font-medium text-gray-300">Object</th>
<th class="py-3 px-4 font-medium text-gray-300">Actor</th>
<th class="py-3 px-4 font-medium text-gray-300">Published</th>
<th class="py-3 px-4 font-medium text-gray-300"></th>
</tr>
</thead>
<tbody>
{rows}
{infinite_scroll_trigger}
</tbody>
</table>
</div>
'''
return HTMLResponse(base_html("Activities", content, username))
# JSON response for APIs
return {
"activities": activities_page,
"pagination": {
"page": page,
"limit": limit,
"total": total,
"has_more": has_more
}
}
@app.get("/activities/{activity_index}")
async def get_activity_detail(activity_index: int, request: Request):
"""Get single activity. HTML for browsers (default), JSON only if explicitly requested."""
activities = await load_activities()
# Check if JSON explicitly requested
accept = request.headers.get("accept", "")
wants_json = ("application/json" in accept or "application/activity+json" in accept) and "text/html" not in accept
if activity_index < 0 or activity_index >= len(activities):
if wants_json:
raise HTTPException(404, "Activity not found")
content = '''
<h2 class="text-xl font-semibold text-white mb-4">Activity Not Found</h2>
<p class="text-gray-400">This activity does not exist.</p>
<p class="mt-4"><a href="/activities" class="text-blue-400 hover:text-blue-300">&larr; Back to Activities</a></p>
'''
return HTMLResponse(base_html("Activity Not Found", content, get_user_from_cookie(request)))
activity = activities[activity_index]
if wants_json:
return activity
# Default to HTML for browsers
return await ui_activity_detail(activity_index, request)
@app.get("/activity/{activity_index}")
async def get_activity_legacy(activity_index: int):
"""Legacy route - redirect to /activities/{activity_index}."""
return RedirectResponse(url=f"/activities/{activity_index}", status_code=301)
@app.get("/objects/{content_hash}")
async def get_object(content_hash: str, request: Request):
"""Get object by content hash. Content negotiation: HTML for browsers, JSON for APIs."""
registry = await load_registry()
# Find asset by hash
for name, asset in registry.get("assets", {}).items():
if asset.get("content_hash") == content_hash:
# Check Accept header - only return JSON if explicitly requested
accept = request.headers.get("accept", "")
wants_json = ("application/json" in accept or "application/activity+json" in accept) and "text/html" not in accept
if not wants_json:
# Default: redirect to detail page for browsers
return RedirectResponse(url=f"/assets/{name}", status_code=303)
owner = asset.get("owner", "unknown")
return JSONResponse(
content={
"@context": "https://www.w3.org/ns/activitystreams",
"id": f"https://{DOMAIN}/objects/{content_hash}",
"type": asset.get("asset_type", "Object").capitalize(),
"name": name,
"contentHash": {
"algorithm": "sha3-256",
"value": content_hash
},
"attributedTo": f"https://{DOMAIN}/users/{owner}",
"published": asset.get("created_at")
},
media_type="application/activity+json"
)
raise HTTPException(404, f"Object not found: {content_hash}")
# ============ Anchoring (Bitcoin timestamps) ============
@app.post("/anchors/create")
async def create_anchor_endpoint(request: Request):
"""
Create a new anchor for all unanchored activities.
Builds a merkle tree, stores it on IPFS, and submits to OpenTimestamps
for Bitcoin anchoring. The anchor proof is backed up to persistent storage.
"""
import anchoring
import ipfs_client
# Check auth (cookie or header)
username = get_user_from_cookie(request)
if not username:
if wants_html(request):
return HTMLResponse('''
<div class="p-3 bg-red-900/50 border border-red-700 rounded text-red-300">
<strong>Error:</strong> Login required
</div>
''')
raise HTTPException(401, "Authentication required")
# Get unanchored activities
unanchored = await db.get_unanchored_activities()
if not unanchored:
if wants_html(request):
return HTMLResponse('''
<div class="p-3 bg-yellow-900/50 border border-yellow-700 rounded text-yellow-300">
No unanchored activities to anchor.
</div>
''')
return {"message": "No unanchored activities", "anchored": 0}
activity_ids = [a["activity_id"] for a in unanchored]
# Create anchor
anchor = await anchoring.create_anchor(activity_ids, db, ipfs_client)
if anchor:
if wants_html(request):
return HTMLResponse(f'''
<div class="p-3 bg-green-900/50 border border-green-700 rounded text-green-300">
<strong>Success!</strong> Anchored {len(activity_ids)} activities.<br>
<span class="text-sm">Merkle root: {anchor["merkle_root"][:32]}...</span><br>
<span class="text-sm">Refresh page to see the new anchor.</span>
</div>
''')
return {
"message": f"Anchored {len(activity_ids)} activities",
"merkle_root": anchor["merkle_root"],
"tree_ipfs_cid": anchor.get("tree_ipfs_cid"),
"activity_count": anchor["activity_count"]
}
else:
if wants_html(request):
return HTMLResponse('''
<div class="p-3 bg-red-900/50 border border-red-700 rounded text-red-300">
<strong>Failed!</strong> Could not create anchor.
</div>
''')
raise HTTPException(500, "Failed to create anchor")
@app.get("/anchors")
async def list_anchors():
"""List all anchors."""
anchors = await db.get_all_anchors()
stats = await db.get_anchor_stats()
return {
"anchors": anchors,
"stats": stats
}
@app.get("/anchors/{merkle_root}")
async def get_anchor_endpoint(merkle_root: str):
"""Get anchor details by merkle root."""
anchor = await db.get_anchor(merkle_root)
if not anchor:
raise HTTPException(404, f"Anchor not found: {merkle_root}")
return anchor
@app.get("/anchors/{merkle_root}/tree")
async def get_anchor_tree(merkle_root: str):
"""Get the full merkle tree from IPFS."""
import asyncio
import ipfs_client
anchor = await db.get_anchor(merkle_root)
if not anchor:
raise HTTPException(404, f"Anchor not found: {merkle_root}")
tree_cid = anchor.get("tree_ipfs_cid")
if not tree_cid:
raise HTTPException(404, "Anchor has no tree on IPFS")
try:
tree_bytes = await asyncio.to_thread(ipfs_client.get_bytes, tree_cid)
if tree_bytes:
return json.loads(tree_bytes)
except Exception as e:
raise HTTPException(500, f"Failed to fetch tree from IPFS: {e}")
@app.get("/anchors/verify/{activity_id}")
async def verify_activity_anchor(activity_id: str):
"""
Verify an activity's anchor proof.
Returns the merkle proof showing this activity is included in an anchored batch.
"""
import anchoring
import ipfs_client
# Get activity
activity = await db.get_activity(activity_id)
if not activity:
raise HTTPException(404, f"Activity not found: {activity_id}")
anchor_root = activity.get("anchor_root")
if not anchor_root:
return {"verified": False, "reason": "Activity not yet anchored"}
# Get anchor
anchor = await db.get_anchor(anchor_root)
if not anchor:
return {"verified": False, "reason": "Anchor record not found"}
# Get tree from IPFS (non-blocking)
import asyncio
tree_cid = anchor.get("tree_ipfs_cid")
if not tree_cid:
return {"verified": False, "reason": "Merkle tree not on IPFS"}
try:
tree_bytes = await asyncio.to_thread(ipfs_client.get_bytes, tree_cid)
tree = json.loads(tree_bytes) if tree_bytes else None
except Exception:
return {"verified": False, "reason": "Failed to fetch tree from IPFS"}
if not tree:
return {"verified": False, "reason": "Could not load merkle tree"}
# Get proof
proof = anchoring.get_merkle_proof(tree, activity_id)
if not proof:
return {"verified": False, "reason": "Activity not in merkle tree"}
# Verify proof
valid = anchoring.verify_merkle_proof(activity_id, proof, anchor_root)
return {
"verified": valid,
"activity_id": activity_id,
"merkle_root": anchor_root,
"tree_ipfs_cid": tree_cid,
"proof": proof,
"bitcoin_txid": anchor.get("bitcoin_txid"),
"confirmed_at": anchor.get("confirmed_at")
}
@app.post("/anchors/{merkle_root}/upgrade")
async def upgrade_anchor_proof(merkle_root: str):
"""
Try to upgrade an OTS proof from pending to confirmed.
Bitcoin confirmation typically takes 1-2 hours. Call this periodically
to check if the proof has been included in a Bitcoin block.
"""
import anchoring
import ipfs_client
import asyncio
anchor = await db.get_anchor(merkle_root)
if not anchor:
raise HTTPException(404, f"Anchor not found: {merkle_root}")
if anchor.get("confirmed_at"):
return {"status": "already_confirmed", "bitcoin_txid": anchor.get("bitcoin_txid")}
# Get current OTS proof from IPFS
ots_cid = anchor.get("ots_proof_cid")
if not ots_cid:
return {"status": "no_proof", "message": "No OTS proof stored"}
try:
ots_proof = await asyncio.to_thread(ipfs_client.get_bytes, ots_cid)
if not ots_proof:
return {"status": "error", "message": "Could not fetch OTS proof from IPFS"}
except Exception as e:
return {"status": "error", "message": f"IPFS error: {e}"}
# Try to upgrade
upgraded = await asyncio.to_thread(anchoring.upgrade_ots_proof, ots_proof)
if upgraded and len(upgraded) > len(ots_proof):
# Store upgraded proof on IPFS
try:
new_cid = await asyncio.to_thread(ipfs_client.add_bytes, upgraded)
# TODO: Update anchor record with new CID and confirmed status
return {
"status": "upgraded",
"message": "Proof upgraded - Bitcoin confirmation received",
"new_ots_cid": new_cid,
"proof_size": len(upgraded)
}
except Exception as e:
return {"status": "error", "message": f"Failed to store upgraded proof: {e}"}
else:
return {
"status": "pending",
"message": "Not yet confirmed on Bitcoin. Try again in ~1 hour.",
"proof_size": len(ots_proof) if ots_proof else 0
}
@app.get("/anchors/ui", response_class=HTMLResponse)
async def anchors_ui(request: Request):
"""Anchors UI page - view and test OpenTimestamps anchoring."""
username = get_user_from_cookie(request)
anchors = await db.get_all_anchors()
stats = await db.get_anchor_stats()
# Build anchors table rows
rows = ""
for anchor in anchors:
status = "confirmed" if anchor.get("confirmed_at") else "pending"
status_class = "text-green-400" if status == "confirmed" else "text-yellow-400"
merkle_root = anchor.get("merkle_root", "")[:16] + "..."
rows += f'''
<tr class="border-b border-dark-500 hover:bg-dark-600">
<td class="py-3 px-2 font-mono text-sm">{merkle_root}</td>
<td class="py-3 px-2">{anchor.get("activity_count", 0)}</td>
<td class="py-3 px-2 {status_class}">{status}</td>
<td class="py-3 px-2 text-sm">{format_date(anchor.get("created_at"), 16)}</td>
<td class="py-3 px-2">
<button
hx-post="/anchors/{anchor.get("merkle_root")}/upgrade"
hx-target="#upgrade-result-{anchor.get("merkle_root")[:8]}"
hx-swap="innerHTML"
class="px-2 py-1 bg-blue-600 hover:bg-blue-700 rounded text-sm">
Check Status
</button>
<span id="upgrade-result-{anchor.get("merkle_root")[:8]}" class="ml-2 text-sm"></span>
</td>
</tr>
'''
if not rows:
rows = '<tr><td colspan="5" class="py-8 text-center text-gray-500">No anchors yet</td></tr>'
content = f'''
<script src="https://unpkg.com/htmx.org@1.9.10"></script>
<h2 class="text-xl font-semibold mb-4">Bitcoin Anchoring via OpenTimestamps</h2>
<div class="grid grid-cols-1 md:grid-cols-3 gap-4 mb-6">
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-2xl font-bold text-blue-400">{stats.get("total_anchors", 0)}</div>
<div class="text-gray-400 text-sm">Total Anchors</div>
</div>
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-2xl font-bold text-green-400">{stats.get("confirmed_anchors", 0)}</div>
<div class="text-gray-400 text-sm">Confirmed</div>
</div>
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-2xl font-bold text-yellow-400">{stats.get("pending_anchors", 0)}</div>
<div class="text-gray-400 text-sm">Pending</div>
</div>
</div>
<div class="mb-6 p-4 bg-dark-600 rounded-lg">
<h3 class="font-semibold mb-2">Test Anchoring</h3>
<p class="text-sm text-gray-400 mb-3">Create a test anchor for unanchored activities, or test the OTS connection.</p>
<div class="flex gap-4">
<button
hx-post="/anchors/create"
hx-target="#test-result"
hx-swap="innerHTML"
class="px-4 py-2 bg-green-600 hover:bg-green-700 rounded">
Create Anchor
</button>
<button
hx-post="/anchors/test-ots"
hx-target="#test-result"
hx-swap="innerHTML"
class="px-4 py-2 bg-purple-600 hover:bg-purple-700 rounded">
Test OTS Connection
</button>
</div>
<div id="test-result" class="mt-4 text-sm"></div>
</div>
<h3 class="font-semibold mb-3">Anchors</h3>
<div class="overflow-x-auto">
<table class="w-full text-sm">
<thead>
<tr class="text-left text-gray-400 border-b border-dark-500">
<th class="py-2 px-2">Merkle Root</th>
<th class="py-2 px-2">Activities</th>
<th class="py-2 px-2">Status</th>
<th class="py-2 px-2">Created</th>
<th class="py-2 px-2">Actions</th>
</tr>
</thead>
<tbody>
{rows}
</tbody>
</table>
</div>
<div class="mt-6 p-4 bg-dark-800 rounded-lg text-sm text-gray-400">
<h4 class="font-semibold text-gray-300 mb-2">How it works:</h4>
<ol class="list-decimal list-inside space-y-1">
<li>Activities are batched and hashed into a merkle tree</li>
<li>The merkle root is submitted to OpenTimestamps</li>
<li>OTS aggregates hashes and anchors to Bitcoin (~1-2 hours)</li>
<li>Once confirmed, anyone can verify the timestamp</li>
</ol>
</div>
'''
return HTMLResponse(base_html("Anchors", content, username))
@app.post("/anchors/test-ots", response_class=HTMLResponse)
async def test_ots_connection():
"""Test OpenTimestamps connection by submitting a test hash."""
import anchoring
import hashlib
import asyncio
# Create a test hash
test_data = f"test-{datetime.now(timezone.utc).isoformat()}"
test_hash = hashlib.sha256(test_data.encode()).hexdigest()
# Try to submit
try:
ots_proof = await asyncio.to_thread(anchoring.submit_to_opentimestamps, test_hash)
if ots_proof:
return HTMLResponse(f'''
<div class="p-3 bg-green-900/50 border border-green-700 rounded text-green-300">
<strong>Success!</strong> OpenTimestamps is working.<br>
<span class="text-sm">Test hash: {test_hash[:32]}...</span><br>
<span class="text-sm">Proof size: {len(ots_proof)} bytes</span>
</div>
''')
else:
return HTMLResponse('''
<div class="p-3 bg-red-900/50 border border-red-700 rounded text-red-300">
<strong>Failed!</strong> Could not reach OpenTimestamps servers.
</div>
''')
except Exception as e:
return HTMLResponse(f'''
<div class="p-3 bg-red-900/50 border border-red-700 rounded text-red-300">
<strong>Error:</strong> {str(e)}
</div>
''')
# ============ Renderers (L1 servers) ============
@app.get("/renderers", response_class=HTMLResponse)
async def renderers_page(request: Request):
"""Page to manage L1 renderer attachments."""
username = get_user_from_cookie(request)
if not username:
content = '''
<h2 class="text-xl font-semibold text-white mb-4">Renderers</h2>
<p class="text-gray-400">Log in to manage your renderer connections.</p>
'''
return HTMLResponse(base_html("Renderers", content))
# Get user's attached renderers
attached = await db.get_user_renderers(username)
token = request.cookies.get("auth_token", "")
# Build renderer list
rows = []
for l1_url in L1_SERVERS:
is_attached = l1_url in attached
# Extract display name from URL
display_name = l1_url.replace("https://", "").replace("http://", "")
if is_attached:
status = '<span class="px-2 py-1 bg-green-600 text-white text-xs font-medium rounded-full">Attached</span>'
action = f'''
<a href="{l1_url}" target="_blank" class="px-3 py-1 bg-blue-600 hover:bg-blue-700 text-white text-sm rounded transition-colors">
Open
</a>
<button hx-post="/renderers/detach" hx-vals='{{"l1_url": "{l1_url}"}}' hx-target="#renderer-{l1_url.replace("://", "-").replace("/", "-").replace(".", "-")}" hx-swap="outerHTML"
class="px-3 py-1 bg-gray-600 hover:bg-gray-700 text-white text-sm rounded transition-colors ml-2">
Detach
</button>
'''
else:
status = '<span class="px-2 py-1 bg-gray-600 text-gray-300 text-xs font-medium rounded-full">Not attached</span>'
# Attach redirects to L1 with token
attach_url = f"{l1_url}/auth?auth_token={token}"
action = f'''
<a href="{attach_url}" class="px-3 py-1 bg-green-600 hover:bg-green-700 text-white text-sm rounded transition-colors"
onclick="setTimeout(() => location.reload(), 2000)">
Attach
</a>
'''
row_id = l1_url.replace("://", "-").replace("/", "-").replace(".", "-")
rows.append(f'''
<div id="renderer-{row_id}" class="flex items-center justify-between p-4 bg-dark-600 rounded-lg">
<div>
<div class="font-medium text-white">{display_name}</div>
<div class="text-sm text-gray-400">{l1_url}</div>
</div>
<div class="flex items-center gap-3">
{status}
{action}
</div>
</div>
''')
content = f'''
<h2 class="text-xl font-semibold text-white mb-4">Renderers</h2>
<p class="text-gray-400 mb-6">Connect to L1 rendering servers. After attaching, you can run effects and manage media on that renderer.</p>
<div class="space-y-3">
{"".join(rows) if rows else '<p class="text-gray-500">No renderers configured.</p>'}
</div>
'''
return HTMLResponse(base_html("Renderers", content, username))
@app.post("/renderers/detach", response_class=HTMLResponse)
async def detach_renderer(request: Request):
"""Detach from an L1 renderer."""
username = get_user_from_cookie(request)
if not username:
return HTMLResponse('<div class="text-red-400">Not logged in</div>')
form = await request.form()
l1_url = form.get("l1_url", "")
await db.detach_renderer(username, l1_url)
# Return updated row
display_name = l1_url.replace("https://", "").replace("http://", "")
token = request.cookies.get("auth_token", "")
attach_url = f"{l1_url}/auth?auth_token={token}"
row_id = l1_url.replace("://", "-").replace("/", "-").replace(".", "-")
return HTMLResponse(f'''
<div id="renderer-{row_id}" class="flex items-center justify-between p-4 bg-dark-600 rounded-lg">
<div>
<div class="font-medium text-white">{display_name}</div>
<div class="text-sm text-gray-400">{l1_url}</div>
</div>
<div class="flex items-center gap-3">
<span class="px-2 py-1 bg-gray-600 text-gray-300 text-xs font-medium rounded-full">Not attached</span>
<a href="{attach_url}" class="px-3 py-1 bg-green-600 hover:bg-green-700 text-white text-sm rounded transition-colors"
onclick="setTimeout(() => location.reload(), 2000)">
Attach
</a>
</div>
</div>
''')
# ============ Client Download ============
CLIENT_TARBALL = Path(__file__).parent / "artdag-client.tar.gz"
@app.get("/download/client")
async def download_client():
"""Download the Art DAG CLI client."""
if not CLIENT_TARBALL.exists():
raise HTTPException(404, "Client package not found")
return FileResponse(
CLIENT_TARBALL,
media_type="application/gzip",
filename="artdag-client.tar.gz"
)
if __name__ == "__main__":
import uvicorn
uvicorn.run("server:app", host="0.0.0.0", port=8200, workers=4)