fed-sx-types Phase 5: flow-on-erlang engine core (next/flow/)
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 49s

A native Erlang-on-SX durable workflow engine, so the fed-sx kernel can
fan activities out into business flows in its own runtime — no cross-
guest FFI to the Scheme lib/flow, no marshalling, no Scheme dependency.
The seed of a real engine (chosen over bridging Scheme flow) that can
later supersede it for substrate use.

- flow.erl — the deterministic-replay driver. Same durability model as
  the Scheme engine (re-run from the top; effects go through suspend;
  the replay log is plain [{Tag,Value}] data, restart-ready), but
  adapted to three hard runtime constraints: no re-enterable
  continuation, no process dictionary, and a blocking receive inside a
  `try` deadlocks the cooperative scheduler. Resolution: thread the log
  through a railway-style context and make suspend SHORT-CIRCUIT (like a
  fail value) instead of throwing — purely functional, sidesteps all
  three. Ctx = {flow_cont,V,Log} | {flow_susp,Tag,Log}.
- flow_spec.erl — combinator algebra mirrored from lib/flow/spec.sx:
  leaves, sequence/parallel/map_flow, flow_while/flow_until, branch,
  railway fail/recover/attempt, tap, try_catch/retry.
- flow_store.erl — durable gen_server: named-flow registry + instance
  table + start/resume/status. Drives the pure flow from handle_call,
  so no gen_server:call is ever inside the replay try-path.

Gate: next/flow/conformance.sh — 34/34. lib/erlang untouched (771/771).
See next/flow/README.md for the model + why railway threading.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-30 17:51:15 +00:00
parent bba2d7e5cd
commit 8b3d92ed5f
5 changed files with 755 additions and 0 deletions

91
next/flow/README.md Normal file
View File

@@ -0,0 +1,91 @@
# flow-on-erlang — durable workflows in the fed-sx runtime
A native Erlang-on-SX port of the Scheme flow engine (`lib/flow`), so
the fed-sx kernel can fan arriving activities out into durable,
branching, multi-step business flows **in its own runtime** — no
cross-guest FFI, no marshalling, no Scheme dependency. The seed of a
real engine that can later supersede the Scheme one for substrate use.
Run the suite: `bash next/flow/conformance.sh` → engine conformance.
## Model
A **flow** is an Erlang `fun(Ctx) -> Ctx`. Combinators (`flow_spec`)
compose flows; user code stays value-level (the functions you hand to
`flow_node`/`branch`/… take and return plain values). A flow that
ignores its input is a thunk; composition *is* function composition.
```erlang
F = flow_spec:sequence([
flow_spec:flow_node(fun(Draft) -> Draft + 1 end),
flow_spec:branch(fun(P) -> P >= 3 end,
flow_spec:flow_const(ok),
flow_spec:flow_const(rejected))]),
flow:run(F, 2) %% => {flow_done, ok}
```
## Durability — deterministic replay
Same semantics as the Scheme engine: a flow re-runs from the top on
every resume; effects/non-determinism go through `flow:suspend/1`,
whose resolved values are logged; an already-resolved suspend replays
its logged value, and the first unresolved suspend short-circuits back
to the driver. The persisted state is the **replay log** — plain
`[{Tag, Value}]` data — so nothing live (no continuation, no process)
is ever serialized; an instance survives restart by re-driving its
named flow against its log.
```erlang
flow_store:register_flow(publish, F),
{ok, Id, R} = flow_store:start(publish, Draft), %% R = {flow_suspended, Tag} | {flow_done, V}
%% ... driver performs the effect for Tag, then:
flow_store:resume(Id, EffectResult) %% re-drives; completes or suspends again
```
## Why railway threading instead of call/cc + a global
The Scheme engine uses an escape-only `call/cc` plus a mutable global
replay log. This Erlang-on-SX runtime can't do either, and has a third
sharp edge:
- **No re-enterable continuation** — but suspend only needs to *escape*,
which Erlang `throw` could do …
- **… except a blocking `receive` / `gen_server:call` inside a `try`
deadlocks** the cooperative scheduler. So `suspend` must not consult
the log via a registry process while inside a `try`.
- **No process dictionary** — so there is no ambient per-process slot to
stash the replay log in.
The resolution: thread the replay log through a railway-style **context**
and make `suspend` *short-circuit* (like a `fail` value) rather than
throw. No ambient state, no throw, no gen_server in the hot path —
purely functional, which sidesteps all three constraints. The driver
(`flow_store`) is the only stateful part, and it calls the pure
`flow:drive/3` from inside `handle_call`, never wrapping a blocking
receive.
A `Ctx` is `{flow_cont, Value, Log}` (running) or `{flow_susp, Tag,
Log}` (short-circuited); every combinator passes a suspended context
straight through.
## Modules
| Module | Role |
|---|---|
| `flow.erl` | pure replay driver: `drive/3`, `run/2`, `suspend/1`, the `Ctx` constructors/accessors |
| `flow_spec.erl` | combinator algebra: leaves, `sequence`/`parallel`/`map_flow`, `flow_while`/`flow_until`, `branch`, railway `fail`/`recover`/`attempt`, `tap`, `try_catch`/`retry` |
| `flow_store.erl` | durable gen_server: named-flow registry + instance table + `start`/`resume`/`status` |
## Consumed by
The fed-sx kernel's trigger fan-out (`pipeline.erl` + `flow_dispatch`)
starts named flows from arriving activities; see
`plans/fed-sx-host-types.md` and the triggers phases.
## Not yet (later layers)
- Persisting instance logs to the kernel's durable on-disk log (the
data shape is already restart-ready; only the backing is in-memory).
- `parallel` with multiple independent suspends resolving concurrently
(current `parallel` is sequential under one shared log).
- Full parity with the Scheme engine's distributed/remote nodes.

199
next/flow/conformance.sh Executable file
View File

@@ -0,0 +1,199 @@
#!/usr/bin/env bash
# next/flow/conformance.sh — flow-on-erlang engine conformance.
#
# Exercises the native Erlang-on-SX durable workflow engine
# (next/flow/{flow,flow_spec,flow_store}.erl): the combinator algebra,
# the deterministic-replay suspend/resume core, and the durable store.
# This is the gate for the engine, replacing lib/flow/conformance.sh
# (the Scheme engine) for the fed-sx substrate — the kernel's trigger
# fan-out drives flows in its own runtime, with no cross-guest FFI.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
# Common combinator shorthands built per-epoch (Erlang locals don't
# survive across erlang-eval-ast calls; the gen_server state does).
N1='flow_spec:flow_node(fun(X) -> X + 1 end)'
N2='flow_spec:flow_node(fun(X) -> X * 2 end)'
SUSP_FLOW='flow_spec:sequence([flow_spec:flow_node(fun(X) -> X + 1 end), flow:suspend(wait1), flow_spec:flow_node(fun(V) -> {resumed, V} end)])'
TWO_SUSP='flow_spec:sequence([flow:suspend(a), flow_spec:flow_node(fun(V) -> V * 10 end), flow:suspend(b), flow_spec:flow_node(fun(V) -> V + 1 end)])'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(er-load-gen-server!)")
(epoch 3)
(eval "(get (erlang-load-module (file-read \"next/flow/flow.erl\")) :name)")
(epoch 4)
(eval "(get (erlang-load-module (file-read \"next/flow/flow_spec.erl\")) :name)")
(epoch 5)
(eval "(get (erlang-load-module (file-read \"next/flow/flow_store.erl\")) :name)")
;; ── leaves ─────────────────────────────────────────────────
(epoch 10)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:flow_id(), 7) =:= {flow_done, 7}\") :name)")
(epoch 11)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:flow_const(k), 7) =:= {flow_done, k}\") :name)")
(epoch 12)
(eval "(get (erlang-eval-ast \"flow:run(${N1}, 41) =:= {flow_done, 42}\") :name)")
;; ── threading / fan-out / iteration ────────────────────────
(epoch 20)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:sequence([${N1}, ${N2}]), 3) =:= {flow_done, 8}\") :name)")
(epoch 21)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:parallel([flow_spec:flow_const(a), flow_spec:flow_const(b)]), 0) =:= {flow_done, [a, b]}\") :name)")
(epoch 22)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:map_flow(${N1}), [1, 2, 3]) =:= {flow_done, [2, 3, 4]}\") :name)")
(epoch 23)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:flow_while(fun(X) -> X < 10 end, ${N1}, 100), 0) =:= {flow_done, 10}\") :name)")
(epoch 24)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:flow_until(fun(X) -> X >= 5 end, ${N1}, 100), 0) =:= {flow_done, 5}\") :name)")
(epoch 25)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:flow_while(fun(_) -> true end, ${N1}, 3), 0) =:= {flow_done, 3}\") :name)")
;; ── branching ──────────────────────────────────────────────
(epoch 30)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:branch(fun(X) -> X > 0 end, flow_spec:flow_const(pos), flow_spec:flow_const(neg)), 5) =:= {flow_done, pos}\") :name)")
(epoch 31)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:branch(fun(X) -> X > 0 end, flow_spec:flow_const(pos), flow_spec:flow_const(neg)), -5) =:= {flow_done, neg}\") :name)")
;; ── railway failure ────────────────────────────────────────
(epoch 40)
(eval "(get (erlang-eval-ast \"flow_spec:failed(flow_spec:fail(x)) andalso (flow_spec:failed(42) =:= false)\") :name)")
(epoch 41)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:attempt([flow_spec:flow_node(fun(_) -> flow_spec:fail(boom) end), flow_spec:flow_node(fun(_) -> 999 end)]), 0) =:= {flow_done, {flow_fail, boom}}\") :name)")
(epoch 42)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:attempt([${N1}, ${N2}]), 3) =:= {flow_done, 8}\") :name)")
(epoch 43)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:recover(flow_spec:flow_node(fun(_) -> flow_spec:fail(bad) end), fun(R) -> {ok, R} end), 0) =:= {flow_done, {ok, bad}}\") :name)")
;; ── effects / exceptions ───────────────────────────────────
(epoch 50)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:tap(fun(_) -> ok end), 7) =:= {flow_done, 7}\") :name)")
(epoch 51)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:try_catch(flow_spec:flow_node(fun(_) -> throw(oops) end), fun(E) -> {caught, E} end), 0) =:= {flow_done, {caught, oops}}\") :name)")
(epoch 52)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:retry(5, flow_spec:flow_node(fun(X) -> X + 1 end)), 1) =:= {flow_done, 2}\") :name)")
;; ── suspend / replay (deterministic-replay core) ───────────
(epoch 60)
(eval "(get (erlang-eval-ast \"flow:run(${SUSP_FLOW}, 0) =:= {flow_suspended, wait1}\") :name)")
(epoch 61)
(eval "(get (erlang-eval-ast \"flow:drive(${SUSP_FLOW}, 0, [{wait1, 99}]) =:= {flow_done, {resumed, 99}}\") :name)")
(epoch 62)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:sequence([flow:suspend(a), flow:suspend(b)]), 0) =:= {flow_suspended, a}\") :name)")
;; ── durable store: registry ────────────────────────────────
(epoch 70)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(f1, ${N1}), flow_store:resolve_flow(f1) =/= not_found andalso flow_store:registered_flows() =:= [f1]\") :name)")
(epoch 71)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:resolve_flow(ghost) =:= not_found\") :name)")
;; ── durable store: start / resume ──────────────────────────
;; one-shot flow runs to completion on start
(epoch 80)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(done1, ${N1}), flow_store:start(done1, 41) =:= {ok, 1, {flow_done, 42}}\") :name)")
;; suspending flow: start suspends, resume completes
(epoch 81)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(s1, ${SUSP_FLOW}), {ok, Id, R} = flow_store:start(s1, 10), R =:= {flow_suspended, wait1} andalso flow_store:status(Id) =:= {ok, {suspended, wait1}}\") :name)")
(epoch 82)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(s1, ${SUSP_FLOW}), {ok, Id, _} = flow_store:start(s1, 10), flow_store:resume(Id, 99) =:= {ok, {flow_done, {resumed, 99}}}\") :name)")
(epoch 83)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(s1, ${SUSP_FLOW}), {ok, Id, _} = flow_store:start(s1, 10), flow_store:resume(Id, 99), flow_store:status(Id) =:= {ok, {done, {resumed, 99}}}\") :name)")
;; two-suspend flow: resume chain accumulates the replay log
(epoch 84)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(s2, ${TWO_SUSP}), {ok, Id, _} = flow_store:start(s2, 0), {ok, R1} = flow_store:resume(Id, 5), R2 = flow_store:resume(Id, 7), R1 =:= {flow_suspended, b} andalso R2 =:= {ok, {flow_done, 8}}\") :name)")
;; error paths
(epoch 85)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:start(ghost, 0) =:= {error, no_such_flow}\") :name)")
(epoch 86)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:resume(999, x) =:= {error, no_such_instance}\") :name)")
(epoch 87)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(done1, ${N1}), {ok, Id, _} = flow_store:start(done1, 0), flow_store:resume(Id, x) =:= {error, already_done}\") :name)")
EPOCHS
OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 3 "flow module loaded" "flow"
check 4 "flow_spec module loaded" "flow_spec"
check 5 "flow_store module loaded" "flow_store"
check 10 "flow_id" "true"
check 11 "flow_const" "true"
check 12 "flow_node" "true"
check 20 "sequence threads left-to-right" "true"
check 21 "parallel fans out" "true"
check 22 "map_flow over a list" "true"
check 23 "flow_while bounded by pred" "true"
check 24 "flow_until bounded by pred" "true"
check 25 "flow_while bounded by max" "true"
check 30 "branch then-arm" "true"
check 31 "branch else-arm" "true"
check 40 "failed? predicate" "true"
check 41 "attempt stops at first fail" "true"
check 42 "attempt threads on success" "true"
check 43 "recover handles fail value" "true"
check 50 "tap pass-through" "true"
check 51 "try_catch catches a raise" "true"
check 52 "retry runs node" "true"
check 60 "suspend miss short-circuits" "true"
check 61 "suspend replay completes" "true"
check 62 "first of two suspends wins" "true"
check 70 "register + resolve + list" "true"
check 71 "resolve unknown -> not_found" "true"
check 80 "start one-shot -> done" "true"
check 81 "start suspends + status" "true"
check 82 "resume completes" "true"
check 83 "status after resume = done" "true"
check 84 "two-suspend resume chain" "true"
check 85 "start unknown -> no_such_flow" "true"
check 86 "resume unknown -> no_such_instance" "true"
check 87 "resume a done flow -> already_done" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL flow-on-erlang engine tests passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

84
next/flow/flow.erl Normal file
View File

@@ -0,0 +1,84 @@
-module(flow).
-export([drive/3, run/2,
cont/2, susp/2, is_susp/1, ctx_value/1, ctx_log/1,
suspend/1, log_lookup/2]).
%% flow-on-erlang — the deterministic-replay core. A native Erlang port
%% of the Scheme flow engine (lib/flow), so the fed-sx kernel can fan
%% activities out into durable business flows in its own runtime (no
%% cross-guest FFI).
%%
%% Durability model — identical semantics to the Scheme engine, but
%% adapted to this Erlang-on-SX runtime, which has three hard
%% constraints the Scheme host doesn't: no escape continuation that can
%% be re-entered, no process dictionary, and (critically) a blocking
%% `receive` / `gen_server:call` inside a `try` deadlocks the
%% cooperative scheduler. So instead of the Scheme engine's
%% mutable-global + call/cc-escape, the replay log is THREADED through a
%% railway-style context and `suspend` SHORT-CIRCUITS (like a fail
%% value) rather than throwing. No ambient state, no throw, no
%% gen_server — purely functional, which sidesteps every constraint.
%%
%% A node is `fun(Ctx) -> Ctx`. A Ctx is one of:
%% {flow_cont, Value, Log} — running; Value is the current value
%% {flow_susp, Tag, Log} — short-circuited at suspend Tag
%% Log is the replay log: [{Tag, ResolvedValue}, ...]. Combinators
%% (flow_spec) thread Ctx and pass {flow_susp,...} straight through, so
%% once a flow suspends nothing downstream runs.
%%
%% suspend/1 is the load-bearing primitive: a node that, given the
%% running Ctx, looks Tag up in the replay log. A hit replaces the
%% current value with the logged value and continues; a miss
%% short-circuits to {flow_susp, Tag, Log}. ALL effects/non-determinism
%% go through a suspend node so they run once — in the driver, between
%% drives — and their results are logged, never re-run on replay. Tags
%% must be unique and deterministic across replays.
%% ── context constructors / accessors ────────────────────────────
cont(Value, Log) -> {flow_cont, Value, Log}.
susp(Tag, Log) -> {flow_susp, Tag, Log}.
is_susp({flow_susp, _, _}) -> true;
is_susp(_) -> false.
ctx_value({flow_cont, Value, _}) -> Value;
ctx_value({flow_susp, _, _}) -> undefined.
ctx_log({flow_cont, _, Log}) -> Log;
ctx_log({flow_susp, _, Log}) -> Log.
%% ── suspend node ────────────────────────────────────────────────
suspend(Tag) ->
fun (Ctx) ->
case Ctx of
{flow_susp, _, _} -> Ctx;
{flow_cont, _Value, Log} ->
case log_lookup(Tag, Log) of
{ok, Resolved} -> {flow_cont, Resolved, Log};
miss -> {flow_susp, Tag, Log}
end
end
end.
log_lookup(_, []) -> miss;
log_lookup(Tag, [{Tag, Value} | _]) -> {ok, Value};
log_lookup(Tag, [_ | Rest]) -> log_lookup(Tag, Rest).
%% ── driver ──────────────────────────────────────────────────────
%% drive(Flow, Input, Log) — run Flow under the replay Log.
%% {flow_done, Result} — flow completed
%% {flow_suspended, Tag} — flow short-circuited at an unresolved
%% suspend; the driver resolves Tag, appends
%% {Tag, Value} to Log, and re-drives.
drive(Flow, Input, Log) ->
case Flow({flow_cont, Input, Log}) of
{flow_cont, Result, _} -> {flow_done, Result};
{flow_susp, Tag, _} -> {flow_suspended, Tag}
end.
%% run(Flow, Input) — drive with an empty replay log.
run(Flow, Input) ->
drive(Flow, Input, []).

240
next/flow/flow_spec.erl Normal file
View File

@@ -0,0 +1,240 @@
-module(flow_spec).
-export([flow_node/1, flow_id/0, flow_const/1,
sequence/1, parallel/1, map_flow/1,
flow_while/3, flow_until/3,
branch/3, fail/1, failed/1, fail_reason/1,
recover/2, tap/1, attempt/1, try_catch/2, retry/2]).
%% flow-on-erlang combinators — a native port of lib/flow/spec.sx,
%% adapted to the railway-threaded context model in flow.erl. A node is
%% `fun(Ctx) -> Ctx`; every combinator passes a {flow_susp,...} context
%% straight through, so once a flow suspends nothing downstream runs.
%% User code stays value-level: the predicates/functions handed to
%% flow_node / branch / etc. take and return plain values, and the
%% combinator threads them into the context.
%%
%% Variadic Scheme forms (sequence, parallel, attempt) take an explicit
%% list here — the one idiom difference from the Scheme engine. Effects
%% must go through a flow:suspend/1 node so they run once (in the
%% driver) and replay from the log; `tap` is only for replay-safe
%% effects (e.g. tracing).
%% ── leaves ──────────────────────────────────────────────────────
%% flow_node(F) — lift a value function F :: Value -> Value into a node.
flow_node(F) ->
fun (Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false -> flow:cont(F(flow:ctx_value(Ctx)), flow:ctx_log(Ctx))
end
end.
flow_id() ->
fun (Ctx) -> Ctx end.
flow_const(V) ->
fun (Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false -> flow:cont(V, flow:ctx_log(Ctx))
end
end.
%% ── threading / fan-out / iteration ─────────────────────────────
%% sequence(Nodes) — thread the context left-to-right. Each node
%% self-guards on suspension, so a suspended context flows through
%% untouched.
sequence(Nodes) ->
fun (Ctx) -> seq_step(Nodes, Ctx) end.
seq_step([], Ctx) -> Ctx;
seq_step([N | Ns], Ctx) -> seq_step(Ns, N(Ctx)).
%% parallel(Nodes) — fan the input value to every node, join results
%% into a list (sequential evaluation under one shared replay log).
%% First child to suspend short-circuits the whole parallel.
parallel(Nodes) ->
fun (Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false -> par_step(Nodes, flow:ctx_value(Ctx), flow:ctx_log(Ctx), [])
end
end.
par_step([], _Input, Log, Acc) ->
flow:cont(lists:reverse(Acc), Log);
par_step([N | Ns], Input, Log, Acc) ->
R = N(flow:cont(Input, Log)),
case flow:is_susp(R) of
true -> R;
false -> par_step(Ns, Input, Log, [flow:ctx_value(R) | Acc])
end.
%% map_flow(Node) — run Node over each item of a list input value.
map_flow(Node) ->
fun (Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false -> map_step(Node, flow:ctx_value(Ctx), flow:ctx_log(Ctx), [])
end
end.
map_step(_, [], Log, Acc) ->
flow:cont(lists:reverse(Acc), Log);
map_step(Node, [I | Is], Log, Acc) ->
R = Node(flow:cont(I, Log)),
case flow:is_susp(R) of
true -> R;
false -> map_step(Node, Is, Log, [flow:ctx_value(R) | Acc])
end.
%% flow_while(Pred, Body, Max) — re-run Body (a node), threading the
%% context, while Pred(value) holds, up to Max steps. Pred :: Value ->
%% bool; Body :: node.
flow_while(Pred, Body, Max) ->
fun (Ctx) -> while_step(Pred, Body, Ctx, Max) end.
while_step(_, _, Ctx, N) when N =< 0 -> Ctx;
while_step(Pred, Body, Ctx, N) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false ->
case Pred(flow:ctx_value(Ctx)) of
true -> while_step(Pred, Body, Body(Ctx), N - 1);
_ -> Ctx
end
end.
%% flow_until(Pred, Body, Max) — re-run Body until Pred(value) holds.
flow_until(Pred, Body, Max) ->
fun (Ctx) -> until_step(Pred, Body, Ctx, Max) end.
until_step(_, _, Ctx, N) when N =< 0 -> Ctx;
until_step(Pred, Body, Ctx, N) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false ->
case Pred(flow:ctx_value(Ctx)) of
true -> Ctx;
_ -> until_step(Pred, Body, Body(Ctx), N - 1)
end
end.
%% ── branching ───────────────────────────────────────────────────
%% branch(Pred, Then, Else) — Pred :: Value -> bool; Then/Else :: node.
branch(Pred, Then, Else) ->
fun (Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false ->
case Pred(flow:ctx_value(Ctx)) of
true -> Then(Ctx);
_ -> Else(Ctx)
end
end
end.
%% ── railway-style failure (values, not exceptions) ──────────────
fail(Reason) -> {flow_fail, Reason}.
failed({flow_fail, _}) -> true;
failed(_) -> false.
fail_reason({flow_fail, R}) -> R.
%% recover(Node, Handler) — if Node yields a fail VALUE, run Handler on
%% the reason; else pass through. Handler :: Reason -> Value.
recover(Node, Handler) ->
fun (Ctx) ->
R = Node(Ctx),
case flow:is_susp(R) of
true -> R;
false ->
V = flow:ctx_value(R),
case failed(V) of
true -> flow:cont(Handler(fail_reason(V)), flow:ctx_log(R));
false -> R
end
end
end.
%% tap(Effect) — replay-safe side-effecting pass-through (returns the
%% input value unchanged). Effect :: Value -> any.
tap(Effect) ->
fun (Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false -> Effect(flow:ctx_value(Ctx)), Ctx
end
end.
%% attempt(Nodes) — railway sequence: thread left-to-right but stop at
%% the first node whose value is a fail, returning that failure.
attempt(Nodes) ->
fun (Ctx) -> attempt_step(Nodes, Ctx) end.
attempt_step([], Ctx) -> Ctx;
attempt_step([N | Ns], Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false ->
case failed(flow:ctx_value(Ctx)) of
true -> Ctx;
false -> attempt_step(Ns, N(Ctx))
end
end.
%% ── exception-style control ─────────────────────────────────────
%% Nodes are pure (effects go through suspend, run by the driver), so a
%% try around a node never wraps a blocking receive — safe in this
%% runtime.
%% try_catch(Node, Handler) — run Node; if it raises, run Handler on the
%% exception. Handler :: Exception -> Value.
try_catch(Node, Handler) ->
fun (Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false ->
Log = flow:ctx_log(Ctx),
try Node(Ctx) of
R -> R
catch
throw:E -> flow:cont(Handler(E), Log);
error:E -> flow:cont(Handler(E), Log);
exit:E -> flow:cont(Handler(E), Log)
end
end
end.
%% retry(N, Node) — run Node, retrying up to N attempts on a raise.
retry(N, Node) ->
fun (Ctx) -> retry_step(N, Node, Ctx) end.
retry_step(N, Node, Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false ->
try Node(Ctx) of
R -> R
catch
throw:Reason -> retry_reraise(N, Node, Ctx, throw, Reason);
error:Reason -> retry_reraise(N, Node, Ctx, error, Reason);
exit:Reason -> retry_reraise(N, Node, Ctx, exit, Reason)
end
end.
retry_reraise(N, Node, Ctx, Class, Reason) ->
case N =< 1 of
false -> retry_step(N - 1, Node, Ctx);
true ->
case Class of
throw -> throw(Reason);
error -> erlang:error(Reason);
exit -> exit(Reason)
end
end.

141
next/flow/flow_store.erl Normal file
View File

@@ -0,0 +1,141 @@
-module(flow_store).
-export([start_link/0, start_link/1, stop/0,
register_flow/2, resolve_flow/1, registered_flows/0,
start/2, resume/2, status/1, instances/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-behaviour(gen_server).
%% flow-on-erlang durable store — the named-flow registry plus the
%% instance table that makes suspend/resume durable. flow.erl is the
%% pure replay driver; this gen_server is the stateful shell around it,
%% holding the registry (so triggers can reference flows by name, and
%% so an instance can be re-resolved + replayed after a restart) and
%% each instance's accumulated replay log.
%%
%% Crucially the driver stays OUT of any blocking context: start/resume
%% call flow:drive/3 (pure — no receive, no gen_server:call) from inside
%% handle_call, and the only message-passing is the caller's
%% gen_server:call into this store. (A blocking receive inside a `try`
%% deadlocks this cooperative scheduler, so the engine never does one.)
%%
%% State: {Registry, Instances, NextId}
%% Registry = [{Name, FlowFun}, ...]
%% Instances = [{Id, {Name, Input, Log, Status}}, ...]
%% Status = {suspended, Tag} | {done, Result}
%% Log = [{Tag, ResolvedValue}, ...] (the replay log — plain
%% data, so an instance is fully described by its log and
%% survives process restart by re-driving the named flow)
%%
%% v1 backs the store in gen_server memory; persisting the instance
%% logs to the kernel's durable log (so flows survive an OS restart) is
%% a later layer — the data shape is already restart-ready.
start_link() ->
start_link([]).
start_link(InitialFlows) ->
Pid = gen_server:start_link(flow_store, [InitialFlows]),
erlang:register(flow_store, Pid),
Pid.
stop() ->
R = gen_server:call(flow_store, '$gen_stop'),
erlang:unregister(flow_store),
R.
%% register_flow(Name, Flow) — register a named flow (a node fun). Named
%% rather than `register` to avoid the erlang:register/2 auto-import.
register_flow(Name, Flow) ->
gen_server:call(flow_store, {register_flow, Name, Flow}).
resolve_flow(Name) ->
gen_server:call(flow_store, {resolve_flow, Name}).
registered_flows() ->
gen_server:call(flow_store, registered_flows).
%% start(Name, Input) -> {ok, Id, Result} | {error, no_such_flow}.
%% Result is {flow_done, V} | {flow_suspended, Tag}; the instance is
%% recorded either way so a suspended flow can be resumed by Id.
start(Name, Input) ->
gen_server:call(flow_store, {start, Name, Input}).
%% resume(Id, Value) -> {ok, Result} | {error, Reason}. Resolves the
%% instance's current suspend tag with Value (appends {Tag, Value} to
%% its replay log) and re-drives from the top.
resume(Id, Value) ->
gen_server:call(flow_store, {resume, Id, Value}).
%% status(Id) -> {ok, {suspended, Tag}} | {ok, {done, Result}} | not_found
status(Id) ->
gen_server:call(flow_store, {status, Id}).
instances() ->
gen_server:call(flow_store, instances).
%% ── gen_server ──────────────────────────────────────────────────
init([InitialFlows]) ->
{ok, {InitialFlows, [], 1}}.
handle_call({register_flow, Name, Flow}, _From, {Reg, Ins, N}) ->
{reply, ok, {set_keyed(Name, Flow, Reg), Ins, N}};
handle_call({resolve_flow, Name}, _From, {Reg, Ins, N}) ->
{reply, find_keyed(Name, Reg), {Reg, Ins, N}};
handle_call(registered_flows, _From, {Reg, Ins, N}) ->
{reply, [Name || {Name, _} <- Reg], {Reg, Ins, N}};
handle_call({start, Name, Input}, _From, {Reg, Ins, N}) ->
case find_keyed(Name, Reg) of
not_found ->
{reply, {error, no_such_flow}, {Reg, Ins, N}};
{ok, Flow} ->
R = flow:drive(Flow, Input, []),
Status = result_status(R),
Ins2 = set_keyed(N, {Name, Input, [], Status}, Ins),
{reply, {ok, N, R}, {Reg, Ins2, N + 1}}
end;
handle_call({resume, Id, Value}, _From, {Reg, Ins, N}) ->
case find_keyed(Id, Ins) of
not_found ->
{reply, {error, no_such_instance}, {Reg, Ins, N}};
{ok, {_Name, _Input, _Log, {done, _}}} ->
{reply, {error, already_done}, {Reg, Ins, N}};
{ok, {Name, Input, Log, {suspended, Tag}}} ->
case find_keyed(Name, Reg) of
not_found ->
{reply, {error, no_such_flow}, {Reg, Ins, N}};
{ok, Flow} ->
NewLog = log_append(Log, Tag, Value),
R = flow:drive(Flow, Input, NewLog),
Status = result_status(R),
Ins2 = set_keyed(Id, {Name, Input, NewLog, Status}, Ins),
{reply, {ok, R}, {Reg, Ins2, N}}
end
end;
handle_call({status, Id}, _From, {Reg, Ins, N}) ->
case find_keyed(Id, Ins) of
{ok, {_Name, _Input, _Log, Status}} -> {reply, {ok, Status}, {Reg, Ins, N}};
not_found -> {reply, not_found, {Reg, Ins, N}}
end;
handle_call(instances, _From, {Reg, Ins, N}) ->
{reply, [Id || {Id, _} <- Ins], {Reg, Ins, N}}.
handle_cast(_, S) -> {noreply, S}.
handle_info(_, S) -> {noreply, S}.
%% ── helpers ─────────────────────────────────────────────────────
result_status({flow_done, R}) -> {done, R};
result_status({flow_suspended, T}) -> {suspended, T}.
log_append([], Tag, Value) -> [{Tag, Value}];
log_append([H | T], Tag, Value) -> [H | log_append(T, Tag, Value)].
find_keyed(_, []) -> not_found;
find_keyed(K, [{K, V} | _]) -> {ok, V};
find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest).
set_keyed(K, V, []) -> [{K, V}];
set_keyed(K, V, [{K, _} | Rest]) -> [{K, V} | Rest];
set_keyed(K, V, [P | Rest]) -> [P | set_keyed(K, V, Rest)].