"""Tests for Phase 4 page data pipeline.

Tests the serialize→parse roundtrip for data dicts (SX wire format),
the kebab-case key conversion, component dep computation for
:data pages, and the client data cache logic.
"""

import pytest
|
|
from shared.sx.parser import parse, parse_all, serialize
|
|
from shared.sx.types import Symbol, Keyword, NIL
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# SX wire format roundtrip — data dicts
# ---------------------------------------------------------------------------

class TestDataSerializeRoundtrip:
    """Data dicts must survive serialize → parse as SX wire format."""

    @staticmethod
    def _roundtrip(data):
        """Serialize *data* to SX wire text and parse it back; returns all forms."""
        return parse_all(serialize(data))

    def test_simple_dict(self):
        forms = self._roundtrip({"name": "hello", "count": 42})
        assert len(forms) == 1
        restored = forms[0]
        assert restored["name"] == "hello"
        assert restored["count"] == 42

    def test_nested_list(self):
        restored = self._roundtrip({"items": [1, 2, 3]})[0]
        assert restored["items"] == [1, 2, 3]

    def test_nested_dict(self):
        restored = self._roundtrip({"user": {"name": "alice", "active": True}})[0]
        user = restored["user"]
        assert user["name"] == "alice"
        assert user["active"] is True

    def test_nil_value(self):
        restored = self._roundtrip({"value": None})[0]
        # Either the sentinel NIL or Python None is an acceptable decoding.
        assert restored["value"] is NIL or restored["value"] is None

    def test_boolean_values(self):
        restored = self._roundtrip({"yes": True, "no": False})[0]
        assert restored["yes"] is True
        assert restored["no"] is False

    def test_string_with_special_chars(self):
        text = 'He said "hello"\nNew line'
        restored = self._roundtrip({"msg": text})[0]
        assert restored["msg"] == text

    def test_empty_dict(self):
        assert self._roundtrip({})[0] == {}

    def test_list_of_dicts(self):
        """Data helpers often return lists of dicts (e.g. items)."""
        restored = self._roundtrip({"items": [
            {"label": "A", "value": 1},
            {"label": "B", "value": 2},
        ]})[0]
        entries = restored["items"]
        assert len(entries) == 2
        assert entries[0]["label"] == "A"
        assert entries[1]["value"] == 2

    def test_float_values(self):
        restored = self._roundtrip({"pi": 3.14, "neg": -0.5})[0]
        assert restored["pi"] == 3.14
        assert restored["neg"] == -0.5

    def test_empty_string(self):
        restored = self._roundtrip({"empty": ""})[0]
        assert restored["empty"] == ""

    def test_empty_list(self):
        restored = self._roundtrip({"items": []})[0]
        assert restored["items"] == []
|
|
|
|
|
# ---------------------------------------------------------------------------
# Kebab-case key conversion
# ---------------------------------------------------------------------------

class TestKebabCaseKeys:
    """evaluate_page_data converts underscore keys to kebab-case."""

    def _kebab(self, d):
        """Same logic as evaluate_page_data."""
        converted = {}
        for key, value in d.items():
            converted[key.replace("_", "-")] = value
        return converted

    def test_underscores_to_kebab(self):
        converted = self._kebab({"total_count": 5, "is_active": True})
        assert "total-count" in converted
        assert "is-active" in converted
        assert converted["total-count"] == 5

    def test_no_underscores_unchanged(self):
        original = {"name": "hello", "count": 3}
        assert self._kebab(original) == original

    def test_already_kebab_unchanged(self):
        assert self._kebab({"my-key": "val"}) == {"my-key": "val"}

    def test_kebab_then_serialize_roundtrip(self):
        """Full pipeline: kebab-case → serialize → parse."""
        wire = serialize(self._kebab({"total_count": 5, "page_title": "Test"}))
        restored = parse_all(wire)[0]
        assert restored["total-count"] == 5
        assert restored["page-title"] == "Test"
|
|
|
|
|
# ---------------------------------------------------------------------------
# Component deps for :data pages
# ---------------------------------------------------------------------------

class TestDataPageDeps:
    """_build_pages_sx should compute deps for :data pages too."""

    def test_deps_computed_for_data_page(self):
        from shared.sx.deps import components_needed
        from shared.sx.parser import parse_all as pa
        from shared.sx.evaluator import _eval, _trampoline

        # Define a component in a fresh environment.
        component_env = {}
        for form in pa('(defcomp ~card (&key title) (div title))'):
            _trampoline(_eval(form, component_env))

        # Content that uses ~card — this is what a :data page's content looks like.
        needed = components_needed('(~card :title page-title)', component_env)
        assert "~card" in needed

    def test_deps_transitive_for_data_page(self):
        from shared.sx.deps import components_needed
        from shared.sx.parser import parse_all as pa
        from shared.sx.evaluator import _eval, _trampoline

        # ~outer depends on ~inner; both must appear in the computed deps.
        component_env = {}
        defs = """
        (defcomp ~inner (&key text) (span text))
        (defcomp ~outer (&key title) (div (~inner :text title)))
        """
        for form in pa(defs):
            _trampoline(_eval(form, component_env))

        needed = components_needed('(~outer :title page-title)', component_env)
        assert "~outer" in needed
        assert "~inner" in needed
|
|
|
|
|
# ---------------------------------------------------------------------------
# Full data pipeline simulation
# ---------------------------------------------------------------------------

class TestDataPipelineSimulation:
    """Simulate the full data page pipeline without Quart context.

    Server: data_helper() → dict → kebab-case → serialize → SX text
    Client: SX text → parse → dict → merge into env → eval content

    Note: uses str/list ops instead of HTML tags since the bare evaluator
    doesn't have the HTML tag registry. The real client uses renderToDom.
    """

    def test_full_pipeline(self):
        from shared.sx.parser import parse_all as pa
        from shared.sx.evaluator import _eval, _trampoline

        # 1. Define a component that uses only pure primitives.
        env = {}
        for form in pa('(defcomp ~greeting (&key name time) (str "Hello " name " at " time))'):
            _trampoline(_eval(form, env))

        # 2-3. Server: data helper returns a dict; kebab-case + serialize.
        helper_result = {"user_name": "Alice", "server_time": "12:00"}
        wire = serialize({k.replace("_", "-"): v for k, v in helper_result.items()})

        # 4. Client: parse SX wire format.
        forms = pa(wire)
        assert len(forms) == 1

        # 5. Client: merge data into env.
        env.update(forms[0])

        # 6. Client: eval content expression.
        rendered = None
        for form in pa('(~greeting :name user-name :time server-time)'):
            rendered = _trampoline(_eval(form, env))

        assert rendered == "Hello Alice at 12:00"

    def test_pipeline_with_list_data(self):
        from shared.sx.parser import parse_all as pa
        from shared.sx.evaluator import _eval, _trampoline

        env = {}
        component_src = '''
        (defcomp ~item-list (&key items)
          (map (fn (item) (get item "label")) items))
        '''
        for form in pa(component_src):
            _trampoline(_eval(form, env))

        # Server data → wire format, then client parse + merge + eval.
        wire = serialize({"items": [{"label": "One"}, {"label": "Two"}]})
        env.update(pa(wire)[0])

        rendered = None
        for form in pa('(~item-list :items items)'):
            rendered = _trampoline(_eval(form, env))

        assert rendered == ["One", "Two"]

    def test_pipeline_data_isolation(self):
        """Different data for the same content produces different results."""
        from shared.sx.parser import parse_all as pa
        from shared.sx.evaluator import _eval, _trampoline

        env = {}
        for form in pa('(defcomp ~page (&key title count) (str title ": " count))'):
            _trampoline(_eval(form, env))

        # Two different data payloads evaluated against copies of the same env.
        cases = [
            ("Posts", 42, "Posts: 42"),
            ("Users", 7, "Users: 7"),
        ]
        for title, count, expected in cases:
            wire = serialize({"title": title, "count": count})
            page_env = dict(env)
            page_env.update(pa(wire)[0])

            for form in pa('(~page :title title :count count)'):
                rendered = _trampoline(_eval(form, page_env))
            assert rendered == expected
|
|
|
|
|
# ---------------------------------------------------------------------------
# Client data cache
# ---------------------------------------------------------------------------

class TestDataCache:
    """Test the page data cache logic from orchestration.sx.

    The cache functions are pure SX evaluated with a mock now-ms primitive.
    """

    def _make_env(self, current_time_ms=1000):
        """Create an env with cache functions and a controllable now-ms."""
        from shared.sx.parser import parse_all as pa
        from shared.sx.evaluator import _eval, _trampoline

        env = {}
        # Mock now-ms as a callable returning the controllable clock value;
        # tests advance time by mutating self._time directly.
        self._time = current_time_ms
        env["now-ms"] = lambda: self._time

        # Mutating primitives needed by cache (available in JS, not bare Python).
        def store(mapping, key, value):
            mapping[key] = value
            return value

        def push(seq, item):
            seq.append(item)
            return seq

        env["dict-set!"] = store
        env["append!"] = push

        # Define the cache functions from orchestration.sx.
        cache_src = """
        (define _page-data-cache (dict))
        (define _page-data-cache-ttl 30000)

        (define page-data-cache-key
          (fn (page-name params)
            (let ((base page-name))
              (if (or (nil? params) (empty? (keys params)))
                  base
                  (let ((parts (list)))
                    (for-each
                      (fn (k)
                        (append! parts (str k "=" (get params k))))
                      (keys params))
                    (str base ":" (join "&" parts)))))))

        (define page-data-cache-get
          (fn (cache-key)
            (let ((entry (get _page-data-cache cache-key)))
              (if (nil? entry)
                  nil
                  (if (> (- (now-ms) (get entry "ts")) _page-data-cache-ttl)
                      (do
                        (dict-set! _page-data-cache cache-key nil)
                        nil)
                      (get entry "data"))))))

        (define page-data-cache-set
          (fn (cache-key data)
            (dict-set! _page-data-cache cache-key
              {"data" data "ts" (now-ms)})))
        """
        for form in pa(cache_src):
            _trampoline(_eval(form, env))
        return env

    def _eval(self, src, env):
        """Evaluate SX source in *env*; return the last form's value."""
        from shared.sx.parser import parse_all as pa
        from shared.sx.evaluator import _eval, _trampoline

        result = None
        for form in pa(src):
            result = _trampoline(_eval(form, env))
        return result

    def test_cache_key_no_params(self):
        env = self._make_env()
        assert self._eval('(page-data-cache-key "data-test" {})', env) == "data-test"

    def test_cache_key_with_params(self):
        env = self._make_env()
        key = self._eval('(page-data-cache-key "reference" {"slug" "div"})', env)
        assert key == "reference:slug=div"

    def test_cache_key_nil_params(self):
        env = self._make_env()
        assert self._eval('(page-data-cache-key "data-test" nil)', env) == "data-test"

    def test_cache_miss_returns_nil(self):
        env = self._make_env()
        missing = self._eval('(page-data-cache-get "nonexistent")', env)
        assert missing is NIL or missing is None

    def test_cache_set_then_get(self):
        env = self._make_env(current_time_ms=1000)
        self._eval('(page-data-cache-set "test-page" {"title" "Hello"})', env)
        cached = self._eval('(page-data-cache-get "test-page")', env)
        assert cached["title"] == "Hello"

    def test_cache_hit_within_ttl(self):
        env = self._make_env(current_time_ms=1000)
        self._eval('(page-data-cache-set "test-page" {"val" 42})', env)
        # Advance time by 10 seconds (within 30s TTL).
        self._time = 11000
        cached = self._eval('(page-data-cache-get "test-page")', env)
        assert cached["val"] == 42

    def test_cache_expired_returns_nil(self):
        env = self._make_env(current_time_ms=1000)
        self._eval('(page-data-cache-set "test-page" {"val" 42})', env)
        # Advance time by 31 seconds (past 30s TTL).
        self._time = 32000
        expired = self._eval('(page-data-cache-get "test-page")', env)
        assert expired is NIL or expired is None

    def test_cache_overwrite(self):
        env = self._make_env(current_time_ms=1000)
        self._eval('(page-data-cache-set "p" {"v" 1})', env)
        self._time = 2000
        self._eval('(page-data-cache-set "p" {"v" 2})', env)
        assert self._eval('(page-data-cache-get "p")', env)["v"] == 2

    def test_cache_different_keys_independent(self):
        env = self._make_env(current_time_ms=1000)
        self._eval('(page-data-cache-set "a" {"x" 1})', env)
        self._eval('(page-data-cache-set "b" {"x" 2})', env)
        first = self._eval('(page-data-cache-get "a")', env)
        second = self._eval('(page-data-cache-get "b")', env)
        assert first["x"] == 1
        assert second["x"] == 2

    def test_cache_complex_data(self):
        """Cache preserves nested dicts and lists."""
        env = self._make_env(current_time_ms=1000)
        self._eval("""
            (page-data-cache-set "complex"
              {"items" (list {"label" "A"} {"label" "B"})
               "count" 2})
        """, env)
        cached = self._eval('(page-data-cache-get "complex")', env)
        assert cached["count"] == 2
        assert len(cached["items"]) == 2
        assert cached["items"][0]["label"] == "A"