Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 45s
New next/kernel/delivery_state.erl folds delivery events into a
per-peer worker-shaped snapshot so the outbound queue survives
kernel restart.
Event proplist shapes:
[{type, enqueued}, {peer, _}, {activity, _}]
[{type, delivered}, {peer, _}, {cid, _}]
[{type, failed}, {peer, _}, {cid, _}, {now, _}]
[{type, dead_lettered}, {peer, _}, {cid, _}]
Projection state shape:
[{PeerId, [{peer, _}, {pending, _}, {attempts, _},
{next_retry, _}, {dead_letter, _}]}, ...]
Mirrors delivery_worker:new/1 (minus :dispatch_fn — that's the
live worker's concern) so a fresh gen_server can be hydrated
from the projection on restart.
Public API:
new/0
fold/2, fold_fn/0
peer_state/2, peers/1
pending/2, attempts/2, next_retry/2, dead_letter/2
The failed branch calls delivery_worker:backoff_for/1 directly,
so the projection and the live worker compute identical retry
slots and dead-letter thresholds. 6th failure -> dead-letter,
matching the worker.
14/14 in next/tests/delivery_state.sh covering:
- new/0 -> []
- enqueued appends to pending (FIFO)
- two peers maintain independent queues
- delivered clears matching pending entry
- failed bumps :attempts and sets :next_retry
- 6th failed -> dead-lettered (activity out of pending)
- explicit dead_lettered event moves activity to dead_letter
- peers/1 lists touched peers
- peer_state {ok, _} | not_found
- fold_fn/0 is fun/2 for projection:start_link
- unknown event type passes through
- delivered after failed clears retry state
delivery_worker.sh 17/17 unchanged, delivery_retry.sh 11/11
unchanged. Conformance preserved at 761/761.
The restart hydration helper (delivery_worker:state_from_proj/2
or similar) lands once 8b-timer can wire the live retry loop
(Blockers #3 — erlang:send_after substrate gap still open).
210 lines
7.0 KiB
Erlang
210 lines
7.0 KiB
Erlang
-module(delivery_state).
|
|
-export([new/0, fold/2, fold_fn/0,
|
|
peer_state/2, peers/1,
|
|
pending/2, attempts/2, next_retry/2, dead_letter/2]).
|
|
|
|
%% Delivery-state projection. Folds delivery events (enqueued /
|
|
%% delivered / failed / dead_lettered) into a per-peer worker-shaped
|
|
%% snapshot so the outbound queue survives kernel restart. Per design
|
|
%% §13.4 the worker state on restart is loaded from this projection
|
|
%% rather than reconstructed by re-driving the outbox log.
|
|
%%
|
|
%% Event proplist shapes (one per event type):
|
|
%% [{type, enqueued}, {peer, _}, {activity, _}]
|
|
%% [{type, delivered}, {peer, _}, {cid, _}]
|
|
%% [{type, failed}, {peer, _}, {cid, _}, {now, _}]
|
|
%% [{type, dead_lettered}, {peer, _}, {cid, _}]
|
|
%%
|
|
%% Projection state shape:
|
|
%% [{PeerId, WorkerProplist}, ...]
|
|
%%
|
|
%% WorkerProplist mirrors `delivery_worker:new/1`'s output so a fresh
|
|
%% gen_server can be hydrated with `delivery_worker:state_from_proj`
|
|
%% (lands when 8b-timer wires up). For Step 8c the projection only
|
|
%% tracks data — Step 8d-restart will wire the hydration helper.
|
|
|
|
%% Initial projection state: an empty list, i.e. no peers recorded yet.
new() ->
    [].
|
|
|
|
%% Return the fold step as a two-argument fun that forwards its
%% arguments, unchanged and in order, to fold/2.
fold_fn() ->
    fun (Ev, Acc) ->
        fold(Ev, Acc)
    end.
|
|
|
|
%% Apply one event to the projection state and return the new state.
%% The event's `type` field selects the handler; an event whose type is
%% missing or not one of the four known atoms leaves State untouched.
fold(Event, State) ->
    fold_by_type(envelope:get_field(type, Event), Event, State).

%% Dispatch on the result of the `type` lookup.
fold_by_type({ok, enqueued}, Event, State) ->
    fold_enqueued(Event, State);
fold_by_type({ok, delivered}, Event, State) ->
    fold_delivered(Event, State);
fold_by_type({ok, failed}, Event, State) ->
    fold_failed(Event, State);
fold_by_type({ok, dead_lettered}, Event, State) ->
    fold_dead_lettered(Event, State);
fold_by_type(_Unrecognised, _Event, State) ->
    State.
|
|
|
|
%% Handle an `enqueued` event: append the event's activity to the end of
%% the peer's pending list, starting from an empty worker entry when the
%% peer has not been seen before. If either the `peer` or the `activity`
%% field cannot be read, State is returned unchanged.
fold_enqueued(Event, State) ->
    PeerLookup = envelope:get_field(peer, Event),
    ActivityLookup = envelope:get_field(activity, Event),
    case {PeerLookup, ActivityLookup} of
        {{ok, PeerId}, {ok, Activity}} ->
            Current = ensure_peer(PeerId, State),
            %% Append at the tail so pending stays in arrival order.
            Queue = field(pending, Current) ++ [Activity],
            set_peer(PeerId, set_field(pending, Queue, Current), State);
        _ ->
            State
    end.
|
|
|
|
%% Handle a `delivered` event: for a peer already present in State, drop
%% the pending activities whose id equals the event's cid and remove that
%% cid's attempts / next_retry entries. Events with a missing `peer` or
%% `cid` field, or for a peer not in State, leave State unchanged.
fold_delivered(Event, State) ->
    PeerLookup = envelope:get_field(peer, Event),
    CidLookup = envelope:get_field(cid, Event),
    case {PeerLookup, CidLookup} of
        {{ok, PeerId}, {ok, Cid}} ->
            case find_keyed(PeerId, State) of
                {ok, Known} ->
                    Unqueued = drop_pending_by_cid(Cid, Known),
                    Settled = clear_retry_for(Cid, Unqueued),
                    set_peer(PeerId, Settled, State);
                _ ->
                    State
            end;
        _ ->
            State
    end.
|
|
|
|
%% Handle a `failed` event for a peer already present in State: bump the
%% cid's attempt counter, then either schedule the next retry or move the
%% activity to dead_letter, as decided by delivery_worker:backoff_for/1.
%% Events missing `peer`, `cid` or `now`, or naming a peer that is not in
%% State, leave State unchanged.
fold_failed(Event, State) ->
    PeerLookup = envelope:get_field(peer, Event),
    CidLookup = envelope:get_field(cid, Event),
    NowLookup = envelope:get_field(now, Event),
    case {PeerLookup, CidLookup, NowLookup} of
        {{ok, PeerId}, {ok, Cid}, {ok, Now}} ->
            case find_keyed(PeerId, State) of
                {ok, Known} ->
                    set_peer(PeerId, record_failure(Cid, Now, Known), State);
                _ ->
                    State
            end;
        _ ->
            State
    end.

%% Apply one failure of Cid at time Now to a single worker proplist.
%% NOTE(review): the counter is bumped whether or not Cid is currently in
%% pending — confirm this matches the live delivery_worker.
record_failure(Cid, Now, Worker) ->
    Attempts = field(attempts, Worker),
    %% A cid with no recorded attempts counts as zero prior failures.
    Prior = case find_keyed(Cid, Attempts) of
                {ok, N} -> N;
                _ -> 0
            end,
    Count = Prior + 1,
    Counted = set_field(attempts, set_keyed(Cid, Count, Attempts), Worker),
    case delivery_worker:backoff_for(Count) of
        dead_letter ->
            dead_letter_pending(Cid, Counted);
        Delay ->
            %% Any other result is treated as a delay added to Now.
            Schedule = field(next_retry, Counted),
            RetryAt = Now + Delay,
            set_field(next_retry, set_keyed(Cid, RetryAt, Schedule), Counted)
    end.
|
|
|
|
%% Handle an explicit `dead_lettered` event: for a peer already present
%% in State, move the pending activity matching the event's cid into
%% dead_letter via dead_letter_pending/2. Events with a missing `peer` or
%% `cid` field, or for a peer not in State, leave State unchanged.
fold_dead_lettered(Event, State) ->
    PeerLookup = envelope:get_field(peer, Event),
    CidLookup = envelope:get_field(cid, Event),
    case {PeerLookup, CidLookup} of
        {{ok, PeerId}, {ok, Cid}} ->
            case find_keyed(PeerId, State) of
                {ok, Known} ->
                    Buried = dead_letter_pending(Cid, Known),
                    set_peer(PeerId, Buried, State);
                _ ->
                    State
            end;
        _ ->
            State
    end.
|
|
|
|
%% ── Accessors ─────────────────────────────────────────────────
|
|
|
|
%% Look up the worker proplist stored for Peer.
%% Returns {ok, Worker} when the peer is present, otherwise not_found.
peer_state(Peer, State) ->
    case find_keyed(Peer, State) of
        {ok, _Worker} = Found -> Found;
        _ -> not_found
    end.
|
|
|
|
%% List the peer ids in State, in stored order. Elements of State that
%% are not two-element tuples are skipped by the generator pattern.
peers(State) ->
    [PeerId || {PeerId, _Worker} <- State].
|
|
|
|
%% Pending activities recorded for Peer; [] when the peer (or the field)
%% is absent from State.
pending(Peer, State) ->
    Fallback = [],
    worker_field(Peer, pending, State, Fallback).
|
|
|
|
%% Per-cid attempt counters recorded for Peer; [] when the peer (or the
%% field) is absent from State.
attempts(Peer, State) ->
    Fallback = [],
    worker_field(Peer, attempts, State, Fallback).
|
|
|
|
%% Per-cid next-retry entries recorded for Peer; [] when the peer (or the
%% field) is absent from State.
next_retry(Peer, State) ->
    Fallback = [],
    worker_field(Peer, next_retry, State, Fallback).
|
|
|
|
%% Dead-lettered activities recorded for Peer; [] when the peer (or the
%% field) is absent from State.
dead_letter(Peer, State) ->
    Fallback = [],
    worker_field(Peer, dead_letter, State, Fallback).
|
|
|
|
%% ── Internal ──────────────────────────────────────────────────
|
|
|
|
%% Read Field from the worker proplist stored for Peer.
%% Returns Default when the peer is not in State or the worker has no
%% such field.
worker_field(Peer, Field, State, Default) ->
    case find_keyed(Peer, State) of
        {ok, Worker} ->
            lookup_or_default(find_keyed(Field, Worker), Default);
        _ ->
            Default
    end.

%% Unwrap a find_keyed/2 result, substituting Default on a miss.
lookup_or_default({ok, Value}, _Default) -> Value;
lookup_or_default(_Miss, Default) -> Default.
|
|
|
|
%% Return the worker proplist stored for Peer, or a freshly built empty
%% worker when the peer is not in State. State itself is not modified.
ensure_peer(Peer, State) ->
    Lookup = find_keyed(Peer, State),
    case Lookup of
        {ok, Existing} -> Existing;
        _ -> empty_worker(Peer)
    end.
|
|
|
|
%% Build the worker proplist for a peer with no history: the peer id
%% followed by pending, attempts, next_retry and dead_letter, each [].
empty_worker(Peer) ->
    EmptyFields = [{Name, []}
                   || Name <- [pending, attempts, next_retry, dead_letter]],
    [{peer, Peer} | EmptyFields].
|
|
|
|
%% Store Updated as the worker proplist for PeerId in the projection,
%% replacing an existing entry in place or appending a new one.
set_peer(PeerId, Updated, Projection) ->
    set_keyed(PeerId, Updated, Projection).
|
|
|
|
%% Remove from the worker's pending list every activity whose id (as
%% reported by activity_cid/1) equals Cid; all other fields are kept.
drop_pending_by_cid(Cid, Worker) ->
    Remaining = [Act || Act <- field(pending, Worker),
                        activity_cid(Act) =/= Cid],
    set_field(pending, Remaining, Worker).
|
|
|
|
%% Forget the retry bookkeeping for Cid: delete its entry from both the
%% attempts and the next_retry lists of the worker.
clear_retry_for(Cid, Worker) ->
    Counters = del_keyed(Cid, field(attempts, Worker)),
    Schedule = del_keyed(Cid, field(next_retry, Worker)),
    %% next_retry is written first, then attempts, as in the original.
    Unscheduled = set_field(next_retry, Schedule, Worker),
    set_field(attempts, Counters, Unscheduled).
|
|
|
|
%% Move the first pending activity whose id equals Cid to the end of the
%% worker's dead_letter list, then clear Cid's retry bookkeeping. When no
%% pending activity matches, only the retry bookkeeping is cleared.
dead_letter_pending(Cid, Worker) ->
    Queue = field(pending, Worker),
    {Hit, Others} = split_by_cid(Cid, Queue),
    Buried = field(dead_letter, Worker),
    Trimmed = set_field(pending, Others, Worker),
    Moved = case Hit of
                %% split_by_cid/2 reports `none` when nothing matched.
                none -> Trimmed;
                _ -> set_field(dead_letter, Buried ++ [Hit], Trimmed)
            end,
    clear_retry_for(Cid, Moved).
|
|
|
|
%% Split List around the first activity whose id equals Cid.
%% Returns {Activity, ListWithoutIt} on a hit, or {none, List} when no
%% activity matches. The order of the remaining elements is preserved.
split_by_cid(Cid, List) ->
    split_by_cid(Cid, List, []).

%% Seen holds the already-inspected, non-matching elements in reverse.
split_by_cid(Cid, [Head | Tail], Seen) ->
    case activity_cid(Head) =:= Cid of
        true -> {Head, lists:reverse(Seen) ++ Tail};
        false -> split_by_cid(Cid, Tail, [Head | Seen])
    end;
split_by_cid(_Cid, [], Seen) ->
    {none, lists:reverse(Seen)}.
|
|
|
|
%% Read the `id` field of an activity. Returns the id when the lookup
%% yields {ok, Id}, and the atom nil for any other lookup result.
activity_cid(Activity) ->
    cid_or_nil(envelope:get_field(id, Activity)).

%% Unwrap the `id` lookup result, mapping every miss to nil.
cid_or_nil({ok, Cid}) -> Cid;
cid_or_nil(_Miss) -> nil.
|
|
|
|
%% Return the value of the first {Key, Value} pair in the list, or the
%% atom undefined when no pair has that key. Non-matching elements are
%% skipped.
field(_Key, []) ->
    undefined;
field(Key, [Entry | Rest]) ->
    case Entry of
        {Key, Value} -> Value;
        _ -> field(Key, Rest)
    end.
|
|
|
|
%% Set Key to Value in a proplist: the first pair with that key is
%% replaced in place; when there is none, {Key, Value} is appended.
set_field(Key, Value, []) ->
    [{Key, Value}];
set_field(Key, Value, [Entry | Rest]) ->
    case Entry of
        {Key, _Old} -> [{Key, Value} | Rest];
        _ -> [Entry | set_field(Key, Value, Rest)]
    end.
|
|
|
|
%% Find the first {Key, Value} pair in the list.
%% Returns {ok, Value} on a hit and {error, not_found} otherwise.
%% Non-matching elements are skipped.
find_keyed(_Key, []) ->
    {error, not_found};
find_keyed(Key, [Entry | Rest]) ->
    case Entry of
        {Key, Value} -> {ok, Value};
        _ -> find_keyed(Key, Rest)
    end.
|
|
|
|
%% Store Value under Key in a keyed list: the first pair with that key
%% is replaced in place; when there is none, {Key, Value} is appended.
set_keyed(Key, Value, []) ->
    [{Key, Value}];
set_keyed(Key, Value, [Entry | Rest]) ->
    case Entry of
        {Key, _Old} -> [{Key, Value} | Rest];
        _ -> [Entry | set_keyed(Key, Value, Rest)]
    end.
|
|
|
|
%% Delete the first pair stored under Key from a keyed list. Later pairs
%% with the same key, and lists without the key, are left as they are.
del_keyed(_Key, []) ->
    [];
del_keyed(Key, [Entry | Rest]) ->
    case Entry of
        {Key, _Value} -> Rest;
        _ -> [Entry | del_keyed(Key, Rest)]
    end.
|