From 9cfca1d00886e426ca85e2a9f9ef9a06c786e61a Mon Sep 17 00:00:00 2001 From: giles Date: Sat, 6 Jun 2026 19:33:04 +0000 Subject: [PATCH] flow: reference host driver flow-drive-host/flow-run-host + 4 tests Completes the host ABI from work-queue to driver loop: the host supplies only a (kind payload) -> answer dispatch fn; flow-drive-host services one tick of pending requests, flow-run-host ticks until quiescent (bounded). Tested via the art-dag render -> human-review -> publish pipeline driven entirely by flow-run-host. The art-dag integration is now: define dispatch, call flow-run-host. 166/166, 11 suites. Co-Authored-By: Claude Opus 4.8 (1M context) --- lib/flow/host.sx | 9 ++++++++- lib/flow/scoreboard.json | 6 +++--- lib/flow/scoreboard.md | 4 ++-- lib/flow/tests/host.sx | 27 +++++++++++++++++++++++---- plans/flow-on-sx.md | 9 ++++++--- 5 files changed, 42 insertions(+), 13 deletions(-) diff --git a/lib/flow/host.sx b/lib/flow/host.sx index 28e3b556..da92ac7a 100644 --- a/lib/flow/host.sx +++ b/lib/flow/host.sx @@ -18,6 +18,13 @@ ;; Celery job; human -> show UI), then calls (flow/resume id answer). ;; (request? tag) / (request-kind tag) / (request-payload tag) — parse one tag. ;; +;; Reference driver — the host only supplies `dispatch`, a (kind payload) -> answer: +;; (flow-drive-host dispatch) — one tick: service every CURRENTLY pending +;; request (snapshot), resuming each with (dispatch kind payload); returns the +;; count serviced. Resumes may create new requests — serviced on the next tick. +;; (flow-run-host dispatch maxticks) — tick until quiescent (no pending requests) +;; or maxticks reached; returns total requests serviced. Bounded for determinism. +;; ;; Contract: the host owns IO and persistence. flow stays deterministic — a flow ;; never performs IO itself, it only `request`s; the host performs the effect and ;; feeds the result back via resume (which the replay log records, so the effect is @@ -26,7 +33,7 @@ (define flow-host-src - "(define (request kind payload) (suspend (list (quote flow-request) kind payload)))\n (define (request? tag) (and (pair? tag) (eq? (car tag) (quote flow-request))))\n (define (request-kind tag) (car (cdr tag)))\n (define (request-payload tag) (car (cdr (cdr tag))))\n (define (await-human prompt) (request (quote human) prompt))\n (define (await-render recipe) (request (quote render) recipe))\n (define (await-effect kind payload) (request kind payload))\n (define (flow-host-req-step pend)\n (if (null? pend)\n (list)\n (let ((id (car (car pend))) (tag (car (cdr (car pend)))))\n (if (request? tag)\n (cons (list id (request-kind tag) (request-payload tag))\n (flow-host-req-step (cdr pend)))\n (flow-host-req-step (cdr pend))))))\n (define (flow-host-requests) (flow-host-req-step (flow/pending)))") + "(define (request kind payload) (suspend (list (quote flow-request) kind payload)))\n (define (request? tag) (and (pair? tag) (eq? (car tag) (quote flow-request))))\n (define (request-kind tag) (car (cdr tag)))\n (define (request-payload tag) (car (cdr (cdr tag))))\n (define (await-human prompt) (request (quote human) prompt))\n (define (await-render recipe) (request (quote render) recipe))\n (define (await-effect kind payload) (request kind payload))\n (define (flow-host-req-step pend)\n (if (null? pend)\n (list)\n (let ((id (car (car pend))) (tag (car (cdr (car pend)))))\n (if (request? tag)\n (cons (list id (request-kind tag) (request-payload tag))\n (flow-host-req-step (cdr pend)))\n (flow-host-req-step (cdr pend))))))\n (define (flow-host-requests) (flow-host-req-step (flow/pending)))\n (define (flow-drive-host-step reqs dispatch)\n (if (null? reqs)\n 0\n (begin\n (flow/resume (car (car reqs)) (dispatch (car (cdr (car reqs))) (car (cdr (cdr (car reqs))))))\n (+ 1 (flow-drive-host-step (cdr reqs) dispatch)))))\n (define (flow-drive-host dispatch) (flow-drive-host-step (flow-host-requests) dispatch))\n (define (flow-run-host dispatch maxticks)\n (if (<= maxticks 0)\n 0\n (let ((n (flow-drive-host dispatch)))\n (if (= n 0) 0 (+ n (flow-run-host dispatch (- maxticks 1)))))))") (define flow-load-host! diff --git a/lib/flow/scoreboard.json b/lib/flow/scoreboard.json index 8cedc169..5229b185 100644 --- a/lib/flow/scoreboard.json +++ b/lib/flow/scoreboard.json @@ -1,6 +1,6 @@ { - "total": 162, - "passed": 162, + "total": 166, + "passed": 166, "failed": 0, "suites": { "basic": { "passed": 18, "total": 18 }, @@ -13,7 +13,7 @@ "railway": { "passed": 10, "total": 10 }, "integration": { "passed": 10, "total": 10 }, "hygiene": { "passed": 9, "total": 9 }, - "host": { "passed": 11, "total": 11 } + "host": { "passed": 15, "total": 15 } }, "phases": { "phase1": "done", "phase2": "done", "phase3": "done", "phase4": "done", "phase5": "done", "phase6": "done", "phase7": "done", "phase8": "done" } } diff --git a/lib/flow/scoreboard.md b/lib/flow/scoreboard.md index 7ee0da8d..70afaeee 100644 --- a/lib/flow/scoreboard.md +++ b/lib/flow/scoreboard.md @@ -1,6 +1,6 @@ # flow-on-sx Scoreboard -**All tests pass: 162 / 162 across 11 suites. Phases 1-8 complete.** +**All tests pass: 166 / 166 across 11 suites. Phases 1-8 complete.** `bash lib/flow/conformance.sh` @@ -18,7 +18,7 @@ | railway | 10 | Phase 6: `attempt` — fail-value short-circuiting sequence + recover rejoin | | integration | 10 | Phase 7: end-to-end order + onboarding flows composing every phase (suspend, branch, federation, crash recovery, handoff, introspection) | | hygiene | 9 | Phase 5: `flow/gc` (prune terminal flows), `flow/forget` (drop one terminal record) | -| host | 11 | Phase 8: host ABI — `request`/`await-human`/`await-render`, `flow-host-requests` work queue; art-dag-shaped driver loop | +| host | 15 | Phase 8: host ABI — `request`/`await-human`/`await-render`, `flow-host-requests` queue, `flow-run-host` reference driver; art-dag-shaped render→review→publish loop | ## Architecture diff --git a/lib/flow/tests/host.sx b/lib/flow/tests/host.sx index 74ec3b5d..d0e8335a 100644 --- a/lib/flow/tests/host.sx +++ b/lib/flow/tests/host.sx @@ -1,4 +1,4 @@ -;; lib/flow/tests/host.sx — Phase 8: host integration ABI (request/await/host-queue). +;; lib/flow/tests/host.sx — Phase 8: host integration ABI (request/await/host-queue/driver). (define flow-hst-pass 0) (define flow-hst-fail 0) @@ -66,9 +66,7 @@ "(defflow a (lambda (x) (await-render x))) (defflow b (lambda (x) (suspend (quote plain)))) (flow/start a 1) (flow/start b 2) (flow-host-requests)") (list (list 1 "render" 1))) -;; ── the art-dag-shaped host driver loop ───────────────────────── -;; A host: poll requests, dispatch by kind (render -> compute; human -> decide), -;; resume with the result. Drives a render -> human-review -> publish pipeline. +;; ── the art-dag-shaped host driver loop (manual resumes) ──────── (flow-hst-test "host driver: render then human-review then publish" (flow-hst @@ -84,4 +82,25 @@ "(defflow pipeline (sequence (lambda (recipe) (await-render recipe)) (lambda (art) (await-human (list (quote review) art))) (branch (lambda (d) (eq? d (quote approve))) (flow-const (quote published)) (flow-const (fail (quote rejected)))))) (define id (car (cdr (flow/start pipeline 1)))) (flow/resume id (quote artifact)) (failed? (flow/resume id (quote reject)))") true) +;; ── reference driver: host supplies only a dispatch fn ────────── +(flow-hst-test + "flow-drive-host: one tick services every pending request" + (flow-hst + "(flow/start (lambda (x) (await-render x)) 5) (define n (flow-drive-host (lambda (k p) (list (quote done) p)))) (list n (flow/status 1) (flow/result 1))") + (list 1 "done" (list "done" 5))) +(flow-hst-test + "flow-run-host: drives a render -> human pipeline to completion" + (flow-hst + "(defflow pipeline (sequence (lambda (recipe) (await-render recipe)) (lambda (art) (await-human (list (quote review) art))) (branch (lambda (d) (eq? d (quote approve))) (flow-const (quote published)) (flow-const (fail (quote rejected)))))) (define id (car (cdr (flow/start pipeline 99)))) (define serviced (flow-run-host (lambda (kind payload) (if (eq? kind (quote render)) (list (quote art) payload) (quote approve))) 10)) (list serviced (flow/status id) (flow/result id))") + (list 2 "done" "published")) +(flow-hst-test + "flow-run-host: returns 0 when nothing is pending" + (flow-hst "(flow-run-host (lambda (k p) p) 5)") + 0) +(flow-hst-test + "flow-run-host: respects the maxticks bound" + (flow-hst + "(defflow pipe2 (sequence (lambda (r) (await-render r)) (lambda (a) (await-human a)) (lambda (d) d))) (define id (car (cdr (flow/start pipe2 1)))) (define serviced (flow-run-host (lambda (k p) p) 1)) (list serviced (flow/status id))") + (list 1 "suspended")) + (define flow-hst-tests-run! (fn () {:total (+ flow-hst-pass flow-hst-fail) :passed flow-hst-pass :failed flow-hst-fail :fails flow-hst-fails})) diff --git a/plans/flow-on-sx.md b/plans/flow-on-sx.md index 2d336da6..1310998c 100644 --- a/plans/flow-on-sx.md +++ b/plans/flow-on-sx.md @@ -16,7 +16,7 @@ federation extension via fed-sx for remote-node execution. ## Status (rolling) -`bash lib/flow/conformance.sh` → **162/162** (Phases 1-8 complete; host ABI for art-dag) +`bash lib/flow/conformance.sh` → **166/166** (Phases 1-8 complete; host ABI + reference driver) ## Ground rules @@ -175,8 +175,11 @@ points later without reverse-engineering tag shapes. `lib/flow/host.sx`. - [x] `(flow-host-requests)` — the host work queue: `(id kind payload)` for every suspended flow waiting on a host request; `request?`/`request-kind`/ `request-payload` parse a tag. -- [x] `lib/flow/tests/host.sx` — 11 cases incl. the art-dag-shaped driver loop - (render → human-review → publish, driven by polling the queue + resume). +- [x] `(flow-drive-host dispatch)` / `(flow-run-host dispatch maxticks)` — reference + host driver: the host supplies only a `(kind payload) -> answer` dispatch fn; the + loop drains pending requests and resumes until quiescent (bounded). +- [x] `lib/flow/tests/host.sx` — 15 cases incl. the art-dag-shaped driver loop + (render → human-review → publish) run both manually and via `flow-run-host`. - Contract (documented in `host.sx` + README): the host owns IO + persistence; a flow never does IO, it only `request`s; the host performs the effect and feeds the result back via resume (logged, so not re-run on recovery). NOT done here (host