Component names now reflect filesystem location using / as path separator and : as namespace separator for shared components: ~sx-header → ~layouts/header ~layout-app-body → ~shared:layout/app-body ~blog-admin-dashboard → ~admin/dashboard 209 files, 4,941 replacements across all services. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
388 lines
13 KiB
Python
388 lines
13 KiB
Python
"""Tests for Phase 5 async IO proxy infrastructure.
|
|
|
|
Tests the io-deps page registry field, SxExpr serialization through
|
|
the IO proxy pipeline, dynamic allowlist construction, and the
|
|
orchestration.sx routing logic for IO-dependent pages.
|
|
"""
|
|
|
|
import pytest
|
|
from shared.sx.parser import parse_all, serialize, SxExpr
|
|
from shared.sx.types import Component, Macro, Symbol, Keyword, NIL
|
|
from shared.sx.deps import (
|
|
_compute_all_io_refs_fallback,
|
|
components_needed,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def make_env(*sx_sources: str) -> dict:
    """Build an env dict by evaluating SX source snippets in order.

    Each positional argument is parsed with ``parse_all`` and every resulting
    expression is evaluated (and trampolined) against one shared dict, which
    is returned afterwards. With no arguments the returned dict is empty.
    """
    from shared.sx.ref.sx_ref import eval_expr, trampoline

    scope: dict = {}
    for text in sx_sources:
        for form in parse_all(text):
            trampoline(eval_expr(form, scope))
    return scope
|
|
|
|
|
|
# Names passed to _compute_all_io_refs_fallback as its IO-name argument
# throughout these tests.
IO_NAMES = {
    "app-url",
    "config",
    "current-user",
    "fetch-data",
    "highlight",
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# io-deps in page registry entries
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestIoDepsSerialization:
    """The page registry should emit :io-deps as a list of IO primitive names."""

    @staticmethod
    def _collect_io_deps(env: dict, deps: set) -> set:
        """Union the ``io_refs`` of every name in *deps* that maps to a Component.

        Names missing from *env*, values that are not ``Component`` instances,
        and components whose ``io_refs`` is falsy contribute nothing.
        """
        collected: set = set()
        for dep_name in deps:
            comp = env.get(dep_name)
            if isinstance(comp, Component) and comp.io_refs:
                collected.update(comp.io_refs)
        return collected

    def test_pure_page_gets_empty_io_deps(self):
        """Pages with no IO-dependent components get :io-deps ()."""
        env = make_env(
            '(defcomp ~card (&key title) (div title))',
        )
        _compute_all_io_refs_fallback(env, IO_NAMES)

        assert self._collect_io_deps(env, {"~card"}) == set()

    def test_io_page_gets_io_dep_names(self):
        """Pages with IO-dependent components get :io-deps ("highlight" ...)."""
        env = make_env(
            '(defcomp ~code-block (&key src) (pre (highlight src "lisp")))',
        )
        _compute_all_io_refs_fallback(env, IO_NAMES)

        assert self._collect_io_deps(env, {"~code-block"}) == {"highlight"}

    def test_multiple_io_deps_collected(self):
        """Multiple distinct IO primitives from different components are unioned."""
        env = make_env(
            '(defcomp ~plans/environment-images/nav (&key) (nav (app-url "/")))',
            '(defcomp ~page (&key) (div (~plans/environment-images/nav) (config "key")))',
        )
        _compute_all_io_refs_fallback(env, IO_NAMES)

        deps = {"~plans/environment-images/nav", "~page"}
        assert self._collect_io_deps(env, deps) == {"app-url", "config"}

    def test_transitive_io_deps_included(self):
        """IO deps from transitive component dependencies are included."""
        env = make_env(
            '(defcomp ~inner (&key) (div (highlight "code" "lisp")))',
            '(defcomp ~outer (&key) (div (~inner)))',
        )
        _compute_all_io_refs_fallback(env, IO_NAMES)

        # Both components transitively depend on highlight
        assert "highlight" in self._collect_io_deps(env, {"~inner", "~outer"})

        # The union above contains "highlight" even when only ~inner reports
        # it, so also check ~outer on its own: it never calls highlight
        # directly and can only pick it up through ~inner.
        assert "highlight" in self._collect_io_deps(env, {"~outer"})

    def test_io_deps_sx_format(self):
        """io-deps serializes as a proper SX list of strings."""
        from shared.sx.helpers import _sx_literal

        io_deps = {"highlight", "config"}
        # Sorted so the emitted list is independent of set iteration order.
        io_deps_sx = (
            "(" + " ".join(_sx_literal(n) for n in sorted(io_deps)) + ")"
        )
        assert io_deps_sx == '("config" "highlight")'

        # Parse it back
        parsed = parse_all(io_deps_sx)
        assert len(parsed) == 1
        assert parsed[0] == ["config", "highlight"]

    def test_empty_io_deps_sx_format(self):
        """An empty io-deps list, written ``()``, parses to one empty list."""
        io_deps_sx = "()"
        parsed = parse_all(io_deps_sx)
        assert len(parsed) == 1
        assert parsed[0] == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Dynamic IO allowlist from component IO refs
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDynamicAllowlist:
    """The IO proxy allowlist should be built from component IO refs."""

    def test_allowlist_from_env(self):
        """The allowlist is the union of io_refs over every component in env."""
        env = make_env(
            '(defcomp ~a (&key) (div (highlight "x" "lisp")))',
            '(defcomp ~b (&key) (div (config "key")))',
            '(defcomp ~c (&key) (div "pure"))',
        )
        _compute_all_io_refs_fallback(env, IO_NAMES)

        ref_groups = [
            item.io_refs
            for item in env.values()
            if isinstance(item, Component) and item.io_refs
        ]
        allowed = set().union(*ref_groups)

        # Exactly these two names and nothing else.
        assert allowed == {"highlight", "config"}

    def test_pure_env_has_empty_allowlist(self):
        """When no component has io_refs, the allowlist comes out empty."""
        env = make_env(
            '(defcomp ~a (&key) (div "hello"))',
            '(defcomp ~b (&key) (span "world"))',
        )
        _compute_all_io_refs_fallback(env, IO_NAMES)

        ref_groups = [
            item.io_refs
            for item in env.values()
            if isinstance(item, Component) and item.io_refs
        ]
        allowed = set().union(*ref_groups)

        assert allowed == set()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SxExpr serialization through IO proxy pipeline
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestSxExprIoRoundtrip:
    """SxExpr (from highlight etc.) must survive serialize → parse."""

    def test_sxexpr_serializes_unquoted(self):
        """Serializing an SxExpr yields its raw SX source, not a quoted string."""
        source = '(span :class "text-red-500" "hello")'
        emitted = serialize(SxExpr(source))
        assert emitted == source
        assert not emitted.startswith('"')

    def test_sxexpr_roundtrip(self):
        """Serializing an SxExpr and parsing the output gives one AST list."""
        wrapped = SxExpr('(span :class "text-violet-600" "keyword")')
        forms = parse_all(serialize(wrapped))
        assert len(forms) == 1
        # Expected shape:
        # [Symbol("span"), Keyword("class"), "text-violet-600", "keyword"]
        head_form = forms[0]
        assert isinstance(head_form, list)
        tag = head_form[0]
        assert isinstance(tag, Symbol)
        assert tag.name == "span"

    def test_fragment_sxexpr_roundtrip(self):
        """A fragment SxExpr with several children round-trips as one list."""
        pieces = [
            '(<> (span :class "text-red-500" "if") ',
            '(span " ") ',
            '(span :class "text-green-500" "true"))',
        ]
        wrapped = SxExpr("".join(pieces))
        forms = parse_all(serialize(wrapped))
        assert len(forms) == 1
        fragment = forms[0]
        assert isinstance(fragment, list)
        assert fragment[0].name == "<>"

    def test_nil_serializes_as_nil(self):
        """None serializes as 'nil', which parses back to NIL or None."""
        emitted = serialize(None)
        assert emitted == "nil"
        first = parse_all(emitted)[0]
        assert first is NIL or first is None

    def test_sxexpr_in_dict_value(self):
        """An SxExpr stored as a dict value is emitted inline, not quoted."""
        payload = {"code": SxExpr('(span "hello")')}
        emitted = serialize(payload)
        # Should be {:code (span "hello")} not {:code "(span \"hello\")"}
        assert '(span "hello")' in emitted
        decoded = parse_all(emitted)[0]
        # The value should be a list (AST), not a string
        assert isinstance(decoded["code"], list)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# IO proxy arg parsing (GET query string vs POST JSON body)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestIoProxyArgParsing:
    """Test the arg extraction logic used by the IO proxy."""

    @staticmethod
    def _split_query(query: dict) -> tuple:
        """Split a GET query mapping into ``(args, kwargs)``.

        Values whose key starts with ``_arg`` are collected into the
        positional list in the mapping's iteration order. Every other
        key/value pair is kept as a keyword argument.
        """
        args = [v for k, v in query.items() if k.startswith("_arg")]
        kwargs = {k: v for k, v in query.items() if not k.startswith("_arg")}
        return args, kwargs

    def test_get_args_from_query_string(self):
        """GET: _arg0, _arg1, ... become positional args."""
        query = {"_arg0": "(defcomp ~card ...)", "_arg1": "lisp"}
        args, kwargs = self._split_query(query)
        assert args == ["(defcomp ~card ...)", "lisp"]
        assert kwargs == {}

    def test_get_kwargs_from_query_string(self):
        """GET: non-_arg keys become kwargs."""
        query = {"_arg0": "code", "language": "python"}
        args, kwargs = self._split_query(query)
        assert args == ["code"]
        assert kwargs == {"language": "python"}

    def test_post_json_body(self):
        """POST: args and kwargs from JSON body."""
        body = {"args": ["(defcomp ~card ...)", "lisp"], "kwargs": {}}
        # Missing keys fall back to an empty list / dict.
        args = body.get("args", [])
        kwargs = body.get("kwargs", {})
        assert args == ["(defcomp ~card ...)", "lisp"]
        assert kwargs == {}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# IO-aware client routing logic (orchestration.sx)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestIoRoutingLogic:
    """Test the orchestration.sx routing decisions for IO pages.

    Uses the SX evaluator to run the actual routing logic.
    """

    def _eval(self, src, env):
        """Evaluate every expression in *src* against *env*; return the last value.

        Returns None when *src* contains no expressions.
        """
        from shared.sx.ref.sx_ref import eval_expr, trampoline

        last = None
        for form in parse_all(src):
            last = trampoline(eval_expr(form, env))
        return last

    def test_io_deps_list_truthiness(self):
        """A non-empty io-deps list is truthy, empty is falsy."""
        env = make_env()
        # Shared tail of both programs: the truthiness check under test.
        check = ' (if (and io-deps (not (empty? io-deps))) true false))'

        # Non-empty list — (and io-deps (not (empty? io-deps))) is truthy
        non_empty = self._eval('(let ((io-deps (list "highlight")))' + check, env)
        assert non_empty is True

        # Empty list — (and io-deps (not (empty? io-deps))) is falsy
        # (and short-circuits: empty list is falsy, returns [])
        empty = self._eval('(let ((io-deps (list)))' + check, env)
        assert empty is False

    def test_io_deps_from_parsed_page_entry(self):
        """io-deps field round-trips through serialize → parse correctly."""
        entry = parse_all('{:name "test" :io-deps ("highlight" "config")}')[0]

        env = make_env()
        env["entry"] = entry

        fetched = self._eval('(get entry "io-deps")', env)
        assert fetched == ["highlight", "config"]

        program = (
            '(let ((d (get entry "io-deps")))'
            ' (and d (not (empty? d))))'
        )
        assert self._eval(program, env) is True

    def test_empty_io_deps_from_parsed_page_entry(self):
        """Empty io-deps list means page is pure."""
        entry = parse_all('{:name "test" :io-deps ()}')[0]

        env = make_env()
        env["entry"] = entry

        program = (
            '(let ((d (get entry "io-deps")))'
            ' (if (and d (not (empty? d))) true false))'
        )
        assert self._eval(program, env) is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Cache key determinism for IO proxy
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestIoCacheKey:
    """The client-side IO cache keys by name + args. Verify determinism."""

    @staticmethod
    def _make_key(name, args, kwargs=None):
        """Build a cache key from *name*, positional *args* and optional *kwargs*.

        The key is *name*, then ``str()`` of each positional argument, then
        one ``key=value`` part per keyword argument, all joined with a NUL
        character. Keyword arguments are emitted in sorted key order, so the
        result does not depend on dict insertion order.
        """
        parts = [name]
        parts.extend(str(a) for a in args)
        if kwargs:
            parts.extend(k + "=" + str(v) for k, v in sorted(kwargs.items()))
        return "\0".join(parts)

    def test_same_args_same_key(self):
        """Identical calls produce identical cache keys."""
        k1 = self._make_key("highlight", ["(div 1)", "lisp"])
        k2 = self._make_key("highlight", ["(div 1)", "lisp"])
        assert k1 == k2

        # Keyword arguments given in a different order still map to one key.
        forward = self._make_key("config", ["x"], {"a": 1, "b": 2})
        backward = self._make_key("config", ["x"], {"b": 2, "a": 1})
        assert forward == backward

    def test_different_args_different_key(self):
        """Changing any positional argument changes the key."""
        k1 = self._make_key("highlight", ["(div 1)", "lisp"])
        k2 = self._make_key("highlight", ["(div 2)", "lisp"])
        assert k1 != k2

    def test_different_name_different_key(self):
        """The same arguments under a different name give a different key."""
        k1 = self._make_key("highlight", ["code"])
        k2 = self._make_key("config", ["code"])
        assert k1 != k2
|