""" OCaml SX kernel ↔ Python coroutine bridge. Manages a persistent OCaml subprocess (sx_server) that evaluates SX expressions. When the OCaml kernel needs IO (database queries, service calls), it yields an ``(io-request ...)`` back to Python, which fulfills it asynchronously and sends an ``(io-response ...)`` back. Usage:: bridge = OcamlBridge() await bridge.start() html = await bridge.render('(div (p "hello"))') await bridge.stop() """ from __future__ import annotations import asyncio import logging import os from typing import Any _logger = logging.getLogger("sx.ocaml") # Default binary path — can be overridden via SX_OCAML_BIN env var _DEFAULT_BIN = os.path.join( os.path.dirname(__file__), "../../hosts/ocaml/_build/default/bin/sx_server.exe", ) class OcamlBridgeError(Exception): """Error from the OCaml SX kernel.""" class OcamlBridge: """Async bridge to a persistent OCaml SX subprocess.""" def __init__(self, binary: str | None = None): self._binary = binary or os.environ.get("SX_OCAML_BIN") or _DEFAULT_BIN self._proc: asyncio.subprocess.Process | None = None self._lock = asyncio.Lock() self._in_io_handler = False # re-entrancy guard self._started = False self._components_loaded = False self._helpers_injected = False self._io_cache: dict[tuple, Any] = {} # (name, args...) → cached result async def start(self) -> None: """Launch the OCaml subprocess and wait for (ready).""" if self._started: return bin_path = os.path.abspath(self._binary) if not os.path.isfile(bin_path): raise FileNotFoundError( f"OCaml SX server binary not found: {bin_path}\n" f"Build with: cd hosts/ocaml && eval $(opam env) && dune build" ) _logger.info("Starting OCaml SX kernel: %s", bin_path) import sys self._proc = await asyncio.create_subprocess_exec( bin_path, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=sys.stderr, # kernel timing/debug to container logs limit=10 * 1024 * 1024, # 10MB readline buffer for large spec data ) # Wait for (ready) line = await self._readline() if line != "(ready)": raise OcamlBridgeError(f"Expected (ready), got: {line!r}") self._started = True # Verify engine identity await self._send("(ping)") kind, engine = await self._read_response() engine_name = engine if kind == "ok" else "unknown" _logger.info("OCaml SX kernel ready (pid=%d, engine=%s)", self._proc.pid, engine_name) async def stop(self) -> None: """Terminate the subprocess.""" if self._proc and self._proc.returncode is None: self._proc.stdin.close() try: await asyncio.wait_for(self._proc.wait(), timeout=5.0) except asyncio.TimeoutError: self._proc.kill() await self._proc.wait() _logger.info("OCaml SX kernel stopped") self._proc = None self._started = False async def ping(self) -> str: """Health check — returns engine name (e.g. 'ocaml-cek').""" async with self._lock: await self._send("(ping)") kind, value = await self._read_response() return value or "" if kind == "ok" else "" async def load(self, path: str) -> int: """Load an .sx file for side effects (defcomp, define, defmacro).""" async with self._lock: await self._send(f'(load "{_escape(path)}")') value = await self._read_until_ok(ctx=None) return int(float(value)) if value else 0 async def load_source(self, source: str) -> int: """Evaluate SX source for side effects.""" async with self._lock: await self._send(f'(load-source "{_escape(source)}")') value = await self._read_until_ok(ctx=None) return int(float(value)) if value else 0 async def eval(self, source: str, ctx: dict[str, Any] | None = None) -> str: """Evaluate SX expression, return serialized result. Supports io-requests (helper calls, query, action, etc.) via the coroutine bridge, just like render(). """ await self._ensure_components() async with self._lock: await self._send('(eval-blob)') await self._send_blob(source) return await self._read_until_ok(ctx) async def render( self, source: str, ctx: dict[str, Any] | None = None, ) -> str: """Render SX to HTML, handling io-requests via Python async IO.""" await self._ensure_components() async with self._lock: await self._send(f'(render "{_escape(source)}")') return await self._read_until_ok(ctx) async def aser(self, source: str, ctx: dict[str, Any] | None = None) -> str: """Evaluate SX and return SX wire format, handling io-requests.""" await self._ensure_components() async with self._lock: await self._send('(aser-blob)') await self._send_blob(source) return await self._read_until_ok(ctx) async def aser_slot(self, source: str, ctx: dict[str, Any] | None = None) -> str: """Like aser() but expands ALL components server-side. Equivalent to Python's async_eval_slot_to_sx — used for layout slots where component bodies need server-side IO evaluation. """ await self._ensure_components() async with self._lock: # Inject helpers inside the lock to avoid pipe desync — # a separate lock acquisition could let another coroutine # interleave commands between injection and aser-slot. await self._inject_helpers_locked() await self._send('(aser-slot-blob)') await self._send_blob(source) return await self._read_until_ok(ctx) _shell_statics_injected: bool = False async def _inject_shell_statics_locked(self) -> None: """Inject cached shell static data into kernel. MUST hold lock.""" if self._shell_statics_injected: return from .helpers import _get_shell_static try: static = _get_shell_static() except Exception: return # not ready yet (no app context) # Only inject small, safe values as kernel variables. # Large/complex blobs use placeholder tokens at render time. for key in ("component_hash", "sx_css_classes", "asset_url", "sx_js_hash", "body_js_hash"): val = static.get(key) or "" var = f"__shell-{key.replace('_', '-')}" defn = f'(define {var} "{_escape(str(val))}")' try: await self._send(f'(load-source "{_escape(defn)}")') await self._read_until_ok(ctx=None) except OcamlBridgeError as e: _logger.warning("Shell static inject failed for %s: %s", key, e) # List/nil values for key in ("head_scripts", "body_scripts"): val = static.get(key) var = f"__shell-{key.replace('_', '-')}" if val is None: defn = f'(define {var} nil)' elif isinstance(val, list): items = " ".join(f'"{_escape(str(v))}"' for v in val) defn = f'(define {var} (list {items}))' else: defn = f'(define {var} "{_escape(str(val))}")' try: await self._send(f'(load-source "{_escape(defn)}")') await self._read_until_ok(ctx=None) except OcamlBridgeError as e: _logger.warning("Shell static inject failed for %s: %s", key, e) self._shell_statics_injected = True _logger.info("Injected shell statics into OCaml kernel") async def sx_page_full( self, page_source: str, shell_kwargs: dict[str, Any], ctx: dict[str, Any] | None = None, ) -> str: """Render full page HTML in one OCaml call: aser-slot + shell render. Static data (component_defs, CSS, pages_sx) is pre-injected as kernel vars on first call. Per-request command sends only small values (title, csrf) + references to the kernel vars. """ await self._ensure_components() async with self._lock: await self._inject_helpers_locked() await self._inject_shell_statics_locked() # Large/complex blobs use placeholders — OCaml renders the shell # with short tokens; Python splices in the real values post-render. # This avoids piping large strings or strings with special chars # through the SX parser. PLACEHOLDER_KEYS = {"component_defs", "pages_sx", "init_sx", "sx_css", "inline_css", "inline_head_js"} placeholders = {} static_keys = {"component_hash", "sx_css_classes", "asset_url", "sx_js_hash", "body_js_hash", "head_scripts", "body_scripts"} # page_source is SX wire format that may contain \" escapes. # Send via binary blob protocol to avoid double-escaping # through the SX string parser round-trip. parts = ['(sx-page-full-blob'] for key, val in shell_kwargs.items(): k = key.replace("_", "-") if key in PLACEHOLDER_KEYS: token = f"__SLOT_{key.upper()}__" placeholders[token] = str(val) if val else "" parts.append(f' :{k} "{token}"') elif key in static_keys: parts.append(f' :{k} __shell-{k}') elif val is None: parts.append(f' :{k} nil') elif isinstance(val, bool): parts.append(f' :{k} {"true" if val else "false"}') elif isinstance(val, list): items = " ".join(f'"{_escape(str(v))}"' for v in val) parts.append(f' :{k} ({items})') else: parts.append(f' :{k} "{_escape(str(val))}"') parts.append(")") cmd = "".join(parts) await self._send(cmd) # Send page source as binary blob (avoids string-escape issues) await self._send_blob(page_source) html = await self._read_until_ok(ctx) # Splice in large blobs for token, blob in placeholders.items(): html = html.replace(token, blob) return html async def _inject_helpers_locked(self) -> None: """Inject page helpers into the kernel. MUST be called with lock held.""" if self._helpers_injected: return self._helpers_injected = True try: from .pages import get_page_helpers import inspect helpers = get_page_helpers("sx") if not helpers: self._helpers_injected = False return count = 0 for name, fn in helpers.items(): if callable(fn) and not name.startswith("~"): try: sig = inspect.signature(fn) nargs = sum(1 for p in sig.parameters.values() if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)) except (ValueError, TypeError): nargs = 2 nargs = max(nargs, 1) param_names = " ".join(chr(97 + i) for i in range(nargs)) arg_list = " ".join(chr(97 + i) for i in range(nargs)) sx_def = f'(define {name} (fn ({param_names}) (helper "{name}" {arg_list})))' try: await self._send(f'(load-source "{_escape(sx_def)}")') await self._read_until_ok(ctx=None) count += 1 except OcamlBridgeError: pass _logger.info("Injected %d page helpers into OCaml kernel", count) except Exception as e: _logger.warning("Helper injection failed: %s", e) self._helpers_injected = False async def _compile_adapter_module(self) -> None: """Compile adapter-sx.sx to bytecode and load as a VM module. All aser functions become NativeFn VM closures in the kernel env. Subsequent aser-slot calls find them as NativeFn → VM executes the entire render path compiled, no CEK steps. """ from .parser import parse_all, serialize from .ref.sx_ref import eval_expr, trampoline, PRIMITIVES # Ensure compiler primitives are available if 'serialize' not in PRIMITIVES: PRIMITIVES['serialize'] = lambda x: serialize(x) if 'primitive?' not in PRIMITIVES: PRIMITIVES['primitive?'] = lambda name: isinstance(name, str) and name in PRIMITIVES if 'has-key?' not in PRIMITIVES: PRIMITIVES['has-key?'] = lambda *a: isinstance(a[0], dict) and str(a[1]) in a[0] if 'set-nth!' not in PRIMITIVES: from .types import NIL PRIMITIVES['set-nth!'] = lambda *a: (a[0].__setitem__(int(a[1]), a[2]), NIL)[-1] if 'init' not in PRIMITIVES: PRIMITIVES['init'] = lambda *a: a[0][:-1] if isinstance(a[0], list) else a[0] if 'concat' not in PRIMITIVES: PRIMITIVES['concat'] = lambda *a: (a[0] or []) + (a[1] or []) if 'slice' not in PRIMITIVES: PRIMITIVES['slice'] = lambda *a: a[0][int(a[1]):int(a[2])] if len(a) == 3 else a[0][int(a[1]):] from .types import Symbol if 'make-symbol' not in PRIMITIVES: PRIMITIVES['make-symbol'] = lambda name: Symbol(name) from .types import NIL for ho in ['map', 'filter', 'for-each', 'reduce', 'some', 'every?', 'map-indexed']: if ho not in PRIMITIVES: PRIMITIVES[ho] = lambda *a: NIL # Load compiler compiler_env = {} spec_dir = os.path.join(os.path.dirname(__file__), "../../spec") for f in ["bytecode.sx", "compiler.sx"]: path = os.path.join(spec_dir, f) if os.path.isfile(path): with open(path) as fh: for expr in parse_all(fh.read()): trampoline(eval_expr(expr, compiler_env)) # Compile adapter-sx.sx web_dir = os.path.join(os.path.dirname(__file__), "../../web") adapter_path = os.path.join(web_dir, "adapter-sx.sx") if not os.path.isfile(adapter_path): _logger.warning("adapter-sx.sx not found at %s", adapter_path) return with open(adapter_path) as f: adapter_exprs = parse_all(f.read()) compiled = trampoline(eval_expr( [Symbol('compile-module'), [Symbol('quote'), adapter_exprs]], compiler_env)) code_sx = serialize(compiled) _logger.info("Compiled adapter-sx.sx: %d bytes bytecode", len(code_sx)) # Load the compiled module into the OCaml VM async with self._lock: await self._send(f'(vm-load-module {code_sx})') await self._read_until_ok(ctx=None) _logger.info("Loaded adapter-sx.sx as VM module") async def _ensure_components(self) -> None: """Load all .sx source files into the kernel on first use. Errors during loading are handled gracefully — IO responses are always sent back to keep the pipe clean. """ if self._components_loaded: return self._components_loaded = True try: from .jinja_bridge import _watched_dirs, _dirs_from_cache import glob # Skip patterns — files that use constructs not available in the kernel skip_names = {"boundary.sx", "forms.sx"} skip_dirs = {"tests"} # Collect files to load all_files: list[str] = [] # Spec files needed by aser + bytecode compiler spec_dir = os.path.join(os.path.dirname(__file__), "../../spec") for spec_file in ["parser.sx", "render.sx", "bytecode.sx", "compiler.sx"]: path = os.path.normpath(os.path.join(spec_dir, spec_file)) if os.path.isfile(path): all_files.append(path) # All directories loaded into the Python env all_dirs = list(set(_watched_dirs) | _dirs_from_cache) # Web adapters + signals (aser lives in adapter-sx.sx, # signals.sx provides reactive primitives for island SSR) web_dir = os.path.join(os.path.dirname(__file__), "../../web") if os.path.isdir(web_dir): for web_file in ["signals.sx", "adapter-sx.sx"]: path = os.path.normpath(os.path.join(web_dir, web_file)) if os.path.isfile(path): all_files.append(path) for directory in sorted(all_dirs): files = sorted( glob.glob(os.path.join(directory, "**", "*.sx"), recursive=True) ) for filepath in files: basename = os.path.basename(filepath) # Skip known-bad files if basename in skip_names: continue # Skip test and handler directories parts = filepath.replace("\\", "/").split("/") if any(d in skip_dirs for d in parts): continue all_files.append(filepath) # Load all files under a single lock count = 0 skipped = 0 async with self._lock: for filepath in all_files: try: await self._send(f'(load "{_escape(filepath)}")') value = await self._read_until_ok(ctx=None) # Response may be a number (count) or a value — just count files count += 1 except OcamlBridgeError as e: skipped += 1 _logger.warning("OCaml load skipped %s: %s", filepath, e) # Register JIT hook — lambdas compile on first call try: await self._send('(vm-compile-adapter)') await self._read_until_ok(ctx=None) _logger.info("JIT hook registered — lambdas compile on first call") except OcamlBridgeError as e: _logger.warning("JIT hook registration skipped: %s", e) _logger.info("Loaded %d definitions from .sx files into OCaml kernel (%d skipped)", count, skipped) except Exception as e: _logger.error("Failed to load .sx files into OCaml kernel: %s", e) self._components_loaded = False # retry next time async def inject_page_helpers(self, helpers: dict) -> None: """Register page helpers as IO-routing definitions in the kernel. Each helper becomes a function that yields (io-request "helper" name ...), routing the call back to Python via the coroutine bridge. """ await self._ensure_components() async with self._lock: count = 0 for name, fn in helpers.items(): if callable(fn) and not name.startswith("~"): sx_def = f'(define {name} (fn (&rest args) (apply helper (concat (list "{name}") args))))' try: await self._send(f'(load-source "{_escape(sx_def)}")') await self._read_until_ok(ctx=None) count += 1 except OcamlBridgeError: pass # non-fatal if count: _logger.info("Injected %d page helpers into OCaml kernel", count) async def reset(self) -> None: """Reset the kernel environment to pristine state.""" async with self._lock: await self._send("(reset)") kind, value = await self._read_response() if kind == "error": raise OcamlBridgeError(f"reset: {value}") # ------------------------------------------------------------------ # Internal protocol handling # ------------------------------------------------------------------ async def _send(self, line: str) -> None: """Write a line to the subprocess stdin and flush.""" if self._in_io_handler: raise OcamlBridgeError( f"Re-entrant bridge call from IO handler: {line[:80]}. " f"IO handlers must not call the bridge — use Python-only code." ) assert self._proc and self._proc.stdin _logger.debug("SEND: %s", line[:120]) self._proc.stdin.write((line + "\n").encode()) await self._proc.stdin.drain() async def _send_blob(self, data: str) -> None: """Send a length-prefixed binary blob to the subprocess. Protocol: sends "(blob N)\\n" followed by exactly N bytes, then "\\n". The OCaml side reads the length, then reads exactly N bytes. This avoids string-escape round-trip issues for SX wire format. """ assert self._proc and self._proc.stdin encoded = data.encode() self._proc.stdin.write(f"(blob {len(encoded)})\n".encode()) self._proc.stdin.write(encoded) self._proc.stdin.write(b"\n") await self._proc.stdin.drain() async def _readline(self) -> str: """Read a line from the subprocess stdout.""" assert self._proc and self._proc.stdout data = await self._proc.stdout.readline() if not data: # Process died raise OcamlBridgeError( "OCaml subprocess died unexpectedly (check container stderr)" ) line = data.decode().rstrip("\n") _logger.debug("RECV: %s", line[:120]) return line async def _read_response(self) -> tuple[str, str | None]: """Read a single (ok ...) or (error ...) response. Returns (kind, value) where kind is "ok" or "error". """ line = await self._readline() # Length-prefixed blob if line.startswith("(ok-len "): n = int(line[8:-1]) assert self._proc and self._proc.stdout data = await self._proc.stdout.readexactly(n) await self._proc.stdout.readline() # trailing newline return ("ok", data.decode()) return _parse_response(line) async def _read_until_ok( self, ctx: dict[str, Any] | None = None, ) -> str: """Read lines until (ok ...) or (error ...). Handles IO requests in two modes: - Legacy (blocking): single io-request → immediate io-response - Batched: collect io-requests until (io-done N), process ALL concurrently with asyncio.gather, send responses in order """ import asyncio pending_batch: list[str] = [] while True: line = await self._readline() if line.startswith("(io-request "): # Check if batched (has numeric ID after "io-request ") after = line[len("(io-request "):].lstrip() if after and after[0].isdigit(): # Batched mode — collect, don't respond yet pending_batch.append(line) continue # Legacy blocking mode — respond immediately try: result = await self._handle_io_request(line, ctx) await self._send(f"(io-response {_serialize_for_ocaml(result)})") except Exception as e: _logger.warning("IO request failed, sending nil: %s", e) await self._send("(io-response nil)") continue if line.startswith("(io-done "): # Batch complete — process all pending IO concurrently tasks = [self._handle_io_request(req, ctx) for req in pending_batch] results = await asyncio.gather(*tasks, return_exceptions=True) for result in results: if isinstance(result, BaseException): _logger.warning("Batched IO failed: %s", result) await self._send("(io-response nil)") else: await self._send( f"(io-response {_serialize_for_ocaml(result)})") pending_batch = [] continue # Length-prefixed blob: (ok-len N) if line.startswith("(ok-len "): n = int(line[8:-1]) assert self._proc and self._proc.stdout data = await self._proc.stdout.readexactly(n) # Read trailing newline await self._proc.stdout.readline() return data.decode() kind, value = _parse_response(line) if kind == "error": raise OcamlBridgeError(value or "Unknown error") # kind == "ok" return value or "" async def _handle_io_request( self, line: str, ctx: dict[str, Any] | None, ) -> Any: """Dispatch an io-request to the appropriate Python handler. IO handlers MUST NOT call the bridge (eval/aser/render) — doing so would deadlock since the lock is already held. The _in_io_handler flag triggers an immediate error if this rule is violated. """ self._in_io_handler = True try: return await self._dispatch_io(line, ctx) finally: self._in_io_handler = False async def _dispatch_io( self, line: str, ctx: dict[str, Any] | None, ) -> Any: """Inner dispatch for IO requests.""" from .parser import parse_all # Parse the io-request parsed = parse_all(line) if not parsed or not isinstance(parsed[0], list): raise OcamlBridgeError(f"Malformed io-request: {line}") parts = parsed[0] # Legacy: [Symbol("io-request"), name_str, ...args] # Batched: [Symbol("io-request"), id_num, name_str, ...args] if len(parts) < 2: raise OcamlBridgeError(f"Malformed io-request: {line}") # Skip numeric batch ID if present offset = 1 if isinstance(parts[1], (int, float)): offset = 2 req_name = _to_str(parts[offset]) args = parts[offset + 1:] if req_name == "query": return await self._io_query(args) elif req_name == "action": return await self._io_action(args) elif req_name == "request-arg": return self._io_request_arg(args) elif req_name == "request-method": return self._io_request_method() elif req_name == "ctx": return self._io_ctx(args, ctx) elif req_name == "helper": return await self._io_helper(args, ctx) else: # Fall back to registered IO handlers (set-response-status, sleep, etc.) from .primitives_io import _IO_HANDLERS, RequestContext io_handler = _IO_HANDLERS.get(req_name) if io_handler is not None: helper_args = [_to_python(a) for a in args] return await io_handler(helper_args, {}, ctx or RequestContext()) raise OcamlBridgeError(f"Unknown io-request type: {req_name}") async def _io_query(self, args: list) -> Any: """Handle (io-request "query" service name params...).""" from shared.infrastructure.internal import fetch_data service = _to_str(args[0]) if len(args) > 0 else "" query = _to_str(args[1]) if len(args) > 1 else "" params = _to_dict(args[2]) if len(args) > 2 else {} return await fetch_data(service, query, params) async def _io_action(self, args: list) -> Any: """Handle (io-request "action" service name payload...).""" from shared.infrastructure.internal import call_action service = _to_str(args[0]) if len(args) > 0 else "" action = _to_str(args[1]) if len(args) > 1 else "" payload = _to_dict(args[2]) if len(args) > 2 else {} return await call_action(service, action, payload) def _io_request_arg(self, args: list) -> Any: """Handle (io-request "request-arg" name).""" try: from quart import request name = _to_str(args[0]) if args else "" return request.args.get(name) except RuntimeError: return None def _io_request_method(self) -> str: """Handle (io-request "request-method").""" try: from quart import request return request.method except RuntimeError: return "GET" def _io_ctx(self, args: list, ctx: dict[str, Any] | None) -> Any: """Handle (io-request "ctx" key).""" if ctx is None: return None key = _to_str(args[0]) if args else "" return ctx.get(key) # Helpers that are pure functions — safe to cache by args. _CACHEABLE_HELPERS = frozenset({ "highlight", "component-source", "primitives-data", "special-forms-data", "reference-data", "read-spec-file", "bootstrapper-data", "bundle-analyzer-data", "routing-analyzer-data", }) async def _io_helper(self, args: list, ctx: dict[str, Any] | None) -> Any: """Handle (io-request "helper" name arg1 arg2 ...). Dispatches to registered page helpers — Python functions like read-spec-file, bootstrapper-data, etc. The helper service name is passed via ctx["_helper_service"]. Pure helpers (highlight etc.) are cached — same input always produces same output. Eliminates blocking round-trips for repeat calls across pages. """ import asyncio from .pages import get_page_helpers from .primitives_io import _IO_HANDLERS, RequestContext name = _to_str(args[0]) if args else "" helper_args = [_to_python(a) for a in args[1:]] # Cache lookup for pure helpers if name in self._CACHEABLE_HELPERS: cache_key = (name, *[repr(a) for a in helper_args]) if cache_key in self._io_cache: return self._io_cache[cache_key] # Check page helpers first (application-level) service = (ctx or {}).get("_helper_service", "sx") helpers = get_page_helpers(service) fn = helpers.get(name) if fn is not None: result = fn(*helper_args) if asyncio.iscoroutine(result): result = await result # Cache pure helper results if name in self._CACHEABLE_HELPERS: self._io_cache[cache_key] = result return result # Fall back to IO primitives (now, state-get, state-set!, etc.) io_handler = _IO_HANDLERS.get(name) if io_handler is not None: return await io_handler(helper_args, {}, RequestContext()) # Fall back to regular primitives (json-encode, into, etc.) from .primitives import get_primitive as _get_prim prim = _get_prim(name) if prim is not None: return prim(*helper_args) raise OcamlBridgeError(f"Unknown helper: {name!r}") # ------------------------------------------------------------------ # Module-level singleton # ------------------------------------------------------------------ _bridge: OcamlBridge | None = None async def get_bridge() -> OcamlBridge: """Get or create the singleton bridge instance.""" global _bridge if _bridge is None: _bridge = OcamlBridge() if not _bridge._started: await _bridge.start() return _bridge # ------------------------------------------------------------------ # Helpers # ------------------------------------------------------------------ def _escape(s: str) -> str: """Escape a string for embedding in an SX string literal.""" return s.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t") def _parse_response(line: str) -> tuple[str, str | None]: """Parse an (ok ...) or (error ...) response line. Returns (kind, value) tuple. """ line = line.strip() if line == "(ok)": return ("ok", None) if line.startswith("(ok-raw "): # Raw SX wire format — no unescaping needed return ("ok", line[8:-1]) if line.startswith("(ok "): value = line[4:-1] # strip (ok and ) # If the value is a quoted string, unquote it if value.startswith('"') and value.endswith('"'): value = _unescape(value[1:-1]) return ("ok", value) if line.startswith("(error "): msg = line[7:-1] if msg.startswith('"') and msg.endswith('"'): msg = _unescape(msg[1:-1]) return ("error", msg) return ("error", f"Unexpected response: {line}") def _unescape(s: str) -> str: """Unescape an SX string literal.""" return ( s.replace("\\n", "\n") .replace("\\r", "\r") .replace("\\t", "\t") .replace('\\"', '"') .replace("\\\\", "\\") ) def _to_python(val: Any) -> Any: """Convert an SX parsed value to a plain Python value.""" from .types import NIL as _NIL if val is None or val is _NIL: return None if hasattr(val, "name"): # Symbol or Keyword return val.name return val def _to_str(val: Any) -> str: """Convert an SX parsed value to a Python string.""" if isinstance(val, str): return val if hasattr(val, "name"): return val.name return str(val) def _to_dict(val: Any) -> dict: """Convert an SX parsed value to a Python dict.""" if isinstance(val, dict): return val return {} def _serialize_for_ocaml(val: Any) -> str: """Serialize a Python value to SX text for sending to OCaml.""" if val is None: return "nil" if isinstance(val, bool): return "true" if val else "false" if isinstance(val, (int, float)): if isinstance(val, float) and val == int(val): return str(int(val)) return str(val) if isinstance(val, str): return f'"{_escape(val)}"' if isinstance(val, (list, tuple)): items = " ".join(_serialize_for_ocaml(v) for v in val) return f"(list {items})" if isinstance(val, dict): pairs = " ".join( f":{k} {_serialize_for_ocaml(v)}" for k, v in val.items() ) return "{" + pairs + "}" return f'"{_escape(str(val))}"'