From aff7d1e84ff0ed24aa72269b22b6ff771f3849d2 Mon Sep 17 00:00:00 2001 From: giles Date: Sat, 6 Jun 2026 18:42:06 +0000 Subject: [PATCH] =?UTF-8?q?persist:=20compaction=20=E2=80=94=20drop=20snap?= =?UTF-8?q?shotted=20prefix,=20monotonic=20seq=20+=2011=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Backend now tracks last-seq as a monotonic high-water mark (survives truncation) and exposes :truncate-through. compaction.sx: persist/compact checkpoints then drops events with seq <= snapshot seq; should-compact?/ maybe-compact give an explicit every-N policy. Determinism: post-compaction replay value == uncompacted full replay. Phase 3 complete, 76/76. Co-Authored-By: Claude Opus 4.8 (1M context) --- lib/persist/backend.sx | 19 +++-- lib/persist/compaction.sx | 43 +++++++++++ lib/persist/conformance.sh | 3 +- lib/persist/log.sx | 26 ++++--- lib/persist/scoreboard.json | 7 +- lib/persist/scoreboard.md | 3 +- lib/persist/tests/compaction.sx | 124 ++++++++++++++++++++++++++++++++ plans/persist-on-sx.md | 13 +++- 8 files changed, 217 insertions(+), 21 deletions(-) create mode 100644 lib/persist/compaction.sx create mode 100644 lib/persist/tests/compaction.sx diff --git a/lib/persist/backend.sx b/lib/persist/backend.sx index 150f9911..ff16cfd9 100644 --- a/lib/persist/backend.sx +++ b/lib/persist/backend.sx @@ -1,19 +1,30 @@ ; persist/backend — the injected storage protocol. Every facet (log, kv, ; snapshot) goes through a backend dict, never touching storage directly, so ; file/pg/ipfs-ref backends swap in unchanged. A backend is a dict of fns: -; {:append :read :kv-get :kv-put :kv-delete :kv-has? :kv-keys} -; The in-memory backend is the test default. State is two dicts held in a -; closure and mutated with set!: logs (stream -> event list) and kv. +; {:append :read :last-seq :truncate-through +; :kv-get :kv-put :kv-delete :kv-has? :kv-keys} +; The in-memory backend is the test default. State is three dicts held in a +; closure and mutated with set!: logs (stream -> event list), seqs (stream -> +; last assigned seq — a monotonic high-water mark that survives compaction so +; truncating the log prefix never lets a future append reuse a seq), kv. (define persist/mem-backend - (fn () (let ((logs {}) (kv {})) {:kv-keys (fn () (keys kv)) :read (fn (stream) (let ((cur (get logs stream))) (if cur cur (list)))) :kv-has? (fn (key) (has-key? kv key)) :append (fn (stream event) (let ((cur (get logs stream))) (set! logs (assoc logs stream (append (if cur cur (list)) event))))) :kv-delete (fn (key) (set! kv (dissoc kv key))) :kv-put (fn (key val) (set! kv (assoc kv key val))) :kv-get (fn (key) (get kv key))}))) + (fn + () + (let ((logs {}) (seqs {}) (kv {})) {:truncate-through (fn (stream n) (let ((cur (get logs stream))) (set! logs (assoc logs stream (filter (fn (e) (> (persist/event-seq e) n)) (if cur cur (list))))))) :kv-keys (fn () (keys kv)) :read (fn (stream) (let ((cur (get logs stream))) (if cur cur (list)))) :kv-has? (fn (key) (has-key? kv key)) :last-seq (fn (stream) (let ((s (get seqs stream))) (if s s 0))) :append (fn (stream event) (begin (let ((cur (get logs stream))) (set! logs (assoc logs stream (append (if cur cur (list)) event)))) (set! seqs (assoc seqs stream (persist/event-seq event))))) :kv-delete (fn (key) (set! kv (dissoc kv key))) :kv-put (fn (key val) (set! kv (assoc kv key val))) :kv-get (fn (key) (get kv key))}))) ; protocol accessors — call a backend op by keyword (define persist/backend-append (fn (b stream event) ((get b :append) stream event))) (define persist/backend-read (fn (b stream) ((get b :read) stream))) +(define + persist/backend-last-seq + (fn (b stream) ((get b :last-seq) stream))) +(define + persist/backend-truncate + (fn (b stream n) ((get b :truncate-through) stream n))) (define persist/backend-kv-get (fn (b key) ((get b :kv-get) key))) (define persist/backend-kv-put (fn (b key val) ((get b :kv-put) key val))) (define persist/backend-kv-delete (fn (b key) ((get b :kv-delete) key))) diff --git a/lib/persist/compaction.sx b/lib/persist/compaction.sx new file mode 100644 index 00000000..e8b53de6 --- /dev/null +++ b/lib/persist/compaction.sx @@ -0,0 +1,43 @@ +; persist/compaction — once a snapshot subsumes a log prefix, those events are +; dead weight: replay starts from the snapshot, so events with seq <= the +; snapshot's seq are never folded again. Compaction checkpoints then truncates +; that prefix. The seq counter is monotonic (backend high-water mark) so future +; appends keep climbing — the surviving tail keeps its original seqs and replay +; from the snapshot still equals a full replay of the pre-compaction log. +; Policy is explicit: compact when the uncompacted tail reaches `every` events. +; Requires: lib/persist/snapshot.sx, lib/persist/log.sx. + +; events accumulated since the last snapshot for name +(define + persist/uncompacted + (fn + (b stream name seed) + (- + (persist/last-seq b stream) + (persist/project-seq (persist/snapshot-load b name seed))))) + +; policy: should we compact yet? tail since snapshot >= every +(define + persist/should-compact? + (fn + (b stream name every seed) + (>= (persist/uncompacted b stream name seed) every))) + +; checkpoint then drop the snapshotted prefix; returns the new snapshot state +(define + persist/compact + (fn + (b stream name step seed) + (let + ((state (persist/checkpoint b stream name step seed))) + (begin (persist/truncate b stream (persist/project-seq state)) state)))) + +; compact only if the policy fires; always returns the current snapshot state +(define + persist/maybe-compact + (fn + (b stream name step seed every) + (if + (persist/should-compact? b stream name every seed) + (persist/compact b stream name step seed) + (persist/snapshot-load b name seed)))) diff --git a/lib/persist/conformance.sh b/lib/persist/conformance.sh index b03e1b02..f20bc699 100755 --- a/lib/persist/conformance.sh +++ b/lib/persist/conformance.sh @@ -13,7 +13,7 @@ if [ ! -x "$SX_SERVER" ]; then exit 1 fi -SUITES=(event log kv project subscribe concurrency snapshot) +SUITES=(event log kv project subscribe concurrency snapshot compaction) OUT_JSON="lib/persist/scoreboard.json" OUT_MD="lib/persist/scoreboard.md" @@ -34,6 +34,7 @@ run_suite() { (load "lib/persist/project.sx") (load "lib/persist/concurrency.sx") (load "lib/persist/snapshot.sx") +(load "lib/persist/compaction.sx") (load "lib/persist/subscribe.sx") (load "lib/persist/api.sx") (epoch 2) diff --git a/lib/persist/log.sx b/lib/persist/log.sx index 46235f4c..56fb2ebe 100644 --- a/lib/persist/log.sx +++ b/lib/persist/log.sx @@ -1,14 +1,18 @@ -; persist/log — the log facet: append-only event streams. seq is assigned -; sequentially per stream (1-based). Reads return events oldest-first. +; persist/log — the log facet: append-only event streams. seq is assigned from +; a monotonic per-stream high-water mark (1-based) held by the backend, so it +; keeps climbing even after the log prefix is compacted away. Reads return the +; events currently stored, oldest-first. ; Requires: lib/persist/event.sx, lib/persist/backend.sx. -; current length of a stream +; logical last seq assigned in a stream (0 if none) — survives compaction (define - persist/stream-len - (fn (b stream) (len (persist/backend-read b stream)))) + persist/last-seq + (fn (b stream) (persist/backend-last-seq b stream))) -; last seq in a stream (0 if empty) -(define persist/last-seq (fn (b stream) (persist/stream-len b stream))) +; number of events physically stored in a stream (shrinks on compaction) +(define + persist/count + (fn (b stream) (len (persist/backend-read b stream)))) ; append an event, auto-assigning the next seq. Returns the stored event. (define @@ -21,7 +25,7 @@ ((ev (persist/event stream seq type at data))) (begin (persist/backend-append b stream ev) ev))))) -; read all events in a stream, oldest-first +; read all events currently stored in a stream, oldest-first (define persist/read (fn (b stream) (persist/backend-read b stream))) ; read events with seq >= from @@ -33,5 +37,7 @@ (fn (e) (>= (persist/event-seq e) from)) (persist/read b stream)))) -; number of events in a stream -(define persist/count (fn (b stream) (persist/stream-len b stream))) +; drop events with seq <= n (compaction); the seq counter is untouched +(define + persist/truncate + (fn (b stream n) (persist/backend-truncate b stream n))) diff --git a/lib/persist/scoreboard.json b/lib/persist/scoreboard.json index 2befd965..1aa5de14 100644 --- a/lib/persist/scoreboard.json +++ b/lib/persist/scoreboard.json @@ -6,9 +6,10 @@ "project": {"pass": 9, "fail": 0}, "subscribe": {"pass": 9, "fail": 0}, "concurrency": {"pass": 8, "fail": 0}, - "snapshot": {"pass": 11, "fail": 0} + "snapshot": {"pass": 11, "fail": 0}, + "compaction": {"pass": 11, "fail": 0} }, - "total_pass": 65, + "total_pass": 76, "total_fail": 0, - "total": 65 + "total": 76 } diff --git a/lib/persist/scoreboard.md b/lib/persist/scoreboard.md index 4bb832d0..a1593ad6 100644 --- a/lib/persist/scoreboard.md +++ b/lib/persist/scoreboard.md @@ -11,4 +11,5 @@ _Generated by `lib/persist/conformance.sh`_ | subscribe | 9 | 0 | 9 | | concurrency | 8 | 0 | 8 | | snapshot | 11 | 0 | 11 | -| **Total** | **65** | **0** | **65** | +| compaction | 11 | 0 | 11 | +| **Total** | **76** | **0** | **76** | diff --git a/lib/persist/tests/compaction.sx b/lib/persist/tests/compaction.sx new file mode 100644 index 00000000..a21531cb --- /dev/null +++ b/lib/persist/tests/compaction.sx @@ -0,0 +1,124 @@ +; Phase 3 — compaction: drop the snapshotted prefix; replay determinism holds. + +(define comp-count (fn (acc e) (+ acc 1))) + +(persist-test + "uncompacted counts events since snapshot" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "x" 0 {}) + (persist/append b "s" "x" 0 {}) + (persist/uncompacted b "s" "snap" 0))) + 2) +(persist-test + "should-compact? false below threshold" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "x" 0 {}) + (persist/should-compact? b "s" "snap" 3 0))) + false) +(persist-test + "should-compact? true at threshold" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "x" 0 {}) + (persist/append b "s" "x" 0 {}) + (persist/append b "s" "x" 0 {}) + (persist/should-compact? b "s" "snap" 3 0))) + true) +(persist-test + "compact truncates the snapshotted prefix" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "x" 0 {}) + (persist/append b "s" "x" 0 {}) + (persist/append b "s" "x" 0 {}) + (persist/compact b "s" "snap" comp-count 0) + (persist/count b "s"))) + 0) +(persist-test + "compact preserves logical last-seq" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "x" 0 {}) + (persist/append b "s" "x" 0 {}) + (persist/compact b "s" "snap" comp-count 0) + (persist/last-seq b "s"))) + 2) +(persist-test + "append after compaction continues the seq" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "x" 0 {}) + (persist/append b "s" "x" 0 {}) + (persist/compact b "s" "snap" comp-count 0) + (persist/event-seq (persist/append b "s" "x" 0 {})))) + 3) +(persist-test + "replay after compaction == full count before compaction" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "x" 0 {}) + (persist/append b "s" "x" 0 {}) + (persist/append b "s" "x" 0 {}) + (persist/compact b "s" "snap" comp-count 0) + (persist/append b "s" "x" 0 {}) + (persist/append b "s" "x" 0 {}) + (persist/project-value + (persist/replay b "s" "snap" comp-count 0)))) + 5) +(persist-test + "determinism: post-compaction replay value equals uncompacted full replay" + (let + ((b (persist/open)) (c (persist/open))) + (begin + (persist/append b "s" "x" 0 {}) + (persist/append b "s" "x" 0 {}) + (persist/append b "s" "x" 0 {}) + (persist/append c "s" "x" 0 {}) + (persist/append c "s" "x" 0 {}) + (persist/append c "s" "x" 0 {}) + (persist/compact b "s" "snap" comp-count 0) + (persist/append b "s" "x" 0 {}) + (persist/append c "s" "x" 0 {}) + (equal? + (persist/project-value + (persist/replay b "s" "snap" comp-count 0)) + (persist/project-fold c "s" comp-count 0)))) + true) +(persist-test + "maybe-compact below threshold does not truncate" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "x" 0 {}) + (persist/maybe-compact b "s" "snap" comp-count 0 5) + (persist/count b "s"))) + 1) +(persist-test + "maybe-compact at threshold truncates" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "x" 0 {}) + (persist/append b "s" "x" 0 {}) + (persist/maybe-compact b "s" "snap" comp-count 0 2) + (persist/count b "s"))) + 0) +(persist-test + "compact is idempotent on an empty tail" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "x" 0 {}) + (persist/compact b "s" "snap" comp-count 0) + (persist/project-value + (persist/compact b "s" "snap" comp-count 0)))) + 1) diff --git a/plans/persist-on-sx.md b/plans/persist-on-sx.md index 9917f527..ef572b64 100644 --- a/plans/persist-on-sx.md +++ b/plans/persist-on-sx.md @@ -42,7 +42,7 @@ read models (feeds, indices, audit logs) update incrementally. ## Status (rolling) -`bash lib/persist/conformance.sh` → **54/54** (Phases 1–2 done) +`bash lib/persist/conformance.sh` → **76/76** (Phases 1–3 done) ## Ground rules @@ -100,7 +100,7 @@ lib/persist/backend.sx lib/persist/api.sx ## Phase 3 — Snapshots + replay - [x] `snapshot.sx` — checkpoint a projection; replay = snapshot + tail -- [ ] compaction policy; replay-determinism tests +- [x] compaction policy; replay-determinism tests ## Phase 4 — Durable backends via kernel IO - [ ] file/log backend driven through `perform` (IO-suspension boundary) @@ -113,6 +113,15 @@ feed/-log, flow store, mod/audit, search index, acl grants, identity sessions al become `persist` log or kv. Track each migration in that subsystem's plan. ## Progress log +- **Phase 3b (76/76) — Phase 3 complete.** Backend refactor: `last-seq` is now + a monotonic per-stream high-water mark (backend `seqs` dict), not physical + length, so a compacted log keeps assigning climbing seqs. Added backend + `:truncate-through` + `persist/truncate`. `compaction.sx` — `persist/compact` + checkpoints then drops events with seq <= snapshot seq; `should-compact?`/ + `maybe-compact` give an explicit "compact every N tail events" policy. 11 + tests: post-compaction replay value == uncompacted full replay (determinism), + seq continuity after truncation, idempotence. `persist/count` = physical + stored count (shrinks on compaction) vs `persist/last-seq` = logical. - **Phase 3a (65/65).** `snapshot.sx` — a snapshot is a projection state `{:value :seq}` stored in the kv facet under `snapshot/`. `persist/checkpoint` replays + saves; `persist/replay` = snapshot + tail.