From a37a158d017fb55a2b568f09b13269cbd3c10ff3 Mon Sep 17 00:00:00 2001 From: giles Date: Sat, 6 Jun 2026 20:41:01 +0000 Subject: [PATCH] persist: global commit ordering across streams + 11 tests global.sx: persist/gappend records a pointer in a reserved $global index whose seq is the global commit position; read-global/project-global replay every event in commit order; global-from for incremental consumers. Opt-in (plain append untouched); $-prefixed streams now reserved + hidden from the public catalog (streams-all reveals them). Gives feed its unified timeline. Deterministic across restart. 191/191. Co-Authored-By: Claude Opus 4.8 (1M context) --- lib/persist/catalog.sx | 23 +++++-- lib/persist/conformance.sh | 3 +- lib/persist/global.sx | 55 ++++++++++++++++ lib/persist/scoreboard.json | 5 +- lib/persist/scoreboard.md | 3 +- lib/persist/tests/global.sx | 123 ++++++++++++++++++++++++++++++++++++ plans/persist-on-sx.md | 16 ++++- 7 files changed, 219 insertions(+), 9 deletions(-) create mode 100644 lib/persist/global.sx create mode 100644 lib/persist/tests/global.sx diff --git a/lib/persist/catalog.sx b/lib/persist/catalog.sx index fb0015ec..74de044a 100644 --- a/lib/persist/catalog.sx +++ b/lib/persist/catalog.sx @@ -1,15 +1,30 @@ ; persist/catalog — enumerate the streams a backend holds. The catalog is the ; set of streams ever appended to (from the seq high-water marks), so a stream -; whose log has been fully compacted still appears. For admin, global ops, and -; cross-stream tooling. Requires: lib/persist/backend.sx, lib/persist/log.sx. +; whose log has been fully compacted still appears. $-prefixed streams are +; reserved for internal indexes (e.g. the $global commit index) and are hidden +; from the public catalog; use streams-all to see them. For admin, global ops, +; and cross-stream tooling. Requires: lib/persist/backend.sx, lib/persist/log.sx. + +(define persist/reserved-stream? (fn (s) (starts-with? s "$"))) + +; every stream including reserved internal indexes +(define persist/streams-all (fn (b) (persist/backend-streams b))) + +; public streams (reserved internal indexes hidden) +(define + persist/streams + (fn + (b) + (filter + (fn (s) (not (persist/reserved-stream? s))) + (persist/streams-all b)))) -(define persist/streams (fn (b) (persist/backend-streams b))) (define persist/stream-count (fn (b) (len (persist/streams b)))) (define persist/stream-exists? (fn (b stream) (contains? (persist/streams b) stream))) -; total logical events across all streams (sum of high-water marks) +; total logical events across all public streams (sum of high-water marks) (define persist/total-events (fn diff --git a/lib/persist/conformance.sh b/lib/persist/conformance.sh index bb83f151..6d100448 100755 --- a/lib/persist/conformance.sh +++ b/lib/persist/conformance.sh @@ -13,7 +13,7 @@ if [ ! -x "$SX_SERVER" ]; then exit 1 fi -SUITES=(event log kv project subscribe concurrency snapshot compaction durable blob view cas catalog query batch upcast idempotency recovery) +SUITES=(event log kv project subscribe concurrency snapshot compaction durable blob view cas catalog query batch upcast idempotency global recovery) OUT_JSON="lib/persist/scoreboard.json" OUT_MD="lib/persist/scoreboard.md" @@ -43,6 +43,7 @@ run_suite() { (load "lib/persist/batch.sx") (load "lib/persist/upcast.sx") (load "lib/persist/idempotency.sx") +(load "lib/persist/global.sx") (load "lib/persist/subscribe.sx") (load "lib/persist/api.sx") (epoch 2) diff --git a/lib/persist/global.sx b/lib/persist/global.sx new file mode 100644 index 00000000..66db11d1 --- /dev/null +++ b/lib/persist/global.sx @@ -0,0 +1,55 @@ +; persist/global — a global commit ordering across streams. Per-stream seqs only +; order within a stream; a unified timeline (e.g. feed's home feed, a global +; audit trail) needs a single order across streams. `persist/gappend` appends to +; the target stream and then records a pointer in a reserved $global index whose +; own seq IS the global commit position. Reading the index in order and +; resolving each pointer yields every event in commit order. This is opt-in: +; streams that don't need global ordering use plain persist/append and never +; touch $global. Determinism: the order is the $global append order, replayed +; identically. Requires: lib/persist/log.sx, lib/persist/catalog.sx. + +(define persist/global-stream "$global") + +; append with a global commit position. Returns the stored stream event; the +; event's global position is the seq of its pointer in $global. +(define + persist/gappend + (fn + (b stream type at data) + (let + ((ev (persist/append b stream type at data))) + (begin (persist/append b persist/global-stream "ref" at {:stream stream :seq (persist/event-seq ev)}) ev)))) + +; the global index: pointer events in commit order (each pointer's seq = gpos) +(define persist/global-log (fn (b) (persist/read b persist/global-stream))) + +; the current global commit position (count of globally-ordered appends) +(define + persist/global-pos + (fn (b) (persist/last-seq b persist/global-stream))) + +; resolve a pointer event to the actual stream event it references +(define + persist/resolve-ref + (fn + (b ptr) + (let + ((d (persist/event-data ptr))) + (first (persist/read-from b (get d :stream) (get d :seq)))))) + +; every globally-ordered event, in commit order +(define + persist/read-global + (fn + (b) + (map (fn (ptr) (persist/resolve-ref b ptr)) (persist/global-log b)))) + +; pointer events at or after a global position (incremental global consumers) +(define + persist/global-from + (fn (b gpos) (persist/read-from b persist/global-stream gpos))) + +; fold over all events in global commit order +(define + persist/project-global + (fn (b step seed) (reduce step seed (persist/read-global b)))) diff --git a/lib/persist/scoreboard.json b/lib/persist/scoreboard.json index b9888053..ac995fa1 100644 --- a/lib/persist/scoreboard.json +++ b/lib/persist/scoreboard.json @@ -17,9 +17,10 @@ "batch": {"pass": 10, "fail": 0}, "upcast": {"pass": 9, "fail": 0}, "idempotency": {"pass": 9, "fail": 0}, + "global": {"pass": 11, "fail": 0}, "recovery": {"pass": 6, "fail": 0} }, - "total_pass": 180, + "total_pass": 191, "total_fail": 0, - "total": 180 + "total": 191 } diff --git a/lib/persist/scoreboard.md b/lib/persist/scoreboard.md index c8f05201..ada4507f 100644 --- a/lib/persist/scoreboard.md +++ b/lib/persist/scoreboard.md @@ -21,5 +21,6 @@ _Generated by `lib/persist/conformance.sh`_ | batch | 10 | 0 | 10 | | upcast | 9 | 0 | 9 | | idempotency | 9 | 0 | 9 | +| global | 11 | 0 | 11 | | recovery | 6 | 0 | 6 | -| **Total** | **180** | **0** | **180** | +| **Total** | **191** | **0** | **191** | diff --git a/lib/persist/tests/global.sx b/lib/persist/tests/global.sx new file mode 100644 index 00000000..260586bb --- /dev/null +++ b/lib/persist/tests/global.sx @@ -0,0 +1,123 @@ +; Extension — global commit ordering across streams. + +(persist-test + "gappend returns the stream event with its local seq" + (let + ((b (persist/open))) + (persist/event-seq + (persist/gappend b "orders" "placed" 0 {}))) + 1) +(persist-test + "global-pos advances per gappend regardless of stream" + (let + ((b (persist/open))) + (begin + (persist/gappend b "orders" "placed" 0 {}) + (persist/gappend b "users" "joined" 0 {}) + (persist/gappend b "orders" "placed" 0 {}) + (persist/global-pos b))) + 3) +(persist-test + "read-global returns events in commit order across streams" + (let + ((b (persist/open))) + (begin + (persist/gappend b "orders" "placed" 0 {:n 1}) + (persist/gappend b "users" "joined" 0 {:n 2}) + (persist/gappend b "orders" "placed" 0 {:n 3}) + (let + ((g (persist/read-global b))) + (list + (get (persist/event-data (nth g 0)) :n) + (get (persist/event-data (nth g 1)) :n) + (get (persist/event-data (nth g 2)) :n))))) + (list 1 2 3)) +(persist-test + "read-global resolves to the right streams" + (let + ((b (persist/open))) + (begin + (persist/gappend b "orders" "placed" 0 {}) + (persist/gappend b "users" "joined" 0 {}) + (let + ((g (persist/read-global b))) + (list + (persist/event-stream (nth g 0)) + (persist/event-stream (nth g 1)))))) + (list "orders" "users")) +(persist-test + "project-global folds across all streams in order" + (let + ((b (persist/open))) + (begin + (persist/gappend b "a" "x" 0 {:v 10}) + (persist/gappend b "b" "x" 0 {:v 20}) + (persist/gappend b "a" "x" 0 {:v 30}) + (persist/project-global + b + (fn (acc e) (+ acc (get (persist/event-data e) :v))) + 0))) + 60) +(persist-test + "global index is hidden from the public catalog" + (let + ((b (persist/open))) + (begin + (persist/gappend b "orders" "placed" 0 {}) + (persist/gappend b "users" "joined" 0 {}) + (list (persist/stream-count b) (persist/stream-exists? b "$global")))) + (list 2 false)) +(persist-test + "streams-all reveals the reserved index" + (let + ((b (persist/open))) + (begin + (persist/gappend b "orders" "placed" 0 {}) + (contains? (persist/streams-all b) "$global"))) + true) +(persist-test + "global-from gives pointers at or after a position" + (let + ((b (persist/open))) + (begin + (persist/gappend b "a" "x" 0 {}) + (persist/gappend b "a" "x" 0 {}) + (persist/gappend b "a" "x" 0 {}) + (len (persist/global-from b 2)))) + 2) +(persist-test + "plain append does not touch the global index" + (let + ((b (persist/open))) + (begin + (persist/append b "orders" "placed" 0 {}) + (persist/gappend b "orders" "placed" 0 {}) + (persist/global-pos b))) + 1) +(persist-test + "global ordering works on the durable backend" + (let + ((db (persist/mock-durable (persist/mem-backend)))) + (begin + (persist/gappend db "a" "x" 0 {:v 1}) + (persist/gappend db "b" "x" 0 {:v 2}) + (persist/project-global + db + (fn (acc e) (+ acc (get (persist/event-data e) :v))) + 0))) + 3) +(persist-test + "global order survives restart (determinism)" + (let + ((disk (persist/mem-backend))) + (begin + (let + ((db (persist/mock-durable disk))) + (begin + (persist/gappend db "a" "x" 0 {:v 1}) + (persist/gappend db "b" "x" 0 {:v 2}))) + (persist/project-global + (persist/mock-durable disk) + (fn (acc e) (+ acc (get (persist/event-data e) :v))) + 0))) + 3) diff --git a/plans/persist-on-sx.md b/plans/persist-on-sx.md index e4e5e852..25b6ba0a 100644 --- a/plans/persist-on-sx.md +++ b/plans/persist-on-sx.md @@ -42,7 +42,7 @@ read models (feeds, indices, audit logs) update incrementally. ## Status (rolling) -`bash lib/persist/conformance.sh` → **180/180** (Phases 1–4 complete + extensions) +`bash lib/persist/conformance.sh` → **191/191** (Phases 1–4 complete + extensions) ## Ground rules @@ -177,11 +177,25 @@ over an in-process disk (the mock-IO harness). keyed by a caller idempotency key (per stream), returning the same event on a repeat. Marker lives in kv, so idempotency holds across restart. `seen?` check. +- [x] `global.sx` — global commit ordering across streams (the primitive feed's + unified timeline needs). `persist/gappend` records a pointer in a reserved + `$global` index whose seq is the commit position; `read-global`/ + `project-global` replay every event in commit order; `global-from` for + incremental consumers. Opt-in (plain `append` never touches it); reserved + index hidden from the public catalog. Deterministic across restart. + ## Consumers (post-foundation, not in scope here) feed/-log, flow store, mod/audit, search index, acl grants, identity sessions all become `persist` log or kv. Track each migration in that subsystem's plan. ## Progress log +- **Ext: global commit ordering (191/191).** `global.sx` — `persist/gappend` + records a pointer in a reserved `$global` index (its seq = global commit + position); `read-global`/`project-global` resolve pointers to events in commit + order; `global-from` for incremental global consumers. Opt-in; `$`-streams are + now reserved + hidden from the public catalog (`streams-all` reveals them). + Gives feed its cross-stream timeline. 11 tests incl. durable + restart + determinism. - **Ext: exactly-once append (180/180).** `idempotency.sx` — `persist/append-once` appends at most once per (stream, idempotency key), returning the same event on a repeat; the marker lives in kv so it survives