; Phase 2 — subscription hub: callbacks fire on publish, drive read models.
;
; Each case is a (persist-test name expr value) form; the trailing value is
; presumably the result expr is expected to produce (confirm against the
; persist-test definition, which is not in this file).

; A hub over a freshly opened store reports zero subscribers for stream "s".
(persist-test
  "no subscribers initially"
  (persist/subscriber-count (persist/hub (persist/open)) "s")
  0)

; After one subscribe on "s", the subscriber count for "s" is 1.
; The callback takes three arguments and does nothing.
(persist-test
  "subscribe registers a callback"
  (let ((h (persist/hub (persist/open))))
    (begin
      (persist/subscribe h "s" (fn (b s e) nil))
      (persist/subscriber-count h "s")))
  1)

; Two publishes through the hub are counted on the underlying store b.
(persist-test
  "publish appends to the log"
  (let ((b (persist/open)))
    (let ((h (persist/hub b)))
      (begin
        (persist/publish h "s" "x" 0 {})
        (persist/publish h "s" "x" 0 {})
        (persist/count b "s"))))
  2)

; The value returned by publish is passed to persist/event-seq; the first
; publish on a fresh store is expected to yield 1.
(persist-test
  "publish returns the stored event"
  (let ((h (persist/hub (persist/open))))
    (persist/event-seq (persist/publish h "s" "x" 0 {:id 1})))
  1)

; The callback bumps the "count" key once per invocation; three publishes
; are expected to leave 3 under "count" in the store b.
(persist-test
  "callback fires on publish — drives a kv read model"
  (let ((b (persist/open)))
    (let ((h (persist/hub b)))
      (begin
        (persist/subscribe
          h
          "s"
          (fn (bk s e) (persist/kv-update bk "count" 0 (fn (n) (+ n 1)))))
        (persist/publish h "s" "x" 0 {})
        (persist/publish h "s" "x" 0 {})
        (persist/publish h "s" "x" 0 {})
        (persist/kv-get b "count"))))
  3)

; The callback stores persist/event-type of its third argument under "last";
; publishing with type "created" is expected to read back "created".
(persist-test
  "callback receives the event"
  (let ((b (persist/open)))
    (let ((h (persist/hub b)))
      (begin
        (persist/subscribe
          h
          "s"
          (fn (bk s e) (persist/kv-put bk "last" (persist/event-type e))))
        (persist/publish h "s" "created" 0 {})
        (persist/kv-get b "last"))))
  "created")

; The callback is registered on "s1" but the publish goes to "s2".
; kv-get-or presumably returns its third argument when the key is absent
; (confirm), so a result of 0 means the callback did not run.
(persist-test
  "subscriptions are per-stream"
  (let ((b (persist/open)))
    (let ((h (persist/hub b)))
      (begin
        (persist/subscribe
          h
          "s1"
          (fn (bk s e) (persist/kv-update bk "n" 0 (fn (n) (+ n 1)))))
        (persist/publish h "s2" "x" 0 {})
        (persist/kv-get-or b "n" 0))))
  0)

; Two callbacks on the same stream update different keys ("a" by +1,
; "b" by +10); one publish is expected to leave (1 10).
(persist-test
  "multiple subscribers all fire"
  (let ((b (persist/open)))
    (let ((h (persist/hub b)))
      (begin
        (persist/subscribe
          h
          "s"
          (fn (bk s e) (persist/kv-update bk "a" 0 (fn (n) (+ n 1)))))
        (persist/subscribe
          h
          "s"
          (fn (bk s e) (persist/kv-update bk "b" 0 (fn (n) (+ n 10)))))
        (persist/publish h "s" "x" 0 {})
        (list (persist/kv-get b "a") (persist/kv-get b "b")))))
  (list 1 10))

; "proj" is seeded with {:value 0 :seq 0}. On every publish the callback
; replaces "proj" with the result of persist/project-resume, called with the
; stream, a reducer that adds 1 per event, and the previously stored "proj".
; After two publishes persist/project-value of the stored record is expected
; to be 2.
; NOTE(review): presumably project-resume folds only events after the stored
; :seq — confirm against its definition.
(persist-test
  "incremental read model via resume in callback"
  (let ((b (persist/open)))
    (let ((h (persist/hub b)))
      (begin
        (persist/kv-put b "proj" {:value 0 :seq 0})
        (persist/subscribe
          h
          "s"
          (fn (bk s e)
            (persist/kv-put
              bk
              "proj"
              (persist/project-resume
                bk
                s
                (fn (acc ev) (+ acc 1))
                (persist/kv-get bk "proj")))))
        (persist/publish h "s" "x" 0 {})
        (persist/publish h "s" "x" 0 {})
        (persist/project-value (persist/kv-get b "proj")))))
  2)