Files
rose-ash/shared/sx/ocaml_bridge.py
giles 57cffb8bcc Fix isomorphic SSR: revert inline opcodes, add named let compilation, fix cookie decode
Three bugs broke island SSR rendering of the home stepper widget:

1. Inline VM opcodes (OP_ADD..OP_DEC) broke JIT-compiled functions.
   The compiler emitted single-byte opcodes for first/rest/len/= etc.
   that produced wrong results in complex recursive code (sx-parse
   returned nil, split-tag produced 1 step instead of 16). Reverted
   compiler to use CALL_PRIM for all primitives. VM opcode handlers
   kept for future use.

2. Named let (let loop ((x init)) body) had no compiler support —
   silently produced broken bytecode. Added desugaring to letrec.

3. URL-encoded cookie values not decoded server-side. Client set-cookie
   uses encodeURIComponent but Werkzeug doesn't decode cookie values.
   Added unquote() in bridge cookie injection.

Also: call-lambda used eval_expr which copies Dict values (signals),
breaking mutations through aser lambda calls. Switched to cek_call.

Also: stepper preview now includes ~cssx/tw spreads for SSR styling.

Tests: 1317 JS, 1114 OCaml, 26 integration (2 pre-existing failures)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 22:32:51 +00:00

974 lines
40 KiB
Python

"""
OCaml SX kernel ↔ Python coroutine bridge.
Manages a persistent OCaml subprocess (sx_server) that evaluates SX
expressions. When the OCaml kernel needs IO (database queries, service
calls), it yields an ``(io-request ...)`` back to Python, which fulfills
it asynchronously and sends an ``(io-response ...)`` back.
Usage::
bridge = OcamlBridge()
await bridge.start()
html = await bridge.render('(div (p "hello"))')
await bridge.stop()
"""
from __future__ import annotations

import asyncio
import logging
import os
from typing import Any

_logger = logging.getLogger("sx.ocaml")

# Default binary path — can be overridden via SX_OCAML_BIN env var.
# Resolved relative to this file; see OcamlBridge.__init__ for precedence.
_DEFAULT_BIN = os.path.join(
    os.path.dirname(__file__),
    "../../hosts/ocaml/_build/default/bin/sx_server.exe",
)
class OcamlBridgeError(Exception):
    """Raised for errors reported by the OCaml SX kernel or pipe-protocol failures."""
class OcamlBridge:
    """Async bridge to a persistent OCaml SX subprocess.

    All commands are serialized through an internal asyncio.Lock; responses
    are epoch-tagged so stale messages from failed requests are discarded.
    """
def __init__(self, binary: str | None = None):
self._binary = binary or os.environ.get("SX_OCAML_BIN") or _DEFAULT_BIN
self._proc: asyncio.subprocess.Process | None = None
self._lock = asyncio.Lock()
self._in_io_handler = False # re-entrancy guard
self._started = False
self._components_loaded = False
self._helpers_injected = False
self._io_cache: dict[tuple, Any] = {} # (name, args...) → cached result
self._epoch: int = 0 # request epoch — monotonically increasing
    async def start(self) -> None:
        """Launch the OCaml subprocess and wait for (ready).

        Idempotent: returns immediately if already started.

        Raises:
            FileNotFoundError: the sx_server binary does not exist.
            OcamlBridgeError: the kernel did not greet with (ready).
        """
        if self._started:
            return
        bin_path = os.path.abspath(self._binary)
        if not os.path.isfile(bin_path):
            raise FileNotFoundError(
                f"OCaml SX server binary not found: {bin_path}\n"
                f"Build with: cd hosts/ocaml && eval $(opam env) && dune build"
            )
        _logger.info("Starting OCaml SX kernel: %s", bin_path)
        import sys
        self._proc = await asyncio.create_subprocess_exec(
            bin_path,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=sys.stderr,  # kernel timing/debug to container logs
            limit=10 * 1024 * 1024,  # 10MB readline buffer for large spec data
        )
        # Wait for the (ready) handshake before accepting commands.
        line = await self._readline()
        if line != "(ready)":
            raise OcamlBridgeError(f"Expected (ready), got: {line!r}")
        self._started = True
        # Verify engine identity with an initial ping; a non-ok reply is
        # tolerated and logged as engine "unknown".
        await self._send_command("(ping)")
        kind, engine = await self._read_response()
        engine_name = engine if kind == "ok" else "unknown"
        _logger.info("OCaml SX kernel ready (pid=%d, engine=%s)", self._proc.pid, engine_name)
    async def stop(self) -> None:
        """Terminate the subprocess.

        Closes stdin (letting the kernel exit cleanly on EOF), waits up
        to 5 seconds, then kills the process. Safe to call when already
        stopped.
        """
        if self._proc and self._proc.returncode is None:
            self._proc.stdin.close()
            try:
                await asyncio.wait_for(self._proc.wait(), timeout=5.0)
            except asyncio.TimeoutError:
                # Kernel did not exit on EOF — force-kill it.
                self._proc.kill()
                await self._proc.wait()
            _logger.info("OCaml SX kernel stopped")
        self._proc = None
        self._started = False
    async def _restart(self) -> None:
        """Kill and restart the OCaml subprocess to recover from pipe desync.

        Resets the loaded/injected flags so components and helpers are
        re-loaded into the fresh kernel on next use.
        """
        _logger.warning("Restarting OCaml SX kernel (pipe recovery)")
        if self._proc and self._proc.returncode is None:
            # Hard kill — the pipe state is untrusted, no graceful shutdown.
            self._proc.kill()
            await self._proc.wait()
        self._proc = None
        self._started = False
        self._components_loaded = False
        self._helpers_injected = False
        await self.start()
async def ping(self) -> str:
"""Health check — returns engine name (e.g. 'ocaml-cek')."""
async with self._lock:
await self._send_command("(ping)")
kind, value = await self._read_response()
return value or "" if kind == "ok" else ""
async def load(self, path: str) -> int:
"""Load an .sx file for side effects (defcomp, define, defmacro)."""
async with self._lock:
await self._send_command(f'(load "{_escape(path)}")')
value = await self._read_until_ok(ctx=None)
return int(float(value)) if value else 0
async def load_source(self, source: str) -> int:
"""Evaluate SX source for side effects."""
async with self._lock:
await self._send_command(f'(load-source "{_escape(source)}")')
value = await self._read_until_ok(ctx=None)
return int(float(value)) if value else 0
    async def eval(self, source: str, ctx: dict[str, Any] | None = None) -> str:
        """Evaluate SX expression, return serialized result.

        Supports io-requests (helper calls, query, action, etc.) via the
        coroutine bridge, just like render(). The source is sent with the
        binary blob protocol, avoiding SX string escaping.

        ctx: optional request context made available to io-request handlers.
        """
        await self._ensure_components()
        async with self._lock:
            await self._send_command('(eval-blob)')
            await self._send_blob(source)
            return await self._read_until_ok(ctx)
    async def render(
        self,
        source: str,
        ctx: dict[str, Any] | None = None,
    ) -> str:
        """Render SX to HTML, handling io-requests via Python async IO.

        source: SX expression, embedded directly in the command string
            (escaped — unlike eval(), this path does not use the blob
            protocol).
        ctx: optional request context for io-request handlers.
        """
        await self._ensure_components()
        async with self._lock:
            await self._send_command(f'(render "{_escape(source)}")')
            return await self._read_until_ok(ctx)
    async def aser(self, source: str, ctx: dict[str, Any] | None = None) -> str:
        """Evaluate SX and return SX wire format, handling io-requests.

        Uses the binary blob protocol for the source to avoid escaping
        round-trips.
        """
        await self._ensure_components()
        async with self._lock:
            await self._send_command('(aser-blob)')
            await self._send_blob(source)
            return await self._read_until_ok(ctx)
    async def aser_slot(self, source: str, ctx: dict[str, Any] | None = None) -> str:
        """Like aser() but expands ALL components server-side.

        Equivalent to Python's async_eval_slot_to_sx — used for layout
        slots where component bodies need server-side IO evaluation.
        """
        await self._ensure_components()
        async with self._lock:
            # Inject helpers inside the lock to avoid pipe desync —
            # a separate lock acquisition could let another coroutine
            # interleave commands between injection and aser-slot.
            await self._inject_helpers_locked()
            await self._send_command('(aser-slot-blob)')
            await self._send_blob(source)
            return await self._read_until_ok(ctx)
    # Class-level default; the first successful injection sets it True
    # as an instance attribute, so statics are sent once per bridge.
    _shell_statics_injected: bool = False

    async def _inject_shell_statics_locked(self) -> None:
        """Inject cached shell static data into kernel. MUST hold lock.

        Each static key becomes a kernel variable named
        ``__shell-<key-with-dashes>``. Individual injection failures are
        logged and skipped so one bad value cannot block the rest.
        Returns silently when shell statics are not yet available
        (e.g. no app context).
        """
        if self._shell_statics_injected:
            return
        from .helpers import _get_shell_static
        try:
            static = _get_shell_static()
        except Exception:
            return  # not ready yet (no app context)
        # Only inject small, safe values as kernel variables.
        # Large/complex blobs use placeholder tokens at render time.
        for key in ("component_hash", "sx_css_classes", "asset_url",
                    "sx_js_hash", "body_js_hash"):
            val = static.get(key) or ""
            var = f"__shell-{key.replace('_', '-')}"
            defn = f'(define {var} "{_escape(str(val))}")'
            try:
                # Double-escaped: defn is itself embedded in a load-source string.
                await self._send_command(f'(load-source "{_escape(defn)}")')
                await self._read_until_ok(ctx=None)
            except OcamlBridgeError as e:
                _logger.warning("Shell static inject failed for %s: %s", key, e)
        # List/nil values
        for key in ("head_scripts", "body_scripts"):
            val = static.get(key)
            var = f"__shell-{key.replace('_', '-')}"
            if val is None:
                defn = f'(define {var} nil)'
            elif isinstance(val, list):
                items = " ".join(f'"{_escape(str(v))}"' for v in val)
                defn = f'(define {var} (list {items}))'
            else:
                defn = f'(define {var} "{_escape(str(val))}")'
            try:
                await self._send_command(f'(load-source "{_escape(defn)}")')
                await self._read_until_ok(ctx=None)
            except OcamlBridgeError as e:
                _logger.warning("Shell static inject failed for %s: %s", key, e)
        self._shell_statics_injected = True
        _logger.info("Injected shell statics into OCaml kernel")
    async def _inject_request_cookies_locked(self) -> None:
        """Send current request cookies to kernel for get-cookie primitive.

        MUST be called with the lock held. Best-effort: runs only inside a
        request context, and injection failures are logged at debug level
        and otherwise ignored.
        """
        try:
            from quart import request
            cookies = request.cookies
        except Exception:
            return  # no request context (CLI mode, tests)
        if not cookies:
            return
        # Build SX dict: {:name1 "val1" :name2 "val2"}
        # Cookie values may be URL-encoded (client set-cookie uses
        # encodeURIComponent) — decode before sending to kernel.
        from urllib.parse import unquote
        pairs = []
        for k, v in cookies.items():
            pairs.append(f':{k} "{_escape(unquote(str(v)))}"')
        if pairs:
            cmd = f'(set-request-cookies {{{" ".join(pairs)}}})'
            try:
                await self._send_command(cmd)
                await self._read_until_ok(ctx=None)
            except OcamlBridgeError as e:
                _logger.debug("Cookie inject failed: %s", e)
    async def sx_page_full(
        self,
        page_source: str,
        shell_kwargs: dict[str, Any],
        ctx: dict[str, Any] | None = None,
    ) -> str:
        """Render full page HTML in one OCaml call: aser-slot + shell render.

        Static data (component_defs, CSS, pages_sx) is pre-injected as
        kernel vars on first call. Per-request command sends only small
        values (title, csrf) + references to the kernel vars.

        page_source: SX wire format for the page body (sent as a blob).
        shell_kwargs: shell values; each key is routed one of three ways —
            placeholder token (large blobs), kernel-variable reference
            (pre-injected statics), or inline literal.
        ctx: request context forwarded to io-request handlers.
        """
        await self._ensure_components()
        async with self._lock:
            await self._inject_helpers_locked()
            await self._inject_shell_statics_locked()
            # Send request cookies so get-cookie works during SSR
            await self._inject_request_cookies_locked()
            # Large/complex blobs use placeholders — OCaml renders the shell
            # with short tokens; Python splices in the real values post-render.
            # This avoids piping large strings or strings with special chars
            # through the SX parser.
            PLACEHOLDER_KEYS = {"component_defs", "pages_sx", "init_sx",
                                "sx_css", "inline_css", "inline_head_js"}
            placeholders = {}
            static_keys = {"component_hash", "sx_css_classes", "asset_url",
                           "sx_js_hash", "body_js_hash",
                           "head_scripts", "body_scripts"}
            # page_source is SX wire format that may contain \" escapes.
            # Send via binary blob protocol to avoid double-escaping
            # through the SX string parser round-trip.
            parts = ['(sx-page-full-blob']
            for key, val in shell_kwargs.items():
                k = key.replace("_", "-")
                if key in PLACEHOLDER_KEYS:
                    # Short token now; real value spliced in after render.
                    token = f"__SLOT_{key.upper()}__"
                    placeholders[token] = str(val) if val else ""
                    parts.append(f' :{k} "{token}"')
                elif key in static_keys:
                    # Reference the pre-injected kernel variable.
                    parts.append(f' :{k} __shell-{k}')
                elif val is None:
                    parts.append(f' :{k} nil')
                elif isinstance(val, bool):
                    parts.append(f' :{k} {"true" if val else "false"}')
                elif isinstance(val, list):
                    items = " ".join(f'"{_escape(str(v))}"' for v in val)
                    parts.append(f' :{k} ({items})')
                else:
                    parts.append(f' :{k} "{_escape(str(val))}"')
            parts.append(")")
            cmd = "".join(parts)
            await self._send_command(cmd)
            # Send page source as binary blob (avoids string-escape issues)
            await self._send_blob(page_source)
            html = await self._read_until_ok(ctx)
            # Splice in large blobs
            for token, blob in placeholders.items():
                html = html.replace(token, blob)
            return html
    async def _inject_helpers_locked(self) -> None:
        """Inject page helpers into the kernel. MUST be called with lock held.

        Each helper becomes an SX fn forwarding its positional args through
        a (helper "name" ...) call. The injected flag is set optimistically
        up front and rolled back on failure/empty so a later call retries.
        """
        if self._helpers_injected:
            return
        self._helpers_injected = True
        try:
            from .pages import get_page_helpers
            import inspect
            helpers = get_page_helpers("sx")
            if not helpers:
                self._helpers_injected = False
                return
            count = 0
            for name, fn in helpers.items():
                # Names starting with "~" are skipped.
                if callable(fn) and not name.startswith("~"):
                    # Derive positional arity from the Python signature;
                    # non-introspectable callables default to 2 args.
                    try:
                        sig = inspect.signature(fn)
                        nargs = sum(1 for p in sig.parameters.values()
                                    if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD))
                    except (ValueError, TypeError):
                        nargs = 2
                    nargs = max(nargs, 1)
                    # Synthetic parameter names a, b, c, ... (one per arg).
                    param_names = " ".join(chr(97 + i) for i in range(nargs))
                    arg_list = " ".join(chr(97 + i) for i in range(nargs))
                    sx_def = f'(define {name} (fn ({param_names}) (helper "{name}" {arg_list})))'
                    try:
                        await self._send_command(f'(load-source "{_escape(sx_def)}")')
                        await self._read_until_ok(ctx=None)
                        count += 1
                    except OcamlBridgeError:
                        pass
            _logger.info("Injected %d page helpers into OCaml kernel", count)
        except Exception as e:
            _logger.warning("Helper injection failed: %s", e)
            self._helpers_injected = False
    async def _compile_adapter_module(self) -> None:
        """Compile adapter-sx.sx to bytecode and load as a VM module.

        Previously used Python's sx_ref.py evaluator for compilation.
        Now the OCaml kernel handles JIT compilation natively — this method
        is a no-op kept for interface stability. The kernel's own JIT hook
        compiles functions on first call.
        """
        _logger.info("Adapter module compilation delegated to OCaml kernel JIT")
    async def _ensure_components(self) -> None:
        """Load all .sx source files into the kernel on first use.

        Errors during loading are handled gracefully — IO responses are
        always sent back to keep the pipe clean. The loaded flag is set
        optimistically and cleared on outer failure so the next call
        retries.
        """
        if self._components_loaded:
            return
        self._components_loaded = True
        try:
            from .jinja_bridge import _watched_dirs, _dirs_from_cache
            import glob
            # Skip patterns — files that use constructs not available in the kernel
            skip_names = {"boundary.sx", "forms.sx"}
            skip_dirs = {"tests"}
            # Collect files to load (order matters: spec → web libs → app dirs)
            all_files: list[str] = []
            # Spec files needed by aser + bytecode compiler
            spec_dir = os.path.join(os.path.dirname(__file__), "../../spec")
            for spec_file in ["parser.sx", "render.sx", "bytecode.sx", "compiler.sx"]:
                path = os.path.normpath(os.path.join(spec_dir, spec_file))
                if os.path.isfile(path):
                    all_files.append(path)
            # All directories loaded into the Python env
            all_dirs = list(set(_watched_dirs) | _dirs_from_cache)
            # Isomorphic libraries: signals, rendering, freeze scopes, web forms
            web_dir = os.path.join(os.path.dirname(__file__), "../../web")
            if os.path.isdir(web_dir):
                for web_file in ["signals.sx", "adapter-html.sx", "adapter-sx.sx",
                                 "web-forms.sx"]:
                    path = os.path.normpath(os.path.join(web_dir, web_file))
                    if os.path.isfile(path):
                        all_files.append(path)
            # Spec library files (loaded after adapters)
            for spec_lib in ["freeze.sx"]:
                path = os.path.normpath(os.path.join(spec_dir, spec_lib))
                if os.path.isfile(path):
                    all_files.append(path)
            for directory in sorted(all_dirs):
                files = sorted(
                    glob.glob(os.path.join(directory, "**", "*.sx"), recursive=True)
                )
                for filepath in files:
                    basename = os.path.basename(filepath)
                    # Skip known-bad files
                    if basename in skip_names:
                        continue
                    # Skip test and handler directories
                    parts = filepath.replace("\\", "/").split("/")
                    if any(d in skip_dirs for d in parts):
                        continue
                    all_files.append(filepath)
            # Load all files under a single lock
            count = 0
            skipped = 0
            async with self._lock:
                for filepath in all_files:
                    try:
                        await self._send_command(f'(load "{_escape(filepath)}")')
                        value = await self._read_until_ok(ctx=None)
                        # Response may be a number (count) or a value — just count files
                        count += 1
                    except OcamlBridgeError as e:
                        skipped += 1
                        _logger.warning("OCaml load skipped %s: %s",
                                        filepath, e)
                # SSR overrides: effect is a no-op on the server (prevents
                # reactive loops during island SSR — effects are DOM side-effects)
                try:
                    noop_dispose = '(fn () nil)'
                    await self._send_command(f'(load-source "(define effect (fn (f) {noop_dispose}))")')
                    await self._read_until_ok(ctx=None)
                except OcamlBridgeError:
                    pass
                # Register JIT hook — lambdas compile on first call
                try:
                    await self._send_command('(vm-compile-adapter)')
                    await self._read_until_ok(ctx=None)
                    _logger.info("JIT hook registered — lambdas compile on first call")
                except OcamlBridgeError as e:
                    _logger.warning("JIT hook registration skipped: %s", e)
            _logger.info("Loaded %d definitions from .sx files into OCaml kernel (%d skipped)",
                         count, skipped)
        except Exception as e:
            _logger.error("Failed to load .sx files into OCaml kernel: %s", e)
            self._components_loaded = False  # retry next time
    async def inject_page_helpers(self, helpers: dict) -> None:
        """Register page helpers as IO-routing definitions in the kernel.

        Each helper becomes a variadic SX function that yields
        (io-request "helper" name ...), routing the call back to Python
        via the coroutine bridge. Individual definition failures are
        silently ignored (non-fatal).
        """
        await self._ensure_components()
        async with self._lock:
            count = 0
            for name, fn in helpers.items():
                # Names starting with "~" are skipped.
                if callable(fn) and not name.startswith("~"):
                    sx_def = f'(define {name} (fn (&rest args) (apply helper (concat (list "{name}") args))))'
                    try:
                        await self._send_command(f'(load-source "{_escape(sx_def)}")')
                        await self._read_until_ok(ctx=None)
                        count += 1
                    except OcamlBridgeError:
                        pass  # non-fatal
            if count:
                _logger.info("Injected %d page helpers into OCaml kernel", count)
async def reset(self) -> None:
"""Reset the kernel environment to pristine state."""
async with self._lock:
await self._send_command("(reset)")
kind, value = await self._read_response()
if kind == "error":
raise OcamlBridgeError(f"reset: {value}")
# ------------------------------------------------------------------
# Internal protocol handling
# ------------------------------------------------------------------
    async def _send(self, line: str) -> None:
        """Write a line to the subprocess stdin and flush.

        Raises OcamlBridgeError if called re-entrantly from inside an IO
        handler — such a call would deadlock on the already-held lock.
        """
        if self._in_io_handler:
            raise OcamlBridgeError(
                f"Re-entrant bridge call from IO handler: {line[:80]}. "
                f"IO handlers must not call the bridge — use Python-only code."
            )
        assert self._proc and self._proc.stdin
        _logger.debug("SEND: %s", line[:120])
        self._proc.stdin.write((line + "\n").encode())
        await self._proc.stdin.drain()
    async def _send_command(self, line: str) -> None:
        """Send a command with a fresh epoch prefix.

        Increments the epoch counter and sends (epoch N) before the
        actual command. The OCaml kernel tags all responses with this
        epoch so stale messages from previous requests are discarded.

        Note: unlike _send(), this writes directly (no re-entrancy check)
        and both lines are drained together.
        """
        self._epoch += 1
        assert self._proc and self._proc.stdin
        _logger.debug("EPOCH %d SEND: %s", self._epoch, line[:120])
        self._proc.stdin.write(f"(epoch {self._epoch})\n".encode())
        self._proc.stdin.write((line + "\n").encode())
        await self._proc.stdin.drain()
    async def _send_blob(self, data: str) -> None:
        """Send a length-prefixed binary blob to the subprocess.

        Protocol: sends "(blob N)\\n" followed by exactly N bytes, then "\\n".
        The OCaml side reads the length, then reads exactly N bytes.
        This avoids string-escape round-trip issues for SX wire format.
        N is the UTF-8 encoded byte length, not the character count.
        """
        assert self._proc and self._proc.stdin
        encoded = data.encode()
        self._proc.stdin.write(f"(blob {len(encoded)})\n".encode())
        self._proc.stdin.write(encoded)
        self._proc.stdin.write(b"\n")
        await self._proc.stdin.drain()
    async def _readline(self) -> str:
        """Read a line from the subprocess stdout.

        Returns the line without its trailing newline.

        Raises:
            OcamlBridgeError: EOF on stdout, i.e. the subprocess died.
        """
        assert self._proc and self._proc.stdout
        data = await self._proc.stdout.readline()
        if not data:
            # Empty read means EOF — the process died.
            raise OcamlBridgeError(
                "OCaml subprocess died unexpectedly (check container stderr)"
            )
        line = data.decode().rstrip("\n")
        _logger.debug("RECV: %s", line[:120])
        return line
    async def _read_response(self) -> tuple[str, str | None]:
        """Read a single (ok ...) or (error ...) response.

        Returns (kind, value) where kind is "ok" or "error".
        Discards stale epoch messages, draining the payload bytes of any
        stale (ok-len ...) so the pipe stays aligned. Unlike
        _read_until_ok(), this does NOT service io-requests.
        """
        while True:
            line = await self._readline()
            if not self._is_current_epoch(line):
                _logger.debug("Discarding stale response: %s", line[:80])
                # A stale ok-len still has N payload bytes on the pipe —
                # drain them so the next read starts on a line boundary.
                if line.startswith("(ok-len "):
                    parts = line[1:-1].split()
                    if len(parts) >= 3:
                        n = int(parts[-1])
                        assert self._proc and self._proc.stdout
                        await self._proc.stdout.readexactly(n)
                        await self._proc.stdout.readline()
                continue
            # Length-prefixed blob: (ok-len EPOCH N) or (ok-len N)
            if line.startswith("(ok-len "):
                parts = line[1:-1].split()
                n = int(parts[-1])
                assert self._proc and self._proc.stdout
                data = await self._proc.stdout.readexactly(n)
                await self._proc.stdout.readline()  # trailing newline
                return ("ok", data.decode())
            return _parse_response(line)
def _is_current_epoch(self, line: str) -> bool:
"""Check if a response line belongs to the current epoch.
Lines tagged with a stale epoch are discarded. Untagged lines
(from a kernel that predates the epoch protocol) are accepted.
"""
# Extract epoch number from known tagged formats:
# (ok EPOCH ...), (error EPOCH ...), (ok-len EPOCH N),
# (io-request EPOCH ...), (io-done EPOCH N)
import re
m = re.match(r'\((?:ok|error|ok-len|ok-raw|io-request|io-done)\s+(\d+)\b', line)
if m:
return int(m.group(1)) == self._epoch
# Untagged (legacy) — accept
return True
    async def _read_until_ok(
        self,
        ctx: dict[str, Any] | None = None,
    ) -> str:
        """Read lines until (ok ...) or (error ...).

        Handles IO requests in two modes:
        - Legacy (blocking): single io-request → immediate io-response
        - Batched: collect io-requests until (io-done N), process ALL
          concurrently with asyncio.gather, send responses in order

        Lines tagged with a stale epoch are silently discarded, making
        pipe desync from previous failed requests impossible.

        Raises:
            OcamlBridgeError: the kernel replied with (error ...).
        """
        import asyncio
        pending_batch: list[str] = []
        while True:
            line = await self._readline()
            # Discard stale epoch messages
            if not self._is_current_epoch(line):
                _logger.debug("Discarding stale epoch message: %s", line[:80])
                # If it's a stale ok-len, drain the blob bytes too
                if line.startswith("(ok-len "):
                    parts = line[1:-1].split()
                    if len(parts) >= 3:
                        n = int(parts[2])
                        assert self._proc and self._proc.stdout
                        await self._proc.stdout.readexactly(n)
                        await self._proc.stdout.readline()
                continue
            if line.startswith("(io-request "):
                # New format: (io-request EPOCH ...) or (io-request EPOCH ID ...)
                # Strip epoch from the line for IO dispatch
                after = line[len("(io-request "):].lstrip()
                # Skip epoch number if present
                if after and after[0].isdigit():
                    # Could be epoch or batch ID — check for second number.
                    # NOTE(review): (io-request ID "name") — old batched with
                    # no epoch — is indistinguishable from (io-request EPOCH
                    # "name") and takes the immediate-response branch.
                    parts = after.split(None, 2)
                    if len(parts) >= 2 and parts[1][0].isdigit():
                        # (io-request EPOCH ID "name" args...) — batched with epoch
                        pending_batch.append(line)
                        continue
                    elif len(parts) >= 2 and parts[1].startswith('"'):
                        # (io-request EPOCH "name" args...) — legacy with epoch
                        try:
                            result = await self._handle_io_request(line, ctx)
                            await self._send(
                                f"(io-response {self._epoch} {_serialize_for_ocaml(result)})")
                        except Exception as e:
                            _logger.warning("IO request failed, sending nil: %s", e)
                            await self._send(f"(io-response {self._epoch} nil)")
                        continue
                    else:
                        # Old format: (io-request ID "name" ...) — batched, no epoch
                        pending_batch.append(line)
                        continue
                # Legacy blocking mode — respond immediately
                try:
                    result = await self._handle_io_request(line, ctx)
                    await self._send(
                        f"(io-response {self._epoch} {_serialize_for_ocaml(result)})")
                except Exception as e:
                    _logger.warning("IO request failed, sending nil: %s", e)
                    await self._send(f"(io-response {self._epoch} nil)")
                continue
            if line.startswith("(io-done "):
                # Batch complete — process all pending IO concurrently
                tasks = [self._handle_io_request(req, ctx)
                         for req in pending_batch]
                results = await asyncio.gather(*tasks, return_exceptions=True)
                # Responses are sent in the original request order.
                for result in results:
                    if isinstance(result, BaseException):
                        _logger.warning("Batched IO failed: %s", result)
                        await self._send(f"(io-response {self._epoch} nil)")
                    else:
                        await self._send(
                            f"(io-response {self._epoch} {_serialize_for_ocaml(result)})")
                pending_batch = []
                continue
            # Length-prefixed blob: (ok-len EPOCH N) or (ok-len N)
            if line.startswith("(ok-len "):
                parts = line[1:-1].split()  # ["ok-len", epoch, n] or ["ok-len", n]
                n = int(parts[-1])  # last number is always byte count
                assert self._proc and self._proc.stdout
                data = await self._proc.stdout.readexactly(n)
                # Read trailing newline
                await self._proc.stdout.readline()
                return data.decode()
            kind, value = _parse_response(line)
            if kind == "error":
                raise OcamlBridgeError(value or "Unknown error")
            # kind == "ok"
            return value or ""
    async def _handle_io_request(
        self,
        line: str,
        ctx: dict[str, Any] | None,
    ) -> Any:
        """Dispatch an io-request to the appropriate Python handler.

        IO handlers MUST NOT call the bridge (eval/aser/render) — doing so
        would deadlock since the lock is already held. The _in_io_handler
        flag triggers an immediate error if this rule is violated.
        """
        self._in_io_handler = True
        try:
            return await self._dispatch_io(line, ctx)
        finally:
            # Always clear the guard, even when the handler raises.
            self._in_io_handler = False
    async def _dispatch_io(
        self,
        line: str,
        ctx: dict[str, Any] | None,
    ) -> Any:
        """Inner dispatch for IO requests.

        Parses the io-request line, skips an optional numeric batch ID,
        and routes by request name to the matching handler.

        Raises:
            OcamlBridgeError: malformed line or unknown request type.
        """
        from .parser import parse_all
        # Parse the io-request
        parsed = parse_all(line)
        if not parsed or not isinstance(parsed[0], list):
            raise OcamlBridgeError(f"Malformed io-request: {line}")
        parts = parsed[0]
        # Legacy: [Symbol("io-request"), name_str, ...args]
        # Batched: [Symbol("io-request"), id_num, name_str, ...args]
        if len(parts) < 2:
            raise OcamlBridgeError(f"Malformed io-request: {line}")
        # Skip numeric batch ID if present
        offset = 1
        if isinstance(parts[1], (int, float)):
            offset = 2
        req_name = _to_str(parts[offset])
        args = parts[offset + 1:]
        if req_name == "query":
            return await self._io_query(args)
        elif req_name == "action":
            return await self._io_action(args)
        elif req_name == "request-arg":
            return self._io_request_arg(args)
        elif req_name == "request-method":
            return self._io_request_method()
        elif req_name == "ctx":
            return self._io_ctx(args, ctx)
        elif req_name == "helper":
            return await self._io_helper(args, ctx)
        else:
            # Fall back to registered IO handlers (set-response-status, sleep, etc.)
            from .primitives_io import _IO_HANDLERS, RequestContext
            io_handler = _IO_HANDLERS.get(req_name)
            if io_handler is not None:
                helper_args = [_to_python(a) for a in args]
                return await io_handler(helper_args, {}, ctx or RequestContext())
            raise OcamlBridgeError(f"Unknown io-request type: {req_name}")
    async def _io_query(self, args: list) -> Any:
        """Handle (io-request "query" service name params...).

        Missing args default to empty strings / empty params dict.
        """
        from shared.infrastructure.internal import fetch_data
        service = _to_str(args[0]) if len(args) > 0 else ""
        query = _to_str(args[1]) if len(args) > 1 else ""
        params = _to_dict(args[2]) if len(args) > 2 else {}
        return await fetch_data(service, query, params)
    async def _io_action(self, args: list) -> Any:
        """Handle (io-request "action" service name payload...).

        Missing args default to empty strings / empty payload dict.
        """
        from shared.infrastructure.internal import call_action
        service = _to_str(args[0]) if len(args) > 0 else ""
        action = _to_str(args[1]) if len(args) > 1 else ""
        payload = _to_dict(args[2]) if len(args) > 2 else {}
        return await call_action(service, action, payload)
    def _io_request_arg(self, args: list) -> Any:
        """Handle (io-request "request-arg" name).

        Returns the query-string arg value, or None when the arg is
        absent or there is no request context.
        """
        try:
            from quart import request
            name = _to_str(args[0]) if args else ""
            return request.args.get(name)
        except RuntimeError:
            # No active request context (CLI mode, tests).
            return None
    def _io_request_method(self) -> str:
        """Handle (io-request "request-method").

        Returns the current HTTP method, defaulting to "GET" when there
        is no request context.
        """
        try:
            from quart import request
            return request.method
        except RuntimeError:
            return "GET"
def _io_ctx(self, args: list, ctx: dict[str, Any] | None) -> Any:
"""Handle (io-request "ctx" key)."""
if ctx is None:
return None
key = _to_str(args[0]) if args else ""
return ctx.get(key)
    # Helpers that are pure functions — safe to cache by args.
    _CACHEABLE_HELPERS = frozenset({
        "highlight", "component-source", "primitives-data",
        "special-forms-data", "reference-data", "read-spec-file",
        "bootstrapper-data", "bundle-analyzer-data", "routing-analyzer-data",
    })

    async def _io_helper(self, args: list, ctx: dict[str, Any] | None) -> Any:
        """Handle (io-request "helper" name arg1 arg2 ...).

        Dispatches to registered page helpers — Python functions like
        read-spec-file, bootstrapper-data, etc. The helper service name
        is passed via ctx["_helper_service"].

        Pure helpers (highlight etc.) are cached — same input always
        produces same output. Eliminates blocking round-trips for
        repeat calls across pages.

        Lookup order: page helpers → IO primitives → regular primitives.

        Raises:
            OcamlBridgeError: no handler is registered under *name*.
        """
        import asyncio
        from .pages import get_page_helpers
        from .primitives_io import _IO_HANDLERS, RequestContext
        name = _to_str(args[0]) if args else ""
        helper_args = [_to_python(a) for a in args[1:]]
        # Cache lookup for pure helpers (keyed by repr of each arg)
        if name in self._CACHEABLE_HELPERS:
            cache_key = (name, *[repr(a) for a in helper_args])
            if cache_key in self._io_cache:
                return self._io_cache[cache_key]
        # Check page helpers first (application-level)
        service = (ctx or {}).get("_helper_service", "sx")
        helpers = get_page_helpers(service)
        fn = helpers.get(name)
        if fn is not None:
            result = fn(*helper_args)
            # Helpers may be sync or async — await only coroutines.
            if asyncio.iscoroutine(result):
                result = await result
            # Cache pure helper results
            if name in self._CACHEABLE_HELPERS:
                self._io_cache[cache_key] = result
            return result
        # Fall back to IO primitives (now, state-get, state-set!, etc.)
        io_handler = _IO_HANDLERS.get(name)
        if io_handler is not None:
            return await io_handler(helper_args, {}, RequestContext())
        # Fall back to regular primitives (json-encode, into, etc.)
        from .primitives import get_primitive as _get_prim
        prim = _get_prim(name)
        if prim is not None:
            return prim(*helper_args)
        raise OcamlBridgeError(f"Unknown helper: {name!r}")
# ------------------------------------------------------------------
# Module-level singleton
# ------------------------------------------------------------------
# Process-wide singleton bridge instance, created lazily by get_bridge().
_bridge: OcamlBridge | None = None


async def get_bridge() -> OcamlBridge:
    """Get or create the singleton bridge instance.

    Starts the subprocess on first use (or again after a stop()).
    NOTE(review): not guarded by a lock — concurrent first calls could
    each enter start() before _started is set; confirm callers serialize
    startup.
    """
    global _bridge
    if _bridge is None:
        _bridge = OcamlBridge()
    if not _bridge._started:
        await _bridge.start()
    return _bridge
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _escape(s: str) -> str:
"""Escape a string for embedding in an SX string literal."""
return s.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t")
def _parse_response(line: str) -> tuple[str, str | None]:
"""Parse an (ok ...) or (error ...) response line.
Handles epoch-tagged responses: (ok EPOCH), (ok EPOCH value),
(error EPOCH "msg"), as well as legacy untagged responses.
Returns (kind, value) tuple.
"""
line = line.strip()
# (ok EPOCH) — tagged no-value
if line == "(ok)" or (line.startswith("(ok ") and line[4:-1].isdigit()):
return ("ok", None)
if line.startswith("(ok-raw "):
# (ok-raw EPOCH value) or (ok-raw value)
inner = line[8:-1]
# Strip epoch if present
if inner and inner[0].isdigit():
space = inner.find(" ")
if space > 0:
inner = inner[space + 1:]
else:
return ("ok", None)
return ("ok", inner)
if line.startswith("(ok "):
inner = line[4:-1] # strip (ok and )
# Strip epoch number if present: (ok 42 "value") → "value"
if inner and inner[0].isdigit():
space = inner.find(" ")
if space > 0:
inner = inner[space + 1:]
else:
# (ok EPOCH) with no value
return ("ok", None)
# If the value is a quoted string, unquote it
if inner.startswith('"') and inner.endswith('"'):
inner = _unescape(inner[1:-1])
return ("ok", inner)
if line.startswith("(error "):
inner = line[7:-1]
# Strip epoch number if present: (error 42 "msg") → "msg"
if inner and inner[0].isdigit():
space = inner.find(" ")
if space > 0:
inner = inner[space + 1:]
if inner.startswith('"') and inner.endswith('"'):
inner = _unescape(inner[1:-1])
return ("error", inner)
return ("error", f"Unexpected response: {line}")
def _unescape(s: str) -> str:
"""Unescape an SX string literal."""
return (
s.replace("\\n", "\n")
.replace("\\r", "\r")
.replace("\\t", "\t")
.replace('\\"', '"')
.replace("\\\\", "\\")
)
def _to_python(val: Any) -> Any:
    """Convert an SX parsed value to a plain Python value.

    NIL and None map to None; Symbol/Keyword objects (anything exposing
    a ``name`` attribute) collapse to their name; everything else passes
    through unchanged.
    """
    from .types import NIL as _NIL
    if val is _NIL or val is None:
        return None
    missing = object()
    name = getattr(val, "name", missing)
    if name is not missing:  # Symbol or Keyword
        return name
    return val
def _to_str(val: Any) -> str:
"""Convert an SX parsed value to a Python string."""
if isinstance(val, str):
return val
if hasattr(val, "name"):
return val.name
return str(val)
def _to_dict(val: Any) -> dict:
"""Convert an SX parsed value to a Python dict."""
if isinstance(val, dict):
return val
return {}
def _serialize_for_ocaml(val: Any) -> str:
"""Serialize a Python value to SX text for sending to OCaml."""
if val is None:
return "nil"
if isinstance(val, bool):
return "true" if val else "false"
if isinstance(val, (int, float)):
if isinstance(val, float) and val == int(val):
return str(int(val))
return str(val)
if isinstance(val, str):
return f'"{_escape(val)}"'
if isinstance(val, (list, tuple)):
items = " ".join(_serialize_for_ocaml(v) for v in val)
return f"(list {items})"
if isinstance(val, dict):
pairs = " ".join(
f":{k} {_serialize_for_ocaml(v)}" for k, v in val.items()
)
return "{" + pairs + "}"
return f'"{_escape(str(val))}"'