#!/usr/bin/env python3 """ Art DAG L2 Server - ActivityPub Manages ownership registry, activities, and federation. - Registry of owned assets - ActivityPub actor endpoints - Sign and publish Create activities - Federation with other servers """ import hashlib import json import os import uuid from datetime import datetime, timezone from pathlib import Path from typing import Optional from urllib.parse import urlparse from fastapi import FastAPI, HTTPException, Request, Response, Depends, Cookie from fastapi.responses import JSONResponse, HTMLResponse from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from pydantic import BaseModel import requests import markdown from auth import ( UserCreate, UserLogin, Token, User, create_user, authenticate_user, create_access_token, verify_token, get_current_user, load_users ) # Configuration DOMAIN = os.environ.get("ARTDAG_DOMAIN", "artdag.rose-ash.com") DATA_DIR = Path(os.environ.get("ARTDAG_DATA", str(Path.home() / ".artdag" / "l2"))) L1_PUBLIC_URL = os.environ.get("L1_PUBLIC_URL", "https://celery-artdag.rose-ash.com") EFFECTS_REPO_URL = os.environ.get("EFFECTS_REPO_URL", "https://git.rose-ash.com/art-dag/effects") # Ensure data directory exists DATA_DIR.mkdir(parents=True, exist_ok=True) (DATA_DIR / "assets").mkdir(exist_ok=True) # Load README README_PATH = Path(__file__).parent / "README.md" README_CONTENT = "" if README_PATH.exists(): README_CONTENT = README_PATH.read_text() app = FastAPI( title="Art DAG L2 Server", description="ActivityPub server for Art DAG ownership and federation", version="0.1.0" ) # ============ Data Models ============ class Asset(BaseModel): """An owned asset.""" name: str content_hash: str asset_type: str # image, video, effect, recipe, infrastructure tags: list[str] = [] metadata: dict = {} url: Optional[str] = None provenance: Optional[dict] = None created_at: str = "" class Activity(BaseModel): """An ActivityPub activity.""" activity_id: str activity_type: str # Create, Update, Delete, Announce actor_id: str object_data: dict published: str signature: Optional[dict] = None class RegisterRequest(BaseModel): """Request to register an asset.""" name: str content_hash: str asset_type: str tags: list[str] = [] metadata: dict = {} url: Optional[str] = None provenance: Optional[dict] = None class RecordRunRequest(BaseModel): """Request to record an L1 run.""" run_id: str output_name: str l1_server: str # URL of the L1 server that has this run class PublishCacheRequest(BaseModel): """Request to publish a cache item from L1.""" content_hash: str asset_name: str asset_type: str = "image" origin: dict # {type: "self"|"external", url?: str, note?: str} description: Optional[str] = None tags: list[str] = [] metadata: dict = {} class UpdateAssetRequest(BaseModel): """Request to update an existing asset.""" description: Optional[str] = None tags: Optional[list[str]] = None metadata: Optional[dict] = None origin: Optional[dict] = None # ============ Storage ============ def load_registry() -> dict: """Load registry from disk.""" path = DATA_DIR / "registry.json" if path.exists(): with open(path) as f: return json.load(f) return {"version": "1.0", "assets": {}} def save_registry(registry: dict): """Save registry to disk.""" path = DATA_DIR / "registry.json" with open(path, "w") as f: json.dump(registry, f, indent=2) def load_activities() -> list: """Load activities from disk.""" path = DATA_DIR / "activities.json" if path.exists(): with open(path) as f: data = json.load(f) return data.get("activities", []) return [] def save_activities(activities: list): """Save activities to disk.""" path = DATA_DIR / "activities.json" with open(path, "w") as f: json.dump({"version": "1.0", "activities": activities}, f, indent=2) def load_actor(username: str) -> dict: """Load actor data for a specific user with public key if available.""" actor = { "id": f"https://{DOMAIN}/users/{username}", "type": "Person", "preferredUsername": username, "name": username, "inbox": f"https://{DOMAIN}/users/{username}/inbox", "outbox": f"https://{DOMAIN}/users/{username}/outbox", "followers": f"https://{DOMAIN}/users/{username}/followers", "following": f"https://{DOMAIN}/users/{username}/following", } # Add public key if available from keys import has_keys, load_public_key_pem if has_keys(DATA_DIR, username): actor["publicKey"] = { "id": f"https://{DOMAIN}/users/{username}#main-key", "owner": f"https://{DOMAIN}/users/{username}", "publicKeyPem": load_public_key_pem(DATA_DIR, username) } return actor def user_exists(username: str) -> bool: """Check if a user exists.""" users = load_users(DATA_DIR) return username in users def load_followers() -> list: """Load followers list.""" path = DATA_DIR / "followers.json" if path.exists(): with open(path) as f: return json.load(f) return [] def save_followers(followers: list): """Save followers list.""" path = DATA_DIR / "followers.json" with open(path, "w") as f: json.dump(followers, f, indent=2) # ============ Signing ============ from keys import has_keys, load_public_key_pem, create_signature def sign_activity(activity: dict, username: str) -> dict: """Sign an activity with the user's RSA private key.""" if not has_keys(DATA_DIR, username): # No keys - use placeholder (for testing) activity["signature"] = { "type": "RsaSignature2017", "creator": f"https://{DOMAIN}/users/{username}#main-key", "created": datetime.now(timezone.utc).isoformat(), "signatureValue": "NO_KEYS_CONFIGURED" } else: activity["signature"] = create_signature(DATA_DIR, username, DOMAIN, activity) return activity # ============ HTML Templates ============ # Tailwind CSS config for L2 - dark theme to match L1 TAILWIND_CONFIG = ''' ''' def base_html(title: str, content: str, username: str = None) -> str: """Base HTML template with Tailwind CSS dark theme.""" user_section = f'''
Logged in as {username}
''' if username else '''
Login | Register
''' return f''' {title} - Art DAG L2 {TAILWIND_CONFIG}

Art DAG L2

{user_section}
{content}
''' def get_user_from_cookie(request: Request) -> Optional[str]: """Get username from auth cookie.""" token = request.cookies.get("auth_token") if token: return verify_token(token) return None # ============ UI Endpoints ============ @app.get("/ui", response_class=HTMLResponse) async def ui_home(request: Request): """Home page with README and stats.""" username = get_user_from_cookie(request) registry = load_registry() activities = load_activities() users = load_users(DATA_DIR) readme_html = markdown.markdown(README_CONTENT, extensions=['tables', 'fenced_code']) content = f'''
{len(registry.get("assets", {}))}
Assets
{len(activities)}
Activities
{len(users)}
Users
{readme_html}
''' return HTMLResponse(base_html("Home", content, username)) @app.get("/ui/login", response_class=HTMLResponse) async def ui_login_page(request: Request): """Login page.""" username = get_user_from_cookie(request) if username: return HTMLResponse(base_html("Already Logged In", f'''
You are already logged in as {username}

Go to home page

''', username)) content = '''

Login

Don't have an account? Register

''' return HTMLResponse(base_html("Login", content)) @app.post("/ui/login", response_class=HTMLResponse) async def ui_login_submit(request: Request): """Handle login form submission.""" form = await request.form() username = form.get("username", "").strip() password = form.get("password", "") if not username or not password: return HTMLResponse('
Username and password are required
') user = authenticate_user(DATA_DIR, username, password) if not user: return HTMLResponse('
Invalid username or password
') token = create_access_token(user.username) response = HTMLResponse(f'''
Login successful! Redirecting...
''') response.set_cookie( key="auth_token", value=token.access_token, httponly=True, max_age=60 * 60 * 24 * 30, # 30 days samesite="lax" ) return response @app.get("/ui/register", response_class=HTMLResponse) async def ui_register_page(request: Request): """Register page.""" username = get_user_from_cookie(request) if username: return HTMLResponse(base_html("Already Logged In", f'''
You are already logged in as {username}

Go to home page

''', username)) content = '''

Register

Already have an account? Login

''' return HTMLResponse(base_html("Register", content)) @app.post("/ui/register", response_class=HTMLResponse) async def ui_register_submit(request: Request): """Handle register form submission.""" form = await request.form() username = form.get("username", "").strip() email = form.get("email", "").strip() or None password = form.get("password", "") password2 = form.get("password2", "") if not username or not password: return HTMLResponse('
Username and password are required
') if password != password2: return HTMLResponse('
Passwords do not match
') if len(password) < 6: return HTMLResponse('
Password must be at least 6 characters
') try: user = create_user(DATA_DIR, username, password, email) except ValueError as e: return HTMLResponse(f'
{str(e)}
') token = create_access_token(user.username) response = HTMLResponse(f'''
Registration successful! Redirecting...
''') response.set_cookie( key="auth_token", value=token.access_token, httponly=True, max_age=60 * 60 * 24 * 30, # 30 days samesite="lax" ) return response @app.post("/ui/logout", response_class=HTMLResponse) async def ui_logout(): """Handle logout.""" response = HTMLResponse(''' ''') response.delete_cookie("auth_token") return response @app.get("/ui/registry", response_class=HTMLResponse) async def ui_registry_page(request: Request): """Registry page showing all assets.""" username = get_user_from_cookie(request) registry = load_registry() assets = registry.get("assets", {}) if not assets: content = '''

Registry

No assets registered yet.

''' else: rows = "" for name, asset in sorted(assets.items(), key=lambda x: x[1].get("created_at", ""), reverse=True): hash_short = asset.get("content_hash", "")[:16] + "..." owner = asset.get("owner", "unknown") asset_type = asset.get("asset_type", "") type_color = "bg-blue-600" if asset_type == "image" else "bg-purple-600" if asset_type == "video" else "bg-gray-600" rows += f''' {name} {asset_type} {owner} {hash_short} {", ".join(asset.get("tags", []))} ''' content = f'''

Registry ({len(assets)} assets)

{rows}
Name Type Owner Content Hash Tags
''' return HTMLResponse(base_html("Registry", content, username)) @app.get("/ui/activities", response_class=HTMLResponse) async def ui_activities_page(request: Request): """Activities page showing all signed activities.""" username = get_user_from_cookie(request) activities = load_activities() if not activities: content = '''

Activities

No activities yet.

''' else: rows = "" for i, activity in enumerate(reversed(activities)): # Index from end since we reversed activity_index = len(activities) - 1 - i obj = activity.get("object_data", {}) activity_type = activity.get("activity_type", "") type_color = "bg-green-600" if activity_type == "Create" else "bg-yellow-600" if activity_type == "Update" else "bg-gray-600" actor_id = activity.get("actor_id", "") actor_name = actor_id.split("/")[-1] if actor_id else "unknown" rows += f''' {activity_type} {obj.get("name", "Untitled")} {actor_name} {activity.get("published", "")[:10]} ''' content = f'''

Activities ({len(activities)} total)

{rows}
Type Object Actor Published
''' return HTMLResponse(base_html("Activities", content, username)) @app.get("/ui/activity/{activity_index}", response_class=HTMLResponse) async def ui_activity_detail(activity_index: int, request: Request): """Activity detail page with full content display.""" username = get_user_from_cookie(request) activities = load_activities() if activity_index < 0 or activity_index >= len(activities): content = '''

Activity Not Found

This activity does not exist.

← Back to Activities

''' return HTMLResponse(base_html("Activity Not Found", content, username)) activity = activities[activity_index] activity_type = activity.get("activity_type", "") activity_id = activity.get("activity_id", "") actor_id = activity.get("actor_id", "") actor_name = actor_id.split("/")[-1] if actor_id else "unknown" published = activity.get("published", "")[:10] obj = activity.get("object_data", {}) # Object details obj_name = obj.get("name", "Untitled") obj_type = obj.get("type", "") content_hash_obj = obj.get("contentHash", {}) content_hash = content_hash_obj.get("value", "") if isinstance(content_hash_obj, dict) else "" media_type = obj.get("mediaType", "") description = obj.get("summary", "") or obj.get("content", "") # Provenance from object provenance = obj.get("provenance", {}) origin = obj.get("origin", {}) # Type colors type_color = "bg-green-600" if activity_type == "Create" else "bg-yellow-600" if activity_type == "Update" else "bg-gray-600" obj_type_color = "bg-blue-600" if "Image" in obj_type else "bg-purple-600" if "Video" in obj_type else "bg-gray-600" # Determine L1 server and asset type l1_server = provenance.get("l1_server", L1_PUBLIC_URL).rstrip("/") if provenance else L1_PUBLIC_URL.rstrip("/") is_video = "Video" in obj_type or "video" in media_type # Content display if is_video: content_html = f'''
Download Original
''' elif "Image" in obj_type or "image" in media_type: content_html = f'''
{obj_name}
Download
''' else: content_html = f'''

Content type: {media_type or obj_type}

Download
''' # Origin display origin_html = 'Not specified' if origin: origin_type = origin.get("type", "") if origin_type == "self": origin_html = 'Original content by author' elif origin_type == "external": origin_url = origin.get("url", "") origin_note = origin.get("note", "") origin_html = f'{origin_url}' if origin_note: origin_html += f'

{origin_note}

' # Provenance section provenance_html = "" if provenance and provenance.get("recipe"): recipe = provenance.get("recipe", "") inputs = provenance.get("inputs", []) l1_run_id = provenance.get("l1_run_id", "") rendered_at = provenance.get("rendered_at", "")[:10] if provenance.get("rendered_at") else "" effects_commit = provenance.get("effects_commit", "") effect_url = provenance.get("effect_url") infrastructure = provenance.get("infrastructure", {}) if not effect_url: if effects_commit and effects_commit != "unknown": effect_url = f"{EFFECTS_REPO_URL}/src/commit/{effects_commit}/{recipe}" else: effect_url = f"{EFFECTS_REPO_URL}/src/branch/main/{recipe}" # Build inputs display - show actual content as thumbnails inputs_html = "" for inp in inputs: inp_hash = inp.get("content_hash", "") if isinstance(inp, dict) else inp if inp_hash: inputs_html += f'''
Input
{inp_hash[:16]}... view
''' # Infrastructure display infra_html = "" if infrastructure: software = infrastructure.get("software", {}) hardware = infrastructure.get("hardware", {}) if software or hardware: infra_parts = [] if software: infra_parts.append(f"Software: {software.get('name', 'unknown')}") if hardware: infra_parts.append(f"Hardware: {hardware.get('name', 'unknown')}") infra_html = f'

{" | ".join(infra_parts)}

' provenance_html = f'''

Provenance

This content was created by applying an effect to input content.

Effect

{recipe} {f'
Commit: {effects_commit[:12]}...
' if effects_commit else ''}

Input(s)

{inputs_html if inputs_html else 'No inputs recorded'}

Rendered

{rendered_at if rendered_at else 'Unknown'} {infra_html}
''' content = f'''

← Back to Activities

{activity_type}

{obj_name}

{obj_type}
{content_html}

Actor

{actor_name}

Description

{description if description else 'No description'}

Origin

{origin_html}

Content Hash

{content_hash}

Published

{published}

Activity ID

{activity_id}
{provenance_html}

ActivityPub

Object URL: https://{DOMAIN}/objects/{content_hash}

Actor: {actor_id}

''' return HTMLResponse(base_html(f"Activity: {obj_name}", content, username)) @app.get("/ui/users", response_class=HTMLResponse) async def ui_users_page(request: Request): """Users page showing all registered users.""" current_user = get_user_from_cookie(request) users = load_users(DATA_DIR) if not users: content = '''

Users

No users registered yet.

''' else: rows = "" for uname, user_data in sorted(users.items()): webfinger = f"@{uname}@{DOMAIN}" rows += f''' {uname} {webfinger} {user_data.get("created_at", "")[:10]} ''' content = f'''

Users ({len(users)} registered)

Each user has their own ActivityPub actor that can be followed from Mastodon and other federated platforms.

{rows}
Username ActivityPub Handle Registered
''' return HTMLResponse(base_html("Users", content, current_user)) @app.get("/ui/asset/{name}", response_class=HTMLResponse) async def ui_asset_detail(name: str, request: Request): """Asset detail page with content preview and provenance.""" username = get_user_from_cookie(request) registry = load_registry() assets = registry.get("assets", {}) if name not in assets: content = f'''

Asset Not Found

No asset named "{name}" exists.

← Back to Registry

''' return HTMLResponse(base_html("Asset Not Found", content, username)) asset = assets[name] owner = asset.get("owner", "unknown") content_hash = asset.get("content_hash", "") asset_type = asset.get("asset_type", "") tags = asset.get("tags", []) description = asset.get("description", "") origin = asset.get("origin", {}) provenance = asset.get("provenance", {}) metadata = asset.get("metadata", {}) created_at = asset.get("created_at", "")[:10] type_color = "bg-blue-600" if asset_type == "image" else "bg-purple-600" if asset_type == "video" else "bg-gray-600" # Determine L1 server URL for content l1_server = provenance.get("l1_server", L1_PUBLIC_URL).rstrip("/") # Content display - image or video from L1 if asset_type == "video": # Use iOS-compatible MP4 endpoint content_html = f'''
Download Original
''' elif asset_type == "image": content_html = f'''
{name}
Download
''' else: content_html = f'''

Content type: {asset_type}

Download
''' # Origin display origin_html = 'Not specified' if origin: origin_type = origin.get("type", "unknown") if origin_type == "self": origin_html = 'Original content by author' elif origin_type == "external": origin_url = origin.get("url", "") origin_note = origin.get("note", "") origin_html = f'{origin_url}' if origin_note: origin_html += f'

{origin_note}

' # Tags display tags_html = 'No tags' if tags: tags_html = " ".join([f'{t}' for t in tags]) # Provenance section - for rendered outputs provenance_html = "" if provenance: recipe = provenance.get("recipe", "") inputs = provenance.get("inputs", []) l1_run_id = provenance.get("l1_run_id", "") rendered_at = provenance.get("rendered_at", "")[:10] if provenance.get("rendered_at") else "" effects_commit = provenance.get("effects_commit", "") infrastructure = provenance.get("infrastructure", {}) # Use stored effect_url or build fallback effect_url = provenance.get("effect_url") if not effect_url: # Fallback for older records if effects_commit and effects_commit != "unknown": effect_url = f"{EFFECTS_REPO_URL}/src/commit/{effects_commit}/{recipe}" else: effect_url = f"{EFFECTS_REPO_URL}/src/branch/main/{recipe}" # Build inputs display - show actual content as thumbnails inputs_html = "" for inp in inputs: inp_hash = inp.get("content_hash", "") if isinstance(inp, dict) else inp if inp_hash: inputs_html += f'''
Input
{inp_hash[:16]}... view
''' # Infrastructure display infra_html = "" if infrastructure: software = infrastructure.get("software", {}) hardware = infrastructure.get("hardware", {}) if software or hardware: infra_html = f'''

Infrastructure

{f"Software: {software.get('name', 'unknown')}" if software else ""} {f" ({software.get('content_hash', '')[:16]}...)" if software.get('content_hash') else ""} {" | " if software and hardware else ""} {f"Hardware: {hardware.get('name', 'unknown')}" if hardware else ""}
''' provenance_html = f'''

Provenance

This asset was created by applying an effect to input content.

Effect

{recipe} {f'
Commit: {effects_commit[:12]}...
' if effects_commit else ''}

Input(s)

{inputs_html if inputs_html else 'No inputs recorded'}

Rendered

{rendered_at if rendered_at else 'Unknown'}
{infra_html}
''' content = f'''

← Back to Registry

{name}

{asset_type}
{content_html}

Owner

{owner}

Description

{description if description else 'No description'}

Origin

{origin_html}

Content Hash

{content_hash}

Created

{created_at}

Tags

{tags_html}
{provenance_html}

ActivityPub

Object URL: https://{DOMAIN}/objects/{content_hash}

Owner Actor: https://{DOMAIN}/users/{owner}

''' return HTMLResponse(base_html(f"Asset: {name}", content, username)) @app.get("/ui/user/{username}", response_class=HTMLResponse) async def ui_user_detail(username: str, request: Request): """User detail page showing their published assets.""" current_user = get_user_from_cookie(request) if not user_exists(username): content = f'''

User Not Found

No user named "{username}" exists.

← Back to Users

''' return HTMLResponse(base_html("User Not Found", content, current_user)) # Get user's assets registry = load_registry() all_assets = registry.get("assets", {}) user_assets = {name: asset for name, asset in all_assets.items() if asset.get("owner") == username} # Get user's activities all_activities = load_activities() actor_id = f"https://{DOMAIN}/users/{username}" user_activities = [a for a in all_activities if a.get("actor_id") == actor_id] webfinger = f"@{username}@{DOMAIN}" # Assets table if user_assets: rows = "" for name, asset in sorted(user_assets.items(), key=lambda x: x[1].get("created_at", ""), reverse=True): hash_short = asset.get("content_hash", "")[:16] + "..." asset_type = asset.get("asset_type", "") type_color = "bg-blue-600" if asset_type == "image" else "bg-purple-600" if asset_type == "video" else "bg-gray-600" rows += f''' {name} {asset_type} {hash_short} {", ".join(asset.get("tags", []))} ''' assets_html = f'''
{rows}
Name Type Content Hash Tags
''' else: assets_html = '

No published assets yet.

' content = f'''

← Back to Users

{username}

{webfinger}
{len(user_assets)}
Published Assets
{len(user_activities)}
Activities

ActivityPub

Actor URL: https://{DOMAIN}/users/{username}

Outbox: https://{DOMAIN}/users/{username}/outbox

Published Assets ({len(user_assets)})

{assets_html} ''' return HTMLResponse(base_html(f"User: {username}", content, current_user)) # ============ API Endpoints ============ @app.get("/") async def root(request: Request): """Server info or redirect to UI.""" # If browser, redirect to UI accept = request.headers.get("accept", "") if "text/html" in accept and "application/json" not in accept: return HTMLResponse(status_code=302, headers={"Location": "/ui"}) registry = load_registry() activities = load_activities() users = load_users(DATA_DIR) return { "name": "Art DAG L2 Server", "version": "0.1.0", "domain": DOMAIN, "assets_count": len(registry.get("assets", {})), "activities_count": len(activities), "users_count": len(users) } # ============ Auth Endpoints ============ security = HTTPBearer(auto_error=False) async def get_optional_user( credentials: HTTPAuthorizationCredentials = Depends(security) ) -> Optional[User]: """Get current user if authenticated, None otherwise.""" if not credentials: return None return get_current_user(DATA_DIR, credentials.credentials) async def get_required_user( credentials: HTTPAuthorizationCredentials = Depends(security) ) -> User: """Get current user, raise 401 if not authenticated.""" if not credentials: raise HTTPException(401, "Not authenticated") user = get_current_user(DATA_DIR, credentials.credentials) if not user: raise HTTPException(401, "Invalid token") return user @app.post("/auth/register", response_model=Token) async def register(req: UserCreate): """Register a new user.""" try: user = create_user(DATA_DIR, req.username, req.password, req.email) except ValueError as e: raise HTTPException(400, str(e)) return create_access_token(user.username) @app.post("/auth/login", response_model=Token) async def login(req: UserLogin): """Login and get access token.""" user = authenticate_user(DATA_DIR, req.username, req.password) if not user: raise HTTPException(401, "Invalid username or password") return create_access_token(user.username) @app.get("/auth/me") async def get_me(user: User = Depends(get_required_user)): """Get current user info.""" return { "username": user.username, "email": user.email, "created_at": user.created_at } @app.post("/auth/verify") async def verify_auth(credentials: HTTPAuthorizationCredentials = Depends(security)): """Verify a token and return username. Used by L1 server.""" if not credentials: raise HTTPException(401, "No token provided") username = verify_token(credentials.credentials) if not username: raise HTTPException(401, "Invalid token") return {"username": username, "valid": True} @app.get("/.well-known/webfinger") async def webfinger(resource: str): """WebFinger endpoint for actor discovery.""" # Parse acct:username@domain if not resource.startswith("acct:"): raise HTTPException(400, "Resource must be acct: URI") acct = resource[5:] # Remove "acct:" if "@" not in acct: raise HTTPException(400, "Invalid acct format") username, domain = acct.split("@", 1) if domain != DOMAIN: raise HTTPException(404, f"Unknown domain: {domain}") if not user_exists(username): raise HTTPException(404, f"Unknown user: {username}") return JSONResponse( content={ "subject": resource, "links": [ { "rel": "self", "type": "application/activity+json", "href": f"https://{DOMAIN}/users/{username}" } ] }, media_type="application/jrd+json" ) @app.get("/users/{username}") async def get_actor(username: str, request: Request): """Get actor profile for any registered user.""" if not user_exists(username): raise HTTPException(404, f"Unknown user: {username}") actor = load_actor(username) # Add ActivityPub context actor["@context"] = [ "https://www.w3.org/ns/activitystreams", "https://w3id.org/security/v1" ] return JSONResponse( content=actor, media_type="application/activity+json" ) @app.get("/users/{username}/outbox") async def get_outbox(username: str, page: bool = False): """Get actor's outbox (activities they created).""" if not user_exists(username): raise HTTPException(404, f"Unknown user: {username}") # Filter activities by this user's actor_id all_activities = load_activities() actor_id = f"https://{DOMAIN}/users/{username}" user_activities = [a for a in all_activities if a.get("actor_id") == actor_id] if not page: return JSONResponse( content={ "@context": "https://www.w3.org/ns/activitystreams", "id": f"https://{DOMAIN}/users/{username}/outbox", "type": "OrderedCollection", "totalItems": len(user_activities), "first": f"https://{DOMAIN}/users/{username}/outbox?page=true" }, media_type="application/activity+json" ) # Return activities page return JSONResponse( content={ "@context": "https://www.w3.org/ns/activitystreams", "id": f"https://{DOMAIN}/users/{username}/outbox?page=true", "type": "OrderedCollectionPage", "partOf": f"https://{DOMAIN}/users/{username}/outbox", "orderedItems": user_activities }, media_type="application/activity+json" ) @app.post("/users/{username}/inbox") async def post_inbox(username: str, request: Request): """Receive activities from other servers.""" if not user_exists(username): raise HTTPException(404, f"Unknown user: {username}") body = await request.json() activity_type = body.get("type") # Handle Follow requests if activity_type == "Follow": follower = body.get("actor") # TODO: Per-user followers - for now use global followers followers = load_followers() if follower not in followers: followers.append(follower) save_followers(followers) # Send Accept (in production, do this async) # For now just acknowledge return {"status": "accepted"} # Handle other activity types return {"status": "received"} @app.get("/users/{username}/followers") async def get_followers(username: str): """Get actor's followers.""" if not user_exists(username): raise HTTPException(404, f"Unknown user: {username}") # TODO: Per-user followers - for now use global followers followers = load_followers() return JSONResponse( content={ "@context": "https://www.w3.org/ns/activitystreams", "id": f"https://{DOMAIN}/users/{username}/followers", "type": "OrderedCollection", "totalItems": len(followers), "orderedItems": followers }, media_type="application/activity+json" ) # ============ Registry Endpoints ============ @app.get("/registry") async def get_registry(): """Get full registry.""" return load_registry() @app.get("/registry/{name}") async def get_asset(name: str): """Get a specific asset.""" registry = load_registry() if name not in registry.get("assets", {}): raise HTTPException(404, f"Asset not found: {name}") return registry["assets"][name] @app.patch("/registry/{name}") async def update_asset(name: str, req: UpdateAssetRequest, user: User = Depends(get_required_user)): """Update an existing asset's metadata. Creates an Update activity.""" registry = load_registry() if name not in registry.get("assets", {}): raise HTTPException(404, f"Asset not found: {name}") asset = registry["assets"][name] # Check ownership if asset.get("owner") != user.username: raise HTTPException(403, f"Not authorized to update asset owned by {asset.get('owner')}") # Update fields that were provided if req.description is not None: asset["description"] = req.description if req.tags is not None: asset["tags"] = req.tags if req.metadata is not None: asset["metadata"] = {**asset.get("metadata", {}), **req.metadata} if req.origin is not None: asset["origin"] = req.origin asset["updated_at"] = datetime.now(timezone.utc).isoformat() # Save registry registry["assets"][name] = asset save_registry(registry) # Create Update activity activity = { "activity_id": str(uuid.uuid4()), "activity_type": "Update", "actor_id": f"https://{DOMAIN}/users/{user.username}", "object_data": { "type": asset.get("asset_type", "Object").capitalize(), "name": name, "id": f"https://{DOMAIN}/objects/{asset['content_hash']}", "contentHash": { "algorithm": "sha3-256", "value": asset["content_hash"] }, "attributedTo": f"https://{DOMAIN}/users/{user.username}", "summary": req.description, "tag": req.tags or asset.get("tags", []) }, "published": asset["updated_at"] } # Sign activity with the user's keys activity = sign_activity(activity, user.username) # Save activity activities = load_activities() activities.append(activity) save_activities(activities) return {"asset": asset, "activity": activity} def _register_asset_impl(req: RegisterRequest, owner: str): """Internal implementation for registering an asset.""" registry = load_registry() # Check if name exists if req.name in registry.get("assets", {}): raise HTTPException(400, f"Asset already exists: {req.name}") # Create asset now = datetime.now(timezone.utc).isoformat() asset = { "name": req.name, "content_hash": req.content_hash, "asset_type": req.asset_type, "tags": req.tags, "metadata": req.metadata, "url": req.url, "provenance": req.provenance, "owner": owner, "created_at": now } # Add to registry if "assets" not in registry: registry["assets"] = {} registry["assets"][req.name] = asset save_registry(registry) # Create ownership activity activity = { "activity_id": str(uuid.uuid4()), "activity_type": "Create", "actor_id": f"https://{DOMAIN}/users/{owner}", "object_data": { "type": req.asset_type.capitalize(), "name": req.name, "id": f"https://{DOMAIN}/objects/{req.content_hash}", "contentHash": { "algorithm": "sha3-256", "value": req.content_hash }, "attributedTo": f"https://{DOMAIN}/users/{owner}" }, "published": now } # Sign activity with the owner's keys activity = sign_activity(activity, owner) # Save activity activities = load_activities() activities.append(activity) save_activities(activities) return {"asset": asset, "activity": activity} @app.post("/registry") async def register_asset(req: RegisterRequest, user: User = Depends(get_required_user)): """Register a new asset and create ownership activity. Requires authentication.""" return _register_asset_impl(req, user.username) @app.post("/registry/record-run") async def record_run(req: RecordRunRequest, user: User = Depends(get_required_user)): """Record an L1 run and register the output. Requires authentication.""" # Fetch run from the specified L1 server l1_url = req.l1_server.rstrip('/') try: resp = requests.get(f"{l1_url}/runs/{req.run_id}") resp.raise_for_status() run = resp.json() except Exception as e: raise HTTPException(400, f"Failed to fetch run from L1 ({l1_url}): {e}") if run.get("status") != "completed": raise HTTPException(400, f"Run not completed: {run.get('status')}") output_hash = run.get("output_hash") if not output_hash: raise HTTPException(400, "Run has no output hash") # Build provenance from run provenance = { "inputs": [{"content_hash": h} for h in run.get("inputs", [])], "recipe": run.get("recipe"), "effect_url": run.get("effect_url"), "effects_commit": run.get("effects_commit"), "l1_server": l1_url, "l1_run_id": req.run_id, "rendered_at": run.get("completed_at"), "infrastructure": run.get("infrastructure") } # Register the output under the authenticated user return _register_asset_impl(RegisterRequest( name=req.output_name, content_hash=output_hash, asset_type="video", # Could be smarter about this tags=["rendered", "l1"], metadata={"l1_server": l1_url, "l1_run_id": req.run_id}, provenance=provenance ), user.username) @app.post("/registry/publish-cache") async def publish_cache(req: PublishCacheRequest, user: User = Depends(get_required_user)): """ Publish a cache item from L1 with metadata. Requires origin to be set (self or external URL). Creates a new asset and Create activity. """ # Validate origin if not req.origin or "type" not in req.origin: raise HTTPException(400, "Origin is required for publishing (type: 'self' or 'external')") origin_type = req.origin.get("type") if origin_type not in ("self", "external"): raise HTTPException(400, "Origin type must be 'self' or 'external'") if origin_type == "external" and not req.origin.get("url"): raise HTTPException(400, "External origin requires a URL") # Check if asset name already exists registry = load_registry() if req.asset_name in registry.get("assets", {}): raise HTTPException(400, f"Asset name already exists: {req.asset_name}") # Create asset now = datetime.now(timezone.utc).isoformat() asset = { "name": req.asset_name, "content_hash": req.content_hash, "asset_type": req.asset_type, "tags": req.tags, "description": req.description, "origin": req.origin, "metadata": req.metadata, "owner": user.username, "created_at": now } # Add to registry if "assets" not in registry: registry["assets"] = {} registry["assets"][req.asset_name] = asset save_registry(registry) # Create ownership activity with origin info object_data = { "type": req.asset_type.capitalize(), "name": req.asset_name, "id": f"https://{DOMAIN}/objects/{req.content_hash}", "contentHash": { "algorithm": "sha3-256", "value": req.content_hash }, "attributedTo": f"https://{DOMAIN}/users/{user.username}", "tag": req.tags } if req.description: object_data["summary"] = req.description # Include origin in ActivityPub object if origin_type == "self": object_data["generator"] = { "type": "Application", "name": "Art DAG", "note": "Original content created by the author" } else: object_data["source"] = { "type": "Link", "href": req.origin.get("url"), "name": req.origin.get("note", "External source") } activity = { "activity_id": str(uuid.uuid4()), "activity_type": "Create", "actor_id": f"https://{DOMAIN}/users/{user.username}", "object_data": object_data, "published": now } # Sign activity with the user's keys activity = sign_activity(activity, user.username) # Save activity activities = load_activities() activities.append(activity) save_activities(activities) return {"asset": asset, "activity": activity} # ============ Activities Endpoints ============ @app.get("/activities") async def get_activities(): """Get all activities.""" return {"activities": load_activities()} @app.get("/objects/{content_hash}") async def get_object(content_hash: str): """Get object by content hash.""" registry = load_registry() # Find asset by hash for name, asset in registry.get("assets", {}).items(): if asset.get("content_hash") == content_hash: owner = asset.get("owner", "unknown") return JSONResponse( content={ "@context": "https://www.w3.org/ns/activitystreams", "id": f"https://{DOMAIN}/objects/{content_hash}", "type": asset.get("asset_type", "Object").capitalize(), "name": name, "contentHash": { "algorithm": "sha3-256", "value": content_hash }, "attributedTo": f"https://{DOMAIN}/users/{owner}", "published": asset.get("created_at") }, media_type="application/activity+json" ) raise HTTPException(404, f"Object not found: {content_hash}") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8200)