Complete Python eval removal: epoch protocol, scope consolidation, JIT fixes

Route all rendering through OCaml bridge — render_to_html no longer uses
Python async_eval. Fix register_components to parse &key params and &rest
children from defcomp forms. Remove all dead sx_ref.py imports.

Epoch protocol (prevents pipe desync):
- Every command prefixed with (epoch N), all responses tagged with epoch
- Both sides discard stale-epoch messages — desync structurally impossible
- OCaml main loop discards stale io-responses between commands

Consolidate scope primitives into sx_scope.ml:
- Single source of truth for scope-push!/pop!/peek, collect!/collected,
  emit!/emitted, context, and 12 other scope operations
- Removes duplicate registrations from sx_server.ml (including bugs where
  scope-emit! and clear-collected! were registered twice with different impls)
- Bind scope prims into env so JIT VM finds them via OP_GLOBAL_GET

JIT VM fixes:
- Trampoline thunks before passing args to CALL_PRIM
- as_list resolves thunks via _sx_trampoline_fn
- len handles all value types (Bool, Number, RawHTML, SxExpr, Spread, etc.)

Other fixes:
- ~cssx/tw signature: (tokens) → (&key tokens) to match callers
- Minimal Python evaluator in html.py for sync sx() Jinja function
- Python scope primitive stubs (thread-local) for non-OCaml paths
- Reader macro resolution via OcamlSync instead of sx_ref.py

Tests: 1114 OCaml, 1078 JS, 35 Python regression, 6/6 Playwright SSR

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-24 16:14:40 +00:00
parent e887c0d978
commit f9f810ffd7
18 changed files with 1305 additions and 478 deletions

View File

@@ -46,6 +46,7 @@ class OcamlBridge:
self._components_loaded = False
self._helpers_injected = False
self._io_cache: dict[tuple, Any] = {} # (name, args...) → cached result
self._epoch: int = 0 # request epoch — monotonically increasing
async def start(self) -> None:
"""Launch the OCaml subprocess and wait for (ready)."""
@@ -77,7 +78,7 @@ class OcamlBridge:
self._started = True
# Verify engine identity
await self._send("(ping)")
await self._send_command("(ping)")
kind, engine = await self._read_response()
engine_name = engine if kind == "ok" else "unknown"
_logger.info("OCaml SX kernel ready (pid=%d, engine=%s)", self._proc.pid, engine_name)
@@ -95,24 +96,36 @@ class OcamlBridge:
self._proc = None
self._started = False
async def _restart(self) -> None:
"""Kill and restart the OCaml subprocess to recover from pipe desync."""
_logger.warning("Restarting OCaml SX kernel (pipe recovery)")
if self._proc and self._proc.returncode is None:
self._proc.kill()
await self._proc.wait()
self._proc = None
self._started = False
self._components_loaded = False
self._helpers_injected = False
await self.start()
async def ping(self) -> str:
"""Health check — returns engine name (e.g. 'ocaml-cek')."""
async with self._lock:
await self._send("(ping)")
await self._send_command("(ping)")
kind, value = await self._read_response()
return value or "" if kind == "ok" else ""
async def load(self, path: str) -> int:
"""Load an .sx file for side effects (defcomp, define, defmacro)."""
async with self._lock:
await self._send(f'(load "{_escape(path)}")')
await self._send_command(f'(load "{_escape(path)}")')
value = await self._read_until_ok(ctx=None)
return int(float(value)) if value else 0
async def load_source(self, source: str) -> int:
"""Evaluate SX source for side effects."""
async with self._lock:
await self._send(f'(load-source "{_escape(source)}")')
await self._send_command(f'(load-source "{_escape(source)}")')
value = await self._read_until_ok(ctx=None)
return int(float(value)) if value else 0
@@ -124,7 +137,7 @@ class OcamlBridge:
"""
await self._ensure_components()
async with self._lock:
await self._send('(eval-blob)')
await self._send_command('(eval-blob)')
await self._send_blob(source)
return await self._read_until_ok(ctx)
@@ -136,14 +149,14 @@ class OcamlBridge:
"""Render SX to HTML, handling io-requests via Python async IO."""
await self._ensure_components()
async with self._lock:
await self._send(f'(render "{_escape(source)}")')
await self._send_command(f'(render "{_escape(source)}")')
return await self._read_until_ok(ctx)
async def aser(self, source: str, ctx: dict[str, Any] | None = None) -> str:
"""Evaluate SX and return SX wire format, handling io-requests."""
await self._ensure_components()
async with self._lock:
await self._send('(aser-blob)')
await self._send_command('(aser-blob)')
await self._send_blob(source)
return await self._read_until_ok(ctx)
@@ -159,7 +172,7 @@ class OcamlBridge:
# a separate lock acquisition could let another coroutine
# interleave commands between injection and aser-slot.
await self._inject_helpers_locked()
await self._send('(aser-slot-blob)')
await self._send_command('(aser-slot-blob)')
await self._send_blob(source)
return await self._read_until_ok(ctx)
@@ -182,7 +195,7 @@ class OcamlBridge:
var = f"__shell-{key.replace('_', '-')}"
defn = f'(define {var} "{_escape(str(val))}")'
try:
await self._send(f'(load-source "{_escape(defn)}")')
await self._send_command(f'(load-source "{_escape(defn)}")')
await self._read_until_ok(ctx=None)
except OcamlBridgeError as e:
_logger.warning("Shell static inject failed for %s: %s", key, e)
@@ -198,7 +211,7 @@ class OcamlBridge:
else:
defn = f'(define {var} "{_escape(str(val))}")'
try:
await self._send(f'(load-source "{_escape(defn)}")')
await self._send_command(f'(load-source "{_escape(defn)}")')
await self._read_until_ok(ctx=None)
except OcamlBridgeError as e:
_logger.warning("Shell static inject failed for %s: %s", key, e)
@@ -221,7 +234,7 @@ class OcamlBridge:
if pairs:
cmd = f'(set-request-cookies {{{" ".join(pairs)}}})'
try:
await self._send(cmd)
await self._send_command(cmd)
await self._read_until_ok(ctx=None)
except OcamlBridgeError as e:
_logger.debug("Cookie inject failed: %s", e)
@@ -277,7 +290,7 @@ class OcamlBridge:
parts.append(f' :{k} "{_escape(str(val))}"')
parts.append(")")
cmd = "".join(parts)
await self._send(cmd)
await self._send_command(cmd)
# Send page source as binary blob (avoids string-escape issues)
await self._send_blob(page_source)
html = await self._read_until_ok(ctx)
@@ -312,7 +325,7 @@ class OcamlBridge:
arg_list = " ".join(chr(97 + i) for i in range(nargs))
sx_def = f'(define {name} (fn ({param_names}) (helper "{name}" {arg_list})))'
try:
await self._send(f'(load-source "{_escape(sx_def)}")')
await self._send_command(f'(load-source "{_escape(sx_def)}")')
await self._read_until_ok(ctx=None)
count += 1
except OcamlBridgeError:
@@ -325,70 +338,11 @@ class OcamlBridge:
async def _compile_adapter_module(self) -> None:
"""Compile adapter-sx.sx to bytecode and load as a VM module.
All aser functions become NativeFn VM closures in the kernel env.
Subsequent aser-slot calls find them as NativeFn → VM executes
the entire render path compiled, no CEK steps.
Previously used Python's sx_ref.py evaluator for compilation.
Now the OCaml kernel handles JIT compilation natively — this method
is a no-op. The kernel's own JIT hook compiles functions on first call.
"""
from .parser import parse_all, serialize
from .ref.sx_ref import eval_expr, trampoline, PRIMITIVES
# Ensure compiler primitives are available
if 'serialize' not in PRIMITIVES:
PRIMITIVES['serialize'] = lambda x: serialize(x)
if 'primitive?' not in PRIMITIVES:
PRIMITIVES['primitive?'] = lambda name: isinstance(name, str) and name in PRIMITIVES
if 'has-key?' not in PRIMITIVES:
PRIMITIVES['has-key?'] = lambda *a: isinstance(a[0], dict) and str(a[1]) in a[0]
if 'set-nth!' not in PRIMITIVES:
from .types import NIL
PRIMITIVES['set-nth!'] = lambda *a: (a[0].__setitem__(int(a[1]), a[2]), NIL)[-1]
if 'init' not in PRIMITIVES:
PRIMITIVES['init'] = lambda *a: a[0][:-1] if isinstance(a[0], list) else a[0]
if 'concat' not in PRIMITIVES:
PRIMITIVES['concat'] = lambda *a: (a[0] or []) + (a[1] or [])
if 'slice' not in PRIMITIVES:
PRIMITIVES['slice'] = lambda *a: a[0][int(a[1]):int(a[2])] if len(a) == 3 else a[0][int(a[1]):]
from .types import Symbol
if 'make-symbol' not in PRIMITIVES:
PRIMITIVES['make-symbol'] = lambda name: Symbol(name)
from .types import NIL
for ho in ['map', 'filter', 'for-each', 'reduce', 'some', 'every?', 'map-indexed']:
if ho not in PRIMITIVES:
PRIMITIVES[ho] = lambda *a: NIL
# Load compiler
compiler_env = {}
spec_dir = os.path.join(os.path.dirname(__file__), "../../spec")
for f in ["bytecode.sx", "compiler.sx"]:
path = os.path.join(spec_dir, f)
if os.path.isfile(path):
with open(path) as fh:
for expr in parse_all(fh.read()):
trampoline(eval_expr(expr, compiler_env))
# Compile adapter-sx.sx
web_dir = os.path.join(os.path.dirname(__file__), "../../web")
adapter_path = os.path.join(web_dir, "adapter-sx.sx")
if not os.path.isfile(adapter_path):
_logger.warning("adapter-sx.sx not found at %s", adapter_path)
return
with open(adapter_path) as f:
adapter_exprs = parse_all(f.read())
compiled = trampoline(eval_expr(
[Symbol('compile-module'), [Symbol('quote'), adapter_exprs]],
compiler_env))
code_sx = serialize(compiled)
_logger.info("Compiled adapter-sx.sx: %d bytes bytecode", len(code_sx))
# Load the compiled module into the OCaml VM
async with self._lock:
await self._send(f'(vm-load-module {code_sx})')
await self._read_until_ok(ctx=None)
_logger.info("Loaded adapter-sx.sx as VM module")
_logger.info("Adapter module compilation delegated to OCaml kernel JIT")
async def _ensure_components(self) -> None:
"""Load all .sx source files into the kernel on first use.
@@ -455,7 +409,7 @@ class OcamlBridge:
async with self._lock:
for filepath in all_files:
try:
await self._send(f'(load "{_escape(filepath)}")')
await self._send_command(f'(load "{_escape(filepath)}")')
value = await self._read_until_ok(ctx=None)
# Response may be a number (count) or a value — just count files
count += 1
@@ -468,14 +422,14 @@ class OcamlBridge:
# reactive loops during island SSR — effects are DOM side-effects)
try:
noop_dispose = '(fn () nil)'
await self._send(f'(load-source "(define effect (fn (f) {noop_dispose}))")')
await self._send_command(f'(load-source "(define effect (fn (f) {noop_dispose}))")')
await self._read_until_ok(ctx=None)
except OcamlBridgeError:
pass
# Register JIT hook — lambdas compile on first call
try:
await self._send('(vm-compile-adapter)')
await self._send_command('(vm-compile-adapter)')
await self._read_until_ok(ctx=None)
_logger.info("JIT hook registered — lambdas compile on first call")
except OcamlBridgeError as e:
@@ -499,7 +453,7 @@ class OcamlBridge:
if callable(fn) and not name.startswith("~"):
sx_def = f'(define {name} (fn (&rest args) (apply helper (concat (list "{name}") args))))'
try:
await self._send(f'(load-source "{_escape(sx_def)}")')
await self._send_command(f'(load-source "{_escape(sx_def)}")')
await self._read_until_ok(ctx=None)
count += 1
except OcamlBridgeError:
@@ -510,7 +464,7 @@ class OcamlBridge:
async def reset(self) -> None:
"""Reset the kernel environment to pristine state."""
async with self._lock:
await self._send("(reset)")
await self._send_command("(reset)")
kind, value = await self._read_response()
if kind == "error":
raise OcamlBridgeError(f"reset: {value}")
@@ -531,6 +485,20 @@ class OcamlBridge:
self._proc.stdin.write((line + "\n").encode())
await self._proc.stdin.drain()
async def _send_command(self, line: str) -> None:
"""Send a command with a fresh epoch prefix.
Increments the epoch counter and sends (epoch N) before the
actual command. The OCaml kernel tags all responses with this
epoch so stale messages from previous requests are discarded.
"""
self._epoch += 1
assert self._proc and self._proc.stdin
_logger.debug("EPOCH %d SEND: %s", self._epoch, line[:120])
self._proc.stdin.write(f"(epoch {self._epoch})\n".encode())
self._proc.stdin.write((line + "\n").encode())
await self._proc.stdin.drain()
async def _send_blob(self, data: str) -> None:
"""Send a length-prefixed binary blob to the subprocess.
@@ -562,16 +530,45 @@ class OcamlBridge:
"""Read a single (ok ...) or (error ...) response.
Returns (kind, value) where kind is "ok" or "error".
Discards stale epoch messages.
"""
line = await self._readline()
# Length-prefixed blob
if line.startswith("(ok-len "):
n = int(line[8:-1])
assert self._proc and self._proc.stdout
data = await self._proc.stdout.readexactly(n)
await self._proc.stdout.readline() # trailing newline
return ("ok", data.decode())
return _parse_response(line)
while True:
line = await self._readline()
if not self._is_current_epoch(line):
_logger.debug("Discarding stale response: %s", line[:80])
if line.startswith("(ok-len "):
parts = line[1:-1].split()
if len(parts) >= 3:
n = int(parts[-1])
assert self._proc and self._proc.stdout
await self._proc.stdout.readexactly(n)
await self._proc.stdout.readline()
continue
# Length-prefixed blob: (ok-len EPOCH N) or (ok-len N)
if line.startswith("(ok-len "):
parts = line[1:-1].split()
n = int(parts[-1])
assert self._proc and self._proc.stdout
data = await self._proc.stdout.readexactly(n)
await self._proc.stdout.readline() # trailing newline
return ("ok", data.decode())
return _parse_response(line)
def _is_current_epoch(self, line: str) -> bool:
"""Check if a response line belongs to the current epoch.
Lines tagged with a stale epoch are discarded. Untagged lines
(from a kernel that predates the epoch protocol) are accepted.
"""
# Extract epoch number from known tagged formats:
# (ok EPOCH ...), (error EPOCH ...), (ok-len EPOCH N),
# (io-request EPOCH ...), (io-done EPOCH N)
import re
m = re.match(r'\((?:ok|error|ok-len|ok-raw|io-request|io-done)\s+(\d+)\b', line)
if m:
return int(m.group(1)) == self._epoch
# Untagged (legacy) — accept
return True
async def _read_until_ok(
self,
@@ -583,6 +580,9 @@ class OcamlBridge:
- Legacy (blocking): single io-request → immediate io-response
- Batched: collect io-requests until (io-done N), process ALL
concurrently with asyncio.gather, send responses in order
Lines tagged with a stale epoch are silently discarded, making
pipe desync from previous failed requests impossible.
"""
import asyncio
pending_batch: list[str] = []
@@ -590,20 +590,53 @@ class OcamlBridge:
while True:
line = await self._readline()
# Discard stale epoch messages
if not self._is_current_epoch(line):
_logger.debug("Discarding stale epoch message: %s", line[:80])
# If it's a stale ok-len, drain the blob bytes too
if line.startswith("(ok-len "):
parts = line[1:-1].split()
if len(parts) >= 3:
n = int(parts[2])
assert self._proc and self._proc.stdout
await self._proc.stdout.readexactly(n)
await self._proc.stdout.readline()
continue
if line.startswith("(io-request "):
# Check if batched (has numeric ID after "io-request ")
# New format: (io-request EPOCH ...) or (io-request EPOCH ID ...)
# Strip epoch from the line for IO dispatch
after = line[len("(io-request "):].lstrip()
# Skip epoch number if present
if after and after[0].isdigit():
# Batched mode — collect, don't respond yet
pending_batch.append(line)
continue
# Could be epoch or batch ID — check for second number
parts = after.split(None, 2)
if len(parts) >= 2 and parts[1][0].isdigit():
# (io-request EPOCH ID "name" args...) — batched with epoch
pending_batch.append(line)
continue
elif len(parts) >= 2 and parts[1].startswith('"'):
# (io-request EPOCH "name" args...) — legacy with epoch
try:
result = await self._handle_io_request(line, ctx)
await self._send(
f"(io-response {self._epoch} {_serialize_for_ocaml(result)})")
except Exception as e:
_logger.warning("IO request failed, sending nil: %s", e)
await self._send(f"(io-response {self._epoch} nil)")
continue
else:
# Old format: (io-request ID "name" ...) — batched, no epoch
pending_batch.append(line)
continue
# Legacy blocking mode — respond immediately
try:
result = await self._handle_io_request(line, ctx)
await self._send(f"(io-response {_serialize_for_ocaml(result)})")
await self._send(
f"(io-response {self._epoch} {_serialize_for_ocaml(result)})")
except Exception as e:
_logger.warning("IO request failed, sending nil: %s", e)
await self._send("(io-response nil)")
await self._send(f"(io-response {self._epoch} nil)")
continue
if line.startswith("(io-done "):
@@ -614,16 +647,17 @@ class OcamlBridge:
for result in results:
if isinstance(result, BaseException):
_logger.warning("Batched IO failed: %s", result)
await self._send("(io-response nil)")
await self._send(f"(io-response {self._epoch} nil)")
else:
await self._send(
f"(io-response {_serialize_for_ocaml(result)})")
f"(io-response {self._epoch} {_serialize_for_ocaml(result)})")
pending_batch = []
continue
# Length-prefixed blob: (ok-len N)
# Length-prefixed blob: (ok-len EPOCH N) or (ok-len N)
if line.startswith("(ok-len "):
n = int(line[8:-1])
parts = line[1:-1].split() # ["ok-len", epoch, n] or ["ok-len", n]
n = int(parts[-1]) # last number is always byte count
assert self._proc and self._proc.stdout
data = await self._proc.stdout.readexactly(n)
# Read trailing newline
@@ -829,25 +863,50 @@ def _escape(s: str) -> str:
def _parse_response(line: str) -> tuple[str, str | None]:
"""Parse an (ok ...) or (error ...) response line.
Handles epoch-tagged responses: (ok EPOCH), (ok EPOCH value),
(error EPOCH "msg"), as well as legacy untagged responses.
Returns (kind, value) tuple.
"""
line = line.strip()
if line == "(ok)":
# (ok EPOCH) — tagged no-value
if line == "(ok)" or (line.startswith("(ok ") and line[4:-1].isdigit()):
return ("ok", None)
if line.startswith("(ok-raw "):
# Raw SX wire format — no unescaping needed
return ("ok", line[8:-1])
# (ok-raw EPOCH value) or (ok-raw value)
inner = line[8:-1]
# Strip epoch if present
if inner and inner[0].isdigit():
space = inner.find(" ")
if space > 0:
inner = inner[space + 1:]
else:
return ("ok", None)
return ("ok", inner)
if line.startswith("(ok "):
value = line[4:-1] # strip (ok and )
inner = line[4:-1] # strip (ok and )
# Strip epoch number if present: (ok 42 "value") → "value"
if inner and inner[0].isdigit():
space = inner.find(" ")
if space > 0:
inner = inner[space + 1:]
else:
# (ok EPOCH) with no value
return ("ok", None)
# If the value is a quoted string, unquote it
if value.startswith('"') and value.endswith('"'):
value = _unescape(value[1:-1])
return ("ok", value)
if inner.startswith('"') and inner.endswith('"'):
inner = _unescape(inner[1:-1])
return ("ok", inner)
if line.startswith("(error "):
msg = line[7:-1]
if msg.startswith('"') and msg.endswith('"'):
msg = _unescape(msg[1:-1])
return ("error", msg)
inner = line[7:-1]
# Strip epoch number if present: (error 42 "msg") → "msg"
if inner and inner[0].isdigit():
space = inner.find(" ")
if space > 0:
inner = inner[space + 1:]
if inner.startswith('"') and inner.endswith('"'):
inner = _unescape(inner[1:-1])
return ("error", inner)
return ("error", f"Unexpected response: {line}")