User-facing docs for the flow engine: the node model, every combinator, the suspend/resume durability contract (escape-only call/cc -> deterministic replay), lifecycle/introspection/hygiene API, fed-sx distribution, and substrate notes. Doc-only; 151/151 unchanged. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
flow — durable DAG workflows on Scheme
flow is a workflow engine for rose-ash: content pipelines (write → review →
publish → federate), scheduled jobs, and multi-step user flows (signup, confirm,
onboard) that survive process restarts. It is a thin Scheme prelude over the
Scheme-on-SX guest (lib/scheme/); a flow runs inside the interpreter.
Run the suite: bash lib/flow/conformance.sh → 151/151 across 10 suites.
Model
A flow is just a Scheme procedure of one argument — the upstream value:
node : input -> output
Combinators build composite nodes out of child nodes. A node that ignores its argument is effectively a thunk. There is no separate "graph" object: composition is function composition, so flows are values you can name, pass, and nest.
(defflow publish
(sequence
(lambda (draft) (string-append draft "!"))
(branch (lambda (post) (>= (string-length post) 3))
(remote-node 'fed 'publish)
(flow-const 'rejected))))
(flow/start publish "hello") ; => federated, or a (flow-suspended id tag) state
Building blocks (spec.sx)
| Combinator | Meaning |
|---|---|
(flow-node f) / (flow-id x) / (flow-const v) |
leaf nodes |
(sequence n ...) |
thread input left-to-right |
(parallel n ...) |
fan input to every child, join results into a list (sequential eval) |
(map-flow node) |
run node over each item of a list input, join results |
(flow-while pred body max) / (flow-until ...) |
bounded iteration (cap max steps) |
(defflow name body) |
bind + register a named flow (so it survives restart) |
Control flow + errors (spec.sx)
| Combinator | Meaning |
|---|---|
(branch pred then else) |
pred on input selects then/else (cond is a Scheme special form) |
(retry n node) |
re-run on a raised exception, up to n attempts |
(timeout budget node) |
cooperative step budget: nodes call (tick); the (budget+1)-th tick raises flow-timeout |
(try-catch node handler) |
catch a raised exception → (handler error) |
(fail reason) / (failed? x) / (fail-reason x) |
explicit failure values (flow downstream as data) |
(recover node handler) |
the fail-VALUE counterpart of try-catch |
(attempt n ...) |
railway sequence: stop at the first node returning a (fail ...) |
(tap effect) |
run a side effect, return input unchanged |
Two error channels, on purpose. Raised exceptions are for bugs/transients
(caught by retry/try-catch). (fail reason) values are for expected business
outcomes (validation rejected, declined) and compose via attempt/recover.
Suspend / resume — the durable core (spec.sx, store.sx)
The guest Scheme's call/cc is escape-only — re-invoking a captured
continuation after it returns hangs the runtime. So flow does not serialize
continuations. Instead it uses deterministic replay:
(suspend tag)— iftagis already in the replay log, return its logged value; otherwise escape to the driver as(flow-suspended tag).resumeappends(tag value)to the log and re-runs the flow from the start. Already-resolved suspends replay their values; the first unresolved one escapes again (or the flow completes).
The entire persisted state is the replay log — plain data. No live continuation is ever stored, so flows survive process restarts and even moves between instances.
Author contract: suspend
tags must be unique and deterministic across replays, and all non-determinism / side effects must go through suspend points (so their results are logged) — otherwise they re-run on every replay.
Lifecycle (store.sx)
(flow/start flow input) ; raw result if it completes, else (flow-suspended id tag)
(flow/resume id value) ; inject value at the waiting tag, continue
(flow/cancel id) ; terminate; a later resume is rejected
Introspection & hygiene
(flow/status id) ; done | suspended | cancelled | unknown
(flow/result id) ; result if done, else (flow-error reason)
(flow/list) ; ((id status) ...)
(flow/pending) ; ((id waiting-tag) ...) — what each suspended flow awaits
(flow/gc) ; drop terminal records, keep live ones; returns count removed
(flow/forget id) ; drop one terminal record (refuses live flows)
Crash recovery
(flow-store-export) ; the store as plain data (live procs nulled)
(flow-store-import! d) ; restore the store from exported data
(flow-resumable-ids) ; ids of suspended flows to wake on restart
On restart the flow definitions are reloaded (defflow re-registers names) and the
exported store reimported; resume re-resolves each flow's procedure by name.
Distribution via fed-sx (remote.sx)
(flow-peer-register! addr table) ; mock a peer's exposed functions (fed-sx boundary)
(remote-node addr fn) ; run a node on a peer
(remote-failover addrs fn local) ; try peers in order, fall through to a local node
(flow-replicate-to addr) ; copy this store to a peer's replica slot
(flow-restore-from addr) ; import a peer's replica (handoff)
Handoff is crash recovery across instances: replicate → local instance dies → peer restores the (plain-data) store and resumes. The replay log carries over, so all resolved suspends survive the move.
Files
| File | Contents |
|---|---|
spec.sx |
combinators (flow-combinators-src / flow-control-src / flow-suspend-src) |
store.sx |
durable store, lifecycle, crash recovery, introspection, hygiene |
remote.sx |
fed-sx transport (mock peer registry), failover, replication |
api.sx |
flow-make-env / flow-run SX helpers (one cached env, per-test reset) |
tests/*.sx |
10 suites, 151 cases |
conformance.sh |
loads substrate + flow layer, runs every suite |
Notes on the substrate
The guest Scheme (lib/scheme/, imported read-only) lacks dotted-rest params
(a . rest) and named let; combinators use (lambda args ...) variadics + top-
level recursion. cons is list-only (no dotted pairs), so log/assoc entries are
2-element lists. Strings box as {:scm-string "..."}. Timeout is a step budget
because there is no wall clock; parallel is sequential for the same reason.