From 8b3d92ed5fae3f0e1d758c8d2e219080330d1b24 Mon Sep 17 00:00:00 2001 From: giles Date: Tue, 30 Jun 2026 17:51:15 +0000 Subject: [PATCH] fed-sx-types Phase 5: flow-on-erlang engine core (next/flow/) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A native Erlang-on-SX durable workflow engine, so the fed-sx kernel can fan activities out into business flows in its own runtime — no cross- guest FFI to the Scheme lib/flow, no marshalling, no Scheme dependency. The seed of a real engine (chosen over bridging Scheme flow) that can later supersede it for substrate use. - flow.erl — the deterministic-replay driver. Same durability model as the Scheme engine (re-run from the top; effects go through suspend; the replay log is plain [{Tag,Value}] data, restart-ready), but adapted to three hard runtime constraints: no re-enterable continuation, no process dictionary, and a blocking receive inside a `try` deadlocks the cooperative scheduler. Resolution: thread the log through a railway-style context and make suspend SHORT-CIRCUIT (like a fail value) instead of throwing — purely functional, sidesteps all three. Ctx = {flow_cont,V,Log} | {flow_susp,Tag,Log}. - flow_spec.erl — combinator algebra mirrored from lib/flow/spec.sx: leaves, sequence/parallel/map_flow, flow_while/flow_until, branch, railway fail/recover/attempt, tap, try_catch/retry. - flow_store.erl — durable gen_server: named-flow registry + instance table + start/resume/status. Drives the pure flow from handle_call, so no gen_server:call is ever inside the replay try-path. Gate: next/flow/conformance.sh — 34/34. lib/erlang untouched (771/771). See next/flow/README.md for the model + why railway threading. Co-Authored-By: Claude Opus 4.8 (1M context) --- next/flow/README.md | 91 +++++++++++++++ next/flow/conformance.sh | 199 ++++++++++++++++++++++++++++++++ next/flow/flow.erl | 84 ++++++++++++++ next/flow/flow_spec.erl | 240 +++++++++++++++++++++++++++++++++++++++ next/flow/flow_store.erl | 141 +++++++++++++++++++++++ 5 files changed, 755 insertions(+) create mode 100644 next/flow/README.md create mode 100755 next/flow/conformance.sh create mode 100644 next/flow/flow.erl create mode 100644 next/flow/flow_spec.erl create mode 100644 next/flow/flow_store.erl diff --git a/next/flow/README.md b/next/flow/README.md new file mode 100644 index 00000000..214e57f5 --- /dev/null +++ b/next/flow/README.md @@ -0,0 +1,91 @@ +# flow-on-erlang — durable workflows in the fed-sx runtime + +A native Erlang-on-SX port of the Scheme flow engine (`lib/flow`), so +the fed-sx kernel can fan arriving activities out into durable, +branching, multi-step business flows **in its own runtime** — no +cross-guest FFI, no marshalling, no Scheme dependency. The seed of a +real engine that can later supersede the Scheme one for substrate use. + +Run the suite: `bash next/flow/conformance.sh` → engine conformance. + +## Model + +A **flow** is an Erlang `fun(Ctx) -> Ctx`. Combinators (`flow_spec`) +compose flows; user code stays value-level (the functions you hand to +`flow_node`/`branch`/… take and return plain values). A flow that +ignores its input is a thunk; composition *is* function composition. + +```erlang +F = flow_spec:sequence([ + flow_spec:flow_node(fun(Draft) -> Draft + 1 end), + flow_spec:branch(fun(P) -> P >= 3 end, + flow_spec:flow_const(ok), + flow_spec:flow_const(rejected))]), +flow:run(F, 2) %% => {flow_done, ok} +``` + +## Durability — deterministic replay + +Same semantics as the Scheme engine: a flow re-runs from the top on +every resume; effects/non-determinism go through `flow:suspend/1`, +whose resolved values are logged; an already-resolved suspend replays +its logged value, and the first unresolved suspend short-circuits back +to the driver. The persisted state is the **replay log** — plain +`[{Tag, Value}]` data — so nothing live (no continuation, no process) +is ever serialized; an instance survives restart by re-driving its +named flow against its log. + +```erlang +flow_store:register_flow(publish, F), +{ok, Id, R} = flow_store:start(publish, Draft), %% R = {flow_suspended, Tag} | {flow_done, V} +%% ... driver performs the effect for Tag, then: +flow_store:resume(Id, EffectResult) %% re-drives; completes or suspends again +``` + +## Why railway threading instead of call/cc + a global + +The Scheme engine uses an escape-only `call/cc` plus a mutable global +replay log. This Erlang-on-SX runtime can't do either, and has a third +sharp edge: + +- **No re-enterable continuation** — but suspend only needs to *escape*, + which Erlang `throw` could do … +- **… except a blocking `receive` / `gen_server:call` inside a `try` + deadlocks** the cooperative scheduler. So `suspend` must not consult + the log via a registry process while inside a `try`. +- **No process dictionary** — so there is no ambient per-process slot to + stash the replay log in. + +The resolution: thread the replay log through a railway-style **context** +and make `suspend` *short-circuit* (like a `fail` value) rather than +throw. No ambient state, no throw, no gen_server in the hot path — +purely functional, which sidesteps all three constraints. The driver +(`flow_store`) is the only stateful part, and it calls the pure +`flow:drive/3` from inside `handle_call`, never wrapping a blocking +receive. + +A `Ctx` is `{flow_cont, Value, Log}` (running) or `{flow_susp, Tag, +Log}` (short-circuited); every combinator passes a suspended context +straight through. + +## Modules + +| Module | Role | +|---|---| +| `flow.erl` | pure replay driver: `drive/3`, `run/2`, `suspend/1`, the `Ctx` constructors/accessors | +| `flow_spec.erl` | combinator algebra: leaves, `sequence`/`parallel`/`map_flow`, `flow_while`/`flow_until`, `branch`, railway `fail`/`recover`/`attempt`, `tap`, `try_catch`/`retry` | +| `flow_store.erl` | durable gen_server: named-flow registry + instance table + `start`/`resume`/`status` | + +## Consumed by + +The fed-sx kernel's trigger fan-out (`pipeline.erl` + `flow_dispatch`) +starts named flows from arriving activities; see +`plans/fed-sx-host-types.md` and the triggers phases. + +## Not yet (later layers) + +- Persisting instance logs to the kernel's durable on-disk log (the + data shape is already restart-ready; only the backing is in-memory). +- `parallel` with multiple independent suspends resolving concurrently + (current `parallel` is sequential under one shared log). +- Full parity with the Scheme engine's distributed/remote nodes. diff --git a/next/flow/conformance.sh b/next/flow/conformance.sh new file mode 100755 index 00000000..37a6ecb5 --- /dev/null +++ b/next/flow/conformance.sh @@ -0,0 +1,199 @@ +#!/usr/bin/env bash +# next/flow/conformance.sh — flow-on-erlang engine conformance. +# +# Exercises the native Erlang-on-SX durable workflow engine +# (next/flow/{flow,flow_spec,flow_store}.erl): the combinator algebra, +# the deterministic-replay suspend/resume core, and the durable store. +# This is the gate for the engine, replacing lib/flow/conformance.sh +# (the Scheme engine) for the fed-sx substrate — the kernel's trigger +# fan-out drives flows in its own runtime, with no cross-guest FFI. + +set -uo pipefail +cd "$(git rev-parse --show-toplevel)" + +SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}" +if [ ! -x "$SX_SERVER" ]; then + SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe" +fi +if [ ! -x "$SX_SERVER" ]; then + echo "ERROR: sx_server.exe not found." >&2 + exit 1 +fi + +VERBOSE="${1:-}" +PASS=0; FAIL=0; ERRORS="" +TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT + +# Common combinator shorthands built per-epoch (Erlang locals don't +# survive across erlang-eval-ast calls; the gen_server state does). +N1='flow_spec:flow_node(fun(X) -> X + 1 end)' +N2='flow_spec:flow_node(fun(X) -> X * 2 end)' +SUSP_FLOW='flow_spec:sequence([flow_spec:flow_node(fun(X) -> X + 1 end), flow:suspend(wait1), flow_spec:flow_node(fun(V) -> {resumed, V} end)])' +TWO_SUSP='flow_spec:sequence([flow:suspend(a), flow_spec:flow_node(fun(V) -> V * 10 end), flow:suspend(b), flow_spec:flow_node(fun(V) -> V + 1 end)])' + +cat > "$TMPFILE" < X < 10 end, ${N1}, 100), 0) =:= {flow_done, 10}\") :name)") +(epoch 24) +(eval "(get (erlang-eval-ast \"flow:run(flow_spec:flow_until(fun(X) -> X >= 5 end, ${N1}, 100), 0) =:= {flow_done, 5}\") :name)") +(epoch 25) +(eval "(get (erlang-eval-ast \"flow:run(flow_spec:flow_while(fun(_) -> true end, ${N1}, 3), 0) =:= {flow_done, 3}\") :name)") + +;; ── branching ────────────────────────────────────────────── +(epoch 30) +(eval "(get (erlang-eval-ast \"flow:run(flow_spec:branch(fun(X) -> X > 0 end, flow_spec:flow_const(pos), flow_spec:flow_const(neg)), 5) =:= {flow_done, pos}\") :name)") +(epoch 31) +(eval "(get (erlang-eval-ast \"flow:run(flow_spec:branch(fun(X) -> X > 0 end, flow_spec:flow_const(pos), flow_spec:flow_const(neg)), -5) =:= {flow_done, neg}\") :name)") + +;; ── railway failure ──────────────────────────────────────── +(epoch 40) +(eval "(get (erlang-eval-ast \"flow_spec:failed(flow_spec:fail(x)) andalso (flow_spec:failed(42) =:= false)\") :name)") +(epoch 41) +(eval "(get (erlang-eval-ast \"flow:run(flow_spec:attempt([flow_spec:flow_node(fun(_) -> flow_spec:fail(boom) end), flow_spec:flow_node(fun(_) -> 999 end)]), 0) =:= {flow_done, {flow_fail, boom}}\") :name)") +(epoch 42) +(eval "(get (erlang-eval-ast \"flow:run(flow_spec:attempt([${N1}, ${N2}]), 3) =:= {flow_done, 8}\") :name)") +(epoch 43) +(eval "(get (erlang-eval-ast \"flow:run(flow_spec:recover(flow_spec:flow_node(fun(_) -> flow_spec:fail(bad) end), fun(R) -> {ok, R} end), 0) =:= {flow_done, {ok, bad}}\") :name)") + +;; ── effects / exceptions ─────────────────────────────────── +(epoch 50) +(eval "(get (erlang-eval-ast \"flow:run(flow_spec:tap(fun(_) -> ok end), 7) =:= {flow_done, 7}\") :name)") +(epoch 51) +(eval "(get (erlang-eval-ast \"flow:run(flow_spec:try_catch(flow_spec:flow_node(fun(_) -> throw(oops) end), fun(E) -> {caught, E} end), 0) =:= {flow_done, {caught, oops}}\") :name)") +(epoch 52) +(eval "(get (erlang-eval-ast \"flow:run(flow_spec:retry(5, flow_spec:flow_node(fun(X) -> X + 1 end)), 1) =:= {flow_done, 2}\") :name)") + +;; ── suspend / replay (deterministic-replay core) ─────────── +(epoch 60) +(eval "(get (erlang-eval-ast \"flow:run(${SUSP_FLOW}, 0) =:= {flow_suspended, wait1}\") :name)") +(epoch 61) +(eval "(get (erlang-eval-ast \"flow:drive(${SUSP_FLOW}, 0, [{wait1, 99}]) =:= {flow_done, {resumed, 99}}\") :name)") +(epoch 62) +(eval "(get (erlang-eval-ast \"flow:run(flow_spec:sequence([flow:suspend(a), flow:suspend(b)]), 0) =:= {flow_suspended, a}\") :name)") + +;; ── durable store: registry ──────────────────────────────── +(epoch 70) +(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(f1, ${N1}), flow_store:resolve_flow(f1) =/= not_found andalso flow_store:registered_flows() =:= [f1]\") :name)") +(epoch 71) +(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:resolve_flow(ghost) =:= not_found\") :name)") + +;; ── durable store: start / resume ────────────────────────── +;; one-shot flow runs to completion on start +(epoch 80) +(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(done1, ${N1}), flow_store:start(done1, 41) =:= {ok, 1, {flow_done, 42}}\") :name)") +;; suspending flow: start suspends, resume completes +(epoch 81) +(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(s1, ${SUSP_FLOW}), {ok, Id, R} = flow_store:start(s1, 10), R =:= {flow_suspended, wait1} andalso flow_store:status(Id) =:= {ok, {suspended, wait1}}\") :name)") +(epoch 82) +(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(s1, ${SUSP_FLOW}), {ok, Id, _} = flow_store:start(s1, 10), flow_store:resume(Id, 99) =:= {ok, {flow_done, {resumed, 99}}}\") :name)") +(epoch 83) +(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(s1, ${SUSP_FLOW}), {ok, Id, _} = flow_store:start(s1, 10), flow_store:resume(Id, 99), flow_store:status(Id) =:= {ok, {done, {resumed, 99}}}\") :name)") +;; two-suspend flow: resume chain accumulates the replay log +(epoch 84) +(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(s2, ${TWO_SUSP}), {ok, Id, _} = flow_store:start(s2, 0), {ok, R1} = flow_store:resume(Id, 5), R2 = flow_store:resume(Id, 7), R1 =:= {flow_suspended, b} andalso R2 =:= {ok, {flow_done, 8}}\") :name)") +;; error paths +(epoch 85) +(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:start(ghost, 0) =:= {error, no_such_flow}\") :name)") +(epoch 86) +(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:resume(999, x) =:= {error, no_such_instance}\") :name)") +(epoch 87) +(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(done1, ${N1}), {ok, Id, _} = flow_store:start(done1, 0), flow_store:resume(Id, x) =:= {error, already_done}\") :name)") +EPOCHS + +OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) + +check() { + local epoch="$1" desc="$2" expected="$3" + local actual + actual=$(echo "$OUTPUT" | awk -v e="$epoch" ' + $0 ~ "^\\(ok-len " e " " { getline; print; exit } + $0 ~ "^\\(ok " e " " { print; exit } + $0 ~ "^\\(error " e " " { print; exit } + ') + [ -z "$actual" ] && actual="" + if echo "$actual" | grep -qF -- "$expected"; then + PASS=$((PASS+1)) + [ "$VERBOSE" = "-v" ] && echo " ok $desc" + else + FAIL=$((FAIL+1)) + ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual +" + fi +} + +check 3 "flow module loaded" "flow" +check 4 "flow_spec module loaded" "flow_spec" +check 5 "flow_store module loaded" "flow_store" +check 10 "flow_id" "true" +check 11 "flow_const" "true" +check 12 "flow_node" "true" +check 20 "sequence threads left-to-right" "true" +check 21 "parallel fans out" "true" +check 22 "map_flow over a list" "true" +check 23 "flow_while bounded by pred" "true" +check 24 "flow_until bounded by pred" "true" +check 25 "flow_while bounded by max" "true" +check 30 "branch then-arm" "true" +check 31 "branch else-arm" "true" +check 40 "failed? predicate" "true" +check 41 "attempt stops at first fail" "true" +check 42 "attempt threads on success" "true" +check 43 "recover handles fail value" "true" +check 50 "tap pass-through" "true" +check 51 "try_catch catches a raise" "true" +check 52 "retry runs node" "true" +check 60 "suspend miss short-circuits" "true" +check 61 "suspend replay completes" "true" +check 62 "first of two suspends wins" "true" +check 70 "register + resolve + list" "true" +check 71 "resolve unknown -> not_found" "true" +check 80 "start one-shot -> done" "true" +check 81 "start suspends + status" "true" +check 82 "resume completes" "true" +check 83 "status after resume = done" "true" +check 84 "two-suspend resume chain" "true" +check 85 "start unknown -> no_such_flow" "true" +check 86 "resume unknown -> no_such_instance" "true" +check 87 "resume a done flow -> already_done" "true" + +TOTAL=$((PASS+FAIL)) +if [ $FAIL -eq 0 ]; then + echo "ok $PASS/$TOTAL flow-on-erlang engine tests passed" +else + echo "FAIL $PASS/$TOTAL passed, $FAIL failed:" + echo "$ERRORS" +fi +[ $FAIL -eq 0 ] diff --git a/next/flow/flow.erl b/next/flow/flow.erl new file mode 100644 index 00000000..db83d814 --- /dev/null +++ b/next/flow/flow.erl @@ -0,0 +1,84 @@ +-module(flow). +-export([drive/3, run/2, + cont/2, susp/2, is_susp/1, ctx_value/1, ctx_log/1, + suspend/1, log_lookup/2]). + +%% flow-on-erlang — the deterministic-replay core. A native Erlang port +%% of the Scheme flow engine (lib/flow), so the fed-sx kernel can fan +%% activities out into durable business flows in its own runtime (no +%% cross-guest FFI). +%% +%% Durability model — identical semantics to the Scheme engine, but +%% adapted to this Erlang-on-SX runtime, which has three hard +%% constraints the Scheme host doesn't: no escape continuation that can +%% be re-entered, no process dictionary, and (critically) a blocking +%% `receive` / `gen_server:call` inside a `try` deadlocks the +%% cooperative scheduler. So instead of the Scheme engine's +%% mutable-global + call/cc-escape, the replay log is THREADED through a +%% railway-style context and `suspend` SHORT-CIRCUITS (like a fail +%% value) rather than throwing. No ambient state, no throw, no +%% gen_server — purely functional, which sidesteps every constraint. +%% +%% A node is `fun(Ctx) -> Ctx`. A Ctx is one of: +%% {flow_cont, Value, Log} — running; Value is the current value +%% {flow_susp, Tag, Log} — short-circuited at suspend Tag +%% Log is the replay log: [{Tag, ResolvedValue}, ...]. Combinators +%% (flow_spec) thread Ctx and pass {flow_susp,...} straight through, so +%% once a flow suspends nothing downstream runs. +%% +%% suspend/1 is the load-bearing primitive: a node that, given the +%% running Ctx, looks Tag up in the replay log. A hit replaces the +%% current value with the logged value and continues; a miss +%% short-circuits to {flow_susp, Tag, Log}. ALL effects/non-determinism +%% go through a suspend node so they run once — in the driver, between +%% drives — and their results are logged, never re-run on replay. Tags +%% must be unique and deterministic across replays. + +%% ── context constructors / accessors ──────────────────────────── + +cont(Value, Log) -> {flow_cont, Value, Log}. +susp(Tag, Log) -> {flow_susp, Tag, Log}. + +is_susp({flow_susp, _, _}) -> true; +is_susp(_) -> false. + +ctx_value({flow_cont, Value, _}) -> Value; +ctx_value({flow_susp, _, _}) -> undefined. + +ctx_log({flow_cont, _, Log}) -> Log; +ctx_log({flow_susp, _, Log}) -> Log. + +%% ── suspend node ──────────────────────────────────────────────── + +suspend(Tag) -> + fun (Ctx) -> + case Ctx of + {flow_susp, _, _} -> Ctx; + {flow_cont, _Value, Log} -> + case log_lookup(Tag, Log) of + {ok, Resolved} -> {flow_cont, Resolved, Log}; + miss -> {flow_susp, Tag, Log} + end + end + end. + +log_lookup(_, []) -> miss; +log_lookup(Tag, [{Tag, Value} | _]) -> {ok, Value}; +log_lookup(Tag, [_ | Rest]) -> log_lookup(Tag, Rest). + +%% ── driver ────────────────────────────────────────────────────── + +%% drive(Flow, Input, Log) — run Flow under the replay Log. +%% {flow_done, Result} — flow completed +%% {flow_suspended, Tag} — flow short-circuited at an unresolved +%% suspend; the driver resolves Tag, appends +%% {Tag, Value} to Log, and re-drives. +drive(Flow, Input, Log) -> + case Flow({flow_cont, Input, Log}) of + {flow_cont, Result, _} -> {flow_done, Result}; + {flow_susp, Tag, _} -> {flow_suspended, Tag} + end. + +%% run(Flow, Input) — drive with an empty replay log. +run(Flow, Input) -> + drive(Flow, Input, []). diff --git a/next/flow/flow_spec.erl b/next/flow/flow_spec.erl new file mode 100644 index 00000000..88321206 --- /dev/null +++ b/next/flow/flow_spec.erl @@ -0,0 +1,240 @@ +-module(flow_spec). +-export([flow_node/1, flow_id/0, flow_const/1, + sequence/1, parallel/1, map_flow/1, + flow_while/3, flow_until/3, + branch/3, fail/1, failed/1, fail_reason/1, + recover/2, tap/1, attempt/1, try_catch/2, retry/2]). + +%% flow-on-erlang combinators — a native port of lib/flow/spec.sx, +%% adapted to the railway-threaded context model in flow.erl. A node is +%% `fun(Ctx) -> Ctx`; every combinator passes a {flow_susp,...} context +%% straight through, so once a flow suspends nothing downstream runs. +%% User code stays value-level: the predicates/functions handed to +%% flow_node / branch / etc. take and return plain values, and the +%% combinator threads them into the context. +%% +%% Variadic Scheme forms (sequence, parallel, attempt) take an explicit +%% list here — the one idiom difference from the Scheme engine. Effects +%% must go through a flow:suspend/1 node so they run once (in the +%% driver) and replay from the log; `tap` is only for replay-safe +%% effects (e.g. tracing). + +%% ── leaves ────────────────────────────────────────────────────── + +%% flow_node(F) — lift a value function F :: Value -> Value into a node. +flow_node(F) -> + fun (Ctx) -> + case flow:is_susp(Ctx) of + true -> Ctx; + false -> flow:cont(F(flow:ctx_value(Ctx)), flow:ctx_log(Ctx)) + end + end. + +flow_id() -> + fun (Ctx) -> Ctx end. + +flow_const(V) -> + fun (Ctx) -> + case flow:is_susp(Ctx) of + true -> Ctx; + false -> flow:cont(V, flow:ctx_log(Ctx)) + end + end. + +%% ── threading / fan-out / iteration ───────────────────────────── + +%% sequence(Nodes) — thread the context left-to-right. Each node +%% self-guards on suspension, so a suspended context flows through +%% untouched. +sequence(Nodes) -> + fun (Ctx) -> seq_step(Nodes, Ctx) end. + +seq_step([], Ctx) -> Ctx; +seq_step([N | Ns], Ctx) -> seq_step(Ns, N(Ctx)). + +%% parallel(Nodes) — fan the input value to every node, join results +%% into a list (sequential evaluation under one shared replay log). +%% First child to suspend short-circuits the whole parallel. +parallel(Nodes) -> + fun (Ctx) -> + case flow:is_susp(Ctx) of + true -> Ctx; + false -> par_step(Nodes, flow:ctx_value(Ctx), flow:ctx_log(Ctx), []) + end + end. + +par_step([], _Input, Log, Acc) -> + flow:cont(lists:reverse(Acc), Log); +par_step([N | Ns], Input, Log, Acc) -> + R = N(flow:cont(Input, Log)), + case flow:is_susp(R) of + true -> R; + false -> par_step(Ns, Input, Log, [flow:ctx_value(R) | Acc]) + end. + +%% map_flow(Node) — run Node over each item of a list input value. +map_flow(Node) -> + fun (Ctx) -> + case flow:is_susp(Ctx) of + true -> Ctx; + false -> map_step(Node, flow:ctx_value(Ctx), flow:ctx_log(Ctx), []) + end + end. + +map_step(_, [], Log, Acc) -> + flow:cont(lists:reverse(Acc), Log); +map_step(Node, [I | Is], Log, Acc) -> + R = Node(flow:cont(I, Log)), + case flow:is_susp(R) of + true -> R; + false -> map_step(Node, Is, Log, [flow:ctx_value(R) | Acc]) + end. + +%% flow_while(Pred, Body, Max) — re-run Body (a node), threading the +%% context, while Pred(value) holds, up to Max steps. Pred :: Value -> +%% bool; Body :: node. +flow_while(Pred, Body, Max) -> + fun (Ctx) -> while_step(Pred, Body, Ctx, Max) end. + +while_step(_, _, Ctx, N) when N =< 0 -> Ctx; +while_step(Pred, Body, Ctx, N) -> + case flow:is_susp(Ctx) of + true -> Ctx; + false -> + case Pred(flow:ctx_value(Ctx)) of + true -> while_step(Pred, Body, Body(Ctx), N - 1); + _ -> Ctx + end + end. + +%% flow_until(Pred, Body, Max) — re-run Body until Pred(value) holds. +flow_until(Pred, Body, Max) -> + fun (Ctx) -> until_step(Pred, Body, Ctx, Max) end. + +until_step(_, _, Ctx, N) when N =< 0 -> Ctx; +until_step(Pred, Body, Ctx, N) -> + case flow:is_susp(Ctx) of + true -> Ctx; + false -> + case Pred(flow:ctx_value(Ctx)) of + true -> Ctx; + _ -> until_step(Pred, Body, Body(Ctx), N - 1) + end + end. + +%% ── branching ─────────────────────────────────────────────────── + +%% branch(Pred, Then, Else) — Pred :: Value -> bool; Then/Else :: node. +branch(Pred, Then, Else) -> + fun (Ctx) -> + case flow:is_susp(Ctx) of + true -> Ctx; + false -> + case Pred(flow:ctx_value(Ctx)) of + true -> Then(Ctx); + _ -> Else(Ctx) + end + end + end. + +%% ── railway-style failure (values, not exceptions) ────────────── + +fail(Reason) -> {flow_fail, Reason}. + +failed({flow_fail, _}) -> true; +failed(_) -> false. + +fail_reason({flow_fail, R}) -> R. + +%% recover(Node, Handler) — if Node yields a fail VALUE, run Handler on +%% the reason; else pass through. Handler :: Reason -> Value. +recover(Node, Handler) -> + fun (Ctx) -> + R = Node(Ctx), + case flow:is_susp(R) of + true -> R; + false -> + V = flow:ctx_value(R), + case failed(V) of + true -> flow:cont(Handler(fail_reason(V)), flow:ctx_log(R)); + false -> R + end + end + end. + +%% tap(Effect) — replay-safe side-effecting pass-through (returns the +%% input value unchanged). Effect :: Value -> any. +tap(Effect) -> + fun (Ctx) -> + case flow:is_susp(Ctx) of + true -> Ctx; + false -> Effect(flow:ctx_value(Ctx)), Ctx + end + end. + +%% attempt(Nodes) — railway sequence: thread left-to-right but stop at +%% the first node whose value is a fail, returning that failure. +attempt(Nodes) -> + fun (Ctx) -> attempt_step(Nodes, Ctx) end. + +attempt_step([], Ctx) -> Ctx; +attempt_step([N | Ns], Ctx) -> + case flow:is_susp(Ctx) of + true -> Ctx; + false -> + case failed(flow:ctx_value(Ctx)) of + true -> Ctx; + false -> attempt_step(Ns, N(Ctx)) + end + end. + +%% ── exception-style control ───────────────────────────────────── +%% Nodes are pure (effects go through suspend, run by the driver), so a +%% try around a node never wraps a blocking receive — safe in this +%% runtime. + +%% try_catch(Node, Handler) — run Node; if it raises, run Handler on the +%% exception. Handler :: Exception -> Value. +try_catch(Node, Handler) -> + fun (Ctx) -> + case flow:is_susp(Ctx) of + true -> Ctx; + false -> + Log = flow:ctx_log(Ctx), + try Node(Ctx) of + R -> R + catch + throw:E -> flow:cont(Handler(E), Log); + error:E -> flow:cont(Handler(E), Log); + exit:E -> flow:cont(Handler(E), Log) + end + end + end. + +%% retry(N, Node) — run Node, retrying up to N attempts on a raise. +retry(N, Node) -> + fun (Ctx) -> retry_step(N, Node, Ctx) end. + +retry_step(N, Node, Ctx) -> + case flow:is_susp(Ctx) of + true -> Ctx; + false -> + try Node(Ctx) of + R -> R + catch + throw:Reason -> retry_reraise(N, Node, Ctx, throw, Reason); + error:Reason -> retry_reraise(N, Node, Ctx, error, Reason); + exit:Reason -> retry_reraise(N, Node, Ctx, exit, Reason) + end + end. + +retry_reraise(N, Node, Ctx, Class, Reason) -> + case N =< 1 of + false -> retry_step(N - 1, Node, Ctx); + true -> + case Class of + throw -> throw(Reason); + error -> erlang:error(Reason); + exit -> exit(Reason) + end + end. diff --git a/next/flow/flow_store.erl b/next/flow/flow_store.erl new file mode 100644 index 00000000..4de5925a --- /dev/null +++ b/next/flow/flow_store.erl @@ -0,0 +1,141 @@ +-module(flow_store). +-export([start_link/0, start_link/1, stop/0, + register_flow/2, resolve_flow/1, registered_flows/0, + start/2, resume/2, status/1, instances/0]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2]). +-behaviour(gen_server). + +%% flow-on-erlang durable store — the named-flow registry plus the +%% instance table that makes suspend/resume durable. flow.erl is the +%% pure replay driver; this gen_server is the stateful shell around it, +%% holding the registry (so triggers can reference flows by name, and +%% so an instance can be re-resolved + replayed after a restart) and +%% each instance's accumulated replay log. +%% +%% Crucially the driver stays OUT of any blocking context: start/resume +%% call flow:drive/3 (pure — no receive, no gen_server:call) from inside +%% handle_call, and the only message-passing is the caller's +%% gen_server:call into this store. (A blocking receive inside a `try` +%% deadlocks this cooperative scheduler, so the engine never does one.) +%% +%% State: {Registry, Instances, NextId} +%% Registry = [{Name, FlowFun}, ...] +%% Instances = [{Id, {Name, Input, Log, Status}}, ...] +%% Status = {suspended, Tag} | {done, Result} +%% Log = [{Tag, ResolvedValue}, ...] (the replay log — plain +%% data, so an instance is fully described by its log and +%% survives process restart by re-driving the named flow) +%% +%% v1 backs the store in gen_server memory; persisting the instance +%% logs to the kernel's durable log (so flows survive an OS restart) is +%% a later layer — the data shape is already restart-ready. + +start_link() -> + start_link([]). + +start_link(InitialFlows) -> + Pid = gen_server:start_link(flow_store, [InitialFlows]), + erlang:register(flow_store, Pid), + Pid. + +stop() -> + R = gen_server:call(flow_store, '$gen_stop'), + erlang:unregister(flow_store), + R. + +%% register_flow(Name, Flow) — register a named flow (a node fun). Named +%% rather than `register` to avoid the erlang:register/2 auto-import. +register_flow(Name, Flow) -> + gen_server:call(flow_store, {register_flow, Name, Flow}). + +resolve_flow(Name) -> + gen_server:call(flow_store, {resolve_flow, Name}). + +registered_flows() -> + gen_server:call(flow_store, registered_flows). + +%% start(Name, Input) -> {ok, Id, Result} | {error, no_such_flow}. +%% Result is {flow_done, V} | {flow_suspended, Tag}; the instance is +%% recorded either way so a suspended flow can be resumed by Id. +start(Name, Input) -> + gen_server:call(flow_store, {start, Name, Input}). + +%% resume(Id, Value) -> {ok, Result} | {error, Reason}. Resolves the +%% instance's current suspend tag with Value (appends {Tag, Value} to +%% its replay log) and re-drives from the top. +resume(Id, Value) -> + gen_server:call(flow_store, {resume, Id, Value}). + +%% status(Id) -> {ok, {suspended, Tag}} | {ok, {done, Result}} | not_found +status(Id) -> + gen_server:call(flow_store, {status, Id}). + +instances() -> + gen_server:call(flow_store, instances). + +%% ── gen_server ────────────────────────────────────────────────── + +init([InitialFlows]) -> + {ok, {InitialFlows, [], 1}}. + +handle_call({register_flow, Name, Flow}, _From, {Reg, Ins, N}) -> + {reply, ok, {set_keyed(Name, Flow, Reg), Ins, N}}; +handle_call({resolve_flow, Name}, _From, {Reg, Ins, N}) -> + {reply, find_keyed(Name, Reg), {Reg, Ins, N}}; +handle_call(registered_flows, _From, {Reg, Ins, N}) -> + {reply, [Name || {Name, _} <- Reg], {Reg, Ins, N}}; +handle_call({start, Name, Input}, _From, {Reg, Ins, N}) -> + case find_keyed(Name, Reg) of + not_found -> + {reply, {error, no_such_flow}, {Reg, Ins, N}}; + {ok, Flow} -> + R = flow:drive(Flow, Input, []), + Status = result_status(R), + Ins2 = set_keyed(N, {Name, Input, [], Status}, Ins), + {reply, {ok, N, R}, {Reg, Ins2, N + 1}} + end; +handle_call({resume, Id, Value}, _From, {Reg, Ins, N}) -> + case find_keyed(Id, Ins) of + not_found -> + {reply, {error, no_such_instance}, {Reg, Ins, N}}; + {ok, {_Name, _Input, _Log, {done, _}}} -> + {reply, {error, already_done}, {Reg, Ins, N}}; + {ok, {Name, Input, Log, {suspended, Tag}}} -> + case find_keyed(Name, Reg) of + not_found -> + {reply, {error, no_such_flow}, {Reg, Ins, N}}; + {ok, Flow} -> + NewLog = log_append(Log, Tag, Value), + R = flow:drive(Flow, Input, NewLog), + Status = result_status(R), + Ins2 = set_keyed(Id, {Name, Input, NewLog, Status}, Ins), + {reply, {ok, R}, {Reg, Ins2, N}} + end + end; +handle_call({status, Id}, _From, {Reg, Ins, N}) -> + case find_keyed(Id, Ins) of + {ok, {_Name, _Input, _Log, Status}} -> {reply, {ok, Status}, {Reg, Ins, N}}; + not_found -> {reply, not_found, {Reg, Ins, N}} + end; +handle_call(instances, _From, {Reg, Ins, N}) -> + {reply, [Id || {Id, _} <- Ins], {Reg, Ins, N}}. + +handle_cast(_, S) -> {noreply, S}. + +handle_info(_, S) -> {noreply, S}. + +%% ── helpers ───────────────────────────────────────────────────── + +result_status({flow_done, R}) -> {done, R}; +result_status({flow_suspended, T}) -> {suspended, T}. + +log_append([], Tag, Value) -> [{Tag, Value}]; +log_append([H | T], Tag, Value) -> [H | log_append(T, Tag, Value)]. + +find_keyed(_, []) -> not_found; +find_keyed(K, [{K, V} | _]) -> {ok, V}; +find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest). + +set_keyed(K, V, []) -> [{K, V}]; +set_keyed(K, V, [{K, _} | Rest]) -> [{K, V} | Rest]; +set_keyed(K, V, [P | Rest]) -> [P | set_keyed(K, V, Rest)].