Files
mono/artdag/core/artdag/nodes/source.py
giles 1a74d811f7 Incorporate art-dag-mono repo into artdag/ subfolder
Merges full history from art-dag/mono.git into the monorepo
under the artdag/ directory. Contains: core (DAG engine),
l1 (Celery rendering server), l2 (ActivityPub registry),
common (shared templates/middleware), client (CLI), test (e2e).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

git-subtree-dir: artdag
git-subtree-mainline: 1a179de547
git-subtree-split: 4c2e716558
2026-02-27 09:07:23 +00:00

63 lines
1.6 KiB
Python

# primitive/nodes/source.py
"""
Source executors: Load media from paths.
Primitives: SOURCE
"""
import logging
import os
import shutil
from pathlib import Path
from typing import Any, Dict, List
from ..dag import NodeType
from ..executor import Executor, register_executor
logger = logging.getLogger(__name__)
@register_executor(NodeType.SOURCE)
class SourceExecutor(Executor):
"""
Load source media from a path.
Config:
path: Path to source file
Creates a symlink to the source file for zero-copy loading.
"""
def execute(
self,
config: Dict[str, Any],
inputs: List[Path],
output_path: Path,
) -> Path:
source_path = Path(config["path"])
if not source_path.exists():
raise FileNotFoundError(f"Source file not found: {source_path}")
output_path.parent.mkdir(parents=True, exist_ok=True)
# Use symlink for zero-copy
if output_path.exists() or output_path.is_symlink():
output_path.unlink()
# Preserve extension from source
actual_output = output_path.with_suffix(source_path.suffix)
if actual_output.exists() or actual_output.is_symlink():
actual_output.unlink()
os.symlink(source_path.resolve(), actual_output)
logger.debug(f"SOURCE: {source_path.name} -> {actual_output}")
return actual_output
def validate_config(self, config: Dict[str, Any]) -> List[str]:
errors = []
if "path" not in config:
errors.append("SOURCE requires 'path' config")
return errors