diff --git a/next/kernel/nx_kernel.erl b/next/kernel/nx_kernel.erl index da98c6c4..4b6288ab 100644 --- a/next/kernel/nx_kernel.erl +++ b/next/kernel/nx_kernel.erl @@ -9,6 +9,8 @@ actor_id/1, log_state/1, log_tip/1, key_spec/1, actor_state/1, projections/1, next_published/1, actor_log_state/2, actor_log_tip/2, + actor_inbox_state/2, actor_inbox_tip/2, + append_to_actor_inbox/3, actor_key_spec/2, actor_state/2, actor_projections/2, actor_next_published/2, actor_bucket/2, with_projections/2, with_actor_projections/3, @@ -18,6 +20,7 @@ -export([start_link/3, publish/1, query/0, log_tip/0, with_projections/1, stop/0, add_actor/3, publish_to/2, log_tip_for/1, log_state_for/1, + inbox_tip_for/1, inbox_state_for/1, append_inbox/2, actors/0, state_for/1, bucket_for/1, with_projections_for/2, bootstrap_actor/3]). @@ -61,9 +64,11 @@ add_actor(ActorId, KeySpec, AS, State) -> {error, already_present}; false -> {ok, L0} = log:open(ActorId, base_stub()), + {ok, I0} = log:open(ActorId, inbox_base_stub()), Bucket = [{key_spec, KeySpec}, {actor_state, AS}, {log, L0}, + {actor_inbox, I0}, {projections, []}, {next_published, 1}], Seq = field(next_actor_seq, State), @@ -183,6 +188,34 @@ actor_log_tip(ActorId, State) -> {error, _} -> nil end. +actor_inbox_state(ActorId, State) -> + case actor_bucket(ActorId, State) of + {ok, B} -> {ok, field(actor_inbox, B)}; + {error, _} -> {error, no_actor} + end. + +actor_inbox_tip(ActorId, State) -> + case actor_inbox_state(ActorId, State) of + {ok, I} -> log:tip(I); + {error, _} -> nil + end. + +%% append_to_actor_inbox/3 — pure-functional inbox append. Mirrors +%% publish/3's bucket-update shape; the activity is already signed +%% + validated by the time it lands here (Step 5's pipeline handles +%% sig verify + replay before this call). + +append_to_actor_inbox(ActorId, Activity, State) -> + case actor_bucket(ActorId, State) of + {error, no_actor} -> + {error, no_actor, State}; + {ok, Bucket} -> + Inbox = field(actor_inbox, Bucket), + {ok, NewInbox, _Seq} = log:append(Inbox, Activity), + B1 = set(actor_inbox, NewInbox, Bucket), + {ok, log:tip(NewInbox), set_bucket(ActorId, B1, State)} + end. + actor_key_spec(ActorId, State) -> case actor_bucket(ActorId, State) of {ok, B} -> {ok, field(key_spec, B)}; @@ -243,6 +276,13 @@ next_published(State) -> base_stub() -> <<98,97,115,101,95,115,116,117,98>>. +%% "inbox_base_stub" — distinct path stub so the in-memory log +%% module's open/2 returns a fresh log state for the per-actor +%% inbox bucket. Disk paths will namespace on this once Step 3b +%% on-disk persistence is reactivated for inbox buckets. +inbox_base_stub() -> + <<105,110,98,111,120,95,115,116,117,98>>. + bucket_field(Key, State) -> case field(actors, State) of [] -> nil; @@ -324,6 +364,15 @@ log_tip_for(ActorId) -> log_state_for(ActorId) -> gen_server:call(nx_kernel, {log_state_for, ActorId}). +inbox_tip_for(ActorId) -> + gen_server:call(nx_kernel, {inbox_tip_for, ActorId}). + +inbox_state_for(ActorId) -> + gen_server:call(nx_kernel, {inbox_state_for, ActorId}). + +append_inbox(ActorId, Activity) -> + gen_server:call(nx_kernel, {append_inbox, ActorId, Activity}). + actors() -> gen_server:call(nx_kernel, get_actors). @@ -371,6 +420,15 @@ handle_call({log_tip_for, ActorId}, _From, State) -> {reply, actor_log_tip(ActorId, State), State}; handle_call({log_state_for, ActorId}, _From, State) -> {reply, actor_log_state(ActorId, State), State}; +handle_call({inbox_tip_for, ActorId}, _From, State) -> + {reply, actor_inbox_tip(ActorId, State), State}; +handle_call({inbox_state_for, ActorId}, _From, State) -> + {reply, actor_inbox_state(ActorId, State), State}; +handle_call({append_inbox, ActorId, Activity}, _From, State) -> + case append_to_actor_inbox(ActorId, Activity, State) of + {ok, Tip, NewState} -> {reply, {ok, Tip}, NewState}; + {error, Reason, Same} -> {reply, {error, Reason}, Same} + end; handle_call(get_actors, _From, State) -> {reply, actors(State), State}; handle_call({state_for, ActorId}, _From, State) -> diff --git a/next/tests/inbox_bucket.sh b/next/tests/inbox_bucket.sh new file mode 100755 index 00000000..60a33b54 --- /dev/null +++ b/next/tests/inbox_bucket.sh @@ -0,0 +1,147 @@ +#!/usr/bin/env bash +# next/tests/inbox_bucket.sh — m2 Step 5a test. +# +# Per-actor :actor_inbox log bucket added to nx_kernel state. The +# inbox is a separate log from the outbox (:log) so peer-delivered +# activities don't interfere with the actor's own publish stream. +# Step 5b layers the signature-verify pipeline on top, Step 5c +# wires the http handler. + +set -uo pipefail +cd "$(git rev-parse --show-toplevel)" + +SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}" +if [ ! -x "$SX_SERVER" ]; then + SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe" +fi +if [ ! -x "$SX_SERVER" ]; then + echo "ERROR: sx_server.exe not found." >&2 + exit 1 +fi + +VERBOSE="${1:-}" +PASS=0; FAIL=0; ERRORS="" +TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT + +PRELUDE='K = <<1,2,3,4>>, KS = [{key_id,k1},{algorithm,ed25519},{value,K}], AS = [{public_keys,[[{id,k1},{created,0},{value,K}]]}], Act = [{type,note},{object,[{content,hi}]},{id,<<100,1>>},{actor,bob}],' + +cat > "$TMPFILE" < ok; _ -> bad end\") :name)") + +;; append_to_actor_inbox/3 returns {ok, Tip, NewState} +(epoch 12) +(eval "(get (erlang-eval-ast \"${PRELUDE} {ok, S} = nx_kernel:add_actor(alice, KS, AS, nx_kernel:new()), case nx_kernel:append_to_actor_inbox(alice, Act, S) of {ok, 1, _} -> ok; _ -> bad end\") :name)") + +;; After append, actor_inbox_tip advances +(epoch 13) +(eval "(erlang-eval-ast \"${PRELUDE} {ok, S0} = nx_kernel:add_actor(alice, KS, AS, nx_kernel:new()), {ok, _, S1} = nx_kernel:append_to_actor_inbox(alice, Act, S0), nx_kernel:actor_inbox_tip(alice, S1)\")") + +;; append to unknown actor -> {error, no_actor, State} +(epoch 14) +(eval "(get (erlang-eval-ast \"${PRELUDE} case nx_kernel:append_to_actor_inbox(ghost, Act, nx_kernel:new()) of {error, no_actor, _} -> ok; _ -> bad end\") :name)") + +;; Outbox tip is independent of inbox tip +(epoch 15) +(eval "(get (erlang-eval-ast \"${PRELUDE} {ok, S0} = nx_kernel:add_actor(alice, KS, AS, nx_kernel:new()), {ok, _, S1} = nx_kernel:append_to_actor_inbox(alice, Act, S0), {nx_kernel:actor_log_tip(alice, S1), nx_kernel:actor_inbox_tip(alice, S1)} =:= {0, 1}\") :name)") + +;; Two actors maintain independent inbox state +(epoch 16) +(eval "(get (erlang-eval-ast \"${PRELUDE} {ok, S0} = nx_kernel:add_actor(alice, KS, AS, nx_kernel:new()), {ok, S1} = nx_kernel:add_actor(bob, KS, AS, S0), {ok, _, S2} = nx_kernel:append_to_actor_inbox(alice, Act, S1), {nx_kernel:actor_inbox_tip(alice, S2), nx_kernel:actor_inbox_tip(bob, S2)} =:= {1, 0}\") :name)") + +;; gen_server inbox_tip_for/1 starts at 0 +(epoch 17) +(eval "(erlang-eval-ast \"${PRELUDE} nx_kernel:start_link(alice, KS, AS), nx_kernel:inbox_tip_for(alice)\")") + +;; gen_server append_inbox/2 advances tip +(epoch 18) +(eval "(get (erlang-eval-ast \"${PRELUDE} nx_kernel:start_link(alice, KS, AS), case nx_kernel:append_inbox(alice, Act) of {ok, 1} -> ok; _ -> bad end\") :name)") + +;; gen_server inbox is independent of outbox +(epoch 19) +(eval "(get (erlang-eval-ast \"${PRELUDE} nx_kernel:start_link(alice, KS, AS), nx_kernel:append_inbox(alice, Act), {nx_kernel:log_tip_for(alice), nx_kernel:inbox_tip_for(alice)} =:= {0, 1}\") :name)") + +;; gen_server append_inbox to unknown actor -> {error, no_actor} +(epoch 20) +(eval "(get (erlang-eval-ast \"${PRELUDE} nx_kernel:start_link(alice, KS, AS), case nx_kernel:append_inbox(ghost, Act) of {error, no_actor} -> ok; _ -> bad end\") :name)") + +;; gen_server inbox_state_for returns the log state +(epoch 21) +(eval "(get (erlang-eval-ast \"${PRELUDE} nx_kernel:start_link(alice, KS, AS), case nx_kernel:inbox_state_for(alice) of {ok, _} -> ok; _ -> bad end\") :name)") + +;; gen_server: append two activities, tip = 2; outbox tip unchanged +(epoch 22) +(eval "(get (erlang-eval-ast \"${PRELUDE} Act2 = [{type,note},{object,[{content,hi2}]},{id,<<100,2>>},{actor,bob}], nx_kernel:start_link(alice, KS, AS), nx_kernel:append_inbox(alice, Act), nx_kernel:append_inbox(alice, Act2), {nx_kernel:inbox_tip_for(alice), nx_kernel:log_tip_for(alice)} =:= {2, 0}\") :name)") +EPOCHS + +OUTPUT=$(timeout 240 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) + +check() { + local epoch="$1" desc="$2" expected="$3" + local actual + actual=$(echo "$OUTPUT" | awk -v e="$epoch" ' + $0 ~ "^\\(ok-len " e " " { getline; print; exit } + $0 ~ "^\\(ok " e " " { print; exit } + $0 ~ "^\\(error " e " " { print; exit } + ') + [ -z "$actual" ] && actual="" + if echo "$actual" | grep -qF -- "$expected"; then + PASS=$((PASS+1)) + [ "$VERBOSE" = "-v" ] && echo " ok $desc" + else + FAIL=$((FAIL+1)) + ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual +" + fi +} + +check 7 "nx_kernel module loaded" "nx_kernel" +check 10 "fresh actor inbox tip = 0" "0" +check 11 "actor_inbox_state {ok, _}" "ok" +check 12 "append_to_actor_inbox/3 returns" "ok" +check 13 "append advances tip to 1" "1" +check 14 "append unknown -> no_actor" "ok" +check 15 "outbox tip independent of inbox" "true" +check 16 "two actors independent inboxes" "true" +check 17 "gen_server inbox_tip = 0" "0" +check 18 "gen_server append_inbox/2 -> ok" "ok" +check 19 "gen_server inbox != outbox" "true" +check 20 "gen_server append unknown -> err" "ok" +check 21 "gen_server inbox_state_for ok" "ok" +check 22 "two appends tip = 2" "true" + +TOTAL=$((PASS+FAIL)) +if [ $FAIL -eq 0 ]; then + echo "ok $PASS/$TOTAL next/tests/inbox_bucket.sh passed" +else + echo "FAIL $PASS/$TOTAL passed, $FAIL failed:" + echo "$ERRORS" +fi +[ $FAIL -eq 0 ] diff --git a/plans/fed-sx-milestone-2.md b/plans/fed-sx-milestone-2.md index 22caeccf..f2cf0e3d 100644 --- a/plans/fed-sx-milestone-2.md +++ b/plans/fed-sx-milestone-2.md @@ -334,24 +334,33 @@ actor *received*), and broadcasts to projections. **Deliverables:** -- New per-actor log: `actor_inbox`. Same shape as outbox; activities - marked `:received_from => PeerActorId`. -- Inbound pipeline: `stage_envelope` → `stage_signature` (against - peer's actor-state, not local) → `stage_replay`. -- Peer signature verification needs `:public_keys` from the peer's - actor-state. v2 fetches the peer's actor doc lazily on first - contact, caches it in a `peer-actors` projection. Stale-key - invalidation deferred to v3. -- HTTP handler: `POST /actors//inbox` returns 202 on accept, - 401 on bad sig, 422 on replay or validation failure. - -**Tests:** - -- POST /inbox with valid signed activity → 202, activity in inbox log. -- POST /inbox with tampered envelope → 401. -- POST /inbox with unknown actor target → 404. -- POST /inbox with replay → 422. -- Activity broadcast to receiving actor's projections. +- [x] **5a** — Per-actor `:actor_inbox` log bucket in nx_kernel. + `add_actor/4` now opens a fresh inbox log (distinct base stub) for + each new actor; the bucket carries `[..., {actor_inbox, LogState}, ...]` + alongside the existing `:log` outbox field. Pure-functional + exports: `actor_inbox_state/2`, `actor_inbox_tip/2`, + `append_to_actor_inbox/3`. gen_server exports: `inbox_tip_for/1`, + `inbox_state_for/1`, `append_inbox/2`. Inbox and outbox tips are + fully independent (appending to one doesn't touch the other). + `next/tests/inbox_bucket.sh` 14/14. Signature verification + + pipeline gating live in 5b. +- [ ] **5b** — Inbound validation pipeline: + `pipeline:validate_inbound/2(Activity, PeerActorState)` runs + `stage_envelope` → `stage_signature(PeerAS)` → `stage_replay(InboxLog)`. + Sig verification uses the peer's actor-state `:public_keys`, NOT + the local kernel's. Peer-AS resolution is the caller's + responsibility for 5b (5c wires the cache lookup). +- [ ] **5c** — Peer-actors cache projection (`peer_actors.erl`): + on first inbound from a new peer, fetches the peer's actor doc + and caches the public-keys. v2: synchronous fetch via the + http-client native primitive. Per design §13.6, stale-key + invalidation is v3. +- [ ] **5d** — http_server inbox handler wires the chain: + `POST /actors//inbox` body is the signed activity wire bytes; + parse → resolve peer-AS → `validate_inbound` → `append_inbox` → + 202 on accept, 401 on bad sig, 422 on replay/shape failure, + 404 on unknown target actor. Activity broadcast to receiving + actor's projections (via `projection:async_fold`). **Acceptance:** `bash next/tests/inbox.sh` passes 16+ cases. @@ -761,6 +770,21 @@ proceed. Newest first. +- **2026-06-06** — Step 5a: per-actor :actor_inbox log bucket. + `nx_kernel.erl` `add_actor/4` now opens a fresh log via + `log:open/2` with a distinct `inbox_base_stub()` for each new + bucket and stores it as `{actor_inbox, LogState}` alongside the + existing outbox `:log`. Pure exports `actor_inbox_state/2`, + `actor_inbox_tip/2`, `append_to_actor_inbox/3` + gen_server + exports `inbox_tip_for/1`, `inbox_state_for/1`, `append_inbox/2`. + `log:append/2` is `(LogState, Activity) -> {ok, NewState, Seq}` — + noted for future iterations. Inbox / outbox tips are fully + independent. `next/tests/inbox_bucket.sh` 14/14. Conformance + 761/761. 125/125 across 7 Step-5-adjacent suites + (inbox_bucket, nx_kernel_multi, nx_kernel_server, + bootstrap_start, http_publish, http_multi_actor, actor_lifecycle, + smoke_app_pure). + - **2026-06-06** — Step 4d: per-actor outbox listing + pagination. New `nx_kernel:log_state_for/1` gen_server export returns `{ok, LogState}` for an actor. `actor_outbox_response_for/3`