fed-sx-m2: Step 8a — delivery_worker skeleton + 17 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 55s

next/kernel/delivery_worker.erl is the gen_server-per-peer
delivery queue per design §13.4. Step 8a lands the skeleton:
pure-functional state shape + enqueue / drain / deliver_one
helpers + backoff schedule + gen_server wrapper. No retry
timer wiring yet (Step 8b), no persist projection yet (8c),
no outbox dispatch wiring yet (8d), no httpc BIF yet (8e), no
live HTTP yet (8f).

State shape (pure):
  [{peer, PeerId},
   {pending, [Activity, ...]},          %% FIFO queue
   {attempts, [{Cid, AttemptCount}]},   %% per-cid retry count
   {dead_letter, [Activity, ...]},
   {dispatch_fn, fun/1 | undefined}]

Pure-functional API:
  new/1
  pending/1, peer/1
  enqueue_pure/3       — append to FIFO
  drain_pure/1         — attempt every queued; returns
                         {NewState, DeliveredCids, RetryCids}
  deliver_one_pure/2   — single dispatch via :dispatch_fn

Backoff schedule (§13.4): 30s / 5m / 30m / 6h / 24h then dead_letter
  backoff_for/1   — attempt -> seconds | dead_letter
  schedule_for/1  — attempt -> {retry_in, Sec} | dead_letter

gen_server (registered under peer-id atom):
  start_link/1, start_link/2(PeerId, DispatchFn)
  stop/1
  enqueue/2     — sync call
  flush/1       — drain + reply with {ok, Delivered, Retry}
  pending_srv/1
  set_dispatch_fn/2  — swap dispatch in flight

dispatch_fn is a caller-supplied 1-arity fun so tests can stub the
HTTP POST. Step 8f will plug in a closure over httpc:request/4
without touching the queue logic.

17/17 in next/tests/delivery_worker.sh covering:
  - new/peer/pending base cases
  - enqueue_pure FIFO append
  - drain_pure no-dispatch -> retry, queue intact
  - drain_pure ok dispatch -> queue empties + delivered list
  - drain_pure failing dispatch -> queue intact + retry list
  - deliver_one_pure {ok, Cid} and {error, _, no_dispatch_fn}
  - backoff_for slot values match §13.4
  - backoff_for >=6 returns dead_letter
  - schedule_for wraps the slot or dead_letter
  - gen_server start_link + enqueue + pending_srv
  - gen_server flush with ok dispatch (delivered)
  - gen_server flush with failing dispatch (queue kept)
  - gen_server set_dispatch_fn in-flight swap

Conformance 761/761.
This commit is contained in:
2026-06-07 01:01:17 +00:00
parent c6b4920074
commit bf4e034c4e
3 changed files with 398 additions and 9 deletions

View File

@@ -0,0 +1,198 @@
-module(delivery_worker).
-behaviour(gen_server).
-export([new/1, pending/1, peer/1,
enqueue_pure/3, drain_pure/1, deliver_one_pure/2,
backoff_for/1, schedule_for/1,
start_link/1, start_link/2, stop/1,
enqueue/2, flush/1, pending_srv/1, set_dispatch_fn/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
%% Outbound delivery worker per design §13.4. One gen_server per
%% peer instance (peer-id atom) holding a FIFO queue of pending
%% activities to deliver. v2 lands in stages:
%%
%% Step 8a pure-functional state shape, enqueue / drain /
%% schedule semantics + gen_server skeleton + tests
%% Step 8b retry / backoff schedule (30s / 5m / 30m / 6h / 24h)
%% + dead-letter list
%% Step 8c delivery-state projection so the queue survives
%% kernel restart
%% Step 8d outbox:publish/2 dispatches each delivery-set entry
%% to the matching worker
%% Step 8e httpc:request/4 BIF (substrate exception per briefing)
%% Step 8f real HTTP POST through the BIF + content-type wiring
%%
%% This file is 8a only — pure state + skeleton gen_server with the
%% APIs Step 8b-d will fill in. Real HTTP dispatch is stubbed via a
%% caller-supplied `:dispatch_fn` so tests can intercept and Step 8f
%% can plug in the live httpc call without touching the queue logic.
%%
%% State shape (pure):
%% [{peer, PeerId},
%% {pending, [Activity, ...]}, %% FIFO; head delivered first
%% {attempts, [{Cid, AttemptCount}, ...]},
%% {dead_letter, [Activity, ...]},
%% {dispatch_fn, fun/1 | undefined}]
%%
%% gen_server registers under the peer-id atom (one worker per peer);
%% the same APIs work as pure-functional state transitions for tests.
%% ── Pure-functional API ─────────────────────────────────────────
new(PeerId) ->
[{peer, PeerId},
{pending, []},
{attempts, []},
{dead_letter, []},
{dispatch_fn, undefined}].
pending(State) -> field(pending, State).
peer(State) -> field(peer, State).
%% enqueue_pure/3 — append an activity to the queue. Returns new
%% state. Duplicate :id activities aren't deduplicated here — that's
%% the caller's job (Step 8d will pass each delivery-set entry once).
enqueue_pure(_PeerId, Activity, State) ->
Pending = field(pending, State),
set_field(pending, Pending ++ [Activity], State).
%% drain_pure/1 — attempt to deliver every queued activity through
%% the configured dispatch_fn. Returns {NewState, DeliveredCids,
%% RetryCids}. Activities that fail dispatch stay in :pending with
%% an incremented attempt counter — Step 8b will use the count to
%% pick a backoff slot.
drain_pure(State) ->
Pending = field(pending, State),
drain_loop(Pending, [], State, [], []).
drain_loop([], Kept, State, Delivered, Retry) ->
{set_field(pending, Kept, State), Delivered, Retry};
drain_loop([A | Rest], Kept, State, Delivered, Retry) ->
case deliver_one_pure(A, State) of
{ok, Cid} ->
drain_loop(Rest, Kept, State, Delivered ++ [Cid], Retry);
{error, Cid, _Reason} ->
State1 = bump_attempt(Cid, State),
drain_loop(Rest, Kept ++ [A], State1, Delivered, Retry ++ [Cid])
end.
%% deliver_one_pure/2 — single-activity dispatch via the caller-
%% supplied dispatch_fn. Returns {ok, Cid} on success or {error,
%% Cid, Reason} on failure. With no dispatch_fn configured returns
%% {error, _, no_dispatch_fn} so callers know to wire one before
%% the worker is useful.
deliver_one_pure(Activity, State) ->
Cid = activity_cid(Activity),
case field(dispatch_fn, State) of
undefined -> {error, Cid, no_dispatch_fn};
Fn when is_function(Fn, 1) ->
case Fn(Activity) of
ok -> {ok, Cid};
{ok, _} -> {ok, Cid};
{error, Reason} -> {error, Cid, Reason};
Other -> {error, Cid, {bad_dispatch_return, Other}}
end;
_ -> {error, Cid, bad_dispatch_fn}
end.
%% backoff_for/1 — Step 8a returns the static schedule per the
%% plan; Step 8b wires it into the retry loop. Attempts are
%% 1-indexed (first retry uses slot 1).
%%
%% 30s / 5m / 30m / 6h / 24h then dead_letter.
backoff_for(0) -> 0;
backoff_for(1) -> 30;
backoff_for(2) -> 300; % 5 * 60
backoff_for(3) -> 1800; % 30 * 60
backoff_for(4) -> 21600; % 6 * 3600
backoff_for(5) -> 86400; % 24 * 3600
backoff_for(_) -> dead_letter.
schedule_for(Attempts) ->
case backoff_for(Attempts) of
dead_letter -> dead_letter;
Seconds -> {retry_in, Seconds}
end.
%% ── gen_server wrapper ──────────────────────────────────────────
start_link(PeerId) ->
start_link(PeerId, undefined).
start_link(PeerId, DispatchFn) ->
Pid = gen_server:start_link(delivery_worker, [PeerId, DispatchFn]),
erlang:register(PeerId, Pid),
Pid.
stop(PeerId) ->
R = gen_server:call(PeerId, '$gen_stop'),
erlang:unregister(PeerId),
R.
enqueue(PeerId, Activity) ->
gen_server:call(PeerId, {enqueue, Activity}).
flush(PeerId) ->
gen_server:call(PeerId, flush).
pending_srv(PeerId) ->
gen_server:call(PeerId, get_pending).
set_dispatch_fn(PeerId, Fn) ->
gen_server:call(PeerId, {set_dispatch_fn, Fn}).
%% gen_server callbacks
init([PeerId, DispatchFn]) ->
S0 = new(PeerId),
{ok, set_field(dispatch_fn, DispatchFn, S0)}.
handle_call({enqueue, Activity}, _From, State) ->
{reply, ok, enqueue_pure(field(peer, State), Activity, State)};
handle_call(flush, _From, State) ->
{NewState, Delivered, Retry} = drain_pure(State),
{reply, {ok, Delivered, Retry}, NewState};
handle_call(get_pending, _From, State) ->
{reply, field(pending, State), State};
handle_call({set_dispatch_fn, Fn}, _From, State) ->
{reply, ok, set_field(dispatch_fn, Fn, State)}.
handle_cast(_, S) -> {noreply, S}.
handle_info(_, S) -> {noreply, S}.
%% ── Internal ────────────────────────────────────────────────────
activity_cid(Activity) ->
case envelope:get_field(id, Activity) of
{ok, Cid} -> Cid;
_ -> nil
end.
bump_attempt(Cid, State) ->
Attempts = field(attempts, State),
Current = case find_keyed(Cid, Attempts) of
{ok, N} -> N;
_ -> 0
end,
set_field(attempts, set_keyed(Cid, Current + 1, Attempts), State).
field(K, [{K, V} | _]) -> V;
field(K, [_ | Rest]) -> field(K, Rest);
field(_, []) -> undefined.
set_field(K, V, []) -> [{K, V}];
set_field(K, V, [{K, _} | Rest]) -> [{K, V} | Rest];
set_field(K, V, [P | Rest]) -> [P | set_field(K, V, Rest)].
find_keyed(_, []) -> {error, not_found};
find_keyed(K, [{K, V} | _]) -> {ok, V};
find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest).
set_keyed(K, V, []) -> [{K, V}];
set_keyed(K, V, [{K, _} | Rest]) -> [{K, V} | Rest];
set_keyed(K, V, [P | Rest]) -> [P | set_keyed(K, V, Rest)].

156
next/tests/delivery_worker.sh Executable file
View File

@@ -0,0 +1,156 @@
#!/usr/bin/env bash
# next/tests/delivery_worker.sh — m2 Step 8a test.
#
# Pure-functional state shape + gen_server skeleton for the
# outbound delivery worker. One worker per peer; FIFO queue of
# pending activities; caller-supplied :dispatch_fn does the actual
# HTTP POST (stubbed for tests, live httpc in Step 8f). Retry /
# backoff (Step 8b) and persist-survival (Step 8c) layer on top.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
SETUP='Act1 = [{id, <<1,2,3>>}, {type, note}, {actor, alice}], Act2 = [{id, <<4,5,6>>}, {type, note}, {actor, alice}], OkFetch = fun(_) -> ok end, FailFetch = fun(_) -> {error, http_500} end,'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(er-load-gen-server!)")
(epoch 3)
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
(epoch 4)
(eval "(get (erlang-load-module (file-read \"next/kernel/delivery_worker.erl\")) :name)")
;; new/1 returns initial state with empty queue
(epoch 10)
(eval "(get (erlang-eval-ast \"delivery_worker:pending(delivery_worker:new(bob)) =:= []\") :name)")
;; peer/1 reads the peer id
(epoch 11)
(eval "(get (erlang-eval-ast \"delivery_worker:peer(delivery_worker:new(bob)) =:= bob\") :name)")
;; enqueue_pure appends to the queue
(epoch 12)
(eval "(get (erlang-eval-ast \"${SETUP} S = delivery_worker:enqueue_pure(bob, Act1, delivery_worker:new(bob)), delivery_worker:pending(S) =:= [Act1]\") :name)")
;; Two enqueues -> FIFO order
(epoch 13)
(eval "(get (erlang-eval-ast \"${SETUP} S0 = delivery_worker:new(bob), S1 = delivery_worker:enqueue_pure(bob, Act1, S0), S2 = delivery_worker:enqueue_pure(bob, Act2, S1), delivery_worker:pending(S2) =:= [Act1, Act2]\") :name)")
;; drain_pure with no dispatch_fn -> all retry, queue intact
(epoch 14)
(eval "(get (erlang-eval-ast \"${SETUP} S0 = delivery_worker:new(bob), S1 = delivery_worker:enqueue_pure(bob, Act1, S0), {S2, Delivered, Retry} = delivery_worker:drain_pure(S1), Delivered =:= [] andalso length(Retry) =:= 1 andalso delivery_worker:pending(S2) =:= [Act1]\") :name)")
;; drain_pure with success dispatch -> activities cleared
(epoch 15)
(eval "(get (erlang-eval-ast \"${SETUP} S0 = delivery_worker:new(bob), S1 = lists:foldl(fun(K, A) -> delivery_worker:enqueue_pure(bob, K, A) end, S0, [Act1, Act2]), Wired = [{peer, bob}, {pending, [Act1, Act2]}, {attempts, []}, {dead_letter, []}, {dispatch_fn, OkFetch}], {S2, Delivered, Retry} = delivery_worker:drain_pure(Wired), delivery_worker:pending(S2) =:= [] andalso length(Delivered) =:= 2 andalso Retry =:= []\") :name)")
;; drain_pure with failing dispatch -> activities stay; attempt counter bumped
(epoch 16)
(eval "(get (erlang-eval-ast \"${SETUP} Wired = [{peer, bob}, {pending, [Act1]}, {attempts, []}, {dead_letter, []}, {dispatch_fn, FailFetch}], {S, Delivered, Retry} = delivery_worker:drain_pure(Wired), delivery_worker:pending(S) =:= [Act1] andalso Delivered =:= [] andalso length(Retry) =:= 1\") :name)")
;; deliver_one_pure success returns {ok, Cid}
(epoch 17)
(eval "(get (erlang-eval-ast \"${SETUP} Wired = [{peer, bob}, {pending, []}, {attempts, []}, {dead_letter, []}, {dispatch_fn, OkFetch}], case delivery_worker:deliver_one_pure(Act1, Wired) of {ok, <<1,2,3>>} -> ok; _ -> bad end\") :name)")
;; deliver_one_pure with no dispatch_fn returns no_dispatch_fn
(epoch 18)
(eval "(get (erlang-eval-ast \"${SETUP} case delivery_worker:deliver_one_pure(Act1, delivery_worker:new(bob)) of {error, _, no_dispatch_fn} -> ok; _ -> bad end\") :name)")
;; backoff_for slots match the design schedule
(epoch 19)
(eval "(get (erlang-eval-ast \"{delivery_worker:backoff_for(1), delivery_worker:backoff_for(2), delivery_worker:backoff_for(3), delivery_worker:backoff_for(4), delivery_worker:backoff_for(5)} =:= {30, 300, 1800, 21600, 86400}\") :name)")
;; backoff_for(>=6) returns dead_letter
(epoch 20)
(eval "(get (erlang-eval-ast \"delivery_worker:backoff_for(6) =:= dead_letter\") :name)")
;; schedule_for returns {retry_in, Sec} or dead_letter
(epoch 21)
(eval "(get (erlang-eval-ast \"{delivery_worker:schedule_for(1), delivery_worker:schedule_for(6)} =:= {{retry_in, 30}, dead_letter}\") :name)")
;; gen_server: start_link + enqueue + pending_srv
(epoch 22)
(eval "(get (erlang-eval-ast \"${SETUP} delivery_worker:start_link(bob), delivery_worker:enqueue(bob, Act1), delivery_worker:pending_srv(bob) =:= [Act1]\") :name)")
;; gen_server: flush with dispatch_fn -> {ok, [Cid], []}
(epoch 23)
(eval "(get (erlang-eval-ast \"${SETUP} delivery_worker:start_link(bob, OkFetch), delivery_worker:enqueue(bob, Act1), case delivery_worker:flush(bob) of {ok, [<<1,2,3>>], []} -> ok; _ -> bad end\") :name)")
;; gen_server: flush with failing dispatch -> {ok, [], [Cid]}, queue stays
(epoch 24)
(eval "(get (erlang-eval-ast \"${SETUP} delivery_worker:start_link(bob, FailFetch), delivery_worker:enqueue(bob, Act1), case delivery_worker:flush(bob) of {ok, [], [<<1,2,3>>]} -> ok; _ -> bad end andalso delivery_worker:pending_srv(bob) =:= [Act1]\") :name)")
;; gen_server: set_dispatch_fn swaps the function in-flight
(epoch 25)
(eval "(get (erlang-eval-ast \"${SETUP} delivery_worker:start_link(bob), delivery_worker:enqueue(bob, Act1), delivery_worker:set_dispatch_fn(bob, OkFetch), case delivery_worker:flush(bob) of {ok, [<<1,2,3>>], []} -> ok; _ -> bad end\") :name)")
EPOCHS
OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 4 "delivery_worker module loaded" "delivery_worker"
check 10 "new/1 -> empty queue" "true"
check 11 "peer/1 reads peer id" "true"
check 12 "enqueue_pure appends" "true"
check 13 "FIFO order preserved" "true"
check 14 "drain w/o dispatch -> retry" "true"
check 15 "drain ok clears queue" "true"
check 16 "drain fail keeps queue" "true"
check 17 "deliver_one ok -> {ok, Cid}" "ok"
check 18 "deliver_one no fn -> err" "ok"
check 19 "backoff schedule matches plan" "true"
check 20 "backoff overflow -> dead" "true"
check 21 "schedule_for shape" "true"
check 22 "gen_server enqueue + pending" "true"
check 23 "gen_server flush ok" "ok"
check 24 "gen_server flush fail keeps" "ok"
check 25 "gen_server set_dispatch_fn" "ok"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/delivery_worker.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

View File

@@ -536,15 +536,38 @@ a dead-letter list visible via `/admin/dead-letter`.
**Deliverables:**
- `delivery_worker.erl`: gen_server per-peer queue with `enqueue/2`
and a private retry loop.
- Backoff schedule: 30s / 5m / 30m / 6h / 24h then dead-letter.
- Delivery state stored as a projection (`delivery-state`) so it
survives kernel restarts.
- `outbox:publish/2` augmented: after `log:append`, dispatch to the
delivery worker for each delivery-set entry.
- HTTP client: extend the existing native httpc primitive to
carry signed envelope bytes + the right Content-Type.
- [x] **8a** — `delivery_worker.erl` skeleton: pure-functional
state shape `[{peer, _}, {pending, [_]}, {attempts, [{Cid, N}]},
{dead_letter, [_]}, {dispatch_fn, _}]` plus
`enqueue_pure/3`, `drain_pure/1`, `deliver_one_pure/2` and the
backoff schedule (`backoff_for/1`, `schedule_for/1`) matching
§13.4 (30s / 5m / 30m / 6h / 24h then dead-letter).
gen_server wrapper with `start_link/1,2`, `enqueue/2`, `flush/1`,
`pending_srv/1`, `set_dispatch_fn/2`. dispatch_fn is a
caller-supplied 1-arity fun so tests can stub the HTTP POST;
Step 8f plugs in the live httpc call without touching the
queue logic. No actual HTTP yet; no retry timer wiring yet.
17/17 in `delivery_worker.sh`.
- [ ] **8b** — Retry / backoff scheduler. Wire `schedule_for/1`
into a private retry loop: `flush/1` returns deliveries that
failed; the worker schedules a self-cast via Erlang `after`
timer for the next retry slot. Tests fake-time via a Cfg
`:now_fn`.
- [ ] **8c** — Delivery-state projection so the queue survives
kernel restart. New `next/kernel/delivery_state.erl` fold maps
enqueue / delivered / failed events to the worker's persistent
shape.
- [ ] **8d** — `outbox:publish/2` dispatches each delivery-set
entry to the matching worker. The worker is created lazily on
first delivery to a peer.
- [ ] **8e** — `httpc:request/4` BIF wrapper in
`lib/erlang/runtime.sx` (the briefing's allowed scope
exception for Step 8). Marshalling: SX dict ↔ Erlang proplist
shape with `{ok, Status, Headers, Body}` / `{error, Reason}`.
- [ ] **8f** — Real HTTP dispatch through the BIF + content-type
wiring. dispatch_fn for live use becomes a closure over the
peer URL that calls `httpc:request/4` with the signed envelope
bytes as the body.
**Tests:**
@@ -867,6 +890,18 @@ proceed.
Newest first.
- **2026-06-07** — Step 8a: delivery_worker skeleton.
`next/kernel/delivery_worker.erl` with pure-functional state +
enqueue / drain / deliver_one + backoff schedule (30s / 5m /
30m / 6h / 24h then dead-letter, per design §13.4). gen_server
wrapper exposes the same APIs under the peer-id atom. dispatch
is a caller-supplied `:dispatch_fn` fun — Step 8b layers the
retry timer, Step 8c persists the queue, Step 8d wires
`outbox:publish/2` to dispatch, Step 8e brings the
`httpc:request/4` BIF (substrate exception per briefing), Step
8f closes with live HTTP. 17/17 in `delivery_worker.sh`.
Conformance 761/761.
- **2026-06-07** — Step 7c (closes Step 7): outbox-side
delivery_set integration. `outbox:publish/2` computes the
audience-resolved delivery set after sign + log and stashes