persist: global commit ordering across streams + 11 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 46s

global.sx: persist/gappend records a pointer in a reserved $global index whose
seq is the global commit position; read-global/project-global replay every
event in commit order; global-from for incremental consumers. Opt-in (plain
append untouched); $-prefixed streams now reserved + hidden from the public
catalog (streams-all reveals them). Gives feed its unified timeline.
Deterministic across restart. 191/191.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-06 20:41:01 +00:00
parent 3e90c780e9
commit a37a158d01
7 changed files with 219 additions and 9 deletions

View File

@@ -1,15 +1,30 @@
; persist/catalog — enumerate the streams a backend holds. The catalog is the
; set of streams ever appended to (from the seq high-water marks), so a stream
; whose log has been fully compacted still appears. For admin, global ops, and
; cross-stream tooling. Requires: lib/persist/backend.sx, lib/persist/log.sx.
; whose log has been fully compacted still appears. $-prefixed streams are
; reserved for internal indexes (e.g. the $global commit index) and are hidden
; from the public catalog; use streams-all to see them. For admin, global ops,
; and cross-stream tooling. Requires: lib/persist/backend.sx, lib/persist/log.sx.
(define persist/reserved-stream? (fn (s) (starts-with? s "$")))
; every stream including reserved internal indexes
(define persist/streams-all (fn (b) (persist/backend-streams b)))
; public streams (reserved internal indexes hidden)
(define
persist/streams
(fn
(b)
(filter
(fn (s) (not (persist/reserved-stream? s)))
(persist/streams-all b))))
(define persist/streams (fn (b) (persist/backend-streams b)))
(define persist/stream-count (fn (b) (len (persist/streams b))))
(define
persist/stream-exists?
(fn (b stream) (contains? (persist/streams b) stream)))
; total logical events across all streams (sum of high-water marks)
; total logical events across all public streams (sum of high-water marks)
(define
persist/total-events
(fn

View File

@@ -13,7 +13,7 @@ if [ ! -x "$SX_SERVER" ]; then
exit 1
fi
SUITES=(event log kv project subscribe concurrency snapshot compaction durable blob view cas catalog query batch upcast idempotency recovery)
SUITES=(event log kv project subscribe concurrency snapshot compaction durable blob view cas catalog query batch upcast idempotency global recovery)
OUT_JSON="lib/persist/scoreboard.json"
OUT_MD="lib/persist/scoreboard.md"
@@ -43,6 +43,7 @@ run_suite() {
(load "lib/persist/batch.sx")
(load "lib/persist/upcast.sx")
(load "lib/persist/idempotency.sx")
(load "lib/persist/global.sx")
(load "lib/persist/subscribe.sx")
(load "lib/persist/api.sx")
(epoch 2)

55
lib/persist/global.sx Normal file
View File

@@ -0,0 +1,55 @@
; persist/global — a global commit ordering across streams. Per-stream seqs only
; order within a stream; a unified timeline (e.g. feed's home feed, a global
; audit trail) needs a single order across streams. `persist/gappend` appends to
; the target stream and then records a pointer in a reserved $global index whose
; own seq IS the global commit position. Reading the index in order and
; resolving each pointer yields every event in commit order. This is opt-in:
; streams that don't need global ordering use plain persist/append and never
; touch $global. Determinism: the order is the $global append order, replayed
; identically. Requires: lib/persist/log.sx, lib/persist/catalog.sx.
(define persist/global-stream "$global")
; append with a global commit position. Returns the stored stream event; the
; event's global position is the seq of its pointer in $global.
(define
persist/gappend
(fn
(b stream type at data)
(let
((ev (persist/append b stream type at data)))
(begin (persist/append b persist/global-stream "ref" at {:stream stream :seq (persist/event-seq ev)}) ev))))
; the global index: pointer events in commit order (each pointer's seq = gpos)
(define persist/global-log (fn (b) (persist/read b persist/global-stream)))
; the current global commit position (count of globally-ordered appends)
(define
persist/global-pos
(fn (b) (persist/last-seq b persist/global-stream)))
; resolve a pointer event to the actual stream event it references
(define
persist/resolve-ref
(fn
(b ptr)
(let
((d (persist/event-data ptr)))
(first (persist/read-from b (get d :stream) (get d :seq))))))
; every globally-ordered event, in commit order
(define
persist/read-global
(fn
(b)
(map (fn (ptr) (persist/resolve-ref b ptr)) (persist/global-log b))))
; pointer events at or after a global position (incremental global consumers)
(define
persist/global-from
(fn (b gpos) (persist/read-from b persist/global-stream gpos)))
; fold over all events in global commit order
(define
persist/project-global
(fn (b step seed) (reduce step seed (persist/read-global b))))

View File

@@ -17,9 +17,10 @@
"batch": {"pass": 10, "fail": 0},
"upcast": {"pass": 9, "fail": 0},
"idempotency": {"pass": 9, "fail": 0},
"global": {"pass": 11, "fail": 0},
"recovery": {"pass": 6, "fail": 0}
},
"total_pass": 180,
"total_pass": 191,
"total_fail": 0,
"total": 180
"total": 191
}

View File

@@ -21,5 +21,6 @@ _Generated by `lib/persist/conformance.sh`_
| batch | 10 | 0 | 10 |
| upcast | 9 | 0 | 9 |
| idempotency | 9 | 0 | 9 |
| global | 11 | 0 | 11 |
| recovery | 6 | 0 | 6 |
| **Total** | **180** | **0** | **180** |
| **Total** | **191** | **0** | **191** |

123
lib/persist/tests/global.sx Normal file
View File

@@ -0,0 +1,123 @@
; Extension — global commit ordering across streams.
(persist-test
"gappend returns the stream event with its local seq"
(let
((b (persist/open)))
(persist/event-seq
(persist/gappend b "orders" "placed" 0 {})))
1)
(persist-test
"global-pos advances per gappend regardless of stream"
(let
((b (persist/open)))
(begin
(persist/gappend b "orders" "placed" 0 {})
(persist/gappend b "users" "joined" 0 {})
(persist/gappend b "orders" "placed" 0 {})
(persist/global-pos b)))
3)
(persist-test
"read-global returns events in commit order across streams"
(let
((b (persist/open)))
(begin
(persist/gappend b "orders" "placed" 0 {:n 1})
(persist/gappend b "users" "joined" 0 {:n 2})
(persist/gappend b "orders" "placed" 0 {:n 3})
(let
((g (persist/read-global b)))
(list
(get (persist/event-data (nth g 0)) :n)
(get (persist/event-data (nth g 1)) :n)
(get (persist/event-data (nth g 2)) :n)))))
(list 1 2 3))
(persist-test
"read-global resolves to the right streams"
(let
((b (persist/open)))
(begin
(persist/gappend b "orders" "placed" 0 {})
(persist/gappend b "users" "joined" 0 {})
(let
((g (persist/read-global b)))
(list
(persist/event-stream (nth g 0))
(persist/event-stream (nth g 1))))))
(list "orders" "users"))
(persist-test
"project-global folds across all streams in order"
(let
((b (persist/open)))
(begin
(persist/gappend b "a" "x" 0 {:v 10})
(persist/gappend b "b" "x" 0 {:v 20})
(persist/gappend b "a" "x" 0 {:v 30})
(persist/project-global
b
(fn (acc e) (+ acc (get (persist/event-data e) :v)))
0)))
60)
(persist-test
"global index is hidden from the public catalog"
(let
((b (persist/open)))
(begin
(persist/gappend b "orders" "placed" 0 {})
(persist/gappend b "users" "joined" 0 {})
(list (persist/stream-count b) (persist/stream-exists? b "$global"))))
(list 2 false))
(persist-test
"streams-all reveals the reserved index"
(let
((b (persist/open)))
(begin
(persist/gappend b "orders" "placed" 0 {})
(contains? (persist/streams-all b) "$global")))
true)
(persist-test
"global-from gives pointers at or after a position"
(let
((b (persist/open)))
(begin
(persist/gappend b "a" "x" 0 {})
(persist/gappend b "a" "x" 0 {})
(persist/gappend b "a" "x" 0 {})
(len (persist/global-from b 2))))
2)
(persist-test
"plain append does not touch the global index"
(let
((b (persist/open)))
(begin
(persist/append b "orders" "placed" 0 {})
(persist/gappend b "orders" "placed" 0 {})
(persist/global-pos b)))
1)
(persist-test
"global ordering works on the durable backend"
(let
((db (persist/mock-durable (persist/mem-backend))))
(begin
(persist/gappend db "a" "x" 0 {:v 1})
(persist/gappend db "b" "x" 0 {:v 2})
(persist/project-global
db
(fn (acc e) (+ acc (get (persist/event-data e) :v)))
0)))
3)
(persist-test
"global order survives restart (determinism)"
(let
((disk (persist/mem-backend)))
(begin
(let
((db (persist/mock-durable disk)))
(begin
(persist/gappend db "a" "x" 0 {:v 1})
(persist/gappend db "b" "x" 0 {:v 2})))
(persist/project-global
(persist/mock-durable disk)
(fn (acc e) (+ acc (get (persist/event-data e) :v)))
0)))
3)