diff --git a/lib/persist/conformance.sh b/lib/persist/conformance.sh index e25b865b..d7ebd032 100755 --- a/lib/persist/conformance.sh +++ b/lib/persist/conformance.sh @@ -13,7 +13,7 @@ if [ ! -x "$SX_SERVER" ]; then exit 1 fi -SUITES=(event log kv project subscribe concurrency snapshot compaction durable blob view cas catalog query batch recovery) +SUITES=(event log kv project subscribe concurrency snapshot compaction durable blob view cas catalog query batch upcast recovery) OUT_JSON="lib/persist/scoreboard.json" OUT_MD="lib/persist/scoreboard.md" @@ -41,6 +41,7 @@ run_suite() { (load "lib/persist/catalog.sx") (load "lib/persist/query.sx") (load "lib/persist/batch.sx") +(load "lib/persist/upcast.sx") (load "lib/persist/subscribe.sx") (load "lib/persist/api.sx") (epoch 2) diff --git a/lib/persist/scoreboard.json b/lib/persist/scoreboard.json index 6c07cf9d..d236f68c 100644 --- a/lib/persist/scoreboard.json +++ b/lib/persist/scoreboard.json @@ -15,9 +15,10 @@ "catalog": {"pass": 10, "fail": 0}, "query": {"pass": 9, "fail": 0}, "batch": {"pass": 10, "fail": 0}, + "upcast": {"pass": 9, "fail": 0}, "recovery": {"pass": 6, "fail": 0} }, - "total_pass": 162, + "total_pass": 171, "total_fail": 0, - "total": 162 + "total": 171 } diff --git a/lib/persist/scoreboard.md b/lib/persist/scoreboard.md index 5fa603da..88f3b77b 100644 --- a/lib/persist/scoreboard.md +++ b/lib/persist/scoreboard.md @@ -19,5 +19,6 @@ _Generated by `lib/persist/conformance.sh`_ | catalog | 10 | 0 | 10 | | query | 9 | 0 | 9 | | batch | 10 | 0 | 10 | +| upcast | 9 | 0 | 9 | | recovery | 6 | 0 | 6 | -| **Total** | **162** | **0** | **162** | +| **Total** | **171** | **0** | **171** | diff --git a/lib/persist/tests/upcast.sx b/lib/persist/tests/upcast.sx new file mode 100644 index 00000000..19295f52 --- /dev/null +++ b/lib/persist/tests/upcast.sx @@ -0,0 +1,115 @@ +; Extension — event schema evolution via upcasters. + +; v1 "placed" events had {:total N}; v2 wants {:amount N :currency "GBP"}. +(define up-placed (fn (e) (persist/upcast-data e {:amount (get (persist/event-data e) :total) :currency "GBP"}))) + +(persist-test + "unregistered type passes through unchanged" + (let + ((reg (persist/upcasters))) + (persist/event-data + (persist/upcast + reg + (persist/event "s" 1 "other" 0 {:x 1})))) + {:x 1}) +(persist-test + "registered upcaster lifts an old event" + (let + ((reg (persist/register-upcaster (persist/upcasters) "placed" up-placed))) + (get + (persist/event-data + (persist/upcast + reg + (persist/event "s" 1 "placed" 0 {:total 50}))) + :amount)) + 50) +(persist-test + "upcaster adds the new field" + (let + ((reg (persist/register-upcaster (persist/upcasters) "placed" up-placed))) + (get + (persist/event-data + (persist/upcast + reg + (persist/event "s" 1 "placed" 0 {:total 50}))) + :currency)) + "GBP") +(persist-test + "upcast preserves stream/seq/type/at" + (let + ((reg (persist/register-upcaster (persist/upcasters) "placed" up-placed))) + (let + ((e (persist/upcast reg (persist/event "orders" 7 "placed" 99 {:total 1})))) + (list + (persist/event-seq e) + (persist/event-at e) + (persist/event-type e)))) + (list 7 99 "placed")) +(persist-test + "registry is immutable — register returns a new dict" + (let + ((r0 (persist/upcasters))) + (begin + (persist/register-upcaster r0 "placed" up-placed) + (has-key? r0 "placed"))) + false) +(persist-test + "read-upcast lifts every event in a stream" + (let + ((b (persist/open)) + (reg + (persist/register-upcaster (persist/upcasters) "placed" up-placed))) + (begin + (persist/append b "orders" "placed" 0 {:total 10}) + (persist/append b "orders" "placed" 0 {:total 20}) + (let + ((es (persist/read-upcast b "orders" reg))) + (list + (get (persist/event-data (nth es 0)) :amount) + (get (persist/event-data (nth es 1)) :amount))))) + (list 10 20)) +(persist-test + "project-upcast folds over the current shape" + (let + ((b (persist/open)) + (reg + (persist/register-upcaster (persist/upcasters) "placed" up-placed))) + (begin + (persist/append b "orders" "placed" 0 {:total 10}) + (persist/append b "orders" "placed" 0 {:total 20}) + (persist/project-upcast + b + "orders" + reg + (fn (acc e) (+ acc (get (persist/event-data e) :amount))) + 0))) + 30) +(persist-test + "mixed old and new events fold uniformly" + (let + ((b (persist/open)) + (reg + (persist/register-upcaster (persist/upcasters) "placed" up-placed))) + (begin + (persist/append b "orders" "placed" 0 {:total 5}) + (persist/append b "orders" "placed" 0 {:total 7 :amount 7}) + (persist/project-upcast + b + "orders" + reg + (fn (acc e) (+ acc (get (persist/event-data e) :amount))) + 0))) + 12) +(persist-test + "upcast works on the durable backend" + (let + ((db (persist/mock-durable (persist/mem-backend))) + (reg + (persist/register-upcaster (persist/upcasters) "placed" up-placed))) + (begin + (persist/append db "orders" "placed" 0 {:total 42}) + (get + (persist/event-data + (nth (persist/read-upcast db "orders" reg) 0)) + :amount))) + 42) diff --git a/lib/persist/upcast.sx b/lib/persist/upcast.sx new file mode 100644 index 00000000..74bc0197 --- /dev/null +++ b/lib/persist/upcast.sx @@ -0,0 +1,44 @@ +; persist/upcast — event schema evolution. An append-only log keeps events +; forever, so old events have old shapes. Rather than migrate stored data (you +; can't rewrite history) or branch every projection on version, register an +; upcaster per event type: a pure (event -> event) that lifts an old event to +; the current shape. Reads pass through the registry so projections see ONE +; shape. The registry is an immutable dict the consumer threads (no global +; mutable state). Requires: lib/persist/event.sx, lib/persist/log.sx. + +(define persist/upcasters (fn () {})) +(define persist/register-upcaster (fn (reg type fn) (assoc reg type fn))) + +; apply the registered upcaster for an event's type, or pass it through unchanged +(define + persist/upcast + (fn + (reg e) + (let ((f (get reg (persist/event-type e)))) (if f (f e) e)))) + +; read a stream with every event lifted to current shape +(define + persist/read-upcast + (fn + (b stream reg) + (map (fn (e) (persist/upcast reg e)) (persist/read b stream)))) + +; project over upcasted events — projections never see a legacy shape +(define + persist/project-upcast + (fn + (b stream reg step seed) + (reduce step seed (persist/read-upcast b stream reg)))) + +; helper: upcast an event's :data by merging in/overriding fields, keeping the +; record's stream/seq/type/at. Common upcaster body. +(define + persist/upcast-data + (fn + (e new-data) + (persist/event + (persist/event-stream e) + (persist/event-seq e) + (persist/event-type e) + (persist/event-at e) + (merge (persist/event-data e) new-data)))) diff --git a/plans/persist-on-sx.md b/plans/persist-on-sx.md index a8f94222..15d8087a 100644 --- a/plans/persist-on-sx.md +++ b/plans/persist-on-sx.md @@ -42,7 +42,7 @@ read models (feeds, indices, audit logs) update incrementally. ## Status (rolling) -`bash lib/persist/conformance.sh` → **162/162** (Phases 1–4 complete + extensions) +`bash lib/persist/conformance.sh` → **171/171** (Phases 1–4 complete + extensions) ## Ground rules @@ -167,11 +167,22 @@ over an in-process disk (the mock-IO harness). (all-or-nothing guarded by optimistic concurrency). For an order + its line items as one commit. +- [x] `upcast.sx` — event schema evolution: register a pure `(event -> event)` + upcaster per type; `read-upcast`/`project-upcast` lift old events to the + current shape on read so projections see one shape. Immutable registry; + `upcast-data` helper merges new `:data` fields. Addresses the schema-evolution + trap without rewriting history. + ## Consumers (post-foundation, not in scope here) feed/-log, flow store, mod/audit, search index, acl grants, identity sessions all become `persist` log or kv. Track each migration in that subsystem's plan. ## Progress log +- **Ext: event schema evolution (171/171).** `upcast.sx` — per-type pure + `(event -> event)` upcasters in an immutable registry; `read-upcast`/ + `project-upcast` lift legacy events to the current shape on read so + projections never branch on version. `upcast-data` merges new `:data` fields + keeping stream/seq/type/at. 9 tests incl. mixed old/new + durable. - **Ext: atomic batch append (162/162).** `batch.sx` — `persist/append-batch` commits `(type at data)` specs as one contiguous block (real cons-list, in order); `persist/append-batch-expect` checks the stream is still at expected