Add modular app structure for L1 server refactoring

Phase 2 of the full modernization:
- App factory pattern with create_app()
- Settings via dataclass with env vars
- Dependency injection container
- Router stubs for auth, storage, api, recipes, cache, runs
- Service layer stubs for run, recipe, cache
- Repository layer placeholder

Routes are stubs that import from legacy server.py during migration.
Next: Migrate each router fully with templates.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
giles
2026-01-11 07:08:08 +00:00
parent 3e24156c99
commit adc876dbd6
16 changed files with 1057 additions and 0 deletions

49
app/__init__.py Normal file
View File

@@ -0,0 +1,49 @@
"""
Art-DAG L1 Server Application Factory.
Creates and configures the FastAPI application with all routers and middleware.
"""
from pathlib import Path
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from artdag_common import create_jinja_env
from .config import settings
def create_app() -> FastAPI:
"""
Create and configure the L1 FastAPI application.
Returns:
Configured FastAPI instance
"""
app = FastAPI(
title="Art-DAG L1 Server",
description="Content-addressed media processing with distributed execution",
version="1.0.0",
)
# Initialize Jinja2 templates
template_dir = Path(__file__).parent / "templates"
app.state.templates = create_jinja_env(template_dir)
# Include routers
from .routers import auth, storage, api, recipes, cache, runs, home
# API routers
app.include_router(auth.router, prefix="/auth", tags=["auth"])
app.include_router(storage.router, prefix="/storage", tags=["storage"])
app.include_router(api.router, prefix="/api", tags=["api"])
app.include_router(recipes.router, tags=["recipes"])
app.include_router(cache.router, tags=["cache"])
app.include_router(runs.router, tags=["runs"])
app.include_router(home.router, tags=["home"])
return app
# Create the default app instance
app = create_app()

72
app/config.py Normal file
View File

@@ -0,0 +1,72 @@
"""
L1 Server Configuration.
Environment-based configuration with sensible defaults.
"""
import os
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class Settings:
"""Application settings loaded from environment."""
# Server
host: str = field(default_factory=lambda: os.environ.get("HOST", "0.0.0.0"))
port: int = field(default_factory=lambda: int(os.environ.get("PORT", "8000")))
debug: bool = field(default_factory=lambda: os.environ.get("DEBUG", "").lower() == "true")
# Cache
cache_dir: Path = field(
default_factory=lambda: Path(os.environ.get("CACHE_DIR", "/data/cache"))
)
# Redis
redis_url: str = field(
default_factory=lambda: os.environ.get("REDIS_URL", "redis://localhost:6379/5")
)
# Database
database_url: str = field(
default_factory=lambda: os.environ.get(
"DATABASE_URL", "postgresql://artdag:artdag@localhost:5432/artdag"
)
)
# IPFS
ipfs_api: str = field(
default_factory=lambda: os.environ.get("IPFS_API", "/dns/localhost/tcp/5001")
)
ipfs_gateway_url: str = field(
default_factory=lambda: os.environ.get("IPFS_GATEWAY_URL", "https://ipfs.io/ipfs")
)
# L2 Server
l2_server: Optional[str] = field(
default_factory=lambda: os.environ.get("L2_SERVER")
)
l2_domain: Optional[str] = field(
default_factory=lambda: os.environ.get("L2_DOMAIN")
)
# Derived paths
@property
def plan_cache_dir(self) -> Path:
return self.cache_dir / "plans"
@property
def analysis_cache_dir(self) -> Path:
return self.cache_dir / "analysis"
def ensure_dirs(self) -> None:
"""Create required directories."""
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.plan_cache_dir.mkdir(parents=True, exist_ok=True)
self.analysis_cache_dir.mkdir(parents=True, exist_ok=True)
# Singleton settings instance
settings = Settings()

135
app/dependencies.py Normal file
View File

@@ -0,0 +1,135 @@
"""
FastAPI dependency injection container.
Provides shared resources and services to route handlers.
"""
from functools import lru_cache
from typing import Optional
import asyncio
from fastapi import Request, Depends, HTTPException
from jinja2 import Environment
from artdag_common.middleware.auth import UserContext, get_user_from_cookie, get_user_from_header
from .config import settings
# Lazy imports to avoid circular dependencies
_redis_client = None
_cache_manager = None
_database = None
def get_redis_client():
"""Get the Redis client singleton."""
global _redis_client
if _redis_client is None:
import redis
_redis_client = redis.from_url(settings.redis_url, decode_responses=True)
return _redis_client
def get_cache_manager():
"""Get the cache manager singleton."""
global _cache_manager
if _cache_manager is None:
from cache_manager import get_cache_manager as _get_cache_manager
_cache_manager = _get_cache_manager()
return _cache_manager
def get_database():
"""Get the database singleton."""
global _database
if _database is None:
import database
_database = database
return _database
def get_templates(request: Request) -> Environment:
"""Get the Jinja2 environment from app state."""
return request.app.state.templates
async def get_current_user(request: Request) -> Optional[UserContext]:
"""
Get the current user from request (cookie or header).
This is a permissive dependency - returns None if not authenticated.
Use require_auth for routes that require authentication.
"""
# Try header first (API clients)
ctx = get_user_from_header(request)
if ctx:
return ctx
# Fall back to cookie (browser)
return get_user_from_cookie(request)
async def require_auth(request: Request) -> UserContext:
"""
Require authentication for a route.
Raises:
HTTPException 401 if not authenticated
HTTPException 302 redirect to login for HTML requests
"""
ctx = await get_current_user(request)
if ctx is None:
# Check if HTML request for redirect
accept = request.headers.get("accept", "")
if "text/html" in accept:
raise HTTPException(
status_code=302,
headers={"Location": "/login"}
)
raise HTTPException(status_code=401, detail="Authentication required")
return ctx
async def get_user_context_from_cookie(request: Request) -> Optional[UserContext]:
"""
Legacy compatibility: get user from cookie.
Validates token with L2 server if configured.
"""
ctx = get_user_from_cookie(request)
if ctx is None:
return None
# If L2 server configured, could validate token here
# For now, trust the cookie
return ctx
# Service dependencies (lazy loading)
def get_run_service():
"""Get the run service."""
from .services.run_service import RunService
return RunService(
redis=get_redis_client(),
cache=get_cache_manager(),
)
def get_recipe_service():
"""Get the recipe service."""
from .services.recipe_service import RecipeService
return RecipeService(
redis=get_redis_client(),
cache=get_cache_manager(),
)
def get_cache_service():
"""Get the cache service."""
from .services.cache_service import CacheService
return CacheService(
cache_manager=get_cache_manager(),
database=get_database(),
)

View File

@@ -0,0 +1,10 @@
"""
L1 Server Repositories.
Data access layer for persistence operations.
"""
# TODO: Implement repositories
# - RunRepository - Redis-backed run storage
# - RecipeRepository - Redis-backed recipe storage
# - CacheRepository - Filesystem + PostgreSQL cache metadata

23
app/routers/__init__.py Normal file
View File

@@ -0,0 +1,23 @@
"""
L1 Server Routers.
Each router handles a specific domain of functionality.
"""
from . import auth
from . import storage
from . import api
from . import recipes
from . import cache
from . import runs
from . import home
__all__ = [
"auth",
"storage",
"api",
"recipes",
"cache",
"runs",
"home",
]

40
app/routers/api.py Normal file
View File

@@ -0,0 +1,40 @@
"""
3-phase API routes for L1 server.
Provides the plan/execute/run-recipe endpoints for programmatic access.
"""
from fastapi import APIRouter, Request, Depends, HTTPException
from artdag_common.models.requests import PlanRequest, ExecutePlanRequest, RecipeRunRequest
from ..dependencies import require_auth
router = APIRouter()
# TODO: Migrate routes from server.py lines 6036-6241
# - POST /plan - Generate execution plan
# - POST /execute - Execute a plan
# - POST /run-recipe - Run complete recipe
@router.post("/plan")
async def generate_plan(request: PlanRequest):
"""Generate an execution plan from recipe without executing."""
# TODO: Implement
raise HTTPException(501, "Not yet migrated")
@router.post("/execute")
async def execute_plan(request: ExecutePlanRequest):
"""Execute a previously generated plan."""
# TODO: Implement
raise HTTPException(501, "Not yet migrated")
@router.post("/run-recipe")
async def run_recipe(request: RecipeRunRequest):
"""Run a complete recipe through all 3 phases."""
# TODO: Implement
raise HTTPException(501, "Not yet migrated")

119
app/routers/auth.py Normal file
View File

@@ -0,0 +1,119 @@
"""
Authentication routes for L1 server.
L1 doesn't handle login directly - users log in at their L2 server.
Token is passed via URL from L2 redirect, then L1 sets its own cookie.
"""
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import RedirectResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
# Import auth utilities from existing server module
# TODO: Move these to a service
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
router = APIRouter()
security = HTTPBearer(auto_error=False)
class RevokeUserRequest(BaseModel):
"""Request to revoke all tokens for a user."""
username: str
l2_server: str
@router.get("")
async def auth_callback(auth_token: str = None):
"""
Receive auth token from L2 redirect and set local cookie.
This enables cross-subdomain auth on iOS Safari which blocks shared cookies.
L2 redirects here with ?auth_token=... after user logs in.
"""
# Import here to avoid circular imports
from server import get_verified_user_context, register_user_token
if not auth_token:
return RedirectResponse(url="/", status_code=302)
# Verify the token is valid
ctx = await get_verified_user_context(auth_token)
if not ctx:
return RedirectResponse(url="/", status_code=302)
# Register token for this user (for revocation by username later)
register_user_token(ctx.username, auth_token)
# Set local first-party cookie and redirect to runs
response = RedirectResponse(url="/runs", status_code=302)
response.set_cookie(
key="auth_token",
value=auth_token,
httponly=True,
max_age=60 * 60 * 24 * 30, # 30 days
samesite="lax",
secure=True
)
return response
@router.get("/logout")
async def logout():
"""
Logout - clear local cookie and redirect to home.
Note: This only logs out of L1. User should also logout from L2.
"""
response = RedirectResponse(url="/", status_code=302)
response.delete_cookie("auth_token")
return response
@router.post("/revoke")
async def revoke_token(
credentials: HTTPAuthorizationCredentials = Depends(security),
):
"""
Revoke a token. Called by L2 when user logs out.
The token to revoke is passed in the Authorization header.
"""
from server import get_user_context_from_token, revoke_token as do_revoke_token
if not credentials:
raise HTTPException(401, "No token provided")
token = credentials.credentials
# Verify token is valid before revoking (ensures caller has the token)
ctx = get_user_context_from_token(token)
if not ctx:
raise HTTPException(401, "Invalid token")
# Revoke the token
newly_revoked = do_revoke_token(token)
return {"revoked": True, "newly_revoked": newly_revoked}
@router.post("/revoke-user")
async def revoke_user_tokens(request: RevokeUserRequest):
"""
Revoke all tokens for a user. Called by L2 when user logs out.
This handles the case where L2 issued scoped tokens that differ from L2's own token.
"""
from server import revoke_all_user_tokens
# Revoke all tokens registered for this user
count = revoke_all_user_tokens(request.username)
return {
"revoked": True,
"tokens_revoked": count,
"username": request.username
}

55
app/routers/cache.py Normal file
View File

@@ -0,0 +1,55 @@
"""
Cache and media routes for L1 server.
Handles content retrieval, metadata, media preview, and publishing.
"""
from fastapi import APIRouter, Request, Depends, HTTPException
from fastapi.responses import HTMLResponse, FileResponse
from artdag_common.middleware import UserContext, wants_html
from ..dependencies import require_auth, get_templates, get_current_user
router = APIRouter()
# TODO: Migrate routes from server.py lines 2767-4200
# - GET /cache/{content_hash} - Get content details
# - GET /cache/{content_hash}/raw - Download raw file
# - GET /cache/{content_hash}/mp4 - Video conversion
# - GET /cache/{content_hash}/meta - Get metadata
# - PATCH /cache/{content_hash}/meta - Update metadata
# - POST /cache/{content_hash}/publish - Publish to L2
# - PATCH /cache/{content_hash}/republish - Republish
# - DELETE /cache/{content_hash} - Delete content
# - POST /cache/import - Import from IPFS
# - POST /cache/upload - Upload content
# - GET /media - Media list
@router.get("/cache/{content_hash}")
async def get_cache_item(
content_hash: str,
request: Request,
):
"""Get cached content details or serve the file."""
# TODO: Implement with content negotiation
raise HTTPException(501, "Not yet migrated")
@router.get("/cache/{content_hash}/raw")
async def download_raw(content_hash: str):
"""Download the raw cached file."""
# TODO: Implement
raise HTTPException(501, "Not yet migrated")
@router.get("/media")
async def list_media(
request: Request,
user: UserContext = Depends(require_auth),
):
"""List all media in cache."""
# TODO: Implement
raise HTTPException(501, "Not yet migrated")

49
app/routers/home.py Normal file
View File

@@ -0,0 +1,49 @@
"""
Home and root routes for L1 server.
"""
from fastapi import APIRouter, Request, Depends
from fastapi.responses import HTMLResponse, RedirectResponse
from artdag_common import render
from artdag_common.middleware import wants_html
from ..dependencies import get_templates, get_current_user
router = APIRouter()
@router.get("/")
async def home(request: Request):
"""
Home page - redirect to runs if authenticated, show landing otherwise.
"""
user = await get_current_user(request)
if user:
return RedirectResponse(url="/runs", status_code=302)
# For now, redirect to login at L2
# TODO: Show a landing page with login link
return RedirectResponse(url="/runs", status_code=302)
@router.get("/login")
async def login_redirect(request: Request):
"""
Redirect to L2 for login.
"""
from ..config import settings
if settings.l2_server:
# Redirect to L2 login with return URL
return_url = str(request.url_for("auth_callback"))
login_url = f"{settings.l2_server}/login?return_to={return_url}"
return RedirectResponse(url=login_url, status_code=302)
# No L2 configured - show error
return HTMLResponse(
"<html><body><h1>Login not configured</h1>"
"<p>No L2 server configured for authentication.</p></body></html>",
status_code=503
)

47
app/routers/recipes.py Normal file
View File

@@ -0,0 +1,47 @@
"""
Recipe management routes for L1 server.
Handles recipe upload, listing, viewing, and execution.
"""
from fastapi import APIRouter, Request, Depends, HTTPException
from fastapi.responses import HTMLResponse
from artdag_common.middleware import UserContext, wants_html
from ..dependencies import require_auth, get_templates
router = APIRouter()
# TODO: Migrate routes from server.py lines 1990-2767
# - POST /recipes/upload - Upload recipe YAML
# - GET /recipes - List recipes
# - GET /recipes/{recipe_id} - Get recipe details
# - DELETE /recipes/{recipe_id} - Delete recipe
# - POST /recipes/{recipe_id}/run - Run recipe
# - GET /recipe/{recipe_id} - Recipe detail page
# - GET /recipe/{recipe_id}/dag - Recipe DAG visualization
# - POST /ui/recipes/{recipe_id}/run - Run from UI
# - GET /ui/recipes-list - Recipes list UI
@router.get("/recipes")
async def list_recipes(
request: Request,
user: UserContext = Depends(require_auth),
):
"""List available recipes."""
# TODO: Implement
raise HTTPException(501, "Not yet migrated")
@router.get("/recipe/{recipe_id}")
async def recipe_detail(
recipe_id: str,
request: Request,
user: UserContext = Depends(require_auth),
):
"""Recipe detail page with DAG visualization."""
# TODO: Implement
raise HTTPException(501, "Not yet migrated")

57
app/routers/runs.py Normal file
View File

@@ -0,0 +1,57 @@
"""
Run management routes for L1 server.
Handles run creation, status, listing, and detail views.
"""
from fastapi import APIRouter, Request, Depends, HTTPException
from fastapi.responses import HTMLResponse
from artdag_common.middleware import UserContext, wants_html
from ..dependencies import require_auth, get_templates, get_current_user
router = APIRouter()
# TODO: Migrate routes from server.py lines 675-1789
# - POST /runs - Create run
# - GET /runs/{run_id} - Get run status
# - DELETE /runs/{run_id} - Delete run
# - GET /runs - List runs
# - GET /run/{run_id} - Run detail page
# - GET /run/{run_id}/plan - Plan visualization
# - GET /run/{run_id}/analysis - Analysis results
# - GET /run/{run_id}/artifacts - Artifacts list
@router.get("/runs")
async def list_runs(
request: Request,
user: UserContext = Depends(require_auth),
):
"""List all runs for the current user."""
# TODO: Implement
raise HTTPException(501, "Not yet migrated")
@router.get("/run/{run_id}")
async def run_detail(
run_id: str,
request: Request,
user: UserContext = Depends(require_auth),
):
"""Run detail page with tabs for plan/analysis/artifacts."""
# TODO: Implement
raise HTTPException(501, "Not yet migrated")
@router.get("/run/{run_id}/plan")
async def run_plan(
run_id: str,
request: Request,
user: UserContext = Depends(require_auth),
):
"""Plan visualization as interactive DAG."""
# TODO: Implement
raise HTTPException(501, "Not yet migrated")

35
app/routers/storage.py Normal file
View File

@@ -0,0 +1,35 @@
"""
Storage provider routes for L1 server.
Manages user storage backends (Pinata, web3.storage, local, etc.)
"""
from fastapi import APIRouter, Request, Depends, HTTPException
from fastapi.responses import HTMLResponse
from artdag_common.middleware import UserContext, wants_html
from ..dependencies import require_auth, get_templates
router = APIRouter()
# TODO: Migrate routes from server.py lines 5473-5761
# - GET /storage - List storage providers
# - POST /storage - Add storage provider
# - POST /storage/add - Add via form
# - GET /storage/{storage_id} - Get storage details
# - PATCH /storage/{storage_id} - Update storage
# - DELETE /storage/{storage_id} - Delete storage
# - POST /storage/{storage_id}/test - Test connection
# - GET /storage/type/{provider_type} - Get provider config
@router.get("")
async def list_storage(
request: Request,
user: UserContext = Depends(require_auth),
):
"""List user's storage providers."""
# TODO: Implement
raise HTTPException(501, "Not yet migrated")

15
app/services/__init__.py Normal file
View File

@@ -0,0 +1,15 @@
"""
L1 Server Services.
Business logic layer between routers and repositories.
"""
from .run_service import RunService
from .recipe_service import RecipeService
from .cache_service import CacheService
__all__ = [
"RunService",
"RecipeService",
"CacheService",
]

View File

@@ -0,0 +1,110 @@
"""
Cache Service - business logic for cache and media management.
"""
from pathlib import Path
from typing import Optional, List, Dict, Any
from artdag_common.utils.media import detect_media_type, get_mime_type
class CacheService:
"""
Service for managing cached content.
Handles content retrieval, metadata, and media type detection.
"""
def __init__(self, cache_manager, database):
self.cache = cache_manager
self.db = database
async def get_item(self, content_hash: str) -> Optional[Dict[str, Any]]:
"""Get cached item by content hash."""
path = self.cache.get_by_content_hash(content_hash)
if not path or not path.exists():
return None
# Get metadata from database
meta = await self.db.get_cache_item(content_hash)
media_type = detect_media_type(path)
mime_type = get_mime_type(path)
size = path.stat().st_size
return {
"content_hash": content_hash,
"path": str(path),
"media_type": media_type,
"mime_type": mime_type,
"size": size,
"name": meta.get("name") if meta else None,
"description": meta.get("description") if meta else None,
"tags": meta.get("tags", []) if meta else [],
"ipfs_cid": meta.get("ipfs_cid") if meta else None,
}
async def get_path(self, content_hash: str) -> Optional[Path]:
"""Get the file path for cached content."""
return self.cache.get_by_content_hash(content_hash)
async def list_items(
self,
actor_id: str = None,
media_type: str = None,
page: int = 1,
limit: int = 20,
) -> Dict[str, Any]:
"""List cached items with filters and pagination."""
# Get items from database
items = await self.db.list_cache_items(
actor_id=actor_id,
media_type=media_type,
offset=(page - 1) * limit,
limit=limit,
)
total = await self.db.count_cache_items(actor_id=actor_id, media_type=media_type)
return {
"items": items,
"pagination": {
"page": page,
"limit": limit,
"total": total,
"has_more": page * limit < total,
}
}
async def update_metadata(
self,
content_hash: str,
name: str = None,
description: str = None,
tags: List[str] = None,
) -> bool:
"""Update item metadata."""
return await self.db.update_cache_metadata(
content_hash=content_hash,
name=name,
description=description,
tags=tags,
)
async def delete_item(self, content_hash: str) -> bool:
"""Delete a cached item."""
path = self.cache.get_by_content_hash(content_hash)
if path and path.exists():
path.unlink()
# Remove from database
await self.db.delete_cache_item(content_hash)
return True
def has_content(self, content_hash: str) -> bool:
"""Check if content exists in cache."""
return self.cache.has_content(content_hash)
def get_ipfs_cid(self, content_hash: str) -> Optional[str]:
"""Get IPFS CID for cached content."""
return self.cache.get_ipfs_cid(content_hash)

View File

@@ -0,0 +1,128 @@
"""
Recipe Service - business logic for recipe management.
"""
from typing import Optional, List, Dict, Any
import json
import yaml
class RecipeService:
"""
Service for managing recipes.
Handles recipe parsing, validation, and DAG building.
"""
def __init__(self, redis, cache):
self.redis = redis
self.cache = cache
self.recipe_prefix = "recipe:"
async def get_recipe(self, recipe_id: str) -> Optional[Dict[str, Any]]:
"""Get a recipe by ID (content hash)."""
# First check Redis
data = self.redis.get(f"{self.recipe_prefix}{recipe_id}")
if data:
return json.loads(data)
# Fall back to cache
path = self.cache.get_by_content_hash(recipe_id)
if path and path.exists():
with open(path) as f:
return yaml.safe_load(f)
return None
async def list_recipes(self, actor_id: str = None, page: int = 1, limit: int = 20) -> Dict[str, Any]:
"""List available recipes with pagination."""
recipes = []
cursor = 0
while True:
cursor, keys = self.redis.scan(
cursor=cursor,
match=f"{self.recipe_prefix}*",
count=100
)
for key in keys:
data = self.redis.get(key)
if data:
recipe = json.loads(data)
# Filter by actor if specified
if actor_id is None or recipe.get("actor_id") == actor_id:
recipes.append(recipe)
if cursor == 0:
break
# Sort by name
recipes.sort(key=lambda r: r.get("name", ""))
# Paginate
total = len(recipes)
start = (page - 1) * limit
end = start + limit
page_recipes = recipes[start:end]
return {
"recipes": page_recipes,
"pagination": {
"page": page,
"limit": limit,
"total": total,
"has_more": end < total,
}
}
async def save_recipe(self, recipe_id: str, recipe_data: Dict[str, Any]) -> None:
"""Save a recipe to Redis."""
self.redis.set(f"{self.recipe_prefix}{recipe_id}", json.dumps(recipe_data))
async def delete_recipe(self, recipe_id: str) -> bool:
"""Delete a recipe."""
return self.redis.delete(f"{self.recipe_prefix}{recipe_id}") > 0
def parse_yaml(self, yaml_content: str) -> Dict[str, Any]:
"""Parse recipe YAML content."""
return yaml.safe_load(yaml_content)
def build_dag(self, recipe: Dict[str, Any]) -> Dict[str, Any]:
"""
Build DAG visualization data from recipe.
Returns nodes and edges for Cytoscape.js.
"""
nodes = []
edges = []
dag = recipe.get("dag", {})
dag_nodes = dag.get("nodes", {})
output_node = dag.get("output")
for node_id, node_def in dag_nodes.items():
node_type = node_def.get("type", "EFFECT")
nodes.append({
"data": {
"id": node_id,
"label": node_id,
"nodeType": node_type,
"isOutput": node_id == output_node,
}
})
# Build edges from inputs
for input_ref in node_def.get("inputs", []):
if isinstance(input_ref, dict):
source = input_ref.get("node") or input_ref.get("input")
else:
source = input_ref
if source:
edges.append({
"data": {
"source": source,
"target": node_id,
}
})
return {"nodes": nodes, "edges": edges}

113
app/services/run_service.py Normal file
View File

@@ -0,0 +1,113 @@
"""
Run Service - business logic for run management.
"""
from typing import Optional, List, Dict, Any
import json
class RunService:
"""
Service for managing recipe runs.
Handles run lifecycle, plan loading, and result aggregation.
"""
def __init__(self, redis, cache):
self.redis = redis
self.cache = cache
self.run_prefix = "run:"
async def get_run(self, run_id: str) -> Optional[Dict[str, Any]]:
"""Get a run by ID."""
data = self.redis.get(f"{self.run_prefix}{run_id}")
if not data:
return None
return json.loads(data)
async def list_runs(self, actor_id: str, page: int = 1, limit: int = 20) -> Dict[str, Any]:
"""List runs for a user with pagination."""
# Get all runs and filter by actor
# TODO: Use Redis index for efficient filtering
all_runs = []
cursor = 0
while True:
cursor, keys = self.redis.scan(
cursor=cursor,
match=f"{self.run_prefix}*",
count=100
)
for key in keys:
data = self.redis.get(key)
if data:
run = json.loads(data)
if run.get("actor_id") == actor_id or run.get("username") == actor_id:
all_runs.append(run)
if cursor == 0:
break
# Sort by created_at descending
all_runs.sort(key=lambda r: r.get("created_at", ""), reverse=True)
# Paginate
total = len(all_runs)
start = (page - 1) * limit
end = start + limit
runs = all_runs[start:end]
return {
"runs": runs,
"pagination": {
"page": page,
"limit": limit,
"total": total,
"has_more": end < total,
}
}
async def create_run(
self,
run_id: str,
recipe_id: str,
inputs: Dict[str, str],
actor_id: str,
) -> Dict[str, Any]:
"""Create a new run."""
from datetime import datetime
run = {
"run_id": run_id,
"recipe": f"recipe:{recipe_id}",
"inputs": inputs,
"actor_id": actor_id,
"status": "pending",
"created_at": datetime.utcnow().isoformat(),
}
self.redis.set(f"{self.run_prefix}{run_id}", json.dumps(run))
return run
async def update_run(self, run_id: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Update a run's fields."""
run = await self.get_run(run_id)
if not run:
return None
run.update(updates)
self.redis.set(f"{self.run_prefix}{run_id}", json.dumps(run))
return run
async def delete_run(self, run_id: str) -> bool:
"""Delete a run."""
return self.redis.delete(f"{self.run_prefix}{run_id}") > 0
async def load_plan(self, run_id: str) -> Optional[Dict[str, Any]]:
"""Load execution plan for a run."""
from ..config import settings
plan_path = settings.plan_cache_dir / f"{run_id}.json"
if plan_path.exists():
with open(plan_path) as f:
return json.load(f)
return None