"""Tests for Phase 4 page data pipeline.

Tests the serialize→parse roundtrip for data dicts (SX wire format),
the kebab-case key conversion, component dep computation for :data pages,
and the client data cache logic.
"""

import pytest

from shared.sx.parser import parse, parse_all, serialize
from shared.sx.types import Symbol, Keyword, NIL


# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------

def _eval_sx(src, env):
    """Parse *src* and evaluate each expression in *env*, in order.

    Returns the value of the last expression, or ``None`` when *src*
    contains no expressions (so callers get an assertion failure rather
    than a ``NameError`` from an unbound loop variable).
    """
    # Imported lazily so that collecting this module does not require the
    # reference evaluator; only the tests that evaluate SX depend on it.
    from shared.sx.ref.sx_ref import eval_expr, trampoline

    result = None
    for expr in parse_all(src):
        result = trampoline(eval_expr(expr, env))
    return result


def _is_nil(value):
    """Return True when *value* is the SX ``NIL`` singleton or Python ``None``."""
    return value is NIL or value is None


def _roundtrip(data):
    """Serialize *data* to SX text, parse it back, return the first expression."""
    return parse_all(serialize(data))[0]


# ---------------------------------------------------------------------------
# SX wire format roundtrip — data dicts
# ---------------------------------------------------------------------------

class TestDataSerializeRoundtrip:
    """Data dicts must survive serialize → parse as SX wire format."""

    def test_simple_dict(self):
        """A flat dict parses back as exactly one expression with equal values."""
        data = {"name": "hello", "count": 42}
        parsed = parse_all(serialize(data))
        assert len(parsed) == 1
        d = parsed[0]
        assert d["name"] == "hello"
        assert d["count"] == 42

    def test_nested_list(self):
        """A list value nested in a dict roundtrips unchanged."""
        d = _roundtrip({"items": [1, 2, 3]})
        assert d["items"] == [1, 2, 3]

    def test_nested_dict(self):
        """A dict value nested in a dict roundtrips, booleans by identity."""
        d = _roundtrip({"user": {"name": "alice", "active": True}})
        assert d["user"]["name"] == "alice"
        assert d["user"]["active"] is True

    def test_nil_value(self):
        """``None`` comes back as either ``NIL`` or ``None``."""
        d = _roundtrip({"value": None})
        assert _is_nil(d["value"])

    def test_boolean_values(self):
        """``True`` and ``False`` roundtrip as the Python singletons."""
        d = _roundtrip({"yes": True, "no": False})
        assert d["yes"] is True
        assert d["no"] is False

    def test_string_with_special_chars(self):
        """Embedded double quotes and newlines survive the roundtrip."""
        d = _roundtrip({"msg": 'He said "hello"\nNew line'})
        assert d["msg"] == 'He said "hello"\nNew line'

    def test_empty_dict(self):
        """An empty dict roundtrips to an empty dict."""
        d = _roundtrip({})
        assert d == {}

    def test_list_of_dicts(self):
        """Data helpers often return lists of dicts (e.g. items)."""
        data = {"items": [
            {"label": "A", "value": 1},
            {"label": "B", "value": 2},
        ]}
        items = _roundtrip(data)["items"]
        assert len(items) == 2
        assert items[0]["label"] == "A"
        assert items[1]["value"] == 2

    def test_float_values(self):
        """Positive and negative floats roundtrip to equal values."""
        d = _roundtrip({"pi": 3.14, "neg": -0.5})
        assert d["pi"] == 3.14
        assert d["neg"] == -0.5

    def test_empty_string(self):
        """An empty string value roundtrips as an empty string."""
        d = _roundtrip({"empty": ""})
        assert d["empty"] == ""

    def test_empty_list(self):
        """An empty list value roundtrips as an empty list."""
        d = _roundtrip({"items": []})
        assert d["items"] == []


# ---------------------------------------------------------------------------
# Kebab-case key conversion
# ---------------------------------------------------------------------------

class TestKebabCaseKeys:
    """evaluate_page_data converts underscore keys to kebab-case."""

    def _kebab(self, d):
        """Return a copy of *d* with ``_`` replaced by ``-`` in every key.

        NOTE(review): intended to mirror evaluate_page_data, which is not
        imported here — confirm the two stay in sync.
        """
        return {k.replace("_", "-"): v for k, v in d.items()}

    def test_underscores_to_kebab(self):
        """Underscored keys are renamed and keep their values."""
        d = {"total_count": 5, "is_active": True}
        result = self._kebab(d)
        assert "total-count" in result
        assert "is-active" in result
        assert result["total-count"] == 5

    def test_no_underscores_unchanged(self):
        """Keys without underscores pass through untouched."""
        d = {"name": "hello", "count": 3}
        result = self._kebab(d)
        assert result == d

    def test_already_kebab_unchanged(self):
        """Keys that are already kebab-case pass through untouched."""
        d = {"my-key": "val"}
        result = self._kebab(d)
        assert result == {"my-key": "val"}

    def test_kebab_then_serialize_roundtrip(self):
        """Full pipeline: kebab-case → serialize → parse."""
        data = {"total_count": 5, "page_title": "Test"}
        d = _roundtrip(self._kebab(data))
        assert d["total-count"] == 5
        assert d["page-title"] == "Test"


# ---------------------------------------------------------------------------
# Component deps for :data pages
# ---------------------------------------------------------------------------

class TestDataPageDeps:
    """_build_pages_sx should compute deps for :data pages too."""

    def test_deps_computed_for_data_page(self):
        """A component referenced directly by the content is reported."""
        from shared.sx.deps import components_needed

        # Define a component
        env = {}
        _eval_sx('(defcomp ~card (&key title) (div title))', env)

        # Content that uses ~card — this is what a :data page's content looks like
        content_src = '(~card :title page-title)'
        deps = components_needed(content_src, env)
        assert "~card" in deps

    def test_deps_transitive_for_data_page(self):
        """A component used only inside another component is also reported."""
        from shared.sx.deps import components_needed

        env = {}
        source = (
            ' (defcomp ~inner (&key text) (span text))'
            ' (defcomp ~outer (&key title) (div (~inner :text title)))'
            ' '
        )
        _eval_sx(source, env)

        content_src = '(~outer :title page-title)'
        deps = components_needed(content_src, env)
        assert "~outer" in deps
        assert "~inner" in deps


# ---------------------------------------------------------------------------
# Full data pipeline simulation
# ---------------------------------------------------------------------------

class TestDataPipelineSimulation:
    """Simulate the full data page pipeline without Quart context.

    Server: data_helper() → dict → kebab-case → serialize → SX text
    Client: SX text → parse → dict → merge into env → eval content

    Note: uses str/list ops instead of HTML tags since the bare evaluator
    doesn't have the HTML tag registry. The real client uses renderToDom.
    """

    def test_full_pipeline(self):
        """Kebab-cased server data is visible to the content expression."""
        # 1. Define a component that uses only pure primitives
        env = {}
        _eval_sx(
            '(defcomp ~greeting (&key name time) (str "Hello " name " at " time))',
            env,
        )

        # 2. Server: data helper returns a dict
        data_result = {"user_name": "Alice", "server_time": "12:00"}

        # 3. Server: kebab-case + serialize
        kebab = {k.replace("_", "-"): v for k, v in data_result.items()}
        sx_wire = serialize(kebab)

        # 4. Client: parse SX wire format
        parsed = parse_all(sx_wire)
        assert len(parsed) == 1
        data_dict = parsed[0]

        # 5. Client: merge data into env
        env.update(data_dict)

        # 6. Client: eval content expression
        content_src = '(~greeting :name user-name :time server-time)'
        result = _eval_sx(content_src, env)
        assert result == "Hello Alice at 12:00"

    def test_pipeline_with_list_data(self):
        """A list of dicts from the server can be mapped over by a component."""
        env = {}
        _eval_sx(
            ' (defcomp ~item-list (&key items)'
            ' (map (fn (item) (get item "label")) items))'
            ' ',
            env,
        )

        # Server data
        data_result = {"items": [{"label": "One"}, {"label": "Two"}]}
        sx_wire = serialize(data_result)

        # Client parse + merge + eval
        data_dict = parse_all(sx_wire)[0]
        env.update(data_dict)

        result = _eval_sx('(~item-list :items items)', env)
        assert result == ["One", "Two"]

    def test_pipeline_data_isolation(self):
        """Different data for the same content produces different results."""
        env = {}
        _eval_sx('(defcomp ~page (&key title count) (str title ": " count))', env)

        # Two different data payloads
        for title, count, expected in [
            ("Posts", 42, "Posts: 42"),
            ("Users", 7, "Users: 7"),
        ]:
            data = {"title": title, "count": count}
            data_dict = parse_all(serialize(data))[0]

            # Copy the base env so one payload cannot leak into the next.
            page_env = dict(env)
            page_env.update(data_dict)

            result = _eval_sx('(~page :title title :count count)', page_env)
            assert result == expected


# ---------------------------------------------------------------------------
# Client data cache
# ---------------------------------------------------------------------------

class TestDataCache:
    """Test the page data cache logic from orchestration.sx.

    The cache functions are pure SX evaluated with a mock now-ms primitive.
    """

    def _make_env(self, current_time_ms=1000):
        """Create an env with cache functions and a controllable now-ms.

        The clock value is stored on ``self._time``; tests advance time by
        assigning to that attribute after calling this method.
        """
        env = {}

        # Mock now-ms as a callable that returns the current self._time
        # (read at call time, so later assignments are observed).
        self._time = current_time_ms
        env["now-ms"] = lambda: self._time

        # Define the cache functions from orchestration.sx
        cache_src = (
            ' (define _page-data-cache (dict))'
            ' (define _page-data-cache-ttl 30000)'
            ' (define page-data-cache-key'
            ' (fn (page-name params)'
            ' (let ((base page-name))'
            ' (if (or (nil? params) (empty? (keys params)))'
            ' base'
            ' (let ((parts (list)))'
            ' (for-each (fn (k) (append! parts (str k "=" (get params k))))'
            ' (keys params))'
            ' (str base ":" (join "&" parts)))))))'
            ' (define page-data-cache-get'
            ' (fn (cache-key)'
            ' (let ((entry (get _page-data-cache cache-key)))'
            ' (if (nil? entry)'
            ' nil'
            ' (if (> (- (now-ms) (get entry "ts")) _page-data-cache-ttl)'
            ' (do (dict-set! _page-data-cache cache-key nil) nil)'
            ' (get entry "data"))))))'
            ' (define page-data-cache-set'
            ' (fn (cache-key data)'
            ' (dict-set! _page-data-cache cache-key'
            ' {"data" data "ts" (now-ms)})))'
            ' '
        )
        _eval_sx(cache_src, env)
        return env

    def _eval(self, src, env):
        """Evaluate every expression in *src* against *env*; return the last value."""
        return _eval_sx(src, env)

    def test_cache_key_no_params(self):
        """An empty params dict yields the bare page name as key."""
        env = self._make_env()
        result = self._eval('(page-data-cache-key "data-test" {})', env)
        assert result == "data-test"

    def test_cache_key_with_params(self):
        """Params are appended to the page name as ``name:key=value``."""
        env = self._make_env()
        result = self._eval('(page-data-cache-key "reference" {"slug" "div"})', env)
        assert result == "reference:slug=div"

    def test_cache_key_nil_params(self):
        """Nil params yield the bare page name as key."""
        env = self._make_env()
        result = self._eval('(page-data-cache-key "data-test" nil)', env)
        assert result == "data-test"

    def test_cache_miss_returns_nil(self):
        """Looking up a key that was never set returns nil."""
        env = self._make_env()
        result = self._eval('(page-data-cache-get "nonexistent")', env)
        assert _is_nil(result)

    def test_cache_set_then_get(self):
        """A value stored under a key is returned by an immediate get."""
        env = self._make_env(current_time_ms=1000)
        self._eval('(page-data-cache-set "test-page" {"title" "Hello"})', env)
        result = self._eval('(page-data-cache-get "test-page")', env)
        assert result["title"] == "Hello"

    def test_cache_hit_within_ttl(self):
        """An entry is still returned before the TTL has elapsed."""
        env = self._make_env(current_time_ms=1000)
        self._eval('(page-data-cache-set "test-page" {"val" 42})', env)
        # Advance time by 10 seconds (within 30s TTL)
        self._time = 11000
        result = self._eval('(page-data-cache-get "test-page")', env)
        assert result["val"] == 42

    def test_cache_expired_returns_nil(self):
        """An entry older than the TTL is reported as a miss."""
        env = self._make_env(current_time_ms=1000)
        self._eval('(page-data-cache-set "test-page" {"val" 42})', env)
        # Advance time by 31 seconds (past 30s TTL)
        self._time = 32000
        result = self._eval('(page-data-cache-get "test-page")', env)
        assert _is_nil(result)

    def test_cache_overwrite(self):
        """Setting the same key again replaces the stored data."""
        env = self._make_env(current_time_ms=1000)
        self._eval('(page-data-cache-set "p" {"v" 1})', env)
        self._time = 2000
        self._eval('(page-data-cache-set "p" {"v" 2})', env)
        result = self._eval('(page-data-cache-get "p")', env)
        assert result["v"] == 2

    def test_cache_different_keys_independent(self):
        """Entries stored under different keys do not affect each other."""
        env = self._make_env(current_time_ms=1000)
        self._eval('(page-data-cache-set "a" {"x" 1})', env)
        self._eval('(page-data-cache-set "b" {"x" 2})', env)
        a = self._eval('(page-data-cache-get "a")', env)
        b = self._eval('(page-data-cache-get "b")', env)
        assert a["x"] == 1
        assert b["x"] == 2

    def test_cache_complex_data(self):
        """Cache preserves nested dicts and lists."""
        env = self._make_env(current_time_ms=1000)
        self._eval(
            ' (page-data-cache-set "complex"'
            ' {"items" (list {"label" "A"} {"label" "B"})'
            ' "count" 2})'
            ' ',
            env,
        )
        result = self._eval('(page-data-cache-get "complex")', env)
        assert result["count"] == 2
        assert len(result["items"]) == 2
        assert result["items"][0]["label"] == "A"