diff --git a/lib/flow/conformance.sh b/lib/flow/conformance.sh index f717029b..b3f5f143 100755 --- a/lib/flow/conformance.sh +++ b/lib/flow/conformance.sh @@ -26,6 +26,7 @@ SUITES=( "suspend flow-sus-tests-run! lib/flow/tests/suspend.sx" "recovery flow-rec-tests-run! lib/flow/tests/recovery.sx" "distributed flow-dist-tests-run! lib/flow/tests/distributed.sx" + "api flow-api-tests-run! lib/flow/tests/api.sx" ) TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT diff --git a/lib/flow/scoreboard.json b/lib/flow/scoreboard.json index a31a9169..b8ccffb4 100644 --- a/lib/flow/scoreboard.json +++ b/lib/flow/scoreboard.json @@ -1,13 +1,14 @@ { - "total": 93, - "passed": 93, + "total": 105, + "passed": 105, "failed": 0, "suites": { "basic": { "passed": 18, "total": 18 }, "control": { "passed": 31, "total": 31 }, "suspend": { "passed": 17, "total": 17 }, "recovery": { "passed": 8, "total": 8 }, - "distributed": { "passed": 19, "total": 19 } + "distributed": { "passed": 19, "total": 19 }, + "api": { "passed": 12, "total": 12 } }, - "phases": { "phase1": "done", "phase2": "done", "phase3": "done", "phase4": "done" } + "phases": { "phase1": "done", "phase2": "done", "phase3": "done", "phase4": "done", "phase5": "in-progress" } } diff --git a/lib/flow/scoreboard.md b/lib/flow/scoreboard.md index 1d2e832d..c1f0d720 100644 --- a/lib/flow/scoreboard.md +++ b/lib/flow/scoreboard.md @@ -1,6 +1,6 @@ # flow-on-sx Scoreboard -**All tests pass: 93 / 93 across 5 suites. All four phases complete.** +**All tests pass: 105 / 105 across 6 suites. Phases 1-4 complete; Phase 5 in progress.** `bash lib/flow/conformance.sh` @@ -13,6 +13,7 @@ | suspend | 17 | Phase 3: suspend/resume/cancel via deterministic replay; multi-step, replay determinism, lifecycle guards, suspend-in-branch | | recovery | 8 | Phase 3: crash recovery — store export/import, resumable scan, restart-at-every-step, replay-log survival | | distributed | 19 | Phase 4: `remote-node` (7); `remote-failover` (6); replication + handoff across instances (6) | +| api | 12 | Phase 5: introspection — `flow/status`, `flow/result`, `flow/list`, `flow/pending` | ## Architecture @@ -42,5 +43,6 @@ capture the flow continuation directly. - [x] Phase 2 — Control flow + error handling (branch, error model, try-catch, retry, timeout) - [x] Phase 3 — Suspend/resume (suspend/resume/cancel + crash recovery via deterministic replay) - [x] Phase 4 — Distributed nodes via fed-sx (remote-node, failover, replication + handoff) +- [~] Phase 5 — Operational API + combinators (introspection done; tap/recover/map-flow next) - [ ] Phase 3 — Suspend / resume (the showcase) - [ ] Phase 4 — Distributed nodes via fed-sx diff --git a/lib/flow/store.sx b/lib/flow/store.sx index 37284524..b88e6c5b 100644 --- a/lib/flow/store.sx +++ b/lib/flow/store.sx @@ -23,10 +23,15 @@ ;; (flow-store-export) — store as plain data (procs nulled) ;; (flow-store-import! d) — replace the store from exported data ;; (flow-resumable-ids) — ids of suspended (resumable) flows +;; Introspection (Phase 5): +;; (flow/status id) — done | suspended | cancelled | unknown +;; (flow/result id) — result if done, else (flow-error reason) +;; (flow/list) — list of (id status) for every flow +;; (flow/pending) — list of (id waiting-tag) for suspended flows (define flow-store-src - "(define flow-registry (list))\n (define (flow-register! name proc) (set! flow-registry (cons (list name proc) flow-registry)))\n (define (flow-lookup-in name reg)\n (if (null? reg)\n #f\n (if (eq? (car (car reg)) name) (car (cdr (car reg))) (flow-lookup-in name (cdr reg)))))\n (define (flow-lookup name) (flow-lookup-in name flow-registry))\n (define (flow-name-of proc reg)\n (if (null? reg)\n #f\n (if (eq? (car (cdr (car reg))) proc) (car (car reg)) (flow-name-of proc (cdr reg)))))\n\n (define flow-store (list))\n (define flow-next-id 0)\n (define (flow-store-remove id store)\n (if (null? store)\n (list)\n (if (= (car (car store)) id)\n (flow-store-remove id (cdr store))\n (cons (car store) (flow-store-remove id (cdr store))))))\n (define (flow-store-put! id rec) (set! flow-store (cons (list id rec) (flow-store-remove id flow-store))))\n (define (flow-store-find id store)\n (if (null? store)\n (list)\n (if (= (car (car store)) id)\n (car (cdr (car store)))\n (flow-store-find id (cdr store)))))\n (define (flow-store-get id) (flow-store-find id flow-store))\n\n (define (flow-mk-rec name proc input log status payload)\n (list name proc input log status payload))\n (define (flow-rec-name r) (car r))\n (define (flow-rec-proc r) (car (cdr r)))\n (define (flow-rec-input r) (car (cdr (cdr r))))\n (define (flow-rec-log r) (car (cdr (cdr (cdr r)))))\n (define (flow-rec-status r) (car (cdr (cdr (cdr (cdr r))))))\n (define (flow-rec-payload r) (car (cdr (cdr (cdr (cdr (cdr r)))))))\n (define (flow-rec-resolve rec)\n (let ((byname (flow-lookup (flow-rec-name rec))))\n (if byname byname (flow-rec-proc rec))))\n\n (define (flow-outcome id name proc input log outcome)\n (if (eq? (car outcome) (quote flow-done))\n (begin\n (flow-store-put! id (flow-mk-rec name proc input log (quote done) (car (cdr outcome))))\n (car (cdr outcome)))\n (begin\n (flow-store-put! id (flow-mk-rec name proc input log (quote suspended) (car (cdr outcome))))\n (list (quote flow-suspended) id (car (cdr outcome))))))\n (define (flow/start flow input)\n (set! flow-next-id (+ flow-next-id 1))\n (flow-outcome flow-next-id (flow-name-of flow flow-registry) flow input (list)\n (flow-drive flow input (list))))\n (define (flow/resume id value)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (if (eq? (flow-rec-status rec) (quote suspended))\n (let ((proc (flow-rec-resolve rec)))\n (let ((newlog (cons (list (flow-rec-payload rec) value) (flow-rec-log rec))))\n (flow-outcome id (flow-rec-name rec) proc (flow-rec-input rec) newlog\n (flow-drive proc (flow-rec-input rec) newlog))))\n (list (quote flow-error) (quote not-suspended))))))\n (define (flow/cancel id)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (begin\n (flow-store-put! id\n (flow-mk-rec (flow-rec-name rec) (flow-rec-proc rec) (flow-rec-input rec)\n (flow-rec-log rec) (quote cancelled) #f))\n (list (quote flow-cancelled) id)))))\n\n (define (flow-export-entry entry)\n (let ((rec (car (cdr entry))))\n (list (car entry)\n (flow-mk-rec (flow-rec-name rec) #f (flow-rec-input rec)\n (flow-rec-log rec) (flow-rec-status rec) (flow-rec-payload rec)))))\n (define (flow-export-map store)\n (if (null? store) (list) (cons (flow-export-entry (car store)) (flow-export-map (cdr store)))))\n (define (flow-store-export) (flow-export-map flow-store))\n (define (flow-max-id store m)\n (if (null? store) m (flow-max-id (cdr store) (if (> (car (car store)) m) (car (car store)) m))))\n (define (flow-store-import! data)\n (begin (set! flow-store data) (set! flow-next-id (flow-max-id data 0))))\n (define (flow-collect-resumable store)\n (if (null? store)\n (list)\n (if (eq? (flow-rec-status (car (cdr (car store)))) (quote suspended))\n (cons (car (car store)) (flow-collect-resumable (cdr store)))\n (flow-collect-resumable (cdr store)))))\n (define (flow-resumable-ids) (flow-collect-resumable flow-store))") + "(define flow-registry (list))\n (define (flow-register! name proc) (set! flow-registry (cons (list name proc) flow-registry)))\n (define (flow-lookup-in name reg)\n (if (null? reg)\n #f\n (if (eq? (car (car reg)) name) (car (cdr (car reg))) (flow-lookup-in name (cdr reg)))))\n (define (flow-lookup name) (flow-lookup-in name flow-registry))\n (define (flow-name-of proc reg)\n (if (null? reg)\n #f\n (if (eq? (car (cdr (car reg))) proc) (car (car reg)) (flow-name-of proc (cdr reg)))))\n\n (define flow-store (list))\n (define flow-next-id 0)\n (define (flow-store-remove id store)\n (if (null? store)\n (list)\n (if (= (car (car store)) id)\n (flow-store-remove id (cdr store))\n (cons (car store) (flow-store-remove id (cdr store))))))\n (define (flow-store-put! id rec) (set! flow-store (cons (list id rec) (flow-store-remove id flow-store))))\n (define (flow-store-find id store)\n (if (null? store)\n (list)\n (if (= (car (car store)) id)\n (car (cdr (car store)))\n (flow-store-find id (cdr store)))))\n (define (flow-store-get id) (flow-store-find id flow-store))\n\n (define (flow-mk-rec name proc input log status payload)\n (list name proc input log status payload))\n (define (flow-rec-name r) (car r))\n (define (flow-rec-proc r) (car (cdr r)))\n (define (flow-rec-input r) (car (cdr (cdr r))))\n (define (flow-rec-log r) (car (cdr (cdr (cdr r)))))\n (define (flow-rec-status r) (car (cdr (cdr (cdr (cdr r))))))\n (define (flow-rec-payload r) (car (cdr (cdr (cdr (cdr (cdr r)))))))\n (define (flow-rec-resolve rec)\n (let ((byname (flow-lookup (flow-rec-name rec))))\n (if byname byname (flow-rec-proc rec))))\n\n (define (flow-outcome id name proc input log outcome)\n (if (eq? (car outcome) (quote flow-done))\n (begin\n (flow-store-put! id (flow-mk-rec name proc input log (quote done) (car (cdr outcome))))\n (car (cdr outcome)))\n (begin\n (flow-store-put! id (flow-mk-rec name proc input log (quote suspended) (car (cdr outcome))))\n (list (quote flow-suspended) id (car (cdr outcome))))))\n (define (flow/start flow input)\n (set! flow-next-id (+ flow-next-id 1))\n (flow-outcome flow-next-id (flow-name-of flow flow-registry) flow input (list)\n (flow-drive flow input (list))))\n (define (flow/resume id value)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (if (eq? (flow-rec-status rec) (quote suspended))\n (let ((proc (flow-rec-resolve rec)))\n (let ((newlog (cons (list (flow-rec-payload rec) value) (flow-rec-log rec))))\n (flow-outcome id (flow-rec-name rec) proc (flow-rec-input rec) newlog\n (flow-drive proc (flow-rec-input rec) newlog))))\n (list (quote flow-error) (quote not-suspended))))))\n (define (flow/cancel id)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (begin\n (flow-store-put! id\n (flow-mk-rec (flow-rec-name rec) (flow-rec-proc rec) (flow-rec-input rec)\n (flow-rec-log rec) (quote cancelled) #f))\n (list (quote flow-cancelled) id)))))\n\n (define (flow-export-entry entry)\n (let ((rec (car (cdr entry))))\n (list (car entry)\n (flow-mk-rec (flow-rec-name rec) #f (flow-rec-input rec)\n (flow-rec-log rec) (flow-rec-status rec) (flow-rec-payload rec)))))\n (define (flow-export-map store)\n (if (null? store) (list) (cons (flow-export-entry (car store)) (flow-export-map (cdr store)))))\n (define (flow-store-export) (flow-export-map flow-store))\n (define (flow-max-id store m)\n (if (null? store) m (flow-max-id (cdr store) (if (> (car (car store)) m) (car (car store)) m))))\n (define (flow-store-import! data)\n (begin (set! flow-store data) (set! flow-next-id (flow-max-id data 0))))\n (define (flow-collect-resumable store)\n (if (null? store)\n (list)\n (if (eq? (flow-rec-status (car (cdr (car store)))) (quote suspended))\n (cons (car (car store)) (flow-collect-resumable (cdr store)))\n (flow-collect-resumable (cdr store)))))\n (define (flow-resumable-ids) (flow-collect-resumable flow-store))\n\n (define (flow/status id)\n (let ((rec (flow-store-get id)))\n (if (null? rec) (quote unknown) (flow-rec-status rec))))\n (define (flow/result id)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (if (eq? (flow-rec-status rec) (quote done))\n (flow-rec-payload rec)\n (list (quote flow-error) (quote not-done))))))\n (define (flow-list-step store)\n (if (null? store)\n (list)\n (cons (list (car (car store)) (flow-rec-status (car (cdr (car store)))))\n (flow-list-step (cdr store)))))\n (define (flow/list) (flow-list-step flow-store))\n (define (flow-pending-step store)\n (if (null? store)\n (list)\n (if (eq? (flow-rec-status (car (cdr (car store)))) (quote suspended))\n (cons (list (car (car store)) (flow-rec-payload (car (cdr (car store)))))\n (flow-pending-step (cdr store)))\n (flow-pending-step (cdr store)))))\n (define (flow/pending) (flow-pending-step flow-store))") (define flow-load-store! diff --git a/lib/flow/tests/api.sx b/lib/flow/tests/api.sx new file mode 100644 index 00000000..6211b0f0 --- /dev/null +++ b/lib/flow/tests/api.sx @@ -0,0 +1,79 @@ +;; lib/flow/tests/api.sx — Phase 5: operational introspection API. + +(define flow-api-pass 0) +(define flow-api-fail 0) +(define flow-api-fails (list)) + +(define + flow-api-test + (fn + (name actual expected) + (if + (= actual expected) + (set! flow-api-pass (+ flow-api-pass 1)) + (begin + (set! flow-api-fail (+ flow-api-fail 1)) + (append! flow-api-fails {:name name :expected expected :actual actual}))))) + +(define flow-a (fn (src) (flow-run src))) + +;; ── flow/status ───────────────────────────────────────────────── +(flow-api-test "status: unknown id" (flow-a "(flow/status 999)") "unknown") +(flow-api-test + "status: suspended flow" + (flow-a + "(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/status id)") + "suspended") +(flow-api-test + "status: completed flow" + (flow-a + "(defflow w (sequence (lambda (x) (suspend (quote q))) (lambda (v) v))) (define id (car (cdr (flow/start w 0)))) (flow/resume id 5) (flow/status id)") + "done") +(flow-api-test + "status: cancelled flow" + (flow-a + "(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/cancel id) (flow/status id)") + "cancelled") + +;; ── flow/result ───────────────────────────────────────────────── +(flow-api-test + "result: returns the value of a completed flow" + (flow-a + "(defflow w (sequence (lambda (x) (suspend (quote q))) (lambda (v) (list (quote got) v)))) (define id (car (cdr (flow/start w 0)))) (flow/resume id 9) (flow/result id)") + (list "got" 9)) +(flow-api-test + "result: a still-suspended flow has no result" + (flow-a + "(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/result id)") + (list "flow-error" "not-done")) +(flow-api-test + "result: unknown id errors" + (flow-a "(flow/result 999)") + (list "flow-error" "no-such-flow")) + +;; ── flow/list ─────────────────────────────────────────────────── +(flow-api-test "list: empty store" (flow-a "(flow/list)") (list)) +(flow-api-test + "list: reports id + status for each flow (newest first)" + (flow-a + "(defflow w (lambda (x) (suspend (quote q)))) (flow/start w 0) (flow/start (lambda (x) (* x 2)) 5) (flow/list)") + (list (list 2 "done") (list 1 "suspended"))) + +;; ── flow/pending ──────────────────────────────────────────────── +(flow-api-test + "pending: lists suspended flows with their waiting tag" + (flow-a + "(defflow w (lambda (x) (suspend (quote review)))) (flow/start w 0) (flow/pending)") + (list (list 1 "review"))) +(flow-api-test + "pending: excludes completed and cancelled flows" + (flow-a + "(defflow w (lambda (x) (suspend (quote q)))) (defflow v (sequence (lambda (x) (suspend (quote r))) (lambda (y) y))) (define i1 (car (cdr (flow/start w 0)))) (define i2 (car (cdr (flow/start v 0)))) (define i3 (car (cdr (flow/start w 0)))) (flow/resume i2 1) (flow/cancel i3) (flow/pending)") + (list (list 1 "q"))) +(flow-api-test + "pending: operator can drain all pending flows" + (flow-a + "(defflow w (sequence (lambda (x) (suspend (quote q))) (lambda (v) (* v 10)))) (flow/start w 0) (flow/start w 0) (define ps (flow/pending)) (flow/resume (car (car ps)) 1) (flow/resume (car (car (cdr ps))) 2) (flow/list)") + (list (list 1 "done") (list 2 "done"))) + +(define flow-api-tests-run! (fn () {:total (+ flow-api-pass flow-api-fail) :passed flow-api-pass :failed flow-api-fail :fails flow-api-fails})) diff --git a/plans/flow-on-sx.md b/plans/flow-on-sx.md index 2a8f2e5d..4c9640b3 100644 --- a/plans/flow-on-sx.md +++ b/plans/flow-on-sx.md @@ -16,7 +16,7 @@ federation extension via fed-sx for remote-node execution. ## Status (rolling) -`bash lib/flow/conformance.sh` → **93/93** (all four phases complete) +`bash lib/flow/conformance.sh` → **105/105** (Phases 1-4 complete; Phase 5 in progress) ## Ground rules @@ -133,6 +133,20 @@ lib/flow/spec.sx lib/flow/runtime.sx lib/flow/store.sx - [x] `lib/flow/tests/distributed.sx` — 19 cases: remote-node, failover, replication, handoff (including replay-log survival across the move) +## Phase 5 — Operational API + combinator library + +The four roadmap phases are complete; this phase rounds out the engine into +something operators and authors actually use. Accumulation, not a rewrite. + +- [x] introspection API — `flow/status id`, `flow/result id`, `flow/list`, + `flow/pending` (operator view of what each suspended flow awaits). 12 tests in + `tests/api.sx`. +- [ ] `tap` — side-effecting pass-through node (logging/metrics) that returns input +- [ ] `recover` — complement to try-catch for the fail-VALUE channel: run node; if it + yields `(fail ...)`, run a recovery node on the reason +- [ ] `map-flow` — run a flow per item of a list, join results (sequential) +- [ ] `lib/flow/tests/api.sx` — introspection + new combinators + ## Progress log - **Phase 1 (combinators + sequential runtime).** Flow built as a Scheme prelude