Note: the Python bootstrapper auto-includes deps (component analysis) and signals (reactive islands) whenever the HTML adapter is present, matching production requirements where sx_ref.py must export compute_all_deps, transitive_deps, page_render_plan, etc.
#!/usr/bin/env python3
|
|
"""
|
|
Bootstrap compiler: reference SX evaluator -> Python.
|
|
|
|
Reads the .sx reference specification and emits a standalone Python
|
|
evaluator module (sx_ref.py) that can be compared against the hand-written
|
|
evaluator.py / html.py / async_eval.py.
|
|
|
|
The compiler translates the restricted SX subset used in eval.sx/render.sx
|
|
into idiomatic Python. Platform interface functions are emitted as
|
|
native Python implementations.
|
|
|
|
Usage:
|
|
python bootstrap_py.py > sx_ref.py
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import sys
|
|
|
|
# Add project root to path for imports
|
|
_HERE = os.path.dirname(os.path.abspath(__file__))
|
|
_PROJECT = os.path.abspath(os.path.join(_HERE, "..", "..", ".."))
|
|
sys.path.insert(0, _PROJECT)
|
|
|
|
from shared.sx.parser import parse_all
|
|
from shared.sx.types import Symbol, Keyword, NIL as SX_NIL
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SX -> Python transpiler
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Python reserved words — SX names that collide get _ suffix
|
|
# Excludes names we intentionally shadow (list, dict, range, filter, map)
|
|
_PY_RESERVED = frozenset({
|
|
"False", "None", "True", "and", "as", "assert", "async", "await",
|
|
"break", "class", "continue", "def", "del", "elif", "else", "except",
|
|
"finally", "for", "from", "global", "if", "import", "in", "is",
|
|
"lambda", "nonlocal", "not", "or", "pass", "raise", "return", "try",
|
|
"while", "with", "yield",
|
|
# builtins we don't want to shadow
|
|
"default", "type", "id", "input", "open", "print", "set", "super",
|
|
})
|
|
|
|
|
|
class PyEmitter:
    """Transpile an SX AST node to Python source code.

    Stateless apart from ``_current_cell_vars``, which some emitters set
    transiently while emitting functions whose locals are mutated from
    nested lambdas (those locals are routed through a ``_cells`` dict).
    """

    def __init__(self):
        # Indentation depth counter. The emit methods all take explicit
        # ``indent`` parameters, so this appears unused here — TODO confirm.
        self.indent = 0
|
|
|
|
    def emit(self, expr) -> str:
        """Emit a Python expression from an SX AST node."""
        # Bool MUST be checked before int (bool is subclass of int in Python)
        if isinstance(expr, bool):
            return "True" if expr else "False"
        if isinstance(expr, (int, float)):
            return str(expr)
        if isinstance(expr, str):
            # String literal -> repr-quoted Python literal.
            return self._py_string(expr)
        if expr is None or expr is SX_NIL:
            return "NIL"
        if isinstance(expr, Symbol):
            # Identifier reference (possibly routed through the _cells dict).
            return self._emit_symbol(expr.name)
        if isinstance(expr, Keyword):
            # Keywords compile to their name as a plain string.
            return self._py_string(expr.name)
        if isinstance(expr, dict):
            return self._emit_native_dict(expr)
        if isinstance(expr, list):
            # Call form, special form, or data list — dispatched in _emit_list.
            return self._emit_list(expr)
        # Fallback: rely on the value's own str() rendering.
        return str(expr)
|
|
|
|
    def emit_statement(self, expr, indent: int = 0) -> str:
        """Emit a Python statement from an SX AST node.

        Dispatches on the head symbol for statement-level forms (define,
        set!, when, do/begin, for-each and the mutation primitives); any
        other node is emitted as a bare expression statement.
        """
        pad = " " * indent
        if isinstance(expr, list) and expr:
            head = expr[0]
            if isinstance(head, Symbol):
                name = head.name
                if name == "define":
                    return self._emit_define(expr, indent)
                if name == "set!":
                    # Statement-position set! is a plain Python rebinding.
                    return f"{pad}{self._mangle(expr[1].name)} = {self.emit(expr[2])}"
                if name == "when":
                    return self._emit_when_stmt(expr, indent)
                if name == "do" or name == "begin":
                    # Flatten the sequence: one statement line per sub-form.
                    return "\n".join(self.emit_statement(e, indent) for e in expr[1:])
                if name == "for-each":
                    return self._emit_for_each_stmt(expr, indent)
                if name == "dict-set!":
                    return f"{pad}{self.emit(expr[1])}[{self.emit(expr[2])}] = {self.emit(expr[3])}"
                if name == "append!":
                    return f"{pad}{self.emit(expr[1])}.append({self.emit(expr[2])})"
                if name == "env-set!":
                    # Environments are plain dicts in the generated code.
                    return f"{pad}{self.emit(expr[1])}[{self.emit(expr[2])}] = {self.emit(expr[3])}"
                if name == "set-lambda-name!":
                    return f"{pad}{self.emit(expr[1])}.name = {self.emit(expr[2])}"
        # Fallback: expression statement.
        return f"{pad}{self.emit(expr)}"
|
|
|
|
# --- Symbol emission ---
|
|
|
|
def _emit_symbol(self, name: str) -> str:
|
|
mangled = self._mangle(name)
|
|
cell_vars = getattr(self, '_current_cell_vars', set())
|
|
if mangled in cell_vars:
|
|
return f"_cells[{self._py_string(mangled)}]"
|
|
return mangled
|
|
|
|
    def _mangle(self, name: str) -> str:
        """Convert SX identifier to valid Python identifier.

        Exact renames are looked up in RENAMES first; otherwise trailing
        ``?``/``!`` become ``_p``/``_b``, kebab-case becomes snake_case, and
        reserved words get a trailing underscore.

        NOTE(review): RENAMES is rebuilt on every call — consider hoisting it
        to a module-level constant. It also contains a duplicate
        "render-to-sx" entry (harmless: both map to the same value).
        """
        RENAMES = {
            "nil": "NIL",
            "true": "True",
            "false": "False",
            "nil?": "is_nil",
            "type-of": "type_of",
            "symbol-name": "symbol_name",
            "keyword-name": "keyword_name",
            "make-lambda": "make_lambda",
            "make-component": "make_component",
            "make-macro": "make_macro",
            "make-thunk": "make_thunk",
            "make-handler-def": "make_handler_def",
            "make-query-def": "make_query_def",
            "make-action-def": "make_action_def",
            "make-page-def": "make_page_def",
            "make-symbol": "make_symbol",
            "make-keyword": "make_keyword",
            "lambda-params": "lambda_params",
            "lambda-body": "lambda_body",
            "lambda-closure": "lambda_closure",
            "lambda-name": "lambda_name",
            "set-lambda-name!": "set_lambda_name",
            "component-params": "component_params",
            "component-body": "component_body",
            "component-closure": "component_closure",
            "component-has-children?": "component_has_children",
            "component-name": "component_name",
            "component-affinity": "component_affinity",
            "macro-params": "macro_params",
            "macro-rest-param": "macro_rest_param",
            "macro-body": "macro_body",
            "macro-closure": "macro_closure",
            "thunk?": "is_thunk",
            "thunk-expr": "thunk_expr",
            "thunk-env": "thunk_env",
            "callable?": "is_callable",
            "lambda?": "is_lambda",
            "component?": "is_component",
            "island?": "is_island",
            "make-island": "make_island",
            "make-signal": "make_signal",
            "signal?": "is_signal",
            "signal-value": "signal_value",
            "signal-set-value!": "signal_set_value",
            "signal-subscribers": "signal_subscribers",
            "signal-add-sub!": "signal_add_sub",
            "signal-remove-sub!": "signal_remove_sub",
            "signal-deps": "signal_deps",
            "signal-set-deps!": "signal_set_deps",
            "set-tracking-context!": "set_tracking_context",
            "get-tracking-context": "get_tracking_context",
            "make-tracking-context": "make_tracking_context",
            "tracking-context-deps": "tracking_context_deps",
            "tracking-context-add-dep!": "tracking_context_add_dep",
            "tracking-context-notify-fn": "tracking_context_notify_fn",
            "identical?": "is_identical",
            "notify-subscribers": "notify_subscribers",
            "flush-subscribers": "flush_subscribers",
            "dispose-computed": "dispose_computed",
            "with-island-scope": "with_island_scope",
            "register-in-scope": "register_in_scope",
            "*batch-depth*": "_batch_depth",
            "*batch-queue*": "_batch_queue",
            "*island-scope*": "_island_scope",
            "macro?": "is_macro",
            "primitive?": "is_primitive",
            "get-primitive": "get_primitive",
            "env-has?": "env_has",
            "env-get": "env_get",
            "env-set!": "env_set",
            "env-extend": "env_extend",
            "env-merge": "env_merge",
            "dict-set!": "dict_set",
            "dict-get": "dict_get",
            "dict-has?": "dict_has",
            "dict-delete!": "dict_delete",
            "eval-expr": "eval_expr",
            "eval-list": "eval_list",
            "eval-call": "eval_call",
            "is-render-expr?": "is_render_expr",
            "render-expr": "render_expr",
            "call-lambda": "call_lambda",
            "call-component": "call_component",
            "parse-keyword-args": "parse_keyword_args",
            "parse-comp-params": "parse_comp_params",
            "parse-macro-params": "parse_macro_params",
            "expand-macro": "expand_macro",
            "render-to-html": "render_to_html",
            "render-to-sx": "render_to_sx",
            "render-value-to-html": "render_value_to_html",
            "render-list-to-html": "render_list_to_html",
            "render-html-element": "render_html_element",
            "render-html-component": "render_html_component",
            "parse-element-args": "parse_element_args",
            "render-attrs": "render_attrs",
            "aser-list": "aser_list",
            "aser-fragment": "aser_fragment",
            "aser-call": "aser_call",
            "aser-special": "aser_special",
            "sf-if": "sf_if",
            "sf-when": "sf_when",
            "sf-cond": "sf_cond",
            "sf-cond-scheme": "sf_cond_scheme",
            "sf-cond-clojure": "sf_cond_clojure",
            "sf-case": "sf_case",
            "sf-case-loop": "sf_case_loop",
            "sf-and": "sf_and",
            "sf-or": "sf_or",
            "sf-let": "sf_let",
            "sf-lambda": "sf_lambda",
            "sf-define": "sf_define",
            "sf-defcomp": "sf_defcomp",
            "defcomp-kwarg": "defcomp_kwarg",
            "sf-defmacro": "sf_defmacro",
            "sf-begin": "sf_begin",
            "sf-quote": "sf_quote",
            "sf-quasiquote": "sf_quasiquote",
            "sf-thread-first": "sf_thread_first",
            "sf-set!": "sf_set_bang",
            "sf-reset": "sf_reset",
            "sf-shift": "sf_shift",
            "qq-expand": "qq_expand",
            "ho-map": "ho_map",
            "ho-map-indexed": "ho_map_indexed",
            "ho-filter": "ho_filter",
            "ho-reduce": "ho_reduce",
            "ho-some": "ho_some",
            "ho-every": "ho_every",
            "ho-for-each": "ho_for_each",
            "sf-defstyle": "sf_defstyle",
            "special-form?": "is_special_form",
            "ho-form?": "is_ho_form",
            "strip-prefix": "strip_prefix",
            "escape-html": "escape_html",
            "escape-attr": "escape_attr",
            "escape-string": "escape_string",
            "raw-html-content": "raw_html_content",
            "HTML_TAGS": "HTML_TAGS",
            "VOID_ELEMENTS": "VOID_ELEMENTS",
            "BOOLEAN_ATTRS": "BOOLEAN_ATTRS",
            # render.sx core
            "definition-form?": "is_definition_form",
            # adapter-html.sx
            "RENDER_HTML_FORMS": "RENDER_HTML_FORMS",
            "render-html-form?": "is_render_html_form",
            "dispatch-html-form": "dispatch_html_form",
            "render-lambda-html": "render_lambda_html",
            "make-raw-html": "make_raw_html",
            "render-html-island": "render_html_island",
            "serialize-island-state": "serialize_island_state",
            "json-serialize": "json_serialize",
            "empty-dict?": "is_empty_dict",
            "sf-defisland": "sf_defisland",
            # adapter-sx.sx
            "render-to-sx": "render_to_sx",
            "aser": "aser",
            # Primitives that need exact aliases
            "contains?": "contains_p",
            "starts-with?": "starts_with_p",
            "ends-with?": "ends_with_p",
            "empty?": "empty_p",
            "every?": "every_p",
            "for-each": "for_each",
            "for-each-indexed": "for_each_indexed",
            "map-indexed": "map_indexed",
            "map-dict": "map_dict",
            "eval-cond": "eval_cond",
            "eval-cond-scheme": "eval_cond_scheme",
            "eval-cond-clojure": "eval_cond_clojure",
            "process-bindings": "process_bindings",
            # deps.sx
            "scan-refs": "scan_refs",
            "scan-refs-walk": "scan_refs_walk",
            "transitive-deps": "transitive_deps",
            "compute-all-deps": "compute_all_deps",
            "scan-components-from-source": "scan_components_from_source",
            "components-needed": "components_needed",
            "page-component-bundle": "page_component_bundle",
            "page-css-classes": "page_css_classes",
            "component-deps": "component_deps",
            "component-set-deps!": "component_set_deps",
            "component-css-classes": "component_css_classes",
            "component-io-refs": "component_io_refs",
            "component-set-io-refs!": "component_set_io_refs",
            "env-components": "env_components",
            "regex-find-all": "regex_find_all",
            "scan-css-classes": "scan_css_classes",
            # deps.sx IO detection
            "scan-io-refs": "scan_io_refs",
            "scan-io-refs-walk": "scan_io_refs_walk",
            "transitive-io-refs": "transitive_io_refs",
            "compute-all-io-refs": "compute_all_io_refs",
            "component-pure?": "component_pure_p",
            "render-target": "render_target",
            "page-render-plan": "page_render_plan",
            # router.sx
            "split-path-segments": "split_path_segments",
            "make-route-segment": "make_route_segment",
            "parse-route-pattern": "parse_route_pattern",
            "match-route-segments": "match_route_segments",
            "match-route": "match_route",
            "find-matching-route": "find_matching_route",
        }
        if name in RENAMES:
            return RENAMES[name]
        # General mangling
        result = name
        # Handle trailing ? and !
        if result.endswith("?"):
            result = result[:-1] + "_p"
        elif result.endswith("!"):
            result = result[:-1] + "_b"
        # Kebab to snake_case
        result = result.replace("-", "_")
        # Escape Python reserved words
        if result in _PY_RESERVED:
            result = result + "_"
        return result
|
|
|
|
# --- List emission ---
|
|
|
|
    def _emit_list(self, expr: list) -> str:
        """Emit a list node: data literal, special form, or function call."""
        if not expr:
            return "[]"
        head = expr[0]
        if not isinstance(head, Symbol):
            # Data list
            return "[" + ", ".join(self.emit(x) for x in expr) + "]"
        name = head.name
        # Per-form override hook: a method named _sf_<mangled-name> wins
        # over the built-in dispatch below.
        handler = getattr(
            self,
            f"_sf_{name.replace('-', '_').replace('!', '_b').replace('?', '_p')}",
            None,
        )
        if handler:
            return handler(expr)
        # Built-in forms
        if name in ("fn", "lambda"):
            return self._emit_fn(expr)
        if name in ("let", "let*"):
            return self._emit_let(expr)
        if name == "if":
            return self._emit_if(expr)
        if name == "when":
            return self._emit_when(expr)
        if name == "cond":
            return self._emit_cond(expr)
        if name == "case":
            return self._emit_case(expr)
        if name == "and":
            return self._emit_and(expr)
        if name == "or":
            return self._emit_or(expr)
        if name == "not":
            return f"(not sx_truthy({self.emit(expr[1])}))"
        if name in ("do", "begin"):
            return self._emit_do(expr)
        if name == "list":
            return "[" + ", ".join(self.emit(x) for x in expr[1:]) + "]"
        if name == "dict":
            return self._emit_dict_literal(expr)
        if name == "quote":
            return self._emit_quote(expr[1])
        if name == "set!":
            # set! in expression context — use nonlocal_cells dict for mutation
            # from nested lambdas (Python closures can read but not rebind outer vars)
            varname = expr[1].name if isinstance(expr[1], Symbol) else str(expr[1])
            py_var = self._mangle(varname)
            return f"_sx_cell_set(_cells, {self._py_string(py_var)}, {self.emit(expr[2])})"
        if name == "str":
            parts = [self.emit(x) for x in expr[1:]]
            return "sx_str(" + ", ".join(parts) + ")"
        # Mutation forms that can appear in expression context
        if name == "append!":
            return f"_sx_append({self.emit(expr[1])}, {self.emit(expr[2])})"
        if name == "dict-set!":
            return f"_sx_dict_set({self.emit(expr[1])}, {self.emit(expr[2])}, {self.emit(expr[3])})"
        if name == "env-set!":
            return f"_sx_dict_set({self.emit(expr[1])}, {self.emit(expr[2])}, {self.emit(expr[3])})"
        if name == "set-lambda-name!":
            return f"_sx_set_attr({self.emit(expr[1])}, 'name', {self.emit(expr[2])})"
        # Infix operators
        if name in ("+", "-", "*", "/", "=", "!=", "<", ">", "<=", ">=", "mod"):
            return self._emit_infix(name, expr[1:])
        if name == "inc":
            return f"({self.emit(expr[1])} + 1)"
        if name == "dec":
            return f"({self.emit(expr[1])} - 1)"

        # Regular function call
        fn_name = self._mangle(name)
        args = ", ".join(self.emit(x) for x in expr[1:])
        return f"{fn_name}({args})"
|
|
|
|
# --- Special form emitters ---
|
|
|
|
    def _emit_fn(self, expr) -> str:
        """Emit (fn (params...) body...) as a Python lambda.

        A ``&rest`` marker turns the following parameter into ``*rest``.
        Single-expression bodies become a plain lambda; multi-expression
        bodies are wrapped so every expression is evaluated and the last
        one is returned.
        """
        params = expr[1]
        body = expr[2:]
        param_names = []
        rest_name = None
        i = 0
        while i < len(params):
            p = params[i]
            if isinstance(p, Symbol) and p.name == "&rest":
                # Next param is the rest parameter
                if i + 1 < len(params):
                    rest_name = self._mangle(params[i + 1].name if isinstance(params[i + 1], Symbol) else str(params[i + 1]))
                    i += 2
                    continue
                else:
                    # Dangling &rest with no following name is ignored.
                    i += 1
                    continue
            if isinstance(p, Symbol):
                param_names.append(self._mangle(p.name))
            else:
                param_names.append(str(p))
            i += 1
        if rest_name:
            param_names.append(f"*{rest_name}")
        params_str = ", ".join(param_names)
        if len(body) == 1:
            body_py = self.emit(body[0])
            return f"lambda {params_str}: {body_py}"
        # Multi-expression body: need a local function
        # Emitted as a tuple of all expressions indexed with [-1], so all
        # side effects run and the last value is the result.
        lines = []
        lines.append(f"_sx_fn(lambda {params_str}: (")
        for b in body[:-1]:
            lines.append(f" {self.emit(b)},")
        lines.append(f" {self.emit(body[-1])}")
        lines.append(")[-1])")
        return "\n".join(lines)
|
|
|
|
    def _emit_let(self, expr) -> str:
        """Emit (let bindings body...) in expression position.

        Accepts both Scheme-style ``((name val) ...)`` and Clojure-style
        flat ``(name val name val ...)`` binding lists.
        """
        bindings = expr[1]
        body = expr[2:]
        assignments = []
        if isinstance(bindings, list):
            if bindings and isinstance(bindings[0], list):
                # Scheme-style: ((name val) ...)
                for b in bindings:
                    vname = b[0].name if isinstance(b[0], Symbol) else str(b[0])
                    assignments.append((self._mangle(vname), self.emit(b[1])))
            else:
                # Clojure-style: (name val name val ...)
                for i in range(0, len(bindings), 2):
                    vname = bindings[i].name if isinstance(bindings[i], Symbol) else str(bindings[i])
                    assignments.append((self._mangle(vname), self.emit(bindings[i + 1])))
        # Nested IIFE for sequential let (each binding can see previous ones):
        # (lambda a: (lambda b: body)(val_b))(val_a)
        # Cell variables (mutated by nested set!) are initialized in _cells dict
        # instead of lambda params, since the body reads _cells[name].
        cell_vars = getattr(self, '_current_cell_vars', set())
        body_parts = [self.emit(b) for b in body]
        if len(body) == 1:
            body_str = body_parts[0]
        else:
            body_str = f"_sx_begin({', '.join(body_parts)})"
        # Build from inside out
        result = body_str
        for name, val in reversed(assignments):
            if name in cell_vars:
                # Cell var: initialize in _cells dict, not as lambda param
                result = f"_sx_begin(_sx_cell_set(_cells, {self._py_string(name)}, {val}), {result})"
            else:
                result = f"(lambda {name}: {result})({val})"
        return result
|
|
|
|
def _emit_if(self, expr) -> str:
|
|
cond = self.emit(expr[1])
|
|
then = self.emit(expr[2])
|
|
els = self.emit(expr[3]) if len(expr) > 3 else "NIL"
|
|
return f"({then} if sx_truthy({cond}) else {els})"
|
|
|
|
def _emit_when(self, expr) -> str:
|
|
cond = self.emit(expr[1])
|
|
body_parts = expr[2:]
|
|
if len(body_parts) == 1:
|
|
return f"({self.emit(body_parts[0])} if sx_truthy({cond}) else NIL)"
|
|
body = ", ".join(self.emit(b) for b in body_parts)
|
|
return f"(_sx_begin({body}) if sx_truthy({cond}) else NIL)"
|
|
|
|
def _emit_when_stmt(self, expr, indent: int = 0) -> str:
|
|
pad = " " * indent
|
|
cond = self.emit(expr[1])
|
|
body_parts = expr[2:]
|
|
lines = [f"{pad}if sx_truthy({cond}):"]
|
|
for b in body_parts:
|
|
lines.append(self.emit_statement(b, indent + 1))
|
|
return "\n".join(lines)
|
|
|
|
def _emit_cond(self, expr) -> str:
|
|
clauses = expr[1:]
|
|
if not clauses:
|
|
return "NIL"
|
|
is_scheme = (
|
|
all(isinstance(c, list) and len(c) == 2 for c in clauses)
|
|
and not any(isinstance(c, Keyword) for c in clauses)
|
|
)
|
|
if is_scheme:
|
|
return self._cond_scheme(clauses)
|
|
return self._cond_clojure(clauses)
|
|
|
|
def _cond_scheme(self, clauses) -> str:
|
|
if not clauses:
|
|
return "NIL"
|
|
clause = clauses[0]
|
|
test = clause[0]
|
|
body = clause[1]
|
|
if isinstance(test, Symbol) and test.name in ("else", ":else"):
|
|
return self.emit(body)
|
|
if isinstance(test, Keyword) and test.name == "else":
|
|
return self.emit(body)
|
|
return f"({self.emit(body)} if sx_truthy({self.emit(test)}) else {self._cond_scheme(clauses[1:])})"
|
|
|
|
def _cond_clojure(self, clauses) -> str:
|
|
if len(clauses) < 2:
|
|
return "NIL"
|
|
test = clauses[0]
|
|
body = clauses[1]
|
|
if isinstance(test, Keyword) and test.name == "else":
|
|
return self.emit(body)
|
|
if isinstance(test, Symbol) and test.name in ("else", ":else"):
|
|
return self.emit(body)
|
|
return f"({self.emit(body)} if sx_truthy({self.emit(test)}) else {self._cond_clojure(clauses[2:])})"
|
|
|
|
def _emit_case(self, expr) -> str:
|
|
match_expr = self.emit(expr[1])
|
|
clauses = expr[2:]
|
|
return f"_sx_case({match_expr}, [{self._case_pairs(clauses)}])"
|
|
|
|
def _case_pairs(self, clauses) -> str:
|
|
pairs = []
|
|
i = 0
|
|
while i < len(clauses) - 1:
|
|
test = clauses[i]
|
|
body = clauses[i + 1]
|
|
if isinstance(test, Keyword) and test.name == "else":
|
|
pairs.append(f"(None, lambda: {self.emit(body)})")
|
|
elif isinstance(test, Symbol) and test.name in ("else", ":else"):
|
|
pairs.append(f"(None, lambda: {self.emit(body)})")
|
|
else:
|
|
pairs.append(f"({self.emit(test)}, lambda: {self.emit(body)})")
|
|
i += 2
|
|
return ", ".join(pairs)
|
|
|
|
def _emit_and(self, expr) -> str:
|
|
parts = [self.emit(x) for x in expr[1:]]
|
|
if len(parts) == 1:
|
|
return parts[0]
|
|
# Use Python's native and for short-circuit evaluation.
|
|
# Last value returned as-is; prior values tested with sx_truthy.
|
|
# (and a b c) -> (a if not sx_truthy(a) else (b if not sx_truthy(b) else c))
|
|
result = parts[-1]
|
|
for p in reversed(parts[:-1]):
|
|
result = f"({p} if not sx_truthy({p}) else {result})"
|
|
return result
|
|
|
|
def _emit_or(self, expr) -> str:
|
|
if len(expr) == 2:
|
|
return self.emit(expr[1])
|
|
parts = [self.emit(x) for x in expr[1:]]
|
|
# Use Python's short-circuit pattern:
|
|
# (or a b c) -> (a if sx_truthy(a) else (b if sx_truthy(b) else c))
|
|
result = parts[-1]
|
|
for p in reversed(parts[:-1]):
|
|
result = f"({p} if sx_truthy({p}) else {result})"
|
|
return result
|
|
|
|
    def _emit_do(self, expr) -> str:
        """Emit (do ...) / (begin ...) in expression position."""
        return self._emit_do_inner(expr[1:])
|
|
|
|
def _emit_do_inner(self, exprs) -> str:
|
|
if len(exprs) == 1:
|
|
return self.emit(exprs[0])
|
|
parts = [self.emit(e) for e in exprs]
|
|
return "_sx_begin(" + ", ".join(parts) + ")"
|
|
|
|
def _emit_native_dict(self, expr: dict) -> str:
|
|
"""Emit a native Python dict (from parser's {:key val} syntax)."""
|
|
parts = []
|
|
for key, val in expr.items():
|
|
parts.append(f"{self._py_string(key)}: {self.emit(val)}")
|
|
return "{" + ", ".join(parts) + "}"
|
|
|
|
def _emit_dict_literal(self, expr) -> str:
|
|
pairs = expr[1:]
|
|
parts = []
|
|
i = 0
|
|
while i < len(pairs) - 1:
|
|
key = pairs[i]
|
|
val = pairs[i + 1]
|
|
if isinstance(key, Keyword):
|
|
parts.append(f"{self._py_string(key.name)}: {self.emit(val)}")
|
|
else:
|
|
parts.append(f"{self.emit(key)}: {self.emit(val)}")
|
|
i += 2
|
|
return "{" + ", ".join(parts) + "}"
|
|
|
|
def _emit_infix(self, op: str, args: list) -> str:
|
|
PY_OPS = {"=": "==", "!=": "!=", "mod": "%"}
|
|
py_op = PY_OPS.get(op, op)
|
|
if len(args) == 1 and op == "-":
|
|
return f"(-{self.emit(args[0])})"
|
|
return f"({self.emit(args[0])} {py_op} {self.emit(args[1])})"
|
|
|
|
    def _emit_define(self, expr, indent: int = 0) -> str:
        """Emit (define name value) as a Python assignment.

        A fn/lambda value whose body uses set! is emitted as a real ``def``
        instead (see _emit_define_as_def) so mutation works.
        """
        pad = " " * indent
        name = expr[1].name if isinstance(expr[1], Symbol) else str(expr[1])
        val_expr = expr[2]
        # If value is a lambda/fn, check if body uses set! on let-bound vars
        # and emit as def for proper mutation support
        if (isinstance(val_expr, list) and val_expr and
            isinstance(val_expr[0], Symbol) and val_expr[0].name in ("fn", "lambda")
            and self._body_uses_set(val_expr)):
            return self._emit_define_as_def(name, val_expr, indent)
        val = self.emit(val_expr)
        return f"{pad}{self._mangle(name)} = {val}"
|
|
|
|
def _body_uses_set(self, fn_expr) -> bool:
|
|
"""Check if a fn expression's body (recursively) uses set!."""
|
|
def _has_set(node):
|
|
if not isinstance(node, list) or not node:
|
|
return False
|
|
head = node[0]
|
|
if isinstance(head, Symbol) and head.name == "set!":
|
|
return True
|
|
return any(_has_set(child) for child in node if isinstance(child, list))
|
|
body = fn_expr[2:]
|
|
return any(_has_set(b) for b in body)
|
|
|
|
    def _emit_define_as_def(self, name: str, fn_expr, indent: int = 0) -> str:
        """Emit a define with fn value as a proper def statement.

        This is used for functions that contain set! — Python closures can't
        rebind outer lambda params, so we need proper def + local variables.
        Variables mutated by set! from nested lambdas use a _cells dict.
        """
        pad = " " * indent
        params = fn_expr[1]
        body = fn_expr[2:]
        param_names = []
        for p in params:
            if isinstance(p, Symbol):
                param_names.append(self._mangle(p.name))
            else:
                param_names.append(str(p))
        params_str = ", ".join(param_names)
        py_name = self._mangle(name)
        # Find set! target variables that are used from nested lambda scopes
        nested_set_vars = self._find_nested_set_vars(body)
        lines = [f"{pad}def {py_name}({params_str}):"]
        if nested_set_vars:
            # The _cells dict is the mutation escape hatch for nested lambdas.
            lines.append(f"{pad} _cells = {{}}")
        # Emit body with cell var tracking
        # (save/restore so nested emission doesn't leak tracking state).
        old_cells = getattr(self, '_current_cell_vars', set())
        self._current_cell_vars = nested_set_vars
        self._emit_body_stmts(body, lines, indent + 1)
        self._current_cell_vars = old_cells
        return "\n".join(lines)
|
|
|
|
def _find_nested_set_vars(self, body) -> set[str]:
|
|
"""Find variable names that are set! from within nested fn/lambda bodies."""
|
|
result = set()
|
|
def _scan(node, in_nested_fn=False):
|
|
if not isinstance(node, list) or not node:
|
|
return
|
|
head = node[0]
|
|
if isinstance(head, Symbol):
|
|
if head.name in ("fn", "lambda") and in_nested_fn:
|
|
# Already nested, keep scanning
|
|
for child in node[2:]:
|
|
_scan(child, True)
|
|
return
|
|
if head.name in ("fn", "lambda"):
|
|
# Entering nested fn
|
|
for child in node[2:]:
|
|
_scan(child, True)
|
|
return
|
|
if head.name == "set!" and in_nested_fn:
|
|
var = node[1].name if isinstance(node[1], Symbol) else str(node[1])
|
|
result.add(self._mangle(var))
|
|
for child in node:
|
|
if isinstance(child, list):
|
|
_scan(child, in_nested_fn)
|
|
for b in body:
|
|
_scan(b)
|
|
return result
|
|
|
|
    def _emit_body_stmts(self, body: list, lines: list, indent: int) -> None:
        """Emit body expressions as statements into lines list.

        Handles let as local variable declarations, and returns the last
        expression.
        """
        pad = " " * indent
        for i, expr in enumerate(body):
            is_last = (i == len(body) - 1)
            if isinstance(expr, list) and expr and isinstance(expr[0], Symbol):
                name = expr[0].name
                if name in ("let", "let*"):
                    # let bindings become plain local assignments.
                    self._emit_let_as_stmts(expr, lines, indent, is_last)
                    continue
                if name in ("do", "begin"):
                    sub_body = expr[1:]
                    if is_last:
                        # Recurse so the final sub-form gets the return.
                        self._emit_body_stmts(sub_body, lines, indent)
                    else:
                        for sub in sub_body:
                            lines.append(self.emit_statement(sub, indent))
                    continue
            if is_last:
                # Function result: return the value of the last expression.
                lines.append(f"{pad}return {self.emit(expr)}")
            else:
                lines.append(self.emit_statement(expr, indent))
|
|
|
|
    def _emit_let_as_stmts(self, expr, lines: list, indent: int, is_last: bool) -> None:
        """Emit a let expression as local variable declarations.

        Bindings whose names are tracked cell variables are initialized in
        the _cells dict instead of as plain locals. When is_last, the let
        body produces the function's return value.
        """
        pad = " " * indent
        bindings = expr[1]
        body = expr[2:]
        cell_vars = getattr(self, '_current_cell_vars', set())
        if isinstance(bindings, list):
            if bindings and isinstance(bindings[0], list):
                # Scheme-style: ((name val) ...)
                for b in bindings:
                    vname = b[0].name if isinstance(b[0], Symbol) else str(b[0])
                    mangled = self._mangle(vname)
                    if mangled in cell_vars:
                        lines.append(f"{pad}_cells[{self._py_string(mangled)}] = {self.emit(b[1])}")
                    else:
                        lines.append(f"{pad}{mangled} = {self.emit(b[1])}")
            else:
                # Clojure-style: (name val name val ...)
                for j in range(0, len(bindings), 2):
                    vname = bindings[j].name if isinstance(bindings[j], Symbol) else str(bindings[j])
                    mangled = self._mangle(vname)
                    if mangled in cell_vars:
                        lines.append(f"{pad}_cells[{self._py_string(mangled)}] = {self.emit(bindings[j + 1])}")
                    else:
                        lines.append(f"{pad}{mangled} = {self.emit(bindings[j + 1])}")
        if is_last:
            # Tail position: the body's last form becomes the return.
            self._emit_body_stmts(body, lines, indent)
        else:
            for b in body:
                self._emit_stmt_recursive(b, lines, indent)
|
|
|
|
    def _emit_for_each_stmt(self, expr, indent: int = 0) -> str:
        """Emit (for-each fn coll) in statement position.

        An inline (fn (x) ...) becomes a native for-loop over the collection;
        any other callable expression is applied per item.
        """
        pad = " " * indent
        fn_expr = expr[1]
        coll_expr = expr[2]
        coll = self.emit(coll_expr)
        # If fn is an inline lambda, emit a for loop
        if isinstance(fn_expr, list) and isinstance(fn_expr[0], Symbol) and fn_expr[0].name == "fn":
            params = fn_expr[1]
            body = fn_expr[2:]
            # Only the first parameter is bound — for-each passes one item.
            p = params[0].name if isinstance(params[0], Symbol) else str(params[0])
            p_py = self._mangle(p)
            lines = [f"{pad}for {p_py} in {coll}:"]
            # Emit body as statements with proper let/set! handling
            self._emit_loop_body(body, lines, indent + 1)
            return "\n".join(lines)
        fn = self.emit(fn_expr)
        return f"{pad}for _item in {coll}:\n{pad} {fn}(_item)"
|
|
|
|
def _emit_loop_body(self, body: list, lines: list, indent: int) -> None:
|
|
"""Emit loop body as statements. Handles let, when, set!, cond properly."""
|
|
pad = " " * indent
|
|
for expr in body:
|
|
self._emit_stmt_recursive(expr, lines, indent)
|
|
|
|
    def _emit_stmt_recursive(self, expr, lines: list, indent: int) -> None:
        """Emit an expression as statement(s), recursing into control flow.

        Unlike emit_statement (which returns a string), this appends to
        ``lines`` and expands when/cond/if/do into real Python blocks.
        """
        pad = " " * indent
        if not isinstance(expr, list) or not expr:
            # Atom: emit as a plain expression statement.
            lines.append(self.emit_statement(expr, indent))
            return
        head = expr[0]
        if not isinstance(head, Symbol):
            lines.append(self.emit_statement(expr, indent))
            return
        name = head.name
        if name == "set!":
            varname = expr[1].name if isinstance(expr[1], Symbol) else str(expr[1])
            mangled = self._mangle(varname)
            cell_vars = getattr(self, '_current_cell_vars', set())
            if mangled in cell_vars:
                # Cell variable: mutate through the _cells dict.
                lines.append(f"{pad}_cells[{self._py_string(mangled)}] = {self.emit(expr[2])}")
            else:
                lines.append(f"{pad}{mangled} = {self.emit(expr[2])}")
        elif name in ("let", "let*"):
            self._emit_let_as_stmts(expr, lines, indent, False)
        elif name == "when":
            cond = self.emit(expr[1])
            lines.append(f"{pad}if sx_truthy({cond}):")
            for b in expr[2:]:
                self._emit_stmt_recursive(b, lines, indent + 1)
        elif name == "cond":
            self._emit_cond_stmt(expr, lines, indent)
        elif name in ("do", "begin"):
            # Sequence: flatten at the same indent level.
            for b in expr[1:]:
                self._emit_stmt_recursive(b, lines, indent)
        elif name == "if":
            cond = self.emit(expr[1])
            lines.append(f"{pad}if sx_truthy({cond}):")
            self._emit_stmt_recursive(expr[2], lines, indent + 1)
            if len(expr) > 3:
                lines.append(f"{pad}else:")
                self._emit_stmt_recursive(expr[3], lines, indent + 1)
        elif name == "append!":
            lines.append(f"{pad}{self.emit(expr[1])}.append({self.emit(expr[2])})")
        elif name == "dict-set!":
            lines.append(f"{pad}{self.emit(expr[1])}[{self.emit(expr[2])}] = {self.emit(expr[3])}")
        elif name == "env-set!":
            lines.append(f"{pad}{self.emit(expr[1])}[{self.emit(expr[2])}] = {self.emit(expr[3])}")
        else:
            lines.append(self.emit_statement(expr, indent))
|
|
|
|
    def _emit_cond_stmt(self, expr, lines: list, indent: int) -> None:
        """Emit cond as if/elif/else chain.

        Handles both Scheme-style ((test body) ...) and Clojure-style flat
        (test body ...) clause lists.
        NOTE(review): an else clause appearing first would emit a bare
        ``else:`` with no preceding ``if`` — assumed not to occur in the
        spec sources; TODO confirm.
        """
        pad = " " * indent
        clauses = expr[1:]
        # Detect scheme vs clojure style
        is_scheme = (
            all(isinstance(c, list) and len(c) == 2 for c in clauses)
            and not any(isinstance(c, Keyword) for c in clauses)
        )
        first_clause = True
        if is_scheme:
            for clause in clauses:
                test, body = clause[0], clause[1]
                if isinstance(test, Symbol) and test.name in ("else", ":else"):
                    lines.append(f"{pad}else:")
                elif isinstance(test, Keyword) and test.name == "else":
                    lines.append(f"{pad}else:")
                else:
                    kw = "if" if first_clause else "elif"
                    lines.append(f"{pad}{kw} sx_truthy({self.emit(test)}):")
                    first_clause = False
                self._emit_stmt_recursive(body, lines, indent + 1)
        else:
            i = 0
            while i < len(clauses) - 1:
                test, body = clauses[i], clauses[i + 1]
                if isinstance(test, Keyword) and test.name == "else":
                    lines.append(f"{pad}else:")
                elif isinstance(test, Symbol) and test.name in ("else", ":else"):
                    lines.append(f"{pad}else:")
                else:
                    kw = "if" if first_clause else "elif"
                    lines.append(f"{pad}{kw} sx_truthy({self.emit(test)}):")
                    first_clause = False
                self._emit_stmt_recursive(body, lines, indent + 1)
                i += 2
|
|
|
|
def _emit_quote(self, expr) -> str:
|
|
"""Emit a quoted expression as a Python literal AST."""
|
|
if isinstance(expr, bool):
|
|
return "True" if expr else "False"
|
|
if isinstance(expr, (int, float)):
|
|
return str(expr)
|
|
if isinstance(expr, str):
|
|
return self._py_string(expr)
|
|
if expr is None or expr is SX_NIL:
|
|
return "NIL"
|
|
if isinstance(expr, Symbol):
|
|
return f"Symbol({self._py_string(expr.name)})"
|
|
if isinstance(expr, Keyword):
|
|
return f"Keyword({self._py_string(expr.name)})"
|
|
if isinstance(expr, list):
|
|
return "[" + ", ".join(self._emit_quote(x) for x in expr) + "]"
|
|
return str(expr)
|
|
|
|
    def _py_string(self, s: str) -> str:
        """Render *s* as a Python source string literal (repr handles quoting/escapes)."""
        return repr(s)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Bootstrap compiler
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def extract_defines(source: str) -> list[tuple[str, list]]:
    """Parse .sx source, return list of (name, define-expr) for top-level defines."""
    defines: list[tuple[str, list]] = []
    for expr in parse_all(source):
        # Only non-empty lists headed by the `define` symbol qualify.
        if not (isinstance(expr, list) and expr):
            continue
        head = expr[0]
        if not (isinstance(head, Symbol) and head.name == "define"):
            continue
        target = expr[1]
        # (define (name args...) ...) targets are stringified as-is.
        defined_name = target.name if isinstance(target, Symbol) else str(target)
        defines.append((defined_name, expr))
    return defines
|
|
|
|
|
|
# Server-side render adapters: name -> (spec filename, section label).
ADAPTER_FILES = {
    "html": ("adapter-html.sx", "adapter-html"),
    "sx": ("adapter-sx.sx", "adapter-sx"),
}


# Optional spec modules: name -> (spec filename, section label).
SPEC_MODULES = {
    "deps": ("deps.sx", "deps (component dependency analysis)"),
    "router": ("router.sx", "router (client-side route matching)"),
    "engine": ("engine.sx", "engine (fetch/swap/trigger pure logic)"),
    "signals": ("signals.sx", "signals (reactive signal runtime)"),
}


# Optional evaluator extensions accepted by compile_ref_to_py(extensions=...).
EXTENSION_NAMES = {"continuations"}

# Extension-provided special forms (not in eval.sx core)
EXTENSION_FORMS = {
    "continuations": {"reset", "shift"},
}
|
|
|
|
|
|
def _parse_special_forms_spec(ref_dir: str) -> set[str]:
    """Parse special-forms.sx to extract declared form names.

    Returns an empty set when the spec file is absent.
    """
    spec_path = os.path.join(ref_dir, "special-forms.sx")
    if not os.path.exists(spec_path):
        return set()
    with open(spec_path) as fh:
        source = fh.read()

    def declares_form(e) -> bool:
        # Matches (define-special-form "name" ...) with a string name.
        return (isinstance(e, list) and len(e) >= 2
                and isinstance(e[0], Symbol)
                and e[0].name == "define-special-form"
                and isinstance(e[1], str))

    return {e[1] for e in parse_all(source) if declares_form(e)}
|
|
|
|
|
|
def _extract_eval_dispatch_names(all_sections: list) -> set[str]:
|
|
"""Extract special form names dispatched in eval-list from transpiled sections."""
|
|
names = set()
|
|
for _label, defines in all_sections:
|
|
for name, _expr in defines:
|
|
if name.startswith("sf-"):
|
|
form = name[3:]
|
|
if form in ("cond-scheme", "cond-clojure", "case-loop"):
|
|
continue
|
|
names.add(form)
|
|
if name.startswith("ho-"):
|
|
form = name[3:]
|
|
names.add(form)
|
|
return names
|
|
|
|
|
|
def _validate_special_forms(ref_dir: str, all_sections: list,
                            has_continuations: bool) -> None:
    """Cross-check special-forms.sx against eval.sx dispatch. Warn on mismatches.

    Compares the forms declared in special-forms.sx against the sf-/ho-
    defines actually present in the transpiled sections, printing warnings
    to stderr for forms that appear on only one side. Silently returns if
    the spec file is absent or declares nothing.

    Args:
        ref_dir: Directory containing the .sx reference files.
        all_sections: List of (label, defines) pairs from the transpile pass.
        has_continuations: Whether the continuations extension is enabled
            (its forms are treated as dispatched).
    """
    spec_names = _parse_special_forms_spec(ref_dir)
    if not spec_names:
        return

    dispatch_names = _extract_eval_dispatch_names(all_sections)
    if has_continuations:
        dispatch_names |= EXTENSION_FORMS["continuations"]

    # eval.sx uses identifier-safe aliases for punctuation-heavy form names.
    name_aliases = {
        "thread-first": "->",
        "every": "every?",
        "set-bang": "set!",
    }
    normalized_dispatch = {name_aliases.get(n, n) for n in dispatch_names}

    # Implementation detail of eval.sx, never a declared form.
    normalized_dispatch -= {"named-let"}

    # Forms handled outside the sf-/ho- dispatch tables; exempt on both sides.
    ignore = {"fn", "let*", "do", "defrelation"}
    undispatched = (spec_names - normalized_dispatch) - ignore
    unspecced = (normalized_dispatch - spec_names) - ignore

    # sys is imported at module level; no need to re-import it here.
    if undispatched:
        print(f"# WARNING: special-forms.sx declares forms not in eval.sx: "
              f"{', '.join(sorted(undispatched))}", file=sys.stderr)
    if unspecced:
        print(f"# WARNING: eval.sx dispatches forms not in special-forms.sx: "
              f"{', '.join(sorted(unspecced))}", file=sys.stderr)
|
|
|
|
|
|
def compile_ref_to_py(
    adapters: list[str] | None = None,
    modules: list[str] | None = None,
    extensions: list[str] | None = None,
    spec_modules: list[str] | None = None,
) -> str:
    """Read reference .sx files and emit Python.

    Args:
        adapters: List of adapter names to include.
            Valid names: html, sx.
            None = include all server-side adapters.
        modules: List of primitive module names to include.
            core.* are always included. stdlib.* are opt-in.
            None = include all modules (backward compatible).
        extensions: List of optional extensions to include.
            Valid names: continuations.
            None = no extensions.
        spec_modules: List of spec module names to include.
            Valid names: deps, router, engine, signals.
            None = no spec modules. Note: when the html adapter is
            included, deps and signals are auto-added regardless.

    Returns:
        The complete generated Python source as a single string.

    Raises:
        ValueError: on an unknown adapter, module, extension, or
            spec module name.
    """
    # Determine which primitive modules to include
    prim_modules = None  # None = all
    if modules is not None:
        # Start from the always-included core.* set, then add requested extras.
        prim_modules = [m for m in _ALL_PY_MODULES if m.startswith("core.")]
        for m in modules:
            if m not in prim_modules:
                if m not in PRIMITIVES_PY_MODULES:
                    raise ValueError(f"Unknown module: {m!r}. Valid: {', '.join(PRIMITIVES_PY_MODULES)}")
                prim_modules.append(m)

    ref_dir = os.path.dirname(os.path.abspath(__file__))
    emitter = PyEmitter()

    # Resolve adapter set
    if adapters is None:
        adapter_set = set(ADAPTER_FILES.keys())
    else:
        adapter_set = set()
        for a in adapters:
            if a not in ADAPTER_FILES:
                raise ValueError(f"Unknown adapter: {a!r}. Valid: {', '.join(ADAPTER_FILES)}")
            adapter_set.add(a)

    # Resolve spec modules
    spec_mod_set = set()
    if spec_modules:
        for sm in spec_modules:
            if sm not in SPEC_MODULES:
                raise ValueError(f"Unknown spec module: {sm!r}. Valid: {', '.join(SPEC_MODULES)}")
            spec_mod_set.add(sm)
    # html adapter needs deps (component analysis) and signals (island rendering)
    if "html" in adapter_set:
        # Guarded membership checks keep this safe if SPEC_MODULES shrinks.
        if "deps" in SPEC_MODULES:
            spec_mod_set.add("deps")
        if "signals" in SPEC_MODULES:
            spec_mod_set.add("signals")
    has_deps = "deps" in spec_mod_set

    # Core files always included, then selected adapters, then spec modules
    sx_files = [
        ("eval.sx", "eval"),
        ("forms.sx", "forms (server definition forms)"),
        ("render.sx", "render (core)"),
    ]
    # Fixed adapter order (html before sx) keeps output deterministic.
    for name in ("html", "sx"):
        if name in adapter_set:
            sx_files.append(ADAPTER_FILES[name])
    for name in sorted(spec_mod_set):
        sx_files.append(SPEC_MODULES[name])

    # Parse each spec file into (label, defines); missing files are skipped.
    all_sections = []
    for filename, label in sx_files:
        filepath = os.path.join(ref_dir, filename)
        if not os.path.exists(filepath):
            continue
        with open(filepath) as f:
            src = f.read()
        defines = extract_defines(src)
        all_sections.append((label, defines))

    # Resolve extensions
    ext_set = set()
    if extensions:
        for e in extensions:
            if e not in EXTENSION_NAMES:
                raise ValueError(f"Unknown extension: {e!r}. Valid: {', '.join(EXTENSION_NAMES)}")
            ext_set.add(e)
    has_continuations = "continuations" in ext_set

    # Validate special forms
    _validate_special_forms(ref_dir, all_sections, has_continuations)

    # Build output
    has_html = "html" in adapter_set
    has_sx = "sx" in adapter_set

    # Assembly order: preamble, platform layer, primitives, optional deps
    # platform, transpiled sections, fixups, extensions, public API.
    parts = []
    parts.append(PREAMBLE)
    parts.append(PLATFORM_PY)
    parts.append(PRIMITIVES_PY_PRE)
    parts.append(_assemble_primitives_py(prim_modules))
    parts.append(PRIMITIVES_PY_POST)

    if has_deps:
        parts.append(PLATFORM_DEPS_PY)

    for label, defines in all_sections:
        parts.append(f"\n# === Transpiled from {label} ===\n")
        for name, expr in defines:
            parts.append(f"# {name}")
            parts.append(emitter.emit_statement(expr))
            parts.append("")

    parts.append(FIXUPS_PY)
    if has_continuations:
        parts.append(CONTINUATIONS_PY)
    parts.append(public_api_py(has_html, has_sx, has_deps))
    return "\n".join(parts)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Static Python sections
|
|
# ---------------------------------------------------------------------------
|
|
|
|
PREAMBLE = '''\
|
|
"""
|
|
sx_ref.py -- Generated from reference SX evaluator specification.
|
|
|
|
Bootstrap-compiled from shared/sx/ref/{eval,render,adapter-html,adapter-sx}.sx
|
|
Compare against hand-written evaluator.py / html.py for correctness verification.
|
|
|
|
DO NOT EDIT -- regenerate with: python bootstrap_py.py
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import math
|
|
from typing import Any
|
|
|
|
|
|
# =========================================================================
|
|
# Types (reuse existing types)
|
|
# =========================================================================
|
|
|
|
from shared.sx.types import (
|
|
NIL, Symbol, Keyword, Lambda, Component, Island, Continuation, Macro,
|
|
HandlerDef, QueryDef, ActionDef, PageDef, _ShiftSignal,
|
|
)
|
|
from shared.sx.parser import SxExpr
|
|
'''
|
|
|
|
PLATFORM_PY = '''
|
|
# =========================================================================
|
|
# Platform interface -- Python implementation
|
|
# =========================================================================
|
|
|
|
class _Thunk:
|
|
"""Deferred evaluation for TCO."""
|
|
__slots__ = ("expr", "env")
|
|
def __init__(self, expr, env):
|
|
self.expr = expr
|
|
self.env = env
|
|
|
|
|
|
class _RawHTML:
|
|
"""Marker for pre-rendered HTML that should not be escaped."""
|
|
__slots__ = ("html",)
|
|
def __init__(self, html: str):
|
|
self.html = html
|
|
|
|
|
|
def sx_truthy(x):
|
|
"""SX truthiness: everything is truthy except False, None, and NIL."""
|
|
if x is False:
|
|
return False
|
|
if x is None or x is NIL:
|
|
return False
|
|
return True
|
|
|
|
|
|
def sx_str(*args):
|
|
"""SX str: concatenate string representations, skipping nil."""
|
|
parts = []
|
|
for a in args:
|
|
if a is None or a is NIL:
|
|
continue
|
|
parts.append(str(a))
|
|
return "".join(parts)
|
|
|
|
|
|
def sx_and(*args):
|
|
"""SX and: return last truthy value or first falsy."""
|
|
result = True
|
|
for a in args:
|
|
if not sx_truthy(a):
|
|
return a
|
|
result = a
|
|
return result
|
|
|
|
|
|
def sx_or(*args):
|
|
"""SX or: return first truthy value or last value."""
|
|
for a in args:
|
|
if sx_truthy(a):
|
|
return a
|
|
return args[-1] if args else False
|
|
|
|
|
|
def _sx_begin(*args):
|
|
"""Evaluate all args (for side effects), return last."""
|
|
return args[-1] if args else NIL
|
|
|
|
|
|
|
|
def _sx_case(match_val, pairs):
|
|
"""Case dispatch: pairs is [(test_val, body_fn), ...]. None test = else."""
|
|
for test, body_fn in pairs:
|
|
if test is None: # :else clause
|
|
return body_fn()
|
|
if match_val == test:
|
|
return body_fn()
|
|
return NIL
|
|
|
|
|
|
def _sx_fn(f):
|
|
"""Identity wrapper for multi-expression lambda bodies."""
|
|
return f
|
|
|
|
|
|
def type_of(x):
|
|
if x is None or x is NIL:
|
|
return "nil"
|
|
if isinstance(x, bool):
|
|
return "boolean"
|
|
if isinstance(x, (int, float)):
|
|
return "number"
|
|
if isinstance(x, SxExpr):
|
|
return "sx-expr"
|
|
if isinstance(x, str):
|
|
return "string"
|
|
if isinstance(x, Symbol):
|
|
return "symbol"
|
|
if isinstance(x, Keyword):
|
|
return "keyword"
|
|
if isinstance(x, _Thunk):
|
|
return "thunk"
|
|
if isinstance(x, Lambda):
|
|
return "lambda"
|
|
if isinstance(x, Component):
|
|
return "component"
|
|
if isinstance(x, Island):
|
|
return "island"
|
|
if isinstance(x, _Signal):
|
|
return "signal"
|
|
if isinstance(x, Macro):
|
|
return "macro"
|
|
if isinstance(x, _RawHTML):
|
|
return "raw-html"
|
|
if isinstance(x, Continuation):
|
|
return "continuation"
|
|
if isinstance(x, list):
|
|
return "list"
|
|
if isinstance(x, dict):
|
|
return "dict"
|
|
return "unknown"
|
|
|
|
|
|
def symbol_name(s):
|
|
return s.name
|
|
|
|
|
|
def keyword_name(k):
|
|
return k.name
|
|
|
|
|
|
def make_symbol(n):
|
|
return Symbol(n)
|
|
|
|
|
|
def make_keyword(n):
|
|
return Keyword(n)
|
|
|
|
|
|
def make_lambda(params, body, env):
|
|
return Lambda(params=list(params), body=body, closure=dict(env))
|
|
|
|
|
|
def make_component(name, params, has_children, body, env, affinity="auto"):
|
|
return Component(name=name, params=list(params), has_children=has_children,
|
|
body=body, closure=dict(env), affinity=str(affinity) if affinity else "auto")
|
|
|
|
|
|
def make_island(name, params, has_children, body, env):
|
|
return Island(name=name, params=list(params), has_children=has_children,
|
|
body=body, closure=dict(env))
|
|
|
|
|
|
def make_macro(params, rest_param, body, env, name=None):
|
|
return Macro(params=list(params), rest_param=rest_param, body=body,
|
|
closure=dict(env), name=name)
|
|
|
|
|
|
def make_handler_def(name, params, body, env):
|
|
return HandlerDef(name=name, params=list(params), body=body, closure=dict(env))
|
|
|
|
|
|
def make_query_def(name, params, doc, body, env):
|
|
return QueryDef(name=name, params=list(params), doc=doc, body=body, closure=dict(env))
|
|
|
|
|
|
def make_action_def(name, params, doc, body, env):
|
|
return ActionDef(name=name, params=list(params), doc=doc, body=body, closure=dict(env))
|
|
|
|
|
|
def make_page_def(name, slots, env):
|
|
path = slots.get("path", "")
|
|
auth_val = slots.get("auth", "public")
|
|
if isinstance(auth_val, Keyword):
|
|
auth = auth_val.name
|
|
elif isinstance(auth_val, list):
|
|
auth = [item.name if isinstance(item, Keyword) else str(item) for item in auth_val]
|
|
else:
|
|
auth = str(auth_val) if auth_val else "public"
|
|
layout = slots.get("layout")
|
|
if isinstance(layout, Keyword):
|
|
layout = layout.name
|
|
cache = None
|
|
stream_val = slots.get("stream")
|
|
stream = bool(trampoline(eval_expr(stream_val, env))) if stream_val is not None else False
|
|
return PageDef(
|
|
name=name, path=path, auth=auth, layout=layout, cache=cache,
|
|
data_expr=slots.get("data"), content_expr=slots.get("content"),
|
|
filter_expr=slots.get("filter"), aside_expr=slots.get("aside"),
|
|
menu_expr=slots.get("menu"), stream=stream,
|
|
fallback_expr=slots.get("fallback"), shell_expr=slots.get("shell"),
|
|
closure=dict(env),
|
|
)
|
|
|
|
|
|
def make_thunk(expr, env):
|
|
return _Thunk(expr, env)
|
|
|
|
|
|
def lambda_params(f):
|
|
return f.params
|
|
|
|
|
|
def lambda_body(f):
|
|
return f.body
|
|
|
|
|
|
def lambda_closure(f):
|
|
return f.closure
|
|
|
|
|
|
def lambda_name(f):
|
|
return f.name
|
|
|
|
|
|
def set_lambda_name(f, n):
|
|
f.name = n
|
|
|
|
|
|
def component_params(c):
|
|
return c.params
|
|
|
|
|
|
def component_body(c):
|
|
return c.body
|
|
|
|
|
|
def component_closure(c):
|
|
return c.closure
|
|
|
|
|
|
def component_has_children(c):
|
|
return c.has_children
|
|
|
|
|
|
def component_name(c):
|
|
return c.name
|
|
|
|
|
|
def component_affinity(c):
|
|
return getattr(c, 'affinity', 'auto')
|
|
|
|
|
|
def macro_params(m):
|
|
return m.params
|
|
|
|
|
|
def macro_rest_param(m):
|
|
return m.rest_param
|
|
|
|
|
|
def macro_body(m):
|
|
return m.body
|
|
|
|
|
|
def macro_closure(m):
|
|
return m.closure
|
|
|
|
|
|
def is_thunk(x):
|
|
return isinstance(x, _Thunk)
|
|
|
|
|
|
def thunk_expr(t):
|
|
return t.expr
|
|
|
|
|
|
def thunk_env(t):
|
|
return t.env
|
|
|
|
|
|
def is_callable(x):
|
|
return callable(x) or isinstance(x, Lambda)
|
|
|
|
|
|
def is_lambda(x):
|
|
return isinstance(x, Lambda)
|
|
|
|
|
|
def is_component(x):
|
|
return isinstance(x, Component)
|
|
|
|
|
|
def is_macro(x):
|
|
return isinstance(x, Macro)
|
|
|
|
|
|
def is_island(x):
|
|
return isinstance(x, Island)
|
|
|
|
|
|
def is_identical(a, b):
|
|
return a is b
|
|
|
|
|
|
# -------------------------------------------------------------------------
|
|
# Signal platform -- reactive state primitives
|
|
# -------------------------------------------------------------------------
|
|
|
|
class _Signal:
|
|
"""Reactive signal container."""
|
|
__slots__ = ("value", "subscribers", "deps")
|
|
def __init__(self, value):
|
|
self.value = value
|
|
self.subscribers = []
|
|
self.deps = []
|
|
|
|
|
|
class _TrackingContext:
|
|
"""Context for discovering signal dependencies."""
|
|
__slots__ = ("notify_fn", "deps")
|
|
def __init__(self, notify_fn):
|
|
self.notify_fn = notify_fn
|
|
self.deps = []
|
|
|
|
|
|
_tracking_context = None
|
|
|
|
|
|
def make_signal(value):
|
|
return _Signal(value)
|
|
|
|
|
|
def is_signal(x):
|
|
return isinstance(x, _Signal)
|
|
|
|
|
|
def signal_value(s):
|
|
return s.value if isinstance(s, _Signal) else s
|
|
|
|
|
|
def signal_set_value(s, v):
|
|
if isinstance(s, _Signal):
|
|
s.value = v
|
|
|
|
|
|
def signal_subscribers(s):
|
|
return list(s.subscribers) if isinstance(s, _Signal) else []
|
|
|
|
|
|
def signal_add_sub(s, fn):
|
|
if isinstance(s, _Signal) and fn not in s.subscribers:
|
|
s.subscribers.append(fn)
|
|
|
|
|
|
def signal_remove_sub(s, fn):
|
|
if isinstance(s, _Signal) and fn in s.subscribers:
|
|
s.subscribers.remove(fn)
|
|
|
|
|
|
def signal_deps(s):
|
|
return list(s.deps) if isinstance(s, _Signal) else []
|
|
|
|
|
|
def signal_set_deps(s, deps):
|
|
if isinstance(s, _Signal):
|
|
s.deps = list(deps) if isinstance(deps, list) else []
|
|
|
|
|
|
def set_tracking_context(ctx):
|
|
global _tracking_context
|
|
_tracking_context = ctx
|
|
|
|
|
|
def get_tracking_context():
|
|
global _tracking_context
|
|
return _tracking_context if _tracking_context is not None else NIL
|
|
|
|
|
|
def make_tracking_context(notify_fn):
|
|
return _TrackingContext(notify_fn)
|
|
|
|
|
|
def tracking_context_deps(ctx):
|
|
return ctx.deps if isinstance(ctx, _TrackingContext) else []
|
|
|
|
|
|
def tracking_context_add_dep(ctx, s):
|
|
if isinstance(ctx, _TrackingContext) and s not in ctx.deps:
|
|
ctx.deps.append(s)
|
|
|
|
|
|
def tracking_context_notify_fn(ctx):
|
|
return ctx.notify_fn if isinstance(ctx, _TrackingContext) else NIL
|
|
|
|
|
|
def json_serialize(obj):
|
|
import json
|
|
try:
|
|
return json.dumps(obj)
|
|
except (TypeError, ValueError):
|
|
return "{}"
|
|
|
|
|
|
def is_empty_dict(d):
|
|
if not isinstance(d, dict):
|
|
return True
|
|
return len(d) == 0
|
|
|
|
|
|
def env_has(env, name):
|
|
return name in env
|
|
|
|
|
|
def env_get(env, name):
|
|
return env.get(name, NIL)
|
|
|
|
|
|
def env_set(env, name, val):
|
|
env[name] = val
|
|
|
|
|
|
def env_extend(env):
|
|
return dict(env)
|
|
|
|
|
|
def env_merge(base, overlay):
|
|
result = dict(base)
|
|
result.update(overlay)
|
|
return result
|
|
|
|
|
|
def dict_set(d, k, v):
|
|
d[k] = v
|
|
|
|
|
|
def dict_get(d, k):
|
|
v = d.get(k)
|
|
return v if v is not None else NIL
|
|
|
|
|
|
def dict_has(d, k):
|
|
return k in d
|
|
|
|
|
|
def dict_delete(d, k):
|
|
d.pop(k, None)
|
|
|
|
|
|
def is_render_expr(expr):
|
|
"""Check if expression is an HTML element, component, or fragment."""
|
|
if not isinstance(expr, list) or not expr:
|
|
return False
|
|
h = expr[0]
|
|
if not isinstance(h, Symbol):
|
|
return False
|
|
n = h.name
|
|
return (n == "<>" or n == "raw!" or
|
|
n.startswith("~") or n.startswith("html:") or
|
|
n in HTML_TAGS or
|
|
("-" in n and len(expr) > 1 and isinstance(expr[1], Keyword)))
|
|
|
|
|
|
# Render dispatch -- set by adapter
|
|
_render_expr_fn = None
|
|
|
|
|
|
def render_expr(expr, env):
|
|
if _render_expr_fn:
|
|
return _render_expr_fn(expr, env)
|
|
return expr
|
|
|
|
|
|
def strip_prefix(s, prefix):
|
|
return s[len(prefix):] if s.startswith(prefix) else s
|
|
|
|
|
|
def error(msg):
|
|
raise EvalError(msg)
|
|
|
|
|
|
def inspect(x):
|
|
return repr(x)
|
|
|
|
|
|
def escape_html(s):
    s = str(s)
    return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;").replace('"', "&quot;")
|
|
|
|
|
|
def escape_attr(s):
|
|
return escape_html(s)
|
|
|
|
|
|
def raw_html_content(x):
|
|
return x.html
|
|
|
|
|
|
def make_raw_html(s):
|
|
return _RawHTML(s)
|
|
|
|
|
|
class EvalError(Exception):
|
|
pass
|
|
|
|
|
|
def _sx_append(lst, item):
|
|
"""Append item to list, return the item (for expression context)."""
|
|
lst.append(item)
|
|
return item
|
|
|
|
|
|
def _sx_dict_set(d, k, v):
|
|
"""Set key in dict, return the value (for expression context)."""
|
|
d[k] = v
|
|
return v
|
|
|
|
|
|
def _sx_set_attr(obj, attr, val):
|
|
"""Set attribute on object, return the value."""
|
|
setattr(obj, attr, val)
|
|
return val
|
|
|
|
|
|
def _sx_cell_set(cells, name, val):
|
|
"""Set a mutable cell value. Returns the value."""
|
|
cells[name] = val
|
|
return val
|
|
|
|
|
|
def escape_string(s):
|
|
"""Escape a string for SX serialization."""
|
|
return (str(s)
|
|
.replace("\\\\", "\\\\\\\\")
|
|
.replace('"', '\\\\"')
|
|
.replace("\\n", "\\\\n")
|
|
.replace("\\t", "\\\\t")
|
|
.replace("</script", "<\\\\/script"))
|
|
|
|
|
|
def serialize(val):
|
|
"""Serialize an SX value to SX source text."""
|
|
t = type_of(val)
|
|
if t == "sx-expr":
|
|
return val.source
|
|
if t == "nil":
|
|
return "nil"
|
|
if t == "boolean":
|
|
return "true" if val else "false"
|
|
if t == "number":
|
|
return str(val)
|
|
if t == "string":
|
|
return '"' + escape_string(val) + '"'
|
|
if t == "symbol":
|
|
return symbol_name(val)
|
|
if t == "keyword":
|
|
return ":" + keyword_name(val)
|
|
if t == "raw-html":
|
|
escaped = escape_string(raw_html_content(val))
|
|
return '(raw! "' + escaped + '")'
|
|
if t == "list":
|
|
if not val:
|
|
return "()"
|
|
items = [serialize(x) for x in val]
|
|
return "(" + " ".join(items) + ")"
|
|
if t == "dict":
|
|
items = []
|
|
for k, v in val.items():
|
|
items.append(":" + str(k))
|
|
items.append(serialize(v))
|
|
return "{" + " ".join(items) + "}"
|
|
if callable(val):
|
|
return "nil"
|
|
return str(val)
|
|
|
|
|
|
_SPECIAL_FORM_NAMES = frozenset([
|
|
"if", "when", "cond", "case", "and", "or",
|
|
"let", "let*", "lambda", "fn",
|
|
"define", "defcomp", "defmacro", "defstyle",
|
|
"defhandler", "defpage", "defquery", "defaction", "defrelation",
|
|
"begin", "do", "quote", "quasiquote",
|
|
"->", "set!",
|
|
])
|
|
|
|
_HO_FORM_NAMES = frozenset([
|
|
"map", "map-indexed", "filter", "reduce",
|
|
"some", "every?", "for-each",
|
|
])
|
|
|
|
def is_special_form(name):
|
|
return name in _SPECIAL_FORM_NAMES
|
|
|
|
def is_ho_form(name):
|
|
return name in _HO_FORM_NAMES
|
|
|
|
|
|
def aser_special(name, expr, env):
|
|
"""Evaluate a special/HO form in aser mode.
|
|
|
|
Control flow forms evaluate conditions normally but render branches
|
|
through aser (serializing tags/components instead of rendering HTML).
|
|
Definition forms evaluate for side effects and return nil.
|
|
"""
|
|
# Control flow — evaluate conditions, aser branches
|
|
args = expr[1:]
|
|
if name == "if":
|
|
cond_val = trampoline(eval_expr(args[0], env))
|
|
if sx_truthy(cond_val):
|
|
return aser(args[1], env)
|
|
return aser(args[2], env) if _b_len(args) > 2 else NIL
|
|
if name == "when":
|
|
cond_val = trampoline(eval_expr(args[0], env))
|
|
if sx_truthy(cond_val):
|
|
result = NIL
|
|
for body in args[1:]:
|
|
result = aser(body, env)
|
|
return result
|
|
return NIL
|
|
if name == "cond":
|
|
clauses = args
|
|
if clauses and isinstance(clauses[0], _b_list) and _b_len(clauses[0]) == 2:
|
|
for clause in clauses:
|
|
test = clause[0]
|
|
if isinstance(test, Symbol) and test.name in ("else", ":else"):
|
|
return aser(clause[1], env)
|
|
if isinstance(test, Keyword) and test.name == "else":
|
|
return aser(clause[1], env)
|
|
if sx_truthy(trampoline(eval_expr(test, env))):
|
|
return aser(clause[1], env)
|
|
else:
|
|
i = 0
|
|
while i < _b_len(clauses) - 1:
|
|
test = clauses[i]
|
|
result = clauses[i + 1]
|
|
if isinstance(test, Keyword) and test.name == "else":
|
|
return aser(result, env)
|
|
if isinstance(test, Symbol) and test.name in (":else", "else"):
|
|
return aser(result, env)
|
|
if sx_truthy(trampoline(eval_expr(test, env))):
|
|
return aser(result, env)
|
|
i += 2
|
|
return NIL
|
|
if name == "case":
|
|
match_val = trampoline(eval_expr(args[0], env))
|
|
clauses = args[1:]
|
|
i = 0
|
|
while i < _b_len(clauses) - 1:
|
|
test = clauses[i]
|
|
result = clauses[i + 1]
|
|
if isinstance(test, Keyword) and test.name == "else":
|
|
return aser(result, env)
|
|
if isinstance(test, Symbol) and test.name in (":else", "else"):
|
|
return aser(result, env)
|
|
if match_val == trampoline(eval_expr(test, env)):
|
|
return aser(result, env)
|
|
i += 2
|
|
return NIL
|
|
if name in ("let", "let*"):
|
|
bindings = args[0]
|
|
local = _b_dict(env)
|
|
if isinstance(bindings, _b_list):
|
|
if bindings and isinstance(bindings[0], _b_list):
|
|
for b in bindings:
|
|
var = b[0]
|
|
vname = var.name if isinstance(var, Symbol) else var
|
|
local[vname] = trampoline(eval_expr(b[1], local))
|
|
else:
|
|
for i in _b_range(0, _b_len(bindings), 2):
|
|
var = bindings[i]
|
|
vname = var.name if isinstance(var, Symbol) else var
|
|
local[vname] = trampoline(eval_expr(bindings[i + 1], local))
|
|
result = NIL
|
|
for body in args[1:]:
|
|
result = aser(body, local)
|
|
return result
|
|
if name in ("begin", "do"):
|
|
result = NIL
|
|
for body in args:
|
|
result = aser(body, env)
|
|
return result
|
|
if name == "and":
|
|
result = True
|
|
for arg in args:
|
|
result = trampoline(eval_expr(arg, env))
|
|
if not sx_truthy(result):
|
|
return result
|
|
return result
|
|
if name == "or":
|
|
result = False
|
|
for arg in args:
|
|
result = trampoline(eval_expr(arg, env))
|
|
if sx_truthy(result):
|
|
return result
|
|
return result
|
|
# HO forms in aser mode — map/for-each render through aser
|
|
if name == "map":
|
|
fn = trampoline(eval_expr(args[0], env))
|
|
coll = trampoline(eval_expr(args[1], env))
|
|
results = []
|
|
for item in coll:
|
|
if isinstance(fn, Lambda):
|
|
local = _b_dict(fn.closure)
|
|
local.update(env)
|
|
local[fn.params[0]] = item
|
|
results.append(aser(fn.body, local))
|
|
elif callable(fn):
|
|
results.append(fn(item))
|
|
else:
|
|
raise EvalError("map requires callable")
|
|
return results
|
|
if name == "map-indexed":
|
|
fn = trampoline(eval_expr(args[0], env))
|
|
coll = trampoline(eval_expr(args[1], env))
|
|
results = []
|
|
for i, item in enumerate(coll):
|
|
if isinstance(fn, Lambda):
|
|
local = _b_dict(fn.closure)
|
|
local.update(env)
|
|
local[fn.params[0]] = i
|
|
local[fn.params[1]] = item
|
|
results.append(aser(fn.body, local))
|
|
elif callable(fn):
|
|
results.append(fn(i, item))
|
|
else:
|
|
raise EvalError("map-indexed requires callable")
|
|
return results
|
|
if name == "for-each":
|
|
fn = trampoline(eval_expr(args[0], env))
|
|
coll = trampoline(eval_expr(args[1], env))
|
|
results = []
|
|
for item in coll:
|
|
if isinstance(fn, Lambda):
|
|
local = _b_dict(fn.closure)
|
|
local.update(env)
|
|
local[fn.params[0]] = item
|
|
results.append(aser(fn.body, local))
|
|
elif callable(fn):
|
|
fn(item)
|
|
return results if results else NIL
|
|
# Definition forms — evaluate for side effects
|
|
if name in ("define", "defcomp", "defmacro", "defstyle",
|
|
"defhandler", "defpage", "defquery", "defaction", "defrelation"):
|
|
trampoline(eval_expr(expr, env))
|
|
return NIL
|
|
# Lambda/fn, quote, quasiquote, set!, -> : evaluate normally
|
|
result = eval_expr(expr, env)
|
|
return trampoline(result)
|
|
'''
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Primitive modules — Python implementations keyed by spec module name.
|
|
# core.* modules are always included; stdlib.* are opt-in.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
PRIMITIVES_PY_MODULES: dict[str, str] = {
|
|
"core.arithmetic": '''
|
|
# core.arithmetic
|
|
PRIMITIVES["+"] = lambda *args: _b_sum(args)
|
|
PRIMITIVES["-"] = lambda a, b=None: -a if b is None else a - b
|
|
PRIMITIVES["*"] = lambda *args: _sx_mul(*args)
|
|
PRIMITIVES["/"] = lambda a, b: a / b
|
|
PRIMITIVES["mod"] = lambda a, b: a % b
|
|
PRIMITIVES["inc"] = lambda n: n + 1
|
|
PRIMITIVES["dec"] = lambda n: n - 1
|
|
PRIMITIVES["abs"] = _b_abs
|
|
PRIMITIVES["floor"] = math.floor
|
|
PRIMITIVES["ceil"] = math.ceil
|
|
PRIMITIVES["round"] = _b_round
|
|
PRIMITIVES["min"] = _b_min
|
|
PRIMITIVES["max"] = _b_max
|
|
PRIMITIVES["sqrt"] = math.sqrt
|
|
PRIMITIVES["pow"] = lambda x, n: x ** n
|
|
PRIMITIVES["clamp"] = lambda x, lo, hi: _b_max(lo, _b_min(hi, x))
|
|
|
|
def _sx_mul(*args):
|
|
r = 1
|
|
for a in args:
|
|
r *= a
|
|
return r
|
|
''',
|
|
|
|
"core.comparison": '''
|
|
# core.comparison
|
|
PRIMITIVES["="] = lambda a, b: a == b
|
|
PRIMITIVES["!="] = lambda a, b: a != b
|
|
PRIMITIVES["<"] = lambda a, b: a < b
|
|
PRIMITIVES[">"] = lambda a, b: a > b
|
|
PRIMITIVES["<="] = lambda a, b: a <= b
|
|
PRIMITIVES[">="] = lambda a, b: a >= b
|
|
''',
|
|
|
|
"core.logic": '''
|
|
# core.logic
|
|
PRIMITIVES["not"] = lambda x: not sx_truthy(x)
|
|
''',
|
|
|
|
"core.predicates": '''
|
|
# core.predicates
|
|
PRIMITIVES["nil?"] = lambda x: x is None or x is NIL
|
|
PRIMITIVES["number?"] = lambda x: isinstance(x, (int, float)) and not isinstance(x, bool)
|
|
PRIMITIVES["string?"] = lambda x: isinstance(x, str)
|
|
PRIMITIVES["list?"] = lambda x: isinstance(x, _b_list)
|
|
PRIMITIVES["dict?"] = lambda x: isinstance(x, _b_dict)
|
|
PRIMITIVES["continuation?"] = lambda x: isinstance(x, Continuation)
|
|
PRIMITIVES["empty?"] = lambda c: (
|
|
c is None or c is NIL or
|
|
(isinstance(c, (_b_list, str, _b_dict)) and _b_len(c) == 0)
|
|
)
|
|
PRIMITIVES["contains?"] = lambda c, k: (
|
|
str(k) in c if isinstance(c, str) else
|
|
k in c
|
|
)
|
|
PRIMITIVES["odd?"] = lambda n: n % 2 != 0
|
|
PRIMITIVES["even?"] = lambda n: n % 2 == 0
|
|
PRIMITIVES["zero?"] = lambda n: n == 0
|
|
''',
|
|
|
|
"core.strings": '''
|
|
# core.strings
|
|
PRIMITIVES["str"] = sx_str
|
|
PRIMITIVES["upper"] = lambda s: str(s).upper()
|
|
PRIMITIVES["lower"] = lambda s: str(s).lower()
|
|
PRIMITIVES["trim"] = lambda s: str(s).strip()
|
|
PRIMITIVES["split"] = lambda s, sep=" ": str(s).split(sep)
|
|
PRIMITIVES["join"] = lambda sep, coll: sep.join(coll)
|
|
PRIMITIVES["replace"] = lambda s, old, new: s.replace(old, new)
|
|
PRIMITIVES["index-of"] = lambda s, needle, start=0: str(s).find(needle, start)
|
|
PRIMITIVES["starts-with?"] = lambda s, p: str(s).startswith(p)
|
|
PRIMITIVES["ends-with?"] = lambda s, p: str(s).endswith(p)
|
|
PRIMITIVES["slice"] = lambda c, a, b=None: c[a:b] if b is not None else c[a:]
|
|
PRIMITIVES["concat"] = lambda *args: _b_sum((a for a in args if a), [])
|
|
''',
|
|
|
|
"core.collections": '''
|
|
# core.collections
|
|
PRIMITIVES["list"] = lambda *args: _b_list(args)
|
|
PRIMITIVES["dict"] = lambda *args: {args[i]: args[i+1] for i in _b_range(0, _b_len(args)-1, 2)}
|
|
PRIMITIVES["range"] = lambda a, b, step=1: _b_list(_b_range(_b_int(a), _b_int(b), _b_int(step)))
|
|
PRIMITIVES["get"] = lambda c, k, default=NIL: c.get(k, default) if isinstance(c, _b_dict) else (c[k] if isinstance(c, (_b_list, str)) and isinstance(k, _b_int) and 0 <= k < _b_len(c) else (c.get(k, default) if hasattr(c, 'get') else default))
|
|
PRIMITIVES["len"] = lambda c: _b_len(c) if c is not None and c is not NIL else 0
|
|
PRIMITIVES["first"] = lambda c: c[0] if c and _b_len(c) > 0 else NIL
|
|
PRIMITIVES["last"] = lambda c: c[-1] if c and _b_len(c) > 0 else NIL
|
|
PRIMITIVES["rest"] = lambda c: c[1:] if c else []
|
|
PRIMITIVES["nth"] = lambda c, n: c[n] if c and 0 <= n < _b_len(c) else NIL
|
|
PRIMITIVES["cons"] = lambda x, c: [x] + (c or [])
|
|
PRIMITIVES["append"] = lambda c, x: (c or []) + [x]
|
|
PRIMITIVES["chunk-every"] = lambda c, n: [c[i:i+n] for i in _b_range(0, _b_len(c), n)]
|
|
PRIMITIVES["zip-pairs"] = lambda c: [[c[i], c[i+1]] for i in _b_range(_b_len(c)-1)]
|
|
''',
|
|
|
|
"core.dict": '''
|
|
# core.dict
|
|
PRIMITIVES["keys"] = lambda d: _b_list((d or {}).keys())
|
|
PRIMITIVES["vals"] = lambda d: _b_list((d or {}).values())
|
|
PRIMITIVES["merge"] = lambda *args: _sx_merge_dicts(*args)
|
|
PRIMITIVES["has-key?"] = lambda d, k: isinstance(d, _b_dict) and k in d
|
|
PRIMITIVES["assoc"] = lambda d, *kvs: _sx_assoc(d, *kvs)
|
|
PRIMITIVES["dissoc"] = lambda d, *ks: {k: v for k, v in d.items() if k not in ks}
|
|
PRIMITIVES["into"] = lambda target, coll: (_b_list(coll) if isinstance(target, _b_list) else {p[0]: p[1] for p in coll if isinstance(p, _b_list) and _b_len(p) >= 2})
|
|
PRIMITIVES["zip"] = lambda *colls: [_b_list(t) for t in _b_zip(*colls)]
|
|
|
|
def _sx_merge_dicts(*args):
|
|
out = {}
|
|
for d in args:
|
|
if d and d is not NIL and isinstance(d, _b_dict):
|
|
out.update(d)
|
|
return out
|
|
|
|
def _sx_assoc(d, *kvs):
|
|
out = _b_dict(d) if d and d is not NIL else {}
|
|
for i in _b_range(0, _b_len(kvs) - 1, 2):
|
|
out[kvs[i]] = kvs[i + 1]
|
|
return out
|
|
''',
|
|
|
|
"stdlib.format": '''
|
|
# stdlib.format
|
|
PRIMITIVES["format-decimal"] = lambda v, p=2: f"{float(v):.{p}f}"
|
|
PRIMITIVES["parse-int"] = lambda v, d=0: _sx_parse_int(v, d)
|
|
PRIMITIVES["parse-datetime"] = lambda s: str(s) if s else NIL
|
|
|
|
def _sx_parse_int(v, default=0):
|
|
if v is None or v is NIL:
|
|
return default
|
|
s = str(v).strip()
|
|
# Match JS parseInt: extract leading integer portion
|
|
import re as _re
|
|
m = _re.match(r'^[+-]?\\d+', s)
|
|
if m:
|
|
return _b_int(m.group())
|
|
return default
|
|
''',
|
|
|
|
"stdlib.text": '''
|
|
# stdlib.text
|
|
PRIMITIVES["pluralize"] = lambda n, s="", p="s": s if n == 1 else p
|
|
PRIMITIVES["escape"] = escape_html
|
|
PRIMITIVES["strip-tags"] = lambda s: _strip_tags(str(s))
|
|
|
|
import re as _re
|
|
def _strip_tags(s):
|
|
return _re.sub(r"<[^>]+>", "", s)
|
|
''',
|
|
|
|
"stdlib.style": '''
|
|
# stdlib.style — stubs (CSSX needs full runtime)
|
|
''',
|
|
|
|
"stdlib.debug": '''
|
|
# stdlib.debug
|
|
PRIMITIVES["assert"] = lambda cond, msg="Assertion failed": (_ for _ in ()).throw(RuntimeError(f"Assertion error: {msg}")) if not sx_truthy(cond) else True
|
|
''',
|
|
}
|
|
|
|
# Default module selection: every registered primitive module, in the
# insertion order of PRIMITIVES_PY_MODULES (dicts preserve insertion order,
# so core.* snippets are emitted before stdlib.* ones).
_ALL_PY_MODULES = list(PRIMITIVES_PY_MODULES.keys())
|
|
|
|
|
|
def _assemble_primitives_py(modules: list[str] | None = None) -> str:
    """Assemble Python primitive code from selected modules.

    modules -- primitive module names to include, in order; None selects
    every registered module.  Names with no registered snippet are
    silently skipped.
    """
    selected = _ALL_PY_MODULES if modules is None else modules
    return "\n".join(
        PRIMITIVES_PY_MODULES[name]
        for name in selected
        if name in PRIMITIVES_PY_MODULES
    )
|
|
|
|
|
|
PRIMITIVES_PY_PRE = '''
|
|
# =========================================================================
|
|
# Primitives
|
|
# =========================================================================
|
|
|
|
# Save builtins before shadowing
|
|
_b_len = len
|
|
_b_map = map
|
|
_b_filter = filter
|
|
_b_range = range
|
|
_b_list = list
|
|
_b_dict = dict
|
|
_b_max = max
|
|
_b_min = min
|
|
_b_round = round
|
|
_b_abs = abs
|
|
_b_sum = sum
|
|
_b_zip = zip
|
|
_b_int = int
|
|
|
|
PRIMITIVES = {}
|
|
'''
|
|
|
|
PRIMITIVES_PY_POST = '''
|
|
def is_primitive(name):
|
|
if name in PRIMITIVES:
|
|
return True
|
|
from shared.sx.primitives import get_primitive as _ext_get
|
|
return _ext_get(name) is not None
|
|
|
|
def get_primitive(name):
|
|
p = PRIMITIVES.get(name)
|
|
if p is not None:
|
|
return p
|
|
from shared.sx.primitives import get_primitive as _ext_get
|
|
return _ext_get(name)
|
|
|
|
# Higher-order helpers used by transpiled code
|
|
def map(fn, coll):
|
|
return [fn(x) for x in coll]
|
|
|
|
def map_indexed(fn, coll):
|
|
return [fn(i, item) for i, item in enumerate(coll)]
|
|
|
|
def filter(fn, coll):
|
|
return [x for x in coll if sx_truthy(fn(x))]
|
|
|
|
def reduce(fn, init, coll):
|
|
acc = init
|
|
for item in coll:
|
|
acc = fn(acc, item)
|
|
return acc
|
|
|
|
def some(fn, coll):
|
|
for item in coll:
|
|
r = fn(item)
|
|
if sx_truthy(r):
|
|
return r
|
|
return NIL
|
|
|
|
def every_p(fn, coll):
|
|
for item in coll:
|
|
if not sx_truthy(fn(item)):
|
|
return False
|
|
return True
|
|
|
|
def for_each(fn, coll):
|
|
for item in coll:
|
|
fn(item)
|
|
return NIL
|
|
|
|
def for_each_indexed(fn, coll):
|
|
for i, item in enumerate(coll):
|
|
fn(i, item)
|
|
return NIL
|
|
|
|
def map_dict(fn, d):
|
|
return {k: fn(k, v) for k, v in d.items()}
|
|
|
|
# Aliases used directly by transpiled code
|
|
first = PRIMITIVES["first"]
|
|
last = PRIMITIVES["last"]
|
|
rest = PRIMITIVES["rest"]
|
|
nth = PRIMITIVES["nth"]
|
|
len = PRIMITIVES["len"]
|
|
is_nil = PRIMITIVES["nil?"]
|
|
empty_p = PRIMITIVES["empty?"]
|
|
contains_p = PRIMITIVES["contains?"]
|
|
starts_with_p = PRIMITIVES["starts-with?"]
|
|
ends_with_p = PRIMITIVES["ends-with?"]
|
|
slice = PRIMITIVES["slice"]
|
|
get = PRIMITIVES["get"]
|
|
append = PRIMITIVES["append"]
|
|
cons = PRIMITIVES["cons"]
|
|
keys = PRIMITIVES["keys"]
|
|
join = PRIMITIVES["join"]
|
|
range = PRIMITIVES["range"]
|
|
apply = lambda f, args: f(*args)
|
|
assoc = PRIMITIVES["assoc"]
|
|
concat = PRIMITIVES["concat"]
|
|
split = PRIMITIVES["split"]
|
|
length = PRIMITIVES["len"]
|
|
merge = PRIMITIVES["merge"]
|
|
trim = PRIMITIVES["trim"]
|
|
replace = PRIMITIVES["replace"]
|
|
parse_int = PRIMITIVES["parse-int"]
|
|
upper = PRIMITIVES["upper"]
|
|
has_key_p = PRIMITIVES["has-key?"]
|
|
dissoc = PRIMITIVES["dissoc"]
|
|
'''
|
|
|
|
|
|
# Platform interface for the "deps" spec module: native Python helpers the
# generated sx_ref.py uses for component dependency analysis, CSS-class
# scanning, and IO-ref caching.  Emitted verbatim when deps is selected.
# NOTE(review): the emitted code references Component and Macro, which must
# already be defined in the generated module at this point -- confirm.
PLATFORM_DEPS_PY = (
    '\n'
    '# =========================================================================\n'
    '# Platform: deps module — component dependency analysis\n'
    '# =========================================================================\n'
    '\n'
    'import re as _re\n'
    '\n'
    'def component_deps(c):\n'
    '    """Return cached deps list for a component (may be empty)."""\n'
    '    return list(c.deps) if hasattr(c, "deps") and c.deps else []\n'
    '\n'
    'def component_set_deps(c, deps):\n'
    '    """Cache deps on a component."""\n'
    '    c.deps = set(deps) if not isinstance(deps, set) else deps\n'
    '\n'
    'def component_css_classes(c):\n'
    '    """Return pre-scanned CSS class list for a component."""\n'
    '    return list(c.css_classes) if hasattr(c, "css_classes") and c.css_classes else []\n'
    '\n'
    'def env_components(env):\n'
    '    """Return list of component/macro names in an environment."""\n'
    '    return [k for k, v in env.items()\n'
    '            if isinstance(v, (Component, Macro))]\n'
    '\n'
    'def regex_find_all(pattern, source):\n'
    '    """Return list of capture group 1 matches."""\n'
    '    return [m.group(1) for m in _re.finditer(pattern, source)]\n'
    '\n'
    'def scan_css_classes(source):\n'
    '    """Extract CSS class strings from SX source."""\n'
    '    classes = set()\n'
    '    for m in _re.finditer(r\':class\\s+"([^"]*)"\', source):\n'
    '        classes.update(m.group(1).split())\n'
    '    for m in _re.finditer(r\':class\\s+\\(str\\s+((?:"[^"]*"\\s*)+)\\)\', source):\n'
    '        for s in _re.findall(r\'"([^"]*)"\', m.group(1)):\n'
    '            classes.update(s.split())\n'
    '    for m in _re.finditer(r\';;\\s*@css\\s+(.+)\', source):\n'
    '        classes.update(m.group(1).split())\n'
    '    return list(classes)\n'
    '\n'
    'def component_io_refs(c):\n'
    '    """Return cached IO refs list for a component (may be empty)."""\n'
    '    return list(c.io_refs) if hasattr(c, "io_refs") and c.io_refs else []\n'
    '\n'
    'def component_set_io_refs(c, refs):\n'
    '    """Cache IO refs on a component."""\n'
    '    c.io_refs = set(refs) if not isinstance(refs, set) else refs\n'
)
|
|
|
|
|
|
FIXUPS_PY = '''
|
|
# =========================================================================
|
|
# Fixups -- wire up render adapter dispatch
|
|
# =========================================================================
|
|
|
|
def _setup_html_adapter():
|
|
global _render_expr_fn
|
|
_render_expr_fn = lambda expr, env: render_list_to_html(expr, env)
|
|
|
|
def _setup_sx_adapter():
|
|
global _render_expr_fn
|
|
_render_expr_fn = lambda expr, env: aser_list(expr, env)
|
|
|
|
|
|
# Wrap aser_call and aser_fragment to return SxExpr
|
|
# so serialize() won't double-quote them
|
|
_orig_aser_call = None
|
|
_orig_aser_fragment = None
|
|
|
|
def _wrap_aser_outputs():
|
|
global aser_call, aser_fragment, _orig_aser_call, _orig_aser_fragment
|
|
_orig_aser_call = aser_call
|
|
_orig_aser_fragment = aser_fragment
|
|
def _aser_call_wrapped(name, args, env):
|
|
result = _orig_aser_call(name, args, env)
|
|
return SxExpr(result) if isinstance(result, str) else result
|
|
def _aser_fragment_wrapped(children, env):
|
|
result = _orig_aser_fragment(children, env)
|
|
return SxExpr(result) if isinstance(result, str) else result
|
|
aser_call = _aser_call_wrapped
|
|
aser_fragment = _aser_fragment_wrapped
|
|
'''
|
|
|
|
CONTINUATIONS_PY = '''
|
|
# =========================================================================
|
|
# Extension: delimited continuations (shift/reset)
|
|
# =========================================================================
|
|
|
|
_RESET_RESUME = [] # stack of resume values; empty = not resuming
|
|
|
|
_SPECIAL_FORM_NAMES = _SPECIAL_FORM_NAMES | frozenset(["reset", "shift"])
|
|
|
|
def sf_reset(args, env):
|
|
"""(reset body) -- establish a continuation delimiter."""
|
|
body = first(args)
|
|
try:
|
|
return trampoline(eval_expr(body, env))
|
|
except _ShiftSignal as sig:
|
|
def cont_fn(value=NIL):
|
|
_RESET_RESUME.append(value)
|
|
try:
|
|
return trampoline(eval_expr(body, env))
|
|
finally:
|
|
_RESET_RESUME.pop()
|
|
k = Continuation(cont_fn)
|
|
sig_env = dict(sig.env)
|
|
sig_env[sig.k_name] = k
|
|
return trampoline(eval_expr(sig.body, sig_env))
|
|
|
|
def sf_shift(args, env):
|
|
"""(shift k body) -- capture continuation to nearest reset."""
|
|
if _RESET_RESUME:
|
|
return _RESET_RESUME[-1]
|
|
k_name = symbol_name(first(args))
|
|
body = nth(args, 1)
|
|
raise _ShiftSignal(k_name, body, env)
|
|
|
|
# Wrap eval_list to inject shift/reset dispatch
|
|
_base_eval_list = eval_list
|
|
def _eval_list_with_continuations(expr, env):
|
|
head = first(expr)
|
|
if type_of(head) == "symbol":
|
|
name = symbol_name(head)
|
|
args = rest(expr)
|
|
if name == "reset":
|
|
return sf_reset(args, env)
|
|
if name == "shift":
|
|
return sf_shift(args, env)
|
|
return _base_eval_list(expr, env)
|
|
eval_list = _eval_list_with_continuations
|
|
|
|
# Inject into aser_special
|
|
_base_aser_special = aser_special
|
|
def _aser_special_with_continuations(name, expr, env):
|
|
if name == "reset":
|
|
return sf_reset(expr[1:], env)
|
|
if name == "shift":
|
|
return sf_shift(expr[1:], env)
|
|
return _base_aser_special(name, expr, env)
|
|
aser_special = _aser_special_with_continuations
|
|
'''
|
|
|
|
|
|
def public_api_py(has_html: bool, has_sx: bool, has_deps: bool = False) -> str:
    """Emit the Public API section of the generated module.

    has_html -- emit the _setup_html_adapter() call so HTML is the default
    render adapter.
    has_sx -- emit the _wrap_aser_outputs() call so aser outputs are wrapped
    in SxExpr before serialization.
    NOTE(review): has_deps is accepted for interface compatibility but is
    currently unused -- confirm whether deps exports should be emitted here.
    """
    header = [
        '',
        '# =========================================================================',
        '# Public API',
        '# =========================================================================',
        '',
    ]

    adapter_setup = []
    if has_sx:
        adapter_setup += [
            '# Wrap aser outputs to return SxExpr',
            '_wrap_aser_outputs()',
            '',
        ]
    if has_html:
        adapter_setup += [
            '# Set HTML as default adapter',
            '_setup_html_adapter()',
            '',
        ]

    api_defs = [
        'def evaluate(expr, env=None):',
        '    """Evaluate expr in env and return the result."""',
        '    if env is None:',
        '        env = {}',
        '    result = eval_expr(expr, env)',
        '    while is_thunk(result):',
        '        result = eval_expr(thunk_expr(result), thunk_env(result))',
        '    return result',
        '',
        '',
        'def render(expr, env=None):',
        '    """Render expr to HTML string."""',
        '    if env is None:',
        '        env = {}',
        '    return render_to_html(expr, env)',
        '',
        '',
        'def make_env(**kwargs):',
        '    """Create an environment dict with initial bindings."""',
        '    return dict(kwargs)',
    ]

    return '\n'.join(header + adapter_setup + api_defs)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main():
    """CLI entry point: parse selection flags and print the generated module."""
    import argparse

    def _csv(value):
        # argparse leaves the attribute as None when the flag is absent.
        return value.split(",") if value else None

    cli = argparse.ArgumentParser(description="Bootstrap SX spec -> Python")
    cli.add_argument(
        "--adapters",
        default=None,
        help="Comma-separated adapter names (html,sx). Default: all server-side.",
    )
    cli.add_argument(
        "--modules",
        default=None,
        help="Comma-separated primitive modules (core.* always included). Default: all.",
    )
    cli.add_argument(
        "--extensions",
        default=None,
        help="Comma-separated extensions (continuations). Default: none.",
    )
    cli.add_argument(
        "--spec-modules",
        default=None,
        help="Comma-separated spec modules (deps,engine). Default: none.",
    )
    opts = cli.parse_args()
    print(compile_ref_to_py(
        _csv(opts.adapters),
        _csv(opts.modules),
        _csv(opts.extensions),
        _csv(opts.spec_modules),
    ))
|
|
|
|
|
|
# Script entry point -- emit the generated sx_ref.py module to stdout.
if __name__ == "__main__":
    main()
|