diff --git a/.gitea/workflows/ci.yml b/.gitea/workflows/ci.yml new file mode 100644 index 0000000..09d98ea --- /dev/null +++ b/.gitea/workflows/ci.yml @@ -0,0 +1,91 @@ +name: Build and Deploy + +on: + push: + branches: [main] + +env: + REGISTRY: registry.rose-ash.com:5000 + IMAGE_CPU: celery-l1-server + IMAGE_GPU: celery-l1-gpu-server + +jobs: + build-and-deploy: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up SSH + env: + SSH_KEY: ${{ secrets.DEPLOY_SSH_KEY }} + DEPLOY_HOST: ${{ secrets.DEPLOY_HOST }} + run: | + mkdir -p ~/.ssh + echo "$SSH_KEY" > ~/.ssh/id_rsa + chmod 600 ~/.ssh/id_rsa + ssh-keyscan -H "$DEPLOY_HOST" >> ~/.ssh/known_hosts 2>/dev/null || true + + - name: Sync code to server + env: + DEPLOY_HOST: ${{ secrets.DEPLOY_HOST }} + run: | + rsync -avz --delete \ + --exclude '.git' \ + --exclude '__pycache__' \ + --exclude '*.pyc' \ + --exclude '.pytest_cache' \ + --exclude 'venv' \ + ./ "root@$DEPLOY_HOST:/root/art-dag/celery/" + + - name: Build and push CPU image + env: + DEPLOY_HOST: ${{ secrets.DEPLOY_HOST }} + run: | + ssh "root@$DEPLOY_HOST" " + cd /root/art-dag/celery + docker build --build-arg CACHEBUST=\$(date +%s) -t ${{ env.REGISTRY }}/${{ env.IMAGE_CPU }}:latest -t ${{ env.REGISTRY }}/${{ env.IMAGE_CPU }}:${{ github.sha }} . + docker push ${{ env.REGISTRY }}/${{ env.IMAGE_CPU }}:latest + docker push ${{ env.REGISTRY }}/${{ env.IMAGE_CPU }}:${{ github.sha }} + " + + - name: Build and push GPU image + env: + DEPLOY_HOST: ${{ secrets.DEPLOY_HOST }} + run: | + ssh "root@$DEPLOY_HOST" " + cd /root/art-dag/celery + docker build --build-arg CACHEBUST=\$(date +%s) -f Dockerfile.gpu -t ${{ env.REGISTRY }}/${{ env.IMAGE_GPU }}:latest -t ${{ env.REGISTRY }}/${{ env.IMAGE_GPU }}:${{ github.sha }} . 
+ docker push ${{ env.REGISTRY }}/${{ env.IMAGE_GPU }}:latest + docker push ${{ env.REGISTRY }}/${{ env.IMAGE_GPU }}:${{ github.sha }} + " + + - name: Deploy stack + env: + DEPLOY_HOST: ${{ secrets.DEPLOY_HOST }} + run: | + ssh "root@$DEPLOY_HOST" " + cd /root/art-dag/celery + docker stack deploy -c docker-compose.yml celery + echo 'Waiting for services to update...' + sleep 10 + docker stack services celery + " + + - name: Deploy GPU worker + env: + GPU_HOST: ${{ secrets.GPU_HOST }} + SSH_KEY: ${{ secrets.GPU_SSH_KEY }} + if: ${{ env.GPU_HOST != '' }} + run: | + # Set up GPU SSH if different host + if [ -n "$SSH_KEY" ]; then + echo "$SSH_KEY" > ~/.ssh/gpu_key + chmod 600 ~/.ssh/gpu_key + ssh-keyscan -H "${GPU_HOST#*@}" >> ~/.ssh/known_hosts 2>/dev/null || true + + ssh -i ~/.ssh/gpu_key "$GPU_HOST" " + docker pull ${{ env.REGISTRY }}/${{ env.IMAGE_GPU }}:latest + docker stack deploy -c /root/art-dag/celery/docker-compose.yml celery || \ + docker service update --image ${{ env.REGISTRY }}/${{ env.IMAGE_GPU }}:latest celery_l1-gpu-worker + " + fi diff --git a/app/routers/runs.py b/app/routers/runs.py index 36beb23..29c7d25 100644 --- a/app/routers/runs.py +++ b/app/routers/runs.py @@ -1071,6 +1071,159 @@ async def purge_failed_runs( return {"purged": len(deleted), "run_ids": deleted} +@router.post("/{run_id}/pause") +async def pause_run( + run_id: str, + request: Request, + ctx: UserContext = Depends(require_auth), +): + """Pause a running render. Waits for current segment to complete. + + The render will checkpoint at the next segment boundary and stop. 
+ """ + import database + from celery_app import app as celery_app + + await database.init_db() + + pending = await database.get_pending_run(run_id) + if not pending: + raise HTTPException(404, "Run not found") + + if pending['status'] != 'running': + raise HTTPException(400, f"Can only pause running renders (current status: {pending['status']})") + + # Revoke the Celery task (soft termination via SIGTERM - allows cleanup) + celery_task_id = pending.get('celery_task_id') + if celery_task_id: + celery_app.control.revoke(celery_task_id, terminate=True, signal='SIGTERM') + logger.info(f"Sent SIGTERM to task {celery_task_id} for run {run_id}") + + # Update status to 'paused' + await database.update_pending_run_status(run_id, 'paused') + + return { + "run_id": run_id, + "status": "paused", + "checkpoint_frame": pending.get('checkpoint_frame'), + } + + +@router.post("/{run_id}/resume") +async def resume_run( + run_id: str, + request: Request, + ctx: UserContext = Depends(require_auth), +): + """Resume a paused or failed run from its last checkpoint. + + The render will continue from the checkpoint frame. 
+ """ + import database + from tasks.streaming import run_stream + + await database.init_db() + + pending = await database.get_pending_run(run_id) + if not pending: + raise HTTPException(404, "Run not found") + + if pending['status'] not in ('failed', 'paused'): + raise HTTPException(400, f"Can only resume failed/paused runs (current status: {pending['status']})") + + if not pending.get('checkpoint_frame'): + raise HTTPException(400, "No checkpoint available - use restart instead") + + if not pending.get('resumable', True): + raise HTTPException(400, "Run checkpoint is corrupted - use restart instead") + + # Submit new Celery task with resume=True + task = run_stream.apply_async( + kwargs=dict( + run_id=run_id, + recipe_sexp=pending.get('dag_json', ''), # Recipe is stored in dag_json + output_name=pending.get('output_name', 'output.mp4'), + actor_id=pending.get('actor_id'), + resume=True, + ), + queue='gpu', + ) + + # Update status and celery_task_id + await database.update_pending_run_status(run_id, 'running') + + # Update the celery_task_id manually since create_pending_run isn't called + async with database.pool.acquire() as conn: + await conn.execute( + "UPDATE pending_runs SET celery_task_id = $2, updated_at = NOW() WHERE run_id = $1", + run_id, task.id + ) + + logger.info(f"Resumed run {run_id} from frame {pending.get('checkpoint_frame')} with task {task.id}") + + return { + "run_id": run_id, + "status": "running", + "celery_task_id": task.id, + "resumed_from_frame": pending.get('checkpoint_frame'), + } + + +@router.post("/{run_id}/restart") +async def restart_run( + run_id: str, + request: Request, + ctx: UserContext = Depends(require_auth), +): + """Restart a failed/paused run from the beginning (discard checkpoint). + + All progress will be lost. Use resume instead to continue from checkpoint. 
+ """ + import database + from tasks.streaming import run_stream + + await database.init_db() + + pending = await database.get_pending_run(run_id) + if not pending: + raise HTTPException(404, "Run not found") + + if pending['status'] not in ('failed', 'paused'): + raise HTTPException(400, f"Can only restart failed/paused runs (current status: {pending['status']})") + + # Clear checkpoint data + await database.clear_run_checkpoint(run_id) + + # Submit new Celery task (without resume) + task = run_stream.apply_async( + kwargs=dict( + run_id=run_id, + recipe_sexp=pending.get('dag_json', ''), # Recipe is stored in dag_json + output_name=pending.get('output_name', 'output.mp4'), + actor_id=pending.get('actor_id'), + resume=False, + ), + queue='gpu', + ) + + # Update status and celery_task_id + await database.update_pending_run_status(run_id, 'running') + + async with database.pool.acquire() as conn: + await conn.execute( + "UPDATE pending_runs SET celery_task_id = $2, updated_at = NOW() WHERE run_id = $1", + run_id, task.id + ) + + logger.info(f"Restarted run {run_id} from beginning with task {task.id}") + + return { + "run_id": run_id, + "status": "running", + "celery_task_id": task.id, + } + + @router.get("/{run_id}/stream") async def stream_run_output( run_id: str, diff --git a/app/services/run_service.py b/app/services/run_service.py index ae538c5..5bfe19d 100644 --- a/app/services/run_service.py +++ b/app/services/run_service.py @@ -150,6 +150,7 @@ class RunService: "inputs": self._ensure_inputs_list(cached.get("inputs")), "output_cid": output_cid, "ipfs_cid": cached.get("ipfs_cid"), + "ipfs_playlist_cid": cached.get("ipfs_playlist_cid") or (pending.get("ipfs_playlist_cid") if pending else None), "provenance_cid": cached.get("provenance_cid"), "plan_cid": cached.get("plan_cid"), "actor_id": cached.get("actor_id"), @@ -174,6 +175,7 @@ class RunService: status_map = { "pending": "pending", "started": "running", + "rendering": "running", # Custom status from streaming 
task "success": "completed", "failure": "failed", "retry": "running", @@ -192,6 +194,14 @@ class RunService: "created_at": pending.get("created_at"), "error": pending.get("error"), "recipe_sexp": pending.get("dag_json"), # Recipe content for streaming runs + # Checkpoint fields for resumable renders + "checkpoint_frame": pending.get("checkpoint_frame"), + "checkpoint_t": pending.get("checkpoint_t"), + "total_frames": pending.get("total_frames"), + "resumable": pending.get("resumable", True), + # IPFS streaming info + "ipfs_playlist_cid": pending.get("ipfs_playlist_cid"), + "quality_playlists": pending.get("quality_playlists"), } # If task completed, get result @@ -227,6 +237,14 @@ class RunService: "created_at": pending.get("created_at"), "error": pending.get("error"), "recipe_sexp": pending.get("dag_json"), # Recipe content for streaming runs + # Checkpoint fields for resumable renders + "checkpoint_frame": pending.get("checkpoint_frame"), + "checkpoint_t": pending.get("checkpoint_t"), + "total_frames": pending.get("total_frames"), + "resumable": pending.get("resumable", True), + # IPFS streaming info + "ipfs_playlist_cid": pending.get("ipfs_playlist_cid"), + "quality_playlists": pending.get("quality_playlists"), } # Fallback: Check Redis for backwards compatibility @@ -272,6 +290,7 @@ class RunService: status_map = { "pending": "pending", "started": "running", + "rendering": "running", # Custom status from streaming task "success": "completed", "failure": "failed", "retry": "running", diff --git a/app/templates/runs/detail.html b/app/templates/runs/detail.html index 7bfaa18..c7abb2d 100644 --- a/app/templates/runs/detail.html +++ b/app/templates/runs/detail.html @@ -11,7 +11,7 @@ {% endblock %} {% block content %} -{% set status_colors = {'completed': 'green', 'running': 'blue', 'pending': 'yellow', 'failed': 'red'} %} +{% set status_colors = {'completed': 'green', 'running': 'blue', 'pending': 'yellow', 'failed': 'red', 'paused': 'yellow'} %} {% set color = 
status_colors.get(run.status, 'gray') %}
@@ -28,7 +28,42 @@ {% if run.error %} {{ run.error }} {% endif %} + {% if run.checkpoint_frame %} + + Checkpoint: {{ run.checkpoint_frame }}{% if run.total_frames %} / {{ run.total_frames }}{% endif %} frames + + {% endif %}
+ + + {% if run.status == 'running' %} + + {% endif %} + + + {% if run.status in ['failed', 'paused'] %} + {% if run.checkpoint_frame %} + + {% endif %} + + {% endif %} + {% if run.recipe %}
- - {% if run.status == 'rendering' or run.ipfs_playlist_cid %} + + {% if run.status == 'rendering' or run.ipfs_playlist_cid or (run.status in ['paused', 'failed'] and run.checkpoint_frame) %}

{% if run.status == 'rendering' %} Live Preview + {% elif run.status == 'paused' %} + + Partial Output (Paused) + {% elif run.status == 'failed' and run.checkpoint_frame %} + + Partial Output (Failed) {% else %} Video @@ -144,12 +185,15 @@ const baseUrl = '/runs/{{ run.run_id }}/playlist.m3u8'; const isRendering = {{ 'true' if run.status == 'rendering' else 'false' }}; + const isPausedOrFailed = {{ 'true' if run.status in ['paused', 'failed'] else 'false' }}; let hls = null; let retryCount = 0; const maxRetries = 120; let segmentsLoaded = 0; - let currentMode = isRendering ? 'live' : 'replay'; // Default based on status + // Start in replay mode for paused/failed (shows partial output from start) + // Start in live mode for rendering (follows the render progress) + let currentMode = isRendering ? 'live' : 'replay'; function getHlsUrl() { return baseUrl + '?_t=' + Date.now(); diff --git a/database.py b/database.py index 3da697d..4e58f42 100644 --- a/database.py +++ b/database.py @@ -111,6 +111,27 @@ BEGIN WHERE table_name = 'pending_runs' AND column_name = 'quality_playlists') THEN ALTER TABLE pending_runs ADD COLUMN quality_playlists JSONB; END IF; + -- Checkpoint columns for resumable renders + IF NOT EXISTS (SELECT 1 FROM information_schema.columns + WHERE table_name = 'pending_runs' AND column_name = 'checkpoint_frame') THEN + ALTER TABLE pending_runs ADD COLUMN checkpoint_frame INTEGER; + END IF; + IF NOT EXISTS (SELECT 1 FROM information_schema.columns + WHERE table_name = 'pending_runs' AND column_name = 'checkpoint_t') THEN + ALTER TABLE pending_runs ADD COLUMN checkpoint_t FLOAT; + END IF; + IF NOT EXISTS (SELECT 1 FROM information_schema.columns + WHERE table_name = 'pending_runs' AND column_name = 'checkpoint_scans') THEN + ALTER TABLE pending_runs ADD COLUMN checkpoint_scans JSONB; + END IF; + IF NOT EXISTS (SELECT 1 FROM information_schema.columns + WHERE table_name = 'pending_runs' AND column_name = 'total_frames') THEN + ALTER TABLE pending_runs ADD 
COLUMN total_frames INTEGER; + END IF; + IF NOT EXISTS (SELECT 1 FROM information_schema.columns + WHERE table_name = 'pending_runs' AND column_name = 'resumable') THEN + ALTER TABLE pending_runs ADD COLUMN resumable BOOLEAN DEFAULT TRUE; + END IF; END $$; CREATE INDEX IF NOT EXISTS idx_pending_runs_status ON pending_runs(status); @@ -1530,7 +1551,9 @@ async def get_pending_run(run_id: str) -> Optional[dict]: async with pool.acquire() as conn: row = await conn.fetchrow( """ - SELECT run_id, celery_task_id, status, recipe, inputs, dag_json, plan_cid, output_name, actor_id, error, ipfs_playlist_cid, quality_playlists, created_at, updated_at + SELECT run_id, celery_task_id, status, recipe, inputs, dag_json, plan_cid, output_name, actor_id, error, + ipfs_playlist_cid, quality_playlists, checkpoint_frame, checkpoint_t, checkpoint_scans, + total_frames, resumable, created_at, updated_at FROM pending_runs WHERE run_id = $1 """, run_id @@ -1544,6 +1567,10 @@ async def get_pending_run(run_id: str) -> Optional[dict]: quality_playlists = row.get("quality_playlists") if isinstance(quality_playlists, str): quality_playlists = _json.loads(quality_playlists) + # Parse checkpoint_scans if it's a string + checkpoint_scans = row.get("checkpoint_scans") + if isinstance(checkpoint_scans, str): + checkpoint_scans = _json.loads(checkpoint_scans) return { "run_id": row["run_id"], "celery_task_id": row["celery_task_id"], @@ -1557,6 +1584,11 @@ async def get_pending_run(run_id: str) -> Optional[dict]: "error": row["error"], "ipfs_playlist_cid": row["ipfs_playlist_cid"], "quality_playlists": quality_playlists, + "checkpoint_frame": row.get("checkpoint_frame"), + "checkpoint_t": row.get("checkpoint_t"), + "checkpoint_scans": checkpoint_scans, + "total_frames": row.get("total_frames"), + "resumable": row.get("resumable", True), "created_at": row["created_at"].isoformat() if row["created_at"] else None, "updated_at": row["updated_at"].isoformat() if row["updated_at"] else None, } @@ -1666,6 
+1698,109 @@ async def update_pending_run_playlist(run_id: str, ipfs_playlist_cid: str, quali return "UPDATE 1" in result +async def update_pending_run_checkpoint( + run_id: str, + checkpoint_frame: int, + checkpoint_t: float, + checkpoint_scans: Optional[dict] = None, + total_frames: Optional[int] = None, +) -> bool: + """Update checkpoint state for a streaming run. + + Called at segment boundaries to enable resume after failures. + + Args: + run_id: The run ID + checkpoint_frame: Last completed frame at segment boundary + checkpoint_t: Time value for checkpoint frame + checkpoint_scans: Accumulated scan state {scan_name: state_dict} + total_frames: Total expected frames (for progress %) + """ + if pool is None: + raise RuntimeError("Database pool not initialized - call init_db() first") + async with pool.acquire() as conn: + result = await conn.execute( + """ + UPDATE pending_runs SET + checkpoint_frame = $2, + checkpoint_t = $3, + checkpoint_scans = $4, + total_frames = COALESCE($5, total_frames), + updated_at = NOW() + WHERE run_id = $1 + """, + run_id, + checkpoint_frame, + checkpoint_t, + _json.dumps(checkpoint_scans) if checkpoint_scans else None, + total_frames, + ) + return "UPDATE 1" in result + + +async def get_run_checkpoint(run_id: str) -> Optional[dict]: + """Get checkpoint data for resuming a run. + + Returns: + Dict with checkpoint_frame, checkpoint_t, checkpoint_scans, quality_playlists, etc. 
+ or None if no checkpoint exists + """ + if pool is None: + raise RuntimeError("Database pool not initialized - call init_db() first") + async with pool.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT checkpoint_frame, checkpoint_t, checkpoint_scans, total_frames, + quality_playlists, ipfs_playlist_cid, resumable + FROM pending_runs WHERE run_id = $1 + """, + run_id + ) + if row and row.get("checkpoint_frame") is not None: + # Parse JSONB fields + checkpoint_scans = row.get("checkpoint_scans") + if isinstance(checkpoint_scans, str): + checkpoint_scans = _json.loads(checkpoint_scans) + quality_playlists = row.get("quality_playlists") + if isinstance(quality_playlists, str): + quality_playlists = _json.loads(quality_playlists) + return { + "frame_num": row["checkpoint_frame"], + "t": row["checkpoint_t"], + "scans": checkpoint_scans or {}, + "total_frames": row.get("total_frames"), + "quality_playlists": quality_playlists, + "ipfs_playlist_cid": row.get("ipfs_playlist_cid"), + "resumable": row.get("resumable", True), + } + return None + + +async def clear_run_checkpoint(run_id: str) -> bool: + """Clear checkpoint data for a run (used on restart). 
+ + Args: + run_id: The run ID + """ + if pool is None: + raise RuntimeError("Database pool not initialized - call init_db() first") + async with pool.acquire() as conn: + result = await conn.execute( + """ + UPDATE pending_runs SET + checkpoint_frame = NULL, + checkpoint_t = NULL, + checkpoint_scans = NULL, + quality_playlists = NULL, + ipfs_playlist_cid = NULL, + updated_at = NOW() + WHERE run_id = $1 + """, + run_id, + ) + return "UPDATE 1" in result + + async def complete_pending_run(run_id: str) -> bool: """Remove a pending run after it completes (moves to run_cache).""" async with pool.acquire() as conn: diff --git a/deploy.sh b/deploy.sh index bae1673..a2d6e69 100755 --- a/deploy.sh +++ b/deploy.sh @@ -7,13 +7,13 @@ echo "=== Pulling latest code ===" git pull echo "=== Building Docker image ===" -docker build --build-arg CACHEBUST=$(date +%s) -t git.rose-ash.com/art-dag/l1-server:latest . +docker build --build-arg CACHEBUST=$(date +%s) -t registry.rose-ash.com:5000/celery-l1-server:latest . + +echo "=== Pushing to registry ===" +docker push registry.rose-ash.com:5000/celery-l1-server:latest echo "=== Redeploying celery stack ===" docker stack deploy -c docker-compose.yml celery -echo "=== Restarting proxy nginx ===" -docker service update --force proxy_nginx - echo "=== Done ===" docker stack services celery diff --git a/diagnose_gpu.py b/diagnose_gpu.py new file mode 100755 index 0000000..5136139 --- /dev/null +++ b/diagnose_gpu.py @@ -0,0 +1,249 @@ +#!/usr/bin/env python3 +""" +GPU Rendering Diagnostic Script + +Checks for common issues that cause GPU rendering slowdowns in art-dag. +Run this script to identify potential performance bottlenecks. 
+""" + +import sys +import subprocess +import os + +def print_section(title): + print(f"\n{'='*60}") + print(f" {title}") + print(f"{'='*60}") + +def check_pass(msg): + print(f" [PASS] {msg}") + +def check_fail(msg): + print(f" [FAIL] {msg}") + +def check_warn(msg): + print(f" [WARN] {msg}") + +def check_info(msg): + print(f" [INFO] {msg}") + +# ============================================================ +# 1. Check GPU Availability +# ============================================================ +print_section("1. GPU AVAILABILITY") + +# Check nvidia-smi +try: + result = subprocess.run(["nvidia-smi", "--query-gpu=name,memory.total,memory.free,utilization.gpu", + "--format=csv,noheader"], capture_output=True, text=True, timeout=5) + if result.returncode == 0: + for line in result.stdout.strip().split('\n'): + check_pass(f"GPU found: {line}") + else: + check_fail("nvidia-smi failed - no GPU detected") +except FileNotFoundError: + check_fail("nvidia-smi not found - NVIDIA drivers not installed") +except Exception as e: + check_fail(f"nvidia-smi error: {e}") + +# ============================================================ +# 2. Check CuPy +# ============================================================ +print_section("2. CUPY (GPU ARRAY LIBRARY)") + +try: + import cupy as cp + check_pass(f"CuPy available, version {cp.__version__}") + + # Test basic GPU operation + try: + a = cp.zeros((100, 100), dtype=cp.uint8) + cp.cuda.Stream.null.synchronize() + check_pass("CuPy GPU operations working") + + # Check memory + mempool = cp.get_default_memory_pool() + check_info(f"GPU memory pool: {mempool.used_bytes() / 1024**2:.1f} MB used, " + f"{mempool.total_bytes() / 1024**2:.1f} MB total") + except Exception as e: + check_fail(f"CuPy GPU test failed: {e}") +except ImportError: + check_fail("CuPy not installed - GPU rendering disabled") + +# ============================================================ +# 3. 
Check PyNvVideoCodec (GPU Encoding) +# ============================================================ +print_section("3. PYNVVIDEOCODEC (GPU ENCODING)") + +try: + import PyNvVideoCodec as nvc + check_pass("PyNvVideoCodec available - zero-copy GPU encoding enabled") +except ImportError: + check_warn("PyNvVideoCodec not available - using FFmpeg NVENC (slower)") + +# ============================================================ +# 4. Check Decord GPU (Hardware Decode) +# ============================================================ +print_section("4. DECORD GPU (HARDWARE DECODE)") + +try: + import decord + from decord import gpu + ctx = gpu(0) + check_pass(f"Decord GPU (NVDEC) available - hardware video decode enabled") +except ImportError: + check_warn("Decord not installed - using FFmpeg decode") +except Exception as e: + check_warn(f"Decord GPU not available ({e}) - using FFmpeg decode") + +# ============================================================ +# 5. Check DLPack Support +# ============================================================ +print_section("5. 
DLPACK (ZERO-COPY TRANSFER)") + +try: + import decord + from decord import VideoReader, gpu + import cupy as cp + + # Need a test video file + test_video = None + for path in ["/data/cache", "/tmp"]: + if os.path.exists(path): + for f in os.listdir(path): + if f.endswith(('.mp4', '.webm', '.mkv')): + test_video = os.path.join(path, f) + break + if test_video: + break + + if test_video: + try: + vr = VideoReader(test_video, ctx=gpu(0)) + frame = vr[0] + dlpack = frame.to_dlpack() + gpu_frame = cp.from_dlpack(dlpack) + check_pass(f"DLPack zero-copy working (tested with {os.path.basename(test_video)})") + except Exception as e: + check_fail(f"DLPack FAILED: {e}") + check_info("This means every frame does GPU->CPU->GPU copy (SLOW)") + else: + check_warn("No test video found - cannot verify DLPack") +except ImportError: + check_warn("Cannot test DLPack - decord or cupy not available") + +# ============================================================ +# 6. Check Fast CUDA Kernels +# ============================================================ +print_section("6. FAST CUDA KERNELS (JIT COMPILED)") + +try: + sys.path.insert(0, '/root/art-dag/celery') + from streaming.jit_compiler import ( + fast_rotate, fast_zoom, fast_blend, fast_hue_shift, + fast_invert, fast_ripple, get_fast_ops + ) + check_pass("Fast CUDA kernels loaded successfully") + + # Test one kernel + try: + import cupy as cp + test_img = cp.zeros((720, 1280, 3), dtype=cp.uint8) + result = fast_rotate(test_img, 45.0) + cp.cuda.Stream.null.synchronize() + check_pass("Fast rotate kernel working") + except Exception as e: + check_fail(f"Fast kernel execution failed: {e}") +except ImportError as e: + check_warn(f"Fast CUDA kernels not available: {e}") + check_info("Fallback to slower CuPy operations") + +# ============================================================ +# 7. Check Fused Pipeline Compiler +# ============================================================ +print_section("7. 
FUSED PIPELINE COMPILER") + +try: + sys.path.insert(0, '/root/art-dag/celery') + from streaming.sexp_to_cuda import compile_frame_pipeline, compile_autonomous_pipeline + check_pass("Fused CUDA pipeline compiler available") +except ImportError as e: + check_warn(f"Fused pipeline compiler not available: {e}") + check_info("Using per-operation fallback (slower for multi-effect pipelines)") + +# ============================================================ +# 8. Check FFmpeg NVENC +# ============================================================ +print_section("8. FFMPEG NVENC (HARDWARE ENCODE)") + +try: + result = subprocess.run(["ffmpeg", "-encoders"], capture_output=True, text=True, timeout=5) + if "h264_nvenc" in result.stdout: + check_pass("FFmpeg h264_nvenc encoder available") + else: + check_warn("FFmpeg h264_nvenc not available - using libx264 (CPU)") + + if "hevc_nvenc" in result.stdout: + check_pass("FFmpeg hevc_nvenc encoder available") +except Exception as e: + check_fail(f"FFmpeg check failed: {e}") + +# ============================================================ +# 9. Check FFmpeg NVDEC +# ============================================================ +print_section("9. FFMPEG NVDEC (HARDWARE DECODE)") + +try: + result = subprocess.run(["ffmpeg", "-hwaccels"], capture_output=True, text=True, timeout=5) + if "cuda" in result.stdout: + check_pass("FFmpeg CUDA hwaccel available") + else: + check_warn("FFmpeg CUDA hwaccel not available - using CPU decode") +except Exception as e: + check_fail(f"FFmpeg hwaccel check failed: {e}") + +# ============================================================ +# 10. Check Pipeline Cache Status +# ============================================================ +print_section("10. 
PIPELINE CACHE STATUS") + +try: + sys.path.insert(0, '/root/art-dag/celery') + from sexp_effects.primitive_libs.streaming_gpu import ( + _FUSED_PIPELINE_CACHE, _AUTONOMOUS_PIPELINE_CACHE + ) + fused_count = len(_FUSED_PIPELINE_CACHE) + auto_count = len(_AUTONOMOUS_PIPELINE_CACHE) + + if fused_count > 0 or auto_count > 0: + check_info(f"Fused pipeline cache: {fused_count} entries") + check_info(f"Autonomous pipeline cache: {auto_count} entries") + if fused_count > 100 or auto_count > 100: + check_warn("Large pipeline cache - may cause memory pressure") + else: + check_info("Pipeline caches empty (no rendering done yet)") +except Exception as e: + check_info(f"Could not check pipeline cache: {e}") + +# ============================================================ +# Summary +# ============================================================ +print_section("SUMMARY") +print(""" +Optimal GPU rendering requires: + 1. [CRITICAL] CuPy with working GPU operations + 2. [CRITICAL] DLPack zero-copy transfer (decord -> CuPy) + 3. [HIGH] Fast CUDA kernels from jit_compiler + 4. [MEDIUM] Fused pipeline compiler for multi-effect recipes + 5. [MEDIUM] PyNvVideoCodec for zero-copy encoding + 6. 
[LOW] FFmpeg NVENC/NVDEC as fallback + +If DLPack is failing, check: + - decord version (needs 0.6.0+ with DLPack support) + - CuPy version compatibility + - CUDA toolkit version match + +If fast kernels are not loading: + - Check if streaming/jit_compiler.py exists + - Verify CUDA compiler (nvcc) is available +""") diff --git a/docker-compose.yml b/docker-compose.yml index 0923c7d..9138b4c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -64,7 +64,7 @@ services: - node.labels.gpu != true l1-server: - image: git.rose-ash.com/art-dag/l1-server:latest + image: registry.rose-ash.com:5000/celery-l1-server:latest env_file: - .env environment: @@ -100,7 +100,7 @@ services: - node.labels.gpu != true l1-worker: - image: git.rose-ash.com/art-dag/l1-server:latest + image: registry.rose-ash.com:5000/celery-l1-server:latest command: sh -c "find /app -type d -name __pycache__ -exec rm -rf {} + 2>/dev/null; celery -A celery_app worker --loglevel=info -E" environment: - REDIS_URL=redis://redis:6379/5 @@ -149,10 +149,10 @@ services: - node.labels.gpu != true # GPU worker for streaming/rendering tasks - # Build: docker build -f Dockerfile.gpu -t git.rose-ash.com/art-dag/l1-gpu-server:latest . + # Build: docker build -f Dockerfile.gpu -t registry.rose-ash.com:5000/celery-l1-gpu-server:latest . # Requires: docker node update --label-add gpu=true l1-gpu-worker: - image: git.rose-ash.com/art-dag/l1-gpu-server:latest + image: registry.rose-ash.com:5000/celery-l1-gpu-server:latest # For local dev, uncomment to build from Dockerfile.gpu: # build: # context: . diff --git a/streaming/multi_res_output.py b/streaming/multi_res_output.py index 722b677..40c661a 100644 --- a/streaming/multi_res_output.py +++ b/streaming/multi_res_output.py @@ -59,7 +59,21 @@ class MultiResolutionHLSOutput: ipfs_gateway: str = "https://ipfs.io/ipfs", on_playlist_update: callable = None, audio_source: str = None, + resume_from: Optional[Dict] = None, ): + """Initialize multi-resolution HLS output. 
+ + Args: + output_dir: Directory for HLS output files + source_size: (width, height) of source frames + fps: Frames per second + segment_duration: Duration of each HLS segment in seconds + ipfs_gateway: IPFS gateway URL for playlist URLs + on_playlist_update: Callback when playlists are updated + audio_source: Optional audio file to mux with video + resume_from: Optional dict to resume from checkpoint with keys: + - segment_cids: Dict of quality -> {seg_num: cid} + """ self.output_dir = Path(output_dir) self.output_dir.mkdir(parents=True, exist_ok=True) self.source_width, self.source_height = source_size @@ -75,6 +89,13 @@ class MultiResolutionHLSOutput: self.qualities: Dict[str, QualityLevel] = {} self._setup_quality_levels() + # Restore segment CIDs if resuming (don't re-upload existing segments) + if resume_from and resume_from.get('segment_cids'): + for name, cids in resume_from['segment_cids'].items(): + if name in self.qualities: + self.qualities[name].segment_cids = dict(cids) + print(f"[MultiResHLS] Restored {len(cids)} segment CIDs for {name}", file=sys.stderr) + # IPFS client from ipfs_client import add_file, add_bytes self._ipfs_add_file = add_file diff --git a/streaming/stream_sexp_generic.py b/streaming/stream_sexp_generic.py index bc0160e..427832b 100644 --- a/streaming/stream_sexp_generic.py +++ b/streaming/stream_sexp_generic.py @@ -128,6 +128,14 @@ class StreamInterpreter: # Signature: on_progress(percent: float, frame_num: int, total_frames: int) self.on_progress: callable = None + # Callback for checkpoint saves (called at segment boundaries for resumability) + # Signature: on_checkpoint(checkpoint: dict) + # checkpoint contains: frame_num, t, scans + self.on_checkpoint: callable = None + + # Frames per segment for checkpoint timing (default 4 seconds at 30fps = 120 frames) + self._frames_per_segment: int = 120 + def _resolve_name(self, name: str) -> Optional[Path]: """Resolve a friendly name to a file path using the naming service.""" try: @@ 
-989,8 +997,40 @@ class StreamInterpreter: else: scan['state'] = {'acc': new_state} - def run(self, duration: float = None, output: str = "pipe"): - """Run the streaming pipeline.""" + def _restore_checkpoint(self, checkpoint: dict): + """Restore scan states from a checkpoint. + + Called when resuming a render from a previous checkpoint. + + Args: + checkpoint: Dict with 'scans' key containing {scan_name: state_dict} + """ + scans_state = checkpoint.get('scans', {}) + for name, state in scans_state.items(): + if name in self.scans: + self.scans[name]['state'] = dict(state) + print(f"Restored scan '{name}' state from checkpoint", file=sys.stderr) + + def _get_checkpoint_state(self) -> dict: + """Get current scan states for checkpointing. + + Returns: + Dict mapping scan names to their current state dicts + """ + return {name: dict(scan['state']) for name, scan in self.scans.items()} + + def run(self, duration: float = None, output: str = "pipe", resume_from: dict = None): + """Run the streaming pipeline. + + Args: + duration: Duration in seconds (auto-detected from audio if None) + output: Output mode ("pipe", "preview", path/hls, path/ipfs-hls, or file path) + resume_from: Checkpoint dict to resume from, with keys: + - frame_num: Last completed frame + - t: Time value for checkpoint frame + - scans: Dict of scan states to restore + - segment_cids: Dict of quality -> {seg_num: cid} for output resume + """ # Import output classes - handle both package and direct execution try: from .output import PipeOutput, DisplayOutput, FileOutput, HLSOutput, IPFSHLSOutput @@ -1010,6 +1050,11 @@ class StreamInterpreter: self._init() + # Restore checkpoint state if resuming + if resume_from: + self._restore_checkpoint(resume_from) + print(f"Resuming from frame {resume_from.get('frame_num', 0)}", file=sys.stderr) + if not self.frame_pipeline: print("Error: no (frame ...) 
pipeline defined", file=sys.stderr) return @@ -1061,6 +1106,12 @@ class StreamInterpreter: hls_dir = output[:-9] # Remove /ipfs-hls suffix import os ipfs_gateway = os.environ.get("IPFS_GATEWAY_URL", "https://ipfs.io/ipfs") + + # Build resume state for output if resuming + output_resume = None + if resume_from and resume_from.get('segment_cids'): + output_resume = {'segment_cids': resume_from['segment_cids']} + # Use multi-resolution output (renders original + 720p + 360p) if MultiResolutionHLSOutput is not None: print(f"[StreamInterpreter] Using multi-resolution HLS output ({w}x{h} + 720p + 360p)", file=sys.stderr) @@ -1071,6 +1122,7 @@ class StreamInterpreter: ipfs_gateway=ipfs_gateway, on_playlist_update=self.on_playlist_update, audio_source=audio, + resume_from=output_resume, ) # Fallback to GPU single-resolution if multi-res not available elif GPUHLSOutput is not None and check_gpu_encode_available(): @@ -1083,6 +1135,16 @@ class StreamInterpreter: else: out = FileOutput(output, size=(w, h), fps=fps, audio_source=audio) + # Calculate frames per segment based on fps and segment duration (4 seconds default) + segment_duration = 4.0 + self._frames_per_segment = int(fps * segment_duration) + + # Determine start frame (resume from checkpoint + 1, or 0) + start_frame = 0 + if resume_from and resume_from.get('frame_num') is not None: + start_frame = resume_from['frame_num'] + 1 + print(f"Starting from frame {start_frame} (checkpoint was at {resume_from['frame_num']})", file=sys.stderr) + try: frame_times = [] profile_interval = 30 # Profile every N frames @@ -1090,7 +1152,7 @@ class StreamInterpreter: eval_times = [] write_times = [] - for frame_num in range(n_frames): + for frame_num in range(start_frame, n_frames): if not out.is_open: break @@ -1127,6 +1189,19 @@ class StreamInterpreter: frame_elapsed = time.time() - frame_start frame_times.append(frame_elapsed) + # Checkpoint at segment boundaries (every _frames_per_segment frames) + if frame_num > 0 and frame_num 
% self._frames_per_segment == 0: + if self.on_checkpoint: + try: + checkpoint = { + 'frame_num': frame_num, + 't': ctx.t, + 'scans': self._get_checkpoint_state(), + } + self.on_checkpoint(checkpoint) + except Exception as e: + print(f"Warning: checkpoint callback failed: {e}", file=sys.stderr) + # Progress with timing and profile breakdown if frame_num % profile_interval == 0 and frame_num > 0: pct = 100 * frame_num / n_frames diff --git a/tasks/streaming.py b/tasks/streaming.py index df2c519..7ac6057 100644 --- a/tasks/streaming.py +++ b/tasks/streaming.py @@ -3,11 +3,13 @@ Streaming video rendering task. Executes S-expression recipes for frame-by-frame video processing. Supports CID and friendly name references for assets. +Supports pause/resume/restart for long renders. """ import hashlib import logging import os +import signal import sys import tempfile from pathlib import Path @@ -23,6 +25,11 @@ from cache_manager import get_cache_manager logger = logging.getLogger(__name__) + +class PauseRequested(BaseException): + """Raised when user requests pause via SIGTERM; a BaseException so `except Exception` guards (e.g. around on_checkpoint) cannot swallow it.""" + pass + # Debug: verify module is being loaded print(f"DEBUG MODULE LOAD: tasks/streaming.py loaded at {__file__}", file=sys.stderr) @@ -246,6 +253,7 @@ def run_stream( actor_id: Optional[str] = None, sources_sexp: Optional[str] = None, audio_sexp: Optional[str] = None, + resume: bool = False, ) -> dict: """ Execute a streaming S-expression recipe. 
@@ -259,13 +267,25 @@ def run_stream( actor_id: User ID for friendly name resolution sources_sexp: Optional sources config S-expression audio_sexp: Optional audio config S-expression + resume: If True, load checkpoint and resume from where we left off Returns: Dict with output_cid, output_path, and status """ global _resolve_loop, _db_initialized task_id = self.request.id - logger.info(f"Starting stream task {task_id} for run {run_id}") + logger.info(f"Starting stream task {task_id} for run {run_id} (resume={resume})") + + # Handle graceful pause (SIGTERM from Celery revoke) + pause_requested = False + original_sigterm = signal.getsignal(signal.SIGTERM) + + def handle_sigterm(signum, frame): + nonlocal pause_requested + pause_requested = True + logger.info(f"Pause requested for run {run_id} (SIGTERM received)") + + signal.signal(signal.SIGTERM, handle_sigterm) self.update_state(state='INITIALIZING', meta={'progress': 0}) @@ -311,6 +331,28 @@ def run_stream( # Import the streaming interpreter from streaming.stream_sexp_generic import StreamInterpreter + # Load checkpoint if resuming + checkpoint = None + if resume: + import asyncio + import database + try: + if _resolve_loop is None or _resolve_loop.is_closed(): + _resolve_loop = asyncio.new_event_loop() + asyncio.set_event_loop(_resolve_loop) + _db_initialized = False + if not _db_initialized: + _resolve_loop.run_until_complete(database.init_db()) + _db_initialized = True + checkpoint = _resolve_loop.run_until_complete(database.get_run_checkpoint(run_id)) + if checkpoint: + logger.info(f"Loaded checkpoint for run {run_id}: frame {checkpoint.get('frame_num')}") + else: + logger.warning(f"No checkpoint found for run {run_id}, starting from beginning") + except Exception as e: + logger.error(f"Failed to load checkpoint: {e}") + checkpoint = None + # Create interpreter (pass actor_id for friendly name resolution) interp = StreamInterpreter(str(recipe_path), actor_id=actor_id) @@ -361,6 +403,7 @@ def run_stream( # Set 
up progress callback to update Celery task state def on_progress(pct, frame_num, total_frames): + nonlocal pause_requested # Scale progress: 5% (start) to 85% (before caching) scaled_progress = 5 + (pct * 0.8) # 5% to 85% self.update_state(state='RENDERING', meta={ @@ -372,9 +415,94 @@ def run_stream( interp.on_progress = on_progress + # Set up checkpoint callback to save state at segment boundaries + def on_checkpoint(ckpt): + """Save checkpoint state to database.""" + nonlocal pause_requested + global _resolve_loop, _db_initialized + import asyncio + import database + + try: + if _resolve_loop is None or _resolve_loop.is_closed(): + _resolve_loop = asyncio.new_event_loop() + asyncio.set_event_loop(_resolve_loop) + _db_initialized = False + if not _db_initialized: + _resolve_loop.run_until_complete(database.init_db()) + _db_initialized = True + + # Get total frames from interpreter config + total_frames = None + if hasattr(interp, 'output') and hasattr(interp.output, '_frame_count'): + # Estimate total frames based on duration + fps_val = interp.config.get('fps', 30) + for name, val in interp.globals.items(): + if hasattr(val, 'duration'): + total_frames = int(val.duration * fps_val) + break + + _resolve_loop.run_until_complete(database.update_pending_run_checkpoint( + run_id=run_id, + checkpoint_frame=ckpt['frame_num'], + checkpoint_t=ckpt['t'], + checkpoint_scans=ckpt.get('scans'), + total_frames=total_frames, + )) + logger.info(f"Saved checkpoint for run {run_id}: frame {ckpt['frame_num']}") + + # Check if pause was requested after checkpoint + if pause_requested: + logger.info(f"Pause requested after checkpoint, raising PauseRequested") + raise PauseRequested("Render paused by user") + + except PauseRequested: + raise # Re-raise to stop the render + except Exception as e: + logger.error(f"Failed to save checkpoint: {e}") + + interp.on_checkpoint = on_checkpoint + + # Build resume state for the interpreter (includes segment CIDs for output) + resume_from = None 
+ if checkpoint: + resume_from = { + 'frame_num': checkpoint.get('frame_num'), + 't': checkpoint.get('t'), + 'scans': checkpoint.get('scans', {}), + } + # Add segment CIDs if available (from quality_playlists in checkpoint) + # Note: We need to extract segment_cids from the output's state, which isn't + # directly stored. For now, the output will re-check existing segments on disk. + # Run rendering to file - logger.info(f"Rendering to {output_path}") - interp.run(duration=duration, output=str(output_path)) + logger.info(f"Rendering to {output_path}" + (f" (resuming from frame {resume_from['frame_num']})" if resume_from else "")) + render_paused = False + try: + interp.run(duration=duration, output=str(output_path), resume_from=resume_from) + except PauseRequested: + # Graceful pause - checkpoint already saved + render_paused = True + logger.info(f"Render paused for run {run_id}") + + # Restore original signal handler + signal.signal(signal.SIGTERM, original_sigterm) + + if render_paused: + import asyncio + import database + try: + if _resolve_loop is None or _resolve_loop.is_closed(): + _resolve_loop = asyncio.new_event_loop() + asyncio.set_event_loop(_resolve_loop) + _db_initialized = False + if not _db_initialized: + _resolve_loop.run_until_complete(database.init_db()) + _db_initialized = True + _resolve_loop.run_until_complete(database.update_pending_run_status(run_id, 'paused')) + except Exception as e: + logger.error(f"Failed to update status to paused: {e}") + return {"status": "paused", "run_id": run_id, "task_id": task_id} # Check for interpreter errors if interp.errors: @@ -395,7 +523,6 @@ def run_stream( # Update pending run with playlist CID for live HLS redirect if ipfs_playlist_cid: - global _resolve_loop, _db_initialized import asyncio import database try: