"""Tests for Phase 5 async IO proxy infrastructure. Tests the io-deps page registry field, SxExpr serialization through the IO proxy pipeline, dynamic allowlist construction, and the orchestration.sx routing logic for IO-dependent pages. """ import pytest from shared.sx.parser import parse_all, serialize, SxExpr from shared.sx.types import Component, Macro, Symbol, Keyword, NIL from shared.sx.deps import ( _compute_all_io_refs_fallback, components_needed, ) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def make_env(*sx_sources: str) -> dict: """Parse and evaluate component definitions into an env dict.""" from shared.sx.evaluator import _eval, _trampoline env: dict = {} for source in sx_sources: exprs = parse_all(source) for expr in exprs: _trampoline(_eval(expr, env)) return env IO_NAMES = {"highlight", "current-user", "app-url", "config", "fetch-data"} # --------------------------------------------------------------------------- # io-deps in page registry entries # --------------------------------------------------------------------------- class TestIoDepsSerialization: """The page registry should emit :io-deps as a list of IO primitive names.""" def test_pure_page_gets_empty_io_deps(self): """Pages with no IO-dependent components get :io-deps ().""" env = make_env( '(defcomp ~card (&key title) (div title))', ) _compute_all_io_refs_fallback(env, IO_NAMES) deps = {"~card"} io_deps: set[str] = set() for dep_name in deps: comp = env.get(dep_name) if isinstance(comp, Component) and comp.io_refs: io_deps.update(comp.io_refs) assert io_deps == set() def test_io_page_gets_io_dep_names(self): """Pages with IO-dependent components get :io-deps ("highlight" ...).""" env = make_env( '(defcomp ~code-block (&key src) (pre (highlight src "lisp")))', ) _compute_all_io_refs_fallback(env, IO_NAMES) deps = {"~code-block"} io_deps: set[str] = set() for dep_name in deps: comp = env.get(dep_name) if isinstance(comp, Component) and comp.io_refs: io_deps.update(comp.io_refs) assert io_deps == {"highlight"} def test_multiple_io_deps_collected(self): """Multiple distinct IO primitives from different components are unioned.""" env = make_env( '(defcomp ~nav (&key) (nav (app-url "/")))', '(defcomp ~page (&key) (div (~nav) (config "key")))', ) _compute_all_io_refs_fallback(env, IO_NAMES) deps = {"~nav", "~page"} io_deps: set[str] = set() for dep_name in deps: comp = env.get(dep_name) if isinstance(comp, Component) and comp.io_refs: io_deps.update(comp.io_refs) assert io_deps == {"app-url", "config"} def test_transitive_io_deps_included(self): """IO deps from transitive component dependencies are included.""" env = make_env( '(defcomp ~inner (&key) (div (highlight "code" "lisp")))', '(defcomp ~outer (&key) (div (~inner)))', ) _compute_all_io_refs_fallback(env, IO_NAMES) deps = {"~inner", "~outer"} io_deps: set[str] = set() for dep_name in deps: comp = env.get(dep_name) if isinstance(comp, Component) and comp.io_refs: io_deps.update(comp.io_refs) # Both components transitively depend on highlight assert "highlight" in io_deps def test_io_deps_sx_format(self): """io-deps serializes as a proper SX list of strings.""" from shared.sx.helpers import _sx_literal io_deps = {"highlight", "config"} io_deps_sx = ( "(" + " ".join(_sx_literal(n) for n in sorted(io_deps)) + ")" ) assert io_deps_sx == '("config" "highlight")' # Parse it back parsed = parse_all(io_deps_sx) assert len(parsed) == 1 assert parsed[0] == ["config", "highlight"] def test_empty_io_deps_sx_format(self): io_deps_sx = "()" parsed = parse_all(io_deps_sx) assert len(parsed) == 1 assert parsed[0] == [] # --------------------------------------------------------------------------- # Dynamic IO allowlist from component IO refs # --------------------------------------------------------------------------- class TestDynamicAllowlist: """The IO proxy allowlist should be built from component IO refs.""" def test_allowlist_from_env(self): """Union of all component io_refs gives the allowlist.""" env = make_env( '(defcomp ~a (&key) (div (highlight "x" "lisp")))', '(defcomp ~b (&key) (div (config "key")))', '(defcomp ~c (&key) (div "pure"))', ) _compute_all_io_refs_fallback(env, IO_NAMES) allowed: set[str] = set() for val in env.values(): if isinstance(val, Component) and val.io_refs: allowed.update(val.io_refs) assert "highlight" in allowed assert "config" in allowed assert len(allowed) == 2 # only these two def test_pure_env_has_empty_allowlist(self): """An env with only pure components yields empty allowlist.""" env = make_env( '(defcomp ~a (&key) (div "hello"))', '(defcomp ~b (&key) (span "world"))', ) _compute_all_io_refs_fallback(env, IO_NAMES) allowed: set[str] = set() for val in env.values(): if isinstance(val, Component) and val.io_refs: allowed.update(val.io_refs) assert allowed == set() # --------------------------------------------------------------------------- # SxExpr serialization through IO proxy pipeline # --------------------------------------------------------------------------- class TestSxExprIoRoundtrip: """SxExpr (from highlight etc.) must survive serialize → parse.""" def test_sxexpr_serializes_unquoted(self): """SxExpr is emitted as raw SX source, not as a quoted string.""" expr = SxExpr('(span :class "text-red-500" "hello")') sx = serialize(expr) assert sx == '(span :class "text-red-500" "hello")' assert not sx.startswith('"') def test_sxexpr_roundtrip(self): """SxExpr → serialize → parse → yields an AST list.""" expr = SxExpr('(span :class "text-violet-600" "keyword")') sx = serialize(expr) parsed = parse_all(sx) assert len(parsed) == 1 # Should be a list: [Symbol("span"), Keyword("class"), "text-violet-600", "keyword"] node = parsed[0] assert isinstance(node, list) assert isinstance(node[0], Symbol) assert node[0].name == "span" def test_fragment_sxexpr_roundtrip(self): """Fragment SxExpr with multiple children.""" expr = SxExpr( '(<> (span :class "text-red-500" "if") ' '(span " ") ' '(span :class "text-green-500" "true"))' ) sx = serialize(expr) parsed = parse_all(sx) assert len(parsed) == 1 node = parsed[0] assert isinstance(node, list) assert node[0].name == "<>" def test_nil_serializes_as_nil(self): """None result from IO proxy serializes as 'nil'.""" sx = serialize(None) assert sx == "nil" parsed = parse_all(sx) assert parsed[0] is NIL or parsed[0] is None def test_sxexpr_in_dict_value(self): """SxExpr as a dict value serializes inline (not quoted).""" expr = SxExpr('(span "hello")') data = {"code": expr} sx = serialize(data) # Should be {:code (span "hello")} not {:code "(span \"hello\")"} assert '(span "hello")' in sx parsed = parse_all(sx) d = parsed[0] # The value should be a list (AST), not a string assert isinstance(d["code"], list) # --------------------------------------------------------------------------- # IO proxy arg parsing (GET query string vs POST JSON body) # --------------------------------------------------------------------------- class TestIoProxyArgParsing: """Test the arg extraction logic used by the IO proxy.""" def test_get_args_from_query_string(self): """GET: _arg0, _arg1, ... become positional args.""" query = {"_arg0": "(defcomp ~card ...)", "_arg1": "lisp"} args = [] kwargs = {} for k, v in query.items(): if k.startswith("_arg"): args.append(v) else: kwargs[k] = v assert args == ["(defcomp ~card ...)", "lisp"] assert kwargs == {} def test_get_kwargs_from_query_string(self): """GET: non-_arg keys become kwargs.""" query = {"_arg0": "code", "language": "python"} args = [] kwargs = {} for k, v in query.items(): if k.startswith("_arg"): args.append(v) else: kwargs[k] = v assert args == ["code"] assert kwargs == {"language": "python"} def test_post_json_body(self): """POST: args and kwargs from JSON body.""" body = {"args": ["(defcomp ~card ...)", "lisp"], "kwargs": {}} args = body.get("args", []) kwargs = body.get("kwargs", {}) assert args == ["(defcomp ~card ...)", "lisp"] assert kwargs == {} # --------------------------------------------------------------------------- # IO-aware client routing logic (orchestration.sx) # --------------------------------------------------------------------------- class TestIoRoutingLogic: """Test the orchestration.sx routing decisions for IO pages. Uses the SX evaluator to run the actual routing logic. """ def _eval(self, src, env): from shared.sx.evaluator import _eval, _trampoline result = None for expr in parse_all(src): result = _trampoline(_eval(expr, env)) return result def test_io_deps_list_truthiness(self): """A non-empty io-deps list is truthy, empty is falsy.""" env = make_env() # Non-empty list — (and io-deps (not (empty? io-deps))) is truthy result = self._eval( '(let ((io-deps (list "highlight")))' ' (if (and io-deps (not (empty? io-deps))) true false))', env, ) assert result is True # Empty list — (and io-deps (not (empty? io-deps))) is falsy # (and short-circuits: empty list is falsy, returns []) result = self._eval( '(let ((io-deps (list)))' ' (if (and io-deps (not (empty? io-deps))) true false))', env, ) assert result is False def test_io_deps_from_parsed_page_entry(self): """io-deps field round-trips through serialize → parse correctly.""" entry_sx = '{:name "test" :io-deps ("highlight" "config")}' parsed = parse_all(entry_sx) entry = parsed[0] env = make_env() env["entry"] = entry io_deps = self._eval('(get entry "io-deps")', env) assert io_deps == ["highlight", "config"] has_io = self._eval( '(let ((d (get entry "io-deps")))' ' (and d (not (empty? d))))', env, ) assert has_io is True def test_empty_io_deps_from_parsed_page_entry(self): """Empty io-deps list means page is pure.""" entry_sx = '{:name "test" :io-deps ()}' parsed = parse_all(entry_sx) entry = parsed[0] env = make_env() env["entry"] = entry has_io = self._eval( '(let ((d (get entry "io-deps")))' ' (if (and d (not (empty? d))) true false))', env, ) assert has_io is False # --------------------------------------------------------------------------- # Cache key determinism for IO proxy # --------------------------------------------------------------------------- class TestIoCacheKey: """The client-side IO cache keys by name + args. Verify determinism.""" def test_same_args_same_key(self): """Identical calls produce identical cache keys.""" def make_key(name, args, kwargs=None): key = name for a in args: key += "\0" + str(a) if kwargs: for k, v in sorted(kwargs.items()): key += "\0" + k + "=" + str(v) return key k1 = make_key("highlight", ["(div 1)", "lisp"]) k2 = make_key("highlight", ["(div 1)", "lisp"]) assert k1 == k2 def test_different_args_different_key(self): def make_key(name, args): key = name for a in args: key += "\0" + str(a) return key k1 = make_key("highlight", ["(div 1)", "lisp"]) k2 = make_key("highlight", ["(div 2)", "lisp"]) assert k1 != k2 def test_different_name_different_key(self): def make_key(name, args): key = name for a in args: key += "\0" + str(a) return key k1 = make_key("highlight", ["code"]) k2 = make_key("config", ["code"]) assert k1 != k2