From 3dbb3e318a517f7542ab39f996bfd6e434bb79cd Mon Sep 17 00:00:00 2001 From: giles Date: Sun, 28 Jun 2026 17:44:19 +0000 Subject: [PATCH 1/4] erlang: erlang:send_after/3 + cancel_timer/1 + monotonic_time (T1+T2, 766/766) Logical-clock timer wheel in the scheduler. send_after schedules a message-delivery event at an absolute deadline (clock + Time ms); cancel_timer marks a live timer cancelled and reports remaining ms, or false. Time advances only when the runnable queue drains, jumping to the earliest pending deadline (deterministic, no wall clock). monotonic_time/0,1 exposes the logical ms clock. Co-Authored-By: Claude Opus 4.8 (1M context) --- lib/erlang/conformance.sh | 4 + lib/erlang/runtime.sx | 249 ++++++++++++++++++++++++++++++--- lib/erlang/scoreboard.json | 7 +- lib/erlang/scoreboard.md | 3 +- lib/erlang/tests/send_after.sx | 80 +++++++++++ lib/erlang/transpile.sx | 14 +- 6 files changed, 326 insertions(+), 31 deletions(-) create mode 100644 lib/erlang/tests/send_after.sx diff --git a/lib/erlang/conformance.sh b/lib/erlang/conformance.sh index c4a56a0d..334b6d36 100755 --- a/lib/erlang/conformance.sh +++ b/lib/erlang/conformance.sh @@ -38,6 +38,7 @@ SUITES=( "fib|er-fib-test-pass|er-fib-test-count" "ffi|er-ffi-test-pass|er-ffi-test-count" "vm|er-vm-test-pass|er-vm-test-count" + "send_after|er-sa-test-pass|er-sa-test-count" ) cat > "$TMPFILE" << 'EPOCHS' @@ -61,6 +62,7 @@ cat > "$TMPFILE" << 'EPOCHS' (load "lib/erlang/vm/dispatcher.sx") (load "lib/erlang/tests/ffi.sx") (load "lib/erlang/tests/vm.sx") +(load "lib/erlang/tests/send_after.sx") (epoch 100) (eval "(list er-test-pass er-test-count)") (epoch 101) @@ -83,6 +85,8 @@ cat > "$TMPFILE" << 'EPOCHS' (eval "(list er-ffi-test-pass er-ffi-test-count)") (epoch 110) (eval "(list er-vm-test-pass er-vm-test-count)") +(epoch 111) +(eval "(list er-sa-test-pass er-sa-test-count)") EPOCHS timeout 600 "$SX_SERVER" < "$TMPFILE" > "$OUTFILE" 2>&1 diff --git a/lib/erlang/runtime.sx b/lib/erlang/runtime.sx index 
d8fe6eca..870255b9 100644 --- a/lib/erlang/runtime.sx +++ b/lib/erlang/runtime.sx @@ -135,6 +135,56 @@ (dict-set! s :next-ref (+ n 1)) (er-mk-ref n))))) +;; ── logical clock + timer wheel ────────────────────────────────── +;; The scheduler runs a synchronous model: logical time advances only +;; when the runnable queue drains (see `er-sched-advance-time!`). The +;; clock is in milliseconds, monotonic, never derived from wall time +;; — deterministic and time-travel-safe. `send_after` schedules a +;; message-delivery event at an absolute deadline; `receive after Ms` +;; schedules a timeout event the same way. When no process is runnable +;; the scheduler jumps the clock to the earliest pending deadline and +;; fires that single event, then re-runs. +(define er-clock (fn () (get (er-sched) :clock))) + +;; Advance the clock to `ms`, but never backwards (monotonicity). +(define + er-clock-set! + (fn (ms) (dict-set! (er-sched) :clock (max (er-clock) ms)))) + +(define er-sched-timers (fn () (get (er-sched) :timers))) + +;; Register a timer event. `dest` is a pid or registered-atom value, +;; resolved to a live process at fire time. Returns the timer ref. +(define + er-timer-add! + (fn + (deadline dest msg ref) + (append! + (er-sched-timers) + {:ref ref :deadline deadline :dest dest :msg msg :alive true}) + ref)) + +;; Find the live timer with the given ref, or nil. +(define + er-timer-find-alive + (fn + (ref) + (let + ((ts (er-sched-timers)) (found (list nil))) + (for-each + (fn + (i) + (let + ((t (nth ts i))) + (when + (and + (= (nth found 0) nil) + (get t :alive) + (er-ref-equal? (get t :ref) ref)) + (set-nth! 
found 0 t)))) + (range 0 (len ts))) + (nth found 0)))) + ;; ── scheduler state ────────────────────────────────────────────── (define er-scheduler (list nil)) @@ -151,6 +201,8 @@ :processes {} :registered {} :ets {} + :clock 0 + :timers (list) :runnable (er-q-new)}))) (define er-sched (fn () (nth er-scheduler 0))) @@ -217,6 +269,7 @@ :trap-exit false :has-timeout false :timed-out false + :timeout-deadline nil :exit-reason nil})) (dict-set! (er-sched-processes) (er-pid-key pid) proc) (er-sched-enqueue! pid) @@ -456,6 +509,69 @@ (error "Erlang: make_ref/0: arity") (er-ref-new!)))) +;; ── timer BIFs ─────────────────────────────────────────────────── +;; erlang:send_after(Time, Dest, Msg) -> Ref +;; Schedules Msg to be delivered to Dest after Time ms (logical). +;; Time must be a non-negative integer; Dest a pid or registered +;; atom name. Returns a fresh timer reference. +(define + er-bif-send-after + (fn + (vs) + (let + ((time (nth vs 0)) (dest (nth vs 1)) (msg (nth vs 2))) + (cond + (not (and (= (type-of time) "number") (>= time 0))) + (raise (er-mk-error-marker (er-mk-atom "badarg"))) + (not (or (er-pid? dest) (er-atom? dest))) + (raise (er-mk-error-marker (er-mk-atom "badarg"))) + :else + (er-timer-add! + (+ (er-clock) (truncate time)) + dest + msg + (er-ref-new!)))))) + +;; erlang:cancel_timer(Ref) -> RemainingMs | false +;; For a live (not-yet-fired) timer, marks it cancelled and returns +;; the milliseconds left until its deadline. For an already-fired, +;; already-cancelled, or unknown ref, returns the atom `false`. +(define + er-bif-cancel-timer + (fn + (vs) + (let + ((ref (nth vs 0))) + (cond + (not (er-ref? ref)) + (raise (er-mk-error-marker (er-mk-atom "badarg"))) + :else + (let + ((t (er-timer-find-alive ref))) + (cond + (= t nil) (er-mk-atom "false") + :else (do + (dict-set! 
t :alive false) + (max 0 (- (get t :deadline) (er-clock)))))))))) + +;; erlang:monotonic_time() | erlang:monotonic_time(Unit) -> Integer +;; Returns the scheduler's logical monotonic clock in milliseconds. +;; Unit (millisecond / second / native) is accepted for API +;; compatibility; all units report from the same ms-resolution clock. +(define + er-bif-monotonic-time + (fn + (vs) + (cond + (= (len vs) 0) (er-clock) + (and (= (len vs) 1) (er-atom? (nth vs 0))) + (let + ((unit (get (nth vs 0) :name))) + (cond + (= unit "second") (truncate (/ (er-clock) 1000)) + :else (er-clock))) + :else (raise (er-mk-error-marker (er-mk-atom "badarg")))))) + ;; Add `target` to `pid`'s :links list if not already there. (define er-link-add-one! @@ -664,37 +780,122 @@ (cond (not (= pid nil)) (do (er-sched-step! pid) (er-sched-run-all!)) - ;; Queue empty — fire one pending receive-with-timeout and go again. - (er-sched-fire-one-timeout!) (er-sched-run-all!) + ;; Queue empty — advance logical time to the next pending + ;; deadline (timer delivery or receive-timeout) and go again. + (er-sched-advance-time!) (er-sched-run-all!) :else nil)))) -;; Wake one waiting process whose receive had an `after Ms` clause. -;; Returns true if one fired. In our synchronous model "time passes" -;; once the runnable queue drains — timeouts only fire then. +;; ── time advance ───────────────────────────────────────────────── +;; Called when the runnable queue is empty. Two kinds of pending event +;; carry a deadline: live `send_after` timers and waiting processes in +;; a `receive ... after Ms` block. Find the single earliest deadline +;; across both, jump the clock to it, and fire just that one event +;; (timer wins ties — a message delivered exactly at the timeout +;; arrives "first"). Returns true if an event fired, false when there +;; is nothing left to wake (genuine idle / termination). (define - er-sched-fire-one-timeout! + er-sched-advance-time! 
(fn () (let - ((ks (keys (er-sched-processes))) (fired (list false))) + ((best (er-sched-next-event))) + (cond + (= best nil) false + :else (do + (er-clock-set! (get best :deadline)) + (cond + (= (get best :kind) "timer") + (er-timer-fire! (get best :timer)) + :else (er-recv-timeout-fire! (get best :proc))) + true))))) + +;; Scan timers and waiting-with-timeout processes for the earliest +;; deadline. Returns {:kind "timer"|"recv" :deadline D ...} or nil. +(define + er-sched-next-event + (fn + () + (let + ((best (list nil))) + (for-each + (fn + (i) + (let + ((t (nth (er-sched-timers) i))) + (when + (get t :alive) + (er-event-consider! + best + {:kind "timer" :deadline (get t :deadline) :timer t})))) + (range 0 (len (er-sched-timers)))) (for-each (fn (k) - (when - (not (nth fired 0)) - (let - ((p (get (er-sched-processes) k))) - (when - (and - (= (get p :state) "waiting") - (get p :has-timeout)) - (dict-set! p :timed-out true) - (dict-set! p :has-timeout false) - (dict-set! p :state "runnable") - (er-sched-enqueue! (get p :pid)) - (set-nth! fired 0 true))))) - ks) - (nth fired 0)))) + (let + ((p (get (er-sched-processes) k))) + (when + (and (= (get p :state) "waiting") (get p :has-timeout)) + (er-event-consider! + best + {:kind "recv" + :deadline (get p :timeout-deadline) + :proc p})))) + (keys (er-sched-processes))) + (nth best 0)))) + +;; Keep the earlier-deadline candidate in the single-cell `best`. +;; Strictly-earlier replaces; equal deadlines keep the incumbent so a +;; timer registered first (and timers over recv-timeouts) win ties. +(define + er-event-consider! + (fn + (best cand) + (when + (or + (= (nth best 0) nil) + (< (get cand :deadline) (get (nth best 0) :deadline))) + (set-nth! best 0 cand)))) + +;; Deliver a fired timer's message to its destination and retire it. +;; Destination is resolved at fire time; a dead/missing target (or an +;; unregistered name) silently drops the message, as in real Erlang. +(define + er-timer-fire! 
+ (fn + (t) + (dict-set! t :alive false) + (let + ((pid (er-timer-resolve-dest (get t :dest)))) + (when + (and (not (= pid nil)) (er-proc-exists? pid)) + (er-proc-mailbox-push! pid (get t :msg)) + (when + (= (er-proc-field pid :state) "waiting") + (er-proc-set! pid :state "runnable") + (er-sched-enqueue! pid)))))) + +;; Non-raising destination resolver for timer delivery. +(define + er-timer-resolve-dest + (fn + (v) + (cond + (er-pid? v) v + (er-atom? v) + (let + ((name (get v :name))) + (if (dict-has? (er-registered) name) (get (er-registered) name) nil)) + :else nil))) + +;; Wake a process whose `receive ... after Ms` deadline elapsed. +(define + er-recv-timeout-fire! + (fn + (p) + (dict-set! p :timed-out true) + (dict-set! p :has-timeout false) + (dict-set! p :state "runnable") + (er-sched-enqueue! (get p :pid)))) (define er-sched-step! @@ -1785,6 +1986,10 @@ (er-register-bif! "erlang" "exit" 1 er-bif-exit) (er-register-bif! "erlang" "exit" 2 er-bif-exit) (er-register-bif! "erlang" "make_ref" 0 er-bif-make-ref) + (er-register-bif! "erlang" "send_after" 3 er-bif-send-after) + (er-register-bif! "erlang" "cancel_timer" 1 er-bif-cancel-timer) + (er-register-bif! "erlang" "monotonic_time" 0 er-bif-monotonic-time) + (er-register-bif! "erlang" "monotonic_time" 1 er-bif-monotonic-time) (er-register-bif! "erlang" "link" 1 er-bif-link) (er-register-bif! "erlang" "unlink" 1 er-bif-unlink) (er-register-bif! 
"erlang" "monitor" 2 er-bif-monitor) diff --git a/lib/erlang/scoreboard.json b/lib/erlang/scoreboard.json index a86b5fc6..3f06c462 100644 --- a/lib/erlang/scoreboard.json +++ b/lib/erlang/scoreboard.json @@ -1,7 +1,7 @@ { "language": "erlang", - "total_pass": 761, - "total": 761, + "total_pass": 766, + "total": 766, "suites": [ {"name":"tokenize","pass":62,"total":62,"status":"ok"}, {"name":"parse","pass":52,"total":52,"status":"ok"}, @@ -13,6 +13,7 @@ {"name":"echo","pass":7,"total":7,"status":"ok"}, {"name":"fib","pass":8,"total":8,"status":"ok"}, {"name":"ffi","pass":37,"total":37,"status":"ok"}, - {"name":"vm","pass":78,"total":78,"status":"ok"} + {"name":"vm","pass":78,"total":78,"status":"ok"}, + {"name":"send_after","pass":5,"total":5,"status":"ok"} ] } diff --git a/lib/erlang/scoreboard.md b/lib/erlang/scoreboard.md index bd4087cc..e83b5e9a 100644 --- a/lib/erlang/scoreboard.md +++ b/lib/erlang/scoreboard.md @@ -1,6 +1,6 @@ # Erlang-on-SX Scoreboard -**Total: 761 / 761 tests passing** +**Total: 766 / 766 tests passing** | | Suite | Pass | Total | |---|---|---|---| @@ -15,6 +15,7 @@ | ✅ | fib | 8 | 8 | | ✅ | ffi | 37 | 37 | | ✅ | vm | 78 | 78 | +| ✅ | send_after | 5 | 5 | Generated by `lib/erlang/conformance.sh`. diff --git a/lib/erlang/tests/send_after.sx b/lib/erlang/tests/send_after.sx new file mode 100644 index 00000000..1a83eff8 --- /dev/null +++ b/lib/erlang/tests/send_after.sx @@ -0,0 +1,80 @@ +;; erlang:send_after / cancel_timer — timer primitives. +;; +;; A process schedules a message to itself (or another pid / registered +;; name) after N logical milliseconds. `cancel_timer` removes a pending +;; timer and reports the time left. These are the same primitives the +;; gen_server library uses to implement `{noreply, State, Timeout}`. +;; +;; The scheduler runs a synchronous logical clock (see runtime.sx +;; `er-sched-advance-time!`): time advances only when the runnable +;; queue drains, jumping to the earliest pending deadline. 
That makes +;; delivery deterministic and time-travel-safe — no wall clock. + +(define er-sa-test-count 0) +(define er-sa-test-pass 0) +(define er-sa-test-fails (list)) + +(define + er-sa-test + (fn + (name actual expected) + (set! er-sa-test-count (+ er-sa-test-count 1)) + (if + (= actual expected) + (set! er-sa-test-pass (+ er-sa-test-pass 1)) + (append! + er-sa-test-fails + {:actual actual :expected expected :name name})))) + +(define er-sa-pred + (fn (name actual) (er-sa-test name (if actual true false) true))) + +(define sa-ev erlang-eval-ast) + +;; ── T1 — schedule a self-message, receive it after the deadline ── +;; send_after returns a reference handle. +(er-sa-pred + "T1 send_after returns a ref" + (er-ref? + (sa-ev "erlang:send_after(50, self(), hello)"))) + +;; The scheduled message lands and a plain receive picks it up. +(er-sa-test + "T1 delivered message received" + (get + (sa-ev + "erlang:send_after(50, self(), hello), + receive M -> M end") + :name) + "hello") + +;; Logical time advances exactly to the timer deadline (50ms) by the +;; time the message is received — round-trip latency well under 100ms. +(er-sa-test + "T1 clock at deadline on receipt" + (sa-ev + "erlang:send_after(50, self(), hello), + receive hello -> erlang:monotonic_time() end") + 50) + +;; ── T2 — cancel_timer returns remaining ms; message never arrives ── +;; Cancel immediately after scheduling: clock has not advanced, so the +;; full duration (~1000ms) is reported as remaining. +(er-sa-test + "T2 cancel returns remaining ms" + (sa-ev + "Ref = erlang:send_after(1000, self(), late), + erlang:cancel_timer(Ref)") + 1000) + +;; The cancelled timer never delivers — the receive falls through to +;; its `after` clause and returns `none`. 
+(er-sa-test + "T2 cancelled message never arrives" + (get + (sa-ev + "Ref = erlang:send_after(1000, self(), late), + erlang:cancel_timer(Ref), + receive late -> got after 50 -> none end") + :name) + "none") diff --git a/lib/erlang/transpile.sx b/lib/erlang/transpile.sx index 12e14b6f..52b99c6c 100644 --- a/lib/erlang/transpile.sx +++ b/lib/erlang/transpile.sx @@ -1147,7 +1147,7 @@ (and (er-atom? ms) (= (get ms :name) "infinity")) (er-eval-receive-loop node pid env) (= ms 0) (er-eval-receive-poll node pid env) - :else (er-eval-receive-timed node pid env))))) + :else (er-eval-receive-timed node pid env (+ (er-clock) ms)))))) ;; after 0 — poll once; on no match, run the after-body immediately. (define @@ -1161,12 +1161,15 @@ (get r :value) (er-eval-body (get node :after-body) env))))) -;; after Ms — suspend; on resume check :timed-out. When the scheduler -;; runs out of other work it fires one pending timeout per round. +;; after Ms — suspend with an absolute `deadline` (logical ms). On +;; resume check :timed-out: the scheduler fires the earliest pending +;; deadline once the runnable queue drains. A non-matching message can +;; wake the process early; it re-suspends on the SAME deadline so the +;; timeout window is not extended. (define er-eval-receive-timed (fn - (node pid env) + (node pid env deadline) (let ((r (er-try-receive (get node :clauses) pid env))) (if @@ -1174,6 +1177,7 @@ (get r :value) (do (er-proc-set! pid :has-timeout true) + (er-proc-set! pid :timeout-deadline deadline) (call/cc (fn (k) @@ -1186,7 +1190,7 @@ (er-proc-set! pid :timed-out false) (er-proc-set! pid :has-timeout false) (er-eval-body (get node :after-body) env)) - (er-eval-receive-timed node pid env))))))) + (er-eval-receive-timed node pid env deadline))))))) ;; Scan mailbox in arrival order. For each msg, try every clause. ;; On first match: remove that msg from mailbox and return body value. 
From d09c0048c79470557193b1489d44ddb4974652c7 Mon Sep 17 00:00:00 2001 From: giles Date: Sun, 28 Jun 2026 17:48:47 +0000 Subject: [PATCH 2/4] erlang: send_after deadline-ordering + cancel-of-fired tests (T3+T4, 769/769) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit T3 — concurrent timers fire in deadline order, not schedule order (scheduler jumps the clock to the earliest pending deadline each time the runnable queue drains). T4 — cancel_timer on an already-fired timer returns the atom false. Co-Authored-By: Claude Opus 4.8 (1M context) --- lib/erlang/scoreboard.json | 6 ++--- lib/erlang/scoreboard.md | 4 ++-- lib/erlang/tests/send_after.sx | 40 ++++++++++++++++++++++++++++++++++ 3 files changed, 45 insertions(+), 5 deletions(-) diff --git a/lib/erlang/scoreboard.json b/lib/erlang/scoreboard.json index 3f06c462..ede12189 100644 --- a/lib/erlang/scoreboard.json +++ b/lib/erlang/scoreboard.json @@ -1,7 +1,7 @@ { "language": "erlang", - "total_pass": 766, - "total": 766, + "total_pass": 769, + "total": 769, "suites": [ {"name":"tokenize","pass":62,"total":62,"status":"ok"}, {"name":"parse","pass":52,"total":52,"status":"ok"}, @@ -14,6 +14,6 @@ {"name":"fib","pass":8,"total":8,"status":"ok"}, {"name":"ffi","pass":37,"total":37,"status":"ok"}, {"name":"vm","pass":78,"total":78,"status":"ok"}, - {"name":"send_after","pass":5,"total":5,"status":"ok"} + {"name":"send_after","pass":8,"total":8,"status":"ok"} ] } diff --git a/lib/erlang/scoreboard.md b/lib/erlang/scoreboard.md index e83b5e9a..7afac274 100644 --- a/lib/erlang/scoreboard.md +++ b/lib/erlang/scoreboard.md @@ -1,6 +1,6 @@ # Erlang-on-SX Scoreboard -**Total: 766 / 766 tests passing** +**Total: 769 / 769 tests passing** | | Suite | Pass | Total | |---|---|---|---| @@ -15,7 +15,7 @@ | ✅ | fib | 8 | 8 | | ✅ | ffi | 37 | 37 | | ✅ | vm | 78 | 78 | -| ✅ | send_after | 5 | 5 | +| ✅ | send_after | 8 | 8 | Generated by `lib/erlang/conformance.sh`. 
diff --git a/lib/erlang/tests/send_after.sx b/lib/erlang/tests/send_after.sx index 1a83eff8..7b150f18 100644 --- a/lib/erlang/tests/send_after.sx +++ b/lib/erlang/tests/send_after.sx @@ -78,3 +78,43 @@ receive late -> got after 50 -> none end") :name) "none") + +;; ── T3 — multiple timers fire in deadline order, not schedule order ── +;; `b` is scheduled first (deadline 80) but `a` second (deadline 20). +;; Two plain receives drain the mailbox in arrival order — and arrival +;; is governed by deadline, so the first message out is `a`. +(er-sa-test + "T3 timers fire in deadline order" + (er-format-value + (sa-ev + "erlang:send_after(80, self(), b), + erlang:send_after(20, self(), a), + X = receive M1 -> M1 end, + Y = receive M2 -> M2 end, + {X, Y}")) + "{a,b}") + +;; A selective receive on `a` matches the earlier-deadline timer even +;; though `b` was scheduled first. +(er-sa-test + "T3 selective receive picks earliest deadline" + (get + (sa-ev + "erlang:send_after(80, self(), b), + erlang:send_after(20, self(), a), + receive a -> first end") + :name) + "first") + +;; ── T4 — cancel_timer on an already-fired timer returns false ────── +;; Once `x` has been received the timer has fired; cancelling its ref +;; now yields the atom `false`. +(er-sa-test + "T4 cancel of fired timer is false" + (get + (sa-ev + "Ref = erlang:send_after(20, self(), x), + receive x -> ok end, + erlang:cancel_timer(Ref)") + :name) + "false") From 779e53b2a84c860263ab02f900c5c7e7508deb06 Mon Sep 17 00:00:00 2001 From: giles Date: Sun, 28 Jun 2026 17:53:08 +0000 Subject: [PATCH 3/4] erlang: send_after to registered name + gen_server timeout returns (T5+T6, 771/771) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit T5 — send_after addresses a registered atom name; the delayed message lands in that process's mailbox (destination resolved at fire time, dead/unregistered targets drop silently). 
T6 — gen_server loop now handles the {reply,R,S,T} / {noreply,S,T} timeout-bearing callback returns by scheduling {timeout} to itself via send_after; handle_info({timeout}, S) fires when no other message arrives first. Sanity-checks the library hookup. Co-Authored-By: Claude Opus 4.8 (1M context) --- lib/erlang/runtime.sx | 13 ++++++++++ lib/erlang/scoreboard.json | 6 ++--- lib/erlang/scoreboard.md | 4 ++-- lib/erlang/tests/send_after.sx | 43 ++++++++++++++++++++++++++++++++++ 4 files changed, 61 insertions(+), 5 deletions(-) diff --git a/lib/erlang/runtime.sx b/lib/erlang/runtime.sx index 870255b9..e297b1ce 100644 --- a/lib/erlang/runtime.sx +++ b/lib/erlang/runtime.sx @@ -1378,8 +1378,15 @@ {reply, Reply, NewState} -> From ! {Ref, Reply}, gen_server:loop(Mod, NewState); + {reply, Reply, NewState, Timeout} -> + From ! {Ref, Reply}, + erlang:send_after(Timeout, self(), {timeout}), + gen_server:loop(Mod, NewState); {noreply, NewState} -> gen_server:loop(Mod, NewState); + {noreply, NewState, Timeout} -> + erlang:send_after(Timeout, self(), {timeout}), + gen_server:loop(Mod, NewState); {stop, Reason, Reply, NewState} -> From ! 
{Ref, Reply}, exit(Reason) @@ -1387,11 +1394,17 @@ {'$gen_cast', Msg} -> case Mod:handle_cast(Msg, State) of {noreply, NewState} -> gen_server:loop(Mod, NewState); + {noreply, NewState, Timeout} -> + erlang:send_after(Timeout, self(), {timeout}), + gen_server:loop(Mod, NewState); {stop, Reason, NewState} -> exit(Reason) end; Other -> case Mod:handle_info(Other, State) of {noreply, NewState} -> gen_server:loop(Mod, NewState); + {noreply, NewState, Timeout} -> + erlang:send_after(Timeout, self(), {timeout}), + gen_server:loop(Mod, NewState); {stop, Reason, NewState} -> exit(Reason) end end.") diff --git a/lib/erlang/scoreboard.json b/lib/erlang/scoreboard.json index ede12189..614cd84c 100644 --- a/lib/erlang/scoreboard.json +++ b/lib/erlang/scoreboard.json @@ -1,7 +1,7 @@ { "language": "erlang", - "total_pass": 769, - "total": 769, + "total_pass": 771, + "total": 771, "suites": [ {"name":"tokenize","pass":62,"total":62,"status":"ok"}, {"name":"parse","pass":52,"total":52,"status":"ok"}, @@ -14,6 +14,6 @@ {"name":"fib","pass":8,"total":8,"status":"ok"}, {"name":"ffi","pass":37,"total":37,"status":"ok"}, {"name":"vm","pass":78,"total":78,"status":"ok"}, - {"name":"send_after","pass":8,"total":8,"status":"ok"} + {"name":"send_after","pass":10,"total":10,"status":"ok"} ] } diff --git a/lib/erlang/scoreboard.md b/lib/erlang/scoreboard.md index 7afac274..a5daa145 100644 --- a/lib/erlang/scoreboard.md +++ b/lib/erlang/scoreboard.md @@ -1,6 +1,6 @@ # Erlang-on-SX Scoreboard -**Total: 769 / 769 tests passing** +**Total: 771 / 771 tests passing** | | Suite | Pass | Total | |---|---|---|---| @@ -15,7 +15,7 @@ | ✅ | fib | 8 | 8 | | ✅ | ffi | 37 | 37 | | ✅ | vm | 78 | 78 | -| ✅ | send_after | 8 | 8 | +| ✅ | send_after | 10 | 10 | Generated by `lib/erlang/conformance.sh`. 
diff --git a/lib/erlang/tests/send_after.sx b/lib/erlang/tests/send_after.sx index 7b150f18..98810709 100644 --- a/lib/erlang/tests/send_after.sx +++ b/lib/erlang/tests/send_after.sx @@ -118,3 +118,46 @@ erlang:cancel_timer(Ref)") :name) "false") + +;; ── T5 — send_after to a registered atom name ────────────────────── +;; A second process registers itself as `srv`; the timer addresses it +;; by name, and the delayed message lands in that process's mailbox. +;; The server forwards what it got back to the parent for inspection. +(er-sa-test + "T5 timer delivers to registered name" + (get + (sa-ev + "Me = self(), + Pid = spawn(fun () -> receive M -> Me ! {got, M} end end), + register(srv, Pid), + erlang:send_after(20, srv, ping), + receive {got, X} -> X end") + :name) + "ping") + +;; ── T6 — gen_server {noreply, State, Timeout} hookup ─────────────── +;; A gen_server that, on the `arm` cast, returns {noreply, S, 100}. +;; The library schedules {timeout} to itself via send_after; when no +;; other message arrives first, handle_info({timeout}, S) fires. The +;; handler signals the parent so we can confirm the timeout landed. +(do + (er-load-gen-server!) + (erlang-load-module + "-module(sa_tmo). + init(Me) -> {ok, Me}. + handle_call(_R, _F, S) -> {reply, ok, S}. + handle_cast(arm, Me) -> {noreply, Me, 100}. + handle_info({timeout}, Me) -> Me ! 
fired, {noreply, Me}; + handle_info(_M, S) -> {noreply, S}.") + nil) + +(er-sa-test + "T6 gen_server timeout fires handle_info" + (get + (sa-ev + "Me = self(), + P = gen_server:start_link(sa_tmo, Me), + gen_server:cast(P, arm), + receive fired -> ok after 5000 -> timeout end") + :name) + "ok") From 4da2a98c305ce5a575c1dd5151afc7c14f96b9e3 Mon Sep 17 00:00:00 2001 From: giles Date: Tue, 30 Jun 2026 14:05:31 +0000 Subject: [PATCH 4/4] =?UTF-8?q?fed-sx-m2:=20Step=208b-timer=20=E2=80=94=20?= =?UTF-8?q?live=20retry-loop=20wiring=20on=20send=5Fafter?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires the delivery_worker's retry loop on top of the erlang:send_after / cancel_timer primitives just landed on loops/erlang (3709460d, 98b0104c, 779e53b2 — cherry-picked here since origin/architecture hasn't caught up yet). Surface: - new :timers [{Cid, Ref}] state field tracks live timer refs - handle_call(flush): drain (existing semantics) + arm_retry_timer per retried Cid (computes backoff slot from the now-bumped attempt count, sets next_retry_at, send_after self-cast). Reply shape unchanged. - handle_info({retry, Cid}, S): redrives that one Cid through deliver_one_pure. Success → record_success_pure + clear pending. Failure → schedule_retry_for (which bumps attempts, dead-letters on slot 6, or arms next slot). - cancel_timer_for/2 before arming a new timer so stale timers don't keep the scheduler's run loop alive after the work is done. - state_srv/1 + timer_ref_for/2 for test introspection. 5/5 in new delivery_retry_timer.sh; existing delivery_worker.sh 17/17 and delivery_retry.sh 11/11 still green. Conformance gate 771/771 (was 761/761; the +10 is the cherry-picked send_after suite). Closes Blockers #3. m2 is now feature-complete. 
Co-Authored-By: Claude Opus 4.7 --- next/kernel/delivery_worker.erl | 146 ++++++++++++++++++++++++++++- next/tests/delivery_retry_timer.sh | 131 ++++++++++++++++++++++++++ plans/fed-sx-milestone-2.md | 59 ++++++++++-- 3 files changed, 327 insertions(+), 9 deletions(-) create mode 100755 next/tests/delivery_retry_timer.sh diff --git a/next/kernel/delivery_worker.erl b/next/kernel/delivery_worker.erl index c2912b39..073ea0e6 100644 --- a/next/kernel/delivery_worker.erl +++ b/next/kernel/delivery_worker.erl @@ -5,9 +5,10 @@ backoff_for/1, schedule_for/1, record_failure_pure/3, record_success_pure/2, next_due_pure/2, attempts_for/2, next_retry_at/2, - dead_letter_list/1, + dead_letter_list/1, timer_ref_for/2, start_link/1, start_link/2, stop/1, - enqueue/2, flush/1, pending_srv/1, set_dispatch_fn/2]). + enqueue/2, flush/1, pending_srv/1, set_dispatch_fn/2, + state_srv/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2]). %% Outbound delivery worker per design §13.4. One gen_server per @@ -49,6 +50,7 @@ new(PeerId) -> {attempts, []}, {next_retry, []}, {dead_letter, []}, + {timers, []}, {dispatch_fn, undefined}]. pending(State) -> field(pending, State). @@ -183,6 +185,16 @@ next_retry_at(Cid, State) -> dead_letter_list(State) -> field(dead_letter, State). +%% Step 8b-timer: per-cid timer ref accessor. Exposed for tests so +%% they can assert a retry timer was scheduled (or wasn't, after a +%% success / dead-letter). Returns the live Ref or undefined. + +timer_ref_for(Cid, State) -> + case find_keyed(Cid, field(timers, State)) of + {ok, Ref} -> Ref; + _ -> undefined + end. + move_to_dead_letter(Cid, State) -> Pending = field(pending, State), {Match, Rest} = take_by_cid(Cid, Pending, [], []), @@ -229,6 +241,13 @@ pending_srv(PeerId) -> set_dispatch_fn(PeerId, Fn) -> gen_server:call(PeerId, {set_dispatch_fn, Fn}). 
+%% Step 8b-timer: return the worker's full state so tests can use the +%% pure introspection functions (attempts_for / next_retry_at / +%% timer_ref_for / dead_letter_list) against it. + +state_srv(PeerId) -> + gen_server:call(PeerId, get_state). + %% gen_server callbacks init([PeerId, DispatchFn]) -> @@ -238,17 +257,138 @@ init([PeerId, DispatchFn]) -> handle_call({enqueue, Activity}, _From, State) -> {reply, ok, enqueue_pure(field(peer, State), Activity, State)}; handle_call(flush, _From, State) -> - {NewState, Delivered, Retry} = drain_pure(State), + %% Step 8b-timer: drain (which already bumps :attempts via + %% bump_attempt on each failed deliver), then for each retried + %% Cid compute the backoff slot from the now-current attempt + %% count, set NextRetryAt, and arm a send_after self-cast. + %% handle_info({retry, Cid}, ...) fires when the slot elapses. + %% Reply shape unchanged. + {DrainState, Delivered, Retry} = drain_pure(State), + Now = monotonic_seconds(), + NewState = lists:foldl( + fun(Cid, S) -> arm_retry_timer(Cid, Now, S) end, + DrainState, Retry), {reply, {ok, Delivered, Retry}, NewState}; handle_call(get_pending, _From, State) -> {reply, field(pending, State), State}; +handle_call(get_state, _From, State) -> + {reply, State, State}; handle_call({set_dispatch_fn, Fn}, _From, State) -> {reply, ok, set_field(dispatch_fn, Fn, State)}. handle_cast(_, S) -> {noreply, S}. +%% Step 8b-timer: a retry timer fired. Pull the activity by Cid from +%% the pending queue (it might have been drained meanwhile by a +%% concurrent flush — if so, we just clear bookkeeping and exit). +%% Run deliver_one_pure: success clears retry state; failure bumps +%% the counter and schedules the next slot — or dead-letters if the +%% sixth attempt failed. + +handle_info({retry, Cid}, State) -> + %% Clear the timer ref we just consumed. 
+ State0 = clear_timer_ref(Cid, State), + case take_by_cid(Cid, field(pending, State0), [], 0) of + {none, _} -> + %% Already drained / dead-lettered. Clear any stale + %% bookkeeping in case the cid is half-tracked. + {noreply, record_success_pure(Cid, State0)}; + {Activity, Rest} -> + case deliver_one_pure(Activity, State0) of + {ok, _} -> + State1 = set_field(pending, Rest, State0), + State2 = record_success_pure(Cid, State1), + {noreply, State2}; + {error, _, _} -> + %% Keep the activity in pending; record_failure + %% leaves :pending alone (or dead-letters it on + %% slot 6). + Now = monotonic_seconds(), + State1 = schedule_retry_for(Cid, Now, State0), + {noreply, State1} + end + end; handle_info(_, S) -> {noreply, S}. +%% Step 8b-timer helpers ──────────────────────────────────────────── + +%% arm_retry_timer/3 — POST-DRAIN form. Used from handle_call(flush) +%% after drain_pure has already bumped :attempts via bump_attempt. +%% Sets next_retry_at = Now + backoff(attempts) and schedules the +%% send_after self-cast. On the dead-letter slot (attempt 6), moves +%% the activity from :pending to :dead_letter and arms no timer. + +arm_retry_timer(Cid, Now, State) -> + State0 = cancel_timer_for(Cid, State), + Attempts = attempts_for(Cid, State0), + case backoff_for(Attempts) of + dead_letter -> + move_to_dead_letter(Cid, State0); + Seconds -> + NextAt = Now + Seconds, + NR = field(next_retry, State0), + State1 = set_field(next_retry, set_keyed(Cid, NextAt, NR), State0), + Ms = Seconds * 1000, + Ref = erlang:send_after(Ms, self(), {retry, Cid}), + Timers = field(timers, State1), + set_field(timers, set_keyed(Cid, Ref, Timers), State1) + end. + +%% schedule_retry_for/3 — POST-RETRY-ATTEMPT form. Used from +%% handle_info({retry, Cid}, ...) when the retry attempt failed. +%% Bookkeep one failure and arm the next retry timer (or promote +%% to dead-letter, in which case no timer is needed). 
+ +schedule_retry_for(Cid, Now, State) -> + %% Cancel any in-flight timer for this Cid before scheduling a new + %% one. Without the cancel a stale timer can still fire after + %% record_success has cleared the cid; the handle_info no-match + %% branch silently absorbs it — but it keeps the scheduler's + %% run-loop alive long after the work is done. A pure clear (no + %% cancel) is fine when the timer's own firing brought us here, + %% so the explicit cancel only matters for the flush path. + State0 = cancel_timer_for(Cid, State), + State1 = record_failure_pure(Cid, Now, State0), + Attempts = attempts_for(Cid, State1), + case backoff_for(Attempts) of + dead_letter -> + State1; + Seconds -> + Ms = Seconds * 1000, + Ref = erlang:send_after(Ms, self(), {retry, Cid}), + Timers = field(timers, State1), + set_field(timers, set_keyed(Cid, Ref, Timers), State1) + end. + +%% Cancel the live timer for Cid (if any) and clear it from :timers. +%% Idempotent — silent no-op if there isn't one. + +cancel_timer_for(Cid, State) -> + Timers = field(timers, State), + case find_keyed(Cid, Timers) of + {ok, Ref} -> + erlang:cancel_timer(Ref), + set_field(timers, del_keyed(Cid, Timers), State); + _ -> State + end. + +%% Drop the :timers entry for Cid without calling cancel_timer — used +%% when the timer's own firing brought us into handle_info and the ref +%% is already consumed. + +clear_timer_ref(Cid, State) -> + Timers = field(timers, State), + case find_keyed(Cid, Timers) of + {ok, _Ref} -> set_field(timers, del_keyed(Cid, Timers), State); + _ -> State + end. + +%% Step 8b-timer: bookkeeping uses seconds (matches backoff_for / +%% record_failure_pure / next_retry_at). The monotonic clock reports +%% ms; `div 1000` truncates to whole seconds so the comparisons align. + +monotonic_seconds() -> erlang:monotonic_time() div 1000.
+ %% ── Internal ──────────────────────────────────────────────────── activity_cid(Activity) -> diff --git a/next/tests/delivery_retry_timer.sh b/next/tests/delivery_retry_timer.sh new file mode 100755 index 00000000..7be29de7 --- /dev/null +++ b/next/tests/delivery_retry_timer.sh @@ -0,0 +1,131 @@ +#!/usr/bin/env bash +# next/tests/delivery_retry_timer.sh — m2 Step 8b-timer. +# +# Live timer wiring on the delivery_worker gen_server. The pure +# bookkeeping is covered by delivery_retry.sh — this suite proves the +# erlang:send_after / cancel_timer wiring fires retries from the +# scheduler's logical clock without anyone calling drain by hand. +# +# Substrate dependency: erlang:send_after/3 + cancel_timer/1 + +# monotonic_time/0,1 — landed via cherry-pick from loops/erlang +# (commits 3709460d / 98b0104c / 779e53b2 on this branch). +# +# Test discipline: every test cancels its leftover timer before +# returning. If we don't, the scheduler keeps the run loop alive +# advancing time through the full backoff chain (30s → 5m → 30m → +# 6h → 24h), and each tick costs ~10s of wall time inside the +# Erlang-on-SX VM. Canceling the trailing timer is the difference +# between a 25s test and a 60s+ test. + +set -uo pipefail +cd "$(git rev-parse --show-toplevel)" + +SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}" +if [ ! -x "$SX_SERVER" ]; then + SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe" +fi +if [ ! -x "$SX_SERVER" ]; then + echo "ERROR: sx_server.exe not found." >&2 + exit 1 +fi + +VERBOSE="${1:-}" +PASS=0; FAIL=0; ERRORS="" +TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT + +# A canned activity with cid <<1,2,3>>. +SETUP='Act = [{id, <<1,2,3>>}, {type, note}, {actor, alice}], FailFn = fun(_) -> {error, transient} end,' + +# Convenience: cancel any leftover timer for cid <<1,2,3>> on Peer. +# Prevents the scheduler from grinding through 30s/5m/30m/6h/24h of +# retries between epochs. 
+CANCEL='CancelLeftover = fun(Peer) -> SS = delivery_worker:state_srv(Peer), case delivery_worker:timer_ref_for(<<1,2,3>>, SS) of undefined -> ok; LRef -> erlang:cancel_timer(LRef), ok end end,' + +cat > "$TMPFILE" <>]} = delivery_worker:flush(bob), S = delivery_worker:state_srv(bob), Ref = delivery_worker:timer_ref_for(<<1,2,3>>, S), Result = is_reference(Ref), CancelLeftover(bob), Result\") :name)") + +;; T2 — initial flush bumps the attempt counter to 1; next_retry_at +;; gets set; cancel the timer before returning. +(epoch 11) +(eval "(get (erlang-eval-ast \"${SETUP}${CANCEL} delivery_worker:start_link(bob, FailFn), delivery_worker:enqueue(bob, Act), delivery_worker:flush(bob), S = delivery_worker:state_srv(bob), Result = delivery_worker:attempts_for(<<1,2,3>>, S) =:= 1, CancelLeftover(bob), Result\") :name)") + +;; T3 — advancing the logical clock past the 30s backoff fires the +;; timer; handle_info({retry, Cid}) bumps attempts to 2 and arms +;; the next slot (backoff(2)=300s). Then cancel the new timer. +(epoch 12) +(eval "(get (erlang-eval-ast \"${SETUP}${CANCEL} delivery_worker:start_link(bob, FailFn), delivery_worker:enqueue(bob, Act), delivery_worker:flush(bob), receive after 31000 -> ok end, S = delivery_worker:state_srv(bob), Result = delivery_worker:attempts_for(<<1,2,3>>, S) =:= 2, CancelLeftover(bob), Result\") :name)") + +;; T4 — after the retry fires the worker has armed a fresh timer +;; for the next backoff slot. Confirm it's a live ref, then +;; cancel it. +(epoch 13) +(eval "(get (erlang-eval-ast \"${SETUP}${CANCEL} delivery_worker:start_link(bob, FailFn), delivery_worker:enqueue(bob, Act), delivery_worker:flush(bob), receive after 31000 -> ok end, S = delivery_worker:state_srv(bob), Result = is_reference(delivery_worker:timer_ref_for(<<1,2,3>>, S)), CancelLeftover(bob), Result\") :name)") + +;; T5 — successful retry path. Dispatch fails twice then succeeds +;; (ets-backed counter). 
After two backoff slots elapse +;; (30s, then 300s), the third attempt succeeds and +;; record_success_pure clears the per-cid bookkeeping. No new +;; timer is scheduled, so the scheduler terminates naturally. +(epoch 14) +(eval "(get (erlang-eval-ast \"${SETUP} ets:new(rt_ctr, [named_table, public]), ets:insert(rt_ctr, {n, 0}), Mixed = fun(_) -> [{n, N}] = ets:lookup(rt_ctr, n), ets:insert(rt_ctr, {n, N+1}), case N < 2 of true -> {error, transient}; false -> ok end end, delivery_worker:start_link(carol, Mixed), delivery_worker:enqueue(carol, Act), delivery_worker:flush(carol), receive after 31000 -> ok end, receive after 301000 -> ok end, S = delivery_worker:state_srv(carol), delivery_worker:pending(S) =:= [] andalso delivery_worker:attempts_for(<<1,2,3>>, S) =:= 0 andalso delivery_worker:timer_ref_for(<<1,2,3>>, S) =:= undefined\") :name)") +EPOCHS + +OUTPUT=$(timeout 900 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) + +check() { + local epoch="$1" desc="$2" expected="$3" + local actual + actual=$(echo "$OUTPUT" | awk -v e="$epoch" ' + $0 ~ "^\\(ok-len " e " " { getline; print; exit } + $0 ~ "^\\(ok " e " " { print; exit } + $0 ~ "^\\(error " e " " { print; exit } + ') + [ -z "$actual" ] && actual="" + if echo "$actual" | grep -qF -- "$expected"; then + PASS=$((PASS+1)) + [ "$VERBOSE" = "-v" ] && echo " ok $desc" + else + FAIL=$((FAIL+1)) + ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual +" + fi +} + +check 10 "T1 flush schedules a timer" "true" +check 11 "T2 initial flush bumps attempts to 1" "true" +check 12 "T3 timer fires; attempts=2" "true" +check 13 "T4 retry rearms next timer" "true" +check 14 "T5 success clears retry state" "true" + +TOTAL=$((PASS+FAIL)) +if [ $FAIL -eq 0 ]; then + echo "ok $PASS/$TOTAL next/tests/delivery_retry_timer.sh passed" +else + echo "FAIL $PASS/$TOTAL passed, $FAIL failed:" + echo "$ERRORS" + if [ "$VERBOSE" = "-v" ]; then + echo "--- sx_server output ---" + echo "$OUTPUT" | tail -40 + echo "---" + fi +fi +[ 
$FAIL -eq 0 ] diff --git a/plans/fed-sx-milestone-2.md b/plans/fed-sx-milestone-2.md index 85a363eb..c466f5e2 100644 --- a/plans/fed-sx-milestone-2.md +++ b/plans/fed-sx-milestone-2.md @@ -562,10 +562,24 @@ a dead-letter list visible via `/admin/dead-letter`. is cleared from `:next_retry`. `record_success_pure` clears both. `next_due_pure` returns cids whose retry time has passed. 11 cases in `delivery_retry.sh`. -- [ ] **8b-timer** — Erlang-side timer wiring (`erlang:send_after` - self-cast or equivalent). Needs the same substrate primitive - that `gen_server` uses for `timeout` returns. Defer behind - substrate gap discovery for now — see Blockers. +- [x] **8b-timer** — Erlang-side timer wiring on the + `delivery_worker` gen_server. handle_call(flush) drains then + arms a `send_after` self-message per retried Cid (backoff from + the now-bumped attempt counter); handle_info({retry, Cid}) + redrives that single Cid through deliver_one_pure. Success + clears bookkeeping via record_success_pure; failure bumps attempts + via record_failure_pure and arms the next backoff slot — or + promotes to dead-letter on the 6th attempt and stops arming. + A `:timers [{Cid, Ref}]` state field tracks live refs so + schedule_retry_for can cancel the previous one before arming + the next (otherwise stale timers keep the scheduler's run + loop alive long after the work is done). 5/5 in + `delivery_retry_timer.sh`: T1 timer scheduled, T2 attempts=1, + T3 retry fires + attempts=2, T4 next timer rearmed, T5 ets- + counter dispatch (fail/fail/ok) lands in 3 attempts and + clears state. Substrate dependency landed via cherry-pick + from `loops/erlang` (3709460d / 98b0104c / 779e53b2) until + `loops/erlang` → architecture catches up. - [x] **8c** — Delivery-state projection (`next/kernel/delivery_state.erl`). Folds delivery events into per-peer worker-shaped snapshots so the outbound queue survives @@ -1105,8 +1119,16 @@ proceed.
through `delivery_worker`) and Step 10c (peer-actor doc fetch in `peer_actors`) are now unblocked. -3. **`erlang:send_after`-style timer primitive** — discovered - during Step 8b prep. The retry loop needs a way for the +3. **`erlang:send_after`-style timer primitive** — ~~discovered + during Step 8b prep~~ **RESOLVED 2026-06-30** via the + `loops/erlang` `send_after`/`cancel_timer`/`monotonic_time` + work landing on `origin/loops/erlang` (commits 3709460d, + 98b0104c, b10e55f0; 766/766 → 771/771). m2 cherry-picked all + three onto this branch so 8b-timer could land without waiting + for `loops/erlang` → architecture; the cherry-picks fall away + as no-op duplicates when architecture catches up. Original + diagnosis preserved below for the audit trail. + The retry loop needs a way for the delivery_worker to wake itself up after `backoff_for(N)` seconds. Erlang's `erlang:send_after/3` is the standard primitive; this port doesn't seem to register it (looked at @@ -1241,6 +1263,31 @@ proceed. Newest first. +- **2026-06-30** — Step 8b-timer closed. Cherry-picked the three + `loops/erlang` send_after commits onto m2 (3709460d, 98b0104c, + 779e53b2 — the substrate landed standalone on origin/loops/erlang + earlier and hadn't propagated to origin/architecture yet). Wired + the live timer loop in `next/kernel/delivery_worker.erl`: a + `:timers [{Cid, Ref}]` state field; `handle_call(flush)` drains + then arms a `send_after` self-message per retried Cid; the new + `handle_info({retry, Cid})` callback redrives that one Cid through + `deliver_one_pure` and either records success / clears state, or + bumps and arms the next backoff slot (or dead-letters on the 6th + attempt). Two arm-paths split — `arm_retry_timer` (post-drain, + attempts already bumped) vs `schedule_retry_for` (post-retry + attempt, needs to bump). `cancel_timer_for/2` clears the previous + timer before arming the next so stale timers don't keep the + scheduler's run loop alive after the work is done.
Two new public + APIs for tests: `state_srv/1` returns the worker's full state, + `timer_ref_for/2` looks up a Cid's live ref. 5/5 in new + `delivery_retry_timer.sh` (T1 timer scheduled, T2 attempts=1, T3 + retry fires + attempts=2, T4 next timer rearmed, T5 ets-counter + dispatch fail/fail/ok lands in 3 attempts and clears state). + Existing `delivery_worker.sh` 17/17 and `delivery_retry.sh` 11/11 + still green. Conformance gate 771/771 (was 761/761; the +10 is + the cherry-picked send_after suite). Blockers #3 RESOLVED. + Reply shape of `flush` unchanged; no caller updates needed. + - **2026-06-28** — Merge-prep pass. Conformance 761/761 still green on m2 tip `cd0de8cb`. Both smoke tests still pass cold: `next/tests/smoke_kernel_route.sh` 6/6 (port 54471, listener up