diff --git a/next/flow/conformance.sh b/next/flow/conformance.sh index 37a6ecb5..8047e140 100755 --- a/next/flow/conformance.sh +++ b/next/flow/conformance.sh @@ -103,6 +103,11 @@ cat > "$TMPFILE" < X + 1 end)]), 5) =:= {flow_suspended, t}\") :name)") +(epoch 64) +(eval "(get (erlang-eval-ast \"flow:drive(flow_spec:sequence([flow:wait(t), flow_spec:flow_node(fun(X) -> X + 1 end)]), 5, [{t, ignored}]) =:= {flow_done, 6}\") :name)") ;; ── durable store: registry ──────────────────────────────── (epoch 70) @@ -178,6 +183,8 @@ check 52 "retry runs node" "true" check 60 "suspend miss short-circuits" "true" check 61 "suspend replay completes" "true" check 62 "first of two suspends wins" "true" +check 63 "wait short-circuits on miss" "true" +check 64 "wait preserves value on resume" "true" check 70 "register + resolve + list" "true" check 71 "resolve unknown -> not_found" "true" check 80 "start one-shot -> done" "true" diff --git a/next/flow/flow.erl b/next/flow/flow.erl index db83d814..19f446ad 100644 --- a/next/flow/flow.erl +++ b/next/flow/flow.erl @@ -1,7 +1,7 @@ -module(flow). -export([drive/3, run/2, cont/2, susp/2, is_susp/1, ctx_value/1, ctx_log/1, - suspend/1, log_lookup/2]). + suspend/1, wait/1, log_lookup/2]). %% flow-on-erlang — the deterministic-replay core. A native Erlang port %% of the Scheme flow engine (lib/flow), so the fed-sx kernel can fan @@ -62,6 +62,24 @@ suspend(Tag) -> end end. +%% wait(Tag) — a timer-style suspend that PRESERVES the current value +%% instead of replacing it with the resolved one. Use it for pure +%% waits ("resume in the morning") where the resume is just a signal, +%% not a result: on the first pass it short-circuits like suspend; once +%% Tag is in the log the value flows through unchanged, so downstream +%% steps still see the value (e.g. the env) they had before the wait. +wait(Tag) -> + fun (Ctx) -> + case Ctx of + {flow_susp, _, _} -> Ctx; + {flow_cont, Value, Log} -> + case log_lookup(Tag, Log) of + {ok, _} -> {flow_cont, Value, Log}; + miss -> {flow_susp, Tag, Log} + end + end + end. + log_lookup(_, []) -> miss; log_lookup(Tag, [{Tag, Value} | _]) -> {ok, Value}; log_lookup(Tag, [_ | Rest]) -> log_lookup(Tag, Rest). diff --git a/next/flow/flows/blog_publish_digest.erl b/next/flow/flows/blog_publish_digest.erl new file mode 100644 index 00000000..48012728 --- /dev/null +++ b/next/flow/flows/blog_publish_digest.erl @@ -0,0 +1,81 @@ +-module(blog_publish_digest). +-export([build/1]). + +%% A motivating multi-step business flow for the fed-sx-triggers e2e: +%% when an Article is published, decide a batch policy by category, +%% (for newsletters) wait until morning, fetch the author's followers, +%% build a digest email for each, and emit a DigestSent activity — the +%% flow's own output, which a driver appends, closing the loop so it can +%% trigger downstream flows. +%% +%% Demonstrates: a branch on an activity field (:category), a timer +%% suspension (flow:wait/1, resumed by advancing the clock), an injected +%% effect (fetch_followers), and a follow-up activity emit. +%% +%% Effect-as-data: a flow runs inside flow_store's drive, where a +%% blocking call (e.g. into nx_kernel) would deadlock this scheduler, so +%% the flow does NOT perform IO itself. It DESCRIBES the effects in its +%% result — {digest_sent, Emails, DigestActivityObject} — and the driver +%% (the fan-out caller) dispatches the emails and appends the DigestSent +%% activity. fetch_followers is injected (the one external read) as a +%% pure function so the e2e can supply a deterministic list. +%% +%% Input env (from flow_dispatch): [{activity, A}, {actor, Actor}, ...]. +%% Result: {digest_sent, [Email], DigestObject} | skipped. + +build(Effects) -> + FetchFollowers = field(fetch_followers, Effects), + flow_spec:branch( + fun (Env) -> is_article(Env) end, + flow_spec:branch( + fun (Env) -> category_is(Env, newsletter) end, + %% newsletter: hold until morning, then send + emit + flow_spec:sequence([flow:wait(morning), send_emit(FetchFollowers)]), + flow_spec:branch( + fun (Env) -> category_is(Env, urgent) end, + %% urgent: send + emit now (no wait) + send_emit(FetchFollowers), + %% any other category: skip + flow_spec:flow_const(skipped))), + %% not an Article: skip + flow_spec:flow_const(skipped)). + +%% send_emit(FetchFollowers) — the terminal step: build one digest email +%% per follower and the DigestSent emit object. Pure given the injected +%% follower list, so it is replay-safe (and it sits after the only +%% suspend point, so it runs exactly once). +send_emit(FetchFollowers) -> + flow_spec:flow_node( + fun (Env) -> + Activity = env_activity(Env), + Actor = env_actor(Env), + ArtId = activity_id(Activity), + Followers = FetchFollowers(Actor), + Emails = [ [{to, F}, {article, ArtId}] || F <- Followers ], + Digest = [{type, digest_sent}, + {for, ArtId}, + {follower_count, length(Followers)}], + {digest_sent, Emails, Digest} + end). + +%% ── predicates / accessors ────────────────────────────────────── + +is_article(Env) -> + object_type(object_of(env_activity(Env))) =:= article. + +category_is(Env, Cat) -> + object_category(object_of(env_activity(Env))) =:= Cat. + +env_activity(Env) -> field(activity, Env). +env_actor(Env) -> field(actor, Env). + +object_of(Activity) -> field(object, Activity). +object_type(Obj) -> field(type, Obj). +object_category(Obj) -> field(category, Obj). +activity_id(Activity) -> field(id, Activity). + +field(Key, Proplist) -> + case envelope:get_field(Key, Proplist) of + {ok, V} -> V; + _ -> undefined + end. diff --git a/next/tests/triggers_e2e.sh b/next/tests/triggers_e2e.sh new file mode 100755 index 00000000..bba0ff38 --- /dev/null +++ b/next/tests/triggers_e2e.sh @@ -0,0 +1,134 @@ +#!/usr/bin/env bash +# next/tests/triggers_e2e.sh — fed-sx triggers Phase 4 (end-to-end). +# +# The motivating blog-publish-digest flow, driven the whole way: a +# trigger binds Article-creates to the flow; the post-append fan-out +# starts it; the flow branches on :category, (for newsletters) suspends +# on a morning timer, fetches followers (injected), and emits a +# DigestSent activity object. Effect-as-data: the flow returns the +# emails + DigestSent object (a driver would dispatch/append them) since +# a flow can't call kernel gen_servers from inside the drive. +# +# Each epoch starts fresh gen_servers so instance ids are deterministic. + +set -uo pipefail +cd "$(git rev-parse --show-toplevel)" + +SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}" +if [ ! -x "$SX_SERVER" ]; then + SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe" +fi +if [ ! -x "$SX_SERVER" ]; then + echo "ERROR: sx_server.exe not found." >&2 + exit 1 +fi + +VERBOSE="${1:-}" +PASS=0; FAIL=0; ERRORS="" +TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT + +# Bring-up shared by every case: registry + store, a 3-follower mock, +# the flow registered as blog_digest, and a trigger binding `create` +# to it guarded on "the object is an Article". Cfg/AS as the fan-out +# expects. Activities differ by :category (urgent / newsletter / draft) +# plus a non-Article note. +BOOT='trigger_registry:start_link(), flow_store:start_link(), FF = fun(_) -> [f1, f2, f3] end, Flow = blog_publish_digest:build([{fetch_followers, FF}]), flow_store:register_flow(blog_digest, Flow), Guard = fun(A, _) -> case envelope:get_field(object, A) of {ok, O} -> envelope:get_field(type, O) =:= {ok, article}; _ -> false end end, trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, blog_digest, Guard, any)), Cfg = [{trigger_registry, trigger_registry}], AS = [{actor_id, alice}],' +URGENT='[{type, create}, {actor, alice}, {id, <<117,49>>}, {object, [{type, article}, {category, urgent}]}]' +NEWS='[{type, create}, {actor, alice}, {id, <<110,49>>}, {object, [{type, article}, {category, newsletter}]}]' +DRAFT='[{type, create}, {actor, alice}, {id, <<100,49>>}, {object, [{type, article}, {category, draft}]}]' +NOTE='[{type, create}, {actor, alice}, {id, <<120,49>>}, {object, [{type, note}]}]' + +cat > "$TMPFILE" <>, <<116,99>>, {ok, 1}}]}\") :name)") +(epoch 11) +(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${URGENT}, AS, Cfg), {ok, {done, {digest_sent, Emails, _}}} = flow_store:status(1), length(Emails) =:= 3\") :name)") +;; DigestSent emit object is well-formed (type, for the article, count) +(epoch 12) +(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${URGENT}, AS, Cfg), {ok, {done, {digest_sent, _, Digest}}} = flow_store:status(1), Digest =:= [{type, digest_sent}, {for, <<117,49>>}, {follower_count, 3}]\") :name)") + +;; ── newsletter: suspends on the morning timer, then resumes ─ +(epoch 20) +(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${NEWS}, AS, Cfg), flow_store:status(1) =:= {ok, {suspended, morning}}\") :name)") +;; advancing the clock (resume the timer) drives it to completion +(epoch 21) +(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${NEWS}, AS, Cfg), {ok, {flow_done, {digest_sent, Emails, _}}} = flow_store:resume(1, morning_ts), length(Emails) =:= 3\") :name)") +;; before resume no digest exists (still suspended, not done) +(epoch 22) +(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${NEWS}, AS, Cfg), case flow_store:status(1) of {ok, {done, _}} -> false; {ok, {suspended, morning}} -> true; _ -> false end\") :name)") + +;; ── draft: the :else branch, no emails, no DigestSent ────── +(epoch 30) +(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${DRAFT}, AS, Cfg), flow_store:status(1) =:= {ok, {done, skipped}}\") :name)") + +;; ── non-Article note: guard rejects, no flow dispatched ──── +(epoch 40) +(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${NOTE}, AS, Cfg) =:= {ok, []}\") :name)") + +;; ── dedup: the same activity arriving twice fires once ───── +(epoch 50) +(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${URGENT}, [{actor_id, alice}, {triggers_fired, [{<<117,49>>, <<116,99>>}]}], Cfg) =:= {ok, []}\") :name)") +EPOCHS + +OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) + +check() { + local epoch="$1" desc="$2" expected="$3" + local actual + actual=$(echo "$OUTPUT" | awk -v e="$epoch" ' + $0 ~ "^\\(ok-len " e " " { getline; print; exit } + $0 ~ "^\\(ok " e " " { print; exit } + $0 ~ "^\\(error " e " " { print; exit } + ') + [ -z "$actual" ] && actual="" + if echo "$actual" | grep -qF -- "$expected"; then + PASS=$((PASS+1)) + [ "$VERBOSE" = "-v" ] && echo " ok $desc" + else + FAIL=$((FAIL+1)) + ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual +" + fi +} + +check 3 "blog_publish_digest loaded" "blog_publish_digest" +check 10 "urgent fans out (audit triple)" "true" +check 11 "urgent: 3 emails dispatched" "true" +check 12 "urgent: DigestSent object emitted" "true" +check 20 "newsletter suspends on timer" "true" +check 21 "newsletter resumes -> 3 emails" "true" +check 22 "no digest before resume" "true" +check 30 "draft -> else branch, skipped" "true" +check 40 "non-Article note -> guard rejects" "true" +check 50 "duplicate activity fires once" "true" + +TOTAL=$((PASS+FAIL)) +if [ $FAIL -eq 0 ]; then + echo "ok $PASS/$TOTAL next/tests/triggers_e2e.sh passed" +else + echo "FAIL $PASS/$TOTAL passed, $FAIL failed:" + echo "$ERRORS" +fi +[ $FAIL -eq 0 ] diff --git a/plans/fed-sx-design.md b/plans/fed-sx-design.md index 62e811d1..c3a1b4c9 100644 --- a/plans/fed-sx-design.md +++ b/plans/fed-sx-design.md @@ -1296,6 +1296,32 @@ inbox + pull from outbox. SSE is convenience, not protocol. unknown verbs are stored-but-not-projected — safe by default, with explicit operator control over what extensions load. +### 13.10 Activity-driven flow triggers (kernel convention) + +Beyond projections (which fold an activity into read-model state), the kernel +supports firing **durable business flows** off arriving activities — the +"something happened → here is what we DO about it" half of the model. The +convention (substrate landed in `loops/fed-sx-types`, Phases 5–8): + +- A `DefineTrigger{activity-type, flow-name, guard?, actor-scope?}` activity binds + an activity-type to a named flow. `trigger_registry` hydrates from a fold over + these (restart-safe, same content-addressing as `define-registry`). +- Fan-out runs **after** the kernel append, as the last pipeline step (§14): + `envelope → signature → activity-type schema → object schema → append → trigger + fan-out`. Only accepted activities fire flows; rejected ones never trigger. +- Fan-out is deduped per `{activity-cid, trigger-cid}` (federation can deliver the + same activity twice via different peers) using the actor's `:triggers_fired` + field, and is failure-isolated: one flow's failure never blocks the append or + the other flows. +- Flows run on **flow-on-erlang** (`next/flow/`), a native Erlang-on-SX durable + workflow engine (deterministic-replay suspend/resume; combinator algebra + mirrored from the Scheme `lib/flow`). It runs in the kernel's own runtime, so + the fan-out is a direct call — no cross-guest bridge. Because a flow runs inside + the engine's drive (where a blocking kernel call would deadlock the cooperative + scheduler), flows are **pure and describe effects as data** (their output, or a + `suspend`); a driver outside the flow performs IO and appends any follow-up + activity — which can in turn trigger further flows. + ## 14. Validation pipeline Every activity entering the substrate (whether published locally or received from a