Merges full history from art-dag/mono.git into the monorepo under the artdag/ directory. Contains: core (DAG engine), l1 (Celery rendering server), l2 (ActivityPub registry), common (shared templates/middleware), client (CLI), test (e2e). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> git-subtree-dir: artdag git-subtree-mainline:1a179de547git-subtree-split:4c2e716558
328 lines
12 KiB
Python
328 lines
12 KiB
Python
"""
|
|
Tests for effect loading from cache and IPFS.
|
|
|
|
These tests verify that:
|
|
- Effects can be loaded from the local cache directory
|
|
- IPFS gateway configuration is correct for Docker environments
|
|
- The effect executor correctly resolves CIDs from config
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Optional
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
import pytest
|
|
|
|
|
|
# Minimal effect loading implementation for testing
|
|
# This mirrors the logic in artdag/nodes/effect.py
|
|
|
|
|
|
def get_effects_cache_dir_impl(env_vars: Dict[str, str]) -> Optional[Path]:
|
|
"""Get the effects cache directory from environment or default."""
|
|
for env_var in ["CACHE_DIR", "ARTDAG_CACHE_DIR"]:
|
|
cache_dir = env_vars.get(env_var)
|
|
if cache_dir:
|
|
effects_dir = Path(cache_dir) / "_effects"
|
|
if effects_dir.exists():
|
|
return effects_dir
|
|
|
|
# Try default locations
|
|
for base in [Path.home() / ".artdag" / "cache", Path("/var/cache/artdag")]:
|
|
effects_dir = base / "_effects"
|
|
if effects_dir.exists():
|
|
return effects_dir
|
|
|
|
return None
|
|
|
|
|
|
def effect_path_for_cid(effects_dir: Path, effect_cid: str) -> Path:
|
|
"""Get the expected path for an effect given its CID."""
|
|
return effects_dir / effect_cid / "effect.py"
|
|
|
|
|
|
class TestEffectCacheDirectory:
|
|
"""Tests for effect cache directory resolution."""
|
|
|
|
def test_cache_dir_from_env(self, tmp_path: Path) -> None:
|
|
"""CACHE_DIR env var should determine effects directory."""
|
|
effects_dir = tmp_path / "_effects"
|
|
effects_dir.mkdir(parents=True)
|
|
|
|
env = {"CACHE_DIR": str(tmp_path)}
|
|
result = get_effects_cache_dir_impl(env)
|
|
|
|
assert result == effects_dir
|
|
|
|
def test_artdag_cache_dir_fallback(self, tmp_path: Path) -> None:
|
|
"""ARTDAG_CACHE_DIR should work as fallback."""
|
|
effects_dir = tmp_path / "_effects"
|
|
effects_dir.mkdir(parents=True)
|
|
|
|
env = {"ARTDAG_CACHE_DIR": str(tmp_path)}
|
|
result = get_effects_cache_dir_impl(env)
|
|
|
|
assert result == effects_dir
|
|
|
|
def test_no_env_returns_none_if_no_default_exists(self) -> None:
|
|
"""Should return None if no cache directory exists."""
|
|
env = {}
|
|
result = get_effects_cache_dir_impl(env)
|
|
|
|
# Will return None unless default dirs exist
|
|
# This is expected behavior
|
|
if result is not None:
|
|
assert result.exists()
|
|
|
|
|
|
class TestEffectPathResolution:
|
|
"""Tests for effect path resolution."""
|
|
|
|
def test_effect_path_structure(self, tmp_path: Path) -> None:
|
|
"""Effect should be at _effects/{cid}/effect.py."""
|
|
effects_dir = tmp_path / "_effects"
|
|
effect_cid = "QmTestEffect123"
|
|
|
|
path = effect_path_for_cid(effects_dir, effect_cid)
|
|
|
|
assert path == effects_dir / effect_cid / "effect.py"
|
|
|
|
def test_effect_file_exists_after_upload(self, tmp_path: Path) -> None:
|
|
"""After upload, effect.py should exist in the right location."""
|
|
effects_dir = tmp_path / "_effects"
|
|
effect_cid = "QmPWaW5E5WFrmDjT6w8enqvtJhM8c5jvQu7XN1doHA3Z7J"
|
|
|
|
# Simulate effect upload (as done by app/routers/effects.py)
|
|
effect_dir = effects_dir / effect_cid
|
|
effect_dir.mkdir(parents=True)
|
|
effect_source = '''"""
|
|
@effect invert
|
|
@version 1.0.0
|
|
"""
|
|
|
|
def process_frame(frame, params, state):
|
|
return 255 - frame, state
|
|
'''
|
|
(effect_dir / "effect.py").write_text(effect_source)
|
|
|
|
# Verify the path structure
|
|
expected_path = effect_path_for_cid(effects_dir, effect_cid)
|
|
assert expected_path.exists()
|
|
assert "process_frame" in expected_path.read_text()
|
|
|
|
|
|
class TestIPFSAPIConfiguration:
|
|
"""Tests for IPFS API configuration (consistent across codebase)."""
|
|
|
|
def test_ipfs_api_multiaddr_conversion(self) -> None:
|
|
"""
|
|
IPFS_API multiaddr should convert to correct URL.
|
|
|
|
Both ipfs_client.py and artdag/nodes/effect.py now use IPFS_API
|
|
with multiaddr format for consistency.
|
|
"""
|
|
import re
|
|
|
|
def multiaddr_to_url(multiaddr: str) -> str:
|
|
"""Convert multiaddr to URL (same logic as ipfs_client.py)."""
|
|
dns_match = re.match(r"/dns[46]?/([^/]+)/tcp/(\d+)", multiaddr)
|
|
if dns_match:
|
|
return f"http://{dns_match.group(1)}:{dns_match.group(2)}"
|
|
ip4_match = re.match(r"/ip4/([^/]+)/tcp/(\d+)", multiaddr)
|
|
if ip4_match:
|
|
return f"http://{ip4_match.group(1)}:{ip4_match.group(2)}"
|
|
return "http://127.0.0.1:5001"
|
|
|
|
# Docker config
|
|
docker_api = "/dns/ipfs/tcp/5001"
|
|
url = multiaddr_to_url(docker_api)
|
|
assert url == "http://ipfs:5001"
|
|
|
|
# Local dev config
|
|
local_api = "/ip4/127.0.0.1/tcp/5001"
|
|
url = multiaddr_to_url(local_api)
|
|
assert url == "http://127.0.0.1:5001"
|
|
|
|
def test_all_ipfs_access_uses_api_not_gateway(self) -> None:
|
|
"""
|
|
All IPFS access should use IPFS_API (port 5001), not IPFS_GATEWAY (port 8080).
|
|
|
|
Fixed 2026-01-12: artdag/nodes/effect.py was using a separate IPFS_GATEWAY
|
|
variable. Now it uses IPFS_API like ipfs_client.py for consistency.
|
|
"""
|
|
# The API endpoint that both modules use
|
|
api_endpoint = "/api/v0/cat"
|
|
|
|
# This is correct - using the API
|
|
assert "api/v0" in api_endpoint
|
|
|
|
# Gateway endpoint would be /ipfs/{cid} - we don't use this anymore
|
|
gateway_pattern = "/ipfs/"
|
|
assert gateway_pattern not in api_endpoint
|
|
|
|
|
|
class TestEffectExecutorConfigResolution:
|
|
"""Tests for how the effect executor resolves CID from config."""
|
|
|
|
def test_executor_should_use_cid_key(self) -> None:
|
|
"""
|
|
Effect executor must look for 'cid' key in config.
|
|
|
|
The transform_node function sets config["cid"] for effects.
|
|
The executor must read from the same key.
|
|
"""
|
|
config = {
|
|
"effect": "invert",
|
|
"cid": "QmPWaW5E5WFrmDjT6w8enqvtJhM8c5jvQu7XN1doHA3Z7J",
|
|
"intensity": 1.0,
|
|
}
|
|
|
|
# Simulate executor CID extraction (from artdag/nodes/effect.py:258)
|
|
effect_cid = config.get("cid") or config.get("hash")
|
|
|
|
assert effect_cid == "QmPWaW5E5WFrmDjT6w8enqvtJhM8c5jvQu7XN1doHA3Z7J"
|
|
|
|
def test_executor_should_not_use_effect_hash(self) -> None:
|
|
"""
|
|
Regression test: 'effect_hash' is not a valid config key.
|
|
|
|
Bug found 2026-01-12: transform_node was using config["effect_hash"]
|
|
but executor only checks config["cid"] or config["hash"].
|
|
"""
|
|
config = {
|
|
"effect": "invert",
|
|
"effect_hash": "QmPWaW5E5WFrmDjT6w8enqvtJhM8c5jvQu7XN1doHA3Z7J",
|
|
}
|
|
|
|
# This simulates the buggy behavior where effect_hash was set
|
|
# but executor doesn't look for it
|
|
effect_cid = config.get("cid") or config.get("hash")
|
|
|
|
# The bug: effect_hash is ignored, effect_cid is None
|
|
assert effect_cid is None, "effect_hash should NOT be recognized"
|
|
|
|
def test_hash_key_is_legacy_fallback(self) -> None:
|
|
"""'hash' key should work as legacy fallback for 'cid'."""
|
|
config = {
|
|
"effect": "invert",
|
|
"hash": "QmPWaW5E5WFrmDjT6w8enqvtJhM8c5jvQu7XN1doHA3Z7J",
|
|
}
|
|
|
|
effect_cid = config.get("cid") or config.get("hash")
|
|
|
|
assert effect_cid == "QmPWaW5E5WFrmDjT6w8enqvtJhM8c5jvQu7XN1doHA3Z7J"
|
|
|
|
|
|
class TestEffectLoadingIntegration:
|
|
"""Integration tests for complete effect loading path."""
|
|
|
|
def test_effect_loads_from_cache_when_present(self, tmp_path: Path) -> None:
|
|
"""Effect should load from cache without hitting IPFS."""
|
|
effects_dir = tmp_path / "_effects"
|
|
effect_cid = "QmPWaW5E5WFrmDjT6w8enqvtJhM8c5jvQu7XN1doHA3Z7J"
|
|
|
|
# Create effect file in cache
|
|
effect_dir = effects_dir / effect_cid
|
|
effect_dir.mkdir(parents=True)
|
|
(effect_dir / "effect.py").write_text('''
|
|
def process_frame(frame, params, state):
|
|
"""Invert colors."""
|
|
return 255 - frame, state
|
|
''')
|
|
|
|
# Verify the effect can be found
|
|
effect_path = effect_path_for_cid(effects_dir, effect_cid)
|
|
assert effect_path.exists()
|
|
|
|
# Load and verify it has the expected function
|
|
import importlib.util
|
|
spec = importlib.util.spec_from_file_location("test_effect", effect_path)
|
|
module = importlib.util.module_from_spec(spec)
|
|
spec.loader.exec_module(module)
|
|
|
|
assert hasattr(module, "process_frame")
|
|
|
|
def test_effect_fetch_uses_ipfs_api(self, tmp_path: Path) -> None:
|
|
"""Effect fetch should use IPFS API endpoint, not gateway."""
|
|
import re
|
|
|
|
def multiaddr_to_url(multiaddr: str) -> str:
|
|
dns_match = re.match(r"/dns[46]?/([^/]+)/tcp/(\d+)", multiaddr)
|
|
if dns_match:
|
|
return f"http://{dns_match.group(1)}:{dns_match.group(2)}"
|
|
ip4_match = re.match(r"/ip4/([^/]+)/tcp/(\d+)", multiaddr)
|
|
if ip4_match:
|
|
return f"http://{ip4_match.group(1)}:{ip4_match.group(2)}"
|
|
return "http://127.0.0.1:5001"
|
|
|
|
# In Docker, IPFS_API=/dns/ipfs/tcp/5001
|
|
docker_multiaddr = "/dns/ipfs/tcp/5001"
|
|
base_url = multiaddr_to_url(docker_multiaddr)
|
|
effect_cid = "QmTestCid123"
|
|
|
|
# Should use API endpoint
|
|
api_url = f"{base_url}/api/v0/cat?arg={effect_cid}"
|
|
|
|
assert "ipfs:5001" in api_url
|
|
assert "/api/v0/cat" in api_url
|
|
assert "127.0.0.1" not in api_url
|
|
|
|
|
|
class TestSharedVolumeScenario:
|
|
"""
|
|
Tests simulating the Docker shared volume scenario.
|
|
|
|
In Docker:
|
|
- l1-server uploads effect to /data/cache/_effects/{cid}/effect.py
|
|
- l1-worker should find it at the same path via shared volume
|
|
"""
|
|
|
|
def test_effect_visible_on_shared_volume(self, tmp_path: Path) -> None:
|
|
"""Effect uploaded on server should be visible to worker."""
|
|
# Simulate shared volume mounted at /data/cache on both containers
|
|
shared_volume = tmp_path / "data" / "cache"
|
|
effects_dir = shared_volume / "_effects"
|
|
|
|
# Server uploads effect
|
|
effect_cid = "QmPWaW5E5WFrmDjT6w8enqvtJhM8c5jvQu7XN1doHA3Z7J"
|
|
effect_upload_dir = effects_dir / effect_cid
|
|
effect_upload_dir.mkdir(parents=True)
|
|
(effect_upload_dir / "effect.py").write_text('def process_frame(f, p, s): return f, s')
|
|
(effect_upload_dir / "metadata.json").write_text('{"cid": "' + effect_cid + '"}')
|
|
|
|
# Worker should find the effect
|
|
env_vars = {"CACHE_DIR": str(shared_volume)}
|
|
worker_effects_dir = get_effects_cache_dir_impl(env_vars)
|
|
|
|
assert worker_effects_dir is not None
|
|
assert worker_effects_dir == effects_dir
|
|
|
|
worker_effect_path = effect_path_for_cid(worker_effects_dir, effect_cid)
|
|
assert worker_effect_path.exists()
|
|
|
|
def test_effect_cid_matches_registry(self, tmp_path: Path) -> None:
|
|
"""CID in recipe registry must match the uploaded effect directory name."""
|
|
shared_volume = tmp_path
|
|
effects_dir = shared_volume / "_effects"
|
|
|
|
# The CID used in the recipe registry
|
|
registry_cid = "QmPWaW5E5WFrmDjT6w8enqvtJhM8c5jvQu7XN1doHA3Z7J"
|
|
|
|
# Upload creates directory with CID as name
|
|
effect_upload_dir = effects_dir / registry_cid
|
|
effect_upload_dir.mkdir(parents=True)
|
|
(effect_upload_dir / "effect.py").write_text('def process_frame(f, p, s): return f, s')
|
|
|
|
# Executor receives the same CID from DAG config
|
|
dag_config_cid = registry_cid # This comes from transform_node
|
|
|
|
# These must match for the lookup to work
|
|
assert dag_config_cid == registry_cid
|
|
|
|
# And the path must exist
|
|
lookup_path = effects_dir / dag_config_cid / "effect.py"
|
|
assert lookup_path.exists()
|