fed-sx-m2: Step 1a — nx_kernel per-actor bucket refactor + 17 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 20s
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 20s
State shape becomes [{actors, [{Id, Bucket}, ...]}, {next_actor_seq, N}]
with ActorBucket = [{key_spec, KS}, {actor_state, AS}, {log, L},
{projections, [Name]}, {next_published, N}]. Pure-functional multi-
actor APIs (new/0, add_actor/4, has_actor/2, actors/1, actor_count/1,
publish/3, per-actor accessors, with_actor_projections/3) join the
legacy single-actor accessors, which now read from the first bucket.
Every M1 test continues to pass via bootstrap:start/3 -> new/3 ->
first-bucket lookup.
Local has_keyed/find_keyed/set_keyed/set_bucket helpers cover the
keyed-list ops since lists:keymember/keyfind aren't registered in
this substrate.
next/tests/nx_kernel_multi.sh 17/17. M1 nx_kernel-adjacent suites
green (bootstrap_start 10/10, nx_kernel_server 11/11, http_publish
10/10, smoke_app_pure 12/12, http_post_format 13/13, http_publish_fold
10/10, http_marshal 10/10). Erlang conformance 761/761 preserved.
Blockers entry added for pre-existing http_server_tcp.sh 0/5
regression (78eae9ef left dead helper references in runtime.sx:1593) —
substrate-side, out of m2 scope, confirmed pre-existing by reverting
1a's changes and re-running.
This commit is contained in:
@@ -1,82 +1,232 @@
|
||||
-module(nx_kernel).
|
||||
-behaviour(gen_server).
|
||||
-export([new/3, publish/2,
|
||||
|
||||
%% Pure-functional API
|
||||
-export([new/0, new/3,
|
||||
add_actor/4, has_actor/2, actors/1, actor_count/1,
|
||||
publish/2, publish/3,
|
||||
actor_id/1, log_state/1, log_tip/1,
|
||||
key_spec/1, actor_state/1, projections/1,
|
||||
next_published/1, with_projections/2]).
|
||||
key_spec/1, actor_state/1, projections/1, next_published/1,
|
||||
actor_log_state/2, actor_log_tip/2,
|
||||
actor_key_spec/2, actor_state/2, actor_projections/2,
|
||||
actor_next_published/2, actor_bucket/2,
|
||||
with_projections/2, with_actor_projections/3,
|
||||
next_actor_seq/1]).
|
||||
|
||||
%% gen_server API
|
||||
-export([start_link/3, publish/1, query/0, log_tip/0,
|
||||
with_projections/1, stop/0]).
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
|
||||
|
||||
%% Kernel orchestrator — the long-lived runtime state held by the
|
||||
%% running fed-sx instance. The HTTP layer (Step 8c-post-publish
|
||||
%% follow-up) will park this in a gen_server and dispatch the POST
|
||||
%% /activity request through `publish/2`.
|
||||
%% running fed-sx instance. Step 1 (m2) refactor: state is now
|
||||
%% per-actor bucketed so one kernel hosts any number of actors.
|
||||
%%
|
||||
%% State shape (property list):
|
||||
%% [{actor_id, A},
|
||||
%% {key_spec, KS}, % proplist: key_id / algorithm / value
|
||||
%% {actor_state, AS}, % proplist: public_keys
|
||||
%% {log, L}, % log:open/2 return value
|
||||
%% {projections, [Name]}, % list of registered projection process names
|
||||
%% {next_published, N}] % monotonic counter we feed as :published
|
||||
%% New state shape (property list):
|
||||
%% [{actors, [{ActorId, ActorBucket}, ...]},
|
||||
%% {next_actor_seq, NextN}]
|
||||
%%
|
||||
%% Step 6c's stage_replay catches duplicates by `:id`; the `:id`
|
||||
%% is derived from the unsigned envelope contents. Same Request +
|
||||
%% same `:published` -> same CID, so the next_published counter
|
||||
%% gives every publish a distinct timestamp without needing a
|
||||
%% wall-clock BIF.
|
||||
%% ActorBucket = [{key_spec, KS},
|
||||
%% {actor_state, AS},
|
||||
%% {log, L},
|
||||
%% {projections, [Name]},
|
||||
%% {next_published, NextSeq}]
|
||||
%%
|
||||
%% Legacy single-actor accessors (actor_id/1, key_spec/1, etc.)
|
||||
%% continue to read from the first registered actor — keeps every
|
||||
%% pre-m2 test passing through bootstrap:start/3.
|
||||
%%
|
||||
%% next_actor_seq is a monotonic counter handed out to add_actor for
|
||||
%% future use (e.g. per-actor URL paths in Step 4). It's not yet
|
||||
%% read by the rest of the kernel.
|
||||
|
||||
%% ── Pure-functional API ──────────────────────────────────────────
|
||||
|
||||
new() ->
|
||||
[{actors, []}, {next_actor_seq, 1}].
|
||||
|
||||
new(ActorId, KeySpec, ActorStateProplist) ->
|
||||
{ok, L0} = log:open(ActorId, base_stub()),
|
||||
[{actor_id, ActorId},
|
||||
{key_spec, KeySpec},
|
||||
{actor_state, ActorStateProplist},
|
||||
{log, L0},
|
||||
{projections, []},
|
||||
{next_published, 1}].
|
||||
{ok, S} = add_actor(ActorId, KeySpec, ActorStateProplist, new()),
|
||||
S.
|
||||
|
||||
%% publish/2 — pure state transition. Returns either:
|
||||
%% {ok, Result, NewState} — log + counter advanced
|
||||
%% {error, Reason, State} — state unchanged on validation halt
|
||||
publish(Request, State) ->
|
||||
P = field(next_published, State),
|
||||
Ctx = [{actor_id, field(actor_id, State)},
|
||||
{published, P},
|
||||
{key_spec, field(key_spec, State)},
|
||||
{actor_state, field(actor_state, State)},
|
||||
{log, field(log, State)},
|
||||
{projections, field(projections, State)}],
|
||||
case outbox:publish(Request, Ctx) of
|
||||
{ok, Result, NewLog} ->
|
||||
State1 = set(log, NewLog, State),
|
||||
State2 = set(next_published, P + 1, State1),
|
||||
{ok, Result, State2};
|
||||
{error, Reason, _} ->
|
||||
{error, Reason, State}
|
||||
add_actor(ActorId, KeySpec, AS, State) ->
|
||||
Actors = field(actors, State),
|
||||
case has_keyed(ActorId, Actors) of
|
||||
true ->
|
||||
{error, already_present};
|
||||
false ->
|
||||
{ok, L0} = log:open(ActorId, base_stub()),
|
||||
Bucket = [{key_spec, KeySpec},
|
||||
{actor_state, AS},
|
||||
{log, L0},
|
||||
{projections, []},
|
||||
{next_published, 1}],
|
||||
Seq = field(next_actor_seq, State),
|
||||
State1 = set(actors, Actors ++ [{ActorId, Bucket}], State),
|
||||
State2 = set(next_actor_seq, Seq + 1, State1),
|
||||
{ok, State2}
|
||||
end.
|
||||
|
||||
%% Accessors
|
||||
has_actor(ActorId, State) ->
|
||||
has_keyed(ActorId, field(actors, State)).
|
||||
|
||||
actor_id(State) -> field(actor_id, State).
|
||||
key_spec(State) -> field(key_spec, State).
|
||||
actor_state(State) -> field(actor_state, State).
|
||||
log_state(State) -> field(log, State).
|
||||
log_tip(State) -> log:tip(field(log, State)).
|
||||
projections(State) -> field(projections, State).
|
||||
next_published(State) -> field(next_published, State).
|
||||
actors(State) ->
|
||||
[Id || {Id, _Bucket} <- field(actors, State)].
|
||||
|
||||
actor_count(State) ->
|
||||
length(field(actors, State)).
|
||||
|
||||
next_actor_seq(State) ->
|
||||
field(next_actor_seq, State).
|
||||
|
||||
actor_bucket(ActorId, State) ->
|
||||
find_keyed(ActorId, field(actors, State)).
|
||||
|
||||
%% publish/3 — per-actor publish.
|
||||
publish(ActorId, Request, State) ->
|
||||
case actor_bucket(ActorId, State) of
|
||||
{error, no_actor} ->
|
||||
{error, no_actor, State};
|
||||
{ok, Bucket} ->
|
||||
P = field(next_published, Bucket),
|
||||
Ctx = [{actor_id, ActorId},
|
||||
{published, P},
|
||||
{key_spec, field(key_spec, Bucket)},
|
||||
{actor_state, field(actor_state, Bucket)},
|
||||
{log, field(log, Bucket)},
|
||||
{projections, field(projections, Bucket)}],
|
||||
case outbox:publish(Request, Ctx) of
|
||||
{ok, Result, NewLog} ->
|
||||
B1 = set(log, NewLog, Bucket),
|
||||
B2 = set(next_published, P + 1, B1),
|
||||
NewState = set_bucket(ActorId, B2, State),
|
||||
{ok, Result, NewState};
|
||||
{error, Reason, _} ->
|
||||
{error, Reason, State}
|
||||
end
|
||||
end.
|
||||
|
||||
%% publish/2 — legacy single-actor publish; routes to first actor.
|
||||
publish(Request, State) ->
|
||||
case actors(State) of
|
||||
[] -> {error, no_actor, State};
|
||||
[First | _] -> publish(First, Request, State)
|
||||
end.
|
||||
|
||||
with_actor_projections(ActorId, Names, State) ->
|
||||
case actor_bucket(ActorId, State) of
|
||||
{error, no_actor} ->
|
||||
{error, no_actor};
|
||||
{ok, Bucket} ->
|
||||
B1 = set(projections, Names, Bucket),
|
||||
{ok, set_bucket(ActorId, B1, State)}
|
||||
end.
|
||||
|
||||
%% with_projections — return a new state with :projections replaced.
|
||||
with_projections(Names, State) ->
|
||||
set(projections, Names, State).
|
||||
case actors(State) of
|
||||
[] -> State;
|
||||
[First | _] ->
|
||||
{ok, NewState} = with_actor_projections(First, Names, State),
|
||||
NewState
|
||||
end.
|
||||
|
||||
%% Internal
|
||||
%% Per-actor accessors
|
||||
|
||||
actor_log_state(ActorId, State) ->
|
||||
case actor_bucket(ActorId, State) of
|
||||
{ok, B} -> {ok, field(log, B)};
|
||||
{error, _} -> {error, no_actor}
|
||||
end.
|
||||
|
||||
actor_log_tip(ActorId, State) ->
|
||||
case actor_log_state(ActorId, State) of
|
||||
{ok, L} -> log:tip(L);
|
||||
{error, _} -> nil
|
||||
end.
|
||||
|
||||
actor_key_spec(ActorId, State) ->
|
||||
case actor_bucket(ActorId, State) of
|
||||
{ok, B} -> {ok, field(key_spec, B)};
|
||||
{error, _} -> {error, no_actor}
|
||||
end.
|
||||
|
||||
actor_state(ActorId, State) when is_list(State), is_atom(ActorId) ->
|
||||
case actor_bucket(ActorId, State) of
|
||||
{ok, B} -> {ok, field(actor_state, B)};
|
||||
{error, _} -> {error, no_actor}
|
||||
end.
|
||||
|
||||
actor_projections(ActorId, State) ->
|
||||
case actor_bucket(ActorId, State) of
|
||||
{ok, B} -> {ok, field(projections, B)};
|
||||
{error, _} -> {error, no_actor}
|
||||
end.
|
||||
|
||||
actor_next_published(ActorId, State) ->
|
||||
case actor_bucket(ActorId, State) of
|
||||
{ok, B} -> {ok, field(next_published, B)};
|
||||
{error, _} -> {error, no_actor}
|
||||
end.
|
||||
|
||||
%% Legacy single-actor accessors — read from first bucket. Keeps
|
||||
%% every M1 test (smoke_app_pure, bootstrap_start, http_publish,
|
||||
%% nx_kernel_server, http_post_format) passing.
|
||||
|
||||
actor_id(State) ->
|
||||
case field(actors, State) of
|
||||
[] -> nil;
|
||||
[{First, _Bucket} | _] -> First
|
||||
end.
|
||||
|
||||
key_spec(State) ->
|
||||
bucket_field(key_spec, State).
|
||||
|
||||
actor_state(State) ->
|
||||
bucket_field(actor_state, State).
|
||||
|
||||
log_state(State) ->
|
||||
bucket_field(log, State).
|
||||
|
||||
log_tip(State) ->
|
||||
log:tip(log_state(State)).
|
||||
|
||||
projections(State) ->
|
||||
case bucket_field(projections, State) of
|
||||
nil -> [];
|
||||
Ps -> Ps
|
||||
end.
|
||||
|
||||
next_published(State) ->
|
||||
bucket_field(next_published, State).
|
||||
|
||||
%% ── Internal helpers ──────────────────────────────────────────────
|
||||
|
||||
%% "base_stub" — placeholder base path for the in-memory log
|
||||
%% in v1 (the in-memory log ignores the base argument).
|
||||
base_stub() ->
|
||||
<<98,97,115,101,95,115,116,117,98>>.
|
||||
|
||||
bucket_field(Key, State) ->
|
||||
case field(actors, State) of
|
||||
[] -> nil;
|
||||
[{_First, Bucket} | _] -> field(Key, Bucket)
|
||||
end.
|
||||
|
||||
set_bucket(ActorId, NewBucket, State) ->
|
||||
Actors = field(actors, State),
|
||||
NewActors = set_keyed(ActorId, NewBucket, Actors),
|
||||
set(actors, NewActors, State).
|
||||
|
||||
set_keyed(K, V, [{K, _} | Rest]) -> [{K, V} | Rest];
|
||||
set_keyed(K, V, [P | Rest]) -> [P | set_keyed(K, V, Rest)];
|
||||
set_keyed(_, _, []) -> [].
|
||||
|
||||
has_keyed(_, []) -> false;
|
||||
has_keyed(K, [{K, _} | _]) -> true;
|
||||
has_keyed(K, [_ | Rest]) -> has_keyed(K, Rest).
|
||||
|
||||
find_keyed(_, []) -> {error, no_actor};
|
||||
find_keyed(K, [{K, V} | _]) -> {ok, V};
|
||||
find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest).
|
||||
|
||||
field(K, [{K, V} | _]) -> V;
|
||||
field(K, [_ | Rest]) -> field(K, Rest);
|
||||
field(_, []) -> nil.
|
||||
@@ -91,6 +241,10 @@ set(K, V, [P | Rest]) -> [P | set(K, V, Rest)].
|
||||
%% Steps 5b and 7b. Same port quirks: raw Pid return, no `?MODULE`
|
||||
%% macro, spawned processes don't persist across separate
|
||||
%% erlang-eval-ast calls — tests inline start_link with operations.
|
||||
%%
|
||||
%% Step 1a (m2) keeps the gen_server single-actor; multi-actor
|
||||
%% gen_server calls (publish_to/2, log_tip_for/1, ...) land in
|
||||
%% iteration 1b.
|
||||
|
||||
start_link(ActorId, KeySpec, ActorStateProplist) ->
|
||||
Pid = gen_server:start_link(nx_kernel,
|
||||
|
||||
Reference in New Issue
Block a user