diff --git a/Dockerfile.gpu b/Dockerfile.gpu new file mode 100644 index 0000000..7a9d6a1 --- /dev/null +++ b/Dockerfile.gpu @@ -0,0 +1,44 @@ +# GPU-enabled worker image +# Based on NVIDIA CUDA with Python for CuPy support + +FROM nvidia/cuda:12.1.1-cudnn8-runtime-ubuntu22.04 + +WORKDIR /app + +# Install Python 3.11 and system dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + python3.11 \ + python3.11-venv \ + python3-pip \ + git \ + ffmpeg \ + && rm -rf /var/lib/apt/lists/* \ + && ln -sf /usr/bin/python3.11 /usr/bin/python3 \ + && ln -sf /usr/bin/python3 /usr/bin/python + +# Upgrade pip +RUN python3 -m pip install --upgrade pip + +# Install CPU dependencies first +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Install GPU-specific dependencies (CuPy for CUDA 12.x) +RUN pip install --no-cache-dir cupy-cuda12x + +# Copy application +COPY . . + +# Clone effects repo +RUN git clone https://git.rose-ash.com/art-dag/effects.git /app/artdag-effects + +# Create cache directory +RUN mkdir -p /data/cache + +ENV PYTHONUNBUFFERED=1 +ENV PYTHONDONTWRITEBYTECODE=1 +ENV EFFECTS_PATH=/app/artdag-effects +ENV PYTHONPATH=/app + +# Default command runs celery worker +CMD ["celery", "-A", "celery_app", "worker", "--loglevel=info", "-E", "-Q", "gpu,celery"] diff --git a/app/routers/runs.py b/app/routers/runs.py index 8704b24..94a4fce 100644 --- a/app/routers/runs.py +++ b/app/routers/runs.py @@ -227,16 +227,19 @@ async def create_stream_run( logger.warning(f"Failed to store recipe in cache: {e}") # Continue anyway - run will still work, just won't appear in /recipes - # Submit Celery task - task = run_stream.delay( - run_id=run_id, - recipe_sexp=request.recipe_sexp, - output_name=request.output_name, - duration=request.duration, - fps=request.fps, - actor_id=actor_id, - sources_sexp=request.sources_sexp, - audio_sexp=request.audio_sexp, + # Submit Celery task to GPU queue for hardware-accelerated rendering + task = run_stream.apply_async( + kwargs=dict( + run_id=run_id, + recipe_sexp=request.recipe_sexp, + output_name=request.output_name, + duration=request.duration, + fps=request.fps, + actor_id=actor_id, + sources_sexp=request.sources_sexp, + audio_sexp=request.audio_sexp, + ), + queue='gpu', ) # Store in database for durability @@ -396,7 +399,7 @@ async def get_run( artifacts = [] output_media_type = None if run.get("output_cid"): - # Detect media type using magic bytes + # Detect media type using magic bytes, fall back to database item_type output_cid = run["output_cid"] media_type = None try: @@ -408,6 +411,16 @@ async def get_run( output_media_type = media_type except Exception: pass + # Fall back to database item_type if local detection failed + if not media_type: + try: + import database + item_types = await database.get_item_types(output_cid, run.get("actor_id")) + if item_types: + media_type = type_to_mime(item_types[0].get("type")) + output_media_type = media_type + except Exception: + pass artifacts.append({ "cid": output_cid, "step_name": "Output", @@ -963,18 +976,44 @@ async def stream_run_output( request: Request, ): """Stream the video output of a running render. - - Returns the partial video file as it's being written, - allowing live preview of the render progress. + + For IPFS HLS streams, redirects to the IPFS gateway playlist. + For local HLS streams, redirects to the m3u8 playlist. + For legacy MP4 streams, returns the file directly. """ - from fastapi.responses import StreamingResponse, FileResponse + from fastapi.responses import StreamingResponse, FileResponse, RedirectResponse from pathlib import Path import os + import database + from celery_app import app as celery_app + + await database.init_db() + + # Check for IPFS HLS streaming first (distributed P2P streaming) + pending = await database.get_pending_run(run_id) + if pending and pending.get("celery_task_id"): + task_id = pending["celery_task_id"] + result = celery_app.AsyncResult(task_id) + if result.ready() and isinstance(result.result, dict): + ipfs_playlist_url = result.result.get("ipfs_playlist_url") + if ipfs_playlist_url: + logger.info(f"Redirecting to IPFS stream: {ipfs_playlist_url}") + return RedirectResponse(url=ipfs_playlist_url, status_code=302) - # Check for the streaming output file in the shared cache cache_dir = os.environ.get("CACHE_DIR", "/data/cache") - stream_path = Path(cache_dir) / "streaming" / run_id / "output.mp4" + stream_dir = Path(cache_dir) / "streaming" / run_id + # Check for local HLS output + hls_playlist = stream_dir / "stream.m3u8" + if hls_playlist.exists(): + # Redirect to the HLS playlist endpoint + return RedirectResponse( + url=f"/runs/{run_id}/hls/stream.m3u8", + status_code=302 + ) + + # Fall back to legacy MP4 streaming + stream_path = stream_dir / "output.mp4" if not stream_path.exists(): raise HTTPException(404, "Stream not available yet") @@ -982,7 +1021,6 @@ async def stream_run_output( if file_size == 0: raise HTTPException(404, "Stream not ready") - # Return the file with headers that allow streaming of growing file return FileResponse( path=str(stream_path), media_type="video/mp4", @@ -992,3 +1030,139 @@ async def stream_run_output( "X-Content-Size": str(file_size), } ) + + +@router.get("/{run_id}/hls/{filename:path}") +async def serve_hls_content( + run_id: str, + filename: str, + request: Request, +): + """Serve HLS playlist and segments for live streaming. + + Serves stream.m3u8 (playlist) and segment_*.ts files. + The playlist updates as new segments are rendered. + + If files aren't found locally, proxies to the GPU worker (if configured). + """ + from fastapi.responses import FileResponse, StreamingResponse + from pathlib import Path + import os + import httpx + + cache_dir = os.environ.get("CACHE_DIR", "/data/cache") + stream_dir = Path(cache_dir) / "streaming" / run_id + file_path = stream_dir / filename + + # Security: ensure we're only serving files within stream_dir + try: + file_path_resolved = file_path.resolve() + stream_dir_resolved = stream_dir.resolve() + if stream_dir.exists() and not str(file_path_resolved).startswith(str(stream_dir_resolved)): + raise HTTPException(403, "Invalid path") + except Exception: + pass # Allow proxy fallback + + # Determine content type + if filename.endswith(".m3u8"): + media_type = "application/vnd.apple.mpegurl" + headers = { + "Cache-Control": "no-cache, no-store, must-revalidate", + "Access-Control-Allow-Origin": "*", + } + elif filename.endswith(".ts"): + media_type = "video/mp2t" + headers = { + "Cache-Control": "public, max-age=3600", + "Access-Control-Allow-Origin": "*", + } + else: + raise HTTPException(400, "Invalid file type") + + # Try local file first + if file_path.exists(): + return FileResponse( + path=str(file_path), + media_type=media_type, + headers=headers, + ) + + # Fallback: proxy to GPU worker if configured + gpu_worker_url = os.environ.get("GPU_WORKER_STREAM_URL") + if gpu_worker_url: + # Proxy request to GPU worker + proxy_url = f"{gpu_worker_url}/{run_id}/{filename}" + try: + async with httpx.AsyncClient(timeout=30.0) as client: + resp = await client.get(proxy_url) + if resp.status_code == 200: + return StreamingResponse( + content=iter([resp.content]), + media_type=media_type, + headers=headers, + ) + except Exception as e: + logger.warning(f"GPU worker proxy failed: {e}") + + raise HTTPException(404, f"File not found: {filename}") + + +@router.get("/{run_id}/ipfs-stream") +async def get_ipfs_stream_info(run_id: str, request: Request): + """Get IPFS streaming info for a run. + + Returns the IPFS playlist URL and segment info if available. + This allows clients to stream directly from IPFS gateways. + """ + from celery_app import app as celery_app + import database + import os + + await database.init_db() + + # Try to get pending run to find the Celery task ID + pending = await database.get_pending_run(run_id) + if not pending: + # Try completed runs + run = await database.get_run_cache(run_id) + if not run: + raise HTTPException(404, "Run not found") + # For completed runs, check if we have IPFS info stored + ipfs_cid = run.get("ipfs_cid") + if ipfs_cid: + gateway = os.environ.get("IPFS_GATEWAY_URL", "https://ipfs.io/ipfs") + return { + "run_id": run_id, + "status": "completed", + "ipfs_video_url": f"{gateway}/{ipfs_cid}", + } + raise HTTPException(404, "No IPFS stream info available") + + task_id = pending.get("celery_task_id") + if not task_id: + raise HTTPException(404, "No task ID for this run") + + # Get the Celery task result + result = celery_app.AsyncResult(task_id) + + if result.ready(): + # Task is complete - check the result for IPFS playlist info + task_result = result.result + if isinstance(task_result, dict): + ipfs_playlist_cid = task_result.get("ipfs_playlist_cid") + ipfs_playlist_url = task_result.get("ipfs_playlist_url") + if ipfs_playlist_url: + return { + "run_id": run_id, + "status": "completed", + "ipfs_playlist_cid": ipfs_playlist_cid, + "ipfs_playlist_url": ipfs_playlist_url, + "segment_count": task_result.get("ipfs_segment_count", 0), + } + + # Task is still running or no IPFS info available + return { + "run_id": run_id, + "status": pending.get("status", "pending"), + "message": "IPFS streaming info not yet available" + } diff --git a/app/services/cache_service.py b/app/services/cache_service.py index a4d54f6..ddc50a4 100644 --- a/app/services/cache_service.py +++ b/app/services/cache_service.py @@ -100,30 +100,52 @@ class CacheService: async def get_cache_item(self, cid: str, actor_id: str = None) -> Optional[Dict[str, Any]]: """Get cached item with full metadata for display.""" - # Check if content exists - if not self.cache.has_content(cid): - return None - - path = self.cache.get_by_cid(cid) - if not path or not path.exists(): - return None - - # Get metadata from database + # Get metadata from database first meta = await self.db.load_item_metadata(cid, actor_id) cache_item = await self.db.get_cache_item(cid) - media_type = detect_media_type(path) - mime_type = get_mime_type(path) - size = path.stat().st_size + # Check if content exists locally + path = self.cache.get_by_cid(cid) if self.cache.has_content(cid) else None + + if path and path.exists(): + # Local file exists - detect type from file + media_type = detect_media_type(path) + mime_type = get_mime_type(path) + size = path.stat().st_size + else: + # File not local - check database for type info + # Try to get type from item_types table + media_type = "unknown" + mime_type = "application/octet-stream" + size = 0 + + if actor_id: + try: + item_types = await self.db.get_item_types(cid, actor_id) + if item_types: + media_type = item_types[0].get("type", "unknown") + if media_type == "video": + mime_type = "video/mp4" + elif media_type == "image": + mime_type = "image/png" + elif media_type == "audio": + mime_type = "audio/mpeg" + except Exception: + pass + + # If no local path but we have IPFS CID, content is available remotely + if not cache_item: + return None result = { "cid": cid, - "path": str(path), + "path": str(path) if path else None, "media_type": media_type, "mime_type": mime_type, "size": size, "ipfs_cid": cache_item.get("ipfs_cid") if cache_item else None, "meta": meta, + "remote_only": path is None or not path.exists(), } # Unpack meta fields to top level for template convenience diff --git a/app/templates/cache/detail.html b/app/templates/cache/detail.html index c4f7915..da30119 100644 --- a/app/templates/cache/detail.html +++ b/app/templates/cache/detail.html @@ -13,17 +13,32 @@
{% if cache.mime_type and cache.mime_type.startswith('image/') %} + {% if cache.remote_only and cache.ipfs_cid %} + + {% else %} + {% endif %} {% elif cache.mime_type and cache.mime_type.startswith('video/') %} + {% if cache.remote_only and cache.ipfs_cid %} + + {% else %} + {% endif %} {% elif cache.mime_type and cache.mime_type.startswith('audio/') %}
+ {% if cache.remote_only and cache.ipfs_cid %} + + {% else %} + {% endif %}
{% elif cache.mime_type == 'application/json' %} diff --git a/app/templates/runs/detail.html b/app/templates/runs/detail.html index 989319b..e0cbb72 100644 --- a/app/templates/runs/detail.html +++ b/app/templates/runs/detail.html @@ -7,6 +7,7 @@ + {% endblock %} {% block content %} @@ -73,6 +74,174 @@
+ + {% if run.status == 'rendering' %} +
+
+

+ + Live Preview +

+
Connecting...
+
+
+ +
+
+
+
Waiting for stream...
+
+
+
+
+ Stream URL: /runs/{{ run.run_id }}/hls/stream.m3u8 +
+
+ + + {% endif %} +