Files
celery/app/routers/effects.py
giles d20eef76ad Fix completed runs not appearing in list + add purge-failed endpoint
- Update save_run_cache to also update actor_id, recipe, inputs on conflict
- Add logging for actor_id when saving runs to run_cache
- Add admin endpoint DELETE /runs/admin/purge-failed to delete all failed runs

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 23:24:39 +00:00

416 lines
13 KiB
Python

"""
Effects routes for L1 server.
Handles effect upload, listing, and metadata.
Effects are S-expression files stored in IPFS like all other content-addressed data.
"""
import json
import logging
import re
import time
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, Request, Depends, HTTPException, UploadFile, File, Form
from fastapi.responses import HTMLResponse, PlainTextResponse
from artdag_common import render
from artdag_common.middleware import wants_html, wants_json
from artdag_common.middleware.auth import UserContext
from ..dependencies import (
require_auth, get_templates, get_redis_client,
get_cache_manager,
)
from ..services.auth_service import AuthService
import ipfs_client
router = APIRouter()
logger = logging.getLogger(__name__)
def get_effects_dir() -> Path:
"""Get effects storage directory."""
cache_mgr = get_cache_manager()
effects_dir = Path(cache_mgr.cache_dir) / "_effects"
effects_dir.mkdir(parents=True, exist_ok=True)
return effects_dir
def parse_effect_metadata(source: str) -> dict:
"""
Parse effect metadata from S-expression source code.
Extracts metadata from comment headers (;; @key value format)
or from (defeffect name ...) form.
"""
metadata = {
"name": "",
"version": "1.0.0",
"author": "",
"temporal": False,
"description": "",
"params": [],
}
# Parse comment-based metadata (;; @key value)
for line in source.split("\n"):
stripped = line.strip()
if not stripped.startswith(";"):
# Stop parsing metadata at first non-comment line
if stripped and not stripped.startswith("("):
continue
if stripped.startswith("("):
break
# Remove comment prefix
comment = stripped.lstrip(";").strip()
if comment.startswith("@effect "):
metadata["name"] = comment[8:].strip()
elif comment.startswith("@name "):
metadata["name"] = comment[6:].strip()
elif comment.startswith("@version "):
metadata["version"] = comment[9:].strip()
elif comment.startswith("@author "):
metadata["author"] = comment[8:].strip()
elif comment.startswith("@temporal"):
val = comment[9:].strip().lower() if len(comment) > 9 else "true"
metadata["temporal"] = val in ("true", "yes", "1", "")
elif comment.startswith("@description "):
metadata["description"] = comment[13:].strip()
elif comment.startswith("@param "):
# Format: @param name type [description]
parts = comment[7:].split(None, 2)
if len(parts) >= 2:
param = {"name": parts[0], "type": parts[1]}
if len(parts) > 2:
param["description"] = parts[2]
metadata["params"].append(param)
# Also try to extract name from (defeffect "name" ...) or (effect "name" ...)
if not metadata["name"]:
name_match = re.search(r'\((defeffect|effect)\s+"([^"]+)"', source)
if name_match:
metadata["name"] = name_match.group(2)
# Try to extract name from first (define ...) form
if not metadata["name"]:
define_match = re.search(r'\(define\s+(\w+)', source)
if define_match:
metadata["name"] = define_match.group(1)
return metadata
@router.post("/upload")
async def upload_effect(
file: UploadFile = File(...),
display_name: Optional[str] = Form(None),
ctx: UserContext = Depends(require_auth),
):
"""
Upload an S-expression effect to IPFS.
Parses metadata from comment headers.
Returns IPFS CID for use in recipes.
Args:
file: The .sexp effect file
display_name: Optional custom friendly name for the effect
"""
content = await file.read()
try:
source = content.decode("utf-8")
except UnicodeDecodeError:
raise HTTPException(400, "Effect must be valid UTF-8 text")
# Parse metadata from sexp source
try:
meta = parse_effect_metadata(source)
except Exception as e:
logger.warning(f"Failed to parse effect metadata: {e}")
meta = {"name": file.filename or "unknown"}
if not meta.get("name"):
meta["name"] = Path(file.filename).stem if file.filename else "unknown"
# Store effect source in IPFS
cid = ipfs_client.add_bytes(content)
if not cid:
raise HTTPException(500, "Failed to store effect in IPFS")
# Also keep local cache for fast worker access
effects_dir = get_effects_dir()
effect_dir = effects_dir / cid
effect_dir.mkdir(parents=True, exist_ok=True)
(effect_dir / "effect.sexp").write_text(source, encoding="utf-8")
# Store metadata (locally and in IPFS)
full_meta = {
"cid": cid,
"meta": meta,
"uploader": ctx.actor_id,
"uploaded_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"filename": file.filename,
}
(effect_dir / "metadata.json").write_text(json.dumps(full_meta, indent=2))
# Also store metadata in IPFS for discoverability
meta_cid = ipfs_client.add_json(full_meta)
# Track ownership in item_types
import database
await database.save_item_metadata(
cid=cid,
actor_id=ctx.actor_id,
item_type="effect",
filename=file.filename,
)
# Assign friendly name (use custom display_name if provided, else from metadata)
from ..services.naming_service import get_naming_service
naming = get_naming_service()
friendly_entry = await naming.assign_name(
cid=cid,
actor_id=ctx.actor_id,
item_type="effect",
display_name=display_name or meta.get("name"),
filename=file.filename,
)
logger.info(f"Uploaded effect '{meta.get('name')}' cid={cid} friendly_name='{friendly_entry['friendly_name']}' by {ctx.actor_id}")
return {
"cid": cid,
"metadata_cid": meta_cid,
"name": meta.get("name"),
"friendly_name": friendly_entry["friendly_name"],
"version": meta.get("version"),
"temporal": meta.get("temporal", False),
"params": meta.get("params", []),
"uploaded": True,
}
@router.get("/{cid}")
async def get_effect(
cid: str,
request: Request,
ctx: UserContext = Depends(require_auth),
):
"""Get effect metadata by CID."""
effects_dir = get_effects_dir()
effect_dir = effects_dir / cid
metadata_path = effect_dir / "metadata.json"
# Try local cache first
if metadata_path.exists():
meta = json.loads(metadata_path.read_text())
else:
# Fetch from IPFS
source_bytes = ipfs_client.get_bytes(cid)
if not source_bytes:
raise HTTPException(404, f"Effect {cid[:16]}... not found")
# Cache locally
effect_dir.mkdir(parents=True, exist_ok=True)
source = source_bytes.decode("utf-8")
(effect_dir / "effect.sexp").write_text(source)
# Parse metadata from source
parsed_meta = parse_effect_metadata(source)
meta = {"cid": cid, "meta": parsed_meta}
(effect_dir / "metadata.json").write_text(json.dumps(meta, indent=2))
# Add friendly name if available
from ..services.naming_service import get_naming_service
naming = get_naming_service()
friendly = await naming.get_by_cid(ctx.actor_id, cid)
if friendly:
meta["friendly_name"] = friendly["friendly_name"]
meta["base_name"] = friendly["base_name"]
meta["version_id"] = friendly["version_id"]
if wants_json(request):
return meta
# HTML response
from ..dependencies import get_nav_counts
nav_counts = await get_nav_counts(ctx.actor_id)
templates = get_templates(request)
return render(templates, "effects/detail.html", request,
effect=meta,
user=ctx,
nav_counts=nav_counts,
active_tab="effects",
)
@router.get("/{cid}/source")
async def get_effect_source(
cid: str,
ctx: UserContext = Depends(require_auth),
):
"""Get effect source code."""
effects_dir = get_effects_dir()
source_path = effects_dir / cid / "effect.sexp"
# Try local cache first (check both .sexp and legacy .py)
if source_path.exists():
return PlainTextResponse(source_path.read_text())
legacy_path = effects_dir / cid / "effect.py"
if legacy_path.exists():
return PlainTextResponse(legacy_path.read_text())
# Fetch from IPFS
source_bytes = ipfs_client.get_bytes(cid)
if not source_bytes:
raise HTTPException(404, f"Effect {cid[:16]}... not found")
# Cache locally
source_path.parent.mkdir(parents=True, exist_ok=True)
source = source_bytes.decode("utf-8")
source_path.write_text(source)
return PlainTextResponse(source)
@router.get("")
async def list_effects(
request: Request,
offset: int = 0,
limit: int = 20,
ctx: UserContext = Depends(require_auth),
):
"""List user's effects with pagination."""
import database
effects_dir = get_effects_dir()
effects = []
# Get user's effect CIDs from item_types
user_items = await database.get_user_items(ctx.actor_id, item_type="effect", limit=1000)
effect_cids = [item["cid"] for item in user_items]
# Get naming service for friendly name lookup
from ..services.naming_service import get_naming_service
naming = get_naming_service()
for cid in effect_cids:
effect_dir = effects_dir / cid
metadata_path = effect_dir / "metadata.json"
if metadata_path.exists():
try:
meta = json.loads(metadata_path.read_text())
# Add friendly name if available
friendly = await naming.get_by_cid(ctx.actor_id, cid)
if friendly:
meta["friendly_name"] = friendly["friendly_name"]
meta["base_name"] = friendly["base_name"]
effects.append(meta)
except json.JSONDecodeError:
pass
# Sort by upload time (newest first)
effects.sort(key=lambda e: e.get("uploaded_at", ""), reverse=True)
# Apply pagination
total = len(effects)
paginated_effects = effects[offset:offset + limit]
has_more = offset + limit < total
if wants_json(request):
return {"effects": paginated_effects, "offset": offset, "limit": limit, "has_more": has_more}
from ..dependencies import get_nav_counts
nav_counts = await get_nav_counts(ctx.actor_id)
templates = get_templates(request)
return render(templates, "effects/list.html", request,
effects=paginated_effects,
user=ctx,
nav_counts=nav_counts,
active_tab="effects",
offset=offset,
limit=limit,
has_more=has_more,
)
@router.post("/{cid}/publish")
async def publish_effect(
cid: str,
request: Request,
ctx: UserContext = Depends(require_auth),
):
"""Publish effect to L2 ActivityPub server."""
from ..services.cache_service import CacheService
import database
# Verify effect exists
effects_dir = get_effects_dir()
effect_dir = effects_dir / cid
if not effect_dir.exists():
error = "Effect not found"
if wants_html(request):
return HTMLResponse(f'<span class="text-red-400">{error}</span>')
raise HTTPException(404, error)
# Use cache service to publish
cache_service = CacheService(database, get_cache_manager())
ipfs_cid, error = await cache_service.publish_to_l2(
cid=cid,
actor_id=ctx.actor_id,
l2_server=ctx.l2_server,
auth_token=request.cookies.get("auth_token"),
)
if error:
if wants_html(request):
return HTMLResponse(f'<span class="text-red-400">{error}</span>')
raise HTTPException(400, error)
logger.info(f"Published effect {cid[:16]}... to L2 by {ctx.actor_id}")
if wants_html(request):
return HTMLResponse(f'<span class="text-green-400">Shared: {ipfs_cid[:16]}...</span>')
return {"ipfs_cid": ipfs_cid, "cid": cid, "published": True}
@router.delete("/{cid}")
async def delete_effect(
cid: str,
ctx: UserContext = Depends(require_auth),
):
"""Remove user's ownership link to an effect."""
import database
# Remove user's ownership link from item_types
await database.delete_item_type(cid, ctx.actor_id, "effect")
# Remove friendly name
await database.delete_friendly_name(ctx.actor_id, cid)
# Check if anyone still owns this effect
remaining_owners = await database.get_item_types(cid)
# Only delete local files if no one owns it anymore
if not remaining_owners:
effects_dir = get_effects_dir()
effect_dir = effects_dir / cid
if effect_dir.exists():
import shutil
shutil.rmtree(effect_dir)
# Unpin from IPFS
ipfs_client.unpin(cid)
logger.info(f"Garbage collected effect {cid[:16]}... (no remaining owners)")
logger.info(f"Removed effect {cid[:16]}... ownership for {ctx.actor_id}")
return {"deleted": True}