Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 29s
140 lines
4.8 KiB
Erlang
140 lines
4.8 KiB
Erlang
-module(nx_kernel).
|
|
-behaviour(gen_server).
|
|
-export([new/3, publish/2,
|
|
actor_id/1, log_state/1, log_tip/1,
|
|
key_spec/1, actor_state/1, projections/1,
|
|
next_published/1, with_projections/2]).
|
|
-export([start_link/3, publish/1, query/0, log_tip/0,
|
|
with_projections/1, stop/0]).
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
|
|
|
|
%% Kernel orchestrator — the long-lived runtime state held by the
|
|
%% running fed-sx instance. The HTTP layer (Step 8c-post-publish
|
|
%% follow-up) will park this in a gen_server and dispatch the POST
|
|
%% /activity request through `publish/2`.
|
|
%%
|
|
%% State shape (property list):
|
|
%% [{actor_id, A},
|
|
%% {key_spec, KS}, % proplist: key_id / algorithm / value
|
|
%% {actor_state, AS}, % proplist: public_keys
|
|
%% {log, L}, % log:open/2 return value
|
|
%% {projections, [Name]}, % list of registered projection process names
|
|
%% {next_published, N}] % monotonic counter we feed as :published
|
|
%%
|
|
%% Step 6c's stage_replay catches duplicates by `:id`; the `:id`
|
|
%% is derived from the unsigned envelope contents. Same Request +
|
|
%% same `:published` -> same CID, so the next_published counter
|
|
%% gives every publish a distinct timestamp without needing a
|
|
%% wall-clock BIF.
|
|
|
|
new(ActorId, KeySpec, ActorStateProplist) ->
|
|
{ok, L0} = log:open(ActorId, base_stub()),
|
|
[{actor_id, ActorId},
|
|
{key_spec, KeySpec},
|
|
{actor_state, ActorStateProplist},
|
|
{log, L0},
|
|
{projections, []},
|
|
{next_published, 1}].
|
|
|
|
%% publish/2 — pure state transition. Returns either:
|
|
%% {ok, Result, NewState} — log + counter advanced
|
|
%% {error, Reason, State} — state unchanged on validation halt
|
|
publish(Request, State) ->
|
|
P = field(next_published, State),
|
|
Ctx = [{actor_id, field(actor_id, State)},
|
|
{published, P},
|
|
{key_spec, field(key_spec, State)},
|
|
{actor_state, field(actor_state, State)},
|
|
{log, field(log, State)},
|
|
{projections, field(projections, State)}],
|
|
case outbox:publish(Request, Ctx) of
|
|
{ok, Result, NewLog} ->
|
|
State1 = set(log, NewLog, State),
|
|
State2 = set(next_published, P + 1, State1),
|
|
{ok, Result, State2};
|
|
{error, Reason, _} ->
|
|
{error, Reason, State}
|
|
end.
|
|
|
|
%% Accessors
|
|
|
|
actor_id(State) -> field(actor_id, State).
|
|
key_spec(State) -> field(key_spec, State).
|
|
actor_state(State) -> field(actor_state, State).
|
|
log_state(State) -> field(log, State).
|
|
log_tip(State) -> log:tip(field(log, State)).
|
|
projections(State) -> field(projections, State).
|
|
next_published(State) -> field(next_published, State).
|
|
|
|
%% with_projections — return a new state with :projections replaced.
|
|
with_projections(Names, State) ->
|
|
set(projections, Names, State).
|
|
|
|
%% Internal
|
|
|
|
%% "base_stub" — placeholder base path for the in-memory log
|
|
%% in v1 (the in-memory log ignores the base argument).
|
|
base_stub() ->
|
|
<<98,97,115,101,95,115,116,117,98>>.
|
|
|
|
field(K, [{K, V} | _]) -> V;
|
|
field(K, [_ | Rest]) -> field(K, Rest);
|
|
field(_, []) -> nil.
|
|
|
|
set(K, V, []) -> [{K, V}];
|
|
set(K, V, [{K, _} | Rest]) -> [{K, V} | Rest];
|
|
set(K, V, [P | Rest]) -> [P | set(K, V, Rest)].
|
|
|
|
%% ── gen_server wrapper ──────────────────────────────────────────
|
|
%%
|
|
%% Mirrors the registry / projection gen_server patterns from
|
|
%% Steps 5b and 7b. Same port quirks: raw Pid return, no `?MODULE`
|
|
%% macro, spawned processes don't persist across separate
|
|
%% erlang-eval-ast calls — tests inline start_link with operations.
|
|
|
|
start_link(ActorId, KeySpec, ActorStateProplist) ->
|
|
Pid = gen_server:start_link(nx_kernel,
|
|
[ActorId, KeySpec, ActorStateProplist]),
|
|
erlang:register(nx_kernel, Pid),
|
|
Pid.
|
|
|
|
stop() ->
|
|
R = gen_server:call(nx_kernel, '$gen_stop'),
|
|
erlang:unregister(nx_kernel),
|
|
R.
|
|
|
|
publish(Request) ->
|
|
gen_server:call(nx_kernel, {publish, Request}).
|
|
|
|
query() ->
|
|
gen_server:call(nx_kernel, get_state).
|
|
|
|
log_tip() ->
|
|
gen_server:call(nx_kernel, get_log_tip).
|
|
|
|
with_projections(Names) ->
|
|
gen_server:call(nx_kernel, {set_projections, Names}).
|
|
|
|
%% gen_server callbacks
|
|
|
|
init([ActorId, KeySpec, AS]) ->
|
|
{ok, new(ActorId, KeySpec, AS)}.
|
|
|
|
handle_call({publish, Request}, _From, State) ->
|
|
case publish(Request, State) of
|
|
{ok, Result, NewState} ->
|
|
{reply, {ok, Result}, NewState};
|
|
{error, Reason, SameState} ->
|
|
{reply, {error, Reason}, SameState}
|
|
end;
|
|
handle_call(get_state, _From, State) ->
|
|
{reply, State, State};
|
|
handle_call(get_log_tip, _From, State) ->
|
|
{reply, log_tip(State), State};
|
|
handle_call({set_projections, Names}, _From, State) ->
|
|
{reply, ok, with_projections(Names, State)}.
|
|
|
|
handle_cast(_, S) -> {noreply, S}.
|
|
|
|
handle_info(_, S) -> {noreply, S}.
|