Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 21s
Adds the receiving-side log bucket every actor needs. add_actor/4
now opens a fresh in-memory log via log:open(ActorId, inbox_base_stub())
and stores it on the bucket as {actor_inbox, LogState} alongside
the outbox {log, _}. Two distinct base stubs ensure the in-memory
log module returns separate states for the outbox and the inbox even
though both logs are opened with the same ActorId.
Pure-functional exports:
actor_inbox_state/2(ActorId, State) -> {ok, LogState} | {error, _}
actor_inbox_tip/2(ActorId, State) -> integer | nil
append_to_actor_inbox/3(ActorId, Activity, State)
-> {ok, NewTip, NewState} | {error, no_actor, State}
gen_server exports (mirror the outbox shape):
inbox_tip_for/1(ActorId) -> integer | nil
inbox_state_for/1(ActorId) -> {ok, LogState} | {error, _}
append_inbox/2(ActorId, Activity) -> {ok, NewTip} | {error, _}
handle_call dispatch added for all three.
Inbox and outbox tips are completely independent — appending to one
doesn't touch the other. This is the storage primitive 5b will
build the inbound validation pipeline on top of.
log:append/2 signature noted in code + progress log: it takes
(LogState, Activity) and returns {ok, NewState, Seq} — not
{ok, NewState} as I originally guessed.
next/tests/inbox_bucket.sh 14/14:
- fresh inbox tip = 0 (pure)
- actor_inbox_state {ok, _} (pure)
- append_to_actor_inbox/3 -> {ok, 1, _}
- tip advances after append
- unknown actor -> {error, no_actor, _}
- outbox + inbox tips fully independent
- two actors maintain independent inbox state
- gen_server inbox_tip_for/1 starts at 0
- gen_server append_inbox/2 -> {ok, 1}
- gen_server inbox != outbox tip
- gen_server unknown -> {error, no_actor}
- gen_server inbox_state_for {ok, _}
- two appends -> tip = 2
Conformance 761/761. 125/125 across 8 Step-5-adjacent suites
(inbox_bucket, nx_kernel_multi, nx_kernel_server, bootstrap_start,
http_publish, http_multi_actor, actor_lifecycle, smoke_app_pure).
452 lines
15 KiB
Erlang
452 lines
15 KiB
Erlang
-module(nx_kernel).
|
|
-behaviour(gen_server).
|
|
|
|
%% Pure-functional API
|
|
-export([new/0, new/3,
|
|
add_actor/4, has_actor/2, actors/1, actor_count/1,
|
|
publish/2, publish/3,
|
|
bootstrap_actor/4,
|
|
actor_id/1, log_state/1, log_tip/1,
|
|
key_spec/1, actor_state/1, projections/1, next_published/1,
|
|
actor_log_state/2, actor_log_tip/2,
|
|
actor_inbox_state/2, actor_inbox_tip/2,
|
|
append_to_actor_inbox/3,
|
|
actor_key_spec/2, actor_state/2, actor_projections/2,
|
|
actor_next_published/2, actor_bucket/2,
|
|
with_projections/2, with_actor_projections/3,
|
|
next_actor_seq/1]).
|
|
|
|
%% gen_server API
|
|
-export([start_link/3, publish/1, query/0, log_tip/0,
|
|
with_projections/1, stop/0,
|
|
add_actor/3, publish_to/2, log_tip_for/1, log_state_for/1,
|
|
inbox_tip_for/1, inbox_state_for/1, append_inbox/2,
|
|
actors/0, state_for/1, bucket_for/1,
|
|
with_projections_for/2,
|
|
bootstrap_actor/3]).
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
|
|
|
|
%% Kernel orchestrator — the long-lived runtime state held by the
|
|
%% running fed-sx instance. Step 1 (m2) refactor: state is now
|
|
%% per-actor bucketed so one kernel hosts any number of actors.
|
|
%%
|
|
%% New state shape (property list):
|
|
%% [{actors, [{ActorId, ActorBucket}, ...]},
|
|
%% {next_actor_seq, NextN}]
|
|
%%
|
|
%% ActorBucket = [{key_spec, KS},
|
|
%% {actor_state, AS},
|
|
%% {log, L},
%% {actor_inbox, I},
|
|
%% {projections, [Name]},
|
|
%% {next_published, NextSeq}]
|
|
%%
|
|
%% Legacy single-actor accessors (actor_id/1, key_spec/1, etc.)
|
|
%% continue to read from the first registered actor — keeps every
|
|
%% pre-m2 test passing through bootstrap:start/3.
|
|
%%
|
|
%% next_actor_seq is a monotonic counter handed out to add_actor for
|
|
%% future use (e.g. per-actor URL paths in Step 4). It's not yet
|
|
%% read by the rest of the kernel.
|
|
|
|
%% ── Pure-functional API ──────────────────────────────────────────
|
|
|
|
%% new/0 — empty kernel state: no actors registered and the actor
%% sequence counter starting at 1.
new() ->
    [{actors, []},
     {next_actor_seq, 1}].
|
|
|
|
%% new/3 — kernel state seeded with a single actor. Registration into
%% a fresh state is asserted to succeed; anything other than {ok, _}
%% from add_actor/4 is a badmatch.
new(ActorId, KeySpec, ActorStateProplist) ->
    Empty = new(),
    {ok, Seeded} = add_actor(ActorId, KeySpec, ActorStateProplist, Empty),
    Seeded.
|
|
|
|
%% add_actor/4 — register a new actor bucket.
%%
%% Returns {ok, NewState}, or {error, already_present} when ActorId is
%% already registered. A new bucket gets two freshly opened logs (the
%% outbox under `log`, the inbox under `actor_inbox`), an empty
%% projection list and next_published = 1. The bucket is appended to
%% the end of the actor list, so the first registered actor stays
%% first, and next_actor_seq is bumped by one.
add_actor(ActorId, KeySpec, AS, State) ->
    Registered = field(actors, State),
    case has_keyed(ActorId, Registered) of
        false ->
            %% Different base stubs for the two logs opened under the
            %% same ActorId.
            {ok, Outbox} = log:open(ActorId, base_stub()),
            {ok, Inbox} = log:open(ActorId, inbox_base_stub()),
            NewBucket = [{key_spec, KeySpec},
                         {actor_state, AS},
                         {log, Outbox},
                         {actor_inbox, Inbox},
                         {projections, []},
                         {next_published, 1}],
            NextSeq = field(next_actor_seq, State) + 1,
            WithActor = set(actors, Registered ++ [{ActorId, NewBucket}], State),
            {ok, set(next_actor_seq, NextSeq, WithActor)};
        true ->
            {error, already_present}
    end.
|
|
|
|
%% has_actor/2 — true when ActorId has a bucket in State, else false.
has_actor(ActorId, State) ->
    Registered = field(actors, State),
    has_keyed(ActorId, Registered).
|
|
|
|
%% actors/1 — ids of all registered actors, in registration order.
actors(State) ->
    Entries = field(actors, State),
    [ActorId || {ActorId, _} <- Entries].
|
|
|
|
%% actor_count/1 — number of actor buckets held in State.
actor_count(State) ->
    Entries = field(actors, State),
    length(Entries).
|
|
|
|
%% next_actor_seq/1 — current value of the kernel's actor sequence
%% counter (the number the next add_actor/4 will consume).
next_actor_seq(State) ->
    Counter = field(next_actor_seq, State),
    Counter.
|
|
|
|
%% actor_bucket/2 — look up one actor's bucket.
%% Returns {ok, Bucket} or {error, no_actor}.
actor_bucket(ActorId, State) ->
    Entries = field(actors, State),
    find_keyed(ActorId, Entries).
|
|
|
|
%% publish/3 — per-actor publish.
%%
%% Builds a context proplist from the actor's bucket and hands the
%% request to outbox:publish/2. On success the bucket's log is replaced
%% with the one returned by the outbox and next_published advances by
%% one. Returns {ok, Result, NewState}, or {error, Reason, State} with
%% the state unchanged (Reason is no_actor for an unknown ActorId).
publish(ActorId, Request, State) ->
    case actor_bucket(ActorId, State) of
        {ok, Bucket} ->
            PubSeq = field(next_published, Bucket),
            Ctx = [{actor_id, ActorId},
                   {published, PubSeq},
                   {key_spec, field(key_spec, Bucket)},
                   {actor_state, field(actor_state, Bucket)},
                   {log, field(log, Bucket)},
                   {projections, field(projections, Bucket)}],
            case outbox:publish(Request, Ctx) of
                {error, Reason, _} ->
                    %% Third element is discarded; the caller gets the
                    %% untouched input state back.
                    {error, Reason, State};
                {ok, Result, NewLog} ->
                    Advanced = set(next_published, PubSeq + 1,
                                   set(log, NewLog, Bucket)),
                    {ok, Result, set_bucket(ActorId, Advanced, State)}
            end;
        {error, no_actor} ->
            {error, no_actor, State}
    end.
|
|
|
|
%% publish/2 — legacy single-actor publish. Delegates to publish/3 for
%% the first registered actor; with no actors at all it returns
%% {error, no_actor, State}.
publish(Request, State) ->
    case actors(State) of
        [First | _] -> publish(First, Request, State);
        [] -> {error, no_actor, State}
    end.
|
|
|
|
%% bootstrap_actor/4 — register an actor bucket and immediately
%% publish a Create{Person|Service|Group} as that actor's first
%% activity. Profile carries the object fields plus :public_keys.
%%
%% The actor type defaults to `person` and the public key list to []
%% when Profile does not carry them. Only the profile fields that are
%% present (name, preferredUsername, summary, icon, public_keys) are
%% copied into the Create's object.
%%
%% Returns {ok, Result, NewState} where Result is whatever publish/3
%% returned for the Create, or {error, Reason, State} when either the
%% registration or the publish fails. The error path always returns
%% the caller's original State: a failed publish must not leave a
%% registered-but-never-created actor behind, otherwise a retry would
%% be rejected with already_present.
bootstrap_actor(ActorId, Profile, KeySpec, State) ->
    PublicKeys = case field(public_keys, Profile) of
                     nil -> [];
                     Keys -> Keys
                 end,
    AS = [{public_keys, PublicKeys}],
    case add_actor(ActorId, KeySpec, AS, State) of
        {ok, State1} ->
            ActorType = case field(type, Profile) of
                            nil -> person;
                            T -> T
                        end,
            Object = [{type, ActorType}] ++ collect_profile_fields(
                         [name, preferredUsername, summary, icon, public_keys],
                         Profile),
            Request = [{type, create}, {object, Object}],
            case publish(ActorId, Request, State1) of
                {ok, Result, NewState} ->
                    {ok, Result, NewState};
                {error, PublishReason, _HalfRegistered} ->
                    %% Roll back the add_actor/4 above by returning the
                    %% input state, not State1.
                    {error, PublishReason, State}
            end;
        {error, Reason} ->
            {error, Reason, State}
    end.
|
|
|
|
%% collect_profile_fields/2 — pick the listed keys out of Profile.
%% Returns [{Key, Value}] in the order the keys were given, skipping
%% every key whose lookup yields nil.
collect_profile_fields([Name | Others], Profile) ->
    Tail = collect_profile_fields(Others, Profile),
    case field(Name, Profile) of
        nil -> Tail;
        Value -> [{Name, Value} | Tail]
    end;
collect_profile_fields([], _Profile) ->
    [].
|
|
|
|
%% with_actor_projections/3 — replace one actor's projection list.
%% Returns {ok, NewState} or {error, no_actor}.
with_actor_projections(ActorId, Names, State) ->
    case actor_bucket(ActorId, State) of
        {ok, Bucket} ->
            Updated = set(projections, Names, Bucket),
            {ok, set_bucket(ActorId, Updated, State)};
        {error, no_actor} ->
            {error, no_actor}
    end.
|
|
|
|
%% with_projections/2 — legacy single-actor variant: set the
%% projection list of the first registered actor and return the bare
%% new state. With no actors the state is returned untouched.
with_projections(Names, State) ->
    case actors(State) of
        [First | _] ->
            {ok, Updated} = with_actor_projections(First, Names, State),
            Updated;
        [] ->
            State
    end.
|
|
|
|
%% Per-actor accessors
|
|
|
|
%% actor_log_state/2 — the actor's outbox log state.
%% Returns {ok, LogState} or {error, no_actor}.
actor_log_state(ActorId, State) ->
    case actor_bucket(ActorId, State) of
        {error, _} -> {error, no_actor};
        {ok, Bucket} -> {ok, field(log, Bucket)}
    end.
|
|
|
|
%% actor_log_tip/2 — log:tip/1 of the actor's outbox log, or nil when
%% the actor is not registered.
actor_log_tip(ActorId, State) ->
    case actor_log_state(ActorId, State) of
        {error, _} -> nil;
        {ok, Outbox} -> log:tip(Outbox)
    end.
|
|
|
|
%% actor_inbox_state/2 — the actor's inbox log state.
%% Returns {ok, LogState} or {error, no_actor}.
actor_inbox_state(ActorId, State) ->
    case actor_bucket(ActorId, State) of
        {error, _} -> {error, no_actor};
        {ok, Bucket} -> {ok, field(actor_inbox, Bucket)}
    end.
|
|
|
|
%% actor_inbox_tip/2 — log:tip/1 of the actor's inbox log, or nil when
%% the actor is not registered.
actor_inbox_tip(ActorId, State) ->
    case actor_inbox_state(ActorId, State) of
        {error, _} -> nil;
        {ok, Inbox} -> log:tip(Inbox)
    end.
|
|
|
|
%% append_to_actor_inbox/3 — pure-functional inbox append. Follows the
%% same bucket-update shape as publish/3. The activity is expected to
%% be signed and validated before it reaches this call (Step 5's
%% pipeline owns signature verification and replay checks).
%%
%% Returns {ok, NewTip, NewState}, where NewTip is log:tip/1 of the
%% inbox after the append, or {error, no_actor, State}. Any result
%% from log:append/2 other than {ok, NewLog, Seq} is a badmatch.
append_to_actor_inbox(ActorId, Activity, State) ->
    case actor_bucket(ActorId, State) of
        {ok, Bucket} ->
            Current = field(actor_inbox, Bucket),
            {ok, Appended, _Seq} = log:append(Current, Activity),
            NewBucket = set(actor_inbox, Appended, Bucket),
            Tip = log:tip(Appended),
            {ok, Tip, set_bucket(ActorId, NewBucket, State)};
        {error, no_actor} ->
            {error, no_actor, State}
    end.
|
|
|
|
%% actor_key_spec/2 — the key spec stored for the actor.
%% Returns {ok, KeySpec} or {error, no_actor}.
actor_key_spec(ActorId, State) ->
    case actor_bucket(ActorId, State) of
        {error, _} -> {error, no_actor};
        {ok, Bucket} -> {ok, field(key_spec, Bucket)}
    end.
|
|
|
|
%% actor_state/2 — the actor-state proplist stored for the actor.
%% Returns {ok, ActorState} or {error, no_actor}.
%%
%% NOTE(review): this is the only per-actor accessor with a guard. A
%% non-atom ActorId (or a non-list State) fails with function_clause
%% here, whereas the sibling accessors would answer {error, no_actor}.
%% Confirm whether the guard is intentional.
actor_state(ActorId, State) when is_list(State), is_atom(ActorId) ->
    case actor_bucket(ActorId, State) of
        {error, _} -> {error, no_actor};
        {ok, Bucket} -> {ok, field(actor_state, Bucket)}
    end.
|
|
|
|
%% actor_projections/2 — the projection names stored for the actor.
%% Returns {ok, Names} or {error, no_actor}.
actor_projections(ActorId, State) ->
    case actor_bucket(ActorId, State) of
        {error, _} -> {error, no_actor};
        {ok, Bucket} -> {ok, field(projections, Bucket)}
    end.
|
|
|
|
%% actor_next_published/2 — the sequence number the actor's next
%% publish will carry. Returns {ok, N} or {error, no_actor}.
actor_next_published(ActorId, State) ->
    case actor_bucket(ActorId, State) of
        {error, _} -> {error, no_actor};
        {ok, Bucket} -> {ok, field(next_published, Bucket)}
    end.
|
|
|
|
%% Legacy single-actor accessors — read from first bucket. Keeps
|
|
%% every M1 test (smoke_app_pure, bootstrap_start, http_publish,
|
|
%% nx_kernel_server, http_post_format) passing.
|
|
|
|
%% actor_id/1 — legacy accessor: id of the first registered actor, or
%% nil when the kernel holds no actors.
actor_id(State) ->
    case field(actors, State) of
        [{FirstId, _} | _] -> FirstId;
        [] -> nil
    end.
|
|
|
|
%% key_spec/1 — legacy accessor: key spec of the first registered
%% actor, or nil when there are no actors.
key_spec(State) ->
    KeySpec = bucket_field(key_spec, State),
    KeySpec.
|
|
|
|
%% actor_state/1 — legacy accessor: actor-state proplist of the first
%% registered actor, or nil when there are no actors.
actor_state(State) ->
    ActorState = bucket_field(actor_state, State),
    ActorState.
|
|
|
|
%% log_state/1 — legacy accessor: outbox log state of the first
%% registered actor, or nil when there are no actors.
log_state(State) ->
    Outbox = bucket_field(log, State),
    Outbox.
|
|
|
|
%% log_tip/1 — legacy accessor: log:tip/1 of the first actor's outbox.
%% NOTE(review): with no actors registered log_state/1 yields nil, so
%% this ends up calling log:tip(nil) — confirm log:tip/1 accepts that.
log_tip(State) ->
    Outbox = log_state(State),
    log:tip(Outbox).
|
|
|
|
%% projections/1 — legacy accessor: projection names of the first
%% registered actor. A missing value (nil) is normalised to [].
projections(State) ->
    Stored = bucket_field(projections, State),
    case Stored of
        nil -> [];
        _ -> Stored
    end.
|
|
|
|
%% next_published/1 — legacy accessor: next publish sequence number of
%% the first registered actor, or nil when there are no actors.
next_published(State) ->
    NextSeq = bucket_field(next_published, State),
    NextSeq.
|
|
|
|
%% ── Internal helpers ──────────────────────────────────────────────
|
|
|
|
%% base_stub/0 — base argument passed to log:open/2 for an actor's
%% outbox log. The bytes are the ASCII text "base_stub", spelled out
%% as integers.
base_stub() ->
    <<98, 97, 115, 101, 95, 115, 116, 117, 98>>.
|
|
|
|
%% inbox_base_stub/0 — base argument passed to log:open/2 for an
%% actor's inbox log. It differs from base_stub/0 so that the
%% in-memory log module's open/2 hands back a fresh log state for the
%% inbox bucket. Disk paths will namespace on this once Step 3b
%% on-disk persistence is reactivated for inbox buckets.
%%
%% The bytes are the ASCII text "inbox_stub" (not "inbox_base_stub").
inbox_base_stub() ->
    <<105, 110, 98, 111, 120, 95, 115, 116, 117, 98>>.
|
|
|
|
%% bucket_field/2 — read Key from the first registered actor's bucket.
%% Returns nil when there are no actors or the key is absent.
bucket_field(Key, State) ->
    case field(actors, State) of
        [{_FirstId, Bucket} | _] -> field(Key, Bucket);
        [] -> nil
    end.
|
|
|
|
%% set_bucket/3 — store NewBucket as ActorId's bucket and return the
%% updated kernel state. An ActorId that is not registered leaves the
%% actor list as it was (see set_keyed/3).
set_bucket(ActorId, NewBucket, State) ->
    Replaced = set_keyed(ActorId, NewBucket, field(actors, State)),
    set(actors, Replaced, State).
|
|
|
|
%% set_keyed/3 — replace the value of the first {Key, _} pair in a
%% list. Unlike set/3 this never inserts: a list without Key comes
%% back unchanged.
set_keyed(_Key, _Value, []) ->
    [];
set_keyed(Key, Value, [{Key, _Old} | Tail]) ->
    [{Key, Value} | Tail];
set_keyed(Key, Value, [Other | Tail]) ->
    [Other | set_keyed(Key, Value, Tail)].
|
|
|
|
%% has_keyed/2 — true when the list contains a {Key, _} pair.
has_keyed(Key, [{Key, _Value} | _Tail]) ->
    true;
has_keyed(Key, [_Other | Tail]) ->
    has_keyed(Key, Tail);
has_keyed(_Key, []) ->
    false.
|
|
|
|
%% find_keyed/2 — value of the first {Key, Value} pair in the list.
%% Returns {ok, Value}, or {error, no_actor} when Key is not present.
find_keyed(Key, [{Key, Value} | _Tail]) ->
    {ok, Value};
find_keyed(Key, [_Other | Tail]) ->
    find_keyed(Key, Tail);
find_keyed(_Key, []) ->
    {error, no_actor}.
|
|
|
|
%% field/2 — proplist read: value of the first {Key, Value} pair, or
%% nil when Key is absent. Elements that are not matching pairs are
%% skipped.
field(_Key, []) ->
    nil;
field(Key, [{Key, Value} | _Tail]) ->
    Value;
field(Key, [_Other | Tail]) ->
    field(Key, Tail).
|
|
|
|
%% set/3 — proplist write: replace the value of the first {Key, _}
%% pair in place, or append {Key, Value} at the end when Key is absent.
set(Key, Value, [{Key, _Old} | Tail]) ->
    [{Key, Value} | Tail];
set(Key, Value, [Other | Tail]) ->
    [Other | set(Key, Value, Tail)];
set(Key, Value, []) ->
    [{Key, Value}].
|
|
|
|
%% ── gen_server wrapper ──────────────────────────────────────────
|
|
%%
|
|
%% Mirrors the registry / projection gen_server patterns from
|
|
%% Steps 5b and 7b. Same port quirks: raw Pid return, no `?MODULE`
|
|
%% macro, spawned processes don't persist across separate
|
|
%% erlang-eval-ast calls — tests inline start_link with operations.
|
|
%%
|
|
%% Step 1b (m2) adds multi-actor gen_server calls:
|
|
%% add_actor/3, publish_to/2, log_tip_for/1, actors/0, state_for/1,
|
|
%% with_projections_for/2 — all delegating to the pure-functional
|
|
%% bucket APIs. Existing single-actor calls (publish/1, log_tip/0,
|
|
%% with_projections/1) continue to route through bucket 0.
|
|
|
|
%% start_link/3 — start the kernel gen_server seeded with one actor
%% and register it under the name nx_kernel. Returns the bare Pid:
%% this code treats the result of gen_server:start_link/2 as a Pid,
%% not as {ok, Pid}.
start_link(ActorId, KeySpec, ActorStateProplist) ->
    InitArgs = [ActorId, KeySpec, ActorStateProplist],
    KernelPid = gen_server:start_link(nx_kernel, InitArgs),
    erlang:register(nx_kernel, KernelPid),
    KernelPid.
|
|
|
|
%% stop/0 — ask the registered kernel to stop, then drop the
%% nx_kernel name. Returns the reply to the stop call.
%% NOTE(review): no handle_call/3 clause in this module matches
%% '$gen_stop'; presumably the gen_server implementation intercepts
%% it — confirm.
stop() ->
    Reply = gen_server:call(nx_kernel, '$gen_stop'),
    erlang:unregister(nx_kernel),
    Reply.
|
|
|
|
%% publish/1 — publish as the first registered actor via the running
%% kernel. Replies {ok, Result} or {error, Reason}.
publish(Request) ->
    Call = {publish, Request},
    gen_server:call(nx_kernel, Call).
|
|
|
|
%% query/0 — fetch the running kernel's entire state proplist.
query() ->
    Call = get_state,
    gen_server:call(nx_kernel, Call).
|
|
|
|
%% log_tip/0 — outbox log tip of the first registered actor, read from
%% the running kernel.
log_tip() ->
    Call = get_log_tip,
    gen_server:call(nx_kernel, Call).
|
|
|
|
%% with_projections/1 — set the first registered actor's projection
%% list on the running kernel. Replies ok.
with_projections(Names) ->
    Call = {set_projections, Names},
    gen_server:call(nx_kernel, Call).
|
|
|
|
%% Step 1b — multi-actor gen_server calls.
|
|
|
|
%% add_actor/3 — register a new actor on the running kernel.
%% Replies ok or {error, Reason}.
add_actor(ActorId, KeySpec, AS) ->
    Call = {add_actor, ActorId, KeySpec, AS},
    gen_server:call(nx_kernel, Call).
|
|
|
|
%% publish_to/2 — publish as a specific actor on the running kernel.
%% Replies {ok, Result} or {error, Reason}.
publish_to(ActorId, Request) ->
    Call = {publish_to, ActorId, Request},
    gen_server:call(nx_kernel, Call).
|
|
|
|
%% log_tip_for/1 — outbox log tip of one actor on the running kernel;
%% nil for an unknown actor.
log_tip_for(ActorId) ->
    Call = {log_tip_for, ActorId},
    gen_server:call(nx_kernel, Call).
|
|
|
|
%% log_state_for/1 — outbox log state of one actor on the running
%% kernel. Replies {ok, LogState} or {error, no_actor}.
log_state_for(ActorId) ->
    Call = {log_state_for, ActorId},
    gen_server:call(nx_kernel, Call).
|
|
|
|
%% inbox_tip_for/1 — inbox log tip of one actor on the running kernel;
%% nil for an unknown actor.
inbox_tip_for(ActorId) ->
    Call = {inbox_tip_for, ActorId},
    gen_server:call(nx_kernel, Call).
|
|
|
|
%% inbox_state_for/1 — inbox log state of one actor on the running
%% kernel. Replies {ok, LogState} or {error, no_actor}.
inbox_state_for(ActorId) ->
    Call = {inbox_state_for, ActorId},
    gen_server:call(nx_kernel, Call).
|
|
|
|
%% append_inbox/2 — append an activity to one actor's inbox on the
%% running kernel. Replies {ok, NewTip} or {error, Reason}.
append_inbox(ActorId, Activity) ->
    Call = {append_inbox, ActorId, Activity},
    gen_server:call(nx_kernel, Call).
|
|
|
|
%% actors/0 — ids of all actors registered on the running kernel.
actors() ->
    Call = get_actors,
    gen_server:call(nx_kernel, Call).
|
|
|
|
%% state_for/1 — actor-state proplist of one actor on the running
%% kernel. Replies {ok, ActorState} or {error, no_actor}.
state_for(ActorId) ->
    Call = {state_for, ActorId},
    gen_server:call(nx_kernel, Call).
|
|
|
|
%% bucket_for/1 — whole bucket of one actor on the running kernel.
%% Replies {ok, Bucket} or {error, no_actor}.
bucket_for(ActorId) ->
    Call = {bucket_for, ActorId},
    gen_server:call(nx_kernel, Call).
|
|
|
|
%% with_projections_for/2 — set one actor's projection list on the
%% running kernel. Replies ok or {error, Reason}.
with_projections_for(ActorId, Names) ->
    Call = {set_projections_for, ActorId, Names},
    gen_server:call(nx_kernel, Call).
|
|
|
|
%% bootstrap_actor/3 — register an actor and publish its Create on the
%% running kernel. Replies {ok, Result} or {error, Reason}.
bootstrap_actor(ActorId, Profile, KeySpec) ->
    Call = {bootstrap_actor, ActorId, Profile, KeySpec},
    gen_server:call(nx_kernel, Call).
|
|
|
|
%% gen_server callbacks
|
|
|
|
%% init/1 — gen_server callback. The server state is a kernel state
%% seeded with the single actor passed to start_link/3.
init([ActorId, KeySpec, AS]) ->
    Initial = new(ActorId, KeySpec, AS),
    {ok, Initial}.
|
|
|
|
%% handle_call/3 — gen_server callback. Every clause delegates to the
%% pure-functional API and translates its result into a reply:
%% three-element {ok | error, _, State} results are shortened to
%% {ok, Result} | {error, Reason} with the returned state kept as the
%% server state; two-element {ok, NewState} | {error, Reason} results
%% reply ok | {error, Reason}, keeping the old state on error.
%% There is no catch-all clause, so an unknown request fails with
%% function_clause.

%% Legacy single-actor calls (first registered actor).
handle_call({publish, Request}, _From, State) ->
    case publish(Request, State) of
        {error, Reason, Unchanged} ->
            {reply, {error, Reason}, Unchanged};
        {ok, Result, Updated} ->
            {reply, {ok, Result}, Updated}
    end;
handle_call(get_state, _From, State) ->
    {reply, State, State};
handle_call(get_log_tip, _From, State) ->
    Tip = log_tip(State),
    {reply, Tip, State};
handle_call({set_projections, Names}, _From, State) ->
    Updated = with_projections(Names, State),
    {reply, ok, Updated};

%% Multi-actor calls.
handle_call({add_actor, ActorId, KeySpec, AS}, _From, State) ->
    case add_actor(ActorId, KeySpec, AS, State) of
        {error, Reason} -> {reply, {error, Reason}, State};
        {ok, Updated} -> {reply, ok, Updated}
    end;
handle_call({publish_to, ActorId, Request}, _From, State) ->
    case publish(ActorId, Request, State) of
        {error, Reason, Unchanged} -> {reply, {error, Reason}, Unchanged};
        {ok, Result, Updated} -> {reply, {ok, Result}, Updated}
    end;
handle_call({log_tip_for, ActorId}, _From, State) ->
    Tip = actor_log_tip(ActorId, State),
    {reply, Tip, State};
handle_call({log_state_for, ActorId}, _From, State) ->
    Outbox = actor_log_state(ActorId, State),
    {reply, Outbox, State};

%% Inbox calls.
handle_call({inbox_tip_for, ActorId}, _From, State) ->
    Tip = actor_inbox_tip(ActorId, State),
    {reply, Tip, State};
handle_call({inbox_state_for, ActorId}, _From, State) ->
    Inbox = actor_inbox_state(ActorId, State),
    {reply, Inbox, State};
handle_call({append_inbox, ActorId, Activity}, _From, State) ->
    case append_to_actor_inbox(ActorId, Activity, State) of
        {error, Reason, Unchanged} -> {reply, {error, Reason}, Unchanged};
        {ok, Tip, Updated} -> {reply, {ok, Tip}, Updated}
    end;

%% Read-only lookups.
handle_call(get_actors, _From, State) ->
    Ids = actors(State),
    {reply, Ids, State};
handle_call({state_for, ActorId}, _From, State) ->
    Found = actor_state(ActorId, State),
    {reply, Found, State};
handle_call({bucket_for, ActorId}, _From, State) ->
    Found = actor_bucket(ActorId, State),
    {reply, Found, State};

%% Per-actor projections and bootstrap.
handle_call({set_projections_for, ActorId, Names}, _From, State) ->
    case with_actor_projections(ActorId, Names, State) of
        {error, Reason} -> {reply, {error, Reason}, State};
        {ok, Updated} -> {reply, ok, Updated}
    end;
handle_call({bootstrap_actor, ActorId, Profile, KeySpec}, _From, State) ->
    case bootstrap_actor(ActorId, Profile, KeySpec, State) of
        {error, Reason, Unchanged} -> {reply, {error, Reason}, Unchanged};
        {ok, Result, Updated} -> {reply, {ok, Result}, Updated}
    end.
|
|
|
|
%% handle_cast/2 — gen_server callback. The kernel defines no casts;
%% any cast is ignored and the state is kept.
handle_cast(_Msg, State) ->
    {noreply, State}.
|
|
|
|
%% handle_info/2 — gen_server callback. Stray messages are dropped and
%% the state is kept.
handle_info(_Info, State) ->
    {noreply, State}.
|