Phase 3: Move host files to hosts/python/ and hosts/javascript/

Python: bootstrap.py, platform.py, transpiler.sx, boundary_parser.py, tests/
JavaScript: bootstrap.py, cli.py, platform.py, transpiler.sx
Both bootstrappers verified — build from new locations, output to shared/.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-15 02:18:56 +00:00
parent 05f7b10864
commit 7036621be8
15 changed files with 14 additions and 2097 deletions

1639
hosts/python/bootstrap.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,356 @@
"""
Parse boundary declarations from multiple sources.
Three tiers of boundary files:
1. shared/sx/ref/boundary.sx — core SX language I/O contract
2. shared/sx/ref/boundary-app.sx — deployment-specific layout I/O
3. {service}/sx/boundary.sx — per-service page helpers
Shared by both bootstrap_py.py and bootstrap_js.py, and used at runtime
by the validation module.
"""
from __future__ import annotations
import glob
import logging
import os
from typing import Any
logger = logging.getLogger("sx.boundary_parser")
# Allow standalone use (from bootstrappers) or in-project imports
try:
from shared.sx.types import Symbol, Keyword, NIL as SX_NIL
except ImportError:
import sys
_HERE = os.path.dirname(os.path.abspath(__file__))
_PROJECT = os.path.abspath(os.path.join(_HERE, "..", "..", ".."))
sys.path.insert(0, _PROJECT)
from shared.sx.types import Symbol, Keyword, NIL as SX_NIL
def _get_parse_all():
"""Lazy import to avoid circular dependency when parser.py loads sx_ref.py."""
from shared.sx.parser import parse_all
return parse_all
def _ref_dir() -> str:
return os.path.dirname(os.path.abspath(__file__))
def _project_root() -> str:
"""Return the project root containing service directories.
Dev: shared/sx/ref -> shared/sx -> shared -> project root
Docker: /app/shared/sx/ref -> /app (shared is inside /app)
"""
ref = _ref_dir()
# Go up 3 levels: shared/sx/ref -> project root
root = os.path.abspath(os.path.join(ref, "..", "..", ".."))
# Verify by checking for a known service directory or shared/
if os.path.isdir(os.path.join(root, "shared")):
return root
# Docker: /app/shared/sx/ref -> /app
# shared is INSIDE /app, not a sibling — go up to parent of shared
root = os.path.abspath(os.path.join(ref, "..", ".."))
if os.path.isdir(os.path.join(root, "sx")): # /app/sx exists in Docker
return root
return root
def _read_file(filename: str) -> str:
filepath = os.path.join(_ref_dir(), filename)
with open(filepath, encoding="utf-8") as f:
return f.read()
def _read_file_path(filepath: str) -> str:
with open(filepath, encoding="utf-8") as f:
return f.read()
def _extract_keyword_arg(expr: list, key: str) -> Any:
"""Extract :key value from a flat keyword-arg list."""
for i, item in enumerate(expr):
if isinstance(item, Keyword) and item.name == key and i + 1 < len(expr):
return expr[i + 1]
return None
def _extract_declarations(
source: str,
) -> tuple[set[str], dict[str, set[str]]]:
"""Extract I/O primitive names and page helper names from boundary source.
Returns (io_names, {service: helper_names}).
"""
exprs = _get_parse_all()(source)
io_names: set[str] = set()
helpers: dict[str, set[str]] = {}
for expr in exprs:
if not isinstance(expr, list) or not expr:
continue
head = expr[0]
if not isinstance(head, Symbol):
continue
if head.name == "define-io-primitive":
name = expr[1]
if isinstance(name, str):
io_names.add(name)
elif head.name == "define-page-helper":
name = expr[1]
service = _extract_keyword_arg(expr, "service")
if isinstance(name, str) and isinstance(service, str):
helpers.setdefault(service, set()).add(name)
return io_names, helpers
def _find_service_boundary_files() -> list[str]:
"""Find service boundary.sx files.
Dev: {project}/{service}/sx/boundary.sx (e.g. blog/sx/boundary.sx)
Docker: /app/sx/boundary.sx (service's sx/ dir copied directly into /app/)
"""
root = _project_root()
files: list[str] = []
# Dev layout: {root}/{service}/sx/boundary.sx
for f in glob.glob(os.path.join(root, "*/sx/boundary.sx")):
if "/shared/" not in f:
files.append(f)
# Docker layout: service's sx/ dir is at {root}/sx/boundary.sx
docker_path = os.path.join(root, "sx", "boundary.sx")
if os.path.exists(docker_path) and docker_path not in files:
files.append(docker_path)
return files
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def parse_primitives_sx() -> frozenset[str]:
"""Parse primitives.sx and return frozenset of declared pure primitive names."""
by_module = parse_primitives_by_module()
all_names: set[str] = set()
for names in by_module.values():
all_names.update(names)
return frozenset(all_names)
def parse_primitives_by_module() -> dict[str, frozenset[str]]:
"""Parse primitives.sx and return primitives grouped by module."""
source = _read_file("primitives.sx")
exprs = _get_parse_all()(source)
modules: dict[str, set[str]] = {}
current_module = "_unscoped"
for expr in exprs:
if not isinstance(expr, list) or len(expr) < 2:
continue
if not isinstance(expr[0], Symbol):
continue
if expr[0].name == "define-module":
mod_name = expr[1]
if isinstance(mod_name, Keyword):
current_module = mod_name.name
elif isinstance(mod_name, str):
current_module = mod_name
elif expr[0].name == "define-primitive":
name = expr[1]
if isinstance(name, str):
modules.setdefault(current_module, set()).add(name)
return {mod: frozenset(names) for mod, names in modules.items()}
def _parse_param_type(param) -> tuple[str, str | None, bool]:
"""Parse a single param entry from a :params list.
Returns (name, type_or_none, is_rest).
A bare symbol like ``x`` → ("x", None, False).
A typed form ``(x :as number)`` → ("x", "number", False).
The ``&rest`` marker is tracked externally.
"""
if isinstance(param, Symbol):
return (param.name, None, False)
if isinstance(param, list) and len(param) == 3:
# (name :as type)
name_sym, kw, type_val = param
if (isinstance(name_sym, Symbol)
and isinstance(kw, Keyword) and kw.name == "as"):
type_str = type_val.name if isinstance(type_val, Symbol) else str(type_val)
return (name_sym.name, type_str, False)
return (str(param), None, False)
def parse_primitive_param_types() -> dict[str, dict]:
"""Parse primitives.sx and extract param type info for each primitive.
Returns a dict mapping primitive name to param type descriptor::
{
"+": {"positional": [], "rest_type": "number"},
"/": {"positional": [("a", "number"), ("b", "number")], "rest_type": None},
"get": {"positional": [("coll", None), ("key", None)], "rest_type": None},
}
Each positional entry is (name, type_or_none). rest_type is the
type of the &rest parameter (or None if no &rest, or None if untyped &rest).
"""
source = _read_file("primitives.sx")
exprs = _get_parse_all()(source)
result: dict[str, dict] = {}
for expr in exprs:
if not isinstance(expr, list) or len(expr) < 2:
continue
if not isinstance(expr[0], Symbol) or expr[0].name != "define-primitive":
continue
name = expr[1]
if not isinstance(name, str):
continue
params_list = _extract_keyword_arg(expr, "params")
if not isinstance(params_list, list):
continue
positional: list[tuple[str, str | None]] = []
rest_type: str | None = None
i = 0
while i < len(params_list):
item = params_list[i]
if isinstance(item, Symbol) and item.name == "&rest":
# Next item is the rest param
if i + 1 < len(params_list):
rname, rtype, _ = _parse_param_type(params_list[i + 1])
rest_type = rtype
i += 2
else:
pname, ptype, _ = _parse_param_type(item)
if pname != "&rest":
positional.append((pname, ptype))
i += 1
# Only store if at least one param has a type
has_types = rest_type is not None or any(t is not None for _, t in positional)
if has_types:
result[name] = {"positional": positional, "rest_type": rest_type}
return result
def parse_boundary_sx() -> tuple[frozenset[str], dict[str, frozenset[str]]]:
"""Parse all boundary sources and return (io_names, {service: helper_names}).
Loads three tiers:
1. boundary.sx — core language I/O
2. boundary-app.sx — deployment-specific I/O
3. {service}/sx/boundary.sx — per-service page helpers
"""
all_io: set[str] = set()
all_helpers: dict[str, set[str]] = {}
def _merge(source: str, label: str) -> None:
io_names, helpers = _extract_declarations(source)
all_io.update(io_names)
for svc, names in helpers.items():
all_helpers.setdefault(svc, set()).update(names)
logger.debug("Boundary %s: %d io, %d helpers", label, len(io_names), sum(len(v) for v in helpers.values()))
# 1. Core language contract
_merge(_read_file("boundary.sx"), "core")
# 2. Deployment-specific I/O
app_path = os.path.join(_ref_dir(), "boundary-app.sx")
if os.path.exists(app_path):
_merge(_read_file("boundary-app.sx"), "app")
# 3. Per-service boundary files
for filepath in _find_service_boundary_files():
try:
_merge(_read_file_path(filepath), filepath)
except Exception as e:
logger.warning("Failed to parse %s: %s", filepath, e)
frozen_helpers = {svc: frozenset(names) for svc, names in all_helpers.items()}
return frozenset(all_io), frozen_helpers
def parse_boundary_effects() -> dict[str, list[str]]:
"""Parse boundary.sx and return effect annotations for all declared primitives.
Returns a dict mapping primitive name to its declared effects list.
E.g. {"current-user": ["io"], "reset!": ["mutation"], "signal": []}.
Only includes primitives that have an explicit :effects declaration.
Pure primitives from primitives.sx are not included (they have no effects).
"""
source = _read_file("boundary.sx")
exprs = _get_parse_all()(source)
result: dict[str, list[str]] = {}
_DECL_FORMS = {
"define-io-primitive", "declare-signal-primitive",
"declare-spread-primitive",
}
for expr in exprs:
if not isinstance(expr, list) or len(expr) < 2:
continue
head = expr[0]
if not isinstance(head, Symbol) or head.name not in _DECL_FORMS:
continue
name = expr[1]
if not isinstance(name, str):
continue
effects_val = _extract_keyword_arg(expr, "effects")
if effects_val is None:
# IO primitives default to [io] if no explicit :effects
if head.name == "define-io-primitive":
result[name] = ["io"]
continue
if isinstance(effects_val, list):
effect_names = []
for item in effects_val:
if isinstance(item, Symbol):
effect_names.append(item.name)
elif isinstance(item, str):
effect_names.append(item)
result[name] = effect_names
else:
# Might be a single symbol
if isinstance(effects_val, Symbol):
result[name] = [effects_val.name]
return result
def parse_boundary_types() -> frozenset[str]:
"""Parse boundary.sx and return the declared boundary type names."""
source = _read_file("boundary.sx")
exprs = _get_parse_all()(source)
for expr in exprs:
if (isinstance(expr, list) and len(expr) >= 2
and isinstance(expr[0], Symbol)
and expr[0].name == "define-boundary-types"):
type_list = expr[1]
if isinstance(type_list, list):
return frozenset(
item for item in type_list
if isinstance(item, str)
)
return frozenset()

1640
hosts/python/platform.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,249 @@
#!/usr/bin/env python3
"""Run test-cek-reactive.sx — tests for deref-as-shift reactive rendering."""
from __future__ import annotations
import os, sys
_HERE = os.path.dirname(os.path.abspath(__file__))
_PROJECT = os.path.abspath(os.path.join(_HERE, "..", "..", ".."))
sys.path.insert(0, _PROJECT)
sys.setrecursionlimit(20000)
from shared.sx.parser import parse_all
from shared.sx.ref import sx_ref
from shared.sx.ref.sx_ref import (
make_env, env_get, env_has, env_set,
env_extend, env_merge,
)
# Use tree-walk evaluator for interpreting .sx test files.
# The CEK override (eval_expr = cek_run) would cause the interpreted cek.sx
# to delegate to the transpiled CEK, not the interpreted one being tested.
# Override both the local names AND the module-level names so that transpiled
# functions (ho_map, call_lambda, etc.) also use tree-walk internally.
eval_expr = sx_ref._tree_walk_eval_expr
trampoline = sx_ref._tree_walk_trampoline
sx_ref.eval_expr = eval_expr
sx_ref.trampoline = trampoline
from shared.sx.types import (
NIL, Symbol, Keyword, Lambda, Component, Island, Continuation, Macro,
_ShiftSignal,
)
# Build env with primitives
env = make_env()
# Platform test functions
_suite_stack: list[str] = []
_pass_count = 0
_fail_count = 0
def _try_call(thunk):
try:
trampoline(eval_expr([thunk], env))
return {"ok": True}
except Exception as e:
return {"ok": False, "error": str(e)}
def _report_pass(name):
global _pass_count
_pass_count += 1
ctx = " > ".join(_suite_stack)
print(f" PASS: {ctx} > {name}")
return NIL
def _report_fail(name, error):
global _fail_count
_fail_count += 1
ctx = " > ".join(_suite_stack)
print(f" FAIL: {ctx} > {name}: {error}")
return NIL
def _push_suite(name):
_suite_stack.append(name)
print(f"{' ' * (len(_suite_stack)-1)}Suite: {name}")
return NIL
def _pop_suite():
if _suite_stack:
_suite_stack.pop()
return NIL
def _test_env():
return env
def _sx_parse(source):
return parse_all(source)
def _sx_parse_one(source):
"""Parse a single expression."""
exprs = parse_all(source)
return exprs[0] if exprs else NIL
def _make_continuation(fn):
return Continuation(fn)
env["try-call"] = _try_call
env["report-pass"] = _report_pass
env["report-fail"] = _report_fail
env["push-suite"] = _push_suite
env["pop-suite"] = _pop_suite
env["test-env"] = _test_env
env["sx-parse"] = _sx_parse
env["sx-parse-one"] = _sx_parse_one
env["env-get"] = env_get
env["env-has?"] = env_has
env["env-set!"] = env_set
env["env-extend"] = env_extend
env["make-continuation"] = _make_continuation
env["continuation?"] = lambda x: isinstance(x, Continuation)
env["continuation-fn"] = lambda c: c.fn
def _make_cek_continuation_with_data(captured, rest_kont):
c = Continuation(lambda v=NIL: v)
c._cek_data = {"captured": captured, "rest-kont": rest_kont}
return c
env["make-cek-continuation"] = _make_cek_continuation_with_data
env["continuation-data"] = lambda c: getattr(c, '_cek_data', {})
# Type predicates and constructors
env["callable?"] = lambda x: callable(x) or isinstance(x, (Lambda, Component, Island, Continuation))
env["lambda?"] = lambda x: isinstance(x, Lambda)
env["component?"] = lambda x: isinstance(x, Component)
env["island?"] = lambda x: isinstance(x, Island)
env["macro?"] = lambda x: isinstance(x, Macro)
env["thunk?"] = sx_ref.is_thunk
env["thunk-expr"] = sx_ref.thunk_expr
env["thunk-env"] = sx_ref.thunk_env
env["make-thunk"] = sx_ref.make_thunk
env["make-lambda"] = sx_ref.make_lambda
env["make-component"] = sx_ref.make_component
env["make-island"] = sx_ref.make_island
env["make-macro"] = sx_ref.make_macro
env["make-symbol"] = lambda n: Symbol(n)
env["lambda-params"] = lambda f: f.params
env["lambda-body"] = lambda f: f.body
env["lambda-closure"] = lambda f: f.closure
env["lambda-name"] = lambda f: f.name
env["set-lambda-name!"] = lambda f, n: setattr(f, 'name', n) or NIL
env["component-params"] = lambda c: c.params
env["component-body"] = lambda c: c.body
env["component-closure"] = lambda c: c.closure
env["component-has-children?"] = lambda c: c.has_children
env["component-affinity"] = lambda c: getattr(c, 'affinity', 'auto')
env["component-set-param-types!"] = lambda c, t: setattr(c, 'param_types', t) or NIL
env["macro-params"] = lambda m: m.params
env["macro-rest-param"] = lambda m: m.rest_param
env["macro-body"] = lambda m: m.body
env["macro-closure"] = lambda m: m.closure
env["env-merge"] = env_merge
env["symbol-name"] = lambda s: s.name if isinstance(s, Symbol) else str(s)
env["keyword-name"] = lambda k: k.name if isinstance(k, Keyword) else str(k)
env["type-of"] = sx_ref.type_of
env["primitive?"] = sx_ref.is_primitive
env["get-primitive"] = sx_ref.get_primitive
env["strip-prefix"] = lambda s, p: s[len(p):] if s.startswith(p) else s
env["inspect"] = repr
env["debug-log"] = lambda *args: None
env["error"] = sx_ref.error
env["apply"] = lambda f, args: f(*args)
# Functions from eval.sx that cek.sx references
env["trampoline"] = trampoline
env["eval-expr"] = eval_expr
env["eval-list"] = sx_ref.eval_list
env["eval-call"] = sx_ref.eval_call
env["call-lambda"] = sx_ref.call_lambda
env["call-component"] = sx_ref.call_component
env["parse-keyword-args"] = sx_ref.parse_keyword_args
env["sf-lambda"] = sx_ref.sf_lambda
env["sf-defcomp"] = sx_ref.sf_defcomp
env["sf-defisland"] = sx_ref.sf_defisland
env["sf-defmacro"] = sx_ref.sf_defmacro
env["sf-defstyle"] = sx_ref.sf_defstyle
env["sf-deftype"] = sx_ref.sf_deftype
env["sf-defeffect"] = sx_ref.sf_defeffect
env["sf-letrec"] = sx_ref.sf_letrec
env["sf-named-let"] = sx_ref.sf_named_let
env["sf-dynamic-wind"] = sx_ref.sf_dynamic_wind
env["sf-scope"] = sx_ref.sf_scope
env["sf-provide"] = sx_ref.sf_provide
env["qq-expand"] = sx_ref.qq_expand
env["expand-macro"] = sx_ref.expand_macro
env["cond-scheme?"] = sx_ref.cond_scheme_p
# Higher-order form handlers
env["ho-map"] = sx_ref.ho_map
env["ho-map-indexed"] = sx_ref.ho_map_indexed
env["ho-filter"] = sx_ref.ho_filter
env["ho-reduce"] = sx_ref.ho_reduce
env["ho-some"] = sx_ref.ho_some
env["ho-every"] = sx_ref.ho_every
env["ho-for-each"] = sx_ref.ho_for_each
env["call-fn"] = sx_ref.call_fn
# Render-related (stub for testing — no active rendering)
env["render-active?"] = lambda: False
env["is-render-expr?"] = lambda expr: False
env["render-expr"] = lambda expr, env: NIL
# Scope primitives (needed for reactive-shift-deref island cleanup)
env["scope-push!"] = sx_ref.PRIMITIVES.get("scope-push!", lambda *a: NIL)
env["scope-pop!"] = sx_ref.PRIMITIVES.get("scope-pop!", lambda *a: NIL)
env["context"] = sx_ref.PRIMITIVES.get("context", lambda *a: NIL)
env["emit!"] = sx_ref.PRIMITIVES.get("emit!", lambda *a: NIL)
env["emitted"] = sx_ref.PRIMITIVES.get("emitted", lambda *a: [])
# Dynamic wind
env["push-wind!"] = lambda before, after: NIL
env["pop-wind!"] = lambda: NIL
env["call-thunk"] = lambda f, e: f() if callable(f) else trampoline(eval_expr([f], e))
# Mutation helpers
env["dict-get"] = lambda d, k: d.get(k, NIL) if isinstance(d, dict) else NIL
env["identical?"] = lambda a, b: a is b
# defhandler, defpage, defquery, defaction stubs
for name in ["sf-defhandler", "sf-defpage", "sf-defquery", "sf-defaction"]:
pyname = name.replace("-", "_")
fn = getattr(sx_ref, pyname, None)
if fn:
env[name] = fn
else:
env[name] = lambda args, e, _n=name: NIL
# Load test framework
with open(os.path.join(_HERE, "test-framework.sx")) as f:
for expr in parse_all(f.read()):
trampoline(eval_expr(expr, env))
# Load signals module
print("Loading signals.sx ...")
with open(os.path.join(_HERE, "signals.sx")) as f:
for expr in parse_all(f.read()):
trampoline(eval_expr(expr, env))
# Load frames module
print("Loading frames.sx ...")
with open(os.path.join(_HERE, "frames.sx")) as f:
for expr in parse_all(f.read()):
trampoline(eval_expr(expr, env))
# Load CEK module
print("Loading cek.sx ...")
with open(os.path.join(_HERE, "cek.sx")) as f:
for expr in parse_all(f.read()):
trampoline(eval_expr(expr, env))
# Run tests
print("=" * 60)
print("Running test-cek-reactive.sx")
print("=" * 60)
with open(os.path.join(_HERE, "test-cek-reactive.sx")) as f:
for expr in parse_all(f.read()):
trampoline(eval_expr(expr, env))
print("=" * 60)
print(f"Results: {_pass_count} passed, {_fail_count} failed")
print("=" * 60)
sys.exit(1 if _fail_count > 0 else 0)

View File

@@ -0,0 +1,265 @@
#!/usr/bin/env python3
"""Run test-cek.sx using the bootstrapped evaluator with CEK module loaded."""
from __future__ import annotations
import os, sys
_HERE = os.path.dirname(os.path.abspath(__file__))
_PROJECT = os.path.abspath(os.path.join(_HERE, "..", "..", ".."))
sys.path.insert(0, _PROJECT)
from shared.sx.ref.sx_ref import sx_parse as parse_all
from shared.sx.ref import sx_ref
from shared.sx.ref.sx_ref import (
make_env, env_get, env_has, env_set,
env_extend, env_merge,
)
# Use tree-walk evaluator for interpreting .sx test files.
# The CEK override (eval_expr = cek_run) would cause the interpreted cek.sx
# to delegate to the transpiled CEK, not the interpreted one being tested.
# Override both the local names AND the module-level names so that transpiled
# functions (ho_map, call_lambda, etc.) also use tree-walk internally.
eval_expr = sx_ref._tree_walk_eval_expr
trampoline = sx_ref._tree_walk_trampoline
sx_ref.eval_expr = eval_expr
sx_ref.trampoline = trampoline
from shared.sx.types import (
NIL, Symbol, Keyword, Lambda, Component, Island, Continuation, Macro,
_ShiftSignal,
)
# Build env with primitives
env = make_env()
# Platform test functions
_suite_stack: list[str] = []
_pass_count = 0
_fail_count = 0
def _try_call(thunk):
try:
trampoline(eval_expr([thunk], env))
return {"ok": True}
except Exception as e:
return {"ok": False, "error": str(e)}
def _report_pass(name):
global _pass_count
_pass_count += 1
ctx = " > ".join(_suite_stack)
print(f" PASS: {ctx} > {name}")
return NIL
def _report_fail(name, error):
global _fail_count
_fail_count += 1
ctx = " > ".join(_suite_stack)
print(f" FAIL: {ctx} > {name}: {error}")
return NIL
def _push_suite(name):
_suite_stack.append(name)
print(f"{' ' * (len(_suite_stack)-1)}Suite: {name}")
return NIL
def _pop_suite():
if _suite_stack:
_suite_stack.pop()
return NIL
def _test_env():
return env
def _sx_parse(source):
return parse_all(source)
def _sx_parse_one(source):
"""Parse a single expression."""
exprs = parse_all(source)
return exprs[0] if exprs else NIL
def _make_continuation(fn):
return Continuation(fn)
env["try-call"] = _try_call
env["report-pass"] = _report_pass
env["report-fail"] = _report_fail
env["push-suite"] = _push_suite
env["pop-suite"] = _pop_suite
env["test-env"] = _test_env
env["sx-parse"] = _sx_parse
env["sx-parse-one"] = _sx_parse_one
env["env-get"] = env_get
env["env-has?"] = env_has
env["env-set!"] = env_set
env["env-extend"] = env_extend
env["make-continuation"] = _make_continuation
env["continuation?"] = lambda x: isinstance(x, Continuation)
env["continuation-fn"] = lambda c: c.fn
def _make_cek_continuation(captured, rest_kont):
"""Create a Continuation that stores captured CEK frames as data."""
data = {"captured": captured, "rest-kont": rest_kont}
# The fn is a dummy — invocation happens via CEK's continue-with-call
return Continuation(lambda v=NIL: v)
# Monkey-patch to store data
_orig_make_cek_cont = _make_cek_continuation
def _make_cek_continuation_with_data(captured, rest_kont):
c = _orig_make_cek_cont(captured, rest_kont)
c._cek_data = {"captured": captured, "rest-kont": rest_kont}
return c
env["make-cek-continuation"] = _make_cek_continuation_with_data
env["continuation-data"] = lambda c: getattr(c, '_cek_data', {})
# Register platform functions from sx_ref that cek.sx and eval.sx need
# These are normally available as transpiled Python but need to be in the
# SX env when interpreting .sx files directly.
# Type predicates and constructors
env["callable?"] = lambda x: callable(x) or isinstance(x, (Lambda, Component, Island, Continuation))
env["lambda?"] = lambda x: isinstance(x, Lambda)
env["component?"] = lambda x: isinstance(x, Component)
env["island?"] = lambda x: isinstance(x, Island)
env["macro?"] = lambda x: isinstance(x, Macro)
env["thunk?"] = sx_ref.is_thunk
env["thunk-expr"] = sx_ref.thunk_expr
env["thunk-env"] = sx_ref.thunk_env
env["make-thunk"] = sx_ref.make_thunk
env["make-lambda"] = sx_ref.make_lambda
env["make-component"] = sx_ref.make_component
env["make-island"] = sx_ref.make_island
env["make-macro"] = sx_ref.make_macro
env["make-symbol"] = lambda n: Symbol(n)
env["lambda-params"] = lambda f: f.params
env["lambda-body"] = lambda f: f.body
env["lambda-closure"] = lambda f: f.closure
env["lambda-name"] = lambda f: f.name
env["set-lambda-name!"] = lambda f, n: setattr(f, 'name', n) or NIL
env["component-params"] = lambda c: c.params
env["component-body"] = lambda c: c.body
env["component-closure"] = lambda c: c.closure
env["component-has-children?"] = lambda c: c.has_children
env["component-affinity"] = lambda c: getattr(c, 'affinity', 'auto')
env["component-set-param-types!"] = lambda c, t: setattr(c, 'param_types', t) or NIL
env["macro-params"] = lambda m: m.params
env["macro-rest-param"] = lambda m: m.rest_param
env["macro-body"] = lambda m: m.body
env["macro-closure"] = lambda m: m.closure
env["env-merge"] = env_merge
env["symbol-name"] = lambda s: s.name if isinstance(s, Symbol) else str(s)
env["keyword-name"] = lambda k: k.name if isinstance(k, Keyword) else str(k)
env["type-of"] = sx_ref.type_of
env["primitive?"] = lambda n: n in sx_ref.PRIMITIVES
env["get-primitive"] = lambda n: sx_ref.PRIMITIVES.get(n)
env["strip-prefix"] = lambda s, p: s[len(p):] if s.startswith(p) else s
env["inspect"] = repr
env["debug-log"] = lambda *args: None
env["error"] = sx_ref.error
env["apply"] = lambda f, args: f(*args)
# Functions from eval.sx that cek.sx references
env["trampoline"] = trampoline
env["eval-expr"] = eval_expr
env["eval-list"] = sx_ref.eval_list
env["eval-call"] = sx_ref.eval_call
env["call-lambda"] = sx_ref.call_lambda
env["call-component"] = sx_ref.call_component
env["parse-keyword-args"] = sx_ref.parse_keyword_args
env["sf-lambda"] = sx_ref.sf_lambda
env["sf-defcomp"] = sx_ref.sf_defcomp
env["sf-defisland"] = sx_ref.sf_defisland
env["sf-defmacro"] = sx_ref.sf_defmacro
env["sf-defstyle"] = sx_ref.sf_defstyle
env["sf-deftype"] = sx_ref.sf_deftype
env["sf-defeffect"] = sx_ref.sf_defeffect
env["sf-letrec"] = sx_ref.sf_letrec
env["sf-named-let"] = sx_ref.sf_named_let
env["sf-dynamic-wind"] = sx_ref.sf_dynamic_wind
env["sf-scope"] = sx_ref.sf_scope
env["sf-provide"] = sx_ref.sf_provide
env["qq-expand"] = sx_ref.qq_expand
env["expand-macro"] = sx_ref.expand_macro
env["cond-scheme?"] = sx_ref.cond_scheme_p
# Higher-order form handlers
env["ho-map"] = sx_ref.ho_map
env["ho-map-indexed"] = sx_ref.ho_map_indexed
env["ho-filter"] = sx_ref.ho_filter
env["ho-reduce"] = sx_ref.ho_reduce
env["ho-some"] = sx_ref.ho_some
env["ho-every"] = sx_ref.ho_every
env["ho-for-each"] = sx_ref.ho_for_each
env["call-fn"] = sx_ref.call_fn
# Render-related (stub for testing — no active rendering)
env["render-active?"] = lambda: False
env["is-render-expr?"] = lambda expr: False
env["render-expr"] = lambda expr, env: NIL
# Scope primitives
env["scope-push!"] = sx_ref.PRIMITIVES.get("scope-push!", lambda *a: NIL)
env["scope-pop!"] = sx_ref.PRIMITIVES.get("scope-pop!", lambda *a: NIL)
env["context"] = sx_ref.PRIMITIVES.get("context", lambda *a: NIL)
env["emit!"] = sx_ref.PRIMITIVES.get("emit!", lambda *a: NIL)
env["emitted"] = sx_ref.PRIMITIVES.get("emitted", lambda *a: [])
# Dynamic wind
env["push-wind!"] = lambda before, after: NIL
env["pop-wind!"] = lambda: NIL
env["call-thunk"] = lambda f, e: f() if callable(f) else trampoline(eval_expr([f], e))
# Mutation helpers used by parse-keyword-args etc
env["dict-get"] = lambda d, k: d.get(k, NIL) if isinstance(d, dict) else NIL
# defhandler, defpage, defquery, defaction — these are registrations
# Use the bootstrapped versions if they exist, otherwise stub
for name in ["sf-defhandler", "sf-defpage", "sf-defquery", "sf-defaction"]:
pyname = name.replace("-", "_")
fn = getattr(sx_ref, pyname, None)
if fn:
env[name] = fn
else:
env[name] = lambda args, e, _n=name: NIL
# Load test framework
with open(os.path.join(_HERE, "test-framework.sx")) as f:
for expr in parse_all(f.read()):
trampoline(eval_expr(expr, env))
# Load frames module
print("Loading frames.sx ...")
with open(os.path.join(_HERE, "frames.sx")) as f:
for expr in parse_all(f.read()):
trampoline(eval_expr(expr, env))
# Load CEK module
print("Loading cek.sx ...")
with open(os.path.join(_HERE, "cek.sx")) as f:
for expr in parse_all(f.read()):
trampoline(eval_expr(expr, env))
# Define cek-eval helper in SX
for expr in parse_all("""
(define cek-eval
(fn (source)
(let ((exprs (sx-parse source)))
(let ((result nil))
(for-each (fn (e) (set! result (eval-expr-cek e (test-env)))) exprs)
result))))
"""):
trampoline(eval_expr(expr, env))
# Run tests
print("=" * 60)
print("Running test-cek.sx")
print("=" * 60)
with open(os.path.join(_HERE, "test-cek.sx")) as f:
for expr in parse_all(f.read()):
trampoline(eval_expr(expr, env))
print("=" * 60)
print(f"Results: {_pass_count} passed, {_fail_count} failed")
print("=" * 60)
sys.exit(1 if _fail_count > 0 else 0)

View File

@@ -0,0 +1,106 @@
#!/usr/bin/env python3
"""Run test-continuations.sx using the bootstrapped evaluator with continuations enabled."""
from __future__ import annotations
import os, sys, subprocess, tempfile
_HERE = os.path.dirname(os.path.abspath(__file__))
_PROJECT = os.path.abspath(os.path.join(_HERE, "..", "..", ".."))
sys.path.insert(0, _PROJECT)
# Bootstrap a fresh sx_ref with continuations enabled
print("Bootstrapping with --extensions continuations ...")
result = subprocess.run(
[sys.executable, os.path.join(_HERE, "bootstrap_py.py"),
"--extensions", "continuations"],
capture_output=True, text=True, cwd=_PROJECT,
)
if result.returncode != 0:
print("Bootstrap FAILED:")
print(result.stderr)
sys.exit(1)
# Write to temp file and import
tmp = tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False, dir=_HERE)
tmp.write(result.stdout)
tmp.close()
try:
import importlib.util
spec = importlib.util.spec_from_file_location("sx_ref_cont", tmp.name)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
finally:
os.unlink(tmp.name)
from shared.sx.types import NIL
parse_all = mod.sx_parse
# Use tree-walk evaluator for interpreting .sx test files.
# CEK is now the default, but test runners need tree-walk so that
# transpiled HO forms (ho_map, etc.) don't re-enter CEK mid-evaluation.
eval_expr = mod._tree_walk_eval_expr
trampoline = mod._tree_walk_trampoline
mod.eval_expr = eval_expr
mod.trampoline = trampoline
env = mod.make_env()
# Platform test functions
_suite_stack: list[str] = []
_pass_count = 0
_fail_count = 0
def _try_call(thunk):
try:
trampoline(eval_expr([thunk], env))
return {"ok": True}
except Exception as e:
return {"ok": False, "error": str(e)}
def _report_pass(name):
global _pass_count
_pass_count += 1
ctx = " > ".join(_suite_stack)
print(f" PASS: {ctx} > {name}")
return NIL
def _report_fail(name, error):
global _fail_count
_fail_count += 1
ctx = " > ".join(_suite_stack)
print(f" FAIL: {ctx} > {name}: {error}")
return NIL
def _push_suite(name):
_suite_stack.append(name)
print(f"{' ' * (len(_suite_stack)-1)}Suite: {name}")
return NIL
def _pop_suite():
if _suite_stack:
_suite_stack.pop()
return NIL
env["try-call"] = _try_call
env["report-pass"] = _report_pass
env["report-fail"] = _report_fail
env["push-suite"] = _push_suite
env["pop-suite"] = _pop_suite
# Load test framework
with open(os.path.join(_HERE, "test-framework.sx")) as f:
for expr in parse_all(f.read()):
trampoline(eval_expr(expr, env))
# Run tests
print("=" * 60)
print("Running test-continuations.sx")
print("=" * 60)
with open(os.path.join(_HERE, "test-continuations.sx")) as f:
for expr in parse_all(f.read()):
trampoline(eval_expr(expr, env))
print("=" * 60)
print(f"Results: {_pass_count} passed, {_fail_count} failed")
print("=" * 60)
sys.exit(1 if _fail_count > 0 else 0)

View File

@@ -0,0 +1,162 @@
#!/usr/bin/env python3
"""Run test-signals.sx using the bootstrapped evaluator with signal primitives.
Uses bootstrapped signal functions from sx_ref.py directly, patching apply
to handle SX lambdas from the interpreter (test expressions create lambdas
that need evaluator dispatch).
"""
from __future__ import annotations
import os, sys
_HERE = os.path.dirname(os.path.abspath(__file__))
_PROJECT = os.path.abspath(os.path.join(_HERE, "..", "..", ".."))
sys.path.insert(0, _PROJECT)
from shared.sx.ref.sx_ref import sx_parse as parse_all
from shared.sx.ref import sx_ref
from shared.sx.ref.sx_ref import make_env, scope_push, scope_pop, sx_context
from shared.sx.types import NIL, Island, Lambda
# Use tree-walk evaluator for interpreting .sx test files.
eval_expr = sx_ref._tree_walk_eval_expr
trampoline = sx_ref._tree_walk_trampoline
sx_ref.eval_expr = eval_expr
sx_ref.trampoline = trampoline
# Build env with primitives
env = make_env()
# --- Patch apply BEFORE anything else ---
# Test expressions create SX Lambdas that bootstrapped code calls via apply.
# Patch the module-level function so all bootstrapped functions see it.
# apply is used by swap! and other forms to call functions with arg lists
def _apply(f, args):
if isinstance(f, Lambda):
return trampoline(eval_expr([f] + list(args), env))
return f(*args)
sx_ref.__dict__["apply"] = _apply
# cons needs to handle tuples from Python *args (swap! passes &rest as tuple)
_orig_cons = sx_ref.PRIMITIVES.get("cons")
def _cons(x, c):
if isinstance(c, tuple):
c = list(c)
return [x] + (c or [])
sx_ref.__dict__["cons"] = _cons
sx_ref.PRIMITIVES["cons"] = _cons
# Platform test functions
_suite_stack: list[str] = []
_pass_count = 0
_fail_count = 0
def _try_call(thunk):
try:
trampoline(eval_expr([thunk], env))
return {"ok": True}
except Exception as e:
return {"ok": False, "error": str(e)}
def _report_pass(name):
global _pass_count
_pass_count += 1
ctx = " > ".join(_suite_stack)
print(f" PASS: {ctx} > {name}")
return NIL
def _report_fail(name, error):
global _fail_count
_fail_count += 1
ctx = " > ".join(_suite_stack)
print(f" FAIL: {ctx} > {name}: {error}")
return NIL
def _push_suite(name):
_suite_stack.append(name)
print(f"{' ' * (len(_suite_stack)-1)}Suite: {name}")
return NIL
def _pop_suite():
if _suite_stack:
_suite_stack.pop()
return NIL
env["try-call"] = _try_call
env["report-pass"] = _report_pass
env["report-fail"] = _report_fail
env["push-suite"] = _push_suite
env["pop-suite"] = _pop_suite
# Signal functions are now pure SX (transpiled into sx_ref.py from signals.sx)
# Wire both low-level dict-based signal functions and high-level API
env["identical?"] = sx_ref.is_identical
env["island?"] = lambda x: isinstance(x, Island)
# Scope primitives (used by signals.sx for reactive tracking)
env["scope-push!"] = scope_push
env["scope-pop!"] = scope_pop
env["context"] = sx_context
# Low-level signal functions (now pure SX, transpiled from signals.sx)
env["make-signal"] = sx_ref.make_signal
env["signal?"] = sx_ref.is_signal
env["signal-value"] = sx_ref.signal_value
env["signal-set-value!"] = sx_ref.signal_set_value
env["signal-subscribers"] = sx_ref.signal_subscribers
env["signal-add-sub!"] = sx_ref.signal_add_sub
env["signal-remove-sub!"] = sx_ref.signal_remove_sub
env["signal-deps"] = sx_ref.signal_deps
env["signal-set-deps!"] = sx_ref.signal_set_deps
# Bootstrapped signal functions from sx_ref.py
env["signal"] = sx_ref.signal
env["deref"] = sx_ref.deref
env["reset!"] = sx_ref.reset_b
env["swap!"] = sx_ref.swap_b
env["computed"] = sx_ref.computed
env["effect"] = sx_ref.effect
# batch has a bootstrapper issue with _batch_depth global variable access.
# Wrap it to work correctly in the test context.
def _batch(thunk):
sx_ref._batch_depth = getattr(sx_ref, '_batch_depth', 0) + 1
sx_ref.cek_call(thunk, None)
sx_ref._batch_depth -= 1
if sx_ref._batch_depth == 0:
queue = list(sx_ref._batch_queue)
sx_ref._batch_queue = []
seen = []
pending = []
for s in queue:
for sub in sx_ref.signal_subscribers(s):
if sub not in seen:
seen.append(sub)
pending.append(sub)
for sub in pending:
sub()
return NIL
env["batch"] = _batch
env["notify-subscribers"] = sx_ref.notify_subscribers
env["flush-subscribers"] = sx_ref.flush_subscribers
env["dispose-computed"] = sx_ref.dispose_computed
env["with-island-scope"] = sx_ref.with_island_scope
env["register-in-scope"] = sx_ref.register_in_scope
env["callable?"] = sx_ref.is_callable
# Load test framework
with open(os.path.join(_HERE, "test-framework.sx")) as f:
for expr in parse_all(f.read()):
trampoline(eval_expr(expr, env))
# Run tests
print("=" * 60)
print("Running test-signals.sx")
print("=" * 60)
with open(os.path.join(_HERE, "test-signals.sx")) as f:
for expr in parse_all(f.read()):
trampoline(eval_expr(expr, env))
print("=" * 60)
print(f"Results: {_pass_count} passed, {_fail_count} failed")
print("=" * 60)
sys.exit(1 if _fail_count > 0 else 0)

View File

@@ -0,0 +1,191 @@
#!/usr/bin/env python3
"""Run test-types.sx using the bootstrapped evaluator with types module loaded."""
from __future__ import annotations
import os, sys
_HERE = os.path.dirname(os.path.abspath(__file__))
_PROJECT = os.path.abspath(os.path.join(_HERE, "..", "..", ".."))
sys.path.insert(0, _PROJECT)
from shared.sx.ref.sx_ref import sx_parse as parse_all
from shared.sx.ref import sx_ref
from shared.sx.ref.sx_ref import make_env, env_get, env_has, env_set
from shared.sx.types import NIL, Component
# Use tree-walk evaluator for interpreting .sx test files.
# CEK is now the default, but the test runners need tree-walk so that
# transpiled HO forms (ho_map, etc.) don't re-enter CEK mid-evaluation.
eval_expr = sx_ref._tree_walk_eval_expr
trampoline = sx_ref._tree_walk_trampoline
sx_ref.eval_expr = eval_expr
sx_ref.trampoline = trampoline
# Build env with primitives
env = make_env()
# Platform test functions
_suite_stack: list[str] = []
_pass_count = 0
_fail_count = 0
def _try_call(thunk):
try:
trampoline(eval_expr([thunk], env)) # call the thunk
return {"ok": True}
except Exception as e:
return {"ok": False, "error": str(e)}
def _report_pass(name):
global _pass_count
_pass_count += 1
ctx = " > ".join(_suite_stack)
print(f" PASS: {ctx} > {name}")
return NIL
def _report_fail(name, error):
global _fail_count
_fail_count += 1
ctx = " > ".join(_suite_stack)
print(f" FAIL: {ctx} > {name}: {error}")
return NIL
def _push_suite(name):
_suite_stack.append(name)
print(f"{' ' * (len(_suite_stack)-1)}Suite: {name}")
return NIL
def _pop_suite():
if _suite_stack:
_suite_stack.pop()
return NIL
env["try-call"] = _try_call
env["report-pass"] = _report_pass
env["report-fail"] = _report_fail
env["push-suite"] = _push_suite
env["pop-suite"] = _pop_suite
# Test fixtures — provide the functions that tests expect
# test-prim-types: dict of primitive return types for type inference
def _test_prim_types():
return {
"+": "number", "-": "number", "*": "number", "/": "number",
"mod": "number", "inc": "number", "dec": "number",
"abs": "number", "min": "number", "max": "number",
"floor": "number", "ceil": "number", "round": "number",
"str": "string", "upper": "string", "lower": "string",
"trim": "string", "join": "string", "replace": "string",
"format": "string", "substr": "string",
"=": "boolean", "<": "boolean", ">": "boolean",
"<=": "boolean", ">=": "boolean", "!=": "boolean",
"not": "boolean", "nil?": "boolean", "empty?": "boolean",
"number?": "boolean", "string?": "boolean", "boolean?": "boolean",
"list?": "boolean", "dict?": "boolean", "symbol?": "boolean",
"keyword?": "boolean", "contains?": "boolean", "has-key?": "boolean",
"starts-with?": "boolean", "ends-with?": "boolean",
"len": "number", "first": "any", "rest": "list",
"last": "any", "nth": "any", "cons": "list",
"append": "list", "concat": "list", "reverse": "list",
"sort": "list", "slice": "list", "range": "list",
"flatten": "list", "keys": "list", "vals": "list",
"map-dict": "dict", "assoc": "dict", "dissoc": "dict",
"merge": "dict", "dict": "dict",
"get": "any", "type-of": "string",
}
# test-prim-param-types: dict of primitive param type specs
# Format: {name → {"positional" [["name" "type"] ...] "rest-type" type-or-nil}}
def _test_prim_param_types():
return {
"+": {"positional": [["a", "number"]], "rest-type": "number"},
"-": {"positional": [["a", "number"]], "rest-type": "number"},
"*": {"positional": [["a", "number"]], "rest-type": "number"},
"/": {"positional": [["a", "number"]], "rest-type": "number"},
"inc": {"positional": [["n", "number"]], "rest-type": NIL},
"dec": {"positional": [["n", "number"]], "rest-type": NIL},
"upper": {"positional": [["s", "string"]], "rest-type": NIL},
"lower": {"positional": [["s", "string"]], "rest-type": NIL},
"keys": {"positional": [["d", "dict"]], "rest-type": NIL},
"vals": {"positional": [["d", "dict"]], "rest-type": NIL},
}
# test-env: returns a fresh env for use in tests (same as the test env)
def _test_env():
return env
# sx-parse: parse an SX string and return list of AST nodes
def _sx_parse(source):
return parse_all(source)
# dict-get: used in some legacy tests
def _dict_get(d, k):
v = d.get(k) if isinstance(d, dict) else NIL
return v if v is not None else NIL
# component-set-param-types! and component-param-types: type annotation accessors
def _component_set_param_types(comp, types_dict):
comp.param_types = types_dict
return NIL
def _component_param_types(comp):
return getattr(comp, 'param_types', NIL)
# Platform functions used by types.sx but not SX primitives
def _component_params(c):
return c.params
def _component_body(c):
return c.body
def _component_has_children(c):
return c.has_children
def _map_dict(fn, d):
from shared.sx.types import Lambda as _Lambda
result = {}
for k, v in d.items():
if isinstance(fn, _Lambda):
# Call SX lambda through the evaluator
result[k] = trampoline(eval_expr([fn, k, v], env))
else:
result[k] = fn(k, v)
return result
env["test-prim-types"] = _test_prim_types
env["test-prim-param-types"] = _test_prim_param_types
env["test-env"] = _test_env
env["sx-parse"] = _sx_parse
env["dict-get"] = _dict_get
env["component-set-param-types!"] = _component_set_param_types
env["component-param-types"] = _component_param_types
env["component-params"] = _component_params
env["component-body"] = _component_body
env["component-has-children"] = _component_has_children
env["map-dict"] = _map_dict
env["env-get"] = env_get
env["env-has?"] = env_has
env["env-set!"] = env_set
# Load test framework (macros + assertion helpers)
with open(os.path.join(_HERE, "test-framework.sx")) as f:
for expr in parse_all(f.read()):
trampoline(eval_expr(expr, env))
# Load types module
with open(os.path.join(_HERE, "types.sx")) as f:
for expr in parse_all(f.read()):
trampoline(eval_expr(expr, env))
# Run tests
print("=" * 60)
print("Running test-types.sx")
print("=" * 60)
with open(os.path.join(_HERE, "test-types.sx")) as f:
for expr in parse_all(f.read()):
trampoline(eval_expr(expr, env))
print("=" * 60)
print(f"Results: {_pass_count} passed, {_fail_count} failed")
print("=" * 60)
sys.exit(1 if _fail_count > 0 else 0)

1212
hosts/python/transpiler.sx Normal file

File diff suppressed because it is too large Load Diff