diff --git a/app/__init__.py b/app/__init__.py
new file mode 100644
index 0000000..b62a141
--- /dev/null
+++ b/app/__init__.py
@@ -0,0 +1,49 @@
"""
Art-DAG L1 Server Application Factory.

Creates and configures the FastAPI application with all routers and middleware.
"""

from pathlib import Path
from fastapi import FastAPI
# NOTE(review): StaticFiles is imported but never used in this module —
# presumably for a future static-asset mount; confirm or remove.
from fastapi.staticfiles import StaticFiles

from artdag_common import create_jinja_env

# NOTE(review): `settings` is imported but not referenced in create_app();
# importing it does eagerly initialize the config singleton, which may be
# intentional — confirm.
from .config import settings


def create_app() -> FastAPI:
    """
    Create and configure the L1 FastAPI application.

    Builds the FastAPI instance, attaches the Jinja2 template environment to
    app.state (so route handlers can retrieve it via the get_templates
    dependency), and registers all routers.

    Returns:
        Configured FastAPI instance
    """
    app = FastAPI(
        title="Art-DAG L1 Server",
        description="Content-addressed media processing with distributed execution",
        version="1.0.0",
    )

    # Initialize Jinja2 templates.
    # Stored on app.state so routers fetch it through request.app.state
    # instead of importing this module (avoids circular imports).
    template_dir = Path(__file__).parent / "templates"
    app.state.templates = create_jinja_env(template_dir)

    # Include routers.
    # Imported inside the function (not at module top) so router modules can
    # import from this package without creating an import cycle.
    from .routers import auth, storage, api, recipes, cache, runs, home

    # API routers — auth/storage/api are mounted under prefixes; the
    # page-oriented routers (recipes/cache/runs/home) declare full paths.
    app.include_router(auth.router, prefix="/auth", tags=["auth"])
    app.include_router(storage.router, prefix="/storage", tags=["storage"])
    app.include_router(api.router, prefix="/api", tags=["api"])
    app.include_router(recipes.router, tags=["recipes"])
    app.include_router(cache.router, tags=["cache"])
    app.include_router(runs.router, tags=["runs"])
    app.include_router(home.router, tags=["home"])

    return app


# Create the default app instance (module-level, for `uvicorn app:app` style use)
app = create_app()
diff --git a/app/config.py b/app/config.py
new file mode 100644
index 0000000..4e8a7cc
--- /dev/null
+++ b/app/config.py
@@ -0,0 +1,72 @@
"""
L1 Server Configuration.

Environment-based configuration with sensible defaults.
"""

import os
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Settings:
    """Application settings loaded from environment.

    Every field uses default_factory, so environment variables are read at
    instantiation time — constructing a fresh Settings() picks up any changes
    made to os.environ after import.
    """

    # Server
    # HOST / PORT for the HTTP listener; DEBUG enabled only when the env var
    # is exactly "true" (case-insensitive).
    host: str = field(default_factory=lambda: os.environ.get("HOST", "0.0.0.0"))
    port: int = field(default_factory=lambda: int(os.environ.get("PORT", "8000")))
    debug: bool = field(default_factory=lambda: os.environ.get("DEBUG", "").lower() == "true")

    # Cache — root directory for cached content (CACHE_DIR)
    cache_dir: Path = field(
        default_factory=lambda: Path(os.environ.get("CACHE_DIR", "/data/cache"))
    )

    # Redis — connection URL (REDIS_URL); defaults to local DB 5
    redis_url: str = field(
        default_factory=lambda: os.environ.get("REDIS_URL", "redis://localhost:6379/5")
    )

    # Database — PostgreSQL DSN (DATABASE_URL)
    database_url: str = field(
        default_factory=lambda: os.environ.get(
            "DATABASE_URL", "postgresql://artdag:artdag@localhost:5432/artdag"
        )
    )

    # IPFS — multiaddr of the IPFS API (IPFS_API) and public gateway base URL
    ipfs_api: str = field(
        default_factory=lambda: os.environ.get("IPFS_API", "/dns/localhost/tcp/5001")
    )
    ipfs_gateway_url: str = field(
        default_factory=lambda: os.environ.get("IPFS_GATEWAY_URL", "https://ipfs.io/ipfs")
    )

    # L2 Server — optional upstream auth/publish server; None when unset
    l2_server: Optional[str] = field(
        default_factory=lambda: os.environ.get("L2_SERVER")
    )
    l2_domain: Optional[str] = field(
        default_factory=lambda: os.environ.get("L2_DOMAIN")
    )

    # Derived paths
    @property
    def plan_cache_dir(self) -> Path:
        """Directory for cached execution plans (under cache_dir)."""
        return self.cache_dir / "plans"

    @property
    def analysis_cache_dir(self) -> Path:
        """Directory for cached analysis results (under cache_dir)."""
        return self.cache_dir / "analysis"

    def ensure_dirs(self) -> None:
        """Create required directories."""
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.plan_cache_dir.mkdir(parents=True, exist_ok=True)
        self.analysis_cache_dir.mkdir(parents=True, exist_ok=True)


# Singleton settings instance shared by the rest of the app
settings = Settings()
diff --git a/app/dependencies.py b/app/dependencies.py
new file mode 100644
index 0000000..7a685f0
--- /dev/null
+++ b/app/dependencies.py
@@ -0,0 +1,135 @@
"""
FastAPI dependency
injection container.

Provides shared resources and services to route handlers.
"""

# NOTE(review): lru_cache and asyncio are imported but unused below — remove
# or use once the services are fleshed out.
from functools import lru_cache
from typing import Optional
import asyncio

from fastapi import Request, Depends, HTTPException
from jinja2 import Environment

from artdag_common.middleware.auth import UserContext, get_user_from_cookie, get_user_from_header

from .config import settings


# Lazy imports to avoid circular dependencies.
# Module-level singletons, created on first use by the getters below.
_redis_client = None
_cache_manager = None
_database = None


def get_redis_client():
    """Get the Redis client singleton (created lazily on first call)."""
    global _redis_client
    if _redis_client is None:
        import redis
        _redis_client = redis.from_url(settings.redis_url, decode_responses=True)
    return _redis_client


def get_cache_manager():
    """Get the cache manager singleton (created lazily on first call)."""
    global _cache_manager
    if _cache_manager is None:
        from cache_manager import get_cache_manager as _get_cache_manager
        _cache_manager = _get_cache_manager()
    return _cache_manager


def get_database():
    """Get the database singleton.

    NOTE(review): this caches the `database` *module* itself, not a
    connection object — confirm that is the intended interface.
    """
    global _database
    if _database is None:
        import database
        _database = database
    return _database


def get_templates(request: Request) -> Environment:
    """Get the Jinja2 environment stored on app.state by create_app()."""
    return request.app.state.templates


async def get_current_user(request: Request) -> Optional[UserContext]:
    """
    Get the current user from request (cookie or header).

    This is a permissive dependency - returns None if not authenticated.
    Use require_auth for routes that require authentication.

    Precedence: Authorization header (API clients) wins over cookie (browser).
    """
    # Try header first (API clients)
    ctx = get_user_from_header(request)
    if ctx:
        return ctx

    # Fall back to cookie (browser)
    return get_user_from_cookie(request)


async def require_auth(request: Request) -> UserContext:
    """
    Require authentication for a route.

    Raises:
        HTTPException 401 if not authenticated
        HTTPException 302 redirect to login for HTML requests
    """
    ctx = await get_current_user(request)
    if ctx is None:
        # Browser clients get a redirect instead of a bare 401: raising an
        # HTTPException with a Location header makes FastAPI emit a 302.
        accept = request.headers.get("accept", "")
        if "text/html" in accept:
            raise HTTPException(
                status_code=302,
                headers={"Location": "/login"}
            )
        raise HTTPException(status_code=401, detail="Authentication required")
    return ctx


async def get_user_context_from_cookie(request: Request) -> Optional[UserContext]:
    """
    Legacy compatibility: get user from cookie.

    Validates token with L2 server if configured.
    """
    ctx = get_user_from_cookie(request)
    if ctx is None:
        return None

    # If L2 server configured, could validate token here
    # For now, trust the cookie
    return ctx


# Service dependencies (lazy loading).
# Each factory builds a new service wired to the shared singletons; services
# themselves are cheap to construct.

def get_run_service():
    """Get the run service."""
    from .services.run_service import RunService
    return RunService(
        redis=get_redis_client(),
        cache=get_cache_manager(),
    )


def get_recipe_service():
    """Get the recipe service."""
    from .services.recipe_service import RecipeService
    return RecipeService(
        redis=get_redis_client(),
        cache=get_cache_manager(),
    )


def get_cache_service():
    """Get the cache service."""
    from .services.cache_service import CacheService
    return CacheService(
        cache_manager=get_cache_manager(),
        database=get_database(),
    )
diff --git a/app/repositories/__init__.py b/app/repositories/__init__.py
new file mode 100644
index 0000000..7985294
--- /dev/null
+++ b/app/repositories/__init__.py
@@ -0,0 +1,10 @@
"""
L1 Server Repositories.

Data access layer for persistence operations.
"""

# TODO: Implement repositories
# - RunRepository - Redis-backed run storage
# - RecipeRepository - Redis-backed recipe storage
# - CacheRepository - Filesystem + PostgreSQL cache metadata
diff --git a/app/routers/__init__.py b/app/routers/__init__.py
new file mode 100644
index 0000000..f0a9d54
--- /dev/null
+++ b/app/routers/__init__.py
@@ -0,0 +1,23 @@
"""
L1 Server Routers.

Each router handles a specific domain of functionality.
"""

from . import auth
from . import storage
from . import api
from . import recipes
from . import cache
from . import runs
from . import home

__all__ = [
    "auth",
    "storage",
    "api",
    "recipes",
    "cache",
    "runs",
    "home",
]
diff --git a/app/routers/api.py b/app/routers/api.py
new file mode 100644
index 0000000..b186d09
--- /dev/null
+++ b/app/routers/api.py
@@ -0,0 +1,40 @@
"""
3-phase API routes for L1 server.

Provides the plan/execute/run-recipe endpoints for programmatic access.
"""

# NOTE(review): Request, Depends and require_auth are imported but unused
# until the migration below is completed.
from fastapi import APIRouter, Request, Depends, HTTPException

from artdag_common.models.requests import PlanRequest, ExecutePlanRequest, RecipeRunRequest

from ..dependencies import require_auth

router = APIRouter()


# TODO: Migrate routes from server.py lines 6036-6241
# - POST /plan - Generate execution plan
# - POST /execute - Execute a plan
# - POST /run-recipe - Run complete recipe


@router.post("/plan")
async def generate_plan(request: PlanRequest):
    """Generate an execution plan from recipe without executing."""
    # TODO: Implement — currently a stub returning 501 Not Implemented
    raise HTTPException(501, "Not yet migrated")


@router.post("/execute")
async def execute_plan(request: ExecutePlanRequest):
    """Execute a previously generated plan."""
    # TODO: Implement — currently a stub returning 501 Not Implemented
    raise HTTPException(501, "Not yet migrated")


@router.post("/run-recipe")
async def run_recipe(request: RecipeRunRequest):
    """Run a complete recipe through all 3 phases."""
    # TODO: Implement — currently a stub returning 501 Not Implemented
    raise HTTPException(501, "Not yet migrated")
diff --git
a/app/routers/auth.py b/app/routers/auth.py
new file mode 100644
index 0000000..29a3cc5
--- /dev/null
+++ b/app/routers/auth.py
@@ -0,0 +1,119 @@
"""
Authentication routes for L1 server.

L1 doesn't handle login directly - users log in at their L2 server.
Token is passed via URL from L2 redirect, then L1 sets its own cookie.
"""

from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import RedirectResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel

# Import auth utilities from existing server module
# TODO: Move these to a service
# HACK: extend sys.path so the legacy top-level `server` module (three
# directories up) can be imported from route handlers below.
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

router = APIRouter()
# auto_error=False: missing credentials yield None instead of an automatic 403,
# so handlers can return their own 401.
security = HTTPBearer(auto_error=False)


class RevokeUserRequest(BaseModel):
    """Request to revoke all tokens for a user."""
    # username: user whose tokens should be revoked
    # l2_server: identifies which L2 issued the tokens
    username: str
    l2_server: str


@router.get("")
async def auth_callback(auth_token: str = None):
    """
    Receive auth token from L2 redirect and set local cookie.

    This enables cross-subdomain auth on iOS Safari which blocks shared cookies.
    L2 redirects here with ?auth_token=... after user logs in.

    NOTE(review): annotation should be Optional[str] (or `str | None`) —
    the default is None.
    """
    # Import here to avoid circular imports
    from server import get_verified_user_context, register_user_token

    if not auth_token:
        return RedirectResponse(url="/", status_code=302)

    # Verify the token is valid
    ctx = await get_verified_user_context(auth_token)
    if not ctx:
        return RedirectResponse(url="/", status_code=302)

    # Register token for this user (for revocation by username later)
    register_user_token(ctx.username, auth_token)

    # Set local first-party cookie and redirect to runs
    response = RedirectResponse(url="/runs", status_code=302)
    response.set_cookie(
        key="auth_token",
        value=auth_token,
        httponly=True,
        max_age=60 * 60 * 24 * 30,  # 30 days
        samesite="lax",
        secure=True
    )
    return response


@router.get("/logout")
async def logout():
    """
    Logout - clear local cookie and redirect to home.

    Note: This only logs out of L1. User should also logout from L2.
    """
    response = RedirectResponse(url="/", status_code=302)
    response.delete_cookie("auth_token")
    return response


@router.post("/revoke")
async def revoke_token(
    credentials: HTTPAuthorizationCredentials = Depends(security),
):
    """
    Revoke a token. Called by L2 when user logs out.

    The token to revoke is passed in the Authorization header.

    Returns:
        {"revoked": True, "newly_revoked": bool} — newly_revoked is False
        when the token had already been revoked.
    """
    from server import get_user_context_from_token, revoke_token as do_revoke_token

    if not credentials:
        raise HTTPException(401, "No token provided")

    token = credentials.credentials

    # Verify token is valid before revoking (ensures caller has the token)
    ctx = get_user_context_from_token(token)
    if not ctx:
        raise HTTPException(401, "Invalid token")

    # Revoke the token
    newly_revoked = do_revoke_token(token)

    return {"revoked": True, "newly_revoked": newly_revoked}


@router.post("/revoke-user")
async def revoke_user_tokens(request: RevokeUserRequest):
    """
    Revoke all tokens for a user. Called by L2 when user logs out.

    This handles the case where L2 issued scoped tokens that differ from L2's own token.

    NOTE(review): no caller authentication here — anyone who can reach this
    endpoint can revoke a user's tokens. Confirm this is network-restricted.
    """
    from server import revoke_all_user_tokens

    # Revoke all tokens registered for this user
    count = revoke_all_user_tokens(request.username)

    return {
        "revoked": True,
        "tokens_revoked": count,
        "username": request.username
    }
diff --git a/app/routers/cache.py b/app/routers/cache.py
new file mode 100644
index 0000000..ecaee8e
--- /dev/null
+++ b/app/routers/cache.py
@@ -0,0 +1,55 @@
"""
Cache and media routes for L1 server.

Handles content retrieval, metadata, media preview, and publishing.
"""

from fastapi import APIRouter, Request, Depends, HTTPException
from fastapi.responses import HTMLResponse, FileResponse

from artdag_common.middleware import UserContext, wants_html

from ..dependencies import require_auth, get_templates, get_current_user

router = APIRouter()


# TODO: Migrate routes from server.py lines 2767-4200
# - GET /cache/{content_hash} - Get content details
# - GET /cache/{content_hash}/raw - Download raw file
# - GET /cache/{content_hash}/mp4 - Video conversion
# - GET /cache/{content_hash}/meta - Get metadata
# - PATCH /cache/{content_hash}/meta - Update metadata
# - POST /cache/{content_hash}/publish - Publish to L2
# - PATCH /cache/{content_hash}/republish - Republish
# - DELETE /cache/{content_hash} - Delete content
# - POST /cache/import - Import from IPFS
# - POST /cache/upload - Upload content
# - GET /media - Media list


@router.get("/cache/{content_hash}")
async def get_cache_item(
    content_hash: str,
    request: Request,
):
    """Get cached content details or serve the file."""
    # TODO: Implement with content negotiation — currently a 501 stub
    raise HTTPException(501, "Not yet migrated")


@router.get("/cache/{content_hash}/raw")
async def download_raw(content_hash: str):
    """Download the raw cached file."""
    # TODO: Implement — currently a 501 stub
    raise HTTPException(501, "Not yet migrated")


@router.get("/media")
async def
list_media(
    request: Request,
    user: UserContext = Depends(require_auth),
):
    """List all media in cache."""
    # TODO: Implement — currently a 501 stub
    raise HTTPException(501, "Not yet migrated")
diff --git a/app/routers/home.py b/app/routers/home.py
new file mode 100644
index 0000000..0d8a4fd
--- /dev/null
+++ b/app/routers/home.py
@@ -0,0 +1,49 @@
"""
Home and root routes for L1 server.
"""

from fastapi import APIRouter, Request, Depends
from fastapi.responses import HTMLResponse, RedirectResponse

# NOTE(review): render and wants_html are imported but unused until the
# landing page TODO below is implemented.
from artdag_common import render
from artdag_common.middleware import wants_html

from ..dependencies import get_templates, get_current_user

router = APIRouter()


@router.get("/")
async def home(request: Request):
    """
    Home page - redirect to runs if authenticated, show landing otherwise.

    NOTE(review): both branches currently redirect to /runs — the
    unauthenticated landing page is a pending TODO, so the `if user`
    check is presently dead code kept for when the landing page lands.
    """
    user = await get_current_user(request)

    if user:
        return RedirectResponse(url="/runs", status_code=302)

    # For now, redirect to login at L2
    # TODO: Show a landing page with login link
    return RedirectResponse(url="/runs", status_code=302)


@router.get("/login")
async def login_redirect(request: Request):
    """
    Redirect to L2 for login.

    Builds the L2 login URL with a return_to pointing back at the local
    auth callback, so L2 can redirect here with ?auth_token=... afterwards.
    """
    from ..config import settings

    if settings.l2_server:
        # Redirect to L2 login with return URL
        return_url = str(request.url_for("auth_callback"))
        login_url = f"{settings.l2_server}/login?return_to={return_url}"
        return RedirectResponse(url=login_url, status_code=302)

    # No L2 configured - show error (503: auth backend unavailable)
    return HTMLResponse(
        "No L2 server configured for authentication.",
        status_code=503
    )
diff --git a/app/routers/recipes.py b/app/routers/recipes.py
new file mode 100644
index 0000000..74595b8
--- /dev/null
+++ b/app/routers/recipes.py
@@ -0,0 +1,47 @@
"""
Recipe management routes for L1 server.

Handles recipe upload, listing, viewing, and execution.
"""

from fastapi import APIRouter, Request, Depends, HTTPException
from fastapi.responses import HTMLResponse

from artdag_common.middleware import UserContext, wants_html

from ..dependencies import require_auth, get_templates

router = APIRouter()


# TODO: Migrate routes from server.py lines 1990-2767
# - POST /recipes/upload - Upload recipe YAML
# - GET /recipes - List recipes
# - GET /recipes/{recipe_id} - Get recipe details
# - DELETE /recipes/{recipe_id} - Delete recipe
# - POST /recipes/{recipe_id}/run - Run recipe
# - GET /recipe/{recipe_id} - Recipe detail page
# - GET /recipe/{recipe_id}/dag - Recipe DAG visualization
# - POST /ui/recipes/{recipe_id}/run - Run from UI
# - GET /ui/recipes-list - Recipes list UI


@router.get("/recipes")
async def list_recipes(
    request: Request,
    user: UserContext = Depends(require_auth),
):
    """List available recipes."""
    # TODO: Implement — currently a 501 stub
    raise HTTPException(501, "Not yet migrated")


@router.get("/recipe/{recipe_id}")
async def recipe_detail(
    recipe_id: str,
    request: Request,
    user: UserContext = Depends(require_auth),
):
    """Recipe detail page with DAG visualization."""
    # TODO: Implement — currently a 501 stub
    raise HTTPException(501, "Not yet migrated")
diff --git a/app/routers/runs.py b/app/routers/runs.py
new file mode 100644
index 0000000..b27fce4
--- /dev/null
+++ b/app/routers/runs.py
@@ -0,0 +1,57 @@
"""
Run management routes for L1 server.

Handles run creation, status, listing, and detail views.
"""

from fastapi import APIRouter, Request, Depends, HTTPException
from fastapi.responses import HTMLResponse

from artdag_common.middleware import UserContext, wants_html

from ..dependencies import require_auth, get_templates, get_current_user

router = APIRouter()


# TODO: Migrate routes from server.py lines 675-1789
# - POST /runs - Create run
# - GET /runs/{run_id} - Get run status
# - DELETE /runs/{run_id} - Delete run
# - GET /runs - List runs
# - GET /run/{run_id} - Run detail page
# - GET /run/{run_id}/plan - Plan visualization
# - GET /run/{run_id}/analysis - Analysis results
# - GET /run/{run_id}/artifacts - Artifacts list


@router.get("/runs")
async def list_runs(
    request: Request,
    user: UserContext = Depends(require_auth),
):
    """List all runs for the current user."""
    # TODO: Implement — currently a 501 stub
    raise HTTPException(501, "Not yet migrated")


@router.get("/run/{run_id}")
async def run_detail(
    run_id: str,
    request: Request,
    user: UserContext = Depends(require_auth),
):
    """Run detail page with tabs for plan/analysis/artifacts."""
    # TODO: Implement — currently a 501 stub
    raise HTTPException(501, "Not yet migrated")


@router.get("/run/{run_id}/plan")
async def run_plan(
    run_id: str,
    request: Request,
    user: UserContext = Depends(require_auth),
):
    """Plan visualization as interactive DAG."""
    # TODO: Implement — currently a 501 stub
    raise HTTPException(501, "Not yet migrated")
diff --git a/app/routers/storage.py b/app/routers/storage.py
new file mode 100644
index 0000000..b594dc1
--- /dev/null
+++ b/app/routers/storage.py
@@ -0,0 +1,35 @@
"""
Storage provider routes for L1 server.

Manages user storage backends (Pinata, web3.storage, local, etc.)
"""

from fastapi import APIRouter, Request, Depends, HTTPException
from fastapi.responses import HTMLResponse

from artdag_common.middleware import UserContext, wants_html

from ..dependencies import require_auth, get_templates

router = APIRouter()


# TODO: Migrate routes from server.py lines 5473-5761
# - GET /storage - List storage providers
# - POST /storage - Add storage provider
# - POST /storage/add - Add via form
# - GET /storage/{storage_id} - Get storage details
# - PATCH /storage/{storage_id} - Update storage
# - DELETE /storage/{storage_id} - Delete storage
# - POST /storage/{storage_id}/test - Test connection
# - GET /storage/type/{provider_type} - Get provider config


# Path "" resolves to the router's mount prefix ("/storage" in create_app)
@router.get("")
async def list_storage(
    request: Request,
    user: UserContext = Depends(require_auth),
):
    """List user's storage providers."""
    # TODO: Implement — currently a 501 stub
    raise HTTPException(501, "Not yet migrated")
diff --git a/app/services/__init__.py b/app/services/__init__.py
new file mode 100644
index 0000000..76eba24
--- /dev/null
+++ b/app/services/__init__.py
@@ -0,0 +1,15 @@
"""
L1 Server Services.

Business logic layer between routers and repositories.
"""

from .run_service import RunService
from .recipe_service import RecipeService
from .cache_service import CacheService

__all__ = [
    "RunService",
    "RecipeService",
    "CacheService",
]
diff --git a/app/services/cache_service.py b/app/services/cache_service.py
new file mode 100644
index 0000000..c9b4a95
--- /dev/null
+++ b/app/services/cache_service.py
@@ -0,0 +1,110 @@
"""
Cache Service - business logic for cache and media management.
"""

from pathlib import Path
from typing import Optional, List, Dict, Any

from artdag_common.utils.media import detect_media_type, get_mime_type


class CacheService:
    """
    Service for managing cached content.

    Handles content retrieval, metadata, and media type detection.
+ """ + + def __init__(self, cache_manager, database): + self.cache = cache_manager + self.db = database + + async def get_item(self, content_hash: str) -> Optional[Dict[str, Any]]: + """Get cached item by content hash.""" + path = self.cache.get_by_content_hash(content_hash) + if not path or not path.exists(): + return None + + # Get metadata from database + meta = await self.db.get_cache_item(content_hash) + + media_type = detect_media_type(path) + mime_type = get_mime_type(path) + size = path.stat().st_size + + return { + "content_hash": content_hash, + "path": str(path), + "media_type": media_type, + "mime_type": mime_type, + "size": size, + "name": meta.get("name") if meta else None, + "description": meta.get("description") if meta else None, + "tags": meta.get("tags", []) if meta else [], + "ipfs_cid": meta.get("ipfs_cid") if meta else None, + } + + async def get_path(self, content_hash: str) -> Optional[Path]: + """Get the file path for cached content.""" + return self.cache.get_by_content_hash(content_hash) + + async def list_items( + self, + actor_id: str = None, + media_type: str = None, + page: int = 1, + limit: int = 20, + ) -> Dict[str, Any]: + """List cached items with filters and pagination.""" + # Get items from database + items = await self.db.list_cache_items( + actor_id=actor_id, + media_type=media_type, + offset=(page - 1) * limit, + limit=limit, + ) + + total = await self.db.count_cache_items(actor_id=actor_id, media_type=media_type) + + return { + "items": items, + "pagination": { + "page": page, + "limit": limit, + "total": total, + "has_more": page * limit < total, + } + } + + async def update_metadata( + self, + content_hash: str, + name: str = None, + description: str = None, + tags: List[str] = None, + ) -> bool: + """Update item metadata.""" + return await self.db.update_cache_metadata( + content_hash=content_hash, + name=name, + description=description, + tags=tags, + ) + + async def delete_item(self, content_hash: str) -> bool: + 
"""Delete a cached item.""" + path = self.cache.get_by_content_hash(content_hash) + if path and path.exists(): + path.unlink() + + # Remove from database + await self.db.delete_cache_item(content_hash) + return True + + def has_content(self, content_hash: str) -> bool: + """Check if content exists in cache.""" + return self.cache.has_content(content_hash) + + def get_ipfs_cid(self, content_hash: str) -> Optional[str]: + """Get IPFS CID for cached content.""" + return self.cache.get_ipfs_cid(content_hash) diff --git a/app/services/recipe_service.py b/app/services/recipe_service.py new file mode 100644 index 0000000..bae3043 --- /dev/null +++ b/app/services/recipe_service.py @@ -0,0 +1,128 @@ +""" +Recipe Service - business logic for recipe management. +""" + +from typing import Optional, List, Dict, Any +import json +import yaml + + +class RecipeService: + """ + Service for managing recipes. + + Handles recipe parsing, validation, and DAG building. + """ + + def __init__(self, redis, cache): + self.redis = redis + self.cache = cache + self.recipe_prefix = "recipe:" + + async def get_recipe(self, recipe_id: str) -> Optional[Dict[str, Any]]: + """Get a recipe by ID (content hash).""" + # First check Redis + data = self.redis.get(f"{self.recipe_prefix}{recipe_id}") + if data: + return json.loads(data) + + # Fall back to cache + path = self.cache.get_by_content_hash(recipe_id) + if path and path.exists(): + with open(path) as f: + return yaml.safe_load(f) + + return None + + async def list_recipes(self, actor_id: str = None, page: int = 1, limit: int = 20) -> Dict[str, Any]: + """List available recipes with pagination.""" + recipes = [] + cursor = 0 + + while True: + cursor, keys = self.redis.scan( + cursor=cursor, + match=f"{self.recipe_prefix}*", + count=100 + ) + for key in keys: + data = self.redis.get(key) + if data: + recipe = json.loads(data) + # Filter by actor if specified + if actor_id is None or recipe.get("actor_id") == actor_id: + recipes.append(recipe) + 
            # SCAN returns cursor 0 once the full keyspace has been walked
            if cursor == 0:
                break

        # Sort by name
        recipes.sort(key=lambda r: r.get("name", ""))

        # Paginate (1-based page numbers)
        total = len(recipes)
        start = (page - 1) * limit
        end = start + limit
        page_recipes = recipes[start:end]

        return {
            "recipes": page_recipes,
            "pagination": {
                "page": page,
                "limit": limit,
                "total": total,
                "has_more": end < total,
            }
        }

    async def save_recipe(self, recipe_id: str, recipe_data: Dict[str, Any]) -> None:
        """Save a recipe to Redis."""
        self.redis.set(f"{self.recipe_prefix}{recipe_id}", json.dumps(recipe_data))

    async def delete_recipe(self, recipe_id: str) -> bool:
        """Delete a recipe. Returns True if a key was actually removed."""
        return self.redis.delete(f"{self.recipe_prefix}{recipe_id}") > 0

    def parse_yaml(self, yaml_content: str) -> Dict[str, Any]:
        """Parse recipe YAML content (safe_load: no arbitrary object construction)."""
        return yaml.safe_load(yaml_content)

    def build_dag(self, recipe: Dict[str, Any]) -> Dict[str, Any]:
        """
        Build DAG visualization data from recipe.

        Returns nodes and edges for Cytoscape.js.
        """
        nodes = []
        edges = []

        dag = recipe.get("dag", {})
        dag_nodes = dag.get("nodes", {})
        output_node = dag.get("output")

        for node_id, node_def in dag_nodes.items():
            node_type = node_def.get("type", "EFFECT")
            nodes.append({
                "data": {
                    "id": node_id,
                    "label": node_id,
                    "nodeType": node_type,
                    "isOutput": node_id == output_node,
                }
            })

            # Build edges from inputs — each input may be a dict
            # ({"node": ...} or {"input": ...}) or a bare string node id
            for input_ref in node_def.get("inputs", []):
                if isinstance(input_ref, dict):
                    source = input_ref.get("node") or input_ref.get("input")
                else:
                    source = input_ref

                if source:
                    edges.append({
                        "data": {
                            "source": source,
                            "target": node_id,
                        }
                    })

        return {"nodes": nodes, "edges": edges}
diff --git a/app/services/run_service.py b/app/services/run_service.py
new file mode 100644
index 0000000..c45d785
--- /dev/null
+++ b/app/services/run_service.py
@@ -0,0 +1,113 @@
"""
Run Service - business logic for run management.
+""" + +from typing import Optional, List, Dict, Any +import json + + +class RunService: + """ + Service for managing recipe runs. + + Handles run lifecycle, plan loading, and result aggregation. + """ + + def __init__(self, redis, cache): + self.redis = redis + self.cache = cache + self.run_prefix = "run:" + + async def get_run(self, run_id: str) -> Optional[Dict[str, Any]]: + """Get a run by ID.""" + data = self.redis.get(f"{self.run_prefix}{run_id}") + if not data: + return None + return json.loads(data) + + async def list_runs(self, actor_id: str, page: int = 1, limit: int = 20) -> Dict[str, Any]: + """List runs for a user with pagination.""" + # Get all runs and filter by actor + # TODO: Use Redis index for efficient filtering + all_runs = [] + cursor = 0 + + while True: + cursor, keys = self.redis.scan( + cursor=cursor, + match=f"{self.run_prefix}*", + count=100 + ) + for key in keys: + data = self.redis.get(key) + if data: + run = json.loads(data) + if run.get("actor_id") == actor_id or run.get("username") == actor_id: + all_runs.append(run) + if cursor == 0: + break + + # Sort by created_at descending + all_runs.sort(key=lambda r: r.get("created_at", ""), reverse=True) + + # Paginate + total = len(all_runs) + start = (page - 1) * limit + end = start + limit + runs = all_runs[start:end] + + return { + "runs": runs, + "pagination": { + "page": page, + "limit": limit, + "total": total, + "has_more": end < total, + } + } + + async def create_run( + self, + run_id: str, + recipe_id: str, + inputs: Dict[str, str], + actor_id: str, + ) -> Dict[str, Any]: + """Create a new run.""" + from datetime import datetime + + run = { + "run_id": run_id, + "recipe": f"recipe:{recipe_id}", + "inputs": inputs, + "actor_id": actor_id, + "status": "pending", + "created_at": datetime.utcnow().isoformat(), + } + + self.redis.set(f"{self.run_prefix}{run_id}", json.dumps(run)) + return run + + async def update_run(self, run_id: str, updates: Dict[str, Any]) -> Optional[Dict[str, 
Any]]: + """Update a run's fields.""" + run = await self.get_run(run_id) + if not run: + return None + + run.update(updates) + self.redis.set(f"{self.run_prefix}{run_id}", json.dumps(run)) + return run + + async def delete_run(self, run_id: str) -> bool: + """Delete a run.""" + return self.redis.delete(f"{self.run_prefix}{run_id}") > 0 + + async def load_plan(self, run_id: str) -> Optional[Dict[str, Any]]: + """Load execution plan for a run.""" + from ..config import settings + + plan_path = settings.plan_cache_dir / f"{run_id}.json" + if plan_path.exists(): + with open(plan_path) as f: + return json.load(f) + return None