persist: atomic batch append — contiguous block + transactional guard + 10 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 43s
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 43s
batch.sx: persist/append-batch commits (type at data) specs as one contiguous block; persist/append-batch-expect checks the stream is still at expected before writing any event, so the batch is all-or-nothing under a concurrent writer (conflict is a value, not a partial write). 162/162. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
40
lib/persist/batch.sx
Normal file
40
lib/persist/batch.sx
Normal file
@@ -0,0 +1,40 @@
|
||||
; persist/batch — commit several events to a stream as one contiguous block.
|
||||
; Each spec is (type at data). Plain append-batch always appends; the -expect
|
||||
; form is the transactional commit: it checks the stream is still at `expected`
|
||||
; before writing ANY event, so a batch is all-or-nothing under a concurrent
|
||||
; writer (conflict is a value, not a partial write). For an order + its line
|
||||
; items, an audit entry + its reason, etc. Requires: lib/persist/log.sx.
|
||||
|
||||
; append a list of (type at data) specs as one block; returns the stored events
|
||||
; (a real cons-list, in order, with contiguous seqs)
|
||||
(define
|
||||
persist/append-batch
|
||||
(fn
|
||||
(b stream specs)
|
||||
(reverse
|
||||
(reduce
|
||||
(fn
|
||||
(acc spec)
|
||||
(cons
|
||||
(persist/append
|
||||
b
|
||||
stream
|
||||
(first spec)
|
||||
(nth spec 1)
|
||||
(nth spec 2))
|
||||
acc))
|
||||
(list)
|
||||
specs))))
|
||||
|
||||
; transactional batch: commit all specs only if the stream is still at expected,
|
||||
; else return a conflict and write nothing
|
||||
(define
|
||||
persist/append-batch-expect
|
||||
(fn
|
||||
(b stream expected specs)
|
||||
(let
|
||||
((actual (persist/last-seq b stream)))
|
||||
(if
|
||||
(= actual expected)
|
||||
(persist/append-batch b stream specs)
|
||||
{:actual actual :expected expected :conflict true}))))
|
||||
@@ -13,7 +13,7 @@ if [ ! -x "$SX_SERVER" ]; then
|
||||
exit 1
|
||||
fi
|
||||
|
||||
SUITES=(event log kv project subscribe concurrency snapshot compaction durable blob view cas catalog query recovery)
|
||||
SUITES=(event log kv project subscribe concurrency snapshot compaction durable blob view cas catalog query batch recovery)
|
||||
|
||||
OUT_JSON="lib/persist/scoreboard.json"
|
||||
OUT_MD="lib/persist/scoreboard.md"
|
||||
@@ -40,6 +40,7 @@ run_suite() {
|
||||
(load "lib/persist/view.sx")
|
||||
(load "lib/persist/catalog.sx")
|
||||
(load "lib/persist/query.sx")
|
||||
(load "lib/persist/batch.sx")
|
||||
(load "lib/persist/subscribe.sx")
|
||||
(load "lib/persist/api.sx")
|
||||
(epoch 2)
|
||||
|
||||
@@ -14,9 +14,10 @@
|
||||
"cas": {"pass": 11, "fail": 0},
|
||||
"catalog": {"pass": 10, "fail": 0},
|
||||
"query": {"pass": 9, "fail": 0},
|
||||
"batch": {"pass": 10, "fail": 0},
|
||||
"recovery": {"pass": 6, "fail": 0}
|
||||
},
|
||||
"total_pass": 152,
|
||||
"total_pass": 162,
|
||||
"total_fail": 0,
|
||||
"total": 152
|
||||
"total": 162
|
||||
}
|
||||
|
||||
@@ -18,5 +18,6 @@ _Generated by `lib/persist/conformance.sh`_
|
||||
| cas | 11 | 0 | 11 |
|
||||
| catalog | 10 | 0 | 10 |
|
||||
| query | 9 | 0 | 9 |
|
||||
| batch | 10 | 0 | 10 |
|
||||
| recovery | 6 | 0 | 6 |
|
||||
| **Total** | **152** | **0** | **152** |
|
||||
| **Total** | **162** | **0** | **162** |
|
||||
|
||||
122
lib/persist/tests/batch.sx
Normal file
122
lib/persist/tests/batch.sx
Normal file
@@ -0,0 +1,122 @@
|
||||
; Extension — atomic batch append: contiguous seqs, transactional all-or-nothing.
|
||||
|
||||
(persist-test
|
||||
"batch assigns contiguous seqs"
|
||||
(let
|
||||
((b (persist/open)))
|
||||
(let
|
||||
((evs (persist/append-batch b "s" (list (list "a" 0 {}) (list "b" 0 {}) (list "c" 0 {})))))
|
||||
(list
|
||||
(persist/event-seq (first evs))
|
||||
(persist/event-seq (nth evs 2)))))
|
||||
(list 1 3))
|
||||
(persist-test
|
||||
"batch returns events in order"
|
||||
(let
|
||||
((b (persist/open)))
|
||||
(let
|
||||
((evs (persist/append-batch b "s" (list (list "a" 0 {}) (list "b" 0 {})))))
|
||||
(list
|
||||
(persist/event-type (first evs))
|
||||
(persist/event-type (nth evs 1)))))
|
||||
(list "a" "b"))
|
||||
(persist-test
|
||||
"batch grows the stream by its size"
|
||||
(let
|
||||
((b (persist/open)))
|
||||
(begin
|
||||
(persist/append-batch
|
||||
b
|
||||
"s"
|
||||
(list
|
||||
(list "a" 0 {})
|
||||
(list "b" 0 {})
|
||||
(list "c" 0 {})))
|
||||
(persist/count b "s")))
|
||||
3)
|
||||
(persist-test
|
||||
"batch continues an existing stream"
|
||||
(let
|
||||
((b (persist/open)))
|
||||
(begin
|
||||
(persist/append b "s" "x" 0 {})
|
||||
(let
|
||||
((evs (persist/append-batch b "s" (list (list "a" 0 {}) (list "b" 0 {})))))
|
||||
(persist/event-seq (first evs)))))
|
||||
2)
|
||||
(persist-test
|
||||
"empty batch is a no-op"
|
||||
(let
|
||||
((b (persist/open)))
|
||||
(begin (persist/append-batch b "s" (list)) (persist/count b "s")))
|
||||
0)
|
||||
(persist-test
|
||||
"batch-expect with correct seq commits all"
|
||||
(let
|
||||
((b (persist/open)))
|
||||
(begin
|
||||
(persist/append-batch-expect
|
||||
b
|
||||
"s"
|
||||
0
|
||||
(list
|
||||
(list "a" 0 {})
|
||||
(list "b" 0 {})))
|
||||
(persist/count b "s")))
|
||||
2)
|
||||
(persist-test
|
||||
"batch-expect with stale seq writes nothing"
|
||||
(let
|
||||
((b (persist/open)))
|
||||
(begin
|
||||
(persist/append b "s" "x" 0 {})
|
||||
(persist/append-batch-expect
|
||||
b
|
||||
"s"
|
||||
0
|
||||
(list
|
||||
(list "a" 0 {})
|
||||
(list "b" 0 {})))
|
||||
(persist/count b "s")))
|
||||
1)
|
||||
(persist-test
|
||||
"batch-expect stale returns a conflict"
|
||||
(let
|
||||
((b (persist/open)))
|
||||
(begin
|
||||
(persist/append b "s" "x" 0 {})
|
||||
(persist/conflict?
|
||||
(persist/append-batch-expect
|
||||
b
|
||||
"s"
|
||||
0
|
||||
(list (list "a" 0 {}))))))
|
||||
true)
|
||||
(persist-test
|
||||
"batch data is preserved"
|
||||
(let
|
||||
((b (persist/open)))
|
||||
(begin
|
||||
(persist/append-batch
|
||||
b
|
||||
"order"
|
||||
(list
|
||||
(list "placed" 0 {:id 1})
|
||||
(list "line" 0 {:sku "x"})))
|
||||
(get
|
||||
(persist/event-data (nth (persist/read b "order") 1))
|
||||
:sku)))
|
||||
"x")
|
||||
(persist-test
|
||||
"batch works on the durable backend"
|
||||
(let
|
||||
((db (persist/mock-durable (persist/mem-backend))))
|
||||
(begin
|
||||
(persist/append-batch
|
||||
db
|
||||
"s"
|
||||
(list
|
||||
(list "a" 0 {})
|
||||
(list "b" 0 {})))
|
||||
(persist/last-seq db "s")))
|
||||
2)
|
||||
Reference in New Issue
Block a user