"""Test SxEngine features in sx.js — trigger parsing, param filtering, etc. Runs pure-logic SxEngine functions through Node.js (no DOM required). """ from __future__ import annotations import json import subprocess from pathlib import Path import pytest SX_JS = Path(__file__).resolve().parents[2] / "static" / "scripts" / "sx.js" def _run_engine_js(js_code: str) -> str: """Run a JS snippet that has access to SxEngine internals. We load sx.js with a minimal DOM stub so the IIFE doesn't crash, then expose internal functions via a test harness. """ stub = """ // Minimal DOM stub for SxEngine initialisation global.document = { readyState: "complete", head: { querySelector: function() { return null; } }, body: null, querySelector: function() { return null; }, querySelectorAll: function() { return []; }, getElementById: function() { return null; }, createElement: function(t) { return { tagName: t, attributes: [], childNodes: [], setAttribute: function() {}, appendChild: function() {}, querySelectorAll: function() { return []; }, }; }, createTextNode: function(t) { return { nodeType: 3, nodeValue: t }; }, createDocumentFragment: function() { return { nodeType: 11, childNodes: [], appendChild: function() {} }; }, addEventListener: function() {}, title: "", cookie: "", }; global.window = global; global.window.addEventListener = function() {}; global.window.matchMedia = function() { return { matches: false }; }; global.window.confirm = function() { return true; }; global.window.prompt = function() { return ""; }; global.window.scrollTo = function() {}; global.requestAnimationFrame = function(fn) { fn(); }; global.setTimeout = global.setTimeout || function(fn) { fn(); }; global.setInterval = global.setInterval || function() {}; global.clearTimeout = global.clearTimeout || function() {}; global.console = { log: function() {}, error: function() {}, warn: function() {} }; global.CSS = { escape: function(s) { return s; } }; global.location = { href: "http://localhost/", hostname: "localhost", origin: "http://localhost", assign: function() {}, reload: function() {} }; global.history = { pushState: function() {}, replaceState: function() {} }; global.fetch = function() { return Promise.resolve({ ok: true, headers: new Map(), text: function() { return Promise.resolve(""); } }); }; global.Headers = function(o) { this._h = o || {}; this.get = function(k) { return this._h[k] || null; }; }; global.URL = function(u, b) { var full = u.indexOf("://") >= 0 ? u : b + u; this.origin = "http://localhost"; this.hostname = "localhost"; }; global.CustomEvent = function(n, o) { this.type = n; this.detail = (o || {}).detail; }; global.AbortController = function() { this.signal = {}; this.abort = function() {}; }; global.URLSearchParams = function(init) { this._data = []; if (init) { if (typeof init.forEach === "function") { var self = this; init.forEach(function(v, k) { self._data.push([k, v]); }); } } this.append = function(k, v) { this._data.push([k, v]); }; this.delete = function(k) { this._data = this._data.filter(function(p) { return p[0] !== k; }); }; this.getAll = function(k) { return this._data.filter(function(p) { return p[0] === k; }).map(function(p) { return p[1]; }); }; this.toString = function() { return this._data.map(function(p) { return p[0] + "=" + p[1]; }).join("&"); }; this.forEach = function(fn) { this._data.forEach(function(p) { fn(p[1], p[0]); }); }; }; global.FormData = function() { this._data = []; this.forEach = function(fn) { this._data.forEach(function(p) { fn(p[1], p[0]); }); }; }; global.MutationObserver = function() { this.observe = function() {}; this.disconnect = function() {}; }; global.EventSource = function(url) { this.url = url; this.addEventListener = function() {}; this.close = function() {}; }; global.IntersectionObserver = function() { this.observe = function() {}; }; """ script = f""" {stub} {SX_JS.read_text()} // --- test code --- {js_code} """ result = subprocess.run( ["node", "-e", script], capture_output=True, text=True, timeout=5, ) if result.returncode != 0: pytest.fail(f"Node.js error:\n{result.stderr}") return result.stdout # --------------------------------------------------------------------------- # parseTrigger tests # --------------------------------------------------------------------------- class TestParseTrigger: """Test the parseTrigger function for various trigger specifications.""" def _parse(self, spec: str) -> list[dict]: out = _run_engine_js(f""" // Access parseTrigger via the IIFE's internal scope isn't possible directly, // but we can test it indirectly. Actually, we need to extract it. // Since SxEngine is built as an IIFE, we need to re-expose parseTrigger. // Let's test via a workaround: add a test method. // Actually, parseTrigger is captured in the closure. Let's hook into process. // Better approach: just re-parse the function from sx.js source. // Simplest: duplicate parseTrigger logic for testing (not ideal). // Best: we patch SxEngine to expose it before the IIFE closes. // Actually, the simplest approach: the _parseTime and parseTrigger functions // are inside the SxEngine IIFE. We can test them by examining the behavior // through the process() function, but that needs DOM. // // Instead, let's just eval the same code to test the logic: var _parseTime = function(s) {{ if (!s) return 0; if (s.indexOf("ms") >= 0) return parseInt(s, 10); if (s.indexOf("s") >= 0) return parseFloat(s) * 1000; return parseInt(s, 10); }}; var parseTrigger = function(spec) {{ if (!spec) return null; var triggers = []; var parts = spec.split(","); for (var i = 0; i < parts.length; i++) {{ var p = parts[i].trim(); if (!p) continue; var tokens = p.split(/\\s+/); if (tokens[0] === "every" && tokens.length >= 2) {{ triggers.push({{ event: "every", modifiers: {{ interval: _parseTime(tokens[1]) }} }}); continue; }} var trigger = {{ event: tokens[0], modifiers: {{}} }}; for (var j = 1; j < tokens.length; j++) {{ var tok = tokens[j]; if (tok === "once") trigger.modifiers.once = true; else if (tok === "changed") trigger.modifiers.changed = true; else if (tok.indexOf("delay:") === 0) trigger.modifiers.delay = _parseTime(tok.substring(6)); else if (tok.indexOf("from:") === 0) trigger.modifiers.from = tok.substring(5); }} triggers.push(trigger); }} return triggers; }}; process.stdout.write(JSON.stringify(parseTrigger({json.dumps(spec)}))); """) return json.loads(out) def test_click(self): result = self._parse("click") assert len(result) == 1 assert result[0]["event"] == "click" def test_every_seconds(self): result = self._parse("every 2s") assert len(result) == 1 assert result[0]["event"] == "every" assert result[0]["modifiers"]["interval"] == 2000 def test_every_milliseconds(self): result = self._parse("every 500ms") assert len(result) == 1 assert result[0]["event"] == "every" assert result[0]["modifiers"]["interval"] == 500 def test_delay_modifier(self): result = self._parse("input changed delay:300ms") assert result[0]["event"] == "input" assert result[0]["modifiers"]["changed"] is True assert result[0]["modifiers"]["delay"] == 300 def test_multiple_triggers(self): result = self._parse("click, every 5s") assert len(result) == 2 assert result[0]["event"] == "click" assert result[1]["event"] == "every" assert result[1]["modifiers"]["interval"] == 5000 def test_once_modifier(self): result = self._parse("click once") assert result[0]["modifiers"]["once"] is True def test_from_modifier(self): result = self._parse("keyup from:#search") assert result[0]["event"] == "keyup" assert result[0]["modifiers"]["from"] == "#search" def test_load_trigger(self): result = self._parse("load") assert result[0]["event"] == "load" def test_intersect(self): result = self._parse("intersect once") assert result[0]["event"] == "intersect" assert result[0]["modifiers"]["once"] is True def test_delay_seconds(self): result = self._parse("click delay:1s") assert result[0]["modifiers"]["delay"] == 1000 # --------------------------------------------------------------------------- # sx-params filtering tests # --------------------------------------------------------------------------- class TestParamsFiltering: """Test the sx-params parameter filtering logic.""" def _filter(self, params_spec: str, form_data: dict[str, str]) -> dict[str, str]: fd_entries = json.dumps([[k, v] for k, v in form_data.items()]) out = _run_engine_js(f""" var body = new URLSearchParams(); var entries = {fd_entries}; entries.forEach(function(p) {{ body.append(p[0], p[1]); }}); var paramsSpec = {json.dumps(params_spec)}; if (paramsSpec === "none") {{ body = new URLSearchParams(); }} else if (paramsSpec.indexOf("not ") === 0) {{ var excluded = paramsSpec.substring(4).split(",").map(function(s) {{ return s.trim(); }}); excluded.forEach(function(k) {{ body.delete(k); }}); }} else if (paramsSpec !== "*") {{ var allowed = paramsSpec.split(",").map(function(s) {{ return s.trim(); }}); var filtered = new URLSearchParams(); allowed.forEach(function(k) {{ body.getAll(k).forEach(function(v) {{ filtered.append(k, v); }}); }}); body = filtered; }} var result = {{}}; body.forEach(function(v, k) {{ result[k] = v; }}); process.stdout.write(JSON.stringify(result)); """) return json.loads(out) def test_all(self): result = self._filter("*", {"a": "1", "b": "2"}) assert result == {"a": "1", "b": "2"} def test_none(self): result = self._filter("none", {"a": "1", "b": "2"}) assert result == {} def test_include(self): result = self._filter("name", {"name": "Alice", "secret": "123"}) assert result == {"name": "Alice"} def test_include_multiple(self): result = self._filter("name,email", {"name": "Alice", "email": "a@b.c", "secret": "123"}) assert "name" in result assert "email" in result assert "secret" not in result def test_exclude(self): result = self._filter("not secret", {"name": "Alice", "secret": "123", "email": "a@b.c"}) assert "name" in result assert "email" in result assert "secret" not in result # --------------------------------------------------------------------------- # _dispatchTriggerEvents parsing tests # --------------------------------------------------------------------------- class TestTriggerEventParsing: """Test SX-Trigger header value parsing.""" def _parse_trigger(self, header_val: str) -> list[dict]: out = _run_engine_js(f""" var events = []; // Stub dispatch to capture events function dispatch(el, name, detail) {{ events.push({{ name: name, detail: detail }}); return true; }} function _dispatchTriggerEvents(el, headerVal) {{ if (!headerVal) return; try {{ var parsed = JSON.parse(headerVal); if (typeof parsed === "object" && parsed !== null) {{ for (var evtName in parsed) dispatch(el, evtName, parsed[evtName]); }} else {{ dispatch(el, String(parsed), {{}}); }} }} catch (e) {{ headerVal.split(",").forEach(function(name) {{ var n = name.trim(); if (n) dispatch(el, n, {{}}); }}); }} }} _dispatchTriggerEvents(null, {json.dumps(header_val)}); process.stdout.write(JSON.stringify(events)); """) return json.loads(out) def test_plain_string(self): events = self._parse_trigger("myEvent") assert len(events) == 1 assert events[0]["name"] == "myEvent" def test_comma_separated(self): events = self._parse_trigger("eventA, eventB") assert len(events) == 2 assert events[0]["name"] == "eventA" assert events[1]["name"] == "eventB" def test_json_object(self): events = self._parse_trigger('{"myEvent": {"key": "val"}}') assert len(events) == 1 assert events[0]["name"] == "myEvent" assert events[0]["detail"]["key"] == "val" def test_json_multiple(self): events = self._parse_trigger('{"a": {}, "b": {"x": 1}}') assert len(events) == 2 names = [e["name"] for e in events] assert "a" in names assert "b" in names # --------------------------------------------------------------------------- # _parseTime tests # --------------------------------------------------------------------------- class TestParseTime: """Test the time parsing utility.""" def _parse_time(self, s: str) -> int: out = _run_engine_js(f""" var _parseTime = function(s) {{ if (!s) return 0; if (s.indexOf("ms") >= 0) return parseInt(s, 10); if (s.indexOf("s") >= 0) return parseFloat(s) * 1000; return parseInt(s, 10); }}; process.stdout.write(String(_parseTime({json.dumps(s)}))); """) return int(out) def test_seconds(self): assert self._parse_time("2s") == 2000 def test_milliseconds(self): assert self._parse_time("500ms") == 500 def test_fractional_seconds(self): assert self._parse_time("1.5s") == 1500 def test_plain_number(self): assert self._parse_time("100") == 100 def test_empty(self): assert self._parse_time("") == 0 # --------------------------------------------------------------------------- # View Transition parsing tests # --------------------------------------------------------------------------- class TestSwapParsing: """Test sx-swap value parsing with transition modifier.""" def _parse_swap(self, raw_swap: str) -> dict: out = _run_engine_js(f""" var rawSwap = {json.dumps(raw_swap)}; var swapParts = rawSwap.split(/\\s+/); var swapStyle = swapParts[0]; var useTransition = false; for (var sp = 1; sp < swapParts.length; sp++) {{ if (swapParts[sp] === "transition:true") useTransition = true; else if (swapParts[sp] === "transition:false") useTransition = false; }} process.stdout.write(JSON.stringify({{ style: swapStyle, transition: useTransition }})); """) return json.loads(out) def test_plain_swap(self): result = self._parse_swap("innerHTML") assert result["style"] == "innerHTML" assert result["transition"] is False def test_transition_true(self): result = self._parse_swap("innerHTML transition:true") assert result["style"] == "innerHTML" assert result["transition"] is True def test_transition_false(self): result = self._parse_swap("outerHTML transition:false") assert result["style"] == "outerHTML" assert result["transition"] is False