From ef3638d3cfef621fa530d9408c66d4cce47d5824 Mon Sep 17 00:00:00 2001 From: giles Date: Wed, 4 Feb 2026 10:25:56 +0000 Subject: [PATCH] Make HLS segment uploads async to prevent stuttering --- streaming/gpu_output.py | 84 ++++++++++++++++++++++++++++++----------- 1 file changed, 61 insertions(+), 23 deletions(-) diff --git a/streaming/gpu_output.py b/streaming/gpu_output.py index e765908..1aa498b 100644 --- a/streaming/gpu_output.py +++ b/streaming/gpu_output.py @@ -8,6 +8,8 @@ Frames stay on GPU throughout: CuPy → NV12 conversion → NVENC encoding. import numpy as np import subprocess import sys +import threading +import queue from pathlib import Path from typing import Tuple, Optional, Union import time @@ -211,6 +213,7 @@ class GPUHLSOutput: GPU-accelerated HLS output with IPFS upload. Uses zero-copy GPU encoding and writes HLS segments. + Uploads happen asynchronously in a background thread to avoid stuttering. """ def __init__( @@ -245,12 +248,18 @@ class GPUHLSOutput: # Track segment CIDs for IPFS self.segment_cids = {} self._playlist_cid = None + self._upload_lock = threading.Lock() # Import IPFS client from ipfs_client import add_file, add_bytes self._ipfs_add_file = add_file self._ipfs_add_bytes = add_bytes + # Background upload thread + self._upload_queue = queue.Queue() + self._upload_thread = threading.Thread(target=self._upload_worker, daemon=True) + self._upload_thread.start() + # Setup ffmpeg for muxing (takes raw H.264, outputs .ts segments) self._setup_muxer() @@ -303,51 +312,76 @@ class GPUHLSOutput: self._frames_in_segment = 0 self._check_upload_segments() + def _upload_worker(self): + """Background worker thread for async IPFS uploads.""" + while True: + try: + item = self._upload_queue.get(timeout=1.0) + if item is None: # Shutdown signal + break + seg_path, seg_num = item + self._do_upload(seg_path, seg_num) + except queue.Empty: + continue + except Exception as e: + print(f"Upload worker error: {e}", file=sys.stderr) + + def _do_upload(self, seg_path: Path, seg_num: int): + """Actually perform the upload (runs in background thread).""" + try: + cid = self._ipfs_add_file(seg_path, pin=True) + if cid: + with self._upload_lock: + self.segment_cids[seg_num] = cid + print(f"Added to IPFS: {seg_path.name} -> {cid}", file=sys.stderr) + self._update_playlist() + except Exception as e: + print(f"Failed to add to IPFS: {e}", file=sys.stderr) + def _check_upload_segments(self): - """Check for and upload new segments to IPFS.""" + """Check for and queue new segments for async IPFS upload.""" segments = sorted(self.output_dir.glob("segment_*.ts")) for seg_path in segments: seg_num = int(seg_path.stem.split("_")[1]) - if seg_num in self.segment_cids: - continue + with self._upload_lock: + if seg_num in self.segment_cids: + continue - # Check if segment is complete + # Check if segment is complete (quick check, no blocking) try: size1 = seg_path.stat().st_size if size1 == 0: continue - time.sleep(0.05) + # Quick non-blocking check + time.sleep(0.01) size2 = seg_path.stat().st_size if size1 != size2: continue except FileNotFoundError: continue - # Upload to IPFS - cid = self._ipfs_add_file(seg_path, pin=True) - if cid: - self.segment_cids[seg_num] = cid - print(f"Added to IPFS: {seg_path.name} -> {cid}", file=sys.stderr) - self._update_playlist() + # Queue for async upload (non-blocking!) + self._upload_queue.put((seg_path, seg_num)) def _update_playlist(self): """Generate and upload IPFS-aware playlist.""" - if not self.segment_cids: - return + with self._upload_lock: + if not self.segment_cids: + return - lines = [ - "#EXTM3U", - "#EXT-X-VERSION:3", - f"#EXT-X-TARGETDURATION:{int(self.segment_duration) + 1}", - "#EXT-X-MEDIA-SEQUENCE:0", - ] + lines = [ + "#EXTM3U", + "#EXT-X-VERSION:3", + f"#EXT-X-TARGETDURATION:{int(self.segment_duration) + 1}", + "#EXT-X-MEDIA-SEQUENCE:0", + ] - for seg_num in sorted(self.segment_cids.keys()): - cid = self.segment_cids[seg_num] - lines.append(f"#EXTINF:{self.segment_duration:.3f},") - lines.append(f"{self.ipfs_gateway}/{cid}") + for seg_num in sorted(self.segment_cids.keys()): + cid = self.segment_cids[seg_num] + lines.append(f"#EXTINF:{self.segment_duration:.3f},") + lines.append(f"{self.ipfs_gateway}/{cid}") playlist_content = "\n".join(lines) + "\n" @@ -381,6 +415,10 @@ class GPUHLSOutput: # Final segment upload self._check_upload_segments() + # Wait for pending uploads to complete + self._upload_queue.put(None) # Signal shutdown + self._upload_thread.join(timeout=30) + self._gpu_encoder.close() @property