;; content-on-sx — durable collaborative replication: CRDT ops on persist. ;; ;; Each replica appends its CRDT ops to its own persist stream ;; (crdt::). Any node reconstructs the converged document by ;; replaying every replica's log into a CvRDT state and merging them. Because ;; the merge is a join and crdt-apply is order/duplicate-insensitive, the ;; converged result is identical regardless of replica order or re-delivery — ;; the durable log + CRDT give offline-capable, eventually-consistent editing. ;; ;; Requires (loaded by harness): crdt.sx (+ deps) and persist ;; (event/backend/log/kv/api). Backend `b` injected via (persist/open). (define crdt/-stream (fn (doc-id replica) (str "crdt:" doc-id ":" replica))) ;; ── commit ops to a replica's durable log ── (define crdt/commit! (fn (b doc-id replica op at) (persist/append b (crdt/-stream doc-id replica) (get op :op) at op))) (define crdt/commit-all! (fn (b doc-id replica ops at) (if (= (len ops) 0) nil (begin (crdt/commit! b doc-id replica (first ops) at) (crdt/commit-all! b doc-id replica (rest ops) at))))) ;; ── read a replica's log ── (define crdt/log (fn (b doc-id replica) (persist/read b (crdt/-stream doc-id replica)))) (define crdt/replica-ops (fn (b doc-id replica) (map (fn (ev) (persist/event-data ev)) (crdt/log b doc-id replica)))) (define crdt/replica-version (fn (b doc-id replica) (persist/last-seq b (crdt/-stream doc-id replica)))) ;; ── replay one replica's log into a CvRDT state ── (define crdt/replay (fn (b doc-id replica) (crdt-apply-all (crdt-empty) (crdt/replica-ops b doc-id replica)))) ;; ── converge: merge every replica's replayed state ── (define crdt/converge (fn (b doc-id replicas) (crdt-merge-all (map (fn (r) (crdt/replay b doc-id r)) replicas)))) ;; ── converged, materialised document ── (define crdt/document (fn (b doc-id replicas) (crdt-materialize doc-id (crdt/converge b doc-id replicas)))) (define crdt/order (fn (b doc-id replicas) (crdt-order (crdt/converge b doc-id replicas))))