From 8ba3584556574f0809e8aa8fb63ed83cfc2999a4 Mon Sep 17 00:00:00 2001 From: giles Date: Sun, 7 Jun 2026 02:37:53 +0000 Subject: [PATCH] =?UTF-8?q?fed-sx-m2:=20Step=208c=20=E2=80=94=20delivery-s?= =?UTF-8?q?tate=20projection=20+=2014=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New next/kernel/delivery_state.erl folds delivery events into a per-peer worker-shaped snapshot so the outbound queue survives kernel restart. Event proplist shapes: [{type, enqueued}, {peer, _}, {activity, _}] [{type, delivered}, {peer, _}, {cid, _}] [{type, failed}, {peer, _}, {cid, _}, {now, _}] [{type, dead_lettered}, {peer, _}, {cid, _}] Projection state shape: [{PeerId, [{peer, _}, {pending, _}, {attempts, _}, {next_retry, _}, {dead_letter, _}]}, ...] Mirrors delivery_worker:new/1 (minus :dispatch_fn — that's the live worker's concern) so a fresh gen_server can be hydrated from the projection on restart. Public API: new/0 fold/2, fold_fn/0 peer_state/2, peers/1 pending/2, attempts/2, next_retry/2, dead_letter/2 The failed branch calls delivery_worker:backoff_for/1 directly, so the projection and the live worker compute identical retry slots and dead-letter thresholds. 6th failure -> dead-letter, matching the worker. 14/14 in next/tests/delivery_state.sh covering: - new/0 -> [] - enqueued appends to pending (FIFO) - two peers maintain independent queues - delivered clears matching pending entry - failed bumps :attempts and sets :next_retry - 6th failed -> dead-lettered (activity out of pending) - explicit dead_lettered event moves activity to dead_letter - peers/1 lists touched peers - peer_state {ok, _} | not_found - fold_fn/0 is fun/2 for projection:start_link - unknown event type passes through - delivered after failed clears retry state delivery_worker.sh 17/17 unchanged, delivery_retry.sh 11/11 unchanged. Conformance preserved at 761/761. The restart hydration helper (delivery_worker:state_from_proj/2 or similar) lands once 8b-timer can wire the live retry loop (Blockers #3 — erlang:send_after substrate gap still open). --- next/kernel/delivery_state.erl | 209 +++++++++++++++++++++++++++++++++ next/tests/delivery_state.sh | 139 ++++++++++++++++++++++ plans/fed-sx-milestone-2.md | 36 +++++- 3 files changed, 380 insertions(+), 4 deletions(-) create mode 100644 next/kernel/delivery_state.erl create mode 100755 next/tests/delivery_state.sh diff --git a/next/kernel/delivery_state.erl b/next/kernel/delivery_state.erl new file mode 100644 index 00000000..29cec2ad --- /dev/null +++ b/next/kernel/delivery_state.erl @@ -0,0 +1,209 @@ +-module(delivery_state). +-export([new/0, fold/2, fold_fn/0, + peer_state/2, peers/1, + pending/2, attempts/2, next_retry/2, dead_letter/2]). + +%% Delivery-state projection. Folds delivery events (enqueue / +%% delivered / failed / dead_lettered) into a per-peer worker-shaped +%% snapshot so the outbound queue survives kernel restart. Per design +%% §13.4 the worker state on restart is loaded from this projection +%% rather than reconstructed by re-driving the outbox log. +%% +%% Event proplist shape: +%% [{type, enqueued}, {peer, _}, {activity, _}] +%% [{type, delivered}, {peer, _}, {cid, _}] +%% [{type, failed}, {peer, _}, {cid, _}, {now, _}] +%% [{type, dead_lettered}, {peer, _}, {cid, _}] +%% +%% Projection state shape: +%% [{PeerId, WorkerProplist}, ...] +%% +%% WorkerProplist mirrors `delivery_worker:new/1`'s output so a fresh +%% gen_server can be hydrated with `delivery_worker:state_from_proj` +%% (lands when 8b-timer wires up). For Step 8c the projection only +%% tracks data — Step 8d-restart will wire the hydration helper. + +new() -> []. + +fold_fn() -> + fun (Event, State) -> fold(Event, State) end. + +fold(Event, State) -> + case envelope:get_field(type, Event) of + {ok, enqueued} -> fold_enqueued(Event, State); + {ok, delivered} -> fold_delivered(Event, State); + {ok, failed} -> fold_failed(Event, State); + {ok, dead_lettered} -> fold_dead_lettered(Event, State); + _ -> State + end. + +fold_enqueued(Event, State) -> + case {envelope:get_field(peer, Event), + envelope:get_field(activity, Event)} of + {{ok, Peer}, {ok, Act}} -> + Worker = ensure_peer(Peer, State), + Pending = field(pending, Worker), + Worker1 = set_field(pending, Pending ++ [Act], Worker), + set_peer(Peer, Worker1, State); + _ -> State + end. + +fold_delivered(Event, State) -> + case {envelope:get_field(peer, Event), + envelope:get_field(cid, Event)} of + {{ok, Peer}, {ok, Cid}} -> + case find_keyed(Peer, State) of + {ok, Worker} -> + Worker1 = drop_pending_by_cid(Cid, Worker), + Worker2 = clear_retry_for(Cid, Worker1), + set_peer(Peer, Worker2, State); + _ -> State + end; + _ -> State + end. + +fold_failed(Event, State) -> + case {envelope:get_field(peer, Event), + envelope:get_field(cid, Event), + envelope:get_field(now, Event)} of + {{ok, Peer}, {ok, Cid}, {ok, Now}} -> + case find_keyed(Peer, State) of + {ok, Worker} -> + Attempts = field(attempts, Worker), + Current = case find_keyed(Cid, Attempts) of + {ok, N} -> N; + _ -> 0 + end, + New = Current + 1, + Attempts1 = set_keyed(Cid, New, Attempts), + Worker1 = set_field(attempts, Attempts1, Worker), + Worker2 = case delivery_worker:backoff_for(New) of + dead_letter -> + dead_letter_pending(Cid, Worker1); + Seconds -> + NR = field(next_retry, Worker1), + NextAt = Now + Seconds, + set_field(next_retry, set_keyed(Cid, NextAt, NR), Worker1) + end, + set_peer(Peer, Worker2, State); + _ -> State + end; + _ -> State + end. + +fold_dead_lettered(Event, State) -> + case {envelope:get_field(peer, Event), + envelope:get_field(cid, Event)} of + {{ok, Peer}, {ok, Cid}} -> + case find_keyed(Peer, State) of + {ok, Worker} -> + set_peer(Peer, dead_letter_pending(Cid, Worker), State); + _ -> State + end; + _ -> State + end. + +%% ── Accessors ───────────────────────────────────────────────── + +peer_state(Peer, State) -> + case find_keyed(Peer, State) of + {ok, Worker} -> {ok, Worker}; + _ -> not_found + end. + +peers(State) -> [P || {P, _} <- State]. + +pending(Peer, State) -> + worker_field(Peer, pending, State, []). + +attempts(Peer, State) -> + worker_field(Peer, attempts, State, []). + +next_retry(Peer, State) -> + worker_field(Peer, next_retry, State, []). + +dead_letter(Peer, State) -> + worker_field(Peer, dead_letter, State, []). + +%% ── Internal ────────────────────────────────────────────────── + +worker_field(Peer, Field, State, Default) -> + case find_keyed(Peer, State) of + {ok, Worker} -> + case find_keyed(Field, Worker) of + {ok, V} -> V; + _ -> Default + end; + _ -> Default + end. + +ensure_peer(Peer, State) -> + case find_keyed(Peer, State) of + {ok, Worker} -> Worker; + _ -> empty_worker(Peer) + end. + +empty_worker(Peer) -> + [{peer, Peer}, + {pending, []}, + {attempts, []}, + {next_retry, []}, + {dead_letter, []}]. + +set_peer(Peer, Worker, State) -> + set_keyed(Peer, Worker, State). + +drop_pending_by_cid(Cid, Worker) -> + Pending = field(pending, Worker), + Kept = [A || A <- Pending, activity_cid(A) =/= Cid], + set_field(pending, Kept, Worker). + +clear_retry_for(Cid, Worker) -> + A1 = del_keyed(Cid, field(attempts, Worker)), + NR1 = del_keyed(Cid, field(next_retry, Worker)), + set_field(attempts, A1, set_field(next_retry, NR1, Worker)). + +dead_letter_pending(Cid, Worker) -> + Pending = field(pending, Worker), + {Match, Rest} = split_by_cid(Cid, Pending), + DL = field(dead_letter, Worker), + Worker1 = set_field(pending, Rest, Worker), + Worker2 = case Match of + none -> Worker1; + Act -> set_field(dead_letter, DL ++ [Act], Worker1) + end, + clear_retry_for(Cid, Worker2). + +split_by_cid(Cid, List) -> split_by_cid(Cid, List, []). +split_by_cid(_, [], Acc) -> {none, lists:reverse(Acc)}; +split_by_cid(Cid, [A | Rest], Acc) -> + case activity_cid(A) of + Cid -> {A, lists:reverse(Acc) ++ Rest}; + _ -> split_by_cid(Cid, Rest, [A | Acc]) + end. + +activity_cid(Activity) -> + case envelope:get_field(id, Activity) of + {ok, Cid} -> Cid; + _ -> nil + end. + +field(K, [{K, V} | _]) -> V; +field(K, [_ | Rest]) -> field(K, Rest); +field(_, []) -> undefined. + +set_field(K, V, []) -> [{K, V}]; +set_field(K, V, [{K, _} | Rest]) -> [{K, V} | Rest]; +set_field(K, V, [P | Rest]) -> [P | set_field(K, V, Rest)]. + +find_keyed(_, []) -> {error, not_found}; +find_keyed(K, [{K, V} | _]) -> {ok, V}; +find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest). + +set_keyed(K, V, []) -> [{K, V}]; +set_keyed(K, V, [{K, _} | Rest]) -> [{K, V} | Rest]; +set_keyed(K, V, [P | Rest]) -> [P | set_keyed(K, V, Rest)]. + +del_keyed(_, []) -> []; +del_keyed(K, [{K, _} | Rest]) -> Rest; +del_keyed(K, [P | Rest]) -> [P | del_keyed(K, Rest)]. diff --git a/next/tests/delivery_state.sh b/next/tests/delivery_state.sh new file mode 100755 index 00000000..c2f94ae8 --- /dev/null +++ b/next/tests/delivery_state.sh @@ -0,0 +1,139 @@ +#!/usr/bin/env bash +# next/tests/delivery_state.sh — m2 Step 8c test. +# +# Delivery-state projection: folds enqueue / delivered / failed / +# dead_lettered events into a per-peer worker-shaped snapshot so +# the outbound queue survives kernel restart. + +set -uo pipefail +cd "$(git rev-parse --show-toplevel)" + +SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}" +if [ ! -x "$SX_SERVER" ]; then + SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe" +fi +if [ ! -x "$SX_SERVER" ]; then + echo "ERROR: sx_server.exe not found." >&2 + exit 1 +fi + +VERBOSE="${1:-}" +PASS=0; FAIL=0; ERRORS="" +TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT + +SETUP='Act1 = [{id, <<1>>}, {type, note}, {actor, alice}], Act2 = [{id, <<2>>}, {type, note}, {actor, alice}], E_Enq1 = [{type, enqueued}, {peer, bob}, {activity, Act1}], E_Enq2 = [{type, enqueued}, {peer, bob}, {activity, Act2}], E_Enq2Carol = [{type, enqueued}, {peer, carol}, {activity, Act2}], E_Del1 = [{type, delivered}, {peer, bob}, {cid, <<1>>}], E_Fail1 = [{type, failed}, {peer, bob}, {cid, <<1>>}, {now, 1000}],' + +cat > "$TMPFILE" < [] +(epoch 10) +(eval "(get (erlang-eval-ast \"delivery_state:new() =:= []\") :name)") + +;; enqueued event creates a peer entry and appends to pending +(epoch 11) +(eval "(get (erlang-eval-ast \"${SETUP} S = delivery_state:fold(E_Enq1, delivery_state:new()), delivery_state:pending(bob, S) =:= [Act1]\") :name)") + +;; Two enqueues to same peer -> FIFO order +(epoch 12) +(eval "(get (erlang-eval-ast \"${SETUP} S = delivery_state:fold(E_Enq2, delivery_state:fold(E_Enq1, delivery_state:new())), delivery_state:pending(bob, S) =:= [Act1, Act2]\") :name)") + +;; Enqueues to different peers -> independent queues +(epoch 13) +(eval "(get (erlang-eval-ast \"${SETUP} S = delivery_state:fold(E_Enq2Carol, delivery_state:fold(E_Enq1, delivery_state:new())), {delivery_state:pending(bob, S), delivery_state:pending(carol, S)} =:= {[Act1], [Act2]}\") :name)") + +;; delivered event clears the matching pending entry +(epoch 14) +(eval "(get (erlang-eval-ast \"${SETUP} S = delivery_state:fold(E_Del1, delivery_state:fold(E_Enq1, delivery_state:new())), delivery_state:pending(bob, S) =:= []\") :name)") + +;; failed event bumps attempts and sets next_retry +(epoch 15) +(eval "(get (erlang-eval-ast \"${SETUP} S = delivery_state:fold(E_Fail1, delivery_state:fold(E_Enq1, delivery_state:new())), {delivery_state:attempts(bob, S), delivery_state:next_retry(bob, S)} =:= {[{<<1>>, 1}], [{<<1>>, 1030}]}\") :name)") + +;; Five failures then 6th fails -> dead_lettered +(epoch 16) +(eval "(get (erlang-eval-ast \"${SETUP} F = fun(S) -> delivery_state:fold(E_Fail1, S) end, S0 = delivery_state:fold(E_Enq1, delivery_state:new()), S6 = F(F(F(F(F(F(S0)))))), {delivery_state:dead_letter(bob, S6), delivery_state:pending(bob, S6)} =:= {[Act1], []}\") :name)") + +;; Explicit dead_lettered event moves activity to dead_letter +(epoch 17) +(eval "(get (erlang-eval-ast \"${SETUP} E_DL = [{type, dead_lettered}, {peer, bob}, {cid, <<1>>}], S = delivery_state:fold(E_DL, delivery_state:fold(E_Enq1, delivery_state:new())), {delivery_state:dead_letter(bob, S), delivery_state:pending(bob, S)} =:= {[Act1], []}\") :name)") + +;; peers/1 lists every peer touched +(epoch 18) +(eval "(get (erlang-eval-ast \"${SETUP} S = delivery_state:fold(E_Enq2Carol, delivery_state:fold(E_Enq1, delivery_state:new())), delivery_state:peers(S) =:= [bob, carol]\") :name)") + +;; peer_state returns {ok, Worker} | not_found +(epoch 19) +(eval "(get (erlang-eval-ast \"${SETUP} S = delivery_state:fold(E_Enq1, delivery_state:new()), case delivery_state:peer_state(bob, S) of {ok, _} -> true; _ -> false end andalso delivery_state:peer_state(ghost, S) =:= not_found\") :name)") + +;; fold_fn/0 returns a 2-arity Erlang fun usable by projection:start_link/3 +(epoch 20) +(eval "(get (erlang-eval-ast \"is_function(delivery_state:fold_fn(), 2)\") :name)") + +;; Unknown event type passes through +(epoch 21) +(eval "(get (erlang-eval-ast \"Garbage = [{type, mystery}, {peer, bob}], delivery_state:fold(Garbage, delivery_state:new()) =:= []\") :name)") + +;; delivered after failed clears retry state +(epoch 22) +(eval "(get (erlang-eval-ast \"${SETUP} S = delivery_state:fold(E_Del1, delivery_state:fold(E_Fail1, delivery_state:fold(E_Enq1, delivery_state:new()))), {delivery_state:attempts(bob, S), delivery_state:next_retry(bob, S)} =:= {[], []}\") :name)") +EPOCHS + +OUTPUT=$(timeout 240 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) + +check() { + local epoch="$1" desc="$2" expected="$3" + local actual + actual=$(echo "$OUTPUT" | awk -v e="$epoch" ' + $0 ~ "^\\(ok-len " e " " { getline; print; exit } + $0 ~ "^\\(ok " e " " { print; exit } + $0 ~ "^\\(error " e " " { print; exit } + ') + [ -z "$actual" ] && actual="" + if echo "$actual" | grep -qF -- "$expected"; then + PASS=$((PASS+1)) + [ "$VERBOSE" = "-v" ] && echo " ok $desc" + else + FAIL=$((FAIL+1)) + ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual +" + fi +} + +check 4 "delivery_state module loaded" "delivery_state" +check 10 "new/0 -> []" "true" +check 11 "enqueued -> pending appended" "true" +check 12 "two enqueues -> FIFO" "true" +check 13 "two peers independent queues" "true" +check 14 "delivered clears pending entry" "true" +check 15 "failed bumps attempts + next_retry" "true" +check 16 "6th failed -> dead_lettered" "true" +check 17 "explicit dead_lettered event" "true" +check 18 "peers/1 lists touched" "true" +check 19 "peer_state ok / not_found" "true" +check 20 "fold_fn/0 is fun/2" "true" +check 21 "unknown event passes through" "true" +check 22 "delivered after failed clears" "true" + +TOTAL=$((PASS+FAIL)) +if [ $FAIL -eq 0 ]; then + echo "ok $PASS/$TOTAL next/tests/delivery_state.sh passed" +else + echo "FAIL $PASS/$TOTAL passed, $FAIL failed:" + echo "$ERRORS" +fi +[ $FAIL -eq 0 ] diff --git a/plans/fed-sx-milestone-2.md b/plans/fed-sx-milestone-2.md index 893d40b2..f19aa08f 100644 --- a/plans/fed-sx-milestone-2.md +++ b/plans/fed-sx-milestone-2.md @@ -566,10 +566,22 @@ a dead-letter list visible via `/admin/dead-letter`. self-cast or equivalent). Needs the same substrate primitive that `gen_server` uses for `timeout` returns. Defer behind substrate gap discovery for now — see Blockers. -- [ ] **8c** — Delivery-state projection so the queue survives - kernel restart. New `next/kernel/delivery_state.erl` fold maps - enqueue / delivered / failed events to the worker's persistent - shape. +- [x] **8c** — Delivery-state projection + (`next/kernel/delivery_state.erl`). Folds delivery events into + per-peer worker-shaped snapshots so the outbound queue survives + kernel restart. Event shapes: + `[{type, enqueued|delivered|failed|dead_lettered}, {peer, _}, + {activity, _} | {cid, _}, {now, _}?]`. State shape + `[{PeerId, WorkerProplist}, ...]` mirrors `delivery_worker:new/1`'s + output so a fresh gen_server can be hydrated on restart. Public + API: `new/0`, `fold/2`, `fold_fn/0`, `peer_state/2`, `peers/1`, + per-field accessors (`pending`, `attempts`, `next_retry`, + `dead_letter`). Uses `delivery_worker:backoff_for/1` to decide + dead-letter promotion on the 6th failure, so the projection + and the live worker stay in lockstep. 14/14 in + `delivery_state.sh`. The restart-hydration helper + (`delivery_worker:state_from_proj/2` or similar) lands when + 8b-timer wires the live retry loop. - [x] **8d** — `outbox:publish/2` dispatches each delivery-set entry to the matching worker. New `dispatch_deliveries/3` + `enqueue_each/2` in `outbox.erl` walk the computed @@ -938,6 +950,22 @@ proceed. Newest first. +- **2026-06-07** — Step 8c: delivery-state projection. New + `next/kernel/delivery_state.erl` folds enqueue / delivered / + failed / dead_lettered events into a per-peer worker-shaped + snapshot. State shape mirrors `delivery_worker:new/1`'s output + so a fresh gen_server can be hydrated from the projection on + kernel restart. The fail branch calls + `delivery_worker:backoff_for/1` directly, so the projection and + the live worker compute identical retry slots / dead-letter + thresholds. `fold_fn/0` plugs into `projection:start_link/3` + just like `actor_state` and `follower_graph`. 14/14 in + `delivery_state.sh`; delivery_worker.sh 17/17 + delivery_retry.sh + 11/11 unchanged. Conformance preserved at 761/761. The + hydration helper that loads a worker's pure state from the + projection lands once 8b-timer can wire the live retry loop + (Blockers #3 still open). + - **2026-06-07** — Step 8b-pure: retry-time bookkeeping. `delivery_worker` state shape gains `:next_retry` proplist alongside `:attempts`. `record_failure_pure/3(Cid, Now, State)`