From e1f802cfff56f17f6b4489c6965ac7e6cf7bbf84 Mon Sep 17 00:00:00 2001 From: giles Date: Sat, 6 Jun 2026 17:40:25 +0000 Subject: [PATCH] flow: remote-node via mock fed-sx transport + 7 tests (Phase 4 begins) (remote-node addr fn) runs a node on a federation peer. Transport is the fed-sx boundary, mocked by a peer registry (flow-peer-register!); raises flow-remote-unreachable / flow-remote-no-fn. Composes with sequence/suspend/retry. Also fixes conformance.sh to load remote.sx before api.sx. Co-Authored-By: Claude Opus 4.8 (1M context) --- lib/flow/api.sx | 15 +++++---- lib/flow/conformance.sh | 2 ++ lib/flow/remote.sx | 22 +++++++++++++ lib/flow/scoreboard.json | 9 ++--- lib/flow/scoreboard.md | 4 ++- lib/flow/tests/distributed.sx | 62 +++++++++++++++++++++++++++++++++++ plans/flow-on-sx.md | 7 ++-- 7 files changed, 108 insertions(+), 13 deletions(-) create mode 100644 lib/flow/remote.sx create mode 100644 lib/flow/tests/distributed.sx diff --git a/lib/flow/api.sx b/lib/flow/api.sx index b7810e60..757bf352 100644 --- a/lib/flow/api.sx +++ b/lib/flow/api.sx @@ -1,8 +1,8 @@ ;; lib/flow/api.sx — flow runtime entry points. ;; -;; Builds a Scheme env preloaded with the flow combinators (lib/flow/spec.sx) and -;; the durable store + lifecycle (lib/flow/store.sx), and provides SX helpers to -;; run flow programs. +;; Builds a Scheme env preloaded with the flow combinators (lib/flow/spec.sx), +;; the durable store + lifecycle (lib/flow/store.sx), and the fed-sx remote layer +;; (lib/flow/remote.sx), and provides SX helpers to run flow programs. ;; ;; Scheme-level API (available inside flow programs): ;; (flow/start flow input) — run a flow; raw result if it completes, else @@ -10,15 +10,17 @@ ;; (flow/resume id value) — resume a suspended flow (store.sx) ;; (flow/cancel id) — cancel a flow (store.sx) ;; (suspend tag) — suspension point (spec.sx) +;; (remote-node addr fn) — node executed on a federation peer (remote.sx) ;; ;; SX-level helpers (for hosts and tests): -;; (flow-make-env) — fresh standard env + combinators + store +;; (flow-make-env) — fresh standard env + combinators + store + remote ;; (flow-run src) — eval a Scheme program string in a reset shared env ;; (flow-run-in env src) — eval a Scheme program string in a given env ;; ;; flow-run reuses ONE env (building the full standard env is expensive) and ;; resets the mutable flow globals before each program, so tests stay isolated -;; without paying for a fresh standard env each time. +;; without paying for a fresh standard env each time. flow-registry persists (it +;; models reloaded flow definitions surviving a restart). (define flow-make-env @@ -28,6 +30,7 @@ ((env (scheme-standard-env))) (flow-load-combinators! env) (flow-load-store! env) + (flow-load-remote! env) env))) (define @@ -36,7 +39,7 @@ (define flow-reset-src - "(set! flow-store (list)) (set! flow-next-id 0) (set! flow-replay-log (list)) (set! flow-suspend-k #f) (set! flow-timeout-budget -1)") + "(set! flow-store (list)) (set! flow-next-id 0) (set! flow-replay-log (list)) (set! flow-suspend-k #f) (set! flow-timeout-budget -1) (set! flow-peers (list))") (define flow-env-cache false) diff --git a/lib/flow/conformance.sh b/lib/flow/conformance.sh index 237b288b..4fd45ee0 100755 --- a/lib/flow/conformance.sh +++ b/lib/flow/conformance.sh @@ -25,6 +25,7 @@ SUITES=( "control flow-ctl-tests-run! lib/flow/tests/control.sx" "suspend flow-sus-tests-run! lib/flow/tests/suspend.sx" "recovery flow-rec-tests-run! lib/flow/tests/recovery.sx" + "distributed flow-dist-tests-run! lib/flow/tests/distributed.sx" ) TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT @@ -42,6 +43,7 @@ emit_eval () { echo "(epoch $EPOCH)"; echo "(eval \"$1\")"; EPOCH=$((EPOCH+1)); emit_load "lib/scheme/runtime.sx" emit_load "lib/flow/spec.sx" emit_load "lib/flow/store.sx" + emit_load "lib/flow/remote.sx" emit_load "lib/flow/api.sx" for SUITE in "${SUITES[@]}"; do read -r _NAME _RUNNER FILE <<< "$SUITE" diff --git a/lib/flow/remote.sx b/lib/flow/remote.sx new file mode 100644 index 00000000..7b42ab44 --- /dev/null +++ b/lib/flow/remote.sx @@ -0,0 +1,22 @@ +;; lib/flow/remote.sx — distributed nodes via fed-sx (Phase 4). +;; +;; A node can execute on a federation peer. The transport is the fed-sx boundary; +;; it is MOCKED in tests by a peer registry mapping addr -> function table. In +;; production flow-transport would issue a fed-sx call; here it dispatches locally. +;; +;; (flow-peer-register! addr table) — register a mock peer. table is a list of +;; (fn-name proc) entries — the functions that peer exposes. +;; (flow-transport addr fn input) — invoke fn on the peer with input. Raises +;; (flow-remote-unreachable) if the addr is unknown, (flow-remote-no-fn) if the +;; peer does not expose fn. +;; (remote-node addr fn) — a node that runs fn on the peer at addr. + +(define + flow-remote-src + "(define flow-peers (list))\n (define (flow-assoc key alist)\n (if (null? alist)\n #f\n (if (eq? (car (car alist)) key) (car (cdr (car alist))) (flow-assoc key (cdr alist)))))\n (define (flow-peer-register! addr table) (set! flow-peers (cons (list addr table) flow-peers)))\n (define (flow-transport addr fn input)\n (let ((table (flow-assoc addr flow-peers)))\n (if table\n (let ((proc (flow-assoc fn table)))\n (if proc (proc input) (raise (quote flow-remote-no-fn))))\n (raise (quote flow-remote-unreachable)))))\n (define (remote-node addr fn) (lambda (input) (flow-transport addr fn input)))") + +(define + flow-load-remote! + (fn + (env) + (begin (scheme-eval-program (scheme-parse-all flow-remote-src) env) env))) diff --git a/lib/flow/scoreboard.json b/lib/flow/scoreboard.json index 63dff0b8..fd71294b 100644 --- a/lib/flow/scoreboard.json +++ b/lib/flow/scoreboard.json @@ -1,12 +1,13 @@ { - "total": 74, - "passed": 74, + "total": 81, + "passed": 81, "failed": 0, "suites": { "basic": { "passed": 18, "total": 18 }, "control": { "passed": 31, "total": 31 }, "suspend": { "passed": 17, "total": 17 }, - "recovery": { "passed": 8, "total": 8 } + "recovery": { "passed": 8, "total": 8 }, + "distributed": { "passed": 7, "total": 7 } }, - "phases": { "phase1": "done", "phase2": "done", "phase3": "done", "phase4": "pending" } + "phases": { "phase1": "done", "phase2": "done", "phase3": "done", "phase4": "in-progress" } } diff --git a/lib/flow/scoreboard.md b/lib/flow/scoreboard.md index e47a695a..45ffe7ea 100644 --- a/lib/flow/scoreboard.md +++ b/lib/flow/scoreboard.md @@ -1,6 +1,6 @@ # flow-on-sx Scoreboard -**All tests pass: 74 / 74 across 4 suites.** +**All tests pass: 81 / 81 across 5 suites.** `bash lib/flow/conformance.sh` @@ -12,6 +12,7 @@ | control | 31 | Phase 2: `branch` (6); error model `fail`/`failed?`/`fail-reason` (6); `try-catch` (6); `retry n` (6); `timeout` cooperative step budget (7) | | suspend | 17 | Phase 3: suspend/resume/cancel via deterministic replay; multi-step, replay determinism, lifecycle guards, suspend-in-branch | | recovery | 8 | Phase 3: crash recovery — store export/import, resumable scan, restart-at-every-step, replay-log survival | +| distributed | 7 | Phase 4: `remote-node` on mock fed-sx peers; compose/suspend/retry, unreachable + no-fn errors | ## Architecture @@ -40,5 +41,6 @@ capture the flow continuation directly. - [x] Phase 1 — Declarative DAG + sequential execution (combinators + 18 tests, `flow/start`) - [x] Phase 2 — Control flow + error handling (branch, error model, try-catch, retry, timeout) - [x] Phase 3 — Suspend/resume (suspend/resume/cancel + crash recovery via deterministic replay) +- [~] Phase 4 — Distributed nodes via fed-sx (remote-node done; failover + handoff next) - [ ] Phase 3 — Suspend / resume (the showcase) - [ ] Phase 4 — Distributed nodes via fed-sx diff --git a/lib/flow/tests/distributed.sx b/lib/flow/tests/distributed.sx new file mode 100644 index 00000000..15ab80d6 --- /dev/null +++ b/lib/flow/tests/distributed.sx @@ -0,0 +1,62 @@ +;; lib/flow/tests/distributed.sx — Phase 4: distributed nodes via fed-sx (mocked). + +(define flow-dist-pass 0) +(define flow-dist-fail 0) +(define flow-dist-fails (list)) + +(define + flow-dist-test + (fn + (name actual expected) + (if + (= actual expected) + (set! flow-dist-pass (+ flow-dist-pass 1)) + (begin + (set! flow-dist-fail (+ flow-dist-fail 1)) + (append! flow-dist-fails {:name name :expected expected :actual actual}))))) + +(define flow-d (fn (src) (flow-run src))) + +;; A mock peer "edge" exposing double/inc, registered at the top of each program. +(define + peer-setup + "(flow-peer-register! (quote edge) (list (list (quote double) (lambda (x) (* x 2))) (list (quote inc) (lambda (x) (+ x 1)))))") + +;; ── remote-node ───────────────────────────────────────────────── +(flow-dist-test + "remote: a node executes on a peer" + (flow-d + "(flow-peer-register! (quote edge) (list (list (quote double) (lambda (x) (* x 2))))) (flow/start (remote-node (quote edge) (quote double)) 21)") + 42) +(flow-dist-test + "remote: remote nodes compose in a sequence" + (flow-d + "(flow-peer-register! (quote edge) (list (list (quote inc) (lambda (x) (+ x 1))) (list (quote double) (lambda (x) (* x 2))))) (flow/start (sequence (remote-node (quote edge) (quote inc)) (remote-node (quote edge) (quote double))) 4)") + 10) +(flow-dist-test + "remote: a remote node mixes with local nodes" + (flow-d + "(flow-peer-register! (quote edge) (list (list (quote double) (lambda (x) (* x 2))))) (flow/start (sequence (lambda (x) (+ x 5)) (remote-node (quote edge) (quote double)) (lambda (x) (- x 1))) 10)") + 29) +(flow-dist-test + "remote: unreachable peer raises flow-remote-unreachable" + (flow-d + "(flow/start (try-catch (remote-node (quote ghost) (quote double)) (lambda (e) e)) 1)") + "flow-remote-unreachable") +(flow-dist-test + "remote: unknown function on a peer raises flow-remote-no-fn" + (flow-d + "(flow-peer-register! (quote edge) (list (list (quote double) (lambda (x) (* x 2))))) (flow/start (try-catch (remote-node (quote edge) (quote missing)) (lambda (e) e)) 1)") + "flow-remote-no-fn") +(flow-dist-test + "remote: a remote node can suspend the flow (peer returns control)" + (flow-d + "(flow-peer-register! (quote edge) (list (list (quote review) (lambda (x) x)))) (flow/start (sequence (remote-node (quote edge) (quote review)) (lambda (x) (suspend (quote human))) (lambda (v) (list (quote published) v))) 7)") + (list "flow-suspended" 1 "human")) +(flow-dist-test + "remote: a transient remote failure is recoverable with retry" + (flow-d + "(define hits 0) (flow-peer-register! (quote edge) (list (list (quote flaky) (lambda (x) (begin (set! hits (+ hits 1)) (if (< hits 2) (raise (quote down)) (* x 3))))))) (list (flow/start (retry 3 (remote-node (quote edge) (quote flaky))) 7) hits)") + (list 21 2)) + +(define flow-dist-tests-run! (fn () {:total (+ flow-dist-pass flow-dist-fail) :passed flow-dist-pass :failed flow-dist-fail :fails flow-dist-fails})) diff --git a/plans/flow-on-sx.md b/plans/flow-on-sx.md index cef6d66c..9e22921a 100644 --- a/plans/flow-on-sx.md +++ b/plans/flow-on-sx.md @@ -16,7 +16,7 @@ federation extension via fed-sx for remote-node execution. ## Status (rolling) -`bash lib/flow/conformance.sh` → **74/74** (Phases 1-3 done; Phase 4 fed-sx next) +`bash lib/flow/conformance.sh` → **81/81** (Phases 1-3 done; Phase 4 in progress: remote-node done) ## Ground rules @@ -116,7 +116,10 @@ lib/flow/spec.sx lib/flow/runtime.sx lib/flow/store.sx ## Phase 4 — Distributed nodes via fed-sx -- [ ] `(remote-node addr fn args)` — execute node on a federation peer +- [x] `(remote-node addr fn)` — execute a node on a federation peer. Transport is + the fed-sx boundary, MOCKED via a peer registry (`flow-peer-register!`); raises + `flow-remote-unreachable` / `flow-remote-no-fn`. Composes with sequence, suspend, + retry. `tests/distributed.sx`, 7 cases. - [ ] failure semantics — retry on different peer, fall through to local - [ ] persistence across instances — flow state replicates via fed-sx - [ ] handoff — flow started here can resume on a peer if the local instance is down