diff --git a/lib/flow/api.sx b/lib/flow/api.sx index 757bf352..d3e8dce3 100644 --- a/lib/flow/api.sx +++ b/lib/flow/api.sx @@ -39,7 +39,7 @@ (define flow-reset-src - "(set! flow-store (list)) (set! flow-next-id 0) (set! flow-replay-log (list)) (set! flow-suspend-k #f) (set! flow-timeout-budget -1) (set! flow-peers (list))") + "(set! flow-store (list)) (set! flow-next-id 0) (set! flow-replay-log (list)) (set! flow-suspend-k #f) (set! flow-timeout-budget -1) (set! flow-peers (list)) (set! flow-replicas (list))") (define flow-env-cache false) diff --git a/lib/flow/conformance.sh b/lib/flow/conformance.sh index 4fd45ee0..f717029b 100755 --- a/lib/flow/conformance.sh +++ b/lib/flow/conformance.sh @@ -52,7 +52,7 @@ emit_eval () { echo "(epoch $EPOCH)"; echo "(eval \"$1\")"; EPOCH=$((EPOCH+1)); done } > "$TMPFILE" -OUTPUT=$(timeout 180 "$SX_SERVER" < "$TMPFILE" 2>&1 || true) +OUTPUT=$(timeout 300 "$SX_SERVER" < "$TMPFILE" 2>&1 || true) TOTAL_PASS=0 TOTAL_FAIL=0 diff --git a/lib/flow/remote.sx b/lib/flow/remote.sx index 8d998f49..2ddc6a1a 100644 --- a/lib/flow/remote.sx +++ b/lib/flow/remote.sx @@ -12,11 +12,20 @@ ;; (remote-node addr fn) — a node that runs fn on the peer at addr. ;; (remote-failover addrs fn local) — try fn on each peer in addrs in order; on a ;; raised error move to the next peer; if every peer fails, run the `local` -;; node as a fallback. Threads the input through unchanged. +;; node as a fallback. +;; +;; Persistence across instances + handoff. Each instance runs the same flow +;; definitions, so the only thing that needs to cross the wire is the (plain-data) +;; store — exactly flow-store-export from store.sx. Replication pushes that export +;; to a peer's replica slot; handoff = restore the replica on the peer and resume. 
+;; +;; (flow-replicate-to addr) — store this instance's export as addr's replica (replaces any earlier one) +;; (flow-restore-from addr) — import the replica held for addr; #t on success, #f if addr has none +;; (flow-replica-get addr) — the raw replicated store held for addr, or #f if none (define flow-remote-src - "(define flow-peers (list))\n (define (flow-assoc key alist)\n (if (null? alist)\n #f\n (if (eq? (car (car alist)) key) (car (cdr (car alist))) (flow-assoc key (cdr alist)))))\n (define (flow-peer-register! addr table) (set! flow-peers (cons (list addr table) flow-peers)))\n (define (flow-transport addr fn input)\n (let ((table (flow-assoc addr flow-peers)))\n (if table\n (let ((proc (flow-assoc fn table)))\n (if proc (proc input) (raise (quote flow-remote-no-fn))))\n (raise (quote flow-remote-unreachable)))))\n (define (remote-node addr fn) (lambda (input) (flow-transport addr fn input)))\n (define (flow-failover-step addrs fn input local)\n (if (null? addrs)\n (local input)\n (guard (e (#t (flow-failover-step (cdr addrs) fn input local)))\n (flow-transport (car addrs) fn input))))\n (define (remote-failover addrs fn local)\n (lambda (input) (flow-failover-step addrs fn input local)))") + "(define flow-peers (list))\n (define (flow-assoc key alist)\n (if (null? alist)\n #f\n (if (eq? (car (car alist)) key) (car (cdr (car alist))) (flow-assoc key (cdr alist)))))\n (define (flow-peer-register! addr table) (set! flow-peers (cons (list addr table) flow-peers)))\n (define (flow-transport addr fn input)\n (let ((table (flow-assoc addr flow-peers)))\n (if table\n (let ((proc (flow-assoc fn table)))\n (if proc (proc input) (raise (quote flow-remote-no-fn))))\n (raise (quote flow-remote-unreachable)))))\n (define (remote-node addr fn) (lambda (input) (flow-transport addr fn input)))\n (define (flow-failover-step addrs fn input local)\n (if (null? 
addrs)\n (local input)\n (guard (e (#t (flow-failover-step (cdr addrs) fn input local)))\n (flow-transport (car addrs) fn input))))\n (define (remote-failover addrs fn local)\n (lambda (input) (flow-failover-step addrs fn input local)))\n\n (define flow-replicas (list))\n (define (flow-replicas-remove addr reps)\n (if (null? reps)\n (list)\n (if (eq? (car (car reps)) addr)\n (flow-replicas-remove addr (cdr reps))\n (cons (car reps) (flow-replicas-remove addr (cdr reps))))))\n (define (flow-replicate-to addr)\n (set! flow-replicas (cons (list addr (flow-store-export)) (flow-replicas-remove addr flow-replicas))))\n (define (flow-replica-get addr) (flow-assoc addr flow-replicas))\n (define (flow-restore-from addr)\n (let ((data (flow-replica-get addr)))\n (if data (begin (flow-store-import! data) #t) #f)))") (define flow-load-remote! diff --git a/lib/flow/scoreboard.json b/lib/flow/scoreboard.json index dcc71985..a31a9169 100644 --- a/lib/flow/scoreboard.json +++ b/lib/flow/scoreboard.json @@ -1,13 +1,13 @@ { - "total": 87, - "passed": 87, + "total": 93, + "passed": 93, "failed": 0, "suites": { "basic": { "passed": 18, "total": 18 }, "control": { "passed": 31, "total": 31 }, "suspend": { "passed": 17, "total": 17 }, "recovery": { "passed": 8, "total": 8 }, - "distributed": { "passed": 13, "total": 13 } + "distributed": { "passed": 19, "total": 19 } }, - "phases": { "phase1": "done", "phase2": "done", "phase3": "done", "phase4": "in-progress" } + "phases": { "phase1": "done", "phase2": "done", "phase3": "done", "phase4": "done" } } diff --git a/lib/flow/scoreboard.md b/lib/flow/scoreboard.md index 05749e04..1d2e832d 100644 --- a/lib/flow/scoreboard.md +++ b/lib/flow/scoreboard.md @@ -1,6 +1,6 @@ # flow-on-sx Scoreboard -**All tests pass: 87 / 87 across 5 suites.** +**All tests pass: 93 / 93 across 5 suites. 
All four phases complete.** `bash lib/flow/conformance.sh` @@ -12,7 +12,7 @@ | control | 31 | Phase 2: `branch` (6); error model `fail`/`failed?`/`fail-reason` (6); `try-catch` (6); `retry n` (6); `timeout` cooperative step budget (7) | | suspend | 17 | Phase 3: suspend/resume/cancel via deterministic replay; multi-step, replay determinism, lifecycle guards, suspend-in-branch | | recovery | 8 | Phase 3: crash recovery — store export/import, resumable scan, restart-at-every-step, replay-log survival | -| distributed | 13 | Phase 4: `remote-node` on mock fed-sx peers (7); `remote-failover` across peers + local fallback (6) | +| distributed | 19 | Phase 4: `remote-node` (7); `remote-failover` (6); replication + handoff across instances (6) | ## Architecture @@ -41,6 +41,4 @@ capture the flow continuation directly. - [x] Phase 1 — Declarative DAG + sequential execution (combinators + 18 tests, `flow/start`) - [x] Phase 2 — Control flow + error handling (branch, error model, try-catch, retry, timeout) - [x] Phase 3 — Suspend/resume (suspend/resume/cancel + crash recovery via deterministic replay) -- [~] Phase 4 — Distributed nodes via fed-sx (remote-node done; failover + handoff next) +- [x] Phase 4 — Distributed nodes via fed-sx (remote-node, failover, replication + handoff) -- [ ] Phase 3 — Suspend / resume (the showcase) -- [ ] Phase 4 — Distributed nodes via fed-sx diff --git a/lib/flow/tests/distributed.sx b/lib/flow/tests/distributed.sx index 456f1c68..cd6bbb49 100644 --- a/lib/flow/tests/distributed.sx +++ b/lib/flow/tests/distributed.sx @@ -86,4 +86,35 @@ "(flow-peer-register! 
(quote p) (list (list (quote f) (lambda (x) (* x 2))))) (flow/start (sequence (remote-failover (list (quote down) (quote p)) (quote f) (flow-const 1)) (lambda (x) (+ x 3))) 5)") 13) +;; ── replication + handoff ─────────────────────────────────────── +(flow-dist-test + "replicate: a peer holds the exported store" + (flow-d + "(defflow w (lambda (x) (suspend (quote q)))) (flow/start w 10) (flow-replicate-to (quote peerB)) (if (flow-replica-get (quote peerB)) (quote replicated) (quote missing))") + "replicated") +(flow-dist-test + "handoff: a peer resumes a flow after the local instance dies" + (flow-d + "(defflow w (sequence (lambda (x) (suspend (quote q))) (lambda (v) (list (quote done) v)))) (define id (car (cdr (flow/start w 10)))) (flow-replicate-to (quote peerB)) (set! flow-store (list)) (flow-restore-from (quote peerB)) (flow/resume id 55)") + (list "done" 55)) +(flow-dist-test + "handoff: restored peer reports the flow as resumable" + (flow-d + "(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 10)))) (flow-replicate-to (quote peerB)) (set! flow-store (list)) (flow-restore-from (quote peerB)) (flow-resumable-ids)") + (list 1)) +(flow-dist-test + "handoff: without restore the dead instance has lost the flow" + (flow-d + "(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 10)))) (flow-replicate-to (quote peerB)) (set! flow-store (list)) (flow/resume id 1)") + (list "flow-error" "no-such-flow")) +(flow-dist-test + "restore: from an unknown peer yields false" + (flow-d "(flow-restore-from (quote nowhere))") + false) +(flow-dist-test + "handoff: replication preserves the replay log across the move" + (flow-d + "(defflow two (sequence (lambda (x) (suspend (quote a))) (lambda (x) (suspend (quote b))) (lambda (x) (list x)))) (define id (car (cdr (flow/start two 0)))) (flow/resume id 11) (flow-replicate-to (quote peerB)) (set! 
flow-store (list)) (flow-restore-from (quote peerB)) (flow/resume id 22)") + (list 22)) + (define flow-dist-tests-run! (fn () {:total (+ flow-dist-pass flow-dist-fail) :passed flow-dist-pass :failed flow-dist-fail :fails flow-dist-fails})) diff --git a/plans/flow-on-sx.md b/plans/flow-on-sx.md index 6b41030c..2a8f2e5d 100644 --- a/plans/flow-on-sx.md +++ b/plans/flow-on-sx.md @@ -16,7 +16,7 @@ federation extension via fed-sx for remote-node execution. ## Status (rolling) -`bash lib/flow/conformance.sh` → **87/87** (Phases 1-3 done; Phase 4 in progress: remote-node + failover done) +`bash lib/flow/conformance.sh` → **93/93** (all four phases complete) ## Ground rules @@ -123,9 +123,15 @@ lib/flow/spec.sx lib/flow/runtime.sx lib/flow/store.sx - [x] failure semantics — `(remote-failover addrs fn local)` tries each peer in order, moves to the next on any raised error, and runs the `local` node if every peer fails. 6 tests. -- [ ] persistence across instances — flow state replicates via fed-sx -- [ ] handoff — flow started here can resume on a peer if the local instance is down -- [ ] `lib/flow/tests/distributed.sx` — federated flow scenarios (mock fed-sx in tests) +- [x] persistence across instances — `(flow-replicate-to addr)` copies this + instance's store (the plain-data export) to a peer's replica slot; + `(flow-restore-from addr)` imports it. Same mechanism as crash recovery, across + instances. +- [x] handoff — a flow started here resumes on a peer after the local instance dies: + replicate → wipe local store → restore on peer → `flow/resume`. The replay log + (and thus all resolved suspends) survives the move. +- [x] `lib/flow/tests/distributed.sx` — 19 cases: remote-node, failover, + replication, handoff (including replay-log survival across the move) ## Progress log @@ -139,6 +145,15 @@ lib/flow/spec.sx lib/flow/runtime.sx lib/flow/store.sx combinators use `(lambda args ...)` variadics + top-level recursion. 
Scheme strings come back boxed as `{:scm-string "..."}` — unwrap with `(get s :scm-string)`. +- **Phases 2-4.** Control flow (branch/retry/timeout/try-catch + fail-value error + model), then the showcase: durable suspend/resume. Guest call/cc is escape-only + (re-entry hangs), so resume uses **deterministic replay** — re-run the flow, + replaying resolved suspends from a `(tag value)` log; only plain data persists, so + flows survive a wiped store (crash recovery) and a move to another instance + (replication + handoff). Phase 4 models the fed-sx boundary with a mock peer + registry. Timeout is a cooperative step budget (no wall clock in pure SX). Test + harness reuses one env with a per-test reset for speed. + ## Blockers (none)