Files
rose-ash/next/kernel/nx_kernel.erl
giles 6cfb1cb2d3
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Has been cancelled
fed-sx-m2: Step 4d — outbox listing from log + pagination + 8 tests
Per-actor GET /actors/<id>/outbox now reads the bucket's log via
new nx_kernel:log_state_for/1 gen_server export and renders the
paged CID list.

nx_kernel additions:
  log_state_for/1 gen_server call returning {ok, LogState} for
  the named actor (mirrors log_tip_for/1's shape).

http_server additions:
  - with_request_query/2 bakes Req's :query binary into Cfg as
    {request_query, Q} so sub-resource handlers can parse params
    without taking the Req as another arg
  - kernel_actor_log_data/2 -> {Tip, Entries} via
    nx_kernel:log_tip_for + log_state_for + log:entries
  - parse_page/1 reads ?page=N (default 1, non-digits -> 1)
  - page_size/0 returns 5 (test-friendly; production picks 20+)
  - page_slice/2 + drop_take/3 + take/2 for the page extraction
  - entry_cids/1 maps entries to :id CID binaries via envelope
  - actor_outbox_full_response_for/5 renders text / JSON / SX:
      text:  outbox: <id>\ntip: N\npage: P\nitem: <cid>\n...
      json:  {"outbox":"<id>","tip":N,"page":P,"items":[...]}
      sx:    (outbox "<id>" :tip N :page P :items (...))
    Empty page degrades to actor_outbox_with_tip_response_for so
    epochs 50-57 from Step 4c still pass — the prefix is preserved.

8 new cases in next/tests/http_multi_actor.sh (41/41 total):
  - 1 publish -> body contains outbox/tip=1/page=1/item: prefix
  - 3 publishes -> body contains tip=3/page=1/item: prefix
  - page=2 with 3 items -> empty page degrades to tip-only body
  - 6 publishes page=1 -> tip=6/page=1/item: prefix
  - 6 publishes page=2 -> tip=6/page=2/item: prefix
  - JSON body shape with items array (1 entry)
  - SX body shape with :items list (1 entry)
  - bad ?page=bad falls back to page 1

Conformance 761/761. 117/117 across 11 Step-4-adjacent suites
(http_multi_actor, http_route, http_publish, http_post_format,
http_marshal, http_publish_fold, http_listen_bif, http_server_start,
nx_kernel_multi, nx_kernel_server, bootstrap_start, actor_lifecycle).

Substrate gotcha logged: named recursive funs fun F(...) -> F(...)
end aren't supported by the parser ('fun-ref syntax not yet
supported'); binary:matches/2 and lists:foreach/2 aren't registered.
Tests prove behaviour via match_prefix substring checks rather than
counting occurrences.
2026-06-06 15:42:37 +00:00

394 lines
13 KiB
Erlang

-module(nx_kernel).
-behaviour(gen_server).
%% Pure-functional API
-export([new/0, new/3,
add_actor/4, has_actor/2, actors/1, actor_count/1,
publish/2, publish/3,
bootstrap_actor/4,
actor_id/1, log_state/1, log_tip/1,
key_spec/1, actor_state/1, projections/1, next_published/1,
actor_log_state/2, actor_log_tip/2,
actor_key_spec/2, actor_state/2, actor_projections/2,
actor_next_published/2, actor_bucket/2,
with_projections/2, with_actor_projections/3,
next_actor_seq/1]).
%% gen_server API
-export([start_link/3, publish/1, query/0, log_tip/0,
with_projections/1, stop/0,
add_actor/3, publish_to/2, log_tip_for/1, log_state_for/1,
actors/0, state_for/1, bucket_for/1,
with_projections_for/2,
bootstrap_actor/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
%% Kernel orchestrator — the long-lived runtime state held by the
%% running fed-sx instance. Step 1 (m2) refactor: state is now
%% per-actor bucketed so one kernel hosts any number of actors.
%%
%% New state shape (property list):
%% [{actors, [{ActorId, ActorBucket}, ...]},
%% {next_actor_seq, NextN}]
%%
%% ActorBucket = [{key_spec, KS},
%% {actor_state, AS},
%% {log, L},
%% {projections, [Name]},
%% {next_published, NextSeq}]
%%
%% Legacy single-actor accessors (actor_id/1, key_spec/1, etc.)
%% continue to read from the first registered actor — keeps every
%% pre-m2 test passing through bootstrap:start/3.
%%
%% next_actor_seq is a monotonic counter handed out to add_actor for
%% future use (e.g. per-actor URL paths in Step 4). It's not yet
%% read by the rest of the kernel.
%% ── Pure-functional API ──────────────────────────────────────────
new() ->
[{actors, []}, {next_actor_seq, 1}].
new(ActorId, KeySpec, ActorStateProplist) ->
{ok, S} = add_actor(ActorId, KeySpec, ActorStateProplist, new()),
S.
add_actor(ActorId, KeySpec, AS, State) ->
Actors = field(actors, State),
case has_keyed(ActorId, Actors) of
true ->
{error, already_present};
false ->
{ok, L0} = log:open(ActorId, base_stub()),
Bucket = [{key_spec, KeySpec},
{actor_state, AS},
{log, L0},
{projections, []},
{next_published, 1}],
Seq = field(next_actor_seq, State),
State1 = set(actors, Actors ++ [{ActorId, Bucket}], State),
State2 = set(next_actor_seq, Seq + 1, State1),
{ok, State2}
end.
has_actor(ActorId, State) ->
has_keyed(ActorId, field(actors, State)).
actors(State) ->
[Id || {Id, _Bucket} <- field(actors, State)].
actor_count(State) ->
length(field(actors, State)).
next_actor_seq(State) ->
field(next_actor_seq, State).
actor_bucket(ActorId, State) ->
find_keyed(ActorId, field(actors, State)).
%% publish/3 — per-actor publish.
publish(ActorId, Request, State) ->
case actor_bucket(ActorId, State) of
{error, no_actor} ->
{error, no_actor, State};
{ok, Bucket} ->
P = field(next_published, Bucket),
Ctx = [{actor_id, ActorId},
{published, P},
{key_spec, field(key_spec, Bucket)},
{actor_state, field(actor_state, Bucket)},
{log, field(log, Bucket)},
{projections, field(projections, Bucket)}],
case outbox:publish(Request, Ctx) of
{ok, Result, NewLog} ->
B1 = set(log, NewLog, Bucket),
B2 = set(next_published, P + 1, B1),
NewState = set_bucket(ActorId, B2, State),
{ok, Result, NewState};
{error, Reason, _} ->
{error, Reason, State}
end
end.
%% publish/2 — legacy single-actor publish; routes to first actor.
publish(Request, State) ->
case actors(State) of
[] -> {error, no_actor, State};
[First | _] -> publish(First, Request, State)
end.
%% bootstrap_actor/4 — register an actor bucket and immediately
%% publish a Create{Person|Service|Group} as that actor's first
%% activity. Profile carries the object fields plus :public_keys.
%% Returns {ok, Result, NewState} where Result has the published
%% Create's CID, or {error, Reason, State} on validation halt.
bootstrap_actor(ActorId, Profile, KeySpec, State) ->
PublicKeys = case field(public_keys, Profile) of
nil -> [];
KS -> KS
end,
AS = [{public_keys, PublicKeys}],
case add_actor(ActorId, KeySpec, AS, State) of
{ok, State1} ->
ActorType = case field(type, Profile) of
nil -> person;
T -> T
end,
Object = [{type, ActorType}] ++ collect_profile_fields(
[name, preferredUsername, summary, icon, public_keys],
Profile),
Request = [{type, create}, {object, Object}],
publish(ActorId, Request, State1);
{error, Reason} ->
{error, Reason, State}
end.
collect_profile_fields([], _) -> [];
collect_profile_fields([F | Rest], Profile) ->
case field(F, Profile) of
nil -> collect_profile_fields(Rest, Profile);
V -> [{F, V} | collect_profile_fields(Rest, Profile)]
end.
with_actor_projections(ActorId, Names, State) ->
case actor_bucket(ActorId, State) of
{error, no_actor} ->
{error, no_actor};
{ok, Bucket} ->
B1 = set(projections, Names, Bucket),
{ok, set_bucket(ActorId, B1, State)}
end.
with_projections(Names, State) ->
case actors(State) of
[] -> State;
[First | _] ->
{ok, NewState} = with_actor_projections(First, Names, State),
NewState
end.
%% Per-actor accessors
actor_log_state(ActorId, State) ->
case actor_bucket(ActorId, State) of
{ok, B} -> {ok, field(log, B)};
{error, _} -> {error, no_actor}
end.
actor_log_tip(ActorId, State) ->
case actor_log_state(ActorId, State) of
{ok, L} -> log:tip(L);
{error, _} -> nil
end.
actor_key_spec(ActorId, State) ->
case actor_bucket(ActorId, State) of
{ok, B} -> {ok, field(key_spec, B)};
{error, _} -> {error, no_actor}
end.
actor_state(ActorId, State) when is_list(State), is_atom(ActorId) ->
case actor_bucket(ActorId, State) of
{ok, B} -> {ok, field(actor_state, B)};
{error, _} -> {error, no_actor}
end.
actor_projections(ActorId, State) ->
case actor_bucket(ActorId, State) of
{ok, B} -> {ok, field(projections, B)};
{error, _} -> {error, no_actor}
end.
actor_next_published(ActorId, State) ->
case actor_bucket(ActorId, State) of
{ok, B} -> {ok, field(next_published, B)};
{error, _} -> {error, no_actor}
end.
%% Legacy single-actor accessors — read from first bucket. Keeps
%% every M1 test (smoke_app_pure, bootstrap_start, http_publish,
%% nx_kernel_server, http_post_format) passing.
actor_id(State) ->
case field(actors, State) of
[] -> nil;
[{First, _Bucket} | _] -> First
end.
key_spec(State) ->
bucket_field(key_spec, State).
actor_state(State) ->
bucket_field(actor_state, State).
log_state(State) ->
bucket_field(log, State).
log_tip(State) ->
log:tip(log_state(State)).
projections(State) ->
case bucket_field(projections, State) of
nil -> [];
Ps -> Ps
end.
next_published(State) ->
bucket_field(next_published, State).
%% ── Internal helpers ──────────────────────────────────────────────
base_stub() ->
<<98,97,115,101,95,115,116,117,98>>.
bucket_field(Key, State) ->
case field(actors, State) of
[] -> nil;
[{_First, Bucket} | _] -> field(Key, Bucket)
end.
set_bucket(ActorId, NewBucket, State) ->
Actors = field(actors, State),
NewActors = set_keyed(ActorId, NewBucket, Actors),
set(actors, NewActors, State).
set_keyed(K, V, [{K, _} | Rest]) -> [{K, V} | Rest];
set_keyed(K, V, [P | Rest]) -> [P | set_keyed(K, V, Rest)];
set_keyed(_, _, []) -> [].
has_keyed(_, []) -> false;
has_keyed(K, [{K, _} | _]) -> true;
has_keyed(K, [_ | Rest]) -> has_keyed(K, Rest).
find_keyed(_, []) -> {error, no_actor};
find_keyed(K, [{K, V} | _]) -> {ok, V};
find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest).
field(K, [{K, V} | _]) -> V;
field(K, [_ | Rest]) -> field(K, Rest);
field(_, []) -> nil.
set(K, V, []) -> [{K, V}];
set(K, V, [{K, _} | Rest]) -> [{K, V} | Rest];
set(K, V, [P | Rest]) -> [P | set(K, V, Rest)].
%% ── gen_server wrapper ──────────────────────────────────────────
%%
%% Mirrors the registry / projection gen_server patterns from
%% Steps 5b and 7b. Same port quirks: raw Pid return, no `?MODULE`
%% macro, spawned processes don't persist across separate
%% erlang-eval-ast calls — tests inline start_link with operations.
%%
%% Step 1b (m2) adds multi-actor gen_server calls:
%% add_actor/3, publish_to/2, log_tip_for/1, actors/0, state_for/1,
%% with_projections_for/2 — all delegating to the pure-functional
%% bucket APIs. Existing single-actor calls (publish/1, log_tip/0,
%% with_projections/1) continue to route through bucket 0.
start_link(ActorId, KeySpec, ActorStateProplist) ->
Pid = gen_server:start_link(nx_kernel,
[ActorId, KeySpec, ActorStateProplist]),
erlang:register(nx_kernel, Pid),
Pid.
stop() ->
R = gen_server:call(nx_kernel, '$gen_stop'),
erlang:unregister(nx_kernel),
R.
publish(Request) ->
gen_server:call(nx_kernel, {publish, Request}).
query() ->
gen_server:call(nx_kernel, get_state).
log_tip() ->
gen_server:call(nx_kernel, get_log_tip).
with_projections(Names) ->
gen_server:call(nx_kernel, {set_projections, Names}).
%% Step 1b — multi-actor gen_server calls.
add_actor(ActorId, KeySpec, AS) ->
gen_server:call(nx_kernel, {add_actor, ActorId, KeySpec, AS}).
publish_to(ActorId, Request) ->
gen_server:call(nx_kernel, {publish_to, ActorId, Request}).
log_tip_for(ActorId) ->
gen_server:call(nx_kernel, {log_tip_for, ActorId}).
log_state_for(ActorId) ->
gen_server:call(nx_kernel, {log_state_for, ActorId}).
actors() ->
gen_server:call(nx_kernel, get_actors).
state_for(ActorId) ->
gen_server:call(nx_kernel, {state_for, ActorId}).
bucket_for(ActorId) ->
gen_server:call(nx_kernel, {bucket_for, ActorId}).
with_projections_for(ActorId, Names) ->
gen_server:call(nx_kernel, {set_projections_for, ActorId, Names}).
bootstrap_actor(ActorId, Profile, KeySpec) ->
gen_server:call(nx_kernel, {bootstrap_actor, ActorId, Profile, KeySpec}).
%% gen_server callbacks
init([ActorId, KeySpec, AS]) ->
{ok, new(ActorId, KeySpec, AS)}.
handle_call({publish, Request}, _From, State) ->
case publish(Request, State) of
{ok, Result, NewState} ->
{reply, {ok, Result}, NewState};
{error, Reason, SameState} ->
{reply, {error, Reason}, SameState}
end;
handle_call(get_state, _From, State) ->
{reply, State, State};
handle_call(get_log_tip, _From, State) ->
{reply, log_tip(State), State};
handle_call({set_projections, Names}, _From, State) ->
{reply, ok, with_projections(Names, State)};
handle_call({add_actor, ActorId, KeySpec, AS}, _From, State) ->
case add_actor(ActorId, KeySpec, AS, State) of
{ok, NewState} -> {reply, ok, NewState};
{error, Reason} -> {reply, {error, Reason}, State}
end;
handle_call({publish_to, ActorId, Request}, _From, State) ->
case publish(ActorId, Request, State) of
{ok, Result, NewState} -> {reply, {ok, Result}, NewState};
{error, Reason, SameState} -> {reply, {error, Reason}, SameState}
end;
handle_call({log_tip_for, ActorId}, _From, State) ->
{reply, actor_log_tip(ActorId, State), State};
handle_call({log_state_for, ActorId}, _From, State) ->
{reply, actor_log_state(ActorId, State), State};
handle_call(get_actors, _From, State) ->
{reply, actors(State), State};
handle_call({state_for, ActorId}, _From, State) ->
{reply, actor_state(ActorId, State), State};
handle_call({bucket_for, ActorId}, _From, State) ->
{reply, actor_bucket(ActorId, State), State};
handle_call({set_projections_for, ActorId, Names}, _From, State) ->
case with_actor_projections(ActorId, Names, State) of
{ok, NewState} -> {reply, ok, NewState};
{error, Reason} -> {reply, {error, Reason}, State}
end;
handle_call({bootstrap_actor, ActorId, Profile, KeySpec}, _From, State) ->
case bootstrap_actor(ActorId, Profile, KeySpec, State) of
{ok, Result, NewState} -> {reply, {ok, Result}, NewState};
{error, Reason, SameState} -> {reply, {error, Reason}, SameState}
end.
handle_cast(_, S) -> {noreply, S}.
handle_info(_, S) -> {noreply, S}.