From 156d6f12ec357664e70bc55f7bac7c434153df0c Mon Sep 17 00:00:00 2001 From: giles Date: Sat, 6 Jun 2026 18:37:49 +0000 Subject: [PATCH] =?UTF-8?q?persist:=20optimistic=20concurrency=20=E2=80=94?= =?UTF-8?q?=20conflict=20as=20a=20real=20result=20+=208=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit concurrency.sx: persist/append-expect refuses an append when the stream advanced past the caller's expected seq, returning {:conflict :expected :actual} instead of crashing or overwriting. persist/conflict? + accessors. Phase 2 complete, 54/54. Co-Authored-By: Claude Opus 4.8 (1M context) --- lib/persist/concurrency.sx | 24 ++++++++ lib/persist/conformance.sh | 3 +- lib/persist/scoreboard.json | 7 ++- lib/persist/scoreboard.md | 3 +- lib/persist/tests/concurrency.sx | 96 ++++++++++++++++++++++++++++++++ plans/persist-on-sx.md | 10 +++- 6 files changed, 136 insertions(+), 7 deletions(-) create mode 100644 lib/persist/concurrency.sx create mode 100644 lib/persist/tests/concurrency.sx diff --git a/lib/persist/concurrency.sx b/lib/persist/concurrency.sx new file mode 100644 index 00000000..3049c8fc --- /dev/null +++ b/lib/persist/concurrency.sx @@ -0,0 +1,24 @@ +; persist/concurrency — optimistic concurrency for the log facet. The caller +; passes the seq it believes is current (the last-seq it last observed). If the +; stream has advanced since, the append is refused and a conflict VALUE is +; returned — never a crash, never a silent overwrite. The caller re-reads the +; tail and retries. This is the substrate-level answer to "two writers, one +; stream": the loser gets a result it can act on. +; Requires: lib/persist/log.sx. + +(define + persist/append-expect + (fn + (b stream expected type at data) + (let + ((actual (persist/last-seq b stream))) + (if + (= actual expected) + (persist/append b stream type at data) + {:actual actual :expected expected :conflict true})))) + +(define + persist/conflict? + (fn (r) (if (has-key? r :conflict) (get r :conflict) false))) +(define persist/conflict-expected (fn (r) (get r :expected))) +(define persist/conflict-actual (fn (r) (get r :actual))) diff --git a/lib/persist/conformance.sh b/lib/persist/conformance.sh index e3d9de13..d930b4e3 100755 --- a/lib/persist/conformance.sh +++ b/lib/persist/conformance.sh @@ -13,7 +13,7 @@ if [ ! -x "$SX_SERVER" ]; then exit 1 fi -SUITES=(event log kv project subscribe) +SUITES=(event log kv project subscribe concurrency) OUT_JSON="lib/persist/scoreboard.json" OUT_MD="lib/persist/scoreboard.md" @@ -32,6 +32,7 @@ run_suite() { (load "lib/persist/log.sx") (load "lib/persist/kv.sx") (load "lib/persist/project.sx") +(load "lib/persist/concurrency.sx") (load "lib/persist/subscribe.sx") (load "lib/persist/api.sx") (epoch 2) diff --git a/lib/persist/scoreboard.json b/lib/persist/scoreboard.json index 396e3cf1..65674ec4 100644 --- a/lib/persist/scoreboard.json +++ b/lib/persist/scoreboard.json @@ -4,9 +4,10 @@ "log": {"pass": 9, "fail": 0}, "kv": {"pass": 13, "fail": 0}, "project": {"pass": 9, "fail": 0}, - "subscribe": {"pass": 9, "fail": 0} + "subscribe": {"pass": 9, "fail": 0}, + "concurrency": {"pass": 8, "fail": 0} }, - "total_pass": 46, + "total_pass": 54, "total_fail": 0, - "total": 46 + "total": 54 } diff --git a/lib/persist/scoreboard.md b/lib/persist/scoreboard.md index 60131ac6..6b922c07 100644 --- a/lib/persist/scoreboard.md +++ b/lib/persist/scoreboard.md @@ -9,4 +9,5 @@ _Generated by `lib/persist/conformance.sh`_ | kv | 13 | 0 | 13 | | project | 9 | 0 | 9 | | subscribe | 9 | 0 | 9 | -| **Total** | **46** | **0** | **46** | +| concurrency | 8 | 0 | 8 | +| **Total** | **54** | **0** | **54** | diff --git a/lib/persist/tests/concurrency.sx b/lib/persist/tests/concurrency.sx new file mode 100644 index 00000000..aba28c8e --- /dev/null +++ b/lib/persist/tests/concurrency.sx @@ -0,0 +1,96 @@ +; Phase 2 — optimistic concurrency: conflict is a real result, not a crash. + +(persist-test + "append-expect 0 on empty stream succeeds" + (persist/event-seq + (persist/append-expect + (persist/open) + "s" + 0 + "x" + 0 + {})) + 1) +(persist-test + "append-expect with correct seq succeeds" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "x" 0 {}) + (persist/event-seq + (persist/append-expect b "s" 1 "x" 0 {})))) + 2) +(persist-test + "append-expect with stale seq returns a conflict" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "x" 0 {}) + (persist/append b "s" "x" 0 {}) + (persist/conflict? + (persist/append-expect b "s" 1 "x" 0 {})))) + true) +(persist-test + "a successful append is not a conflict" + (persist/conflict? + (persist/append-expect + (persist/open) + "s" + 0 + "x" + 0 + {})) + false) +(persist-test + "conflict carries expected and actual" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "x" 0 {}) + (persist/append b "s" "x" 0 {}) + (let + ((r (persist/append-expect b "s" 0 "x" 0 {}))) + (list (persist/conflict-expected r) (persist/conflict-actual r))))) + (list 0 2)) +(persist-test + "a conflicting append does not write" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "x" 0 {}) + (persist/append-expect b "s" 0 "x" 0 {}) + (persist/count b "s"))) + 1) +(persist-test + "two writers: first wins, second conflicts" + (let + ((b (persist/open))) + (let + ((seen (persist/last-seq b "s"))) + (begin + (persist/append-expect b "s" seen "x" 0 {:who "A"}) + (persist/conflict? + (persist/append-expect b "s" seen "x" 0 {:who "B"}))))) + true) +(persist-test + "retry after conflict succeeds" + (let + ((b (persist/open))) + (let + ((seen (persist/last-seq b "s"))) + (begin + (persist/append-expect b "s" seen "x" 0 {:who "A"}) + (let + ((r (persist/append-expect b "s" seen "x" 0 {:who "B"}))) + (if + (persist/conflict? r) + (persist/event-seq + (persist/append-expect + b + "s" + (persist/conflict-actual r) + "x" + 0 + {:who "B"})) + (persist/event-seq r)))))) + 2) diff --git a/plans/persist-on-sx.md b/plans/persist-on-sx.md index 9a02fdd1..b14fc32f 100644 --- a/plans/persist-on-sx.md +++ b/plans/persist-on-sx.md @@ -42,7 +42,7 @@ read models (feeds, indices, audit logs) update incrementally. ## Status (rolling) -`bash lib/persist/conformance.sh` → **28/28** (Phase 1 done) +`bash lib/persist/conformance.sh` → **54/54** (Phases 1–2 done) ## Ground rules @@ -96,7 +96,7 @@ lib/persist/backend.sx lib/persist/api.sx ## Phase 2 — Projections + subscriptions - [x] `project.sx` — `(project stream step seed)`, incremental fold - [x] subscription hook — projection / kv read model re-runs on append -- [ ] concurrency conflict surfaced as a real result, not a crash +- [x] concurrency conflict surfaced as a real result, not a crash ## Phase 3 — Snapshots + replay - [ ] `snapshot.sx` — checkpoint a projection; replay = snapshot + tail @@ -113,6 +113,12 @@ feed/-log, flow store, mod/audit, search index, acl grants, identity sessions al become `persist` log or kv. Track each migration in that subsystem's plan. ## Progress log +- **Phase 2c (54/54) — Phase 2 complete.** `concurrency.sx` — optimistic + concurrency: `persist/append-expect b stream expected ...` refuses the append + if the stream advanced past `expected`, returning a conflict VALUE + `{:conflict true :expected :actual}` (never a crash, never a silent + overwrite). `persist/conflict?` + accessors; caller re-reads actual and + retries. 8 tests incl. two-writer race + retry. - **Phase 2b (46/46).** `subscribe.sx` — `persist/hub` wraps a backend with per-stream callbacks. `persist/publish` appends then fires subscribers `(backend stream event)`; direct `persist/append` bypasses them by design