fed-sx-m2: Step 5c — peer-actors cache + 19 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 34s

New next/kernel/peer_actors.erl is the federation-side cache for
{PeerActorId, PeerActorState} entries. PeerAS is exactly the shape
envelope:verify_signature/2 reads (proplist with :public_keys), so
the inbox handler can pipe the cache hit straight into
pipeline:validate_inbound/3 from Step 5b.

Pure-functional API:
  new/0
  lookup/2(PeerId, State) -> {ok, PeerAS} | not_found
  store/3(PeerId, PeerAS, State) -> NewState
  evict/2(PeerId, State) -> NewState
  peers/1(State) -> [PeerId]
  lookup_or_fetch/3(PeerId, FetchFn, State)
      -> {ok, PeerAS, NewState}      cache hit returns unchanged State,
                                     miss stores FetchFn result.
      | {error, Reason, State}        FetchFn failure preserves cache.
      | {error, {bad_fetch_return, X}, State}

FetchFn contract: (PeerId) -> {ok, PeerAS} | {error, Reason}.
Failed fetches do NOT poison the cache so callers can retry on
transient HTTP failures.

gen_server wrapper (registered name peer_actors):
  start_link/0,1   start_link/1 accepts initial proplist for fixtures
  stop/0
  lookup_srv/1
  store_srv/2
  lookup_or_fetch_srv/2
  peers_srv/0
  evict_srv/1

handle_call dispatches mirror the pure-fn paths exactly.

The actual HTTP-GET fetch implementation (peer's actor doc -> peer
AS proplist) is Step 5d's responsibility — for 5c, FetchFn is just
the contract callers fill in.

19/19 in next/tests/peer_actors.sh:
  - new/0 -> []
  - lookup miss -> not_found
  - store + lookup round-trip
  - peers/1 in insertion order
  - evict + evict-unknown no-op
  - lookup_or_fetch miss invokes FetchFn, hits cache after
  - lookup_or_fetch hit skips FetchFn (verified by tombstone fn)
  - fetch error preserves cache state
  - bad fetch return shape captured
  - gen_server start_link + miss/hit/fetch/evict round-trips
  - start_link/1 pre-populates cache from initial state

Conformance 761/761. 139/139 across 9 Step-5-adjacent suites
(inbox_pipeline, inbox_bucket, pipeline_signature, registry_server,
projection_server, nx_kernel_multi, bootstrap_start, http_publish,
smoke_app_pure, plus the new peer_actors).
This commit is contained in:
2026-06-06 16:36:19 +00:00
parent d103ecb863
commit d481af5791
3 changed files with 334 additions and 5 deletions

140
next/kernel/peer_actors.erl Normal file
View File

@@ -0,0 +1,140 @@
-module(peer_actors).
-export([new/0, lookup/2, store/3, evict/2, peers/1,
lookup_or_fetch/3,
start_link/0, start_link/1, stop/0,
lookup_srv/1, store_srv/2, lookup_or_fetch_srv/2,
peers_srv/0, evict_srv/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-behaviour(gen_server).
%% Peer-actors cache. On first inbound from a new peer, the
%% federation layer needs the peer's `:public_keys` (and eventually
%% other actor-doc fields) to verify the inbound signature. Fetching
%% the peer's actor doc on every inbound would be wasteful, so we
%% cache the peer-AS keyed by ActorId atom. Per design §13.6 stale-
%% key invalidation defers to v3 — for v2 entries are TTL-free.
%%
%% State shape (pure-functional):
%% [{PeerActorId, PeerActorState}, ...]
%%
%% PeerActorState is the same shape that envelope:verify_signature/2
%% reads — a proplist with :public_keys (a list of key proplists).
%%
%% lookup_or_fetch/3 is the load-bearing entry point: a miss invokes
%% the caller-supplied FetchFn (1-arity, takes PeerActorId, returns
%% {ok, PeerAS} | {error, Reason}). The cache stores successful
%% fetches; errors do NOT poison the cache so the caller can retry.
%%
%% gen_server wrapper exposes the same API for the http inbox
%% handler. Tests inline start_link with operations (same port quirks
%% as registry / projection / nx_kernel).
%% ── Pure-functional API ─────────────────────────────────────────
new() -> [].
lookup(PeerId, State) ->
case find_keyed(PeerId, State) of
{ok, PeerAS} -> {ok, PeerAS};
{error, _} -> not_found
end.
store(PeerId, PeerAS, State) ->
set_keyed(PeerId, PeerAS, State).
evict(PeerId, State) ->
delete_keyed(PeerId, State).
peers(State) -> [Id || {Id, _AS} <- State].
%% lookup_or_fetch/3 — cache hit returns {ok, PeerAS, State}
%% unchanged. Cache miss calls FetchFn; success path stores and
%% returns {ok, PeerAS, NewState}; failure returns {error, Reason,
%% State} so the caller knows the cache state and can retry on
%% transient errors.
lookup_or_fetch(PeerId, FetchFn, State) ->
case find_keyed(PeerId, State) of
{ok, PeerAS} -> {ok, PeerAS, State};
{error, _} ->
case FetchFn(PeerId) of
{ok, PeerAS} -> {ok, PeerAS, store(PeerId, PeerAS, State)};
{error, Reason} -> {error, Reason, State};
Other -> {error, {bad_fetch_return, Other}, State}
end
end.
%% ── gen_server wrapper ──────────────────────────────────────────
%%
%% Mirrors registry / projection / nx_kernel patterns. Registered
%% name `peer_actors` so callers (http_server inbox handler) can
%% find it without threading the Pid through Cfg.
start_link() ->
start_link([]).
start_link(InitialState) ->
Pid = gen_server:start_link(peer_actors, [InitialState]),
erlang:register(peer_actors, Pid),
Pid.
stop() ->
R = gen_server:call(peer_actors, '$gen_stop'),
erlang:unregister(peer_actors),
R.
lookup_srv(PeerId) ->
gen_server:call(peer_actors, {lookup, PeerId}).
store_srv(PeerId, PeerAS) ->
gen_server:call(peer_actors, {store, PeerId, PeerAS}).
%% lookup_or_fetch_srv/2 — same shape as the pure form. FetchFn must
%% be a 1-arity fun. Reply is {ok, PeerAS} on hit-or-fetched,
%% {error, Reason} on fetch failure.
lookup_or_fetch_srv(PeerId, FetchFn) ->
gen_server:call(peer_actors, {lookup_or_fetch, PeerId, FetchFn}).
peers_srv() ->
gen_server:call(peer_actors, get_peers).
evict_srv(PeerId) ->
gen_server:call(peer_actors, {evict, PeerId}).
%% gen_server callbacks
init([InitialState]) ->
{ok, InitialState}.
handle_call({lookup, PeerId}, _From, State) ->
{reply, lookup(PeerId, State), State};
handle_call({store, PeerId, PeerAS}, _From, State) ->
{reply, ok, store(PeerId, PeerAS, State)};
handle_call({lookup_or_fetch, PeerId, FetchFn}, _From, State) ->
case lookup_or_fetch(PeerId, FetchFn, State) of
{ok, PeerAS, NewState} -> {reply, {ok, PeerAS}, NewState};
{error, Reason, SameState} -> {reply, {error, Reason}, SameState}
end;
handle_call(get_peers, _From, State) ->
{reply, peers(State), State};
handle_call({evict, PeerId}, _From, State) ->
{reply, ok, evict(PeerId, State)}.
handle_cast(_, S) -> {noreply, S}.
handle_info(_, S) -> {noreply, S}.
%% ── Internal helpers ────────────────────────────────────────────
find_keyed(_, []) -> {error, not_found};
find_keyed(K, [{K, V} | _]) -> {ok, V};
find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest).
set_keyed(K, V, []) -> [{K, V}];
set_keyed(K, V, [{K, _} | Rest]) -> [{K, V} | Rest];
set_keyed(K, V, [P | Rest]) -> [P | set_keyed(K, V, Rest)].
delete_keyed(_, []) -> [];
delete_keyed(K, [{K, _} | Rest]) -> Rest;
delete_keyed(K, [P | Rest]) -> [P | delete_keyed(K, Rest)].

165
next/tests/peer_actors.sh Executable file
View File

@@ -0,0 +1,165 @@
#!/usr/bin/env bash
# next/tests/peer_actors.sh — m2 Step 5c test.
#
# Peer-actors cache for the federation inbox handler. Tracks
# {PeerActorId, PeerActorState} pairs so signature verification
# can be done against a peer's :public_keys without re-fetching
# their actor doc on every inbound. lookup_or_fetch/3 is the
# load-bearing entry point: cache hit returns cached AS, miss
# invokes the caller-supplied FetchFn and stores its result.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
SETUP='K1 = <<1,2,3,4>>, BobAS = [{public_keys,[[{id,k1},{created,0},{value,K1}]]}], K2 = <<5,6,7,8>>, CarolAS = [{public_keys,[[{id,k1},{created,0},{value,K2}]]}], OkFetch = fun(bob) -> {ok, BobAS}; (carol) -> {ok, CarolAS}; (_) -> {error, not_found} end,'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(er-load-gen-server!)")
(epoch 3)
(eval "(get (erlang-load-module (file-read \"next/kernel/peer_actors.erl\")) :name)")
;; new/0 returns []
(epoch 10)
(eval "(get (erlang-eval-ast \"peer_actors:new() =:= []\") :name)")
;; lookup on empty cache returns not_found
(epoch 11)
(eval "(get (erlang-eval-ast \"peer_actors:lookup(bob, peer_actors:new()) =:= not_found\") :name)")
;; store + lookup round-trip
(epoch 12)
(eval "(get (erlang-eval-ast \"${SETUP} S = peer_actors:store(bob, BobAS, peer_actors:new()), peer_actors:lookup(bob, S) =:= {ok, BobAS}\") :name)")
;; peers/1 lists cached peer IDs in insertion order
(epoch 13)
(eval "(get (erlang-eval-ast \"${SETUP} S0 = peer_actors:new(), S1 = peer_actors:store(bob, BobAS, S0), S2 = peer_actors:store(carol, CarolAS, S1), peer_actors:peers(S2) =:= [bob, carol]\") :name)")
;; evict removes the entry
(epoch 14)
(eval "(get (erlang-eval-ast \"${SETUP} S0 = peer_actors:store(bob, BobAS, peer_actors:new()), S1 = peer_actors:evict(bob, S0), peer_actors:lookup(bob, S1) =:= not_found\") :name)")
;; evict unknown peer is a no-op
(epoch 15)
(eval "(get (erlang-eval-ast \"peer_actors:evict(ghost, peer_actors:new()) =:= []\") :name)")
;; lookup_or_fetch miss invokes FetchFn and stores the result
(epoch 16)
(eval "(get (erlang-eval-ast \"${SETUP} case peer_actors:lookup_or_fetch(bob, OkFetch, peer_actors:new()) of {ok, BobAS, [{bob, BobAS}]} -> ok; _ -> bad end\") :name)")
;; lookup_or_fetch hit returns cached value without invoking FetchFn
(epoch 17)
(eval "(get (erlang-eval-ast \"${SETUP} TombstoneFetch = fun(_) -> {error, should_not_be_called} end, S = peer_actors:store(bob, BobAS, peer_actors:new()), case peer_actors:lookup_or_fetch(bob, TombstoneFetch, S) of {ok, BobAS, S} -> ok; _ -> bad end\") :name)")
;; lookup_or_fetch error from FetchFn does NOT store anything
(epoch 18)
(eval "(get (erlang-eval-ast \"${SETUP} BadFetch = fun(_) -> {error, http_404} end, case peer_actors:lookup_or_fetch(ghost, BadFetch, peer_actors:new()) of {error, http_404, []} -> ok; _ -> bad end\") :name)")
;; lookup_or_fetch bad return shape is captured
(epoch 19)
(eval "(get (erlang-eval-ast \"${SETUP} JunkFetch = fun(_) -> garbage end, case peer_actors:lookup_or_fetch(ghost, JunkFetch, peer_actors:new()) of {error, {bad_fetch_return, garbage}, []} -> ok; _ -> bad end\") :name)")
;; gen_server: start_link + lookup_srv miss returns not_found
(epoch 20)
(eval "(get (erlang-eval-ast \"peer_actors:start_link(), peer_actors:lookup_srv(bob) =:= not_found\") :name)")
;; gen_server: store_srv + lookup_srv round-trip
(epoch 21)
(eval "(get (erlang-eval-ast \"${SETUP} peer_actors:start_link(), peer_actors:store_srv(bob, BobAS), peer_actors:lookup_srv(bob) =:= {ok, BobAS}\") :name)")
;; gen_server: peers_srv reflects stored entries
(epoch 22)
(eval "(get (erlang-eval-ast \"${SETUP} peer_actors:start_link(), peer_actors:store_srv(bob, BobAS), peer_actors:store_srv(carol, CarolAS), peer_actors:peers_srv() =:= [bob, carol]\") :name)")
;; gen_server: lookup_or_fetch_srv miss invokes FetchFn + caches
(epoch 23)
(eval "(get (erlang-eval-ast \"${SETUP} peer_actors:start_link(), R = peer_actors:lookup_or_fetch_srv(bob, OkFetch), R =:= {ok, BobAS} andalso peer_actors:peers_srv() =:= [bob]\") :name)")
;; gen_server: subsequent lookup uses cached value (FetchFn would error)
(epoch 24)
(eval "(get (erlang-eval-ast \"${SETUP} TombstoneFetch = fun(_) -> {error, should_not_be_called} end, peer_actors:start_link(), peer_actors:store_srv(bob, BobAS), R = peer_actors:lookup_or_fetch_srv(bob, TombstoneFetch), R =:= {ok, BobAS}\") :name)")
;; gen_server: fetch error doesn't poison cache
(epoch 25)
(eval "(get (erlang-eval-ast \"${SETUP} BadFetch = fun(_) -> {error, http_404} end, peer_actors:start_link(), R = peer_actors:lookup_or_fetch_srv(ghost, BadFetch), R =:= {error, http_404} andalso peer_actors:peers_srv() =:= []\") :name)")
;; gen_server: evict_srv removes the entry
(epoch 26)
(eval "(get (erlang-eval-ast \"${SETUP} peer_actors:start_link(), peer_actors:store_srv(bob, BobAS), peer_actors:evict_srv(bob), peer_actors:lookup_srv(bob) =:= not_found\") :name)")
;; Initial-state argument: start_link/1 pre-populates the cache
(epoch 27)
(eval "(get (erlang-eval-ast \"${SETUP} peer_actors:start_link([{bob, BobAS}]), peer_actors:lookup_srv(bob) =:= {ok, BobAS}\") :name)")
EPOCHS
OUTPUT=$(timeout 240 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 3 "peer_actors module loaded" "peer_actors"
check 10 "new/0 -> []" "true"
check 11 "lookup on empty -> not_found" "true"
check 12 "store + lookup round-trip" "true"
check 13 "peers/1 lists in insertion order" "true"
check 14 "evict removes entry" "true"
check 15 "evict unknown -> no-op" "true"
check 16 "lookup_or_fetch miss fetches" "ok"
check 17 "lookup_or_fetch hit skips fetch" "ok"
check 18 "fetch error doesn't store" "ok"
check 19 "bad fetch return shape captured" "ok"
check 20 "gen_server lookup miss" "true"
check 21 "gen_server store + lookup" "true"
check 22 "gen_server peers_srv lists" "true"
check 23 "gen_server fetch + cache" "true"
check 24 "gen_server cached skips fetch" "true"
check 25 "gen_server fetch error pristine" "true"
check 26 "gen_server evict removes" "true"
check 27 "start_link/1 pre-populates" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/peer_actors.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]