-module(flow_spec).

-export([
    flow_node/1, flow_id/0, flow_const/1,
    sequence/1, parallel/1, map_flow/1,
    flow_while/3, flow_until/3,
    branch/3,
    fail/1, failed/1, fail_reason/1, recover/2,
    tap/1, attempt/1,
    try_catch/2, retry/2
]).

%% flow-on-erlang combinators — a native port of lib/flow/spec.sx,
%% adapted to the railway-threaded context model in flow.erl. A node is
%% `fun(Ctx) -> Ctx`; every combinator passes a {flow_susp,...} context
%% straight through, so once a flow suspends nothing downstream runs.
%% User code stays value-level: the predicates/functions handed to
%% flow_node / branch / etc. take and return plain values, and the
%% combinator threads them into the context.
%%
%% Variadic Scheme forms (sequence, parallel, attempt) take an explicit
%% list here — the one idiom difference from the Scheme engine. Effects
%% must go through a flow:suspend/1 node so they run once (in the
%% driver) and replay from the log; `tap` is only for replay-safe
%% effects (e.g. tracing).

%% ── leaves ──────────────────────────────────────────────────────

%% flow_node(F) — lift a value function F :: Value -> Value into a node.
%% A suspended context is handed back untouched; otherwise F is applied
%% to the current value and the result is paired with the unchanged log.
flow_node(F) ->
    fun(Ctx) ->
        case flow:is_susp(Ctx) of
            false ->
                Out = F(flow:ctx_value(Ctx)),
                flow:cont(Out, flow:ctx_log(Ctx));
            true ->
                Ctx
        end
    end.

%% flow_id() — the identity node: returns whatever context it is given.
flow_id() ->
    fun(Ctx) -> Ctx end.

%% flow_const(V) — a node that replaces the current value with V and
%% keeps the log; a suspended context passes through unchanged.
flow_const(V) ->
    fun(Ctx) ->
        case flow:is_susp(Ctx) of
            false -> flow:cont(V, flow:ctx_log(Ctx));
            true -> Ctx
        end
    end.

%% ── threading / fan-out / iteration ─────────────────────────────

%% sequence(Nodes) — thread the context left-to-right. Each node
%% self-guards on suspension, so a suspended context flows through
%% untouched.
sequence(Nodes) ->
    fun(Ctx) -> seq_step(Nodes, Ctx) end.

%% seq_step(Nodes, Ctx) — feed Ctx through each node in list order and
%% return the context produced by the last one (Ctx itself for []).
seq_step([Node | Rest], Ctx) ->
    seq_step(Rest, Node(Ctx));
seq_step([], Ctx) ->
    Ctx.

%% parallel(Nodes) — fan the input value to every node, join results
%% into a list (sequential evaluation under one shared replay log).
%% First child to suspend short-circuits the whole parallel.
parallel(Nodes) ->
    fun(Ctx) ->
        case flow:is_susp(Ctx) of
            true -> Ctx;
            false -> par_step(Nodes, flow:ctx_value(Ctx), flow:ctx_log(Ctx), [])
        end
    end.

%% par_step(Nodes, Input, Log, Acc) — run each node in list order on the
%% same Input value, collecting the result values (Acc is built in
%% reverse and flipped at the end). A child that suspends is returned
%% as-is and no later child runs.
%%
%% The log handed to the next child — and attached to the joined result —
%% is the one the previous child returned, not the log the parallel
%% started with, so whatever a child did to the log is not discarded.
%% NOTE(review): this assumes a node may hand back a different log than
%% it received (recover/2 reads flow:ctx_log/1 from the node's result for
%% the same reason) — confirm against flow.erl. If nodes never change the
%% log this is equivalent to reusing the incoming one.
par_step([], _Input, Log, Acc) ->
    flow:cont(lists:reverse(Acc), Log);
par_step([N | Ns], Input, Log, Acc) ->
    R = N(flow:cont(Input, Log)),
    case flow:is_susp(R) of
        true -> R;
        false -> par_step(Ns, Input, flow:ctx_log(R), [flow:ctx_value(R) | Acc])
    end.

%% map_flow(Node) — run Node over each item of a list input value and
%% yield the list of result values in the same order. A suspended input
%% context passes through unchanged.
map_flow(Node) ->
    fun(Ctx) ->
        case flow:is_susp(Ctx) of
            true -> Ctx;
            false -> map_step(Node, flow:ctx_value(Ctx), flow:ctx_log(Ctx), [])
        end
    end.

%% map_step(Node, Items, Log, Acc) — apply Node to one item at a time.
%% As in par_step, the log returned by each run is carried into the next
%% one and into the final result; the first suspension is returned
%% immediately and the remaining items are not processed.
map_step(_, [], Log, Acc) ->
    flow:cont(lists:reverse(Acc), Log);
map_step(Node, [I | Is], Log, Acc) ->
    R = Node(flow:cont(I, Log)),
    case flow:is_susp(R) of
        true -> R;
        false -> map_step(Node, Is, flow:ctx_log(R), [flow:ctx_value(R) | Acc])
    end.

%% flow_while(Pred, Body, Max) — re-run Body (a node), threading the
%% context, while Pred(value) holds, up to Max steps. Pred :: Value ->
%% bool; Body :: node.
flow_while(Pred, Body, Max) ->
    fun(Ctx) -> while_step(Pred, Body, Ctx, Max) end.

%% while_step(Pred, Body, Ctx, N) — N is the number of Body runs still
%% allowed. Stops (returning the current context) when N is used up, when
%% the context is suspended, or when Pred returns anything but true.
while_step(_, _, Ctx, N) when N =< 0 ->
    Ctx;
while_step(Pred, Body, Ctx, N) ->
    case flow:is_susp(Ctx) of
        true ->
            Ctx;
        false ->
            case Pred(flow:ctx_value(Ctx)) of
                true -> while_step(Pred, Body, Body(Ctx), N - 1);
                _ -> Ctx
            end
    end.

%% flow_until(Pred, Body, Max) — re-run Body until Pred(value) holds,
%% up to Max steps. Pred is checked before each run, so Body does not
%% run at all when Pred already holds for the input value.
flow_until(Pred, Body, Max) ->
    fun(Ctx) -> until_step(Pred, Body, Ctx, Max) end.

%% until_step(Pred, Body, Ctx, N) — mirror image of while_step: stops
%% when N is used up, when the context is suspended, or when Pred
%% returns true; any other Pred result runs Body once more.
until_step(_, _, Ctx, N) when N =< 0 ->
    Ctx;
until_step(Pred, Body, Ctx, N) ->
    case flow:is_susp(Ctx) of
        true ->
            Ctx;
        false ->
            case Pred(flow:ctx_value(Ctx)) of
                true -> Ctx;
                _ -> until_step(Pred, Body, Body(Ctx), N - 1)
            end
    end.

%% ── branching ───────────────────────────────────────────────────

%% branch(Pred, Then, Else) — Pred :: Value -> bool; Then/Else :: node.
%% Runs Then on the context when Pred(value) is true and Else for any
%% other result; a suspended context passes through unchanged.
branch(Pred, Then, Else) ->
    fun(Ctx) ->
        case flow:is_susp(Ctx) of
            true ->
                Ctx;
            false ->
                case Pred(flow:ctx_value(Ctx)) of
                    true -> Then(Ctx);
                    _ -> Else(Ctx)
                end
        end
    end.
%% ── railway-style failure (values, not exceptions) ──────────────

%% fail(Reason) — build a failure value carrying Reason.
fail(Reason) ->
    {flow_fail, Reason}.

%% failed(Value) — true exactly when Value is a failure built by fail/1.
failed({flow_fail, _Reason}) -> true;
failed(_Other) -> false.

%% fail_reason(Failure) — extract the reason from a failure value.
%% Only failure values are accepted.
fail_reason({flow_fail, Reason}) ->
    Reason.

%% recover(Node, Handler) — if Node yields a fail VALUE, run Handler on
%% the reason; else pass through. Handler :: Reason -> Value. The
%% recovered value is paired with the log of Node's result. A suspended
%% result from Node is returned as-is.
recover(Node, Handler) ->
    fun(Ctx) ->
        Out = Node(Ctx),
        case flow:is_susp(Out) of
            true ->
                Out;
            false ->
                case flow:ctx_value(Out) of
                    {flow_fail, Reason} ->
                        flow:cont(Handler(Reason), flow:ctx_log(Out));
                    _NotAFailure ->
                        Out
                end
        end
    end.

%% tap(Effect) — replay-safe side-effecting pass-through (returns the
%% input context unchanged). Effect :: Value -> any; its return value is
%% ignored. Effect is not called for a suspended context.
tap(Effect) ->
    fun(Ctx) ->
        case flow:is_susp(Ctx) of
            false ->
                Effect(flow:ctx_value(Ctx)),
                Ctx;
            true ->
                Ctx
        end
    end.

%% attempt(Nodes) — railway sequence: thread left-to-right but stop at
%% the first node whose value is a fail, returning that failure.
attempt(Nodes) ->
    fun(Ctx) -> attempt_step(Nodes, Ctx) end.

%% attempt_step(Nodes, Ctx) — before running each node, check the
%% context it would receive: a suspended context or a fail value ends the
%% walk and is returned unchanged. With no nodes left the current
%% context is the result.
attempt_step([Node | Rest], Ctx) ->
    case flow:is_susp(Ctx) of
        true ->
            Ctx;
        false ->
            case failed(flow:ctx_value(Ctx)) of
                false -> attempt_step(Rest, Node(Ctx));
                true -> Ctx
            end
    end;
attempt_step([], Ctx) ->
    Ctx.

%% ── exception-style control ─────────────────────────────────────
%% Nodes are pure (effects go through suspend, run by the driver), so a
%% try around a node never wraps a blocking receive — safe in this
%% runtime.

%% try_catch(Node, Handler) — run Node; if it raises, run Handler on the
%% exception. Handler :: Exception -> Value. Throws, errors and exits are
%% all handled the same way, and the handler's value is paired with the
%% log the context had before Node ran.
try_catch(Node, Handler) ->
    fun(Ctx) ->
        case flow:is_susp(Ctx) of
            true ->
                Ctx;
            false ->
                LogBefore = flow:ctx_log(Ctx),
                try Node(Ctx) of
                    Out -> Out
                catch
                    throw:Exc -> try_catch_handled(Handler, Exc, LogBefore);
                    error:Exc -> try_catch_handled(Handler, Exc, LogBefore);
                    exit:Exc -> try_catch_handled(Handler, Exc, LogBefore)
                end
        end
    end.

%% try_catch_handled(Handler, Exc, Log) — turn a caught exception into a
%% continuing context holding Handler's value.
try_catch_handled(Handler, Exc, Log) ->
    flow:cont(Handler(Exc), Log).

%% retry(N, Node) — run Node, retrying up to N attempts on a raise.
retry(N, Node) ->
    fun(Ctx) -> retry_step(N, Node, Ctx) end.
%% retry_step(Left, Node, Ctx) — make one attempt at running Node on Ctx,
%% where Left counts the attempts still allowed including this one. A
%% suspended context is returned without running Node. If Node raises,
%% retry_reraise/5 decides between another attempt and re-raising. Every
%% attempt starts from the same input Ctx.
retry_step(Left, Node, Ctx) ->
    case flow:is_susp(Ctx) of
        true ->
            Ctx;
        false ->
            try Node(Ctx) of
                Out -> Out
            catch
                throw:Why -> retry_reraise(Left, Node, Ctx, throw, Why);
                error:Why -> retry_reraise(Left, Node, Ctx, error, Why);
                exit:Why -> retry_reraise(Left, Node, Ctx, exit, Why)
            end
    end.

%% retry_reraise(Left, Node, Ctx, Class, Why) — called after a failed
%% attempt. While more than one attempt was allowed, try again with the
%% budget reduced by one; otherwise raise an exception of the same class
%% and reason as the one that was caught.
%% NOTE(review): the exception is raised afresh here, so the stacktrace of
%% the original raise is not carried over; erlang:raise/3 could preserve
%% it if the target runtime supports it.
retry_reraise(Left, Node, Ctx, _Class, _Why) when Left > 1 ->
    retry_step(Left - 1, Node, Ctx);
retry_reraise(_Left, _Node, _Ctx, throw, Why) ->
    throw(Why);
retry_reraise(_Left, _Node, _Ctx, error, Why) ->
    erlang:error(Why);
retry_reraise(_Left, _Node, _Ctx, exit, Why) ->
    exit(Why).