""" Synchronous OCaml bridge — persistent subprocess for build-time evaluation. Used by bootstrappers (JS cli.py, OCaml bootstrap.py) that need a sync evaluator to run transpiler.sx. For async runtime use, see ocaml_bridge.py. """ from __future__ import annotations import os import subprocess import sys _DEFAULT_BIN = os.path.join( os.path.dirname(__file__), "../../hosts/ocaml/_build/default/bin/sx_server.exe", ) class OcamlSyncError(Exception): """Error from the OCaml SX kernel.""" def _sx_unescape(s: str) -> str: """Unescape an SX string literal (left-to-right, one pass).""" out = [] i = 0 while i < len(s): if s[i] == '\\' and i + 1 < len(s): c = s[i + 1] if c == 'n': out.append('\n') elif c == 'r': out.append('\r') elif c == 't': out.append('\t') elif c == '"': out.append('"') elif c == '\\': out.append('\\') else: out.append(c) i += 2 else: out.append(s[i]) i += 1 return ''.join(out) class OcamlSync: """Synchronous bridge to the OCaml sx_server subprocess.""" def __init__(self, binary: str | None = None): self._binary = binary or os.environ.get("SX_OCAML_BIN") or _DEFAULT_BIN self._proc: subprocess.Popen | None = None self._epoch: int = 0 def _ensure(self): if self._proc is not None and self._proc.poll() is None: return self._proc = subprocess.Popen( [self._binary], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) self._epoch = 0 # Wait for (ready) line = self._readline() if line != "(ready)": raise OcamlSyncError(f"Expected (ready), got: {line}") def _send(self, command: str): """Send a command with epoch prefix.""" assert self._proc and self._proc.stdin self._epoch += 1 self._proc.stdin.write(f"(epoch {self._epoch})\n".encode()) self._proc.stdin.write((command + "\n").encode()) self._proc.stdin.flush() def _readline(self) -> str: assert self._proc and self._proc.stdout data = self._proc.stdout.readline() if not data: raise OcamlSyncError("OCaml subprocess died unexpectedly") return data.decode().rstrip("\n") def _strip_epoch(self, inner: str) -> str: """Strip leading epoch number from a response value: '42 value' → 'value'.""" if inner and inner[0].isdigit(): space = inner.find(" ") if space > 0: return inner[space + 1:] return "" # epoch only, no value return inner def _read_response(self) -> str: """Read a single response. Returns the value string or raises on error. Handles epoch-tagged responses: (ok EPOCH), (ok EPOCH value), (ok-len EPOCH N), (error EPOCH "msg"). """ line = self._readline() # Length-prefixed blob: (ok-len N) or (ok-len EPOCH N) if line.startswith("(ok-len "): parts = line[1:-1].split() # ["ok-len", ...] n = int(parts[-1]) # last number is always byte count assert self._proc and self._proc.stdout data = self._proc.stdout.read(n) self._proc.stdout.readline() # trailing newline value = data.decode() # Blob is SX-serialized — strip string quotes and unescape if value.startswith('"') and value.endswith('"'): value = _sx_unescape(value[1:-1]) return value if line == "(ok)" or (line.startswith("(ok ") and line[4:-1].isdigit()): return "" if line.startswith("(ok-raw "): inner = self._strip_epoch(line[8:-1]) return inner if line.startswith("(ok "): value = self._strip_epoch(line[4:-1]) if value.startswith('"') and value.endswith('"'): value = _sx_unescape(value[1:-1]) return value if line.startswith("(error "): msg = self._strip_epoch(line[7:-1]) if msg.startswith('"') and msg.endswith('"'): msg = _sx_unescape(msg[1:-1]) raise OcamlSyncError(msg) raise OcamlSyncError(f"Unexpected response: {line}") def eval(self, source: str) -> str: """Evaluate SX source, return result as string.""" self._ensure() escaped = source.replace("\\", "\\\\").replace('"', '\\"') self._send(f'(eval "{escaped}")') return self._read_response() def load(self, path: str) -> str: """Load an .sx file into the kernel.""" self._ensure() self._send(f'(load "{path}")') return self._read_response() def load_source(self, source: str) -> str: """Load SX source directly into the kernel.""" self._ensure() escaped = source.replace("\\", "\\\\").replace('"', '\\"') self._send(f'(load-source "{escaped}")') return self._read_response() def stop(self): if self._proc and self._proc.poll() is None: self._proc.terminate() self._proc.wait(timeout=5) self._proc = None # Singleton _global: OcamlSync | None = None def get_sync_bridge() -> OcamlSync: global _global if _global is None: _global = OcamlSync() return _global