OCaml bootstrapper Phase 2: HTML renderer, SX server, Python bridge

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-15 23:28:48 +00:00
parent 16fa813d6d
commit 313f7d6be1
18 changed files with 2541 additions and 191 deletions

View File

@@ -31,7 +31,13 @@ from typing import Any
from .types import NIL, Component, Island, Keyword, Lambda, Macro, Symbol
from .parser import parse
import os as _os
if _os.environ.get("SX_USE_REF") == "1":
if _os.environ.get("SX_USE_OCAML") == "1":
# OCaml kernel bridge — render via persistent subprocess.
# html_render and _render_component are set up lazily since the bridge
# requires an async event loop. The sync sx() function falls back to
# the ref renderer; async callers use ocaml_bridge directly.
from .ref.sx_ref import render as html_render, render_html_component as _render_component
elif _os.environ.get("SX_USE_REF") == "1":
from .ref.sx_ref import render as html_render, render_html_component as _render_component
else:
from .html import render as html_render, _render_component
@@ -348,6 +354,12 @@ def reload_if_changed() -> None:
reload_logger.info("Reloaded %d file(s), components in %.1fms",
len(changed_files), (t1 - t0) * 1000)
# Invalidate OCaml bridge component cache so next render reloads
if _os.environ.get("SX_USE_OCAML") == "1":
from .ocaml_bridge import _bridge
if _bridge is not None:
_bridge._components_loaded = False
# Recompute render plans for all services that have pages
from .pages import _PAGE_REGISTRY, compute_page_render_plans
for svc in _PAGE_REGISTRY:
@@ -430,6 +442,9 @@ def finalize_components() -> None:
compute_all_io_refs(_COMPONENT_ENV, get_all_io_names())
_compute_component_hash()
# OCaml bridge loads components lazily on first render via
# OcamlBridge._ensure_components() — no sync needed here.
# ---------------------------------------------------------------------------
# sx() — render s-expression from Jinja template
@@ -482,7 +497,16 @@ async def sx_async(source: str, **kwargs: Any) -> str:
Use when the s-expression contains I/O nodes::
{{ sx_async('(frag "blog" "card" :slug "apple")') | safe }}
When SX_USE_OCAML=1, renders via the OCaml kernel subprocess which
yields io-requests back to Python for async fulfillment.
"""
if _os.environ.get("SX_USE_OCAML") == "1":
from .ocaml_bridge import get_bridge
bridge = await get_bridge()
ctx = dict(kwargs)
return await bridge.render(source, ctx=ctx)
from .resolver import resolve, RequestContext
env = dict(_COMPONENT_ENV)

408
shared/sx/ocaml_bridge.py Normal file
View File

@@ -0,0 +1,408 @@
"""
OCaml SX kernel ↔ Python coroutine bridge.
Manages a persistent OCaml subprocess (sx_server) that evaluates SX
expressions. When the OCaml kernel needs IO (database queries, service
calls), it yields an ``(io-request ...)`` back to Python, which fulfills
it asynchronously and sends an ``(io-response ...)`` back.
Usage::
bridge = OcamlBridge()
await bridge.start()
html = await bridge.render('(div (p "hello"))')
await bridge.stop()
"""
from __future__ import annotations
import asyncio
import logging
import os
from typing import Any
_logger = logging.getLogger("sx.ocaml")
# Default binary path — can be overridden via SX_OCAML_BIN env var
_DEFAULT_BIN = os.path.join(
os.path.dirname(__file__),
"../../hosts/ocaml/_build/default/bin/sx_server.exe",
)
class OcamlBridgeError(Exception):
"""Error from the OCaml SX kernel."""
class OcamlBridge:
"""Async bridge to a persistent OCaml SX subprocess."""
def __init__(self, binary: str | None = None):
self._binary = binary or os.environ.get("SX_OCAML_BIN") or _DEFAULT_BIN
self._proc: asyncio.subprocess.Process | None = None
self._lock = asyncio.Lock()
self._started = False
self._components_loaded = False
async def start(self) -> None:
"""Launch the OCaml subprocess and wait for (ready)."""
if self._started:
return
bin_path = os.path.abspath(self._binary)
if not os.path.isfile(bin_path):
raise FileNotFoundError(
f"OCaml SX server binary not found: {bin_path}\n"
f"Build with: cd hosts/ocaml && eval $(opam env) && dune build"
)
_logger.info("Starting OCaml SX kernel: %s", bin_path)
self._proc = await asyncio.create_subprocess_exec(
bin_path,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
# Wait for (ready)
line = await self._readline()
if line != "(ready)":
raise OcamlBridgeError(f"Expected (ready), got: {line!r}")
self._started = True
# Verify engine identity
self._send("(ping)")
kind, engine = await self._read_response()
engine_name = engine if kind == "ok" else "unknown"
_logger.info("OCaml SX kernel ready (pid=%d, engine=%s)", self._proc.pid, engine_name)
async def stop(self) -> None:
"""Terminate the subprocess."""
if self._proc and self._proc.returncode is None:
self._proc.stdin.close()
try:
await asyncio.wait_for(self._proc.wait(), timeout=5.0)
except asyncio.TimeoutError:
self._proc.kill()
await self._proc.wait()
_logger.info("OCaml SX kernel stopped")
self._proc = None
self._started = False
async def ping(self) -> str:
"""Health check — returns engine name (e.g. 'ocaml-cek')."""
async with self._lock:
self._send("(ping)")
kind, value = await self._read_response()
return value or "" if kind == "ok" else ""
async def load(self, path: str) -> int:
"""Load an .sx file for side effects (defcomp, define, defmacro)."""
async with self._lock:
self._send(f'(load "{_escape(path)}")')
kind, value = await self._read_response()
if kind == "error":
raise OcamlBridgeError(f"load {path}: {value}")
return int(float(value)) if value else 0
async def load_source(self, source: str) -> int:
"""Evaluate SX source for side effects."""
async with self._lock:
self._send(f'(load-source "{_escape(source)}")')
kind, value = await self._read_response()
if kind == "error":
raise OcamlBridgeError(f"load-source: {value}")
return int(float(value)) if value else 0
async def eval(self, source: str) -> str:
"""Evaluate SX expression, return serialized result."""
async with self._lock:
self._send(f'(eval "{_escape(source)}")')
kind, value = await self._read_response()
if kind == "error":
raise OcamlBridgeError(f"eval: {value}")
return value or ""
async def render(
self,
source: str,
ctx: dict[str, Any] | None = None,
) -> str:
"""Render SX to HTML, handling io-requests via Python async IO."""
await self._ensure_components()
async with self._lock:
self._send(f'(render "{_escape(source)}")')
return await self._read_until_ok(ctx)
async def _ensure_components(self) -> None:
"""Load component definitions into the kernel on first use."""
if self._components_loaded:
return
self._components_loaded = True
try:
from .jinja_bridge import get_component_env, _CLIENT_LIBRARY_SOURCES
from .parser import serialize
from .types import Component, Island, Macro
env = get_component_env()
parts: list[str] = list(_CLIENT_LIBRARY_SOURCES)
for key, val in env.items():
if isinstance(val, Island):
ps = ["&key"] + list(val.params)
if val.has_children:
ps.extend(["&rest", "children"])
parts.append(f"(defisland ~{val.name} ({' '.join(ps)}) {serialize(val.body)})")
elif isinstance(val, Component):
ps = ["&key"] + list(val.params)
if val.has_children:
ps.extend(["&rest", "children"])
parts.append(f"(defcomp ~{val.name} ({' '.join(ps)}) {serialize(val.body)})")
elif isinstance(val, Macro):
ps = list(val.params)
if val.rest_param:
ps.extend(["&rest", val.rest_param])
parts.append(f"(defmacro {val.name} ({' '.join(ps)}) {serialize(val.body)})")
if parts:
source = "\n".join(parts)
await self.load_source(source)
_logger.info("Loaded %d definitions into OCaml kernel", len(parts))
except Exception as e:
_logger.error("Failed to load components into OCaml kernel: %s", e)
self._components_loaded = False # retry next time
async def reset(self) -> None:
"""Reset the kernel environment to pristine state."""
async with self._lock:
self._send("(reset)")
kind, value = await self._read_response()
if kind == "error":
raise OcamlBridgeError(f"reset: {value}")
# ------------------------------------------------------------------
# Internal protocol handling
# ------------------------------------------------------------------
def _send(self, line: str) -> None:
"""Write a line to the subprocess stdin."""
assert self._proc and self._proc.stdin
self._proc.stdin.write((line + "\n").encode())
async def _readline(self) -> str:
"""Read a line from the subprocess stdout."""
assert self._proc and self._proc.stdout
data = await self._proc.stdout.readline()
if not data:
# Process died — collect stderr for diagnostics
stderr = b""
if self._proc.stderr:
stderr = await self._proc.stderr.read()
raise OcamlBridgeError(
f"OCaml subprocess died unexpectedly. stderr: {stderr.decode(errors='replace')}"
)
return data.decode().rstrip("\n")
async def _read_response(self) -> tuple[str, str | None]:
"""Read a single (ok ...) or (error ...) response.
Returns (kind, value) where kind is "ok" or "error".
"""
line = await self._readline()
return _parse_response(line)
async def _read_until_ok(
self,
ctx: dict[str, Any] | None = None,
) -> str:
"""Read lines until (ok ...) or (error ...).
Handles (io-request ...) by fulfilling IO and sending (io-response ...).
"""
while True:
line = await self._readline()
if line.startswith("(io-request "):
result = await self._handle_io_request(line, ctx)
# Send response back to OCaml
self._send(f"(io-response {_serialize_for_ocaml(result)})")
continue
kind, value = _parse_response(line)
if kind == "error":
raise OcamlBridgeError(value or "Unknown error")
# kind == "ok"
return value or ""
async def _handle_io_request(
self,
line: str,
ctx: dict[str, Any] | None,
) -> Any:
"""Dispatch an io-request to the appropriate Python handler."""
from .parser import parse_all
# Parse the io-request
parsed = parse_all(line)
if not parsed or not isinstance(parsed[0], list):
raise OcamlBridgeError(f"Malformed io-request: {line}")
parts = parsed[0]
# parts = [Symbol("io-request"), name_str, ...args]
if len(parts) < 2:
raise OcamlBridgeError(f"Malformed io-request: {line}")
req_name = _to_str(parts[1])
args = parts[2:]
if req_name == "query":
return await self._io_query(args)
elif req_name == "action":
return await self._io_action(args)
elif req_name == "request-arg":
return self._io_request_arg(args)
elif req_name == "request-method":
return self._io_request_method()
elif req_name == "ctx":
return self._io_ctx(args, ctx)
else:
raise OcamlBridgeError(f"Unknown io-request type: {req_name}")
async def _io_query(self, args: list) -> Any:
"""Handle (io-request "query" service name params...)."""
from shared.infrastructure.internal import fetch_data
service = _to_str(args[0]) if len(args) > 0 else ""
query = _to_str(args[1]) if len(args) > 1 else ""
params = _to_dict(args[2]) if len(args) > 2 else {}
return await fetch_data(service, query, params)
async def _io_action(self, args: list) -> Any:
"""Handle (io-request "action" service name payload...)."""
from shared.infrastructure.internal import call_action
service = _to_str(args[0]) if len(args) > 0 else ""
action = _to_str(args[1]) if len(args) > 1 else ""
payload = _to_dict(args[2]) if len(args) > 2 else {}
return await call_action(service, action, payload)
def _io_request_arg(self, args: list) -> Any:
"""Handle (io-request "request-arg" name)."""
try:
from quart import request
name = _to_str(args[0]) if args else ""
return request.args.get(name)
except RuntimeError:
return None
def _io_request_method(self) -> str:
"""Handle (io-request "request-method")."""
try:
from quart import request
return request.method
except RuntimeError:
return "GET"
def _io_ctx(self, args: list, ctx: dict[str, Any] | None) -> Any:
"""Handle (io-request "ctx" key)."""
if ctx is None:
return None
key = _to_str(args[0]) if args else ""
return ctx.get(key)
# ------------------------------------------------------------------
# Module-level singleton
# ------------------------------------------------------------------
_bridge: OcamlBridge | None = None
async def get_bridge() -> OcamlBridge:
"""Get or create the singleton bridge instance."""
global _bridge
if _bridge is None:
_bridge = OcamlBridge()
if not _bridge._started:
await _bridge.start()
return _bridge
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _escape(s: str) -> str:
"""Escape a string for embedding in an SX string literal."""
return s.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t")
def _parse_response(line: str) -> tuple[str, str | None]:
"""Parse an (ok ...) or (error ...) response line.
Returns (kind, value) tuple.
"""
line = line.strip()
if line == "(ok)":
return ("ok", None)
if line.startswith("(ok "):
value = line[4:-1] # strip (ok and )
# If the value is a quoted string, unquote it
if value.startswith('"') and value.endswith('"'):
value = _unescape(value[1:-1])
return ("ok", value)
if line.startswith("(error "):
msg = line[7:-1]
if msg.startswith('"') and msg.endswith('"'):
msg = _unescape(msg[1:-1])
return ("error", msg)
return ("error", f"Unexpected response: {line}")
def _unescape(s: str) -> str:
"""Unescape an SX string literal."""
return (
s.replace("\\n", "\n")
.replace("\\r", "\r")
.replace("\\t", "\t")
.replace('\\"', '"')
.replace("\\\\", "\\")
)
def _to_str(val: Any) -> str:
"""Convert an SX parsed value to a Python string."""
if isinstance(val, str):
return val
if hasattr(val, "name"):
return val.name
return str(val)
def _to_dict(val: Any) -> dict:
"""Convert an SX parsed value to a Python dict."""
if isinstance(val, dict):
return val
return {}
def _serialize_for_ocaml(val: Any) -> str:
"""Serialize a Python value to SX text for sending to OCaml."""
if val is None:
return "nil"
if isinstance(val, bool):
return "true" if val else "false"
if isinstance(val, (int, float)):
if isinstance(val, float) and val == int(val):
return str(int(val))
return str(val)
if isinstance(val, str):
return f'"{_escape(val)}"'
if isinstance(val, (list, tuple)):
items = " ".join(_serialize_for_ocaml(v) for v in val)
return f"(list {items})"
if isinstance(val, dict):
pairs = " ".join(
f":{k} {_serialize_for_ocaml(v)}" for k, v in val.items()
)
return "{" + pairs + "}"
return f'"{_escape(str(val))}"'

View File

@@ -0,0 +1,220 @@
"""Tests for the OCaml SX bridge."""
import asyncio
import os
import sys
import unittest
# Add project root to path
_project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../.."))
if _project_root not in sys.path:
sys.path.insert(0, _project_root)
from shared.sx.ocaml_bridge import OcamlBridge, OcamlBridgeError, _escape, _parse_response, _serialize_for_ocaml
class TestHelpers(unittest.TestCase):
"""Test helper functions (no subprocess needed)."""
def test_escape_basic(self):
self.assertEqual(_escape('hello'), 'hello')
self.assertEqual(_escape('say "hi"'), 'say \\"hi\\"')
self.assertEqual(_escape('a\\b'), 'a\\\\b')
self.assertEqual(_escape('line\nbreak'), 'line\\nbreak')
def test_parse_response_ok_empty(self):
self.assertEqual(_parse_response("(ok)"), ("ok", None))
def test_parse_response_ok_number(self):
self.assertEqual(_parse_response("(ok 3)"), ("ok", "3"))
def test_parse_response_ok_string(self):
kind, val = _parse_response('(ok "<div>hi</div>")')
self.assertEqual(kind, "ok")
self.assertEqual(val, "<div>hi</div>")
def test_parse_response_error(self):
kind, val = _parse_response('(error "something broke")')
self.assertEqual(kind, "error")
self.assertEqual(val, "something broke")
def test_serialize_none(self):
self.assertEqual(_serialize_for_ocaml(None), "nil")
def test_serialize_bool(self):
self.assertEqual(_serialize_for_ocaml(True), "true")
self.assertEqual(_serialize_for_ocaml(False), "false")
def test_serialize_number(self):
self.assertEqual(_serialize_for_ocaml(42), "42")
self.assertEqual(_serialize_for_ocaml(3.14), "3.14")
def test_serialize_string(self):
self.assertEqual(_serialize_for_ocaml("hello"), '"hello"')
self.assertEqual(_serialize_for_ocaml('say "hi"'), '"say \\"hi\\""')
def test_serialize_list(self):
self.assertEqual(_serialize_for_ocaml([1, 2, 3]), "(list 1 2 3)")
def test_serialize_dict(self):
result = _serialize_for_ocaml({"a": 1})
self.assertEqual(result, "{:a 1}")
class TestBridge(unittest.IsolatedAsyncioTestCase):
"""Integration tests — require the OCaml binary to be built."""
@classmethod
def setUpClass(cls):
# Check if binary exists
from shared.sx.ocaml_bridge import _DEFAULT_BIN
bin_path = os.path.abspath(_DEFAULT_BIN)
if not os.path.isfile(bin_path):
raise unittest.SkipTest(
f"OCaml binary not found at {bin_path}. "
f"Build with: cd hosts/ocaml && eval $(opam env) && dune build"
)
async def asyncSetUp(self):
self.bridge = OcamlBridge()
await self.bridge.start()
async def asyncTearDown(self):
await self.bridge.stop()
async def test_ping(self):
self.assertTrue(await self.bridge.ping())
async def test_eval_arithmetic(self):
result = await self.bridge.eval("(+ 1 2)")
self.assertEqual(result, "3")
async def test_eval_string(self):
result = await self.bridge.eval('(str "hello" " " "world")')
self.assertIn("hello world", result)
async def test_render_simple(self):
html = await self.bridge.render('(div (p "hello"))')
self.assertEqual(html, "<div><p>hello</p></div>")
async def test_render_attrs(self):
html = await self.bridge.render('(div :class "card" (p "hi"))')
self.assertIn('class="card"', html)
self.assertIn("<p>hi</p>", html)
async def test_render_void_element(self):
html = await self.bridge.render("(br)")
self.assertEqual(html, "<br />")
async def test_load_source_defcomp(self):
count = await self.bridge.load_source(
'(defcomp ~test-card (&key title) (div :class "card" (h2 title)))'
)
self.assertEqual(count, 1)
html = await self.bridge.render('(~test-card :title "Hello")')
self.assertIn("Hello", html)
self.assertIn("card", html)
async def test_reset(self):
await self.bridge.load_source("(define x 42)")
result = await self.bridge.eval("x")
self.assertEqual(result, "42")
await self.bridge.reset()
with self.assertRaises(OcamlBridgeError):
await self.bridge.eval("x")
async def test_render_conditional(self):
html = await self.bridge.render('(if true (p "yes") (p "no"))')
self.assertEqual(html, "<p>yes</p>")
async def test_render_let(self):
html = await self.bridge.render('(let (x "hi") (p x))')
self.assertEqual(html, "<p>hi</p>")
async def test_render_map(self):
html = await self.bridge.render(
"(map (lambda (x) (li x)) (list \"a\" \"b\" \"c\"))"
)
self.assertEqual(html, "<li>a</li><li>b</li><li>c</li>")
async def test_render_fragment(self):
html = await self.bridge.render('(<> (p "a") (p "b"))')
self.assertEqual(html, "<p>a</p><p>b</p>")
async def test_eval_error(self):
with self.assertRaises(OcamlBridgeError):
await self.bridge.eval("(undefined-symbol-xyz)")
async def test_render_component_with_children(self):
await self.bridge.load_source(
'(defcomp ~wrapper (&rest children) (div :class "wrap" children))'
)
html = await self.bridge.render('(~wrapper (p "inside"))')
self.assertIn("wrap", html)
self.assertIn("<p>inside</p>", html)
async def test_render_macro(self):
await self.bridge.load_source(
"(defmacro unless (cond &rest body) (list 'if (list 'not cond) (cons 'do body)))"
)
html = await self.bridge.render('(unless false (p "shown"))')
self.assertEqual(html, "<p>shown</p>")
# ------------------------------------------------------------------
# ListRef regression tests — the `list` primitive returns ListRef
# (mutable), not List (immutable). Macro expansions that construct
# AST via `list` produce ListRef nodes. The renderer must handle
# both List and ListRef at every structural match point.
# ------------------------------------------------------------------
async def test_render_macro_generates_cond(self):
"""Macro that programmatically builds a (cond ...) with list."""
await self.bridge.load_source(
"(defmacro pick (x) "
" (list 'cond "
" (list (list '= x 1) '(p \"one\")) "
" (list (list '= x 2) '(p \"two\")) "
" (list ':else '(p \"other\"))))"
)
html = await self.bridge.render("(pick 2)")
self.assertEqual(html, "<p>two</p>")
async def test_render_macro_generates_let(self):
"""Macro that programmatically builds a (let ...) with list."""
await self.bridge.load_source(
"(defmacro with-greeting (name &rest body) "
" (list 'let (list (list 'greeting (list 'str \"Hello \" name))) "
" (cons 'do body)))"
)
html = await self.bridge.render('(with-greeting "World" (p greeting))')
self.assertEqual(html, "<p>Hello World</p>")
async def test_render_macro_nested_html_tags(self):
"""Macro expansion containing nested HTML tags via list."""
await self.bridge.load_source(
"(defmacro card (title &rest body) "
" (list 'div ':class \"card\" "
" (list 'h2 title) "
" (cons 'do body)))"
)
html = await self.bridge.render('(card "Title" (p "content"))')
self.assertIn('<div class="card">', html)
self.assertIn("<h2>Title</h2>", html)
self.assertIn("<p>content</p>", html)
async def test_render_eval_returns_listref(self):
"""Values created at runtime via (list ...) are ListRef."""
await self.bridge.load_source(
"(define make-items (lambda () (list "
' (list "a") (list "b") (list "c"))))'
)
html = await self.bridge.render(
"(ul (map (lambda (x) (li (first x))) (make-items)))"
)
self.assertIn("<li>a</li>", html)
self.assertIn("<li>b</li>", html)
self.assertIn("<li>c</li>", html)
if __name__ == "__main__":
unittest.main()