Compare commits

...

22 Commits

Author SHA1 Message Date
200b93c1f6 persist: Blocker spec for the host durable-storage adapter
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 36s
Document the one gap to real durability: a hosts/ servicer for the persist/*
IO ops. Includes the silent-data-loss repro (durable-backend currently no-ops
under sx_server's default resolver), the full op contract table, hard
invariants (monotonic last-seq, etc.), the blob adapter shape, where to
register in sx_server.ml, and an acceptance test (swap transport, run durable +
recovery suites against real storage, survive a real restart).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 20:52:44 +00:00
84d5732b38 persist: worked reference migration — acl grants on persist + 10 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 41s
examples/acl.sx: a tested template migrating an ACL-grants store from a
hand-rolled ephemeral map to persist — grants/revokes as events, current set as
a projection, O(1) checks via a materialized view, audit via read-window.
Header carries the BEFORE->AFTER diff. Proves grants survive restart on the
durable backend (the capability the BEFORE version lacked). The pattern other
subsystem loops copy; does not touch the real lib/acl. 201/201.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 20:43:15 +00:00
a37a158d01 persist: global commit ordering across streams + 11 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 46s
global.sx: persist/gappend records a pointer in a reserved $global index whose
seq is the global commit position; read-global/project-global replay every
event in commit order; global-from for incremental consumers. Opt-in (plain
append untouched); $-prefixed streams now reserved + hidden from the public
catalog (streams-all reveals them). Gives feed its unified timeline.
Deterministic across restart. 191/191.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 20:41:01 +00:00
3e90c780e9 persist: exactly-once append under retries + 9 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 41s
idempotency.sx: persist/append-once appends at most once per (stream,
idempotency key), returning the same event on a repeat. The marker lives in the
kv facet, so idempotency holds across a restart (verified on durable).
persist/seen? check. 180/180.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 19:28:21 +00:00
0f6dbdfc7d persist: event schema evolution via upcasters + 9 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 37s
upcast.sx: register a pure (event -> event) upcaster per type in an immutable
registry; read-upcast/project-upcast lift legacy events to the current shape on
read so projections see one shape (no version branching, no history rewrite).
upcast-data helper merges new :data fields. 171/171.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 19:26:35 +00:00
62a1485302 persist: atomic batch append — contiguous block + transactional guard + 10 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 43s
batch.sx: persist/append-batch commits (type at data) specs as one contiguous
block; persist/append-batch-expect checks the stream is still at expected
before writing any event, so the batch is all-or-nothing under a concurrent
writer (conflict is a value, not a partial write). 162/162.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 19:24:35 +00:00
4e521e3d7a persist: read-side query helpers — seq/time/type/predicate scans + 9 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 28s
query.sx: read-between (seq range), read-since/read-window (by :at),
read-by-type, read-where, count-where. Pure scans over persist/read for audit
windows, type filters, since-cursors. 152/152.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 19:22:03 +00:00
a00439da6e persist: stream catalog — enumerate streams + 10 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 30s
New backend op :streams (from seq high-water marks, so compacted streams still
list), threaded through mem-backend + durable serve/io-backend. catalog.sx:
persist/streams, stream-count, stream-exists?, total-events. 143/143.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 19:20:22 +00:00
8e16ba6b04 persist: kv compare-and-swap + create-only put + 11 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 43s
kv.sx: persist/kv-cas sets a key only if its current value equals expected,
else returns {:conflict :expected :actual}; persist/kv-put-new is create-only.
The kv analogue of log append-expect — atomic current-state for sessions, acl
grants, stock counts. 133/133.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 19:17:53 +00:00
ecdaeea223 persist: materialized views — stay current on write, O(1) read + 11 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 27s
view.sx: persist/view bundles stream + fold + snapshot name; view-attach
subscribes it to a hub so each publish refreshes the snapshot incrementally,
making view-peek an O(1) current read. view-value always folds the tail so it
is never stale. The consumer read-model abstraction (feed indices, audit
rollups, search counters). 122/122.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 19:16:16 +00:00
4be6988963 persist: crash/restart recovery integration + migration notes — Phase 4 complete
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 37s
recovery.sx: 6-test end-to-end crash/restart of an order ledger (log +
subscription kv read model + snapshot + compaction + invoice blob ref) on the
durable backend; everything survives a restart over the same disk + content
store, seq continues, two restarts converge. Migration notes (mem → durable
under a live subsystem) added to the plan. Roadmap done, 111/111.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 19:14:01 +00:00
1c7b602978 persist: blob backend — store the ref/CID, never the bytes + 14 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Has been cancelled
blob.sx: a blob ref is {:cid :size :mime}; the blob store is a separate
injected dependency (perform in prod, mock content store in tests).
persist/blob-store puts bytes and returns only the ref; bytes live in a
content-addressed store (artdag/IPFS). Tests assert refs in log/kv never carry
the bytes + content-address dedup. 105/105.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 19:11:48 +00:00
90c2a57975 persist: durable backend over the perform IO boundary + 15 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Has been cancelled
durable.sx: io-backend with an injectable transport — persist/durable-backend
performs each op as {:op "persist/..." :args (...)} (kernel suspends, host
resumes); persist/mock-durable services via persist/serve over an in-memory
disk. Identical request shapes mean the whole facet/projection/snapshot/
compaction stack runs unchanged on the durable backend. Crash/restart replay
recovers log+kv+snapshot. 91/91.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 19:09:12 +00:00
aff7d1e84f persist: compaction — drop snapshotted prefix, monotonic seq + 11 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 1m0s
Backend now tracks last-seq as a monotonic high-water mark (survives
truncation) and exposes :truncate-through. compaction.sx: persist/compact
checkpoints then drops events with seq <= snapshot seq; should-compact?/
maybe-compact give an explicit every-N policy. Determinism: post-compaction
replay value == uncompacted full replay. Phase 3 complete, 76/76.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 18:42:06 +00:00
b0874b1282 persist: snapshots — checkpoint + replay = snapshot + tail + 11 tests
Some checks are pending
Test, Build, and Deploy / test-build-deploy (push) Waiting to run
snapshot.sx: snapshot is a projection state {:value :seq} stored in kv under
snapshot/<name>. persist/checkpoint replays and saves; persist/replay folds
only the tail after the snapshot. Tests assert snapshot+tail == full replay
both ways + determinism. 65/65.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 18:39:41 +00:00
156d6f12ec persist: optimistic concurrency — conflict as a real result + 8 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Has been cancelled
concurrency.sx: persist/append-expect refuses an append when the stream
advanced past the caller's expected seq, returning {:conflict :expected
:actual} instead of crashing or overwriting. persist/conflict? + accessors.
Phase 2 complete, 54/54.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 18:37:49 +00:00
03da8d4328 persist: subscription hub — read models update on publish + 9 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 1m6s
subscribe.sx: persist/hub wraps a backend; persist/publish appends then fires
per-stream callbacks (backend stream event). Direct persist/append bypasses
subscribers (bulk load/replay). Callbacks drive kv counters / project-resume. 46/46.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 18:36:16 +00:00
a6864178c3 persist: projections — fold stream into read model, incremental resume + 9 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 1m9s
project.sx: projection state {:value :seq}; persist/project folds the whole
stream, persist/project-resume folds only the tail so read models update
incrementally. Pure step (value event)->value. 37/37.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 18:34:52 +00:00
314cc37030 persist: Phase 1 — log + kv facets on injectable in-memory backend + 28 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 1m3s
event/backend/log/kv/api over one injected backend protocol (mem default).
log: append/read/read-from, sequential per-stream seq, stream isolation.
kv: get/put/delete/has?/keys/get-or/update. conformance.sh + 3 suites, 28/28.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 18:32:51 +00:00
b80cc32363 briefings: add persist-on-sx loop briefing
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 1m2s
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 18:24:52 +00:00
1902cce57f plans: rename store-on-sx → persist-on-sx; clarify it's persistence not shop, and scope (log+kv facets, blobs delegated, cache excluded)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 18:20:14 +00:00
ff537bfba2 plans: six subsystem outline plans for the SX rewrite (store, commerce, identity, content, events, host)
Gap analysis from the five-subsystem set (acl/feed/flow/mod/search):
- store-on-sx: event-sourcing foundation the others fake with in-memory lists (build first)
- commerce-on-sx: catalog/cart/pricing/orders on miniKanren (+ store + flow)
- identity-on-sx: OAuth2/sessions/membership on Erlang (the core acl assumes)
- content-on-sx: documents/blocks/CRDT on Smalltalk
- events-on-sx: calendar/ticketing on Datalog + flow-driven delivery
- host-on-sx: the web boundary — off Quart onto native server+SXTP now, dream-on-sx next

All DRAFT outlines; substrate choices proposed, not final.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 17:39:29 +00:00
50 changed files with 4081 additions and 0 deletions

10
lib/persist/api.sx Normal file
View File

@@ -0,0 +1,10 @@
; persist/api — the public entry point. persist/open returns a backend (the
; in-memory one by default; pass a custom backend to inject file/pg/ipfs-ref).
; All facet functions take this backend as their first argument.
; Requires: lib/persist/backend.sx, lib/persist/log.sx, lib/persist/kv.sx.
(define
persist/open
(fn
(&rest args)
(if (= (len args) 0) (persist/mem-backend) (first args))))

34
lib/persist/backend.sx Normal file
View File

@@ -0,0 +1,34 @@
; persist/backend — the injected storage protocol. Every facet (log, kv,
; snapshot) goes through a backend dict, never touching storage directly, so
; file/pg/ipfs-ref backends swap in unchanged. A backend is a dict of fns:
; {:append :read :last-seq :truncate-through :streams
; :kv-get :kv-put :kv-delete :kv-has? :kv-keys}
; The in-memory backend is the test default. State is three dicts held in a
; closure and mutated with set!: logs (stream -> event list), seqs (stream ->
; last assigned seq — a monotonic high-water mark that survives compaction so
; truncating the log prefix never lets a future append reuse a seq), kv. The
; stream catalog comes from seqs, so a fully-compacted stream still lists.
(define
persist/mem-backend
(fn
()
(let ((logs {}) (seqs {}) (kv {})) {:truncate-through (fn (stream n) (let ((cur (get logs stream))) (set! logs (assoc logs stream (filter (fn (e) (> (persist/event-seq e) n)) (if cur cur (list))))))) :kv-keys (fn () (keys kv)) :read (fn (stream) (let ((cur (get logs stream))) (if cur cur (list)))) :kv-has? (fn (key) (has-key? kv key)) :last-seq (fn (stream) (let ((s (get seqs stream))) (if s s 0))) :streams (fn () (keys seqs)) :append (fn (stream event) (begin (let ((cur (get logs stream))) (set! logs (assoc logs stream (append (if cur cur (list)) event)))) (set! seqs (assoc seqs stream (persist/event-seq event))))) :kv-delete (fn (key) (set! kv (dissoc kv key))) :kv-put (fn (key val) (set! kv (assoc kv key val))) :kv-get (fn (key) (get kv key))})))
; protocol accessors — call a backend op by keyword
(define
persist/backend-append
(fn (b stream event) ((get b :append) stream event)))
(define persist/backend-read (fn (b stream) ((get b :read) stream)))
(define
persist/backend-last-seq
(fn (b stream) ((get b :last-seq) stream)))
(define persist/backend-streams (fn (b) ((get b :streams))))
(define
persist/backend-truncate
(fn (b stream n) ((get b :truncate-through) stream n)))
(define persist/backend-kv-get (fn (b key) ((get b :kv-get) key)))
(define persist/backend-kv-put (fn (b key val) ((get b :kv-put) key val)))
(define persist/backend-kv-delete (fn (b key) ((get b :kv-delete) key)))
(define persist/backend-kv-has? (fn (b key) ((get b :kv-has?) key)))
(define persist/backend-kv-keys (fn (b) ((get b :kv-keys))))

40
lib/persist/batch.sx Normal file
View File

@@ -0,0 +1,40 @@
; persist/batch — commit several events to a stream as one contiguous block.
; Each spec is (type at data). Plain append-batch always appends; the -expect
; form is the transactional commit: it checks the stream is still at `expected`
; before writing ANY event, so a batch is all-or-nothing under a concurrent
; writer (conflict is a value, not a partial write). For an order + its line
; items, an audit entry + its reason, etc. Requires: lib/persist/log.sx.
; append a list of (type at data) specs as one block; returns the stored events
; (a real cons-list, in order, with contiguous seqs)
(define
persist/append-batch
(fn
(b stream specs)
(reverse
(reduce
(fn
(acc spec)
(cons
(persist/append
b
stream
(first spec)
(nth spec 1)
(nth spec 2))
acc))
(list)
specs))))
; transactional batch: commit all specs only if the stream is still at expected,
; else return a conflict and write nothing
(define
persist/append-batch-expect
(fn
(b stream expected specs)
(let
((actual (persist/last-seq b stream)))
(if
(= actual expected)
(persist/append-batch b stream specs)
{:actual actual :expected expected :conflict true}))))

66
lib/persist/blob.sx Normal file
View File

@@ -0,0 +1,66 @@
; persist/blob — large objects (images, media) are NOT persist's to hold. They
; live in a content-addressed store (artdag/IPFS); persist stores only a
; reference: {:cid :size :mime}. The blob store is a SEPARATE injected
; dependency with its own transport (perform in production, a mock content store
; in tests), distinct from the event/kv backend. The invariant: a blob ref that
; lands in the log or kv carries the CID + metadata and never the bytes.
; Requires: lib/persist/backend.sx.
(define persist/blob-ref (fn (cid size mime) {:mime mime :size size :cid cid}))
(define persist/blob-ref? (fn (r) (has-key? r :cid)))
(define persist/blob-cid (fn (r) (get r :cid)))
(define persist/blob-size (fn (r) (get r :size)))
(define persist/blob-mime (fn (r) (get r :mime)))
; blob store protocol over an injectable transport
(define persist/blob-io (fn (transport) {:put (fn (bytes mime) (transport {:op "blob/put" :args (list bytes mime)})) :get (fn (cid) (transport {:op "blob/get" :args (list cid)})) :has? (fn (cid) (transport {:op "blob/has?" :args (list cid)}))}))
; production blob store — transport is the kernel's perform
(define
persist/blob-store-backend
(fn () (persist/blob-io (fn (req) (perform req)))))
; store bytes via the blob backend; return ONLY the ref (cid + metadata) — this
; is what the caller persists in the log/kv. The bytes never enter persist.
(define
persist/blob-store
(fn
(blob bytes mime)
(let
((cid ((get blob :put) bytes mime)))
(persist/blob-ref cid (len bytes) mime))))
(define
persist/blob-fetch
(fn (blob ref) ((get blob :get) (persist/blob-cid ref))))
(define
persist/blob-exists?
(fn (blob ref) ((get blob :has?) (persist/blob-cid ref))))
; mock content-addressed store (stands in for artdag/IPFS). CID is a
; deterministic content address: identical bytes dedupe to one CID. A real
; store computes a SHA3/IPFS CID host-side; the prefix keeps the mock readable.
(define persist/blob-cid-of (fn (bytes) (str "cid:" bytes)))
(define
persist/blob-serve
(fn
(store req)
(let
((op (get req :op)) (args (get req :args)))
(cond
((equal? op "blob/put")
(let
((cid (persist/blob-cid-of (first args))))
(begin (persist/backend-kv-put store cid (first args)) cid)))
((equal? op "blob/get") (persist/backend-kv-get store (first args)))
((equal? op "blob/has?")
(persist/backend-kv-has? store (first args)))
(else (error (str "persist/blob-serve: unknown op " op)))))))
(define
persist/blob-mock-transport
(fn (store) (fn (req) (persist/blob-serve store req))))
(define
persist/mock-blob
(fn (store) (persist/blob-io (persist/blob-mock-transport store))))

35
lib/persist/catalog.sx Normal file
View File

@@ -0,0 +1,35 @@
; persist/catalog — enumerate the streams a backend holds. The catalog is the
; set of streams ever appended to (from the seq high-water marks), so a stream
; whose log has been fully compacted still appears. $-prefixed streams are
; reserved for internal indexes (e.g. the $global commit index) and are hidden
; from the public catalog; use streams-all to see them. For admin, global ops,
; and cross-stream tooling. Requires: lib/persist/backend.sx, lib/persist/log.sx.
(define persist/reserved-stream? (fn (s) (starts-with? s "$")))
; every stream including reserved internal indexes
(define persist/streams-all (fn (b) (persist/backend-streams b)))
; public streams (reserved internal indexes hidden)
(define
persist/streams
(fn
(b)
(filter
(fn (s) (not (persist/reserved-stream? s)))
(persist/streams-all b))))
(define persist/stream-count (fn (b) (len (persist/streams b))))
(define
persist/stream-exists?
(fn (b stream) (contains? (persist/streams b) stream)))
; total logical events across all public streams (sum of high-water marks)
(define
persist/total-events
(fn
(b)
(reduce
(fn (acc s) (+ acc (persist/last-seq b s)))
0
(persist/streams b))))

43
lib/persist/compaction.sx Normal file
View File

@@ -0,0 +1,43 @@
; persist/compaction — once a snapshot subsumes a log prefix, those events are
; dead weight: replay starts from the snapshot, so events with seq <= the
; snapshot's seq are never folded again. Compaction checkpoints then truncates
; that prefix. The seq counter is monotonic (backend high-water mark) so future
; appends keep climbing — the surviving tail keeps its original seqs and replay
; from the snapshot still equals a full replay of the pre-compaction log.
; Policy is explicit: compact when the uncompacted tail reaches `every` events.
; Requires: lib/persist/snapshot.sx, lib/persist/log.sx.
; events accumulated since the last snapshot for name
(define
persist/uncompacted
(fn
(b stream name seed)
(-
(persist/last-seq b stream)
(persist/project-seq (persist/snapshot-load b name seed)))))
; policy: should we compact yet? tail since snapshot >= every
(define
persist/should-compact?
(fn
(b stream name every seed)
(>= (persist/uncompacted b stream name seed) every)))
; checkpoint then drop the snapshotted prefix; returns the new snapshot state
(define
persist/compact
(fn
(b stream name step seed)
(let
((state (persist/checkpoint b stream name step seed)))
(begin (persist/truncate b stream (persist/project-seq state)) state))))
; compact only if the policy fires; always returns the current snapshot state
(define
persist/maybe-compact
(fn
(b stream name step seed every)
(if
(persist/should-compact? b stream name every seed)
(persist/compact b stream name step seed)
(persist/snapshot-load b name seed))))

View File

@@ -0,0 +1,24 @@
; persist/concurrency — optimistic concurrency for the log facet. The caller
; passes the seq it believes is current (the last-seq it last observed). If the
; stream has advanced since, the append is refused and a conflict VALUE is
; returned — never a crash, never a silent overwrite. The caller re-reads the
; tail and retries. This is the substrate-level answer to "two writers, one
; stream": the loser gets a result it can act on.
; Requires: lib/persist/log.sx.
(define
persist/append-expect
(fn
(b stream expected type at data)
(let
((actual (persist/last-seq b stream)))
(if
(= actual expected)
(persist/append b stream type at data)
{:actual actual :expected expected :conflict true}))))
(define
persist/conflict?
(fn (r) (if (has-key? r :conflict) (get r :conflict) false)))
(define persist/conflict-expected (fn (r) (get r :expected)))
(define persist/conflict-actual (fn (r) (get r :actual)))

128
lib/persist/conformance.sh Executable file
View File

@@ -0,0 +1,128 @@
#!/usr/bin/env bash
# lib/persist/conformance.sh — run persist test suites, emit scoreboard.json + scoreboard.md.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
SUITES=(event log kv project subscribe concurrency snapshot compaction durable blob view cas catalog query batch upcast idempotency global example-acl recovery)
OUT_JSON="lib/persist/scoreboard.json"
OUT_MD="lib/persist/scoreboard.md"
run_suite() {
local suite=$1
local file="lib/persist/tests/${suite}.sx"
local TMP
TMP=$(mktemp)
cat > "$TMP" << EPOCHS
(epoch 1)
(load "spec/stdlib.sx")
(load "lib/r7rs.sx")
(load "lib/persist/event.sx")
(load "lib/persist/backend.sx")
(load "lib/persist/log.sx")
(load "lib/persist/kv.sx")
(load "lib/persist/project.sx")
(load "lib/persist/concurrency.sx")
(load "lib/persist/snapshot.sx")
(load "lib/persist/compaction.sx")
(load "lib/persist/durable.sx")
(load "lib/persist/blob.sx")
(load "lib/persist/view.sx")
(load "lib/persist/catalog.sx")
(load "lib/persist/query.sx")
(load "lib/persist/batch.sx")
(load "lib/persist/upcast.sx")
(load "lib/persist/idempotency.sx")
(load "lib/persist/global.sx")
(load "lib/persist/examples/acl.sx")
(load "lib/persist/subscribe.sx")
(load "lib/persist/api.sx")
(epoch 2)
(eval "(define persist-test-pass 0)")
(eval "(define persist-test-fail 0)")
(eval "(define persist-test (fn (name got expected) (if (equal? got expected) (set! persist-test-pass (+ persist-test-pass 1)) (set! persist-test-fail (+ persist-test-fail 1)))))")
(epoch 3)
(load "${file}")
(epoch 4)
(eval "(list persist-test-pass persist-test-fail)")
EPOCHS
local OUTPUT
OUTPUT=$(timeout 300 "$SX_SERVER" < "$TMP" 2>/dev/null)
rm -f "$TMP"
local LINE
LINE=$(echo "$OUTPUT" | awk '/^\(ok-len 4 / {getline; print; exit}')
if [ -z "$LINE" ]; then
LINE=$(echo "$OUTPUT" | grep -E '^\(ok 4 \([0-9]+ [0-9]+\)\)' | tail -1 \
| sed -E 's/^\(ok 4 //; s/\)$//')
fi
local P F
P=$(echo "$LINE" | sed -E 's/^\(([0-9]+) ([0-9]+)\).*/\1/')
F=$(echo "$LINE" | sed -E 's/^\(([0-9]+) ([0-9]+)\).*/\2/')
P=${P:-0}
F=${F:-0}
echo "${P} ${F}"
}
declare -A SUITE_PASS
declare -A SUITE_FAIL
TOTAL_PASS=0
TOTAL_FAIL=0
echo "Running persist conformance suite..." >&2
for s in "${SUITES[@]}"; do
read -r p f < <(run_suite "$s")
SUITE_PASS[$s]=$p
SUITE_FAIL[$s]=$f
TOTAL_PASS=$((TOTAL_PASS + p))
TOTAL_FAIL=$((TOTAL_FAIL + f))
printf " %-12s %d/%d\n" "$s" "$p" "$((p+f))" >&2
done
# scoreboard.json
{
printf '{\n'
printf ' "suites": {\n'
first=1
for s in "${SUITES[@]}"; do
if [ $first -eq 0 ]; then printf ',\n'; fi
printf ' "%s": {"pass": %d, "fail": %d}' "$s" "${SUITE_PASS[$s]}" "${SUITE_FAIL[$s]}"
first=0
done
printf '\n },\n'
printf ' "total_pass": %d,\n' "$TOTAL_PASS"
printf ' "total_fail": %d,\n' "$TOTAL_FAIL"
printf ' "total": %d\n' "$((TOTAL_PASS + TOTAL_FAIL))"
printf '}\n'
} > "$OUT_JSON"
# scoreboard.md
{
printf '# persist Conformance Scoreboard\n\n'
printf '_Generated by `lib/persist/conformance.sh`_\n\n'
printf '| Suite | Pass | Fail | Total |\n'
printf '|-------|-----:|-----:|------:|\n'
for s in "${SUITES[@]}"; do
p=${SUITE_PASS[$s]}
f=${SUITE_FAIL[$s]}
printf '| %s | %d | %d | %d |\n' "$s" "$p" "$f" "$((p+f))"
done
printf '| **Total** | **%d** | **%d** | **%d** |\n' "$TOTAL_PASS" "$TOTAL_FAIL" "$((TOTAL_PASS + TOTAL_FAIL))"
} > "$OUT_MD"
echo "Wrote $OUT_JSON and $OUT_MD" >&2
echo "Total: $TOTAL_PASS pass, $TOTAL_FAIL fail" >&2
[ "$TOTAL_FAIL" -eq 0 ]

71
lib/persist/durable.sx Normal file
View File

@@ -0,0 +1,71 @@
; persist/durable — a backend whose every op crosses the kernel's IO-suspension
; boundary. Each op performs an IO request {:op "persist/..." :args (...)};
; under the real kernel `perform` suspends the CEK machine and the host (file,
; pg, ipfs-ref) services the request and resumes with the result — so the facet
; code above (log/kv/project/snapshot/compaction) never changes. The TRANSPORT
; is injectable: production passes the kernel's perform; tests pass a mock
; servicer over an in-memory disk. Same request shapes either way, so the whole
; existing facet stack runs unchanged on the mock-durable backend.
; Requires: lib/persist/backend.sx.
; request encoders — the exact payloads the durable backend performs
(define persist/req-append (fn (stream event) {:op "persist/append" :args (list stream event)}))
(define persist/req-read (fn (stream) {:op "persist/read" :args (list stream)}))
(define persist/req-last-seq (fn (stream) {:op "persist/last-seq" :args (list stream)}))
(define persist/req-streams (fn () {:op "persist/streams" :args (list)}))
(define persist/req-truncate (fn (stream n) {:op "persist/truncate" :args (list stream n)}))
(define persist/req-kv-get (fn (key) {:op "persist/kv-get" :args (list key)}))
(define persist/req-kv-put (fn (key val) {:op "persist/kv-put" :args (list key val)}))
(define persist/req-kv-delete (fn (key) {:op "persist/kv-delete" :args (list key)}))
(define persist/req-kv-has? (fn (key) {:op "persist/kv-has?" :args (list key)}))
(define persist/req-kv-keys (fn () {:op "persist/kv-keys" :args (list)}))
; a backend parameterized over a transport (req -> response)
(define persist/io-backend (fn (transport) {:truncate-through (fn (stream n) (transport (persist/req-truncate stream n))) :kv-keys (fn () (transport (persist/req-kv-keys))) :read (fn (stream) (transport (persist/req-read stream))) :kv-has? (fn (key) (transport (persist/req-kv-has? key))) :last-seq (fn (stream) (transport (persist/req-last-seq stream))) :streams (fn () (transport (persist/req-streams))) :append (fn (stream event) (transport (persist/req-append stream event))) :kv-delete (fn (key) (transport (persist/req-kv-delete key))) :kv-put (fn (key val) (transport (persist/req-kv-put key val))) :kv-get (fn (key) (transport (persist/req-kv-get key)))}))
; production backend — transport is the kernel's perform (suspends; host resumes)
(define
persist/durable-backend
(fn () (persist/io-backend (fn (req) (perform req)))))
; reference host: service one request against a disk (any backend protocol impl).
; This is what a real host plugs into the kernel's IO resolver, and the mock-IO
; harness for tests: it never touches a real disk, just an in-memory backend.
(define
persist/serve
(fn
(disk req)
(let
((op (get req :op)) (args (get req :args)))
(cond
((equal? op "persist/append")
(persist/backend-append disk (first args) (nth args 1)))
((equal? op "persist/read")
(persist/backend-read disk (first args)))
((equal? op "persist/last-seq")
(persist/backend-last-seq disk (first args)))
((equal? op "persist/streams") (persist/backend-streams disk))
((equal? op "persist/truncate")
(persist/backend-truncate disk (first args) (nth args 1)))
((equal? op "persist/kv-get")
(persist/backend-kv-get disk (first args)))
((equal? op "persist/kv-put")
(persist/backend-kv-put disk (first args) (nth args 1)))
((equal? op "persist/kv-delete")
(persist/backend-kv-delete disk (first args)))
((equal? op "persist/kv-has?")
(persist/backend-kv-has? disk (first args)))
((equal? op "persist/kv-keys") (persist/backend-kv-keys disk))
(else (error (str "persist/serve: unknown op " op)))))))
; mock transport: a perform-replacement that services against a disk in-process
(define
persist/mock-transport
(fn (disk) (fn (req) (persist/serve disk req))))
; a durable backend wired to a mock disk — exercises the full io-backend path
; (request-encode -> serve -> disk) with no suspension, so the existing facet
; suite runs against it unchanged.
(define
persist/mock-durable
(fn (disk) (persist/io-backend (persist/mock-transport disk))))

13
lib/persist/event.sx Normal file
View File

@@ -0,0 +1,13 @@
; persist/event — an event is the unit of the log facet:
; {:stream :seq :type :at :data}
; stream = which append-only stream, seq = 1-based position within it,
; type = event kind, at = caller-supplied timestamp (never a clock here:
; replay must stay pure), data = payload dict.
(define persist/event (fn (stream seq type at data) {:data data :type type :at at :stream stream :seq seq}))
(define persist/event-stream (fn (e) (get e :stream)))
(define persist/event-seq (fn (e) (get e :seq)))
(define persist/event-type (fn (e) (get e :type)))
(define persist/event-at (fn (e) (get e :at)))
(define persist/event-data (fn (e) (get e :data)))

View File

@@ -0,0 +1,79 @@
; persist/examples/acl — a WORKED MIGRATION REFERENCE. A subsystem (acl grants:
; who may access what) currently hand-rolls an in-memory mutable map that loses
; every grant on restart and keeps no audit trail. This shows the same subsystem
; rebuilt on persist. It is the template other subsystem loops copy; it does NOT
; touch the real lib/acl (out of this loop's scope).
;
; BEFORE — hand-rolled, ephemeral, no history, no concurrency safety:
; (define acl-grants {}) ; resource -> principal list (mutable)
; (define acl-grant! (fn (r p) (set! acl-grants (assoc acl-grants r (cons p (get acl-grants r))))))
; (define acl-revoke! (fn (r p) (set! acl-grants (assoc acl-grants r (remove p ...)))))
; (define acl-can? (fn (r p) (contains? (get acl-grants r) p)))
; ;; vanishes on restart; "when/why was X granted?" is unanswerable.
;
; AFTER — on persist. Grants/revokes are EVENTS (history matters), the current
; grant set is a PROJECTION, checks read a materialized VIEW, and the audit trail
; is a time-windowed query. Every fn takes a backend `b`, so the same code runs
; on the in-memory backend today and the durable backend unchanged.
; Requires: lib/persist/log.sx, lib/persist/project.sx, lib/persist/view.sx,
; lib/persist/query.sx.
(define acl/stream (fn (resource) (str "acl/" resource)))
; write side — grant/revoke append events (the history is the source of truth)
(define
acl/grant
(fn
(b resource principal at)
(persist/append b (acl/stream resource) "granted" at {:principal principal})))
(define
acl/revoke
(fn
(b resource principal at)
(persist/append b (acl/stream resource) "revoked" at {:principal principal})))
; fold step: grant adds a principal (once), revoke removes it
(define
acl/step
(fn
(set e)
(let
((p (get (persist/event-data e) :principal)))
(if
(equal? (persist/event-type e) "granted")
(if (contains? set p) set (append set p))
(filter (fn (x) (not (equal? x p))) set)))))
; read side — current grant set + membership check (replays the log)
(define
acl/grants
(fn
(b resource)
(persist/project-fold b (acl/stream resource) acl/step (list))))
(define
acl/can?
(fn (b resource principal) (contains? (acl/grants b resource) principal)))
; materialized view — attach to a hub for O(1) checks that stay current on write
(define
acl/view
(fn
(resource)
(persist/view
(str "acl-current/" resource)
(acl/stream resource)
acl/step
(list))))
(define
acl/can-fast?
(fn
(b resource principal)
(contains? (persist/view-peek b (acl/view resource)) principal)))
; audit — grants/revokes for a resource in a time window (the new capability the
; hand-rolled version could never answer)
(define
acl/audit-window
(fn
(b resource from to)
(persist/read-window b (acl/stream resource) from to)))

55
lib/persist/global.sx Normal file
View File

@@ -0,0 +1,55 @@
; persist/global — a global commit ordering across streams. Per-stream seqs only
; order within a stream; a unified timeline (e.g. feed's home feed, a global
; audit trail) needs a single order across streams. `persist/gappend` appends to
; the target stream and then records a pointer in a reserved $global index whose
; own seq IS the global commit position. Reading the index in order and
; resolving each pointer yields every event in commit order. This is opt-in:
; streams that don't need global ordering use plain persist/append and never
; touch $global. Determinism: the order is the $global append order, replayed
; identically. Requires: lib/persist/log.sx, lib/persist/catalog.sx.
(define persist/global-stream "$global")
; append with a global commit position. Returns the stored stream event; the
; event's global position is the seq of its pointer in $global.
(define
persist/gappend
(fn
(b stream type at data)
(let
((ev (persist/append b stream type at data)))
(begin (persist/append b persist/global-stream "ref" at {:stream stream :seq (persist/event-seq ev)}) ev))))
; the global index: pointer events in commit order (each pointer's seq = gpos)
(define persist/global-log (fn (b) (persist/read b persist/global-stream)))
; the current global commit position (count of globally-ordered appends)
(define
persist/global-pos
(fn (b) (persist/last-seq b persist/global-stream)))
; resolve a pointer event to the actual stream event it references
(define
persist/resolve-ref
(fn
(b ptr)
(let
((d (persist/event-data ptr)))
(first (persist/read-from b (get d :stream) (get d :seq))))))
; every globally-ordered event, in commit order
(define
persist/read-global
(fn
(b)
(map (fn (ptr) (persist/resolve-ref b ptr)) (persist/global-log b))))
; pointer events at or after a global position (incremental global consumers)
(define
persist/global-from
(fn (b gpos) (persist/read-from b persist/global-stream gpos)))
; fold over all events in global commit order
(define
persist/project-global
(fn (b step seed) (reduce step seed (persist/read-global b))))

View File

@@ -0,0 +1,28 @@
; persist/idempotency — exactly-once append under retries. A command retried
; after a network blip must not append its event twice. The caller supplies an
; idempotency key; the first append for that (stream, key) stores the event and
; remembers the key in the kv facet; a repeat returns the SAME event without
; appending. Because the marker lives in kv, idempotency holds across a restart
; too. Keyed per stream. Requires: lib/persist/log.sx, lib/persist/kv.sx.
(define persist/idem-key (fn (stream key) (str "idem/" stream "/" key)))
; true if an append-once has already been recorded for (stream, key)
(define
persist/seen?
(fn (b stream key) (persist/kv-has? b (persist/idem-key stream key))))
; append at most once per (stream, key). Returns the stored event either way —
; freshly appended on first use, the remembered one on a repeat.
(define
persist/append-once
(fn
(b stream key type at data)
(let
((k (persist/idem-key stream key)))
(if
(persist/kv-has? b k)
(persist/kv-get b k)
(let
((ev (persist/append b stream type at data)))
(begin (persist/kv-put b k ev) ev))))))

44
lib/persist/kv.sx Normal file
View File

@@ -0,0 +1,44 @@
; persist/kv — the kv facet: current-state values, no history. For things
; whose history does NOT matter (stock counts, config, profiles, session
; blobs) and where projections materialize their read models.
; Requires: lib/persist/backend.sx.
(define persist/kv-get (fn (b key) (persist/backend-kv-get b key)))
(define
persist/kv-put
(fn (b key val) (begin (persist/backend-kv-put b key val) val)))
(define persist/kv-delete (fn (b key) (persist/backend-kv-delete b key)))
(define persist/kv-has? (fn (b key) (persist/backend-kv-has? b key)))
(define persist/kv-keys (fn (b) (persist/backend-kv-keys b)))
; get with a default when the key is absent
(define
persist/kv-get-or
(fn
(b key dflt)
(if (persist/kv-has? b key) (persist/kv-get b key) dflt)))
; read-modify-write: apply f to the current value (or dflt if absent), store result
(define
persist/kv-update
(fn
(b key dflt f)
(persist/kv-put b key (f (persist/kv-get-or b key dflt)))))
; compare-and-swap: set key to new ONLY if its current value equals expected.
; Returns new on success, or a conflict value {:conflict true :expected :actual}
; the caller can re-read and retry on. The kv analogue of log append-expect.
(define
persist/kv-cas
(fn
(b key expected new)
(let
((actual (persist/kv-get b key)))
(if (equal? actual expected) (persist/kv-put b key new) {:actual actual :expected expected :conflict true}))))
; create-only: put a value only if the key is absent; conflict if it exists
(define
persist/kv-put-new
(fn
(b key val)
(if (persist/kv-has? b key) {:actual (persist/kv-get b key) :conflict true :reason "exists"} (persist/kv-put b key val))))

43
lib/persist/log.sx Normal file
View File

@@ -0,0 +1,43 @@
; persist/log — the log facet: append-only event streams. seq is assigned from
; a monotonic per-stream high-water mark (1-based) held by the backend, so it
; keeps climbing even after the log prefix is compacted away. Reads return the
; events currently stored, oldest-first.
; Requires: lib/persist/event.sx, lib/persist/backend.sx.
; logical last seq assigned in a stream (0 if none) — survives compaction
(define
persist/last-seq
(fn (b stream) (persist/backend-last-seq b stream)))
; number of events physically stored in a stream (shrinks on compaction)
(define
persist/count
(fn (b stream) (len (persist/backend-read b stream))))
; append an event, auto-assigning the next seq. Returns the stored event.
(define
persist/append
(fn
(b stream type at data)
(let
((seq (+ 1 (persist/last-seq b stream))))
(let
((ev (persist/event stream seq type at data)))
(begin (persist/backend-append b stream ev) ev)))))
; read all events currently stored in a stream, oldest-first
(define persist/read (fn (b stream) (persist/backend-read b stream)))
; read events with seq >= from
(define
persist/read-from
(fn
(b stream from)
(filter
(fn (e) (>= (persist/event-seq e) from))
(persist/read b stream))))
; drop events with seq <= n (compaction); the seq counter is untouched
(define
persist/truncate
(fn (b stream n) (persist/backend-truncate b stream n)))

30
lib/persist/project.sx Normal file
View File

@@ -0,0 +1,30 @@
; persist/project — a projection folds a stream's events into a read model.
; A projection state is {:value v :seq s} where s is the last seq folded in,
; so a projection can resume incrementally from where it left off (replay only
; the tail). step : (value event) -> value. Determinism: step must be pure —
; time lives on the event (event-at), never a clock here.
; Requires: lib/persist/event.sx, lib/persist/log.sx.
; fold the tail (events with seq > prior's seq) onto a prior projection state
(define
persist/project-resume
(fn
(b stream step prior)
(let
((tail (persist/read-from b stream (+ 1 (get prior :seq)))))
(reduce (fn (acc e) {:value (step (get acc :value) e) :seq (persist/event-seq e)}) prior tail))))
; project the whole stream from seed
(define
persist/project
(fn (b stream step seed) (persist/project-resume b stream step {:value seed :seq 0})))
(define persist/project-value (fn (p) (get p :value)))
(define persist/project-seq (fn (p) (get p :seq)))
; convenience: project and return just the value
(define
persist/project-fold
(fn
(b stream step seed)
(persist/project-value (persist/project b stream step seed))))

54
lib/persist/query.sx Normal file
View File

@@ -0,0 +1,54 @@
; persist/query — read-side helpers over a stream: slice by seq range, filter by
; timestamp / type / predicate. Pure reads composed from persist/read, no
; backend changes. The log is bad at ad-hoc relational queries (project into a
; kv read model for those) but these cover the common log scans: an audit window
; by time, a type filter, a since-cursor for incremental consumers.
; Requires: lib/persist/log.sx.
; events with seq in [from, to] inclusive
(define
persist/read-between
(fn
(b stream from to)
(filter
(fn
(e)
(and (>= (persist/event-seq e) from) (<= (persist/event-seq e) to)))
(persist/read b stream))))
; events at or after a timestamp (events carry :at; never a clock here)
(define
persist/read-since
(fn
(b stream at)
(filter (fn (e) (>= (persist/event-at e) at)) (persist/read b stream))))
; events whose :at is in [from, to] inclusive — an audit window
(define
persist/read-window
(fn
(b stream from to)
(filter
(fn
(e)
(and (>= (persist/event-at e) from) (<= (persist/event-at e) to)))
(persist/read b stream))))
; events matching a predicate (e -> truthy)
(define
persist/read-where
(fn (b stream pred) (filter pred (persist/read b stream))))
; events of a given type
(define
persist/read-by-type
(fn
(b stream type)
(filter
(fn (e) (equal? (persist/event-type e) type))
(persist/read b stream))))
; count events matching a predicate
(define
persist/count-where
(fn (b stream pred) (len (persist/read-where b stream pred))))

View File

@@ -0,0 +1,27 @@
{
"suites": {
"event": {"pass": 6, "fail": 0},
"log": {"pass": 9, "fail": 0},
"kv": {"pass": 13, "fail": 0},
"project": {"pass": 9, "fail": 0},
"subscribe": {"pass": 9, "fail": 0},
"concurrency": {"pass": 8, "fail": 0},
"snapshot": {"pass": 11, "fail": 0},
"compaction": {"pass": 11, "fail": 0},
"durable": {"pass": 15, "fail": 0},
"blob": {"pass": 14, "fail": 0},
"view": {"pass": 11, "fail": 0},
"cas": {"pass": 11, "fail": 0},
"catalog": {"pass": 10, "fail": 0},
"query": {"pass": 9, "fail": 0},
"batch": {"pass": 10, "fail": 0},
"upcast": {"pass": 9, "fail": 0},
"idempotency": {"pass": 9, "fail": 0},
"global": {"pass": 11, "fail": 0},
"example-acl": {"pass": 10, "fail": 0},
"recovery": {"pass": 6, "fail": 0}
},
"total_pass": 201,
"total_fail": 0,
"total": 201
}

27
lib/persist/scoreboard.md Normal file
View File

@@ -0,0 +1,27 @@
# persist Conformance Scoreboard
_Generated by `lib/persist/conformance.sh`_
| Suite | Pass | Fail | Total |
|-------|-----:|-----:|------:|
| event | 6 | 0 | 6 |
| log | 9 | 0 | 9 |
| kv | 13 | 0 | 13 |
| project | 9 | 0 | 9 |
| subscribe | 9 | 0 | 9 |
| concurrency | 8 | 0 | 8 |
| snapshot | 11 | 0 | 11 |
| compaction | 11 | 0 | 11 |
| durable | 15 | 0 | 15 |
| blob | 14 | 0 | 14 |
| view | 11 | 0 | 11 |
| cas | 11 | 0 | 11 |
| catalog | 10 | 0 | 10 |
| query | 9 | 0 | 9 |
| batch | 10 | 0 | 10 |
| upcast | 9 | 0 | 9 |
| idempotency | 9 | 0 | 9 |
| global | 11 | 0 | 11 |
| example-acl | 10 | 0 | 10 |
| recovery | 6 | 0 | 6 |
| **Total** | **201** | **0** | **201** |

40
lib/persist/snapshot.sx Normal file
View File

@@ -0,0 +1,40 @@
; persist/snapshot — checkpoint a projection so a read model rebuilds as
; snapshot + tail instead of a full replay. A snapshot is just a projection
; state {:value :seq} stored in the kv facet under a namespaced key. The
; headline property (tested both ways): snapshot + tail == full replay. Replay
; is pure — it depends only on the stored snapshot and the log tail, never a
; clock. Requires: lib/persist/project.sx, lib/persist/kv.sx.
(define persist/snapshot-key (fn (name) (str "snapshot/" name)))
; load the stored snapshot for name, or a fresh {:value seed :seq 0} if none
(define
persist/snapshot-load
(fn
(b name seed)
(persist/kv-get-or b (persist/snapshot-key name) {:value seed :seq 0})))
; store a projection state as the snapshot for name; returns the state
(define
persist/snapshot-save
(fn (b name state) (persist/kv-put b (persist/snapshot-key name) state)))
(define
persist/snapshot-exists?
(fn (b name) (persist/kv-has? b (persist/snapshot-key name))))
; replay = snapshot + tail: load the snapshot then fold events after it
(define
persist/replay
(fn
(b stream name step seed)
(persist/project-resume b stream step (persist/snapshot-load b name seed))))
; replay then persist the new snapshot; returns the updated state
(define
persist/checkpoint
(fn
(b stream name step seed)
(let
((state (persist/replay b stream name step seed)))
(begin (persist/snapshot-save b name state) state))))

21
lib/persist/subscribe.sx Normal file
View File

@@ -0,0 +1,21 @@
; persist/subscribe — a subscription hub wraps a backend with per-stream
; callbacks fired after each append. The canonical use: a callback re-runs a
; projection (or bumps a kv counter) so read models update incrementally on
; write instead of being recomputed on read.
; callback signature: (backend stream event) -> ignored
; Publish goes through the hub; direct persist/append on the backend bypasses
; subscribers by design (bulk loads, replay).
; Requires: lib/persist/log.sx.
(define persist/hub (fn (b) (let ((subs {})) {:subscriber-count (fn (stream) (let ((cs (get subs stream))) (if cs (len cs) 0))) :publish (fn (stream type at data) (let ((ev (persist/append b stream type at data))) (begin (for-each (fn (cb) (cb b stream ev)) (let ((cs (get subs stream))) (if cs cs (list)))) ev))) :subscribe (fn (stream cb) (let ((cur (get subs stream))) (set! subs (assoc subs stream (append (if cur cur (list)) cb))))) :backend b})))
(define persist/hub-backend (fn (h) (get h :backend)))
(define
persist/subscribe
(fn (h stream cb) ((get h :subscribe) stream cb)))
(define
persist/publish
(fn (h stream type at data) ((get h :publish) stream type at data)))
(define
persist/subscriber-count
(fn (h stream) ((get h :subscriber-count) stream)))

122
lib/persist/tests/batch.sx Normal file
View File

@@ -0,0 +1,122 @@
; Extension — atomic batch append: contiguous seqs, transactional all-or-nothing.
(persist-test
"batch assigns contiguous seqs"
(let
((b (persist/open)))
(let
((evs (persist/append-batch b "s" (list (list "a" 0 {}) (list "b" 0 {}) (list "c" 0 {})))))
(list
(persist/event-seq (first evs))
(persist/event-seq (nth evs 2)))))
(list 1 3))
(persist-test
"batch returns events in order"
(let
((b (persist/open)))
(let
((evs (persist/append-batch b "s" (list (list "a" 0 {}) (list "b" 0 {})))))
(list
(persist/event-type (first evs))
(persist/event-type (nth evs 1)))))
(list "a" "b"))
(persist-test
"batch grows the stream by its size"
(let
((b (persist/open)))
(begin
(persist/append-batch
b
"s"
(list
(list "a" 0 {})
(list "b" 0 {})
(list "c" 0 {})))
(persist/count b "s")))
3)
(persist-test
"batch continues an existing stream"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(let
((evs (persist/append-batch b "s" (list (list "a" 0 {}) (list "b" 0 {})))))
(persist/event-seq (first evs)))))
2)
(persist-test
"empty batch is a no-op"
(let
((b (persist/open)))
(begin (persist/append-batch b "s" (list)) (persist/count b "s")))
0)
(persist-test
"batch-expect with correct seq commits all"
(let
((b (persist/open)))
(begin
(persist/append-batch-expect
b
"s"
0
(list
(list "a" 0 {})
(list "b" 0 {})))
(persist/count b "s")))
2)
(persist-test
"batch-expect with stale seq writes nothing"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append-batch-expect
b
"s"
0
(list
(list "a" 0 {})
(list "b" 0 {})))
(persist/count b "s")))
1)
(persist-test
"batch-expect stale returns a conflict"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/conflict?
(persist/append-batch-expect
b
"s"
0
(list (list "a" 0 {}))))))
true)
(persist-test
"batch data is preserved"
(let
((b (persist/open)))
(begin
(persist/append-batch
b
"order"
(list
(list "placed" 0 {:id 1})
(list "line" 0 {:sku "x"})))
(get
(persist/event-data (nth (persist/read b "order") 1))
:sku)))
"x")
(persist-test
"batch works on the durable backend"
(let
((db (persist/mock-durable (persist/mem-backend))))
(begin
(persist/append-batch
db
"s"
(list
(list "a" 0 {})
(list "b" 0 {})))
(persist/last-seq db "s")))
2)

112
lib/persist/tests/blob.sx Normal file
View File

@@ -0,0 +1,112 @@
; Phase 4 — blob backend: store the ref, never the bytes. Bytes live in a
; separate content-addressed store (mock here).
(persist-test
"blob-ref carries cid"
(persist/blob-cid (persist/blob-ref "c1" 10 "image/png"))
"c1")
(persist-test
"blob-ref carries size"
(persist/blob-size (persist/blob-ref "c1" 10 "image/png"))
10)
(persist-test
"blob-ref carries mime"
(persist/blob-mime (persist/blob-ref "c1" 10 "image/png"))
"image/png")
(persist-test
"blob-ref? true for a ref"
(persist/blob-ref? (persist/blob-ref "c1" 1 "x"))
true)
(persist-test
"blob-ref? false for a plain dict"
(persist/blob-ref? {:n 1})
false)
(persist-test
"store returns a ref, not the bytes"
(let
((blob (persist/mock-blob (persist/mem-backend))))
(persist/blob-ref? (persist/blob-store blob "PNGDATA" "image/png")))
true)
(persist-test
"store records the byte length as size"
(let
((blob (persist/mock-blob (persist/mem-backend))))
(persist/blob-size (persist/blob-store blob "12345" "text/plain")))
5)
(persist-test
"fetch round-trips the bytes via the ref"
(let
((blob (persist/mock-blob (persist/mem-backend))))
(let
((ref (persist/blob-store blob "PAYLOAD" "text/plain")))
(persist/blob-fetch blob ref)))
"PAYLOAD")
(persist-test
"exists? true after store"
(let
((blob (persist/mock-blob (persist/mem-backend))))
(let
((ref (persist/blob-store blob "X" "text/plain")))
(persist/blob-exists? blob ref)))
true)
(persist-test
"content addressing: same bytes dedupe to same cid"
(let
((blob (persist/mock-blob (persist/mem-backend))))
(equal?
(persist/blob-cid (persist/blob-store blob "SAME" "text/plain"))
(persist/blob-cid (persist/blob-store blob "SAME" "text/plain"))))
true)
(persist-test
"different bytes get different cids"
(let
((blob (persist/mock-blob (persist/mem-backend))))
(equal?
(persist/blob-cid (persist/blob-store blob "A" "text/plain"))
(persist/blob-cid (persist/blob-store blob "B" "text/plain"))))
false)
; ---------- the invariant: persist holds the ref, never the bytes ----------
(persist-test
"a blob ref stored in kv is a ref"
(let
((db (persist/mock-durable (persist/mem-backend)))
(blob (persist/mock-blob (persist/mem-backend))))
(begin
(persist/kv-put
db
"avatar"
(persist/blob-store blob "BIGIMAGE" "image/png"))
(persist/blob-ref? (persist/kv-get db "avatar"))))
true)
(persist-test
"the kv value does not contain the bytes"
(let
((db (persist/mock-durable (persist/mem-backend)))
(blob (persist/mock-blob (persist/mem-backend))))
(begin
(persist/kv-put
db
"avatar"
(persist/blob-store blob "BIGIMAGE" "image/png"))
(has-key? (persist/kv-get db "avatar") :bytes)))
false)
(persist-test
"a blob ref stored in the log is a ref, bytes fetched separately"
(let
((db (persist/mock-durable (persist/mem-backend)))
(store (persist/mem-backend)))
(let
((blob (persist/mock-blob store)))
(begin
(persist/append
db
"uploads"
"added"
0
(persist/blob-store blob "FILEBYTES" "application/pdf"))
(let
((ref (persist/event-data (first (persist/read db "uploads")))))
(list (persist/blob-ref? ref) (persist/blob-fetch blob ref))))))
(list true "FILEBYTES"))

96
lib/persist/tests/cas.sx Normal file
View File

@@ -0,0 +1,96 @@
; Extension — kv compare-and-swap: atomic current-state updates. Uses
; persist/conflict? from concurrency.sx.
(persist-test
"cas on absent key with nil expected succeeds"
(let ((b (persist/open))) (persist/kv-cas b "k" nil 1))
1)
(persist-test
"cas with matching expected succeeds"
(let
((b (persist/open)))
(begin
(persist/kv-put b "k" 5)
(persist/kv-cas b "k" 5 6)
(persist/kv-get b "k")))
6)
(persist-test
"cas with stale expected returns a conflict"
(let
((b (persist/open)))
(begin
(persist/kv-put b "k" 5)
(persist/conflict? (persist/kv-cas b "k" 4 6))))
true)
(persist-test
"a conflicting cas does not write"
(let
((b (persist/open)))
(begin
(persist/kv-put b "k" 5)
(persist/kv-cas b "k" 4 6)
(persist/kv-get b "k")))
5)
(persist-test
"cas conflict carries expected and actual"
(let
((b (persist/open)))
(begin
(persist/kv-put b "k" 5)
(let
((r (persist/kv-cas b "k" 4 6)))
(list (persist/conflict-expected r) (persist/conflict-actual r)))))
(list 4 5))
(persist-test
"two cas racers: first wins, second conflicts"
(let
((b (persist/open)))
(begin
(persist/kv-put b "stock" 10)
(persist/kv-cas b "stock" 10 9)
(persist/conflict? (persist/kv-cas b "stock" 10 9))))
true)
(persist-test
"retry after cas conflict succeeds"
(let
((b (persist/open)))
(begin
(persist/kv-put b "stock" 10)
(persist/kv-cas b "stock" 10 9)
(let
((r (persist/kv-cas b "stock" 10 9)))
(if
(persist/conflict? r)
(persist/kv-cas b "stock" (persist/conflict-actual r) 8)
r))))
8)
(persist-test
"put-new on absent key succeeds"
(let ((b (persist/open))) (persist/kv-put-new b "k" 1))
1)
(persist-test
"put-new on existing key conflicts"
(let
((b (persist/open)))
(begin
(persist/kv-put b "k" 1)
(persist/conflict? (persist/kv-put-new b "k" 2))))
true)
(persist-test
"put-new does not overwrite"
(let
((b (persist/open)))
(begin
(persist/kv-put b "k" 1)
(persist/kv-put-new b "k" 2)
(persist/kv-get b "k")))
1)
(persist-test
"cas works on the durable backend"
(let
((db (persist/mock-durable (persist/mem-backend))))
(begin
(persist/kv-put db "k" 1)
(persist/kv-cas db "k" 1 2)
(persist/kv-get db "k")))
2)

View File

@@ -0,0 +1,86 @@
; Extension — stream catalog: enumerate streams, count, existence, totals.
(persist-test
"empty backend has no streams"
(persist/stream-count (persist/open))
0)
(persist-test
"stream-exists? false when absent"
(persist/stream-exists? (persist/open) "orders")
false)
(persist-test
"append registers a stream"
(let
((b (persist/open)))
(begin
(persist/append b "orders" "x" 0 {})
(persist/stream-exists? b "orders")))
true)
(persist-test
"stream-count counts distinct streams"
(let
((b (persist/open)))
(begin
(persist/append b "a" "x" 0 {})
(persist/append b "b" "x" 0 {})
(persist/append b "a" "x" 0 {})
(persist/stream-count b)))
2)
(persist-test
"compacted-away stream still lists"
(let
((b (persist/open)))
(begin
(persist/append b "a" "x" 0 {})
(persist/checkpoint b "a" "snap" (fn (acc e) acc) 0)
(persist/truncate b "a" 1)
(list (persist/count b "a") (persist/stream-exists? b "a"))))
(list 0 true))
(persist-test
"kv-only backend lists no streams"
(let
((b (persist/open)))
(begin (persist/kv-put b "k" 1) (persist/stream-count b)))
0)
(persist-test
"total-events sums high-water marks"
(let
((b (persist/open)))
(begin
(persist/append b "a" "x" 0 {})
(persist/append b "a" "x" 0 {})
(persist/append b "b" "x" 0 {})
(persist/total-events b)))
3)
(persist-test
"total-events counts compacted events too"
(let
((b (persist/open)))
(begin
(persist/append b "a" "x" 0 {})
(persist/append b "a" "x" 0 {})
(persist/checkpoint b "a" "snap" (fn (acc e) acc) 0)
(persist/truncate b "a" 2)
(persist/total-events b)))
2)
(persist-test
"catalog works on the durable backend"
(let
((db (persist/mock-durable (persist/mem-backend))))
(begin
(persist/append db "a" "x" 0 {})
(persist/append db "b" "x" 0 {})
(persist/stream-count db)))
2)
(persist-test
"catalog survives restart"
(let
((disk (persist/mem-backend)))
(begin
(let
((db (persist/mock-durable disk)))
(begin
(persist/append db "a" "x" 0 {})
(persist/append db "b" "x" 0 {})))
(persist/stream-count (persist/mock-durable disk))))
2)

View File

@@ -0,0 +1,124 @@
; Phase 3 — compaction: drop the snapshotted prefix; replay determinism holds.
(define comp-count (fn (acc e) (+ acc 1)))
(persist-test
"uncompacted counts events since snapshot"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/uncompacted b "s" "snap" 0)))
2)
(persist-test
"should-compact? false below threshold"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/should-compact? b "s" "snap" 3 0)))
false)
(persist-test
"should-compact? true at threshold"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/should-compact? b "s" "snap" 3 0)))
true)
(persist-test
"compact truncates the snapshotted prefix"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/compact b "s" "snap" comp-count 0)
(persist/count b "s")))
0)
(persist-test
"compact preserves logical last-seq"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/compact b "s" "snap" comp-count 0)
(persist/last-seq b "s")))
2)
(persist-test
"append after compaction continues the seq"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/compact b "s" "snap" comp-count 0)
(persist/event-seq (persist/append b "s" "x" 0 {}))))
3)
(persist-test
"replay after compaction == full count before compaction"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/compact b "s" "snap" comp-count 0)
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/project-value
(persist/replay b "s" "snap" comp-count 0))))
5)
(persist-test
"determinism: post-compaction replay value equals uncompacted full replay"
(let
((b (persist/open)) (c (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/append c "s" "x" 0 {})
(persist/append c "s" "x" 0 {})
(persist/append c "s" "x" 0 {})
(persist/compact b "s" "snap" comp-count 0)
(persist/append b "s" "x" 0 {})
(persist/append c "s" "x" 0 {})
(equal?
(persist/project-value
(persist/replay b "s" "snap" comp-count 0))
(persist/project-fold c "s" comp-count 0))))
true)
(persist-test
"maybe-compact below threshold does not truncate"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/maybe-compact b "s" "snap" comp-count 0 5)
(persist/count b "s")))
1)
(persist-test
"maybe-compact at threshold truncates"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/maybe-compact b "s" "snap" comp-count 0 2)
(persist/count b "s")))
0)
(persist-test
"compact is idempotent on an empty tail"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/compact b "s" "snap" comp-count 0)
(persist/project-value
(persist/compact b "s" "snap" comp-count 0))))
1)

View File

@@ -0,0 +1,96 @@
; Phase 2 — optimistic concurrency: conflict is a real result, not a crash.
(persist-test
"append-expect 0 on empty stream succeeds"
(persist/event-seq
(persist/append-expect
(persist/open)
"s"
0
"x"
0
{}))
1)
(persist-test
"append-expect with correct seq succeeds"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/event-seq
(persist/append-expect b "s" 1 "x" 0 {}))))
2)
(persist-test
"append-expect with stale seq returns a conflict"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/conflict?
(persist/append-expect b "s" 1 "x" 0 {}))))
true)
(persist-test
"a successful append is not a conflict"
(persist/conflict?
(persist/append-expect
(persist/open)
"s"
0
"x"
0
{}))
false)
(persist-test
"conflict carries expected and actual"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(let
((r (persist/append-expect b "s" 0 "x" 0 {})))
(list (persist/conflict-expected r) (persist/conflict-actual r)))))
(list 0 2))
(persist-test
"a conflicting append does not write"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append-expect b "s" 0 "x" 0 {})
(persist/count b "s")))
1)
(persist-test
"two writers: first wins, second conflicts"
(let
((b (persist/open)))
(let
((seen (persist/last-seq b "s")))
(begin
(persist/append-expect b "s" seen "x" 0 {:who "A"})
(persist/conflict?
(persist/append-expect b "s" seen "x" 0 {:who "B"})))))
true)
(persist-test
"retry after conflict succeeds"
(let
((b (persist/open)))
(let
((seen (persist/last-seq b "s")))
(begin
(persist/append-expect b "s" seen "x" 0 {:who "A"})
(let
((r (persist/append-expect b "s" seen "x" 0 {:who "B"})))
(if
(persist/conflict? r)
(persist/event-seq
(persist/append-expect
b
"s"
(persist/conflict-actual r)
"x"
0
{:who "B"}))
(persist/event-seq r))))))
2)

View File

@@ -0,0 +1,163 @@
; Phase 4 — durable backend over the IO-suspension boundary, tested with a mock
; transport (the mock-IO harness for the durable protocol). The whole facet
; stack must run unchanged on mock-durable, and a "crash/restart" (drop the
; backend, keep the disk) must recover state by replay.
(define dur-count (fn (acc e) (+ acc 1)))
; ---------- request encoders ----------
(persist-test
"req-append encodes op + args"
(persist/req-append "s" {:k 1})
{:op "persist/append" :args (list "s" {:k 1})})
(persist-test
"req-kv-put encodes op + args"
(persist/req-kv-put "k" 7)
{:op "persist/kv-put" :args (list "k" 7)})
; ---------- serve round-trips against a disk ----------
(persist-test
"serve append then serve read"
(let
((disk (persist/mem-backend)))
(begin
(persist/serve
disk
(persist/req-append
"s"
(persist/event "s" 1 "x" 0 {:n 1})))
(get
(persist/event-data
(first (persist/serve disk (persist/req-read "s"))))
:n)))
1)
(persist-test
"serve kv-put then kv-get"
(let
((disk (persist/mem-backend)))
(begin
(persist/serve disk (persist/req-kv-put "k" 42))
(persist/serve disk (persist/req-kv-get "k"))))
42)
(persist-test
"serve unknown op is a clear error"
(let
((disk (persist/mem-backend)))
(guard (e (true "errored")) (persist/serve disk {:op "persist/bogus" :args (list)})))
"errored")
; ---------- full facet stack on mock-durable ----------
(persist-test
"log facet works on mock-durable"
(let
((db (persist/mock-durable (persist/mem-backend))))
(begin
(persist/append db "s" "x" 0 {})
(persist/append db "s" "x" 0 {})
(persist/count db "s")))
2)
(persist-test
"seq assignment works on mock-durable"
(let
((db (persist/mock-durable (persist/mem-backend))))
(begin
(persist/append db "s" "x" 0 {})
(persist/event-seq (persist/append db "s" "x" 0 {}))))
2)
(persist-test
"kv facet works on mock-durable"
(let
((db (persist/mock-durable (persist/mem-backend))))
(begin (persist/kv-put db "k" 5) (persist/kv-get db "k")))
5)
(persist-test
"projection works on mock-durable"
(let
((db (persist/mock-durable (persist/mem-backend))))
(begin
(persist/append db "s" "x" 0 {})
(persist/append db "s" "x" 0 {})
(persist/append db "s" "x" 0 {})
(persist/project-fold db "s" dur-count 0)))
3)
(persist-test
"snapshot + replay work on mock-durable"
(let
((db (persist/mock-durable (persist/mem-backend))))
(begin
(persist/append db "s" "x" 0 {})
(persist/append db "s" "x" 0 {})
(persist/checkpoint db "s" "snap" dur-count 0)
(persist/append db "s" "x" 0 {})
(persist/project-value
(persist/replay db "s" "snap" dur-count 0))))
3)
(persist-test
"compaction works on mock-durable"
(let
((db (persist/mock-durable (persist/mem-backend))))
(begin
(persist/append db "s" "x" 0 {})
(persist/append db "s" "x" 0 {})
(persist/compact db "s" "snap" dur-count 0)
(list (persist/count db "s") (persist/last-seq db "s"))))
(list 0 2))
; ---------- crash / restart replay ----------
(persist-test
"restart recovers log state from the disk"
(let
((disk (persist/mem-backend)))
(begin
(let
((db (persist/mock-durable disk)))
(begin
(persist/append db "s" "x" 0 {})
(persist/append db "s" "x" 0 {})))
(let
((db2 (persist/mock-durable disk)))
(persist/project-fold db2 "s" dur-count 0))))
2)
(persist-test
"restart continues the seq counter"
(let
((disk (persist/mem-backend)))
(begin
(let
((db (persist/mock-durable disk)))
(begin
(persist/append db "s" "x" 0 {})
(persist/append db "s" "x" 0 {})))
(let
((db2 (persist/mock-durable disk)))
(persist/event-seq (persist/append db2 "s" "x" 0 {})))))
3)
(persist-test
"restart recovers a kv value"
(let
((disk (persist/mem-backend)))
(begin
(let
((db (persist/mock-durable disk)))
(persist/kv-put db "cfg" "on"))
(let ((db2 (persist/mock-durable disk))) (persist/kv-get db2 "cfg"))))
"on")
(persist-test
"restart from snapshot equals full replay"
(let
((disk (persist/mem-backend)))
(begin
(let
((db (persist/mock-durable disk)))
(begin
(persist/append db "s" "x" 0 {})
(persist/append db "s" "x" 0 {})
(persist/checkpoint db "s" "snap" dur-count 0)
(persist/append db "s" "x" 0 {})))
(let
((db2 (persist/mock-durable disk)))
(equal?
(persist/project-value
(persist/replay db2 "s" "snap" dur-count 0))
(persist/project-fold db2 "s" dur-count 0)))))
true)

View File

@@ -0,0 +1,30 @@
; Phase 1 — event record accessors. Uses the persist-test harness
; (persist-test name got expected) provided by conformance.sh.
(persist-test
"event-stream"
(persist/event-stream
(persist/event "s" 1 "t" 0 {}))
"s")
(persist-test
"event-seq"
(persist/event-seq (persist/event "s" 3 "t" 0 {}))
3)
(persist-test
"event-type"
(persist/event-type
(persist/event "s" 1 "create" 0 {}))
"create")
(persist-test
"event-at"
(persist/event-at (persist/event "s" 1 "t" 42 {}))
42)
(persist-test
"event-data"
(persist/event-data
(persist/event "s" 1 "t" 0 {:x 9}))
{:x 9})
(persist-test
"event is a dict with all fields"
(len (keys (persist/event "s" 1 "t" 0 {})))
5)

View File

@@ -0,0 +1,104 @@
; Reference migration — acl grants on persist. Proves the AFTER behaviour,
; including the capabilities the hand-rolled BEFORE version could not provide
; (durability across restart + an audit trail).
(persist-test
"grant then can?"
(let
((b (persist/open)))
(begin
(acl/grant b "doc-1" "alice" 0)
(acl/can? b "doc-1" "alice")))
true)
(persist-test
"no grant means no access"
(acl/can? (persist/open) "doc-1" "alice")
false)
(persist-test
"revoke removes access"
(let
((b (persist/open)))
(begin
(acl/grant b "doc-1" "alice" 0)
(acl/revoke b "doc-1" "alice" 1)
(acl/can? b "doc-1" "alice")))
false)
(persist-test
"multiple principals tracked independently"
(let
((b (persist/open)))
(begin
(acl/grant b "doc-1" "alice" 0)
(acl/grant b "doc-1" "bob" 1)
(acl/revoke b "doc-1" "alice" 2)
(list (acl/can? b "doc-1" "alice") (acl/can? b "doc-1" "bob"))))
(list false true))
(persist-test
"granting twice is idempotent in the set"
(let
((b (persist/open)))
(begin
(acl/grant b "doc-1" "alice" 0)
(acl/grant b "doc-1" "alice" 1)
(len (acl/grants b "doc-1"))))
1)
(persist-test
"grants on different resources are isolated"
(let
((b (persist/open)))
(begin
(acl/grant b "doc-1" "alice" 0)
(acl/grant b "doc-2" "bob" 0)
(list (acl/can? b "doc-1" "bob") (acl/can? b "doc-2" "bob"))))
(list false true))
(persist-test
"audit window answers when-was-it-granted (new capability)"
(let
((b (persist/open)))
(begin
(acl/grant b "doc-1" "alice" 100)
(acl/revoke b "doc-1" "alice" 200)
(acl/grant b "doc-1" "bob" 300)
(len (acl/audit-window b "doc-1" 150 300))))
2)
(persist-test
"materialized view stays current on publish"
(let
((b (persist/open)))
(let
((h (persist/view-attach (persist/hub b) (acl/view "doc-1"))))
(begin
(persist/publish
h
(acl/stream "doc-1")
"granted"
0
{:principal "alice"})
(acl/can-fast? b "doc-1" "alice"))))
true)
(persist-test
"grants survive restart on the durable backend (the headline win)"
(let
((disk (persist/mem-backend)))
(begin
(let
((db (persist/mock-durable disk)))
(begin
(acl/grant db "doc-1" "alice" 0)
(acl/grant db "doc-1" "bob" 1)))
(let
((db2 (persist/mock-durable disk)))
(list (acl/can? db2 "doc-1" "alice") (acl/can? db2 "doc-1" "bob")))))
(list true true))
(persist-test
"revoke before restart is still revoked after"
(let
((disk (persist/mem-backend)))
(begin
(let
((db (persist/mock-durable disk)))
(begin
(acl/grant db "doc-1" "alice" 0)
(acl/revoke db "doc-1" "alice" 1)))
(acl/can? (persist/mock-durable disk) "doc-1" "alice")))
false)

123
lib/persist/tests/global.sx Normal file
View File

@@ -0,0 +1,123 @@
; Extension — global commit ordering across streams.
(persist-test
"gappend returns the stream event with its local seq"
(let
((b (persist/open)))
(persist/event-seq
(persist/gappend b "orders" "placed" 0 {})))
1)
(persist-test
"global-pos advances per gappend regardless of stream"
(let
((b (persist/open)))
(begin
(persist/gappend b "orders" "placed" 0 {})
(persist/gappend b "users" "joined" 0 {})
(persist/gappend b "orders" "placed" 0 {})
(persist/global-pos b)))
3)
(persist-test
"read-global returns events in commit order across streams"
(let
((b (persist/open)))
(begin
(persist/gappend b "orders" "placed" 0 {:n 1})
(persist/gappend b "users" "joined" 0 {:n 2})
(persist/gappend b "orders" "placed" 0 {:n 3})
(let
((g (persist/read-global b)))
(list
(get (persist/event-data (nth g 0)) :n)
(get (persist/event-data (nth g 1)) :n)
(get (persist/event-data (nth g 2)) :n)))))
(list 1 2 3))
(persist-test
"read-global resolves to the right streams"
(let
((b (persist/open)))
(begin
(persist/gappend b "orders" "placed" 0 {})
(persist/gappend b "users" "joined" 0 {})
(let
((g (persist/read-global b)))
(list
(persist/event-stream (nth g 0))
(persist/event-stream (nth g 1))))))
(list "orders" "users"))
(persist-test
"project-global folds across all streams in order"
(let
((b (persist/open)))
(begin
(persist/gappend b "a" "x" 0 {:v 10})
(persist/gappend b "b" "x" 0 {:v 20})
(persist/gappend b "a" "x" 0 {:v 30})
(persist/project-global
b
(fn (acc e) (+ acc (get (persist/event-data e) :v)))
0)))
60)
(persist-test
"global index is hidden from the public catalog"
(let
((b (persist/open)))
(begin
(persist/gappend b "orders" "placed" 0 {})
(persist/gappend b "users" "joined" 0 {})
(list (persist/stream-count b) (persist/stream-exists? b "$global"))))
(list 2 false))
(persist-test
"streams-all reveals the reserved index"
(let
((b (persist/open)))
(begin
(persist/gappend b "orders" "placed" 0 {})
(contains? (persist/streams-all b) "$global")))
true)
(persist-test
"global-from gives pointers at or after a position"
(let
((b (persist/open)))
(begin
(persist/gappend b "a" "x" 0 {})
(persist/gappend b "a" "x" 0 {})
(persist/gappend b "a" "x" 0 {})
(len (persist/global-from b 2))))
2)
(persist-test
"plain append does not touch the global index"
(let
((b (persist/open)))
(begin
(persist/append b "orders" "placed" 0 {})
(persist/gappend b "orders" "placed" 0 {})
(persist/global-pos b)))
1)
(persist-test
"global ordering works on the durable backend"
(let
((db (persist/mock-durable (persist/mem-backend))))
(begin
(persist/gappend db "a" "x" 0 {:v 1})
(persist/gappend db "b" "x" 0 {:v 2})
(persist/project-global
db
(fn (acc e) (+ acc (get (persist/event-data e) :v)))
0)))
3)
(persist-test
"global order survives restart (determinism)"
(let
((disk (persist/mem-backend)))
(begin
(let
((db (persist/mock-durable disk)))
(begin
(persist/gappend db "a" "x" 0 {:v 1})
(persist/gappend db "b" "x" 0 {:v 2})))
(persist/project-global
(persist/mock-durable disk)
(fn (acc e) (+ acc (get (persist/event-data e) :v)))
0)))
3)

View File

@@ -0,0 +1,92 @@
; Extension — exactly-once append under retries.
(persist-test
"seen? false before first append"
(persist/seen? (persist/open) "orders" "cmd-1")
false)
(persist-test
"append-once appends on first use"
(let
((b (persist/open)))
(begin
(persist/append-once b "orders" "cmd-1" "placed" 0 {})
(persist/count b "orders")))
1)
(persist-test
"seen? true after first append"
(let
((b (persist/open)))
(begin
(persist/append-once b "orders" "cmd-1" "placed" 0 {})
(persist/seen? b "orders" "cmd-1")))
true)
(persist-test
"repeat with same key does not append again"
(let
((b (persist/open)))
(begin
(persist/append-once b "orders" "cmd-1" "placed" 0 {})
(persist/append-once b "orders" "cmd-1" "placed" 0 {})
(persist/append-once b "orders" "cmd-1" "placed" 0 {})
(persist/count b "orders")))
1)
(persist-test
"repeat returns the same event (same seq)"
(let
((b (persist/open)))
(let
((e1 (persist/append-once b "orders" "cmd-1" "placed" 0 {})))
(persist/event-seq
(persist/append-once b "orders" "cmd-1" "placed" 0 {}))))
1)
(persist-test
"different keys append separately"
(let
((b (persist/open)))
(begin
(persist/append-once b "orders" "cmd-1" "placed" 0 {})
(persist/append-once b "orders" "cmd-2" "placed" 0 {})
(persist/count b "orders")))
2)
(persist-test
"idempotency is per-stream"
(let
((b (persist/open)))
(begin
(persist/append-once b "a" "cmd-1" "x" 0 {})
(persist/append-once b "b" "cmd-1" "x" 0 {})
(list (persist/count b "a") (persist/count b "b"))))
(list 1 1))
(persist-test
"stored data is preserved on first append"
(let
((b (persist/open)))
(get
(persist/event-data
(persist/append-once b "s" "k" "x" 0 {:n 9}))
:n))
9)
(persist-test
"idempotency survives restart on the durable backend"
(let
((disk (persist/mem-backend)))
(begin
(persist/append-once
(persist/mock-durable disk)
"orders"
"cmd-1"
"placed"
0
{})
(let
((db2 (persist/mock-durable disk)))
(begin
(persist/append-once
db2
"orders"
"cmd-1"
"placed"
0
{})
(persist/count db2 "orders")))))
1)

86
lib/persist/tests/kv.sx Normal file
View File

@@ -0,0 +1,86 @@
; Phase 1 — kv facet: get/put/delete/has?/keys, get-or, update.
(persist-test "absent key reads nil" (persist/kv-get (persist/open) "x") nil)
(persist-test
"has? false when absent"
(persist/kv-has? (persist/open) "x")
false)
(persist-test
"put then get"
(let
((b (persist/open)))
(begin (persist/kv-put b "x" 7) (persist/kv-get b "x")))
7)
(persist-test
"put returns value"
(let ((b (persist/open))) (persist/kv-put b "x" 9))
9)
(persist-test
"has? true after put"
(let
((b (persist/open)))
(begin (persist/kv-put b "x" 1) (persist/kv-has? b "x")))
true)
(persist-test
"put overwrites"
(let
((b (persist/open)))
(begin
(persist/kv-put b "x" 1)
(persist/kv-put b "x" 2)
(persist/kv-get b "x")))
2)
(persist-test
"delete removes key"
(let
((b (persist/open)))
(begin
(persist/kv-put b "x" 1)
(persist/kv-delete b "x")
(persist/kv-has? b "x")))
false)
(persist-test
"delete then get is nil"
(let
((b (persist/open)))
(begin
(persist/kv-put b "x" 1)
(persist/kv-delete b "x")
(persist/kv-get b "x")))
nil)
(persist-test
"keys lists stored keys"
(let
((b (persist/open)))
(begin
(persist/kv-put b "a" 1)
(persist/kv-put b "b" 2)
(len (persist/kv-keys b))))
2)
(persist-test
"get-or returns default when absent"
(persist/kv-get-or (persist/open) "x" 99)
99)
(persist-test
"get-or returns value when present"
(let
((b (persist/open)))
(begin
(persist/kv-put b "x" 5)
(persist/kv-get-or b "x" 99)))
5)
(persist-test
"kv-update applies fn over default"
(let
((b (persist/open)))
(begin
(persist/kv-update b "n" 0 (fn (v) (+ v 1)))
(persist/kv-update b "n" 0 (fn (v) (+ v 1)))
(persist/kv-get b "n")))
2)
(persist-test
"kv facet does not touch log"
(let
((b (persist/open)))
(begin (persist/kv-put b "x" 1) (persist/count b "x")))
0)

81
lib/persist/tests/log.sx Normal file
View File

@@ -0,0 +1,81 @@
; Phase 1 — log facet: append/read/read-from, sequential seq, stream isolation.
; Note: map returns an array-backed list not equal? to a (list ...) literal,
; so assertions build their compared list with list/nth, not map.
(persist-test
"empty stream reads empty"
(len (persist/read (persist/open) "orders"))
0)
(persist-test
"last-seq empty is 0"
(persist/last-seq (persist/open) "orders")
0)
(persist-test
"append returns event with seq 1"
(persist/event-seq
(persist/append (persist/open) "orders" "placed" 0 {:id 1}))
1)
(persist-test
"append assigns sequential seqs"
(let
((b (persist/open)))
(begin
(persist/append b "orders" "placed" 0 {})
(persist/append b "orders" "placed" 1 {})
(persist/event-seq
(persist/append b "orders" "placed" 2 {}))))
3)
(persist-test
"read returns events oldest-first"
(let
((b (persist/open)))
(begin
(persist/append b "s" "a" 0 {:n 1})
(persist/append b "s" "b" 0 {:n 2})
(let
((es (persist/read b "s")))
(list
(get (persist/event-data (nth es 0)) :n)
(get (persist/event-data (nth es 1)) :n)))))
(list 1 2))
(persist-test
"count tracks appends"
(let
((b (persist/open)))
(begin
(persist/append b "s" "a" 0 {})
(persist/append b "s" "a" 0 {})
(persist/count b "s")))
2)
(persist-test
"streams are isolated"
(let
((b (persist/open)))
(begin
(persist/append b "s1" "a" 0 {})
(persist/append b "s2" "a" 0 {})
(persist/append b "s2" "a" 0 {})
(list (persist/count b "s1") (persist/count b "s2"))))
(list 1 2))
(persist-test
"read-from filters by seq"
(let
((b (persist/open)))
(begin
(persist/append b "s" "a" 0 {})
(persist/append b "s" "a" 0 {})
(persist/append b "s" "a" 0 {})
(let
((es (persist/read-from b "s" 2)))
(list
(persist/event-seq (nth es 0))
(persist/event-seq (nth es 1))))))
(list 2 3))
(persist-test
"read-from past end is empty"
(let
((b (persist/open)))
(begin
(persist/append b "s" "a" 0 {})
(len (persist/read-from b "s" 5))))
0)

View File

@@ -0,0 +1,115 @@
; Phase 2 — projections: fold a stream into a read model, resume incrementally.
(persist-test
"project empty stream returns seed value"
(persist/project-fold
(persist/open)
"s"
(fn (acc e) (+ acc 1))
0)
0)
(persist-test
"project empty stream seq is 0"
(persist/project-seq
(persist/project (persist/open) "s" (fn (a e) a) 0))
0)
(persist-test
"project counts events"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/project-fold
b
"s"
(fn (acc e) (+ acc 1))
0)))
3)
(persist-test
"project sums event data"
(let
((b (persist/open)))
(begin
(persist/append b "ledger" "credit" 0 {:amt 10})
(persist/append b "ledger" "credit" 1 {:amt 5})
(persist/append b "ledger" "debit" 2 {:amt 3})
(persist/project-fold
b
"ledger"
(fn
(bal e)
(if
(equal? (persist/event-type e) "credit")
(+ bal (get (persist/event-data e) :amt))
(- bal (get (persist/event-data e) :amt))))
0)))
12)
(persist-test
"project tracks last seq"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/project-seq (persist/project b "s" (fn (a e) a) 0))))
2)
(persist-test
"resume folds only the tail"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(let
((p1 (persist/project b "s" (fn (acc e) (+ acc 1)) 0)))
(begin
(persist/append b "s" "x" 0 {})
(persist/project-value
(persist/project-resume
b
"s"
(fn (acc e) (+ acc 1))
p1))))))
3)
(persist-test
"resume with no new events is a no-op"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(let
((p1 (persist/project b "s" (fn (acc e) (+ acc 1)) 0)))
(persist/project-value
(persist/project-resume b "s" (fn (acc e) (+ acc 1)) p1)))))
1)
(persist-test
"resume advances seq"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(let
((p1 (persist/project b "s" (fn (a e) a) 0)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/project-seq
(persist/project-resume b "s" (fn (a e) a) p1))))))
3)
(persist-test
"full project equals seed-resume from zero"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(equal?
(persist/project b "s" (fn (acc e) (+ acc 1)) 0)
(persist/project-resume
b
"s"
(fn (acc e) (+ acc 1))
{:value 0 :seq 0}))))
true)

101
lib/persist/tests/query.sx Normal file
View File

@@ -0,0 +1,101 @@
; Extension — read-side query helpers. Assertions count / index, not map vs list.
(define q-seqs (fn (es) (map persist/event-seq es)))
(persist-test
"read-between slices a seq range"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(let
((es (persist/read-between b "s" 2 3)))
(list
(len es)
(persist/event-seq (first es))
(persist/event-seq (nth es 1))))))
(list 2 2 3))
(persist-test
"read-between is inclusive of endpoints"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(len (persist/read-between b "s" 1 3))))
3)
(persist-test
"read-since filters by timestamp"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 100 {})
(persist/append b "s" "x" 200 {})
(persist/append b "s" "x" 300 {})
(len (persist/read-since b "s" 200))))
2)
(persist-test
"read-window is an inclusive time range"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 100 {})
(persist/append b "s" "x" 200 {})
(persist/append b "s" "x" 300 {})
(persist/append b "s" "x" 400 {})
(len (persist/read-window b "s" 200 300))))
2)
(persist-test
"read-by-type filters by event type"
(let
((b (persist/open)))
(begin
(persist/append b "s" "created" 0 {})
(persist/append b "s" "updated" 0 {})
(persist/append b "s" "created" 0 {})
(len (persist/read-by-type b "s" "created"))))
2)
(persist-test
"read-where filters by predicate over data"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {:amt 5})
(persist/append b "s" "x" 0 {:amt 15})
(persist/append b "s" "x" 0 {:amt 25})
(len
(persist/read-where
b
"s"
(fn (e) (> (get (persist/event-data e) :amt) 10))))))
2)
(persist-test
"count-where counts matches"
(let
((b (persist/open)))
(begin
(persist/append b "s" "a" 0 {})
(persist/append b "s" "b" 0 {})
(persist/append b "s" "a" 0 {})
(persist/count-where
b
"s"
(fn (e) (equal? (persist/event-type e) "a")))))
2)
(persist-test
"queries return empty on empty stream"
(len (persist/read-since (persist/open) "s" 0))
0)
(persist-test
"queries work on the durable backend"
(let
((db (persist/mock-durable (persist/mem-backend))))
(begin
(persist/append db "s" "x" 100 {})
(persist/append db "s" "x" 200 {})
(len (persist/read-since db "s" 150))))
1)

View File

@@ -0,0 +1,126 @@
; Phase 4 — crash/restart integration. A whole subsystem (an order ledger:
; event log + a kv read model kept by a subscription + a periodic snapshot + an
; invoice blob ref) on the durable backend must survive a restart. "Crash" =
; drop every in-process object (backend, hub, projections); "restart" = rebuild
; them over the SAME disk + blob store. Nothing but the disk and content store
; carries across, exactly as a real process restart.
(define rec-count (fn (acc e) (+ acc 1)))
(persist-test
"log survives restart and seq continues"
(let
((disk (persist/mem-backend)))
(begin
(let
((db (persist/mock-durable disk)))
(begin
(persist/append db "orders" "placed" 0 {:id "a"})
(persist/append db "orders" "placed" 1 {:id "b"})))
(let
((db2 (persist/mock-durable disk)))
(list
(persist/project-fold db2 "orders" rec-count 0)
(persist/event-seq
(persist/append db2 "orders" "placed" 2 {:id "c"}))))))
(list 2 3))
(persist-test
"subscription-driven kv read model survives restart"
(let
((disk (persist/mem-backend)))
(begin
(let
((h (persist/hub (persist/mock-durable disk))))
(begin
(persist/subscribe
h
"orders"
(fn
(bk s e)
(persist/kv-update
bk
"order-count"
0
(fn (n) (+ n 1)))))
(persist/publish h "orders" "placed" 0 {})
(persist/publish h "orders" "placed" 1 {})))
(let
((db2 (persist/mock-durable disk)))
(persist/kv-get db2 "order-count"))))
2)
(persist-test
"snapshot taken before crash drives replay after restart"
(let
((disk (persist/mem-backend)))
(begin
(let
((db (persist/mock-durable disk)))
(begin
(persist/append db "orders" "placed" 0 {})
(persist/append db "orders" "placed" 1 {})
(persist/checkpoint db "orders" "count" rec-count 0)
(persist/append db "orders" "placed" 2 {})))
(let
((db2 (persist/mock-durable disk)))
(equal?
(persist/project-value
(persist/replay db2 "orders" "count" rec-count 0))
(persist/project-fold db2 "orders" rec-count 0)))))
true)
(persist-test
"compacted log still replays correctly after restart"
(let
((disk (persist/mem-backend)))
(begin
(let
((db (persist/mock-durable disk)))
(begin
(persist/append db "orders" "placed" 0 {})
(persist/append db "orders" "placed" 1 {})
(persist/append db "orders" "placed" 2 {})
(persist/compact db "orders" "count" rec-count 0)
(persist/append db "orders" "placed" 3 {})))
(let
((db2 (persist/mock-durable disk)))
(persist/project-value
(persist/replay db2 "orders" "count" rec-count 0)))))
4)
(persist-test
"invoice blob ref survives restart, bytes fetched from content store"
(let
((disk (persist/mem-backend)) (store (persist/mem-backend)))
(begin
(let
((db (persist/mock-durable disk)) (blob (persist/mock-blob store)))
(persist/kv-put
db
"invoice"
(persist/blob-store blob "INVOICEPDF" "application/pdf")))
(let
((db2 (persist/mock-durable disk))
(blob2 (persist/mock-blob store)))
(persist/blob-fetch blob2 (persist/kv-get db2 "invoice")))))
"INVOICEPDF")
(persist-test
"two independent restarts converge to the same state (determinism)"
(let
((disk (persist/mem-backend)))
(begin
(let
((db (persist/mock-durable disk)))
(begin
(persist/append db "orders" "placed" 0 {})
(persist/append db "orders" "placed" 1 {})
(persist/append db "orders" "placed" 2 {})))
(equal?
(persist/project-fold
(persist/mock-durable disk)
"orders"
rec-count
0)
(persist/project-fold
(persist/mock-durable disk)
"orders"
rec-count
0))))
true)

View File

@@ -0,0 +1,114 @@
; Phase 3 — snapshots + replay. Headline: snapshot + tail == full replay.
(define snap-count (fn (acc e) (+ acc 1)))
(persist-test
"no snapshot loads fresh seed state"
(persist/snapshot-load (persist/open) "feed" 0)
{:value 0 :seq 0})
(persist-test
"snapshot-exists? false initially"
(persist/snapshot-exists? (persist/open) "feed")
false)
(persist-test
"checkpoint stores a snapshot"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/checkpoint b "s" "snap" snap-count 0)
(persist/snapshot-exists? b "snap")))
true)
(persist-test
"checkpoint value equals full projection"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/project-value
(persist/checkpoint b "s" "snap" snap-count 0))))
3)
(persist-test
"checkpoint records the last seq"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/project-seq
(persist/checkpoint b "s" "snap" snap-count 0))))
2)
(persist-test
"replay after checkpoint only folds the tail"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/checkpoint b "s" "snap" snap-count 0)
(persist/append b "s" "x" 0 {})
(persist/project-value
(persist/replay b "s" "snap" snap-count 0))))
3)
(persist-test
"snapshot + tail == full replay (value)"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(persist/checkpoint b "s" "snap" snap-count 0)
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(equal?
(persist/project-value
(persist/replay b "s" "snap" snap-count 0))
(persist/project-fold b "s" snap-count 0))))
true)
(persist-test
"snapshot + tail == full replay (whole state)"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/checkpoint b "s" "snap" snap-count 0)
(persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(equal?
(persist/replay b "s" "snap" snap-count 0)
(persist/project b "s" snap-count 0))))
true)
(persist-test
"replay determinism: two replays from same snapshot agree"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/checkpoint b "s" "snap" snap-count 0)
(persist/append b "s" "x" 0 {})
(equal?
(persist/replay b "s" "snap" snap-count 0)
(persist/replay b "s" "snap" snap-count 0))))
true)
(persist-test
"re-checkpoint advances the snapshot"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/checkpoint b "s" "snap" snap-count 0)
(persist/append b "s" "x" 0 {})
(persist/checkpoint b "s" "snap" snap-count 0)
(persist/project-seq (persist/snapshot-load b "snap" 0))))
2)
(persist-test
"snapshots are keyed independently"
(let
((b (persist/open)))
(begin
(persist/append b "s" "x" 0 {})
(persist/checkpoint b "s" "a" snap-count 0)
(persist/snapshot-exists? b "b")))
false)

View File

@@ -0,0 +1,130 @@
; Phase 2 — subscription hub: callbacks fire on publish, drive read models.
(persist-test
"no subscribers initially"
(persist/subscriber-count (persist/hub (persist/open)) "s")
0)
(persist-test
"subscribe registers a callback"
(let
((h (persist/hub (persist/open))))
(begin
(persist/subscribe h "s" (fn (b s e) nil))
(persist/subscriber-count h "s")))
1)
(persist-test
"publish appends to the log"
(let
((b (persist/open)))
(let
((h (persist/hub b)))
(begin
(persist/publish h "s" "x" 0 {})
(persist/publish h "s" "x" 0 {})
(persist/count b "s"))))
2)
(persist-test
"publish returns the stored event"
(let
((h (persist/hub (persist/open))))
(persist/event-seq (persist/publish h "s" "x" 0 {:id 1})))
1)
(persist-test
"callback fires on publish — drives a kv read model"
(let
((b (persist/open)))
(let
((h (persist/hub b)))
(begin
(persist/subscribe
h
"s"
(fn
(bk s e)
(persist/kv-update
bk
"count"
0
(fn (n) (+ n 1)))))
(persist/publish h "s" "x" 0 {})
(persist/publish h "s" "x" 0 {})
(persist/publish h "s" "x" 0 {})
(persist/kv-get b "count"))))
3)
(persist-test
"callback receives the event"
(let
((b (persist/open)))
(let
((h (persist/hub b)))
(begin
(persist/subscribe
h
"s"
(fn (bk s e) (persist/kv-put bk "last" (persist/event-type e))))
(persist/publish h "s" "created" 0 {})
(persist/kv-get b "last"))))
"created")
(persist-test
"subscriptions are per-stream"
(let
((b (persist/open)))
(let
((h (persist/hub b)))
(begin
(persist/subscribe
h
"s1"
(fn
(bk s e)
(persist/kv-update bk "n" 0 (fn (n) (+ n 1)))))
(persist/publish h "s2" "x" 0 {})
(persist/kv-get-or b "n" 0))))
0)
(persist-test
"multiple subscribers all fire"
(let
((b (persist/open)))
(let
((h (persist/hub b)))
(begin
(persist/subscribe
h
"s"
(fn
(bk s e)
(persist/kv-update bk "a" 0 (fn (n) (+ n 1)))))
(persist/subscribe
h
"s"
(fn
(bk s e)
(persist/kv-update bk "b" 0 (fn (n) (+ n 10)))))
(persist/publish h "s" "x" 0 {})
(list (persist/kv-get b "a") (persist/kv-get b "b")))))
(list 1 10))
(persist-test
"incremental read model via resume in callback"
(let
((b (persist/open)))
(let
((h (persist/hub b)))
(begin
(persist/kv-put b "proj" {:value 0 :seq 0})
(persist/subscribe
h
"s"
(fn
(bk s e)
(persist/kv-put
bk
"proj"
(persist/project-resume
bk
s
(fn (acc ev) (+ acc 1))
(persist/kv-get bk "proj")))))
(persist/publish h "s" "x" 0 {})
(persist/publish h "s" "x" 0 {})
(persist/project-value (persist/kv-get b "proj")))))
2)

115
lib/persist/tests/upcast.sx Normal file
View File

@@ -0,0 +1,115 @@
; Extension — event schema evolution via upcasters.
; v1 "placed" events had {:total N}; v2 wants {:amount N :currency "GBP"}.
(define up-placed (fn (e) (persist/upcast-data e {:amount (get (persist/event-data e) :total) :currency "GBP"})))
(persist-test
"unregistered type passes through unchanged"
(let
((reg (persist/upcasters)))
(persist/event-data
(persist/upcast
reg
(persist/event "s" 1 "other" 0 {:x 1}))))
{:x 1})
(persist-test
"registered upcaster lifts an old event"
(let
((reg (persist/register-upcaster (persist/upcasters) "placed" up-placed)))
(get
(persist/event-data
(persist/upcast
reg
(persist/event "s" 1 "placed" 0 {:total 50})))
:amount))
50)
(persist-test
"upcaster adds the new field"
(let
((reg (persist/register-upcaster (persist/upcasters) "placed" up-placed)))
(get
(persist/event-data
(persist/upcast
reg
(persist/event "s" 1 "placed" 0 {:total 50})))
:currency))
"GBP")
(persist-test
"upcast preserves stream/seq/type/at"
(let
((reg (persist/register-upcaster (persist/upcasters) "placed" up-placed)))
(let
((e (persist/upcast reg (persist/event "orders" 7 "placed" 99 {:total 1}))))
(list
(persist/event-seq e)
(persist/event-at e)
(persist/event-type e))))
(list 7 99 "placed"))
(persist-test
"registry is immutable — register returns a new dict"
(let
((r0 (persist/upcasters)))
(begin
(persist/register-upcaster r0 "placed" up-placed)
(has-key? r0 "placed")))
false)
(persist-test
"read-upcast lifts every event in a stream"
(let
((b (persist/open))
(reg
(persist/register-upcaster (persist/upcasters) "placed" up-placed)))
(begin
(persist/append b "orders" "placed" 0 {:total 10})
(persist/append b "orders" "placed" 0 {:total 20})
(let
((es (persist/read-upcast b "orders" reg)))
(list
(get (persist/event-data (nth es 0)) :amount)
(get (persist/event-data (nth es 1)) :amount)))))
(list 10 20))
(persist-test
"project-upcast folds over the current shape"
(let
((b (persist/open))
(reg
(persist/register-upcaster (persist/upcasters) "placed" up-placed)))
(begin
(persist/append b "orders" "placed" 0 {:total 10})
(persist/append b "orders" "placed" 0 {:total 20})
(persist/project-upcast
b
"orders"
reg
(fn (acc e) (+ acc (get (persist/event-data e) :amount)))
0)))
30)
(persist-test
"mixed old and new events fold uniformly"
(let
((b (persist/open))
(reg
(persist/register-upcaster (persist/upcasters) "placed" up-placed)))
(begin
(persist/append b "orders" "placed" 0 {:total 5})
(persist/append b "orders" "placed" 0 {:total 7 :amount 7})
(persist/project-upcast
b
"orders"
reg
(fn (acc e) (+ acc (get (persist/event-data e) :amount)))
0)))
12)
(persist-test
"upcast works on the durable backend"
(let
((db (persist/mock-durable (persist/mem-backend)))
(reg
(persist/register-upcaster (persist/upcasters) "placed" up-placed)))
(begin
(persist/append db "orders" "placed" 0 {:total 42})
(get
(persist/event-data
(nth (persist/read-upcast db "orders" reg) 0))
:amount)))
42)

105
lib/persist/tests/view.sx Normal file
View File

@@ -0,0 +1,105 @@
; Extension — materialized views: stay current on write, read O(1) via peek.
(define vw-count (fn (acc e) (+ acc 1)))
(define vw (persist/view "order-count" "orders" vw-count 0))
(persist-test "view-name" (persist/view-name vw) "order-count")
(persist-test "view-stream" (persist/view-stream vw) "orders")
(persist-test
"view-value folds the stream"
(let
((b (persist/open)))
(begin
(persist/append b "orders" "x" 0 {})
(persist/append b "orders" "x" 0 {})
(persist/view-value b vw)))
2)
(persist-test
"view-refresh persists a snapshot that peek then reads"
(let
((b (persist/open)))
(begin
(persist/append b "orders" "x" 0 {})
(persist/view-refresh b vw)
(persist/view-peek b vw)))
1)
(persist-test
"peek lags an un-refreshed tail"
(let
((b (persist/open)))
(begin
(persist/append b "orders" "x" 0 {})
(persist/view-refresh b vw)
(persist/append b "orders" "x" 0 {})
(persist/view-peek b vw)))
1)
(persist-test
"view-value sees the whole stream even after a stale snapshot"
(let
((b (persist/open)))
(begin
(persist/append b "orders" "x" 0 {})
(persist/view-refresh b vw)
(persist/append b "orders" "x" 0 {})
(persist/view-value b vw)))
2)
(persist-test
"attached view stays current on publish — peek needs no manual refresh"
(let
((b (persist/open)))
(let
((h (persist/view-attach (persist/hub b) vw)))
(begin
(persist/publish h "orders" "x" 0 {})
(persist/publish h "orders" "x" 0 {})
(persist/publish h "orders" "x" 0 {})
(persist/view-peek b vw))))
3)
(persist-test
"attached view advances the snapshot seq incrementally"
(let
((b (persist/open)))
(let
((h (persist/view-attach (persist/hub b) vw)))
(begin
(persist/publish h "orders" "x" 0 {})
(persist/publish h "orders" "x" 0 {})
(persist/project-seq
(persist/snapshot-load b "order-count" 0)))))
2)
(persist-test
"attach only reacts to its own stream"
(let
((b (persist/open)))
(let
((h (persist/view-attach (persist/hub b) vw)))
(begin
(persist/publish h "other" "x" 0 {})
(persist/view-peek b vw))))
0)
(persist-test
"materialized view works on the durable backend"
(let
((db (persist/mock-durable (persist/mem-backend))))
(let
((h (persist/view-attach (persist/hub db) vw)))
(begin
(persist/publish h "orders" "x" 0 {})
(persist/publish h "orders" "x" 0 {})
(persist/view-peek db vw))))
2)
(persist-test
"view sum over event data"
(let
((b (persist/open))
(sumv
(persist/view
"rev"
"sales"
(fn (acc e) (+ acc (get (persist/event-data e) :amt)))
0)))
(begin
(persist/append b "sales" "sale" 0 {:amt 10})
(persist/append b "sales" "sale" 1 {:amt 25})
(persist/view-value b sumv)))
35)

44
lib/persist/upcast.sx Normal file
View File

@@ -0,0 +1,44 @@
; persist/upcast — event schema evolution. An append-only log keeps events
; forever, so old events have old shapes. Rather than migrate stored data (you
; can't rewrite history) or branch every projection on version, register an
; upcaster per event type: a pure (event -> event) that lifts an old event to
; the current shape. Reads pass through the registry so projections see ONE
; shape. The registry is an immutable dict the consumer threads (no global
; mutable state). Requires: lib/persist/event.sx, lib/persist/log.sx.
(define persist/upcasters (fn () {}))
(define persist/register-upcaster (fn (reg type fn) (assoc reg type fn)))
; apply the registered upcaster for an event's type, or pass it through unchanged
(define
persist/upcast
(fn
(reg e)
(let ((f (get reg (persist/event-type e)))) (if f (f e) e))))
; read a stream with every event lifted to current shape
(define
persist/read-upcast
(fn
(b stream reg)
(map (fn (e) (persist/upcast reg e)) (persist/read b stream))))
; project over upcasted events — projections never see a legacy shape
(define
persist/project-upcast
(fn
(b stream reg step seed)
(reduce step seed (persist/read-upcast b stream reg))))
; helper: upcast an event's :data by merging in/overriding fields, keeping the
; record's stream/seq/type/at. Common upcaster body.
(define
persist/upcast-data
(fn
(e new-data)
(persist/event
(persist/event-stream e)
(persist/event-seq e)
(persist/event-type e)
(persist/event-at e)
(merge (persist/event-data e) new-data))))

49
lib/persist/view.sx Normal file
View File

@@ -0,0 +1,49 @@
; persist/view — a materialized view: the consumer-facing read model. It bundles
; a stream, a fold (step + seed) and a snapshot name. Attached to a hub it
; refreshes incrementally on every publish, so the materialized value stays
; current on write and reads are O(1) snapshot loads (persist/view-peek) instead
; of a full fold. This is what feed indices, mod audit rollups, search counters,
; etc. sit on. Requires: lib/persist/snapshot.sx, lib/persist/subscribe.sx.
(define persist/view (fn (name stream step seed) {:name name :step step :stream stream :seed seed}))
(define persist/view-name (fn (v) (get v :name)))
(define persist/view-stream (fn (v) (get v :stream)))
; bring the view's snapshot up to date with the log tail; returns the state
(define
persist/view-refresh
(fn
(b v)
(persist/checkpoint
b
(get v :stream)
(get v :name)
(get v :step)
(get v :seed))))
; current materialized value — refreshes first, so never stale
(define
persist/view-value
(fn (b v) (persist/project-value (persist/view-refresh b v))))
; O(1) read of the last persisted snapshot value WITHOUT folding the tail. Equal
; to view-value when the view is attached (kept current on every publish);
; otherwise may lag the log by the un-refreshed tail.
(define
persist/view-peek
(fn
(b v)
(persist/project-value
(persist/snapshot-load b (get v :name) (get v :seed)))))
; attach to a hub: refresh the view on every publish to its stream
(define
persist/view-attach
(fn
(h v)
(begin
(persist/subscribe
h
(persist/view-stream v)
(fn (bk s e) (persist/view-refresh bk v)))
h)))

View File

@@ -0,0 +1,115 @@
# persist-on-sx loop agent (single agent, queue-driven)
Role: iterates `plans/persist-on-sx.md` forever. **Durable state on the SX kernel**
— the foundation substrate every other subsystem currently fakes with an in-memory
mutable list. Event log (append-only streams) + kv (current-state) over one
injectable backend; pure projections; snapshots; durable IO at the kernel's
`perform` boundary. This is **substrate-level**, not a guest language.
```
description: persist-on-sx queue loop
subagent_type: general-purpose
run_in_background: true
isolation: worktree
```
## Prompt
You are the sole background agent working `plans/persist-on-sx.md`. Isolated
worktree `/root/rose-ash-loops/persist` on branch `loops/persist`, forever, one
commit per feature. Push to `origin/loops/persist` after every commit. Never touch
`main` or `architecture`.
## Restart baseline — check before iterating
1. Read `plans/persist-on-sx.md` — roadmap + Progress log. Note the scope table:
persist owns the **log** + **kv** facets; blobs are delegated (store the CID,
not the bytes); cache is out of scope. Do not event-source everything.
2. `ls lib/persist/` — pick up from the most advanced file.
3. If `lib/persist/tests/*.sx` exist, run them via `bash lib/persist/conformance.sh`.
Green before new work.
4. If `lib/persist/scoreboard.md` exists, that's your baseline.
5. **Learn the substrate before writing durable code.** persist sits on the kernel's
IO-suspension surface — the third CEK phase: `perform`, `cek-step-loop`,
`cek-resume`, `make-cek-suspended`. Study how IO is requested and resumed, and
how `spec/harness.sx` mocks an IO platform for tests (assert-io-*). Phases 13
need NO real IO — the in-memory backend is pure SX. Real durable IO (Phase 4)
goes through `perform` and is tested against the mock-IO harness, not a real disk.
Verify the actual exported names with sx_find_all / grep before relying on them.
## The queue
Phase order per `plans/persist-on-sx.md`:
- **Phase 1** — log + kv + in-memory backend (event record, injectable backend
protocol, append/read, kv get/put/delete, api).
- **Phase 2** — projections (`fold step seed`) + subscriptions; concurrency
conflict as a real result.
- **Phase 3** — snapshots + replay (checkpoint, replay = snapshot + tail,
determinism).
- **Phase 4** — durable backend via kernel IO (`perform`), blob-ref interface,
crash/restart replay against the mock-IO harness.
Within a phase, pick the checkbox that unlocks the most tests per effort.
Every iteration: implement → test → commit → tick `[ ]` → Progress log → next.
## Ground rules (hard)
- **Scope:** only `lib/persist/**` and `plans/persist-on-sx.md`. Do **not** edit
`spec/`, `hosts/`, `shared/`, or any `lib/<lang>/`. You may **import** the
kernel's IO-suspension + platform-IO surface only. **Do NOT add host primitives.**
If a durable IO op you need doesn't exist, it belongs in `hosts/` (out of scope) →
Blockers entry with a minimal repro, and stop on that item.
- **NEVER call `sx_build`.** 600s watchdog. If the sx_server binary is broken →
Blockers entry, stop. Run tests by invoking the sx_server binary directly from a
conformance.sh (model it on an existing one, e.g. `lib/apl/conformance.sh`),
pointing `SX_SERVER` at `/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe`
— fresh worktrees have no `_build/`.
- **Determinism:** replay must be pure — same log → same state. No clocks/randomness
inside projections; timestamps live on the event, passed in.
- **Shared-file issues** → plan's Blockers with minimal repro; don't fix here.
- **SX files:** `sx-tree` MCP tools ONLY. **They take `file:` not `path:`** — a
wrong key yields `Yojson Type_error("Expected string, got null")`, which looks
like a broken binary but is just a param mismatch. `sx_validate` after edits.
Path-based edits (`sx_replace_node`) count comment headers in their indices and
can clobber the wrong node — re-read after, or prefer `sx_write_file` for small
files.
- **Unicode in `.sx`:** raw UTF-8 only, never `\uXXXX` escapes.
- **Commit granularity:** one feature per commit. Short factual messages
(`persist: kv facet get/put/delete + 6 tests`). Push to `origin/loops/persist`.
- **Plan file:** update Progress log (newest first) + tick boxes every commit.
## persist-specific gotchas
- **Two facets, not one.** Don't force current-state values (a stock count, a
config value, a session blob) through the event log — that's the kv facet. Event
log is for things whose *history* matters.
- **Backend is injected.** The in-memory backend is the test default; never hardwire
it. Every op goes through the backend protocol so file/pg/ipfs swap in unchanged.
- **Optimistic concurrency is a real result.** A conflicting append returns a
conflict value the caller can retry on — not a crash, not a silent overwrite.
- **Blobs by reference only.** persist stores a content-address/CID + metadata. The
bytes live in a content-addressed store (artdag/IPFS). Never put large payloads in
the log.
- **Replay determinism is the headline property.** Snapshot + tail must equal full
replay. Test it explicitly, both directions.
## General gotchas (all loops)
- SX `do` = R7RS iteration. Use `begin` for multi-expr sequences.
- `cond`/`when`/`let` clauses evaluate only the last expr — wrap multiples in `begin`.
- `let` is parallel, not sequential — nest `let`s when a binding references an earlier one.
- `env-bind!` creates a binding; `env-set!` mutates an existing one (walks scope chain).
- `sx_validate` after every structural edit.
- Namespace-prefix all helpers (`persist/...`) — short/host-colliding names get
silently shadowed or hang the runtime.
## Style
- No comments in `.sx` unless non-obvious.
- No new planning docs — update `plans/persist-on-sx.md` inline.
- Short, factual commit messages.
- One feature per iteration. Commit. Log. Push. Next.
Go. Start by reading the plan; find the first unchecked `[ ]`; implement it.

82
plans/commerce-on-sx.md Normal file
View File

@@ -0,0 +1,82 @@
# commerce-on-sx: Catalog, cart, pricing & orders on miniKanren
> **DRAFT outline.** The revenue vertical. Depends on `persist-on-sx` (durable
> orders) and `flow-on-sx` (checkout as a durable flow). Don't start before
> persist-on-sx Phase 1 is green.
rose-ash's revenue engine — market (catalog), cart (checkout), orders (SumUp
payment, reconciliation) — has no SX subsystem. The hard part of commerce isn't
CRUD; it's **pricing**: discounts, bundles, tax, membership rates, promotions that
stack (or don't). These are relations, and a relational engine can run them in
multiple directions — forward ("what's the total?") and backward ("what promo code
yields this total?", "which line item triggered the discount?").
That's a miniKanren fit. Pricing/promotion rules are relational; cart and order
*lifecycle* (reserve → pay → fulfil → reconcile) is a durable `flow`; the order
ledger is a `persist` stream. Commerce is the first real **composition** subsystem.
End-state: a catalog model, a relational pricing/promotion engine, a cart with
deterministic totals, and an order lifecycle flow with payment-webhook
reconciliation — all auditable via the event log.
## Status (rolling)
`bash lib/commerce/conformance.sh`**0/0** (not yet started)
## Ground rules
- **Scope:** only `lib/commerce/**` and `plans/commerce-on-sx.md`. May **import**
from `lib/minikanren/`, and (once they exist) `lib/persist/` + `lib/flow/`. Do not
edit substrates.
- **Architecture:** prices/promotions are miniKanren relations over catalog facts;
a cart total is a *deterministic* query result (first solution under a fixed rule
order). Order lifecycle is a `flow` that suspends at the payment IO boundary.
Money is integer minor units — never floats.
- **Determinism:** promotion stacking must have explicit, tested precedence;
totals must be reproducible from the cart + catalog snapshot.
- **Commits:** one feature per commit. Progress log + tick boxes.
## Architecture sketch
```
Catalog + cart Total / order
product(id,price,tags) {:subtotal :discounts :tax :total}
│ ▲
▼ │
lib/commerce/catalog.sx lib/commerce/price.sx
— product / variant / stock facts — miniKanren pricing relations
│ — promo stacking, membership rates
▼ ▲
lib/commerce/cart.sx lib/commerce/order.sx (flow + store)
— line items, quantities — reserve→pay→fulfil→reconcile
│ — SumUp webhook = flow resume
▼ │
lib/commerce/api.sx ── (commerce/add) (commerce/total) (commerce/checkout) ──┘
```
## Phase 1 — Catalog + cart + deterministic totals
- [ ] `catalog.sx` — product/variant/stock as facts
- [ ] `cart.sx` — line items, add/remove/qty
- [ ] `price.sx` — base pricing relation, subtotal; tax
- [ ] `api.sx` + tests + scoreboard + conformance.sh
## Phase 2 — Promotions (relational)
- [ ] promo rules: percentage, fixed, bundle, member rate
- [ ] explicit stacking precedence; "best price" backward query
- [ ] tests: stacking order, mutually-exclusive promos, member vs guest
## Phase 3 — Order lifecycle (flow + store)
- [ ] order flow: reserve stock → await payment → fulfil
- [ ] payment webhook resumes the suspended flow
- [ ] order ledger as a `persist` stream; idempotent reconciliation
## Phase 4 — Reconciliation + federation
- [ ] mismatch detection (paid≠ordered) as queries over the ledger
- [ ] cross-instance catalog (federated marketplace) — out-of-scope stub
- [ ] tests: webhook replay, partial refund, double-charge guard
## Progress log
(loop fills this in)
## Blockers
(loop fills this in)

82
plans/content-on-sx.md Normal file
View File

@@ -0,0 +1,82 @@
# content-on-sx: Documents, blocks & collaborative editing on Smalltalk
> **DRAFT outline.** The CMS vertical — blog, WYSIWYG editor, Ghost sync. Depends
> on `persist-on-sx` (document history as an event log). Ghost/CMS sync stays a thin
> external adapter (Python/FFI) until a native replacement exists.
rose-ash's `blog` domain is content management: a block-based WYSIWYG editor,
navigation, Ghost CMS sync. A document is a tree of live blocks; editing is a
stream of operations; collaboration needs conflict-free merge. That is an object
model — blocks are objects, edits are messages, and a document is the object graph
responding to them. Smalltalk's "everything is an object responding to messages"
maps directly to a block/WYSIWYG model, and a semilattice (CRDT) merge keeps
concurrent edits conflict-free.
End-state: a Smalltalk-on-SX document model (typed blocks, structural ops),
operation log + CRDT merge for collaborative editing, versioning/history via the
event store, and a render boundary to HTML/SX. External CMS (Ghost) sync is an
injected adapter, not core.
## Status (rolling)
`bash lib/content/conformance.sh`**0/0** (not yet started)
## Ground rules
- **Scope:** only `lib/content/**` and `plans/content-on-sx.md`. May **import**
from `lib/smalltalk/`, and (once it exists) `lib/persist/`. Do not edit substrates.
- **Architecture:** a document is an ordered tree of blocks (objects); an edit is a
message (`insert`/`update`/`move`/`delete`); concurrent edits merge via a
commutative (CRDT/semilattice) operation so order doesn't matter. History is the
`persist` event stream; any version is a replay.
- **Determinism:** merge must be commutative + idempotent (test: apply ops in any
order / twice → same document).
- **Commits:** one feature per commit. Progress log + tick boxes.
## Architecture sketch
```
Edit op Rendered document
(insert block after id) ... HTML / SX tree
│ ▲
▼ │
lib/content/block.sx lib/content/render.sx
— typed blocks as objects — block tree → HTML/SX
— heading/text/image/embed — (reuses SX render boundary)
│ ▲
▼ │
lib/content/doc.sx lib/content/merge.sx
— ordered block tree — CRDT/semilattice op merge
— apply op, structural moves — concurrent-edit reconciliation
│ ▲
▼ │
lib/content/api.sx ── (content/edit) (content/render) (content/history) ──┐
│ │
├── op log + versions → persist │
└── Ghost/CMS sync → injected external adapter (thin, non-core) ──┘
```
## Phase 1 — Block document model
- [ ] `block.sx` — typed block objects
- [ ] `doc.sx` — ordered tree, apply edit op, structural moves
- [ ] `render.sx` — block tree → HTML/SX
- [ ] `api.sx` + tests + scoreboard + conformance.sh
## Phase 2 — Op log + versioning
- [ ] edit ops as `persist` events; replay to any version
- [ ] `(content/history doc)`, diff between versions
## Phase 3 — Collaborative merge (CRDT)
- [ ] commutative/idempotent op merge
- [ ] concurrent-edit tests (any order, double-apply → identical)
## Phase 4 — External sync + federation
- [ ] Ghost/CMS sync via injected adapter (import/export)
- [ ] federated documents (peer-authored blocks) — trust-gated stub
- [ ] tests: round-trip import/export, conflict on concurrent external edit
## Progress log
(loop fills this in)
## Blockers
(loop fills this in)

81
plans/events-on-sx.md Normal file
View File

@@ -0,0 +1,81 @@
# events-on-sx: Calendar, ticketing & notification delivery on Datalog
> **DRAFT outline.** The events vertical + the shared notification-delivery edge.
> Depends on `persist-on-sx` (bookings ledger) and `flow-on-sx` (reminders, retrying
> delivery). Pairs with `commerce-on-sx` for paid tickets.
rose-ash's `events` domain is calendar + ticketing: recurring events, availability,
capacity, bookings. Scheduling is constraint reasoning — "is this slot free given
recurrence, capacity, and the attendee's other bookings?" — which is rule
evaluation over facts. Datalog expresses availability, recurrence expansion, and
capacity as rules; a booking is a transaction; reminders and digests are durable
`flow`s. Notification *delivery* (email/push) — needed here and by `feed/notify`
is folded in as an injected transport, extractable later.
End-state: a Datalog-on-SX events layer with recurrence expansion, availability +
capacity rules, transactional booking, and a flow-driven notification dispatcher
(reminders, digests, retries) over an injected transport.
## Status (rolling)
`bash lib/events/conformance.sh`**0/0** (not yet started)
## Ground rules
- **Scope:** only `lib/events/**` and `plans/events-on-sx.md`. May **import** from
`lib/datalog/`, and (once they exist) `lib/persist/` + `lib/flow/`. Do not edit
substrates.
- **Architecture:** events/availability/capacity are Datalog facts + rules;
recurrence expands to occurrence facts within a window; a booking checks rules
then appends a `persist` event (idempotent, capacity-safe). Notifications are flows
that suspend on transport IO and retry on failure.
- **Determinism:** recurrence expansion + availability must be reproducible for a
fixed window + ruleset; capacity checks must be race-safe (no overbooking).
- **Commits:** one feature per commit. Progress log + tick boxes.
## Architecture sketch
```
Event + booking Result
event(id,start,rrule,capacity) {:booked | :full | :conflict} + reminders
│ ▲
▼ │
lib/events/calendar.sx lib/events/availability.sx
— event facts, recurrence (RRULE) — free/busy + capacity rules (Datalog)
— expand occurrences in window │
│ ▲
▼ │
lib/events/booking.sx lib/events/notify.sx (flow)
— transactional, capacity-safe — reminders / digests, retry on fail
— bookings → persist ledger — injected transport (email/push)
│ │
▼ ▼
lib/events/api.sx ── (events/schedule) (events/book) (events/agenda) ──────┘
```
## Phase 1 — Calendar + recurrence
- [ ] `calendar.sx` — event facts, RRULE expansion in a window
- [ ] `availability.sx` — free/busy rules
- [ ] `api.sx` + tests + scoreboard + conformance.sh
## Phase 2 — Ticketing + booking
- [ ] capacity rules; transactional booking → `persist` (no overbooking)
- [ ] paid tickets compose with `commerce` order flow
- [ ] tests: capacity edge, double-book guard, conflict detection
## Phase 3 — Notification delivery (flow)
- [ ] `notify.sx` — reminder/digest flows over injected transport
- [ ] retry/backoff on transport failure (flow suspend/resume)
- [ ] tests: delivery success, retry path, idempotent re-send
- [ ] NOTE: shared with `feed/notify` — candidate for later extraction to a
`delivery-on-sx` once a second consumer is real
## Phase 4 — Federation
- [ ] cross-instance events (peer calendar) — trust-gated stub
- [ ] tests: federated agenda merge
## Progress log
(loop fills this in)
## Blockers
(loop fills this in)

100
plans/host-on-sx.md Normal file
View File

@@ -0,0 +1,100 @@
# host-on-sx: The SX web host — off Quart, onto the kernel (Dream-bound)
> **DRAFT outline.** The integration boundary that turns the subsystem libraries
> into running services, and the strangler path off Python/Quart. This is the
> dependency hub — it imports every subsystem. Decision recorded below: native
> server + SXTP **now**, `dream-on-sx` framework layer **next**, Python only at the
> external-integration edges.
The subsystems (`feed`, `search`, `acl`, `mod`, `flow`, `commerce`, `identity`,
`content`, `events`) are libraries. Something has to receive an HTTP request, route
it, call the right subsystem, and serialize the response. Today that's Python/Quart
— the one large non-SX component in the stack: separate runtime, deploy, and
failure mode. The goal is to move the web/host/domain layer onto the SX substrate
and retire Quart, **incrementally (strangler-fig), never big-bang.**
This is already underway: a native OCaml HTTP server is live in prod on
`sx.rose-ash.com` (~3ms cached, ~323 req/s, ~2MB RSS), `defhandler`/`defpage`
exist, and a partial **SXTP** protocol is specced. That is the unblocked near-term
host — no `ocaml-on-sx` dependency.
## Two layers, two timelines
1. **Now (unblocked): native server + SXTP adapter + SX handlers.** Route rose-ash
endpoints onto the SX host one at a time. Each migrated endpoint is an SX
handler dispatching to a subsystem; Quart proxies the rest until cut over.
2. **Next: `dream-on-sx` as the framework layer.** Dream gives Quart-grade
ergonomics — typed routing, middleware stacks, sessions, CSRF. It is gated on
`ocaml-on-sx` Phases 15 + minimal stdlib. **This plan is the concrete target
user that un-parks `dream-on-sx`** (see `plans/dream-on-sx.md`): "the subsystems
need an HTTP front door" is the real feature pulling Dream. Until then, do not
block migration on Dream — the native server is sufficient.
3. **Always: Python only at the edges.** External integrations — SumUp payments,
Ghost CMS, ActivityPub crypto, IPFS/Kubo — ride Python libraries today. They
stay as thin injected adapters (Python/FFI) behind subsystem interfaces until
native replacements exist. "Drop Quart" ≠ "drop every line of Python."
## Status (rolling)
`bash lib/host/conformance.sh`**0/0** (not yet started)
## Ground rules
- **Scope:** `lib/host/**` and `plans/host-on-sx.md`. May **import** every subsystem
+ the kernel's server/SXTP surface. Do **not** edit `spec/`, `hosts/`, `shared/`,
or subsystem internals — wire to their public APIs only. Host-primitive / server
changes belong in `hosts/` (out of scope) → Blockers.
- **Architecture:** a route maps (method, path) → handler; a handler is an SX fn
`request -> response` that calls subsystem APIs; middleware is composed handlers
(auth via `identity`, permission via `acl`, mute via subsystem prefs). SXTP is the
wire format between host and subsystem-as-service.
- **Migration discipline:** each endpoint moved must be behavior-equivalent to its
Quart original (golden-response test before flip). Keep a migration ledger.
- **Commits:** one feature per commit. Progress log + tick boxes.
## Architecture sketch
```
HTTP request HTTP response
│ ▲
▼ │
native OCaml http server (prod) ──────► lib/host/router.sx
(hosts/ — out of scope) — (method,path) → handler
│ ▲
▼ │
lib/host/middleware.sx lib/host/handler.sx
— auth(identity) ∘ acl ∘ mute ∘ ... — request → subsystem call → response
│ ▲
▼ │
lib/host/sxtp.sx subsystem APIs (feed/search/commerce/…)
— wire format, host↔service — called via public interfaces
└── external edges: SumUp / Ghost / AP / IPFS → injected Python/FFI adapters
```
## Phase 1 — Router + handler + one real endpoint
- [ ] `router.sx` — route table, (method,path) match
- [ ] `handler.sx` — request/response model, subsystem dispatch
- [ ] migrate ONE read endpoint (e.g. a feed timeline) end-to-end, golden test
- [ ] `conformance.sh` + scoreboard
## Phase 2 — Middleware + SXTP
- [ ] `middleware.sx` — composable auth/acl/mute/error layers
- [ ] `sxtp.sx` — host↔subsystem wire format (align with existing spec)
- [ ] migrate a write endpoint (auth + permission + action)
## Phase 3 — Strangler migration ledger
- [ ] enumerate Quart endpoints; track migrated vs proxied
- [ ] golden-response harness vs the live Quart responses
- [ ] cut over a whole domain (smallest: `likes` or `relations`) as proof
## Phase 4 — Dream framework layer (gated)
- [ ] gate: `ocaml-on-sx` Phases 15 + minimal stdlib green
- [ ] adopt `dream-on-sx` routing/middleware/session ergonomics over the same handlers
- [ ] re-home external adapters as native where replacements land
## Progress log
(loop fills this in)
## Blockers
(loop fills this in)

84
plans/identity-on-sx.md Normal file
View File

@@ -0,0 +1,84 @@
# identity-on-sx: OAuth2, sessions & membership on Erlang
> **DRAFT outline.** The identity core `acl-on-sx` assumes already exists. `acl`
> answers "may X do Y"; identity answers "who is X, and how did they prove it."
> Depends on `persist-on-sx` (grant/audit ledger). Pairs with `acl-on-sx`.
rose-ash's `account` domain is the OAuth2 authorization server every other app is
a client of: silent SSO, per-app first-party cookies, grant verification,
membership. Sessions and grants are **long-lived, concurrent, individually
addressable, and expire on their own** — that is the actor model. Erlang's
processes + mailboxes map cleanly: a session is a process, token issue/refresh/
revoke are messages, expiry is a process timeout, and SSO is one process answering
many apps.
End-state: an Erlang-on-SX layer with the OAuth2 authorization-code + silent
(`prompt=none`) flows as message protocols, a session/grant registry, token
lifecycle (issue/refresh/revoke/introspect), and membership state — all auditable
through the event log, all authorization questions delegated to `acl-on-sx`.
## Status (rolling)
`bash lib/identity/conformance.sh`**0/0** (not yet started)
## Ground rules
- **Scope:** only `lib/identity/**` and `plans/identity-on-sx.md`. May **import**
from `lib/erlang/`, and (once they exist) `lib/persist/` + `lib/acl/`. Do not edit
substrates.
- **Architecture:** a session/grant is a process holding its own state; the
registry routes messages by subject/client id. Tokens are opaque + introspected,
not self-validating (revocation must be real). Authorization decisions are NOT
made here — `identity` proves identity, `acl` decides permission.
- **Security:** revocation is immediate (kill the process / tombstone the grant);
no decision relies on a token that outlived its grant. Negative answers are
explicit, never "absence of a yes."
- **Commits:** one feature per commit. Progress log + tick boxes.
## Architecture sketch
```
Auth request Token / session
(authorize client scope subject) {:access :refresh :expires :grant}
│ ▲
▼ │
lib/identity/oauth.sx lib/identity/token.sx
— authz-code + prompt=none flows — issue / refresh / revoke / introspect
— as Erlang message protocols — opaque tokens, grant-backed
│ ▲
▼ │
lib/identity/session.sx lib/identity/registry.sx
— session = process, expiry=timeout — route by subject/client; SSO fan-out
│ │
▼ ▼
lib/identity/api.sx ── (identity/login) (identity/grant?) (identity/revoke) ──┐
│ │
└──────── grant + audit events → persist ; permission? → acl ──────────┘
```
## Phase 1 — Sessions + tokens
- [ ] `session.sx` — session process, create/lookup/expire
- [ ] `token.sx` — issue/introspect/revoke (opaque, grant-backed)
- [ ] `registry.sx` — route by subject/client
- [ ] `api.sx` + tests + scoreboard + conformance.sh
## Phase 2 — OAuth2 flows
- [ ] authorization-code flow as a message protocol
- [ ] refresh + rotation; revocation cascades to issued tokens
- [ ] tests: full code exchange, refresh, revoke-then-use (must fail)
## Phase 3 — Silent SSO + membership
- [ ] `prompt=none` cross-app login (one session, many clients)
- [ ] membership state + per-app grant projection
- [ ] grant verification delegated cache (mirror Redis-cache pattern)
## Phase 4 — Audit + federation
- [ ] every issue/refresh/revoke is a `persist` event; `(identity/audit subject)`
- [ ] federated identity (peer-asserted subject) — advisory, trust-gated stub
- [ ] tests: audit completeness, cross-instance subject mapping
## Progress log
(loop fills this in)
## Blockers
(loop fills this in)

411
plans/persist-on-sx.md Normal file
View File

@@ -0,0 +1,411 @@
# persist-on-sx: Durable state on the SX kernel
> **DRAFT outline.** Foundation subsystem — the durable substrate the other five
> currently fake with in-memory mutable lists. Build this first.
>
> **"persist" = persistence / data store, NOT the shop.** The shop/commerce vertical
> is `commerce-on-sx`.
rose-ash needs durable state: every subsystem (feed log, flow store, mod audit,
search index, acl grants, sessions) today hand-rolls an in-memory structure that
vanishes on restart. `persist-on-sx` is the one durable substrate they share. It
lives directly on the SX kernel's IO-suspension primitives (`perform`/`cek-resume`
— the third CEK phase) so a read/write `perform`s and the kernel persists at the
boundary. Concrete storage backends are injected.
## Does it cover ALL persistence? No — and on purpose.
Event-sourcing-everything is a known trap (replay cost, event schema evolution,
awkward ad-hoc queries, 5MB images in a log). So persist owns the **durable
source-of-truth substrate**, exposed as **two facets over one backend protocol**,
with two things explicitly delegated out:
| Shape | Owner | Notes |
|-------|-------|-------|
| **Event streams** (append-only, history matters) | persist — **log facet** | feed activities, mod audit, order ledger, flow state, content edits |
| **Current-state values** (KV / document, no history) | persist — **kv facet** | profiles, stock counts, config, session blobs; also where projections materialize |
| **Snapshots / read models** (derived, queryable) | persist — projections → kv/log | rebuildable from the log; persisted so you don't replay to answer a query |
| **Blobs / large objects** (images, media) | **delegated** → content-addressed store (artdag/IPFS already) | persist stores the *reference/CID*, never the bytes |
| **Cache** (ephemeral, evictable) | **out of scope** | not persistence — different lifecycle (Redis-shaped) |
| **Ad-hoc relational query** | the subsystem, over a projected read model | the log is bad at "all orders by X in March"; project into a queryable kv/SQL backend |
So: persist is the **single durable substrate** for state that's either a stream of
changes or a current value — but it does **not** force everything into an event
log, it does **not** hold blobs (only their content-addressed refs), and it does
**not** do caching. Those boundaries are the whole point of calling it a substrate
rather than "the database."
End-state: `log` (append/read streams) + `kv` (get/put/delete by key) facets, an
injectable backend protocol (mem → file → Postgres → IPFS-ref), pure projections
with incremental snapshots, optimistic concurrency, and a subscription hook so
read models (feeds, indices, audit logs) update incrementally.
## Status (rolling)
`bash lib/persist/conformance.sh`**201/201** (Phases 14 complete + extensions + a reference migration)
## Ground rules
- **Scope:** only `lib/persist/**` and `plans/persist-on-sx.md`. May **import** the
kernel's IO-suspension surface (`perform`, platform IO ops) — verify what's
exported first. Do not add host primitives; a missing durable IO op is a Blockers
entry (it belongs in `hosts/`, out of scope).
- **Architecture:** an event is `{:stream :seq :type :at :data}`; the log is an
ordered append-only vector; a projection is `(fold step seed events)`; a kv value
is `(get/put/delete key)`. Both facets sit on one injected backend
`{:append :read :kv-get :kv-put :snapshot-read :snapshot-write}`. The in-memory
backend is the test default; real backends wire in unchanged.
- **Determinism:** replay is pure — same log → same state, always. No clocks or
randomness inside projections; time lives on the event.
- **Blobs:** store the content-address/CID and metadata; never the bytes. The blob
backend is a separate injected dependency.
- **Commits:** one feature per commit. Progress log + tick boxes.
## Architecture sketch
```
Command / write Read model / value
(append stream type data) (project stream step seed)
(kv-put key value) (kv-get key)
│ ▲
▼ │
lib/persist/event.sx lib/persist/project.sx
— {:stream :seq :type :at :data} — fold step seed; incremental from snapshot
│ ▲
▼ │
lib/persist/log.sx lib/persist/kv.sx lib/persist/snapshot.sx
— append/read — get/put/delete — checkpoint; replay = snapshot + tail
— optimistic seq — current-state
│ │ ▲
└──────────────────┴── (perform → backend) ───┘
lib/persist/backend.sx lib/persist/api.sx
— injected protocol — (persist/append) (persist/project)
— mem | file | pg | ipfs-ref — (persist/kv-get/put) (persist/subscribe)
└── blobs → content-addressed store (artdag/IPFS), by reference only
```
## Phase 1 — Log + kv + in-memory backend
- [x] `event.sx` — event record, stream/seq helpers
- [x] `backend.sx` — injectable protocol + in-memory impl (log + kv)
- [x] `log.sx``append` (optimistic seq), `read`, `read-from`
- [x] `kv.sx``get`/`put`/`delete` current-state
- [x] `api.sx` + tests + scoreboard + conformance.sh
## Phase 2 — Projections + subscriptions
- [x] `project.sx``(project stream step seed)`, incremental fold
- [x] subscription hook — projection / kv read model re-runs on append
- [x] concurrency conflict surfaced as a real result, not a crash
## Phase 3 — Snapshots + replay
- [x] `snapshot.sx` — checkpoint a projection; replay = snapshot + tail
- [x] compaction policy; replay-determinism tests
## Phase 4 — Durable backends via kernel IO
- [x] file/log backend driven through `perform` (IO-suspension boundary)
- [x] blob backend interface (store ref/CID; bytes live in artdag/IPFS)
- [x] crash/restart replay test (mock IO platform)
- [x] migration notes for swapping mem → durable under a live subsystem
### Migration notes — mem → durable under a live subsystem
The facet API takes the backend as its first argument and never names a concrete
backend, so swapping storage is a one-line change at the open site:
```
(persist/open) ; in-memory (test / ephemeral)
(persist/mock-durable (persist/mem-backend)); durable protocol, in-process disk
(persist/durable-backend) ; production: ops cross perform → host
```
Everything above the backend — `append`/`read`/`project`/`subscribe`/`snapshot`
/`compact` — is byte-identical across all three. A subsystem migrates by:
1. **Pick the seam.** The subsystem holds one backend value (today an in-memory
list). Replace its construction with `persist/open`/`durable-backend`; leave
every call site untouched.
2. **Backfill.** For an existing in-memory store, replay its current state into
the durable backend once (append historical events / `kv-put` current
values) before cutting reads over. New writes go to durable from then on.
3. **Read models rebuild themselves.** A projection is pure `(fold step seed)`;
after cutover, `persist/replay` (snapshot + tail) reconstructs every read
model from the durable log — no bespoke migration of derived state.
4. **Blobs first, by reference.** Move large payloads into the content store and
store only `persist/blob-ref`s; the log/kv stay small, so the backfill in (2)
never copies bytes.
5. **Concurrency is already handled.** Two writers racing a stream get a
`persist/conflict?` result, not corruption — the same on mem or durable, so
no new code is needed at cutover.
The only behavioural difference durable introduces is that each op crosses the
kernel IO-suspension boundary (`perform`): under the real kernel the call
suspends and the host resumes it transparently, so the facet code is unaware.
Tests prove this by routing the identical request shapes through `persist/serve`
over an in-process disk (the mock-IO harness).
## Extensions (post-roadmap)
- [x] `view.sx` — materialized views: bundle stream + fold + snapshot name;
`view-attach` keeps the snapshot current on every publish so `view-peek` is an
O(1) read. The consumer-facing read-model abstraction (feed indices, audit
rollups, search counters).
- [x] `kv.sx` CAS — `persist/kv-cas` (compare-and-swap) + `persist/kv-put-new`
(create-only): atomic current-state updates, conflict as a real value (kv
analogue of log `append-expect`). For sessions, acl grants, stock counts.
- [x] `catalog.sx` — stream catalog: `persist/streams`/`stream-count`/
`stream-exists?`/`total-events`. Backend `:streams` op (from seq high-water
marks, so compacted streams still list), threaded through mem + durable.
- [x] `query.sx` — read-side scans: `read-between` (seq range), `read-since`/
`read-window` (by `:at`), `read-by-type`, `read-where`, `count-where`. Pure
reads for audit windows / type filters / since-cursors.
- [x] `batch.sx``persist/append-batch` commits a list of `(type at data)`
specs as one contiguous block; `persist/append-batch-expect` is transactional
(all-or-nothing guarded by optimistic concurrency). For an order + its line
items as one commit.
- [x] `upcast.sx` — event schema evolution: register a pure `(event -> event)`
upcaster per type; `read-upcast`/`project-upcast` lift old events to the
current shape on read so projections see one shape. Immutable registry;
`upcast-data` helper merges new `:data` fields. Addresses the schema-evolution
trap without rewriting history.
- [x] `idempotency.sx` — exactly-once append under retries: `persist/append-once`
keyed by a caller idempotency key (per stream), returning the same event on a
repeat. Marker lives in kv, so idempotency holds across restart. `seen?` check.
- [x] `global.sx` — global commit ordering across streams (the primitive feed's
unified timeline needs). `persist/gappend` records a pointer in a reserved
`$global` index whose seq is the commit position; `read-global`/
`project-global` replay every event in commit order; `global-from` for
incremental consumers. Opt-in (plain `append` never touches it); reserved
index hidden from the public catalog. Deterministic across restart.
## Consumers (post-foundation, not in scope here)
feed/-log, flow store, mod/audit, search index, acl grants, identity sessions all
become `persist` log or kv. Track each migration in that subsystem's plan.
**Reference migration:** `lib/persist/examples/acl.sx` is a worked, tested
template — an ACL-grants store rebuilt on persist (grants/revokes as events,
current set as a projection, O(1) checks via a materialized view, an audit-window
query). It carries an explicit BEFORE (hand-rolled ephemeral map) → AFTER
diff in its header and proves the headline win (grants survive restart) on the
durable backend. Other subsystem loops copy this pattern; it does not touch the
real `lib/acl`.
## Progress log
- **Reference migration: acl grants (201/201).** `lib/persist/examples/acl.sx`
a worked, in-scope template migrating an ACL-grants store from a hand-rolled
ephemeral map to persist: grants/revokes as events, current set as a
projection, O(1) checks via a materialized view, audit via `read-window`.
Header carries the BEFORE→AFTER diff. 10 tests, incl. grants surviving restart
on the durable backend (the capability the BEFORE version lacked). The pattern
other subsystem loops copy.
- **Ext: global commit ordering (191/191).** `global.sx``persist/gappend`
records a pointer in a reserved `$global` index (its seq = global commit
position); `read-global`/`project-global` resolve pointers to events in commit
order; `global-from` for incremental global consumers. Opt-in; `$`-streams are
now reserved + hidden from the public catalog (`streams-all` reveals them).
Gives feed its cross-stream timeline. 11 tests incl. durable + restart
determinism.
- **Ext: exactly-once append (180/180).** `idempotency.sx`
`persist/append-once` appends at most once per (stream, idempotency key),
returning the same event on a repeat; the marker lives in kv so it survives
restart (verified on durable). `persist/seen?` check. 9 tests.
- **Ext: event schema evolution (171/171).** `upcast.sx` — per-type pure
`(event -> event)` upcasters in an immutable registry; `read-upcast`/
`project-upcast` lift legacy events to the current shape on read so
projections never branch on version. `upcast-data` merges new `:data` fields
keeping stream/seq/type/at. 9 tests incl. mixed old/new + durable.
- **Ext: atomic batch append (162/162).** `batch.sx``persist/append-batch`
commits `(type at data)` specs as one contiguous block (real cons-list, in
order); `persist/append-batch-expect` checks the stream is still at expected
before writing any event, so the batch is all-or-nothing under a concurrent
writer. 10 tests incl. conflict-writes-nothing + durable.
- **Ext: read-side query helpers (152/152).** `query.sx``read-between` (seq
range), `read-since`/`read-window` (by `:at`), `read-by-type`, `read-where`,
`count-where`. Pure scans over `persist/read`; for ad-hoc relational queries
consumers still project into a kv read model. 9 tests incl. durable.
- **Ext: stream catalog (143/143).** New backend op `:streams` (keys of the seq
high-water-mark dict, threaded through mem-backend + durable serve/io-backend)
so fully-compacted streams still enumerate. `catalog.sx`:
`persist/streams`/`stream-count`/`stream-exists?`/`total-events`. 10 tests
incl. durable + restart.
- **Ext: kv compare-and-swap (133/133).** `persist/kv-cas` sets a key only if
its current value equals expected, else returns `{:conflict :expected
:actual}`; `persist/kv-put-new` is create-only. The kv analogue of log
`append-expect` — atomic current-state for sessions/acl/stock. 11 tests incl.
racer + retry + durable backend.
- **Ext: materialized views (122/122).** `view.sx``persist/view` bundles
stream + step + seed + snapshot name; `view-attach` subscribes it to a hub so
every publish refreshes the snapshot incrementally; `view-peek` is then an
O(1) current read (no fold), `view-value` always folds the tail so it's never
stale. 11 tests incl. on durable backend + a sum-over-data view.
- **Phase 4c+4d (111/111) — Phase 4 complete, roadmap done.** `recovery.sx` — a
6-test crash/restart integration: an order ledger (event log + subscription
kv read model + snapshot + compaction + invoice blob ref) over the durable
backend, where "crash" drops every in-process object and "restart" rebuilds
over the same disk + content store. Log, read model, snapshot, compacted
replay, and blob ref all survive; seq continues; two restarts converge
(determinism). Migration notes (mem → durable under a live subsystem) added
inline above.
- **Phase 4b (105/105).** `blob.sx` — large objects stay out of persist. A blob
ref is `{:cid :size :mime}`; the blob store is a SEPARATE injected dependency
(`persist/blob-io` over an injectable transport, perform in prod / mock
content store in tests). `persist/blob-store` puts bytes and returns ONLY the
ref; `persist/blob-fetch` retrieves bytes via the ref. Mock store is
content-addressed (same bytes dedupe). 14 tests assert the invariant: a ref in
the log/kv carries the CID, never the bytes (`has-key? :bytes` is false).
- **Phase 4a (91/91).** `durable.sx` — a backend whose every op crosses the
kernel IO boundary via `(perform {:op "persist/..." :args (...)})`. The
transport is injectable: `persist/durable-backend` uses the kernel's
`perform` (suspends; host resumes); `persist/mock-durable` uses
`persist/serve` over an in-memory disk. `persist/serve` is the reference host
+ the mock-IO harness. Because the request shapes are identical, the ENTIRE
facet stack (log/kv/project/snapshot/compaction) runs unchanged on
mock-durable — verified. Crash/restart (drop backend, keep disk) recovers log
+ kv + snapshot by replay; seq counter continues. 15 tests. See Blockers for
why end-to-end perform suspension isn't exercised under sx_server.exe.
- **Phase 3b (76/76) — Phase 3 complete.** Backend refactor: `last-seq` is now
a monotonic per-stream high-water mark (backend `seqs` dict), not physical
length, so a compacted log keeps assigning climbing seqs. Added backend
`:truncate-through` + `persist/truncate`. `compaction.sx``persist/compact`
checkpoints then drops events with seq <= snapshot seq; `should-compact?`/
`maybe-compact` give an explicit "compact every N tail events" policy. 11
tests: post-compaction replay value == uncompacted full replay (determinism),
seq continuity after truncation, idempotence. `persist/count` = physical
stored count (shrinks on compaction) vs `persist/last-seq` = logical.
- **Phase 3a (65/65).** `snapshot.sx` — a snapshot is a projection state
`{:value :seq}` stored in the kv facet under `snapshot/<name>`.
`persist/checkpoint` replays + saves; `persist/replay` = snapshot + tail.
11 tests assert the headline both ways: snapshot+tail == full replay (value
and whole state), plus replay determinism.
- **Phase 2c (54/54) — Phase 2 complete.** `concurrency.sx` — optimistic
concurrency: `persist/append-expect b stream expected ...` refuses the append
if the stream advanced past `expected`, returning a conflict VALUE
`{:conflict true :expected :actual}` (never a crash, never a silent
overwrite). `persist/conflict?` + accessors; caller re-reads actual and
retries. 8 tests incl. two-writer race + retry.
- **Phase 2b (46/46).** `subscribe.sx``persist/hub` wraps a backend with
per-stream callbacks. `persist/publish` appends then fires subscribers
`(backend stream event)`; direct `persist/append` bypasses them by design
(bulk load/replay). Canonical use: callback re-runs `project-resume` or bumps
a kv counter so read models update on write. 9 tests.
- **Phase 2a (37/37).** `project.sx` — projection state `{:value :seq}`;
`persist/project` folds whole stream from seed, `persist/project-resume`
folds only the tail (seq > prior seq) so read models update incrementally.
step is pure `(value event) -> value`. 9 tests incl. resume==full-from-zero.
- **Phase 1 complete (28/28).** `event.sx` (event record + accessors),
`backend.sx` (injectable protocol + in-memory log/kv impl, closure state via
set!), `log.sx` (append/read/read-from, sequential per-stream seq, stream
isolation), `kv.sx` (get/put/delete/has?/keys/get-or/update), `api.sx`
(`persist/open` — mem default, backend injectable). conformance.sh + three
suites (event/log/kv). Gotcha logged in Blockers: `map` returns an
array-backed list not `equal?` to a `(list ...)` literal — assertions build
compared lists with list/nth.
## Blockers
### OPEN — host durable-storage adapter (the only gap to real durability)
**Owner:** a `hosts/` loop (NOT this one — `lib/persist/**` is the scope fence,
and `sx_build` is forbidden here). **Without it, durable persistence silently
drops all writes.**
**Symptom / minimal repro.** `persist/durable-backend` performs
`{:op "persist/..." :args (...)}` for every storage op. Under `sx_server.exe`
the kernel's default IO resolver answers unknown ops with `nil` — so the durable
backend does not error, it *silently no-ops*:
```
; load event/backend/log/durable, then:
(let ((b (persist/durable-backend)))
(begin (persist/append b "s" "x" 0 {})
(persist/append b "s" "x" 0 {})
(list (persist/event-seq (persist/append b "s" "x" 0 {}))
(persist/count b "s")
(persist/read b "s"))))
; => (1 0 nil) ; every append gets seq 1, nothing stored, reads empty — DATA LOSS
```
The in-memory backend (`persist/open`) is correct and complete; this gap is
*only* the production transport.
**What to build.** A host servicer that answers the `persist/*` IO ops against a
real store (sqlite/files/pg). It is the production twin of `persist/serve`
(`lib/persist/durable.sx`) — same op names, same request/response shapes — so
mirror that function and back it with durable storage instead of a mem-backend.
**Op contract** (request `{:op :args}` → response). `args` is a positional list;
events are dicts `{:stream :seq :type :at :data}`:
| op | args | returns | semantics |
|----|------|---------|-----------|
| `persist/append` | `(stream event)` | (ignored) | store `event` in `stream` |
| `persist/read` | `(stream)` | event list (oldest-first) | currently-stored events |
| `persist/last-seq` | `(stream)` | number | **monotonic high-water mark** (see below) |
| `persist/streams` | `()` | stream-name list | every stream ever appended to |
| `persist/truncate` | `(stream n)` | (ignored) | drop events with `seq <= n` |
| `persist/kv-get` | `(key)` | value or nil | |
| `persist/kv-put` | `(key val)` | (ignored) | upsert |
| `persist/kv-delete`| `(key)` | (ignored) | remove key |
| `persist/kv-has?` | `(key)` | boolean | |
| `persist/kv-keys` | `()` | key list | |
**Hard invariants** (the facets above rely on these; mem-backend + `persist/serve`
are the reference):
1. **`last-seq` is a per-stream monotonic counter, NOT the row count.** It must
keep climbing after `truncate`, so a compacted stream never reassigns a seq.
Store the counter separately from the rows.
2. `append` is the only seq-assigner upstream (`log.sx` does `last-seq + 1`); the
host must not renumber.
3. `read` returns events in append order with `:seq` intact (post-truncate it
returns only the surviving tail).
4. `streams` is the set of streams that ever had an append (survives full
compaction) — keep it keyed off the seq counters, like mem-backend's `seqs`.
5. Values round-trip structurally: dicts/lists/numbers/strings/nil/booleans in =
same out (event `:data`, kv values, blob refs).
**Blobs** are a *separate* adapter with the same pattern: ops `blob/put`
`(bytes mime)` → cid, `blob/get` `(cid)` → bytes, `blob/has?` `(cid)` → bool
(see `lib/persist/blob.sx` / `persist/blob-serve`). Back it with the
content-addressed store (artdag/IPFS); persist only ever stores the returned ref.
**Where to register.** `hosts/ocaml/bin/sx_server.ml`:
- the in-process resolver `Sx_types._cek_io_resolver` (~line 3864) — add a
`"persist/..."` match arm dispatching to the new storage module (used by
SSR/`eval_with_io`); and/or
- the bridge path in `cek_run_with_io` (~line 528576), which currently forwards
unknown ops via `io_request op args` to the external bridge — a Python-bridge
handler is the alternative home if storage lives Python-side.
Pick one home; the op names are the contract, not the location.
**Acceptance test.** Swap the transport: point a `persist/io-backend` at the new
host servicer (instead of `persist/serve` over a mem disk) and run the existing
`durable` + `recovery` suites — they must stay green, and state must survive an
actual process restart (kill the server, restart, replay → recovered). That is
exactly what `lib/persist/tests/durable.sx` and `recovery.sx` already assert
against the mock; the host adapter just makes the disk real.
---
- **Phase 4 perform-suspension not exercised end-to-end under sx_server.exe (by
design, not a bug).** The CEK suspension primitives (`cek-step-loop`,
`cek-resume`, `cek-suspended?`, `cek-io-request`) and a settable SX-level IO
hook are only bound by the `run_tests` OCaml binary (out of scope: hosts/, and
sx_build is forbidden). Under `sx_server.exe`, an unhandled `perform` resolves
through the OCaml io-request/io-response stdin bridge (production path) — not
callable from the pure-eval conformance harness. Resolution: the durable
backend's transport is injectable, so the production path is one line
`(perform req)` (kernel-handled) and ALL durable logic is tested through the
mock transport (`persist/serve` over an in-memory disk). The single untested
line is the kernel primitive itself. No host primitive needed; nothing to fix.
- **Not a blocker, a testing convention:** `map` returns an array-backed list
that is NOT `equal?` to a `(list ...)` cons-literal (two `map` results do
compare equal to each other). When asserting list-shaped results against a
`(list ...)` literal, build the compared value with `list`/`nth`/`cons`, not
`map`. `into`/list-coercion needs the IO bridge and is unusable in the
pure-eval harness.