fed-sx-m1: Step 7c — outbox:publish broadcasts to projection processes + 14 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 28s

This commit is contained in:
2026-05-28 06:57:36 +00:00
parent c91683b885
commit 1ea47681b2
3 changed files with 142 additions and 1 deletions

View File

@@ -91,12 +91,23 @@ publish(Request, Context) ->
case pipeline:run_stages(Signed, Stages) of
ok ->
{ok, NewLog, _Seq} = log:append(LogState, Signed),
broadcast(Signed, envelope_field(projections, Context)),
Result = [{cid, cid_of(Signed)}, {activity, Signed}],
{ok, Result, NewLog};
{error, Reason} ->
{error, Reason, LogState}
end.
%% broadcast/2 — fire-and-forget cast to each named projection.
%% Missing/nil/empty list is a no-op; the publish API does not
%% require projections to exist. Activity is the post-sign Signed
%% envelope (same value that landed in the log).
broadcast(_Activity, nil) -> ok;
broadcast(_Activity, []) -> ok;
broadcast(Activity, [Name | Rest]) ->
projection:async_fold(Name, Activity),
broadcast(Activity, Rest).
envelope_field(K, PL) ->
case envelope:get_field(K, PL) of
{ok, V} -> V;

129
next/tests/outbox_broadcast.sh Executable file
View File

@@ -0,0 +1,129 @@
#!/usr/bin/env bash
# next/tests/outbox_broadcast.sh — Step 7c acceptance test.
#
# Verifies outbox:publish/2 fans out to projection processes
# listed in Context's :projections entry. Each test inlines
# start_link with publish + query because spawned processes
# don't survive across erlang-eval-ast invocations. 9 cases.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
# Shared prelude: KM/KS/AS/L0 + projections registered + Ctx with
# the named projections wired through. Each test threads from
# this state.
PRELUDE='KM = <<1,2,3,4>>, KS = [{key_id,k1},{algorithm,ed25519},{value,KM}], AS = [{public_keys,[[{id,k1},{created,50},{value,KM}]]}], {ok, L0} = log:open(alice, base), projection:start_link(p_count, 0, fun (_A, S) -> S + 1 end), projection:start_link(p_collect, [], fun (A, S) -> [A | S] end),'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(er-load-gen-server!)")
(epoch 3)
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
(epoch 4)
(eval "(get (erlang-load-module (file-read \"next/kernel/log.erl\")) :name)")
(epoch 5)
(eval "(get (erlang-load-module (file-read \"next/kernel/pipeline.erl\")) :name)")
(epoch 6)
(eval "(get (erlang-load-module (file-read \"next/kernel/projection.erl\")) :name)")
(epoch 7)
(eval "(get (erlang-load-module (file-read \"next/kernel/outbox.erl\")) :name)")
;; Single publish fans out to one projection -> count = 1
(epoch 10)
(eval "(erlang-eval-ast \"${PRELUDE} Ctx = [{actor_id,alice},{published,100},{key_spec,KS},{actor_state,AS},{log,L0},{projections,[p_count]}], outbox:publish([{type,create},{object,nil}], Ctx), projection:query(p_count)\")")
;; Single publish fans out to TWO projections -> both advance
(epoch 11)
(eval "(get (erlang-eval-ast \"${PRELUDE} Ctx = [{actor_id,alice},{published,100},{key_spec,KS},{actor_state,AS},{log,L0},{projections,[p_count, p_collect]}], outbox:publish([{type,create},{object,nil}], Ctx), C = projection:query(p_count), L = projection:query(p_collect), {C, length(L)} =:= {1, 1}\") :name)")
;; Empty :projections list -> no fan-out, projections stay at initial state
(epoch 12)
(eval "(erlang-eval-ast \"${PRELUDE} Ctx = [{actor_id,alice},{published,100},{key_spec,KS},{actor_state,AS},{log,L0},{projections,[]}], outbox:publish([{type,create},{object,nil}], Ctx), projection:query(p_count)\")")
;; Missing :projections field -> no fan-out
(epoch 13)
(eval "(erlang-eval-ast \"${PRELUDE} Ctx = [{actor_id,alice},{published,100},{key_spec,KS},{actor_state,AS},{log,L0}], outbox:publish([{type,create},{object,nil}], Ctx), projection:query(p_count)\")")
;; Three sequential publishes -> projection count = 3 (state persisted across casts)
(epoch 14)
(eval "(erlang-eval-ast \"${PRELUDE} Ctx0 = [{actor_id,alice},{published,100},{key_spec,KS},{actor_state,AS},{log,L0},{projections,[p_count]}], {ok, _, L1} = outbox:publish([{type,create},{object,nil}], Ctx0), Ctx1 = [{actor_id,alice},{published,200},{key_spec,KS},{actor_state,AS},{log,L1},{projections,[p_count]}], {ok, _, L2} = outbox:publish([{type,create},{object,nil}], Ctx1), Ctx2 = [{actor_id,alice},{published,300},{key_spec,KS},{actor_state,AS},{log,L2},{projections,[p_count]}], outbox:publish([{type,create},{object,nil}], Ctx2), projection:query(p_count)\")")
;; Replay-halted publish does NOT broadcast
(epoch 15)
(eval "(get (erlang-eval-ast \"${PRELUDE} Ctx = [{actor_id,alice},{published,100},{key_spec,KS},{actor_state,AS},{log,L0},{projections,[p_count]}], Req = [{type,create},{object,nil}], {ok, _, L1} = outbox:publish(Req, Ctx), Ctx2 = [{actor_id,alice},{published,100},{key_spec,KS},{actor_state,AS},{log,L1},{projections,[p_count]}], outbox:publish(Req, Ctx2), projection:query(p_count) =:= 1\") :name)")
;; Sig-failed publish does NOT broadcast
(epoch 16)
(eval "(get (erlang-eval-ast \"${PRELUDE} BadKS = [{key_id,k1},{algorithm,ed25519},{value,<<9,9,9,9>>}], Ctx = [{actor_id,alice},{published,100},{key_spec,BadKS},{actor_state,AS},{log,L0},{projections,[p_count]}], outbox:publish([{type,create},{object,nil}], Ctx), projection:query(p_count) =:= 0\") :name)")
;; Projections receive the Signed activity (collect-fold sees envelope structure)
(epoch 17)
(eval "(get (erlang-eval-ast \"${PRELUDE} Ctx = [{actor_id,alice},{published,100},{key_spec,KS},{actor_state,AS},{log,L0},{projections,[p_collect]}], {ok, Result, _} = outbox:publish([{type,create},{object,nil}], Ctx), {ok, ExpectedAct} = envelope:get_field(activity, Result), [Got] = projection:query(p_collect), Got =:= ExpectedAct\") :name)")
EPOCHS
OUTPUT=$(timeout 240 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 2 "gen_server loaded" "gen_server"
check 3 "envelope module loaded" "envelope"
check 4 "log module loaded" "log"
check 5 "pipeline module loaded" "pipeline"
check 6 "projection module loaded" "projection"
check 7 "outbox module loaded" "outbox"
check 10 "single publish -> count = 1" "1"
check 11 "fan-out to two projections" "true"
check 12 "empty :projections -> no fanout" "0"
check 13 "missing :projections -> no fan" "0"
check 14 "three publishes -> count = 3" "3"
check 15 "replay halt skips broadcast" "true"
check 16 "sig failure skips broadcast" "true"
check 17 "projection sees Signed activity" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/outbox_broadcast.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

View File

@@ -458,7 +458,7 @@ publish(ActorId, ActivityRequest) ->
**Sub-deliverables:**
- [x] **7a** — Pure-functional `next/kernel/projection.erl`: `new/2,3`, `fold_activity/2`, `replay/2`, `name/1`, `state/1`, `fold_fn/1`. Projection record is `[{name, _}, {state, _}, {fold, fun}]`; fold body is an Erlang fun in v1 (SX-source eval bridge deferred). `next/tests/projection_pure.sh` (12 cases).
- [x] **7b** — gen_server-per-projection: `start_link/3(Name, InitialState, FoldFn)` + `async_fold/2(Name, Activity)` (cast) + `query/1(Name)` (call) + `stop/1`. Each projection registered under its own Name atom. `next/tests/projection_server.sh` (11 cases). Snapshot persistence deferred (needs SX-source eval + on-disk state).
- [ ] **7c**Broadcast hook from `outbox:publish` — feed `Signed` to every projection process.
- [x] **7c**`outbox:publish` broadcast hook: after `log:append`, fans out the signed activity to every projection listed under `Context`'s `:projections` entry via `projection:async_fold`. Stage halts (replay, sig failure) skip broadcast. `next/tests/outbox_broadcast.sh` (14 cases).
- [ ] **7d**`sandbox:eval_pure/2` (Erlang sandbox-mode caller — gas budget + IO denial) once an SX-source eval bridge exists.
**Deliverables:**
@@ -977,6 +977,7 @@ A few things still under-specified; resolve as work begins.
Newest first. One line per sub-deliverable commit. Erlang conformance gate
(`bash lib/erlang/conformance.sh`) must remain 729/729 on every entry.
- **2026-05-28** — Step 7c: `outbox:publish` now broadcasts the signed activity to every projection process named in `Context`'s `:projections` entry — fired immediately after `log:append`, via `projection:async_fold`. Missing/nil/empty list is a no-op (preserves the Step 6d-publish contract). Stage halts (replay duplicate, sig failure) suppress the broadcast — projection state stays at zero while the activity is rejected. `next/tests/outbox_broadcast.sh` 14/14 covers single + multi projection fan-out, three-publish accumulation, replay-skip, sig-skip, and the projection receiving the post-sign Signed envelope (not the pre-sign skeleton). Erlang conformance 729/729.
- **2026-05-28** — Step 7b: `projection.erl` extended with gen_server callbacks + per-projection named-process API. `start_link/3(Name, InitialState, FoldFn)` spawns and registers under the supplied atom; `async_fold/2(Name, Activity)` casts a fold message; `query/1(Name)` synchronously returns the current state. Same port quirks as registry gen_server (Step 5b): raw Pid return, no `?MODULE` macro, processes don't survive between separate `erlang-eval-ast` calls — tests inline start_link with operations. Two named projections are independent. Snapshot persistence deferred to a later sub-step (needs SX-source eval + on-disk state). `next/tests/projection_server.sh` 11/11. Erlang conformance 729/729.
- **2026-05-28** — Step 7a: `next/kernel/projection.erl` — pure-functional projection driver. Record shape `[{name, _}, {state, _}, {fold, fun}]`; `fold_activity/2` advances state by one activity; `replay/2` folds a whole list (mirrors `log:entries/1` semantics); `new/2` defaults to the identity fold and `new/3` accepts a custom Erlang fun. Multiple projections share no state — independent record values. Step 7 split into 7a (done) + 7b (gen_server-per-projection) + 7c (broadcast hook from outbox) + 7d (sandbox eval, needs SX-source bridge). `next/tests/projection_pure.sh` 12/12. Erlang conformance 729/729.
- **2026-05-28** — Step 6d-publish: `outbox:publish/2(Request, Context)` orchestrates construct + sign + `pipeline:run_stages` + `log:append`. Stage list is `[stage_envelope, stage_signature(AS), stage_replay(LogState)]` — so a duplicate publish (same Request, same Published) halts at the replay stage and returns `{error, replay, LogState}` with the log unchanged; bad key material halts at `bad_signature`. Happy path returns `{ok, [{cid, Cid}, {activity, Signed}], NewLog}`. Projection-scheduler dispatch deferred to Step 7. `next/tests/outbox_publish.sh` 13/13 covers happy path, replay halt, sig halt, multi-publish progression, CID stability across fresh logs. Erlang conformance 729/729.