Files
activity-pub/app/routers/assets.py
giles d1e9287829 Add modular app structure for L2 server
- Create app factory with routers and templates
- Auth, assets, activities, anchors, storage, users, renderers routers
- Federation router for WebFinger and nodeinfo
- Jinja2 templates for L2 pages
- Config and dependency injection

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-11 07:46:26 +00:00

244 lines
6.2 KiB
Python

"""
Asset management routes for L2 server.
Handles asset registration, listing, and publishing.
"""
import logging
from typing import Optional, List
from fastapi import APIRouter, Request, Depends, HTTPException, Form
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from artdag_common import render
from artdag_common.middleware import wants_html, wants_json
from ..config import settings
from ..dependencies import get_templates, require_auth, get_user_from_cookie
router = APIRouter()
logger = logging.getLogger(__name__)
class AssetCreate(BaseModel):
name: str
content_hash: str
ipfs_cid: Optional[str] = None
asset_type: str # image, video, effect, recipe
tags: List[str] = []
metadata: dict = {}
provenance: Optional[dict] = None
class RecordRunRequest(BaseModel):
run_id: str
recipe: str
inputs: List[str]
output_hash: str
ipfs_cid: Optional[str] = None
provenance: Optional[dict] = None
@router.get("")
async def list_assets(
request: Request,
offset: int = 0,
limit: int = 20,
asset_type: Optional[str] = None,
):
"""List user's assets."""
import db
username = get_user_from_cookie(request)
if not username:
if wants_json(request):
raise HTTPException(401, "Authentication required")
from fastapi.responses import RedirectResponse
return RedirectResponse(url="/login", status_code=302)
assets = await db.get_user_assets(username, offset=offset, limit=limit, asset_type=asset_type)
has_more = len(assets) >= limit
if wants_json(request):
return {"assets": assets, "offset": offset, "limit": limit, "has_more": has_more}
templates = get_templates(request)
return render(templates, "assets/list.html", request,
assets=assets,
user={"username": username},
offset=offset,
limit=limit,
has_more=has_more,
active_tab="assets",
)
@router.post("")
async def create_asset(
req: AssetCreate,
user: dict = Depends(require_auth),
):
"""Register a new asset."""
import db
asset_id = await db.create_asset(
username=user["username"],
name=req.name,
content_hash=req.content_hash,
ipfs_cid=req.ipfs_cid,
asset_type=req.asset_type,
tags=req.tags,
metadata=req.metadata,
provenance=req.provenance,
)
if not asset_id:
raise HTTPException(400, "Failed to create asset")
return {"asset_id": asset_id, "message": "Asset registered"}
@router.get("/{asset_id}")
async def get_asset(
asset_id: str,
request: Request,
):
"""Get asset details."""
import db
username = get_user_from_cookie(request)
asset = await db.get_asset(asset_id)
if not asset:
raise HTTPException(404, "Asset not found")
if wants_json(request):
return asset
templates = get_templates(request)
return render(templates, "assets/detail.html", request,
asset=asset,
user={"username": username} if username else None,
active_tab="assets",
)
@router.delete("/{asset_id}")
async def delete_asset(
asset_id: str,
user: dict = Depends(require_auth),
):
"""Delete an asset."""
import db
asset = await db.get_asset(asset_id)
if not asset:
raise HTTPException(404, "Asset not found")
if asset.get("owner") != user["username"]:
raise HTTPException(403, "Not authorized")
success = await db.delete_asset(asset_id)
if not success:
raise HTTPException(400, "Failed to delete asset")
return {"deleted": True}
@router.post("/record-run")
async def record_run(
req: RecordRunRequest,
user: dict = Depends(require_auth),
):
"""Record a run completion and register output as asset."""
import db
# Create asset for output
asset_id = await db.create_asset(
username=user["username"],
name=f"{req.recipe}-{req.run_id[:8]}",
content_hash=req.output_hash,
ipfs_cid=req.ipfs_cid,
asset_type="render",
metadata={
"run_id": req.run_id,
"recipe": req.recipe,
"inputs": req.inputs,
},
provenance=req.provenance,
)
# Record run
await db.record_run(
run_id=req.run_id,
username=user["username"],
recipe=req.recipe,
inputs=req.inputs,
output_hash=req.output_hash,
ipfs_cid=req.ipfs_cid,
asset_id=asset_id,
)
return {
"run_id": req.run_id,
"asset_id": asset_id,
"recorded": True,
}
@router.get("/by-run-id/{run_id}")
async def get_asset_by_run_id(run_id: str):
"""Get asset by run ID (for L1 cache lookup)."""
import db
run = await db.get_run(run_id)
if not run:
raise HTTPException(404, "Run not found")
return {
"run_id": run_id,
"output_hash": run.get("output_hash"),
"ipfs_cid": run.get("ipfs_cid"),
"provenance_cid": run.get("provenance_cid"),
}
@router.post("/{asset_id}/publish")
async def publish_asset(
asset_id: str,
request: Request,
user: dict = Depends(require_auth),
):
"""Publish asset to IPFS."""
import db
import ipfs_client
asset = await db.get_asset(asset_id)
if not asset:
raise HTTPException(404, "Asset not found")
if asset.get("owner") != user["username"]:
raise HTTPException(403, "Not authorized")
# Already published?
if asset.get("ipfs_cid"):
return {"ipfs_cid": asset["ipfs_cid"], "already_published": True}
# Get content from L1
content_hash = asset.get("content_hash")
for l1_url in settings.l1_servers:
try:
import requests
resp = requests.get(f"{l1_url}/cache/{content_hash}/raw", timeout=30)
if resp.status_code == 200:
# Pin to IPFS
cid = await ipfs_client.add_bytes(resp.content)
if cid:
await db.update_asset(asset_id, ipfs_cid=cid)
return {"ipfs_cid": cid, "published": True}
except Exception as e:
logger.warning(f"Failed to fetch from {l1_url}: {e}")
raise HTTPException(400, "Failed to publish - content not found on any L1")