Route all rendering through OCaml bridge — render_to_html no longer uses Python async_eval. Fix register_components to parse &key params and &rest children from defcomp forms. Remove all dead sx_ref.py imports. Epoch protocol (prevents pipe desync): - Every command prefixed with (epoch N), all responses tagged with epoch - Both sides discard stale-epoch messages — desync structurally impossible - OCaml main loop discards stale io-responses between commands Consolidate scope primitives into sx_scope.ml: - Single source of truth for scope-push!/pop!/peek, collect!/collected, emit!/emitted, context, and 12 other scope operations - Removes duplicate registrations from sx_server.ml (including bugs where scope-emit! and clear-collected! were registered twice with different impls) - Bind scope prims into env so JIT VM finds them via OP_GLOBAL_GET JIT VM fixes: - Trampoline thunks before passing args to CALL_PRIM - as_list resolves thunks via _sx_trampoline_fn - len handles all value types (Bool, Number, RawHTML, SxExpr, Spread, etc.) Other fixes: - ~cssx/tw signature: (tokens) → (&key tokens) to match callers - Minimal Python evaluator in html.py for sync sx() Jinja function - Python scope primitive stubs (thread-local) for non-OCaml paths - Reader macro resolution via OcamlSync instead of sx_ref.py Tests: 1114 OCaml, 1078 JS, 35 Python regression, 6/6 Playwright SSR Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
550 lines
18 KiB
Python
550 lines
18 KiB
Python
"""
|
|
S-expression parser.
|
|
|
|
Supports:
|
|
- Lists: (a b c)
|
|
- Vectors: [a b c] (sugar for lists)
|
|
- Maps: {:key1 val1 :key2 val2}
|
|
- Symbols: foo, bar-baz, ->, ~card
|
|
- Keywords: :class, :id
|
|
- Strings: "hello world" (with \\n, \\t, \\", \\\\ escapes)
|
|
- Numbers: 42, 3.14, -1.5, 1e-3
|
|
- Comments: ; to end of line
|
|
- Fragment: <> (empty-tag symbol for fragment groups)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from typing import Any
|
|
|
|
from .types import Keyword, Symbol, NIL
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Reader macro registry
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_READER_MACROS: dict[str, Any] = {}
|
|
|
|
|
|
def register_reader_macro(name: str, handler: Any) -> None:
|
|
"""Register a reader macro handler: #name expr → handler(expr)."""
|
|
_READER_MACROS[name] = handler
|
|
|
|
|
|
def _resolve_sx_reader_macro(name: str):
    """Auto-resolve a reader macro from the component env.

    If a file like z3.sx defines (define z3-translate ...), then #z3 is
    automatically available as a reader macro without any Python
    registration: we look for ``{name}-translate`` as a Lambda in the
    component env and invoke it via the synchronous OCaml bridge.

    Returns a handler callable, or None when the env, the Lambda, or the
    bridge is unavailable.
    """
    try:
        from .jinja_bridge import get_component_env
        from .types import Lambda
    except ImportError:
        return None

    candidate = get_component_env().get(f"{name}-translate")
    if candidate is None or not isinstance(candidate, Lambda):
        return None

    # Invoke the lambda through the sync OCaml bridge; any bridge failure
    # simply means "no reader macro" rather than a hard error.
    try:
        from .ocaml_sync import OcamlSync
        bridge = OcamlSync()
        bridge.start()

        def _sx_handler(expr):
            from .parser import serialize as _ser
            rendered = bridge.eval(f"({name}-translate {_ser(expr)})")
            return parse(rendered) if rendered else expr

        return _sx_handler
    except Exception:
        return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SxExpr — pre-built sx source marker
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class SxExpr(str):
    """Pre-built sx source that serialize() outputs unquoted.

    ``SxExpr`` subclasses ``str``, so it behaves like a plain string
    everywhere (join, startswith, f-strings, isinstance checks).  The one
    difference is that ``serialize()`` emits it verbatim instead of
    wrapping it in double quotes.

    Use it to nest sx call strings inside other sx_call() invocations
    without them being quoted as strings::

        sx_call("parent", child=sx_call("child", x=1))
        # => (~parent :child (~child :x 1))
    """

    def __new__(cls, source: str = "") -> "SxExpr":
        return super().__new__(cls, source)

    @property
    def source(self) -> str:
        """The raw SX source text (kept for backward compatibility)."""
        return str.__str__(self)

    def __repr__(self) -> str:
        return f"SxExpr({str.__repr__(self)})"

    def __add__(self, other: object) -> "SxExpr":
        # Concatenation keeps the SxExpr type so nested sx source retains
        # its emit-unquoted behavior.
        return SxExpr(str(self) + str(other))

    def __radd__(self, other: object) -> "SxExpr":
        return SxExpr(str(other) + str(self))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Errors
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_ESCAPE_MAP = {"n": "\n", "t": "\t", "r": "\r", "0": "\0", '"': '"', "\\": "\\", "/": "/"}
|
|
|
|
|
|
def _unescape_string(s: str) -> str:
|
|
"""Process escape sequences in a parsed string, character by character."""
|
|
out: list[str] = []
|
|
i = 0
|
|
while i < len(s):
|
|
if s[i] == "\\" and i + 1 < len(s):
|
|
nxt = s[i + 1]
|
|
if nxt == "u" and i + 5 < len(s):
|
|
hex_str = s[i + 2:i + 6]
|
|
try:
|
|
out.append(chr(int(hex_str, 16)))
|
|
i += 6
|
|
continue
|
|
except ValueError:
|
|
pass # fall through to default handling
|
|
out.append(_ESCAPE_MAP.get(nxt, nxt))
|
|
i += 2
|
|
else:
|
|
out.append(s[i])
|
|
i += 1
|
|
return "".join(out)
|
|
|
|
|
|
class ParseError(Exception):
    """Raised when s-expression text cannot be parsed.

    Carries ``position`` (absolute character offset) plus 1-based
    ``line``/``col`` so callers can point at the offending character.
    """

    def __init__(self, message: str, position: int = 0, line: int = 1, col: int = 1):
        super().__init__(f"{message} at line {line}, column {col}")
        self.position = position
        self.line = line
        self.col = col
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tokenizer
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class Tokenizer:
|
|
"""Stateful tokenizer that walks an s-expression string."""
|
|
|
|
WHITESPACE = re.compile(r"\s+")
|
|
COMMENT = re.compile(r";[^\n]*")
|
|
STRING = re.compile(r'"(?:[^"\\]|\\.)*"')
|
|
NUMBER = re.compile(r"-?(?:\d+\.?\d*|\.\d+)(?:[eE][+-]?\d+)?")
|
|
KEYWORD = re.compile(r":[a-zA-Z_~*+\-><=/!?&\[]{1}[a-zA-Z0-9_~*+\-><=/!?.:&/\[\]#,]*")
|
|
# Symbols may start with alpha, _, or common operator chars, plus ~ for components,
|
|
# <> for the fragment symbol, & for &key/&rest, and unicode letters (é, ñ, em-dash…).
|
|
SYMBOL = re.compile(r"[a-zA-Z_~*+\-><=/!?&\u0080-\uFFFF][a-zA-Z0-9_~*+\-><=/!?.:&\u0080-\uFFFF]*")
|
|
|
|
def __init__(self, text: str):
|
|
self.text = text
|
|
self.pos = 0
|
|
self.line = 1
|
|
self.col = 1
|
|
|
|
def _advance(self, count: int = 1):
|
|
for _ in range(count):
|
|
if self.pos < len(self.text):
|
|
if self.text[self.pos] == "\n":
|
|
self.line += 1
|
|
self.col = 1
|
|
else:
|
|
self.col += 1
|
|
self.pos += 1
|
|
|
|
def _skip_whitespace_and_comments(self):
|
|
while self.pos < len(self.text):
|
|
m = self.WHITESPACE.match(self.text, self.pos)
|
|
if m:
|
|
self._advance(m.end() - self.pos)
|
|
continue
|
|
m = self.COMMENT.match(self.text, self.pos)
|
|
if m:
|
|
self._advance(m.end() - self.pos)
|
|
continue
|
|
break
|
|
|
|
def peek(self) -> str | None:
|
|
self._skip_whitespace_and_comments()
|
|
if self.pos >= len(self.text):
|
|
return None
|
|
return self.text[self.pos]
|
|
|
|
def next_token(self) -> Any:
|
|
self._skip_whitespace_and_comments()
|
|
if self.pos >= len(self.text):
|
|
return None
|
|
|
|
char = self.text[self.pos]
|
|
|
|
# Delimiters
|
|
if char in "()[]{}":
|
|
self._advance()
|
|
return char
|
|
|
|
# String
|
|
if char == '"':
|
|
m = self.STRING.match(self.text, self.pos)
|
|
if not m:
|
|
raise ParseError("Unterminated string", self.pos, self.line, self.col)
|
|
self._advance(m.end() - self.pos)
|
|
content = m.group()[1:-1]
|
|
content = _unescape_string(content)
|
|
return content
|
|
|
|
# Keyword
|
|
if char == ":":
|
|
m = self.KEYWORD.match(self.text, self.pos)
|
|
if m:
|
|
self._advance(m.end() - self.pos)
|
|
return Keyword(m.group()[1:])
|
|
raise ParseError("Invalid keyword", self.pos, self.line, self.col)
|
|
|
|
# Number (check before symbol because of leading -)
|
|
if char.isdigit() or (
|
|
char == "-"
|
|
and self.pos + 1 < len(self.text)
|
|
and (self.text[self.pos + 1].isdigit() or self.text[self.pos + 1] == ".")
|
|
):
|
|
m = self.NUMBER.match(self.text, self.pos)
|
|
if m:
|
|
self._advance(m.end() - self.pos)
|
|
num_str = m.group()
|
|
if "." in num_str or "e" in num_str or "E" in num_str:
|
|
return float(num_str)
|
|
return int(num_str)
|
|
|
|
# Ellipsis (... as a symbol, used in spec declarations)
|
|
if char == "." and self.text[self.pos:self.pos + 3] == "...":
|
|
self._advance(3)
|
|
return Symbol("...")
|
|
|
|
# Symbol
|
|
m = self.SYMBOL.match(self.text, self.pos)
|
|
if m:
|
|
self._advance(m.end() - self.pos)
|
|
name = m.group()
|
|
# Built-in literal symbols
|
|
if name == "true":
|
|
return True
|
|
if name == "false":
|
|
return False
|
|
if name == "nil":
|
|
return NIL
|
|
return Symbol(name)
|
|
|
|
# Reader macro dispatch: #
|
|
if char == "#":
|
|
return "#"
|
|
|
|
raise ParseError(f"Unexpected character: {char!r}", self.pos, self.line, self.col)
|
|
|
|
def _read_raw_string(self) -> str:
|
|
"""Read raw string literal until closing |."""
|
|
buf: list[str] = []
|
|
while self.pos < len(self.text):
|
|
ch = self.text[self.pos]
|
|
if ch == "|":
|
|
self._advance(1)
|
|
return "".join(buf)
|
|
buf.append(ch)
|
|
self._advance(1)
|
|
raise ParseError("Unterminated raw string", self.pos, self.line, self.col)
|
|
|
|
def _read_ident(self) -> str:
|
|
"""Read an identifier (for reader macro names)."""
|
|
import re
|
|
m = self.SYMBOL.match(self.text, self.pos)
|
|
if m:
|
|
self._advance(m.end() - self.pos)
|
|
return m.group()
|
|
raise ParseError("Expected identifier after #", self.pos, self.line, self.col)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Parsing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def parse(text: str) -> Any:
    """Parse exactly one s-expression from *text*.

    Raises ParseError when anything other than whitespace/comments
    follows the first expression.

    >>> parse('(div :class "main" (p "hello"))')
    [Symbol('div'), Keyword('class'), 'main', [Symbol('p'), 'hello']]
    """
    tokenizer = Tokenizer(text)
    expr = _parse_expr(tokenizer)
    if tokenizer.peek() is not None:
        raise ParseError("Unexpected content after expression",
                         tokenizer.pos, tokenizer.line, tokenizer.col)
    return expr
|
|
|
|
|
|
def parse_all(text: str) -> list[Any]:
    """Parse zero or more s-expressions from *text* into a list."""
    tokenizer = Tokenizer(text)
    forms: list[Any] = []
    while tokenizer.peek() is not None:
        forms.append(_parse_expr(tokenizer))
    return forms
|
|
|
|
|
|
def _parse_expr(tok: Tokenizer) -> Any:
    """Parse one expression from *tok*: a collection, quote form, reader
    macro, or atom.

    Dispatches on the raw peeked character (never on a tokenized value)
    so that string values like ")" or "(" are not confused with actual
    delimiters.  Raises ParseError at end of input or on a stray closer.
    """
    # Use peek() (raw character) for structural decisions so that string
    # values like ")" or "(" don't get confused with actual delimiters.
    raw = tok.peek()
    if raw is None:
        raise ParseError("Unexpected end of input", tok.pos, tok.line, tok.col)
    if raw in ")]}":
        tok.next_token()  # consume the delimiter
        raise ParseError(f"Unexpected {raw!r}", tok.pos, tok.line, tok.col)
    if raw == "(":
        tok.next_token()  # consume the '('
        return _parse_list(tok, ")")
    if raw == "[":
        tok.next_token()  # consume the '['
        return _parse_list(tok, "]")
    if raw == "{":
        tok.next_token()  # consume the '{'
        return _parse_map(tok)
    # Quote / quasiquote syntax: ' ` , ,@ — each expands to a wrapper list.
    # These use _advance (raw char consume) because the tokenizer has no
    # token type for quote characters.
    if raw == "'":
        tok._advance(1)  # consume the quote
        inner = _parse_expr(tok)
        return [Symbol("quote"), inner]
    if raw == "`":
        tok._advance(1)  # consume the backtick
        inner = _parse_expr(tok)
        return [Symbol("quasiquote"), inner]
    if raw == ",":
        tok._advance(1)  # consume the comma
        # Check for splice-unquote (,@) — no whitespace between , and @
        if tok.pos < len(tok.text) and tok.text[tok.pos] == "@":
            tok._advance(1)  # consume the @
            inner = _parse_expr(tok)
            return [Symbol("splice-unquote"), inner]
        inner = _parse_expr(tok)
        return [Symbol("unquote"), inner]
    # Reader macro dispatch: # — next_token() deliberately left the '#'
    # unconsumed, so we advance past it here and inspect the raw dispatch
    # character that follows.
    if raw == "#":
        tok._advance(1)  # consume the #
        if tok.pos >= len(tok.text):
            raise ParseError("Unexpected end of input after #",
                             tok.pos, tok.line, tok.col)
        dispatch = tok.text[tok.pos]
        if dispatch == ";":
            # Datum comment: #; expr discards the next expression entirely.
            tok._advance(1)
            _parse_expr(tok)  # read and discard
            return _parse_expr(tok)  # return next
        if dispatch == "|":
            # Raw string: #|...| — no escape processing.
            tok._advance(1)
            return tok._read_raw_string()
        if dispatch == "'":
            tok._advance(1)
            return [Symbol("quote"), _parse_expr(tok)]
        # Extensible dispatch: #name expr — handler transforms the parsed expr.
        if dispatch.isalpha() or dispatch in "_~":
            macro_name = tok._read_ident()
            handler = _READER_MACROS.get(macro_name)
            if handler is None:
                # Auto-resolve: look for {name}-translate in component env
                handler = _resolve_sx_reader_macro(macro_name)
            if handler is None:
                raise ParseError(f"Unknown reader macro: #{macro_name}",
                                 tok.pos, tok.line, tok.col)
            return handler(_parse_expr(tok))
        raise ParseError(f"Unknown reader macro: #{dispatch}",
                         tok.pos, tok.line, tok.col)
    # Everything else: strings, keywords, symbols, numbers
    token = tok.next_token()
    return token
|
|
|
|
|
|
def _parse_list(tok: Tokenizer, closer: str) -> list[Any]:
    """Read expressions until *closer* (which is consumed) and return them.

    The opening delimiter has already been consumed by the caller.
    """
    elements: list[Any] = []
    while True:
        nxt = tok.peek()
        if nxt is None:
            raise ParseError(f"Unterminated list, expected {closer!r}", tok.pos, tok.line, tok.col)
        if nxt == closer:
            tok.next_token()
            break
        elements.append(_parse_expr(tok))
    return elements
|
|
|
|
|
|
def _parse_map(tok: Tokenizer) -> dict[str, Any]:
    """Read ``key value`` pairs until '}' (consumed) into a plain dict.

    Keys must parse to a Keyword or a str; Keyword keys are stored by
    their bare name.  The opening '{' has already been consumed.
    """
    mapping: dict[str, Any] = {}
    while True:
        nxt = tok.peek()
        if nxt is None:
            raise ParseError("Unterminated map, expected '}'", tok.pos, tok.line, tok.col)
        if nxt == "}":
            tok.next_token()
            return mapping
        key_expr = _parse_expr(tok)
        # Keyword is checked before str in case Keyword subclasses str.
        if isinstance(key_expr, Keyword):
            key = key_expr.name
        elif isinstance(key_expr, str):
            key = key_expr
        else:
            raise ParseError(
                f"Map key must be keyword or string, got {type(key_expr).__name__}",
                tok.pos, tok.line, tok.col,
            )
        mapping[key] = _parse_expr(tok)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Serialization
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def serialize(expr: Any, indent: int = 0, pretty: bool = False) -> str:
    """Serialize a value back to s-expression text.

    Type dispatch order (first match wins — the order is load-bearing:
    SxExpr is a str subclass so it must precede str, and bool is an int
    subclass so it must precede int/float):

    - ``SxExpr`` → emitted unquoted (pre-built sx source)
    - ``list`` → ``(head ...)`` (s-expression list)
    - ``Symbol`` → bare name
    - ``Keyword`` → ``:name``
    - ``str`` → ``"quoted"`` (with escapes)
    - ``bool`` → ``true`` / ``false``
    - ``int/float`` → numeric literal
    - ``None/NIL`` → ``nil``
    - ``dict`` → ``{:key val ...}``

    List serialization conventions (for ``sx_call`` kwargs):

    - ``(list ...)`` — data array: client gets iterable for map/filter
    - ``(<> ...)`` — rendered content: client treats as DocumentFragment
    - ``(head ...)`` — AST: head is called as function (never use for data)
    """
    if isinstance(expr, SxExpr):
        return expr.source

    if isinstance(expr, list):
        if not expr:
            return "()"
        # Quasiquote sugar: [Symbol("quasiquote"), x] → `x
        if (len(expr) == 2 and isinstance(expr[0], Symbol)):
            name = expr[0].name
            if name == "quasiquote":
                return "`" + serialize(expr[1], indent, pretty)
            if name == "unquote":
                return "," + serialize(expr[1], indent, pretty)
            if name == "splice-unquote":
                return ",@" + serialize(expr[1], indent, pretty)
        if pretty:
            return _serialize_pretty(expr, indent)
        items = [serialize(item, indent, False) for item in expr]
        return "(" + " ".join(items) + ")"

    if isinstance(expr, Symbol):
        return expr.name

    if isinstance(expr, Keyword):
        return f":{expr.name}"

    if isinstance(expr, str):
        # Escape order matters: backslash first so later escapes aren't
        # double-escaped.  </script is broken up so the output is safe to
        # embed inside an HTML <script> tag.
        escaped = (
            expr.replace("\\", "\\\\")
            .replace('"', '\\"')
            .replace("\n", "\\n")
            .replace("\r", "\\r")
            .replace("\t", "\\t")
            .replace("\0", "\\0")
            .replace("</script", "<\\/script")
        )
        return f'"{escaped}"'

    if isinstance(expr, bool):
        return "true" if expr else "false"

    if isinstance(expr, (int, float)):
        return str(expr)

    if expr is None or isinstance(expr, type(NIL)):
        return "nil"

    if isinstance(expr, dict):
        # Keys are emitted as keywords without escaping; values recurse
        # with the same pretty setting.
        items: list[str] = []
        for k, v in expr.items():
            items.append(f":{k}")
            items.append(serialize(v, indent, pretty))
        return "{" + " ".join(items) + "}"

    # _RawHTML — pre-rendered HTML; wrap as (raw! "...") for SX wire format
    # (imported here, not at module top — presumably to avoid an import
    # cycle with .html; verify before hoisting).
    from .html import _RawHTML
    if isinstance(expr, _RawHTML):
        escaped = (
            expr.html.replace("\\", "\\\\")
            .replace('"', '\\"')
            .replace("\n", "\\n")
            .replace("\r", "\\r")
            .replace("\0", "\\0")
            .replace("</script", "<\\/script")
        )
        return f'(raw! "{escaped}")'

    # Catch callables (Python functions leaked into sx data): log loudly
    # but degrade to nil rather than crashing the render.
    if callable(expr):
        import logging
        logging.getLogger("sx").error(
            "serialize: callable leaked into sx data: %r", expr)
        return "nil"

    # Fallback for Lambda/Component — show repr
    return repr(expr)
|
|
|
|
|
|
def _serialize_pretty(expr: list, indent: int) -> str:
    """Pretty-print a list form: compact on one line when short, otherwise
    head on the first line and one item (or :key value pair) per line.
    """
    if not expr:
        return "()"
    pad = " " * (indent + 1)

    # Short forms stay compact.
    one_line = serialize(expr, indent, False)
    if len(one_line) < 72 and "\n" not in one_line:
        return one_line

    lines = ["(" + serialize(expr[0], indent + 1, False)]
    idx = 1
    while idx < len(expr):
        current = expr[idx]
        if isinstance(current, Keyword) and idx + 1 < len(expr):
            # :key value pairs share a line; long values recurse prettily.
            key_txt = serialize(current, 0, False)
            flat_val = serialize(expr[idx + 1], indent + 1, False)
            if len(flat_val) < 50 and "\n" not in flat_val:
                lines.append(f"{pad}{key_txt} {flat_val}")
            else:
                deep_val = serialize(expr[idx + 1], indent + 1, True)
                lines.append(f"{pad}{key_txt} {deep_val}")
            idx += 2
        else:
            lines.append(f"{pad}{serialize(current, indent + 1, True)}")
            idx += 1

    return "\n".join(lines) + ")"
|