flow-on-sx: Durable DAG Workflows on Scheme
rose-ash needs workflows that survive restarts: content pipelines (write → review → publish → federate), scheduled jobs (digest emails), multi-step user flows (signup, confirm, onboard). art-dag is the precedent — DAG-of-tasks with pause/resume at IO boundaries.
Scheme's call/cc + delimited continuations make pause/resume natural: a suspend
captures the continuation, serializes it as part of the flow record, and resume
re-enters at exactly that point. No state-machine bookkeeping by hand. R7RS-small is
already at 2644/2644 (see kernel/architecture status).
End-state: a Scheme-on-SX layer over the existing scheme runtime, with combinators for sequence/parallel/branch/retry/timeout/suspend, persistent flow store, and a federation extension via fed-sx for remote-node execution.
Status (rolling)
bash lib/flow/conformance.sh → 166/166 (Phases 1-8 complete; host ABI + reference driver)
Ground rules
- Scope: only touch lib/flow/** and plans/flow-on-sx.md. Do not edit spec/, hosts/, shared/, lib/scheme/**, or other lib/<lang>/. You may import from lib/scheme/ (public API via lib/scheme/scheme.sx); do not modify Scheme.
- Shared-file issues go under "Blockers" with a minimal repro; do not fix here.
- SX files: use sx-tree MCP tools only.
- Architecture: flow combinators are Scheme macros + procedures. Runtime is a driver loop that walks the flow graph and invokes call/cc at suspend points. The persistence layer serializes the continuation; open file/socket placeholders are forbidden (continuations must be resumable across process restart).
- art-dag awareness: read plans/art-dag* if it exists for design lineage; do not import code.
- Commits: one feature per commit. Keep Progress log updated and tick boxes.
Architecture sketch
(defflow publish
  (sequence
    (write-content)
    (parallel
      (review)
      (spell-check))
    (cond approved?
      (sequence (publish) (federate))
      (notify-author))))
         │
         ▼
lib/flow/spec.sx          lib/flow/runtime.sx          lib/flow/store.sx
- defflow                 - driver loop                - append-only flow log
- sequence/parallel       - node dispatch              - checkpoint serialize
- cond/retry/timeout      - call/cc at suspend         - restart loader
- suspend/resume                    │                            │
                                    ▼                            ▼
                          lib/flow/api.sx              lib/flow/remote.sx
                          - (flow/start name args)     - fed-sx adapter
                          - (flow/resume id value)     - node-on-peer execution
                          - (flow/cancel id)           - failure handling
Phase 1 — Declarative DAG + sequential execution
- lib/flow/spec.sx: defflow macro, sequence combinator
- node = Scheme procedure of one arg (upstream value threaded in); output threads to next node (data flow). A node ignoring its arg is a thunk.
- parallel combinator (sequential semantics for now; TRUE parallelism in Phase 3)
- runtime executes a flow synchronously, returns final value
- lib/flow/api.sx: (flow/start flow input) entry point
- lib/flow/tests/basic.sx: 18 cases (single nodes, linear/nested sequence, data flow between nodes, parallel-with-join, publish-shaped flow)
- lib/flow/scoreboard.{json,md}
- lib/flow/conformance.sh
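The node and combinator model above can be illustrated with a minimal Python sketch. This is a model for reading purposes, not the Scheme code in spec.sx: the Python names are hypothetical, and joining parallel outputs into a list is an assumption (the plan only says "parallel-with-join").

```python
from typing import Any, Callable

# A node takes the upstream value and returns its output.
Node = Callable[[Any], Any]


def sequence(*nodes: Node) -> Node:
    """Thread the value through each node in order (data flow)."""
    def run(value: Any) -> Any:
        for node in nodes:
            value = node(value)
        return value
    return run


def parallel(*nodes: Node) -> Node:
    """Give every node the same input and join the outputs.

    As in Phase 1, the branches run one after another.
    Joining into a list is an assumption of this sketch.
    """
    def run(value: Any) -> list:
        return [node(value) for node in nodes]
    return run


def flow_start(flow: Node, value: Any) -> Any:
    """Run a flow synchronously and return its final value."""
    return flow(value)


# A small publish-shaped flow: clean the text, inspect it two ways, join.
publish = sequence(
    lambda text: text.strip(),
    parallel(len, str.upper),
    lambda joined: {"length": joined[0], "shout": joined[1]},
)

result = flow_start(publish, "  draft  ")
print(result)
```

Because a flow is itself a one-argument procedure, a sequence can be nested inside a parallel (and vice versa) without any special casing.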
Phase 2 — Control flow + error handling
- cond combinator: predicate selects branch (named branch; cond is a Scheme special form). (branch pred then else). 6 tests.
- retry n: re-runs node up to n attempts on a raised exception; last exception propagates. Only raised exceptions are retried; (fail ...) values pass through. 6 tests. (Backoff deferred: no wall clock in pure SX.)
- timeout budget: bounds node execution via a cooperative step budget (deterministic; no scheduler/clock in pure SX). Nodes opt in via (tick); budget ticks allowed, the next raises flow-timeout. Non-ticking nodes are unbounded; budgets nest. 7 tests.
- try-catch: exception handler with reified error. (try-catch node handler) runs node; on raise, calls (handler error) and returns its value. 6 tests.
- error model: exceptions vs explicit (fail reason) results. fail / failed? / fail-reason produce/inspect failure values that flow downstream as data (distinct from raised exceptions caught by retry/try-catch). 6 tests.
- lib/flow/tests/control.sx: 31 cases (branch, error model, try-catch, retry, timeout + compositions)
Phase 3 — Suspend / resume (the showcase)
- (suspend tag): guest call/cc is ESCAPE-ONLY (re-entry hangs), so resume uses deterministic replay. suspend escapes to the driver as (flow-suspended tag); resume re-runs the flow, replaying resolved suspends from a (tag value) log. No live continuation is ever serialized; the log is plain data.
- lib/flow/store.sx: flow store, id→record (flow input log status payload); flow-drive runs a flow against a replay log.
- (flow/resume id value): append (tag value) to the log, re-drive; raw result on completion, (flow-suspended id tag) on a further suspend.
- (flow/cancel id): mark cancelled; a later resume is rejected (stale replay cannot wake a cancelled flow).
- crash recovery: flow-store-export (procs nulled → plain data), flow-store-import!, flow-resumable-ids. Records are name-keyed; resume re-resolves the proc by name (defflow registers names), so a flow survives a wiped store. tests/recovery.sx, 8 cases (export/wipe/import, resumable scan, restart-at-every-step, replay-log survival).
- lib/flow/tests/suspend.sx: 17 cases (start/resume/cancel, multi-step, replay determinism, lifecycle guards, suspend-in-branch)
- Harness: flow-run now reuses one env with a per-test reset (building the full standard env 66× was too slow); see api.sx.
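Deterministic replay is the core idea of this phase, so a small Python model may help. It is a sketch under stated assumptions, not the store.sx implementation: the names are hypothetical, the log is matched positionally with a tag check, and the store is an in-memory dict.

```python
class Suspended(Exception):
    """Escape to the driver; plays the role of the escape-only call/cc."""
    def __init__(self, tag):
        super().__init__(tag)
        self.tag = tag


class Replay:
    """Feeds logged answers back to a flow during one drive."""
    def __init__(self, log):
        self.log = list(log)
        self.cursor = 0

    def suspend(self, tag):
        if self.cursor < len(self.log):
            logged_tag, value = self.log[self.cursor]
            if logged_tag != tag:
                raise RuntimeError("flow is not deterministic under replay")
            self.cursor += 1
            return value          # already resolved: replay the answer
        raise Suspended(tag)      # unresolved: escape to the driver


store = {}  # id -> record; the log inside each record is plain data


def drive(flow_id):
    rec = store[flow_id]
    try:
        result = rec["flow"](rec["input"], Replay(rec["log"]))
    except Suspended as s:
        rec["status"], rec["payload"] = "suspended", s.tag
        return ("flow-suspended", flow_id, s.tag)
    rec["status"], rec["payload"] = "done", result
    return result


def flow_start(flow_id, flow, value):
    store[flow_id] = {"flow": flow, "input": value, "log": [],
                      "status": "new", "payload": None}
    return drive(flow_id)


def flow_resume(flow_id, value):
    rec = store[flow_id]
    if rec["status"] != "suspended":
        raise ValueError("flow is not awaiting a resume: " + rec["status"])
    rec["log"].append((rec["payload"], value))  # record (tag, value)
    return drive(flow_id)                        # re-run from the top


def flow_cancel(flow_id):
    store[flow_id]["status"] = "cancelled"


def publish(draft, replay):
    verdict = replay.suspend("review")
    if verdict != "approved":
        return "rejected: " + draft
    slot = replay.suspend("schedule")
    return "published " + draft + " at " + slot


first = flow_start("f1", publish, "post-1")
second = flow_resume("f1", "approved")
final = flow_resume("f1", "09:00")
```

Each resume re-runs the flow body from its first line; the earlier suspends return their logged values instead of escaping, so only the (tag, value) pairs need to survive a restart. This is also why flow bodies have to be deterministic between suspends.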
Phase 4 — Distributed nodes via fed-sx
- (remote-node addr fn): execute a node on a federation peer. Transport is the fed-sx boundary, MOCKED via a peer registry (flow-peer-register!); raises flow-remote-unreachable / flow-remote-no-fn. Composes with sequence, suspend, retry. tests/distributed.sx, 7 cases.
- failure semantics: (remote-failover addrs fn local) tries each peer in order, moves to the next on any raised error, and runs the local node if every peer fails. 6 tests.
- persistence across instances: (flow-replicate-to addr) copies this instance's store (the plain-data export) to a peer's replica slot; (flow-restore-from addr) imports it. Same mechanism as crash recovery, across instances.
- handoff: a flow started here resumes on a peer after the local instance dies: replicate → wipe local store → restore on peer → flow/resume. The replay log (and thus all resolved suspends) survives the move.
- lib/flow/tests/distributed.sx: 19 cases (remote-node, failover, replication, handoff, including replay-log survival across the move)
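The failover rule (try each peer in order, fall back to the local node) is short enough to model directly. The Python below is an illustrative sketch with hypothetical names; the dict-based registry stands in for the mocked peer registry and says nothing about the real fed-sx transport.

```python
class RemoteUnreachable(Exception):
    """Stand-in for flow-remote-unreachable."""


class RemoteNoFn(Exception):
    """Stand-in for flow-remote-no-fn."""


peers = {}  # addr -> {fn_name: callable}; mock of the peer registry


def peer_register(addr, fns):
    peers[addr] = dict(fns)


def remote_node(addr, fn_name):
    """Node that runs fn_name on the peer at addr."""
    def run(value):
        if addr not in peers:
            raise RemoteUnreachable(addr)
        if fn_name not in peers[addr]:
            raise RemoteNoFn(fn_name)
        return peers[addr][fn_name](value)
    return run


def remote_failover(addrs, fn_name, local):
    """Try each peer in order; on any raised error move on; finally run local."""
    def run(value):
        for addr in addrs:
            try:
                return remote_node(addr, fn_name)(value)
            except Exception:
                continue
        return local(value)
    return run


peer_register("peer-b", {"double": lambda x: x * 2})

# peer-a is unregistered (unreachable), so peer-b answers.
via_peer = remote_failover(["peer-a", "peer-b"], "double",
                           lambda x: ("local", x))(21)

# No peer offers "triple", so the local node runs.
via_local = remote_failover(["peer-a", "peer-b"], "triple",
                            lambda x: ("local", x))(21)
```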
Phase 5 — Operational API + combinator library
The four roadmap phases are complete; this phase rounds out the engine into something operators and authors actually use. Accumulation, not a rewrite.
- introspection API: flow/status id, flow/result id, flow/list, flow/pending (operator view of what each suspended flow awaits). 12 tests in tests/api.sx.
- store hygiene: flow/gc drops terminal (done/cancelled) records while keeping live suspended flows (returns count); flow/forget id drops one terminal record and refuses live flows. Prevents unbounded store growth. 9 tests in tests/hygiene.sx.
- tap: side-effecting pass-through node (logging/metrics) that returns input
- recover: complement to try-catch for the fail-VALUE channel. Run node; if it yields (fail ...), run a recovery node on the reason
- map-flow: run a flow per item of a list, join results (sequential)
- flow-while / flow-until: bounded iteration. Re-run body threading the value while/until pred holds, capped at max steps (deterministic bound)
- lib/flow/tests/api.sx (12) + lib/flow/tests/combinators.sx (17)
Phase 6 — Railway-oriented composition
Make the (fail reason) value channel compose into real validation/ETL pipelines.
- attempt: like sequence, but short-circuits at the first node that returns a (fail ...) value, returning that failure (the railway track). Pairs with recover for the rejoin.
- lib/flow/tests/railway.sx: 10 cases (fail short-circuiting, no-run-after-failure, recover rejoin, validation pipeline reporting the failing stage)
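A validation pipeline on the fail-value track might look like the following Python sketch. The helper names mirror the combinators described above but are hypothetical, as is the tuple encoding of fail values.

```python
def fail(reason):
    return ("fail", reason)


def failed(value):
    return isinstance(value, tuple) and len(value) == 2 and value[0] == "fail"


def fail_reason(value):
    return value[1]


def attempt(*nodes):
    """Like sequence, but stop at the first node that returns a fail value."""
    def run(value):
        for node in nodes:
            value = node(value)
            if failed(value):
                return value  # stay on the failure track
        return value
    return run


def recover(node, handler):
    """Run node; if it yields a fail value, rejoin via handler(reason)."""
    def run(value):
        out = node(value)
        return handler(fail_reason(out)) if failed(out) else out
    return run


ran = []  # records which stages actually executed


def non_empty(order):
    ran.append("non_empty")
    return order if order["items"] else fail("non_empty: no items")


def positive_total(order):
    ran.append("positive_total")
    return order if order["total"] > 0 else fail("positive_total: bad total")


def price(order):
    ran.append("price")
    return {**order, "priced": True}


validate = attempt(non_empty, positive_total, price)

bad = validate({"items": ["x"], "total": 0})
ran_after_bad = list(ran)  # "price" never ran

recovered = recover(validate, lambda reason: {"rejected": reason})(
    {"items": [], "total": 5})
```

Putting the stage name in the reason is what lets the pipeline report which stage failed without raising an exception.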
Phase 8 — Host integration ABI (art-dag / human-in-the-loop)
suspend is the seam to the outside world, but a bare tag is an ad-hoc convention.
This phase defines a stable request/response contract a host (an art-dag driver, a
review UI) codes against — so flow can orchestrate art-dag with human decision
points later without reverse-engineering tag shapes. lib/flow/host.sx.
- (request kind payload): suspend with a typed (flow-request kind payload) envelope; evaluates to the host's resume value. await-human / await-render / await-effect sugar.
- (flow-host-requests): the host work queue, (id kind payload) for every suspended flow waiting on a host request; request? / request-kind / request-payload parse a tag.
- (flow-drive-host dispatch) / (flow-run-host dispatch maxticks): reference host driver. The host supplies only a (kind payload) -> answer dispatch fn; the loop drains pending requests and resumes until quiescent (bounded).
- lib/flow/tests/host.sx: 15 cases incl. the art-dag-shaped driver loop (render → human-review → publish) run both manually and via flow-run-host.
- Contract (documented in host.sx + README): the host owns IO + persistence; a flow never does IO, it only requests; the host performs the effect and feeds the result back via resume (logged, so not re-run on recovery). NOT done here (host side, out of lib/flow scope): the real Celery/IPFS bridge and a persistent store backend. Those live in the art-dag integration, coding against this ABI.
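The host contract can be modelled end to end in a few lines of Python. Everything here is a hedged sketch with hypothetical names: the replay log is reduced to a list of answers, and the return values of the two driver functions (requests serviced, ticks used) are assumptions, since the plan does not specify them.

```python
class Suspended(Exception):
    def __init__(self, tag):
        super().__init__(tag)
        self.tag = tag


_MISSING = object()
flows = {}  # id -> record holding the flow, its input, and its answer log


def drive(fid):
    """Re-run the flow, replaying logged answers; stop at the first open request."""
    rec = flows[fid]
    answers = iter(rec["log"])

    def request(kind, payload):
        answer = next(answers, _MISSING)
        if answer is _MISSING:
            raise Suspended(("flow-request", kind, payload))
        return answer

    try:
        rec["result"] = rec["flow"](rec["input"], request)
        rec["status"] = "done"
    except Suspended as s:
        rec["status"], rec["tag"] = "suspended", s.tag


def flow_start(fid, flow, value):
    flows[fid] = {"flow": flow, "input": value, "log": [],
                  "status": "new", "tag": None, "result": None}
    drive(fid)


def flow_host_requests():
    """The host work queue: (id, kind, payload) per waiting flow."""
    return [(fid, rec["tag"][1], rec["tag"][2])
            for fid, rec in flows.items() if rec["status"] == "suspended"]


def flow_drive_host(dispatch):
    """One tick: answer every request pending right now; return how many."""
    pending = flow_host_requests()
    for fid, kind, payload in pending:
        flows[fid]["log"].append(dispatch(kind, payload))
        drive(fid)
    return len(pending)


def flow_run_host(dispatch, maxticks):
    """Tick until no requests remain or maxticks is reached; return ticks used."""
    ticks = 0
    while ticks < maxticks and flow_drive_host(dispatch) > 0:
        ticks += 1
    return ticks


# The flow never does IO; it only asks the host.
def art_flow(spec, request):
    image = request("render", spec)
    verdict = request("human", image)
    if verdict != "approve":
        return "rejected"
    return request("effect", ("publish", image))


effects = []  # side effects performed by the host


def dispatch(kind, payload):
    if kind == "render":
        return "img:" + payload
    if kind == "human":
        return "approve"
    effects.append(payload)
    return "published"


flow_start("job-1", art_flow, "sunset")
ticks = flow_run_host(dispatch, 10)
```

Although the flow body is re-run after every answer, the publish effect happens once: earlier requests are satisfied from the log rather than dispatched again, which is the "logged, so not re-run on recovery" clause of the contract.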
Phase 7 — End-to-end integration
Prove the phases compose: realistic flows exercising attempt + suspend + branch + remote-node + crash-recovery + handoff + introspection together.
- lib/flow/tests/integration.sx: 10 cases. An order-processing flow (validate → payment suspend → branch → ledger federation) and an onboarding flow, run through the full lifecycle including a simulated crash and a peer handoff mid-flow, plus introspection (flow/pending/status/result) during the flow's life
Progress log
- Phase 1 (combinators + sequential runtime). Flow built as a Scheme prelude loaded onto scheme-standard-env: a flow is a Scheme procedure input -> output, so the whole flow runs inside the interpreter (sets up Phase 3 call/cc suspend). Combinators flow-node / flow-id / flow-const / sequence / parallel / defflow in spec.sx; flow/start + SX helpers (flow-make-env / flow-run) in api.sx. 18/18 in tests/basic.sx. Substrate constraints found: dotted rest params (a . rest) and named let are unsupported in lib/scheme/eval.sx, so combinators use (lambda args ...) variadics + top-level recursion. Scheme strings come back boxed as {:scm-string "..."}; unwrap with (get s :scm-string).
- Phases 2-4. Control flow (branch/retry/timeout/try-catch + fail-value error model), then the showcase: durable suspend/resume. Guest call/cc is escape-only (re-entry hangs), so resume uses deterministic replay: re-run the flow, replaying resolved suspends from a (tag value) log. Only plain data persists, so flows survive a wiped store (crash recovery) and a move to another instance (replication + handoff). Phase 4 models the fed-sx boundary with a mock peer registry. Timeout is a cooperative step budget (no wall clock in pure SX). Test harness reuses one env with a per-test reset for speed.
- Phases 5-7 + docs. Operational API (introspection, hygiene), combinator library (tap/recover/map-flow/while/until), railway attempt, end-to-end integration suite, and lib/flow/README.md (full API reference + replay-semantics contract). 151/151 across 10 suites. Conformance sx_server timeout raised to 540s for the 10-suite run under shared-machine CPU contention.
Blockers
(none)