7c — Optimistic Data Updates: - orchestration.sx: optimistic-cache-update/revert/confirm + submit-mutation - pages.py: mount_action_endpoint at /sx/action/<name> for client mutations - optimistic-demo.sx: live demo with todo list, pending/confirmed/reverted states - helpers.py: demo data + add-demo-item action handler 7d — Offline Data Layer: - orchestration.sx: connectivity tracking, offline-queue-mutation, offline-sync, offline-aware-mutation (routes online→submit, offline→queue) - offline-demo.sx: live demo with notes, connectivity indicator, sync timeline - helpers.py: offline demo data Also updates plans.sx: marks Phase 7 fully complete (all 6 sub-phases 7a-7f). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
970 lines
34 KiB
Python
970 lines
34 KiB
Python
"""Page helper registration for sx docs.
|
|
|
|
All helpers return data values (dicts, lists) — no sx_call(), no SxExpr.
|
|
Markup composition lives entirely in .sx files.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
|
|
def _register_sx_helpers() -> None:
    """Register this module's helper callables for the ``"sx"`` service.

    A name -> callable table is assembled first and then handed to
    ``register_page_helpers`` in a single call.  The keys are the names the
    registry receives verbatim; the ``action:``-prefixed entry goes through
    the same table as the data helpers.
    """
    from shared.sx.pages import register_page_helpers
    from content.highlight import highlight as _highlight

    helper_table = {
        "highlight": _highlight,
        "primitives-data": _primitives_data,
        "special-forms-data": _special_forms_data,
        "reference-data": _reference_data,
        "attr-detail-data": _attr_detail_data,
        "header-detail-data": _header_detail_data,
        "event-detail-data": _event_detail_data,
        "read-spec-file": _read_spec_file,
        "bootstrapper-data": _bootstrapper_data,
        "bundle-analyzer-data": _bundle_analyzer_data,
        "routing-analyzer-data": _routing_analyzer_data,
        "data-test-data": _data_test_data,
        "run-spec-tests": _run_spec_tests,
        "run-modular-tests": _run_modular_tests,
        "streaming-demo-data": _streaming_demo_data,
        "affinity-demo-data": _affinity_demo_data,
        "optimistic-demo-data": _optimistic_demo_data,
        "action:add-demo-item": _add_demo_item,
        "offline-demo-data": _offline_demo_data,
    }
    register_page_helpers("sx", helper_table)
|
|
|
|
|
|
def _primitives_data() -> dict:
    """Hand back ``content.pages.PRIMITIVES`` unchanged for the primitives page.

    The import is deferred to call time, so ``content.pages`` is only loaded
    when the helper is actually invoked.
    """
    from content.pages import PRIMITIVES as primitives_table

    return primitives_table
|
|
|
|
|
|
def _special_forms_data() -> dict:
    """Parse special-forms.sx and return categorized form data.

    Every top-level ``(define-special-form <name> :key value ...)`` expression
    in the spec file is turned into one form dict and filed under a category
    looked up in ``category_map`` (forms not listed there go to "Other").

    Returns:
        A dict of category → list of form dicts, each with the keys
        ``name``, ``syntax``, ``doc``, ``tail-position`` and ``example``.
        Keyword arguments missing from a form default to ``""``.

    Raises:
        OSError: if special-forms.sx cannot be opened in either ref location.
    """
    import os
    from shared.sx.parser import parse_all, serialize
    from shared.sx.types import Symbol, Keyword

    ref_dir = os.path.join(os.path.dirname(__file__), "..", "..", "shared", "sx", "ref")
    if not os.path.isdir(ref_dir):
        # Fallback location used when the relative path does not exist.
        ref_dir = "/app/shared/sx/ref"
    spec_path = os.path.join(ref_dir, "special-forms.sx")
    # Explicit encoding: the other spec readers in this module use UTF-8, and
    # the platform default is locale dependent.
    with open(spec_path, encoding="utf-8") as f:
        exprs = parse_all(f.read())

    # Category → forms, in first-seen order of the categories.
    categories: dict[str, list[dict]] = {}

    # Map form names to categories
    category_map = {
        "if": "Control Flow", "when": "Control Flow", "cond": "Control Flow",
        "case": "Control Flow", "and": "Control Flow", "or": "Control Flow",
        "let": "Binding", "let*": "Binding", "letrec": "Binding",
        "define": "Binding", "set!": "Binding",
        "lambda": "Functions & Components", "fn": "Functions & Components",
        "defcomp": "Functions & Components", "defmacro": "Functions & Components",
        "begin": "Sequencing & Threading", "do": "Sequencing & Threading",
        "->": "Sequencing & Threading",
        "quote": "Quoting", "quasiquote": "Quoting",
        "reset": "Continuations", "shift": "Continuations",
        "dynamic-wind": "Guards",
        "map": "Higher-Order Forms", "map-indexed": "Higher-Order Forms",
        "filter": "Higher-Order Forms", "reduce": "Higher-Order Forms",
        "some": "Higher-Order Forms", "every?": "Higher-Order Forms",
        "for-each": "Higher-Order Forms",
        "defstyle": "Domain Definitions",
        "defhandler": "Domain Definitions", "defpage": "Domain Definitions",
        "defquery": "Domain Definitions", "defaction": "Domain Definitions",
    }

    for expr in exprs:
        if not isinstance(expr, list) or len(expr) < 2:
            continue
        head = expr[0]
        if not isinstance(head, Symbol) or head.name != "define-special-form":
            continue

        name = expr[1]
        # Extract keyword args: walk the tail, consuming (keyword, value)
        # pairs and skipping anything that is not a keyword.
        kwargs: dict[str, str] = {}
        i = 2
        while i < len(expr) - 1:
            if isinstance(expr[i], Keyword):
                key = expr[i].name
                val = expr[i + 1]
                if isinstance(val, list):
                    # For :syntax, avoid quote sugar (quasiquote → `x)
                    items = [serialize(item) for item in val]
                    kwargs[key] = "(" + " ".join(items) + ")"
                else:
                    kwargs[key] = str(val)
                i += 2
            else:
                i += 1

        category = category_map.get(name, "Other")
        categories.setdefault(category, []).append({
            "name": name,
            "syntax": kwargs.get("syntax", ""),
            "doc": kwargs.get("doc", ""),
            "tail-position": kwargs.get("tail-position", ""),
            "example": kwargs.get("example", ""),
        })

    return categories
|
|
|
|
|
|
def _reference_data(slug: str) -> dict:
    """Build the reference-table rows for one reference page.

    The returned dict's keys depend on ``slug``:

    - ``"attributes"``: ``req-attrs``, ``beh-attrs``, ``uniq-attrs``
    - ``"headers"``: ``req-headers``, ``resp-headers``
    - ``"events"``: ``events-list``
    - ``"js-api"``: ``js-api-list``

    Any other slug yields the same payload as ``"attributes"``.  Rows carry an
    ``href`` only when a matching detail entry exists (and, for attributes,
    when the attribute is flagged as existing); otherwise ``href`` is None.
    """
    from content.pages import (
        REQUEST_ATTRS, BEHAVIOR_ATTRS, SX_UNIQUE_ATTRS,
        REQUEST_HEADERS, RESPONSE_HEADERS,
        EVENTS, JS_API, ATTR_DETAILS, HEADER_DETAILS,
    )

    def attr_rows(table):
        rows = []
        for attr, desc, exists in table:
            href = f"/reference/attributes/{attr}" if exists and attr in ATTR_DETAILS else None
            rows.append({"name": attr, "desc": desc, "exists": exists, "href": href})
        return rows

    def header_rows(table):
        rows = []
        for header, value, desc in table:
            href = f"/reference/headers/{header}" if header in HEADER_DETAILS else None
            rows.append({"name": header, "value": value, "desc": desc, "href": href})
        return rows

    if slug == "headers":
        return {
            "req-headers": header_rows(REQUEST_HEADERS),
            "resp-headers": header_rows(RESPONSE_HEADERS),
        }

    if slug == "events":
        # Imported only on this path, exactly where it is needed.
        from content.pages import EVENT_DETAILS

        event_rows = []
        for event, desc in EVENTS:
            href = f"/reference/events/{event}" if event in EVENT_DETAILS else None
            event_rows.append({"name": event, "desc": desc, "href": href})
        return {"events-list": event_rows}

    if slug == "js-api":
        api_rows = []
        for entry, desc in JS_API:
            api_rows.append({"name": entry, "desc": desc})
        return {"js-api-list": api_rows}

    # "attributes" and every unrecognised slug share the attribute tables.
    return {
        "req-attrs": attr_rows(REQUEST_ATTRS),
        "beh-attrs": attr_rows(BEHAVIOR_ATTRS),
        "uniq-attrs": attr_rows(SX_UNIQUE_ATTRS),
    }
|
|
|
|
|
|
def _read_spec_file(filename: str) -> str:
    """Return the text of ``filename`` from the spec ref directory.

    The directory is resolved relative to this module and falls back to
    ``/app/shared/sx/ref`` when that path is not a directory.  A missing file
    is not an error: a placeholder SX comment is returned instead.  Other
    I/O errors propagate to the caller.
    """
    import os

    local_ref = os.path.join(os.path.dirname(__file__), "..", "..", "shared", "sx", "ref")
    ref_dir = local_ref if os.path.isdir(local_ref) else "/app/shared/sx/ref"

    try:
        handle = open(os.path.join(ref_dir, filename), encoding="utf-8")
    except FileNotFoundError:
        return ";; spec file not found"
    with handle:
        return handle.read()
|
|
|
|
|
|
def _bootstrapper_data(target: str) -> dict:
    """Collect a bootstrapper's own source plus the output it generates.

    Args:
        target: ``"javascript"`` or ``"python"``.

    Returns:
        For an unknown target, ``{"bootstrapper-not-found": True}``.
        Otherwise a dict with:

        - ``bootstrapper-not-found``: None
        - ``bootstrapper-source``: text of the bootstrap script, or a
          placeholder comment when the script file is missing
        - ``bootstrapped-output``: the compiler's output, or a comment
          carrying the error text if compilation raised
    """
    import os

    if target != "javascript" and target != "python":
        return {"bootstrapper-not-found": True}

    ref_dir = os.path.join(os.path.dirname(__file__), "..", "..", "shared", "sx", "ref")
    if not os.path.isdir(ref_dir):
        ref_dir = "/app/shared/sx/ref"

    is_js = target == "javascript"

    # Step 1: the bootstrap script's source text.
    script_name = "bootstrap_js.py" if is_js else "bootstrap_py.py"
    try:
        with open(os.path.join(ref_dir, script_name), encoding="utf-8") as script:
            bootstrapper_source = script.read()
    except FileNotFoundError:
        bootstrapper_source = "# bootstrapper source not found"

    # Step 2: run the matching compiler.  The import stays outside the
    # try so that an import failure still propagates; only errors raised
    # by the compile call itself are folded into the output.
    if is_js:
        from shared.sx.ref.bootstrap_js import compile_ref_to_js
        try:
            bootstrapped_output = compile_ref_to_js(
                adapters=["dom", "engine", "orchestration", "boot", "cssx"]
            )
        except Exception as err:
            bootstrapped_output = f"// bootstrap error: {err}"
    else:
        from shared.sx.ref.bootstrap_py import compile_ref_to_py
        try:
            bootstrapped_output = compile_ref_to_py()
        except Exception as err:
            bootstrapped_output = f"# bootstrap error: {err}"

    return {
        "bootstrapper-not-found": None,
        "bootstrapper-source": bootstrapper_source,
        "bootstrapped-output": bootstrapped_output,
    }
|
|
|
|
|
|
def _bundle_analyzer_data() -> dict:
    """Summarise, per sx page, which components its content needs.

    Returns a dict with global totals (``total-components``,
    ``total-macros``, ``pure-count``, ``io-count``) and a ``pages`` list,
    ordered by descending ``needed`` count.  Each page entry holds the
    direct/needed component counts, the needed share of all components as a
    rounded percentage (``pct``) with its complement (``savings``), pure/IO
    tallies, and a ``components`` list with a reconstructed ``defcomp``
    source for every needed name that resolves to a ``Component``.
    """
    from shared.sx.jinja_bridge import get_component_env
    from shared.sx.pages import get_all_pages
    from shared.sx.deps import components_needed, scan_components_from_sx
    from shared.sx.parser import serialize
    from shared.sx.types import Component, Macro

    env = get_component_env()
    all_components = [v for v in env.values() if isinstance(v, Component)]
    total_components = len(all_components)
    total_macros = len([v for v in env.values() if isinstance(v, Macro)])
    pure_count = len([c for c in all_components if c.is_pure])
    io_count = total_components - pure_count

    pages_data = []
    for page_name, page_def in sorted(get_all_pages("sx").items()):
        content_sx = serialize(page_def.content_expr)
        direct_refs = scan_components_from_sx(content_sx)
        needed = components_needed(content_sx, env)
        needed_count = len(needed)
        if total_components:
            pct = round(needed_count / total_components * 100)
        else:
            pct = 0

        page_io_refs: set[str] = set()
        comp_details = []
        for comp_name in sorted(needed):
            comp = env.get(comp_name)
            if not isinstance(comp, Component):
                # Needed names that are not components get no detail row.
                continue
            is_pure = comp.is_pure
            if not is_pure:
                page_io_refs.update(comp.io_refs)

            # Rebuild a defcomp form from the component's recorded pieces.
            params = ["&key", *comp.params]
            if comp.has_children:
                params += ["&rest", "children"]
            params_sx = "(" + " ".join(params) + ")"
            body_sx = serialize(comp.body, pretty=True)
            comp_details.append({
                "name": comp_name,
                "is-pure": is_pure,
                "affinity": comp.affinity,
                "render-target": comp.render_target,
                "io-refs": sorted(comp.io_refs),
                "deps": sorted(comp.deps),
                "source": f"(defcomp ~{comp.name} {params_sx}\n {body_sx})",
            })

        pure_in_page = sum(1 for row in comp_details if row["is-pure"])
        io_in_page = len(comp_details) - pure_in_page

        pages_data.append({
            "name": page_name,
            "path": page_def.path,
            "direct": len(direct_refs),
            "needed": needed_count,
            "pct": pct,
            "savings": 100 - pct,
            "io-refs": len(page_io_refs),
            "pure-in-page": pure_in_page,
            "io-in-page": io_in_page,
            "components": comp_details,
        })

    # Stable sort: pages with equal counts keep their name order.
    pages_data = sorted(pages_data, key=lambda page: page["needed"], reverse=True)

    return {
        "pages": pages_data,
        "total-components": total_components,
        "total-macros": total_macros,
        "pure-count": pure_count,
        "io-count": io_count,
    }
|
|
|
|
|
|
def _routing_analyzer_data() -> dict:
    """Compute per-page routing classification for the sx-docs app.

    A page is classified ``"server"`` when it has a ``data_expr`` or when no
    content source could be produced for it; otherwise it is ``"client"``.

    Returns:
        A dict with:

        - ``pages``: one entry per page (client pages first, then by name)
          with ``name``, ``path``, ``mode``, ``has-data``, a ``content-expr``
          preview truncated to 80 characters, and ``reason``
        - ``total-pages``, ``client-count``, ``server-count``
        - ``registry-sample``: registry-format text for the first three
          pages in name order, built from the untruncated content source
    """
    import logging

    from shared.sx.pages import get_all_pages
    from shared.sx.parser import serialize as sx_serialize
    from shared.sx.helpers import _sx_literal

    log = logging.getLogger(__name__)

    # Fetch the registry once and reuse it for both passes below.
    registry = get_all_pages("sx")

    pages_data = []
    full_content: list[tuple[str, str, bool]] = []  # (name, full_content, has_data)
    client_count = 0
    server_count = 0

    for name, page_def in sorted(registry.items()):
        has_data = page_def.data_expr is not None
        content_src = ""
        if page_def.content_expr is not None:
            try:
                content_src = sx_serialize(page_def.content_expr)
            except Exception:
                # Best effort: an unserialisable page is still listed (with
                # empty content), but the failure is no longer invisible.
                log.warning("could not serialize content of page %r", name, exc_info=True)

        full_content.append((name, content_src, has_data))

        # Determine routing mode and reason
        if has_data:
            mode = "server"
            reason = "Has :data expression — needs server IO"
            server_count += 1
        elif not content_src:
            mode = "server"
            reason = "No content expression"
            server_count += 1
        else:
            mode = "client"
            reason = ""
            client_count += 1

        pages_data.append({
            "name": name,
            "path": page_def.path,
            "mode": mode,
            "has-data": has_data,
            "content-expr": content_src[:80] + ("..." if len(content_src) > 80 else ""),
            "reason": reason,
        })

    # Sort: client pages first, then server
    pages_data.sort(key=lambda p: (0 if p["mode"] == "client" else 1, p["name"]))

    # Build a sample of the SX page registry format (use full content, first 3).
    # full_content was filled in name order, so no further sorting is needed.
    total = client_count + server_count
    sample_entries = []
    for name, csrc, hd in full_content[:3]:
        page_def = registry.get(name)
        if not page_def:
            continue
        entry = (
            "{:name " + _sx_literal(name)
            + "\n :path " + _sx_literal(page_def.path)
            + "\n :auth " + _sx_literal("public")
            + " :has-data " + ("true" if hd else "false")
            + "\n :content " + _sx_literal(csrc)
            + "\n :closure {}}"
        )
        sample_entries.append(entry)
    registry_sample = "\n\n".join(sample_entries)

    return {
        "pages": pages_data,
        "total-pages": total,
        "client-count": client_count,
        "server-count": server_count,
        "registry-sample": registry_sample,
    }
|
|
|
|
|
|
def _attr_detail_data(slug: str) -> dict:
    """Look up one attribute in ``ATTR_DETAILS`` and shape it for the page.

    Returns ``{"attr-not-found": True}`` when the slug has no (non-empty)
    detail entry.  Otherwise the dict contains:

    - ``attr-not-found``: None
    - ``attr-title``: the slug itself
    - ``attr-description`` / ``attr-example``: required detail fields
    - ``attr-handler``: the optional ``handler`` field
    - ``attr-demo``: ``sx_call(<demo name>)`` when a demo is named, else None
    - ``attr-wire-id``: ``ref-wire-<slug>`` with ``:`` replaced by ``-`` and
      ``*`` by ``star``, present only when the entry has a ``handler`` key
    """
    from content.pages import ATTR_DETAILS
    from shared.sx.helpers import sx_call

    detail = ATTR_DETAILS.get(slug)
    if not detail:
        return {"attr-not-found": True}

    if "handler" in detail:
        safe_slug = slug.replace(":", "-").replace("*", "star")
        wire_id = f"ref-wire-{safe_slug}"
    else:
        wire_id = None

    demo_name = detail.get("demo")
    demo = sx_call(demo_name) if demo_name else None

    return {
        "attr-not-found": None,
        "attr-title": slug,
        "attr-description": detail["description"],
        "attr-example": detail["example"],
        "attr-handler": detail.get("handler"),
        "attr-demo": demo,
        "attr-wire-id": wire_id,
    }
|
|
|
|
|
|
def _header_detail_data(slug: str) -> dict:
    """Look up one header in ``HEADER_DETAILS`` and shape it for the page.

    Returns ``{"header-not-found": True}`` when the slug has no (non-empty)
    entry.  Otherwise returns the title (the slug), the required
    ``direction`` and ``description`` fields, the optional ``example``, and
    ``header-demo`` as ``sx_call(<demo name>)`` when a demo is named.
    """
    from content.pages import HEADER_DETAILS
    from shared.sx.helpers import sx_call

    detail = HEADER_DETAILS.get(slug)
    if not detail:
        return {"header-not-found": True}

    demo_name = detail.get("demo")
    if demo_name:
        demo = sx_call(demo_name)
    else:
        demo = None

    payload = {"header-not-found": None, "header-title": slug}
    payload["header-direction"] = detail["direction"]
    payload["header-description"] = detail["description"]
    payload["header-example"] = detail.get("example")
    payload["header-demo"] = demo
    return payload
|
|
|
|
|
|
def _event_detail_data(slug: str) -> dict:
    """Look up one event in ``EVENT_DETAILS`` and shape it for the page.

    Returns ``{"event-not-found": True}`` when the slug has no (non-empty)
    entry.  Otherwise returns the title (the slug), the required
    ``description``, the optional ``example``, and ``event-demo`` as
    ``sx_call(<demo name>)`` when a demo is named.
    """
    from content.pages import EVENT_DETAILS
    from shared.sx.helpers import sx_call

    detail = EVENT_DETAILS.get(slug)
    if not detail:
        return {"event-not-found": True}

    demo_name = detail.get("demo")
    if demo_name:
        demo = sx_call(demo_name)
    else:
        demo = None

    payload = {"event-not-found": None, "event-title": slug}
    payload["event-description"] = detail["description"]
    payload["event-example"] = detail.get("example")
    payload["event-demo"] = demo
    return payload
|
|
|
|
|
|
def _run_spec_tests() -> dict:
    """Evaluate test.sx with the Python SX evaluator and collect the results.

    The spec file is evaluated against an environment that exposes five
    harness callbacks (``try-call``, ``report-pass``, ``report-fail``,
    ``push-suite``, ``pop-suite``).  Pass/fail reports are appended as
    TAP-style lines.

    Returns a dict with ``passed``, ``failed``, ``total``, ``elapsed-ms``
    (parse + evaluation time, rounded) and ``output`` (the joined lines).
    """
    import os
    import time
    from shared.sx.parser import parse_all
    from shared.sx.evaluator import _eval, _trampoline

    ref_dir = os.path.join(os.path.dirname(__file__), "..", "..", "shared", "sx", "ref")
    if not os.path.isdir(ref_dir):
        ref_dir = "/app/shared/sx/ref"
    with open(os.path.join(ref_dir, "test.sx"), encoding="utf-8") as handle:
        source_text = handle.read()

    tally = {"passed": 0, "failed": 0}
    suites: list[str] = []
    tap_lines: list[str] = []

    def _invoke(thunk):
        # Call the thunk with no arguments; failures become data, not raises.
        try:
            _trampoline(_eval([thunk], {}))
        except Exception as exc:
            return {"ok": False, "error": str(exc)}
        return {"ok": True}

    def _record_pass(name):
        tally["passed"] += 1
        number = tally["passed"] + tally["failed"]
        label = " > ".join([*suites, name])
        tap_lines.append(f"ok {number} - {label}")

    def _record_fail(name, error):
        tally["failed"] += 1
        number = tally["passed"] + tally["failed"]
        label = " > ".join([*suites, name])
        tap_lines.append(f"not ok {number} - {label}")
        tap_lines.append(f" # {error!s}")

    def _enter_suite(name):
        suites.append(name)

    def _leave_suite():
        suites.pop()

    harness = {
        "try-call": _invoke,
        "report-pass": _record_pass,
        "report-fail": _record_fail,
        "push-suite": _enter_suite,
        "pop-suite": _leave_suite,
    }

    started = time.monotonic()
    for form in parse_all(source_text):
        _trampoline(_eval(form, harness))
    elapsed_ms = round((time.monotonic() - started) * 1000)

    return {
        "passed": tally["passed"],
        "failed": tally["failed"],
        "total": tally["passed"] + tally["failed"],
        "elapsed-ms": elapsed_ms,
        "output": "\n".join(tap_lines),
    }
|
|
|
|
|
|
def _run_modular_tests(spec_name: str) -> dict:
    """Run modular test specs against the Python SX evaluator.

    Args:
        spec_name: a key of ``SPECS`` below ("eval", "parser", "router",
            "render", "deps", "engine") or "all".  An unknown name loads
            only the test framework file, so zero tests are reported.

    Returns:
        A dict with:

        - ``server-results``: ``passed``, ``failed``, ``total``,
          ``elapsed-ms``, TAP-style ``output`` and the requested ``spec``
        - ``framework-source``: text of test-framework.sx
        - for "all": one ``<spec>-source`` entry per spec; for a known
          single spec: ``spec-source``
    """
    import os
    import time
    from shared.sx.parser import parse_all
    from shared.sx.evaluator import _eval, _trampoline
    from shared.sx.types import Symbol, Keyword, Lambda, NIL

    ref_dir = os.path.join(os.path.dirname(__file__), "..", "..", "shared", "sx", "ref")
    if not os.path.isdir(ref_dir):
        ref_dir = "/app/shared/sx/ref"

    suite_stack: list[str] = []
    passed = 0
    failed = 0
    test_num = 0
    lines: list[str] = []

    def try_call(thunk):
        # `env` is the dict defined further down; it exists by the time any
        # spec code can call this.
        try:
            _trampoline(_eval([thunk], env))
            return {"ok": True}
        except Exception as e:
            return {"ok": False, "error": str(e)}

    def report_pass(name):
        nonlocal passed, test_num
        test_num += 1
        passed += 1
        lines.append("ok " + str(test_num) + " - " + " > ".join(suite_stack + [name]))

    def report_fail(name, error):
        nonlocal failed, test_num
        test_num += 1
        failed += 1
        full = " > ".join(suite_stack + [name])
        lines.append("not ok " + str(test_num) + " - " + full)
        lines.append(" # " + str(error))

    def push_suite(name):
        suite_stack.append(name)

    def pop_suite():
        suite_stack.pop()

    def sx_parse(source):
        return parse_all(source)

    def sx_serialize(val):
        # Minimal serializer exposed to the specs; bool is tested before
        # int because bool is an int subclass.
        if val is None or val is NIL:
            return "nil"
        if isinstance(val, bool):
            return "true" if val else "false"
        if isinstance(val, (int, float)):
            return str(val)
        if isinstance(val, str):
            escaped = val.replace("\\", "\\\\").replace('"', '\\"')
            return f'"{escaped}"'
        if isinstance(val, Symbol):
            return val.name
        if isinstance(val, Keyword):
            return f":{val.name}"
        if isinstance(val, list):
            inner = " ".join(sx_serialize(x) for x in val)
            return f"({inner})"
        if isinstance(val, dict):
            parts = []
            for k, v in val.items():
                parts.append(f":{k}")
                parts.append(sx_serialize(v))
            return "{" + " ".join(parts) + "}"
        return str(val)

    def render_html(sx_source):
        try:
            from shared.sx.ref.sx_ref import render_to_html as _render_to_html
        except ImportError:
            return "<!-- render-to-html not available -->"
        exprs = parse_all(sx_source)
        # Render against a copy so rendering cannot add bindings to `env`.
        render_env = dict(env)
        return "".join(_render_to_html(expr, render_env) for expr in exprs)

    def _for_each_indexed(fn, coll):
        if isinstance(fn, Lambda):
            # Parameters are bound directly into the lambda's own closure
            # dict (not a copy).
            # NOTE(review): presumably deliberate so the body's changes to
            # closure variables persist — confirm before changing.
            closure = fn.closure
            for i, item in enumerate(coll or []):
                for p, v in zip(fn.params, [i, item]):
                    closure[p] = v
                _trampoline(_eval(fn.body, closure))
        else:
            for i, item in enumerate(coll or []):
                fn(i, item)
        return NIL

    env = {
        "try-call": try_call,
        "report-pass": report_pass,
        "report-fail": report_fail,
        "push-suite": push_suite,
        "pop-suite": pop_suite,
        "sx-parse": sx_parse,
        "sx-serialize": sx_serialize,
        "make-symbol": lambda name: Symbol(name),
        "make-keyword": lambda name: Keyword(name),
        "symbol-name": lambda sym: sym.name if isinstance(sym, Symbol) else str(sym),
        "keyword-name": lambda kw: kw.name if isinstance(kw, Keyword) else str(kw),
        "render-html": render_html,
        "for-each-indexed": _for_each_indexed,
        "dict-set!": lambda d, k, v: (d.__setitem__(k, v), NIL)[-1] if isinstance(d, dict) else NIL,
        "dict-has?": lambda d, k: isinstance(d, dict) and k in d,
        "dict-get": lambda d, k: d.get(k, NIL) if isinstance(d, dict) else NIL,
        "append!": lambda lst, item: (lst.append(item), NIL)[-1] if isinstance(lst, list) else NIL,
        "inc": lambda n: n + 1,
    }

    def eval_file(filename):
        """Evaluate every expression of a ref-dir file in `env`; skip if absent."""
        filepath = os.path.join(ref_dir, filename)
        if not os.path.exists(filepath):
            return
        # UTF-8 explicitly, matching the other spec readers in this module.
        with open(filepath, encoding="utf-8") as f:
            src = f.read()
        exprs = parse_all(src)
        for expr in exprs:
            _trampoline(_eval(expr, env))

    SPECS = {
        "eval": {"file": "test-eval.sx", "needs": []},
        "parser": {"file": "test-parser.sx", "needs": ["sx-parse"]},
        "router": {"file": "test-router.sx", "needs": []},
        "render": {"file": "test-render.sx", "needs": ["render-html"]},
        "deps": {"file": "test-deps.sx", "needs": []},
        "engine": {"file": "test-engine.sx", "needs": []},
    }

    specs_to_run = list(SPECS.keys()) if spec_name == "all" else [spec_name]

    t0 = time.monotonic()

    # Load framework
    eval_file("test-framework.sx")

    for sn in specs_to_run:
        spec = SPECS.get(sn)
        if not spec:
            continue

        # Load module functions from bootstrap; if the bootstrapped Python
        # module cannot supply them, evaluate the .sx module source instead.
        if sn == "router":
            try:
                from shared.sx.ref.sx_ref import (
                    split_path_segments,
                    parse_route_pattern,
                    match_route_segments,
                    match_route,
                    find_matching_route,
                    make_route_segment,
                )
                env["split-path-segments"] = split_path_segments
                env["parse-route-pattern"] = parse_route_pattern
                env["match-route-segments"] = match_route_segments
                env["match-route"] = match_route
                env["find-matching-route"] = find_matching_route
                env["make-route-segment"] = make_route_segment
            except ImportError:
                eval_file("router.sx")
        elif sn == "deps":
            try:
                from shared.sx.ref.sx_ref import (
                    scan_refs, scan_components_from_source,
                    transitive_deps, compute_all_deps,
                    components_needed, page_component_bundle,
                    page_css_classes, scan_io_refs,
                    transitive_io_refs, compute_all_io_refs,
                    component_pure_p,
                )
                env["scan-refs"] = scan_refs
                env["scan-components-from-source"] = scan_components_from_source
                env["transitive-deps"] = transitive_deps
                env["compute-all-deps"] = compute_all_deps
                env["components-needed"] = components_needed
                env["page-component-bundle"] = page_component_bundle
                env["page-css-classes"] = page_css_classes
                env["scan-io-refs"] = scan_io_refs
                env["transitive-io-refs"] = transitive_io_refs
                env["compute-all-io-refs"] = compute_all_io_refs
                env["component-pure?"] = component_pure_p
                env["test-env"] = lambda: env
            except ImportError:
                eval_file("deps.sx")
                env["test-env"] = lambda: env
        elif sn == "engine":
            try:
                from shared.sx.ref.sx_ref import (
                    parse_time, parse_trigger_spec, default_trigger,
                    parse_swap_spec, parse_retry_spec, filter_params,
                )
                env["parse-time"] = parse_time
                env["parse-trigger-spec"] = parse_trigger_spec
                env["default-trigger"] = default_trigger
                env["parse-swap-spec"] = parse_swap_spec
                env["parse-retry-spec"] = parse_retry_spec
                env["next-retry-ms"] = lambda cur, cap: min(cur * 2, cap)
                env["filter-params"] = filter_params
            except ImportError:
                eval_file("engine.sx")

        eval_file(spec["file"])

    elapsed = round((time.monotonic() - t0) * 1000)

    result = {
        "server-results": {
            "passed": passed,
            "failed": failed,
            "total": passed + failed,
            "elapsed-ms": elapsed,
            "output": "\n".join(lines),
            "spec": spec_name,
        },
        "framework-source": _read_spec_file("test-framework.sx"),
    }

    # Include spec sources so :content can reference them from data
    if spec_name == "all":
        # Yields eval-source, parser-source, router-source, render-source,
        # deps-source and engine-source, in SPECS order.
        for key, spec in SPECS.items():
            result[f"{key}-source"] = _read_spec_file(spec["file"])
    else:
        spec = SPECS.get(spec_name)
        if spec:
            result["spec-source"] = _read_spec_file(spec["file"])

    return result
|
|
|
|
|
|
def _data_test_data() -> dict:
    """Produce the fixed payload for the client-side data rendering test page.

    The dict holds the current UTC time (ISO-8601, second precision) under
    ``server-time``, five label/detail step descriptions under ``items``,
    and two constant strings under ``phase`` and ``transport``.
    """
    from datetime import datetime, timezone

    steps = (
        ("Eval", "Server evaluates :data expression"),
        ("Serialize", "Result serialized as SX wire format"),
        ("Fetch", "Client calls resolve-page-data"),
        ("Parse", "Client parses SX response to dict"),
        ("Render", "Client merges data into env, renders content"),
    )
    now_utc = datetime.now(timezone.utc)

    payload = {"server-time": now_utc.isoformat(timespec="seconds")}
    payload["items"] = [{"label": label, "detail": detail} for label, detail in steps]
    payload["phase"] = "Phase 4 — Client Async & IO Bridge"
    payload["transport"] = "SX wire format (text/sx)"
    return payload
|
|
|
|
|
|
async def _streaming_demo_data():
    """Multi-stream demo: an async generator yielding three staggered chunks.

    The generator sleeps 1s, 2s and 2s before its first, second and third
    chunk (about 1s, 3s and 5s after start).  Every chunk is a dict with a
    ``stream-id`` key plus ``stream-label``, ``stream-color``,
    ``stream-message`` and ``stream-time`` (current UTC time, ISO-8601 to
    the second).

    NOTE(review): ``stream-id`` presumably matches a ``~suspense :id`` in the
    page shell that the streaming infrastructure resolves — confirm there.
    """
    import asyncio
    from datetime import datetime, timezone

    # (seconds to wait before the chunk, stream id, label, color, message)
    schedule = (
        (1, "stream-fast", "Fast API", "green", "Responded in ~1 second"),
        (2, "stream-medium", "Database Query", "blue", "Query completed in ~3 seconds"),
        (2, "stream-slow", "ML Inference", "amber", "Model inference completed in ~5 seconds"),
    )

    for pause, stream_id, label, color, message in schedule:
        await asyncio.sleep(pause)
        yield {
            "stream-id": stream_id,
            "stream-label": label,
            "stream-color": color,
            "stream-message": message,
            "stream-time": datetime.now(timezone.utc).isoformat(timespec="seconds"),
        }
|
|
|
|
|
|
def _affinity_demo_data() -> dict:
    """Gather affinity info for the demo components and page render plans.

    Returns a dict with:

    - ``components``: one entry per demo component name that resolves to a
      ``Component`` in the component env (``name``, ``affinity``,
      ``render-target``, sorted ``io-refs``, ``is-pure``)
    - ``page-plans``: one entry per sx page that has a truthy
      ``render_plan``, with the plan's ``server``/``client``/``io-deps``
      lists (empty list when a key is absent) and the two list lengths
    """
    from shared.sx.jinja_bridge import get_component_env
    from shared.sx.types import Component
    from shared.sx.pages import get_all_pages

    env = get_component_env()
    demo_names = (
        "~aff-demo-auto",
        "~aff-demo-client",
        "~aff-demo-server",
        "~aff-demo-io-auto",
        "~aff-demo-io-client",
    )

    components = []
    for demo_name in demo_names:
        comp = env.get(demo_name)
        if not isinstance(comp, Component):
            continue
        components.append({
            "name": demo_name,
            "affinity": comp.affinity,
            "render-target": comp.render_target,
            "io-refs": sorted(comp.io_refs),
            "is-pure": comp.is_pure,
        })

    # Collect render plans from all sx service pages
    page_plans = []
    for page_def in get_all_pages("sx").values():
        plan = page_def.render_plan
        if not plan:
            continue
        server_side = plan.get("server", [])
        client_side = plan.get("client", [])
        page_plans.append({
            "name": page_def.name,
            "path": page_def.path,
            "server-count": len(server_side),
            "client-count": len(client_side),
            "server": server_side,
            "client": client_side,
            "io-deps": plan.get("io-deps", []),
        })

    return {"components": components, "page-plans": page_plans}
|
|
|
|
|
|
def _optimistic_demo_data() -> dict:
    """Produce the seed payload for the optimistic update test page.

    ``items`` holds three entries (ids 1-3), all with status ``confirmed``;
    ``server-time`` is the current UTC time in ISO-8601, second precision.
    """
    from datetime import datetime, timezone

    seed_labels = ("First item", "Second item", "Third item")
    items = [
        {"id": position, "label": label, "status": "confirmed"}
        for position, label in enumerate(seed_labels, start=1)
    ]
    stamp = datetime.now(timezone.utc).isoformat(timespec="seconds")
    return {"items": items, "server-time": stamp}
|
|
|
|
|
|
def _add_demo_item(**kwargs) -> dict:
    """Action handler: build the confirmation record for a newly added item.

    Reads the optional ``label`` keyword (default ``"Untitled"``) and returns
    a dict with a random ``id`` in the range 100-9999, the label, status
    ``confirmed`` and ``added-at`` (current UTC time, ISO-8601 to the
    second).  Nothing is stored; ids are not guaranteed to be unique.
    """
    import random
    from datetime import datetime, timezone

    item_id = random.randint(100, 9999)
    added_at = datetime.now(timezone.utc).isoformat(timespec="seconds")

    confirmation = {"id": item_id}
    confirmation["label"] = kwargs.get("label", "Untitled")
    confirmation["status"] = "confirmed"
    confirmation["added-at"] = added_at
    return confirmation
|
|
|
|
|
|
def _offline_demo_data() -> dict:
    """Produce the seed payload for the offline data layer test page.

    ``notes`` holds three fixed notes (ids 1-3) with hard-coded ``created``
    timestamps; ``server-time`` is the current UTC time in ISO-8601, second
    precision.
    """
    from datetime import datetime, timezone

    seed_notes = (
        ("First note", "2026-03-08T10:00:00Z"),
        ("Second note", "2026-03-08T11:30:00Z"),
        ("Third note", "2026-03-08T14:15:00Z"),
    )
    notes = []
    for note_id, (text, created) in enumerate(seed_notes, start=1):
        notes.append({"id": note_id, "text": text, "created": created})

    stamp = datetime.now(timezone.utc).isoformat(timespec="seconds")
    return {"notes": notes, "server-time": stamp}
|