""" OCaml SX kernel ↔ Python coroutine bridge. Manages a persistent OCaml subprocess (sx_server) that evaluates SX expressions. When the OCaml kernel needs IO (database queries, service calls), it yields an ``(io-request ...)`` back to Python, which fulfills it asynchronously and sends an ``(io-response ...)`` back. Usage:: bridge = OcamlBridge() await bridge.start() html = await bridge.render('(div (p "hello"))') await bridge.stop() """ from __future__ import annotations import asyncio import logging import os from typing import Any _logger = logging.getLogger("sx.ocaml") # Default binary path — can be overridden via SX_OCAML_BIN env var _DEFAULT_BIN = os.path.join( os.path.dirname(__file__), "../../hosts/ocaml/_build/default/bin/sx_server.exe", ) class OcamlBridgeError(Exception): """Error from the OCaml SX kernel.""" class OcamlBridge: """Async bridge to a persistent OCaml SX subprocess.""" def __init__(self, binary: str | None = None): self._binary = binary or os.environ.get("SX_OCAML_BIN") or _DEFAULT_BIN self._proc: asyncio.subprocess.Process | None = None self._lock = asyncio.Lock() self._started = False self._components_loaded = False async def start(self) -> None: """Launch the OCaml subprocess and wait for (ready).""" if self._started: return bin_path = os.path.abspath(self._binary) if not os.path.isfile(bin_path): raise FileNotFoundError( f"OCaml SX server binary not found: {bin_path}\n" f"Build with: cd hosts/ocaml && eval $(opam env) && dune build" ) _logger.info("Starting OCaml SX kernel: %s", bin_path) self._proc = await asyncio.create_subprocess_exec( bin_path, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) # Wait for (ready) line = await self._readline() if line != "(ready)": raise OcamlBridgeError(f"Expected (ready), got: {line!r}") self._started = True # Verify engine identity self._send("(ping)") kind, engine = await self._read_response() engine_name = engine if kind == "ok" else "unknown" _logger.info("OCaml SX kernel ready (pid=%d, engine=%s)", self._proc.pid, engine_name) async def stop(self) -> None: """Terminate the subprocess.""" if self._proc and self._proc.returncode is None: self._proc.stdin.close() try: await asyncio.wait_for(self._proc.wait(), timeout=5.0) except asyncio.TimeoutError: self._proc.kill() await self._proc.wait() _logger.info("OCaml SX kernel stopped") self._proc = None self._started = False async def ping(self) -> str: """Health check — returns engine name (e.g. 'ocaml-cek').""" async with self._lock: self._send("(ping)") kind, value = await self._read_response() return value or "" if kind == "ok" else "" async def load(self, path: str) -> int: """Load an .sx file for side effects (defcomp, define, defmacro).""" async with self._lock: self._send(f'(load "{_escape(path)}")') kind, value = await self._read_response() if kind == "error": raise OcamlBridgeError(f"load {path}: {value}") return int(float(value)) if value else 0 async def load_source(self, source: str) -> int: """Evaluate SX source for side effects.""" async with self._lock: self._send(f'(load-source "{_escape(source)}")') kind, value = await self._read_response() if kind == "error": raise OcamlBridgeError(f"load-source: {value}") return int(float(value)) if value else 0 async def eval(self, source: str) -> str: """Evaluate SX expression, return serialized result.""" async with self._lock: self._send(f'(eval "{_escape(source)}")') kind, value = await self._read_response() if kind == "error": raise OcamlBridgeError(f"eval: {value}") return value or "" async def render( self, source: str, ctx: dict[str, Any] | None = None, ) -> str: """Render SX to HTML, handling io-requests via Python async IO.""" await self._ensure_components() async with self._lock: self._send(f'(render "{_escape(source)}")') return await self._read_until_ok(ctx) async def _ensure_components(self) -> None: """Load component definitions into the kernel on first use.""" if self._components_loaded: return self._components_loaded = True try: from .jinja_bridge import get_component_env, _CLIENT_LIBRARY_SOURCES from .parser import serialize from .types import Component, Island, Macro env = get_component_env() parts: list[str] = list(_CLIENT_LIBRARY_SOURCES) for key, val in env.items(): if isinstance(val, Island): ps = ["&key"] + list(val.params) if val.has_children: ps.extend(["&rest", "children"]) parts.append(f"(defisland ~{val.name} ({' '.join(ps)}) {serialize(val.body)})") elif isinstance(val, Component): ps = ["&key"] + list(val.params) if val.has_children: ps.extend(["&rest", "children"]) parts.append(f"(defcomp ~{val.name} ({' '.join(ps)}) {serialize(val.body)})") elif isinstance(val, Macro): ps = list(val.params) if val.rest_param: ps.extend(["&rest", val.rest_param]) parts.append(f"(defmacro {val.name} ({' '.join(ps)}) {serialize(val.body)})") if parts: source = "\n".join(parts) await self.load_source(source) _logger.info("Loaded %d definitions into OCaml kernel", len(parts)) except Exception as e: _logger.error("Failed to load components into OCaml kernel: %s", e) self._components_loaded = False # retry next time async def reset(self) -> None: """Reset the kernel environment to pristine state.""" async with self._lock: self._send("(reset)") kind, value = await self._read_response() if kind == "error": raise OcamlBridgeError(f"reset: {value}") # ------------------------------------------------------------------ # Internal protocol handling # ------------------------------------------------------------------ def _send(self, line: str) -> None: """Write a line to the subprocess stdin.""" assert self._proc and self._proc.stdin self._proc.stdin.write((line + "\n").encode()) async def _readline(self) -> str: """Read a line from the subprocess stdout.""" assert self._proc and self._proc.stdout data = await self._proc.stdout.readline() if not data: # Process died — collect stderr for diagnostics stderr = b"" if self._proc.stderr: stderr = await self._proc.stderr.read() raise OcamlBridgeError( f"OCaml subprocess died unexpectedly. stderr: {stderr.decode(errors='replace')}" ) return data.decode().rstrip("\n") async def _read_response(self) -> tuple[str, str | None]: """Read a single (ok ...) or (error ...) response. Returns (kind, value) where kind is "ok" or "error". """ line = await self._readline() return _parse_response(line) async def _read_until_ok( self, ctx: dict[str, Any] | None = None, ) -> str: """Read lines until (ok ...) or (error ...). Handles (io-request ...) by fulfilling IO and sending (io-response ...). """ while True: line = await self._readline() if line.startswith("(io-request "): result = await self._handle_io_request(line, ctx) # Send response back to OCaml self._send(f"(io-response {_serialize_for_ocaml(result)})") continue kind, value = _parse_response(line) if kind == "error": raise OcamlBridgeError(value or "Unknown error") # kind == "ok" return value or "" async def _handle_io_request( self, line: str, ctx: dict[str, Any] | None, ) -> Any: """Dispatch an io-request to the appropriate Python handler.""" from .parser import parse_all # Parse the io-request parsed = parse_all(line) if not parsed or not isinstance(parsed[0], list): raise OcamlBridgeError(f"Malformed io-request: {line}") parts = parsed[0] # parts = [Symbol("io-request"), name_str, ...args] if len(parts) < 2: raise OcamlBridgeError(f"Malformed io-request: {line}") req_name = _to_str(parts[1]) args = parts[2:] if req_name == "query": return await self._io_query(args) elif req_name == "action": return await self._io_action(args) elif req_name == "request-arg": return self._io_request_arg(args) elif req_name == "request-method": return self._io_request_method() elif req_name == "ctx": return self._io_ctx(args, ctx) else: raise OcamlBridgeError(f"Unknown io-request type: {req_name}") async def _io_query(self, args: list) -> Any: """Handle (io-request "query" service name params...).""" from shared.infrastructure.internal import fetch_data service = _to_str(args[0]) if len(args) > 0 else "" query = _to_str(args[1]) if len(args) > 1 else "" params = _to_dict(args[2]) if len(args) > 2 else {} return await fetch_data(service, query, params) async def _io_action(self, args: list) -> Any: """Handle (io-request "action" service name payload...).""" from shared.infrastructure.internal import call_action service = _to_str(args[0]) if len(args) > 0 else "" action = _to_str(args[1]) if len(args) > 1 else "" payload = _to_dict(args[2]) if len(args) > 2 else {} return await call_action(service, action, payload) def _io_request_arg(self, args: list) -> Any: """Handle (io-request "request-arg" name).""" try: from quart import request name = _to_str(args[0]) if args else "" return request.args.get(name) except RuntimeError: return None def _io_request_method(self) -> str: """Handle (io-request "request-method").""" try: from quart import request return request.method except RuntimeError: return "GET" def _io_ctx(self, args: list, ctx: dict[str, Any] | None) -> Any: """Handle (io-request "ctx" key).""" if ctx is None: return None key = _to_str(args[0]) if args else "" return ctx.get(key) # ------------------------------------------------------------------ # Module-level singleton # ------------------------------------------------------------------ _bridge: OcamlBridge | None = None async def get_bridge() -> OcamlBridge: """Get or create the singleton bridge instance.""" global _bridge if _bridge is None: _bridge = OcamlBridge() if not _bridge._started: await _bridge.start() return _bridge # ------------------------------------------------------------------ # Helpers # ------------------------------------------------------------------ def _escape(s: str) -> str: """Escape a string for embedding in an SX string literal.""" return s.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t") def _parse_response(line: str) -> tuple[str, str | None]: """Parse an (ok ...) or (error ...) response line. Returns (kind, value) tuple. """ line = line.strip() if line == "(ok)": return ("ok", None) if line.startswith("(ok "): value = line[4:-1] # strip (ok and ) # If the value is a quoted string, unquote it if value.startswith('"') and value.endswith('"'): value = _unescape(value[1:-1]) return ("ok", value) if line.startswith("(error "): msg = line[7:-1] if msg.startswith('"') and msg.endswith('"'): msg = _unescape(msg[1:-1]) return ("error", msg) return ("error", f"Unexpected response: {line}") def _unescape(s: str) -> str: """Unescape an SX string literal.""" return ( s.replace("\\n", "\n") .replace("\\r", "\r") .replace("\\t", "\t") .replace('\\"', '"') .replace("\\\\", "\\") ) def _to_str(val: Any) -> str: """Convert an SX parsed value to a Python string.""" if isinstance(val, str): return val if hasattr(val, "name"): return val.name return str(val) def _to_dict(val: Any) -> dict: """Convert an SX parsed value to a Python dict.""" if isinstance(val, dict): return val return {} def _serialize_for_ocaml(val: Any) -> str: """Serialize a Python value to SX text for sending to OCaml.""" if val is None: return "nil" if isinstance(val, bool): return "true" if val else "false" if isinstance(val, (int, float)): if isinstance(val, float) and val == int(val): return str(int(val)) return str(val) if isinstance(val, str): return f'"{_escape(val)}"' if isinstance(val, (list, tuple)): items = " ".join(_serialize_for_ocaml(v) for v in val) return f"(list {items})" if isinstance(val, dict): pairs = " ".join( f":{k} {_serialize_for_ocaml(v)}" for k, v in val.items() ) return "{" + pairs + "}" return f'"{_escape(str(val))}"'