Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 44s
Wires the delivery_worker's retry loop on top of the erlang:send_after / cancel_timer primitives just landed on loops/erlang (3709460d,98b0104c,779e53b2— cherry-picked here since origin/architecture hasn't caught up yet). Surface: - new :timers [{Cid, Ref}] state field tracks live timer refs - handle_call(flush): drain (existing semantics) + arm_retry_timer per retried Cid (computes backoff slot from the now-bumped attempt count, sets next_retry_at, send_after self-cast). Reply shape unchanged. - handle_info({retry, Cid}, S): redrives that one Cid through deliver_one_pure. Success → record_success_pure + clear pending. Failure → schedule_retry_for (which bumps attempts, dead-letters on slot 6, or arms next slot). - cancel_timer_for/2 before arming a new timer so stale timers don't keep the scheduler's run loop alive after the work is done. - state_srv/1 + timer_ref_for/2 for test introspection. 5/5 in new delivery_retry_timer.sh; existing delivery_worker.sh 17/17 and delivery_retry.sh 11/11 still green. Conformance gate 771/771 (was 761/761; the +10 is the cherry-picked send_after suite). Closes Blockers #3. m2 is now feature-complete. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
427 lines
16 KiB
Erlang
427 lines
16 KiB
Erlang
-module(delivery_worker).
|
|
-behaviour(gen_server).
|
|
-export([new/1, pending/1, peer/1,
|
|
enqueue_pure/3, drain_pure/1, deliver_one_pure/2,
|
|
backoff_for/1, schedule_for/1,
|
|
record_failure_pure/3, record_success_pure/2,
|
|
next_due_pure/2, attempts_for/2, next_retry_at/2,
|
|
dead_letter_list/1, timer_ref_for/2,
|
|
start_link/1, start_link/2, stop/1,
|
|
enqueue/2, flush/1, pending_srv/1, set_dispatch_fn/2,
|
|
state_srv/1]).
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
|
|
|
|
%% Outbound delivery worker per design §13.4. One gen_server per
|
|
%% peer instance (peer-id atom) holding a FIFO queue of pending
|
|
%% activities to deliver. v2 lands in stages:
|
|
%%
|
|
%% Step 8a pure-functional state shape, enqueue / drain /
|
|
%% schedule semantics + gen_server skeleton + tests
|
|
%% Step 8b retry / backoff schedule (30s / 5m / 30m / 6h / 24h)
|
|
%% + dead-letter list
|
|
%% Step 8c delivery-state projection so the queue survives
|
|
%% kernel restart
|
|
%% Step 8d outbox:publish/2 dispatches each delivery-set entry
|
|
%% to the matching worker
|
|
%% Step 8e httpc:request/4 BIF (substrate exception per briefing)
|
|
%% Step 8f real HTTP POST through the BIF + content-type wiring
|
|
%%
|
|
%% This file covers 8a (pure state + gen_server skeleton) plus the 8b
%% retry bookkeeping and retry-timer wiring further down. Real HTTP
%% dispatch is stubbed via a caller-supplied `:dispatch_fn` so tests
%% can intercept and Step 8f can plug in the live httpc call without
%% touching the queue logic.
|
|
%%
|
|
%% State shape (pure):
%%   [{peer,        PeerId},
%%    {pending,     [Activity, ...]},            %% FIFO; head delivered first
%%    {attempts,    [{Cid, AttemptCount}, ...]},
%%    {next_retry,  [{Cid, NextRetryAt}, ...]},  %% Step 8b-pure
%%    {dead_letter, [Activity, ...]},
%%    {timers,      [{Cid, TimerRef}, ...]},     %% Step 8b-timer
%%    {dispatch_fn, fun/1 | undefined}]
|
|
%%
|
|
%% gen_server registers under the peer-id atom (one worker per peer);
|
|
%% the same APIs work as pure-functional state transitions for tests.
|
|
|
|
%% ── Pure-functional API ─────────────────────────────────────────
|
|
|
|
%% new/1 — initial worker state for PeerId. Every queue / bookkeeping
%% table starts empty and no dispatch function is configured yet.
%% Field order: peer, pending, attempts, next_retry, dead_letter,
%% timers, dispatch_fn.
new(PeerId) ->
    EmptyTables = [{Table, []}
                   || Table <- [pending, attempts, next_retry, dead_letter, timers]],
    [{peer, PeerId}] ++ EmptyTables ++ [{dispatch_fn, undefined}].
|
|
|
|
%% pending/1 — the activities still queued for delivery (FIFO order).
pending(State) ->
    field(pending, State).

%% peer/1 — the peer id stored in the state.
peer(State) ->
    field(peer, State).
|
|
|
|
%% enqueue_pure/3 — pure enqueue. Returns State with Activity added at
%% the tail of :pending. The first argument (peer id) is accepted for
%% the call shape but not used. Activities sharing an :id are NOT
%% deduplicated here; submitting each one once is the caller's job.
enqueue_pure(_PeerId, Activity, State) ->
    Queue = field(pending, State) ++ [Activity],
    set_field(pending, Queue, State).
|
|
|
|
%% drain_pure/1 — one delivery pass over everything in :pending using
%% the configured dispatch_fn.
%%
%% Returns {NewState, DeliveredCids, RetryCids}. Delivered activities
%% leave the queue; activities whose dispatch failed stay in :pending
%% and have their attempt counter incremented (the counter later
%% selects the backoff slot).
drain_pure(State) ->
    drain_loop(field(pending, State), [], State, [], []).
|
|
|
|
%% drain_loop/5 — walk the queue once, dispatching each activity.
%%
%% Kept, Delivered and Retry are accumulators built in REVERSE order
%% (cons onto the head) and reversed exactly once when the queue is
%% exhausted, so a pass is linear in queue length instead of quadratic
%% from repeated `++ [X]` appends. Callers must start all three as [].
%%
%% Returns {NewState, DeliveredCids, RetryCids}; NewState's :pending
%% and both cid lists are in original queue order.
drain_loop([], Kept, State, Delivered, Retry) ->
    {set_field(pending, lists:reverse(Kept), State),
     lists:reverse(Delivered),
     lists:reverse(Retry)};
drain_loop([Activity | Rest], Kept, State, Delivered, Retry) ->
    case deliver_one_pure(Activity, State) of
        {ok, Cid} ->
            drain_loop(Rest, Kept, State, [Cid | Delivered], Retry);
        {error, Cid, _Reason} ->
            %% Failed: keep the activity queued and count the attempt.
            State1 = bump_attempt(Cid, State),
            drain_loop(Rest, [Activity | Kept], State1, Delivered, [Cid | Retry])
    end.
|
|
|
|
%% deliver_one_pure/2 — dispatch a single activity through the
%% dispatch_fn stored in State.
%%
%% Returns {ok, Cid} when the fun answers `ok` or `{ok, _}`, otherwise
%% {error, Cid, Reason}:
%%   no_dispatch_fn            — dispatch_fn is still `undefined`
%%   bad_dispatch_fn           — dispatch_fn is not a fun of arity 1
%%   Reason                    — the fun answered {error, Reason}
%%   {bad_dispatch_return, X}  — the fun answered anything else
deliver_one_pure(Activity, State) ->
    Cid = activity_cid(Activity),
    dispatch_with(field(dispatch_fn, State), Activity, Cid).

%% Pick the outcome based on what kind of dispatch_fn is configured.
dispatch_with(undefined, _Activity, Cid) ->
    {error, Cid, no_dispatch_fn};
dispatch_with(Fn, Activity, Cid) when is_function(Fn, 1) ->
    dispatch_outcome(Fn(Activity), Cid);
dispatch_with(_NotAFun, _Activity, Cid) ->
    {error, Cid, bad_dispatch_fn}.

%% Normalise the dispatch fun's return value into the worker's result shape.
dispatch_outcome(ok, Cid) -> {ok, Cid};
dispatch_outcome({ok, _}, Cid) -> {ok, Cid};
dispatch_outcome({error, Reason}, Cid) -> {error, Cid, Reason};
dispatch_outcome(Other, Cid) -> {error, Cid, {bad_dispatch_return, Other}}.
|
|
|
|
%% backoff_for/1 — static retry schedule, in seconds, keyed by the
%% attempt count (1-indexed: the first retry uses slot 1).
%%
%%   0 -> 0, 1 -> 30s, 2 -> 5m, 3 -> 30m, 4 -> 6h, 5 -> 24h
%%
%% Any other value has no slot and yields the atom `dead_letter`.
backoff_for(Attempts) ->
    case Attempts of
        0 -> 0;
        1 -> 30;
        2 -> 5 * 60;
        3 -> 30 * 60;
        4 -> 6 * 3600;
        5 -> 24 * 3600;
        _ -> dead_letter
    end.
|
|
|
|
%% schedule_for/1 — tagged view of backoff_for/1: `dead_letter` when
%% the attempt count has no backoff slot, otherwise {retry_in, Seconds}.
schedule_for(Attempts) ->
    tag_schedule(backoff_for(Attempts)).

%% Wrap a backoff value in the {retry_in, _} tag; pass dead_letter through.
tag_schedule(dead_letter) -> dead_letter;
tag_schedule(Seconds) -> {retry_in, Seconds}.
|
|
|
|
%% ── Step 8b-pure: retry-time bookkeeping ───────────────────────
|
|
%%
|
|
%% `record_failure_pure/3(Cid, Now, State)` — call after a failed
|
|
%% deliver_one. Bumps the per-cid attempt counter; if the new
|
|
%% attempt is past the dead-letter threshold, moves the matching
|
|
%% activity from :pending to :dead_letter. Otherwise records the
|
|
%% next retry time as Now + backoff_for(NewAttempt).
|
|
%%
|
|
%% These functions only do the bookkeeping. The real timer wiring
%% (erlang:send_after to the worker pid) lives in the Step 8b-timer
%% helpers arm_retry_timer/3 and schedule_retry_for/3 further down.
|
|
%%
|
|
%% `record_success_pure/2(Cid, State)` — clears :attempts and
|
|
%% :next_retry entries for the cid; called after a successful
|
|
%% deliver_one.
|
|
%%
|
|
%% `next_due_pure/2(Now, State)` — returns the list of Cids whose
|
|
%% NextRetryAt has passed, in insertion order.
|
|
|
|
%% record_failure_pure/3 — bookkeeping for one failed delivery of Cid.
%% Increments the attempt counter, then either dead-letters the
%% activity (no backoff slot left for the new count) or stores
%% Now + backoff seconds as the cid's next retry time.
record_failure_pure(Cid, Now, State) ->
    Bumped = bump_attempt(Cid, State),
    case backoff_for(attempts_for(Cid, Bumped)) of
        dead_letter ->
            move_to_dead_letter(Cid, Bumped);
        Delay ->
            Schedule = set_keyed(Cid, Now + Delay, field(next_retry, Bumped)),
            set_field(next_retry, Schedule, Bumped)
    end.
|
|
|
|
%% record_success_pure/2 — forget retry bookkeeping for Cid: removes
%% its entries from both :attempts and :next_retry. Leaves :pending
%% and :timers untouched.
record_success_pure(Cid, State) ->
    Attempts = del_keyed(Cid, field(attempts, State)),
    Schedule = del_keyed(Cid, field(next_retry, State)),
    Unscheduled = set_field(next_retry, Schedule, State),
    set_field(attempts, Attempts, Unscheduled).
|
|
|
|
%% next_due_pure/2 — the Cids in :next_retry whose retry time is at or
%% before Now, in the order they appear in the table.
next_due_pure(Now, State) ->
    Schedule = field(next_retry, State),
    [Cid || {Cid, DueAt} <- Schedule, Now >= DueAt].
|
|
|
|
%% attempts_for/2 — recorded attempt count for Cid; 0 when the cid has
%% no entry in :attempts.
attempts_for(Cid, State) ->
    Lookup = find_keyed(Cid, field(attempts, State)),
    case Lookup of
        {ok, Count} -> Count;
        _ -> 0
    end.
|
|
|
|
%% next_retry_at/2 — the stored next-retry time for Cid, or `undefined`
%% when the cid has no entry in :next_retry.
next_retry_at(Cid, State) ->
    Lookup = find_keyed(Cid, field(next_retry, State)),
    case Lookup of
        {ok, DueAt} -> DueAt;
        _ -> undefined
    end.
|
|
|
|
%% dead_letter_list/1 — the activities that have been dead-lettered.
dead_letter_list(State) ->
    field(dead_letter, State).
|
|
|
|
%% Step 8b-timer: timer_ref_for/2 — the timer ref tracked in :timers
%% for Cid, or `undefined` when none is tracked. Exported so tests can
%% assert whether a retry timer is currently recorded for a cid.
timer_ref_for(Cid, State) ->
    Lookup = find_keyed(Cid, field(timers, State)),
    case Lookup of
        {ok, TimerRef} -> TimerRef;
        _ -> undefined
    end.
|
|
|
|
%% move_to_dead_letter/2 — take the first pending activity whose cid
%% is Cid out of :pending and append it to :dead_letter, then drop the
%% cid's :next_retry entry. If no pending activity matches, only the
%% :next_retry entry is removed. :attempts is left as-is.
move_to_dead_letter(Cid, State) ->
    {Match, Remaining} = take_by_cid(Cid, field(pending, State), [], []),
    Letters = field(dead_letter, State),
    Dequeued = set_field(pending, Remaining, State),
    Parked =
        case Match of
            none -> Dequeued;
            Activity -> set_field(dead_letter, Letters ++ [Activity], Dequeued)
        end,
    Schedule = del_keyed(Cid, field(next_retry, Parked)),
    set_field(next_retry, Schedule, Parked).
|
|
|
|
%% take_by_cid/4 — remove the first activity whose cid equals Cid.
%%
%% Returns {Activity, QueueWithoutIt} on a hit, or {none, Queue} when
%% nothing matches; queue order is preserved either way. The third
%% argument accumulates already-inspected activities (start it as []);
%% the fourth argument is ignored.
take_by_cid(_Cid, [], Seen, _Ignored) ->
    {none, lists:reverse(Seen)};
take_by_cid(Cid, [Candidate | Tail], Seen, _Ignored) ->
    case activity_cid(Candidate) =:= Cid of
        true -> {Candidate, lists:reverse(Seen) ++ Tail};
        false -> take_by_cid(Cid, Tail, [Candidate | Seen], 0)
    end.
|
|
|
|
%% ── gen_server wrapper ──────────────────────────────────────────
|
|
|
|
%% start_link/1 — start a worker for PeerId with no dispatch function.
start_link(PeerId) ->
    start_link(PeerId, undefined).

%% start_link/2 — start a worker for PeerId with DispatchFn, register
%% it under the PeerId atom, and return whatever gen_server:start_link
%% returned.
%%
%% NOTE(review): the start_link result is handed straight to
%% erlang:register/2, so this relies on the runtime returning a bare
%% pid here rather than {ok, Pid} — confirm before porting.
start_link(PeerId, DispatchFn) ->
    InitArgs = [PeerId, DispatchFn],
    Worker = gen_server:start_link(delivery_worker, InitArgs),
    erlang:register(PeerId, Worker),
    Worker.
|
|
|
|
%% stop/1 — send the '$gen_stop' call to the worker registered as
%% PeerId, then unregister the name. Returns the call's reply.
%%
%% NOTE(review): this module has no handle_call clause for '$gen_stop';
%% presumably the runtime's gen_server handles it — confirm.
stop(PeerId) ->
    Reply = gen_server:call(PeerId, '$gen_stop'),
    erlang:unregister(PeerId),
    Reply.
|
|
|
|
%% enqueue/2 — queue Activity on the worker registered as PeerId.
%% The worker replies `ok`.
enqueue(PeerId, Activity) ->
    Request = {enqueue, Activity},
    gen_server:call(PeerId, Request).

%% flush/1 — ask the worker to run a delivery pass over its queue.
%% The worker replies {ok, DeliveredCids, RetryCids}.
flush(PeerId) ->
    Request = flush,
    gen_server:call(PeerId, Request).

%% pending_srv/1 — fetch the worker's current :pending queue.
pending_srv(PeerId) ->
    Request = get_pending,
    gen_server:call(PeerId, Request).

%% set_dispatch_fn/2 — replace the worker's dispatch function.
%% The worker replies `ok`.
set_dispatch_fn(PeerId, Fn) ->
    Request = {set_dispatch_fn, Fn},
    gen_server:call(PeerId, Request).
|
|
|
|
%% Step 8b-timer: state_srv/1 — fetch the worker's entire state term so
%% tests can run the pure accessors (attempts_for / next_retry_at /
%% timer_ref_for / dead_letter_list) against a live worker.
state_srv(PeerId) ->
    Request = get_state,
    gen_server:call(PeerId, Request).
|
|
|
|
%% gen_server callbacks
|
|
|
|
%% init/1 — gen_server callback. Builds a fresh state for PeerId and
%% stores DispatchFn (possibly `undefined`) as its dispatch_fn.
init([PeerId, DispatchFn]) ->
    Fresh = new(PeerId),
    Configured = set_field(dispatch_fn, DispatchFn, Fresh),
    {ok, Configured}.
|
|
|
|
%% handle_call/3 — gen_server callback for the synchronous API.
%%
%%   {enqueue, Activity}    -> ok; Activity appended to :pending
%%   flush                  -> {ok, DeliveredCids, RetryCids}
%%   get_pending            -> the :pending queue
%%   get_state              -> the whole state term
%%   {set_dispatch_fn, Fn}  -> ok; Fn stored as dispatch_fn
handle_call({enqueue, Activity}, _From, State) ->
    {reply, ok, enqueue_pure(field(peer, State), Activity, State)};
handle_call(flush, _From, State) ->
    %% Step 8b-timer: drain first (drain_pure already bumps :attempts
    %% for every failed delivery).
    {Drained, Delivered, Retry} = drain_pure(State),
    %% A cid delivered by this flush may still carry a live retry timer
    %% and attempt / next_retry entries from an earlier failed flush.
    %% Settle those now instead of leaving a stale timer armed until it
    %% fires. Cids that also appear in Retry (duplicate or missing ids
    %% with mixed outcomes) are skipped so the attempt count that
    %% drain_pure just bumped is not wiped.
    Finished = [Cid || Cid <- Delivered, not lists:member(Cid, Retry)],
    Settled = lists:foldl(
                fun(Cid, S) -> record_success_pure(Cid, cancel_timer_for(Cid, S)) end,
                Drained, Finished),
    %% For every retried cid: derive the backoff slot from the current
    %% attempt count, record next_retry, and arm a send_after message
    %% that handle_info({retry, Cid}, ...) picks up. Reply shape is
    %% unchanged.
    Now = monotonic_seconds(),
    NewState = lists:foldl(
                 fun(Cid, S) -> arm_retry_timer(Cid, Now, S) end,
                 Settled, Retry),
    {reply, {ok, Delivered, Retry}, NewState};
handle_call(get_pending, _From, State) ->
    {reply, field(pending, State), State};
handle_call(get_state, _From, State) ->
    {reply, State, State};
handle_call({set_dispatch_fn, Fn}, _From, State) ->
    {reply, ok, set_field(dispatch_fn, Fn, State)}.
|
|
|
|
%% handle_cast/2 — gen_server callback. No casts are defined; any cast
%% is ignored and the state is kept.
handle_cast(_Msg, State) ->
    {noreply, State}.
|
|
|
|
%% Step 8b-timer: handle_info/2 — gen_server callback.
%%
%% {retry, Cid}: a retry timer fired. The tracked timer ref for Cid is
%% dropped (without cancel_timer), then the activity is looked up in
%% :pending and re-dispatched — see redrive/3. Any other message is
%% ignored.
%%
%% NOTE(review): clear_timer_ref drops whichever ref is tracked at the
%% time the message is processed. If a flush re-armed the cid between
%% the timer firing and this message being handled, that newer timer
%% would become untracked but still live — worth confirming whether
%% that ordering can occur.
handle_info({retry, Cid}, State) ->
    Cleared = clear_timer_ref(Cid, State),
    Lookup = take_by_cid(Cid, field(pending, Cleared), [], 0),
    {noreply, redrive(Cid, Lookup, Cleared)};
handle_info(_Other, State) ->
    {noreply, State}.

%% redrive/3 — act on the result of looking Cid up in :pending.
%%   {none, _}         the activity is no longer pending (already
%%                     delivered or dead-lettered): just clear any
%%                     leftover attempt / next_retry entries.
%%   {Activity, Rest}  re-dispatch. Success removes the activity from
%%                     :pending and clears its bookkeeping. Failure
%%                     leaves :pending alone and hands over to
%%                     schedule_retry_for/3, which counts the failure
%%                     and arms the next timer or dead-letters.
redrive(Cid, {none, _Queue}, State) ->
    record_success_pure(Cid, State);
redrive(Cid, {Activity, Rest}, State) ->
    case deliver_one_pure(Activity, State) of
        {ok, _} ->
            record_success_pure(Cid, set_field(pending, Rest, State));
        {error, _, _} ->
            schedule_retry_for(Cid, monotonic_seconds(), State)
    end.
|
|
|
|
%% Step 8b-timer helpers ────────────────────────────────────────────
|
|
|
|
%% arm_retry_timer/3 — POST-DRAIN form, used by handle_call(flush)
%% after drain_pure has already incremented :attempts for Cid.
%%
%% Cancels any timer currently tracked for Cid, then looks at the
%% cid's attempt count:
%%   * no backoff slot left -> the activity is moved from :pending to
%%     :dead_letter and no timer is armed;
%%   * otherwise -> next_retry is set to Now + backoff seconds, a
%%     {retry, Cid} message to self() is scheduled after that many
%%     milliseconds, and the timer ref is stored in :timers.
arm_retry_timer(Cid, Now, State) ->
    Unarmed = cancel_timer_for(Cid, State),
    case backoff_for(attempts_for(Cid, Unarmed)) of
        dead_letter ->
            move_to_dead_letter(Cid, Unarmed);
        Delay ->
            Schedule = set_keyed(Cid, Now + Delay, field(next_retry, Unarmed)),
            Scheduled = set_field(next_retry, Schedule, Unarmed),
            %% Bookkeeping is in seconds; send_after takes milliseconds.
            TimerRef = erlang:send_after(Delay * 1000, self(), {retry, Cid}),
            Timers = set_keyed(Cid, TimerRef, field(timers, Scheduled)),
            set_field(timers, Timers, Scheduled)
    end.
|
|
|
|
%% schedule_retry_for/3 — POST-RETRY-ATTEMPT form, used by
%% handle_info({retry, Cid}, ...) when the re-dispatch failed.
%%
%% Cancels any timer still tracked for Cid (a no-op when the caller
%% has already dropped the ref of the timer that just fired), records
%% the failure via record_failure_pure/3 — which increments the
%% attempt count and either sets next_retry or dead-letters the
%% activity — and, unless the cid was dead-lettered, schedules the
%% next {retry, Cid} message and tracks its ref in :timers.
schedule_retry_for(Cid, Now, State) ->
    Unarmed = cancel_timer_for(Cid, State),
    Recorded = record_failure_pure(Cid, Now, Unarmed),
    case backoff_for(attempts_for(Cid, Recorded)) of
        dead_letter ->
            %% Already moved to :dead_letter by record_failure_pure.
            Recorded;
        Delay ->
            %% Bookkeeping is in seconds; send_after takes milliseconds.
            TimerRef = erlang:send_after(Delay * 1000, self(), {retry, Cid}),
            Timers = set_keyed(Cid, TimerRef, field(timers, Recorded)),
            set_field(timers, Timers, Recorded)
    end.
|
|
|
|
%% cancel_timer_for/2 — if a timer ref is tracked for Cid, call
%% erlang:cancel_timer/1 on it and remove the entry from :timers.
%% Returns State unchanged when no ref is tracked.
cancel_timer_for(Cid, State) ->
    Timers = field(timers, State),
    cancel_tracked(find_keyed(Cid, Timers), Cid, Timers, State).

%% Cancel + forget on a hit; leave the state alone on a miss.
cancel_tracked({ok, TimerRef}, Cid, Timers, State) ->
    erlang:cancel_timer(TimerRef),
    set_field(timers, del_keyed(Cid, Timers), State);
cancel_tracked(_Missing, _Cid, _Timers, State) ->
    State.
|
|
|
|
%% clear_timer_ref/2 — forget the :timers entry for Cid WITHOUT calling
%% cancel_timer. Used from handle_info when the timer's own firing is
%% what got us there, so the ref is already spent.
%%
%% Deleting a key that is not present leaves the table equal to what
%% it was, so no separate "not tracked" branch is needed.
clear_timer_ref(Cid, State) ->
    Remaining = del_keyed(Cid, field(timers, State)),
    set_field(timers, Remaining, State).
|
|
|
|
%% Step 8b-timer: monotonic_seconds/0 — current monotonic clock reading
%% floored to whole seconds, the unit used by backoff_for /
%% record_failure_pure / next_retry_at.
%%
%% NOTE(review): the `div 1000` assumes erlang:monotonic_time/0 reports
%% milliseconds on this runtime; stock OTP returns native time units
%% from the zero-arity form — confirm before porting.
monotonic_seconds() ->
    Millis = erlang:monotonic_time(),
    Millis div 1000.
|
|
|
|
%% ── Internal ────────────────────────────────────────────────────
|
|
|
|
%% activity_cid/1 — the activity's `id` field as read through
%% envelope:get_field/2, or the atom `nil` when that lookup does not
%% return {ok, Cid}.
activity_cid(Activity) ->
    cid_or_nil(envelope:get_field(id, Activity)).

%% Unwrap a successful lookup; everything else maps to nil.
cid_or_nil({ok, Cid}) -> Cid;
cid_or_nil(_) -> nil.
|
|
|
|
%% bump_attempt/2 — increment Cid's attempt counter in :attempts,
%% starting from 0 when the cid has no entry yet.
bump_attempt(Cid, State) ->
    Next = attempts_for(Cid, State) + 1,
    Table = set_keyed(Cid, Next, field(attempts, State)),
    set_field(attempts, Table, State).
|
|
|
|
%% field/2 — value of the first {Key, Value} pair in the state list,
%% or `undefined` when the key is absent.
field(_Key, []) ->
    undefined;
field(Key, [Entry | Rest]) ->
    case Entry of
        {Key, Value} -> Value;
        _ -> field(Key, Rest)
    end.
|
|
|
|
%% set_field/3 — replace the value of the first {Key, _} pair, keeping
%% its position; append {Key, Value} at the end when the key is absent.
set_field(Key, Value, []) ->
    [{Key, Value}];
set_field(Key, Value, [Entry | Rest]) ->
    case Entry of
        {Key, _Old} -> [{Key, Value} | Rest];
        _ -> [Entry | set_field(Key, Value, Rest)]
    end.
|
|
|
|
%% find_keyed/2 — look Key up in a [{Key, Value}] table. Returns
%% {ok, Value} for the first match, {error, not_found} otherwise.
find_keyed(_Key, []) ->
    {error, not_found};
find_keyed(Key, [Entry | Rest]) ->
    case Entry of
        {Key, Value} -> {ok, Value};
        _ -> find_keyed(Key, Rest)
    end.
|
|
|
|
%% set_keyed/3 — upsert into a [{Key, Value}] table: overwrite the
%% first entry for Key in place, or append a new entry at the end.
set_keyed(Key, Value, []) ->
    [{Key, Value}];
set_keyed(Key, Value, [Entry | Rest]) ->
    case Entry of
        {Key, _Old} -> [{Key, Value} | Rest];
        _ -> [Entry | set_keyed(Key, Value, Rest)]
    end.
|
|
|
|
%% del_keyed/2 — remove the first entry for Key from a [{Key, Value}]
%% table; the table comes back unchanged when the key is absent.
del_keyed(_Key, []) ->
    [];
del_keyed(Key, [Entry | Rest]) ->
    case Entry of
        {Key, _Value} -> Rest;
        _ -> [Entry | del_keyed(Key, Rest)]
    end.
|