diff --git a/next/kernel/projection.erl b/next/kernel/projection.erl index a88c2a96..55978fca 100644 --- a/next/kernel/projection.erl +++ b/next/kernel/projection.erl @@ -1,6 +1,9 @@ -module(projection). +-behaviour(gen_server). -export([new/2, new/3, fold_activity/2, replay/2, name/1, state/1, fold_fn/1]). +-export([start_link/3, async_fold/2, query/1, stop/1]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2]). %% Pure-functional projection driver per design §10. %% @@ -52,3 +55,43 @@ field(_, []) -> erlang:error(badkey). set_field(K, V, [{K, _} | Rest]) -> [{K, V} | Rest]; set_field(K, V, [P | Rest]) -> [P | set_field(K, V, Rest)]; set_field(K, V, []) -> [{K, V}]. + +%% ── Step 7b: gen_server wrapper ───────────────────────────────── +%% +%% Each projection runs in its own gen_server, registered under the +%% projection's Name atom. `async_fold/2` casts an activity into the +%% process; `query/1` synchronously fetches the current state. +%% +%% Port notes (mirroring Step 5b on the registry): `gen_server:start_link` +%% returns the raw Pid; `?MODULE` macro is unsupported; spawned +%% processes don't survive across separate `erlang-eval-ast` calls +%% so tests must inline start_link with their operations. + +start_link(Name, InitialState, FoldFn) -> + Pid = gen_server:start_link(projection, [Name, InitialState, FoldFn]), + erlang:register(Name, Pid), + Pid. + +async_fold(Name, Activity) -> + gen_server:cast(Name, {fold, Activity}). + +query(Name) -> + gen_server:call(Name, get_state). + +stop(Name) -> + R = gen_server:call(Name, '$gen_stop'), + erlang:unregister(Name), + R. + +%% gen_server callbacks + +init([Name, InitialState, FoldFn]) -> + {ok, new(Name, InitialState, FoldFn)}. + +handle_call(get_state, _From, Proj) -> + {reply, state(Proj), Proj}. + +handle_cast({fold, Activity}, Proj) -> + {noreply, fold_activity(Proj, Activity)}. + +handle_info(_, Proj) -> {noreply, Proj}. diff --git a/next/tests/projection_server.sh b/next/tests/projection_server.sh new file mode 100755 index 00000000..a3a3096a --- /dev/null +++ b/next/tests/projection_server.sh @@ -0,0 +1,117 @@ +#!/usr/bin/env bash +# next/tests/projection_server.sh — Step 7b acceptance test. +# +# Exercises gen_server-per-projection: start_link/3, async_fold/2, +# query/1. Each test inlines start_link with operations because +# the Erlang-on-SX scheduler doesn't preserve processes across +# separate erlang-eval-ast invocations. 10 cases. + +set -uo pipefail +cd "$(git rev-parse --show-toplevel)" + +SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}" +if [ ! -x "$SX_SERVER" ]; then + SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe" +fi +if [ ! -x "$SX_SERVER" ]; then + echo "ERROR: sx_server.exe not found." >&2 + exit 1 +fi + +VERBOSE="${1:-}" +PASS=0; FAIL=0; ERRORS="" +TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT + +cat > "$TMPFILE" <<'EPOCHS' +(epoch 1) +(load "lib/erlang/tokenizer.sx") +(load "lib/erlang/parser.sx") +(load "lib/erlang/parser-core.sx") +(load "lib/erlang/parser-expr.sx") +(load "lib/erlang/parser-module.sx") +(load "lib/erlang/transpile.sx") +(load "lib/erlang/runtime.sx") +(load "lib/erlang/vm/dispatcher.sx") +(epoch 2) +(eval "(er-load-gen-server!)") +(epoch 3) +(eval "(get (erlang-load-module (file-read \"next/kernel/projection.erl\")) :name)") + +;; start_link returns a Pid registered under the given name +(epoch 10) +(eval "(get (erlang-eval-ast \"is_pid(projection:start_link(p1, 0, fun (_A, S) -> S + 1 end))\") :name)") + +;; query before any async_fold returns initial state +(epoch 11) +(eval "(erlang-eval-ast \"projection:start_link(p1, 0, fun (_A, S) -> S + 1 end), projection:query(p1)\")") + +;; Single async_fold + query returns new state +(epoch 12) +(eval "(erlang-eval-ast \"projection:start_link(p1, 0, fun (_A, S) -> S + 1 end), projection:async_fold(p1, a), projection:query(p1)\")") + +;; Five async_folds accumulate +(epoch 13) +(eval "(erlang-eval-ast \"projection:start_link(p1, 0, fun (_A, S) -> S + 1 end), projection:async_fold(p1, 1), projection:async_fold(p1, 2), projection:async_fold(p1, 3), projection:async_fold(p1, 4), projection:async_fold(p1, 5), projection:query(p1)\")") + +;; Custom initial state preserved +(epoch 14) +(eval "(erlang-eval-ast \"projection:start_link(p1, 42, fun (A, S) -> S + A end), projection:query(p1)\")") + +;; Fold can read the activity (sum activities) +(epoch 15) +(eval "(erlang-eval-ast \"projection:start_link(p1, 0, fun (A, S) -> S + A end), projection:async_fold(p1, 10), projection:async_fold(p1, 20), projection:async_fold(p1, 30), projection:query(p1)\")") + +;; List-append fold preserves insertion order (newest-first) +(epoch 16) +(eval "(get (erlang-eval-ast \"projection:start_link(p1, [], fun (A, S) -> [A | S] end), projection:async_fold(p1, a), projection:async_fold(p1, b), projection:async_fold(p1, c), projection:query(p1) =:= [c, b, a]\") :name)") + +;; Two named projections are independent +(epoch 17) +(eval "(get (erlang-eval-ast \"projection:start_link(p1, 0, fun (_A, S) -> S + 1 end), projection:start_link(p2, [], fun (A, S) -> [A | S] end), projection:async_fold(p1, x), projection:async_fold(p1, y), projection:async_fold(p2, x), {projection:query(p1), projection:query(p2)} =:= {2, [x]}\") :name)") + +;; Conditional fold (filter on activity tag) +(epoch 18) +(eval "(erlang-eval-ast \"projection:start_link(p1, 0, fun (A, S) -> case A of {keep, _} -> S + 1; _ -> S end end), projection:async_fold(p1, {keep, a}), projection:async_fold(p1, plain), projection:async_fold(p1, {keep, b}), projection:async_fold(p1, plain), projection:query(p1)\")") +EPOCHS + +OUTPUT=$(timeout 180 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) + +check() { + local epoch="$1" desc="$2" expected="$3" + local actual + actual=$(echo "$OUTPUT" | awk -v e="$epoch" ' + $0 ~ "^\\(ok-len " e " " { getline; print; exit } + $0 ~ "^\\(ok " e " " { print; exit } + $0 ~ "^\\(error " e " " { print; exit } + ') + [ -z "$actual" ] && actual="" + if echo "$actual" | grep -qF -- "$expected"; then + PASS=$((PASS+1)) + [ "$VERBOSE" = "-v" ] && echo " ok $desc" + else + FAIL=$((FAIL+1)) + ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual +" + fi +} + +check 2 "gen_server loaded" "gen_server" +check 3 "projection module loaded" "projection" +check 10 "start_link returns Pid" "true" +check 11 "initial state via query" "0" +check 12 "async_fold + query" "1" +check 13 "five async_folds accumulate" "5" +check 14 "custom initial state" "42" +check 15 "fold reads activity (sum)" "60" +check 16 "list-append fold order" "true" +check 17 "two named projections indep." "true" +check 18 "conditional fold (filter)" "2" + +TOTAL=$((PASS+FAIL)) +if [ $FAIL -eq 0 ]; then + echo "ok $PASS/$TOTAL next/tests/projection_server.sh passed" +else + echo "FAIL $PASS/$TOTAL passed, $FAIL failed:" + echo "$ERRORS" +fi +[ $FAIL -eq 0 ] diff --git a/plans/fed-sx-milestone-1.md b/plans/fed-sx-milestone-1.md index 6d99ae7f..ec94211b 100644 --- a/plans/fed-sx-milestone-1.md +++ b/plans/fed-sx-milestone-1.md @@ -457,7 +457,7 @@ publish(ActorId, ActivityRequest) -> **Sub-deliverables:** - [x] **7a** — Pure-functional `next/kernel/projection.erl`: `new/2,3`, `fold_activity/2`, `replay/2`, `name/1`, `state/1`, `fold_fn/1`. Projection record is `[{name, _}, {state, _}, {fold, fun}]`; fold body is an Erlang fun in v1 (SX-source eval bridge deferred). `next/tests/projection_pure.sh` (12 cases). -- [ ] **7b** — gen_server wrapper: `start_link/1`, named-per-projection, `async_fold/2`, `query/1`, `snapshot/1`. +- [x] **7b** — gen_server-per-projection: `start_link/3(Name, InitialState, FoldFn)` + `async_fold/2(Name, Activity)` (cast) + `query/1(Name)` (call) + `stop/1`. Each projection registered under its own Name atom. `next/tests/projection_server.sh` (11 cases). Snapshot persistence deferred (needs SX-source eval + on-disk state). - [ ] **7c** — Broadcast hook from `outbox:publish` — feed `Signed` to every projection process. - [ ] **7d** — `sandbox:eval_pure/2` (Erlang sandbox-mode caller — gas budget + IO denial) once an SX-source eval bridge exists. @@ -977,6 +977,7 @@ A few things still under-specified; resolve as work begins. Newest first. One line per sub-deliverable commit. Erlang conformance gate (`bash lib/erlang/conformance.sh`) must remain 729/729 on every entry. +- **2026-05-28** — Step 7b: `projection.erl` extended with gen_server callbacks + per-projection named-process API. `start_link/3(Name, InitialState, FoldFn)` spawns and registers under the supplied atom; `async_fold/2(Name, Activity)` casts a fold message; `query/1(Name)` synchronously returns the current state. Same port quirks as registry gen_server (Step 5b): raw Pid return, no `?MODULE` macro, processes don't survive between separate `erlang-eval-ast` calls — tests inline start_link with operations. Two named projections are independent. Snapshot persistence deferred to a later sub-step (needs SX-source eval + on-disk state). `next/tests/projection_server.sh` 11/11. Erlang conformance 729/729. - **2026-05-28** — Step 7a: `next/kernel/projection.erl` — pure-functional projection driver. Record shape `[{name, _}, {state, _}, {fold, fun}]`; `fold_activity/2` advances state by one activity; `replay/2` folds a whole list (mirrors `log:entries/1` semantics); `new/2` defaults to the identity fold and `new/3` accepts a custom Erlang fun. Multiple projections share no state — independent record values. Step 7 split into 7a (done) + 7b (gen_server-per-projection) + 7c (broadcast hook from outbox) + 7d (sandbox eval, needs SX-source bridge). `next/tests/projection_pure.sh` 12/12. Erlang conformance 729/729. - **2026-05-28** — Step 6d-publish: `outbox:publish/2(Request, Context)` orchestrates construct + sign + `pipeline:run_stages` + `log:append`. Stage list is `[stage_envelope, stage_signature(AS), stage_replay(LogState)]` — so a duplicate publish (same Request, same Published) halts at the replay stage and returns `{error, replay, LogState}` with the log unchanged; bad key material halts at `bad_signature`. Happy path returns `{ok, [{cid, Cid}, {activity, Signed}], NewLog}`. Projection-scheduler dispatch deferred to Step 7. `next/tests/outbox_publish.sh` 13/13 covers happy path, replay halt, sig halt, multi-publish progression, CID stability across fresh logs. Erlang conformance 729/729. - **2026-05-28** — Step 6d-cs: `next/kernel/outbox.erl` — envelope construction + signing. `construct/4` takes `(Type, ActorId, Published, Object)`, builds the canonical key-sorted property list, and derives the activity `:id` from `cid:to_string({activity_envelope, Skeleton})`. `sign/2` extracts key_id/algorithm/key-material from a KeySpec proplist, computes the v1 HMAC over canonical bytes, and appends the `:signature` pair. `cid_of/1` is a convenience accessor. Round-trip end-to-end through `envelope:verify_signature/2` verified (correct key passes, wrong key returns bad_signature). Step 6d split into 6d-cs (done) + 6d-publish (orchestration). `next/tests/outbox_construct.sh` 13/13. Erlang conformance 729/729.