flow: host integration ABI (request/await/host-queue) + 11 tests (Phase 8)
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 38s
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 38s
The seam for hooking flow to art-dag and human-in-the-loop later. (request kind payload) suspends with a typed (flow-request kind payload) envelope and returns the host's resume value; await-human/await-render sugar. (flow-host-requests) is the host work queue: (id kind payload) for every suspended flow awaiting a host effect; request?/request-kind/request-payload parse a tag. Tests include the art-dag-shaped driver loop (render -> human-review -> publish). Host owns IO+persistence; flow only requests (replay-safe). 162/162 across 11 suites. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,8 +1,9 @@
|
|||||||
;; lib/flow/api.sx — flow runtime entry points.
|
;; lib/flow/api.sx — flow runtime entry points.
|
||||||
;;
|
;;
|
||||||
;; Builds a Scheme env preloaded with the flow combinators (lib/flow/spec.sx),
|
;; Builds a Scheme env preloaded with the flow combinators (lib/flow/spec.sx),
|
||||||
;; the durable store + lifecycle (lib/flow/store.sx), and the fed-sx remote layer
|
;; the durable store + lifecycle (lib/flow/store.sx), the fed-sx remote layer
|
||||||
;; (lib/flow/remote.sx), and provides SX helpers to run flow programs.
|
;; (lib/flow/remote.sx), and the host integration ABI (lib/flow/host.sx), and
|
||||||
|
;; provides SX helpers to run flow programs.
|
||||||
;;
|
;;
|
||||||
;; Scheme-level API (available inside flow programs):
|
;; Scheme-level API (available inside flow programs):
|
||||||
;; (flow/start flow input) — run a flow; raw result if it completes, else
|
;; (flow/start flow input) — run a flow; raw result if it completes, else
|
||||||
@@ -10,10 +11,11 @@
|
|||||||
;; (flow/resume id value) — resume a suspended flow (store.sx)
|
;; (flow/resume id value) — resume a suspended flow (store.sx)
|
||||||
;; (flow/cancel id) — cancel a flow (store.sx)
|
;; (flow/cancel id) — cancel a flow (store.sx)
|
||||||
;; (suspend tag) — suspension point (spec.sx)
|
;; (suspend tag) — suspension point (spec.sx)
|
||||||
|
;; (request kind payload) — host request envelope over suspend (host.sx)
|
||||||
;; (remote-node addr fn) — node executed on a federation peer (remote.sx)
|
;; (remote-node addr fn) — node executed on a federation peer (remote.sx)
|
||||||
;;
|
;;
|
||||||
;; SX-level helpers (for hosts and tests):
|
;; SX-level helpers (for hosts and tests):
|
||||||
;; (flow-make-env) — fresh standard env + combinators + store + remote
|
;; (flow-make-env) — fresh standard env + combinators + store + remote + host
|
||||||
;; (flow-run src) — eval a Scheme program string in a reset shared env
|
;; (flow-run src) — eval a Scheme program string in a reset shared env
|
||||||
;; (flow-run-in env src) — eval a Scheme program string in a given env
|
;; (flow-run-in env src) — eval a Scheme program string in a given env
|
||||||
;;
|
;;
|
||||||
@@ -31,6 +33,7 @@
|
|||||||
(flow-load-combinators! env)
|
(flow-load-combinators! env)
|
||||||
(flow-load-store! env)
|
(flow-load-store! env)
|
||||||
(flow-load-remote! env)
|
(flow-load-remote! env)
|
||||||
|
(flow-load-host! env)
|
||||||
env)))
|
env)))
|
||||||
|
|
||||||
(define
|
(define
|
||||||
|
|||||||
@@ -31,6 +31,7 @@ SUITES=(
|
|||||||
"railway flow-rail-tests-run! lib/flow/tests/railway.sx"
|
"railway flow-rail-tests-run! lib/flow/tests/railway.sx"
|
||||||
"integration flow-int-tests-run! lib/flow/tests/integration.sx"
|
"integration flow-int-tests-run! lib/flow/tests/integration.sx"
|
||||||
"hygiene flow-hyg-tests-run! lib/flow/tests/hygiene.sx"
|
"hygiene flow-hyg-tests-run! lib/flow/tests/hygiene.sx"
|
||||||
|
"host flow-hst-tests-run! lib/flow/tests/host.sx"
|
||||||
)
|
)
|
||||||
|
|
||||||
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
|
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
|
||||||
@@ -49,6 +50,7 @@ emit_eval () { echo "(epoch $EPOCH)"; echo "(eval \"$1\")"; EPOCH=$((EPOCH+1));
|
|||||||
emit_load "lib/flow/spec.sx"
|
emit_load "lib/flow/spec.sx"
|
||||||
emit_load "lib/flow/store.sx"
|
emit_load "lib/flow/store.sx"
|
||||||
emit_load "lib/flow/remote.sx"
|
emit_load "lib/flow/remote.sx"
|
||||||
|
emit_load "lib/flow/host.sx"
|
||||||
emit_load "lib/flow/api.sx"
|
emit_load "lib/flow/api.sx"
|
||||||
for SUITE in "${SUITES[@]}"; do
|
for SUITE in "${SUITES[@]}"; do
|
||||||
read -r _NAME _RUNNER FILE <<< "$SUITE"
|
read -r _NAME _RUNNER FILE <<< "$SUITE"
|
||||||
|
|||||||
35
lib/flow/host.sx
Normal file
35
lib/flow/host.sx
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
;; lib/flow/host.sx — the host integration ABI (Phase 8).
|
||||||
|
;;
|
||||||
|
;; `suspend` is flow's seam to the outside world, but a bare (suspend tag) is just a
|
||||||
|
;; signal — every author would invent their own tag shape. This layer defines a
|
||||||
|
;; stable request/response contract so a host (e.g. an art-dag driver, or a human
|
||||||
|
;; review UI) can hook in WITHOUT reverse-engineering ad-hoc tags.
|
||||||
|
;;
|
||||||
|
;; A flow asks the host to do something and waits for the answer:
|
||||||
|
;; (request kind payload) — suspend with a typed envelope (flow-request kind
|
||||||
|
;; payload); evaluates to the host's resume value.
|
||||||
|
;; (await-human prompt) — request kind=human (a decision point)
|
||||||
|
;; (await-render recipe) — request kind=render (e.g. an art-dag job)
|
||||||
|
;; (await-effect kind p) — request of an arbitrary kind
|
||||||
|
;;
|
||||||
|
;; The host drives flows by polling its work queue and resuming:
|
||||||
|
;; (flow-host-requests) — ((id kind payload) ...) for every SUSPENDED flow whose
|
||||||
|
;; waiting tag is a host request. The host dispatches by kind (render -> submit a
|
||||||
|
;; Celery job; human -> show UI), then calls (flow/resume id answer).
|
||||||
|
;; (request? tag) / (request-kind tag) / (request-payload tag) — parse one tag.
|
||||||
|
;;
|
||||||
|
;; Contract: the host owns IO and persistence. flow stays deterministic — a flow
|
||||||
|
;; never performs IO itself, it only `request`s; the host performs the effect and
|
||||||
|
;; feeds the result back via resume (which the replay log records, so the effect is
|
||||||
|
;; not re-run on recovery). Persist with flow-store-export after each transition and
|
||||||
|
;; flow-store-import! on boot.
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-host-src
|
||||||
|
"(define (request kind payload) (suspend (list (quote flow-request) kind payload)))\n (define (request? tag) (and (pair? tag) (eq? (car tag) (quote flow-request))))\n (define (request-kind tag) (car (cdr tag)))\n (define (request-payload tag) (car (cdr (cdr tag))))\n (define (await-human prompt) (request (quote human) prompt))\n (define (await-render recipe) (request (quote render) recipe))\n (define (await-effect kind payload) (request kind payload))\n (define (flow-host-req-step pend)\n (if (null? pend)\n (list)\n (let ((id (car (car pend))) (tag (car (cdr (car pend)))))\n (if (request? tag)\n (cons (list id (request-kind tag) (request-payload tag))\n (flow-host-req-step (cdr pend)))\n (flow-host-req-step (cdr pend))))))\n (define (flow-host-requests) (flow-host-req-step (flow/pending)))")
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-load-host!
|
||||||
|
(fn
|
||||||
|
(env)
|
||||||
|
(begin (scheme-eval-program (scheme-parse-all flow-host-src) env) env)))
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"total": 151,
|
"total": 162,
|
||||||
"passed": 151,
|
"passed": 162,
|
||||||
"failed": 0,
|
"failed": 0,
|
||||||
"suites": {
|
"suites": {
|
||||||
"basic": { "passed": 18, "total": 18 },
|
"basic": { "passed": 18, "total": 18 },
|
||||||
@@ -12,7 +12,8 @@
|
|||||||
"combinators": { "passed": 17, "total": 17 },
|
"combinators": { "passed": 17, "total": 17 },
|
||||||
"railway": { "passed": 10, "total": 10 },
|
"railway": { "passed": 10, "total": 10 },
|
||||||
"integration": { "passed": 10, "total": 10 },
|
"integration": { "passed": 10, "total": 10 },
|
||||||
"hygiene": { "passed": 9, "total": 9 }
|
"hygiene": { "passed": 9, "total": 9 },
|
||||||
|
"host": { "passed": 11, "total": 11 }
|
||||||
},
|
},
|
||||||
"phases": { "phase1": "done", "phase2": "done", "phase3": "done", "phase4": "done", "phase5": "done", "phase6": "done", "phase7": "done" }
|
"phases": { "phase1": "done", "phase2": "done", "phase3": "done", "phase4": "done", "phase5": "done", "phase6": "done", "phase7": "done", "phase8": "done" }
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
# flow-on-sx Scoreboard
|
# flow-on-sx Scoreboard
|
||||||
|
|
||||||
**All tests pass: 151 / 151 across 10 suites. Phases 1-7 complete.**
|
**All tests pass: 162 / 162 across 11 suites. Phases 1-8 complete.**
|
||||||
|
|
||||||
`bash lib/flow/conformance.sh`
|
`bash lib/flow/conformance.sh`
|
||||||
|
|
||||||
@@ -18,6 +18,7 @@
|
|||||||
| railway | 10 | Phase 6: `attempt` — fail-value short-circuiting sequence + recover rejoin |
|
| railway | 10 | Phase 6: `attempt` — fail-value short-circuiting sequence + recover rejoin |
|
||||||
| integration | 10 | Phase 7: end-to-end order + onboarding flows composing every phase (suspend, branch, federation, crash recovery, handoff, introspection) |
|
| integration | 10 | Phase 7: end-to-end order + onboarding flows composing every phase (suspend, branch, federation, crash recovery, handoff, introspection) |
|
||||||
| hygiene | 9 | Phase 5: `flow/gc` (prune terminal flows), `flow/forget` (drop one terminal record) |
|
| hygiene | 9 | Phase 5: `flow/gc` (prune terminal flows), `flow/forget` (drop one terminal record) |
|
||||||
|
| host | 11 | Phase 8: host ABI — `request`/`await-human`/`await-render`, `flow-host-requests` work queue; art-dag-shaped driver loop |
|
||||||
|
|
||||||
## Architecture
|
## Architecture
|
||||||
|
|
||||||
|
|||||||
87
lib/flow/tests/host.sx
Normal file
87
lib/flow/tests/host.sx
Normal file
@@ -0,0 +1,87 @@
|
|||||||
|
;; lib/flow/tests/host.sx — Phase 8: host integration ABI (request/await/host-queue).
|
||||||
|
|
||||||
|
(define flow-hst-pass 0)
|
||||||
|
(define flow-hst-fail 0)
|
||||||
|
(define flow-hst-fails (list))
|
||||||
|
|
||||||
|
(define
|
||||||
|
flow-hst-test
|
||||||
|
(fn
|
||||||
|
(name actual expected)
|
||||||
|
(if
|
||||||
|
(= actual expected)
|
||||||
|
(set! flow-hst-pass (+ flow-hst-pass 1))
|
||||||
|
(begin
|
||||||
|
(set! flow-hst-fail (+ flow-hst-fail 1))
|
||||||
|
(append! flow-hst-fails {:name name :expected expected :actual actual})))))
|
||||||
|
|
||||||
|
(define flow-hst (fn (src) (flow-run src)))
|
||||||
|
|
||||||
|
;; ── request envelope ────────────────────────────────────────────
|
||||||
|
(flow-hst-test
|
||||||
|
"request: suspends with a typed envelope"
|
||||||
|
(flow-hst
|
||||||
|
"(car (cdr (cdr (flow/start (lambda (x) (request (quote render) x)) 5))))")
|
||||||
|
(list "flow-request" "render" 5))
|
||||||
|
(flow-hst-test
|
||||||
|
"request?: recognizes an envelope"
|
||||||
|
(flow-hst "(request? (list (quote flow-request) (quote human) 1))")
|
||||||
|
true)
|
||||||
|
(flow-hst-test
|
||||||
|
"request?: a plain tag is not a request"
|
||||||
|
(flow-hst "(request? (list (quote review) 1))")
|
||||||
|
false)
|
||||||
|
(flow-hst-test
|
||||||
|
"request-kind / request-payload: parse the envelope"
|
||||||
|
(flow-hst
|
||||||
|
"(define t (list (quote flow-request) (quote render) (list (quote recipe) 7))) (list (request-kind t) (request-payload t))")
|
||||||
|
(list "render" (list "recipe" 7)))
|
||||||
|
|
||||||
|
;; ── named decision points ───────────────────────────────────────
|
||||||
|
(flow-hst-test
|
||||||
|
"await-human: is a request of kind human"
|
||||||
|
(flow-hst
|
||||||
|
"(car (cdr (cdr (flow/start (lambda (x) (await-human x)) (quote approve?)))))")
|
||||||
|
(list "flow-request" "human" "approve?"))
|
||||||
|
(flow-hst-test
|
||||||
|
"await-render: is a request of kind render"
|
||||||
|
(flow-hst
|
||||||
|
"(car (cdr (cdr (flow/start (lambda (x) (await-render x)) (quote recipe)))))")
|
||||||
|
(list "flow-request" "render" "recipe"))
|
||||||
|
(flow-hst-test
|
||||||
|
"request: the host's resume value flows back into the flow"
|
||||||
|
(flow-hst
|
||||||
|
"(defflow f (sequence (lambda (x) (await-render x)) (lambda (art) (list (quote got) art)))) (define id (car (cdr (flow/start f 1)))) (flow/resume id (quote the-artifact))")
|
||||||
|
(list "got" "the-artifact"))
|
||||||
|
|
||||||
|
;; ── host work queue ─────────────────────────────────────────────
|
||||||
|
(flow-hst-test
|
||||||
|
"flow-host-requests: lists (id kind payload) for pending requests"
|
||||||
|
(flow-hst
|
||||||
|
"(flow/start (lambda (x) (await-render x)) 99) (flow-host-requests)")
|
||||||
|
(list (list 1 "render" 99)))
|
||||||
|
(flow-hst-test
|
||||||
|
"flow-host-requests: excludes bare (non-request) suspends"
|
||||||
|
(flow-hst
|
||||||
|
"(defflow a (lambda (x) (await-render x))) (defflow b (lambda (x) (suspend (quote plain)))) (flow/start a 1) (flow/start b 2) (flow-host-requests)")
|
||||||
|
(list (list 1 "render" 1)))
|
||||||
|
|
||||||
|
;; ── the art-dag-shaped host driver loop ─────────────────────────
|
||||||
|
;; A host: poll requests, dispatch by kind (render -> compute; human -> decide),
|
||||||
|
;; resume with the result. Drives a render -> human-review -> publish pipeline.
|
||||||
|
(flow-hst-test
|
||||||
|
"host driver: render then human-review then publish"
|
||||||
|
(flow-hst
|
||||||
|
"(defflow pipeline (sequence (lambda (recipe) (await-render recipe)) (lambda (art) (await-human (list (quote review) art))) (branch (lambda (d) (eq? d (quote approve))) (flow-const (quote published)) (flow-const (fail (quote rejected)))))) (define id (car (cdr (flow/start pipeline 99)))) (define r1 (flow-host-requests)) (flow/resume id (list (quote art) 99)) (define r2 (flow-host-requests)) (flow/resume id (quote approve)) (list r1 r2 (flow/status id) (flow/result id))")
|
||||||
|
(list
|
||||||
|
(list (list 1 "render" 99))
|
||||||
|
(list (list 1 "human" (list "review" (list "art" 99))))
|
||||||
|
"done"
|
||||||
|
"published"))
|
||||||
|
(flow-hst-test
|
||||||
|
"host driver: rejection at the human gate yields a failure"
|
||||||
|
(flow-hst
|
||||||
|
"(defflow pipeline (sequence (lambda (recipe) (await-render recipe)) (lambda (art) (await-human (list (quote review) art))) (branch (lambda (d) (eq? d (quote approve))) (flow-const (quote published)) (flow-const (fail (quote rejected)))))) (define id (car (cdr (flow/start pipeline 1)))) (flow/resume id (quote artifact)) (failed? (flow/resume id (quote reject)))")
|
||||||
|
true)
|
||||||
|
|
||||||
|
(define flow-hst-tests-run! (fn () {:total (+ flow-hst-pass flow-hst-fail) :passed flow-hst-pass :failed flow-hst-fail :fails flow-hst-fails}))
|
||||||
@@ -16,7 +16,7 @@ federation extension via fed-sx for remote-node execution.
|
|||||||
|
|
||||||
## Status (rolling)
|
## Status (rolling)
|
||||||
|
|
||||||
`bash lib/flow/conformance.sh` → **151/151** (Phases 1-7 complete; +store hygiene)
|
`bash lib/flow/conformance.sh` → **162/162** (Phases 1-8 complete; host ABI for art-dag)
|
||||||
|
|
||||||
## Ground rules
|
## Ground rules
|
||||||
|
|
||||||
@@ -162,6 +162,27 @@ Make the `(fail reason)` value channel compose into real validation/ETL pipeline
|
|||||||
- [x] `lib/flow/tests/railway.sx` — 10 cases: fail short-circuiting, no-run-after-
|
- [x] `lib/flow/tests/railway.sx` — 10 cases: fail short-circuiting, no-run-after-
|
||||||
failure, recover rejoin, validation pipeline reporting the failing stage
|
failure, recover rejoin, validation pipeline reporting the failing stage
|
||||||
|
|
||||||
|
## Phase 8 — Host integration ABI (art-dag / human-in-the-loop)
|
||||||
|
|
||||||
|
`suspend` is the seam to the outside world, but a bare tag is an ad-hoc convention.
|
||||||
|
This phase defines a stable request/response contract a host (an art-dag driver, a
|
||||||
|
review UI) codes against — so flow can orchestrate art-dag with human decision
|
||||||
|
points later without reverse-engineering tag shapes. `lib/flow/host.sx`.
|
||||||
|
|
||||||
|
- [x] `(request kind payload)` — suspend with a typed `(flow-request kind payload)`
|
||||||
|
envelope; evaluates to the host's resume value. `await-human`/`await-render`/
|
||||||
|
`await-effect` sugar.
|
||||||
|
- [x] `(flow-host-requests)` — the host work queue: `(id kind payload)` for every
|
||||||
|
suspended flow waiting on a host request; `request?`/`request-kind`/
|
||||||
|
`request-payload` parse a tag.
|
||||||
|
- [x] `lib/flow/tests/host.sx` — 11 cases incl. the art-dag-shaped driver loop
|
||||||
|
(render → human-review → publish, driven by polling the queue + resume).
|
||||||
|
- Contract (documented in `host.sx` + README): the host owns IO + persistence; a
|
||||||
|
flow never does IO, it only `request`s; the host performs the effect and feeds the
|
||||||
|
result back via resume (logged, so not re-run on recovery). NOT done here (host
|
||||||
|
side, out of `lib/flow` scope): the real Celery/IPFS bridge and a persistent store
|
||||||
|
backend — those live in the art-dag integration, coding against this ABI.
|
||||||
|
|
||||||
## Phase 7 — End-to-end integration
|
## Phase 7 — End-to-end integration
|
||||||
|
|
||||||
Prove the phases compose: realistic flows exercising attempt + suspend + branch +
|
Prove the phases compose: realistic flows exercising attempt + suspend + branch +
|
||||||
|
|||||||
Reference in New Issue
Block a user