-module(nx_kernel). -behaviour(gen_server). %% Pure-functional API -export([new/0, new/3, add_actor/4, has_actor/2, actors/1, actor_count/1, publish/2, publish/3, bootstrap_actor/4, actor_id/1, log_state/1, log_tip/1, key_spec/1, actor_state/1, projections/1, next_published/1, actor_log_state/2, actor_log_tip/2, actor_inbox_state/2, actor_inbox_tip/2, append_to_actor_inbox/3, actor_key_spec/2, actor_state/2, actor_projections/2, actor_next_published/2, actor_bucket/2, with_projections/2, with_actor_projections/3, next_actor_seq/1]). %% gen_server API -export([start_link/3, publish/1, query/0, log_tip/0, with_projections/1, stop/0, add_actor/3, publish_to/2, log_tip_for/1, log_state_for/1, inbox_tip_for/1, inbox_state_for/1, append_inbox/2, actors/0, state_for/1, bucket_for/1, with_projections_for/2, bootstrap_actor/3]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2]). %% Kernel orchestrator — the long-lived runtime state held by the %% running fed-sx instance. Step 1 (m2) refactor: state is now %% per-actor bucketed so one kernel hosts any number of actors. %% %% New state shape (property list): %% [{actors, [{ActorId, ActorBucket}, ...]}, %% {next_actor_seq, NextN}] %% %% ActorBucket = [{key_spec, KS}, %% {actor_state, AS}, %% {log, L}, %% {projections, [Name]}, %% {next_published, NextSeq}] %% %% Legacy single-actor accessors (actor_id/1, key_spec/1, etc.) %% continue to read from the first registered actor — keeps every %% pre-m2 test passing through bootstrap:start/3. %% %% next_actor_seq is a monotonic counter handed out to add_actor for %% future use (e.g. per-actor URL paths in Step 4). It's not yet %% read by the rest of the kernel. %% ── Pure-functional API ────────────────────────────────────────── new() -> [{actors, []}, {next_actor_seq, 1}]. new(ActorId, KeySpec, ActorStateProplist) -> {ok, S} = add_actor(ActorId, KeySpec, ActorStateProplist, new()), S. add_actor(ActorId, KeySpec, AS, State) -> Actors = field(actors, State), case has_keyed(ActorId, Actors) of true -> {error, already_present}; false -> {ok, L0} = log:open(ActorId, base_stub()), {ok, I0} = log:open(ActorId, inbox_base_stub()), Bucket = [{key_spec, KeySpec}, {actor_state, AS}, {log, L0}, {actor_inbox, I0}, {projections, []}, {next_published, 1}], Seq = field(next_actor_seq, State), State1 = set(actors, Actors ++ [{ActorId, Bucket}], State), State2 = set(next_actor_seq, Seq + 1, State1), {ok, State2} end. has_actor(ActorId, State) -> has_keyed(ActorId, field(actors, State)). actors(State) -> [Id || {Id, _Bucket} <- field(actors, State)]. actor_count(State) -> length(field(actors, State)). next_actor_seq(State) -> field(next_actor_seq, State). actor_bucket(ActorId, State) -> find_keyed(ActorId, field(actors, State)). %% publish/3 — per-actor publish. publish(ActorId, Request, State) -> case actor_bucket(ActorId, State) of {error, no_actor} -> {error, no_actor, State}; {ok, Bucket} -> P = field(next_published, Bucket), Ctx = [{actor_id, ActorId}, {published, P}, {key_spec, field(key_spec, Bucket)}, {actor_state, field(actor_state, Bucket)}, {log, field(log, Bucket)}, {projections, field(projections, Bucket)}], case outbox:publish(Request, Ctx) of {ok, Result, NewLog} -> B1 = set(log, NewLog, Bucket), B2 = set(next_published, P + 1, B1), NewState = set_bucket(ActorId, B2, State), {ok, Result, NewState}; {error, Reason, _} -> {error, Reason, State} end end. %% publish/2 — legacy single-actor publish; routes to first actor. publish(Request, State) -> case actors(State) of [] -> {error, no_actor, State}; [First | _] -> publish(First, Request, State) end. %% bootstrap_actor/4 — register an actor bucket and immediately %% publish a Create{Person|Service|Group} as that actor's first %% activity. Profile carries the object fields plus :public_keys. %% Returns {ok, Result, NewState} where Result has the published %% Create's CID, or {error, Reason, State} on validation halt. bootstrap_actor(ActorId, Profile, KeySpec, State) -> PublicKeys = case field(public_keys, Profile) of nil -> []; KS -> KS end, AS = [{public_keys, PublicKeys}], case add_actor(ActorId, KeySpec, AS, State) of {ok, State1} -> ActorType = case field(type, Profile) of nil -> person; T -> T end, Object = [{type, ActorType}] ++ collect_profile_fields( [name, preferredUsername, summary, icon, public_keys], Profile), Request = [{type, create}, {object, Object}], publish(ActorId, Request, State1); {error, Reason} -> {error, Reason, State} end. collect_profile_fields([], _) -> []; collect_profile_fields([F | Rest], Profile) -> case field(F, Profile) of nil -> collect_profile_fields(Rest, Profile); V -> [{F, V} | collect_profile_fields(Rest, Profile)] end. with_actor_projections(ActorId, Names, State) -> case actor_bucket(ActorId, State) of {error, no_actor} -> {error, no_actor}; {ok, Bucket} -> B1 = set(projections, Names, Bucket), {ok, set_bucket(ActorId, B1, State)} end. with_projections(Names, State) -> case actors(State) of [] -> State; [First | _] -> {ok, NewState} = with_actor_projections(First, Names, State), NewState end. %% Per-actor accessors actor_log_state(ActorId, State) -> case actor_bucket(ActorId, State) of {ok, B} -> {ok, field(log, B)}; {error, _} -> {error, no_actor} end. actor_log_tip(ActorId, State) -> case actor_log_state(ActorId, State) of {ok, L} -> log:tip(L); {error, _} -> nil end. actor_inbox_state(ActorId, State) -> case actor_bucket(ActorId, State) of {ok, B} -> {ok, field(actor_inbox, B)}; {error, _} -> {error, no_actor} end. actor_inbox_tip(ActorId, State) -> case actor_inbox_state(ActorId, State) of {ok, I} -> log:tip(I); {error, _} -> nil end. %% append_to_actor_inbox/3 — pure-functional inbox append. Mirrors %% publish/3's bucket-update shape; the activity is already signed %% + validated by the time it lands here (Step 5's pipeline handles %% sig verify + replay before this call). append_to_actor_inbox(ActorId, Activity, State) -> case actor_bucket(ActorId, State) of {error, no_actor} -> {error, no_actor, State}; {ok, Bucket} -> Inbox = field(actor_inbox, Bucket), {ok, NewInbox, _Seq} = log:append(Inbox, Activity), B1 = set(actor_inbox, NewInbox, Bucket), {ok, log:tip(NewInbox), set_bucket(ActorId, B1, State)} end. actor_key_spec(ActorId, State) -> case actor_bucket(ActorId, State) of {ok, B} -> {ok, field(key_spec, B)}; {error, _} -> {error, no_actor} end. actor_state(ActorId, State) when is_list(State), is_atom(ActorId) -> case actor_bucket(ActorId, State) of {ok, B} -> {ok, field(actor_state, B)}; {error, _} -> {error, no_actor} end. actor_projections(ActorId, State) -> case actor_bucket(ActorId, State) of {ok, B} -> {ok, field(projections, B)}; {error, _} -> {error, no_actor} end. actor_next_published(ActorId, State) -> case actor_bucket(ActorId, State) of {ok, B} -> {ok, field(next_published, B)}; {error, _} -> {error, no_actor} end. %% Legacy single-actor accessors — read from first bucket. Keeps %% every M1 test (smoke_app_pure, bootstrap_start, http_publish, %% nx_kernel_server, http_post_format) passing. actor_id(State) -> case field(actors, State) of [] -> nil; [{First, _Bucket} | _] -> First end. key_spec(State) -> bucket_field(key_spec, State). actor_state(State) -> bucket_field(actor_state, State). log_state(State) -> bucket_field(log, State). log_tip(State) -> log:tip(log_state(State)). projections(State) -> case bucket_field(projections, State) of nil -> []; Ps -> Ps end. next_published(State) -> bucket_field(next_published, State). %% ── Internal helpers ────────────────────────────────────────────── base_stub() -> <<98,97,115,101,95,115,116,117,98>>. %% "inbox_base_stub" — distinct path stub so the in-memory log %% module's open/2 returns a fresh log state for the per-actor %% inbox bucket. Disk paths will namespace on this once Step 3b %% on-disk persistence is reactivated for inbox buckets. inbox_base_stub() -> <<105,110,98,111,120,95,115,116,117,98>>. bucket_field(Key, State) -> case field(actors, State) of [] -> nil; [{_First, Bucket} | _] -> field(Key, Bucket) end. set_bucket(ActorId, NewBucket, State) -> Actors = field(actors, State), NewActors = set_keyed(ActorId, NewBucket, Actors), set(actors, NewActors, State). set_keyed(K, V, [{K, _} | Rest]) -> [{K, V} | Rest]; set_keyed(K, V, [P | Rest]) -> [P | set_keyed(K, V, Rest)]; set_keyed(_, _, []) -> []. has_keyed(_, []) -> false; has_keyed(K, [{K, _} | _]) -> true; has_keyed(K, [_ | Rest]) -> has_keyed(K, Rest). find_keyed(_, []) -> {error, no_actor}; find_keyed(K, [{K, V} | _]) -> {ok, V}; find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest). field(K, [{K, V} | _]) -> V; field(K, [_ | Rest]) -> field(K, Rest); field(_, []) -> nil. set(K, V, []) -> [{K, V}]; set(K, V, [{K, _} | Rest]) -> [{K, V} | Rest]; set(K, V, [P | Rest]) -> [P | set(K, V, Rest)]. %% ── gen_server wrapper ────────────────────────────────────────── %% %% Mirrors the registry / projection gen_server patterns from %% Steps 5b and 7b. Same port quirks: raw Pid return, no `?MODULE` %% macro, spawned processes don't persist across separate %% erlang-eval-ast calls — tests inline start_link with operations. %% %% Step 1b (m2) adds multi-actor gen_server calls: %% add_actor/3, publish_to/2, log_tip_for/1, actors/0, state_for/1, %% with_projections_for/2 — all delegating to the pure-functional %% bucket APIs. Existing single-actor calls (publish/1, log_tip/0, %% with_projections/1) continue to route through bucket 0. start_link(ActorId, KeySpec, ActorStateProplist) -> Pid = gen_server:start_link(nx_kernel, [ActorId, KeySpec, ActorStateProplist]), erlang:register(nx_kernel, Pid), Pid. stop() -> R = gen_server:call(nx_kernel, '$gen_stop'), erlang:unregister(nx_kernel), R. publish(Request) -> gen_server:call(nx_kernel, {publish, Request}). query() -> gen_server:call(nx_kernel, get_state). log_tip() -> gen_server:call(nx_kernel, get_log_tip). with_projections(Names) -> gen_server:call(nx_kernel, {set_projections, Names}). %% Step 1b — multi-actor gen_server calls. add_actor(ActorId, KeySpec, AS) -> gen_server:call(nx_kernel, {add_actor, ActorId, KeySpec, AS}). publish_to(ActorId, Request) -> gen_server:call(nx_kernel, {publish_to, ActorId, Request}). log_tip_for(ActorId) -> gen_server:call(nx_kernel, {log_tip_for, ActorId}). log_state_for(ActorId) -> gen_server:call(nx_kernel, {log_state_for, ActorId}). inbox_tip_for(ActorId) -> gen_server:call(nx_kernel, {inbox_tip_for, ActorId}). inbox_state_for(ActorId) -> gen_server:call(nx_kernel, {inbox_state_for, ActorId}). append_inbox(ActorId, Activity) -> gen_server:call(nx_kernel, {append_inbox, ActorId, Activity}). actors() -> gen_server:call(nx_kernel, get_actors). state_for(ActorId) -> gen_server:call(nx_kernel, {state_for, ActorId}). bucket_for(ActorId) -> gen_server:call(nx_kernel, {bucket_for, ActorId}). with_projections_for(ActorId, Names) -> gen_server:call(nx_kernel, {set_projections_for, ActorId, Names}). bootstrap_actor(ActorId, Profile, KeySpec) -> gen_server:call(nx_kernel, {bootstrap_actor, ActorId, Profile, KeySpec}). %% gen_server callbacks init([ActorId, KeySpec, AS]) -> {ok, new(ActorId, KeySpec, AS)}. handle_call({publish, Request}, _From, State) -> case publish(Request, State) of {ok, Result, NewState} -> {reply, {ok, Result}, NewState}; {error, Reason, SameState} -> {reply, {error, Reason}, SameState} end; handle_call(get_state, _From, State) -> {reply, State, State}; handle_call(get_log_tip, _From, State) -> {reply, log_tip(State), State}; handle_call({set_projections, Names}, _From, State) -> {reply, ok, with_projections(Names, State)}; handle_call({add_actor, ActorId, KeySpec, AS}, _From, State) -> case add_actor(ActorId, KeySpec, AS, State) of {ok, NewState} -> {reply, ok, NewState}; {error, Reason} -> {reply, {error, Reason}, State} end; handle_call({publish_to, ActorId, Request}, _From, State) -> case publish(ActorId, Request, State) of {ok, Result, NewState} -> {reply, {ok, Result}, NewState}; {error, Reason, SameState} -> {reply, {error, Reason}, SameState} end; handle_call({log_tip_for, ActorId}, _From, State) -> {reply, actor_log_tip(ActorId, State), State}; handle_call({log_state_for, ActorId}, _From, State) -> {reply, actor_log_state(ActorId, State), State}; handle_call({inbox_tip_for, ActorId}, _From, State) -> {reply, actor_inbox_tip(ActorId, State), State}; handle_call({inbox_state_for, ActorId}, _From, State) -> {reply, actor_inbox_state(ActorId, State), State}; handle_call({append_inbox, ActorId, Activity}, _From, State) -> case append_to_actor_inbox(ActorId, Activity, State) of {ok, Tip, NewState} -> {reply, {ok, Tip}, NewState}; {error, Reason, Same} -> {reply, {error, Reason}, Same} end; handle_call(get_actors, _From, State) -> {reply, actors(State), State}; handle_call({state_for, ActorId}, _From, State) -> {reply, actor_state(ActorId, State), State}; handle_call({bucket_for, ActorId}, _From, State) -> {reply, actor_bucket(ActorId, State), State}; handle_call({set_projections_for, ActorId, Names}, _From, State) -> case with_actor_projections(ActorId, Names, State) of {ok, NewState} -> {reply, ok, NewState}; {error, Reason} -> {reply, {error, Reason}, State} end; handle_call({bootstrap_actor, ActorId, Profile, KeySpec}, _From, State) -> case bootstrap_actor(ActorId, Profile, KeySpec, State) of {ok, Result, NewState} -> {reply, {ok, Result}, NewState}; {error, Reason, SameState} -> {reply, {error, Reason}, SameState} end. handle_cast(_, S) -> {noreply, S}. handle_info(_, S) -> {noreply, S}.