flow: Phase 1 declarative DAG — sequence/parallel/defflow combinators + 18 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 31s
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 31s
Flow combinators as a Scheme prelude loaded onto scheme-standard-env; a flow is a Scheme procedure input->output, run inside the interpreter (sets up Phase 3 call/cc suspend). flow/start entry point, conformance runner, scoreboard. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
30
lib/flow/api.sx
Normal file
30
lib/flow/api.sx
Normal file
@@ -0,0 +1,30 @@
|
||||
;; lib/flow/api.sx — flow runtime entry points.
|
||||
;;
|
||||
;; Builds a Scheme env preloaded with the flow combinators (lib/flow/spec.sx)
|
||||
;; plus the public flow API, and provides SX helpers to run flow programs.
|
||||
;;
|
||||
;; Scheme-level API (available inside flow programs):
|
||||
;; (flow/start flow input) — run a flow with the given input, return result
|
||||
;;
|
||||
;; SX-level helpers (for hosts and tests):
|
||||
;; (flow-make-env) — fresh standard env + combinators + api
|
||||
;; (flow-run src) — eval a Scheme program string in a fresh flow env
|
||||
;; (flow-run-in env src) — eval a Scheme program string in a given env
|
||||
|
||||
(define flow-api-src "(define (flow/start flow input) (flow input))")
|
||||
|
||||
(define
|
||||
flow-make-env
|
||||
(fn
|
||||
()
|
||||
(let
|
||||
((env (scheme-standard-env)))
|
||||
(flow-load-combinators! env)
|
||||
(scheme-eval-program (scheme-parse-all flow-api-src) env)
|
||||
env)))
|
||||
|
||||
(define
|
||||
flow-run-in
|
||||
(fn (env src) (scheme-eval-program (scheme-parse-all src) env)))
|
||||
|
||||
(define flow-run (fn (src) (flow-run-in (flow-make-env) src)))
|
||||
90
lib/flow/conformance.sh
Executable file
90
lib/flow/conformance.sh
Executable file
@@ -0,0 +1,90 @@
|
||||
#!/usr/bin/env bash
|
||||
# flow-on-sx conformance runner — runs all flow test suites in one sx_server process.
|
||||
#
|
||||
# Usage:
|
||||
# bash lib/flow/conformance.sh # run all suites
|
||||
# bash lib/flow/conformance.sh -v # verbose (list each suite)
|
||||
|
||||
set -uo pipefail
|
||||
cd "$(git rev-parse --show-toplevel)"
|
||||
|
||||
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
|
||||
fi
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
echo "ERROR: sx_server.exe not found." >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
VERBOSE="${1:-}"
|
||||
|
||||
# Suites: NAME RUNNER-FN PATH
|
||||
SUITES=(
|
||||
"basic flow-basic-tests-run! lib/flow/tests/basic.sx"
|
||||
)
|
||||
|
||||
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
|
||||
EPOCH=1
|
||||
|
||||
emit_load () { echo "(epoch $EPOCH)"; echo "(load \"$1\")"; EPOCH=$((EPOCH+1)); }
|
||||
emit_eval () { echo "(epoch $EPOCH)"; echo "(eval \"$1\")"; EPOCH=$((EPOCH+1)); }
|
||||
|
||||
{
|
||||
emit_load "lib/guest/lex.sx"
|
||||
emit_load "lib/guest/reflective/env.sx"
|
||||
emit_load "lib/guest/reflective/quoting.sx"
|
||||
emit_load "lib/scheme/parser.sx"
|
||||
emit_load "lib/scheme/eval.sx"
|
||||
emit_load "lib/scheme/runtime.sx"
|
||||
emit_load "lib/flow/spec.sx"
|
||||
emit_load "lib/flow/api.sx"
|
||||
for SUITE in "${SUITES[@]}"; do
|
||||
read -r _NAME _RUNNER FILE <<< "$SUITE"
|
||||
emit_load "$FILE"
|
||||
emit_eval "($_RUNNER)"
|
||||
done
|
||||
} > "$TMPFILE"
|
||||
|
||||
OUTPUT=$(timeout 180 "$SX_SERVER" < "$TMPFILE" 2>&1 || true)
|
||||
|
||||
TOTAL_PASS=0
|
||||
TOTAL_FAIL=0
|
||||
FAILED_SUITES=()
|
||||
|
||||
LAST_DICT_LINES=$(echo "$OUTPUT" | grep -E '^\{:' || true)
|
||||
|
||||
I=0
|
||||
while read -r LINE; do
|
||||
[ -z "$LINE" ] && continue
|
||||
P=$(echo "$LINE" | grep -oE ':passed [0-9]+' | awk '{print $2}')
|
||||
F=$(echo "$LINE" | grep -oE ':failed [0-9]+' | awk '{print $2}')
|
||||
[ -z "$P" ] && P=0
|
||||
[ -z "$F" ] && F=0
|
||||
SUITE_INFO="${SUITES[$I]}"
|
||||
SUITE_NAME=$(echo "$SUITE_INFO" | awk '{print $1}')
|
||||
TOTAL_PASS=$((TOTAL_PASS + P))
|
||||
TOTAL_FAIL=$((TOTAL_FAIL + F))
|
||||
if [ "$F" -gt 0 ]; then
|
||||
FAILED_SUITES+=("$SUITE_NAME: $P/$((P+F))")
|
||||
printf 'X %-12s %d/%d\n' "$SUITE_NAME" "$P" "$((P+F))"
|
||||
echo "$LINE" | grep -oE ':name "[^"]*"' | sed 's/:name / fail: /'
|
||||
elif [ "$VERBOSE" = "-v" ]; then
|
||||
printf 'ok %-12s %d passed\n' "$SUITE_NAME" "$P"
|
||||
fi
|
||||
I=$((I+1))
|
||||
done <<< "$LAST_DICT_LINES"
|
||||
|
||||
TOTAL=$((TOTAL_PASS + TOTAL_FAIL))
|
||||
if [ "$TOTAL" -eq 0 ]; then
|
||||
echo "ERROR: no suite results parsed. Raw output:" >&2
|
||||
echo "$OUTPUT" >&2
|
||||
exit 1
|
||||
fi
|
||||
if [ $TOTAL_FAIL -eq 0 ]; then
|
||||
echo "ok $TOTAL_PASS/$TOTAL flow-on-sx tests passed (${#SUITES[@]} suites)"
|
||||
else
|
||||
echo "FAIL $TOTAL_PASS/$TOTAL passed, $TOTAL_FAIL failed:"
|
||||
for S in "${FAILED_SUITES[@]}"; do echo " $S"; done
|
||||
exit 1
|
||||
fi
|
||||
11
lib/flow/scoreboard.json
Normal file
11
lib/flow/scoreboard.json
Normal file
@@ -0,0 +1,11 @@
|
||||
{
|
||||
"total": 18,
|
||||
"passed": 18,
|
||||
"failed": 0,
|
||||
"suites": {
|
||||
"basic": { "passed": 18, "total": 18 }
|
||||
},
|
||||
"phases": {
|
||||
"phase1": "in-progress"
|
||||
}
|
||||
}
|
||||
41
lib/flow/scoreboard.md
Normal file
41
lib/flow/scoreboard.md
Normal file
@@ -0,0 +1,41 @@
|
||||
# flow-on-sx Scoreboard
|
||||
|
||||
**All tests pass: 18 / 18 across 1 suite.**
|
||||
|
||||
`bash lib/flow/conformance.sh`
|
||||
|
||||
## Per-suite breakdown
|
||||
|
||||
| Suite | Passing | Covers |
|
||||
|-------|--------:|--------|
|
||||
| basic | 18 | Phase 1: single nodes, linear sequence, data-flow threading, defflow, parallel fan/join, nested composition, publish-shaped flow |
|
||||
|
||||
## Architecture
|
||||
|
||||
Flow combinators are a **Scheme prelude** (`lib/flow/spec.sx`) loaded onto
|
||||
`scheme-standard-env`. A flow is a Scheme procedure `input -> output`. The whole
|
||||
flow executes inside the Scheme interpreter, so Phase 3's `suspend` (call/cc) will
|
||||
capture the flow continuation directly.
|
||||
|
||||
- `lib/flow/spec.sx` — combinators: `flow-node`, `flow-id`, `flow-const`,
|
||||
`sequence`, `parallel`, `defflow`; `flow-load-combinators!`.
|
||||
- `lib/flow/api.sx` — `flow/start` (Scheme); `flow-make-env`, `flow-run`,
|
||||
`flow-run-in` (SX helpers).
|
||||
- `lib/flow/tests/basic.sx` — 18 cases.
|
||||
- `lib/flow/conformance.sh` — loads substrate + flow layer, runs suites.
|
||||
|
||||
## Semantics notes
|
||||
|
||||
- **node** = 1-arg Scheme procedure; the upstream value is the argument. A node
|
||||
ignoring its argument is effectively a thunk.
|
||||
- **sequence** threads left-to-right; empty sequence = identity.
|
||||
- **parallel** fans the same input to every branch and joins results into a list.
|
||||
Evaluation is **sequential** for now; true concurrency arrives in Phase 3.
|
||||
|
||||
## Phases
|
||||
|
||||
- [~] Phase 1 — Declarative DAG + sequential execution (combinators + 18 tests done;
|
||||
`flow/start` done)
|
||||
- [ ] Phase 2 — Control flow + error handling
|
||||
- [ ] Phase 3 — Suspend / resume (the showcase)
|
||||
- [ ] Phase 4 — Distributed nodes via fed-sx
|
||||
29
lib/flow/spec.sx
Normal file
29
lib/flow/spec.sx
Normal file
@@ -0,0 +1,29 @@
|
||||
;; lib/flow/spec.sx — flow combinators as a Scheme prelude.
|
||||
;;
|
||||
;; A flow is a Scheme procedure of one argument: the upstream value.
|
||||
;; node : input -> output
|
||||
;; A leaf node ignoring its argument is effectively a thunk. Combinators
|
||||
;; build composite nodes out of child nodes. The whole flow runs INSIDE the
|
||||
;; Scheme interpreter so that Phase 3's `suspend` (call/cc) can capture the
|
||||
;; flow continuation directly.
|
||||
;;
|
||||
;; Phase 1 combinators:
|
||||
;; (flow-node f) — wrap a 1-arg procedure as a node (identity)
|
||||
;; (flow-id input) — pass the upstream value through unchanged
|
||||
;; (flow-const v) — node that ignores input and yields v
|
||||
;; (sequence n ...) — thread input left-to-right through children
|
||||
;; (parallel n ...) — fan input to every child, join results into a list
|
||||
;; (SEQUENTIAL evaluation; true concurrency is Phase 3)
|
||||
;; (defflow name body)— bind a named flow
|
||||
|
||||
(define
|
||||
flow-combinators-src
|
||||
"(define (flow-node f) f)\n (define (flow-id input) input)\n (define (flow-const v) (lambda (input) v))\n (define (flow-seq-step ns v)\n (if (null? ns) v (flow-seq-step (cdr ns) ((car ns) v))))\n (define sequence (lambda ns (lambda (input) (flow-seq-step ns input))))\n (define parallel (lambda ns (lambda (input) (map (lambda (n) (n input)) ns))))\n (define-syntax defflow\n (syntax-rules ()\n ((defflow nm body) (define nm body))))")
|
||||
|
||||
(define
|
||||
flow-load-combinators!
|
||||
(fn
|
||||
(env)
|
||||
(begin
|
||||
(scheme-eval-program (scheme-parse-all flow-combinators-src) env)
|
||||
env)))
|
||||
121
lib/flow/tests/basic.sx
Normal file
121
lib/flow/tests/basic.sx
Normal file
@@ -0,0 +1,121 @@
|
||||
;; lib/flow/tests/basic.sx — Phase 1: declarative DAG + sequential execution.
|
||||
|
||||
(define flow-basic-pass 0)
|
||||
(define flow-basic-fail 0)
|
||||
(define flow-basic-fails (list))
|
||||
|
||||
(define
|
||||
flow-basic-test
|
||||
(fn
|
||||
(name actual expected)
|
||||
(if
|
||||
(= actual expected)
|
||||
(set! flow-basic-pass (+ flow-basic-pass 1))
|
||||
(begin
|
||||
(set! flow-basic-fail (+ flow-basic-fail 1))
|
||||
(append! flow-basic-fails {:name name :expected expected :actual actual})))))
|
||||
|
||||
;; Run a Scheme flow-program string and return its final value.
|
||||
(define flow-b (fn (src) (flow-run src)))
|
||||
|
||||
;; Scheme strings are boxed as {:scm-string "..."}; unwrap to a host string.
|
||||
(define flow-bs (fn (src) (get (flow-run src) :scm-string)))
|
||||
|
||||
;; ── single node ─────────────────────────────────────────────────
|
||||
(flow-basic-test
|
||||
"node: identity passes input through"
|
||||
(flow-b "(flow/start flow-id 7)")
|
||||
7)
|
||||
(flow-basic-test
|
||||
"node: const ignores input"
|
||||
(flow-b "(flow/start (flow-const 99) 1)")
|
||||
99)
|
||||
(flow-basic-test
|
||||
"node: bare lambda is a node"
|
||||
(flow-b "(flow/start (lambda (x) (* x x)) 6)")
|
||||
36)
|
||||
|
||||
;; ── linear sequence ─────────────────────────────────────────────
|
||||
(flow-basic-test
|
||||
"sequence: empty is identity"
|
||||
(flow-b "(flow/start (sequence) 42)")
|
||||
42)
|
||||
(flow-basic-test
|
||||
"sequence: single child"
|
||||
(flow-b "(flow/start (sequence (lambda (x) (+ x 1))) 41)")
|
||||
42)
|
||||
(flow-basic-test
|
||||
"sequence: two children thread"
|
||||
(flow-b
|
||||
"(flow/start (sequence (lambda (x) (+ x 1)) (lambda (x) (* x 10))) 4)")
|
||||
50)
|
||||
(flow-basic-test
|
||||
"sequence: three children thread"
|
||||
(flow-b
|
||||
"(flow/start (sequence (lambda (x) (+ x 1)) (lambda (x) (* x 2)) (lambda (x) (- x 3))) 5)")
|
||||
9)
|
||||
|
||||
;; ── data flow between nodes ─────────────────────────────────────
|
||||
(flow-basic-test
|
||||
"data flow: string accumulation"
|
||||
(flow-bs
|
||||
"(flow/start (sequence (lambda (s) (string-append s \"-a\")) (lambda (s) (string-append s \"-b\"))) \"x\")")
|
||||
"x-a-b")
|
||||
(flow-basic-test
|
||||
"data flow: list build"
|
||||
(flow-b
|
||||
"(flow/start (sequence (lambda (x) (cons x (list))) (lambda (xs) (cons 0 xs))) 7)")
|
||||
(list 0 7))
|
||||
|
||||
;; ── defflow ─────────────────────────────────────────────────────
|
||||
(flow-basic-test
|
||||
"defflow: names a flow"
|
||||
(flow-b
|
||||
"(defflow inc2 (sequence (lambda (x) (+ x 1)) (lambda (x) (+ x 1)))) (flow/start inc2 40)")
|
||||
42)
|
||||
(flow-basic-test
|
||||
"defflow: reusable"
|
||||
(flow-b
|
||||
"(defflow dbl (lambda (x) (* x 2))) (+ (flow/start dbl 3) (flow/start dbl 10))")
|
||||
26)
|
||||
|
||||
;; ── parallel (sequential semantics, join into list) ─────────────
|
||||
(flow-basic-test
|
||||
"parallel: fans input to all branches"
|
||||
(flow-b
|
||||
"(flow/start (parallel (lambda (x) (+ x 1)) (lambda (x) (* x 2)) (lambda (x) (- x 3))) 10)")
|
||||
(list 11 20 7))
|
||||
(flow-basic-test
|
||||
"parallel: empty joins to empty list"
|
||||
(flow-b "(flow/start (parallel) 5)")
|
||||
(list))
|
||||
(flow-basic-test
|
||||
"parallel: single branch"
|
||||
(flow-b "(flow/start (parallel (lambda (x) (* x x))) 9)")
|
||||
(list 81))
|
||||
|
||||
;; ── nested composition ──────────────────────────────────────────
|
||||
(flow-basic-test
|
||||
"nested: sequence of sequences"
|
||||
(flow-b
|
||||
"(flow/start (sequence (sequence (lambda (x) (+ x 1)) (lambda (x) (+ x 1))) (sequence (lambda (x) (* x 3)))) 0)")
|
||||
6)
|
||||
(flow-basic-test
|
||||
"nested: parallel inside sequence, join then reduce"
|
||||
(flow-b
|
||||
"(flow/start (sequence (parallel (lambda (x) (+ x 1)) (lambda (x) (* x 2))) (lambda (xs) (apply + xs))) 10)")
|
||||
31)
|
||||
(flow-basic-test
|
||||
"nested: sequence inside parallel branch"
|
||||
(flow-b
|
||||
"(flow/start (parallel (sequence (lambda (x) (+ x 1)) (lambda (x) (* x 2))) (lambda (x) x)) 5)")
|
||||
(list 12 5))
|
||||
|
||||
;; ── publish-shaped flow (the architecture sketch) ───────────────
|
||||
(flow-basic-test
|
||||
"publish: write -> (review | spell) -> join lengths"
|
||||
(flow-b
|
||||
"(defflow publish (sequence (lambda (draft) (string-append draft \"!\")) (parallel (lambda (c) (string-length c)) (lambda (c) (string-length (string-append c \"?\")))))) (flow/start publish \"hi\")")
|
||||
(list 3 4))
|
||||
|
||||
(define flow-basic-tests-run! (fn () {:total (+ flow-basic-pass flow-basic-fail) :passed flow-basic-pass :failed flow-basic-fail :fails flow-basic-fails}))
|
||||
@@ -16,7 +16,7 @@ federation extension via fed-sx for remote-node execution.
|
||||
|
||||
## Status (rolling)
|
||||
|
||||
`bash lib/flow/conformance.sh` → **0/0** (not yet started)
|
||||
`bash lib/flow/conformance.sh` → **18/18** (Phase 1 in progress)
|
||||
|
||||
## Ground rules
|
||||
|
||||
@@ -62,15 +62,16 @@ lib/flow/spec.sx lib/flow/runtime.sx lib/flow/store.sx
|
||||
|
||||
## Phase 1 — Declarative DAG + sequential execution
|
||||
|
||||
- [ ] `lib/flow/spec.sx` — `defflow` macro, `sequence` combinator
|
||||
- [ ] node = Scheme thunk; output threads to next node (data flow)
|
||||
- [ ] `parallel` combinator (sequential semantics for now — TRUE parallelism in Phase 3)
|
||||
- [ ] runtime executes a flow synchronously, returns final value
|
||||
- [ ] `lib/flow/api.sx` — `(flow/start name args)` entry point
|
||||
- [ ] `lib/flow/tests/basic.sx` — 15+ cases: linear sequence, nested sequences,
|
||||
data flow between nodes, parallel-with-join
|
||||
- [ ] `lib/flow/scoreboard.{json,md}`
|
||||
- [ ] `lib/flow/conformance.sh`
|
||||
- [x] `lib/flow/spec.sx` — `defflow` macro, `sequence` combinator
|
||||
- [x] node = Scheme procedure of one arg (upstream value threaded in); output
|
||||
threads to next node (data flow). A node ignoring its arg is a thunk.
|
||||
- [x] `parallel` combinator (sequential semantics for now — TRUE parallelism in Phase 3)
|
||||
- [x] runtime executes a flow synchronously, returns final value
|
||||
- [x] `lib/flow/api.sx` — `(flow/start flow input)` entry point
|
||||
- [x] `lib/flow/tests/basic.sx` — 18 cases: single nodes, linear/nested sequence,
|
||||
data flow between nodes, parallel-with-join, publish-shaped flow
|
||||
- [x] `lib/flow/scoreboard.{json,md}`
|
||||
- [x] `lib/flow/conformance.sh`
|
||||
|
||||
## Phase 2 — Control flow + error handling
|
||||
|
||||
@@ -101,8 +102,16 @@ lib/flow/spec.sx lib/flow/runtime.sx lib/flow/store.sx
|
||||
|
||||
## Progress log
|
||||
|
||||
(loop fills this in)
|
||||
- **Phase 1 (combinators + sequential runtime).** Flow built as a Scheme prelude
|
||||
loaded onto `scheme-standard-env`: a flow is a Scheme procedure `input -> output`,
|
||||
so the whole flow runs inside the interpreter (sets up Phase 3 call/cc suspend).
|
||||
Combinators `flow-node`/`flow-id`/`flow-const`/`sequence`/`parallel`/`defflow` in
|
||||
`spec.sx`; `flow/start` + SX helpers (`flow-make-env`/`flow-run`) in `api.sx`.
|
||||
18/18 in `tests/basic.sx`. Substrate constraints found: dotted rest params
|
||||
`(a . rest)` and named `let` are unsupported in `lib/scheme/eval.sx`, so
|
||||
combinators use `(lambda args ...)` variadics + top-level recursion. Scheme
|
||||
strings come back boxed as `{:scm-string "..."}` — unwrap with `(get s :scm-string)`.
|
||||
|
||||
## Blockers
|
||||
|
||||
(loop fills this in)
|
||||
(none)
|
||||
|
||||
Reference in New Issue
Block a user