# flow — durable DAG workflows on Scheme

`flow` is a workflow engine for rose-ash: content pipelines (write → review →
publish → federate), scheduled jobs, and multi-step user flows (signup, confirm,
onboard) that **survive process restarts**. It is a thin Scheme prelude over the
Scheme-on-SX guest (`lib/scheme/`); a flow runs *inside* the interpreter.

Run the suite: `bash lib/flow/conformance.sh` → **151/151 across 10 suites**.

## Model

A **flow** is just a Scheme procedure of one argument — the upstream value:

```
node : input -> output
```

Combinators build composite nodes out of child nodes. A node that ignores its
argument is effectively a thunk. There is no separate "graph" object:
composition *is* function composition, so flows are values you can name, pass,
and nest.

```scheme
(defflow publish
  (sequence
    (lambda (draft) (string-append draft "!"))
    (branch (lambda (post) (>= (string-length post) 3))
            (remote-node 'fed 'publish)
            (flow-const 'rejected))))

(flow/start publish "hello")   ; => federated, or a (flow-suspended id tag) state
```

## Building blocks (`spec.sx`)

| Combinator | Meaning |
|---|---|
| `(flow-node f)` / `(flow-id x)` / `(flow-const v)` | leaf nodes |
| `(sequence n ...)` | thread input left-to-right |
| `(parallel n ...)` | fan input to every child, join results into a list (sequential eval) |
| `(map-flow node)` | run `node` over each item of a list input, join results |
| `(flow-while pred body max)` / `(flow-until ...)` | bounded iteration (cap `max` steps) |
| `(defflow name body)` | bind + register a named flow (so it survives restart) |

## Control flow + errors (`spec.sx`)

| Combinator | Meaning |
|---|---|
| `(branch pred then else)` | `pred` on input selects `then`/`else` (`cond` is a Scheme special form) |
| `(retry n node)` | re-run on a *raised exception*, up to `n` attempts |
| `(timeout budget node)` | cooperative **step budget**: nodes call `(tick)`; the `(budget+1)`-th tick raises `flow-timeout` |
| `(try-catch node handler)` | catch a raised exception → `(handler error)` |
| `(fail reason)` / `(failed? x)` / `(fail-reason x)` | explicit failure *values* (flow downstream as data) |
| `(recover node handler)` | the fail-VALUE counterpart of try-catch |
| `(attempt n ...)` | railway sequence: stop at the first node returning a `(fail ...)` |
| `(tap effect)` | run a side effect, return input unchanged |

**Two error channels, on purpose.** Raised exceptions are for *bugs/transients*
(caught by `retry`/`try-catch`). `(fail reason)` values are for *expected
business outcomes* (validation rejected, declined) and compose via
`attempt`/`recover`.

## Suspend / resume — the durable core (`spec.sx`, `store.sx`)

The guest Scheme's `call/cc` is **escape-only** — re-invoking a captured
continuation after it returns *hangs* the runtime. So flow does **not**
serialize continuations. Instead it uses **deterministic replay**:

- `(suspend tag)` — if `tag` is already in the replay log, return its logged
  value; otherwise escape to the driver as `(flow-suspended tag)`.
- `resume` appends `(tag value)` to the log and **re-runs the flow from the
  start**. Already-resolved suspends replay their values; the first unresolved
  one escapes again (or the flow completes).

The entire persisted state is the replay log — plain data. No live continuation
is ever stored, so flows survive process restarts and even moves between
instances.

> **Author contract:** suspend `tag`s must be unique and deterministic across
> replays, and **all** non-determinism / side effects must go through suspend
> points (so their results are logged) — otherwise they re-run on every replay.

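
The replay mechanism described above can be modelled in a few lines of standard Scheme. This is a hedged sketch, not the code in `spec.sx` or `store.sx`: every `demo-` name is hypothetical, the log is a single global rather than a per-flow record, and `call-with-current-continuation` is used strictly as an escape, which matches the escape-only restriction noted above. The counter shows why the author contract matters: a side effect that is not behind a suspend point runs once per replay.

```scheme
;; Model only: all demo- names are hypothetical and not part of flow.
(define demo-log '())          ; replay log: list of (tag value) 2-element lists
(define demo-escape #f)        ; escape continuation installed by the driver
(define demo-side-effects 0)   ; counts runs of an unlogged side effect

;; Return the logged value for tag, or escape to the driver if unresolved.
(define (demo-suspend tag)
  (let ((hit (assq tag demo-log)))
    (if hit
        (cadr hit)
        (demo-escape (list 'flow-suspended tag)))))

;; Driver: run the flow from the start under an escape continuation.
(define (demo-run flow input)
  (call-with-current-continuation
    (lambda (k)
      (set! demo-escape k)
      (flow input))))

;; Resume: append (tag value) to the log, then re-run from the start.
(define (demo-resume flow input tag value)
  (set! demo-log (append demo-log (list (list tag value))))
  (demo-run flow input))

(define (demo-signup email)
  ;; Not behind a suspend point, so it re-runs on every replay.
  (set! demo-side-effects (+ demo-side-effects 1))
  (let ((code (demo-suspend 'confirm)))
    (let ((name (demo-suspend 'profile)))
      (list email code name))))

(define demo-r1 (demo-run demo-signup "a@b"))                    ; (flow-suspended confirm)
(define demo-r2 (demo-resume demo-signup "a@b" 'confirm 1234))   ; (flow-suspended profile)
(define demo-r3 (demo-resume demo-signup "a@b" 'profile "Ada"))  ; ("a@b" 1234 "Ada")
;; demo-side-effects is now 3: one increment per run of the flow body.
```

In this model the only state worth persisting is `demo-log`, which is plain list data.
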

### Lifecycle (`store.sx`)

```scheme
(flow/start flow input)   ; raw result if it completes, else (flow-suspended id tag)
(flow/resume id value)    ; inject value at the waiting tag, continue
(flow/cancel id)          ; terminate; a later resume is rejected
```

### Introspection & hygiene

```scheme
(flow/status id)   ; done | suspended | cancelled | unknown
(flow/result id)   ; result if done, else (flow-error reason)
(flow/list)        ; ((id status) ...)
(flow/pending)     ; ((id waiting-tag) ...) — what each suspended flow awaits
(flow/gc)          ; drop terminal records, keep live ones; returns count removed
(flow/forget id)   ; drop one terminal record (refuses live flows)
```

### Crash recovery

```scheme
(flow-store-export)      ; the store as plain data (live procs nulled)
(flow-store-import! d)   ; restore the store from exported data
(flow-resumable-ids)     ; ids of suspended flows to wake on restart
```

On restart the flow definitions are reloaded (`defflow` re-registers names) and
the exported store reimported; `resume` re-resolves each flow's procedure **by
name**.

## Distribution via fed-sx (`remote.sx`)

```scheme
(flow-peer-register! addr table)   ; mock a peer's exposed functions (fed-sx boundary)
(remote-node addr fn)              ; run a node on a peer
(remote-failover addrs fn local)   ; try peers in order, fall through to a local node
(flow-replicate-to addr)           ; copy this store to a peer's replica slot
(flow-restore-from addr)           ; import a peer's replica (handoff)
```

**Handoff** is crash recovery across instances: replicate → local instance dies
→ peer restores the (plain-data) store and resumes. The replay log carries over,
so all resolved suspends survive the move.

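
Why an exported store can be revived on another process or peer comes down to records holding a flow *name* rather than a procedure. The sketch below models that idea in standard Scheme. It is an assumption-laden illustration, not the layout used by `store.sx`: the record shape `(id flow-name status input log)`, the `demo-` helpers, and the `unknown-flow` reason are all hypothetical, and the replay of the log on wake-up is omitted for brevity.

```scheme
;; Model only: all demo- names and the record layout are hypothetical.
(define demo-registry '())   ; ((name proc) ...), rebuilt when definitions reload

(define (demo-register! name proc)
  (set! demo-registry (cons (list name proc) demo-registry)))

(define (demo-resolve name)
  (let ((hit (assq name demo-registry)))
    (if hit (cadr hit) #f)))

;; Exported store: plain data only, no procedures inside.
(define demo-exported
  (list (list 1 'greet 'suspended "Ada" (list (list 'confirm 'yes)))
        (list 2 'greet 'done "Bob" '())))

;; Ids of records still marked suspended (top-level recursion, no named let).
(define (demo-resumable-ids records)
  (cond ((null? records) '())
        ((eq? (list-ref (car records) 2) 'suspended)
         (cons (list-ref (car records) 0)
               (demo-resumable-ids (cdr records))))
        (else (demo-resumable-ids (cdr records)))))

;; Wake a record by looking its procedure up by name.
(define (demo-wake record)
  (let ((proc (demo-resolve (list-ref record 1))))
    (if proc
        (proc (list-ref record 3))
        (list 'flow-error 'unknown-flow))))

(define demo-ids (demo-resumable-ids demo-exported))    ; (1)
(define demo-before (demo-wake (car demo-exported)))    ; (flow-error unknown-flow)

;; "Restart": reloading the definitions re-registers the name.
(demo-register! 'greet (lambda (input) (string-append "hi " input)))
(define demo-after (demo-wake (car demo-exported)))     ; "hi Ada"
```

The ordering matters in the model just as the text says it does for the real store: the name must be registered again before a record that refers to it can be woken.
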

## Files

| File | Contents |
|---|---|
| `spec.sx` | combinators (flow-combinators-src / flow-control-src / flow-suspend-src) |
| `store.sx` | durable store, lifecycle, crash recovery, introspection, hygiene |
| `remote.sx` | fed-sx transport (mock peer registry), failover, replication |
| `api.sx` | `flow-make-env` / `flow-run` SX helpers (one cached env, per-test reset) |
| `tests/*.sx` | 10 suites, 151 cases |
| `conformance.sh` | loads substrate + flow layer, runs every suite |

## Notes on the substrate

The guest Scheme (`lib/scheme/`, imported read-only) lacks dotted-rest params
`(a . rest)` and named `let`; combinators use `(lambda args ...)` variadics +
top-level recursion. `cons` is list-only (no dotted pairs), so log/assoc entries
are 2-element lists. Strings box as `{:scm-string "..."}`. Timeout is a step
budget because there is no wall clock; `parallel` is sequential for the same
reason.
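
To make the step-budget idea concrete, here is a small model of a cooperative timeout in standard Scheme. It is a sketch under stated assumptions rather than the `timeout` combinator itself: the `demo-` names are hypothetical, the budget is a single global (so nesting is not handled), and the model *returns* the symbol `flow-timeout` through an escape continuation where the real combinator is documented to raise it. The loop is written as top-level recursion, in line with the substrate's lack of named `let`.

```scheme
;; Model only: all demo- names are hypothetical and not part of flow.
(define demo-ticks-left 0)   ; remaining budget for the node being run
(define demo-bail #f)        ; escape continuation for the current timeout

;; Cooperative check: the (budget+1)-th call bails out.
(define (demo-tick)
  (if (= demo-ticks-left 0)
      (demo-bail 'flow-timeout)
      (set! demo-ticks-left (- demo-ticks-left 1))))

;; Wrap a node so it runs under a fresh step budget.
(define (demo-timeout budget node)
  (lambda (input)
    (call-with-current-continuation
      (lambda (k)
        (set! demo-ticks-left budget)
        (set! demo-bail k)
        (node input)))))

;; A node that ticks once per step (top-level recursion, no named let).
(define (demo-loop i n)
  (if (= i n)
      i
      (begin (demo-tick) (demo-loop (+ i 1) n))))

(define (demo-count-to n)
  (lambda (input) (demo-loop 0 n)))

(define demo-ok  ((demo-timeout 5 (demo-count-to 3)) 'ignored))   ; 3
(define demo-out ((demo-timeout 2 (demo-count-to 3)) 'ignored))   ; flow-timeout
```

With a budget of 2 the first two ticks succeed and the third bails, which mirrors the `(budget+1)`-th tick rule in the combinator table. A node that never calls `demo-tick` is never interrupted, which is the cost of a cooperative scheme.
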