Files
rose-ash/next/kernel/peer_actors.erl
giles d481af5791
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 34s
fed-sx-m2: Step 5c — peer-actors cache + 19 tests
New next/kernel/peer_actors.erl is the federation-side cache for
{PeerActorId, PeerActorState} entries. PeerAS is exactly the shape
envelope:verify_signature/2 reads (proplist with :public_keys), so
the inbox handler can pipe the cache hit straight into
pipeline:validate_inbound/3 from Step 5b.

Pure-functional API:
  new/0
  lookup/2(PeerId, State) -> {ok, PeerAS} | not_found
  store/3(PeerId, PeerAS, State) -> NewState
  evict/2(PeerId, State) -> NewState
  peers/1(State) -> [PeerId]
  lookup_or_fetch/3(PeerId, FetchFn, State)
      -> {ok, PeerAS, NewState}      cache hit returns unchanged State,
                                     miss stores FetchFn result.
      | {error, Reason, State}        FetchFn failure preserves cache.
      | {error, {bad_fetch_return, X}, State}

FetchFn contract: (PeerId) -> {ok, PeerAS} | {error, Reason}.
Failed fetches do NOT poison the cache so callers can retry on
transient HTTP failures.

gen_server wrapper (registered name peer_actors):
  start_link/0,1   start_link/1 accepts initial proplist for fixtures
  stop/0
  lookup_srv/1
  store_srv/2
  lookup_or_fetch_srv/2
  peers_srv/0
  evict_srv/1

handle_call dispatches mirror the pure-fn paths exactly.

The actual HTTP-GET fetch implementation (peer's actor doc -> peer
AS proplist) is Step 5d's responsibility — for 5c, FetchFn is just
the contract callers fill in.

19/19 in next/tests/peer_actors.sh:
  - new/0 -> []
  - lookup miss -> not_found
  - store + lookup round-trip
  - peers/1 in insertion order
  - evict + evict-unknown no-op
  - lookup_or_fetch miss invokes FetchFn, hits cache after
  - lookup_or_fetch hit skips FetchFn (verified by tombstone fn)
  - fetch error preserves cache state
  - bad fetch return shape captured
  - gen_server start_link + miss/hit/fetch/evict round-trips
  - start_link/1 pre-populates cache from initial state

Conformance 761/761. 139/139 across 9 Step-5-adjacent suites
(inbox_pipeline, inbox_bucket, pipeline_signature, registry_server,
projection_server, nx_kernel_multi, bootstrap_start, http_publish,
smoke_app_pure, plus the new peer_actors).
2026-06-06 16:36:19 +00:00

141 lines
5.0 KiB
Erlang

-module(peer_actors).
-export([new/0, lookup/2, store/3, evict/2, peers/1,
lookup_or_fetch/3,
start_link/0, start_link/1, stop/0,
lookup_srv/1, store_srv/2, lookup_or_fetch_srv/2,
peers_srv/0, evict_srv/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-behaviour(gen_server).
%% Peer-actors cache. On first inbound from a new peer, the
%% federation layer needs the peer's `:public_keys` (and eventually
%% other actor-doc fields) to verify the inbound signature. Fetching
%% the peer's actor doc on every inbound would be wasteful, so we
%% cache the peer-AS keyed by ActorId atom. Per design §13.6 stale-
%% key invalidation defers to v3 — for v2 entries are TTL-free.
%%
%% State shape (pure-functional):
%% [{PeerActorId, PeerActorState}, ...]
%%
%% PeerActorState is the same shape that envelope:verify_signature/2
%% reads — a proplist with :public_keys (a list of key proplists).
%%
%% lookup_or_fetch/3 is the load-bearing entry point: a miss invokes
%% the caller-supplied FetchFn (1-arity, takes PeerActorId, returns
%% {ok, PeerAS} | {error, Reason}). The cache stores successful
%% fetches; errors do NOT poison the cache so the caller can retry.
%%
%% gen_server wrapper exposes the same API for the http inbox
%% handler. Tests inline start_link with operations (same port quirks
%% as registry / projection / nx_kernel).
%% ── Pure-functional API ─────────────────────────────────────────
%% new/0 — returns the empty cache: a list with no
%% {PeerActorId, PeerActorState} entries.
new() ->
    [].
%% lookup/2 — reads the cached peer state for PeerId.
%%   Returns {ok, PeerAS} when an entry exists, otherwise the bare
%%   atom not_found (find_keyed's {error, _} is collapsed to it).
lookup(PeerId, State) ->
    Found = find_keyed(PeerId, State),
    case Found of
        {error, _} -> not_found;
        {ok, Entry} -> {ok, Entry}
    end.
%% store/3 — returns a new cache in which PeerId maps to PeerAS.
%%   Delegates to set_keyed/3: an existing entry for PeerId is
%%   replaced in place, a new one is appended at the end.
store(PeerId, PeerAS, State) ->
    NewState = set_keyed(PeerId, PeerAS, State),
    NewState.
%% evict/2 — returns a new cache without the first entry keyed by
%%   PeerId. Delegates to delete_keyed/2, so evicting an unknown
%%   PeerId yields an equal list.
evict(PeerId, State) ->
    Remaining = delete_keyed(PeerId, State),
    Remaining.
%% peers/1 — lists the cached peer ids in the order the entries
%%   appear in State. Elements that are not 2-tuples do not match
%%   the generator pattern and are silently skipped.
peers(State) ->
    [PeerId || {PeerId, _PeerAS} <- State].
%% lookup_or_fetch/3 — read-through lookup.
%%   Hit:  {ok, PeerAS, State} with State untouched; FetchFn is not
%%         called.
%%   Miss: FetchFn(PeerId) is called and its result classified by
%%         fetch_into_cache/3:
%%           {ok, PeerAS}    -> {ok, PeerAS, NewState} (entry stored)
%%           {error, Reason} -> {error, Reason, State} (cache unchanged,
%%                              so the caller may retry later)
%%           anything else   -> {error, {bad_fetch_return, Other}, State}
lookup_or_fetch(PeerId, FetchFn, State) ->
    case find_keyed(PeerId, State) of
        {ok, Cached} ->
            {ok, Cached, State};
        {error, _} ->
            fetch_into_cache(PeerId, FetchFn(PeerId), State)
    end.

%% fetch_into_cache/3 — turns a FetchFn result into the
%% lookup_or_fetch/3 return value; only the {ok, _} shape is stored.
fetch_into_cache(PeerId, {ok, Fetched}, State) ->
    {ok, Fetched, store(PeerId, Fetched, State)};
fetch_into_cache(_PeerId, {error, Reason}, State) ->
    {error, Reason, State};
fetch_into_cache(_PeerId, Other, State) ->
    {error, {bad_fetch_return, Other}, State}.
%% ── gen_server wrapper ──────────────────────────────────────────
%%
%% Mirrors registry / projection / nx_kernel patterns. Registered
%% name `peer_actors` so callers (http_server inbox handler) can
%% find it without threading the Pid through Cfg.
%% start_link/0 — starts the cache server with an empty cache.
start_link() ->
    start_link([]).

%% start_link/1 — starts the cache server with InitialState as its
%%   cache, registers the process under the name peer_actors and
%%   returns the Pid.
%%   NOTE(review): gen_server:start_link/2 returning a bare Pid is not
%%   the stock OTP signature; presumably this targets the project's
%%   own gen_server port — confirm before reusing elsewhere.
start_link(InitialState) ->
    ServerPid = gen_server:start_link(peer_actors, [InitialState]),
    erlang:register(peer_actors, ServerPid),
    ServerPid.
%% stop/0 — sends the '$gen_stop' call to the registered server,
%%   then removes the peer_actors name and returns the call's reply.
%%   NOTE(review): this module's handle_call/3 has no '$gen_stop'
%%   clause, so the request is presumably intercepted by the
%%   gen_server implementation itself — confirm.
stop() ->
    Reply = gen_server:call(peer_actors, '$gen_stop'),
    erlang:unregister(peer_actors),
    Reply.
%% lookup_srv/1 — server-side lookup/2: replies {ok, PeerAS} or
%%   not_found for PeerId.
lookup_srv(PeerId) ->
    Request = {lookup, PeerId},
    gen_server:call(peer_actors, Request).
%% store_srv/2 — server-side store/3: caches PeerAS under PeerId in
%%   the registered server; the reply is ok.
store_srv(PeerId, PeerAS) ->
    Request = {store, PeerId, PeerAS},
    gen_server:call(peer_actors, Request).
%% lookup_or_fetch_srv/2 — server-side lookup_or_fetch/3. FetchFn
%%   must be a 1-arity fun taking PeerId. Unlike the pure form the
%%   reply carries no state element: {ok, PeerAS} on a hit or a
%%   successful fetch, {error, Reason} when the fetch fails.
lookup_or_fetch_srv(PeerId, FetchFn) ->
    Request = {lookup_or_fetch, PeerId, FetchFn},
    gen_server:call(peer_actors, Request).
%% peers_srv/0 — server-side peers/1: replies with the list of
%%   cached peer ids.
peers_srv() ->
    Request = get_peers,
    gen_server:call(peer_actors, Request).
%% evict_srv/1 — server-side evict/2: drops PeerId's entry from the
%%   registered server's cache; the reply is ok.
evict_srv(PeerId) ->
    Request = {evict, PeerId},
    gen_server:call(peer_actors, Request).
%% gen_server callbacks
%% init/1 — the server state is the cache list itself. start_link/1
%%   wraps the initial cache in a one-element argument list, which is
%%   unwrapped here.
init([Seed]) ->
    {ok, Seed}.
%% handle_call/3 — one clause per *_srv request; each delegates to
%%   the matching pure function and threads the cache through as the
%%   server state.
%%     {lookup, PeerId}                   -> lookup/2 result, state kept
%%     {store, PeerId, PeerAS}            -> ok, state updated
%%     {lookup_or_fetch, PeerId, FetchFn} -> {ok, PeerAS} | {error, Reason}
%%     get_peers                          -> peers/1 result, state kept
%%     {evict, PeerId}                    -> ok, state updated
%%   NOTE(review): FetchFn is invoked synchronously inside this
%%   callback; presumably other callers wait while a slow fetch runs —
%%   confirm that is acceptable for the real HTTP fetch.
handle_call({lookup, PeerId}, _From, Cache) ->
    Reply = lookup(PeerId, Cache),
    {reply, Reply, Cache};
handle_call({store, PeerId, PeerAS}, _From, Cache) ->
    Updated = store(PeerId, PeerAS, Cache),
    {reply, ok, Updated};
handle_call({lookup_or_fetch, PeerId, FetchFn}, _From, Cache) ->
    Outcome = lookup_or_fetch(PeerId, FetchFn, Cache),
    case Outcome of
        {ok, PeerAS, Updated} ->
            {reply, {ok, PeerAS}, Updated};
        {error, Reason, Unchanged} ->
            {reply, {error, Reason}, Unchanged}
    end;
handle_call(get_peers, _From, Cache) ->
    Reply = peers(Cache),
    {reply, Reply, Cache};
handle_call({evict, PeerId}, _From, Cache) ->
    Updated = evict(PeerId, Cache),
    {reply, ok, Updated}.
%% handle_cast/2 — no casts are defined; any cast is ignored and the
%% cache is left as is.
handle_cast(_Msg, Cache) ->
    {noreply, Cache}.
%% handle_info/2 — stray messages are dropped and the cache is left
%% as is.
handle_info(_Msg, Cache) ->
    {noreply, Cache}.
%% ── Internal helpers ────────────────────────────────────────────
%% find_keyed/2 — walks the list front to back and returns
%%   {ok, Value} for the first {Key, Value} pair whose key matches,
%%   or {error, not_found} once the list is exhausted. Elements that
%%   are not a matching pair are stepped over.
find_keyed(Key, [{Key, Value} | _]) ->
    {ok, Value};
find_keyed(Key, [_ | Tail]) ->
    find_keyed(Key, Tail);
find_keyed(_Key, []) ->
    {error, not_found}.
%% set_keyed/3 — returns the list with Key bound to Value. The first
%%   pair already keyed by Key is replaced where it stands (later
%%   elements are untouched); if no such pair exists, {Key, Value} is
%%   appended at the end, preserving insertion order.
set_keyed(Key, Value, [{Key, _Old} | Tail]) ->
    [{Key, Value} | Tail];
set_keyed(Key, Value, [Head | Tail]) ->
    [Head | set_keyed(Key, Value, Tail)];
set_keyed(Key, Value, []) ->
    [{Key, Value}].
%% delete_keyed/2 — returns the list without the first pair keyed by
%%   Key. Only that first match is removed; when nothing matches the
%%   result equals the input.
delete_keyed(Key, [{Key, _Dropped} | Tail]) ->
    Tail;
delete_keyed(Key, [Head | Tail]) ->
    [Head | delete_keyed(Key, Tail)];
delete_keyed(_Key, []) ->
    [].