diff --git a/lib/flow/README.md b/lib/flow/README.md new file mode 100644 index 00000000..9c2dee4c --- /dev/null +++ b/lib/flow/README.md @@ -0,0 +1,141 @@ +# flow — durable DAG workflows on Scheme + +`flow` is a workflow engine for rose-ash: content pipelines (write → review → +publish → federate), scheduled jobs, and multi-step user flows (signup, confirm, +onboard) that **survive process restarts**. It is a thin Scheme prelude over the +Scheme-on-SX guest (`lib/scheme/`); a flow runs *inside* the interpreter. + +Run the suite: `bash lib/flow/conformance.sh` → **151/151 across 10 suites**. + +## Model + +A **flow** is just a Scheme procedure of one argument — the upstream value: + +``` +node : input -> output +``` + +Combinators build composite nodes out of child nodes. A node that ignores its +argument is effectively a thunk. There is no separate "graph" object: composition +*is* function composition, so flows are values you can name, pass, and nest. + +```scheme +(defflow publish + (sequence + (lambda (draft) (string-append draft "!")) + (branch (lambda (post) (>= (string-length post) 3)) + (remote-node 'fed 'publish) + (flow-const 'rejected)))) + +(flow/start publish "hello") ; => federated, or a (flow-suspended id tag) state +``` + +## Building blocks (`spec.sx`) + +| Combinator | Meaning | +|---|---| +| `(flow-node f)` / `(flow-id x)` / `(flow-const v)` | leaf nodes | +| `(sequence n ...)` | thread input left-to-right | +| `(parallel n ...)` | fan input to every child, join results into a list (sequential eval) | +| `(map-flow node)` | run `node` over each item of a list input, join results | +| `(flow-while pred body max)` / `(flow-until ...)` | bounded iteration (cap `max` steps) | +| `(defflow name body)` | bind + register a named flow (so it survives restart) | + +## Control flow + errors (`spec.sx`) + +| Combinator | Meaning | +|---|---| +| `(branch pred then else)` | `pred` on input selects `then`/`else` (`cond` is a Scheme special form) | +| `(retry n node)` | re-run on a *raised exception*, up to `n` attempts | +| `(timeout budget node)` | cooperative **step budget**: nodes call `(tick)`; the `(budget+1)`-th tick raises `flow-timeout` | +| `(try-catch node handler)` | catch a raised exception → `(handler error)` | +| `(fail reason)` / `(failed? x)` / `(fail-reason x)` | explicit failure *values* (flow downstream as data) | +| `(recover node handler)` | the fail-VALUE counterpart of try-catch | +| `(attempt n ...)` | railway sequence: stop at the first node returning a `(fail ...)` | +| `(tap effect)` | run a side effect, return input unchanged | + +**Two error channels, on purpose.** Raised exceptions are for *bugs/transients* +(caught by `retry`/`try-catch`). `(fail reason)` values are for *expected business +outcomes* (validation rejected, declined) and compose via `attempt`/`recover`. + +## Suspend / resume — the durable core (`spec.sx`, `store.sx`) + +The guest Scheme's `call/cc` is **escape-only** — re-invoking a captured +continuation after it returns *hangs* the runtime. So flow does **not** serialize +continuations. Instead it uses **deterministic replay**: + +- `(suspend tag)` — if `tag` is already in the replay log, return its logged value; + otherwise escape to the driver as `(flow-suspended tag)`. +- `resume` appends `(tag value)` to the log and **re-runs the flow from the start**. + Already-resolved suspends replay their values; the first unresolved one escapes + again (or the flow completes). + +The entire persisted state is the replay log — plain data. No live continuation is +ever stored, so flows survive process restarts and even moves between instances. + +> **Author contract:** suspend `tag`s must be unique and deterministic across +> replays, and **all** non-determinism / side effects must go through suspend +> points (so their results are logged) — otherwise they re-run on every replay. + +### Lifecycle (`store.sx`) + +```scheme +(flow/start flow input) ; raw result if it completes, else (flow-suspended id tag) +(flow/resume id value) ; inject value at the waiting tag, continue +(flow/cancel id) ; terminate; a later resume is rejected +``` + +### Introspection & hygiene + +```scheme +(flow/status id) ; done | suspended | cancelled | unknown +(flow/result id) ; result if done, else (flow-error reason) +(flow/list) ; ((id status) ...) +(flow/pending) ; ((id waiting-tag) ...) — what each suspended flow awaits +(flow/gc) ; drop terminal records, keep live ones; returns count removed +(flow/forget id) ; drop one terminal record (refuses live flows) +``` + +### Crash recovery + +```scheme +(flow-store-export) ; the store as plain data (live procs nulled) +(flow-store-import! d) ; restore the store from exported data +(flow-resumable-ids) ; ids of suspended flows to wake on restart +``` + +On restart the flow definitions are reloaded (`defflow` re-registers names) and the +exported store reimported; `resume` re-resolves each flow's procedure **by name**. + +## Distribution via fed-sx (`remote.sx`) + +```scheme +(flow-peer-register! addr table) ; mock a peer's exposed functions (fed-sx boundary) +(remote-node addr fn) ; run a node on a peer +(remote-failover addrs fn local) ; try peers in order, fall through to a local node +(flow-replicate-to addr) ; copy this store to a peer's replica slot +(flow-restore-from addr) ; import a peer's replica (handoff) +``` + +**Handoff** is crash recovery across instances: replicate → local instance dies → +peer restores the (plain-data) store and resumes. The replay log carries over, so +all resolved suspends survive the move. + +## Files + +| File | Contents | +|---|---| +| `spec.sx` | combinators (flow-combinators-src / flow-control-src / flow-suspend-src) | +| `store.sx` | durable store, lifecycle, crash recovery, introspection, hygiene | +| `remote.sx` | fed-sx transport (mock peer registry), failover, replication | +| `api.sx` | `flow-make-env` / `flow-run` SX helpers (one cached env, per-test reset) | +| `tests/*.sx` | 10 suites, 151 cases | +| `conformance.sh` | loads substrate + flow layer, runs every suite | + +## Notes on the substrate + +The guest Scheme (`lib/scheme/`, imported read-only) lacks dotted-rest params +`(a . rest)` and named `let`; combinators use `(lambda args ...)` variadics + top- +level recursion. `cons` is list-only (no dotted pairs), so log/assoc entries are +2-element lists. Strings box as `{:scm-string "..."}`. Timeout is a step budget +because there is no wall clock; `parallel` is sequential for the same reason. diff --git a/lib/flow/api.sx b/lib/flow/api.sx new file mode 100644 index 00000000..b6feca69 --- /dev/null +++ b/lib/flow/api.sx @@ -0,0 +1,65 @@ +;; lib/flow/api.sx — flow runtime entry points. +;; +;; Builds a Scheme env preloaded with the flow combinators (lib/flow/spec.sx), +;; the durable store + lifecycle (lib/flow/store.sx), the fed-sx remote layer +;; (lib/flow/remote.sx), and the host integration ABI (lib/flow/host.sx), and +;; provides SX helpers to run flow programs. +;; +;; Scheme-level API (available inside flow programs): +;; (flow/start flow input) — run a flow; raw result if it completes, else +;; (flow-suspended id tag). Defined in store.sx. +;; (flow/resume id value) — resume a suspended flow (store.sx) +;; (flow/cancel id) — cancel a flow (store.sx) +;; (suspend tag) — suspension point (spec.sx) +;; (request kind payload) — host request envelope over suspend (host.sx) +;; (remote-node addr fn) — node executed on a federation peer (remote.sx) +;; +;; SX-level helpers (for hosts and tests): +;; (flow-make-env) — fresh standard env + combinators + store + remote + host +;; (flow-run src) — eval a Scheme program string in a reset shared env +;; (flow-run-in env src) — eval a Scheme program string in a given env +;; +;; flow-run reuses ONE env (building the full standard env is expensive) and +;; resets the mutable flow globals before each program, so tests stay isolated +;; without paying for a fresh standard env each time. flow-registry persists (it +;; models reloaded flow definitions surviving a restart). + +(define + flow-make-env + (fn + () + (let + ((env (scheme-standard-env))) + (flow-load-combinators! env) + (flow-load-store! env) + (flow-load-remote! env) + (flow-load-host! env) + env))) + +(define + flow-run-in + (fn (env src) (scheme-eval-program (scheme-parse-all src) env))) + +(define + flow-reset-src + "(set! flow-store (list)) (set! flow-next-id 0) (set! flow-replay-log (list)) (set! flow-suspend-k #f) (set! flow-timeout-budget -1) (set! flow-peers (list)) (set! flow-replicas (list))") + +(define flow-env-cache false) + +(define + flow-shared-env + (fn + () + (begin + (if flow-env-cache nil (set! flow-env-cache (flow-make-env))) + flow-env-cache))) + +(define + flow-run + (fn + (src) + (let + ((env (flow-shared-env))) + (begin + (scheme-eval-program (scheme-parse-all flow-reset-src) env) + (scheme-eval-program (scheme-parse-all src) env))))) diff --git a/lib/flow/conformance.sh b/lib/flow/conformance.sh new file mode 100755 index 00000000..31a1d860 --- /dev/null +++ b/lib/flow/conformance.sh @@ -0,0 +1,103 @@ +#!/usr/bin/env bash +# flow-on-sx conformance runner — runs all flow test suites in one sx_server process. +# +# Usage: +# bash lib/flow/conformance.sh # run all suites +# bash lib/flow/conformance.sh -v # verbose (list each suite) + +set -uo pipefail +cd "$(git rev-parse --show-toplevel)" + +SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}" +if [ ! -x "$SX_SERVER" ]; then + SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe" +fi +if [ ! -x "$SX_SERVER" ]; then + echo "ERROR: sx_server.exe not found." >&2 + exit 1 +fi + +VERBOSE="${1:-}" + +# Suites: NAME RUNNER-FN PATH +SUITES=( + "basic flow-basic-tests-run! lib/flow/tests/basic.sx" + "control flow-ctl-tests-run! lib/flow/tests/control.sx" + "suspend flow-sus-tests-run! lib/flow/tests/suspend.sx" + "recovery flow-rec-tests-run! lib/flow/tests/recovery.sx" + "distributed flow-dist-tests-run! lib/flow/tests/distributed.sx" + "api flow-api-tests-run! lib/flow/tests/api.sx" + "combinators flow-cmb-tests-run! lib/flow/tests/combinators.sx" + "railway flow-rail-tests-run! lib/flow/tests/railway.sx" + "integration flow-int-tests-run! lib/flow/tests/integration.sx" + "hygiene flow-hyg-tests-run! lib/flow/tests/hygiene.sx" + "host flow-hst-tests-run! lib/flow/tests/host.sx" +) + +TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT +EPOCH=1 + +emit_load () { echo "(epoch $EPOCH)"; echo "(load \"$1\")"; EPOCH=$((EPOCH+1)); } +emit_eval () { echo "(epoch $EPOCH)"; echo "(eval \"$1\")"; EPOCH=$((EPOCH+1)); } + +{ + emit_load "lib/guest/lex.sx" + emit_load "lib/guest/reflective/env.sx" + emit_load "lib/guest/reflective/quoting.sx" + emit_load "lib/scheme/parser.sx" + emit_load "lib/scheme/eval.sx" + emit_load "lib/scheme/runtime.sx" + emit_load "lib/flow/spec.sx" + emit_load "lib/flow/store.sx" + emit_load "lib/flow/remote.sx" + emit_load "lib/flow/host.sx" + emit_load "lib/flow/api.sx" + for SUITE in "${SUITES[@]}"; do + read -r _NAME _RUNNER FILE <<< "$SUITE" + emit_load "$FILE" + emit_eval "($_RUNNER)" + done +} > "$TMPFILE" + +OUTPUT=$(timeout 540 "$SX_SERVER" < "$TMPFILE" 2>&1 || true) + +TOTAL_PASS=0 +TOTAL_FAIL=0 +FAILED_SUITES=() + +LAST_DICT_LINES=$(echo "$OUTPUT" | grep -E '^\{:' || true) + +I=0 +while read -r LINE; do + [ -z "$LINE" ] && continue + P=$(echo "$LINE" | grep -oE ':passed [0-9]+' | awk '{print $2}') + F=$(echo "$LINE" | grep -oE ':failed [0-9]+' | awk '{print $2}') + [ -z "$P" ] && P=0 + [ -z "$F" ] && F=0 + SUITE_INFO="${SUITES[$I]}" + SUITE_NAME=$(echo "$SUITE_INFO" | awk '{print $1}') + TOTAL_PASS=$((TOTAL_PASS + P)) + TOTAL_FAIL=$((TOTAL_FAIL + F)) + if [ "$F" -gt 0 ]; then + FAILED_SUITES+=("$SUITE_NAME: $P/$((P+F))") + printf 'X %-12s %d/%d\n' "$SUITE_NAME" "$P" "$((P+F))" + echo "$LINE" | grep -oE ':name "[^"]*"' | sed 's/:name / fail: /' + elif [ "$VERBOSE" = "-v" ]; then + printf 'ok %-12s %d passed\n' "$SUITE_NAME" "$P" + fi + I=$((I+1)) +done <<< "$LAST_DICT_LINES" + +TOTAL=$((TOTAL_PASS + TOTAL_FAIL)) +if [ "$TOTAL" -eq 0 ]; then + echo "ERROR: no suite results parsed. Raw output:" >&2 + echo "$OUTPUT" >&2 + exit 1 +fi +if [ $TOTAL_FAIL -eq 0 ]; then + echo "ok $TOTAL_PASS/$TOTAL flow-on-sx tests passed (${#SUITES[@]} suites)" +else + echo "FAIL $TOTAL_PASS/$TOTAL passed, $TOTAL_FAIL failed:" + for S in "${FAILED_SUITES[@]}"; do echo " $S"; done + exit 1 +fi diff --git a/lib/flow/host.sx b/lib/flow/host.sx new file mode 100644 index 00000000..da92ac7a --- /dev/null +++ b/lib/flow/host.sx @@ -0,0 +1,42 @@ +;; lib/flow/host.sx — the host integration ABI (Phase 8). +;; +;; `suspend` is flow's seam to the outside world, but a bare (suspend tag) is just a +;; signal — every author would invent their own tag shape. This layer defines a +;; stable request/response contract so a host (e.g. an art-dag driver, or a human +;; review UI) can hook in WITHOUT reverse-engineering ad-hoc tags. +;; +;; A flow asks the host to do something and waits for the answer: +;; (request kind payload) — suspend with a typed envelope (flow-request kind +;; payload); evaluates to the host's resume value. +;; (await-human prompt) — request kind=human (a decision point) +;; (await-render recipe) — request kind=render (e.g. an art-dag job) +;; (await-effect kind p) — request of an arbitrary kind +;; +;; The host drives flows by polling its work queue and resuming: +;; (flow-host-requests) — ((id kind payload) ...) for every SUSPENDED flow whose +;; waiting tag is a host request. The host dispatches by kind (render -> submit a +;; Celery job; human -> show UI), then calls (flow/resume id answer). +;; (request? tag) / (request-kind tag) / (request-payload tag) — parse one tag. +;; +;; Reference driver — the host only supplies `dispatch`, a (kind payload) -> answer: +;; (flow-drive-host dispatch) — one tick: service every CURRENTLY pending +;; request (snapshot), resuming each with (dispatch kind payload); returns the +;; count serviced. Resumes may create new requests — serviced on the next tick. +;; (flow-run-host dispatch maxticks) — tick until quiescent (no pending requests) +;; or maxticks reached; returns total requests serviced. Bounded for determinism. +;; +;; Contract: the host owns IO and persistence. flow stays deterministic — a flow +;; never performs IO itself, it only `request`s; the host performs the effect and +;; feeds the result back via resume (which the replay log records, so the effect is +;; not re-run on recovery). Persist with flow-store-export after each transition and +;; flow-store-import! on boot. + +(define + flow-host-src + "(define (request kind payload) (suspend (list (quote flow-request) kind payload)))\n (define (request? tag) (and (pair? tag) (eq? (car tag) (quote flow-request))))\n (define (request-kind tag) (car (cdr tag)))\n (define (request-payload tag) (car (cdr (cdr tag))))\n (define (await-human prompt) (request (quote human) prompt))\n (define (await-render recipe) (request (quote render) recipe))\n (define (await-effect kind payload) (request kind payload))\n (define (flow-host-req-step pend)\n (if (null? pend)\n (list)\n (let ((id (car (car pend))) (tag (car (cdr (car pend)))))\n (if (request? tag)\n (cons (list id (request-kind tag) (request-payload tag))\n (flow-host-req-step (cdr pend)))\n (flow-host-req-step (cdr pend))))))\n (define (flow-host-requests) (flow-host-req-step (flow/pending)))\n (define (flow-drive-host-step reqs dispatch)\n (if (null? reqs)\n 0\n (begin\n (flow/resume (car (car reqs)) (dispatch (car (cdr (car reqs))) (car (cdr (cdr (car reqs))))))\n (+ 1 (flow-drive-host-step (cdr reqs) dispatch)))))\n (define (flow-drive-host dispatch) (flow-drive-host-step (flow-host-requests) dispatch))\n (define (flow-run-host dispatch maxticks)\n (if (<= maxticks 0)\n 0\n (let ((n (flow-drive-host dispatch)))\n (if (= n 0) 0 (+ n (flow-run-host dispatch (- maxticks 1)))))))") + +(define + flow-load-host! + (fn + (env) + (begin (scheme-eval-program (scheme-parse-all flow-host-src) env) env))) diff --git a/lib/flow/remote.sx b/lib/flow/remote.sx new file mode 100644 index 00000000..2ddc6a1a --- /dev/null +++ b/lib/flow/remote.sx @@ -0,0 +1,34 @@ +;; lib/flow/remote.sx — distributed nodes via fed-sx (Phase 4). +;; +;; A node can execute on a federation peer. The transport is the fed-sx boundary; +;; it is MOCKED in tests by a peer registry mapping addr -> function table. In +;; production flow-transport would issue a fed-sx call; here it dispatches locally. +;; +;; (flow-peer-register! addr table) — register a mock peer. table is a list of +;; (fn-name proc) entries — the functions that peer exposes. +;; (flow-transport addr fn input) — invoke fn on the peer with input. Raises +;; (flow-remote-unreachable) if the addr is unknown, (flow-remote-no-fn) if the +;; peer does not expose fn. +;; (remote-node addr fn) — a node that runs fn on the peer at addr. +;; (remote-failover addrs fn local) — try fn on each peer in addrs in order; on a +;; raised error move to the next peer; if every peer fails, run the `local` +;; node as a fallback. +;; +;; Persistence across instances + handoff. Each instance runs the same flow +;; definitions, so the only thing that needs to cross the wire is the (plain-data) +;; store — exactly flow-store-export from store.sx. Replication pushes that export +;; to a peer's replica slot; handoff = restore the replica on the peer and resume. +;; +;; (flow-replicate-to addr) — copy this instance's store to peer addr's replica +;; (flow-restore-from addr) — import the replica from peer addr (#t / #f) +;; (flow-replica-get addr) — the raw replicated store at addr (or #f) + +(define + flow-remote-src + "(define flow-peers (list))\n (define (flow-assoc key alist)\n (if (null? alist)\n #f\n (if (eq? (car (car alist)) key) (car (cdr (car alist))) (flow-assoc key (cdr alist)))))\n (define (flow-peer-register! addr table) (set! flow-peers (cons (list addr table) flow-peers)))\n (define (flow-transport addr fn input)\n (let ((table (flow-assoc addr flow-peers)))\n (if table\n (let ((proc (flow-assoc fn table)))\n (if proc (proc input) (raise (quote flow-remote-no-fn))))\n (raise (quote flow-remote-unreachable)))))\n (define (remote-node addr fn) (lambda (input) (flow-transport addr fn input)))\n (define (flow-failover-step addrs fn input local)\n (if (null? addrs)\n (local input)\n (guard (e (#t (flow-failover-step (cdr addrs) fn input local)))\n (flow-transport (car addrs) fn input))))\n (define (remote-failover addrs fn local)\n (lambda (input) (flow-failover-step addrs fn input local)))\n\n (define flow-replicas (list))\n (define (flow-replicas-remove addr reps)\n (if (null? reps)\n (list)\n (if (eq? (car (car reps)) addr)\n (flow-replicas-remove addr (cdr reps))\n (cons (car reps) (flow-replicas-remove addr (cdr reps))))))\n (define (flow-replicate-to addr)\n (set! flow-replicas (cons (list addr (flow-store-export)) (flow-replicas-remove addr flow-replicas))))\n (define (flow-replica-get addr) (flow-assoc addr flow-replicas))\n (define (flow-restore-from addr)\n (let ((data (flow-replica-get addr)))\n (if data (begin (flow-store-import! data) #t) #f)))") + +(define + flow-load-remote! + (fn + (env) + (begin (scheme-eval-program (scheme-parse-all flow-remote-src) env) env))) diff --git a/lib/flow/scoreboard.json b/lib/flow/scoreboard.json new file mode 100644 index 00000000..5229b185 --- /dev/null +++ b/lib/flow/scoreboard.json @@ -0,0 +1,19 @@ +{ + "total": 166, + "passed": 166, + "failed": 0, + "suites": { + "basic": { "passed": 18, "total": 18 }, + "control": { "passed": 31, "total": 31 }, + "suspend": { "passed": 17, "total": 17 }, + "recovery": { "passed": 8, "total": 8 }, + "distributed": { "passed": 19, "total": 19 }, + "api": { "passed": 12, "total": 12 }, + "combinators": { "passed": 17, "total": 17 }, + "railway": { "passed": 10, "total": 10 }, + "integration": { "passed": 10, "total": 10 }, + "hygiene": { "passed": 9, "total": 9 }, + "host": { "passed": 15, "total": 15 } + }, + "phases": { "phase1": "done", "phase2": "done", "phase3": "done", "phase4": "done", "phase5": "done", "phase6": "done", "phase7": "done", "phase8": "done" } +} diff --git a/lib/flow/scoreboard.md b/lib/flow/scoreboard.md new file mode 100644 index 00000000..70afaeee --- /dev/null +++ b/lib/flow/scoreboard.md @@ -0,0 +1,53 @@ +# flow-on-sx Scoreboard + +**All tests pass: 166 / 166 across 11 suites. Phases 1-8 complete.** + +`bash lib/flow/conformance.sh` + +## Per-suite breakdown + +| Suite | Passing | Covers | +|-------|--------:|--------| +| basic | 18 | Phase 1: single nodes, linear sequence, data-flow threading, defflow, parallel fan/join, nested composition, publish-shaped flow | +| control | 31 | Phase 2: `branch` (6); error model `fail`/`failed?`/`fail-reason` (6); `try-catch` (6); `retry n` (6); `timeout` cooperative step budget (7) | +| suspend | 17 | Phase 3: suspend/resume/cancel via deterministic replay; multi-step, replay determinism, lifecycle guards, suspend-in-branch | +| recovery | 8 | Phase 3: crash recovery — store export/import, resumable scan, restart-at-every-step, replay-log survival | +| distributed | 19 | Phase 4: `remote-node` (7); `remote-failover` (6); replication + handoff across instances (6) | +| api | 12 | Phase 5: introspection — `flow/status`, `flow/result`, `flow/list`, `flow/pending` | +| combinators | 17 | Phase 5: `tap`, `recover` (fail-value), `map-flow` fan-over-list, `flow-while`/`flow-until` bounded iteration | +| railway | 10 | Phase 6: `attempt` — fail-value short-circuiting sequence + recover rejoin | +| integration | 10 | Phase 7: end-to-end order + onboarding flows composing every phase (suspend, branch, federation, crash recovery, handoff, introspection) | +| hygiene | 9 | Phase 5: `flow/gc` (prune terminal flows), `flow/forget` (drop one terminal record) | +| host | 15 | Phase 8: host ABI — `request`/`await-human`/`await-render`, `flow-host-requests` queue, `flow-run-host` reference driver; art-dag-shaped render→review→publish loop | + +## Architecture + +Flow combinators are a **Scheme prelude** (`lib/flow/spec.sx`) loaded onto +`scheme-standard-env`. A flow is a Scheme procedure `input -> output`. The whole +flow executes inside the Scheme interpreter, so Phase 3's `suspend` (call/cc) will +capture the flow continuation directly. + +- `lib/flow/spec.sx` — combinators: `flow-node`, `flow-id`, `flow-const`, + `sequence`, `parallel`, `defflow`; `flow-load-combinators!`. +- `lib/flow/api.sx` — `flow/start` (Scheme); `flow-make-env`, `flow-run`, + `flow-run-in` (SX helpers). +- `lib/flow/tests/basic.sx` — 18 cases. +- `lib/flow/conformance.sh` — loads substrate + flow layer, runs suites. + +## Semantics notes + +- **node** = 1-arg Scheme procedure; the upstream value is the argument. A node + ignoring its argument is effectively a thunk. +- **sequence** threads left-to-right; empty sequence = identity. +- **parallel** fans the same input to every branch and joins results into a list. + Evaluation is **sequential** for now; true concurrency arrives in Phase 3. + +## Phases + +- [x] Phase 1 — Declarative DAG + sequential execution (combinators + 18 tests, `flow/start`) +- [x] Phase 2 — Control flow + error handling (branch, error model, try-catch, retry, timeout) +- [x] Phase 3 — Suspend/resume (suspend/resume/cancel + crash recovery via deterministic replay) +- [x] Phase 4 — Distributed nodes via fed-sx (remote-node, failover, replication + handoff) +- [x] Phase 5 — Operational API + combinators (introspection, tap, recover, map-flow) +- [ ] Phase 3 — Suspend / resume (the showcase) +- [ ] Phase 4 — Distributed nodes via fed-sx diff --git a/lib/flow/spec.sx b/lib/flow/spec.sx new file mode 100644 index 00000000..2bfd5c56 --- /dev/null +++ b/lib/flow/spec.sx @@ -0,0 +1,61 @@ +;; lib/flow/spec.sx — flow combinators as a Scheme prelude. +;; +;; A flow is a Scheme procedure of one argument: the upstream value. +;; node : input -> output +;; A leaf node ignoring its argument is effectively a thunk. Combinators +;; build composite nodes out of child nodes. The whole flow runs INSIDE the +;; Scheme interpreter. +;; +;; Phase 1 combinators (flow-combinators-src): +;; flow-node / flow-id / flow-const / sequence / parallel / defflow +;; defflow both binds the flow and registers it by name (flow-register!, in +;; store.sx) so it can be re-resolved after a process restart. +;; map-flow (Phase 5): run a node over each item of a list input, join results. +;; flow-while / flow-until (Phase 5): bounded iteration — re-run body, threading +;; the value, while/until pred holds, up to `max` steps (deterministic bound; no +;; unbounded loops in pure SX). +;; +;; Phase 2 combinators (flow-control-src): +;; branch / fail / failed? / fail-reason / try-catch / retry / timeout / tick +;; tap (Phase 5): side-effecting pass-through (returns input unchanged). +;; recover (Phase 5): the fail-VALUE counterpart of try-catch. +;; attempt (Phase 6): railway sequence — thread nodes left-to-right but stop at +;; the first node that returns a (fail ...) value, returning that failure. +;; +;; Phase 3 suspend core (flow-suspend-src): +;; The guest Scheme's call/cc is ESCAPE-ONLY (re-invoking a captured k after it +;; returns hangs the runtime), so suspend/resume CANNOT re-enter a continuation. +;; Instead, durability uses DETERMINISTIC REPLAY: a flow re-runs from the start +;; on each resume; suspend points that have already been resolved replay their +;; logged value, and the first unresolved suspend escapes back to the driver. +;; The entire persisted state is the replay log (plain (tag value) data), which +;; survives process restart — no live continuation is ever serialized. +;; +;; (suspend tag) — if tag is in the replay log, return its value; else escape +;; to the driver as (flow-suspended tag). tags must be unique & deterministic +;; across replays. ALL effects/non-determinism must go through suspend so their +;; results are logged (otherwise they re-run on every replay). +;; (flow-drive flow input log) — run flow with the given replay log; returns +;; (flow-done result) or (flow-suspended tag). + +(define + flow-combinators-src + "(define (flow-node f) f)\n (define (flow-id input) input)\n (define (flow-const v) (lambda (input) v))\n (define (flow-seq-step ns v)\n (if (null? ns) v (flow-seq-step (cdr ns) ((car ns) v))))\n (define sequence (lambda ns (lambda (input) (flow-seq-step ns input))))\n (define parallel (lambda ns (lambda (input) (map (lambda (n) (n input)) ns))))\n (define (map-flow node) (lambda (items) (map node items)))\n (define (flow-while-step pred body input n)\n (if (<= n 0)\n input\n (if (pred input) (flow-while-step pred body (body input) (- n 1)) input)))\n (define (flow-while pred body max) (lambda (input) (flow-while-step pred body input max)))\n (define (flow-until-step pred body input n)\n (if (<= n 0)\n input\n (if (pred input) input (flow-until-step pred body (body input) (- n 1)))))\n (define (flow-until pred body max) (lambda (input) (flow-until-step pred body input max)))\n (define-syntax defflow\n (syntax-rules ()\n ((defflow nm body)\n (begin (define nm body) (flow-register! (quote nm) nm)))))") + +(define + flow-control-src + "(define (branch pred then else)\n (lambda (input) (if (pred input) (then input) (else input))))\n (define (fail reason) (list (quote flow-fail) reason))\n (define (failed? x) (and (pair? x) (eq? (car x) (quote flow-fail))))\n (define (fail-reason x) (car (cdr x)))\n (define (recover node handler)\n (lambda (input)\n (let ((r (node input)))\n (if (failed? r) (handler (fail-reason r)) r))))\n (define (tap effect)\n (lambda (input) (begin (effect input) input)))\n (define (flow-attempt-step ns v)\n (if (failed? v)\n v\n (if (null? ns) v (flow-attempt-step (cdr ns) ((car ns) v)))))\n (define attempt (lambda ns (lambda (input) (flow-attempt-step ns input))))\n (define (try-catch node handler)\n (lambda (input) (guard (e (#t (handler e))) (node input))))\n (define (flow-retry-step n node input)\n (guard (e (#t (if (<= n 1) (raise e) (flow-retry-step (- n 1) node input))))\n (node input)))\n (define (retry n node) (lambda (input) (flow-retry-step n node input)))\n (define flow-timeout-budget -1)\n (define (tick)\n (if (< flow-timeout-budget 0)\n 0\n (begin\n (set! flow-timeout-budget (- flow-timeout-budget 1))\n (if (< flow-timeout-budget 0)\n (raise (quote flow-timeout))\n flow-timeout-budget))))\n (define (timeout budget node)\n (lambda (input)\n (let ((saved flow-timeout-budget))\n (set! flow-timeout-budget budget)\n (guard (e (#t (begin (set! flow-timeout-budget saved) (raise e))))\n (let ((result (node input)))\n (set! flow-timeout-budget saved)\n result)))))") + +(define + flow-suspend-src + "(define flow-replay-log (list))\n (define flow-suspend-k #f)\n (define (flow-log-lookup tag log)\n (if (null? log)\n (list #f #f)\n (if (eq? (car (car log)) tag)\n (list #t (car (cdr (car log))))\n (flow-log-lookup tag (cdr log)))))\n (define (suspend tag)\n (let ((hit (flow-log-lookup tag flow-replay-log)))\n (if (car hit)\n (car (cdr hit))\n (flow-suspend-k (list (quote flow-suspended) tag)))))\n (define (flow-drive flow input log)\n (set! flow-replay-log log)\n (call/cc\n (lambda (k)\n (set! flow-suspend-k k)\n (list (quote flow-done) (flow input)))))") + +(define + flow-load-combinators! + (fn + (env) + (begin + (scheme-eval-program (scheme-parse-all flow-combinators-src) env) + (scheme-eval-program (scheme-parse-all flow-control-src) env) + (scheme-eval-program (scheme-parse-all flow-suspend-src) env) + env))) diff --git a/lib/flow/store.sx b/lib/flow/store.sx new file mode 100644 index 00000000..abd75360 --- /dev/null +++ b/lib/flow/store.sx @@ -0,0 +1,45 @@ +;; lib/flow/store.sx — durable flow store + lifecycle + crash recovery (Phase 3). +;; +;; The store maps flow-id -> record (one entry per id, newest wins). A record is: +;; (name proc input log status payload) +;; name — registered flow name (symbol) or #f for an anonymous flow +;; proc — the live flow procedure (#f after export; re-resolved by name) +;; input — the original start input (plain data) +;; log — replay log: list of (tag value) entries (plain data) +;; status — done | suspended | cancelled +;; payload — result (done) | waiting-tag (suspended) | #f (cancelled) +;; +;; A record is SERIALIZABLE once its proc is nulled (flow-store-export): name, +;; input, and log are plain data. On restart the flow definitions are reloaded +;; (defflow re-registers names), the store is reimported, and resume re-resolves +;; the proc by name — no live continuation is ever persisted. +;; +;; Lifecycle (all use deterministic replay via flow-drive — see spec.sx): +;; (flow/start flow input) — raw result if it completes (backward compatible), +;; else (flow-suspended id tag). +;; (flow/resume id value) — append (tag value) to the log and re-drive. +;; (flow/cancel id) — mark cancelled; a later resume is rejected. +;; Crash recovery: +;; (flow-store-export) — store as plain data (procs nulled) +;; (flow-store-import! d) — replace the store from exported data +;; (flow-resumable-ids) — ids of suspended (resumable) flows +;; Introspection (Phase 5): +;; (flow/status id) — done | suspended | cancelled | unknown +;; (flow/result id) — result if done, else (flow-error reason) +;; (flow/list) — list of (id status) for every flow +;; (flow/pending) — list of (id waiting-tag) for suspended flows +;; Store hygiene (Phase 5): +;; (flow/gc) — drop all terminal (done/cancelled) records, keeping +;; suspended (live) flows; returns the count removed +;; (flow/forget id) — drop one TERMINAL record (#t); refuses to forget a +;; still-suspended flow or an unknown id (#f) + +(define + flow-store-src + "(define flow-registry (list))\n (define (flow-register! name proc) (set! flow-registry (cons (list name proc) flow-registry)))\n (define (flow-lookup-in name reg)\n (if (null? reg)\n #f\n (if (eq? (car (car reg)) name) (car (cdr (car reg))) (flow-lookup-in name (cdr reg)))))\n (define (flow-lookup name) (flow-lookup-in name flow-registry))\n (define (flow-name-of proc reg)\n (if (null? reg)\n #f\n (if (eq? (car (cdr (car reg))) proc) (car (car reg)) (flow-name-of proc (cdr reg)))))\n\n (define flow-store (list))\n (define flow-next-id 0)\n (define (flow-store-remove id store)\n (if (null? store)\n (list)\n (if (= (car (car store)) id)\n (flow-store-remove id (cdr store))\n (cons (car store) (flow-store-remove id (cdr store))))))\n (define (flow-store-put! id rec) (set! flow-store (cons (list id rec) (flow-store-remove id flow-store))))\n (define (flow-store-find id store)\n (if (null? store)\n (list)\n (if (= (car (car store)) id)\n (car (cdr (car store)))\n (flow-store-find id (cdr store)))))\n (define (flow-store-get id) (flow-store-find id flow-store))\n\n (define (flow-mk-rec name proc input log status payload)\n (list name proc input log status payload))\n (define (flow-rec-name r) (car r))\n (define (flow-rec-proc r) (car (cdr r)))\n (define (flow-rec-input r) (car (cdr (cdr r))))\n (define (flow-rec-log r) (car (cdr (cdr (cdr r)))))\n (define (flow-rec-status r) (car (cdr (cdr (cdr (cdr r))))))\n (define (flow-rec-payload r) (car (cdr (cdr (cdr (cdr (cdr r)))))))\n (define (flow-rec-resolve rec)\n (let ((byname (flow-lookup (flow-rec-name rec))))\n (if byname byname (flow-rec-proc rec))))\n\n (define (flow-outcome id name proc input log outcome)\n (if (eq? (car outcome) (quote flow-done))\n (begin\n (flow-store-put! id (flow-mk-rec name proc input log (quote done) (car (cdr outcome))))\n (car (cdr outcome)))\n (begin\n (flow-store-put! id (flow-mk-rec name proc input log (quote suspended) (car (cdr outcome))))\n (list (quote flow-suspended) id (car (cdr outcome))))))\n (define (flow/start flow input)\n (set! flow-next-id (+ flow-next-id 1))\n (flow-outcome flow-next-id (flow-name-of flow flow-registry) flow input (list)\n (flow-drive flow input (list))))\n (define (flow/resume id value)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (if (eq? (flow-rec-status rec) (quote suspended))\n (let ((proc (flow-rec-resolve rec)))\n (let ((newlog (cons (list (flow-rec-payload rec) value) (flow-rec-log rec))))\n (flow-outcome id (flow-rec-name rec) proc (flow-rec-input rec) newlog\n (flow-drive proc (flow-rec-input rec) newlog))))\n (list (quote flow-error) (quote not-suspended))))))\n (define (flow/cancel id)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (begin\n (flow-store-put! id\n (flow-mk-rec (flow-rec-name rec) (flow-rec-proc rec) (flow-rec-input rec)\n (flow-rec-log rec) (quote cancelled) #f))\n (list (quote flow-cancelled) id)))))\n\n (define (flow-export-entry entry)\n (let ((rec (car (cdr entry))))\n (list (car entry)\n (flow-mk-rec (flow-rec-name rec) #f (flow-rec-input rec)\n (flow-rec-log rec) (flow-rec-status rec) (flow-rec-payload rec)))))\n (define (flow-export-map store)\n (if (null? store) (list) (cons (flow-export-entry (car store)) (flow-export-map (cdr store)))))\n (define (flow-store-export) (flow-export-map flow-store))\n (define (flow-max-id store m)\n (if (null? store) m (flow-max-id (cdr store) (if (> (car (car store)) m) (car (car store)) m))))\n (define (flow-store-import! data)\n (begin (set! flow-store data) (set! flow-next-id (flow-max-id data 0))))\n (define (flow-collect-resumable store)\n (if (null? store)\n (list)\n (if (eq? (flow-rec-status (car (cdr (car store)))) (quote suspended))\n (cons (car (car store)) (flow-collect-resumable (cdr store)))\n (flow-collect-resumable (cdr store)))))\n (define (flow-resumable-ids) (flow-collect-resumable flow-store))\n\n (define (flow/status id)\n (let ((rec (flow-store-get id)))\n (if (null? rec) (quote unknown) (flow-rec-status rec))))\n (define (flow/result id)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (if (eq? (flow-rec-status rec) (quote done))\n (flow-rec-payload rec)\n (list (quote flow-error) (quote not-done))))))\n (define (flow-list-step store)\n (if (null? store)\n (list)\n (cons (list (car (car store)) (flow-rec-status (car (cdr (car store)))))\n (flow-list-step (cdr store)))))\n (define (flow/list) (flow-list-step flow-store))\n (define (flow-pending-step store)\n (if (null? store)\n (list)\n (if (eq? (flow-rec-status (car (cdr (car store)))) (quote suspended))\n (cons (list (car (car store)) (flow-rec-payload (car (cdr (car store)))))\n (flow-pending-step (cdr store)))\n (flow-pending-step (cdr store)))))\n (define (flow/pending) (flow-pending-step flow-store))\n\n (define (flow-store-count store) (if (null? store) 0 (+ 1 (flow-store-count (cdr store)))))\n (define (flow-gc-keep store)\n (if (null? store)\n (list)\n (if (eq? (flow-rec-status (car (cdr (car store)))) (quote suspended))\n (cons (car store) (flow-gc-keep (cdr store)))\n (flow-gc-keep (cdr store)))))\n (define (flow/gc)\n (let ((before (flow-store-count flow-store)))\n (set! flow-store (flow-gc-keep flow-store))\n (- before (flow-store-count flow-store))))\n (define (flow/forget id)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n #f\n (if (eq? (flow-rec-status rec) (quote suspended))\n #f\n (begin (set! flow-store (flow-store-remove id flow-store)) #t)))))") + +(define + flow-load-store! + (fn + (env) + (begin (scheme-eval-program (scheme-parse-all flow-store-src) env) env))) diff --git a/lib/flow/tests/api.sx b/lib/flow/tests/api.sx new file mode 100644 index 00000000..6211b0f0 --- /dev/null +++ b/lib/flow/tests/api.sx @@ -0,0 +1,79 @@ +;; lib/flow/tests/api.sx — Phase 5: operational introspection API. + +(define flow-api-pass 0) +(define flow-api-fail 0) +(define flow-api-fails (list)) + +(define + flow-api-test + (fn + (name actual expected) + (if + (= actual expected) + (set! flow-api-pass (+ flow-api-pass 1)) + (begin + (set! flow-api-fail (+ flow-api-fail 1)) + (append! flow-api-fails {:name name :expected expected :actual actual}))))) + +(define flow-a (fn (src) (flow-run src))) + +;; ── flow/status ───────────────────────────────────────────────── +(flow-api-test "status: unknown id" (flow-a "(flow/status 999)") "unknown") +(flow-api-test + "status: suspended flow" + (flow-a + "(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/status id)") + "suspended") +(flow-api-test + "status: completed flow" + (flow-a + "(defflow w (sequence (lambda (x) (suspend (quote q))) (lambda (v) v))) (define id (car (cdr (flow/start w 0)))) (flow/resume id 5) (flow/status id)") + "done") +(flow-api-test + "status: cancelled flow" + (flow-a + "(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/cancel id) (flow/status id)") + "cancelled") + +;; ── flow/result ───────────────────────────────────────────────── +(flow-api-test + "result: returns the value of a completed flow" + (flow-a + "(defflow w (sequence (lambda (x) (suspend (quote q))) (lambda (v) (list (quote got) v)))) (define id (car (cdr (flow/start w 0)))) (flow/resume id 9) (flow/result id)") + (list "got" 9)) +(flow-api-test + "result: a still-suspended flow has no result" + (flow-a + "(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/result id)") + (list "flow-error" "not-done")) +(flow-api-test + "result: unknown id errors" + (flow-a "(flow/result 999)") + (list "flow-error" "no-such-flow")) + +;; ── flow/list ─────────────────────────────────────────────────── +(flow-api-test "list: empty store" (flow-a "(flow/list)") (list)) +(flow-api-test + "list: reports id + status for each flow (newest first)" + (flow-a + "(defflow w (lambda (x) (suspend (quote q)))) (flow/start w 0) (flow/start (lambda (x) (* x 2)) 5) (flow/list)") + (list (list 2 "done") (list 1 "suspended"))) + +;; ── flow/pending ──────────────────────────────────────────────── +(flow-api-test + "pending: lists suspended flows with their waiting tag" + (flow-a + "(defflow w (lambda (x) (suspend (quote review)))) (flow/start w 0) (flow/pending)") + (list (list 1 "review"))) +(flow-api-test + "pending: excludes completed and cancelled flows" + (flow-a + "(defflow w (lambda (x) (suspend (quote q)))) (defflow v (sequence (lambda (x) (suspend (quote r))) (lambda (y) y))) (define i1 (car (cdr (flow/start w 0)))) (define i2 (car (cdr (flow/start v 0)))) (define i3 (car (cdr (flow/start w 0)))) (flow/resume i2 1) (flow/cancel i3) (flow/pending)") + (list (list 1 "q"))) +(flow-api-test + "pending: operator can drain all pending flows" + (flow-a + "(defflow w (sequence (lambda (x) (suspend (quote q))) (lambda (v) (* v 10)))) (flow/start w 0) (flow/start w 0) (define ps (flow/pending)) (flow/resume (car (car ps)) 1) (flow/resume (car (car (cdr ps))) 2) (flow/list)") + (list (list 1 "done") (list 2 "done"))) + +(define flow-api-tests-run! (fn () {:total (+ flow-api-pass flow-api-fail) :passed flow-api-pass :failed flow-api-fail :fails flow-api-fails})) diff --git a/lib/flow/tests/basic.sx b/lib/flow/tests/basic.sx new file mode 100644 index 00000000..da9d8972 --- /dev/null +++ b/lib/flow/tests/basic.sx @@ -0,0 +1,121 @@ +;; lib/flow/tests/basic.sx — Phase 1: declarative DAG + sequential execution. + +(define flow-basic-pass 0) +(define flow-basic-fail 0) +(define flow-basic-fails (list)) + +(define + flow-basic-test + (fn + (name actual expected) + (if + (= actual expected) + (set! flow-basic-pass (+ flow-basic-pass 1)) + (begin + (set! flow-basic-fail (+ flow-basic-fail 1)) + (append! flow-basic-fails {:name name :expected expected :actual actual}))))) + +;; Run a Scheme flow-program string and return its final value. +(define flow-b (fn (src) (flow-run src))) + +;; Scheme strings are boxed as {:scm-string "..."}; unwrap to a host string. +(define flow-bs (fn (src) (get (flow-run src) :scm-string))) + +;; ── single node ───────────────────────────────────────────────── +(flow-basic-test + "node: identity passes input through" + (flow-b "(flow/start flow-id 7)") + 7) +(flow-basic-test + "node: const ignores input" + (flow-b "(flow/start (flow-const 99) 1)") + 99) +(flow-basic-test + "node: bare lambda is a node" + (flow-b "(flow/start (lambda (x) (* x x)) 6)") + 36) + +;; ── linear sequence ───────────────────────────────────────────── +(flow-basic-test + "sequence: empty is identity" + (flow-b "(flow/start (sequence) 42)") + 42) +(flow-basic-test + "sequence: single child" + (flow-b "(flow/start (sequence (lambda (x) (+ x 1))) 41)") + 42) +(flow-basic-test + "sequence: two children thread" + (flow-b + "(flow/start (sequence (lambda (x) (+ x 1)) (lambda (x) (* x 10))) 4)") + 50) +(flow-basic-test + "sequence: three children thread" + (flow-b + "(flow/start (sequence (lambda (x) (+ x 1)) (lambda (x) (* x 2)) (lambda (x) (- x 3))) 5)") + 9) + +;; ── data flow between nodes ───────────────────────────────────── +(flow-basic-test + "data flow: string accumulation" + (flow-bs + "(flow/start (sequence (lambda (s) (string-append s \"-a\")) (lambda (s) (string-append s \"-b\"))) \"x\")") + "x-a-b") +(flow-basic-test + "data flow: list build" + (flow-b + "(flow/start (sequence (lambda (x) (cons x (list))) (lambda (xs) (cons 0 xs))) 7)") + (list 0 7)) + +;; ── defflow ───────────────────────────────────────────────────── +(flow-basic-test + "defflow: names a flow" + (flow-b + "(defflow inc2 (sequence (lambda (x) (+ x 1)) (lambda (x) (+ x 1)))) (flow/start inc2 40)") + 42) +(flow-basic-test + "defflow: reusable" + (flow-b + "(defflow dbl (lambda (x) (* x 2))) (+ (flow/start dbl 3) (flow/start dbl 10))") + 26) + +;; ── parallel (sequential semantics, join into list) ───────────── +(flow-basic-test + "parallel: fans input to all branches" + (flow-b + "(flow/start (parallel (lambda (x) (+ x 1)) (lambda (x) (* x 2)) (lambda (x) (- x 3))) 10)") + (list 11 20 7)) +(flow-basic-test + "parallel: empty joins to empty list" + (flow-b "(flow/start (parallel) 5)") + (list)) +(flow-basic-test + "parallel: single branch" + (flow-b "(flow/start (parallel (lambda (x) (* x x))) 9)") + (list 81)) + +;; ── nested composition ────────────────────────────────────────── +(flow-basic-test + "nested: sequence of sequences" + (flow-b + "(flow/start (sequence (sequence (lambda (x) (+ x 1)) (lambda (x) (+ x 1))) (sequence (lambda (x) (* x 3)))) 0)") + 6) +(flow-basic-test + "nested: parallel inside sequence, join then reduce" + (flow-b + "(flow/start (sequence (parallel (lambda (x) (+ x 1)) (lambda (x) (* x 2))) (lambda (xs) (apply + xs))) 10)") + 31) +(flow-basic-test + "nested: sequence inside parallel branch" + (flow-b + "(flow/start (parallel (sequence (lambda (x) (+ x 1)) (lambda (x) (* x 2))) (lambda (x) x)) 5)") + (list 12 5)) + +;; ── publish-shaped flow (the architecture sketch) ─────────────── +(flow-basic-test + "publish: write -> (review | spell) -> join lengths" + (flow-b + "(defflow publish (sequence (lambda (draft) (string-append draft \"!\")) (parallel (lambda (c) (string-length c)) (lambda (c) (string-length (string-append c \"?\")))))) (flow/start publish \"hi\")") + (list 3 4)) + +(define flow-basic-tests-run! (fn () {:total (+ flow-basic-pass flow-basic-fail) :passed flow-basic-pass :failed flow-basic-fail :fails flow-basic-fails})) diff --git a/lib/flow/tests/combinators.sx b/lib/flow/tests/combinators.sx new file mode 100644 index 00000000..7931bfeb --- /dev/null +++ b/lib/flow/tests/combinators.sx @@ -0,0 +1,108 @@ +;; lib/flow/tests/combinators.sx — Phase 5: combinator library (tap, recover, map-flow, iteration). + +(define flow-cmb-pass 0) +(define flow-cmb-fail 0) +(define flow-cmb-fails (list)) + +(define + flow-cmb-test + (fn + (name actual expected) + (if + (= actual expected) + (set! flow-cmb-pass (+ flow-cmb-pass 1)) + (begin + (set! flow-cmb-fail (+ flow-cmb-fail 1)) + (append! flow-cmb-fails {:name name :expected expected :actual actual}))))) + +(define flow-m (fn (src) (flow-run src))) + +;; ── tap (side-effecting pass-through) ─────────────────────────── +(flow-cmb-test + "tap: returns input unchanged" + (flow-m "(flow/start (tap (lambda (x) (* x 999))) 7)") + 7) +(flow-cmb-test + "tap: runs the side effect" + (flow-m + "(define seen 0) (flow/start (tap (lambda (x) (set! seen x))) 42) seen") + 42) +(flow-cmb-test + "tap: value flows on while the effect observes it" + (flow-m + "(define log 0) (flow/start (sequence (lambda (x) (+ x 1)) (tap (lambda (x) (set! log x))) (lambda (x) (* x 2))) 10) (list log (flow/result 1))") + (list 11 22)) + +;; ── recover (fail-value counterpart of try-catch) ─────────────── +(flow-cmb-test + "recover: passes a non-fail value through" + (flow-m "(flow/start (recover (lambda (x) (* x 2)) (lambda (r) -1)) 5)") + 10) +(flow-cmb-test + "recover: handles a fail value via the reason" + (flow-m + "(flow/start (recover (lambda (x) (fail (quote too-small))) (lambda (r) (list (quote recovered) r))) 1)") + (list "recovered" "too-small")) +(flow-cmb-test + "recover: handler can supply a default value" + (flow-m + "(flow/start (sequence (recover (lambda (x) (if (> x 0) x (fail (quote neg))) ) (flow-const 0)) (lambda (x) (* x 10))) -3)") + 0) +(flow-cmb-test + "recover: does not catch raised exceptions (those are try-catch's job)" + (flow-m + "(flow/start (try-catch (recover (lambda (x) (raise (quote boom))) (flow-const 0)) (lambda (e) e)) 1)") + "boom") + +;; ── map-flow (run a node over a list, join) ───────────────────── +(flow-cmb-test + "map-flow: applies the node to each item" + (flow-m "(flow/start (map-flow (lambda (x) (* x x))) (list 1 2 3 4))") + (list 1 4 9 16)) +(flow-cmb-test + "map-flow: empty list joins to empty" + (flow-m "(flow/start (map-flow (lambda (x) (+ x 1))) (list))") + (list)) +(flow-cmb-test + "map-flow: each item runs an independent sub-flow" + (flow-m + "(flow/start (map-flow (sequence (lambda (x) (+ x 1)) (lambda (x) (* x 2)))) (list 0 4 9))") + (list 2 10 20)) +(flow-cmb-test + "map-flow: composes — fan over a list then reduce the join" + (flow-m + "(flow/start (sequence (map-flow (lambda (x) (* x 10))) (lambda (xs) (apply + xs))) (list 1 2 3))") + 60) + +;; ── flow-while / flow-until (bounded iteration) ───────────────── +(flow-cmb-test + "flow-while: iterates while the predicate holds" + (flow-m + "(flow/start (flow-while (lambda (x) (< x 10)) (lambda (x) (+ x 1)) 100) 0)") + 10) +(flow-cmb-test + "flow-while: a false predicate leaves input unchanged" + (flow-m + "(flow/start (flow-while (lambda (x) (< x 0)) (lambda (x) (+ x 1)) 100) 5)") + 5) +(flow-cmb-test + "flow-while: respects the max-iteration bound" + (flow-m "(flow/start (flow-while (lambda (x) #t) (lambda (x) (+ x 1)) 3) 0)") + 3) +(flow-cmb-test + "flow-while: doubles until past a threshold" + (flow-m + "(flow/start (flow-while (lambda (x) (< x 50)) (lambda (x) (* x 2)) 100) 3)") + 96) +(flow-cmb-test + "flow-until: iterates until the predicate becomes true" + (flow-m + "(flow/start (flow-until (lambda (x) (>= x 10)) (lambda (x) (+ x 3)) 100) 0)") + 12) +(flow-cmb-test + "flow-until: composes inside a sequence" + (flow-m + "(flow/start (sequence (flow-until (lambda (x) (> x 100)) (lambda (x) (* x 3)) 100) (lambda (x) (- x 100))) 5)") + 35) + +(define flow-cmb-tests-run! (fn () {:total (+ flow-cmb-pass flow-cmb-fail) :passed flow-cmb-pass :failed flow-cmb-fail :fails flow-cmb-fails})) diff --git a/lib/flow/tests/control.sx b/lib/flow/tests/control.sx new file mode 100644 index 00000000..e4fc086a --- /dev/null +++ b/lib/flow/tests/control.sx @@ -0,0 +1,179 @@ +;; lib/flow/tests/control.sx — Phase 2: control flow + error handling. + +(define flow-ctl-pass 0) +(define flow-ctl-fail 0) +(define flow-ctl-fails (list)) + +(define + flow-ctl-test + (fn + (name actual expected) + (if + (= actual expected) + (set! flow-ctl-pass (+ flow-ctl-pass 1)) + (begin + (set! flow-ctl-fail (+ flow-ctl-fail 1)) + (append! flow-ctl-fails {:name name :expected expected :actual actual}))))) + +(define flow-c (fn (src) (flow-run src))) +(define flow-cs (fn (src) (get (flow-run src) :scm-string))) + +;; ── branch ────────────────────────────────────────────────────── +(flow-ctl-test + "branch: true selects then" + (flow-c + "(flow/start (branch (lambda (x) (> x 0)) (lambda (x) (* x 100)) (lambda (x) (- 0 x))) 5)") + 500) +(flow-ctl-test + "branch: false selects else" + (flow-c + "(flow/start (branch (lambda (x) (> x 0)) (lambda (x) (* x 100)) (lambda (x) (- 0 x))) -3)") + 3) +(flow-ctl-test + "branch: predicate sees the threaded input" + (flow-c + "(flow/start (sequence (lambda (x) (+ x 1)) (branch (lambda (x) (> x 3)) (flow-const 100) (flow-const 0))) 3)") + 100) +(flow-ctl-test + "branch: branches are full nodes (sequence inside)" + (flow-c + "(flow/start (branch (lambda (x) (< x 10)) (sequence (lambda (x) (+ x 1)) (lambda (x) (* x 2))) (flow-const 0)) 4)") + 10) +(flow-ctl-test + "branch: nested branch (3-way sign)" + (flow-c + "(defflow sign (branch (lambda (x) (> x 0)) (flow-const 1) (branch (lambda (x) (< x 0)) (flow-const -1) (flow-const 0)))) (list (flow/start sign 7) (flow/start sign -7) (flow/start sign 0))") + (list 1 -1 0)) +(flow-ctl-test + "branch: publish-shaped approval gate" + (flow-cs + "(defflow publish (branch (lambda (post) (>= (string-length post) 3)) (lambda (post) (string-append post \" [published]\")) (lambda (post) (string-append post \" [rejected]\")))) (flow/start publish \"ok\")") + "ok [rejected]") + +;; ── error model — explicit (fail reason) values ───────────────── +(flow-ctl-test + "fail: failed? is true for a failure value" + (flow-c "(failed? (fail 404))") + true) +(flow-ctl-test + "fail: fail-reason extracts the reason" + (flow-c "(fail-reason (fail 404))") + 404) +(flow-ctl-test + "fail: failed? is false for a plain value" + (flow-c "(failed? 7)") + false) +(flow-ctl-test + "fail: failed? is false for an ordinary list" + (flow-c "(failed? (list 1 2 3))") + false) +(flow-ctl-test + "fail: a node may emit a failure as data" + (flow-c + "(defflow validate (lambda (s) (if (>= (string-length s) 3) s (fail (quote too-short))))) (failed? (flow/start validate \"hi\"))") + true) +(flow-ctl-test + "fail: failure flows downstream, branch recovers" + (flow-c + "(defflow guarded (sequence (lambda (s) (if (>= (string-length s) 3) (string-length s) (fail (quote too-short)))) (branch failed? (lambda (f) (list (quote recovered) (fail-reason f))) (lambda (n) (list (quote ok) n))))) (flow/start guarded \"hi\")") + (list "recovered" "too-short")) + +;; ── try-catch — reify raised exceptions ───────────────────────── +(flow-ctl-test + "try-catch: no exception returns node result" + (flow-c "(flow/start (try-catch (lambda (x) (* x 2)) (lambda (e) -1)) 5)") + 10) +(flow-ctl-test + "try-catch: handler runs on raise" + (flow-c + "(flow/start (try-catch (lambda (x) (raise (quote boom))) (flow-const 99)) 1)") + 99) +(flow-ctl-test + "try-catch: handler receives the reified error" + (flow-c "(flow/start (try-catch (lambda (x) (raise 42)) (lambda (e) e)) 0)") + 42) +(flow-ctl-test + "try-catch: catches exception from deep inside a sequence" + (flow-c + "(flow/start (try-catch (sequence (lambda (x) (+ x 1)) (lambda (x) (raise (quote deep)))) (flow-const -99)) 5)") + -99) +(flow-ctl-test + "try-catch: handler may convert to a failure value" + (flow-c + "(failed? (flow/start (try-catch (lambda (x) (raise (quote bad))) (lambda (e) (fail e))) 0))") + true) +(flow-ctl-test + "try-catch: composes — recover then continue" + (flow-c + "(flow/start (sequence (try-catch (lambda (x) (raise (quote x))) (flow-const 10)) (lambda (n) (* n 5))) 0)") + 50) + +;; ── retry — re-run on raised exceptions ───────────────────────── +(flow-ctl-test + "retry: succeeds after transient failures" + (flow-c + "(define ctr 0) (defflow flaky (lambda (x) (set! ctr (+ ctr 1)) (if (< ctr 3) (raise (quote nope)) (* x 10)))) (list (flow/start (retry 5 flaky) 7) ctr)") + (list 70 3)) +(flow-ctl-test + "retry: exhausted re-raises (caught by try-catch)" + (flow-c + "(flow/start (try-catch (retry 2 (lambda (x) (raise (quote always)))) (flow-const (quote gaveup))) 0)") + "gaveup") +(flow-ctl-test + "retry: n=1 means a single attempt" + (flow-c + "(define ctr 0) (flow/start (try-catch (retry 1 (lambda (x) (set! ctr (+ ctr 1)) (raise (quote bad)))) (lambda (e) ctr)) 0)") + 1) +(flow-ctl-test + "retry: success on first attempt does not re-run" + (flow-c + "(define ctr 0) (flow/start (sequence (retry 5 (lambda (x) (set! ctr (+ ctr 1)) (* x 2))) (lambda (n) ctr)) 21)") + 1) +(flow-ctl-test + "retry: does not retry explicit failure values" + (flow-c + "(define ctr 0) (failed? (flow/start (retry 5 (lambda (x) (set! ctr (+ ctr 1)) (fail (quote bad)))) 0))") + true) +(flow-ctl-test + "retry: failure-value path runs node exactly once" + (flow-c + "(define ctr 0) (flow/start (sequence (retry 5 (lambda (x) (set! ctr (+ ctr 1)) (fail (quote bad)))) (lambda (f) ctr)) 0)") + 1) + +;; ── timeout — cooperative step budget ─────────────────────────── +(flow-ctl-test + "timeout: work within budget completes" + (flow-c + "(define (cd n) (if (<= n 0) 99 (begin (tick) (cd (- n 1))))) (flow/start (try-catch (timeout 10 (lambda (x) (cd x))) (flow-const (quote timed-out))) 5)") + 99) +(flow-ctl-test + "timeout: work exceeding budget raises flow-timeout" + (flow-c + "(define (cd n) (if (<= n 0) 99 (begin (tick) (cd (- n 1))))) (flow/start (try-catch (timeout 10 (lambda (x) (cd x))) (flow-const (quote timed-out))) 20)") + "timed-out") +(flow-ctl-test + "timeout: exact budget boundary completes" + (flow-c + "(define (cd n) (if (<= n 0) 99 (begin (tick) (cd (- n 1))))) (flow/start (try-catch (timeout 5 (lambda (x) (cd x))) (flow-const (quote timed-out))) 5)") + 99) +(flow-ctl-test + "timeout: one tick over the budget raises" + (flow-c + "(define (cd n) (if (<= n 0) 99 (begin (tick) (cd (- n 1))))) (flow/start (try-catch (timeout 5 (lambda (x) (cd x))) (flow-const (quote timed-out))) 6)") + "timed-out") +(flow-ctl-test + "timeout: the raised error is identifiable" + (flow-c + "(define (cd n) (if (<= n 0) 99 (begin (tick) (cd (- n 1))))) (flow/start (try-catch (timeout 2 (lambda (x) (cd x))) (lambda (e) e)) 9)") + "flow-timeout") +(flow-ctl-test + "timeout: a node that never ticks is unbounded" + (flow-c "(flow/start (timeout 0 (lambda (x) (* x 2))) 5)") + 10) +(flow-ctl-test + "timeout: budget is restored across sequential timeouts" + (flow-c + "(define (cd n) (if (<= n 0) 1 (begin (tick) (cd (- n 1))))) (flow/start (sequence (timeout 4 (lambda (x) (cd x))) (timeout 4 (lambda (x) (cd 3))) (lambda (x) (begin (tick) (+ x 100)))) 3)") + 101) + +(define flow-ctl-tests-run! (fn () {:total (+ flow-ctl-pass flow-ctl-fail) :passed flow-ctl-pass :failed flow-ctl-fail :fails flow-ctl-fails})) diff --git a/lib/flow/tests/distributed.sx b/lib/flow/tests/distributed.sx new file mode 100644 index 00000000..cd6bbb49 --- /dev/null +++ b/lib/flow/tests/distributed.sx @@ -0,0 +1,120 @@ +;; lib/flow/tests/distributed.sx — Phase 4: distributed nodes via fed-sx (mocked). + +(define flow-dist-pass 0) +(define flow-dist-fail 0) +(define flow-dist-fails (list)) + +(define + flow-dist-test + (fn + (name actual expected) + (if + (= actual expected) + (set! flow-dist-pass (+ flow-dist-pass 1)) + (begin + (set! flow-dist-fail (+ flow-dist-fail 1)) + (append! flow-dist-fails {:name name :expected expected :actual actual}))))) + +(define flow-d (fn (src) (flow-run src))) + +;; ── remote-node ───────────────────────────────────────────────── +(flow-dist-test + "remote: a node executes on a peer" + (flow-d + "(flow-peer-register! (quote edge) (list (list (quote double) (lambda (x) (* x 2))))) (flow/start (remote-node (quote edge) (quote double)) 21)") + 42) +(flow-dist-test + "remote: remote nodes compose in a sequence" + (flow-d + "(flow-peer-register! (quote edge) (list (list (quote inc) (lambda (x) (+ x 1))) (list (quote double) (lambda (x) (* x 2))))) (flow/start (sequence (remote-node (quote edge) (quote inc)) (remote-node (quote edge) (quote double))) 4)") + 10) +(flow-dist-test + "remote: a remote node mixes with local nodes" + (flow-d + "(flow-peer-register! (quote edge) (list (list (quote double) (lambda (x) (* x 2))))) (flow/start (sequence (lambda (x) (+ x 5)) (remote-node (quote edge) (quote double)) (lambda (x) (- x 1))) 10)") + 29) +(flow-dist-test + "remote: unreachable peer raises flow-remote-unreachable" + (flow-d + "(flow/start (try-catch (remote-node (quote ghost) (quote double)) (lambda (e) e)) 1)") + "flow-remote-unreachable") +(flow-dist-test + "remote: unknown function on a peer raises flow-remote-no-fn" + (flow-d + "(flow-peer-register! (quote edge) (list (list (quote double) (lambda (x) (* x 2))))) (flow/start (try-catch (remote-node (quote edge) (quote missing)) (lambda (e) e)) 1)") + "flow-remote-no-fn") +(flow-dist-test + "remote: a remote node can suspend the flow (peer returns control)" + (flow-d + "(flow-peer-register! (quote edge) (list (list (quote review) (lambda (x) x)))) (flow/start (sequence (remote-node (quote edge) (quote review)) (lambda (x) (suspend (quote human))) (lambda (v) (list (quote published) v))) 7)") + (list "flow-suspended" 1 "human")) +(flow-dist-test + "remote: a transient remote failure is recoverable with retry" + (flow-d + "(define hits 0) (flow-peer-register! (quote edge) (list (list (quote flaky) (lambda (x) (begin (set! hits (+ hits 1)) (if (< hits 2) (raise (quote down)) (* x 3))))))) (list (flow/start (retry 3 (remote-node (quote edge) (quote flaky))) 7) hits)") + (list 21 2)) + +;; ── failover (retry on a different peer, fall through to local) ── +(flow-dist-test + "failover: first reachable peer serves the request" + (flow-d + "(flow-peer-register! (quote p2) (list (list (quote f) (lambda (x) (+ x 100))))) (flow/start (remote-failover (list (quote p2) (quote down)) (quote f) (flow-const (quote local))) 5)") + 105) +(flow-dist-test + "failover: skips an unreachable peer to the next one" + (flow-d + "(flow-peer-register! (quote p2) (list (list (quote f) (lambda (x) (+ x 100))))) (flow/start (remote-failover (list (quote down) (quote p2)) (quote f) (flow-const (quote local))) 5)") + 105) +(flow-dist-test + "failover: skips a peer whose function raises" + (flow-d + "(flow-peer-register! (quote bad) (list (list (quote f) (lambda (x) (raise (quote boom)))))) (flow-peer-register! (quote good) (list (list (quote f) (lambda (x) (* x 10))))) (flow/start (remote-failover (list (quote bad) (quote good)) (quote f) (flow-const 0)) 4)") + 40) +(flow-dist-test + "failover: all peers fail, the local fallback runs" + (flow-d + "(flow/start (remote-failover (list (quote down1) (quote down2)) (quote f) (lambda (x) (* x -1))) 9)") + -9) +(flow-dist-test + "failover: threads the input through to the chosen peer" + (flow-d + "(flow-peer-register! (quote p) (list (list (quote f) (lambda (x) (list (quote got) x))))) (flow/start (sequence (lambda (x) (+ x 1)) (remote-failover (list (quote p)) (quote f) (flow-const 0))) 41)") + (list "got" 42)) +(flow-dist-test + "failover: composes inside a larger sequence" + (flow-d + "(flow-peer-register! (quote p) (list (list (quote f) (lambda (x) (* x 2))))) (flow/start (sequence (remote-failover (list (quote down) (quote p)) (quote f) (flow-const 1)) (lambda (x) (+ x 3))) 5)") + 13) + +;; ── replication + handoff ─────────────────────────────────────── +(flow-dist-test + "replicate: a peer holds the exported store" + (flow-d + "(defflow w (lambda (x) (suspend (quote q)))) (flow/start w 10) (flow-replicate-to (quote peerB)) (if (flow-replica-get (quote peerB)) (quote replicated) (quote missing))") + "replicated") +(flow-dist-test + "handoff: a peer resumes a flow after the local instance dies" + (flow-d + "(defflow w (sequence (lambda (x) (suspend (quote q))) (lambda (v) (list (quote done) v)))) (define id (car (cdr (flow/start w 10)))) (flow-replicate-to (quote peerB)) (set! flow-store (list)) (flow-restore-from (quote peerB)) (flow/resume id 55)") + (list "done" 55)) +(flow-dist-test + "handoff: restored peer reports the flow as resumable" + (flow-d + "(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 10)))) (flow-replicate-to (quote peerB)) (set! flow-store (list)) (flow-restore-from (quote peerB)) (flow-resumable-ids)") + (list 1)) +(flow-dist-test + "handoff: without restore the dead instance has lost the flow" + (flow-d + "(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 10)))) (flow-replicate-to (quote peerB)) (set! flow-store (list)) (flow/resume id 1)") + (list "flow-error" "no-such-flow")) +(flow-dist-test + "restore: from an unknown peer yields false" + (flow-d "(flow-restore-from (quote nowhere))") + false) +(flow-dist-test + "handoff: replication preserves the replay log across the move" + (flow-d + "(defflow two (sequence (lambda (x) (suspend (quote a))) (lambda (x) (suspend (quote b))) (lambda (x) (list x)))) (define id (car (cdr (flow/start two 0)))) (flow/resume id 11) (flow-replicate-to (quote peerB)) (set! flow-store (list)) (flow-restore-from (quote peerB)) (flow/resume id 22)") + (list 22)) + +(define flow-dist-tests-run! (fn () {:total (+ flow-dist-pass flow-dist-fail) :passed flow-dist-pass :failed flow-dist-fail :fails flow-dist-fails})) diff --git a/lib/flow/tests/host.sx b/lib/flow/tests/host.sx new file mode 100644 index 00000000..d0e8335a --- /dev/null +++ b/lib/flow/tests/host.sx @@ -0,0 +1,106 @@ +;; lib/flow/tests/host.sx — Phase 8: host integration ABI (request/await/host-queue/driver). + +(define flow-hst-pass 0) +(define flow-hst-fail 0) +(define flow-hst-fails (list)) + +(define + flow-hst-test + (fn + (name actual expected) + (if + (= actual expected) + (set! flow-hst-pass (+ flow-hst-pass 1)) + (begin + (set! flow-hst-fail (+ flow-hst-fail 1)) + (append! flow-hst-fails {:name name :expected expected :actual actual}))))) + +(define flow-hst (fn (src) (flow-run src))) + +;; ── request envelope ──────────────────────────────────────────── +(flow-hst-test + "request: suspends with a typed envelope" + (flow-hst + "(car (cdr (cdr (flow/start (lambda (x) (request (quote render) x)) 5))))") + (list "flow-request" "render" 5)) +(flow-hst-test + "request?: recognizes an envelope" + (flow-hst "(request? (list (quote flow-request) (quote human) 1))") + true) +(flow-hst-test + "request?: a plain tag is not a request" + (flow-hst "(request? (list (quote review) 1))") + false) +(flow-hst-test + "request-kind / request-payload: parse the envelope" + (flow-hst + "(define t (list (quote flow-request) (quote render) (list (quote recipe) 7))) (list (request-kind t) (request-payload t))") + (list "render" (list "recipe" 7))) + +;; ── named decision points ─────────────────────────────────────── +(flow-hst-test + "await-human: is a request of kind human" + (flow-hst + "(car (cdr (cdr (flow/start (lambda (x) (await-human x)) (quote approve?)))))") + (list "flow-request" "human" "approve?")) +(flow-hst-test + "await-render: is a request of kind render" + (flow-hst + "(car (cdr (cdr (flow/start (lambda (x) (await-render x)) (quote recipe)))))") + (list "flow-request" "render" "recipe")) +(flow-hst-test + "request: the host's resume value flows back into the flow" + (flow-hst + "(defflow f (sequence (lambda (x) (await-render x)) (lambda (art) (list (quote got) art)))) (define id (car (cdr (flow/start f 1)))) (flow/resume id (quote the-artifact))") + (list "got" "the-artifact")) + +;; ── host work queue ───────────────────────────────────────────── +(flow-hst-test + "flow-host-requests: lists (id kind payload) for pending requests" + (flow-hst + "(flow/start (lambda (x) (await-render x)) 99) (flow-host-requests)") + (list (list 1 "render" 99))) +(flow-hst-test + "flow-host-requests: excludes bare (non-request) suspends" + (flow-hst + "(defflow a (lambda (x) (await-render x))) (defflow b (lambda (x) (suspend (quote plain)))) (flow/start a 1) (flow/start b 2) (flow-host-requests)") + (list (list 1 "render" 1))) + +;; ── the art-dag-shaped host driver loop (manual resumes) ──────── +(flow-hst-test + "host driver: render then human-review then publish" + (flow-hst + "(defflow pipeline (sequence (lambda (recipe) (await-render recipe)) (lambda (art) (await-human (list (quote review) art))) (branch (lambda (d) (eq? d (quote approve))) (flow-const (quote published)) (flow-const (fail (quote rejected)))))) (define id (car (cdr (flow/start pipeline 99)))) (define r1 (flow-host-requests)) (flow/resume id (list (quote art) 99)) (define r2 (flow-host-requests)) (flow/resume id (quote approve)) (list r1 r2 (flow/status id) (flow/result id))") + (list + (list (list 1 "render" 99)) + (list (list 1 "human" (list "review" (list "art" 99)))) + "done" + "published")) +(flow-hst-test + "host driver: rejection at the human gate yields a failure" + (flow-hst + "(defflow pipeline (sequence (lambda (recipe) (await-render recipe)) (lambda (art) (await-human (list (quote review) art))) (branch (lambda (d) (eq? d (quote approve))) (flow-const (quote published)) (flow-const (fail (quote rejected)))))) (define id (car (cdr (flow/start pipeline 1)))) (flow/resume id (quote artifact)) (failed? (flow/resume id (quote reject)))") + true) + +;; ── reference driver: host supplies only a dispatch fn ────────── +(flow-hst-test + "flow-drive-host: one tick services every pending request" + (flow-hst + "(flow/start (lambda (x) (await-render x)) 5) (define n (flow-drive-host (lambda (k p) (list (quote done) p)))) (list n (flow/status 1) (flow/result 1))") + (list 1 "done" (list "done" 5))) +(flow-hst-test + "flow-run-host: drives a render -> human pipeline to completion" + (flow-hst + "(defflow pipeline (sequence (lambda (recipe) (await-render recipe)) (lambda (art) (await-human (list (quote review) art))) (branch (lambda (d) (eq? d (quote approve))) (flow-const (quote published)) (flow-const (fail (quote rejected)))))) (define id (car (cdr (flow/start pipeline 99)))) (define serviced (flow-run-host (lambda (kind payload) (if (eq? kind (quote render)) (list (quote art) payload) (quote approve))) 10)) (list serviced (flow/status id) (flow/result id))") + (list 2 "done" "published")) +(flow-hst-test + "flow-run-host: returns 0 when nothing is pending" + (flow-hst "(flow-run-host (lambda (k p) p) 5)") + 0) +(flow-hst-test + "flow-run-host: respects the maxticks bound" + (flow-hst + "(defflow pipe2 (sequence (lambda (r) (await-render r)) (lambda (a) (await-human a)) (lambda (d) d))) (define id (car (cdr (flow/start pipe2 1)))) (define serviced (flow-run-host (lambda (k p) p) 1)) (list serviced (flow/status id))") + (list 1 "suspended")) + +(define flow-hst-tests-run! (fn () {:total (+ flow-hst-pass flow-hst-fail) :passed flow-hst-pass :failed flow-hst-fail :fails flow-hst-fails})) diff --git a/lib/flow/tests/hygiene.sx b/lib/flow/tests/hygiene.sx new file mode 100644 index 00000000..a53122f8 --- /dev/null +++ b/lib/flow/tests/hygiene.sx @@ -0,0 +1,67 @@ +;; lib/flow/tests/hygiene.sx — Phase 5: store hygiene (flow/gc, flow/forget). + +(define flow-hyg-pass 0) +(define flow-hyg-fail 0) +(define flow-hyg-fails (list)) + +(define + flow-hyg-test + (fn + (name actual expected) + (if + (= actual expected) + (set! flow-hyg-pass (+ flow-hyg-pass 1)) + (begin + (set! flow-hyg-fail (+ flow-hyg-fail 1)) + (append! flow-hyg-fails {:name name :expected expected :actual actual}))))) + +(define flow-h (fn (src) (flow-run src))) + +;; ── flow/gc ───────────────────────────────────────────────────── +(flow-hyg-test + "gc: empty store removes nothing" + (flow-h "(flow/gc)") + 0) +(flow-hyg-test + "gc: removes a done flow, keeps a suspended one" + (flow-h + "(defflow w (lambda (x) (suspend (quote q)))) (flow/start w 0) (flow/start (lambda (x) x) 5) (define removed (flow/gc)) (list removed (flow/list))") + (list 1 (list (list 1 "suspended")))) +(flow-hyg-test + "gc: removes a cancelled flow" + (flow-h + "(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/cancel id) (flow/gc)") + 1) +(flow-hyg-test + "gc: a kept suspended flow is still resumable" + (flow-h + "(defflow w (sequence (lambda (x) (suspend (quote q))) (lambda (v) (* v 2)))) (define id (car (cdr (flow/start w 0)))) (flow/start (lambda (x) x) 1) (flow/gc) (flow/resume id 21)") + 42) +(flow-hyg-test + "gc: counts every terminal flow it drops" + (flow-h + "(flow/start (lambda (x) x) 1) (flow/start (lambda (x) x) 2) (defflow w (lambda (x) (suspend (quote q)))) (flow/start w 0) (flow/gc)") + 2) + +;; ── flow/forget ───────────────────────────────────────────────── +(flow-hyg-test + "forget: drops a completed flow" + (flow-h + "(defflow w (sequence (lambda (x) (suspend (quote q))) (lambda (v) v))) (define id (car (cdr (flow/start w 0)))) (flow/resume id 7) (list (flow/forget id) (flow/status id))") + (list true "unknown")) +(flow-hyg-test + "forget: refuses to drop a live (suspended) flow" + (flow-h + "(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (list (flow/forget id) (flow/status id))") + (list false "suspended")) +(flow-hyg-test + "forget: drops a cancelled flow" + (flow-h + "(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/cancel id) (list (flow/forget id) (flow/status id))") + (list true "unknown")) +(flow-hyg-test + "forget: unknown id yields false" + (flow-h "(flow/forget 999)") + false) + +(define flow-hyg-tests-run! (fn () {:total (+ flow-hyg-pass flow-hyg-fail) :passed flow-hyg-pass :failed flow-hyg-fail :fails flow-hyg-fails})) diff --git a/lib/flow/tests/integration.sx b/lib/flow/tests/integration.sx new file mode 100644 index 00000000..46352682 --- /dev/null +++ b/lib/flow/tests/integration.sx @@ -0,0 +1,115 @@ +;; lib/flow/tests/integration.sx — Phase 7: end-to-end flows composing every phase. + +(define flow-int-pass 0) +(define flow-int-fail 0) +(define flow-int-fails (list)) + +(define + flow-int-test + (fn + (name actual expected) + (if + (= actual expected) + (set! flow-int-pass (+ flow-int-pass 1)) + (begin + (set! flow-int-fail (+ flow-int-fail 1)) + (append! flow-int-fails {:name name :expected expected :actual actual}))))) + +(define flow-i (fn (src) (flow-run src))) + +;; The order-processing flow, defined once per program via this prelude string: +;; validate amount (attempt: fail if <= 0) +;; -> suspend for payment confirmation (resume value = confirmed amount) +;; -> branch: confirmed>0 ? record on the ledger peer : declined failure +(define + order-prelude + "(flow-peer-register! (quote ledger) (list (list (quote record) (lambda (amt) (list (quote recorded) amt)))))\n (defflow order\n (attempt\n (lambda (amt) (if (> amt 0) amt (fail (quote invalid-amount))))\n (lambda (amt) (suspend (quote await-payment)))\n (branch (lambda (amt) (> amt 0))\n (remote-node (quote ledger) (quote record))\n (flow-const (fail (quote declined))))))") + +;; ── happy path through every phase ────────────────────────────── +(flow-int-test + "order: validate -> suspend -> resume -> branch -> federate" + (flow-i + (str + order-prelude + "(define id (car (cdr (flow/start order 100)))) (flow/resume id 250)")) + (list "recorded" 250)) +(flow-int-test + "order: starting suspends awaiting payment" + (flow-i + (str + order-prelude + "(define s (flow/start order 100)) (list (car s) (car (cdr (cdr s))))")) + (list "flow-suspended" "await-payment")) +(flow-int-test + "order: invalid amount fails up front and never suspends" + (flow-i + (str + order-prelude + "(define r (flow/start order -5)) (list (failed? r) (fail-reason r))")) + (list true "invalid-amount")) +(flow-int-test + "order: a declined payment yields a failure value" + (flow-i + (str + order-prelude + "(define id (car (cdr (flow/start order 100)))) (failed? (flow/resume id 0))")) + true) + +;; ── crash recovery mid-flow ───────────────────────────────────── +(flow-int-test + "order: survives a simulated crash between suspend and resume" + (flow-i + (str + order-prelude + "(define id (car (cdr (flow/start order 100)))) (define saved (flow-store-export)) (set! flow-store (list)) (flow-store-import! saved) (flow/resume id 250)")) + (list "recorded" 250)) + +;; ── handoff to a peer mid-flow ────────────────────────────────── +(flow-int-test + "order: hands off to a peer that resumes and completes" + (flow-i + (str + order-prelude + "(define id (car (cdr (flow/start order 100)))) (flow-replicate-to (quote nodeB)) (set! flow-store (list)) (flow-restore-from (quote nodeB)) (flow/resume id 250)")) + (list "recorded" 250)) + +;; ── introspection during the flow's life ──────────────────────── +(flow-int-test + "order: pending shows what the flow awaits, then result after resume" + (flow-i + (str + order-prelude + "(define id (car (cdr (flow/start order 100)))) (define p (flow/pending)) (flow/resume id 250) (list p (flow/status id) (flow/result id))")) + (list + (list (list 1 "await-payment")) + "done" + (list "recorded" 250))) + +;; ── onboarding: two human steps + cancellation ────────────────── +(define + onboard-prelude + "(defflow onboard\n (sequence\n (lambda (user) (+ user 1))\n (lambda (x) (suspend (quote confirm-email)))\n (lambda (x) (suspend (quote complete-profile)))\n (lambda (x) (list (quote onboarded) x))))") + +(flow-int-test + "onboard: two suspends resume in order to completion" + (flow-i + (str + onboard-prelude + "(define id (car (cdr (flow/start onboard 0)))) (flow/resume id 7) (flow/resume id 9)")) + (list "onboarded" 9)) +(flow-int-test + "onboard: the second pending tag appears after the first resume" + (flow-i + (str + onboard-prelude + "(define id (car (cdr (flow/start onboard 0)))) (flow/resume id 7) (car (cdr (car (flow/pending))))")) + "complete-profile") +(flow-int-test + "onboard: cancelling abandons the flow" + (flow-i + (str + onboard-prelude + "(define id (car (cdr (flow/start onboard 0)))) (flow/cancel id) (list (flow/status id) (car (flow/resume id 7)))")) + (list "cancelled" "flow-error")) + +(define flow-int-tests-run! (fn () {:total (+ flow-int-pass flow-int-fail) :passed flow-int-pass :failed flow-int-fail :fails flow-int-fails})) diff --git a/lib/flow/tests/railway.sx b/lib/flow/tests/railway.sx new file mode 100644 index 00000000..8b90e918 --- /dev/null +++ b/lib/flow/tests/railway.sx @@ -0,0 +1,73 @@ +;; lib/flow/tests/railway.sx — Phase 6: railway-oriented composition (attempt). + +(define flow-rail-pass 0) +(define flow-rail-fail 0) +(define flow-rail-fails (list)) + +(define + flow-rail-test + (fn + (name actual expected) + (if + (= actual expected) + (set! flow-rail-pass (+ flow-rail-pass 1)) + (begin + (set! flow-rail-fail (+ flow-rail-fail 1)) + (append! flow-rail-fails {:name name :expected expected :actual actual}))))) + +(define flow-r (fn (src) (flow-run src))) + +;; ── attempt — short-circuit on the first (fail ...) ───────────── +(flow-rail-test + "attempt: threads like sequence when nothing fails" + (flow-r + "(flow/start (attempt (lambda (x) (+ x 1)) (lambda (x) (* x 10))) 4)") + 50) +(flow-rail-test + "attempt: empty is identity" + (flow-r "(flow/start (attempt) 7)") + 7) +(flow-rail-test + "attempt: returns the first failure" + (flow-r + "(failed? (flow/start (attempt (lambda (x) (fail (quote bad))) (lambda (x) (* x 10))) 4))") + true) +(flow-rail-test + "attempt: the failure carries its reason" + (flow-r + "(fail-reason (flow/start (attempt (lambda (x) x) (lambda (x) (fail (quote rejected)))) 4))") + "rejected") +(flow-rail-test + "attempt: nodes after a failure do not run" + (flow-r + "(define ran 0) (flow/start (attempt (lambda (x) (fail (quote stop))) (lambda (x) (begin (set! ran (+ ran 1)) x))) 0) ran") + 0) +(flow-rail-test + "attempt: a failed input short-circuits immediately" + (flow-r + "(define ran 0) (fail-reason (flow/start (attempt (lambda (x) (begin (set! ran (+ ran 1)) x))) (fail (quote pre))))") + "pre") +(flow-rail-test + "attempt: middle failure halts the chain" + (flow-r + "(define ran 0) (flow/start (attempt (lambda (x) (+ x 1)) (lambda (x) (fail (quote mid))) (lambda (x) (begin (set! ran (+ ran 1)) x))) 5) ran") + 0) + +;; ── attempt + recover (rejoin the happy track) ────────────────── +(flow-rail-test + "attempt + recover: recover turns a failure into a value" + (flow-r + "(flow/start (recover (attempt (lambda (x) (if (> x 0) x (fail (quote non-positive)))) (lambda (x) (* x 2))) (flow-const 0)) -5)") + 0) +(flow-rail-test + "attempt + recover: happy path passes recover through" + (flow-r + "(flow/start (recover (attempt (lambda (x) (if (> x 0) x (fail (quote non-positive)))) (lambda (x) (* x 2))) (flow-const 0)) 5)") + 10) +(flow-rail-test + "attempt: validation pipeline reports the failing stage" + (flow-r + "(defflow validate (attempt (lambda (s) (if (>= (string-length s) 3) s (fail (quote too-short)))) (lambda (s) (if (<= (string-length s) 8) s (fail (quote too-long)))) (lambda (s) (list (quote ok) (string-length s))))) (list (fail-reason (flow/start validate \"hi\")) (flow/start validate \"hello\"))") + (list "too-short" (list "ok" 5))) + +(define flow-rail-tests-run! (fn () {:total (+ flow-rail-pass flow-rail-fail) :passed flow-rail-pass :failed flow-rail-fail :fails flow-rail-fails})) diff --git a/lib/flow/tests/recovery.sx b/lib/flow/tests/recovery.sx new file mode 100644 index 00000000..cbe3b2f4 --- /dev/null +++ b/lib/flow/tests/recovery.sx @@ -0,0 +1,71 @@ +;; lib/flow/tests/recovery.sx — Phase 3: crash recovery (store export/import + restart). +;; +;; "restart" is simulated within one program: (set! flow-store (list)) wipes the +;; in-memory store (process death), while flow-registry persists as it would after +;; reloading flow definitions. Recovery = import the exported (plain-data) store and +;; resume; the flow proc is re-resolved by name. + +(define flow-rec-pass 0) +(define flow-rec-fail 0) +(define flow-rec-fails (list)) + +(define + flow-rec-test + (fn + (name actual expected) + (if + (= actual expected) + (set! flow-rec-pass (+ flow-rec-pass 1)) + (begin + (set! flow-rec-fail (+ flow-rec-fail 1)) + (append! flow-rec-fails {:name name :expected expected :actual actual}))))) + +(define flow-r (fn (src) (flow-run src))) + +;; ── export / wipe / import ────────────────────────────────────── +(flow-rec-test + "export nulls the live procedure" + (flow-r + "(defflow w (lambda (x) (suspend (quote await)))) (flow/start w 10) (car (cdr (car (cdr (car (flow-store-export))))))") + false) +(flow-rec-test + "a wiped store loses the flow (process death)" + (flow-r + "(defflow w (lambda (x) (suspend (quote await)))) (define id (car (cdr (flow/start w 10)))) (set! flow-store (list)) (flow/resume id 1)") + (list "flow-error" "no-such-flow")) +(flow-rec-test + "import restores a wiped store and resume completes" + (flow-r + "(defflow w (sequence (lambda (x) (suspend (quote await))) (lambda (c) (list (quote done) c)))) (define id (car (cdr (flow/start w 10)))) (define saved (flow-store-export)) (set! flow-store (list)) (flow-store-import! saved) (flow/resume id 777)") + (list "done" 777)) + +;; ── resumable scan ────────────────────────────────────────────── +(flow-rec-test + "resumable-ids lists the suspended flow after import" + (flow-r + "(defflow w (lambda (x) (suspend (quote await)))) (define id (car (cdr (flow/start w 10)))) (define saved (flow-store-export)) (set! flow-store (list)) (flow-store-import! saved) (flow-resumable-ids)") + (list 1)) +(flow-rec-test + "resumable-ids excludes completed flows" + (flow-r + "(defflow w (sequence (lambda (x) (suspend (quote await))) (lambda (c) c))) (define id (car (cdr (flow/start w 10)))) (flow/resume id 5) (flow-resumable-ids)") + (list)) +(flow-rec-test + "resumable-ids excludes cancelled flows after import" + (flow-r + "(defflow w (lambda (x) (suspend (quote await)))) (define id (car (cdr (flow/start w 10)))) (flow/cancel id) (define saved (flow-store-export)) (set! flow-store (list)) (flow-store-import! saved) (flow-resumable-ids)") + (list)) + +;; ── restart at every step ─────────────────────────────────────── +(flow-rec-test + "two suspends survive a restart between each step" + (flow-r + "(defflow two (sequence (lambda (x) (suspend (quote a))) (lambda (x) (suspend (quote b))) (lambda (x) (list (quote end) x)))) (define id (car (cdr (flow/start two 0)))) (define s1 (flow-store-export)) (set! flow-store (list)) (flow-store-import! s1) (flow/resume id 100) (define s2 (flow-store-export)) (set! flow-store (list)) (flow-store-import! s2) (flow/resume id 200)") + (list "end" 200)) +(flow-rec-test + "import preserves the replay log (earlier value survives restart)" + (flow-r + "(defflow two (sequence (lambda (x) (suspend (quote a))) (lambda (x) (suspend (quote b))) (lambda (x) (list x)))) (define id (car (cdr (flow/start two 0)))) (flow/resume id 11) (define saved (flow-store-export)) (set! flow-store (list)) (flow-store-import! saved) (flow/resume id 22)") + (list 22)) + +(define flow-rec-tests-run! (fn () {:total (+ flow-rec-pass flow-rec-fail) :passed flow-rec-pass :failed flow-rec-fail :fails flow-rec-fails})) diff --git a/lib/flow/tests/suspend.sx b/lib/flow/tests/suspend.sx new file mode 100644 index 00000000..e9dd825a --- /dev/null +++ b/lib/flow/tests/suspend.sx @@ -0,0 +1,114 @@ +;; lib/flow/tests/suspend.sx — Phase 3: suspend / resume / cancel (deterministic replay). + +(define flow-sus-pass 0) +(define flow-sus-fail 0) +(define flow-sus-fails (list)) + +(define + flow-sus-test + (fn + (name actual expected) + (if + (= actual expected) + (set! flow-sus-pass (+ flow-sus-pass 1)) + (begin + (set! flow-sus-fail (+ flow-sus-fail 1)) + (append! flow-sus-fails {:name name :expected expected :actual actual}))))) + +(define flow-s (fn (src) (flow-run src))) + +;; ── flow/start ────────────────────────────────────────────────── +(flow-sus-test + "start: non-suspending flow returns the raw result" + (flow-s "(flow/start (lambda (x) (* x 2)) 5)") + 10) +(flow-sus-test + "start: a suspending flow returns a flow-suspended state" + (flow-s + "(defflow w (sequence (lambda (x) (+ x 1)) (lambda (g) (suspend (quote await))) (lambda (c) c))) (car (flow/start w 10))") + "flow-suspended") +(flow-sus-test + "start: suspended state carries a numeric id" + (flow-s + "(defflow w (lambda (x) (suspend (quote await)))) (car (cdr (flow/start w 10)))") + 1) +(flow-sus-test + "start: suspended state carries the suspend tag" + (flow-s + "(defflow w (lambda (x) (suspend (quote await)))) (car (cdr (cdr (flow/start w 10))))") + "await") + +;; ── flow/resume ───────────────────────────────────────────────── +(flow-sus-test + "resume: injects the value and completes" + (flow-s + "(defflow w (sequence (lambda (x) (+ x 1)) (lambda (g) (suspend (quote await))) (lambda (c) (list (quote done) c)))) (define s (flow/start w 10)) (flow/resume (car (cdr s)) 777)") + (list "done" 777)) +(flow-sus-test + "resume: injected value threads into the next node" + (flow-s + "(defflow w (sequence (lambda (x) (suspend (quote v))) (lambda (n) (* n 3)))) (define s (flow/start w 0)) (flow/resume (car (cdr s)) 14)") + 42) +(flow-sus-test + "resume: replays earlier suspends (recompute is deterministic)" + (flow-s + "(define runs 0) (defflow w (sequence (lambda (x) (begin (set! runs (+ runs 1)) (+ x 1))) (lambda (g) (suspend (quote await))) (lambda (c) c))) (define s (flow/start w 10)) (flow/resume (car (cdr s)) 99) runs") + 2) + +;; ── multi-step suspension ─────────────────────────────────────── +(flow-sus-test + "multi: first resume suspends at the next tag" + (flow-s + "(defflow two (sequence (lambda (x) (suspend (quote a))) (lambda (x) (suspend (quote b))) (lambda (x) (list (quote end) x)))) (define s (flow/start two 0)) (define s2 (flow/resume (car (cdr s)) 100)) (car (cdr (cdr s2)))") + "b") +(flow-sus-test + "multi: second resume completes with the latest value" + (flow-s + "(defflow two (sequence (lambda (x) (suspend (quote a))) (lambda (x) (suspend (quote b))) (lambda (x) (list (quote end) x)))) (define id (car (cdr (flow/start two 0)))) (flow/resume id 100) (flow/resume id 200)") + (list "end" 200)) + +;; ── error / lifecycle guards ──────────────────────────────────── +(flow-sus-test + "resume: completed flow cannot be resumed again" + (flow-s + "(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/resume id 1) (flow/resume id 2)") + (list "flow-error" "not-suspended")) +(flow-sus-test + "resume: unknown id errors" + (flow-s "(flow/resume 999 1)") + (list "flow-error" "no-such-flow")) + +;; ── flow/cancel ───────────────────────────────────────────────── +(flow-sus-test + "cancel: returns a flow-cancelled state" + (flow-s + "(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/cancel id)") + (list "flow-cancelled" 1)) +(flow-sus-test + "cancel: a cancelled flow cannot be resumed (stale resume rejected)" + (flow-s + "(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/cancel id) (flow/resume id 5)") + (list "flow-error" "not-suspended")) +(flow-sus-test + "cancel: unknown id errors" + (flow-s "(flow/cancel 999)") + (list "flow-error" "no-such-flow")) + +;; ── composition ───────────────────────────────────────────────── +(flow-sus-test + "suspend inside a branch arm" + (flow-s + "(defflow gate (branch (lambda (x) (> x 0)) (lambda (x) (suspend (quote approve))) (flow-const (quote rejected)))) (define s (flow/start gate 5)) (flow/resume (car (cdr s)) (quote approved))") + "approved") +(flow-sus-test + "two independent runs get independent ids" + (flow-s + "(defflow w (lambda (x) (suspend (quote q)))) (list (car (cdr (flow/start w 0))) (car (cdr (flow/start w 0))))") + (list 1 2)) +(flow-sus-test + "suspend reason may be a structured value" + (flow-s + "(defflow w (lambda (x) (suspend (list (quote needs) (quote approval))))) (car (cdr (cdr (flow/start w 0))))") + (list "needs" "approval")) + +(define flow-sus-tests-run! (fn () {:total (+ flow-sus-pass flow-sus-fail) :passed flow-sus-pass :failed flow-sus-fail :fails flow-sus-fails})) diff --git a/plans/flow-on-sx.md b/plans/flow-on-sx.md index 46517942..1310998c 100644 --- a/plans/flow-on-sx.md +++ b/plans/flow-on-sx.md @@ -16,7 +16,7 @@ federation extension via fed-sx for remote-node execution. ## Status (rolling) -`bash lib/flow/conformance.sh` → **0/0** (not yet started) +`bash lib/flow/conformance.sh` → **166/166** (Phases 1-8 complete; host ABI + reference driver) ## Ground rules @@ -62,47 +62,167 @@ lib/flow/spec.sx lib/flow/runtime.sx lib/flow/store.sx ## Phase 1 — Declarative DAG + sequential execution -- [ ] `lib/flow/spec.sx` — `defflow` macro, `sequence` combinator -- [ ] node = Scheme thunk; output threads to next node (data flow) -- [ ] `parallel` combinator (sequential semantics for now — TRUE parallelism in Phase 3) -- [ ] runtime executes a flow synchronously, returns final value -- [ ] `lib/flow/api.sx` — `(flow/start name args)` entry point -- [ ] `lib/flow/tests/basic.sx` — 15+ cases: linear sequence, nested sequences, - data flow between nodes, parallel-with-join -- [ ] `lib/flow/scoreboard.{json,md}` -- [ ] `lib/flow/conformance.sh` +- [x] `lib/flow/spec.sx` — `defflow` macro, `sequence` combinator +- [x] node = Scheme procedure of one arg (upstream value threaded in); output + threads to next node (data flow). A node ignoring its arg is a thunk. +- [x] `parallel` combinator (sequential semantics for now — TRUE parallelism in Phase 3) +- [x] runtime executes a flow synchronously, returns final value +- [x] `lib/flow/api.sx` — `(flow/start flow input)` entry point +- [x] `lib/flow/tests/basic.sx` — 18 cases: single nodes, linear/nested sequence, + data flow between nodes, parallel-with-join, publish-shaped flow +- [x] `lib/flow/scoreboard.{json,md}` +- [x] `lib/flow/conformance.sh` ## Phase 2 — Control flow + error handling -- [ ] `cond` combinator — predicate selects branch -- [ ] `retry n [backoff]` — re-runs node up to n times on exception -- [ ] `timeout ms` — bounds node execution -- [ ] `try-catch` — exception handler with reified error -- [ ] error model — exceptions vs explicit `(fail :reason ...)` results -- [ ] `lib/flow/tests/control.sx` — 25+ cases: each combinator + composition +- [x] `cond` combinator — predicate selects branch (named `branch`; `cond` is a + Scheme special form). `(branch pred then else)` — 6 tests. +- [x] `retry n` — re-runs node up to n attempts on a raised exception; last + exception propagates. Only raised exceptions are retried — `(fail ...)` values + pass through. 6 tests. (Backoff deferred: no wall clock in pure SX.) +- [x] `timeout budget` — bounds node execution via a **cooperative step budget** + (deterministic; no scheduler/clock in pure SX). Nodes opt in via `(tick)`; + `budget` ticks allowed, the next raises `flow-timeout`. Non-ticking nodes are + unbounded; budgets nest. 7 tests. +- [x] `try-catch` — exception handler with reified error: `(try-catch node handler)` + runs node; on raise, calls `(handler error)` and returns its value. 6 tests. +- [x] error model — exceptions vs explicit `(fail reason)` results: `fail`/`failed?`/ + `fail-reason` produce/inspect failure values that flow downstream as data + (distinct from raised exceptions caught by retry/try-catch). 6 tests. +- [x] `lib/flow/tests/control.sx` — 31 cases: branch, error model, try-catch, + retry, timeout + compositions ## Phase 3 — Suspend / resume (the showcase) -- [ ] `(suspend reason)` — `call/cc` captures continuation, returns flow-id to caller -- [ ] `lib/flow/store.sx` — serialize flow state (continuation + open vars) -- [ ] `(flow/resume id value)` — load continuation, inject value, re-enter -- [ ] `(flow/cancel id)` — explicit termination -- [ ] crash recovery — on restart, scan store for paused flows, mark resumable -- [ ] `lib/flow/tests/suspend.sx` — pause-resume scenarios, cancellation, "restart" - scenarios (simulated by re-loading store) +- [x] `(suspend tag)` — guest call/cc is ESCAPE-ONLY (re-entry hangs), so resume + uses **deterministic replay**: suspend escapes to the driver as `(flow-suspended + tag)`; resume re-runs the flow, replaying resolved suspends from a `(tag value)` + log. No live continuation is ever serialized — the log is plain data. +- [x] `lib/flow/store.sx` — flow store: id→record `(flow input log status payload)`; + `flow-drive` runs a flow against a replay log. +- [x] `(flow/resume id value)` — append `(tag value)` to the log, re-drive; raw + result on completion, `(flow-suspended id tag)` on a further suspend. +- [x] `(flow/cancel id)` — mark cancelled; a later resume is rejected (stale replay + cannot wake a cancelled flow). +- [x] crash recovery — `flow-store-export` (procs nulled → plain data), + `flow-store-import!`, `flow-resumable-ids`. Records are name-keyed; resume + re-resolves the proc by name (defflow registers names), so a flow survives a + wiped store. `tests/recovery.sx`, 8 cases (export/wipe/import, resumable scan, + restart-at-every-step, replay-log survival). +- [x] `lib/flow/tests/suspend.sx` — 17 cases: start/resume/cancel, multi-step, + replay determinism, lifecycle guards, suspend-in-branch +- Harness: `flow-run` now reuses one env with a per-test reset (building the full + standard env 66× was too slow) — see `api.sx`. ## Phase 4 — Distributed nodes via fed-sx -- [ ] `(remote-node addr fn args)` — execute node on a federation peer -- [ ] failure semantics — retry on different peer, fall through to local -- [ ] persistence across instances — flow state replicates via fed-sx -- [ ] handoff — flow started here can resume on a peer if the local instance is down -- [ ] `lib/flow/tests/distributed.sx` — federated flow scenarios (mock fed-sx in tests) +- [x] `(remote-node addr fn)` — execute a node on a federation peer. Transport is + the fed-sx boundary, MOCKED via a peer registry (`flow-peer-register!`); raises + `flow-remote-unreachable` / `flow-remote-no-fn`. Composes with sequence, suspend, + retry. `tests/distributed.sx`, 7 cases. +- [x] failure semantics — `(remote-failover addrs fn local)` tries each peer in + order, moves to the next on any raised error, and runs the `local` node if every + peer fails. 6 tests. +- [x] persistence across instances — `(flow-replicate-to addr)` copies this + instance's store (the plain-data export) to a peer's replica slot; + `(flow-restore-from addr)` imports it. Same mechanism as crash recovery, across + instances. +- [x] handoff — a flow started here resumes on a peer after the local instance dies: + replicate → wipe local store → restore on peer → `flow/resume`. The replay log + (and thus all resolved suspends) survives the move. +- [x] `lib/flow/tests/distributed.sx` — 19 cases: remote-node, failover, + replication, handoff (including replay-log survival across the move) + +## Phase 5 — Operational API + combinator library + +The four roadmap phases are complete; this phase rounds out the engine into +something operators and authors actually use. Accumulation, not a rewrite. + +- [x] introspection API — `flow/status id`, `flow/result id`, `flow/list`, + `flow/pending` (operator view of what each suspended flow awaits). 12 tests in + `tests/api.sx`. +- [x] store hygiene — `flow/gc` drops terminal (done/cancelled) records keeping + live suspended flows (returns count); `flow/forget id` drops one terminal record + and refuses live flows. Bounds unbounded store growth. 9 tests in `tests/hygiene.sx`. +- [x] `tap` — side-effecting pass-through node (logging/metrics) that returns input +- [x] `recover` — complement to try-catch for the fail-VALUE channel: run node; if it + yields `(fail ...)`, run a recovery node on the reason +- [x] `map-flow` — run a flow per item of a list, join results (sequential) +- [x] `flow-while` / `flow-until` — bounded iteration: re-run body threading the + value while/until pred holds, capped at `max` steps (deterministic bound) +- [x] `lib/flow/tests/api.sx` (12) + `lib/flow/tests/combinators.sx` (17) + +## Phase 6 — Railway-oriented composition + +Make the `(fail reason)` value channel compose into real validation/ETL pipelines. + +- [x] `attempt` — like `sequence`, but short-circuits at the first node that returns + a `(fail ...)` value, returning that failure (the railway track). Pairs with + `recover` for the rejoin. +- [x] `lib/flow/tests/railway.sx` — 10 cases: fail short-circuiting, no-run-after- + failure, recover rejoin, validation pipeline reporting the failing stage + +## Phase 8 — Host integration ABI (art-dag / human-in-the-loop) + +`suspend` is the seam to the outside world, but a bare tag is an ad-hoc convention. +This phase defines a stable request/response contract a host (an art-dag driver, a +review UI) codes against — so flow can orchestrate art-dag with human decision +points later without reverse-engineering tag shapes. `lib/flow/host.sx`. + +- [x] `(request kind payload)` — suspend with a typed `(flow-request kind payload)` + envelope; evaluates to the host's resume value. `await-human`/`await-render`/ + `await-effect` sugar. +- [x] `(flow-host-requests)` — the host work queue: `(id kind payload)` for every + suspended flow waiting on a host request; `request?`/`request-kind`/ + `request-payload` parse a tag. +- [x] `(flow-drive-host dispatch)` / `(flow-run-host dispatch maxticks)` — reference + host driver: the host supplies only a `(kind payload) -> answer` dispatch fn; the + loop drains pending requests and resumes until quiescent (bounded). +- [x] `lib/flow/tests/host.sx` — 15 cases incl. the art-dag-shaped driver loop + (render → human-review → publish) run both manually and via `flow-run-host`. +- Contract (documented in `host.sx` + README): the host owns IO + persistence; a + flow never does IO, it only `request`s; the host performs the effect and feeds the + result back via resume (logged, so not re-run on recovery). NOT done here (host + side, out of `lib/flow` scope): the real Celery/IPFS bridge and a persistent store + backend — those live in the art-dag integration, coding against this ABI. + +## Phase 7 — End-to-end integration + +Prove the phases compose: realistic flows exercising attempt + suspend + branch + +remote-node + crash-recovery + handoff + introspection together. + +- [x] `lib/flow/tests/integration.sx` — 10 cases: an order-processing flow (validate + → payment suspend → branch → ledger federation) and an onboarding flow, run through + the full lifecycle including a simulated crash and a peer handoff mid-flow, plus + introspection (`flow/pending`/`status`/`result`) during the flow's life ## Progress log -(loop fills this in) +- **Phase 1 (combinators + sequential runtime).** Flow built as a Scheme prelude + loaded onto `scheme-standard-env`: a flow is a Scheme procedure `input -> output`, + so the whole flow runs inside the interpreter (sets up Phase 3 call/cc suspend). + Combinators `flow-node`/`flow-id`/`flow-const`/`sequence`/`parallel`/`defflow` in + `spec.sx`; `flow/start` + SX helpers (`flow-make-env`/`flow-run`) in `api.sx`. + 18/18 in `tests/basic.sx`. Substrate constraints found: dotted rest params + `(a . rest)` and named `let` are unsupported in `lib/scheme/eval.sx`, so + combinators use `(lambda args ...)` variadics + top-level recursion. Scheme + strings come back boxed as `{:scm-string "..."}` — unwrap with `(get s :scm-string)`. + +- **Phases 2-4.** Control flow (branch/retry/timeout/try-catch + fail-value error + model), then the showcase: durable suspend/resume. Guest call/cc is escape-only + (re-entry hangs), so resume uses **deterministic replay** — re-run the flow, + replaying resolved suspends from a `(tag value)` log; only plain data persists, so + flows survive a wiped store (crash recovery) and a move to another instance + (replication + handoff). Phase 4 models the fed-sx boundary with a mock peer + registry. Timeout is a cooperative step budget (no wall clock in pure SX). Test + harness reuses one env with a per-test reset for speed. + +- **Phases 5-7 + docs.** Operational API (introspection, hygiene), combinator + library (tap/recover/map-flow/while/until), railway `attempt`, end-to-end + integration suite, and `lib/flow/README.md` (full API reference + replay-semantics + contract). **151/151 across 10 suites.** Conformance sx_server timeout raised to + 540s for the 10-suite run under shared-machine CPU contention. ## Blockers -(loop fills this in) +(none)