# flow — durable DAG workflows on Scheme

`flow` is a workflow engine for rose-ash: content pipelines (write → review →
publish → federate), scheduled jobs, and multi-step user flows (signup, confirm,
onboard) that **survive process restarts**. It is a thin Scheme prelude over the
Scheme-on-SX guest (`lib/scheme/`); a flow runs *inside* the interpreter.

Run the suite: `bash lib/flow/conformance.sh` → **151/151 across 10 suites**.

## Model

A **flow** is just a Scheme procedure of one argument, the upstream value:

```
node : input -> output
```

Combinators build composite nodes out of child nodes. A node that ignores its
argument is effectively a thunk. There is no separate "graph" object: composition
*is* function composition, so flows are values you can name, pass, and nest.

```scheme
(defflow publish
  (sequence
    (lambda (draft) (string-append draft "!"))
    (branch (lambda (post) (>= (string-length post) 3))
            (remote-node 'fed 'publish)
            (flow-const 'rejected))))

(flow/start publish "hello") ; => federated, or a (flow-suspended id tag) state
```

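To make "composition *is* function composition" concrete, here is a minimal sketch of how a `sequence`-like and a `parallel`-like combinator could be written as plain procedures. The names `my-sequence`, `my-parallel`, `run-nodes`, and `fan-out` are hypothetical stand-ins for illustration, not the definitions in `spec.sx`. The sketch sticks to `(lambda args ...)` variadics and top-level recursion.

```scheme
;; Hypothetical stand-ins, NOT the real spec.sx definitions.
;; A node is any one-argument procedure.

;; Thread `input` through a list of nodes, left to right.
(define (run-nodes nodes input)
  (if (null? nodes)
      input
      (run-nodes (cdr nodes) ((car nodes) input))))

;; (my-sequence n ...) returns a new node; `lambda nodes` collects the children.
(define my-sequence
  (lambda nodes
    (lambda (input) (run-nodes nodes input))))

;; Fan the same input to every child, one after another, and collect
;; the results in child order.
(define (fan-out nodes input)
  (if (null? nodes)
      '()
      (let ((head-result ((car nodes) input)))
        (cons head-result (fan-out (cdr nodes) input)))))

(define my-parallel
  (lambda nodes
    (lambda (input) (fan-out nodes input))))

(define add-bang (lambda (s) (string-append s "!")))
(define shout (my-sequence add-bang add-bang))
(define both (my-parallel add-bang string-length))

(shout "hi")   ; "hi!!"
(both "hi")    ; ("hi!" 2)
```

Because each combinator returns an ordinary procedure, `shout` and `both` can themselves be passed as children to another combinator, which is all that "nesting" means in this model.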
## Building blocks (`spec.sx`)

| Combinator | Meaning |
|---|---|
| `(flow-node f)` / `(flow-id x)` / `(flow-const v)` | leaf nodes |
| `(sequence n ...)` | thread input left-to-right |
| `(parallel n ...)` | fan input to every child, join results into a list (sequential eval) |
| `(map-flow node)` | run `node` over each item of a list input, join results |
| `(flow-while pred body max)` / `(flow-until ...)` | bounded iteration (cap `max` steps) |
| `(defflow name body)` | bind + register a named flow (so it survives restart) |

## Control flow + errors (`spec.sx`)

| Combinator | Meaning |
|---|---|
| `(branch pred then else)` | `pred` on input selects `then`/`else` (`cond` is a Scheme special form) |
| `(retry n node)` | re-run on a *raised exception*, up to `n` attempts |
| `(timeout budget node)` | cooperative **step budget**: nodes call `(tick)`; the `(budget+1)`-th tick raises `flow-timeout` |
| `(try-catch node handler)` | catch a raised exception → `(handler error)` |
| `(fail reason)` / `(failed? x)` / `(fail-reason x)` | explicit failure *values* (flow downstream as data) |
| `(recover node handler)` | the fail-VALUE counterpart of try-catch |
| `(attempt n ...)` | railway sequence: stop at the first node returning a `(fail ...)` |
| `(tap effect)` | run a side effect, return input unchanged |

**Two error channels, on purpose.** Raised exceptions are for *bugs/transients*
(caught by `retry`/`try-catch`). `(fail reason)` values are for *expected business
outcomes* (validation rejected, declined) and compose via `attempt`/`recover`.

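The fail-value channel can be modelled in a few lines. The sketch below is an illustration under assumptions, not the `spec.sx` source: the `my-` names are hypothetical, the failure is represented as a 2-element list, and the handler is assumed to receive the whole fail value (the real `recover` may pass something else).

```scheme
;; Hypothetical model of the fail-VALUE channel; the my- names are
;; illustrative and NOT taken from spec.sx.
(define (my-fail reason) (list 'flow-fail reason))
(define (my-failed? x)
  (and (pair? x) (eq? (car x) 'flow-fail)))
(define (my-fail-reason x) (car (cdr x)))

;; Railway sequence: thread input through the nodes, but stop as soon
;; as the value in hand is a fail value.
(define (run-attempt nodes input)
  (cond ((my-failed? input) input)
        ((null? nodes) input)
        (else (run-attempt (cdr nodes) ((car nodes) input)))))

(define my-attempt
  (lambda nodes
    (lambda (input) (run-attempt nodes input))))

;; Assumption: the handler receives the fail value itself.
(define (my-recover node handler)
  (lambda (input)
    (let ((out (node input)))
      (if (my-failed? out) (handler out) out))))

(define validate
  (lambda (post)
    (if (< (string-length post) 3) (my-fail 'too-short) post)))
(define add-bang (lambda (post) (string-append post "!")))

(define checked (my-attempt validate add-bang))
(define lenient
  (my-recover checked (lambda (f) (my-fail-reason f))))

(checked "hello")  ; "hello!"
(checked "hi")     ; (flow-fail too-short), add-bang never runs
(lenient "hi")     ; too-short
```

Nothing is raised anywhere in this sketch: a rejected post is ordinary data that later nodes skip, which is why this channel suits expected outcomes while exceptions stay reserved for bugs and transient faults.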
## Suspend / resume — the durable core (`spec.sx`, `store.sx`)

The guest Scheme's `call/cc` is **escape-only**: re-invoking a captured
continuation after it returns *hangs* the runtime. So flow does **not** serialize
continuations. Instead it uses **deterministic replay**:

- `(suspend tag)`: if `tag` is already in the replay log, return its logged value;
  otherwise escape to the driver as `(flow-suspended tag)`.
- `resume` appends `(tag value)` to the log and **re-runs the flow from the start**.
  Already-resolved suspends replay their values; the first unresolved one escapes
  again (or the flow completes).

The entire persisted state is the replay log, which is plain data. No live continuation is
ever stored, so flows survive process restarts and even moves between instances.

> **Author contract:** suspend `tag`s must be unique and deterministic across
> replays, and **all** non-determinism / side effects must go through suspend
> points (so their results are logged); otherwise they re-run on every replay.

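The replay mechanism described above can be sketched with nothing more than a list and an escape-only continuation. This is a toy model under stated assumptions, not the driver in `spec.sx` / `store.sx`: `replay-log`, `my-suspend`, `run-flow`, `my-resume`, and the `signup` flow are all hypothetical names, and the log is kept in a single global rather than per flow id.

```scheme
;; Hypothetical model of deterministic replay; NOT the real driver.
(define replay-log '())        ; list of (tag value) 2-element lists
(define escape-to-driver #f)   ; escape continuation, set by run-flow

;; Find the (tag value) entry for `tag`, or #f if it is not logged yet.
(define (log-lookup tag log)
  (cond ((null? log) #f)
        ((equal? (car (car log)) tag) (car log))
        (else (log-lookup tag (cdr log)))))

;; Return the logged value for `tag`, or escape with a suspended marker.
(define (my-suspend tag)
  (let ((entry (log-lookup tag replay-log)))
    (if entry
        (car (cdr entry))
        (escape-to-driver (list 'flow-suspended tag)))))

;; Run the flow from the start against the current log. The continuation
;; is only ever invoked while this call is still active (escape-only).
(define (run-flow flow input)
  (call/cc
    (lambda (k)
      (set! escape-to-driver k)
      (flow input))))

;; Record the value for `tag`, then replay from the top.
(define (my-resume flow input tag value)
  (set! replay-log (append replay-log (list (list tag value))))
  (run-flow flow input))

(define signup
  (lambda (email)
    (let ((code (my-suspend 'confirm-code)))
      (let ((name (my-suspend 'display-name)))
        (list email code name)))))

(run-flow signup "a@b.c")                       ; (flow-suspended confirm-code)
(my-resume signup "a@b.c" 'confirm-code 1234)   ; (flow-suspended display-name)
(my-resume signup "a@b.c" 'display-name "Ann")  ; ("a@b.c" 1234 "Ann")
```

Each `my-resume` runs the body of `signup` again from its first line, so anything placed outside a `my-suspend` call would execute three times here. That repetition is the reason the author contract routes every side effect through a suspend point.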
### Lifecycle (`store.sx`)

```scheme
(flow/start flow input)   ; raw result if it completes, else (flow-suspended id tag)
(flow/resume id value)    ; inject value at the waiting tag, continue
(flow/cancel id)          ; terminate; a later resume is rejected
```

### Introspection & hygiene

```scheme
(flow/status id)   ; done | suspended | cancelled | unknown
(flow/result id)   ; result if done, else (flow-error reason)
(flow/list)        ; ((id status) ...)
(flow/pending)     ; ((id waiting-tag) ...): what each suspended flow awaits
(flow/gc)          ; drop terminal records, keep live ones; returns count removed
(flow/forget id)   ; drop one terminal record (refuses live flows)
```

### Crash recovery

```scheme
(flow-store-export)      ; the store as plain data (live procs nulled)
(flow-store-import! d)   ; restore the store from exported data
(flow-resumable-ids)     ; ids of suspended flows to wake on restart
```

On restart, the flow definitions are reloaded (`defflow` re-registers names) and the
exported store is re-imported; `resume` re-resolves each flow's procedure **by name**.

## Distribution via fed-sx (`remote.sx`)

```scheme
(flow-peer-register! addr table)   ; mock a peer's exposed functions (fed-sx boundary)
(remote-node addr fn)              ; run a node on a peer
(remote-failover addrs fn local)   ; try peers in order, fall through to a local node
(flow-replicate-to addr)           ; copy this store to a peer's replica slot
(flow-restore-from addr)           ; import a peer's replica (handoff)
```

**Handoff** is crash recovery across instances: replicate → local instance dies →
peer restores the (plain-data) store and resumes. The replay log carries over, so
all resolved suspends survive the move.

## Files

| File | Contents |
|---|---|
| `spec.sx` | combinators (flow-combinators-src / flow-control-src / flow-suspend-src) |
| `store.sx` | durable store, lifecycle, crash recovery, introspection, hygiene |
| `remote.sx` | fed-sx transport (mock peer registry), failover, replication |
| `api.sx` | `flow-make-env` / `flow-run` SX helpers (one cached env, per-test reset) |
| `tests/*.sx` | 10 suites, 151 cases |
| `conformance.sh` | loads substrate + flow layer, runs every suite |

## Notes on the substrate

The guest Scheme (`lib/scheme/`, imported read-only) lacks dotted-rest params
`(a . rest)` and named `let`; combinators use `(lambda args ...)` variadics +
top-level recursion. `cons` is list-only (no dotted pairs), so log/assoc entries are
2-element lists. Strings box as `{:scm-string "..."}`. Timeout is a step budget
because there is no wall clock; `parallel` is sequential for the same reason.
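
As an illustration of what "timeout is a step budget" means in practice, the following sketch models a cooperative tick counter. It is an assumption-laden toy, not the `spec.sx` implementation: `my-tick`, `my-timeout`, and `count-down` are hypothetical names, the sketch escapes with the symbol `flow-timeout` through a continuation instead of raising an exception, and it does not handle nested timeouts.

```scheme
;; Hypothetical model of a cooperative step budget; NOT the spec.sx source.
(define ticks-left 0)
(define timeout-escape #f)

;; Nodes call (my-tick) once per unit of work. With a budget of n, the
;; first n ticks succeed and the (n+1)-th escapes with 'flow-timeout.
(define (my-tick)
  (if (<= ticks-left 0)
      (timeout-escape 'flow-timeout)
      (set! ticks-left (- ticks-left 1))))

;; Wrap a node so it runs under a fresh budget.
(define (my-timeout budget node)
  (lambda (input)
    (call/cc
      (lambda (k)
        (set! ticks-left budget)
        (set! timeout-escape k)
        (node input)))))

;; A node that does n units of work, ticking once per unit.
(define (count-down n)
  (if (= n 0)
      'done
      (begin (my-tick) (count-down (- n 1)))))

((my-timeout 5 count-down) 3)  ; done
((my-timeout 2 count-down) 3)  ; flow-timeout
```

The budget only bites where a node actually ticks, so a long computation that never calls the tick procedure would run to completion regardless of the budget. That is the trade-off of a cooperative scheme without a wall clock.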