Files
rose-ash/lib/persist/tests/subscribe.sx
giles 03da8d4328
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 1m6s
persist: subscription hub — read models update on publish + 9 tests
subscribe.sx: persist/hub wraps a backend; persist/publish appends then fires
per-stream callbacks (backend stream event). Direct persist/append bypasses
subscribers (bulk load/replay). Callbacks drive kv counters / project-resume. 46/46.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 18:36:16 +00:00

131 lines
3.2 KiB
Plaintext

; Phase 2 — subscription hub: callbacks fire on publish, drive read models.
(persist-test
"no subscribers initially"
(persist/subscriber-count (persist/hub (persist/open)) "s")
0)
(persist-test
"subscribe registers a callback"
(let
((h (persist/hub (persist/open))))
(begin
(persist/subscribe h "s" (fn (b s e) nil))
(persist/subscriber-count h "s")))
1)
(persist-test
"publish appends to the log"
(let
((b (persist/open)))
(let
((h (persist/hub b)))
(begin
(persist/publish h "s" "x" 0 {})
(persist/publish h "s" "x" 0 {})
(persist/count b "s"))))
2)
(persist-test
"publish returns the stored event"
(let
((h (persist/hub (persist/open))))
(persist/event-seq (persist/publish h "s" "x" 0 {:id 1})))
1)
(persist-test
"callback fires on publish — drives a kv read model"
(let
((b (persist/open)))
(let
((h (persist/hub b)))
(begin
(persist/subscribe
h
"s"
(fn
(bk s e)
(persist/kv-update
bk
"count"
0
(fn (n) (+ n 1)))))
(persist/publish h "s" "x" 0 {})
(persist/publish h "s" "x" 0 {})
(persist/publish h "s" "x" 0 {})
(persist/kv-get b "count"))))
3)
(persist-test
"callback receives the event"
(let
((b (persist/open)))
(let
((h (persist/hub b)))
(begin
(persist/subscribe
h
"s"
(fn (bk s e) (persist/kv-put bk "last" (persist/event-type e))))
(persist/publish h "s" "created" 0 {})
(persist/kv-get b "last"))))
"created")
(persist-test
"subscriptions are per-stream"
(let
((b (persist/open)))
(let
((h (persist/hub b)))
(begin
(persist/subscribe
h
"s1"
(fn
(bk s e)
(persist/kv-update bk "n" 0 (fn (n) (+ n 1)))))
(persist/publish h "s2" "x" 0 {})
(persist/kv-get-or b "n" 0))))
0)
(persist-test
"multiple subscribers all fire"
(let
((b (persist/open)))
(let
((h (persist/hub b)))
(begin
(persist/subscribe
h
"s"
(fn
(bk s e)
(persist/kv-update bk "a" 0 (fn (n) (+ n 1)))))
(persist/subscribe
h
"s"
(fn
(bk s e)
(persist/kv-update bk "b" 0 (fn (n) (+ n 10)))))
(persist/publish h "s" "x" 0 {})
(list (persist/kv-get b "a") (persist/kv-get b "b")))))
(list 1 10))
(persist-test
"incremental read model via resume in callback"
(let
((b (persist/open)))
(let
((h (persist/hub b)))
(begin
(persist/kv-put b "proj" {:value 0 :seq 0})
(persist/subscribe
h
"s"
(fn
(bk s e)
(persist/kv-put
bk
"proj"
(persist/project-resume
bk
s
(fn (acc ev) (+ acc 1))
(persist/kv-get bk "proj")))))
(persist/publish h "s" "x" 0 {})
(persist/publish h "s" "x" 0 {})
(persist/project-value (persist/kv-get b "proj")))))
2)