flow: Phase 3 suspend/resume/cancel via deterministic replay + 17 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 50s

Guest Scheme call/cc is escape-only (re-entry hangs), so durable resume uses
deterministic replay: suspend escapes to the driver; resume re-runs the flow and
replays resolved suspends from a (tag value) log. No live continuation is ever
serialized — persisted state is plain data, survives restart. Adds flow/start
(now state-returning, backward compatible), flow/resume, flow/cancel, store.sx.
Harness reuses one env with a per-test reset (full env rebuild 66x was too slow).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-06 17:20:09 +00:00
parent e762cc2e32
commit e896deffc8
8 changed files with 230 additions and 48 deletions

View File

@@ -1,17 +1,24 @@
;; lib/flow/api.sx — flow runtime entry points.
;;
;; Builds a Scheme env preloaded with the flow combinators (lib/flow/spec.sx)
;; plus the public flow API, and provides SX helpers to run flow programs.
;; Builds a Scheme env preloaded with the flow combinators (lib/flow/spec.sx) and
;; the durable store + lifecycle (lib/flow/store.sx), and provides SX helpers to
;; run flow programs.
;;
;; Scheme-level API (available inside flow programs):
;; (flow/start flow input) — run a flow with the given input, return result
;; (flow/start flow input) — run a flow; raw result if it completes, else
;; (flow-suspended id tag). Defined in store.sx.
;; (flow/resume id value) — resume a suspended flow (store.sx)
;; (flow/cancel id) — cancel a flow (store.sx)
;; (suspend tag) — suspension point (spec.sx)
;;
;; SX-level helpers (for hosts and tests):
;; (flow-make-env) — fresh standard env + combinators + api
;; (flow-run src) — eval a Scheme program string in a fresh flow env
;; (flow-make-env) — fresh standard env + combinators + store
;; (flow-run src) — eval a Scheme program string in a reset shared env
;; (flow-run-in env src) — eval a Scheme program string in a given env
(define flow-api-src "(define (flow/start flow input) (flow input))")
;;
;; flow-run reuses ONE env (building the full standard env is expensive) and
;; resets the mutable flow globals before each program, so tests stay isolated
;; without paying for a fresh standard env each time.
(define
flow-make-env
@@ -20,11 +27,33 @@
(let
((env (scheme-standard-env)))
(flow-load-combinators! env)
(scheme-eval-program (scheme-parse-all flow-api-src) env)
(flow-load-store! env)
env)))
(define
flow-run-in
(fn (env src) (scheme-eval-program (scheme-parse-all src) env)))
(define flow-run (fn (src) (flow-run-in (flow-make-env) src)))
(define
flow-reset-src
"(set! flow-store (list)) (set! flow-next-id 0) (set! flow-replay-log (list)) (set! flow-suspend-k #f) (set! flow-timeout-budget -1)")
(define flow-env-cache false)
(define
flow-shared-env
(fn
()
(begin
(if flow-env-cache nil (set! flow-env-cache (flow-make-env)))
flow-env-cache)))
(define
flow-run
(fn
(src)
(let
((env (flow-shared-env)))
(begin
(scheme-eval-program (scheme-parse-all flow-reset-src) env)
(scheme-eval-program (scheme-parse-all src) env)))))

View File

@@ -23,6 +23,7 @@ VERBOSE="${1:-}"
SUITES=(
"basic flow-basic-tests-run! lib/flow/tests/basic.sx"
"control flow-ctl-tests-run! lib/flow/tests/control.sx"
"suspend flow-sus-tests-run! lib/flow/tests/suspend.sx"
)
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
@@ -39,6 +40,7 @@ emit_eval () { echo "(epoch $EPOCH)"; echo "(eval \"$1\")"; EPOCH=$((EPOCH+1));
emit_load "lib/scheme/eval.sx"
emit_load "lib/scheme/runtime.sx"
emit_load "lib/flow/spec.sx"
emit_load "lib/flow/store.sx"
emit_load "lib/flow/api.sx"
for SUITE in "${SUITES[@]}"; do
read -r _NAME _RUNNER FILE <<< "$SUITE"

View File

@@ -1,10 +1,11 @@
{
"total": 49,
"passed": 49,
"total": 66,
"passed": 66,
"failed": 0,
"suites": {
"basic": { "passed": 18, "total": 18 },
"control": { "passed": 31, "total": 31 }
"control": { "passed": 31, "total": 31 },
"suspend": { "passed": 17, "total": 17 }
},
"phases": { "phase1": "done", "phase2": "done", "phase3": "pending" }
"phases": { "phase1": "done", "phase2": "done", "phase3": "in-progress" }
}

View File

@@ -1,6 +1,6 @@
# flow-on-sx Scoreboard
**All tests pass: 49 / 49 across 2 suites.**
**All tests pass: 66 / 66 across 3 suites.**
`bash lib/flow/conformance.sh`
@@ -10,6 +10,7 @@
|-------|--------:|--------|
| basic | 18 | Phase 1: single nodes, linear sequence, data-flow threading, defflow, parallel fan/join, nested composition, publish-shaped flow |
| control | 31 | Phase 2: `branch` (6); error model `fail`/`failed?`/`fail-reason` (6); `try-catch` (6); `retry n` (6); `timeout` cooperative step budget (7) |
| suspend | 17 | Phase 3: suspend/resume/cancel via deterministic replay; multi-step, replay determinism, lifecycle guards, suspend-in-branch |
## Architecture
@@ -37,5 +38,6 @@ capture the flow continuation directly.
- [x] Phase 1 — Declarative DAG + sequential execution (combinators + 18 tests, `flow/start`)
- [x] Phase 2 — Control flow + error handling (branch, error model, try-catch, retry, timeout)
- [~] Phase 3 — Suspend/resume (suspend/resume/cancel done via deterministic replay; crash-recovery next)
- [ ] Phase 3 — Suspend / resume (the showcase)
- [ ] Phase 4 — Distributed nodes via fed-sx

View File

@@ -4,37 +4,29 @@
;; node : input -> output
;; A leaf node ignoring its argument is effectively a thunk. Combinators
;; build composite nodes out of child nodes. The whole flow runs INSIDE the
;; Scheme interpreter so that Phase 3's `suspend` (call/cc) can capture the
;; flow continuation directly.
;; Scheme interpreter.
;;
;; Phase 1 combinators (flow-combinators-src):
;; (flow-node f) — wrap a 1-arg procedure as a node (identity)
;; (flow-id input) — pass the upstream value through unchanged
;; (flow-const v) — node that ignores input and yields v
;; (sequence n ...) — thread input left-to-right through children
;; (parallel n ...) — fan input to every child, join results into a list
;; (SEQUENTIAL evaluation; true concurrency is Phase 3)
;; (defflow name body)— bind a named flow
;; flow-node / flow-id / flow-const / sequence / parallel / defflow
;;
;; Phase 2 combinators (flow-control-src):
;; (branch pred then else) — pred on input selects then/else node
;; (`cond` is a Scheme special form, so the combinator is named `branch`)
;; (fail reason) — make an explicit failure value (data, not an exception)
;; (failed? x) — is x a failure value?
;; (fail-reason x) — the reason carried by a failure value
;; (try-catch node handler) — run node; if it raises, call (handler error)
;; with the reified error and return the handler's value
;; (retry n node) — run node, re-running up to n attempts total on a raised
;; exception; the last attempt's exception propagates. Only RAISED exceptions
;; are retried — explicit (fail ...) values pass through unchanged. (Once a
;; node has suspended in Phase 3, retry does not re-run it; resume continues.)
;; (timeout budget node) — bound node by a COOPERATIVE STEP BUDGET. There is no
;; scheduler or wall clock in pure SX, so timeout is deterministic: a node opts
;; in by calling (tick) at safe points. `budget` ticks are allowed; the next
;; tick raises (quote flow-timeout) (catchable by try-catch). A node that never
;; ticks is unbounded. Budgets nest (save/restore) and are isolated per flow
;; run (fresh env per flow-make-env).
;; (tick) — consume one unit of the active timeout budget
;; branch / fail / failed? / fail-reason / try-catch / retry / timeout / tick
;;
;; Phase 3 suspend core (flow-suspend-src):
;; The guest Scheme's call/cc is ESCAPE-ONLY (re-invoking a captured k after it
;; returns hangs the runtime), so suspend/resume CANNOT re-enter a continuation.
;; Instead, durability uses DETERMINISTIC REPLAY: a flow re-runs from the start
;; on each resume; suspend points that have already been resolved replay their
;; logged value, and the first unresolved suspend escapes back to the driver.
;; The entire persisted state is the replay log (plain (tag value) data), which
;; survives process restart — no live continuation is ever serialized.
;;
;; (suspend tag) — if tag is in the replay log, return its value; else escape
;; to the driver as (flow-suspended tag). tags must be unique & deterministic
;; across replays. ALL effects/non-determinism must go through suspend so their
;; results are logged (otherwise they re-run on every replay).
;; (flow-drive flow input log) — run flow with the given replay log; returns
;; (flow-done result) or (flow-suspended tag).
(define
flow-combinators-src
@@ -44,6 +36,10 @@
flow-control-src
"(define (branch pred then else)\n (lambda (input) (if (pred input) (then input) (else input))))\n (define (fail reason) (list (quote flow-fail) reason))\n (define (failed? x) (and (pair? x) (eq? (car x) (quote flow-fail))))\n (define (fail-reason x) (car (cdr x)))\n (define (try-catch node handler)\n (lambda (input) (guard (e (#t (handler e))) (node input))))\n (define (flow-retry-step n node input)\n (guard (e (#t (if (<= n 1) (raise e) (flow-retry-step (- n 1) node input))))\n (node input)))\n (define (retry n node) (lambda (input) (flow-retry-step n node input)))\n (define flow-timeout-budget -1)\n (define (tick)\n (if (< flow-timeout-budget 0)\n 0\n (begin\n (set! flow-timeout-budget (- flow-timeout-budget 1))\n (if (< flow-timeout-budget 0)\n (raise (quote flow-timeout))\n flow-timeout-budget))))\n (define (timeout budget node)\n (lambda (input)\n (let ((saved flow-timeout-budget))\n (set! flow-timeout-budget budget)\n (guard (e (#t (begin (set! flow-timeout-budget saved) (raise e))))\n (let ((result (node input)))\n (set! flow-timeout-budget saved)\n result)))))")
(define
flow-suspend-src
"(define flow-replay-log (list))\n (define flow-suspend-k #f)\n (define (flow-log-lookup tag log)\n (if (null? log)\n (list #f #f)\n (if (eq? (car (car log)) tag)\n (list #t (car (cdr (car log))))\n (flow-log-lookup tag (cdr log)))))\n (define (suspend tag)\n (let ((hit (flow-log-lookup tag flow-replay-log)))\n (if (car hit)\n (car (cdr hit))\n (flow-suspend-k (list (quote flow-suspended) tag)))))\n (define (flow-drive flow input log)\n (set! flow-replay-log log)\n (call/cc\n (lambda (k)\n (set! flow-suspend-k k)\n (list (quote flow-done) (flow input)))))")
(define
flow-load-combinators!
(fn
@@ -51,4 +47,5 @@
(begin
(scheme-eval-program (scheme-parse-all flow-combinators-src) env)
(scheme-eval-program (scheme-parse-all flow-control-src) env)
(scheme-eval-program (scheme-parse-all flow-suspend-src) env)
env)))

29
lib/flow/store.sx Normal file
View File

@@ -0,0 +1,29 @@
;; lib/flow/store.sx — durable flow store + lifecycle (Phase 3).
;;
;; The store maps flow-id -> record. A record is a plain list:
;; (flow input log status payload)
;; flow — the flow procedure (live; re-resolved by name on restart)
;; input — the original start input (plain data)
;; log — replay log: list of (tag value) entries (plain data)
;; status — done | suspended | cancelled
;; payload — result (done) | waiting-tag (suspended) | #f (cancelled)
;;
;; Lifecycle (all use deterministic replay via flow-drive — see spec.sx):
;; (flow/start flow input) — run from empty log. If it completes, return the raw
;; result (backward compatible with Phases 1-2). If it suspends, register the
;; record and return (flow-suspended id tag).
;; (flow/resume id value) — append (tag value) to the log and re-drive. Returns
;; the raw result on completion, (flow-suspended id tag) on a further suspend,
;; or (flow-error reason) if the id is unknown / not suspended.
;; (flow/cancel id) — mark cancelled; a later resume is rejected (the stale
;; replay can never wake a cancelled flow).
(define
flow-store-src
"(define flow-store (list))\n (define flow-next-id 0)\n (define (flow-store-put! id rec) (set! flow-store (cons (list id rec) flow-store)))\n (define (flow-store-find id store)\n (if (null? store)\n (list)\n (if (= (car (car store)) id)\n (car (cdr (car store)))\n (flow-store-find id (cdr store)))))\n (define (flow-store-get id) (flow-store-find id flow-store))\n (define (flow-mk-rec flow input log status payload)\n (list flow input log status payload))\n (define (flow-rec-flow r) (car r))\n (define (flow-rec-input r) (car (cdr r)))\n (define (flow-rec-log r) (car (cdr (cdr r))))\n (define (flow-rec-status r) (car (cdr (cdr (cdr r)))))\n (define (flow-rec-payload r) (car (cdr (cdr (cdr (cdr r))))))\n (define (flow-outcome id flow input log outcome)\n (if (eq? (car outcome) (quote flow-done))\n (begin\n (flow-store-put! id (flow-mk-rec flow input log (quote done) (car (cdr outcome))))\n (car (cdr outcome)))\n (begin\n (flow-store-put! id (flow-mk-rec flow input log (quote suspended) (car (cdr outcome))))\n (list (quote flow-suspended) id (car (cdr outcome))))))\n (define (flow/start flow input)\n (set! flow-next-id (+ flow-next-id 1))\n (flow-outcome flow-next-id flow input (list) (flow-drive flow input (list))))\n (define (flow/resume id value)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (if (eq? (flow-rec-status rec) (quote suspended))\n (let ((newlog (cons (list (flow-rec-payload rec) value) (flow-rec-log rec))))\n (flow-outcome id (flow-rec-flow rec) (flow-rec-input rec) newlog\n (flow-drive (flow-rec-flow rec) (flow-rec-input rec) newlog)))\n (list (quote flow-error) (quote not-suspended))))))\n (define (flow/cancel id)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (begin\n (flow-store-put! id\n (flow-mk-rec (flow-rec-flow rec) (flow-rec-input rec) (flow-rec-log rec) (quote cancelled) #f))\n (list (quote flow-cancelled) id)))))")
(define
flow-load-store!
(fn
(env)
(begin (scheme-eval-program (scheme-parse-all flow-store-src) env) env)))

114
lib/flow/tests/suspend.sx Normal file
View File

@@ -0,0 +1,114 @@
;; lib/flow/tests/suspend.sx — Phase 3: suspend / resume / cancel (deterministic replay).
(define flow-sus-pass 0)
(define flow-sus-fail 0)
(define flow-sus-fails (list))
(define
flow-sus-test
(fn
(name actual expected)
(if
(= actual expected)
(set! flow-sus-pass (+ flow-sus-pass 1))
(begin
(set! flow-sus-fail (+ flow-sus-fail 1))
(append! flow-sus-fails {:name name :expected expected :actual actual})))))
(define flow-s (fn (src) (flow-run src)))
;; ── flow/start ──────────────────────────────────────────────────
(flow-sus-test
"start: non-suspending flow returns the raw result"
(flow-s "(flow/start (lambda (x) (* x 2)) 5)")
10)
(flow-sus-test
"start: a suspending flow returns a flow-suspended state"
(flow-s
"(defflow w (sequence (lambda (x) (+ x 1)) (lambda (g) (suspend (quote await))) (lambda (c) c))) (car (flow/start w 10))")
"flow-suspended")
(flow-sus-test
"start: suspended state carries a numeric id"
(flow-s
"(defflow w (lambda (x) (suspend (quote await)))) (car (cdr (flow/start w 10)))")
1)
(flow-sus-test
"start: suspended state carries the suspend tag"
(flow-s
"(defflow w (lambda (x) (suspend (quote await)))) (car (cdr (cdr (flow/start w 10))))")
"await")
;; ── flow/resume ─────────────────────────────────────────────────
(flow-sus-test
"resume: injects the value and completes"
(flow-s
"(defflow w (sequence (lambda (x) (+ x 1)) (lambda (g) (suspend (quote await))) (lambda (c) (list (quote done) c)))) (define s (flow/start w 10)) (flow/resume (car (cdr s)) 777)")
(list "done" 777))
(flow-sus-test
"resume: injected value threads into the next node"
(flow-s
"(defflow w (sequence (lambda (x) (suspend (quote v))) (lambda (n) (* n 3)))) (define s (flow/start w 0)) (flow/resume (car (cdr s)) 14)")
42)
(flow-sus-test
"resume: replays earlier suspends (recompute is deterministic)"
(flow-s
"(define runs 0) (defflow w (sequence (lambda (x) (begin (set! runs (+ runs 1)) (+ x 1))) (lambda (g) (suspend (quote await))) (lambda (c) c))) (define s (flow/start w 10)) (flow/resume (car (cdr s)) 99) runs")
2)
;; ── multi-step suspension ───────────────────────────────────────
(flow-sus-test
"multi: first resume suspends at the next tag"
(flow-s
"(defflow two (sequence (lambda (x) (suspend (quote a))) (lambda (x) (suspend (quote b))) (lambda (x) (list (quote end) x)))) (define s (flow/start two 0)) (define s2 (flow/resume (car (cdr s)) 100)) (car (cdr (cdr s2)))")
"b")
(flow-sus-test
"multi: second resume completes with the latest value"
(flow-s
"(defflow two (sequence (lambda (x) (suspend (quote a))) (lambda (x) (suspend (quote b))) (lambda (x) (list (quote end) x)))) (define id (car (cdr (flow/start two 0)))) (flow/resume id 100) (flow/resume id 200)")
(list "end" 200))
;; ── error / lifecycle guards ────────────────────────────────────
(flow-sus-test
"resume: completed flow cannot be resumed again"
(flow-s
"(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/resume id 1) (flow/resume id 2)")
(list "flow-error" "not-suspended"))
(flow-sus-test
"resume: unknown id errors"
(flow-s "(flow/resume 999 1)")
(list "flow-error" "no-such-flow"))
;; ── flow/cancel ─────────────────────────────────────────────────
(flow-sus-test
"cancel: returns a flow-cancelled state"
(flow-s
"(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/cancel id)")
(list "flow-cancelled" 1))
(flow-sus-test
"cancel: a cancelled flow cannot be resumed (stale resume rejected)"
(flow-s
"(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/cancel id) (flow/resume id 5)")
(list "flow-error" "not-suspended"))
(flow-sus-test
"cancel: unknown id errors"
(flow-s "(flow/cancel 999)")
(list "flow-error" "no-such-flow"))
;; ── composition ─────────────────────────────────────────────────
(flow-sus-test
"suspend inside a branch arm"
(flow-s
"(defflow gate (branch (lambda (x) (> x 0)) (lambda (x) (suspend (quote approve))) (flow-const (quote rejected)))) (define s (flow/start gate 5)) (flow/resume (car (cdr s)) (quote approved))")
"approved")
(flow-sus-test
"two independent runs get independent ids"
(flow-s
"(defflow w (lambda (x) (suspend (quote q)))) (list (car (cdr (flow/start w 0))) (car (cdr (flow/start w 0))))")
(list 1 2))
(flow-sus-test
"suspend reason may be a structured value"
(flow-s
"(defflow w (lambda (x) (suspend (list (quote needs) (quote approval))))) (car (cdr (cdr (flow/start w 0))))")
(list "needs" "approval"))
(define flow-sus-tests-run! (fn () {:total (+ flow-sus-pass flow-sus-fail) :passed flow-sus-pass :failed flow-sus-fail :fails flow-sus-fails}))

View File

@@ -16,7 +16,7 @@ federation extension via fed-sx for remote-node execution.
## Status (rolling)
`bash lib/flow/conformance.sh`**49/49** (Phases 1-2 done; Phase 3 next)
`bash lib/flow/conformance.sh`**66/66** (Phases 1-2 done; Phase 3 suspend/resume/cancel done, crash-recovery next)
## Ground rules
@@ -94,13 +94,21 @@ lib/flow/spec.sx lib/flow/runtime.sx lib/flow/store.sx
## Phase 3 — Suspend / resume (the showcase)
- [ ] `(suspend reason)``call/cc` captures continuation, returns flow-id to caller
- [ ] `lib/flow/store.sx` — serialize flow state (continuation + open vars)
- [ ] `(flow/resume id value)` — load continuation, inject value, re-enter
- [ ] `(flow/cancel id)` — explicit termination
- [x] `(suspend tag)` — guest call/cc is ESCAPE-ONLY (re-entry hangs), so resume
uses **deterministic replay**: suspend escapes to the driver as `(flow-suspended
tag)`; resume re-runs the flow, replaying resolved suspends from a `(tag value)`
log. No live continuation is ever serialized — the log is plain data.
- [x] `lib/flow/store.sx` — flow store: id→record `(flow input log status payload)`;
`flow-drive` runs a flow against a replay log.
- [x] `(flow/resume id value)` — append `(tag value)` to the log, re-drive; raw
result on completion, `(flow-suspended id tag)` on a further suspend.
- [x] `(flow/cancel id)` — mark cancelled; a later resume is rejected (stale replay
cannot wake a cancelled flow).
- [ ] crash recovery — on restart, scan store for paused flows, mark resumable
- [ ] `lib/flow/tests/suspend.sx`pause-resume scenarios, cancellation, "restart"
scenarios (simulated by re-loading store)
- [x] `lib/flow/tests/suspend.sx`17 cases: start/resume/cancel, multi-step,
replay determinism, lifecycle guards, suspend-in-branch
- Harness: `flow-run` now reuses one env with a per-test reset (building the full
standard env 66× was too slow) — see `api.sx`.
## Phase 4 — Distributed nodes via fed-sx