-module(delivery_worker).
-behaviour(gen_server).
%% Pure-functional state API followed by the gen_server client API.
-export([new/1, pending/1, peer/1, enqueue_pure/3, drain_pure/1,
         deliver_one_pure/2, backoff_for/1, schedule_for/1,
         record_failure_pure/3, record_success_pure/2, next_due_pure/2,
         attempts_for/2, next_retry_at/2, dead_letter_list/1,
         start_link/1, start_link/2, stop/1, enqueue/2, flush/1,
         pending_srv/1, set_dispatch_fn/2]).
%% gen_server callbacks.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

%% Outbound delivery worker per design §13.4. One gen_server per
%% peer instance (peer-id atom) holding a FIFO queue of pending
%% activities to deliver. v2 lands in stages:
%%
%%   Step 8a   pure-functional state shape, enqueue / drain /
%%             schedule semantics + gen_server skeleton + tests
%%   Step 8b   retry / backoff schedule (30s / 5m / 30m / 6h / 24h)
%%             + dead-letter list
%%   Step 8c   delivery-state projection so the queue survives
%%             kernel restart
%%   Step 8d   outbox:publish/2 dispatches each delivery-set entry
%%             to the matching worker
%%   Step 8e   httpc:request/4 BIF (substrate exception per briefing)
%%   Step 8f   real HTTP POST through the BIF + content-type wiring
%%
%% This file covers Step 8a — pure state + skeleton gen_server with
%% the APIs Step 8b-d will fill in — plus the pure half of Step 8b
%% (record_failure_pure/3, record_success_pure/2, next_due_pure/2 and
%% the attempts_for/2, next_retry_at/2, dead_letter_list/1 accessors
%% exported above). Real HTTP dispatch is stubbed via a
%% caller-supplied `:dispatch_fn` so tests can intercept and Step 8f
%% can plug in the live httpc call without touching the queue logic.
%%
%% State shape (pure):
%%   [{peer, PeerId},
%%    {pending, [Activity, ...]},               %% FIFO; head delivered first
%%    {attempts, [{Cid, AttemptCount}, ...]},
%%    {next_retry, [{Cid, NextRetryAt}, ...]},  %% Step 8b-pure
%%    {dead_letter, [Activity, ...]},
%%    {dispatch_fn, fun/1 | undefined}]
%%
%% gen_server registers under the peer-id atom (one worker per peer);
%% the same APIs work as pure-functional state transitions for tests.
%% ── Pure-functional API ─────────────────────────────────────────

%% new/1 — fresh worker state for PeerId: empty queue, no attempt or
%% retry bookkeeping, an empty dead-letter list and no dispatch_fn.
new(PeerId) ->
    [{peer, PeerId},
     {pending, []},
     {attempts, []},
     {next_retry, []},
     {dead_letter, []},
     {dispatch_fn, undefined}].

%% pending/1 — the queued activities, head first.
pending(State) ->
    field(pending, State).

%% peer/1 — the peer id stored in the state.
peer(State) ->
    field(peer, State).

%% enqueue_pure/3 — append an activity to the queue. Returns new
%% state. Duplicate :id activities aren't deduplicated here — that's
%% the caller's job (Step 8d will pass each delivery-set entry once).
%% The first argument is accepted but not used.
enqueue_pure(_PeerId, Activity, State) ->
    Pending = field(pending, State),
    set_field(pending, Pending ++ [Activity], State).

%% drain_pure/1 — attempt to deliver every queued activity through
%% the configured dispatch_fn. Returns {NewState, DeliveredCids,
%% RetryCids}, both cid lists in queue order. Activities that fail
%% dispatch stay in :pending (original order preserved) with an
%% incremented attempt counter — Step 8b will use the count to
%% pick a backoff slot.
drain_pure(State) ->
    drain_loop(field(pending, State), [], State, [], []).

%% drain_loop/5 — walks the queue once. The three accumulators are
%% built by prepending and reversed a single time at the end, so the
%% walk is linear in the queue length instead of re-copying each
%% accumulator on every element.
drain_loop([], KeptRev, State, DeliveredRev, RetryRev) ->
    {set_field(pending, lists:reverse(KeptRev), State),
     lists:reverse(DeliveredRev),
     lists:reverse(RetryRev)};
drain_loop([A | Rest], KeptRev, State, DeliveredRev, RetryRev) ->
    case deliver_one_pure(A, State) of
        {ok, Cid} ->
            drain_loop(Rest, KeptRev, State, [Cid | DeliveredRev], RetryRev);
        {error, Cid, _Reason} ->
            %% Failed: keep the activity queued and count the attempt.
            State1 = bump_attempt(Cid, State),
            drain_loop(Rest, [A | KeptRev], State1, DeliveredRev, [Cid | RetryRev])
    end.

%% deliver_one_pure/2 — single-activity dispatch via the caller-
%% supplied dispatch_fn. Returns {ok, Cid} on success or {error,
%% Cid, Reason} on failure. With no dispatch_fn configured returns
%% {error, _, no_dispatch_fn} so callers know to wire one before
%% the worker is useful.
deliver_one_pure(Activity, State) ->
    Cid = activity_cid(Activity),
    dispatch_with(field(dispatch_fn, State), Activity, Cid).

%% dispatch_with/3 — pick the outcome based on what is stored under
%% dispatch_fn: nothing, a usable fun/1, or something else entirely.
dispatch_with(undefined, _Activity, Cid) ->
    {error, Cid, no_dispatch_fn};
dispatch_with(Fn, Activity, Cid) when is_function(Fn, 1) ->
    dispatch_outcome(Fn(Activity), Cid);
dispatch_with(_NotAFun, _Activity, Cid) ->
    {error, Cid, bad_dispatch_fn}.

%% dispatch_outcome/2 — normalise whatever the dispatch fun returned
%% into {ok, Cid} | {error, Cid, Reason}.
dispatch_outcome(ok, Cid) -> {ok, Cid};
dispatch_outcome({ok, _}, Cid) -> {ok, Cid};
dispatch_outcome({error, Reason}, Cid) -> {error, Cid, Reason};
dispatch_outcome(Other, Cid) -> {error, Cid, {bad_dispatch_return, Other}}.

%% backoff_for/1 — the static retry schedule, in seconds, keyed by
%% attempt number. Attempts are 1-indexed (first retry uses slot 1);
%% attempt 0 maps to no delay.
%%
%%   30s / 5m / 30m / 6h / 24h, and dead_letter for anything else.
backoff_for(Attempt) ->
    case Attempt of
        0 -> 0;
        1 -> 30;
        2 -> 5 * 60;
        3 -> 30 * 60;
        4 -> 6 * 3600;
        5 -> 24 * 3600;
        _ -> dead_letter
    end.

%% schedule_for/1 — backoff_for/1 wrapped for callers: either
%% {retry_in, Seconds} or the bare atom dead_letter.
schedule_for(Attempts) ->
    as_schedule(backoff_for(Attempts)).

as_schedule(dead_letter) -> dead_letter;
as_schedule(Seconds) -> {retry_in, Seconds}.

%% ── Step 8b-pure: retry-time bookkeeping ───────────────────────
%%
%% `record_failure_pure/3(Cid, Now, State)` — call after a failed
%% deliver_one. Bumps the per-cid attempt counter; if the new
%% attempt is past the dead-letter threshold, moves the matching
%% activity from :pending to :dead_letter. Otherwise records the
%% next retry time as Now + backoff_for(NewAttempt).
%%
%% Real timer wiring (erlang:send_after self-cast on the worker
%% pid) needs substrate support — Step 8b-timer when that lands.
%%
%% `record_success_pure/2(Cid, State)` — clears :attempts and
%% :next_retry entries for the cid; called after a successful
%% deliver_one.
%%
%% `next_due_pure/2(Now, State)` — returns the list of Cids whose
%% NextRetryAt has passed, in insertion order.
%% Counts one more failed attempt for Cid. When backoff_for/1 has no
%% slot for the new count the activity is dead-lettered; otherwise
%% the next retry time is stored as Now + the backoff for that count.
%% The attempt counter is kept in both cases.
record_failure_pure(Cid, Now, State) ->
    New = attempts_for(Cid, State) + 1,
    Attempts = set_keyed(Cid, New, field(attempts, State)),
    State1 = set_field(attempts, Attempts, State),
    case backoff_for(New) of
        dead_letter ->
            move_to_dead_letter(Cid, State1);
        Seconds ->
            NR = field(next_retry, State1),
            set_field(next_retry, set_keyed(Cid, Now + Seconds, NR), State1)
    end.

%% Forgets the attempt counter and the scheduled retry for Cid.
record_success_pure(Cid, State) ->
    A1 = del_keyed(Cid, field(attempts, State)),
    NR1 = del_keyed(Cid, field(next_retry, State)),
    set_field(attempts, A1, set_field(next_retry, NR1, State)).

%% next_due_pure/2 — Cids whose NextRetryAt <= Now. Preserves
%% insertion order so the worker drains them in FIFO retry order.
next_due_pure(Now, State) ->
    [Cid || {Cid, At} <- field(next_retry, State), At =< Now].

%% Number of failed attempts recorded for Cid; 0 when none.
attempts_for(Cid, State) ->
    case find_keyed(Cid, field(attempts, State)) of
        {ok, N} -> N;
        _ -> 0
    end.

%% Scheduled retry time for Cid, or undefined when none is recorded.
next_retry_at(Cid, State) ->
    case find_keyed(Cid, field(next_retry, State)) of
        {ok, At} -> At;
        _ -> undefined
    end.

%% Activities that were dead-lettered, oldest first.
dead_letter_list(State) ->
    field(dead_letter, State).

%% Moves the first pending activity whose cid is Cid to the end of
%% :dead_letter and drops any scheduled retry for it. With no such
%% activity only the retry entry is removed.
move_to_dead_letter(Cid, State) ->
    {Match, Rest} = take_by_cid(Cid, field(pending, State), []),
    State1 = set_field(pending, Rest, State),
    State2 =
        case Match of
            none ->
                State1;
            {found, Act} ->
                DL = field(dead_letter, State),
                set_field(dead_letter, DL ++ [Act], State1)
        end,
    NR = field(next_retry, State2),
    set_field(next_retry, del_keyed(Cid, NR), State2).

%% take_by_cid/3 — removes the first activity whose cid equals Cid.
%% Returns {{found, Activity}, Remaining} or {none, Remaining};
%% Remaining keeps the original order. The last argument accumulates
%% the already-inspected prefix in reverse.
take_by_cid(_Cid, [], Seen) ->
    {none, lists:reverse(Seen)};
take_by_cid(Cid, [A | Rest], Seen) ->
    case activity_cid(A) of
        Cid -> {{found, A}, lists:reverse(Seen) ++ Rest};
        _ -> take_by_cid(Cid, Rest, [A | Seen])
    end.

%% ── gen_server wrapper ──────────────────────────────────────────

%% Starts a worker for PeerId with no dispatch_fn configured.
start_link(PeerId) ->
    start_link(PeerId, undefined).

%% Starts a worker, registers it under the PeerId atom and returns
%% what gen_server:start_link/2 returned.
%% NOTE(review): that result is handed straight to erlang:register/2,
%% so it is assumed to be a bare pid rather than {ok, Pid} — confirm
%% against the gen_server implementation this runs on.
start_link(PeerId, DispatchFn) ->
    Pid = gen_server:start_link(delivery_worker, [PeerId, DispatchFn]),
    erlang:register(PeerId, Pid),
    Pid.
%% Sends '$gen_stop' to the worker registered as PeerId, then frees
%% the name; returns the reply to the stop call.
%% NOTE(review): no handle_call/3 clause below matches '$gen_stop',
%% so it is presumably handled by the gen_server implementation
%% itself — confirm.
stop(PeerId) ->
    Reply = gen_server:call(PeerId, '$gen_stop'),
    erlang:unregister(PeerId),
    Reply.

%% Synchronously appends Activity to the worker's queue.
enqueue(PeerId, Activity) ->
    gen_server:call(PeerId, {enqueue, Activity}).

%% Drains the queue once; replies {ok, DeliveredCids, RetryCids}.
flush(PeerId) ->
    gen_server:call(PeerId, flush).

%% Returns the worker's current pending list.
pending_srv(PeerId) ->
    gen_server:call(PeerId, get_pending).

%% Replaces the worker's dispatch function.
set_dispatch_fn(PeerId, Fn) ->
    gen_server:call(PeerId, {set_dispatch_fn, Fn}).

%% gen_server callbacks

%% Initial state: new/1 with the supplied dispatch function stored.
init([PeerId, DispatchFn]) ->
    {ok, set_field(dispatch_fn, DispatchFn, new(PeerId))}.

%% One clause per client request; each delegates to the pure API.
handle_call({enqueue, Activity}, _From, State) ->
    Peer = field(peer, State),
    Queued = enqueue_pure(Peer, Activity, State),
    {reply, ok, Queued};
handle_call(flush, _From, State) ->
    {Drained, DeliveredCids, RetryCids} = drain_pure(State),
    {reply, {ok, DeliveredCids, RetryCids}, Drained};
handle_call(get_pending, _From, State) ->
    Queue = field(pending, State),
    {reply, Queue, State};
handle_call({set_dispatch_fn, Fn}, _From, State) ->
    Updated = set_field(dispatch_fn, Fn, State),
    {reply, ok, Updated}.

%% Casts and stray messages are ignored.
handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info(_Msg, State) ->
    {noreply, State}.

%% ── Internal ────────────────────────────────────────────────────

%% The activity's id as returned by envelope:get_field/2, or nil
%% when that call yields anything other than {ok, Cid}.
activity_cid(Activity) ->
    cid_or_nil(envelope:get_field(id, Activity)).

cid_or_nil({ok, Cid}) -> Cid;
cid_or_nil(_) -> nil.

%% Adds one to the attempt counter for Cid (starting from 0).
bump_attempt(Cid, State) ->
    Counts = field(attempts, State),
    Next =
        case find_keyed(Cid, Counts) of
            {ok, Seen} -> Seen + 1;
            _ -> 1
        end,
    set_field(attempts, set_keyed(Cid, Next, Counts), State).

%% Value stored under Key in a {Key, Value} list, or undefined.
field(Key, Pairs) ->
    case find_keyed(Key, Pairs) of
        {ok, Value} -> Value;
        {error, not_found} -> undefined
    end.

%% Replaces the first entry for Key in place, or appends a new one.
set_field(Key, Value, [{Key, _Old} | Tail]) ->
    [{Key, Value} | Tail];
set_field(Key, Value, [Other | Tail]) ->
    [Other | set_field(Key, Value, Tail)];
set_field(Key, Value, []) ->
    [{Key, Value}].

%% {ok, Value} for the first entry under Key, else {error, not_found}.
find_keyed(Key, [{Key, Value} | _]) ->
    {ok, Value};
find_keyed(Key, [_ | Tail]) ->
    find_keyed(Key, Tail);
find_keyed(_Key, []) ->
    {error, not_found}.

%% Same replace-or-append rule as set_field/3, for the per-cid lists.
set_keyed(Key, Value, Pairs) ->
    set_field(Key, Value, Pairs).

%% Drops the first entry stored under Key, if any.
del_keyed(Key, [{Key, _} | Tail]) ->
    Tail;
del_keyed(Key, [Other | Tail]) ->
    [Other | del_keyed(Key, Tail)];
del_keyed(_Key, []) ->
    [].