Files
mono/artdag/l1/app/dependencies.py
giles 1a74d811f7 Incorporate art-dag-mono repo into artdag/ subfolder
Merges full history from art-dag/mono.git into the monorepo
under the artdag/ directory. Contains: core (DAG engine),
l1 (Celery rendering server), l2 (ActivityPub registry),
common (shared templates/middleware), client (CLI), test (e2e).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

git-subtree-dir: artdag
git-subtree-mainline: 1a179de547
git-subtree-split: 4c2e716558
2026-02-27 09:07:23 +00:00

266 lines
7.2 KiB
Python

"""
FastAPI dependency injection container.
Provides shared resources and services to route handlers.
"""
from functools import lru_cache
from typing import Optional
import asyncio
from fastapi import Request, Depends, HTTPException
from jinja2 import Environment
from artdag_common.middleware.auth import UserContext, get_user_from_cookie, get_user_from_header
from .config import settings
# Module-level singleton slots. Each one starts as None and is filled in on
# first use by the matching get_*() accessor in this module; those accessors
# perform their imports inside the function body (lazy imports to avoid
# circular dependencies).
_redis_client = None  # populated by get_redis_client()
_cache_manager = None  # populated by get_cache_manager()
_database = None  # populated by get_database()
def get_redis_client():
    """
    Return the process-wide Redis client, creating it on first call.

    The client is built from ``settings.redis_url`` with
    ``decode_responses=True`` and then reused for every later call.
    """
    global _redis_client
    if _redis_client is not None:
        return _redis_client

    # Imported here rather than at module level, like the other accessors.
    import redis

    _redis_client = redis.from_url(settings.redis_url, decode_responses=True)
    return _redis_client
def get_cache_manager():
    """
    Return the process-wide cache manager, resolving it on first call.

    The instance is obtained from ``cache_manager.get_cache_manager()`` and
    remembered in a module-level slot for subsequent calls.
    """
    global _cache_manager
    if _cache_manager is not None:
        return _cache_manager

    # Aliased on import so it does not shadow this function's own name.
    from cache_manager import get_cache_manager as _load_cache_manager

    _cache_manager = _load_cache_manager()
    return _cache_manager
def get_database():
    """
    Return the ``database`` module, importing it on first call.

    The imported module object itself is what gets stored and returned.
    """
    global _database
    if _database is not None:
        return _database

    import database as database_module

    _database = database_module
    return _database
def get_templates(request: Request) -> Environment:
    """Return the Jinja2 environment stored on the application state."""
    app_state = request.app.state
    return app_state.templates
async def _verify_opaque_grant(token: str) -> Optional[UserContext]:
    """
    Verify an opaque grant token via the account server, with a Redis cache.

    Lookup order:
      1. Redis, under a key derived from the SHA-256 digest of the full token.
      2. ``GET {settings.internal_account_url}/auth/internal/verify-grant``
         with the token as the ``token`` query parameter.

    A valid grant is cached for 5 minutes; an explicit ``valid: false`` answer
    is cached for 60 seconds. Cache failures are logged and otherwise ignored
    (the cache is best-effort). Transport errors and non-200 responses are
    not cached.

    Args:
        token: Bearer token taken from the Authorization header.

    Returns:
        A UserContext for a valid grant, otherwise None (empty token, no
        account server configured, invalid grant, or verification failure).
    """
    import hashlib
    import httpx
    import json
    import logging

    log = logging.getLogger(__name__)

    # An empty token can never be a valid grant; skip the network round-trip.
    if not token or not settings.internal_account_url:
        return None

    # Key the cache on a digest of the WHOLE token. Keying on a token prefix
    # would let any token sharing that prefix hit another token's cached
    # identity, and would store raw token material in Redis. Entries written
    # under an older key format simply expire via their TTL.
    token_digest = hashlib.sha256(token.encode("utf-8")).hexdigest()
    cache_key = f"grant_verify:{token_digest}"

    # NOTE(review): get_redis_client() builds a synchronous client, so these
    # Redis calls block the event loop for their duration.

    # Check the Redis cache first.
    try:
        cached = get_redis_client().get(cache_key)
        if cached is not None:
            if cached == "__invalid__":
                return None
            data = json.loads(cached)
            return UserContext(
                username=data["username"],
                actor_id=data["actor_id"],
                token=token,
                email=data.get("email", ""),
            )
    except Exception as exc:
        # Best-effort: fall through to the account server. Only the exception
        # type is logged so no token material can end up in the logs.
        log.warning("grant cache lookup failed (%s)", type(exc).__name__)

    # Call the account server.
    # NOTE(review): the token travels in the query string, which commonly ends
    # up in access logs; changing that needs a matching account-server change.
    verify_url = f"{settings.internal_account_url.rstrip('/')}/auth/internal/verify-grant"
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            resp = await client.get(verify_url, params={"token": token})
        if resp.status_code != 200:
            return None
        data = resp.json()
        is_valid = bool(data.get("valid"))
    except Exception as exc:
        log.warning("grant verification request failed (%s)", type(exc).__name__)
        return None

    if not is_valid:
        # Cache the negative result briefly.
        try:
            get_redis_client().set(cache_key, "__invalid__", ex=60)
        except Exception as exc:
            log.warning("grant cache write failed (%s)", type(exc).__name__)
        return None

    username = data.get("username", "")
    display_name = data.get("display_name", "")
    actor_id = f"@{username}" if username else ""
    ctx = UserContext(
        username=username,
        actor_id=actor_id,
        token=token,
        email=username,
    )

    # Cache the positive result for 5 minutes.
    try:
        cache_data = json.dumps({
            "username": username,
            "actor_id": actor_id,
            "email": username,
            "display_name": display_name,
        })
        get_redis_client().set(cache_key, cache_data, ex=300)
    except Exception as exc:
        log.warning("grant cache write failed (%s)", type(exc).__name__)

    return ctx
async def get_current_user(request: Request) -> Optional[UserContext]:
    """
    Resolve the current user from the request, if any.

    Permissive dependency: yields None for unauthenticated requests.
    Routes that must be authenticated should depend on require_auth.

    Sources are tried in order: header credentials, opaque grant token in a
    ``Bearer`` Authorization header, then the cookie.
    """
    bearer_prefix = "Bearer "

    # 1. Header credentials (API clients — JWT tokens).
    header_user = get_user_from_header(request)
    if header_user:
        return header_user

    # 2. Opaque grant token (device flow / CLI tokens).
    authorization = request.headers.get("Authorization", "")
    if authorization.startswith(bearer_prefix):
        grant_user = await _verify_opaque_grant(authorization[len(bearer_prefix):])
        if grant_user:
            return grant_user

    # 3. Cookie (browser).
    return get_user_from_cookie(request)
async def require_auth(request: Request) -> UserContext:
    """
    Dependency that insists on an authenticated user.

    Returns:
        The UserContext resolved by get_current_user.

    Raises:
        HTTPException 302 with ``Location: /auth/login`` when the request's
            Accept header contains ``text/html``.
        HTTPException 401 for every other unauthenticated request.
    """
    user = await get_current_user(request)
    if user is not None:
        return user

    # Browsers get a redirect to the login page; API clients get a plain 401.
    wants_html = "text/html" in request.headers.get("accept", "")
    if wants_html:
        raise HTTPException(
            status_code=302,
            headers={"Location": "/auth/login"},
        )
    raise HTTPException(status_code=401, detail="Authentication required")
async def get_user_context_from_cookie(request: Request) -> Optional[UserContext]:
    """
    Legacy compatibility: get the user from the cookie.

    The cookie is trusted as-is. No validation against the L2 server is
    performed here, even if one is configured.

    Returns:
        Whatever get_user_from_cookie returns for this request (None when
        there is no authenticated cookie user).
    """
    # NOTE: if an L2 server is configured, the token could be validated here.
    # For now the result of get_user_from_cookie is passed through unchanged.
    return get_user_from_cookie(request)
# Service dependencies (lazy loading)
def get_run_service():
    """Build a RunService wired to the shared database, Redis and cache."""
    from .services.run_service import RunService

    db = get_database()
    redis_client = get_redis_client()
    cache = get_cache_manager()
    return RunService(database=db, redis=redis_client, cache=cache)
def get_recipe_service():
    """Build a RecipeService wired to the shared Redis client and cache."""
    from .services.recipe_service import RecipeService

    # Redis is still passed for API compatibility; per the original note it
    # is not used by the service.
    redis_client = get_redis_client()
    cache = get_cache_manager()
    return RecipeService(redis=redis_client, cache=cache)
def get_cache_service():
    """Build a CacheService wired to the shared cache manager and database."""
    from .services.cache_service import CacheService

    manager = get_cache_manager()
    db = get_database()
    return CacheService(cache_manager=manager, database=db)
async def get_nav_counts(actor_id: Optional[str] = None) -> dict:
    """
    Get counts for navigation bar display.

    Each count is gathered independently and on a best-effort basis: if one
    lookup fails, the failure is logged and that key is left out of the
    result, while the remaining counts are still collected.

    Args:
        actor_id: Actor whose items are counted. When falsy, ``media`` and
            ``storage`` are reported as 0 without querying for them.

    Returns:
        Dict with any of the keys: runs, recipes, effects, media, storage.
        A key is absent when its lookup raised.
    """
    import logging
    from pathlib import Path

    log = logging.getLogger(__name__)
    counts = {}

    try:
        db = get_database()
        if actor_id:
            counts["media"] = await db.count_user_items(actor_id)
        else:
            counts["media"] = 0
    except Exception:
        log.warning("nav counts: media lookup failed", exc_info=True)

    # NOTE(review): recipes and runs are listed in full just to be counted;
    # a dedicated count call would be cheaper if the services offer one.
    try:
        recipes = await get_recipe_service().list_recipes(actor_id)
        counts["recipes"] = len(recipes)
    except Exception:
        log.warning("nav counts: recipe lookup failed", exc_info=True)

    try:
        runs = await get_run_service().list_runs(actor_id)
        counts["runs"] = len(runs)
    except Exception:
        log.warning("nav counts: run lookup failed", exc_info=True)

    try:
        # Effects are stored in the _effects/ directory, not in the cache.
        effects_dir = Path(get_cache_manager().cache_dir) / "_effects"
        if effects_dir.exists():
            counts["effects"] = sum(
                1 for entry in effects_dir.iterdir() if entry.is_dir()
            )
        else:
            counts["effects"] = 0
    except Exception:
        log.warning("nav counts: effects lookup failed", exc_info=True)

    try:
        db = get_database()
        if actor_id:
            storage_providers = await db.get_user_storage_providers(actor_id)
        else:
            storage_providers = []
        counts["storage"] = len(storage_providers) if storage_providers else 0
    except Exception:
        log.warning("nav counts: storage lookup failed", exc_info=True)

    return counts