From 1ea47681b2ccb289c035ba9442c7dcdd36b337b5 Mon Sep 17 00:00:00 2001 From: giles Date: Thu, 28 May 2026 06:57:36 +0000 Subject: [PATCH] =?UTF-8?q?fed-sx-m1:=20Step=207c=20=E2=80=94=20outbox:pub?= =?UTF-8?q?lish=20broadcasts=20to=20projection=20processes=20+=2014=20test?= =?UTF-8?q?s?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- next/kernel/outbox.erl | 11 +++ next/tests/outbox_broadcast.sh | 129 +++++++++++++++++++++++++++++++++ plans/fed-sx-milestone-1.md | 3 +- 3 files changed, 142 insertions(+), 1 deletion(-) create mode 100755 next/tests/outbox_broadcast.sh diff --git a/next/kernel/outbox.erl b/next/kernel/outbox.erl index 0e5c307f..5c3bd52e 100644 --- a/next/kernel/outbox.erl +++ b/next/kernel/outbox.erl @@ -91,12 +91,23 @@ publish(Request, Context) -> case pipeline:run_stages(Signed, Stages) of ok -> {ok, NewLog, _Seq} = log:append(LogState, Signed), + broadcast(Signed, envelope_field(projections, Context)), Result = [{cid, cid_of(Signed)}, {activity, Signed}], {ok, Result, NewLog}; {error, Reason} -> {error, Reason, LogState} end. +%% broadcast/2 — fire-and-forget cast to each named projection. +%% Missing/nil/empty list is a no-op; the publish API does not +%% require projections to exist. Activity is the post-sign Signed +%% envelope (same value that landed in the log). +broadcast(_Activity, nil) -> ok; +broadcast(_Activity, []) -> ok; +broadcast(Activity, [Name | Rest]) -> + projection:async_fold(Name, Activity), + broadcast(Activity, Rest). + envelope_field(K, PL) -> case envelope:get_field(K, PL) of {ok, V} -> V; diff --git a/next/tests/outbox_broadcast.sh b/next/tests/outbox_broadcast.sh new file mode 100755 index 00000000..3e7e1c3a --- /dev/null +++ b/next/tests/outbox_broadcast.sh @@ -0,0 +1,129 @@ +#!/usr/bin/env bash +# next/tests/outbox_broadcast.sh — Step 7c acceptance test. +# +# Verifies outbox:publish/2 fans out to projection processes +# listed in Context's :projections entry. Each test inlines +# start_link with publish + query because spawned processes +# don't survive across erlang-eval-ast invocations. 9 cases. + +set -uo pipefail +cd "$(git rev-parse --show-toplevel)" + +SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}" +if [ ! -x "$SX_SERVER" ]; then + SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe" +fi +if [ ! -x "$SX_SERVER" ]; then + echo "ERROR: sx_server.exe not found." >&2 + exit 1 +fi + +VERBOSE="${1:-}" +PASS=0; FAIL=0; ERRORS="" +TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT + +# Shared prelude: KM/KS/AS/L0 + projections registered + Ctx with +# the named projections wired through. Each test threads from +# this state. +PRELUDE='KM = <<1,2,3,4>>, KS = [{key_id,k1},{algorithm,ed25519},{value,KM}], AS = [{public_keys,[[{id,k1},{created,50},{value,KM}]]}], {ok, L0} = log:open(alice, base), projection:start_link(p_count, 0, fun (_A, S) -> S + 1 end), projection:start_link(p_collect, [], fun (A, S) -> [A | S] end),' + +cat > "$TMPFILE" < count = 1 +(epoch 10) +(eval "(erlang-eval-ast \"${PRELUDE} Ctx = [{actor_id,alice},{published,100},{key_spec,KS},{actor_state,AS},{log,L0},{projections,[p_count]}], outbox:publish([{type,create},{object,nil}], Ctx), projection:query(p_count)\")") + +;; Single publish fans out to TWO projections -> both advance +(epoch 11) +(eval "(get (erlang-eval-ast \"${PRELUDE} Ctx = [{actor_id,alice},{published,100},{key_spec,KS},{actor_state,AS},{log,L0},{projections,[p_count, p_collect]}], outbox:publish([{type,create},{object,nil}], Ctx), C = projection:query(p_count), L = projection:query(p_collect), {C, length(L)} =:= {1, 1}\") :name)") + +;; Empty :projections list -> no fan-out, projections stay at initial state +(epoch 12) +(eval "(erlang-eval-ast \"${PRELUDE} Ctx = [{actor_id,alice},{published,100},{key_spec,KS},{actor_state,AS},{log,L0},{projections,[]}], outbox:publish([{type,create},{object,nil}], Ctx), projection:query(p_count)\")") + +;; Missing :projections field -> no fan-out +(epoch 13) +(eval "(erlang-eval-ast \"${PRELUDE} Ctx = [{actor_id,alice},{published,100},{key_spec,KS},{actor_state,AS},{log,L0}], outbox:publish([{type,create},{object,nil}], Ctx), projection:query(p_count)\")") + +;; Three sequential publishes -> projection count = 3 (state persisted across casts) +(epoch 14) +(eval "(erlang-eval-ast \"${PRELUDE} Ctx0 = [{actor_id,alice},{published,100},{key_spec,KS},{actor_state,AS},{log,L0},{projections,[p_count]}], {ok, _, L1} = outbox:publish([{type,create},{object,nil}], Ctx0), Ctx1 = [{actor_id,alice},{published,200},{key_spec,KS},{actor_state,AS},{log,L1},{projections,[p_count]}], {ok, _, L2} = outbox:publish([{type,create},{object,nil}], Ctx1), Ctx2 = [{actor_id,alice},{published,300},{key_spec,KS},{actor_state,AS},{log,L2},{projections,[p_count]}], outbox:publish([{type,create},{object,nil}], Ctx2), projection:query(p_count)\")") + +;; Replay-halted publish does NOT broadcast +(epoch 15) +(eval "(get (erlang-eval-ast \"${PRELUDE} Ctx = [{actor_id,alice},{published,100},{key_spec,KS},{actor_state,AS},{log,L0},{projections,[p_count]}], Req = [{type,create},{object,nil}], {ok, _, L1} = outbox:publish(Req, Ctx), Ctx2 = [{actor_id,alice},{published,100},{key_spec,KS},{actor_state,AS},{log,L1},{projections,[p_count]}], outbox:publish(Req, Ctx2), projection:query(p_count) =:= 1\") :name)") + +;; Sig-failed publish does NOT broadcast +(epoch 16) +(eval "(get (erlang-eval-ast \"${PRELUDE} BadKS = [{key_id,k1},{algorithm,ed25519},{value,<<9,9,9,9>>}], Ctx = [{actor_id,alice},{published,100},{key_spec,BadKS},{actor_state,AS},{log,L0},{projections,[p_count]}], outbox:publish([{type,create},{object,nil}], Ctx), projection:query(p_count) =:= 0\") :name)") + +;; Projections receive the Signed activity (collect-fold sees envelope structure) +(epoch 17) +(eval "(get (erlang-eval-ast \"${PRELUDE} Ctx = [{actor_id,alice},{published,100},{key_spec,KS},{actor_state,AS},{log,L0},{projections,[p_collect]}], {ok, Result, _} = outbox:publish([{type,create},{object,nil}], Ctx), {ok, ExpectedAct} = envelope:get_field(activity, Result), [Got] = projection:query(p_collect), Got =:= ExpectedAct\") :name)") +EPOCHS + +OUTPUT=$(timeout 240 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) + +check() { + local epoch="$1" desc="$2" expected="$3" + local actual + actual=$(echo "$OUTPUT" | awk -v e="$epoch" ' + $0 ~ "^\\(ok-len " e " " { getline; print; exit } + $0 ~ "^\\(ok " e " " { print; exit } + $0 ~ "^\\(error " e " " { print; exit } + ') + [ -z "$actual" ] && actual="" + if echo "$actual" | grep -qF -- "$expected"; then + PASS=$((PASS+1)) + [ "$VERBOSE" = "-v" ] && echo " ok $desc" + else + FAIL=$((FAIL+1)) + ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual +" + fi +} + +check 2 "gen_server loaded" "gen_server" +check 3 "envelope module loaded" "envelope" +check 4 "log module loaded" "log" +check 5 "pipeline module loaded" "pipeline" +check 6 "projection module loaded" "projection" +check 7 "outbox module loaded" "outbox" +check 10 "single publish -> count = 1" "1" +check 11 "fan-out to two projections" "true" +check 12 "empty :projections -> no fanout" "0" +check 13 "missing :projections -> no fan" "0" +check 14 "three publishes -> count = 3" "3" +check 15 "replay halt skips broadcast" "true" +check 16 "sig failure skips broadcast" "true" +check 17 "projection sees Signed activity" "true" + +TOTAL=$((PASS+FAIL)) +if [ $FAIL -eq 0 ]; then + echo "ok $PASS/$TOTAL next/tests/outbox_broadcast.sh passed" +else + echo "FAIL $PASS/$TOTAL passed, $FAIL failed:" + echo "$ERRORS" +fi +[ $FAIL -eq 0 ] diff --git a/plans/fed-sx-milestone-1.md b/plans/fed-sx-milestone-1.md index ec94211b..93277443 100644 --- a/plans/fed-sx-milestone-1.md +++ b/plans/fed-sx-milestone-1.md @@ -458,7 +458,7 @@ publish(ActorId, ActivityRequest) -> **Sub-deliverables:** - [x] **7a** — Pure-functional `next/kernel/projection.erl`: `new/2,3`, `fold_activity/2`, `replay/2`, `name/1`, `state/1`, `fold_fn/1`. Projection record is `[{name, _}, {state, _}, {fold, fun}]`; fold body is an Erlang fun in v1 (SX-source eval bridge deferred). `next/tests/projection_pure.sh` (12 cases). - [x] **7b** — gen_server-per-projection: `start_link/3(Name, InitialState, FoldFn)` + `async_fold/2(Name, Activity)` (cast) + `query/1(Name)` (call) + `stop/1`. Each projection registered under its own Name atom. `next/tests/projection_server.sh` (11 cases). Snapshot persistence deferred (needs SX-source eval + on-disk state). -- [ ] **7c** — Broadcast hook from `outbox:publish` — feed `Signed` to every projection process. +- [x] **7c** — `outbox:publish` broadcast hook: after `log:append`, fans out the signed activity to every projection listed under `Context`'s `:projections` entry via `projection:async_fold`. Stage halts (replay, sig failure) skip broadcast. `next/tests/outbox_broadcast.sh` (14 cases). - [ ] **7d** — `sandbox:eval_pure/2` (Erlang sandbox-mode caller — gas budget + IO denial) once an SX-source eval bridge exists. **Deliverables:** @@ -977,6 +977,7 @@ A few things still under-specified; resolve as work begins. Newest first. One line per sub-deliverable commit. Erlang conformance gate (`bash lib/erlang/conformance.sh`) must remain 729/729 on every entry. +- **2026-05-28** — Step 7c: `outbox:publish` now broadcasts the signed activity to every projection process named in `Context`'s `:projections` entry — fired immediately after `log:append`, via `projection:async_fold`. Missing/nil/empty list is a no-op (preserves the Step 6d-publish contract). Stage halts (replay duplicate, sig failure) suppress the broadcast — projection state stays at zero while the activity is rejected. `next/tests/outbox_broadcast.sh` 14/14 covers single + multi projection fan-out, three-publish accumulation, replay-skip, sig-skip, and the projection receiving the post-sign Signed envelope (not the pre-sign skeleton). Erlang conformance 729/729. - **2026-05-28** — Step 7b: `projection.erl` extended with gen_server callbacks + per-projection named-process API. `start_link/3(Name, InitialState, FoldFn)` spawns and registers under the supplied atom; `async_fold/2(Name, Activity)` casts a fold message; `query/1(Name)` synchronously returns the current state. Same port quirks as registry gen_server (Step 5b): raw Pid return, no `?MODULE` macro, processes don't survive between separate `erlang-eval-ast` calls — tests inline start_link with operations. Two named projections are independent. Snapshot persistence deferred to a later sub-step (needs SX-source eval + on-disk state). `next/tests/projection_server.sh` 11/11. Erlang conformance 729/729. - **2026-05-28** — Step 7a: `next/kernel/projection.erl` — pure-functional projection driver. Record shape `[{name, _}, {state, _}, {fold, fun}]`; `fold_activity/2` advances state by one activity; `replay/2` folds a whole list (mirrors `log:entries/1` semantics); `new/2` defaults to the identity fold and `new/3` accepts a custom Erlang fun. Multiple projections share no state — independent record values. Step 7 split into 7a (done) + 7b (gen_server-per-projection) + 7c (broadcast hook from outbox) + 7d (sandbox eval, needs SX-source bridge). `next/tests/projection_pure.sh` 12/12. Erlang conformance 729/729. - **2026-05-28** — Step 6d-publish: `outbox:publish/2(Request, Context)` orchestrates construct + sign + `pipeline:run_stages` + `log:append`. Stage list is `[stage_envelope, stage_signature(AS), stage_replay(LogState)]` — so a duplicate publish (same Request, same Published) halts at the replay stage and returns `{error, replay, LogState}` with the log unchanged; bad key material halts at `bad_signature`. Happy path returns `{ok, [{cid, Cid}, {activity, Signed}], NewLog}`. Projection-scheduler dispatch deferred to Step 7. `next/tests/outbox_publish.sh` 13/13 covers happy path, replay halt, sig halt, multi-publish progression, CID stability across fresh logs. Erlang conformance 729/729.