Compare commits
20 Commits
loops/mod
...
loops/flow
| Author | SHA1 | Date | |
|---|---|---|---|
| 9cfca1d008 | |||
| 3cbf33d2d2 | |||
| c2d628e9c3 | |||
| aabb950256 | |||
| 2b47b2925c | |||
| d9b9da3843 | |||
| 0a1b89c975 | |||
| 0e6ba55647 | |||
| c1d24eb9b3 | |||
| 16cb727406 | |||
| f8722b3b08 | |||
| e1f802cfff | |||
| 97c7623743 | |||
| e896deffc8 | |||
| e762cc2e32 | |||
| 4674620d7e | |||
| f3da3b975a | |||
| 1731476dc6 | |||
| 65cbdb8387 | |||
| 91ffba9975 |
141
lib/flow/README.md
Normal file
141
lib/flow/README.md
Normal file
@@ -0,0 +1,141 @@
|
|||||||
|
# flow — durable DAG workflows on Scheme
|
||||||
|
|
||||||
|
`flow` is a workflow engine for rose-ash: content pipelines (write → review →
|
||||||
|
publish → federate), scheduled jobs, and multi-step user flows (signup, confirm,
|
||||||
|
onboard) that **survive process restarts**. It is a thin Scheme prelude over the
|
||||||
|
Scheme-on-SX guest (`lib/scheme/`); a flow runs *inside* the interpreter.
|
||||||
|
|
||||||
|
Run the suite: `bash lib/flow/conformance.sh` → **151/151 across 10 suites**.
|
||||||
|
|
||||||
|
## Model
|
||||||
|
|
||||||
|
A **flow** is just a Scheme procedure of one argument — the upstream value:
|
||||||
|
|
||||||
|
```
|
||||||
|
node : input -> output
|
||||||
|
```
|
||||||
|
|
||||||
|
Combinators build composite nodes out of child nodes. A node that ignores its
|
||||||
|
argument is effectively a thunk. There is no separate "graph" object: composition
|
||||||
|
*is* function composition, so flows are values you can name, pass, and nest.
|
||||||
|
|
||||||
|
```scheme
|
||||||
|
(defflow publish
|
||||||
|
(sequence
|
||||||
|
(lambda (draft) (string-append draft "!"))
|
||||||
|
(branch (lambda (post) (>= (string-length post) 3))
|
||||||
|
(remote-node 'fed 'publish)
|
||||||
|
(flow-const 'rejected))))
|
||||||
|
|
||||||
|
(flow/start publish "hello") ; => federated, or a (flow-suspended id tag) state
|
||||||
|
```
|
||||||
|
|
||||||
|
## Building blocks (`spec.sx`)
|
||||||
|
|
||||||
|
| Combinator | Meaning |
|
||||||
|
|---|---|
|
||||||
|
| `(flow-node f)` / `(flow-id x)` / `(flow-const v)` | leaf nodes |
|
||||||
|
| `(sequence n ...)` | thread input left-to-right |
|
||||||
|
| `(parallel n ...)` | fan input to every child, join results into a list (sequential eval) |
|
||||||
|
| `(map-flow node)` | run `node` over each item of a list input, join results |
|
||||||
|
| `(flow-while pred body max)` / `(flow-until ...)` | bounded iteration (cap `max` steps) |
|
||||||
|
| `(defflow name body)` | bind + register a named flow (so it survives restart) |
|
||||||
|
|
||||||
|
## Control flow + errors (`spec.sx`)
|
||||||
|
|
||||||
|
| Combinator | Meaning |
|
||||||
|
|---|---|
|
||||||
|
| `(branch pred then else)` | `pred` on input selects `then`/`else` (`cond` is a Scheme special form) |
|
||||||
|
| `(retry n node)` | re-run on a *raised exception*, up to `n` attempts |
|
||||||
|
| `(timeout budget node)` | cooperative **step budget**: nodes call `(tick)`; the `(budget+1)`-th tick raises `flow-timeout` |
|
||||||
|
| `(try-catch node handler)` | catch a raised exception → `(handler error)` |
|
||||||
|
| `(fail reason)` / `(failed? x)` / `(fail-reason x)` | explicit failure *values* (flow downstream as data) |
|
||||||
|
| `(recover node handler)` | the fail-VALUE counterpart of try-catch |
|
||||||
|
| `(attempt n ...)` | railway sequence: stop at the first node returning a `(fail ...)` |
|
||||||
|
| `(tap effect)` | run a side effect, return input unchanged |
|
||||||
|
|
||||||
|
**Two error channels, on purpose.** Raised exceptions are for *bugs/transients*
|
||||||
|
(caught by `retry`/`try-catch`). `(fail reason)` values are for *expected business
|
||||||
|
outcomes* (validation rejected, declined) and compose via `attempt`/`recover`.
|
||||||
|
|
||||||
|
## Suspend / resume — the durable core (`spec.sx`, `store.sx`)
|
||||||
|
|
||||||
|
The guest Scheme's `call/cc` is **escape-only** — re-invoking a captured
|
||||||
|
continuation after it returns *hangs* the runtime. So flow does **not** serialize
|
||||||
|
continuations. Instead it uses **deterministic replay**:
|
||||||
|
|
||||||
|
- `(suspend tag)` — if `tag` is already in the replay log, return its logged value;
|
||||||
|
otherwise escape to the driver as `(flow-suspended tag)`.
|
||||||
|
- `resume` appends `(tag value)` to the log and **re-runs the flow from the start**.
|
||||||
|
Already-resolved suspends replay their values; the first unresolved one escapes
|
||||||
|
again (or the flow completes).
|
||||||
|
|
||||||
|
The entire persisted state is the replay log — plain data. No live continuation is
|
||||||
|
ever stored, so flows survive process restarts and even moves between instances.
|
||||||
|
|
||||||
|
> **Author contract:** suspend `tag`s must be unique and deterministic across
|
||||||
|
> replays, and **all** non-determinism / side effects must go through suspend
|
||||||
|
> points (so their results are logged) — otherwise they re-run on every replay.
|
||||||
|
|
||||||
|
### Lifecycle (`store.sx`)
|
||||||
|
|
||||||
|
```scheme
|
||||||
|
(flow/start flow input) ; raw result if it completes, else (flow-suspended id tag)
|
||||||
|
(flow/resume id value) ; inject value at the waiting tag, continue
|
||||||
|
(flow/cancel id) ; terminate; a later resume is rejected
|
||||||
|
```
|
||||||
|
|
||||||
|
### Introspection & hygiene
|
||||||
|
|
||||||
|
```scheme
|
||||||
|
(flow/status id) ; done | suspended | cancelled | unknown
|
||||||
|
(flow/result id) ; result if done, else (flow-error reason)
|
||||||
|
(flow/list) ; ((id status) ...)
|
||||||
|
(flow/pending) ; ((id waiting-tag) ...) — what each suspended flow awaits
|
||||||
|
(flow/gc) ; drop terminal records, keep live ones; returns count removed
|
||||||
|
(flow/forget id) ; drop one terminal record (refuses live flows)
|
||||||
|
```
|
||||||
|
|
||||||
|
### Crash recovery
|
||||||
|
|
||||||
|
```scheme
|
||||||
|
(flow-store-export) ; the store as plain data (live procs nulled)
|
||||||
|
(flow-store-import! d) ; restore the store from exported data
|
||||||
|
(flow-resumable-ids) ; ids of suspended flows to wake on restart
|
||||||
|
```
|
||||||
|
|
||||||
|
On restart the flow definitions are reloaded (`defflow` re-registers names) and the
|
||||||
|
exported store reimported; `resume` re-resolves each flow's procedure **by name**.
|
||||||
|
|
||||||
|
## Distribution via fed-sx (`remote.sx`)
|
||||||
|
|
||||||
|
```scheme
|
||||||
|
(flow-peer-register! addr table) ; mock a peer's exposed functions (fed-sx boundary)
|
||||||
|
(remote-node addr fn) ; run a node on a peer
|
||||||
|
(remote-failover addrs fn local) ; try peers in order, fall through to a local node
|
||||||
|
(flow-replicate-to addr) ; copy this store to a peer's replica slot
|
||||||
|
(flow-restore-from addr) ; import a peer's replica (handoff)
|
||||||
|
```
|
||||||
|
|
||||||
|
**Handoff** is crash recovery across instances: replicate → local instance dies →
|
||||||
|
peer restores the (plain-data) store and resumes. The replay log carries over, so
|
||||||
|
all resolved suspends survive the move.
|
||||||
|
|
||||||
|
## Files
|
||||||
|
|
||||||
|
| File | Contents |
|
||||||
|
|---|---|
|
||||||
|
| `spec.sx` | combinators (flow-combinators-src / flow-control-src / flow-suspend-src) |
|
||||||
|
| `store.sx` | durable store, lifecycle, crash recovery, introspection, hygiene |
|
||||||
|
| `remote.sx` | fed-sx transport (mock peer registry), failover, replication |
|
||||||
|
| `api.sx` | `flow-make-env` / `flow-run` SX helpers (one cached env, per-test reset) |
|
||||||
|
| `tests/*.sx` | 10 suites, 151 cases |
|
||||||
|
| `conformance.sh` | loads substrate + flow layer, runs every suite |
|
||||||
|
|
||||||
|
## Notes on the substrate
|
||||||
|
|
||||||
|
The guest Scheme (`lib/scheme/`, imported read-only) lacks dotted-rest params
|
||||||
|
`(a . rest)` and named `let`; combinators use `(lambda args ...)` variadics + top-
|
||||||
|
level recursion. `cons` is list-only (no dotted pairs), so log/assoc entries are
|
||||||
|
2-element lists. Strings box as `{:scm-string "..."}`. Timeout is a step budget
|
||||||
|
because there is no wall clock; `parallel` is sequential for the same reason.
|
||||||
65
lib/flow/api.sx
Normal file
65
lib/flow/api.sx
Normal file
@@ -0,0 +1,65 @@
|
|||||||
|
;; lib/flow/api.sx — flow runtime entry points.
|
||||||
|
;;
|
||||||
|
;; Builds a Scheme env preloaded with the flow combinators (lib/flow/spec.sx),
|
||||||
|
;; the durable store + lifecycle (lib/flow/store.sx), the fed-sx remote layer
|
||||||
|
;; (lib/flow/remote.sx), and the host integration ABI (lib/flow/host.sx), and
|
||||||
|
;; provides SX helpers to run flow programs.
|
||||||
|
;;
|
||||||
|
;; Scheme-level API (available inside flow programs):
|
||||||
|
;; (flow/start flow input) — run a flow; raw result if it completes, else
|
||||||
|
;; (flow-suspended id tag). Defined in store.sx.
|
||||||
|
;; (flow/resume id value) — resume a suspended flow (store.sx)
|
||||||
|
;; (flow/cancel id) — cancel a flow (store.sx)
|
||||||
|
;; (suspend tag) — suspension point (spec.sx)
|
||||||
|
;; (request kind payload) — host request envelope over suspend (host.sx)
|
||||||
|
;; (remote-node addr fn) — node executed on a federation peer (remote.sx)
|
||||||
|
;;
|
||||||
|
;; SX-level helpers (for hosts and tests):
|
||||||
|
;; (flow-make-env) — fresh standard env + combinators + store + remote + host
|
||||||
|
;; (flow-run src) — eval a Scheme program string in a reset shared env
|
||||||
|
;; (flow-run-in env src) — eval a Scheme program string in a given env
|
||||||
|
;;
|
||||||
|
;; flow-run reuses ONE env (building the full standard env is expensive) and
|
||||||
|
;; resets the mutable flow globals before each program, so tests stay isolated
|
||||||
|
;; without paying for a fresh standard env each time. flow-registry persists (it
|
||||||
|
;; models reloaded flow definitions surviving a restart).
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-make-env
|
||||||
|
(fn
|
||||||
|
()
|
||||||
|
(let
|
||||||
|
((env (scheme-standard-env)))
|
||||||
|
(flow-load-combinators! env)
|
||||||
|
(flow-load-store! env)
|
||||||
|
(flow-load-remote! env)
|
||||||
|
(flow-load-host! env)
|
||||||
|
env)))
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-run-in
|
||||||
|
(fn (env src) (scheme-eval-program (scheme-parse-all src) env)))
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-reset-src
|
||||||
|
"(set! flow-store (list)) (set! flow-next-id 0) (set! flow-replay-log (list)) (set! flow-suspend-k #f) (set! flow-timeout-budget -1) (set! flow-peers (list)) (set! flow-replicas (list))")
|
||||||
|
|
||||||
|
(define flow-env-cache false)
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-shared-env
|
||||||
|
(fn
|
||||||
|
()
|
||||||
|
(begin
|
||||||
|
(if flow-env-cache nil (set! flow-env-cache (flow-make-env)))
|
||||||
|
flow-env-cache)))
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-run
|
||||||
|
(fn
|
||||||
|
(src)
|
||||||
|
(let
|
||||||
|
((env (flow-shared-env)))
|
||||||
|
(begin
|
||||||
|
(scheme-eval-program (scheme-parse-all flow-reset-src) env)
|
||||||
|
(scheme-eval-program (scheme-parse-all src) env)))))
|
||||||
103
lib/flow/conformance.sh
Executable file
103
lib/flow/conformance.sh
Executable file
@@ -0,0 +1,103 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
# flow-on-sx conformance runner — runs all flow test suites in one sx_server process.
|
||||||
|
#
|
||||||
|
# Usage:
|
||||||
|
# bash lib/flow/conformance.sh # run all suites
|
||||||
|
# bash lib/flow/conformance.sh -v # verbose (list each suite)
|
||||||
|
|
||||||
|
set -uo pipefail
|
||||||
|
cd "$(git rev-parse --show-toplevel)"
|
||||||
|
|
||||||
|
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
|
||||||
|
if [ ! -x "$SX_SERVER" ]; then
|
||||||
|
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
|
||||||
|
fi
|
||||||
|
if [ ! -x "$SX_SERVER" ]; then
|
||||||
|
echo "ERROR: sx_server.exe not found." >&2
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
VERBOSE="${1:-}"
|
||||||
|
|
||||||
|
# Suites: NAME RUNNER-FN PATH
|
||||||
|
SUITES=(
|
||||||
|
"basic flow-basic-tests-run! lib/flow/tests/basic.sx"
|
||||||
|
"control flow-ctl-tests-run! lib/flow/tests/control.sx"
|
||||||
|
"suspend flow-sus-tests-run! lib/flow/tests/suspend.sx"
|
||||||
|
"recovery flow-rec-tests-run! lib/flow/tests/recovery.sx"
|
||||||
|
"distributed flow-dist-tests-run! lib/flow/tests/distributed.sx"
|
||||||
|
"api flow-api-tests-run! lib/flow/tests/api.sx"
|
||||||
|
"combinators flow-cmb-tests-run! lib/flow/tests/combinators.sx"
|
||||||
|
"railway flow-rail-tests-run! lib/flow/tests/railway.sx"
|
||||||
|
"integration flow-int-tests-run! lib/flow/tests/integration.sx"
|
||||||
|
"hygiene flow-hyg-tests-run! lib/flow/tests/hygiene.sx"
|
||||||
|
"host flow-hst-tests-run! lib/flow/tests/host.sx"
|
||||||
|
)
|
||||||
|
|
||||||
|
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
|
||||||
|
EPOCH=1
|
||||||
|
|
||||||
|
emit_load () { echo "(epoch $EPOCH)"; echo "(load \"$1\")"; EPOCH=$((EPOCH+1)); }
|
||||||
|
emit_eval () { echo "(epoch $EPOCH)"; echo "(eval \"$1\")"; EPOCH=$((EPOCH+1)); }
|
||||||
|
|
||||||
|
{
|
||||||
|
emit_load "lib/guest/lex.sx"
|
||||||
|
emit_load "lib/guest/reflective/env.sx"
|
||||||
|
emit_load "lib/guest/reflective/quoting.sx"
|
||||||
|
emit_load "lib/scheme/parser.sx"
|
||||||
|
emit_load "lib/scheme/eval.sx"
|
||||||
|
emit_load "lib/scheme/runtime.sx"
|
||||||
|
emit_load "lib/flow/spec.sx"
|
||||||
|
emit_load "lib/flow/store.sx"
|
||||||
|
emit_load "lib/flow/remote.sx"
|
||||||
|
emit_load "lib/flow/host.sx"
|
||||||
|
emit_load "lib/flow/api.sx"
|
||||||
|
for SUITE in "${SUITES[@]}"; do
|
||||||
|
read -r _NAME _RUNNER FILE <<< "$SUITE"
|
||||||
|
emit_load "$FILE"
|
||||||
|
emit_eval "($_RUNNER)"
|
||||||
|
done
|
||||||
|
} > "$TMPFILE"
|
||||||
|
|
||||||
|
OUTPUT=$(timeout 540 "$SX_SERVER" < "$TMPFILE" 2>&1 || true)
|
||||||
|
|
||||||
|
TOTAL_PASS=0
|
||||||
|
TOTAL_FAIL=0
|
||||||
|
FAILED_SUITES=()
|
||||||
|
|
||||||
|
LAST_DICT_LINES=$(echo "$OUTPUT" | grep -E '^\{:' || true)
|
||||||
|
|
||||||
|
I=0
|
||||||
|
while read -r LINE; do
|
||||||
|
[ -z "$LINE" ] && continue
|
||||||
|
P=$(echo "$LINE" | grep -oE ':passed [0-9]+' | awk '{print $2}')
|
||||||
|
F=$(echo "$LINE" | grep -oE ':failed [0-9]+' | awk '{print $2}')
|
||||||
|
[ -z "$P" ] && P=0
|
||||||
|
[ -z "$F" ] && F=0
|
||||||
|
SUITE_INFO="${SUITES[$I]}"
|
||||||
|
SUITE_NAME=$(echo "$SUITE_INFO" | awk '{print $1}')
|
||||||
|
TOTAL_PASS=$((TOTAL_PASS + P))
|
||||||
|
TOTAL_FAIL=$((TOTAL_FAIL + F))
|
||||||
|
if [ "$F" -gt 0 ]; then
|
||||||
|
FAILED_SUITES+=("$SUITE_NAME: $P/$((P+F))")
|
||||||
|
printf 'X %-12s %d/%d\n' "$SUITE_NAME" "$P" "$((P+F))"
|
||||||
|
echo "$LINE" | grep -oE ':name "[^"]*"' | sed 's/:name / fail: /'
|
||||||
|
elif [ "$VERBOSE" = "-v" ]; then
|
||||||
|
printf 'ok %-12s %d passed\n' "$SUITE_NAME" "$P"
|
||||||
|
fi
|
||||||
|
I=$((I+1))
|
||||||
|
done <<< "$LAST_DICT_LINES"
|
||||||
|
|
||||||
|
TOTAL=$((TOTAL_PASS + TOTAL_FAIL))
|
||||||
|
if [ "$TOTAL" -eq 0 ]; then
|
||||||
|
echo "ERROR: no suite results parsed. Raw output:" >&2
|
||||||
|
echo "$OUTPUT" >&2
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
if [ $TOTAL_FAIL -eq 0 ]; then
|
||||||
|
echo "ok $TOTAL_PASS/$TOTAL flow-on-sx tests passed (${#SUITES[@]} suites)"
|
||||||
|
else
|
||||||
|
echo "FAIL $TOTAL_PASS/$TOTAL passed, $TOTAL_FAIL failed:"
|
||||||
|
for S in "${FAILED_SUITES[@]}"; do echo " $S"; done
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
42
lib/flow/host.sx
Normal file
42
lib/flow/host.sx
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
;; lib/flow/host.sx — the host integration ABI (Phase 8).
|
||||||
|
;;
|
||||||
|
;; `suspend` is flow's seam to the outside world, but a bare (suspend tag) is just a
|
||||||
|
;; signal — every author would invent their own tag shape. This layer defines a
|
||||||
|
;; stable request/response contract so a host (e.g. an art-dag driver, or a human
|
||||||
|
;; review UI) can hook in WITHOUT reverse-engineering ad-hoc tags.
|
||||||
|
;;
|
||||||
|
;; A flow asks the host to do something and waits for the answer:
|
||||||
|
;; (request kind payload) — suspend with a typed envelope (flow-request kind
|
||||||
|
;; payload); evaluates to the host's resume value.
|
||||||
|
;; (await-human prompt) — request kind=human (a decision point)
|
||||||
|
;; (await-render recipe) — request kind=render (e.g. an art-dag job)
|
||||||
|
;; (await-effect kind p) — request of an arbitrary kind
|
||||||
|
;;
|
||||||
|
;; The host drives flows by polling its work queue and resuming:
|
||||||
|
;; (flow-host-requests) — ((id kind payload) ...) for every SUSPENDED flow whose
|
||||||
|
;; waiting tag is a host request. The host dispatches by kind (render -> submit a
|
||||||
|
;; Celery job; human -> show UI), then calls (flow/resume id answer).
|
||||||
|
;; (request? tag) / (request-kind tag) / (request-payload tag) — parse one tag.
|
||||||
|
;;
|
||||||
|
;; Reference driver — the host only supplies `dispatch`, a (kind payload) -> answer:
|
||||||
|
;; (flow-drive-host dispatch) — one tick: service every CURRENTLY pending
|
||||||
|
;; request (snapshot), resuming each with (dispatch kind payload); returns the
|
||||||
|
;; count serviced. Resumes may create new requests — serviced on the next tick.
|
||||||
|
;; (flow-run-host dispatch maxticks) — tick until quiescent (no pending requests)
|
||||||
|
;; or maxticks reached; returns total requests serviced. Bounded for determinism.
|
||||||
|
;;
|
||||||
|
;; Contract: the host owns IO and persistence. flow stays deterministic — a flow
|
||||||
|
;; never performs IO itself, it only `request`s; the host performs the effect and
|
||||||
|
;; feeds the result back via resume (which the replay log records, so the effect is
|
||||||
|
;; not re-run on recovery). Persist with flow-store-export after each transition and
|
||||||
|
;; flow-store-import! on boot.
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-host-src
|
||||||
|
"(define (request kind payload) (suspend (list (quote flow-request) kind payload)))\n (define (request? tag) (and (pair? tag) (eq? (car tag) (quote flow-request))))\n (define (request-kind tag) (car (cdr tag)))\n (define (request-payload tag) (car (cdr (cdr tag))))\n (define (await-human prompt) (request (quote human) prompt))\n (define (await-render recipe) (request (quote render) recipe))\n (define (await-effect kind payload) (request kind payload))\n (define (flow-host-req-step pend)\n (if (null? pend)\n (list)\n (let ((id (car (car pend))) (tag (car (cdr (car pend)))))\n (if (request? tag)\n (cons (list id (request-kind tag) (request-payload tag))\n (flow-host-req-step (cdr pend)))\n (flow-host-req-step (cdr pend))))))\n (define (flow-host-requests) (flow-host-req-step (flow/pending)))\n (define (flow-drive-host-step reqs dispatch)\n (if (null? reqs)\n 0\n (begin\n (flow/resume (car (car reqs)) (dispatch (car (cdr (car reqs))) (car (cdr (cdr (car reqs))))))\n (+ 1 (flow-drive-host-step (cdr reqs) dispatch)))))\n (define (flow-drive-host dispatch) (flow-drive-host-step (flow-host-requests) dispatch))\n (define (flow-run-host dispatch maxticks)\n (if (<= maxticks 0)\n 0\n (let ((n (flow-drive-host dispatch)))\n (if (= n 0) 0 (+ n (flow-run-host dispatch (- maxticks 1)))))))")
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-load-host!
|
||||||
|
(fn
|
||||||
|
(env)
|
||||||
|
(begin (scheme-eval-program (scheme-parse-all flow-host-src) env) env)))
|
||||||
34
lib/flow/remote.sx
Normal file
34
lib/flow/remote.sx
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
;; lib/flow/remote.sx — distributed nodes via fed-sx (Phase 4).
|
||||||
|
;;
|
||||||
|
;; A node can execute on a federation peer. The transport is the fed-sx boundary;
|
||||||
|
;; it is MOCKED in tests by a peer registry mapping addr -> function table. In
|
||||||
|
;; production flow-transport would issue a fed-sx call; here it dispatches locally.
|
||||||
|
;;
|
||||||
|
;; (flow-peer-register! addr table) — register a mock peer. table is a list of
|
||||||
|
;; (fn-name proc) entries — the functions that peer exposes.
|
||||||
|
;; (flow-transport addr fn input) — invoke fn on the peer with input. Raises
|
||||||
|
;; (flow-remote-unreachable) if the addr is unknown, (flow-remote-no-fn) if the
|
||||||
|
;; peer does not expose fn.
|
||||||
|
;; (remote-node addr fn) — a node that runs fn on the peer at addr.
|
||||||
|
;; (remote-failover addrs fn local) — try fn on each peer in addrs in order; on a
|
||||||
|
;; raised error move to the next peer; if every peer fails, run the `local`
|
||||||
|
;; node as a fallback.
|
||||||
|
;;
|
||||||
|
;; Persistence across instances + handoff. Each instance runs the same flow
|
||||||
|
;; definitions, so the only thing that needs to cross the wire is the (plain-data)
|
||||||
|
;; store — exactly flow-store-export from store.sx. Replication pushes that export
|
||||||
|
;; to a peer's replica slot; handoff = restore the replica on the peer and resume.
|
||||||
|
;;
|
||||||
|
;; (flow-replicate-to addr) — copy this instance's store to peer addr's replica
|
||||||
|
;; (flow-restore-from addr) — import the replica from peer addr (#t / #f)
|
||||||
|
;; (flow-replica-get addr) — the raw replicated store at addr (or #f)
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-remote-src
|
||||||
|
"(define flow-peers (list))\n (define (flow-assoc key alist)\n (if (null? alist)\n #f\n (if (eq? (car (car alist)) key) (car (cdr (car alist))) (flow-assoc key (cdr alist)))))\n (define (flow-peer-register! addr table) (set! flow-peers (cons (list addr table) flow-peers)))\n (define (flow-transport addr fn input)\n (let ((table (flow-assoc addr flow-peers)))\n (if table\n (let ((proc (flow-assoc fn table)))\n (if proc (proc input) (raise (quote flow-remote-no-fn))))\n (raise (quote flow-remote-unreachable)))))\n (define (remote-node addr fn) (lambda (input) (flow-transport addr fn input)))\n (define (flow-failover-step addrs fn input local)\n (if (null? addrs)\n (local input)\n (guard (e (#t (flow-failover-step (cdr addrs) fn input local)))\n (flow-transport (car addrs) fn input))))\n (define (remote-failover addrs fn local)\n (lambda (input) (flow-failover-step addrs fn input local)))\n\n (define flow-replicas (list))\n (define (flow-replicas-remove addr reps)\n (if (null? reps)\n (list)\n (if (eq? (car (car reps)) addr)\n (flow-replicas-remove addr (cdr reps))\n (cons (car reps) (flow-replicas-remove addr (cdr reps))))))\n (define (flow-replicate-to addr)\n (set! flow-replicas (cons (list addr (flow-store-export)) (flow-replicas-remove addr flow-replicas))))\n (define (flow-replica-get addr) (flow-assoc addr flow-replicas))\n (define (flow-restore-from addr)\n (let ((data (flow-replica-get addr)))\n (if data (begin (flow-store-import! data) #t) #f)))")
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-load-remote!
|
||||||
|
(fn
|
||||||
|
(env)
|
||||||
|
(begin (scheme-eval-program (scheme-parse-all flow-remote-src) env) env)))
|
||||||
19
lib/flow/scoreboard.json
Normal file
19
lib/flow/scoreboard.json
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
{
|
||||||
|
"total": 166,
|
||||||
|
"passed": 166,
|
||||||
|
"failed": 0,
|
||||||
|
"suites": {
|
||||||
|
"basic": { "passed": 18, "total": 18 },
|
||||||
|
"control": { "passed": 31, "total": 31 },
|
||||||
|
"suspend": { "passed": 17, "total": 17 },
|
||||||
|
"recovery": { "passed": 8, "total": 8 },
|
||||||
|
"distributed": { "passed": 19, "total": 19 },
|
||||||
|
"api": { "passed": 12, "total": 12 },
|
||||||
|
"combinators": { "passed": 17, "total": 17 },
|
||||||
|
"railway": { "passed": 10, "total": 10 },
|
||||||
|
"integration": { "passed": 10, "total": 10 },
|
||||||
|
"hygiene": { "passed": 9, "total": 9 },
|
||||||
|
"host": { "passed": 15, "total": 15 }
|
||||||
|
},
|
||||||
|
"phases": { "phase1": "done", "phase2": "done", "phase3": "done", "phase4": "done", "phase5": "done", "phase6": "done", "phase7": "done", "phase8": "done" }
|
||||||
|
}
|
||||||
53
lib/flow/scoreboard.md
Normal file
53
lib/flow/scoreboard.md
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
# flow-on-sx Scoreboard
|
||||||
|
|
||||||
|
**All tests pass: 166 / 166 across 11 suites. Phases 1-8 complete.**
|
||||||
|
|
||||||
|
`bash lib/flow/conformance.sh`
|
||||||
|
|
||||||
|
## Per-suite breakdown
|
||||||
|
|
||||||
|
| Suite | Passing | Covers |
|
||||||
|
|-------|--------:|--------|
|
||||||
|
| basic | 18 | Phase 1: single nodes, linear sequence, data-flow threading, defflow, parallel fan/join, nested composition, publish-shaped flow |
|
||||||
|
| control | 31 | Phase 2: `branch` (6); error model `fail`/`failed?`/`fail-reason` (6); `try-catch` (6); `retry n` (6); `timeout` cooperative step budget (7) |
|
||||||
|
| suspend | 17 | Phase 3: suspend/resume/cancel via deterministic replay; multi-step, replay determinism, lifecycle guards, suspend-in-branch |
|
||||||
|
| recovery | 8 | Phase 3: crash recovery — store export/import, resumable scan, restart-at-every-step, replay-log survival |
|
||||||
|
| distributed | 19 | Phase 4: `remote-node` (7); `remote-failover` (6); replication + handoff across instances (6) |
|
||||||
|
| api | 12 | Phase 5: introspection — `flow/status`, `flow/result`, `flow/list`, `flow/pending` |
|
||||||
|
| combinators | 17 | Phase 5: `tap`, `recover` (fail-value), `map-flow` fan-over-list, `flow-while`/`flow-until` bounded iteration |
|
||||||
|
| railway | 10 | Phase 6: `attempt` — fail-value short-circuiting sequence + recover rejoin |
|
||||||
|
| integration | 10 | Phase 7: end-to-end order + onboarding flows composing every phase (suspend, branch, federation, crash recovery, handoff, introspection) |
|
||||||
|
| hygiene | 9 | Phase 5: `flow/gc` (prune terminal flows), `flow/forget` (drop one terminal record) |
|
||||||
|
| host | 15 | Phase 8: host ABI — `request`/`await-human`/`await-render`, `flow-host-requests` queue, `flow-run-host` reference driver; art-dag-shaped render→review→publish loop |
|
||||||
|
|
||||||
|
## Architecture
|
||||||
|
|
||||||
|
Flow combinators are a **Scheme prelude** (`lib/flow/spec.sx`) loaded onto
|
||||||
|
`scheme-standard-env`. A flow is a Scheme procedure `input -> output`. The whole
|
||||||
|
flow executes inside the Scheme interpreter, so Phase 3's `suspend` (call/cc) will
|
||||||
|
capture the flow continuation directly.
|
||||||
|
|
||||||
|
- `lib/flow/spec.sx` — combinators: `flow-node`, `flow-id`, `flow-const`,
|
||||||
|
`sequence`, `parallel`, `defflow`; `flow-load-combinators!`.
|
||||||
|
- `lib/flow/api.sx` — `flow/start` (Scheme); `flow-make-env`, `flow-run`,
|
||||||
|
`flow-run-in` (SX helpers).
|
||||||
|
- `lib/flow/tests/basic.sx` — 18 cases.
|
||||||
|
- `lib/flow/conformance.sh` — loads substrate + flow layer, runs suites.
|
||||||
|
|
||||||
|
## Semantics notes
|
||||||
|
|
||||||
|
- **node** = 1-arg Scheme procedure; the upstream value is the argument. A node
|
||||||
|
ignoring its argument is effectively a thunk.
|
||||||
|
- **sequence** threads left-to-right; empty sequence = identity.
|
||||||
|
- **parallel** fans the same input to every branch and joins results into a list.
|
||||||
|
Evaluation is **sequential** for now; true concurrency arrives in Phase 3.
|
||||||
|
|
||||||
|
## Phases
|
||||||
|
|
||||||
|
- [x] Phase 1 — Declarative DAG + sequential execution (combinators + 18 tests, `flow/start`)
|
||||||
|
- [x] Phase 2 — Control flow + error handling (branch, error model, try-catch, retry, timeout)
|
||||||
|
- [x] Phase 3 — Suspend/resume (suspend/resume/cancel + crash recovery via deterministic replay)
|
||||||
|
- [x] Phase 4 — Distributed nodes via fed-sx (remote-node, failover, replication + handoff)
|
||||||
|
- [x] Phase 5 — Operational API + combinators (introspection, tap, recover, map-flow)
|
||||||
|
- [ ] Phase 3 — Suspend / resume (the showcase)
|
||||||
|
- [ ] Phase 4 — Distributed nodes via fed-sx
|
||||||
61
lib/flow/spec.sx
Normal file
61
lib/flow/spec.sx
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
;; lib/flow/spec.sx — flow combinators as a Scheme prelude.
|
||||||
|
;;
|
||||||
|
;; A flow is a Scheme procedure of one argument: the upstream value.
|
||||||
|
;; node : input -> output
|
||||||
|
;; A leaf node ignoring its argument is effectively a thunk. Combinators
|
||||||
|
;; build composite nodes out of child nodes. The whole flow runs INSIDE the
|
||||||
|
;; Scheme interpreter.
|
||||||
|
;;
|
||||||
|
;; Phase 1 combinators (flow-combinators-src):
|
||||||
|
;; flow-node / flow-id / flow-const / sequence / parallel / defflow
|
||||||
|
;; defflow both binds the flow and registers it by name (flow-register!, in
|
||||||
|
;; store.sx) so it can be re-resolved after a process restart.
|
||||||
|
;; map-flow (Phase 5): run a node over each item of a list input, join results.
|
||||||
|
;; flow-while / flow-until (Phase 5): bounded iteration — re-run body, threading
|
||||||
|
;; the value, while/until pred holds, up to `max` steps (deterministic bound; no
|
||||||
|
;; unbounded loops in pure SX).
|
||||||
|
;;
|
||||||
|
;; Phase 2 combinators (flow-control-src):
|
||||||
|
;; branch / fail / failed? / fail-reason / try-catch / retry / timeout / tick
|
||||||
|
;; tap (Phase 5): side-effecting pass-through (returns input unchanged).
|
||||||
|
;; recover (Phase 5): the fail-VALUE counterpart of try-catch.
|
||||||
|
;; attempt (Phase 6): railway sequence — thread nodes left-to-right but stop at
|
||||||
|
;; the first node that returns a (fail ...) value, returning that failure.
|
||||||
|
;;
|
||||||
|
;; Phase 3 suspend core (flow-suspend-src):
|
||||||
|
;; The guest Scheme's call/cc is ESCAPE-ONLY (re-invoking a captured k after it
|
||||||
|
;; returns hangs the runtime), so suspend/resume CANNOT re-enter a continuation.
|
||||||
|
;; Instead, durability uses DETERMINISTIC REPLAY: a flow re-runs from the start
|
||||||
|
;; on each resume; suspend points that have already been resolved replay their
|
||||||
|
;; logged value, and the first unresolved suspend escapes back to the driver.
|
||||||
|
;; The entire persisted state is the replay log (plain (tag value) data), which
|
||||||
|
;; survives process restart — no live continuation is ever serialized.
|
||||||
|
;;
|
||||||
|
;; (suspend tag) — if tag is in the replay log, return its value; else escape
|
||||||
|
;; to the driver as (flow-suspended tag). tags must be unique & deterministic
|
||||||
|
;; across replays. ALL effects/non-determinism must go through suspend so their
|
||||||
|
;; results are logged (otherwise they re-run on every replay).
|
||||||
|
;; (flow-drive flow input log) — run flow with the given replay log; returns
|
||||||
|
;; (flow-done result) or (flow-suspended tag).
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-combinators-src
|
||||||
|
"(define (flow-node f) f)\n (define (flow-id input) input)\n (define (flow-const v) (lambda (input) v))\n (define (flow-seq-step ns v)\n (if (null? ns) v (flow-seq-step (cdr ns) ((car ns) v))))\n (define sequence (lambda ns (lambda (input) (flow-seq-step ns input))))\n (define parallel (lambda ns (lambda (input) (map (lambda (n) (n input)) ns))))\n (define (map-flow node) (lambda (items) (map node items)))\n (define (flow-while-step pred body input n)\n (if (<= n 0)\n input\n (if (pred input) (flow-while-step pred body (body input) (- n 1)) input)))\n (define (flow-while pred body max) (lambda (input) (flow-while-step pred body input max)))\n (define (flow-until-step pred body input n)\n (if (<= n 0)\n input\n (if (pred input) input (flow-until-step pred body (body input) (- n 1)))))\n (define (flow-until pred body max) (lambda (input) (flow-until-step pred body input max)))\n (define-syntax defflow\n (syntax-rules ()\n ((defflow nm body)\n (begin (define nm body) (flow-register! (quote nm) nm)))))")
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-control-src
|
||||||
|
"(define (branch pred then else)\n (lambda (input) (if (pred input) (then input) (else input))))\n (define (fail reason) (list (quote flow-fail) reason))\n (define (failed? x) (and (pair? x) (eq? (car x) (quote flow-fail))))\n (define (fail-reason x) (car (cdr x)))\n (define (recover node handler)\n (lambda (input)\n (let ((r (node input)))\n (if (failed? r) (handler (fail-reason r)) r))))\n (define (tap effect)\n (lambda (input) (begin (effect input) input)))\n (define (flow-attempt-step ns v)\n (if (failed? v)\n v\n (if (null? ns) v (flow-attempt-step (cdr ns) ((car ns) v)))))\n (define attempt (lambda ns (lambda (input) (flow-attempt-step ns input))))\n (define (try-catch node handler)\n (lambda (input) (guard (e (#t (handler e))) (node input))))\n (define (flow-retry-step n node input)\n (guard (e (#t (if (<= n 1) (raise e) (flow-retry-step (- n 1) node input))))\n (node input)))\n (define (retry n node) (lambda (input) (flow-retry-step n node input)))\n (define flow-timeout-budget -1)\n (define (tick)\n (if (< flow-timeout-budget 0)\n 0\n (begin\n (set! flow-timeout-budget (- flow-timeout-budget 1))\n (if (< flow-timeout-budget 0)\n (raise (quote flow-timeout))\n flow-timeout-budget))))\n (define (timeout budget node)\n (lambda (input)\n (let ((saved flow-timeout-budget))\n (set! flow-timeout-budget budget)\n (guard (e (#t (begin (set! flow-timeout-budget saved) (raise e))))\n (let ((result (node input)))\n (set! flow-timeout-budget saved)\n result)))))")
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-suspend-src
|
||||||
|
"(define flow-replay-log (list))\n (define flow-suspend-k #f)\n (define (flow-log-lookup tag log)\n (if (null? log)\n (list #f #f)\n (if (eq? (car (car log)) tag)\n (list #t (car (cdr (car log))))\n (flow-log-lookup tag (cdr log)))))\n (define (suspend tag)\n (let ((hit (flow-log-lookup tag flow-replay-log)))\n (if (car hit)\n (car (cdr hit))\n (flow-suspend-k (list (quote flow-suspended) tag)))))\n (define (flow-drive flow input log)\n (set! flow-replay-log log)\n (call/cc\n (lambda (k)\n (set! flow-suspend-k k)\n (list (quote flow-done) (flow input)))))")
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-load-combinators!
|
||||||
|
(fn
|
||||||
|
(env)
|
||||||
|
(begin
|
||||||
|
(scheme-eval-program (scheme-parse-all flow-combinators-src) env)
|
||||||
|
(scheme-eval-program (scheme-parse-all flow-control-src) env)
|
||||||
|
(scheme-eval-program (scheme-parse-all flow-suspend-src) env)
|
||||||
|
env)))
|
||||||
45
lib/flow/store.sx
Normal file
45
lib/flow/store.sx
Normal file
File diff suppressed because one or more lines are too long
79
lib/flow/tests/api.sx
Normal file
79
lib/flow/tests/api.sx
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
;; lib/flow/tests/api.sx — Phase 5: operational introspection API.
|
||||||
|
|
||||||
|
(define flow-api-pass 0)
|
||||||
|
(define flow-api-fail 0)
|
||||||
|
(define flow-api-fails (list))
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-api-test
|
||||||
|
(fn
|
||||||
|
(name actual expected)
|
||||||
|
(if
|
||||||
|
(= actual expected)
|
||||||
|
(set! flow-api-pass (+ flow-api-pass 1))
|
||||||
|
(begin
|
||||||
|
(set! flow-api-fail (+ flow-api-fail 1))
|
||||||
|
(append! flow-api-fails {:name name :expected expected :actual actual})))))
|
||||||
|
|
||||||
|
(define flow-a (fn (src) (flow-run src)))
|
||||||
|
|
||||||
|
;; ── flow/status ─────────────────────────────────────────────────
|
||||||
|
(flow-api-test "status: unknown id" (flow-a "(flow/status 999)") "unknown")
|
||||||
|
(flow-api-test
|
||||||
|
"status: suspended flow"
|
||||||
|
(flow-a
|
||||||
|
"(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/status id)")
|
||||||
|
"suspended")
|
||||||
|
(flow-api-test
|
||||||
|
"status: completed flow"
|
||||||
|
(flow-a
|
||||||
|
"(defflow w (sequence (lambda (x) (suspend (quote q))) (lambda (v) v))) (define id (car (cdr (flow/start w 0)))) (flow/resume id 5) (flow/status id)")
|
||||||
|
"done")
|
||||||
|
(flow-api-test
|
||||||
|
"status: cancelled flow"
|
||||||
|
(flow-a
|
||||||
|
"(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/cancel id) (flow/status id)")
|
||||||
|
"cancelled")
|
||||||
|
|
||||||
|
;; ── flow/result ─────────────────────────────────────────────────
|
||||||
|
(flow-api-test
|
||||||
|
"result: returns the value of a completed flow"
|
||||||
|
(flow-a
|
||||||
|
"(defflow w (sequence (lambda (x) (suspend (quote q))) (lambda (v) (list (quote got) v)))) (define id (car (cdr (flow/start w 0)))) (flow/resume id 9) (flow/result id)")
|
||||||
|
(list "got" 9))
|
||||||
|
(flow-api-test
|
||||||
|
"result: a still-suspended flow has no result"
|
||||||
|
(flow-a
|
||||||
|
"(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/result id)")
|
||||||
|
(list "flow-error" "not-done"))
|
||||||
|
(flow-api-test
|
||||||
|
"result: unknown id errors"
|
||||||
|
(flow-a "(flow/result 999)")
|
||||||
|
(list "flow-error" "no-such-flow"))
|
||||||
|
|
||||||
|
;; ── flow/list ───────────────────────────────────────────────────
|
||||||
|
(flow-api-test "list: empty store" (flow-a "(flow/list)") (list))
|
||||||
|
(flow-api-test
|
||||||
|
"list: reports id + status for each flow (newest first)"
|
||||||
|
(flow-a
|
||||||
|
"(defflow w (lambda (x) (suspend (quote q)))) (flow/start w 0) (flow/start (lambda (x) (* x 2)) 5) (flow/list)")
|
||||||
|
(list (list 2 "done") (list 1 "suspended")))
|
||||||
|
|
||||||
|
;; ── flow/pending ────────────────────────────────────────────────
|
||||||
|
(flow-api-test
|
||||||
|
"pending: lists suspended flows with their waiting tag"
|
||||||
|
(flow-a
|
||||||
|
"(defflow w (lambda (x) (suspend (quote review)))) (flow/start w 0) (flow/pending)")
|
||||||
|
(list (list 1 "review")))
|
||||||
|
(flow-api-test
|
||||||
|
"pending: excludes completed and cancelled flows"
|
||||||
|
(flow-a
|
||||||
|
"(defflow w (lambda (x) (suspend (quote q)))) (defflow v (sequence (lambda (x) (suspend (quote r))) (lambda (y) y))) (define i1 (car (cdr (flow/start w 0)))) (define i2 (car (cdr (flow/start v 0)))) (define i3 (car (cdr (flow/start w 0)))) (flow/resume i2 1) (flow/cancel i3) (flow/pending)")
|
||||||
|
(list (list 1 "q")))
|
||||||
|
(flow-api-test
|
||||||
|
"pending: operator can drain all pending flows"
|
||||||
|
(flow-a
|
||||||
|
"(defflow w (sequence (lambda (x) (suspend (quote q))) (lambda (v) (* v 10)))) (flow/start w 0) (flow/start w 0) (define ps (flow/pending)) (flow/resume (car (car ps)) 1) (flow/resume (car (car (cdr ps))) 2) (flow/list)")
|
||||||
|
(list (list 1 "done") (list 2 "done")))
|
||||||
|
|
||||||
|
(define flow-api-tests-run! (fn () {:total (+ flow-api-pass flow-api-fail) :passed flow-api-pass :failed flow-api-fail :fails flow-api-fails}))
|
||||||
121
lib/flow/tests/basic.sx
Normal file
121
lib/flow/tests/basic.sx
Normal file
@@ -0,0 +1,121 @@
|
|||||||
|
;; lib/flow/tests/basic.sx — Phase 1: declarative DAG + sequential execution.
|
||||||
|
|
||||||
|
(define flow-basic-pass 0)
|
||||||
|
(define flow-basic-fail 0)
|
||||||
|
(define flow-basic-fails (list))
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-basic-test
|
||||||
|
(fn
|
||||||
|
(name actual expected)
|
||||||
|
(if
|
||||||
|
(= actual expected)
|
||||||
|
(set! flow-basic-pass (+ flow-basic-pass 1))
|
||||||
|
(begin
|
||||||
|
(set! flow-basic-fail (+ flow-basic-fail 1))
|
||||||
|
(append! flow-basic-fails {:name name :expected expected :actual actual})))))
|
||||||
|
|
||||||
|
;; Run a Scheme flow-program string and return its final value.
|
||||||
|
(define flow-b (fn (src) (flow-run src)))
|
||||||
|
|
||||||
|
;; Scheme strings are boxed as {:scm-string "..."}; unwrap to a host string.
|
||||||
|
(define flow-bs (fn (src) (get (flow-run src) :scm-string)))
|
||||||
|
|
||||||
|
;; ── single node ─────────────────────────────────────────────────
|
||||||
|
(flow-basic-test
|
||||||
|
"node: identity passes input through"
|
||||||
|
(flow-b "(flow/start flow-id 7)")
|
||||||
|
7)
|
||||||
|
(flow-basic-test
|
||||||
|
"node: const ignores input"
|
||||||
|
(flow-b "(flow/start (flow-const 99) 1)")
|
||||||
|
99)
|
||||||
|
(flow-basic-test
|
||||||
|
"node: bare lambda is a node"
|
||||||
|
(flow-b "(flow/start (lambda (x) (* x x)) 6)")
|
||||||
|
36)
|
||||||
|
|
||||||
|
;; ── linear sequence ─────────────────────────────────────────────
|
||||||
|
(flow-basic-test
|
||||||
|
"sequence: empty is identity"
|
||||||
|
(flow-b "(flow/start (sequence) 42)")
|
||||||
|
42)
|
||||||
|
(flow-basic-test
|
||||||
|
"sequence: single child"
|
||||||
|
(flow-b "(flow/start (sequence (lambda (x) (+ x 1))) 41)")
|
||||||
|
42)
|
||||||
|
(flow-basic-test
|
||||||
|
"sequence: two children thread"
|
||||||
|
(flow-b
|
||||||
|
"(flow/start (sequence (lambda (x) (+ x 1)) (lambda (x) (* x 10))) 4)")
|
||||||
|
50)
|
||||||
|
(flow-basic-test
|
||||||
|
"sequence: three children thread"
|
||||||
|
(flow-b
|
||||||
|
"(flow/start (sequence (lambda (x) (+ x 1)) (lambda (x) (* x 2)) (lambda (x) (- x 3))) 5)")
|
||||||
|
9)
|
||||||
|
|
||||||
|
;; ── data flow between nodes ─────────────────────────────────────
|
||||||
|
(flow-basic-test
|
||||||
|
"data flow: string accumulation"
|
||||||
|
(flow-bs
|
||||||
|
"(flow/start (sequence (lambda (s) (string-append s \"-a\")) (lambda (s) (string-append s \"-b\"))) \"x\")")
|
||||||
|
"x-a-b")
|
||||||
|
(flow-basic-test
|
||||||
|
"data flow: list build"
|
||||||
|
(flow-b
|
||||||
|
"(flow/start (sequence (lambda (x) (cons x (list))) (lambda (xs) (cons 0 xs))) 7)")
|
||||||
|
(list 0 7))
|
||||||
|
|
||||||
|
;; ── defflow ─────────────────────────────────────────────────────
|
||||||
|
(flow-basic-test
|
||||||
|
"defflow: names a flow"
|
||||||
|
(flow-b
|
||||||
|
"(defflow inc2 (sequence (lambda (x) (+ x 1)) (lambda (x) (+ x 1)))) (flow/start inc2 40)")
|
||||||
|
42)
|
||||||
|
(flow-basic-test
|
||||||
|
"defflow: reusable"
|
||||||
|
(flow-b
|
||||||
|
"(defflow dbl (lambda (x) (* x 2))) (+ (flow/start dbl 3) (flow/start dbl 10))")
|
||||||
|
26)
|
||||||
|
|
||||||
|
;; ── parallel (sequential semantics, join into list) ─────────────
|
||||||
|
(flow-basic-test
|
||||||
|
"parallel: fans input to all branches"
|
||||||
|
(flow-b
|
||||||
|
"(flow/start (parallel (lambda (x) (+ x 1)) (lambda (x) (* x 2)) (lambda (x) (- x 3))) 10)")
|
||||||
|
(list 11 20 7))
|
||||||
|
(flow-basic-test
|
||||||
|
"parallel: empty joins to empty list"
|
||||||
|
(flow-b "(flow/start (parallel) 5)")
|
||||||
|
(list))
|
||||||
|
(flow-basic-test
|
||||||
|
"parallel: single branch"
|
||||||
|
(flow-b "(flow/start (parallel (lambda (x) (* x x))) 9)")
|
||||||
|
(list 81))
|
||||||
|
|
||||||
|
;; ── nested composition ──────────────────────────────────────────
|
||||||
|
(flow-basic-test
|
||||||
|
"nested: sequence of sequences"
|
||||||
|
(flow-b
|
||||||
|
"(flow/start (sequence (sequence (lambda (x) (+ x 1)) (lambda (x) (+ x 1))) (sequence (lambda (x) (* x 3)))) 0)")
|
||||||
|
6)
|
||||||
|
(flow-basic-test
|
||||||
|
"nested: parallel inside sequence, join then reduce"
|
||||||
|
(flow-b
|
||||||
|
"(flow/start (sequence (parallel (lambda (x) (+ x 1)) (lambda (x) (* x 2))) (lambda (xs) (apply + xs))) 10)")
|
||||||
|
31)
|
||||||
|
(flow-basic-test
|
||||||
|
"nested: sequence inside parallel branch"
|
||||||
|
(flow-b
|
||||||
|
"(flow/start (parallel (sequence (lambda (x) (+ x 1)) (lambda (x) (* x 2))) (lambda (x) x)) 5)")
|
||||||
|
(list 12 5))
|
||||||
|
|
||||||
|
;; ── publish-shaped flow (the architecture sketch) ───────────────
|
||||||
|
(flow-basic-test
|
||||||
|
"publish: write -> (review | spell) -> join lengths"
|
||||||
|
(flow-b
|
||||||
|
"(defflow publish (sequence (lambda (draft) (string-append draft \"!\")) (parallel (lambda (c) (string-length c)) (lambda (c) (string-length (string-append c \"?\")))))) (flow/start publish \"hi\")")
|
||||||
|
(list 3 4))
|
||||||
|
|
||||||
|
(define flow-basic-tests-run! (fn () {:total (+ flow-basic-pass flow-basic-fail) :passed flow-basic-pass :failed flow-basic-fail :fails flow-basic-fails}))
|
||||||
108
lib/flow/tests/combinators.sx
Normal file
108
lib/flow/tests/combinators.sx
Normal file
@@ -0,0 +1,108 @@
|
|||||||
|
;; lib/flow/tests/combinators.sx — Phase 5: combinator library (tap, recover, map-flow, iteration).
|
||||||
|
|
||||||
|
(define flow-cmb-pass 0)
|
||||||
|
(define flow-cmb-fail 0)
|
||||||
|
(define flow-cmb-fails (list))
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-cmb-test
|
||||||
|
(fn
|
||||||
|
(name actual expected)
|
||||||
|
(if
|
||||||
|
(= actual expected)
|
||||||
|
(set! flow-cmb-pass (+ flow-cmb-pass 1))
|
||||||
|
(begin
|
||||||
|
(set! flow-cmb-fail (+ flow-cmb-fail 1))
|
||||||
|
(append! flow-cmb-fails {:name name :expected expected :actual actual})))))
|
||||||
|
|
||||||
|
(define flow-m (fn (src) (flow-run src)))
|
||||||
|
|
||||||
|
;; ── tap (side-effecting pass-through) ───────────────────────────
|
||||||
|
(flow-cmb-test
|
||||||
|
"tap: returns input unchanged"
|
||||||
|
(flow-m "(flow/start (tap (lambda (x) (* x 999))) 7)")
|
||||||
|
7)
|
||||||
|
(flow-cmb-test
|
||||||
|
"tap: runs the side effect"
|
||||||
|
(flow-m
|
||||||
|
"(define seen 0) (flow/start (tap (lambda (x) (set! seen x))) 42) seen")
|
||||||
|
42)
|
||||||
|
(flow-cmb-test
|
||||||
|
"tap: value flows on while the effect observes it"
|
||||||
|
(flow-m
|
||||||
|
"(define log 0) (flow/start (sequence (lambda (x) (+ x 1)) (tap (lambda (x) (set! log x))) (lambda (x) (* x 2))) 10) (list log (flow/result 1))")
|
||||||
|
(list 11 22))
|
||||||
|
|
||||||
|
;; ── recover (fail-value counterpart of try-catch) ───────────────
|
||||||
|
(flow-cmb-test
|
||||||
|
"recover: passes a non-fail value through"
|
||||||
|
(flow-m "(flow/start (recover (lambda (x) (* x 2)) (lambda (r) -1)) 5)")
|
||||||
|
10)
|
||||||
|
(flow-cmb-test
|
||||||
|
"recover: handles a fail value via the reason"
|
||||||
|
(flow-m
|
||||||
|
"(flow/start (recover (lambda (x) (fail (quote too-small))) (lambda (r) (list (quote recovered) r))) 1)")
|
||||||
|
(list "recovered" "too-small"))
|
||||||
|
(flow-cmb-test
|
||||||
|
"recover: handler can supply a default value"
|
||||||
|
(flow-m
|
||||||
|
"(flow/start (sequence (recover (lambda (x) (if (> x 0) x (fail (quote neg))) ) (flow-const 0)) (lambda (x) (* x 10))) -3)")
|
||||||
|
0)
|
||||||
|
(flow-cmb-test
|
||||||
|
"recover: does not catch raised exceptions (those are try-catch's job)"
|
||||||
|
(flow-m
|
||||||
|
"(flow/start (try-catch (recover (lambda (x) (raise (quote boom))) (flow-const 0)) (lambda (e) e)) 1)")
|
||||||
|
"boom")
|
||||||
|
|
||||||
|
;; ── map-flow (run a node over a list, join) ─────────────────────
|
||||||
|
(flow-cmb-test
|
||||||
|
"map-flow: applies the node to each item"
|
||||||
|
(flow-m "(flow/start (map-flow (lambda (x) (* x x))) (list 1 2 3 4))")
|
||||||
|
(list 1 4 9 16))
|
||||||
|
(flow-cmb-test
|
||||||
|
"map-flow: empty list joins to empty"
|
||||||
|
(flow-m "(flow/start (map-flow (lambda (x) (+ x 1))) (list))")
|
||||||
|
(list))
|
||||||
|
(flow-cmb-test
|
||||||
|
"map-flow: each item runs an independent sub-flow"
|
||||||
|
(flow-m
|
||||||
|
"(flow/start (map-flow (sequence (lambda (x) (+ x 1)) (lambda (x) (* x 2)))) (list 0 4 9))")
|
||||||
|
(list 2 10 20))
|
||||||
|
(flow-cmb-test
|
||||||
|
"map-flow: composes — fan over a list then reduce the join"
|
||||||
|
(flow-m
|
||||||
|
"(flow/start (sequence (map-flow (lambda (x) (* x 10))) (lambda (xs) (apply + xs))) (list 1 2 3))")
|
||||||
|
60)
|
||||||
|
|
||||||
|
;; ── flow-while / flow-until (bounded iteration) ─────────────────
|
||||||
|
(flow-cmb-test
|
||||||
|
"flow-while: iterates while the predicate holds"
|
||||||
|
(flow-m
|
||||||
|
"(flow/start (flow-while (lambda (x) (< x 10)) (lambda (x) (+ x 1)) 100) 0)")
|
||||||
|
10)
|
||||||
|
(flow-cmb-test
|
||||||
|
"flow-while: a false predicate leaves input unchanged"
|
||||||
|
(flow-m
|
||||||
|
"(flow/start (flow-while (lambda (x) (< x 0)) (lambda (x) (+ x 1)) 100) 5)")
|
||||||
|
5)
|
||||||
|
(flow-cmb-test
|
||||||
|
"flow-while: respects the max-iteration bound"
|
||||||
|
(flow-m "(flow/start (flow-while (lambda (x) #t) (lambda (x) (+ x 1)) 3) 0)")
|
||||||
|
3)
|
||||||
|
(flow-cmb-test
|
||||||
|
"flow-while: doubles until past a threshold"
|
||||||
|
(flow-m
|
||||||
|
"(flow/start (flow-while (lambda (x) (< x 50)) (lambda (x) (* x 2)) 100) 3)")
|
||||||
|
96)
|
||||||
|
(flow-cmb-test
|
||||||
|
"flow-until: iterates until the predicate becomes true"
|
||||||
|
(flow-m
|
||||||
|
"(flow/start (flow-until (lambda (x) (>= x 10)) (lambda (x) (+ x 3)) 100) 0)")
|
||||||
|
12)
|
||||||
|
(flow-cmb-test
|
||||||
|
"flow-until: composes inside a sequence"
|
||||||
|
(flow-m
|
||||||
|
"(flow/start (sequence (flow-until (lambda (x) (> x 100)) (lambda (x) (* x 3)) 100) (lambda (x) (- x 100))) 5)")
|
||||||
|
35)
|
||||||
|
|
||||||
|
(define flow-cmb-tests-run! (fn () {:total (+ flow-cmb-pass flow-cmb-fail) :passed flow-cmb-pass :failed flow-cmb-fail :fails flow-cmb-fails}))
|
||||||
179
lib/flow/tests/control.sx
Normal file
179
lib/flow/tests/control.sx
Normal file
@@ -0,0 +1,179 @@
|
|||||||
|
;; lib/flow/tests/control.sx — Phase 2: control flow + error handling.
|
||||||
|
|
||||||
|
(define flow-ctl-pass 0)
|
||||||
|
(define flow-ctl-fail 0)
|
||||||
|
(define flow-ctl-fails (list))
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-ctl-test
|
||||||
|
(fn
|
||||||
|
(name actual expected)
|
||||||
|
(if
|
||||||
|
(= actual expected)
|
||||||
|
(set! flow-ctl-pass (+ flow-ctl-pass 1))
|
||||||
|
(begin
|
||||||
|
(set! flow-ctl-fail (+ flow-ctl-fail 1))
|
||||||
|
(append! flow-ctl-fails {:name name :expected expected :actual actual})))))
|
||||||
|
|
||||||
|
(define flow-c (fn (src) (flow-run src)))
|
||||||
|
(define flow-cs (fn (src) (get (flow-run src) :scm-string)))
|
||||||
|
|
||||||
|
;; ── branch ──────────────────────────────────────────────────────
|
||||||
|
(flow-ctl-test
|
||||||
|
"branch: true selects then"
|
||||||
|
(flow-c
|
||||||
|
"(flow/start (branch (lambda (x) (> x 0)) (lambda (x) (* x 100)) (lambda (x) (- 0 x))) 5)")
|
||||||
|
500)
|
||||||
|
(flow-ctl-test
|
||||||
|
"branch: false selects else"
|
||||||
|
(flow-c
|
||||||
|
"(flow/start (branch (lambda (x) (> x 0)) (lambda (x) (* x 100)) (lambda (x) (- 0 x))) -3)")
|
||||||
|
3)
|
||||||
|
(flow-ctl-test
|
||||||
|
"branch: predicate sees the threaded input"
|
||||||
|
(flow-c
|
||||||
|
"(flow/start (sequence (lambda (x) (+ x 1)) (branch (lambda (x) (> x 3)) (flow-const 100) (flow-const 0))) 3)")
|
||||||
|
100)
|
||||||
|
(flow-ctl-test
|
||||||
|
"branch: branches are full nodes (sequence inside)"
|
||||||
|
(flow-c
|
||||||
|
"(flow/start (branch (lambda (x) (< x 10)) (sequence (lambda (x) (+ x 1)) (lambda (x) (* x 2))) (flow-const 0)) 4)")
|
||||||
|
10)
|
||||||
|
(flow-ctl-test
|
||||||
|
"branch: nested branch (3-way sign)"
|
||||||
|
(flow-c
|
||||||
|
"(defflow sign (branch (lambda (x) (> x 0)) (flow-const 1) (branch (lambda (x) (< x 0)) (flow-const -1) (flow-const 0)))) (list (flow/start sign 7) (flow/start sign -7) (flow/start sign 0))")
|
||||||
|
(list 1 -1 0))
|
||||||
|
(flow-ctl-test
|
||||||
|
"branch: publish-shaped approval gate"
|
||||||
|
(flow-cs
|
||||||
|
"(defflow publish (branch (lambda (post) (>= (string-length post) 3)) (lambda (post) (string-append post \" [published]\")) (lambda (post) (string-append post \" [rejected]\")))) (flow/start publish \"ok\")")
|
||||||
|
"ok [rejected]")
|
||||||
|
|
||||||
|
;; ── error model — explicit (fail reason) values ─────────────────
|
||||||
|
(flow-ctl-test
|
||||||
|
"fail: failed? is true for a failure value"
|
||||||
|
(flow-c "(failed? (fail 404))")
|
||||||
|
true)
|
||||||
|
(flow-ctl-test
|
||||||
|
"fail: fail-reason extracts the reason"
|
||||||
|
(flow-c "(fail-reason (fail 404))")
|
||||||
|
404)
|
||||||
|
(flow-ctl-test
|
||||||
|
"fail: failed? is false for a plain value"
|
||||||
|
(flow-c "(failed? 7)")
|
||||||
|
false)
|
||||||
|
(flow-ctl-test
|
||||||
|
"fail: failed? is false for an ordinary list"
|
||||||
|
(flow-c "(failed? (list 1 2 3))")
|
||||||
|
false)
|
||||||
|
(flow-ctl-test
|
||||||
|
"fail: a node may emit a failure as data"
|
||||||
|
(flow-c
|
||||||
|
"(defflow validate (lambda (s) (if (>= (string-length s) 3) s (fail (quote too-short))))) (failed? (flow/start validate \"hi\"))")
|
||||||
|
true)
|
||||||
|
(flow-ctl-test
|
||||||
|
"fail: failure flows downstream, branch recovers"
|
||||||
|
(flow-c
|
||||||
|
"(defflow guarded (sequence (lambda (s) (if (>= (string-length s) 3) (string-length s) (fail (quote too-short)))) (branch failed? (lambda (f) (list (quote recovered) (fail-reason f))) (lambda (n) (list (quote ok) n))))) (flow/start guarded \"hi\")")
|
||||||
|
(list "recovered" "too-short"))
|
||||||
|
|
||||||
|
;; ── try-catch — reify raised exceptions ─────────────────────────
|
||||||
|
(flow-ctl-test
|
||||||
|
"try-catch: no exception returns node result"
|
||||||
|
(flow-c "(flow/start (try-catch (lambda (x) (* x 2)) (lambda (e) -1)) 5)")
|
||||||
|
10)
|
||||||
|
(flow-ctl-test
|
||||||
|
"try-catch: handler runs on raise"
|
||||||
|
(flow-c
|
||||||
|
"(flow/start (try-catch (lambda (x) (raise (quote boom))) (flow-const 99)) 1)")
|
||||||
|
99)
|
||||||
|
(flow-ctl-test
|
||||||
|
"try-catch: handler receives the reified error"
|
||||||
|
(flow-c "(flow/start (try-catch (lambda (x) (raise 42)) (lambda (e) e)) 0)")
|
||||||
|
42)
|
||||||
|
(flow-ctl-test
|
||||||
|
"try-catch: catches exception from deep inside a sequence"
|
||||||
|
(flow-c
|
||||||
|
"(flow/start (try-catch (sequence (lambda (x) (+ x 1)) (lambda (x) (raise (quote deep)))) (flow-const -99)) 5)")
|
||||||
|
-99)
|
||||||
|
(flow-ctl-test
|
||||||
|
"try-catch: handler may convert to a failure value"
|
||||||
|
(flow-c
|
||||||
|
"(failed? (flow/start (try-catch (lambda (x) (raise (quote bad))) (lambda (e) (fail e))) 0))")
|
||||||
|
true)
|
||||||
|
(flow-ctl-test
|
||||||
|
"try-catch: composes — recover then continue"
|
||||||
|
(flow-c
|
||||||
|
"(flow/start (sequence (try-catch (lambda (x) (raise (quote x))) (flow-const 10)) (lambda (n) (* n 5))) 0)")
|
||||||
|
50)
|
||||||
|
|
||||||
|
;; ── retry — re-run on raised exceptions ─────────────────────────
|
||||||
|
(flow-ctl-test
|
||||||
|
"retry: succeeds after transient failures"
|
||||||
|
(flow-c
|
||||||
|
"(define ctr 0) (defflow flaky (lambda (x) (set! ctr (+ ctr 1)) (if (< ctr 3) (raise (quote nope)) (* x 10)))) (list (flow/start (retry 5 flaky) 7) ctr)")
|
||||||
|
(list 70 3))
|
||||||
|
(flow-ctl-test
|
||||||
|
"retry: exhausted re-raises (caught by try-catch)"
|
||||||
|
(flow-c
|
||||||
|
"(flow/start (try-catch (retry 2 (lambda (x) (raise (quote always)))) (flow-const (quote gaveup))) 0)")
|
||||||
|
"gaveup")
|
||||||
|
(flow-ctl-test
|
||||||
|
"retry: n=1 means a single attempt"
|
||||||
|
(flow-c
|
||||||
|
"(define ctr 0) (flow/start (try-catch (retry 1 (lambda (x) (set! ctr (+ ctr 1)) (raise (quote bad)))) (lambda (e) ctr)) 0)")
|
||||||
|
1)
|
||||||
|
(flow-ctl-test
|
||||||
|
"retry: success on first attempt does not re-run"
|
||||||
|
(flow-c
|
||||||
|
"(define ctr 0) (flow/start (sequence (retry 5 (lambda (x) (set! ctr (+ ctr 1)) (* x 2))) (lambda (n) ctr)) 21)")
|
||||||
|
1)
|
||||||
|
(flow-ctl-test
|
||||||
|
"retry: does not retry explicit failure values"
|
||||||
|
(flow-c
|
||||||
|
"(define ctr 0) (failed? (flow/start (retry 5 (lambda (x) (set! ctr (+ ctr 1)) (fail (quote bad)))) 0))")
|
||||||
|
true)
|
||||||
|
(flow-ctl-test
|
||||||
|
"retry: failure-value path runs node exactly once"
|
||||||
|
(flow-c
|
||||||
|
"(define ctr 0) (flow/start (sequence (retry 5 (lambda (x) (set! ctr (+ ctr 1)) (fail (quote bad)))) (lambda (f) ctr)) 0)")
|
||||||
|
1)
|
||||||
|
|
||||||
|
;; ── timeout — cooperative step budget ───────────────────────────
|
||||||
|
(flow-ctl-test
|
||||||
|
"timeout: work within budget completes"
|
||||||
|
(flow-c
|
||||||
|
"(define (cd n) (if (<= n 0) 99 (begin (tick) (cd (- n 1))))) (flow/start (try-catch (timeout 10 (lambda (x) (cd x))) (flow-const (quote timed-out))) 5)")
|
||||||
|
99)
|
||||||
|
(flow-ctl-test
|
||||||
|
"timeout: work exceeding budget raises flow-timeout"
|
||||||
|
(flow-c
|
||||||
|
"(define (cd n) (if (<= n 0) 99 (begin (tick) (cd (- n 1))))) (flow/start (try-catch (timeout 10 (lambda (x) (cd x))) (flow-const (quote timed-out))) 20)")
|
||||||
|
"timed-out")
|
||||||
|
(flow-ctl-test
|
||||||
|
"timeout: exact budget boundary completes"
|
||||||
|
(flow-c
|
||||||
|
"(define (cd n) (if (<= n 0) 99 (begin (tick) (cd (- n 1))))) (flow/start (try-catch (timeout 5 (lambda (x) (cd x))) (flow-const (quote timed-out))) 5)")
|
||||||
|
99)
|
||||||
|
(flow-ctl-test
|
||||||
|
"timeout: one tick over the budget raises"
|
||||||
|
(flow-c
|
||||||
|
"(define (cd n) (if (<= n 0) 99 (begin (tick) (cd (- n 1))))) (flow/start (try-catch (timeout 5 (lambda (x) (cd x))) (flow-const (quote timed-out))) 6)")
|
||||||
|
"timed-out")
|
||||||
|
(flow-ctl-test
|
||||||
|
"timeout: the raised error is identifiable"
|
||||||
|
(flow-c
|
||||||
|
"(define (cd n) (if (<= n 0) 99 (begin (tick) (cd (- n 1))))) (flow/start (try-catch (timeout 2 (lambda (x) (cd x))) (lambda (e) e)) 9)")
|
||||||
|
"flow-timeout")
|
||||||
|
(flow-ctl-test
|
||||||
|
"timeout: a node that never ticks is unbounded"
|
||||||
|
(flow-c "(flow/start (timeout 0 (lambda (x) (* x 2))) 5)")
|
||||||
|
10)
|
||||||
|
(flow-ctl-test
|
||||||
|
"timeout: budget is restored across sequential timeouts"
|
||||||
|
(flow-c
|
||||||
|
"(define (cd n) (if (<= n 0) 1 (begin (tick) (cd (- n 1))))) (flow/start (sequence (timeout 4 (lambda (x) (cd x))) (timeout 4 (lambda (x) (cd 3))) (lambda (x) (begin (tick) (+ x 100)))) 3)")
|
||||||
|
101)
|
||||||
|
|
||||||
|
(define flow-ctl-tests-run! (fn () {:total (+ flow-ctl-pass flow-ctl-fail) :passed flow-ctl-pass :failed flow-ctl-fail :fails flow-ctl-fails}))
|
||||||
120
lib/flow/tests/distributed.sx
Normal file
120
lib/flow/tests/distributed.sx
Normal file
@@ -0,0 +1,120 @@
|
|||||||
|
;; lib/flow/tests/distributed.sx — Phase 4: distributed nodes via fed-sx (mocked).
|
||||||
|
|
||||||
|
(define flow-dist-pass 0)
|
||||||
|
(define flow-dist-fail 0)
|
||||||
|
(define flow-dist-fails (list))
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-dist-test
|
||||||
|
(fn
|
||||||
|
(name actual expected)
|
||||||
|
(if
|
||||||
|
(= actual expected)
|
||||||
|
(set! flow-dist-pass (+ flow-dist-pass 1))
|
||||||
|
(begin
|
||||||
|
(set! flow-dist-fail (+ flow-dist-fail 1))
|
||||||
|
(append! flow-dist-fails {:name name :expected expected :actual actual})))))
|
||||||
|
|
||||||
|
(define flow-d (fn (src) (flow-run src)))
|
||||||
|
|
||||||
|
;; ── remote-node ─────────────────────────────────────────────────
|
||||||
|
(flow-dist-test
|
||||||
|
"remote: a node executes on a peer"
|
||||||
|
(flow-d
|
||||||
|
"(flow-peer-register! (quote edge) (list (list (quote double) (lambda (x) (* x 2))))) (flow/start (remote-node (quote edge) (quote double)) 21)")
|
||||||
|
42)
|
||||||
|
(flow-dist-test
|
||||||
|
"remote: remote nodes compose in a sequence"
|
||||||
|
(flow-d
|
||||||
|
"(flow-peer-register! (quote edge) (list (list (quote inc) (lambda (x) (+ x 1))) (list (quote double) (lambda (x) (* x 2))))) (flow/start (sequence (remote-node (quote edge) (quote inc)) (remote-node (quote edge) (quote double))) 4)")
|
||||||
|
10)
|
||||||
|
(flow-dist-test
|
||||||
|
"remote: a remote node mixes with local nodes"
|
||||||
|
(flow-d
|
||||||
|
"(flow-peer-register! (quote edge) (list (list (quote double) (lambda (x) (* x 2))))) (flow/start (sequence (lambda (x) (+ x 5)) (remote-node (quote edge) (quote double)) (lambda (x) (- x 1))) 10)")
|
||||||
|
29)
|
||||||
|
(flow-dist-test
|
||||||
|
"remote: unreachable peer raises flow-remote-unreachable"
|
||||||
|
(flow-d
|
||||||
|
"(flow/start (try-catch (remote-node (quote ghost) (quote double)) (lambda (e) e)) 1)")
|
||||||
|
"flow-remote-unreachable")
|
||||||
|
(flow-dist-test
|
||||||
|
"remote: unknown function on a peer raises flow-remote-no-fn"
|
||||||
|
(flow-d
|
||||||
|
"(flow-peer-register! (quote edge) (list (list (quote double) (lambda (x) (* x 2))))) (flow/start (try-catch (remote-node (quote edge) (quote missing)) (lambda (e) e)) 1)")
|
||||||
|
"flow-remote-no-fn")
|
||||||
|
(flow-dist-test
|
||||||
|
"remote: a remote node can suspend the flow (peer returns control)"
|
||||||
|
(flow-d
|
||||||
|
"(flow-peer-register! (quote edge) (list (list (quote review) (lambda (x) x)))) (flow/start (sequence (remote-node (quote edge) (quote review)) (lambda (x) (suspend (quote human))) (lambda (v) (list (quote published) v))) 7)")
|
||||||
|
(list "flow-suspended" 1 "human"))
|
||||||
|
(flow-dist-test
|
||||||
|
"remote: a transient remote failure is recoverable with retry"
|
||||||
|
(flow-d
|
||||||
|
"(define hits 0) (flow-peer-register! (quote edge) (list (list (quote flaky) (lambda (x) (begin (set! hits (+ hits 1)) (if (< hits 2) (raise (quote down)) (* x 3))))))) (list (flow/start (retry 3 (remote-node (quote edge) (quote flaky))) 7) hits)")
|
||||||
|
(list 21 2))
|
||||||
|
|
||||||
|
;; ── failover (retry on a different peer, fall through to local) ──
|
||||||
|
(flow-dist-test
|
||||||
|
"failover: first reachable peer serves the request"
|
||||||
|
(flow-d
|
||||||
|
"(flow-peer-register! (quote p2) (list (list (quote f) (lambda (x) (+ x 100))))) (flow/start (remote-failover (list (quote p2) (quote down)) (quote f) (flow-const (quote local))) 5)")
|
||||||
|
105)
|
||||||
|
(flow-dist-test
|
||||||
|
"failover: skips an unreachable peer to the next one"
|
||||||
|
(flow-d
|
||||||
|
"(flow-peer-register! (quote p2) (list (list (quote f) (lambda (x) (+ x 100))))) (flow/start (remote-failover (list (quote down) (quote p2)) (quote f) (flow-const (quote local))) 5)")
|
||||||
|
105)
|
||||||
|
(flow-dist-test
|
||||||
|
"failover: skips a peer whose function raises"
|
||||||
|
(flow-d
|
||||||
|
"(flow-peer-register! (quote bad) (list (list (quote f) (lambda (x) (raise (quote boom)))))) (flow-peer-register! (quote good) (list (list (quote f) (lambda (x) (* x 10))))) (flow/start (remote-failover (list (quote bad) (quote good)) (quote f) (flow-const 0)) 4)")
|
||||||
|
40)
|
||||||
|
(flow-dist-test
|
||||||
|
"failover: all peers fail, the local fallback runs"
|
||||||
|
(flow-d
|
||||||
|
"(flow/start (remote-failover (list (quote down1) (quote down2)) (quote f) (lambda (x) (* x -1))) 9)")
|
||||||
|
-9)
|
||||||
|
(flow-dist-test
|
||||||
|
"failover: threads the input through to the chosen peer"
|
||||||
|
(flow-d
|
||||||
|
"(flow-peer-register! (quote p) (list (list (quote f) (lambda (x) (list (quote got) x))))) (flow/start (sequence (lambda (x) (+ x 1)) (remote-failover (list (quote p)) (quote f) (flow-const 0))) 41)")
|
||||||
|
(list "got" 42))
|
||||||
|
(flow-dist-test
|
||||||
|
"failover: composes inside a larger sequence"
|
||||||
|
(flow-d
|
||||||
|
"(flow-peer-register! (quote p) (list (list (quote f) (lambda (x) (* x 2))))) (flow/start (sequence (remote-failover (list (quote down) (quote p)) (quote f) (flow-const 1)) (lambda (x) (+ x 3))) 5)")
|
||||||
|
13)
|
||||||
|
|
||||||
|
;; ── replication + handoff ───────────────────────────────────────
|
||||||
|
(flow-dist-test
|
||||||
|
"replicate: a peer holds the exported store"
|
||||||
|
(flow-d
|
||||||
|
"(defflow w (lambda (x) (suspend (quote q)))) (flow/start w 10) (flow-replicate-to (quote peerB)) (if (flow-replica-get (quote peerB)) (quote replicated) (quote missing))")
|
||||||
|
"replicated")
|
||||||
|
(flow-dist-test
|
||||||
|
"handoff: a peer resumes a flow after the local instance dies"
|
||||||
|
(flow-d
|
||||||
|
"(defflow w (sequence (lambda (x) (suspend (quote q))) (lambda (v) (list (quote done) v)))) (define id (car (cdr (flow/start w 10)))) (flow-replicate-to (quote peerB)) (set! flow-store (list)) (flow-restore-from (quote peerB)) (flow/resume id 55)")
|
||||||
|
(list "done" 55))
|
||||||
|
(flow-dist-test
|
||||||
|
"handoff: restored peer reports the flow as resumable"
|
||||||
|
(flow-d
|
||||||
|
"(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 10)))) (flow-replicate-to (quote peerB)) (set! flow-store (list)) (flow-restore-from (quote peerB)) (flow-resumable-ids)")
|
||||||
|
(list 1))
|
||||||
|
(flow-dist-test
|
||||||
|
"handoff: without restore the dead instance has lost the flow"
|
||||||
|
(flow-d
|
||||||
|
"(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 10)))) (flow-replicate-to (quote peerB)) (set! flow-store (list)) (flow/resume id 1)")
|
||||||
|
(list "flow-error" "no-such-flow"))
|
||||||
|
(flow-dist-test
|
||||||
|
"restore: from an unknown peer yields false"
|
||||||
|
(flow-d "(flow-restore-from (quote nowhere))")
|
||||||
|
false)
|
||||||
|
(flow-dist-test
|
||||||
|
"handoff: replication preserves the replay log across the move"
|
||||||
|
(flow-d
|
||||||
|
"(defflow two (sequence (lambda (x) (suspend (quote a))) (lambda (x) (suspend (quote b))) (lambda (x) (list x)))) (define id (car (cdr (flow/start two 0)))) (flow/resume id 11) (flow-replicate-to (quote peerB)) (set! flow-store (list)) (flow-restore-from (quote peerB)) (flow/resume id 22)")
|
||||||
|
(list 22))
|
||||||
|
|
||||||
|
(define flow-dist-tests-run! (fn () {:total (+ flow-dist-pass flow-dist-fail) :passed flow-dist-pass :failed flow-dist-fail :fails flow-dist-fails}))
|
||||||
106
lib/flow/tests/host.sx
Normal file
106
lib/flow/tests/host.sx
Normal file
@@ -0,0 +1,106 @@
|
|||||||
|
;; lib/flow/tests/host.sx — Phase 8: host integration ABI (request/await/host-queue/driver).
|
||||||
|
|
||||||
|
(define flow-hst-pass 0)
|
||||||
|
(define flow-hst-fail 0)
|
||||||
|
(define flow-hst-fails (list))
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-hst-test
|
||||||
|
(fn
|
||||||
|
(name actual expected)
|
||||||
|
(if
|
||||||
|
(= actual expected)
|
||||||
|
(set! flow-hst-pass (+ flow-hst-pass 1))
|
||||||
|
(begin
|
||||||
|
(set! flow-hst-fail (+ flow-hst-fail 1))
|
||||||
|
(append! flow-hst-fails {:name name :expected expected :actual actual})))))
|
||||||
|
|
||||||
|
(define flow-hst (fn (src) (flow-run src)))
|
||||||
|
|
||||||
|
;; ── request envelope ────────────────────────────────────────────
|
||||||
|
(flow-hst-test
|
||||||
|
"request: suspends with a typed envelope"
|
||||||
|
(flow-hst
|
||||||
|
"(car (cdr (cdr (flow/start (lambda (x) (request (quote render) x)) 5))))")
|
||||||
|
(list "flow-request" "render" 5))
|
||||||
|
(flow-hst-test
|
||||||
|
"request?: recognizes an envelope"
|
||||||
|
(flow-hst "(request? (list (quote flow-request) (quote human) 1))")
|
||||||
|
true)
|
||||||
|
(flow-hst-test
|
||||||
|
"request?: a plain tag is not a request"
|
||||||
|
(flow-hst "(request? (list (quote review) 1))")
|
||||||
|
false)
|
||||||
|
(flow-hst-test
|
||||||
|
"request-kind / request-payload: parse the envelope"
|
||||||
|
(flow-hst
|
||||||
|
"(define t (list (quote flow-request) (quote render) (list (quote recipe) 7))) (list (request-kind t) (request-payload t))")
|
||||||
|
(list "render" (list "recipe" 7)))
|
||||||
|
|
||||||
|
;; ── named decision points ───────────────────────────────────────
|
||||||
|
(flow-hst-test
|
||||||
|
"await-human: is a request of kind human"
|
||||||
|
(flow-hst
|
||||||
|
"(car (cdr (cdr (flow/start (lambda (x) (await-human x)) (quote approve?)))))")
|
||||||
|
(list "flow-request" "human" "approve?"))
|
||||||
|
(flow-hst-test
|
||||||
|
"await-render: is a request of kind render"
|
||||||
|
(flow-hst
|
||||||
|
"(car (cdr (cdr (flow/start (lambda (x) (await-render x)) (quote recipe)))))")
|
||||||
|
(list "flow-request" "render" "recipe"))
|
||||||
|
(flow-hst-test
|
||||||
|
"request: the host's resume value flows back into the flow"
|
||||||
|
(flow-hst
|
||||||
|
"(defflow f (sequence (lambda (x) (await-render x)) (lambda (art) (list (quote got) art)))) (define id (car (cdr (flow/start f 1)))) (flow/resume id (quote the-artifact))")
|
||||||
|
(list "got" "the-artifact"))
|
||||||
|
|
||||||
|
;; ── host work queue ─────────────────────────────────────────────
|
||||||
|
(flow-hst-test
|
||||||
|
"flow-host-requests: lists (id kind payload) for pending requests"
|
||||||
|
(flow-hst
|
||||||
|
"(flow/start (lambda (x) (await-render x)) 99) (flow-host-requests)")
|
||||||
|
(list (list 1 "render" 99)))
|
||||||
|
(flow-hst-test
|
||||||
|
"flow-host-requests: excludes bare (non-request) suspends"
|
||||||
|
(flow-hst
|
||||||
|
"(defflow a (lambda (x) (await-render x))) (defflow b (lambda (x) (suspend (quote plain)))) (flow/start a 1) (flow/start b 2) (flow-host-requests)")
|
||||||
|
(list (list 1 "render" 1)))
|
||||||
|
|
||||||
|
;; ── the art-dag-shaped host driver loop (manual resumes) ────────
|
||||||
|
(flow-hst-test
|
||||||
|
"host driver: render then human-review then publish"
|
||||||
|
(flow-hst
|
||||||
|
"(defflow pipeline (sequence (lambda (recipe) (await-render recipe)) (lambda (art) (await-human (list (quote review) art))) (branch (lambda (d) (eq? d (quote approve))) (flow-const (quote published)) (flow-const (fail (quote rejected)))))) (define id (car (cdr (flow/start pipeline 99)))) (define r1 (flow-host-requests)) (flow/resume id (list (quote art) 99)) (define r2 (flow-host-requests)) (flow/resume id (quote approve)) (list r1 r2 (flow/status id) (flow/result id))")
|
||||||
|
(list
|
||||||
|
(list (list 1 "render" 99))
|
||||||
|
(list (list 1 "human" (list "review" (list "art" 99))))
|
||||||
|
"done"
|
||||||
|
"published"))
|
||||||
|
(flow-hst-test
|
||||||
|
"host driver: rejection at the human gate yields a failure"
|
||||||
|
(flow-hst
|
||||||
|
"(defflow pipeline (sequence (lambda (recipe) (await-render recipe)) (lambda (art) (await-human (list (quote review) art))) (branch (lambda (d) (eq? d (quote approve))) (flow-const (quote published)) (flow-const (fail (quote rejected)))))) (define id (car (cdr (flow/start pipeline 1)))) (flow/resume id (quote artifact)) (failed? (flow/resume id (quote reject)))")
|
||||||
|
true)
|
||||||
|
|
||||||
|
;; ── reference driver: host supplies only a dispatch fn ──────────
|
||||||
|
(flow-hst-test
|
||||||
|
"flow-drive-host: one tick services every pending request"
|
||||||
|
(flow-hst
|
||||||
|
"(flow/start (lambda (x) (await-render x)) 5) (define n (flow-drive-host (lambda (k p) (list (quote done) p)))) (list n (flow/status 1) (flow/result 1))")
|
||||||
|
(list 1 "done" (list "done" 5)))
|
||||||
|
(flow-hst-test
|
||||||
|
"flow-run-host: drives a render -> human pipeline to completion"
|
||||||
|
(flow-hst
|
||||||
|
"(defflow pipeline (sequence (lambda (recipe) (await-render recipe)) (lambda (art) (await-human (list (quote review) art))) (branch (lambda (d) (eq? d (quote approve))) (flow-const (quote published)) (flow-const (fail (quote rejected)))))) (define id (car (cdr (flow/start pipeline 99)))) (define serviced (flow-run-host (lambda (kind payload) (if (eq? kind (quote render)) (list (quote art) payload) (quote approve))) 10)) (list serviced (flow/status id) (flow/result id))")
|
||||||
|
(list 2 "done" "published"))
|
||||||
|
(flow-hst-test
|
||||||
|
"flow-run-host: returns 0 when nothing is pending"
|
||||||
|
(flow-hst "(flow-run-host (lambda (k p) p) 5)")
|
||||||
|
0)
|
||||||
|
(flow-hst-test
|
||||||
|
"flow-run-host: respects the maxticks bound"
|
||||||
|
(flow-hst
|
||||||
|
"(defflow pipe2 (sequence (lambda (r) (await-render r)) (lambda (a) (await-human a)) (lambda (d) d))) (define id (car (cdr (flow/start pipe2 1)))) (define serviced (flow-run-host (lambda (k p) p) 1)) (list serviced (flow/status id))")
|
||||||
|
(list 1 "suspended"))
|
||||||
|
|
||||||
|
(define flow-hst-tests-run! (fn () {:total (+ flow-hst-pass flow-hst-fail) :passed flow-hst-pass :failed flow-hst-fail :fails flow-hst-fails}))
|
||||||
67
lib/flow/tests/hygiene.sx
Normal file
67
lib/flow/tests/hygiene.sx
Normal file
@@ -0,0 +1,67 @@
|
|||||||
|
;; lib/flow/tests/hygiene.sx — Phase 5: store hygiene (flow/gc, flow/forget).
|
||||||
|
|
||||||
|
(define flow-hyg-pass 0)
|
||||||
|
(define flow-hyg-fail 0)
|
||||||
|
(define flow-hyg-fails (list))
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-hyg-test
|
||||||
|
(fn
|
||||||
|
(name actual expected)
|
||||||
|
(if
|
||||||
|
(= actual expected)
|
||||||
|
(set! flow-hyg-pass (+ flow-hyg-pass 1))
|
||||||
|
(begin
|
||||||
|
(set! flow-hyg-fail (+ flow-hyg-fail 1))
|
||||||
|
(append! flow-hyg-fails {:name name :expected expected :actual actual})))))
|
||||||
|
|
||||||
|
(define flow-h (fn (src) (flow-run src)))
|
||||||
|
|
||||||
|
;; ── flow/gc ─────────────────────────────────────────────────────
|
||||||
|
(flow-hyg-test
|
||||||
|
"gc: empty store removes nothing"
|
||||||
|
(flow-h "(flow/gc)")
|
||||||
|
0)
|
||||||
|
(flow-hyg-test
|
||||||
|
"gc: removes a done flow, keeps a suspended one"
|
||||||
|
(flow-h
|
||||||
|
"(defflow w (lambda (x) (suspend (quote q)))) (flow/start w 0) (flow/start (lambda (x) x) 5) (define removed (flow/gc)) (list removed (flow/list))")
|
||||||
|
(list 1 (list (list 1 "suspended"))))
|
||||||
|
(flow-hyg-test
|
||||||
|
"gc: removes a cancelled flow"
|
||||||
|
(flow-h
|
||||||
|
"(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/cancel id) (flow/gc)")
|
||||||
|
1)
|
||||||
|
(flow-hyg-test
|
||||||
|
"gc: a kept suspended flow is still resumable"
|
||||||
|
(flow-h
|
||||||
|
"(defflow w (sequence (lambda (x) (suspend (quote q))) (lambda (v) (* v 2)))) (define id (car (cdr (flow/start w 0)))) (flow/start (lambda (x) x) 1) (flow/gc) (flow/resume id 21)")
|
||||||
|
42)
|
||||||
|
(flow-hyg-test
|
||||||
|
"gc: counts every terminal flow it drops"
|
||||||
|
(flow-h
|
||||||
|
"(flow/start (lambda (x) x) 1) (flow/start (lambda (x) x) 2) (defflow w (lambda (x) (suspend (quote q)))) (flow/start w 0) (flow/gc)")
|
||||||
|
2)
|
||||||
|
|
||||||
|
;; ── flow/forget ─────────────────────────────────────────────────
|
||||||
|
(flow-hyg-test
|
||||||
|
"forget: drops a completed flow"
|
||||||
|
(flow-h
|
||||||
|
"(defflow w (sequence (lambda (x) (suspend (quote q))) (lambda (v) v))) (define id (car (cdr (flow/start w 0)))) (flow/resume id 7) (list (flow/forget id) (flow/status id))")
|
||||||
|
(list true "unknown"))
|
||||||
|
(flow-hyg-test
|
||||||
|
"forget: refuses to drop a live (suspended) flow"
|
||||||
|
(flow-h
|
||||||
|
"(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (list (flow/forget id) (flow/status id))")
|
||||||
|
(list false "suspended"))
|
||||||
|
(flow-hyg-test
|
||||||
|
"forget: drops a cancelled flow"
|
||||||
|
(flow-h
|
||||||
|
"(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/cancel id) (list (flow/forget id) (flow/status id))")
|
||||||
|
(list true "unknown"))
|
||||||
|
(flow-hyg-test
|
||||||
|
"forget: unknown id yields false"
|
||||||
|
(flow-h "(flow/forget 999)")
|
||||||
|
false)
|
||||||
|
|
||||||
|
(define flow-hyg-tests-run! (fn () {:total (+ flow-hyg-pass flow-hyg-fail) :passed flow-hyg-pass :failed flow-hyg-fail :fails flow-hyg-fails}))
|
||||||
115
lib/flow/tests/integration.sx
Normal file
115
lib/flow/tests/integration.sx
Normal file
@@ -0,0 +1,115 @@
|
|||||||
|
;; lib/flow/tests/integration.sx — Phase 7: end-to-end flows composing every phase.
|
||||||
|
|
||||||
|
(define flow-int-pass 0)
|
||||||
|
(define flow-int-fail 0)
|
||||||
|
(define flow-int-fails (list))
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-int-test
|
||||||
|
(fn
|
||||||
|
(name actual expected)
|
||||||
|
(if
|
||||||
|
(= actual expected)
|
||||||
|
(set! flow-int-pass (+ flow-int-pass 1))
|
||||||
|
(begin
|
||||||
|
(set! flow-int-fail (+ flow-int-fail 1))
|
||||||
|
(append! flow-int-fails {:name name :expected expected :actual actual})))))
|
||||||
|
|
||||||
|
(define flow-i (fn (src) (flow-run src)))
|
||||||
|
|
||||||
|
;; The order-processing flow, defined once per program via this prelude string:
|
||||||
|
;; validate amount (attempt: fail if <= 0)
|
||||||
|
;; -> suspend for payment confirmation (resume value = confirmed amount)
|
||||||
|
;; -> branch: confirmed>0 ? record on the ledger peer : declined failure
|
||||||
|
(define
|
||||||
|
order-prelude
|
||||||
|
"(flow-peer-register! (quote ledger) (list (list (quote record) (lambda (amt) (list (quote recorded) amt)))))\n (defflow order\n (attempt\n (lambda (amt) (if (> amt 0) amt (fail (quote invalid-amount))))\n (lambda (amt) (suspend (quote await-payment)))\n (branch (lambda (amt) (> amt 0))\n (remote-node (quote ledger) (quote record))\n (flow-const (fail (quote declined))))))")
|
||||||
|
|
||||||
|
;; ── happy path through every phase ──────────────────────────────
|
||||||
|
(flow-int-test
|
||||||
|
"order: validate -> suspend -> resume -> branch -> federate"
|
||||||
|
(flow-i
|
||||||
|
(str
|
||||||
|
order-prelude
|
||||||
|
"(define id (car (cdr (flow/start order 100)))) (flow/resume id 250)"))
|
||||||
|
(list "recorded" 250))
|
||||||
|
(flow-int-test
|
||||||
|
"order: starting suspends awaiting payment"
|
||||||
|
(flow-i
|
||||||
|
(str
|
||||||
|
order-prelude
|
||||||
|
"(define s (flow/start order 100)) (list (car s) (car (cdr (cdr s))))"))
|
||||||
|
(list "flow-suspended" "await-payment"))
|
||||||
|
(flow-int-test
|
||||||
|
"order: invalid amount fails up front and never suspends"
|
||||||
|
(flow-i
|
||||||
|
(str
|
||||||
|
order-prelude
|
||||||
|
"(define r (flow/start order -5)) (list (failed? r) (fail-reason r))"))
|
||||||
|
(list true "invalid-amount"))
|
||||||
|
(flow-int-test
|
||||||
|
"order: a declined payment yields a failure value"
|
||||||
|
(flow-i
|
||||||
|
(str
|
||||||
|
order-prelude
|
||||||
|
"(define id (car (cdr (flow/start order 100)))) (failed? (flow/resume id 0))"))
|
||||||
|
true)
|
||||||
|
|
||||||
|
;; ── crash recovery mid-flow ─────────────────────────────────────
|
||||||
|
(flow-int-test
|
||||||
|
"order: survives a simulated crash between suspend and resume"
|
||||||
|
(flow-i
|
||||||
|
(str
|
||||||
|
order-prelude
|
||||||
|
"(define id (car (cdr (flow/start order 100)))) (define saved (flow-store-export)) (set! flow-store (list)) (flow-store-import! saved) (flow/resume id 250)"))
|
||||||
|
(list "recorded" 250))
|
||||||
|
|
||||||
|
;; ── handoff to a peer mid-flow ──────────────────────────────────
|
||||||
|
(flow-int-test
|
||||||
|
"order: hands off to a peer that resumes and completes"
|
||||||
|
(flow-i
|
||||||
|
(str
|
||||||
|
order-prelude
|
||||||
|
"(define id (car (cdr (flow/start order 100)))) (flow-replicate-to (quote nodeB)) (set! flow-store (list)) (flow-restore-from (quote nodeB)) (flow/resume id 250)"))
|
||||||
|
(list "recorded" 250))
|
||||||
|
|
||||||
|
;; ── introspection during the flow's life ────────────────────────
|
||||||
|
(flow-int-test
|
||||||
|
"order: pending shows what the flow awaits, then result after resume"
|
||||||
|
(flow-i
|
||||||
|
(str
|
||||||
|
order-prelude
|
||||||
|
"(define id (car (cdr (flow/start order 100)))) (define p (flow/pending)) (flow/resume id 250) (list p (flow/status id) (flow/result id))"))
|
||||||
|
(list
|
||||||
|
(list (list 1 "await-payment"))
|
||||||
|
"done"
|
||||||
|
(list "recorded" 250)))
|
||||||
|
|
||||||
|
;; ── onboarding: two human steps + cancellation ──────────────────
|
||||||
|
(define
|
||||||
|
onboard-prelude
|
||||||
|
"(defflow onboard\n (sequence\n (lambda (user) (+ user 1))\n (lambda (x) (suspend (quote confirm-email)))\n (lambda (x) (suspend (quote complete-profile)))\n (lambda (x) (list (quote onboarded) x))))")
|
||||||
|
|
||||||
|
(flow-int-test
|
||||||
|
"onboard: two suspends resume in order to completion"
|
||||||
|
(flow-i
|
||||||
|
(str
|
||||||
|
onboard-prelude
|
||||||
|
"(define id (car (cdr (flow/start onboard 0)))) (flow/resume id 7) (flow/resume id 9)"))
|
||||||
|
(list "onboarded" 9))
|
||||||
|
(flow-int-test
|
||||||
|
"onboard: the second pending tag appears after the first resume"
|
||||||
|
(flow-i
|
||||||
|
(str
|
||||||
|
onboard-prelude
|
||||||
|
"(define id (car (cdr (flow/start onboard 0)))) (flow/resume id 7) (car (cdr (car (flow/pending))))"))
|
||||||
|
"complete-profile")
|
||||||
|
(flow-int-test
|
||||||
|
"onboard: cancelling abandons the flow"
|
||||||
|
(flow-i
|
||||||
|
(str
|
||||||
|
onboard-prelude
|
||||||
|
"(define id (car (cdr (flow/start onboard 0)))) (flow/cancel id) (list (flow/status id) (car (flow/resume id 7)))"))
|
||||||
|
(list "cancelled" "flow-error"))
|
||||||
|
|
||||||
|
(define flow-int-tests-run! (fn () {:total (+ flow-int-pass flow-int-fail) :passed flow-int-pass :failed flow-int-fail :fails flow-int-fails}))
|
||||||
73
lib/flow/tests/railway.sx
Normal file
73
lib/flow/tests/railway.sx
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
;; lib/flow/tests/railway.sx — Phase 6: railway-oriented composition (attempt).
|
||||||
|
|
||||||
|
(define flow-rail-pass 0)
|
||||||
|
(define flow-rail-fail 0)
|
||||||
|
(define flow-rail-fails (list))
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-rail-test
|
||||||
|
(fn
|
||||||
|
(name actual expected)
|
||||||
|
(if
|
||||||
|
(= actual expected)
|
||||||
|
(set! flow-rail-pass (+ flow-rail-pass 1))
|
||||||
|
(begin
|
||||||
|
(set! flow-rail-fail (+ flow-rail-fail 1))
|
||||||
|
(append! flow-rail-fails {:name name :expected expected :actual actual})))))
|
||||||
|
|
||||||
|
(define flow-r (fn (src) (flow-run src)))
|
||||||
|
|
||||||
|
;; ── attempt — short-circuit on the first (fail ...) ─────────────
|
||||||
|
(flow-rail-test
|
||||||
|
"attempt: threads like sequence when nothing fails"
|
||||||
|
(flow-r
|
||||||
|
"(flow/start (attempt (lambda (x) (+ x 1)) (lambda (x) (* x 10))) 4)")
|
||||||
|
50)
|
||||||
|
(flow-rail-test
|
||||||
|
"attempt: empty is identity"
|
||||||
|
(flow-r "(flow/start (attempt) 7)")
|
||||||
|
7)
|
||||||
|
(flow-rail-test
|
||||||
|
"attempt: returns the first failure"
|
||||||
|
(flow-r
|
||||||
|
"(failed? (flow/start (attempt (lambda (x) (fail (quote bad))) (lambda (x) (* x 10))) 4))")
|
||||||
|
true)
|
||||||
|
(flow-rail-test
|
||||||
|
"attempt: the failure carries its reason"
|
||||||
|
(flow-r
|
||||||
|
"(fail-reason (flow/start (attempt (lambda (x) x) (lambda (x) (fail (quote rejected)))) 4))")
|
||||||
|
"rejected")
|
||||||
|
(flow-rail-test
|
||||||
|
"attempt: nodes after a failure do not run"
|
||||||
|
(flow-r
|
||||||
|
"(define ran 0) (flow/start (attempt (lambda (x) (fail (quote stop))) (lambda (x) (begin (set! ran (+ ran 1)) x))) 0) ran")
|
||||||
|
0)
|
||||||
|
(flow-rail-test
|
||||||
|
"attempt: a failed input short-circuits immediately"
|
||||||
|
(flow-r
|
||||||
|
"(define ran 0) (fail-reason (flow/start (attempt (lambda (x) (begin (set! ran (+ ran 1)) x))) (fail (quote pre))))")
|
||||||
|
"pre")
|
||||||
|
(flow-rail-test
|
||||||
|
"attempt: middle failure halts the chain"
|
||||||
|
(flow-r
|
||||||
|
"(define ran 0) (flow/start (attempt (lambda (x) (+ x 1)) (lambda (x) (fail (quote mid))) (lambda (x) (begin (set! ran (+ ran 1)) x))) 5) ran")
|
||||||
|
0)
|
||||||
|
|
||||||
|
;; ── attempt + recover (rejoin the happy track) ──────────────────
|
||||||
|
(flow-rail-test
|
||||||
|
"attempt + recover: recover turns a failure into a value"
|
||||||
|
(flow-r
|
||||||
|
"(flow/start (recover (attempt (lambda (x) (if (> x 0) x (fail (quote non-positive)))) (lambda (x) (* x 2))) (flow-const 0)) -5)")
|
||||||
|
0)
|
||||||
|
(flow-rail-test
|
||||||
|
"attempt + recover: happy path passes recover through"
|
||||||
|
(flow-r
|
||||||
|
"(flow/start (recover (attempt (lambda (x) (if (> x 0) x (fail (quote non-positive)))) (lambda (x) (* x 2))) (flow-const 0)) 5)")
|
||||||
|
10)
|
||||||
|
(flow-rail-test
|
||||||
|
"attempt: validation pipeline reports the failing stage"
|
||||||
|
(flow-r
|
||||||
|
"(defflow validate (attempt (lambda (s) (if (>= (string-length s) 3) s (fail (quote too-short)))) (lambda (s) (if (<= (string-length s) 8) s (fail (quote too-long)))) (lambda (s) (list (quote ok) (string-length s))))) (list (fail-reason (flow/start validate \"hi\")) (flow/start validate \"hello\"))")
|
||||||
|
(list "too-short" (list "ok" 5)))
|
||||||
|
|
||||||
|
(define flow-rail-tests-run! (fn () {:total (+ flow-rail-pass flow-rail-fail) :passed flow-rail-pass :failed flow-rail-fail :fails flow-rail-fails}))
|
||||||
71
lib/flow/tests/recovery.sx
Normal file
71
lib/flow/tests/recovery.sx
Normal file
@@ -0,0 +1,71 @@
|
|||||||
|
;; lib/flow/tests/recovery.sx — Phase 3: crash recovery (store export/import + restart).
|
||||||
|
;;
|
||||||
|
;; "restart" is simulated within one program: (set! flow-store (list)) wipes the
|
||||||
|
;; in-memory store (process death), while flow-registry persists as it would after
|
||||||
|
;; reloading flow definitions. Recovery = import the exported (plain-data) store and
|
||||||
|
;; resume; the flow proc is re-resolved by name.
|
||||||
|
|
||||||
|
(define flow-rec-pass 0)
|
||||||
|
(define flow-rec-fail 0)
|
||||||
|
(define flow-rec-fails (list))
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-rec-test
|
||||||
|
(fn
|
||||||
|
(name actual expected)
|
||||||
|
(if
|
||||||
|
(= actual expected)
|
||||||
|
(set! flow-rec-pass (+ flow-rec-pass 1))
|
||||||
|
(begin
|
||||||
|
(set! flow-rec-fail (+ flow-rec-fail 1))
|
||||||
|
(append! flow-rec-fails {:name name :expected expected :actual actual})))))
|
||||||
|
|
||||||
|
(define flow-r (fn (src) (flow-run src)))
|
||||||
|
|
||||||
|
;; ── export / wipe / import ──────────────────────────────────────
|
||||||
|
(flow-rec-test
|
||||||
|
"export nulls the live procedure"
|
||||||
|
(flow-r
|
||||||
|
"(defflow w (lambda (x) (suspend (quote await)))) (flow/start w 10) (car (cdr (car (cdr (car (flow-store-export))))))")
|
||||||
|
false)
|
||||||
|
(flow-rec-test
|
||||||
|
"a wiped store loses the flow (process death)"
|
||||||
|
(flow-r
|
||||||
|
"(defflow w (lambda (x) (suspend (quote await)))) (define id (car (cdr (flow/start w 10)))) (set! flow-store (list)) (flow/resume id 1)")
|
||||||
|
(list "flow-error" "no-such-flow"))
|
||||||
|
(flow-rec-test
|
||||||
|
"import restores a wiped store and resume completes"
|
||||||
|
(flow-r
|
||||||
|
"(defflow w (sequence (lambda (x) (suspend (quote await))) (lambda (c) (list (quote done) c)))) (define id (car (cdr (flow/start w 10)))) (define saved (flow-store-export)) (set! flow-store (list)) (flow-store-import! saved) (flow/resume id 777)")
|
||||||
|
(list "done" 777))
|
||||||
|
|
||||||
|
;; ── resumable scan ──────────────────────────────────────────────
|
||||||
|
(flow-rec-test
|
||||||
|
"resumable-ids lists the suspended flow after import"
|
||||||
|
(flow-r
|
||||||
|
"(defflow w (lambda (x) (suspend (quote await)))) (define id (car (cdr (flow/start w 10)))) (define saved (flow-store-export)) (set! flow-store (list)) (flow-store-import! saved) (flow-resumable-ids)")
|
||||||
|
(list 1))
|
||||||
|
(flow-rec-test
|
||||||
|
"resumable-ids excludes completed flows"
|
||||||
|
(flow-r
|
||||||
|
"(defflow w (sequence (lambda (x) (suspend (quote await))) (lambda (c) c))) (define id (car (cdr (flow/start w 10)))) (flow/resume id 5) (flow-resumable-ids)")
|
||||||
|
(list))
|
||||||
|
(flow-rec-test
|
||||||
|
"resumable-ids excludes cancelled flows after import"
|
||||||
|
(flow-r
|
||||||
|
"(defflow w (lambda (x) (suspend (quote await)))) (define id (car (cdr (flow/start w 10)))) (flow/cancel id) (define saved (flow-store-export)) (set! flow-store (list)) (flow-store-import! saved) (flow-resumable-ids)")
|
||||||
|
(list))
|
||||||
|
|
||||||
|
;; ── restart at every step ───────────────────────────────────────
|
||||||
|
(flow-rec-test
|
||||||
|
"two suspends survive a restart between each step"
|
||||||
|
(flow-r
|
||||||
|
"(defflow two (sequence (lambda (x) (suspend (quote a))) (lambda (x) (suspend (quote b))) (lambda (x) (list (quote end) x)))) (define id (car (cdr (flow/start two 0)))) (define s1 (flow-store-export)) (set! flow-store (list)) (flow-store-import! s1) (flow/resume id 100) (define s2 (flow-store-export)) (set! flow-store (list)) (flow-store-import! s2) (flow/resume id 200)")
|
||||||
|
(list "end" 200))
|
||||||
|
(flow-rec-test
|
||||||
|
"import preserves the replay log (earlier value survives restart)"
|
||||||
|
(flow-r
|
||||||
|
"(defflow two (sequence (lambda (x) (suspend (quote a))) (lambda (x) (suspend (quote b))) (lambda (x) (list x)))) (define id (car (cdr (flow/start two 0)))) (flow/resume id 11) (define saved (flow-store-export)) (set! flow-store (list)) (flow-store-import! saved) (flow/resume id 22)")
|
||||||
|
(list 22))
|
||||||
|
|
||||||
|
(define flow-rec-tests-run! (fn () {:total (+ flow-rec-pass flow-rec-fail) :passed flow-rec-pass :failed flow-rec-fail :fails flow-rec-fails}))
|
||||||
114
lib/flow/tests/suspend.sx
Normal file
114
lib/flow/tests/suspend.sx
Normal file
@@ -0,0 +1,114 @@
|
|||||||
|
;; lib/flow/tests/suspend.sx — Phase 3: suspend / resume / cancel (deterministic replay).
|
||||||
|
|
||||||
|
(define flow-sus-pass 0)
|
||||||
|
(define flow-sus-fail 0)
|
||||||
|
(define flow-sus-fails (list))
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-sus-test
|
||||||
|
(fn
|
||||||
|
(name actual expected)
|
||||||
|
(if
|
||||||
|
(= actual expected)
|
||||||
|
(set! flow-sus-pass (+ flow-sus-pass 1))
|
||||||
|
(begin
|
||||||
|
(set! flow-sus-fail (+ flow-sus-fail 1))
|
||||||
|
(append! flow-sus-fails {:name name :expected expected :actual actual})))))
|
||||||
|
|
||||||
|
(define flow-s (fn (src) (flow-run src)))
|
||||||
|
|
||||||
|
;; ── flow/start ──────────────────────────────────────────────────
|
||||||
|
(flow-sus-test
|
||||||
|
"start: non-suspending flow returns the raw result"
|
||||||
|
(flow-s "(flow/start (lambda (x) (* x 2)) 5)")
|
||||||
|
10)
|
||||||
|
(flow-sus-test
|
||||||
|
"start: a suspending flow returns a flow-suspended state"
|
||||||
|
(flow-s
|
||||||
|
"(defflow w (sequence (lambda (x) (+ x 1)) (lambda (g) (suspend (quote await))) (lambda (c) c))) (car (flow/start w 10))")
|
||||||
|
"flow-suspended")
|
||||||
|
(flow-sus-test
|
||||||
|
"start: suspended state carries a numeric id"
|
||||||
|
(flow-s
|
||||||
|
"(defflow w (lambda (x) (suspend (quote await)))) (car (cdr (flow/start w 10)))")
|
||||||
|
1)
|
||||||
|
(flow-sus-test
|
||||||
|
"start: suspended state carries the suspend tag"
|
||||||
|
(flow-s
|
||||||
|
"(defflow w (lambda (x) (suspend (quote await)))) (car (cdr (cdr (flow/start w 10))))")
|
||||||
|
"await")
|
||||||
|
|
||||||
|
;; ── flow/resume ─────────────────────────────────────────────────
|
||||||
|
(flow-sus-test
|
||||||
|
"resume: injects the value and completes"
|
||||||
|
(flow-s
|
||||||
|
"(defflow w (sequence (lambda (x) (+ x 1)) (lambda (g) (suspend (quote await))) (lambda (c) (list (quote done) c)))) (define s (flow/start w 10)) (flow/resume (car (cdr s)) 777)")
|
||||||
|
(list "done" 777))
|
||||||
|
(flow-sus-test
|
||||||
|
"resume: injected value threads into the next node"
|
||||||
|
(flow-s
|
||||||
|
"(defflow w (sequence (lambda (x) (suspend (quote v))) (lambda (n) (* n 3)))) (define s (flow/start w 0)) (flow/resume (car (cdr s)) 14)")
|
||||||
|
42)
|
||||||
|
(flow-sus-test
|
||||||
|
"resume: replays earlier suspends (recompute is deterministic)"
|
||||||
|
(flow-s
|
||||||
|
"(define runs 0) (defflow w (sequence (lambda (x) (begin (set! runs (+ runs 1)) (+ x 1))) (lambda (g) (suspend (quote await))) (lambda (c) c))) (define s (flow/start w 10)) (flow/resume (car (cdr s)) 99) runs")
|
||||||
|
2)
|
||||||
|
|
||||||
|
;; ── multi-step suspension ───────────────────────────────────────
|
||||||
|
(flow-sus-test
|
||||||
|
"multi: first resume suspends at the next tag"
|
||||||
|
(flow-s
|
||||||
|
"(defflow two (sequence (lambda (x) (suspend (quote a))) (lambda (x) (suspend (quote b))) (lambda (x) (list (quote end) x)))) (define s (flow/start two 0)) (define s2 (flow/resume (car (cdr s)) 100)) (car (cdr (cdr s2)))")
|
||||||
|
"b")
|
||||||
|
(flow-sus-test
|
||||||
|
"multi: second resume completes with the latest value"
|
||||||
|
(flow-s
|
||||||
|
"(defflow two (sequence (lambda (x) (suspend (quote a))) (lambda (x) (suspend (quote b))) (lambda (x) (list (quote end) x)))) (define id (car (cdr (flow/start two 0)))) (flow/resume id 100) (flow/resume id 200)")
|
||||||
|
(list "end" 200))
|
||||||
|
|
||||||
|
;; ── error / lifecycle guards ────────────────────────────────────
|
||||||
|
(flow-sus-test
|
||||||
|
"resume: completed flow cannot be resumed again"
|
||||||
|
(flow-s
|
||||||
|
"(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/resume id 1) (flow/resume id 2)")
|
||||||
|
(list "flow-error" "not-suspended"))
|
||||||
|
(flow-sus-test
|
||||||
|
"resume: unknown id errors"
|
||||||
|
(flow-s "(flow/resume 999 1)")
|
||||||
|
(list "flow-error" "no-such-flow"))
|
||||||
|
|
||||||
|
;; ── flow/cancel ─────────────────────────────────────────────────
|
||||||
|
(flow-sus-test
|
||||||
|
"cancel: returns a flow-cancelled state"
|
||||||
|
(flow-s
|
||||||
|
"(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/cancel id)")
|
||||||
|
(list "flow-cancelled" 1))
|
||||||
|
(flow-sus-test
|
||||||
|
"cancel: a cancelled flow cannot be resumed (stale resume rejected)"
|
||||||
|
(flow-s
|
||||||
|
"(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/cancel id) (flow/resume id 5)")
|
||||||
|
(list "flow-error" "not-suspended"))
|
||||||
|
(flow-sus-test
|
||||||
|
"cancel: unknown id errors"
|
||||||
|
(flow-s "(flow/cancel 999)")
|
||||||
|
(list "flow-error" "no-such-flow"))
|
||||||
|
|
||||||
|
;; ── composition ─────────────────────────────────────────────────
|
||||||
|
(flow-sus-test
|
||||||
|
"suspend inside a branch arm"
|
||||||
|
(flow-s
|
||||||
|
"(defflow gate (branch (lambda (x) (> x 0)) (lambda (x) (suspend (quote approve))) (flow-const (quote rejected)))) (define s (flow/start gate 5)) (flow/resume (car (cdr s)) (quote approved))")
|
||||||
|
"approved")
|
||||||
|
(flow-sus-test
|
||||||
|
"two independent runs get independent ids"
|
||||||
|
(flow-s
|
||||||
|
"(defflow w (lambda (x) (suspend (quote q)))) (list (car (cdr (flow/start w 0))) (car (cdr (flow/start w 0))))")
|
||||||
|
(list 1 2))
|
||||||
|
(flow-sus-test
|
||||||
|
"suspend reason may be a structured value"
|
||||||
|
(flow-s
|
||||||
|
"(defflow w (lambda (x) (suspend (list (quote needs) (quote approval))))) (car (cdr (cdr (flow/start w 0))))")
|
||||||
|
(list "needs" "approval"))
|
||||||
|
|
||||||
|
(define flow-sus-tests-run! (fn () {:total (+ flow-sus-pass flow-sus-fail) :passed flow-sus-pass :failed flow-sus-fail :fails flow-sus-fails}))
|
||||||
@@ -16,7 +16,7 @@ federation extension via fed-sx for remote-node execution.
|
|||||||
|
|
||||||
## Status (rolling)
|
## Status (rolling)
|
||||||
|
|
||||||
`bash lib/flow/conformance.sh` → **0/0** (not yet started)
|
`bash lib/flow/conformance.sh` → **166/166** (Phases 1-8 complete; host ABI + reference driver)
|
||||||
|
|
||||||
## Ground rules
|
## Ground rules
|
||||||
|
|
||||||
@@ -62,47 +62,167 @@ lib/flow/spec.sx lib/flow/runtime.sx lib/flow/store.sx
|
|||||||
|
|
||||||
## Phase 1 — Declarative DAG + sequential execution
|
## Phase 1 — Declarative DAG + sequential execution
|
||||||
|
|
||||||
- [ ] `lib/flow/spec.sx` — `defflow` macro, `sequence` combinator
|
- [x] `lib/flow/spec.sx` — `defflow` macro, `sequence` combinator
|
||||||
- [ ] node = Scheme thunk; output threads to next node (data flow)
|
- [x] node = Scheme procedure of one arg (upstream value threaded in); output
|
||||||
- [ ] `parallel` combinator (sequential semantics for now — TRUE parallelism in Phase 3)
|
threads to next node (data flow). A node ignoring its arg is a thunk.
|
||||||
- [ ] runtime executes a flow synchronously, returns final value
|
- [x] `parallel` combinator (sequential semantics for now — TRUE parallelism in Phase 3)
|
||||||
- [ ] `lib/flow/api.sx` — `(flow/start name args)` entry point
|
- [x] runtime executes a flow synchronously, returns final value
|
||||||
- [ ] `lib/flow/tests/basic.sx` — 15+ cases: linear sequence, nested sequences,
|
- [x] `lib/flow/api.sx` — `(flow/start flow input)` entry point
|
||||||
data flow between nodes, parallel-with-join
|
- [x] `lib/flow/tests/basic.sx` — 18 cases: single nodes, linear/nested sequence,
|
||||||
- [ ] `lib/flow/scoreboard.{json,md}`
|
data flow between nodes, parallel-with-join, publish-shaped flow
|
||||||
- [ ] `lib/flow/conformance.sh`
|
- [x] `lib/flow/scoreboard.{json,md}`
|
||||||
|
- [x] `lib/flow/conformance.sh`
|
||||||
|
|
||||||
## Phase 2 — Control flow + error handling
|
## Phase 2 — Control flow + error handling
|
||||||
|
|
||||||
- [ ] `cond` combinator — predicate selects branch
|
- [x] `cond` combinator — predicate selects branch (named `branch`; `cond` is a
|
||||||
- [ ] `retry n [backoff]` — re-runs node up to n times on exception
|
Scheme special form). `(branch pred then else)` — 6 tests.
|
||||||
- [ ] `timeout ms` — bounds node execution
|
- [x] `retry n` — re-runs node up to n attempts on a raised exception; last
|
||||||
- [ ] `try-catch` — exception handler with reified error
|
exception propagates. Only raised exceptions are retried — `(fail ...)` values
|
||||||
- [ ] error model — exceptions vs explicit `(fail :reason ...)` results
|
pass through. 6 tests. (Backoff deferred: no wall clock in pure SX.)
|
||||||
- [ ] `lib/flow/tests/control.sx` — 25+ cases: each combinator + composition
|
- [x] `timeout budget` — bounds node execution via a **cooperative step budget**
|
||||||
|
(deterministic; no scheduler/clock in pure SX). Nodes opt in via `(tick)`;
|
||||||
|
`budget` ticks allowed, the next raises `flow-timeout`. Non-ticking nodes are
|
||||||
|
unbounded; budgets nest. 7 tests.
|
||||||
|
- [x] `try-catch` — exception handler with reified error: `(try-catch node handler)`
|
||||||
|
runs node; on raise, calls `(handler error)` and returns its value. 6 tests.
|
||||||
|
- [x] error model — exceptions vs explicit `(fail reason)` results: `fail`/`failed?`/
|
||||||
|
`fail-reason` produce/inspect failure values that flow downstream as data
|
||||||
|
(distinct from raised exceptions caught by retry/try-catch). 6 tests.
|
||||||
|
- [x] `lib/flow/tests/control.sx` — 31 cases: branch, error model, try-catch,
|
||||||
|
retry, timeout + compositions
|
||||||
|
|
||||||
## Phase 3 — Suspend / resume (the showcase)
|
## Phase 3 — Suspend / resume (the showcase)
|
||||||
|
|
||||||
- [ ] `(suspend reason)` — `call/cc` captures continuation, returns flow-id to caller
|
- [x] `(suspend tag)` — guest call/cc is ESCAPE-ONLY (re-entry hangs), so resume
|
||||||
- [ ] `lib/flow/store.sx` — serialize flow state (continuation + open vars)
|
uses **deterministic replay**: suspend escapes to the driver as `(flow-suspended
|
||||||
- [ ] `(flow/resume id value)` — load continuation, inject value, re-enter
|
tag)`; resume re-runs the flow, replaying resolved suspends from a `(tag value)`
|
||||||
- [ ] `(flow/cancel id)` — explicit termination
|
log. No live continuation is ever serialized — the log is plain data.
|
||||||
- [ ] crash recovery — on restart, scan store for paused flows, mark resumable
|
- [x] `lib/flow/store.sx` — flow store: id→record `(flow input log status payload)`;
|
||||||
- [ ] `lib/flow/tests/suspend.sx` — pause-resume scenarios, cancellation, "restart"
|
`flow-drive` runs a flow against a replay log.
|
||||||
scenarios (simulated by re-loading store)
|
- [x] `(flow/resume id value)` — append `(tag value)` to the log, re-drive; raw
|
||||||
|
result on completion, `(flow-suspended id tag)` on a further suspend.
|
||||||
|
- [x] `(flow/cancel id)` — mark cancelled; a later resume is rejected (stale replay
|
||||||
|
cannot wake a cancelled flow).
|
||||||
|
- [x] crash recovery — `flow-store-export` (procs nulled → plain data),
|
||||||
|
`flow-store-import!`, `flow-resumable-ids`. Records are name-keyed; resume
|
||||||
|
re-resolves the proc by name (defflow registers names), so a flow survives a
|
||||||
|
wiped store. `tests/recovery.sx`, 8 cases (export/wipe/import, resumable scan,
|
||||||
|
restart-at-every-step, replay-log survival).
|
||||||
|
- [x] `lib/flow/tests/suspend.sx` — 17 cases: start/resume/cancel, multi-step,
|
||||||
|
replay determinism, lifecycle guards, suspend-in-branch
|
||||||
|
- Harness: `flow-run` now reuses one env with a per-test reset (building the full
|
||||||
|
standard env 66× was too slow) — see `api.sx`.
|
||||||
|
|
||||||
## Phase 4 — Distributed nodes via fed-sx
|
## Phase 4 — Distributed nodes via fed-sx
|
||||||
|
|
||||||
- [ ] `(remote-node addr fn args)` — execute node on a federation peer
|
- [x] `(remote-node addr fn)` — execute a node on a federation peer. Transport is
|
||||||
- [ ] failure semantics — retry on different peer, fall through to local
|
the fed-sx boundary, MOCKED via a peer registry (`flow-peer-register!`); raises
|
||||||
- [ ] persistence across instances — flow state replicates via fed-sx
|
`flow-remote-unreachable` / `flow-remote-no-fn`. Composes with sequence, suspend,
|
||||||
- [ ] handoff — flow started here can resume on a peer if the local instance is down
|
retry. `tests/distributed.sx`, 7 cases.
|
||||||
- [ ] `lib/flow/tests/distributed.sx` — federated flow scenarios (mock fed-sx in tests)
|
- [x] failure semantics — `(remote-failover addrs fn local)` tries each peer in
|
||||||
|
order, moves to the next on any raised error, and runs the `local` node if every
|
||||||
|
peer fails. 6 tests.
|
||||||
|
- [x] persistence across instances — `(flow-replicate-to addr)` copies this
|
||||||
|
instance's store (the plain-data export) to a peer's replica slot;
|
||||||
|
`(flow-restore-from addr)` imports it. Same mechanism as crash recovery, across
|
||||||
|
instances.
|
||||||
|
- [x] handoff — a flow started here resumes on a peer after the local instance dies:
|
||||||
|
replicate → wipe local store → restore on peer → `flow/resume`. The replay log
|
||||||
|
(and thus all resolved suspends) survives the move.
|
||||||
|
- [x] `lib/flow/tests/distributed.sx` — 19 cases: remote-node, failover,
|
||||||
|
replication, handoff (including replay-log survival across the move)
|
||||||
|
|
||||||
|
## Phase 5 — Operational API + combinator library
|
||||||
|
|
||||||
|
The four roadmap phases are complete; this phase rounds out the engine into
|
||||||
|
something operators and authors actually use. Accumulation, not a rewrite.
|
||||||
|
|
||||||
|
- [x] introspection API — `flow/status id`, `flow/result id`, `flow/list`,
|
||||||
|
`flow/pending` (operator view of what each suspended flow awaits). 12 tests in
|
||||||
|
`tests/api.sx`.
|
||||||
|
- [x] store hygiene — `flow/gc` drops terminal (done/cancelled) records keeping
|
||||||
|
live suspended flows (returns count); `flow/forget id` drops one terminal record
|
||||||
|
and refuses live flows. Bounds unbounded store growth. 9 tests in `tests/hygiene.sx`.
|
||||||
|
- [x] `tap` — side-effecting pass-through node (logging/metrics) that returns input
|
||||||
|
- [x] `recover` — complement to try-catch for the fail-VALUE channel: run node; if it
|
||||||
|
yields `(fail ...)`, run a recovery node on the reason
|
||||||
|
- [x] `map-flow` — run a flow per item of a list, join results (sequential)
|
||||||
|
- [x] `flow-while` / `flow-until` — bounded iteration: re-run body threading the
|
||||||
|
value while/until pred holds, capped at `max` steps (deterministic bound)
|
||||||
|
- [x] `lib/flow/tests/api.sx` (12) + `lib/flow/tests/combinators.sx` (17)
|
||||||
|
|
||||||
|
## Phase 6 — Railway-oriented composition
|
||||||
|
|
||||||
|
Make the `(fail reason)` value channel compose into real validation/ETL pipelines.
|
||||||
|
|
||||||
|
- [x] `attempt` — like `sequence`, but short-circuits at the first node that returns
|
||||||
|
a `(fail ...)` value, returning that failure (the railway track). Pairs with
|
||||||
|
`recover` for the rejoin.
|
||||||
|
- [x] `lib/flow/tests/railway.sx` — 10 cases: fail short-circuiting, no-run-after-
|
||||||
|
failure, recover rejoin, validation pipeline reporting the failing stage
|
||||||
|
|
||||||
|
## Phase 8 — Host integration ABI (art-dag / human-in-the-loop)
|
||||||
|
|
||||||
|
`suspend` is the seam to the outside world, but a bare tag is an ad-hoc convention.
|
||||||
|
This phase defines a stable request/response contract a host (an art-dag driver, a
|
||||||
|
review UI) codes against — so flow can orchestrate art-dag with human decision
|
||||||
|
points later without reverse-engineering tag shapes. `lib/flow/host.sx`.
|
||||||
|
|
||||||
|
- [x] `(request kind payload)` — suspend with a typed `(flow-request kind payload)`
|
||||||
|
envelope; evaluates to the host's resume value. `await-human`/`await-render`/
|
||||||
|
`await-effect` sugar.
|
||||||
|
- [x] `(flow-host-requests)` — the host work queue: `(id kind payload)` for every
|
||||||
|
suspended flow waiting on a host request; `request?`/`request-kind`/
|
||||||
|
`request-payload` parse a tag.
|
||||||
|
- [x] `(flow-drive-host dispatch)` / `(flow-run-host dispatch maxticks)` — reference
|
||||||
|
host driver: the host supplies only a `(kind payload) -> answer` dispatch fn; the
|
||||||
|
loop drains pending requests and resumes until quiescent (bounded).
|
||||||
|
- [x] `lib/flow/tests/host.sx` — 15 cases incl. the art-dag-shaped driver loop
|
||||||
|
(render → human-review → publish) run both manually and via `flow-run-host`.
|
||||||
|
- Contract (documented in `host.sx` + README): the host owns IO + persistence; a
|
||||||
|
flow never does IO, it only `request`s; the host performs the effect and feeds the
|
||||||
|
result back via resume (logged, so not re-run on recovery). NOT done here (host
|
||||||
|
side, out of `lib/flow` scope): the real Celery/IPFS bridge and a persistent store
|
||||||
|
backend — those live in the art-dag integration, coding against this ABI.
|
||||||
|
|
||||||
|
## Phase 7 — End-to-end integration
|
||||||
|
|
||||||
|
Prove the phases compose: realistic flows exercising attempt + suspend + branch +
|
||||||
|
remote-node + crash-recovery + handoff + introspection together.
|
||||||
|
|
||||||
|
- [x] `lib/flow/tests/integration.sx` — 10 cases: an order-processing flow (validate
|
||||||
|
→ payment suspend → branch → ledger federation) and an onboarding flow, run through
|
||||||
|
the full lifecycle including a simulated crash and a peer handoff mid-flow, plus
|
||||||
|
introspection (`flow/pending`/`status`/`result`) during the flow's life
|
||||||
|
|
||||||
## Progress log
|
## Progress log
|
||||||
|
|
||||||
(loop fills this in)
|
- **Phase 1 (combinators + sequential runtime).** Flow built as a Scheme prelude
|
||||||
|
loaded onto `scheme-standard-env`: a flow is a Scheme procedure `input -> output`,
|
||||||
|
so the whole flow runs inside the interpreter (sets up Phase 3 call/cc suspend).
|
||||||
|
Combinators `flow-node`/`flow-id`/`flow-const`/`sequence`/`parallel`/`defflow` in
|
||||||
|
`spec.sx`; `flow/start` + SX helpers (`flow-make-env`/`flow-run`) in `api.sx`.
|
||||||
|
18/18 in `tests/basic.sx`. Substrate constraints found: dotted rest params
|
||||||
|
`(a . rest)` and named `let` are unsupported in `lib/scheme/eval.sx`, so
|
||||||
|
combinators use `(lambda args ...)` variadics + top-level recursion. Scheme
|
||||||
|
strings come back boxed as `{:scm-string "..."}` — unwrap with `(get s :scm-string)`.
|
||||||
|
|
||||||
|
- **Phases 2-4.** Control flow (branch/retry/timeout/try-catch + fail-value error
|
||||||
|
model), then the showcase: durable suspend/resume. Guest call/cc is escape-only
|
||||||
|
(re-entry hangs), so resume uses **deterministic replay** — re-run the flow,
|
||||||
|
replaying resolved suspends from a `(tag value)` log; only plain data persists, so
|
||||||
|
flows survive a wiped store (crash recovery) and a move to another instance
|
||||||
|
(replication + handoff). Phase 4 models the fed-sx boundary with a mock peer
|
||||||
|
registry. Timeout is a cooperative step budget (no wall clock in pure SX). Test
|
||||||
|
harness reuses one env with a per-test reset for speed.
|
||||||
|
|
||||||
|
- **Phases 5-7 + docs.** Operational API (introspection, hygiene), combinator
|
||||||
|
library (tap/recover/map-flow/while/until), railway `attempt`, end-to-end
|
||||||
|
integration suite, and `lib/flow/README.md` (full API reference + replay-semantics
|
||||||
|
contract). **151/151 across 10 suites.** Conformance sx_server timeout raised to
|
||||||
|
540s for the 10-suite run under shared-machine CPU contention.
|
||||||
|
|
||||||
## Blockers
|
## Blockers
|
||||||
|
|
||||||
(loop fills this in)
|
(none)
|
||||||
|
|||||||
Reference in New Issue
Block a user