Fix pipe desync: async drain on _send, robust Playwright tests

Root cause: OcamlBridge._send() used write() without drain().
asyncio.StreamWriter buffers writes — without drain(), multiple
commands accumulate and flush as a batch. The kernel processes
them sequentially, sending responses, but Python only reads one
response per command → pipe desync → "unexpected response" errors.

Fix: _send() is now async, calls drain() after every write.
All 14 callers updated to await.

Playwright tests rewritten:
- test_home_has_header: verifies #logo-opacity visible (was only
  checking for "sx" text — never caught missing header)
- test_home_has_nav_children: Geography link must be visible
- test_home_has_main_panel: #main-panel must have child elements
- TestDirectPageLoad: fresh browser.new_context() per test to
  avoid stale component hash in localStorage
- _setup_error_capture + _check_no_fatal_errors helpers

_render_to_sx uses aser_slot (not aser) — layout wrappers contain
re-parsed content that needs full expansion capability.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-19 16:11:55 +00:00
parent f819fda587
commit d3b3b4b720
4 changed files with 102 additions and 74 deletions

View File

@@ -74,7 +74,7 @@ class OcamlBridge:
self._started = True
# Verify engine identity
self._send("(ping)")
await self._send("(ping)")
kind, engine = await self._read_response()
engine_name = engine if kind == "ok" else "unknown"
_logger.info("OCaml SX kernel ready (pid=%d, engine=%s)", self._proc.pid, engine_name)
@@ -95,21 +95,21 @@ class OcamlBridge:
async def ping(self) -> str:
"""Health check — returns engine name (e.g. 'ocaml-cek')."""
async with self._lock:
self._send("(ping)")
await self._send("(ping)")
kind, value = await self._read_response()
return value or "" if kind == "ok" else ""
async def load(self, path: str) -> int:
"""Load an .sx file for side effects (defcomp, define, defmacro)."""
async with self._lock:
self._send(f'(load "{_escape(path)}")')
await self._send(f'(load "{_escape(path)}")')
value = await self._read_until_ok(ctx=None)
return int(float(value)) if value else 0
async def load_source(self, source: str) -> int:
"""Evaluate SX source for side effects."""
async with self._lock:
self._send(f'(load-source "{_escape(source)}")')
await self._send(f'(load-source "{_escape(source)}")')
value = await self._read_until_ok(ctx=None)
return int(float(value)) if value else 0
@@ -121,7 +121,7 @@ class OcamlBridge:
"""
await self._ensure_components()
async with self._lock:
self._send(f'(eval "{_escape(source)}")')
await self._send(f'(eval "{_escape(source)}")')
return await self._read_until_ok(ctx)
async def render(
@@ -132,14 +132,14 @@ class OcamlBridge:
"""Render SX to HTML, handling io-requests via Python async IO."""
await self._ensure_components()
async with self._lock:
self._send(f'(render "{_escape(source)}")')
await self._send(f'(render "{_escape(source)}")')
return await self._read_until_ok(ctx)
async def aser(self, source: str, ctx: dict[str, Any] | None = None) -> str:
"""Evaluate SX and return SX wire format, handling io-requests."""
await self._ensure_components()
async with self._lock:
self._send(f'(aser "{_escape(source)}")')
await self._send(f'(aser "{_escape(source)}")')
return await self._read_until_ok(ctx)
async def aser_slot(self, source: str, ctx: dict[str, Any] | None = None) -> str:
@@ -154,7 +154,7 @@ class OcamlBridge:
# a separate lock acquisition could let another coroutine
# interleave commands between injection and aser-slot.
await self._inject_helpers_locked()
self._send(f'(aser-slot "{_escape(source)}")')
await self._send(f'(aser-slot "{_escape(source)}")')
return await self._read_until_ok(ctx)
async def _inject_helpers_locked(self) -> None:
@@ -183,7 +183,7 @@ class OcamlBridge:
arg_list = " ".join(chr(97 + i) for i in range(nargs))
sx_def = f'(define {name} (fn ({param_names}) (helper "{name}" {arg_list})))'
try:
self._send(f'(load-source "{_escape(sx_def)}")')
await self._send(f'(load-source "{_escape(sx_def)}")')
await self._read_until_ok(ctx=None)
count += 1
except OcamlBridgeError:
@@ -252,7 +252,7 @@ class OcamlBridge:
async with self._lock:
for filepath in all_files:
try:
self._send(f'(load "{_escape(filepath)}")')
await self._send(f'(load "{_escape(filepath)}")')
value = await self._read_until_ok(ctx=None)
# Response may be a number (count) or a value — just count files
count += 1
@@ -279,7 +279,7 @@ class OcamlBridge:
if callable(fn) and not name.startswith("~"):
sx_def = f'(define {name} (fn (&rest args) (apply helper (concat (list "{name}") args))))'
try:
self._send(f'(load-source "{_escape(sx_def)}")')
await self._send(f'(load-source "{_escape(sx_def)}")')
await self._read_until_ok(ctx=None)
count += 1
except OcamlBridgeError:
@@ -290,7 +290,7 @@ class OcamlBridge:
async def reset(self) -> None:
"""Reset the kernel environment to pristine state."""
async with self._lock:
self._send("(reset)")
await self._send("(reset)")
kind, value = await self._read_response()
if kind == "error":
raise OcamlBridgeError(f"reset: {value}")
@@ -299,10 +299,12 @@ class OcamlBridge:
# Internal protocol handling
# ------------------------------------------------------------------
def _send(self, line: str) -> None:
"""Write a line to the subprocess stdin."""
async def _send(self, line: str) -> None:
"""Write a line to the subprocess stdin and flush."""
assert self._proc and self._proc.stdin
_logger.debug("SEND: %s", line[:120])
self._proc.stdin.write((line + "\n").encode())
await self._proc.stdin.drain()
async def _readline(self) -> str:
"""Read a line from the subprocess stdout."""
@@ -316,7 +318,9 @@ class OcamlBridge:
raise OcamlBridgeError(
f"OCaml subprocess died unexpectedly. stderr: {stderr.decode(errors='replace')}"
)
return data.decode().rstrip("\n")
line = data.decode().rstrip("\n")
_logger.debug("RECV: %s", line[:120])
return line
async def _read_response(self) -> tuple[str, str | None]:
"""Read a single (ok ...) or (error ...) response.
@@ -341,11 +345,11 @@ class OcamlBridge:
if line.startswith("(io-request "):
try:
result = await self._handle_io_request(line, ctx)
self._send(f"(io-response {_serialize_for_ocaml(result)})")
await self._send(f"(io-response {_serialize_for_ocaml(result)})")
except Exception as e:
# MUST send a response or the pipe desyncs
_logger.warning("IO request failed, sending nil: %s", e)
self._send("(io-response nil)")
await self._send("(io-response nil)")
continue
kind, value = _parse_response(line)

View File

@@ -49,12 +49,12 @@ class TestHelperInjection(unittest.IsolatedAsyncioTestCase):
path = os.path.join(spec_dir, f)
if os.path.isfile(path):
async with self.bridge._lock:
self.bridge._send(f'(load "{_escape(path)}")')
await self.bridge._send(f'(load "{_escape(path)}")')
await self.bridge._read_until_ok(ctx=None)
adapter = os.path.join(web_dir, "adapter-sx.sx")
if os.path.isfile(adapter):
async with self.bridge._lock:
self.bridge._send(f'(load "{_escape(adapter)}")')
await self.bridge._send(f'(load "{_escape(adapter)}")')
await self.bridge._read_until_ok(ctx=None)
async def asyncTearDown(self):
@@ -66,7 +66,7 @@ class TestHelperInjection(unittest.IsolatedAsyncioTestCase):
arg_list = " ".join(chr(97 + i) for i in range(nargs))
sx_def = f'(define {name} (fn ({param_names}) (helper "{name}" {arg_list})))'
async with self.bridge._lock:
self.bridge._send(f'(load-source "{_escape(sx_def)}")')
await self.bridge._send(f'(load-source "{_escape(sx_def)}")')
await self.bridge._read_until_ok(ctx=None)
async def test_helper_call_returns_value(self):
@@ -87,7 +87,7 @@ class TestHelperInjection(unittest.IsolatedAsyncioTestCase):
# Define a 2-arg test helper via the generic helper binding
sx_def = '(define test-two-args (fn (a b) (helper "json-encode" (str a ":" b))))'
async with self.bridge._lock:
self.bridge._send(f'(load-source "{_escape(sx_def)}")')
await self.bridge._send(f'(load-source "{_escape(sx_def)}")')
await self.bridge._read_until_ok(ctx=None)
result = await self.bridge.eval(
@@ -121,7 +121,7 @@ class TestHelperInjection(unittest.IsolatedAsyncioTestCase):
# Define a component that calls the helper
async with self.bridge._lock:
self.bridge._send(
await self.bridge._send(
'(load-source "(defcomp ~test/code-display (&key code) '
'(pre (code code)))")'
)
@@ -155,12 +155,12 @@ class TestHelperIOPerformance(unittest.IsolatedAsyncioTestCase):
path = os.path.join(spec_dir, f)
if os.path.isfile(path):
async with self.bridge._lock:
self.bridge._send(f'(load "{_escape(path)}")')
await self.bridge._send(f'(load "{_escape(path)}")')
await self.bridge._read_until_ok(ctx=None)
adapter = os.path.join(web_dir, "adapter-sx.sx")
if os.path.isfile(adapter):
async with self.bridge._lock:
self.bridge._send(f'(load "{_escape(adapter)}")')
await self.bridge._send(f'(load "{_escape(adapter)}")')
await self.bridge._read_until_ok(ctx=None)
async def asyncTearDown(self):
@@ -172,7 +172,7 @@ class TestHelperIOPerformance(unittest.IsolatedAsyncioTestCase):
param_names = "a"
sx_def = '(define json-encode (fn (a) (helper "json-encode" a)))'
async with self.bridge._lock:
self.bridge._send(f'(load-source "{_escape(sx_def)}")')
await self.bridge._send(f'(load-source "{_escape(sx_def)}")')
await self.bridge._read_until_ok(ctx=None)
# Time 20 sequential calls (simulating a page with 20 highlight calls)
@@ -192,7 +192,7 @@ class TestHelperIOPerformance(unittest.IsolatedAsyncioTestCase):
"""aser_slot with multiple helper calls completes in reasonable time."""
sx_def = '(define json-encode (fn (a) (helper "json-encode" a)))'
async with self.bridge._lock:
self.bridge._send(f'(load-source "{_escape(sx_def)}")')
await self.bridge._send(f'(load-source "{_escape(sx_def)}")')
await self.bridge._read_until_ok(ctx=None)
# Define a component with multiple helper calls
@@ -206,7 +206,7 @@ class TestHelperIOPerformance(unittest.IsolatedAsyncioTestCase):
' (p (json-encode "e"))))'
)
async with self.bridge._lock:
self.bridge._send(f'(load-source "{_escape(comp_def)}")')
await self.bridge._send(f'(load-source "{_escape(comp_def)}")')
await self.bridge._read_until_ok(ctx=None)
start = time.monotonic()
@@ -237,12 +237,12 @@ class TestAserSlotClientAffinity(unittest.IsolatedAsyncioTestCase):
path = os.path.join(spec_dir, f)
if os.path.isfile(path):
async with self.bridge._lock:
self.bridge._send(f'(load "{_escape(path)}")')
await self.bridge._send(f'(load "{_escape(path)}")')
await self.bridge._read_until_ok(ctx=None)
adapter = os.path.join(web_dir, "adapter-sx.sx")
if os.path.isfile(adapter):
async with self.bridge._lock:
self.bridge._send(f'(load "{_escape(adapter)}")')
await self.bridge._send(f'(load "{_escape(adapter)}")')
await self.bridge._read_until_ok(ctx=None)
async def asyncTearDown(self):
@@ -255,7 +255,7 @@ class TestAserSlotClientAffinity(unittest.IsolatedAsyncioTestCase):
' (div "browser-only-content"))'
)
async with self.bridge._lock:
self.bridge._send(f'(load-source "{_escape(comp_def)}")')
await self.bridge._send(f'(load-source "{_escape(comp_def)}")')
await self.bridge._read_until_ok(ctx=None)
result = await self.bridge.aser_slot('(~test/client-only)')
@@ -270,7 +270,7 @@ class TestAserSlotClientAffinity(unittest.IsolatedAsyncioTestCase):
' (div :class "server" label))'
)
async with self.bridge._lock:
self.bridge._send(f'(load-source "{_escape(comp_def)}")')
await self.bridge._send(f'(load-source "{_escape(comp_def)}")')
await self.bridge._read_until_ok(ctx=None)
result = await self.bridge.aser(
@@ -287,7 +287,7 @@ class TestAserSlotClientAffinity(unittest.IsolatedAsyncioTestCase):
' (div "auto-content" label))'
)
async with self.bridge._lock:
self.bridge._send(f'(load-source "{_escape(comp_def)}")')
await self.bridge._send(f'(load-source "{_escape(comp_def)}")')
await self.bridge._read_until_ok(ctx=None)
result = await self.bridge.aser(
@@ -302,7 +302,7 @@ class TestAserSlotClientAffinity(unittest.IsolatedAsyncioTestCase):
' (div (deref (signal 0)) label))'
)
async with self.bridge._lock:
self.bridge._send(f'(load-source "{_escape(island_def)}")')
await self.bridge._send(f'(load-source "{_escape(island_def)}")')
await self.bridge._read_until_ok(ctx=None)
result = await self.bridge.aser_slot(
@@ -321,7 +321,7 @@ class TestAserSlotClientAffinity(unittest.IsolatedAsyncioTestCase):
' (div (h1 title) (~test/inner-isle :v 42)))'
)
async with self.bridge._lock:
self.bridge._send(f'(load-source "{_escape(src)}")')
await self.bridge._send(f'(load-source "{_escape(src)}")')
await self.bridge._read_until_ok(ctx=None)
result = await self.bridge.aser_slot(
@@ -340,7 +340,7 @@ class TestAserSlotClientAffinity(unittest.IsolatedAsyncioTestCase):
' (div "expanded" label))'
)
async with self.bridge._lock:
self.bridge._send(f'(load-source "{_escape(comp_def)}")')
await self.bridge._send(f'(load-source "{_escape(comp_def)}")')
await self.bridge._read_until_ok(ctx=None)
result = await self.bridge.aser_slot(

View File

@@ -216,18 +216,8 @@ async def eval_sx_url(raw_path: str) -> Any:
Keyword("content"), wrapped_ast,
]
full_text = serialize(full_ast)
has_nl = chr(10) in full_text
if has_nl:
logger.error("NEWLINE in aser_slot input at char %d!",
full_text.index(chr(10)))
import time as _time
_t0 = _time.monotonic()
body_sx = SxExpr(await bridge.aser_slot(
full_text, ctx=ocaml_ctx))
_elapsed = _time.monotonic() - _t0
logger.info("aser_slot: %.1fs, input=%d chars, output=%d chars, starts=%s",
_elapsed, len(full_text), len(body_sx),
str(body_sx)[:100])
tctx = await get_template_context()
return await make_response(
await sx_page(tctx, body_sx), 200)

View File

@@ -506,61 +506,95 @@ class TestSpecExplorer:
# Key doc pages (smoke tests)
# ---------------------------------------------------------------------------
def _check_no_fatal_errors(errors):
"""Assert no fatal JS errors were collected."""
fatal = [e for e in errors
if any(s in e for s in ("Not callable", "Undefined symbol",
"SES_UNCAUGHT", "UNCAUGHT"))]
assert not fatal, f"JS errors: {fatal}"
def _setup_error_capture(page):
"""Attach error listeners, return the errors list."""
errors = []
page.on("pageerror", lambda err: errors.append(f"UNCAUGHT: {err.message}"))
page.on("console", lambda msg: errors.append(msg.text) if msg.type == "error" else None)
return errors
class TestHomePage:
def test_home_loads(self, page: Page):
def test_home_has_header(self, page: Page):
"""Home page MUST have the (<sx>) header with logo."""
errors = _setup_error_capture(page)
nav(page, "")
expect(page.locator("#main-panel")).to_contain_text("sx", timeout=10000)
# Header logo — the (<sx>) text
expect(page.locator("#logo-opacity")).to_be_visible(timeout=15000)
_check_no_fatal_errors(errors)
def test_home_has_nav_children(self, page: Page):
"""Home page MUST have nav children (Geography, Language, etc.)."""
nav(page, "")
expect(page.locator("a[sx-push-url]:has-text('Geography')")).to_be_visible(timeout=15000)
def test_home_has_main_panel(self, page: Page):
"""Home page MUST render #main-panel with content inside it."""
nav(page, "")
expect(page.locator("#main-panel")).to_be_visible(timeout=15000)
# Must have actual content (divs/headings), not be empty
page.wait_for_selector("#main-panel div, #main-panel h2, #main-panel p", timeout=15000)
def test_no_console_errors(self, page: Page):
"""Home page should have no JS errors (console or uncaught)."""
errors = []
page.on("console", lambda msg: errors.append(msg.text) if msg.type == "error" else None)
page.on("pageerror", lambda err: errors.append(f"UNCAUGHT: {err.message}"))
errors = _setup_error_capture(page)
page.goto(f"{BASE}/sx/", wait_until="networkidle")
page.wait_for_timeout(3000)
fatal = [e for e in errors if "Not callable" in e or "Undefined symbol" in e or "SES_UNCAUGHT" in e or "UNCAUGHT" in e]
assert not fatal, f"JS errors on home page: {fatal}"
_check_no_fatal_errors(errors)
def test_navigate_from_home_to_geography(self, page: Page):
"""Click Geography nav link from home — content must render."""
errors = []
page.on("pageerror", lambda err: errors.append(f"UNCAUGHT: {err.message}"))
errors = _setup_error_capture(page)
nav(page, "")
# Click the Geography link in the nav children
geo_link = page.locator("a[sx-push-url]:has-text('Geography')").first
expect(geo_link).to_be_visible(timeout=10000)
geo_link.click()
page.wait_for_timeout(5000)
# Content must still be visible after navigation
expect(page.locator("#main-panel")).to_contain_text("Geography", timeout=10000)
fatal = [e for e in errors if "Not callable" in e or "Undefined symbol" in e or "UNCAUGHT" in e]
assert not fatal, f"JS errors after navigation: {fatal}"
_check_no_fatal_errors(errors)
def test_navigate_geography_to_reactive(self, page: Page):
"""Click Reactive Islands from Geography — content must render."""
errors = []
page.on("pageerror", lambda err: errors.append(f"UNCAUGHT: {err.message}"))
errors = _setup_error_capture(page)
nav(page, "(geography)")
ri_link = page.locator("a[sx-push-url]:has-text('Reactive Islands')").first
expect(ri_link).to_be_visible(timeout=10000)
ri_link.click()
page.wait_for_timeout(5000)
expect(page.locator("#main-panel")).to_contain_text("Reactive Islands", timeout=10000)
fatal = [e for e in errors if "Not callable" in e or "Undefined symbol" in e or "UNCAUGHT" in e]
assert not fatal, f"JS errors after navigation: {fatal}"
_check_no_fatal_errors(errors)
def test_direct_load_reactive_page(self, page: Page):
"""Direct load of reactive islands page — no errors, content renders."""
errors = []
page.on("pageerror", lambda err: errors.append(f"UNCAUGHT: {err.message}"))
page.on("console", lambda msg: errors.append(msg.text) if msg.type == "error" else None)
page.goto(f"{BASE}/sx/(geography.(reactive))", wait_until="networkidle")
# aser_slot may take several seconds on first load — wait for render
page.wait_for_selector("#main-panel", timeout=30000)
expect(page.locator("#main-panel")).to_contain_text("Reactive Islands", timeout=10000)
fatal = [e for e in errors if "Not callable" in e or "Undefined symbol" in e or "SES_UNCAUGHT" in e or "UNCAUGHT" in e]
assert not fatal, f"JS errors on reactive page: {fatal}"
class TestDirectPageLoad:
"""Direct page loads (not HTMX) — each uses a FRESH browser context
to avoid stale component hash in localStorage from prior tests."""
@pytest.mark.parametrize("path,expected_text", [
("(geography.(reactive))", "Reactive Islands"),
("(etc)", "Etc"),
("(geography)", "Geography"),
])
def test_page_renders_with_header(self, browser, path: str, expected_text: str):
"""Page must have header, #main-panel, and expected content."""
ctx = browser.new_context(ignore_https_errors=True)
page = ctx.new_page()
try:
errors = _setup_error_capture(page)
page.goto(f"{BASE}/sx/{path}", wait_until="networkidle")
page.wait_for_selector("#main-panel", timeout=30000)
expect(page.locator("#main-panel")).to_contain_text(expected_text, timeout=10000)
expect(page.locator("#sx-nav")).to_be_visible(timeout=5000)
_check_no_fatal_errors(errors)
finally:
ctx.close()
class TestDocPages: