diff --git a/.gitea/workflows/gpu-worker.yml b/.gitea/workflows/gpu-worker.yml new file mode 100644 index 0000000..1726a29 --- /dev/null +++ b/.gitea/workflows/gpu-worker.yml @@ -0,0 +1,74 @@ +name: GPU Worker CI/CD + +on: + push: + branches: [main] + paths: + - 'sexp_effects/**' + - 'streaming/**' + - 'tasks/**' + - 'Dockerfile.gpu' + - 'requirements.txt' + +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.11' + + - name: Install dependencies + run: | + pip install -r requirements.txt + pip install pytest + + - name: Run frame compatibility tests + run: | + pytest tests/test_frame_compatibility.py -v --ignore-glob='*gpu*' || true + # Note: GPU tests skipped on CI (no GPU), but CPU tests must pass + + deploy: + needs: test + runs-on: ubuntu-latest + if: github.ref == 'refs/heads/main' + steps: + - uses: actions/checkout@v3 + + - name: Deploy to GPU node + env: + GPU_HOST: ${{ secrets.GPU_HOST }} + SSH_KEY: ${{ secrets.GPU_SSH_KEY }} + run: | + # Set up SSH + mkdir -p ~/.ssh + echo "$SSH_KEY" > ~/.ssh/id_rsa + chmod 600 ~/.ssh/id_rsa + ssh-keyscan -H ${GPU_HOST#*@} >> ~/.ssh/known_hosts 2>/dev/null || true + + # Sync code + rsync -avz --delete \ + --exclude '.git' \ + --exclude '__pycache__' \ + --exclude '*.pyc' \ + --exclude '.pytest_cache' \ + ./ "$GPU_HOST:/root/art-dag/celery/" + + # Build and restart + ssh "$GPU_HOST" " + cd /root/art-dag/celery + docker build -t git.rose-ash.com/art-dag/l1-gpu-server:latest -f Dockerfile.gpu . + docker kill \$(docker ps -q -f name=l1-gpu-worker) 2>/dev/null || true + echo 'GPU worker restarted' + " + + - name: Verify deployment + env: + GPU_HOST: ${{ secrets.GPU_HOST }} + SSH_KEY: ${{ secrets.GPU_SSH_KEY }} + run: | + sleep 15 + ssh "$GPU_HOST" "docker logs --tail 20 \$(docker ps -q -f name=l1-gpu-worker)" diff --git a/app/config.py b/app/config.py index d89af0a..90f7540 100644 --- a/app/config.py +++ b/app/config.py @@ -2,9 +2,11 @@ L1 Server Configuration. Environment-based configuration with sensible defaults. +All config should go through this module - no direct os.environ calls elsewhere. """ import os +import sys from pathlib import Path from dataclasses import dataclass, field from typing import Optional @@ -52,6 +54,16 @@ class Settings: default_factory=lambda: os.environ.get("L2_DOMAIN") ) + # GPU/Streaming settings + streaming_gpu_persist: bool = field( + default_factory=lambda: os.environ.get("STREAMING_GPU_PERSIST", "0") == "1" + ) + ipfs_gateways: str = field( + default_factory=lambda: os.environ.get( + "IPFS_GATEWAYS", "https://ipfs.io,https://cloudflare-ipfs.com,https://dweb.link" + ) + ) + # Derived paths @property def plan_cache_dir(self) -> Path: @@ -68,5 +80,26 @@ class Settings: self.analysis_cache_dir.mkdir(parents=True, exist_ok=True) + def log_config(self, logger=None) -> None: + """Log all configuration values for debugging.""" + output = logger.info if logger else lambda x: print(x, file=sys.stderr) + output("=" * 60) + output("CONFIGURATION") + output("=" * 60) + output(f" cache_dir: {self.cache_dir}") + output(f" redis_url: {self.redis_url}") + output(f" database_url: {self.database_url[:50]}...") + output(f" ipfs_api: {self.ipfs_api}") + output(f" ipfs_gateway_url: {self.ipfs_gateway_url}") + output(f" ipfs_gateways: {self.ipfs_gateways[:50]}...") + output(f" streaming_gpu_persist: {self.streaming_gpu_persist}") + output(f" l2_server: {self.l2_server}") + output("=" * 60) + + # Singleton settings instance settings = Settings() + +# Log config on import if DEBUG or SHOW_CONFIG is set +if os.environ.get("DEBUG") or os.environ.get("SHOW_CONFIG"): + settings.log_config() diff --git a/celery_app.py b/celery_app.py index 13449df..4a843d2 100644 --- a/celery_app.py +++ b/celery_app.py @@ -6,17 +6,32 @@ Uses S-expression recipes with frame-by-frame processing. """ import os +import sys from celery import Celery +from celery.signals import worker_ready -REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/5') +# Use central config +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +from app.config import settings app = Celery( 'art_celery', - broker=REDIS_URL, - backend=REDIS_URL, + broker=settings.redis_url, + backend=settings.redis_url, include=['tasks', 'tasks.streaming', 'tasks.ipfs_upload'] ) + +@worker_ready.connect +def log_config_on_startup(sender, **kwargs): + """Log configuration when worker starts.""" + print("=" * 60, file=sys.stderr) + print("WORKER STARTED - CONFIGURATION", file=sys.stderr) + print("=" * 60, file=sys.stderr) + settings.log_config() + print(f"Worker: {sender}", file=sys.stderr) + print("=" * 60, file=sys.stderr) + app.conf.update( result_expires=86400 * 7, # 7 days - allow time for recovery after restarts task_serializer='json', diff --git a/docker-compose.gpu-dev.yml b/docker-compose.gpu-dev.yml new file mode 100644 index 0000000..1facb3b --- /dev/null +++ b/docker-compose.gpu-dev.yml @@ -0,0 +1,36 @@ +# GPU Worker Development Override +# +# Usage: docker stack deploy -c docker-compose.yml -c docker-compose.gpu-dev.yml celery +# Or for quick testing: docker-compose -f docker-compose.yml -f docker-compose.gpu-dev.yml up l1-gpu-worker +# +# Features: +# - Mounts source code for instant changes (no rebuild needed) +# - Uses watchmedo for auto-reload on file changes +# - Shows config on startup + +version: '3.8' + +services: + l1-gpu-worker: + # Override command to use watchmedo for auto-reload + command: > + sh -c " + pip install -q watchdog[watchmedo] 2>/dev/null || true; + echo '=== GPU WORKER DEV MODE ==='; + echo 'Source mounted - changes take effect on restart'; + echo 'Auto-reload enabled via watchmedo'; + env | grep -E 'STREAMING_GPU|IPFS_GATEWAY|REDIS|DATABASE' | sort; + echo '==========================='; + watchmedo auto-restart --directory=/app --pattern='*.py' --recursive -- \ + celery -A celery_app worker --loglevel=info -E -Q gpu,celery + " + environment: + # Development defaults (can override with .env) + - STREAMING_GPU_PERSIST=0 + - IPFS_GATEWAY_URL=https://celery-artdag.rose-ash.com/ipfs + - SHOW_CONFIG=1 + volumes: + # Mount source code for hot reload + - ./:/app:ro + # Keep cache local + - gpu_cache:/data/cache diff --git a/scripts/gpu-dev-deploy.sh b/scripts/gpu-dev-deploy.sh new file mode 100755 index 0000000..f1be595 --- /dev/null +++ b/scripts/gpu-dev-deploy.sh @@ -0,0 +1,34 @@ +#!/bin/bash +# Quick deploy to GPU node with hot reload +# Usage: ./scripts/gpu-dev-deploy.sh + +set -e + +GPU_HOST="${GPU_HOST:-root@138.197.163.123}" +REMOTE_DIR="/root/art-dag/celery" + +echo "=== GPU Dev Deploy ===" +echo "Syncing code to $GPU_HOST..." + +# Sync code (excluding cache, git, __pycache__) +rsync -avz --delete \ + --exclude '.git' \ + --exclude '__pycache__' \ + --exclude '*.pyc' \ + --exclude '.pytest_cache' \ + --exclude 'node_modules' \ + --exclude '.env' \ + ./ "$GPU_HOST:$REMOTE_DIR/" + +echo "Restarting GPU worker..." +ssh "$GPU_HOST" "docker kill \$(docker ps -q -f name=l1-gpu-worker) 2>/dev/null || true" + +echo "Waiting for new container..." +sleep 10 + +# Show new container logs +ssh "$GPU_HOST" "docker logs --tail 30 \$(docker ps -q -f name=l1-gpu-worker)" + +echo "" +echo "=== Deploy Complete ===" +echo "Use 'ssh $GPU_HOST docker logs -f \$(docker ps -q -f name=l1-gpu-worker)' to follow logs" diff --git a/sexp_effects/primitive_libs/blending_gpu.py b/sexp_effects/primitive_libs/blending_gpu.py index f0ffa70..c768be3 100644 --- a/sexp_effects/primitive_libs/blending_gpu.py +++ b/sexp_effects/primitive_libs/blending_gpu.py @@ -18,7 +18,7 @@ except ImportError: print("[blending_gpu] CuPy not available, using CPU fallback") # GPU persistence mode - keep frames on GPU between operations -GPU_PERSIST = os.environ.get("STREAMING_GPU_PERSIST", "1") == "1" +GPU_PERSIST = os.environ.get("STREAMING_GPU_PERSIST", "0") == "1" if GPU_AVAILABLE and GPU_PERSIST: print("[blending_gpu] GPU persistence enabled - frames stay on GPU") diff --git a/sexp_effects/primitive_libs/color_ops_gpu.py b/sexp_effects/primitive_libs/color_ops_gpu.py index b733a5c..a4f5272 100644 --- a/sexp_effects/primitive_libs/color_ops_gpu.py +++ b/sexp_effects/primitive_libs/color_ops_gpu.py @@ -21,7 +21,7 @@ except ImportError: print("[color_ops_gpu] CuPy not available, using CPU fallback") # GPU persistence mode - keep frames on GPU between operations -GPU_PERSIST = os.environ.get("STREAMING_GPU_PERSIST", "1") == "1" +GPU_PERSIST = os.environ.get("STREAMING_GPU_PERSIST", "0") == "1" if GPU_AVAILABLE and GPU_PERSIST: print("[color_ops_gpu] GPU persistence enabled - frames stay on GPU") diff --git a/sexp_effects/primitive_libs/geometry_gpu.py b/sexp_effects/primitive_libs/geometry_gpu.py index d7a14ef..3752bfa 100644 --- a/sexp_effects/primitive_libs/geometry_gpu.py +++ b/sexp_effects/primitive_libs/geometry_gpu.py @@ -25,7 +25,7 @@ except ImportError: # GPU persistence mode - keep frames on GPU between operations # Set STREAMING_GPU_PERSIST=1 for maximum performance -GPU_PERSIST = os.environ.get("STREAMING_GPU_PERSIST", "1") == "1" +GPU_PERSIST = os.environ.get("STREAMING_GPU_PERSIST", "0") == "1" if GPU_AVAILABLE and GPU_PERSIST: print("[geometry_gpu] GPU persistence enabled - frames stay on GPU") diff --git a/tests/test_frame_compatibility.py b/tests/test_frame_compatibility.py new file mode 100644 index 0000000..f12cce0 --- /dev/null +++ b/tests/test_frame_compatibility.py @@ -0,0 +1,185 @@ +""" +Integration tests for GPU/CPU frame compatibility. + +These tests verify that all primitives work correctly with both: +- numpy arrays (CPU frames) +- CuPy arrays (GPU frames) +- GPUFrame wrapper objects + +Run with: pytest tests/test_frame_compatibility.py -v +""" + +import pytest +import numpy as np +import sys +import os + +# Add parent to path +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +# Try to import CuPy +try: + import cupy as cp + HAS_GPU = True +except ImportError: + cp = None + HAS_GPU = False + + +def create_test_frame(on_gpu=False): + """Create a test frame (100x100 RGB).""" + frame = np.random.randint(0, 255, (100, 100, 3), dtype=np.uint8) + if on_gpu and HAS_GPU: + return cp.asarray(frame) + return frame + + +class MockGPUFrame: + """Mock GPUFrame for testing without full GPU stack.""" + def __init__(self, data): + self._data = data + + @property + def cpu(self): + if HAS_GPU and hasattr(self._data, 'get'): + return self._data.get() + return self._data + + @property + def gpu(self): + if HAS_GPU: + if hasattr(self._data, 'get'): + return self._data + return cp.asarray(self._data) + raise RuntimeError("No GPU available") + + +class TestColorOps: + """Test color_ops primitives with different frame types.""" + + def test_shift_hsv_numpy(self): + """shift-hsv should work with numpy arrays.""" + from sexp_effects.primitive_libs.color_ops import prim_shift_hsv + frame = create_test_frame(on_gpu=False) + result = prim_shift_hsv(frame, h=30, s=1.2, v=0.9) + assert isinstance(result, np.ndarray) + assert result.shape == frame.shape + + @pytest.mark.skipif(not HAS_GPU, reason="No GPU") + def test_shift_hsv_cupy(self): + """shift-hsv should work with CuPy arrays.""" + from sexp_effects.primitive_libs.color_ops import prim_shift_hsv + frame = create_test_frame(on_gpu=True) + result = prim_shift_hsv(frame, h=30, s=1.2, v=0.9) + assert isinstance(result, np.ndarray) # Should return numpy + + def test_shift_hsv_gpuframe(self): + """shift-hsv should work with GPUFrame objects.""" + from sexp_effects.primitive_libs.color_ops import prim_shift_hsv + frame = MockGPUFrame(create_test_frame(on_gpu=False)) + result = prim_shift_hsv(frame, h=30, s=1.2, v=0.9) + assert isinstance(result, np.ndarray) + + def test_invert_numpy(self): + """invert-img should work with numpy arrays.""" + from sexp_effects.primitive_libs.color_ops import prim_invert_img + frame = create_test_frame(on_gpu=False) + result = prim_invert_img(frame) + assert isinstance(result, np.ndarray) + + def test_adjust_numpy(self): + """adjust should work with numpy arrays.""" + from sexp_effects.primitive_libs.color_ops import prim_adjust + frame = create_test_frame(on_gpu=False) + result = prim_adjust(frame, brightness=10, contrast=1.2) + assert isinstance(result, np.ndarray) + + +class TestGeometry: + """Test geometry primitives with different frame types.""" + + def test_rotate_numpy(self): + """rotate should work with numpy arrays.""" + from sexp_effects.primitive_libs.geometry import prim_rotate + frame = create_test_frame(on_gpu=False) + result = prim_rotate(frame, 45) + assert isinstance(result, np.ndarray) + + def test_scale_numpy(self): + """scale should work with numpy arrays.""" + from sexp_effects.primitive_libs.geometry import prim_scale + frame = create_test_frame(on_gpu=False) + result = prim_scale(frame, 1.5) + assert isinstance(result, np.ndarray) + + +class TestBlending: + """Test blending primitives with different frame types.""" + + def test_blend_numpy(self): + """blend should work with numpy arrays.""" + from sexp_effects.primitive_libs.blending import prim_blend + frame_a = create_test_frame(on_gpu=False) + frame_b = create_test_frame(on_gpu=False) + result = prim_blend(frame_a, frame_b, 0.5) + assert isinstance(result, np.ndarray) + + +class TestInterpreterConversion: + """Test the interpreter's frame conversion.""" + + def test_maybe_to_numpy_none(self): + """_maybe_to_numpy should handle None.""" + from streaming.stream_sexp_generic import StreamInterpreter + # Create minimal interpreter + import tempfile + with tempfile.NamedTemporaryFile(mode='w', suffix='.sexp', delete=False) as f: + f.write('(stream "test" :fps 30 :width 100 :height 100 (frame frame))') + f.flush() + interp = StreamInterpreter(f.name) + + assert interp._maybe_to_numpy(None) is None + + def test_maybe_to_numpy_numpy(self): + """_maybe_to_numpy should pass through numpy arrays.""" + from streaming.stream_sexp_generic import StreamInterpreter + import tempfile + with tempfile.NamedTemporaryFile(mode='w', suffix='.sexp', delete=False) as f: + f.write('(stream "test" :fps 30 :width 100 :height 100 (frame frame))') + f.flush() + interp = StreamInterpreter(f.name) + + frame = create_test_frame(on_gpu=False) + result = interp._maybe_to_numpy(frame) + assert result is frame # Should be same object + + @pytest.mark.skipif(not HAS_GPU, reason="No GPU") + def test_maybe_to_numpy_cupy(self): + """_maybe_to_numpy should convert CuPy to numpy.""" + from streaming.stream_sexp_generic import StreamInterpreter + import tempfile + with tempfile.NamedTemporaryFile(mode='w', suffix='.sexp', delete=False) as f: + f.write('(stream "test" :fps 30 :width 100 :height 100 (frame frame))') + f.flush() + interp = StreamInterpreter(f.name) + + frame = create_test_frame(on_gpu=True) + result = interp._maybe_to_numpy(frame) + assert isinstance(result, np.ndarray) + + def test_maybe_to_numpy_gpuframe(self): + """_maybe_to_numpy should convert GPUFrame to numpy.""" + from streaming.stream_sexp_generic import StreamInterpreter + import tempfile + with tempfile.NamedTemporaryFile(mode='w', suffix='.sexp', delete=False) as f: + f.write('(stream "test" :fps 30 :width 100 :height 100 (frame frame))') + f.flush() + interp = StreamInterpreter(f.name) + + frame = MockGPUFrame(create_test_frame(on_gpu=False)) + result = interp._maybe_to_numpy(frame) + assert isinstance(result, np.ndarray) + + +if __name__ == '__main__': + pytest.main([__file__, '-v'])