diff --git a/lib/persist/conformance.sh b/lib/persist/conformance.sh index d6dbe25c..e3d9de13 100755 --- a/lib/persist/conformance.sh +++ b/lib/persist/conformance.sh @@ -13,7 +13,7 @@ if [ ! -x "$SX_SERVER" ]; then exit 1 fi -SUITES=(event log kv project) +SUITES=(event log kv project subscribe) OUT_JSON="lib/persist/scoreboard.json" OUT_MD="lib/persist/scoreboard.md" @@ -32,6 +32,7 @@ run_suite() { (load "lib/persist/log.sx") (load "lib/persist/kv.sx") (load "lib/persist/project.sx") +(load "lib/persist/subscribe.sx") (load "lib/persist/api.sx") (epoch 2) (eval "(define persist-test-pass 0)") diff --git a/lib/persist/scoreboard.json b/lib/persist/scoreboard.json index 4324f8fe..396e3cf1 100644 --- a/lib/persist/scoreboard.json +++ b/lib/persist/scoreboard.json @@ -3,9 +3,10 @@ "event": {"pass": 6, "fail": 0}, "log": {"pass": 9, "fail": 0}, "kv": {"pass": 13, "fail": 0}, - "project": {"pass": 9, "fail": 0} + "project": {"pass": 9, "fail": 0}, + "subscribe": {"pass": 9, "fail": 0} }, - "total_pass": 37, + "total_pass": 46, "total_fail": 0, - "total": 37 + "total": 46 } diff --git a/lib/persist/scoreboard.md b/lib/persist/scoreboard.md index 15a5b4e0..60131ac6 100644 --- a/lib/persist/scoreboard.md +++ b/lib/persist/scoreboard.md @@ -8,4 +8,5 @@ _Generated by `lib/persist/conformance.sh`_ | log | 9 | 0 | 9 | | kv | 13 | 0 | 13 | | project | 9 | 0 | 9 | -| **Total** | **37** | **0** | **37** | +| subscribe | 9 | 0 | 9 | +| **Total** | **46** | **0** | **46** | diff --git a/lib/persist/subscribe.sx b/lib/persist/subscribe.sx new file mode 100644 index 00000000..73e2db4f --- /dev/null +++ b/lib/persist/subscribe.sx @@ -0,0 +1,21 @@ +; persist/subscribe — a subscription hub wraps a backend with per-stream +; callbacks fired after each append. The canonical use: a callback re-runs a +; projection (or bumps a kv counter) so read models update incrementally on +; write instead of being recomputed on read. +; callback signature: (backend stream event) -> ignored +; Publish goes through the hub; direct persist/append on the backend bypasses +; subscribers by design (bulk loads, replay). +; Requires: lib/persist/log.sx. + +(define persist/hub (fn (b) (let ((subs {})) {:subscriber-count (fn (stream) (let ((cs (get subs stream))) (if cs (len cs) 0))) :publish (fn (stream type at data) (let ((ev (persist/append b stream type at data))) (begin (for-each (fn (cb) (cb b stream ev)) (let ((cs (get subs stream))) (if cs cs (list)))) ev))) :subscribe (fn (stream cb) (let ((cur (get subs stream))) (set! subs (assoc subs stream (append (if cur cur (list)) cb))))) :backend b}))) + +(define persist/hub-backend (fn (h) (get h :backend))) +(define + persist/subscribe + (fn (h stream cb) ((get h :subscribe) stream cb))) +(define + persist/publish + (fn (h stream type at data) ((get h :publish) stream type at data))) +(define + persist/subscriber-count + (fn (h stream) ((get h :subscriber-count) stream))) diff --git a/lib/persist/tests/subscribe.sx b/lib/persist/tests/subscribe.sx new file mode 100644 index 00000000..3cc23ca1 --- /dev/null +++ b/lib/persist/tests/subscribe.sx @@ -0,0 +1,130 @@ +; Phase 2 — subscription hub: callbacks fire on publish, drive read models. + +(persist-test + "no subscribers initially" + (persist/subscriber-count (persist/hub (persist/open)) "s") + 0) +(persist-test + "subscribe registers a callback" + (let + ((h (persist/hub (persist/open)))) + (begin + (persist/subscribe h "s" (fn (b s e) nil)) + (persist/subscriber-count h "s"))) + 1) +(persist-test + "publish appends to the log" + (let + ((b (persist/open))) + (let + ((h (persist/hub b))) + (begin + (persist/publish h "s" "x" 0 {}) + (persist/publish h "s" "x" 0 {}) + (persist/count b "s")))) + 2) +(persist-test + "publish returns the stored event" + (let + ((h (persist/hub (persist/open)))) + (persist/event-seq (persist/publish h "s" "x" 0 {:id 1}))) + 1) +(persist-test + "callback fires on publish — drives a kv read model" + (let + ((b (persist/open))) + (let + ((h (persist/hub b))) + (begin + (persist/subscribe + h + "s" + (fn + (bk s e) + (persist/kv-update + bk + "count" + 0 + (fn (n) (+ n 1))))) + (persist/publish h "s" "x" 0 {}) + (persist/publish h "s" "x" 0 {}) + (persist/publish h "s" "x" 0 {}) + (persist/kv-get b "count")))) + 3) +(persist-test + "callback receives the event" + (let + ((b (persist/open))) + (let + ((h (persist/hub b))) + (begin + (persist/subscribe + h + "s" + (fn (bk s e) (persist/kv-put bk "last" (persist/event-type e)))) + (persist/publish h "s" "created" 0 {}) + (persist/kv-get b "last")))) + "created") +(persist-test + "subscriptions are per-stream" + (let + ((b (persist/open))) + (let + ((h (persist/hub b))) + (begin + (persist/subscribe + h + "s1" + (fn + (bk s e) + (persist/kv-update bk "n" 0 (fn (n) (+ n 1))))) + (persist/publish h "s2" "x" 0 {}) + (persist/kv-get-or b "n" 0)))) + 0) +(persist-test + "multiple subscribers all fire" + (let + ((b (persist/open))) + (let + ((h (persist/hub b))) + (begin + (persist/subscribe + h + "s" + (fn + (bk s e) + (persist/kv-update bk "a" 0 (fn (n) (+ n 1))))) + (persist/subscribe + h + "s" + (fn + (bk s e) + (persist/kv-update bk "b" 0 (fn (n) (+ n 10))))) + (persist/publish h "s" "x" 0 {}) + (list (persist/kv-get b "a") (persist/kv-get b "b"))))) + (list 1 10)) +(persist-test + "incremental read model via resume in callback" + (let + ((b (persist/open))) + (let + ((h (persist/hub b))) + (begin + (persist/kv-put b "proj" {:value 0 :seq 0}) + (persist/subscribe + h + "s" + (fn + (bk s e) + (persist/kv-put + bk + "proj" + (persist/project-resume + bk + s + (fn (acc ev) (+ acc 1)) + (persist/kv-get bk "proj"))))) + (persist/publish h "s" "x" 0 {}) + (persist/publish h "s" "x" 0 {}) + (persist/project-value (persist/kv-get b "proj"))))) + 2) diff --git a/plans/persist-on-sx.md b/plans/persist-on-sx.md index 6150ff75..9a02fdd1 100644 --- a/plans/persist-on-sx.md +++ b/plans/persist-on-sx.md @@ -95,7 +95,7 @@ lib/persist/backend.sx lib/persist/api.sx ## Phase 2 — Projections + subscriptions - [x] `project.sx` — `(project stream step seed)`, incremental fold -- [ ] subscription hook — projection / kv read model re-runs on append +- [x] subscription hook — projection / kv read model re-runs on append - [ ] concurrency conflict surfaced as a real result, not a crash ## Phase 3 — Snapshots + replay @@ -113,6 +113,11 @@ feed/-log, flow store, mod/audit, search index, acl grants, identity sessions al become `persist` log or kv. Track each migration in that subsystem's plan. ## Progress log +- **Phase 2b (46/46).** `subscribe.sx` — `persist/hub` wraps a backend with + per-stream callbacks. `persist/publish` appends then fires subscribers + `(backend stream event)`; direct `persist/append` bypasses them by design + (bulk load/replay). Canonical use: callback re-runs `project-resume` or bumps + a kv counter so read models update on write. 9 tests. - **Phase 2a (37/37).** `project.sx` — projection state `{:value :seq}`; `persist/project` folds whole stream from seed, `persist/project-resume` folds only the tail (seq > prior seq) so read models update incrementally.