Files
rose-ash/shared/sexp/tests/test_resolver.py
giles fbb7a1422c
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m14s
Phase 3: Async resolver with parallel I/O and graceful degradation
Tree walker collects I/O nodes (frag, query, action, current-user,
htmx-request?), dispatches them via asyncio.gather(), substitutes results,
and renders to HTML. Failed I/O degrades gracefully to empty string.

27 new tests (199 total), all mocked at execute_io boundary — no
infrastructure dependencies needed.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 14:22:28 +00:00

321 lines
11 KiB
Python

"""Tests for the async resolver.
Uses asyncio.run() directly — no pytest-asyncio dependency needed.
Mocks execute_io at the resolver boundary to avoid infrastructure imports.
"""
import asyncio
from unittest.mock import AsyncMock, patch
import pytest
from shared.sexp import parse, evaluate
from shared.sexp.resolver import resolve, _collect_io, _IONode
from shared.sexp.primitives_io import RequestContext, execute_io
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def run(coro):
    """Drive *coro* to completion on a fresh event loop and return its result."""
    outcome = asyncio.run(coro)
    return outcome
async def r(text, env=None, ctx=None):
    """Parse *text* as one expression and resolve it with the given env/ctx."""
    tree = parse(text)
    return await resolve(tree, ctx=ctx, env=env)
def mock_io(**responses):
    """Patch ``execute_io`` (as seen by the resolver) with canned responses.

    Each keyword names an I/O primitive; its value is what the patched
    ``execute_io`` returns when it is called with that primitive name.

    Usage::

        with mock_io(frag='<b>Card</b>', query={"title": "Apple"}):
            html = run(r('...'))

    For dynamic responses, pass a callable. It is invoked as
    ``handler(args, kwargs, ctx)`` and may be synchronous or asynchronous::

        async def frag_handler(args, kwargs, ctx):
            return f"<b>{args[1]}</b>"

        with mock_io(frag=frag_handler):
            ...

    A primitive with no canned response (or whose response is ``None``)
    falls back to the context: ``current-user`` yields ``ctx.user``,
    ``htmx-request?`` yields ``ctx.is_htmx`` and anything else yields
    ``None``.

    Returns:
        The not-yet-started ``unittest.mock.patch`` object targeting
        ``shared.sexp.resolver.execute_io``; use it as a context manager.
    """
    # Function-scope import keeps this helper self-contained.
    import inspect

    async def side_effect(name, args, kwargs, ctx):
        val = responses.get(name)
        if val is None:
            # Emulate the context primitives from the request context.
            if name == "current-user":
                return ctx.user
            if name == "htmx-request?":
                return ctx.is_htmx
            return None
        if not callable(val):
            return val
        # Call first, then await whatever awaitable comes back. Inspecting the
        # result (rather than the callable) also covers objects with an async
        # ``__call__`` and sync wrappers that return a coroutine, which a
        # coroutine-function check on the callable would miss.
        result = val(args, kwargs, ctx)
        if inspect.isawaitable(result):
            result = await result
        return result

    return patch("shared.sexp.resolver.execute_io", side_effect=side_effect)
# ---------------------------------------------------------------------------
# Basic rendering (no I/O) — resolver should pass through to HTML renderer
# ---------------------------------------------------------------------------
class TestPassthrough:
    """Expressions without I/O nodes resolve straight to rendered HTML."""

    def test_simple_html(self):
        html = run(r('(div "Hello")'))
        assert html == "<div>Hello</div>"

    def test_nested_html(self):
        html = run(r('(div (p "World"))'))
        assert html == "<div><p>World</p></div>"

    def test_with_env(self):
        bindings = {"name": "Alice"}
        html = run(r('(p name)', env=bindings))
        assert html == "<p>Alice</p>"

    def test_component(self):
        # Define the component into a scope first, then resolve a call to it.
        scope = {}
        definition = parse('(defcomp ~tag (&key label) (span :class "tag" label))')
        evaluate(definition, scope)
        html = run(r('(~tag :label "New")', env=scope))
        assert html == '<span class="tag">New</span>'
# ---------------------------------------------------------------------------
# I/O node collection
# ---------------------------------------------------------------------------
class TestCollectIO:
    """Checks which I/O nodes ``_collect_io`` gathers from a parsed tree."""

    @staticmethod
    def _collect(source, env=None):
        """Parse *source* and return the list ``_collect_io`` fills in."""
        found: list[_IONode] = []
        scope = {} if env is None else env
        _collect_io(parse(source), scope, found)
        return found

    def test_finds_frag(self):
        found = self._collect('(div (frag "blog" "link-card" :slug "apple"))')
        assert len(found) == 1
        assert found[0].name == "frag"

    def test_finds_query(self):
        found = self._collect('(div (query "market" "products" :ids "1,2"))')
        assert len(found) == 1
        assert found[0].name == "query"

    def test_finds_multiple(self):
        source = '''
(div
(frag "blog" "card" :slug "a")
(query "market" "products" :ids "1"))
'''
        found = self._collect(source)
        assert len(found) == 2

    def test_finds_current_user(self):
        found = self._collect('(div (current-user))')
        assert len(found) == 1
        assert found[0].name == "current-user"

    def test_finds_htmx_request(self):
        found = self._collect('(div (htmx-request?))')
        assert len(found) == 1
        assert found[0].name == "htmx-request?"

    def test_no_io_nodes(self):
        found = self._collect('(div (p "Hello"))')
        assert len(found) == 0

    def test_evaluates_kwargs(self):
        # The keyword value is a symbol, so it must be looked up in the env.
        found = self._collect(
            '(query "market" "products" :slug slug)',
            env={"slug": "apple"},
        )
        assert len(found) == 1
        assert found[0].kwargs["slug"] == "apple"

    def test_positional_args_evaluated(self):
        # Both positional arguments are symbols bound in the env.
        found = self._collect(
            '(frag app frag_type)',
            env={"app": "blog", "frag_type": "card"},
        )
        assert found[0].args == ["blog", "card"]
# ---------------------------------------------------------------------------
# Fragment resolution (mocked)
# ---------------------------------------------------------------------------
class TestFragResolution:
    """``frag`` nodes resolved against a mocked ``execute_io``."""

    def test_frag_substitution(self):
        """The fragment's markup is inserted verbatim, not HTML-escaped."""
        source = '(div (frag "blog" "link-card" :slug "apple"))'
        with mock_io(frag='<a href="/apple">Apple</a>'):
            html = run(r(source))
        assert '<a href="/apple">Apple</a>' in html
        # An escaped "<" would mean the fragment was treated as text.
        assert "&lt;" not in html

    def test_frag_with_surrounding(self):
        """Static siblings and the fragment both appear in the output."""
        source = '(div (h1 "Title") (frag "blog" "card" :slug "x"))'
        with mock_io(frag="<span>Card</span>"):
            html = run(r(source))
        for expected in ("<h1>Title</h1>", "<span>Card</span>"):
            assert expected in html

    def test_frag_params_forwarded(self):
        """The handler receives the node's keyword arguments."""
        seen = {}

        async def capture_frag(args, kwargs, ctx):
            seen.update(kwargs)
            return "<b>ok</b>"

        with mock_io(frag=capture_frag):
            run(r('(frag "blog" "card" :slug "apple" :size "large")'))
        assert seen == {"slug": "apple", "size": "large"}
# ---------------------------------------------------------------------------
# Query resolution (mocked)
# ---------------------------------------------------------------------------
class TestQueryResolution:
    """``query`` nodes resolved against a mocked ``execute_io``."""

    def test_query_result_dict(self):
        """A dict result produces empty output."""
        payload = {"title": "Apple"}
        with mock_io(query=payload):
            html = run(r('(query "market" "product" :slug "apple")'))
        assert html == ""

    def test_query_returns_list(self):
        """Every string in a list result shows up in the output."""
        with mock_io(query=["Apple", "Banana"]):
            html = run(r('(query "market" "product-names")'))
        for expected in ("Apple", "Banana"):
            assert expected in html
# ---------------------------------------------------------------------------
# Parallel I/O
# ---------------------------------------------------------------------------
class TestParallelIO:
    """Sibling I/O nodes must be dispatched concurrently, not one by one."""

    def test_parallel_fetches(self):
        """Multiple I/O nodes are fetched concurrently.

        Counting calls alone cannot tell concurrent dispatch from sequential
        dispatch, so the handler also tracks how many invocations are in
        flight at once and the test asserts that they overlapped.
        """
        stats = {"calls": 0, "in_flight": 0, "peak": 0}

        async def counting_frag(args, kwargs, ctx):
            stats["calls"] += 1
            stats["in_flight"] += 1
            stats["peak"] = max(stats["peak"], stats["in_flight"])
            try:
                # Yield to the loop so sibling fetches get a chance to start.
                await asyncio.sleep(0.01)
            finally:
                stats["in_flight"] -= 1
            return f"<div>{args[1]}</div>"

        with mock_io(frag=counting_frag):
            html = run(r('''
(div
(frag "blog" "card-a")
(frag "blog" "card-b")
(frag "blog" "card-c"))
'''))
        assert "<div>card-a</div>" in html
        assert "<div>card-b</div>" in html
        assert "<div>card-c</div>" in html
        assert stats["calls"] == 3
        # Sequential dispatch would never have more than one call in flight.
        assert stats["peak"] > 1
# ---------------------------------------------------------------------------
# Request context primitives
# ---------------------------------------------------------------------------
class TestRequestContext:
    """Context primitives answered by the real (unpatched) ``execute_io``."""

    def test_current_user(self):
        alice = {"id": 1, "name": "Alice"}
        context = RequestContext(user=alice)
        outcome = run(execute_io("current-user", [], {}, context))
        assert outcome == alice

    def test_htmx_true(self):
        context = RequestContext(is_htmx=True)
        outcome = run(execute_io("htmx-request?", [], {}, context))
        assert outcome is True

    def test_htmx_false(self):
        context = RequestContext(is_htmx=False)
        outcome = run(execute_io("htmx-request?", [], {}, context))
        assert outcome is False

    def test_no_user(self):
        # A context built with no arguments reports no user.
        context = RequestContext()
        outcome = run(execute_io("current-user", [], {}, context))
        assert outcome is None
# ---------------------------------------------------------------------------
# Error handling
# ---------------------------------------------------------------------------
class TestErrorHandling:
    """A raising I/O handler must not stop the rest of the tree rendering."""

    def test_frag_error_degrades_gracefully(self):
        """A failing fragment leaves the static siblings and wrapper intact."""

        async def failing_frag(args, kwargs, ctx):
            raise ConnectionError("connection refused")

        source = '(div (h1 "Title") (frag "blog" "broken"))'
        with mock_io(frag=failing_frag):
            html = run(r(source))
        for expected in ("<h1>Title</h1>", "<div>"):
            assert expected in html

    def test_query_error_degrades_gracefully(self):
        """A failing query leaves the static sibling intact."""

        async def failing_query(args, kwargs, ctx):
            raise TimeoutError("timeout")

        source = '(div (p "Static") (query "market" "broken"))'
        with mock_io(query=failing_query):
            html = run(r(source))
        assert "<p>Static</p>" in html
# ---------------------------------------------------------------------------
# Mixed static + I/O
# ---------------------------------------------------------------------------
class TestMixedContent:
    """Trees that combine static markup with one or more I/O nodes."""

    def test_static_and_frag(self):
        source = '''
(div
(h1 "Static Title")
(p "Static body")
(frag "blog" "widget"))
'''
        with mock_io(frag="<span>Dynamic</span>"):
            html = run(r(source))
        for expected in (
            "<h1>Static Title</h1>",
            "<p>Static body</p>",
            "<span>Dynamic</span>",
        ):
            assert expected in html

    def test_multiple_frag_types(self):
        """Two fragments with different arguments in one tree."""

        async def dynamic_frag(args, kwargs, ctx):
            return f"<b>{args[1]}</b>"

        source = '''
(div
(frag "blog" "header")
(frag "market" "sidebar"))
'''
        with mock_io(frag=dynamic_frag):
            html = run(r(source))
        assert "<b>header</b>" in html
        assert "<b>sidebar</b>" in html

    def test_frag_and_query_together(self):
        """A tree holding both a frag node and a query node."""

        async def mock_handler(args, kwargs, ctx):
            # Use the second positional argument when there is one.
            if len(args) > 1:
                name = args[1]
            else:
                name = "?"
            return f"<i>{name}</i>"

        source = '''
(div
(frag "blog" "card")
(query "market" "stats"))
'''
        with mock_io(frag=mock_handler, query="data"):
            html = run(r(source))
        assert "<i>card</i>" in html
        assert "data" in html