From a00439da6eec617f77a14126fcae244b712769e4 Mon Sep 17 00:00:00 2001 From: giles Date: Sat, 6 Jun 2026 19:20:22 +0000 Subject: [PATCH] =?UTF-8?q?persist:=20stream=20catalog=20=E2=80=94=20enume?= =?UTF-8?q?rate=20streams=20+=2010=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New backend op :streams (from seq high-water marks, so compacted streams still list), threaded through mem-backend + durable serve/io-backend. catalog.sx: persist/streams, stream-count, stream-exists?, total-events. 143/143. Co-Authored-By: Claude Opus 4.8 (1M context) --- lib/persist/backend.sx | 8 ++-- lib/persist/catalog.sx | 20 +++++++++ lib/persist/conformance.sh | 3 +- lib/persist/durable.sx | 4 +- lib/persist/scoreboard.json | 5 ++- lib/persist/scoreboard.md | 3 +- lib/persist/tests/catalog.sx | 86 ++++++++++++++++++++++++++++++++++++ plans/persist-on-sx.md | 11 ++++- 8 files changed, 131 insertions(+), 9 deletions(-) create mode 100644 lib/persist/catalog.sx create mode 100644 lib/persist/tests/catalog.sx diff --git a/lib/persist/backend.sx b/lib/persist/backend.sx index ff16cfd9..133f2280 100644 --- a/lib/persist/backend.sx +++ b/lib/persist/backend.sx @@ -1,18 +1,19 @@ ; persist/backend — the injected storage protocol. Every facet (log, kv, ; snapshot) goes through a backend dict, never touching storage directly, so ; file/pg/ipfs-ref backends swap in unchanged. A backend is a dict of fns: -; {:append :read :last-seq :truncate-through +; {:append :read :last-seq :truncate-through :streams ; :kv-get :kv-put :kv-delete :kv-has? :kv-keys} ; The in-memory backend is the test default. State is three dicts held in a ; closure and mutated with set!: logs (stream -> event list), seqs (stream -> ; last assigned seq — a monotonic high-water mark that survives compaction so -; truncating the log prefix never lets a future append reuse a seq), kv. +; truncating the log prefix never lets a future append reuse a seq), kv. The +; stream catalog comes from seqs, so a fully-compacted stream still lists. (define persist/mem-backend (fn () - (let ((logs {}) (seqs {}) (kv {})) {:truncate-through (fn (stream n) (let ((cur (get logs stream))) (set! logs (assoc logs stream (filter (fn (e) (> (persist/event-seq e) n)) (if cur cur (list))))))) :kv-keys (fn () (keys kv)) :read (fn (stream) (let ((cur (get logs stream))) (if cur cur (list)))) :kv-has? (fn (key) (has-key? kv key)) :last-seq (fn (stream) (let ((s (get seqs stream))) (if s s 0))) :append (fn (stream event) (begin (let ((cur (get logs stream))) (set! logs (assoc logs stream (append (if cur cur (list)) event)))) (set! seqs (assoc seqs stream (persist/event-seq event))))) :kv-delete (fn (key) (set! kv (dissoc kv key))) :kv-put (fn (key val) (set! kv (assoc kv key val))) :kv-get (fn (key) (get kv key))}))) + (let ((logs {}) (seqs {}) (kv {})) {:truncate-through (fn (stream n) (let ((cur (get logs stream))) (set! logs (assoc logs stream (filter (fn (e) (> (persist/event-seq e) n)) (if cur cur (list))))))) :kv-keys (fn () (keys kv)) :read (fn (stream) (let ((cur (get logs stream))) (if cur cur (list)))) :kv-has? (fn (key) (has-key? kv key)) :last-seq (fn (stream) (let ((s (get seqs stream))) (if s s 0))) :streams (fn () (keys seqs)) :append (fn (stream event) (begin (let ((cur (get logs stream))) (set! logs (assoc logs stream (append (if cur cur (list)) event)))) (set! seqs (assoc seqs stream (persist/event-seq event))))) :kv-delete (fn (key) (set! kv (dissoc kv key))) :kv-put (fn (key val) (set! kv (assoc kv key val))) :kv-get (fn (key) (get kv key))}))) ; protocol accessors — call a backend op by keyword (define @@ -22,6 +23,7 @@ (define persist/backend-last-seq (fn (b stream) ((get b :last-seq) stream))) +(define persist/backend-streams (fn (b) ((get b :streams)))) (define persist/backend-truncate (fn (b stream n) ((get b :truncate-through) stream n))) diff --git a/lib/persist/catalog.sx b/lib/persist/catalog.sx new file mode 100644 index 00000000..fb0015ec --- /dev/null +++ b/lib/persist/catalog.sx @@ -0,0 +1,20 @@ +; persist/catalog — enumerate the streams a backend holds. The catalog is the +; set of streams ever appended to (from the seq high-water marks), so a stream +; whose log has been fully compacted still appears. For admin, global ops, and +; cross-stream tooling. Requires: lib/persist/backend.sx, lib/persist/log.sx. + +(define persist/streams (fn (b) (persist/backend-streams b))) +(define persist/stream-count (fn (b) (len (persist/streams b)))) +(define + persist/stream-exists? + (fn (b stream) (contains? (persist/streams b) stream))) + +; total logical events across all streams (sum of high-water marks) +(define + persist/total-events + (fn + (b) + (reduce + (fn (acc s) (+ acc (persist/last-seq b s))) + 0 + (persist/streams b)))) diff --git a/lib/persist/conformance.sh b/lib/persist/conformance.sh index 50c502c8..9e55295c 100755 --- a/lib/persist/conformance.sh +++ b/lib/persist/conformance.sh @@ -13,7 +13,7 @@ if [ ! -x "$SX_SERVER" ]; then exit 1 fi -SUITES=(event log kv project subscribe concurrency snapshot compaction durable blob view cas recovery) +SUITES=(event log kv project subscribe concurrency snapshot compaction durable blob view cas catalog recovery) OUT_JSON="lib/persist/scoreboard.json" OUT_MD="lib/persist/scoreboard.md" @@ -38,6 +38,7 @@ run_suite() { (load "lib/persist/durable.sx") (load "lib/persist/blob.sx") (load "lib/persist/view.sx") +(load "lib/persist/catalog.sx") (load "lib/persist/subscribe.sx") (load "lib/persist/api.sx") (epoch 2) diff --git a/lib/persist/durable.sx b/lib/persist/durable.sx index af1180fe..b7e50ec9 100644 --- a/lib/persist/durable.sx +++ b/lib/persist/durable.sx @@ -12,6 +12,7 @@ (define persist/req-append (fn (stream event) {:op "persist/append" :args (list stream event)})) (define persist/req-read (fn (stream) {:op "persist/read" :args (list stream)})) (define persist/req-last-seq (fn (stream) {:op "persist/last-seq" :args (list stream)})) +(define persist/req-streams (fn () {:op "persist/streams" :args (list)})) (define persist/req-truncate (fn (stream n) {:op "persist/truncate" :args (list stream n)})) (define persist/req-kv-get (fn (key) {:op "persist/kv-get" :args (list key)})) (define persist/req-kv-put (fn (key val) {:op "persist/kv-put" :args (list key val)})) @@ -20,7 +21,7 @@ (define persist/req-kv-keys (fn () {:op "persist/kv-keys" :args (list)})) ; a backend parameterized over a transport (req -> response) -(define persist/io-backend (fn (transport) {:truncate-through (fn (stream n) (transport (persist/req-truncate stream n))) :kv-keys (fn () (transport (persist/req-kv-keys))) :read (fn (stream) (transport (persist/req-read stream))) :kv-has? (fn (key) (transport (persist/req-kv-has? key))) :last-seq (fn (stream) (transport (persist/req-last-seq stream))) :append (fn (stream event) (transport (persist/req-append stream event))) :kv-delete (fn (key) (transport (persist/req-kv-delete key))) :kv-put (fn (key val) (transport (persist/req-kv-put key val))) :kv-get (fn (key) (transport (persist/req-kv-get key)))})) +(define persist/io-backend (fn (transport) {:truncate-through (fn (stream n) (transport (persist/req-truncate stream n))) :kv-keys (fn () (transport (persist/req-kv-keys))) :read (fn (stream) (transport (persist/req-read stream))) :kv-has? (fn (key) (transport (persist/req-kv-has? key))) :last-seq (fn (stream) (transport (persist/req-last-seq stream))) :streams (fn () (transport (persist/req-streams))) :append (fn (stream event) (transport (persist/req-append stream event))) :kv-delete (fn (key) (transport (persist/req-kv-delete key))) :kv-put (fn (key val) (transport (persist/req-kv-put key val))) :kv-get (fn (key) (transport (persist/req-kv-get key)))})) ; production backend — transport is the kernel's perform (suspends; host resumes) (define @@ -43,6 +44,7 @@ (persist/backend-read disk (first args))) ((equal? op "persist/last-seq") (persist/backend-last-seq disk (first args))) + ((equal? op "persist/streams") (persist/backend-streams disk)) ((equal? op "persist/truncate") (persist/backend-truncate disk (first args) (nth args 1))) ((equal? op "persist/kv-get") diff --git a/lib/persist/scoreboard.json b/lib/persist/scoreboard.json index 25e612b6..ae152373 100644 --- a/lib/persist/scoreboard.json +++ b/lib/persist/scoreboard.json @@ -12,9 +12,10 @@ "blob": {"pass": 14, "fail": 0}, "view": {"pass": 11, "fail": 0}, "cas": {"pass": 11, "fail": 0}, + "catalog": {"pass": 10, "fail": 0}, "recovery": {"pass": 6, "fail": 0} }, - "total_pass": 133, + "total_pass": 143, "total_fail": 0, - "total": 133 + "total": 143 } diff --git a/lib/persist/scoreboard.md b/lib/persist/scoreboard.md index b4bfb284..179e745d 100644 --- a/lib/persist/scoreboard.md +++ b/lib/persist/scoreboard.md @@ -16,5 +16,6 @@ _Generated by `lib/persist/conformance.sh`_ | blob | 14 | 0 | 14 | | view | 11 | 0 | 11 | | cas | 11 | 0 | 11 | +| catalog | 10 | 0 | 10 | | recovery | 6 | 0 | 6 | -| **Total** | **133** | **0** | **133** | +| **Total** | **143** | **0** | **143** | diff --git a/lib/persist/tests/catalog.sx b/lib/persist/tests/catalog.sx new file mode 100644 index 00000000..9c30a739 --- /dev/null +++ b/lib/persist/tests/catalog.sx @@ -0,0 +1,86 @@ +; Extension — stream catalog: enumerate streams, count, existence, totals. + +(persist-test + "empty backend has no streams" + (persist/stream-count (persist/open)) + 0) +(persist-test + "stream-exists? false when absent" + (persist/stream-exists? (persist/open) "orders") + false) +(persist-test + "append registers a stream" + (let + ((b (persist/open))) + (begin + (persist/append b "orders" "x" 0 {}) + (persist/stream-exists? b "orders"))) + true) +(persist-test + "stream-count counts distinct streams" + (let + ((b (persist/open))) + (begin + (persist/append b "a" "x" 0 {}) + (persist/append b "b" "x" 0 {}) + (persist/append b "a" "x" 0 {}) + (persist/stream-count b))) + 2) +(persist-test + "compacted-away stream still lists" + (let + ((b (persist/open))) + (begin + (persist/append b "a" "x" 0 {}) + (persist/checkpoint b "a" "snap" (fn (acc e) acc) 0) + (persist/truncate b "a" 1) + (list (persist/count b "a") (persist/stream-exists? b "a")))) + (list 0 true)) +(persist-test + "kv-only backend lists no streams" + (let + ((b (persist/open))) + (begin (persist/kv-put b "k" 1) (persist/stream-count b))) + 0) +(persist-test + "total-events sums high-water marks" + (let + ((b (persist/open))) + (begin + (persist/append b "a" "x" 0 {}) + (persist/append b "a" "x" 0 {}) + (persist/append b "b" "x" 0 {}) + (persist/total-events b))) + 3) +(persist-test + "total-events counts compacted events too" + (let + ((b (persist/open))) + (begin + (persist/append b "a" "x" 0 {}) + (persist/append b "a" "x" 0 {}) + (persist/checkpoint b "a" "snap" (fn (acc e) acc) 0) + (persist/truncate b "a" 2) + (persist/total-events b))) + 2) +(persist-test + "catalog works on the durable backend" + (let + ((db (persist/mock-durable (persist/mem-backend)))) + (begin + (persist/append db "a" "x" 0 {}) + (persist/append db "b" "x" 0 {}) + (persist/stream-count db))) + 2) +(persist-test + "catalog survives restart" + (let + ((disk (persist/mem-backend))) + (begin + (let + ((db (persist/mock-durable disk))) + (begin + (persist/append db "a" "x" 0 {}) + (persist/append db "b" "x" 0 {}))) + (persist/stream-count (persist/mock-durable disk)))) + 2) diff --git a/plans/persist-on-sx.md b/plans/persist-on-sx.md index 13a288a8..24ef8909 100644 --- a/plans/persist-on-sx.md +++ b/plans/persist-on-sx.md @@ -42,7 +42,7 @@ read models (feeds, indices, audit logs) update incrementally. ## Status (rolling) -`bash lib/persist/conformance.sh` → **133/133** (Phases 1–4 complete + extensions) +`bash lib/persist/conformance.sh` → **143/143** (Phases 1–4 complete + extensions) ## Ground rules @@ -154,11 +154,20 @@ over an in-process disk (the mock-IO harness). (create-only): atomic current-state updates, conflict as a real value (kv analogue of log `append-expect`). For sessions, acl grants, stock counts. +- [x] `catalog.sx` — stream catalog: `persist/streams`/`stream-count`/ + `stream-exists?`/`total-events`. Backend `:streams` op (from seq high-water + marks, so compacted streams still list), threaded through mem + durable. + ## Consumers (post-foundation, not in scope here) feed/-log, flow store, mod/audit, search index, acl grants, identity sessions all become `persist` log or kv. Track each migration in that subsystem's plan. ## Progress log +- **Ext: stream catalog (143/143).** New backend op `:streams` (keys of the seq + high-water-mark dict, threaded through mem-backend + durable serve/io-backend) + so fully-compacted streams still enumerate. `catalog.sx`: + `persist/streams`/`stream-count`/`stream-exists?`/`total-events`. 10 tests + incl. durable + restart. - **Ext: kv compare-and-swap (133/133).** `persist/kv-cas` sets a key only if its current value equals expected, else returns `{:conflict :expected :actual}`; `persist/kv-put-new` is create-only. The kv analogue of log