Compare commits
28 Commits
loops/go
...
loops/flow
| Author | SHA1 | Date | |
|---|---|---|---|
| 9cfca1d008 | |||
| 3cbf33d2d2 | |||
| c2d628e9c3 | |||
| aabb950256 | |||
| 2b47b2925c | |||
| d9b9da3843 | |||
| 0a1b89c975 | |||
| 0e6ba55647 | |||
| c1d24eb9b3 | |||
| 16cb727406 | |||
| f8722b3b08 | |||
| e1f802cfff | |||
| 97c7623743 | |||
| e896deffc8 | |||
| e762cc2e32 | |||
| 4674620d7e | |||
| f3da3b975a | |||
| 1731476dc6 | |||
| 65cbdb8387 | |||
| 91ffba9975 | |||
| c3a0727645 | |||
| 1b94082a71 | |||
| 57184daaee | |||
| d9e2627b89 | |||
| bcabed6bce | |||
| 5098a8f015 | |||
| 9fe5c9044d | |||
| c6f397c3d9 |
@@ -1561,7 +1561,66 @@
|
||||
(er-register-pure-bif! "crypto" "hash" 2 er-bif-crypto-hash)
|
||||
(er-register-pure-bif! "cid" "from_bytes" 1 er-bif-cid-from-bytes)
|
||||
(er-register-pure-bif! "cid" "to_string" 1 er-bif-cid-to-string)
|
||||
|
||||
;; ── binary_to_list / list_to_binary (Step 3b — term codec) ──────
|
||||
;; Standard Erlang semantics:
|
||||
;; binary_to_list(<<B1,B2,...>>) -> [B1, B2, ...] (Erlang cons of ints)
|
||||
;; list_to_binary(IoList) -> <<...>> (flattens nested
|
||||
;; iolists; elements are byte ints 0-255 or binaries)
|
||||
;; Bad arg / out-of-range byte / non-iolist element -> error:badarg.
|
||||
|
||||
(define er-bif-binary-to-list
|
||||
(fn (vs)
|
||||
(let ((v (nth vs 0)))
|
||||
(cond
|
||||
(not (er-binary? v))
|
||||
(raise (er-mk-error-marker (er-mk-atom "badarg")))
|
||||
:else
|
||||
(let ((bs (get v :bytes)) (out (er-mk-nil)))
|
||||
(for-each
|
||||
(fn (i)
|
||||
(set! out (er-mk-cons (nth bs (- (- (len bs) 1) i)) out)))
|
||||
(range 0 (len bs)))
|
||||
out)))))
|
||||
|
||||
;; Walk an Erlang iolist, appending bytes to `acc` (a mutable SX list).
|
||||
;; Accepts: nil, cons-of-X, binary, integer in 0..255. Anything else
|
||||
;; signals failure by setting (nth fail 0) to true.
|
||||
(define er-iolist-walk!
|
||||
(fn (v acc fail)
|
||||
(cond
|
||||
(nth fail 0) nil
|
||||
(er-nil? v) nil
|
||||
(er-cons? v)
|
||||
(do (er-iolist-walk! (get v :head) acc fail)
|
||||
(er-iolist-walk! (get v :tail) acc fail))
|
||||
(er-binary? v)
|
||||
(for-each
|
||||
(fn (i) (append! acc (nth (get v :bytes) i)))
|
||||
(range 0 (len (get v :bytes))))
|
||||
(= (type-of v) "number")
|
||||
(cond
|
||||
(and (>= v 0) (<= v 255)) (append! acc v)
|
||||
:else (set-nth! fail 0 true))
|
||||
:else (set-nth! fail 0 true))))
|
||||
|
||||
(define er-bif-list-to-binary
|
||||
(fn (vs)
|
||||
(let ((v (nth vs 0)) (acc (list)) (fail (list false)))
|
||||
(cond
|
||||
(not (or (er-nil? v) (er-cons? v) (er-binary? v)))
|
||||
(raise (er-mk-error-marker (er-mk-atom "badarg")))
|
||||
:else
|
||||
(do
|
||||
(er-iolist-walk! v acc fail)
|
||||
(cond
|
||||
(nth fail 0)
|
||||
(raise (er-mk-error-marker (er-mk-atom "badarg")))
|
||||
:else (er-mk-binary acc)))))))
|
||||
|
||||
(er-register-bif! "file" "list_dir" 1 er-bif-file-list-dir)
|
||||
(er-register-pure-bif! "erlang" "binary_to_list" 1 er-bif-binary-to-list)
|
||||
(er-register-pure-bif! "erlang" "list_to_binary" 1 er-bif-list-to-binary)
|
||||
(er-mk-atom "ok")))
|
||||
|
||||
;; Register everything at load time.
|
||||
|
||||
@@ -1,18 +1,18 @@
|
||||
{
|
||||
"language": "erlang",
|
||||
"total_pass": 729,
|
||||
"total": 729,
|
||||
"total_pass": 761,
|
||||
"total": 761,
|
||||
"suites": [
|
||||
{"name":"tokenize","pass":62,"total":62,"status":"ok"},
|
||||
{"name":"parse","pass":52,"total":52,"status":"ok"},
|
||||
{"name":"eval","pass":385,"total":385,"status":"ok"},
|
||||
{"name":"eval","pass":408,"total":408,"status":"ok"},
|
||||
{"name":"runtime","pass":93,"total":93,"status":"ok"},
|
||||
{"name":"ring","pass":4,"total":4,"status":"ok"},
|
||||
{"name":"ping-pong","pass":4,"total":4,"status":"ok"},
|
||||
{"name":"bank","pass":8,"total":8,"status":"ok"},
|
||||
{"name":"echo","pass":7,"total":7,"status":"ok"},
|
||||
{"name":"fib","pass":8,"total":8,"status":"ok"},
|
||||
{"name":"ffi","pass":28,"total":28,"status":"ok"},
|
||||
{"name":"ffi","pass":37,"total":37,"status":"ok"},
|
||||
{"name":"vm","pass":78,"total":78,"status":"ok"}
|
||||
]
|
||||
}
|
||||
|
||||
@@ -1,19 +1,19 @@
|
||||
# Erlang-on-SX Scoreboard
|
||||
|
||||
**Total: 729 / 729 tests passing**
|
||||
**Total: 761 / 761 tests passing**
|
||||
|
||||
| | Suite | Pass | Total |
|
||||
|---|---|---|---|
|
||||
| ✅ | tokenize | 62 | 62 |
|
||||
| ✅ | parse | 52 | 52 |
|
||||
| ✅ | eval | 385 | 385 |
|
||||
| ✅ | eval | 408 | 408 |
|
||||
| ✅ | runtime | 93 | 93 |
|
||||
| ✅ | ring | 4 | 4 |
|
||||
| ✅ | ping-pong | 4 | 4 |
|
||||
| ✅ | bank | 8 | 8 |
|
||||
| ✅ | echo | 7 | 7 |
|
||||
| ✅ | fib | 8 | 8 |
|
||||
| ✅ | ffi | 28 | 28 |
|
||||
| ✅ | ffi | 37 | 37 |
|
||||
| ✅ | vm | 78 | 78 |
|
||||
|
||||
|
||||
|
||||
@@ -228,9 +228,10 @@
|
||||
(er-eval-test "tuple_size 0" (ev "tuple_size({})") 0)
|
||||
|
||||
;; ── BIFs: atom / list conversions ───────────────────────────────
|
||||
(er-eval-test "atom_to_list" (ev "atom_to_list(hello)") "hello")
|
||||
(er-eval-test "atom_to_list -> charlist length" (ev "length(atom_to_list(hello))") 5)
|
||||
(er-eval-test "atom_to_list -> head $h" (ev "hd(atom_to_list(hello))") 104)
|
||||
(er-eval-test "list_to_atom roundtrip"
|
||||
(nm (ev "list_to_atom(atom_to_list(foo))")) "foo")
|
||||
(nm (ev "list_to_atom(atom_to_list(foo))")) "foo") ;; round-trip via charlist
|
||||
(er-eval-test "list_to_atom fresh"
|
||||
(nm (ev "list_to_atom(\"bar\")")) "bar")
|
||||
|
||||
@@ -1060,11 +1061,13 @@
|
||||
(er-eval-test "list_to_tuple roundtrip"
|
||||
(ev "tuple_size(list_to_tuple([10, 20, 30]))") 3)
|
||||
|
||||
(er-eval-test "integer_to_list" (ev "integer_to_list(42)") "42")
|
||||
(er-eval-test "integer_to_list neg" (ev "integer_to_list(-99)") "-99")
|
||||
(er-eval-test "integer_to_list -> charlist length" (ev "length(integer_to_list(42))") 2)
|
||||
(er-eval-test "integer_to_list 42 head $4" (ev "hd(integer_to_list(42))") 52)
|
||||
(er-eval-test "integer_to_list neg -> charlist length" (ev "length(integer_to_list(-99))") 3)
|
||||
(er-eval-test "integer_to_list -99 head $-" (ev "hd(integer_to_list(-99))") 45)
|
||||
(er-eval-test "list_to_integer" (ev "list_to_integer(\"123\")") 123)
|
||||
(er-eval-test "list_to_integer roundtrip"
|
||||
(ev "list_to_integer(integer_to_list(7))") 7)
|
||||
(ev "list_to_integer(integer_to_list(7))") 7) ;; round-trip via charlist
|
||||
|
||||
(er-eval-test "is_function fun"
|
||||
(nm (ev "F = fun (X) -> X end, is_function(F)")) "true")
|
||||
@@ -1341,6 +1344,42 @@
|
||||
(get (nth (get er-rt-cap-result :elements) 4) :name) "true")
|
||||
|
||||
|
||||
|
||||
;; ── $X char literals (Step 3b substrate fix 2026-06-04) ──────────
|
||||
(er-eval-test "char $A" (ev "$A") 65)
|
||||
(er-eval-test "char $a" (ev "$a") 97)
|
||||
(er-eval-test "char $0 is digit, not escape-NUL" (ev "$0") 48)
|
||||
(er-eval-test "char $\\n is newline (10)" (ev "$\\n") 10)
|
||||
(er-eval-test "char $\\t is tab (9)" (ev "$\\t") 9)
|
||||
(er-eval-test "char $\\r is CR (13)" (ev "$\\r") 13)
|
||||
(er-eval-test "char $\\s is space (32)" (ev "$\\s") 32)
|
||||
(er-eval-test "char $\\0 is NUL (0)" (ev "$\\0") 0)
|
||||
(er-eval-test "char $\\\\ is backslash (92)" (ev "$\\\\") 92)
|
||||
(er-eval-test "[$h,$i] head is 104" (ev "hd([$h, $i])") 104)
|
||||
(er-eval-test "list_to_binary char-list -> bytes"
|
||||
(ev "byte_size(list_to_binary([$f, $e, $d]))") 3)
|
||||
(er-eval-test "list_to_binary char-list round-trip"
|
||||
(nm (ev "list_to_binary([$h, $i]) =:= <<104, 105>>")) "true")
|
||||
|
||||
|
||||
;; ── atom_to_list / integer_to_list charlist semantics (Step 3b substrate fix #3) ──
|
||||
(er-eval-test "atom_to_list hd is char code"
|
||||
(ev "hd(atom_to_list(hi))") 104)
|
||||
(er-eval-test "atom_to_list maps to bytes via list_to_binary"
|
||||
(ev "byte_size(list_to_binary(atom_to_list(hello)))") 5)
|
||||
(er-eval-test "atom_to_list -> list_to_binary -> bytes content"
|
||||
(nm (ev "list_to_binary(atom_to_list(ok)) =:= <<111, 107>>")) "true")
|
||||
(er-eval-test "integer_to_list 12345 -> 5 chars"
|
||||
(ev "length(integer_to_list(12345))") 5)
|
||||
(er-eval-test "integer_to_list -> bytes -> back"
|
||||
(ev "list_to_integer(integer_to_list(99999))") 99999)
|
||||
(er-eval-test "list_to_atom from charlist"
|
||||
(nm (ev "list_to_atom([$f, $o, $o])")) "foo")
|
||||
(er-eval-test "list_to_atom from SX-string back-compat"
|
||||
(nm (ev "list_to_atom(\"bar\")")) "bar")
|
||||
(er-eval-test "list_to_integer from charlist"
|
||||
(ev "list_to_integer([$1, $0, $0])") 100)
|
||||
|
||||
(define
|
||||
er-eval-test-summary
|
||||
(str "eval " er-eval-test-pass "/" er-eval-test-count))
|
||||
|
||||
@@ -160,6 +160,51 @@
|
||||
(ffi-nm (ffi-ev "element(2, file:list_dir(\"/no/such/dir/xyz\"))"))
|
||||
"enoent")
|
||||
|
||||
(er-ffi-test
|
||||
"binary_to_list <<1,2,3>> length"
|
||||
(ffi-ev "length(binary_to_list(<<1,2,3,4,5>>))")
|
||||
5)
|
||||
|
||||
(er-ffi-test
|
||||
"binary_to_list hd byte"
|
||||
(ffi-ev "hd(binary_to_list(<<7,8,9>>))")
|
||||
7)
|
||||
|
||||
(er-ffi-test
|
||||
"binary_to_list empty -> []"
|
||||
(ffi-nm (ffi-ev "case binary_to_list(<<>>) of [] -> empty end"))
|
||||
"empty")
|
||||
|
||||
(er-ffi-test
|
||||
"list_to_binary flat list bytes"
|
||||
(ffi-ev "byte_size(list_to_binary([1,2,3]))")
|
||||
3)
|
||||
|
||||
(er-ffi-test
|
||||
"list_to_binary nested iolist"
|
||||
(ffi-ev "byte_size(list_to_binary([1, <<2,3>>, [4, [5]]]))")
|
||||
5)
|
||||
|
||||
(er-ffi-test
|
||||
"list_to_binary round-trip via binary_to_list"
|
||||
(ffi-nm (ffi-ev "list_to_binary(binary_to_list(<<10,20,30>>)) =:= <<10,20,30>>"))
|
||||
"true")
|
||||
|
||||
(er-ffi-test
|
||||
"binary_to_list non-binary -> error:badarg"
|
||||
(ffi-nm (ffi-ev "try binary_to_list(42) catch error:badarg -> ok end"))
|
||||
"ok")
|
||||
|
||||
(er-ffi-test
|
||||
"list_to_binary out-of-range byte -> error:badarg"
|
||||
(ffi-nm (ffi-ev "try list_to_binary([300]) catch error:badarg -> ok end"))
|
||||
"ok")
|
||||
|
||||
(er-ffi-test
|
||||
"list_to_binary non-iolist -> error:badarg"
|
||||
(ffi-nm (ffi-ev "try list_to_binary(42) catch error:badarg -> ok end"))
|
||||
"ok")
|
||||
|
||||
;; ── Still deferred (no host primitive): httpc (HTTP client, v2),
|
||||
;; sqlite-* (v2 indexes). Assert NOT registered so a future iteration
|
||||
;; that wires them without updating this suite fails fast.
|
||||
|
||||
@@ -229,13 +229,37 @@
|
||||
(= ch "$")
|
||||
(do
|
||||
(er-advance! 1)
|
||||
(if
|
||||
(and (< pos src-len) (= (er-cur) "\\"))
|
||||
(do
|
||||
(er-advance! 1)
|
||||
(when (< pos src-len) (er-advance! 1)))
|
||||
(when (< pos src-len) (er-advance! 1)))
|
||||
(er-emit! "integer" (slice src start pos) start)
|
||||
;; Emit the char's decimal code as the integer token value
|
||||
;; (was: raw "$X" text — parse-number then returned nil).
|
||||
(let
|
||||
((code (cond
|
||||
(>= pos src-len) 0
|
||||
(= (er-cur) "\\")
|
||||
(do
|
||||
(er-advance! 1)
|
||||
(let ((esc (if (< pos src-len) (er-cur) "")))
|
||||
(when (< pos src-len) (er-advance! 1))
|
||||
(cond
|
||||
(= esc "n") 10
|
||||
(= esc "t") 9
|
||||
(= esc "r") 13
|
||||
(= esc "s") 32
|
||||
(= esc "b") 8
|
||||
(= esc "e") 27
|
||||
(= esc "f") 12
|
||||
(= esc "v") 11
|
||||
(= esc "d") 127
|
||||
(= esc "0") 0
|
||||
(= esc "\\") 92
|
||||
(= esc "\"") 34
|
||||
(= esc "'") 39
|
||||
(= esc "") 0
|
||||
:else (char->integer (nth (string->list esc) 0)))))
|
||||
:else
|
||||
(let ((c (er-cur)))
|
||||
(er-advance! 1)
|
||||
(char->integer (nth (string->list c) 0))))))
|
||||
(er-emit! "integer" (str code) start))
|
||||
(scan!))
|
||||
(er-lower? ch)
|
||||
(do
|
||||
|
||||
@@ -107,7 +107,12 @@
|
||||
(let
|
||||
((ty (get node :type)))
|
||||
(cond
|
||||
(= ty "integer") (parse-number (get node :value))
|
||||
(= ty "integer")
|
||||
(let ((n (parse-number (get node :value))))
|
||||
(cond
|
||||
(= n nil) (error (str "Erlang: invalid integer literal: "
|
||||
(get node :value)))
|
||||
:else (truncate n)))
|
||||
(= ty "float") (parse-number (get node :value))
|
||||
(= ty "atom") (er-mk-atom (get node :value))
|
||||
(= ty "string") (get node :value)
|
||||
@@ -821,16 +826,30 @@
|
||||
(len (get v :elements))
|
||||
(error "Erlang: tuple_size: not a tuple")))))
|
||||
|
||||
(define er-string->charlist
|
||||
(fn (s)
|
||||
(let ((cs (string->list s)) (out (er-mk-nil)))
|
||||
(for-each
|
||||
(fn (i)
|
||||
(set! out (er-mk-cons
|
||||
(char->integer (nth cs (- (- (len cs) 1) i)))
|
||||
out)))
|
||||
(range 0 (len cs)))
|
||||
out)))
|
||||
|
||||
(define
|
||||
er-bif-atom-to-list
|
||||
(fn
|
||||
(vs)
|
||||
(let
|
||||
((v (er-bif-arg1 vs "atom_to_list")))
|
||||
;; Standard Erlang: atom_to_list/1 returns an Erlang charlist
|
||||
;; (list of integer char codes). Was: SX string of :name —
|
||||
;; unusable from Erlang-land for [Char|T] / ++ / binary segments.
|
||||
(if
|
||||
(er-atom? v)
|
||||
(get v :name)
|
||||
(error "Erlang: atom_to_list: not an atom")))))
|
||||
(er-string->charlist (get v :name))
|
||||
(raise (er-mk-error-marker (er-mk-atom "badarg")))))))
|
||||
|
||||
(define
|
||||
er-bif-list-to-atom
|
||||
@@ -838,10 +857,11 @@
|
||||
(vs)
|
||||
(let
|
||||
((v (er-bif-arg1 vs "list_to_atom")))
|
||||
(if
|
||||
(= (type-of v) "string")
|
||||
(er-mk-atom v)
|
||||
(error "Erlang: list_to_atom: not a string")))))
|
||||
;; Accept Erlang charlist (cons of ints) or SX string.
|
||||
(let ((s (er-source-to-string v)))
|
||||
(cond
|
||||
(= s nil) (raise (er-mk-error-marker (er-mk-atom "badarg")))
|
||||
:else (er-mk-atom s))))))
|
||||
|
||||
;; ── lists module ─────────────────────────────────────────────────
|
||||
(define
|
||||
@@ -1597,10 +1617,12 @@
|
||||
(vs)
|
||||
(let
|
||||
((v (er-bif-arg1 vs "integer_to_list")))
|
||||
;; Standard Erlang: integer_to_list/1 returns an Erlang charlist
|
||||
;; (e.g. integer_to_list(42) -> [$4, $2] -> [52, 50]).
|
||||
(cond
|
||||
(not (= (type-of v) "number"))
|
||||
(raise (er-mk-error-marker (er-mk-atom "badarg")))
|
||||
:else (str v)))))
|
||||
:else (er-string->charlist (str v))))))
|
||||
|
||||
(define
|
||||
er-bif-list-to-integer
|
||||
@@ -1608,15 +1630,14 @@
|
||||
(vs)
|
||||
(let
|
||||
((v (er-bif-arg1 vs "list_to_integer")))
|
||||
(cond
|
||||
(not (= (type-of v) "string"))
|
||||
(raise (er-mk-error-marker (er-mk-atom "badarg")))
|
||||
:else (let
|
||||
((n (parse-number v)))
|
||||
(cond
|
||||
(= n nil)
|
||||
(raise (er-mk-error-marker (er-mk-atom "badarg")))
|
||||
:else n))))))
|
||||
;; Accept Erlang charlist (cons of ints) or SX string.
|
||||
(let ((s (er-source-to-string v)))
|
||||
(cond
|
||||
(= s nil) (raise (er-mk-error-marker (er-mk-atom "badarg")))
|
||||
:else (let ((n (parse-number s)))
|
||||
(cond
|
||||
(= n nil) (raise (er-mk-error-marker (er-mk-atom "badarg")))
|
||||
:else n)))))))
|
||||
|
||||
(define
|
||||
er-bif-is-function
|
||||
|
||||
141
lib/flow/README.md
Normal file
141
lib/flow/README.md
Normal file
@@ -0,0 +1,141 @@
|
||||
# flow — durable DAG workflows on Scheme
|
||||
|
||||
`flow` is a workflow engine for rose-ash: content pipelines (write → review →
|
||||
publish → federate), scheduled jobs, and multi-step user flows (signup, confirm,
|
||||
onboard) that **survive process restarts**. It is a thin Scheme prelude over the
|
||||
Scheme-on-SX guest (`lib/scheme/`); a flow runs *inside* the interpreter.
|
||||
|
||||
Run the suite: `bash lib/flow/conformance.sh` → **151/151 across 10 suites**.
|
||||
|
||||
## Model
|
||||
|
||||
A **flow** is just a Scheme procedure of one argument — the upstream value:
|
||||
|
||||
```
|
||||
node : input -> output
|
||||
```
|
||||
|
||||
Combinators build composite nodes out of child nodes. A node that ignores its
|
||||
argument is effectively a thunk. There is no separate "graph" object: composition
|
||||
*is* function composition, so flows are values you can name, pass, and nest.
|
||||
|
||||
```scheme
|
||||
(defflow publish
|
||||
(sequence
|
||||
(lambda (draft) (string-append draft "!"))
|
||||
(branch (lambda (post) (>= (string-length post) 3))
|
||||
(remote-node 'fed 'publish)
|
||||
(flow-const 'rejected))))
|
||||
|
||||
(flow/start publish "hello") ; => federated, or a (flow-suspended id tag) state
|
||||
```
|
||||
|
||||
## Building blocks (`spec.sx`)
|
||||
|
||||
| Combinator | Meaning |
|
||||
|---|---|
|
||||
| `(flow-node f)` / `(flow-id x)` / `(flow-const v)` | leaf nodes |
|
||||
| `(sequence n ...)` | thread input left-to-right |
|
||||
| `(parallel n ...)` | fan input to every child, join results into a list (sequential eval) |
|
||||
| `(map-flow node)` | run `node` over each item of a list input, join results |
|
||||
| `(flow-while pred body max)` / `(flow-until ...)` | bounded iteration (cap `max` steps) |
|
||||
| `(defflow name body)` | bind + register a named flow (so it survives restart) |
|
||||
|
||||
## Control flow + errors (`spec.sx`)
|
||||
|
||||
| Combinator | Meaning |
|
||||
|---|---|
|
||||
| `(branch pred then else)` | `pred` on input selects `then`/`else` (`cond` is a Scheme special form) |
|
||||
| `(retry n node)` | re-run on a *raised exception*, up to `n` attempts |
|
||||
| `(timeout budget node)` | cooperative **step budget**: nodes call `(tick)`; the `(budget+1)`-th tick raises `flow-timeout` |
|
||||
| `(try-catch node handler)` | catch a raised exception → `(handler error)` |
|
||||
| `(fail reason)` / `(failed? x)` / `(fail-reason x)` | explicit failure *values* (flow downstream as data) |
|
||||
| `(recover node handler)` | the fail-VALUE counterpart of try-catch |
|
||||
| `(attempt n ...)` | railway sequence: stop at the first node returning a `(fail ...)` |
|
||||
| `(tap effect)` | run a side effect, return input unchanged |
|
||||
|
||||
**Two error channels, on purpose.** Raised exceptions are for *bugs/transients*
|
||||
(caught by `retry`/`try-catch`). `(fail reason)` values are for *expected business
|
||||
outcomes* (validation rejected, declined) and compose via `attempt`/`recover`.
|
||||
|
||||
## Suspend / resume — the durable core (`spec.sx`, `store.sx`)
|
||||
|
||||
The guest Scheme's `call/cc` is **escape-only** — re-invoking a captured
|
||||
continuation after it returns *hangs* the runtime. So flow does **not** serialize
|
||||
continuations. Instead it uses **deterministic replay**:
|
||||
|
||||
- `(suspend tag)` — if `tag` is already in the replay log, return its logged value;
|
||||
otherwise escape to the driver as `(flow-suspended tag)`.
|
||||
- `resume` appends `(tag value)` to the log and **re-runs the flow from the start**.
|
||||
Already-resolved suspends replay their values; the first unresolved one escapes
|
||||
again (or the flow completes).
|
||||
|
||||
The entire persisted state is the replay log — plain data. No live continuation is
|
||||
ever stored, so flows survive process restarts and even moves between instances.
|
||||
|
||||
> **Author contract:** suspend `tag`s must be unique and deterministic across
|
||||
> replays, and **all** non-determinism / side effects must go through suspend
|
||||
> points (so their results are logged) — otherwise they re-run on every replay.
|
||||
|
||||
### Lifecycle (`store.sx`)
|
||||
|
||||
```scheme
|
||||
(flow/start flow input) ; raw result if it completes, else (flow-suspended id tag)
|
||||
(flow/resume id value) ; inject value at the waiting tag, continue
|
||||
(flow/cancel id) ; terminate; a later resume is rejected
|
||||
```
|
||||
|
||||
### Introspection & hygiene
|
||||
|
||||
```scheme
|
||||
(flow/status id) ; done | suspended | cancelled | unknown
|
||||
(flow/result id) ; result if done, else (flow-error reason)
|
||||
(flow/list) ; ((id status) ...)
|
||||
(flow/pending) ; ((id waiting-tag) ...) — what each suspended flow awaits
|
||||
(flow/gc) ; drop terminal records, keep live ones; returns count removed
|
||||
(flow/forget id) ; drop one terminal record (refuses live flows)
|
||||
```
|
||||
|
||||
### Crash recovery
|
||||
|
||||
```scheme
|
||||
(flow-store-export) ; the store as plain data (live procs nulled)
|
||||
(flow-store-import! d) ; restore the store from exported data
|
||||
(flow-resumable-ids) ; ids of suspended flows to wake on restart
|
||||
```
|
||||
|
||||
On restart the flow definitions are reloaded (`defflow` re-registers names) and the
|
||||
exported store reimported; `resume` re-resolves each flow's procedure **by name**.
|
||||
|
||||
## Distribution via fed-sx (`remote.sx`)
|
||||
|
||||
```scheme
|
||||
(flow-peer-register! addr table) ; mock a peer's exposed functions (fed-sx boundary)
|
||||
(remote-node addr fn) ; run a node on a peer
|
||||
(remote-failover addrs fn local) ; try peers in order, fall through to a local node
|
||||
(flow-replicate-to addr) ; copy this store to a peer's replica slot
|
||||
(flow-restore-from addr) ; import a peer's replica (handoff)
|
||||
```
|
||||
|
||||
**Handoff** is crash recovery across instances: replicate → local instance dies →
|
||||
peer restores the (plain-data) store and resumes. The replay log carries over, so
|
||||
all resolved suspends survive the move.
|
||||
|
||||
## Files
|
||||
|
||||
| File | Contents |
|
||||
|---|---|
|
||||
| `spec.sx` | combinators (flow-combinators-src / flow-control-src / flow-suspend-src) |
|
||||
| `store.sx` | durable store, lifecycle, crash recovery, introspection, hygiene |
|
||||
| `remote.sx` | fed-sx transport (mock peer registry), failover, replication |
|
||||
| `api.sx` | `flow-make-env` / `flow-run` SX helpers (one cached env, per-test reset) |
|
||||
| `tests/*.sx` | 10 suites, 151 cases |
|
||||
| `conformance.sh` | loads substrate + flow layer, runs every suite |
|
||||
|
||||
## Notes on the substrate
|
||||
|
||||
The guest Scheme (`lib/scheme/`, imported read-only) lacks dotted-rest params
|
||||
`(a . rest)` and named `let`; combinators use `(lambda args ...)` variadics + top-
|
||||
level recursion. `cons` is list-only (no dotted pairs), so log/assoc entries are
|
||||
2-element lists. Strings box as `{:scm-string "..."}`. Timeout is a step budget
|
||||
because there is no wall clock; `parallel` is sequential for the same reason.
|
||||
65
lib/flow/api.sx
Normal file
65
lib/flow/api.sx
Normal file
@@ -0,0 +1,65 @@
|
||||
;; lib/flow/api.sx — flow runtime entry points.
|
||||
;;
|
||||
;; Builds a Scheme env preloaded with the flow combinators (lib/flow/spec.sx),
|
||||
;; the durable store + lifecycle (lib/flow/store.sx), the fed-sx remote layer
|
||||
;; (lib/flow/remote.sx), and the host integration ABI (lib/flow/host.sx), and
|
||||
;; provides SX helpers to run flow programs.
|
||||
;;
|
||||
;; Scheme-level API (available inside flow programs):
|
||||
;; (flow/start flow input) — run a flow; raw result if it completes, else
|
||||
;; (flow-suspended id tag). Defined in store.sx.
|
||||
;; (flow/resume id value) — resume a suspended flow (store.sx)
|
||||
;; (flow/cancel id) — cancel a flow (store.sx)
|
||||
;; (suspend tag) — suspension point (spec.sx)
|
||||
;; (request kind payload) — host request envelope over suspend (host.sx)
|
||||
;; (remote-node addr fn) — node executed on a federation peer (remote.sx)
|
||||
;;
|
||||
;; SX-level helpers (for hosts and tests):
|
||||
;; (flow-make-env) — fresh standard env + combinators + store + remote + host
|
||||
;; (flow-run src) — eval a Scheme program string in a reset shared env
|
||||
;; (flow-run-in env src) — eval a Scheme program string in a given env
|
||||
;;
|
||||
;; flow-run reuses ONE env (building the full standard env is expensive) and
|
||||
;; resets the mutable flow globals before each program, so tests stay isolated
|
||||
;; without paying for a fresh standard env each time. flow-registry persists (it
|
||||
;; models reloaded flow definitions surviving a restart).
|
||||
|
||||
(define
|
||||
flow-make-env
|
||||
(fn
|
||||
()
|
||||
(let
|
||||
((env (scheme-standard-env)))
|
||||
(flow-load-combinators! env)
|
||||
(flow-load-store! env)
|
||||
(flow-load-remote! env)
|
||||
(flow-load-host! env)
|
||||
env)))
|
||||
|
||||
(define
|
||||
flow-run-in
|
||||
(fn (env src) (scheme-eval-program (scheme-parse-all src) env)))
|
||||
|
||||
(define
|
||||
flow-reset-src
|
||||
"(set! flow-store (list)) (set! flow-next-id 0) (set! flow-replay-log (list)) (set! flow-suspend-k #f) (set! flow-timeout-budget -1) (set! flow-peers (list)) (set! flow-replicas (list))")
|
||||
|
||||
(define flow-env-cache false)
|
||||
|
||||
(define
|
||||
flow-shared-env
|
||||
(fn
|
||||
()
|
||||
(begin
|
||||
(if flow-env-cache nil (set! flow-env-cache (flow-make-env)))
|
||||
flow-env-cache)))
|
||||
|
||||
(define
|
||||
flow-run
|
||||
(fn
|
||||
(src)
|
||||
(let
|
||||
((env (flow-shared-env)))
|
||||
(begin
|
||||
(scheme-eval-program (scheme-parse-all flow-reset-src) env)
|
||||
(scheme-eval-program (scheme-parse-all src) env)))))
|
||||
103
lib/flow/conformance.sh
Executable file
103
lib/flow/conformance.sh
Executable file
@@ -0,0 +1,103 @@
|
||||
#!/usr/bin/env bash
|
||||
# flow-on-sx conformance runner — runs all flow test suites in one sx_server process.
|
||||
#
|
||||
# Usage:
|
||||
# bash lib/flow/conformance.sh # run all suites
|
||||
# bash lib/flow/conformance.sh -v # verbose (list each suite)
|
||||
|
||||
set -uo pipefail
|
||||
cd "$(git rev-parse --show-toplevel)"
|
||||
|
||||
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
|
||||
fi
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
echo "ERROR: sx_server.exe not found." >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
VERBOSE="${1:-}"
|
||||
|
||||
# Suites: NAME RUNNER-FN PATH
|
||||
SUITES=(
|
||||
"basic flow-basic-tests-run! lib/flow/tests/basic.sx"
|
||||
"control flow-ctl-tests-run! lib/flow/tests/control.sx"
|
||||
"suspend flow-sus-tests-run! lib/flow/tests/suspend.sx"
|
||||
"recovery flow-rec-tests-run! lib/flow/tests/recovery.sx"
|
||||
"distributed flow-dist-tests-run! lib/flow/tests/distributed.sx"
|
||||
"api flow-api-tests-run! lib/flow/tests/api.sx"
|
||||
"combinators flow-cmb-tests-run! lib/flow/tests/combinators.sx"
|
||||
"railway flow-rail-tests-run! lib/flow/tests/railway.sx"
|
||||
"integration flow-int-tests-run! lib/flow/tests/integration.sx"
|
||||
"hygiene flow-hyg-tests-run! lib/flow/tests/hygiene.sx"
|
||||
"host flow-hst-tests-run! lib/flow/tests/host.sx"
|
||||
)
|
||||
|
||||
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
|
||||
EPOCH=1
|
||||
|
||||
emit_load () { echo "(epoch $EPOCH)"; echo "(load \"$1\")"; EPOCH=$((EPOCH+1)); }
|
||||
emit_eval () { echo "(epoch $EPOCH)"; echo "(eval \"$1\")"; EPOCH=$((EPOCH+1)); }
|
||||
|
||||
{
|
||||
emit_load "lib/guest/lex.sx"
|
||||
emit_load "lib/guest/reflective/env.sx"
|
||||
emit_load "lib/guest/reflective/quoting.sx"
|
||||
emit_load "lib/scheme/parser.sx"
|
||||
emit_load "lib/scheme/eval.sx"
|
||||
emit_load "lib/scheme/runtime.sx"
|
||||
emit_load "lib/flow/spec.sx"
|
||||
emit_load "lib/flow/store.sx"
|
||||
emit_load "lib/flow/remote.sx"
|
||||
emit_load "lib/flow/host.sx"
|
||||
emit_load "lib/flow/api.sx"
|
||||
for SUITE in "${SUITES[@]}"; do
|
||||
read -r _NAME _RUNNER FILE <<< "$SUITE"
|
||||
emit_load "$FILE"
|
||||
emit_eval "($_RUNNER)"
|
||||
done
|
||||
} > "$TMPFILE"
|
||||
|
||||
OUTPUT=$(timeout 540 "$SX_SERVER" < "$TMPFILE" 2>&1 || true)
|
||||
|
||||
TOTAL_PASS=0
|
||||
TOTAL_FAIL=0
|
||||
FAILED_SUITES=()
|
||||
|
||||
LAST_DICT_LINES=$(echo "$OUTPUT" | grep -E '^\{:' || true)
|
||||
|
||||
I=0
|
||||
while read -r LINE; do
|
||||
[ -z "$LINE" ] && continue
|
||||
P=$(echo "$LINE" | grep -oE ':passed [0-9]+' | awk '{print $2}')
|
||||
F=$(echo "$LINE" | grep -oE ':failed [0-9]+' | awk '{print $2}')
|
||||
[ -z "$P" ] && P=0
|
||||
[ -z "$F" ] && F=0
|
||||
SUITE_INFO="${SUITES[$I]}"
|
||||
SUITE_NAME=$(echo "$SUITE_INFO" | awk '{print $1}')
|
||||
TOTAL_PASS=$((TOTAL_PASS + P))
|
||||
TOTAL_FAIL=$((TOTAL_FAIL + F))
|
||||
if [ "$F" -gt 0 ]; then
|
||||
FAILED_SUITES+=("$SUITE_NAME: $P/$((P+F))")
|
||||
printf 'X %-12s %d/%d\n' "$SUITE_NAME" "$P" "$((P+F))"
|
||||
echo "$LINE" | grep -oE ':name "[^"]*"' | sed 's/:name / fail: /'
|
||||
elif [ "$VERBOSE" = "-v" ]; then
|
||||
printf 'ok %-12s %d passed\n' "$SUITE_NAME" "$P"
|
||||
fi
|
||||
I=$((I+1))
|
||||
done <<< "$LAST_DICT_LINES"
|
||||
|
||||
TOTAL=$((TOTAL_PASS + TOTAL_FAIL))
|
||||
if [ "$TOTAL" -eq 0 ]; then
|
||||
echo "ERROR: no suite results parsed. Raw output:" >&2
|
||||
echo "$OUTPUT" >&2
|
||||
exit 1
|
||||
fi
|
||||
if [ $TOTAL_FAIL -eq 0 ]; then
|
||||
echo "ok $TOTAL_PASS/$TOTAL flow-on-sx tests passed (${#SUITES[@]} suites)"
|
||||
else
|
||||
echo "FAIL $TOTAL_PASS/$TOTAL passed, $TOTAL_FAIL failed:"
|
||||
for S in "${FAILED_SUITES[@]}"; do echo " $S"; done
|
||||
exit 1
|
||||
fi
|
||||
42
lib/flow/host.sx
Normal file
42
lib/flow/host.sx
Normal file
@@ -0,0 +1,42 @@
|
||||
;; lib/flow/host.sx — the host integration ABI (Phase 8).
|
||||
;;
|
||||
;; `suspend` is flow's seam to the outside world, but a bare (suspend tag) is just a
|
||||
;; signal — every author would invent their own tag shape. This layer defines a
|
||||
;; stable request/response contract so a host (e.g. an art-dag driver, or a human
|
||||
;; review UI) can hook in WITHOUT reverse-engineering ad-hoc tags.
|
||||
;;
|
||||
;; A flow asks the host to do something and waits for the answer:
|
||||
;; (request kind payload) — suspend with a typed envelope (flow-request kind
|
||||
;; payload); evaluates to the host's resume value.
|
||||
;; (await-human prompt) — request kind=human (a decision point)
|
||||
;; (await-render recipe) — request kind=render (e.g. an art-dag job)
|
||||
;; (await-effect kind p) — request of an arbitrary kind
|
||||
;;
|
||||
;; The host drives flows by polling its work queue and resuming:
|
||||
;; (flow-host-requests) — ((id kind payload) ...) for every SUSPENDED flow whose
|
||||
;; waiting tag is a host request. The host dispatches by kind (render -> submit a
|
||||
;; Celery job; human -> show UI), then calls (flow/resume id answer).
|
||||
;; (request? tag) / (request-kind tag) / (request-payload tag) — parse one tag.
|
||||
;;
|
||||
;; Reference driver — the host only supplies `dispatch`, a (kind payload) -> answer:
|
||||
;; (flow-drive-host dispatch) — one tick: service every CURRENTLY pending
|
||||
;; request (snapshot), resuming each with (dispatch kind payload); returns the
|
||||
;; count serviced. Resumes may create new requests — serviced on the next tick.
|
||||
;; (flow-run-host dispatch maxticks) — tick until quiescent (no pending requests)
|
||||
;; or maxticks reached; returns total requests serviced. Bounded for determinism.
|
||||
;;
|
||||
;; Contract: the host owns IO and persistence. flow stays deterministic — a flow
|
||||
;; never performs IO itself, it only `request`s; the host performs the effect and
|
||||
;; feeds the result back via resume (which the replay log records, so the effect is
|
||||
;; not re-run on recovery). Persist with flow-store-export after each transition and
|
||||
;; flow-store-import! on boot.
|
||||
|
||||
(define
|
||||
flow-host-src
|
||||
"(define (request kind payload) (suspend (list (quote flow-request) kind payload)))\n (define (request? tag) (and (pair? tag) (eq? (car tag) (quote flow-request))))\n (define (request-kind tag) (car (cdr tag)))\n (define (request-payload tag) (car (cdr (cdr tag))))\n (define (await-human prompt) (request (quote human) prompt))\n (define (await-render recipe) (request (quote render) recipe))\n (define (await-effect kind payload) (request kind payload))\n (define (flow-host-req-step pend)\n (if (null? pend)\n (list)\n (let ((id (car (car pend))) (tag (car (cdr (car pend)))))\n (if (request? tag)\n (cons (list id (request-kind tag) (request-payload tag))\n (flow-host-req-step (cdr pend)))\n (flow-host-req-step (cdr pend))))))\n (define (flow-host-requests) (flow-host-req-step (flow/pending)))\n (define (flow-drive-host-step reqs dispatch)\n (if (null? reqs)\n 0\n (begin\n (flow/resume (car (car reqs)) (dispatch (car (cdr (car reqs))) (car (cdr (cdr (car reqs))))))\n (+ 1 (flow-drive-host-step (cdr reqs) dispatch)))))\n (define (flow-drive-host dispatch) (flow-drive-host-step (flow-host-requests) dispatch))\n (define (flow-run-host dispatch maxticks)\n (if (<= maxticks 0)\n 0\n (let ((n (flow-drive-host dispatch)))\n (if (= n 0) 0 (+ n (flow-run-host dispatch (- maxticks 1)))))))")
|
||||
|
||||
(define
|
||||
flow-load-host!
|
||||
(fn
|
||||
(env)
|
||||
(begin (scheme-eval-program (scheme-parse-all flow-host-src) env) env)))
|
||||
34
lib/flow/remote.sx
Normal file
34
lib/flow/remote.sx
Normal file
@@ -0,0 +1,34 @@
|
||||
;; lib/flow/remote.sx — distributed nodes via fed-sx (Phase 4).
|
||||
;;
|
||||
;; A node can execute on a federation peer. The transport is the fed-sx boundary;
|
||||
;; it is MOCKED in tests by a peer registry mapping addr -> function table. In
|
||||
;; production flow-transport would issue a fed-sx call; here it dispatches locally.
|
||||
;;
|
||||
;; (flow-peer-register! addr table) — register a mock peer. table is a list of
|
||||
;; (fn-name proc) entries — the functions that peer exposes.
|
||||
;; (flow-transport addr fn input) — invoke fn on the peer with input. Raises
|
||||
;; (flow-remote-unreachable) if the addr is unknown, (flow-remote-no-fn) if the
|
||||
;; peer does not expose fn.
|
||||
;; (remote-node addr fn) — a node that runs fn on the peer at addr.
|
||||
;; (remote-failover addrs fn local) — try fn on each peer in addrs in order; on a
|
||||
;; raised error move to the next peer; if every peer fails, run the `local`
|
||||
;; node as a fallback.
|
||||
;;
|
||||
;; Persistence across instances + handoff. Each instance runs the same flow
|
||||
;; definitions, so the only thing that needs to cross the wire is the (plain-data)
|
||||
;; store — exactly flow-store-export from store.sx. Replication pushes that export
|
||||
;; to a peer's replica slot; handoff = restore the replica on the peer and resume.
|
||||
;;
|
||||
;; (flow-replicate-to addr) — copy this instance's store to peer addr's replica
|
||||
;; (flow-restore-from addr) — import the replica from peer addr (#t / #f)
|
||||
;; (flow-replica-get addr) — the raw replicated store at addr (or #f)
|
||||
|
||||
(define
|
||||
flow-remote-src
|
||||
"(define flow-peers (list))\n (define (flow-assoc key alist)\n (if (null? alist)\n #f\n (if (eq? (car (car alist)) key) (car (cdr (car alist))) (flow-assoc key (cdr alist)))))\n (define (flow-peer-register! addr table) (set! flow-peers (cons (list addr table) flow-peers)))\n (define (flow-transport addr fn input)\n (let ((table (flow-assoc addr flow-peers)))\n (if table\n (let ((proc (flow-assoc fn table)))\n (if proc (proc input) (raise (quote flow-remote-no-fn))))\n (raise (quote flow-remote-unreachable)))))\n (define (remote-node addr fn) (lambda (input) (flow-transport addr fn input)))\n (define (flow-failover-step addrs fn input local)\n (if (null? addrs)\n (local input)\n (guard (e (#t (flow-failover-step (cdr addrs) fn input local)))\n (flow-transport (car addrs) fn input))))\n (define (remote-failover addrs fn local)\n (lambda (input) (flow-failover-step addrs fn input local)))\n\n (define flow-replicas (list))\n (define (flow-replicas-remove addr reps)\n (if (null? reps)\n (list)\n (if (eq? (car (car reps)) addr)\n (flow-replicas-remove addr (cdr reps))\n (cons (car reps) (flow-replicas-remove addr (cdr reps))))))\n (define (flow-replicate-to addr)\n (set! flow-replicas (cons (list addr (flow-store-export)) (flow-replicas-remove addr flow-replicas))))\n (define (flow-replica-get addr) (flow-assoc addr flow-replicas))\n (define (flow-restore-from addr)\n (let ((data (flow-replica-get addr)))\n (if data (begin (flow-store-import! data) #t) #f)))")
|
||||
|
||||
(define
|
||||
flow-load-remote!
|
||||
(fn
|
||||
(env)
|
||||
(begin (scheme-eval-program (scheme-parse-all flow-remote-src) env) env)))
|
||||
19
lib/flow/scoreboard.json
Normal file
19
lib/flow/scoreboard.json
Normal file
@@ -0,0 +1,19 @@
|
||||
{
|
||||
"total": 166,
|
||||
"passed": 166,
|
||||
"failed": 0,
|
||||
"suites": {
|
||||
"basic": { "passed": 18, "total": 18 },
|
||||
"control": { "passed": 31, "total": 31 },
|
||||
"suspend": { "passed": 17, "total": 17 },
|
||||
"recovery": { "passed": 8, "total": 8 },
|
||||
"distributed": { "passed": 19, "total": 19 },
|
||||
"api": { "passed": 12, "total": 12 },
|
||||
"combinators": { "passed": 17, "total": 17 },
|
||||
"railway": { "passed": 10, "total": 10 },
|
||||
"integration": { "passed": 10, "total": 10 },
|
||||
"hygiene": { "passed": 9, "total": 9 },
|
||||
"host": { "passed": 15, "total": 15 }
|
||||
},
|
||||
"phases": { "phase1": "done", "phase2": "done", "phase3": "done", "phase4": "done", "phase5": "done", "phase6": "done", "phase7": "done", "phase8": "done" }
|
||||
}
|
||||
53
lib/flow/scoreboard.md
Normal file
53
lib/flow/scoreboard.md
Normal file
@@ -0,0 +1,53 @@
|
||||
# flow-on-sx Scoreboard
|
||||
|
||||
**All tests pass: 166 / 166 across 11 suites. Phases 1-8 complete.**
|
||||
|
||||
`bash lib/flow/conformance.sh`
|
||||
|
||||
## Per-suite breakdown
|
||||
|
||||
| Suite | Passing | Covers |
|
||||
|-------|--------:|--------|
|
||||
| basic | 18 | Phase 1: single nodes, linear sequence, data-flow threading, defflow, parallel fan/join, nested composition, publish-shaped flow |
|
||||
| control | 31 | Phase 2: `branch` (6); error model `fail`/`failed?`/`fail-reason` (6); `try-catch` (6); `retry n` (6); `timeout` cooperative step budget (7) |
|
||||
| suspend | 17 | Phase 3: suspend/resume/cancel via deterministic replay; multi-step, replay determinism, lifecycle guards, suspend-in-branch |
|
||||
| recovery | 8 | Phase 3: crash recovery — store export/import, resumable scan, restart-at-every-step, replay-log survival |
|
||||
| distributed | 19 | Phase 4: `remote-node` (7); `remote-failover` (6); replication + handoff across instances (6) |
|
||||
| api | 12 | Phase 5: introspection — `flow/status`, `flow/result`, `flow/list`, `flow/pending` |
|
||||
| combinators | 17 | Phase 5: `tap`, `recover` (fail-value), `map-flow` fan-over-list, `flow-while`/`flow-until` bounded iteration |
|
||||
| railway | 10 | Phase 6: `attempt` — fail-value short-circuiting sequence + recover rejoin |
|
||||
| integration | 10 | Phase 7: end-to-end order + onboarding flows composing every phase (suspend, branch, federation, crash recovery, handoff, introspection) |
|
||||
| hygiene | 9 | Phase 5: `flow/gc` (prune terminal flows), `flow/forget` (drop one terminal record) |
|
||||
| host | 15 | Phase 8: host ABI — `request`/`await-human`/`await-render`, `flow-host-requests` queue, `flow-run-host` reference driver; art-dag-shaped render→review→publish loop |
|
||||
|
||||
## Architecture
|
||||
|
||||
Flow combinators are a **Scheme prelude** (`lib/flow/spec.sx`) loaded onto
|
||||
`scheme-standard-env`. A flow is a Scheme procedure `input -> output`. The whole
|
||||
flow executes inside the Scheme interpreter, so Phase 3's `suspend` (call/cc) will
|
||||
capture the flow continuation directly.
|
||||
|
||||
- `lib/flow/spec.sx` — combinators: `flow-node`, `flow-id`, `flow-const`,
|
||||
`sequence`, `parallel`, `defflow`; `flow-load-combinators!`.
|
||||
- `lib/flow/api.sx` — `flow/start` (Scheme); `flow-make-env`, `flow-run`,
|
||||
`flow-run-in` (SX helpers).
|
||||
- `lib/flow/tests/basic.sx` — 18 cases.
|
||||
- `lib/flow/conformance.sh` — loads substrate + flow layer, runs suites.
|
||||
|
||||
## Semantics notes
|
||||
|
||||
- **node** = 1-arg Scheme procedure; the upstream value is the argument. A node
|
||||
ignoring its argument is effectively a thunk.
|
||||
- **sequence** threads left-to-right; empty sequence = identity.
|
||||
- **parallel** fans the same input to every branch and joins results into a list.
|
||||
Evaluation is **sequential** for now; true concurrency arrives in Phase 3.
|
||||
|
||||
## Phases
|
||||
|
||||
- [x] Phase 1 — Declarative DAG + sequential execution (combinators + 18 tests, `flow/start`)
|
||||
- [x] Phase 2 — Control flow + error handling (branch, error model, try-catch, retry, timeout)
|
||||
- [x] Phase 3 — Suspend/resume (suspend/resume/cancel + crash recovery via deterministic replay)
|
||||
- [x] Phase 4 — Distributed nodes via fed-sx (remote-node, failover, replication + handoff)
|
||||
- [x] Phase 5 — Operational API + combinators (introspection, tap, recover, map-flow)
|
||||
- [ ] Phase 3 — Suspend / resume (the showcase)
|
||||
- [ ] Phase 4 — Distributed nodes via fed-sx
|
||||
61
lib/flow/spec.sx
Normal file
61
lib/flow/spec.sx
Normal file
@@ -0,0 +1,61 @@
|
||||
;; lib/flow/spec.sx — flow combinators as a Scheme prelude.
|
||||
;;
|
||||
;; A flow is a Scheme procedure of one argument: the upstream value.
|
||||
;; node : input -> output
|
||||
;; A leaf node ignoring its argument is effectively a thunk. Combinators
|
||||
;; build composite nodes out of child nodes. The whole flow runs INSIDE the
|
||||
;; Scheme interpreter.
|
||||
;;
|
||||
;; Phase 1 combinators (flow-combinators-src):
|
||||
;; flow-node / flow-id / flow-const / sequence / parallel / defflow
|
||||
;; defflow both binds the flow and registers it by name (flow-register!, in
|
||||
;; store.sx) so it can be re-resolved after a process restart.
|
||||
;; map-flow (Phase 5): run a node over each item of a list input, join results.
|
||||
;; flow-while / flow-until (Phase 5): bounded iteration — re-run body, threading
|
||||
;; the value, while/until pred holds, up to `max` steps (deterministic bound; no
|
||||
;; unbounded loops in pure SX).
|
||||
;;
|
||||
;; Phase 2 combinators (flow-control-src):
|
||||
;; branch / fail / failed? / fail-reason / try-catch / retry / timeout / tick
|
||||
;; tap (Phase 5): side-effecting pass-through (returns input unchanged).
|
||||
;; recover (Phase 5): the fail-VALUE counterpart of try-catch.
|
||||
;; attempt (Phase 6): railway sequence — thread nodes left-to-right but stop at
|
||||
;; the first node that returns a (fail ...) value, returning that failure.
|
||||
;;
|
||||
;; Phase 3 suspend core (flow-suspend-src):
|
||||
;; The guest Scheme's call/cc is ESCAPE-ONLY (re-invoking a captured k after it
|
||||
;; returns hangs the runtime), so suspend/resume CANNOT re-enter a continuation.
|
||||
;; Instead, durability uses DETERMINISTIC REPLAY: a flow re-runs from the start
|
||||
;; on each resume; suspend points that have already been resolved replay their
|
||||
;; logged value, and the first unresolved suspend escapes back to the driver.
|
||||
;; The entire persisted state is the replay log (plain (tag value) data), which
|
||||
;; survives process restart — no live continuation is ever serialized.
|
||||
;;
|
||||
;; (suspend tag) — if tag is in the replay log, return its value; else escape
|
||||
;; to the driver as (flow-suspended tag). tags must be unique & deterministic
|
||||
;; across replays. ALL effects/non-determinism must go through suspend so their
|
||||
;; results are logged (otherwise they re-run on every replay).
|
||||
;; (flow-drive flow input log) — run flow with the given replay log; returns
|
||||
;; (flow-done result) or (flow-suspended tag).
|
||||
|
||||
(define
|
||||
flow-combinators-src
|
||||
"(define (flow-node f) f)\n (define (flow-id input) input)\n (define (flow-const v) (lambda (input) v))\n (define (flow-seq-step ns v)\n (if (null? ns) v (flow-seq-step (cdr ns) ((car ns) v))))\n (define sequence (lambda ns (lambda (input) (flow-seq-step ns input))))\n (define parallel (lambda ns (lambda (input) (map (lambda (n) (n input)) ns))))\n (define (map-flow node) (lambda (items) (map node items)))\n (define (flow-while-step pred body input n)\n (if (<= n 0)\n input\n (if (pred input) (flow-while-step pred body (body input) (- n 1)) input)))\n (define (flow-while pred body max) (lambda (input) (flow-while-step pred body input max)))\n (define (flow-until-step pred body input n)\n (if (<= n 0)\n input\n (if (pred input) input (flow-until-step pred body (body input) (- n 1)))))\n (define (flow-until pred body max) (lambda (input) (flow-until-step pred body input max)))\n (define-syntax defflow\n (syntax-rules ()\n ((defflow nm body)\n (begin (define nm body) (flow-register! (quote nm) nm)))))")
|
||||
|
||||
(define
|
||||
flow-control-src
|
||||
"(define (branch pred then else)\n (lambda (input) (if (pred input) (then input) (else input))))\n (define (fail reason) (list (quote flow-fail) reason))\n (define (failed? x) (and (pair? x) (eq? (car x) (quote flow-fail))))\n (define (fail-reason x) (car (cdr x)))\n (define (recover node handler)\n (lambda (input)\n (let ((r (node input)))\n (if (failed? r) (handler (fail-reason r)) r))))\n (define (tap effect)\n (lambda (input) (begin (effect input) input)))\n (define (flow-attempt-step ns v)\n (if (failed? v)\n v\n (if (null? ns) v (flow-attempt-step (cdr ns) ((car ns) v)))))\n (define attempt (lambda ns (lambda (input) (flow-attempt-step ns input))))\n (define (try-catch node handler)\n (lambda (input) (guard (e (#t (handler e))) (node input))))\n (define (flow-retry-step n node input)\n (guard (e (#t (if (<= n 1) (raise e) (flow-retry-step (- n 1) node input))))\n (node input)))\n (define (retry n node) (lambda (input) (flow-retry-step n node input)))\n (define flow-timeout-budget -1)\n (define (tick)\n (if (< flow-timeout-budget 0)\n 0\n (begin\n (set! flow-timeout-budget (- flow-timeout-budget 1))\n (if (< flow-timeout-budget 0)\n (raise (quote flow-timeout))\n flow-timeout-budget))))\n (define (timeout budget node)\n (lambda (input)\n (let ((saved flow-timeout-budget))\n (set! flow-timeout-budget budget)\n (guard (e (#t (begin (set! flow-timeout-budget saved) (raise e))))\n (let ((result (node input)))\n (set! flow-timeout-budget saved)\n result)))))")
|
||||
|
||||
(define
|
||||
flow-suspend-src
|
||||
"(define flow-replay-log (list))\n (define flow-suspend-k #f)\n (define (flow-log-lookup tag log)\n (if (null? log)\n (list #f #f)\n (if (eq? (car (car log)) tag)\n (list #t (car (cdr (car log))))\n (flow-log-lookup tag (cdr log)))))\n (define (suspend tag)\n (let ((hit (flow-log-lookup tag flow-replay-log)))\n (if (car hit)\n (car (cdr hit))\n (flow-suspend-k (list (quote flow-suspended) tag)))))\n (define (flow-drive flow input log)\n (set! flow-replay-log log)\n (call/cc\n (lambda (k)\n (set! flow-suspend-k k)\n (list (quote flow-done) (flow input)))))")
|
||||
|
||||
(define
|
||||
flow-load-combinators!
|
||||
(fn
|
||||
(env)
|
||||
(begin
|
||||
(scheme-eval-program (scheme-parse-all flow-combinators-src) env)
|
||||
(scheme-eval-program (scheme-parse-all flow-control-src) env)
|
||||
(scheme-eval-program (scheme-parse-all flow-suspend-src) env)
|
||||
env)))
|
||||
45
lib/flow/store.sx
Normal file
45
lib/flow/store.sx
Normal file
File diff suppressed because one or more lines are too long
79
lib/flow/tests/api.sx
Normal file
79
lib/flow/tests/api.sx
Normal file
@@ -0,0 +1,79 @@
|
||||
;; lib/flow/tests/api.sx — Phase 5: operational introspection API.
|
||||
|
||||
(define flow-api-pass 0)
|
||||
(define flow-api-fail 0)
|
||||
(define flow-api-fails (list))
|
||||
|
||||
(define
|
||||
flow-api-test
|
||||
(fn
|
||||
(name actual expected)
|
||||
(if
|
||||
(= actual expected)
|
||||
(set! flow-api-pass (+ flow-api-pass 1))
|
||||
(begin
|
||||
(set! flow-api-fail (+ flow-api-fail 1))
|
||||
(append! flow-api-fails {:name name :expected expected :actual actual})))))
|
||||
|
||||
(define flow-a (fn (src) (flow-run src)))
|
||||
|
||||
;; ── flow/status ─────────────────────────────────────────────────
|
||||
(flow-api-test "status: unknown id" (flow-a "(flow/status 999)") "unknown")
|
||||
(flow-api-test
|
||||
"status: suspended flow"
|
||||
(flow-a
|
||||
"(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/status id)")
|
||||
"suspended")
|
||||
(flow-api-test
|
||||
"status: completed flow"
|
||||
(flow-a
|
||||
"(defflow w (sequence (lambda (x) (suspend (quote q))) (lambda (v) v))) (define id (car (cdr (flow/start w 0)))) (flow/resume id 5) (flow/status id)")
|
||||
"done")
|
||||
(flow-api-test
|
||||
"status: cancelled flow"
|
||||
(flow-a
|
||||
"(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/cancel id) (flow/status id)")
|
||||
"cancelled")
|
||||
|
||||
;; ── flow/result ─────────────────────────────────────────────────
|
||||
(flow-api-test
|
||||
"result: returns the value of a completed flow"
|
||||
(flow-a
|
||||
"(defflow w (sequence (lambda (x) (suspend (quote q))) (lambda (v) (list (quote got) v)))) (define id (car (cdr (flow/start w 0)))) (flow/resume id 9) (flow/result id)")
|
||||
(list "got" 9))
|
||||
(flow-api-test
|
||||
"result: a still-suspended flow has no result"
|
||||
(flow-a
|
||||
"(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/result id)")
|
||||
(list "flow-error" "not-done"))
|
||||
(flow-api-test
|
||||
"result: unknown id errors"
|
||||
(flow-a "(flow/result 999)")
|
||||
(list "flow-error" "no-such-flow"))
|
||||
|
||||
;; ── flow/list ───────────────────────────────────────────────────
|
||||
(flow-api-test "list: empty store" (flow-a "(flow/list)") (list))
|
||||
(flow-api-test
|
||||
"list: reports id + status for each flow (newest first)"
|
||||
(flow-a
|
||||
"(defflow w (lambda (x) (suspend (quote q)))) (flow/start w 0) (flow/start (lambda (x) (* x 2)) 5) (flow/list)")
|
||||
(list (list 2 "done") (list 1 "suspended")))
|
||||
|
||||
;; ── flow/pending ────────────────────────────────────────────────
|
||||
(flow-api-test
|
||||
"pending: lists suspended flows with their waiting tag"
|
||||
(flow-a
|
||||
"(defflow w (lambda (x) (suspend (quote review)))) (flow/start w 0) (flow/pending)")
|
||||
(list (list 1 "review")))
|
||||
(flow-api-test
|
||||
"pending: excludes completed and cancelled flows"
|
||||
(flow-a
|
||||
"(defflow w (lambda (x) (suspend (quote q)))) (defflow v (sequence (lambda (x) (suspend (quote r))) (lambda (y) y))) (define i1 (car (cdr (flow/start w 0)))) (define i2 (car (cdr (flow/start v 0)))) (define i3 (car (cdr (flow/start w 0)))) (flow/resume i2 1) (flow/cancel i3) (flow/pending)")
|
||||
(list (list 1 "q")))
|
||||
(flow-api-test
|
||||
"pending: operator can drain all pending flows"
|
||||
(flow-a
|
||||
"(defflow w (sequence (lambda (x) (suspend (quote q))) (lambda (v) (* v 10)))) (flow/start w 0) (flow/start w 0) (define ps (flow/pending)) (flow/resume (car (car ps)) 1) (flow/resume (car (car (cdr ps))) 2) (flow/list)")
|
||||
(list (list 1 "done") (list 2 "done")))
|
||||
|
||||
(define flow-api-tests-run! (fn () {:total (+ flow-api-pass flow-api-fail) :passed flow-api-pass :failed flow-api-fail :fails flow-api-fails}))
|
||||
121
lib/flow/tests/basic.sx
Normal file
121
lib/flow/tests/basic.sx
Normal file
@@ -0,0 +1,121 @@
|
||||
;; lib/flow/tests/basic.sx — Phase 1: declarative DAG + sequential execution.
|
||||
|
||||
(define flow-basic-pass 0)
|
||||
(define flow-basic-fail 0)
|
||||
(define flow-basic-fails (list))
|
||||
|
||||
(define
|
||||
flow-basic-test
|
||||
(fn
|
||||
(name actual expected)
|
||||
(if
|
||||
(= actual expected)
|
||||
(set! flow-basic-pass (+ flow-basic-pass 1))
|
||||
(begin
|
||||
(set! flow-basic-fail (+ flow-basic-fail 1))
|
||||
(append! flow-basic-fails {:name name :expected expected :actual actual})))))
|
||||
|
||||
;; Run a Scheme flow-program string and return its final value.
|
||||
(define flow-b (fn (src) (flow-run src)))
|
||||
|
||||
;; Scheme strings are boxed as {:scm-string "..."}; unwrap to a host string.
|
||||
(define flow-bs (fn (src) (get (flow-run src) :scm-string)))
|
||||
|
||||
;; ── single node ─────────────────────────────────────────────────
|
||||
(flow-basic-test
|
||||
"node: identity passes input through"
|
||||
(flow-b "(flow/start flow-id 7)")
|
||||
7)
|
||||
(flow-basic-test
|
||||
"node: const ignores input"
|
||||
(flow-b "(flow/start (flow-const 99) 1)")
|
||||
99)
|
||||
(flow-basic-test
|
||||
"node: bare lambda is a node"
|
||||
(flow-b "(flow/start (lambda (x) (* x x)) 6)")
|
||||
36)
|
||||
|
||||
;; ── linear sequence ─────────────────────────────────────────────
|
||||
(flow-basic-test
|
||||
"sequence: empty is identity"
|
||||
(flow-b "(flow/start (sequence) 42)")
|
||||
42)
|
||||
(flow-basic-test
|
||||
"sequence: single child"
|
||||
(flow-b "(flow/start (sequence (lambda (x) (+ x 1))) 41)")
|
||||
42)
|
||||
(flow-basic-test
|
||||
"sequence: two children thread"
|
||||
(flow-b
|
||||
"(flow/start (sequence (lambda (x) (+ x 1)) (lambda (x) (* x 10))) 4)")
|
||||
50)
|
||||
(flow-basic-test
|
||||
"sequence: three children thread"
|
||||
(flow-b
|
||||
"(flow/start (sequence (lambda (x) (+ x 1)) (lambda (x) (* x 2)) (lambda (x) (- x 3))) 5)")
|
||||
9)
|
||||
|
||||
;; ── data flow between nodes ─────────────────────────────────────
|
||||
(flow-basic-test
|
||||
"data flow: string accumulation"
|
||||
(flow-bs
|
||||
"(flow/start (sequence (lambda (s) (string-append s \"-a\")) (lambda (s) (string-append s \"-b\"))) \"x\")")
|
||||
"x-a-b")
|
||||
(flow-basic-test
|
||||
"data flow: list build"
|
||||
(flow-b
|
||||
"(flow/start (sequence (lambda (x) (cons x (list))) (lambda (xs) (cons 0 xs))) 7)")
|
||||
(list 0 7))
|
||||
|
||||
;; ── defflow ─────────────────────────────────────────────────────
|
||||
(flow-basic-test
|
||||
"defflow: names a flow"
|
||||
(flow-b
|
||||
"(defflow inc2 (sequence (lambda (x) (+ x 1)) (lambda (x) (+ x 1)))) (flow/start inc2 40)")
|
||||
42)
|
||||
(flow-basic-test
|
||||
"defflow: reusable"
|
||||
(flow-b
|
||||
"(defflow dbl (lambda (x) (* x 2))) (+ (flow/start dbl 3) (flow/start dbl 10))")
|
||||
26)
|
||||
|
||||
;; ── parallel (sequential semantics, join into list) ─────────────
|
||||
(flow-basic-test
|
||||
"parallel: fans input to all branches"
|
||||
(flow-b
|
||||
"(flow/start (parallel (lambda (x) (+ x 1)) (lambda (x) (* x 2)) (lambda (x) (- x 3))) 10)")
|
||||
(list 11 20 7))
|
||||
(flow-basic-test
|
||||
"parallel: empty joins to empty list"
|
||||
(flow-b "(flow/start (parallel) 5)")
|
||||
(list))
|
||||
(flow-basic-test
|
||||
"parallel: single branch"
|
||||
(flow-b "(flow/start (parallel (lambda (x) (* x x))) 9)")
|
||||
(list 81))
|
||||
|
||||
;; ── nested composition ──────────────────────────────────────────
|
||||
(flow-basic-test
|
||||
"nested: sequence of sequences"
|
||||
(flow-b
|
||||
"(flow/start (sequence (sequence (lambda (x) (+ x 1)) (lambda (x) (+ x 1))) (sequence (lambda (x) (* x 3)))) 0)")
|
||||
6)
|
||||
(flow-basic-test
|
||||
"nested: parallel inside sequence, join then reduce"
|
||||
(flow-b
|
||||
"(flow/start (sequence (parallel (lambda (x) (+ x 1)) (lambda (x) (* x 2))) (lambda (xs) (apply + xs))) 10)")
|
||||
31)
|
||||
(flow-basic-test
|
||||
"nested: sequence inside parallel branch"
|
||||
(flow-b
|
||||
"(flow/start (parallel (sequence (lambda (x) (+ x 1)) (lambda (x) (* x 2))) (lambda (x) x)) 5)")
|
||||
(list 12 5))
|
||||
|
||||
;; ── publish-shaped flow (the architecture sketch) ───────────────
|
||||
(flow-basic-test
|
||||
"publish: write -> (review | spell) -> join lengths"
|
||||
(flow-b
|
||||
"(defflow publish (sequence (lambda (draft) (string-append draft \"!\")) (parallel (lambda (c) (string-length c)) (lambda (c) (string-length (string-append c \"?\")))))) (flow/start publish \"hi\")")
|
||||
(list 3 4))
|
||||
|
||||
(define flow-basic-tests-run! (fn () {:total (+ flow-basic-pass flow-basic-fail) :passed flow-basic-pass :failed flow-basic-fail :fails flow-basic-fails}))
|
||||
108
lib/flow/tests/combinators.sx
Normal file
108
lib/flow/tests/combinators.sx
Normal file
@@ -0,0 +1,108 @@
|
||||
;; lib/flow/tests/combinators.sx — Phase 5: combinator library (tap, recover, map-flow, iteration).
|
||||
|
||||
(define flow-cmb-pass 0)
|
||||
(define flow-cmb-fail 0)
|
||||
(define flow-cmb-fails (list))
|
||||
|
||||
(define
|
||||
flow-cmb-test
|
||||
(fn
|
||||
(name actual expected)
|
||||
(if
|
||||
(= actual expected)
|
||||
(set! flow-cmb-pass (+ flow-cmb-pass 1))
|
||||
(begin
|
||||
(set! flow-cmb-fail (+ flow-cmb-fail 1))
|
||||
(append! flow-cmb-fails {:name name :expected expected :actual actual})))))
|
||||
|
||||
(define flow-m (fn (src) (flow-run src)))
|
||||
|
||||
;; ── tap (side-effecting pass-through) ───────────────────────────
|
||||
(flow-cmb-test
|
||||
"tap: returns input unchanged"
|
||||
(flow-m "(flow/start (tap (lambda (x) (* x 999))) 7)")
|
||||
7)
|
||||
(flow-cmb-test
|
||||
"tap: runs the side effect"
|
||||
(flow-m
|
||||
"(define seen 0) (flow/start (tap (lambda (x) (set! seen x))) 42) seen")
|
||||
42)
|
||||
(flow-cmb-test
|
||||
"tap: value flows on while the effect observes it"
|
||||
(flow-m
|
||||
"(define log 0) (flow/start (sequence (lambda (x) (+ x 1)) (tap (lambda (x) (set! log x))) (lambda (x) (* x 2))) 10) (list log (flow/result 1))")
|
||||
(list 11 22))
|
||||
|
||||
;; ── recover (fail-value counterpart of try-catch) ───────────────
|
||||
(flow-cmb-test
|
||||
"recover: passes a non-fail value through"
|
||||
(flow-m "(flow/start (recover (lambda (x) (* x 2)) (lambda (r) -1)) 5)")
|
||||
10)
|
||||
(flow-cmb-test
|
||||
"recover: handles a fail value via the reason"
|
||||
(flow-m
|
||||
"(flow/start (recover (lambda (x) (fail (quote too-small))) (lambda (r) (list (quote recovered) r))) 1)")
|
||||
(list "recovered" "too-small"))
|
||||
(flow-cmb-test
|
||||
"recover: handler can supply a default value"
|
||||
(flow-m
|
||||
"(flow/start (sequence (recover (lambda (x) (if (> x 0) x (fail (quote neg))) ) (flow-const 0)) (lambda (x) (* x 10))) -3)")
|
||||
0)
|
||||
(flow-cmb-test
|
||||
"recover: does not catch raised exceptions (those are try-catch's job)"
|
||||
(flow-m
|
||||
"(flow/start (try-catch (recover (lambda (x) (raise (quote boom))) (flow-const 0)) (lambda (e) e)) 1)")
|
||||
"boom")
|
||||
|
||||
;; ── map-flow (run a node over a list, join) ─────────────────────
|
||||
(flow-cmb-test
|
||||
"map-flow: applies the node to each item"
|
||||
(flow-m "(flow/start (map-flow (lambda (x) (* x x))) (list 1 2 3 4))")
|
||||
(list 1 4 9 16))
|
||||
(flow-cmb-test
|
||||
"map-flow: empty list joins to empty"
|
||||
(flow-m "(flow/start (map-flow (lambda (x) (+ x 1))) (list))")
|
||||
(list))
|
||||
(flow-cmb-test
|
||||
"map-flow: each item runs an independent sub-flow"
|
||||
(flow-m
|
||||
"(flow/start (map-flow (sequence (lambda (x) (+ x 1)) (lambda (x) (* x 2)))) (list 0 4 9))")
|
||||
(list 2 10 20))
|
||||
(flow-cmb-test
|
||||
"map-flow: composes — fan over a list then reduce the join"
|
||||
(flow-m
|
||||
"(flow/start (sequence (map-flow (lambda (x) (* x 10))) (lambda (xs) (apply + xs))) (list 1 2 3))")
|
||||
60)
|
||||
|
||||
;; ── flow-while / flow-until (bounded iteration) ─────────────────
|
||||
(flow-cmb-test
|
||||
"flow-while: iterates while the predicate holds"
|
||||
(flow-m
|
||||
"(flow/start (flow-while (lambda (x) (< x 10)) (lambda (x) (+ x 1)) 100) 0)")
|
||||
10)
|
||||
(flow-cmb-test
|
||||
"flow-while: a false predicate leaves input unchanged"
|
||||
(flow-m
|
||||
"(flow/start (flow-while (lambda (x) (< x 0)) (lambda (x) (+ x 1)) 100) 5)")
|
||||
5)
|
||||
(flow-cmb-test
|
||||
"flow-while: respects the max-iteration bound"
|
||||
(flow-m "(flow/start (flow-while (lambda (x) #t) (lambda (x) (+ x 1)) 3) 0)")
|
||||
3)
|
||||
(flow-cmb-test
|
||||
"flow-while: doubles until past a threshold"
|
||||
(flow-m
|
||||
"(flow/start (flow-while (lambda (x) (< x 50)) (lambda (x) (* x 2)) 100) 3)")
|
||||
96)
|
||||
(flow-cmb-test
|
||||
"flow-until: iterates until the predicate becomes true"
|
||||
(flow-m
|
||||
"(flow/start (flow-until (lambda (x) (>= x 10)) (lambda (x) (+ x 3)) 100) 0)")
|
||||
12)
|
||||
(flow-cmb-test
|
||||
"flow-until: composes inside a sequence"
|
||||
(flow-m
|
||||
"(flow/start (sequence (flow-until (lambda (x) (> x 100)) (lambda (x) (* x 3)) 100) (lambda (x) (- x 100))) 5)")
|
||||
35)
|
||||
|
||||
(define flow-cmb-tests-run! (fn () {:total (+ flow-cmb-pass flow-cmb-fail) :passed flow-cmb-pass :failed flow-cmb-fail :fails flow-cmb-fails}))
|
||||
179
lib/flow/tests/control.sx
Normal file
179
lib/flow/tests/control.sx
Normal file
@@ -0,0 +1,179 @@
|
||||
;; lib/flow/tests/control.sx — Phase 2: control flow + error handling.
|
||||
|
||||
(define flow-ctl-pass 0)
|
||||
(define flow-ctl-fail 0)
|
||||
(define flow-ctl-fails (list))
|
||||
|
||||
(define
|
||||
flow-ctl-test
|
||||
(fn
|
||||
(name actual expected)
|
||||
(if
|
||||
(= actual expected)
|
||||
(set! flow-ctl-pass (+ flow-ctl-pass 1))
|
||||
(begin
|
||||
(set! flow-ctl-fail (+ flow-ctl-fail 1))
|
||||
(append! flow-ctl-fails {:name name :expected expected :actual actual})))))
|
||||
|
||||
(define flow-c (fn (src) (flow-run src)))
|
||||
(define flow-cs (fn (src) (get (flow-run src) :scm-string)))
|
||||
|
||||
;; ── branch ──────────────────────────────────────────────────────
|
||||
(flow-ctl-test
|
||||
"branch: true selects then"
|
||||
(flow-c
|
||||
"(flow/start (branch (lambda (x) (> x 0)) (lambda (x) (* x 100)) (lambda (x) (- 0 x))) 5)")
|
||||
500)
|
||||
(flow-ctl-test
|
||||
"branch: false selects else"
|
||||
(flow-c
|
||||
"(flow/start (branch (lambda (x) (> x 0)) (lambda (x) (* x 100)) (lambda (x) (- 0 x))) -3)")
|
||||
3)
|
||||
(flow-ctl-test
|
||||
"branch: predicate sees the threaded input"
|
||||
(flow-c
|
||||
"(flow/start (sequence (lambda (x) (+ x 1)) (branch (lambda (x) (> x 3)) (flow-const 100) (flow-const 0))) 3)")
|
||||
100)
|
||||
(flow-ctl-test
|
||||
"branch: branches are full nodes (sequence inside)"
|
||||
(flow-c
|
||||
"(flow/start (branch (lambda (x) (< x 10)) (sequence (lambda (x) (+ x 1)) (lambda (x) (* x 2))) (flow-const 0)) 4)")
|
||||
10)
|
||||
(flow-ctl-test
|
||||
"branch: nested branch (3-way sign)"
|
||||
(flow-c
|
||||
"(defflow sign (branch (lambda (x) (> x 0)) (flow-const 1) (branch (lambda (x) (< x 0)) (flow-const -1) (flow-const 0)))) (list (flow/start sign 7) (flow/start sign -7) (flow/start sign 0))")
|
||||
(list 1 -1 0))
|
||||
(flow-ctl-test
|
||||
"branch: publish-shaped approval gate"
|
||||
(flow-cs
|
||||
"(defflow publish (branch (lambda (post) (>= (string-length post) 3)) (lambda (post) (string-append post \" [published]\")) (lambda (post) (string-append post \" [rejected]\")))) (flow/start publish \"ok\")")
|
||||
"ok [rejected]")
|
||||
|
||||
;; ── error model — explicit (fail reason) values ─────────────────
|
||||
(flow-ctl-test
|
||||
"fail: failed? is true for a failure value"
|
||||
(flow-c "(failed? (fail 404))")
|
||||
true)
|
||||
(flow-ctl-test
|
||||
"fail: fail-reason extracts the reason"
|
||||
(flow-c "(fail-reason (fail 404))")
|
||||
404)
|
||||
(flow-ctl-test
|
||||
"fail: failed? is false for a plain value"
|
||||
(flow-c "(failed? 7)")
|
||||
false)
|
||||
(flow-ctl-test
|
||||
"fail: failed? is false for an ordinary list"
|
||||
(flow-c "(failed? (list 1 2 3))")
|
||||
false)
|
||||
(flow-ctl-test
|
||||
"fail: a node may emit a failure as data"
|
||||
(flow-c
|
||||
"(defflow validate (lambda (s) (if (>= (string-length s) 3) s (fail (quote too-short))))) (failed? (flow/start validate \"hi\"))")
|
||||
true)
|
||||
(flow-ctl-test
|
||||
"fail: failure flows downstream, branch recovers"
|
||||
(flow-c
|
||||
"(defflow guarded (sequence (lambda (s) (if (>= (string-length s) 3) (string-length s) (fail (quote too-short)))) (branch failed? (lambda (f) (list (quote recovered) (fail-reason f))) (lambda (n) (list (quote ok) n))))) (flow/start guarded \"hi\")")
|
||||
(list "recovered" "too-short"))
|
||||
|
||||
;; ── try-catch — reify raised exceptions ─────────────────────────
|
||||
(flow-ctl-test
|
||||
"try-catch: no exception returns node result"
|
||||
(flow-c "(flow/start (try-catch (lambda (x) (* x 2)) (lambda (e) -1)) 5)")
|
||||
10)
|
||||
(flow-ctl-test
|
||||
"try-catch: handler runs on raise"
|
||||
(flow-c
|
||||
"(flow/start (try-catch (lambda (x) (raise (quote boom))) (flow-const 99)) 1)")
|
||||
99)
|
||||
(flow-ctl-test
|
||||
"try-catch: handler receives the reified error"
|
||||
(flow-c "(flow/start (try-catch (lambda (x) (raise 42)) (lambda (e) e)) 0)")
|
||||
42)
|
||||
(flow-ctl-test
|
||||
"try-catch: catches exception from deep inside a sequence"
|
||||
(flow-c
|
||||
"(flow/start (try-catch (sequence (lambda (x) (+ x 1)) (lambda (x) (raise (quote deep)))) (flow-const -99)) 5)")
|
||||
-99)
|
||||
(flow-ctl-test
|
||||
"try-catch: handler may convert to a failure value"
|
||||
(flow-c
|
||||
"(failed? (flow/start (try-catch (lambda (x) (raise (quote bad))) (lambda (e) (fail e))) 0))")
|
||||
true)
|
||||
(flow-ctl-test
|
||||
"try-catch: composes — recover then continue"
|
||||
(flow-c
|
||||
"(flow/start (sequence (try-catch (lambda (x) (raise (quote x))) (flow-const 10)) (lambda (n) (* n 5))) 0)")
|
||||
50)
|
||||
|
||||
;; ── retry — re-run on raised exceptions ─────────────────────────
|
||||
(flow-ctl-test
|
||||
"retry: succeeds after transient failures"
|
||||
(flow-c
|
||||
"(define ctr 0) (defflow flaky (lambda (x) (set! ctr (+ ctr 1)) (if (< ctr 3) (raise (quote nope)) (* x 10)))) (list (flow/start (retry 5 flaky) 7) ctr)")
|
||||
(list 70 3))
|
||||
(flow-ctl-test
|
||||
"retry: exhausted re-raises (caught by try-catch)"
|
||||
(flow-c
|
||||
"(flow/start (try-catch (retry 2 (lambda (x) (raise (quote always)))) (flow-const (quote gaveup))) 0)")
|
||||
"gaveup")
|
||||
(flow-ctl-test
|
||||
"retry: n=1 means a single attempt"
|
||||
(flow-c
|
||||
"(define ctr 0) (flow/start (try-catch (retry 1 (lambda (x) (set! ctr (+ ctr 1)) (raise (quote bad)))) (lambda (e) ctr)) 0)")
|
||||
1)
|
||||
(flow-ctl-test
|
||||
"retry: success on first attempt does not re-run"
|
||||
(flow-c
|
||||
"(define ctr 0) (flow/start (sequence (retry 5 (lambda (x) (set! ctr (+ ctr 1)) (* x 2))) (lambda (n) ctr)) 21)")
|
||||
1)
|
||||
(flow-ctl-test
|
||||
"retry: does not retry explicit failure values"
|
||||
(flow-c
|
||||
"(define ctr 0) (failed? (flow/start (retry 5 (lambda (x) (set! ctr (+ ctr 1)) (fail (quote bad)))) 0))")
|
||||
true)
|
||||
(flow-ctl-test
|
||||
"retry: failure-value path runs node exactly once"
|
||||
(flow-c
|
||||
"(define ctr 0) (flow/start (sequence (retry 5 (lambda (x) (set! ctr (+ ctr 1)) (fail (quote bad)))) (lambda (f) ctr)) 0)")
|
||||
1)
|
||||
|
||||
;; ── timeout — cooperative step budget ───────────────────────────
|
||||
(flow-ctl-test
|
||||
"timeout: work within budget completes"
|
||||
(flow-c
|
||||
"(define (cd n) (if (<= n 0) 99 (begin (tick) (cd (- n 1))))) (flow/start (try-catch (timeout 10 (lambda (x) (cd x))) (flow-const (quote timed-out))) 5)")
|
||||
99)
|
||||
(flow-ctl-test
|
||||
"timeout: work exceeding budget raises flow-timeout"
|
||||
(flow-c
|
||||
"(define (cd n) (if (<= n 0) 99 (begin (tick) (cd (- n 1))))) (flow/start (try-catch (timeout 10 (lambda (x) (cd x))) (flow-const (quote timed-out))) 20)")
|
||||
"timed-out")
|
||||
(flow-ctl-test
|
||||
"timeout: exact budget boundary completes"
|
||||
(flow-c
|
||||
"(define (cd n) (if (<= n 0) 99 (begin (tick) (cd (- n 1))))) (flow/start (try-catch (timeout 5 (lambda (x) (cd x))) (flow-const (quote timed-out))) 5)")
|
||||
99)
|
||||
(flow-ctl-test
|
||||
"timeout: one tick over the budget raises"
|
||||
(flow-c
|
||||
"(define (cd n) (if (<= n 0) 99 (begin (tick) (cd (- n 1))))) (flow/start (try-catch (timeout 5 (lambda (x) (cd x))) (flow-const (quote timed-out))) 6)")
|
||||
"timed-out")
|
||||
(flow-ctl-test
|
||||
"timeout: the raised error is identifiable"
|
||||
(flow-c
|
||||
"(define (cd n) (if (<= n 0) 99 (begin (tick) (cd (- n 1))))) (flow/start (try-catch (timeout 2 (lambda (x) (cd x))) (lambda (e) e)) 9)")
|
||||
"flow-timeout")
|
||||
(flow-ctl-test
|
||||
"timeout: a node that never ticks is unbounded"
|
||||
(flow-c "(flow/start (timeout 0 (lambda (x) (* x 2))) 5)")
|
||||
10)
|
||||
(flow-ctl-test
|
||||
"timeout: budget is restored across sequential timeouts"
|
||||
(flow-c
|
||||
"(define (cd n) (if (<= n 0) 1 (begin (tick) (cd (- n 1))))) (flow/start (sequence (timeout 4 (lambda (x) (cd x))) (timeout 4 (lambda (x) (cd 3))) (lambda (x) (begin (tick) (+ x 100)))) 3)")
|
||||
101)
|
||||
|
||||
(define flow-ctl-tests-run! (fn () {:total (+ flow-ctl-pass flow-ctl-fail) :passed flow-ctl-pass :failed flow-ctl-fail :fails flow-ctl-fails}))
|
||||
120
lib/flow/tests/distributed.sx
Normal file
120
lib/flow/tests/distributed.sx
Normal file
@@ -0,0 +1,120 @@
|
||||
;; lib/flow/tests/distributed.sx — Phase 4: distributed nodes via fed-sx (mocked).
|
||||
|
||||
(define flow-dist-pass 0)
|
||||
(define flow-dist-fail 0)
|
||||
(define flow-dist-fails (list))
|
||||
|
||||
(define
|
||||
flow-dist-test
|
||||
(fn
|
||||
(name actual expected)
|
||||
(if
|
||||
(= actual expected)
|
||||
(set! flow-dist-pass (+ flow-dist-pass 1))
|
||||
(begin
|
||||
(set! flow-dist-fail (+ flow-dist-fail 1))
|
||||
(append! flow-dist-fails {:name name :expected expected :actual actual})))))
|
||||
|
||||
(define flow-d (fn (src) (flow-run src)))
|
||||
|
||||
;; ── remote-node ─────────────────────────────────────────────────
|
||||
(flow-dist-test
|
||||
"remote: a node executes on a peer"
|
||||
(flow-d
|
||||
"(flow-peer-register! (quote edge) (list (list (quote double) (lambda (x) (* x 2))))) (flow/start (remote-node (quote edge) (quote double)) 21)")
|
||||
42)
|
||||
(flow-dist-test
|
||||
"remote: remote nodes compose in a sequence"
|
||||
(flow-d
|
||||
"(flow-peer-register! (quote edge) (list (list (quote inc) (lambda (x) (+ x 1))) (list (quote double) (lambda (x) (* x 2))))) (flow/start (sequence (remote-node (quote edge) (quote inc)) (remote-node (quote edge) (quote double))) 4)")
|
||||
10)
|
||||
(flow-dist-test
|
||||
"remote: a remote node mixes with local nodes"
|
||||
(flow-d
|
||||
"(flow-peer-register! (quote edge) (list (list (quote double) (lambda (x) (* x 2))))) (flow/start (sequence (lambda (x) (+ x 5)) (remote-node (quote edge) (quote double)) (lambda (x) (- x 1))) 10)")
|
||||
29)
|
||||
(flow-dist-test
|
||||
"remote: unreachable peer raises flow-remote-unreachable"
|
||||
(flow-d
|
||||
"(flow/start (try-catch (remote-node (quote ghost) (quote double)) (lambda (e) e)) 1)")
|
||||
"flow-remote-unreachable")
|
||||
(flow-dist-test
|
||||
"remote: unknown function on a peer raises flow-remote-no-fn"
|
||||
(flow-d
|
||||
"(flow-peer-register! (quote edge) (list (list (quote double) (lambda (x) (* x 2))))) (flow/start (try-catch (remote-node (quote edge) (quote missing)) (lambda (e) e)) 1)")
|
||||
"flow-remote-no-fn")
|
||||
(flow-dist-test
|
||||
"remote: a remote node can suspend the flow (peer returns control)"
|
||||
(flow-d
|
||||
"(flow-peer-register! (quote edge) (list (list (quote review) (lambda (x) x)))) (flow/start (sequence (remote-node (quote edge) (quote review)) (lambda (x) (suspend (quote human))) (lambda (v) (list (quote published) v))) 7)")
|
||||
(list "flow-suspended" 1 "human"))
|
||||
(flow-dist-test
|
||||
"remote: a transient remote failure is recoverable with retry"
|
||||
(flow-d
|
||||
"(define hits 0) (flow-peer-register! (quote edge) (list (list (quote flaky) (lambda (x) (begin (set! hits (+ hits 1)) (if (< hits 2) (raise (quote down)) (* x 3))))))) (list (flow/start (retry 3 (remote-node (quote edge) (quote flaky))) 7) hits)")
|
||||
(list 21 2))
|
||||
|
||||
;; ── failover (retry on a different peer, fall through to local) ──
|
||||
(flow-dist-test
|
||||
"failover: first reachable peer serves the request"
|
||||
(flow-d
|
||||
"(flow-peer-register! (quote p2) (list (list (quote f) (lambda (x) (+ x 100))))) (flow/start (remote-failover (list (quote p2) (quote down)) (quote f) (flow-const (quote local))) 5)")
|
||||
105)
|
||||
(flow-dist-test
|
||||
"failover: skips an unreachable peer to the next one"
|
||||
(flow-d
|
||||
"(flow-peer-register! (quote p2) (list (list (quote f) (lambda (x) (+ x 100))))) (flow/start (remote-failover (list (quote down) (quote p2)) (quote f) (flow-const (quote local))) 5)")
|
||||
105)
|
||||
(flow-dist-test
|
||||
"failover: skips a peer whose function raises"
|
||||
(flow-d
|
||||
"(flow-peer-register! (quote bad) (list (list (quote f) (lambda (x) (raise (quote boom)))))) (flow-peer-register! (quote good) (list (list (quote f) (lambda (x) (* x 10))))) (flow/start (remote-failover (list (quote bad) (quote good)) (quote f) (flow-const 0)) 4)")
|
||||
40)
|
||||
(flow-dist-test
|
||||
"failover: all peers fail, the local fallback runs"
|
||||
(flow-d
|
||||
"(flow/start (remote-failover (list (quote down1) (quote down2)) (quote f) (lambda (x) (* x -1))) 9)")
|
||||
-9)
|
||||
(flow-dist-test
|
||||
"failover: threads the input through to the chosen peer"
|
||||
(flow-d
|
||||
"(flow-peer-register! (quote p) (list (list (quote f) (lambda (x) (list (quote got) x))))) (flow/start (sequence (lambda (x) (+ x 1)) (remote-failover (list (quote p)) (quote f) (flow-const 0))) 41)")
|
||||
(list "got" 42))
|
||||
(flow-dist-test
|
||||
"failover: composes inside a larger sequence"
|
||||
(flow-d
|
||||
"(flow-peer-register! (quote p) (list (list (quote f) (lambda (x) (* x 2))))) (flow/start (sequence (remote-failover (list (quote down) (quote p)) (quote f) (flow-const 1)) (lambda (x) (+ x 3))) 5)")
|
||||
13)
|
||||
|
||||
;; ── replication + handoff ───────────────────────────────────────
|
||||
(flow-dist-test
|
||||
"replicate: a peer holds the exported store"
|
||||
(flow-d
|
||||
"(defflow w (lambda (x) (suspend (quote q)))) (flow/start w 10) (flow-replicate-to (quote peerB)) (if (flow-replica-get (quote peerB)) (quote replicated) (quote missing))")
|
||||
"replicated")
|
||||
(flow-dist-test
|
||||
"handoff: a peer resumes a flow after the local instance dies"
|
||||
(flow-d
|
||||
"(defflow w (sequence (lambda (x) (suspend (quote q))) (lambda (v) (list (quote done) v)))) (define id (car (cdr (flow/start w 10)))) (flow-replicate-to (quote peerB)) (set! flow-store (list)) (flow-restore-from (quote peerB)) (flow/resume id 55)")
|
||||
(list "done" 55))
|
||||
(flow-dist-test
|
||||
"handoff: restored peer reports the flow as resumable"
|
||||
(flow-d
|
||||
"(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 10)))) (flow-replicate-to (quote peerB)) (set! flow-store (list)) (flow-restore-from (quote peerB)) (flow-resumable-ids)")
|
||||
(list 1))
|
||||
(flow-dist-test
|
||||
"handoff: without restore the dead instance has lost the flow"
|
||||
(flow-d
|
||||
"(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 10)))) (flow-replicate-to (quote peerB)) (set! flow-store (list)) (flow/resume id 1)")
|
||||
(list "flow-error" "no-such-flow"))
|
||||
(flow-dist-test
|
||||
"restore: from an unknown peer yields false"
|
||||
(flow-d "(flow-restore-from (quote nowhere))")
|
||||
false)
|
||||
(flow-dist-test
|
||||
"handoff: replication preserves the replay log across the move"
|
||||
(flow-d
|
||||
"(defflow two (sequence (lambda (x) (suspend (quote a))) (lambda (x) (suspend (quote b))) (lambda (x) (list x)))) (define id (car (cdr (flow/start two 0)))) (flow/resume id 11) (flow-replicate-to (quote peerB)) (set! flow-store (list)) (flow-restore-from (quote peerB)) (flow/resume id 22)")
|
||||
(list 22))
|
||||
|
||||
(define flow-dist-tests-run! (fn () {:total (+ flow-dist-pass flow-dist-fail) :passed flow-dist-pass :failed flow-dist-fail :fails flow-dist-fails}))
|
||||
106
lib/flow/tests/host.sx
Normal file
106
lib/flow/tests/host.sx
Normal file
@@ -0,0 +1,106 @@
|
||||
;; lib/flow/tests/host.sx — Phase 8: host integration ABI (request/await/host-queue/driver).
|
||||
|
||||
(define flow-hst-pass 0)
|
||||
(define flow-hst-fail 0)
|
||||
(define flow-hst-fails (list))
|
||||
|
||||
(define
|
||||
flow-hst-test
|
||||
(fn
|
||||
(name actual expected)
|
||||
(if
|
||||
(= actual expected)
|
||||
(set! flow-hst-pass (+ flow-hst-pass 1))
|
||||
(begin
|
||||
(set! flow-hst-fail (+ flow-hst-fail 1))
|
||||
(append! flow-hst-fails {:name name :expected expected :actual actual})))))
|
||||
|
||||
(define flow-hst (fn (src) (flow-run src)))
|
||||
|
||||
;; ── request envelope ────────────────────────────────────────────
|
||||
(flow-hst-test
|
||||
"request: suspends with a typed envelope"
|
||||
(flow-hst
|
||||
"(car (cdr (cdr (flow/start (lambda (x) (request (quote render) x)) 5))))")
|
||||
(list "flow-request" "render" 5))
|
||||
(flow-hst-test
|
||||
"request?: recognizes an envelope"
|
||||
(flow-hst "(request? (list (quote flow-request) (quote human) 1))")
|
||||
true)
|
||||
(flow-hst-test
|
||||
"request?: a plain tag is not a request"
|
||||
(flow-hst "(request? (list (quote review) 1))")
|
||||
false)
|
||||
(flow-hst-test
|
||||
"request-kind / request-payload: parse the envelope"
|
||||
(flow-hst
|
||||
"(define t (list (quote flow-request) (quote render) (list (quote recipe) 7))) (list (request-kind t) (request-payload t))")
|
||||
(list "render" (list "recipe" 7)))
|
||||
|
||||
;; ── named decision points ───────────────────────────────────────
|
||||
(flow-hst-test
|
||||
"await-human: is a request of kind human"
|
||||
(flow-hst
|
||||
"(car (cdr (cdr (flow/start (lambda (x) (await-human x)) (quote approve?)))))")
|
||||
(list "flow-request" "human" "approve?"))
|
||||
(flow-hst-test
|
||||
"await-render: is a request of kind render"
|
||||
(flow-hst
|
||||
"(car (cdr (cdr (flow/start (lambda (x) (await-render x)) (quote recipe)))))")
|
||||
(list "flow-request" "render" "recipe"))
|
||||
(flow-hst-test
|
||||
"request: the host's resume value flows back into the flow"
|
||||
(flow-hst
|
||||
"(defflow f (sequence (lambda (x) (await-render x)) (lambda (art) (list (quote got) art)))) (define id (car (cdr (flow/start f 1)))) (flow/resume id (quote the-artifact))")
|
||||
(list "got" "the-artifact"))
|
||||
|
||||
;; ── host work queue ─────────────────────────────────────────────
|
||||
(flow-hst-test
|
||||
"flow-host-requests: lists (id kind payload) for pending requests"
|
||||
(flow-hst
|
||||
"(flow/start (lambda (x) (await-render x)) 99) (flow-host-requests)")
|
||||
(list (list 1 "render" 99)))
|
||||
(flow-hst-test
|
||||
"flow-host-requests: excludes bare (non-request) suspends"
|
||||
(flow-hst
|
||||
"(defflow a (lambda (x) (await-render x))) (defflow b (lambda (x) (suspend (quote plain)))) (flow/start a 1) (flow/start b 2) (flow-host-requests)")
|
||||
(list (list 1 "render" 1)))
|
||||
|
||||
;; ── the art-dag-shaped host driver loop (manual resumes) ────────
|
||||
(flow-hst-test
|
||||
"host driver: render then human-review then publish"
|
||||
(flow-hst
|
||||
"(defflow pipeline (sequence (lambda (recipe) (await-render recipe)) (lambda (art) (await-human (list (quote review) art))) (branch (lambda (d) (eq? d (quote approve))) (flow-const (quote published)) (flow-const (fail (quote rejected)))))) (define id (car (cdr (flow/start pipeline 99)))) (define r1 (flow-host-requests)) (flow/resume id (list (quote art) 99)) (define r2 (flow-host-requests)) (flow/resume id (quote approve)) (list r1 r2 (flow/status id) (flow/result id))")
|
||||
(list
|
||||
(list (list 1 "render" 99))
|
||||
(list (list 1 "human" (list "review" (list "art" 99))))
|
||||
"done"
|
||||
"published"))
|
||||
(flow-hst-test
|
||||
"host driver: rejection at the human gate yields a failure"
|
||||
(flow-hst
|
||||
"(defflow pipeline (sequence (lambda (recipe) (await-render recipe)) (lambda (art) (await-human (list (quote review) art))) (branch (lambda (d) (eq? d (quote approve))) (flow-const (quote published)) (flow-const (fail (quote rejected)))))) (define id (car (cdr (flow/start pipeline 1)))) (flow/resume id (quote artifact)) (failed? (flow/resume id (quote reject)))")
|
||||
true)
|
||||
|
||||
;; ── reference driver: host supplies only a dispatch fn ──────────
|
||||
(flow-hst-test
|
||||
"flow-drive-host: one tick services every pending request"
|
||||
(flow-hst
|
||||
"(flow/start (lambda (x) (await-render x)) 5) (define n (flow-drive-host (lambda (k p) (list (quote done) p)))) (list n (flow/status 1) (flow/result 1))")
|
||||
(list 1 "done" (list "done" 5)))
|
||||
(flow-hst-test
|
||||
"flow-run-host: drives a render -> human pipeline to completion"
|
||||
(flow-hst
|
||||
"(defflow pipeline (sequence (lambda (recipe) (await-render recipe)) (lambda (art) (await-human (list (quote review) art))) (branch (lambda (d) (eq? d (quote approve))) (flow-const (quote published)) (flow-const (fail (quote rejected)))))) (define id (car (cdr (flow/start pipeline 99)))) (define serviced (flow-run-host (lambda (kind payload) (if (eq? kind (quote render)) (list (quote art) payload) (quote approve))) 10)) (list serviced (flow/status id) (flow/result id))")
|
||||
(list 2 "done" "published"))
|
||||
(flow-hst-test
|
||||
"flow-run-host: returns 0 when nothing is pending"
|
||||
(flow-hst "(flow-run-host (lambda (k p) p) 5)")
|
||||
0)
|
||||
(flow-hst-test
|
||||
"flow-run-host: respects the maxticks bound"
|
||||
(flow-hst
|
||||
"(defflow pipe2 (sequence (lambda (r) (await-render r)) (lambda (a) (await-human a)) (lambda (d) d))) (define id (car (cdr (flow/start pipe2 1)))) (define serviced (flow-run-host (lambda (k p) p) 1)) (list serviced (flow/status id))")
|
||||
(list 1 "suspended"))
|
||||
|
||||
(define flow-hst-tests-run! (fn () {:total (+ flow-hst-pass flow-hst-fail) :passed flow-hst-pass :failed flow-hst-fail :fails flow-hst-fails}))
|
||||
67
lib/flow/tests/hygiene.sx
Normal file
67
lib/flow/tests/hygiene.sx
Normal file
@@ -0,0 +1,67 @@
|
||||
;; lib/flow/tests/hygiene.sx — Phase 5: store hygiene (flow/gc, flow/forget).
|
||||
|
||||
(define flow-hyg-pass 0)
|
||||
(define flow-hyg-fail 0)
|
||||
(define flow-hyg-fails (list))
|
||||
|
||||
(define
|
||||
flow-hyg-test
|
||||
(fn
|
||||
(name actual expected)
|
||||
(if
|
||||
(= actual expected)
|
||||
(set! flow-hyg-pass (+ flow-hyg-pass 1))
|
||||
(begin
|
||||
(set! flow-hyg-fail (+ flow-hyg-fail 1))
|
||||
(append! flow-hyg-fails {:name name :expected expected :actual actual})))))
|
||||
|
||||
(define flow-h (fn (src) (flow-run src)))
|
||||
|
||||
;; ── flow/gc ─────────────────────────────────────────────────────
|
||||
(flow-hyg-test
|
||||
"gc: empty store removes nothing"
|
||||
(flow-h "(flow/gc)")
|
||||
0)
|
||||
(flow-hyg-test
|
||||
"gc: removes a done flow, keeps a suspended one"
|
||||
(flow-h
|
||||
"(defflow w (lambda (x) (suspend (quote q)))) (flow/start w 0) (flow/start (lambda (x) x) 5) (define removed (flow/gc)) (list removed (flow/list))")
|
||||
(list 1 (list (list 1 "suspended"))))
|
||||
(flow-hyg-test
|
||||
"gc: removes a cancelled flow"
|
||||
(flow-h
|
||||
"(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/cancel id) (flow/gc)")
|
||||
1)
|
||||
(flow-hyg-test
|
||||
"gc: a kept suspended flow is still resumable"
|
||||
(flow-h
|
||||
"(defflow w (sequence (lambda (x) (suspend (quote q))) (lambda (v) (* v 2)))) (define id (car (cdr (flow/start w 0)))) (flow/start (lambda (x) x) 1) (flow/gc) (flow/resume id 21)")
|
||||
42)
|
||||
(flow-hyg-test
|
||||
"gc: counts every terminal flow it drops"
|
||||
(flow-h
|
||||
"(flow/start (lambda (x) x) 1) (flow/start (lambda (x) x) 2) (defflow w (lambda (x) (suspend (quote q)))) (flow/start w 0) (flow/gc)")
|
||||
2)
|
||||
|
||||
;; ── flow/forget ─────────────────────────────────────────────────
|
||||
(flow-hyg-test
|
||||
"forget: drops a completed flow"
|
||||
(flow-h
|
||||
"(defflow w (sequence (lambda (x) (suspend (quote q))) (lambda (v) v))) (define id (car (cdr (flow/start w 0)))) (flow/resume id 7) (list (flow/forget id) (flow/status id))")
|
||||
(list true "unknown"))
|
||||
(flow-hyg-test
|
||||
"forget: refuses to drop a live (suspended) flow"
|
||||
(flow-h
|
||||
"(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (list (flow/forget id) (flow/status id))")
|
||||
(list false "suspended"))
|
||||
(flow-hyg-test
|
||||
"forget: drops a cancelled flow"
|
||||
(flow-h
|
||||
"(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/cancel id) (list (flow/forget id) (flow/status id))")
|
||||
(list true "unknown"))
|
||||
(flow-hyg-test
|
||||
"forget: unknown id yields false"
|
||||
(flow-h "(flow/forget 999)")
|
||||
false)
|
||||
|
||||
(define flow-hyg-tests-run! (fn () {:total (+ flow-hyg-pass flow-hyg-fail) :passed flow-hyg-pass :failed flow-hyg-fail :fails flow-hyg-fails}))
|
||||
115
lib/flow/tests/integration.sx
Normal file
115
lib/flow/tests/integration.sx
Normal file
@@ -0,0 +1,115 @@
|
||||
;; lib/flow/tests/integration.sx — Phase 7: end-to-end flows composing every phase.
|
||||
|
||||
(define flow-int-pass 0)
|
||||
(define flow-int-fail 0)
|
||||
(define flow-int-fails (list))
|
||||
|
||||
(define
|
||||
flow-int-test
|
||||
(fn
|
||||
(name actual expected)
|
||||
(if
|
||||
(= actual expected)
|
||||
(set! flow-int-pass (+ flow-int-pass 1))
|
||||
(begin
|
||||
(set! flow-int-fail (+ flow-int-fail 1))
|
||||
(append! flow-int-fails {:name name :expected expected :actual actual})))))
|
||||
|
||||
(define flow-i (fn (src) (flow-run src)))
|
||||
|
||||
;; The order-processing flow, defined once per program via this prelude string:
|
||||
;; validate amount (attempt: fail if <= 0)
|
||||
;; -> suspend for payment confirmation (resume value = confirmed amount)
|
||||
;; -> branch: confirmed>0 ? record on the ledger peer : declined failure
|
||||
(define
|
||||
order-prelude
|
||||
"(flow-peer-register! (quote ledger) (list (list (quote record) (lambda (amt) (list (quote recorded) amt)))))\n (defflow order\n (attempt\n (lambda (amt) (if (> amt 0) amt (fail (quote invalid-amount))))\n (lambda (amt) (suspend (quote await-payment)))\n (branch (lambda (amt) (> amt 0))\n (remote-node (quote ledger) (quote record))\n (flow-const (fail (quote declined))))))")
|
||||
|
||||
;; ── happy path through every phase ──────────────────────────────
|
||||
(flow-int-test
|
||||
"order: validate -> suspend -> resume -> branch -> federate"
|
||||
(flow-i
|
||||
(str
|
||||
order-prelude
|
||||
"(define id (car (cdr (flow/start order 100)))) (flow/resume id 250)"))
|
||||
(list "recorded" 250))
|
||||
(flow-int-test
|
||||
"order: starting suspends awaiting payment"
|
||||
(flow-i
|
||||
(str
|
||||
order-prelude
|
||||
"(define s (flow/start order 100)) (list (car s) (car (cdr (cdr s))))"))
|
||||
(list "flow-suspended" "await-payment"))
|
||||
(flow-int-test
|
||||
"order: invalid amount fails up front and never suspends"
|
||||
(flow-i
|
||||
(str
|
||||
order-prelude
|
||||
"(define r (flow/start order -5)) (list (failed? r) (fail-reason r))"))
|
||||
(list true "invalid-amount"))
|
||||
(flow-int-test
|
||||
"order: a declined payment yields a failure value"
|
||||
(flow-i
|
||||
(str
|
||||
order-prelude
|
||||
"(define id (car (cdr (flow/start order 100)))) (failed? (flow/resume id 0))"))
|
||||
true)
|
||||
|
||||
;; ── crash recovery mid-flow ─────────────────────────────────────
|
||||
(flow-int-test
|
||||
"order: survives a simulated crash between suspend and resume"
|
||||
(flow-i
|
||||
(str
|
||||
order-prelude
|
||||
"(define id (car (cdr (flow/start order 100)))) (define saved (flow-store-export)) (set! flow-store (list)) (flow-store-import! saved) (flow/resume id 250)"))
|
||||
(list "recorded" 250))
|
||||
|
||||
;; ── handoff to a peer mid-flow ──────────────────────────────────
|
||||
(flow-int-test
|
||||
"order: hands off to a peer that resumes and completes"
|
||||
(flow-i
|
||||
(str
|
||||
order-prelude
|
||||
"(define id (car (cdr (flow/start order 100)))) (flow-replicate-to (quote nodeB)) (set! flow-store (list)) (flow-restore-from (quote nodeB)) (flow/resume id 250)"))
|
||||
(list "recorded" 250))
|
||||
|
||||
;; ── introspection during the flow's life ────────────────────────
|
||||
(flow-int-test
|
||||
"order: pending shows what the flow awaits, then result after resume"
|
||||
(flow-i
|
||||
(str
|
||||
order-prelude
|
||||
"(define id (car (cdr (flow/start order 100)))) (define p (flow/pending)) (flow/resume id 250) (list p (flow/status id) (flow/result id))"))
|
||||
(list
|
||||
(list (list 1 "await-payment"))
|
||||
"done"
|
||||
(list "recorded" 250)))
|
||||
|
||||
;; ── onboarding: two human steps + cancellation ──────────────────
|
||||
(define
|
||||
onboard-prelude
|
||||
"(defflow onboard\n (sequence\n (lambda (user) (+ user 1))\n (lambda (x) (suspend (quote confirm-email)))\n (lambda (x) (suspend (quote complete-profile)))\n (lambda (x) (list (quote onboarded) x))))")
|
||||
|
||||
(flow-int-test
|
||||
"onboard: two suspends resume in order to completion"
|
||||
(flow-i
|
||||
(str
|
||||
onboard-prelude
|
||||
"(define id (car (cdr (flow/start onboard 0)))) (flow/resume id 7) (flow/resume id 9)"))
|
||||
(list "onboarded" 9))
|
||||
(flow-int-test
|
||||
"onboard: the second pending tag appears after the first resume"
|
||||
(flow-i
|
||||
(str
|
||||
onboard-prelude
|
||||
"(define id (car (cdr (flow/start onboard 0)))) (flow/resume id 7) (car (cdr (car (flow/pending))))"))
|
||||
"complete-profile")
|
||||
(flow-int-test
|
||||
"onboard: cancelling abandons the flow"
|
||||
(flow-i
|
||||
(str
|
||||
onboard-prelude
|
||||
"(define id (car (cdr (flow/start onboard 0)))) (flow/cancel id) (list (flow/status id) (car (flow/resume id 7)))"))
|
||||
(list "cancelled" "flow-error"))
|
||||
|
||||
(define flow-int-tests-run! (fn () {:total (+ flow-int-pass flow-int-fail) :passed flow-int-pass :failed flow-int-fail :fails flow-int-fails}))
|
||||
73
lib/flow/tests/railway.sx
Normal file
73
lib/flow/tests/railway.sx
Normal file
@@ -0,0 +1,73 @@
|
||||
;; lib/flow/tests/railway.sx — Phase 6: railway-oriented composition (attempt).
|
||||
|
||||
(define flow-rail-pass 0)
|
||||
(define flow-rail-fail 0)
|
||||
(define flow-rail-fails (list))
|
||||
|
||||
(define
|
||||
flow-rail-test
|
||||
(fn
|
||||
(name actual expected)
|
||||
(if
|
||||
(= actual expected)
|
||||
(set! flow-rail-pass (+ flow-rail-pass 1))
|
||||
(begin
|
||||
(set! flow-rail-fail (+ flow-rail-fail 1))
|
||||
(append! flow-rail-fails {:name name :expected expected :actual actual})))))
|
||||
|
||||
(define flow-r (fn (src) (flow-run src)))
|
||||
|
||||
;; ── attempt — short-circuit on the first (fail ...) ─────────────
|
||||
(flow-rail-test
|
||||
"attempt: threads like sequence when nothing fails"
|
||||
(flow-r
|
||||
"(flow/start (attempt (lambda (x) (+ x 1)) (lambda (x) (* x 10))) 4)")
|
||||
50)
|
||||
(flow-rail-test
|
||||
"attempt: empty is identity"
|
||||
(flow-r "(flow/start (attempt) 7)")
|
||||
7)
|
||||
(flow-rail-test
|
||||
"attempt: returns the first failure"
|
||||
(flow-r
|
||||
"(failed? (flow/start (attempt (lambda (x) (fail (quote bad))) (lambda (x) (* x 10))) 4))")
|
||||
true)
|
||||
(flow-rail-test
|
||||
"attempt: the failure carries its reason"
|
||||
(flow-r
|
||||
"(fail-reason (flow/start (attempt (lambda (x) x) (lambda (x) (fail (quote rejected)))) 4))")
|
||||
"rejected")
|
||||
(flow-rail-test
|
||||
"attempt: nodes after a failure do not run"
|
||||
(flow-r
|
||||
"(define ran 0) (flow/start (attempt (lambda (x) (fail (quote stop))) (lambda (x) (begin (set! ran (+ ran 1)) x))) 0) ran")
|
||||
0)
|
||||
(flow-rail-test
|
||||
"attempt: a failed input short-circuits immediately"
|
||||
(flow-r
|
||||
"(define ran 0) (fail-reason (flow/start (attempt (lambda (x) (begin (set! ran (+ ran 1)) x))) (fail (quote pre))))")
|
||||
"pre")
|
||||
(flow-rail-test
|
||||
"attempt: middle failure halts the chain"
|
||||
(flow-r
|
||||
"(define ran 0) (flow/start (attempt (lambda (x) (+ x 1)) (lambda (x) (fail (quote mid))) (lambda (x) (begin (set! ran (+ ran 1)) x))) 5) ran")
|
||||
0)
|
||||
|
||||
;; ── attempt + recover (rejoin the happy track) ──────────────────
|
||||
(flow-rail-test
|
||||
"attempt + recover: recover turns a failure into a value"
|
||||
(flow-r
|
||||
"(flow/start (recover (attempt (lambda (x) (if (> x 0) x (fail (quote non-positive)))) (lambda (x) (* x 2))) (flow-const 0)) -5)")
|
||||
0)
|
||||
(flow-rail-test
|
||||
"attempt + recover: happy path passes recover through"
|
||||
(flow-r
|
||||
"(flow/start (recover (attempt (lambda (x) (if (> x 0) x (fail (quote non-positive)))) (lambda (x) (* x 2))) (flow-const 0)) 5)")
|
||||
10)
|
||||
(flow-rail-test
|
||||
"attempt: validation pipeline reports the failing stage"
|
||||
(flow-r
|
||||
"(defflow validate (attempt (lambda (s) (if (>= (string-length s) 3) s (fail (quote too-short)))) (lambda (s) (if (<= (string-length s) 8) s (fail (quote too-long)))) (lambda (s) (list (quote ok) (string-length s))))) (list (fail-reason (flow/start validate \"hi\")) (flow/start validate \"hello\"))")
|
||||
(list "too-short" (list "ok" 5)))
|
||||
|
||||
(define flow-rail-tests-run! (fn () {:total (+ flow-rail-pass flow-rail-fail) :passed flow-rail-pass :failed flow-rail-fail :fails flow-rail-fails}))
|
||||
71
lib/flow/tests/recovery.sx
Normal file
71
lib/flow/tests/recovery.sx
Normal file
@@ -0,0 +1,71 @@
|
||||
;; lib/flow/tests/recovery.sx — Phase 3: crash recovery (store export/import + restart).
|
||||
;;
|
||||
;; "restart" is simulated within one program: (set! flow-store (list)) wipes the
|
||||
;; in-memory store (process death), while flow-registry persists as it would after
|
||||
;; reloading flow definitions. Recovery = import the exported (plain-data) store and
|
||||
;; resume; the flow proc is re-resolved by name.
|
||||
|
||||
(define flow-rec-pass 0)
|
||||
(define flow-rec-fail 0)
|
||||
(define flow-rec-fails (list))
|
||||
|
||||
(define
|
||||
flow-rec-test
|
||||
(fn
|
||||
(name actual expected)
|
||||
(if
|
||||
(= actual expected)
|
||||
(set! flow-rec-pass (+ flow-rec-pass 1))
|
||||
(begin
|
||||
(set! flow-rec-fail (+ flow-rec-fail 1))
|
||||
(append! flow-rec-fails {:name name :expected expected :actual actual})))))
|
||||
|
||||
(define flow-r (fn (src) (flow-run src)))
|
||||
|
||||
;; ── export / wipe / import ──────────────────────────────────────
|
||||
(flow-rec-test
|
||||
"export nulls the live procedure"
|
||||
(flow-r
|
||||
"(defflow w (lambda (x) (suspend (quote await)))) (flow/start w 10) (car (cdr (car (cdr (car (flow-store-export))))))")
|
||||
false)
|
||||
(flow-rec-test
|
||||
"a wiped store loses the flow (process death)"
|
||||
(flow-r
|
||||
"(defflow w (lambda (x) (suspend (quote await)))) (define id (car (cdr (flow/start w 10)))) (set! flow-store (list)) (flow/resume id 1)")
|
||||
(list "flow-error" "no-such-flow"))
|
||||
(flow-rec-test
|
||||
"import restores a wiped store and resume completes"
|
||||
(flow-r
|
||||
"(defflow w (sequence (lambda (x) (suspend (quote await))) (lambda (c) (list (quote done) c)))) (define id (car (cdr (flow/start w 10)))) (define saved (flow-store-export)) (set! flow-store (list)) (flow-store-import! saved) (flow/resume id 777)")
|
||||
(list "done" 777))
|
||||
|
||||
;; ── resumable scan ──────────────────────────────────────────────
|
||||
(flow-rec-test
|
||||
"resumable-ids lists the suspended flow after import"
|
||||
(flow-r
|
||||
"(defflow w (lambda (x) (suspend (quote await)))) (define id (car (cdr (flow/start w 10)))) (define saved (flow-store-export)) (set! flow-store (list)) (flow-store-import! saved) (flow-resumable-ids)")
|
||||
(list 1))
|
||||
(flow-rec-test
|
||||
"resumable-ids excludes completed flows"
|
||||
(flow-r
|
||||
"(defflow w (sequence (lambda (x) (suspend (quote await))) (lambda (c) c))) (define id (car (cdr (flow/start w 10)))) (flow/resume id 5) (flow-resumable-ids)")
|
||||
(list))
|
||||
(flow-rec-test
|
||||
"resumable-ids excludes cancelled flows after import"
|
||||
(flow-r
|
||||
"(defflow w (lambda (x) (suspend (quote await)))) (define id (car (cdr (flow/start w 10)))) (flow/cancel id) (define saved (flow-store-export)) (set! flow-store (list)) (flow-store-import! saved) (flow-resumable-ids)")
|
||||
(list))
|
||||
|
||||
;; ── restart at every step ───────────────────────────────────────
|
||||
(flow-rec-test
|
||||
"two suspends survive a restart between each step"
|
||||
(flow-r
|
||||
"(defflow two (sequence (lambda (x) (suspend (quote a))) (lambda (x) (suspend (quote b))) (lambda (x) (list (quote end) x)))) (define id (car (cdr (flow/start two 0)))) (define s1 (flow-store-export)) (set! flow-store (list)) (flow-store-import! s1) (flow/resume id 100) (define s2 (flow-store-export)) (set! flow-store (list)) (flow-store-import! s2) (flow/resume id 200)")
|
||||
(list "end" 200))
|
||||
(flow-rec-test
|
||||
"import preserves the replay log (earlier value survives restart)"
|
||||
(flow-r
|
||||
"(defflow two (sequence (lambda (x) (suspend (quote a))) (lambda (x) (suspend (quote b))) (lambda (x) (list x)))) (define id (car (cdr (flow/start two 0)))) (flow/resume id 11) (define saved (flow-store-export)) (set! flow-store (list)) (flow-store-import! saved) (flow/resume id 22)")
|
||||
(list 22))
|
||||
|
||||
(define flow-rec-tests-run! (fn () {:total (+ flow-rec-pass flow-rec-fail) :passed flow-rec-pass :failed flow-rec-fail :fails flow-rec-fails}))
|
||||
114
lib/flow/tests/suspend.sx
Normal file
114
lib/flow/tests/suspend.sx
Normal file
@@ -0,0 +1,114 @@
|
||||
;; lib/flow/tests/suspend.sx — Phase 3: suspend / resume / cancel (deterministic replay).
|
||||
|
||||
(define flow-sus-pass 0)
|
||||
(define flow-sus-fail 0)
|
||||
(define flow-sus-fails (list))
|
||||
|
||||
(define
|
||||
flow-sus-test
|
||||
(fn
|
||||
(name actual expected)
|
||||
(if
|
||||
(= actual expected)
|
||||
(set! flow-sus-pass (+ flow-sus-pass 1))
|
||||
(begin
|
||||
(set! flow-sus-fail (+ flow-sus-fail 1))
|
||||
(append! flow-sus-fails {:name name :expected expected :actual actual})))))
|
||||
|
||||
(define flow-s (fn (src) (flow-run src)))
|
||||
|
||||
;; ── flow/start ──────────────────────────────────────────────────
|
||||
(flow-sus-test
|
||||
"start: non-suspending flow returns the raw result"
|
||||
(flow-s "(flow/start (lambda (x) (* x 2)) 5)")
|
||||
10)
|
||||
(flow-sus-test
|
||||
"start: a suspending flow returns a flow-suspended state"
|
||||
(flow-s
|
||||
"(defflow w (sequence (lambda (x) (+ x 1)) (lambda (g) (suspend (quote await))) (lambda (c) c))) (car (flow/start w 10))")
|
||||
"flow-suspended")
|
||||
(flow-sus-test
|
||||
"start: suspended state carries a numeric id"
|
||||
(flow-s
|
||||
"(defflow w (lambda (x) (suspend (quote await)))) (car (cdr (flow/start w 10)))")
|
||||
1)
|
||||
(flow-sus-test
|
||||
"start: suspended state carries the suspend tag"
|
||||
(flow-s
|
||||
"(defflow w (lambda (x) (suspend (quote await)))) (car (cdr (cdr (flow/start w 10))))")
|
||||
"await")
|
||||
|
||||
;; ── flow/resume ─────────────────────────────────────────────────
|
||||
(flow-sus-test
|
||||
"resume: injects the value and completes"
|
||||
(flow-s
|
||||
"(defflow w (sequence (lambda (x) (+ x 1)) (lambda (g) (suspend (quote await))) (lambda (c) (list (quote done) c)))) (define s (flow/start w 10)) (flow/resume (car (cdr s)) 777)")
|
||||
(list "done" 777))
|
||||
(flow-sus-test
|
||||
"resume: injected value threads into the next node"
|
||||
(flow-s
|
||||
"(defflow w (sequence (lambda (x) (suspend (quote v))) (lambda (n) (* n 3)))) (define s (flow/start w 0)) (flow/resume (car (cdr s)) 14)")
|
||||
42)
|
||||
(flow-sus-test
|
||||
"resume: replays earlier suspends (recompute is deterministic)"
|
||||
(flow-s
|
||||
"(define runs 0) (defflow w (sequence (lambda (x) (begin (set! runs (+ runs 1)) (+ x 1))) (lambda (g) (suspend (quote await))) (lambda (c) c))) (define s (flow/start w 10)) (flow/resume (car (cdr s)) 99) runs")
|
||||
2)
|
||||
|
||||
;; ── multi-step suspension ───────────────────────────────────────
|
||||
(flow-sus-test
|
||||
"multi: first resume suspends at the next tag"
|
||||
(flow-s
|
||||
"(defflow two (sequence (lambda (x) (suspend (quote a))) (lambda (x) (suspend (quote b))) (lambda (x) (list (quote end) x)))) (define s (flow/start two 0)) (define s2 (flow/resume (car (cdr s)) 100)) (car (cdr (cdr s2)))")
|
||||
"b")
|
||||
(flow-sus-test
|
||||
"multi: second resume completes with the latest value"
|
||||
(flow-s
|
||||
"(defflow two (sequence (lambda (x) (suspend (quote a))) (lambda (x) (suspend (quote b))) (lambda (x) (list (quote end) x)))) (define id (car (cdr (flow/start two 0)))) (flow/resume id 100) (flow/resume id 200)")
|
||||
(list "end" 200))
|
||||
|
||||
;; ── error / lifecycle guards ────────────────────────────────────
|
||||
(flow-sus-test
|
||||
"resume: completed flow cannot be resumed again"
|
||||
(flow-s
|
||||
"(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/resume id 1) (flow/resume id 2)")
|
||||
(list "flow-error" "not-suspended"))
|
||||
(flow-sus-test
|
||||
"resume: unknown id errors"
|
||||
(flow-s "(flow/resume 999 1)")
|
||||
(list "flow-error" "no-such-flow"))
|
||||
|
||||
;; ── flow/cancel ─────────────────────────────────────────────────
|
||||
(flow-sus-test
|
||||
"cancel: returns a flow-cancelled state"
|
||||
(flow-s
|
||||
"(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/cancel id)")
|
||||
(list "flow-cancelled" 1))
|
||||
(flow-sus-test
|
||||
"cancel: a cancelled flow cannot be resumed (stale resume rejected)"
|
||||
(flow-s
|
||||
"(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/cancel id) (flow/resume id 5)")
|
||||
(list "flow-error" "not-suspended"))
|
||||
(flow-sus-test
|
||||
"cancel: unknown id errors"
|
||||
(flow-s "(flow/cancel 999)")
|
||||
(list "flow-error" "no-such-flow"))
|
||||
|
||||
;; ── composition ─────────────────────────────────────────────────
|
||||
(flow-sus-test
|
||||
"suspend inside a branch arm"
|
||||
(flow-s
|
||||
"(defflow gate (branch (lambda (x) (> x 0)) (lambda (x) (suspend (quote approve))) (flow-const (quote rejected)))) (define s (flow/start gate 5)) (flow/resume (car (cdr s)) (quote approved))")
|
||||
"approved")
|
||||
(flow-sus-test
|
||||
"two independent runs get independent ids"
|
||||
(flow-s
|
||||
"(defflow w (lambda (x) (suspend (quote q)))) (list (car (cdr (flow/start w 0))) (car (cdr (flow/start w 0))))")
|
||||
(list 1 2))
|
||||
(flow-sus-test
|
||||
"suspend reason may be a structured value"
|
||||
(flow-s
|
||||
"(defflow w (lambda (x) (suspend (list (quote needs) (quote approval))))) (car (cdr (cdr (flow/start w 0))))")
|
||||
(list "needs" "approval"))
|
||||
|
||||
(define flow-sus-tests-run! (fn () {:total (+ flow-sus-pass flow-sus-fail) :passed flow-sus-pass :failed flow-sus-fail :fails flow-sus-fails}))
|
||||
102
plans/acl-on-sx.md
Normal file
102
plans/acl-on-sx.md
Normal file
@@ -0,0 +1,102 @@
|
||||
# acl-on-sx: Access Control on Datalog
|
||||
|
||||
rose-ash needs fine-grained, explainable, federation-aware access control. Subjects
|
||||
(users, groups, roles, services) × actions (read, edit, comment, moderate, federate)
|
||||
× resources (pages, posts, threads, peers). Decisions must come with a trace — not just
|
||||
permit/deny, but **why**.
|
||||
|
||||
Datalog's bottom-up rule engine produces transparent permit/deny chains: the proof tree
|
||||
is the audit trail. Inheritance over groups + resource hierarchies is recursive Datalog
|
||||
in one rule. Federation extends naturally — fed-sx replicates ACL facts, peers reason
|
||||
over the union.
|
||||
|
||||
End-state: a Datalog-on-SX layer specifically for ACL, with explanation API, audit log,
|
||||
and federation extension. Reuses `lib/datalog/` evaluator and term model where possible.
|
||||
|
||||
## Status (rolling)
|
||||
|
||||
`bash lib/acl/conformance.sh` → **0/0** (not yet started)
|
||||
|
||||
## Ground rules
|
||||
|
||||
- **Scope:** only touch `lib/acl/**` and `plans/acl-on-sx.md`. Do **not** edit `spec/`,
|
||||
`hosts/`, `shared/`, `lib/datalog/**`, or other `lib/<lang>/`. You may **import**
|
||||
from `lib/datalog/` (its public API in `lib/datalog/datalog.sx`); do **not** copy or
|
||||
modify Datalog code.
|
||||
- **Shared-file issues** go under "Blockers" with a minimal repro; do not fix here.
|
||||
- **SX files:** use `sx-tree` MCP tools only.
|
||||
- **Architecture:** thin layer on top of `lib/datalog/`. Define schema, surface API,
|
||||
audit + federation hooks. The rule engine itself is Datalog's.
|
||||
- **Watch for shared patterns** going into `lib/guest/` — both acl-sx and mod-sx need
|
||||
rule-engine plumbing. If you find shared shape, flag it for extraction (don't
|
||||
extract yet — wait for mod-sx to start).
|
||||
- **Commits:** one feature per commit. Keep Progress log updated and tick boxes.
|
||||
|
||||
## Architecture sketch
|
||||
|
||||
```
|
||||
ACL declarations (SX) User query
|
||||
│ │
|
||||
▼ ▼
|
||||
lib/acl/schema.sx lib/acl/api.sx
|
||||
— subject sorts — (acl/permit? subj act res)
|
||||
— resource sorts — (acl/explain subj act res)
|
||||
— action sorts — (acl/audit subj act res :allowed?)
|
||||
— fact schema │
|
||||
│ ▼
|
||||
▼ lib/acl/engine.sx
|
||||
lib/acl/facts.sx — builds Datalog query
|
||||
— actor(id, kind) — invokes lib/datalog/
|
||||
— resource(id, kind) — extracts proof tree
|
||||
— member_of(actor, group) │
|
||||
— child_of(res, parent) ▼
|
||||
— grant(actor, act, res) lib/acl/audit.sx
|
||||
— deny (actor, act, res) — persistent decision log
|
||||
— query API
|
||||
```
|
||||
|
||||
## Phase 1 — Direct grants
|
||||
|
||||
- [ ] `lib/acl/schema.sx` — sorts: subject {user, group, role, service}, action,
|
||||
resource {page, post, thread, peer}
|
||||
- [ ] `lib/acl/facts.sx` — `actor`, `resource`, `grant`, `deny` predicates as Datalog
|
||||
EDB
|
||||
- [ ] `lib/acl/engine.sx` — `(permit? subj act res db)` reduces to Datalog query
|
||||
- [ ] `lib/acl/api.sx` — public `(acl/permit? ...)` taking implicit current db
|
||||
- [ ] `lib/acl/tests/direct.sx` — 15+ cases: direct grant, missing grant, explicit deny
|
||||
- [ ] `lib/acl/scoreboard.{json,md}` baseline
|
||||
- [ ] `lib/acl/conformance.sh` runs the suite
|
||||
|
||||
## Phase 2 — Inheritance
|
||||
|
||||
- [ ] `member_of(actor, group)` chain — group grants apply to members (transitive)
|
||||
- [ ] `child_of(res, parent)` chain — parent grants apply to children (transitive)
|
||||
- [ ] role expansion — role contains list of (action, resource) tuples
|
||||
- [ ] deny-overrides — explicit deny wins over inherited allow
|
||||
- [ ] `lib/acl/tests/inherit.sx` — 25+ cases: nested groups, deep resource trees,
|
||||
conflict resolution, deny precedence
|
||||
- [ ] document the deny-overrides choice in plan
|
||||
|
||||
## Phase 3 — Explanation + audit
|
||||
|
||||
- [ ] `(acl/explain subj act res)` → `{:allowed? T :proof <tree>}`
|
||||
- [ ] proof tree extracts from Datalog's derivation
|
||||
- [ ] `lib/acl/audit.sx` — append-only decision log (in-memory + serializer for disk)
|
||||
- [ ] `(acl/audit-tail n)` for recent decisions
|
||||
- [ ] `lib/acl/tests/explain.sx` — proof correctness, audit completeness
|
||||
|
||||
## Phase 4 — Federation
|
||||
|
||||
- [ ] peer trust facts — `peer(addr, kind)`, `trust(peer, level)`
|
||||
- [ ] delegated grants — `delegate(peer, actor, action, resource)`
|
||||
- [ ] cross-instance permit chain — query asks local + queries trusted peers via fed-sx
|
||||
- [ ] revocation propagation — fact retraction across federation
|
||||
- [ ] `lib/acl/tests/fed.sx` — federated grant chains (mock fed-sx transport in tests)
|
||||
|
||||
## Progress log
|
||||
|
||||
(loop fills this in)
|
||||
|
||||
## Blockers
|
||||
|
||||
(loop fills this in)
|
||||
93
plans/agent-briefings/acl-loop.md
Normal file
93
plans/agent-briefings/acl-loop.md
Normal file
@@ -0,0 +1,93 @@
|
||||
# acl-on-sx loop agent (single agent, queue-driven)
|
||||
|
||||
Role: iterates `plans/acl-on-sx.md` forever. **First subsystem loop after fed-sx.**
|
||||
Sits on `lib/datalog/` — rule engine reused, schema/api/audit/federation added on
|
||||
top. The deliverable isn't "implement Datalog ACL"; it's *also* to surface shared
|
||||
rule-engine plumbing into `lib/guest/` (the mod-sx loop will be the second consumer,
|
||||
validating extraction).
|
||||
|
||||
```
|
||||
description: acl-on-sx queue loop
|
||||
subagent_type: general-purpose
|
||||
run_in_background: true
|
||||
isolation: worktree
|
||||
```
|
||||
|
||||
## Prompt
|
||||
|
||||
You are the sole background agent working `/root/rose-ash/plans/acl-on-sx.md`.
|
||||
Isolated worktree, forever, one commit per feature. Push to `origin/loops/acl`
|
||||
after every commit.
|
||||
|
||||
## Restart baseline — check before iterating
|
||||
|
||||
1. Read `plans/acl-on-sx.md` — roadmap + Progress log.
|
||||
2. `ls lib/acl/` — pick up from the most advanced file.
|
||||
3. If `lib/acl/tests/*.sx` exist, run them via `bash lib/acl/conformance.sh`. Green
|
||||
before new work.
|
||||
4. If `lib/acl/scoreboard.md` exists, that's your baseline.
|
||||
5. Read `lib/datalog/datalog.sx` public API once — that's your substrate.
|
||||
|
||||
## The queue
|
||||
|
||||
Phase order per `plans/acl-on-sx.md`:
|
||||
|
||||
- **Phase 1** — direct grants. Schema, EDB facts, engine, api, 15+ tests
|
||||
- **Phase 2** — inheritance (member_of, child_of, role expansion, deny-overrides)
|
||||
- **Phase 3** — explanation + audit (proof tree, audit log)
|
||||
- **Phase 4** — federation (peer trust, delegation, cross-instance permit chain)
|
||||
|
||||
Within a phase, pick the checkbox that unlocks the most tests per effort.
|
||||
|
||||
Every iteration: implement → test → commit → tick `[ ]` → Progress log → next.
|
||||
|
||||
## Ground rules (hard)
|
||||
|
||||
- **Scope:** only `lib/acl/**` and `plans/acl-on-sx.md`. Do **not** edit `spec/`,
|
||||
`hosts/`, `shared/`, other `lib/<lang>/` dirs, `lib/stdlib.sx`, or `lib/` root.
|
||||
May **import** from `lib/datalog/` only (its public API).
|
||||
- **NEVER call `sx_build`.** 600s watchdog. If sx_server binary broken → Blockers
|
||||
entry, stop.
|
||||
- **Shared-file issues** → plan's Blockers with minimal repro.
|
||||
- **SX files:** `sx-tree` MCP tools ONLY. `sx_validate` after edits.
|
||||
- **Worktree:** commit, then push to `origin/loops/acl`. Never touch `main` or
|
||||
`architecture`.
|
||||
- **Commit granularity:** one feature per commit. Short factual messages
|
||||
(`acl: child_of resource inheritance + 8 tests`).
|
||||
- **Plan file:** update Progress log + tick boxes every commit.
|
||||
- **Watch for shared infrastructure** with future mod-sx (Prolog moderation). If you
|
||||
build a generic rule-engine adapter, note it in Progress log so the eventual
|
||||
`lib/guest/rules/` extraction has both consumers identified.
|
||||
|
||||
## ACL-specific gotchas
|
||||
|
||||
- **Datalog is bottom-up.** No goal-directed search. Don't reach for cut or
|
||||
backtracking — that's mod-sx's job. Your decisions emerge from fixpoint.
|
||||
- **Deny-overrides** is the policy: if both an allow and deny rule fire, deny wins.
|
||||
Encode this via stratified negation; document the choice clearly in plan.
|
||||
- **Inheritance termination:** recursive rules with `member_of` chains must
|
||||
terminate. Datalog guarantees this absent function symbols — don't introduce them
|
||||
in your schema.
|
||||
- **Proof tree shape:** Datalog's derivation graph is a DAG, not a tree, when the
|
||||
same fact is derived multiple ways. For audit, pick one canonical derivation
|
||||
(shortest, or first); document choice.
|
||||
- **Federation isn't transitive trust.** A peer's `delegate(...)` fact only applies
|
||||
if local `trust(peer, level)` covers the action class. Re-check trust on every
|
||||
query, not at fact-ingestion time.
|
||||
|
||||
## General gotchas (all loops)
|
||||
|
||||
- SX `do` = R7RS iteration. Use `begin` for multi-expr sequences.
|
||||
- `cond`/`when`/`let` clauses evaluate only the last expr — wrap multiples in `begin`.
|
||||
- `env-bind!` creates a binding; `env-set!` mutates an existing one (walks scope chain).
|
||||
- `sx_validate` after every structural edit.
|
||||
- `list?` returns false on raw JS Arrays — host data must be SX-converted.
|
||||
|
||||
## Style
|
||||
|
||||
- No comments in `.sx` unless non-obvious.
|
||||
- No new planning docs — update `plans/acl-on-sx.md` inline.
|
||||
- Short, factual commit messages.
|
||||
- One feature per iteration. Commit. Log. Push. Next.
|
||||
|
||||
Go. Start by reading the plan; find the first unchecked `[ ]`; implement it.
|
||||
99
plans/agent-briefings/feed-loop.md
Normal file
99
plans/agent-briefings/feed-loop.md
Normal file
@@ -0,0 +1,99 @@
|
||||
# feed-on-sx loop agent (single agent, queue-driven)
|
||||
|
||||
Role: iterates `plans/feed-on-sx.md` forever. **Activity feeds on APL** — timelines,
|
||||
notifications, fanout, ranking, all as APL array math on activity vectors. Densest
|
||||
possible expression of feed composition. Sits on `lib/apl/` (450+/450+ tests
|
||||
already); adds a feed-shaped vocabulary on top.
|
||||
|
||||
```
|
||||
description: feed-on-sx queue loop
|
||||
subagent_type: general-purpose
|
||||
run_in_background: true
|
||||
isolation: worktree
|
||||
```
|
||||
|
||||
## Prompt
|
||||
|
||||
You are the sole background agent working `/root/rose-ash/plans/feed-on-sx.md`.
|
||||
Isolated worktree, forever, one commit per feature. Push to `origin/loops/feed`
|
||||
after every commit.
|
||||
|
||||
## Restart baseline — check before iterating
|
||||
|
||||
1. Read `plans/feed-on-sx.md` — roadmap + Progress log.
|
||||
2. `ls lib/feed/` — pick up from the most advanced file.
|
||||
3. If `lib/feed/tests/*.sx` exist, run them via `bash lib/feed/conformance.sh`. Green
|
||||
before new work.
|
||||
4. If `lib/feed/scoreboard.md` exists, that's your baseline.
|
||||
5. Read `lib/apl/apl.sx` public API once — that's your substrate. Familiarize
|
||||
yourself with at least: `⍳ ⍴ / ⌽ ↑ ↓ ⌷ ∊ ∘.× /\ ⍋` (you will use all of these).
|
||||
|
||||
## The queue
|
||||
|
||||
Phase order per `plans/feed-on-sx.md`:
|
||||
|
||||
- **Phase 1** — stream model + basic ops (record schema, filter, sort, take)
|
||||
- **Phase 2** — **THE SHOWCASE**: fanout via outer product. activities `∘.×`
|
||||
followers → inbox matrix, flatten + dedupe
|
||||
- **Phase 3** — aggregation + ranking (group-by, velocity, recency, top-N)
|
||||
- **Phase 4** — visibility filter (acl-sx) + federation (fed-sx inbox + backfill)
|
||||
|
||||
Within a phase, pick the checkbox that unlocks the most tests per effort.
|
||||
|
||||
Every iteration: implement → test → commit → tick `[ ]` → Progress log → next.
|
||||
|
||||
## Ground rules (hard)
|
||||
|
||||
- **Scope:** only `lib/feed/**` and `plans/feed-on-sx.md`. Do **not** edit `spec/`,
|
||||
`hosts/`, `shared/`, other `lib/<lang>/` dirs, `lib/stdlib.sx`, or `lib/` root.
|
||||
May **import** from `lib/apl/` only (its public API).
|
||||
- **NEVER call `sx_build`.** 600s watchdog. If sx_server binary broken → Blockers
|
||||
entry, stop.
|
||||
- **Shared-file issues** → plan's Blockers with minimal repro.
|
||||
- **SX files:** `sx-tree` MCP tools ONLY. `sx_validate` after edits.
|
||||
- **Unicode in `.sx`:** raw UTF-8 only, never `\uXXXX` escapes. APL glyphs land
|
||||
directly in source.
|
||||
- **Worktree:** commit, then push to `origin/loops/feed`. Never touch `main` or
|
||||
`architecture`.
|
||||
- **Commit granularity:** one feature per commit. Short factual messages
|
||||
(`feed: outer-product fanout + dedupe by (actor,verb,object) + 9 tests`).
|
||||
- **Plan file:** update Progress log + tick boxes every commit.
|
||||
|
||||
## feed-specific gotchas
|
||||
|
||||
- **Activities are heterogeneous.** Different verbs carry different shapes
|
||||
(`:object` might be page-id, post-id, user-id). Don't over-normalize — keep
|
||||
`:tags` as a flexible bag. APL operations over heterogeneous records work fine
|
||||
via dict lookups; only the indexed fields need uniform shape.
|
||||
- **Fanout produces matrices fast.** N activities × M followers → NM items. Apply
|
||||
filter/dedupe early, not after materialization. Use guard predicates *inside*
|
||||
the outer product where possible (compose with `∘.{a v ⊢ ...}`).
|
||||
- **Dedupe key isn't always `(actor,verb,object)`.** For "alice liked X" and "bob
|
||||
liked X" the dedupe key is `(verb,object)` (collapse the actors into a list).
|
||||
For "alice posted X" each `:actor` is distinct. Each verb may want its own
|
||||
dedupe rule; codify these in `lib/feed/dedupe.sx`.
|
||||
- **Recency decay matters more than score precision.** Use a simple half-life decay
|
||||
(e.g. score × 0.5^(age/window)) rather than a clever curve. Calibrate the
|
||||
window via tests, not theory.
|
||||
- **Ranking should be deterministic on ties.** Always include a tiebreaker (id, or
|
||||
hash). Otherwise tests will flake.
|
||||
- **The ACL filter is per-viewer.** A timeline is computed *for* a user; the same
|
||||
candidate stream produces different timelines for different viewers. Don't
|
||||
cache pre-ACL timelines.
|
||||
|
||||
## General gotchas (all loops)
|
||||
|
||||
- SX `do` = R7RS iteration. Use `begin` for multi-expr sequences.
|
||||
- `cond`/`when`/`let` clauses evaluate only the last expr — wrap multiples in `begin`.
|
||||
- `env-bind!` creates a binding; `env-set!` mutates an existing one (walks scope chain).
|
||||
- `sx_validate` after every structural edit.
|
||||
- `list?` returns false on raw JS Arrays — host data must be SX-converted.
|
||||
|
||||
## Style
|
||||
|
||||
- No comments in `.sx` unless non-obvious.
|
||||
- No new planning docs — update `plans/feed-on-sx.md` inline.
|
||||
- Short, factual commit messages.
|
||||
- One feature per iteration. Commit. Log. Push. Next.
|
||||
|
||||
Go. Start by reading the plan; find the first unchecked `[ ]`; implement it.
|
||||
98
plans/agent-briefings/flow-loop.md
Normal file
98
plans/agent-briefings/flow-loop.md
Normal file
@@ -0,0 +1,98 @@
|
||||
# flow-on-sx loop agent (single agent, queue-driven)
|
||||
|
||||
Role: iterates `plans/flow-on-sx.md` forever. **Durable workflows on Scheme** — the
|
||||
call/cc + delimited continuation showcase that justifies pulling R7RS into
|
||||
production. art-dag's natural successor: DAG-of-tasks with pause/resume across
|
||||
process restarts. fed-sx extension turns local flows into distributed ones.
|
||||
|
||||
```
|
||||
description: flow-on-sx queue loop
|
||||
subagent_type: general-purpose
|
||||
run_in_background: true
|
||||
isolation: worktree
|
||||
```
|
||||
|
||||
## Prompt
|
||||
|
||||
You are the sole background agent working `/root/rose-ash/plans/flow-on-sx.md`.
|
||||
Isolated worktree, forever, one commit per feature. Push to `origin/loops/flow`
|
||||
after every commit.
|
||||
|
||||
## Restart baseline — check before iterating
|
||||
|
||||
1. Read `plans/flow-on-sx.md` — roadmap + Progress log.
|
||||
2. `ls lib/flow/` — pick up from the most advanced file.
|
||||
3. If `lib/flow/tests/*.sx` exist, run them via `bash lib/flow/conformance.sh`. Green
|
||||
before new work.
|
||||
4. If `lib/flow/scoreboard.md` exists, that's your baseline.
|
||||
5. Read `lib/scheme/scheme.sx` public API once — that's your substrate.
|
||||
|
||||
## The queue
|
||||
|
||||
Phase order per `plans/flow-on-sx.md`:
|
||||
|
||||
- **Phase 1** — declarative DAG: `defflow`, `sequence`, `parallel`, sync runtime,
|
||||
basic api
|
||||
- **Phase 2** — control flow + error handling: `cond`, `retry`, `timeout`,
|
||||
`try-catch`
|
||||
- **Phase 3** — **THE SHOWCASE**: `suspend`/`resume` via `call/cc`, persistent
|
||||
store, crash recovery
|
||||
- **Phase 4** — distributed nodes via fed-sx (remote-node, handoff, replication)
|
||||
|
||||
Within a phase, pick the checkbox that unlocks the most tests per effort.
|
||||
|
||||
Every iteration: implement → test → commit → tick `[ ]` → Progress log → next.
|
||||
|
||||
## Ground rules (hard)
|
||||
|
||||
- **Scope:** only `lib/flow/**` and `plans/flow-on-sx.md`. Do **not** edit `spec/`,
|
||||
`hosts/`, `shared/`, other `lib/<lang>/` dirs, `lib/stdlib.sx`, or `lib/` root.
|
||||
May **import** from `lib/scheme/` only (its public API).
|
||||
- **NEVER call `sx_build`.** 600s watchdog. If sx_server binary broken → Blockers
|
||||
entry, stop.
|
||||
- **Shared-file issues** → plan's Blockers with minimal repro.
|
||||
- **SX files:** `sx-tree` MCP tools ONLY. `sx_validate` after edits.
|
||||
- **Worktree:** commit, then push to `origin/loops/flow`. Never touch `main` or
|
||||
`architecture`.
|
||||
- **Commit granularity:** one feature per commit. Short factual messages
|
||||
(`flow: retry combinator with exponential backoff + 6 tests`).
|
||||
- **Plan file:** update Progress log + tick boxes every commit.
|
||||
|
||||
## flow-specific gotchas
|
||||
|
||||
- **Continuations must be re-entrant.** Phase 3's `suspend` captures a continuation
|
||||
that may be re-entered after a process restart. That means: no captured file
|
||||
descriptors, no captured sockets, no captured live runtime references that won't
|
||||
survive serialization. State referenced by the continuation must be plain SX data
|
||||
or live in the flow store.
|
||||
- **call/cc, not call-with-escape-continuation.** R7RS distinguishes. Use the full
|
||||
call/cc for resume; escape-only continuations cannot be re-entered. Read
|
||||
`lib/scheme/r7rs.md` (or equivalent) to confirm semantics.
|
||||
- **`parallel` in Phase 1 is sequential.** Don't try threading until Phase 3+. Just
|
||||
evaluate branches in order, collect results, return joined value. Document the
|
||||
semantics clearly so users don't assume true concurrency.
|
||||
- **Retry doesn't retry continuations.** If a node has already suspended, retry on
|
||||
resume doesn't re-run it from scratch — it resumes. `retry` only applies to
|
||||
exceptions raised before suspend. Be explicit in the API.
|
||||
- **Cancellation invalidates the continuation.** `(flow/cancel id)` must remove the
|
||||
stored continuation so a stale `resume` cannot wake it. Document semantics.
|
||||
- **Timeouts in pure SX are tricky.** Without a scheduler, `timeout` is a budget on
|
||||
step count or wall-clock probed at safe points. Pick one approach (probably step
|
||||
budget for determinism) and document.
|
||||
|
||||
## General gotchas (all loops)
|
||||
|
||||
- SX `do` = R7RS iteration. Use `begin` for multi-expr sequences.
|
||||
- `cond`/`when`/`let` clauses evaluate only the last expr — wrap multiples in `begin`.
|
||||
- `env-bind!` creates a binding; `env-set!` mutates an existing one (walks scope chain).
|
||||
- `sx_validate` after every structural edit.
|
||||
- `list?` returns false on raw JS Arrays — host data must be SX-converted.
|
||||
|
||||
## Style
|
||||
|
||||
- No comments in `.sx` unless non-obvious.
|
||||
- No new planning docs — update `plans/flow-on-sx.md` inline.
|
||||
- Short, factual commit messages.
|
||||
- One feature per iteration. Commit. Log. Push. Next.
|
||||
|
||||
Go. Start by reading the plan; find the first unchecked `[ ]`; implement it.
|
||||
106
plans/agent-briefings/kernel-loop.md
Normal file
106
plans/agent-briefings/kernel-loop.md
Normal file
@@ -0,0 +1,106 @@
|
||||
# kernel-on-sx loop agent (single agent, queue-driven)
|
||||
|
||||
Role: iterates `plans/kernel-on-sx.md` forever. **First chisel of the Phase B stratification work** — natural successor to env-as-value, validates SX's reflection story (first-class environments, evaluators, operatives). Goal isn't just "implement Kernel"; it's *also* to surface common patterns into `lib/guest/` (specifically motivating a future `lib/guest/reflective/` sub-layer). One feature per commit.
|
||||
|
||||
```
|
||||
description: kernel-on-sx queue loop
|
||||
subagent_type: general-purpose
|
||||
run_in_background: true
|
||||
isolation: worktree
|
||||
```
|
||||
|
||||
## DO NOT START WITHOUT THE PREREQUISITES
|
||||
|
||||
This loop **must not** start until the lib-guest core kits are in place. Kernel's parser consumes `lib/guest/core/lex.sx` and `lib/guest/core/pratt.sx` (s-expression-shaped, minimal demand); its evaluator's pattern dispatch consumes `lib/guest/core/match.sx`.
|
||||
|
||||
**Pre-flight check:**
|
||||
```
|
||||
ls /root/rose-ash/lib/guest/lex.sx /root/rose-ash/lib/guest/pratt.sx \
|
||||
/root/rose-ash/lib/guest/match.sx /root/rose-ash/lib/guest/ast.sx
|
||||
```
|
||||
If any of those `lib/guest/*.sx` files are missing, **stop and report**. Do not start.
|
||||
|
||||
## Prompt
|
||||
|
||||
You are the sole background agent working `/root/rose-ash/plans/kernel-on-sx.md`. You run in an isolated git worktree on branch `loops/kernel`. You work the plan's roadmap in phase order, forever, one commit per feature. Push to `origin/loops/kernel` after every commit.
|
||||
|
||||
## Restart baseline — check before iterating
|
||||
|
||||
1. Read `plans/kernel-on-sx.md` — Roadmap + Progress log + Blockers tell you where you are.
|
||||
2. Run the pre-flight check above. If any lib/guest kit is missing, stop immediately and update the plan's Blockers section.
|
||||
3. `ls lib/kernel/` — pick up from the most advanced file that exists. If the directory does not exist, you are at Phase 1.
|
||||
4. If `lib/kernel/tests/*.sx` exist, run them via the epoch protocol against `sx_server.exe`. They must be green before new work.
|
||||
|
||||
## The queue
|
||||
|
||||
Phase order per `plans/kernel-on-sx.md`:
|
||||
|
||||
- **Phase 1** — Parser (s-expression reader, minimal — consumes `lib/guest/lex` + `lib/guest/pratt`)
|
||||
- **Phase 2** — Core evaluator with first-class environments
|
||||
- **Phase 3** — `$vau` / `$lambda` / `wrap` / `unwrap` (the operative–applicative distinction)
|
||||
- **Phase 4** — Standard environment construction
|
||||
- **Phase 5** — Encapsulations (Kernel's opaque-type idiom)
|
||||
- **Phase 6** — Hygienic operatives (Shutt's later work — operatives that don't capture)
|
||||
- **Phase 7** — Propose `lib/guest/reflective/` (extraction phase — see chiselling discipline)
|
||||
|
||||
Within a phase, pick the checkbox with the best tests-per-effort ratio.
|
||||
|
||||
Every iteration: implement → test → commit → tick `[ ]` in plan → append Progress log → push → next.
|
||||
|
||||
## Lib/guest chiselling discipline (the defining feature of this loop)
|
||||
|
||||
You are not just implementing Kernel — you are *chiselling* the substrate to surface what `lib/guest/reflective/` should contain. Every commit must end with a one-line **"chisel note"** appended to the plan's Progress log entry, in this format:
|
||||
|
||||
```
|
||||
chisel: <one of: consumes-X | shapes-reflective | proposes-Y | nothing>
|
||||
```
|
||||
|
||||
- `consumes-X` — this commit used an existing `lib/guest/X` kit (e.g., `consumes-pratt`, `consumes-match`).
|
||||
- `shapes-reflective` — this commit revealed something about what `lib/guest/reflective/` should look like (e.g., env-reification helper signatures, applicative-vs-operative dispatch protocol). Add a paragraph to the plan's "lib/guest feedback loop" section describing the insight.
|
||||
- `proposes-Y` — this commit revealed a gap in another existing kit (e.g., `match.sx` doesn't quite handle X). Open a Blockers entry describing the gap.
|
||||
- `nothing` — pure Kernel work that didn't touch the substrate or lib/guest story (rare; if you write this twice in a row, stop and reflect on why).
|
||||
|
||||
**Phase 7 (extraction)** is **gated** by the two-consumer rule. Kernel alone is one consumer. The natural second consumer is a future MetaScheme port, a Common-Lisp meta-evaluator port, or a Kernel dialect (cKanren-style). **Until a second consumer exists, do NOT actually extract** — instead, mark Phase 7 `[partial — pending second consumer]` and document the proposed `lib/guest/reflective/` API surface in the plan's progress log. The extraction itself happens later, when a second consumer materialises.
|
||||
|
||||
This discipline is the point of the loop, not a bookkeeping tax. The chisel notes are what tell us — at the end of Kernel's run — whether a `lib/guest/reflective/` sub-layer is real or just one-language-shaped.
|
||||
|
||||
## Ground rules (hard)
|
||||
|
||||
- **Scope:** only `lib/kernel/**` and `plans/kernel-on-sx.md`. Do **not** edit `spec/`, `hosts/`, `shared/`, `lib/guest/**` (read-only consumer at this phase), or other `lib/<lang>/`.
|
||||
- **Consume `lib/guest/core/`** wherever it covers a need. Hand-rolling defeats the chiselling goal.
|
||||
- **Do not extract into `lib/guest/reflective/` from this loop.** That's Phase 7 territory, gated by the two-consumer rule. Until there's a second consumer, document the API surface only.
|
||||
- **Substrate gaps** (env-as-value not exposing X, `eval` semantics drift, JIT not handling reflective patterns) → Blockers entry with minimal repro. Do **not** fix substrate from this loop. Substrate work belongs to `sx-improvements.md` / `jit-perf-regression.md`.
|
||||
- **NEVER call `sx_build`.** 600s watchdog will kill you. If `sx_server.exe` is broken, add a Blockers entry and stop.
|
||||
- **SX files:** `sx-tree` MCP tools ONLY. `sx_validate` after every edit. Never `Edit`/`Read`/`Write` on `.sx`.
|
||||
- **Worktree:** commit, then push to `origin/loops/kernel`. Never touch `main`. Never push to `architecture`.
|
||||
- **Commit granularity:** one feature per commit. Short factual messages: `kernel: $vau operative + 6 tests`.
|
||||
- **Plan file:** update Progress log + tick boxes every commit. Include the chisel note.
|
||||
- **If blocked** for two iterations on the same issue, add to Blockers and move on.
|
||||
|
||||
## Kernel-specific gotchas
|
||||
|
||||
- **Operatives don't evaluate their arguments.** `$vau` builds an operative; the body sees the *unevaluated* argument expressions plus the dynamic environment. This is the opposite of every other guest in the set. `(define-via-vau)` builds a binding by calling `eval` inside the body on the (still-syntax) argument.
|
||||
- **Applicatives wrap operatives.** `(wrap op)` produces an applicative that evaluates its args first, then calls `op` with the values. `$lambda` is sugar for `wrap` ∘ `$vau`.
|
||||
- **Dynamic vs static environments.** Operative body sees both: the static env where the `$vau` was created (closure-style), AND the dynamic env where the call happens (passed as the env-param). Different from lexical-only languages.
|
||||
- **No special forms in the evaluator.** `$if`, `$define!`, `$lambda` are all just operatives bound in the standard environment. The evaluator is `lookup-and-call` — no hardcoded switch on symbols. This is the whole point: the language is reified as data.
|
||||
- **`eval` is a primitive callable on user environments.** This is where SX's env-as-value matters most. If env-as-value isn't fully landed in the substrate, this is where it'll break.
|
||||
- **Encapsulations (Phase 5) are Kernel's opaque-types idiom.** `make-encapsulation-type` returns three operatives: encapsulator (constructs), predicate (tests), decapsulator (extracts). Used to define promises, streams, modules.
|
||||
- **Hygienic operatives (Phase 6) are research-grade.** Shutt's later work. Operatives that don't accidentally capture caller bindings. Likely uses scope sets / frame stamps. Treat as exploration, not implementation-deadline.
|
||||
|
||||
## General gotchas (all loops)
|
||||
|
||||
- SX `do` = R7RS iteration. Use `begin` for multi-expr sequences.
|
||||
- `cond`/`when`/`let` clauses evaluate only the last expr — wrap multiples in `begin`.
|
||||
- `env-bind!` creates a binding; `env-set!` mutates an existing one (walks scope chain).
|
||||
- `sx_validate` after every structural edit.
|
||||
- `list?` returns false on raw JS Arrays — host data must be SX-converted.
|
||||
- Shell heredoc `||` gets eaten — escape or use `case`.
|
||||
|
||||
## Style
|
||||
|
||||
- No comments in `.sx` unless non-obvious.
|
||||
- No new planning docs — update `plans/kernel-on-sx.md` inline.
|
||||
- Short, factual commit messages with chisel note: `kernel: $vau operative + 6 tests [shapes-reflective]`.
|
||||
- One feature per iteration. Commit. Log. Push. Next.
|
||||
|
||||
Go. Run the pre-flight check. If lib/guest kits are missing, stop. Otherwise read the plan, find the first unchecked `[ ]`, implement it. Remember: every commit ends with a chisel note, and Phase 7 extraction waits for a second consumer.
|
||||
105
plans/feed-on-sx.md
Normal file
105
plans/feed-on-sx.md
Normal file
@@ -0,0 +1,105 @@
|
||||
# feed-on-sx: Activity Feeds on APL
|
||||
|
||||
Timelines, notifications, activity aggregation. The math is array math: filter, sort,
|
||||
reduce, scan, outer product. APL is the densest possible expression of feed
|
||||
composition — a fanout-and-rank pipeline reads as a single line.
|
||||
|
||||
rose-ash needs: per-user home timeline, notification feed, activity stream digestion,
|
||||
backfill for new follows, deduplication across cross-posts. Every operation is an
|
||||
array-shaped transformation.
|
||||
|
||||
End-state: an APL-flavored layer on `lib/apl/` with feed-specific combinators
|
||||
(`fanout`, `dedupe`, `score`, `rank`), an SX adapter for callers who don't want raw
|
||||
APL, ACL visibility filtering via `lib/acl/`, federation via fed-sx.
|
||||
|
||||
## Status (rolling)
|
||||
|
||||
`bash lib/feed/conformance.sh` → **0/0** (not yet started)
|
||||
|
||||
## Ground rules
|
||||
|
||||
- **Scope:** only touch `lib/feed/**` and `plans/feed-on-sx.md`. Do **not** edit
|
||||
`spec/`, `hosts/`, `shared/`, `lib/apl/**`, or other `lib/<lang>/`. You may
|
||||
**import** from `lib/apl/` (public API in `lib/apl/apl.sx`); do **not** modify APL.
|
||||
- **Shared-file issues** go under "Blockers" with a minimal repro; do not fix here.
|
||||
- **SX files:** use `sx-tree` MCP tools only.
|
||||
- **Architecture:** an activity is a small dict (`{:actor :verb :object :at :tags}`); a
|
||||
stream is an APL vector of such dicts. Operations are APL primitives lifted onto
|
||||
this shape. SX adapter exposes ergonomic API to non-APL callers.
|
||||
- **Unicode:** raw UTF-8 in `.sx` files. APL glyphs land directly.
|
||||
- **Commits:** one feature per commit. Keep Progress log updated and tick boxes.
|
||||
|
||||
## Architecture sketch
|
||||
|
||||
```
|
||||
Raw activities (any shape) Per-user view
|
||||
│ ▲
|
||||
▼ │
|
||||
lib/feed/normalize.sx lib/feed/timeline.sx
|
||||
— {:actor :verb :object — (timeline user)
|
||||
:at :tags} record — applies filter ∘ rank ∘ take
|
||||
│ ▲
|
||||
▼ │
|
||||
lib/feed/stream.sx lib/feed/rank.sx
|
||||
— APL vector of activities — velocity, recency
|
||||
— filter, sort, take — TF-IDF-ish over :tags
|
||||
│ ▲
|
||||
▼ │
|
||||
lib/feed/fanout.sx lib/feed/dedupe.sx
|
||||
— followers vector — group by :object
|
||||
— activities ∘.× followers — collapse cross-posts
|
||||
— flatten + dedupe
|
||||
│
|
||||
▼
|
||||
lib/feed/api.sx lib/feed/fed.sx
|
||||
— (feed/post activity) — inbox via fed-sx
|
||||
— (feed/timeline user) — backfill on subscribe
|
||||
— (feed/notify user)
|
||||
```
|
||||
|
||||
## Phase 1 — Stream model + basic ops
|
||||
|
||||
- [ ] `lib/feed/normalize.sx` — activity record schema; coerce arbitrary inputs
|
||||
- [ ] `lib/feed/stream.sx` — APL vector representation; filter by predicate; sort by
|
||||
`:at`; take N (`↑`); reverse (`⌽`)
|
||||
- [ ] `lib/feed/api.sx` — `(feed/post activity)`, `(feed/all)`
|
||||
- [ ] `lib/feed/tests/basic.sx` — 15+ cases: post, query, filter, sort
|
||||
- [ ] `lib/feed/scoreboard.{json,md}`
|
||||
- [ ] `lib/feed/conformance.sh`
|
||||
|
||||
## Phase 2 — Fanout via outer product
|
||||
|
||||
- [ ] follower graph: `followers user → vector of user ids`
|
||||
- [ ] fanout: activities `∘.×` followers → matrix `(activity, follower)` pairs
|
||||
- [ ] flatten to inbox events vector
|
||||
- [ ] dedupe — group by `(actor, verb, object)` collapse to one inbox event per
|
||||
receiver
|
||||
- [ ] `lib/feed/tests/fanout.sx` — 20+ cases: small graph, mutual follow, popular
|
||||
actor (high-fanout), cross-post dedupe
|
||||
|
||||
## Phase 3 — Aggregation + ranking
|
||||
|
||||
- [ ] group-by — `(actor, day) → count` via key-reduce
|
||||
- [ ] velocity score — recent activity count over window
|
||||
- [ ] recency score — decay by age
|
||||
- [ ] composite rank — weighted sum of components
|
||||
- [ ] top-N per timeline
|
||||
- [ ] `lib/feed/tests/rank.sx` — 20+ cases: ranking stable on tie, decay shape,
|
||||
per-user weighting
|
||||
|
||||
## Phase 4 — Visibility filter + federation
|
||||
|
||||
- [ ] ACL filter — each candidate activity passed through `(acl/permit? viewer :read
|
||||
activity)`
|
||||
- [ ] fed-sx outbound — local `feed/post` fans out to remote followers' inboxes
|
||||
- [ ] fed-sx inbound — peer activities arrive at local inbox
|
||||
- [ ] backfill on subscribe — request peer history, merge into local stream
|
||||
- [ ] `lib/feed/tests/integration.sx` — federated timeline with ACL applied
|
||||
|
||||
## Progress log
|
||||
|
||||
(loop fills this in)
|
||||
|
||||
## Blockers
|
||||
|
||||
(loop fills this in)
|
||||
228
plans/flow-on-sx.md
Normal file
228
plans/flow-on-sx.md
Normal file
@@ -0,0 +1,228 @@
|
||||
# flow-on-sx: Durable DAG Workflows on Scheme
|
||||
|
||||
rose-ash needs workflows that survive restarts: content pipelines (write → review →
|
||||
publish → federate), scheduled jobs (digest emails), multi-step user flows (signup,
|
||||
confirm, onboard). art-dag is the precedent — DAG-of-tasks with pause/resume at IO
|
||||
boundaries.
|
||||
|
||||
Scheme's `call/cc` + delimited continuations make pause/resume natural: a `suspend`
|
||||
captures the continuation, serializes it as part of the flow record, and `resume`
|
||||
re-enters at exactly that point. No state-machine bookkeeping by hand. R7RS-small is
|
||||
already at 2644/2644 (see kernel/architecture status).
|
||||
|
||||
End-state: a Scheme-on-SX layer over the existing scheme runtime, with combinators
|
||||
for sequence/parallel/branch/retry/timeout/suspend, persistent flow store, and a
|
||||
federation extension via fed-sx for remote-node execution.
|
||||
|
||||
## Status (rolling)
|
||||
|
||||
`bash lib/flow/conformance.sh` → **166/166** (Phases 1-8 complete; host ABI + reference driver)
|
||||
|
||||
## Ground rules
|
||||
|
||||
- **Scope:** only touch `lib/flow/**` and `plans/flow-on-sx.md`. Do **not** edit
|
||||
`spec/`, `hosts/`, `shared/`, `lib/scheme/**`, or other `lib/<lang>/`. You may
|
||||
**import** from `lib/scheme/` (public API via `lib/scheme/scheme.sx`); do **not**
|
||||
modify Scheme.
|
||||
- **Shared-file issues** go under "Blockers" with a minimal repro; do not fix here.
|
||||
- **SX files:** use `sx-tree` MCP tools only.
|
||||
- **Architecture:** flow combinators are Scheme macros + procedures. Runtime is a
|
||||
driver loop that walks the flow graph and invokes `call/cc` at `suspend` points.
|
||||
Persistence layer serializes the continuation + open file/socket placeholders are
|
||||
forbidden (continuations must be resumable across process restart).
|
||||
- **art-dag awareness:** read `plans/art-dag*` if it exists for design lineage; do not
|
||||
import code.
|
||||
- **Commits:** one feature per commit. Keep Progress log updated and tick boxes.
|
||||
|
||||
## Architecture sketch
|
||||
|
||||
```
|
||||
(defflow publish
|
||||
(sequence
|
||||
(write-content)
|
||||
(parallel
|
||||
(review)
|
||||
(spell-check))
|
||||
(cond approved?
|
||||
(sequence (publish) (federate))
|
||||
(notify-author))))
|
||||
│
|
||||
▼
|
||||
lib/flow/spec.sx lib/flow/runtime.sx lib/flow/store.sx
|
||||
— defflow — driver loop — append-only flow log
|
||||
— sequence/parallel — node dispatch — checkpoint serialize
|
||||
— cond/retry/timeout — call/cc at suspend — restart loader
|
||||
— suspend/resume │ │
|
||||
▼ ▼
|
||||
lib/flow/api.sx lib/flow/remote.sx
|
||||
— (flow/start name args) — fed-sx adapter
|
||||
— (flow/resume id value) — node-on-peer execution
|
||||
— (flow/cancel id) — failure handling
|
||||
```
|
||||
|
||||
## Phase 1 — Declarative DAG + sequential execution
|
||||
|
||||
- [x] `lib/flow/spec.sx` — `defflow` macro, `sequence` combinator
|
||||
- [x] node = Scheme procedure of one arg (upstream value threaded in); output
|
||||
threads to next node (data flow). A node ignoring its arg is a thunk.
|
||||
- [x] `parallel` combinator (sequential semantics for now — TRUE parallelism in Phase 3)
|
||||
- [x] runtime executes a flow synchronously, returns final value
|
||||
- [x] `lib/flow/api.sx` — `(flow/start flow input)` entry point
|
||||
- [x] `lib/flow/tests/basic.sx` — 18 cases: single nodes, linear/nested sequence,
|
||||
data flow between nodes, parallel-with-join, publish-shaped flow
|
||||
- [x] `lib/flow/scoreboard.{json,md}`
|
||||
- [x] `lib/flow/conformance.sh`
|
||||
|
||||
## Phase 2 — Control flow + error handling
|
||||
|
||||
- [x] `cond` combinator — predicate selects branch (named `branch`; `cond` is a
|
||||
Scheme special form). `(branch pred then else)` — 6 tests.
|
||||
- [x] `retry n` — re-runs node up to n attempts on a raised exception; last
|
||||
exception propagates. Only raised exceptions are retried — `(fail ...)` values
|
||||
pass through. 6 tests. (Backoff deferred: no wall clock in pure SX.)
|
||||
- [x] `timeout budget` — bounds node execution via a **cooperative step budget**
|
||||
(deterministic; no scheduler/clock in pure SX). Nodes opt in via `(tick)`;
|
||||
`budget` ticks allowed, the next raises `flow-timeout`. Non-ticking nodes are
|
||||
unbounded; budgets nest. 7 tests.
|
||||
- [x] `try-catch` — exception handler with reified error: `(try-catch node handler)`
|
||||
runs node; on raise, calls `(handler error)` and returns its value. 6 tests.
|
||||
- [x] error model — exceptions vs explicit `(fail reason)` results: `fail`/`failed?`/
|
||||
`fail-reason` produce/inspect failure values that flow downstream as data
|
||||
(distinct from raised exceptions caught by retry/try-catch). 6 tests.
|
||||
- [x] `lib/flow/tests/control.sx` — 31 cases: branch, error model, try-catch,
|
||||
retry, timeout + compositions
|
||||
|
||||
## Phase 3 — Suspend / resume (the showcase)
|
||||
|
||||
- [x] `(suspend tag)` — guest call/cc is ESCAPE-ONLY (re-entry hangs), so resume
|
||||
uses **deterministic replay**: suspend escapes to the driver as `(flow-suspended
|
||||
tag)`; resume re-runs the flow, replaying resolved suspends from a `(tag value)`
|
||||
log. No live continuation is ever serialized — the log is plain data.
|
||||
- [x] `lib/flow/store.sx` — flow store: id→record `(flow input log status payload)`;
|
||||
`flow-drive` runs a flow against a replay log.
|
||||
- [x] `(flow/resume id value)` — append `(tag value)` to the log, re-drive; raw
|
||||
result on completion, `(flow-suspended id tag)` on a further suspend.
|
||||
- [x] `(flow/cancel id)` — mark cancelled; a later resume is rejected (stale replay
|
||||
cannot wake a cancelled flow).
|
||||
- [x] crash recovery — `flow-store-export` (procs nulled → plain data),
|
||||
`flow-store-import!`, `flow-resumable-ids`. Records are name-keyed; resume
|
||||
re-resolves the proc by name (defflow registers names), so a flow survives a
|
||||
wiped store. `tests/recovery.sx`, 8 cases (export/wipe/import, resumable scan,
|
||||
restart-at-every-step, replay-log survival).
|
||||
- [x] `lib/flow/tests/suspend.sx` — 17 cases: start/resume/cancel, multi-step,
|
||||
replay determinism, lifecycle guards, suspend-in-branch
|
||||
- Harness: `flow-run` now reuses one env with a per-test reset (building the full
|
||||
standard env 66× was too slow) — see `api.sx`.
|
||||
|
||||
## Phase 4 — Distributed nodes via fed-sx
|
||||
|
||||
- [x] `(remote-node addr fn)` — execute a node on a federation peer. Transport is
|
||||
the fed-sx boundary, MOCKED via a peer registry (`flow-peer-register!`); raises
|
||||
`flow-remote-unreachable` / `flow-remote-no-fn`. Composes with sequence, suspend,
|
||||
retry. `tests/distributed.sx`, 7 cases.
|
||||
- [x] failure semantics — `(remote-failover addrs fn local)` tries each peer in
|
||||
order, moves to the next on any raised error, and runs the `local` node if every
|
||||
peer fails. 6 tests.
|
||||
- [x] persistence across instances — `(flow-replicate-to addr)` copies this
|
||||
instance's store (the plain-data export) to a peer's replica slot;
|
||||
`(flow-restore-from addr)` imports it. Same mechanism as crash recovery, across
|
||||
instances.
|
||||
- [x] handoff — a flow started here resumes on a peer after the local instance dies:
|
||||
replicate → wipe local store → restore on peer → `flow/resume`. The replay log
|
||||
(and thus all resolved suspends) survives the move.
|
||||
- [x] `lib/flow/tests/distributed.sx` — 19 cases: remote-node, failover,
|
||||
replication, handoff (including replay-log survival across the move)
|
||||
|
||||
## Phase 5 — Operational API + combinator library
|
||||
|
||||
The four roadmap phases are complete; this phase rounds out the engine into
|
||||
something operators and authors actually use. Accumulation, not a rewrite.
|
||||
|
||||
- [x] introspection API — `flow/status id`, `flow/result id`, `flow/list`,
|
||||
`flow/pending` (operator view of what each suspended flow awaits). 12 tests in
|
||||
`tests/api.sx`.
|
||||
- [x] store hygiene — `flow/gc` drops terminal (done/cancelled) records keeping
|
||||
live suspended flows (returns count); `flow/forget id` drops one terminal record
|
||||
and refuses live flows. Bounds unbounded store growth. 9 tests in `tests/hygiene.sx`.
|
||||
- [x] `tap` — side-effecting pass-through node (logging/metrics) that returns input
|
||||
- [x] `recover` — complement to try-catch for the fail-VALUE channel: run node; if it
|
||||
yields `(fail ...)`, run a recovery node on the reason
|
||||
- [x] `map-flow` — run a flow per item of a list, join results (sequential)
|
||||
- [x] `flow-while` / `flow-until` — bounded iteration: re-run body threading the
|
||||
value while/until pred holds, capped at `max` steps (deterministic bound)
|
||||
- [x] `lib/flow/tests/api.sx` (12) + `lib/flow/tests/combinators.sx` (17)
|
||||
|
||||
## Phase 6 — Railway-oriented composition
|
||||
|
||||
Make the `(fail reason)` value channel compose into real validation/ETL pipelines.
|
||||
|
||||
- [x] `attempt` — like `sequence`, but short-circuits at the first node that returns
|
||||
a `(fail ...)` value, returning that failure (the railway track). Pairs with
|
||||
`recover` for the rejoin.
|
||||
- [x] `lib/flow/tests/railway.sx` — 10 cases: fail short-circuiting, no-run-after-
|
||||
failure, recover rejoin, validation pipeline reporting the failing stage
|
||||
|
||||
## Phase 8 — Host integration ABI (art-dag / human-in-the-loop)
|
||||
|
||||
`suspend` is the seam to the outside world, but a bare tag is an ad-hoc convention.
|
||||
This phase defines a stable request/response contract a host (an art-dag driver, a
|
||||
review UI) codes against — so flow can orchestrate art-dag with human decision
|
||||
points later without reverse-engineering tag shapes. `lib/flow/host.sx`.
|
||||
|
||||
- [x] `(request kind payload)` — suspend with a typed `(flow-request kind payload)`
|
||||
envelope; evaluates to the host's resume value. `await-human`/`await-render`/
|
||||
`await-effect` sugar.
|
||||
- [x] `(flow-host-requests)` — the host work queue: `(id kind payload)` for every
|
||||
suspended flow waiting on a host request; `request?`/`request-kind`/
|
||||
`request-payload` parse a tag.
|
||||
- [x] `(flow-drive-host dispatch)` / `(flow-run-host dispatch maxticks)` — reference
|
||||
host driver: the host supplies only a `(kind payload) -> answer` dispatch fn; the
|
||||
loop drains pending requests and resumes until quiescent (bounded).
|
||||
- [x] `lib/flow/tests/host.sx` — 15 cases incl. the art-dag-shaped driver loop
|
||||
(render → human-review → publish) run both manually and via `flow-run-host`.
|
||||
- Contract (documented in `host.sx` + README): the host owns IO + persistence; a
|
||||
flow never does IO, it only `request`s; the host performs the effect and feeds the
|
||||
result back via resume (logged, so not re-run on recovery). NOT done here (host
|
||||
side, out of `lib/flow` scope): the real Celery/IPFS bridge and a persistent store
|
||||
backend — those live in the art-dag integration, coding against this ABI.
|
||||
|
||||
## Phase 7 — End-to-end integration
|
||||
|
||||
Prove the phases compose: realistic flows exercising attempt + suspend + branch +
|
||||
remote-node + crash-recovery + handoff + introspection together.
|
||||
|
||||
- [x] `lib/flow/tests/integration.sx` — 10 cases: an order-processing flow (validate
|
||||
→ payment suspend → branch → ledger federation) and an onboarding flow, run through
|
||||
the full lifecycle including a simulated crash and a peer handoff mid-flow, plus
|
||||
introspection (`flow/pending`/`status`/`result`) during the flow's life
|
||||
|
||||
## Progress log
|
||||
|
||||
- **Phase 1 (combinators + sequential runtime).** Flow built as a Scheme prelude
|
||||
loaded onto `scheme-standard-env`: a flow is a Scheme procedure `input -> output`,
|
||||
so the whole flow runs inside the interpreter (sets up Phase 3 call/cc suspend).
|
||||
Combinators `flow-node`/`flow-id`/`flow-const`/`sequence`/`parallel`/`defflow` in
|
||||
`spec.sx`; `flow/start` + SX helpers (`flow-make-env`/`flow-run`) in `api.sx`.
|
||||
18/18 in `tests/basic.sx`. Substrate constraints found: dotted rest params
|
||||
`(a . rest)` and named `let` are unsupported in `lib/scheme/eval.sx`, so
|
||||
combinators use `(lambda args ...)` variadics + top-level recursion. Scheme
|
||||
strings come back boxed as `{:scm-string "..."}` — unwrap with `(get s :scm-string)`.
|
||||
|
||||
- **Phases 2-4.** Control flow (branch/retry/timeout/try-catch + fail-value error
|
||||
model), then the showcase: durable suspend/resume. Guest call/cc is escape-only
|
||||
(re-entry hangs), so resume uses **deterministic replay** — re-run the flow,
|
||||
replaying resolved suspends from a `(tag value)` log; only plain data persists, so
|
||||
flows survive a wiped store (crash recovery) and a move to another instance
|
||||
(replication + handoff). Phase 4 models the fed-sx boundary with a mock peer
|
||||
registry. Timeout is a cooperative step budget (no wall clock in pure SX). Test
|
||||
harness reuses one env with a per-test reset for speed.
|
||||
|
||||
- **Phases 5-7 + docs.** Operational API (introspection, hygiene), combinator
|
||||
library (tap/recover/map-flow/while/until), railway `attempt`, end-to-end
|
||||
integration suite, and `lib/flow/README.md` (full API reference + replay-semantics
|
||||
contract). **151/151 across 10 suites.** Conformance sx_server timeout raised to
|
||||
540s for the 10-suite run under shared-machine CPU contention.
|
||||
|
||||
## Blockers
|
||||
|
||||
(none)
|
||||
112
plans/mod-on-sx.md
Normal file
112
plans/mod-on-sx.md
Normal file
@@ -0,0 +1,112 @@
|
||||
# mod-on-sx: Moderation on Prolog
|
||||
|
||||
rose-ash needs moderation infrastructure: reports flagged by users, automated
|
||||
classifications (spam, abuse), tiered escalation (auto → human → appeal), audit
|
||||
trails. Each decision is the conclusion of a backtracking search over evidence and
|
||||
policy rules — exactly what Prolog does.
|
||||
|
||||
Where acl-sx says "may this happen?", mod-sx says "should this stay?" The former is
|
||||
a positive decision (proof of grant); the latter often a negative one (proof of
|
||||
violation), and policy chains naturally backtrack: if the first rule doesn't apply,
|
||||
try the next.
|
||||
|
||||
End-state: a Prolog-on-SX layer for moderation policy declaration and evaluation,
|
||||
with persistent report lifecycle, audit log, escalation state machine, and
|
||||
federation extension.
|
||||
|
||||
## Status (rolling)
|
||||
|
||||
`bash lib/mod/conformance.sh` → **0/0** (not yet started)
|
||||
|
||||
## Ground rules
|
||||
|
||||
- **Scope:** only touch `lib/mod/**` and `plans/mod-on-sx.md`. Do **not** edit
|
||||
`spec/`, `hosts/`, `shared/`, `lib/prolog/**`, or other `lib/<lang>/`. You may
|
||||
**import** from `lib/prolog/` (public API in `lib/prolog/prolog.sx`); do **not**
|
||||
modify Prolog.
|
||||
- **Shared-file issues** go under "Blockers" with a minimal repro; do not fix here.
|
||||
- **SX files:** use `sx-tree` MCP tools only.
|
||||
- **Architecture:** policies are Prolog rules over `report(...)` and `evidence(...)`
|
||||
facts. Decisions are query results. Proof trees become audit records. The state
|
||||
machine for report lifecycle is separate (an SX module on top).
|
||||
- **Shared with acl-sx:** rule-engine plumbing may be liftable into `lib/guest/`.
|
||||
Watch for it; flag in Progress log but do not extract until both subsystems are
|
||||
past Phase 2.
|
||||
- **Commits:** one feature per commit. Keep Progress log updated and tick boxes.
|
||||
|
||||
## Architecture sketch
|
||||
|
||||
```
|
||||
Report Decision
|
||||
{:by :about :reason :at} {:action :proof :next-state}
|
||||
│ ▲
|
||||
▼ │
|
||||
lib/mod/schema.sx lib/mod/engine.sx
|
||||
— report/4, evidence/2, — query Prolog with report fact
|
||||
classification/3 predicates — extract proof tree
|
||||
│ ▲
|
||||
▼ │
|
||||
lib/mod/policy.sx lib/mod/lifecycle.sx
|
||||
— rule syntax → Prolog — state machine
|
||||
— action heads: — open → triaged → decided
|
||||
{:keep :hide :remove — appeal handling
|
||||
:escalate :ban} │
|
||||
│ ▼
|
||||
▼ lib/mod/audit.sx
|
||||
lib/mod/api.sx — append-only decision log
|
||||
— (mod/report ...) — proof tree persistence
|
||||
— (mod/decide report) — query API
|
||||
— (mod/appeal id)
|
||||
│
|
||||
▼
|
||||
lib/mod/fed.sx
|
||||
— cross-instance reports via fed-sx
|
||||
— decision sharing / trust model
|
||||
```
|
||||
|
||||
## Phase 1 — Report representation + simple policy
|
||||
|
||||
- [ ] `lib/mod/schema.sx` — `report(id, by, about, reason)`, `evidence(id, kind, val)`,
|
||||
`policy-action(report, action)` predicates as Prolog facts/rules
|
||||
- [ ] `lib/mod/policy.sx` — rule declarations: `(defrule action :when conditions)`
|
||||
desugars to Prolog clause
|
||||
- [ ] `lib/mod/engine.sx` — `(decide report-id)` runs Prolog query, returns first
|
||||
matching action
|
||||
- [ ] `lib/mod/api.sx` — `(mod/report by about reason)`, `(mod/decide id)`
|
||||
- [ ] `lib/mod/tests/decide.sx` — 15+ cases: spam keyword → hide, repeated reports →
|
||||
escalate, no rule matches → keep
|
||||
- [ ] `lib/mod/scoreboard.{json,md}`
|
||||
- [ ] `lib/mod/conformance.sh`
|
||||
|
||||
## Phase 2 — Evidence + audit trail
|
||||
|
||||
- [ ] evidence accumulation — additional facts asserted before query
|
||||
- [ ] proof tree from Prolog derivation tree
|
||||
- [ ] `lib/mod/audit.sx` — append-only log (decision + proof + evidence snapshot)
|
||||
- [ ] `(mod/audit id)` retrieval
|
||||
- [ ] `lib/mod/tests/audit.sx` — proof correctness, trail completeness
|
||||
|
||||
## Phase 3 — Escalation + lifecycle state machine
|
||||
|
||||
- [ ] state machine: `:open → :triaged → :decided → :appealed → :final`
|
||||
- [ ] auto-tier: first-pass rules decide quick cases
|
||||
- [ ] human-tier: rules that emit `:escalate` move to next state
|
||||
- [ ] appeal: re-runs with appeal evidence, may override prior decision
|
||||
- [ ] `(mod/appeal id new-evidence)` API
|
||||
- [ ] `lib/mod/tests/escalation.sx` — full lifecycle traversal cases
|
||||
|
||||
## Phase 4 — Federation
|
||||
|
||||
- [ ] cross-instance reports — peer raises report about local content (or vice versa)
|
||||
- [ ] decision sharing — actions taken locally propagate to peers via fed-sx
|
||||
- [ ] trust model — peer's decision is advisory unless `(trust peer :mod)` is granted
|
||||
- [ ] revocation — undo applied moderation if proof was invalidated
|
||||
- [ ] `lib/mod/tests/fed.sx` — federated decision chains (mock fed-sx in tests)
|
||||
|
||||
## Progress log
|
||||
|
||||
(loop fills this in)
|
||||
|
||||
## Blockers
|
||||
|
||||
(loop fills this in)
|
||||
106
plans/search-on-sx.md
Normal file
106
plans/search-on-sx.md
Normal file
@@ -0,0 +1,106 @@
|
||||
# search-on-sx: Full-text + structured search on Haskell
|
||||
|
||||
rose-ash needs search across pages, posts, threads, federated content. Tokenize,
|
||||
index, query, rank, filter by visibility. Typed ADTs make query parsing clean,
|
||||
lazy lists make posting-list iteration efficient, and Haskell-on-SX is at 1514/1514.
|
||||
|
||||
End-state: a Haskell-on-SX layer with inverted index, query AST, boolean +
|
||||
phrase + ranked queries (TF-IDF, BM25), ACL-aware post-filter, and a federation
|
||||
extension that merges per-peer indices.
|
||||
|
||||
## Status (rolling)
|
||||
|
||||
`bash lib/search/conformance.sh` → **0/0** (not yet started)
|
||||
|
||||
## Ground rules
|
||||
|
||||
- **Scope:** only touch `lib/search/**` and `plans/search-on-sx.md`. Do **not** edit
|
||||
`spec/`, `hosts/`, `shared/`, `lib/haskell/**`, or other `lib/<lang>/`. You may
|
||||
**import** from `lib/haskell/` (public API in `lib/haskell/haskell.sx`); do **not**
|
||||
modify Haskell.
|
||||
- **Shared-file issues** go under "Blockers" with a minimal repro; do not fix here.
|
||||
- **SX files:** use `sx-tree` MCP tools only.
|
||||
- **Architecture:** index = `Map Term [(DocId, [Pos])]`. Query AST = ADT. Eval =
|
||||
fold of posting lists with set ops + ranking math. Ranking is pure (no IO until
|
||||
result emission).
|
||||
- **Commits:** one feature per commit. Keep Progress log updated and tick boxes.
|
||||
|
||||
## Architecture sketch
|
||||
|
||||
```
|
||||
Document Query
|
||||
{:id :text :tags} "alice AND bob OR phrase \"x y\""
|
||||
│ │
|
||||
▼ ▼
|
||||
lib/search/tokenize.sx lib/search/parse.sx
|
||||
— tokenize :: Text → [Term] — parse :: Text → Query
|
||||
— normalize (lowercase, strip) — Query = Term | And | Or
|
||||
— (optionally) stem | Not | Phrase
|
||||
│ │
|
||||
▼ ▼
|
||||
lib/search/index.sx lib/search/eval.sx
|
||||
— Map Term [(DocId, [Pos])] — eval :: Index → Query → [DocId]
|
||||
— insert / delete / lookup — boolean + phrase positions
|
||||
— persistence (optional later) │
|
||||
│ ▼
|
||||
└────────────────► lib/search/rank.sx
|
||||
— TF-IDF / BM25 scoring
|
||||
— top-N
|
||||
│
|
||||
▼
|
||||
lib/search/api.sx
|
||||
— (search/index doc)
|
||||
— (search/query q)
|
||||
— (search/top n q)
|
||||
│
|
||||
▼
|
||||
lib/search/fed.sx
|
||||
— federated query (merge peer results)
|
||||
— ACL filter post-merge
|
||||
```
|
||||
|
||||
## Phase 1 — Tokenize + index
|
||||
|
||||
- [ ] `lib/search/tokenize.sx` — normalize (lowercase, strip punctuation), split on
|
||||
whitespace, return positions
|
||||
- [ ] `lib/search/index.sx` — inverted index data structure (typed `Map` from
|
||||
haskell lib); `insert`, `delete`, `lookup`
|
||||
- [ ] `lib/search/api.sx` — `(search/index doc)`, `(search/lookup term)`
|
||||
- [ ] `lib/search/tests/index.sx` — 15+ cases: tokenize, insert + lookup, update,
|
||||
delete, multi-doc
|
||||
- [ ] `lib/search/scoreboard.{json,md}`
|
||||
- [ ] `lib/search/conformance.sh`
|
||||
|
||||
## Phase 2 — Query AST + boolean evaluation
|
||||
|
||||
- [ ] Query ADT: `Term Text | And Query Query | Or Query Query | Not Query |
|
||||
Phrase [Text]`
|
||||
- [ ] `lib/search/parse.sx` — query syntax parser (boolean operators, quoted phrases)
|
||||
- [ ] `lib/search/eval.sx` — boolean eval via set ops on posting lists
|
||||
- [ ] phrase eval — adjacency check using positions
|
||||
- [ ] `lib/search/tests/boolean.sx` — 25+ cases: term, and, or, not, phrase,
|
||||
composition, parser edge cases
|
||||
|
||||
## Phase 3 — Ranking
|
||||
|
||||
- [ ] document frequency tracking — extend index with `df` per term
|
||||
- [ ] TF-IDF scoring
|
||||
- [ ] BM25 scoring (configurable k1, b)
|
||||
- [ ] top-N retrieval (heap-based)
|
||||
- [ ] `lib/search/tests/rank.sx` — 20+ cases: TF-IDF behavior, BM25 vs TF-IDF,
|
||||
ranking stability, top-N correctness
|
||||
|
||||
## Phase 4 — ACL filter + federation
|
||||
|
||||
- [ ] post-filter — each candidate result tested via `(acl/permit? viewer :read doc)`
|
||||
- [ ] federated query — fan out to peer instances via fed-sx, merge results
|
||||
- [ ] merge policy — interleave by rank, dedupe by `(peer, doc-id)`
|
||||
- [ ] `lib/search/tests/integration.sx` — federated search with ACL filter
|
||||
|
||||
## Progress log
|
||||
|
||||
(loop fills this in)
|
||||
|
||||
## Blockers
|
||||
|
||||
(loop fills this in)
|
||||
Reference in New Issue
Block a user