""" Synchronous OCaml bridge — persistent subprocess for build-time evaluation. Used by bootstrappers (JS cli.py, OCaml bootstrap.py) that need a sync evaluator to run transpiler.sx. For async runtime use, see ocaml_bridge.py. """ from __future__ import annotations import os import subprocess import sys _DEFAULT_BIN = os.path.join( os.path.dirname(__file__), "../../hosts/ocaml/_build/default/bin/sx_server.exe", ) class OcamlSyncError(Exception): """Error from the OCaml SX kernel.""" def _sx_unescape(s: str) -> str: """Unescape an SX string literal (left-to-right, one pass).""" out = [] i = 0 while i < len(s): if s[i] == '\\' and i + 1 < len(s): c = s[i + 1] if c == 'n': out.append('\n') elif c == 'r': out.append('\r') elif c == 't': out.append('\t') elif c == '"': out.append('"') elif c == '\\': out.append('\\') else: out.append(c) i += 2 else: out.append(s[i]) i += 1 return ''.join(out) class OcamlSync: """Synchronous bridge to the OCaml sx_server subprocess.""" def __init__(self, binary: str | None = None): self._binary = binary or os.environ.get("SX_OCAML_BIN") or _DEFAULT_BIN self._proc: subprocess.Popen | None = None def _ensure(self): if self._proc is not None and self._proc.poll() is None: return self._proc = subprocess.Popen( [self._binary], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) # Wait for (ready) line = self._readline() if line != "(ready)": raise OcamlSyncError(f"Expected (ready), got: {line}") def _send(self, command: str): assert self._proc and self._proc.stdin self._proc.stdin.write((command + "\n").encode()) self._proc.stdin.flush() def _readline(self) -> str: assert self._proc and self._proc.stdout data = self._proc.stdout.readline() if not data: raise OcamlSyncError("OCaml subprocess died unexpectedly") return data.decode().rstrip("\n") def _read_response(self) -> str: """Read a single response. Returns the value string or raises on error.""" line = self._readline() # Length-prefixed blob: (ok-len N) if line.startswith("(ok-len "): n = int(line[8:-1]) assert self._proc and self._proc.stdout data = self._proc.stdout.read(n) self._proc.stdout.readline() # trailing newline value = data.decode() # Blob is SX-serialized — strip string quotes and unescape if value.startswith('"') and value.endswith('"'): value = _sx_unescape(value[1:-1]) return value if line == "(ok)": return "" if line.startswith("(ok-raw "): return line[8:-1] if line.startswith("(ok "): value = line[4:-1] if value.startswith('"') and value.endswith('"'): value = _sx_unescape(value[1:-1]) return value if line.startswith("(error "): msg = line[7:-1] if msg.startswith('"') and msg.endswith('"'): msg = _sx_unescape(msg[1:-1]) raise OcamlSyncError(msg) raise OcamlSyncError(f"Unexpected response: {line}") def eval(self, source: str) -> str: """Evaluate SX source, return result as string.""" self._ensure() escaped = source.replace("\\", "\\\\").replace('"', '\\"') self._send(f'(eval "{escaped}")') return self._read_response() def load(self, path: str) -> str: """Load an .sx file into the kernel.""" self._ensure() self._send(f'(load "{path}")') return self._read_response() def load_source(self, source: str) -> str: """Load SX source directly into the kernel.""" self._ensure() escaped = source.replace("\\", "\\\\").replace('"', '\\"') self._send(f'(load-source "{escaped}")') return self._read_response() def stop(self): if self._proc and self._proc.poll() is None: self._proc.terminate() self._proc.wait(timeout=5) self._proc = None # Singleton _global: OcamlSync | None = None def get_sync_bridge() -> OcamlSync: global _global if _global is None: _global = OcamlSync() return _global