From 91ffba9975dc363ee0ffe66e873eaa3c555a8d2b Mon Sep 17 00:00:00 2001 From: giles Date: Sat, 6 Jun 2026 16:22:22 +0000 Subject: [PATCH] =?UTF-8?q?flow:=20Phase=201=20declarative=20DAG=20?= =?UTF-8?q?=E2=80=94=20sequence/parallel/defflow=20combinators=20+=2018=20?= =?UTF-8?q?tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Flow combinators as a Scheme prelude loaded onto scheme-standard-env; a flow is a Scheme procedure input->output, run inside the interpreter (sets up Phase 3 call/cc suspend). flow/start entry point, conformance runner, scoreboard. Co-Authored-By: Claude Opus 4.8 (1M context) --- lib/flow/api.sx | 30 ++++++++++ lib/flow/conformance.sh | 90 +++++++++++++++++++++++++++++ lib/flow/scoreboard.json | 11 ++++ lib/flow/scoreboard.md | 41 +++++++++++++ lib/flow/spec.sx | 29 ++++++++++ lib/flow/tests/basic.sx | 121 +++++++++++++++++++++++++++++++++++++++ plans/flow-on-sx.md | 33 +++++++---- 7 files changed, 343 insertions(+), 12 deletions(-) create mode 100644 lib/flow/api.sx create mode 100755 lib/flow/conformance.sh create mode 100644 lib/flow/scoreboard.json create mode 100644 lib/flow/scoreboard.md create mode 100644 lib/flow/spec.sx create mode 100644 lib/flow/tests/basic.sx diff --git a/lib/flow/api.sx b/lib/flow/api.sx new file mode 100644 index 00000000..dc23514e --- /dev/null +++ b/lib/flow/api.sx @@ -0,0 +1,30 @@ +;; lib/flow/api.sx — flow runtime entry points. +;; +;; Builds a Scheme env preloaded with the flow combinators (lib/flow/spec.sx) +;; plus the public flow API, and provides SX helpers to run flow programs. +;; +;; Scheme-level API (available inside flow programs): +;; (flow/start flow input) — run a flow with the given input, return result +;; +;; SX-level helpers (for hosts and tests): +;; (flow-make-env) — fresh standard env + combinators + api +;; (flow-run src) — eval a Scheme program string in a fresh flow env +;; (flow-run-in env src) — eval a Scheme program string in a given env + +(define flow-api-src "(define (flow/start flow input) (flow input))") + +(define + flow-make-env + (fn + () + (let + ((env (scheme-standard-env))) + (flow-load-combinators! env) + (scheme-eval-program (scheme-parse-all flow-api-src) env) + env))) + +(define + flow-run-in + (fn (env src) (scheme-eval-program (scheme-parse-all src) env))) + +(define flow-run (fn (src) (flow-run-in (flow-make-env) src))) diff --git a/lib/flow/conformance.sh b/lib/flow/conformance.sh new file mode 100755 index 00000000..b0fbab42 --- /dev/null +++ b/lib/flow/conformance.sh @@ -0,0 +1,90 @@ +#!/usr/bin/env bash +# flow-on-sx conformance runner — runs all flow test suites in one sx_server process. +# +# Usage: +# bash lib/flow/conformance.sh # run all suites +# bash lib/flow/conformance.sh -v # verbose (list each suite) + +set -uo pipefail +cd "$(git rev-parse --show-toplevel)" + +SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}" +if [ ! -x "$SX_SERVER" ]; then + SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe" +fi +if [ ! -x "$SX_SERVER" ]; then + echo "ERROR: sx_server.exe not found." >&2 + exit 1 +fi + +VERBOSE="${1:-}" + +# Suites: NAME RUNNER-FN PATH +SUITES=( + "basic flow-basic-tests-run! lib/flow/tests/basic.sx" +) + +TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT +EPOCH=1 + +emit_load () { echo "(epoch $EPOCH)"; echo "(load \"$1\")"; EPOCH=$((EPOCH+1)); } +emit_eval () { echo "(epoch $EPOCH)"; echo "(eval \"$1\")"; EPOCH=$((EPOCH+1)); } + +{ + emit_load "lib/guest/lex.sx" + emit_load "lib/guest/reflective/env.sx" + emit_load "lib/guest/reflective/quoting.sx" + emit_load "lib/scheme/parser.sx" + emit_load "lib/scheme/eval.sx" + emit_load "lib/scheme/runtime.sx" + emit_load "lib/flow/spec.sx" + emit_load "lib/flow/api.sx" + for SUITE in "${SUITES[@]}"; do + read -r _NAME _RUNNER FILE <<< "$SUITE" + emit_load "$FILE" + emit_eval "($_RUNNER)" + done +} > "$TMPFILE" + +OUTPUT=$(timeout 180 "$SX_SERVER" < "$TMPFILE" 2>&1 || true) + +TOTAL_PASS=0 +TOTAL_FAIL=0 +FAILED_SUITES=() + +LAST_DICT_LINES=$(echo "$OUTPUT" | grep -E '^\{:' || true) + +I=0 +while read -r LINE; do + [ -z "$LINE" ] && continue + P=$(echo "$LINE" | grep -oE ':passed [0-9]+' | awk '{print $2}') + F=$(echo "$LINE" | grep -oE ':failed [0-9]+' | awk '{print $2}') + [ -z "$P" ] && P=0 + [ -z "$F" ] && F=0 + SUITE_INFO="${SUITES[$I]}" + SUITE_NAME=$(echo "$SUITE_INFO" | awk '{print $1}') + TOTAL_PASS=$((TOTAL_PASS + P)) + TOTAL_FAIL=$((TOTAL_FAIL + F)) + if [ "$F" -gt 0 ]; then + FAILED_SUITES+=("$SUITE_NAME: $P/$((P+F))") + printf 'X %-12s %d/%d\n' "$SUITE_NAME" "$P" "$((P+F))" + echo "$LINE" | grep -oE ':name "[^"]*"' | sed 's/:name / fail: /' + elif [ "$VERBOSE" = "-v" ]; then + printf 'ok %-12s %d passed\n' "$SUITE_NAME" "$P" + fi + I=$((I+1)) +done <<< "$LAST_DICT_LINES" + +TOTAL=$((TOTAL_PASS + TOTAL_FAIL)) +if [ "$TOTAL" -eq 0 ]; then + echo "ERROR: no suite results parsed. Raw output:" >&2 + echo "$OUTPUT" >&2 + exit 1 +fi +if [ $TOTAL_FAIL -eq 0 ]; then + echo "ok $TOTAL_PASS/$TOTAL flow-on-sx tests passed (${#SUITES[@]} suites)" +else + echo "FAIL $TOTAL_PASS/$TOTAL passed, $TOTAL_FAIL failed:" + for S in "${FAILED_SUITES[@]}"; do echo " $S"; done + exit 1 +fi diff --git a/lib/flow/scoreboard.json b/lib/flow/scoreboard.json new file mode 100644 index 00000000..7fef1c7f --- /dev/null +++ b/lib/flow/scoreboard.json @@ -0,0 +1,11 @@ +{ + "total": 18, + "passed": 18, + "failed": 0, + "suites": { + "basic": { "passed": 18, "total": 18 } + }, + "phases": { + "phase1": "in-progress" + } +} diff --git a/lib/flow/scoreboard.md b/lib/flow/scoreboard.md new file mode 100644 index 00000000..6bf2efeb --- /dev/null +++ b/lib/flow/scoreboard.md @@ -0,0 +1,41 @@ +# flow-on-sx Scoreboard + +**All tests pass: 18 / 18 across 1 suite.** + +`bash lib/flow/conformance.sh` + +## Per-suite breakdown + +| Suite | Passing | Covers | +|-------|--------:|--------| +| basic | 18 | Phase 1: single nodes, linear sequence, data-flow threading, defflow, parallel fan/join, nested composition, publish-shaped flow | + +## Architecture + +Flow combinators are a **Scheme prelude** (`lib/flow/spec.sx`) loaded onto +`scheme-standard-env`. A flow is a Scheme procedure `input -> output`. The whole +flow executes inside the Scheme interpreter, so Phase 3's `suspend` (call/cc) will +capture the flow continuation directly. + +- `lib/flow/spec.sx` — combinators: `flow-node`, `flow-id`, `flow-const`, + `sequence`, `parallel`, `defflow`; `flow-load-combinators!`. +- `lib/flow/api.sx` — `flow/start` (Scheme); `flow-make-env`, `flow-run`, + `flow-run-in` (SX helpers). +- `lib/flow/tests/basic.sx` — 18 cases. +- `lib/flow/conformance.sh` — loads substrate + flow layer, runs suites. + +## Semantics notes + +- **node** = 1-arg Scheme procedure; the upstream value is the argument. A node + ignoring its argument is effectively a thunk. +- **sequence** threads left-to-right; empty sequence = identity. +- **parallel** fans the same input to every branch and joins results into a list. + Evaluation is **sequential** for now; true concurrency arrives in Phase 3. + +## Phases + +- [~] Phase 1 — Declarative DAG + sequential execution (combinators + 18 tests done; + `flow/start` done) +- [ ] Phase 2 — Control flow + error handling +- [ ] Phase 3 — Suspend / resume (the showcase) +- [ ] Phase 4 — Distributed nodes via fed-sx diff --git a/lib/flow/spec.sx b/lib/flow/spec.sx new file mode 100644 index 00000000..35e75150 --- /dev/null +++ b/lib/flow/spec.sx @@ -0,0 +1,29 @@ +;; lib/flow/spec.sx — flow combinators as a Scheme prelude. +;; +;; A flow is a Scheme procedure of one argument: the upstream value. +;; node : input -> output +;; A leaf node ignoring its argument is effectively a thunk. Combinators +;; build composite nodes out of child nodes. The whole flow runs INSIDE the +;; Scheme interpreter so that Phase 3's `suspend` (call/cc) can capture the +;; flow continuation directly. +;; +;; Phase 1 combinators: +;; (flow-node f) — wrap a 1-arg procedure as a node (identity) +;; (flow-id input) — pass the upstream value through unchanged +;; (flow-const v) — node that ignores input and yields v +;; (sequence n ...) — thread input left-to-right through children +;; (parallel n ...) — fan input to every child, join results into a list +;; (SEQUENTIAL evaluation; true concurrency is Phase 3) +;; (defflow name body)— bind a named flow + +(define + flow-combinators-src + "(define (flow-node f) f)\n (define (flow-id input) input)\n (define (flow-const v) (lambda (input) v))\n (define (flow-seq-step ns v)\n (if (null? ns) v (flow-seq-step (cdr ns) ((car ns) v))))\n (define sequence (lambda ns (lambda (input) (flow-seq-step ns input))))\n (define parallel (lambda ns (lambda (input) (map (lambda (n) (n input)) ns))))\n (define-syntax defflow\n (syntax-rules ()\n ((defflow nm body) (define nm body))))") + +(define + flow-load-combinators! + (fn + (env) + (begin + (scheme-eval-program (scheme-parse-all flow-combinators-src) env) + env))) diff --git a/lib/flow/tests/basic.sx b/lib/flow/tests/basic.sx new file mode 100644 index 00000000..da9d8972 --- /dev/null +++ b/lib/flow/tests/basic.sx @@ -0,0 +1,121 @@ +;; lib/flow/tests/basic.sx — Phase 1: declarative DAG + sequential execution. + +(define flow-basic-pass 0) +(define flow-basic-fail 0) +(define flow-basic-fails (list)) + +(define + flow-basic-test + (fn + (name actual expected) + (if + (= actual expected) + (set! flow-basic-pass (+ flow-basic-pass 1)) + (begin + (set! flow-basic-fail (+ flow-basic-fail 1)) + (append! flow-basic-fails {:name name :expected expected :actual actual}))))) + +;; Run a Scheme flow-program string and return its final value. +(define flow-b (fn (src) (flow-run src))) + +;; Scheme strings are boxed as {:scm-string "..."}; unwrap to a host string. +(define flow-bs (fn (src) (get (flow-run src) :scm-string))) + +;; ── single node ───────────────────────────────────────────────── +(flow-basic-test + "node: identity passes input through" + (flow-b "(flow/start flow-id 7)") + 7) +(flow-basic-test + "node: const ignores input" + (flow-b "(flow/start (flow-const 99) 1)") + 99) +(flow-basic-test + "node: bare lambda is a node" + (flow-b "(flow/start (lambda (x) (* x x)) 6)") + 36) + +;; ── linear sequence ───────────────────────────────────────────── +(flow-basic-test + "sequence: empty is identity" + (flow-b "(flow/start (sequence) 42)") + 42) +(flow-basic-test + "sequence: single child" + (flow-b "(flow/start (sequence (lambda (x) (+ x 1))) 41)") + 42) +(flow-basic-test + "sequence: two children thread" + (flow-b + "(flow/start (sequence (lambda (x) (+ x 1)) (lambda (x) (* x 10))) 4)") + 50) +(flow-basic-test + "sequence: three children thread" + (flow-b + "(flow/start (sequence (lambda (x) (+ x 1)) (lambda (x) (* x 2)) (lambda (x) (- x 3))) 5)") + 9) + +;; ── data flow between nodes ───────────────────────────────────── +(flow-basic-test + "data flow: string accumulation" + (flow-bs + "(flow/start (sequence (lambda (s) (string-append s \"-a\")) (lambda (s) (string-append s \"-b\"))) \"x\")") + "x-a-b") +(flow-basic-test + "data flow: list build" + (flow-b + "(flow/start (sequence (lambda (x) (cons x (list))) (lambda (xs) (cons 0 xs))) 7)") + (list 0 7)) + +;; ── defflow ───────────────────────────────────────────────────── +(flow-basic-test + "defflow: names a flow" + (flow-b + "(defflow inc2 (sequence (lambda (x) (+ x 1)) (lambda (x) (+ x 1)))) (flow/start inc2 40)") + 42) +(flow-basic-test + "defflow: reusable" + (flow-b + "(defflow dbl (lambda (x) (* x 2))) (+ (flow/start dbl 3) (flow/start dbl 10))") + 26) + +;; ── parallel (sequential semantics, join into list) ───────────── +(flow-basic-test + "parallel: fans input to all branches" + (flow-b + "(flow/start (parallel (lambda (x) (+ x 1)) (lambda (x) (* x 2)) (lambda (x) (- x 3))) 10)") + (list 11 20 7)) +(flow-basic-test + "parallel: empty joins to empty list" + (flow-b "(flow/start (parallel) 5)") + (list)) +(flow-basic-test + "parallel: single branch" + (flow-b "(flow/start (parallel (lambda (x) (* x x))) 9)") + (list 81)) + +;; ── nested composition ────────────────────────────────────────── +(flow-basic-test + "nested: sequence of sequences" + (flow-b + "(flow/start (sequence (sequence (lambda (x) (+ x 1)) (lambda (x) (+ x 1))) (sequence (lambda (x) (* x 3)))) 0)") + 6) +(flow-basic-test + "nested: parallel inside sequence, join then reduce" + (flow-b + "(flow/start (sequence (parallel (lambda (x) (+ x 1)) (lambda (x) (* x 2))) (lambda (xs) (apply + xs))) 10)") + 31) +(flow-basic-test + "nested: sequence inside parallel branch" + (flow-b + "(flow/start (parallel (sequence (lambda (x) (+ x 1)) (lambda (x) (* x 2))) (lambda (x) x)) 5)") + (list 12 5)) + +;; ── publish-shaped flow (the architecture sketch) ─────────────── +(flow-basic-test + "publish: write -> (review | spell) -> join lengths" + (flow-b + "(defflow publish (sequence (lambda (draft) (string-append draft \"!\")) (parallel (lambda (c) (string-length c)) (lambda (c) (string-length (string-append c \"?\")))))) (flow/start publish \"hi\")") + (list 3 4)) + +(define flow-basic-tests-run! (fn () {:total (+ flow-basic-pass flow-basic-fail) :passed flow-basic-pass :failed flow-basic-fail :fails flow-basic-fails})) diff --git a/plans/flow-on-sx.md b/plans/flow-on-sx.md index 46517942..d0dbf392 100644 --- a/plans/flow-on-sx.md +++ b/plans/flow-on-sx.md @@ -16,7 +16,7 @@ federation extension via fed-sx for remote-node execution. ## Status (rolling) -`bash lib/flow/conformance.sh` → **0/0** (not yet started) +`bash lib/flow/conformance.sh` → **18/18** (Phase 1 in progress) ## Ground rules @@ -62,15 +62,16 @@ lib/flow/spec.sx lib/flow/runtime.sx lib/flow/store.sx ## Phase 1 — Declarative DAG + sequential execution -- [ ] `lib/flow/spec.sx` — `defflow` macro, `sequence` combinator -- [ ] node = Scheme thunk; output threads to next node (data flow) -- [ ] `parallel` combinator (sequential semantics for now — TRUE parallelism in Phase 3) -- [ ] runtime executes a flow synchronously, returns final value -- [ ] `lib/flow/api.sx` — `(flow/start name args)` entry point -- [ ] `lib/flow/tests/basic.sx` — 15+ cases: linear sequence, nested sequences, - data flow between nodes, parallel-with-join -- [ ] `lib/flow/scoreboard.{json,md}` -- [ ] `lib/flow/conformance.sh` +- [x] `lib/flow/spec.sx` — `defflow` macro, `sequence` combinator +- [x] node = Scheme procedure of one arg (upstream value threaded in); output + threads to next node (data flow). A node ignoring its arg is a thunk. +- [x] `parallel` combinator (sequential semantics for now — TRUE parallelism in Phase 3) +- [x] runtime executes a flow synchronously, returns final value +- [x] `lib/flow/api.sx` — `(flow/start flow input)` entry point +- [x] `lib/flow/tests/basic.sx` — 18 cases: single nodes, linear/nested sequence, + data flow between nodes, parallel-with-join, publish-shaped flow +- [x] `lib/flow/scoreboard.{json,md}` +- [x] `lib/flow/conformance.sh` ## Phase 2 — Control flow + error handling @@ -101,8 +102,16 @@ lib/flow/spec.sx lib/flow/runtime.sx lib/flow/store.sx ## Progress log -(loop fills this in) +- **Phase 1 (combinators + sequential runtime).** Flow built as a Scheme prelude + loaded onto `scheme-standard-env`: a flow is a Scheme procedure `input -> output`, + so the whole flow runs inside the interpreter (sets up Phase 3 call/cc suspend). + Combinators `flow-node`/`flow-id`/`flow-const`/`sequence`/`parallel`/`defflow` in + `spec.sx`; `flow/start` + SX helpers (`flow-make-env`/`flow-run`) in `api.sx`. + 18/18 in `tests/basic.sx`. Substrate constraints found: dotted rest params + `(a . rest)` and named `let` are unsupported in `lib/scheme/eval.sx`, so + combinators use `(lambda args ...)` variadics + top-level recursion. Scheme + strings come back boxed as `{:scm-string "..."}` — unwrap with `(get s :scm-string)`. ## Blockers -(loop fills this in) +(none)