Files
celery/app/routers/api.py
gilesb 92d26b2b72 Rename content_hash/output_hash to cid throughout
Refactor to use IPFS CID as the primary content identifier:
- Update database schema: content_hash -> cid, output_hash -> output_cid
- Update all services, routers, and tasks to use cid terminology
- Update HTML templates to display CID instead of hash
- Update cache_manager parameter names
- Update README documentation

This completes the transition to CID-only content addressing.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 08:02:44 +00:00

258 lines
7.3 KiB
Python

"""
3-phase API routes for L1 server.
Provides the plan/execute/run-recipe endpoints for programmatic access.
"""
import hashlib
import json
import logging
import uuid
from datetime import datetime, timezone
from typing import Dict, List, Optional
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from artdag_common.middleware.auth import UserContext
from ..dependencies import require_auth, get_redis_client, get_cache_manager
router = APIRouter()
logger = logging.getLogger(__name__)
# Redis key prefix
RUNS_KEY_PREFIX = "artdag:run:"
class PlanRequest(BaseModel):
recipe_sexp: str
input_hashes: Dict[str, str]
class ExecutePlanRequest(BaseModel):
plan_json: str
run_id: Optional[str] = None
class RecipeRunRequest(BaseModel):
recipe_sexp: str
input_hashes: Dict[str, str]
def compute_run_id(input_hashes: List[str], recipe: str, recipe_hash: str = None) -> str:
"""Compute deterministic run_id from inputs and recipe."""
data = {
"inputs": sorted(input_hashes),
"recipe": recipe_hash or f"effect:{recipe}",
"version": "1",
}
json_str = json.dumps(data, sort_keys=True, separators=(",", ":"))
return hashlib.sha3_256(json_str.encode()).hexdigest()
@router.post("/plan")
async def generate_plan_endpoint(
request: PlanRequest,
ctx: UserContext = Depends(require_auth),
):
"""
Generate an execution plan without executing it.
Phase 1 (Analyze) + Phase 2 (Plan) of the 3-phase model.
Returns the plan with cache status for each step.
"""
from tasks.orchestrate import generate_plan
try:
task = generate_plan.delay(
recipe_sexp=request.recipe_sexp,
input_hashes=request.input_hashes,
)
# Wait for result (plan generation is usually fast)
result = task.get(timeout=60)
return {
"status": result.get("status"),
"recipe": result.get("recipe"),
"plan_id": result.get("plan_id"),
"total_steps": result.get("total_steps"),
"cached_steps": result.get("cached_steps"),
"pending_steps": result.get("pending_steps"),
"steps": result.get("steps"),
}
except Exception as e:
logger.error(f"Plan generation failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/execute")
async def execute_plan_endpoint(
request: ExecutePlanRequest,
ctx: UserContext = Depends(require_auth),
):
"""
Execute a pre-generated execution plan.
Phase 3 (Execute) of the 3-phase model.
Submits the plan to Celery for parallel execution.
"""
from tasks.orchestrate import run_plan
run_id = request.run_id or str(uuid.uuid4())
try:
task = run_plan.delay(
plan_json=request.plan_json,
run_id=run_id,
)
return {
"status": "submitted",
"run_id": run_id,
"celery_task_id": task.id,
}
except Exception as e:
logger.error(f"Plan execution failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/run-recipe")
async def run_recipe_endpoint(
request: RecipeRunRequest,
ctx: UserContext = Depends(require_auth),
):
"""
Run a complete recipe through all 3 phases.
1. Analyze: Extract features from inputs
2. Plan: Generate execution plan with cache IDs
3. Execute: Run steps with parallel execution
Returns immediately with run_id. Poll /api/run/{run_id} for status.
"""
from tasks.orchestrate import run_recipe
from artdag.sexp import compile_string
import database
redis = get_redis_client()
cache = get_cache_manager()
# Parse recipe name from S-expression
try:
compiled = compile_string(request.recipe_sexp)
recipe_name = compiled.name or "unknown"
except Exception:
recipe_name = "unknown"
# Compute deterministic run_id
run_id = compute_run_id(
list(request.input_hashes.values()),
recipe_name,
hashlib.sha3_256(request.recipe_sexp.encode()).hexdigest()
)
# Check if already completed
cached = await database.get_run_cache(run_id)
if cached:
output_cid = cached.get("output_cid")
if cache.has_content(output_cid):
return {
"status": "completed",
"run_id": run_id,
"output_cid": output_cid,
"output_ipfs_cid": cache.get_ipfs_cid(output_cid),
"cached": True,
}
# Submit to Celery
try:
task = run_recipe.delay(
recipe_sexp=request.recipe_sexp,
input_hashes=request.input_hashes,
run_id=run_id,
)
# Store run status in Redis
run_data = {
"run_id": run_id,
"status": "pending",
"recipe": recipe_name,
"inputs": list(request.input_hashes.values()),
"celery_task_id": task.id,
"created_at": datetime.now(timezone.utc).isoformat(),
"username": ctx.actor_id,
}
redis.setex(
f"{RUNS_KEY_PREFIX}{run_id}",
86400,
json.dumps(run_data)
)
return {
"status": "submitted",
"run_id": run_id,
"celery_task_id": task.id,
"recipe": recipe_name,
}
except Exception as e:
logger.error(f"Recipe run failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/run/{run_id}")
async def get_run_status(
run_id: str,
ctx: UserContext = Depends(require_auth),
):
"""Get status of a recipe execution run."""
import database
from celery.result import AsyncResult
redis = get_redis_client()
# Check Redis for run status
run_data = redis.get(f"{RUNS_KEY_PREFIX}{run_id}")
if run_data:
data = json.loads(run_data)
# If pending, check Celery task status
if data.get("status") == "pending" and data.get("celery_task_id"):
result = AsyncResult(data["celery_task_id"])
if result.ready():
if result.successful():
task_result = result.get()
data["status"] = task_result.get("status", "completed")
data["output_cid"] = task_result.get("output_cache_id")
data["output_ipfs_cid"] = task_result.get("output_ipfs_cid")
data["total_steps"] = task_result.get("total_steps")
data["cached"] = task_result.get("cached")
data["executed"] = task_result.get("executed")
# Update Redis
redis.setex(
f"{RUNS_KEY_PREFIX}{run_id}",
86400,
json.dumps(data)
)
else:
data["status"] = "failed"
data["error"] = str(result.result)
else:
data["celery_status"] = result.status
return data
# Check database cache
cached = await database.get_run_cache(run_id)
if cached:
return {
"run_id": run_id,
"status": "completed",
"output_cid": cached.get("output_cid"),
"cached": True,
}
raise HTTPException(status_code=404, detail="Run not found")