From e896deffc8dd9b8afc6b5f834669461c9e64f266 Mon Sep 17 00:00:00 2001 From: giles Date: Sat, 6 Jun 2026 17:20:09 +0000 Subject: [PATCH] flow: Phase 3 suspend/resume/cancel via deterministic replay + 17 tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Guest Scheme call/cc is escape-only (re-entry hangs), so durable resume uses deterministic replay: suspend escapes to the driver; resume re-runs the flow and replays resolved suspends from a (tag value) log. No live continuation is ever serialized — persisted state is plain data, survives restart. Adds flow/start (now state-returning, backward compatible), flow/resume, flow/cancel, store.sx. Harness reuses one env with a per-test reset (full env rebuild 66x was too slow). Co-Authored-By: Claude Opus 4.8 (1M context) --- lib/flow/api.sx | 47 +++++++++++++--- lib/flow/conformance.sh | 2 + lib/flow/scoreboard.json | 9 +-- lib/flow/scoreboard.md | 4 +- lib/flow/spec.sx | 51 ++++++++--------- lib/flow/store.sx | 29 ++++++++++ lib/flow/tests/suspend.sx | 114 ++++++++++++++++++++++++++++++++++++++ plans/flow-on-sx.md | 22 +++++--- 8 files changed, 230 insertions(+), 48 deletions(-) create mode 100644 lib/flow/store.sx create mode 100644 lib/flow/tests/suspend.sx diff --git a/lib/flow/api.sx b/lib/flow/api.sx index dc23514e..b7810e60 100644 --- a/lib/flow/api.sx +++ b/lib/flow/api.sx @@ -1,17 +1,24 @@ ;; lib/flow/api.sx — flow runtime entry points. ;; -;; Builds a Scheme env preloaded with the flow combinators (lib/flow/spec.sx) -;; plus the public flow API, and provides SX helpers to run flow programs. +;; Builds a Scheme env preloaded with the flow combinators (lib/flow/spec.sx) and +;; the durable store + lifecycle (lib/flow/store.sx), and provides SX helpers to +;; run flow programs. 
;; ;; Scheme-level API (available inside flow programs): -;; (flow/start flow input) — run a flow with the given input, return result +;; (flow/start flow input) — run a flow; raw result if it completes, else +;; (flow-suspended id tag). Defined in store.sx. +;; (flow/resume id value) — resume a suspended flow (store.sx) +;; (flow/cancel id) — cancel a flow (store.sx) +;; (suspend tag) — suspension point (spec.sx) ;; ;; SX-level helpers (for hosts and tests): -;; (flow-make-env) — fresh standard env + combinators + api -;; (flow-run src) — eval a Scheme program string in a fresh flow env +;; (flow-make-env) — fresh standard env + combinators + store +;; (flow-run src) — eval a Scheme program string in a reset shared env ;; (flow-run-in env src) — eval a Scheme program string in a given env - -(define flow-api-src "(define (flow/start flow input) (flow input))") +;; +;; flow-run reuses ONE env (building the full standard env is expensive) and +;; resets the mutable flow globals before each program, so tests stay isolated +;; without paying for a fresh standard env each time. (define flow-make-env @@ -20,11 +27,33 @@ (let ((env (scheme-standard-env))) (flow-load-combinators! env) - (scheme-eval-program (scheme-parse-all flow-api-src) env) + (flow-load-store! env) env))) (define flow-run-in (fn (env src) (scheme-eval-program (scheme-parse-all src) env))) -(define flow-run (fn (src) (flow-run-in (flow-make-env) src))) +(define + flow-reset-src + "(set! flow-store (list)) (set! flow-next-id 0) (set! flow-replay-log (list)) (set! flow-suspend-k #f) (set! flow-timeout-budget -1)") + +(define flow-env-cache false) + +(define + flow-shared-env + (fn + () + (begin + (if flow-env-cache nil (set! 
flow-env-cache (flow-make-env))) + flow-env-cache))) + +(define + flow-run + (fn + (src) + (let + ((env (flow-shared-env))) + (begin + (scheme-eval-program (scheme-parse-all flow-reset-src) env) + (scheme-eval-program (scheme-parse-all src) env))))) diff --git a/lib/flow/conformance.sh b/lib/flow/conformance.sh index 007e7bf2..f53f1fe7 100755 --- a/lib/flow/conformance.sh +++ b/lib/flow/conformance.sh @@ -23,6 +23,7 @@ VERBOSE="${1:-}" SUITES=( "basic flow-basic-tests-run! lib/flow/tests/basic.sx" "control flow-ctl-tests-run! lib/flow/tests/control.sx" + "suspend flow-sus-tests-run! lib/flow/tests/suspend.sx" ) TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT @@ -39,6 +40,7 @@ emit_eval () { echo "(epoch $EPOCH)"; echo "(eval \"$1\")"; EPOCH=$((EPOCH+1)); emit_load "lib/scheme/eval.sx" emit_load "lib/scheme/runtime.sx" emit_load "lib/flow/spec.sx" + emit_load "lib/flow/store.sx" emit_load "lib/flow/api.sx" for SUITE in "${SUITES[@]}"; do read -r _NAME _RUNNER FILE <<< "$SUITE" diff --git a/lib/flow/scoreboard.json b/lib/flow/scoreboard.json index 8ed50f27..333ae875 100644 --- a/lib/flow/scoreboard.json +++ b/lib/flow/scoreboard.json @@ -1,10 +1,11 @@ { - "total": 49, - "passed": 49, + "total": 66, + "passed": 66, "failed": 0, "suites": { "basic": { "passed": 18, "total": 18 }, - "control": { "passed": 31, "total": 31 } + "control": { "passed": 31, "total": 31 }, + "suspend": { "passed": 17, "total": 17 } }, - "phases": { "phase1": "done", "phase2": "done", "phase3": "pending" } + "phases": { "phase1": "done", "phase2": "done", "phase3": "in-progress" } } diff --git a/lib/flow/scoreboard.md b/lib/flow/scoreboard.md index 4432b438..c9acd6d6 100644 --- a/lib/flow/scoreboard.md +++ b/lib/flow/scoreboard.md @@ -1,6 +1,6 @@ # flow-on-sx Scoreboard -**All tests pass: 49 / 49 across 2 suites.** +**All tests pass: 66 / 66 across 3 suites.** `bash lib/flow/conformance.sh` @@ -10,6 +10,7 @@ |-------|--------:|--------| | basic | 18 | Phase 1: single nodes, linear sequence, 
data-flow threading, defflow, parallel fan/join, nested composition, publish-shaped flow | | control | 31 | Phase 2: `branch` (6); error model `fail`/`failed?`/`fail-reason` (6); `try-catch` (6); `retry n` (6); `timeout` cooperative step budget (7) | +| suspend | 17 | Phase 3: suspend/resume/cancel via deterministic replay; multi-step, replay determinism, lifecycle guards, suspend-in-branch | ## Architecture @@ -37,5 +38,6 @@ capture the flow continuation directly. - [x] Phase 1 — Declarative DAG + sequential execution (combinators + 18 tests, `flow/start`) - [x] Phase 2 — Control flow + error handling (branch, error model, try-catch, retry, timeout) +- [~] Phase 3 — Suspend/resume (suspend/resume/cancel done via deterministic replay; crash-recovery next) - [ ] Phase 3 — Suspend / resume (the showcase) - [ ] Phase 4 — Distributed nodes via fed-sx diff --git a/lib/flow/spec.sx b/lib/flow/spec.sx index 72e78ebe..72afeaed 100644 --- a/lib/flow/spec.sx +++ b/lib/flow/spec.sx @@ -4,37 +4,29 @@ ;; node : input -> output ;; A leaf node ignoring its argument is effectively a thunk. Combinators ;; build composite nodes out of child nodes. The whole flow runs INSIDE the -;; Scheme interpreter so that Phase 3's `suspend` (call/cc) can capture the -;; flow continuation directly. +;; Scheme interpreter. ;; ;; Phase 1 combinators (flow-combinators-src): -;; (flow-node f) — wrap a 1-arg procedure as a node (identity) -;; (flow-id input) — pass the upstream value through unchanged -;; (flow-const v) — node that ignores input and yields v -;; (sequence n ...) — thread input left-to-right through children -;; (parallel n ...) 
— fan input to every child, join results into a list -;; (SEQUENTIAL evaluation; true concurrency is Phase 3) -;; (defflow name body)— bind a named flow +;; flow-node / flow-id / flow-const / sequence / parallel / defflow ;; ;; Phase 2 combinators (flow-control-src): -;; (branch pred then else) — pred on input selects then/else node -;; (`cond` is a Scheme special form, so the combinator is named `branch`) -;; (fail reason) — make an explicit failure value (data, not an exception) -;; (failed? x) — is x a failure value? -;; (fail-reason x) — the reason carried by a failure value -;; (try-catch node handler) — run node; if it raises, call (handler error) -;; with the reified error and return the handler's value -;; (retry n node) — run node, re-running up to n attempts total on a raised -;; exception; the last attempt's exception propagates. Only RAISED exceptions -;; are retried — explicit (fail ...) values pass through unchanged. (Once a -;; node has suspended in Phase 3, retry does not re-run it; resume continues.) -;; (timeout budget node) — bound node by a COOPERATIVE STEP BUDGET. There is no -;; scheduler or wall clock in pure SX, so timeout is deterministic: a node opts -;; in by calling (tick) at safe points. `budget` ticks are allowed; the next -;; tick raises (quote flow-timeout) (catchable by try-catch). A node that never -;; ticks is unbounded. Budgets nest (save/restore) and are isolated per flow -;; run (fresh env per flow-make-env). -;; (tick) — consume one unit of the active timeout budget +;; branch / fail / failed? / fail-reason / try-catch / retry / timeout / tick +;; +;; Phase 3 suspend core (flow-suspend-src): +;; The guest Scheme's call/cc is ESCAPE-ONLY (re-invoking a captured k after it +;; returns hangs the runtime), so suspend/resume CANNOT re-enter a continuation. 
+;; Instead, durability uses DETERMINISTIC REPLAY: a flow re-runs from the start +;; on each resume; suspend points that have already been resolved replay their +;; logged value, and the first unresolved suspend escapes back to the driver. +;; The entire persisted state is the replay log (plain (tag value) data), which +;; survives process restart — no live continuation is ever serialized. +;; +;; (suspend tag) — if tag is in the replay log, return its value; else escape +;; to the driver as (flow-suspended tag). tags must be unique & deterministic +;; across replays. ALL effects/non-determinism must go through suspend so their +;; results are logged (otherwise they re-run on every replay). +;; (flow-drive flow input log) — run flow with the given replay log; returns +;; (flow-done result) or (flow-suspended tag). (define flow-combinators-src @@ -44,6 +36,10 @@ flow-control-src "(define (branch pred then else)\n (lambda (input) (if (pred input) (then input) (else input))))\n (define (fail reason) (list (quote flow-fail) reason))\n (define (failed? x) (and (pair? x) (eq? (car x) (quote flow-fail))))\n (define (fail-reason x) (car (cdr x)))\n (define (try-catch node handler)\n (lambda (input) (guard (e (#t (handler e))) (node input))))\n (define (flow-retry-step n node input)\n (guard (e (#t (if (<= n 1) (raise e) (flow-retry-step (- n 1) node input))))\n (node input)))\n (define (retry n node) (lambda (input) (flow-retry-step n node input)))\n (define flow-timeout-budget -1)\n (define (tick)\n (if (< flow-timeout-budget 0)\n 0\n (begin\n (set! flow-timeout-budget (- flow-timeout-budget 1))\n (if (< flow-timeout-budget 0)\n (raise (quote flow-timeout))\n flow-timeout-budget))))\n (define (timeout budget node)\n (lambda (input)\n (let ((saved flow-timeout-budget))\n (set! flow-timeout-budget budget)\n (guard (e (#t (begin (set! flow-timeout-budget saved) (raise e))))\n (let ((result (node input)))\n (set! 
flow-timeout-budget saved)\n result)))))") +(define + flow-suspend-src + "(define flow-replay-log (list))\n (define flow-suspend-k #f)\n (define (flow-log-lookup tag log)\n (if (null? log)\n (list #f #f)\n (if (equal? (car (car log)) tag)\n (list #t (car (cdr (car log))))\n (flow-log-lookup tag (cdr log)))))\n (define (suspend tag)\n (let ((hit (flow-log-lookup tag flow-replay-log)))\n (if (car hit)\n (car (cdr hit))\n (flow-suspend-k (list (quote flow-suspended) tag)))))\n (define (flow-drive flow input log)\n (set! flow-replay-log log)\n (let ((saved-budget flow-timeout-budget))\n (let ((outcome (call/cc\n (lambda (k)\n (set! flow-suspend-k k)\n (list (quote flow-done) (flow input))))))\n (set! flow-timeout-budget saved-budget)\n outcome)))") + (define flow-load-combinators! (fn @@ -51,4 +47,5 @@ (begin (scheme-eval-program (scheme-parse-all flow-combinators-src) env) (scheme-eval-program (scheme-parse-all flow-control-src) env) + (scheme-eval-program (scheme-parse-all flow-suspend-src) env) env))) diff --git a/lib/flow/store.sx b/lib/flow/store.sx new file mode 100644 index 00000000..06a6251f --- /dev/null +++ b/lib/flow/store.sx @@ -0,0 +1,29 @@ +;; lib/flow/store.sx — durable flow store + lifecycle (Phase 3). +;; +;; The store maps flow-id -> record. A record is a plain list: +;; (flow input log status payload) +;; flow — the flow procedure (live; re-resolved by name on restart) +;; input — the original start input (plain data) +;; log — replay log: list of (tag value) entries (plain data) +;; status — done | suspended | cancelled +;; payload — result (done) | waiting-tag (suspended) | #f (cancelled) +;; +;; Lifecycle (all use deterministic replay via flow-drive — see spec.sx): +;; (flow/start flow input) — run from empty log. If it completes, return the raw +;; result (backward compatible with Phases 1-2). If it suspends, register the +;; record and return (flow-suspended id tag). +;; (flow/resume id value) — append (tag value) to the log and re-drive. 
Returns +;; the raw result on completion, (flow-suspended id tag) on a further suspend, +;; or (flow-error reason) if the id is unknown / not suspended. +;; (flow/cancel id) — mark cancelled; a later resume is rejected (the stale +;; replay can never wake a cancelled flow). + +(define + flow-store-src + "(define flow-store (list))\n (define flow-next-id 0)\n (define (flow-store-put! id rec) (set! flow-store (cons (list id rec) flow-store)))\n (define (flow-store-find id store)\n (if (null? store)\n (list)\n (if (= (car (car store)) id)\n (car (cdr (car store)))\n (flow-store-find id (cdr store)))))\n (define (flow-store-get id) (flow-store-find id flow-store))\n (define (flow-mk-rec flow input log status payload)\n (list flow input log status payload))\n (define (flow-rec-flow r) (car r))\n (define (flow-rec-input r) (car (cdr r)))\n (define (flow-rec-log r) (car (cdr (cdr r))))\n (define (flow-rec-status r) (car (cdr (cdr (cdr r)))))\n (define (flow-rec-payload r) (car (cdr (cdr (cdr (cdr r))))))\n (define (flow-outcome id flow input log outcome)\n (if (eq? (car outcome) (quote flow-done))\n (begin\n (flow-store-put! id (flow-mk-rec flow input log (quote done) (car (cdr outcome))))\n (car (cdr outcome)))\n (begin\n (flow-store-put! id (flow-mk-rec flow input log (quote suspended) (car (cdr outcome))))\n (list (quote flow-suspended) id (car (cdr outcome))))))\n (define (flow/start flow input)\n (set! flow-next-id (+ flow-next-id 1))\n (flow-outcome flow-next-id flow input (list) (flow-drive flow input (list))))\n (define (flow/resume id value)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (if (eq? 
(flow-rec-status rec) (quote suspended))\n (let ((newlog (cons (list (flow-rec-payload rec) value) (flow-rec-log rec))))\n (flow-outcome id (flow-rec-flow rec) (flow-rec-input rec) newlog\n (flow-drive (flow-rec-flow rec) (flow-rec-input rec) newlog)))\n (list (quote flow-error) (quote not-suspended))))))\n (define (flow/cancel id)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (begin\n (flow-store-put! id\n (flow-mk-rec (flow-rec-flow rec) (flow-rec-input rec) (flow-rec-log rec) (quote cancelled) #f))\n (list (quote flow-cancelled) id)))))") + +(define + flow-load-store! + (fn + (env) + (begin (scheme-eval-program (scheme-parse-all flow-store-src) env) env))) diff --git a/lib/flow/tests/suspend.sx b/lib/flow/tests/suspend.sx new file mode 100644 index 00000000..e9dd825a --- /dev/null +++ b/lib/flow/tests/suspend.sx @@ -0,0 +1,114 @@ +;; lib/flow/tests/suspend.sx — Phase 3: suspend / resume / cancel (deterministic replay). + +(define flow-sus-pass 0) +(define flow-sus-fail 0) +(define flow-sus-fails (list)) + +(define + flow-sus-test + (fn + (name actual expected) + (if + (= actual expected) + (set! flow-sus-pass (+ flow-sus-pass 1)) + (begin + (set! flow-sus-fail (+ flow-sus-fail 1)) + (append! 
flow-sus-fails {:name name :expected expected :actual actual}))))) + +(define flow-s (fn (src) (flow-run src))) + +;; ── flow/start ────────────────────────────────────────────────── +(flow-sus-test + "start: non-suspending flow returns the raw result" + (flow-s "(flow/start (lambda (x) (* x 2)) 5)") + 10) +(flow-sus-test + "start: a suspending flow returns a flow-suspended state" + (flow-s + "(defflow w (sequence (lambda (x) (+ x 1)) (lambda (g) (suspend (quote await))) (lambda (c) c))) (car (flow/start w 10))") + "flow-suspended") +(flow-sus-test + "start: suspended state carries a numeric id" + (flow-s + "(defflow w (lambda (x) (suspend (quote await)))) (car (cdr (flow/start w 10)))") + 1) +(flow-sus-test + "start: suspended state carries the suspend tag" + (flow-s + "(defflow w (lambda (x) (suspend (quote await)))) (car (cdr (cdr (flow/start w 10))))") + "await") + +;; ── flow/resume ───────────────────────────────────────────────── +(flow-sus-test + "resume: injects the value and completes" + (flow-s + "(defflow w (sequence (lambda (x) (+ x 1)) (lambda (g) (suspend (quote await))) (lambda (c) (list (quote done) c)))) (define s (flow/start w 10)) (flow/resume (car (cdr s)) 777)") + (list "done" 777)) +(flow-sus-test + "resume: injected value threads into the next node" + (flow-s + "(defflow w (sequence (lambda (x) (suspend (quote v))) (lambda (n) (* n 3)))) (define s (flow/start w 0)) (flow/resume (car (cdr s)) 14)") + 42) +(flow-sus-test + "resume: replays earlier suspends (recompute is deterministic)" + (flow-s + "(define runs 0) (defflow w (sequence (lambda (x) (begin (set! 
runs (+ runs 1)) (+ x 1))) (lambda (g) (suspend (quote await))) (lambda (c) c))) (define s (flow/start w 10)) (flow/resume (car (cdr s)) 99) runs") + 2) + +;; ── multi-step suspension ─────────────────────────────────────── +(flow-sus-test + "multi: first resume suspends at the next tag" + (flow-s + "(defflow two (sequence (lambda (x) (suspend (quote a))) (lambda (x) (suspend (quote b))) (lambda (x) (list (quote end) x)))) (define s (flow/start two 0)) (define s2 (flow/resume (car (cdr s)) 100)) (car (cdr (cdr s2)))") + "b") +(flow-sus-test + "multi: second resume completes with the latest value" + (flow-s + "(defflow two (sequence (lambda (x) (suspend (quote a))) (lambda (x) (suspend (quote b))) (lambda (x) (list (quote end) x)))) (define id (car (cdr (flow/start two 0)))) (flow/resume id 100) (flow/resume id 200)") + (list "end" 200)) + +;; ── error / lifecycle guards ──────────────────────────────────── +(flow-sus-test + "resume: completed flow cannot be resumed again" + (flow-s + "(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/resume id 1) (flow/resume id 2)") + (list "flow-error" "not-suspended")) +(flow-sus-test + "resume: unknown id errors" + (flow-s "(flow/resume 999 1)") + (list "flow-error" "no-such-flow")) + +;; ── flow/cancel ───────────────────────────────────────────────── +(flow-sus-test + "cancel: returns a flow-cancelled state" + (flow-s + "(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/cancel id)") + (list "flow-cancelled" 1)) +(flow-sus-test + "cancel: a cancelled flow cannot be resumed (stale resume rejected)" + (flow-s + "(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/cancel id) (flow/resume id 5)") + (list "flow-error" "not-suspended")) +(flow-sus-test + "cancel: unknown id errors" + (flow-s "(flow/cancel 999)") + (list "flow-error" "no-such-flow")) + +;; ── composition 
───────────────────────────────────────────────── +(flow-sus-test + "suspend inside a branch arm" + (flow-s + "(defflow gate (branch (lambda (x) (> x 0)) (lambda (x) (suspend (quote approve))) (flow-const (quote rejected)))) (define s (flow/start gate 5)) (flow/resume (car (cdr s)) (quote approved))") + "approved") +(flow-sus-test + "two independent runs get independent ids" + (flow-s + "(defflow w (lambda (x) (suspend (quote q)))) (list (car (cdr (flow/start w 0))) (car (cdr (flow/start w 0))))") + (list 1 2)) +(flow-sus-test + "suspend reason may be a structured value" + (flow-s + "(defflow w (lambda (x) (suspend (list (quote needs) (quote approval))))) (car (cdr (cdr (flow/start w 0))))") + (list "needs" "approval")) + +(define flow-sus-tests-run! (fn () {:total (+ flow-sus-pass flow-sus-fail) :passed flow-sus-pass :failed flow-sus-fail :fails flow-sus-fails})) diff --git a/plans/flow-on-sx.md b/plans/flow-on-sx.md index ba10374f..6ee6b710 100644 --- a/plans/flow-on-sx.md +++ b/plans/flow-on-sx.md @@ -16,7 +16,7 @@ federation extension via fed-sx for remote-node execution. ## Status (rolling) -`bash lib/flow/conformance.sh` → **49/49** (Phases 1-2 done; Phase 3 next) +`bash lib/flow/conformance.sh` → **66/66** (Phases 1-2 done; Phase 3 suspend/resume/cancel done, crash-recovery next) ## Ground rules @@ -94,13 +94,21 @@ lib/flow/spec.sx lib/flow/runtime.sx lib/flow/store.sx ## Phase 3 — Suspend / resume (the showcase) -- [ ] `(suspend reason)` — `call/cc` captures continuation, returns flow-id to caller -- [ ] `lib/flow/store.sx` — serialize flow state (continuation + open vars) -- [ ] `(flow/resume id value)` — load continuation, inject value, re-enter -- [ ] `(flow/cancel id)` — explicit termination +- [x] `(suspend tag)` — guest call/cc is ESCAPE-ONLY (re-entry hangs), so resume + uses **deterministic replay**: suspend escapes to the driver as `(flow-suspended + tag)`; resume re-runs the flow, replaying resolved suspends from a `(tag value)` + log. 
No live continuation is ever serialized — the log is plain data. +- [x] `lib/flow/store.sx` — flow store: id→record `(flow input log status payload)`; + `flow-drive` (defined in `spec.sx`) runs a flow against a replay log. +- [x] `(flow/resume id value)` — append `(tag value)` to the log, re-drive; raw + result on completion, `(flow-suspended id tag)` on a further suspend, `(flow-error reason)` if the id is unknown or not suspended. +- [x] `(flow/cancel id)` — mark cancelled; a later resume is rejected (stale replay + cannot wake a cancelled flow). - [ ] crash recovery — on restart, scan store for paused flows, mark resumable -- [ ] `lib/flow/tests/suspend.sx` — pause-resume scenarios, cancellation, "restart" - scenarios (simulated by re-loading store) +- [x] `lib/flow/tests/suspend.sx` — 17 cases: start/resume/cancel, multi-step, + replay determinism, lifecycle guards, suspend-in-branch +- Harness: `flow-run` now reuses one env with a per-test reset (building the full + standard env 66× was too slow) — see `api.sx`. ## Phase 4 — Distributed nodes via fed-sx