Files
rose-ash/shared/sx/ocaml_sync.py
giles f0f16d24bc Fix bridge newline escaping, stepper optimization, dom-listen dedup
- ocaml_sync.py: escape newlines in eval/load_source to prevent
  protocol desync (bridge crashed on any multi-line SX)
- Stepper: do-back uses rebuild-preview (O(1) render) instead of
  replaying all steps. Hydration effect same. Cookie save on button
  click only.
- dom.sx: remove duplicate dom-listen (was shadowing the one at
  line 351 that adapter-dom.sx's dom-on wraps)
- orchestration.sx: fix bind-sse-swap close paren count
- safe_eq: Dict equality via __host_handle for DOM node identity

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-25 15:14:10 +00:00

168 lines
5.5 KiB
Python

"""
Synchronous OCaml bridge — persistent subprocess for build-time evaluation.
Used by bootstrappers (JS cli.py, OCaml bootstrap.py) that need a sync
evaluator to run transpiler.sx. For async runtime use, see ocaml_bridge.py.
"""
from __future__ import annotations
import os
import subprocess
import sys
# Fallback location of the sx_server executable, expressed relative to this
# module's directory. OcamlSync uses it only when neither an explicit binary
# argument nor the SX_OCAML_BIN environment variable is provided.
_DEFAULT_BIN = os.path.join(
    os.path.dirname(__file__), "../../hosts/ocaml/_build/default/bin/sx_server.exe"
)
class OcamlSyncError(Exception):
    """Raised when the OCaml SX kernel reports an error or misbehaves.

    Used for ``(error ...)`` responses, unrecognised response lines, a failed
    ``(ready)`` handshake, and the subprocess closing its output.
    """
def _sx_unescape(s: str) -> str:
    """Decode the backslash escapes of an SX string literal body.

    The input is scanned once from left to right. A backslash followed by
    ``n``, ``r`` or ``t`` becomes newline, carriage return or tab; a backslash
    followed by any other character yields that character unchanged (which
    covers the escaped quote and escaped backslash). A backslash that is the
    very last character is kept as-is.
    """
    # Only these escapes change the character; everything else maps to itself.
    control = {'n': '\n', 'r': '\r', 't': '\t'}
    pieces = []
    pos = 0
    end = len(s)
    while pos < end:
        ch = s[pos]
        if ch != '\\' or pos + 1 >= end:
            # Ordinary character, or a dangling backslash at end of input.
            pieces.append(ch)
            pos += 1
            continue
        escaped = s[pos + 1]
        pieces.append(control.get(escaped, escaped))
        pos += 2
    return ''.join(pieces)
class OcamlSync:
    """Synchronous bridge to the OCaml sx_server subprocess.

    The subprocess is started lazily on the first command and reused for
    later ones. Each command is written as an ``(epoch N)`` line followed by
    the command line; one response is then read back from stdout.
    """

    def __init__(self, binary: str | None = None):
        """Configure the bridge without starting the subprocess.

        Args:
            binary: Path to the sx_server executable. When falsy, the
                SX_OCAML_BIN environment variable is used, then the
                module-level default path.
        """
        self._binary = binary or os.environ.get("SX_OCAML_BIN") or _DEFAULT_BIN
        self._proc: subprocess.Popen | None = None
        self._epoch: int = 0

    @staticmethod
    def _escape(text: str) -> str:
        """Escape *text* for embedding inside a double-quoted SX string.

        Backslashes and double quotes are backslash-escaped, and real
        newlines become a backslash followed by ``n`` so that the command
        always stays on a single protocol line.
        """
        return text.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")

    @staticmethod
    def _unquote(value: str) -> str:
        """Strip surrounding double quotes and unescape; pass others through."""
        if value.startswith('"') and value.endswith('"'):
            return _sx_unescape(value[1:-1])
        return value

    def _ensure(self):
        """Start the subprocess if needed and wait for its ``(ready)`` line.

        Raises:
            OcamlSyncError: If the first line is not ``(ready)`` or the
                process closes stdout before sending it.
        """
        if self._proc is not None and self._proc.poll() is None:
            return
        # A previous process may have exited; release its pipes first.
        self.stop()
        self._proc = subprocess.Popen(
            [self._binary],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            # Nothing in this bridge reads stderr. An unread PIPE can fill up
            # and block the child, so discard the stream instead.
            stderr=subprocess.DEVNULL,
        )
        self._epoch = 0
        try:
            line = self._readline()
            if line != "(ready)":
                raise OcamlSyncError(f"Expected (ready), got: {line}")
        except OcamlSyncError:
            # Do not keep a process around that never completed the
            # handshake; otherwise the next call would skip the handshake.
            self.stop()
            raise

    def _send(self, command: str):
        """Send a command with epoch prefix."""
        assert self._proc and self._proc.stdin
        self._epoch += 1
        payload = f"(epoch {self._epoch})\n{command}\n"
        self._proc.stdin.write(payload.encode())
        self._proc.stdin.flush()

    def _readline(self) -> str:
        """Read one line from the subprocess, without the trailing newline.

        Raises:
            OcamlSyncError: If stdout is at end-of-file.
        """
        assert self._proc and self._proc.stdout
        data = self._proc.stdout.readline()
        if not data:
            raise OcamlSyncError("OCaml subprocess died unexpectedly")
        return data.decode().rstrip("\n")

    def _strip_epoch(self, inner: str) -> str:
        """Strip a leading epoch number from a response value.

        ``'42 value'`` becomes ``'value'`` and a bare ``'42'`` becomes ``''``.
        Input that does not start with a digit is returned unchanged.
        """
        if inner and inner[0].isdigit():
            space = inner.find(" ")
            if space > 0:
                return inner[space + 1:]
            return ""  # epoch only, no value
        return inner

    def _read_response(self) -> str:
        """Read a single response. Returns the value string or raises on error.

        Handles epoch-tagged responses: ``(ok EPOCH)``, ``(ok EPOCH value)``,
        ``(ok-raw EPOCH value)``, ``(ok-len EPOCH N)`` followed by an N-byte
        blob and a newline, and ``(error EPOCH "msg")``.

        Raises:
            OcamlSyncError: For ``(error ...)`` responses, unrecognised or
                malformed lines, and blobs shorter than announced.
        """
        line = self._readline()

        # Length-prefixed blob: (ok-len N) or (ok-len EPOCH N)
        if line.startswith("(ok-len "):
            parts = line[1:-1].split()  # ["ok-len", ...]
            try:
                n = int(parts[-1])  # last number is always byte count
            except ValueError:
                raise OcamlSyncError(f"Unexpected response: {line}") from None
            assert self._proc and self._proc.stdout
            data = self._proc.stdout.read(n)
            if len(data) != n:
                # EOF before the whole blob arrived.
                raise OcamlSyncError(
                    f"Truncated blob: expected {n} bytes, got {len(data)}"
                )
            self._proc.stdout.readline()  # trailing newline
            # Blob is SX-serialized: strip string quotes and unescape.
            return self._unquote(data.decode())

        if line == "(ok)" or (line.startswith("(ok ") and line[4:-1].isdigit()):
            return ""

        if line.startswith("(ok-raw "):
            # Raw values are returned verbatim, with only the epoch removed.
            return self._strip_epoch(line[8:-1])

        if line.startswith("(ok "):
            return self._unquote(self._strip_epoch(line[4:-1]))

        if line.startswith("(error "):
            raise OcamlSyncError(self._unquote(self._strip_epoch(line[7:-1])))

        raise OcamlSyncError(f"Unexpected response: {line}")

    def eval(self, source: str) -> str:
        """Evaluate SX source, return result as string."""
        self._ensure()
        self._send(f'(eval "{self._escape(source)}")')
        return self._read_response()

    def load(self, path: str) -> str:
        """Load an .sx file into the kernel.

        The path is escaped like any other SX string so that quotes,
        backslashes or newlines in it cannot corrupt the command line.
        """
        self._ensure()
        self._send(f'(load "{self._escape(path)}")')
        return self._read_response()

    def load_source(self, source: str) -> str:
        """Load SX source directly into the kernel."""
        self._ensure()
        self._send(f'(load-source "{self._escape(source)}")')
        return self._read_response()

    def stop(self):
        """Terminate the subprocess, if any, and release its pipes.

        Safe to call repeatedly. A process that ignores terminate() for
        five seconds is killed.
        """
        proc = self._proc
        if proc is None:
            return
        self._proc = None
        try:
            if proc.poll() is None:
                proc.terminate()
                try:
                    proc.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    proc.kill()
                    proc.wait()
        finally:
            for stream in (proc.stdin, proc.stdout, proc.stderr):
                if stream is None:
                    continue
                try:
                    stream.close()
                except OSError:
                    # Best-effort cleanup: flushing into an already-closed
                    # pipe can fail, and there is nothing left to recover.
                    pass
# Shared bridge instance; stays None until get_sync_bridge() is first called.
_global: OcamlSync | None = None


def get_sync_bridge() -> OcamlSync:
    """Return the module-wide OcamlSync, constructing it on first use.

    The instance is created with default arguments and cached in the
    module-level ``_global`` variable, so later calls return the same object.
    """
    global _global
    bridge = _global
    if bridge is None:
        bridge = _global = OcamlSync()
    return bridge