persist: projections — fold stream into read model, incremental resume + 9 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 1m9s

project.sx: projection state {:value :seq}; persist/project folds the whole
stream, persist/project-resume folds only the tail so read models update
incrementally. Pure step (value event)->value. 37/37.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-06 18:34:52 +00:00
parent 314cc37030
commit a6864178c3
6 changed files with 158 additions and 6 deletions

View File

@@ -13,7 +13,7 @@ if [ ! -x "$SX_SERVER" ]; then
exit 1 exit 1
fi fi
SUITES=(event log kv) SUITES=(event log kv project)
OUT_JSON="lib/persist/scoreboard.json" OUT_JSON="lib/persist/scoreboard.json"
OUT_MD="lib/persist/scoreboard.md" OUT_MD="lib/persist/scoreboard.md"
@@ -31,6 +31,7 @@ run_suite() {
(load "lib/persist/backend.sx") (load "lib/persist/backend.sx")
(load "lib/persist/log.sx") (load "lib/persist/log.sx")
(load "lib/persist/kv.sx") (load "lib/persist/kv.sx")
(load "lib/persist/project.sx")
(load "lib/persist/api.sx") (load "lib/persist/api.sx")
(epoch 2) (epoch 2)
(eval "(define persist-test-pass 0)") (eval "(define persist-test-pass 0)")

30
lib/persist/project.sx Normal file
View File

@@ -0,0 +1,30 @@
; persist/project — a projection folds a stream's events into a read model.
; A projection state is {:value v :seq s} where s is the last seq folded in,
; so a projection can resume incrementally from where it left off (replay only
; the tail). step : (value event) -> value. Determinism: step must be pure —
; time lives on the event (event-at), never a clock here.
; Requires: lib/persist/event.sx, lib/persist/log.sx.
; fold the tail (events with seq > prior's seq) onto a prior projection state
(define
persist/project-resume
(fn
(b stream step prior)
(let
((tail (persist/read-from b stream (+ 1 (get prior :seq)))))
(reduce (fn (acc e) {:value (step (get acc :value) e) :seq (persist/event-seq e)}) prior tail))))
; project the whole stream from seed
(define
persist/project
(fn (b stream step seed) (persist/project-resume b stream step {:value seed :seq 0})))
(define persist/project-value (fn (p) (get p :value)))
(define persist/project-seq (fn (p) (get p :seq)))
; convenience: project and return just the value
(define
persist/project-fold
(fn
(b stream step seed)
(persist/project-value (persist/project b stream step seed))))

View File

@@ -2,9 +2,10 @@
"suites": { "suites": {
"event": {"pass": 6, "fail": 0}, "event": {"pass": 6, "fail": 0},
"log": {"pass": 9, "fail": 0}, "log": {"pass": 9, "fail": 0},
"kv": {"pass": 13, "fail": 0} "kv": {"pass": 13, "fail": 0},
"project": {"pass": 9, "fail": 0}
}, },
"total_pass": 28, "total_pass": 37,
"total_fail": 0, "total_fail": 0,
"total": 28 "total": 37
} }

View File

@@ -7,4 +7,5 @@ _Generated by `lib/persist/conformance.sh`_
| event | 6 | 0 | 6 | | event | 6 | 0 | 6 |
| log | 9 | 0 | 9 | | log | 9 | 0 | 9 |
| kv | 13 | 0 | 13 | | kv | 13 | 0 | 13 |
| **Total** | **28** | **0** | **28** | | project | 9 | 0 | 9 |
| **Total** | **37** | **0** | **37** |

View File

@@ -0,0 +1,115 @@
; Phase 2 — projections: fold a stream into a read model, resume incrementally.
(persist-test
"project empty stream returns seed value"
(persist/project-fold
(persist/open)
"s"
(fn (acc e) (+ acc 1))
0)
0)
(persist-test
"project empty stream seq is 0"
(persist/project-seq
(persist/project (persist/open) "s" (fn (a e) a) 0))
0)
(persist-test
"project counts events"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/project-fold
b
"s"
(fn (acc e) (+ acc 1))
0)))
3)
(persist-test
"project sums event data"
(let
((b (persist/open)))
(begin
(persist/append b "ledger" "credit" 0 {:amt 10})
(persist/append b "ledger" "credit" 1 {:amt 5})
(persist/append b "ledger" "debit" 2 {:amt 3})
(persist/project-fold
b
"ledger"
(fn
(bal e)
(if
(equal? (persist/event-type e) "credit")
(+ bal (get (persist/event-data e) :amt))
(- bal (get (persist/event-data e) :amt))))
0)))
12)
(persist-test
"project tracks last seq"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/project-seq (persist/project b "s" (fn (a e) a) 0))))
2)
(persist-test
"resume folds only the tail"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(let
((p1 (persist/project b "s" (fn (acc e) (+ acc 1)) 0)))
(begin
(persist/append b "s" "x" 0 {})
(persist/project-value
(persist/project-resume
b
"s"
(fn (acc e) (+ acc 1))
p1))))))
3)
(persist-test
"resume with no new events is a no-op"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(let
((p1 (persist/project b "s" (fn (acc e) (+ acc 1)) 0)))
(persist/project-value
(persist/project-resume b "s" (fn (acc e) (+ acc 1)) p1)))))
1)
(persist-test
"resume advances seq"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(let
((p1 (persist/project b "s" (fn (a e) a) 0)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/project-seq
(persist/project-resume b "s" (fn (a e) a) p1))))))
3)
(persist-test
"full project equals seed-resume from zero"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(equal?
(persist/project b "s" (fn (acc e) (+ acc 1)) 0)
(persist/project-resume
b
"s"
(fn (acc e) (+ acc 1))
{:value 0 :seq 0}))))
true)

View File

@@ -94,7 +94,7 @@ lib/persist/backend.sx lib/persist/api.sx
- [x] `api.sx` + tests + scoreboard + conformance.sh - [x] `api.sx` + tests + scoreboard + conformance.sh
## Phase 2 — Projections + subscriptions ## Phase 2 — Projections + subscriptions
- [ ] `project.sx``(project stream step seed)`, incremental fold - [x] `project.sx``(project stream step seed)`, incremental fold
- [ ] subscription hook — projection / kv read model re-runs on append - [ ] subscription hook — projection / kv read model re-runs on append
- [ ] concurrency conflict surfaced as a real result, not a crash - [ ] concurrency conflict surfaced as a real result, not a crash
@@ -113,6 +113,10 @@ feed/-log, flow store, mod/audit, search index, acl grants, identity sessions al
become `persist` log or kv. Track each migration in that subsystem's plan. become `persist` log or kv. Track each migration in that subsystem's plan.
## Progress log ## Progress log
- **Phase 2a (37/37).** `project.sx` — projection state `{:value :seq}`;
`persist/project` folds whole stream from seed, `persist/project-resume`
folds only the tail (seq > prior seq) so read models update incrementally.
step is pure `(value event) -> value`. 9 tests incl. resume==full-from-zero.
- **Phase 1 complete (28/28).** `event.sx` (event record + accessors), - **Phase 1 complete (28/28).** `event.sx` (event record + accessors),
`backend.sx` (injectable protocol + in-memory log/kv impl, closure state via `backend.sx` (injectable protocol + in-memory log/kv impl, closure state via
set!), `log.sx` (append/read/read-from, sequential per-stream seq, stream set!), `log.sx` (append/read/read-from, sequential per-stream seq, stream