diff --git a/next/kernel/delivery_worker.erl b/next/kernel/delivery_worker.erl
index c2912b39..073ea0e6 100644
--- a/next/kernel/delivery_worker.erl
+++ b/next/kernel/delivery_worker.erl
@@ -5,9 +5,10 @@
          backoff_for/1, schedule_for/1,
          record_failure_pure/3, record_success_pure/2,
          next_due_pure/2, attempts_for/2, next_retry_at/2,
-         dead_letter_list/1,
+         dead_letter_list/1, timer_ref_for/2,
          start_link/1, start_link/2, stop/1,
-         enqueue/2, flush/1, pending_srv/1, set_dispatch_fn/2]).
+         enqueue/2, flush/1, pending_srv/1, set_dispatch_fn/2,
+         state_srv/1]).
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
 
 %% Outbound delivery worker per design §13.4. One gen_server per
@@ -49,6 +50,7 @@ new(PeerId) ->
      {attempts, []},
      {next_retry, []},
      {dead_letter, []},
+     {timers, []},
      {dispatch_fn, undefined}].
 
 pending(State) -> field(pending, State).
@@ -183,6 +185,16 @@ next_retry_at(Cid, State) ->
 
 dead_letter_list(State) -> field(dead_letter, State).
 
+%% Step 8b-timer: per-cid timer ref accessor. Exposed for tests so
+%% they can assert a retry timer was scheduled (or wasn't, after a
+%% success / dead-letter). Returns the Ref held in :timers or undefined.
+
+timer_ref_for(Cid, State) ->
+    case find_keyed(Cid, field(timers, State)) of
+        {ok, Ref} -> Ref;
+        _ -> undefined
+    end.
+
 move_to_dead_letter(Cid, State) ->
     Pending = field(pending, State),
     {Match, Rest} = take_by_cid(Cid, Pending, [], []),
@@ -229,6 +241,13 @@ pending_srv(PeerId) ->
 set_dispatch_fn(PeerId, Fn) ->
     gen_server:call(PeerId, {set_dispatch_fn, Fn}).
 
+%% Step 8b-timer: return the worker's full state so tests can use the
+%% pure introspection functions (attempts_for / next_retry_at /
+%% timer_ref_for / dead_letter_list) against it.
+
+state_srv(PeerId) ->
+    gen_server:call(PeerId, get_state).
+
 %% gen_server callbacks
 
 init([PeerId, DispatchFn]) ->
@@ -238,17 +257,138 @@ init([PeerId, DispatchFn]) ->
 handle_call({enqueue, Activity}, _From, State) ->
     {reply, ok, enqueue_pure(field(peer, State), Activity, State)};
 handle_call(flush, _From, State) ->
-    {NewState, Delivered, Retry} = drain_pure(State),
+    %% Step 8b-timer: drain (which already bumps :attempts via
+    %% bump_attempt on each failed deliver), then for each retried
+    %% Cid compute the backoff slot from the now-current attempt
+    %% count, set NextRetryAt, and arm a send_after self-message.
+    %% handle_info({retry, Cid}, ...) fires when the slot elapses.
+    %% Reply shape unchanged.
+    {DrainState, Delivered, Retry} = drain_pure(State),
+    Now = monotonic_seconds(),
+    NewState = lists:foldl(
+        fun(Cid, S) -> arm_retry_timer(Cid, Now, S) end,
+        DrainState, Retry),
     {reply, {ok, Delivered, Retry}, NewState};
 handle_call(get_pending, _From, State) ->
     {reply, field(pending, State), State};
+handle_call(get_state, _From, State) ->
+    {reply, State, State};
 handle_call({set_dispatch_fn, Fn}, _From, State) ->
     {reply, ok, set_field(dispatch_fn, Fn, State)}.
 
 handle_cast(_, S) -> {noreply, S}.
 
+%% Step 8b-timer: a retry timer fired. Pull the activity by Cid from
+%% the pending queue (an intervening flush may have drained it — if
+%% so, we just clear bookkeeping and exit). Run deliver_one_pure:
+%% success clears retry state; failure bumps the counter and
+%% schedules the next slot, or dead-letters on the sixth failure.
+%% NOTE(review): take_by_cid's 4th arg is 0 here, [] elsewhere — confirm.
+
+handle_info({retry, Cid}, State) ->
+    %% Clear the timer ref we just consumed.
+    State0 = clear_timer_ref(Cid, State),
+    case take_by_cid(Cid, field(pending, State0), [], 0) of
+        {none, _} ->
+            %% Already drained / dead-lettered. Clear any stale
+            %% bookkeeping in case the cid is half-tracked.
+            {noreply, record_success_pure(Cid, State0)};
+        {Activity, Rest} ->
+            case deliver_one_pure(Activity, State0) of
+                {ok, _} ->
+                    State1 = set_field(pending, Rest, State0),
+                    State2 = record_success_pure(Cid, State1),
+                    {noreply, State2};
+                {error, _, _} ->
+                    %% Keep the activity in pending; record_failure
+                    %% leaves :pending alone (or dead-letters it on
+                    %% slot 6).
+                    Now = monotonic_seconds(),
+                    State1 = schedule_retry_for(Cid, Now, State0),
+                    {noreply, State1}
+            end
+    end;
 handle_info(_, S) -> {noreply, S}.
 
+%% Step 8b-timer helpers ────────────────────────────────────────────
+
+%% arm_retry_timer/3 — POST-DRAIN form. Used from handle_call(flush)
+%% after drain_pure has already bumped :attempts via bump_attempt.
+%% Sets next_retry_at = Now + backoff(attempts) and schedules the
+%% send_after self-message. On the dead-letter slot (attempt 6), moves
+%% the activity from :pending to :dead_letter and arms no timer.
+
+arm_retry_timer(Cid, Now, State) ->
+    State0 = cancel_timer_for(Cid, State),
+    Attempts = attempts_for(Cid, State0),
+    case backoff_for(Attempts) of
+        dead_letter ->
+            move_to_dead_letter(Cid, State0);
+        Seconds ->
+            NextAt = Now + Seconds,
+            NR = field(next_retry, State0),
+            State1 = set_field(next_retry, set_keyed(Cid, NextAt, NR), State0),
+            Ms = Seconds * 1000,
+            Ref = erlang:send_after(Ms, self(), {retry, Cid}),
+            Timers = field(timers, State1),
+            set_field(timers, set_keyed(Cid, Ref, Timers), State1)
+    end.
+
+%% schedule_retry_for/3 — POST-RETRY-ATTEMPT form. Used from
+%% handle_info({retry, Cid}, ...) when the retry attempt failed.
+%% Bookkeep one failure and arm the next retry timer (or promote
+%% to dead-letter, in which case no timer is needed).
+
+schedule_retry_for(Cid, Now, State) ->
+    %% Cancel any in-flight timer for this Cid before scheduling a new
+    %% one. Without the cancel, a stale timer can still fire after
+    %% record_success has cleared the cid; the handle_info no-match
+    %% branch silently absorbs it, but it keeps the scheduler's
+    %% run-loop alive long after the work is done. When the timer's
+    %% own firing brought us here, handle_info has already dropped the
+    %% ref via clear_timer_ref, so this cancel is a defensive no-op.
+    State0 = cancel_timer_for(Cid, State),
+    State1 = record_failure_pure(Cid, Now, State0),
+    Attempts = attempts_for(Cid, State1),
+    case backoff_for(Attempts) of
+        dead_letter ->
+            State1;
+        Seconds ->
+            Ms = Seconds * 1000,
+            Ref = erlang:send_after(Ms, self(), {retry, Cid}),
+            Timers = field(timers, State1),
+            set_field(timers, set_keyed(Cid, Ref, Timers), State1)
+    end.
+
+%% Cancel the live timer for Cid (if any) and clear it from :timers.
+%% Idempotent — silent no-op if there isn't one.
+
+cancel_timer_for(Cid, State) ->
+    Timers = field(timers, State),
+    case find_keyed(Cid, Timers) of
+        {ok, Ref} ->
+            erlang:cancel_timer(Ref),
+            set_field(timers, del_keyed(Cid, Timers), State);
+        _ -> State
+    end.
+
+%% Drop the :timers entry for Cid without calling cancel_timer — used
+%% when the timer's own firing brought us into handle_info and the ref
+%% is already consumed.
+
+clear_timer_ref(Cid, State) ->
+    Timers = field(timers, State),
+    case find_keyed(Cid, Timers) of
+        {ok, _Ref} -> set_field(timers, del_keyed(Cid, Timers), State);
+        _ -> State
+    end.
+
+%% Step 8b-timer: bookkeeping uses seconds (matches backoff_for /
+%% record_failure_pure / next_retry_at). Ask for ms explicitly (the /0
+%% clock reports native units in standard Erlang), then cut to seconds.
+
+monotonic_seconds() -> erlang:monotonic_time(millisecond) div 1000.
+ %% ── Internal ──────────────────────────────────────────────────── activity_cid(Activity) -> diff --git a/next/tests/delivery_retry_timer.sh b/next/tests/delivery_retry_timer.sh new file mode 100755 index 00000000..7be29de7 --- /dev/null +++ b/next/tests/delivery_retry_timer.sh @@ -0,0 +1,131 @@ +#!/usr/bin/env bash +# next/tests/delivery_retry_timer.sh — m2 Step 8b-timer. +# +# Live timer wiring on the delivery_worker gen_server. The pure +# bookkeeping is covered by delivery_retry.sh — this suite proves the +# erlang:send_after / cancel_timer wiring fires retries from the +# scheduler's logical clock without anyone calling drain by hand. +# +# Substrate dependency: erlang:send_after/3 + cancel_timer/1 + +# monotonic_time/0,1 — landed via cherry-pick from loops/erlang +# (commits 3709460d / 98b0104c / 779e53b2 on this branch). +# +# Test discipline: every test cancels its leftover timer before +# returning. If we don't, the scheduler keeps the run loop alive +# advancing time through the full backoff chain (30s → 5m → 30m → +# 6h → 24h), and each tick costs ~10s of wall time inside the +# Erlang-on-SX VM. Canceling the trailing timer is the difference +# between a 25s test and a 60s+ test. + +set -uo pipefail +cd "$(git rev-parse --show-toplevel)" + +SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}" +if [ ! -x "$SX_SERVER" ]; then + SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe" +fi +if [ ! -x "$SX_SERVER" ]; then + echo "ERROR: sx_server.exe not found." >&2 + exit 1 +fi + +VERBOSE="${1:-}" +PASS=0; FAIL=0; ERRORS="" +TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT + +# A canned activity with cid <<1,2,3>>. +SETUP='Act = [{id, <<1,2,3>>}, {type, note}, {actor, alice}], FailFn = fun(_) -> {error, transient} end,' + +# Convenience: cancel any leftover timer for cid <<1,2,3>> on Peer. +# Prevents the scheduler from grinding through 30s/5m/30m/6h/24h of +# retries between epochs. 
+CANCEL='CancelLeftover = fun(Peer) -> SS = delivery_worker:state_srv(Peer), case delivery_worker:timer_ref_for(<<1,2,3>>, SS) of undefined -> ok; LRef -> erlang:cancel_timer(LRef), ok end end,' + +cat > "$TMPFILE" <>]} = delivery_worker:flush(bob), S = delivery_worker:state_srv(bob), Ref = delivery_worker:timer_ref_for(<<1,2,3>>, S), Result = is_reference(Ref), CancelLeftover(bob), Result\") :name)") + +;; T2 — initial flush bumps the attempt counter to 1; next_retry_at +;; gets set; cancel the timer before returning. +(epoch 11) +(eval "(get (erlang-eval-ast \"${SETUP}${CANCEL} delivery_worker:start_link(bob, FailFn), delivery_worker:enqueue(bob, Act), delivery_worker:flush(bob), S = delivery_worker:state_srv(bob), Result = delivery_worker:attempts_for(<<1,2,3>>, S) =:= 1, CancelLeftover(bob), Result\") :name)") + +;; T3 — advancing the logical clock past the 30s backoff fires the +;; timer; handle_info({retry, Cid}) bumps attempts to 2 and arms +;; the next slot (backoff(2)=300s). Then cancel the new timer. +(epoch 12) +(eval "(get (erlang-eval-ast \"${SETUP}${CANCEL} delivery_worker:start_link(bob, FailFn), delivery_worker:enqueue(bob, Act), delivery_worker:flush(bob), receive after 31000 -> ok end, S = delivery_worker:state_srv(bob), Result = delivery_worker:attempts_for(<<1,2,3>>, S) =:= 2, CancelLeftover(bob), Result\") :name)") + +;; T4 — after the retry fires the worker has armed a fresh timer +;; for the next backoff slot. Confirm it's a live ref, then +;; cancel it. +(epoch 13) +(eval "(get (erlang-eval-ast \"${SETUP}${CANCEL} delivery_worker:start_link(bob, FailFn), delivery_worker:enqueue(bob, Act), delivery_worker:flush(bob), receive after 31000 -> ok end, S = delivery_worker:state_srv(bob), Result = is_reference(delivery_worker:timer_ref_for(<<1,2,3>>, S)), CancelLeftover(bob), Result\") :name)") + +;; T5 — successful retry path. Dispatch fails twice then succeeds +;; (ets-backed counter). 
After two backoff slots elapse +;; (30s, then 300s), the third attempt succeeds and +;; record_success_pure clears the per-cid bookkeeping. No new +;; timer is scheduled, so the scheduler terminates naturally. +(epoch 14) +(eval "(get (erlang-eval-ast \"${SETUP} ets:new(rt_ctr, [named_table, public]), ets:insert(rt_ctr, {n, 0}), Mixed = fun(_) -> [{n, N}] = ets:lookup(rt_ctr, n), ets:insert(rt_ctr, {n, N+1}), case N < 2 of true -> {error, transient}; false -> ok end end, delivery_worker:start_link(carol, Mixed), delivery_worker:enqueue(carol, Act), delivery_worker:flush(carol), receive after 31000 -> ok end, receive after 301000 -> ok end, S = delivery_worker:state_srv(carol), delivery_worker:pending(S) =:= [] andalso delivery_worker:attempts_for(<<1,2,3>>, S) =:= 0 andalso delivery_worker:timer_ref_for(<<1,2,3>>, S) =:= undefined\") :name)") +EPOCHS + +OUTPUT=$(timeout 900 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) + +check() { + local epoch="$1" desc="$2" expected="$3" + local actual + actual=$(echo "$OUTPUT" | awk -v e="$epoch" ' + $0 ~ "^\\(ok-len " e " " { getline; print; exit } + $0 ~ "^\\(ok " e " " { print; exit } + $0 ~ "^\\(error " e " " { print; exit } + ') + [ -z "$actual" ] && actual="" + if echo "$actual" | grep -qF -- "$expected"; then + PASS=$((PASS+1)) + [ "$VERBOSE" = "-v" ] && echo " ok $desc" + else + FAIL=$((FAIL+1)) + ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual +" + fi +} + +check 10 "T1 flush schedules a timer" "true" +check 11 "T2 initial flush bumps attempts to 1" "true" +check 12 "T3 timer fires; attempts=2" "true" +check 13 "T4 retry rearms next timer" "true" +check 14 "T5 success clears retry state" "true" + +TOTAL=$((PASS+FAIL)) +if [ $FAIL -eq 0 ]; then + echo "ok $PASS/$TOTAL next/tests/delivery_retry_timer.sh passed" +else + echo "FAIL $PASS/$TOTAL passed, $FAIL failed:" + echo "$ERRORS" + if [ "$VERBOSE" = "-v" ]; then + echo "--- sx_server output ---" + echo "$OUTPUT" | tail -40 + echo "---" + fi +fi +[ 
$FAIL -eq 0 ] diff --git a/plans/fed-sx-milestone-2.md b/plans/fed-sx-milestone-2.md index 85a363eb..c466f5e2 100644 --- a/plans/fed-sx-milestone-2.md +++ b/plans/fed-sx-milestone-2.md @@ -562,10 +562,24 @@ a dead-letter list visible via `/admin/dead-letter`. is cleared from `:next_retry`. `record_success_pure` clears both. `next_due_pure` returns cids whose retry time has passed. 11 cases in `delivery_retry.sh`. -- [ ] **8b-timer** — Erlang-side timer wiring (`erlang:send_after` - self-cast or equivalent). Needs the same substrate primitive - that `gen_server` uses for `timeout` returns. Defer behind - substrate gap discovery for now — see Blockers. +- [x] **8b-timer** — Erlang-side timer wiring on the + `delivery_worker` gen_server. handle_call(flush) drains then + arms a `send_after` self-cast per retried Cid (backoff from + the now-bumped attempt counter); handle_info({retry, Cid}) + redrives that single Cid through deliver_one_pure. Success + clears bookkeeping via record_success; failure bumps attempts + via record_failure_pure and arms the next backoff slot — or + promotes to dead-letter on the 6th attempt and stops arming. + A `:timers [{Cid, Ref}]` state field tracks live refs so + schedule_retry_for can cancel the previous one before arming + the next (otherwise stale timers keep the scheduler's run + loop alive long after the work is done). 5/5 in + `delivery_retry_timer.sh`: T1 timer scheduled, T2 attempts=1, + T3 retry fires + attempts=2, T4 next timer rearmed, T5 ets- + counter dispatch (fail/fail/ok) lands in 3 attempts and + clears state. Substrate dependency landed via cherry-pick + from `loops/erlang` (3709460d / 98b0104c / 779e53b2) until + `loops/erlang` → architecture catches up. - [x] **8c** — Delivery-state projection (`next/kernel/delivery_state.erl`). Folds delivery events into per-peer worker-shaped snapshots so the outbound queue survives @@ -1105,8 +1119,16 @@ proceed. 
through `delivery_worker`) and Step 10c (peer-actor doc fetch in `peer_actors`) are now unblocked. -3. **`erlang:send_after`-style timer primitive** — discovered - during Step 8b prep. The retry loop needs a way for the +3. **`erlang:send_after`-style timer primitive** — ~~discovered + during Step 8b prep~~ **RESOLVED 2026-06-30** via the + `loops/erlang` `send_after`/`cancel_timer`/`monotonic_time` + work landing on `origin/loops/erlang` (commits 3709460d, + 98b0104c, b10e55f0; 766/766 → 771/771). m2 cherry-picked all + three onto this branch so 8b-timer could land without waiting + for `loops/erlang` → architecture; the cherry-picks fall away + as no-op duplicates when architecture catches up. Original + diagnosis preserved below for the audit trail. + The retry loop needs a way for the delivery_worker to wake itself up after `backoff_for(N)` seconds. Erlang's `erlang:send_after/3` is the standard primitive; this port doesn't seem to register it (looked at @@ -1241,6 +1263,31 @@ proceed. Newest first. +- **2026-06-30** — Step 8b-timer closed. Cherry-picked the three + `loops/erlang` send_after commits onto m2 (3709460d, 98b0104c, + 779e53b2 — the substrate landed standalone on origin/loops/erlang + earlier and hadn't propagated to origin/architecture yet). Wired + the live timer loop in `next/kernel/delivery_worker.erl`: a + `:timers [{Cid, Ref}]` state field; `handle_call(flush)` drains + then arms a `send_after` self-cast per retried Cid; the new + `handle_info({retry, Cid})` callback redrives that one Cid through + `deliver_one_pure` and either records success / clears state, or + bumps and arms the next backoff slot (or dead-letters on the 6th + attempt). Two arm-paths split — `arm_retry_timer` (post-drain, + attempts already bumped) vs `schedule_retry_for` (post-retry + attempt, needs to bump). `cancel_timer_for/2` clears the previous + timer before arming the next so stale timers don't keep the + scheduler's run loop alive after the work is done. 
Two new public + APIs for tests: `state_srv/1` returns the worker's full state, + `timer_ref_for/2` looks up a Cid's live ref. 5/5 in new + `delivery_retry_timer.sh` (T1 timer scheduled, T2 attempts=1, T3 + retry fires + attempts=2, T4 next timer rearmed, T5 ets-counter + dispatch fail/fail/ok lands in 3 attempts and clears state). + Existing `delivery_worker.sh` 17/17 and `delivery_retry.sh` 11/11 + still green. Conformance gate 771/771 (was 761/761; the +10 is + the cherry-picked send_after suite). Blockers #3 RESOLVED. + Reply shape of `flush` unchanged; no caller updates needed. + - **2026-06-28** — Merge-prep pass. Conformance 761/761 still green on m2 tip `cd0de8cb`. Both smoke tests still pass cold: `next/tests/smoke_kernel_route.sh` 6/6 (port 54471, listener up