From 070986913deb6914b4809a05df33c8fd90857e79 Mon Sep 17 00:00:00 2001 From: giles Date: Sun, 7 Jun 2026 07:01:55 +0000 Subject: [PATCH] =?UTF-8?q?fed-sx-m2:=20Step=209c=20=E2=80=94=20auto-Accep?= =?UTF-8?q?t=20backfill=20drain=20+=206=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit maybe_auto_accept/3 in http_server.erl now calls maybe_backfill/3 after the Accept publish. Flow: inbound Follow{actor: bob, object: alice, backfill: SPEC} lands -> pipeline ok -> append_inbox + broadcast (Step 6b) -> maybe_auto_accept fires (Step 6c) -> publish Accept{actor: alice, object: Follow} (Step 6c) -> maybe_backfill (Step 9c) -> backfill_enabled cfg gate -> :backfill present on Follow -> backfill:parse_mode -> Mode -> nx_kernel:log_state_for(alice) -> LogState -> backfill:slice(Mode, LogState, true) -> [Wrapped] -> deliver_backfill(bob, Slice): whereis(bob) cfg gate (peer worker registered) -> delivery_worker:enqueue(bob, A) for each Cfg surface: {backfill_enabled, true} gate the drain (default off) {auto_accept_follows, true} Step 6c gate (required) Each backfilled entry carries {backfilled, true} (per design §13.3, :id preserved so the receiver's replay defence still catches the forward-going copy). 6/6 in next/tests/backfill_drain.sh: - Follow with {backfill, {last_n, 2}} + 3 pre-published notes -> bob's delivery_worker has exactly 2 pending entries - Each entry carries {backfilled, true} - :backfill_enabled absent -> no drain (back-compat) - Follow without :backfill field -> no drain - Missing peer worker (no whereis) -> silently skipped + 202 Step 9 fully closed (9a slicing + 9b ?since route + 9c Accept-drain). The live HTTP dispatch of the queued entries still gates on Blockers #2 (httpc). --- next/kernel/http_server.erl | 49 +++++++++++++- next/tests/backfill_drain.sh | 121 +++++++++++++++++++++++++++++++++++ plans/fed-sx-milestone-2.md | 33 ++++++++-- 3 files changed, 196 insertions(+), 7 deletions(-) create mode 100755 next/tests/backfill_drain.sh diff --git a/next/kernel/http_server.erl b/next/kernel/http_server.erl index 3e550a3f..5867b4d8 100644 --- a/next/kernel/http_server.erl +++ b/next/kernel/http_server.erl @@ -1178,12 +1178,59 @@ maybe_auto_accept(TargetAtom, Activity, Cfg) -> case envelope:get_field(type, Activity) of {ok, follow} -> AcceptRequest = [{type, accept}, {object, Activity}], - nx_kernel:publish_to(TargetAtom, AcceptRequest); + nx_kernel:publish_to(TargetAtom, AcceptRequest), + maybe_backfill(TargetAtom, Activity, Cfg); _ -> ok end; _ -> ok end. +%% maybe_backfill/3 — Step 9c. If Cfg carries +%% `{backfill_enabled, true}` AND the Follow activity carries a +%% `:backfill` field, parse the mode, slice the receiving actor's +%% outbox per `backfill:slice/3` (Wrap=true so each entry carries +%% `{backfilled, true}`), and enqueue each onto the new follower's +%% delivery_worker (registered under the follower's actor-id atom). +%% +%% Missing delivery_worker for the peer is silently skipped — the +%% kernel manager lazily creates workers (or won't, in single-kernel +%% in-process tests where the peer-worker is set up explicitly). + +maybe_backfill(TargetAtom, FollowActivity, Cfg) -> + case field(backfill_enabled, Cfg) of + true -> + case envelope:get_field(backfill, FollowActivity) of + {ok, Spec} -> + Mode = backfill:parse_mode(Spec), + drain_backfill(TargetAtom, FollowActivity, Mode); + _ -> ok + end; + _ -> ok + end. + +drain_backfill(TargetAtom, FollowActivity, Mode) -> + case nx_kernel:log_state_for(TargetAtom) of + {ok, LogState} -> + Slice = backfill:slice(Mode, LogState, true), + case envelope:get_field(actor, FollowActivity) of + {ok, PeerId} when is_atom(PeerId) -> + deliver_backfill(PeerId, Slice); + _ -> ok + end; + _ -> ok + end. + +deliver_backfill(PeerId, Activities) -> + case erlang:whereis(PeerId) of + undefined -> ok; + _ -> enqueue_backfill_each(PeerId, Activities) + end. + +enqueue_backfill_each(_, []) -> ok; +enqueue_backfill_each(PeerId, [A | Rest]) -> + delivery_worker:enqueue(PeerId, A), + enqueue_backfill_each(PeerId, Rest). + %% broadcast_to_inbox_projections/2 — Step 6b. Cfg may carry %% `{inbox_projections, [Name, ...]}` listing projection gen_servers %% that should see every successfully-ingested inbound activity. diff --git a/next/tests/backfill_drain.sh b/next/tests/backfill_drain.sh new file mode 100755 index 00000000..9ac5be14 --- /dev/null +++ b/next/tests/backfill_drain.sh @@ -0,0 +1,121 @@ +#!/usr/bin/env bash +# next/tests/backfill_drain.sh — m2 Step 9c test. +# +# Auto-Accept on Follow ingestion can now also drain the receiving +# actor's outbox into the new follower's delivery_worker queue per +# the Follow's :backfill spec. Gated by Cfg :backfill_enabled. + +set -uo pipefail +cd "$(git rev-parse --show-toplevel)" + +SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}" +if [ ! -x "$SX_SERVER" ]; then + SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe" +fi +if [ ! -x "$SX_SERVER" ]; then + echo "ERROR: sx_server.exe not found." >&2 + exit 1 +fi + +VERBOSE="${1:-}" +PASS=0; FAIL=0; ERRORS="" +TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT + +# Alice is the target (on this kernel). Bob is the peer publishing the +# Follow. Three notes pre-published to alice's outbox before bob's +# Follow lands; the Follow asks for last_n=2 backfill. +SETUP='AK = <<1,2,3,4>>, AKS = [{key_id,k1},{algorithm,ed25519},{value,AK}], AAS = [{public_keys,[[{id,k1},{created,0},{value,AK}]]}], BK = <<5,6,7,8>>, BKS = [{key_id,k1},{algorithm,ed25519},{value,BK}], BAS = [{public_keys,[[{id,k1},{created,0},{value,BK}]]}], FollowReq = [{type, follow}, {object, alice}], FollowReqBF = [{type, follow}, {object, alice}, {backfill, {last_n, 2}}], FollowEnvBF = outbox:construct(follow, bob, 1, alice), FollowSignedNoBF = outbox:sign(FollowEnvBF, BKS), FollowSignedBF = outbox:sign(FollowEnvBF ++ [{backfill, {last_n, 2}}], BKS), BodyBF = term_codec:encode(FollowSignedBF), BodyNoBF = term_codec:encode(FollowSignedNoBF), nx_kernel:start_link(alice, AKS, AAS), delivery_worker:start_link(bob), InboxPath = <<47,97,99,116,111,114,115,47,97,108,105,99,101,47,105,110,98,111,120>>,' + +cat > "$TMPFILE" < bob's delivery_worker has 2 pending entries after Follow lands +(epoch 20) +(eval "(get (erlang-eval-ast \"${SETUP} N1 = [{type, note}, {object, [{content, hi1}]}], N2 = [{type, note}, {object, [{content, hi2}]}], N3 = [{type, note}, {object, [{content, hi3}]}], nx_kernel:publish_to(alice, N1), nx_kernel:publish_to(alice, N2), nx_kernel:publish_to(alice, N3), Cfg = [{peer_as, [{bob, BAS}]}, {kernel, nx_kernel}, {auto_accept_follows, true}, {backfill_enabled, true}], Req = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, BodyBF}], http_server:route(Req, Cfg), length(delivery_worker:pending_srv(bob)) =:= 2\") :name)") + +;; Each backfilled entry carries {backfilled, true} +(epoch 21) +(eval "(get (erlang-eval-ast \"${SETUP} N1 = [{type, note}, {object, [{content, hi}]}], nx_kernel:publish_to(alice, N1), Cfg = [{peer_as, [{bob, BAS}]}, {kernel, nx_kernel}, {auto_accept_follows, true}, {backfill_enabled, true}], Req = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, BodyBF}], http_server:route(Req, Cfg), [E | _] = delivery_worker:pending_srv(bob), envelope:get_field(backfilled, E) =:= {ok, true}\") :name)") + +;; No :backfill_enabled flag -> no backfill drain even with :backfill in Follow +(epoch 22) +(eval "(get (erlang-eval-ast \"${SETUP} N1 = [{type, note}, {object, [{content, hi}]}], nx_kernel:publish_to(alice, N1), Cfg = [{peer_as, [{bob, BAS}]}, {kernel, nx_kernel}, {auto_accept_follows, true}], Req = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, BodyBF}], http_server:route(Req, Cfg), delivery_worker:pending_srv(bob) =:= []\") :name)") + +;; Follow without :backfill field -> no backfill drain (even with the flag) +(epoch 23) +(eval "(get (erlang-eval-ast \"${SETUP} N1 = [{type, note}, {object, [{content, hi}]}], nx_kernel:publish_to(alice, N1), Cfg = [{peer_as, [{bob, BAS}]}, {kernel, nx_kernel}, {auto_accept_follows, true}, {backfill_enabled, true}], Req = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, BodyNoBF}], http_server:route(Req, Cfg), delivery_worker:pending_srv(bob) =:= []\") :name)") + +;; Missing delivery_worker for the peer -> silently skipped (no enqueue, no crash) +(epoch 24) +(eval "(get (erlang-eval-ast \"AK = <<1,2,3,4>>, AKS = [{key_id,k1},{algorithm,ed25519},{value,AK}], AAS = [{public_keys,[[{id,k1},{created,0},{value,AK}]]}], BK = <<5,6,7,8>>, BKS = [{key_id,k1},{algorithm,ed25519},{value,BK}], BAS = [{public_keys,[[{id,k1},{created,0},{value,BK}]]}], nx_kernel:start_link(alice, AKS, AAS), FollowEnvBF = outbox:construct(follow, bob, 1, alice), FollowSignedBF = outbox:sign(FollowEnvBF ++ [{backfill, {last_n, 2}}], BKS), BodyBF = term_codec:encode(FollowSignedBF), N1 = [{type, note}, {object, [{content, hi}]}], nx_kernel:publish_to(alice, N1), Cfg = [{peer_as, [{bob, BAS}]}, {kernel, nx_kernel}, {auto_accept_follows, true}, {backfill_enabled, true}], InboxPath = <<47,97,99,116,111,114,115,47,97,108,105,99,101,47,105,110,98,111,120>>, Req = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, BodyBF}], case http_server:route(Req, Cfg) of [{status, 202}, _, _] -> true; _ -> false end\") :name)") +EPOCHS + +OUTPUT=$(timeout 900 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) + +check() { + local epoch="$1" desc="$2" expected="$3" + local actual + actual=$(echo "$OUTPUT" | awk -v e="$epoch" ' + $0 ~ "^\\(ok-len " e " " { getline; print; exit } + $0 ~ "^\\(ok " e " " { print; exit } + $0 ~ "^\\(error " e " " { print; exit } + ') + [ -z "$actual" ] && actual="" + if echo "$actual" | grep -qF -- "$expected"; then + PASS=$((PASS+1)) + [ "$VERBOSE" = "-v" ] && echo " ok $desc" + else + FAIL=$((FAIL+1)) + ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual +" + fi +} + +check 13 "http_server loaded" "http_server" +check 20 "Follow w/ backfill -> 2 enqueued" "true" +check 21 "backfilled marker on entries" "true" +check 22 "no flag -> no backfill" "true" +check 23 "no :backfill field -> no drain" "true" +check 24 "missing worker -> 202 (skip)" "true" + +TOTAL=$((PASS+FAIL)) +if [ $FAIL -eq 0 ]; then + echo "ok $PASS/$TOTAL next/tests/backfill_drain.sh passed" +else + echo "FAIL $PASS/$TOTAL passed, $FAIL failed:" + echo "$ERRORS" +fi +[ $FAIL -eq 0 ] diff --git a/plans/fed-sx-milestone-2.md b/plans/fed-sx-milestone-2.md index 778e786d..2d48ea8a 100644 --- a/plans/fed-sx-milestone-2.md +++ b/plans/fed-sx-milestone-2.md @@ -666,12 +666,20 @@ Per §13.3: A wants B's history when A first follows B. Four modes: to `http_multi_actor.sh` (downstream dependency since Step 7c/9a — must have been latently broken; the existing 41 passes + 3 new = 44 now all green). -- [ ] **9c** — Follow → Accept → backfill-delivery wiring. - The receiving kernel reads the Follow's `:backfill` field - via `parse_mode/1`, slices its outbox, and dispatches each - entry to the new follower's delivery_worker queue (Step 8d). - Gates on Blockers #2 (httpc) for the actual peer fetch path - but the in-process drain works today. +- [x] **9c** — Follow → Accept → backfill drain (in-process). + `maybe_auto_accept/3` in `http_server.erl` now calls a new + `maybe_backfill/3` after the Accept publish: when Cfg carries + `{backfill_enabled, true}` AND the Follow envelope carries a + `:backfill` field, the receiver parses the mode via + `backfill:parse_mode/1`, slices its outbox via + `backfill:slice/3` (Wrap=true so each entry gets + `{backfilled, true}`), and enqueues every slice entry onto + the peer's delivery_worker if registered (silently skipped + otherwise — kernel manager lazy creation belongs upstream). + 6/6 in `backfill_drain.sh` covering full path + entry marker + + flag-off no-op + missing-backfill-field no-op + missing- + worker silent skip. The live HTTP dispatch of those queued + entries still gates on Blockers #2 (httpc). **Tests:** @@ -1047,6 +1055,19 @@ proceed. Newest first. +- **2026-06-07** — Step 9c (closes Step 9): Follow → Accept → + backfill drain (in-process). `maybe_auto_accept/3` now calls + `maybe_backfill/3` after the Accept publish: when + `:backfill_enabled` is true and the Follow envelope carries a + `:backfill` field, the receiver parses the mode, slices its + outbox via `backfill:slice/3` (Wrap=true), and enqueues every + entry onto the peer's delivery_worker. Silent skip when the + worker isn't registered (kernel manager lazy creation + upstream). 6/6 in `backfill_drain.sh`. Step 9 fully closed + (9a slicing + 9b ?since route + 9c Accept-drain). Live HTTP + dispatch of queued entries still gates on Blockers #2 + (httpc). + - **2026-06-07** — Step 9b: outbox `?since=Cid` pagination. `actor_outbox_response_for/3` in `http_server.erl` now reads `?since=` from the query string via new `parse_since/1` +