Phase 3: Async resolver with parallel I/O and graceful degradation

Tree walker collects I/O nodes (frag, query, action, current-user,
htmx-request?), dispatches them via asyncio.gather(), substitutes results,
and renders to HTML. Failed I/O degrades gracefully to empty string.

27 new tests (199 total), all mocked at execute_io boundary — no
infrastructure dependencies needed.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-27 14:22:28 +00:00
parent 09010db70e
commit fbb7a1422c
4 changed files with 702 additions and 0 deletions

View File

@@ -654,6 +654,39 @@ Each phase is independently deployable. The end state: a platform where the appl
- 63 tests: escaping (4), atoms (8), elements (6), attributes (8), boolean attrs (4), void elements (7), fragments (3), raw! (3), components (4), expressions with control flow (8), full pages (3), edge cases (5)
- **172 total tests across all 3 files, all passing**
### Phase 3: Async Resolver — COMPLETE
**Branch:** `sexpression`
**Delivered** (`shared/sexp/`):
- `resolver.py` — Async tree walker: collects I/O nodes from parsed tree, executes them in parallel via `asyncio.gather()`, substitutes results back, renders to HTML. Multi-pass resolution (up to 5 depth) for cases where resolved values contain further I/O. Graceful degradation: failed I/O nodes substitute empty string instead of crashing.
- `primitives_io.py` — I/O primitive registry and handlers:
- `(frag "service" "type" :key val ...)` → wraps `fetch_fragment`
- `(query "service" "query-name" :key val ...)` → wraps `fetch_data`
- `(action "service" "action-name" :key val ...)` → wraps `call_action`
- `(current-user)` → user dict from `RequestContext`
- `(htmx-request?)` → boolean from `RequestContext`
- `RequestContext` — per-request state (user, is_htmx, extras) passed to I/O handlers
**Resolution strategy:**
1. Parse s-expression tree
2. Walk tree, collect all I/O nodes (frag, query, action, current-user, htmx-request?)
3. Parse each node's positional args + keyword kwargs, evaluating expressions
4. Dispatch all I/O in parallel via `asyncio.gather(return_exceptions=True)`
5. Substitute results back into tree (fragments wrapped as `_RawHTML` to prevent escaping)
6. Repeat up to 5 passes if resolved values introduce new I/O nodes
7. Render fully-resolved tree to HTML via Phase 2 renderer
**Design decisions:**
- I/O handlers use deferred imports (inside functions) so `shared.sexp` doesn't depend on infrastructure at import time — only when actually executing I/O
- Tests mock at the `execute_io` boundary (patching `shared.sexp.resolver.execute_io`) rather than patching infrastructure imports, keeping tests self-contained with no external dependencies
- Fragment results wrapped as `_RawHTML` since they're already-rendered HTML
- Identity-based substitution (`id(expr)`) maps I/O nodes back to their tree position
**Tests** (`shared/sexp/tests/test_resolver.py`):
- 27 tests: passthrough rendering (4), I/O collection (8), fragment resolution (3), query resolution (2), parallel I/O (1), request context (4), error handling (2), mixed content (3)
- **199 total tests across all 4 files, all passing**
### Test Infrastructure — COMPLETE
**Delivered:**

View File

@@ -0,0 +1,153 @@
"""
Async I/O primitives for the s-expression resolver.
These wrap rose-ash's inter-service communication layer so that s-expressions
can fetch fragments, query data, call actions, and access request context.
Unlike pure primitives (primitives.py), these are **async** and are executed
by the resolver rather than the evaluator. They are identified by name
during the tree-walk phase and dispatched via ``asyncio.gather()``.
Usage in s-expressions::
(frag "blog" "link-card" :slug "apple")
(query "market" "products-by-ids" :ids "1,2,3")
(action "market" "create-marketplace" :name "Farm Shop" :slug "farm")
(current-user)
(htmx-request?)
"""
from __future__ import annotations
from typing import Any
# ---------------------------------------------------------------------------
# Registry of async primitives (name → metadata)
# ---------------------------------------------------------------------------
# Names that the resolver recognises as I/O nodes requiring async resolution.
# The resolver collects these during tree-walk, groups them, and dispatches
# them in parallel.
IO_PRIMITIVES: frozenset[str] = frozenset({
"frag",
"query",
"action",
"current-user",
"htmx-request?",
})
# ---------------------------------------------------------------------------
# Request context (set per-request by the resolver)
# ---------------------------------------------------------------------------
class RequestContext:
"""Per-request context provided to I/O primitives.
Populated by the resolver from the Quart request before resolution begins.
"""
__slots__ = ("user", "is_htmx", "extras")
def __init__(
self,
user: dict[str, Any] | None = None,
is_htmx: bool = False,
extras: dict[str, Any] | None = None,
):
self.user = user
self.is_htmx = is_htmx
self.extras = extras or {}
# ---------------------------------------------------------------------------
# I/O dispatch
# ---------------------------------------------------------------------------
async def execute_io(
name: str,
args: list[Any],
kwargs: dict[str, Any],
ctx: RequestContext,
) -> Any:
"""Execute an I/O primitive by name.
Called by the resolver after collecting and grouping I/O nodes.
Returns the result to be substituted back into the tree.
"""
handler = _IO_HANDLERS.get(name)
if handler is None:
raise RuntimeError(f"Unknown I/O primitive: {name}")
return await handler(args, kwargs, ctx)
# ---------------------------------------------------------------------------
# Individual handlers
# ---------------------------------------------------------------------------
async def _io_frag(
args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> str:
"""``(frag "service" "type" :key val ...)`` → fetch_fragment."""
if len(args) < 2:
raise ValueError("frag requires service and fragment type")
service = str(args[0])
frag_type = str(args[1])
params = {k: v for k, v in kwargs.items() if v is not None}
from shared.infrastructure.fragments import fetch_fragment
return await fetch_fragment(service, frag_type, params=params or None)
async def _io_query(
args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
"""``(query "service" "query-name" :key val ...)`` → fetch_data."""
if len(args) < 2:
raise ValueError("query requires service and query name")
service = str(args[0])
query_name = str(args[1])
params = {k: v for k, v in kwargs.items() if v is not None}
from shared.infrastructure.data_client import fetch_data
return await fetch_data(service, query_name, params=params or None)
async def _io_action(
args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
"""``(action "service" "action-name" :key val ...)`` → call_action."""
if len(args) < 2:
raise ValueError("action requires service and action name")
service = str(args[0])
action_name = str(args[1])
payload = {k: v for k, v in kwargs.items() if v is not None}
from shared.infrastructure.actions import call_action
return await call_action(service, action_name, payload=payload or None)
async def _io_current_user(
args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> dict[str, Any] | None:
"""``(current-user)`` → user dict from request context."""
return ctx.user
async def _io_htmx_request(
args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> bool:
"""``(htmx-request?)`` → True if HX-Request header present."""
return ctx.is_htmx
# ---------------------------------------------------------------------------
# Handler registry
# ---------------------------------------------------------------------------
_IO_HANDLERS: dict[str, Any] = {
"frag": _io_frag,
"query": _io_query,
"action": _io_action,
"current-user": _io_current_user,
"htmx-request?": _io_htmx_request,
}

196
shared/sexp/resolver.py Normal file
View File

@@ -0,0 +1,196 @@
"""
Async resolver — walks an s-expression tree, fetches I/O in parallel,
substitutes results, and renders to HTML.
This is the DAG execution engine applied to page rendering. The strategy:
1. **Walk** the parsed tree and identify I/O nodes (``frag``, ``query``,
``action``, ``current-user``, ``htmx-request?``).
2. **Group** independent fetches.
3. **Dispatch** via ``asyncio.gather()`` for maximum parallelism.
4. **Substitute** resolved values back into the tree.
5. **Render** the fully-resolved tree to HTML via the HTML renderer.
Usage::
from shared.sexp import parse
from shared.sexp.resolver import resolve, RequestContext
expr = parse('''
(div :class "page"
(h1 "Blog")
(raw! (frag "blog" "link-card" :slug "apple")))
''')
ctx = RequestContext(user=current_user, is_htmx=is_htmx_request())
html = await resolve(expr, ctx=ctx, env={})
"""
from __future__ import annotations
import asyncio
from typing import Any
from .types import Component, Keyword, Lambda, NIL, Symbol
from .evaluator import _eval
from .html import render as html_render, _RawHTML
from .primitives_io import (
IO_PRIMITIVES,
RequestContext,
execute_io,
)
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
async def resolve(
expr: Any,
*,
ctx: RequestContext | None = None,
env: dict[str, Any] | None = None,
) -> str:
"""Resolve an s-expression tree and render to HTML.
1. Collect all I/O nodes from the tree.
2. Execute them in parallel.
3. Substitute results.
4. Render to HTML.
"""
if ctx is None:
ctx = RequestContext()
if env is None:
env = {}
# Resolve I/O nodes (may require multiple passes if I/O results
# contain further I/O references, though typically one pass suffices).
resolved = await _resolve_tree(expr, env, ctx)
# Render the fully-resolved tree to HTML
return html_render(resolved, env)
# ---------------------------------------------------------------------------
# Tree walker — collect, fetch, substitute
# ---------------------------------------------------------------------------
async def _resolve_tree(
expr: Any,
env: dict[str, Any],
ctx: RequestContext,
max_depth: int = 5,
) -> Any:
"""Resolve I/O nodes in the tree. Loops up to *max_depth* passes
in case resolved values introduce new I/O nodes."""
resolved = expr
for _ in range(max_depth):
# Collect I/O nodes
io_nodes: list[_IONode] = []
_collect_io(resolved, env, io_nodes)
if not io_nodes:
break # nothing to fetch
# Execute all I/O in parallel
results = await asyncio.gather(
*[_execute_node(node, ctx) for node in io_nodes],
return_exceptions=True,
)
# Build substitution map (node id → result)
for node, result in zip(io_nodes, results):
if isinstance(result, BaseException):
# On error, substitute empty string (graceful degradation)
node.result = ""
else:
node.result = result
# Substitute results back into tree
resolved = _substitute(resolved, env, {id(n.expr): n for n in io_nodes})
return resolved
class _IONode:
"""A collected I/O node from the tree."""
__slots__ = ("name", "args", "kwargs", "expr", "result")
def __init__(self, name: str, args: list[Any], kwargs: dict[str, Any], expr: list):
self.name = name
self.args = args
self.kwargs = kwargs
self.expr = expr # original list reference for identity-based substitution
self.result: Any = None
def _collect_io(
expr: Any,
env: dict[str, Any],
out: list[_IONode],
) -> None:
"""Walk the tree and collect I/O nodes into *out*."""
if not isinstance(expr, list) or not expr:
return
head = expr[0]
if isinstance(head, Symbol) and head.name in IO_PRIMITIVES:
# Parse args and kwargs from the rest of the expression
args, kwargs = _parse_io_args(expr[1:], env)
out.append(_IONode(head.name, args, kwargs, expr))
return # don't recurse into I/O node children
# Recurse into children
for child in expr:
if isinstance(child, list):
_collect_io(child, env, out)
def _parse_io_args(
exprs: list[Any],
env: dict[str, Any],
) -> tuple[list[Any], dict[str, Any]]:
"""Split I/O node arguments into positional args and keyword kwargs.
Evaluates each argument value so variables/expressions are resolved
before the I/O call.
"""
args: list[Any] = []
kwargs: dict[str, Any] = {}
i = 0
while i < len(exprs):
item = exprs[i]
if isinstance(item, Keyword) and i + 1 < len(exprs):
kwargs[item.name] = _eval(exprs[i + 1], env)
i += 2
else:
args.append(_eval(item, env))
i += 1
return args, kwargs
async def _execute_node(node: _IONode, ctx: RequestContext) -> Any:
"""Execute a single I/O node."""
return await execute_io(node.name, node.args, node.kwargs, ctx)
def _substitute(
expr: Any,
env: dict[str, Any],
node_map: dict[int, _IONode],
) -> Any:
"""Replace I/O nodes in the tree with their resolved results."""
if not isinstance(expr, list) or not expr:
return expr
# Check if this exact list is an I/O node
node = node_map.get(id(expr))
if node is not None:
result = node.result
# Fragment results are HTML strings — wrap as _RawHTML to prevent escaping
if node.name == "frag" and isinstance(result, str):
return _RawHTML(result)
return result
# Recurse into children
return [_substitute(child, env, node_map) for child in expr]

View File

@@ -0,0 +1,320 @@
"""Tests for the async resolver.
Uses asyncio.run() directly — no pytest-asyncio dependency needed.
Mocks execute_io at the resolver boundary to avoid infrastructure imports.
"""
import asyncio
from unittest.mock import AsyncMock, patch
import pytest
from shared.sexp import parse, evaluate
from shared.sexp.resolver import resolve, _collect_io, _IONode
from shared.sexp.primitives_io import RequestContext, execute_io
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def run(coro):
"""Run an async coroutine synchronously."""
return asyncio.run(coro)
async def r(text, env=None, ctx=None):
"""Parse and resolve a single expression."""
return await resolve(parse(text), ctx=ctx, env=env)
def mock_io(**responses):
"""Patch execute_io to return canned responses by primitive name.
Usage::
with mock_io(frag='<b>Card</b>', query={"title": "Apple"}):
html = run(r('...'))
For dynamic responses, pass a callable::
async def frag_handler(args, kwargs, ctx):
return f"<b>{args[1]}</b>"
with mock_io(frag=frag_handler):
...
"""
async def side_effect(name, args, kwargs, ctx):
val = responses.get(name)
if val is None:
# Delegate to real handler for context primitives
if name == "current-user":
return ctx.user
if name == "htmx-request?":
return ctx.is_htmx
return None
if callable(val) and asyncio.iscoroutinefunction(val):
return await val(args, kwargs, ctx)
if callable(val):
return val(args, kwargs, ctx)
return val
return patch("shared.sexp.resolver.execute_io", side_effect=side_effect)
# ---------------------------------------------------------------------------
# Basic rendering (no I/O) — resolver should pass through to HTML renderer
# ---------------------------------------------------------------------------
class TestPassthrough:
def test_simple_html(self):
assert run(r('(div "Hello")')) == "<div>Hello</div>"
def test_nested_html(self):
assert run(r('(div (p "World"))')) == "<div><p>World</p></div>"
def test_with_env(self):
assert run(r('(p name)', env={"name": "Alice"})) == "<p>Alice</p>"
def test_component(self):
env = {}
evaluate(parse('(defcomp ~tag (&key label) (span :class "tag" label))'), env)
assert run(r('(~tag :label "New")', env=env)) == '<span class="tag">New</span>'
# ---------------------------------------------------------------------------
# I/O node collection
# ---------------------------------------------------------------------------
class TestCollectIO:
def test_finds_frag(self):
expr = parse('(div (frag "blog" "link-card" :slug "apple"))')
nodes: list[_IONode] = []
_collect_io(expr, {}, nodes)
assert len(nodes) == 1
assert nodes[0].name == "frag"
def test_finds_query(self):
expr = parse('(div (query "market" "products" :ids "1,2"))')
nodes: list[_IONode] = []
_collect_io(expr, {}, nodes)
assert len(nodes) == 1
assert nodes[0].name == "query"
def test_finds_multiple(self):
expr = parse('''
(div
(frag "blog" "card" :slug "a")
(query "market" "products" :ids "1"))
''')
nodes: list[_IONode] = []
_collect_io(expr, {}, nodes)
assert len(nodes) == 2
def test_finds_current_user(self):
expr = parse('(div (current-user))')
nodes: list[_IONode] = []
_collect_io(expr, {}, nodes)
assert len(nodes) == 1
assert nodes[0].name == "current-user"
def test_finds_htmx_request(self):
expr = parse('(div (htmx-request?))')
nodes: list[_IONode] = []
_collect_io(expr, {}, nodes)
assert len(nodes) == 1
assert nodes[0].name == "htmx-request?"
def test_no_io_nodes(self):
expr = parse('(div (p "Hello"))')
nodes: list[_IONode] = []
_collect_io(expr, {}, nodes)
assert len(nodes) == 0
def test_evaluates_kwargs(self):
expr = parse('(query "market" "products" :slug slug)')
env = {"slug": "apple"}
nodes: list[_IONode] = []
_collect_io(expr, env, nodes)
assert len(nodes) == 1
assert nodes[0].kwargs["slug"] == "apple"
def test_positional_args_evaluated(self):
expr = parse('(frag app frag_type)')
env = {"app": "blog", "frag_type": "card"}
nodes: list[_IONode] = []
_collect_io(expr, env, nodes)
assert nodes[0].args == ["blog", "card"]
# ---------------------------------------------------------------------------
# Fragment resolution (mocked)
# ---------------------------------------------------------------------------
class TestFragResolution:
def test_frag_substitution(self):
"""Fragment result is substituted as raw HTML."""
with mock_io(frag='<a href="/apple">Apple</a>'):
html = run(r('(div (frag "blog" "link-card" :slug "apple"))'))
assert '<a href="/apple">Apple</a>' in html
assert "&lt;" not in html # should NOT be escaped
def test_frag_with_surrounding(self):
"""Fragment result sits alongside static HTML."""
with mock_io(frag="<span>Card</span>"):
html = run(r('(div (h1 "Title") (frag "blog" "card" :slug "x"))'))
assert "<h1>Title</h1>" in html
assert "<span>Card</span>" in html
def test_frag_params_forwarded(self):
"""Keyword args are forwarded to the I/O handler."""
received = {}
async def capture_frag(args, kwargs, ctx):
received.update(kwargs)
return "<b>ok</b>"
with mock_io(frag=capture_frag):
run(r('(frag "blog" "card" :slug "apple" :size "large")'))
assert received == {"slug": "apple", "size": "large"}
# ---------------------------------------------------------------------------
# Query resolution (mocked)
# ---------------------------------------------------------------------------
class TestQueryResolution:
def test_query_result_dict(self):
"""Query returning a dict renders as empty (dicts aren't renderable)."""
with mock_io(query={"title": "Apple"}):
html = run(r('(query "market" "product" :slug "apple")'))
assert html == ""
def test_query_returns_list(self):
"""Query returning a list of strings renders them."""
with mock_io(query=["Apple", "Banana"]):
html = run(r('(query "market" "product-names")'))
assert "Apple" in html
assert "Banana" in html
# ---------------------------------------------------------------------------
# Parallel I/O
# ---------------------------------------------------------------------------
class TestParallelIO:
def test_parallel_fetches(self):
"""Multiple I/O nodes are fetched concurrently."""
call_count = {"n": 0}
async def counting_frag(args, kwargs, ctx):
call_count["n"] += 1
await asyncio.sleep(0.01)
return f"<div>{args[1]}</div>"
with mock_io(frag=counting_frag):
html = run(r('''
(div
(frag "blog" "card-a")
(frag "blog" "card-b")
(frag "blog" "card-c"))
'''))
assert "<div>card-a</div>" in html
assert "<div>card-b</div>" in html
assert "<div>card-c</div>" in html
assert call_count["n"] == 3
# ---------------------------------------------------------------------------
# Request context primitives
# ---------------------------------------------------------------------------
class TestRequestContext:
def test_current_user(self):
user = {"id": 1, "name": "Alice"}
ctx = RequestContext(user=user)
result = run(execute_io("current-user", [], {}, ctx))
assert result == user
def test_htmx_true(self):
ctx = RequestContext(is_htmx=True)
assert run(execute_io("htmx-request?", [], {}, ctx)) is True
def test_htmx_false(self):
ctx = RequestContext(is_htmx=False)
assert run(execute_io("htmx-request?", [], {}, ctx)) is False
def test_no_user(self):
ctx = RequestContext()
assert run(execute_io("current-user", [], {}, ctx)) is None
# ---------------------------------------------------------------------------
# Error handling
# ---------------------------------------------------------------------------
class TestErrorHandling:
def test_frag_error_degrades_gracefully(self):
"""Failed I/O substitutes empty string, doesn't crash."""
async def failing_frag(args, kwargs, ctx):
raise ConnectionError("connection refused")
with mock_io(frag=failing_frag):
html = run(r('(div (h1 "Title") (frag "blog" "broken"))'))
assert "<h1>Title</h1>" in html
assert "<div>" in html
def test_query_error_degrades_gracefully(self):
"""Failed query substitutes empty string."""
async def failing_query(args, kwargs, ctx):
raise TimeoutError("timeout")
with mock_io(query=failing_query):
html = run(r('(div (p "Static") (query "market" "broken"))'))
assert "<p>Static</p>" in html
# ---------------------------------------------------------------------------
# Mixed static + I/O
# ---------------------------------------------------------------------------
class TestMixedContent:
def test_static_and_frag(self):
with mock_io(frag="<span>Dynamic</span>"):
html = run(r('''
(div
(h1 "Static Title")
(p "Static body")
(frag "blog" "widget"))
'''))
assert "<h1>Static Title</h1>" in html
assert "<p>Static body</p>" in html
assert "<span>Dynamic</span>" in html
def test_multiple_frag_types(self):
"""Different fragment types in one tree."""
async def dynamic_frag(args, kwargs, ctx):
return f"<b>{args[1]}</b>"
with mock_io(frag=dynamic_frag):
html = run(r('''
(div
(frag "blog" "header")
(frag "market" "sidebar"))
'''))
assert "<b>header</b>" in html
assert "<b>sidebar</b>" in html
def test_frag_and_query_together(self):
"""Tree with both frag and query nodes."""
async def mock_handler(args, kwargs, ctx):
name = args[1] if len(args) > 1 else "?"
return f"<i>{name}</i>"
with mock_io(frag=mock_handler, query="data"):
html = run(r('''
(div
(frag "blog" "card")
(query "market" "stats"))
'''))
assert "<i>card</i>" in html
assert "data" in html