diff --git a/app/routers/cache.py b/app/routers/cache.py
index 1a491d7..dc03d44 100644
--- a/app/routers/cache.py
+++ b/app/routers/cache.py
@@ -8,7 +8,7 @@ import logging
from pathlib import Path
from typing import Optional, Dict, Any
-from fastapi import APIRouter, Request, Depends, HTTPException, UploadFile, File
+from fastapi import APIRouter, Request, Depends, HTTPException, UploadFile, File, Form
from fastapi.responses import HTMLResponse, FileResponse
from pydantic import BaseModel
@@ -208,13 +208,95 @@ async def import_from_ipfs(
return {"cid": cid, "imported": True}
-@router.post("/upload")
-async def upload_content(
- file: UploadFile = File(...),
+@router.post("/upload/chunk")
+async def upload_chunk(
+ request: Request,
+ chunk: UploadFile = File(...),
+ upload_id: str = Form(...),
+ chunk_index: int = Form(...),
+ total_chunks: int = Form(...),
+ filename: str = Form(...),
+ display_name: Optional[str] = Form(None),
ctx: UserContext = Depends(require_auth),
cache_service: CacheService = Depends(get_cache_service),
):
- """Upload content to cache and IPFS."""
+ """Upload a file chunk. Assembles file when all chunks received."""
+ import tempfile
+ import os
+
+ # Create temp dir for this upload
+    chunk_dir = Path(tempfile.gettempdir()) / "uploads" / Path(upload_id).name  # .name strips path components (prevents traversal)
+ chunk_dir.mkdir(parents=True, exist_ok=True)
+
+ # Save this chunk
+ chunk_path = chunk_dir / f"chunk_{chunk_index:05d}"
+ chunk_data = await chunk.read()
+ chunk_path.write_bytes(chunk_data)
+
+ # Check if all chunks received
+ received = len(list(chunk_dir.glob("chunk_*")))
+
+ if received < total_chunks:
+ return {"status": "partial", "received": received, "total": total_chunks}
+
+ # All chunks received - assemble file
+    final_path = chunk_dir / Path(filename).name  # .name strips path components (prevents traversal)
+ with open(final_path, 'wb') as f:
+ for i in range(total_chunks):
+ cp = chunk_dir / f"chunk_{i:05d}"
+ f.write(cp.read_bytes())
+ cp.unlink() # Clean up chunk
+
+ # Read assembled file
+ content = final_path.read_bytes()
+ final_path.unlink()
+ chunk_dir.rmdir()
+
+ # Now do the normal upload flow
+ cid, ipfs_cid, error = await cache_service.upload_content(
+ content=content,
+ filename=filename,
+ actor_id=ctx.actor_id,
+ )
+
+ if error:
+ raise HTTPException(400, error)
+
+ # Assign friendly name
+ final_cid = ipfs_cid or cid
+ from ..services.naming_service import get_naming_service
+ naming = get_naming_service()
+ friendly_entry = await naming.assign_name(
+ cid=final_cid,
+ actor_id=ctx.actor_id,
+ item_type="media",
+ display_name=display_name,
+ filename=filename,
+ )
+
+ return {
+ "status": "complete",
+ "cid": final_cid,
+ "friendly_name": friendly_entry["friendly_name"],
+ "filename": filename,
+ "size": len(content),
+ "uploaded": True,
+ }
+
+
+@router.post("/upload")
+async def upload_content(
+ file: UploadFile = File(...),
+ display_name: Optional[str] = Form(None),
+ ctx: UserContext = Depends(require_auth),
+ cache_service: CacheService = Depends(get_cache_service),
+):
+ """Upload content to cache and IPFS.
+
+ Args:
+ file: The file to upload
+ display_name: Optional custom name for the media (used as friendly name)
+ """
content = await file.read()
cid, ipfs_cid, error = await cache_service.upload_content(
content=content,
@@ -233,6 +315,7 @@ async def upload_content(
cid=final_cid,
actor_id=ctx.actor_id,
item_type="media",
+ display_name=display_name, # Use custom name if provided
filename=file.filename,
)
@@ -350,3 +433,83 @@ async def update_metadata_htmx(
Steps
- {{ run.executed or 0 }} / {{ run.total_steps or (plan.steps|length if plan and plan.steps else '?') }}
+ {% if run.recipe == 'streaming' %}
+ {% if run.status == 'completed' %}1 / 1{% else %}0 / 1{% endif %}
+ {% else %}
+ {{ run.executed or 0 }} / {{ run.total_steps or (plan.steps|length if plan and plan.steps else '?') }}
+ {% endif %}
{% if run.cached_steps %}
({{ run.cached_steps }} cached)
{% endif %}
diff --git a/cache_manager.py b/cache_manager.py
index 99b5acc..9474ca2 100644
--- a/cache_manager.py
+++ b/cache_manager.py
@@ -175,15 +175,17 @@ class L1CacheManager:
# No fallbacks - failures raise exceptions.
def _run_async(self, coro):
- """Run async coroutine from sync context."""
+ """Run async coroutine from sync context.
+
+ Always creates a fresh event loop to avoid issues with Celery's
+ prefork workers where loops may be closed by previous tasks.
+ """
import asyncio
+ # Check if we're already in an async context
try:
- loop = asyncio.get_running_loop()
- # Already in async context - schedule on the running loop
- future = asyncio.ensure_future(coro, loop=loop)
- # Can't block here, so we need a different approach
- # Use a new thread with its own loop
+ asyncio.get_running_loop()
+ # We're in an async context - use a thread with its own loop
import threading
result = [None]
error = [None]
@@ -206,13 +208,13 @@ class L1CacheManager:
raise error[0]
return result[0]
except RuntimeError:
- # No running loop - safe to use run_until_complete
+ # No running loop - create a fresh one (don't reuse potentially closed loops)
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
try:
- loop = asyncio.get_event_loop()
- except RuntimeError:
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
- return loop.run_until_complete(coro)
+ return loop.run_until_complete(coro)
+ finally:
+ loop.close()
def _set_content_index(self, cache_id: str, ipfs_cid: str):
"""Set content index entry in database (cache_id -> ipfs_cid)."""
@@ -341,28 +343,38 @@ class L1CacheManager:
cache_id: str = None,
execution_time: float = 0.0,
move: bool = False,
+ skip_ipfs: bool = False,
) -> tuple[CachedFile, Optional[str]]:
"""
- Store a file in the cache and upload to IPFS.
+ Store a file in the cache and optionally upload to IPFS.
- Files are ALWAYS stored by IPFS CID. The cache_id parameter creates
- an index from cache_id -> IPFS CID for code-addressed lookups.
+ Files are stored by IPFS CID when skip_ipfs=False (default), or by
+ local content hash when skip_ipfs=True. The cache_id parameter creates
+ an index from cache_id -> CID for code-addressed lookups.
Args:
source_path: Path to file to cache
node_type: Type of node (e.g., "upload", "source", "effect")
- node_id: DEPRECATED - ignored, always uses IPFS CID
+ node_id: DEPRECATED - ignored, always uses CID
cache_id: Optional code-addressed cache ID to index
execution_time: How long the operation took
move: If True, move instead of copy
+ skip_ipfs: If True, skip IPFS upload and use local hash (faster for large files)
Returns:
- Tuple of (CachedFile with both node_id and cid, CID)
+ Tuple of (CachedFile with both node_id and cid, CID or None if skip_ipfs)
"""
- # Upload to IPFS first to get the CID (primary identifier)
- cid = ipfs_client.add_file(source_path)
- if not cid:
- raise RuntimeError(f"IPFS upload failed for {source_path}. IPFS is required.")
+ if skip_ipfs:
+ # Use local content hash instead of IPFS CID (much faster)
+ cid = file_hash(source_path)
+ ipfs_cid = None
+ logger.info(f"put: Using local hash (skip_ipfs=True): {cid[:16]}...")
+ else:
+ # Upload to IPFS first to get the CID (primary identifier)
+ cid = ipfs_client.add_file(source_path)
+ if not cid:
+ raise RuntimeError(f"IPFS upload failed for {source_path}. IPFS is required.")
+ ipfs_cid = cid
# Always store by IPFS CID (node_id parameter is deprecated)
node_id = cid
@@ -370,11 +382,12 @@ class L1CacheManager:
# Check if already cached (by node_id)
existing = self.cache.get_entry(node_id)
if existing and existing.output_path.exists():
- return CachedFile.from_cache_entry(existing), cid
+ return CachedFile.from_cache_entry(existing), ipfs_cid
# Compute local hash BEFORE moving the file (for dual-indexing)
+ # Only needed if we uploaded to IPFS (to map local hash -> IPFS CID)
local_hash = None
- if self._is_ipfs_cid(cid):
+ if not skip_ipfs and self._is_ipfs_cid(cid):
local_hash = file_hash(source_path)
# Store in local cache
@@ -405,9 +418,9 @@ class L1CacheManager:
self._set_content_index(local_hash, cid)
logger.debug(f"Indexed local hash {local_hash[:16]}... -> IPFS {cid}")
- logger.info(f"Cached: {cid[:16]}...")
+ logger.info(f"Cached: {cid[:16]}..." + (" (local only)" if skip_ipfs else " (IPFS)"))
- return CachedFile.from_cache_entry(entry), cid
+ return CachedFile.from_cache_entry(entry), ipfs_cid if not skip_ipfs else None
def get_by_node_id(self, node_id: str) -> Optional[Path]:
"""Get cached file path by node_id."""
diff --git a/celery_app.py b/celery_app.py
index f997330..13449df 100644
--- a/celery_app.py
+++ b/celery_app.py
@@ -14,7 +14,7 @@ app = Celery(
'art_celery',
broker=REDIS_URL,
backend=REDIS_URL,
- include=['tasks', 'tasks.streaming']
+ include=['tasks', 'tasks.streaming', 'tasks.ipfs_upload']
)
app.conf.update(
diff --git a/database.py b/database.py
index a2cc80c..ebc0eee 100644
--- a/database.py
+++ b/database.py
@@ -1129,7 +1129,10 @@ async def save_run_cache(
output_cid = EXCLUDED.output_cid,
ipfs_cid = COALESCE(EXCLUDED.ipfs_cid, run_cache.ipfs_cid),
provenance_cid = COALESCE(EXCLUDED.provenance_cid, run_cache.provenance_cid),
- plan_cid = COALESCE(EXCLUDED.plan_cid, run_cache.plan_cid)
+ plan_cid = COALESCE(EXCLUDED.plan_cid, run_cache.plan_cid),
+ actor_id = COALESCE(EXCLUDED.actor_id, run_cache.actor_id),
+ recipe = COALESCE(EXCLUDED.recipe, run_cache.recipe),
+ inputs = COALESCE(EXCLUDED.inputs, run_cache.inputs)
RETURNING run_id, output_cid, ipfs_cid, provenance_cid, plan_cid, recipe, inputs, actor_id, created_at
""",
run_id, output_cid, ipfs_cid, provenance_cid, plan_cid, recipe, _json.dumps(inputs), actor_id
diff --git a/ipfs_client.py b/ipfs_client.py
index 8d71758..edb6c8e 100644
--- a/ipfs_client.py
+++ b/ipfs_client.py
@@ -19,8 +19,8 @@ logger = logging.getLogger(__name__)
# IPFS API multiaddr - default to local, docker uses /dns/ipfs/tcp/5001
IPFS_API = os.getenv("IPFS_API", "/ip4/127.0.0.1/tcp/5001")
-# Connection timeout in seconds
-IPFS_TIMEOUT = int(os.getenv("IPFS_TIMEOUT", "30"))
+# Connection timeout in seconds (increased for large files)
+IPFS_TIMEOUT = int(os.getenv("IPFS_TIMEOUT", "120"))
def _multiaddr_to_url(multiaddr: str) -> str:
diff --git a/recipes/woods-recipe.sexp b/recipes/woods-recipe.sexp
new file mode 100644
index 0000000..4c5f4ec
--- /dev/null
+++ b/recipes/woods-recipe.sexp
@@ -0,0 +1,134 @@
+;; Woods Recipe - Using friendly names for all assets
+;;
+;; Requires uploaded:
+;; - Media: woods-1 through woods-8 (videos), woods-audio (audio)
+;; - Effects: fx-rotate, fx-zoom, fx-blend, fx-ripple, fx-invert, fx-hue-shift
+;; - Templates: tpl-standard-primitives, tpl-standard-effects, tpl-process-pair,
+;; tpl-crossfade-zoom, tpl-scan-spin, tpl-scan-ripple
+
+(stream "woods-recipe"
+ :fps 30
+ :width 1920
+ :height 1080
+ :seed 42
+
+ ;; Load standard primitives and effects via friendly names
+ (include :name "tpl-standard-primitives")
+ (include :name "tpl-standard-effects")
+
+ ;; Load reusable templates
+ (include :name "tpl-process-pair")
+ (include :name "tpl-crossfade-zoom")
+
+ ;; === SOURCES AS ARRAY (using friendly names) ===
+ (def sources [
+ (streaming:make-video-source "woods-1" 30)
+ (streaming:make-video-source "woods-2" 30)
+ (streaming:make-video-source "woods-3" 30)
+ (streaming:make-video-source "woods-4" 30)
+ (streaming:make-video-source "woods-5" 30)
+ (streaming:make-video-source "woods-6" 30)
+ (streaming:make-video-source "woods-7" 30)
+ (streaming:make-video-source "woods-8" 30)
+ ])
+
+ ;; Per-pair config: [rot-dir, rot-a-max, rot-b-max, zoom-a-max, zoom-b-max]
+ (def pair-configs [
+ {:dir -1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5}
+ {:dir 1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5}
+ {:dir 1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5}
+ {:dir -1 :rot-a -45 :rot-b 45 :zoom-a 0.5 :zoom-b 1.5}
+ {:dir -1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5}
+ {:dir 1 :rot-a 30 :rot-b -30 :zoom-a 1.3 :zoom-b 0.7}
+ {:dir -1 :rot-a -45 :rot-b 45 :zoom-a 0.5 :zoom-b 1.5}
+ {:dir 1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5}
+ ])
+
+ ;; Audio analyzer (using friendly name)
+ (def music (streaming:make-audio-analyzer "woods-audio"))
+
+ ;; Audio playback (friendly name resolved by streaming primitives)
+ (audio-playback "woods-audio")
+
+ ;; === GLOBAL SCANS ===
+
+ ;; Cycle state: which source is active
+ (scan cycle (streaming:audio-beat music t)
+ :init {:active 0 :beat 0 :clen 16}
+ :step (if (< (+ beat 1) clen)
+ (dict :active active :beat (+ beat 1) :clen clen)
+ (dict :active (mod (+ active 1) (len sources)) :beat 0
+ :clen (+ 8 (mod (* (streaming:audio-beat-count music t) 7) 17)))))
+
+ ;; Reusable scans from templates
+ (include :name "tpl-scan-spin")
+ (include :name "tpl-scan-ripple")
+
+ ;; === PER-PAIR STATE ===
+ (scan pairs (streaming:audio-beat music t)
+ :init {:states (map (core:range (len sources)) (lambda (_)
+ {:inv-a 0 :inv-b 0 :hue-a 0 :hue-b 0 :hue-a-val 0 :hue-b-val 0 :mix 0.5 :mix-rem 5 :angle 0 :rot-beat 0 :rot-clen 25}))}
+ :step (dict :states (map states (lambda (p)
+ (let [new-inv-a (if (< (core:rand) 0.1) (+ 1 (core:rand-int 1 4)) (core:max 0 (- (get p :inv-a) 1)))
+ new-inv-b (if (< (core:rand) 0.1) (+ 1 (core:rand-int 1 4)) (core:max 0 (- (get p :inv-b) 1)))
+ old-hue-a (get p :hue-a)
+ old-hue-b (get p :hue-b)
+ new-hue-a (if (< (core:rand) 0.1) (+ 1 (core:rand-int 1 4)) (core:max 0 (- old-hue-a 1)))
+ new-hue-b (if (< (core:rand) 0.1) (+ 1 (core:rand-int 1 4)) (core:max 0 (- old-hue-b 1)))
+ new-hue-a-val (if (> new-hue-a old-hue-a) (+ 30 (* (core:rand) 300)) (get p :hue-a-val))
+ new-hue-b-val (if (> new-hue-b old-hue-b) (+ 30 (* (core:rand) 300)) (get p :hue-b-val))
+ mix-rem (get p :mix-rem)
+ old-mix (get p :mix)
+ new-mix-rem (if (> mix-rem 0) (- mix-rem 1) (+ 1 (core:rand-int 1 10)))
+ new-mix (if (> mix-rem 0) old-mix (* (core:rand-int 0 2) 0.5))
+ rot-beat (get p :rot-beat)
+ rot-clen (get p :rot-clen)
+ old-angle (get p :angle)
+ new-rot-beat (if (< (+ rot-beat 1) rot-clen) (+ rot-beat 1) 0)
+ new-rot-clen (if (< (+ rot-beat 1) rot-clen) rot-clen (+ 20 (core:rand-int 0 10)))
+ new-angle (+ old-angle (/ 360 rot-clen))]
+ (dict :inv-a new-inv-a :inv-b new-inv-b
+ :hue-a new-hue-a :hue-b new-hue-b
+ :hue-a-val new-hue-a-val :hue-b-val new-hue-b-val
+ :mix new-mix :mix-rem new-mix-rem
+ :angle new-angle :rot-beat new-rot-beat :rot-clen new-rot-clen))))))
+
+ ;; === FRAME PIPELINE ===
+ (frame
+ (let [now t
+ e (streaming:audio-energy music now)
+
+ ;; Get cycle state
+ active (bind cycle :active)
+ beat-pos (bind cycle :beat)
+ clen (bind cycle :clen)
+
+ ;; Transition logic
+ phase3 (* beat-pos 3)
+ fading (and (>= phase3 (* clen 2)) (< phase3 (* clen 3)))
+ fade-amt (if fading (/ (- phase3 (* clen 2)) clen) 0)
+ next-idx (mod (+ active 1) (len sources))
+
+ ;; Get pair states array
+ pair-states (bind pairs :states)
+
+ ;; Process active pair using macro from template
+ active-frame (process-pair active)
+
+ ;; Crossfade with zoom during transition
+ result (if fading
+ (crossfade-zoom active-frame (process-pair next-idx) fade-amt)
+ active-frame)
+
+ ;; Final: global spin + ripple
+ spun (rotate result :angle (bind spin :angle))
+ rip-gate (bind ripple-state :gate)
+ rip-amp (* rip-gate (core:map-range e 0 1 5 50))]
+
+ (ripple spun
+ :amplitude rip-amp
+ :center_x (bind ripple-state :cx)
+ :center_y (bind ripple-state :cy)
+ :frequency 8
+ :decay 2
+ :speed 5))))
diff --git a/sexp_effects/parser.py b/sexp_effects/parser.py
index 12bedfd..b2078ea 100644
--- a/sexp_effects/parser.py
+++ b/sexp_effects/parser.py
@@ -1,168 +1,395 @@
"""
-S-Expression Parser
+S-expression parser for ArtDAG recipes and plans.
-Parses S-expressions into Python data structures:
-- Lists become Python lists
-- Symbols become Symbol objects
-- Numbers become int/float
-- Strings become str
-- Keywords (:foo) become Keyword objects
+Supports:
+- Lists: (a b c)
+- Symbols: foo, bar-baz, ->
+- Keywords: :key
+- Strings: "hello world"
+- Numbers: 42, 3.14, -1.5
+- Comments: ; to end of line
+- Vectors: [a b c] (syntactic sugar for lists)
+- Maps: {:key1 val1 :key2 val2} (parsed as Python dicts)
"""
-import re
from dataclasses import dataclass
-from typing import Any, List, Union
+from typing import Any, Dict, List, Union
+import re
-@dataclass(frozen=True)
+@dataclass
class Symbol:
- """A symbol (identifier) in the S-expression."""
+ """An unquoted symbol/identifier."""
name: str
def __repr__(self):
- return self.name
+ return f"Symbol({self.name!r})"
+
+ def __eq__(self, other):
+ if isinstance(other, Symbol):
+ return self.name == other.name
+ if isinstance(other, str):
+ return self.name == other
+ return False
+
+ def __hash__(self):
+ return hash(self.name)
-@dataclass(frozen=True)
+@dataclass
class Keyword:
- """A keyword like :foo in the S-expression."""
+ """A keyword starting with colon."""
name: str
def __repr__(self):
- return f":{self.name}"
+ return f"Keyword({self.name!r})"
+
+ def __eq__(self, other):
+ if isinstance(other, Keyword):
+ return self.name == other.name
+ return False
+
+ def __hash__(self):
+        return hash((':', self.name))
-# Token patterns
-TOKEN_PATTERNS = [
- (r'\s+', None), # Whitespace (skip)
- (r';[^\n]*', None), # Comments (skip)
- (r'\(', 'LPAREN'),
- (r'\)', 'RPAREN'),
- (r'\[', 'LBRACKET'),
- (r'\]', 'RBRACKET'),
- (r"'", 'QUOTE'),
- (r'"([^"\\]|\\.)*"', 'STRING'),
- (r':[a-zA-Z_][a-zA-Z0-9_\-]*', 'KEYWORD'),
- (r'-?[0-9]+\.[0-9]+', 'FLOAT'),
- (r'-?[0-9]+', 'INT'),
- (r'#t|#f|true|false', 'BOOL'),
- (r'[a-zA-Z_+\-*/<>=!?][a-zA-Z0-9_+\-*/<>=!?]*', 'SYMBOL'),
-]
-
-TOKEN_REGEX = '|'.join(f'(?P<{name}>{pattern})' if name else f'(?:{pattern})'
- for pattern, name in TOKEN_PATTERNS)
+class ParseError(Exception):
+ """Error during S-expression parsing."""
+ def __init__(self, message: str, position: int = 0, line: int = 1, col: int = 1):
+ self.position = position
+ self.line = line
+ self.col = col
+ super().__init__(f"{message} at line {line}, column {col}")
-def tokenize(source: str) -> List[tuple]:
- """Tokenize S-expression source code."""
- tokens = []
- for match in re.finditer(TOKEN_REGEX, source):
- kind = match.lastgroup
- value = match.group()
- if kind:
- tokens.append((kind, value))
- return tokens
+class Tokenizer:
+ """Tokenize S-expression text into tokens."""
+
+ # Token patterns
+ WHITESPACE = re.compile(r'\s+')
+ COMMENT = re.compile(r';[^\n]*')
+ STRING = re.compile(r'"(?:[^"\\]|\\.)*"')
+ NUMBER = re.compile(r'-?(?:\d+\.?\d*|\.\d+)(?:[eE][+-]?\d+)?')
+ KEYWORD = re.compile(r':[a-zA-Z_][a-zA-Z0-9_-]*')
+ SYMBOL = re.compile(r'[a-zA-Z_*+\-><=/!?][a-zA-Z0-9_*+\-><=/!?.:]*')
+
+ def __init__(self, text: str):
+ self.text = text
+ self.pos = 0
+ self.line = 1
+ self.col = 1
+
+ def _advance(self, count: int = 1):
+ """Advance position, tracking line/column."""
+ for _ in range(count):
+ if self.pos < len(self.text):
+ if self.text[self.pos] == '\n':
+ self.line += 1
+ self.col = 1
+ else:
+ self.col += 1
+ self.pos += 1
+
+ def _skip_whitespace_and_comments(self):
+ """Skip whitespace and comments."""
+ while self.pos < len(self.text):
+ # Whitespace
+ match = self.WHITESPACE.match(self.text, self.pos)
+ if match:
+ self._advance(match.end() - self.pos)
+ continue
+
+ # Comments
+ match = self.COMMENT.match(self.text, self.pos)
+ if match:
+ self._advance(match.end() - self.pos)
+ continue
+
+ break
+
+ def peek(self) -> str | None:
+ """Peek at current character."""
+ self._skip_whitespace_and_comments()
+ if self.pos >= len(self.text):
+ return None
+ return self.text[self.pos]
+
+ def next_token(self) -> Any:
+ """Get the next token."""
+ self._skip_whitespace_and_comments()
+
+ if self.pos >= len(self.text):
+ return None
+
+ char = self.text[self.pos]
+ start_line, start_col = self.line, self.col
+
+ # Single-character tokens (parens, brackets, braces)
+ if char in '()[]{}':
+ self._advance()
+ return char
+
+ # String
+ if char == '"':
+ match = self.STRING.match(self.text, self.pos)
+ if not match:
+ raise ParseError("Unterminated string", self.pos, self.line, self.col)
+ self._advance(match.end() - self.pos)
+            # Decode escape sequences in a single left-to-right pass so an
+            # escaped backslash is never re-read as the start of a new escape
+            # (chained str.replace mis-decodes inputs such as "\\n").
+            content = match.group()[1:-1]
+            _unesc = {'n': '\n', 't': '\t'}
+            content = re.sub(r'\\(.)', lambda m: _unesc.get(m.group(1), m.group(1)), content)
+ return content
+
+ # Keyword
+ if char == ':':
+ match = self.KEYWORD.match(self.text, self.pos)
+ if match:
+ self._advance(match.end() - self.pos)
+ return Keyword(match.group()[1:]) # Strip leading colon
+ raise ParseError(f"Invalid keyword", self.pos, self.line, self.col)
+
+ # Number (must check before symbol due to - prefix)
+ if char.isdigit() or (char == '-' and self.pos + 1 < len(self.text) and
+ (self.text[self.pos + 1].isdigit() or self.text[self.pos + 1] == '.')):
+ match = self.NUMBER.match(self.text, self.pos)
+ if match:
+ self._advance(match.end() - self.pos)
+ num_str = match.group()
+ if '.' in num_str or 'e' in num_str or 'E' in num_str:
+ return float(num_str)
+ return int(num_str)
+
+ # Symbol
+ match = self.SYMBOL.match(self.text, self.pos)
+ if match:
+ self._advance(match.end() - self.pos)
+ return Symbol(match.group())
+
+ raise ParseError(f"Unexpected character: {char!r}", self.pos, self.line, self.col)
-def parse(source: str) -> Any:
- """Parse S-expression source into Python data structures."""
- tokens = tokenize(source)
- pos = [0] # Use list for mutability in nested function
+def parse(text: str) -> Any:
+ """
+ Parse an S-expression string into Python data structures.
- def parse_expr():
- if pos[0] >= len(tokens):
- raise SyntaxError("Unexpected end of input")
+ Returns:
+ Parsed S-expression as nested Python structures:
+ - Lists become Python lists
+ - Symbols become Symbol objects
+ - Keywords become Keyword objects
+ - Strings become Python strings
+ - Numbers become int/float
- kind, value = tokens[pos[0]]
+ Example:
+ >>> parse('(recipe "test" :version "1.0")')
+ [Symbol('recipe'), 'test', Keyword('version'), '1.0']
+ """
+ tokenizer = Tokenizer(text)
+ result = _parse_expr(tokenizer)
- if kind == 'LPAREN':
- pos[0] += 1
- items = []
- while pos[0] < len(tokens) and tokens[pos[0]][0] != 'RPAREN':
- items.append(parse_expr())
- if pos[0] >= len(tokens):
- raise SyntaxError("Missing closing parenthesis")
- pos[0] += 1 # Skip RPAREN
- return items
-
- if kind == 'LBRACKET':
- pos[0] += 1
- items = []
- while pos[0] < len(tokens) and tokens[pos[0]][0] != 'RBRACKET':
- items.append(parse_expr())
- if pos[0] >= len(tokens):
- raise SyntaxError("Missing closing bracket")
- pos[0] += 1 # Skip RBRACKET
- return items
-
- elif kind == 'RPAREN':
- raise SyntaxError("Unexpected closing parenthesis")
-
- elif kind == 'QUOTE':
- pos[0] += 1
- return [Symbol('quote'), parse_expr()]
-
- elif kind == 'STRING':
- pos[0] += 1
- # Remove quotes and unescape
- return value[1:-1].replace('\\"', '"').replace('\\n', '\n')
-
- elif kind == 'INT':
- pos[0] += 1
- return int(value)
-
- elif kind == 'FLOAT':
- pos[0] += 1
- return float(value)
-
- elif kind == 'BOOL':
- pos[0] += 1
- return value in ('#t', 'true')
-
- elif kind == 'KEYWORD':
- pos[0] += 1
- return Keyword(value[1:]) # Remove leading :
-
- elif kind == 'SYMBOL':
- pos[0] += 1
- return Symbol(value)
-
- else:
- raise SyntaxError(f"Unknown token: {kind} {value}")
-
- result = parse_expr()
-
- # Check for multiple top-level expressions
- if pos[0] < len(tokens):
- # Allow multiple top-level expressions, return as list
- results = [result]
- while pos[0] < len(tokens):
- results.append(parse_expr())
- return results
+ # Check for trailing content
+ if tokenizer.peek() is not None:
+ raise ParseError("Unexpected content after expression",
+ tokenizer.pos, tokenizer.line, tokenizer.col)
return result
+def parse_all(text: str) -> List[Any]:
+ """
+ Parse multiple S-expressions from a string.
+
+ Returns list of parsed expressions.
+ """
+ tokenizer = Tokenizer(text)
+ results = []
+
+ while tokenizer.peek() is not None:
+ results.append(_parse_expr(tokenizer))
+
+ return results
+
+
+def _parse_expr(tokenizer: Tokenizer) -> Any:
+ """Parse a single expression."""
+ token = tokenizer.next_token()
+
+ if token is None:
+ raise ParseError("Unexpected end of input", tokenizer.pos, tokenizer.line, tokenizer.col)
+
+ # List
+ if token == '(':
+ return _parse_list(tokenizer, ')')
+
+ # Vector (sugar for list)
+ if token == '[':
+ return _parse_list(tokenizer, ']')
+
+ # Map/dict: {:key1 val1 :key2 val2}
+ if token == '{':
+ return _parse_map(tokenizer)
+
+ # Unexpected closers
+ if isinstance(token, str) and token in ')]}':
+ raise ParseError(f"Unexpected {token!r}", tokenizer.pos, tokenizer.line, tokenizer.col)
+
+ # Atom
+ return token
+
+
+def _parse_list(tokenizer: Tokenizer, closer: str) -> List[Any]:
+ """Parse a list until the closing delimiter."""
+ items = []
+
+ while True:
+ char = tokenizer.peek()
+
+ if char is None:
+ raise ParseError(f"Unterminated list, expected {closer!r}",
+ tokenizer.pos, tokenizer.line, tokenizer.col)
+
+ if char == closer:
+ tokenizer.next_token() # Consume closer
+ return items
+
+ items.append(_parse_expr(tokenizer))
+
+
+def _parse_map(tokenizer: Tokenizer) -> Dict[str, Any]:
+ """Parse a map/dict: {:key1 val1 :key2 val2} -> {"key1": val1, "key2": val2}."""
+ result = {}
+
+ while True:
+ char = tokenizer.peek()
+
+ if char is None:
+ raise ParseError("Unterminated map, expected '}'",
+ tokenizer.pos, tokenizer.line, tokenizer.col)
+
+ if char == '}':
+ tokenizer.next_token() # Consume closer
+ return result
+
+ # Parse key (should be a keyword like :key)
+ key_token = _parse_expr(tokenizer)
+ if isinstance(key_token, Keyword):
+ key = key_token.name
+ elif isinstance(key_token, str):
+ key = key_token
+ else:
+ raise ParseError(f"Map key must be keyword or string, got {type(key_token).__name__}",
+ tokenizer.pos, tokenizer.line, tokenizer.col)
+
+ # Parse value
+ value = _parse_expr(tokenizer)
+ result[key] = value
+
+
+def serialize(expr: Any, indent: int = 0, pretty: bool = False) -> str:
+ """
+ Serialize a Python data structure back to S-expression format.
+
+ Args:
+ expr: The expression to serialize
+ indent: Current indentation level (for pretty printing)
+ pretty: Whether to use pretty printing with newlines
+
+ Returns:
+ S-expression string
+ """
+ if isinstance(expr, list):
+ if not expr:
+ return "()"
+
+ if pretty:
+ return _serialize_pretty(expr, indent)
+ else:
+ items = [serialize(item, indent, False) for item in expr]
+ return "(" + " ".join(items) + ")"
+
+ if isinstance(expr, Symbol):
+ return expr.name
+
+ if isinstance(expr, Keyword):
+ return f":{expr.name}"
+
+ if isinstance(expr, str):
+ # Escape special characters
+ escaped = expr.replace('\\', '\\\\').replace('"', '\\"').replace('\n', '\\n').replace('\t', '\\t')
+ return f'"{escaped}"'
+
+ if isinstance(expr, bool):
+ return "true" if expr else "false"
+
+ if isinstance(expr, (int, float)):
+ return str(expr)
+
+ if expr is None:
+ return "nil"
+
+ if isinstance(expr, dict):
+ # Serialize dict as property list: {:key1 val1 :key2 val2}
+ items = []
+ for k, v in expr.items():
+ items.append(f":{k}")
+ items.append(serialize(v, indent, pretty))
+ return "{" + " ".join(items) + "}"
+
+ raise ValueError(f"Cannot serialize {type(expr).__name__}: {expr!r}")
+
+
+def _serialize_pretty(expr: List, indent: int) -> str:
+ """Pretty-print a list expression with smart formatting."""
+ if not expr:
+ return "()"
+
+ prefix = " " * indent
+ inner_prefix = " " * (indent + 1)
+
+ # Check if this is a simple list that fits on one line
+ simple = serialize(expr, indent, False)
+ if len(simple) < 60 and '\n' not in simple:
+ return simple
+
+ # Start building multiline output
+ head = serialize(expr[0], indent + 1, False)
+ parts = [f"({head}"]
+
+ i = 1
+ while i < len(expr):
+ item = expr[i]
+
+ # Group keyword-value pairs on same line
+ if isinstance(item, Keyword) and i + 1 < len(expr):
+ key = serialize(item, 0, False)
+ val = serialize(expr[i + 1], indent + 1, False)
+
+ # If value is short, put on same line
+ if len(val) < 50 and '\n' not in val:
+ parts.append(f"{inner_prefix}{key} {val}")
+ else:
+ # Value is complex, serialize it pretty
+ val_pretty = serialize(expr[i + 1], indent + 1, True)
+ parts.append(f"{inner_prefix}{key} {val_pretty}")
+ i += 2
+ else:
+ # Regular item
+ item_str = serialize(item, indent + 1, True)
+ parts.append(f"{inner_prefix}{item_str}")
+ i += 1
+
+ return "\n".join(parts) + ")"
+
+
def parse_file(path: str) -> Any:
"""Parse an S-expression file."""
with open(path, 'r') as f:
return parse(f.read())
-# Convenience for pretty-printing
def to_sexp(obj: Any) -> str:
- """Convert Python object back to S-expression string."""
- if isinstance(obj, list):
- return '(' + ' '.join(to_sexp(x) for x in obj) + ')'
- elif isinstance(obj, Symbol):
- return obj.name
- elif isinstance(obj, Keyword):
- return f':{obj.name}'
- elif isinstance(obj, str):
- return f'"{obj}"'
- elif isinstance(obj, bool):
- return '#t' if obj else '#f'
- elif isinstance(obj, (int, float)):
- return str(obj)
- else:
- return repr(obj)
+ """Convert Python object back to S-expression string (alias for serialize)."""
+ return serialize(obj)
diff --git a/sexp_effects/primitive_libs/streaming.py b/sexp_effects/primitive_libs/streaming.py
index 3e2fc51..499c73f 100644
--- a/sexp_effects/primitive_libs/streaming.py
+++ b/sexp_effects/primitive_libs/streaming.py
@@ -55,9 +55,13 @@ class VideoSource:
self._proc.kill()
self._proc = None
+ # Check file exists before trying to open
+ if not self.path.exists():
+ raise FileNotFoundError(f"Video file not found: {self.path}")
+
w, h = self._frame_size
cmd = [
- "ffmpeg", "-v", "quiet",
+ "ffmpeg", "-v", "error", # Show errors instead of quiet
"-ss", f"{seek_time:.3f}",
"-i", str(self.path),
"-f", "rawvideo", "-pix_fmt", "rgb24",
@@ -65,9 +69,18 @@ class VideoSource:
"-r", str(self.fps), # Output at specified fps
"-"
]
- self._proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
+ self._proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self._stream_time = seek_time
+ # Check if process started successfully by reading first bit of stderr
+ import select
+ import sys
+ readable, _, _ = select.select([self._proc.stderr], [], [], 0.5)
+ if readable:
+ err = self._proc.stderr.read(4096).decode('utf-8', errors='ignore')
+ if err:
+ print(f"ffmpeg error for {self.path.name}: {err}", file=sys.stderr)
+
def _read_frame_from_stream(self) -> np.ndarray:
"""Read one frame from the stream."""
w, h = self._frame_size
@@ -130,8 +143,12 @@ class VideoSource:
frame = self._read_frame_from_stream()
if frame is None:
import sys
- print(f"NULL FRAME {self.path.name}: t={t:.2f} seek={seek_time:.2f}", file=sys.stderr)
- frame = np.zeros((h, w, 3), dtype=np.uint8)
+ # Check for ffmpeg errors
+ if self._proc and self._proc.stderr:
+ err = self._proc.stderr.read(4096).decode('utf-8', errors='ignore')
+ if err:
+ raise RuntimeError(f"Failed to read video frame from {self.path.name}: {err}")
+ raise RuntimeError(f"Failed to read video frame from {self.path.name} at t={t:.2f} - file may be corrupted or inaccessible")
else:
self._stream_time += self._frame_time
diff --git a/streaming/stream_sexp_generic.py b/streaming/stream_sexp_generic.py
index 4f88614..1256c62 100644
--- a/streaming/stream_sexp_generic.py
+++ b/streaming/stream_sexp_generic.py
@@ -30,12 +30,9 @@ from pathlib import Path
from dataclasses import dataclass
from typing import Dict, List, Any, Optional, Tuple
-# Try pip-installed artdag first, fall back to local path
-try:
- from artdag.sexp.parser import parse, parse_all, Symbol, Keyword
-except ImportError:
- sys.path.insert(0, str(Path(__file__).parent.parent.parent / "artdag"))
- from artdag.sexp.parser import parse, parse_all, Symbol, Keyword
+# Use local sexp_effects parser (supports namespaced symbols like math:sin)
+sys.path.insert(0, str(Path(__file__).parent.parent))
+from sexp_effects.parser import parse, parse_all, Symbol, Keyword
@dataclass
@@ -54,9 +51,10 @@ class StreamInterpreter:
and calls primitives.
"""
- def __init__(self, sexp_path: str):
+ def __init__(self, sexp_path: str, actor_id: Optional[str] = None):
self.sexp_path = Path(sexp_path)
self.sexp_dir = self.sexp_path.parent
+ self.actor_id = actor_id # For friendly name resolution
text = self.sexp_path.read_text()
self.ast = parse(text)
@@ -84,6 +82,26 @@ class StreamInterpreter:
self.sources_config: Optional[Path] = None
self.audio_config: Optional[Path] = None
+ # Error tracking
+ self.errors: List[str] = []
+
+ def _resolve_name(self, name: str) -> Optional[Path]:
+ """Resolve a friendly name to a file path using the naming service."""
+ try:
+ # Import here to avoid circular imports
+ from tasks.streaming import resolve_asset
+ path = resolve_asset(name, self.actor_id)
+ if path:
+ return path
+ except Exception as e:
+ print(f"Warning: failed to resolve name '{name}': {e}", file=sys.stderr)
+ return None
+
+ def _record_error(self, msg: str):
+ """Record an error that occurred during evaluation."""
+ self.errors.append(msg)
+ print(f"ERROR: {msg}", file=sys.stderr)
+
import random
self.rng = random.Random(self.config.get('seed', 42))
@@ -241,27 +259,50 @@ class StreamInterpreter:
self.macros[name] = {'params': params, 'body': body}
elif cmd == 'effect':
- # Handle (effect name :path "...") in included files - recursive
+ # Handle (effect name :path "...") or (effect name :name "...") in included files
i = 2
while i < len(form):
- if isinstance(form[i], Keyword) and form[i].name == 'path':
- path = str(form[i + 1]).strip('"')
- # Resolve relative to the file being loaded
- full = (effect_path.parent / path).resolve()
- self._load_effect(full)
- i += 2
+ if isinstance(form[i], Keyword):
+ kw = form[i].name
+ if kw == 'path':
+ path = str(form[i + 1]).strip('"')
+ full = (effect_path.parent / path).resolve()
+ self._load_effect(full)
+ i += 2
+ elif kw == 'name':
+ fname = str(form[i + 1]).strip('"')
+ resolved = self._resolve_name(fname)
+ if resolved:
+ self._load_effect(resolved)
+ else:
+ raise RuntimeError(f"Could not resolve effect name '{fname}' - make sure it's uploaded and you're logged in")
+ i += 2
+ else:
+ i += 1
else:
i += 1
elif cmd == 'include':
- # Handle (include :path "...") in included files - recursive
+ # Handle (include :path "...") or (include :name "...") in included files
i = 1
while i < len(form):
- if isinstance(form[i], Keyword) and form[i].name == 'path':
- path = str(form[i + 1]).strip('"')
- full = (effect_path.parent / path).resolve()
- self._load_effect(full)
- i += 2
+ if isinstance(form[i], Keyword):
+ kw = form[i].name
+ if kw == 'path':
+ path = str(form[i + 1]).strip('"')
+ full = (effect_path.parent / path).resolve()
+ self._load_effect(full)
+ i += 2
+ elif kw == 'name':
+ fname = str(form[i + 1]).strip('"')
+ resolved = self._resolve_name(fname)
+ if resolved:
+ self._load_effect(resolved)
+ else:
+ raise RuntimeError(f"Could not resolve include name '{fname}' - make sure it's uploaded and you're logged in")
+ i += 2
+ else:
+ i += 1
else:
i += 1
@@ -313,22 +354,48 @@
            name = form[1].name if isinstance(form[1], Symbol) else str(form[1])
            i = 2
            while i < len(form):
-                if isinstance(form[i], Keyword) and form[i].name == 'path':
-                    path = str(form[i + 1]).strip('"')
-                    full = (self.sexp_dir / path).resolve()
-                    self._load_effect(full)
-                    i += 2
+                if isinstance(form[i], Keyword):
+                    kw = form[i].name
+                    if kw == 'path':
+                        path = str(form[i + 1]).strip('"')
+                        full = (self.sexp_dir / path).resolve()
+                        self._load_effect(full)
+                        i += 2
+                    elif kw == 'name':
+                        # Resolve friendly name to path
+                        fname = str(form[i + 1]).strip('"')
+                        resolved = self._resolve_name(fname)
+                        if resolved:
+                            self._load_effect(resolved)
+                        else:
+                            raise RuntimeError(f"Could not resolve effect name '{fname}' - make sure it's uploaded and you're logged in")
+                        i += 2
+                    else:
+                        i += 1
                else:
                    i += 1
        elif cmd == 'include':
            i = 1
            while i < len(form):
-                if isinstance(form[i], Keyword) and form[i].name == 'path':
-                    path = str(form[i + 1]).strip('"')
-                    full = (self.sexp_dir / path).resolve()
-                    self._load_effect(full)
-                    i += 2
+                if isinstance(form[i], Keyword):
+                    kw = form[i].name
+                    if kw == 'path':
+                        path = str(form[i + 1]).strip('"')
+                        full = (self.sexp_dir / path).resolve()
+                        self._load_effect(full)
+                        i += 2
+                    elif kw == 'name':
+                        # Resolve friendly name to path
+                        fname = str(form[i + 1]).strip('"')
+                        resolved = self._resolve_name(fname)
+                        if resolved:
+                            self._load_effect(resolved)
+                        else:
+                            raise RuntimeError(f"Could not resolve include name '{fname}' - make sure it's uploaded and you're logged in")
+ i += 2
+ else:
+ i += 1
else:
i += 1
@@ -337,7 +405,13 @@ class StreamInterpreter:
# Skip if already set by config file
if self.audio_playback is None:
path = str(form[1]).strip('"')
- self.audio_playback = str((self.sexp_dir / path).resolve())
+ # Try to resolve as friendly name first
+ resolved = self._resolve_name(path)
+ if resolved:
+ self.audio_playback = str(resolved)
+ else:
+ # Fall back to relative path
+ self.audio_playback = str((self.sexp_dir / path).resolve())
print(f"Audio playback: {self.audio_playback}", file=sys.stderr)
elif cmd == 'def':
@@ -419,6 +493,10 @@ class StreamInterpreter:
if isinstance(expr, Keyword):
return expr.name
+ # Handle dicts from new parser - evaluate values
+ if isinstance(expr, dict):
+ return {k: self._eval(v, env) for k, v in expr.items()}
+
if not isinstance(expr, list) or not expr:
return expr
@@ -685,8 +763,8 @@ class StreamInterpreter:
return prim_func(*evaluated_args, **kwargs)
return prim_func(*evaluated_args)
except Exception as e:
- print(f"Primitive {op} error: {e}", file=sys.stderr)
- return None
+ self._record_error(f"Primitive {op} error: {e}")
+ raise RuntimeError(f"Primitive {op} failed: {e}")
# === Macros (function-like: args evaluated before binding) ===
@@ -720,8 +798,8 @@ class StreamInterpreter:
return prim_func(*evaluated_args, **kwargs)
return prim_func(*evaluated_args)
except Exception as e:
- print(f"Primitive {op} error: {e}", file=sys.stderr)
- return None
+ self._record_error(f"Primitive {op} error: {e}")
+ raise RuntimeError(f"Primitive {op} failed: {e}")
# Unknown - return as-is
return expr
diff --git a/tasks/__init__.py b/tasks/__init__.py
index bb8f547..6a07c25 100644
--- a/tasks/__init__.py
+++ b/tasks/__init__.py
@@ -2,9 +2,12 @@
#
# Tasks:
# 1. run_stream - Execute a streaming S-expression recipe
+# 2. upload_to_ipfs - Background IPFS upload for media files
from .streaming import run_stream
+from .ipfs_upload import upload_to_ipfs
__all__ = [
"run_stream",
+ "upload_to_ipfs",
]
diff --git a/tasks/ipfs_upload.py b/tasks/ipfs_upload.py
new file mode 100644
index 0000000..e99b6b1
--- /dev/null
+++ b/tasks/ipfs_upload.py
@@ -0,0 +1,83 @@
+"""
+Background IPFS upload task.
+
+Uploads files to IPFS in the background after initial local storage.
+This allows fast uploads while still getting IPFS CIDs eventually.
+"""
+
+import logging
+import os
+import sys
+from pathlib import Path
+from typing import Optional
+
+# Add parent directory to path for imports
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from celery_app import app
+import ipfs_client
+
+logger = logging.getLogger(__name__)
+
+
+@app.task(bind=True, max_retries=3, default_retry_delay=60)
+def upload_to_ipfs(self, local_cid: str, actor_id: str) -> Optional[str]:
+ """
+ Upload a locally cached file to IPFS in the background.
+
+ Args:
+ local_cid: The local content hash of the file
+ actor_id: The user who uploaded the file
+
+ Returns:
+ IPFS CID if successful, None if failed
+ """
+ from cache_manager import get_cache_manager
+ import asyncio
+ import database
+
+ logger.info(f"Background IPFS upload starting for {local_cid[:16]}...")
+
+ try:
+ cache_mgr = get_cache_manager()
+
+ # Get the file path from local cache
+ file_path = cache_mgr.get_by_cid(local_cid)
+ if not file_path or not file_path.exists():
+ logger.error(f"File not found for local CID {local_cid[:16]}...")
+ return None
+
+ # Upload to IPFS
+ logger.info(f"Uploading {file_path} to IPFS...")
+ ipfs_cid = ipfs_client.add_file(file_path)
+
+ if not ipfs_cid:
+ logger.error(f"IPFS upload failed for {local_cid[:16]}...")
+ raise self.retry(exc=Exception("IPFS upload failed"))
+
+ logger.info(f"IPFS upload successful: {local_cid[:16]}... -> {ipfs_cid[:16]}...")
+
+ # Update database with IPFS CID
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+        try:
+            # Initialize database pool if needed (NOTE(review): tasks/streaming.py initializes via database.init_db() — confirm init_pool vs init_db is intentional)
+            loop.run_until_complete(database.init_pool())
+
+ # Update cache_items table
+ loop.run_until_complete(
+ database.update_cache_item_ipfs_cid(local_cid, ipfs_cid)
+ )
+
+ # Create index from IPFS CID to local cache
+ cache_mgr._set_content_index(ipfs_cid, local_cid)
+
+ logger.info(f"Database updated with IPFS CID for {local_cid[:16]}...")
+ finally:
+ loop.close()
+
+ return ipfs_cid
+
+    except Exception as e:  # NOTE(review): also catches celery.exceptions.Retry raised by self.retry() above — consider re-raising Retry untouched
+        logger.error(f"Background IPFS upload error: {e}")
+        raise self.retry(exc=e)
diff --git a/tasks/streaming.py b/tasks/streaming.py
index 558f362..ea4128e 100644
--- a/tasks/streaming.py
+++ b/tasks/streaming.py
@@ -24,6 +24,11 @@ from cache_manager import get_cache_manager
logger = logging.getLogger(__name__)
+# Module-level event loop for database operations
+_resolve_loop = None
+_db_initialized = False
+
+
def resolve_asset(ref: str, actor_id: Optional[str] = None) -> Optional[Path]:
"""
Resolve an asset reference (CID or friendly name) to a file path.
@@ -35,6 +40,7 @@ def resolve_asset(ref: str, actor_id: Optional[str] = None) -> Optional[Path]:
Returns:
Path to the asset file, or None if not found
"""
+ global _resolve_loop, _db_initialized
cache_mgr = get_cache_manager()
# Try as direct CID first
@@ -46,15 +52,22 @@ def resolve_asset(ref: str, actor_id: Optional[str] = None) -> Optional[Path]:
# Try as friendly name if actor_id provided
if actor_id:
import asyncio
+ import database
from database import resolve_friendly_name
try:
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
- try:
- cid = loop.run_until_complete(resolve_friendly_name(actor_id, ref))
- finally:
- loop.close()
+ # Reuse event loop for database operations
+ if _resolve_loop is None or _resolve_loop.is_closed():
+ _resolve_loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(_resolve_loop)
+ _db_initialized = False
+
+ # Initialize database pool once per loop
+ if not _db_initialized:
+ _resolve_loop.run_until_complete(database.init_db())
+ _db_initialized = True
+
+ cid = _resolve_loop.run_until_complete(resolve_friendly_name(actor_id, ref))
if cid:
path = cache_mgr.get_by_cid(cid)
@@ -173,6 +186,7 @@ def create_cid_primitives(actor_id: Optional[str] = None):
@app.task(bind=True, name='tasks.run_stream')
def run_stream(
self,
+ run_id: str,
recipe_sexp: str,
output_name: str = "output.mp4",
duration: Optional[float] = None,
@@ -185,6 +199,7 @@ def run_stream(
Execute a streaming S-expression recipe.
Args:
+ run_id: The run ID for database tracking
recipe_sexp: The recipe S-expression content
output_name: Name for the output file
duration: Optional duration override (seconds)
@@ -197,7 +212,7 @@ def run_stream(
Dict with output_cid, output_path, and status
"""
task_id = self.request.id
- logger.info(f"Starting stream task {task_id}")
+ logger.info(f"Starting stream task {task_id} for run {run_id}")
self.update_state(state='INITIALIZING', meta={'progress': 0})
@@ -237,8 +252,8 @@ def run_stream(
# Import the streaming interpreter
from streaming.stream_sexp_generic import StreamInterpreter
- # Create interpreter
- interp = StreamInterpreter(str(recipe_path))
+ # Create interpreter (pass actor_id for friendly name resolution)
+ interp = StreamInterpreter(str(recipe_path), actor_id=actor_id)
# Set primitive library directory explicitly
interp.primitive_lib_dir = sexp_effects_dir / "primitive_libs"
@@ -258,8 +273,17 @@ def run_stream(
logger.info(f"Rendering to {output_path}")
interp.run(duration=duration, output=str(output_path))
+ # Check for interpreter errors
+ if interp.errors:
+ error_msg = f"Rendering failed with {len(interp.errors)} errors: {interp.errors[0]}"
+ raise RuntimeError(error_msg)
+
self.update_state(state='CACHING', meta={'progress': 90})
+ # Validate output file (must be > 1KB to have actual frames)
+ if output_path.exists() and output_path.stat().st_size < 1024:
+ raise RuntimeError(f"Output file is too small ({output_path.stat().st_size} bytes) - rendering likely failed")
+
# Store output in cache
if output_path.exists():
cache_mgr = get_cache_manager()
@@ -271,16 +295,73 @@ def run_stream(
logger.info(f"Stream output cached: CID={cached_file.cid}, IPFS={ipfs_cid}")
+ # Save to database
+ import asyncio
+ import database
+
+            loop = asyncio.new_event_loop()
+            asyncio.set_event_loop(loop)
+            try:
+                # Initialize database pool if needed (NOTE(review): if the pool was created on resolve_asset's module-level loop, reusing it on this fresh loop may fail — verify)
+                if database.pool is None:
+                    loop.run_until_complete(database.init_db())
+
+ # Get recipe CID from pending_run
+ pending = loop.run_until_complete(database.get_pending_run(run_id))
+ recipe_cid = pending.get("recipe", "streaming") if pending else "streaming"
+
+ # Save to run_cache for completed runs
+ logger.info(f"Saving run {run_id} to run_cache with actor_id={actor_id}")
+ loop.run_until_complete(database.save_run_cache(
+ run_id=run_id,
+ output_cid=cached_file.cid,
+ recipe=recipe_cid,
+ inputs=[],
+ ipfs_cid=ipfs_cid,
+ actor_id=actor_id,
+ ))
+ # Update pending run status
+ loop.run_until_complete(database.update_pending_run_status(
+ run_id=run_id,
+ status="completed",
+ ))
+ logger.info(f"Saved run {run_id} to database with actor_id={actor_id}")
+ except Exception as db_err:
+ logger.warning(f"Failed to save run to database: {db_err}")
+ finally:
+ loop.close()
+
return {
"status": "completed",
+ "run_id": run_id,
"task_id": task_id,
"output_cid": cached_file.cid,
"ipfs_cid": ipfs_cid,
"output_path": str(cached_file.path),
}
else:
+ # Update pending run status to failed
+ import asyncio
+ import database
+
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+ try:
+ if database.pool is None:
+ loop.run_until_complete(database.init_db())
+ loop.run_until_complete(database.update_pending_run_status(
+ run_id=run_id,
+ status="failed",
+ error="Output file not created",
+ ))
+ except Exception as db_err:
+ logger.warning(f"Failed to update run status: {db_err}")
+ finally:
+ loop.close()
+
return {
"status": "failed",
+ "run_id": run_id,
"task_id": task_id,
"error": "Output file not created",
}
@@ -290,8 +371,28 @@ def run_stream(
import traceback
traceback.print_exc()
+ # Update pending run status to failed
+ import asyncio
+ import database
+
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+ try:
+ if database.pool is None:
+ loop.run_until_complete(database.init_db())
+ loop.run_until_complete(database.update_pending_run_status(
+ run_id=run_id,
+ status="failed",
+ error=str(e),
+ ))
+ except Exception as db_err:
+ logger.warning(f"Failed to update run status: {db_err}")
+ finally:
+ loop.close()
+
return {
"status": "failed",
+ "run_id": run_id,
"task_id": task_id,
"error": str(e),
}
diff --git a/templates/standard-effects.sexp b/templates/standard-effects.sexp
index 9e97f34..ce4a92f 100644
--- a/templates/standard-effects.sexp
+++ b/templates/standard-effects.sexp
@@ -14,9 +14,9 @@
;; Usage:
;; (include :path "../templates/standard-effects.sexp")
-(effect rotate :path "../sexp_effects/effects/rotate.sexp")
-(effect zoom :path "../sexp_effects/effects/zoom.sexp")
-(effect blend :path "../sexp_effects/effects/blend.sexp")
-(effect ripple :path "../sexp_effects/effects/ripple.sexp")
-(effect invert :path "../sexp_effects/effects/invert.sexp")
-(effect hue_shift :path "../sexp_effects/effects/hue_shift.sexp")
+(effect rotate :name "fx-rotate")
+(effect zoom :name "fx-zoom")
+(effect blend :name "fx-blend")
+(effect ripple :name "fx-ripple")
+(effect invert :name "fx-invert")
+(effect hue_shift :name "fx-hue-shift")