diff --git a/lib/flow/conformance.sh b/lib/flow/conformance.sh index f53f1fe7..237b288b 100755 --- a/lib/flow/conformance.sh +++ b/lib/flow/conformance.sh @@ -24,6 +24,7 @@ SUITES=( "basic flow-basic-tests-run! lib/flow/tests/basic.sx" "control flow-ctl-tests-run! lib/flow/tests/control.sx" "suspend flow-sus-tests-run! lib/flow/tests/suspend.sx" + "recovery flow-rec-tests-run! lib/flow/tests/recovery.sx" ) TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT diff --git a/lib/flow/scoreboard.json b/lib/flow/scoreboard.json index 333ae875..63dff0b8 100644 --- a/lib/flow/scoreboard.json +++ b/lib/flow/scoreboard.json @@ -1,11 +1,12 @@ { - "total": 66, - "passed": 66, + "total": 74, + "passed": 74, "failed": 0, "suites": { "basic": { "passed": 18, "total": 18 }, "control": { "passed": 31, "total": 31 }, - "suspend": { "passed": 17, "total": 17 } + "suspend": { "passed": 17, "total": 17 }, + "recovery": { "passed": 8, "total": 8 } }, - "phases": { "phase1": "done", "phase2": "done", "phase3": "in-progress" } + "phases": { "phase1": "done", "phase2": "done", "phase3": "done", "phase4": "pending" } } diff --git a/lib/flow/scoreboard.md b/lib/flow/scoreboard.md index c9acd6d6..e47a695a 100644 --- a/lib/flow/scoreboard.md +++ b/lib/flow/scoreboard.md @@ -1,6 +1,6 @@ # flow-on-sx Scoreboard -**All tests pass: 66 / 66 across 3 suites.** +**All tests pass: 74 / 74 across 4 suites.** `bash lib/flow/conformance.sh` @@ -11,6 +11,7 @@ | basic | 18 | Phase 1: single nodes, linear sequence, data-flow threading, defflow, parallel fan/join, nested composition, publish-shaped flow | | control | 31 | Phase 2: `branch` (6); error model `fail`/`failed?`/`fail-reason` (6); `try-catch` (6); `retry n` (6); `timeout` cooperative step budget (7) | | suspend | 17 | Phase 3: suspend/resume/cancel via deterministic replay; multi-step, replay determinism, lifecycle guards, suspend-in-branch | +| recovery | 8 | Phase 3: crash recovery — store export/import, resumable scan, restart-at-every-step, replay-log survival | ## Architecture @@ -38,6 +39,6 @@ capture the flow continuation directly. - [x] Phase 1 — Declarative DAG + sequential execution (combinators + 18 tests, `flow/start`) - [x] Phase 2 — Control flow + error handling (branch, error model, try-catch, retry, timeout) -- [~] Phase 3 — Suspend/resume (suspend/resume/cancel done via deterministic replay; crash-recovery next) +- [x] Phase 3 — Suspend/resume (suspend/resume/cancel + crash recovery via deterministic replay) - [ ] Phase 3 — Suspend / resume (the showcase) - [ ] Phase 4 — Distributed nodes via fed-sx diff --git a/lib/flow/spec.sx b/lib/flow/spec.sx index 72afeaed..8a610724 100644 --- a/lib/flow/spec.sx +++ b/lib/flow/spec.sx @@ -8,6 +8,8 @@ ;; ;; Phase 1 combinators (flow-combinators-src): ;; flow-node / flow-id / flow-const / sequence / parallel / defflow +;; defflow both binds the flow and registers it by name (flow-register!, in +;; store.sx) so it can be re-resolved after a process restart. ;; ;; Phase 2 combinators (flow-control-src): ;; branch / fail / failed? / fail-reason / try-catch / retry / timeout / tick @@ -30,7 +32,7 @@ (define flow-combinators-src - "(define (flow-node f) f)\n (define (flow-id input) input)\n (define (flow-const v) (lambda (input) v))\n (define (flow-seq-step ns v)\n (if (null? ns) v (flow-seq-step (cdr ns) ((car ns) v))))\n (define sequence (lambda ns (lambda (input) (flow-seq-step ns input))))\n (define parallel (lambda ns (lambda (input) (map (lambda (n) (n input)) ns))))\n (define-syntax defflow\n (syntax-rules ()\n ((defflow nm body) (define nm body))))") + "(define (flow-node f) f)\n (define (flow-id input) input)\n (define (flow-const v) (lambda (input) v))\n (define (flow-seq-step ns v)\n (if (null? ns) v (flow-seq-step (cdr ns) ((car ns) v))))\n (define sequence (lambda ns (lambda (input) (flow-seq-step ns input))))\n (define parallel (lambda ns (lambda (input) (map (lambda (n) (n input)) ns))))\n (define-syntax defflow\n (syntax-rules ()\n ((defflow nm body)\n (begin (define nm body) (flow-register! (quote nm) nm)))))") (define flow-control-src diff --git a/lib/flow/store.sx b/lib/flow/store.sx index 06a6251f..37284524 100644 --- a/lib/flow/store.sx +++ b/lib/flow/store.sx @@ -1,26 +1,32 @@ -;; lib/flow/store.sx — durable flow store + lifecycle (Phase 3). +;; lib/flow/store.sx — durable flow store + lifecycle + crash recovery (Phase 3). ;; -;; The store maps flow-id -> record. A record is a plain list: -;; (flow input log status payload) -;; flow — the flow procedure (live; re-resolved by name on restart) +;; The store maps flow-id -> record (one entry per id, newest wins). A record is: +;; (name proc input log status payload) +;; name — registered flow name (symbol) or #f for an anonymous flow +;; proc — the live flow procedure (#f after export; re-resolved by name) ;; input — the original start input (plain data) ;; log — replay log: list of (tag value) entries (plain data) ;; status — done | suspended | cancelled ;; payload — result (done) | waiting-tag (suspended) | #f (cancelled) ;; +;; A record is SERIALIZABLE once its proc is nulled (flow-store-export): name, +;; input, and log are plain data. On restart the flow definitions are reloaded +;; (defflow re-registers names), the store is reimported, and resume re-resolves +;; the proc by name — no live continuation is ever persisted. +;; ;; Lifecycle (all use deterministic replay via flow-drive — see spec.sx): -;; (flow/start flow input) — run from empty log. If it completes, return the raw -;; result (backward compatible with Phases 1-2). If it suspends, register the -;; record and return (flow-suspended id tag). -;; (flow/resume id value) — append (tag value) to the log and re-drive. Returns -;; the raw result on completion, (flow-suspended id tag) on a further suspend, -;; or (flow-error reason) if the id is unknown / not suspended. -;; (flow/cancel id) — mark cancelled; a later resume is rejected (the stale -;; replay can never wake a cancelled flow). +;; (flow/start flow input) — raw result if it completes (backward compatible), +;; else (flow-suspended id tag). +;; (flow/resume id value) — append (tag value) to the log and re-drive. +;; (flow/cancel id) — mark cancelled; a later resume is rejected. +;; Crash recovery: +;; (flow-store-export) — store as plain data (procs nulled) +;; (flow-store-import! d) — replace the store from exported data +;; (flow-resumable-ids) — ids of suspended (resumable) flows (define flow-store-src - "(define flow-store (list))\n (define flow-next-id 0)\n (define (flow-store-put! id rec) (set! flow-store (cons (list id rec) flow-store)))\n (define (flow-store-find id store)\n (if (null? store)\n (list)\n (if (= (car (car store)) id)\n (car (cdr (car store)))\n (flow-store-find id (cdr store)))))\n (define (flow-store-get id) (flow-store-find id flow-store))\n (define (flow-mk-rec flow input log status payload)\n (list flow input log status payload))\n (define (flow-rec-flow r) (car r))\n (define (flow-rec-input r) (car (cdr r)))\n (define (flow-rec-log r) (car (cdr (cdr r))))\n (define (flow-rec-status r) (car (cdr (cdr (cdr r)))))\n (define (flow-rec-payload r) (car (cdr (cdr (cdr (cdr r))))))\n (define (flow-outcome id flow input log outcome)\n (if (eq? (car outcome) (quote flow-done))\n (begin\n (flow-store-put! id (flow-mk-rec flow input log (quote done) (car (cdr outcome))))\n (car (cdr outcome)))\n (begin\n (flow-store-put! id (flow-mk-rec flow input log (quote suspended) (car (cdr outcome))))\n (list (quote flow-suspended) id (car (cdr outcome))))))\n (define (flow/start flow input)\n (set! flow-next-id (+ flow-next-id 1))\n (flow-outcome flow-next-id flow input (list) (flow-drive flow input (list))))\n (define (flow/resume id value)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (if (eq? (flow-rec-status rec) (quote suspended))\n (let ((newlog (cons (list (flow-rec-payload rec) value) (flow-rec-log rec))))\n (flow-outcome id (flow-rec-flow rec) (flow-rec-input rec) newlog\n (flow-drive (flow-rec-flow rec) (flow-rec-input rec) newlog)))\n (list (quote flow-error) (quote not-suspended))))))\n (define (flow/cancel id)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (begin\n (flow-store-put! id\n (flow-mk-rec (flow-rec-flow rec) (flow-rec-input rec) (flow-rec-log rec) (quote cancelled) #f))\n (list (quote flow-cancelled) id)))))") + "(define flow-registry (list))\n (define (flow-register! name proc) (set! flow-registry (cons (list name proc) flow-registry)))\n (define (flow-lookup-in name reg)\n (if (null? reg)\n #f\n (if (eq? (car (car reg)) name) (car (cdr (car reg))) (flow-lookup-in name (cdr reg)))))\n (define (flow-lookup name) (flow-lookup-in name flow-registry))\n (define (flow-name-of proc reg)\n (if (null? reg)\n #f\n (if (eq? (car (cdr (car reg))) proc) (car (car reg)) (flow-name-of proc (cdr reg)))))\n\n (define flow-store (list))\n (define flow-next-id 0)\n (define (flow-store-remove id store)\n (if (null? store)\n (list)\n (if (= (car (car store)) id)\n (flow-store-remove id (cdr store))\n (cons (car store) (flow-store-remove id (cdr store))))))\n (define (flow-store-put! id rec) (set! flow-store (cons (list id rec) (flow-store-remove id flow-store))))\n (define (flow-store-find id store)\n (if (null? store)\n (list)\n (if (= (car (car store)) id)\n (car (cdr (car store)))\n (flow-store-find id (cdr store)))))\n (define (flow-store-get id) (flow-store-find id flow-store))\n\n (define (flow-mk-rec name proc input log status payload)\n (list name proc input log status payload))\n (define (flow-rec-name r) (car r))\n (define (flow-rec-proc r) (car (cdr r)))\n (define (flow-rec-input r) (car (cdr (cdr r))))\n (define (flow-rec-log r) (car (cdr (cdr (cdr r)))))\n (define (flow-rec-status r) (car (cdr (cdr (cdr (cdr r))))))\n (define (flow-rec-payload r) (car (cdr (cdr (cdr (cdr (cdr r)))))))\n (define (flow-rec-resolve rec)\n (let ((byname (flow-lookup (flow-rec-name rec))))\n (if byname byname (flow-rec-proc rec))))\n\n (define (flow-outcome id name proc input log outcome)\n (if (eq? (car outcome) (quote flow-done))\n (begin\n (flow-store-put! id (flow-mk-rec name proc input log (quote done) (car (cdr outcome))))\n (car (cdr outcome)))\n (begin\n (flow-store-put! id (flow-mk-rec name proc input log (quote suspended) (car (cdr outcome))))\n (list (quote flow-suspended) id (car (cdr outcome))))))\n (define (flow/start flow input)\n (set! flow-next-id (+ flow-next-id 1))\n (flow-outcome flow-next-id (flow-name-of flow flow-registry) flow input (list)\n (flow-drive flow input (list))))\n (define (flow/resume id value)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (if (eq? (flow-rec-status rec) (quote suspended))\n (let ((proc (flow-rec-resolve rec)))\n (let ((newlog (cons (list (flow-rec-payload rec) value) (flow-rec-log rec))))\n (flow-outcome id (flow-rec-name rec) proc (flow-rec-input rec) newlog\n (flow-drive proc (flow-rec-input rec) newlog))))\n (list (quote flow-error) (quote not-suspended))))))\n (define (flow/cancel id)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (begin\n (flow-store-put! id\n (flow-mk-rec (flow-rec-name rec) (flow-rec-proc rec) (flow-rec-input rec)\n (flow-rec-log rec) (quote cancelled) #f))\n (list (quote flow-cancelled) id)))))\n\n (define (flow-export-entry entry)\n (let ((rec (car (cdr entry))))\n (list (car entry)\n (flow-mk-rec (flow-rec-name rec) #f (flow-rec-input rec)\n (flow-rec-log rec) (flow-rec-status rec) (flow-rec-payload rec)))))\n (define (flow-export-map store)\n (if (null? store) (list) (cons (flow-export-entry (car store)) (flow-export-map (cdr store)))))\n (define (flow-store-export) (flow-export-map flow-store))\n (define (flow-max-id store m)\n (if (null? store) m (flow-max-id (cdr store) (if (> (car (car store)) m) (car (car store)) m))))\n (define (flow-store-import! data)\n (begin (set! flow-store data) (set! flow-next-id (flow-max-id data 0))))\n (define (flow-collect-resumable store)\n (if (null? store)\n (list)\n (if (eq? (flow-rec-status (car (cdr (car store)))) (quote suspended))\n (cons (car (car store)) (flow-collect-resumable (cdr store)))\n (flow-collect-resumable (cdr store)))))\n (define (flow-resumable-ids) (flow-collect-resumable flow-store))") (define flow-load-store! diff --git a/lib/flow/tests/recovery.sx b/lib/flow/tests/recovery.sx new file mode 100644 index 00000000..cbe3b2f4 --- /dev/null +++ b/lib/flow/tests/recovery.sx @@ -0,0 +1,71 @@ +;; lib/flow/tests/recovery.sx — Phase 3: crash recovery (store export/import + restart). +;; +;; "restart" is simulated within one program: (set! flow-store (list)) wipes the +;; in-memory store (process death), while flow-registry persists as it would after +;; reloading flow definitions. Recovery = import the exported (plain-data) store and +;; resume; the flow proc is re-resolved by name. + +(define flow-rec-pass 0) +(define flow-rec-fail 0) +(define flow-rec-fails (list)) + +(define + flow-rec-test + (fn + (name actual expected) + (if + (= actual expected) + (set! flow-rec-pass (+ flow-rec-pass 1)) + (begin + (set! flow-rec-fail (+ flow-rec-fail 1)) + (append! flow-rec-fails {:name name :expected expected :actual actual}))))) + +(define flow-r (fn (src) (flow-run src))) + +;; ── export / wipe / import ────────────────────────────────────── +(flow-rec-test + "export nulls the live procedure" + (flow-r + "(defflow w (lambda (x) (suspend (quote await)))) (flow/start w 10) (car (cdr (car (cdr (car (flow-store-export))))))") + false) +(flow-rec-test + "a wiped store loses the flow (process death)" + (flow-r + "(defflow w (lambda (x) (suspend (quote await)))) (define id (car (cdr (flow/start w 10)))) (set! flow-store (list)) (flow/resume id 1)") + (list "flow-error" "no-such-flow")) +(flow-rec-test + "import restores a wiped store and resume completes" + (flow-r + "(defflow w (sequence (lambda (x) (suspend (quote await))) (lambda (c) (list (quote done) c)))) (define id (car (cdr (flow/start w 10)))) (define saved (flow-store-export)) (set! flow-store (list)) (flow-store-import! saved) (flow/resume id 777)") + (list "done" 777)) + +;; ── resumable scan ────────────────────────────────────────────── +(flow-rec-test + "resumable-ids lists the suspended flow after import" + (flow-r + "(defflow w (lambda (x) (suspend (quote await)))) (define id (car (cdr (flow/start w 10)))) (define saved (flow-store-export)) (set! flow-store (list)) (flow-store-import! saved) (flow-resumable-ids)") + (list 1)) +(flow-rec-test + "resumable-ids excludes completed flows" + (flow-r + "(defflow w (sequence (lambda (x) (suspend (quote await))) (lambda (c) c))) (define id (car (cdr (flow/start w 10)))) (flow/resume id 5) (flow-resumable-ids)") + (list)) +(flow-rec-test + "resumable-ids excludes cancelled flows after import" + (flow-r + "(defflow w (lambda (x) (suspend (quote await)))) (define id (car (cdr (flow/start w 10)))) (flow/cancel id) (define saved (flow-store-export)) (set! flow-store (list)) (flow-store-import! saved) (flow-resumable-ids)") + (list)) + +;; ── restart at every step ─────────────────────────────────────── +(flow-rec-test + "two suspends survive a restart between each step" + (flow-r + "(defflow two (sequence (lambda (x) (suspend (quote a))) (lambda (x) (suspend (quote b))) (lambda (x) (list (quote end) x)))) (define id (car (cdr (flow/start two 0)))) (define s1 (flow-store-export)) (set! flow-store (list)) (flow-store-import! s1) (flow/resume id 100) (define s2 (flow-store-export)) (set! flow-store (list)) (flow-store-import! s2) (flow/resume id 200)") + (list "end" 200)) +(flow-rec-test + "import preserves the replay log (earlier value survives restart)" + (flow-r + "(defflow two (sequence (lambda (x) (suspend (quote a))) (lambda (x) (suspend (quote b))) (lambda (x) (list x)))) (define id (car (cdr (flow/start two 0)))) (flow/resume id 11) (define saved (flow-store-export)) (set! flow-store (list)) (flow-store-import! saved) (flow/resume id 22)") + (list 22)) + +(define flow-rec-tests-run! (fn () {:total (+ flow-rec-pass flow-rec-fail) :passed flow-rec-pass :failed flow-rec-fail :fails flow-rec-fails})) diff --git a/plans/flow-on-sx.md b/plans/flow-on-sx.md index 6ee6b710..cef6d66c 100644 --- a/plans/flow-on-sx.md +++ b/plans/flow-on-sx.md @@ -16,7 +16,7 @@ federation extension via fed-sx for remote-node execution. ## Status (rolling) -`bash lib/flow/conformance.sh` → **66/66** (Phases 1-2 done; Phase 3 suspend/resume/cancel done, crash-recovery next) +`bash lib/flow/conformance.sh` → **74/74** (Phases 1-3 done; Phase 4 fed-sx next) ## Ground rules @@ -104,7 +104,11 @@ lib/flow/spec.sx lib/flow/runtime.sx lib/flow/store.sx result on completion, `(flow-suspended id tag)` on a further suspend. - [x] `(flow/cancel id)` — mark cancelled; a later resume is rejected (stale replay cannot wake a cancelled flow). -- [ ] crash recovery — on restart, scan store for paused flows, mark resumable +- [x] crash recovery — `flow-store-export` (procs nulled → plain data), + `flow-store-import!`, `flow-resumable-ids`. Records are name-keyed; resume + re-resolves the proc by name (defflow registers names), so a flow survives a + wiped store. `tests/recovery.sx`, 8 cases (export/wipe/import, resumable scan, + restart-at-every-step, replay-log survival). - [x] `lib/flow/tests/suspend.sx` — 17 cases: start/resume/cancel, multi-step, replay determinism, lifecycle guards, suspend-in-branch - Harness: `flow-run` now reuses one env with a per-test reset (building the full