Files
activity-pub/server.py
gilesb 4155427f03 feat: multi-actor ActivityPub support
Each registered user now has their own ActivityPub actor:
- Generate RSA keys per user on registration
- Webfinger resolves any registered user (@user@domain)
- Actor endpoints work for any registered user
- Each user has their own outbox (filtered activities)
- Activities signed with the publishing user's keys
- Objects attributed to the asset owner

Removed:
- ARTDAG_USER config (no longer single-actor)
- L1_SERVER config (comes with each request)

Added:
- /ui/users page listing all registered users
- user_exists() helper function

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-07 19:54:11 +00:00

1265 lines
38 KiB
Python

#!/usr/bin/env python3
"""
Art DAG L2 Server - ActivityPub
Manages ownership registry, activities, and federation.
- Registry of owned assets
- ActivityPub actor endpoints
- Sign and publish Create activities
- Federation with other servers
"""
import hashlib
import json
import os
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse
from fastapi import FastAPI, HTTPException, Request, Response, Depends, Cookie
from fastapi.responses import JSONResponse, HTMLResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
import requests
import markdown
from auth import (
UserCreate, UserLogin, Token, User,
create_user, authenticate_user, create_access_token,
verify_token, get_current_user, load_users
)
# Configuration
DOMAIN = os.environ.get("ARTDAG_DOMAIN", "artdag.rose-ash.com")
DATA_DIR = Path(os.environ.get("ARTDAG_DATA", str(Path.home() / ".artdag" / "l2")))
# Note: L1_SERVER is no longer needed - L1 URL comes with each request
# Ensure data directory exists
DATA_DIR.mkdir(parents=True, exist_ok=True)
(DATA_DIR / "assets").mkdir(exist_ok=True)
# Load README
README_PATH = Path(__file__).parent / "README.md"
README_CONTENT = ""
if README_PATH.exists():
README_CONTENT = README_PATH.read_text()
app = FastAPI(
title="Art DAG L2 Server",
description="ActivityPub server for Art DAG ownership and federation",
version="0.1.0"
)
# ============ Data Models ============
class Asset(BaseModel):
"""An owned asset."""
name: str
content_hash: str
asset_type: str # image, video, effect, recipe, infrastructure
tags: list[str] = []
metadata: dict = {}
url: Optional[str] = None
provenance: Optional[dict] = None
created_at: str = ""
class Activity(BaseModel):
"""An ActivityPub activity."""
activity_id: str
activity_type: str # Create, Update, Delete, Announce
actor_id: str
object_data: dict
published: str
signature: Optional[dict] = None
class RegisterRequest(BaseModel):
"""Request to register an asset."""
name: str
content_hash: str
asset_type: str
tags: list[str] = []
metadata: dict = {}
url: Optional[str] = None
provenance: Optional[dict] = None
class RecordRunRequest(BaseModel):
"""Request to record an L1 run."""
run_id: str
output_name: str
l1_server: str # URL of the L1 server that has this run
class PublishCacheRequest(BaseModel):
"""Request to publish a cache item from L1."""
content_hash: str
asset_name: str
asset_type: str = "image"
origin: dict # {type: "self"|"external", url?: str, note?: str}
description: Optional[str] = None
tags: list[str] = []
metadata: dict = {}
class UpdateAssetRequest(BaseModel):
"""Request to update an existing asset."""
description: Optional[str] = None
tags: Optional[list[str]] = None
metadata: Optional[dict] = None
origin: Optional[dict] = None
# ============ Storage ============
def load_registry() -> dict:
"""Load registry from disk."""
path = DATA_DIR / "registry.json"
if path.exists():
with open(path) as f:
return json.load(f)
return {"version": "1.0", "assets": {}}
def save_registry(registry: dict):
"""Save registry to disk."""
path = DATA_DIR / "registry.json"
with open(path, "w") as f:
json.dump(registry, f, indent=2)
def load_activities() -> list:
"""Load activities from disk."""
path = DATA_DIR / "activities.json"
if path.exists():
with open(path) as f:
data = json.load(f)
return data.get("activities", [])
return []
def save_activities(activities: list):
"""Save activities to disk."""
path = DATA_DIR / "activities.json"
with open(path, "w") as f:
json.dump({"version": "1.0", "activities": activities}, f, indent=2)
def load_actor(username: str) -> dict:
"""Load actor data for a specific user with public key if available."""
actor = {
"id": f"https://{DOMAIN}/users/{username}",
"type": "Person",
"preferredUsername": username,
"name": username,
"inbox": f"https://{DOMAIN}/users/{username}/inbox",
"outbox": f"https://{DOMAIN}/users/{username}/outbox",
"followers": f"https://{DOMAIN}/users/{username}/followers",
"following": f"https://{DOMAIN}/users/{username}/following",
}
# Add public key if available
from keys import has_keys, load_public_key_pem
if has_keys(DATA_DIR, username):
actor["publicKey"] = {
"id": f"https://{DOMAIN}/users/{username}#main-key",
"owner": f"https://{DOMAIN}/users/{username}",
"publicKeyPem": load_public_key_pem(DATA_DIR, username)
}
return actor
def user_exists(username: str) -> bool:
"""Check if a user exists."""
users = load_users(DATA_DIR)
return username in users
def load_followers() -> list:
"""Load followers list."""
path = DATA_DIR / "followers.json"
if path.exists():
with open(path) as f:
return json.load(f)
return []
def save_followers(followers: list):
"""Save followers list."""
path = DATA_DIR / "followers.json"
with open(path, "w") as f:
json.dump(followers, f, indent=2)
# ============ Signing ============
from keys import has_keys, load_public_key_pem, create_signature
def sign_activity(activity: dict, username: str) -> dict:
"""Sign an activity with the user's RSA private key."""
if not has_keys(DATA_DIR, username):
# No keys - use placeholder (for testing)
activity["signature"] = {
"type": "RsaSignature2017",
"creator": f"https://{DOMAIN}/users/{username}#main-key",
"created": datetime.now(timezone.utc).isoformat(),
"signatureValue": "NO_KEYS_CONFIGURED"
}
else:
activity["signature"] = create_signature(DATA_DIR, username, DOMAIN, activity)
return activity
# ============ HTML Templates ============
def base_html(title: str, content: str, username: str = None) -> str:
"""Base HTML template."""
user_section = f'''
<div class="user-info">
Logged in as <strong>{username}</strong>
<button hx-post="/ui/logout" hx-swap="innerHTML" hx-target="body">Logout</button>
</div>
''' if username else '''
<div class="auth-links">
<a href="/ui/login">Login</a> | <a href="/ui/register">Register</a>
</div>
'''
return f'''<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{title} - Art DAG L2</title>
<script src="https://unpkg.com/htmx.org@1.9.10"></script>
<style>
* {{ box-sizing: border-box; }}
body {{
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif;
max-width: 900px;
margin: 0 auto;
padding: 20px;
background: #f5f5f5;
color: #333;
}}
header {{
display: flex;
justify-content: space-between;
align-items: center;
padding: 10px 0;
border-bottom: 2px solid #333;
margin-bottom: 20px;
flex-wrap: wrap;
gap: 10px;
}}
header h1 {{
margin: 0;
font-size: 1.5rem;
}}
header h1 a {{
color: inherit;
text-decoration: none;
}}
.user-info, .auth-links {{
font-size: 0.9rem;
}}
.auth-links a {{
color: #0066cc;
}}
.user-info button {{
margin-left: 10px;
padding: 5px 10px;
cursor: pointer;
}}
nav {{
margin-bottom: 20px;
padding: 10px 0;
border-bottom: 1px solid #ddd;
}}
nav a {{
margin-right: 15px;
color: #0066cc;
text-decoration: none;
}}
nav a:hover {{
text-decoration: underline;
}}
main {{
background: white;
padding: 20px;
border-radius: 5px;
box-shadow: 0 1px 3px rgba(0,0,0,0.1);
}}
.form-group {{
margin-bottom: 15px;
}}
.form-group label {{
display: block;
margin-bottom: 5px;
font-weight: bold;
}}
.form-group input {{
width: 100%;
max-width: 300px;
padding: 8px;
border: 1px solid #ccc;
border-radius: 4px;
font-size: 1rem;
}}
button[type="submit"], .btn {{
background: #0066cc;
color: white;
padding: 10px 20px;
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 1rem;
}}
button[type="submit"]:hover, .btn:hover {{
background: #0055aa;
}}
.error {{
color: #cc0000;
padding: 10px;
background: #ffeeee;
border-radius: 4px;
margin-bottom: 15px;
}}
.success {{
color: #006600;
padding: 10px;
background: #eeffee;
border-radius: 4px;
margin-bottom: 15px;
}}
/* README styling */
.readme h1 {{ font-size: 1.8rem; margin-top: 0; }}
.readme h2 {{ font-size: 1.4rem; margin-top: 1.5rem; border-bottom: 1px solid #eee; padding-bottom: 5px; }}
.readme h3 {{ font-size: 1.1rem; }}
.readme pre {{
background: #f4f4f4;
padding: 10px;
border-radius: 4px;
overflow-x: auto;
}}
.readme code {{
background: #f4f4f4;
padding: 2px 5px;
border-radius: 3px;
font-family: monospace;
}}
.readme pre code {{
background: none;
padding: 0;
}}
.readme table {{
border-collapse: collapse;
width: 100%;
margin: 1rem 0;
}}
.readme th, .readme td {{
border: 1px solid #ddd;
padding: 8px;
text-align: left;
}}
.readme th {{
background: #f4f4f4;
}}
.stats {{
display: grid;
grid-template-columns: repeat(auto-fit, minmax(150px, 1fr));
gap: 15px;
margin-bottom: 20px;
}}
.stat-box {{
background: #f9f9f9;
padding: 15px;
border-radius: 5px;
text-align: center;
}}
.stat-box .number {{
font-size: 2rem;
font-weight: bold;
color: #0066cc;
}}
.stat-box .label {{
color: #666;
font-size: 0.9rem;
}}
@media (max-width: 600px) {{
body {{ padding: 10px; }}
header {{ flex-direction: column; align-items: flex-start; }}
}}
</style>
</head>
<body>
<header>
<h1><a href="/ui">Art DAG L2</a></h1>
{user_section}
</header>
<nav>
<a href="/ui">Home</a>
<a href="/ui/registry">Registry</a>
<a href="/ui/activities">Activities</a>
<a href="/ui/users">Users</a>
</nav>
<main>
{content}
</main>
</body>
</html>'''
def get_user_from_cookie(request: Request) -> Optional[str]:
"""Get username from auth cookie."""
token = request.cookies.get("auth_token")
if token:
return verify_token(token)
return None
# ============ UI Endpoints ============
@app.get("/ui", response_class=HTMLResponse)
async def ui_home(request: Request):
"""Home page with README and stats."""
username = get_user_from_cookie(request)
registry = load_registry()
activities = load_activities()
users = load_users(DATA_DIR)
readme_html = markdown.markdown(README_CONTENT, extensions=['tables', 'fenced_code'])
content = f'''
<div class="stats">
<div class="stat-box">
<div class="number">{len(registry.get("assets", {}))}</div>
<div class="label">Assets</div>
</div>
<div class="stat-box">
<div class="number">{len(activities)}</div>
<div class="label">Activities</div>
</div>
<div class="stat-box">
<div class="number">{len(users)}</div>
<div class="label">Users</div>
</div>
</div>
<div class="readme">
{readme_html}
</div>
'''
return HTMLResponse(base_html("Home", content, username))
@app.get("/ui/login", response_class=HTMLResponse)
async def ui_login_page(request: Request):
"""Login page."""
username = get_user_from_cookie(request)
if username:
return HTMLResponse(base_html("Already Logged In", f'''
<div class="success">You are already logged in as <strong>{username}</strong></div>
<p><a href="/ui">Go to home page</a></p>
''', username))
content = '''
<h2>Login</h2>
<div id="login-result"></div>
<form hx-post="/ui/login" hx-target="#login-result" hx-swap="innerHTML">
<div class="form-group">
<label for="username">Username</label>
<input type="text" id="username" name="username" required>
</div>
<div class="form-group">
<label for="password">Password</label>
<input type="password" id="password" name="password" required>
</div>
<button type="submit">Login</button>
</form>
<p style="margin-top: 20px;">Don't have an account? <a href="/ui/register">Register</a></p>
'''
return HTMLResponse(base_html("Login", content))
@app.post("/ui/login", response_class=HTMLResponse)
async def ui_login_submit(request: Request):
"""Handle login form submission."""
form = await request.form()
username = form.get("username", "").strip()
password = form.get("password", "")
if not username or not password:
return HTMLResponse('<div class="error">Username and password are required</div>')
user = authenticate_user(DATA_DIR, username, password)
if not user:
return HTMLResponse('<div class="error">Invalid username or password</div>')
token = create_access_token(user.username)
response = HTMLResponse(f'''
<div class="success">Login successful! Redirecting...</div>
<script>window.location.href = "/ui";</script>
''')
response.set_cookie(
key="auth_token",
value=token.access_token,
httponly=True,
max_age=60 * 60 * 24 * 30, # 30 days
samesite="lax"
)
return response
@app.get("/ui/register", response_class=HTMLResponse)
async def ui_register_page(request: Request):
"""Register page."""
username = get_user_from_cookie(request)
if username:
return HTMLResponse(base_html("Already Logged In", f'''
<div class="success">You are already logged in as <strong>{username}</strong></div>
<p><a href="/ui">Go to home page</a></p>
''', username))
content = '''
<h2>Register</h2>
<div id="register-result"></div>
<form hx-post="/ui/register" hx-target="#register-result" hx-swap="innerHTML">
<div class="form-group">
<label for="username">Username</label>
<input type="text" id="username" name="username" required pattern="[a-zA-Z0-9_-]+" title="Letters, numbers, underscore, dash only">
</div>
<div class="form-group">
<label for="email">Email (optional)</label>
<input type="email" id="email" name="email">
</div>
<div class="form-group">
<label for="password">Password</label>
<input type="password" id="password" name="password" required minlength="6">
</div>
<div class="form-group">
<label for="password2">Confirm Password</label>
<input type="password" id="password2" name="password2" required minlength="6">
</div>
<button type="submit">Register</button>
</form>
<p style="margin-top: 20px;">Already have an account? <a href="/ui/login">Login</a></p>
'''
return HTMLResponse(base_html("Register", content))
@app.post("/ui/register", response_class=HTMLResponse)
async def ui_register_submit(request: Request):
"""Handle register form submission."""
form = await request.form()
username = form.get("username", "").strip()
email = form.get("email", "").strip() or None
password = form.get("password", "")
password2 = form.get("password2", "")
if not username or not password:
return HTMLResponse('<div class="error">Username and password are required</div>')
if password != password2:
return HTMLResponse('<div class="error">Passwords do not match</div>')
if len(password) < 6:
return HTMLResponse('<div class="error">Password must be at least 6 characters</div>')
try:
user = create_user(DATA_DIR, username, password, email)
except ValueError as e:
return HTMLResponse(f'<div class="error">{str(e)}</div>')
token = create_access_token(user.username)
response = HTMLResponse(f'''
<div class="success">Registration successful! Redirecting...</div>
<script>window.location.href = "/ui";</script>
''')
response.set_cookie(
key="auth_token",
value=token.access_token,
httponly=True,
max_age=60 * 60 * 24 * 30, # 30 days
samesite="lax"
)
return response
@app.post("/ui/logout", response_class=HTMLResponse)
async def ui_logout():
"""Handle logout."""
response = HTMLResponse('''
<script>window.location.href = "/ui";</script>
''')
response.delete_cookie("auth_token")
return response
@app.get("/ui/registry", response_class=HTMLResponse)
async def ui_registry_page(request: Request):
"""Registry page showing all assets."""
username = get_user_from_cookie(request)
registry = load_registry()
assets = registry.get("assets", {})
if not assets:
content = '<h2>Registry</h2><p>No assets registered yet.</p>'
else:
rows = ""
for name, asset in sorted(assets.items(), key=lambda x: x[1].get("created_at", ""), reverse=True):
hash_short = asset.get("content_hash", "")[:16] + "..."
rows += f'''
<tr>
<td><strong>{name}</strong></td>
<td>{asset.get("asset_type", "")}</td>
<td><code>{hash_short}</code></td>
<td>{", ".join(asset.get("tags", []))}</td>
</tr>
'''
content = f'''
<h2>Registry ({len(assets)} assets)</h2>
<table>
<thead>
<tr>
<th>Name</th>
<th>Type</th>
<th>Content Hash</th>
<th>Tags</th>
</tr>
</thead>
<tbody>
{rows}
</tbody>
</table>
'''
return HTMLResponse(base_html("Registry", content, username))
@app.get("/ui/activities", response_class=HTMLResponse)
async def ui_activities_page(request: Request):
"""Activities page showing all signed activities."""
username = get_user_from_cookie(request)
activities = load_activities()
if not activities:
content = '<h2>Activities</h2><p>No activities yet.</p>'
else:
rows = ""
for activity in reversed(activities):
obj = activity.get("object_data", {})
rows += f'''
<tr>
<td>{activity.get("activity_type", "")}</td>
<td>{obj.get("name", "")}</td>
<td><code>{obj.get("contentHash", {}).get("value", "")[:16]}...</code></td>
<td>{activity.get("published", "")[:10]}</td>
</tr>
'''
content = f'''
<h2>Activities ({len(activities)} total)</h2>
<table>
<thead>
<tr>
<th>Type</th>
<th>Object</th>
<th>Content Hash</th>
<th>Published</th>
</tr>
</thead>
<tbody>
{rows}
</tbody>
</table>
'''
return HTMLResponse(base_html("Activities", content, username))
@app.get("/ui/users", response_class=HTMLResponse)
async def ui_users_page(request: Request):
"""Users page showing all registered users."""
current_user = get_user_from_cookie(request)
users = load_users(DATA_DIR)
if not users:
content = '<h2>Users</h2><p>No users registered yet.</p>'
else:
rows = ""
for username, user_data in sorted(users.items()):
actor_url = f"https://{DOMAIN}/users/{username}"
webfinger = f"@{username}@{DOMAIN}"
rows += f'''
<tr>
<td><a href="{actor_url}" target="_blank"><strong>{username}</strong></a></td>
<td><code>{webfinger}</code></td>
<td>{user_data.get("created_at", "")[:10]}</td>
</tr>
'''
content = f'''
<h2>Users ({len(users)} registered)</h2>
<p>Each user has their own ActivityPub actor that can be followed from Mastodon and other federated platforms.</p>
<table>
<thead>
<tr>
<th>Username</th>
<th>ActivityPub Handle</th>
<th>Registered</th>
</tr>
</thead>
<tbody>
{rows}
</tbody>
</table>
'''
return HTMLResponse(base_html("Users", content, current_user))
# ============ API Endpoints ============
@app.get("/")
async def root(request: Request):
"""Server info or redirect to UI."""
# If browser, redirect to UI
accept = request.headers.get("accept", "")
if "text/html" in accept and "application/json" not in accept:
return HTMLResponse(status_code=302, headers={"Location": "/ui"})
registry = load_registry()
activities = load_activities()
users = load_users(DATA_DIR)
return {
"name": "Art DAG L2 Server",
"version": "0.1.0",
"domain": DOMAIN,
"assets_count": len(registry.get("assets", {})),
"activities_count": len(activities),
"users_count": len(users)
}
# ============ Auth Endpoints ============
security = HTTPBearer(auto_error=False)
async def get_optional_user(
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> Optional[User]:
"""Get current user if authenticated, None otherwise."""
if not credentials:
return None
return get_current_user(DATA_DIR, credentials.credentials)
async def get_required_user(
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> User:
"""Get current user, raise 401 if not authenticated."""
if not credentials:
raise HTTPException(401, "Not authenticated")
user = get_current_user(DATA_DIR, credentials.credentials)
if not user:
raise HTTPException(401, "Invalid token")
return user
@app.post("/auth/register", response_model=Token)
async def register(req: UserCreate):
"""Register a new user."""
try:
user = create_user(DATA_DIR, req.username, req.password, req.email)
except ValueError as e:
raise HTTPException(400, str(e))
return create_access_token(user.username)
@app.post("/auth/login", response_model=Token)
async def login(req: UserLogin):
"""Login and get access token."""
user = authenticate_user(DATA_DIR, req.username, req.password)
if not user:
raise HTTPException(401, "Invalid username or password")
return create_access_token(user.username)
@app.get("/auth/me")
async def get_me(user: User = Depends(get_required_user)):
"""Get current user info."""
return {
"username": user.username,
"email": user.email,
"created_at": user.created_at
}
@app.post("/auth/verify")
async def verify_auth(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""Verify a token and return username. Used by L1 server."""
if not credentials:
raise HTTPException(401, "No token provided")
username = verify_token(credentials.credentials)
if not username:
raise HTTPException(401, "Invalid token")
return {"username": username, "valid": True}
@app.get("/.well-known/webfinger")
async def webfinger(resource: str):
"""WebFinger endpoint for actor discovery."""
# Parse acct:username@domain
if not resource.startswith("acct:"):
raise HTTPException(400, "Resource must be acct: URI")
acct = resource[5:] # Remove "acct:"
if "@" not in acct:
raise HTTPException(400, "Invalid acct format")
username, domain = acct.split("@", 1)
if domain != DOMAIN:
raise HTTPException(404, f"Unknown domain: {domain}")
if not user_exists(username):
raise HTTPException(404, f"Unknown user: {username}")
return JSONResponse(
content={
"subject": resource,
"links": [
{
"rel": "self",
"type": "application/activity+json",
"href": f"https://{DOMAIN}/users/{username}"
}
]
},
media_type="application/jrd+json"
)
@app.get("/users/{username}")
async def get_actor(username: str, request: Request):
"""Get actor profile for any registered user."""
if not user_exists(username):
raise HTTPException(404, f"Unknown user: {username}")
actor = load_actor(username)
# Add ActivityPub context
actor["@context"] = [
"https://www.w3.org/ns/activitystreams",
"https://w3id.org/security/v1"
]
return JSONResponse(
content=actor,
media_type="application/activity+json"
)
@app.get("/users/{username}/outbox")
async def get_outbox(username: str, page: bool = False):
"""Get actor's outbox (activities they created)."""
if not user_exists(username):
raise HTTPException(404, f"Unknown user: {username}")
# Filter activities by this user's actor_id
all_activities = load_activities()
actor_id = f"https://{DOMAIN}/users/{username}"
user_activities = [a for a in all_activities if a.get("actor_id") == actor_id]
if not page:
return JSONResponse(
content={
"@context": "https://www.w3.org/ns/activitystreams",
"id": f"https://{DOMAIN}/users/{username}/outbox",
"type": "OrderedCollection",
"totalItems": len(user_activities),
"first": f"https://{DOMAIN}/users/{username}/outbox?page=true"
},
media_type="application/activity+json"
)
# Return activities page
return JSONResponse(
content={
"@context": "https://www.w3.org/ns/activitystreams",
"id": f"https://{DOMAIN}/users/{username}/outbox?page=true",
"type": "OrderedCollectionPage",
"partOf": f"https://{DOMAIN}/users/{username}/outbox",
"orderedItems": user_activities
},
media_type="application/activity+json"
)
@app.post("/users/{username}/inbox")
async def post_inbox(username: str, request: Request):
"""Receive activities from other servers."""
if not user_exists(username):
raise HTTPException(404, f"Unknown user: {username}")
body = await request.json()
activity_type = body.get("type")
# Handle Follow requests
if activity_type == "Follow":
follower = body.get("actor")
# TODO: Per-user followers - for now use global followers
followers = load_followers()
if follower not in followers:
followers.append(follower)
save_followers(followers)
# Send Accept (in production, do this async)
# For now just acknowledge
return {"status": "accepted"}
# Handle other activity types
return {"status": "received"}
@app.get("/users/{username}/followers")
async def get_followers(username: str):
"""Get actor's followers."""
if not user_exists(username):
raise HTTPException(404, f"Unknown user: {username}")
# TODO: Per-user followers - for now use global followers
followers = load_followers()
return JSONResponse(
content={
"@context": "https://www.w3.org/ns/activitystreams",
"id": f"https://{DOMAIN}/users/{username}/followers",
"type": "OrderedCollection",
"totalItems": len(followers),
"orderedItems": followers
},
media_type="application/activity+json"
)
# ============ Registry Endpoints ============
@app.get("/registry")
async def get_registry():
"""Get full registry."""
return load_registry()
@app.get("/registry/{name}")
async def get_asset(name: str):
"""Get a specific asset."""
registry = load_registry()
if name not in registry.get("assets", {}):
raise HTTPException(404, f"Asset not found: {name}")
return registry["assets"][name]
@app.patch("/registry/{name}")
async def update_asset(name: str, req: UpdateAssetRequest, user: User = Depends(get_required_user)):
"""Update an existing asset's metadata. Creates an Update activity."""
registry = load_registry()
if name not in registry.get("assets", {}):
raise HTTPException(404, f"Asset not found: {name}")
asset = registry["assets"][name]
# Check ownership
if asset.get("owner") != user.username:
raise HTTPException(403, f"Not authorized to update asset owned by {asset.get('owner')}")
# Update fields that were provided
if req.description is not None:
asset["description"] = req.description
if req.tags is not None:
asset["tags"] = req.tags
if req.metadata is not None:
asset["metadata"] = {**asset.get("metadata", {}), **req.metadata}
if req.origin is not None:
asset["origin"] = req.origin
asset["updated_at"] = datetime.now(timezone.utc).isoformat()
# Save registry
registry["assets"][name] = asset
save_registry(registry)
# Create Update activity
activity = {
"activity_id": str(uuid.uuid4()),
"activity_type": "Update",
"actor_id": f"https://{DOMAIN}/users/{user.username}",
"object_data": {
"type": asset.get("asset_type", "Object").capitalize(),
"name": name,
"id": f"https://{DOMAIN}/objects/{asset['content_hash']}",
"contentHash": {
"algorithm": "sha3-256",
"value": asset["content_hash"]
},
"attributedTo": f"https://{DOMAIN}/users/{user.username}",
"summary": req.description,
"tag": req.tags or asset.get("tags", [])
},
"published": asset["updated_at"]
}
# Sign activity with the user's keys
activity = sign_activity(activity, user.username)
# Save activity
activities = load_activities()
activities.append(activity)
save_activities(activities)
return {"asset": asset, "activity": activity}
def _register_asset_impl(req: RegisterRequest, owner: str):
"""Internal implementation for registering an asset."""
registry = load_registry()
# Check if name exists
if req.name in registry.get("assets", {}):
raise HTTPException(400, f"Asset already exists: {req.name}")
# Create asset
now = datetime.now(timezone.utc).isoformat()
asset = {
"name": req.name,
"content_hash": req.content_hash,
"asset_type": req.asset_type,
"tags": req.tags,
"metadata": req.metadata,
"url": req.url,
"provenance": req.provenance,
"owner": owner,
"created_at": now
}
# Add to registry
if "assets" not in registry:
registry["assets"] = {}
registry["assets"][req.name] = asset
save_registry(registry)
# Create ownership activity
activity = {
"activity_id": str(uuid.uuid4()),
"activity_type": "Create",
"actor_id": f"https://{DOMAIN}/users/{owner}",
"object_data": {
"type": req.asset_type.capitalize(),
"name": req.name,
"id": f"https://{DOMAIN}/objects/{req.content_hash}",
"contentHash": {
"algorithm": "sha3-256",
"value": req.content_hash
},
"attributedTo": f"https://{DOMAIN}/users/{owner}"
},
"published": now
}
# Sign activity with the owner's keys
activity = sign_activity(activity, owner)
# Save activity
activities = load_activities()
activities.append(activity)
save_activities(activities)
return {"asset": asset, "activity": activity}
@app.post("/registry")
async def register_asset(req: RegisterRequest, user: User = Depends(get_required_user)):
"""Register a new asset and create ownership activity. Requires authentication."""
return _register_asset_impl(req, user.username)
@app.post("/registry/record-run")
async def record_run(req: RecordRunRequest, user: User = Depends(get_required_user)):
"""Record an L1 run and register the output. Requires authentication."""
# Fetch run from the specified L1 server
l1_url = req.l1_server.rstrip('/')
try:
resp = requests.get(f"{l1_url}/runs/{req.run_id}")
resp.raise_for_status()
run = resp.json()
except Exception as e:
raise HTTPException(400, f"Failed to fetch run from L1 ({l1_url}): {e}")
if run.get("status") != "completed":
raise HTTPException(400, f"Run not completed: {run.get('status')}")
output_hash = run.get("output_hash")
if not output_hash:
raise HTTPException(400, "Run has no output hash")
# Build provenance from run
provenance = {
"inputs": [{"content_hash": h} for h in run.get("inputs", [])],
"recipe": run.get("recipe"),
"l1_server": l1_url,
"l1_run_id": req.run_id,
"rendered_at": run.get("completed_at")
}
# Register the output under the authenticated user
return _register_asset_impl(RegisterRequest(
name=req.output_name,
content_hash=output_hash,
asset_type="video", # Could be smarter about this
tags=["rendered", "l1"],
metadata={"l1_server": l1_url, "l1_run_id": req.run_id},
provenance=provenance
), user.username)
@app.post("/registry/publish-cache")
async def publish_cache(req: PublishCacheRequest, user: User = Depends(get_required_user)):
"""
Publish a cache item from L1 with metadata.
Requires origin to be set (self or external URL).
Creates a new asset and Create activity.
"""
# Validate origin
if not req.origin or "type" not in req.origin:
raise HTTPException(400, "Origin is required for publishing (type: 'self' or 'external')")
origin_type = req.origin.get("type")
if origin_type not in ("self", "external"):
raise HTTPException(400, "Origin type must be 'self' or 'external'")
if origin_type == "external" and not req.origin.get("url"):
raise HTTPException(400, "External origin requires a URL")
# Check if asset name already exists
registry = load_registry()
if req.asset_name in registry.get("assets", {}):
raise HTTPException(400, f"Asset name already exists: {req.asset_name}")
# Create asset
now = datetime.now(timezone.utc).isoformat()
asset = {
"name": req.asset_name,
"content_hash": req.content_hash,
"asset_type": req.asset_type,
"tags": req.tags,
"description": req.description,
"origin": req.origin,
"metadata": req.metadata,
"owner": user.username,
"created_at": now
}
# Add to registry
if "assets" not in registry:
registry["assets"] = {}
registry["assets"][req.asset_name] = asset
save_registry(registry)
# Create ownership activity with origin info
object_data = {
"type": req.asset_type.capitalize(),
"name": req.asset_name,
"id": f"https://{DOMAIN}/objects/{req.content_hash}",
"contentHash": {
"algorithm": "sha3-256",
"value": req.content_hash
},
"attributedTo": f"https://{DOMAIN}/users/{user.username}",
"tag": req.tags
}
if req.description:
object_data["summary"] = req.description
# Include origin in ActivityPub object
if origin_type == "self":
object_data["generator"] = {
"type": "Application",
"name": "Art DAG",
"note": "Original content created by the author"
}
else:
object_data["source"] = {
"type": "Link",
"href": req.origin.get("url"),
"name": req.origin.get("note", "External source")
}
activity = {
"activity_id": str(uuid.uuid4()),
"activity_type": "Create",
"actor_id": f"https://{DOMAIN}/users/{user.username}",
"object_data": object_data,
"published": now
}
# Sign activity with the user's keys
activity = sign_activity(activity, user.username)
# Save activity
activities = load_activities()
activities.append(activity)
save_activities(activities)
return {"asset": asset, "activity": activity}
# ============ Activities Endpoints ============
@app.get("/activities")
async def get_activities():
"""Get all activities."""
return {"activities": load_activities()}
@app.get("/objects/{content_hash}")
async def get_object(content_hash: str):
"""Get object by content hash."""
registry = load_registry()
# Find asset by hash
for name, asset in registry.get("assets", {}).items():
if asset.get("content_hash") == content_hash:
owner = asset.get("owner", "unknown")
return JSONResponse(
content={
"@context": "https://www.w3.org/ns/activitystreams",
"id": f"https://{DOMAIN}/objects/{content_hash}",
"type": asset.get("asset_type", "Object").capitalize(),
"name": name,
"contentHash": {
"algorithm": "sha3-256",
"value": content_hash
},
"attributedTo": f"https://{DOMAIN}/users/{owner}",
"published": asset.get("created_at")
},
media_type="application/activity+json"
)
raise HTTPException(404, f"Object not found: {content_hash}")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8200)