Previously these mutating operations were internal helpers in the JS bootstrapper but not declared in primitives.sx or registered in the Python evaluator. Now properly specced and available in both hosts. Removes mock injections from cache tests — they use real primitives. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
424 lines
14 KiB
Python
424 lines
14 KiB
Python
"""Tests for Phase 4 page data pipeline.
|
|
|
|
Tests the serialize→parse roundtrip for data dicts (SX wire format),
|
|
the kebab-case key conversion, component dep computation for
|
|
:data pages, and the client data cache logic.
|
|
"""
|
|
|
|
import pytest
|
|
from shared.sx.parser import parse, parse_all, serialize
|
|
from shared.sx.types import Symbol, Keyword, NIL
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SX wire format roundtrip — data dicts
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDataSerializeRoundtrip:
|
|
"""Data dicts must survive serialize → parse as SX wire format."""
|
|
|
|
def test_simple_dict(self):
|
|
data = {"name": "hello", "count": 42}
|
|
sx = serialize(data)
|
|
parsed = parse_all(sx)
|
|
assert len(parsed) == 1
|
|
d = parsed[0]
|
|
assert d["name"] == "hello"
|
|
assert d["count"] == 42
|
|
|
|
def test_nested_list(self):
|
|
data = {"items": [1, 2, 3]}
|
|
sx = serialize(data)
|
|
parsed = parse_all(sx)
|
|
d = parsed[0]
|
|
assert d["items"] == [1, 2, 3]
|
|
|
|
def test_nested_dict(self):
|
|
data = {"user": {"name": "alice", "active": True}}
|
|
sx = serialize(data)
|
|
parsed = parse_all(sx)
|
|
d = parsed[0]
|
|
assert d["user"]["name"] == "alice"
|
|
assert d["user"]["active"] is True
|
|
|
|
def test_nil_value(self):
|
|
data = {"value": None}
|
|
sx = serialize(data)
|
|
parsed = parse_all(sx)
|
|
d = parsed[0]
|
|
assert d["value"] is NIL or d["value"] is None
|
|
|
|
def test_boolean_values(self):
|
|
data = {"yes": True, "no": False}
|
|
sx = serialize(data)
|
|
parsed = parse_all(sx)
|
|
d = parsed[0]
|
|
assert d["yes"] is True
|
|
assert d["no"] is False
|
|
|
|
def test_string_with_special_chars(self):
|
|
data = {"msg": 'He said "hello"\nNew line'}
|
|
sx = serialize(data)
|
|
parsed = parse_all(sx)
|
|
d = parsed[0]
|
|
assert d["msg"] == 'He said "hello"\nNew line'
|
|
|
|
def test_empty_dict(self):
|
|
data = {}
|
|
sx = serialize(data)
|
|
parsed = parse_all(sx)
|
|
d = parsed[0]
|
|
assert d == {}
|
|
|
|
def test_list_of_dicts(self):
|
|
"""Data helpers often return lists of dicts (e.g. items)."""
|
|
data = {"items": [
|
|
{"label": "A", "value": 1},
|
|
{"label": "B", "value": 2},
|
|
]}
|
|
sx = serialize(data)
|
|
parsed = parse_all(sx)
|
|
d = parsed[0]
|
|
items = d["items"]
|
|
assert len(items) == 2
|
|
assert items[0]["label"] == "A"
|
|
assert items[1]["value"] == 2
|
|
|
|
def test_float_values(self):
|
|
data = {"pi": 3.14, "neg": -0.5}
|
|
sx = serialize(data)
|
|
parsed = parse_all(sx)
|
|
d = parsed[0]
|
|
assert d["pi"] == 3.14
|
|
assert d["neg"] == -0.5
|
|
|
|
def test_empty_string(self):
|
|
data = {"empty": ""}
|
|
sx = serialize(data)
|
|
parsed = parse_all(sx)
|
|
d = parsed[0]
|
|
assert d["empty"] == ""
|
|
|
|
def test_empty_list(self):
|
|
data = {"items": []}
|
|
sx = serialize(data)
|
|
parsed = parse_all(sx)
|
|
d = parsed[0]
|
|
assert d["items"] == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Kebab-case key conversion
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestKebabCaseKeys:
|
|
"""evaluate_page_data converts underscore keys to kebab-case."""
|
|
|
|
def _kebab(self, d):
|
|
"""Same logic as evaluate_page_data."""
|
|
return {k.replace("_", "-"): v for k, v in d.items()}
|
|
|
|
def test_underscores_to_kebab(self):
|
|
d = {"total_count": 5, "is_active": True}
|
|
result = self._kebab(d)
|
|
assert "total-count" in result
|
|
assert "is-active" in result
|
|
assert result["total-count"] == 5
|
|
|
|
def test_no_underscores_unchanged(self):
|
|
d = {"name": "hello", "count": 3}
|
|
result = self._kebab(d)
|
|
assert result == d
|
|
|
|
def test_already_kebab_unchanged(self):
|
|
d = {"my-key": "val"}
|
|
result = self._kebab(d)
|
|
assert result == {"my-key": "val"}
|
|
|
|
def test_kebab_then_serialize_roundtrip(self):
|
|
"""Full pipeline: kebab-case → serialize → parse."""
|
|
data = {"total_count": 5, "page_title": "Test"}
|
|
kebab = self._kebab(data)
|
|
sx = serialize(kebab)
|
|
parsed = parse_all(sx)
|
|
d = parsed[0]
|
|
assert d["total-count"] == 5
|
|
assert d["page-title"] == "Test"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Component deps for :data pages
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDataPageDeps:
|
|
"""_build_pages_sx should compute deps for :data pages too."""
|
|
|
|
def test_deps_computed_for_data_page(self):
|
|
from shared.sx.deps import components_needed
|
|
from shared.sx.parser import parse_all as pa
|
|
from shared.sx.evaluator import _eval, _trampoline
|
|
|
|
# Define a component
|
|
env = {}
|
|
for expr in pa('(defcomp ~card (&key title) (div title))'):
|
|
_trampoline(_eval(expr, env))
|
|
|
|
# Content that uses ~card — this is what a :data page's content looks like
|
|
content_src = '(~card :title page-title)'
|
|
|
|
deps = components_needed(content_src, env)
|
|
assert "~card" in deps
|
|
|
|
def test_deps_transitive_for_data_page(self):
|
|
from shared.sx.deps import components_needed
|
|
from shared.sx.parser import parse_all as pa
|
|
from shared.sx.evaluator import _eval, _trampoline
|
|
|
|
env = {}
|
|
source = """
|
|
(defcomp ~inner (&key text) (span text))
|
|
(defcomp ~outer (&key title) (div (~inner :text title)))
|
|
"""
|
|
for expr in pa(source):
|
|
_trampoline(_eval(expr, env))
|
|
|
|
content_src = '(~outer :title page-title)'
|
|
|
|
deps = components_needed(content_src, env)
|
|
assert "~outer" in deps
|
|
assert "~inner" in deps
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Full data pipeline simulation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDataPipelineSimulation:
|
|
"""Simulate the full data page pipeline without Quart context.
|
|
|
|
Server: data_helper() → dict → kebab-case → serialize → SX text
|
|
Client: SX text → parse → dict → merge into env → eval content
|
|
|
|
Note: uses str/list ops instead of HTML tags since the bare evaluator
|
|
doesn't have the HTML tag registry. The real client uses renderToDom.
|
|
"""
|
|
|
|
def test_full_pipeline(self):
|
|
from shared.sx.parser import parse_all as pa
|
|
from shared.sx.evaluator import _eval, _trampoline
|
|
|
|
# 1. Define a component that uses only pure primitives
|
|
env = {}
|
|
for expr in pa('(defcomp ~greeting (&key name time) (str "Hello " name " at " time))'):
|
|
_trampoline(_eval(expr, env))
|
|
|
|
# 2. Server: data helper returns a dict
|
|
data_result = {"user_name": "Alice", "server_time": "12:00"}
|
|
|
|
# 3. Server: kebab-case + serialize
|
|
kebab = {k.replace("_", "-"): v for k, v in data_result.items()}
|
|
sx_wire = serialize(kebab)
|
|
|
|
# 4. Client: parse SX wire format
|
|
parsed = pa(sx_wire)
|
|
assert len(parsed) == 1
|
|
data_dict = parsed[0]
|
|
|
|
# 5. Client: merge data into env
|
|
env.update(data_dict)
|
|
|
|
# 6. Client: eval content expression
|
|
content_src = '(~greeting :name user-name :time server-time)'
|
|
for expr in pa(content_src):
|
|
result = _trampoline(_eval(expr, env))
|
|
|
|
assert result == "Hello Alice at 12:00"
|
|
|
|
def test_pipeline_with_list_data(self):
|
|
from shared.sx.parser import parse_all as pa
|
|
from shared.sx.evaluator import _eval, _trampoline
|
|
|
|
env = {}
|
|
for expr in pa('''
|
|
(defcomp ~item-list (&key items)
|
|
(map (fn (item) (get item "label")) items))
|
|
'''):
|
|
_trampoline(_eval(expr, env))
|
|
|
|
# Server data
|
|
data_result = {"items": [{"label": "One"}, {"label": "Two"}]}
|
|
sx_wire = serialize(data_result)
|
|
|
|
# Client parse + merge + eval
|
|
data_dict = pa(sx_wire)[0]
|
|
env.update(data_dict)
|
|
|
|
result = None
|
|
for expr in pa('(~item-list :items items)'):
|
|
result = _trampoline(_eval(expr, env))
|
|
|
|
assert result == ["One", "Two"]
|
|
|
|
def test_pipeline_data_isolation(self):
|
|
"""Different data for the same content produces different results."""
|
|
from shared.sx.parser import parse_all as pa
|
|
from shared.sx.evaluator import _eval, _trampoline
|
|
|
|
env = {}
|
|
for expr in pa('(defcomp ~page (&key title count) (str title ": " count))'):
|
|
_trampoline(_eval(expr, env))
|
|
|
|
# Two different data payloads
|
|
for title, count, expected in [
|
|
("Posts", 42, "Posts: 42"),
|
|
("Users", 7, "Users: 7"),
|
|
]:
|
|
data = {"title": title, "count": count}
|
|
sx_wire = serialize(data)
|
|
data_dict = pa(sx_wire)[0]
|
|
|
|
page_env = dict(env)
|
|
page_env.update(data_dict)
|
|
|
|
for expr in pa('(~page :title title :count count)'):
|
|
result = _trampoline(_eval(expr, page_env))
|
|
assert result == expected
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Client data cache
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDataCache:
|
|
"""Test the page data cache logic from orchestration.sx.
|
|
|
|
The cache functions are pure SX evaluated with a mock now-ms primitive.
|
|
"""
|
|
|
|
def _make_env(self, current_time_ms=1000):
|
|
"""Create an env with cache functions and a controllable now-ms."""
|
|
from shared.sx.parser import parse_all as pa
|
|
from shared.sx.evaluator import _eval, _trampoline
|
|
|
|
env = {}
|
|
# Mock now-ms as a callable that returns current_time_ms
|
|
self._time = current_time_ms
|
|
env["now-ms"] = lambda: self._time
|
|
|
|
# Define the cache functions from orchestration.sx
|
|
cache_src = """
|
|
(define _page-data-cache (dict))
|
|
(define _page-data-cache-ttl 30000)
|
|
|
|
(define page-data-cache-key
|
|
(fn (page-name params)
|
|
(let ((base page-name))
|
|
(if (or (nil? params) (empty? (keys params)))
|
|
base
|
|
(let ((parts (list)))
|
|
(for-each
|
|
(fn (k)
|
|
(append! parts (str k "=" (get params k))))
|
|
(keys params))
|
|
(str base ":" (join "&" parts)))))))
|
|
|
|
(define page-data-cache-get
|
|
(fn (cache-key)
|
|
(let ((entry (get _page-data-cache cache-key)))
|
|
(if (nil? entry)
|
|
nil
|
|
(if (> (- (now-ms) (get entry "ts")) _page-data-cache-ttl)
|
|
(do
|
|
(dict-set! _page-data-cache cache-key nil)
|
|
nil)
|
|
(get entry "data"))))))
|
|
|
|
(define page-data-cache-set
|
|
(fn (cache-key data)
|
|
(dict-set! _page-data-cache cache-key
|
|
{"data" data "ts" (now-ms)})))
|
|
"""
|
|
for expr in pa(cache_src):
|
|
_trampoline(_eval(expr, env))
|
|
return env
|
|
|
|
def _eval(self, src, env):
|
|
from shared.sx.parser import parse_all as pa
|
|
from shared.sx.evaluator import _eval, _trampoline
|
|
result = None
|
|
for expr in pa(src):
|
|
result = _trampoline(_eval(expr, env))
|
|
return result
|
|
|
|
def test_cache_key_no_params(self):
|
|
env = self._make_env()
|
|
result = self._eval('(page-data-cache-key "data-test" {})', env)
|
|
assert result == "data-test"
|
|
|
|
def test_cache_key_with_params(self):
|
|
env = self._make_env()
|
|
result = self._eval('(page-data-cache-key "reference" {"slug" "div"})', env)
|
|
assert result == "reference:slug=div"
|
|
|
|
def test_cache_key_nil_params(self):
|
|
env = self._make_env()
|
|
result = self._eval('(page-data-cache-key "data-test" nil)', env)
|
|
assert result == "data-test"
|
|
|
|
def test_cache_miss_returns_nil(self):
|
|
env = self._make_env()
|
|
result = self._eval('(page-data-cache-get "nonexistent")', env)
|
|
assert result is NIL or result is None
|
|
|
|
def test_cache_set_then_get(self):
|
|
env = self._make_env(current_time_ms=1000)
|
|
self._eval('(page-data-cache-set "test-page" {"title" "Hello"})', env)
|
|
result = self._eval('(page-data-cache-get "test-page")', env)
|
|
assert result["title"] == "Hello"
|
|
|
|
def test_cache_hit_within_ttl(self):
|
|
env = self._make_env(current_time_ms=1000)
|
|
self._eval('(page-data-cache-set "test-page" {"val" 42})', env)
|
|
# Advance time by 10 seconds (within 30s TTL)
|
|
self._time = 11000
|
|
result = self._eval('(page-data-cache-get "test-page")', env)
|
|
assert result["val"] == 42
|
|
|
|
def test_cache_expired_returns_nil(self):
|
|
env = self._make_env(current_time_ms=1000)
|
|
self._eval('(page-data-cache-set "test-page" {"val" 42})', env)
|
|
# Advance time by 31 seconds (past 30s TTL)
|
|
self._time = 32000
|
|
result = self._eval('(page-data-cache-get "test-page")', env)
|
|
assert result is NIL or result is None
|
|
|
|
def test_cache_overwrite(self):
|
|
env = self._make_env(current_time_ms=1000)
|
|
self._eval('(page-data-cache-set "p" {"v" 1})', env)
|
|
self._time = 2000
|
|
self._eval('(page-data-cache-set "p" {"v" 2})', env)
|
|
result = self._eval('(page-data-cache-get "p")', env)
|
|
assert result["v"] == 2
|
|
|
|
def test_cache_different_keys_independent(self):
|
|
env = self._make_env(current_time_ms=1000)
|
|
self._eval('(page-data-cache-set "a" {"x" 1})', env)
|
|
self._eval('(page-data-cache-set "b" {"x" 2})', env)
|
|
a = self._eval('(page-data-cache-get "a")', env)
|
|
b = self._eval('(page-data-cache-get "b")', env)
|
|
assert a["x"] == 1
|
|
assert b["x"] == 2
|
|
|
|
def test_cache_complex_data(self):
|
|
"""Cache preserves nested dicts and lists."""
|
|
env = self._make_env(current_time_ms=1000)
|
|
self._eval("""
|
|
(page-data-cache-set "complex"
|
|
{"items" (list {"label" "A"} {"label" "B"})
|
|
"count" 2})
|
|
""", env)
|
|
result = self._eval('(page-data-cache-get "complex")', env)
|
|
assert result["count"] == 2
|
|
assert len(result["items"]) == 2
|
|
assert result["items"][0]["label"] == "A"
|