giles 9cfca1d008
flow: reference host driver flow-drive-host/flow-run-host + 4 tests
Completes the host ABI from work-queue to driver loop: the host supplies only a
(kind payload) -> answer dispatch fn; flow-drive-host services one tick of pending
requests, flow-run-host ticks until quiescent (bounded). Tested via the art-dag
render -> human-review -> publish pipeline driven entirely by flow-run-host. The
art-dag integration is now: define dispatch, call flow-run-host. 166/166, 11 suites.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 19:33:04 +00:00

flow — durable DAG workflows on Scheme

flow is a workflow engine for rose-ash: content pipelines (write → review → publish → federate), scheduled jobs, and multi-step user flows (signup, confirm, onboard) that survive process restarts. It is a thin Scheme prelude over the Scheme-on-SX guest (lib/scheme/); a flow runs inside the interpreter.

Run the suite: bash lib/flow/conformance.sh (151/151 across 10 suites).

Model

A flow is just a Scheme procedure of one argument — the upstream value:

node : input -> output

Combinators build composite nodes out of child nodes. A node that ignores its argument is effectively a thunk. There is no separate "graph" object: composition is function composition, so flows are values you can name, pass, and nest.

(defflow publish
  (sequence
    (lambda (draft) (string-append draft "!"))
    (branch (lambda (post) (>= (string-length post) 3))
            (remote-node 'fed 'publish)
            (flow-const 'rejected))))

(flow/start publish "hello")   ; => federated, or a (flow-suspended id tag) state
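A node is only a one-argument procedure, so every combinator is an ordinary higher-order function. The sketch below models that idea in Python so it can run outside the SX substrate. sequence, branch, flow_const and flow_id mirror the README's names, but they are illustrative re-implementations, not flow's API. fake_remote_publish is a hypothetical local stand-in for (remote-node 'fed 'publish).

```python
from typing import Any, Callable

Node = Callable[[Any], Any]  # a node maps the upstream value to an output


def flow_id(x: Any) -> Any:
    """Leaf node: pass the input through unchanged."""
    return x


def flow_const(v: Any) -> Node:
    """Leaf node that ignores its input, i.e. effectively a thunk."""
    return lambda _input: v


def sequence(*nodes: Node) -> Node:
    """Thread the input left-to-right through each child node."""
    def run(value: Any) -> Any:
        for node in nodes:
            value = node(value)
        return value
    return run


def branch(pred: Callable[[Any], bool], then: Node, else_: Node) -> Node:
    """Apply pred to the input and hand the input to then or else_."""
    return lambda value: then(value) if pred(value) else else_(value)


# Hypothetical local stand-in for (remote-node 'fed 'publish).
def fake_remote_publish(post: str) -> tuple:
    return ("federated", post)


# The README's publish flow, composed from plain functions.
publish = sequence(
    lambda draft: draft + "!",
    branch(lambda post: len(post) >= 3, fake_remote_publish, flow_const("rejected")),
)
```

Here publish("hello") yields ("federated", "hello!"). publish("a") yields "rejected", because "a!" is shorter than three characters.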

Building blocks (spec.sx)

Combinator                                     Meaning
(flow-node f) / (flow-id x) / (flow-const v)   leaf nodes
(sequence n ...)                               thread input left-to-right
(parallel n ...)                               fan input to every child, join results into a list (sequential eval)
(map-flow node)                                run node over each item of a list input, join results
(flow-while pred body max) / (flow-until ...)  bounded iteration (cap max steps)
(defflow name body)                            bind + register a named flow (so it survives restart)
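The fan-out and iteration combinators can be modelled the same way. This is a hedged Python sketch, not the spec.sx source. parallel evaluates its children one after another, as the table notes. What flow_while does when it reaches its cap is an assumption, since the README only says the iteration is bounded. flow-until is left out because its arguments are not spelled out above.

```python
from typing import Any, Callable

Node = Callable[[Any], Any]


def parallel(*nodes: Node) -> Node:
    """Fan the same input to every child and join the results into a list.
    Children run one after another (sequential evaluation)."""
    return lambda value: [node(value) for node in nodes]


def map_flow(node: Node) -> Node:
    """Run node over each item of a list input and join the results."""
    return lambda items: [node(item) for item in items]


def flow_while(pred: Callable[[Any], bool], body: Node, max_steps: int) -> Node:
    """Apply body while pred holds, but at most max_steps times.
    Assumption: reaching the cap simply returns the current value; the real
    flow-while may report the cap differently."""
    def run(value: Any) -> Any:
        steps = 0
        while steps < max_steps and pred(value):
            value = body(value)
            steps += 1
        return value
    return run
```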

Control flow + errors (spec.sx)

Combinator                                     Meaning
(branch pred then else)                        apply pred to the input; it selects then or else (named branch because cond is already a Scheme special form)
(retry n node)                                 re-run on a raised exception, up to n attempts
(timeout budget node)                          cooperative step budget: nodes call (tick); the (budget+1)-th tick raises flow-timeout
(try-catch node handler)                       catch a raised exception → (handler error)
(fail reason) / (failed? x) / (fail-reason x)  explicit failure values (flow downstream as data)
(recover node handler)                         the fail-VALUE counterpart of try-catch
(attempt n ...)                                railway sequence: stop at the first node returning a (fail ...)
(tap effect)                                   run a side effect, return input unchanged

Two error channels, on purpose. Raised exceptions are for bugs/transients (caught by retry/try-catch). (fail reason) values are for expected business outcomes (validation rejected, declined) and compose via attempt/recover.
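The value channel can be sketched in Python as follows. Fail, attempt and recover mirror (fail reason), attempt and recover, but this is an illustrative model only. Whether flow's recover handler receives the whole fail value or only its reason is an assumption here. validate and charge are hypothetical example nodes.

```python
from dataclasses import dataclass
from typing import Any, Callable

Node = Callable[[Any], Any]


@dataclass(frozen=True)
class Fail:
    """An expected business outcome, carried downstream as plain data."""
    reason: Any


def fail(reason: Any) -> Fail:
    return Fail(reason)


def is_failed(x: Any) -> bool:
    return isinstance(x, Fail)


def fail_reason(x: Fail) -> Any:
    return x.reason


def attempt(*nodes: Node) -> Node:
    """Railway sequence: thread the input, stopping at the first Fail value."""
    def run(value: Any) -> Any:
        for node in nodes:
            value = node(value)
            if is_failed(value):
                return value  # skip the remaining nodes
        return value
    return run


def recover(node: Node, handler: Callable[[Fail], Any]) -> Node:
    """Fail-value counterpart of try_catch: map a returned Fail through handler.
    Assumption: the handler receives the whole fail value, not just its reason."""
    def run(value: Any) -> Any:
        result = node(value)
        return handler(result) if is_failed(result) else result
    return run


# Hypothetical nodes: an empty order and a declined card are expected outcomes.
def validate(order: dict) -> Any:
    return order if order["qty"] > 0 else fail("empty order")


def charge(order: dict) -> Any:
    return fail("card declined") if order["card"] == "bad" else {**order, "paid": True}


checkout = attempt(validate, charge)
```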

Suspend / resume — the durable core (spec.sx, store.sx)

The guest Scheme's call/cc is escape-only: re-invoking a captured continuation after the call/cc that captured it has returned hangs the runtime. So flow does not serialize continuations. Instead it uses deterministic replay:

  • (suspend tag) — if tag is already in the replay log, return its logged value; otherwise escape to the driver as (flow-suspended tag).
  • resume appends (tag value) to the log and re-runs the flow from the start. Already-resolved suspends replay their values; the first unresolved one escapes again (or the flow completes).

The entire persisted state is the replay log — plain data. No live continuation is ever stored, so flows survive process restarts and even moves between instances.

Author contract: suspend tags must be unique and deterministic across replays, and all non-determinism / side effects must go through suspend points (so their results are logged) — otherwise they re-run on every replay.
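The replay protocol fits in a few lines. This Python model illustrates it under stated assumptions; it is not flow's implementation:

- Suspended plays the role of the escape to the driver.
- The replay log is a list of 2-element [tag, value] lists, matching the substrate note about assoc entries.
- The caller re-supplies the flow's input, which a real store would keep in its record.
- The JSON round trip stands in for persisting the log across a restart.

```python
import json
from typing import Any, Callable, Dict, List


class Suspended(Exception):
    """Escape to the driver: the flow reached a suspend whose tag is unresolved."""
    def __init__(self, tag: str):
        super().__init__(tag)
        self.tag = tag


_replay_log: Dict[str, Any] = {}  # tag -> logged value for the run in progress


def suspend(tag: str) -> Any:
    """Return the logged value for tag, or escape if it is not resolved yet."""
    if tag in _replay_log:
        return _replay_log[tag]
    raise Suspended(tag)


def run_flow(flow: Callable[[Any], Any], input_: Any, log: List[list]) -> list:
    """Re-run flow from the start against log.
    Returns ["done", result] or ["suspended", waiting_tag]."""
    global _replay_log
    _replay_log = dict(log)  # each entry is a 2-element [tag, value] list
    try:
        return ["done", flow(input_)]
    except Suspended as s:
        return ["suspended", s.tag]


def resume(flow, input_, log, tag, value):
    """Append [tag, value] to the log and replay from the start."""
    new_log = log + [[tag, value]]
    return new_log, run_flow(flow, input_, new_log)


def signup(email: str) -> dict:
    """Two human-in-the-loop waits with fixed, replay-stable tags."""
    code = suspend("confirm")
    plan = suspend("onboard")
    return {"email": email, "code": code, "plan": plan}


log: List[list] = []
state = run_flow(signup, "a@example.com", log)  # ["suspended", "confirm"]
log, state = resume(signup, "a@example.com", log, "confirm", "1234")
log = json.loads(json.dumps(log))  # simulated restart: only the log survives
log, state = resume(signup, "a@example.com", log, "onboard", "free")
```

Everything in signup before a given suspend re-executes on every replay. This is why the author contract routes side effects through suspend points.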

Lifecycle (store.sx)

(flow/start flow input)   ; raw result if it completes, else (flow-suspended id tag)
(flow/resume id value)    ; inject value at the waiting tag, continue
(flow/cancel id)          ; terminate; a later resume is rejected
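A toy Python store can tie the lifecycle calls together, again under assumptions:

- Ids are sequential integers.
- The (flow-suspended id tag) state is modelled as a list.
- A rejected resume returns a ["flow-error", reason] list. That shape is borrowed from flow/result, not confirmed for flow/resume.
- review is a hypothetical one-suspend flow.

```python
import itertools
from typing import Any, Callable, Dict


class Suspended(Exception):
    """Escape signal carrying the tag the flow is waiting on."""
    def __init__(self, tag: str):
        super().__init__(tag)
        self.tag = tag


_replay_log: Dict[str, Any] = {}


def suspend(tag: str) -> Any:
    """Replay a logged value for tag, or escape to the driver."""
    if tag in _replay_log:
        return _replay_log[tag]
    raise Suspended(tag)


class FlowStore:
    """Toy store: one record per flow id, each holding its replay log."""

    def __init__(self) -> None:
        self.records: Dict[int, dict] = {}
        self._ids = itertools.count(1)

    def _run(self, rec: dict) -> Any:
        global _replay_log
        _replay_log = dict(rec["log"])  # replay from the start
        try:
            rec["result"] = rec["proc"](rec["input"])
            rec["status"], rec["waiting"] = "done", None
            return rec["result"]
        except Suspended as s:
            rec["status"], rec["waiting"] = "suspended", s.tag
            return ["flow-suspended", rec["id"], s.tag]

    def start(self, proc: Callable[[Any], Any], input_: Any) -> Any:
        rec = {"id": next(self._ids), "proc": proc, "input": input_,
               "log": [], "status": None, "waiting": None, "result": None}
        self.records[rec["id"]] = rec
        return self._run(rec)

    def resume(self, fid: int, value: Any) -> Any:
        rec = self.records.get(fid)
        if rec is None or rec["status"] != "suspended":
            return ["flow-error", "not suspended"]  # e.g. after cancel
        rec["log"].append([rec["waiting"], value])
        return self._run(rec)

    def cancel(self, fid: int) -> None:
        if fid in self.records:
            self.records[fid]["status"] = "cancelled"

    def status(self, fid: int) -> str:
        rec = self.records.get(fid)
        return rec["status"] if rec else "unknown"


def review(doc: str) -> str:
    """Hypothetical flow: wait for a human verdict, then attach it to the input."""
    return doc + ":" + suspend("review")
```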

Introspection & hygiene

(flow/status id)   ; done | suspended | cancelled | unknown
(flow/result id)   ; result if done, else (flow-error reason)
(flow/list)        ; ((id status) ...)
(flow/pending)     ; ((id waiting-tag) ...) — what each suspended flow awaits
(flow/gc)          ; drop terminal records, keep live ones; returns count removed
(flow/forget id)   ; drop one terminal record (refuses live flows)
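The introspection and hygiene rules reduce to filters over the store's records. In this Python sketch the records are plain dicts. Several details are assumptions, since the README does not specify them:

- done and cancelled are the terminal states, and suspended is the only live one.
- The flow-error reason is taken from the record's status.
- flow_forget returns False when it refuses.

```python
from typing import Any, Dict, List

TERMINAL = {"done", "cancelled"}  # assumption: the terminal statuses


def flow_list(records: Dict[int, dict]) -> List[list]:
    """[[id, status], ...] for every record."""
    return [[fid, r["status"]] for fid, r in records.items()]


def flow_pending(records: Dict[int, dict]) -> List[list]:
    """[[id, waiting_tag], ...] for suspended flows only."""
    return [[fid, r["waiting"]] for fid, r in records.items() if r["status"] == "suspended"]


def flow_result(records: Dict[int, dict], fid: int) -> Any:
    """The result if done, else a ["flow-error", reason] value."""
    r = records.get(fid)
    if r is not None and r["status"] == "done":
        return r["result"]
    return ["flow-error", r["status"] if r else "unknown"]


def flow_gc(records: Dict[int, dict]) -> int:
    """Drop every terminal record, keep live ones; return how many were removed."""
    dead = [fid for fid, r in records.items() if r["status"] in TERMINAL]
    for fid in dead:
        del records[fid]
    return len(dead)


def flow_forget(records: Dict[int, dict], fid: int) -> bool:
    """Drop one terminal record; refuse live (or unknown) flows."""
    r = records.get(fid)
    if r is None or r["status"] not in TERMINAL:
        return False
    del records[fid]
    return True


records = {
    1: {"status": "done", "waiting": None, "result": 42},
    2: {"status": "suspended", "waiting": "review", "result": None},
    3: {"status": "cancelled", "waiting": None, "result": None},
    4: {"status": "done", "waiting": None, "result": "ok"},
}
```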

Crash recovery

(flow-store-export)      ; the store as plain data (live procs nulled)
(flow-store-import! d)   ; restore the store from exported data
(flow-resumable-ids)     ; ids of suspended flows to wake on restart

On restart, the flow definitions are reloaded (defflow re-registers their names) and the exported store is re-imported with flow-store-import!; flow/resume then re-resolves each flow's procedure by name.
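The recovery path depends on one rule: a record stores the flow's name, never a live procedure. A hedged Python model follows. REGISTRY, defflow, export_store, import_store, resumable_ids and resolve_proc are hypothetical stand-ins for the following:

- the defflow registry
- flow-store-export
- flow-store-import!
- flow-resumable-ids
- the by-name lookup in flow/resume

The JSON round trip checks that the export really is plain data.

```python
import json
from typing import Any, Callable, Dict, List

Proc = Callable[[Any], Any]
REGISTRY: Dict[str, Proc] = {}  # flow name -> procedure


def defflow(name: str, proc: Proc) -> Proc:
    """Bind + register; reloading definitions after a restart refills REGISTRY."""
    REGISTRY[name] = proc
    return proc


def export_store(records: Dict[str, dict]) -> Dict[str, dict]:
    """Plain-data snapshot: live procedures are nulled, flow names are kept.
    json.dumps raises if anything non-serializable slips through."""
    return json.loads(json.dumps({fid: {**r, "proc": None} for fid, r in records.items()}))


def import_store(data: Dict[str, dict]) -> Dict[str, dict]:
    """Restore records from exported data as a fresh copy."""
    return json.loads(json.dumps(data))


def resumable_ids(records: Dict[str, dict]) -> List[str]:
    """Ids of suspended flows that a restarted process should wake."""
    return [fid for fid, r in records.items() if r["status"] == "suspended"]


def resolve_proc(record: dict) -> Proc:
    """On resume, a nulled procedure is looked up again by flow name."""
    return record["proc"] or REGISTRY[record["name"]]


publish = defflow("publish", lambda post: post.upper())
live = {
    "f1": {"name": "publish", "proc": publish, "status": "suspended",
           "waiting": "review", "log": [["render", "ok"]]},
    "f2": {"name": "publish", "proc": publish, "status": "done",
           "waiting": None, "log": []},
}
snapshot = export_store(live)
```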

Distribution via fed-sx (remote.sx)

(flow-peer-register! addr table)      ; mock a peer's exposed functions (fed-sx boundary)
(remote-node addr fn)                 ; run a node on a peer
(remote-failover addrs fn local)      ; try peers in order, fall through to a local node
(flow-replicate-to addr)              ; copy this store to a peer's replica slot
(flow-restore-from addr)              ; import a peer's replica (handoff)

Handoff is crash recovery across instances: replicate → local instance dies → peer restores the (plain-data) store and resumes. The replay log carries over, so all resolved suspends survive the move.
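The distribution layer can be mocked the way the README describes, with an in-memory peer table in place of fed-sx. In this Python sketch PEERS, REPLICAS and PeerUnavailable are hypothetical. remote_failover treats any raised error as a reason to try the next peer; that policy is an assumption.

```python
import json
from typing import Any, Callable, Dict, List

Proc = Callable[[Any], Any]
PEERS: Dict[str, Dict[str, Proc]] = {}  # mock fed-sx peers: addr -> exposed fns
REPLICAS: Dict[str, Any] = {}  # addr -> replica slot holding a store copy


class PeerUnavailable(Exception):
    """Raised when a peer, or the named function on it, is not reachable."""


def peer_register(addr: str, table: Dict[str, Proc]) -> None:
    """Mock the functions a peer exposes."""
    PEERS[addr] = table


def remote_node(addr: str, fn: str) -> Proc:
    """A node that runs fn on the peer at addr."""
    def run(value: Any) -> Any:
        table = PEERS.get(addr)
        if table is None or fn not in table:
            raise PeerUnavailable(f"{addr}:{fn}")
        return table[fn](value)
    return run


def remote_failover(addrs: List[str], fn: str, local: Proc) -> Proc:
    """Try peers in order; if every one raises, fall through to the local node."""
    def run(value: Any) -> Any:
        for addr in addrs:
            try:
                return remote_node(addr, fn)(value)
            except Exception:
                continue  # assumption: any raised error means "try the next peer"
        return local(value)
    return run


def replicate_to(addr: str, store: Dict[str, Any]) -> None:
    """Copy this plain-data store into the peer's replica slot."""
    REPLICAS[addr] = json.loads(json.dumps(store))


def restore_from(addr: str) -> Dict[str, Any]:
    """Handoff: import the peer's replica as this instance's store."""
    return json.loads(json.dumps(REPLICAS[addr]))
```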

Files

File            Contents
spec.sx         combinators (flow-combinators-src / flow-control-src / flow-suspend-src)
store.sx        durable store, lifecycle, crash recovery, introspection, hygiene
remote.sx       fed-sx transport (mock peer registry), failover, replication
api.sx          flow-make-env / flow-run SX helpers (one cached env, per-test reset)
tests/*.sx      10 suites, 151 cases
conformance.sh  loads substrate + flow layer, runs every suite

Notes on the substrate

The guest Scheme (lib/scheme/, imported read-only) lacks dotted-rest params (a . rest) and named let, so combinators use (lambda args ...) variadics and top-level recursion. cons is list-only (no dotted pairs), so log/assoc entries are 2-element lists. Strings box as {:scm-string "..."}. Timeout is a step budget because there is no wall clock; parallel is sequential for the same reason.