persist: event schema evolution via upcasters + 9 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 37s

upcast.sx: register a pure (event -> event) upcaster per type in an immutable
registry; read-upcast/project-upcast lift legacy events to the current shape on
read so projections see one shape (no version branching, no history rewrite).
upcast-data helper merges new :data fields. 171/171.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-06 19:26:35 +00:00
parent 62a1485302
commit 0f6dbdfc7d
6 changed files with 178 additions and 5 deletions

View File

@@ -13,7 +13,7 @@ if [ ! -x "$SX_SERVER" ]; then
exit 1 exit 1
fi fi
SUITES=(event log kv project subscribe concurrency snapshot compaction durable blob view cas catalog query batch recovery) SUITES=(event log kv project subscribe concurrency snapshot compaction durable blob view cas catalog query batch upcast recovery)
OUT_JSON="lib/persist/scoreboard.json" OUT_JSON="lib/persist/scoreboard.json"
OUT_MD="lib/persist/scoreboard.md" OUT_MD="lib/persist/scoreboard.md"
@@ -41,6 +41,7 @@ run_suite() {
(load "lib/persist/catalog.sx") (load "lib/persist/catalog.sx")
(load "lib/persist/query.sx") (load "lib/persist/query.sx")
(load "lib/persist/batch.sx") (load "lib/persist/batch.sx")
(load "lib/persist/upcast.sx")
(load "lib/persist/subscribe.sx") (load "lib/persist/subscribe.sx")
(load "lib/persist/api.sx") (load "lib/persist/api.sx")
(epoch 2) (epoch 2)

View File

@@ -15,9 +15,10 @@
"catalog": {"pass": 10, "fail": 0}, "catalog": {"pass": 10, "fail": 0},
"query": {"pass": 9, "fail": 0}, "query": {"pass": 9, "fail": 0},
"batch": {"pass": 10, "fail": 0}, "batch": {"pass": 10, "fail": 0},
"upcast": {"pass": 9, "fail": 0},
"recovery": {"pass": 6, "fail": 0} "recovery": {"pass": 6, "fail": 0}
}, },
"total_pass": 162, "total_pass": 171,
"total_fail": 0, "total_fail": 0,
"total": 162 "total": 171
} }

View File

@@ -19,5 +19,6 @@ _Generated by `lib/persist/conformance.sh`_
| catalog | 10 | 0 | 10 | | catalog | 10 | 0 | 10 |
| query | 9 | 0 | 9 | | query | 9 | 0 | 9 |
| batch | 10 | 0 | 10 | | batch | 10 | 0 | 10 |
| upcast | 9 | 0 | 9 |
| recovery | 6 | 0 | 6 | | recovery | 6 | 0 | 6 |
| **Total** | **162** | **0** | **162** | | **Total** | **171** | **0** | **171** |

115
lib/persist/tests/upcast.sx Normal file
View File

@@ -0,0 +1,115 @@
; Extension — event schema evolution via upcasters.
; v1 "placed" events had {:total N}; v2 wants {:amount N :currency "GBP"}.
(define up-placed (fn (e) (persist/upcast-data e {:amount (get (persist/event-data e) :total) :currency "GBP"})))
(persist-test
"unregistered type passes through unchanged"
(let
((reg (persist/upcasters)))
(persist/event-data
(persist/upcast
reg
(persist/event "s" 1 "other" 0 {:x 1}))))
{:x 1})
(persist-test
"registered upcaster lifts an old event"
(let
((reg (persist/register-upcaster (persist/upcasters) "placed" up-placed)))
(get
(persist/event-data
(persist/upcast
reg
(persist/event "s" 1 "placed" 0 {:total 50})))
:amount))
50)
(persist-test
"upcaster adds the new field"
(let
((reg (persist/register-upcaster (persist/upcasters) "placed" up-placed)))
(get
(persist/event-data
(persist/upcast
reg
(persist/event "s" 1 "placed" 0 {:total 50})))
:currency))
"GBP")
(persist-test
"upcast preserves stream/seq/type/at"
(let
((reg (persist/register-upcaster (persist/upcasters) "placed" up-placed)))
(let
((e (persist/upcast reg (persist/event "orders" 7 "placed" 99 {:total 1}))))
(list
(persist/event-seq e)
(persist/event-at e)
(persist/event-type e))))
(list 7 99 "placed"))
(persist-test
"registry is immutable — register returns a new dict"
(let
((r0 (persist/upcasters)))
(begin
(persist/register-upcaster r0 "placed" up-placed)
(has-key? r0 "placed")))
false)
(persist-test
"read-upcast lifts every event in a stream"
(let
((b (persist/open))
(reg
(persist/register-upcaster (persist/upcasters) "placed" up-placed)))
(begin
(persist/append b "orders" "placed" 0 {:total 10})
(persist/append b "orders" "placed" 0 {:total 20})
(let
((es (persist/read-upcast b "orders" reg)))
(list
(get (persist/event-data (nth es 0)) :amount)
(get (persist/event-data (nth es 1)) :amount)))))
(list 10 20))
(persist-test
"project-upcast folds over the current shape"
(let
((b (persist/open))
(reg
(persist/register-upcaster (persist/upcasters) "placed" up-placed)))
(begin
(persist/append b "orders" "placed" 0 {:total 10})
(persist/append b "orders" "placed" 0 {:total 20})
(persist/project-upcast
b
"orders"
reg
(fn (acc e) (+ acc (get (persist/event-data e) :amount)))
0)))
30)
(persist-test
"mixed old and new events fold uniformly"
(let
((b (persist/open))
(reg
(persist/register-upcaster (persist/upcasters) "placed" up-placed)))
(begin
(persist/append b "orders" "placed" 0 {:total 5})
(persist/append b "orders" "placed" 0 {:total 7 :amount 7})
(persist/project-upcast
b
"orders"
reg
(fn (acc e) (+ acc (get (persist/event-data e) :amount)))
0)))
12)
(persist-test
"upcast works on the durable backend"
(let
((db (persist/mock-durable (persist/mem-backend)))
(reg
(persist/register-upcaster (persist/upcasters) "placed" up-placed)))
(begin
(persist/append db "orders" "placed" 0 {:total 42})
(get
(persist/event-data
(nth (persist/read-upcast db "orders" reg) 0))
:amount)))
42)

44
lib/persist/upcast.sx Normal file
View File

@@ -0,0 +1,44 @@
; persist/upcast — event schema evolution. An append-only log keeps events
; forever, so old events have old shapes. Rather than migrate stored data (you
; can't rewrite history) or branch every projection on version, register an
; upcaster per event type: a pure (event -> event) that lifts an old event to
; the current shape. Reads pass through the registry so projections see ONE
; shape. The registry is an immutable dict the consumer threads (no global
; mutable state). Requires: lib/persist/event.sx, lib/persist/log.sx.
(define persist/upcasters (fn () {}))
(define persist/register-upcaster (fn (reg type fn) (assoc reg type fn)))
; apply the registered upcaster for an event's type, or pass it through unchanged
(define
persist/upcast
(fn
(reg e)
(let ((f (get reg (persist/event-type e)))) (if f (f e) e))))
; read a stream with every event lifted to current shape
(define
persist/read-upcast
(fn
(b stream reg)
(map (fn (e) (persist/upcast reg e)) (persist/read b stream))))
; project over upcasted events — projections never see a legacy shape
(define
persist/project-upcast
(fn
(b stream reg step seed)
(reduce step seed (persist/read-upcast b stream reg))))
; helper: upcast an event's :data by merging in/overriding fields, keeping the
; record's stream/seq/type/at. Common upcaster body.
(define
persist/upcast-data
(fn
(e new-data)
(persist/event
(persist/event-stream e)
(persist/event-seq e)
(persist/event-type e)
(persist/event-at e)
(merge (persist/event-data e) new-data))))

View File

@@ -42,7 +42,7 @@ read models (feeds, indices, audit logs) update incrementally.
## Status (rolling) ## Status (rolling)
`bash lib/persist/conformance.sh`**162/162** (Phases 14 complete + extensions) `bash lib/persist/conformance.sh`**171/171** (Phases 14 complete + extensions)
## Ground rules ## Ground rules
@@ -167,11 +167,22 @@ over an in-process disk (the mock-IO harness).
(all-or-nothing guarded by optimistic concurrency). For an order + its line (all-or-nothing guarded by optimistic concurrency). For an order + its line
items as one commit. items as one commit.
- [x] `upcast.sx` — event schema evolution: register a pure `(event -> event)`
upcaster per type; `read-upcast`/`project-upcast` lift old events to the
current shape on read so projections see one shape. Immutable registry;
`upcast-data` helper merges new `:data` fields. Addresses the schema-evolution
trap without rewriting history.
## Consumers (post-foundation, not in scope here) ## Consumers (post-foundation, not in scope here)
feed/-log, flow store, mod/audit, search index, acl grants, identity sessions all feed/-log, flow store, mod/audit, search index, acl grants, identity sessions all
become `persist` log or kv. Track each migration in that subsystem's plan. become `persist` log or kv. Track each migration in that subsystem's plan.
## Progress log ## Progress log
- **Ext: event schema evolution (171/171).** `upcast.sx` — per-type pure
`(event -> event)` upcasters in an immutable registry; `read-upcast`/
`project-upcast` lift legacy events to the current shape on read so
projections never branch on version. `upcast-data` merges new `:data` fields
keeping stream/seq/type/at. 9 tests incl. mixed old/new + durable.
- **Ext: atomic batch append (162/162).** `batch.sx``persist/append-batch` - **Ext: atomic batch append (162/162).** `batch.sx``persist/append-batch`
commits `(type at data)` specs as one contiguous block (real cons-list, in commits `(type at data)` specs as one contiguous block (real cons-list, in
order); `persist/append-batch-expect` checks the stream is still at expected order); `persist/append-batch-expect` checks the stream is still at expected