Files
giles bb458aa924 Replace batch DAG system with streaming architecture
- Remove legacy_tasks.py, hybrid_state.py, render.py
- Remove old task modules (analyze, execute, execute_sexp, orchestrate)
- Add streaming interpreter from test repo
- Add sexp_effects with primitives and video effects
- Add streaming Celery task with CID-based asset resolution
- Support both CID and friendly name references for assets
- Add .dockerignore to prevent local clones from conflicting

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 19:10:11 +00:00

389 lines
14 KiB
Python

"""
ASCII Art Primitives Library
ASCII art rendering with per-zone expression evaluation and cell effects.
"""
import numpy as np
import cv2
from PIL import Image, ImageDraw, ImageFont
from typing import Any, Dict, List, Optional, Callable
import colorsys
# Character sets
# Each value is a ramp of glyphs: index 0 is chosen for the lowest luminance
# and the last index for the highest (see _luminance_to_char).
CHAR_SETS = dict(
    standard=" .:-=+*#%@",
    blocks=" ░▒▓█",
    simple=" .:oO@",
    digits="0123456789",
    binary="01",
    ascii=" `.-':_,^=;><+!rc*/z?sLTv)J7(|Fi{C}fI31tlu[neoZ5Yxjya]2ESwqkP6h9d4VpOGbUAKXHm8RD#$Bg0MNWQ%&@",
)
# Default font
# Module-level slot for a default font; nothing in the visible code assigns it.
_default_font = None
# Path of the preferred monospace font.
_FONT_PATH = "/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf"

# Fonts already loaded, keyed by requested pixel size.  The renderer asks for
# a font once per cell, so without this the font file is re-read for every
# character drawn.
_font_cache = {}


def _get_font(size: int):
    """Get monospace font at given size.

    Tries DejaVu Sans Mono at ``size`` pixels; if that font cannot be loaded,
    falls back to Pillow's built-in default font.  The result is cached per
    size, so repeated calls return the same font object.

    Args:
        size: Requested font size in pixels.

    Returns:
        A Pillow font object usable with ``ImageDraw`` text calls.
    """
    font = _font_cache.get(size)
    if font is None:
        try:
            font = ImageFont.truetype(_FONT_PATH, size)
        except (OSError, ImportError, ValueError):
            # OSError: font file missing/unreadable; ImportError: Pillow built
            # without FreeType; ValueError: size rejected by Pillow.  Anything
            # else (e.g. KeyboardInterrupt) is deliberately not swallowed.
            font = ImageFont.load_default()
        _font_cache[size] = font
    return font
# Supported colour names (looked up case-insensitively).
_NAMED_COLORS = {
    'black': (0, 0, 0), 'white': (255, 255, 255),
    'red': (255, 0, 0), 'green': (0, 255, 0), 'blue': (0, 0, 255),
    'yellow': (255, 255, 0), 'cyan': (0, 255, 255), 'magenta': (255, 0, 255),
    'gray': (128, 128, 128), 'grey': (128, 128, 128),
}

# int(x, 16) also accepts signs, underscores and whitespace, so hex digits are
# validated explicitly before conversion.
_HEX_DIGITS = frozenset("0123456789abcdefABCDEF")


def _parse_color(color_str: str) -> tuple:
    """Parse color string to RGB tuple.

    Accepts ``#RGB`` and ``#RRGGBB`` hex notation (any digits after the first
    six, such as an alpha pair, are ignored) or one of the names in
    ``_NAMED_COLORS``.  Leading/trailing whitespace is ignored.

    Args:
        color_str: Hex color or color name.

    Returns:
        ``(r, g, b)`` with each component in 0..255.  Unknown names and
        malformed hex strings both yield black ``(0, 0, 0)``.
    """
    text = color_str.strip()
    if text.startswith('#'):
        digits = text[1:]
        if len(digits) == 3:
            # Shorthand: each digit is doubled (#abc -> #aabbcc).
            digits = ''.join(ch * 2 for ch in digits)
        digits = digits[:6]
        if len(digits) != 6 or not _HEX_DIGITS.issuperset(digits):
            # Same fallback as an unknown color name.
            return (0, 0, 0)
        return tuple(int(digits[i:i + 2], 16) for i in (0, 2, 4))
    return _NAMED_COLORS.get(text.lower(), (0, 0, 0))
def _cell_sample(frame: np.ndarray, cell_size: int):
    """Sample frame into cells, returning colors and luminances.

    The frame is cropped to a whole number of ``cell_size`` x ``cell_size``
    cells and block-averaged with ``cv2.resize(..., INTER_AREA)``, which the
    original author chose for speed over a numpy reshape+mean.

    A frame smaller than one cell in either direction is treated as a single
    (partial) cell along that axis, so the returned grid is never empty.

    Args:
        frame: Image array indexed ``[y, x, channel]``.  Channel 0/1/2 are
            weighted as red/green/blue for luminance, and the ``/ 255``
            scaling assumes 8-bit values.
        cell_size: Edge length of one cell in pixels; must be >= 1.

    Returns:
        ``(colors, luminances)``: ``colors`` is the frame averaged down to one
        pixel per cell, ``luminances`` is a float32 array of the same grid
        size holding the weighted channel sum divided by 255.

    Raises:
        ValueError: If ``cell_size`` is less than 1.
    """
    if cell_size < 1:
        raise ValueError(f"cell_size must be a positive integer, got {cell_size}")
    h, w = frame.shape[:2]
    # Clamp so an undersized frame still produces a 1-cell-wide/high grid
    # instead of asking cv2.resize for a zero-sized output.
    rows = max(1, h // cell_size)
    cols = max(1, w // cell_size)
    # Crop to exact grid then block-average via cv2 area interpolation.
    # (Slicing past the frame edge simply clips, which covers the clamped case.)
    cropped = frame[:rows * cell_size, :cols * cell_size]
    colors = cv2.resize(cropped, (cols, rows), interpolation=cv2.INTER_AREA)
    luminances = ((0.299 * colors[:, :, 0] +
                   0.587 * colors[:, :, 1] +
                   0.114 * colors[:, :, 2]) / 255.0).astype(np.float32)
    return colors, luminances
def _luminance_to_char(lum: float, alphabet: str, contrast: float) -> str:
    """Map luminance to character.

    Args:
        lum: Luminance, nominally in 0..1.
        alphabet: Key into ``CHAR_SETS``; if it is not a known key the string
            itself is used as the character ramp.
        contrast: Gain applied around the 0.5 midpoint before the lookup.

    Returns:
        One character from the ramp: index 0 for the lowest adjusted
        luminance, the last index for the highest.  An empty ramp yields a
        single space rather than raising ``IndexError``.
    """
    chars = CHAR_SETS.get(alphabet, alphabet)
    if not chars:
        # Nothing to choose from (e.g. alphabet=""): render a blank cell.
        return " "
    adjusted = (lum - 0.5) * contrast + 0.5
    # Clamp to 0..1 so the index always lands inside the ramp.
    clamped = max(0, min(1, adjusted))
    return chars[int(clamped * (len(chars) - 1))]
def _render_char_cell(char: str, cell_size: int, color: tuple, bg_color: tuple) -> np.ndarray:
    """Render a single character to a cell image.

    Args:
        char: Text to draw (a single character in this module's usage).
        cell_size: Width and height of the square cell in pixels; also used
            as the font size.
        color: Fill color for the glyph.
        bg_color: Fill color for the cell background.

    Returns:
        The rendered cell as an array built from a ``cell_size`` x
        ``cell_size`` RGB Pillow image.
    """
    img = Image.new('RGB', (cell_size, cell_size), bg_color)
    draw = ImageDraw.Draw(img)
    font = _get_font(cell_size)
    # Center the character.  textbbox reports where the ink lands when the
    # text is drawn at (0, 0); the ink usually does not start at the origin,
    # so both offsets must be subtracted to centre the visible glyph.
    left, top, right, bottom = draw.textbbox((0, 0), char, font=font)
    text_w = right - left
    text_h = bottom - top
    x = (cell_size - text_w) // 2 - left
    y = (cell_size - text_h) // 2 - top
    draw.text((x, y), char, fill=color, font=font)
    return np.array(img)
def prim_ascii_fx_zone(
    frame: np.ndarray,
    cols: int = 80,
    char_size: int = None,
    alphabet: str = "standard",
    color_mode: str = "color",
    background: str = "black",
    contrast: float = 1.5,
    char_hue = None,
    char_saturation = None,
    char_brightness = None,
    char_scale = None,
    char_rotation = None,
    char_jitter = None,
    cell_effect = None,
    energy: float = None,
    rotation_scale: float = 0,
    _interp = None,
    _env = None,
    **extra_params
) -> np.ndarray:
    """
    Render frame as ASCII art with per-zone effects.

    The frame is divided into square cells; each cell is replaced by one
    character chosen from its average luminance, optionally post-processed by
    ``cell_effect``, and the assembled grid is resized back to the frame size.

    Args:
        frame: Input image
        cols: Number of character columns (coerced to int, minimum 1)
        char_size: Cell size in pixels (overrides cols if set and non-zero)
        alphabet: Character set name or custom string
        color_mode: "color", "mono", "invert", or color name/hex
        background: Background color name or hex
        contrast: Contrast for character selection
        char_hue/saturation/brightness/scale/rotation/jitter: Per-zone
            expressions.  Accepted for interface compatibility; this function
            body does not read them.
        cell_effect: Lambda (cell, zone) -> cell for per-cell effects.  Only
            applied when ``_interp`` is also provided.
        energy: Energy value from audio analysis (added to the zone dict when
            not None)
        rotation_scale: Max rotation degrees (added to the zone dict)
        _interp: Interpreter (auto-injected)
        _env: Environment (auto-injected)
        **extra_params: Additional params passed to zone dict; only int,
            float, str, bool and None values are copied.

    Returns:
        A uint8 image with 3 channels and the same height/width as ``frame``.
    """
    h, w = frame.shape[:2]

    # Calculate cell size (never below 4 px).  cols is coerced the same way
    # char_size is: a float from the caller would otherwise yield a float
    # cell size and break slicing, and cols=0 would divide by zero.
    if char_size is None or char_size == 0:
        cell_size = max(4, w // max(1, int(cols)))
    else:
        cell_size = max(4, int(char_size))

    # Sample cells
    colors, luminances = _cell_sample(frame, cell_size)
    rows, cols_actual = luminances.shape

    # Parse background color
    bg_color = _parse_color(background)

    # Create output image, pre-filled with the background
    out_h = rows * cell_size
    out_w = cols_actual * cell_size
    output = np.full((out_h, out_w, 3), bg_color, dtype=np.uint8)

    # ---- Loop-invariant setup -------------------------------------------
    # cell_effect needs the interpreter; without it the effect is skipped.
    apply_effect = cell_effect is not None and _interp is not None

    # Glyph colour that does not depend on the cell (None = per-cell colour).
    if color_mode == "mono":
        fixed_color = (255, 255, 255)
    elif color_mode in ("color", "invert"):
        fixed_color = None
    else:
        fixed_color = _parse_color(color_mode)

    # Zone entries that are identical for every cell.  They are merged after
    # the sampled values, so an extra param with the same key overrides it.
    shared_zone = {}
    if energy is not None:
        shared_zone['energy'] = energy
    shared_zone['rotation_scale'] = rotation_scale
    for k, v in extra_params.items():
        if isinstance(v, (int, float, str, bool)) or v is None:
            shared_zone[k] = v

    row_span = max(1, rows - 1)
    col_span = max(1, cols_actual - 1)

    # ---- Process each cell ----------------------------------------------
    for r in range(rows):
        y1, y2 = r * cell_size, (r + 1) * cell_size
        for c in range(cols_actual):
            lum = luminances[r, c]
            cell_color = tuple(colors[r, c])
            r_f, g_f, b_f = cell_color[0] / 255, cell_color[1] / 255, cell_color[2] / 255

            # Build zone context
            zone = {
                'row': r,
                'col': c,
                'row-norm': r / row_span,
                'col-norm': c / col_span,
                'lum': float(lum),
                'r': r_f,
                'g': g_f,
                'b': b_f,
                'cell_size': cell_size,
            }

            # Add HSV (hue in degrees, saturation 0..1)
            hsv = colorsys.rgb_to_hsv(r_f, g_f, b_f)
            zone['hue'] = hsv[0] * 360
            zone['sat'] = hsv[1]

            # Add energy, rotation_scale and extra params
            zone.update(shared_zone)

            # Get character
            char = _luminance_to_char(lum, alphabet, contrast)
            zone['char'] = char

            # Determine cell color based on mode
            if fixed_color is not None:
                render_color = fixed_color
            elif color_mode == "invert":
                render_color = tuple(255 - v for v in cell_color)
            else:
                render_color = cell_color
            zone['color'] = render_color

            # Render character to cell
            cell_img = _render_char_cell(char, cell_size, render_color, bg_color)

            # Apply cell_effect if provided
            if apply_effect:
                cell_img = _apply_cell_effect(cell_img, zone, cell_effect, _interp, _env, extra_params)

            # Paste cell to output
            x1, x2 = c * cell_size, (c + 1) * cell_size
            output[y1:y2, x1:x2] = cell_img

    # Resize to match input dimensions (the grid drops partial edge cells)
    if output.shape[:2] != frame.shape[:2]:
        output = cv2.resize(output, (w, h), interpolation=cv2.INTER_LINEAR)
    return output
def _apply_cell_effect(cell_img, zone, cell_effect, interp, env, extra_params):
    """Run ``cell_effect`` on one cell image and return the resulting image.

    A child environment of ``env`` is created and populated with every zone
    entry (plain and ``zone-`` prefixed), one wrapper function per effect
    loaded on the interpreter, plus ``cell`` and ``zone``.  The effect is then
    evaluated in one of three ways:

    * an object with ``params`` and ``body``: the first two params are bound
      to the cell image and the zone dict, then the body is evaluated;
    * a list: ``(lambda [...] body)`` / ``(fn [...] body)`` forms are bound
      and evaluated the same way, any other list is evaluated as-is;
    * any other callable: called as ``cell_effect(cell_img, zone)``.

    Args:
        cell_img: Rendered cell (numpy array).
        zone: Per-cell context dict.
        cell_effect: Lambda object, S-expression list, or callable.
        interp: Interpreter providing ``eval`` and ``run_effect``; its
            ``effects`` attribute, if present, is iterated for effect names.
        env: Parent environment; its type is instantiated for the child.
        extra_params: Not read by this function.

    Returns:
        A numpy array with the same shape as ``cell_img`` (resized with
        ``cv2.resize`` when the effect returned a different shape).

    Raises:
        ValueError: If ``cell_effect`` is of an unsupported kind, or the
            effect result is not a numpy array.
    """
    def symbol_name(node):
        # Symbols expose .name; anything else is stringified.
        return node.name if hasattr(node, 'name') else str(node)

    def bind_cell_and_zone(declared, to_key):
        # First declared parameter receives the cell, second the zone dict;
        # missing parameters are simply left unbound.
        for position, value in enumerate((cell_img, zone)):
            if len(declared) > position:
                scope.set(to_key(declared[position]), value)

    # Child scope built from the same class as the parent environment.
    scope = type(env)(env)

    # Zone values under their own keys ...
    for key, value in zone.items():
        scope.set(key, value)
    # ... and again under zone- prefixed aliases, defaulting to 0.
    for key in ('row', 'col', 'row-norm', 'col-norm', 'lum',
                'sat', 'hue', 'r', 'g', 'b'):
        scope.set('zone-' + key, zone.get(key, 0))

    # Positional-argument names understood by each known effect.  Effects not
    # listed here (including 'invert') are run with no parameters.
    positional = {
        'blur': ('radius',),
        'rotate': ('angle',),
        'brightness': ('amount',),
        'contrast': ('amount',),
        'saturation': ('amount',),
        'hue_shift': ('degrees',),
        'rgb_split': ('offset_x', 'offset_y'),
        'pixelate': ('size',),
    }

    def wrap_effect(name):
        arg_names = positional.get(name, ())

        def effect_fn(frame, *args):
            # Parameters are only filled in when enough positionals were given.
            params = dict(zip(arg_names, args)) if len(args) >= len(arg_names) else {}
            result, _ = interp.run_effect(name, frame, params, {})
            return result

        return effect_fn

    # Inject loaded effects as callable functions
    if hasattr(interp, 'effects'):
        for effect_name in interp.effects:
            scope.set(effect_name, wrap_effect(effect_name))

    # Bind cell image and zone dict
    scope.set('cell', cell_img)
    scope.set('zone', zone)

    # Evaluate the effect according to what kind of object it is.
    if hasattr(cell_effect, 'params') and hasattr(cell_effect, 'body'):
        # Lambda object: (lambda [cell zone] body)
        bind_cell_and_zone(cell_effect.params, lambda key: key)
        result = interp.eval(cell_effect.body, scope)
    elif isinstance(cell_effect, list):
        # Raw S-expression, possibly (lambda [...] body) or (fn [...] body)
        head = cell_effect[0] if cell_effect else None
        head_name = symbol_name(head) if head else None
        if head_name in ('lambda', 'fn'):
            params = cell_effect[1] if len(cell_effect) > 1 else []
            body = cell_effect[2] if len(cell_effect) > 2 else None
            if isinstance(params, list):
                bind_cell_and_zone(params, symbol_name)
            # A missing/falsy body leaves the cell untouched.
            result = interp.eval(body, scope) if body else cell_img
        else:
            # Some other expression - just evaluate it
            result = interp.eval(cell_effect, scope)
    elif callable(cell_effect):
        result = cell_effect(cell_img, zone)
    else:
        raise ValueError(f"cell_effect must be a Lambda, list, or callable, got {type(cell_effect)}")

    # Validate / normalise the result.
    if not isinstance(result, np.ndarray):
        raise ValueError(f"cell_effect must return an image array, got {type(result)}")
    if result.shape != cell_img.shape:
        # Shape mismatch - resize to fit (cv2 takes (width, height))
        result = cv2.resize(result, (cell_img.shape[1], cell_img.shape[0]))
    return result
def _get_legacy_ascii_primitives():
    """Return the legacy ASCII primitives keyed by their primitive names.

    The import from ``sexp_effects.primitives`` is done inside the function
    (lazily) to avoid import issues during module loading; the original
    author notes that by the time a primitive library is loaded,
    ``sexp_effects.primitives`` is already in ``sys.modules``.

    Returns:
        dict mapping primitive name (e.g. ``'cell-sample'``) to the function
        imported from the legacy module.
    """
    from sexp_effects.primitives import (
        prim_cell_sample,
        prim_luminance_to_chars,
        prim_render_char_grid,
        prim_render_char_grid_fx,
        prim_alphabet_char,
        prim_alphabet_length,
        prim_map_char_grid,
        prim_map_colors,
        prim_make_char_grid,
        prim_set_char,
        prim_get_char,
        prim_char_grid_dimensions,
        cell_sample_extended,
    )

    # (primitive name, implementation) pairs, in registration order.
    exported = (
        ('cell-sample', prim_cell_sample),
        ('cell-sample-extended', cell_sample_extended),
        ('luminance-to-chars', prim_luminance_to_chars),
        ('render-char-grid', prim_render_char_grid),
        ('render-char-grid-fx', prim_render_char_grid_fx),
        ('alphabet-char', prim_alphabet_char),
        ('alphabet-length', prim_alphabet_length),
        ('map-char-grid', prim_map_char_grid),
        ('map-colors', prim_map_colors),
        ('make-char-grid', prim_make_char_grid),
        ('set-char', prim_set_char),
        ('get-char', prim_get_char),
        ('char-grid-dimensions', prim_char_grid_dimensions),
    )
    return dict(exported)
# Primitive table exported by this library: the zone renderer defined in this
# module first, then the legacy primitives (which win on a key clash).
PRIMITIVES = {'ascii-fx-zone': prim_ascii_fx_zone}
PRIMITIVES.update(_get_legacy_ascii_primitives())