From d481af57916d7b3dd5f11f182ca7e1cb60f412f7 Mon Sep 17 00:00:00 2001 From: giles Date: Sat, 6 Jun 2026 16:36:19 +0000 Subject: [PATCH] =?UTF-8?q?fed-sx-m2:=20Step=205c=20=E2=80=94=20peer-actor?= =?UTF-8?q?s=20cache=20+=2019=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New next/kernel/peer_actors.erl is the federation-side cache for {PeerActorId, PeerActorState} entries. PeerAS is exactly the shape envelope:verify_signature/2 reads (proplist with :public_keys), so the inbox handler can pipe the cache hit straight into pipeline:validate_inbound/3 from Step 5b. Pure-functional API: new/0 lookup/2(PeerId, State) -> {ok, PeerAS} | not_found store/3(PeerId, PeerAS, State) -> NewState evict/2(PeerId, State) -> NewState peers/1(State) -> [PeerId] lookup_or_fetch/3(PeerId, FetchFn, State) -> {ok, PeerAS, NewState} cache hit returns unchanged State, miss stores FetchFn result. | {error, Reason, State} FetchFn failure preserves cache. | {error, {bad_fetch_return, X}, State} FetchFn contract: (PeerId) -> {ok, PeerAS} | {error, Reason}. Failed fetches do NOT poison the cache so callers can retry on transient HTTP failures. gen_server wrapper (registered name peer_actors): start_link/0,1 start_link/1 accepts initial proplist for fixtures stop/0 lookup_srv/1 store_srv/2 lookup_or_fetch_srv/2 peers_srv/0 evict_srv/1 handle_call dispatches mirror the pure-fn paths exactly. The actual HTTP-GET fetch implementation (peer's actor doc -> peer AS proplist) is Step 5d's responsibility — for 5c, FetchFn is just the contract callers fill in. 19/19 in next/tests/peer_actors.sh: - new/0 -> [] - lookup miss -> not_found - store + lookup round-trip - peers/1 in insertion order - evict + evict-unknown no-op - lookup_or_fetch miss invokes FetchFn, hits cache after - lookup_or_fetch hit skips FetchFn (verified by tombstone fn) - fetch error preserves cache state - bad fetch return shape captured - gen_server start_link + miss/hit/fetch/evict round-trips - start_link/1 pre-populates cache from initial state Conformance 761/761. 139/139 across 9 Step-5-adjacent suites (inbox_pipeline, inbox_bucket, pipeline_signature, registry_server, projection_server, nx_kernel_multi, bootstrap_start, http_publish, smoke_app_pure, plus the new peer_actors). --- next/kernel/peer_actors.erl | 140 ++++++++++++++++++++++++++++++ next/tests/peer_actors.sh | 165 ++++++++++++++++++++++++++++++++++++ plans/fed-sx-milestone-2.md | 34 ++++++-- 3 files changed, 334 insertions(+), 5 deletions(-) create mode 100644 next/kernel/peer_actors.erl create mode 100755 next/tests/peer_actors.sh diff --git a/next/kernel/peer_actors.erl b/next/kernel/peer_actors.erl new file mode 100644 index 00000000..a7a7d821 --- /dev/null +++ b/next/kernel/peer_actors.erl @@ -0,0 +1,140 @@ +-module(peer_actors). +-export([new/0, lookup/2, store/3, evict/2, peers/1, + lookup_or_fetch/3, + start_link/0, start_link/1, stop/0, + lookup_srv/1, store_srv/2, lookup_or_fetch_srv/2, + peers_srv/0, evict_srv/1]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2]). +-behaviour(gen_server). + +%% Peer-actors cache. On first inbound from a new peer, the +%% federation layer needs the peer's `:public_keys` (and eventually +%% other actor-doc fields) to verify the inbound signature. Fetching +%% the peer's actor doc on every inbound would be wasteful, so we +%% cache the peer-AS keyed by ActorId atom. Per design §13.6 stale- +%% key invalidation defers to v3 — for v2 entries are TTL-free. +%% +%% State shape (pure-functional): +%% [{PeerActorId, PeerActorState}, ...] +%% +%% PeerActorState is the same shape that envelope:verify_signature/2 +%% reads — a proplist with :public_keys (a list of key proplists). +%% +%% lookup_or_fetch/3 is the load-bearing entry point: a miss invokes +%% the caller-supplied FetchFn (1-arity, takes PeerActorId, returns +%% {ok, PeerAS} | {error, Reason}). The cache stores successful +%% fetches; errors do NOT poison the cache so the caller can retry. +%% +%% gen_server wrapper exposes the same API for the http inbox +%% handler. Tests inline start_link with operations (same port quirks +%% as registry / projection / nx_kernel). + +%% ── Pure-functional API ───────────────────────────────────────── + +new() -> []. + +lookup(PeerId, State) -> + case find_keyed(PeerId, State) of + {ok, PeerAS} -> {ok, PeerAS}; + {error, _} -> not_found + end. + +store(PeerId, PeerAS, State) -> + set_keyed(PeerId, PeerAS, State). + +evict(PeerId, State) -> + delete_keyed(PeerId, State). + +peers(State) -> [Id || {Id, _AS} <- State]. + +%% lookup_or_fetch/3 — cache hit returns {ok, PeerAS, State} +%% unchanged. Cache miss calls FetchFn; success path stores and +%% returns {ok, PeerAS, NewState}; failure returns {error, Reason, +%% State} so the caller knows the cache state and can retry on +%% transient errors. + +lookup_or_fetch(PeerId, FetchFn, State) -> + case find_keyed(PeerId, State) of + {ok, PeerAS} -> {ok, PeerAS, State}; + {error, _} -> + case FetchFn(PeerId) of + {ok, PeerAS} -> {ok, PeerAS, store(PeerId, PeerAS, State)}; + {error, Reason} -> {error, Reason, State}; + Other -> {error, {bad_fetch_return, Other}, State} + end + end. + +%% ── gen_server wrapper ────────────────────────────────────────── +%% +%% Mirrors registry / projection / nx_kernel patterns. Registered +%% name `peer_actors` so callers (http_server inbox handler) can +%% find it without threading the Pid through Cfg. + +start_link() -> + start_link([]). + +start_link(InitialState) -> + Pid = gen_server:start_link(peer_actors, [InitialState]), + erlang:register(peer_actors, Pid), + Pid. + +stop() -> + R = gen_server:call(peer_actors, '$gen_stop'), + erlang:unregister(peer_actors), + R. + +lookup_srv(PeerId) -> + gen_server:call(peer_actors, {lookup, PeerId}). + +store_srv(PeerId, PeerAS) -> + gen_server:call(peer_actors, {store, PeerId, PeerAS}). + +%% lookup_or_fetch_srv/2 — same shape as the pure form. FetchFn must +%% be a 1-arity fun. Reply is {ok, PeerAS} on hit-or-fetched, +%% {error, Reason} on fetch failure. + +lookup_or_fetch_srv(PeerId, FetchFn) -> + gen_server:call(peer_actors, {lookup_or_fetch, PeerId, FetchFn}). + +peers_srv() -> + gen_server:call(peer_actors, get_peers). + +evict_srv(PeerId) -> + gen_server:call(peer_actors, {evict, PeerId}). + +%% gen_server callbacks + +init([InitialState]) -> + {ok, InitialState}. + +handle_call({lookup, PeerId}, _From, State) -> + {reply, lookup(PeerId, State), State}; +handle_call({store, PeerId, PeerAS}, _From, State) -> + {reply, ok, store(PeerId, PeerAS, State)}; +handle_call({lookup_or_fetch, PeerId, FetchFn}, _From, State) -> + case lookup_or_fetch(PeerId, FetchFn, State) of + {ok, PeerAS, NewState} -> {reply, {ok, PeerAS}, NewState}; + {error, Reason, SameState} -> {reply, {error, Reason}, SameState} + end; +handle_call(get_peers, _From, State) -> + {reply, peers(State), State}; +handle_call({evict, PeerId}, _From, State) -> + {reply, ok, evict(PeerId, State)}. + +handle_cast(_, S) -> {noreply, S}. + +handle_info(_, S) -> {noreply, S}. + +%% ── Internal helpers ──────────────────────────────────────────── + +find_keyed(_, []) -> {error, not_found}; +find_keyed(K, [{K, V} | _]) -> {ok, V}; +find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest). + +set_keyed(K, V, []) -> [{K, V}]; +set_keyed(K, V, [{K, _} | Rest]) -> [{K, V} | Rest]; +set_keyed(K, V, [P | Rest]) -> [P | set_keyed(K, V, Rest)]. + +delete_keyed(_, []) -> []; +delete_keyed(K, [{K, _} | Rest]) -> Rest; +delete_keyed(K, [P | Rest]) -> [P | delete_keyed(K, Rest)]. diff --git a/next/tests/peer_actors.sh b/next/tests/peer_actors.sh new file mode 100755 index 00000000..ae453d60 --- /dev/null +++ b/next/tests/peer_actors.sh @@ -0,0 +1,165 @@ +#!/usr/bin/env bash +# next/tests/peer_actors.sh — m2 Step 5c test. +# +# Peer-actors cache for the federation inbox handler. Tracks +# {PeerActorId, PeerActorState} pairs so signature verification +# can be done against a peer's :public_keys without re-fetching +# their actor doc on every inbound. lookup_or_fetch/3 is the +# load-bearing entry point: cache hit returns cached AS, miss +# invokes the caller-supplied FetchFn and stores its result. + +set -uo pipefail +cd "$(git rev-parse --show-toplevel)" + +SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}" +if [ ! -x "$SX_SERVER" ]; then + SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe" +fi +if [ ! -x "$SX_SERVER" ]; then + echo "ERROR: sx_server.exe not found." >&2 + exit 1 +fi + +VERBOSE="${1:-}" +PASS=0; FAIL=0; ERRORS="" +TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT + +SETUP='K1 = <<1,2,3,4>>, BobAS = [{public_keys,[[{id,k1},{created,0},{value,K1}]]}], K2 = <<5,6,7,8>>, CarolAS = [{public_keys,[[{id,k1},{created,0},{value,K2}]]}], OkFetch = fun(bob) -> {ok, BobAS}; (carol) -> {ok, CarolAS}; (_) -> {error, not_found} end,' + +cat > "$TMPFILE" < ok; _ -> bad end\") :name)") + +;; lookup_or_fetch hit returns cached value without invoking FetchFn +(epoch 17) +(eval "(get (erlang-eval-ast \"${SETUP} TombstoneFetch = fun(_) -> {error, should_not_be_called} end, S = peer_actors:store(bob, BobAS, peer_actors:new()), case peer_actors:lookup_or_fetch(bob, TombstoneFetch, S) of {ok, BobAS, S} -> ok; _ -> bad end\") :name)") + +;; lookup_or_fetch error from FetchFn does NOT store anything +(epoch 18) +(eval "(get (erlang-eval-ast \"${SETUP} BadFetch = fun(_) -> {error, http_404} end, case peer_actors:lookup_or_fetch(ghost, BadFetch, peer_actors:new()) of {error, http_404, []} -> ok; _ -> bad end\") :name)") + +;; lookup_or_fetch bad return shape is captured +(epoch 19) +(eval "(get (erlang-eval-ast \"${SETUP} JunkFetch = fun(_) -> garbage end, case peer_actors:lookup_or_fetch(ghost, JunkFetch, peer_actors:new()) of {error, {bad_fetch_return, garbage}, []} -> ok; _ -> bad end\") :name)") + +;; gen_server: start_link + lookup_srv miss returns not_found +(epoch 20) +(eval "(get (erlang-eval-ast \"peer_actors:start_link(), peer_actors:lookup_srv(bob) =:= not_found\") :name)") + +;; gen_server: store_srv + lookup_srv round-trip +(epoch 21) +(eval "(get (erlang-eval-ast \"${SETUP} peer_actors:start_link(), peer_actors:store_srv(bob, BobAS), peer_actors:lookup_srv(bob) =:= {ok, BobAS}\") :name)") + +;; gen_server: peers_srv reflects stored entries +(epoch 22) +(eval "(get (erlang-eval-ast \"${SETUP} peer_actors:start_link(), peer_actors:store_srv(bob, BobAS), peer_actors:store_srv(carol, CarolAS), peer_actors:peers_srv() =:= [bob, carol]\") :name)") + +;; gen_server: lookup_or_fetch_srv miss invokes FetchFn + caches +(epoch 23) +(eval "(get (erlang-eval-ast \"${SETUP} peer_actors:start_link(), R = peer_actors:lookup_or_fetch_srv(bob, OkFetch), R =:= {ok, BobAS} andalso peer_actors:peers_srv() =:= [bob]\") :name)") + +;; gen_server: subsequent lookup uses cached value (FetchFn would error) +(epoch 24) +(eval "(get (erlang-eval-ast \"${SETUP} TombstoneFetch = fun(_) -> {error, should_not_be_called} end, peer_actors:start_link(), peer_actors:store_srv(bob, BobAS), R = peer_actors:lookup_or_fetch_srv(bob, TombstoneFetch), R =:= {ok, BobAS}\") :name)") + +;; gen_server: fetch error doesn't poison cache +(epoch 25) +(eval "(get (erlang-eval-ast \"${SETUP} BadFetch = fun(_) -> {error, http_404} end, peer_actors:start_link(), R = peer_actors:lookup_or_fetch_srv(ghost, BadFetch), R =:= {error, http_404} andalso peer_actors:peers_srv() =:= []\") :name)") + +;; gen_server: evict_srv removes the entry +(epoch 26) +(eval "(get (erlang-eval-ast \"${SETUP} peer_actors:start_link(), peer_actors:store_srv(bob, BobAS), peer_actors:evict_srv(bob), peer_actors:lookup_srv(bob) =:= not_found\") :name)") + +;; Initial-state argument: start_link/1 pre-populates the cache +(epoch 27) +(eval "(get (erlang-eval-ast \"${SETUP} peer_actors:start_link([{bob, BobAS}]), peer_actors:lookup_srv(bob) =:= {ok, BobAS}\") :name)") +EPOCHS + +OUTPUT=$(timeout 240 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) + +check() { + local epoch="$1" desc="$2" expected="$3" + local actual + actual=$(echo "$OUTPUT" | awk -v e="$epoch" ' + $0 ~ "^\\(ok-len " e " " { getline; print; exit } + $0 ~ "^\\(ok " e " " { print; exit } + $0 ~ "^\\(error " e " " { print; exit } + ') + [ -z "$actual" ] && actual="" + if echo "$actual" | grep -qF -- "$expected"; then + PASS=$((PASS+1)) + [ "$VERBOSE" = "-v" ] && echo " ok $desc" + else + FAIL=$((FAIL+1)) + ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual +" + fi +} + +check 3 "peer_actors module loaded" "peer_actors" +check 10 "new/0 -> []" "true" +check 11 "lookup on empty -> not_found" "true" +check 12 "store + lookup round-trip" "true" +check 13 "peers/1 lists in insertion order" "true" +check 14 "evict removes entry" "true" +check 15 "evict unknown -> no-op" "true" +check 16 "lookup_or_fetch miss fetches" "ok" +check 17 "lookup_or_fetch hit skips fetch" "ok" +check 18 "fetch error doesn't store" "ok" +check 19 "bad fetch return shape captured" "ok" +check 20 "gen_server lookup miss" "true" +check 21 "gen_server store + lookup" "true" +check 22 "gen_server peers_srv lists" "true" +check 23 "gen_server fetch + cache" "true" +check 24 "gen_server cached skips fetch" "true" +check 25 "gen_server fetch error pristine" "true" +check 26 "gen_server evict removes" "true" +check 27 "start_link/1 pre-populates" "true" + +TOTAL=$((PASS+FAIL)) +if [ $FAIL -eq 0 ]; then + echo "ok $PASS/$TOTAL next/tests/peer_actors.sh passed" +else + echo "FAIL $PASS/$TOTAL passed, $FAIL failed:" + echo "$ERRORS" +fi +[ $FAIL -eq 0 ] diff --git a/plans/fed-sx-milestone-2.md b/plans/fed-sx-milestone-2.md index 9541771a..423c13ef 100644 --- a/plans/fed-sx-milestone-2.md +++ b/plans/fed-sx-milestone-2.md @@ -360,11 +360,21 @@ actor *received*), and broadcasts to projections. (rejected by stage_envelope before sig runs), wrong peer AS (bad_signature), replay against inbox, distinct activities both verify, stage short-circuit ordering verified. -- [ ] **5c** — Peer-actors cache projection (`peer_actors.erl`): - on first inbound from a new peer, fetches the peer's actor doc - and caches the public-keys. v2: synchronous fetch via the - http-client native primitive. Per design §13.6, stale-key - invalidation is v3. +- [x] **5c** — Peer-actors cache (`peer_actors.erl`). State shape + `[{PeerActorId, PeerActorState}, ...]` keyed by atom; PeerAS is + exactly the shape `envelope:verify_signature/2` reads (proplist + with `:public_keys`). Pure exports: `new/0`, `lookup/2`, + `store/3`, `evict/2`, `peers/1`, and the load-bearing + `lookup_or_fetch/3(PeerId, FetchFn, State)` that calls the + caller-supplied `FetchFn :: (PeerId) -> {ok, PeerAS} | {error, _}` + on miss and stores the successful result. Failed fetches do NOT + poison the cache so callers can retry on transient errors. + gen_server wrapper: `start_link/0,1`, `lookup_srv/1`, + `store_srv/2`, `lookup_or_fetch_srv/2`, `peers_srv/0`, + `evict_srv/1`. `start_link/1` accepts an initial state proplist + for tests / fixtures. 19/19 in `peer_actors.sh`. The actual + fetch implementation (HTTP GET of the peer's actor doc) is + Step 5d's responsibility — for 5c, FetchFn is just a contract. - [ ] **5d** — http_server inbox handler wires the chain: `POST /actors//inbox` body is the signed activity wire bytes; parse → resolve peer-AS → `validate_inbound` → `append_inbox` → @@ -780,6 +790,20 @@ proceed. Newest first. +- **2026-06-06** — Step 5c: peer-actors cache (`peer_actors.erl`). + Pure-functional cache of `{PeerActorId, PeerAS}` entries with + the load-bearing `lookup_or_fetch/3(PeerId, FetchFn, State)` + entry: cache hit returns stored PeerAS unchanged; miss calls + `FetchFn(PeerId)`, stores success, returns `{ok, PeerAS, + NewState}`. Fetch errors don't poison the cache so callers can + retry on transient HTTP failures. gen_server wrapper exposes + the same shape under registered name `peer_actors`; + `start_link/1` accepts an initial proplist for tests. + Per-design v2 fetches are synchronous over plaintext HTTP; the + actual http-client call lands in Step 5d. 19/19 in + `peer_actors.sh`. Conformance 761/761. 139/139 across 9 + Step-5-adjacent suites. + - **2026-06-06** — Step 5b: federation inbound pipeline. `pipeline:validate_inbound/3(Activity, PeerAS, InboxLog)` runs `stage_envelope` → `stage_signature(PeerAS)` → `stage_replay(InboxLog)`