All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 11m37s
Rename all sexp directories, files, identifiers, and references to sx. artdag/ excluded (separate media processing DSL). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
321 lines
11 KiB
Python
321 lines
11 KiB
Python
"""Tests for the async resolver.
|
|
|
|
Uses asyncio.run() directly — no pytest-asyncio dependency needed.
|
|
Mocks execute_io at the resolver boundary to avoid infrastructure imports.
|
|
"""
|
|
|
|
import asyncio
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
import pytest
|
|
from shared.sx import parse, evaluate
|
|
from shared.sx.resolver import resolve, _collect_io, _IONode
|
|
from shared.sx.primitives_io import RequestContext, execute_io
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def run(coro):
|
|
"""Run an async coroutine synchronously."""
|
|
return asyncio.run(coro)
|
|
|
|
|
|
async def r(text, env=None, ctx=None):
|
|
"""Parse and resolve a single expression."""
|
|
return await resolve(parse(text), ctx=ctx, env=env)
|
|
|
|
|
|
def mock_io(**responses):
|
|
"""Patch execute_io to return canned responses by primitive name.
|
|
|
|
Usage::
|
|
|
|
with mock_io(frag='<b>Card</b>', query={"title": "Apple"}):
|
|
html = run(r('...'))
|
|
|
|
For dynamic responses, pass a callable::
|
|
|
|
async def frag_handler(args, kwargs, ctx):
|
|
return f"<b>{args[1]}</b>"
|
|
with mock_io(frag=frag_handler):
|
|
...
|
|
"""
|
|
async def side_effect(name, args, kwargs, ctx):
|
|
val = responses.get(name)
|
|
if val is None:
|
|
# Delegate to real handler for context primitives
|
|
if name == "current-user":
|
|
return ctx.user
|
|
if name == "htmx-request?":
|
|
return ctx.is_htmx
|
|
return None
|
|
if callable(val) and asyncio.iscoroutinefunction(val):
|
|
return await val(args, kwargs, ctx)
|
|
if callable(val):
|
|
return val(args, kwargs, ctx)
|
|
return val
|
|
|
|
return patch("shared.sx.resolver.execute_io", side_effect=side_effect)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Basic rendering (no I/O) — resolver should pass through to HTML renderer
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestPassthrough:
|
|
def test_simple_html(self):
|
|
assert run(r('(div "Hello")')) == "<div>Hello</div>"
|
|
|
|
def test_nested_html(self):
|
|
assert run(r('(div (p "World"))')) == "<div><p>World</p></div>"
|
|
|
|
def test_with_env(self):
|
|
assert run(r('(p name)', env={"name": "Alice"})) == "<p>Alice</p>"
|
|
|
|
def test_component(self):
|
|
env = {}
|
|
evaluate(parse('(defcomp ~tag (&key label) (span :class "tag" label))'), env)
|
|
assert run(r('(~tag :label "New")', env=env)) == '<span class="tag">New</span>'
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# I/O node collection
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCollectIO:
|
|
def test_finds_frag(self):
|
|
expr = parse('(div (frag "blog" "link-card" :slug "apple"))')
|
|
nodes: list[_IONode] = []
|
|
_collect_io(expr, {}, nodes)
|
|
assert len(nodes) == 1
|
|
assert nodes[0].name == "frag"
|
|
|
|
def test_finds_query(self):
|
|
expr = parse('(div (query "market" "products" :ids "1,2"))')
|
|
nodes: list[_IONode] = []
|
|
_collect_io(expr, {}, nodes)
|
|
assert len(nodes) == 1
|
|
assert nodes[0].name == "query"
|
|
|
|
def test_finds_multiple(self):
|
|
expr = parse('''
|
|
(div
|
|
(frag "blog" "card" :slug "a")
|
|
(query "market" "products" :ids "1"))
|
|
''')
|
|
nodes: list[_IONode] = []
|
|
_collect_io(expr, {}, nodes)
|
|
assert len(nodes) == 2
|
|
|
|
def test_finds_current_user(self):
|
|
expr = parse('(div (current-user))')
|
|
nodes: list[_IONode] = []
|
|
_collect_io(expr, {}, nodes)
|
|
assert len(nodes) == 1
|
|
assert nodes[0].name == "current-user"
|
|
|
|
def test_finds_htmx_request(self):
|
|
expr = parse('(div (htmx-request?))')
|
|
nodes: list[_IONode] = []
|
|
_collect_io(expr, {}, nodes)
|
|
assert len(nodes) == 1
|
|
assert nodes[0].name == "htmx-request?"
|
|
|
|
def test_no_io_nodes(self):
|
|
expr = parse('(div (p "Hello"))')
|
|
nodes: list[_IONode] = []
|
|
_collect_io(expr, {}, nodes)
|
|
assert len(nodes) == 0
|
|
|
|
def test_evaluates_kwargs(self):
|
|
expr = parse('(query "market" "products" :slug slug)')
|
|
env = {"slug": "apple"}
|
|
nodes: list[_IONode] = []
|
|
_collect_io(expr, env, nodes)
|
|
assert len(nodes) == 1
|
|
assert nodes[0].kwargs["slug"] == "apple"
|
|
|
|
def test_positional_args_evaluated(self):
|
|
expr = parse('(frag app frag_type)')
|
|
env = {"app": "blog", "frag_type": "card"}
|
|
nodes: list[_IONode] = []
|
|
_collect_io(expr, env, nodes)
|
|
assert nodes[0].args == ["blog", "card"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fragment resolution (mocked)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestFragResolution:
|
|
def test_frag_substitution(self):
|
|
"""Fragment result is substituted as raw HTML."""
|
|
with mock_io(frag='<a href="/apple">Apple</a>'):
|
|
html = run(r('(div (frag "blog" "link-card" :slug "apple"))'))
|
|
assert '<a href="/apple">Apple</a>' in html
|
|
assert "<" not in html # should NOT be escaped
|
|
|
|
def test_frag_with_surrounding(self):
|
|
"""Fragment result sits alongside static HTML."""
|
|
with mock_io(frag="<span>Card</span>"):
|
|
html = run(r('(div (h1 "Title") (frag "blog" "card" :slug "x"))'))
|
|
assert "<h1>Title</h1>" in html
|
|
assert "<span>Card</span>" in html
|
|
|
|
def test_frag_params_forwarded(self):
|
|
"""Keyword args are forwarded to the I/O handler."""
|
|
received = {}
|
|
|
|
async def capture_frag(args, kwargs, ctx):
|
|
received.update(kwargs)
|
|
return "<b>ok</b>"
|
|
|
|
with mock_io(frag=capture_frag):
|
|
run(r('(frag "blog" "card" :slug "apple" :size "large")'))
|
|
assert received == {"slug": "apple", "size": "large"}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Query resolution (mocked)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestQueryResolution:
|
|
def test_query_result_dict(self):
|
|
"""Query returning a dict renders as empty (dicts aren't renderable)."""
|
|
with mock_io(query={"title": "Apple"}):
|
|
html = run(r('(query "market" "product" :slug "apple")'))
|
|
assert html == ""
|
|
|
|
def test_query_returns_list(self):
|
|
"""Query returning a list of strings renders them."""
|
|
with mock_io(query=["Apple", "Banana"]):
|
|
html = run(r('(query "market" "product-names")'))
|
|
assert "Apple" in html
|
|
assert "Banana" in html
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Parallel I/O
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestParallelIO:
|
|
def test_parallel_fetches(self):
|
|
"""Multiple I/O nodes are fetched concurrently."""
|
|
call_count = {"n": 0}
|
|
|
|
async def counting_frag(args, kwargs, ctx):
|
|
call_count["n"] += 1
|
|
await asyncio.sleep(0.01)
|
|
return f"<div>{args[1]}</div>"
|
|
|
|
with mock_io(frag=counting_frag):
|
|
html = run(r('''
|
|
(div
|
|
(frag "blog" "card-a")
|
|
(frag "blog" "card-b")
|
|
(frag "blog" "card-c"))
|
|
'''))
|
|
|
|
assert "<div>card-a</div>" in html
|
|
assert "<div>card-b</div>" in html
|
|
assert "<div>card-c</div>" in html
|
|
assert call_count["n"] == 3
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Request context primitives
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestRequestContext:
|
|
def test_current_user(self):
|
|
user = {"id": 1, "name": "Alice"}
|
|
ctx = RequestContext(user=user)
|
|
result = run(execute_io("current-user", [], {}, ctx))
|
|
assert result == user
|
|
|
|
def test_htmx_true(self):
|
|
ctx = RequestContext(is_htmx=True)
|
|
assert run(execute_io("htmx-request?", [], {}, ctx)) is True
|
|
|
|
def test_htmx_false(self):
|
|
ctx = RequestContext(is_htmx=False)
|
|
assert run(execute_io("htmx-request?", [], {}, ctx)) is False
|
|
|
|
def test_no_user(self):
|
|
ctx = RequestContext()
|
|
assert run(execute_io("current-user", [], {}, ctx)) is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Error handling
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestErrorHandling:
|
|
def test_frag_error_degrades_gracefully(self):
|
|
"""Failed I/O substitutes empty string, doesn't crash."""
|
|
async def failing_frag(args, kwargs, ctx):
|
|
raise ConnectionError("connection refused")
|
|
|
|
with mock_io(frag=failing_frag):
|
|
html = run(r('(div (h1 "Title") (frag "blog" "broken"))'))
|
|
assert "<h1>Title</h1>" in html
|
|
assert "<div>" in html
|
|
|
|
def test_query_error_degrades_gracefully(self):
|
|
"""Failed query substitutes empty string."""
|
|
async def failing_query(args, kwargs, ctx):
|
|
raise TimeoutError("timeout")
|
|
|
|
with mock_io(query=failing_query):
|
|
html = run(r('(div (p "Static") (query "market" "broken"))'))
|
|
assert "<p>Static</p>" in html
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Mixed static + I/O
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestMixedContent:
|
|
def test_static_and_frag(self):
|
|
with mock_io(frag="<span>Dynamic</span>"):
|
|
html = run(r('''
|
|
(div
|
|
(h1 "Static Title")
|
|
(p "Static body")
|
|
(frag "blog" "widget"))
|
|
'''))
|
|
assert "<h1>Static Title</h1>" in html
|
|
assert "<p>Static body</p>" in html
|
|
assert "<span>Dynamic</span>" in html
|
|
|
|
def test_multiple_frag_types(self):
|
|
"""Different fragment types in one tree."""
|
|
async def dynamic_frag(args, kwargs, ctx):
|
|
return f"<b>{args[1]}</b>"
|
|
|
|
with mock_io(frag=dynamic_frag):
|
|
html = run(r('''
|
|
(div
|
|
(frag "blog" "header")
|
|
(frag "market" "sidebar"))
|
|
'''))
|
|
assert "<b>header</b>" in html
|
|
assert "<b>sidebar</b>" in html
|
|
|
|
def test_frag_and_query_together(self):
|
|
"""Tree with both frag and query nodes."""
|
|
async def mock_handler(args, kwargs, ctx):
|
|
name = args[1] if len(args) > 1 else "?"
|
|
return f"<i>{name}</i>"
|
|
|
|
with mock_io(frag=mock_handler, query="data"):
|
|
html = run(r('''
|
|
(div
|
|
(frag "blog" "card")
|
|
(query "market" "stats"))
|
|
'''))
|
|
assert "<i>card</i>" in html
|
|
assert "data" in html
|