H7: adjacency streams — per-(node,kind) edge reads, no more full kv scans (TDD)

Failing tests first (2 red: relate! wrote no adjacency streams). Every edge write now maintains
per-pair event streams (rel:src|kind ← {:dst}, rin:dst|kind ← {:src}); host/blog-out/-in/--out-raw
(+ new --in-raw) fold ONLY the pair's stream — O(edges of that node under that kind) instead of
O(all kv keys) per read. Append-only ⇒ no read-modify-write race (duplicate :adds fold to a set).
The edge:* kv rows remain (whole-graph consumers: subtype-closure, relations admin block) and feed
host/blog-reindex-edges! — the idempotent boot migration serve.sh now runs, so pre-H7 live stores
read correctly. Collapsed host/blog--add-edge-kv! into add-edge! (the type-algebra conj/disj edges
were bypassing the streams — caught by the existing algebra tests going red).

blog suite 256/256 (+6); FULL conformance 658/658.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-07-03 10:53:32 +00:00
parent f8b96b3d81
commit 99401ae21e
3 changed files with 78 additions and 24 deletions

View File

@@ -550,12 +550,48 @@
;; separate: its own "type:id" nodes in lib/relations, untouched by this.) ;; separate: its own "type:id" nodes in lib/relations, untouched by this.)
(define host/blog--edge-key (fn (src kind dst) (str "edge:" src "|" kind "|" dst))) (define host/blog--edge-key (fn (src kind dst) (str "edge:" src "|" kind "|" dst)))
;; H7: per-(node,kind) ADJACENCY STREAMS — the hot reads (out/in/out-raw) fold ONLY the pair's
;; event stream (append-only: no read-modify-write race; a duplicate :add folds to a set, so
;; concurrent writers are benign). The edge:* kv rows are still written — they feed the two
;; whole-graph consumers (subtype-closure, the relations admin block) and legacy-store migration
;; (host/blog-reindex-edges! at boot).
(define host/blog--rel-stream (fn (src kind) (str "rel:" src "|" kind)))
(define host/blog--rin-stream (fn (dst kind) (str "rin:" dst "|" kind)))
(define host/blog--adj-fold
(fn (stream field)
(reduce (fn (acc e)
(let ((v (get (persist/event-data e) field)))
(cond ((= (persist/event-type e) :add)
(if (contains? acc v) acc (concat acc (list v))))
((= (persist/event-type e) :del)
(filter (fn (x) (not (= x v))) acc))
(else acc))))
(list) (persist/read host/blog-store stream))))
(define host/blog--adj-out (fn (src kind) (host/blog--adj-fold (host/blog--rel-stream src kind) "dst")))
(define host/blog--adj-in (fn (dst kind) (host/blog--adj-fold (host/blog--rin-stream dst kind) "src")))
(define host/blog--add-edge! (define host/blog--add-edge!
(fn (src dst kind) (fn (src dst kind)
(persist/backend-kv-put host/blog-store (host/blog--edge-key src kind dst) 1))) (begin
(persist/backend-kv-put host/blog-store (host/blog--edge-key src kind dst) 1)
(when (not (contains? (host/blog--adj-out src kind) dst))
(persist/append host/blog-store (host/blog--rel-stream src kind) :add 0 {"dst" dst}))
(when (not (contains? (host/blog--adj-in dst kind) src))
(persist/append host/blog-store (host/blog--rin-stream dst kind) :add 0 {"src" src})))))
(define host/blog--del-edge! (define host/blog--del-edge!
(fn (src dst kind) (fn (src dst kind)
(persist/backend-kv-delete host/blog-store (host/blog--edge-key src kind dst)))) (begin
(persist/backend-kv-delete host/blog-store (host/blog--edge-key src kind dst))
(when (contains? (host/blog--adj-out src kind) dst)
(persist/append host/blog-store (host/blog--rel-stream src kind) :del 0 {"dst" dst}))
(when (contains? (host/blog--adj-in dst kind) src)
(persist/append host/blog-store (host/blog--rin-stream dst kind) :del 0 {"src" src})))))
;; MIGRATION: rebuild the adjacency streams from legacy edge:* rows (idempotent — add-edge! skips
;; already-indexed pairs). serve.sh calls this once at boot so pre-H7 stores read correctly.
(define host/blog-reindex-edges!
(fn ()
(for-each (fn (e) (host/blog--add-edge! (get e :src) (get e :dst) (get e :kind)))
(host/blog--all-edges))))
;; A symmetric kind writes both directions, so children alone read it from either ;; A symmetric kind writes both directions, so children alone read it from either
;; side; a directed kind writes one edge (the inverse is host/blog-in). ;; side; a directed kind writes one edge (the inverse is host/blog-in).
@@ -612,29 +648,20 @@
(map host/blog--parse-edge-key (persist/backend-kv-keys host/blog-store))))) (map host/blog--parse-edge-key (persist/backend-kv-keys host/blog-store)))))
;; outgoing targets / incoming sources of `slug` under `kind`, as existing slugs. ;; outgoing targets / incoming sources of `slug` under `kind`, as existing slugs.
;; H7: out/in read the pair's ADJACENCY STREAM — O(edges of this node under this kind), not a
;; full kv scan. out/in filter to existing local slugs; the -raw variants keep cross-domain refs.
(define host/blog-out (define host/blog-out
(fn (slug kind) (fn (slug kind)
(let ((existing (host/blog-slugs))) (let ((existing (host/blog-slugs)))
(filter (fn (s) (contains? existing s)) (filter (fn (s) (contains? existing s)) (host/blog--adj-out slug kind)))))
(reduce (fn (acc e)
(if (and (= (get e :src) slug) (= (get e :kind) kind))
(concat acc (list (get e :dst))) acc))
(list) (host/blog--all-edges))))))
;; unfiltered outgoing edges — includes CROSS-DOMAIN targets (a post/order on another peer, which ;; unfiltered outgoing edges — includes CROSS-DOMAIN targets (a post/order on another peer, which
;; isn't a local slug so host/blog-out would drop it). Used for federated links (allocated, sold). ;; isn't a local slug so host/blog-out would drop it). Used for federated links (allocated, sold).
(define host/blog--out-raw (define host/blog--out-raw (fn (slug kind) (host/blog--adj-out slug kind)))
(fn (slug kind) (define host/blog--in-raw (fn (slug kind) (host/blog--adj-in slug kind)))
(reduce (fn (acc e)
(if (and (= (get e :src) slug) (= (get e :kind) kind)) (concat acc (list (get e :dst))) acc))
(list) (host/blog--all-edges))))
(define host/blog-in (define host/blog-in
(fn (slug kind) (fn (slug kind)
(let ((existing (host/blog-slugs))) (let ((existing (host/blog-slugs)))
(filter (fn (s) (contains? existing s)) (filter (fn (s) (contains? existing s)) (host/blog--adj-in slug kind)))))
(reduce (fn (acc e)
(if (and (= (get e :dst) slug) (= (get e :kind) kind))
(concat acc (list (get e :src))) acc))
(list) (host/blog--all-edges))))))
;; back-compat: "related posts" is just the symmetric "related" kind. ;; back-compat: "related posts" is just the symmetric "related" kind.
(define host/blog-related (fn (slug) (host/blog-out slug "related"))) (define host/blog-related (fn (slug) (host/blog-out slug "related")))
@@ -714,25 +741,22 @@
;; operand edges live in the KV ONLY (read back via host/blog-out), NOT in lib/relations: ;; operand edges live in the KV ONLY (read back via host/blog-out), NOT in lib/relations:
;; conj/disj are structural, and feeding extra kinds into the Datalog graph blows up its ;; conj/disj are structural, and feeding extra kinds into the Datalog graph blows up its
;; per-query re-saturation. host/blog-load-edges! skips them on replay for the same reason. ;; per-query re-saturation. host/blog-load-edges! skips them on replay for the same reason.
(define host/blog--add-edge-kv!
(fn (src dst kind)
(persist/backend-kv-put host/blog-store (host/blog--edge-key src kind dst) 1)))
(define host/blog-make-and! (define host/blog-make-and!
(fn (t a b) (fn (t a b)
(begin (begin
(host/blog-seed! t t (host/blog-seed! t t
(str "(article (h1 \"" t "\") (p \"An intersection type (" a " ∧ " b ") — its instances are exactly those that are instances of BOTH.\"))") (str "(article (h1 \"" t "\") (p \"An intersection type (" a " ∧ " b ") — its instances are exactly those that are instances of BOTH.\"))")
"published") "published")
(host/blog--add-edge-kv! t a "conj") (host/blog--add-edge! t a "conj")
(host/blog--add-edge-kv! t b "conj")))) (host/blog--add-edge! t b "conj"))))
(define host/blog-make-or! (define host/blog-make-or!
(fn (t a b) (fn (t a b)
(begin (begin
(host/blog-seed! t t (host/blog-seed! t t
(str "(article (h1 \"" t "\") (p \"A union type (" a " " b ") — its instances are those that are instances of EITHER.\"))") (str "(article (h1 \"" t "\") (p \"A union type (" a " " b ") — its instances are those that are instances of EITHER.\"))")
"published") "published")
(host/blog--add-edge-kv! t a "disj") (host/blog--add-edge! t a "disj")
(host/blog--add-edge-kv! t b "disj")))) (host/blog--add-edge! t b "disj"))))
;; the EXTENT of a type expression: operands' extents combined by set ops (recursive). ;; the EXTENT of a type expression: operands' extents combined by set ops (recursive).
;; A plain type (no operands) falls through to its instances. ;; A plain type (no operands) falls through to its instances.
(define host/blog-instances-of-expr (define host/blog-instances-of-expr

View File

@@ -304,6 +304,10 @@ EPOCH=1
echo "(eval \"(host/blog--set-actor! \\\"${SX_ACTOR:-site}\\\" \\\"${SX_SELF_URL:-}\\\")\")" echo "(eval \"(host/blog--set-actor! \\\"${SX_ACTOR:-site}\\\" \\\"${SX_SELF_URL:-}\\\")\")"
EPOCH=$((EPOCH+1)) EPOCH=$((EPOCH+1))
echo "(epoch $EPOCH)" echo "(epoch $EPOCH)"
# H7: migrate legacy edge:* rows into the per-pair adjacency streams (idempotent).
echo "(eval \"(host/blog-reindex-edges!)\")"
EPOCH=$((EPOCH+1))
echo "(epoch $EPOCH)"
echo "(eval \"(host/blog-load-followers!)\")" echo "(eval \"(host/blog-load-followers!)\")"
EPOCH=$((EPOCH+1)) EPOCH=$((EPOCH+1))
echo "(epoch $EPOCH)" echo "(epoch $EPOCH)"

View File

@@ -1534,6 +1534,32 @@
(len (filter (fn (e) (= (get e "verb") "h6-ping")) host/blog--flow-log))) (len (filter (fn (e) (= (get e "verb") "h6-ping")) host/blog--flow-log)))
2) 2)
;; ── HARDENING H7: adjacency STREAMS — per-(node,kind) reads, not full kv scans ────────────
;; relate!/unrelate! maintain per-pair event streams (rel:src|kind ← {:dst}, rin:dst|kind ←
;; {:src}); out/in/out-raw fold ONLY the pair's stream (O(edges of that node), not O(all keys)).
;; Legacy stores migrate via host/blog-reindex-edges! at boot.
(host/blog-use-store! (persist/open))
(host/blog-put! "h7a" "a" "(article (h1 \"a\"))" "published")
(host/blog-put! "h7b" "b" "(article (h1 \"b\"))" "published")
(host/blog-relate! "h7a" "h7b" "h7k")
(host-bl-test "H7: relate! writes the forward adjacency stream rel:src|kind"
(>= (len (persist/read host/blog-store "rel:h7a|h7k")) 1) true)
(host-bl-test "H7: relate! writes the reverse adjacency stream rin:dst|kind"
(>= (len (persist/read host/blog-store "rin:h7b|h7k")) 1) true)
(host-bl-test "H7: out-raw reads through (regression)"
(host/blog--out-raw "h7a" "h7k") (list "h7b"))
(host-bl-test "H7: in reads through (regression)"
(host/blog-in "h7b" "h7k") (list "h7a"))
(host-bl-test "H7: unrelate! removes from both directions (regression)"
(begin (host/blog-unrelate! "h7a" "h7b" "h7k")
(list (host/blog--out-raw "h7a" "h7k") (host/blog-in "h7b" "h7k")))
(list (list) (list)))
(host-bl-test "H7: re-relate after remove works (add/del/add folds correctly)"
(begin (host/blog-relate! "h7a" "h7b" "h7k")
(host/blog--out-raw "h7a" "h7k"))
(list "h7b"))
(define (define
host-bl-tests-run! host-bl-tests-run!
(fn () (fn ()