fed-sx-m2: Step 5a — per-actor :actor_inbox log bucket + 14 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 21s

Adds the receiving-side log bucket every actor needs. add_actor/4
now opens a fresh in-memory log via log:open(ActorId, inbox_base_stub())
and stores it on the bucket as {actor_inbox, LogState} alongside
the outbox {log, _}. Two distinct base stubs ensure the in-memory
log module returns separate states even when the same ActorId is
the actor.

Pure-functional exports:
  actor_inbox_state/2(ActorId, State) -> {ok, LogState} | {error, _}
  actor_inbox_tip/2(ActorId, State) -> integer | nil
  append_to_actor_inbox/3(ActorId, Activity, State)
      -> {ok, NewTip, NewState} | {error, no_actor, State}

gen_server exports (mirror the outbox shape):
  inbox_tip_for/1(ActorId) -> integer | nil
  inbox_state_for/1(ActorId) -> {ok, LogState} | {error, _}
  append_inbox/2(ActorId, Activity) -> {ok, NewTip} | {error, _}

handle_call dispatch added for all three.

Inbox and outbox tips are completely independent — appending to one
doesn't touch the other. This is the storage primitive 5b will
build the inbound validation pipeline on top of.

log:append/2 signature noted in code + progress log: it takes
(LogState, Activity) and returns {ok, NewState, Seq} — not
{ok, NewState} as I originally guessed.

next/tests/inbox_bucket.sh 14/14:
  - fresh inbox tip = 0 (pure)
  - actor_inbox_state {ok, _} (pure)
  - append_to_actor_inbox/3 -> {ok, 1, _}
  - tip advances after append
  - unknown actor -> {error, no_actor, _}
  - outbox + inbox tips fully independent
  - two actors maintain independent inbox state
  - gen_server inbox_tip_for/1 starts at 0
  - gen_server append_inbox/2 -> {ok, 1}
  - gen_server inbox != outbox tip
  - gen_server unknown -> {error, no_actor}
  - gen_server inbox_state_for {ok, _}
  - two appends -> tip = 2

Conformance 761/761. 125/125 across 7 Step-5-adjacent suites
(inbox_bucket, nx_kernel_multi, nx_kernel_server, bootstrap_start,
http_publish, http_multi_actor, actor_lifecycle, smoke_app_pure).
This commit is contained in:
2026-06-06 15:58:17 +00:00
parent a23a2eb95a
commit bc4b23cc62
3 changed files with 247 additions and 18 deletions

View File

@@ -9,6 +9,8 @@
actor_id/1, log_state/1, log_tip/1,
key_spec/1, actor_state/1, projections/1, next_published/1,
actor_log_state/2, actor_log_tip/2,
actor_inbox_state/2, actor_inbox_tip/2,
append_to_actor_inbox/3,
actor_key_spec/2, actor_state/2, actor_projections/2,
actor_next_published/2, actor_bucket/2,
with_projections/2, with_actor_projections/3,
@@ -18,6 +20,7 @@
-export([start_link/3, publish/1, query/0, log_tip/0,
with_projections/1, stop/0,
add_actor/3, publish_to/2, log_tip_for/1, log_state_for/1,
inbox_tip_for/1, inbox_state_for/1, append_inbox/2,
actors/0, state_for/1, bucket_for/1,
with_projections_for/2,
bootstrap_actor/3]).
@@ -61,9 +64,11 @@ add_actor(ActorId, KeySpec, AS, State) ->
{error, already_present};
false ->
{ok, L0} = log:open(ActorId, base_stub()),
{ok, I0} = log:open(ActorId, inbox_base_stub()),
Bucket = [{key_spec, KeySpec},
{actor_state, AS},
{log, L0},
{actor_inbox, I0},
{projections, []},
{next_published, 1}],
Seq = field(next_actor_seq, State),
@@ -183,6 +188,34 @@ actor_log_tip(ActorId, State) ->
{error, _} -> nil
end.
actor_inbox_state(ActorId, State) ->
case actor_bucket(ActorId, State) of
{ok, B} -> {ok, field(actor_inbox, B)};
{error, _} -> {error, no_actor}
end.
actor_inbox_tip(ActorId, State) ->
case actor_inbox_state(ActorId, State) of
{ok, I} -> log:tip(I);
{error, _} -> nil
end.
%% append_to_actor_inbox/3 — pure-functional inbox append. Mirrors
%% publish/3's bucket-update shape; the activity is already signed
%% + validated by the time it lands here (Step 5's pipeline handles
%% sig verify + replay before this call).
append_to_actor_inbox(ActorId, Activity, State) ->
case actor_bucket(ActorId, State) of
{error, no_actor} ->
{error, no_actor, State};
{ok, Bucket} ->
Inbox = field(actor_inbox, Bucket),
{ok, NewInbox, _Seq} = log:append(Inbox, Activity),
B1 = set(actor_inbox, NewInbox, Bucket),
{ok, log:tip(NewInbox), set_bucket(ActorId, B1, State)}
end.
actor_key_spec(ActorId, State) ->
case actor_bucket(ActorId, State) of
{ok, B} -> {ok, field(key_spec, B)};
@@ -243,6 +276,13 @@ next_published(State) ->
base_stub() ->
<<98,97,115,101,95,115,116,117,98>>.
%% "inbox_base_stub" — distinct path stub so the in-memory log
%% module's open/2 returns a fresh log state for the per-actor
%% inbox bucket. Disk paths will namespace on this once Step 3b
%% on-disk persistence is reactivated for inbox buckets.
inbox_base_stub() ->
<<105,110,98,111,120,95,115,116,117,98>>.
bucket_field(Key, State) ->
case field(actors, State) of
[] -> nil;
@@ -324,6 +364,15 @@ log_tip_for(ActorId) ->
log_state_for(ActorId) ->
gen_server:call(nx_kernel, {log_state_for, ActorId}).
inbox_tip_for(ActorId) ->
gen_server:call(nx_kernel, {inbox_tip_for, ActorId}).
inbox_state_for(ActorId) ->
gen_server:call(nx_kernel, {inbox_state_for, ActorId}).
append_inbox(ActorId, Activity) ->
gen_server:call(nx_kernel, {append_inbox, ActorId, Activity}).
actors() ->
gen_server:call(nx_kernel, get_actors).
@@ -371,6 +420,15 @@ handle_call({log_tip_for, ActorId}, _From, State) ->
{reply, actor_log_tip(ActorId, State), State};
handle_call({log_state_for, ActorId}, _From, State) ->
{reply, actor_log_state(ActorId, State), State};
handle_call({inbox_tip_for, ActorId}, _From, State) ->
{reply, actor_inbox_tip(ActorId, State), State};
handle_call({inbox_state_for, ActorId}, _From, State) ->
{reply, actor_inbox_state(ActorId, State), State};
handle_call({append_inbox, ActorId, Activity}, _From, State) ->
case append_to_actor_inbox(ActorId, Activity, State) of
{ok, Tip, NewState} -> {reply, {ok, Tip}, NewState};
{error, Reason, Same} -> {reply, {error, Reason}, Same}
end;
handle_call(get_actors, _From, State) ->
{reply, actors(State), State};
handle_call({state_for, ActorId}, _From, State) ->

147
next/tests/inbox_bucket.sh Executable file
View File

@@ -0,0 +1,147 @@
#!/usr/bin/env bash
# next/tests/inbox_bucket.sh — m2 Step 5a test.
#
# Per-actor :actor_inbox log bucket added to nx_kernel state. The
# inbox is a separate log from the outbox (:log) so peer-delivered
# activities don't interfere with the actor's own publish stream.
# Step 5b layers the signature-verify pipeline on top, Step 5c
# wires the http handler.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
PRELUDE='K = <<1,2,3,4>>, KS = [{key_id,k1},{algorithm,ed25519},{value,K}], AS = [{public_keys,[[{id,k1},{created,0},{value,K}]]}], Act = [{type,note},{object,[{content,hi}]},{id,<<100,1>>},{actor,bob}],'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(er-load-gen-server!)")
(epoch 3)
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
(epoch 4)
(eval "(get (erlang-load-module (file-read \"next/kernel/log.erl\")) :name)")
(epoch 5)
(eval "(get (erlang-load-module (file-read \"next/kernel/pipeline.erl\")) :name)")
(epoch 6)
(eval "(get (erlang-load-module (file-read \"next/kernel/outbox.erl\")) :name)")
(epoch 7)
(eval "(get (erlang-load-module (file-read \"next/kernel/nx_kernel.erl\")) :name)")
;; Fresh actor has inbox tip 0 (pure state)
(epoch 10)
(eval "(erlang-eval-ast \"${PRELUDE} {ok, S} = nx_kernel:add_actor(alice, KS, AS, nx_kernel:new()), nx_kernel:actor_inbox_tip(alice, S)\")")
;; actor_inbox_state returns the log state
(epoch 11)
(eval "(get (erlang-eval-ast \"${PRELUDE} {ok, S} = nx_kernel:add_actor(alice, KS, AS, nx_kernel:new()), case nx_kernel:actor_inbox_state(alice, S) of {ok, _} -> ok; _ -> bad end\") :name)")
;; append_to_actor_inbox/3 returns {ok, Tip, NewState}
(epoch 12)
(eval "(get (erlang-eval-ast \"${PRELUDE} {ok, S} = nx_kernel:add_actor(alice, KS, AS, nx_kernel:new()), case nx_kernel:append_to_actor_inbox(alice, Act, S) of {ok, 1, _} -> ok; _ -> bad end\") :name)")
;; After append, actor_inbox_tip advances
(epoch 13)
(eval "(erlang-eval-ast \"${PRELUDE} {ok, S0} = nx_kernel:add_actor(alice, KS, AS, nx_kernel:new()), {ok, _, S1} = nx_kernel:append_to_actor_inbox(alice, Act, S0), nx_kernel:actor_inbox_tip(alice, S1)\")")
;; append to unknown actor -> {error, no_actor, State}
(epoch 14)
(eval "(get (erlang-eval-ast \"${PRELUDE} case nx_kernel:append_to_actor_inbox(ghost, Act, nx_kernel:new()) of {error, no_actor, _} -> ok; _ -> bad end\") :name)")
;; Outbox tip is independent of inbox tip
(epoch 15)
(eval "(get (erlang-eval-ast \"${PRELUDE} {ok, S0} = nx_kernel:add_actor(alice, KS, AS, nx_kernel:new()), {ok, _, S1} = nx_kernel:append_to_actor_inbox(alice, Act, S0), {nx_kernel:actor_log_tip(alice, S1), nx_kernel:actor_inbox_tip(alice, S1)} =:= {0, 1}\") :name)")
;; Two actors maintain independent inbox state
(epoch 16)
(eval "(get (erlang-eval-ast \"${PRELUDE} {ok, S0} = nx_kernel:add_actor(alice, KS, AS, nx_kernel:new()), {ok, S1} = nx_kernel:add_actor(bob, KS, AS, S0), {ok, _, S2} = nx_kernel:append_to_actor_inbox(alice, Act, S1), {nx_kernel:actor_inbox_tip(alice, S2), nx_kernel:actor_inbox_tip(bob, S2)} =:= {1, 0}\") :name)")
;; gen_server inbox_tip_for/1 starts at 0
(epoch 17)
(eval "(erlang-eval-ast \"${PRELUDE} nx_kernel:start_link(alice, KS, AS), nx_kernel:inbox_tip_for(alice)\")")
;; gen_server append_inbox/2 advances tip
(epoch 18)
(eval "(get (erlang-eval-ast \"${PRELUDE} nx_kernel:start_link(alice, KS, AS), case nx_kernel:append_inbox(alice, Act) of {ok, 1} -> ok; _ -> bad end\") :name)")
;; gen_server inbox is independent of outbox
(epoch 19)
(eval "(get (erlang-eval-ast \"${PRELUDE} nx_kernel:start_link(alice, KS, AS), nx_kernel:append_inbox(alice, Act), {nx_kernel:log_tip_for(alice), nx_kernel:inbox_tip_for(alice)} =:= {0, 1}\") :name)")
;; gen_server append_inbox to unknown actor -> {error, no_actor}
(epoch 20)
(eval "(get (erlang-eval-ast \"${PRELUDE} nx_kernel:start_link(alice, KS, AS), case nx_kernel:append_inbox(ghost, Act) of {error, no_actor} -> ok; _ -> bad end\") :name)")
;; gen_server inbox_state_for returns the log state
(epoch 21)
(eval "(get (erlang-eval-ast \"${PRELUDE} nx_kernel:start_link(alice, KS, AS), case nx_kernel:inbox_state_for(alice) of {ok, _} -> ok; _ -> bad end\") :name)")
;; gen_server: append two activities, tip = 2; outbox tip unchanged
(epoch 22)
(eval "(get (erlang-eval-ast \"${PRELUDE} Act2 = [{type,note},{object,[{content,hi2}]},{id,<<100,2>>},{actor,bob}], nx_kernel:start_link(alice, KS, AS), nx_kernel:append_inbox(alice, Act), nx_kernel:append_inbox(alice, Act2), {nx_kernel:inbox_tip_for(alice), nx_kernel:log_tip_for(alice)} =:= {2, 0}\") :name)")
EPOCHS
OUTPUT=$(timeout 240 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 7 "nx_kernel module loaded" "nx_kernel"
check 10 "fresh actor inbox tip = 0" "0"
check 11 "actor_inbox_state {ok, _}" "ok"
check 12 "append_to_actor_inbox/3 returns" "ok"
check 13 "append advances tip to 1" "1"
check 14 "append unknown -> no_actor" "ok"
check 15 "outbox tip independent of inbox" "true"
check 16 "two actors independent inboxes" "true"
check 17 "gen_server inbox_tip = 0" "0"
check 18 "gen_server append_inbox/2 -> ok" "ok"
check 19 "gen_server inbox != outbox" "true"
check 20 "gen_server append unknown -> err" "ok"
check 21 "gen_server inbox_state_for ok" "ok"
check 22 "two appends tip = 2" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/inbox_bucket.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

View File

@@ -334,24 +334,33 @@ actor *received*), and broadcasts to projections.
**Deliverables:**
- New per-actor log: `actor_inbox`. Same shape as outbox; activities
marked `:received_from => PeerActorId`.
- Inbound pipeline: `stage_envelope` → `stage_signature` (against
peer's actor-state, not local) → `stage_replay`.
- Peer signature verification needs `:public_keys` from the peer's
actor-state. v2 fetches the peer's actor doc lazily on first
contact, caches it in a `peer-actors` projection. Stale-key
invalidation deferred to v3.
- HTTP handler: `POST /actors/<id>/inbox` returns 202 on accept,
401 on bad sig, 422 on replay or validation failure.
**Tests:**
- POST /inbox with valid signed activity → 202, activity in inbox log.
- POST /inbox with tampered envelope → 401.
- POST /inbox with unknown actor target → 404.
- POST /inbox with replay → 422.
- Activity broadcast to receiving actor's projections.
- [x] **5a** — Per-actor `:actor_inbox` log bucket in nx_kernel.
`add_actor/4` now opens a fresh inbox log (distinct base stub) for
each new actor; the bucket carries `[..., {actor_inbox, LogState}, ...]`
alongside the existing `:log` outbox field. Pure-functional
exports: `actor_inbox_state/2`, `actor_inbox_tip/2`,
`append_to_actor_inbox/3`. gen_server exports: `inbox_tip_for/1`,
`inbox_state_for/1`, `append_inbox/2`. Inbox and outbox tips are
fully independent (appending to one doesn't touch the other).
`next/tests/inbox_bucket.sh` 14/14. Signature verification +
pipeline gating live in 5b.
- [ ] **5b** — Inbound validation pipeline:
`pipeline:validate_inbound/2(Activity, PeerActorState)` runs
`stage_envelope` → `stage_signature(PeerAS)` → `stage_replay(InboxLog)`.
Sig verification uses the peer's actor-state `:public_keys`, NOT
the local kernel's. Peer-AS resolution is the caller's
responsibility for 5b (5c wires the cache lookup).
- [ ] **5c** — Peer-actors cache projection (`peer_actors.erl`):
on first inbound from a new peer, fetches the peer's actor doc
and caches the public-keys. v2: synchronous fetch via the
http-client native primitive. Per design §13.6, stale-key
invalidation is v3.
- [ ] **5d** — http_server inbox handler wires the chain:
`POST /actors/<id>/inbox` body is the signed activity wire bytes;
parse → resolve peer-AS → `validate_inbound` → `append_inbox` →
202 on accept, 401 on bad sig, 422 on replay/shape failure,
404 on unknown target actor. Activity broadcast to receiving
actor's projections (via `projection:async_fold`).
**Acceptance:** `bash next/tests/inbox.sh` passes 16+ cases.
@@ -761,6 +770,21 @@ proceed.
Newest first.
- **2026-06-06** — Step 5a: per-actor :actor_inbox log bucket.
`nx_kernel.erl` `add_actor/4` now opens a fresh log via
`log:open/2` with a distinct `inbox_base_stub()` for each new
bucket and stores it as `{actor_inbox, LogState}` alongside the
existing outbox `:log`. Pure exports `actor_inbox_state/2`,
`actor_inbox_tip/2`, `append_to_actor_inbox/3` + gen_server
exports `inbox_tip_for/1`, `inbox_state_for/1`, `append_inbox/2`.
`log:append/2` is `(LogState, Activity) -> {ok, NewState, Seq}` —
noted for future iterations. Inbox / outbox tips are fully
independent. `next/tests/inbox_bucket.sh` 14/14. Conformance
761/761. 125/125 across 7 Step-5-adjacent suites
(inbox_bucket, nx_kernel_multi, nx_kernel_server,
bootstrap_start, http_publish, http_multi_actor, actor_lifecycle,
smoke_app_pure).
- **2026-06-06** — Step 4d: per-actor outbox listing + pagination.
New `nx_kernel:log_state_for/1` gen_server export returns
`{ok, LogState}` for an actor. `actor_outbox_response_for/3`