persist: stream catalog — enumerate streams + 10 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 30s
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 30s
New backend op :streams (from seq high-water marks, so compacted streams still list), threaded through mem-backend + durable serve/io-backend. catalog.sx: persist/streams, stream-count, stream-exists?, total-events. 143/143. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,18 +1,19 @@
|
||||
; persist/backend — the injected storage protocol. Every facet (log, kv,
|
||||
; snapshot) goes through a backend dict, never touching storage directly, so
|
||||
; file/pg/ipfs-ref backends swap in unchanged. A backend is a dict of fns:
|
||||
; {:append :read :last-seq :truncate-through
|
||||
; {:append :read :last-seq :truncate-through :streams
|
||||
; :kv-get :kv-put :kv-delete :kv-has? :kv-keys}
|
||||
; The in-memory backend is the test default. State is three dicts held in a
|
||||
; closure and mutated with set!: logs (stream -> event list), seqs (stream ->
|
||||
; last assigned seq — a monotonic high-water mark that survives compaction so
|
||||
; truncating the log prefix never lets a future append reuse a seq), kv.
|
||||
; truncating the log prefix never lets a future append reuse a seq), kv. The
|
||||
; stream catalog comes from seqs, so a fully-compacted stream still lists.
|
||||
|
||||
(define
|
||||
persist/mem-backend
|
||||
(fn
|
||||
()
|
||||
(let ((logs {}) (seqs {}) (kv {})) {:truncate-through (fn (stream n) (let ((cur (get logs stream))) (set! logs (assoc logs stream (filter (fn (e) (> (persist/event-seq e) n)) (if cur cur (list))))))) :kv-keys (fn () (keys kv)) :read (fn (stream) (let ((cur (get logs stream))) (if cur cur (list)))) :kv-has? (fn (key) (has-key? kv key)) :last-seq (fn (stream) (let ((s (get seqs stream))) (if s s 0))) :append (fn (stream event) (begin (let ((cur (get logs stream))) (set! logs (assoc logs stream (append (if cur cur (list)) event)))) (set! seqs (assoc seqs stream (persist/event-seq event))))) :kv-delete (fn (key) (set! kv (dissoc kv key))) :kv-put (fn (key val) (set! kv (assoc kv key val))) :kv-get (fn (key) (get kv key))})))
|
||||
(let ((logs {}) (seqs {}) (kv {})) {:truncate-through (fn (stream n) (let ((cur (get logs stream))) (set! logs (assoc logs stream (filter (fn (e) (> (persist/event-seq e) n)) (if cur cur (list))))))) :kv-keys (fn () (keys kv)) :read (fn (stream) (let ((cur (get logs stream))) (if cur cur (list)))) :kv-has? (fn (key) (has-key? kv key)) :last-seq (fn (stream) (let ((s (get seqs stream))) (if s s 0))) :streams (fn () (keys seqs)) :append (fn (stream event) (begin (let ((cur (get logs stream))) (set! logs (assoc logs stream (append (if cur cur (list)) event)))) (set! seqs (assoc seqs stream (persist/event-seq event))))) :kv-delete (fn (key) (set! kv (dissoc kv key))) :kv-put (fn (key val) (set! kv (assoc kv key val))) :kv-get (fn (key) (get kv key))})))
|
||||
|
||||
; protocol accessors — call a backend op by keyword
|
||||
(define
|
||||
@@ -22,6 +23,7 @@
|
||||
(define
|
||||
persist/backend-last-seq
|
||||
(fn (b stream) ((get b :last-seq) stream)))
|
||||
(define persist/backend-streams (fn (b) ((get b :streams))))
|
||||
(define
|
||||
persist/backend-truncate
|
||||
(fn (b stream n) ((get b :truncate-through) stream n)))
|
||||
|
||||
Reference in New Issue
Block a user