-module(delivery_state).

-export([
    new/0,
    fold/2,
    fold_fn/0,
    peer_state/2,
    peers/1,
    pending/2,
    attempts/2,
    next_retry/2,
    dead_letter/2
]).

%% Delivery-state projection. Folds delivery events (enqueued /
%% delivered / failed / dead_lettered) into a per-peer worker-shaped
%% snapshot so the outbound queue survives kernel restart. Per design
%% §13.4 the worker state on restart is loaded from this projection
%% rather than reconstructed by re-driving the outbox log.
%%
%% Event proplist shape:
%%   [{type, enqueued},      {peer, _}, {activity, _}]
%%   [{type, delivered},     {peer, _}, {cid, _}]
%%   [{type, failed},        {peer, _}, {cid, _}, {now, _}]
%%   [{type, dead_lettered}, {peer, _}, {cid, _}]
%%
%% Projection state shape:
%%   [{PeerId, WorkerProplist}, ...]
%%
%% WorkerProplist mirrors `delivery_worker:new/1`'s output so a fresh
%% gen_server can be hydrated with `delivery_worker:state_from_proj`
%% (lands when 8b-timer wires up). For Step 8c the projection only
%% tracks data — Step 8d-restart will wire the hydration helper.

%% Returns the empty projection: no peers are known yet.
new() ->
    [].

%% Returns fold/2 as a two-argument fun (Event, State) -> State.
fold_fn() ->
    fun fold/2.

%% Applies a single event to the projection and returns the new state.
%% Events whose `type` field is missing or not one of the four known
%% atoms leave the state untouched.
fold(Event, State) ->
    fold_by_type(envelope:get_field(type, Event), Event, State).

%% Dispatches on the result of looking up the event's `type` field.
fold_by_type({ok, enqueued}, Event, State) ->
    fold_enqueued(Event, State);
fold_by_type({ok, delivered}, Event, State) ->
    fold_delivered(Event, State);
fold_by_type({ok, failed}, Event, State) ->
    fold_failed(Event, State);
fold_by_type({ok, dead_lettered}, Event, State) ->
    fold_dead_lettered(Event, State);
fold_by_type(_Unrecognised, _Event, State) ->
    State.

%% Handles an `enqueued` event: appends the activity to the end of the
%% peer's pending list, creating the peer's worker entry on first use.
%% An event lacking `peer` or `activity` is ignored.
fold_enqueued(Event, State) ->
    PeerField = envelope:get_field(peer, Event),
    ActivityField = envelope:get_field(activity, Event),
    enqueue_for_peer(PeerField, ActivityField, State).

%% Performs the enqueue when both field lookups succeeded.
enqueue_for_peer({ok, Peer}, {ok, Activity}, State) ->
    Worker = ensure_peer(Peer, State),
    %% Appending keeps the pending list in arrival order.
    Queue = field(pending, Worker) ++ [Activity],
    set_peer(Peer, set_field(pending, Queue, Worker), State);
enqueue_for_peer(_PeerField, _ActivityField, State) ->
    State.
%% Handles a `delivered` event: removes the activity identified by
%% `cid` from the peer's pending list and forgets its attempt count and
%% retry time. Events with a missing field, or for a peer the
%% projection has never seen, are ignored.
fold_delivered(Event, State) ->
    PeerField = envelope:get_field(peer, Event),
    CidField = envelope:get_field(cid, Event),
    apply_delivered(PeerField, CidField, State).

apply_delivered({ok, Peer}, {ok, Cid}, State) ->
    case find_keyed(Peer, State) of
        {ok, Worker} ->
            Settled = clear_retry_for(Cid, drop_pending_by_cid(Cid, Worker)),
            set_peer(Peer, Settled, State);
        _ ->
            State
    end;
apply_delivered(_PeerField, _CidField, State) ->
    State.

%% Handles a `failed` event: bumps the attempt counter for `cid` and,
%% depending on what `delivery_worker:backoff_for/1` returns for the
%% new count, either dead-letters the activity or records the time of
%% the next retry as `now` plus the returned value. Events with a
%% missing field, or for an unknown peer, are ignored.
fold_failed(Event, State) ->
    PeerField = envelope:get_field(peer, Event),
    CidField = envelope:get_field(cid, Event),
    NowField = envelope:get_field(now, Event),
    apply_failed(PeerField, CidField, NowField, State).

apply_failed({ok, Peer}, {ok, Cid}, {ok, Now}, State) ->
    case find_keyed(Peer, State) of
        {ok, Worker} ->
            set_peer(Peer, register_failure(Cid, Now, Worker), State);
        _ ->
            State
    end;
apply_failed(_PeerField, _CidField, _NowField, State) ->
    State.

%% Records one more failed attempt for Cid on Worker and returns the
%% updated worker proplist.
register_failure(Cid, Now, Worker) ->
    Attempts = field(attempts, Worker),
    Count = attempts_so_far(Cid, Attempts) + 1,
    Counted = set_field(attempts, set_keyed(Cid, Count, Attempts), Worker),
    case delivery_worker:backoff_for(Count) of
        dead_letter ->
            dead_letter_pending(Cid, Counted);
        Seconds ->
            Schedule = field(next_retry, Counted),
            RetryAt = Now + Seconds,
            set_field(next_retry, set_keyed(Cid, RetryAt, Schedule), Counted)
    end.

%% Number of failures already recorded for Cid; 0 when there is none.
attempts_so_far(Cid, Attempts) ->
    case find_keyed(Cid, Attempts) of
        {ok, Count} -> Count;
        _ -> 0
    end.

%% Handles a `dead_lettered` event: moves the activity identified by
%% `cid` from the peer's pending list to its dead-letter list. Events
%% with a missing field, or for an unknown peer, are ignored.
fold_dead_lettered(Event, State) ->
    PeerField = envelope:get_field(peer, Event),
    CidField = envelope:get_field(cid, Event),
    apply_dead_lettered(PeerField, CidField, State).

apply_dead_lettered({ok, Peer}, {ok, Cid}, State) ->
    case find_keyed(Peer, State) of
        {ok, Worker} ->
            set_peer(Peer, dead_letter_pending(Cid, Worker), State);
        _ ->
            State
    end;
apply_dead_lettered(_PeerField, _CidField, State) ->
    State.

%% ── Accessors ─────────────────────────────────────────────────

%% Returns {ok, WorkerProplist} for a known peer, `not_found` otherwise.
peer_state(Peer, State) ->
    case find_keyed(Peer, State) of
        {ok, _Worker} = Found -> Found;
        _ -> not_found
    end.

%% Lists the peer ids present in the projection, in state order.
peers(State) ->
    [PeerId || {PeerId, _Worker} <- State].

%% The four accessors below return the named worker field for Peer, or
%% [] when the peer (or the field) is absent.
pending(Peer, State) ->
    worker_field(Peer, pending, State, []).

attempts(Peer, State) ->
    worker_field(Peer, attempts, State, []).

next_retry(Peer, State) ->
    worker_field(Peer, next_retry, State, []).

dead_letter(Peer, State) ->
    worker_field(Peer, dead_letter, State, []).
%% ── Internal ──────────────────────────────────────────────────

%% Reads Field from Peer's worker proplist; Default is returned when
%% either the peer or the field is missing.
worker_field(Peer, Field, State, Default) ->
    case find_keyed(Peer, State) of
        {ok, Worker} -> value_or(find_keyed(Field, Worker), Default);
        _ -> Default
    end.

%% Unwraps an {ok, Value} lookup result, falling back to Default.
value_or({ok, Value}, _Default) -> Value;
value_or(_Miss, Default) -> Default.

%% Returns Peer's worker proplist, or a fresh empty one if the peer is
%% not in the projection yet. The state itself is not modified.
ensure_peer(Peer, State) ->
    value_or(find_keyed(Peer, State), empty_worker(Peer)).

%% Worker proplist for a peer with nothing queued, retried or
%% dead-lettered.
empty_worker(Peer) ->
    [
        {peer, Peer},
        {pending, []},
        {attempts, []},
        {next_retry, []},
        {dead_letter, []}
    ].

%% Stores Worker under Peer, replacing an existing entry in place or
%% appending a new one.
set_peer(Peer, Worker, State) ->
    set_keyed(Peer, Worker, State).

%% Removes every pending activity whose id equals Cid.
drop_pending_by_cid(Cid, Worker) ->
    StillPending = [Act || Act <- field(pending, Worker), activity_cid(Act) =/= Cid],
    set_field(pending, StillPending, Worker).

%% Forgets the attempt counter and the scheduled retry time for Cid.
clear_retry_for(Cid, Worker) ->
    Attempts = del_keyed(Cid, field(attempts, Worker)),
    Schedule = del_keyed(Cid, field(next_retry, Worker)),
    Unscheduled = set_field(next_retry, Schedule, Worker),
    set_field(attempts, Attempts, Unscheduled).

%% Moves the first pending activity with id Cid to the end of the
%% dead-letter list and clears its retry bookkeeping. When no pending
%% activity matches, only the retry bookkeeping is cleared.
dead_letter_pending(Cid, Worker) ->
    {Match, Remaining} = split_by_cid(Cid, field(pending, Worker)),
    Trimmed = set_field(pending, Remaining, Worker),
    clear_retry_for(Cid, move_to_dead_letter(Match, Trimmed)).

%% Appends a matched activity to the dead-letter list; `none` means
%% split_by_cid/2 found nothing to move.
move_to_dead_letter(none, Worker) ->
    Worker;
move_to_dead_letter(Activity, Worker) ->
    Buried = field(dead_letter, Worker) ++ [Activity],
    set_field(dead_letter, Buried, Worker).

%% Splits List into {FirstActivityWithCid, ListWithoutIt}, preserving
%% the order of the remaining elements. Returns {none, List} when no
%% activity has that id.
split_by_cid(Cid, List) ->
    split_by_cid(Cid, List, []).

split_by_cid(_Cid, [], Skipped) ->
    {none, lists:reverse(Skipped)};
split_by_cid(Cid, [Act | Tail], Skipped) ->
    case activity_cid(Act) =:= Cid of
        true -> {Act, lists:reverse(Skipped) ++ Tail};
        false -> split_by_cid(Cid, Tail, [Act | Skipped])
    end.

%% Returns the activity's `id` field, or `nil` when the lookup does not
%% yield {ok, _}.
activity_cid(Activity) ->
    case envelope:get_field(id, Activity) of
        {ok, Cid} -> Cid;
        _ -> nil
    end.

%% Returns the value stored under Key in a worker proplist, or
%% `undefined` when the key is absent.
field(Key, Worker) ->
    case find_keyed(Key, Worker) of
        {ok, Value} -> Value;
        _ -> undefined
    end.

%% Sets Key to Value in a worker proplist: replaces the first existing
%% entry in place, otherwise appends a new pair at the end.
set_field(Key, Value, Worker) ->
    set_keyed(Key, Value, Worker).

%% Looks up Key among the two-element tuples of List (exact match on
%% the key; elements of any other shape are skipped). Returns
%% {ok, Value} for the first hit, {error, not_found} otherwise.
find_keyed(Key, [{Key, Value} | _]) -> {ok, Value};
find_keyed(Key, [_Other | Tail]) -> find_keyed(Key, Tail);
find_keyed(_Key, []) -> {error, not_found}.
%% Stores Value under Key in a list of {Key, Value} pairs. The first
%% pair whose key matches exactly is replaced in place; when there is
%% none, the new pair is appended at the end. Other elements keep their
%% position.
set_keyed(Key, Value, [{Key, _Old} | Tail]) ->
    [{Key, Value} | Tail];
set_keyed(Key, Value, [Other | Tail]) ->
    [Other | set_keyed(Key, Value, Tail)];
set_keyed(Key, Value, []) ->
    [{Key, Value}].

%% Removes the first pair whose key matches Key exactly. Later pairs
%% with the same key, and all other elements, are left as they are.
del_keyed(Key, [{Key, _Value} | Tail]) ->
    Tail;
del_keyed(Key, [Other | Tail]) ->
    [Other | del_keyed(Key, Tail)];
del_keyed(_Key, []) ->
    [].