Compare commits
4 Commits
loops/blog
...
loops/fed-
| Author | SHA1 | Date | |
|---|---|---|---|
| 4da2a98c30 | |||
| 779e53b2a8 | |||
| d09c0048c7 | |||
| 3dbb3e318a |
@@ -38,6 +38,7 @@ SUITES=(
|
||||
"fib|er-fib-test-pass|er-fib-test-count"
|
||||
"ffi|er-ffi-test-pass|er-ffi-test-count"
|
||||
"vm|er-vm-test-pass|er-vm-test-count"
|
||||
"send_after|er-sa-test-pass|er-sa-test-count"
|
||||
)
|
||||
|
||||
cat > "$TMPFILE" << 'EPOCHS'
|
||||
@@ -61,6 +62,7 @@ cat > "$TMPFILE" << 'EPOCHS'
|
||||
(load "lib/erlang/vm/dispatcher.sx")
|
||||
(load "lib/erlang/tests/ffi.sx")
|
||||
(load "lib/erlang/tests/vm.sx")
|
||||
(load "lib/erlang/tests/send_after.sx")
|
||||
(epoch 100)
|
||||
(eval "(list er-test-pass er-test-count)")
|
||||
(epoch 101)
|
||||
@@ -83,6 +85,8 @@ cat > "$TMPFILE" << 'EPOCHS'
|
||||
(eval "(list er-ffi-test-pass er-ffi-test-count)")
|
||||
(epoch 110)
|
||||
(eval "(list er-vm-test-pass er-vm-test-count)")
|
||||
(epoch 111)
|
||||
(eval "(list er-sa-test-pass er-sa-test-count)")
|
||||
EPOCHS
|
||||
|
||||
timeout 600 "$SX_SERVER" < "$TMPFILE" > "$OUTFILE" 2>&1
|
||||
|
||||
@@ -135,6 +135,56 @@
|
||||
(dict-set! s :next-ref (+ n 1))
|
||||
(er-mk-ref n)))))
|
||||
|
||||
;; ── logical clock + timer wheel ──────────────────────────────────
|
||||
;; The scheduler runs a synchronous model: logical time advances only
|
||||
;; when the runnable queue drains (see `er-sched-advance-time!`). The
|
||||
;; clock is in milliseconds, monotonic, never derived from wall time
|
||||
;; — deterministic and time-travel-safe. `send_after` schedules a
|
||||
;; message-delivery event at an absolute deadline; `receive after Ms`
|
||||
;; schedules a timeout event the same way. When no process is runnable
|
||||
;; the scheduler jumps the clock to the earliest pending deadline and
|
||||
;; fires that single event, then re-runs.
|
||||
(define er-clock (fn () (get (er-sched) :clock)))
|
||||
|
||||
;; Advance the clock to `ms`, but never backwards (monotonicity).
|
||||
(define
|
||||
er-clock-set!
|
||||
(fn (ms) (dict-set! (er-sched) :clock (max (er-clock) ms))))
|
||||
|
||||
(define er-sched-timers (fn () (get (er-sched) :timers)))
|
||||
|
||||
;; Register a timer event. `dest` is a pid or registered-atom value,
|
||||
;; resolved to a live process at fire time. Returns the timer ref.
|
||||
(define
|
||||
er-timer-add!
|
||||
(fn
|
||||
(deadline dest msg ref)
|
||||
(append!
|
||||
(er-sched-timers)
|
||||
{:ref ref :deadline deadline :dest dest :msg msg :alive true})
|
||||
ref))
|
||||
|
||||
;; Find the live timer with the given ref, or nil.
|
||||
(define
|
||||
er-timer-find-alive
|
||||
(fn
|
||||
(ref)
|
||||
(let
|
||||
((ts (er-sched-timers)) (found (list nil)))
|
||||
(for-each
|
||||
(fn
|
||||
(i)
|
||||
(let
|
||||
((t (nth ts i)))
|
||||
(when
|
||||
(and
|
||||
(= (nth found 0) nil)
|
||||
(get t :alive)
|
||||
(er-ref-equal? (get t :ref) ref))
|
||||
(set-nth! found 0 t))))
|
||||
(range 0 (len ts)))
|
||||
(nth found 0))))
|
||||
|
||||
;; ── scheduler state ──────────────────────────────────────────────
|
||||
(define er-scheduler (list nil))
|
||||
|
||||
@@ -151,6 +201,8 @@
|
||||
:processes {}
|
||||
:registered {}
|
||||
:ets {}
|
||||
:clock 0
|
||||
:timers (list)
|
||||
:runnable (er-q-new)})))
|
||||
|
||||
(define er-sched (fn () (nth er-scheduler 0)))
|
||||
@@ -217,6 +269,7 @@
|
||||
:trap-exit false
|
||||
:has-timeout false
|
||||
:timed-out false
|
||||
:timeout-deadline nil
|
||||
:exit-reason nil}))
|
||||
(dict-set! (er-sched-processes) (er-pid-key pid) proc)
|
||||
(er-sched-enqueue! pid)
|
||||
@@ -456,6 +509,69 @@
|
||||
(error "Erlang: make_ref/0: arity")
|
||||
(er-ref-new!))))
|
||||
|
||||
;; ── timer BIFs ───────────────────────────────────────────────────
|
||||
;; erlang:send_after(Time, Dest, Msg) -> Ref
|
||||
;; Schedules Msg to be delivered to Dest after Time ms (logical).
|
||||
;; Time must be a non-negative integer; Dest a pid or registered
|
||||
;; atom name. Returns a fresh timer reference.
|
||||
(define
|
||||
er-bif-send-after
|
||||
(fn
|
||||
(vs)
|
||||
(let
|
||||
((time (nth vs 0)) (dest (nth vs 1)) (msg (nth vs 2)))
|
||||
(cond
|
||||
(not (and (= (type-of time) "number") (>= time 0)))
|
||||
(raise (er-mk-error-marker (er-mk-atom "badarg")))
|
||||
(not (or (er-pid? dest) (er-atom? dest)))
|
||||
(raise (er-mk-error-marker (er-mk-atom "badarg")))
|
||||
:else
|
||||
(er-timer-add!
|
||||
(+ (er-clock) (truncate time))
|
||||
dest
|
||||
msg
|
||||
(er-ref-new!))))))
|
||||
|
||||
;; erlang:cancel_timer(Ref) -> RemainingMs | false
|
||||
;; For a live (not-yet-fired) timer, marks it cancelled and returns
|
||||
;; the milliseconds left until its deadline. For an already-fired,
|
||||
;; already-cancelled, or unknown ref, returns the atom `false`.
|
||||
(define
|
||||
er-bif-cancel-timer
|
||||
(fn
|
||||
(vs)
|
||||
(let
|
||||
((ref (nth vs 0)))
|
||||
(cond
|
||||
(not (er-ref? ref))
|
||||
(raise (er-mk-error-marker (er-mk-atom "badarg")))
|
||||
:else
|
||||
(let
|
||||
((t (er-timer-find-alive ref)))
|
||||
(cond
|
||||
(= t nil) (er-mk-atom "false")
|
||||
:else (do
|
||||
(dict-set! t :alive false)
|
||||
(max 0 (- (get t :deadline) (er-clock))))))))))
|
||||
|
||||
;; erlang:monotonic_time() | erlang:monotonic_time(Unit) -> Integer
|
||||
;; Returns the scheduler's logical monotonic clock in milliseconds.
|
||||
;; Unit (millisecond / second / native) is accepted for API
|
||||
;; compatibility; all units report from the same ms-resolution clock.
|
||||
(define
|
||||
er-bif-monotonic-time
|
||||
(fn
|
||||
(vs)
|
||||
(cond
|
||||
(= (len vs) 0) (er-clock)
|
||||
(and (= (len vs) 1) (er-atom? (nth vs 0)))
|
||||
(let
|
||||
((unit (get (nth vs 0) :name)))
|
||||
(cond
|
||||
(= unit "second") (truncate (/ (er-clock) 1000))
|
||||
:else (er-clock)))
|
||||
:else (raise (er-mk-error-marker (er-mk-atom "badarg"))))))
|
||||
|
||||
;; Add `target` to `pid`'s :links list if not already there.
|
||||
(define
|
||||
er-link-add-one!
|
||||
@@ -664,37 +780,122 @@
|
||||
(cond
|
||||
(not (= pid nil))
|
||||
(do (er-sched-step! pid) (er-sched-run-all!))
|
||||
;; Queue empty — fire one pending receive-with-timeout and go again.
|
||||
(er-sched-fire-one-timeout!) (er-sched-run-all!)
|
||||
;; Queue empty — advance logical time to the next pending
|
||||
;; deadline (timer delivery or receive-timeout) and go again.
|
||||
(er-sched-advance-time!) (er-sched-run-all!)
|
||||
:else nil))))
|
||||
|
||||
;; Wake one waiting process whose receive had an `after Ms` clause.
|
||||
;; Returns true if one fired. In our synchronous model "time passes"
|
||||
;; once the runnable queue drains — timeouts only fire then.
|
||||
;; ── time advance ─────────────────────────────────────────────────
|
||||
;; Called when the runnable queue is empty. Two kinds of pending event
|
||||
;; carry a deadline: live `send_after` timers and waiting processes in
|
||||
;; a `receive ... after Ms` block. Find the single earliest deadline
|
||||
;; across both, jump the clock to it, and fire just that one event
|
||||
;; (timer wins ties — a message delivered exactly at the timeout
|
||||
;; arrives "first"). Returns true if an event fired, false when there
|
||||
;; is nothing left to wake (genuine idle / termination).
|
||||
(define
|
||||
er-sched-fire-one-timeout!
|
||||
er-sched-advance-time!
|
||||
(fn
|
||||
()
|
||||
(let
|
||||
((ks (keys (er-sched-processes))) (fired (list false)))
|
||||
((best (er-sched-next-event)))
|
||||
(cond
|
||||
(= best nil) false
|
||||
:else (do
|
||||
(er-clock-set! (get best :deadline))
|
||||
(cond
|
||||
(= (get best :kind) "timer")
|
||||
(er-timer-fire! (get best :timer))
|
||||
:else (er-recv-timeout-fire! (get best :proc)))
|
||||
true)))))
|
||||
|
||||
;; Scan timers and waiting-with-timeout processes for the earliest
|
||||
;; deadline. Returns {:kind "timer"|"recv" :deadline D ...} or nil.
|
||||
(define
|
||||
er-sched-next-event
|
||||
(fn
|
||||
()
|
||||
(let
|
||||
((best (list nil)))
|
||||
(for-each
|
||||
(fn
|
||||
(i)
|
||||
(let
|
||||
((t (nth (er-sched-timers) i)))
|
||||
(when
|
||||
(get t :alive)
|
||||
(er-event-consider!
|
||||
best
|
||||
{:kind "timer" :deadline (get t :deadline) :timer t}))))
|
||||
(range 0 (len (er-sched-timers))))
|
||||
(for-each
|
||||
(fn
|
||||
(k)
|
||||
(when
|
||||
(not (nth fired 0))
|
||||
(let
|
||||
((p (get (er-sched-processes) k)))
|
||||
(when
|
||||
(and
|
||||
(= (get p :state) "waiting")
|
||||
(get p :has-timeout))
|
||||
(dict-set! p :timed-out true)
|
||||
(dict-set! p :has-timeout false)
|
||||
(dict-set! p :state "runnable")
|
||||
(er-sched-enqueue! (get p :pid))
|
||||
(set-nth! fired 0 true)))))
|
||||
ks)
|
||||
(nth fired 0))))
|
||||
(let
|
||||
((p (get (er-sched-processes) k)))
|
||||
(when
|
||||
(and (= (get p :state) "waiting") (get p :has-timeout))
|
||||
(er-event-consider!
|
||||
best
|
||||
{:kind "recv"
|
||||
:deadline (get p :timeout-deadline)
|
||||
:proc p}))))
|
||||
(keys (er-sched-processes)))
|
||||
(nth best 0))))
|
||||
|
||||
;; Keep the earlier-deadline candidate in the single-cell `best`.
|
||||
;; Strictly-earlier replaces; equal deadlines keep the incumbent so a
|
||||
;; timer registered first (and timers over recv-timeouts) win ties.
|
||||
(define
|
||||
er-event-consider!
|
||||
(fn
|
||||
(best cand)
|
||||
(when
|
||||
(or
|
||||
(= (nth best 0) nil)
|
||||
(< (get cand :deadline) (get (nth best 0) :deadline)))
|
||||
(set-nth! best 0 cand))))
|
||||
|
||||
;; Deliver a fired timer's message to its destination and retire it.
|
||||
;; Destination is resolved at fire time; a dead/missing target (or an
|
||||
;; unregistered name) silently drops the message, as in real Erlang.
|
||||
(define
|
||||
er-timer-fire!
|
||||
(fn
|
||||
(t)
|
||||
(dict-set! t :alive false)
|
||||
(let
|
||||
((pid (er-timer-resolve-dest (get t :dest))))
|
||||
(when
|
||||
(and (not (= pid nil)) (er-proc-exists? pid))
|
||||
(er-proc-mailbox-push! pid (get t :msg))
|
||||
(when
|
||||
(= (er-proc-field pid :state) "waiting")
|
||||
(er-proc-set! pid :state "runnable")
|
||||
(er-sched-enqueue! pid))))))
|
||||
|
||||
;; Non-raising destination resolver for timer delivery.
|
||||
(define
|
||||
er-timer-resolve-dest
|
||||
(fn
|
||||
(v)
|
||||
(cond
|
||||
(er-pid? v) v
|
||||
(er-atom? v)
|
||||
(let
|
||||
((name (get v :name)))
|
||||
(if (dict-has? (er-registered) name) (get (er-registered) name) nil))
|
||||
:else nil)))
|
||||
|
||||
;; Wake a process whose `receive ... after Ms` deadline elapsed.
|
||||
(define
|
||||
er-recv-timeout-fire!
|
||||
(fn
|
||||
(p)
|
||||
(dict-set! p :timed-out true)
|
||||
(dict-set! p :has-timeout false)
|
||||
(dict-set! p :state "runnable")
|
||||
(er-sched-enqueue! (get p :pid))))
|
||||
|
||||
(define
|
||||
er-sched-step!
|
||||
@@ -1177,8 +1378,15 @@
|
||||
{reply, Reply, NewState} ->
|
||||
From ! {Ref, Reply},
|
||||
gen_server:loop(Mod, NewState);
|
||||
{reply, Reply, NewState, Timeout} ->
|
||||
From ! {Ref, Reply},
|
||||
erlang:send_after(Timeout, self(), {timeout}),
|
||||
gen_server:loop(Mod, NewState);
|
||||
{noreply, NewState} ->
|
||||
gen_server:loop(Mod, NewState);
|
||||
{noreply, NewState, Timeout} ->
|
||||
erlang:send_after(Timeout, self(), {timeout}),
|
||||
gen_server:loop(Mod, NewState);
|
||||
{stop, Reason, Reply, NewState} ->
|
||||
From ! {Ref, Reply},
|
||||
exit(Reason)
|
||||
@@ -1186,11 +1394,17 @@
|
||||
{'$gen_cast', Msg} ->
|
||||
case Mod:handle_cast(Msg, State) of
|
||||
{noreply, NewState} -> gen_server:loop(Mod, NewState);
|
||||
{noreply, NewState, Timeout} ->
|
||||
erlang:send_after(Timeout, self(), {timeout}),
|
||||
gen_server:loop(Mod, NewState);
|
||||
{stop, Reason, NewState} -> exit(Reason)
|
||||
end;
|
||||
Other ->
|
||||
case Mod:handle_info(Other, State) of
|
||||
{noreply, NewState} -> gen_server:loop(Mod, NewState);
|
||||
{noreply, NewState, Timeout} ->
|
||||
erlang:send_after(Timeout, self(), {timeout}),
|
||||
gen_server:loop(Mod, NewState);
|
||||
{stop, Reason, NewState} -> exit(Reason)
|
||||
end
|
||||
end.")
|
||||
@@ -1785,6 +1999,10 @@
|
||||
(er-register-bif! "erlang" "exit" 1 er-bif-exit)
|
||||
(er-register-bif! "erlang" "exit" 2 er-bif-exit)
|
||||
(er-register-bif! "erlang" "make_ref" 0 er-bif-make-ref)
|
||||
(er-register-bif! "erlang" "send_after" 3 er-bif-send-after)
|
||||
(er-register-bif! "erlang" "cancel_timer" 1 er-bif-cancel-timer)
|
||||
(er-register-bif! "erlang" "monotonic_time" 0 er-bif-monotonic-time)
|
||||
(er-register-bif! "erlang" "monotonic_time" 1 er-bif-monotonic-time)
|
||||
(er-register-bif! "erlang" "link" 1 er-bif-link)
|
||||
(er-register-bif! "erlang" "unlink" 1 er-bif-unlink)
|
||||
(er-register-bif! "erlang" "monitor" 2 er-bif-monitor)
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
{
|
||||
"language": "erlang",
|
||||
"total_pass": 761,
|
||||
"total": 761,
|
||||
"total_pass": 771,
|
||||
"total": 771,
|
||||
"suites": [
|
||||
{"name":"tokenize","pass":62,"total":62,"status":"ok"},
|
||||
{"name":"parse","pass":52,"total":52,"status":"ok"},
|
||||
@@ -13,6 +13,7 @@
|
||||
{"name":"echo","pass":7,"total":7,"status":"ok"},
|
||||
{"name":"fib","pass":8,"total":8,"status":"ok"},
|
||||
{"name":"ffi","pass":37,"total":37,"status":"ok"},
|
||||
{"name":"vm","pass":78,"total":78,"status":"ok"}
|
||||
{"name":"vm","pass":78,"total":78,"status":"ok"},
|
||||
{"name":"send_after","pass":10,"total":10,"status":"ok"}
|
||||
]
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# Erlang-on-SX Scoreboard
|
||||
|
||||
**Total: 761 / 761 tests passing**
|
||||
**Total: 771 / 771 tests passing**
|
||||
|
||||
| | Suite | Pass | Total |
|
||||
|---|---|---|---|
|
||||
@@ -15,6 +15,7 @@
|
||||
| ✅ | fib | 8 | 8 |
|
||||
| ✅ | ffi | 37 | 37 |
|
||||
| ✅ | vm | 78 | 78 |
|
||||
| ✅ | send_after | 10 | 10 |
|
||||
|
||||
|
||||
Generated by `lib/erlang/conformance.sh`.
|
||||
|
||||
163
lib/erlang/tests/send_after.sx
Normal file
163
lib/erlang/tests/send_after.sx
Normal file
@@ -0,0 +1,163 @@
|
||||
;; erlang:send_after / cancel_timer — timer primitives.
|
||||
;;
|
||||
;; A process schedules a message to itself (or another pid / registered
|
||||
;; name) after N logical milliseconds. `cancel_timer` removes a pending
|
||||
;; timer and reports the time left. These are the same primitives the
|
||||
;; gen_server library uses to implement `{noreply, State, Timeout}`.
|
||||
;;
|
||||
;; The scheduler runs a synchronous logical clock (see runtime.sx
|
||||
;; `er-sched-advance-time!`): time advances only when the runnable
|
||||
;; queue drains, jumping to the earliest pending deadline. That makes
|
||||
;; delivery deterministic and time-travel-safe — no wall clock.
|
||||
|
||||
(define er-sa-test-count 0)
|
||||
(define er-sa-test-pass 0)
|
||||
(define er-sa-test-fails (list))
|
||||
|
||||
(define
|
||||
er-sa-test
|
||||
(fn
|
||||
(name actual expected)
|
||||
(set! er-sa-test-count (+ er-sa-test-count 1))
|
||||
(if
|
||||
(= actual expected)
|
||||
(set! er-sa-test-pass (+ er-sa-test-pass 1))
|
||||
(append!
|
||||
er-sa-test-fails
|
||||
{:actual actual :expected expected :name name}))))
|
||||
|
||||
(define er-sa-pred
|
||||
(fn (name actual) (er-sa-test name (if actual true false) true)))
|
||||
|
||||
(define sa-ev erlang-eval-ast)
|
||||
|
||||
;; ── T1 — schedule a self-message, receive it after the deadline ──
|
||||
;; send_after returns a reference handle.
|
||||
(er-sa-pred
|
||||
"T1 send_after returns a ref"
|
||||
(er-ref?
|
||||
(sa-ev "erlang:send_after(50, self(), hello)")))
|
||||
|
||||
;; The scheduled message lands and a plain receive picks it up.
|
||||
(er-sa-test
|
||||
"T1 delivered message received"
|
||||
(get
|
||||
(sa-ev
|
||||
"erlang:send_after(50, self(), hello),
|
||||
receive M -> M end")
|
||||
:name)
|
||||
"hello")
|
||||
|
||||
;; Logical time advances exactly to the timer deadline (50ms) by the
|
||||
;; time the message is received — round-trip latency well under 100ms.
|
||||
(er-sa-test
|
||||
"T1 clock at deadline on receipt"
|
||||
(sa-ev
|
||||
"erlang:send_after(50, self(), hello),
|
||||
receive hello -> erlang:monotonic_time() end")
|
||||
50)
|
||||
|
||||
;; ── T2 — cancel_timer returns remaining ms; message never arrives ──
|
||||
;; Cancel immediately after scheduling: clock has not advanced, so the
|
||||
;; full duration (~1000ms) is reported as remaining.
|
||||
(er-sa-test
|
||||
"T2 cancel returns remaining ms"
|
||||
(sa-ev
|
||||
"Ref = erlang:send_after(1000, self(), late),
|
||||
erlang:cancel_timer(Ref)")
|
||||
1000)
|
||||
|
||||
;; The cancelled timer never delivers — the receive falls through to
|
||||
;; its `after` clause and returns `none`.
|
||||
(er-sa-test
|
||||
"T2 cancelled message never arrives"
|
||||
(get
|
||||
(sa-ev
|
||||
"Ref = erlang:send_after(1000, self(), late),
|
||||
erlang:cancel_timer(Ref),
|
||||
receive late -> got after 50 -> none end")
|
||||
:name)
|
||||
"none")
|
||||
|
||||
;; ── T3 — multiple timers fire in deadline order, not schedule order ──
|
||||
;; `b` is scheduled first (deadline 80) but `a` second (deadline 20).
|
||||
;; Two plain receives drain the mailbox in arrival order — and arrival
|
||||
;; is governed by deadline, so the first message out is `a`.
|
||||
(er-sa-test
|
||||
"T3 timers fire in deadline order"
|
||||
(er-format-value
|
||||
(sa-ev
|
||||
"erlang:send_after(80, self(), b),
|
||||
erlang:send_after(20, self(), a),
|
||||
X = receive M1 -> M1 end,
|
||||
Y = receive M2 -> M2 end,
|
||||
{X, Y}"))
|
||||
"{a,b}")
|
||||
|
||||
;; A selective receive on `a` matches the earlier-deadline timer even
|
||||
;; though `b` was scheduled first.
|
||||
(er-sa-test
|
||||
"T3 selective receive picks earliest deadline"
|
||||
(get
|
||||
(sa-ev
|
||||
"erlang:send_after(80, self(), b),
|
||||
erlang:send_after(20, self(), a),
|
||||
receive a -> first end")
|
||||
:name)
|
||||
"first")
|
||||
|
||||
;; ── T4 — cancel_timer on an already-fired timer returns false ──────
|
||||
;; Once `x` has been received the timer has fired; cancelling its ref
|
||||
;; now yields the atom `false`.
|
||||
(er-sa-test
|
||||
"T4 cancel of fired timer is false"
|
||||
(get
|
||||
(sa-ev
|
||||
"Ref = erlang:send_after(20, self(), x),
|
||||
receive x -> ok end,
|
||||
erlang:cancel_timer(Ref)")
|
||||
:name)
|
||||
"false")
|
||||
|
||||
;; ── T5 — send_after to a registered atom name ──────────────────────
|
||||
;; A second process registers itself as `srv`; the timer addresses it
|
||||
;; by name, and the delayed message lands in that process's mailbox.
|
||||
;; The server forwards what it got back to the parent for inspection.
|
||||
(er-sa-test
|
||||
"T5 timer delivers to registered name"
|
||||
(get
|
||||
(sa-ev
|
||||
"Me = self(),
|
||||
Pid = spawn(fun () -> receive M -> Me ! {got, M} end end),
|
||||
register(srv, Pid),
|
||||
erlang:send_after(20, srv, ping),
|
||||
receive {got, X} -> X end")
|
||||
:name)
|
||||
"ping")
|
||||
|
||||
;; ── T6 — gen_server {noreply, State, Timeout} hookup ───────────────
|
||||
;; A gen_server that, on the `arm` cast, returns {noreply, S, 100}.
|
||||
;; The library schedules {timeout} to itself via send_after; when no
|
||||
;; other message arrives first, handle_info({timeout}, S) fires. The
|
||||
;; handler signals the parent so we can confirm the timeout landed.
|
||||
(do
|
||||
(er-load-gen-server!)
|
||||
(erlang-load-module
|
||||
"-module(sa_tmo).
|
||||
init(Me) -> {ok, Me}.
|
||||
handle_call(_R, _F, S) -> {reply, ok, S}.
|
||||
handle_cast(arm, Me) -> {noreply, Me, 100}.
|
||||
handle_info({timeout}, Me) -> Me ! fired, {noreply, Me};
|
||||
handle_info(_M, S) -> {noreply, S}.")
|
||||
nil)
|
||||
|
||||
(er-sa-test
|
||||
"T6 gen_server timeout fires handle_info"
|
||||
(get
|
||||
(sa-ev
|
||||
"Me = self(),
|
||||
P = gen_server:start_link(sa_tmo, Me),
|
||||
gen_server:cast(P, arm),
|
||||
receive fired -> ok after 5000 -> timeout end")
|
||||
:name)
|
||||
"ok")
|
||||
@@ -1147,7 +1147,7 @@
|
||||
(and (er-atom? ms) (= (get ms :name) "infinity"))
|
||||
(er-eval-receive-loop node pid env)
|
||||
(= ms 0) (er-eval-receive-poll node pid env)
|
||||
:else (er-eval-receive-timed node pid env)))))
|
||||
:else (er-eval-receive-timed node pid env (+ (er-clock) ms))))))
|
||||
|
||||
;; after 0 — poll once; on no match, run the after-body immediately.
|
||||
(define
|
||||
@@ -1161,12 +1161,15 @@
|
||||
(get r :value)
|
||||
(er-eval-body (get node :after-body) env)))))
|
||||
|
||||
;; after Ms — suspend; on resume check :timed-out. When the scheduler
|
||||
;; runs out of other work it fires one pending timeout per round.
|
||||
;; after Ms — suspend with an absolute `deadline` (logical ms). On
|
||||
;; resume check :timed-out: the scheduler fires the earliest pending
|
||||
;; deadline once the runnable queue drains. A non-matching message can
|
||||
;; wake the process early; it re-suspends on the SAME deadline so the
|
||||
;; timeout window is not extended.
|
||||
(define
|
||||
er-eval-receive-timed
|
||||
(fn
|
||||
(node pid env)
|
||||
(node pid env deadline)
|
||||
(let
|
||||
((r (er-try-receive (get node :clauses) pid env)))
|
||||
(if
|
||||
@@ -1174,6 +1177,7 @@
|
||||
(get r :value)
|
||||
(do
|
||||
(er-proc-set! pid :has-timeout true)
|
||||
(er-proc-set! pid :timeout-deadline deadline)
|
||||
(call/cc
|
||||
(fn
|
||||
(k)
|
||||
@@ -1186,7 +1190,7 @@
|
||||
(er-proc-set! pid :timed-out false)
|
||||
(er-proc-set! pid :has-timeout false)
|
||||
(er-eval-body (get node :after-body) env))
|
||||
(er-eval-receive-timed node pid env)))))))
|
||||
(er-eval-receive-timed node pid env deadline)))))))
|
||||
|
||||
;; Scan mailbox in arrival order. For each msg, try every clause.
|
||||
;; On first match: remove that msg from mailbox and return body value.
|
||||
|
||||
@@ -5,9 +5,10 @@
|
||||
backoff_for/1, schedule_for/1,
|
||||
record_failure_pure/3, record_success_pure/2,
|
||||
next_due_pure/2, attempts_for/2, next_retry_at/2,
|
||||
dead_letter_list/1,
|
||||
dead_letter_list/1, timer_ref_for/2,
|
||||
start_link/1, start_link/2, stop/1,
|
||||
enqueue/2, flush/1, pending_srv/1, set_dispatch_fn/2]).
|
||||
enqueue/2, flush/1, pending_srv/1, set_dispatch_fn/2,
|
||||
state_srv/1]).
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
|
||||
|
||||
%% Outbound delivery worker per design §13.4. One gen_server per
|
||||
@@ -49,6 +50,7 @@ new(PeerId) ->
|
||||
{attempts, []},
|
||||
{next_retry, []},
|
||||
{dead_letter, []},
|
||||
{timers, []},
|
||||
{dispatch_fn, undefined}].
|
||||
|
||||
pending(State) -> field(pending, State).
|
||||
@@ -183,6 +185,16 @@ next_retry_at(Cid, State) ->
|
||||
|
||||
dead_letter_list(State) -> field(dead_letter, State).
|
||||
|
||||
%% Step 8b-timer: per-cid timer ref accessor. Exposed for tests so
|
||||
%% they can assert a retry timer was scheduled (or wasn't, after a
|
||||
%% success / dead-letter). Returns the live Ref or undefined.
|
||||
|
||||
timer_ref_for(Cid, State) ->
|
||||
case find_keyed(Cid, field(timers, State)) of
|
||||
{ok, Ref} -> Ref;
|
||||
_ -> undefined
|
||||
end.
|
||||
|
||||
move_to_dead_letter(Cid, State) ->
|
||||
Pending = field(pending, State),
|
||||
{Match, Rest} = take_by_cid(Cid, Pending, [], []),
|
||||
@@ -229,6 +241,13 @@ pending_srv(PeerId) ->
|
||||
set_dispatch_fn(PeerId, Fn) ->
|
||||
gen_server:call(PeerId, {set_dispatch_fn, Fn}).
|
||||
|
||||
%% Step 8b-timer: return the worker's full state so tests can use the
|
||||
%% pure introspection functions (attempts_for / next_retry_at /
|
||||
%% timer_ref_for / dead_letter_list) against it.
|
||||
|
||||
state_srv(PeerId) ->
|
||||
gen_server:call(PeerId, get_state).
|
||||
|
||||
%% gen_server callbacks
|
||||
|
||||
init([PeerId, DispatchFn]) ->
|
||||
@@ -238,17 +257,138 @@ init([PeerId, DispatchFn]) ->
|
||||
handle_call({enqueue, Activity}, _From, State) ->
|
||||
{reply, ok, enqueue_pure(field(peer, State), Activity, State)};
|
||||
handle_call(flush, _From, State) ->
|
||||
{NewState, Delivered, Retry} = drain_pure(State),
|
||||
%% Step 8b-timer: drain (which already bumps :attempts via
|
||||
%% bump_attempt on each failed deliver), then for each retried
|
||||
%% Cid compute the backoff slot from the now-current attempt
|
||||
%% count, set NextRetryAt, and arm a send_after self-cast.
|
||||
%% handle_info({retry, Cid}, ...) fires when the slot elapses.
|
||||
%% Reply shape unchanged.
|
||||
{DrainState, Delivered, Retry} = drain_pure(State),
|
||||
Now = monotonic_seconds(),
|
||||
NewState = lists:foldl(
|
||||
fun(Cid, S) -> arm_retry_timer(Cid, Now, S) end,
|
||||
DrainState, Retry),
|
||||
{reply, {ok, Delivered, Retry}, NewState};
|
||||
handle_call(get_pending, _From, State) ->
|
||||
{reply, field(pending, State), State};
|
||||
handle_call(get_state, _From, State) ->
|
||||
{reply, State, State};
|
||||
handle_call({set_dispatch_fn, Fn}, _From, State) ->
|
||||
{reply, ok, set_field(dispatch_fn, Fn, State)}.
|
||||
|
||||
handle_cast(_, S) -> {noreply, S}.
|
||||
|
||||
%% Step 8b-timer: a retry timer fired. Pull the activity by Cid from
|
||||
%% the pending queue (it might have been drained meanwhile by a
|
||||
%% concurrent flush — if so, we just clear bookkeeping and exit).
|
||||
%% Run deliver_one_pure: success clears retry state; failure bumps
|
||||
%% the counter and schedules the next slot — or dead-letters if the
|
||||
%% sixth attempt failed.
|
||||
|
||||
handle_info({retry, Cid}, State) ->
|
||||
%% Clear the timer ref we just consumed.
|
||||
State0 = clear_timer_ref(Cid, State),
|
||||
case take_by_cid(Cid, field(pending, State0), [], 0) of
|
||||
{none, _} ->
|
||||
%% Already drained / dead-lettered. Clear any stale
|
||||
%% bookkeeping in case the cid is half-tracked.
|
||||
{noreply, record_success_pure(Cid, State0)};
|
||||
{Activity, Rest} ->
|
||||
case deliver_one_pure(Activity, State0) of
|
||||
{ok, _} ->
|
||||
State1 = set_field(pending, Rest, State0),
|
||||
State2 = record_success_pure(Cid, State1),
|
||||
{noreply, State2};
|
||||
{error, _, _} ->
|
||||
%% Keep the activity in pending; record_failure
|
||||
%% leaves :pending alone (or dead-letters it on
|
||||
%% slot 6).
|
||||
Now = monotonic_seconds(),
|
||||
State1 = schedule_retry_for(Cid, Now, State0),
|
||||
{noreply, State1}
|
||||
end
|
||||
end;
|
||||
handle_info(_, S) -> {noreply, S}.
|
||||
|
||||
%% Step 8b-timer helpers ────────────────────────────────────────────
|
||||
|
||||
%% arm_retry_timer/3 — POST-DRAIN form. Used from handle_call(flush)
|
||||
%% after drain_pure has already bumped :attempts via bump_attempt.
|
||||
%% Sets next_retry_at = Now + backoff(attempts) and schedules the
|
||||
%% send_after self-cast. On the dead-letter slot (attempt 6), moves
|
||||
%% the activity from :pending to :dead_letter and arms no timer.
|
||||
|
||||
arm_retry_timer(Cid, Now, State) ->
|
||||
State0 = cancel_timer_for(Cid, State),
|
||||
Attempts = attempts_for(Cid, State0),
|
||||
case backoff_for(Attempts) of
|
||||
dead_letter ->
|
||||
move_to_dead_letter(Cid, State0);
|
||||
Seconds ->
|
||||
NextAt = Now + Seconds,
|
||||
NR = field(next_retry, State0),
|
||||
State1 = set_field(next_retry, set_keyed(Cid, NextAt, NR), State0),
|
||||
Ms = Seconds * 1000,
|
||||
Ref = erlang:send_after(Ms, self(), {retry, Cid}),
|
||||
Timers = field(timers, State1),
|
||||
set_field(timers, set_keyed(Cid, Ref, Timers), State1)
|
||||
end.
|
||||
|
||||
%% schedule_retry_for/3 — POST-RETRY-ATTEMPT form. Used from
|
||||
%% handle_info({retry, Cid}, ...) when the retry attempt failed.
|
||||
%% Bookkeep one failure and arm the next retry timer (or promote
|
||||
%% to dead-letter, in which case no timer is needed).
|
||||
|
||||
schedule_retry_for(Cid, Now, State) ->
|
||||
%% Cancel any in-flight timer for this Cid before scheduling a new
|
||||
%% one. Without the cancel a stale timer can still fire after
|
||||
%% record_success has cleared the cid, the handle_info no-match
|
||||
%% branch silently absorbs it — but it keeps the scheduler's
|
||||
%% run-loop alive long after the work is done. A pure clear (no
|
||||
%% cancel) is fine when the timer's own firing brought us here,
|
||||
%% so the explicit cancel only matters for the flush path.
|
||||
State0 = cancel_timer_for(Cid, State),
|
||||
State1 = record_failure_pure(Cid, Now, State0),
|
||||
Attempts = attempts_for(Cid, State1),
|
||||
case backoff_for(Attempts) of
|
||||
dead_letter ->
|
||||
State1;
|
||||
Seconds ->
|
||||
Ms = Seconds * 1000,
|
||||
Ref = erlang:send_after(Ms, self(), {retry, Cid}),
|
||||
Timers = field(timers, State1),
|
||||
set_field(timers, set_keyed(Cid, Ref, Timers), State1)
|
||||
end.
|
||||
|
||||
%% Cancel the live timer for Cid (if any) and clear it from :timers.
|
||||
%% Idempotent — silent no-op if there isn't one.
|
||||
|
||||
cancel_timer_for(Cid, State) ->
|
||||
Timers = field(timers, State),
|
||||
case find_keyed(Cid, Timers) of
|
||||
{ok, Ref} ->
|
||||
erlang:cancel_timer(Ref),
|
||||
set_field(timers, del_keyed(Cid, Timers), State);
|
||||
_ -> State
|
||||
end.
|
||||
|
||||
%% Drop the :timers entry for Cid without calling cancel_timer — used
|
||||
%% when the timer's own firing brought us into handle_info and the ref
|
||||
%% is already consumed.
|
||||
|
||||
clear_timer_ref(Cid, State) ->
|
||||
Timers = field(timers, State),
|
||||
case find_keyed(Cid, Timers) of
|
||||
{ok, _Ref} -> set_field(timers, del_keyed(Cid, Timers), State);
|
||||
_ -> State
|
||||
end.
|
||||
|
||||
%% Step 8b-timer: bookkeeping uses seconds (matches backoff_for /
|
||||
%% record_failure_pure / next_retry_at). The monotonic clock reports
|
||||
%% ms; we floor to seconds here to keep all the comparisons aligned.
|
||||
|
||||
monotonic_seconds() -> erlang:monotonic_time() div 1000.
|
||||
|
||||
%% ── Internal ────────────────────────────────────────────────────
|
||||
|
||||
activity_cid(Activity) ->
|
||||
|
||||
131
next/tests/delivery_retry_timer.sh
Executable file
131
next/tests/delivery_retry_timer.sh
Executable file
@@ -0,0 +1,131 @@
|
||||
#!/usr/bin/env bash
|
||||
# next/tests/delivery_retry_timer.sh — m2 Step 8b-timer.
|
||||
#
|
||||
# Live timer wiring on the delivery_worker gen_server. The pure
|
||||
# bookkeeping is covered by delivery_retry.sh — this suite proves the
|
||||
# erlang:send_after / cancel_timer wiring fires retries from the
|
||||
# scheduler's logical clock without anyone calling drain by hand.
|
||||
#
|
||||
# Substrate dependency: erlang:send_after/3 + cancel_timer/1 +
|
||||
# monotonic_time/0,1 — landed via cherry-pick from loops/erlang
|
||||
# (commits 3709460d / 98b0104c / 779e53b2 on this branch).
|
||||
#
|
||||
# Test discipline: every test cancels its leftover timer before
|
||||
# returning. If we don't, the scheduler keeps the run loop alive
|
||||
# advancing time through the full backoff chain (30s → 5m → 30m →
|
||||
# 6h → 24h), and each tick costs ~10s of wall time inside the
|
||||
# Erlang-on-SX VM. Canceling the trailing timer is the difference
|
||||
# between a 25s test and a 60s+ test.
|
||||
|
||||
set -uo pipefail
|
||||
cd "$(git rev-parse --show-toplevel)"
|
||||
|
||||
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
|
||||
fi
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
echo "ERROR: sx_server.exe not found." >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
VERBOSE="${1:-}"
|
||||
PASS=0; FAIL=0; ERRORS=""
|
||||
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
|
||||
|
||||
# A canned activity with cid <<1,2,3>>.
|
||||
SETUP='Act = [{id, <<1,2,3>>}, {type, note}, {actor, alice}], FailFn = fun(_) -> {error, transient} end,'
|
||||
|
||||
# Convenience: cancel any leftover timer for cid <<1,2,3>> on Peer.
|
||||
# Prevents the scheduler from grinding through 30s/5m/30m/6h/24h of
|
||||
# retries between epochs.
|
||||
CANCEL='CancelLeftover = fun(Peer) -> SS = delivery_worker:state_srv(Peer), case delivery_worker:timer_ref_for(<<1,2,3>>, SS) of undefined -> ok; LRef -> erlang:cancel_timer(LRef), ok end end,'
|
||||
|
||||
cat > "$TMPFILE" <<EPOCHS
|
||||
(epoch 1)
|
||||
(load "lib/erlang/tokenizer.sx")
|
||||
(load "lib/erlang/parser.sx")
|
||||
(load "lib/erlang/parser-core.sx")
|
||||
(load "lib/erlang/parser-expr.sx")
|
||||
(load "lib/erlang/parser-module.sx")
|
||||
(load "lib/erlang/transpile.sx")
|
||||
(load "lib/erlang/runtime.sx")
|
||||
(load "lib/erlang/vm/dispatcher.sx")
|
||||
(epoch 2)
|
||||
(eval "(er-load-gen-server!)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/delivery_worker.erl\")) :name)")
|
||||
|
||||
;; T1 — a failing flush schedules a retry timer. timer_ref_for
|
||||
;; returns a live Ref (not undefined). Then cancel before
|
||||
;; returning so the scheduler doesn't grind the full backoff
|
||||
;; chain trying to retry.
|
||||
(epoch 10)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP}${CANCEL} delivery_worker:start_link(bob, FailFn), delivery_worker:enqueue(bob, Act), {ok, [], [<<1,2,3>>]} = delivery_worker:flush(bob), S = delivery_worker:state_srv(bob), Ref = delivery_worker:timer_ref_for(<<1,2,3>>, S), Result = is_reference(Ref), CancelLeftover(bob), Result\") :name)")
|
||||
|
||||
;; T2 — initial flush bumps the attempt counter to 1; next_retry_at
|
||||
;; gets set; cancel the timer before returning.
|
||||
(epoch 11)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP}${CANCEL} delivery_worker:start_link(bob, FailFn), delivery_worker:enqueue(bob, Act), delivery_worker:flush(bob), S = delivery_worker:state_srv(bob), Result = delivery_worker:attempts_for(<<1,2,3>>, S) =:= 1, CancelLeftover(bob), Result\") :name)")
|
||||
|
||||
;; T3 — advancing the logical clock past the 30s backoff fires the
|
||||
;; timer; handle_info({retry, Cid}) bumps attempts to 2 and arms
|
||||
;; the next slot (backoff(2)=300s). Then cancel the new timer.
|
||||
(epoch 12)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP}${CANCEL} delivery_worker:start_link(bob, FailFn), delivery_worker:enqueue(bob, Act), delivery_worker:flush(bob), receive after 31000 -> ok end, S = delivery_worker:state_srv(bob), Result = delivery_worker:attempts_for(<<1,2,3>>, S) =:= 2, CancelLeftover(bob), Result\") :name)")
|
||||
|
||||
;; T4 — after the retry fires the worker has armed a fresh timer
|
||||
;; for the next backoff slot. Confirm it's a live ref, then
|
||||
;; cancel it.
|
||||
(epoch 13)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP}${CANCEL} delivery_worker:start_link(bob, FailFn), delivery_worker:enqueue(bob, Act), delivery_worker:flush(bob), receive after 31000 -> ok end, S = delivery_worker:state_srv(bob), Result = is_reference(delivery_worker:timer_ref_for(<<1,2,3>>, S)), CancelLeftover(bob), Result\") :name)")
|
||||
|
||||
;; T5 — successful retry path. Dispatch fails twice then succeeds
|
||||
;; (ets-backed counter). After two backoff slots elapse
|
||||
;; (30s, then 300s), the third attempt succeeds and
|
||||
;; record_success_pure clears the per-cid bookkeeping. No new
|
||||
;; timer is scheduled, so the scheduler terminates naturally.
|
||||
(epoch 14)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} ets:new(rt_ctr, [named_table, public]), ets:insert(rt_ctr, {n, 0}), Mixed = fun(_) -> [{n, N}] = ets:lookup(rt_ctr, n), ets:insert(rt_ctr, {n, N+1}), case N < 2 of true -> {error, transient}; false -> ok end end, delivery_worker:start_link(carol, Mixed), delivery_worker:enqueue(carol, Act), delivery_worker:flush(carol), receive after 31000 -> ok end, receive after 301000 -> ok end, S = delivery_worker:state_srv(carol), delivery_worker:pending(S) =:= [] andalso delivery_worker:attempts_for(<<1,2,3>>, S) =:= 0 andalso delivery_worker:timer_ref_for(<<1,2,3>>, S) =:= undefined\") :name)")
|
||||
EPOCHS
|
||||
|
||||
OUTPUT=$(timeout 900 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
|
||||
|
||||
check() {
|
||||
local epoch="$1" desc="$2" expected="$3"
|
||||
local actual
|
||||
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
|
||||
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
|
||||
$0 ~ "^\\(ok " e " " { print; exit }
|
||||
$0 ~ "^\\(error " e " " { print; exit }
|
||||
')
|
||||
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
|
||||
if echo "$actual" | grep -qF -- "$expected"; then
|
||||
PASS=$((PASS+1))
|
||||
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
|
||||
else
|
||||
FAIL=$((FAIL+1))
|
||||
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
|
||||
"
|
||||
fi
|
||||
}
|
||||
|
||||
check 10 "T1 flush schedules a timer" "true"
|
||||
check 11 "T2 initial flush bumps attempts to 1" "true"
|
||||
check 12 "T3 timer fires; attempts=2" "true"
|
||||
check 13 "T4 retry rearms next timer" "true"
|
||||
check 14 "T5 success clears retry state" "true"
|
||||
|
||||
TOTAL=$((PASS+FAIL))
|
||||
if [ $FAIL -eq 0 ]; then
|
||||
echo "ok $PASS/$TOTAL next/tests/delivery_retry_timer.sh passed"
|
||||
else
|
||||
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
|
||||
echo "$ERRORS"
|
||||
if [ "$VERBOSE" = "-v" ]; then
|
||||
echo "--- sx_server output ---"
|
||||
echo "$OUTPUT" | tail -40
|
||||
echo "---"
|
||||
fi
|
||||
fi
|
||||
[ $FAIL -eq 0 ]
|
||||
@@ -562,10 +562,24 @@ a dead-letter list visible via `/admin/dead-letter`.
|
||||
is cleared from `:next_retry`. `record_success_pure` clears
|
||||
both. `next_due_pure` returns cids whose retry time has
|
||||
passed. 11 cases in `delivery_retry.sh`.
|
||||
- [ ] **8b-timer** — Erlang-side timer wiring (`erlang:send_after`
|
||||
self-cast or equivalent). Needs the same substrate primitive
|
||||
that `gen_server` uses for `timeout` returns. Defer behind
|
||||
substrate gap discovery for now — see Blockers.
|
||||
- [x] **8b-timer** — Erlang-side timer wiring on the
|
||||
`delivery_worker` gen_server. handle_call(flush) drains then
|
||||
arms a `send_after` self-cast per retried Cid (backoff from
|
||||
the now-bumped attempt counter); handle_info({retry, Cid})
|
||||
redrives that single Cid through deliver_one_pure. Success
|
||||
clears bookkeeping via record_success; failure bumps attempts
|
||||
via record_failure_pure and arms the next backoff slot — or
|
||||
promotes to dead-letter on the 6th attempt and stops arming.
|
||||
A `:timers [{Cid, Ref}]` state field tracks live refs so
|
||||
schedule_retry_for can cancel the previous one before arming
|
||||
the next (otherwise stale timers keep the scheduler's run
|
||||
loop alive long after the work is done). 5/5 in
|
||||
`delivery_retry_timer.sh`: T1 timer scheduled, T2 attempts=1,
|
||||
T3 retry fires + attempts=2, T4 next timer rearmed, T5 ets-
|
||||
counter dispatch (fail/fail/ok) lands in 3 attempts and
|
||||
clears state. Substrate dependency landed via cherry-pick
|
||||
from `loops/erlang` (3709460d / 98b0104c / 779e53b2) until
|
||||
`loops/erlang` → architecture catches up.
|
||||
- [x] **8c** — Delivery-state projection
|
||||
(`next/kernel/delivery_state.erl`). Folds delivery events into
|
||||
per-peer worker-shaped snapshots so the outbound queue survives
|
||||
@@ -1105,8 +1119,16 @@ proceed.
|
||||
through `delivery_worker`) and Step 10c (peer-actor doc
|
||||
fetch in `peer_actors`) are now unblocked.
|
||||
|
||||
3. **`erlang:send_after`-style timer primitive** — discovered
|
||||
during Step 8b prep. The retry loop needs a way for the
|
||||
3. **`erlang:send_after`-style timer primitive** — ~~discovered
|
||||
during Step 8b prep~~ **RESOLVED 2026-06-30** via the
|
||||
`loops/erlang` `send_after`/`cancel_timer`/`monotonic_time`
|
||||
work landing on `origin/loops/erlang` (commits 3709460d,
|
||||
98b0104c, b10e55f0; 766/766 → 771/771). m2 cherry-picked all
|
||||
three onto this branch so 8b-timer could land without waiting
|
||||
for `loops/erlang` → architecture; the cherry-picks fall away
|
||||
as no-op duplicates when architecture catches up. Original
|
||||
diagnosis preserved below for the audit trail.
|
||||
The retry loop needs a way for the
|
||||
delivery_worker to wake itself up after `backoff_for(N)`
|
||||
seconds. Erlang's `erlang:send_after/3` is the standard
|
||||
primitive; this port doesn't seem to register it (looked at
|
||||
@@ -1241,6 +1263,31 @@ proceed.
|
||||
|
||||
Newest first.
|
||||
|
||||
- **2026-06-30** — Step 8b-timer closed. Cherry-picked the three
|
||||
`loops/erlang` send_after commits onto m2 (3709460d, 98b0104c,
|
||||
779e53b2 — the substrate landed standalone on origin/loops/erlang
|
||||
earlier and hadn't propagated to origin/architecture yet). Wired
|
||||
the live timer loop in `next/kernel/delivery_worker.erl`: a
|
||||
`:timers [{Cid, Ref}]` state field; `handle_call(flush)` drains
|
||||
then arms a `send_after` self-cast per retried Cid; the new
|
||||
`handle_info({retry, Cid})` callback redrives that one Cid through
|
||||
`deliver_one_pure` and either records success / clears state, or
|
||||
bumps and arms the next backoff slot (or dead-letters on the 6th
|
||||
attempt). Two arm-paths split — `arm_retry_timer` (post-drain,
|
||||
attempts already bumped) vs `schedule_retry_for` (post-retry
|
||||
attempt, needs to bump). `cancel_timer_for/1` clears the previous
|
||||
timer before arming the next so stale timers don't keep the
|
||||
scheduler's run loop alive after the work is done. Two new public
|
||||
APIs for tests: `state_srv/1` returns the worker's full state,
|
||||
`timer_ref_for/2` looks up a Cid's live ref. 5/5 in new
|
||||
`delivery_retry_timer.sh` (T1 timer scheduled, T2 attempts=1, T3
|
||||
retry fires + attempts=2, T4 next timer rearmed, T5 ets-counter
|
||||
dispatch fail/fail/ok lands in 3 attempts and clears state).
|
||||
Existing `delivery_worker.sh` 17/17 and `delivery_retry.sh` 11/11
|
||||
still green. Conformance gate 771/771 (was 761/761; the +10 is
|
||||
the cherry-picked send_after suite). Blockers #3 RESOLVED.
|
||||
Reply shape of `flush` unchanged; no caller updates needed.
|
||||
|
||||
- **2026-06-28** — Merge-prep pass. Conformance 761/761 still green
|
||||
on m2 tip `cd0de8cb`. Both smoke tests still pass cold:
|
||||
`next/tests/smoke_kernel_route.sh` 6/6 (port 54471, listener up
|
||||
|
||||
Reference in New Issue
Block a user