Files
rose-ash/artdag/test/test_effects_pipeline.py
giles 1a74d811f7
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m33s
Incorporate art-dag-mono repo into artdag/ subfolder
Merges full history from art-dag/mono.git into the monorepo
under the artdag/ directory. Contains: core (DAG engine),
l1 (Celery rendering server), l2 (ActivityPub registry),
common (shared templates/middleware), client (CLI), test (e2e).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

git-subtree-dir: artdag
git-subtree-mainline: 1a179de547
git-subtree-split: 4c2e716558
2026-02-27 09:07:23 +00:00

259 lines
8.1 KiB
Python

#!/usr/bin/env python3
"""
Test the full effects pipeline: segment -> effect -> output
This tests that effects can be applied to video segments without
producing "No video stream found" errors.
"""
import subprocess
import tempfile
import sys
from pathlib import Path
# Add this file's own directory to the import path so sexp_effects resolves
sys.path.insert(0, str(Path(__file__).parent))
import numpy as np
from sexp_effects import (
get_interpreter,
load_effects_dir,
run_effect,
list_effects,
)
def create_test_video(path: Path, duration: float = 1.0, size: str = "64x64") -> bool:
    """Create a short synthetic test video using ffmpeg.

    Renders ffmpeg's built-in ``testsrc`` pattern at 10 frames per second and
    encodes it with libx264 (``ultrafast`` preset). An existing file at
    ``path`` is overwritten (``-y``).

    Args:
        path: Destination file for the encoded video.
        duration: Clip length in seconds, passed to the ``testsrc`` source.
        size: Frame size as ``"WIDTHxHEIGHT"``.

    Returns:
        True if ffmpeg exited with status 0. False if ffmpeg could not be
        launched or exited with a non-zero status; the reason is printed.
    """
    cmd = [
        "ffmpeg", "-y",
        "-f", "lavfi", "-i", f"testsrc=duration={duration}:size={size}:rate=10",
        "-c:v", "libx264", "-preset", "ultrafast",
        str(path),
    ]
    try:
        result = subprocess.run(cmd, capture_output=True)
    except OSError as exc:
        # ffmpeg missing or not executable: report it like any other failure
        # instead of aborting the whole run with a traceback.
        print(f"Failed to create test video: {exc}")
        return False
    if result.returncode != 0:
        # ffmpeg's stderr is not guaranteed to be valid UTF-8.
        details = result.stderr.decode(errors="replace")
        print(f"Failed to create test video: {details}")
        return False
    return True
def segment_video(input_path: Path, output_path: Path, start: float, duration: float) -> bool:
    """Cut a segment out of a video and verify it still has a video stream.

    The segment is re-encoded with ffmpeg (libx264 ``ultrafast`` video, AAC
    audio), then inspected with ffprobe to confirm that at least one stream
    of type ``video`` is present in the output.

    Args:
        input_path: Source video file.
        output_path: Destination file for the segment (overwritten).
        start: Segment start offset in seconds (``-ss``).
        duration: Segment length in seconds (``-t``).

    Returns:
        True if the segment was written and contains a video stream. False if
        ffmpeg or ffprobe could not be run, exited with an error, produced
        unparsable output, or the segment has no video stream; the reason is
        printed.
    """
    import json

    cmd = [
        "ffmpeg", "-y", "-i", str(input_path),
        "-ss", str(start), "-t", str(duration),
        "-c:v", "libx264", "-preset", "ultrafast",
        "-c:a", "aac",
        str(output_path),
    ]
    try:
        result = subprocess.run(cmd, capture_output=True)
    except OSError as exc:
        # ffmpeg missing or not executable.
        print(f"Failed to segment video: {exc}")
        return False
    if result.returncode != 0:
        # ffmpeg's stderr is not guaranteed to be valid UTF-8.
        details = result.stderr.decode(errors="replace")
        print(f"Failed to segment video: {details}")
        return False

    # Verify output has video stream
    probe_cmd = [
        "ffprobe", "-v", "quiet", "-print_format", "json",
        "-show_streams", str(output_path),
    ]
    try:
        probe_result = subprocess.run(probe_cmd, capture_output=True, text=True)
    except (OSError, ValueError) as exc:
        # OSError: ffprobe could not be launched; ValueError: undecodable output.
        print(f"Failed to probe segment: {exc}")
        return False
    if probe_result.returncode != 0:
        print(f"Failed to probe segment: ffprobe exited with status {probe_result.returncode}")
        return False
    try:
        probe_data = json.loads(probe_result.stdout)
    except ValueError as exc:
        # Empty or malformed ffprobe output (JSONDecodeError is a ValueError).
        print(f"Failed to probe segment: {exc}")
        return False

    has_video = any(
        s.get("codec_type") == "video"
        for s in probe_data.get("streams", [])
    )
    if not has_video:
        print("Segment has no video stream!")
        return False
    return True
def run_effect_on_video(effect_name: str, input_path: Path, output_path: Path) -> bool:
    """Run a sexp effect on a video file using frame-by-frame processing.

    The input is probed with ffprobe for its dimensions and frame rate, then
    decoded by one ffmpeg process into raw ``rgb24`` frames. Each frame is
    passed through ``run_effect`` and the result is piped into a second
    ffmpeg process that encodes ``output_path`` with libx264.

    Args:
        effect_name: Name of the effect handed to ``run_effect``.
        input_path: Video file to read.
        output_path: Video file to write (overwritten).

    Returns:
        True if the encoder succeeded and at least one frame was processed.
        False if the input could not be probed, has no video stream or a
        non-positive frame rate, the encoder failed, or no frame was read;
        the reason is printed for every case except the last.

    Raises:
        Exception: Whatever ``run_effect`` raises is propagated after both
            ffmpeg processes have been cleaned up.
    """
    import json

    # Get video info
    probe_cmd = [
        "ffprobe", "-v", "quiet", "-print_format", "json",
        "-show_streams", str(input_path),
    ]
    try:
        probe_result = subprocess.run(probe_cmd, capture_output=True, text=True)
    except (OSError, ValueError) as exc:
        # OSError: ffprobe could not be launched; ValueError: undecodable output.
        print(f" Failed to probe input: {exc}")
        return False
    if probe_result.returncode != 0:
        print(f" Failed to probe input: ffprobe exited with status {probe_result.returncode}")
        return False
    try:
        probe_data = json.loads(probe_result.stdout)
    except ValueError as exc:
        # Empty or malformed ffprobe output (JSONDecodeError is a ValueError).
        print(f" Failed to probe input: {exc}")
        return False

    video_stream = None
    for stream in probe_data.get("streams", []):
        if stream.get("codec_type") == "video":
            video_stream = stream
            break
    if not video_stream:
        print(f" Input has no video stream: {input_path}")
        return False

    width = int(video_stream["width"])
    height = int(video_stream["height"])

    # r_frame_rate is a rational such as "10/1"; a value like "0/0" must not
    # crash here or later when the frame index is divided by fps.
    fps_str = video_stream.get("r_frame_rate", "10/1")
    try:
        if "/" in fps_str:
            num, den = fps_str.split("/")
            fps = float(num) / float(den)
        else:
            fps = float(fps_str)
    except (ValueError, ZeroDivisionError):
        fps = 0.0
    if not fps > 0:
        print(f" Invalid frame rate {fps_str!r}: {input_path}")
        return False

    # Read frames, process, write
    read_cmd = ["ffmpeg", "-i", str(input_path), "-f", "rawvideo", "-pix_fmt", "rgb24", "-"]
    write_cmd = [
        "ffmpeg", "-y",
        "-f", "rawvideo", "-pix_fmt", "rgb24",
        "-s", f"{width}x{height}", "-r", str(fps),
        "-i", "-",
        "-c:v", "libx264", "-preset", "ultrafast",
        str(output_path),
    ]

    frame_size = width * height * 3  # bytes per rgb24 frame
    frame_count = 0
    state = {}

    # Encoder diagnostics go to a temporary file rather than a pipe: nothing
    # drains a pipe while frames are being written, so a verbose encoder
    # could fill it and block forever.
    with tempfile.TemporaryFile() as encode_log:
        read_proc = subprocess.Popen(
            read_cmd,
            stdin=subprocess.DEVNULL,  # keep the decoder away from our stdin
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
        )
        write_proc = None
        try:
            write_proc = subprocess.Popen(write_cmd, stdin=subprocess.PIPE, stderr=encode_log)
            while True:
                frame_data = read_proc.stdout.read(frame_size)
                if len(frame_data) < frame_size:
                    # End of stream (or a truncated trailing frame).
                    break
                frame = np.frombuffer(frame_data, dtype=np.uint8).reshape((height, width, 3))
                # State returned by one call is fed into the next one.
                processed, state = run_effect(effect_name, frame, {'_time': frame_count / fps}, state)
                try:
                    write_proc.stdin.write(processed.tobytes())
                except BrokenPipeError:
                    # Encoder exited early; its exit status and log are
                    # reported below instead of a bare "Broken pipe".
                    break
                frame_count += 1
        finally:
            # Always release the pipes and reap both children, even when the
            # effect raised, so no ffmpeg process is left behind.
            read_proc.stdout.close()
            if write_proc is not None:
                try:
                    write_proc.stdin.close()
                except BrokenPipeError:
                    pass
            read_proc.wait()
            if write_proc is not None:
                write_proc.wait()

        if write_proc.returncode != 0:
            encode_log.seek(0)
            details = encode_log.read().decode(errors="replace")
            print(f" FFmpeg encode failed: {details}")
            return False
    return frame_count > 0
def test_effect_pipeline(effect_name: str, tmpdir: Path) -> tuple:
    """
    Exercise one effect end to end: create video -> segment -> apply effect.

    All intermediate files (source.mp4, segment.mp4, output.mp4) are written
    into ``tmpdir``.

    Returns (success, error_message); error_message is None on success.
    """
    source_path = tmpdir / "source.mp4"
    segment_path = tmpdir / "segment.mp4"
    output_path = tmpdir / "output.mp4"

    # Step 1: synthesize the source clip.
    if not create_test_video(source_path, duration=1.0, size="64x64"):
        return False, "Failed to create source video"

    # Step 2: cut a segment out of it (simulates what the recipe does).
    if not segment_video(source_path, segment_path, start=0.2, duration=0.5):
        return False, "Failed to segment video"

    # The segment must exist and be more than a stub.
    if not segment_path.exists():
        return False, "Segment file doesn't exist"
    segment_bytes = segment_path.stat().st_size
    if segment_bytes < 100:
        return False, f"Segment file too small: {segment_bytes} bytes"

    # Step 3: run the effect; any exception is turned into a failure message.
    try:
        effect_ok = run_effect_on_video(effect_name, segment_path, output_path)
    except Exception as exc:
        return False, str(exc)
    if not effect_ok:
        return False, "Effect processing failed"

    # Step 4: the encoded result must exist and be more than a stub.
    if not output_path.exists():
        return False, "Output file doesn't exist"
    output_bytes = output_path.stat().st_size
    if output_bytes < 100:
        return False, f"Output file too small: {output_bytes} bytes"

    return True, None
def main():
    """Run every loaded effect through the test pipeline and print a report.

    The multi-input effects ("blend", "layer") are tested separately by
    calling the interpreter directly with random 64x64 frames. Every other
    effect goes through test_effect_pipeline in its own temporary directory.

    Returns True when no effect failed, False otherwise.
    """
    banner = "=" * 60
    print(banner)
    print("Effects Pipeline Test")
    print(banner)

    # Load the effect definitions from the directory beside this script.
    load_effects_dir(str(Path(__file__).parent / "sexp_effects" / "effects"))
    available = list_effects()
    print(f"Testing {len(available)} effects through segment->effect pipeline\n")

    ok_names = []
    failures = []

    # Multi-input effects are handled here, outside the video pipeline.
    two_frame_effects = ("blend", "layer")
    print("\nTesting multi-input effects...")
    from sexp_effects.interpreter import get_interpreter
    interpreter = get_interpreter()
    first = np.random.randint(0, 255, (64, 64, 3), dtype=np.uint8)
    second = np.random.randint(0, 255, (64, 64, 3), dtype=np.uint8)
    for name in two_frame_effects:
        try:
            # Bind fresh copies so one effect cannot disturb the next.
            for binding, source in (('frame-a', first), ('frame-b', second), ('frame', first)):
                interpreter.global_env.set(binding, source.copy())
            output, _ = interpreter.run_effect(name, first.copy(), {'_time': 0.5}, {})
            shape_matches = isinstance(output, np.ndarray) and output.shape == first.shape
            if not shape_matches:
                found = output.shape if hasattr(output, 'shape') else type(output)
                failures.append((name, f"Bad output shape: {found}"))
                print(f" {name}: FAILED - bad shape")
                continue
            ok_names.append(name)
            print(f" {name}: OK")
        except Exception as exc:
            failures.append((name, str(exc)))
            print(f" {name}: FAILED - {exc}")

    print("\nTesting single-input effects through pipeline...")
    for effect_name in sorted(available):
        if effect_name in two_frame_effects:
            # Already covered by the multi-input pass.
            continue
        with tempfile.TemporaryDirectory() as scratch:
            succeeded, message = test_effect_pipeline(effect_name, Path(scratch))
            if not succeeded:
                failures.append((effect_name, message))
                print(f" {effect_name}: FAILED - {message}")
            else:
                ok_names.append(effect_name)
                print(f" {effect_name}: OK")

    # Summary
    print()
    print(banner)
    print(f"Pipeline test: {len(ok_names)} passed, {len(failures)} failed")
    if failures:
        print("\nFailed effects:")
        for failed_name, reason in failures:
            print(f" {failed_name}: {reason}")
    print(banner)
    return not failures
if __name__ == "__main__":
    # Exit status mirrors the overall result: 0 when every effect passed, 1 otherwise.
    sys.exit(0 if main() else 1)