Files
rose-ash/shared/sx/ref/platform_py.py
giles ea2b71cfa3 Add provide/context/emit!/emitted — render-time dynamic scope
Four new primitives for scoped downward value passing and upward
accumulation through the render tree. Specced in .sx, bootstrapped
to Python and JS across all adapters (eval, html, sx, dom, async).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 02:58:21 +00:00

1566 lines
44 KiB
Python

"""
Platform sections for Python SX bootstrapper.
These are static Python code strings that form the runtime infrastructure
for the bootstrapped sx_ref.py module. They are NOT generated from .sx files —
they provide the host platform interface that the transpiled SX spec code
relies on.
Both G0 (bootstrap_py.py) and G1 (run_py_sx.py) import from here.
"""
from __future__ import annotations
from shared.sx.types import NIL as SX_NIL, Symbol # noqa: F401 — re-export for consumers
from shared.sx.parser import parse_all as _parse_all
def extract_defines(source: str) -> list[tuple[str, list]]:
"""Parse .sx source, return list of (name, define-expr) for top-level defines.
Recognizes both (define ...) and (define-async ...) forms.
"""
exprs = _parse_all(source)
defines = []
for expr in exprs:
if isinstance(expr, list) and expr and isinstance(expr[0], Symbol):
if expr[0].name in ("define", "define-async"):
name = expr[1].name if isinstance(expr[1], Symbol) else str(expr[1])
defines.append((name, expr))
return defines
# ---------------------------------------------------------------------------
# Preamble — file header, imports, type imports
# ---------------------------------------------------------------------------
PREAMBLE = '''\
"""
sx_ref.py -- Generated from reference SX evaluator specification.
Bootstrap-compiled from shared/sx/ref/{eval,render,adapter-html,adapter-sx}.sx
Compare against hand-written evaluator.py / html.py for correctness verification.
DO NOT EDIT -- regenerate with: python run_py_sx.py
"""
from __future__ import annotations
import math
from typing import Any
# =========================================================================
# Types (reuse existing types)
# =========================================================================
from shared.sx.types import (
NIL, Symbol, Keyword, Lambda, Component, Island, Continuation, Macro,
HandlerDef, QueryDef, ActionDef, PageDef, _ShiftSignal,
)
from shared.sx.parser import SxExpr
from shared.sx.env import Env as _Env, MergedEnv as _MergedEnv
'''
# ---------------------------------------------------------------------------
# Platform interface — core Python runtime
# ---------------------------------------------------------------------------
PLATFORM_PY = '''
# =========================================================================
# Platform interface -- Python implementation
# =========================================================================
class _Thunk:
"""Deferred evaluation for TCO."""
__slots__ = ("expr", "env")
def __init__(self, expr, env):
self.expr = expr
self.env = env
class _RawHTML:
"""Marker for pre-rendered HTML that should not be escaped."""
__slots__ = ("html",)
def __init__(self, html: str):
self.html = html
class _Spread:
"""Attribute injection value — merges attrs onto parent element."""
__slots__ = ("attrs",)
def __init__(self, attrs: dict):
self.attrs = dict(attrs) if attrs else {}
# Render-time accumulator buckets (per render pass)
_collect_buckets: dict[str, list] = {}
def _collect_reset():
"""Reset all collect buckets (call at start of each render pass)."""
global _collect_buckets
_collect_buckets = {}
# Render-time dynamic scope stacks (provide/context/emit!)
_provide_stacks: dict[str, list[dict]] = {}
def provide_push(name, value=None):
"""Push a provider scope with name, value, and empty emitted list."""
_provide_stacks.setdefault(name, []).append({"value": value, "emitted": []})
def provide_pop(name):
"""Pop the most recent provider scope for name."""
if name in _provide_stacks and _provide_stacks[name]:
_provide_stacks[name].pop()
def sx_context(name, *default):
"""Read value from nearest enclosing provider. Error if no provider and no default."""
if name in _provide_stacks and _provide_stacks[name]:
return _provide_stacks[name][-1]["value"]
if default:
return default[0]
raise RuntimeError(f"No provider for: {name}")
def sx_emit(name, value):
"""Append value to nearest enclosing provider's accumulator. Error if no provider."""
if name in _provide_stacks and _provide_stacks[name]:
_provide_stacks[name][-1]["emitted"].append(value)
else:
raise RuntimeError(f"No provider for emit!: {name}")
return NIL
def sx_emitted(name):
"""Return list of values emitted into nearest matching provider."""
if name in _provide_stacks and _provide_stacks[name]:
return list(_provide_stacks[name][-1]["emitted"])
return []
def sx_truthy(x):
"""SX truthiness: everything is truthy except False, None, and NIL."""
if x is False:
return False
if x is None or x is NIL:
return False
return True
def sx_str(*args):
"""SX str: concatenate string representations, skipping nil."""
parts = []
for a in args:
if a is None or a is NIL:
continue
parts.append(str(a))
return "".join(parts)
def sx_and(*args):
"""SX and: return last truthy value or first falsy."""
result = True
for a in args:
if not sx_truthy(a):
return a
result = a
return result
def sx_or(*args):
"""SX or: return first truthy value or last value."""
for a in args:
if sx_truthy(a):
return a
return args[-1] if args else False
def _sx_begin(*args):
"""Evaluate all args (for side effects), return last."""
return args[-1] if args else NIL
def _sx_case(match_val, pairs):
"""Case dispatch: pairs is [(test_val, body_fn), ...]. None test = else."""
for test, body_fn in pairs:
if test is None: # :else clause
return body_fn()
if match_val == test:
return body_fn()
return NIL
def _sx_fn(f):
"""Identity wrapper for multi-expression lambda bodies."""
return f
def type_of(x):
if x is None or x is NIL:
return "nil"
if isinstance(x, bool):
return "boolean"
if isinstance(x, (int, float)):
return "number"
if isinstance(x, SxExpr):
return "sx-expr"
if isinstance(x, str):
return "string"
if isinstance(x, Symbol):
return "symbol"
if isinstance(x, Keyword):
return "keyword"
if isinstance(x, _Thunk):
return "thunk"
if isinstance(x, Lambda):
return "lambda"
if isinstance(x, Component):
return "component"
if isinstance(x, Island):
return "island"
if isinstance(x, _Signal):
return "signal"
if isinstance(x, _Spread):
return "spread"
if isinstance(x, Macro):
return "macro"
if isinstance(x, _RawHTML):
return "raw-html"
if isinstance(x, Continuation):
return "continuation"
if isinstance(x, list):
return "list"
if isinstance(x, dict):
return "dict"
return "unknown"
def symbol_name(s):
return s.name
def keyword_name(k):
return k.name
def make_symbol(n):
return Symbol(n)
def make_keyword(n):
return Keyword(n)
def _ensure_env(env):
"""Wrap plain dict in Env if needed."""
if isinstance(env, _Env):
return env
return _Env(env if isinstance(env, dict) else {})
def make_lambda(params, body, env):
return Lambda(params=list(params), body=body, closure=_ensure_env(env))
def make_component(name, params, has_children, body, env, affinity="auto"):
return Component(name=name, params=list(params), has_children=has_children,
body=body, closure=dict(env), affinity=str(affinity) if affinity else "auto")
def make_island(name, params, has_children, body, env):
return Island(name=name, params=list(params), has_children=has_children,
body=body, closure=dict(env))
def make_macro(params, rest_param, body, env, name=None):
return Macro(params=list(params), rest_param=rest_param, body=body,
closure=dict(env), name=name)
def make_handler_def(name, params, body, env, opts=None):
path = opts.get('path') if opts else None
method = str(opts.get('method', 'get')) if opts else 'get'
csrf = opts.get('csrf', True) if opts else True
returns = str(opts.get('returns', 'element')) if opts else 'element'
if isinstance(csrf, str):
csrf = csrf.lower() not in ('false', 'nil', 'no')
return HandlerDef(name=name, params=list(params), body=body, closure=dict(env),
path=path, method=method.lower(), csrf=csrf, returns=returns)
def make_query_def(name, params, doc, body, env):
return QueryDef(name=name, params=list(params), doc=doc, body=body, closure=dict(env))
def make_action_def(name, params, doc, body, env):
return ActionDef(name=name, params=list(params), doc=doc, body=body, closure=dict(env))
def make_page_def(name, slots, env):
path = slots.get("path", "")
auth_val = slots.get("auth", "public")
if isinstance(auth_val, Keyword):
auth = auth_val.name
elif isinstance(auth_val, list):
auth = [item.name if isinstance(item, Keyword) else str(item) for item in auth_val]
else:
auth = str(auth_val) if auth_val else "public"
layout = slots.get("layout")
if isinstance(layout, Keyword):
layout = layout.name
cache = None
stream_val = slots.get("stream")
stream = bool(trampoline(eval_expr(stream_val, env))) if stream_val is not None else False
return PageDef(
name=name, path=path, auth=auth, layout=layout, cache=cache,
data_expr=slots.get("data"), content_expr=slots.get("content"),
filter_expr=slots.get("filter"), aside_expr=slots.get("aside"),
menu_expr=slots.get("menu"), stream=stream,
fallback_expr=slots.get("fallback"), shell_expr=slots.get("shell"),
closure=dict(env),
)
def make_thunk(expr, env):
return _Thunk(expr, env)
def make_spread(attrs):
return _Spread(attrs if isinstance(attrs, dict) else {})
def is_spread(x):
return isinstance(x, _Spread)
def spread_attrs(s):
return s.attrs if isinstance(s, _Spread) else {}
def sx_collect(bucket, value):
"""Add value to named render-time accumulator (deduplicated)."""
if bucket not in _collect_buckets:
_collect_buckets[bucket] = []
items = _collect_buckets[bucket]
if value not in items:
items.append(value)
def sx_collected(bucket):
"""Return all values in named render-time accumulator."""
return list(_collect_buckets.get(bucket, []))
def sx_clear_collected(bucket):
"""Clear a named render-time accumulator bucket."""
if bucket in _collect_buckets:
_collect_buckets[bucket] = []
def lambda_params(f):
return f.params
def lambda_body(f):
return f.body
def lambda_closure(f):
return f.closure
def lambda_name(f):
return f.name
def set_lambda_name(f, n):
f.name = n
def component_params(c):
return c.params
def component_body(c):
return c.body
def component_closure(c):
return c.closure
def component_has_children(c):
return c.has_children
def component_name(c):
return c.name
def component_affinity(c):
return getattr(c, 'affinity', 'auto')
def component_param_types(c):
return getattr(c, 'param_types', None)
def component_set_param_types(c, d):
c.param_types = d
def macro_params(m):
return m.params
def macro_rest_param(m):
return m.rest_param
def macro_body(m):
return m.body
def macro_closure(m):
return m.closure
def is_thunk(x):
return isinstance(x, _Thunk)
def thunk_expr(t):
return t.expr
def thunk_env(t):
return t.env
def is_callable(x):
return callable(x) or isinstance(x, Lambda)
def is_lambda(x):
return isinstance(x, Lambda)
def is_component(x):
return isinstance(x, Component)
def is_macro(x):
return isinstance(x, Macro)
def is_island(x):
return isinstance(x, Island)
def is_identical(a, b):
return a is b
# -------------------------------------------------------------------------
# Signal platform -- reactive state primitives
# -------------------------------------------------------------------------
class _Signal:
"""Reactive signal container."""
__slots__ = ("value", "subscribers", "deps")
def __init__(self, value):
self.value = value
self.subscribers = []
self.deps = []
class _TrackingContext:
"""Context for discovering signal dependencies."""
__slots__ = ("notify_fn", "deps")
def __init__(self, notify_fn):
self.notify_fn = notify_fn
self.deps = []
_tracking_context = None
def make_signal(value):
return _Signal(value)
def is_signal(x):
return isinstance(x, _Signal)
def signal_value(s):
return s.value if isinstance(s, _Signal) else s
def signal_set_value(s, v):
if isinstance(s, _Signal):
s.value = v
def signal_subscribers(s):
return list(s.subscribers) if isinstance(s, _Signal) else []
def signal_add_sub(s, fn):
if isinstance(s, _Signal) and fn not in s.subscribers:
s.subscribers.append(fn)
def signal_remove_sub(s, fn):
if isinstance(s, _Signal) and fn in s.subscribers:
s.subscribers.remove(fn)
def signal_deps(s):
return list(s.deps) if isinstance(s, _Signal) else []
def signal_set_deps(s, deps):
if isinstance(s, _Signal):
s.deps = list(deps) if isinstance(deps, list) else []
def set_tracking_context(ctx):
global _tracking_context
_tracking_context = ctx
def get_tracking_context():
global _tracking_context
return _tracking_context if _tracking_context is not None else NIL
def make_tracking_context(notify_fn):
return _TrackingContext(notify_fn)
def tracking_context_deps(ctx):
return ctx.deps if isinstance(ctx, _TrackingContext) else []
def tracking_context_add_dep(ctx, s):
if isinstance(ctx, _TrackingContext) and s not in ctx.deps:
ctx.deps.append(s)
def tracking_context_notify_fn(ctx):
return ctx.notify_fn if isinstance(ctx, _TrackingContext) else NIL
def invoke(f, *args):
"""Call f with args — handles both native callables and SX lambdas.
In Python, all transpiled lambdas are natively callable, so this is
just a direct call. The JS host needs dispatch logic here because
SX lambdas from runtime-evaluated code are objects, not functions.
"""
return f(*args)
def json_serialize(obj):
import json
return json.dumps(obj)
def is_empty_dict(d):
if not isinstance(d, dict):
return True
return len(d) == 0
# DOM event primitives — no-ops on server (browser-only).
def dom_listen(el, name, handler):
return lambda: None
def dom_dispatch(el, name, detail=None):
return False
def event_detail(e):
return None
def env_has(env, name):
return name in env
def env_get(env, name):
return env.get(name, NIL)
def env_set(env, name, val):
env[name] = val
def env_extend(env):
return _ensure_env(env).extend()
def env_merge(base, overlay):
base = _ensure_env(base)
overlay = _ensure_env(overlay)
if base is overlay:
# Same env — just extend with empty local scope for params
return base.extend()
# Check if base is an ancestor of overlay — if so, no need to merge
# (common for self-recursive calls where closure == caller's ancestor)
p = overlay
depth = 0
while p is not None and depth < 100:
if p is base:
return base.extend()
p = getattr(p, '_parent', None)
depth += 1
# MergedEnv: reads walk base then overlay; set! walks base only
return _MergedEnv({}, primary=base, secondary=overlay)
def dict_set(d, k, v):
d[k] = v
def dict_get(d, k):
v = d.get(k)
return v if v is not None else NIL
def dict_has(d, k):
return k in d
def dict_delete(d, k):
d.pop(k, None)
def is_render_expr(expr):
"""Placeholder — overridden by transpiled version from render.sx."""
return False
# Render dispatch -- set by adapter
_render_expr_fn = None
# Render mode flag -- set by render-to-html/aser, checked by eval-list
# When false, render expressions (HTML tags, components) fall through to eval-call
_render_mode = False
def render_active_p():
return _render_mode
def set_render_active_b(val):
global _render_mode
_render_mode = bool(val)
def render_expr(expr, env):
if _render_expr_fn:
return _render_expr_fn(expr, env)
# No adapter — fall through to eval_call so components still evaluate
return eval_call(first(expr), rest(expr), env)
def strip_prefix(s, prefix):
return s[len(prefix):] if s.startswith(prefix) else s
def debug_log(*args):
import sys
print(*args, file=sys.stderr)
def error(msg):
raise EvalError(msg)
def inspect(x):
return repr(x)
def escape_html(s):
s = str(s)
return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;").replace('"', "&quot;")
def escape_attr(s):
return escape_html(s)
def raw_html_content(x):
return x.html
def make_raw_html(s):
return _RawHTML(s)
def sx_expr_source(x):
return x.source if isinstance(x, SxExpr) else str(x)
try:
from shared.sx.types import EvalError
except ImportError:
class EvalError(Exception):
pass
def _sx_append(lst, item):
"""Append item to list, return the item (for expression context)."""
lst.append(item)
return item
def _sx_dict_set(d, k, v):
"""Set key in dict, return the value (for expression context)."""
d[k] = v
return v
def _sx_set_attr(obj, attr, val):
"""Set attribute on object, return the value."""
setattr(obj, attr, val)
return val
def _sx_cell_set(cells, name, val):
"""Set a mutable cell value. Returns the value."""
cells[name] = val
return val
def escape_string(s):
"""Escape a string for SX serialization."""
return (str(s)
.replace("\\\\", "\\\\\\\\")
.replace('"', '\\\\"')
.replace("\\n", "\\\\n")
.replace("\\t", "\\\\t")
.replace("</script", "<\\\\/script"))
def serialize(val):
"""Serialize an SX value to SX source text.
Note: parser.sx defines sx-serialize with a serialize alias, but parser.sx
is only included in JS builds (for client-side parsing). Python builds
provide this as a platform function.
"""
t = type_of(val)
if t == "sx-expr":
return val.source
if t == "nil":
return "nil"
if t == "boolean":
return "true" if val else "false"
if t == "number":
return str(val)
if t == "string":
return '"' + escape_string(val) + '"'
if t == "symbol":
return symbol_name(val)
if t == "keyword":
return ":" + keyword_name(val)
if t == "raw-html":
escaped = escape_string(raw_html_content(val))
return '(raw! "' + escaped + '")'
if t == "list":
if not val:
return "()"
items = [serialize(x) for x in val]
return "(" + " ".join(items) + ")"
if t == "dict":
items = []
for k, v in val.items():
items.append(":" + str(k))
items.append(serialize(v))
return "{" + " ".join(items) + "}"
if callable(val):
return "nil"
return str(val)
# Aliases for transpiled code — parser.sx defines sx-serialize/sx-serialize-dict
# but parser.sx is JS-only. Provide aliases so transpiled render.sx works.
sx_serialize = serialize
sx_serialize_dict = lambda d: serialize(d)
_SPECIAL_FORM_NAMES = frozenset() # Placeholder — overridden by transpiled adapter-sx.sx
_HO_FORM_NAMES = frozenset()
def is_special_form(name):
"""Placeholder — overridden by transpiled version from adapter-sx.sx."""
return False
def is_ho_form(name):
"""Placeholder — overridden by transpiled version from adapter-sx.sx."""
return False
def aser_special(name, expr, env):
"""Placeholder — overridden by transpiled version from adapter-sx.sx."""
return trampoline(eval_expr(expr, env))
'''
# ---------------------------------------------------------------------------
# Primitive modules — Python implementations keyed by spec module name.
# core.* modules are always included; stdlib.* are opt-in.
# ---------------------------------------------------------------------------
PRIMITIVES_PY_MODULES: dict[str, str] = {
"core.arithmetic": '''
# core.arithmetic
PRIMITIVES["+"] = lambda *args: _b_sum(args)
PRIMITIVES["-"] = lambda a, b=None: -a if b is None else a - b
PRIMITIVES["*"] = lambda *args: _sx_mul(*args)
PRIMITIVES["/"] = lambda a, b: a / b
PRIMITIVES["mod"] = lambda a, b: a % b
PRIMITIVES["inc"] = lambda n: n + 1
PRIMITIVES["dec"] = lambda n: n - 1
PRIMITIVES["abs"] = _b_abs
PRIMITIVES["floor"] = math.floor
PRIMITIVES["ceil"] = math.ceil
PRIMITIVES["round"] = _b_round
PRIMITIVES["min"] = _b_min
PRIMITIVES["max"] = _b_max
PRIMITIVES["sqrt"] = math.sqrt
PRIMITIVES["pow"] = lambda x, n: x ** n
PRIMITIVES["clamp"] = lambda x, lo, hi: _b_max(lo, _b_min(hi, x))
def _sx_mul(*args):
r = 1
for a in args:
r *= a
return r
''',
"core.comparison": '''
# core.comparison
PRIMITIVES["="] = lambda a, b: a == b
PRIMITIVES["!="] = lambda a, b: a != b
PRIMITIVES["<"] = lambda a, b: a < b
PRIMITIVES[">"] = lambda a, b: a > b
PRIMITIVES["<="] = lambda a, b: a <= b
PRIMITIVES[">="] = lambda a, b: a >= b
''',
"core.logic": '''
# core.logic
PRIMITIVES["not"] = lambda x: not sx_truthy(x)
''',
"core.predicates": '''
# core.predicates
PRIMITIVES["nil?"] = lambda x: x is None or x is NIL
PRIMITIVES["number?"] = lambda x: isinstance(x, (int, float)) and not isinstance(x, bool)
PRIMITIVES["string?"] = lambda x: isinstance(x, str)
PRIMITIVES["list?"] = lambda x: isinstance(x, _b_list)
PRIMITIVES["dict?"] = lambda x: isinstance(x, _b_dict)
PRIMITIVES["continuation?"] = lambda x: isinstance(x, Continuation)
PRIMITIVES["empty?"] = lambda c: (
c is None or c is NIL or
(isinstance(c, (_b_list, str, _b_dict)) and _b_len(c) == 0)
)
PRIMITIVES["contains?"] = lambda c, k: (
str(k) in c if isinstance(c, str) else
k in c
)
PRIMITIVES["odd?"] = lambda n: n % 2 != 0
PRIMITIVES["even?"] = lambda n: n % 2 == 0
PRIMITIVES["zero?"] = lambda n: n == 0
''',
"core.strings": '''
# core.strings
PRIMITIVES["str"] = sx_str
PRIMITIVES["upper"] = lambda s: str(s).upper()
PRIMITIVES["lower"] = lambda s: str(s).lower()
PRIMITIVES["trim"] = lambda s: str(s).strip()
PRIMITIVES["split"] = lambda s, sep=" ": str(s).split(sep)
PRIMITIVES["join"] = lambda sep, coll: sep.join(str(x) for x in coll)
PRIMITIVES["replace"] = lambda s, old, new: s.replace(old, new)
PRIMITIVES["index-of"] = lambda s, needle, start=0: str(s).find(needle, start)
PRIMITIVES["starts-with?"] = lambda s, p: str(s).startswith(p)
PRIMITIVES["ends-with?"] = lambda s, p: str(s).endswith(p)
PRIMITIVES["slice"] = lambda c, a, b=None: c[int(a):] if (b is None or b is NIL) else c[int(a):int(b)]
PRIMITIVES["concat"] = lambda *args: _b_sum((a for a in args if a), [])
''',
"core.collections": '''
# core.collections
PRIMITIVES["list"] = lambda *args: _b_list(args)
PRIMITIVES["dict"] = lambda *args: {args[i]: args[i+1] for i in _b_range(0, _b_len(args)-1, 2)}
PRIMITIVES["range"] = lambda a, b, step=1: _b_list(_b_range(_b_int(a), _b_int(b), _b_int(step)))
PRIMITIVES["get"] = lambda c, k, default=NIL: c.get(k, default) if isinstance(c, _b_dict) else (c[k] if isinstance(c, (_b_list, str)) and isinstance(k, _b_int) and 0 <= k < _b_len(c) else (c.get(k, default) if hasattr(c, 'get') else default))
PRIMITIVES["len"] = lambda c: _b_len(c) if c is not None and c is not NIL else 0
PRIMITIVES["first"] = lambda c: c[0] if c and _b_len(c) > 0 else NIL
PRIMITIVES["last"] = lambda c: c[-1] if c and _b_len(c) > 0 else NIL
PRIMITIVES["rest"] = lambda c: c[1:] if c else []
PRIMITIVES["nth"] = lambda c, n: c[n] if c and 0 <= n < _b_len(c) else NIL
PRIMITIVES["cons"] = lambda x, c: [x] + (c or [])
PRIMITIVES["append"] = lambda c, x: (c or []) + (x if isinstance(x, list) else [x])
PRIMITIVES["chunk-every"] = lambda c, n: [c[i:i+n] for i in _b_range(0, _b_len(c), n)]
PRIMITIVES["zip-pairs"] = lambda c: [[c[i], c[i+1]] for i in _b_range(_b_len(c)-1)]
''',
"core.dict": '''
# core.dict
PRIMITIVES["keys"] = lambda d: _b_list((d or {}).keys())
PRIMITIVES["vals"] = lambda d: _b_list((d or {}).values())
PRIMITIVES["merge"] = lambda *args: _sx_merge_dicts(*args)
PRIMITIVES["has-key?"] = lambda d, k: isinstance(d, _b_dict) and k in d
PRIMITIVES["assoc"] = lambda d, *kvs: _sx_assoc(d, *kvs)
PRIMITIVES["dissoc"] = lambda d, *ks: {k: v for k, v in d.items() if k not in ks}
PRIMITIVES["into"] = lambda target, coll: (_b_list(coll) if isinstance(target, _b_list) else {p[0]: p[1] for p in coll if isinstance(p, _b_list) and _b_len(p) >= 2})
PRIMITIVES["zip"] = lambda *colls: [_b_list(t) for t in _b_zip(*colls)]
def _sx_merge_dicts(*args):
out = {}
for d in args:
if d and d is not NIL and isinstance(d, _b_dict):
out.update(d)
return out
def _sx_assoc(d, *kvs):
out = _b_dict(d) if d and d is not NIL else {}
for i in _b_range(0, _b_len(kvs) - 1, 2):
out[kvs[i]] = kvs[i + 1]
return out
''',
"stdlib.format": '''
# stdlib.format
PRIMITIVES["format-decimal"] = lambda v, p=2: f"{float(v):.{p}f}"
PRIMITIVES["parse-int"] = lambda v, d=0: _sx_parse_int(v, d)
PRIMITIVES["parse-datetime"] = lambda s: str(s) if s else NIL
def _sx_parse_int(v, default=0):
if v is None or v is NIL:
return default
s = str(v).strip()
# Match JS parseInt: extract leading integer portion
import re as _re
m = _re.match(r'^[+-]?\\d+', s)
if m:
return _b_int(m.group())
return default
''',
"stdlib.text": '''
# stdlib.text
PRIMITIVES["pluralize"] = lambda n, s="", p="s": s if n == 1 else p
PRIMITIVES["escape"] = escape_html
PRIMITIVES["strip-tags"] = lambda s: _strip_tags(str(s))
import re as _re
def _strip_tags(s):
return _re.sub(r"<[^>]+>", "", s)
''',
"stdlib.style": '''
# stdlib.style — stubs (CSSX needs full runtime)
''',
"stdlib.debug": '''
# stdlib.debug
PRIMITIVES["assert"] = lambda cond, msg="Assertion failed": (_ for _ in ()).throw(RuntimeError(f"Assertion error: {msg}")) if not sx_truthy(cond) else True
''',
"stdlib.spread": '''
# stdlib.spread — spread + collect primitives
PRIMITIVES["make-spread"] = make_spread
PRIMITIVES["spread?"] = is_spread
PRIMITIVES["spread-attrs"] = spread_attrs
PRIMITIVES["collect!"] = sx_collect
PRIMITIVES["collected"] = sx_collected
PRIMITIVES["clear-collected!"] = sx_clear_collected
# provide/context/emit! — render-time dynamic scope
PRIMITIVES["provide-push!"] = provide_push
PRIMITIVES["provide-pop!"] = provide_pop
PRIMITIVES["context"] = sx_context
PRIMITIVES["emit!"] = sx_emit
PRIMITIVES["emitted"] = sx_emitted
''',
}
_ALL_PY_MODULES = list(PRIMITIVES_PY_MODULES.keys())
def _assemble_primitives_py(modules: list[str] | None = None) -> str:
"""Assemble Python primitive code from selected modules."""
if modules is None:
modules = _ALL_PY_MODULES
parts = []
for mod in modules:
if mod in PRIMITIVES_PY_MODULES:
parts.append(PRIMITIVES_PY_MODULES[mod])
return "\n".join(parts)
# ---------------------------------------------------------------------------
# Primitives pre/post — builtin aliases and HO helpers
# ---------------------------------------------------------------------------
PRIMITIVES_PY_PRE = '''
# =========================================================================
# Primitives
# =========================================================================
# Save builtins before shadowing
_b_len = len
_b_map = map
_b_filter = filter
_b_range = range
_b_list = list
_b_dict = dict
_b_max = max
_b_min = min
_b_round = round
_b_abs = abs
_b_sum = sum
_b_zip = zip
_b_int = int
PRIMITIVES = {}
'''
PRIMITIVES_PY_POST = '''
def is_primitive(name):
if name in PRIMITIVES:
return True
from shared.sx.primitives import get_primitive as _ext_get
return _ext_get(name) is not None
def get_primitive(name):
p = PRIMITIVES.get(name)
if p is not None:
return p
from shared.sx.primitives import get_primitive as _ext_get
return _ext_get(name)
# Higher-order helpers used by transpiled code
def map(fn, coll):
return [fn(x) for x in coll]
def map_indexed(fn, coll):
return [fn(i, item) for i, item in enumerate(coll)]
def filter(fn, coll):
return [x for x in coll if sx_truthy(fn(x))]
def reduce(fn, init, coll):
acc = init
for item in coll:
acc = fn(acc, item)
return acc
def some(fn, coll):
for item in coll:
r = fn(item)
if sx_truthy(r):
return r
return NIL
def every_p(fn, coll):
for item in coll:
if not sx_truthy(fn(item)):
return False
return True
def for_each(fn, coll):
for item in coll:
fn(item)
return NIL
def for_each_indexed(fn, coll):
for i, item in enumerate(coll):
fn(i, item)
return NIL
def map_dict(fn, d):
return {k: fn(k, v) for k, v in d.items()}
# Aliases used directly by transpiled code
first = PRIMITIVES["first"]
last = PRIMITIVES["last"]
rest = PRIMITIVES["rest"]
nth = PRIMITIVES["nth"]
len = PRIMITIVES["len"]
is_nil = PRIMITIVES["nil?"]
empty_p = PRIMITIVES["empty?"]
contains_p = PRIMITIVES["contains?"]
starts_with_p = PRIMITIVES["starts-with?"]
ends_with_p = PRIMITIVES["ends-with?"]
slice = PRIMITIVES["slice"]
get = PRIMITIVES["get"]
append = PRIMITIVES["append"]
cons = PRIMITIVES["cons"]
keys = PRIMITIVES["keys"]
join = PRIMITIVES["join"]
range = PRIMITIVES["range"]
apply = lambda f, args: f(*args)
assoc = PRIMITIVES["assoc"]
concat = PRIMITIVES["concat"]
split = PRIMITIVES["split"]
length = PRIMITIVES["len"]
merge = PRIMITIVES["merge"]
trim = PRIMITIVES["trim"]
replace = PRIMITIVES["replace"]
parse_int = PRIMITIVES["parse-int"]
upper = PRIMITIVES["upper"]
has_key_p = PRIMITIVES["has-key?"]
dissoc = PRIMITIVES["dissoc"]
index_of = PRIMITIVES["index-of"]
'''
# ---------------------------------------------------------------------------
# Platform: deps module — component dependency analysis
# ---------------------------------------------------------------------------
PLATFORM_DEPS_PY = (
'\n'
'# =========================================================================\n'
'# Platform: deps module — component dependency analysis\n'
'# =========================================================================\n'
'\n'
'import re as _re\n'
'\n'
'def component_deps(c):\n'
' """Return cached deps list for a component (may be empty)."""\n'
' return list(c.deps) if hasattr(c, "deps") and c.deps else []\n'
'\n'
'def component_set_deps(c, deps):\n'
' """Cache deps on a component."""\n'
' c.deps = set(deps) if not isinstance(deps, set) else deps\n'
'\n'
'def component_css_classes(c):\n'
' """Return pre-scanned CSS class list for a component."""\n'
' return list(c.css_classes) if hasattr(c, "css_classes") and c.css_classes else []\n'
'\n'
'def env_components(env):\n'
' """Placeholder — overridden by transpiled version from deps.sx."""\n'
' return [k for k, v in env.items()\n'
' if isinstance(v, (Component, Macro))]\n'
'\n'
'def regex_find_all(pattern, source):\n'
' """Return list of capture group 1 matches."""\n'
' return [m.group(1) for m in _re.finditer(pattern, source)]\n'
'\n'
'def scan_css_classes(source):\n'
' """Extract CSS class strings from SX source."""\n'
' classes = set()\n'
' for m in _re.finditer(r\':class\\s+"([^"]*)"\', source):\n'
' classes.update(m.group(1).split())\n'
' for m in _re.finditer(r\':class\\s+\\(str\\s+((?:"[^"]*"\\s*)+)\\)\', source):\n'
' for s in _re.findall(r\'"([^"]*)"\', m.group(1)):\n'
' classes.update(s.split())\n'
' for m in _re.finditer(r\';;\\s*@css\\s+(.+)\', source):\n'
' classes.update(m.group(1).split())\n'
' return list(classes)\n'
'\n'
'def component_io_refs(c):\n'
' """Return cached IO refs list, or NIL if not yet computed."""\n'
' if not hasattr(c, "io_refs") or c.io_refs is None:\n'
' return NIL\n'
' return list(c.io_refs)\n'
'\n'
'def component_set_io_refs(c, refs):\n'
' """Cache IO refs on a component."""\n'
' c.io_refs = set(refs) if not isinstance(refs, set) else refs\n'
)
# ---------------------------------------------------------------------------
# Platform: async adapter — async evaluation, I/O dispatch
# ---------------------------------------------------------------------------
PLATFORM_ASYNC_PY = '''
# =========================================================================
# Platform interface -- Async adapter
# =========================================================================
import contextvars
import inspect
from shared.sx.primitives_io import (
IO_PRIMITIVES, RequestContext, execute_io,
)
# Lazy imports to avoid circular dependency (html.py imports sx_ref.py)
_css_class_collector_cv = None
_svg_context_cv = None
def _ensure_html_imports():
global _css_class_collector_cv, _svg_context_cv
if _css_class_collector_cv is None:
from shared.sx.html import css_class_collector, _svg_context
_css_class_collector_cv = css_class_collector
_svg_context_cv = _svg_context
# When True, async_aser expands known components server-side
_expand_components_cv: contextvars.ContextVar[bool] = contextvars.ContextVar(
"_expand_components_ref", default=False
)
class _AsyncThunk:
__slots__ = ("expr", "env", "ctx")
def __init__(self, expr, env, ctx):
self.expr = expr
self.env = env
self.ctx = ctx
def io_primitive_p(name):
return name in IO_PRIMITIVES
def expand_components_p():
return _expand_components_cv.get()
def svg_context_p():
_ensure_html_imports()
return _svg_context_cv.get(False)
def svg_context_set(val):
_ensure_html_imports()
return _svg_context_cv.set(val)
def svg_context_reset(token):
_ensure_html_imports()
_svg_context_cv.reset(token)
def css_class_collect(val):
_ensure_html_imports()
collector = _css_class_collector_cv.get(None)
if collector is not None:
collector.update(str(val).split())
def is_raw_html(x):
return isinstance(x, _RawHTML)
def make_sx_expr(s):
return SxExpr(s)
def is_sx_expr(x):
return isinstance(x, SxExpr)
# Predicate helpers used by adapter-async (these are in PRIMITIVES but
# the bootstrapped code calls them as plain functions)
def string_p(x):
return isinstance(x, str)
def list_p(x):
return isinstance(x, _b_list)
def number_p(x):
return isinstance(x, (int, float)) and not isinstance(x, bool)
def sx_parse(src):
from shared.sx.parser import parse_all
return parse_all(src)
def is_async_coroutine(x):
return inspect.iscoroutine(x)
async def async_await(x):
return await x
async def _async_trampoline(val):
while isinstance(val, _AsyncThunk):
val = await async_eval(val.expr, val.env, val.ctx)
return val
async def async_eval(expr, env, ctx=None):
"""Evaluate with I/O primitives. Entry point for async evaluation."""
if ctx is None:
ctx = RequestContext()
result = await _async_eval_inner(expr, env, ctx)
while isinstance(result, _AsyncThunk):
result = await _async_eval_inner(result.expr, result.env, result.ctx)
return result
async def _async_eval_inner(expr, env, ctx):
"""Intercept I/O primitives, delegate everything else to sync eval."""
if isinstance(expr, list) and expr:
head = expr[0]
if isinstance(head, Symbol) and head.name in IO_PRIMITIVES:
args_list, kwargs = await _parse_io_args(expr[1:], env, ctx)
return await execute_io(head.name, args_list, kwargs, ctx)
is_render = isinstance(expr, list) and is_render_expr(expr)
result = eval_expr(expr, env)
result = trampoline(result)
if is_render and isinstance(result, str):
return _RawHTML(result)
return result
async def _parse_io_args(exprs, env, ctx):
"""Parse and evaluate I/O node args (keyword + positional)."""
from shared.sx.types import Keyword as _Kw
args_list = []
kwargs = {}
i = 0
while i < len(exprs):
item = exprs[i]
if isinstance(item, _Kw) and i + 1 < len(exprs):
kwargs[item.name] = await async_eval(exprs[i + 1], env, ctx)
i += 2
else:
args_list.append(await async_eval(item, env, ctx))
i += 1
return args_list, kwargs
async def async_eval_to_sx(expr, env, ctx=None):
"""Evaluate and produce SX source string (wire format)."""
if ctx is None:
ctx = RequestContext()
result = await async_aser(expr, env, ctx)
if isinstance(result, SxExpr):
return result
if result is None or result is NIL:
return SxExpr("")
if isinstance(result, str):
return SxExpr(result)
return SxExpr(sx_serialize(result))
async def async_eval_slot_to_sx(expr, env, ctx=None):
"""Like async_eval_to_sx but expands component calls server-side."""
if ctx is None:
ctx = RequestContext()
token = _expand_components_cv.set(True)
try:
result = await async_eval_slot_inner(expr, env, ctx)
if isinstance(result, SxExpr):
return result
if result is None or result is NIL:
return SxExpr("")
if isinstance(result, str):
return SxExpr(result)
return SxExpr(sx_serialize(result))
finally:
_expand_components_cv.reset(token)
'''
# ---------------------------------------------------------------------------
# Fixups — wire up render adapter dispatch
# ---------------------------------------------------------------------------
FIXUPS_PY = '''
# =========================================================================
# Fixups -- wire up render adapter dispatch
# =========================================================================
def _setup_html_adapter():
global _render_expr_fn
_render_expr_fn = lambda expr, env: render_list_to_html(expr, env)
def _setup_sx_adapter():
global _render_expr_fn
_render_expr_fn = lambda expr, env: aser_list(expr, env)
# Wrap aser_call and aser_fragment to return SxExpr
# so serialize() won't double-quote them
_orig_aser_call = None
_orig_aser_fragment = None
def _wrap_aser_outputs():
global aser_call, aser_fragment, _orig_aser_call, _orig_aser_fragment
_orig_aser_call = aser_call
_orig_aser_fragment = aser_fragment
def _aser_call_wrapped(name, args, env):
result = _orig_aser_call(name, args, env)
return SxExpr(result) if isinstance(result, str) else result
def _aser_fragment_wrapped(children, env):
result = _orig_aser_fragment(children, env)
return SxExpr(result) if isinstance(result, str) else result
aser_call = _aser_call_wrapped
aser_fragment = _aser_fragment_wrapped
# Override sf_set_bang to walk the Env scope chain so that (set! var val)
# updates the variable in its defining scope, not just the local copy.
def sf_set_bang(args, env):
name = symbol_name(first(args))
value = trampoline(eval_expr(nth(args, 1), env))
env = _ensure_env(env)
try:
env.set(name, value)
except KeyError:
# Not found in chain — define locally (matches prior behavior)
env[name] = value
return value
'''
# ---------------------------------------------------------------------------
# Extensions: delimited continuations
# ---------------------------------------------------------------------------
CONTINUATIONS_PY = '''
# =========================================================================
# Extension: delimited continuations (shift/reset)
# =========================================================================
_RESET_RESUME = [] # stack of resume values; empty = not resuming
# Extend the transpiled form name lists with continuation forms
if isinstance(SPECIAL_FORM_NAMES, list):
SPECIAL_FORM_NAMES.extend(["reset", "shift"])
else:
_SPECIAL_FORM_NAMES = _SPECIAL_FORM_NAMES | frozenset(["reset", "shift"])
def sf_reset(args, env):
"""(reset body) -- establish a continuation delimiter."""
body = first(args)
try:
return trampoline(eval_expr(body, env))
except _ShiftSignal as sig:
def cont_fn(value=NIL):
_RESET_RESUME.append(value)
try:
return trampoline(eval_expr(body, env))
finally:
_RESET_RESUME.pop()
k = Continuation(cont_fn)
sig_env = dict(sig.env)
sig_env[sig.k_name] = k
return trampoline(eval_expr(sig.body, sig_env))
def sf_shift(args, env):
"""(shift k body) -- capture continuation to nearest reset."""
if _RESET_RESUME:
return _RESET_RESUME[-1]
k_name = symbol_name(first(args))
body = nth(args, 1)
raise _ShiftSignal(k_name, body, env)
# Wrap eval_list to inject shift/reset dispatch
_base_eval_list = eval_list
def _eval_list_with_continuations(expr, env):
head = first(expr)
if type_of(head) == "symbol":
name = symbol_name(head)
args = rest(expr)
if name == "reset":
return sf_reset(args, env)
if name == "shift":
return sf_shift(args, env)
return _base_eval_list(expr, env)
eval_list = _eval_list_with_continuations
# Inject into aser_special
_base_aser_special = aser_special
def _aser_special_with_continuations(name, expr, env):
if name == "reset":
return sf_reset(expr[1:], env)
if name == "shift":
return sf_shift(expr[1:], env)
return _base_aser_special(name, expr, env)
aser_special = _aser_special_with_continuations
'''
# ---------------------------------------------------------------------------
# Public API generator
# ---------------------------------------------------------------------------
def public_api_py(has_html: bool, has_sx: bool, has_deps: bool = False,
has_async: bool = False) -> str:
lines = [
'',
'# =========================================================================',
'# Public API',
'# =========================================================================',
'',
]
if has_sx:
lines.append('# Wrap aser outputs to return SxExpr')
lines.append('_wrap_aser_outputs()')
lines.append('')
if has_html:
lines.append('# Set HTML as default adapter')
lines.append('_setup_html_adapter()')
lines.append('')
lines.extend([
'def evaluate(expr, env=None):',
' """Evaluate expr in env and return the result."""',
' if env is None:',
' env = _Env()',
' elif isinstance(env, dict):',
' env = _Env(env)',
' result = eval_expr(expr, env)',
' while is_thunk(result):',
' result = eval_expr(thunk_expr(result), thunk_env(result))',
' return result',
'',
'',
'def render(expr, env=None):',
' """Render expr to HTML string."""',
' global _render_mode',
' if env is None:',
' env = {}',
' try:',
' _render_mode = True',
' return render_to_html(expr, env)',
' finally:',
' _render_mode = False',
'',
'',
'def make_env(**kwargs):',
' """Create an environment with initial bindings."""',
' return _Env(dict(kwargs))',
])
return '\n'.join(lines)
# ---------------------------------------------------------------------------
# Build config — which .sx files to transpile and in what order
# ---------------------------------------------------------------------------
ADAPTER_FILES = {
"html": ("adapter-html.sx", "adapter-html"),
"sx": ("adapter-sx.sx", "adapter-sx"),
"async": ("adapter-async.sx", "adapter-async"),
}
SPEC_MODULES = {
"deps": ("deps.sx", "deps (component dependency analysis)"),
"router": ("router.sx", "router (client-side route matching)"),
"engine": ("engine.sx", "engine (fetch/swap/trigger pure logic)"),
"signals": ("signals.sx", "signals (reactive signal runtime)"),
"page-helpers": ("page-helpers.sx", "page-helpers (pure data transformation helpers)"),
"types": ("types.sx", "types (gradual type system)"),
}
EXTENSION_NAMES = {"continuations"}
EXTENSION_FORMS = {
"continuations": {"reset", "shift"},
}