diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000..f48a442
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,22 @@
+# Don't copy local clones - Dockerfile will clone fresh
+artdag-effects/
+
+# Python cache
+__pycache__/
+*.py[cod]
+*.egg-info/
+.pytest_cache/
+
+# Virtual environments
+.venv/
+venv/
+
+# Local env
+.env
+
+# Git
+.git/
+
+# IDE
+.vscode/
+.idea/
diff --git a/app/templates/runs/artifacts.html b/app/templates/runs/artifacts.html
new file mode 100644
index 0000000..874188c
--- /dev/null
+++ b/app/templates/runs/artifacts.html
@@ -0,0 +1,62 @@
+{% extends "base.html" %}
+
+{% block title %}Run Artifacts{% endblock %}
+
+{% block content %}
+
+ {% for artifact in artifacts %}
+
+
+
+ {{ artifact.role }}
+
+ {{ artifact.step_name }}
+
+
+
+
Content Hash
+
{{ artifact.hash }}
+
+
+
+
+ {% if artifact.media_type == 'video' %}Video
+ {% elif artifact.media_type == 'image' %}Image
+ {% elif artifact.media_type == 'audio' %}Audio
+ {% else %}File{% endif %}
+
+ {{ (artifact.size_bytes / 1024)|round(1) }} KB
+
+
+
+
+ {% endfor %}
+
+{% else %}
+
+
No artifacts found for this run.
+
+{% endif %}
+{% endblock %}
diff --git a/app/templates/runs/plan.html b/app/templates/runs/plan.html
new file mode 100644
index 0000000..f50090d
--- /dev/null
+++ b/app/templates/runs/plan.html
@@ -0,0 +1,99 @@
+{% extends "base.html" %}
+
+{% block title %}Run Plan - {{ run_id[:16] }}{% endblock %}
+
+{% block head %}
+
+{% endblock %}
+
+{% block content %}
+
+
No execution plan available for this run.
+
+{% endif %}
+{% endblock %}
diff --git a/celery_app.py b/celery_app.py
index 9f81107..f997330 100644
--- a/celery_app.py
+++ b/celery_app.py
@@ -1,8 +1,8 @@
"""
Art DAG Celery Application
-Distributed rendering for the Art DAG system.
-Uses the foundational artdag language from GitHub.
+Streaming video rendering for the Art DAG system.
+Uses S-expression recipes with frame-by-frame processing.
"""
import os
@@ -14,7 +14,7 @@ app = Celery(
'art_celery',
broker=REDIS_URL,
backend=REDIS_URL,
- include=['legacy_tasks', 'tasks', 'tasks.analyze', 'tasks.execute', 'tasks.orchestrate', 'tasks.execute_sexp']
+ include=['tasks', 'tasks.streaming']
)
app.conf.update(
diff --git a/configs/audio-dizzy.sexp b/configs/audio-dizzy.sexp
new file mode 100644
index 0000000..dc16087
--- /dev/null
+++ b/configs/audio-dizzy.sexp
@@ -0,0 +1,17 @@
+;; Audio Configuration - dizzy.mp3
+;;
+;; Defines audio analyzer and playback for a recipe.
+;; Pass to recipe with: --audio configs/audio-dizzy.sexp
+;;
+;; Provides:
+;; - music: audio analyzer for beat/energy detection
+;; - audio-playback: path for synchronized playback
+
+(require-primitives "streaming")
+
+;; Audio analyzer (provides beat detection and energy levels)
+;; Paths relative to working directory (project root)
+(def music (streaming:make-audio-analyzer "dizzy.mp3"))
+
+;; Audio playback path (for sync with video output)
+(audio-playback "dizzy.mp3")
diff --git a/configs/audio-halleluwah.sexp b/configs/audio-halleluwah.sexp
new file mode 100644
index 0000000..5e4b812
--- /dev/null
+++ b/configs/audio-halleluwah.sexp
@@ -0,0 +1,17 @@
+;; Audio Configuration - halleluwah.webm
+;;
+;; Defines audio analyzer and playback for a recipe.
+;; Pass to recipe with: --audio configs/audio-halleluwah.sexp
+;;
+;; Provides:
+;; - music: audio analyzer for beat/energy detection
+;; - audio-playback: path for synchronized playback
+
+(require-primitives "streaming")
+
+;; Audio analyzer (provides beat detection and energy levels)
+;; Paths relative to working directory (project root)
+(def music (streaming:make-audio-analyzer "woods_half/halleluwah.webm"))
+
+;; Audio playback path (for sync with video output)
+(audio-playback "woods_half/halleluwah.webm")
diff --git a/configs/sources-default.sexp b/configs/sources-default.sexp
new file mode 100644
index 0000000..754bd92
--- /dev/null
+++ b/configs/sources-default.sexp
@@ -0,0 +1,38 @@
+;; Default Sources Configuration
+;;
+;; Defines video sources and per-pair effect configurations.
+;; Pass to recipe with: --sources configs/sources-default.sexp
+;;
+;; Required by recipes using process-pair macro:
+;; - sources: array of video sources
+;; - pair-configs: array of effect configurations per source
+
+(require-primitives "streaming")
+
+;; Video sources array
+;; Paths relative to working directory (project root)
+(def sources [
+ (streaming:make-video-source "monday.webm" 30)
+ (streaming:make-video-source "escher.webm" 30)
+ (streaming:make-video-source "2.webm" 30)
+ (streaming:make-video-source "disruptors.webm" 30)
+ (streaming:make-video-source "4.mp4" 30)
+ (streaming:make-video-source "ecstacy.mp4" 30)
+ (streaming:make-video-source "dopple.webm" 30)
+ (streaming:make-video-source "5.mp4" 30)
+])
+
+;; Per-pair effect config: rotation direction, rotation ranges, zoom ranges
+;; :dir = rotation direction (1 or -1)
+;; :rot-a, :rot-b = max rotation angles for clip A and B
+;; :zoom-a, :zoom-b = max zoom amounts for clip A and B
+(def pair-configs [
+ {:dir -1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5} ;; 0: monday
+ {:dir 1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5} ;; 1: escher
+ {:dir 1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5} ;; 2: vid2
+ {:dir -1 :rot-a -45 :rot-b 45 :zoom-a 0.5 :zoom-b 1.5} ;; 3: disruptors (reversed)
+ {:dir -1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5} ;; 4: vid4
+ {:dir 1 :rot-a 30 :rot-b -30 :zoom-a 1.3 :zoom-b 0.7} ;; 5: ecstacy (smaller)
+ {:dir -1 :rot-a -45 :rot-b 45 :zoom-a 0.5 :zoom-b 1.5} ;; 6: dopple (reversed)
+ {:dir 1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5} ;; 7: vid5
+])
diff --git a/configs/sources-woods-half.sexp b/configs/sources-woods-half.sexp
new file mode 100644
index 0000000..d2feff8
--- /dev/null
+++ b/configs/sources-woods-half.sexp
@@ -0,0 +1,19 @@
+;; Half-resolution Woods Sources (960x540)
+;;
+;; Pass to recipe with: --sources configs/sources-woods-half.sexp
+
+(require-primitives "streaming")
+
+(def sources [
+ (streaming:make-video-source "woods_half/1.webm" 30)
+ (streaming:make-video-source "woods_half/2.webm" 30)
+ (streaming:make-video-source "woods_half/3.webm" 30)
+ (streaming:make-video-source "woods_half/4.webm" 30)
+])
+
+(def pair-configs [
+ {:dir -1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5}
+ {:dir 1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5}
+ {:dir 1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5}
+ {:dir -1 :rot-a -45 :rot-b 45 :zoom-a 0.5 :zoom-b 1.5}
+])
diff --git a/configs/sources-woods.sexp b/configs/sources-woods.sexp
new file mode 100644
index 0000000..717bfd9
--- /dev/null
+++ b/configs/sources-woods.sexp
@@ -0,0 +1,39 @@
+;; Woods Sources Configuration
+;;
+;; Defines video sources and per-pair effect configurations.
+;; Pass to recipe with: --sources configs/sources-woods.sexp
+;;
+;; Required by recipes using process-pair macro:
+;; - sources: array of video sources
+;; - pair-configs: array of effect configurations per source
+
+(require-primitives "streaming")
+
+;; Video sources array
+;; Paths relative to working directory (project root)
+(def sources [
+ (streaming:make-video-source "woods/1.webm" 10)
+ (streaming:make-video-source "woods/2.webm" 10)
+ (streaming:make-video-source "woods/3.webm" 10)
+ (streaming:make-video-source "woods/4.webm" 10)
+ (streaming:make-video-source "woods/5.webm" 10)
+ (streaming:make-video-source "woods/6.webm" 10)
+ (streaming:make-video-source "woods/7.webm" 10)
+ (streaming:make-video-source "woods/8.webm" 10)
+])
+
+;; Per-pair effect config: rotation direction, rotation ranges, zoom ranges
+;; :dir = rotation direction (1 or -1)
+;; :rot-a, :rot-b = max rotation angles for clip A and B
+;; :zoom-a, :zoom-b = max zoom amounts for clip A and B
+(def pair-configs [
+  {:dir -1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5} ;; 0: woods/1
+  {:dir 1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5} ;; 1: woods/2
+  {:dir 1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5} ;; 2: woods/3
+  {:dir -1 :rot-a -45 :rot-b 45 :zoom-a 0.5 :zoom-b 1.5} ;; 3: woods/4 (reversed)
+  {:dir -1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5} ;; 4: woods/5
+  {:dir 1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5} ;; 5: woods/6
+  {:dir -1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5} ;; 6: woods/7
+  {:dir 1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5} ;; 7: woods/8
+
+])
diff --git a/effects/quick_test_explicit.sexp b/effects/quick_test_explicit.sexp
new file mode 100644
index 0000000..0a3698b
--- /dev/null
+++ b/effects/quick_test_explicit.sexp
@@ -0,0 +1,150 @@
+;; Quick Test - Fully Explicit Streaming Version
+;;
+;; The interpreter is completely generic - knows nothing about video/audio.
+;; All domain logic is explicit via primitives.
+;;
+;; Run with built-in sources/audio:
+;; python3 -m streaming.stream_sexp_generic effects/quick_test_explicit.sexp --fps 30
+;;
+;; Run with external config files:
+;; python3 -m streaming.stream_sexp_generic effects/quick_test_explicit.sexp \
+;; --sources configs/sources-default.sexp \
+;; --audio configs/audio-dizzy.sexp \
+;; --fps 30
+
+(stream "quick_test_explicit"
+ :fps 30
+ :width 1920
+ :height 1080
+ :seed 42
+
+ ;; Load standard primitives and effects
+ (include :path "../templates/standard-primitives.sexp")
+ (include :path "../templates/standard-effects.sexp")
+
+ ;; Load reusable templates
+ (include :path "../templates/stream-process-pair.sexp")
+ (include :path "../templates/crossfade-zoom.sexp")
+
+ ;; === SOURCES AS ARRAY ===
+ (def sources [
+ (streaming:make-video-source "monday.webm" 30)
+ (streaming:make-video-source "escher.webm" 30)
+ (streaming:make-video-source "2.webm" 30)
+ (streaming:make-video-source "disruptors.webm" 30)
+ (streaming:make-video-source "4.mp4" 30)
+ (streaming:make-video-source "ecstacy.mp4" 30)
+ (streaming:make-video-source "dopple.webm" 30)
+ (streaming:make-video-source "5.mp4" 30)
+ ])
+
+ ;; Per-pair config: [rot-dir, rot-a-max, rot-b-max, zoom-a-max, zoom-b-max]
+ ;; Pairs 3,6: reversed (negative rot-a, positive rot-b, shrink zoom-a, grow zoom-b)
+ ;; Pair 5: smaller ranges
+ (def pair-configs [
+ {:dir -1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5} ;; 0: monday
+ {:dir 1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5} ;; 1: escher
+ {:dir 1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5} ;; 2: vid2
+ {:dir -1 :rot-a -45 :rot-b 45 :zoom-a 0.5 :zoom-b 1.5} ;; 3: disruptors (reversed)
+ {:dir -1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5} ;; 4: vid4
+ {:dir 1 :rot-a 30 :rot-b -30 :zoom-a 1.3 :zoom-b 0.7} ;; 5: ecstacy (smaller)
+ {:dir -1 :rot-a -45 :rot-b 45 :zoom-a 0.5 :zoom-b 1.5} ;; 6: dopple (reversed)
+ {:dir 1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5} ;; 7: vid5
+ ])
+
+ ;; Audio analyzer
+ (def music (streaming:make-audio-analyzer "dizzy.mp3"))
+
+ ;; Audio playback
+ (audio-playback "../dizzy.mp3")
+
+ ;; === GLOBAL SCANS ===
+
+ ;; Cycle state: which source is active (recipe-specific)
+ ;; clen = beats per source (8-24 beats = ~4-12 seconds)
+ (scan cycle (streaming:audio-beat music t)
+ :init {:active 0 :beat 0 :clen 16}
+ :step (if (< (+ beat 1) clen)
+ (dict :active active :beat (+ beat 1) :clen clen)
+ (dict :active (mod (+ active 1) (len sources)) :beat 0
+ :clen (+ 8 (mod (* (streaming:audio-beat-count music t) 7) 17)))))
+
+ ;; Reusable scans from templates (require 'music' to be defined)
+ (include :path "../templates/scan-oscillating-spin.sexp")
+ (include :path "../templates/scan-ripple-drops.sexp")
+
+ ;; === PER-PAIR STATE (dynamically sized based on sources) ===
+ ;; Each pair has: inv-a, inv-b, hue-a, hue-b, mix, rot-angle
+ (scan pairs (streaming:audio-beat music t)
+ :init {:states (map (core:range (len sources)) (lambda (_)
+ {:inv-a 0 :inv-b 0 :hue-a 0 :hue-b 0 :hue-a-val 0 :hue-b-val 0 :mix 0.5 :mix-rem 5 :angle 0 :rot-beat 0 :rot-clen 25}))}
+ :step (dict :states (map states (lambda (p)
+ (let [;; Invert toggles (10% chance, lasts 1-4 beats)
+ new-inv-a (if (< (core:rand) 0.1) (+ 1 (core:rand-int 1 4)) (core:max 0 (- (get p :inv-a) 1)))
+ new-inv-b (if (< (core:rand) 0.1) (+ 1 (core:rand-int 1 4)) (core:max 0 (- (get p :inv-b) 1)))
+ ;; Hue shifts (10% chance, lasts 1-4 beats) - use countdown like invert
+ old-hue-a (get p :hue-a)
+ old-hue-b (get p :hue-b)
+ new-hue-a (if (< (core:rand) 0.1) (+ 1 (core:rand-int 1 4)) (core:max 0 (- old-hue-a 1)))
+ new-hue-b (if (< (core:rand) 0.1) (+ 1 (core:rand-int 1 4)) (core:max 0 (- old-hue-b 1)))
+ ;; Pick random hue value when triggering (stored separately)
+ new-hue-a-val (if (> new-hue-a old-hue-a) (+ 30 (* (core:rand) 300)) (get p :hue-a-val))
+ new-hue-b-val (if (> new-hue-b old-hue-b) (+ 30 (* (core:rand) 300)) (get p :hue-b-val))
+ ;; Mix (holds for 1-10 beats, then picks 0, 0.5, or 1)
+ mix-rem (get p :mix-rem)
+ old-mix (get p :mix)
+ new-mix-rem (if (> mix-rem 0) (- mix-rem 1) (+ 1 (core:rand-int 1 10)))
+ new-mix (if (> mix-rem 0) old-mix (* (core:rand-int 0 2) 0.5))
+ ;; Rotation (accumulates, reverses direction when cycle completes)
+ rot-beat (get p :rot-beat)
+ rot-clen (get p :rot-clen)
+ old-angle (get p :angle)
+ ;; Note: dir comes from pair-configs, but we store rotation state here
+ new-rot-beat (if (< (+ rot-beat 1) rot-clen) (+ rot-beat 1) 0)
+ new-rot-clen (if (< (+ rot-beat 1) rot-clen) rot-clen (+ 20 (core:rand-int 0 10)))
+ new-angle (+ old-angle (/ 360 rot-clen))]
+ (dict :inv-a new-inv-a :inv-b new-inv-b
+ :hue-a new-hue-a :hue-b new-hue-b
+ :hue-a-val new-hue-a-val :hue-b-val new-hue-b-val
+ :mix new-mix :mix-rem new-mix-rem
+ :angle new-angle :rot-beat new-rot-beat :rot-clen new-rot-clen))))))
+
+ ;; === FRAME PIPELINE ===
+ (frame
+ (let [now t
+ e (streaming:audio-energy music now)
+
+ ;; Get cycle state
+ active (bind cycle :active)
+ beat-pos (bind cycle :beat)
+ clen (bind cycle :clen)
+
+ ;; Transition logic: last third of cycle crossfades to next
+ phase3 (* beat-pos 3)
+ fading (and (>= phase3 (* clen 2)) (< phase3 (* clen 3)))
+ fade-amt (if fading (/ (- phase3 (* clen 2)) clen) 0)
+ next-idx (mod (+ active 1) (len sources))
+
+ ;; Get pair states array (required by process-pair macro)
+ pair-states (bind pairs :states)
+
+ ;; Process active pair using macro from template
+ active-frame (process-pair active)
+
+ ;; Crossfade with zoom during transition (using macro)
+ result (if fading
+ (crossfade-zoom active-frame (process-pair next-idx) fade-amt)
+ active-frame)
+
+ ;; Final: global spin + ripple
+ spun (rotate result :angle (bind spin :angle))
+ rip-gate (bind ripple-state :gate)
+ rip-amp (* rip-gate (core:map-range e 0 1 5 50))]
+
+ (ripple spun
+ :amplitude rip-amp
+ :center_x (bind ripple-state :cx)
+ :center_y (bind ripple-state :cy)
+ :frequency 8
+ :decay 2
+ :speed 5))))
diff --git a/hybrid_state.py b/hybrid_state.py
deleted file mode 100644
index b351a7c..0000000
--- a/hybrid_state.py
+++ /dev/null
@@ -1,294 +0,0 @@
-"""
-Hybrid State Manager: Local Redis + IPNS Sync.
-
-Provides fast local operations with eventual consistency across L1 nodes.
-
-- Local Redis: Fast reads/writes (microseconds)
-- IPNS Sync: Background sync with other nodes (every N seconds)
-- Duplicate work: Accepted, idempotent (same inputs → same CID)
-
-Usage:
- from hybrid_state import get_state_manager
-
- state = get_state_manager()
-
- # Fast local lookup
- cid = state.get_cached_cid(cache_id)
-
- # Fast local write (synced in background)
- state.set_cached_cid(cache_id, output_cid)
-"""
-
-import json
-import logging
-import os
-import threading
-import time
-from typing import Dict, Optional
-
-import redis
-
-logger = logging.getLogger(__name__)
-
-# Configuration
-REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/5")
-CLUSTER_KEY = os.environ.get("ARTDAG_CLUSTER_KEY", "default")
-IPNS_SYNC_INTERVAL = int(os.environ.get("ARTDAG_IPNS_SYNC_INTERVAL", "30"))
-IPNS_ENABLED = os.environ.get("ARTDAG_IPNS_SYNC", "").lower() in ("true", "1", "yes")
-
-# Redis keys
-CACHE_KEY = "artdag:cid_cache" # hash: cache_id → output CID
-ANALYSIS_KEY = "artdag:analysis_cache" # hash: input_hash:features → analysis CID
-PLAN_KEY = "artdag:plan_cache" # hash: plan_id → plan CID
-RUN_KEY = "artdag:run_cache" # hash: run_id → output CID
-CLAIM_KEY_PREFIX = "artdag:claim:" # string: cache_id → worker (with TTL)
-
-# IPNS names (relative to cluster key)
-IPNS_CACHE_NAME = "cache"
-IPNS_ANALYSIS_NAME = "analysis"
-IPNS_PLAN_NAME = "plans"
-
-
-class HybridStateManager:
- """
- Local Redis + async IPNS sync for distributed L1 coordination.
-
- Fast path (local Redis):
- - get_cached_cid / set_cached_cid
- - try_claim / release_claim
-
- Slow path (background IPNS sync):
- - Periodically syncs local state with global IPNS state
- - Merges remote state into local (pulls new entries)
- - Publishes local state to IPNS (pushes updates)
- """
-
- def __init__(
- self,
- redis_url: str = REDIS_URL,
- cluster_key: str = CLUSTER_KEY,
- sync_interval: int = IPNS_SYNC_INTERVAL,
- ipns_enabled: bool = IPNS_ENABLED,
- ):
- self.cluster_key = cluster_key
- self.sync_interval = sync_interval
- self.ipns_enabled = ipns_enabled
-
- # Connect to Redis
- self._redis = redis.from_url(redis_url, decode_responses=True)
-
- # IPNS client (lazy import)
- self._ipfs = None
-
- # Sync thread
- self._sync_thread = None
- self._stop_sync = threading.Event()
-
- # Start background sync if enabled
- if self.ipns_enabled:
- self._start_background_sync()
-
- @property
- def ipfs(self):
- """Lazy import of IPFS client."""
- if self._ipfs is None:
- try:
- import ipfs_client
- self._ipfs = ipfs_client
- except ImportError:
- logger.warning("ipfs_client not available, IPNS sync disabled")
- self._ipfs = False
- return self._ipfs if self._ipfs else None
-
- # ========== CID Cache ==========
-
- def get_cached_cid(self, cache_id: str) -> Optional[str]:
- """Get output CID for a cache_id. Fast local lookup."""
- return self._redis.hget(CACHE_KEY, cache_id)
-
- def set_cached_cid(self, cache_id: str, cid: str) -> None:
- """Set output CID for a cache_id. Fast local write."""
- self._redis.hset(CACHE_KEY, cache_id, cid)
-
- def get_all_cached_cids(self) -> Dict[str, str]:
- """Get all cached CIDs."""
- return self._redis.hgetall(CACHE_KEY)
-
- # ========== Analysis Cache ==========
-
- def get_analysis_cid(self, input_hash: str, features: list) -> Optional[str]:
- """Get analysis CID for input + features."""
- key = f"{input_hash}:{','.join(sorted(features))}"
- return self._redis.hget(ANALYSIS_KEY, key)
-
- def set_analysis_cid(self, input_hash: str, features: list, cid: str) -> None:
- """Set analysis CID for input + features."""
- key = f"{input_hash}:{','.join(sorted(features))}"
- self._redis.hset(ANALYSIS_KEY, key, cid)
-
- def get_all_analysis_cids(self) -> Dict[str, str]:
- """Get all analysis CIDs."""
- return self._redis.hgetall(ANALYSIS_KEY)
-
- # ========== Plan Cache ==========
-
- def get_plan_cid(self, plan_id: str) -> Optional[str]:
- """Get plan CID for a plan_id."""
- return self._redis.hget(PLAN_KEY, plan_id)
-
- def set_plan_cid(self, plan_id: str, cid: str) -> None:
- """Set plan CID for a plan_id."""
- self._redis.hset(PLAN_KEY, plan_id, cid)
-
- def get_all_plan_cids(self) -> Dict[str, str]:
- """Get all plan CIDs."""
- return self._redis.hgetall(PLAN_KEY)
-
- # ========== Run Cache ==========
-
- def get_run_cid(self, run_id: str) -> Optional[str]:
- """Get output CID for a run_id."""
- return self._redis.hget(RUN_KEY, run_id)
-
- def set_run_cid(self, run_id: str, cid: str) -> None:
- """Set output CID for a run_id."""
- self._redis.hset(RUN_KEY, run_id, cid)
-
- # ========== Claiming ==========
-
- def try_claim(self, cache_id: str, worker_id: str, ttl: int = 300) -> bool:
- """
- Try to claim a cache_id for execution.
-
- Returns True if claimed, False if already claimed by another worker.
- Uses Redis SETNX for atomic claim.
- """
- key = f"{CLAIM_KEY_PREFIX}{cache_id}"
- return self._redis.set(key, worker_id, nx=True, ex=ttl)
-
- def release_claim(self, cache_id: str) -> None:
- """Release a claim."""
- key = f"{CLAIM_KEY_PREFIX}{cache_id}"
- self._redis.delete(key)
-
- def get_claim(self, cache_id: str) -> Optional[str]:
- """Get current claim holder for a cache_id."""
- key = f"{CLAIM_KEY_PREFIX}{cache_id}"
- return self._redis.get(key)
-
- # ========== IPNS Sync ==========
-
- def _start_background_sync(self):
- """Start background IPNS sync thread."""
- if self._sync_thread is not None:
- return
-
- def sync_loop():
- logger.info(f"IPNS sync started (interval={self.sync_interval}s)")
- while not self._stop_sync.wait(timeout=self.sync_interval):
- try:
- self._sync_with_ipns()
- except Exception as e:
- logger.warning(f"IPNS sync failed: {e}")
-
- self._sync_thread = threading.Thread(target=sync_loop, daemon=True)
- self._sync_thread.start()
-
- def stop_sync(self):
- """Stop background sync thread."""
- self._stop_sync.set()
- if self._sync_thread:
- self._sync_thread.join(timeout=5)
-
- def _sync_with_ipns(self):
- """Sync local state with IPNS global state."""
- if not self.ipfs:
- return
-
- logger.debug("Starting IPNS sync...")
-
- # Sync each cache type
- self._sync_hash(CACHE_KEY, IPNS_CACHE_NAME)
- self._sync_hash(ANALYSIS_KEY, IPNS_ANALYSIS_NAME)
- self._sync_hash(PLAN_KEY, IPNS_PLAN_NAME)
-
- logger.debug("IPNS sync complete")
-
- def _sync_hash(self, redis_key: str, ipns_name: str):
- """Sync a Redis hash with IPNS."""
- ipns_full_name = f"{self.cluster_key}/{ipns_name}"
-
- # Pull: resolve IPNS → get global state
- global_state = {}
- try:
- global_cid = self.ipfs.name_resolve(ipns_full_name)
- if global_cid:
- global_bytes = self.ipfs.get_bytes(global_cid)
- if global_bytes:
- global_state = json.loads(global_bytes.decode('utf-8'))
- logger.debug(f"Pulled {len(global_state)} entries from {ipns_name}")
- except Exception as e:
- logger.debug(f"Could not resolve {ipns_full_name}: {e}")
-
- # Merge global into local (add entries we don't have)
- if global_state:
- pipe = self._redis.pipeline()
- for key, value in global_state.items():
- pipe.hsetnx(redis_key, key, value)
- results = pipe.execute()
- added = sum(1 for r in results if r)
- if added:
- logger.info(f"Merged {added} new entries from IPNS/{ipns_name}")
-
- # Push: get local state, merge with global, publish
- local_state = self._redis.hgetall(redis_key)
- if local_state:
- merged = {**global_state, **local_state}
-
- # Only publish if we have new entries
- if len(merged) > len(global_state):
- try:
- new_cid = self.ipfs.add_json(merged)
- if new_cid:
- # Note: name_publish can be slow
- self.ipfs.name_publish(ipns_full_name, new_cid)
- logger.info(f"Published {len(merged)} entries to IPNS/{ipns_name}")
- except Exception as e:
- logger.warning(f"Failed to publish to {ipns_full_name}: {e}")
-
- def force_sync(self):
- """Force an immediate IPNS sync (blocking)."""
- self._sync_with_ipns()
-
- # ========== Stats ==========
-
- def get_stats(self) -> Dict:
- """Get cache statistics."""
- return {
- "cached_cids": self._redis.hlen(CACHE_KEY),
- "analysis_cids": self._redis.hlen(ANALYSIS_KEY),
- "plan_cids": self._redis.hlen(PLAN_KEY),
- "run_cids": self._redis.hlen(RUN_KEY),
- "ipns_enabled": self.ipns_enabled,
- "cluster_key": self.cluster_key[:16] + "..." if len(self.cluster_key) > 16 else self.cluster_key,
- }
-
-
-# Singleton instance
-_state_manager: Optional[HybridStateManager] = None
-
-
-def get_state_manager() -> HybridStateManager:
- """Get the singleton state manager instance."""
- global _state_manager
- if _state_manager is None:
- _state_manager = HybridStateManager()
- return _state_manager
-
-
-def reset_state_manager():
- """Reset the singleton (for testing)."""
- global _state_manager
- if _state_manager:
- _state_manager.stop_sync()
- _state_manager = None
diff --git a/legacy_tasks.py b/legacy_tasks.py
deleted file mode 100644
index 5827fdd..0000000
--- a/legacy_tasks.py
+++ /dev/null
@@ -1,1219 +0,0 @@
-"""
-Art DAG Celery Tasks
-
-Distributed rendering tasks for the Art DAG system.
-Supports both single-effect runs and multi-step DAG execution.
-"""
-
-import json
-import logging
-import os
-import subprocess
-import sys
-from datetime import datetime, timezone
-from pathlib import Path
-from typing import Dict, List, Optional
-
-from celery import Task
-from celery_app import app
-from cache_manager import file_hash
-
-# Import artdag components
-from artdag import DAG, Node, NodeType
-from artdag.engine import Engine
-from artdag.executor import register_executor, Executor, get_executor
-from artdag.nodes.effect import register_effect
-import artdag.nodes # Register all built-in executors (SOURCE, EFFECT, etc.)
-
-# Add effects to path (use env var in Docker, fallback to home dir locally)
-EFFECTS_PATH = Path(os.environ.get("EFFECTS_PATH", str(Path.home() / "artdag-effects")))
-ARTDAG_PATH = Path(os.environ.get("ARTDAG_PATH", str(Path.home() / "art" / "artdag")))
-
-logger = logging.getLogger(__name__)
-
-
-def get_effects_commit() -> str:
- """Get current git commit hash of effects repo."""
- try:
- result = subprocess.run(
- ["git", "rev-parse", "HEAD"],
- cwd=EFFECTS_PATH,
- capture_output=True,
- text=True
- )
- if result.returncode == 0:
- return result.stdout.strip()
- except Exception:
- pass
- return "unknown"
-
-
-def get_artdag_commit() -> str:
- """Get current git commit hash of artdag repo."""
- try:
- result = subprocess.run(
- ["git", "rev-parse", "HEAD"],
- cwd=ARTDAG_PATH,
- capture_output=True,
- text=True
- )
- if result.returncode == 0:
- return result.stdout.strip()
- except Exception:
- pass
- return "unknown"
-
-
-sys.path.insert(0, str(EFFECTS_PATH / "dog"))
-
-# Register the dog effect with the EFFECT executor
-# New format uses process() instead of effect_dog()
-from effect import process as dog_process
-
-@register_effect("dog")
-def _dog_effect(input_path: Path, output_path: Path, config: dict) -> Path:
- """Dog effect wrapper - registered for DAG EFFECT nodes."""
- # Wrap for new whole-video API
- return dog_process([input_path], output_path, config, None)
-
-
-# Cache directory (shared between server and worker)
-CACHE_DIR = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache")))
-
-
-# ============ Executors for Effects ============
-
-@register_executor("effect:dog")
-class DogExecutor(Executor):
- """Executor for the dog effect."""
-
- def execute(self, config: Dict, inputs: List[Path], output_path: Path) -> Path:
- from effect import process as dog_process
- if len(inputs) != 1:
- raise ValueError(f"Dog effect expects 1 input, got {len(inputs)}")
- return dog_process(inputs, output_path, config, None)
-
-
-@register_executor("effect:identity")
-class IdentityExecutor(Executor):
- """Executor for the identity effect (passthrough)."""
-
- def execute(self, config: Dict, inputs: List[Path], output_path: Path) -> Path:
- from artdag.nodes.effect import effect_identity
- if len(inputs) != 1:
- raise ValueError(f"Identity effect expects 1 input, got {len(inputs)}")
- return effect_identity(inputs[0], output_path, config)
-
-
-@register_executor(NodeType.SOURCE)
-class SourceExecutor(Executor):
- """Executor for SOURCE nodes - loads content from cache by hash."""
-
- def execute(self, config: Dict, inputs: List[Path], output_path: Path) -> Path:
- # Source nodes load from cache by cid
- cid = config.get("cid")
- if not cid:
- raise ValueError("SOURCE node requires cid in config")
-
- # Look up in cache
- from cache_manager import get_cache_manager
- cache_manager = get_cache_manager()
- source_path = cache_manager.get_by_cid(cid)
-
- if not source_path or not source_path.exists():
- # Not in cache - fetch from IPFS
- import logging
- logger = logging.getLogger(__name__)
- logger.info(f"SOURCE {cid[:16]}... not in cache, fetching from IPFS")
-
- import ipfs_client
- fetch_path = CACHE_DIR / "ipfs_fetch" / cid
- fetch_path.parent.mkdir(parents=True, exist_ok=True)
-
- if ipfs_client.get_file(cid, str(fetch_path)):
- logger.info(f"SOURCE {cid[:16]}... fetched from IPFS to {fetch_path}")
- source_path = fetch_path
- else:
- raise ValueError(f"Source content not in cache and IPFS fetch failed: {cid}")
-
- # For source nodes, we just return the path (no transformation)
- # The engine will use this as input to subsequent nodes
- return source_path
-
-
-class RenderTask(Task):
- """Base task with provenance tracking."""
-
- def on_success(self, retval, task_id, args, kwargs):
- """Record successful render."""
- print(f"Task {task_id} completed: {retval}")
-
- def on_failure(self, exc, task_id, args, kwargs, einfo):
- """Record failed render."""
- print(f"Task {task_id} failed: {exc}")
-
-
-@app.task(base=RenderTask, bind=True)
-def render_effect(self, input_hash: str, effect_name: str, output_name: str) -> dict:
- """
- Render an effect on an input asset.
-
- Args:
- input_hash: SHA3-256 hash of input asset
- effect_name: Name of effect (e.g., "dog", "identity")
- output_name: Name for output asset
-
- Returns:
- Provenance record with output hash
- """
- from cache_manager import get_cache_manager
-
- # Registry hashes (for effects/infra metadata only)
- REGISTRY = {
- "effect:dog": {
- "hash": "d048fe313433eb4e38f0e24194ffae91b896ca3e6eed3e50b2cc37b7be495555"
- },
- "effect:identity": {
- "hash": "640ea11ee881ebf4101af0a955439105ab11e763682b209e88ea08fc66e1cc03"
- },
- "infra:artdag": {
- "hash": "96a5972de216aee12ec794dcad5f9360da2e676171eabf24a46dfe1ee5fee4b0"
- },
- "infra:giles-hp": {
- "hash": "964bf6e69dc4e2493f42375013caffe26404ec3cf8eb5d9bc170cd42a361523b"
- }
- }
-
- # Input comes from cache by hash (supports both legacy and new cache locations)
- cache_manager = get_cache_manager()
- input_path = cache_manager.get_by_cid(input_hash)
- if not input_path or not input_path.exists():
- raise ValueError(f"Input not in cache: {input_hash}")
-
- output_dir = CACHE_DIR
-
- # Verify input
- actual_hash = file_hash(input_path)
- if actual_hash != input_hash:
- raise ValueError(f"Input hash mismatch: expected {input_hash}, got {actual_hash}")
-
- self.update_state(state='RENDERING', meta={'effect': effect_name, 'input': input_hash[:16]})
-
- # Load and apply effect
- if effect_name == "dog":
- from effect import effect_dog, DOG_HASH
- output_path = output_dir / f"{output_name}.mkv"
- result = effect_dog(input_path, output_path, {})
- expected_hash = DOG_HASH
- elif effect_name == "identity":
- from artdag.nodes.effect import effect_identity
- output_path = output_dir / f"{output_name}{input_path.suffix}"
- result = effect_identity(input_path, output_path, {})
- expected_hash = input_hash
- else:
- raise ValueError(f"Unknown effect: {effect_name}")
-
- # Verify output
- output_cid = file_hash(result)
- if output_cid != expected_hash:
- raise ValueError(f"Output hash mismatch: expected {expected_hash}, got {output_cid}")
-
- # Build effect info based on source
- if effect_name == "identity":
- # Identity is from artdag package on GitHub
- artdag_commit = get_artdag_commit()
- effect_info = {
- "name": f"effect:{effect_name}",
- "cid": REGISTRY[f"effect:{effect_name}"]["hash"],
- "repo": "github",
- "repo_commit": artdag_commit,
- "repo_url": f"https://github.com/gilesbradshaw/art-dag/blob/{artdag_commit}/artdag/nodes/effect.py"
- }
- else:
- # Other effects from rose-ash effects repo
- effects_commit = get_effects_commit()
- effect_info = {
- "name": f"effect:{effect_name}",
- "cid": REGISTRY[f"effect:{effect_name}"]["hash"],
- "repo": "rose-ash",
- "repo_commit": effects_commit,
- "repo_url": f"https://git.rose-ash.com/art-dag/effects/src/commit/{effects_commit}/{effect_name}"
- }
-
- # Build provenance
- provenance = {
- "task_id": self.request.id,
- "rendered_at": datetime.now(timezone.utc).isoformat(),
- "rendered_by": "@giles@artdag.rose-ash.com",
- "output": {
- "name": output_name,
- "cid": output_cid,
- },
- "inputs": [
- {"cid": input_hash}
- ],
- "effects": [effect_info],
- "infrastructure": {
- "software": {"name": "infra:artdag", "cid": REGISTRY["infra:artdag"]["hash"]},
- "hardware": {"name": "infra:giles-hp", "cid": REGISTRY["infra:giles-hp"]["hash"]}
- }
- }
-
- # Store provenance on IPFS
- import ipfs_client
- provenance_cid = ipfs_client.add_json(provenance)
- if provenance_cid:
- provenance["provenance_cid"] = provenance_cid
- logger.info(f"Stored provenance on IPFS: {provenance_cid}")
- else:
- logger.warning("Failed to store provenance on IPFS")
-
- return provenance
-
-
-@app.task(base=RenderTask, bind=True)
-def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
- """
- Execute a multi-step DAG.
-
- Args:
- dag_json: Serialized DAG as JSON string
- run_id: Optional run ID for tracking
-
- Returns:
- Execution result with output hash and node results
- """
- from cache_manager import get_cache_manager
-
- # Parse DAG
- try:
- dag = DAG.from_json(dag_json)
- except Exception as e:
- raise ValueError(f"Invalid DAG JSON: {e}")
-
- # Validate DAG
- errors = dag.validate()
- if errors:
- raise ValueError(f"Invalid DAG: {errors}")
-
- # Create engine with cache directory
- engine = Engine(CACHE_DIR / "nodes")
-
- # Set up progress callback
- def progress_callback(progress):
- self.update_state(
- state='EXECUTING',
- meta={
- 'node_id': progress.node_id,
- 'node_type': progress.node_type,
- 'status': progress.status,
- 'progress': progress.progress,
- 'message': progress.message,
- }
- )
- logger.info(f"DAG progress: {progress.node_id} - {progress.status} - {progress.message}")
-
- engine.set_progress_callback(progress_callback)
-
- # Execute DAG
- self.update_state(state='EXECUTING', meta={'status': 'starting', 'nodes': len(dag.nodes)})
- result = engine.execute(dag)
-
- if not result.success:
- raise RuntimeError(f"DAG execution failed: {result.error}")
-
- # Index all node outputs by cid and upload to IPFS
- cache_manager = get_cache_manager()
- output_cid = None
- node_hashes = {} # node_id -> cid mapping
- node_ipfs_cids = {} # node_id -> ipfs_cid mapping
-
- # Process all node results (intermediates + output)
- for node_id, node_path in result.node_results.items():
- if node_path and Path(node_path).exists():
- node = dag.nodes.get(node_id)
- # Skip SOURCE nodes - they're already in cache
- if node and (node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE"):
- cid = node.config.get("cid")
- if cid:
- node_hashes[node_id] = cid
- continue
-
- # Determine node type for cache metadata
- node_type_str = str(node.node_type) if node else "intermediate"
- if "effect" in node_type_str.lower():
- cache_node_type = "effect_output"
- else:
- cache_node_type = "dag_intermediate"
-
- # Store in cache_manager (stored by IPFS CID, indexed by node_id)
- cached, content_cid = cache_manager.put(
- Path(node_path),
- node_type=cache_node_type,
- cache_id=node_id,
- )
- # content_cid is always IPFS CID now (IPFS failures are fatal)
- node_hashes[node_id] = content_cid
- node_ipfs_cids[node_id] = content_cid
- logger.info(f"Cached node {node_id}: IPFS CID {content_cid}")
-
- # Get output hash from the output node
- # Use the same identifier that's in the cache index (IPFS CID if available)
- if result.output_path and result.output_path.exists():
- local_hash = file_hash(result.output_path)
- output_ipfs_cid = node_ipfs_cids.get(dag.output_id)
- # Use IPFS CID as primary identifier if available, otherwise local hash
- # This must match what's in the content_index from cache_manager.put()
- output_cid = node_hashes.get(dag.output_id, local_hash)
-
- # Store output in database (for L2 to query IPFS CID)
- import asyncio
- import database
-
- # Store plan (DAG) to IPFS and local cache
- plan_cid = None
- try:
- import ipfs_client
- dag_dict = json.loads(dag_json)
- plan_cid = ipfs_client.add_json(dag_dict)
- if plan_cid:
- logger.info(f"Stored plan to IPFS: {plan_cid}")
- # Also store locally so it can be retrieved without IPFS
- # Store directly in cache_dir (get_by_cid checks cache_dir/cid)
- plan_path = CACHE_DIR / plan_cid
- CACHE_DIR.mkdir(parents=True, exist_ok=True)
- with open(plan_path, "w") as f:
- json.dump(dag_dict, f, indent=2)
- except Exception as e:
- logger.warning(f"Failed to store plan to IPFS: {e}")
-
- async def save_to_db():
- if database.pool is None:
- await database.init_db()
- await database.create_cache_item(output_cid, output_ipfs_cid)
- # Also save the run result
- if run_id:
- input_hashes_for_db = [
- node.config.get("cid")
- for node in dag.nodes.values()
- if (node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE")
- and node.config.get("cid")
- ]
- # Get actor_id and recipe from pending_runs (saved when run started)
- actor_id = None
- recipe_name = "dag"
- pending = await database.get_pending_run(run_id)
- if pending:
- actor_id = pending.get("actor_id")
- recipe_name = pending.get("recipe") or "dag"
-
- await database.save_run_cache(
- run_id=run_id,
- output_cid=output_cid,
- recipe=recipe_name,
- inputs=input_hashes_for_db,
- ipfs_cid=output_ipfs_cid,
- actor_id=actor_id,
- plan_cid=plan_cid,
- )
-
- # Save output as media for the user
- if actor_id:
- await database.save_item_metadata(
- cid=output_cid,
- actor_id=actor_id,
- item_type="media",
- description=f"Output from recipe: {recipe_name}",
- source_type="recipe",
- source_note=f"run_id: {run_id}",
- )
-
- # Clean up pending run
- if pending:
- await database.complete_pending_run(run_id)
-
- try:
- loop = asyncio.get_event_loop()
- if loop.is_running():
- asyncio.ensure_future(save_to_db())
- else:
- loop.run_until_complete(save_to_db())
- except RuntimeError:
- asyncio.run(save_to_db())
-
- # Record activity for deletion tracking
- input_hashes = []
- intermediate_hashes = []
- for node_id, node in dag.nodes.items():
- if node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE":
- cid = node.config.get("cid")
- if cid:
- input_hashes.append(cid)
- elif node_id != dag.output_id and node_id in node_hashes:
- intermediate_hashes.append(node_hashes[node_id])
-
- if input_hashes:
- from artdag.activities import Activity
- from datetime import datetime, timezone
- activity = Activity(
- activity_id=run_id or f"dag-{output_cid[:16]}",
- input_ids=sorted(input_hashes),
- output_id=output_cid,
- intermediate_ids=intermediate_hashes,
- created_at=datetime.now(timezone.utc).timestamp(),
- status="completed",
- )
- cache_manager.activity_store.add(activity)
-
- # Build provenance
- input_hashes_for_provenance = []
- for node_id, node in dag.nodes.items():
- if node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE":
- cid = node.config.get("cid")
- if cid:
- input_hashes_for_provenance.append({"cid": cid})
-
- provenance = {
- "task_id": self.request.id,
- "run_id": run_id,
- "rendered_at": datetime.now(timezone.utc).isoformat(),
- "output": {
- "cid": output_cid,
- "ipfs_cid": node_ipfs_cids.get(dag.output_id) if dag.output_id else None,
- },
- "inputs": input_hashes_for_provenance,
- "dag": dag_json, # Full DAG definition
- "nodes": {
- node_id: {
- "cid": node_hashes.get(node_id),
- "ipfs_cid": node_ipfs_cids.get(node_id),
- }
- for node_id in dag.nodes.keys()
- if node_id in node_hashes
- },
- "execution": {
- "execution_time": result.execution_time,
- "nodes_executed": result.nodes_executed,
- "nodes_cached": result.nodes_cached,
- }
- }
-
- # Store provenance on IPFS
- import ipfs_client
- provenance_cid = ipfs_client.add_json(provenance)
- if provenance_cid:
- provenance["provenance_cid"] = provenance_cid
- logger.info(f"Stored DAG provenance on IPFS: {provenance_cid}")
- else:
- logger.warning("Failed to store DAG provenance on IPFS")
-
- # Build result
- return {
- "success": True,
- "run_id": run_id,
- "output_cid": output_cid,
- "output_ipfs_cid": node_ipfs_cids.get(dag.output_id) if dag.output_id else None,
- "output_path": str(result.output_path) if result.output_path else None,
- "execution_time": result.execution_time,
- "nodes_executed": result.nodes_executed,
- "nodes_cached": result.nodes_cached,
- "node_results": {
- node_id: str(path) for node_id, path in result.node_results.items()
- },
- "node_hashes": node_hashes, # node_id -> cid
- "node_ipfs_cids": node_ipfs_cids, # node_id -> ipfs_cid
- "provenance_cid": provenance_cid,
- }
-
-
-@app.task(base=RenderTask, bind=True)
-def execute_recipe(self, recipe_sexp: str, input_hashes: Dict[str, str], run_id: str = None) -> dict:
- """
- Execute an S-expression recipe.
-
- The recipe S-expression unfolds into a plan S-expression with code-addressed
- cache IDs computed before execution. Each plan node gets a deterministic hash
- "bucket" based on the computation definition (Merkle tree), not the results.
-
- Phases:
- 1. Parse: compile_string(recipe_sexp) -> CompiledRecipe
- 2. Analyze: Extract and run analysis nodes from recipe
- 3. Plan: create_plan(compiled, inputs) -> ExecutionPlanSexp with cache IDs
- 4. Store: plan.to_string() -> store as S-expression
- 5. Execute: Run steps level-by-level, checking cache by cache_id
- 6. Return: Include plan_sexp in result
-
- Args:
- recipe_sexp: Recipe as S-expression string
- input_hashes: Mapping from input name to content hash (CID)
- run_id: Optional run ID for tracking
-
- Returns:
- Execution result with output CID, plan S-expression, and node results
- """
- from cache_manager import get_cache_manager
- import ipfs_client
-
- # Try to import S-expression modules
- try:
- from artdag.sexp import compile_string, CompileError, ParseError
- from artdag.sexp.planner import create_plan, ExecutionPlanSexp, PlanStep
- except ImportError as e:
- raise ImportError(f"S-expression modules not available: {e}")
-
- cache_manager = get_cache_manager()
-
- logger.info(f"Executing recipe with {len(input_hashes)} inputs, run_id={run_id}")
-
- # ============ Phase 1: Parse ============
- self.update_state(state='PARSING', meta={'status': 'parsing recipe'})
- logger.info("Phase 1: Parsing recipe S-expression...")
-
- try:
- compiled = compile_string(recipe_sexp)
- except (ParseError, CompileError) as e:
- raise ValueError(f"Recipe parse error: {e}")
-
- recipe_name = compiled.name or "unnamed"
- logger.info(f"Parsed recipe: {recipe_name}")
-
- # ============ Phase 2: Analysis ============
- self.update_state(state='ANALYZING', meta={'status': 'running analysis'})
- logger.info("Phase 2: Running analysis nodes...")
-
- analysis_results = {}
- # Extract analysis nodes from compiled recipe
- for node in compiled.nodes:
- node_type = node.get("type", "").upper()
- config = node.get("config", {})
-
- if node_type == "ANALYZE" or config.get("analyze"):
- node_id = node.get("id")
- input_ref = config.get("input") or config.get("source")
- feature = config.get("feature") or config.get("analyze")
-
- # Resolve input reference to CID
- cid = input_hashes.get(input_ref)
- if not cid:
- logger.warning(f"Analysis node {node_id}: input '{input_ref}' not in input_hashes")
- continue
-
- # Get input file path
- input_path = cache_manager.get_by_cid(cid)
- if not input_path:
- logger.warning(f"Analysis node {node_id}: content {cid[:16]}... not in cache")
- continue
-
- # Run analysis
- try:
- from artdag.analysis import Analyzer
- analysis_dir = CACHE_DIR / "analysis"
- analysis_dir.mkdir(parents=True, exist_ok=True)
- analyzer = Analyzer(cache_dir=analysis_dir)
-
- features = [feature] if feature else ["beats", "energy"]
- result = analyzer.analyze(
- input_hash=cid,
- features=features,
- input_path=Path(input_path),
- )
- analysis_results[node_id] = result
- analysis_results[cid] = result
- logger.info(f"Analysis {node_id}: feature={feature}")
- except Exception as e:
- logger.warning(f"Analysis failed for {node_id}: {e}")
-
- logger.info(f"Completed {len(analysis_results)} analysis results")
-
- # ============ Phase 3: Generate Plan ============
- self.update_state(state='PLANNING', meta={'status': 'generating plan'})
- logger.info("Phase 3: Generating execution plan with code-addressed cache IDs...")
-
- plan = create_plan(compiled, inputs=input_hashes)
- logger.info(f"Generated plan with {len(plan.steps)} steps, plan_id={plan.plan_id[:16]}...")
-
- # ============ Phase 4: Store Plan as S-expression ============
- plan_sexp = plan.to_string(pretty=True)
- plan_cid = None
-
- try:
- plan_cid = ipfs_client.add_string(plan_sexp)
- if plan_cid:
- logger.info(f"Stored plan to IPFS: {plan_cid}")
- # Also store locally for fast retrieval
- plan_path = CACHE_DIR / plan_cid
- CACHE_DIR.mkdir(parents=True, exist_ok=True)
- plan_path.write_text(plan_sexp)
-
- # Save plan_cid to database immediately so it's available even if run fails
- if run_id:
- import asyncio
- import database
- async def save_plan_cid():
- if database.pool is None:
- await database.init_db()
- await database.update_pending_run_plan(run_id, plan_cid)
- try:
- loop = asyncio.get_event_loop()
- if loop.is_running():
- asyncio.ensure_future(save_plan_cid())
- else:
- loop.run_until_complete(save_plan_cid())
- except RuntimeError:
- asyncio.run(save_plan_cid())
- logger.info(f"Saved plan_cid to pending run: {run_id}")
- except Exception as e:
- logger.warning(f"Failed to store plan to IPFS: {e}")
-
- # ============ Phase 5: Execute Steps Level-by-Level ============
- self.update_state(state='EXECUTING', meta={'status': 'executing steps', 'total_steps': len(plan.steps)})
- logger.info("Phase 4: Executing plan steps...")
-
- # Group steps by level
- steps_by_level: Dict[int, List[PlanStep]] = {}
- for step in plan.steps:
- level = step.level
- steps_by_level.setdefault(level, []).append(step)
-
- max_level = max(steps_by_level.keys()) if steps_by_level else 0
-
- step_results = {} # step_id -> {"status", "path", "cid", "ipfs_cid"}
- cache_id_to_path = {} # cache_id -> output path (for resolving inputs)
- total_cached = 0
- total_executed = 0
-
- # Map input names to their cache_ids (inputs are their own cache_ids)
- for name, cid in input_hashes.items():
- cache_id_to_path[cid] = cache_manager.get_by_cid(cid)
-
- for level in range(max_level + 1):
- level_steps = steps_by_level.get(level, [])
- if not level_steps:
- continue
-
- logger.info(f"Executing level {level}: {len(level_steps)} steps")
-
- for step in level_steps:
- self.update_state(
- state='EXECUTING',
- meta={
- 'step_id': step.step_id,
- 'step_type': step.node_type,
- 'level': level,
- 'cache_id': step.cache_id[:16],
- }
- )
-
- # Check if cached using code-addressed cache_id
- cached_path = cache_manager.get_by_cid(step.cache_id)
- if cached_path and cached_path.exists():
- logger.info(f"Step {step.step_id}: cached at {step.cache_id[:16]}...")
- step_results[step.step_id] = {
- "status": "cached",
- "path": str(cached_path),
- "cache_id": step.cache_id,
- }
- cache_id_to_path[step.cache_id] = cached_path
- total_cached += 1
- continue
-
- # Execute the step
- try:
- # Resolve input paths from previous step cache_ids
- input_paths = []
- for input_ref in step.inputs:
- # input_ref is a step_id - find its cache_id and path
- input_step = next((s for s in plan.steps if s.step_id == input_ref), None)
- if input_step:
- input_cache_id = input_step.cache_id
- input_path = cache_id_to_path.get(input_cache_id)
- if input_path:
- input_paths.append(Path(input_path))
- else:
- # Check if it's a source input
- source_cid = step.config.get("cid")
- if source_cid:
- input_path = cache_manager.get_by_cid(source_cid)
- if input_path:
- input_paths.append(Path(input_path))
- else:
- # Direct CID reference (source node)
- source_cid = input_hashes.get(input_ref) or step.config.get("cid")
- if source_cid:
- input_path = cache_manager.get_by_cid(source_cid)
- if input_path:
- input_paths.append(Path(input_path))
-
- # Handle SOURCE nodes
- if step.node_type.upper() == "SOURCE":
- source_cid = step.config.get("cid")
-
- # If source has :input true, resolve CID from input_hashes
- if not source_cid and step.config.get("input"):
- source_name = step.config.get("name", "")
- # Try various key formats for lookup
- name_variants = [
- source_name,
- source_name.lower().replace(" ", "-"),
- source_name.lower().replace(" ", "_"),
- source_name.lower(),
- ]
- for variant in name_variants:
- if variant in input_hashes:
- source_cid = input_hashes[variant]
- logger.info(f"Resolved SOURCE '{source_name}' -> {source_cid[:16]}... via '{variant}'")
- break
-
- if not source_cid:
- raise ValueError(f"SOURCE '{source_name}' not found in input_hashes. Available: {list(input_hashes.keys())}")
-
- if source_cid:
- source_path = cache_manager.get_by_cid(source_cid)
- if source_path:
- step_results[step.step_id] = {
- "status": "source",
- "path": str(source_path),
- "cache_id": step.cache_id,
- "cid": source_cid,
- }
- cache_id_to_path[step.cache_id] = source_path
- total_cached += 1
- continue
- else:
- raise ValueError(f"Source content not found in cache: {source_cid[:16]}...")
- else:
- raise ValueError(f"SOURCE step has no cid and no :input flag: {step.config}")
-
- # Handle COMPOUND nodes (collapsed effect chains)
- if step.node_type.upper() == "COMPOUND":
- import subprocess
- import tempfile
-
- filter_chain = step.config.get("filter_chain", [])
- if not filter_chain:
- raise ValueError("COMPOUND step has empty filter_chain")
-
- # Get input path
- if not input_paths:
- raise ValueError("COMPOUND step has no inputs")
-
- # For COMPOUND with EFFECT filters, run effects sequentially
- current_input = input_paths[0]
- temp_files = []
-
- for i, filter_item in enumerate(filter_chain):
- filter_type = filter_item.get("type", "")
- filter_config = filter_item.get("config", {})
-
- if filter_type == "EFFECT":
- effect_name = filter_config.get("effect")
- effect_cid = filter_config.get("cid")
-
- if effect_name:
- # Try specific executor first, fall back to generic EFFECT executor
- effect_executor = get_executor(f"effect:{effect_name}")
- if not effect_executor:
- effect_executor = get_executor("EFFECT")
-
- if effect_executor:
- temp_dir = Path(tempfile.mkdtemp())
- temp_output = temp_dir / f"compound_{i}_{effect_name}.mkv"
-
- logger.info(f"COMPOUND: Running effect {effect_name} (cid={effect_cid[:16] if effect_cid else 'built-in'}...) step {i+1}/{len(filter_chain)}")
- result_path = effect_executor.execute(filter_config, [current_input], temp_output)
-
- current_input = result_path
- temp_files.append(temp_dir)
- else:
- raise ValueError(f"COMPOUND: No executor for effect {effect_name}")
-
- # Store final result
- output_dir = CACHE_DIR / "nodes" / step.cache_id
- output_dir.mkdir(parents=True, exist_ok=True)
- final_output = output_dir / "output.mkv"
-
- import shutil
- shutil.copy2(current_input, final_output)
-
- # Upload to IPFS (stored by IPFS CID, indexed by cache_id)
- cached, content_cid = cache_manager.put(
- final_output,
- node_type="COMPOUND",
- cache_id=step.cache_id,
- )
-
- # Cleanup temp files
- for temp_dir in temp_files:
- if temp_dir.exists():
- shutil.rmtree(temp_dir, ignore_errors=True)
-
- step_results[step.step_id] = {
- "status": "executed",
- "path": str(final_output),
- "cache_id": step.cache_id,
- "cid": content_cid,
- "filter_count": len(filter_chain),
- }
- cache_id_to_path[step.cache_id] = final_output
- total_executed += 1
- logger.info(f"COMPOUND step {step.step_id}: {len(filter_chain)} effects -> {content_cid[:16]}...")
- continue
-
- # Handle SEQUENCE nodes (concatenate clips)
- if step.node_type.upper() == "SEQUENCE":
- import subprocess
- import tempfile
-
- if len(input_paths) < 2:
- raise ValueError(f"SEQUENCE requires at least 2 inputs, got {len(input_paths)}")
-
- # Create concat list file for FFmpeg
- temp_dir = Path(tempfile.mkdtemp())
- concat_list = temp_dir / "concat.txt"
- with open(concat_list, "w") as f:
- for inp in input_paths:
- f.write(f"file '{inp}'\n")
-
- output_dir = CACHE_DIR / "nodes" / step.cache_id
- output_dir.mkdir(parents=True, exist_ok=True)
- final_output = output_dir / "output.mkv"
-
- # FFmpeg concat demuxer
- cmd = [
- "ffmpeg", "-y",
- "-f", "concat",
- "-safe", "0",
- "-i", str(concat_list),
- "-c", "copy",
- str(final_output)
- ]
-
- logger.info(f"SEQUENCE: Concatenating {len(input_paths)} clips")
- result = subprocess.run(cmd, capture_output=True, text=True)
-
- if result.returncode != 0:
- # Try with re-encoding if copy fails
- cmd = [
- "ffmpeg", "-y",
- "-f", "concat",
- "-safe", "0",
- "-i", str(concat_list),
- "-c:v", "libx264", "-c:a", "aac",
- str(final_output)
- ]
- result = subprocess.run(cmd, capture_output=True, text=True)
- if result.returncode != 0:
- raise RuntimeError(f"FFmpeg concat failed: {result.stderr}")
-
- # Upload to IPFS (stored by IPFS CID, indexed by cache_id)
- cached, content_cid = cache_manager.put(
- final_output,
- node_type="SEQUENCE",
- cache_id=step.cache_id,
- )
-
- # Cleanup
- import shutil
- shutil.rmtree(temp_dir, ignore_errors=True)
-
- step_results[step.step_id] = {
- "status": "executed",
- "path": str(final_output),
- "cache_id": step.cache_id,
- "cid": content_cid,
- "input_count": len(input_paths),
- }
- cache_id_to_path[step.cache_id] = final_output
- total_executed += 1
- logger.info(f"SEQUENCE step {step.step_id}: {len(input_paths)} clips -> {content_cid[:16]}...")
- continue
-
- # Handle EFFECT nodes
- if step.node_type.upper() == "EFFECT":
- effect_name = step.config.get("effect")
- if not effect_name:
- raise ValueError(f"EFFECT node missing 'effect' in config: {step.config}")
-
- # Try specific executor first (e.g., effect:dog)
- executor = get_executor(f"effect:{effect_name}")
- logger.info(f"EFFECT: get_executor('effect:{effect_name}') = {executor}")
- if not executor:
- # Fall back to generic EFFECT executor (handles IPFS effects)
- executor = get_executor("EFFECT")
- logger.info(f"EFFECT: Fallback get_executor('EFFECT') = {executor}")
- if not executor:
- raise ValueError(f"No executor for effect: {effect_name}")
-
- if len(input_paths) != 1:
- raise ValueError(f"EFFECT expects 1 input, got {len(input_paths)}")
-
- output_dir = CACHE_DIR / "nodes" / step.cache_id
- output_dir.mkdir(parents=True, exist_ok=True)
- output_path = output_dir / "output.mkv"
-
- effect_cid = step.config.get("cid")
- logger.info(f"EFFECT: Running {effect_name} (cid={effect_cid[:16] if effect_cid else 'built-in'}...)")
- result_path = executor.execute(step.config, input_paths, output_path)
-
- cached, content_cid = cache_manager.put(
- result_path,
- node_type="EFFECT",
- cache_id=step.cache_id,
- )
-
- step_results[step.step_id] = {
- "status": "executed",
- "path": str(result_path),
- "cache_id": step.cache_id,
- "cid": content_cid,
- "effect": effect_name,
- }
- cache_id_to_path[step.cache_id] = result_path
- total_executed += 1
- logger.info(f"EFFECT step {step.step_id}: {effect_name} -> {content_cid[:16]}...")
- continue
-
- # Fallback: try to get executor for unknown node types
- executor = get_executor(step.node_type.upper())
- if not executor:
- executor = get_executor(step.node_type)
-
- if not executor:
- raise ValueError(f"No executor for node type: {step.node_type}")
-
- # Determine output path
- output_dir = CACHE_DIR / "nodes" / step.cache_id
- output_dir.mkdir(parents=True, exist_ok=True)
- output_path = output_dir / "output.mkv"
-
- # Execute
- logger.info(f"Executing step {step.step_id} ({step.node_type}) with {len(input_paths)} inputs")
- result_path = executor.execute(step.config, input_paths, output_path)
-
- # Store result in cache (by IPFS CID, indexed by cache_id)
- cached, content_cid = cache_manager.put(
- result_path,
- node_type=step.node_type,
- cache_id=step.cache_id,
- )
-
- step_results[step.step_id] = {
- "status": "executed",
- "path": str(result_path),
- "cache_id": step.cache_id,
- "cid": content_cid,
- "ipfs_cid": content_cid if content_cid.startswith("Qm") or content_cid.startswith("bafy") else None,
- }
- cache_id_to_path[step.cache_id] = result_path
- total_executed += 1
-
- logger.info(f"Step {step.step_id}: executed -> {content_cid[:16]}...")
-
- except Exception as e:
- logger.error(f"Step {step.step_id} failed: {e}")
- return {
- "success": False,
- "run_id": run_id,
- "error": f"Step {step.step_id} failed: {e}",
- "step_results": step_results,
- "plan_cid": plan_cid,
- "plan_sexp": plan_sexp,
- }
-
- # Get output from final step
- output_step = next((s for s in plan.steps if s.step_id == plan.output_step_id), None)
- output_cid = None
- output_ipfs_cid = None
- output_path = None
- output_cache_id = None # Keep track of cache_id separately
-
- if output_step:
- output_result = step_results.get(output_step.step_id, {})
- output_cache_id = output_result.get("cache_id")
- output_ipfs_cid = output_result.get("cid") # cid should be IPFS CID now
- output_path = output_result.get("path")
-
- # Upload final output to IPFS if not already there
- if output_path and not output_ipfs_cid:
- output_path_obj = Path(output_path) if isinstance(output_path, str) else output_path
- if output_path_obj.exists():
- logger.info(f"Uploading final output to IPFS: {output_path}")
- output_ipfs_cid = ipfs_client.add_file(str(output_path_obj))
- if output_ipfs_cid:
- logger.info(f"Uploaded output to IPFS: {output_ipfs_cid}")
- else:
- logger.error(f"Failed to upload output to IPFS: {output_path}")
-
- # Use IPFS CID as the primary output identifier
- output_cid = output_ipfs_cid
-
- # Fail if output couldn't be uploaded to IPFS
- if not output_ipfs_cid:
- logger.error(f"Recipe failed: Could not upload output to IPFS! output_cid={output_cid}, output_path={output_path}")
- return {
- "success": False,
- "run_id": run_id,
- "error": "Failed to upload output to IPFS",
- "plan_cid": plan_cid,
- "plan_sexp": plan_sexp,
- "output_cid": output_cid,
- "output_path": output_path,
- "step_results": step_results,
- "total_steps": len(plan.steps),
- "cached": total_cached,
- "executed": total_executed,
- }
-
- # Fail if no output was produced
- if not output_cid:
- logger.error(f"Recipe produced no output! output_step={plan.output_step_id}, result={output_result if output_step else 'no output step'}")
- return {
- "success": False,
- "run_id": run_id,
- "error": "Recipe produced no output",
- "plan_cid": plan_cid,
- "plan_sexp": plan_sexp,
- "step_results": step_results,
- "total_steps": len(plan.steps),
- "cached": total_cached,
- "executed": total_executed,
- }
-
- # ============ Phase 6: Store Results ============
- logger.info("Phase 5: Storing results...")
-
- # Store in database
- import asyncio
- import database
-
- async def save_to_db():
- if database.pool is None:
- await database.init_db()
-
- # Get actor_id from pending run
- actor_id = None
- pending = await database.get_pending_run(run_id) if run_id else None
- if pending:
- actor_id = pending.get("actor_id")
-
- await database.save_run_cache(
- run_id=run_id,
- output_cid=output_cid,
- recipe=recipe_name,
- inputs=list(input_hashes.values()),
- ipfs_cid=output_ipfs_cid,
- actor_id=actor_id,
- plan_cid=plan_cid,
- )
-
- # Save output as media for user
- if actor_id and output_cid:
- await database.save_item_metadata(
- cid=output_cid,
- actor_id=actor_id,
- item_type="media",
- description=f"Output from recipe: {recipe_name}",
- source_type="recipe",
- source_note=f"run_id: {run_id}",
- )
-
- # Complete pending run
- if pending and run_id:
- await database.complete_pending_run(run_id)
-
- try:
- loop = asyncio.get_event_loop()
- if loop.is_running():
- asyncio.ensure_future(save_to_db())
- else:
- loop.run_until_complete(save_to_db())
- except RuntimeError:
- asyncio.run(save_to_db())
-
- # Build and store provenance
- provenance = {
- "task_id": self.request.id,
- "run_id": run_id,
- "rendered_at": datetime.now(timezone.utc).isoformat(),
- "recipe": recipe_name,
- "recipe_sexp": recipe_sexp,
- "plan_sexp": plan_sexp,
- "plan_cid": plan_cid,
- "output": {
- "cid": output_cid,
- "ipfs_cid": output_ipfs_cid,
- },
- "inputs": input_hashes,
- "steps": {
- step_id: {
- "cache_id": result.get("cache_id"),
- "cid": result.get("cid"),
- "status": result.get("status"),
- }
- for step_id, result in step_results.items()
- },
- "execution": {
- "total_steps": len(plan.steps),
- "cached": total_cached,
- "executed": total_executed,
- }
- }
-
- provenance_cid = ipfs_client.add_json(provenance)
- if provenance_cid:
- logger.info(f"Stored provenance on IPFS: {provenance_cid}")
-
- logger.info(f"Recipe execution complete: output={output_cid[:16] if output_cid else 'none'}...")
-
- return {
- "success": True,
- "run_id": run_id,
- "recipe": recipe_name,
- "plan_cid": plan_cid,
- "plan_sexp": plan_sexp,
- "output_cid": output_cid,
- "output_ipfs_cid": output_ipfs_cid,
- "output_path": output_path,
- "total_steps": len(plan.steps),
- "cached": total_cached,
- "executed": total_executed,
- "step_results": step_results,
- "provenance_cid": provenance_cid,
- }
-
-
-def build_effect_dag(input_hashes: List[str], effect_name: str) -> DAG:
- """
- Build a simple DAG for applying an effect to inputs.
-
- Args:
- input_hashes: List of input content hashes
- effect_name: Name of effect to apply (e.g., "dog", "identity")
-
- Returns:
- DAG ready for execution
- """
- dag = DAG()
-
- # Add source nodes for each input
- source_ids = []
- for i, cid in enumerate(input_hashes):
- source_node = Node(
- node_type=NodeType.SOURCE,
- config={"cid": cid},
- name=f"source_{i}",
- )
- dag.add_node(source_node)
- source_ids.append(source_node.node_id)
-
- # Add effect node
- effect_node = Node(
- node_type=f"effect:{effect_name}",
- config={},
- inputs=source_ids,
- name=f"effect_{effect_name}",
- )
- dag.add_node(effect_node)
- dag.set_output(effect_node.node_id)
-
- return dag
diff --git a/render.py b/render.py
deleted file mode 100755
index 5f7ccd6..0000000
--- a/render.py
+++ /dev/null
@@ -1,65 +0,0 @@
-#!/usr/bin/env python3
-"""
-CLI to submit render tasks to Art DAG Celery.
-
-Usage:
- python render.py dog cat # Render cat through dog effect
- python render.py identity cat # Render cat through identity effect
- python render.py