Files
giles 1a74d811f7
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m33s
Incorporate art-dag-mono repo into artdag/ subfolder
Merges full history from art-dag/mono.git into the monorepo
under the artdag/ directory. Contains: core (DAG engine),
l1 (Celery rendering server), l2 (ActivityPub registry),
common (shared templates/middleware), client (CLI), test (e2e).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

git-subtree-dir: artdag
git-subtree-mainline: 1a179de547
git-subtree-split: 4c2e716558
2026-02-27 09:07:23 +00:00

487 lines
14 KiB
Python

"""
Live audio analysis for reactive effects.
Provides real-time audio features:
- Energy (RMS amplitude)
- Beat detection
- Frequency bands (bass, mid, high)
"""
import numpy as np
from typing import Optional
import threading
import time
class AudioAnalyzer:
    """
    Real-time audio analyzer using sounddevice.

    Captures audio from microphone/line-in into a ring buffer and computes
    features from the most recent samples for effect parameter bindings.

    Example:
        analyzer = AudioAnalyzer(device=0)
        analyzer.start()
        # In compositor loop:
        energy = analyzer.get_energy()
        beat = analyzer.get_beat()
        analyzer.stop()
    """

    # Number of most recent samples analysed by get_energy()/get_spectrum().
    _ANALYSIS_WINDOW = 2048
    # Number of energy readings kept for the beat detector's running average.
    _ENERGY_HISTORY_LENGTH = 20

    def __init__(
        self,
        device: Optional[int] = None,
        sample_rate: int = 44100,
        block_size: int = 1024,
        buffer_seconds: float = 0.5,
    ):
        """
        Initialize audio analyzer.

        Args:
            device: Audio input device index (None = default)
            sample_rate: Audio sample rate
            block_size: Samples per block
            buffer_seconds: Ring buffer duration
        """
        self.sample_rate = sample_rate
        self.block_size = block_size
        self.device = device

        # Ring buffer for recent audio. At least one slot is kept so the
        # modulo arithmetic in the callback can never divide by zero.
        buffer_size = max(1, int(sample_rate * buffer_seconds))
        self._buffer = np.zeros(buffer_size, dtype=np.float32)
        self._buffer_pos = 0  # index where the next sample will be written
        self._lock = threading.Lock()

        # Beat detection state
        self._last_energy = 0
        self._energy_history = []
        # -inf so the very first candidate always passes the interval check,
        # whatever the monotonic clock's starting value is.
        self._last_beat_time = float("-inf")
        self._beat_threshold = 1.5  # Energy ratio for beat detection
        self._min_beat_interval = 0.1  # Min seconds between beats

        # Stream state
        self._stream = None
        self._running = False

    def _audio_callback(self, indata, frames, time_info, status):
        """
        Called by sounddevice for each audio block.

        Appends the first channel of ``indata`` to the ring buffer. ``frames``,
        ``time_info`` and ``status`` are accepted for the callback signature
        but are not used.
        """
        data = indata[:, 0] if len(indata.shape) > 1 else indata
        size = len(self._buffer)
        # A block longer than the whole buffer can only leave its newest
        # `size` samples behind, so drop the older part up front.
        if len(data) > size:
            data = data[-size:]
        n = len(data)
        with self._lock:
            pos = self._buffer_pos
            first = min(n, size - pos)  # samples that fit before the end
            self._buffer[pos:pos + first] = data[:first]
            if first < n:
                # Wrap around to the start of the buffer
                self._buffer[:n - first] = data[first:]
            self._buffer_pos = (pos + n) % size

    def _recent_samples(self, count: int) -> np.ndarray:
        """
        Return a copy of the newest ``count`` samples, oldest first.

        ``count`` is clamped to the ring buffer length. A copy is returned so
        the caller can keep working on the data after the lock is released
        while the callback continues to write into the buffer.
        """
        with self._lock:
            size = len(self._buffer)
            count = min(count, size)
            pos = self._buffer_pos
            if pos >= count:
                return self._buffer[pos - count:pos].copy()
            # The window straddles the wrap point: tail of the buffer first,
            # then the freshly written head.
            return np.concatenate([
                self._buffer[size - (count - pos):],
                self._buffer[:pos],
            ])

    @staticmethod
    def _band_energies(magnitudes: np.ndarray, bands: int) -> np.ndarray:
        """
        Average ``magnitudes`` into ``bands`` equal-width bands, scaled so the
        strongest band is 1.0 (all zeros when there is no signal).

        Bins left over after the integer division are ignored; when there are
        more bands than bins every band is reported as 0.
        """
        band_size = len(magnitudes) // bands
        result = np.zeros(bands)
        if band_size > 0:
            usable = magnitudes[:band_size * bands]
            result = usable.reshape(bands, band_size).mean(axis=1)
        peak = result.max()
        if peak > 0:
            result = result / peak
        return result

    def start(self):
        """
        Start audio capture.

        Does nothing if capture is already running. If ``sounddevice`` cannot
        be imported a warning is printed and the analyzer stays stopped.
        """
        if self._running:
            # Opening a second stream would orphan (leak) the first one.
            return
        try:
            import sounddevice as sd
        except ImportError:
            print("Warning: sounddevice not installed. Audio analysis disabled.")
            print("Install with: pip install sounddevice")
            return
        stream = sd.InputStream(
            device=self.device,
            channels=1,
            samplerate=self.sample_rate,
            blocksize=self.block_size,
            callback=self._audio_callback,
        )
        try:
            stream.start()
        except Exception:
            # Do not leave a half-opened stream behind if starting fails.
            stream.close()
            raise
        self._stream = stream
        self._running = True

    def stop(self):
        """Stop audio capture and release the stream, if one is open."""
        if self._stream:
            self._stream.stop()
            self._stream.close()
            self._stream = None
        self._running = False

    def get_energy(self) -> float:
        """
        Get current audio energy (RMS amplitude).

        Returns:
            Energy value normalized to 0-1 range (approximately)
        """
        data = self._recent_samples(self._ANALYSIS_WINDOW)
        # RMS energy
        rms = np.sqrt(np.mean(data ** 2))
        # Normalize (typical mic input is quite low); convert to a plain
        # Python float so callers never receive a numpy scalar.
        return float(min(1.0, rms * 10))

    def get_beat(self) -> bool:
        """
        Detect if current moment is a beat.

        Simple onset detection based on energy spikes: a beat is reported when
        the current energy exceeds the average of the previous readings by
        the beat threshold, is higher than the previous reading, and the
        minimum beat interval has elapsed since the last beat.

        Returns:
            True if beat detected, False otherwise
        """
        current_energy = self.get_energy()
        # Monotonic clock: the interval check must not be disturbed by
        # wall-clock adjustments.
        now = time.monotonic()

        # Update energy history
        history = self._energy_history
        history.append(current_energy)
        if len(history) > self._ENERGY_HISTORY_LENGTH:
            history.pop(0)

        # Need enough history
        if len(history) < 5:
            self._last_energy = current_energy
            return False

        # Average recent energy, excluding the reading just taken
        avg_energy = float(np.mean(history[:-1]))

        is_beat = bool(
            current_energy > avg_energy * self._beat_threshold
            and now - self._last_beat_time > self._min_beat_interval
            and current_energy > self._last_energy  # Rising edge
        )
        if is_beat:
            self._last_beat_time = now
        self._last_energy = current_energy
        return is_beat

    def get_spectrum(self, bands: int = 3) -> np.ndarray:
        """
        Get frequency spectrum divided into bands.

        Args:
            bands: Number of frequency bands (default 3: bass, mid, high)

        Returns:
            Array of band energies, normalized to 0-1

        Raises:
            ValueError: If ``bands`` is smaller than 1.
        """
        if bands < 1:
            raise ValueError("bands must be a positive integer")
        data = self._recent_samples(self._ANALYSIS_WINDOW)
        # Hann-windowed magnitude spectrum
        magnitudes = np.abs(np.fft.rfft(data * np.hanning(len(data))))
        return self._band_energies(magnitudes, bands)

    @property
    def is_running(self) -> bool:
        """True while a capture stream has been started and not stopped."""
        return self._running

    def __enter__(self):
        """Start capture on entering a ``with`` block."""
        self.start()
        return self

    def __exit__(self, *args):
        """Stop capture on leaving a ``with`` block."""
        self.stop()
class FileAudioAnalyzer:
    """
    Audio analyzer that reads from a file (for testing/development).

    Serves features from pre-computed analysis data at a playback time that
    the caller advances with set_time().
    """

    # Half-width, in seconds, of the window around a beat time in which
    # get_beat() reports True.
    _BEAT_TOLERANCE = 0.05

    def __init__(self, path: str, analysis_data: Optional[dict] = None):
        """
        Initialize from audio file.

        Args:
            path: Path to audio file (stored only; not opened here)
            analysis_data: Pre-computed analysis. The "energy" track is read
                as {"times": [...], "values": [...]} and the "beats" track as
                {"times": [...]}.
        """
        self.path = path
        # Keep the caller's dict even when it is (still) empty, so data added
        # to it afterwards remains visible to this analyzer.
        self.analysis_data = {} if analysis_data is None else analysis_data
        self._current_time = 0

    def set_time(self, t: float):
        """Set current playback time."""
        self._current_time = t

    def get_energy(self) -> float:
        """Get energy at current time from pre-computed data."""
        track = self.analysis_data.get("energy", {})
        return self._interpolate(track, self._current_time)

    def get_beat(self) -> bool:
        """Check if current time is within 50ms of a pre-computed beat."""
        track = self.analysis_data.get("beats", {})
        now = self._current_time
        return any(
            abs(beat_time - now) < self._BEAT_TOLERANCE
            for beat_time in track.get("times", [])
        )

    def _interpolate(self, track: dict, t: float) -> float:
        """
        Linearly interpolate the track's value at time t.

        Times outside the track clamp to the first/last value; an empty track
        yields 0.0. If "times" and "values" differ in length only the common
        prefix is used. Works with lists as well as numpy arrays.
        """
        times = track.get("times", [])
        values = track.get("values", [])
        # len() rather than truthiness: numpy arrays have no unambiguous
        # truth value.
        count = min(len(times), len(values))
        if count == 0:
            return 0.0
        if t <= times[0]:
            return float(values[0])
        last = count - 1
        if t >= times[last]:
            return float(values[last])
        # Find bracket and interpolate
        for i in range(last):
            t0, t1 = times[i], times[i + 1]
            if t0 <= t <= t1:
                alpha = (t - t0) / (t1 - t0)
                return float(values[i] * (1 - alpha) + values[i + 1] * alpha)
        return float(values[last])

    @property
    def is_running(self) -> bool:
        """Always True: file-backed analysis has no capture stream to stop."""
        return True
class StreamingAudioAnalyzer:
    """
    Audio analyzer that computes features from a file on demand.

    The caller advances playback time with set_time(); each feature is
    computed from the decoded samples around that time. No pre-computed
    analysis is needed, but the whole file is decoded into memory (via
    ffmpeg) the first time a feature is requested.
    """

    def __init__(self, path: str, sample_rate: int = 22050, hop_length: int = 512):
        """
        Initialize streaming audio analyzer.

        Runs ``ffprobe`` once to read the file's duration. Errors from that
        step propagate unchanged (e.g. from ``json.loads`` when the output is
        not JSON, or ``KeyError`` when no format/duration is reported).

        Args:
            path: Path to audio file
            sample_rate: Sample rate for analysis
            hop_length: Hop length for feature extraction
        """
        import subprocess
        import json

        self.path = path
        self.sample_rate = sample_rate
        self.hop_length = hop_length
        self._current_time = 0.0

        # Get audio duration
        cmd = ["ffprobe", "-v", "quiet", "-print_format", "json",
               "-show_format", str(path)]
        result = subprocess.run(cmd, capture_output=True, text=True)
        info = json.loads(result.stdout)
        self.duration = float(info["format"]["duration"])

        # Audio buffer and state
        self._audio_data = None
        self._energy_history = []  # (time, spectral flux) pairs for get_beat()
        self._last_energy = 0
        self._last_beat_time = -1
        # NOTE: the two values below are kept for compatibility; get_beat()
        # uses its own adaptive threshold and interval constants instead.
        self._beat_threshold = 1.5
        self._min_beat_interval = 0.15

        # Load audio lazily
        self._loaded = False

    def _load_audio(self):
        """
        Decode the file to mono float32 PCM on first use.

        If ffmpeg produces no output the audio stays empty and every feature
        reports silence; the decode is not retried.
        """
        if self._loaded:
            return
        import subprocess

        # Use ffmpeg to decode audio to raw PCM
        cmd = [
            "ffmpeg", "-v", "quiet",
            "-i", str(self.path),
            "-f", "f32le",  # 32-bit float, little-endian
            "-ac", "1",  # mono
            "-ar", str(self.sample_rate),
            "-"
        ]
        result = subprocess.run(cmd, capture_output=True)
        raw = result.stdout
        # Explicit little-endian dtype to match "f32le" on any host, and an
        # explicit count so a trailing partial sample is ignored rather than
        # making frombuffer raise.
        self._audio_data = np.frombuffer(
            raw, dtype=np.dtype("<f4"), count=len(raw) // 4
        )
        self._loaded = True

    def _has_audio(self) -> bool:
        """Load audio if needed and report whether any samples exist."""
        self._load_audio()
        return self._audio_data is not None and len(self._audio_data) > 0

    def _window_bounds(self, size: int):
        """
        Return (start, end) sample indices of a ``size``-sample window centred
        on the current time, clipped to the available audio.

        ``end`` may be <= ``start`` when the current time lies outside the
        audio; callers check for that.
        """
        center = int(self._current_time * self.sample_rate)
        half = size // 2
        start = max(0, center - half)
        end = min(len(self._audio_data), center + half)
        return start, end

    @staticmethod
    def _band_energies(magnitudes: np.ndarray, bands: int) -> np.ndarray:
        """
        Average ``magnitudes`` into ``bands`` equal-width bands, scaled so the
        strongest band is 1.0 (all zeros when there is no signal).

        Bins left over after the integer division are ignored; when there are
        more bands than bins every band is reported as 0.
        """
        band_size = len(magnitudes) // bands
        result = np.zeros(bands)
        if band_size > 0:
            usable = magnitudes[:band_size * bands]
            result = usable.reshape(bands, band_size).mean(axis=1)
        peak = result.max()
        if peak > 0:
            result = result / peak
        return result

    def set_time(self, t: float):
        """Set current playback time."""
        self._current_time = t

    def get_energy(self) -> float:
        """
        Compute RMS energy around the current time.

        Returns:
            Energy scaled by 3 and capped at 1.0; 0.0 when no audio is
            available at the current time.
        """
        if not self._has_audio():
            return 0.0
        start, end = self._window_bounds(self.hop_length * 2)
        if start >= end:
            return 0.0
        # RMS energy
        chunk = self._audio_data[start:end]
        rms = np.sqrt(np.mean(chunk ** 2))
        # Normalize to 0-1 range (approximate); plain float for callers
        energy = float(min(1.0, rms * 3.0))
        self._last_energy = energy
        return energy

    def get_beat(self) -> bool:
        """
        Detect beat using spectral flux (change in frequency content).

        The flux between the window at the current time and the window one
        hop earlier is compared with an adaptive threshold derived from the
        flux values of the last 1.5 seconds. Detection state is reset when
        playback time moves backwards (seek or loop).

        Returns:
            True if beat detected, False otherwise
        """
        if not self._has_audio():
            return False
        now = self._current_time
        history = self._energy_history

        # After a backwards seek the stored history and last-beat time lie in
        # the "future": the history would never be pruned and the interval
        # check would block every beat until playback caught up again.
        if (history and history[-1][0] > now) or self._last_beat_time > now:
            history.clear()
            self._last_beat_time = -1

        # Current chunk
        chunk_size = self.hop_length * 2
        start, end = self._window_bounds(chunk_size)
        if end - start < chunk_size // 2:
            return False
        current_chunk = self._audio_data[start:end]

        # Previous chunk (one hop back)
        prev_start = max(0, start - self.hop_length)
        prev_end = max(0, end - self.hop_length)
        if prev_end <= prev_start:
            return False
        prev_chunk = self._audio_data[prev_start:prev_end]

        # Compute spectra
        current_spec = np.abs(np.fft.rfft(current_chunk * np.hanning(len(current_chunk))))
        prev_spec = np.abs(np.fft.rfft(prev_chunk * np.hanning(len(prev_chunk))))

        # Spectral flux: sum of positive differences (onset = new frequencies
        # appearing), normalized by spectrum size
        min_len = min(len(current_spec), len(prev_spec))
        diff = current_spec[:min_len] - prev_spec[:min_len]
        flux = np.sum(np.maximum(0, diff)) / (min_len + 1)

        # Update flux history, keeping the last 1.5 seconds
        history.append((now, flux))
        while history and history[0][0] < now - 1.5:
            history.pop(0)
        if len(history) < 3:
            return False

        # Adaptive threshold based on recent flux values
        flux_values = [f for _, f in history]
        mean_flux = np.mean(flux_values)
        # Small floor so that silence (flux == mean == 0) is never a beat
        std_flux = np.std(flux_values) + 0.001
        threshold = mean_flux + std_flux * 0.3  # Lower = more sensitive
        min_interval = 0.1  # Allow up to 600 BPM

        is_beat = bool(
            flux > threshold and now - self._last_beat_time > min_interval
        )
        if is_beat:
            self._last_beat_time = now
        return is_beat

    def get_spectrum(self, bands: int = 3) -> np.ndarray:
        """
        Get frequency spectrum at current time.

        Args:
            bands: Number of frequency bands

        Returns:
            Array of band energies, normalized to 0-1; all zeros when fewer
            than half a window of audio is available at the current time.

        Raises:
            ValueError: If ``bands`` is smaller than 1.
        """
        if bands < 1:
            raise ValueError("bands must be a positive integer")
        if not self._has_audio():
            return np.zeros(bands)
        n = 2048
        start, end = self._window_bounds(n)
        if end - start < n // 2:
            return np.zeros(bands)
        chunk = self._audio_data[start:end]
        # Hann-windowed magnitude spectrum
        magnitudes = np.abs(np.fft.rfft(chunk * np.hanning(len(chunk))))
        return self._band_energies(magnitudes, bands)

    @property
    def is_running(self) -> bool:
        """Always True: file-backed analysis has no capture stream to stop."""
        return True