""" Effects routes for L1 server. Handles effect upload, listing, and metadata. """ import hashlib import json import logging import time from pathlib import Path from typing import Optional from fastapi import APIRouter, Request, Depends, HTTPException, UploadFile, File from fastapi.responses import HTMLResponse, PlainTextResponse from artdag_common import render from artdag_common.middleware import wants_html, wants_json from artdag_common.middleware.auth import UserContext from ..dependencies import ( require_auth, get_templates, get_redis_client, get_cache_manager, ) from ..services.auth_service import AuthService router = APIRouter() logger = logging.getLogger(__name__) def get_effects_dir() -> Path: """Get effects storage directory.""" cache_mgr = get_cache_manager() effects_dir = Path(cache_mgr.cache_dir) / "_effects" effects_dir.mkdir(parents=True, exist_ok=True) return effects_dir def parse_effect_metadata(source: str) -> dict: """ Parse effect metadata from source code. Extracts PEP 723 dependencies and @-tag metadata from docstring. """ import re metadata = { "name": "", "version": "1.0.0", "author": "", "temporal": False, "description": "", "params": [], "dependencies": [], "requires_python": ">=3.10", } # Parse PEP 723 dependencies pep723_match = re.search(r"# /// script\n(.*?)# ///", source, re.DOTALL) if pep723_match: block = pep723_match.group(1) deps_match = re.search(r'# dependencies = \[(.*?)\]', block, re.DOTALL) if deps_match: metadata["dependencies"] = re.findall(r'"([^"]+)"', deps_match.group(1)) python_match = re.search(r'# requires-python = "([^"]+)"', block) if python_match: metadata["requires_python"] = python_match.group(1) # Parse docstring @-tags docstring_match = re.search(r'"""(.*?)"""', source, re.DOTALL) if not docstring_match: docstring_match = re.search(r"'''(.*?)'''", source, re.DOTALL) if docstring_match: docstring = docstring_match.group(1) lines = docstring.split("\n") current_param = None desc_lines = [] in_description = False for line in lines: stripped = line.strip() if stripped.startswith("@effect "): metadata["name"] = stripped[8:].strip() in_description = False elif stripped.startswith("@version "): metadata["version"] = stripped[9:].strip() elif stripped.startswith("@author "): metadata["author"] = stripped[8:].strip() elif stripped.startswith("@temporal "): val = stripped[10:].strip().lower() metadata["temporal"] = val in ("true", "yes", "1") elif stripped.startswith("@description"): in_description = True desc_lines = [] elif stripped.startswith("@param "): in_description = False if current_param: metadata["params"].append(current_param) parts = stripped[7:].split() if len(parts) >= 2: current_param = { "name": parts[0], "type": parts[1], "description": "", } else: current_param = None elif stripped.startswith("@range ") and current_param: range_parts = stripped[7:].split() if len(range_parts) >= 2: try: current_param["range"] = [float(range_parts[0]), float(range_parts[1])] except ValueError: pass elif stripped.startswith("@default ") and current_param: current_param["default"] = stripped[9:].strip() elif stripped.startswith("@example"): in_description = False if current_param: metadata["params"].append(current_param) current_param = None elif in_description and stripped: desc_lines.append(stripped) elif current_param and stripped and not stripped.startswith("@"): current_param["description"] = stripped if in_description: metadata["description"] = " ".join(desc_lines) if current_param: metadata["params"].append(current_param) return metadata @router.post("/upload") async def upload_effect( file: UploadFile = File(...), ctx: UserContext = Depends(require_auth), ): """ Upload an effect to the cache. Parses PEP 723 metadata and @-tag docstring. Returns content hash for use in recipes. """ content = await file.read() try: source = content.decode("utf-8") except UnicodeDecodeError: raise HTTPException(400, "Effect must be valid UTF-8 text") # Compute content hash content_hash = hashlib.sha3_256(content).hexdigest() # Parse metadata try: meta = parse_effect_metadata(source) except Exception as e: logger.warning(f"Failed to parse effect metadata: {e}") meta = {"name": file.filename or "unknown"} if not meta.get("name"): meta["name"] = Path(file.filename).stem if file.filename else "unknown" # Store effect effects_dir = get_effects_dir() effect_dir = effects_dir / content_hash effect_dir.mkdir(parents=True, exist_ok=True) # Write source (effect_dir / "effect.py").write_text(source, encoding="utf-8") # Write metadata full_meta = { "content_hash": content_hash, "meta": meta, "uploader": ctx.actor_id, "uploaded_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "filename": file.filename, } (effect_dir / "metadata.json").write_text(json.dumps(full_meta, indent=2)) logger.info(f"Uploaded effect '{meta.get('name')}' hash={content_hash[:16]}... by {ctx.actor_id}") return { "content_hash": content_hash, "name": meta.get("name"), "version": meta.get("version"), "temporal": meta.get("temporal", False), "params": meta.get("params", []), "dependencies": meta.get("dependencies", []), "uploaded": True, } @router.get("/{content_hash}") async def get_effect( content_hash: str, request: Request, ctx: UserContext = Depends(require_auth), ): """Get effect metadata by hash.""" effects_dir = get_effects_dir() effect_dir = effects_dir / content_hash metadata_path = effect_dir / "metadata.json" if not metadata_path.exists(): raise HTTPException(404, f"Effect {content_hash[:16]}... not found") meta = json.loads(metadata_path.read_text()) if wants_json(request): return meta # HTML response templates = get_templates(request) return render(templates, "effects/detail.html", request, effect=meta, user=ctx, active_tab="effects", ) @router.get("/{content_hash}/source") async def get_effect_source( content_hash: str, ctx: UserContext = Depends(require_auth), ): """Get effect source code.""" effects_dir = get_effects_dir() source_path = effects_dir / content_hash / "effect.py" if not source_path.exists(): raise HTTPException(404, f"Effect {content_hash[:16]}... not found") return PlainTextResponse(source_path.read_text()) @router.get("") async def list_effects( request: Request, ctx: UserContext = Depends(require_auth), ): """List all uploaded effects.""" effects_dir = get_effects_dir() effects = [] if effects_dir.exists(): for effect_dir in effects_dir.iterdir(): if effect_dir.is_dir(): metadata_path = effect_dir / "metadata.json" if metadata_path.exists(): try: meta = json.loads(metadata_path.read_text()) effects.append(meta) except json.JSONDecodeError: pass # Sort by upload time (newest first) effects.sort(key=lambda e: e.get("uploaded_at", ""), reverse=True) if wants_json(request): return {"effects": effects} templates = get_templates(request) return render(templates, "effects/list.html", request, effects=effects, user=ctx, active_tab="effects", ) @router.delete("/{content_hash}") async def delete_effect( content_hash: str, ctx: UserContext = Depends(require_auth), ): """Delete an effect.""" effects_dir = get_effects_dir() effect_dir = effects_dir / content_hash if not effect_dir.exists(): raise HTTPException(404, f"Effect {content_hash[:16]}... not found") # Check ownership metadata_path = effect_dir / "metadata.json" if metadata_path.exists(): meta = json.loads(metadata_path.read_text()) if meta.get("uploader") != ctx.actor_id: raise HTTPException(403, "Can only delete your own effects") import shutil shutil.rmtree(effect_dir) logger.info(f"Deleted effect {content_hash[:16]}... by {ctx.actor_id}") return {"deleted": True}