Non-blocking batch IO for OCaml kernel + stable component hash

OCaml kernel (sx_server.ml):
- Batch IO mode for aser-slot: batchable helpers (highlight,
  component-source) return placeholders during evaluation instead
  of blocking on stdin. After aser completes, all batched requests
  are flushed to Python at once.
- Python processes them concurrently with asyncio.gather.
- Placeholders (using «IO:N» markers) are replaced with actual
  values in the result string.
- Non-batchable IO (query, action, ctx, request-arg) still uses
  blocking mode — their results drive control flow.

Python bridge (ocaml_bridge.py):
- _read_until_ok handles batched protocol: collects io-request
  lines with numeric IDs, processes on (io-done N) with gather.
- IO result cache for pure helpers — eliminates redundant calls.
- _handle_io_request strips batch ID from request format.

Component caching (jinja_bridge.py):
- Hash computed from FULL component env (all names + bodies),
  not per-page subset. Stable across all pages — browser caches
  once, no re-download on navigation between pages.
- invalidate_component_hash() called on hot-reload.

Tests: 15/15 OCaml helper tests pass (2 new batch IO tests).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-19 16:53:01 +00:00
parent d3b3b4b720
commit 96e7bbbac1
4 changed files with 423 additions and 58 deletions

View File

@@ -341,6 +341,7 @@ def reload_if_changed() -> None:
_COMPONENT_ENV.clear()
_CLIENT_LIBRARY_SOURCES.clear()
_dirs_from_cache.clear()
invalidate_component_hash()
# Reload SX libraries first (e.g. z3.sx) so reader macros resolve
for cb in _reload_callbacks:
cb()
@@ -587,25 +588,23 @@ def client_components_tag(*names: str) -> str:
def components_for_page(page_sx: str, service: str | None = None) -> tuple[str, str]:
"""Return (component_defs_source, page_hash) for a page.
"""Return (component_defs_source, stable_hash) for a page.
Scans *page_sx* for component references, computes the transitive
closure, and returns only the definitions needed for this page.
Sends per-page component subsets for bandwidth, but the hash is
computed from the FULL component env — stable across all pages.
Browser caches once on first page load, subsequent navigations
hit the cache (same hash) without re-downloading.
When *service* is given, also includes deps for all :data pages
in that service so the client can render them without a server
roundtrip on navigation.
The hash is computed from the page-specific bundle for caching.
Components go to the client for: hydration, client-side routing,
data binding, and future CID-based caching.
"""
from .deps import components_needed
from .parser import serialize
needed = components_needed(page_sx, _COMPONENT_ENV)
# Include deps for all :data pages so the client can render them.
# Pages with IO deps use the async render path (Phase 5) — the IO
# primitives are proxied via /sx/io/<name>.
# Include deps for all :data pages so the client can render them
# during client-side navigation.
if service:
from .pages import get_all_pages
for page_def in get_all_pages(service).values():
@@ -616,7 +615,6 @@ def components_for_page(page_sx: str, service: str | None = None) -> tuple[str,
if not needed:
return "", ""
# Also include macros — they're needed for client-side expansion
parts = []
for key, val in _COMPONENT_ENV.items():
if isinstance(val, Island):
@@ -629,10 +627,6 @@ def components_for_page(page_sx: str, service: str | None = None) -> tuple[str,
parts.append(f"(defisland ~{val.name} {params_sx} {body_sx})")
elif isinstance(val, Component):
if f"~{val.name}" in needed or key in needed:
# Skip server-affinity components — they're expanded server-side
# and the client doesn't have the define values they depend on.
if val.render_target == "server":
continue
param_strs = ["&key"] + list(val.params)
if val.has_children:
param_strs.extend(["&rest", "children"])
@@ -640,8 +634,7 @@ def components_for_page(page_sx: str, service: str | None = None) -> tuple[str,
body_sx = serialize(val.body, pretty=True)
parts.append(f"(defcomp ~{val.name} {params_sx} {body_sx})")
elif isinstance(val, Macro):
# Include macros that are referenced in needed components' bodies
# For now, include all macros (they're small and often shared)
# Include all macros — small and often shared across pages
param_strs = list(val.params)
if val.rest_param:
param_strs.extend(["&rest", val.rest_param])
@@ -655,10 +648,39 @@ def components_for_page(page_sx: str, service: str | None = None) -> tuple[str,
# Prepend client library sources (define forms) before component defs
all_parts = list(_CLIENT_LIBRARY_SOURCES) + parts
source = "\n".join(all_parts)
digest = hashlib.sha256(source.encode()).hexdigest()[:12]
# Hash from FULL component env — stable across all pages.
# Browser caches by this hash; same hash = cache hit on navigation.
digest = _component_env_hash()
return source, digest
# Cached full-env hash — invalidated when components are reloaded.
_env_hash_cache: str | None = None


def _component_env_hash() -> str:
    """Compute a stable hash from all loaded component names + bodies.

    The digest covers the FULL component env (every Island/Component/Macro,
    keys sorted for determinism), so it is identical across pages; the
    result is memoized in ``_env_hash_cache`` until
    ``invalidate_component_hash()`` clears it.
    """
    global _env_hash_cache
    if _env_hash_cache is None:
        from .parser import serialize

        hasher = hashlib.sha256()
        for name in sorted(_COMPONENT_ENV):
            entry = _COMPONENT_ENV[name]
            # Only hash real component-like definitions; skip anything else
            # stored in the env.
            if isinstance(entry, (Island, Component, Macro)):
                hasher.update(name.encode())
                hasher.update(serialize(entry.body).encode())
        # 12 hex chars — same truncation the per-page digest used before.
        _env_hash_cache = hasher.hexdigest()[:12]
    return _env_hash_cache
def invalidate_component_hash() -> None:
    """Reset the cached full-env component hash.

    Call when components are reloaded (hot-reload, file change) so the
    next ``_component_env_hash()`` call recomputes the digest from the
    updated component env instead of returning the stale cached value.
    """
    global _env_hash_cache
    _env_hash_cache = None
def css_classes_for_page(page_sx: str, service: str | None = None) -> set[str]:
"""Return CSS classes needed for a page's component bundle + page source.

View File

@@ -44,6 +44,7 @@ class OcamlBridge:
self._started = False
self._components_loaded = False
self._helpers_injected = False
self._io_cache: dict[tuple, Any] = {} # (name, args...) → cached result
async def start(self) -> None:
"""Launch the OCaml subprocess and wait for (ready)."""
@@ -336,22 +337,48 @@ class OcamlBridge:
) -> str:
"""Read lines until (ok ...) or (error ...).
Handles (io-request ...) by fulfilling IO and sending (io-response ...).
ALWAYS sends a response to keep the pipe clean, even on error.
Handles IO requests in two modes:
- Legacy (blocking): single io-request → immediate io-response
- Batched: collect io-requests until (io-done N), process ALL
concurrently with asyncio.gather, send responses in order
"""
import asyncio
pending_batch: list[str] = []
while True:
line = await self._readline()
if line.startswith("(io-request "):
# Check if batched (has numeric ID after "io-request ")
after = line[len("(io-request "):].lstrip()
if after and after[0].isdigit():
# Batched mode — collect, don't respond yet
pending_batch.append(line)
continue
# Legacy blocking mode — respond immediately
try:
result = await self._handle_io_request(line, ctx)
await self._send(f"(io-response {_serialize_for_ocaml(result)})")
except Exception as e:
# MUST send a response or the pipe desyncs
_logger.warning("IO request failed, sending nil: %s", e)
await self._send("(io-response nil)")
continue
if line.startswith("(io-done "):
# Batch complete — process all pending IO concurrently
tasks = [self._handle_io_request(req, ctx)
for req in pending_batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, BaseException):
_logger.warning("Batched IO failed: %s", result)
await self._send("(io-response nil)")
else:
await self._send(
f"(io-response {_serialize_for_ocaml(result)})")
pending_batch = []
continue
kind, value = _parse_response(line)
if kind == "error":
raise OcamlBridgeError(value or "Unknown error")
@@ -372,12 +399,17 @@ class OcamlBridge:
raise OcamlBridgeError(f"Malformed io-request: {line}")
parts = parsed[0]
# parts = [Symbol("io-request"), name_str, ...args]
# Legacy: [Symbol("io-request"), name_str, ...args]
# Batched: [Symbol("io-request"), id_num, name_str, ...args]
if len(parts) < 2:
raise OcamlBridgeError(f"Malformed io-request: {line}")
req_name = _to_str(parts[1])
args = parts[2:]
# Skip numeric batch ID if present
offset = 1
if isinstance(parts[1], (int, float)):
offset = 2
req_name = _to_str(parts[offset])
args = parts[offset + 1:]
if req_name == "query":
return await self._io_query(args)
@@ -442,12 +474,23 @@ class OcamlBridge:
key = _to_str(args[0]) if args else ""
return ctx.get(key)
# Helpers that are pure functions — safe to cache by args.
_CACHEABLE_HELPERS = frozenset({
"highlight", "component-source", "primitives-data",
"special-forms-data", "reference-data", "read-spec-file",
"bootstrapper-data", "bundle-analyzer-data", "routing-analyzer-data",
})
async def _io_helper(self, args: list, ctx: dict[str, Any] | None) -> Any:
"""Handle (io-request "helper" name arg1 arg2 ...).
Dispatches to registered page helpers — Python functions like
read-spec-file, bootstrapper-data, etc. The helper service name
is passed via ctx["_helper_service"].
Pure helpers (highlight etc.) are cached — same input always
produces same output. Eliminates blocking round-trips for
repeat calls across pages.
"""
import asyncio
from .pages import get_page_helpers
@@ -456,6 +499,12 @@ class OcamlBridge:
name = _to_str(args[0]) if args else ""
helper_args = [_to_python(a) for a in args[1:]]
# Cache lookup for pure helpers
if name in self._CACHEABLE_HELPERS:
cache_key = (name, *[repr(a) for a in helper_args])
if cache_key in self._io_cache:
return self._io_cache[cache_key]
# Check page helpers first (application-level)
service = (ctx or {}).get("_helper_service", "sx")
helpers = get_page_helpers(service)
@@ -464,6 +513,9 @@ class OcamlBridge:
result = fn(*helper_args)
if asyncio.iscoroutine(result):
result = await result
# Cache pure helper results
if name in self._CACHEABLE_HELPERS:
self._io_cache[cache_key] = result
return result
# Fall back to IO primitives (now, state-get, state-set!, etc.)

View File

@@ -139,6 +139,87 @@ class TestHelperInjection(unittest.IsolatedAsyncioTestCase):
self.assertNotIn("~test/code-display", result)
class TestBatchIO(unittest.IsolatedAsyncioTestCase):
    """Test that batchable helper calls are collected and resolved concurrently."""

    @classmethod
    def setUpClass(cls):
        # Skip the whole class when the OCaml kernel binary is not available.
        _skip_if_no_binary()

    async def asyncSetUp(self):
        # Fresh bridge per test; load the kernel sources the helpers rely on.
        self.bridge = OcamlBridge()
        await self.bridge.start()
        spec_dir = os.path.join(_project_root, "spec")
        web_dir = os.path.join(_project_root, "web")
        for f in ["parser.sx", "render.sx"]:
            path = os.path.join(spec_dir, f)
            if os.path.isfile(path):
                # Hold the bridge lock so each (load ...) send is paired
                # with its own read-until-ok and the pipe stays in sync.
                async with self.bridge._lock:
                    await self.bridge._send(f'(load "{_escape(path)}")')
                    await self.bridge._read_until_ok(ctx=None)
        adapter = os.path.join(web_dir, "adapter-sx.sx")
        if os.path.isfile(adapter):
            async with self.bridge._lock:
                await self.bridge._send(f'(load "{_escape(adapter)}")')
                await self.bridge._read_until_ok(ctx=None)

    async def asyncTearDown(self):
        await self.bridge.stop()

    async def test_batch_highlight_calls(self):
        """Multiple highlight calls in aser_slot are batched, not sequential."""
        # Map highlight to json-encode (available without Quart app)
        sx = '(define highlight (fn (a b) (helper "json-encode" a)))'
        async with self.bridge._lock:
            await self.bridge._send(f'(load-source "{_escape(sx)}")')
            await self.bridge._read_until_ok(ctx=None)
        # Component making three helper calls — each should yield a
        # placeholder during evaluation and be resolved after io-done.
        comp = (
            '(defcomp ~test/batch (&key)'
            ' (div (p (highlight "aaa" "x"))'
            ' (p (highlight "bbb" "x"))'
            ' (p (highlight "ccc" "x"))))'
        )
        async with self.bridge._lock:
            await self.bridge._send(f'(load-source "{_escape(comp)}")')
            await self.bridge._read_until_ok(ctx=None)
        result = await self.bridge.aser_slot(
            '(~test/batch)', ctx={"_helper_service": "sx"})
        # All 3 values present — placeholders replaced
        self.assertIn("aaa", result)
        self.assertIn("bbb", result)
        self.assertIn("ccc", result)
        # No placeholder markers remaining
        self.assertNotIn("\u00ab", result)  # «
        self.assertNotIn("\u00bb", result)  # »

    async def test_batch_faster_than_sequential(self):
        """Batched IO should be faster than N sequential round-trips."""
        sx = '(define highlight (fn (a b) (helper "json-encode" a)))'
        async with self.bridge._lock:
            await self.bridge._send(f'(load-source "{_escape(sx)}")')
            await self.bridge._read_until_ok(ctx=None)
        # 10 helper calls in one component — batched they should resolve
        # in roughly one round-trip instead of ten.
        calls = " ".join(f'(p (highlight "v{i}" "x"))' for i in range(10))
        comp = f'(defcomp ~test/perf (&key) (div {calls}))'
        async with self.bridge._lock:
            await self.bridge._send(f'(load-source "{_escape(comp)}")')
            await self.bridge._read_until_ok(ctx=None)
        t0 = time.monotonic()
        result = await self.bridge.aser_slot(
            '(~test/perf)', ctx={"_helper_service": "sx"})
        elapsed = time.monotonic() - t0
        # All 10 values present
        for i in range(10):
            self.assertIn(f"v{i}", result)
        # Should complete in under 2 seconds (batched, not 10 × round-trip)
        self.assertLess(elapsed, 2.0,
                        f"10 batched IO calls took {elapsed:.1f}s (target: <2s)")
class TestHelperIOPerformance(unittest.IsolatedAsyncioTestCase):
"""Test that helper IO round-trips are fast enough for production."""