persist: subscription hub — read models update on publish + 9 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 1m6s
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 1m6s
subscribe.sx: persist/hub wraps a backend; persist/publish appends then fires per-stream callbacks (backend stream event). Direct persist/append bypasses subscribers (bulk load/replay). Callbacks drive kv counters / project-resume. 46/46. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -13,7 +13,7 @@ if [ ! -x "$SX_SERVER" ]; then
|
|||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
SUITES=(event log kv project)
|
SUITES=(event log kv project subscribe)
|
||||||
|
|
||||||
OUT_JSON="lib/persist/scoreboard.json"
|
OUT_JSON="lib/persist/scoreboard.json"
|
||||||
OUT_MD="lib/persist/scoreboard.md"
|
OUT_MD="lib/persist/scoreboard.md"
|
||||||
@@ -32,6 +32,7 @@ run_suite() {
|
|||||||
(load "lib/persist/log.sx")
|
(load "lib/persist/log.sx")
|
||||||
(load "lib/persist/kv.sx")
|
(load "lib/persist/kv.sx")
|
||||||
(load "lib/persist/project.sx")
|
(load "lib/persist/project.sx")
|
||||||
|
(load "lib/persist/subscribe.sx")
|
||||||
(load "lib/persist/api.sx")
|
(load "lib/persist/api.sx")
|
||||||
(epoch 2)
|
(epoch 2)
|
||||||
(eval "(define persist-test-pass 0)")
|
(eval "(define persist-test-pass 0)")
|
||||||
|
|||||||
@@ -3,9 +3,10 @@
|
|||||||
"event": {"pass": 6, "fail": 0},
|
"event": {"pass": 6, "fail": 0},
|
||||||
"log": {"pass": 9, "fail": 0},
|
"log": {"pass": 9, "fail": 0},
|
||||||
"kv": {"pass": 13, "fail": 0},
|
"kv": {"pass": 13, "fail": 0},
|
||||||
"project": {"pass": 9, "fail": 0}
|
"project": {"pass": 9, "fail": 0},
|
||||||
|
"subscribe": {"pass": 9, "fail": 0}
|
||||||
},
|
},
|
||||||
"total_pass": 37,
|
"total_pass": 46,
|
||||||
"total_fail": 0,
|
"total_fail": 0,
|
||||||
"total": 37
|
"total": 46
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,4 +8,5 @@ _Generated by `lib/persist/conformance.sh`_
|
|||||||
| log | 9 | 0 | 9 |
|
| log | 9 | 0 | 9 |
|
||||||
| kv | 13 | 0 | 13 |
|
| kv | 13 | 0 | 13 |
|
||||||
| project | 9 | 0 | 9 |
|
| project | 9 | 0 | 9 |
|
||||||
| **Total** | **37** | **0** | **37** |
|
| subscribe | 9 | 0 | 9 |
|
||||||
|
| **Total** | **46** | **0** | **46** |
|
||||||
|
|||||||
21
lib/persist/subscribe.sx
Normal file
21
lib/persist/subscribe.sx
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
; persist/subscribe — a subscription hub wraps a backend with per-stream
|
||||||
|
; callbacks fired after each append. The canonical use: a callback re-runs a
|
||||||
|
; projection (or bumps a kv counter) so read models update incrementally on
|
||||||
|
; write instead of being recomputed on read.
|
||||||
|
; callback signature: (backend stream event) -> ignored
|
||||||
|
; Publish goes through the hub; direct persist/append on the backend bypasses
|
||||||
|
; subscribers by design (bulk loads, replay).
|
||||||
|
; Requires: lib/persist/log.sx.
|
||||||
|
|
||||||
|
(define persist/hub (fn (b) (let ((subs {})) {:subscriber-count (fn (stream) (let ((cs (get subs stream))) (if cs (len cs) 0))) :publish (fn (stream type at data) (let ((ev (persist/append b stream type at data))) (begin (for-each (fn (cb) (cb b stream ev)) (let ((cs (get subs stream))) (if cs cs (list)))) ev))) :subscribe (fn (stream cb) (let ((cur (get subs stream))) (set! subs (assoc subs stream (append (if cur cur (list)) cb))))) :backend b})))
|
||||||
|
|
||||||
|
(define persist/hub-backend (fn (h) (get h :backend)))
|
||||||
|
(define
|
||||||
|
persist/subscribe
|
||||||
|
(fn (h stream cb) ((get h :subscribe) stream cb)))
|
||||||
|
(define
|
||||||
|
persist/publish
|
||||||
|
(fn (h stream type at data) ((get h :publish) stream type at data)))
|
||||||
|
(define
|
||||||
|
persist/subscriber-count
|
||||||
|
(fn (h stream) ((get h :subscriber-count) stream)))
|
||||||
130
lib/persist/tests/subscribe.sx
Normal file
130
lib/persist/tests/subscribe.sx
Normal file
@@ -0,0 +1,130 @@
|
|||||||
|
; Phase 2 — subscription hub: callbacks fire on publish, drive read models.
|
||||||
|
|
||||||
|
(persist-test
|
||||||
|
"no subscribers initially"
|
||||||
|
(persist/subscriber-count (persist/hub (persist/open)) "s")
|
||||||
|
0)
|
||||||
|
(persist-test
|
||||||
|
"subscribe registers a callback"
|
||||||
|
(let
|
||||||
|
((h (persist/hub (persist/open))))
|
||||||
|
(begin
|
||||||
|
(persist/subscribe h "s" (fn (b s e) nil))
|
||||||
|
(persist/subscriber-count h "s")))
|
||||||
|
1)
|
||||||
|
(persist-test
|
||||||
|
"publish appends to the log"
|
||||||
|
(let
|
||||||
|
((b (persist/open)))
|
||||||
|
(let
|
||||||
|
((h (persist/hub b)))
|
||||||
|
(begin
|
||||||
|
(persist/publish h "s" "x" 0 {})
|
||||||
|
(persist/publish h "s" "x" 0 {})
|
||||||
|
(persist/count b "s"))))
|
||||||
|
2)
|
||||||
|
(persist-test
|
||||||
|
"publish returns the stored event"
|
||||||
|
(let
|
||||||
|
((h (persist/hub (persist/open))))
|
||||||
|
(persist/event-seq (persist/publish h "s" "x" 0 {:id 1})))
|
||||||
|
1)
|
||||||
|
(persist-test
|
||||||
|
"callback fires on publish — drives a kv read model"
|
||||||
|
(let
|
||||||
|
((b (persist/open)))
|
||||||
|
(let
|
||||||
|
((h (persist/hub b)))
|
||||||
|
(begin
|
||||||
|
(persist/subscribe
|
||||||
|
h
|
||||||
|
"s"
|
||||||
|
(fn
|
||||||
|
(bk s e)
|
||||||
|
(persist/kv-update
|
||||||
|
bk
|
||||||
|
"count"
|
||||||
|
0
|
||||||
|
(fn (n) (+ n 1)))))
|
||||||
|
(persist/publish h "s" "x" 0 {})
|
||||||
|
(persist/publish h "s" "x" 0 {})
|
||||||
|
(persist/publish h "s" "x" 0 {})
|
||||||
|
(persist/kv-get b "count"))))
|
||||||
|
3)
|
||||||
|
(persist-test
|
||||||
|
"callback receives the event"
|
||||||
|
(let
|
||||||
|
((b (persist/open)))
|
||||||
|
(let
|
||||||
|
((h (persist/hub b)))
|
||||||
|
(begin
|
||||||
|
(persist/subscribe
|
||||||
|
h
|
||||||
|
"s"
|
||||||
|
(fn (bk s e) (persist/kv-put bk "last" (persist/event-type e))))
|
||||||
|
(persist/publish h "s" "created" 0 {})
|
||||||
|
(persist/kv-get b "last"))))
|
||||||
|
"created")
|
||||||
|
(persist-test
|
||||||
|
"subscriptions are per-stream"
|
||||||
|
(let
|
||||||
|
((b (persist/open)))
|
||||||
|
(let
|
||||||
|
((h (persist/hub b)))
|
||||||
|
(begin
|
||||||
|
(persist/subscribe
|
||||||
|
h
|
||||||
|
"s1"
|
||||||
|
(fn
|
||||||
|
(bk s e)
|
||||||
|
(persist/kv-update bk "n" 0 (fn (n) (+ n 1)))))
|
||||||
|
(persist/publish h "s2" "x" 0 {})
|
||||||
|
(persist/kv-get-or b "n" 0))))
|
||||||
|
0)
|
||||||
|
(persist-test
|
||||||
|
"multiple subscribers all fire"
|
||||||
|
(let
|
||||||
|
((b (persist/open)))
|
||||||
|
(let
|
||||||
|
((h (persist/hub b)))
|
||||||
|
(begin
|
||||||
|
(persist/subscribe
|
||||||
|
h
|
||||||
|
"s"
|
||||||
|
(fn
|
||||||
|
(bk s e)
|
||||||
|
(persist/kv-update bk "a" 0 (fn (n) (+ n 1)))))
|
||||||
|
(persist/subscribe
|
||||||
|
h
|
||||||
|
"s"
|
||||||
|
(fn
|
||||||
|
(bk s e)
|
||||||
|
(persist/kv-update bk "b" 0 (fn (n) (+ n 10)))))
|
||||||
|
(persist/publish h "s" "x" 0 {})
|
||||||
|
(list (persist/kv-get b "a") (persist/kv-get b "b")))))
|
||||||
|
(list 1 10))
|
||||||
|
(persist-test
|
||||||
|
"incremental read model via resume in callback"
|
||||||
|
(let
|
||||||
|
((b (persist/open)))
|
||||||
|
(let
|
||||||
|
((h (persist/hub b)))
|
||||||
|
(begin
|
||||||
|
(persist/kv-put b "proj" {:value 0 :seq 0})
|
||||||
|
(persist/subscribe
|
||||||
|
h
|
||||||
|
"s"
|
||||||
|
(fn
|
||||||
|
(bk s e)
|
||||||
|
(persist/kv-put
|
||||||
|
bk
|
||||||
|
"proj"
|
||||||
|
(persist/project-resume
|
||||||
|
bk
|
||||||
|
s
|
||||||
|
(fn (acc ev) (+ acc 1))
|
||||||
|
(persist/kv-get bk "proj")))))
|
||||||
|
(persist/publish h "s" "x" 0 {})
|
||||||
|
(persist/publish h "s" "x" 0 {})
|
||||||
|
(persist/project-value (persist/kv-get b "proj")))))
|
||||||
|
2)
|
||||||
@@ -95,7 +95,7 @@ lib/persist/backend.sx lib/persist/api.sx
|
|||||||
|
|
||||||
## Phase 2 — Projections + subscriptions
|
## Phase 2 — Projections + subscriptions
|
||||||
- [x] `project.sx` — `(project stream step seed)`, incremental fold
|
- [x] `project.sx` — `(project stream step seed)`, incremental fold
|
||||||
- [ ] subscription hook — projection / kv read model re-runs on append
|
- [x] subscription hook — projection / kv read model re-runs on append
|
||||||
- [ ] concurrency conflict surfaced as a real result, not a crash
|
- [ ] concurrency conflict surfaced as a real result, not a crash
|
||||||
|
|
||||||
## Phase 3 — Snapshots + replay
|
## Phase 3 — Snapshots + replay
|
||||||
@@ -113,6 +113,11 @@ feed/-log, flow store, mod/audit, search index, acl grants, identity sessions al
|
|||||||
become `persist` log or kv. Track each migration in that subsystem's plan.
|
become `persist` log or kv. Track each migration in that subsystem's plan.
|
||||||
|
|
||||||
## Progress log
|
## Progress log
|
||||||
|
- **Phase 2b (46/46).** `subscribe.sx` — `persist/hub` wraps a backend with
|
||||||
|
per-stream callbacks. `persist/publish` appends then fires subscribers
|
||||||
|
`(backend stream event)`; direct `persist/append` bypasses them by design
|
||||||
|
(bulk load/replay). Canonical use: callback re-runs `project-resume` or bumps
|
||||||
|
a kv counter so read models update on write. 9 tests.
|
||||||
- **Phase 2a (37/37).** `project.sx` — projection state `{:value :seq}`;
|
- **Phase 2a (37/37).** `project.sx` — projection state `{:value :seq}`;
|
||||||
`persist/project` folds whole stream from seed, `persist/project-resume`
|
`persist/project` folds whole stream from seed, `persist/project-resume`
|
||||||
folds only the tail (seq > prior seq) so read models update incrementally.
|
folds only the tail (seq > prior seq) so read models update incrementally.
|
||||||
|
|||||||
Reference in New Issue
Block a user