From 8bf2b45cf9ce77623d9f6474f56c460cb32a68ae Mon Sep 17 00:00:00 2001 From: giles Date: Sun, 7 Jun 2026 02:04:23 +0000 Subject: [PATCH] =?UTF-8?q?fed-sx-m2:=20Step=208b-pure=20=E2=80=94=20retry?= =?UTF-8?q?-time=20bookkeeping=20+=2011=20tests=20+=202=20Blockers?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit delivery_worker state shape gains :next_retry proplist alongside the existing :attempts: [{peer, _}, {pending, _}, {attempts, [{Cid, N}]}, {next_retry, [{Cid, NextRetryAt}]}, {dead_letter, _}, {dispatch_fn, _}] New pure-functional exports: record_failure_pure/3(Cid, Now, State) Bumps :attempts for Cid. On the 6th failure (backoff_for returns dead_letter) moves the matching activity from :pending to :dead_letter and clears the :next_retry entry. Otherwise sets next_retry to Now + backoff_for(NewAttempts). record_success_pure/2(Cid, State) Clears both :attempts and :next_retry for Cid. next_due_pure/2(Now, State) Returns cids whose retry time has passed (insertion order preserved so the worker drains in FIFO retry order). attempts_for/2, next_retry_at/2, dead_letter_list/1 Read-side accessors. Internal helper move_to_dead_letter/2 + take_by_cid/4 walks :pending to find the matching activity by cid. 11/11 in next/tests/delivery_retry.sh covering: - fresh state: 0 attempts / undefined retry / [] dead_letter - record_failure bumps to 1 - record_failure sets next_retry_at = Now + 30 (slot 1) - second failure: attempts=2, NextRetryAt = Now + 300 (slot 2) - record_success clears both - next_due returns due cids - next_due empty before due - 6th failure -> dead-letter; activity out of :pending - dead-lettered cid removed from :next_retry - per-cid isolation: success on one doesn't disturb another delivery_worker.sh 17/17 unchanged (new exports are additive). Blockers added: #2 — Native http-request primitive missing in bin/sx_server.ml (briefing assumed it existed; only http-listen exists). Belongs to loops/fed-prims. 
Step 8e wrapper waits for the native. #3 — erlang:send_after-style timer primitive missing. Needed for the real retry loop. Belongs to loops/erlang. 8b-pure captures the semantics so 8b-timer is a 1-shot wiring when the primitive lands. Conformance preserved at 761/761. --- next/kernel/delivery_worker.erl | 88 ++++++++++++++++++++++ next/tests/delivery_retry.sh | 126 ++++++++++++++++++++++++++++++++ plans/fed-sx-milestone-2.md | 73 +++++++++++++++--- 3 files changed, 278 insertions(+), 9 deletions(-) create mode 100755 next/tests/delivery_retry.sh diff --git a/next/kernel/delivery_worker.erl b/next/kernel/delivery_worker.erl index d4fc581e..c2912b39 100644 --- a/next/kernel/delivery_worker.erl +++ b/next/kernel/delivery_worker.erl @@ -3,6 +3,9 @@ -export([new/1, pending/1, peer/1, enqueue_pure/3, drain_pure/1, deliver_one_pure/2, backoff_for/1, schedule_for/1, + record_failure_pure/3, record_success_pure/2, + next_due_pure/2, attempts_for/2, next_retry_at/2, + dead_letter_list/1, start_link/1, start_link/2, stop/1, enqueue/2, flush/1, pending_srv/1, set_dispatch_fn/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2]). @@ -31,6 +34,7 @@ %% [{peer, PeerId}, %% {pending, [Activity, ...]}, %% FIFO; head delivered first %% {attempts, [{Cid, AttemptCount}, ...]}, +%% {next_retry, [{Cid, NextRetryAt}, ...]}, %% Step 8b-pure %% {dead_letter, [Activity, ...]}, %% {dispatch_fn, fun/1 | undefined}] %% @@ -43,6 +47,7 @@ new(PeerId) -> [{peer, PeerId}, {pending, []}, {attempts, []}, + {next_retry, []}, {dead_letter, []}, {dispatch_fn, undefined}]. @@ -118,6 +123,85 @@ schedule_for(Attempts) -> Seconds -> {retry_in, Seconds} end. +%% ── Step 8b-pure: retry-time bookkeeping ─────────────────────── +%% +%% `record_failure_pure/3(Cid, Now, State)` — call after a failed +%% deliver_one. Bumps the per-cid attempt counter; if the new +%% attempt is past the dead-letter threshold, moves the matching +%% activity from :pending to :dead_letter. 
Otherwise records the
+%% next retry time as Now + backoff_for(NewAttempt).
+%%
+%% Real timer wiring (erlang:send_after self-cast on the worker
+%% pid) needs substrate support — Step 8b-timer when that lands.
+%%
+%% `record_success_pure/2(Cid, State)` — clears :attempts and
+%% :next_retry entries for the cid; called after a successful
+%% deliver_one.
+%%
+%% `next_due_pure/2(Now, State)` — returns the list of Cids whose
+%% NextRetryAt has passed, in insertion order.
+
+%% The current count is read through attempts_for/2, so a cid with
+%% no entry starts from 0 and its first failure records attempt 1.
+record_failure_pure(Cid, Now, State) ->
+    New = attempts_for(Cid, State) + 1,
+    Counts = set_keyed(Cid, New, field(attempts, State)),
+    State1 = set_field(attempts, Counts, State),
+    case backoff_for(New) of
+        dead_letter ->
+            move_to_dead_letter(Cid, State1);
+        Seconds ->
+            Due = set_keyed(Cid, Now + Seconds, field(next_retry, State1)),
+            set_field(next_retry, Due, State1)
+    end.
+
+record_success_pure(Cid, State) ->
+    A1 = del_keyed(Cid, field(attempts, State)),
+    NR1 = del_keyed(Cid, field(next_retry, State)),
+    set_field(attempts, A1, set_field(next_retry, NR1, State)).
+
+%% next_due_pure/2 — Cids whose NextRetryAt <= Now. Preserves
+%% insertion order so the worker drains them in FIFO retry order.
+
+next_due_pure(Now, State) ->
+    [Cid || {Cid, At} <- field(next_retry, State), At =< Now].
+
+%% attempts_for/2 — failure count stored for Cid; 0 when it has no entry.
+attempts_for(Cid, State) ->
+    case find_keyed(Cid, field(attempts, State)) of
+        {ok, N} -> N;
+        _ -> 0
+    end.
+
+%% next_retry_at/2 — retry time stored for Cid, or `undefined` if none.
+next_retry_at(Cid, State) ->
+    case find_keyed(Cid, field(next_retry, State)) of
+        {ok, At} -> At;
+        _ -> undefined
+    end.
+
+%% dead_letter_list/1 — the :dead_letter field of State, as stored.
+dead_letter_list(State) -> field(dead_letter, State).
+
+%% Dead-letter Cid: pull its activity out of :pending (if present),
+%% append it to :dead_letter, and drop the cid's :next_retry entry.
+move_to_dead_letter(Cid, State) ->
+    {Hit, Remaining} = take_by_cid(Cid, field(pending, State), [], []),
+    Trimmed = set_field(pending, Remaining, State),
+    Parked = case Hit of
+        none -> Trimmed;
+        _ -> set_field(dead_letter, field(dead_letter, State) ++ [Hit], Trimmed)
+    end,
+    set_field(next_retry, del_keyed(Cid, field(next_retry, Parked)), Parked).
+
+%% First activity whose cid matches, plus the others in order; 4th arg is unused.
+take_by_cid(_, [], Seen, _) -> {none, lists:reverse(Seen)};
+take_by_cid(Cid, [Act | Tail], Seen, _) ->
+    case activity_cid(Act) =:= Cid of
+        true -> {Act, lists:reverse(Seen) ++ Tail};
+        false -> take_by_cid(Cid, Tail, [Act | Seen], 0)
+    end.
+
 %% ── gen_server wrapper ──────────────────────────────────────────
 
 start_link(PeerId) ->
@@ -196,3 +280,7 @@ find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest).
 set_keyed(K, V, []) -> [{K, V}];
 set_keyed(K, V, [{K, _} | Rest]) -> [{K, V} | Rest];
 set_keyed(K, V, [P | Rest]) -> [P | set_keyed(K, V, Rest)].
+
+del_keyed(_, []) -> [];
+del_keyed(Key, [{Key, _} | Tail]) -> Tail;
+del_keyed(Key, [Pair | Tail]) -> [Pair | del_keyed(Key, Tail)].
diff --git a/next/tests/delivery_retry.sh b/next/tests/delivery_retry.sh
new file mode 100755
index 00000000..3949221a
--- /dev/null
+++ b/next/tests/delivery_retry.sh
@@ -0,0 +1,126 @@
+#!/usr/bin/env bash
+# next/tests/delivery_retry.sh — m2 Step 8b-pure test.
+#
+# Pure-functional retry-time bookkeeping for the delivery worker.
+# record_failure bumps the attempt counter and computes the next
+# retry time per backoff_for. record_success clears state for a
+# cid. next_due returns cids whose retry time has passed.
+#
+# Real timer wiring (erlang:send_after self-cast) is Step 8b-timer
+# once substrate support lands.
+
+set -uo pipefail
+cd "$(git rev-parse --show-toplevel)"
+
+SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
+if [ ! -x "$SX_SERVER" ]; then
+  SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
+fi
+if [ ! -x "$SX_SERVER" ]; then
+  echo "ERROR: sx_server.exe not found."
>&2 + exit 1 +fi + +VERBOSE="${1:-}" +PASS=0; FAIL=0; ERRORS="" +TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT + +SETUP='Act1 = [{id, <<1>>}, {type, note}, {actor, alice}], Act2 = [{id, <<2>>}, {type, note}, {actor, alice}],' + +cat > "$TMPFILE" <>, S), delivery_worker:next_retry_at(<<1>>, S), delivery_worker:dead_letter_list(S)} =:= {0, undefined, []}\") :name)") + +;; record_failure bumps the attempt counter +(epoch 11) +(eval "(get (erlang-eval-ast \"${SETUP} S0 = delivery_worker:enqueue_pure(bob, Act1, delivery_worker:new(bob)), S1 = delivery_worker:record_failure_pure(<<1>>, 1000, S0), delivery_worker:attempts_for(<<1>>, S1) =:= 1\") :name)") + +;; record_failure sets next_retry_at = Now + backoff(1) = Now + 30 +(epoch 12) +(eval "(get (erlang-eval-ast \"${SETUP} S0 = delivery_worker:enqueue_pure(bob, Act1, delivery_worker:new(bob)), S1 = delivery_worker:record_failure_pure(<<1>>, 1000, S0), delivery_worker:next_retry_at(<<1>>, S1) =:= 1030\") :name)") + +;; Second failure -> attempts=2, NextRetryAt = Now+300 +(epoch 13) +(eval "(get (erlang-eval-ast \"${SETUP} S0 = delivery_worker:enqueue_pure(bob, Act1, delivery_worker:new(bob)), S1 = delivery_worker:record_failure_pure(<<1>>, 1000, S0), S2 = delivery_worker:record_failure_pure(<<1>>, 2000, S1), {delivery_worker:attempts_for(<<1>>, S2), delivery_worker:next_retry_at(<<1>>, S2)} =:= {2, 2300}\") :name)") + +;; record_success clears attempts + next_retry for the cid +(epoch 14) +(eval "(get (erlang-eval-ast \"${SETUP} S0 = delivery_worker:enqueue_pure(bob, Act1, delivery_worker:new(bob)), S1 = delivery_worker:record_failure_pure(<<1>>, 1000, S0), S2 = delivery_worker:record_success_pure(<<1>>, S1), {delivery_worker:attempts_for(<<1>>, S2), delivery_worker:next_retry_at(<<1>>, S2)} =:= {0, undefined}\") :name)") + +;; next_due returns Cids whose retry time has passed +(epoch 15) +(eval "(get (erlang-eval-ast \"${SETUP} S0 = delivery_worker:enqueue_pure(bob, Act1, delivery_worker:new(bob)), S1 = 
delivery_worker:record_failure_pure(<<1>>, 1000, S0), delivery_worker:next_due_pure(1030, S1) =:= [<<1>>]\") :name)")
+
+;; next_due returns [] before retry time
+(epoch 16)
+(eval "(get (erlang-eval-ast \"${SETUP} S0 = delivery_worker:enqueue_pure(bob, Act1, delivery_worker:new(bob)), S1 = delivery_worker:record_failure_pure(<<1>>, 1000, S0), delivery_worker:next_due_pure(1020, S1) =:= []\") :name)")
+
+;; 6th failure -> dead_letter; activity moves out of :pending
+(epoch 17)
+(eval "(get (erlang-eval-ast \"${SETUP} F = fun(S) -> delivery_worker:record_failure_pure(<<1>>, 1000, S) end, S0 = delivery_worker:enqueue_pure(bob, Act1, delivery_worker:new(bob)), S6 = F(F(F(F(F(F(S0)))))), {delivery_worker:dead_letter_list(S6), delivery_worker:pending(S6)} =:= {[Act1], []}\") :name)")
+
+;; Dead-lettered cid is no longer in next_retry
+(epoch 18)
+(eval "(get (erlang-eval-ast \"${SETUP} F = fun(S) -> delivery_worker:record_failure_pure(<<1>>, 1000, S) end, S0 = delivery_worker:enqueue_pure(bob, Act1, delivery_worker:new(bob)), S6 = F(F(F(F(F(F(S0)))))), delivery_worker:next_retry_at(<<1>>, S6) =:= undefined\") :name)")
+
+;; Two cids: success on one doesn't disturb the other's retry state
+(epoch 19)
+(eval "(get (erlang-eval-ast \"${SETUP} S0 = delivery_worker:enqueue_pure(bob, Act1, delivery_worker:enqueue_pure(bob, Act2, delivery_worker:new(bob))), S1 = delivery_worker:record_failure_pure(<<1>>, 1000, S0), S2 = delivery_worker:record_failure_pure(<<2>>, 1000, S1), S3 = delivery_worker:record_success_pure(<<1>>, S2), delivery_worker:next_retry_at(<<2>>, S3) =:= 1030\") :name)")
+EPOCHS
+
+OUTPUT=$(timeout 240 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
+
+check() {
+  # check EPOCH DESC EXPECTED: pass iff the reply line for EPOCH contains EXPECTED.
+  local epoch="$1" desc="$2" expected="$3" actual
+  actual=$(printf '%s\n' "$OUTPUT" | awk -v e="$epoch" '
+    $0 ~ "^\\(ok-len " e " " { getline; print; exit }
+    $0 ~ "^\\(ok " e " " { print; exit }
+    $0 ~ "^\\(error " e " " { print; exit }
+  ')
+  [ -z "$actual" ] && actual="(no output for epoch $epoch)"
+  if printf '%s\n' "$actual" | grep -qF -- "$expected"; then
+    PASS=$((PASS+1))
+    [ "$VERBOSE" = "-v" ] && echo " ok $desc"
+  else
+    FAIL=$((FAIL+1))
+    ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
+"
+  fi
+}
+
+check 3 "module loaded" "delivery_worker"
+check 10 "fresh state empty" "true"
+check 11 "record_failure bumps attempts" "true"
+check 12 "record_failure sets next_retry_at" "true"
+check 13 "second failure: slot 2 = +300" "true"
+check 14 "record_success clears state" "true"
+check 15 "next_due returns due cids" "true"
+check 16 "next_due empty before due" "true"
+check 17 "6th failure -> dead_letter" "true"
+check 18 "dead-lettered cid out of retry" "true"
+check 19 "success on one preserves other" "true"
+
+TOTAL=$((PASS+FAIL))
+if [ $FAIL -eq 0 ]; then
+  echo "ok $PASS/$TOTAL next/tests/delivery_retry.sh passed"
+else
+  echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
+  echo "$ERRORS"
+fi
+[ $FAIL -eq 0 ]
diff --git a/plans/fed-sx-milestone-2.md b/plans/fed-sx-milestone-2.md index 9ba71b98..893d40b2 100644 --- a/plans/fed-sx-milestone-2.md +++ b/plans/fed-sx-milestone-2.md @@ -548,11 +548,24 @@ a dead-letter list visible via `/admin/dead-letter`. Step 8f plugs in the live httpc call without touching the queue logic. No actual HTTP yet; no retry timer wiring yet. 17/17 in `delivery_worker.sh`. -- [ ] **8b** — Retry / backoff scheduler. Wire `schedule_for/1` - into a private retry loop: `flush/1` returns deliveries that - failed; the worker schedules a self-cast via Erlang `after` - timer for the next retry slot. Tests fake-time via a Cfg - `:now_fn`. +- [x] **8b-pure** — Retry-time bookkeeping (pure-functional). + State shape gains `{next_retry, [{Cid, NextRetryAt}]}` alongside + the existing `:attempts`. New exports: + `record_failure_pure/3(Cid, Now, State)`, + `record_success_pure/2(Cid, State)`, + `next_due_pure/2(Now, State)`, `attempts_for/2`, + `next_retry_at/2`, `dead_letter_list/1`.
+ `record_failure_pure` bumps the attempt counter and computes + `Now + backoff_for(NewAttempts)` as the next retry; on the 6th + failure (`backoff_for` returns `dead_letter`) the matching + activity moves from `:pending` to `:dead_letter` and the cid + is cleared from `:next_retry`. `record_success_pure` clears + both. `next_due_pure` returns cids whose retry time has + passed. 11 cases in `delivery_retry.sh`. +- [ ] **8b-timer** — Erlang-side timer wiring (`erlang:send_after` + self-cast or equivalent). Needs the same substrate primitive + that `gen_server` uses for `timeout` returns. Defer behind + substrate gap discovery for now — see Blockers. - [ ] **8c** — Delivery-state projection so the queue survives kernel restart. New `next/kernel/delivery_state.erl` fold maps enqueue / delivered / failed events to the worker's persistent @@ -569,10 +582,12 @@ a dead-letter list visible via `/admin/dead-letter`. in `delivery_dispatch.sh` covering single-peer enqueue, two-peer fan-out, missing-worker skip, no-flag no-op, FIFO append across two publishes, empty delivery_set no-op. -- [ ] **8e** — `httpc:request/4` BIF wrapper in - `lib/erlang/runtime.sx` (the briefing's allowed scope - exception for Step 8). Marshalling: SX dict ↔ Erlang proplist - shape with `{ok, Status, Headers, Body}` / `{error, Reason}`. +- [ ] **8e** — `httpc:request/4` BIF wrapper. **Blocker:** the + briefing assumed a native `http-request` primitive existed in + `bin/sx_server.ml`; on inspection there's only `http-listen`. + The native http-CLIENT primitive belongs to `loops/fed-prims` + (host primitives loop). Blockers entry below. m2 work + continues with the in-process flow until the native lands. - [ ] **8f** — Real HTTP dispatch through the BIF + content-type wiring. dispatch_fn for live use becomes a closure over the peer URL that calls `httpc:request/4` with the signed envelope @@ -893,12 +908,52 @@ proceed. until resolved. 
Confirmed pre-existing by stashing 1a's changes and re-running on the unmodified m1 closeout HEAD. +2. **Native `http-request` (HTTP client) primitive missing** — + discovered during Step 8e prep. The fed-sx-m2 briefing + ("Substrate available to you" §) claimed: "Native HTTP client + primitive (registered in `bin/sx_server.ml`): `http-request` — + exposed at the SX layer, currently native-only." On inspection + `bin/sx_server.ml` only registers `http-listen`; there is no + `http-request` registration. The HTTP client primitive belongs + to `loops/fed-prims` (host primitives loop) per the + one-primitive-loop-per-substrate convention. m2's Step 8e + wrapper (`httpc:request/4` BIF in `lib/erlang/runtime.sx`) + can land in a 1-line follow-up once the native exists; m2 + work continues with 8b-pure / 8c / 8d in the in-process flow. + +3. **`erlang:send_after`-style timer primitive** — discovered + during Step 8b prep. The retry loop needs a way for the + delivery_worker to wake itself up after `backoff_for(N)` + seconds. Erlang's `erlang:send_after/3` is the standard + primitive; this port doesn't seem to register it (looked at + how `gen_server` handles `timeout` returns — it's a + message-loop self-cast that needs a delayed send). Belongs + to `loops/erlang` (Erlang runtime substrate). m2 captures the + retry semantics pure-functionally in 8b-pure so 8b-timer + becomes a 1-shot wiring when the primitive lands. + --- ## Progress log Newest first. +- **2026-06-07** — Step 8b-pure: retry-time bookkeeping. + `delivery_worker` state shape gains `:next_retry` proplist + alongside `:attempts`. `record_failure_pure/3(Cid, Now, State)` + bumps the per-cid counter and computes the next retry as + `Now + backoff_for(NewAttempts)`. On the 6th failure + (`backoff_for` returns `dead_letter`) the matching activity + moves from `:pending` to `:dead_letter`. `record_success_pure/2` + clears both `:attempts` and `:next_retry` for the cid. 
+ `next_due_pure/2(Now, State)` returns the cids whose retry + time has passed (insertion order preserved). 11/11 in + `delivery_retry.sh`. 8b-timer (real timer wiring via + `erlang:send_after`-style primitive) and 8e + (`httpc:request/4` BIF) hit substrate gaps — Blockers entries + added pointing to loops/erlang + loops/fed-prims. Conformance + preserved at 761/761. + - **2026-06-07** — Step 8d: outbox dispatches delivery_set to workers. `outbox:publish/2` gained `dispatch_deliveries/3` and `enqueue_each/2`: after `log:append` + projection broadcast,