From a6864178c32a30fbb48856650ddba978eb3bc24a Mon Sep 17 00:00:00 2001 From: giles Date: Sat, 6 Jun 2026 18:34:52 +0000 Subject: [PATCH] =?UTF-8?q?persist:=20projections=20=E2=80=94=20fold=20str?= =?UTF-8?q?eam=20into=20read=20model,=20incremental=20resume=20+=209=20tes?= =?UTF-8?q?ts?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit project.sx: projection state {:value :seq}; persist/project folds the whole stream, persist/project-resume folds only the tail so read models update incrementally. Pure step (value event)->value. 37/37. Co-Authored-By: Claude Opus 4.8 (1M context) --- lib/persist/conformance.sh | 3 +- lib/persist/project.sx | 30 +++++++++ lib/persist/scoreboard.json | 7 ++- lib/persist/scoreboard.md | 3 +- lib/persist/tests/project.sx | 115 +++++++++++++++++++++++++++++++++++ plans/persist-on-sx.md | 6 +- 6 files changed, 158 insertions(+), 6 deletions(-) create mode 100644 lib/persist/project.sx create mode 100644 lib/persist/tests/project.sx diff --git a/lib/persist/conformance.sh b/lib/persist/conformance.sh index 5bcdb38f..d6dbe25c 100755 --- a/lib/persist/conformance.sh +++ b/lib/persist/conformance.sh @@ -13,7 +13,7 @@ if [ ! -x "$SX_SERVER" ]; then exit 1 fi -SUITES=(event log kv) +SUITES=(event log kv project) OUT_JSON="lib/persist/scoreboard.json" OUT_MD="lib/persist/scoreboard.md" @@ -31,6 +31,7 @@ run_suite() { (load "lib/persist/backend.sx") (load "lib/persist/log.sx") (load "lib/persist/kv.sx") +(load "lib/persist/project.sx") (load "lib/persist/api.sx") (epoch 2) (eval "(define persist-test-pass 0)") diff --git a/lib/persist/project.sx b/lib/persist/project.sx new file mode 100644 index 00000000..fbe93ad9 --- /dev/null +++ b/lib/persist/project.sx @@ -0,0 +1,30 @@ +; persist/project — a projection folds a stream's events into a read model. +; A projection state is {:value v :seq s} where s is the last seq folded in, +; so a projection can resume incrementally from where it left off (replay only +; the tail). step : (value event) -> value. Determinism: step must be pure — +; time lives on the event (event-at), never a clock here. +; Requires: lib/persist/event.sx, lib/persist/log.sx. + +; fold the tail (events with seq > prior's seq) onto a prior projection state +(define + persist/project-resume + (fn + (b stream step prior) + (let + ((tail (persist/read-from b stream (+ 1 (get prior :seq))))) + (reduce (fn (acc e) {:value (step (get acc :value) e) :seq (persist/event-seq e)}) prior tail)))) + +; project the whole stream from seed +(define + persist/project + (fn (b stream step seed) (persist/project-resume b stream step {:value seed :seq 0}))) + +(define persist/project-value (fn (p) (get p :value))) +(define persist/project-seq (fn (p) (get p :seq))) + +; convenience: project and return just the value +(define + persist/project-fold + (fn + (b stream step seed) + (persist/project-value (persist/project b stream step seed)))) diff --git a/lib/persist/scoreboard.json b/lib/persist/scoreboard.json index 0dfa26f3..4324f8fe 100644 --- a/lib/persist/scoreboard.json +++ b/lib/persist/scoreboard.json @@ -2,9 +2,10 @@ "suites": { "event": {"pass": 6, "fail": 0}, "log": {"pass": 9, "fail": 0}, - "kv": {"pass": 13, "fail": 0} + "kv": {"pass": 13, "fail": 0}, + "project": {"pass": 9, "fail": 0} }, - "total_pass": 28, + "total_pass": 37, "total_fail": 0, - "total": 28 + "total": 37 } diff --git a/lib/persist/scoreboard.md b/lib/persist/scoreboard.md index bd60db39..15a5b4e0 100644 --- a/lib/persist/scoreboard.md +++ b/lib/persist/scoreboard.md @@ -7,4 +7,5 @@ _Generated by `lib/persist/conformance.sh`_ | event | 6 | 0 | 6 | | log | 9 | 0 | 9 | | kv | 13 | 0 | 13 | -| **Total** | **28** | **0** | **28** | +| project | 9 | 0 | 9 | +| **Total** | **37** | **0** | **37** | diff --git a/lib/persist/tests/project.sx b/lib/persist/tests/project.sx new file mode 100644 index 00000000..9f378792 --- /dev/null +++ b/lib/persist/tests/project.sx @@ -0,0 +1,115 @@ +; Phase 2 — projections: fold a stream into a read model, resume incrementally. + +(persist-test + "project empty stream returns seed value" + (persist/project-fold + (persist/open) + "s" + (fn (acc e) (+ acc 1)) + 0) + 0) +(persist-test + "project empty stream seq is 0" + (persist/project-seq + (persist/project (persist/open) "s" (fn (a e) a) 0)) + 0) +(persist-test + "project counts events" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "x" 0 {}) + (persist/append b "s" "x" 0 {}) + (persist/append b "s" "x" 0 {}) + (persist/project-fold + b + "s" + (fn (acc e) (+ acc 1)) + 0))) + 3) +(persist-test + "project sums event data" + (let + ((b (persist/open))) + (begin + (persist/append b "ledger" "credit" 0 {:amt 10}) + (persist/append b "ledger" "credit" 1 {:amt 5}) + (persist/append b "ledger" "debit" 2 {:amt 3}) + (persist/project-fold + b + "ledger" + (fn + (bal e) + (if + (equal? (persist/event-type e) "credit") + (+ bal (get (persist/event-data e) :amt)) + (- bal (get (persist/event-data e) :amt)))) + 0))) + 12) +(persist-test + "project tracks last seq" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "x" 0 {}) + (persist/append b "s" "x" 0 {}) + (persist/project-seq (persist/project b "s" (fn (a e) a) 0)))) + 2) +(persist-test + "resume folds only the tail" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "x" 0 {}) + (persist/append b "s" "x" 0 {}) + (let + ((p1 (persist/project b "s" (fn (acc e) (+ acc 1)) 0))) + (begin + (persist/append b "s" "x" 0 {}) + (persist/project-value + (persist/project-resume + b + "s" + (fn (acc e) (+ acc 1)) + p1)))))) + 3) +(persist-test + "resume with no new events is a no-op" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "x" 0 {}) + (let + ((p1 (persist/project b "s" (fn (acc e) (+ acc 1)) 0))) + (persist/project-value + (persist/project-resume b "s" (fn (acc e) (+ acc 1)) p1))))) + 1) +(persist-test + "resume advances seq" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "x" 0 {}) + (let + ((p1 (persist/project b "s" (fn (a e) a) 0))) + (begin + (persist/append b "s" "x" 0 {}) + (persist/append b "s" "x" 0 {}) + (persist/project-seq + (persist/project-resume b "s" (fn (a e) a) p1)))))) + 3) +(persist-test + "full project equals seed-resume from zero" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "x" 0 {}) + (persist/append b "s" "x" 0 {}) + (equal? + (persist/project b "s" (fn (acc e) (+ acc 1)) 0) + (persist/project-resume + b + "s" + (fn (acc e) (+ acc 1)) + {:value 0 :seq 0})))) + true) diff --git a/plans/persist-on-sx.md b/plans/persist-on-sx.md index 8d316de2..6150ff75 100644 --- a/plans/persist-on-sx.md +++ b/plans/persist-on-sx.md @@ -94,7 +94,7 @@ lib/persist/backend.sx lib/persist/api.sx - [x] `api.sx` + tests + scoreboard + conformance.sh ## Phase 2 — Projections + subscriptions -- [ ] `project.sx` — `(project stream step seed)`, incremental fold +- [x] `project.sx` — `(project stream step seed)`, incremental fold - [ ] subscription hook — projection / kv read model re-runs on append - [ ] concurrency conflict surfaced as a real result, not a crash @@ -113,6 +113,10 @@ feed/-log, flow store, mod/audit, search index, acl grants, identity sessions al become `persist` log or kv. Track each migration in that subsystem's plan. ## Progress log +- **Phase 2a (37/37).** `project.sx` — projection state `{:value :seq}`; + `persist/project` folds whole stream from seed, `persist/project-resume` + folds only the tail (seq > prior seq) so read models update incrementally. + step is pure `(value event) -> value`. 9 tests incl. resume==full-from-zero. - **Phase 1 complete (28/28).** `event.sx` (event record + accessors), `backend.sx` (injectable protocol + in-memory log/kv impl, closure state via set!), `log.sx` (append/read/read-from, sequential per-stream seq, stream