diff --git a/lib/flow/conformance.sh b/lib/flow/conformance.sh index b3f5f143..53542736 100755 --- a/lib/flow/conformance.sh +++ b/lib/flow/conformance.sh @@ -27,6 +27,7 @@ SUITES=( "recovery flow-rec-tests-run! lib/flow/tests/recovery.sx" "distributed flow-dist-tests-run! lib/flow/tests/distributed.sx" "api flow-api-tests-run! lib/flow/tests/api.sx" + "combinators flow-cmb-tests-run! lib/flow/tests/combinators.sx" ) TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT diff --git a/lib/flow/scoreboard.json b/lib/flow/scoreboard.json index b8ccffb4..0b50b43b 100644 --- a/lib/flow/scoreboard.json +++ b/lib/flow/scoreboard.json @@ -1,6 +1,6 @@ { - "total": 105, - "passed": 105, + "total": 116, + "passed": 116, "failed": 0, "suites": { "basic": { "passed": 18, "total": 18 }, @@ -8,7 +8,8 @@ "suspend": { "passed": 17, "total": 17 }, "recovery": { "passed": 8, "total": 8 }, "distributed": { "passed": 19, "total": 19 }, - "api": { "passed": 12, "total": 12 } + "api": { "passed": 12, "total": 12 }, + "combinators": { "passed": 11, "total": 11 } }, - "phases": { "phase1": "done", "phase2": "done", "phase3": "done", "phase4": "done", "phase5": "in-progress" } + "phases": { "phase1": "done", "phase2": "done", "phase3": "done", "phase4": "done", "phase5": "done" } } diff --git a/lib/flow/scoreboard.md b/lib/flow/scoreboard.md index c1f0d720..a35e93f9 100644 --- a/lib/flow/scoreboard.md +++ b/lib/flow/scoreboard.md @@ -1,6 +1,6 @@ # flow-on-sx Scoreboard -**All tests pass: 105 / 105 across 6 suites. Phases 1-4 complete; Phase 5 in progress.** +**All tests pass: 116 / 116 across 7 suites. Phases 1-5 complete.** `bash lib/flow/conformance.sh` @@ -14,6 +14,7 @@ | recovery | 8 | Phase 3: crash recovery — store export/import, resumable scan, restart-at-every-step, replay-log survival | | distributed | 19 | Phase 4: `remote-node` (7); `remote-failover` (6); replication + handoff across instances (6) | | api | 12 | Phase 5: introspection — `flow/status`, `flow/result`, `flow/list`, `flow/pending` | +| combinators | 11 | Phase 5: `tap` side-effect, `recover` (fail-value), `map-flow` fan-over-list | ## Architecture @@ -43,6 +44,6 @@ capture the flow continuation directly. - [x] Phase 2 — Control flow + error handling (branch, error model, try-catch, retry, timeout) - [x] Phase 3 — Suspend/resume (suspend/resume/cancel + crash recovery via deterministic replay) - [x] Phase 4 — Distributed nodes via fed-sx (remote-node, failover, replication + handoff) -- [~] Phase 5 — Operational API + combinators (introspection done; tap/recover/map-flow next) +- [x] Phase 5 — Operational API + combinators (introspection, tap, recover, map-flow) - [ ] Phase 3 — Suspend / resume (the showcase) - [ ] Phase 4 — Distributed nodes via fed-sx diff --git a/lib/flow/spec.sx b/lib/flow/spec.sx index 8a610724..c15e9cb3 100644 --- a/lib/flow/spec.sx +++ b/lib/flow/spec.sx @@ -10,9 +10,12 @@ ;; flow-node / flow-id / flow-const / sequence / parallel / defflow ;; defflow both binds the flow and registers it by name (flow-register!, in ;; store.sx) so it can be re-resolved after a process restart. +;; map-flow (Phase 5): run a node over each item of a list input, join results. ;; ;; Phase 2 combinators (flow-control-src): ;; branch / fail / failed? / fail-reason / try-catch / retry / timeout / tick +;; tap (Phase 5): side-effecting pass-through (returns input unchanged). +;; recover (Phase 5): the fail-VALUE counterpart of try-catch. ;; ;; Phase 3 suspend core (flow-suspend-src): ;; The guest Scheme's call/cc is ESCAPE-ONLY (re-invoking a captured k after it @@ -32,11 +35,11 @@ (define flow-combinators-src - "(define (flow-node f) f)\n (define (flow-id input) input)\n (define (flow-const v) (lambda (input) v))\n (define (flow-seq-step ns v)\n (if (null? ns) v (flow-seq-step (cdr ns) ((car ns) v))))\n (define sequence (lambda ns (lambda (input) (flow-seq-step ns input))))\n (define parallel (lambda ns (lambda (input) (map (lambda (n) (n input)) ns))))\n (define-syntax defflow\n (syntax-rules ()\n ((defflow nm body)\n (begin (define nm body) (flow-register! (quote nm) nm)))))") + "(define (flow-node f) f)\n (define (flow-id input) input)\n (define (flow-const v) (lambda (input) v))\n (define (flow-seq-step ns v)\n (if (null? ns) v (flow-seq-step (cdr ns) ((car ns) v))))\n (define sequence (lambda ns (lambda (input) (flow-seq-step ns input))))\n (define parallel (lambda ns (lambda (input) (map (lambda (n) (n input)) ns))))\n (define (map-flow node) (lambda (items) (map node items)))\n (define-syntax defflow\n (syntax-rules ()\n ((defflow nm body)\n (begin (define nm body) (flow-register! (quote nm) nm)))))") (define flow-control-src - "(define (branch pred then else)\n (lambda (input) (if (pred input) (then input) (else input))))\n (define (fail reason) (list (quote flow-fail) reason))\n (define (failed? x) (and (pair? x) (eq? (car x) (quote flow-fail))))\n (define (fail-reason x) (car (cdr x)))\n (define (try-catch node handler)\n (lambda (input) (guard (e (#t (handler e))) (node input))))\n (define (flow-retry-step n node input)\n (guard (e (#t (if (<= n 1) (raise e) (flow-retry-step (- n 1) node input))))\n (node input)))\n (define (retry n node) (lambda (input) (flow-retry-step n node input)))\n (define flow-timeout-budget -1)\n (define (tick)\n (if (< flow-timeout-budget 0)\n 0\n (begin\n (set! flow-timeout-budget (- flow-timeout-budget 1))\n (if (< flow-timeout-budget 0)\n (raise (quote flow-timeout))\n flow-timeout-budget))))\n (define (timeout budget node)\n (lambda (input)\n (let ((saved flow-timeout-budget))\n (set! flow-timeout-budget budget)\n (guard (e (#t (begin (set! flow-timeout-budget saved) (raise e))))\n (let ((result (node input)))\n (set! flow-timeout-budget saved)\n result)))))") + "(define (branch pred then else)\n (lambda (input) (if (pred input) (then input) (else input))))\n (define (fail reason) (list (quote flow-fail) reason))\n (define (failed? x) (and (pair? x) (eq? (car x) (quote flow-fail))))\n (define (fail-reason x) (car (cdr x)))\n (define (recover node handler)\n (lambda (input)\n (let ((r (node input)))\n (if (failed? r) (handler (fail-reason r)) r))))\n (define (tap effect)\n (lambda (input) (begin (effect input) input)))\n (define (try-catch node handler)\n (lambda (input) (guard (e (#t (handler e))) (node input))))\n (define (flow-retry-step n node input)\n (guard (e (#t (if (<= n 1) (raise e) (flow-retry-step (- n 1) node input))))\n (node input)))\n (define (retry n node) (lambda (input) (flow-retry-step n node input)))\n (define flow-timeout-budget -1)\n (define (tick)\n (if (< flow-timeout-budget 0)\n 0\n (begin\n (set! flow-timeout-budget (- flow-timeout-budget 1))\n (if (< flow-timeout-budget 0)\n (raise (quote flow-timeout))\n flow-timeout-budget))))\n (define (timeout budget node)\n (lambda (input)\n (let ((saved flow-timeout-budget))\n (set! flow-timeout-budget budget)\n (guard (e (#t (begin (set! flow-timeout-budget saved) (raise e))))\n (let ((result (node input)))\n (set! flow-timeout-budget saved)\n result)))))") (define flow-suspend-src diff --git a/lib/flow/tests/combinators.sx b/lib/flow/tests/combinators.sx new file mode 100644 index 00000000..467e010c --- /dev/null +++ b/lib/flow/tests/combinators.sx @@ -0,0 +1,77 @@ +;; lib/flow/tests/combinators.sx — Phase 5: combinator library (tap, recover, map-flow). + +(define flow-cmb-pass 0) +(define flow-cmb-fail 0) +(define flow-cmb-fails (list)) + +(define + flow-cmb-test + (fn + (name actual expected) + (if + (= actual expected) + (set! flow-cmb-pass (+ flow-cmb-pass 1)) + (begin + (set! flow-cmb-fail (+ flow-cmb-fail 1)) + (append! flow-cmb-fails {:name name :expected expected :actual actual}))))) + +(define flow-m (fn (src) (flow-run src))) + +;; ── tap (side-effecting pass-through) ─────────────────────────── +(flow-cmb-test + "tap: returns input unchanged" + (flow-m "(flow/start (tap (lambda (x) (* x 999))) 7)") + 7) +(flow-cmb-test + "tap: runs the side effect" + (flow-m + "(define seen 0) (flow/start (tap (lambda (x) (set! seen x))) 42) seen") + 42) +(flow-cmb-test + "tap: value flows on while the effect observes it" + (flow-m + "(define log 0) (flow/start (sequence (lambda (x) (+ x 1)) (tap (lambda (x) (set! log x))) (lambda (x) (* x 2))) 10) (list log (flow/result 1))") + (list 11 22)) + +;; ── recover (fail-value counterpart of try-catch) ─────────────── +(flow-cmb-test + "recover: passes a non-fail value through" + (flow-m "(flow/start (recover (lambda (x) (* x 2)) (lambda (r) -1)) 5)") + 10) +(flow-cmb-test + "recover: handles a fail value via the reason" + (flow-m + "(flow/start (recover (lambda (x) (fail (quote too-small))) (lambda (r) (list (quote recovered) r))) 1)") + (list "recovered" "too-small")) +(flow-cmb-test + "recover: handler can supply a default value" + (flow-m + "(flow/start (sequence (recover (lambda (x) (if (> x 0) x (fail (quote neg))) ) (flow-const 0)) (lambda (x) (* x 10))) -3)") + 0) +(flow-cmb-test + "recover: does not catch raised exceptions (those are try-catch's job)" + (flow-m + "(flow/start (try-catch (recover (lambda (x) (raise (quote boom))) (flow-const 0)) (lambda (e) e)) 1)") + "boom") + +;; ── map-flow (run a node over a list, join) ───────────────────── +(flow-cmb-test + "map-flow: applies the node to each item" + (flow-m "(flow/start (map-flow (lambda (x) (* x x))) (list 1 2 3 4))") + (list 1 4 9 16)) +(flow-cmb-test + "map-flow: empty list joins to empty" + (flow-m "(flow/start (map-flow (lambda (x) (+ x 1))) (list))") + (list)) +(flow-cmb-test + "map-flow: each item runs an independent sub-flow" + (flow-m + "(flow/start (map-flow (sequence (lambda (x) (+ x 1)) (lambda (x) (* x 2)))) (list 0 4 9))") + (list 2 10 20)) +(flow-cmb-test + "map-flow: composes — fan over a list then reduce the join" + (flow-m + "(flow/start (sequence (map-flow (lambda (x) (* x 10))) (lambda (xs) (apply + xs))) (list 1 2 3))") + 60) + +(define flow-cmb-tests-run! (fn () {:total (+ flow-cmb-pass flow-cmb-fail) :passed flow-cmb-pass :failed flow-cmb-fail :fails flow-cmb-fails})) diff --git a/plans/flow-on-sx.md b/plans/flow-on-sx.md index 4c9640b3..0deaaac6 100644 --- a/plans/flow-on-sx.md +++ b/plans/flow-on-sx.md @@ -16,7 +16,7 @@ federation extension via fed-sx for remote-node execution. ## Status (rolling) -`bash lib/flow/conformance.sh` → **105/105** (Phases 1-4 complete; Phase 5 in progress) +`bash lib/flow/conformance.sh` → **116/116** (Phases 1-5 complete) ## Ground rules @@ -141,11 +141,11 @@ something operators and authors actually use. Accumulation, not a rewrite. - [x] introspection API — `flow/status id`, `flow/result id`, `flow/list`, `flow/pending` (operator view of what each suspended flow awaits). 12 tests in `tests/api.sx`. -- [ ] `tap` — side-effecting pass-through node (logging/metrics) that returns input -- [ ] `recover` — complement to try-catch for the fail-VALUE channel: run node; if it +- [x] `tap` — side-effecting pass-through node (logging/metrics) that returns input +- [x] `recover` — complement to try-catch for the fail-VALUE channel: run node; if it yields `(fail ...)`, run a recovery node on the reason -- [ ] `map-flow` — run a flow per item of a list, join results (sequential) -- [ ] `lib/flow/tests/api.sx` — introspection + new combinators +- [x] `map-flow` — run a flow per item of a list, join results (sequential) +- [x] `lib/flow/tests/api.sx` (12) + `lib/flow/tests/combinators.sx` (11) ## Progress log