Files
rose-ash/artdag/core/tests/test_engine.py
giles 1a74d811f7
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m33s
Incorporate art-dag-mono repo into artdag/ subfolder
Merges full history from art-dag/mono.git into the monorepo
under the artdag/ directory. Contains: core (DAG engine),
l1 (Celery rendering server), l2 (ActivityPub registry),
common (shared templates/middleware), client (CLI), test (e2e).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

git-subtree-dir: artdag
git-subtree-mainline: 1a179de547
git-subtree-split: 4c2e716558
2026-02-27 09:07:23 +00:00

465 lines
14 KiB
Python

# tests/test_primitive_new/test_engine.py
"""Tests for primitive engine execution."""
import pytest
import subprocess
import tempfile
from pathlib import Path
from artdag.dag import DAG, DAGBuilder, Node, NodeType
from artdag.engine import Engine
from artdag import nodes # Register executors
@pytest.fixture
def cache_dir():
"""Create temporary cache directory."""
with tempfile.TemporaryDirectory() as tmpdir:
yield Path(tmpdir)
@pytest.fixture
def engine(cache_dir):
"""Create engine instance."""
return Engine(cache_dir)
@pytest.fixture
def test_video(cache_dir):
"""Create a test video file."""
video_path = cache_dir / "test_video.mp4"
cmd = [
"ffmpeg", "-y",
"-f", "lavfi", "-i", "testsrc=duration=5:size=320x240:rate=30",
"-f", "lavfi", "-i", "sine=frequency=440:duration=5",
"-c:v", "libx264", "-preset", "ultrafast",
"-c:a", "aac",
str(video_path)
]
subprocess.run(cmd, capture_output=True, check=True)
return video_path
@pytest.fixture
def test_audio(cache_dir):
"""Create a test audio file."""
audio_path = cache_dir / "test_audio.mp3"
cmd = [
"ffmpeg", "-y",
"-f", "lavfi", "-i", "sine=frequency=880:duration=5",
"-c:a", "libmp3lame",
str(audio_path)
]
subprocess.run(cmd, capture_output=True, check=True)
return audio_path
class TestEngineBasic:
"""Test basic engine functionality."""
def test_engine_creation(self, cache_dir):
"""Test engine creation."""
engine = Engine(cache_dir)
assert engine.cache is not None
def test_invalid_dag(self, engine):
"""Test executing invalid DAG."""
dag = DAG() # No nodes, no output
result = engine.execute(dag)
assert not result.success
assert "Invalid DAG" in result.error
def test_missing_executor(self, engine):
"""Test executing node with missing executor."""
dag = DAG()
node = Node(node_type="UNKNOWN_TYPE", config={})
node_id = dag.add_node(node)
dag.set_output(node_id)
result = engine.execute(dag)
assert not result.success
assert "No executor" in result.error
class TestSourceExecutor:
"""Test SOURCE node executor."""
def test_source_creates_symlink(self, engine, test_video):
"""Test source node creates symlink."""
builder = DAGBuilder()
source = builder.source(str(test_video))
builder.set_output(source)
dag = builder.build()
result = engine.execute(dag)
assert result.success
assert result.output_path.exists()
assert result.output_path.is_symlink()
def test_source_missing_file(self, engine):
"""Test source with missing file."""
builder = DAGBuilder()
source = builder.source("/nonexistent/file.mp4")
builder.set_output(source)
dag = builder.build()
result = engine.execute(dag)
assert not result.success
assert "not found" in result.error.lower()
class TestSegmentExecutor:
"""Test SEGMENT node executor."""
def test_segment_duration(self, engine, test_video):
"""Test segment extracts correct duration."""
builder = DAGBuilder()
source = builder.source(str(test_video))
segment = builder.segment(source, duration=2.0)
builder.set_output(segment)
dag = builder.build()
result = engine.execute(dag)
assert result.success
# Verify duration
probe = subprocess.run([
"ffprobe", "-v", "error",
"-show_entries", "format=duration",
"-of", "csv=p=0",
str(result.output_path)
], capture_output=True, text=True)
duration = float(probe.stdout.strip())
assert abs(duration - 2.0) < 0.1
def test_segment_with_offset(self, engine, test_video):
"""Test segment with offset."""
builder = DAGBuilder()
source = builder.source(str(test_video))
segment = builder.segment(source, offset=1.0, duration=2.0)
builder.set_output(segment)
dag = builder.build()
result = engine.execute(dag)
assert result.success
class TestResizeExecutor:
"""Test RESIZE node executor."""
def test_resize_dimensions(self, engine, test_video):
"""Test resize to specific dimensions."""
builder = DAGBuilder()
source = builder.source(str(test_video))
resized = builder.resize(source, width=640, height=480, mode="fit")
builder.set_output(resized)
dag = builder.build()
result = engine.execute(dag)
assert result.success
# Verify dimensions
probe = subprocess.run([
"ffprobe", "-v", "error",
"-show_entries", "stream=width,height",
"-of", "csv=p=0:s=x",
str(result.output_path)
], capture_output=True, text=True)
dimensions = probe.stdout.strip().split("\n")[0]
assert "640x480" in dimensions
class TestTransformExecutor:
"""Test TRANSFORM node executor."""
def test_transform_saturation(self, engine, test_video):
"""Test transform with saturation effect."""
builder = DAGBuilder()
source = builder.source(str(test_video))
transformed = builder.transform(source, effects={"saturation": 1.5})
builder.set_output(transformed)
dag = builder.build()
result = engine.execute(dag)
assert result.success
assert result.output_path.exists()
def test_transform_multiple_effects(self, engine, test_video):
"""Test transform with multiple effects."""
builder = DAGBuilder()
source = builder.source(str(test_video))
transformed = builder.transform(source, effects={
"saturation": 1.2,
"contrast": 1.1,
"brightness": 0.05,
})
builder.set_output(transformed)
dag = builder.build()
result = engine.execute(dag)
assert result.success
class TestSequenceExecutor:
"""Test SEQUENCE node executor."""
def test_sequence_cut(self, engine, test_video):
"""Test sequence with cut transition."""
builder = DAGBuilder()
s1 = builder.source(str(test_video))
seg1 = builder.segment(s1, duration=2.0)
seg2 = builder.segment(s1, offset=2.0, duration=2.0)
seq = builder.sequence([seg1, seg2], transition={"type": "cut"})
builder.set_output(seq)
dag = builder.build()
result = engine.execute(dag)
assert result.success
# Verify combined duration
probe = subprocess.run([
"ffprobe", "-v", "error",
"-show_entries", "format=duration",
"-of", "csv=p=0",
str(result.output_path)
], capture_output=True, text=True)
duration = float(probe.stdout.strip())
assert abs(duration - 4.0) < 0.2
def test_sequence_crossfade(self, engine, test_video):
"""Test sequence with crossfade transition."""
builder = DAGBuilder()
s1 = builder.source(str(test_video))
seg1 = builder.segment(s1, duration=3.0)
seg2 = builder.segment(s1, offset=1.0, duration=3.0)
seq = builder.sequence([seg1, seg2], transition={"type": "crossfade", "duration": 0.5})
builder.set_output(seq)
dag = builder.build()
result = engine.execute(dag)
assert result.success
# Duration should be sum minus crossfade
probe = subprocess.run([
"ffprobe", "-v", "error",
"-show_entries", "format=duration",
"-of", "csv=p=0",
str(result.output_path)
], capture_output=True, text=True)
duration = float(probe.stdout.strip())
# 3 + 3 - 0.5 = 5.5
assert abs(duration - 5.5) < 0.3
class TestMuxExecutor:
"""Test MUX node executor."""
def test_mux_video_audio(self, engine, test_video, test_audio):
"""Test muxing video and audio."""
builder = DAGBuilder()
video = builder.source(str(test_video))
audio = builder.source(str(test_audio))
muxed = builder.mux(video, audio)
builder.set_output(muxed)
dag = builder.build()
result = engine.execute(dag)
assert result.success
assert result.output_path.exists()
class TestAudioMixExecutor:
"""Test AUDIO_MIX node executor."""
def test_audio_mix_simple(self, engine, cache_dir):
"""Test simple audio mixing."""
# Create two test audio files with different frequencies
audio1_path = cache_dir / "audio1.mp3"
audio2_path = cache_dir / "audio2.mp3"
subprocess.run([
"ffmpeg", "-y",
"-f", "lavfi", "-i", "sine=frequency=440:duration=3",
"-c:a", "libmp3lame",
str(audio1_path)
], capture_output=True, check=True)
subprocess.run([
"ffmpeg", "-y",
"-f", "lavfi", "-i", "sine=frequency=880:duration=3",
"-c:a", "libmp3lame",
str(audio2_path)
], capture_output=True, check=True)
builder = DAGBuilder()
a1 = builder.source(str(audio1_path))
a2 = builder.source(str(audio2_path))
mixed = builder.audio_mix([a1, a2])
builder.set_output(mixed)
dag = builder.build()
result = engine.execute(dag)
assert result.success
assert result.output_path.exists()
def test_audio_mix_with_gains(self, engine, cache_dir):
"""Test audio mixing with custom gains."""
audio1_path = cache_dir / "audio1.mp3"
audio2_path = cache_dir / "audio2.mp3"
subprocess.run([
"ffmpeg", "-y",
"-f", "lavfi", "-i", "sine=frequency=440:duration=3",
"-c:a", "libmp3lame",
str(audio1_path)
], capture_output=True, check=True)
subprocess.run([
"ffmpeg", "-y",
"-f", "lavfi", "-i", "sine=frequency=880:duration=3",
"-c:a", "libmp3lame",
str(audio2_path)
], capture_output=True, check=True)
builder = DAGBuilder()
a1 = builder.source(str(audio1_path))
a2 = builder.source(str(audio2_path))
mixed = builder.audio_mix([a1, a2], gains=[1.0, 0.3])
builder.set_output(mixed)
dag = builder.build()
result = engine.execute(dag)
assert result.success
assert result.output_path.exists()
def test_audio_mix_three_inputs(self, engine, cache_dir):
"""Test mixing three audio sources."""
audio_paths = []
for i, freq in enumerate([440, 660, 880]):
path = cache_dir / f"audio{i}.mp3"
subprocess.run([
"ffmpeg", "-y",
"-f", "lavfi", "-i", f"sine=frequency={freq}:duration=2",
"-c:a", "libmp3lame",
str(path)
], capture_output=True, check=True)
audio_paths.append(path)
builder = DAGBuilder()
sources = [builder.source(str(p)) for p in audio_paths]
mixed = builder.audio_mix(sources, gains=[1.0, 0.5, 0.3])
builder.set_output(mixed)
dag = builder.build()
result = engine.execute(dag)
assert result.success
assert result.output_path.exists()
class TestCaching:
"""Test engine caching behavior."""
def test_cache_reuse(self, engine, test_video):
"""Test that cached results are reused."""
builder = DAGBuilder()
source = builder.source(str(test_video))
builder.set_output(source)
dag = builder.build()
# First execution
result1 = engine.execute(dag)
assert result1.success
assert result1.nodes_cached == 0
assert result1.nodes_executed == 1
# Second execution should use cache
result2 = engine.execute(dag)
assert result2.success
assert result2.nodes_cached == 1
assert result2.nodes_executed == 0
def test_clear_cache(self, engine, test_video):
"""Test clearing cache."""
builder = DAGBuilder()
source = builder.source(str(test_video))
builder.set_output(source)
dag = builder.build()
engine.execute(dag)
assert engine.cache.stats.total_entries == 1
engine.clear_cache()
assert engine.cache.stats.total_entries == 0
class TestProgressCallback:
"""Test progress callback functionality."""
def test_progress_callback(self, engine, test_video):
"""Test that progress callback is called."""
progress_updates = []
def callback(progress):
progress_updates.append((progress.node_id, progress.status))
engine.set_progress_callback(callback)
builder = DAGBuilder()
source = builder.source(str(test_video))
builder.set_output(source)
dag = builder.build()
result = engine.execute(dag)
assert result.success
assert len(progress_updates) > 0
# Should have pending, running, completed
statuses = [p[1] for p in progress_updates]
assert "pending" in statuses
assert "completed" in statuses
class TestFullWorkflow:
"""Test complete workflow."""
def test_full_pipeline(self, engine, test_video, test_audio):
"""Test complete video processing pipeline."""
builder = DAGBuilder()
# Load sources
video = builder.source(str(test_video))
audio = builder.source(str(test_audio))
# Extract segment
segment = builder.segment(video, duration=3.0)
# Resize
resized = builder.resize(segment, width=640, height=480)
# Apply effects
transformed = builder.transform(resized, effects={"saturation": 1.3})
# Mux with audio
final = builder.mux(transformed, audio)
builder.set_output(final)
dag = builder.build()
result = engine.execute(dag)
assert result.success
assert result.output_path.exists()
assert result.nodes_executed == 6 # source, source, segment, resize, transform, mux