From 96e7bbbac1a91ac74da99ec8bc47ff02f19c307f Mon Sep 17 00:00:00 2001 From: giles Date: Thu, 19 Mar 2026 16:53:01 +0000 Subject: [PATCH] Non-blocking batch IO for OCaml kernel + stable component hash MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit OCaml kernel (sx_server.ml): - Batch IO mode for aser-slot: batchable helpers (highlight, component-source) return placeholders during evaluation instead of blocking on stdin. After aser completes, all batched requests are flushed to Python at once. - Python processes them concurrently with asyncio.gather. - Placeholders (using «IO:N» markers) are replaced with actual values in the result string. - Non-batchable IO (query, action, ctx, request-arg) still uses blocking mode — their results drive control flow. Python bridge (ocaml_bridge.py): - _read_until_ok handles the batched protocol: collects io-request lines with numeric IDs, then processes them concurrently on (io-done N) with asyncio.gather. - IO result cache for pure helpers — eliminates redundant calls. - _handle_io_request strips the batch ID from the request format. Component caching (jinja_bridge.py): - Hash computed from FULL component env (all names + bodies), not per-page subset. Stable across all pages — browser caches once, no re-download on navigation between pages. - invalidate_component_hash() called on hot-reload. Tests: 15/15 OCaml helper tests pass (2 new batch IO tests). 
(** Batch IO mode — collect requests during aser-slot, resolve after. *)
let io_batch_mode = ref false
let io_queue : (int * string * value list) list ref = ref []
let io_counter = ref 0

(** Helpers safe to defer — pure functions whose results are only used
    as rendering output (inlined into SX wire format), not in control flow. *)
let batchable_helpers = [ "highlight"; "component-source" ]

(* A request may be deferred only when it is a [helper] call whose first
   argument names one of the whitelisted pure helpers above. *)
let is_batchable name args =
  name = "helper"
  && (match args with
      | String helper_name :: _ -> List.mem helper_name batchable_helpers
      | _ -> false)

(** Send an io-request — batch mode returns placeholder, else blocks. *)
let io_request name args =
  if !io_batch_mode && is_batchable name args then begin
    (* Deferred path: enqueue the request and hand back a unique marker
       that [flush_batched_io] will substitute later.  The marker starts
       with '(' so aser inlines it as pre-serialized SX. *)
    incr io_counter;
    let id = !io_counter in
    io_queue := (id, name, args) :: !io_queue;
    String (Printf.sprintf "(\xc2\xabIO:%d\xc2\xbb)" id)
  end
  else begin
    (* Blocking path: write the request and wait on stdin for the
       bridge's io-response line. *)
    let serialized_args = List.map serialize_value args in
    send (Printf.sprintf "(io-request \"%s\" %s)"
            name (String.concat " " serialized_args));
    match read_line_blocking () with
    | None ->
      raise (Eval_error "IO bridge: stdin closed while waiting for io-response")
    | Some reply ->
      (match Sx_parser.parse_all reply with
       | [List [Symbol "io-response"; v]] -> v
       | [List (Symbol "io-response" :: vs)] ->
         (match vs with [v] -> v | _ -> List vs)
       | _ ->
         raise (Eval_error ("IO bridge: unexpected response: " ^ reply)))
  end
(** Flush batched IO: send every queued request to the bridge, read one
    io-response line per request (in request order), and substitute each
    «IO:id» placeholder in [result_str] with the received value.
    Clears [io_queue] and [io_counter] as a side effect.
    @raise Eval_error if stdin closes mid-batch. *)
let flush_batched_io result_str =
  (* Queue was built by prepending — restore request order. *)
  let queue = List.rev !io_queue in
  io_queue := [];
  io_counter := 0;
  if queue = [] then result_str
  else begin
    (* Replace every occurrence of [placeholder] in [s] with [value]. *)
    let replace_all s placeholder value =
      let plen = String.length placeholder in
      let slen = String.length s in
      let buf = Buffer.create slen in
      let pos = ref 0 in
      while !pos <= slen - plen do
        if String.sub s !pos plen = placeholder then begin
          Buffer.add_string buf value;
          pos := !pos + plen
        end else begin
          Buffer.add_char buf s.[!pos];
          incr pos
        end
      done;
      if !pos < slen then
        Buffer.add_substring buf s !pos (slen - !pos);
      Buffer.contents buf
    in
    (* 1. Ship every batched request, tagged with its numeric ID. *)
    List.iter
      (fun (id, name, args) ->
         let args_str = String.concat " " (List.map serialize_value args) in
         send (Printf.sprintf "(io-request %d \"%s\" %s)" id name args_str))
      queue;
    send (Printf.sprintf "(io-done %d)" (List.length queue));
    (* 2. Read responses in order and splice them over the markers. *)
    let final = ref result_str in
    List.iter
      (fun (id, _, _) ->
         match read_line_blocking () with
         | None -> raise (Eval_error "IO batch: stdin closed")
         | Some line ->
           let value_str =
             match Sx_parser.parse_all line with
             | [List [Symbol "io-response"; String s]]
             | [List [Symbol "io-response"; SxExpr s]] -> s
             | [List [Symbol "io-response"; v]] -> serialize_value v
             | _ -> "nil"
           in
           let placeholder = Printf.sprintf "(\xc2\xabIO:%d\xc2\xbb)" id in
           final := replace_all !final placeholder value_str)
      queue;
    !final
  end

  | List [Symbol "aser-slot"; String src] ->
    (* Like aser but expands ALL components server-side, not just
       server-affinity ones.  Uses batch IO mode: batchable helper
       calls (highlight etc.) return placeholders during evaluation,
       then all IO is flushed concurrently after the aser completes. *)
    (* Single cleanup used by both error handlers — previously the
       reset code was duplicated and forgot to reset io_counter. *)
    let reset_batch_state () =
      io_batch_mode := false;
      io_queue := [];
      io_counter := 0;
      Hashtbl.remove env.bindings "expand-components?"
    in
    (try
       ignore (env_bind env "expand-components?"
                 (NativeFn ("expand-components?", fun _args -> Bool true)));
       (* Enable batch IO mode for the duration of the evaluation. *)
       io_batch_mode := true;
       io_queue := [];
       io_counter := 0;
       let exprs = Sx_parser.parse_all src in
       let expr = match exprs with
         | [e] -> e
         | [] -> Nil
         | _ -> List (Symbol "<>" :: exprs)
       in
       let call = List [Symbol "aser";
                        List [Symbol "quote"; expr];
                        Env env] in
       let result = Sx_ref.eval_expr call (Env env) in
       io_batch_mode := false;
       Hashtbl.remove env.bindings "expand-components?";
       let result_str = match result with
         | String s | SxExpr s -> s
         | _ -> serialize_value result
       in
       (* flush_batched_io consumes io_queue/io_counter itself, so the
          queue must NOT be cleared before this point on success. *)
       let final = flush_batched_io result_str in
       send (Printf.sprintf "(ok-raw %s)" final)
     with
     | Eval_error msg ->
       reset_batch_state ();
       send_error msg
     | exn ->
       reset_batch_state ();
       send_error (Printexc.to_string exn))
(* ====================================================================== *)
(* CLI mode — one-shot render/aser from stdin *)
(* ====================================================================== *)

(** Load each existing file in [files] into [env], evaluating every
    top-level form.  Missing files are silently skipped. *)
let cli_load_files env files =
  List.iter
    (fun path ->
       if Sys.file_exists path then
         List.iter
           (fun expr -> ignore (Sx_ref.eval_expr expr (Env env)))
           (Sx_parser.parse_file path))
    files

(** One-shot CLI entry point: read SX source from stdin, evaluate it
    according to [mode] ("render" | "aser" | "aser-slot") and print the
    result to stdout.  Exits 1 with a message on stderr on evaluation
    error or unknown mode; exits 0 silently on empty input. *)
let cli_mode mode =
  let env = make_server_env () in
  (* Spec + adapter files are only needed for the aser modes. *)
  let spec_dir = try Sys.getenv "SX_SPEC_DIR" with Not_found -> "spec" in
  let web_dir = try Sys.getenv "SX_WEB_DIR" with Not_found -> "web" in
  if mode = "aser" || mode = "aser-slot" then
    cli_load_files env [
      Filename.concat spec_dir "parser.sx";
      Filename.concat spec_dir "render.sx";
      Filename.concat web_dir "adapter-sx.sx";
    ];
  (* Collect every "--load FILE" pair from argv, preserving order. *)
  let extra = ref [] in
  let rec scan = function
    | "--load" :: path :: rest -> extra := path :: !extra; scan rest
    | _ :: rest -> scan rest
    | [] -> ()
  in
  scan (Array.to_list Sys.argv);
  cli_load_files env (List.rev !extra);
  (* Slurp all of stdin as the SX source. *)
  let buf = Buffer.create 4096 in
  (try
     while true do
       Buffer.add_string buf (input_line stdin);
       Buffer.add_char buf '\n'
     done
   with End_of_file -> ());
  let src = String.trim (Buffer.contents buf) in
  if src = "" then exit 0;
  (* Shared aser runner — "aser" and "aser-slot" differ only in whether
     expand-components? is bound (this code was previously duplicated
     verbatim in both branches). *)
  let run_aser () =
    let exprs = Sx_parser.parse_all src in
    let expr = match exprs with
      | [e] -> e
      | [] -> Nil
      | _ -> List (Symbol "<>" :: exprs)
    in
    let call = List [Symbol "aser";
                     List [Symbol "quote"; expr];
                     Env env] in
    let result = Sx_ref.eval_expr call (Env env) in
    (match result with
     | String s | SxExpr s -> print_string s
     | _ -> print_string (serialize_value result));
    flush stdout
  in
  try
    match mode with
    | "render" ->
      let exprs = Sx_parser.parse_all src in
      let expr = match exprs with
        | [e] -> e
        | [] -> Nil
        | _ -> List (Symbol "do" :: exprs)
      in
      print_string (Sx_render.render_to_html expr env);
      flush stdout
    | "aser" -> run_aser ()
    | "aser-slot" ->
      ignore (env_bind env "expand-components?"
                (NativeFn ("expand-components?", fun _args -> Bool true)));
      run_aser ()
    | _ ->
      Printf.eprintf "Unknown CLI mode: %s\n" mode;
      exit 1
  with
  | Eval_error msg ->
    Printf.eprintf "Error: %s\n" msg; exit 1
  | exn ->
    Printf.eprintf "Error: %s\n" (Printexc.to_string exn); exit 1


(** Entry point: CLI flags select a one-shot mode, otherwise run the
    persistent stdin/stdout command server. *)
let () =
  let argv = Array.to_list Sys.argv in
  if List.mem "--render" argv then cli_mode "render"
  else if List.mem "--aser-slot" argv then cli_mode "aser-slot"
  else if List.mem "--aser" argv then cli_mode "aser"
  else begin
    (* Normal persistent server mode *)
    let env = make_server_env () in
    send "(ready)";
    (* Main command loop *)
    try
      while true do
        match read_line_blocking () with
        | None -> exit 0 (* stdin closed *)
        | Some raw ->
          let line = String.trim raw in
          if line = "" then () (* skip blank lines *)
          else begin
            match Sx_parser.parse_all line with
            | [cmd] -> dispatch env cmd
            | exprs ->
              send_error
                ("Expected single command, got "
                 ^ string_of_int (List.length exprs))
          end
      done
    with
    | End_of_file -> ()
  end
z3.sx) so reader macros resolve for cb in _reload_callbacks: cb() @@ -587,25 +588,23 @@ def client_components_tag(*names: str) -> str: def components_for_page(page_sx: str, service: str | None = None) -> tuple[str, str]: - """Return (component_defs_source, page_hash) for a page. + """Return (component_defs_source, stable_hash) for a page. - Scans *page_sx* for component references, computes the transitive - closure, and returns only the definitions needed for this page. + Sends per-page component subsets for bandwidth, but the hash is + computed from the FULL component env — stable across all pages. + Browser caches once on first page load, subsequent navigations + hit the cache (same hash) without re-downloading. - When *service* is given, also includes deps for all :data pages - in that service so the client can render them without a server - roundtrip on navigation. - - The hash is computed from the page-specific bundle for caching. + Components go to the client for: hydration, client-side routing, + data binding, and future CID-based caching. """ from .deps import components_needed from .parser import serialize needed = components_needed(page_sx, _COMPONENT_ENV) - # Include deps for all :data pages so the client can render them. - # Pages with IO deps use the async render path (Phase 5) — the IO - # primitives are proxied via /sx/io/. + # Include deps for all :data pages so the client can render them + # during client-side navigation. 
if service: from .pages import get_all_pages for page_def in get_all_pages(service).values(): @@ -616,7 +615,6 @@ def components_for_page(page_sx: str, service: str | None = None) -> tuple[str, if not needed: return "", "" - # Also include macros — they're needed for client-side expansion parts = [] for key, val in _COMPONENT_ENV.items(): if isinstance(val, Island): @@ -629,10 +627,6 @@ def components_for_page(page_sx: str, service: str | None = None) -> tuple[str, parts.append(f"(defisland ~{val.name} {params_sx} {body_sx})") elif isinstance(val, Component): if f"~{val.name}" in needed or key in needed: - # Skip server-affinity components — they're expanded server-side - # and the client doesn't have the define values they depend on. - if val.render_target == "server": - continue param_strs = ["&key"] + list(val.params) if val.has_children: param_strs.extend(["&rest", "children"]) @@ -640,8 +634,7 @@ def components_for_page(page_sx: str, service: str | None = None) -> tuple[str, body_sx = serialize(val.body, pretty=True) parts.append(f"(defcomp ~{val.name} {params_sx} {body_sx})") elif isinstance(val, Macro): - # Include macros that are referenced in needed components' bodies - # For now, include all macros (they're small and often shared) + # Include all macros — small and often shared across pages param_strs = list(val.params) if val.rest_param: param_strs.extend(["&rest", val.rest_param]) @@ -655,10 +648,39 @@ def components_for_page(page_sx: str, service: str | None = None) -> tuple[str, # Prepend client library sources (define forms) before component defs all_parts = list(_CLIENT_LIBRARY_SOURCES) + parts source = "\n".join(all_parts) - digest = hashlib.sha256(source.encode()).hexdigest()[:12] + + # Hash from FULL component env — stable across all pages. + # Browser caches by this hash; same hash = cache hit on navigation. + digest = _component_env_hash() return source, digest +# Cached full-env hash — invalidated when components are reloaded. 
# Cached full-env hash — invalidated when components are reloaded.
_env_hash_cache: str | None = None


def _component_env_hash() -> str:
    """Compute a stable 12-char hash of ALL loaded component definitions.

    The digest covers each definition's name, parameter list, and body,
    joined with NUL separators.  Previously only name + body were hashed
    with no separator, so (a) a param-only change to a component did not
    change the hash — browsers would keep serving a stale cached bundle —
    and (b) adjacent fields could collide ("ab"+"c" vs "a"+"bc").

    The result is memoized in ``_env_hash_cache``; call
    :func:`invalidate_component_hash` after any reload.
    """
    global _env_hash_cache
    if _env_hash_cache is not None:
        return _env_hash_cache
    from .parser import serialize
    h = hashlib.sha256()
    # Sorted iteration makes the digest independent of insertion order.
    for key in sorted(_COMPONENT_ENV.keys()):
        val = _COMPONENT_ENV[key]
        if isinstance(val, (Island, Component, Macro)):
            h.update(key.encode())
            h.update(b"\x00")
            # Params are part of the client-visible definition; include
            # them so param-only edits produce a new hash.
            h.update(repr(getattr(val, "params", None)).encode())
            h.update(b"\x00")
            h.update(serialize(val.body).encode())
            h.update(b"\x00")
    _env_hash_cache = h.hexdigest()[:12]
    return _env_hash_cache


def invalidate_component_hash():
    """Drop the memoized env hash (call on hot-reload / file change)."""
    global _env_hash_cache
    _env_hash_cache = None
+ Handles IO requests in two modes: + - Legacy (blocking): single io-request → immediate io-response + - Batched: collect io-requests until (io-done N), process ALL + concurrently with asyncio.gather, send responses in order """ + import asyncio + pending_batch: list[str] = [] + while True: line = await self._readline() if line.startswith("(io-request "): + # Check if batched (has numeric ID after "io-request ") + after = line[len("(io-request "):].lstrip() + if after and after[0].isdigit(): + # Batched mode — collect, don't respond yet + pending_batch.append(line) + continue + # Legacy blocking mode — respond immediately try: result = await self._handle_io_request(line, ctx) await self._send(f"(io-response {_serialize_for_ocaml(result)})") except Exception as e: - # MUST send a response or the pipe desyncs _logger.warning("IO request failed, sending nil: %s", e) await self._send("(io-response nil)") continue + if line.startswith("(io-done "): + # Batch complete — process all pending IO concurrently + tasks = [self._handle_io_request(req, ctx) + for req in pending_batch] + results = await asyncio.gather(*tasks, return_exceptions=True) + for result in results: + if isinstance(result, BaseException): + _logger.warning("Batched IO failed: %s", result) + await self._send("(io-response nil)") + else: + await self._send( + f"(io-response {_serialize_for_ocaml(result)})") + pending_batch = [] + continue + kind, value = _parse_response(line) if kind == "error": raise OcamlBridgeError(value or "Unknown error") @@ -372,12 +399,17 @@ class OcamlBridge: raise OcamlBridgeError(f"Malformed io-request: {line}") parts = parsed[0] - # parts = [Symbol("io-request"), name_str, ...args] + # Legacy: [Symbol("io-request"), name_str, ...args] + # Batched: [Symbol("io-request"), id_num, name_str, ...args] if len(parts) < 2: raise OcamlBridgeError(f"Malformed io-request: {line}") - req_name = _to_str(parts[1]) - args = parts[2:] + # Skip numeric batch ID if present + offset = 1 + if 
isinstance(parts[1], (int, float)): + offset = 2 + req_name = _to_str(parts[offset]) + args = parts[offset + 1:] if req_name == "query": return await self._io_query(args) @@ -442,12 +474,23 @@ class OcamlBridge: key = _to_str(args[0]) if args else "" return ctx.get(key) + # Helpers that are pure functions — safe to cache by args. + _CACHEABLE_HELPERS = frozenset({ + "highlight", "component-source", "primitives-data", + "special-forms-data", "reference-data", "read-spec-file", + "bootstrapper-data", "bundle-analyzer-data", "routing-analyzer-data", + }) + async def _io_helper(self, args: list, ctx: dict[str, Any] | None) -> Any: """Handle (io-request "helper" name arg1 arg2 ...). Dispatches to registered page helpers — Python functions like read-spec-file, bootstrapper-data, etc. The helper service name is passed via ctx["_helper_service"]. + + Pure helpers (highlight etc.) are cached — same input always + produces same output. Eliminates blocking round-trips for + repeat calls across pages. """ import asyncio from .pages import get_page_helpers @@ -456,6 +499,12 @@ class OcamlBridge: name = _to_str(args[0]) if args else "" helper_args = [_to_python(a) for a in args[1:]] + # Cache lookup for pure helpers + if name in self._CACHEABLE_HELPERS: + cache_key = (name, *[repr(a) for a in helper_args]) + if cache_key in self._io_cache: + return self._io_cache[cache_key] + # Check page helpers first (application-level) service = (ctx or {}).get("_helper_service", "sx") helpers = get_page_helpers(service) @@ -464,6 +513,9 @@ class OcamlBridge: result = fn(*helper_args) if asyncio.iscoroutine(result): result = await result + # Cache pure helper results + if name in self._CACHEABLE_HELPERS: + self._io_cache[cache_key] = result return result # Fall back to IO primitives (now, state-get, state-set!, etc.) 
class TestBatchIO(unittest.IsolatedAsyncioTestCase):
    """Test that batchable helper calls are collected and resolved concurrently."""

    @classmethod
    def setUpClass(cls):
        _skip_if_no_binary()

    async def _load_file(self, path):
        # Issue a (load ...) command directly over the bridge's pipe.
        async with self.bridge._lock:
            await self.bridge._send(f'(load "{_escape(path)}")')
            await self.bridge._read_until_ok(ctx=None)

    async def _load_source(self, sx):
        # Issue a (load-source ...) command directly over the bridge's pipe.
        async with self.bridge._lock:
            await self.bridge._send(f'(load-source "{_escape(sx)}")')
            await self.bridge._read_until_ok(ctx=None)

    async def asyncSetUp(self):
        self.bridge = OcamlBridge()
        await self.bridge.start()
        spec_dir = os.path.join(_project_root, "spec")
        web_dir = os.path.join(_project_root, "web")
        for name in ["parser.sx", "render.sx"]:
            spec_path = os.path.join(spec_dir, name)
            if os.path.isfile(spec_path):
                await self._load_file(spec_path)
        adapter = os.path.join(web_dir, "adapter-sx.sx")
        if os.path.isfile(adapter):
            await self._load_file(adapter)

    async def asyncTearDown(self):
        await self.bridge.stop()

    async def test_batch_highlight_calls(self):
        """Multiple highlight calls in aser_slot are batched, not sequential."""
        # Map highlight to json-encode (available without Quart app)
        await self._load_source(
            '(define highlight (fn (a b) (helper "json-encode" a)))')
        await self._load_source(
            '(defcomp ~test/batch (&key)'
            ' (div (p (highlight "aaa" "x"))'
            ' (p (highlight "bbb" "x"))'
            ' (p (highlight "ccc" "x"))))')
        result = await self.bridge.aser_slot(
            '(~test/batch)', ctx={"_helper_service": "sx"})
        # All 3 values present — placeholders replaced
        for token in ("aaa", "bbb", "ccc"):
            self.assertIn(token, result)
        # No placeholder markers remaining
        self.assertNotIn("\u00ab", result)  # «
        self.assertNotIn("\u00bb", result)  # »

    async def test_batch_faster_than_sequential(self):
        """Batched IO should be faster than N sequential round-trips."""
        await self._load_source(
            '(define highlight (fn (a b) (helper "json-encode" a)))')
        calls = " ".join(f'(p (highlight "v{i}" "x"))' for i in range(10))
        await self._load_source(f'(defcomp ~test/perf (&key) (div {calls}))')

        started = time.monotonic()
        result = await self.bridge.aser_slot(
            '(~test/perf)', ctx={"_helper_service": "sx"})
        elapsed = time.monotonic() - started

        # All 10 values present
        for i in range(10):
            self.assertIn(f"v{i}", result)
        # Should complete in under 2 seconds (batched, not 10 × round-trip)
        self.assertLess(elapsed, 2.0,
                        f"10 batched IO calls took {elapsed:.1f}s (target: <2s)")