"""
Live audio analysis for reactive effects.

Provides real-time audio features:
- Energy (RMS amplitude)
- Beat detection
- Frequency bands (bass, mid, high)
"""

import bisect
import json
import subprocess
import threading
import time
from typing import Optional

import numpy as np

# Number of most-recent samples analysed by the live energy/spectrum getters
# and by the streaming spectrum getter.
_ANALYSIS_WINDOW = 2048


def _band_energies(magnitudes: np.ndarray, bands: int) -> np.ndarray:
    """
    Average an FFT magnitude spectrum into equal-width bands.

    The spectrum is cut into ``bands`` consecutive groups of
    ``len(magnitudes) // bands`` bins (left-over bins at the top of the
    spectrum are ignored) and each group is averaged. The result is scaled
    so that the strongest band equals 1.0.

    Args:
        magnitudes: 1-D array of non-negative FFT magnitudes
        bands: Number of bands to produce (must be >= 1)

    Returns:
        Array of ``bands`` values in the 0-1 range; all zeros when there are
        fewer bins than bands or the spectrum is silent

    Raises:
        ValueError: If ``bands`` is smaller than 1
    """
    if bands < 1:
        raise ValueError("bands must be at least 1")

    band_size = len(magnitudes) // bands
    if band_size == 0:
        # Fewer bins than bands: averaging empty slices would produce NaN.
        return np.zeros(bands)

    grouped = np.asarray(magnitudes[:band_size * bands], dtype=float)
    result = grouped.reshape(bands, band_size).mean(axis=1)

    peak = result.max()
    if peak > 0:
        result = result / peak
    return result


class AudioAnalyzer:
    """
    Real-time audio analyzer using sounddevice.

    Captures audio from microphone/line-in and computes features
    in real-time for effect parameter bindings.

    Example:
        analyzer = AudioAnalyzer(device=0)
        analyzer.start()

        # In compositor loop:
        energy = analyzer.get_energy()
        beat = analyzer.get_beat()

        analyzer.stop()
    """

    def __init__(
        self,
        device: Optional[int] = None,
        sample_rate: int = 44100,
        block_size: int = 1024,
        buffer_seconds: float = 0.5,
    ):
        """
        Initialize audio analyzer.

        Args:
            device: Audio input device index (None = default)
            sample_rate: Audio sample rate
            block_size: Samples per block
            buffer_seconds: Ring buffer duration
        """
        self.sample_rate = sample_rate
        self.block_size = block_size
        self.device = device

        # Ring buffer for recent audio. At least one slot is kept so the
        # modulo arithmetic in the callback never divides by zero.
        buffer_size = max(1, int(sample_rate * buffer_seconds))
        self._buffer = np.zeros(buffer_size, dtype=np.float32)
        self._buffer_pos = 0  # index where the next sample will be written
        self._lock = threading.Lock()

        # Beat detection state
        self._last_energy = 0
        self._energy_history = []
        self._last_beat_time = float("-inf")  # no beat seen yet
        self._beat_threshold = 1.5  # Energy ratio for beat detection
        self._min_beat_interval = 0.1  # Min seconds between beats

        # Stream state
        self._stream = None
        self._running = False

    def _audio_callback(self, indata, frames, time_info, status):
        """
        Store one captured audio block in the ring buffer.

        Called by sounddevice for each audio block. Only the first channel
        of a multi-channel block is kept.
        """
        samples = indata[:, 0] if len(indata.shape) > 1 else indata
        count = len(samples)

        with self._lock:
            size = len(self._buffer)

            if count >= size:
                # The block alone fills the buffer: keep only its newest
                # samples and restart the write position at the beginning.
                self._buffer[:] = samples[count - size:]
                self._buffer_pos = 0
                return

            pos = self._buffer_pos
            first = min(count, size - pos)  # samples that fit before the end
            self._buffer[pos:pos + first] = samples[:first]
            self._buffer[:count - first] = samples[first:]  # wrapped remainder
            self._buffer_pos = (pos + count) % size

    def _recent_samples(self, count: int) -> np.ndarray:
        """
        Return a copy of the newest samples in chronological order.

        Args:
            count: Number of samples wanted; clamped to the buffer length

        Returns:
            1-D float32 array holding the most recent samples, oldest first
        """
        with self._lock:
            size = len(self._buffer)
            count = min(count, size)
            end = self._buffer_pos

            if end >= count:
                return self._buffer[end - count:end].copy()

            # The window wraps: oldest part sits at the end of the buffer.
            tail = count - end
            return np.concatenate((self._buffer[size - tail:], self._buffer[:end]))

    def start(self):
        """
        Start audio capture.

        Does nothing if capture is already running. If sounddevice is not
        installed a warning is printed and the analyzer stays stopped.
        """
        if self._stream is not None:
            return  # already capturing; do not leak the existing stream

        try:
            import sounddevice as sd
        except ImportError:
            print("Warning: sounddevice not installed. Audio analysis disabled.")
            print("Install with: pip install sounddevice")
            return

        stream = sd.InputStream(
            device=self.device,
            channels=1,
            samplerate=self.sample_rate,
            blocksize=self.block_size,
            callback=self._audio_callback,
        )
        try:
            stream.start()
        except Exception:
            # Do not keep a half-opened stream around.
            stream.close()
            raise

        self._stream = stream
        self._running = True

    def stop(self):
        """Stop audio capture."""
        if self._stream:
            self._stream.stop()
            self._stream.close()
            self._stream = None
        self._running = False

    def get_energy(self) -> float:
        """
        Get current audio energy (RMS amplitude).

        Returns:
            Energy value normalized to 0-1 range (approximately)
        """
        # The samples are copied under the lock; the maths run outside it so
        # the audio callback is not blocked.
        data = self._recent_samples(_ANALYSIS_WINDOW)

        # RMS energy
        rms = np.sqrt(np.mean(data ** 2))

        # Normalize (typical mic input is quite low)
        return float(min(1.0, rms * 10))

    def get_beat(self) -> bool:
        """
        Detect if current moment is a beat.

        Simple onset detection based on energy spikes: the current energy
        must exceed the recent average by the beat threshold, be higher than
        the previous reading, and the minimum beat interval must have passed.

        Returns:
            True if beat detected, False otherwise
        """
        current_energy = self.get_energy()
        # Monotonic clock: interval checks must not be affected by
        # wall-clock adjustments.
        now = time.monotonic()

        # Update energy history (at most 20 readings)
        history = self._energy_history
        history.append(current_energy)
        if len(history) > 20:
            history.pop(0)

        # Need enough history
        if len(history) < 5:
            self._last_energy = current_energy
            return False

        # Average of the readings before the current one
        avg_energy = np.mean(history[:-1])

        is_beat = bool(
            current_energy > avg_energy * self._beat_threshold
            and now - self._last_beat_time > self._min_beat_interval
            and current_energy > self._last_energy  # Rising edge
        )

        if is_beat:
            self._last_beat_time = now

        self._last_energy = current_energy
        return is_beat

    def get_spectrum(self, bands: int = 3) -> np.ndarray:
        """
        Get frequency spectrum divided into bands.

        Args:
            bands: Number of frequency bands (default 3: bass, mid, high)

        Returns:
            Array of band energies, normalized to 0-1

        Raises:
            ValueError: If ``bands`` is smaller than 1
        """
        data = self._recent_samples(_ANALYSIS_WINDOW)

        # Windowed FFT magnitude
        magnitudes = np.abs(np.fft.rfft(data * np.hanning(len(data))))
        return _band_energies(magnitudes, bands)

    @property
    def is_running(self) -> bool:
        """Whether audio capture has been started and not stopped."""
        return self._running

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *args):
        self.stop()


class FileAudioAnalyzer:
    """
    Audio analyzer that reads from a file (for testing/development).

    Serves values from pre-computed analysis data at the time set with
    ``set_time``.
    """

    def __init__(self, path: str, analysis_data: Optional[dict] = None):
        """
        Initialize from audio file.

        Args:
            path: Path to audio file
            analysis_data: Pre-computed analysis (times, values, etc.)
        """
        self.path = path
        self.analysis_data = analysis_data or {}
        self._current_time = 0

    def set_time(self, t: float):
        """Set current playback time."""
        self._current_time = t

    def get_energy(self) -> float:
        """Get energy at current time from pre-computed data."""
        track = self.analysis_data.get("energy", {})
        return self._interpolate(track, self._current_time)

    def get_beat(self) -> bool:
        """Check if current time is within 50ms of a pre-computed beat."""
        track = self.analysis_data.get("beats", {})
        now = self._current_time
        return any(abs(beat_time - now) < 0.05 for beat_time in track.get("times", []))

    def _interpolate(self, track: dict, t: float) -> float:
        """
        Linearly interpolate a track's value at time t.

        Args:
            track: Dict with parallel "times" and "values" sequences; times
                are expected in ascending order
            t: Time to evaluate

        Returns:
            Interpolated value; the first/last value outside the covered
            range; 0.0 when the track is empty
        """
        times = track.get("times", [])
        values = track.get("values", [])

        if not times or not values:
            return 0.0

        if t <= times[0]:
            return values[0]
        if t >= times[-1]:
            return values[-1]

        # times[0] < t < times[-1], so the bracket times[lo] < t <= times[hi]
        # exists and has non-zero width.
        hi = bisect.bisect_left(times, t)
        lo = hi - 1
        alpha = (t - times[lo]) / (times[hi] - times[lo])
        return values[lo] * (1 - alpha) + values[hi] * alpha

    @property
    def is_running(self) -> bool:
        """Always True: there is no capture to start or stop."""
        return True


class StreamingAudioAnalyzer:
    """
    Real-time audio analyzer that streams from a file.

    Reads audio in sync with video time and computes features on-the-fly.
    No pre-computation needed - analysis happens as frames are processed.
    """

    def __init__(self, path: str, sample_rate: int = 22050, hop_length: int = 512):
        """
        Initialize streaming audio analyzer.

        Runs ffprobe to read the file duration; the audio itself is decoded
        on first use.

        Args:
            path: Path to audio file
            sample_rate: Sample rate for analysis
            hop_length: Hop length for feature extraction

        Raises:
            FileNotFoundError: If the ffprobe executable cannot be run
            json.JSONDecodeError: If ffprobe produced no parsable output
            KeyError: If the ffprobe output carries no format duration
        """
        self.path = path
        self.sample_rate = sample_rate
        self.hop_length = hop_length
        self._current_time = 0.0

        # Get audio duration
        cmd = ["ffprobe", "-v", "quiet", "-print_format", "json",
               "-show_format", str(path)]
        result = subprocess.run(cmd, capture_output=True, text=True)
        info = json.loads(result.stdout)
        self.duration = float(info["format"]["duration"])

        # Audio buffer and state
        self._audio_data = None
        self._energy_history = []  # (time, spectral flux) pairs for get_beat
        self._last_energy = 0
        self._last_beat_time = -1
        # NOTE: get_beat currently uses its own flux threshold and a fixed
        # 0.1s interval; these two attributes are not read by it.
        self._beat_threshold = 1.5
        self._min_beat_interval = 0.15

        # Load audio lazily
        self._loaded = False

    def _load_audio(self):
        """Decode the file to mono float32 PCM with ffmpeg on first use."""
        if self._loaded:
            return

        # Use ffmpeg to decode audio to raw PCM
        cmd = [
            "ffmpeg", "-v", "quiet",
            "-i", str(self.path),
            "-f", "f32le",  # 32-bit float, little-endian
            "-ac", "1",  # mono
            "-ar", str(self.sample_rate),
            "-"
        ]
        result = subprocess.run(cmd, capture_output=True)
        raw = result.stdout
        # Explicit little-endian dtype to match "f32le"; ``count`` drops a
        # truncated trailing sample instead of raising.
        self._audio_data = np.frombuffer(
            raw, dtype=np.dtype("<f4"), count=len(raw) // 4
        )
        self._loaded = True

    def _reset_if_rewound(self):
        """
        Clear beat state when playback time has moved backwards.

        Without this, a backward seek leaves the last beat time and the flux
        history in the future, which suppresses beats until playback catches
        up again.
        """
        history = self._energy_history
        now = self._current_time
        if now < self._last_beat_time or (history and now < history[-1][0]):
            history.clear()
            self._last_beat_time = -1

    def set_time(self, t: float):
        """Set current playback time."""
        self._current_time = t

    def get_energy(self) -> float:
        """
        Compute RMS energy around the current time.

        Returns:
            Energy in the 0-1 range (approximate); 0.0 when no audio is
            available at the current time
        """
        self._load_audio()
        audio = self._audio_data

        if audio is None or len(audio) == 0:
            return 0.0

        # Window of 2 * hop_length samples centred on the current time
        sample_idx = int(self._current_time * self.sample_rate)
        half = (self.hop_length * 2) // 2

        start = max(0, sample_idx - half)
        end = min(len(audio), sample_idx + half)

        if start >= end:
            return 0.0

        # RMS energy
        chunk = audio[start:end]
        rms = np.sqrt(np.mean(chunk ** 2))

        # Normalize to 0-1 range (approximate)
        energy = float(min(1.0, rms * 3.0))

        self._last_energy = energy
        return energy

    def get_beat(self) -> bool:
        """
        Detect beat using spectral flux (change in frequency content).

        Compares the spectrum around the current time with the spectrum one
        hop earlier and reports a beat when the increase exceeds an adaptive
        threshold built from the last 1.5 seconds of flux values.

        Returns:
            True if a beat is detected at the current time, False otherwise
        """
        self._load_audio()
        audio = self._audio_data

        if audio is None or len(audio) == 0:
            return False

        self._reset_if_rewound()
        now = self._current_time

        # Current chunk: 2 * hop_length samples centred on the current time
        sample_idx = int(now * self.sample_rate)
        half = (self.hop_length * 2) // 2

        start = max(0, sample_idx - half)
        end = min(len(audio), sample_idx + half)

        if end <= start or end - start < half:
            return False

        current_chunk = audio[start:end]

        # Previous chunk (one hop back)
        prev_start = max(0, start - self.hop_length)
        prev_end = max(0, end - self.hop_length)

        if prev_end <= prev_start:
            return False

        prev_chunk = audio[prev_start:prev_end]

        # Compute spectra
        current_spec = np.abs(np.fft.rfft(current_chunk * np.hanning(len(current_chunk))))
        prev_spec = np.abs(np.fft.rfft(prev_chunk * np.hanning(len(prev_chunk))))

        # Spectral flux: sum of positive differences (onset = new
        # frequencies appearing), normalized by spectrum size
        min_len = min(len(current_spec), len(prev_spec))
        diff = current_spec[:min_len] - prev_spec[:min_len]
        flux = np.sum(np.maximum(0, diff)) / (min_len + 1)

        # Update flux history, keeping the last 1.5 seconds
        history = self._energy_history
        history.append((now, flux))
        while history and history[0][0] < now - 1.5:
            history.pop(0)

        if len(history) < 3:
            return False

        # Adaptive threshold based on recent flux values
        flux_values = [value for _, value in history]
        mean_flux = np.mean(flux_values)
        std_flux = np.std(flux_values) + 0.001  # keep the threshold above the mean

        # Beat if flux is above mean (more sensitive threshold)
        threshold = mean_flux + std_flux * 0.3  # Lower = more sensitive
        min_interval = 0.1  # Allow up to 600 BPM
        time_ok = now - self._last_beat_time > min_interval

        is_beat = bool(flux > threshold and time_ok)

        if is_beat:
            self._last_beat_time = now

        return is_beat

    def get_spectrum(self, bands: int = 3) -> np.ndarray:
        """
        Get frequency spectrum at current time.

        Args:
            bands: Number of frequency bands

        Returns:
            Array of band energies normalized to 0-1; zeros when too little
            audio is available around the current time

        Raises:
            ValueError: If ``bands`` is smaller than 1 and audio is available
        """
        self._load_audio()
        audio = self._audio_data

        if audio is None or len(audio) == 0:
            return np.zeros(bands)

        sample_idx = int(self._current_time * self.sample_rate)
        half = _ANALYSIS_WINDOW // 2

        start = max(0, sample_idx - half)
        end = min(len(audio), sample_idx + half)

        if end - start < half:
            return np.zeros(bands)

        chunk = audio[start:end]

        # Windowed FFT magnitude
        magnitudes = np.abs(np.fft.rfft(chunk * np.hanning(len(chunk))))
        return _band_energies(magnitudes, bands)

    @property
    def is_running(self) -> bool:
        """Always True: there is no capture to start or stop."""
        return True