Add streaming endpoint to runs router
- Add /runs/stream POST endpoint for streaming recipes - Accepts recipe_sexp, sources_sexp, audio_sexp - Submits to run_stream Celery task Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -113,6 +113,16 @@ class RunStatus(BaseModel):
|
|||||||
infrastructure: Optional[Dict[str, Any]] = None
|
infrastructure: Optional[Dict[str, Any]] = None
|
||||||
|
|
||||||
|
|
||||||
|
class StreamRequest(BaseModel):
|
||||||
|
"""Request to run a streaming recipe."""
|
||||||
|
recipe_sexp: str # The recipe S-expression content
|
||||||
|
output_name: str = "output.mp4"
|
||||||
|
duration: Optional[float] = None # Duration in seconds
|
||||||
|
fps: Optional[float] = None # FPS override
|
||||||
|
sources_sexp: Optional[str] = None # Sources config S-expression
|
||||||
|
audio_sexp: Optional[str] = None # Audio config S-expression
|
||||||
|
|
||||||
|
|
||||||
def get_run_service():
|
def get_run_service():
|
||||||
"""Get run service instance."""
|
"""Get run service instance."""
|
||||||
import database
|
import database
|
||||||
@@ -142,6 +152,64 @@ async def create_run(
|
|||||||
return run
|
return run
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/stream", response_model=RunStatus)
|
||||||
|
async def create_stream_run(
|
||||||
|
request: StreamRequest,
|
||||||
|
ctx: UserContext = Depends(require_auth),
|
||||||
|
redis = Depends(get_redis_client),
|
||||||
|
):
|
||||||
|
"""Start a streaming video render.
|
||||||
|
|
||||||
|
The recipe_sexp should be a complete streaming recipe with
|
||||||
|
(stream ...) form defining the pipeline.
|
||||||
|
|
||||||
|
Assets can be referenced by CID or friendly name in the recipe.
|
||||||
|
"""
|
||||||
|
import uuid
|
||||||
|
from tasks.streaming import run_stream
|
||||||
|
|
||||||
|
run_id = str(uuid.uuid4())
|
||||||
|
created_at = datetime.now(timezone.utc).isoformat()
|
||||||
|
|
||||||
|
# Submit Celery task
|
||||||
|
task = run_stream.delay(
|
||||||
|
recipe_sexp=request.recipe_sexp,
|
||||||
|
output_name=request.output_name,
|
||||||
|
duration=request.duration,
|
||||||
|
fps=request.fps,
|
||||||
|
actor_id=ctx.actor_id,
|
||||||
|
sources_sexp=request.sources_sexp,
|
||||||
|
audio_sexp=request.audio_sexp,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Store run metadata in Redis
|
||||||
|
run_data = {
|
||||||
|
"run_id": run_id,
|
||||||
|
"status": "pending",
|
||||||
|
"recipe": "streaming",
|
||||||
|
"actor_id": ctx.actor_id,
|
||||||
|
"created_at": created_at,
|
||||||
|
"celery_task_id": task.id,
|
||||||
|
"output_name": request.output_name,
|
||||||
|
}
|
||||||
|
|
||||||
|
await redis.set(
|
||||||
|
f"{RUNS_KEY_PREFIX}{run_id}",
|
||||||
|
json.dumps(run_data),
|
||||||
|
ex=86400 * 7 # 7 days
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.info(f"Started stream run {run_id} with task {task.id}")
|
||||||
|
|
||||||
|
return RunStatus(
|
||||||
|
run_id=run_id,
|
||||||
|
status="pending",
|
||||||
|
recipe="streaming",
|
||||||
|
created_at=created_at,
|
||||||
|
celery_task_id=task.id,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@router.get("/{run_id}")
|
@router.get("/{run_id}")
|
||||||
async def get_run(
|
async def get_run(
|
||||||
request: Request,
|
request: Request,
|
||||||
|
|||||||
Reference in New Issue
Block a user