fed-sx-types Phase 8: blog-publish-digest e2e + flow:wait
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 58s

The motivating end-to-end demonstration (fed-sx-triggers-loop.md Phase
4): one trigger arriving in the pipeline drives a multi-step business
flow with a branch, a timer suspension, an injected effect, and a
follow-up activity emit — all in the kernel's own runtime.

- flow.erl: flow:wait/1 — a timer-style suspend that PRESERVES the value
  on resume (vs flow:suspend/1, which returns the logged result), so a
  "wait until morning" step lets the env flow through to later steps.
- next/flow/flows/blog_publish_digest.erl: the flow. Branches on the
  article :category (newsletter -> wait-until-morning -> send + emit;
  urgent -> send + emit now; else -> skip), fetches followers (injected),
  builds a digest email per follower, and emits a DigestSent activity
  OBJECT. Effect-as-data: a flow can't call kernel gen_servers from
  inside the drive (a blocking call there deadlocks the scheduler), so
  it returns the emails + DigestSent object for a driver to dispatch and
  append — which can then trigger downstream flows, closing the loop.

Test: triggers_e2e.sh (10) — urgent completes in one cycle with 3 emails
+ a DigestSent object; newsletter suspends on the morning timer, then
resumes to the same on "advancing the clock"; draft takes the else
branch (no emails); a non-Article note is rejected by the guard; a
duplicate activity fires once. flow:wait covered in next/flow (36/36).

plans/fed-sx-design.md §13.10 documents the trigger fan-out as a
kernel convention. lib/erlang 771/771.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-30 18:31:26 +00:00
parent 6b4850b34e
commit 6c9b96390f
5 changed files with 267 additions and 1 deletions

View File

@@ -103,6 +103,11 @@ cat > "$TMPFILE" <<EPOCHS
(eval "(get (erlang-eval-ast \"flow:drive(${SUSP_FLOW}, 0, [{wait1, 99}]) =:= {flow_done, {resumed, 99}}\") :name)") (eval "(get (erlang-eval-ast \"flow:drive(${SUSP_FLOW}, 0, [{wait1, 99}]) =:= {flow_done, {resumed, 99}}\") :name)")
(epoch 62) (epoch 62)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:sequence([flow:suspend(a), flow:suspend(b)]), 0) =:= {flow_suspended, a}\") :name)") (eval "(get (erlang-eval-ast \"flow:run(flow_spec:sequence([flow:suspend(a), flow:suspend(b)]), 0) =:= {flow_suspended, a}\") :name)")
;; wait/1 — timer-style suspend that PRESERVES the value on resume
(epoch 63)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:sequence([flow:wait(t), flow_spec:flow_node(fun(X) -> X + 1 end)]), 5) =:= {flow_suspended, t}\") :name)")
(epoch 64)
(eval "(get (erlang-eval-ast \"flow:drive(flow_spec:sequence([flow:wait(t), flow_spec:flow_node(fun(X) -> X + 1 end)]), 5, [{t, ignored}]) =:= {flow_done, 6}\") :name)")
;; ── durable store: registry ──────────────────────────────── ;; ── durable store: registry ────────────────────────────────
(epoch 70) (epoch 70)
@@ -178,6 +183,8 @@ check 52 "retry runs node" "true"
check 60 "suspend miss short-circuits" "true" check 60 "suspend miss short-circuits" "true"
check 61 "suspend replay completes" "true" check 61 "suspend replay completes" "true"
check 62 "first of two suspends wins" "true" check 62 "first of two suspends wins" "true"
check 63 "wait short-circuits on miss" "true"
check 64 "wait preserves value on resume" "true"
check 70 "register + resolve + list" "true" check 70 "register + resolve + list" "true"
check 71 "resolve unknown -> not_found" "true" check 71 "resolve unknown -> not_found" "true"
check 80 "start one-shot -> done" "true" check 80 "start one-shot -> done" "true"

View File

@@ -1,7 +1,7 @@
-module(flow). -module(flow).
-export([drive/3, run/2, -export([drive/3, run/2,
cont/2, susp/2, is_susp/1, ctx_value/1, ctx_log/1, cont/2, susp/2, is_susp/1, ctx_value/1, ctx_log/1,
suspend/1, log_lookup/2]). suspend/1, wait/1, log_lookup/2]).
%% flow-on-erlang — the deterministic-replay core. A native Erlang port %% flow-on-erlang — the deterministic-replay core. A native Erlang port
%% of the Scheme flow engine (lib/flow), so the fed-sx kernel can fan %% of the Scheme flow engine (lib/flow), so the fed-sx kernel can fan
@@ -62,6 +62,24 @@ suspend(Tag) ->
end end
end. end.
%% wait(Tag) — a timer-style suspend that PRESERVES the current value
%% instead of replacing it with the resolved one. Use it for pure
%% waits ("resume in the morning") where the resume is just a signal,
%% not a result: on the first pass it short-circuits like suspend; once
%% Tag is in the log the value flows through unchanged, so downstream
%% steps still see the value (e.g. the env) they had before the wait.
wait(Tag) ->
fun (Ctx) ->
case Ctx of
{flow_susp, _, _} -> Ctx;
{flow_cont, Value, Log} ->
case log_lookup(Tag, Log) of
{ok, _} -> {flow_cont, Value, Log};
miss -> {flow_susp, Tag, Log}
end
end
end.
log_lookup(_, []) -> miss; log_lookup(_, []) -> miss;
log_lookup(Tag, [{Tag, Value} | _]) -> {ok, Value}; log_lookup(Tag, [{Tag, Value} | _]) -> {ok, Value};
log_lookup(Tag, [_ | Rest]) -> log_lookup(Tag, Rest). log_lookup(Tag, [_ | Rest]) -> log_lookup(Tag, Rest).

View File

@@ -0,0 +1,81 @@
-module(blog_publish_digest).
-export([build/1]).
%% A motivating multi-step business flow for the fed-sx-triggers e2e:
%% when an Article is published, decide a batch policy by category,
%% (for newsletters) wait until morning, fetch the author's followers,
%% build a digest email for each, and emit a DigestSent activity — the
%% flow's own output, which a driver appends, closing the loop so it can
%% trigger downstream flows.
%%
%% Demonstrates: a branch on an activity field (:category), a timer
%% suspension (flow:wait/1, resumed by advancing the clock), an injected
%% effect (fetch_followers), and a follow-up activity emit.
%%
%% Effect-as-data: a flow runs inside flow_store's drive, where a
%% blocking call (e.g. into nx_kernel) would deadlock this scheduler, so
%% the flow does NOT perform IO itself. It DESCRIBES the effects in its
%% result — {digest_sent, Emails, DigestActivityObject} — and the driver
%% (the fan-out caller) dispatches the emails and appends the DigestSent
%% activity. fetch_followers is injected (the one external read) as a
%% pure function so the e2e can supply a deterministic list.
%%
%% Input env (from flow_dispatch): [{activity, A}, {actor, Actor}, ...].
%% Result: {digest_sent, [Email], DigestObject} | skipped.
build(Effects) ->
FetchFollowers = field(fetch_followers, Effects),
flow_spec:branch(
fun (Env) -> is_article(Env) end,
flow_spec:branch(
fun (Env) -> category_is(Env, newsletter) end,
%% newsletter: hold until morning, then send + emit
flow_spec:sequence([flow:wait(morning), send_emit(FetchFollowers)]),
flow_spec:branch(
fun (Env) -> category_is(Env, urgent) end,
%% urgent: send + emit now (no wait)
send_emit(FetchFollowers),
%% any other category: skip
flow_spec:flow_const(skipped))),
%% not an Article: skip
flow_spec:flow_const(skipped)).
%% send_emit(FetchFollowers) — the terminal step: build one digest email
%% per follower and the DigestSent emit object. Pure given the injected
%% follower list, so it is replay-safe (and it sits after the only
%% suspend point, so it runs exactly once).
send_emit(FetchFollowers) ->
flow_spec:flow_node(
fun (Env) ->
Activity = env_activity(Env),
Actor = env_actor(Env),
ArtId = activity_id(Activity),
Followers = FetchFollowers(Actor),
Emails = [ [{to, F}, {article, ArtId}] || F <- Followers ],
Digest = [{type, digest_sent},
{for, ArtId},
{follower_count, length(Followers)}],
{digest_sent, Emails, Digest}
end).
%% ── predicates / accessors ──────────────────────────────────────
is_article(Env) ->
object_type(object_of(env_activity(Env))) =:= article.
category_is(Env, Cat) ->
object_category(object_of(env_activity(Env))) =:= Cat.
env_activity(Env) -> field(activity, Env).
env_actor(Env) -> field(actor, Env).
object_of(Activity) -> field(object, Activity).
object_type(Obj) -> field(type, Obj).
object_category(Obj) -> field(category, Obj).
activity_id(Activity) -> field(id, Activity).
field(Key, Proplist) ->
case envelope:get_field(Key, Proplist) of
{ok, V} -> V;
_ -> undefined
end.

134
next/tests/triggers_e2e.sh Executable file
View File

@@ -0,0 +1,134 @@
#!/usr/bin/env bash
# next/tests/triggers_e2e.sh — fed-sx triggers Phase 4 (end-to-end).
#
# The motivating blog-publish-digest flow, driven the whole way: a
# trigger binds Article-creates to the flow; the post-append fan-out
# starts it; the flow branches on :category, (for newsletters) suspends
# on a morning timer, fetches followers (injected), and emits a
# DigestSent activity object. Effect-as-data: the flow returns the
# emails + DigestSent object (a driver would dispatch/append them) since
# a flow can't call kernel gen_servers from inside the drive.
#
# Each epoch starts fresh gen_servers so instance ids are deterministic.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
# Bring-up shared by every case: registry + store, a 3-follower mock,
# the flow registered as blog_digest, and a trigger binding `create`
# to it guarded on "the object is an Article". Cfg/AS as the fan-out
# expects. Activities differ by :category (urgent / newsletter / draft)
# plus a non-Article note.
BOOT='trigger_registry:start_link(), flow_store:start_link(), FF = fun(_) -> [f1, f2, f3] end, Flow = blog_publish_digest:build([{fetch_followers, FF}]), flow_store:register_flow(blog_digest, Flow), Guard = fun(A, _) -> case envelope:get_field(object, A) of {ok, O} -> envelope:get_field(type, O) =:= {ok, article}; _ -> false end end, trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, blog_digest, Guard, any)), Cfg = [{trigger_registry, trigger_registry}], AS = [{actor_id, alice}],'
URGENT='[{type, create}, {actor, alice}, {id, <<117,49>>}, {object, [{type, article}, {category, urgent}]}]'
NEWS='[{type, create}, {actor, alice}, {id, <<110,49>>}, {object, [{type, article}, {category, newsletter}]}]'
DRAFT='[{type, create}, {actor, alice}, {id, <<100,49>>}, {object, [{type, article}, {category, draft}]}]'
NOTE='[{type, create}, {actor, alice}, {id, <<120,49>>}, {object, [{type, note}]}]'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(er-load-gen-server!)")
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/flow/flow.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/flow/flow_spec.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/flow/flow_store.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/kernel/trigger_registry.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/kernel/flow_dispatch.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/kernel/pipeline.erl\")) :name)")
(epoch 3)
(eval "(get (erlang-load-module (file-read \"next/flow/flows/blog_publish_digest.erl\")) :name)")
;; ── urgent: fans out, completes in one cycle, 3 emails ─────
(epoch 10)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${URGENT}, AS, Cfg) =:= {ok, [{<<117,49>>, <<116,99>>, {ok, 1}}]}\") :name)")
(epoch 11)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${URGENT}, AS, Cfg), {ok, {done, {digest_sent, Emails, _}}} = flow_store:status(1), length(Emails) =:= 3\") :name)")
;; DigestSent emit object is well-formed (type, for the article, count)
(epoch 12)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${URGENT}, AS, Cfg), {ok, {done, {digest_sent, _, Digest}}} = flow_store:status(1), Digest =:= [{type, digest_sent}, {for, <<117,49>>}, {follower_count, 3}]\") :name)")
;; ── newsletter: suspends on the morning timer, then resumes ─
(epoch 20)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${NEWS}, AS, Cfg), flow_store:status(1) =:= {ok, {suspended, morning}}\") :name)")
;; advancing the clock (resume the timer) drives it to completion
(epoch 21)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${NEWS}, AS, Cfg), {ok, {flow_done, {digest_sent, Emails, _}}} = flow_store:resume(1, morning_ts), length(Emails) =:= 3\") :name)")
;; before resume no digest exists (still suspended, not done)
(epoch 22)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${NEWS}, AS, Cfg), case flow_store:status(1) of {ok, {done, _}} -> false; {ok, {suspended, morning}} -> true; _ -> false end\") :name)")
;; ── draft: the :else branch, no emails, no DigestSent ──────
(epoch 30)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${DRAFT}, AS, Cfg), flow_store:status(1) =:= {ok, {done, skipped}}\") :name)")
;; ── non-Article note: guard rejects, no flow dispatched ────
(epoch 40)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${NOTE}, AS, Cfg) =:= {ok, []}\") :name)")
;; ── dedup: the same activity arriving twice fires once ─────
(epoch 50)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${URGENT}, [{actor_id, alice}, {triggers_fired, [{<<117,49>>, <<116,99>>}]}], Cfg) =:= {ok, []}\") :name)")
EPOCHS
OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 3 "blog_publish_digest loaded" "blog_publish_digest"
check 10 "urgent fans out (audit triple)" "true"
check 11 "urgent: 3 emails dispatched" "true"
check 12 "urgent: DigestSent object emitted" "true"
check 20 "newsletter suspends on timer" "true"
check 21 "newsletter resumes -> 3 emails" "true"
check 22 "no digest before resume" "true"
check 30 "draft -> else branch, skipped" "true"
check 40 "non-Article note -> guard rejects" "true"
check 50 "duplicate activity fires once" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/triggers_e2e.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

View File

@@ -1296,6 +1296,32 @@ inbox + pull from outbox. SSE is convenience, not protocol.
unknown verbs are stored-but-not-projected — safe by default, with explicit unknown verbs are stored-but-not-projected — safe by default, with explicit
operator control over what extensions load. operator control over what extensions load.
### 13.10 Activity-driven flow triggers (kernel convention)
Beyond projections (which fold an activity into read-model state), the kernel
supports firing **durable business flows** off arriving activities — the
"something happened → here is what we DO about it" half of the model. The
convention (substrate landed in `loops/fed-sx-types`, Phases 58):
- A `DefineTrigger{activity-type, flow-name, guard?, actor-scope?}` activity binds
an activity-type to a named flow. `trigger_registry` hydrates from a fold over
these (restart-safe, same content-addressing as `define-registry`).
- Fan-out runs **after** the kernel append, as the last pipeline step (§14):
`envelope → signature → activity-type schema → object schema → append → trigger
fan-out`. Only accepted activities fire flows; rejected ones never trigger.
- Fan-out is deduped per `{activity-cid, trigger-cid}` (federation can deliver the
same activity twice via different peers) using the actor's `:triggers_fired`
field, and is failure-isolated: one flow's failure never blocks the append or
the other flows.
- Flows run on **flow-on-erlang** (`next/flow/`), a native Erlang-on-SX durable
workflow engine (deterministic-replay suspend/resume; combinator algebra
mirrored from the Scheme `lib/flow`). It runs in the kernel's own runtime, so
the fan-out is a direct call — no cross-guest bridge. Because a flow runs inside
the engine's drive (where a blocking kernel call would deadlock the cooperative
scheduler), flows are **pure and describe effects as data** (their output, or a
`suspend`); a driver outside the flow performs IO and appends any follow-up
activity — which can in turn trigger further flows.
## 14. Validation pipeline ## 14. Validation pipeline
Every activity entering the substrate (whether published locally or received from a Every activity entering the substrate (whether published locally or received from a