diff --git a/next/kernel/outbox.erl b/next/kernel/outbox.erl index ccc4bd87..0e5c307f 100644 --- a/next/kernel/outbox.erl +++ b/next/kernel/outbox.erl @@ -1,5 +1,5 @@ -module(outbox). --export([construct/4, sign/2, cid_of/1]). +-export([construct/4, sign/2, cid_of/1, publish/2]). %% Outbox envelope construction + signing per design §3.1. %% @@ -53,3 +53,53 @@ sign(Envelope, KeySpec) -> cid_of(Envelope) -> {ok, Id} = envelope:get_field(id, Envelope), Id. + +%% publish/2 — the outbound activity pipeline orchestrator. +%% +%% Request shape: [{type, T}, {object, O}] +%% Context shape: [{actor_id, A}, {published, P}, {key_spec, KS}, +%% {actor_state, AS}, {log, L}] +%% +%% Returns: +%% {ok, [{cid, Cid}, {activity, Signed}], NewLog} — happy path +%% {error, Reason, LogState} — validation halted +%% +%% Stages run in order: envelope shape, signature, replay. The +%% replay check uses the log state pre-append, so if the caller +%% publishes the same Request twice with the same Published +%% timestamp the second call halts with {error, replay, _}. +%% +%% Projection-scheduler dispatch (the async fold the design calls +%% for) is deferred to Step 7 — once the projection gen_server +%% exists, this function will broadcast `Signed` to it. + +publish(Request, Context) -> + Type = envelope_field(type, Request), + Object = envelope_field(object, Request), + ActorId = envelope_field(actor_id, Context), + Published = envelope_field(published, Context), + KeySpec = envelope_field(key_spec, Context), + ActorState = envelope_field(actor_state, Context), + LogState = envelope_field(log, Context), + Unsigned = construct(Type, ActorId, Published, Object), + Signed = sign(Unsigned, KeySpec), + Stages = [ + fun (A) -> pipeline:stage_envelope(A) end, + pipeline:stage_signature(ActorState), + pipeline:stage_replay(LogState) + ], + case pipeline:run_stages(Signed, Stages) of + ok -> + {ok, NewLog, _Seq} = log:append(LogState, Signed), + Result = [{cid, cid_of(Signed)}, {activity, Signed}], + {ok, Result, NewLog}; + {error, Reason} -> + {error, Reason, LogState} + end. + +envelope_field(K, PL) -> + case envelope:get_field(K, PL) of + {ok, V} -> V; + not_found -> nil + end. + diff --git a/next/tests/outbox_publish.sh b/next/tests/outbox_publish.sh new file mode 100755 index 00000000..e06675ee --- /dev/null +++ b/next/tests/outbox_publish.sh @@ -0,0 +1,129 @@ +#!/usr/bin/env bash +# next/tests/outbox_publish.sh — Step 6d-publish acceptance test. +# +# Exercises outbox:publish/2 across the happy path, sig failure, +# replay halt, and envelope-shape failure. Returns shape: +# {ok, [{cid, _}, {activity, _}], NewLogState} +# {error, Reason, LogState} +# 10 cases. + +set -uo pipefail +cd "$(git rev-parse --show-toplevel)" + +SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}" +if [ ! -x "$SX_SERVER" ]; then + SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe" +fi +if [ ! -x "$SX_SERVER" ]; then + echo "ERROR: sx_server.exe not found." >&2 + exit 1 +fi + +VERBOSE="${1:-}" +PASS=0; FAIL=0; ERRORS="" +TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT + +# Shared prelude builds a fresh actor state, key spec, empty log, +# and a context proplist. Each test inlines it. +PRELUDE='KM = <<1,2,3,4>>, KS = [{key_id,k1},{algorithm,ed25519},{value,KM}], AS = [{public_keys,[[{id,k1},{created,50},{value,KM}]]}], {ok, L0} = log:open(alice, base), Ctx = [{actor_id,alice},{published,100},{key_spec,KS},{actor_state,AS},{log,L0}], Req = [{type,create},{object,nil}],' + +cat > "$TMPFILE" < log:tip(NewLog) =:= 1; _ -> false end\") :name)") + +;; Result has :cid pointing at the activity's CID +(epoch 11) +(eval "(get (erlang-eval-ast \"${PRELUDE} {ok, Result, _} = outbox:publish(Req, Ctx), {ok, Cid} = envelope:get_field(cid, Result), {ok, Act} = envelope:get_field(activity, Result), outbox:cid_of(Act) =:= Cid\") :name)") + +;; The signed activity is in the log +(epoch 12) +(eval "(get (erlang-eval-ast \"${PRELUDE} {ok, Result, NewLog} = outbox:publish(Req, Ctx), {ok, Act} = envelope:get_field(activity, Result), log:entries(NewLog) =:= [Act]\") :name)") + +;; Replay: second publish of identical Request halts the pipeline +(epoch 13) +(eval "(get (erlang-eval-ast \"${PRELUDE} {ok, _, L1} = outbox:publish(Req, Ctx), Ctx2 = [{actor_id,alice},{published,100},{key_spec,KS},{actor_state,AS},{log,L1}], case outbox:publish(Req, Ctx2) of {error, replay, _} -> ok; _ -> bad end\") :name)") + +;; Replay returns the pre-append LogState unchanged +(epoch 14) +(eval "(get (erlang-eval-ast \"${PRELUDE} {ok, _, L1} = outbox:publish(Req, Ctx), Ctx2 = [{actor_id,alice},{published,100},{key_spec,KS},{actor_state,AS},{log,L1}], {error, _, L2} = outbox:publish(Req, Ctx2), log:tip(L2) =:= 1\") :name)") + +;; Bad key material (sig fails) -> {error, bad_signature, LogState} +(epoch 15) +(eval "(get (erlang-eval-ast \"${PRELUDE} OtherKM = <<9,9,9,9>>, BadKS = [{key_id,k1},{algorithm,ed25519},{value,OtherKM}], BadCtx = [{actor_id,alice},{published,100},{key_spec,BadKS},{actor_state,AS},{log,L0}], case outbox:publish(Req, BadCtx) of {error, bad_signature, _} -> ok; _ -> bad end\") :name)") + +;; Distinct timestamps -> two activities in log +(epoch 16) +(eval "(get (erlang-eval-ast \"${PRELUDE} {ok, _, L1} = outbox:publish(Req, Ctx), Ctx2 = [{actor_id,alice},{published,200},{key_spec,KS},{actor_state,AS},{log,L1}], {ok, _, L2} = outbox:publish(Req, Ctx2), log:tip(L2) =:= 2\") :name)") + +;; Distinct types -> distinct CIDs +(epoch 17) +(eval "(get (erlang-eval-ast \"${PRELUDE} {ok, R1, L1} = outbox:publish(Req, Ctx), R2 = [{type,update},{object,nil}], Ctx2 = [{actor_id,alice},{published,100},{key_spec,KS},{actor_state,AS},{log,L1}], {ok, R, _} = outbox:publish(R2, Ctx2), {ok, C1} = envelope:get_field(cid, R1), {ok, C2} = envelope:get_field(cid, R), C1 =/= C2\") :name)") + +;; CID stable: same Request twice (across fresh logs) -> same CID +(epoch 18) +(eval "(get (erlang-eval-ast \"${PRELUDE} {ok, R1, _} = outbox:publish(Req, Ctx), {ok, L0b} = log:open(alice, base), Ctx_b = [{actor_id,alice},{published,100},{key_spec,KS},{actor_state,AS},{log,L0b}], {ok, R2, _} = outbox:publish(Req, Ctx_b), {ok, C1} = envelope:get_field(cid, R1), {ok, C2} = envelope:get_field(cid, R2), C1 =:= C2\") :name)") +EPOCHS + +OUTPUT=$(timeout 240 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) + +check() { + local epoch="$1" desc="$2" expected="$3" + local actual + actual=$(echo "$OUTPUT" | awk -v e="$epoch" ' + $0 ~ "^\\(ok-len " e " " { getline; print; exit } + $0 ~ "^\\(ok " e " " { print; exit } + $0 ~ "^\\(error " e " " { print; exit } + ') + [ -z "$actual" ] && actual="" + if echo "$actual" | grep -qF -- "$expected"; then + PASS=$((PASS+1)) + [ "$VERBOSE" = "-v" ] && echo " ok $desc" + else + FAIL=$((FAIL+1)) + ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual +" + fi +} + +check 2 "envelope module loaded" "envelope" +check 3 "log module loaded" "log" +check 4 "pipeline module loaded" "pipeline" +check 5 "outbox module loaded" "outbox" +check 10 "happy path tip advances to 1" "true" +check 11 "result :cid matches activity" "true" +check 12 "signed activity in log entries" "true" +check 13 "duplicate publish -> replay" "ok" +check 14 "replay leaves log tip at 1" "true" +check 15 "bad key material -> bad_signature" "ok" +check 16 "distinct timestamps -> tip 2" "true" +check 17 "distinct types -> distinct CIDs" "true" +check 18 "same request -> same CID" "true" + +TOTAL=$((PASS+FAIL)) +if [ $FAIL -eq 0 ]; then + echo "ok $PASS/$TOTAL next/tests/outbox_publish.sh passed" +else + echo "FAIL $PASS/$TOTAL passed, $FAIL failed:" + echo "$ERRORS" +fi +[ $FAIL -eq 0 ] diff --git a/plans/fed-sx-milestone-1.md b/plans/fed-sx-milestone-1.md index 9d2956c0..b8f54797 100644 --- a/plans/fed-sx-milestone-1.md +++ b/plans/fed-sx-milestone-1.md @@ -392,7 +392,7 @@ projection fold maintains it.) - [x] **6c-replay** — `pipeline:stage_replay/2` (direct) + `stage_replay/1` (factory closed over LogState). Checks the log entries for an existing activity with the same `:id`. Returns `{error, replay}` on duplicate, `{error, no_id}` when missing. `next/tests/pipeline_replay.sh` (12 cases). - [ ] **6c-schema** — `stage_activity_schema/1` (registry lookup of activity-type, evaluate :schema body) — blocked behind SX-source eval bridge. - [x] **6d-cs** — `outbox:construct/4` (skeleton + CID-derived :id via `cid:to_string`) + `outbox:sign/2` (HMAC over canonical bytes, append :signature pair from KeySpec) + `cid_of/1` accessor. Verified end-to-end: construct→sign→envelope:verify_signature passes; wrong key material fails with bad_signature. `next/tests/outbox_construct.sh` (13 cases). -- [ ] **6d-publish** — `outbox:publish/N` orchestrates construct + sign + `pipeline:validate_outbound` + `log:append`; returns `{ok, #{cid, id}, NewLogState}`. +- [x] **6d-publish** — `outbox:publish/2(Request, Context)` orchestrates construct + sign + `pipeline:run_stages([envelope, signature, replay])` + `log:append`. Returns `{ok, [{cid, _}, {activity, _}], NewLog}` or `{error, Reason, LogState}` on stage halt. Replay catches duplicate publishes; bad key material surfaces `bad_signature`. `next/tests/outbox_publish.sh` (13 cases). - [ ] **6e** — HTTP handler for POST /activity glue (depends on Step 8 http server) **Deliverables:** @@ -971,6 +971,7 @@ A few things still under-specified; resolve as work begins. Newest first. One line per sub-deliverable commit. Erlang conformance gate (`bash lib/erlang/conformance.sh`) must remain 729/729 on every entry. +- **2026-05-28** — Step 6d-publish: `outbox:publish/2(Request, Context)` orchestrates construct + sign + `pipeline:run_stages` + `log:append`. Stage list is `[stage_envelope, stage_signature(AS), stage_replay(LogState)]` — so a duplicate publish (same Request, same Published) halts at the replay stage and returns `{error, replay, LogState}` with the log unchanged; bad key material halts at `bad_signature`. Happy path returns `{ok, [{cid, Cid}, {activity, Signed}], NewLog}`. Projection-scheduler dispatch deferred to Step 7. `next/tests/outbox_publish.sh` 13/13 covers happy path, replay halt, sig halt, multi-publish progression, CID stability across fresh logs. Erlang conformance 729/729. - **2026-05-28** — Step 6d-cs: `next/kernel/outbox.erl` — envelope construction + signing. `construct/4` takes `(Type, ActorId, Published, Object)`, builds the canonical key-sorted property list, and derives the activity `:id` from `cid:to_string({activity_envelope, Skeleton})`. `sign/2` extracts key_id/algorithm/key-material from a KeySpec proplist, computes the v1 HMAC over canonical bytes, and appends the `:signature` pair. `cid_of/1` is a convenience accessor. Round-trip end-to-end through `envelope:verify_signature/2` verified (correct key passes, wrong key returns bad_signature). Step 6d split into 6d-cs (done) + 6d-publish (orchestration). `next/tests/outbox_construct.sh` 13/13. Erlang conformance 729/729. - **2026-05-28** — Step 6c-replay: `pipeline:stage_replay/2` (direct) + `stage_replay/1` (factory closed over LogState). Linear scan of `log:entries/1` checking for an existing entry with the same `:id`. Returns ok if new, `{error, replay}` on duplicate, `{error, no_id}` when the activity has no id field. Step 6c split into 6c-replay (done) + 6c-schema (deferred — blocked behind SX-source eval bridge for the activity-type :schema body). `next/tests/pipeline_replay.sh` 12/12 covers direct + factory + composition with stage_envelope. Erlang conformance 729/729. - **2026-05-28** — Step 6b-sig: `pipeline:stage_signature/2` direct call + `stage_signature/1` factory returning a context-bound stage fun closed over ActorState. Not wired into the default `inbound_stages`/`outbound_stages` lists because actor state isn't a static-build-time value; callers prepend the factory result to a stage list (`Stages = [stage_envelope, pipeline:stage_signature(AS)]`). `next/tests/pipeline_signature.sh` 11/11 covers direct + factory + composition with stage_envelope (including halt ordering: bad envelope halts before sig; good envelope + bad sig surfaces sig error). Erlang conformance 729/729.