diff --git a/app/routers/runs.py b/app/routers/runs.py index 45fcfbf..f3cfefc 100644 --- a/app/routers/runs.py +++ b/app/routers/runs.py @@ -113,6 +113,16 @@ class RunStatus(BaseModel): infrastructure: Optional[Dict[str, Any]] = None +class StreamRequest(BaseModel): + """Request to run a streaming recipe.""" + recipe_sexp: str # The recipe S-expression content + output_name: str = "output.mp4" + duration: Optional[float] = None # Duration in seconds + fps: Optional[float] = None # FPS override + sources_sexp: Optional[str] = None # Sources config S-expression + audio_sexp: Optional[str] = None # Audio config S-expression + + def get_run_service(): """Get run service instance.""" import database @@ -142,6 +152,64 @@ async def create_run( return run +@router.post("/stream", response_model=RunStatus) +async def create_stream_run( + request: StreamRequest, + ctx: UserContext = Depends(require_auth), + redis = Depends(get_redis_client), +): + """Start a streaming video render. + + The recipe_sexp should be a complete streaming recipe with + (stream ...) form defining the pipeline. + + Assets can be referenced by CID or friendly name in the recipe. + """ + import uuid + from tasks.streaming import run_stream + + run_id = str(uuid.uuid4()) + created_at = datetime.now(timezone.utc).isoformat() + + # Submit Celery task + task = run_stream.delay( + recipe_sexp=request.recipe_sexp, + output_name=request.output_name, + duration=request.duration, + fps=request.fps, + actor_id=ctx.actor_id, + sources_sexp=request.sources_sexp, + audio_sexp=request.audio_sexp, + ) + + # Store run metadata in Redis + run_data = { + "run_id": run_id, + "status": "pending", + "recipe": "streaming", + "actor_id": ctx.actor_id, + "created_at": created_at, + "celery_task_id": task.id, + "output_name": request.output_name, + } + + await redis.set( + f"{RUNS_KEY_PREFIX}{run_id}", + json.dumps(run_data), + ex=86400 * 7 # 7 days + ) + + logger.info(f"Started stream run {run_id} with task {task.id}") + + return RunStatus( + run_id=run_id, + status="pending", + recipe="streaming", + created_at=created_at, + celery_task_id=task.id, + ) + + @router.get("/{run_id}") async def get_run( request: Request,