Files
rose-ash/shared/sx/primitives.py
giles f9f810ffd7 Complete Python eval removal: epoch protocol, scope consolidation, JIT fixes
Route all rendering through OCaml bridge — render_to_html no longer uses
Python async_eval. Fix register_components to parse &key params and &rest
children from defcomp forms. Remove all dead sx_ref.py imports.

Epoch protocol (prevents pipe desync):
- Every command prefixed with (epoch N), all responses tagged with epoch
- Both sides discard stale-epoch messages — desync structurally impossible
- OCaml main loop discards stale io-responses between commands

Consolidate scope primitives into sx_scope.ml:
- Single source of truth for scope-push!/pop!/peek, collect!/collected,
  emit!/emitted, context, and 12 other scope operations
- Removes duplicate registrations from sx_server.ml (including bugs where
  scope-emit! and clear-collected! were registered twice with different impls)
- Bind scope prims into env so JIT VM finds them via OP_GLOBAL_GET

JIT VM fixes:
- Trampoline thunks before passing args to CALL_PRIM
- as_list resolves thunks via _sx_trampoline_fn
- len handles all value types (Bool, Number, RawHTML, SxExpr, Spread, etc.)

Other fixes:
- ~cssx/tw signature: (tokens) → (&key tokens) to match callers
- Minimal Python evaluator in html.py for sync sx() Jinja function
- Python scope primitive stubs (thread-local) for non-OCaml paths
- Reader macro resolution via OcamlSync instead of sx_ref.py

Tests: 1114 OCaml, 1078 JS, 35 Python regression, 6/6 Playwright SSR

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 16:14:40 +00:00

633 lines
17 KiB
Python

"""
Primitive registry and built-in pure functions.
All primitives here are pure (no I/O). Async / I/O primitives live in
separate modules and are registered at app startup.
"""
from __future__ import annotations
import math
from typing import Any, Callable
from .types import Component, Island, Keyword, Lambda, Macro, NIL, Symbol
# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------
# Module-wide table mapping primitive name -> implementing callable.
# Written to by register_primitive() and by the scope-stub registration below.
_PRIMITIVES: dict[str, Callable] = {}
def register_primitive(name: str):
    """Build a decorator that files the decorated callable under *name*.

    The name is first passed to ``boundary.validate_primitive``; the callable
    is then stored in the registry and handed back unchanged, which is what
    lets several of these decorators be stacked to register aliases.

    Usage::

        @register_primitive("str")
        def prim_str(*args):
            return "".join(str(a) for a in args)
    """
    def _bind(fn: Callable) -> Callable:
        # Imported at registration time rather than at module load.
        from .boundary import validate_primitive

        validate_primitive(name)
        _PRIMITIVES[name] = fn
        return fn

    return _bind
def get_primitive(name: str) -> Callable | None:
    """Return the callable registered under *name*, or ``None`` if absent."""
    try:
        return _PRIMITIVES[name]
    except KeyError:
        return None
def all_primitives() -> dict[str, Callable]:
    """Return a shallow copy of the registry (name → callable).

    Mutating the returned dict does not affect the registry itself.
    """
    return _PRIMITIVES.copy()
# ---------------------------------------------------------------------------
# Arithmetic
# ---------------------------------------------------------------------------
@register_primitive("+")
def prim_add(*args: Any) -> Any:
    """Add all arguments together; with no arguments the result is 0."""
    total = sum(args)
    return total
@register_primitive("-")
def prim_sub(a: Any, b: Any = None) -> Any:
    """Return ``a - b``, or the negation of *a* when *b* is omitted."""
    if b is None:
        return -a
    return a - b
@register_primitive("*")
def prim_mul(*args: Any) -> Any:
    """Multiply all arguments together; with no arguments the result is 1."""
    product = 1
    for factor in args:
        product *= factor
    return product
@register_primitive("/")
def prim_div(a: Any, b: Any) -> Any:
    """Return the true-division quotient ``a / b``."""
    quotient = a / b
    return quotient
@register_primitive("mod")
def prim_mod(a: Any, b: Any) -> Any:
    """Return ``a % b`` using Python's modulo operator."""
    remainder = a % b
    return remainder
@register_primitive("sqrt")
def prim_sqrt(x: Any) -> float:
    """Return the square root of *x* via ``math.sqrt``."""
    root = math.sqrt(x)
    return root
@register_primitive("pow")
def prim_pow(x: Any, n: Any) -> Any:
    """Raise *x* to the power *n*."""
    return pow(x, n)
@register_primitive("abs")
def prim_abs(x: Any) -> Any:
    """Return the absolute value of *x*."""
    magnitude = abs(x)
    return magnitude
@register_primitive("floor")
def prim_floor(x: Any) -> int:
    """Round *x* down via ``math.floor``."""
    lowered = math.floor(x)
    return lowered
@register_primitive("ceil")
def prim_ceil(x: Any) -> int:
    """Round *x* up via ``math.ceil``."""
    raised = math.ceil(x)
    return raised
@register_primitive("round")
def prim_round(x: Any, ndigits: Any = 0) -> Any:
    """Round *x* to *ndigits* decimal places (default 0) with ``round``.

    *ndigits* is coerced with ``int()`` before use.
    """
    places = int(ndigits)
    return round(x, places)
@register_primitive("min")
def prim_min(*args: Any) -> Any:
    """Return the smallest argument.

    A single list or tuple argument is treated as the collection to search.
    """
    lone_sequence = len(args) == 1 and isinstance(args[0], (list, tuple))
    candidates = args[0] if lone_sequence else args
    return min(candidates)
@register_primitive("max")
def prim_max(*args: Any) -> Any:
    """Return the largest argument.

    A single list or tuple argument is treated as the collection to search.
    """
    lone_sequence = len(args) == 1 and isinstance(args[0], (list, tuple))
    candidates = args[0] if lone_sequence else args
    return max(candidates)
@register_primitive("clamp")
def prim_clamp(x: Any, lo: Any, hi: Any) -> Any:
    """Limit *x* to the range [*lo*, *hi*]."""
    capped = min(hi, x)
    return max(lo, capped)
@register_primitive("inc")
def prim_inc(n: Any) -> Any:
    """Return *n* plus one."""
    successor = n + 1
    return successor
@register_primitive("dec")
def prim_dec(n: Any) -> Any:
    """Return *n* minus one."""
    predecessor = n - 1
    return predecessor
# ---------------------------------------------------------------------------
# Comparison
# ---------------------------------------------------------------------------
@register_primitive("=")
def prim_eq(a: Any, b: Any) -> bool:
    """Value equality: the result of ``a == b``."""
    same = a == b
    return same
@register_primitive("!=")
def prim_neq(a: Any, b: Any) -> bool:
    """Value inequality: the result of ``a != b``."""
    differs = a != b
    return differs
@register_primitive("eq?")
def prim_eq_identity(a: Any, b: Any) -> bool:
    """Identity test — true only when *a* and *b* are the very same object."""
    same_object = a is b
    return same_object
@register_primitive("eqv?")
def prim_eqv(a: Any, b: Any) -> bool:
    """Equivalent: identity for compound types, value for atoms.

    Atoms (numbers, strings, booleans) compare by value, and only when *b*
    is an instance of *a*'s type. ``None`` and ``NIL`` are equivalent to one
    another. Everything else is equivalent only to itself.

    A boolean is never equivalent to a non-boolean. ``bool`` subclasses
    ``int`` in Python, so without the explicit guard ``(eqv? 1 true)`` was
    true while ``(eqv? true 1)`` was false.
    """
    if a is b:
        return True
    # Keep the predicate symmetric across the bool/int subclass relation.
    if isinstance(a, bool) != isinstance(b, bool):
        return False
    if isinstance(a, (int, float, str, bool)) and isinstance(b, type(a)):
        return a == b
    if (a is None or a is NIL) and (b is None or b is NIL):
        return True
    return False
@register_primitive("equal?")
def prim_equal(a: Any, b: Any) -> bool:
    """Structural equality; behaves exactly like the ``=`` primitive."""
    same = a == b
    return same
@register_primitive("<")
def prim_lt(a: Any, b: Any) -> bool:
    """Return the result of ``a < b``."""
    is_less = a < b
    return is_less
@register_primitive(">")
def prim_gt(a: Any, b: Any) -> bool:
    """Return the result of ``a > b``."""
    is_greater = a > b
    return is_greater
@register_primitive("<=")
def prim_lte(a: Any, b: Any) -> bool:
    """Return the result of ``a <= b``."""
    at_most = a <= b
    return at_most
@register_primitive(">=")
def prim_gte(a: Any, b: Any) -> bool:
    """Return the result of ``a >= b``."""
    at_least = a >= b
    return at_least
# ---------------------------------------------------------------------------
# Predicates
# ---------------------------------------------------------------------------
@register_primitive("odd?")
def prim_is_odd(n: Any) -> bool:
    """True when *n* leaves a remainder of 1 on division by 2."""
    remainder = n % 2
    return remainder == 1
@register_primitive("even?")
def prim_is_even(n: Any) -> bool:
    """True when *n* leaves no remainder on division by 2."""
    remainder = n % 2
    return remainder == 0
@register_primitive("zero?")
def prim_is_zero(n: Any) -> bool:
    """True when *n* compares equal to 0."""
    is_zero = n == 0
    return is_zero
@register_primitive("nil?")
def prim_is_nil(x: Any) -> bool:
    """True when *x* is Python ``None`` or the ``NIL`` sentinel."""
    if x is None:
        return True
    return x is NIL
@register_primitive("boolean?")
def prim_is_boolean(x: Any) -> bool:
    """True when *x* is a ``bool``."""
    is_bool = isinstance(x, bool)
    return is_bool
@register_primitive("number?")
def prim_is_number(x: Any) -> bool:
    """True for ints and floats; booleans are deliberately excluded."""
    # bool is an int subclass, so it has to be ruled out first.
    if isinstance(x, bool):
        return False
    return isinstance(x, (int, float))
@register_primitive("string?")
def prim_is_string(x: Any) -> bool:
    """True when *x* is a ``str``."""
    is_str = isinstance(x, str)
    return is_str
@register_primitive("list?")
def prim_is_list(x: Any) -> bool:
    """True when *x* is a ``list``."""
    is_list = isinstance(x, list)
    return is_list
@register_primitive("dict?")
def prim_is_dict(x: Any) -> bool:
    """True when *x* is a ``dict``."""
    is_dict = isinstance(x, dict)
    return is_dict
@register_primitive("continuation?")
def prim_is_continuation(x: Any) -> bool:
    """True when *x* is an instance of ``types.Continuation``."""
    # Imported on each call rather than at module level.
    from .types import Continuation

    is_continuation = isinstance(x, Continuation)
    return is_continuation
@register_primitive("empty?")
def prim_is_empty(coll: Any) -> bool:
    """True for nil and for any sized value of length 0.

    Values that have no length at all are reported as not empty.
    """
    if coll is None or coll is NIL:
        return True
    try:
        size = len(coll)
    except TypeError:
        return False
    return size == 0
@register_primitive("contains?")
def prim_contains(coll: Any, key: Any) -> bool:
    """Membership test across strings, dicts, lists and tuples.

    * string — substring test on ``str(key)``;
    * dict — key test, with a ``Keyword`` looked up by its name;
    * list / tuple — element test;
    * anything else — ``False``.
    """
    if isinstance(coll, str):
        needle = str(key)
        return needle in coll
    if isinstance(coll, dict):
        lookup = key.name if isinstance(key, Keyword) else key
        return lookup in coll
    if isinstance(coll, (list, tuple)):
        return key in coll
    return False
# ---------------------------------------------------------------------------
# Logic (non-short-circuit versions; and/or are special forms)
# ---------------------------------------------------------------------------
@register_primitive("not")
def prim_not(x: Any) -> bool:
    """Logical negation based on Python truthiness of *x*."""
    return False if x else True
# ---------------------------------------------------------------------------
# Strings
# ---------------------------------------------------------------------------
@register_primitive("str")
def prim_str(*args: Any) -> str:
    """Concatenate the string forms of all arguments.

    nil renders as the empty string, booleans as ``true`` / ``false``, and
    everything else through ``str()``.
    """
    def _render(value: Any) -> str:
        if value is None or value is NIL:
            return ""
        if isinstance(value, bool):
            return "true" if value else "false"
        return str(value)

    return "".join(_render(a) for a in args)
@register_primitive("concat")
def prim_concat(*colls: Any) -> list:
    """Chain the given collections into one new list, skipping nil ones."""
    merged: list[Any] = []
    for coll in colls:
        if coll is None or coll is NIL:
            continue
        merged.extend(coll)
    return merged
@register_primitive("upper")
@register_primitive("upcase")
def prim_upper(s: str) -> str:
    """Return *s* upper-cased; registered as both ``upper`` and ``upcase``."""
    shouted = s.upper()
    return shouted
@register_primitive("lower")
@register_primitive("downcase")
def prim_lower(s: str) -> str:
    """Return *s* lower-cased; registered as both ``lower`` and ``downcase``."""
    lowered = s.lower()
    return lowered
@register_primitive("string-length")
def prim_string_length(s: str) -> int:
    """Return ``len(s)``."""
    length = len(s)
    return length
@register_primitive("substring")
def prim_substring(s: str, start: int, end: int) -> str:
    """Return ``s[start:end]``; both bounds are coerced with ``int()``."""
    lo = int(start)
    hi = int(end)
    return s[lo:hi]
@register_primitive("string-contains?")
def prim_string_contains(s: str, needle: str) -> bool:
    """True when *needle* occurs inside *s*."""
    found = needle in s
    return found
@register_primitive("trim")
def prim_trim(s: str) -> str:
    """Return *s* with leading and trailing whitespace removed."""
    stripped = s.strip()
    return stripped
@register_primitive("split")
def prim_split(s: str, sep: str = " ") -> list[str]:
    """Split *s* on *sep* (a single space by default)."""
    pieces = s.split(sep)
    return pieces
@register_primitive("join")
def prim_join(sep: str, coll: list) -> str:
    """Join the ``str()`` form of each item of *coll* with *sep*."""
    return sep.join(map(str, coll))
@register_primitive("replace")
def prim_replace(s: str, old: str, new: str) -> str:
    """Return *s* with every occurrence of *old* replaced by *new*."""
    swapped = s.replace(old, new)
    return swapped
@register_primitive("slice")
def prim_slice(coll: Any, start: int, end: Any = None) -> Any:
    """Slice a string or list: (slice coll start end?).

    *start* is coerced with ``int()``; a nil or omitted *end* means "through
    the end of the collection".
    """
    begin = int(start)
    stop = None if (end is None or end is NIL) else int(end)
    return coll[begin:stop]
@register_primitive("index-of")
def prim_index_of(s: str, needle: str, start: int = 0) -> int:
    """Return the index of *needle* in ``str(s)`` at or after *start*.

    Returns -1 when *needle* is not found (``str.find`` semantics).
    """
    haystack = str(s)
    return haystack.find(needle, int(start))
@register_primitive("starts-with?")
def prim_starts_with(s, prefix: str) -> bool:
    """True when *s* is a string beginning with *prefix*.

    A non-string *s* yields ``False`` rather than raising.
    """
    return isinstance(s, str) and s.startswith(prefix)
@register_primitive("ends-with?")
def prim_ends_with(s: str, suffix: str) -> bool:
    """True when *s* is a string ending with *suffix*.

    A non-string *s* (nil, a number, a list, ...) yields ``False`` instead
    of raising ``AttributeError``, matching the ``starts-with?`` primitive.
    """
    if not isinstance(s, str):
        return False
    return s.endswith(suffix)
# ---------------------------------------------------------------------------
# Collections — construction
# ---------------------------------------------------------------------------
@register_primitive("list")
def prim_list(*args: Any) -> list:
    """Return a new list holding the arguments in order."""
    return [*args]
@register_primitive("dict")
def prim_dict(*pairs: Any) -> dict:
    """Build a dict from alternating key / value arguments.

    ``Keyword`` keys are stored under their name. A trailing key with no
    value is ignored.
    """
    built: dict[str, Any] = {}
    # zip stops at the shorter slice, which drops an unpaired final key.
    for key, value in zip(pairs[::2], pairs[1::2]):
        if isinstance(key, Keyword):
            key = key.name
        built[key] = value
    return built
@register_primitive("range")
def prim_range(start: Any, end: Any, step: Any = 1) -> list[int]:
    """Return ``range(start, end, step)`` as a list.

    All three arguments are coerced with ``int()``.
    """
    lo = int(start)
    hi = int(end)
    stride = int(step)
    return [*range(lo, hi, stride)]
# ---------------------------------------------------------------------------
# Collections — access
# ---------------------------------------------------------------------------
@register_primitive("get")
def prim_get(coll: Any, key: Any, default: Any = None) -> Any:
    """Fetch *key* from *coll*, falling back to *default*.

    * dict — tries *key* itself, then (for a ``Keyword``) its name; a stored
      ``None`` counts as missing.
    * list — *key* is an index; out-of-range (including negative) indices
      give *default*.
    * any other object with a ``get`` attribute — ``coll.get(key, default)``.
    * anything else — *default*.
    """
    if isinstance(coll, dict):
        candidates = [key]
        if isinstance(key, Keyword):
            candidates.append(key.name)
        for candidate in candidates:
            found = coll.get(candidate)
            if found is not None:
                return found
        return default
    if isinstance(coll, list):
        in_bounds = 0 <= key < len(coll)
        return coll[key] if in_bounds else default
    if hasattr(coll, "get"):
        return coll.get(key, default)
    return default
@register_primitive("len")
def prim_len(coll: Any) -> int:
    """Return the number of items in *coll*; nil has length 0.

    ``empty?``, ``first``, ``last`` and ``rest`` all treat nil as an empty
    collection, so ``len`` does the same instead of raising ``TypeError``
    from ``len(None)``. Other values without a length still raise
    ``TypeError``.
    """
    if coll is None or coll is NIL:
        return 0
    return len(coll)
@register_primitive("first")
def prim_first(coll: Any) -> Any:
    """Return the first item of *coll*, or ``NIL`` when *coll* is falsy."""
    if not coll:
        return NIL
    return coll[0]
@register_primitive("last")
def prim_last(coll: Any) -> Any:
    """Return the last item of *coll*, or ``NIL`` when *coll* is falsy."""
    if not coll:
        return NIL
    return coll[-1]
@register_primitive("rest")
def prim_rest(coll: Any) -> list:
    """Return everything after the first item; ``[]`` when *coll* is falsy."""
    if not coll:
        return []
    return coll[1:]
@register_primitive("nth")
def prim_nth(coll: Any, n: Any) -> Any:
    """Return the item at index *n* of *coll*, or ``NIL`` when unavailable.

    ``NIL`` is returned for an out-of-range index (negative indices
    included) and for a nil collection. The nil case previously raised
    ``TypeError`` from ``len``, unlike ``first`` / ``last`` / ``rest``.
    """
    if coll is None or coll is NIL:
        return NIL
    return coll[n] if 0 <= n < len(coll) else NIL
@register_primitive("cons")
def prim_cons(x: Any, coll: Any) -> list:
    """Return a new list with *x* placed in front of the items of *coll*.

    A falsy *coll* (nil, empty) gives the one-element list ``[x]``.
    """
    if not coll:
        return [x]
    return [x, *coll]
@register_primitive("append")
def prim_append(coll: Any, x: Any) -> list:
    """Return a new list: the items of *coll* followed by *x*.

    When *x* is itself a list its items are spliced on rather than nested.
    A falsy *coll* contributes nothing.
    """
    head = list(coll) if coll else []
    tail = x if isinstance(x, list) else [x]
    return head + tail
@register_primitive("reverse")
def prim_reverse(coll: Any) -> list:
    """Return the items of *coll* in reverse order as a new list.

    A falsy *coll* gives ``[]``.
    """
    if not coll:
        return []
    return list(reversed(coll))
@register_primitive("flatten")
def prim_flatten(coll: Any) -> list:
    """Flatten *coll* by one level.

    Items that are lists are spliced into the result; all other items are
    kept as they are. Nested lists deeper than one level stay nested.
    """
    flat = []
    for element in coll or []:
        flat.extend(element if isinstance(element, list) else [element])
    return flat
@register_primitive("has-key?")
def prim_has_key(d: Any, key: Any) -> bool:
    """True when *d* is a dict containing *key*.

    A ``Keyword`` is looked up by its name; a non-dict *d* gives ``False``.
    """
    if isinstance(d, dict):
        lookup = key.name if isinstance(key, Keyword) else key
        return lookup in d
    return False
@register_primitive("append!")
def prim_append_mut(coll: Any, x: Any) -> list:
    """Append *x* to *coll* in place and return the same *coll* object."""
    coll.append(x)
    return coll
@register_primitive("chunk-every")
def prim_chunk_every(coll: Any, n: Any) -> list:
    """Split *coll* into consecutive slices of *n* items.

    *n* is coerced with ``int()``; the final slice may be shorter.
    """
    size = int(n)
    chunks = []
    for offset in range(0, len(coll), size):
        chunks.append(coll[offset:offset + size])
    return chunks
@register_primitive("zip-pairs")
def prim_zip_pairs(coll: Any) -> list:
    """Return overlapping adjacent pairs ``[[c0, c1], [c1, c2], ...]``.

    Fewer than two items (or a falsy *coll*) gives ``[]``.
    """
    if not coll or len(coll) < 2:
        return []
    pair_count = len(coll) - 1
    return [[coll[left], coll[left + 1]] for left in range(pair_count)]
# ---------------------------------------------------------------------------
# Collections — dict operations
# ---------------------------------------------------------------------------
@register_primitive("keys")
def prim_keys(d: dict) -> list:
    """Return the keys of *d* as a new list."""
    return [*d.keys()]
@register_primitive("vals")
def prim_vals(d: dict) -> list:
    """Return the values of *d* as a new list."""
    return [*d.values()]
@register_primitive("merge")
def prim_merge(*dicts: Any) -> dict:
    """Merge the given dicts left to right into a new dict.

    Later dicts win on key clashes; nil arguments are skipped.
    """
    merged: dict[str, Any] = {}
    for mapping in dicts:
        if mapping is None or mapping is NIL:
            continue
        merged.update(mapping)
    return merged
@register_primitive("assoc")
def prim_assoc(d: Any, *pairs: Any) -> dict:
    """Return a copy of *d* with the given key / value pairs set.

    *d* itself is not modified; a falsy or nil *d* starts from an empty
    dict. ``Keyword`` keys are stored under their name, and a trailing key
    with no value is ignored.
    """
    updated = dict(d) if d and d is not NIL else {}
    # zip stops at the shorter slice, which drops an unpaired final key.
    for key, value in zip(pairs[::2], pairs[1::2]):
        if isinstance(key, Keyword):
            key = key.name
        updated[key] = value
    return updated
@register_primitive("dissoc")
def prim_dissoc(d: Any, *keys_to_remove: Any) -> dict:
    """Return a copy of *d* without the given keys.

    *d* itself is not modified; a falsy or nil *d* starts from an empty
    dict. ``Keyword`` keys are matched by name and absent keys are ignored.
    """
    trimmed = dict(d) if d and d is not NIL else {}
    for key in keys_to_remove:
        lookup = key.name if isinstance(key, Keyword) else key
        trimmed.pop(lookup, None)
    return trimmed
@register_primitive("dict-set!")
def prim_dict_set_mut(d: Any, key: Any, val: Any) -> Any:
    """Store *val* under *key* in *d* in place and return *val*.

    A ``Keyword`` key is stored under its name.
    """
    slot = key.name if isinstance(key, Keyword) else key
    d[slot] = val
    return val
# ---------------------------------------------------------------------------
# Type introspection — platform primitives declared in eval.sx
# ---------------------------------------------------------------------------
@register_primitive("type-of")
def prim_type_of(x: Any) -> str:
    """Return the type name of *x* as a string.

    Possible results: ``boolean``, ``number``, ``string``, ``nil``,
    ``symbol``, ``keyword``, ``list``, ``dict``, ``lambda``, ``component``,
    ``island``, ``macro`` or ``unknown``. Checks run in that order, so
    booleans are never reported as numbers.
    """
    scalar_checks = (
        (bool, "boolean"),
        ((int, float), "number"),
        (str, "string"),
    )
    for kinds, label in scalar_checks:
        if isinstance(x, kinds):
            return label

    if x is None or x is NIL:
        return "nil"

    object_checks = (
        (Symbol, "symbol"),
        (Keyword, "keyword"),
        (list, "list"),
        (dict, "dict"),
        (Lambda, "lambda"),
        (Component, "component"),
        (Island, "island"),
        (Macro, "macro"),
    )
    for kinds, label in object_checks:
        if isinstance(x, kinds):
            return label

    return "unknown"
@register_primitive("symbol-name")
def prim_symbol_name(s: Any) -> str:
    """Return the name of a ``Symbol``; other values go through ``str()``."""
    if isinstance(s, Symbol):
        return s.name
    return str(s)
@register_primitive("keyword-name")
def prim_keyword_name(k: Any) -> str:
    """Return the name of a ``Keyword``; other values go through ``str()``."""
    if isinstance(k, Keyword):
        return k.name
    return str(k)
@register_primitive("sx-parse")
def prim_sx_parse(source: str) -> list:
    """Parse *source* with ``parser.parse_all`` and return its result."""
    # Imported on each call rather than at module level.
    from .parser import parse_all

    parsed = parse_all(source)
    return parsed
@register_primitive("into")
def prim_into(target: Any, coll: Any) -> Any:
    """Convert *coll* into a new collection of the same kind as *target*.

    * list target — a dict becomes ``[[key, value], ...]``; anything else
      goes through ``list()``.
    * dict target — a dict is shallow-copied; otherwise each list/tuple item
      with at least two elements contributes ``item[0] -> item[1]``
      (``Keyword`` keys by name) and other items are skipped.

    Only the type of *target* is used; its contents are ignored.

    Raises:
        ValueError: if *target* is neither a list nor a dict.
    """
    if not isinstance(target, (list, dict)):
        raise ValueError(f"into: unsupported target type {type(target).__name__}")

    if isinstance(target, list):
        if isinstance(coll, dict):
            return [list(entry) for entry in coll.items()]
        return list(coll)

    if isinstance(coll, dict):
        return dict(coll)
    return {
        (item[0].name if isinstance(item[0], Keyword) else item[0]): item[1]
        for item in coll
        if isinstance(item, (list, tuple)) and len(item) >= 2
    }
@register_primitive("random-int")
def prim_random_int(low: int, high: int) -> int:
    """Return a random integer between *low* and *high*, both inclusive.

    Both bounds are coerced with ``int()`` before the draw.
    """
    import random

    lo = int(low)
    hi = int(high)
    return random.randint(lo, hi)
@register_primitive("json-encode")
def prim_json_encode(value) -> str:
    """Serialise *value* to a JSON string indented by two spaces."""
    import json

    encoded = json.dumps(value, indent=2)
    return encoded
# ---------------------------------------------------------------------------
# Scope primitives — thread-local Python stubs for non-OCaml code paths
# (the OCaml kernel provides the real implementations)
# ---------------------------------------------------------------------------
def _register_scope_primitives():
    """Register scope/provide/collect primitive stubs.

    The OCaml kernel provides the real implementations. These stand-ins
    exist so ``_PRIMITIVES`` contains the names for dependency analysis, and
    so Python-side code that checks for their existence finds them.

    State lives in a ``threading.local`` created here, so each thread sees
    only its own collected / emitted values. The callables are written
    straight into ``_PRIMITIVES`` and do not go through
    ``register_primitive``.
    """
    import threading

    state = threading.local()

    def _channels(kind, create=False):
        """Return this thread's channel map for *kind*, or None if unset."""
        table = getattr(state, kind, None)
        if table is None and create:
            table = {}
            setattr(state, kind, table)
        return table

    def _collect(channel, value):
        _channels("collected", create=True).setdefault(channel, []).append(value)
        return NIL

    def _collected(channel):
        table = _channels("collected")
        # Hand back a copy so callers cannot mutate the stored list.
        return [] if table is None else list(table.get(channel, []))

    def _clear_collected(channel):
        table = _channels("collected")
        if table is not None:
            table.pop(channel, None)
        return NIL

    def _emit(channel, value):
        _channels("emitted", create=True).setdefault(channel, []).append(value)
        return NIL

    def _emitted(channel):
        table = _channels("emitted")
        return [] if table is None else list(table.get(channel, []))

    def _context(key):
        # Nothing in this function assigns state.context, so unless it is
        # set elsewhere this always yields NIL.
        ctx = getattr(state, "context", None)
        return ctx.get(key, NIL) if isinstance(ctx, dict) else NIL

    _PRIMITIVES.update({
        "collect!": _collect,
        "collected": _collected,
        "clear-collected!": _clear_collected,
        "emitted": _emitted,
        "emit!": _emit,
        "context": _context,
    })


_register_scope_primitives()