"""Tests for the async resolver.

Uses asyncio.run() directly — no pytest-asyncio dependency needed.
Mocks execute_io at the resolver boundary to avoid infrastructure imports.
"""

import asyncio
from unittest.mock import AsyncMock, patch

import pytest

from shared.sx import parse, evaluate
from shared.sx.resolver import resolve, _collect_io, _IONode
from shared.sx.primitives_io import RequestContext, execute_io


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def run(coro):
    """Run an async coroutine synchronously and return its result."""
    return asyncio.run(coro)


async def r(text, env=None, ctx=None):
    """Parse *text* as a single expression and resolve it.

    ``env`` and ``ctx`` are forwarded unchanged to ``resolve``.
    """
    return await resolve(parse(text), ctx=ctx, env=env)


def mock_io(**responses):
    """Patch execute_io to return canned responses by primitive name.

    Keyword names are looked up by the primitive name the resolver passes
    to ``execute_io``; the value is either the canned result or a handler
    (sync or async) called as ``handler(args, kwargs, ctx)``.

    Usage::

        with mock_io(frag='<b>Card</b>', query={"title": "Apple"}):
            html = run(r('...'))

    For dynamic responses, pass a callable::

        async def frag_handler(args, kwargs, ctx):
            return f"<b>{args[1]}</b>"

        with mock_io(frag=frag_handler):
            ...

    Returns the ``patch`` object, to be used as a context manager.
    """
    async def side_effect(name, args, kwargs, ctx):
        val = responses.get(name)
        if val is None:
            # No canned response: answer the context primitives straight
            # from the request context, everything else resolves to None.
            if name == "current-user":
                return ctx.user
            if name == "htmx-request?":
                return ctx.is_htmx
            return None
        if callable(val) and asyncio.iscoroutinefunction(val):
            return await val(args, kwargs, ctx)
        if callable(val):
            return val(args, kwargs, ctx)
        return val

    # Patch the name as imported by the resolver module, not its definition
    # site, so the resolver's own lookups hit the mock.
    return patch("shared.sx.resolver.execute_io", side_effect=side_effect)


# ---------------------------------------------------------------------------
# Basic rendering (no I/O) — resolver should pass through to HTML renderer
# ---------------------------------------------------------------------------

class TestPassthrough:
    """Expressions without I/O primitives render as plain HTML."""

    def test_simple_html(self):
        assert run(r('(div "Hello")')) == "<div>Hello</div>"

    def test_nested_html(self):
        assert run(r('(div (p "World"))')) == "<div><p>World</p></div>"

    def test_with_env(self):
        assert run(r('(p name)', env={"name": "Alice"})) == "<p>Alice</p>"

    def test_component(self):
        env = {}
        evaluate(parse('(defcomp ~tag (&key label) (span :class "tag" label))'), env)
        assert run(r('(~tag :label "New")', env=env)) == '<span class="tag">New</span>'


# ---------------------------------------------------------------------------
# I/O node collection
# ---------------------------------------------------------------------------

class TestCollectIO:
    """_collect_io appends one _IONode per I/O primitive call it finds."""

    def test_finds_frag(self):
        expr = parse('(div (frag "blog" "link-card" :slug "apple"))')
        nodes: list[_IONode] = []
        _collect_io(expr, {}, nodes)
        assert len(nodes) == 1
        assert nodes[0].name == "frag"

    def test_finds_query(self):
        expr = parse('(div (query "market" "products" :ids "1,2"))')
        nodes: list[_IONode] = []
        _collect_io(expr, {}, nodes)
        assert len(nodes) == 1
        assert nodes[0].name == "query"

    def test_finds_multiple(self):
        expr = parse('''
            (div
                (frag "blog" "card" :slug "a")
                (query "market" "products" :ids "1"))
        ''')
        nodes: list[_IONode] = []
        _collect_io(expr, {}, nodes)
        assert len(nodes) == 2

    def test_finds_current_user(self):
        expr = parse('(div (current-user))')
        nodes: list[_IONode] = []
        _collect_io(expr, {}, nodes)
        assert len(nodes) == 1
        assert nodes[0].name == "current-user"

    def test_finds_htmx_request(self):
        expr = parse('(div (htmx-request?))')
        nodes: list[_IONode] = []
        _collect_io(expr, {}, nodes)
        assert len(nodes) == 1
        assert nodes[0].name == "htmx-request?"

    def test_no_io_nodes(self):
        expr = parse('(div (p "Hello"))')
        nodes: list[_IONode] = []
        _collect_io(expr, {}, nodes)
        assert len(nodes) == 0

    def test_evaluates_kwargs(self):
        expr = parse('(query "market" "products" :slug slug)')
        env = {"slug": "apple"}
        nodes: list[_IONode] = []
        _collect_io(expr, env, nodes)
        assert len(nodes) == 1
        assert nodes[0].kwargs["slug"] == "apple"

    def test_positional_args_evaluated(self):
        expr = parse('(frag app frag_type)')
        env = {"app": "blog", "frag_type": "card"}
        nodes: list[_IONode] = []
        _collect_io(expr, env, nodes)
        assert nodes[0].args == ["blog", "card"]


# ---------------------------------------------------------------------------
# Fragment resolution (mocked)
# ---------------------------------------------------------------------------

class TestFragResolution:
    """Fragment results are spliced into the rendered output."""

    def test_frag_substitution(self):
        """Fragment result is substituted as raw HTML."""
        with mock_io(frag='<b>Apple</b>'):
            html = run(r('(div (frag "blog" "link-card" :slug "apple"))'))
        assert '<b>Apple</b>' in html
        assert "&lt;" not in html  # should NOT be escaped

    def test_frag_with_surrounding(self):
        """Fragment result sits alongside static HTML."""
        with mock_io(frag="<span>Card</span>"):
            html = run(r('(div (h1 "Title") (frag "blog" "card" :slug "x"))'))
        assert "<h1>Title</h1>" in html
        assert "<span>Card</span>" in html

    def test_frag_params_forwarded(self):
        """Keyword args are forwarded to the I/O handler."""
        received = {}

        async def capture_frag(args, kwargs, ctx):
            received.update(kwargs)
            return "ok"

        with mock_io(frag=capture_frag):
            run(r('(frag "blog" "card" :slug "apple" :size "large")'))
        assert received == {"slug": "apple", "size": "large"}


# ---------------------------------------------------------------------------
# Query resolution (mocked)
# ---------------------------------------------------------------------------

class TestQueryResolution:
    """Query results are rendered according to their Python type."""

    def test_query_result_dict(self):
        """Query returning a dict renders as empty (dicts aren't renderable)."""
        with mock_io(query={"title": "Apple"}):
            html = run(r('(query "market" "product" :slug "apple")'))
        assert html == ""

    def test_query_returns_list(self):
        """Query returning a list of strings renders them."""
        with mock_io(query=["Apple", "Banana"]):
            html = run(r('(query "market" "product-names")'))
        assert "Apple" in html
        assert "Banana" in html


# ---------------------------------------------------------------------------
# Parallel I/O
# ---------------------------------------------------------------------------

class TestParallelIO:
    """Several I/O nodes in one tree are all fetched."""

    def test_parallel_fetches(self):
        """Multiple I/O nodes are fetched concurrently."""
        # A dict rather than a bare int so the nested handler can mutate it
        # without a ``nonlocal`` declaration.
        call_count = {"n": 0}

        async def counting_frag(args, kwargs, ctx):
            call_count["n"] += 1
            await asyncio.sleep(0.01)
            return f"<div>{args[1]}</div>"

        with mock_io(frag=counting_frag):
            html = run(r('''
                (div
                    (frag "blog" "card-a")
                    (frag "blog" "card-b")
                    (frag "blog" "card-c"))
            '''))

        assert "<div>card-a</div>" in html
        assert "<div>card-b</div>" in html
        assert "<div>card-c</div>" in html
        assert call_count["n"] == 3


# ---------------------------------------------------------------------------
# Request context primitives
# ---------------------------------------------------------------------------

class TestRequestContext:
    """The real execute_io answers context primitives from RequestContext."""

    def test_current_user(self):
        user = {"id": 1, "name": "Alice"}
        ctx = RequestContext(user=user)
        result = run(execute_io("current-user", [], {}, ctx))
        assert result == user

    def test_htmx_true(self):
        ctx = RequestContext(is_htmx=True)
        assert run(execute_io("htmx-request?", [], {}, ctx)) is True

    def test_htmx_false(self):
        ctx = RequestContext(is_htmx=False)
        assert run(execute_io("htmx-request?", [], {}, ctx)) is False

    def test_no_user(self):
        ctx = RequestContext()
        assert run(execute_io("current-user", [], {}, ctx)) is None


# ---------------------------------------------------------------------------
# Error handling
# ---------------------------------------------------------------------------

class TestErrorHandling:
    """A failing I/O handler must not take the surrounding render down."""

    def test_frag_error_degrades_gracefully(self):
        """Failed I/O substitutes empty string, doesn't crash."""
        async def failing_frag(args, kwargs, ctx):
            raise ConnectionError("connection refused")

        with mock_io(frag=failing_frag):
            html = run(r('(div (h1 "Title") (frag "blog" "broken"))'))
        assert "<h1>Title</h1>" in html
        assert "<div>" in html

    def test_query_error_degrades_gracefully(self):
        """Failed query substitutes empty string."""
        async def failing_query(args, kwargs, ctx):
            raise TimeoutError("timeout")

        with mock_io(query=failing_query):
            html = run(r('(div (p "Static") (query "market" "broken"))'))
        assert "<p>Static</p>" in html


# ---------------------------------------------------------------------------
# Mixed static + I/O
# ---------------------------------------------------------------------------

class TestMixedContent:
    """Static markup and I/O results coexist in one rendered tree."""

    def test_static_and_frag(self):
        with mock_io(frag="<span>Dynamic</span>"):
            html = run(r('''
                (div
                    (h1 "Static Title")
                    (p "Static body")
                    (frag "blog" "widget"))
            '''))
        assert "<h1>Static Title</h1>" in html
        assert "<p>Static body</p>" in html
        assert "<span>Dynamic</span>" in html

    def test_multiple_frag_types(self):
        """Different fragment types in one tree."""
        async def dynamic_frag(args, kwargs, ctx):
            return f"<span>{args[1]}</span>"

        with mock_io(frag=dynamic_frag):
            html = run(r('''
                (div
                    (frag "blog" "header")
                    (frag "market" "sidebar"))
            '''))
        assert "<span>header</span>" in html
        assert "<span>sidebar</span>" in html

    def test_frag_and_query_together(self):
        """Tree with both frag and query nodes."""
        async def mock_handler(args, kwargs, ctx):
            name = args[1] if len(args) > 1 else "?"
            return f"<b>{name}</b>"

        with mock_io(frag=mock_handler, query="data"):
            html = run(r('''
                (div
                    (frag "blog" "card")
                    (query "market" "stats"))
            '''))
        assert "<b>card</b>" in html
        assert "data" in html