diff --git a/next/kernel/pipeline.erl b/next/kernel/pipeline.erl index bb4d01af..ba3ace63 100644 --- a/next/kernel/pipeline.erl +++ b/next/kernel/pipeline.erl @@ -1,7 +1,8 @@ -module(pipeline). -export([run_stages/2, - validate_inbound/1, validate_outbound/1, - inbound_stages/0, outbound_stages/0, + validate_inbound/1, validate_inbound/3, + validate_outbound/1, + inbound_stages/0, inbound_stages/2, outbound_stages/0, stage_envelope/1, stage_signature/1, stage_signature/2, stage_replay/1, stage_replay/2, @@ -34,12 +35,43 @@ run_stages(Activity, [Stage | Rest]) -> validate_inbound(Activity) -> run_stages(Activity, inbound_stages()). +%% validate_inbound/3 — Step 5b federation inbound pipeline. +%% +%% Activity: the signed envelope as received from the peer. +%% PeerActorState: the peer's actor-state proplist carrying +%% :public_keys for signature verification. Caller +%% resolves this — for v2 it's either pre-populated +%% from a peer-actors cache (Step 5c) or known from +%% a two-instance test fixture. +%% InboxLog: the receiving actor's :actor_inbox log state. +%% Used by stage_replay to reject duplicate :id. +%% +%% Stages (per design §13.2 + §14): +%% stage_envelope — shape check +%% stage_signature(PeerAS) — peer sig verify +%% stage_replay(InboxLog) — replay defence against +%% receiving actor's inbox +%% +%% Returns ok | {error, Reason}. The driver halts on first failure. +%% Audience / schema / capabilities / trust stages defer to v3. + +validate_inbound(Activity, PeerActorState, InboxLog) -> + run_stages(Activity, inbound_stages(PeerActorState, InboxLog)). + validate_outbound(Activity) -> run_stages(Activity, outbound_stages()). inbound_stages() -> [fun (A) -> stage_envelope(A) end]. +%% inbound_stages/2 — the full ordered stage list for federation +%% inbound (envelope -> peer sig -> replay against inbox). + +inbound_stages(PeerActorState, InboxLog) -> + [fun (A) -> stage_envelope(A) end, + stage_signature(PeerActorState), + stage_replay(InboxLog)]. + outbound_stages() -> [fun (A) -> stage_envelope(A) end]. diff --git a/next/tests/inbox_pipeline.sh b/next/tests/inbox_pipeline.sh new file mode 100755 index 00000000..2bb6f4e0 --- /dev/null +++ b/next/tests/inbox_pipeline.sh @@ -0,0 +1,146 @@ +#!/usr/bin/env bash +# next/tests/inbox_pipeline.sh — m2 Step 5b test. +# +# Exercises pipeline:validate_inbound/3(Activity, PeerActorState, +# InboxLog) — the federation inbound pipeline that runs +# envelope-shape -> peer signature -> replay against the receiving +# actor's inbox log. Step 5c wires this into the HTTP handler. + +set -uo pipefail +cd "$(git rev-parse --show-toplevel)" + +SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}" +if [ ! -x "$SX_SERVER" ]; then + SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe" +fi +if [ ! -x "$SX_SERVER" ]; then + echo "ERROR: sx_server.exe not found." >&2 + exit 1 +fi + +VERBOSE="${1:-}" +PASS=0; FAIL=0; ERRORS="" +TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT + +# Bob (the peer) signs activities with K1. Alice (the recipient) has +# PeerAS = Bob's actor-state (with Bob's public key). The InboxLog is +# Alice's :actor_inbox bucket. +SETUP='K1 = <<1,2,3,4>>, K1S = [{key_id,k1},{algorithm,ed25519},{value,K1}], BobAS = [{public_keys,[[{id,k1},{created,0},{value,K1}]]}], K2 = <<9,9,9,9>>, EvilAS = [{public_keys,[[{id,k1},{created,0},{value,K2}]]}], Env = outbox:construct(note, bob, 1, [{content,hi}]), Signed = outbox:sign(Env, K1S), {ok, FreshInbox} = log:open(alice, <<105,110,98>>),' + +cat > "$TMPFILE" < ok +(epoch 10) +(eval "(get (erlang-eval-ast \"${SETUP} pipeline:validate_inbound(Signed, BobAS, FreshInbox) =:= ok\") :name)") + +;; Tampered envelope (broken shape) -> {error, invalid_shape} +(epoch 11) +(eval "(get (erlang-eval-ast \"${SETUP} Bad = [{type,note}], case pipeline:validate_inbound(Bad, BobAS, FreshInbox) of {error, _} -> ok; _ -> bad end\") :name)") + +;; Activity sans :signature -> stage_envelope rejects as +;; {missing_field, signature} (short-circuit before sig stage) +(epoch 12) +(eval "(get (erlang-eval-ast \"${SETUP} Unsigned = Env, case pipeline:validate_inbound(Unsigned, BobAS, FreshInbox) of {error, {missing_field, signature}} -> ok; _ -> bad end\") :name)") + +;; Wrong peer AS (EvilAS doesn't carry Bob's key bytes) -> bad_signature +(epoch 13) +(eval "(get (erlang-eval-ast \"${SETUP} case pipeline:validate_inbound(Signed, EvilAS, FreshInbox) of {error, bad_signature} -> ok; _ -> bad end\") :name)") + +;; Pre-populated inbox containing the same activity -> {error, replay} +(epoch 14) +(eval "(get (erlang-eval-ast \"${SETUP} {ok, InboxWithMsg, _} = log:append(FreshInbox, Signed), case pipeline:validate_inbound(Signed, BobAS, InboxWithMsg) of {error, replay} -> ok; _ -> bad end\") :name)") + +;; Inbox with a DIFFERENT activity doesn't trigger replay +(epoch 15) +(eval "(get (erlang-eval-ast \"${SETUP} Other = [{type,note},{object,[{content,hello}]},{id,<<200,1>>}], {ok, InboxWithOther, _} = log:append(FreshInbox, Other), pipeline:validate_inbound(Signed, BobAS, InboxWithOther) =:= ok\") :name)") + +;; inbound_stages/2 returns 3 stages +(epoch 16) +(eval "(get (erlang-eval-ast \"${SETUP} length(pipeline:inbound_stages(BobAS, FreshInbox)) =:= 3\") :name)") + +;; inbound_stages/0 stays at 1 stage (back-compat for outbox-side callers) +(epoch 17) +(eval "(get (erlang-eval-ast \"length(pipeline:inbound_stages()) =:= 1\") :name)") + +;; validate_inbound/1 still works (envelope-only fast path) +(epoch 18) +(eval "(get (erlang-eval-ast \"${SETUP} pipeline:validate_inbound(Signed) =:= ok\") :name)") + +;; Stages compose: envelope failure short-circuits before sig +(epoch 19) +(eval "(get (erlang-eval-ast \"${SETUP} BadShape = [{type,note}], case pipeline:validate_inbound(BadShape, EvilAS, FreshInbox) of {error, _} -> ok; _ -> bad end\") :name)") + +;; Sig failure short-circuits before replay +(epoch 20) +(eval "(get (erlang-eval-ast \"${SETUP} {ok, InboxWithMsg, _} = log:append(FreshInbox, Signed), case pipeline:validate_inbound(Signed, EvilAS, InboxWithMsg) of {error, bad_signature} -> ok; _ -> bad end\") :name)") + +;; Two distinct peer activities both verify (different :published seq -> different :id) +(epoch 21) +(eval "(get (erlang-eval-ast \"${SETUP} Env2 = outbox:construct(note, bob, 2, [{content,hi}]), Signed2 = outbox:sign(Env2, K1S), pipeline:validate_inbound(Signed, BobAS, FreshInbox) =:= ok andalso pipeline:validate_inbound(Signed2, BobAS, FreshInbox) =:= ok\") :name)") + +;; Inbox with peer1's activity doesn't replay peer2's +(epoch 22) +(eval "(get (erlang-eval-ast \"${SETUP} Env2 = outbox:construct(note, bob, 2, [{content,hi}]), Signed2 = outbox:sign(Env2, K1S), {ok, InboxA, _} = log:append(FreshInbox, Signed), pipeline:validate_inbound(Signed2, BobAS, InboxA) =:= ok\") :name)") +EPOCHS + +OUTPUT=$(timeout 240 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) + +check() { + local epoch="$1" desc="$2" expected="$3" + local actual + actual=$(echo "$OUTPUT" | awk -v e="$epoch" ' + $0 ~ "^\\(ok-len " e " " { getline; print; exit } + $0 ~ "^\\(ok " e " " { print; exit } + $0 ~ "^\\(error " e " " { print; exit } + ') + [ -z "$actual" ] && actual="" + if echo "$actual" | grep -qF -- "$expected"; then + PASS=$((PASS+1)) + [ "$VERBOSE" = "-v" ] && echo " ok $desc" + else + FAIL=$((FAIL+1)) + ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual +" + fi +} + +check 4 "pipeline module loaded" "pipeline" +check 10 "happy path -> ok" "true" +check 11 "bad envelope shape -> {error, _}" "ok" +check 12 "unsigned -> missing_field rejection" "ok" +check 13 "wrong peer AS -> bad_signature" "ok" +check 14 "duplicate activity -> replay" "ok" +check 15 "different activity, no replay" "true" +check 16 "inbound_stages/2 -> 3 stages" "true" +check 17 "inbound_stages/0 -> 1 stage" "true" +check 18 "validate_inbound/1 still works" "true" +check 19 "shape fail short-circuits sig" "ok" +check 20 "sig fail short-circuits replay" "ok" +check 21 "two distinct activities verify" "true" +check 22 "inbox-of-one doesn't replay other" "true" + +TOTAL=$((PASS+FAIL)) +if [ $FAIL -eq 0 ]; then + echo "ok $PASS/$TOTAL next/tests/inbox_pipeline.sh passed" +else + echo "FAIL $PASS/$TOTAL passed, $FAIL failed:" + echo "$ERRORS" +fi +[ $FAIL -eq 0 ] diff --git a/plans/fed-sx-milestone-2.md b/plans/fed-sx-milestone-2.md index f2cf0e3d..9541771a 100644 --- a/plans/fed-sx-milestone-2.md +++ b/plans/fed-sx-milestone-2.md @@ -344,12 +344,22 @@ actor *received*), and broadcasts to projections. fully independent (appending to one doesn't touch the other). `next/tests/inbox_bucket.sh` 14/14. Signature verification + pipeline gating live in 5b. -- [ ] **5b** — Inbound validation pipeline: - `pipeline:validate_inbound/2(Activity, PeerActorState)` runs - `stage_envelope` → `stage_signature(PeerAS)` → `stage_replay(InboxLog)`. - Sig verification uses the peer's actor-state `:public_keys`, NOT - the local kernel's. Peer-AS resolution is the caller's - responsibility for 5b (5c wires the cache lookup). +- [x] **5b** — Inbound validation pipeline. New + `pipeline:validate_inbound/3(Activity, PeerActorState, InboxLog)` + runs the federation inbound stage list — `stage_envelope` → + `stage_signature(PeerAS)` → `stage_replay(InboxLog)` — halting + on the first failure. New helper `inbound_stages/2(PeerAS, InboxLog)` + exposes the stage list for callers that want to splice extra + stages. Existing `validate_inbound/1` and the static + `inbound_stages/0` (envelope-only) stay untouched so outbox-side + callers don't have to re-key on a peer-AS they don't have. Sig + verification uses the peer's actor-state `:public_keys`, NOT the + local kernel's; peer-AS resolution is the caller's responsibility + (Step 5c wires the cache lookup). 14 cases in + `inbox_pipeline.sh`: happy path, bad shape, missing :signature + (rejected by stage_envelope before sig runs), wrong peer AS + (bad_signature), replay against inbox, distinct activities both + verify, stage short-circuit ordering verified. - [ ] **5c** — Peer-actors cache projection (`peer_actors.erl`): on first inbound from a new peer, fetches the peer's actor doc and caches the public-keys. v2: synchronous fetch via the @@ -770,6 +780,22 @@ proceed. Newest first. +- **2026-06-06** — Step 5b: federation inbound pipeline. + `pipeline:validate_inbound/3(Activity, PeerAS, InboxLog)` runs + `stage_envelope` → `stage_signature(PeerAS)` → `stage_replay(InboxLog)` + in order, halting on first failure. New `inbound_stages/2` + helper returns the 3-stage list. M1's `validate_inbound/1` + + static `inbound_stages/0` (envelope-only) preserved for outbox- + side callers. 14/14 in `inbox_pipeline.sh` covering happy path, + bad shape, missing :signature, wrong peer AS, replay against + inbox, distinct activities both verify, stage short-circuit + ordering. Sig verification routes through the peer's AS (not the + local kernel's) — Step 5c will wire the cache lookup. Conformance + 761/761. 130/130 across 10 Step-5-adjacent suites + (pipeline_envelope, pipeline_signature, pipeline_replay, + pipeline_driver, inbox_pipeline, inbox_bucket, nx_kernel_multi, + bootstrap_start, http_publish, outbox_publish, smoke_app_pure). + - **2026-06-06** — Step 5a: per-actor :actor_inbox log bucket. `nx_kernel.erl` `add_actor/4` now opens a fresh log via `log:open/2` with a distinct `inbox_base_stub()` for each new