#!/usr/bin/env python3 """ Art DAG L2 Server - ActivityPub Manages ownership registry, activities, and federation. - Registry of owned assets - ActivityPub actor endpoints - Sign and publish Create activities - Federation with other servers """ import hashlib import json import os import uuid from datetime import datetime, timezone from pathlib import Path from typing import Optional from urllib.parse import urlparse from fastapi import FastAPI, HTTPException, Request, Response from fastapi.responses import JSONResponse from pydantic import BaseModel import requests # Configuration DOMAIN = os.environ.get("ARTDAG_DOMAIN", "artdag.rose-ash.com") USERNAME = os.environ.get("ARTDAG_USER", "giles") DATA_DIR = Path(os.environ.get("ARTDAG_DATA", str(Path.home() / ".artdag" / "l2"))) L1_SERVER = os.environ.get("ARTDAG_L1", "http://localhost:8100") # Ensure data directory exists DATA_DIR.mkdir(parents=True, exist_ok=True) (DATA_DIR / "assets").mkdir(exist_ok=True) app = FastAPI( title="Art DAG L2 Server", description="ActivityPub server for Art DAG ownership and federation", version="0.1.0" ) # ============ Data Models ============ class Asset(BaseModel): """An owned asset.""" name: str content_hash: str asset_type: str # image, video, effect, recipe, infrastructure tags: list[str] = [] metadata: dict = {} url: Optional[str] = None provenance: Optional[dict] = None created_at: str = "" class Activity(BaseModel): """An ActivityPub activity.""" activity_id: str activity_type: str # Create, Update, Delete, Announce actor_id: str object_data: dict published: str signature: Optional[dict] = None class RegisterRequest(BaseModel): """Request to register an asset.""" name: str content_hash: str asset_type: str tags: list[str] = [] metadata: dict = {} url: Optional[str] = None provenance: Optional[dict] = None class RecordRunRequest(BaseModel): """Request to record an L1 run.""" run_id: str output_name: str # ============ Storage ============ def load_registry() -> dict: """Load registry from disk.""" path = DATA_DIR / "registry.json" if path.exists(): with open(path) as f: return json.load(f) return {"version": "1.0", "assets": {}} def save_registry(registry: dict): """Save registry to disk.""" path = DATA_DIR / "registry.json" with open(path, "w") as f: json.dump(registry, f, indent=2) def load_activities() -> list: """Load activities from disk.""" path = DATA_DIR / "activities.json" if path.exists(): with open(path) as f: data = json.load(f) return data.get("activities", []) return [] def save_activities(activities: list): """Save activities to disk.""" path = DATA_DIR / "activities.json" with open(path, "w") as f: json.dump({"version": "1.0", "activities": activities}, f, indent=2) def load_actor() -> dict: """Load actor data.""" path = DATA_DIR / "actor.json" if path.exists(): with open(path) as f: return json.load(f) # Return default actor return { "id": f"https://{DOMAIN}/users/{USERNAME}", "type": "Person", "preferredUsername": USERNAME, "name": USERNAME, "inbox": f"https://{DOMAIN}/users/{USERNAME}/inbox", "outbox": f"https://{DOMAIN}/users/{USERNAME}/outbox", "followers": f"https://{DOMAIN}/users/{USERNAME}/followers", "following": f"https://{DOMAIN}/users/{USERNAME}/following", } def load_followers() -> list: """Load followers list.""" path = DATA_DIR / "followers.json" if path.exists(): with open(path) as f: return json.load(f) return [] def save_followers(followers: list): """Save followers list.""" path = DATA_DIR / "followers.json" with open(path, "w") as f: json.dump(followers, f, indent=2) # ============ Signing ============ def sign_activity(activity: dict) -> dict: """Sign an activity (placeholder - real impl uses RSA).""" # In production, use artdag.activitypub.signatures activity["signature"] = { "type": "RsaSignature2017", "creator": f"https://{DOMAIN}/users/{USERNAME}#main-key", "created": datetime.now(timezone.utc).isoformat(), "signatureValue": "placeholder-implement-real-signing" } return activity # ============ ActivityPub Endpoints ============ @app.get("/") async def root(): """Server info.""" registry = load_registry() activities = load_activities() return { "name": "Art DAG L2 Server", "version": "0.1.0", "domain": DOMAIN, "user": USERNAME, "assets_count": len(registry.get("assets", {})), "activities_count": len(activities), "l1_server": L1_SERVER } @app.get("/.well-known/webfinger") async def webfinger(resource: str): """WebFinger endpoint for actor discovery.""" expected = f"acct:{USERNAME}@{DOMAIN}" if resource != expected: raise HTTPException(404, f"Unknown resource: {resource}") return JSONResponse( content={ "subject": expected, "links": [ { "rel": "self", "type": "application/activity+json", "href": f"https://{DOMAIN}/users/{USERNAME}" } ] }, media_type="application/jrd+json" ) @app.get("/users/{username}") async def get_actor(username: str, request: Request): """Get actor profile.""" if username != USERNAME: raise HTTPException(404, f"Unknown user: {username}") actor = load_actor() # Add ActivityPub context actor["@context"] = [ "https://www.w3.org/ns/activitystreams", "https://w3id.org/security/v1" ] return JSONResponse( content=actor, media_type="application/activity+json" ) @app.get("/users/{username}/outbox") async def get_outbox(username: str, page: bool = False): """Get actor's outbox (published activities).""" if username != USERNAME: raise HTTPException(404, f"Unknown user: {username}") activities = load_activities() if not page: return JSONResponse( content={ "@context": "https://www.w3.org/ns/activitystreams", "id": f"https://{DOMAIN}/users/{USERNAME}/outbox", "type": "OrderedCollection", "totalItems": len(activities), "first": f"https://{DOMAIN}/users/{USERNAME}/outbox?page=true" }, media_type="application/activity+json" ) # Return activities page return JSONResponse( content={ "@context": "https://www.w3.org/ns/activitystreams", "id": f"https://{DOMAIN}/users/{USERNAME}/outbox?page=true", "type": "OrderedCollectionPage", "partOf": f"https://{DOMAIN}/users/{USERNAME}/outbox", "orderedItems": activities }, media_type="application/activity+json" ) @app.post("/users/{username}/inbox") async def post_inbox(username: str, request: Request): """Receive activities from other servers.""" if username != USERNAME: raise HTTPException(404, f"Unknown user: {username}") body = await request.json() activity_type = body.get("type") # Handle Follow requests if activity_type == "Follow": follower = body.get("actor") followers = load_followers() if follower not in followers: followers.append(follower) save_followers(followers) # Send Accept (in production, do this async) # For now just acknowledge return {"status": "accepted"} # Handle other activity types return {"status": "received"} @app.get("/users/{username}/followers") async def get_followers(username: str): """Get actor's followers.""" if username != USERNAME: raise HTTPException(404, f"Unknown user: {username}") followers = load_followers() return JSONResponse( content={ "@context": "https://www.w3.org/ns/activitystreams", "id": f"https://{DOMAIN}/users/{USERNAME}/followers", "type": "OrderedCollection", "totalItems": len(followers), "orderedItems": followers }, media_type="application/activity+json" ) # ============ Registry Endpoints ============ @app.get("/registry") async def get_registry(): """Get full registry.""" return load_registry() @app.get("/registry/{name}") async def get_asset(name: str): """Get a specific asset.""" registry = load_registry() if name not in registry.get("assets", {}): raise HTTPException(404, f"Asset not found: {name}") return registry["assets"][name] @app.post("/registry") async def register_asset(req: RegisterRequest): """Register a new asset and create ownership activity.""" registry = load_registry() # Check if name exists if req.name in registry.get("assets", {}): raise HTTPException(400, f"Asset already exists: {req.name}") # Create asset now = datetime.now(timezone.utc).isoformat() asset = { "name": req.name, "content_hash": req.content_hash, "asset_type": req.asset_type, "tags": req.tags, "metadata": req.metadata, "url": req.url, "provenance": req.provenance, "created_at": now } # Add to registry if "assets" not in registry: registry["assets"] = {} registry["assets"][req.name] = asset save_registry(registry) # Create ownership activity activity = { "activity_id": str(uuid.uuid4()), "activity_type": "Create", "actor_id": f"https://{DOMAIN}/users/{USERNAME}", "object_data": { "type": req.asset_type.capitalize(), "name": req.name, "id": f"https://{DOMAIN}/objects/{req.content_hash}", "contentHash": { "algorithm": "sha3-256", "value": req.content_hash }, "attributedTo": f"https://{DOMAIN}/users/{USERNAME}" }, "published": now } # Sign activity activity = sign_activity(activity) # Save activity activities = load_activities() activities.append(activity) save_activities(activities) return {"asset": asset, "activity": activity} @app.post("/registry/record-run") async def record_run(req: RecordRunRequest): """Record an L1 run and register the output.""" # Fetch run from L1 server try: resp = requests.get(f"{L1_SERVER}/runs/{req.run_id}") resp.raise_for_status() run = resp.json() except Exception as e: raise HTTPException(400, f"Failed to fetch run from L1: {e}") if run.get("status") != "completed": raise HTTPException(400, f"Run not completed: {run.get('status')}") output_hash = run.get("output_hash") if not output_hash: raise HTTPException(400, "Run has no output hash") # Build provenance from run provenance = { "inputs": [{"content_hash": h} for h in run.get("inputs", [])], "recipe": run.get("recipe"), "l1_run_id": req.run_id, "rendered_at": run.get("completed_at") } # Register the output return await register_asset(RegisterRequest( name=req.output_name, content_hash=output_hash, asset_type="video", # Could be smarter about this tags=["rendered", "l1"], metadata={"l1_run_id": req.run_id}, provenance=provenance )) # ============ Activities Endpoints ============ @app.get("/activities") async def get_activities(): """Get all activities.""" return {"activities": load_activities()} @app.get("/objects/{content_hash}") async def get_object(content_hash: str): """Get object by content hash.""" registry = load_registry() # Find asset by hash for name, asset in registry.get("assets", {}).items(): if asset.get("content_hash") == content_hash: return JSONResponse( content={ "@context": "https://www.w3.org/ns/activitystreams", "id": f"https://{DOMAIN}/objects/{content_hash}", "type": asset.get("asset_type", "Object").capitalize(), "name": name, "contentHash": { "algorithm": "sha3-256", "value": content_hash }, "attributedTo": f"https://{DOMAIN}/users/{USERNAME}", "published": asset.get("created_at") }, media_type="application/activity+json" ) raise HTTPException(404, f"Object not found: {content_hash}") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8200)