diff --git a/lib/flow/remote.sx b/lib/flow/remote.sx index 7b42ab44..8d998f49 100644 --- a/lib/flow/remote.sx +++ b/lib/flow/remote.sx @@ -10,10 +10,13 @@ ;; (flow-remote-unreachable) if the addr is unknown, (flow-remote-no-fn) if the ;; peer does not expose fn. ;; (remote-node addr fn) — a node that runs fn on the peer at addr. +;; (remote-failover addrs fn local) — try fn on each peer in addrs in order; on a +;; raised error move to the next peer; if every peer fails, run the `local` +;; node as a fallback. Threads the input through unchanged. (define flow-remote-src - "(define flow-peers (list))\n (define (flow-assoc key alist)\n (if (null? alist)\n #f\n (if (eq? (car (car alist)) key) (car (cdr (car alist))) (flow-assoc key (cdr alist)))))\n (define (flow-peer-register! addr table) (set! flow-peers (cons (list addr table) flow-peers)))\n (define (flow-transport addr fn input)\n (let ((table (flow-assoc addr flow-peers)))\n (if table\n (let ((proc (flow-assoc fn table)))\n (if proc (proc input) (raise (quote flow-remote-no-fn))))\n (raise (quote flow-remote-unreachable)))))\n (define (remote-node addr fn) (lambda (input) (flow-transport addr fn input)))") + "(define flow-peers (list))\n (define (flow-assoc key alist)\n (if (null? alist)\n #f\n (if (eq? (car (car alist)) key) (car (cdr (car alist))) (flow-assoc key (cdr alist)))))\n (define (flow-peer-register! addr table) (set! flow-peers (cons (list addr table) flow-peers)))\n (define (flow-transport addr fn input)\n (let ((table (flow-assoc addr flow-peers)))\n (if table\n (let ((proc (flow-assoc fn table)))\n (if proc (proc input) (raise (quote flow-remote-no-fn))))\n (raise (quote flow-remote-unreachable)))))\n (define (remote-node addr fn) (lambda (input) (flow-transport addr fn input)))\n (define (flow-failover-step addrs fn input local)\n (if (null? addrs)\n (local input)\n (guard (e (#t (flow-failover-step (cdr addrs) fn input local)))\n (flow-transport (car addrs) fn input))))\n (define (remote-failover addrs fn local)\n (lambda (input) (flow-failover-step addrs fn input local)))") (define flow-load-remote! diff --git a/lib/flow/scoreboard.json b/lib/flow/scoreboard.json index fd71294b..dcc71985 100644 --- a/lib/flow/scoreboard.json +++ b/lib/flow/scoreboard.json @@ -1,13 +1,13 @@ { - "total": 81, - "passed": 81, + "total": 87, + "passed": 87, "failed": 0, "suites": { "basic": { "passed": 18, "total": 18 }, "control": { "passed": 31, "total": 31 }, "suspend": { "passed": 17, "total": 17 }, "recovery": { "passed": 8, "total": 8 }, - "distributed": { "passed": 7, "total": 7 } + "distributed": { "passed": 13, "total": 13 } }, "phases": { "phase1": "done", "phase2": "done", "phase3": "done", "phase4": "in-progress" } } diff --git a/lib/flow/scoreboard.md b/lib/flow/scoreboard.md index 45ffe7ea..05749e04 100644 --- a/lib/flow/scoreboard.md +++ b/lib/flow/scoreboard.md @@ -1,6 +1,6 @@ # flow-on-sx Scoreboard -**All tests pass: 81 / 81 across 5 suites.** +**All tests pass: 87 / 87 across 5 suites.** `bash lib/flow/conformance.sh` @@ -12,7 +12,7 @@ | control | 31 | Phase 2: `branch` (6); error model `fail`/`failed?`/`fail-reason` (6); `try-catch` (6); `retry n` (6); `timeout` cooperative step budget (7) | | suspend | 17 | Phase 3: suspend/resume/cancel via deterministic replay; multi-step, replay determinism, lifecycle guards, suspend-in-branch | | recovery | 8 | Phase 3: crash recovery — store export/import, resumable scan, restart-at-every-step, replay-log survival | -| distributed | 7 | Phase 4: `remote-node` on mock fed-sx peers; compose/suspend/retry, unreachable + no-fn errors | +| distributed | 13 | Phase 4: `remote-node` on mock fed-sx peers (7); `remote-failover` across peers + local fallback (6) | ## Architecture diff --git a/lib/flow/tests/distributed.sx b/lib/flow/tests/distributed.sx index 15ab80d6..456f1c68 100644 --- a/lib/flow/tests/distributed.sx +++ b/lib/flow/tests/distributed.sx @@ -17,11 +17,6 @@ (define flow-d (fn (src) (flow-run src))) -;; A mock peer "edge" exposing double/inc, registered at the top of each program. -(define - peer-setup - "(flow-peer-register! (quote edge) (list (list (quote double) (lambda (x) (* x 2))) (list (quote inc) (lambda (x) (+ x 1)))))") - ;; ── remote-node ───────────────────────────────────────────────── (flow-dist-test "remote: a node executes on a peer" @@ -59,4 +54,36 @@ "(define hits 0) (flow-peer-register! (quote edge) (list (list (quote flaky) (lambda (x) (begin (set! hits (+ hits 1)) (if (< hits 2) (raise (quote down)) (* x 3))))))) (list (flow/start (retry 3 (remote-node (quote edge) (quote flaky))) 7) hits)") (list 21 2)) +;; ── failover (retry on a different peer, fall through to local) ── +(flow-dist-test + "failover: first reachable peer serves the request" + (flow-d + "(flow-peer-register! (quote p2) (list (list (quote f) (lambda (x) (+ x 100))))) (flow/start (remote-failover (list (quote p2) (quote down)) (quote f) (flow-const (quote local))) 5)") + 105) +(flow-dist-test + "failover: skips an unreachable peer to the next one" + (flow-d + "(flow-peer-register! (quote p2) (list (list (quote f) (lambda (x) (+ x 100))))) (flow/start (remote-failover (list (quote down) (quote p2)) (quote f) (flow-const (quote local))) 5)") + 105) +(flow-dist-test + "failover: skips a peer whose function raises" + (flow-d + "(flow-peer-register! (quote bad) (list (list (quote f) (lambda (x) (raise (quote boom)))))) (flow-peer-register! (quote good) (list (list (quote f) (lambda (x) (* x 10))))) (flow/start (remote-failover (list (quote bad) (quote good)) (quote f) (flow-const 0)) 4)") + 40) +(flow-dist-test + "failover: all peers fail, the local fallback runs" + (flow-d + "(flow/start (remote-failover (list (quote down1) (quote down2)) (quote f) (lambda (x) (* x -1))) 9)") + -9) +(flow-dist-test + "failover: threads the input through to the chosen peer" + (flow-d + "(flow-peer-register! (quote p) (list (list (quote f) (lambda (x) (list (quote got) x))))) (flow/start (sequence (lambda (x) (+ x 1)) (remote-failover (list (quote p)) (quote f) (flow-const 0))) 41)") + (list "got" 42)) +(flow-dist-test + "failover: composes inside a larger sequence" + (flow-d + "(flow-peer-register! (quote p) (list (list (quote f) (lambda (x) (* x 2))))) (flow/start (sequence (remote-failover (list (quote down) (quote p)) (quote f) (flow-const 1)) (lambda (x) (+ x 3))) 5)") + 13) + (define flow-dist-tests-run! (fn () {:total (+ flow-dist-pass flow-dist-fail) :passed flow-dist-pass :failed flow-dist-fail :fails flow-dist-fails})) diff --git a/plans/flow-on-sx.md b/plans/flow-on-sx.md index 9e22921a..6b41030c 100644 --- a/plans/flow-on-sx.md +++ b/plans/flow-on-sx.md @@ -16,7 +16,7 @@ federation extension via fed-sx for remote-node execution. ## Status (rolling) -`bash lib/flow/conformance.sh` → **81/81** (Phases 1-3 done; Phase 4 in progress: remote-node done) +`bash lib/flow/conformance.sh` → **87/87** (Phases 1-3 done; Phase 4 in progress: remote-node + failover done) ## Ground rules @@ -120,7 +120,9 @@ lib/flow/spec.sx lib/flow/runtime.sx lib/flow/store.sx the fed-sx boundary, MOCKED via a peer registry (`flow-peer-register!`); raises `flow-remote-unreachable` / `flow-remote-no-fn`. Composes with sequence, suspend, retry. `tests/distributed.sx`, 7 cases. -- [ ] failure semantics — retry on different peer, fall through to local +- [x] failure semantics — `(remote-failover addrs fn local)` tries each peer in + order, moves to the next on any raised error, and runs the `local` node if every + peer fails. 6 tests. - [ ] persistence across instances — flow state replicates via fed-sx - [ ] handoff — flow started here can resume on a peer if the local instance is down - [ ] `lib/flow/tests/distributed.sx` — federated flow scenarios (mock fed-sx in tests)