Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 1m6s
subscribe.sx: persist/hub wraps a backend; persist/publish appends then fires per-stream callbacks (backend stream event). Direct persist/append bypasses subscribers (bulk load/replay). Callbacks drive kv counters / project-resume. 46/46. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
22 lines
1.2 KiB
Plaintext
22 lines
1.2 KiB
Plaintext
; persist/subscribe — a subscription hub wraps a backend with per-stream
|
|
; callbacks fired after each append. The canonical use: a callback re-runs a
|
|
; projection (or bumps a kv counter) so read models update incrementally on
|
|
; write instead of being recomputed on read.
|
|
; callback signature: (backend stream event); the callback's return value is ignored
|
|
; Publish goes through the hub; direct persist/append on the backend bypasses
|
|
; subscribers by design (bulk loads, replay).
|
|
; Requires: lib/persist/log.sx.
|
|
|
|
; persist/hub — build a subscription hub around backend `b`.
;
; Returns a dict of closures sharing one private registry that maps a stream
; to the list of callbacks subscribed to it:
;   :subscriber-count (stream)          -> number of callbacks registered on
;                                          `stream`, 0 when there are none.
;   :publish (stream type at data)      -> calls persist/append on `b`, then
;                                          calls every callback registered on
;                                          `stream` as (cb b stream ev), and
;                                          returns the appended event `ev`.
;   :subscribe (stream cb)              -> adds `cb` to the callbacks of
;                                          `stream`; the result is whatever
;                                          set! yields.
;   :backend                            -> the wrapped backend `b` itself.
;
; persist/append is not defined in this file (presumably lib/persist/log.sx,
; per the header's "Requires" note — confirm). Callbacks are expected to fire
; in registration order, assuming `append` adds at the tail and `for-each`
; walks front to back — TODO confirm against the runtime.
(define persist/hub
  (fn (b)
    (let ((registry {}))
      {:subscriber-count
         (fn (stream)
           (let ((listeners (get registry stream)))
             (if listeners (len listeners) 0)))
       :publish
         (fn (stream type at data)
           (let ((ev (persist/append b stream type at data)))
             (let ((listeners (get registry stream)))
               (begin
                 (for-each
                   (fn (notify) (notify b stream ev))
                   (if listeners listeners (list)))
                 ev))))
       :subscribe
         (fn (stream cb)
           (let ((existing (get registry stream)))
             (set! registry
                   (assoc registry
                          stream
                          (append (if existing existing (list)) cb)))))
       :backend b})))
|
|
|
|
; persist/hub-backend — return the value stored under :backend in hub `h`
; (the backend that persist/hub was given when the hub was built).
(define
  persist/hub-backend
  (fn (h)
    (get h :backend)))
|
|
; persist/subscribe — register callback `cb` on `stream` via hub `h`.
; Looks up the hub's :subscribe closure and applies it to (stream cb);
; the result is whatever that closure returns.
(define persist/subscribe
  (fn (h stream cb)
    (let ((register (get h :subscribe)))
      (register stream cb))))
|
|
; persist/publish — publish an event on `stream` through hub `h`.
; Looks up the hub's :publish closure and applies it to
; (stream type at data), returning that closure's result.
(define persist/publish
  (fn (h stream type at data)
    (let ((emit (get h :publish)))
      (emit stream type at data))))
|
|
; persist/subscriber-count — number of callbacks registered on `stream`.
; Looks up the hub's :subscriber-count closure and applies it to `stream`,
; returning that closure's result.
(define persist/subscriber-count
  (fn (h stream)
    (let ((count-for (get h :subscriber-count)))
      (count-for stream))))
|