- Ignores input, returns dog.mkv from immutable URL - Downloads from art-source with hash verification - Caches downloaded file locally - 5 automated tests (all passing) Owner: @giles@artdag.rose-ash.com 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
85 lines
2.4 KiB
Python
85 lines
2.4 KiB
Python
"""
|
|
Dog effect - returns dog.mkv regardless of input.
|
|
|
|
This is a constant effect that fetches from an immutable URL.
|
|
"""
|
|
|
|
import hashlib
|
|
import logging
|
|
import shutil
|
|
from pathlib import Path
|
|
from typing import Any, Dict
|
|
|
|
import requests
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Immutable source URL (commit-pinned)
|
|
DOG_URL = "https://git.rose-ash.com/art-dag/art-source/raw/3cb70f9aab6c5e9473e5ce34c59fb1d7dca4bdaa/dog.mkv"
|
|
DOG_HASH = "772f26f9b4e80984788bc48f7c6eee0a1974966b2d4ee56a72d7c6586b3ac9d8"
|
|
|
|
|
|
def file_hash(path: Path) -> str:
|
|
"""Compute SHA3-256 hash of a file."""
|
|
hasher = hashlib.sha3_256()
|
|
with open(path, "rb") as f:
|
|
for chunk in iter(lambda: f.read(65536), b""):
|
|
hasher.update(chunk)
|
|
return hasher.hexdigest()
|
|
|
|
|
|
def effect_dog(input_path: Path, output_path: Path, config: Dict[str, Any]) -> Path:
|
|
"""
|
|
Dog effect - ignores input, returns dog.mkv.
|
|
|
|
Downloads from immutable URL and verifies hash.
|
|
"""
|
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Output with correct extension
|
|
actual_output = output_path.with_suffix(".mkv")
|
|
if actual_output.exists():
|
|
actual_output.unlink()
|
|
|
|
# Check cache first
|
|
cache_dir = Path.home() / ".artdag" / "effect_cache"
|
|
cache_dir.mkdir(parents=True, exist_ok=True)
|
|
cached_file = cache_dir / f"{DOG_HASH}.mkv"
|
|
|
|
if cached_file.exists():
|
|
# Verify cached file
|
|
if file_hash(cached_file) == DOG_HASH:
|
|
logger.debug(f"EFFECT dog: using cached {cached_file}")
|
|
shutil.copy2(cached_file, actual_output)
|
|
return actual_output
|
|
else:
|
|
# Cache corrupted, remove it
|
|
cached_file.unlink()
|
|
|
|
# Download from immutable URL
|
|
logger.info(f"EFFECT dog: downloading from {DOG_URL}")
|
|
response = requests.get(DOG_URL, stream=True)
|
|
response.raise_for_status()
|
|
|
|
# Write to cache
|
|
with open(cached_file, "wb") as f:
|
|
for chunk in response.iter_content(chunk_size=65536):
|
|
f.write(chunk)
|
|
|
|
# Verify hash
|
|
downloaded_hash = file_hash(cached_file)
|
|
if downloaded_hash != DOG_HASH:
|
|
cached_file.unlink()
|
|
raise ValueError(f"Hash mismatch! Expected {DOG_HASH}, got {downloaded_hash}")
|
|
|
|
# Copy to output
|
|
shutil.copy2(cached_file, actual_output)
|
|
logger.debug(f"EFFECT dog: {input_path.name} -> {actual_output} (input ignored)")
|
|
|
|
return actual_output
|
|
|
|
|
|
# Export for registration
|
|
effect = effect_dog
|
|
name = "dog"
|