fed-sx-m2: Step 8d — outbox dispatches delivery_set to workers + 7 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 32s
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 32s
outbox:publish/2 now walks the computed delivery_set and enqueues
the signed activity onto each matching delivery_worker
(registered under the peer-id atom). Missing workers are silently
skipped — lazy worker creation belongs to the kernel manager
later in Step 8.
Gated by Context's {dispatch_deliveries, true} so every M1
outbox caller (and every M2 caller that doesn't yet care about
delivery) stays back-compat: default off.
New helpers in outbox.erl:
dispatch_deliveries/3(Activity, DeliverySet, Context)
gates on Context :dispatch_deliveries flag
enqueue_each/2(Activity, [PeerId | _])
whereis-guarded enqueue per peer
7/7 in next/tests/delivery_dispatch.sh:
- single peer enqueued
- two peers both enqueued (fan-out)
- missing worker silently skipped
- no :dispatch_deliveries flag -> no-op (back-compat)
- two publishes -> FIFO append on the queue
- empty delivery_set -> no-op
outbox_publish.sh 17/17 unchanged; delivery_worker.sh 17/17
unchanged. Conformance preserved at 761/761 from the Step 8a
baseline.
This commit is contained in:
@@ -93,6 +93,7 @@ publish(Request, Context) ->
|
||||
{ok, NewLog, _Seq} = log:append(LogState, Signed),
|
||||
broadcast(Signed, envelope_field(projections, Context)),
|
||||
DeliverySet = compute_delivery_set(Request, Signed, Context),
|
||||
dispatch_deliveries(Signed, DeliverySet, Context),
|
||||
Result = [{cid, cid_of(Signed)},
|
||||
{activity, Signed},
|
||||
{delivery_set, DeliverySet}],
|
||||
@@ -101,6 +102,37 @@ publish(Request, Context) ->
|
||||
{error, Reason, LogState}
|
||||
end.
|
||||
|
||||
%% dispatch_deliveries/3 — Step 8d. For each ActorId in the
|
||||
%% delivery_set, enqueue the signed activity onto the matching
|
||||
%% delivery_worker if the worker is registered under that atom.
|
||||
%% Missing workers are silently skipped — lazy creation belongs
|
||||
%% to the kernel manager (later in Step 8). The Context
|
||||
%% `:dispatch_deliveries` field gates the call so existing
|
||||
%% outbox callers that don't yet care about delivery (e.g. all of
|
||||
%% M1's tests) stay back-compat.
|
||||
%%
|
||||
%% No-op when:
|
||||
%% - :dispatch_deliveries is absent or not the atom true
|
||||
%% - delivery_set is []
|
||||
%% - the per-peer worker isn't registered (whereis returns undefined)
|
||||
|
||||
dispatch_deliveries(Activity, DeliverySet, Context) ->
|
||||
case envelope_field(dispatch_deliveries, Context) of
|
||||
true -> enqueue_each(Activity, DeliverySet);
|
||||
_ -> ok
|
||||
end.
|
||||
|
||||
enqueue_each(_Activity, []) -> ok;
|
||||
enqueue_each(Activity, [PeerId | Rest]) when is_atom(PeerId) ->
|
||||
case erlang:whereis(PeerId) of
|
||||
undefined -> enqueue_each(Activity, Rest);
|
||||
_ ->
|
||||
delivery_worker:enqueue(PeerId, Activity),
|
||||
enqueue_each(Activity, Rest)
|
||||
end;
|
||||
enqueue_each(Activity, [_ | Rest]) ->
|
||||
enqueue_each(Activity, Rest).
|
||||
|
||||
%% compute_delivery_set/3 — Step 7c. Pulls the audience-resolved
|
||||
%% recipient list off the Request's `:to` / `:cc` fields (the
|
||||
%% envelope itself doesn't carry them — construct/4 only takes
|
||||
|
||||
120
next/tests/delivery_dispatch.sh
Executable file
120
next/tests/delivery_dispatch.sh
Executable file
@@ -0,0 +1,120 @@
|
||||
#!/usr/bin/env bash
|
||||
# next/tests/delivery_dispatch.sh — m2 Step 8d test.
|
||||
#
|
||||
# After a successful outbox:publish, each ActorId in the
|
||||
# Result's :delivery_set is enqueued onto the matching
|
||||
# delivery_worker (registered under the peer-id atom). Only
|
||||
# happens when Context carries {dispatch_deliveries, true} —
|
||||
# back-compat with every M1 outbox caller that doesn't dispatch.
|
||||
|
||||
set -uo pipefail
|
||||
cd "$(git rev-parse --show-toplevel)"
|
||||
|
||||
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
|
||||
fi
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
echo "ERROR: sx_server.exe not found." >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
VERBOSE="${1:-}"
|
||||
PASS=0; FAIL=0; ERRORS=""
|
||||
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
|
||||
|
||||
# Alice publishes to bob (and carol). Each peer worker is registered
|
||||
# under its peer-id atom; the outbox dispatches via the workers'
|
||||
# enqueue path. dispatch_fn left undefined so the workers just
|
||||
# accumulate pending without firing HTTP.
|
||||
SETUP='K = <<1,2,3,4>>, KS = [{key_id,k1},{algorithm,ed25519},{value,K}], AS = [{public_keys,[[{id,k1},{created,0},{value,K}]]}], {ok, L0} = log:open(alice, <<98,97,115,101>>), Ctx = [{actor_id,alice},{published,1},{key_spec,KS},{actor_state,AS},{log,L0},{projections,[]},{dispatch_deliveries, true}], CtxNoDispatch = [{actor_id,alice},{published,1},{key_spec,KS},{actor_state,AS},{log,L0},{projections,[]}], ReqToBob = [{type, note}, {object, [{content, hi}]}, {to, bob}], ReqToTwo = [{type, note}, {object, [{content, hi}]}, {to, [bob, carol]}],'
|
||||
|
||||
cat > "$TMPFILE" <<EPOCHS
|
||||
(epoch 1)
|
||||
(load "lib/erlang/tokenizer.sx")
|
||||
(load "lib/erlang/parser.sx")
|
||||
(load "lib/erlang/parser-core.sx")
|
||||
(load "lib/erlang/parser-expr.sx")
|
||||
(load "lib/erlang/parser-module.sx")
|
||||
(load "lib/erlang/transpile.sx")
|
||||
(load "lib/erlang/runtime.sx")
|
||||
(load "lib/erlang/vm/dispatcher.sx")
|
||||
(epoch 2)
|
||||
(eval "(er-load-gen-server!)")
|
||||
(epoch 3)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
|
||||
(epoch 4)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/log.erl\")) :name)")
|
||||
(epoch 5)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/pipeline.erl\")) :name)")
|
||||
(epoch 6)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/follower_graph.erl\")) :name)")
|
||||
(epoch 7)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/delivery.erl\")) :name)")
|
||||
(epoch 8)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/delivery_worker.erl\")) :name)")
|
||||
(epoch 9)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/outbox.erl\")) :name)")
|
||||
|
||||
;; Bob's worker registered + publish to bob -> bob's pending has 1 entry
|
||||
(epoch 20)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} delivery_worker:start_link(bob), {ok, _, _} = outbox:publish(ReqToBob, Ctx), case delivery_worker:pending_srv(bob) of [_] -> ok; _ -> bad end\") :name)")
|
||||
|
||||
;; Carol's worker registered, publish to [bob, carol] -> both queues get 1 entry
|
||||
(epoch 21)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} delivery_worker:start_link(bob), delivery_worker:start_link(carol), {ok, _, _} = outbox:publish(ReqToTwo, Ctx), {length(delivery_worker:pending_srv(bob)), length(delivery_worker:pending_srv(carol))} =:= {1, 1}\") :name)")
|
||||
|
||||
;; Missing worker for an actor in delivery_set -> silently skipped (no error)
|
||||
(epoch 22)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} delivery_worker:start_link(bob), case outbox:publish(ReqToTwo, Ctx) of {ok, R, _} -> envelope:get_field(delivery_set, R) =:= {ok, [bob, carol]}; _ -> false end andalso length(delivery_worker:pending_srv(bob)) =:= 1\") :name)")
|
||||
|
||||
;; No :dispatch_deliveries flag -> no enqueue happens (back-compat)
|
||||
(epoch 23)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} delivery_worker:start_link(bob), {ok, _, _} = outbox:publish(ReqToBob, CtxNoDispatch), delivery_worker:pending_srv(bob) =:= []\") :name)")
|
||||
|
||||
;; Two publishes -> bob's queue has 2 entries (FIFO append)
|
||||
(epoch 24)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} delivery_worker:start_link(bob), {ok, _, NewLog} = outbox:publish(ReqToBob, Ctx), Ctx2 = [{actor_id,alice},{published,2},{key_spec,KS},{actor_state,AS},{log,NewLog},{projections,[]},{dispatch_deliveries, true}], {ok, _, _} = outbox:publish(ReqToBob, Ctx2), length(delivery_worker:pending_srv(bob)) =:= 2\") :name)")
|
||||
|
||||
;; Empty delivery_set -> no dispatch (no :to, no :cc)
|
||||
(epoch 25)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} delivery_worker:start_link(bob), ReqNoAud = [{type, note}, {object, [{content, hi}]}], {ok, _, _} = outbox:publish(ReqNoAud, Ctx), delivery_worker:pending_srv(bob) =:= []\") :name)")
|
||||
EPOCHS
|
||||
|
||||
OUTPUT=$(timeout 540 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
|
||||
|
||||
check() {
|
||||
local epoch="$1" desc="$2" expected="$3"
|
||||
local actual
|
||||
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
|
||||
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
|
||||
$0 ~ "^\\(ok " e " " { print; exit }
|
||||
$0 ~ "^\\(error " e " " { print; exit }
|
||||
')
|
||||
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
|
||||
if echo "$actual" | grep -qF -- "$expected"; then
|
||||
PASS=$((PASS+1))
|
||||
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
|
||||
else
|
||||
FAIL=$((FAIL+1))
|
||||
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
|
||||
"
|
||||
fi
|
||||
}
|
||||
|
||||
check 9 "outbox module loaded" "outbox"
|
||||
check 20 "single peer enqueued" "ok"
|
||||
check 21 "two peers both enqueued" "true"
|
||||
check 22 "missing worker silently skip" "true"
|
||||
check 23 "no dispatch_deliveries no-op" "true"
|
||||
check 24 "two publishes FIFO append" "true"
|
||||
check 25 "empty delivery_set -> no-op" "true"
|
||||
|
||||
TOTAL=$((PASS+FAIL))
|
||||
if [ $FAIL -eq 0 ]; then
|
||||
echo "ok $PASS/$TOTAL next/tests/delivery_dispatch.sh passed"
|
||||
else
|
||||
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
|
||||
echo "$ERRORS"
|
||||
fi
|
||||
[ $FAIL -eq 0 ]
|
||||
@@ -557,9 +557,18 @@ a dead-letter list visible via `/admin/dead-letter`.
|
||||
kernel restart. New `next/kernel/delivery_state.erl` fold maps
|
||||
enqueue / delivered / failed events to the worker's persistent
|
||||
shape.
|
||||
- [ ] **8d** — `outbox:publish/2` dispatches each delivery-set
|
||||
entry to the matching worker. The worker is created lazily on
|
||||
first delivery to a peer.
|
||||
- [x] **8d** — `outbox:publish/2` dispatches each delivery-set
|
||||
entry to the matching worker. New `dispatch_deliveries/3` +
|
||||
`enqueue_each/2` in `outbox.erl` walk the computed
|
||||
`delivery_set` and call `delivery_worker:enqueue(PeerId,
|
||||
Activity)` for each registered peer atom. Missing workers
|
||||
(no `whereis`) are silently skipped — lazy worker creation
|
||||
belongs to the kernel manager (Step 8d-mgr or later).
|
||||
Gated by `Context` field `{dispatch_deliveries, true}` so
|
||||
every M1 outbox caller stays back-compat (default off). 7/7
|
||||
in `delivery_dispatch.sh` covering single-peer enqueue,
|
||||
two-peer fan-out, missing-worker skip, no-flag no-op,
|
||||
FIFO append across two publishes, empty delivery_set no-op.
|
||||
- [ ] **8e** — `httpc:request/4` BIF wrapper in
|
||||
`lib/erlang/runtime.sx` (the briefing's allowed scope
|
||||
exception for Step 8). Marshalling: SX dict ↔ Erlang proplist
|
||||
@@ -890,6 +899,18 @@ proceed.
|
||||
|
||||
Newest first.
|
||||
|
||||
- **2026-06-07** — Step 8d: outbox dispatches delivery_set to
|
||||
workers. `outbox:publish/2` gained `dispatch_deliveries/3` and
|
||||
`enqueue_each/2`: after `log:append` + projection broadcast,
|
||||
the resolved `delivery_set` is walked and each registered
|
||||
peer-id atom's `delivery_worker:enqueue(PeerId, Activity)` is
|
||||
called. Missing workers (no `erlang:whereis`) are silently
|
||||
skipped. Gated by Context's `{dispatch_deliveries, true}` —
|
||||
default off so every M1 outbox caller stays back-compat. 7/7
|
||||
in `delivery_dispatch.sh`; `outbox_publish.sh` +
|
||||
`delivery_worker.sh` both still 17/17. Conformance preserved
|
||||
at 761/761 from the Step 8a baseline.
|
||||
|
||||
- **2026-06-07** — Step 8a: delivery_worker skeleton.
|
||||
`next/kernel/delivery_worker.erl` with pure-functional state +
|
||||
enqueue / drain / deliver_one + backoff schedule (30s / 5m /
|
||||
|
||||
Reference in New Issue
Block a user