From c2d628e9c394d6a17641838c8bbf54d0cf8a4fde Mon Sep 17 00:00:00 2001
From: giles
Date: Sat, 6 Jun 2026 18:37:10 +0000
Subject: [PATCH] =?UTF-8?q?flow:=20README=20=E2=80=94=20API=20reference=20?=
 =?UTF-8?q?+=20deterministic-replay=20contract?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

User-facing docs for the flow engine: the node model, every combinator, the
suspend/resume durability contract (escape-only call/cc -> deterministic replay),
lifecycle/introspection/hygiene API, fed-sx distribution, and substrate notes.
Doc-only; 151/151 unchanged.

Co-Authored-By: Claude Opus 4.8 (1M context)
---
 lib/flow/README.md  | 141 ++++++++++++++++++++++++++++++++++++++++++++
 plans/flow-on-sx.md |   6 ++
 2 files changed, 147 insertions(+)
 create mode 100644 lib/flow/README.md

diff --git a/lib/flow/README.md b/lib/flow/README.md
new file mode 100644
index 00000000..9c2dee4c
--- /dev/null
+++ b/lib/flow/README.md
@@ -0,0 +1,141 @@
+# flow — durable DAG workflows on Scheme
+
+`flow` is a workflow engine for rose-ash: content pipelines (write → review →
+publish → federate), scheduled jobs, and multi-step user flows (signup, confirm,
+onboard) that **survive process restarts**. It is a thin Scheme prelude over the
+Scheme-on-SX guest (`lib/scheme/`); a flow runs *inside* the interpreter.
+
+Run the suite: `bash lib/flow/conformance.sh` → **151/151 across 10 suites**.
+
+## Model
+
+A **flow** is just a Scheme procedure of one argument — the upstream value:
+
+```
+node : input -> output
+```
+
+Combinators build composite nodes out of child nodes. A node that ignores its
+argument is effectively a thunk. There is no separate "graph" object: composition
+*is* function composition, so flows are values you can name, pass, and nest.
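+
+As a rough illustration of "composition is function composition", here is a toy
+model in plain Scheme. It is a sketch, not flow's implementation: `seq*`, `par*`
+and `run-in-order` are hypothetical stand-ins, written with `(lambda args ...)`
+variadics and a top-level recursive helper rather than named `let`.
+
+```scheme
+;; Toy model only. The starred names are hypothetical, not part of flow.
+
+;; Thread a value through a list of one-argument nodes, left to right.
+(define (run-in-order nodes value)
+  (if (null? nodes)
+      value
+      (run-in-order (cdr nodes) ((car nodes) value))))
+
+;; Stand-in for `sequence`: a composite node is just another procedure.
+(define seq*
+  (lambda nodes
+    (lambda (input) (run-in-order nodes input))))
+
+;; Stand-in for `parallel`: fan the input to every child, collect a list.
+(define par*
+  (lambda nodes
+    (lambda (input)
+      (map (lambda (node) (node input)) nodes))))
+
+(define shout (lambda (s) (string-append s "!")))
+
+;; Composite nodes are ordinary values: named, passed, and nested.
+(define measure
+  (seq* shout
+        (par* string-length (lambda (s) s))))
+
+(measure "hi") ; => (3 "hi!")
+```
+
+Because `measure` is itself a one-argument procedure, it can be dropped into
+another `seq*` or `par*` without any wrapper.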
+
+```scheme
+(defflow publish
+  (sequence
+    (lambda (draft) (string-append draft "!"))
+    (branch (lambda (post) (>= (string-length post) 3))
+            (remote-node 'fed 'publish)
+            (flow-const 'rejected))))
+
+(flow/start publish "hello")   ; => federated, or a (flow-suspended id tag) state
+```
+
+## Building blocks (`spec.sx`)
+
+| Combinator | Meaning |
+|---|---|
+| `(flow-node f)` / `(flow-id x)` / `(flow-const v)` | leaf nodes |
+| `(sequence n ...)` | thread input left-to-right |
+| `(parallel n ...)` | fan input to every child, join results into a list (sequential eval) |
+| `(map-flow node)` | run `node` over each item of a list input, join results |
+| `(flow-while pred body max)` / `(flow-until ...)` | bounded iteration (cap `max` steps) |
+| `(defflow name body)` | bind + register a named flow (so it survives restart) |
+
+## Control flow + errors (`spec.sx`)
+
+| Combinator | Meaning |
+|---|---|
+| `(branch pred then else)` | `pred` on input selects `then`/`else` (`cond` is a Scheme special form) |
+| `(retry n node)` | re-run on a *raised exception*, up to `n` attempts |
+| `(timeout budget node)` | cooperative **step budget**: nodes call `(tick)`; the `(budget+1)`-th tick raises `flow-timeout` |
+| `(try-catch node handler)` | catch a raised exception → `(handler error)` |
+| `(fail reason)` / `(failed? x)` / `(fail-reason x)` | explicit failure *values* (flow downstream as data) |
+| `(recover node handler)` | the fail-VALUE counterpart of try-catch |
+| `(attempt n ...)` | railway sequence: stop at the first node returning a `(fail ...)` |
+| `(tap effect)` | run a side effect, return input unchanged |
+
+**Two error channels, on purpose.** Raised exceptions are for *bugs/transients*
+(caught by `retry`/`try-catch`). `(fail reason)` values are for *expected business
+outcomes* (validation rejected, declined) and compose via `attempt`/`recover`.
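+
+The split between the two channels can be modelled in a few lines of standard
+Scheme. This is a sketch under assumptions, not the engine's code: the starred
+names (`fail*`, `failed*?`, `attempt*`, `try-catch*`) and `run-railway` are
+hypothetical, fail values are modelled as tagged 2-element lists, and the
+exception channel assumes R7RS `raise`/`guard`, which the guest may not provide.
+
+```scheme
+;; Toy model only. Starred names are hypothetical stand-ins.
+
+;; Value channel: a failure is plain data that flows downstream.
+(define (fail* reason) (list 'fail reason))
+(define (failed*? x) (and (pair? x) (eq? (car x) 'fail)))
+
+;; Railway: stop threading as soon as the current value is a fail value.
+(define (run-railway nodes value)
+  (cond ((null? nodes) value)
+        ((failed*? value) value)
+        (else (run-railway (cdr nodes) ((car nodes) value)))))
+
+(define attempt*
+  (lambda nodes
+    (lambda (input) (run-railway nodes input))))
+
+;; Exception channel: a raised object is caught and handed to the handler.
+(define (try-catch* node handler)
+  (lambda (input)
+    (guard (e (#t (handler e)))
+      (node input))))
+
+(define validate (lambda (n) (if (< n 0) (fail* 'negative) n)))
+(define double   (lambda (n) (* 2 n)))
+(define flaky    (lambda (n) (raise 'network-down)))
+
+((attempt* validate double) 4)    ; => 8
+((attempt* validate double) -1)   ; => (fail negative), double never runs
+((try-catch* flaky (lambda (e) (list 'caught e))) 4) ; => (caught network-down)
+```
+
+In this model the rejected input never raises: the fail value simply skips the
+remaining nodes, while the transient error only surfaces through the handler.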
+
+## Suspend / resume — the durable core (`spec.sx`, `store.sx`)
+
+The guest Scheme's `call/cc` is **escape-only** — re-invoking a captured
+continuation after it returns *hangs* the runtime. So flow does **not** serialize
+continuations. Instead it uses **deterministic replay**:
+
+- `(suspend tag)` — if `tag` is already in the replay log, return its logged value;
+  otherwise escape to the driver as `(flow-suspended tag)`.
+- `resume` appends `(tag value)` to the log and **re-runs the flow from the start**.
+  Already-resolved suspends replay their values; the first unresolved one escapes
+  again (or the flow completes).
+
+The entire persisted state is the replay log — plain data. No live continuation is
+ever stored, so flows survive process restarts and even moves between instances.
+
+> **Author contract:** suspend `tag`s must be unique and deterministic across
+> replays, and **all** non-determinism / side effects must go through suspend
+> points (so their results are logged) — otherwise they re-run on every replay.
+
+### Lifecycle (`store.sx`)
+
+```scheme
+(flow/start flow input)    ; raw result if it completes, else (flow-suspended id tag)
+(flow/resume id value)     ; inject value at the waiting tag, continue
+(flow/cancel id)           ; terminate; a later resume is rejected
+```
+
+### Introspection & hygiene
+
+```scheme
+(flow/status id)    ; done | suspended | cancelled | unknown
+(flow/result id)    ; result if done, else (flow-error reason)
+(flow/list)         ; ((id status) ...)
+(flow/pending)      ; ((id waiting-tag) ...) — what each suspended flow awaits
+(flow/gc)           ; drop terminal records, keep live ones; returns count removed
+(flow/forget id)    ; drop one terminal record (refuses live flows)
+```
+
+### Crash recovery
+
+```scheme
+(flow-store-export)       ; the store as plain data (live procs nulled)
+(flow-store-import! d)    ; restore the store from exported data
+(flow-resumable-ids)      ; ids of suspended flows to wake on restart
+```
+
+On restart the flow definitions are reloaded (`defflow` re-registers names) and the
+exported store reimported; `resume` re-resolves each flow's procedure **by name**.
+
+## Distribution via fed-sx (`remote.sx`)
+
+```scheme
+(flow-peer-register! addr table)    ; mock a peer's exposed functions (fed-sx boundary)
+(remote-node addr fn)               ; run a node on a peer
+(remote-failover addrs fn local)    ; try peers in order, fall through to a local node
+(flow-replicate-to addr)            ; copy this store to a peer's replica slot
+(flow-restore-from addr)            ; import a peer's replica (handoff)
+```
+
+**Handoff** is crash recovery across instances: replicate → local instance dies →
+peer restores the (plain-data) store and resumes. The replay log carries over, so
+all resolved suspends survive the move.
+
+## Files
+
+| File | Contents |
+|---|---|
+| `spec.sx` | combinators (flow-combinators-src / flow-control-src / flow-suspend-src) |
+| `store.sx` | durable store, lifecycle, crash recovery, introspection, hygiene |
+| `remote.sx` | fed-sx transport (mock peer registry), failover, replication |
+| `api.sx` | `flow-make-env` / `flow-run` SX helpers (one cached env, per-test reset) |
+| `tests/*.sx` | 10 suites, 151 cases |
+| `conformance.sh` | loads substrate + flow layer, runs every suite |
+
+## Notes on the substrate
+
+The guest Scheme (`lib/scheme/`, imported read-only) lacks dotted-rest params
+`(a . rest)` and named `let`; combinators use `(lambda args ...)` variadics + top-
+level recursion. `cons` is list-only (no dotted pairs), so log/assoc entries are
+2-element lists. Strings box as `{:scm-string "..."}`. Timeout is a step budget
+because there is no wall clock; `parallel` is sequential for the same reason.
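+
+To make the 2-element log entries and the escape-only use of `call/cc` concrete,
+the replay scheme from the suspend/resume section can be modelled in standard
+Scheme. This is a sketch, not the contents of `store.sx`: the starred names,
+`replay-log` and `escape-to-driver` are hypothetical, and a single global log
+stands in for the per-flow store.
+
+```scheme
+;; Toy model only. All names defined here are hypothetical.
+
+(define replay-log '())        ; list of (tag value) entries, plain data
+(define escape-to-driver #f)   ; continuation used only to escape, never re-entered
+
+;; Logged tag: replay its value. Unlogged tag: escape to the driver.
+(define (suspend* tag)
+  (let ((entry (assoc tag replay-log)))
+    (if entry
+        (cadr entry)
+        (escape-to-driver (list 'flow-suspended tag)))))
+
+;; Driver: run the flow from the start; a suspend escapes back to here.
+(define (run* flow input)
+  (call/cc
+    (lambda (k)
+      (set! escape-to-driver k)
+      (flow input))))
+
+;; Resume: append one log entry, then re-run the whole flow.
+(define (resume* flow input tag value)
+  (set! replay-log (append replay-log (list (list tag value))))
+  (run* flow input))
+
+(define signup
+  (lambda (email)
+    (let* ((code (suspend* 'confirm-code))
+           (name (suspend* 'display-name)))
+      (list email code name))))
+
+(run* signup "a@b.c")                         ; => (flow-suspended confirm-code)
+(resume* signup "a@b.c" 'confirm-code 1234)   ; => (flow-suspended display-name)
+(resume* signup "a@b.c" 'display-name "Ada")  ; => ("a@b.c" 1234 "Ada")
+```
+
+Each `resume*` call re-runs `signup` from the top; only `replay-log` changes
+between runs, which is why it is the only state that would need persisting.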
diff --git a/plans/flow-on-sx.md b/plans/flow-on-sx.md
index 108e74f9..c14d8631 100644
--- a/plans/flow-on-sx.md
+++ b/plans/flow-on-sx.md
@@ -193,6 +193,12 @@ remote-node + crash-recovery + handoff + introspection together.
   registry. Timeout is a cooperative step budget (no wall clock in pure SX). Test
   harness reuses one env with a per-test reset for speed.
 
+- **Phases 5-7 + docs.** Operational API (introspection, hygiene), combinator
+  library (tap/recover/map-flow/while/until), railway `attempt`, end-to-end
+  integration suite, and `lib/flow/README.md` (full API reference + replay-semantics
+  contract). **151/151 across 10 suites.** Conformance sx_server timeout raised to
+  540s for the 10-suite run under shared-machine CPU contention.
+
 ## Blockers
 
 (none)