Files
celery/app/routers/effects.py
gilesb 98ca2a6c81 Fix recipe listing, effects count, and add nav counts to all pages
- Fix list_by_type to return node_id (IPFS CID) instead of local hash
- Fix effects count on home page (count from _effects/ directory)
- Add nav_counts to all page templates (recipes, effects, runs, media, storage)
- Add editable metadata section to cache/media detail page
- Show more metadata on recipe detail page (ID, IPFS CID, step count)
- Update tests for new list_by_type behavior

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 13:30:11 +00:00

395 lines
12 KiB
Python

"""
Effects routes for L1 server.
Handles effect upload, listing, and metadata.
Effects are stored in IPFS like all other content-addressed data.
"""
import hashlib
import json
import logging
import time
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, Request, Depends, HTTPException, UploadFile, File
from fastapi.responses import HTMLResponse, PlainTextResponse
from artdag_common import render
from artdag_common.middleware import wants_html, wants_json
from artdag_common.middleware.auth import UserContext
from ..dependencies import (
require_auth, get_templates, get_redis_client,
get_cache_manager,
)
from ..services.auth_service import AuthService
import ipfs_client
router = APIRouter()
logger = logging.getLogger(__name__)
def get_effects_dir() -> Path:
    """Return the ``_effects`` directory under the cache root, creating it if needed."""
    cache_root = Path(get_cache_manager().cache_dir)
    target = cache_root / "_effects"
    # Safe to call repeatedly: existing directories are left untouched.
    target.mkdir(parents=True, exist_ok=True)
    return target
def parse_effect_metadata(source: str) -> dict:
    """
    Parse effect metadata from source code.

    Two sources of metadata are read:

    * A PEP 723 ``# /// script`` comment block, from which ``dependencies``
      and ``requires-python`` are extracted.
    * The first triple-quoted string in the file (double quotes preferred,
      single quotes as a fallback), which is scanned line by line for
      ``@effect``, ``@version``, ``@author``, ``@temporal``, ``@description``,
      ``@param`` (with optional ``@range`` / ``@default``) and ``@example``.

    Args:
        source: Full text of the effect's Python source file.

    Returns:
        A dict with the keys ``name``, ``version``, ``author``, ``temporal``,
        ``description``, ``params``, ``dependencies`` and ``requires_python``.
        Keys that are not present in ``source`` keep their default values.
        Each entry of ``params`` has ``name``, ``type`` and ``description``,
        plus ``range`` (two floats) and ``default`` (string) when given.
    """
    import re

    metadata = {
        "name": "",
        "version": "1.0.0",
        "author": "",
        "temporal": False,
        "description": "",
        "params": [],
        "dependencies": [],
        "requires_python": ">=3.10",
    }

    # Parse PEP 723 dependencies. ``\r?\n`` lets files with Windows line
    # endings match as well.
    pep723_match = re.search(r"# /// script\r?\n(.*?)# ///", source, re.DOTALL)
    if pep723_match:
        block = pep723_match.group(1)
        deps_match = re.search(r"# dependencies = \[(.*?)\]", block, re.DOTALL)
        if deps_match:
            metadata["dependencies"] = re.findall(r'"([^"]+)"', deps_match.group(1))
        python_match = re.search(r'# requires-python = "([^"]+)"', block)
        if python_match:
            metadata["requires_python"] = python_match.group(1)

    # Parse docstring @-tags
    docstring_match = re.search(r'"""(.*?)"""', source, re.DOTALL)
    if not docstring_match:
        docstring_match = re.search(r"'''(.*?)'''", source, re.DOTALL)
    if not docstring_match:
        return metadata

    current_param = None  # the @param entry currently being filled in
    desc_lines = []       # text collected for the @description section
    in_description = False

    for line in docstring_match.group(1).split("\n"):
        stripped = line.strip()

        if stripped.startswith("@effect "):
            metadata["name"] = stripped[8:].strip()
            in_description = False

        elif stripped.startswith("@version "):
            metadata["version"] = stripped[9:].strip()

        elif stripped.startswith("@author "):
            metadata["author"] = stripped[8:].strip()

        elif stripped.startswith("@temporal "):
            val = stripped[10:].strip().lower()
            metadata["temporal"] = val in ("true", "yes", "1")

        elif stripped.startswith("@description"):
            in_description = True
            # Text on the same line as the tag is part of the description.
            inline = stripped[len("@description"):].strip()
            desc_lines = [inline] if inline else []

        elif stripped.startswith("@param "):
            in_description = False
            if current_param is not None:
                metadata["params"].append(current_param)
            parts = stripped[7:].split()
            if len(parts) >= 2:
                current_param = {
                    "name": parts[0],
                    "type": parts[1],
                    "description": "",
                }
            else:
                # A @param without both name and type is ignored.
                current_param = None

        elif stripped.startswith("@range ") and current_param is not None:
            range_parts = stripped[7:].split()
            if len(range_parts) >= 2:
                try:
                    current_param["range"] = [
                        float(range_parts[0]),
                        float(range_parts[1]),
                    ]
                except ValueError:
                    # Non-numeric bounds: leave the param without a range.
                    pass

        elif stripped.startswith("@default ") and current_param is not None:
            current_param["default"] = stripped[9:].strip()

        elif stripped.startswith("@example"):
            in_description = False
            if current_param is not None:
                metadata["params"].append(current_param)
            current_param = None

        elif in_description and stripped:
            desc_lines.append(stripped)

        elif current_param is not None and stripped and not stripped.startswith("@"):
            # Accumulate continuation lines instead of keeping only the last.
            previous = current_param["description"]
            current_param["description"] = (
                f"{previous} {stripped}" if previous else stripped
            )

    # Store the description even when another tag ended the section before
    # the docstring finished.
    if desc_lines:
        metadata["description"] = " ".join(desc_lines)

    if current_param is not None:
        metadata["params"].append(current_param)

    return metadata
@router.post("/upload")
async def upload_effect(
    file: UploadFile = File(...),
    ctx: UserContext = Depends(require_auth),
):
    """
    Store an uploaded effect in IPFS and mirror it in the local effects cache.

    The upload must be UTF-8 text. Its metadata is parsed from the source; if
    parsing fails, or no name is declared, a name derived from the filename is
    used. The source and a metadata record are written under
    ``<effects dir>/<cid>/`` and the metadata record is also added to IPFS.

    Returns a dict with the effect CID, the metadata CID and the parsed fields.
    Raises HTTPException 400 for non-UTF-8 uploads and 500 when IPFS returns
    no CID for the source.
    """
    raw = await file.read()
    try:
        text = raw.decode("utf-8")
    except UnicodeDecodeError:
        raise HTTPException(400, "Effect must be valid UTF-8 text")

    # Metadata parsing is best-effort: a failure only costs the parsed fields.
    try:
        parsed = parse_effect_metadata(text)
    except Exception as exc:
        logger.warning(f"Failed to parse effect metadata: {exc}")
        parsed = {"name": file.filename or "unknown"}

    if not parsed.get("name"):
        if file.filename:
            parsed["name"] = Path(file.filename).stem
        else:
            parsed["name"] = "unknown"

    # The source goes to IPFS first; its CID names the local cache directory.
    cid = ipfs_client.add_bytes(raw)
    if not cid:
        raise HTTPException(500, "Failed to store effect in IPFS")

    local_dir = get_effects_dir() / cid
    local_dir.mkdir(parents=True, exist_ok=True)
    source_file = local_dir / "effect.py"
    source_file.write_text(text, encoding="utf-8")

    record = {
        "cid": cid,
        "meta": parsed,
        "uploader": ctx.actor_id,
        "uploaded_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "filename": file.filename,
    }
    record_file = local_dir / "metadata.json"
    record_file.write_text(json.dumps(record, indent=2))

    # The same record is added to IPFS so it has a CID of its own.
    meta_cid = ipfs_client.add_json(record)

    logger.info(f"Uploaded effect '{parsed.get('name')}' cid={cid} by {ctx.actor_id}")

    response = {
        "cid": cid,
        "metadata_cid": meta_cid,
        "name": parsed.get("name"),
        "version": parsed.get("version"),
        "temporal": parsed.get("temporal", False),
        "params": parsed.get("params", []),
        "dependencies": parsed.get("dependencies", []),
        "uploaded": True,
    }
    return response
@router.get("/{cid}")
async def get_effect(
    cid: str,
    request: Request,
    ctx: UserContext = Depends(require_auth),
):
    """
    Get effect metadata by CID.

    The local cache (``<effects dir>/<cid>/metadata.json``) is consulted
    first. On a miss the source is fetched from IPFS, cached locally, and its
    metadata is parsed from the source text.

    Returns the metadata dict for JSON requests, otherwise the rendered
    ``effects/detail.html`` template.

    Raises:
        HTTPException 400: ``cid`` is not usable as a single directory name,
            or the IPFS content is not valid UTF-8 text.
        HTTPException 404: nothing is cached locally and IPFS returned no data.
    """
    # The CID is used as a directory name below; "." / ".." or a path
    # separator would make the lookup leave the effects directory.
    if cid in (".", "..") or "/" in cid or "\\" in cid:
        raise HTTPException(400, "Invalid effect CID")

    effects_dir = get_effects_dir()
    effect_dir = effects_dir / cid
    metadata_path = effect_dir / "metadata.json"

    # Try local cache first
    if metadata_path.exists():
        meta = json.loads(metadata_path.read_text(encoding="utf-8"))
    else:
        # Fetch from IPFS
        source_bytes = ipfs_client.get_bytes(cid)
        if not source_bytes:
            raise HTTPException(404, f"Effect {cid[:16]}... not found")

        # Decode before touching disk so non-text content does not leave an
        # empty cache directory behind.
        try:
            source = source_bytes.decode("utf-8")
        except UnicodeDecodeError:
            raise HTTPException(400, "Effect must be valid UTF-8 text")

        # Cache locally, using the same encoding as the upload route.
        effect_dir.mkdir(parents=True, exist_ok=True)
        (effect_dir / "effect.py").write_text(source, encoding="utf-8")

        # Parse metadata from source
        parsed_meta = parse_effect_metadata(source)
        meta = {"cid": cid, "meta": parsed_meta}
        metadata_path.write_text(json.dumps(meta, indent=2), encoding="utf-8")

    if wants_json(request):
        return meta

    # HTML response
    from ..dependencies import get_nav_counts
    nav_counts = await get_nav_counts(ctx.actor_id)

    templates = get_templates(request)
    return render(
        templates, "effects/detail.html", request,
        effect=meta,
        user=ctx,
        nav_counts=nav_counts,
        active_tab="effects",
    )
@router.get("/{cid}/source")
async def get_effect_source(
    cid: str,
    ctx: UserContext = Depends(require_auth),
):
    """
    Get effect source code as plain text.

    Serves ``<effects dir>/<cid>/effect.py`` when it exists; otherwise the
    bytes are fetched from IPFS, cached at that path and returned.

    Raises:
        HTTPException 400: ``cid`` is not usable as a single directory name,
            or the IPFS content is not valid UTF-8 text.
        HTTPException 404: nothing is cached locally and IPFS returned no data.
    """
    # The CID is used as a directory name below; "." / ".." or a path
    # separator would make the lookup leave the effects directory.
    if cid in (".", "..") or "/" in cid or "\\" in cid:
        raise HTTPException(400, "Invalid effect CID")

    effects_dir = get_effects_dir()
    source_path = effects_dir / cid / "effect.py"

    # Try local cache first. The upload route writes this file as UTF-8, so
    # read it back the same way rather than with the locale default.
    if source_path.exists():
        return PlainTextResponse(source_path.read_text(encoding="utf-8"))

    # Fetch from IPFS
    source_bytes = ipfs_client.get_bytes(cid)
    if not source_bytes:
        raise HTTPException(404, f"Effect {cid[:16]}... not found")

    # Decode before touching disk so non-text content does not leave an
    # empty cache directory behind.
    try:
        source = source_bytes.decode("utf-8")
    except UnicodeDecodeError:
        raise HTTPException(400, "Effect must be valid UTF-8 text")

    # Cache locally
    source_path.parent.mkdir(parents=True, exist_ok=True)
    source_path.write_text(source, encoding="utf-8")

    return PlainTextResponse(source)
@router.get("")
async def list_effects(
    request: Request,
    ctx: UserContext = Depends(require_auth),
):
    """
    List all uploaded effects found in the local effects directory.

    Every sub-directory containing a readable ``metadata.json`` whose content
    is a JSON object contributes one entry. Entries that cannot be read or
    parsed are skipped with a warning so one bad file does not break the
    listing. Results are sorted by ``uploaded_at``, newest first; entries
    without that field sort last.

    Returns ``{"effects": [...]}`` for JSON requests, otherwise the rendered
    ``effects/list.html`` template.
    """
    effects_dir = get_effects_dir()
    effects = []

    if effects_dir.exists():
        for effect_dir in effects_dir.iterdir():
            if not effect_dir.is_dir():
                continue
            metadata_path = effect_dir / "metadata.json"
            if not metadata_path.exists():
                continue
            try:
                meta = json.loads(metadata_path.read_text(encoding="utf-8"))
            except (OSError, ValueError) as exc:
                # ValueError covers both JSONDecodeError and UnicodeDecodeError.
                logger.warning(
                    "Skipping unreadable effect metadata %s: %s", metadata_path, exc
                )
                continue
            if not isinstance(meta, dict):
                # The sort below and the templates expect a mapping.
                logger.warning(
                    "Skipping effect metadata %s: not a JSON object", metadata_path
                )
                continue
            effects.append(meta)

    # Sort by upload time (newest first). ``or ""`` also covers an explicit
    # null, which would otherwise fail to compare against strings.
    effects.sort(key=lambda e: e.get("uploaded_at") or "", reverse=True)

    if wants_json(request):
        return {"effects": effects}

    from ..dependencies import get_nav_counts
    nav_counts = await get_nav_counts(ctx.actor_id)

    templates = get_templates(request)
    return render(
        templates, "effects/list.html", request,
        effects=effects,
        user=ctx,
        nav_counts=nav_counts,
        active_tab="effects",
    )
@router.post("/{cid}/publish")
async def publish_effect(
    cid: str,
    request: Request,
    ctx: UserContext = Depends(require_auth),
):
    """
    Publish effect to L2 ActivityPub server.

    The effect must already have a directory in the local effects cache.
    Publishing is delegated to ``CacheService.publish_to_l2``, which returns
    an ``(ipfs_cid, error)`` pair.

    For HTML requests the outcome is returned as a small ``<span>`` fragment;
    otherwise a JSON dict is returned on success and an HTTPException is
    raised on failure (404 when the effect is not cached locally, 400 when
    publishing reports an error).
    """
    import html

    from ..services.cache_service import CacheService
    import database

    # The CID is used as a directory name, so "." / ".." or a path separator
    # must never be treated as an existing effect.
    cid_is_plain_name = (
        cid not in (".", "..") and "/" not in cid and "\\" not in cid
    )

    # Verify effect exists
    effects_dir = get_effects_dir()
    effect_dir = effects_dir / cid
    if not cid_is_plain_name or not effect_dir.exists():
        error = "Effect not found"
        if wants_html(request):
            return HTMLResponse(f'<span class="text-red-400">{error}</span>')
        raise HTTPException(404, error)

    # Use cache service to publish
    cache_service = CacheService(database, get_cache_manager())
    ipfs_cid, error = await cache_service.publish_to_l2(
        cid=cid,
        actor_id=ctx.actor_id,
        l2_server=ctx.l2_server,
        auth_token=request.cookies.get("auth_token"),
    )

    if error:
        if wants_html(request):
            # The error text comes from the publish call and is not under this
            # route's control, so escape it before embedding it in markup.
            safe_error = html.escape(str(error))
            return HTMLResponse(f'<span class="text-red-400">{safe_error}</span>')
        raise HTTPException(400, error)

    logger.info(f"Published effect {cid[:16]}... to L2 by {ctx.actor_id}")

    if wants_html(request):
        safe_cid = html.escape(ipfs_cid[:16])
        return HTMLResponse(f'<span class="text-green-400">Shared: {safe_cid}...</span>')

    return {"ipfs_cid": ipfs_cid, "cid": cid, "published": True}
@router.delete("/{cid}")
async def delete_effect(
    cid: str,
    ctx: UserContext = Depends(require_auth),
):
    """
    Delete an effect from local cache (IPFS content is immutable).

    Removes ``<effects dir>/<cid>/`` recursively and then unpins the CID via
    ``ipfs_client.unpin``. When the directory has a ``metadata.json``, only
    the actor recorded as ``uploader`` may delete it.

    Raises:
        HTTPException 400: ``cid`` is not usable as a single directory name.
        HTTPException 404: the effect has no directory in the local cache.
        HTTPException 403: the caller is not the recorded uploader.
    """
    # The directory below is removed recursively. A CID of ".." would resolve
    # to the parent of the effects directory, so reject anything that is not
    # a plain directory name before building the path.
    if cid in (".", "..") or "/" in cid or "\\" in cid:
        raise HTTPException(400, "Invalid effect CID")

    effects_dir = get_effects_dir()
    effect_dir = effects_dir / cid

    if not effect_dir.exists():
        raise HTTPException(404, f"Effect {cid[:16]}... not found in local cache")

    # Check ownership
    # NOTE(review): a directory without metadata.json skips this check and can
    # be deleted by any authenticated user - confirm that is intended.
    metadata_path = effect_dir / "metadata.json"
    if metadata_path.exists():
        meta = json.loads(metadata_path.read_text(encoding="utf-8"))
        if meta.get("uploader") != ctx.actor_id:
            raise HTTPException(403, "Can only delete your own effects")

    import shutil
    shutil.rmtree(effect_dir)

    # Unpin from IPFS (content remains available if pinned elsewhere)
    ipfs_client.unpin(cid)

    logger.info(f"Deleted effect {cid[:16]}... by {ctx.actor_id}")
    return {"deleted": True, "note": "Unpinned from local IPFS; content may still exist on other nodes"}