persist: compaction — drop snapshotted prefix, monotonic seq + 11 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 1m0s

Backend now tracks last-seq as a monotonic high-water mark (survives
truncation) and exposes :truncate-through. compaction.sx: persist/compact
checkpoints then drops events with seq <= snapshot seq; should-compact?/
maybe-compact give an explicit every-N policy. Determinism: post-compaction
replay value == uncompacted full replay. Phase 3 complete, 76/76.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-06 18:42:06 +00:00
parent b0874b1282
commit aff7d1e84f
8 changed files with 217 additions and 21 deletions

View File

@@ -1,19 +1,30 @@
; persist/backend — the injected storage protocol. Every facet (log, kv,
; snapshot) goes through a backend dict, never touching storage directly, so
; file/pg/ipfs-ref backends swap in unchanged. A backend is a dict of fns:
; {:append :read :kv-get :kv-put :kv-delete :kv-has? :kv-keys}
; The in-memory backend is the test default. State is two dicts held in a
; closure and mutated with set!: logs (stream -> event list) and kv.
; {:append :read :last-seq :truncate-through
; :kv-get :kv-put :kv-delete :kv-has? :kv-keys}
; The in-memory backend is the test default. State is three dicts held in a
; closure and mutated with set!: logs (stream -> event list), seqs (stream ->
; last assigned seq — a monotonic high-water mark that survives compaction so
; truncating the log prefix never lets a future append reuse a seq), kv.
(define
persist/mem-backend
(fn () (let ((logs {}) (kv {})) {:kv-keys (fn () (keys kv)) :read (fn (stream) (let ((cur (get logs stream))) (if cur cur (list)))) :kv-has? (fn (key) (has-key? kv key)) :append (fn (stream event) (let ((cur (get logs stream))) (set! logs (assoc logs stream (append (if cur cur (list)) event))))) :kv-delete (fn (key) (set! kv (dissoc kv key))) :kv-put (fn (key val) (set! kv (assoc kv key val))) :kv-get (fn (key) (get kv key))})))
(fn
()
(let ((logs {}) (seqs {}) (kv {})) {:truncate-through (fn (stream n) (let ((cur (get logs stream))) (set! logs (assoc logs stream (filter (fn (e) (> (persist/event-seq e) n)) (if cur cur (list))))))) :kv-keys (fn () (keys kv)) :read (fn (stream) (let ((cur (get logs stream))) (if cur cur (list)))) :kv-has? (fn (key) (has-key? kv key)) :last-seq (fn (stream) (let ((s (get seqs stream))) (if s s 0))) :append (fn (stream event) (begin (let ((cur (get logs stream))) (set! logs (assoc logs stream (append (if cur cur (list)) event)))) (set! seqs (assoc seqs stream (persist/event-seq event))))) :kv-delete (fn (key) (set! kv (dissoc kv key))) :kv-put (fn (key val) (set! kv (assoc kv key val))) :kv-get (fn (key) (get kv key))})))
; protocol accessors — call a backend op by keyword
(define
persist/backend-append
(fn (b stream event) ((get b :append) stream event)))
(define persist/backend-read (fn (b stream) ((get b :read) stream)))
(define
persist/backend-last-seq
(fn (b stream) ((get b :last-seq) stream)))
(define
persist/backend-truncate
(fn (b stream n) ((get b :truncate-through) stream n)))
(define persist/backend-kv-get (fn (b key) ((get b :kv-get) key)))
(define persist/backend-kv-put (fn (b key val) ((get b :kv-put) key val)))
(define persist/backend-kv-delete (fn (b key) ((get b :kv-delete) key)))

43
lib/persist/compaction.sx Normal file
View File

@@ -0,0 +1,43 @@
; persist/compaction — once a snapshot subsumes a log prefix, those events are
; dead weight: replay starts from the snapshot, so events with seq <= the
; snapshot's seq are never folded again. Compaction checkpoints then truncates
; that prefix. The seq counter is monotonic (backend high-water mark) so future
; appends keep climbing — the surviving tail keeps its original seqs and replay
; from the snapshot still equals a full replay of the pre-compaction log.
; Policy is explicit: compact when the uncompacted tail reaches `every` events.
; Requires: lib/persist/snapshot.sx, lib/persist/log.sx.
; events accumulated since the last snapshot for name
(define
persist/uncompacted
(fn
(b stream name seed)
(-
(persist/last-seq b stream)
(persist/project-seq (persist/snapshot-load b name seed)))))
; policy: should we compact yet? tail since snapshot >= every
(define
persist/should-compact?
(fn
(b stream name every seed)
(>= (persist/uncompacted b stream name seed) every)))
; checkpoint then drop the snapshotted prefix; returns the new snapshot state
(define
persist/compact
(fn
(b stream name step seed)
(let
((state (persist/checkpoint b stream name step seed)))
(begin (persist/truncate b stream (persist/project-seq state)) state))))
; compact only if the policy fires; always returns the current snapshot state
(define
persist/maybe-compact
(fn
(b stream name step seed every)
(if
(persist/should-compact? b stream name every seed)
(persist/compact b stream name step seed)
(persist/snapshot-load b name seed))))

View File

@@ -13,7 +13,7 @@ if [ ! -x "$SX_SERVER" ]; then
exit 1
fi
SUITES=(event log kv project subscribe concurrency snapshot)
SUITES=(event log kv project subscribe concurrency snapshot compaction)
OUT_JSON="lib/persist/scoreboard.json"
OUT_MD="lib/persist/scoreboard.md"
@@ -34,6 +34,7 @@ run_suite() {
(load "lib/persist/project.sx")
(load "lib/persist/concurrency.sx")
(load "lib/persist/snapshot.sx")
(load "lib/persist/compaction.sx")
(load "lib/persist/subscribe.sx")
(load "lib/persist/api.sx")
(epoch 2)

View File

@@ -1,14 +1,18 @@
; persist/log — the log facet: append-only event streams. seq is assigned
; sequentially per stream (1-based). Reads return events oldest-first.
; persist/log — the log facet: append-only event streams. seq is assigned from
; a monotonic per-stream high-water mark (1-based) held by the backend, so it
; keeps climbing even after the log prefix is compacted away. Reads return the
; events currently stored, oldest-first.
; Requires: lib/persist/event.sx, lib/persist/backend.sx.
; current length of a stream
; logical last seq assigned in a stream (0 if none) — survives compaction
(define
persist/stream-len
(fn (b stream) (len (persist/backend-read b stream))))
persist/last-seq
(fn (b stream) (persist/backend-last-seq b stream)))
; last seq in a stream (0 if empty)
(define persist/last-seq (fn (b stream) (persist/stream-len b stream)))
; number of events physically stored in a stream (shrinks on compaction)
(define
persist/count
(fn (b stream) (len (persist/backend-read b stream))))
; append an event, auto-assigning the next seq. Returns the stored event.
(define
@@ -21,7 +25,7 @@
((ev (persist/event stream seq type at data)))
(begin (persist/backend-append b stream ev) ev)))))
; read all events in a stream, oldest-first
; read all events currently stored in a stream, oldest-first
(define persist/read (fn (b stream) (persist/backend-read b stream)))
; read events with seq >= from
@@ -33,5 +37,7 @@
(fn (e) (>= (persist/event-seq e) from))
(persist/read b stream))))
; number of events in a stream
(define persist/count (fn (b stream) (persist/stream-len b stream)))
; drop events with seq <= n (compaction); the seq counter is untouched
(define
persist/truncate
(fn (b stream n) (persist/backend-truncate b stream n)))

View File

@@ -6,9 +6,10 @@
"project": {"pass": 9, "fail": 0},
"subscribe": {"pass": 9, "fail": 0},
"concurrency": {"pass": 8, "fail": 0},
"snapshot": {"pass": 11, "fail": 0}
"snapshot": {"pass": 11, "fail": 0},
"compaction": {"pass": 11, "fail": 0}
},
"total_pass": 65,
"total_pass": 76,
"total_fail": 0,
"total": 65
"total": 76
}

View File

@@ -11,4 +11,5 @@ _Generated by `lib/persist/conformance.sh`_
| subscribe | 9 | 0 | 9 |
| concurrency | 8 | 0 | 8 |
| snapshot | 11 | 0 | 11 |
| **Total** | **65** | **0** | **65** |
| compaction | 11 | 0 | 11 |
| **Total** | **76** | **0** | **76** |

View File

@@ -0,0 +1,124 @@
; Phase 3 — compaction: drop the snapshotted prefix; replay determinism holds.
(define comp-count (fn (acc e) (+ acc 1)))
(persist-test
"uncompacted counts events since snapshot"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/uncompacted b "s" "snap" 0)))
2)
(persist-test
"should-compact? false below threshold"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/should-compact? b "s" "snap" 3 0)))
false)
(persist-test
"should-compact? true at threshold"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/should-compact? b "s" "snap" 3 0)))
true)
(persist-test
"compact truncates the snapshotted prefix"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/compact b "s" "snap" comp-count 0)
(persist/count b "s")))
0)
(persist-test
"compact preserves logical last-seq"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/compact b "s" "snap" comp-count 0)
(persist/last-seq b "s")))
2)
(persist-test
"append after compaction continues the seq"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/compact b "s" "snap" comp-count 0)
(persist/event-seq (persist/append b "s" "x" 0 {}))))
3)
(persist-test
"replay after compaction == full count before compaction"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/compact b "s" "snap" comp-count 0)
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/project-value
(persist/replay b "s" "snap" comp-count 0))))
5)
(persist-test
"determinism: post-compaction replay value equals uncompacted full replay"
(let
((b (persist/open)) (c (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/append c "s" "x" 0 {})
(persist/append c "s" "x" 0 {})
(persist/append c "s" "x" 0 {})
(persist/compact b "s" "snap" comp-count 0)
(persist/append b "s" "x" 0 {})
(persist/append c "s" "x" 0 {})
(equal?
(persist/project-value
(persist/replay b "s" "snap" comp-count 0))
(persist/project-fold c "s" comp-count 0))))
true)
(persist-test
"maybe-compact below threshold does not truncate"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/maybe-compact b "s" "snap" comp-count 0 5)
(persist/count b "s")))
1)
(persist-test
"maybe-compact at threshold truncates"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/maybe-compact b "s" "snap" comp-count 0 2)
(persist/count b "s")))
0)
(persist-test
"compact is idempotent on an empty tail"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/compact b "s" "snap" comp-count 0)
(persist/project-value
(persist/compact b "s" "snap" comp-count 0))))
1)