From f8722b3b08bd42fa1d2cd330bf8f2a74d854df05 Mon Sep 17 00:00:00 2001 From: giles Date: Sat, 6 Jun 2026 17:44:04 +0000 Subject: [PATCH] =?UTF-8?q?flow:=20remote-failover=20=E2=80=94=20try=20pee?= =?UTF-8?q?rs=20in=20order,=20fall=20through=20to=20local=20+=206=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit (remote-failover addrs fn local) tries fn on each peer in order, moves to the next on any raised error, and runs the local node if every peer fails. Threads input, composes in sequences. Co-Authored-By: Claude Opus 4.8 (1M context) --- lib/flow/remote.sx | 5 ++++- lib/flow/scoreboard.json | 6 +++--- lib/flow/scoreboard.md | 4 ++-- lib/flow/tests/distributed.sx | 37 ++++++++++++++++++++++++++++++----- plans/flow-on-sx.md | 6 ++++-- 5 files changed, 45 insertions(+), 13 deletions(-) diff --git a/lib/flow/remote.sx b/lib/flow/remote.sx index 7b42ab44..8d998f49 100644 --- a/lib/flow/remote.sx +++ b/lib/flow/remote.sx @@ -10,10 +10,13 @@ ;; (flow-remote-unreachable) if the addr is unknown, (flow-remote-no-fn) if the ;; peer does not expose fn. ;; (remote-node addr fn) — a node that runs fn on the peer at addr. +;; (remote-failover addrs fn local) — try fn on each peer in addrs in order; on a +;; raised error move to the next peer; if every peer fails, run the `local` +;; node as a fallback. Threads the input through unchanged. (define flow-remote-src - "(define flow-peers (list))\n (define (flow-assoc key alist)\n (if (null? alist)\n #f\n (if (eq? (car (car alist)) key) (car (cdr (car alist))) (flow-assoc key (cdr alist)))))\n (define (flow-peer-register! addr table) (set! flow-peers (cons (list addr table) flow-peers)))\n (define (flow-transport addr fn input)\n (let ((table (flow-assoc addr flow-peers)))\n (if table\n (let ((proc (flow-assoc fn table)))\n (if proc (proc input) (raise (quote flow-remote-no-fn))))\n (raise (quote flow-remote-unreachable)))))\n (define (remote-node addr fn) (lambda (input) (flow-transport addr fn input)))") + "(define flow-peers (list))\n (define (flow-assoc key alist)\n (if (null? alist)\n #f\n (if (eq? (car (car alist)) key) (car (cdr (car alist))) (flow-assoc key (cdr alist)))))\n (define (flow-peer-register! addr table) (set! flow-peers (cons (list addr table) flow-peers)))\n (define (flow-transport addr fn input)\n (let ((table (flow-assoc addr flow-peers)))\n (if table\n (let ((proc (flow-assoc fn table)))\n (if proc (proc input) (raise (quote flow-remote-no-fn))))\n (raise (quote flow-remote-unreachable)))))\n (define (remote-node addr fn) (lambda (input) (flow-transport addr fn input)))\n (define (flow-failover-step addrs fn input local)\n (if (null? addrs)\n (local input)\n (guard (e (#t (flow-failover-step (cdr addrs) fn input local)))\n (flow-transport (car addrs) fn input))))\n (define (remote-failover addrs fn local)\n (lambda (input) (flow-failover-step addrs fn input local)))") (define flow-load-remote! diff --git a/lib/flow/scoreboard.json b/lib/flow/scoreboard.json index fd71294b..dcc71985 100644 --- a/lib/flow/scoreboard.json +++ b/lib/flow/scoreboard.json @@ -1,13 +1,13 @@ { - "total": 81, - "passed": 81, + "total": 87, + "passed": 87, "failed": 0, "suites": { "basic": { "passed": 18, "total": 18 }, "control": { "passed": 31, "total": 31 }, "suspend": { "passed": 17, "total": 17 }, "recovery": { "passed": 8, "total": 8 }, - "distributed": { "passed": 7, "total": 7 } + "distributed": { "passed": 13, "total": 13 } }, "phases": { "phase1": "done", "phase2": "done", "phase3": "done", "phase4": "in-progress" } } diff --git a/lib/flow/scoreboard.md b/lib/flow/scoreboard.md index 45ffe7ea..05749e04 100644 --- a/lib/flow/scoreboard.md +++ b/lib/flow/scoreboard.md @@ -1,6 +1,6 @@ # flow-on-sx Scoreboard -**All tests pass: 81 / 81 across 5 suites.** +**All tests pass: 87 / 87 across 5 suites.** `bash lib/flow/conformance.sh` @@ -12,7 +12,7 @@ | control | 31 | Phase 2: `branch` (6); error model `fail`/`failed?`/`fail-reason` (6); `try-catch` (6); `retry n` (6); `timeout` cooperative step budget (7) | | suspend | 17 | Phase 3: suspend/resume/cancel via deterministic replay; multi-step, replay determinism, lifecycle guards, suspend-in-branch | | recovery | 8 | Phase 3: crash recovery — store export/import, resumable scan, restart-at-every-step, replay-log survival | -| distributed | 7 | Phase 4: `remote-node` on mock fed-sx peers; compose/suspend/retry, unreachable + no-fn errors | +| distributed | 13 | Phase 4: `remote-node` on mock fed-sx peers (7); `remote-failover` across peers + local fallback (6) | ## Architecture diff --git a/lib/flow/tests/distributed.sx b/lib/flow/tests/distributed.sx index 15ab80d6..456f1c68 100644 --- a/lib/flow/tests/distributed.sx +++ b/lib/flow/tests/distributed.sx @@ -17,11 +17,6 @@ (define flow-d (fn (src) (flow-run src))) -;; A mock peer "edge" exposing double/inc, registered at the top of each program. -(define - peer-setup - "(flow-peer-register! (quote edge) (list (list (quote double) (lambda (x) (* x 2))) (list (quote inc) (lambda (x) (+ x 1)))))") - ;; ── remote-node ───────────────────────────────────────────────── (flow-dist-test "remote: a node executes on a peer" @@ -59,4 +54,36 @@ "(define hits 0) (flow-peer-register! (quote edge) (list (list (quote flaky) (lambda (x) (begin (set! hits (+ hits 1)) (if (< hits 2) (raise (quote down)) (* x 3))))))) (list (flow/start (retry 3 (remote-node (quote edge) (quote flaky))) 7) hits)") (list 21 2)) +;; ── failover (retry on a different peer, fall through to local) ── +(flow-dist-test + "failover: first reachable peer serves the request" + (flow-d + "(flow-peer-register! (quote p2) (list (list (quote f) (lambda (x) (+ x 100))))) (flow/start (remote-failover (list (quote p2) (quote down)) (quote f) (flow-const (quote local))) 5)") + 105) +(flow-dist-test + "failover: skips an unreachable peer to the next one" + (flow-d + "(flow-peer-register! (quote p2) (list (list (quote f) (lambda (x) (+ x 100))))) (flow/start (remote-failover (list (quote down) (quote p2)) (quote f) (flow-const (quote local))) 5)") + 105) +(flow-dist-test + "failover: skips a peer whose function raises" + (flow-d + "(flow-peer-register! (quote bad) (list (list (quote f) (lambda (x) (raise (quote boom)))))) (flow-peer-register! (quote good) (list (list (quote f) (lambda (x) (* x 10))))) (flow/start (remote-failover (list (quote bad) (quote good)) (quote f) (flow-const 0)) 4)") + 40) +(flow-dist-test + "failover: all peers fail, the local fallback runs" + (flow-d + "(flow/start (remote-failover (list (quote down1) (quote down2)) (quote f) (lambda (x) (* x -1))) 9)") + -9) +(flow-dist-test + "failover: threads the input through to the chosen peer" + (flow-d + "(flow-peer-register! (quote p) (list (list (quote f) (lambda (x) (list (quote got) x))))) (flow/start (sequence (lambda (x) (+ x 1)) (remote-failover (list (quote p)) (quote f) (flow-const 0))) 41)") + (list "got" 42)) +(flow-dist-test + "failover: composes inside a larger sequence" + (flow-d + "(flow-peer-register! (quote p) (list (list (quote f) (lambda (x) (* x 2))))) (flow/start (sequence (remote-failover (list (quote down) (quote p)) (quote f) (flow-const 1)) (lambda (x) (+ x 3))) 5)") + 13) + (define flow-dist-tests-run! (fn () {:total (+ flow-dist-pass flow-dist-fail) :passed flow-dist-pass :failed flow-dist-fail :fails flow-dist-fails})) diff --git a/plans/flow-on-sx.md b/plans/flow-on-sx.md index 9e22921a..6b41030c 100644 --- a/plans/flow-on-sx.md +++ b/plans/flow-on-sx.md @@ -16,7 +16,7 @@ federation extension via fed-sx for remote-node execution. ## Status (rolling) -`bash lib/flow/conformance.sh` → **81/81** (Phases 1-3 done; Phase 4 in progress: remote-node done) +`bash lib/flow/conformance.sh` → **87/87** (Phases 1-3 done; Phase 4 in progress: remote-node + failover done) ## Ground rules @@ -120,7 +120,9 @@ lib/flow/spec.sx lib/flow/runtime.sx lib/flow/store.sx the fed-sx boundary, MOCKED via a peer registry (`flow-peer-register!`); raises `flow-remote-unreachable` / `flow-remote-no-fn`. Composes with sequence, suspend, retry. `tests/distributed.sx`, 7 cases. -- [ ] failure semantics — retry on different peer, fall through to local +- [x] failure semantics — `(remote-failover addrs fn local)` tries each peer in + order, moves to the next on any raised error, and runs the `local` node if every + peer fails. 6 tests. - [ ] persistence across instances — flow state replicates via fed-sx - [ ] handoff — flow started here can resume on a peer if the local instance is down - [ ] `lib/flow/tests/distributed.sx` — federated flow scenarios (mock fed-sx in tests)