diff --git a/lib/persist/conformance.sh b/lib/persist/conformance.sh index 9e55295c..56aeb14a 100755 --- a/lib/persist/conformance.sh +++ b/lib/persist/conformance.sh @@ -13,7 +13,7 @@ if [ ! -x "$SX_SERVER" ]; then exit 1 fi -SUITES=(event log kv project subscribe concurrency snapshot compaction durable blob view cas catalog recovery) +SUITES=(event log kv project subscribe concurrency snapshot compaction durable blob view cas catalog query recovery) OUT_JSON="lib/persist/scoreboard.json" OUT_MD="lib/persist/scoreboard.md" @@ -39,6 +39,7 @@ run_suite() { (load "lib/persist/blob.sx") (load "lib/persist/view.sx") (load "lib/persist/catalog.sx") +(load "lib/persist/query.sx") (load "lib/persist/subscribe.sx") (load "lib/persist/api.sx") (epoch 2) diff --git a/lib/persist/query.sx b/lib/persist/query.sx new file mode 100644 index 00000000..ca05298d --- /dev/null +++ b/lib/persist/query.sx @@ -0,0 +1,54 @@ +; persist/query — read-side helpers over a stream: slice by seq range, filter by +; timestamp / type / predicate. Pure reads composed from persist/read, no +; backend changes. The log is bad at ad-hoc relational queries (project into a +; kv read model for those) but these cover the common log scans: an audit window +; by time, a type filter, a since-cursor for incremental consumers. +; Requires: lib/persist/log.sx. + +; events with seq in [from, to] inclusive +(define + persist/read-between + (fn + (b stream from to) + (filter + (fn + (e) + (and (>= (persist/event-seq e) from) (<= (persist/event-seq e) to))) + (persist/read b stream)))) + +; events at or after a timestamp (events carry :at; never a clock here) +(define + persist/read-since + (fn + (b stream at) + (filter (fn (e) (>= (persist/event-at e) at)) (persist/read b stream)))) + +; events whose :at is in [from, to] inclusive — an audit window +(define + persist/read-window + (fn + (b stream from to) + (filter + (fn + (e) + (and (>= (persist/event-at e) from) (<= (persist/event-at e) to))) + (persist/read b stream)))) + +; events matching a predicate (e -> truthy) +(define + persist/read-where + (fn (b stream pred) (filter pred (persist/read b stream)))) + +; events of a given type +(define + persist/read-by-type + (fn + (b stream type) + (filter + (fn (e) (equal? (persist/event-type e) type)) + (persist/read b stream)))) + +; count events matching a predicate +(define + persist/count-where + (fn (b stream pred) (len (persist/read-where b stream pred)))) diff --git a/lib/persist/scoreboard.json b/lib/persist/scoreboard.json index ae152373..df52825a 100644 --- a/lib/persist/scoreboard.json +++ b/lib/persist/scoreboard.json @@ -13,9 +13,10 @@ "view": {"pass": 11, "fail": 0}, "cas": {"pass": 11, "fail": 0}, "catalog": {"pass": 10, "fail": 0}, + "query": {"pass": 9, "fail": 0}, "recovery": {"pass": 6, "fail": 0} }, - "total_pass": 143, + "total_pass": 152, "total_fail": 0, - "total": 143 + "total": 152 } diff --git a/lib/persist/scoreboard.md b/lib/persist/scoreboard.md index 179e745d..5274bc6f 100644 --- a/lib/persist/scoreboard.md +++ b/lib/persist/scoreboard.md @@ -17,5 +17,6 @@ _Generated by `lib/persist/conformance.sh`_ | view | 11 | 0 | 11 | | cas | 11 | 0 | 11 | | catalog | 10 | 0 | 10 | +| query | 9 | 0 | 9 | | recovery | 6 | 0 | 6 | -| **Total** | **143** | **0** | **143** | +| **Total** | **152** | **0** | **152** | diff --git a/lib/persist/tests/query.sx b/lib/persist/tests/query.sx new file mode 100644 index 00000000..d82c48f3 --- /dev/null +++ b/lib/persist/tests/query.sx @@ -0,0 +1,101 @@ +; Extension — read-side query helpers. Assertions count / index, not map vs list. + +(define q-seqs (fn (es) (map persist/event-seq es))) + +(persist-test + "read-between slices a seq range" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "x" 0 {}) + (persist/append b "s" "x" 0 {}) + (persist/append b "s" "x" 0 {}) + (persist/append b "s" "x" 0 {}) + (let + ((es (persist/read-between b "s" 2 3))) + (list + (len es) + (persist/event-seq (first es)) + (persist/event-seq (nth es 1)))))) + (list 2 2 3)) +(persist-test + "read-between is inclusive of endpoints" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "x" 0 {}) + (persist/append b "s" "x" 0 {}) + (persist/append b "s" "x" 0 {}) + (len (persist/read-between b "s" 1 3)))) + 3) +(persist-test + "read-since filters by timestamp" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "x" 100 {}) + (persist/append b "s" "x" 200 {}) + (persist/append b "s" "x" 300 {}) + (len (persist/read-since b "s" 200)))) + 2) +(persist-test + "read-window is an inclusive time range" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "x" 100 {}) + (persist/append b "s" "x" 200 {}) + (persist/append b "s" "x" 300 {}) + (persist/append b "s" "x" 400 {}) + (len (persist/read-window b "s" 200 300)))) + 2) +(persist-test + "read-by-type filters by event type" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "created" 0 {}) + (persist/append b "s" "updated" 0 {}) + (persist/append b "s" "created" 0 {}) + (len (persist/read-by-type b "s" "created")))) + 2) +(persist-test + "read-where filters by predicate over data" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "x" 0 {:amt 5}) + (persist/append b "s" "x" 0 {:amt 15}) + (persist/append b "s" "x" 0 {:amt 25}) + (len + (persist/read-where + b + "s" + (fn (e) (> (get (persist/event-data e) :amt) 10)))))) + 2) +(persist-test + "count-where counts matches" + (let + ((b (persist/open))) + (begin + (persist/append b "s" "a" 0 {}) + (persist/append b "s" "b" 0 {}) + (persist/append b "s" "a" 0 {}) + (persist/count-where + b + "s" + (fn (e) (equal? (persist/event-type e) "a"))))) + 2) +(persist-test + "queries return empty on empty stream" + (len (persist/read-since (persist/open) "s" 0)) + 0) +(persist-test + "queries work on the durable backend" + (let + ((db (persist/mock-durable (persist/mem-backend)))) + (begin + (persist/append db "s" "x" 100 {}) + (persist/append db "s" "x" 200 {}) + (len (persist/read-since db "s" 150)))) + 1) diff --git a/plans/persist-on-sx.md b/plans/persist-on-sx.md index 24ef8909..34ba72a8 100644 --- a/plans/persist-on-sx.md +++ b/plans/persist-on-sx.md @@ -42,7 +42,7 @@ read models (feeds, indices, audit logs) update incrementally. ## Status (rolling) -`bash lib/persist/conformance.sh` → **143/143** (Phases 1–4 complete + extensions) +`bash lib/persist/conformance.sh` → **152/152** (Phases 1–4 complete + extensions) ## Ground rules @@ -158,11 +158,19 @@ over an in-process disk (the mock-IO harness). `stream-exists?`/`total-events`. Backend `:streams` op (from seq high-water marks, so compacted streams still list), threaded through mem + durable. +- [x] `query.sx` — read-side scans: `read-between` (seq range), `read-since`/ + `read-window` (by `:at`), `read-by-type`, `read-where`, `count-where`. Pure + reads for audit windows / type filters / since-cursors. + ## Consumers (post-foundation, not in scope here) feed/-log, flow store, mod/audit, search index, acl grants, identity sessions all become `persist` log or kv. Track each migration in that subsystem's plan. ## Progress log +- **Ext: read-side query helpers (152/152).** `query.sx` — `read-between` (seq + range), `read-since`/`read-window` (by `:at`), `read-by-type`, `read-where`, + `count-where`. Pure scans over `persist/read`; for ad-hoc relational queries + consumers still project into a kv read model. 9 tests incl. durable. - **Ext: stream catalog (143/143).** New backend op `:streams` (keys of the seq high-water-mark dict, threaded through mem-backend + durable serve/io-backend) so fully-compacted streams still enumerate. `catalog.sx`: