409 lines
14 KiB
Python
409 lines
14 KiB
Python
"""
|
|
OCaml SX kernel ↔ Python coroutine bridge.
|
|
|
|
Manages a persistent OCaml subprocess (sx_server) that evaluates SX
|
|
expressions. When the OCaml kernel needs IO (database queries, service
|
|
calls), it yields an ``(io-request ...)`` back to Python, which fulfills
|
|
it asynchronously and sends an ``(io-response ...)`` back.
|
|
|
|
Usage::
|
|
|
|
bridge = OcamlBridge()
|
|
await bridge.start()
|
|
html = await bridge.render('(div (p "hello"))')
|
|
await bridge.stop()
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
from typing import Any
|
|
|
|
_logger = logging.getLogger("sx.ocaml")
|
|
|
|
# Default binary path — can be overridden via SX_OCAML_BIN env var
|
|
_DEFAULT_BIN = os.path.join(
|
|
os.path.dirname(__file__),
|
|
"../../hosts/ocaml/_build/default/bin/sx_server.exe",
|
|
)
|
|
|
|
|
|
class OcamlBridgeError(Exception):
|
|
"""Error from the OCaml SX kernel."""
|
|
|
|
|
|
class OcamlBridge:
|
|
"""Async bridge to a persistent OCaml SX subprocess."""
|
|
|
|
def __init__(self, binary: str | None = None):
|
|
self._binary = binary or os.environ.get("SX_OCAML_BIN") or _DEFAULT_BIN
|
|
self._proc: asyncio.subprocess.Process | None = None
|
|
self._lock = asyncio.Lock()
|
|
self._started = False
|
|
self._components_loaded = False
|
|
|
|
async def start(self) -> None:
|
|
"""Launch the OCaml subprocess and wait for (ready)."""
|
|
if self._started:
|
|
return
|
|
|
|
bin_path = os.path.abspath(self._binary)
|
|
if not os.path.isfile(bin_path):
|
|
raise FileNotFoundError(
|
|
f"OCaml SX server binary not found: {bin_path}\n"
|
|
f"Build with: cd hosts/ocaml && eval $(opam env) && dune build"
|
|
)
|
|
|
|
_logger.info("Starting OCaml SX kernel: %s", bin_path)
|
|
self._proc = await asyncio.create_subprocess_exec(
|
|
bin_path,
|
|
stdin=asyncio.subprocess.PIPE,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
)
|
|
|
|
# Wait for (ready)
|
|
line = await self._readline()
|
|
if line != "(ready)":
|
|
raise OcamlBridgeError(f"Expected (ready), got: {line!r}")
|
|
|
|
self._started = True
|
|
|
|
# Verify engine identity
|
|
self._send("(ping)")
|
|
kind, engine = await self._read_response()
|
|
engine_name = engine if kind == "ok" else "unknown"
|
|
_logger.info("OCaml SX kernel ready (pid=%d, engine=%s)", self._proc.pid, engine_name)
|
|
|
|
async def stop(self) -> None:
|
|
"""Terminate the subprocess."""
|
|
if self._proc and self._proc.returncode is None:
|
|
self._proc.stdin.close()
|
|
try:
|
|
await asyncio.wait_for(self._proc.wait(), timeout=5.0)
|
|
except asyncio.TimeoutError:
|
|
self._proc.kill()
|
|
await self._proc.wait()
|
|
_logger.info("OCaml SX kernel stopped")
|
|
self._proc = None
|
|
self._started = False
|
|
|
|
async def ping(self) -> str:
|
|
"""Health check — returns engine name (e.g. 'ocaml-cek')."""
|
|
async with self._lock:
|
|
self._send("(ping)")
|
|
kind, value = await self._read_response()
|
|
return value or "" if kind == "ok" else ""
|
|
|
|
async def load(self, path: str) -> int:
|
|
"""Load an .sx file for side effects (defcomp, define, defmacro)."""
|
|
async with self._lock:
|
|
self._send(f'(load "{_escape(path)}")')
|
|
kind, value = await self._read_response()
|
|
if kind == "error":
|
|
raise OcamlBridgeError(f"load {path}: {value}")
|
|
return int(float(value)) if value else 0
|
|
|
|
async def load_source(self, source: str) -> int:
|
|
"""Evaluate SX source for side effects."""
|
|
async with self._lock:
|
|
self._send(f'(load-source "{_escape(source)}")')
|
|
kind, value = await self._read_response()
|
|
if kind == "error":
|
|
raise OcamlBridgeError(f"load-source: {value}")
|
|
return int(float(value)) if value else 0
|
|
|
|
async def eval(self, source: str) -> str:
|
|
"""Evaluate SX expression, return serialized result."""
|
|
async with self._lock:
|
|
self._send(f'(eval "{_escape(source)}")')
|
|
kind, value = await self._read_response()
|
|
if kind == "error":
|
|
raise OcamlBridgeError(f"eval: {value}")
|
|
return value or ""
|
|
|
|
async def render(
|
|
self,
|
|
source: str,
|
|
ctx: dict[str, Any] | None = None,
|
|
) -> str:
|
|
"""Render SX to HTML, handling io-requests via Python async IO."""
|
|
await self._ensure_components()
|
|
async with self._lock:
|
|
self._send(f'(render "{_escape(source)}")')
|
|
return await self._read_until_ok(ctx)
|
|
|
|
async def _ensure_components(self) -> None:
|
|
"""Load component definitions into the kernel on first use."""
|
|
if self._components_loaded:
|
|
return
|
|
self._components_loaded = True
|
|
try:
|
|
from .jinja_bridge import get_component_env, _CLIENT_LIBRARY_SOURCES
|
|
from .parser import serialize
|
|
from .types import Component, Island, Macro
|
|
|
|
env = get_component_env()
|
|
parts: list[str] = list(_CLIENT_LIBRARY_SOURCES)
|
|
for key, val in env.items():
|
|
if isinstance(val, Island):
|
|
ps = ["&key"] + list(val.params)
|
|
if val.has_children:
|
|
ps.extend(["&rest", "children"])
|
|
parts.append(f"(defisland ~{val.name} ({' '.join(ps)}) {serialize(val.body)})")
|
|
elif isinstance(val, Component):
|
|
ps = ["&key"] + list(val.params)
|
|
if val.has_children:
|
|
ps.extend(["&rest", "children"])
|
|
parts.append(f"(defcomp ~{val.name} ({' '.join(ps)}) {serialize(val.body)})")
|
|
elif isinstance(val, Macro):
|
|
ps = list(val.params)
|
|
if val.rest_param:
|
|
ps.extend(["&rest", val.rest_param])
|
|
parts.append(f"(defmacro {val.name} ({' '.join(ps)}) {serialize(val.body)})")
|
|
if parts:
|
|
source = "\n".join(parts)
|
|
await self.load_source(source)
|
|
_logger.info("Loaded %d definitions into OCaml kernel", len(parts))
|
|
except Exception as e:
|
|
_logger.error("Failed to load components into OCaml kernel: %s", e)
|
|
self._components_loaded = False # retry next time
|
|
|
|
async def reset(self) -> None:
|
|
"""Reset the kernel environment to pristine state."""
|
|
async with self._lock:
|
|
self._send("(reset)")
|
|
kind, value = await self._read_response()
|
|
if kind == "error":
|
|
raise OcamlBridgeError(f"reset: {value}")
|
|
|
|
# ------------------------------------------------------------------
|
|
# Internal protocol handling
|
|
# ------------------------------------------------------------------
|
|
|
|
def _send(self, line: str) -> None:
|
|
"""Write a line to the subprocess stdin."""
|
|
assert self._proc and self._proc.stdin
|
|
self._proc.stdin.write((line + "\n").encode())
|
|
|
|
async def _readline(self) -> str:
|
|
"""Read a line from the subprocess stdout."""
|
|
assert self._proc and self._proc.stdout
|
|
data = await self._proc.stdout.readline()
|
|
if not data:
|
|
# Process died — collect stderr for diagnostics
|
|
stderr = b""
|
|
if self._proc.stderr:
|
|
stderr = await self._proc.stderr.read()
|
|
raise OcamlBridgeError(
|
|
f"OCaml subprocess died unexpectedly. stderr: {stderr.decode(errors='replace')}"
|
|
)
|
|
return data.decode().rstrip("\n")
|
|
|
|
async def _read_response(self) -> tuple[str, str | None]:
|
|
"""Read a single (ok ...) or (error ...) response.
|
|
|
|
Returns (kind, value) where kind is "ok" or "error".
|
|
"""
|
|
line = await self._readline()
|
|
return _parse_response(line)
|
|
|
|
async def _read_until_ok(
|
|
self,
|
|
ctx: dict[str, Any] | None = None,
|
|
) -> str:
|
|
"""Read lines until (ok ...) or (error ...).
|
|
|
|
Handles (io-request ...) by fulfilling IO and sending (io-response ...).
|
|
"""
|
|
while True:
|
|
line = await self._readline()
|
|
|
|
if line.startswith("(io-request "):
|
|
result = await self._handle_io_request(line, ctx)
|
|
# Send response back to OCaml
|
|
self._send(f"(io-response {_serialize_for_ocaml(result)})")
|
|
continue
|
|
|
|
kind, value = _parse_response(line)
|
|
if kind == "error":
|
|
raise OcamlBridgeError(value or "Unknown error")
|
|
# kind == "ok"
|
|
return value or ""
|
|
|
|
async def _handle_io_request(
|
|
self,
|
|
line: str,
|
|
ctx: dict[str, Any] | None,
|
|
) -> Any:
|
|
"""Dispatch an io-request to the appropriate Python handler."""
|
|
from .parser import parse_all
|
|
|
|
# Parse the io-request
|
|
parsed = parse_all(line)
|
|
if not parsed or not isinstance(parsed[0], list):
|
|
raise OcamlBridgeError(f"Malformed io-request: {line}")
|
|
|
|
parts = parsed[0]
|
|
# parts = [Symbol("io-request"), name_str, ...args]
|
|
if len(parts) < 2:
|
|
raise OcamlBridgeError(f"Malformed io-request: {line}")
|
|
|
|
req_name = _to_str(parts[1])
|
|
args = parts[2:]
|
|
|
|
if req_name == "query":
|
|
return await self._io_query(args)
|
|
elif req_name == "action":
|
|
return await self._io_action(args)
|
|
elif req_name == "request-arg":
|
|
return self._io_request_arg(args)
|
|
elif req_name == "request-method":
|
|
return self._io_request_method()
|
|
elif req_name == "ctx":
|
|
return self._io_ctx(args, ctx)
|
|
else:
|
|
raise OcamlBridgeError(f"Unknown io-request type: {req_name}")
|
|
|
|
async def _io_query(self, args: list) -> Any:
|
|
"""Handle (io-request "query" service name params...)."""
|
|
from shared.infrastructure.internal import fetch_data
|
|
|
|
service = _to_str(args[0]) if len(args) > 0 else ""
|
|
query = _to_str(args[1]) if len(args) > 1 else ""
|
|
params = _to_dict(args[2]) if len(args) > 2 else {}
|
|
return await fetch_data(service, query, params)
|
|
|
|
async def _io_action(self, args: list) -> Any:
|
|
"""Handle (io-request "action" service name payload...)."""
|
|
from shared.infrastructure.internal import call_action
|
|
|
|
service = _to_str(args[0]) if len(args) > 0 else ""
|
|
action = _to_str(args[1]) if len(args) > 1 else ""
|
|
payload = _to_dict(args[2]) if len(args) > 2 else {}
|
|
return await call_action(service, action, payload)
|
|
|
|
def _io_request_arg(self, args: list) -> Any:
|
|
"""Handle (io-request "request-arg" name)."""
|
|
try:
|
|
from quart import request
|
|
name = _to_str(args[0]) if args else ""
|
|
return request.args.get(name)
|
|
except RuntimeError:
|
|
return None
|
|
|
|
def _io_request_method(self) -> str:
|
|
"""Handle (io-request "request-method")."""
|
|
try:
|
|
from quart import request
|
|
return request.method
|
|
except RuntimeError:
|
|
return "GET"
|
|
|
|
def _io_ctx(self, args: list, ctx: dict[str, Any] | None) -> Any:
|
|
"""Handle (io-request "ctx" key)."""
|
|
if ctx is None:
|
|
return None
|
|
key = _to_str(args[0]) if args else ""
|
|
return ctx.get(key)
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Module-level singleton
|
|
# ------------------------------------------------------------------
|
|
|
|
_bridge: OcamlBridge | None = None
|
|
|
|
|
|
async def get_bridge() -> OcamlBridge:
|
|
"""Get or create the singleton bridge instance."""
|
|
global _bridge
|
|
if _bridge is None:
|
|
_bridge = OcamlBridge()
|
|
if not _bridge._started:
|
|
await _bridge.start()
|
|
return _bridge
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
def _escape(s: str) -> str:
|
|
"""Escape a string for embedding in an SX string literal."""
|
|
return s.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t")
|
|
|
|
|
|
def _parse_response(line: str) -> tuple[str, str | None]:
|
|
"""Parse an (ok ...) or (error ...) response line.
|
|
|
|
Returns (kind, value) tuple.
|
|
"""
|
|
line = line.strip()
|
|
if line == "(ok)":
|
|
return ("ok", None)
|
|
if line.startswith("(ok "):
|
|
value = line[4:-1] # strip (ok and )
|
|
# If the value is a quoted string, unquote it
|
|
if value.startswith('"') and value.endswith('"'):
|
|
value = _unescape(value[1:-1])
|
|
return ("ok", value)
|
|
if line.startswith("(error "):
|
|
msg = line[7:-1]
|
|
if msg.startswith('"') and msg.endswith('"'):
|
|
msg = _unescape(msg[1:-1])
|
|
return ("error", msg)
|
|
return ("error", f"Unexpected response: {line}")
|
|
|
|
|
|
def _unescape(s: str) -> str:
|
|
"""Unescape an SX string literal."""
|
|
return (
|
|
s.replace("\\n", "\n")
|
|
.replace("\\r", "\r")
|
|
.replace("\\t", "\t")
|
|
.replace('\\"', '"')
|
|
.replace("\\\\", "\\")
|
|
)
|
|
|
|
|
|
def _to_str(val: Any) -> str:
|
|
"""Convert an SX parsed value to a Python string."""
|
|
if isinstance(val, str):
|
|
return val
|
|
if hasattr(val, "name"):
|
|
return val.name
|
|
return str(val)
|
|
|
|
|
|
def _to_dict(val: Any) -> dict:
|
|
"""Convert an SX parsed value to a Python dict."""
|
|
if isinstance(val, dict):
|
|
return val
|
|
return {}
|
|
|
|
|
|
def _serialize_for_ocaml(val: Any) -> str:
|
|
"""Serialize a Python value to SX text for sending to OCaml."""
|
|
if val is None:
|
|
return "nil"
|
|
if isinstance(val, bool):
|
|
return "true" if val else "false"
|
|
if isinstance(val, (int, float)):
|
|
if isinstance(val, float) and val == int(val):
|
|
return str(int(val))
|
|
return str(val)
|
|
if isinstance(val, str):
|
|
return f'"{_escape(val)}"'
|
|
if isinstance(val, (list, tuple)):
|
|
items = " ".join(_serialize_for_ocaml(v) for v in val)
|
|
return f"(list {items})"
|
|
if isinstance(val, dict):
|
|
pairs = " ".join(
|
|
f":{k} {_serialize_for_ocaml(v)}" for k, v in val.items()
|
|
)
|
|
return "{" + pairs + "}"
|
|
return f'"{_escape(str(val))}"'
|