diff --git a/next/kernel/http_server.erl b/next/kernel/http_server.erl index 084d9ae9..8a41a956 100644 --- a/next/kernel/http_server.erl +++ b/next/kernel/http_server.erl @@ -1098,16 +1098,40 @@ handle_inbox_decoded(TargetId, Activity, Cfg) -> end end. -run_inbox_pipeline(TargetAtom, Activity, PeerAS, InboxLog, _Cfg) -> +run_inbox_pipeline(TargetAtom, Activity, PeerAS, InboxLog, Cfg) -> case pipeline:validate_inbound(Activity, PeerAS, InboxLog) of ok -> nx_kernel:append_inbox(TargetAtom, Activity), + broadcast_to_inbox_projections(Activity, Cfg), actor_inbox_post_response(); {error, bad_signature} -> unauthorized_response(); {error, no_signature} -> unauthorized_response(); {error, _} -> validation_failed_response() end. +%% broadcast_to_inbox_projections/2 — Step 6b. Cfg may carry +%% `{inbox_projections, [Name, ...]}` listing projection gen_servers +%% that should see every successfully-ingested inbound activity. +%% Casts via projection:async_fold/2 — fire-and-forget so the inbox +%% handler doesn't block on projection processing. +%% +%% No-op when the field is absent. v2 v2 layers per-actor projection +%% routing on top (each actor's bucket can carry its own projection +%% list); for now the field is global. + +broadcast_to_inbox_projections(Activity, Cfg) -> + case field(inbox_projections, Cfg) of + nil -> ok; + Names when is_list(Names) -> + broadcast_each(Activity, Names); + _ -> ok + end. + +broadcast_each(_, []) -> ok; +broadcast_each(Activity, [Name | Rest]) -> + projection:async_fold(Name, Activity), + broadcast_each(Activity, Rest). + %% kernel_has_actor/2 — guard against unknown target actors. nil %% kernel (e.g. tests without a kernel cfg'd) treats every Id as %% present so the rest of the pipeline can still exercise. diff --git a/next/tests/bootstrap_start.sh b/next/tests/bootstrap_start.sh index 51eafd5d..c3cc97fe 100755 --- a/next/tests/bootstrap_start.sh +++ b/next/tests/bootstrap_start.sh @@ -92,7 +92,7 @@ cat > "$TMPFILE" <>) of {ok, _} -> ok; _ -> bad end\") :name)") EPOCHS -OUTPUT=$(timeout 300 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) +OUTPUT=$(timeout 600 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) check() { local epoch="$1" desc="$2" expected="$3" diff --git a/next/tests/follow_lifecycle.sh b/next/tests/follow_lifecycle.sh new file mode 100755 index 00000000..33d022bb --- /dev/null +++ b/next/tests/follow_lifecycle.sh @@ -0,0 +1,137 @@ +#!/usr/bin/env bash +# next/tests/follow_lifecycle.sh — m2 Step 6b test. +# +# Ties Step 5 (POST /actors//inbox real ingestion) to Step 6a +# (follower_graph projection) via Cfg :inbox_projections. The +# inbox handler casts every successfully-ingested activity into +# each named projection — the follower_graph state mutates as +# Follow / Accept / Reject / Undo activities land. + +set -uo pipefail +cd "$(git rev-parse --show-toplevel)" + +SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}" +if [ ! -x "$SX_SERVER" ]; then + SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe" +fi +if [ ! -x "$SX_SERVER" ]; then + echo "ERROR: sx_server.exe not found." >&2 + exit 1 +fi + +VERBOSE="${1:-}" +PASS=0; FAIL=0; ERRORS="" +TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT + +# Alice is on this kernel (target). Bob is the peer (signs activities +# with BobKS). PeerAS = Bob's actor-state (Bob's public_keys). The +# :inbox_projections wires inbound to the followers projection so +# follower_graph state advances on every successful ingestion. +SETUP='AK = <<1,2,3,4>>, AKS = [{key_id,k1},{algorithm,ed25519},{value,AK}], AAS = [{public_keys,[[{id,k1},{created,0},{value,AK}]]}], BK = <<5,6,7,8>>, BKS = [{key_id,k1},{algorithm,ed25519},{value,BK}], BAS = [{public_keys,[[{id,k1},{created,0},{value,BK}]]}], FollowReq = [{actor, bob}, {type, follow}, {object, alice}, {published, 1}], FollowEnv = outbox:construct(follow, bob, 1, alice), SignedFollow = outbox:sign(FollowEnv, BKS), Body = term_codec:encode(SignedFollow), nx_kernel:start_link(alice, AKS, AAS), projection:start_link(followers, follower_graph:new(), follower_graph:fold_fn()), Cfg = [{peer_as, [{bob, BAS}]}, {kernel, nx_kernel}, {inbox_projections, [followers]}], InboxPath = <<47,97,99,116,111,114,115,47,97,108,105,99,101,47,105,110,98,111,120>>,' + +cat > "$TMPFILE" < 202 from inbox handler +(epoch 20) +(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, Body}], case http_server:route(Req, Cfg) of [{status, 202}, _, _] -> true; _ -> false end\") :name)") + +;; After Follow: follower_graph state shows alice with pending_inbound = [bob] +(epoch 21) +(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {object, alice}, {body, Body}], http_server:route(Req, Cfg), follower_graph:pending_inbound(alice, projection:query(followers)) =:= [bob]\") :name)") + +;; And bob has pending_outbound = [alice] +(epoch 22) +(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, Body}], http_server:route(Req, Cfg), follower_graph:pending_outbound(bob, projection:query(followers)) =:= [alice]\") :name)") + +;; Inbox tip advanced even without auto-Accept (separate concern) +(epoch 23) +(eval "(erlang-eval-ast \"${SETUP} Req = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, Body}], http_server:route(Req, Cfg), nx_kernel:inbox_tip_for(alice)\")") + +;; No :inbox_projections in Cfg: projection state stays empty +(epoch 24) +(eval "(get (erlang-eval-ast \"${SETUP} BareCfg = [{peer_as, [{bob, BAS}]}, {kernel, nx_kernel}], Req = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, Body}], http_server:route(Req, BareCfg), follower_graph:pending_inbound(alice, projection:query(followers)) =:= []\") :name)") + +;; Follow + Accept end-to-end: bob -> alice (Follow), alice -> bob (Accept via outbox). +;; v2 only has the inbox side wired; the Accept is built locally in the test and +;; folded through the same projection to demonstrate that the projection state +;; converges. Auto-Accept publish lands in 6c. +(epoch 25) +(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, Body}], http_server:route(Req, Cfg), AcceptAct = [{actor, alice}, {type, accept}, {object, [{actor, bob}, {type, follow}, {object, alice}]}], projection:async_fold(followers, AcceptAct), S = projection:query(followers), follower_graph:followers(alice, S) =:= [bob] andalso follower_graph:following(bob, S) =:= [alice]\") :name)") + +;; Inbox handler with bad sig fails BEFORE projection broadcast +(epoch 26) +(eval "(get (erlang-eval-ast \"AK = <<1,2,3,4>>, AKS = [{key_id,k1},{algorithm,ed25519},{value,AK}], AAS = [{public_keys,[[{id,k1},{created,0},{value,AK}]]}], EvilK = <<9,9,9,9>>, EvilAS = [{public_keys,[[{id,k1},{created,0},{value,EvilK}]]}], BK = <<5,6,7,8>>, BKS = [{key_id,k1},{algorithm,ed25519},{value,BK}], FollowEnv = outbox:construct(follow, bob, 1, alice), SignedFollow = outbox:sign(FollowEnv, BKS), Body = term_codec:encode(SignedFollow), nx_kernel:start_link(alice, AKS, AAS), projection:start_link(followers, follower_graph:new(), follower_graph:fold_fn()), EvilCfg = [{peer_as, [{bob, EvilAS}]}, {kernel, nx_kernel}, {inbox_projections, [followers]}], InboxPath = <<47,97,99,116,111,114,115,47,97,108,105,99,101,47,105,110,98,111,120>>, Req = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, Body}], http_server:route(Req, EvilCfg), follower_graph:actors(projection:query(followers)) =:= []\") :name)") + +;; Multiple distinct peer Follows accumulate +(epoch 27) +(eval "(get (erlang-eval-ast \"${SETUP} CK = <<9,9,9,9>>, CKS = [{key_id,k1},{algorithm,ed25519},{value,CK}], CAS = [{public_keys,[[{id,k1},{created,0},{value,CK}]]}], MultiCfg = [{peer_as, [{bob, BAS}, {carol, CAS}]}, {kernel, nx_kernel}, {inbox_projections, [followers]}], CarolEnv = outbox:construct(follow, carol, 1, alice), CarolSigned = outbox:sign(CarolEnv, CKS), CarolBody = term_codec:encode(CarolSigned), Req1 = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, Body}], Req2 = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, CarolBody}], http_server:route(Req1, MultiCfg), http_server:route(Req2, MultiCfg), follower_graph:pending_inbound(alice, projection:query(followers)) =:= [bob, carol]\") :name)") +EPOCHS + +OUTPUT=$(timeout 900 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) + +check() { + local epoch="$1" desc="$2" expected="$3" + local actual + actual=$(echo "$OUTPUT" | awk -v e="$epoch" ' + $0 ~ "^\\(ok-len " e " " { getline; print; exit } + $0 ~ "^\\(ok " e " " { print; exit } + $0 ~ "^\\(error " e " " { print; exit } + ') + [ -z "$actual" ] && actual="" + if echo "$actual" | grep -qF -- "$expected"; then + PASS=$((PASS+1)) + [ "$VERBOSE" = "-v" ] && echo " ok $desc" + else + FAIL=$((FAIL+1)) + ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual +" + fi +} + +check 11 "http_server module loaded" "http_server" +check 20 "Follow ingestion -> 202" "true" +check 21 "alice.pending_inbound = [bob]" "true" +check 22 "bob.pending_outbound = [alice]" "true" +check 23 "inbox tip advances to 1" "1" +check 24 "no inbox_projections -> no fold" "true" +check 25 "Follow + Accept projection state" "true" +check 26 "bad sig doesn't pollute projection" "true" +check 27 "two distinct peer Follows accumulate" "true" + +TOTAL=$((PASS+FAIL)) +if [ $FAIL -eq 0 ]; then + echo "ok $PASS/$TOTAL next/tests/follow_lifecycle.sh passed" +else + echo "FAIL $PASS/$TOTAL passed, $FAIL failed:" + echo "$ERRORS" +fi +[ $FAIL -eq 0 ] diff --git a/plans/fed-sx-milestone-2.md b/plans/fed-sx-milestone-2.md index 62d3da5c..a5431321 100644 --- a/plans/fed-sx-milestone-2.md +++ b/plans/fed-sx-milestone-2.md @@ -436,11 +436,27 @@ tracks the state. `Undo{Follow}` reverses it. 18 cases in `follower_graph.sh`. The `fold_fn/0` 2-arity fun plugs into `projection:start_link/3` exactly like `define_registry:fold_fn/0` and `actor_state:fold_fn/0`. -- [ ] **6b** — Wire follower-graph fold to the inbox handler so a - peer Follow lands, fires auto-Accept publish (open-world policy - per §13.2; manual moderation deferred to v3). Acceptance test - in `follow_lifecycle.sh` covering the end-to-end - Follow → inbox → auto-Accept → projection-state-converges flow. +- [x] **6b** — Wire follower-graph fold to the inbox handler. + `http_server.erl` `run_inbox_pipeline` now calls + `broadcast_to_inbox_projections/2` after a successful + `nx_kernel:append_inbox`. Cfg may carry `{inbox_projections, + [Name, ...]}` listing projection gen_servers; each gets the + activity via `projection:async_fold/2` (fire-and-forget so the + handler doesn't block on fold processing). Field absent = + no-op. v2 leaves the routing field global; per-actor + projection wiring is a forward-looking follow-up. 9/9 in + `follow_lifecycle.sh` covering 202 ingestion, follower_graph + pending-state mutation on both sides, no-inbox_projections + no-op path, bad-sig short-circuit (projection stays clean), + multi-peer accumulation, end-to-end Follow+Accept projection + convergence (Accept fed in via projection:async_fold for v2). +- [ ] **6c** — Auto-Accept publish. On Follow ingestion, the + receiving kernel constructs an `Accept{actor: target, object: + Follow}` envelope, signs it with the target's key, and + publishes via `nx_kernel:publish_to/2`. Per design §13.2 the + policy is open-world (auto-accept every Follow); manual + moderation (held in a pending list, accepted via /admin/) is + v3. **Acceptance:** `bash next/tests/follow_lifecycle.sh` passes 14+ cases. @@ -813,6 +829,19 @@ proceed. Newest first. +- **2026-06-06** — Step 6b: wire follower_graph fold to the + inbox handler. New `broadcast_to_inbox_projections/2` in + `http_server.erl` casts every successfully-ingested activity + into each `:inbox_projections` Cfg entry via + `projection:async_fold/2`. Fire-and-forget so the inbox + handler doesn't block on fold processing. Empty / absent + `:inbox_projections` is a no-op (back-compat with Steps 5d + callers). 9/9 in `follow_lifecycle.sh` covering 202 + bilateral + pending-state mutation + bad-sig short-circuit + multi-peer + + end-to-end projection convergence on Follow+Accept. Conformance + 761/761. Auto-Accept publish (the receiving kernel responds + with a signed Accept) is Step 6c. + - **2026-06-06** — Step 6a: follower-graph projection (`follower_graph.erl`). Pure-functional fold over Follow / Accept / Reject / Undo activities per design §13.2. State is a