spec/ now contains only the language definition (5 files): evaluator.sx, parser.sx, primitives.sx, render.sx, special-forms.sx lib/ contains code written IN the language (8 files): stdlib.sx, types.sx, freeze.sx, content.sx, bytecode.sx, compiler.sx, vm.sx, callcc.sx Test files follow source: spec/tests/ for core language tests, lib/tests/ for library tests (continuations, freeze, types, vm). Updated all consumers: - JS/Python/OCaml bootstrappers: added lib/ to source search paths - OCaml bridge: spec_dir for parser/render, lib_dir for compiler/freeze - JS test runner: scans spec/tests/ (always) + lib/tests/ (--full) - OCaml test runner: scans spec/tests/, lib tests via explicit request - Docker dev mounts: added ./lib:/app/lib:ro Tests: 1041 JS standard, 1322 JS full, 1101 OCaml — all pass Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
981 lines
40 KiB
Python
981 lines
40 KiB
Python
"""
|
|
OCaml SX kernel ↔ Python coroutine bridge.
|
|
|
|
Manages a persistent OCaml subprocess (sx_server) that evaluates SX
|
|
expressions. When the OCaml kernel needs IO (database queries, service
|
|
calls), it yields an ``(io-request ...)`` back to Python, which fulfills
|
|
it asynchronously and sends an ``(io-response ...)`` back.
|
|
|
|
Usage::
|
|
|
|
bridge = OcamlBridge()
|
|
await bridge.start()
|
|
html = await bridge.render('(div (p "hello"))')
|
|
await bridge.stop()
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
from typing import Any
|
|
|
|
_logger = logging.getLogger("sx.ocaml")
|
|
|
|
# Default binary path — can be overridden via SX_OCAML_BIN env var
|
|
_DEFAULT_BIN = os.path.join(
|
|
os.path.dirname(__file__),
|
|
"../../hosts/ocaml/_build/default/bin/sx_server.exe",
|
|
)
|
|
|
|
|
|
class OcamlBridgeError(Exception):
    """Error from the OCaml SX kernel.

    Raised for protocol-level failures (unexpected responses, dead
    subprocess, re-entrant bridge calls) and for (error ...) replies.
    """
|
|
|
|
|
class OcamlBridge:
|
|
"""Async bridge to a persistent OCaml SX subprocess."""
|
|
|
|
    def __init__(self, binary: str | None = None):
        """Create a bridge; the subprocess is not launched until start().

        Binary resolution order: explicit argument, SX_OCAML_BIN env
        var, then the default build-tree path.
        """
        self._binary = binary or os.environ.get("SX_OCAML_BIN") or _DEFAULT_BIN
        self._proc: asyncio.subprocess.Process | None = None
        # Serializes all pipe traffic — one command/response cycle at a time.
        self._lock = asyncio.Lock()
        self._in_io_handler = False  # re-entrancy guard
        self._started = False
        self._components_loaded = False
        self._helpers_injected = False
        self._io_cache: dict[tuple, Any] = {}  # (name, args...) → cached result
        self._epoch: int = 0  # request epoch — monotonically increasing
|
    async def start(self) -> None:
        """Launch the OCaml subprocess and wait for (ready).

        Idempotent — returns immediately if already started.

        Raises:
            FileNotFoundError: if the server binary is missing.
            OcamlBridgeError: if the first line is not ``(ready)``.
        """
        if self._started:
            return

        bin_path = os.path.abspath(self._binary)
        if not os.path.isfile(bin_path):
            raise FileNotFoundError(
                f"OCaml SX server binary not found: {bin_path}\n"
                f"Build with: cd hosts/ocaml && eval $(opam env) && dune build"
            )

        _logger.info("Starting OCaml SX kernel: %s", bin_path)
        import sys
        self._proc = await asyncio.create_subprocess_exec(
            bin_path,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=sys.stderr,  # kernel timing/debug to container logs
            limit=10 * 1024 * 1024,  # 10MB readline buffer for large spec data
        )

        # Handshake: the kernel announces itself with a single (ready) line.
        line = await self._readline()
        if line != "(ready)":
            raise OcamlBridgeError(f"Expected (ready), got: {line!r}")

        self._started = True

        # Verify engine identity via ping (best-effort — failures fall
        # through as "unknown" rather than aborting startup).
        await self._send_command("(ping)")
        kind, engine = await self._read_response()
        engine_name = engine if kind == "ok" else "unknown"
        _logger.info("OCaml SX kernel ready (pid=%d, engine=%s)", self._proc.pid, engine_name)
|
    async def stop(self) -> None:
        """Terminate the subprocess.

        Closes stdin so the kernel can exit cleanly; escalates to
        kill() after a 5 second grace period.
        """
        if self._proc and self._proc.returncode is None:
            self._proc.stdin.close()
            try:
                await asyncio.wait_for(self._proc.wait(), timeout=5.0)
            except asyncio.TimeoutError:
                self._proc.kill()
                await self._proc.wait()
            _logger.info("OCaml SX kernel stopped")
        self._proc = None
        self._started = False
|
async def _restart(self) -> None:
|
|
"""Kill and restart the OCaml subprocess to recover from pipe desync."""
|
|
_logger.warning("Restarting OCaml SX kernel (pipe recovery)")
|
|
if self._proc and self._proc.returncode is None:
|
|
self._proc.kill()
|
|
await self._proc.wait()
|
|
self._proc = None
|
|
self._started = False
|
|
self._components_loaded = False
|
|
self._helpers_injected = False
|
|
await self.start()
|
|
|
|
async def ping(self) -> str:
|
|
"""Health check — returns engine name (e.g. 'ocaml-cek')."""
|
|
async with self._lock:
|
|
await self._send_command("(ping)")
|
|
kind, value = await self._read_response()
|
|
return value or "" if kind == "ok" else ""
|
|
|
|
    async def load(self, path: str) -> int:
        """Load an .sx file for side effects (defcomp, define, defmacro).

        Returns the kernel's reported definition count (0 when the
        reply carries no value). The count comes back as SX number
        text, hence the float→int round-trip.
        """
        async with self._lock:
            await self._send_command(f'(load "{_escape(path)}")')
            value = await self._read_until_ok(ctx=None)
            return int(float(value)) if value else 0
|
    async def load_source(self, source: str) -> int:
        """Evaluate SX source for side effects.

        Mirrors load() but ships the source text inline (escaped into
        an SX string literal). Returns the kernel-reported count or 0.
        """
        async with self._lock:
            await self._send_command(f'(load-source "{_escape(source)}")')
            value = await self._read_until_ok(ctx=None)
            return int(float(value)) if value else 0
|
    async def eval(self, source: str, ctx: dict[str, Any] | None = None) -> str:
        """Evaluate SX expression, return serialized result.

        Supports io-requests (helper calls, query, action, etc.) via the
        coroutine bridge, just like render(). The source is shipped as a
        length-prefixed blob to avoid string-escaping round-trips.
        """
        await self._ensure_components()
        async with self._lock:
            await self._send_command('(eval-blob)')
            await self._send_blob(source)
            return await self._read_until_ok(ctx)
|
    async def render(
        self,
        source: str,
        ctx: dict[str, Any] | None = None,
    ) -> str:
        """Render SX to HTML, handling io-requests via Python async IO.

        ``ctx`` is threaded through to IO handlers for (io-request
        "ctx" ...) lookups and helper-service resolution.
        """
        await self._ensure_components()
        async with self._lock:
            await self._send_command(f'(render "{_escape(source)}")')
            return await self._read_until_ok(ctx)
|
    async def aser(self, source: str, ctx: dict[str, Any] | None = None) -> str:
        """Evaluate SX and return SX wire format, handling io-requests.

        Uses the blob protocol for the source, same as eval().
        """
        await self._ensure_components()
        async with self._lock:
            await self._send_command('(aser-blob)')
            await self._send_blob(source)
            return await self._read_until_ok(ctx)
|
    async def aser_slot(self, source: str, ctx: dict[str, Any] | None = None) -> str:
        """Like aser() but expands ALL components server-side.

        Equivalent to Python's async_eval_slot_to_sx — used for layout
        slots where component bodies need server-side IO evaluation.
        """
        await self._ensure_components()
        async with self._lock:
            # Inject helpers inside the lock to avoid pipe desync —
            # a separate lock acquisition could let another coroutine
            # interleave commands between injection and aser-slot.
            await self._inject_helpers_locked()
            await self._send_command('(aser-slot-blob)')
            await self._send_blob(source)
            return await self._read_until_ok(ctx)
|
    # Set to True once shell static vars are defined in the kernel.
    _shell_statics_injected: bool = False

    async def _inject_shell_statics_locked(self) -> None:
        """Inject cached shell static data into kernel. MUST hold lock.

        Defines __shell-* variables in the kernel for small scalar and
        list values. Silently returns when the app context is not ready
        yet; per-key failures are logged and skipped so one bad value
        does not block the rest.
        """
        if self._shell_statics_injected:
            return
        from .helpers import _get_shell_static
        try:
            static = _get_shell_static()
        except Exception:
            return  # not ready yet (no app context)
        # Only inject small, safe values as kernel variables.
        # Large/complex blobs use placeholder tokens at render time.
        for key in ("component_hash", "sx_css_classes", "asset_url",
                    "sx_js_hash", "body_js_hash"):
            val = static.get(key) or ""
            # Kernel naming convention: underscores become hyphens.
            var = f"__shell-{key.replace('_', '-')}"
            defn = f'(define {var} "{_escape(str(val))}")'
            try:
                await self._send_command(f'(load-source "{_escape(defn)}")')
                await self._read_until_ok(ctx=None)
            except OcamlBridgeError as e:
                _logger.warning("Shell static inject failed for %s: %s", key, e)
        # List/nil values
        for key in ("head_scripts", "body_scripts"):
            val = static.get(key)
            var = f"__shell-{key.replace('_', '-')}"
            if val is None:
                defn = f'(define {var} nil)'
            elif isinstance(val, list):
                items = " ".join(f'"{_escape(str(v))}"' for v in val)
                defn = f'(define {var} (list {items}))'
            else:
                defn = f'(define {var} "{_escape(str(val))}")'
            try:
                await self._send_command(f'(load-source "{_escape(defn)}")')
                await self._read_until_ok(ctx=None)
            except OcamlBridgeError as e:
                _logger.warning("Shell static inject failed for %s: %s", key, e)
        self._shell_statics_injected = True
        _logger.info("Injected shell statics into OCaml kernel")
|
    async def _inject_request_cookies_locked(self) -> None:
        """Send current request cookies to kernel for get-cookie primitive.

        No-op outside a request context (CLI mode, tests) or when the
        request has no cookies. Failures are logged at debug level —
        cookies are best-effort during SSR.
        """
        try:
            from quart import request
            cookies = request.cookies
        except Exception:
            return  # no request context (CLI mode, tests)
        if not cookies:
            return
        # Build SX dict: {:name1 "val1" :name2 "val2"}
        # Cookie values may be URL-encoded (client set-cookie uses
        # encodeURIComponent) — decode before sending to kernel.
        from urllib.parse import unquote
        pairs = []
        for k, v in cookies.items():
            pairs.append(f':{k} "{_escape(unquote(str(v)))}"')
        if pairs:
            cmd = f'(set-request-cookies {{{" ".join(pairs)}}})'
            try:
                await self._send_command(cmd)
                await self._read_until_ok(ctx=None)
            except OcamlBridgeError as e:
                _logger.debug("Cookie inject failed: %s", e)
|
    async def sx_page_full(
        self,
        page_source: str,
        shell_kwargs: dict[str, Any],
        ctx: dict[str, Any] | None = None,
    ) -> str:
        """Render full page HTML in one OCaml call: aser-slot + shell render.

        Static data (component_defs, CSS, pages_sx) is pre-injected as
        kernel vars on first call. Per-request command sends only small
        values (title, csrf) + references to the kernel vars.

        Returns the final HTML with large blobs spliced in Python-side.
        """
        await self._ensure_components()
        async with self._lock:
            await self._inject_helpers_locked()
            await self._inject_shell_statics_locked()
            # Send request cookies so get-cookie works during SSR
            await self._inject_request_cookies_locked()
            # Large/complex blobs use placeholders — OCaml renders the shell
            # with short tokens; Python splices in the real values post-render.
            # This avoids piping large strings or strings with special chars
            # through the SX parser.
            PLACEHOLDER_KEYS = {"component_defs", "pages_sx", "init_sx",
                                "sx_css", "inline_css", "inline_head_js"}
            placeholders = {}
            # Keys whose values were pre-defined as __shell-* kernel vars
            # by _inject_shell_statics_locked — send only the var name.
            static_keys = {"component_hash", "sx_css_classes", "asset_url",
                           "sx_js_hash", "body_js_hash",
                           "head_scripts", "body_scripts"}
            # page_source is SX wire format that may contain \" escapes.
            # Send via binary blob protocol to avoid double-escaping
            # through the SX string parser round-trip.
            parts = ['(sx-page-full-blob']
            for key, val in shell_kwargs.items():
                k = key.replace("_", "-")
                if key in PLACEHOLDER_KEYS:
                    token = f"__SLOT_{key.upper()}__"
                    placeholders[token] = str(val) if val else ""
                    parts.append(f' :{k} "{token}"')
                elif key in static_keys:
                    parts.append(f' :{k} __shell-{k}')
                elif val is None:
                    parts.append(f' :{k} nil')
                elif isinstance(val, bool):
                    parts.append(f' :{k} {"true" if val else "false"}')
                elif isinstance(val, list):
                    items = " ".join(f'"{_escape(str(v))}"' for v in val)
                    parts.append(f' :{k} ({items})')
                else:
                    parts.append(f' :{k} "{_escape(str(val))}"')
            parts.append(")")
            cmd = "".join(parts)
            await self._send_command(cmd)
            # Send page source as binary blob (avoids string-escape issues)
            await self._send_blob(page_source)
            html = await self._read_until_ok(ctx)
            # Splice in large blobs
            for token, blob in placeholders.items():
                html = html.replace(token, blob)
            return html
|
    async def _inject_helpers_locked(self) -> None:
        """Inject page helpers into the kernel. MUST be called with lock held.

        Each Python helper becomes an SX function that routes its call
        back to Python via (helper "name" ...). The flag is set BEFORE
        injecting to guard against re-entrancy, and rolled back on
        failure so a later call retries.
        """
        if self._helpers_injected:
            return
        self._helpers_injected = True
        try:
            from .pages import get_page_helpers
            import inspect
            helpers = get_page_helpers("sx")
            if not helpers:
                self._helpers_injected = False
                return
            count = 0
            for name, fn in helpers.items():
                # "~"-prefixed entries are not callable helpers by convention.
                if callable(fn) and not name.startswith("~"):
                    try:
                        sig = inspect.signature(fn)
                        nargs = sum(1 for p in sig.parameters.values()
                                    if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD))
                    except (ValueError, TypeError):
                        # Builtins/C functions without introspectable
                        # signatures — assume two positional args.
                        nargs = 2
                    nargs = max(nargs, 1)
                    # Parameter names a, b, c, ... (chr(97) == 'a').
                    param_names = " ".join(chr(97 + i) for i in range(nargs))
                    arg_list = " ".join(chr(97 + i) for i in range(nargs))
                    sx_def = f'(define {name} (fn ({param_names}) (helper "{name}" {arg_list})))'
                    try:
                        await self._send_command(f'(load-source "{_escape(sx_def)}")')
                        await self._read_until_ok(ctx=None)
                        count += 1
                    except OcamlBridgeError:
                        pass  # per-helper failures are non-fatal
            _logger.info("Injected %d page helpers into OCaml kernel", count)
        except Exception as e:
            _logger.warning("Helper injection failed: %s", e)
            self._helpers_injected = False
|
    async def _compile_adapter_module(self) -> None:
        """Compile adapter-sx.sx to bytecode and load as a VM module.

        Previously used Python's sx_ref.py evaluator for compilation.
        Now the OCaml kernel handles JIT compilation natively — this method
        is a no-op. The kernel's own JIT hook compiles functions on first call.
        Kept for interface compatibility with existing callers.
        """
        _logger.info("Adapter module compilation delegated to OCaml kernel JIT")
|
    async def _ensure_components(self) -> None:
        """Load all .sx source files into the kernel on first use.

        Errors during loading are handled gracefully — IO responses are
        always sent back to keep the pipe clean. The flag is set before
        loading (re-entrancy guard) and rolled back on total failure so
        the next call retries.
        """
        if self._components_loaded:
            return
        self._components_loaded = True
        try:
            from .jinja_bridge import _watched_dirs, _dirs_from_cache
            import glob

            # Skip patterns — files that use constructs not available in the kernel
            skip_names = {"boundary.sx", "forms.sx"}
            skip_dirs = {"tests"}

            # Collect files to load (order matters: spec → lib → web → freeze → app dirs)
            all_files: list[str] = []

            # Core spec files
            spec_dir = os.path.join(os.path.dirname(__file__), "../../spec")
            for spec_file in ["parser.sx", "render.sx"]:
                path = os.path.normpath(os.path.join(spec_dir, spec_file))
                if os.path.isfile(path):
                    all_files.append(path)

            # Library files (compiler, vm, freeze — written in the language)
            lib_dir = os.path.join(os.path.dirname(__file__), "../../lib")
            for lib_file in ["bytecode.sx", "compiler.sx"]:
                path = os.path.normpath(os.path.join(lib_dir, lib_file))
                if os.path.isfile(path):
                    all_files.append(path)

            # All directories loaded into the Python env
            all_dirs = list(set(_watched_dirs) | _dirs_from_cache)

            # Isomorphic libraries: signals, rendering, web forms
            web_dir = os.path.join(os.path.dirname(__file__), "../../web")
            if os.path.isdir(web_dir):
                for web_file in ["signals.sx", "adapter-html.sx", "adapter-sx.sx",
                                 "web-forms.sx"]:
                    path = os.path.normpath(os.path.join(web_dir, web_file))
                    if os.path.isfile(path):
                        all_files.append(path)
            # Library files loaded after adapters (depend on scope primitives)
            for lib_file in ["freeze.sx"]:
                path = os.path.normpath(os.path.join(lib_dir, lib_file))
                if os.path.isfile(path):
                    all_files.append(path)

            for directory in sorted(all_dirs):
                files = sorted(
                    glob.glob(os.path.join(directory, "**", "*.sx"), recursive=True)
                )
                for filepath in files:
                    basename = os.path.basename(filepath)
                    # Skip known-bad files
                    if basename in skip_names:
                        continue
                    # Skip test and handler directories (path-component match)
                    parts = filepath.replace("\\", "/").split("/")
                    if any(d in skip_dirs for d in parts):
                        continue
                    all_files.append(filepath)

            # Load all files under a single lock
            count = 0
            skipped = 0
            async with self._lock:
                for filepath in all_files:
                    try:
                        await self._send_command(f'(load "{_escape(filepath)}")')
                        value = await self._read_until_ok(ctx=None)
                        # Response may be a number (count) or a value — just count files
                        count += 1
                    except OcamlBridgeError as e:
                        skipped += 1
                        _logger.warning("OCaml load skipped %s: %s",
                                        filepath, e)

                # SSR overrides: effect is a no-op on the server (prevents
                # reactive loops during island SSR — effects are DOM side-effects)
                try:
                    noop_dispose = '(fn () nil)'
                    await self._send_command(f'(load-source "(define effect (fn (f) {noop_dispose}))")')
                    await self._read_until_ok(ctx=None)
                except OcamlBridgeError:
                    pass

                # Register JIT hook — lambdas compile on first call
                try:
                    await self._send_command('(vm-compile-adapter)')
                    await self._read_until_ok(ctx=None)
                    _logger.info("JIT hook registered — lambdas compile on first call")
                except OcamlBridgeError as e:
                    _logger.warning("JIT hook registration skipped: %s", e)
            _logger.info("Loaded %d definitions from .sx files into OCaml kernel (%d skipped)",
                         count, skipped)
        except Exception as e:
            _logger.error("Failed to load .sx files into OCaml kernel: %s", e)
            self._components_loaded = False  # retry next time
|
    async def inject_page_helpers(self, helpers: dict) -> None:
        """Register page helpers as IO-routing definitions in the kernel.

        Each helper becomes a function that yields (io-request "helper" name ...),
        routing the call back to Python via the coroutine bridge. Unlike
        _inject_helpers_locked, this variadic form (&rest) handles any arity.
        """
        await self._ensure_components()
        async with self._lock:
            count = 0
            for name, fn in helpers.items():
                if callable(fn) and not name.startswith("~"):
                    sx_def = f'(define {name} (fn (&rest args) (apply helper (concat (list "{name}") args))))'
                    try:
                        await self._send_command(f'(load-source "{_escape(sx_def)}")')
                        await self._read_until_ok(ctx=None)
                        count += 1
                    except OcamlBridgeError:
                        pass  # non-fatal
            if count:
                _logger.info("Injected %d page helpers into OCaml kernel", count)
|
    async def reset(self) -> None:
        """Reset the kernel environment to pristine state.

        Raises:
            OcamlBridgeError: if the kernel reports an error. Note that
            Python-side flags (_components_loaded etc.) are NOT cleared
            here — callers manage re-loading themselves.
        """
        async with self._lock:
            await self._send_command("(reset)")
            kind, value = await self._read_response()
            if kind == "error":
                raise OcamlBridgeError(f"reset: {value}")
|
# ------------------------------------------------------------------
|
|
# Internal protocol handling
|
|
# ------------------------------------------------------------------
|
|
|
|
    async def _send(self, line: str) -> None:
        """Write a line to the subprocess stdin and flush.

        Raises:
            OcamlBridgeError: on a re-entrant call from an IO handler —
            such a call would interleave protocol traffic and desync
            the pipe (the bridge lock is already held).
        """
        if self._in_io_handler:
            raise OcamlBridgeError(
                f"Re-entrant bridge call from IO handler: {line[:80]}. "
                f"IO handlers must not call the bridge — use Python-only code."
            )
        assert self._proc and self._proc.stdin
        _logger.debug("SEND: %s", line[:120])
        self._proc.stdin.write((line + "\n").encode())
        await self._proc.stdin.drain()
|
async def _send_command(self, line: str) -> None:
|
|
"""Send a command with a fresh epoch prefix.
|
|
|
|
Increments the epoch counter and sends (epoch N) before the
|
|
actual command. The OCaml kernel tags all responses with this
|
|
epoch so stale messages from previous requests are discarded.
|
|
"""
|
|
self._epoch += 1
|
|
assert self._proc and self._proc.stdin
|
|
_logger.debug("EPOCH %d SEND: %s", self._epoch, line[:120])
|
|
self._proc.stdin.write(f"(epoch {self._epoch})\n".encode())
|
|
self._proc.stdin.write((line + "\n").encode())
|
|
await self._proc.stdin.drain()
|
|
|
|
    async def _send_blob(self, data: str) -> None:
        """Send a length-prefixed binary blob to the subprocess.

        Protocol: sends "(blob N)\\n" followed by exactly N bytes, then "\\n".
        The OCaml side reads the length, then reads exactly N bytes.
        This avoids string-escape round-trip issues for SX wire format.

        N is the UTF-8 byte length, not the character count.
        """
        assert self._proc and self._proc.stdin
        encoded = data.encode()
        self._proc.stdin.write(f"(blob {len(encoded)})\n".encode())
        self._proc.stdin.write(encoded)
        self._proc.stdin.write(b"\n")
        await self._proc.stdin.drain()
|
    async def _readline(self) -> str:
        """Read a line from the subprocess stdout.

        Returns the line without its trailing newline.

        Raises:
            OcamlBridgeError: if stdout hits EOF (subprocess died).
        """
        assert self._proc and self._proc.stdout
        data = await self._proc.stdout.readline()
        if not data:
            # EOF on stdout means the process died
            raise OcamlBridgeError(
                "OCaml subprocess died unexpectedly (check container stderr)"
            )
        line = data.decode().rstrip("\n")
        _logger.debug("RECV: %s", line[:120])
        return line
|
    async def _read_response(self) -> tuple[str, str | None]:
        """Read a single (ok ...) or (error ...) response.

        Returns (kind, value) where kind is "ok" or "error".
        Discards stale epoch messages.
        """
        while True:
            line = await self._readline()
            if not self._is_current_epoch(line):
                _logger.debug("Discarding stale response: %s", line[:80])
                # A stale (ok-len EPOCH N) header is followed by N payload
                # bytes plus a newline — drain them to keep the pipe aligned.
                if line.startswith("(ok-len "):
                    parts = line[1:-1].split()
                    if len(parts) >= 3:
                        n = int(parts[-1])
                        assert self._proc and self._proc.stdout
                        await self._proc.stdout.readexactly(n)
                        await self._proc.stdout.readline()
                continue
            # Length-prefixed blob: (ok-len EPOCH N) or (ok-len N)
            if line.startswith("(ok-len "):
                parts = line[1:-1].split()
                n = int(parts[-1])  # last number is always the byte count
                assert self._proc and self._proc.stdout
                data = await self._proc.stdout.readexactly(n)
                await self._proc.stdout.readline()  # trailing newline
                return ("ok", data.decode())
            return _parse_response(line)
|
def _is_current_epoch(self, line: str) -> bool:
|
|
"""Check if a response line belongs to the current epoch.
|
|
|
|
Lines tagged with a stale epoch are discarded. Untagged lines
|
|
(from a kernel that predates the epoch protocol) are accepted.
|
|
"""
|
|
# Extract epoch number from known tagged formats:
|
|
# (ok EPOCH ...), (error EPOCH ...), (ok-len EPOCH N),
|
|
# (io-request EPOCH ...), (io-done EPOCH N)
|
|
import re
|
|
m = re.match(r'\((?:ok|error|ok-len|ok-raw|io-request|io-done)\s+(\d+)\b', line)
|
|
if m:
|
|
return int(m.group(1)) == self._epoch
|
|
# Untagged (legacy) — accept
|
|
return True
|
|
|
|
    async def _read_until_ok(
        self,
        ctx: dict[str, Any] | None = None,
    ) -> str:
        """Read lines until (ok ...) or (error ...).

        Handles IO requests in two modes:
        - Legacy (blocking): single io-request → immediate io-response
        - Batched: collect io-requests until (io-done N), process ALL
          concurrently with asyncio.gather, send responses in order

        Lines tagged with a stale epoch are silently discarded, making
        pipe desync from previous failed requests impossible.

        Raises:
            OcamlBridgeError: when the kernel replies with (error ...).
        """
        import asyncio
        pending_batch: list[str] = []

        while True:
            line = await self._readline()

            # Discard stale epoch messages
            if not self._is_current_epoch(line):
                _logger.debug("Discarding stale epoch message: %s", line[:80])
                # If it's a stale ok-len, drain the blob bytes too
                if line.startswith("(ok-len "):
                    parts = line[1:-1].split()
                    if len(parts) >= 3:
                        n = int(parts[2])
                        assert self._proc and self._proc.stdout
                        await self._proc.stdout.readexactly(n)
                        await self._proc.stdout.readline()
                continue

            if line.startswith("(io-request "):
                # New format: (io-request EPOCH ...) or (io-request EPOCH ID ...)
                # Strip epoch from the line for IO dispatch
                after = line[len("(io-request "):].lstrip()
                # Skip epoch number if present
                if after and after[0].isdigit():
                    # Could be epoch or batch ID — check for second number
                    parts = after.split(None, 2)
                    if len(parts) >= 2 and parts[1][0].isdigit():
                        # (io-request EPOCH ID "name" args...) — batched with epoch
                        pending_batch.append(line)
                        continue
                    elif len(parts) >= 2 and parts[1].startswith('"'):
                        # (io-request EPOCH "name" args...) — legacy with epoch:
                        # respond immediately; failures degrade to nil so the
                        # kernel can continue instead of hanging the pipe.
                        try:
                            result = await self._handle_io_request(line, ctx)
                            await self._send(
                                f"(io-response {self._epoch} {_serialize_for_ocaml(result)})")
                        except Exception as e:
                            _logger.warning("IO request failed, sending nil: %s", e)
                            await self._send(f"(io-response {self._epoch} nil)")
                        continue
                    else:
                        # Old format: (io-request ID "name" ...) — batched, no epoch
                        pending_batch.append(line)
                        continue
                # Legacy blocking mode — respond immediately
                try:
                    result = await self._handle_io_request(line, ctx)
                    await self._send(
                        f"(io-response {self._epoch} {_serialize_for_ocaml(result)})")
                except Exception as e:
                    _logger.warning("IO request failed, sending nil: %s", e)
                    await self._send(f"(io-response {self._epoch} nil)")
                continue

            if line.startswith("(io-done "):
                # Batch complete — process all pending IO concurrently;
                # responses are sent back in the same order as the requests.
                tasks = [self._handle_io_request(req, ctx)
                         for req in pending_batch]
                results = await asyncio.gather(*tasks, return_exceptions=True)
                for result in results:
                    if isinstance(result, BaseException):
                        _logger.warning("Batched IO failed: %s", result)
                        await self._send(f"(io-response {self._epoch} nil)")
                    else:
                        await self._send(
                            f"(io-response {self._epoch} {_serialize_for_ocaml(result)})")
                pending_batch = []
                continue

            # Length-prefixed blob: (ok-len EPOCH N) or (ok-len N)
            if line.startswith("(ok-len "):
                parts = line[1:-1].split()  # ["ok-len", epoch, n] or ["ok-len", n]
                n = int(parts[-1])  # last number is always byte count
                assert self._proc and self._proc.stdout
                data = await self._proc.stdout.readexactly(n)
                # Read trailing newline
                await self._proc.stdout.readline()
                return data.decode()

            kind, value = _parse_response(line)
            if kind == "error":
                raise OcamlBridgeError(value or "Unknown error")
            # kind == "ok"
            return value or ""
|
    async def _handle_io_request(
        self,
        line: str,
        ctx: dict[str, Any] | None,
    ) -> Any:
        """Dispatch an io-request to the appropriate Python handler.

        IO handlers MUST NOT call the bridge (eval/aser/render) — doing so
        would deadlock since the lock is already held. The _in_io_handler
        flag triggers an immediate error if this rule is violated.
        """
        self._in_io_handler = True
        try:
            return await self._dispatch_io(line, ctx)
        finally:
            # Always clear the guard, even when the handler raises.
            self._in_io_handler = False
|
    async def _dispatch_io(
        self,
        line: str,
        ctx: dict[str, Any] | None,
    ) -> Any:
        """Inner dispatch for IO requests.

        Parses the io-request line and routes by request name to the
        built-in handlers (query/action/request-arg/request-method/
        ctx/helper) or to registered IO handler plugins.

        Raises:
            OcamlBridgeError: on malformed lines or unknown request types.
        """
        from .parser import parse_all

        # Parse the io-request
        parsed = parse_all(line)
        if not parsed or not isinstance(parsed[0], list):
            raise OcamlBridgeError(f"Malformed io-request: {line}")

        parts = parsed[0]
        # Legacy: [Symbol("io-request"), name_str, ...args]
        # Batched: [Symbol("io-request"), id_num, name_str, ...args]
        if len(parts) < 2:
            raise OcamlBridgeError(f"Malformed io-request: {line}")

        # Skip numeric batch ID if present
        offset = 1
        if isinstance(parts[1], (int, float)):
            offset = 2
        req_name = _to_str(parts[offset])
        args = parts[offset + 1:]

        if req_name == "query":
            return await self._io_query(args)
        elif req_name == "action":
            return await self._io_action(args)
        elif req_name == "request-arg":
            return self._io_request_arg(args)
        elif req_name == "request-method":
            return self._io_request_method()
        elif req_name == "ctx":
            return self._io_ctx(args, ctx)
        elif req_name == "helper":
            return await self._io_helper(args, ctx)
        else:
            # Fall back to registered IO handlers (set-response-status, sleep, etc.)
            from .primitives_io import _IO_HANDLERS, RequestContext
            io_handler = _IO_HANDLERS.get(req_name)
            if io_handler is not None:
                helper_args = [_to_python(a) for a in args]
                return await io_handler(helper_args, {}, ctx or RequestContext())
            raise OcamlBridgeError(f"Unknown io-request type: {req_name}")
|
    async def _io_query(self, args: list) -> Any:
        """Handle (io-request "query" service name params...).

        Missing positional args default to empty string / empty dict,
        letting fetch_data surface the actual validation error.
        """
        from shared.infrastructure.internal import fetch_data

        service = _to_str(args[0]) if len(args) > 0 else ""
        query = _to_str(args[1]) if len(args) > 1 else ""
        params = _to_dict(args[2]) if len(args) > 2 else {}
        return await fetch_data(service, query, params)
|
    async def _io_action(self, args: list) -> Any:
        """Handle (io-request "action" service name payload...).

        Mirrors _io_query but routes to call_action (mutations).
        """
        from shared.infrastructure.internal import call_action

        service = _to_str(args[0]) if len(args) > 0 else ""
        action = _to_str(args[1]) if len(args) > 1 else ""
        payload = _to_dict(args[2]) if len(args) > 2 else {}
        return await call_action(service, action, payload)
|
def _io_request_arg(self, args: list) -> Any:
|
|
"""Handle (io-request "request-arg" name)."""
|
|
try:
|
|
from quart import request
|
|
name = _to_str(args[0]) if args else ""
|
|
return request.args.get(name)
|
|
except RuntimeError:
|
|
return None
|
|
|
|
def _io_request_method(self) -> str:
|
|
"""Handle (io-request "request-method")."""
|
|
try:
|
|
from quart import request
|
|
return request.method
|
|
except RuntimeError:
|
|
return "GET"
|
|
|
|
def _io_ctx(self, args: list, ctx: dict[str, Any] | None) -> Any:
|
|
"""Handle (io-request "ctx" key)."""
|
|
if ctx is None:
|
|
return None
|
|
key = _to_str(args[0]) if args else ""
|
|
return ctx.get(key)
|
|
|
|
    # Helpers that are pure functions — safe to cache by args.
    _CACHEABLE_HELPERS = frozenset({
        "highlight", "component-source", "primitives-data",
        "special-forms-data", "reference-data", "read-spec-file",
        "bootstrapper-data", "bundle-analyzer-data", "routing-analyzer-data",
    })

    async def _io_helper(self, args: list, ctx: dict[str, Any] | None) -> Any:
        """Handle (io-request "helper" name arg1 arg2 ...).

        Dispatches to registered page helpers — Python functions like
        read-spec-file, bootstrapper-data, etc. The helper service name
        is passed via ctx["_helper_service"].

        Pure helpers (highlight etc.) are cached — same input always
        produces same output. Eliminates blocking round-trips for
        repeat calls across pages.

        Lookup order: page helpers → IO primitive handlers → regular
        primitives.

        Raises:
            OcamlBridgeError: if no handler matches the name.
        """
        import asyncio
        from .pages import get_page_helpers
        from .primitives_io import _IO_HANDLERS, RequestContext

        name = _to_str(args[0]) if args else ""
        helper_args = [_to_python(a) for a in args[1:]]

        # Cache lookup for pure helpers. repr() of each arg gives a
        # hashable key even for unhashable args like lists/dicts.
        if name in self._CACHEABLE_HELPERS:
            cache_key = (name, *[repr(a) for a in helper_args])
            if cache_key in self._io_cache:
                return self._io_cache[cache_key]

        # Check page helpers first (application-level)
        service = (ctx or {}).get("_helper_service", "sx")
        helpers = get_page_helpers(service)
        fn = helpers.get(name)
        if fn is not None:
            result = fn(*helper_args)
            if asyncio.iscoroutine(result):
                result = await result
            # Cache pure helper results (cache_key was set above for
            # exactly these names).
            if name in self._CACHEABLE_HELPERS:
                self._io_cache[cache_key] = result
            return result

        # Fall back to IO primitives (now, state-get, state-set!, etc.)
        io_handler = _IO_HANDLERS.get(name)
        if io_handler is not None:
            return await io_handler(helper_args, {}, RequestContext())

        # Fall back to regular primitives (json-encode, into, etc.)
        from .primitives import get_primitive as _get_prim
        prim = _get_prim(name)
        if prim is not None:
            return prim(*helper_args)

        raise OcamlBridgeError(f"Unknown helper: {name!r}")
|
|
|
# ------------------------------------------------------------------
|
|
# Module-level singleton
|
|
# ------------------------------------------------------------------
|
|
|
|
# Module-level singleton bridge instance, created lazily by get_bridge().
_bridge: OcamlBridge | None = None


async def get_bridge() -> OcamlBridge:
    """Get or create the singleton bridge instance.

    Lazily constructs the module-level OcamlBridge and starts it if it
    is not already running. NOTE(review): no guard against concurrent
    first calls racing through start() — callers appear to serialize
    startup; confirm before relying on this from multiple tasks.
    """
    global _bridge
    if _bridge is None:
        _bridge = OcamlBridge()
    if not _bridge._started:
        await _bridge.start()
    return _bridge
|
|
|
# ------------------------------------------------------------------
|
|
# Helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
def _escape(s: str) -> str:
|
|
"""Escape a string for embedding in an SX string literal."""
|
|
return s.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t")
|
|
|
|
|
|
def _parse_response(line: str) -> tuple[str, str | None]:
    """Parse an (ok ...) or (error ...) response line.

    Handles epoch-tagged responses: (ok EPOCH), (ok EPOCH value),
    (error EPOCH "msg"), as well as legacy untagged responses.

    Returns (kind, value) tuple where kind is "ok" or "error".
    NOTE(review): a legacy untagged numeric value, e.g. (ok 5), is
    indistinguishable from an epoch tag and parses as no-value —
    presumed acceptable given the protocol; confirm against the kernel.
    """
    line = line.strip()
    # (ok EPOCH) — tagged no-value
    if line == "(ok)" or (line.startswith("(ok ") and line[4:-1].isdigit()):
        return ("ok", None)
    if line.startswith("(ok-raw "):
        # (ok-raw EPOCH value) or (ok-raw value) — value returned verbatim,
        # without unquoting.
        inner = line[8:-1]
        # Strip epoch if present
        if inner and inner[0].isdigit():
            space = inner.find(" ")
            if space > 0:
                inner = inner[space + 1:]
            else:
                return ("ok", None)
        return ("ok", inner)
    if line.startswith("(ok "):
        inner = line[4:-1]  # strip (ok and )
        # Strip epoch number if present: (ok 42 "value") → "value"
        if inner and inner[0].isdigit():
            space = inner.find(" ")
            if space > 0:
                inner = inner[space + 1:]
            else:
                # (ok EPOCH) with no value
                return ("ok", None)
        # If the value is a quoted string, unquote it
        if inner.startswith('"') and inner.endswith('"'):
            inner = _unescape(inner[1:-1])
        return ("ok", inner)
    if line.startswith("(error "):
        inner = line[7:-1]
        # Strip epoch number if present: (error 42 "msg") → "msg"
        if inner and inner[0].isdigit():
            space = inner.find(" ")
            if space > 0:
                inner = inner[space + 1:]
        if inner.startswith('"') and inner.endswith('"'):
            inner = _unescape(inner[1:-1])
        return ("error", inner)
    # Anything else is a protocol violation — surface it as an error.
    return ("error", f"Unexpected response: {line}")
|
|
|
def _unescape(s: str) -> str:
|
|
"""Unescape an SX string literal."""
|
|
return (
|
|
s.replace("\\n", "\n")
|
|
.replace("\\r", "\r")
|
|
.replace("\\t", "\t")
|
|
.replace('\\"', '"')
|
|
.replace("\\\\", "\\")
|
|
)
|
|
|
|
|
|
def _to_python(val: Any) -> Any:
    """Convert an SX parsed value to a plain Python value.

    The SX NIL sentinel and None both map to None; symbols and
    keywords (anything exposing a .name attribute) collapse to their
    name string; everything else passes through unchanged.
    """
    from .types import NIL as _NIL
    if val is None or val is _NIL:
        return None
    if hasattr(val, "name"):  # Symbol or Keyword
        return val.name
    return val
|
|
|
def _to_str(val: Any) -> str:
|
|
"""Convert an SX parsed value to a Python string."""
|
|
if isinstance(val, str):
|
|
return val
|
|
if hasattr(val, "name"):
|
|
return val.name
|
|
return str(val)
|
|
|
|
|
|
def _to_dict(val: Any) -> dict:
|
|
"""Convert an SX parsed value to a Python dict."""
|
|
if isinstance(val, dict):
|
|
return val
|
|
return {}
|
|
|
|
|
|
def _serialize_for_ocaml(val: Any) -> str:
|
|
"""Serialize a Python value to SX text for sending to OCaml."""
|
|
if val is None:
|
|
return "nil"
|
|
if isinstance(val, bool):
|
|
return "true" if val else "false"
|
|
if isinstance(val, (int, float)):
|
|
if isinstance(val, float) and val == int(val):
|
|
return str(int(val))
|
|
return str(val)
|
|
if isinstance(val, str):
|
|
return f'"{_escape(val)}"'
|
|
if isinstance(val, (list, tuple)):
|
|
items = " ".join(_serialize_for_ocaml(v) for v in val)
|
|
return f"(list {items})"
|
|
if isinstance(val, dict):
|
|
pairs = " ".join(
|
|
f":{k} {_serialize_for_ocaml(v)}" for k, v in val.items()
|
|
)
|
|
return "{" + pairs + "}"
|
|
return f'"{_escape(str(val))}"'
|