From 022f88bf0c936bf0effde3f46b7e6dc14e8101cf Mon Sep 17 00:00:00 2001 From: giles Date: Sun, 11 Jan 2026 07:46:15 +0000 Subject: [PATCH] Complete L1 router and template migration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Full implementation of runs, recipes, cache routers with templates - Auth and storage routers fully migrated - Jinja2 templates for all L1 pages - Service layer for auth and storage 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- app/__init__.py | 40 +++- app/routers/api.py | 259 +++++++++++++++++++-- app/routers/auth.py | 41 ++-- app/routers/cache.py | 339 +++++++++++++++++++++++++--- app/routers/recipes.py | 248 ++++++++++++++++++-- app/routers/runs.py | 324 ++++++++++++++++++++++++-- app/routers/storage.py | 282 +++++++++++++++++++++-- app/services/auth_service.py | 141 ++++++++++++ app/services/storage_service.py | 228 +++++++++++++++++++ app/templates/base.html | 23 ++ app/templates/cache/detail.html | 110 +++++++++ app/templates/cache/media_list.html | 110 +++++++++ app/templates/home.html | 40 ++++ app/templates/recipes/detail.html | 112 +++++++++ app/templates/recipes/list.html | 55 +++++ app/templates/runs/_run_card.html | 48 ++++ app/templates/runs/detail.html | 219 ++++++++++++++++++ app/templates/runs/list.html | 45 ++++ app/templates/storage/list.html | 90 ++++++++ app/templates/storage/type.html | 152 +++++++++++++ 20 files changed, 2771 insertions(+), 135 deletions(-) create mode 100644 app/services/auth_service.py create mode 100644 app/services/storage_service.py create mode 100644 app/templates/base.html create mode 100644 app/templates/cache/detail.html create mode 100644 app/templates/cache/media_list.html create mode 100644 app/templates/home.html create mode 100644 app/templates/recipes/detail.html create mode 100644 app/templates/recipes/list.html create mode 100644 app/templates/runs/_run_card.html create mode 100644 app/templates/runs/detail.html create mode 100644 app/templates/runs/list.html create mode 100644 app/templates/storage/list.html create mode 100644 app/templates/storage/type.html diff --git a/app/__init__.py b/app/__init__.py index b62a141..e906f7e 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -33,14 +33,44 @@ def create_app() -> FastAPI: # Include routers from .routers import auth, storage, api, recipes, cache, runs, home - # API routers + # Home and auth routers (root level) + app.include_router(home.router, tags=["home"]) app.include_router(auth.router, prefix="/auth", tags=["auth"]) + + # Feature routers app.include_router(storage.router, prefix="/storage", tags=["storage"]) app.include_router(api.router, prefix="/api", tags=["api"]) - app.include_router(recipes.router, tags=["recipes"]) - app.include_router(cache.router, tags=["cache"]) - app.include_router(runs.router, tags=["runs"]) - app.include_router(home.router, tags=["home"]) + + # Runs router - handles both /runs and /run/{id} patterns + app.include_router(runs.router, prefix="/runs", tags=["runs"]) + # Also mount at /run for single-run detail URLs + from fastapi import APIRouter + run_detail_router = APIRouter() + @run_detail_router.get("/{run_id}") + async def run_detail_redirect(run_id: str, request): + from .routers.runs import run_detail + return await run_detail(run_id, request) + app.include_router(run_detail_router, prefix="/run", tags=["runs"]) + + # Recipes router - handles both /recipes and /recipe/{id} patterns + app.include_router(recipes.router, prefix="/recipes", tags=["recipes"]) + recipe_detail_router = APIRouter() + @recipe_detail_router.get("/{recipe_id}") + async def recipe_detail_redirect(recipe_id: str, request): + from .routers.recipes import get_recipe + return await get_recipe(recipe_id, request) + app.include_router(recipe_detail_router, prefix="/recipe", tags=["recipes"]) + + # Cache router - handles /cache and /media + app.include_router(cache.router, prefix="/cache", tags=["cache"]) + # Also mount media list at /media for convenience + from fastapi import APIRouter as MediaRouter + media_router = MediaRouter() + @media_router.get("") + async def media_list_redirect(request, offset: int = 0, limit: int = 24): + from .routers.cache import list_media + return await list_media(request, offset, limit) + app.include_router(media_router, prefix="/media", tags=["media"]) return app diff --git a/app/routers/api.py b/app/routers/api.py index b186d09..fe4f644 100644 --- a/app/routers/api.py +++ b/app/routers/api.py @@ -4,37 +4,258 @@ Provides the plan/execute/run-recipe endpoints for programmatic access. """ -from fastapi import APIRouter, Request, Depends, HTTPException +import hashlib +import json +import logging +import uuid +from datetime import datetime, timezone +from typing import Dict, List, Optional -from artdag_common.models.requests import PlanRequest, ExecutePlanRequest, RecipeRunRequest +import yaml +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel -from ..dependencies import require_auth +from ..dependencies import require_auth, get_redis_client, get_cache_manager +from ..services.auth_service import UserContext router = APIRouter() +logger = logging.getLogger(__name__) + +# Redis key prefix +RUNS_KEY_PREFIX = "artdag:run:" -# TODO: Migrate routes from server.py lines 6036-6241 -# - POST /plan - Generate execution plan -# - POST /execute - Execute a plan -# - POST /run-recipe - Run complete recipe +class PlanRequest(BaseModel): + recipe_yaml: str + input_hashes: Dict[str, str] + features: List[str] = ["beats", "energy"] + + +class ExecutePlanRequest(BaseModel): + plan_json: str + run_id: Optional[str] = None + + +class RecipeRunRequest(BaseModel): + recipe_yaml: str + input_hashes: Dict[str, str] + features: List[str] = ["beats", "energy"] + + +def compute_run_id(input_hashes: List[str], recipe: str, recipe_hash: str = None) -> str: + """Compute deterministic run_id from inputs and recipe.""" + data = { + "inputs": sorted(input_hashes), + "recipe": recipe_hash or f"effect:{recipe}", + "version": "1", + } + json_str = json.dumps(data, sort_keys=True, separators=(",", ":")) + return hashlib.sha3_256(json_str.encode()).hexdigest() @router.post("/plan") -async def generate_plan(request: PlanRequest): - """Generate an execution plan from recipe without executing.""" - # TODO: Implement - raise HTTPException(501, "Not yet migrated") +async def generate_plan_endpoint( + request: PlanRequest, + ctx: UserContext = Depends(require_auth), +): + """ + Generate an execution plan without executing it. + + Phase 1 (Analyze) + Phase 2 (Plan) of the 3-phase model. + Returns the plan with cache status for each step. + """ + from tasks.orchestrate import generate_plan + + try: + task = generate_plan.delay( + recipe_yaml=request.recipe_yaml, + input_hashes=request.input_hashes, + features=request.features, + ) + + # Wait for result (plan generation is usually fast) + result = task.get(timeout=60) + + return { + "status": result.get("status"), + "recipe": result.get("recipe"), + "plan_id": result.get("plan_id"), + "total_steps": result.get("total_steps"), + "cached_steps": result.get("cached_steps"), + "pending_steps": result.get("pending_steps"), + "steps": result.get("steps"), + } + except Exception as e: + logger.error(f"Plan generation failed: {e}") + raise HTTPException(status_code=500, detail=str(e)) @router.post("/execute") -async def execute_plan(request: ExecutePlanRequest): - """Execute a previously generated plan.""" - # TODO: Implement - raise HTTPException(501, "Not yet migrated") +async def execute_plan_endpoint( + request: ExecutePlanRequest, + ctx: UserContext = Depends(require_auth), +): + """ + Execute a pre-generated execution plan. + + Phase 3 (Execute) of the 3-phase model. + Submits the plan to Celery for parallel execution. + """ + from tasks.orchestrate import run_plan + + run_id = request.run_id or str(uuid.uuid4()) + + try: + task = run_plan.delay( + plan_json=request.plan_json, + run_id=run_id, + ) + + return { + "status": "submitted", + "run_id": run_id, + "celery_task_id": task.id, + } + except Exception as e: + logger.error(f"Plan execution failed: {e}") + raise HTTPException(status_code=500, detail=str(e)) @router.post("/run-recipe") -async def run_recipe(request: RecipeRunRequest): - """Run a complete recipe through all 3 phases.""" - # TODO: Implement - raise HTTPException(501, "Not yet migrated") +async def run_recipe_endpoint( + request: RecipeRunRequest, + ctx: UserContext = Depends(require_auth), +): + """ + Run a complete recipe through all 3 phases. + + 1. Analyze: Extract features from inputs + 2. Plan: Generate execution plan with cache IDs + 3. Execute: Run steps with parallel execution + + Returns immediately with run_id. Poll /api/run/{run_id} for status. + """ + from tasks.orchestrate import run_recipe + import database + + redis = get_redis_client() + cache = get_cache_manager() + + # Parse recipe name + try: + recipe_data = yaml.safe_load(request.recipe_yaml) + recipe_name = recipe_data.get("name", "unknown") + except Exception: + recipe_name = "unknown" + + # Compute deterministic run_id + run_id = compute_run_id( + list(request.input_hashes.values()), + recipe_name, + hashlib.sha3_256(request.recipe_yaml.encode()).hexdigest() + ) + + # Check if already completed + cached = await database.get_run_cache(run_id) + if cached: + output_hash = cached.get("output_hash") + if cache.has_content(output_hash): + return { + "status": "completed", + "run_id": run_id, + "output_hash": output_hash, + "output_ipfs_cid": cache.get_ipfs_cid(output_hash), + "cached": True, + } + + # Submit to Celery + try: + task = run_recipe.delay( + recipe_yaml=request.recipe_yaml, + input_hashes=request.input_hashes, + features=request.features, + run_id=run_id, + ) + + # Store run status in Redis + run_data = { + "run_id": run_id, + "status": "pending", + "recipe": recipe_name, + "inputs": list(request.input_hashes.values()), + "celery_task_id": task.id, + "created_at": datetime.now(timezone.utc).isoformat(), + "username": ctx.actor_id, + } + redis.setex( + f"{RUNS_KEY_PREFIX}{run_id}", + 86400, + json.dumps(run_data) + ) + + return { + "status": "submitted", + "run_id": run_id, + "celery_task_id": task.id, + "recipe": recipe_name, + } + except Exception as e: + logger.error(f"Recipe run failed: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/run/{run_id}") +async def get_run_status( + run_id: str, + ctx: UserContext = Depends(require_auth), +): + """Get status of a recipe execution run.""" + import database + from celery.result import AsyncResult + + redis = get_redis_client() + + # Check Redis for run status + run_data = redis.get(f"{RUNS_KEY_PREFIX}{run_id}") + if run_data: + data = json.loads(run_data) + + # If pending, check Celery task status + if data.get("status") == "pending" and data.get("celery_task_id"): + result = AsyncResult(data["celery_task_id"]) + + if result.ready(): + if result.successful(): + task_result = result.get() + data["status"] = task_result.get("status", "completed") + data["output_hash"] = task_result.get("output_cache_id") + data["output_ipfs_cid"] = task_result.get("output_ipfs_cid") + data["total_steps"] = task_result.get("total_steps") + data["cached"] = task_result.get("cached") + data["executed"] = task_result.get("executed") + + # Update Redis + redis.setex( + f"{RUNS_KEY_PREFIX}{run_id}", + 86400, + json.dumps(data) + ) + else: + data["status"] = "failed" + data["error"] = str(result.result) + else: + data["celery_status"] = result.status + + return data + + # Check database cache + cached = await database.get_run_cache(run_id) + if cached: + return { + "run_id": run_id, + "status": "completed", + "output_hash": cached.get("output_hash"), + "cached": True, + } + + raise HTTPException(status_code=404, detail="Run not found") diff --git a/app/routers/auth.py b/app/routers/auth.py index 29a3cc5..3c0da4f 100644 --- a/app/routers/auth.py +++ b/app/routers/auth.py @@ -10,16 +10,18 @@ from fastapi.responses import RedirectResponse from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from pydantic import BaseModel -# Import auth utilities from existing server module -# TODO: Move these to a service -import sys -import os -sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) +from ..dependencies import get_redis_client +from ..services.auth_service import AuthService router = APIRouter() security = HTTPBearer(auto_error=False) +def get_auth_service(): + """Get auth service instance.""" + return AuthService(get_redis_client()) + + class RevokeUserRequest(BaseModel): """Request to revoke all tokens for a user.""" username: str @@ -27,26 +29,27 @@ class RevokeUserRequest(BaseModel): @router.get("") -async def auth_callback(auth_token: str = None): +async def auth_callback( + request: Request, + auth_token: str = None, + auth_service: AuthService = Depends(get_auth_service), +): """ Receive auth token from L2 redirect and set local cookie. This enables cross-subdomain auth on iOS Safari which blocks shared cookies. L2 redirects here with ?auth_token=... after user logs in. """ - # Import here to avoid circular imports - from server import get_verified_user_context, register_user_token - if not auth_token: return RedirectResponse(url="/", status_code=302) # Verify the token is valid - ctx = await get_verified_user_context(auth_token) + ctx = await auth_service.verify_token_with_l2(auth_token) if not ctx: return RedirectResponse(url="/", status_code=302) # Register token for this user (for revocation by username later) - register_user_token(ctx.username, auth_token) + auth_service.register_user_token(ctx.username, auth_token) # Set local first-party cookie and redirect to runs response = RedirectResponse(url="/runs", status_code=302) @@ -76,41 +79,41 @@ async def logout(): @router.post("/revoke") async def revoke_token( credentials: HTTPAuthorizationCredentials = Depends(security), + auth_service: AuthService = Depends(get_auth_service), ): """ Revoke a token. Called by L2 when user logs out. The token to revoke is passed in the Authorization header. """ - from server import get_user_context_from_token, revoke_token as do_revoke_token - if not credentials: raise HTTPException(401, "No token provided") token = credentials.credentials # Verify token is valid before revoking (ensures caller has the token) - ctx = get_user_context_from_token(token) + ctx = auth_service.get_user_context_from_token(token) if not ctx: raise HTTPException(401, "Invalid token") # Revoke the token - newly_revoked = do_revoke_token(token) + newly_revoked = auth_service.revoke_token(token) return {"revoked": True, "newly_revoked": newly_revoked} @router.post("/revoke-user") -async def revoke_user_tokens(request: RevokeUserRequest): +async def revoke_user_tokens( + request: RevokeUserRequest, + auth_service: AuthService = Depends(get_auth_service), +): """ Revoke all tokens for a user. Called by L2 when user logs out. This handles the case where L2 issued scoped tokens that differ from L2's own token. """ - from server import revoke_all_user_tokens - # Revoke all tokens registered for this user - count = revoke_all_user_tokens(request.username) + count = auth_service.revoke_all_user_tokens(request.username) return { "revoked": True, diff --git a/app/routers/cache.py b/app/routers/cache.py index ecaee8e..d2aed9f 100644 --- a/app/routers/cache.py +++ b/app/routers/cache.py @@ -4,52 +4,333 @@ Cache and media routes for L1 server. Handles content retrieval, metadata, media preview, and publishing. """ -from fastapi import APIRouter, Request, Depends, HTTPException +import logging +from pathlib import Path +from typing import Optional, Dict, Any + +from fastapi import APIRouter, Request, Depends, HTTPException, UploadFile, File from fastapi.responses import HTMLResponse, FileResponse +from pydantic import BaseModel -from artdag_common.middleware import UserContext, wants_html +from artdag_common import render +from artdag_common.middleware import wants_html, wants_json -from ..dependencies import require_auth, get_templates, get_current_user +from ..dependencies import ( + require_auth, get_templates, get_redis_client, + get_cache_manager, get_current_user +) +from ..services.auth_service import UserContext, AuthService +from ..services.cache_service import CacheService router = APIRouter() +logger = logging.getLogger(__name__) -# TODO: Migrate routes from server.py lines 2767-4200 -# - GET /cache/{content_hash} - Get content details -# - GET /cache/{content_hash}/raw - Download raw file -# - GET /cache/{content_hash}/mp4 - Video conversion -# - GET /cache/{content_hash}/meta - Get metadata -# - PATCH /cache/{content_hash}/meta - Update metadata -# - POST /cache/{content_hash}/publish - Publish to L2 -# - PATCH /cache/{content_hash}/republish - Republish -# - DELETE /cache/{content_hash} - Delete content -# - POST /cache/import - Import from IPFS -# - POST /cache/upload - Upload content -# - GET /media - Media list +class UpdateMetadataRequest(BaseModel): + title: Optional[str] = None + description: Optional[str] = None + tags: Optional[list] = None + custom: Optional[Dict[str, Any]] = None -@router.get("/cache/{content_hash}") -async def get_cache_item( +def get_cache_service(): + """Get cache service instance.""" + import database + return CacheService(database, get_cache_manager()) + + +@router.get("/{content_hash}") +async def get_cached( content_hash: str, request: Request, + cache_service: CacheService = Depends(get_cache_service), ): - """Get cached content details or serve the file.""" - # TODO: Implement with content negotiation - raise HTTPException(501, "Not yet migrated") + """Get cached content by hash. Content negotiation: HTML for browsers, JSON for APIs.""" + auth_service = AuthService(get_redis_client()) + ctx = auth_service.get_user_from_cookie(request) + + cache_item = await cache_service.get_cache_item(content_hash) + if not cache_item: + if wants_html(request): + templates = get_templates(request) + return render(templates, "cache/not_found.html", request, + content_hash=content_hash, + user=ctx, + active_tab="media", + ) + raise HTTPException(404, f"Content {content_hash} not in cache") + + # JSON response + if wants_json(request): + return cache_item + + # HTML response + if not ctx: + from fastapi.responses import RedirectResponse + return RedirectResponse(url="/auth", status_code=302) + + # Check access + has_access = await cache_service.check_access(content_hash, ctx.actor_id, ctx.username) + if not has_access: + raise HTTPException(403, "Access denied") + + templates = get_templates(request) + return render(templates, "cache/detail.html", request, + cache=cache_item, + user=ctx, + active_tab="media", + ) -@router.get("/cache/{content_hash}/raw") -async def download_raw(content_hash: str): - """Download the raw cached file.""" - # TODO: Implement - raise HTTPException(501, "Not yet migrated") +@router.get("/{content_hash}/raw") +async def get_cached_raw( + content_hash: str, + cache_service: CacheService = Depends(get_cache_service), +): + """Get raw cached content (file download).""" + file_path, media_type, filename = await cache_service.get_raw_file(content_hash) + + if not file_path: + raise HTTPException(404, f"Content {content_hash} not in cache") + + return FileResponse(file_path, media_type=media_type, filename=filename) -@router.get("/media") +@router.get("/{content_hash}/mp4") +async def get_cached_mp4( + content_hash: str, + cache_service: CacheService = Depends(get_cache_service), +): + """Get cached content as MP4 (transcodes MKV on first request).""" + mp4_path, error = await cache_service.get_as_mp4(content_hash) + + if error: + raise HTTPException(400 if "not a video" in error else 404, error) + + return FileResponse(mp4_path, media_type="video/mp4") + + +@router.get("/{content_hash}/meta") +async def get_metadata( + content_hash: str, + ctx: UserContext = Depends(require_auth), + cache_service: CacheService = Depends(get_cache_service), +): + """Get content metadata.""" + meta = await cache_service.get_metadata(content_hash, ctx.actor_id) + if meta is None: + raise HTTPException(404, "Content not found") + return meta + + +@router.patch("/{content_hash}/meta") +async def update_metadata( + content_hash: str, + req: UpdateMetadataRequest, + ctx: UserContext = Depends(require_auth), + cache_service: CacheService = Depends(get_cache_service), +): + """Update content metadata.""" + success, error = await cache_service.update_metadata( + content_hash=content_hash, + actor_id=ctx.actor_id, + title=req.title, + description=req.description, + tags=req.tags, + custom=req.custom, + ) + + if error: + raise HTTPException(400, error) + + return {"updated": True} + + +@router.post("/{content_hash}/publish") +async def publish_content( + content_hash: str, + request: Request, + ctx: UserContext = Depends(require_auth), + cache_service: CacheService = Depends(get_cache_service), +): + """Publish content to L2 and IPFS.""" + ipfs_cid, error = await cache_service.publish_to_l2( + content_hash=content_hash, + actor_id=ctx.actor_id, + l2_server=ctx.l2_server, + auth_token=request.cookies.get("auth_token"), + ) + + if error: + if wants_html(request): + return HTMLResponse(f'{error}') + raise HTTPException(400, error) + + if wants_html(request): + return HTMLResponse(f'Published: {ipfs_cid[:16]}...') + + return {"ipfs_cid": ipfs_cid, "published": True} + + +@router.delete("/{content_hash}") +async def delete_content( + content_hash: str, + ctx: UserContext = Depends(require_auth), + cache_service: CacheService = Depends(get_cache_service), +): + """Delete content from cache.""" + success, error = await cache_service.delete_content(content_hash, ctx.actor_id) + + if error: + raise HTTPException(400 if "Cannot" in error or "pinned" in error else 404, error) + + return {"deleted": True} + + +@router.post("/import") +async def import_from_ipfs( + ipfs_cid: str, + ctx: UserContext = Depends(require_auth), + cache_service: CacheService = Depends(get_cache_service), +): + """Import content from IPFS.""" + content_hash, error = await cache_service.import_from_ipfs(ipfs_cid, ctx.actor_id) + + if error: + raise HTTPException(400, error) + + return {"content_hash": content_hash, "imported": True} + + +@router.post("/upload") +async def upload_content( + file: UploadFile = File(...), + ctx: UserContext = Depends(require_auth), + cache_service: CacheService = Depends(get_cache_service), +): + """Upload content to cache.""" + content = await file.read() + content_hash, error = await cache_service.upload_content( + content=content, + filename=file.filename, + actor_id=ctx.actor_id, + ) + + if error: + raise HTTPException(400, error) + + return {"content_hash": content_hash, "uploaded": True} + + +# Media listing endpoint +@router.get("") async def list_media( request: Request, - user: UserContext = Depends(require_auth), + offset: int = 0, + limit: int = 24, + media_type: Optional[str] = None, + cache_service: CacheService = Depends(get_cache_service), ): """List all media in cache.""" - # TODO: Implement - raise HTTPException(501, "Not yet migrated") + auth_service = AuthService(get_redis_client()) + ctx = auth_service.get_user_from_cookie(request) + + if not ctx: + if wants_json(request): + raise HTTPException(401, "Authentication required") + from fastapi.responses import RedirectResponse + return RedirectResponse(url="/auth", status_code=302) + + items = await cache_service.list_media( + actor_id=ctx.actor_id, + username=ctx.username, + offset=offset, + limit=limit, + media_type=media_type, + ) + has_more = len(items) >= limit + + if wants_json(request): + return {"items": items, "offset": offset, "limit": limit, "has_more": has_more} + + templates = get_templates(request) + return render(templates, "cache/media_list.html", request, + items=items, + user=ctx, + offset=offset, + limit=limit, + has_more=has_more, + active_tab="media", + ) + + +# HTMX metadata form +@router.get("/{content_hash}/meta-form", response_class=HTMLResponse) +async def get_metadata_form( + content_hash: str, + request: Request, + cache_service: CacheService = Depends(get_cache_service), +): + """Get metadata editing form (HTMX).""" + auth_service = AuthService(get_redis_client()) + ctx = auth_service.get_user_from_cookie(request) + + if not ctx: + return HTMLResponse('
Login required
') + + meta = await cache_service.get_metadata(content_hash, ctx.actor_id) + + return HTMLResponse(f''' +

Metadata

+
+
+ + +
+
+ + +
+ +
+ ''') + + +@router.patch("/{content_hash}/meta", response_class=HTMLResponse) +async def update_metadata_htmx( + content_hash: str, + request: Request, + cache_service: CacheService = Depends(get_cache_service), +): + """Update metadata (HTMX form handler).""" + auth_service = AuthService(get_redis_client()) + ctx = auth_service.get_user_from_cookie(request) + + if not ctx: + return HTMLResponse('
Login required
') + + form_data = await request.form() + + success, error = await cache_service.update_metadata( + content_hash=content_hash, + actor_id=ctx.actor_id, + title=form_data.get("title"), + description=form_data.get("description"), + ) + + if error: + return HTMLResponse(f'
{error}
') + + return HTMLResponse(''' +
Metadata saved!
+ + ''') diff --git a/app/routers/recipes.py b/app/routers/recipes.py index 74595b8..ad8be55 100644 --- a/app/routers/recipes.py +++ b/app/routers/recipes.py @@ -4,44 +4,246 @@ Recipe management routes for L1 server. Handles recipe upload, listing, viewing, and execution. """ -from fastapi import APIRouter, Request, Depends, HTTPException +import logging +from typing import List, Optional + +from fastapi import APIRouter, Request, Depends, HTTPException, UploadFile, File from fastapi.responses import HTMLResponse +from pydantic import BaseModel -from artdag_common.middleware import UserContext, wants_html +from artdag_common import render +from artdag_common.middleware import wants_html, wants_json -from ..dependencies import require_auth, get_templates +from ..dependencies import require_auth, get_templates, get_redis_client +from ..services.auth_service import UserContext, AuthService +from ..services.recipe_service import RecipeService router = APIRouter() +logger = logging.getLogger(__name__) -# TODO: Migrate routes from server.py lines 1990-2767 -# - POST /recipes/upload - Upload recipe YAML -# - GET /recipes - List recipes -# - GET /recipes/{recipe_id} - Get recipe details -# - DELETE /recipes/{recipe_id} - Delete recipe -# - POST /recipes/{recipe_id}/run - Run recipe -# - GET /recipe/{recipe_id} - Recipe detail page -# - GET /recipe/{recipe_id}/dag - Recipe DAG visualization -# - POST /ui/recipes/{recipe_id}/run - Run from UI -# - GET /ui/recipes-list - Recipes list UI +class RecipeUploadRequest(BaseModel): + yaml_content: str + name: Optional[str] = None + description: Optional[str] = None -@router.get("/recipes") +def get_recipe_service(): + """Get recipe service instance.""" + return RecipeService(get_redis_client()) + + +@router.post("/upload") +async def upload_recipe( + req: RecipeUploadRequest, + ctx: UserContext = Depends(require_auth), + recipe_service: RecipeService = Depends(get_recipe_service), +): + """Upload a new recipe from YAML.""" + recipe_id, error = await recipe_service.upload_recipe( + yaml_content=req.yaml_content, + uploader=ctx.actor_id, + name=req.name, + description=req.description, + ) + + if error: + raise HTTPException(400, error) + + return {"recipe_id": recipe_id, "message": "Recipe uploaded successfully"} + + +@router.get("") async def list_recipes( request: Request, - user: UserContext = Depends(require_auth), + offset: int = 0, + limit: int = 20, + recipe_service: RecipeService = Depends(get_recipe_service), ): """List available recipes.""" - # TODO: Implement - raise HTTPException(501, "Not yet migrated") + auth_service = AuthService(get_redis_client()) + ctx = auth_service.get_user_from_cookie(request) + + if not ctx: + if wants_json(request): + raise HTTPException(401, "Authentication required") + from fastapi.responses import RedirectResponse + return RedirectResponse(url="/auth", status_code=302) + + recipes = await recipe_service.list_recipes(ctx.actor_id, offset=offset, limit=limit) + + if wants_json(request): + return {"recipes": recipes, "offset": offset, "limit": limit} + + templates = get_templates(request) + return render(templates, "recipes/list.html", request, + recipes=recipes, + user=ctx, + active_tab="recipes", + ) -@router.get("/recipe/{recipe_id}") -async def recipe_detail( +@router.get("/{recipe_id}") +async def get_recipe( recipe_id: str, request: Request, - user: UserContext = Depends(require_auth), + recipe_service: RecipeService = Depends(get_recipe_service), ): - """Recipe detail page with DAG visualization.""" - # TODO: Implement - raise HTTPException(501, "Not yet migrated") + """Get recipe details.""" + auth_service = AuthService(get_redis_client()) + ctx = auth_service.get_user_from_cookie(request) + + if not ctx: + if wants_json(request): + raise HTTPException(401, "Authentication required") + from fastapi.responses import RedirectResponse + return RedirectResponse(url="/auth", status_code=302) + + recipe = await recipe_service.get_recipe(recipe_id) + if not recipe: + raise HTTPException(404, "Recipe not found") + + if wants_json(request): + return recipe + + # Build DAG elements for visualization + dag_elements = [] + node_colors = { + "input": "#3b82f6", + "effect": "#8b5cf6", + "analyze": "#ec4899", + "transform": "#10b981", + "output": "#f59e0b", + } + + for i, step in enumerate(recipe.get("steps", [])): + step_id = step.get("id", f"step-{i}") + dag_elements.append({ + "data": { + "id": step_id, + "label": step.get("name", f"Step {i+1}"), + "color": node_colors.get(step.get("type", "effect"), "#6b7280"), + } + }) + for inp in step.get("inputs", []): + dag_elements.append({ + "data": {"source": inp, "target": step_id} + }) + + templates = get_templates(request) + return render(templates, "recipes/detail.html", request, + recipe=recipe, + dag_elements=dag_elements, + user=ctx, + active_tab="recipes", + ) + + +@router.delete("/{recipe_id}") +async def delete_recipe( + recipe_id: str, + ctx: UserContext = Depends(require_auth), + recipe_service: RecipeService = Depends(get_recipe_service), +): + """Delete a recipe.""" + success, error = await recipe_service.delete_recipe(recipe_id, ctx.actor_id) + if error: + raise HTTPException(400 if "Cannot" in error else 404, error) + return {"deleted": True, "recipe_id": recipe_id} + + +@router.post("/{recipe_id}/run") +async def run_recipe( + recipe_id: str, + inputs: List[str], + ctx: UserContext = Depends(require_auth), + recipe_service: RecipeService = Depends(get_recipe_service), +): + """Run a recipe with given inputs.""" + from ..services.run_service import RunService + from ..dependencies import get_cache_manager + import database + + recipe = await recipe_service.get_recipe(recipe_id) + if not recipe: + raise HTTPException(404, "Recipe not found") + + # Create run using run service + run_service = RunService(database, get_redis_client(), get_cache_manager()) + run, error = await run_service.create_run( + recipe=recipe.get("name", recipe_id), + inputs=inputs, + use_dag=True, + actor_id=ctx.actor_id, + l2_server=ctx.l2_server, + ) + + if error: + raise HTTPException(400, error) + + return { + "run_id": run.run_id, + "status": run.status, + "message": "Recipe execution started", + } + + +@router.get("/{recipe_id}/dag") +async def recipe_dag( + recipe_id: str, + request: Request, + recipe_service: RecipeService = Depends(get_recipe_service), +): + """Get recipe DAG visualization data.""" + recipe = await recipe_service.get_recipe(recipe_id) + if not recipe: + raise HTTPException(404, "Recipe not found") + + dag_elements = [] + node_colors = { + "input": "#3b82f6", + "effect": "#8b5cf6", + "analyze": "#ec4899", + "transform": "#10b981", + "output": "#f59e0b", + } + + for i, step in enumerate(recipe.get("steps", [])): + step_id = step.get("id", f"step-{i}") + dag_elements.append({ + "data": { + "id": step_id, + "label": step.get("name", f"Step {i+1}"), + "color": node_colors.get(step.get("type", "effect"), "#6b7280"), + } + }) + for inp in step.get("inputs", []): + dag_elements.append({ + "data": {"source": inp, "target": step_id} + }) + + return {"elements": dag_elements} + + +@router.delete("/{recipe_id}/ui", response_class=HTMLResponse) +async def ui_discard_recipe( + recipe_id: str, + request: Request, + recipe_service: RecipeService = Depends(get_recipe_service), +): + """HTMX handler: discard a recipe.""" + auth_service = AuthService(get_redis_client()) + ctx = auth_service.get_user_from_cookie(request) + + if not ctx: + return HTMLResponse('
Login required
', status_code=401) + + success, error = await recipe_service.delete_recipe(recipe_id, ctx.actor_id) + + if error: + return HTMLResponse(f'
{error}
') + + return HTMLResponse( + '
Recipe deleted
' + '' + ) diff --git a/app/routers/runs.py b/app/routers/runs.py index b27fce4..aee682b 100644 --- a/app/routers/runs.py +++ b/app/routers/runs.py @@ -4,54 +4,332 @@ Run management routes for L1 server. Handles run creation, status, listing, and detail views. """ +import asyncio +import json +import logging +from datetime import datetime, timezone +from typing import List, Optional, Dict, Any + from fastapi import APIRouter, Request, Depends, HTTPException from fastapi.responses import HTMLResponse +from pydantic import BaseModel -from artdag_common.middleware import UserContext, wants_html +from artdag_common import render +from artdag_common.middleware import wants_html, wants_json -from ..dependencies import require_auth, get_templates, get_current_user +from ..dependencies import ( + require_auth, get_templates, get_current_user, + get_redis_client, get_cache_manager +) +from ..services.auth_service import UserContext +from ..services.run_service import RunService router = APIRouter() +logger = logging.getLogger(__name__) + +RUNS_KEY_PREFIX = "artdag:run:" -# TODO: Migrate routes from server.py lines 675-1789 -# - POST /runs - Create run -# - GET /runs/{run_id} - Get run status -# - DELETE /runs/{run_id} - Delete run -# - GET /runs - List runs -# - GET /run/{run_id} - Run detail page -# - GET /run/{run_id}/plan - Plan visualization -# - GET /run/{run_id}/analysis - Analysis results -# - GET /run/{run_id}/artifacts - Artifacts list +class RunRequest(BaseModel): + recipe: str + inputs: List[str] + output_name: Optional[str] = None + use_dag: bool = True + dag_json: Optional[str] = None -@router.get("/runs") +class RunStatus(BaseModel): + run_id: str + status: str + recipe: str + inputs: List[str] + output_name: Optional[str] = None + created_at: Optional[str] = None + completed_at: Optional[str] = None + output_hash: Optional[str] = None + username: Optional[str] = None + provenance_cid: Optional[str] = None + celery_task_id: Optional[str] = None + error: Optional[str] = None + plan_id: Optional[str] = None + plan_name: Optional[str] = None + step_results: Optional[Dict[str, Any]] = None + all_outputs: Optional[List[str]] = None + effects_commit: Optional[str] = None + effect_url: Optional[str] = None + infrastructure: Optional[Dict[str, Any]] = None + + +def get_run_service(): + """Get run service instance.""" + import database + return RunService(database, get_redis_client(), get_cache_manager()) + + +@router.post("", response_model=RunStatus) +async def create_run( + request: RunRequest, + ctx: UserContext = Depends(require_auth), + run_service: RunService = Depends(get_run_service), +): + """Start a new rendering run. Checks cache before executing.""" + run, error = await run_service.create_run( + recipe=request.recipe, + inputs=request.inputs, + output_name=request.output_name, + use_dag=request.use_dag, + dag_json=request.dag_json, + actor_id=ctx.actor_id, + l2_server=ctx.l2_server, + ) + + if error: + raise HTTPException(400, error) + + return run + + +@router.get("/{run_id}", response_model=RunStatus) +async def get_run( + run_id: str, + run_service: RunService = Depends(get_run_service), +): + """Get status of a run.""" + run = await run_service.get_run(run_id) + if not run: + raise HTTPException(404, f"Run {run_id} not found") + return run + + +@router.delete("/{run_id}") +async def discard_run( + run_id: str, + ctx: UserContext = Depends(require_auth), + run_service: RunService = Depends(get_run_service), +): + """Discard (delete) a run and its outputs.""" + success, error = await run_service.discard_run(run_id, ctx.actor_id, ctx.username) + if error: + raise HTTPException(400 if "Cannot" in error else 404, error) + return {"discarded": True, "run_id": run_id} + + +@router.get("") async def list_runs( request: Request, - user: UserContext = Depends(require_auth), + offset: int = 0, + limit: int = 20, + run_service: RunService = Depends(get_run_service), ): """List all runs for the current user.""" - # TODO: Implement - raise HTTPException(501, "Not yet migrated") + from ..services.auth_service import AuthService + + auth_service = AuthService(get_redis_client()) + ctx = auth_service.get_user_from_cookie(request) + + if not ctx: + if wants_json(request): + raise HTTPException(401, "Authentication required") + from fastapi.responses import RedirectResponse + return RedirectResponse(url="/auth", status_code=302) + + runs = await run_service.list_runs(ctx.actor_id, offset=offset, limit=limit) + has_more = len(runs) >= limit + + if wants_json(request): + return {"runs": runs, "offset": offset, "limit": limit, "has_more": has_more} + + templates = get_templates(request) + return render(templates, "runs/list.html", request, + runs=runs, + user=ctx, + offset=offset, + limit=limit, + has_more=has_more, + active_tab="runs", + ) -@router.get("/run/{run_id}") +@router.get("/{run_id}/detail") async def run_detail( run_id: str, request: Request, - user: UserContext = Depends(require_auth), + run_service: RunService = Depends(get_run_service), ): """Run detail page with tabs for plan/analysis/artifacts.""" - # TODO: Implement - raise HTTPException(501, "Not yet migrated") + from ..services.auth_service import AuthService + + auth_service = AuthService(get_redis_client()) + ctx = auth_service.get_user_from_cookie(request) + + if not ctx: + if wants_json(request): + raise HTTPException(401, "Authentication required") + from fastapi.responses import RedirectResponse + return RedirectResponse(url="/auth", status_code=302) + + run = await run_service.get_run(run_id) + if not run: + raise HTTPException(404, f"Run {run_id} not found") + + # Get plan and artifacts + plan = await run_service.get_run_plan(run_id) + artifacts = await run_service.get_run_artifacts(run_id) + + # Build DAG elements for visualization + dag_elements = [] + if plan and plan.get("steps"): + node_colors = { + "input": "#3b82f6", + "effect": "#8b5cf6", + "analyze": "#ec4899", + "transform": "#10b981", + "output": "#f59e0b", + } + for i, step in enumerate(plan["steps"]): + dag_elements.append({ + "data": { + "id": step.get("id", f"step-{i}"), + "label": step.get("name", f"Step {i+1}"), + "color": node_colors.get(step.get("type", "effect"), "#6b7280"), + } + }) + # Add edges from inputs + for inp in step.get("inputs", []): + dag_elements.append({ + "data": { + "source": inp, + "target": step.get("id", f"step-{i}"), + } + }) + + if wants_json(request): + return { + "run": run, + "plan": plan, + "artifacts": artifacts, + } + + templates = get_templates(request) + return render(templates, "runs/detail.html", request, + run=run, + plan=plan, + artifacts=artifacts, + dag_elements=dag_elements, + user=ctx, + active_tab="runs", + ) -@router.get("/run/{run_id}/plan") +@router.get("/{run_id}/plan") async def run_plan( run_id: str, request: Request, - user: UserContext = Depends(require_auth), + run_service: RunService = Depends(get_run_service), ): """Plan visualization as interactive DAG.""" - # TODO: Implement - raise HTTPException(501, "Not yet migrated") + from ..services.auth_service import AuthService + + auth_service = AuthService(get_redis_client()) + ctx = auth_service.get_user_from_cookie(request) + + if not ctx: + raise HTTPException(401, "Authentication required") + + plan = await run_service.get_run_plan(run_id) + if not plan: + raise HTTPException(404, "Plan not found for this run") + + if wants_json(request): + return plan + + # Build DAG elements + dag_elements = [] + node_colors = { + "input": "#3b82f6", + "effect": "#8b5cf6", + "analyze": "#ec4899", + "transform": "#10b981", + "output": "#f59e0b", + } + + for i, step in enumerate(plan.get("steps", [])): + step_id = step.get("id", f"step-{i}") + dag_elements.append({ + "data": { + "id": step_id, + "label": step.get("name", f"Step {i+1}"), + "color": node_colors.get(step.get("type", "effect"), "#6b7280"), + } + }) + for inp in step.get("inputs", []): + dag_elements.append({ + "data": {"source": inp, "target": step_id} + }) + + templates = get_templates(request) + return render(templates, "runs/plan.html", request, + run_id=run_id, + plan=plan, + dag_elements=dag_elements, + user=ctx, + active_tab="runs", + ) + + +@router.get("/{run_id}/artifacts") +async def run_artifacts( + run_id: str, + request: Request, + run_service: RunService = Depends(get_run_service), +): + """Get artifacts list for a run.""" + from ..services.auth_service import AuthService + + auth_service = AuthService(get_redis_client()) + ctx = auth_service.get_user_from_cookie(request) + + if not ctx: + raise HTTPException(401, "Authentication required") + + artifacts = await run_service.get_run_artifacts(run_id) + + if wants_json(request): + return {"artifacts": artifacts} + + templates = get_templates(request) + return render(templates, "runs/artifacts.html", request, + run_id=run_id, + artifacts=artifacts, + user=ctx, + active_tab="runs", + ) + + +@router.delete("/{run_id}/ui", response_class=HTMLResponse) +async def ui_discard_run( + run_id: str, + request: Request, + run_service: RunService = Depends(get_run_service), +): + """HTMX handler: discard a run.""" + from ..services.auth_service import AuthService + + auth_service = AuthService(get_redis_client()) + ctx = auth_service.get_user_from_cookie(request) + + if not ctx: + return HTMLResponse( + '
Login required
', + status_code=401 + ) + + success, error = await run_service.discard_run(run_id, ctx.actor_id, ctx.username) + + if error: + return HTMLResponse(f'
{error}
') + + return HTMLResponse( + '
Run discarded
' + '' + ) diff --git a/app/routers/storage.py b/app/routers/storage.py index b594dc1..0ffc4fa 100644 --- a/app/routers/storage.py +++ b/app/routers/storage.py @@ -4,32 +4,280 @@ Storage provider routes for L1 server. Manages user storage backends (Pinata, web3.storage, local, etc.) """ -from fastapi import APIRouter, Request, Depends, HTTPException -from fastapi.responses import HTMLResponse +from typing import Optional, Dict, Any -from artdag_common.middleware import UserContext, wants_html +from fastapi import APIRouter, Request, Depends, HTTPException, Form +from fastapi.responses import HTMLResponse, RedirectResponse +from pydantic import BaseModel -from ..dependencies import require_auth, get_templates +from artdag_common import render +from artdag_common.middleware import wants_html, wants_json + +from ..dependencies import get_database, get_current_user, require_auth, get_templates +from ..services.auth_service import UserContext +from ..services.storage_service import StorageService, STORAGE_PROVIDERS_INFO, VALID_PROVIDER_TYPES router = APIRouter() -# TODO: Migrate routes from server.py lines 5473-5761 -# - GET /storage - List storage providers -# - POST /storage - Add storage provider -# - POST /storage/add - Add via form -# - GET /storage/{storage_id} - Get storage details -# - PATCH /storage/{storage_id} - Update storage -# - DELETE /storage/{storage_id} - Delete storage -# - POST /storage/{storage_id}/test - Test connection -# - GET /storage/type/{provider_type} - Get provider config +# Import storage_providers module +import storage_providers as sp_module + + +def get_storage_service(): + """Get storage service instance.""" + import database + return StorageService(database, sp_module) + + +class AddStorageRequest(BaseModel): + provider_type: str + config: Dict[str, Any] + capacity_gb: int = 5 + provider_name: Optional[str] = None + + +class UpdateStorageRequest(BaseModel): + config: Optional[Dict[str, Any]] = None + capacity_gb: Optional[int] = None + is_active: Optional[bool] = None @router.get("") async def list_storage( request: Request, - user: UserContext = Depends(require_auth), + storage_service: StorageService = Depends(get_storage_service), ): - """List user's storage providers.""" - # TODO: Implement - raise HTTPException(501, "Not yet migrated") + """List user's storage providers. HTML for browsers, JSON for API.""" + from ..services.auth_service import AuthService + from ..dependencies import get_redis_client + + auth_service = AuthService(get_redis_client()) + ctx = auth_service.get_user_from_cookie(request) + + if not ctx: + if wants_json(request): + raise HTTPException(401, "Authentication required") + return RedirectResponse(url="/auth", status_code=302) + + storages = await storage_service.list_storages(ctx.actor_id) + + if wants_json(request): + return {"storages": storages} + + # Render HTML template + templates = get_templates(request) + return render(templates, "storage/list.html", request, + storages=storages, + user=ctx, + providers_info=STORAGE_PROVIDERS_INFO, + ) + + +@router.post("") +async def add_storage( + req: AddStorageRequest, + request: Request, + storage_service: StorageService = Depends(get_storage_service), +): + """Add a storage provider via API.""" + ctx = await require_auth(request) + + storage_id, error = await storage_service.add_storage( + actor_id=ctx.actor_id, + provider_type=req.provider_type, + config=req.config, + capacity_gb=req.capacity_gb, + provider_name=req.provider_name, + ) + + if error: + raise HTTPException(400, error) + + return {"id": storage_id, "message": "Storage provider added"} + + +@router.post("/add") +async def add_storage_form( + request: Request, + provider_type: str = Form(...), + provider_name: Optional[str] = Form(None), + description: Optional[str] = Form(None), + capacity_gb: int = Form(5), + api_key: Optional[str] = Form(None), + secret_key: Optional[str] = Form(None), + api_token: Optional[str] = Form(None), + project_id: Optional[str] = Form(None), + project_secret: Optional[str] = Form(None), + access_key: Optional[str] = Form(None), + bucket: Optional[str] = Form(None), + path: Optional[str] = Form(None), + storage_service: StorageService = Depends(get_storage_service), +): + """Add a storage provider via HTML form.""" + from ..services.auth_service import AuthService + from ..dependencies import get_redis_client + + auth_service = AuthService(get_redis_client()) + ctx = auth_service.get_user_from_cookie(request) + + if not ctx: + return HTMLResponse('
Not authenticated
', status_code=401) + + # Build config from form + form_data = { + "api_key": api_key, + "secret_key": secret_key, + "api_token": api_token, + "project_id": project_id, + "project_secret": project_secret, + "access_key": access_key, + "bucket": bucket, + "path": path, + } + config, error = storage_service.build_config_from_form(provider_type, form_data) + + if error: + return HTMLResponse(f'
{error}
') + + storage_id, error = await storage_service.add_storage( + actor_id=ctx.actor_id, + provider_type=provider_type, + config=config, + capacity_gb=capacity_gb, + provider_name=provider_name, + description=description, + ) + + if error: + return HTMLResponse(f'
{error}
') + + return HTMLResponse(f''' +
Storage provider added successfully!
+ + ''') + + +@router.get("/{storage_id}") +async def get_storage( + storage_id: int, + request: Request, + storage_service: StorageService = Depends(get_storage_service), +): + """Get a specific storage provider.""" + ctx = await require_auth(request) + + storage = await storage_service.get_storage(storage_id, ctx.actor_id) + if not storage: + raise HTTPException(404, "Storage provider not found") + + return storage + + +@router.patch("/{storage_id}") +async def update_storage( + storage_id: int, + req: UpdateStorageRequest, + request: Request, + storage_service: StorageService = Depends(get_storage_service), +): + """Update a storage provider.""" + ctx = await require_auth(request) + + success, error = await storage_service.update_storage( + storage_id=storage_id, + actor_id=ctx.actor_id, + config=req.config, + capacity_gb=req.capacity_gb, + is_active=req.is_active, + ) + + if error: + raise HTTPException(400, error) + + return {"message": "Storage provider updated"} + + +@router.delete("/{storage_id}") +async def delete_storage( + storage_id: int, + request: Request, + storage_service: StorageService = Depends(get_storage_service), +): + """Remove a storage provider.""" + from ..services.auth_service import AuthService + from ..dependencies import get_redis_client + + auth_service = AuthService(get_redis_client()) + ctx = auth_service.get_user_from_cookie(request) + + if not ctx: + raise HTTPException(401, "Not authenticated") + + success, error = await storage_service.delete_storage(storage_id, ctx.actor_id) + + if error: + raise HTTPException(400, error) + + if wants_html(request): + return HTMLResponse("") + + return {"message": "Storage provider removed"} + + +@router.post("/{storage_id}/test") +async def test_storage( + storage_id: int, + request: Request, + storage_service: StorageService = Depends(get_storage_service), +): + """Test storage provider connectivity.""" + from ..services.auth_service import AuthService + from ..dependencies import get_redis_client + + auth_service = AuthService(get_redis_client()) + ctx = auth_service.get_user_from_cookie(request) + + if not ctx: + if wants_html(request): + return HTMLResponse('Not authenticated', status_code=401) + raise HTTPException(401, "Not authenticated") + + success, message = await storage_service.test_storage(storage_id, ctx.actor_id) + + if wants_html(request): + color = "green" if success else "red" + return HTMLResponse(f'{message}') + + return {"success": success, "message": message} + + +@router.get("/type/{provider_type}") +async def storage_type_page( + provider_type: str, + request: Request, + storage_service: StorageService = Depends(get_storage_service), +): + """Page for managing storage configs of a specific type.""" + from ..services.auth_service import AuthService + from ..dependencies import get_redis_client + + auth_service = AuthService(get_redis_client()) + ctx = auth_service.get_user_from_cookie(request) + + if not ctx: + return RedirectResponse(url="/auth", status_code=302) + + if provider_type not in STORAGE_PROVIDERS_INFO: + raise HTTPException(404, "Invalid provider type") + + storages = await storage_service.list_by_type(ctx.actor_id, provider_type) + provider_info = STORAGE_PROVIDERS_INFO[provider_type] + + templates = get_templates(request) + return render(templates, "storage/type.html", request, + provider_type=provider_type, + provider_info=provider_info, + storages=storages, + user=ctx, + ) diff --git a/app/services/auth_service.py b/app/services/auth_service.py new file mode 100644 index 0000000..5515d22 --- /dev/null +++ b/app/services/auth_service.py @@ -0,0 +1,141 @@ +""" +Auth Service - token management and user verification. +""" + +import hashlib +import base64 +import json +from typing import Optional +from dataclasses import dataclass + +import httpx + +from ..config import settings + + +# Token expiry (30 days to match token lifetime) +TOKEN_EXPIRY_SECONDS = 60 * 60 * 24 * 30 + +# Redis key prefixes +REVOKED_KEY_PREFIX = "artdag:revoked:" +USER_TOKENS_PREFIX = "artdag:user_tokens:" + + +@dataclass +class UserContext: + """User context from token.""" + username: str + actor_id: str + token: Optional[str] = None + + +class AuthService: + """Service for authentication and token management.""" + + def __init__(self, redis_client): + self.redis = redis_client + + def register_user_token(self, username: str, token: str) -> None: + """Track a token for a user (for later revocation by username).""" + token_hash = hashlib.sha256(token.encode()).hexdigest() + key = f"{USER_TOKENS_PREFIX}{username}" + self.redis.sadd(key, token_hash) + self.redis.expire(key, TOKEN_EXPIRY_SECONDS) + + def revoke_token(self, token: str) -> bool: + """Add token to revocation set. Returns True if newly revoked.""" + token_hash = hashlib.sha256(token.encode()).hexdigest() + key = f"{REVOKED_KEY_PREFIX}{token_hash}" + result = self.redis.set(key, "1", ex=TOKEN_EXPIRY_SECONDS, nx=True) + return result is not None + + def revoke_token_hash(self, token_hash: str) -> bool: + """Add token hash to revocation set. Returns True if newly revoked.""" + key = f"{REVOKED_KEY_PREFIX}{token_hash}" + result = self.redis.set(key, "1", ex=TOKEN_EXPIRY_SECONDS, nx=True) + return result is not None + + def revoke_all_user_tokens(self, username: str) -> int: + """Revoke all tokens for a user. Returns count revoked.""" + key = f"{USER_TOKENS_PREFIX}{username}" + token_hashes = self.redis.smembers(key) + count = 0 + for token_hash in token_hashes: + if self.revoke_token_hash( + token_hash.decode() if isinstance(token_hash, bytes) else token_hash + ): + count += 1 + self.redis.delete(key) + return count + + def is_token_revoked(self, token: str) -> bool: + """Check if token has been revoked.""" + token_hash = hashlib.sha256(token.encode()).hexdigest() + key = f"{REVOKED_KEY_PREFIX}{token_hash}" + return self.redis.exists(key) > 0 + + def decode_token_claims(self, token: str) -> Optional[dict]: + """Decode JWT claims without verification.""" + try: + parts = token.split(".") + if len(parts) != 3: + return None + payload = parts[1] + # Add padding + padding = 4 - len(payload) % 4 + if padding != 4: + payload += "=" * padding + return json.loads(base64.urlsafe_b64decode(payload)) + except (json.JSONDecodeError, ValueError): + return None + + def get_user_context_from_token(self, token: str) -> Optional[UserContext]: + """Extract user context from a token.""" + if self.is_token_revoked(token): + return None + + claims = self.decode_token_claims(token) + if not claims: + return None + + username = claims.get("username") or claims.get("sub") + actor_id = claims.get("actor_id") or claims.get("actor") + + if not username: + return None + + return UserContext( + username=username, + actor_id=actor_id or f"@{username}", + token=token, + ) + + async def verify_token_with_l2(self, token: str) -> Optional[UserContext]: + """Verify token with L2 server.""" + ctx = self.get_user_context_from_token(token) + if not ctx: + return None + + # If L2 server configured, verify token + if settings.l2_server: + try: + async with httpx.AsyncClient() as client: + resp = await client.get( + f"{settings.l2_server}/auth/verify", + headers={"Authorization": f"Bearer {token}"}, + timeout=5.0, + ) + if resp.status_code != 200: + return None + except httpx.RequestError: + # L2 unavailable, trust the token + pass + + return ctx + + def get_user_from_cookie(self, request) -> Optional[UserContext]: + """Extract user context from auth cookie.""" + token = request.cookies.get("auth_token") + if not token: + return None + return self.get_user_context_from_token(token) diff --git a/app/services/storage_service.py b/app/services/storage_service.py new file mode 100644 index 0000000..b7349a8 --- /dev/null +++ b/app/services/storage_service.py @@ -0,0 +1,228 @@ +""" +Storage Service - business logic for storage provider management. +""" + +import json +from typing import Optional, List, Dict, Any + + +STORAGE_PROVIDERS_INFO = { + "pinata": {"name": "Pinata", "desc": "1GB free, IPFS pinning", "color": "blue"}, + "web3storage": {"name": "web3.storage", "desc": "IPFS + Filecoin", "color": "green"}, + "nftstorage": {"name": "NFT.Storage", "desc": "Free for NFTs", "color": "pink"}, + "infura": {"name": "Infura IPFS", "desc": "5GB free", "color": "orange"}, + "filebase": {"name": "Filebase", "desc": "5GB free, S3+IPFS", "color": "cyan"}, + "storj": {"name": "Storj", "desc": "25GB free", "color": "indigo"}, + "local": {"name": "Local Storage", "desc": "Your own disk", "color": "purple"}, +} + +VALID_PROVIDER_TYPES = list(STORAGE_PROVIDERS_INFO.keys()) + + +class StorageService: + """Service for managing user storage providers.""" + + def __init__(self, database, storage_providers_module): + self.db = database + self.providers = storage_providers_module + + async def list_storages(self, actor_id: str) -> List[Dict[str, Any]]: + """List all storage providers for a user with usage stats.""" + storages = await self.db.get_user_storage(actor_id) + + for storage in storages: + usage = await self.db.get_storage_usage(storage["id"]) + storage["used_bytes"] = usage["used_bytes"] + storage["pin_count"] = usage["pin_count"] + storage["donated_gb"] = storage["capacity_gb"] // 2 + + # Mask sensitive config keys for display + if storage.get("config"): + config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"]) + masked = {} + for k, v in config.items(): + if "key" in k.lower() or "token" in k.lower() or "secret" in k.lower(): + masked[k] = v[:4] + "..." + v[-4:] if len(str(v)) > 8 else "****" + else: + masked[k] = v + storage["config_display"] = masked + + return storages + + async def get_storage(self, storage_id: int, actor_id: str) -> Optional[Dict[str, Any]]: + """Get a specific storage provider.""" + storage = await self.db.get_storage_by_id(storage_id) + if not storage: + return None + if storage["actor_id"] != actor_id: + return None + + usage = await self.db.get_storage_usage(storage_id) + storage["used_bytes"] = usage["used_bytes"] + storage["pin_count"] = usage["pin_count"] + storage["donated_gb"] = storage["capacity_gb"] // 2 + + return storage + + async def add_storage( + self, + actor_id: str, + provider_type: str, + config: Dict[str, Any], + capacity_gb: int = 5, + provider_name: Optional[str] = None, + description: Optional[str] = None, + ) -> tuple[Optional[int], Optional[str]]: + """Add a new storage provider. Returns (storage_id, error_message).""" + if provider_type not in VALID_PROVIDER_TYPES: + return None, f"Invalid provider type: {provider_type}" + + # Test connection before saving + provider = self.providers.create_provider(provider_type, { + **config, + "capacity_gb": capacity_gb + }) + if not provider: + return None, "Failed to create provider with given config" + + success, message = await provider.test_connection() + if not success: + return None, f"Provider connection failed: {message}" + + # Generate name if not provided + if not provider_name: + existing = await self.db.get_user_storage_by_type(actor_id, provider_type) + provider_name = f"{provider_type}-{len(existing) + 1}" + + storage_id = await self.db.add_user_storage( + actor_id=actor_id, + provider_type=provider_type, + provider_name=provider_name, + config=config, + capacity_gb=capacity_gb, + description=description + ) + + if not storage_id: + return None, "Failed to save storage provider" + + return storage_id, None + + async def update_storage( + self, + storage_id: int, + actor_id: str, + config: Optional[Dict[str, Any]] = None, + capacity_gb: Optional[int] = None, + is_active: Optional[bool] = None, + ) -> tuple[bool, Optional[str]]: + """Update a storage provider. Returns (success, error_message).""" + storage = await self.db.get_storage_by_id(storage_id) + if not storage: + return False, "Storage provider not found" + if storage["actor_id"] != actor_id: + return False, "Not authorized" + + # Test new config if provided + if config: + existing_config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"]) + new_config = {**existing_config, **config} + provider = self.providers.create_provider(storage["provider_type"], { + **new_config, + "capacity_gb": capacity_gb or storage["capacity_gb"] + }) + if provider: + success, message = await provider.test_connection() + if not success: + return False, f"Provider connection failed: {message}" + + success = await self.db.update_user_storage( + storage_id, + config=config, + capacity_gb=capacity_gb, + is_active=is_active + ) + + return success, None if success else "Failed to update storage provider" + + async def delete_storage(self, storage_id: int, actor_id: str) -> tuple[bool, Optional[str]]: + """Delete a storage provider. Returns (success, error_message).""" + storage = await self.db.get_storage_by_id(storage_id) + if not storage: + return False, "Storage provider not found" + if storage["actor_id"] != actor_id: + return False, "Not authorized" + + success = await self.db.remove_user_storage(storage_id) + return success, None if success else "Failed to remove storage provider" + + async def test_storage(self, storage_id: int, actor_id: str) -> tuple[bool, str]: + """Test storage provider connectivity. Returns (success, message).""" + storage = await self.db.get_storage_by_id(storage_id) + if not storage: + return False, "Storage not found" + if storage["actor_id"] != actor_id: + return False, "Not authorized" + + config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"]) + provider = self.providers.create_provider(storage["provider_type"], { + **config, + "capacity_gb": storage["capacity_gb"] + }) + + if not provider: + return False, "Failed to create provider" + + return await provider.test_connection() + + async def list_by_type(self, actor_id: str, provider_type: str) -> List[Dict[str, Any]]: + """List storage providers of a specific type.""" + return await self.db.get_user_storage_by_type(actor_id, provider_type) + + def build_config_from_form(self, provider_type: str, form_data: Dict[str, Any]) -> tuple[Optional[Dict], Optional[str]]: + """Build provider config from form data. Returns (config, error).""" + api_key = form_data.get("api_key") + secret_key = form_data.get("secret_key") + api_token = form_data.get("api_token") + project_id = form_data.get("project_id") + project_secret = form_data.get("project_secret") + access_key = form_data.get("access_key") + bucket = form_data.get("bucket") + path = form_data.get("path") + + if provider_type == "pinata": + if not api_key or not secret_key: + return None, "Pinata requires API Key and Secret Key" + return {"api_key": api_key, "secret_key": secret_key}, None + + elif provider_type == "web3storage": + if not api_token: + return None, "web3.storage requires API Token" + return {"api_token": api_token}, None + + elif provider_type == "nftstorage": + if not api_token: + return None, "NFT.Storage requires API Token" + return {"api_token": api_token}, None + + elif provider_type == "infura": + if not project_id or not project_secret: + return None, "Infura requires Project ID and Project Secret" + return {"project_id": project_id, "project_secret": project_secret}, None + + elif provider_type == "filebase": + if not access_key or not secret_key or not bucket: + return None, "Filebase requires Access Key, Secret Key, and Bucket" + return {"access_key": access_key, "secret_key": secret_key, "bucket": bucket}, None + + elif provider_type == "storj": + if not access_key or not secret_key or not bucket: + return None, "Storj requires Access Key, Secret Key, and Bucket" + return {"access_key": access_key, "secret_key": secret_key, "bucket": bucket}, None + + elif provider_type == "local": + if not path: + return None, "Local storage requires a path" + return {"path": path}, None + + return None, f"Unknown provider type: {provider_type}" diff --git a/app/templates/base.html b/app/templates/base.html new file mode 100644 index 0000000..c8cf903 --- /dev/null +++ b/app/templates/base.html @@ -0,0 +1,23 @@ +{% extends "base.html" %} + +{% block brand %}Art-DAG L1{% endblock %} + +{% block nav_items %} + +{% endblock %} + +{% block nav_right %} +{% if user %} +
+ {{ user.username }} + Logout +
+{% else %} +Login +{% endif %} +{% endblock %} diff --git a/app/templates/cache/detail.html b/app/templates/cache/detail.html new file mode 100644 index 0000000..fb2166b --- /dev/null +++ b/app/templates/cache/detail.html @@ -0,0 +1,110 @@ +{% extends "base.html" %} + +{% block title %}{{ cache.hash[:16] }} - Cache - Art-DAG L1{% endblock %} + +{% block content %} +
+ +
+ ← Media +

{{ cache.hash[:24] }}...

+
+ + +
+ {% if cache.media_type and cache.media_type.startswith('image/') %} + + + {% elif cache.media_type and cache.media_type.startswith('video/') %} + + + {% elif cache.media_type and cache.media_type.startswith('audio/') %} +
+ +
+ + {% elif cache.media_type == 'application/json' %} +
+
{{ cache.content_preview }}
+
+ + {% else %} +
+
{{ cache.media_type or 'Unknown type' }}
+
{{ cache.size_bytes | filesizeformat if cache.size_bytes else 'Unknown size' }}
+
+ {% endif %} +
+ + +
+
+
Hash
+
{{ cache.hash }}
+
+
+
Content Type
+
{{ cache.media_type or 'Unknown' }}
+
+
+
Size
+
{{ cache.size_bytes | filesizeformat if cache.size_bytes else 'Unknown' }}
+
+
+
Created
+
{{ cache.created_at or 'Unknown' }}
+
+
+ + + {% if cache.ipfs_cid %} +
+
IPFS CID
+
+ {{ cache.ipfs_cid }} + + View on IPFS Gateway → + +
+
+ {% endif %} + + + {% if cache.runs %} +

Related Runs

+
+ {% for run in cache.runs %} + +
+ {{ run.run_id[:16] }}... + {{ run.created_at }} +
+
+ {% endfor %} +
+ {% endif %} + + +
+ + Download + + {% if not cache.ipfs_cid %} + + + {% endif %} +
+
+{% endblock %} diff --git a/app/templates/cache/media_list.html b/app/templates/cache/media_list.html new file mode 100644 index 0000000..ce68f83 --- /dev/null +++ b/app/templates/cache/media_list.html @@ -0,0 +1,110 @@ +{% extends "base.html" %} + +{% block title %}Media - Art-DAG L1{% endblock %} + +{% block content %} + + + +{% endblock %} diff --git a/app/templates/home.html b/app/templates/home.html new file mode 100644 index 0000000..8c1c99e --- /dev/null +++ b/app/templates/home.html @@ -0,0 +1,40 @@ +{% extends "base.html" %} + +{% block title %}Art-DAG L1{% endblock %} + +{% block content %} +
+

Art-DAG L1

+

Content-Addressable Media Processing

+ + + + {% if not user %} +
+

Sign in through your L2 server to access all features.

+ Sign In → +
+ {% endif %} +
+{% endblock %} diff --git a/app/templates/recipes/detail.html b/app/templates/recipes/detail.html new file mode 100644 index 0000000..839781a --- /dev/null +++ b/app/templates/recipes/detail.html @@ -0,0 +1,112 @@ +{% extends "base.html" %} + +{% block title %}{{ recipe.name }} - Recipe - Art-DAG L1{% endblock %} + +{% block head %} +{{ super() }} + +{% endblock %} + +{% block content %} +
+ +
+ ← Recipes +

{{ recipe.name }}

+ {% if recipe.version %} + v{{ recipe.version }} + {% endif %} +
+ + {% if recipe.description %} +

{{ recipe.description }}

+ {% endif %} + + +
+
+ Pipeline DAG + {{ recipe.steps | length }} steps +
+
+
+ + +

Steps

+
+ {% for step in recipe.steps %} + {% set colors = { + 'effect': 'blue', + 'analyze': 'purple', + 'transform': 'green', + 'combine': 'orange', + 'output': 'cyan' + } %} + {% set color = colors.get(step.type, 'gray') %} + +
+
+
+ + {{ loop.index }} + + {{ step.name }} + + {{ step.type }} + +
+
+ + {% if step.inputs %} +
+ Inputs: {{ step.inputs | join(', ') }} +
+ {% endif %} + + {% if step.params %} +
+ {{ step.params | tojson }} +
+ {% endif %} +
+ {% endfor %} +
+ + +

Source

+
+
{{ recipe.yaml }}
+
+
+ + +{% endblock %} diff --git a/app/templates/recipes/list.html b/app/templates/recipes/list.html new file mode 100644 index 0000000..feef25d --- /dev/null +++ b/app/templates/recipes/list.html @@ -0,0 +1,55 @@ +{% extends "base.html" %} + +{% block title %}Recipes - Art-DAG L1{% endblock %} + +{% block content %} +
+
+

Recipes

+
+ +

+ Recipes define processing pipelines for audio and media. Each recipe is a DAG of effects. +

+ + {% if recipes %} + + {% else %} +
+

No recipes available.

+

Recipes are defined in YAML format and submitted via API.

+
+ {% endif %} +
+{% endblock %} diff --git a/app/templates/runs/_run_card.html b/app/templates/runs/_run_card.html new file mode 100644 index 0000000..4e1484a --- /dev/null +++ b/app/templates/runs/_run_card.html @@ -0,0 +1,48 @@ +{# Run card partial - expects 'run' variable #} +{% set status_colors = { + 'completed': 'green', + 'running': 'blue', + 'pending': 'yellow', + 'failed': 'red', + 'cached': 'purple' +} %} +{% set color = status_colors.get(run.status, 'gray') %} + + +
+
+ {{ run.run_id[:12] }}... + + {{ run.status }} + + {% if run.cached %} + cached + {% endif %} +
+ {{ run.created_at }} +
+ +
+
+ + Recipe: {{ run.recipe or 'Unknown' }} + + {% if run.total_steps %} + + Steps: {{ run.executed or 0 }}/{{ run.total_steps }} + + {% endif %} +
+ + {% if run.output_hash %} + {{ run.output_hash[:16] }}... + {% endif %} +
+ + {% if run.inputs %} +
+ Inputs: {{ run.inputs | length }} file(s) +
+ {% endif %} +
diff --git a/app/templates/runs/detail.html b/app/templates/runs/detail.html new file mode 100644 index 0000000..38f608a --- /dev/null +++ b/app/templates/runs/detail.html @@ -0,0 +1,219 @@ +{% extends "base.html" %} + +{% block title %}Run {{ run.run_id[:12] }} - Art-DAG L1{% endblock %} + +{% block head %} +{{ super() }} + +{% endblock %} + +{% block content %} +{% set status_colors = {'completed': 'green', 'running': 'blue', 'pending': 'yellow', 'failed': 'red'} %} +{% set color = status_colors.get(run.status, 'gray') %} + +
+ +
+ ← Runs +

{{ run.run_id[:16] }}...

+ + {{ run.status }} + + {% if run.cached %} + Cached + {% endif %} +
+ + +
+
+
Recipe
+
{{ run.recipe or 'Unknown' }}
+
+
+
Steps
+
+ {{ run.executed or 0 }} / {{ run.total_steps or '?' }} + {% if run.cached_steps %} + ({{ run.cached_steps }} cached) + {% endif %} +
+
+
+
Created
+
{{ run.created_at }}
+
+
+
User
+
{{ run.username or 'Unknown' }}
+
+
+ + +
+ +
+ + +
+ {% if plan %} +
+ +
+ {% for step in plan.steps %} + {% set step_color = 'green' if step.cached else ('blue' if step.status == 'running' else 'gray') %} +
+
+ + {{ loop.index }} + + {{ step.name }} + {{ step.type }} +
+
+ {% if step.cached %} + cached + {% endif %} + {% if step.cache_id %} + + {{ step.cache_id[:12] }}... + + {% endif %} +
+
+ {% endfor %} +
+ {% else %} +

No plan available for this run.

+ {% endif %} +
+ + + + + + + + + + + + {% if run.output_hash %} +
+

Output

+
+ + {{ run.output_hash }} + + {% if run.output_ipfs_cid %} + + IPFS: {{ run.output_ipfs_cid[:16] }}... + + {% endif %} +
+
+ {% endif %} +
+ + +{% endblock %} diff --git a/app/templates/runs/list.html b/app/templates/runs/list.html new file mode 100644 index 0000000..8d72415 --- /dev/null +++ b/app/templates/runs/list.html @@ -0,0 +1,45 @@ +{% extends "base.html" %} + +{% block title %}Runs - Art-DAG L1{% endblock %} + +{% block content %} +
+
+

Execution Runs

+ Browse Recipes → +
+ + {% if runs %} +
+ {% for run in runs %} + {% include "runs/_run_card.html" %} + {% endfor %} +
+ + {% if has_more %} +
+ Loading more... +
+ {% endif %} + + {% else %} +
+
+ + + +

No runs yet

+
+

Execute a recipe to see your runs here.

+ + Browse Recipes + +
+ {% endif %} +
+{% endblock %} diff --git a/app/templates/storage/list.html b/app/templates/storage/list.html new file mode 100644 index 0000000..a33f98a --- /dev/null +++ b/app/templates/storage/list.html @@ -0,0 +1,90 @@ +{% extends "base.html" %} + +{% block title %}Storage Providers - Art-DAG L1{% endblock %} + +{% block content %} +
+

Storage Providers

+ +

+ Configure your IPFS pinning services. Data is pinned to your accounts, giving you full control. +

+ + + + + + {% if storages %} +

Your Storage Providers

+
+ {% for storage in storages %} + {% set info = providers_info.get(storage.provider_type, {'name': storage.provider_type, 'color': 'gray'}) %} +
+
+
+ {{ storage.provider_name or info.name }} + {% if storage.is_active %} + Active + {% else %} + Inactive + {% endif %} +
+
+ + +
+
+ +
+
+ Capacity: + {{ storage.capacity_gb }} GB +
+
+ Used: + {{ (storage.used_bytes / 1024 / 1024 / 1024) | round(2) }} GB +
+
+ Pins: + {{ storage.pin_count }} +
+
+ +
+
+ {% endfor %} +
+ {% else %} +
+

No storage providers configured yet.

+

Click on a provider above to add your first one.

+
+ {% endif %} +
+{% endblock %} diff --git a/app/templates/storage/type.html b/app/templates/storage/type.html new file mode 100644 index 0000000..851c633 --- /dev/null +++ b/app/templates/storage/type.html @@ -0,0 +1,152 @@ +{% extends "base.html" %} + +{% block title %}{{ provider_info.name }} - Storage - Art-DAG L1{% endblock %} + +{% block content %} +
+
+ ← All Providers +

{{ provider_info.name }}

+
+ +

{{ provider_info.desc }}

+ + +
+

Add {{ provider_info.name }} Account

+ +
+ + +
+ + +
+ + {% if provider_type == 'pinata' %} +
+
+ + +
+
+ + +
+
+ + {% elif provider_type in ['web3storage', 'nftstorage'] %} +
+ + +
+ + {% elif provider_type == 'infura' %} +
+
+ + +
+
+ + +
+
+ + {% elif provider_type in ['filebase', 'storj'] %} +
+
+ + +
+
+ + +
+
+
+ + +
+ + {% elif provider_type == 'local' %} +
+ + +
+ {% endif %} + +
+ + +
+ +
+ +
+ +
+
+
+ + + {% if storages %} +

Configured Accounts

+
+ {% for storage in storages %} +
+
+
+ {{ storage.provider_name }} + {% if storage.is_active %} + Active + {% endif %} +
+
+ + +
+
+ + {% if storage.config_display %} +
+ {% for key, value in storage.config_display.items() %} + {{ key }}: {{ value }} + {% endfor %} +
+ {% endif %} + +
+
+ {% endfor %} +
+ {% endif %} +
+{% endblock %}