Phase 1: s-expression core library + test infrastructure
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m9s

S-expression parser, evaluator, and primitive registry in shared/sexp/.
109 unit tests covering parsing, evaluation, special forms, lambdas,
closures, components (defcomp), and 60+ pure builtins.

Test infrastructure: Dockerfile.unit (tier 1, fast) and
Dockerfile.integration (tier 2, ffmpeg). Dev watch mode auto-reruns
on file changes. Deploy gate blocks push on test failure.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-27 13:26:18 +00:00
parent 996ddad2ea
commit 0fb87e3b1c
15 changed files with 2293 additions and 0 deletions

View File

@@ -44,6 +44,17 @@ fi
echo "Building: ${BUILD[*]}"
echo ""
# --- Run unit tests before deploying ---
echo "=== Running unit tests ==="
docker build -f test/Dockerfile.unit -t rose-ash-test-unit:latest . -q
if ! docker run --rm rose-ash-test-unit:latest; then
echo ""
echo "Unit tests FAILED — aborting deploy."
exit 1
fi
echo "Unit tests passed."
echo ""
for app in "${BUILD[@]}"; do
echo "=== $app ==="
docker build -f "$app/Dockerfile" -t "$REGISTRY/$app:latest" .

18
dev.sh
View File

@@ -20,6 +20,24 @@ case "${1:-up}" in
shift
$COMPOSE logs -f "$@"
;;
test)
# One-shot: all unit tests
$COMPOSE run --rm test-unit python -m pytest \
shared/ artdag/core/tests/ artdag/core/artdag/sexp/ \
artdag/l1/tests/ artdag/l1/sexp_effects/ \
-v --tb=short \
--ignore=artdag/l1/tests/test_jax_primitives.py \
--ignore=artdag/l1/tests/test_jax_pipeline_integration.py \
-k "not gpu and not cuda"
;;
test-integration)
# One-shot: integration tests (needs ffmpeg, heavier)
$COMPOSE run --rm test-integration
;;
watch)
# Auto-rerun unit tests on file changes (stays running)
$COMPOSE up test-unit
;;
build)
shift
if [[ $# -eq 0 ]]; then

View File

@@ -335,6 +335,41 @@ services:
- ./account/__init__.py:/app/account/__init__.py:ro
- ./account/models:/app/account/models:ro
test-unit:
build:
context: .
dockerfile: test/Dockerfile.unit
volumes:
- ./shared:/app/shared
- ./artdag/core:/app/artdag/core
- ./artdag/l1/tests:/app/artdag/l1/tests
- ./artdag/l1/sexp_effects:/app/artdag/l1/sexp_effects
- ./artdag/l1/app:/app/artdag/l1/app
entrypoint: >
python -m pytest_watch
--runner "python -m pytest -v --tb=short"
--
shared/
artdag/core/tests/
artdag/core/artdag/sexp/
artdag/l1/tests/
artdag/l1/sexp_effects/
--ignore=artdag/l1/tests/test_jax_primitives.py
--ignore=artdag/l1/tests/test_jax_pipeline_integration.py
-k "not gpu and not cuda"
profiles:
- test
test-integration:
build:
context: .
dockerfile: test/Dockerfile.integration
volumes:
- ./shared:/app/shared
- ./artdag:/app/artdag
profiles:
- test
networks:
appnet:
driver: bridge

View File

@@ -598,3 +598,35 @@ rose-ash/
**Intelligence** (Phases 10-11): Federation makes s-expressions portable across instances. The LLM makes s-expressions accessible to non-programmers — natural language in, rendered pages out. The system learns from its own data, continuously improving the quality of generated s-expressions.
Each phase is independently deployable. The end state: a platform where the application logic is expressed in a small, composable, content-addressed language that humans author, LLMs generate, resolvers execute, IPFS stores, and ActivityPub federates.
---
## Progress Log
### Phase 1: S-Expression Core Library — COMPLETE
**Branch:** `sexpression`
**Delivered** (`shared/sexp/`):
- `types.py` — Symbol, Keyword, Lambda (callable closure), Component (defcomp), NIL singleton
- `parser.py` — Tokenizer + parse/parse_all/serialize. Supports lists, vectors, maps, symbols (~component, <>fragment), keywords, strings, numbers, comments, &key/&rest
- `env.py` — Lexical environment with parent-chain scoping
- `evaluator.py` — Full evaluator with special forms (if, when, cond, case, and, or, let/let*, lambda/fn, define, defcomp, begin/do, quote, ->, set!) and higher-order forms (map, map-indexed, filter, reduce, some, every?, for-each)
- `primitives.py` — 60+ pure builtins: arithmetic, comparison, predicates, strings (str, concat, upper, lower, join, split, starts-with?, ends-with?), collections (list, dict, get, first, last, rest, nth, cons, append, keys, vals, merge, assoc, dissoc, into, range, chunk-every, zip-pairs)
- `__init__.py` — Public API
**Tests** (`shared/sexp/tests/`):
- `test_parser.py` — 28 tests (atoms, lists, maps, vectors, comments, errors, serialization, roundtrip)
- `test_evaluator.py` — 81 tests (literals, arithmetic, comparison, predicates, special forms, lambda/closures, collections, higher-order, strings, defcomp, dict literals, set!)
- **109 tests, all passing**
**Source material ported from:** `artdag/core/artdag/sexp/parser.py` and `evaluator.py`. Stripped DAG-specific types (Binding), replaced Lambda dataclass with callable closure, added defcomp/Component, added web-oriented string primitives, added &key/&rest support in parser.
### Test Infrastructure — COMPLETE
**Delivered:**
- `test/Dockerfile.unit` — Tier 1: all unit tests (shared + artdag core + L1), pure Python, fast
- `test/Dockerfile.integration` — Tier 2: integration tests needing ffmpeg/media pipeline
- `docker-compose.dev.yml``test-unit` (watch mode) and `test-integration` services, `profiles: [test]`
- `dev.sh``./dev.sh watch` (auto-rerun on save), `./dev.sh test` (one-shot), `./dev.sh test-integration`
- `deploy.sh` — Unit test gate: tests must pass before any images are pushed

66
shared/sexp/__init__.py Normal file
View File

@@ -0,0 +1,66 @@
"""
S-expression language core.
Parse, evaluate, and serialize s-expressions. This package provides the
foundation for the composable fragment architecture described in
``docs/sexp-architecture-plan.md``.
Quick start::
from shared.sexp import parse, evaluate, serialize, Symbol, Keyword
expr = parse('(let ((x 10)) (+ x 1))')
result = evaluate(expr) # → 11
expr2 = parse('(map (fn (n) (* n n)) (list 1 2 3))')
result2 = evaluate(expr2) # → [1, 4, 9]
"""
from .types import (
NIL,
Component,
Keyword,
Lambda,
Symbol,
)
from .parser import (
ParseError,
parse,
parse_all,
serialize,
)
from .evaluator import (
EvalError,
evaluate,
make_env,
)
from .primitives import (
all_primitives,
get_primitive,
register_primitive,
)
from .env import Env
__all__ = [
# Types
"Symbol",
"Keyword",
"Lambda",
"Component",
"NIL",
# Parser
"parse",
"parse_all",
"serialize",
"ParseError",
# Evaluator
"evaluate",
"make_env",
"EvalError",
# Primitives
"register_primitive",
"get_primitive",
"all_primitives",
# Environment
"Env",
]

97
shared/sexp/env.py Normal file
View File

@@ -0,0 +1,97 @@
"""
Lexical environment for s-expression evaluation.
Environments form a parent chain so inner scopes shadow outer ones
while still allowing lookup of free variables.
"""
from __future__ import annotations
from typing import Any
class Env:
"""A lexical scope mapping names → values with an optional parent."""
__slots__ = ("_bindings", "_parent")
def __init__(
self,
bindings: dict[str, Any] | None = None,
parent: Env | None = None,
):
self._bindings: dict[str, Any] = bindings or {}
self._parent = parent
# -- lookup -------------------------------------------------------------
def lookup(self, name: str) -> Any:
"""Resolve *name*, walking the parent chain.
Raises ``KeyError`` if not found.
"""
if name in self._bindings:
return self._bindings[name]
if self._parent is not None:
return self._parent.lookup(name)
raise KeyError(name)
def __contains__(self, name: str) -> bool:
if name in self._bindings:
return True
if self._parent is not None:
return name in self._parent
return False
def __getitem__(self, name: str) -> Any:
return self.lookup(name)
def get(self, name: str, default: Any = None) -> Any:
try:
return self.lookup(name)
except KeyError:
return default
# -- mutation -----------------------------------------------------------
def define(self, name: str, value: Any) -> None:
"""Bind *name* in the **current** scope."""
self._bindings[name] = value
def set(self, name: str, value: Any) -> None:
"""Update *name* in the **nearest enclosing** scope that contains it.
Raises ``KeyError`` if the name is not bound anywhere.
"""
if name in self._bindings:
self._bindings[name] = value
elif self._parent is not None:
self._parent.set(name, value)
else:
raise KeyError(f"Cannot set! undefined variable: {name}")
# -- construction -------------------------------------------------------
def extend(self, bindings: dict[str, Any] | None = None) -> Env:
"""Return a child environment."""
return Env(bindings or {}, parent=self)
# -- conversion ---------------------------------------------------------
def to_dict(self) -> dict[str, Any]:
"""Flatten the full chain into a single dict (parent first)."""
if self._parent is not None:
d = self._parent.to_dict()
else:
d = {}
d.update(self._bindings)
return d
def __repr__(self) -> str:
keys = list(self._bindings.keys())
depth = 0
p = self._parent
while p:
depth += 1
p = p._parent
return f"<Env depth={depth} keys={keys}>"

549
shared/sexp/evaluator.py Normal file
View File

@@ -0,0 +1,549 @@
"""
S-expression evaluator.
Walks a parsed s-expression tree and evaluates it in an environment.
Special forms:
(if cond then else?)
(when cond body)
(cond clause...) — Scheme-style ((test body)...) or Clojure-style (test body...)
(case expr val body... :else default)
(and expr...) (or expr...)
(let ((name val)...) body) or (let (name val name val...) body)
(lambda (params...) body) or (fn (params...) body)
(define name value)
(defcomp ~name (&key param...) body)
(begin expr...)
(quote expr)
(do expr...) — alias for begin
(-> val form...) — thread-first macro
Higher-order forms (operate on lambdas):
(map fn coll)
(map-indexed fn coll)
(filter fn coll)
(reduce fn init coll)
(some fn coll)
(every? fn coll)
(for-each fn coll)
"""
from __future__ import annotations
from typing import Any
from .types import Component, Keyword, Lambda, NIL, Symbol
from .primitives import _PRIMITIVES
class EvalError(Exception):
"""Error during expression evaluation."""
pass
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def evaluate(expr: Any, env: dict[str, Any] | None = None) -> Any:
"""Evaluate *expr* in *env* and return the result."""
if env is None:
env = {}
return _eval(expr, env)
def make_env(**kwargs: Any) -> dict[str, Any]:
"""Convenience: create an environment dict with initial bindings."""
return dict(kwargs)
# ---------------------------------------------------------------------------
# Internal evaluator
# ---------------------------------------------------------------------------
def _eval(expr: Any, env: dict[str, Any]) -> Any:
# --- literals ---------------------------------------------------------
if isinstance(expr, (int, float, str, bool)):
return expr
if expr is None or expr is NIL:
return NIL
# --- symbol lookup ----------------------------------------------------
if isinstance(expr, Symbol):
name = expr.name
if name in env:
return env[name]
if name in _PRIMITIVES:
return _PRIMITIVES[name]
if name == "true":
return True
if name == "false":
return False
if name == "nil":
return NIL
raise EvalError(f"Undefined symbol: {name}")
# --- keyword → its string name ----------------------------------------
if isinstance(expr, Keyword):
return expr.name
# --- dict literal -----------------------------------------------------
if isinstance(expr, dict):
return {k: _eval(v, env) for k, v in expr.items()}
# --- list = call or special form --------------------------------------
if not isinstance(expr, list):
return expr
if not expr:
return []
head = expr[0]
# If head is not a symbol/lambda/list, treat entire list as data
if not isinstance(head, (Symbol, Lambda, list)):
return [_eval(x, env) for x in expr]
# --- special forms ----------------------------------------------------
if isinstance(head, Symbol):
name = head.name
handler = _SPECIAL_FORMS.get(name)
if handler is not None:
return handler(expr, env)
# Higher-order forms (need lazy eval of lambda arg)
ho = _HO_FORMS.get(name)
if ho is not None:
return ho(expr, env)
# --- function / lambda call -------------------------------------------
fn = _eval(head, env)
args = [_eval(a, env) for a in expr[1:]]
if callable(fn) and not isinstance(fn, (Lambda, Component)):
return fn(*args)
if isinstance(fn, Lambda):
return _call_lambda(fn, args, env)
if isinstance(fn, Component):
return _call_component(fn, expr[1:], env)
raise EvalError(f"Not callable: {fn!r}")
# ---------------------------------------------------------------------------
# Lambda / component invocation
# ---------------------------------------------------------------------------
def _call_lambda(fn: Lambda, args: list[Any], caller_env: dict[str, Any]) -> Any:
if len(args) != len(fn.params):
raise EvalError(f"{fn!r} expects {len(fn.params)} args, got {len(args)}")
local = dict(fn.closure)
local.update(caller_env)
for p, v in zip(fn.params, args):
local[p] = v
return _eval(fn.body, local)
def _call_component(comp: Component, raw_args: list[Any], env: dict[str, Any]) -> Any:
"""Evaluate a component invocation with keyword arguments.
``(~card :title "Hello" (p "child"))``
→ comp.params gets ``title="Hello"``, comp children gets ``[(p "child")]``
"""
kwargs: dict[str, Any] = {}
children: list[Any] = []
i = 0
while i < len(raw_args):
arg = raw_args[i]
if isinstance(arg, Keyword) and i + 1 < len(raw_args):
kwargs[arg.name] = _eval(raw_args[i + 1], env)
i += 2
else:
children.append(_eval(arg, env))
i += 1
local = dict(comp.closure)
local.update(env)
for p in comp.params:
if p in kwargs:
local[p] = kwargs[p]
else:
local[p] = NIL
if comp.has_children:
local["children"] = children
return _eval(comp.body, local)
# ---------------------------------------------------------------------------
# Special forms
# ---------------------------------------------------------------------------
def _sf_if(expr: list, env: dict) -> Any:
if len(expr) < 3:
raise EvalError("if requires condition and then-branch")
cond = _eval(expr[1], env)
if cond and cond is not NIL:
return _eval(expr[2], env)
if len(expr) > 3:
return _eval(expr[3], env)
return NIL
def _sf_when(expr: list, env: dict) -> Any:
if len(expr) < 3:
raise EvalError("when requires condition and body")
cond = _eval(expr[1], env)
if cond and cond is not NIL:
result = NIL
for body_expr in expr[2:]:
result = _eval(body_expr, env)
return result
return NIL
def _sf_cond(expr: list, env: dict) -> Any:
clauses = expr[1:]
if not clauses:
return NIL
# Detect scheme-style: first clause is a 2-element list that isn't a comparison
if (
isinstance(clauses[0], list)
and len(clauses[0]) == 2
and not (isinstance(clauses[0][0], Symbol) and clauses[0][0].name in (
"=", "<", ">", "<=", ">=", "!=", "and", "or",
))
):
for clause in clauses:
if not isinstance(clause, list) or len(clause) < 2:
raise EvalError("cond clause must be (test result)")
test = clause[0]
if isinstance(test, Symbol) and test.name in ("else", ":else"):
return _eval(clause[1], env)
if isinstance(test, Keyword) and test.name == "else":
return _eval(clause[1], env)
if _eval(test, env):
return _eval(clause[1], env)
else:
i = 0
while i < len(clauses) - 1:
test = clauses[i]
result = clauses[i + 1]
if isinstance(test, Keyword) and test.name == "else":
return _eval(result, env)
if isinstance(test, Symbol) and test.name in (":else", "else"):
return _eval(result, env)
if _eval(test, env):
return _eval(result, env)
i += 2
return NIL
def _sf_case(expr: list, env: dict) -> Any:
if len(expr) < 2:
raise EvalError("case requires expression to match")
match_val = _eval(expr[1], env)
clauses = expr[2:]
i = 0
while i < len(clauses) - 1:
test = clauses[i]
result = clauses[i + 1]
if isinstance(test, Keyword) and test.name == "else":
return _eval(result, env)
if isinstance(test, Symbol) and test.name in (":else", "else"):
return _eval(result, env)
if match_val == _eval(test, env):
return _eval(result, env)
i += 2
return NIL
def _sf_and(expr: list, env: dict) -> Any:
result: Any = True
for arg in expr[1:]:
result = _eval(arg, env)
if not result:
return result
return result
def _sf_or(expr: list, env: dict) -> Any:
result: Any = False
for arg in expr[1:]:
result = _eval(arg, env)
if result:
return result
return result
def _sf_let(expr: list, env: dict) -> Any:
if len(expr) < 3:
raise EvalError("let requires bindings and body")
bindings = expr[1]
local = dict(env)
if isinstance(bindings, list):
if bindings and isinstance(bindings[0], list):
# Scheme-style: ((name val) ...)
for binding in bindings:
if len(binding) != 2:
raise EvalError("let binding must be (name value)")
var = binding[0]
vname = var.name if isinstance(var, Symbol) else var
local[vname] = _eval(binding[1], local)
elif len(bindings) % 2 == 0:
# Clojure-style: (name val name val ...)
for i in range(0, len(bindings), 2):
var = bindings[i]
vname = var.name if isinstance(var, Symbol) else var
local[vname] = _eval(bindings[i + 1], local)
else:
raise EvalError("let bindings must be (name val ...) pairs")
else:
raise EvalError("let bindings must be a list")
# Evaluate body expressions, return last
result: Any = NIL
for body_expr in expr[2:]:
result = _eval(body_expr, local)
return result
def _sf_lambda(expr: list, env: dict) -> Lambda:
if len(expr) < 3:
raise EvalError("lambda requires params and body")
params_expr = expr[1]
if not isinstance(params_expr, list):
raise EvalError("lambda params must be a list")
param_names = []
for p in params_expr:
if isinstance(p, Symbol):
param_names.append(p.name)
elif isinstance(p, str):
param_names.append(p)
else:
raise EvalError(f"Invalid lambda param: {p}")
return Lambda(param_names, expr[2], dict(env))
def _sf_define(expr: list, env: dict) -> Any:
if len(expr) < 3:
raise EvalError("define requires name and value")
name_sym = expr[1]
if not isinstance(name_sym, Symbol):
raise EvalError(f"define name must be symbol, got {type(name_sym).__name__}")
value = _eval(expr[2], env)
if isinstance(value, Lambda) and value.name is None:
value.name = name_sym.name
env[name_sym.name] = value
return value
def _sf_defcomp(expr: list, env: dict) -> Component:
"""``(defcomp ~name (&key param1 param2 &rest children) body)``"""
if len(expr) < 4:
raise EvalError("defcomp requires name, params, and body")
name_sym = expr[1]
if not isinstance(name_sym, Symbol):
raise EvalError(f"defcomp name must be symbol, got {type(name_sym).__name__}")
comp_name = name_sym.name.lstrip("~")
params_expr = expr[2]
if not isinstance(params_expr, list):
raise EvalError("defcomp params must be a list")
params: list[str] = []
has_children = False
in_key = False
for p in params_expr:
if isinstance(p, Symbol):
if p.name == "&key":
in_key = True
continue
if p.name == "&rest":
has_children = True
continue
if in_key or has_children:
if not has_children:
params.append(p.name)
else:
params.append(p.name)
# Skip children param name after &rest
elif isinstance(p, str):
params.append(p)
comp = Component(
name=comp_name,
params=params,
has_children=has_children,
body=expr[3],
closure=dict(env),
)
env[name_sym.name] = comp
return comp
def _sf_begin(expr: list, env: dict) -> Any:
result: Any = NIL
for sub in expr[1:]:
result = _eval(sub, env)
return result
def _sf_quote(expr: list, _env: dict) -> Any:
return expr[1] if len(expr) > 1 else NIL
def _sf_thread_first(expr: list, env: dict) -> Any:
"""``(-> val (f a) (g b))`` → ``(g (f val a) b)``"""
if len(expr) < 2:
raise EvalError("-> requires at least a value")
result = _eval(expr[1], env)
for form in expr[2:]:
if isinstance(form, list):
fn = _eval(form[0], env)
args = [result] + [_eval(a, env) for a in form[1:]]
else:
fn = _eval(form, env)
args = [result]
if callable(fn) and not isinstance(fn, (Lambda, Component)):
result = fn(*args)
elif isinstance(fn, Lambda):
result = _call_lambda(fn, args, env)
else:
raise EvalError(f"-> form not callable: {fn!r}")
return result
def _sf_set_bang(expr: list, env: dict) -> Any:
"""``(set! name value)`` — mutate existing binding."""
if len(expr) != 3:
raise EvalError("set! requires name and value")
name_sym = expr[1]
if not isinstance(name_sym, Symbol):
raise EvalError(f"set! name must be symbol, got {type(name_sym).__name__}")
value = _eval(expr[2], env)
# Walk up scope if using Env objects; for plain dicts just overwrite
env[name_sym.name] = value
return value
_SPECIAL_FORMS: dict[str, Any] = {
"if": _sf_if,
"when": _sf_when,
"cond": _sf_cond,
"case": _sf_case,
"and": _sf_and,
"or": _sf_or,
"let": _sf_let,
"let*": _sf_let,
"lambda": _sf_lambda,
"fn": _sf_lambda,
"define": _sf_define,
"defcomp": _sf_defcomp,
"begin": _sf_begin,
"do": _sf_begin,
"quote": _sf_quote,
"->": _sf_thread_first,
"set!": _sf_set_bang,
}
# ---------------------------------------------------------------------------
# Higher-order forms (need to evaluate the fn arg first)
# ---------------------------------------------------------------------------
def _ho_map(expr: list, env: dict) -> list:
if len(expr) != 3:
raise EvalError("map requires fn and collection")
fn = _eval(expr[1], env)
coll = _eval(expr[2], env)
if not isinstance(fn, Lambda):
raise EvalError(f"map requires lambda, got {type(fn).__name__}")
return [_call_lambda(fn, [item], env) for item in coll]
def _ho_map_indexed(expr: list, env: dict) -> list:
if len(expr) != 3:
raise EvalError("map-indexed requires fn and collection")
fn = _eval(expr[1], env)
coll = _eval(expr[2], env)
if not isinstance(fn, Lambda):
raise EvalError(f"map-indexed requires lambda, got {type(fn).__name__}")
if len(fn.params) < 2:
raise EvalError("map-indexed lambda needs (i item) params")
return [_call_lambda(fn, [i, item], env) for i, item in enumerate(coll)]
def _ho_filter(expr: list, env: dict) -> list:
if len(expr) != 3:
raise EvalError("filter requires fn and collection")
fn = _eval(expr[1], env)
coll = _eval(expr[2], env)
if not isinstance(fn, Lambda):
raise EvalError(f"filter requires lambda, got {type(fn).__name__}")
return [item for item in coll if _call_lambda(fn, [item], env)]
def _ho_reduce(expr: list, env: dict) -> Any:
if len(expr) != 4:
raise EvalError("reduce requires fn, init, and collection")
fn = _eval(expr[1], env)
acc = _eval(expr[2], env)
coll = _eval(expr[3], env)
if not isinstance(fn, Lambda):
raise EvalError(f"reduce requires lambda, got {type(fn).__name__}")
for item in coll:
acc = _call_lambda(fn, [acc, item], env)
return acc
def _ho_some(expr: list, env: dict) -> Any:
if len(expr) != 3:
raise EvalError("some requires fn and collection")
fn = _eval(expr[1], env)
coll = _eval(expr[2], env)
if not isinstance(fn, Lambda):
raise EvalError(f"some requires lambda, got {type(fn).__name__}")
for item in coll:
result = _call_lambda(fn, [item], env)
if result:
return result
return NIL
def _ho_every(expr: list, env: dict) -> bool:
if len(expr) != 3:
raise EvalError("every? requires fn and collection")
fn = _eval(expr[1], env)
coll = _eval(expr[2], env)
if not isinstance(fn, Lambda):
raise EvalError(f"every? requires lambda, got {type(fn).__name__}")
for item in coll:
if not _call_lambda(fn, [item], env):
return False
return True
def _ho_for_each(expr: list, env: dict) -> Any:
if len(expr) != 3:
raise EvalError("for-each requires fn and collection")
fn = _eval(expr[1], env)
coll = _eval(expr[2], env)
if not isinstance(fn, Lambda):
raise EvalError(f"for-each requires lambda, got {type(fn).__name__}")
for item in coll:
_call_lambda(fn, [item], env)
return NIL
_HO_FORMS: dict[str, Any] = {
"map": _ho_map,
"map-indexed": _ho_map_indexed,
"filter": _ho_filter,
"reduce": _ho_reduce,
"some": _ho_some,
"every?": _ho_every,
"for-each": _ho_for_each,
}

306
shared/sexp/parser.py Normal file
View File

@@ -0,0 +1,306 @@
"""
S-expression parser.
Supports:
- Lists: (a b c)
- Vectors: [a b c] (sugar for lists)
- Maps: {:key1 val1 :key2 val2}
- Symbols: foo, bar-baz, ->, ~card
- Keywords: :class, :id
- Strings: "hello world" (with \\n, \\t, \\", \\\\ escapes)
- Numbers: 42, 3.14, -1.5, 1e-3
- Comments: ; to end of line
- Fragment: <> (empty-tag symbol for fragment groups)
"""
from __future__ import annotations
import re
from typing import Any
from .types import Keyword, Symbol, NIL
# ---------------------------------------------------------------------------
# Errors
# ---------------------------------------------------------------------------
class ParseError(Exception):
"""Error during s-expression parsing."""
def __init__(self, message: str, position: int = 0, line: int = 1, col: int = 1):
self.position = position
self.line = line
self.col = col
super().__init__(f"{message} at line {line}, column {col}")
# ---------------------------------------------------------------------------
# Tokenizer
# ---------------------------------------------------------------------------
class Tokenizer:
"""Stateful tokenizer that walks an s-expression string."""
WHITESPACE = re.compile(r"\s+")
COMMENT = re.compile(r";[^\n]*")
STRING = re.compile(r'"(?:[^"\\]|\\.)*"')
NUMBER = re.compile(r"-?(?:\d+\.?\d*|\.\d+)(?:[eE][+-]?\d+)?")
KEYWORD = re.compile(r":[a-zA-Z_][a-zA-Z0-9_-]*")
# Symbols may start with alpha, _, or common operator chars, plus ~ for components,
# <> for the fragment symbol, and & for &key/&rest.
SYMBOL = re.compile(r"[a-zA-Z_~*+\-><=/!?&][a-zA-Z0-9_~*+\-><=/!?.:&]*")
def __init__(self, text: str):
self.text = text
self.pos = 0
self.line = 1
self.col = 1
def _advance(self, count: int = 1):
for _ in range(count):
if self.pos < len(self.text):
if self.text[self.pos] == "\n":
self.line += 1
self.col = 1
else:
self.col += 1
self.pos += 1
def _skip_whitespace_and_comments(self):
while self.pos < len(self.text):
m = self.WHITESPACE.match(self.text, self.pos)
if m:
self._advance(m.end() - self.pos)
continue
m = self.COMMENT.match(self.text, self.pos)
if m:
self._advance(m.end() - self.pos)
continue
break
def peek(self) -> str | None:
self._skip_whitespace_and_comments()
if self.pos >= len(self.text):
return None
return self.text[self.pos]
def next_token(self) -> Any:
self._skip_whitespace_and_comments()
if self.pos >= len(self.text):
return None
char = self.text[self.pos]
# Delimiters
if char in "()[]{}":
self._advance()
return char
# String
if char == '"':
m = self.STRING.match(self.text, self.pos)
if not m:
raise ParseError("Unterminated string", self.pos, self.line, self.col)
self._advance(m.end() - self.pos)
content = m.group()[1:-1]
content = content.replace("\\n", "\n")
content = content.replace("\\t", "\t")
content = content.replace('\\"', '"')
content = content.replace("\\\\", "\\")
return content
# Keyword
if char == ":":
m = self.KEYWORD.match(self.text, self.pos)
if m:
self._advance(m.end() - self.pos)
return Keyword(m.group()[1:])
raise ParseError("Invalid keyword", self.pos, self.line, self.col)
# Number (check before symbol because of leading -)
if char.isdigit() or (
char == "-"
and self.pos + 1 < len(self.text)
and (self.text[self.pos + 1].isdigit() or self.text[self.pos + 1] == ".")
):
m = self.NUMBER.match(self.text, self.pos)
if m:
self._advance(m.end() - self.pos)
num_str = m.group()
if "." in num_str or "e" in num_str or "E" in num_str:
return float(num_str)
return int(num_str)
# Symbol
m = self.SYMBOL.match(self.text, self.pos)
if m:
self._advance(m.end() - self.pos)
name = m.group()
# Built-in literal symbols
if name == "true":
return True
if name == "false":
return False
if name == "nil":
return NIL
return Symbol(name)
raise ParseError(f"Unexpected character: {char!r}", self.pos, self.line, self.col)
# ---------------------------------------------------------------------------
# Parsing
# ---------------------------------------------------------------------------
def parse(text: str) -> Any:
"""Parse a single s-expression from *text*.
>>> parse('(div :class "main" (p "hello"))')
[Symbol('div'), Keyword('class'), 'main', [Symbol('p'), 'hello']]
"""
tok = Tokenizer(text)
result = _parse_expr(tok)
if tok.peek() is not None:
raise ParseError("Unexpected content after expression", tok.pos, tok.line, tok.col)
return result
def parse_all(text: str) -> list[Any]:
"""Parse zero or more s-expressions from *text*."""
tok = Tokenizer(text)
results: list[Any] = []
while tok.peek() is not None:
results.append(_parse_expr(tok))
return results
def _parse_expr(tok: Tokenizer) -> Any:
token = tok.next_token()
if token is None:
raise ParseError("Unexpected end of input", tok.pos, tok.line, tok.col)
if token == "(":
return _parse_list(tok, ")")
if token == "[":
return _parse_list(tok, "]")
if token == "{":
return _parse_map(tok)
if token in (")", "]", "}"):
raise ParseError(f"Unexpected {token!r}", tok.pos, tok.line, tok.col)
return token
def _parse_list(tok: Tokenizer, closer: str) -> list[Any]:
items: list[Any] = []
while True:
c = tok.peek()
if c is None:
raise ParseError(f"Unterminated list, expected {closer!r}", tok.pos, tok.line, tok.col)
if c == closer:
tok.next_token()
return items
items.append(_parse_expr(tok))
def _parse_map(tok: Tokenizer) -> dict[str, Any]:
result: dict[str, Any] = {}
while True:
c = tok.peek()
if c is None:
raise ParseError("Unterminated map, expected '}'", tok.pos, tok.line, tok.col)
if c == "}":
tok.next_token()
return result
key_token = _parse_expr(tok)
if isinstance(key_token, Keyword):
key = key_token.name
elif isinstance(key_token, str):
key = key_token
else:
raise ParseError(
f"Map key must be keyword or string, got {type(key_token).__name__}",
tok.pos, tok.line, tok.col,
)
result[key] = _parse_expr(tok)
# ---------------------------------------------------------------------------
# Serialization
# ---------------------------------------------------------------------------
def serialize(expr: Any, indent: int = 0, pretty: bool = False) -> str:
"""Serialize a value back to s-expression text."""
if isinstance(expr, list):
if not expr:
return "()"
if pretty:
return _serialize_pretty(expr, indent)
items = [serialize(item, indent, False) for item in expr]
return "(" + " ".join(items) + ")"
if isinstance(expr, Symbol):
return expr.name
if isinstance(expr, Keyword):
return f":{expr.name}"
if isinstance(expr, str):
escaped = (
expr.replace("\\", "\\\\")
.replace('"', '\\"')
.replace("\n", "\\n")
.replace("\t", "\\t")
)
return f'"{escaped}"'
if isinstance(expr, bool):
return "true" if expr else "false"
if isinstance(expr, (int, float)):
return str(expr)
if expr is None or isinstance(expr, type(NIL)):
return "nil"
if isinstance(expr, dict):
items: list[str] = []
for k, v in expr.items():
items.append(f":{k}")
items.append(serialize(v, indent, pretty))
return "{" + " ".join(items) + "}"
# Fallback for Lambda/Component — show repr
return repr(expr)
def _serialize_pretty(expr: list, indent: int) -> str:
if not expr:
return "()"
inner_prefix = " " * (indent + 1)
# Try compact first
compact = serialize(expr, indent, False)
if len(compact) < 72 and "\n" not in compact:
return compact
head = serialize(expr[0], indent + 1, False)
parts = [f"({head}"]
i = 1
while i < len(expr):
item = expr[i]
if isinstance(item, Keyword) and i + 1 < len(expr):
key = serialize(item, 0, False)
val = serialize(expr[i + 1], indent + 1, False)
if len(val) < 50 and "\n" not in val:
parts.append(f"{inner_prefix}{key} {val}")
else:
val_p = serialize(expr[i + 1], indent + 1, True)
parts.append(f"{inner_prefix}{key} {val_p}")
i += 2
else:
item_str = serialize(item, indent + 1, True)
parts.append(f"{inner_prefix}{item_str}")
i += 1
return "\n".join(parts) + ")"

417
shared/sexp/primitives.py Normal file
View File

@@ -0,0 +1,417 @@
"""
Primitive registry and built-in pure functions.
All primitives here are pure (no I/O). Async / I/O primitives live in
separate modules and are registered at app startup.
"""
from __future__ import annotations
import math
from typing import Any, Callable
from .types import Keyword, Lambda, NIL
# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------
_PRIMITIVES: dict[str, Callable] = {}
def register_primitive(name: str):
"""Decorator that registers a callable as a named primitive.
Usage::
@register_primitive("str")
def prim_str(*args):
return "".join(str(a) for a in args)
"""
def decorator(fn: Callable) -> Callable:
_PRIMITIVES[name] = fn
return fn
return decorator
def get_primitive(name: str) -> Callable | None:
return _PRIMITIVES.get(name)
def all_primitives() -> dict[str, Callable]:
"""Return a snapshot of the registry (name → callable)."""
return dict(_PRIMITIVES)
# ---------------------------------------------------------------------------
# Arithmetic
# ---------------------------------------------------------------------------
@register_primitive("+")
def prim_add(*args: Any) -> Any:
return sum(args)
@register_primitive("-")
def prim_sub(a: Any, b: Any = None) -> Any:
return -a if b is None else a - b
@register_primitive("*")
def prim_mul(*args: Any) -> Any:
r = 1
for a in args:
r *= a
return r
@register_primitive("/")
def prim_div(a: Any, b: Any) -> Any:
return a / b
@register_primitive("mod")
def prim_mod(a: Any, b: Any) -> Any:
return a % b
@register_primitive("sqrt")
def prim_sqrt(x: Any) -> float:
return math.sqrt(x)
@register_primitive("pow")
def prim_pow(x: Any, n: Any) -> Any:
return x ** n
@register_primitive("abs")
def prim_abs(x: Any) -> Any:
return abs(x)
@register_primitive("floor")
def prim_floor(x: Any) -> int:
return math.floor(x)
@register_primitive("ceil")
def prim_ceil(x: Any) -> int:
return math.ceil(x)
@register_primitive("round")
def prim_round(x: Any, ndigits: Any = 0) -> Any:
return round(x, int(ndigits))
@register_primitive("min")
def prim_min(*args: Any) -> Any:
if len(args) == 1 and isinstance(args[0], (list, tuple)):
return min(args[0])
return min(args)
@register_primitive("max")
def prim_max(*args: Any) -> Any:
if len(args) == 1 and isinstance(args[0], (list, tuple)):
return max(args[0])
return max(args)
@register_primitive("clamp")
def prim_clamp(x: Any, lo: Any, hi: Any) -> Any:
return max(lo, min(hi, x))
@register_primitive("inc")
def prim_inc(n: Any) -> Any:
return n + 1
@register_primitive("dec")
def prim_dec(n: Any) -> Any:
return n - 1
# ---------------------------------------------------------------------------
# Comparison
# ---------------------------------------------------------------------------
@register_primitive("=")
def prim_eq(a: Any, b: Any) -> bool:
return a == b
@register_primitive("!=")
def prim_neq(a: Any, b: Any) -> bool:
return a != b
@register_primitive("<")
def prim_lt(a: Any, b: Any) -> bool:
return a < b
@register_primitive(">")
def prim_gt(a: Any, b: Any) -> bool:
return a > b
@register_primitive("<=")
def prim_lte(a: Any, b: Any) -> bool:
return a <= b
@register_primitive(">=")
def prim_gte(a: Any, b: Any) -> bool:
return a >= b
# ---------------------------------------------------------------------------
# Predicates
# ---------------------------------------------------------------------------
@register_primitive("odd?")
def prim_is_odd(n: Any) -> bool:
return n % 2 == 1
@register_primitive("even?")
def prim_is_even(n: Any) -> bool:
return n % 2 == 0
@register_primitive("zero?")
def prim_is_zero(n: Any) -> bool:
return n == 0
@register_primitive("nil?")
def prim_is_nil(x: Any) -> bool:
return x is None or x is NIL
@register_primitive("number?")
def prim_is_number(x: Any) -> bool:
return isinstance(x, (int, float))
@register_primitive("string?")
def prim_is_string(x: Any) -> bool:
return isinstance(x, str)
@register_primitive("list?")
def prim_is_list(x: Any) -> bool:
return isinstance(x, list)
@register_primitive("dict?")
def prim_is_dict(x: Any) -> bool:
return isinstance(x, dict)
@register_primitive("empty?")
def prim_is_empty(coll: Any) -> bool:
if coll is None or coll is NIL:
return True
return len(coll) == 0
@register_primitive("contains?")
def prim_contains(coll: Any, key: Any) -> bool:
if isinstance(coll, dict):
k = key.name if isinstance(key, Keyword) else key
return k in coll
if isinstance(coll, (list, tuple)):
return key in coll
return False
# ---------------------------------------------------------------------------
# Logic (non-short-circuit versions; and/or are special forms)
# ---------------------------------------------------------------------------
@register_primitive("not")
def prim_not(x: Any) -> bool:
return not x
# ---------------------------------------------------------------------------
# Strings
# ---------------------------------------------------------------------------
@register_primitive("str")
def prim_str(*args: Any) -> str:
parts: list[str] = []
for a in args:
if a is None or a is NIL:
parts.append("")
elif isinstance(a, bool):
parts.append("true" if a else "false")
else:
parts.append(str(a))
return "".join(parts)
@register_primitive("concat")
def prim_concat(*colls: Any) -> list:
result: list[Any] = []
for c in colls:
if c is not None and c is not NIL:
result.extend(c)
return result
@register_primitive("upper")
def prim_upper(s: str) -> str:
return s.upper()
@register_primitive("lower")
def prim_lower(s: str) -> str:
return s.lower()
@register_primitive("trim")
def prim_trim(s: str) -> str:
return s.strip()
@register_primitive("split")
def prim_split(s: str, sep: str = " ") -> list[str]:
return s.split(sep)
@register_primitive("join")
def prim_join(sep: str, coll: list) -> str:
return sep.join(str(x) for x in coll)
@register_primitive("starts-with?")
def prim_starts_with(s: str, prefix: str) -> bool:
return s.startswith(prefix)
@register_primitive("ends-with?")
def prim_ends_with(s: str, suffix: str) -> bool:
return s.endswith(suffix)
# ---------------------------------------------------------------------------
# Collections — construction
# ---------------------------------------------------------------------------
@register_primitive("list")
def prim_list(*args: Any) -> list:
return list(args)
@register_primitive("dict")
def prim_dict(*pairs: Any) -> dict:
result: dict[str, Any] = {}
i = 0
while i < len(pairs) - 1:
key = pairs[i]
if isinstance(key, Keyword):
key = key.name
result[key] = pairs[i + 1]
i += 2
return result
@register_primitive("range")
def prim_range(start: Any, end: Any, step: Any = 1) -> list[int]:
return list(range(int(start), int(end), int(step)))
# ---------------------------------------------------------------------------
# Collections — access
# ---------------------------------------------------------------------------
@register_primitive("get")
def prim_get(coll: Any, key: Any, default: Any = None) -> Any:
if isinstance(coll, dict):
result = coll.get(key)
if result is not None:
return result
if isinstance(key, Keyword):
result = coll.get(key.name)
if result is not None:
return result
return default
if isinstance(coll, list):
return coll[key] if 0 <= key < len(coll) else default
return default
@register_primitive("len")
def prim_len(coll: Any) -> int:
return len(coll)
@register_primitive("first")
def prim_first(coll: Any) -> Any:
return coll[0] if coll else NIL
@register_primitive("last")
def prim_last(coll: Any) -> Any:
return coll[-1] if coll else NIL
@register_primitive("rest")
def prim_rest(coll: Any) -> list:
return coll[1:] if coll else []
@register_primitive("nth")
def prim_nth(coll: Any, n: Any) -> Any:
return coll[n] if 0 <= n < len(coll) else NIL
@register_primitive("cons")
def prim_cons(x: Any, coll: Any) -> list:
return [x] + list(coll) if coll else [x]
@register_primitive("append")
def prim_append(coll: Any, x: Any) -> list:
return list(coll) + [x] if coll else [x]
@register_primitive("chunk-every")
def prim_chunk_every(coll: Any, n: Any) -> list:
n = int(n)
return [coll[i : i + n] for i in range(0, len(coll), n)]
@register_primitive("zip-pairs")
def prim_zip_pairs(coll: Any) -> list:
if not coll or len(coll) < 2:
return []
return [[coll[i], coll[i + 1]] for i in range(len(coll) - 1)]
# ---------------------------------------------------------------------------
# Collections — dict operations
# ---------------------------------------------------------------------------
@register_primitive("keys")
def prim_keys(d: dict) -> list:
return list(d.keys())
@register_primitive("vals")
def prim_vals(d: dict) -> list:
return list(d.values())
@register_primitive("merge")
def prim_merge(*dicts: Any) -> dict:
result: dict[str, Any] = {}
for d in dicts:
if d is not None and d is not NIL:
result.update(d)
return result
@register_primitive("assoc")
def prim_assoc(d: Any, *pairs: Any) -> dict:
result = dict(d) if d and d is not NIL else {}
i = 0
while i < len(pairs) - 1:
key = pairs[i]
if isinstance(key, Keyword):
key = key.name
result[key] = pairs[i + 1]
i += 2
return result
@register_primitive("dissoc")
def prim_dissoc(d: Any, *keys_to_remove: Any) -> dict:
result = dict(d) if d and d is not NIL else {}
for key in keys_to_remove:
if isinstance(key, Keyword):
key = key.name
result.pop(key, None)
return result
@register_primitive("into")
def prim_into(target: Any, coll: Any) -> Any:
if isinstance(target, list):
if isinstance(coll, dict):
return [[k, v] for k, v in coll.items()]
return list(coll)
if isinstance(target, dict):
if isinstance(coll, dict):
return dict(coll)
result: dict[str, Any] = {}
for item in coll:
if isinstance(item, (list, tuple)) and len(item) >= 2:
key = item[0].name if isinstance(item[0], Keyword) else item[0]
result[key] = item[1]
return result
raise ValueError(f"into: unsupported target type {type(target).__name__}")
# ---------------------------------------------------------------------------
# Assertions
# ---------------------------------------------------------------------------
@register_primitive("assert")
def prim_assert(condition: Any, message: str = "Assertion failed") -> bool:
if not condition:
raise RuntimeError(f"Assertion error: {message}")
return True

View File

View File

@@ -0,0 +1,326 @@
"""Tests for the s-expression evaluator."""
import pytest
from shared.sexp import parse, evaluate, EvalError, Symbol, Keyword, NIL
from shared.sexp.types import Lambda, Component
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def ev(text, env=None):
"""Parse and evaluate a single expression."""
return evaluate(parse(text), env)
# ---------------------------------------------------------------------------
# Literals and lookups
# ---------------------------------------------------------------------------
class TestLiterals:
def test_int(self):
assert ev("42") == 42
def test_string(self):
assert ev('"hello"') == "hello"
def test_true(self):
assert ev("true") is True
def test_nil(self):
assert ev("nil") is NIL
def test_symbol_lookup(self):
assert ev("x", {"x": 10}) == 10
def test_undefined_symbol(self):
with pytest.raises(EvalError, match="Undefined symbol"):
ev("xyz")
def test_keyword_evaluates_to_name(self):
assert ev(":foo") == "foo"
# ---------------------------------------------------------------------------
# Arithmetic
# ---------------------------------------------------------------------------
class TestArithmetic:
def test_add(self):
assert ev("(+ 1 2 3)") == 6
def test_sub(self):
assert ev("(- 10 3)") == 7
def test_negate(self):
assert ev("(- 5)") == -5
def test_mul(self):
assert ev("(* 2 3 4)") == 24
def test_div(self):
assert ev("(/ 10 4)") == 2.5
def test_mod(self):
assert ev("(mod 7 3)") == 1
def test_clamp(self):
assert ev("(clamp 15 0 10)") == 10
assert ev("(clamp -5 0 10)") == 0
assert ev("(clamp 5 0 10)") == 5
# ---------------------------------------------------------------------------
# Comparison and predicates
# ---------------------------------------------------------------------------
class TestComparison:
def test_eq(self):
assert ev("(= 1 1)") is True
assert ev("(= 1 2)") is False
def test_lt_gt(self):
assert ev("(< 1 2)") is True
assert ev("(> 2 1)") is True
def test_predicates(self):
assert ev("(odd? 3)") is True
assert ev("(even? 4)") is True
assert ev("(zero? 0)") is True
assert ev("(nil? nil)") is True
assert ev('(string? "hi")') is True
assert ev("(number? 42)") is True
assert ev("(list? (list 1))") is True
assert ev("(dict? {:a 1})") is True
# ---------------------------------------------------------------------------
# Special forms
# ---------------------------------------------------------------------------
class TestSpecialForms:
def test_if_true(self):
assert ev("(if true 1 2)") == 1
def test_if_false(self):
assert ev("(if false 1 2)") == 2
def test_if_no_else(self):
assert ev("(if false 1)") is NIL
def test_when_true(self):
assert ev("(when true 42)") == 42
def test_when_false(self):
assert ev("(when false 42)") is NIL
def test_and_short_circuit(self):
assert ev("(and true true 3)") == 3
assert ev("(and true false 3)") is False
def test_or_short_circuit(self):
assert ev("(or false false 3)") == 3
assert ev("(or false 2 3)") == 2
def test_let_scheme_style(self):
assert ev("(let ((x 10) (y 20)) (+ x y))") == 30
def test_let_clojure_style(self):
assert ev("(let (x 10 y 20) (+ x y))") == 30
def test_let_sequential(self):
assert ev("(let ((x 1) (y (+ x 1))) y)") == 2
def test_begin(self):
assert ev("(begin 1 2 3)") == 3
def test_quote(self):
result = ev("(quote (a b c))")
assert result == [Symbol("a"), Symbol("b"), Symbol("c")]
def test_cond_clojure(self):
assert ev("(cond false 1 true 2 :else 3)") == 2
def test_cond_else(self):
assert ev("(cond false 1 false 2 :else 99)") == 99
def test_case(self):
assert ev('(case 2 1 "one" 2 "two" :else "other")') == "two"
def test_thread_first(self):
assert ev("(-> 5 (+ 3) (* 2))") == 16
def test_define(self):
env = {}
ev("(define x 42)", env)
assert env["x"] == 42
# ---------------------------------------------------------------------------
# Lambda
# ---------------------------------------------------------------------------
class TestLambda:
def test_create_and_call(self):
assert ev("((fn (x) (* x x)) 5)") == 25
def test_closure(self):
result = ev("(let ((a 10)) ((fn (x) (+ x a)) 5))")
assert result == 15
def test_higher_order(self):
result = ev("(let ((double (fn (x) (* x 2)))) (double 7))")
assert result == 14
# ---------------------------------------------------------------------------
# Collections
# ---------------------------------------------------------------------------
class TestCollections:
def test_list_constructor(self):
assert ev("(list 1 2 3)") == [1, 2, 3]
def test_dict_constructor(self):
assert ev("(dict :a 1 :b 2)") == {"a": 1, "b": 2}
def test_get_dict(self):
assert ev('(get {:a 1 :b 2} "a")') == 1
def test_get_list(self):
assert ev("(get (list 10 20 30) 1)") == 20
def test_first_last_rest(self):
assert ev("(first (list 1 2 3))") == 1
assert ev("(last (list 1 2 3))") == 3
assert ev("(rest (list 1 2 3))") == [2, 3]
def test_len(self):
assert ev("(len (list 1 2 3))") == 3
def test_concat(self):
assert ev("(concat (list 1 2) (list 3 4))") == [1, 2, 3, 4]
def test_cons(self):
assert ev("(cons 0 (list 1 2))") == [0, 1, 2]
def test_keys_vals(self):
assert ev("(keys {:a 1 :b 2})") == ["a", "b"]
assert ev("(vals {:a 1 :b 2})") == [1, 2]
def test_merge(self):
assert ev("(merge {:a 1} {:b 2} {:a 3})") == {"a": 3, "b": 2}
def test_assoc(self):
assert ev('(assoc {:a 1} :b 2)') == {"a": 1, "b": 2}
def test_dissoc(self):
assert ev('(dissoc {:a 1 :b 2} :a)') == {"b": 2}
def test_empty(self):
assert ev("(empty? (list))") is True
assert ev("(empty? (list 1))") is False
assert ev("(empty? nil)") is True
def test_contains(self):
assert ev('(contains? {:a 1} "a")') is True
assert ev("(contains? (list 1 2 3) 2)") is True
# ---------------------------------------------------------------------------
# Higher-order forms
# ---------------------------------------------------------------------------
class TestHigherOrder:
def test_map(self):
assert ev("(map (fn (x) (* x x)) (list 1 2 3 4))") == [1, 4, 9, 16]
def test_map_indexed(self):
result = ev("(map-indexed (fn (i x) (+ i x)) (list 10 20 30))")
assert result == [10, 21, 32]
def test_filter(self):
assert ev("(filter (fn (x) (> x 2)) (list 1 2 3 4))") == [3, 4]
def test_reduce(self):
assert ev("(reduce (fn (acc x) (+ acc x)) 0 (list 1 2 3))") == 6
def test_some(self):
assert ev("(some (fn (x) (if (> x 3) x nil)) (list 1 2 4 5))") == 4
def test_every(self):
assert ev("(every? (fn (x) (> x 0)) (list 1 2 3))") is True
assert ev("(every? (fn (x) (> x 2)) (list 1 2 3))") is False
# ---------------------------------------------------------------------------
# Strings
# ---------------------------------------------------------------------------
class TestStrings:
def test_str(self):
assert ev('(str "hello" " " "world")') == "hello world"
def test_str_numbers(self):
assert ev('(str "val=" 42)') == "val=42"
def test_upper_lower(self):
assert ev('(upper "hello")') == "HELLO"
assert ev('(lower "HELLO")') == "hello"
def test_join(self):
assert ev('(join ", " (list "a" "b" "c"))') == "a, b, c"
def test_split(self):
assert ev('(split "a,b,c" ",")') == ["a", "b", "c"]
# ---------------------------------------------------------------------------
# defcomp
# ---------------------------------------------------------------------------
class TestDefcomp:
def test_basic_component(self):
env = {}
ev("(defcomp ~card (&key title) title)", env)
assert isinstance(env["~card"], Component)
assert env["~card"].name == "card"
def test_component_call(self):
env = {}
ev("(defcomp ~greeting (&key name) (str \"Hello, \" name \"!\"))", env)
result = ev('(~greeting :name "Alice")', env)
assert result == "Hello, Alice!"
def test_component_with_children(self):
env = {}
ev("(defcomp ~wrapper (&key class &rest children) (list class children))", env)
result = ev('(~wrapper :class "box" 1 2 3)', env)
assert result == ["box", [1, 2, 3]]
def test_component_missing_kwarg_is_nil(self):
env = {}
ev("(defcomp ~opt (&key x y) (list x y))", env)
result = ev("(~opt :x 1)", env)
assert result == [1, NIL]
# ---------------------------------------------------------------------------
# Dict literal evaluation
# ---------------------------------------------------------------------------
class TestDictLiteral:
def test_dict_values_evaluated(self):
assert ev("{:a (+ 1 2) :b (* 3 4)}") == {"a": 3, "b": 12}
# ---------------------------------------------------------------------------
# set!
# ---------------------------------------------------------------------------
class TestSetBang:
def test_set_bang(self):
env = {"x": 1}
ev("(set! x 42)", env)
assert env["x"] == 42

View File

@@ -0,0 +1,191 @@
"""Tests for the s-expression parser."""
import pytest
from shared.sexp.parser import parse, parse_all, serialize, ParseError
from shared.sexp.types import Symbol, Keyword, NIL
# ---------------------------------------------------------------------------
# Atoms
# ---------------------------------------------------------------------------
class TestAtoms:
def test_integer(self):
assert parse("42") == 42
def test_negative_integer(self):
assert parse("-7") == -7
def test_float(self):
assert parse("3.14") == 3.14
def test_scientific(self):
assert parse("1e-3") == 0.001
def test_string(self):
assert parse('"hello world"') == "hello world"
def test_string_escapes(self):
assert parse(r'"line1\nline2"') == "line1\nline2"
assert parse(r'"tab\there"') == "tab\there"
assert parse(r'"say \"hi\""') == 'say "hi"'
def test_symbol(self):
assert parse("foo") == Symbol("foo")
def test_component_symbol(self):
s = parse("~card")
assert s == Symbol("~card")
assert s.is_component
def test_keyword(self):
assert parse(":class") == Keyword("class")
def test_true(self):
assert parse("true") is True
def test_false(self):
assert parse("false") is False
def test_nil(self):
assert parse("nil") is NIL
# ---------------------------------------------------------------------------
# Lists
# ---------------------------------------------------------------------------
class TestLists:
def test_empty_list(self):
assert parse("()") == []
def test_simple_list(self):
assert parse("(1 2 3)") == [1, 2, 3]
def test_mixed_list(self):
result = parse('(div :class "main")')
assert result == [Symbol("div"), Keyword("class"), "main"]
def test_nested_list(self):
result = parse("(a (b c) d)")
assert result == [Symbol("a"), [Symbol("b"), Symbol("c")], Symbol("d")]
def test_vector_sugar(self):
assert parse("[1 2 3]") == [1, 2, 3]
# ---------------------------------------------------------------------------
# Maps
# ---------------------------------------------------------------------------
class TestMaps:
def test_simple_map(self):
result = parse('{:a 1 :b 2}')
assert result == {"a": 1, "b": 2}
def test_nested_map(self):
result = parse('{:x {:y 3}}')
assert result == {"x": {"y": 3}}
def test_string_keys(self):
result = parse('{"name" "alice"}')
assert result == {"name": "alice"}
# ---------------------------------------------------------------------------
# Comments
# ---------------------------------------------------------------------------
class TestComments:
def test_line_comment(self):
assert parse("; comment\n42") == 42
def test_inline_comment(self):
result = parse("(a ; stuff\nb)")
assert result == [Symbol("a"), Symbol("b")]
# ---------------------------------------------------------------------------
# parse_all
# ---------------------------------------------------------------------------
class TestParseAll:
def test_multiple(self):
results = parse_all("1 2 3")
assert results == [1, 2, 3]
def test_multiple_lists(self):
results = parse_all("(a) (b)")
assert results == [[Symbol("a")], [Symbol("b")]]
def test_empty(self):
assert parse_all("") == []
assert parse_all(" ; only comments\n") == []
# ---------------------------------------------------------------------------
# Errors
# ---------------------------------------------------------------------------
class TestErrors:
def test_unterminated_list(self):
with pytest.raises(ParseError):
parse("(a b")
def test_unterminated_string(self):
with pytest.raises(ParseError):
parse('"hello')
def test_unexpected_closer(self):
with pytest.raises(ParseError):
parse(")")
def test_trailing_content(self):
with pytest.raises(ParseError):
parse("1 2")
# ---------------------------------------------------------------------------
# Serialization
# ---------------------------------------------------------------------------
class TestSerialize:
def test_int(self):
assert serialize(42) == "42"
def test_float(self):
assert serialize(3.14) == "3.14"
def test_string(self):
assert serialize("hello") == '"hello"'
def test_string_escapes(self):
assert serialize('say "hi"') == '"say \\"hi\\""'
def test_symbol(self):
assert serialize(Symbol("foo")) == "foo"
def test_keyword(self):
assert serialize(Keyword("class")) == ":class"
def test_bool(self):
assert serialize(True) == "true"
assert serialize(False) == "false"
def test_nil(self):
assert serialize(None) == "nil"
assert serialize(NIL) == "nil"
def test_list(self):
assert serialize([Symbol("a"), 1, 2]) == "(a 1 2)"
def test_empty_list(self):
assert serialize([]) == "()"
def test_dict(self):
result = serialize({"a": 1, "b": 2})
assert result == "{:a 1 :b 2}"
def test_roundtrip(self):
original = '(div :class "main" (p "hello") (span 42))'
assert serialize(parse(original)) == original

156
shared/sexp/types.py Normal file
View File

@@ -0,0 +1,156 @@
"""
Core types for the s-expression language.
Symbol — unquoted identifier (e.g. div, ~card, map)
Keyword — colon-prefixed key (e.g. :class, :id)
Lambda — callable closure created by (lambda ...) or (fn ...)
Nil — singleton null value
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
# ---------------------------------------------------------------------------
# Nil
# ---------------------------------------------------------------------------
class _Nil:
"""Singleton nil value — falsy, serialises as 'nil'."""
_instance: _Nil | None = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __bool__(self):
return False
def __repr__(self):
return "nil"
def __eq__(self, other):
return other is None or isinstance(other, _Nil)
def __hash__(self):
return hash(None)
NIL = _Nil()
# ---------------------------------------------------------------------------
# Symbol
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class Symbol:
"""An unquoted symbol/identifier."""
name: str
def __repr__(self):
return f"Symbol({self.name!r})"
def __eq__(self, other):
if isinstance(other, Symbol):
return self.name == other.name
if isinstance(other, str):
return self.name == other
return False
def __hash__(self):
return hash(self.name)
@property
def is_component(self) -> bool:
"""True if this symbol names a component (~prefix)."""
return self.name.startswith("~")
# ---------------------------------------------------------------------------
# Keyword
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class Keyword:
"""A keyword starting with colon (e.g. :class, :id)."""
name: str
def __repr__(self):
return f"Keyword({self.name!r})"
def __eq__(self, other):
if isinstance(other, Keyword):
return self.name == other.name
return False
def __hash__(self):
return hash((":", self.name))
# ---------------------------------------------------------------------------
# Lambda
# ---------------------------------------------------------------------------
@dataclass
class Lambda:
"""A callable closure.
Created by ``(lambda (x) body)`` or ``(fn (x) body)``.
Captures the defining environment so free variables resolve correctly.
"""
params: list[str]
body: Any
closure: dict[str, Any] = field(default_factory=dict)
name: str | None = None # optional, set by (define name (fn ...))
def __repr__(self):
tag = self.name or "lambda"
return f"<{tag}({', '.join(self.params)})>"
def __call__(self, *args: Any, evaluator: Any = None, caller_env: dict | None = None) -> Any:
"""Invoke the lambda. Requires *evaluator* — the evaluate() function."""
if evaluator is None:
raise RuntimeError("Lambda requires evaluator to be called")
if len(args) != len(self.params):
raise RuntimeError(
f"{self!r} expects {len(self.params)} args, got {len(args)}"
)
local = dict(self.closure)
if caller_env:
local.update(caller_env)
for p, v in zip(self.params, args):
local[p] = v
return evaluator(self.body, local)
# ---------------------------------------------------------------------------
# Component
# ---------------------------------------------------------------------------
@dataclass
class Component:
"""A reusable UI component defined via ``(defcomp ~name (&key ...) body)``.
Components are like lambdas but accept keyword arguments and support
a ``children`` rest parameter.
"""
name: str
params: list[str] # keyword parameter names (without &key prefix)
has_children: bool # True if &rest children declared
body: Any # unevaluated s-expression body
closure: dict[str, Any] = field(default_factory=dict)
def __repr__(self):
return f"<Component ~{self.name}({', '.join(self.params)})>"
# ---------------------------------------------------------------------------
# Type alias
# ---------------------------------------------------------------------------
# An s-expression value after evaluation
SExp = int | float | str | bool | Symbol | Keyword | Lambda | Component | list | dict | _Nil | None

View File

@@ -0,0 +1,38 @@
# Tier 2: Integration tests — needs ffmpeg, full artdag pipeline
# Covers: artdag/test/, artdag/l1/ loose test files (non-GPU)
FROM python:3.11-slim
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PYTHONPATH=/app
WORKDIR /app
# System deps for media processing tests
RUN apt-get update && apt-get install -y --no-install-recommends \
ffmpeg \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*
# Python deps
COPY shared/requirements.txt ./requirements-shared.txt
RUN pip install --no-cache-dir -r requirements-shared.txt
COPY artdag/core/ ./artdag/core/
RUN pip install --no-cache-dir -e artdag/core/[all,dev]
COPY artdag/common/ ./artdag/common/
RUN pip install --no-cache-dir -e artdag/common/
COPY artdag/l1/requirements.txt ./requirements-l1.txt
RUN pip install --no-cache-dir -r requirements-l1.txt
# Copy full L1 + test suite
COPY artdag/l1/ ./artdag/l1/
COPY artdag/test/ ./artdag/test/
COPY shared/ ./shared/
CMD ["python", "-m", "pytest", \
"artdag/test/", \
"-v", "--tb=short", \
"-k", "not gpu and not cuda"]

51
test/Dockerfile.unit Normal file
View File

@@ -0,0 +1,51 @@
# Tier 1: Unit tests — pure Python, no services, no DB, no Redis
# Covers: shared/, artdag/core/, artdag/l1/tests/, artdag/l1/sexp_effects/
FROM python:3.11-slim
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PYTHONPATH=/app
WORKDIR /app
# Install shared deps (includes pytest, pytest-asyncio)
COPY shared/requirements.txt ./requirements-shared.txt
RUN pip install --no-cache-dir -r requirements-shared.txt pytest-watch
# Install artdag core as editable package
COPY artdag/core/ ./artdag/core/
RUN pip install --no-cache-dir -e artdag/core/[dev]
# Install artdag common
COPY artdag/common/ ./artdag/common/
RUN pip install --no-cache-dir -e artdag/common/
# Install L1 deps that unit tests need (not the heavy ones)
COPY artdag/l1/requirements.txt ./requirements-l1.txt
RUN pip install --no-cache-dir -r requirements-l1.txt 2>/dev/null || true
# Copy test subjects
COPY shared/ ./shared/
COPY artdag/l1/tests/ ./artdag/l1/tests/
COPY artdag/l1/sexp_effects/ ./artdag/l1/sexp_effects/
COPY artdag/l1/app/ ./artdag/l1/app/
COPY artdag/l1/database.py ./artdag/l1/database.py
COPY artdag/l1/ipfs_client.py ./artdag/l1/ipfs_client.py
COPY artdag/l1/storage_providers.py ./artdag/l1/storage_providers.py
COPY artdag/l1/cache_manager.py ./artdag/l1/cache_manager.py
COPY artdag/l1/claiming.py ./artdag/l1/claiming.py
COPY artdag/l1/path_registry.py ./artdag/l1/path_registry.py
COPY artdag/l1/celery_app.py ./artdag/l1/celery_app.py
COPY artdag/l1/pyproject.toml ./artdag/l1/pyproject.toml
# Default: run all unit tests
CMD ["python", "-m", "pytest", \
"shared/", \
"artdag/core/tests/", \
"artdag/core/artdag/sexp/", \
"artdag/l1/tests/", \
"artdag/l1/sexp_effects/", \
"-v", "--tb=short", \
"--ignore=artdag/l1/tests/test_jax_primitives.py", \
"--ignore=artdag/l1/tests/test_jax_pipeline_integration.py", \
"-k", "not gpu and not cuda"]