From 3cbf33d2d2db71622e75b9b7a09fd3b442a84dd3 Mon Sep 17 00:00:00 2001 From: giles Date: Sat, 6 Jun 2026 19:24:16 +0000 Subject: [PATCH] flow: host integration ABI (request/await/host-queue) + 11 tests (Phase 8) The seam for hooking flow to art-dag and human-in-the-loop later. (request kind payload) suspends with a typed (flow-request kind payload) envelope and returns the host's resume value; await-human/await-render sugar. (flow-host-requests) is the host work queue: (id kind payload) for every suspended flow awaiting a host effect; request?/request-kind/request-payload parse a tag. Tests include the art-dag-shaped driver loop (render -> human-review -> publish). Host owns IO+persistence; flow only requests (replay-safe). 162/162 across 11 suites. Co-Authored-By: Claude Opus 4.8 (1M context) --- lib/flow/api.sx | 9 +++-- lib/flow/conformance.sh | 2 + lib/flow/host.sx | 35 ++++++++++++++++ lib/flow/scoreboard.json | 9 +++-- lib/flow/scoreboard.md | 3 +- lib/flow/tests/host.sx | 87 ++++++++++++++++++++++++++++++++++++++++ plans/flow-on-sx.md | 23 ++++++++++- 7 files changed, 159 insertions(+), 9 deletions(-) create mode 100644 lib/flow/host.sx create mode 100644 lib/flow/tests/host.sx diff --git a/lib/flow/api.sx b/lib/flow/api.sx index d3e8dce3..b6feca69 100644 --- a/lib/flow/api.sx +++ b/lib/flow/api.sx @@ -1,8 +1,9 @@ ;; lib/flow/api.sx — flow runtime entry points. ;; ;; Builds a Scheme env preloaded with the flow combinators (lib/flow/spec.sx), -;; the durable store + lifecycle (lib/flow/store.sx), and the fed-sx remote layer -;; (lib/flow/remote.sx), and provides SX helpers to run flow programs. +;; the durable store + lifecycle (lib/flow/store.sx), the fed-sx remote layer +;; (lib/flow/remote.sx), and the host integration ABI (lib/flow/host.sx), and +;; provides SX helpers to run flow programs. ;; ;; Scheme-level API (available inside flow programs): ;; (flow/start flow input) — run a flow; raw result if it completes, else @@ -10,10 +11,11 @@ ;; (flow/resume id value) — resume a suspended flow (store.sx) ;; (flow/cancel id) — cancel a flow (store.sx) ;; (suspend tag) — suspension point (spec.sx) +;; (request kind payload) — host request envelope over suspend (host.sx) ;; (remote-node addr fn) — node executed on a federation peer (remote.sx) ;; ;; SX-level helpers (for hosts and tests): -;; (flow-make-env) — fresh standard env + combinators + store + remote +;; (flow-make-env) — fresh standard env + combinators + store + remote + host ;; (flow-run src) — eval a Scheme program string in a reset shared env ;; (flow-run-in env src) — eval a Scheme program string in a given env ;; @@ -31,6 +33,7 @@ (flow-load-combinators! env) (flow-load-store! env) (flow-load-remote! env) + (flow-load-host! env) env))) (define diff --git a/lib/flow/conformance.sh b/lib/flow/conformance.sh index 787c9c9e..31a1d860 100755 --- a/lib/flow/conformance.sh +++ b/lib/flow/conformance.sh @@ -31,6 +31,7 @@ SUITES=( "railway flow-rail-tests-run! lib/flow/tests/railway.sx" "integration flow-int-tests-run! lib/flow/tests/integration.sx" "hygiene flow-hyg-tests-run! lib/flow/tests/hygiene.sx" + "host flow-hst-tests-run! lib/flow/tests/host.sx" ) TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT @@ -49,6 +50,7 @@ emit_eval () { echo "(epoch $EPOCH)"; echo "(eval \"$1\")"; EPOCH=$((EPOCH+1)); emit_load "lib/flow/spec.sx" emit_load "lib/flow/store.sx" emit_load "lib/flow/remote.sx" + emit_load "lib/flow/host.sx" emit_load "lib/flow/api.sx" for SUITE in "${SUITES[@]}"; do read -r _NAME _RUNNER FILE <<< "$SUITE" diff --git a/lib/flow/host.sx b/lib/flow/host.sx new file mode 100644 index 00000000..28e3b556 --- /dev/null +++ b/lib/flow/host.sx @@ -0,0 +1,35 @@ +;; lib/flow/host.sx — the host integration ABI (Phase 8). +;; +;; `suspend` is flow's seam to the outside world, but a bare (suspend tag) is just a +;; signal — every author would invent their own tag shape. This layer defines a +;; stable request/response contract so a host (e.g. an art-dag driver, or a human +;; review UI) can hook in WITHOUT reverse-engineering ad-hoc tags. +;; +;; A flow asks the host to do something and waits for the answer: +;; (request kind payload) — suspend with a typed envelope (flow-request kind +;; payload); evaluates to the host's resume value. +;; (await-human prompt) — request kind=human (a decision point) +;; (await-render recipe) — request kind=render (e.g. an art-dag job) +;; (await-effect kind p) — request of an arbitrary kind +;; +;; The host drives flows by polling its work queue and resuming: +;; (flow-host-requests) — ((id kind payload) ...) for every SUSPENDED flow whose +;; waiting tag is a host request. The host dispatches by kind (render -> submit a +;; Celery job; human -> show UI), then calls (flow/resume id answer). +;; (request? tag) / (request-kind tag) / (request-payload tag) — parse one tag. +;; +;; Contract: the host owns IO and persistence. flow stays deterministic — a flow +;; never performs IO itself, it only `request`s; the host performs the effect and +;; feeds the result back via resume (which the replay log records, so the effect is +;; not re-run on recovery). Persist with flow-store-export after each transition and +;; flow-store-import! on boot. + +(define + flow-host-src + "(define (request kind payload) (suspend (list (quote flow-request) kind payload)))\n (define (request? tag) (and (pair? tag) (eq? (car tag) (quote flow-request))))\n (define (request-kind tag) (car (cdr tag)))\n (define (request-payload tag) (car (cdr (cdr tag))))\n (define (await-human prompt) (request (quote human) prompt))\n (define (await-render recipe) (request (quote render) recipe))\n (define (await-effect kind payload) (request kind payload))\n (define (flow-host-req-step pend)\n (if (null? pend)\n (list)\n (let ((id (car (car pend))) (tag (car (cdr (car pend)))))\n (if (request? tag)\n (cons (list id (request-kind tag) (request-payload tag))\n (flow-host-req-step (cdr pend)))\n (flow-host-req-step (cdr pend))))))\n (define (flow-host-requests) (flow-host-req-step (flow/pending)))") + +(define + flow-load-host! + (fn + (env) + (begin (scheme-eval-program (scheme-parse-all flow-host-src) env) env))) diff --git a/lib/flow/scoreboard.json b/lib/flow/scoreboard.json index 012c8bfb..8cedc169 100644 --- a/lib/flow/scoreboard.json +++ b/lib/flow/scoreboard.json @@ -1,6 +1,6 @@ { - "total": 151, - "passed": 151, + "total": 162, + "passed": 162, "failed": 0, "suites": { "basic": { "passed": 18, "total": 18 }, @@ -12,7 +12,8 @@ "combinators": { "passed": 17, "total": 17 }, "railway": { "passed": 10, "total": 10 }, "integration": { "passed": 10, "total": 10 }, - "hygiene": { "passed": 9, "total": 9 } + "hygiene": { "passed": 9, "total": 9 }, + "host": { "passed": 11, "total": 11 } }, - "phases": { "phase1": "done", "phase2": "done", "phase3": "done", "phase4": "done", "phase5": "done", "phase6": "done", "phase7": "done" } + "phases": { "phase1": "done", "phase2": "done", "phase3": "done", "phase4": "done", "phase5": "done", "phase6": "done", "phase7": "done", "phase8": "done" } } diff --git a/lib/flow/scoreboard.md b/lib/flow/scoreboard.md index 41eaf45d..7ee0da8d 100644 --- a/lib/flow/scoreboard.md +++ b/lib/flow/scoreboard.md @@ -1,6 +1,6 @@ # flow-on-sx Scoreboard -**All tests pass: 151 / 151 across 10 suites. Phases 1-7 complete.** +**All tests pass: 162 / 162 across 11 suites. Phases 1-8 complete.** `bash lib/flow/conformance.sh` @@ -18,6 +18,7 @@ | railway | 10 | Phase 6: `attempt` — fail-value short-circuiting sequence + recover rejoin | | integration | 10 | Phase 7: end-to-end order + onboarding flows composing every phase (suspend, branch, federation, crash recovery, handoff, introspection) | | hygiene | 9 | Phase 5: `flow/gc` (prune terminal flows), `flow/forget` (drop one terminal record) | +| host | 11 | Phase 8: host ABI — `request`/`await-human`/`await-render`, `flow-host-requests` work queue; art-dag-shaped driver loop | ## Architecture diff --git a/lib/flow/tests/host.sx b/lib/flow/tests/host.sx new file mode 100644 index 00000000..74ec3b5d --- /dev/null +++ b/lib/flow/tests/host.sx @@ -0,0 +1,87 @@ +;; lib/flow/tests/host.sx — Phase 8: host integration ABI (request/await/host-queue). + +(define flow-hst-pass 0) +(define flow-hst-fail 0) +(define flow-hst-fails (list)) + +(define + flow-hst-test + (fn + (name actual expected) + (if + (= actual expected) + (set! flow-hst-pass (+ flow-hst-pass 1)) + (begin + (set! flow-hst-fail (+ flow-hst-fail 1)) + (append! flow-hst-fails {:name name :expected expected :actual actual}))))) + +(define flow-hst (fn (src) (flow-run src))) + +;; ── request envelope ──────────────────────────────────────────── +(flow-hst-test + "request: suspends with a typed envelope" + (flow-hst + "(car (cdr (cdr (flow/start (lambda (x) (request (quote render) x)) 5))))") + (list "flow-request" "render" 5)) +(flow-hst-test + "request?: recognizes an envelope" + (flow-hst "(request? (list (quote flow-request) (quote human) 1))") + true) +(flow-hst-test + "request?: a plain tag is not a request" + (flow-hst "(request? (list (quote review) 1))") + false) +(flow-hst-test + "request-kind / request-payload: parse the envelope" + (flow-hst + "(define t (list (quote flow-request) (quote render) (list (quote recipe) 7))) (list (request-kind t) (request-payload t))") + (list "render" (list "recipe" 7))) + +;; ── named decision points ─────────────────────────────────────── +(flow-hst-test + "await-human: is a request of kind human" + (flow-hst + "(car (cdr (cdr (flow/start (lambda (x) (await-human x)) (quote approve?)))))") + (list "flow-request" "human" "approve?")) +(flow-hst-test + "await-render: is a request of kind render" + (flow-hst + "(car (cdr (cdr (flow/start (lambda (x) (await-render x)) (quote recipe)))))") + (list "flow-request" "render" "recipe")) +(flow-hst-test + "request: the host's resume value flows back into the flow" + (flow-hst + "(defflow f (sequence (lambda (x) (await-render x)) (lambda (art) (list (quote got) art)))) (define id (car (cdr (flow/start f 1)))) (flow/resume id (quote the-artifact))") + (list "got" "the-artifact")) + +;; ── host work queue ───────────────────────────────────────────── +(flow-hst-test + "flow-host-requests: lists (id kind payload) for pending requests" + (flow-hst + "(flow/start (lambda (x) (await-render x)) 99) (flow-host-requests)") + (list (list 1 "render" 99))) +(flow-hst-test + "flow-host-requests: excludes bare (non-request) suspends" + (flow-hst + "(defflow a (lambda (x) (await-render x))) (defflow b (lambda (x) (suspend (quote plain)))) (flow/start a 1) (flow/start b 2) (flow-host-requests)") + (list (list 1 "render" 1))) + +;; ── the art-dag-shaped host driver loop ───────────────────────── +;; A host: poll requests, dispatch by kind (render -> compute; human -> decide), +;; resume with the result. Drives a render -> human-review -> publish pipeline. +(flow-hst-test + "host driver: render then human-review then publish" + (flow-hst + "(defflow pipeline (sequence (lambda (recipe) (await-render recipe)) (lambda (art) (await-human (list (quote review) art))) (branch (lambda (d) (eq? d (quote approve))) (flow-const (quote published)) (flow-const (fail (quote rejected)))))) (define id (car (cdr (flow/start pipeline 99)))) (define r1 (flow-host-requests)) (flow/resume id (list (quote art) 99)) (define r2 (flow-host-requests)) (flow/resume id (quote approve)) (list r1 r2 (flow/status id) (flow/result id))") + (list + (list (list 1 "render" 99)) + (list (list 1 "human" (list "review" (list "art" 99)))) + "done" + "published")) +(flow-hst-test + "host driver: rejection at the human gate yields a failure" + (flow-hst + "(defflow pipeline (sequence (lambda (recipe) (await-render recipe)) (lambda (art) (await-human (list (quote review) art))) (branch (lambda (d) (eq? d (quote approve))) (flow-const (quote published)) (flow-const (fail (quote rejected)))))) (define id (car (cdr (flow/start pipeline 1)))) (flow/resume id (quote artifact)) (failed? (flow/resume id (quote reject)))") + true) + +(define flow-hst-tests-run! (fn () {:total (+ flow-hst-pass flow-hst-fail) :passed flow-hst-pass :failed flow-hst-fail :fails flow-hst-fails})) diff --git a/plans/flow-on-sx.md b/plans/flow-on-sx.md index c14d8631..2d336da6 100644 --- a/plans/flow-on-sx.md +++ b/plans/flow-on-sx.md @@ -16,7 +16,7 @@ federation extension via fed-sx for remote-node execution. ## Status (rolling) -`bash lib/flow/conformance.sh` → **151/151** (Phases 1-7 complete; +store hygiene) +`bash lib/flow/conformance.sh` → **162/162** (Phases 1-8 complete; host ABI for art-dag) ## Ground rules @@ -162,6 +162,27 @@ Make the `(fail reason)` value channel compose into real validation/ETL pipeline - [x] `lib/flow/tests/railway.sx` — 10 cases: fail short-circuiting, no-run-after- failure, recover rejoin, validation pipeline reporting the failing stage +## Phase 8 — Host integration ABI (art-dag / human-in-the-loop) + +`suspend` is the seam to the outside world, but a bare tag is an ad-hoc convention. +This phase defines a stable request/response contract a host (an art-dag driver, a +review UI) codes against — so flow can orchestrate art-dag with human decision +points later without reverse-engineering tag shapes. `lib/flow/host.sx`. + +- [x] `(request kind payload)` — suspend with a typed `(flow-request kind payload)` + envelope; evaluates to the host's resume value. `await-human`/`await-render`/ + `await-effect` sugar. +- [x] `(flow-host-requests)` — the host work queue: `(id kind payload)` for every + suspended flow waiting on a host request; `request?`/`request-kind`/ + `request-payload` parse a tag. +- [x] `lib/flow/tests/host.sx` — 11 cases incl. the art-dag-shaped driver loop + (render → human-review → publish, driven by polling the queue + resume). +- Contract (documented in `host.sx` + README): the host owns IO + persistence; a + flow never does IO, it only `request`s; the host performs the effect and feeds the + result back via resume (logged, so not re-run on recovery). NOT done here (host + side, out of `lib/flow` scope): the real Celery/IPFS bridge and a persistent store + backend — those live in the art-dag integration, coding against this ABI. + ## Phase 7 — End-to-end integration Prove the phases compose: realistic flows exercising attempt + suspend + branch +