""" Platform sections for Python SX bootstrapper. These are static Python code strings that form the runtime infrastructure for the bootstrapped sx_ref.py module. They are NOT generated from .sx files — they provide the host platform interface that the transpiled SX spec code relies on. Both G0 (bootstrap_py.py) and G1 (run_py_sx.py) import from here. """ from __future__ import annotations from shared.sx.types import NIL as SX_NIL, Symbol # noqa: F401 — re-export for consumers from shared.sx.parser import parse_all as _parse_all def extract_defines(source: str) -> list[tuple[str, list]]: """Parse .sx source, return list of (name, define-expr) for top-level defines. Recognizes both (define ...) and (define-async ...) forms. """ exprs = _parse_all(source) defines = [] for expr in exprs: if isinstance(expr, list) and expr and isinstance(expr[0], Symbol): if expr[0].name in ("define", "define-async"): name = expr[1].name if isinstance(expr[1], Symbol) else str(expr[1]) defines.append((name, expr)) return defines # --------------------------------------------------------------------------- # Preamble — file header, imports, type imports # --------------------------------------------------------------------------- PREAMBLE = '''\ """ sx_ref.py -- Generated from reference SX evaluator specification. Bootstrap-compiled from shared/sx/ref/{eval,render,adapter-html,adapter-sx}.sx Compare against hand-written evaluator.py / html.py for correctness verification. DO NOT EDIT -- regenerate with: python run_py_sx.py """ from __future__ import annotations import math from typing import Any # ========================================================================= # Types (reuse existing types) # ========================================================================= from shared.sx.types import ( NIL, Symbol, Keyword, Lambda, Component, Island, Continuation, Macro, HandlerDef, QueryDef, ActionDef, PageDef, _ShiftSignal, ) from shared.sx.parser import SxExpr from shared.sx.env import Env as _Env, MergedEnv as _MergedEnv ''' # --------------------------------------------------------------------------- # Platform interface — core Python runtime # --------------------------------------------------------------------------- PLATFORM_PY = ''' # ========================================================================= # Platform interface -- Python implementation # ========================================================================= class _Thunk: """Deferred evaluation for TCO.""" __slots__ = ("expr", "env") def __init__(self, expr, env): self.expr = expr self.env = env class _RawHTML: """Marker for pre-rendered HTML that should not be escaped.""" __slots__ = ("html",) def __init__(self, html: str): self.html = html class _Spread: """Attribute injection value — merges attrs onto parent element.""" __slots__ = ("attrs",) def __init__(self, attrs: dict): self.attrs = dict(attrs) if attrs else {} # Unified scope stacks — backing store for provide/context/emit!/collect! # Each entry: {"value": v, "emitted": [], "dedup": bool} _scope_stacks: dict[str, list[dict]] = {} def _collect_reset(): """Reset all scope stacks (call at start of each render pass).""" global _scope_stacks _scope_stacks = {} def scope_push(name, value=None): """Push a scope with name, value, and empty accumulator.""" _scope_stacks.setdefault(name, []).append({"value": value, "emitted": [], "dedup": False}) def scope_pop(name): """Pop the most recent scope for name.""" if name in _scope_stacks and _scope_stacks[name]: _scope_stacks[name].pop() # Aliases — provide-push!/provide-pop! map to scope-push!/scope-pop! provide_push = scope_push provide_pop = scope_pop def sx_context(name, *default): """Read value from nearest enclosing scope. Error if no scope and no default.""" if name in _scope_stacks and _scope_stacks[name]: return _scope_stacks[name][-1]["value"] if default: return default[0] raise RuntimeError(f"No provider for: {name}") def sx_emit(name, value): """Append value to nearest enclosing scope's accumulator. Respects dedup flag.""" if name in _scope_stacks and _scope_stacks[name]: entry = _scope_stacks[name][-1] if entry["dedup"] and value in entry["emitted"]: return NIL entry["emitted"].append(value) return NIL def sx_emitted(name): """Return list of values emitted into nearest matching scope.""" if name in _scope_stacks and _scope_stacks[name]: return list(_scope_stacks[name][-1]["emitted"]) return [] def sx_truthy(x): """SX truthiness: everything is truthy except False, None, and NIL.""" if x is False: return False if x is None or x is NIL: return False return True def sx_str(*args): """SX str: concatenate string representations, skipping nil.""" parts = [] for a in args: if a is None or a is NIL: continue parts.append(str(a)) return "".join(parts) def sx_and(*args): """SX and: return last truthy value or first falsy.""" result = True for a in args: if not sx_truthy(a): return a result = a return result def sx_or(*args): """SX or: return first truthy value or last value.""" for a in args: if sx_truthy(a): return a return args[-1] if args else False def _sx_begin(*args): """Evaluate all args (for side effects), return last.""" return args[-1] if args else NIL def _sx_case(match_val, pairs): """Case dispatch: pairs is [(test_val, body_fn), ...]. None test = else.""" for test, body_fn in pairs: if test is None: # :else clause return body_fn() if match_val == test: return body_fn() return NIL def _sx_fn(f): """Identity wrapper for multi-expression lambda bodies.""" return f def type_of(x): if x is None or x is NIL: return "nil" if isinstance(x, bool): return "boolean" if isinstance(x, (int, float)): return "number" if isinstance(x, SxExpr): return "sx-expr" if isinstance(x, str): return "string" if isinstance(x, Symbol): return "symbol" if isinstance(x, Keyword): return "keyword" if isinstance(x, _Thunk): return "thunk" if isinstance(x, Lambda): return "lambda" if isinstance(x, Component): return "component" if isinstance(x, Island): return "island" if isinstance(x, _Spread): return "spread" if isinstance(x, Macro): return "macro" if isinstance(x, _RawHTML): return "raw-html" if isinstance(x, Continuation): return "continuation" if isinstance(x, list): return "list" if isinstance(x, dict): return "dict" return "unknown" def symbol_name(s): return s.name def keyword_name(k): return k.name def make_symbol(n): return Symbol(n) def make_keyword(n): return Keyword(n) def _ensure_env(env): """Wrap plain dict in Env if needed.""" if isinstance(env, _Env): return env return _Env(env if isinstance(env, dict) else {}) def make_lambda(params, body, env): return Lambda(params=list(params), body=body, closure=_ensure_env(env)) def make_component(name, params, has_children, body, env, affinity="auto"): return Component(name=name, params=list(params), has_children=has_children, body=body, closure=dict(env), affinity=str(affinity) if affinity else "auto") def make_island(name, params, has_children, body, env): return Island(name=name, params=list(params), has_children=has_children, body=body, closure=dict(env)) def make_macro(params, rest_param, body, env, name=None): return Macro(params=list(params), rest_param=rest_param, body=body, closure=dict(env), name=name) def make_handler_def(name, params, body, env, opts=None): path = opts.get('path') if opts else None method = str(opts.get('method', 'get')) if opts else 'get' csrf = opts.get('csrf', True) if opts else True returns = str(opts.get('returns', 'element')) if opts else 'element' if isinstance(csrf, str): csrf = csrf.lower() not in ('false', 'nil', 'no') return HandlerDef(name=name, params=list(params), body=body, closure=dict(env), path=path, method=method.lower(), csrf=csrf, returns=returns) def make_query_def(name, params, doc, body, env): return QueryDef(name=name, params=list(params), doc=doc, body=body, closure=dict(env)) def make_action_def(name, params, doc, body, env): return ActionDef(name=name, params=list(params), doc=doc, body=body, closure=dict(env)) def make_page_def(name, slots, env): path = slots.get("path", "") auth_val = slots.get("auth", "public") if isinstance(auth_val, Keyword): auth = auth_val.name elif isinstance(auth_val, list): auth = [item.name if isinstance(item, Keyword) else str(item) for item in auth_val] else: auth = str(auth_val) if auth_val else "public" layout = slots.get("layout") if isinstance(layout, Keyword): layout = layout.name cache = None stream_val = slots.get("stream") stream = bool(trampoline(eval_expr(stream_val, env))) if stream_val is not None else False return PageDef( name=name, path=path, auth=auth, layout=layout, cache=cache, data_expr=slots.get("data"), content_expr=slots.get("content"), filter_expr=slots.get("filter"), aside_expr=slots.get("aside"), menu_expr=slots.get("menu"), stream=stream, fallback_expr=slots.get("fallback"), shell_expr=slots.get("shell"), closure=dict(env), ) def make_thunk(expr, env): return _Thunk(expr, env) def make_spread(attrs): return _Spread(attrs if isinstance(attrs, dict) else {}) def is_spread(x): return isinstance(x, _Spread) def spread_attrs(s): return s.attrs if isinstance(s, _Spread) else {} def sx_collect(bucket, value): """Add value to named scope accumulator (deduplicated). Lazily creates root scope.""" if bucket not in _scope_stacks or not _scope_stacks[bucket]: _scope_stacks.setdefault(bucket, []).append({"value": None, "emitted": [], "dedup": True}) entry = _scope_stacks[bucket][-1] if value not in entry["emitted"]: entry["emitted"].append(value) def sx_collected(bucket): """Return all values collected in named scope accumulator.""" return sx_emitted(bucket) def sx_clear_collected(bucket): """Clear nearest scope's accumulator for name.""" if bucket in _scope_stacks and _scope_stacks[bucket]: _scope_stacks[bucket][-1]["emitted"] = [] def lambda_params(f): return f.params def lambda_body(f): return f.body def lambda_closure(f): return f.closure def lambda_name(f): return f.name def set_lambda_name(f, n): f.name = n def component_params(c): return c.params def component_body(c): return c.body def component_closure(c): return c.closure def component_has_children(c): return c.has_children def component_name(c): return c.name def component_affinity(c): return getattr(c, 'affinity', 'auto') def component_param_types(c): return getattr(c, 'param_types', None) def component_set_param_types(c, d): c.param_types = d def macro_params(m): return m.params def macro_rest_param(m): return m.rest_param def macro_body(m): return m.body def macro_closure(m): return m.closure def is_thunk(x): return isinstance(x, _Thunk) def thunk_expr(t): return t.expr def thunk_env(t): return t.env def is_callable(x): return callable(x) or isinstance(x, Lambda) def is_lambda(x): return isinstance(x, Lambda) def is_component(x): return isinstance(x, Component) def is_macro(x): return isinstance(x, Macro) def is_island(x): return isinstance(x, Island) def is_identical(a, b): return a is b def json_serialize(obj): import json return json.dumps(obj) def is_empty_dict(d): if not isinstance(d, dict): return True return len(d) == 0 # DOM event primitives — no-ops on server (browser-only). def dom_listen(el, name, handler): return lambda: None def dom_dispatch(el, name, detail=None): return False def event_detail(e): return None def env_has(env, name): return name in env def env_get(env, name): return env.get(name, NIL) def env_bind(env, name, val): """Create/overwrite binding on THIS env only (let, define, param binding).""" env[name] = val def env_set(env, name, val): """Mutate existing binding, walking scope chain (set!).""" if hasattr(env, 'set'): try: env.set(name, val) except KeyError: # Not found anywhere — bind on immediate env env[name] = val else: env[name] = val def env_extend(env): return _ensure_env(env).extend() def env_merge(base, overlay): base = _ensure_env(base) overlay = _ensure_env(overlay) if base is overlay: # Same env — just extend with empty local scope for params return base.extend() # Check if base is an ancestor of overlay — if so, overlay contains # everything in base. But overlay scopes between overlay and base may # have extra local bindings (e.g. page helpers injected at request time). # Only take the shortcut if no intermediate scope has local bindings. p = overlay depth = 0 while p is not None and depth < 100: if p is base: q = overlay has_extra = False while q is not base: if hasattr(q, '_bindings') and q._bindings: has_extra = True break q = getattr(q, '_parent', None) if not has_extra: return base.extend() break p = getattr(p, '_parent', None) depth += 1 # MergedEnv: reads walk base then overlay; set! walks base only return _MergedEnv({}, primary=base, secondary=overlay) def dict_set(d, k, v): d[k] = v def dict_get(d, k): v = d.get(k) return v if v is not None else NIL def dict_has(d, k): return k in d def dict_delete(d, k): d.pop(k, None) def is_render_expr(expr): """Placeholder — overridden by transpiled version from render.sx.""" return False # Render dispatch -- set by adapter _render_expr_fn = None # Render mode flag -- set by render-to-html/aser, checked by eval-list # When false, render expressions (HTML tags, components) fall through to eval-call _render_mode = False def render_active_p(): return _render_mode def set_render_active_b(val): global _render_mode _render_mode = bool(val) def render_expr(expr, env): if _render_expr_fn: return _render_expr_fn(expr, env) # No adapter — fall through to eval_call so components still evaluate return eval_call(first(expr), rest(expr), env) def strip_prefix(s, prefix): return s[len(prefix):] if s.startswith(prefix) else s def debug_log(*args): import sys print(*args, file=sys.stderr) def error(msg): raise EvalError(msg) def inspect(x): return repr(x) def escape_html(s): s = str(s) return s.replace("&", "&").replace("<", "<").replace(">", ">").replace('"', """) def escape_attr(s): return escape_html(s) def raw_html_content(x): return x.html def make_raw_html(s): return _RawHTML(s) def sx_expr_source(x): return x.source if isinstance(x, SxExpr) else str(x) try: from shared.sx.types import EvalError except ImportError: class EvalError(Exception): pass def _sx_append(lst, item): """Append item to list, return the item (for expression context).""" lst.append(item) return item def _sx_dict_set(d, k, v): """Set key in dict, return the value (for expression context).""" d[k] = v return v def _sx_set_attr(obj, attr, val): """Set attribute on object, return the value.""" setattr(obj, attr, val) return val def _sx_cell_set(cells, name, val): """Set a mutable cell value. Returns the value.""" cells[name] = val return val def escape_string(s): """Escape a string for SX serialization.""" return (str(s) .replace("\\\\", "\\\\\\\\") .replace('"', '\\\\"') .replace("\\n", "\\\\n") .replace("\\t", "\\\\t") .replace(""] = lambda a, b: a > b PRIMITIVES["<="] = lambda a, b: a <= b PRIMITIVES[">="] = lambda a, b: a >= b ''', "core.logic": ''' # core.logic PRIMITIVES["not"] = lambda x: not sx_truthy(x) ''', "core.predicates": ''' # core.predicates PRIMITIVES["nil?"] = lambda x: x is None or x is NIL PRIMITIVES["number?"] = lambda x: isinstance(x, (int, float)) and not isinstance(x, bool) PRIMITIVES["string?"] = lambda x: isinstance(x, str) PRIMITIVES["list?"] = lambda x: isinstance(x, _b_list) PRIMITIVES["dict?"] = lambda x: isinstance(x, _b_dict) PRIMITIVES["boolean?"] = lambda x: isinstance(x, bool) PRIMITIVES["symbol?"] = lambda x: isinstance(x, Symbol) PRIMITIVES["keyword?"] = lambda x: isinstance(x, Keyword) PRIMITIVES["continuation?"] = lambda x: isinstance(x, Continuation) PRIMITIVES["empty?"] = lambda c: ( c is None or c is NIL or (isinstance(c, (_b_list, str, _b_dict)) and _b_len(c) == 0) ) PRIMITIVES["contains?"] = lambda c, k: ( str(k) in c if isinstance(c, str) else k in c ) PRIMITIVES["odd?"] = lambda n: n % 2 != 0 PRIMITIVES["even?"] = lambda n: n % 2 == 0 PRIMITIVES["zero?"] = lambda n: n == 0 ''', "core.strings": ''' # core.strings PRIMITIVES["str"] = sx_str PRIMITIVES["char-from-code"] = lambda n: chr(_b_int(n)) PRIMITIVES["upper"] = lambda s: str(s).upper() PRIMITIVES["lower"] = lambda s: str(s).lower() PRIMITIVES["trim"] = lambda s: str(s).strip() PRIMITIVES["split"] = lambda s, sep=" ": str(s).split(sep) PRIMITIVES["join"] = lambda sep, coll: sep.join(str(x) for x in coll) PRIMITIVES["replace"] = lambda s, old, new: s.replace(old, new) PRIMITIVES["index-of"] = lambda s, needle, start=0: str(s).find(needle, start) PRIMITIVES["starts-with?"] = lambda s, p: str(s).startswith(p) PRIMITIVES["ends-with?"] = lambda s, p: str(s).endswith(p) PRIMITIVES["slice"] = lambda c, a, b=None: c[int(a):] if (b is None or b is NIL) else c[int(a):int(b)] PRIMITIVES["concat"] = lambda *args: _b_sum((a for a in args if a), []) ''', "core.collections": ''' # core.collections PRIMITIVES["list"] = lambda *args: _b_list(args) PRIMITIVES["dict"] = lambda *args: {args[i]: args[i+1] for i in _b_range(0, _b_len(args)-1, 2)} PRIMITIVES["range"] = lambda a, b, step=1: _b_list(_b_range(_b_int(a), _b_int(b), _b_int(step))) PRIMITIVES["get"] = lambda c, k, default=NIL: c.get(k, default) if isinstance(c, _b_dict) else (c[k] if isinstance(c, (_b_list, str)) and isinstance(k, _b_int) and 0 <= k < _b_len(c) else (c.get(k, default) if hasattr(c, 'get') else default)) PRIMITIVES["len"] = lambda c: _b_len(c) if c is not None and c is not NIL else 0 PRIMITIVES["first"] = lambda c: c[0] if c and _b_len(c) > 0 else NIL PRIMITIVES["last"] = lambda c: c[-1] if c and _b_len(c) > 0 else NIL PRIMITIVES["rest"] = lambda c: c[1:] if c else [] PRIMITIVES["nth"] = lambda c, n: c[n] if c and 0 <= n < _b_len(c) else NIL PRIMITIVES["cons"] = lambda x, c: [x] + (c or []) PRIMITIVES["append"] = lambda c, x: (c or []) + (x if isinstance(x, list) else [x]) PRIMITIVES["chunk-every"] = lambda c, n: [c[i:i+n] for i in _b_range(0, _b_len(c), n)] PRIMITIVES["zip-pairs"] = lambda c: [[c[i], c[i+1]] for i in _b_range(_b_len(c)-1)] ''', "core.dict": ''' # core.dict PRIMITIVES["keys"] = lambda d: _b_list((d or {}).keys()) PRIMITIVES["vals"] = lambda d: _b_list((d or {}).values()) PRIMITIVES["merge"] = lambda *args: _sx_merge_dicts(*args) PRIMITIVES["has-key?"] = lambda d, k: isinstance(d, _b_dict) and k in d PRIMITIVES["assoc"] = lambda d, *kvs: _sx_assoc(d, *kvs) PRIMITIVES["dissoc"] = lambda d, *ks: {k: v for k, v in d.items() if k not in ks} PRIMITIVES["into"] = lambda target, coll: (_b_list(coll) if isinstance(target, _b_list) else {p[0]: p[1] for p in coll if isinstance(p, _b_list) and _b_len(p) >= 2}) PRIMITIVES["zip"] = lambda *colls: [_b_list(t) for t in _b_zip(*colls)] def _sx_merge_dicts(*args): out = {} for d in args: if d and d is not NIL and isinstance(d, _b_dict): out.update(d) return out def _sx_assoc(d, *kvs): out = _b_dict(d) if d and d is not NIL else {} for i in _b_range(0, _b_len(kvs) - 1, 2): out[kvs[i]] = kvs[i + 1] return out ''', "stdlib.format": ''' # stdlib.format PRIMITIVES["format-decimal"] = lambda v, p=2: f"{float(v):.{p}f}" PRIMITIVES["parse-int"] = lambda v, d=0: _sx_parse_int(v, d) PRIMITIVES["parse-datetime"] = lambda s: str(s) if s else NIL def _sx_parse_int(v, default=0): if v is None or v is NIL: return default s = str(v).strip() # Match JS parseInt: extract leading integer portion import re as _re m = _re.match(r'^[+-]?\\d+', s) if m: return _b_int(m.group()) return default ''', "stdlib.text": ''' # stdlib.text PRIMITIVES["pluralize"] = lambda n, s="", p="s": s if n == 1 else p PRIMITIVES["escape"] = escape_html PRIMITIVES["strip-tags"] = lambda s: _strip_tags(str(s)) import re as _re def _strip_tags(s): return _re.sub(r"<[^>]+>", "", s) ''', "stdlib.style": ''' # stdlib.style — stubs (CSSX needs full runtime) ''', "stdlib.debug": ''' # stdlib.debug PRIMITIVES["assert"] = lambda cond, msg="Assertion failed": (_ for _ in ()).throw(RuntimeError(f"Assertion error: {msg}")) if not sx_truthy(cond) else True ''', "stdlib.spread": ''' # stdlib.spread — spread + collect + scope primitives PRIMITIVES["make-spread"] = make_spread PRIMITIVES["spread?"] = is_spread PRIMITIVES["spread-attrs"] = spread_attrs PRIMITIVES["collect!"] = sx_collect PRIMITIVES["collected"] = sx_collected PRIMITIVES["clear-collected!"] = sx_clear_collected # scope — unified render-time dynamic scope PRIMITIVES["scope-push!"] = scope_push PRIMITIVES["scope-pop!"] = scope_pop # provide-push!/provide-pop! — aliases for scope-push!/scope-pop! PRIMITIVES["provide-push!"] = provide_push PRIMITIVES["provide-pop!"] = provide_pop PRIMITIVES["context"] = sx_context PRIMITIVES["emit!"] = sx_emit PRIMITIVES["emitted"] = sx_emitted ''', } _ALL_PY_MODULES = list(PRIMITIVES_PY_MODULES.keys()) def _assemble_primitives_py(modules: list[str] | None = None) -> str: """Assemble Python primitive code from selected modules.""" if modules is None: modules = _ALL_PY_MODULES parts = [] for mod in modules: if mod in PRIMITIVES_PY_MODULES: parts.append(PRIMITIVES_PY_MODULES[mod]) return "\n".join(parts) # --------------------------------------------------------------------------- # Primitives pre/post — builtin aliases and HO helpers # --------------------------------------------------------------------------- PRIMITIVES_PY_PRE = ''' # ========================================================================= # Primitives # ========================================================================= # Save builtins before shadowing _b_len = len _b_map = map _b_filter = filter _b_range = range _b_list = list _b_dict = dict _b_max = max _b_min = min _b_round = round _b_abs = abs _b_sum = sum _b_zip = zip _b_int = int PRIMITIVES = {} ''' PRIMITIVES_PY_POST = ''' def is_primitive(name): if name in PRIMITIVES: return True from shared.sx.primitives import get_primitive as _ext_get return _ext_get(name) is not None def get_primitive(name): p = PRIMITIVES.get(name) if p is not None: return p from shared.sx.primitives import get_primitive as _ext_get return _ext_get(name) # Higher-order helpers used by transpiled code def map(fn, coll): return [fn(x) for x in coll] def map_indexed(fn, coll): return [fn(i, item) for i, item in enumerate(coll)] def filter(fn, coll): return [x for x in coll if sx_truthy(fn(x))] def reduce(fn, init, coll): acc = init for item in coll: acc = fn(acc, item) return acc def some(fn, coll): for item in coll: r = fn(item) if sx_truthy(r): return r return NIL def every_p(fn, coll): for item in coll: if not sx_truthy(fn(item)): return False return True def for_each(fn, coll): for item in coll: fn(item) return NIL def for_each_indexed(fn, coll): for i, item in enumerate(coll): fn(i, item) return NIL def map_dict(fn, d): return {k: fn(k, v) for k, v in d.items()} # Dynamic wind support (used by sf-dynamic-wind in eval.sx) _wind_stack = [] def push_wind_b(before, after): _wind_stack.append((before, after)) return NIL def pop_wind_b(): if _wind_stack: _wind_stack.pop() return NIL def call_thunk(f, env): """Call a zero-arg function/lambda.""" if is_callable(f) and not is_lambda(f): return f() if is_lambda(f): return trampoline(call_lambda(f, [], env)) return trampoline(eval_expr([f], env)) def dynamic_wind_call(before, body, after, env): """Execute dynamic-wind with try/finally for error safety.""" call_thunk(before, env) push_wind_b(before, after) try: result = call_thunk(body, env) finally: pop_wind_b() call_thunk(after, env) return result # Aliases used directly by transpiled code first = PRIMITIVES["first"] last = PRIMITIVES["last"] rest = PRIMITIVES["rest"] nth = PRIMITIVES["nth"] len = PRIMITIVES["len"] is_nil = PRIMITIVES["nil?"] empty_p = PRIMITIVES["empty?"] contains_p = PRIMITIVES["contains?"] starts_with_p = PRIMITIVES["starts-with?"] ends_with_p = PRIMITIVES["ends-with?"] slice = PRIMITIVES["slice"] get = PRIMITIVES["get"] append = PRIMITIVES["append"] cons = PRIMITIVES["cons"] keys = PRIMITIVES["keys"] join = PRIMITIVES["join"] range = PRIMITIVES["range"] apply = lambda f, args: f(*args) assoc = PRIMITIVES["assoc"] concat = PRIMITIVES["concat"] split = PRIMITIVES["split"] length = PRIMITIVES["len"] merge = PRIMITIVES["merge"] trim = PRIMITIVES["trim"] replace = PRIMITIVES["replace"] parse_int = PRIMITIVES["parse-int"] upper = PRIMITIVES["upper"] has_key_p = PRIMITIVES["has-key?"] dict_p = PRIMITIVES["dict?"] boolean_p = PRIMITIVES["boolean?"] symbol_p = PRIMITIVES["symbol?"] keyword_p = PRIMITIVES["keyword?"] number_p = PRIMITIVES["number?"] string_p = PRIMITIVES["string?"] list_p = PRIMITIVES["list?"] dissoc = PRIMITIVES["dissoc"] PRIMITIVES["char-code-at"] = lambda s, i: ord(s[int(i)]) if 0 <= int(i) < len(s) else 0 PRIMITIVES["to-hex"] = lambda n: hex(int(n) & 0xFFFFFFFF)[2:] char_code_at = PRIMITIVES["char-code-at"] to_hex = PRIMITIVES["to-hex"] index_of = PRIMITIVES["index-of"] lower = PRIMITIVES["lower"] char_from_code = PRIMITIVES["char-from-code"] ''' # --------------------------------------------------------------------------- # Platform: parser module — character classification, number parsing, # reader macro registry # --------------------------------------------------------------------------- PLATFORM_PARSER_PY = ''' # ========================================================================= # Platform interface — Parser # ========================================================================= import re as _re_parser _IDENT_START_RE = _re_parser.compile(r"[a-zA-Z_~*+\\-><=/!?&]") _IDENT_CHAR_RE = _re_parser.compile(r"[a-zA-Z0-9_~*+\\-><=/!?.:&/#,]") def ident_start_p(ch): return bool(_IDENT_START_RE.match(ch)) def ident_char_p(ch): return bool(_IDENT_CHAR_RE.match(ch)) def parse_number(s): """Parse a numeric string to int or float.""" try: if "." in s or "e" in s or "E" in s: return float(s) return int(s) except (ValueError, TypeError): return float(s) # Reader macro registry _reader_macros = {} def reader_macro_get(name): return _reader_macros.get(name, NIL) def reader_macro_set_b(name, handler): _reader_macros[name] = handler return NIL ''' # --------------------------------------------------------------------------- # Platform: deps module — component dependency analysis # --------------------------------------------------------------------------- PLATFORM_DEPS_PY = ( '\n' '# =========================================================================\n' '# Platform: deps module — component dependency analysis\n' '# =========================================================================\n' '\n' 'import re as _re\n' '\n' 'def component_deps(c):\n' ' """Return cached deps list for a component (may be empty)."""\n' ' return list(c.deps) if hasattr(c, "deps") and c.deps else []\n' '\n' 'def component_set_deps(c, deps):\n' ' """Cache deps on a component."""\n' ' c.deps = set(deps) if not isinstance(deps, set) else deps\n' '\n' 'def component_css_classes(c):\n' ' """Return pre-scanned CSS class list for a component."""\n' ' return list(c.css_classes) if hasattr(c, "css_classes") and c.css_classes else []\n' '\n' 'def env_components(env):\n' ' """Placeholder — overridden by transpiled version from deps.sx."""\n' ' return [k for k, v in env.items()\n' ' if isinstance(v, (Component, Macro))]\n' '\n' 'def regex_find_all(pattern, source):\n' ' """Return list of capture group 1 matches."""\n' ' return [m.group(1) for m in _re.finditer(pattern, source)]\n' '\n' 'def scan_css_classes(source):\n' ' """Extract CSS class strings from SX source."""\n' ' classes = set()\n' ' for m in _re.finditer(r\':class\\s+"([^"]*)"\', source):\n' ' classes.update(m.group(1).split())\n' ' for m in _re.finditer(r\':class\\s+\\(str\\s+((?:"[^"]*"\\s*)+)\\)\', source):\n' ' for s in _re.findall(r\'"([^"]*)"\', m.group(1)):\n' ' classes.update(s.split())\n' ' for m in _re.finditer(r\';;\\s*@css\\s+(.+)\', source):\n' ' classes.update(m.group(1).split())\n' ' return list(classes)\n' '\n' 'def component_io_refs(c):\n' ' """Return cached IO refs list, or NIL if not yet computed."""\n' ' if not hasattr(c, "io_refs") or c.io_refs is None:\n' ' return NIL\n' ' return list(c.io_refs)\n' '\n' 'def component_set_io_refs(c, refs):\n' ' """Cache IO refs on a component."""\n' ' c.io_refs = set(refs) if not isinstance(refs, set) else refs\n' ) # --------------------------------------------------------------------------- # Platform: CEK module — explicit CEK machine support # --------------------------------------------------------------------------- PLATFORM_CEK_PY = ''' # ========================================================================= # Platform: CEK module — explicit CEK machine # ========================================================================= # Standalone aliases for primitives used by cek.sx / frames.sx inc = PRIMITIVES["inc"] dec = PRIMITIVES["dec"] zip_pairs = PRIMITIVES["zip-pairs"] continuation_p = PRIMITIVES["continuation?"] def make_cek_continuation(captured, rest_kont): """Create a Continuation storing captured CEK frames as data.""" c = Continuation(lambda v=NIL: v) c._cek_data = {"captured": captured, "rest-kont": rest_kont} return c def continuation_data(c): """Return the _cek_data dict from a CEK continuation.""" return getattr(c, '_cek_data', {}) or {} ''' # Iterative override for cek_run — replaces transpiled recursive version CEK_FIXUPS_PY = ''' # Override recursive cek_run with iterative loop (avoids Python stack overflow) def cek_run(state): """Drive CEK machine to completion (iterative).""" while not cek_terminal_p(state): state = cek_step(state) return cek_value(state) # CEK is the canonical evaluator — override eval_expr to use it. # The tree-walk evaluator (eval_expr from eval.sx) is superseded. _tree_walk_eval_expr = eval_expr def eval_expr(expr, env): """Evaluate expr using the CEK machine.""" return cek_run(make_cek_state(expr, env, [])) # CEK never produces thunks — trampoline becomes identity _tree_walk_trampoline = trampoline def trampoline(val): """In CEK mode, values are immediate — resolve any legacy thunks.""" if is_thunk(val): return eval_expr(thunk_expr(val), thunk_env(val)) return val ''' # --------------------------------------------------------------------------- # Platform: async adapter — async evaluation, I/O dispatch # --------------------------------------------------------------------------- PLATFORM_ASYNC_PY = ''' # ========================================================================= # Platform interface -- Async adapter # ========================================================================= import contextvars import inspect as _inspect from shared.sx.primitives_io import ( IO_PRIMITIVES, RequestContext, execute_io, ) # Lazy imports to avoid circular dependency (html.py imports sx_ref.py) _css_class_collector_cv = None _svg_context_cv = None def _ensure_html_imports(): global _css_class_collector_cv, _svg_context_cv if _css_class_collector_cv is None: from shared.sx.html import css_class_collector, _svg_context _css_class_collector_cv = css_class_collector _svg_context_cv = _svg_context # When True, async_aser expands known components server-side _expand_components_cv: contextvars.ContextVar[bool] = contextvars.ContextVar( "_expand_components_ref", default=False ) class _AsyncThunk: __slots__ = ("expr", "env", "ctx") def __init__(self, expr, env, ctx): self.expr = expr self.env = env self.ctx = ctx def io_primitive_p(name): return name in IO_PRIMITIVES def expand_components_p(): return _expand_components_cv.get() def svg_context_p(): _ensure_html_imports() return _svg_context_cv.get(False) def svg_context_set(val): _ensure_html_imports() return _svg_context_cv.set(val) def svg_context_reset(token): _ensure_html_imports() _svg_context_cv.reset(token) def css_class_collect(val): _ensure_html_imports() collector = _css_class_collector_cv.get(None) if collector is not None: collector.update(str(val).split()) def is_raw_html(x): return isinstance(x, _RawHTML) def make_sx_expr(s): return SxExpr(s) def is_sx_expr(x): return isinstance(x, SxExpr) # Predicate helpers used by adapter-async (these are in PRIMITIVES but # the bootstrapped code calls them as plain functions) def string_p(x): return isinstance(x, str) def list_p(x): return isinstance(x, _b_list) def number_p(x): return isinstance(x, (int, float)) and not isinstance(x, bool) def is_async_coroutine(x): return _inspect.iscoroutine(x) async def async_await(x): return await x async def _async_trampoline(val): while isinstance(val, _AsyncThunk): val = await async_eval(val.expr, val.env, val.ctx) return val async def async_eval(expr, env, ctx=None): """Evaluate with I/O primitives. Entry point for async evaluation.""" if ctx is None: ctx = RequestContext() result = await _async_eval_inner(expr, env, ctx) while isinstance(result, _AsyncThunk): result = await _async_eval_inner(result.expr, result.env, result.ctx) return result async def _async_eval_inner(expr, env, ctx): """Intercept I/O primitives, delegate everything else to sync eval.""" if isinstance(expr, list) and expr: head = expr[0] if isinstance(head, Symbol) and head.name in IO_PRIMITIVES: args_list, kwargs = await _parse_io_args(expr[1:], env, ctx) return await execute_io(head.name, args_list, kwargs, ctx) is_render = isinstance(expr, list) and is_render_expr(expr) result = eval_expr(expr, env) result = trampoline(result) if is_render and isinstance(result, str): return _RawHTML(result) return result async def _parse_io_args(exprs, env, ctx): """Parse and evaluate I/O node args (keyword + positional).""" from shared.sx.types import Keyword as _Kw args_list = [] kwargs = {} i = 0 while i < len(exprs): item = exprs[i] if isinstance(item, _Kw) and i + 1 < len(exprs): kwargs[item.name] = await async_eval(exprs[i + 1], env, ctx) i += 2 else: args_list.append(await async_eval(item, env, ctx)) i += 1 return args_list, kwargs async def async_eval_to_sx(expr, env, ctx=None): """Evaluate and produce SX source string (wire format).""" if ctx is None: ctx = RequestContext() result = await async_aser(expr, env, ctx) if isinstance(result, SxExpr): return result if result is None or result is NIL: return SxExpr("") if isinstance(result, str): return SxExpr(result) return SxExpr(sx_serialize(result)) async def async_eval_slot_to_sx(expr, env, ctx=None): """Like async_eval_to_sx but expands component calls server-side.""" if ctx is None: ctx = RequestContext() token = _expand_components_cv.set(True) try: result = await async_eval_slot_inner(expr, env, ctx) if isinstance(result, SxExpr): return result if result is None or result is NIL: return SxExpr("") if isinstance(result, str): return SxExpr(result) return SxExpr(sx_serialize(result)) finally: _expand_components_cv.reset(token) ''' # --------------------------------------------------------------------------- # Fixups — wire up render adapter dispatch # --------------------------------------------------------------------------- FIXUPS_PY = ''' # ========================================================================= # Fixups -- wire up render adapter dispatch # ========================================================================= def _setup_html_adapter(): global _render_expr_fn _render_expr_fn = lambda expr, env: render_list_to_html(expr, env) def _setup_sx_adapter(): global _render_expr_fn _render_expr_fn = lambda expr, env: aser_list(expr, env) # Wrap aser_call and aser_fragment to return SxExpr # so serialize() won't double-quote them _orig_aser_call = None _orig_aser_fragment = None def _wrap_aser_outputs(): global aser_call, aser_fragment, _orig_aser_call, _orig_aser_fragment _orig_aser_call = aser_call _orig_aser_fragment = aser_fragment def _aser_call_wrapped(name, args, env): result = _orig_aser_call(name, args, env) return SxExpr(result) if isinstance(result, str) else result def _aser_fragment_wrapped(children, env): result = _orig_aser_fragment(children, env) return SxExpr(result) if isinstance(result, str) else result aser_call = _aser_call_wrapped aser_fragment = _aser_fragment_wrapped # Override sf_set_bang to walk the Env scope chain so that (set! var val) # updates the variable in its defining scope, not just the local copy. def sf_set_bang(args, env): name = symbol_name(first(args)) value = trampoline(eval_expr(nth(args, 1), env)) env = _ensure_env(env) try: env.set(name, value) except KeyError: # Not found in chain — define locally (matches prior behavior) env[name] = value return value ''' # --------------------------------------------------------------------------- # Extensions: delimited continuations # --------------------------------------------------------------------------- CONTINUATIONS_PY = ''' # ========================================================================= # Extension: delimited continuations (shift/reset) # ========================================================================= _RESET_RESUME = [] # stack of resume values; empty = not resuming # Extend the transpiled form name lists with continuation forms if isinstance(SPECIAL_FORM_NAMES, list): SPECIAL_FORM_NAMES.extend(["reset", "shift"]) else: _SPECIAL_FORM_NAMES = _SPECIAL_FORM_NAMES | frozenset(["reset", "shift"]) def sf_reset(args, env): """(reset body) -- establish a continuation delimiter.""" body = first(args) try: return trampoline(eval_expr(body, env)) except _ShiftSignal as sig: def cont_fn(value=NIL): _RESET_RESUME.append(value) try: return trampoline(eval_expr(body, env)) finally: _RESET_RESUME.pop() k = Continuation(cont_fn) sig_env = dict(sig.env) sig_env[sig.k_name] = k return trampoline(eval_expr(sig.body, sig_env)) def sf_shift(args, env): """(shift k body) -- capture continuation to nearest reset.""" if _RESET_RESUME: return _RESET_RESUME[-1] k_name = symbol_name(first(args)) body = nth(args, 1) raise _ShiftSignal(k_name, body, env) # Wrap eval_list to inject shift/reset dispatch _base_eval_list = eval_list def _eval_list_with_continuations(expr, env): head = first(expr) if type_of(head) == "symbol": name = symbol_name(head) args = rest(expr) if name == "reset": return sf_reset(args, env) if name == "shift": return sf_shift(args, env) return _base_eval_list(expr, env) eval_list = _eval_list_with_continuations # Inject into aser_special _base_aser_special = aser_special def _aser_special_with_continuations(name, expr, env): if name == "reset": return sf_reset(expr[1:], env) if name == "shift": return sf_shift(expr[1:], env) return _base_aser_special(name, expr, env) aser_special = _aser_special_with_continuations ''' # --------------------------------------------------------------------------- # Public API generator # --------------------------------------------------------------------------- def public_api_py(has_html: bool, has_sx: bool, has_deps: bool = False, has_async: bool = False) -> str: lines = [ '', '# =========================================================================', '# Public API', '# =========================================================================', '', ] if has_sx: lines.append('# Wrap aser outputs to return SxExpr') lines.append('_wrap_aser_outputs()') lines.append('') if has_html: lines.append('# Set HTML as default adapter') lines.append('_setup_html_adapter()') lines.append('') lines.extend([ 'def evaluate(expr, env=None):', ' """Evaluate expr in env and return the result."""', ' if env is None:', ' env = _Env()', ' elif isinstance(env, dict):', ' env = _Env(env)', ' result = eval_expr(expr, env)', ' while is_thunk(result):', ' result = eval_expr(thunk_expr(result), thunk_env(result))', ' return result', '', '', 'def render(expr, env=None):', ' """Render expr to HTML string."""', ' global _render_mode', ' if env is None:', ' env = {}', ' try:', ' _render_mode = True', ' return render_to_html(expr, env)', ' finally:', ' _render_mode = False', '', '', 'def make_env(**kwargs):', ' """Create an environment with initial bindings."""', ' return _Env(dict(kwargs))', '', '', 'def populate_effect_annotations(env, effect_map=None):', ' """Populate *effect-annotations* in env from boundary declarations.', '', ' If effect_map is provided, use it directly (dict of name -> effects list).', ' Otherwise, parse boundary.sx via boundary_parser.', ' """', ' if effect_map is None:', ' from shared.sx.ref.boundary_parser import parse_boundary_effects', ' effect_map = parse_boundary_effects()', ' anns = env.get("*effect-annotations*", {})', ' if not isinstance(anns, dict):', ' anns = {}', ' anns.update(effect_map)', ' env["*effect-annotations*"] = anns', ' return anns', '', '', 'def check_component_effects(env, comp_name=None):', ' """Check effect violations for components in env.', '', ' If comp_name is given, check only that component.', ' Returns list of diagnostic dicts (warnings, not errors).', ' """', ' anns = env.get("*effect-annotations*")', ' if not anns:', ' return []', ' diagnostics = []', ' names = [comp_name] if comp_name else [k for k in env if isinstance(k, str) and k.startswith("~")]', ' for name in names:', ' val = env.get(name)', ' if val is not None and type_of(val) == "component":', ' comp_effects = anns.get(name)', ' if comp_effects is None:', ' continue # unannotated — skip', ' body = val.body if hasattr(val, "body") else None', ' if body is None:', ' continue', ' _walk_effects(body, name, comp_effects, anns, diagnostics)', ' return diagnostics', '', '', 'def _walk_effects(node, comp_name, caller_effects, anns, diagnostics):', ' """Walk AST node and check effect calls."""', ' if not isinstance(node, list) or not node:', ' return', ' head = node[0]', ' if isinstance(head, Symbol):', ' callee = head.name', ' callee_effects = anns.get(callee)', ' if callee_effects is not None and caller_effects is not None:', ' for e in callee_effects:', ' if e not in caller_effects:', ' diagnostics.append({', ' "level": "warning",', ' "message": f"`{callee}` has effects {callee_effects} but `{comp_name}` only allows {caller_effects or \'[pure]\'}",', ' "component": comp_name,', ' })', ' break', ' for child in node[1:]:', ' _walk_effects(child, comp_name, caller_effects, anns, diagnostics)', ]) return '\n'.join(lines) # --------------------------------------------------------------------------- # Build config — which .sx files to transpile and in what order # --------------------------------------------------------------------------- ADAPTER_FILES = { "parser": ("parser.sx", "parser"), "html": ("adapter-html.sx", "adapter-html"), "sx": ("adapter-sx.sx", "adapter-sx"), "async": ("adapter-async.sx", "adapter-async"), } SPEC_MODULES = { "deps": ("deps.sx", "deps (component dependency analysis)"), "router": ("router.sx", "router (client-side route matching)"), "engine": ("engine.sx", "engine (fetch/swap/trigger pure logic)"), "signals": ("signals.sx", "signals (reactive signal runtime)"), "page-helpers": ("page-helpers.sx", "page-helpers (pure data transformation helpers)"), "types": ("types.sx", "types (gradual type system)"), } # Note: frames and cek are now part of evaluator.sx (always loaded as core) # Explicit ordering for spec modules with dependencies. SPEC_MODULE_ORDER = [ "deps", "engine", "page-helpers", "router", "signals", "types", ] EXTENSION_NAMES = {"continuations"} EXTENSION_FORMS = { "continuations": {"reset", "shift"}, }