diff --git a/next/kernel/delivery_worker.erl b/next/kernel/delivery_worker.erl new file mode 100644 index 00000000..d4fc581e --- /dev/null +++ b/next/kernel/delivery_worker.erl @@ -0,0 +1,198 @@ +-module(delivery_worker). +-behaviour(gen_server). +-export([new/1, pending/1, peer/1, + enqueue_pure/3, drain_pure/1, deliver_one_pure/2, + backoff_for/1, schedule_for/1, + start_link/1, start_link/2, stop/1, + enqueue/2, flush/1, pending_srv/1, set_dispatch_fn/2]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2]). + +%% Outbound delivery worker per design §13.4. One gen_server per +%% peer instance (peer-id atom) holding a FIFO queue of pending +%% activities to deliver. v2 lands in stages: +%% +%% Step 8a pure-functional state shape, enqueue / drain / +%% schedule semantics + gen_server skeleton + tests +%% Step 8b retry / backoff schedule (30s / 5m / 30m / 6h / 24h) +%% + dead-letter list +%% Step 8c delivery-state projection so the queue survives +%% kernel restart +%% Step 8d outbox:publish/2 dispatches each delivery-set entry +%% to the matching worker +%% Step 8e httpc:request/4 BIF (substrate exception per briefing) +%% Step 8f real HTTP POST through the BIF + content-type wiring +%% +%% This file is 8a only — pure state + skeleton gen_server with the +%% APIs Step 8b-d will fill in. Real HTTP dispatch is stubbed via a +%% caller-supplied `:dispatch_fn` so tests can intercept and Step 8f +%% can plug in the live httpc call without touching the queue logic. +%% +%% State shape (pure): +%% [{peer, PeerId}, +%% {pending, [Activity, ...]}, %% FIFO; head delivered first +%% {attempts, [{Cid, AttemptCount}, ...]}, +%% {dead_letter, [Activity, ...]}, +%% {dispatch_fn, fun/1 | undefined}] +%% +%% gen_server registers under the peer-id atom (one worker per peer); +%% the same APIs work as pure-functional state transitions for tests. + +%% ── Pure-functional API ───────────────────────────────────────── + +new(PeerId) -> + [{peer, PeerId}, + {pending, []}, + {attempts, []}, + {dead_letter, []}, + {dispatch_fn, undefined}]. + +pending(State) -> field(pending, State). +peer(State) -> field(peer, State). + +%% enqueue_pure/3 — append an activity to the queue. Returns new +%% state. Duplicate :id activities aren't deduplicated here — that's +%% the caller's job (Step 8d will pass each delivery-set entry once). + +enqueue_pure(_PeerId, Activity, State) -> + Pending = field(pending, State), + set_field(pending, Pending ++ [Activity], State). + +%% drain_pure/1 — attempt to deliver every queued activity through +%% the configured dispatch_fn. Returns {NewState, DeliveredCids, +%% RetryCids}. Activities that fail dispatch stay in :pending with +%% an incremented attempt counter — Step 8b will use the count to +%% pick a backoff slot. + +drain_pure(State) -> + Pending = field(pending, State), + drain_loop(Pending, [], State, [], []). + +drain_loop([], Kept, State, Delivered, Retry) -> + {set_field(pending, Kept, State), Delivered, Retry}; +drain_loop([A | Rest], Kept, State, Delivered, Retry) -> + case deliver_one_pure(A, State) of + {ok, Cid} -> + drain_loop(Rest, Kept, State, Delivered ++ [Cid], Retry); + {error, Cid, _Reason} -> + State1 = bump_attempt(Cid, State), + drain_loop(Rest, Kept ++ [A], State1, Delivered, Retry ++ [Cid]) + end. + +%% deliver_one_pure/2 — single-activity dispatch via the caller- +%% supplied dispatch_fn. Returns {ok, Cid} on success or {error, +%% Cid, Reason} on failure. With no dispatch_fn configured returns +%% {error, _, no_dispatch_fn} so callers know to wire one before +%% the worker is useful. + +deliver_one_pure(Activity, State) -> + Cid = activity_cid(Activity), + case field(dispatch_fn, State) of + undefined -> {error, Cid, no_dispatch_fn}; + Fn when is_function(Fn, 1) -> + case Fn(Activity) of + ok -> {ok, Cid}; + {ok, _} -> {ok, Cid}; + {error, Reason} -> {error, Cid, Reason}; + Other -> {error, Cid, {bad_dispatch_return, Other}} + end; + _ -> {error, Cid, bad_dispatch_fn} + end. + +%% backoff_for/1 — Step 8a returns the static schedule per the +%% plan; Step 8b wires it into the retry loop. Attempts are +%% 1-indexed (first retry uses slot 1). +%% +%% 30s / 5m / 30m / 6h / 24h then dead_letter. + +backoff_for(0) -> 0; +backoff_for(1) -> 30; +backoff_for(2) -> 300; % 5 * 60 +backoff_for(3) -> 1800; % 30 * 60 +backoff_for(4) -> 21600; % 6 * 3600 +backoff_for(5) -> 86400; % 24 * 3600 +backoff_for(_) -> dead_letter. + +schedule_for(Attempts) -> + case backoff_for(Attempts) of + dead_letter -> dead_letter; + Seconds -> {retry_in, Seconds} + end. + +%% ── gen_server wrapper ────────────────────────────────────────── + +start_link(PeerId) -> + start_link(PeerId, undefined). + +start_link(PeerId, DispatchFn) -> + Pid = gen_server:start_link(delivery_worker, [PeerId, DispatchFn]), + erlang:register(PeerId, Pid), + Pid. + +stop(PeerId) -> + R = gen_server:call(PeerId, '$gen_stop'), + erlang:unregister(PeerId), + R. + +enqueue(PeerId, Activity) -> + gen_server:call(PeerId, {enqueue, Activity}). + +flush(PeerId) -> + gen_server:call(PeerId, flush). + +pending_srv(PeerId) -> + gen_server:call(PeerId, get_pending). + +set_dispatch_fn(PeerId, Fn) -> + gen_server:call(PeerId, {set_dispatch_fn, Fn}). + +%% gen_server callbacks + +init([PeerId, DispatchFn]) -> + S0 = new(PeerId), + {ok, set_field(dispatch_fn, DispatchFn, S0)}. + +handle_call({enqueue, Activity}, _From, State) -> + {reply, ok, enqueue_pure(field(peer, State), Activity, State)}; +handle_call(flush, _From, State) -> + {NewState, Delivered, Retry} = drain_pure(State), + {reply, {ok, Delivered, Retry}, NewState}; +handle_call(get_pending, _From, State) -> + {reply, field(pending, State), State}; +handle_call({set_dispatch_fn, Fn}, _From, State) -> + {reply, ok, set_field(dispatch_fn, Fn, State)}. + +handle_cast(_, S) -> {noreply, S}. + +handle_info(_, S) -> {noreply, S}. + +%% ── Internal ──────────────────────────────────────────────────── + +activity_cid(Activity) -> + case envelope:get_field(id, Activity) of + {ok, Cid} -> Cid; + _ -> nil + end. + +bump_attempt(Cid, State) -> + Attempts = field(attempts, State), + Current = case find_keyed(Cid, Attempts) of + {ok, N} -> N; + _ -> 0 + end, + set_field(attempts, set_keyed(Cid, Current + 1, Attempts), State). + +field(K, [{K, V} | _]) -> V; +field(K, [_ | Rest]) -> field(K, Rest); +field(_, []) -> undefined. + +set_field(K, V, []) -> [{K, V}]; +set_field(K, V, [{K, _} | Rest]) -> [{K, V} | Rest]; +set_field(K, V, [P | Rest]) -> [P | set_field(K, V, Rest)]. + +find_keyed(_, []) -> {error, not_found}; +find_keyed(K, [{K, V} | _]) -> {ok, V}; +find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest). + +set_keyed(K, V, []) -> [{K, V}]; +set_keyed(K, V, [{K, _} | Rest]) -> [{K, V} | Rest]; +set_keyed(K, V, [P | Rest]) -> [P | set_keyed(K, V, Rest)]. diff --git a/next/tests/delivery_worker.sh b/next/tests/delivery_worker.sh new file mode 100755 index 00000000..358ac95a --- /dev/null +++ b/next/tests/delivery_worker.sh @@ -0,0 +1,156 @@ +#!/usr/bin/env bash +# next/tests/delivery_worker.sh — m2 Step 8a test. +# +# Pure-functional state shape + gen_server skeleton for the +# outbound delivery worker. One worker per peer; FIFO queue of +# pending activities; caller-supplied :dispatch_fn does the actual +# HTTP POST (stubbed for tests, live httpc in Step 8f). Retry / +# backoff (Step 8b) and persist-survival (Step 8c) layer on top. + +set -uo pipefail +cd "$(git rev-parse --show-toplevel)" + +SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}" +if [ ! -x "$SX_SERVER" ]; then + SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe" +fi +if [ ! -x "$SX_SERVER" ]; then + echo "ERROR: sx_server.exe not found." >&2 + exit 1 +fi + +VERBOSE="${1:-}" +PASS=0; FAIL=0; ERRORS="" +TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT + +SETUP='Act1 = [{id, <<1,2,3>>}, {type, note}, {actor, alice}], Act2 = [{id, <<4,5,6>>}, {type, note}, {actor, alice}], OkFetch = fun(_) -> ok end, FailFetch = fun(_) -> {error, http_500} end,' + +cat > "$TMPFILE" < FIFO order +(epoch 13) +(eval "(get (erlang-eval-ast \"${SETUP} S0 = delivery_worker:new(bob), S1 = delivery_worker:enqueue_pure(bob, Act1, S0), S2 = delivery_worker:enqueue_pure(bob, Act2, S1), delivery_worker:pending(S2) =:= [Act1, Act2]\") :name)") + +;; drain_pure with no dispatch_fn -> all retry, queue intact +(epoch 14) +(eval "(get (erlang-eval-ast \"${SETUP} S0 = delivery_worker:new(bob), S1 = delivery_worker:enqueue_pure(bob, Act1, S0), {S2, Delivered, Retry} = delivery_worker:drain_pure(S1), Delivered =:= [] andalso length(Retry) =:= 1 andalso delivery_worker:pending(S2) =:= [Act1]\") :name)") + +;; drain_pure with success dispatch -> activities cleared +(epoch 15) +(eval "(get (erlang-eval-ast \"${SETUP} S0 = delivery_worker:new(bob), S1 = lists:foldl(fun(K, A) -> delivery_worker:enqueue_pure(bob, K, A) end, S0, [Act1, Act2]), Wired = [{peer, bob}, {pending, [Act1, Act2]}, {attempts, []}, {dead_letter, []}, {dispatch_fn, OkFetch}], {S2, Delivered, Retry} = delivery_worker:drain_pure(Wired), delivery_worker:pending(S2) =:= [] andalso length(Delivered) =:= 2 andalso Retry =:= []\") :name)") + +;; drain_pure with failing dispatch -> activities stay; attempt counter bumped +(epoch 16) +(eval "(get (erlang-eval-ast \"${SETUP} Wired = [{peer, bob}, {pending, [Act1]}, {attempts, []}, {dead_letter, []}, {dispatch_fn, FailFetch}], {S, Delivered, Retry} = delivery_worker:drain_pure(Wired), delivery_worker:pending(S) =:= [Act1] andalso Delivered =:= [] andalso length(Retry) =:= 1\") :name)") + +;; deliver_one_pure success returns {ok, Cid} +(epoch 17) +(eval "(get (erlang-eval-ast \"${SETUP} Wired = [{peer, bob}, {pending, []}, {attempts, []}, {dead_letter, []}, {dispatch_fn, OkFetch}], case delivery_worker:deliver_one_pure(Act1, Wired) of {ok, <<1,2,3>>} -> ok; _ -> bad end\") :name)") + +;; deliver_one_pure with no dispatch_fn returns no_dispatch_fn +(epoch 18) +(eval "(get (erlang-eval-ast \"${SETUP} case delivery_worker:deliver_one_pure(Act1, delivery_worker:new(bob)) of {error, _, no_dispatch_fn} -> ok; _ -> bad end\") :name)") + +;; backoff_for slots match the design schedule +(epoch 19) +(eval "(get (erlang-eval-ast \"{delivery_worker:backoff_for(1), delivery_worker:backoff_for(2), delivery_worker:backoff_for(3), delivery_worker:backoff_for(4), delivery_worker:backoff_for(5)} =:= {30, 300, 1800, 21600, 86400}\") :name)") + +;; backoff_for(>=6) returns dead_letter +(epoch 20) +(eval "(get (erlang-eval-ast \"delivery_worker:backoff_for(6) =:= dead_letter\") :name)") + +;; schedule_for returns {retry_in, Sec} or dead_letter +(epoch 21) +(eval "(get (erlang-eval-ast \"{delivery_worker:schedule_for(1), delivery_worker:schedule_for(6)} =:= {{retry_in, 30}, dead_letter}\") :name)") + +;; gen_server: start_link + enqueue + pending_srv +(epoch 22) +(eval "(get (erlang-eval-ast \"${SETUP} delivery_worker:start_link(bob), delivery_worker:enqueue(bob, Act1), delivery_worker:pending_srv(bob) =:= [Act1]\") :name)") + +;; gen_server: flush with dispatch_fn -> {ok, [Cid], []} +(epoch 23) +(eval "(get (erlang-eval-ast \"${SETUP} delivery_worker:start_link(bob, OkFetch), delivery_worker:enqueue(bob, Act1), case delivery_worker:flush(bob) of {ok, [<<1,2,3>>], []} -> ok; _ -> bad end\") :name)") + +;; gen_server: flush with failing dispatch -> {ok, [], [Cid]}, queue stays +(epoch 24) +(eval "(get (erlang-eval-ast \"${SETUP} delivery_worker:start_link(bob, FailFetch), delivery_worker:enqueue(bob, Act1), case delivery_worker:flush(bob) of {ok, [], [<<1,2,3>>]} -> ok; _ -> bad end andalso delivery_worker:pending_srv(bob) =:= [Act1]\") :name)") + +;; gen_server: set_dispatch_fn swaps the function in-flight +(epoch 25) +(eval "(get (erlang-eval-ast \"${SETUP} delivery_worker:start_link(bob), delivery_worker:enqueue(bob, Act1), delivery_worker:set_dispatch_fn(bob, OkFetch), case delivery_worker:flush(bob) of {ok, [<<1,2,3>>], []} -> ok; _ -> bad end\") :name)") +EPOCHS + +OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) + +check() { + local epoch="$1" desc="$2" expected="$3" + local actual + actual=$(echo "$OUTPUT" | awk -v e="$epoch" ' + $0 ~ "^\\(ok-len " e " " { getline; print; exit } + $0 ~ "^\\(ok " e " " { print; exit } + $0 ~ "^\\(error " e " " { print; exit } + ') + [ -z "$actual" ] && actual="" + if echo "$actual" | grep -qF -- "$expected"; then + PASS=$((PASS+1)) + [ "$VERBOSE" = "-v" ] && echo " ok $desc" + else + FAIL=$((FAIL+1)) + ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual +" + fi +} + +check 4 "delivery_worker module loaded" "delivery_worker" +check 10 "new/1 -> empty queue" "true" +check 11 "peer/1 reads peer id" "true" +check 12 "enqueue_pure appends" "true" +check 13 "FIFO order preserved" "true" +check 14 "drain w/o dispatch -> retry" "true" +check 15 "drain ok clears queue" "true" +check 16 "drain fail keeps queue" "true" +check 17 "deliver_one ok -> {ok, Cid}" "ok" +check 18 "deliver_one no fn -> err" "ok" +check 19 "backoff schedule matches plan" "true" +check 20 "backoff overflow -> dead" "true" +check 21 "schedule_for shape" "true" +check 22 "gen_server enqueue + pending" "true" +check 23 "gen_server flush ok" "ok" +check 24 "gen_server flush fail keeps" "ok" +check 25 "gen_server set_dispatch_fn" "ok" + +TOTAL=$((PASS+FAIL)) +if [ $FAIL -eq 0 ]; then + echo "ok $PASS/$TOTAL next/tests/delivery_worker.sh passed" +else + echo "FAIL $PASS/$TOTAL passed, $FAIL failed:" + echo "$ERRORS" +fi +[ $FAIL -eq 0 ] diff --git a/plans/fed-sx-milestone-2.md b/plans/fed-sx-milestone-2.md index 94fa5db4..7a295e28 100644 --- a/plans/fed-sx-milestone-2.md +++ b/plans/fed-sx-milestone-2.md @@ -536,15 +536,38 @@ a dead-letter list visible via `/admin/dead-letter`. **Deliverables:** -- `delivery_worker.erl`: gen_server per-peer queue with `enqueue/2` - and a private retry loop. -- Backoff schedule: 30s / 5m / 30m / 6h / 24h then dead-letter. -- Delivery state stored as a projection (`delivery-state`) so it - survives kernel restarts. -- `outbox:publish/2` augmented: after `log:append`, dispatch to the - delivery worker for each delivery-set entry. -- HTTP client: extend the existing native httpc primitive to - carry signed envelope bytes + the right Content-Type. +- [x] **8a** — `delivery_worker.erl` skeleton: pure-functional + state shape `[{peer, _}, {pending, [_]}, {attempts, [{Cid, N}]}, + {dead_letter, [_]}, {dispatch_fn, _}]` plus + `enqueue_pure/3`, `drain_pure/1`, `deliver_one_pure/2` and the + backoff schedule (`backoff_for/1`, `schedule_for/1`) matching + §13.4 (30s / 5m / 30m / 6h / 24h then dead-letter). + gen_server wrapper with `start_link/1,2`, `enqueue/2`, `flush/1`, + `pending_srv/1`, `set_dispatch_fn/2`. dispatch_fn is a + caller-supplied 1-arity fun so tests can stub the HTTP POST; + Step 8f plugs in the live httpc call without touching the + queue logic. No actual HTTP yet; no retry timer wiring yet. + 17/17 in `delivery_worker.sh`. +- [ ] **8b** — Retry / backoff scheduler. Wire `schedule_for/1` + into a private retry loop: `flush/1` returns deliveries that + failed; the worker schedules a self-cast via Erlang `after` + timer for the next retry slot. Tests fake-time via a Cfg + `:now_fn`. +- [ ] **8c** — Delivery-state projection so the queue survives + kernel restart. New `next/kernel/delivery_state.erl` fold maps + enqueue / delivered / failed events to the worker's persistent + shape. +- [ ] **8d** — `outbox:publish/2` dispatches each delivery-set + entry to the matching worker. The worker is created lazily on + first delivery to a peer. +- [ ] **8e** — `httpc:request/4` BIF wrapper in + `lib/erlang/runtime.sx` (the briefing's allowed scope + exception for Step 8). Marshalling: SX dict ↔ Erlang proplist + shape with `{ok, Status, Headers, Body}` / `{error, Reason}`. +- [ ] **8f** — Real HTTP dispatch through the BIF + content-type + wiring. dispatch_fn for live use becomes a closure over the + peer URL that calls `httpc:request/4` with the signed envelope + bytes as the body. **Tests:** @@ -867,6 +890,18 @@ proceed. Newest first. +- **2026-06-07** — Step 8a: delivery_worker skeleton. + `next/kernel/delivery_worker.erl` with pure-functional state + + enqueue / drain / deliver_one + backoff schedule (30s / 5m / + 30m / 6h / 24h then dead-letter, per design §13.4). gen_server + wrapper exposes the same APIs under the peer-id atom. dispatch + is a caller-supplied `:dispatch_fn` fun — Step 8b layers the + retry timer, Step 8c persists the queue, Step 8d wires + `outbox:publish/2` to dispatch, Step 8e brings the + `httpc:request/4` BIF (substrate exception per briefing), Step + 8f closes with live HTTP. 17/17 in `delivery_worker.sh`. + Conformance 761/761. + - **2026-06-07** — Step 7c (closes Step 7): outbox-side delivery_set integration. `outbox:publish/2` computes the audience-resolved delivery set after sign + log and stashes