flow: crash recovery — store export/import + resumable scan + 8 tests (Phase 3 complete)
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 58s
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 58s
Records are name-keyed (defflow registers names); flow-store-export nulls live procs to plain data, flow-store-import! restores, flow-resumable-ids scans for paused flows. Resume re-resolves the proc by name, so a flow survives a wiped store (simulated restart). The whole durable model persists only plain data. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -24,6 +24,7 @@ SUITES=(
|
||||
"basic flow-basic-tests-run! lib/flow/tests/basic.sx"
|
||||
"control flow-ctl-tests-run! lib/flow/tests/control.sx"
|
||||
"suspend flow-sus-tests-run! lib/flow/tests/suspend.sx"
|
||||
"recovery flow-rec-tests-run! lib/flow/tests/recovery.sx"
|
||||
)
|
||||
|
||||
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
{
|
||||
"total": 66,
|
||||
"passed": 66,
|
||||
"total": 74,
|
||||
"passed": 74,
|
||||
"failed": 0,
|
||||
"suites": {
|
||||
"basic": { "passed": 18, "total": 18 },
|
||||
"control": { "passed": 31, "total": 31 },
|
||||
"suspend": { "passed": 17, "total": 17 }
|
||||
"suspend": { "passed": 17, "total": 17 },
|
||||
"recovery": { "passed": 8, "total": 8 }
|
||||
},
|
||||
"phases": { "phase1": "done", "phase2": "done", "phase3": "in-progress" }
|
||||
"phases": { "phase1": "done", "phase2": "done", "phase3": "done", "phase4": "pending" }
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# flow-on-sx Scoreboard
|
||||
|
||||
**All tests pass: 66 / 66 across 3 suites.**
|
||||
**All tests pass: 74 / 74 across 4 suites.**
|
||||
|
||||
`bash lib/flow/conformance.sh`
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
| basic | 18 | Phase 1: single nodes, linear sequence, data-flow threading, defflow, parallel fan/join, nested composition, publish-shaped flow |
|
||||
| control | 31 | Phase 2: `branch` (6); error model `fail`/`failed?`/`fail-reason` (6); `try-catch` (6); `retry n` (6); `timeout` cooperative step budget (7) |
|
||||
| suspend | 17 | Phase 3: suspend/resume/cancel via deterministic replay; multi-step, replay determinism, lifecycle guards, suspend-in-branch |
|
||||
| recovery | 8 | Phase 3: crash recovery — store export/import, resumable scan, restart-at-every-step, replay-log survival |
|
||||
|
||||
## Architecture
|
||||
|
||||
@@ -38,6 +39,6 @@ capture the flow continuation directly.
|
||||
|
||||
- [x] Phase 1 — Declarative DAG + sequential execution (combinators + 18 tests, `flow/start`)
|
||||
- [x] Phase 2 — Control flow + error handling (branch, error model, try-catch, retry, timeout)
|
||||
- [~] Phase 3 — Suspend/resume (suspend/resume/cancel done via deterministic replay; crash-recovery next)
|
||||
- [x] Phase 3 — Suspend/resume (suspend/resume/cancel + crash recovery via deterministic replay)
|
||||
- [ ] Phase 3 — Suspend / resume (the showcase)
|
||||
- [ ] Phase 4 — Distributed nodes via fed-sx
|
||||
|
||||
@@ -8,6 +8,8 @@
|
||||
;;
|
||||
;; Phase 1 combinators (flow-combinators-src):
|
||||
;; flow-node / flow-id / flow-const / sequence / parallel / defflow
|
||||
;; defflow both binds the flow and registers it by name (flow-register!, in
|
||||
;; store.sx) so it can be re-resolved after a process restart.
|
||||
;;
|
||||
;; Phase 2 combinators (flow-control-src):
|
||||
;; branch / fail / failed? / fail-reason / try-catch / retry / timeout / tick
|
||||
@@ -30,7 +32,7 @@
|
||||
|
||||
(define
|
||||
flow-combinators-src
|
||||
"(define (flow-node f) f)\n (define (flow-id input) input)\n (define (flow-const v) (lambda (input) v))\n (define (flow-seq-step ns v)\n (if (null? ns) v (flow-seq-step (cdr ns) ((car ns) v))))\n (define sequence (lambda ns (lambda (input) (flow-seq-step ns input))))\n (define parallel (lambda ns (lambda (input) (map (lambda (n) (n input)) ns))))\n (define-syntax defflow\n (syntax-rules ()\n ((defflow nm body) (define nm body))))")
|
||||
"(define (flow-node f) f)\n (define (flow-id input) input)\n (define (flow-const v) (lambda (input) v))\n (define (flow-seq-step ns v)\n (if (null? ns) v (flow-seq-step (cdr ns) ((car ns) v))))\n (define sequence (lambda ns (lambda (input) (flow-seq-step ns input))))\n (define parallel (lambda ns (lambda (input) (map (lambda (n) (n input)) ns))))\n (define-syntax defflow\n (syntax-rules ()\n ((defflow nm body)\n (begin (define nm body) (flow-register! (quote nm) nm)))))")
|
||||
|
||||
(define
|
||||
flow-control-src
|
||||
|
||||
@@ -1,26 +1,32 @@
|
||||
;; lib/flow/store.sx — durable flow store + lifecycle (Phase 3).
|
||||
;; lib/flow/store.sx — durable flow store + lifecycle + crash recovery (Phase 3).
|
||||
;;
|
||||
;; The store maps flow-id -> record. A record is a plain list:
|
||||
;; (flow input log status payload)
|
||||
;; flow — the flow procedure (live; re-resolved by name on restart)
|
||||
;; The store maps flow-id -> record (one entry per id, newest wins). A record is:
|
||||
;; (name proc input log status payload)
|
||||
;; name — registered flow name (symbol) or #f for an anonymous flow
|
||||
;; proc — the live flow procedure (#f after export; re-resolved by name)
|
||||
;; input — the original start input (plain data)
|
||||
;; log — replay log: list of (tag value) entries (plain data)
|
||||
;; status — done | suspended | cancelled
|
||||
;; payload — result (done) | waiting-tag (suspended) | #f (cancelled)
|
||||
;;
|
||||
;; A record is SERIALIZABLE once its proc is nulled (flow-store-export): name,
|
||||
;; input, and log are plain data. On restart the flow definitions are reloaded
|
||||
;; (defflow re-registers names), the store is reimported, and resume re-resolves
|
||||
;; the proc by name — no live continuation is ever persisted.
|
||||
;;
|
||||
;; Lifecycle (all use deterministic replay via flow-drive — see spec.sx):
|
||||
;; (flow/start flow input) — run from empty log. If it completes, return the raw
|
||||
;; result (backward compatible with Phases 1-2). If it suspends, register the
|
||||
;; record and return (flow-suspended id tag).
|
||||
;; (flow/resume id value) — append (tag value) to the log and re-drive. Returns
|
||||
;; the raw result on completion, (flow-suspended id tag) on a further suspend,
|
||||
;; or (flow-error reason) if the id is unknown / not suspended.
|
||||
;; (flow/cancel id) — mark cancelled; a later resume is rejected (the stale
|
||||
;; replay can never wake a cancelled flow).
|
||||
;; (flow/start flow input) — raw result if it completes (backward compatible),
|
||||
;; else (flow-suspended id tag).
|
||||
;; (flow/resume id value) — append (tag value) to the log and re-drive.
|
||||
;; (flow/cancel id) — mark cancelled; a later resume is rejected.
|
||||
;; Crash recovery:
|
||||
;; (flow-store-export) — store as plain data (procs nulled)
|
||||
;; (flow-store-import! d) — replace the store from exported data
|
||||
;; (flow-resumable-ids) — ids of suspended (resumable) flows
|
||||
|
||||
(define
|
||||
flow-store-src
|
||||
"(define flow-store (list))\n (define flow-next-id 0)\n (define (flow-store-put! id rec) (set! flow-store (cons (list id rec) flow-store)))\n (define (flow-store-find id store)\n (if (null? store)\n (list)\n (if (= (car (car store)) id)\n (car (cdr (car store)))\n (flow-store-find id (cdr store)))))\n (define (flow-store-get id) (flow-store-find id flow-store))\n (define (flow-mk-rec flow input log status payload)\n (list flow input log status payload))\n (define (flow-rec-flow r) (car r))\n (define (flow-rec-input r) (car (cdr r)))\n (define (flow-rec-log r) (car (cdr (cdr r))))\n (define (flow-rec-status r) (car (cdr (cdr (cdr r)))))\n (define (flow-rec-payload r) (car (cdr (cdr (cdr (cdr r))))))\n (define (flow-outcome id flow input log outcome)\n (if (eq? (car outcome) (quote flow-done))\n (begin\n (flow-store-put! id (flow-mk-rec flow input log (quote done) (car (cdr outcome))))\n (car (cdr outcome)))\n (begin\n (flow-store-put! id (flow-mk-rec flow input log (quote suspended) (car (cdr outcome))))\n (list (quote flow-suspended) id (car (cdr outcome))))))\n (define (flow/start flow input)\n (set! flow-next-id (+ flow-next-id 1))\n (flow-outcome flow-next-id flow input (list) (flow-drive flow input (list))))\n (define (flow/resume id value)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (if (eq? (flow-rec-status rec) (quote suspended))\n (let ((newlog (cons (list (flow-rec-payload rec) value) (flow-rec-log rec))))\n (flow-outcome id (flow-rec-flow rec) (flow-rec-input rec) newlog\n (flow-drive (flow-rec-flow rec) (flow-rec-input rec) newlog)))\n (list (quote flow-error) (quote not-suspended))))))\n (define (flow/cancel id)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (begin\n (flow-store-put! id\n (flow-mk-rec (flow-rec-flow rec) (flow-rec-input rec) (flow-rec-log rec) (quote cancelled) #f))\n (list (quote flow-cancelled) id)))))")
|
||||
"(define flow-registry (list))\n (define (flow-register! name proc) (set! flow-registry (cons (list name proc) flow-registry)))\n (define (flow-lookup-in name reg)\n (if (null? reg)\n #f\n (if (eq? (car (car reg)) name) (car (cdr (car reg))) (flow-lookup-in name (cdr reg)))))\n (define (flow-lookup name) (flow-lookup-in name flow-registry))\n (define (flow-name-of proc reg)\n (if (null? reg)\n #f\n (if (eq? (car (cdr (car reg))) proc) (car (car reg)) (flow-name-of proc (cdr reg)))))\n\n (define flow-store (list))\n (define flow-next-id 0)\n (define (flow-store-remove id store)\n (if (null? store)\n (list)\n (if (= (car (car store)) id)\n (flow-store-remove id (cdr store))\n (cons (car store) (flow-store-remove id (cdr store))))))\n (define (flow-store-put! id rec) (set! flow-store (cons (list id rec) (flow-store-remove id flow-store))))\n (define (flow-store-find id store)\n (if (null? store)\n (list)\n (if (= (car (car store)) id)\n (car (cdr (car store)))\n (flow-store-find id (cdr store)))))\n (define (flow-store-get id) (flow-store-find id flow-store))\n\n (define (flow-mk-rec name proc input log status payload)\n (list name proc input log status payload))\n (define (flow-rec-name r) (car r))\n (define (flow-rec-proc r) (car (cdr r)))\n (define (flow-rec-input r) (car (cdr (cdr r))))\n (define (flow-rec-log r) (car (cdr (cdr (cdr r)))))\n (define (flow-rec-status r) (car (cdr (cdr (cdr (cdr r))))))\n (define (flow-rec-payload r) (car (cdr (cdr (cdr (cdr (cdr r)))))))\n (define (flow-rec-resolve rec)\n (let ((byname (flow-lookup (flow-rec-name rec))))\n (if byname byname (flow-rec-proc rec))))\n\n (define (flow-outcome id name proc input log outcome)\n (if (eq? (car outcome) (quote flow-done))\n (begin\n (flow-store-put! id (flow-mk-rec name proc input log (quote done) (car (cdr outcome))))\n (car (cdr outcome)))\n (begin\n (flow-store-put! id (flow-mk-rec name proc input log (quote suspended) (car (cdr outcome))))\n (list (quote flow-suspended) id (car (cdr outcome))))))\n (define (flow/start flow input)\n (set! flow-next-id (+ flow-next-id 1))\n (flow-outcome flow-next-id (flow-name-of flow flow-registry) flow input (list)\n (flow-drive flow input (list))))\n (define (flow/resume id value)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (if (eq? (flow-rec-status rec) (quote suspended))\n (let ((proc (flow-rec-resolve rec)))\n (let ((newlog (cons (list (flow-rec-payload rec) value) (flow-rec-log rec))))\n (flow-outcome id (flow-rec-name rec) proc (flow-rec-input rec) newlog\n (flow-drive proc (flow-rec-input rec) newlog))))\n (list (quote flow-error) (quote not-suspended))))))\n (define (flow/cancel id)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (begin\n (flow-store-put! id\n (flow-mk-rec (flow-rec-name rec) (flow-rec-proc rec) (flow-rec-input rec)\n (flow-rec-log rec) (quote cancelled) #f))\n (list (quote flow-cancelled) id)))))\n\n (define (flow-export-entry entry)\n (let ((rec (car (cdr entry))))\n (list (car entry)\n (flow-mk-rec (flow-rec-name rec) #f (flow-rec-input rec)\n (flow-rec-log rec) (flow-rec-status rec) (flow-rec-payload rec)))))\n (define (flow-export-map store)\n (if (null? store) (list) (cons (flow-export-entry (car store)) (flow-export-map (cdr store)))))\n (define (flow-store-export) (flow-export-map flow-store))\n (define (flow-max-id store m)\n (if (null? store) m (flow-max-id (cdr store) (if (> (car (car store)) m) (car (car store)) m))))\n (define (flow-store-import! data)\n (begin (set! flow-store data) (set! flow-next-id (flow-max-id data 0))))\n (define (flow-collect-resumable store)\n (if (null? store)\n (list)\n (if (eq? (flow-rec-status (car (cdr (car store)))) (quote suspended))\n (cons (car (car store)) (flow-collect-resumable (cdr store)))\n (flow-collect-resumable (cdr store)))))\n (define (flow-resumable-ids) (flow-collect-resumable flow-store))")
|
||||
|
||||
(define
|
||||
flow-load-store!
|
||||
|
||||
71
lib/flow/tests/recovery.sx
Normal file
71
lib/flow/tests/recovery.sx
Normal file
@@ -0,0 +1,71 @@
|
||||
;; lib/flow/tests/recovery.sx — Phase 3: crash recovery (store export/import + restart).
|
||||
;;
|
||||
;; "restart" is simulated within one program: (set! flow-store (list)) wipes the
|
||||
;; in-memory store (process death), while flow-registry persists as it would after
|
||||
;; reloading flow definitions. Recovery = import the exported (plain-data) store and
|
||||
;; resume; the flow proc is re-resolved by name.
|
||||
|
||||
(define flow-rec-pass 0)
|
||||
(define flow-rec-fail 0)
|
||||
(define flow-rec-fails (list))
|
||||
|
||||
(define
|
||||
flow-rec-test
|
||||
(fn
|
||||
(name actual expected)
|
||||
(if
|
||||
(= actual expected)
|
||||
(set! flow-rec-pass (+ flow-rec-pass 1))
|
||||
(begin
|
||||
(set! flow-rec-fail (+ flow-rec-fail 1))
|
||||
(append! flow-rec-fails {:name name :expected expected :actual actual})))))
|
||||
|
||||
(define flow-r (fn (src) (flow-run src)))
|
||||
|
||||
;; ── export / wipe / import ──────────────────────────────────────
|
||||
(flow-rec-test
|
||||
"export nulls the live procedure"
|
||||
(flow-r
|
||||
"(defflow w (lambda (x) (suspend (quote await)))) (flow/start w 10) (car (cdr (car (cdr (car (flow-store-export))))))")
|
||||
false)
|
||||
(flow-rec-test
|
||||
"a wiped store loses the flow (process death)"
|
||||
(flow-r
|
||||
"(defflow w (lambda (x) (suspend (quote await)))) (define id (car (cdr (flow/start w 10)))) (set! flow-store (list)) (flow/resume id 1)")
|
||||
(list "flow-error" "no-such-flow"))
|
||||
(flow-rec-test
|
||||
"import restores a wiped store and resume completes"
|
||||
(flow-r
|
||||
"(defflow w (sequence (lambda (x) (suspend (quote await))) (lambda (c) (list (quote done) c)))) (define id (car (cdr (flow/start w 10)))) (define saved (flow-store-export)) (set! flow-store (list)) (flow-store-import! saved) (flow/resume id 777)")
|
||||
(list "done" 777))
|
||||
|
||||
;; ── resumable scan ──────────────────────────────────────────────
|
||||
(flow-rec-test
|
||||
"resumable-ids lists the suspended flow after import"
|
||||
(flow-r
|
||||
"(defflow w (lambda (x) (suspend (quote await)))) (define id (car (cdr (flow/start w 10)))) (define saved (flow-store-export)) (set! flow-store (list)) (flow-store-import! saved) (flow-resumable-ids)")
|
||||
(list 1))
|
||||
(flow-rec-test
|
||||
"resumable-ids excludes completed flows"
|
||||
(flow-r
|
||||
"(defflow w (sequence (lambda (x) (suspend (quote await))) (lambda (c) c))) (define id (car (cdr (flow/start w 10)))) (flow/resume id 5) (flow-resumable-ids)")
|
||||
(list))
|
||||
(flow-rec-test
|
||||
"resumable-ids excludes cancelled flows after import"
|
||||
(flow-r
|
||||
"(defflow w (lambda (x) (suspend (quote await)))) (define id (car (cdr (flow/start w 10)))) (flow/cancel id) (define saved (flow-store-export)) (set! flow-store (list)) (flow-store-import! saved) (flow-resumable-ids)")
|
||||
(list))
|
||||
|
||||
;; ── restart at every step ───────────────────────────────────────
|
||||
(flow-rec-test
|
||||
"two suspends survive a restart between each step"
|
||||
(flow-r
|
||||
"(defflow two (sequence (lambda (x) (suspend (quote a))) (lambda (x) (suspend (quote b))) (lambda (x) (list (quote end) x)))) (define id (car (cdr (flow/start two 0)))) (define s1 (flow-store-export)) (set! flow-store (list)) (flow-store-import! s1) (flow/resume id 100) (define s2 (flow-store-export)) (set! flow-store (list)) (flow-store-import! s2) (flow/resume id 200)")
|
||||
(list "end" 200))
|
||||
(flow-rec-test
|
||||
"import preserves the replay log (earlier value survives restart)"
|
||||
(flow-r
|
||||
"(defflow two (sequence (lambda (x) (suspend (quote a))) (lambda (x) (suspend (quote b))) (lambda (x) (list x)))) (define id (car (cdr (flow/start two 0)))) (flow/resume id 11) (define saved (flow-store-export)) (set! flow-store (list)) (flow-store-import! saved) (flow/resume id 22)")
|
||||
(list 22))
|
||||
|
||||
(define flow-rec-tests-run! (fn () {:total (+ flow-rec-pass flow-rec-fail) :passed flow-rec-pass :failed flow-rec-fail :fails flow-rec-fails}))
|
||||
@@ -16,7 +16,7 @@ federation extension via fed-sx for remote-node execution.
|
||||
|
||||
## Status (rolling)
|
||||
|
||||
`bash lib/flow/conformance.sh` → **66/66** (Phases 1-2 done; Phase 3 suspend/resume/cancel done, crash-recovery next)
|
||||
`bash lib/flow/conformance.sh` → **74/74** (Phases 1-3 done; Phase 4 fed-sx next)
|
||||
|
||||
## Ground rules
|
||||
|
||||
@@ -104,7 +104,11 @@ lib/flow/spec.sx lib/flow/runtime.sx lib/flow/store.sx
|
||||
result on completion, `(flow-suspended id tag)` on a further suspend.
|
||||
- [x] `(flow/cancel id)` — mark cancelled; a later resume is rejected (stale replay
|
||||
cannot wake a cancelled flow).
|
||||
- [ ] crash recovery — on restart, scan store for paused flows, mark resumable
|
||||
- [x] crash recovery — `flow-store-export` (procs nulled → plain data),
|
||||
`flow-store-import!`, `flow-resumable-ids`. Records are name-keyed; resume
|
||||
re-resolves the proc by name (defflow registers names), so a flow survives a
|
||||
wiped store. `tests/recovery.sx`, 8 cases (export/wipe/import, resumable scan,
|
||||
restart-at-every-step, replay-log survival).
|
||||
- [x] `lib/flow/tests/suspend.sx` — 17 cases: start/resume/cancel, multi-step,
|
||||
replay determinism, lifecycle guards, suspend-in-branch
|
||||
- Harness: `flow-run` now reuses one env with a per-test reset (building the full
|
||||
|
||||
Reference in New Issue
Block a user