; persist/subscribe — subscription hub.
;
; A hub wraps a backend and keeps, per stream, a list of callbacks that are
; fired after each append made through the hub. The canonical use: a callback
; re-runs a projection (or bumps a kv counter) so read models update
; incrementally on write instead of being recomputed on read.
;
;   callback signature: (backend stream event) -> ignored
;
; Publish goes through the hub; direct persist/append on the backend bypasses
; subscribers by design (bulk loads, replay).
;
; Requires: lib/persist/log.sx.

; Build a hub around backend `b`.
; The result is a dict of closures sharing one private stream -> callbacks
; registry, plus the wrapped backend under :backend.
(define persist/hub
  (fn (b)
    (let ((registry {}))
      {:subscriber-count
       ; Number of callbacks registered for `stream`; 0 when there are none.
       (fn (stream)
         (len (or (get registry stream) (list))))

       :publish
       ; Append via persist/append first, then hand the resulting event to
       ; each callback registered for `stream`. Callback results are
       ; discarded; the appended event is the return value.
       (fn (stream type at data)
         (let ((ev (persist/append b stream type at data)))
           (let ((listeners (or (get registry stream) (list))))
             (begin
               (for-each (fn (notify) (notify b stream ev)) listeners)
               ev))))

       :subscribe
       ; Add `cb` at the end of the callback list for `stream`, starting
       ; from an empty list when the stream has no entry yet.
       (fn (stream cb)
         (let ((existing (or (get registry stream) (list))))
           (set! registry (assoc registry stream (append existing cb)))))

       :backend b})))

; The backend the hub `h` was built around.
(define persist/hub-backend
  (fn (h)
    (get h :backend)))

; Register callback `cb` on `stream` of hub `h`.
(define persist/subscribe
  (fn (h stream cb)
    (let ((do-subscribe (get h :subscribe)))
      (do-subscribe stream cb))))

; Append an event through hub `h` so that the stream's callbacks fire.
; Returns whatever the hub's :publish closure returns (the appended event).
(define persist/publish
  (fn (h stream type at data)
    (let ((do-publish (get h :publish)))
      (do-publish stream type at data))))

; How many callbacks hub `h` holds for `stream`.
(define persist/subscriber-count
  (fn (h stream)
    (let ((count-for (get h :subscriber-count)))
      (count-for stream))))