From dda967e0605659a186c27fe5e1ac7b3941302d78 Mon Sep 17 00:00:00 2001 From: giles Date: Sun, 7 Jun 2026 01:32:59 +0000 Subject: [PATCH] =?UTF-8?q?fed-sx-m2:=20Step=208d=20=E2=80=94=20outbox=20d?= =?UTF-8?q?ispatches=20delivery=5Fset=20to=20workers=20+=207=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit outbox:publish/2 now walks the computed delivery_set and enqueues the signed activity onto each matching delivery_worker (registered under the peer-id atom). Missing workers are silently skipped — lazy worker creation belongs to the kernel manager later in Step 8. Gated by Context's {dispatch_deliveries, true} so every M1 outbox caller (and every M2 caller that doesn't yet care about delivery) stays back-compat: default off. New helpers in outbox.erl: dispatch_deliveries/3(Activity, DeliverySet, Context) gates on Context :dispatch_deliveries flag enqueue_each/2(Activity, [PeerId | _]) whereis-guarded enqueue per peer 7/7 in next/tests/delivery_dispatch.sh: - single peer enqueued - two peers both enqueued (fan-out) - missing worker silently skipped - no :dispatch_deliveries flag -> no-op (back-compat) - two publishes -> FIFO append on the queue - empty delivery_set -> no-op outbox_publish.sh 17/17 unchanged; delivery_worker.sh 17/17 unchanged. Conformance preserved at 761/761 from the Step 8a baseline. --- next/kernel/outbox.erl | 32 +++++++++ next/tests/delivery_dispatch.sh | 120 ++++++++++++++++++++++++++++++++ plans/fed-sx-milestone-2.md | 27 ++++++- 3 files changed, 176 insertions(+), 3 deletions(-) create mode 100755 next/tests/delivery_dispatch.sh diff --git a/next/kernel/outbox.erl b/next/kernel/outbox.erl index b92b2994..ac316da0 100644 --- a/next/kernel/outbox.erl +++ b/next/kernel/outbox.erl @@ -93,6 +93,7 @@ publish(Request, Context) -> {ok, NewLog, _Seq} = log:append(LogState, Signed), broadcast(Signed, envelope_field(projections, Context)), DeliverySet = compute_delivery_set(Request, Signed, Context), + dispatch_deliveries(Signed, DeliverySet, Context), Result = [{cid, cid_of(Signed)}, {activity, Signed}, {delivery_set, DeliverySet}], @@ -101,6 +102,37 @@ publish(Request, Context) -> {error, Reason, LogState} end. +%% dispatch_deliveries/3 — Step 8d. For each ActorId in the +%% delivery_set, enqueue the signed activity onto the matching +%% delivery_worker if the worker is registered under that atom. +%% Missing workers are silently skipped — lazy creation belongs +%% to the kernel manager (later in Step 8). The Context +%% `:dispatch_deliveries` field gates the call so existing +%% outbox callers that don't yet care about delivery (e.g. all of +%% M1's tests) stay back-compat. +%% +%% No-op when: +%% - :dispatch_deliveries is absent or not the atom true +%% - delivery_set is [] +%% - the per-peer worker isn't registered (whereis returns undefined) + +dispatch_deliveries(Activity, DeliverySet, Context) -> + case envelope_field(dispatch_deliveries, Context) of + true -> enqueue_each(Activity, DeliverySet); + _ -> ok + end. + +enqueue_each(_Activity, []) -> ok; +enqueue_each(Activity, [PeerId | Rest]) when is_atom(PeerId) -> + case erlang:whereis(PeerId) of + undefined -> enqueue_each(Activity, Rest); + _ -> + delivery_worker:enqueue(PeerId, Activity), + enqueue_each(Activity, Rest) + end; +enqueue_each(Activity, [_ | Rest]) -> + enqueue_each(Activity, Rest). + %% compute_delivery_set/3 — Step 7c. Pulls the audience-resolved %% recipient list off the Request's `:to` / `:cc` fields (the %% envelope itself doesn't carry them — construct/4 only takes diff --git a/next/tests/delivery_dispatch.sh b/next/tests/delivery_dispatch.sh new file mode 100755 index 00000000..5258bb0d --- /dev/null +++ b/next/tests/delivery_dispatch.sh @@ -0,0 +1,120 @@ +#!/usr/bin/env bash +# next/tests/delivery_dispatch.sh — m2 Step 8d test. +# +# After a successful outbox:publish, each ActorId in the +# Result's :delivery_set is enqueued onto the matching +# delivery_worker (registered under the peer-id atom). Only +# happens when Context carries {dispatch_deliveries, true} — +# back-compat with every M1 outbox caller that doesn't dispatch. + +set -uo pipefail +cd "$(git rev-parse --show-toplevel)" + +SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}" +if [ ! -x "$SX_SERVER" ]; then + SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe" +fi +if [ ! -x "$SX_SERVER" ]; then + echo "ERROR: sx_server.exe not found." >&2 + exit 1 +fi + +VERBOSE="${1:-}" +PASS=0; FAIL=0; ERRORS="" +TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT + +# Alice publishes to bob (and carol). Each peer worker is registered +# under its peer-id atom; the outbox dispatches via the workers' +# enqueue path. dispatch_fn left undefined so the workers just +# accumulate pending without firing HTTP. +SETUP='K = <<1,2,3,4>>, KS = [{key_id,k1},{algorithm,ed25519},{value,K}], AS = [{public_keys,[[{id,k1},{created,0},{value,K}]]}], {ok, L0} = log:open(alice, <<98,97,115,101>>), Ctx = [{actor_id,alice},{published,1},{key_spec,KS},{actor_state,AS},{log,L0},{projections,[]},{dispatch_deliveries, true}], CtxNoDispatch = [{actor_id,alice},{published,1},{key_spec,KS},{actor_state,AS},{log,L0},{projections,[]}], ReqToBob = [{type, note}, {object, [{content, hi}]}, {to, bob}], ReqToTwo = [{type, note}, {object, [{content, hi}]}, {to, [bob, carol]}],' + +cat > "$TMPFILE" < bob's pending has 1 entry +(epoch 20) +(eval "(get (erlang-eval-ast \"${SETUP} delivery_worker:start_link(bob), {ok, _, _} = outbox:publish(ReqToBob, Ctx), case delivery_worker:pending_srv(bob) of [_] -> ok; _ -> bad end\") :name)") + +;; Carol's worker registered, publish to [bob, carol] -> both queues get 1 entry +(epoch 21) +(eval "(get (erlang-eval-ast \"${SETUP} delivery_worker:start_link(bob), delivery_worker:start_link(carol), {ok, _, _} = outbox:publish(ReqToTwo, Ctx), {length(delivery_worker:pending_srv(bob)), length(delivery_worker:pending_srv(carol))} =:= {1, 1}\") :name)") + +;; Missing worker for an actor in delivery_set -> silently skipped (no error) +(epoch 22) +(eval "(get (erlang-eval-ast \"${SETUP} delivery_worker:start_link(bob), case outbox:publish(ReqToTwo, Ctx) of {ok, R, _} -> envelope:get_field(delivery_set, R) =:= {ok, [bob, carol]}; _ -> false end andalso length(delivery_worker:pending_srv(bob)) =:= 1\") :name)") + +;; No :dispatch_deliveries flag -> no enqueue happens (back-compat) +(epoch 23) +(eval "(get (erlang-eval-ast \"${SETUP} delivery_worker:start_link(bob), {ok, _, _} = outbox:publish(ReqToBob, CtxNoDispatch), delivery_worker:pending_srv(bob) =:= []\") :name)") + +;; Two publishes -> bob's queue has 2 entries (FIFO append) +(epoch 24) +(eval "(get (erlang-eval-ast \"${SETUP} delivery_worker:start_link(bob), {ok, _, NewLog} = outbox:publish(ReqToBob, Ctx), Ctx2 = [{actor_id,alice},{published,2},{key_spec,KS},{actor_state,AS},{log,NewLog},{projections,[]},{dispatch_deliveries, true}], {ok, _, _} = outbox:publish(ReqToBob, Ctx2), length(delivery_worker:pending_srv(bob)) =:= 2\") :name)") + +;; Empty delivery_set -> no dispatch (no :to, no :cc) +(epoch 25) +(eval "(get (erlang-eval-ast \"${SETUP} delivery_worker:start_link(bob), ReqNoAud = [{type, note}, {object, [{content, hi}]}], {ok, _, _} = outbox:publish(ReqNoAud, Ctx), delivery_worker:pending_srv(bob) =:= []\") :name)") +EPOCHS + +OUTPUT=$(timeout 540 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) + +check() { + local epoch="$1" desc="$2" expected="$3" + local actual + actual=$(echo "$OUTPUT" | awk -v e="$epoch" ' + $0 ~ "^\\(ok-len " e " " { getline; print; exit } + $0 ~ "^\\(ok " e " " { print; exit } + $0 ~ "^\\(error " e " " { print; exit } + ') + [ -z "$actual" ] && actual="" + if echo "$actual" | grep -qF -- "$expected"; then + PASS=$((PASS+1)) + [ "$VERBOSE" = "-v" ] && echo " ok $desc" + else + FAIL=$((FAIL+1)) + ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual +" + fi +} + +check 9 "outbox module loaded" "outbox" +check 20 "single peer enqueued" "ok" +check 21 "two peers both enqueued" "true" +check 22 "missing worker silently skip" "true" +check 23 "no dispatch_deliveries no-op" "true" +check 24 "two publishes FIFO append" "true" +check 25 "empty delivery_set -> no-op" "true" + +TOTAL=$((PASS+FAIL)) +if [ $FAIL -eq 0 ]; then + echo "ok $PASS/$TOTAL next/tests/delivery_dispatch.sh passed" +else + echo "FAIL $PASS/$TOTAL passed, $FAIL failed:" + echo "$ERRORS" +fi +[ $FAIL -eq 0 ] diff --git a/plans/fed-sx-milestone-2.md b/plans/fed-sx-milestone-2.md index 7a295e28..9ba71b98 100644 --- a/plans/fed-sx-milestone-2.md +++ b/plans/fed-sx-milestone-2.md @@ -557,9 +557,18 @@ a dead-letter list visible via `/admin/dead-letter`. kernel restart. New `next/kernel/delivery_state.erl` fold maps enqueue / delivered / failed events to the worker's persistent shape. -- [ ] **8d** — `outbox:publish/2` dispatches each delivery-set - entry to the matching worker. The worker is created lazily on - first delivery to a peer. +- [x] **8d** — `outbox:publish/2` dispatches each delivery-set + entry to the matching worker. New `dispatch_deliveries/3` + + `enqueue_each/2` in `outbox.erl` walk the computed + `delivery_set` and call `delivery_worker:enqueue(PeerId, + Activity)` for each registered peer atom. Missing workers + (no `whereis`) are silently skipped — lazy worker creation + belongs to the kernel manager (Step 8d-mgr or later). + Gated by `Context` field `{dispatch_deliveries, true}` so + every M1 outbox caller stays back-compat (default off). 7/7 + in `delivery_dispatch.sh` covering single-peer enqueue, + two-peer fan-out, missing-worker skip, no-flag no-op, + FIFO append across two publishes, empty delivery_set no-op. - [ ] **8e** — `httpc:request/4` BIF wrapper in `lib/erlang/runtime.sx` (the briefing's allowed scope exception for Step 8). Marshalling: SX dict ↔ Erlang proplist @@ -890,6 +899,18 @@ proceed. Newest first. +- **2026-06-07** — Step 8d: outbox dispatches delivery_set to + workers. `outbox:publish/2` gained `dispatch_deliveries/3` and + `enqueue_each/2`: after `log:append` + projection broadcast, + the resolved `delivery_set` is walked and each registered + peer-id atom's `delivery_worker:enqueue(PeerId, Activity)` is + called. Missing workers (no `erlang:whereis`) are silently + skipped. Gated by Context's `{dispatch_deliveries, true}` — + default off so every M1 outbox caller stays back-compat. 7/7 + in `delivery_dispatch.sh`; `outbox_publish.sh` + + `delivery_worker.sh` both still 17/17. Conformance preserved + at 761/761 from the Step 8a baseline. + - **2026-06-07** — Step 8a: delivery_worker skeleton. `next/kernel/delivery_worker.erl` with pure-functional state + enqueue / drain / deliver_one + backoff schedule (30s / 5m /