Merge loops/flow into architecture: flow-on-sx durable DAG workflow engine
166/166 across 11 suites, Phases 1-8. Combinators (sequence/parallel/branch/attempt/map-flow/while/until + retry/timeout/try-catch/recover/tap/fail-model), durable suspend/resume via deterministic replay (guest call/cc is escape-only), crash recovery, fed-sx distribution (remote-node/failover/replication/handoff), operational API + hygiene, and a host integration ABI + reference driver for art-dag / human-in-the-loop. New lib/flow/** only; imports lib/scheme read-only.
@@ -16,7 +16,7 @@ federation extension via fed-sx for remote-node execution.

## Status (rolling)

`bash lib/flow/conformance.sh` → **0/0** (not yet started)
`bash lib/flow/conformance.sh` → **166/166** (Phases 1-8 complete; host ABI + reference driver)

## Ground rules

@@ -62,47 +62,167 @@ lib/flow/spec.sx lib/flow/runtime.sx lib/flow/store.sx

## Phase 1 — Declarative DAG + sequential execution

- [ ] `lib/flow/spec.sx` — `defflow` macro, `sequence` combinator
- [ ] node = Scheme thunk; output threads to next node (data flow)
- [ ] `parallel` combinator (sequential semantics for now — TRUE parallelism in Phase 3)
- [ ] runtime executes a flow synchronously, returns final value
- [ ] `lib/flow/api.sx` — `(flow/start name args)` entry point
- [ ] `lib/flow/tests/basic.sx` — 15+ cases: linear sequence, nested sequences,
  data flow between nodes, parallel-with-join
- [ ] `lib/flow/scoreboard.{json,md}`
- [ ] `lib/flow/conformance.sh`
- [x] `lib/flow/spec.sx` — `defflow` macro, `sequence` combinator
- [x] node = Scheme procedure of one arg (upstream value threaded in); output
  threads to next node (data flow). A node ignoring its arg is a thunk.
- [x] `parallel` combinator (sequential semantics for now — TRUE parallelism in Phase 3)
- [x] runtime executes a flow synchronously, returns final value
- [x] `lib/flow/api.sx` — `(flow/start flow input)` entry point
- [x] `lib/flow/tests/basic.sx` — 18 cases: single nodes, linear/nested sequence,
  data flow between nodes, parallel-with-join, publish-shaped flow
- [x] `lib/flow/scoreboard.{json,md}`
- [x] `lib/flow/conformance.sh`
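
The Phase 1 model (a node is a one-argument procedure, `sequence` threads the value, `parallel` runs sequentially and joins) can be sketched in Python. This is an illustrative model, not the SX code in `spec.sx`; the function names mirror the combinators, and joining `parallel` outputs into a list is an assumption.

```python
from typing import Any, Callable

Node = Callable[[Any], Any]  # a node: one argument in, one value out


def sequence(*nodes: Node) -> Node:
    """Thread the upstream value through each node in order."""
    def run(value: Any) -> Any:
        for node in nodes:
            value = node(value)
        return value
    return run


def parallel(*nodes: Node) -> Node:
    """Give every node the same input and join the outputs.

    Branches run one after another (sequential semantics); the list-shaped
    join is an assumption made for this sketch.
    """
    def run(value: Any) -> Any:
        return [node(value) for node in nodes]
    return run


def flow_start(flow: Node, input_value: Any) -> Any:
    """Run a flow synchronously and return its final value."""
    return flow(input_value)


# increment, fan out to two branches, then join by summing
example = sequence(
    lambda n: n + 1,
    parallel(lambda n: n * 2, lambda n: n * 10),
    sum,
)
result = flow_start(example, 4)  # 5 -> [10, 50] -> 60
```

Because a flow is itself a one-argument procedure, flows nest inside `sequence` and `parallel` without any special casing.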

## Phase 2 — Control flow + error handling

- [ ] `cond` combinator — predicate selects branch
- [ ] `retry n [backoff]` — re-runs node up to n times on exception
- [ ] `timeout ms` — bounds node execution
- [ ] `try-catch` — exception handler with reified error
- [ ] error model — exceptions vs explicit `(fail :reason ...)` results
- [ ] `lib/flow/tests/control.sx` — 25+ cases: each combinator + composition
- [x] `cond` combinator — predicate selects branch (named `branch`; `cond` is a
  Scheme special form). `(branch pred then else)` — 6 tests.
- [x] `retry n` — re-runs node up to n attempts on a raised exception; last
  exception propagates. Only raised exceptions are retried — `(fail ...)` values
  pass through. 6 tests. (Backoff deferred: no wall clock in pure SX.)
- [x] `timeout budget` — bounds node execution via a **cooperative step budget**
  (deterministic; no scheduler/clock in pure SX). Nodes opt in via `(tick)`;
  `budget` ticks allowed, the next raises `flow-timeout`. Non-ticking nodes are
  unbounded; budgets nest. 7 tests.
- [x] `try-catch` — exception handler with reified error: `(try-catch node handler)`
  runs node; on raise, calls `(handler error)` and returns its value. 6 tests.
- [x] error model — exceptions vs explicit `(fail reason)` results: `fail`/`failed?`/
  `fail-reason` produce/inspect failure values that flow downstream as data
  (distinct from raised exceptions caught by retry/try-catch). 6 tests.
- [x] `lib/flow/tests/control.sx` — 31 cases: branch, error model, try-catch,
  retry, timeout + compositions

## Phase 3 — Suspend / resume (the showcase)

- [ ] `(suspend reason)` — `call/cc` captures continuation, returns flow-id to caller
- [ ] `lib/flow/store.sx` — serialize flow state (continuation + open vars)
- [ ] `(flow/resume id value)` — load continuation, inject value, re-enter
- [ ] `(flow/cancel id)` — explicit termination
- [ ] crash recovery — on restart, scan store for paused flows, mark resumable
- [ ] `lib/flow/tests/suspend.sx` — pause-resume scenarios, cancellation, "restart"
  scenarios (simulated by re-loading store)
- [x] `(suspend tag)` — guest call/cc is ESCAPE-ONLY (re-entry hangs), so resume
  uses **deterministic replay**: suspend escapes to the driver as `(flow-suspended
  tag)`; resume re-runs the flow, replaying resolved suspends from a `(tag value)`
  log. No live continuation is ever serialized — the log is plain data.
- [x] `lib/flow/store.sx` — flow store: id→record `(flow input log status payload)`;
  `flow-drive` runs a flow against a replay log.
- [x] `(flow/resume id value)` — append `(tag value)` to the log, re-drive; raw
  result on completion, `(flow-suspended id tag)` on a further suspend.
- [x] `(flow/cancel id)` — mark cancelled; a later resume is rejected (stale replay
  cannot wake a cancelled flow).
- [x] crash recovery — `flow-store-export` (procs nulled → plain data),
  `flow-store-import!`, `flow-resumable-ids`. Records are name-keyed; resume
  re-resolves the proc by name (defflow registers names), so a flow survives a
  wiped store. `tests/recovery.sx`, 8 cases (export/wipe/import, resumable scan,
  restart-at-every-step, replay-log survival).
- [x] `lib/flow/tests/suspend.sx` — 17 cases: start/resume/cancel, multi-step,
  replay determinism, lifecycle guards, suspend-in-branch
- Harness: `flow-run` now reuses one env with a per-test reset (building the full
  standard env 66× was too slow) — see `api.sx`.
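
Deterministic replay is the core idea of this phase, so a small model may help. The Python sketch below is an assumption-laden illustration, not the contents of `store.sx`: the record fields follow the `(flow input log status payload)` shape above, while the id format, the `FLOWS` registry, and passing `suspend` as an argument are choices made for the sketch. A JSON round trip stands in for export, wipe, and import.

```python
import json


class Suspended(Exception):
    """Escape from the flow to the driver; carries the suspend tag."""
    def __init__(self, tag):
        super().__init__(tag)
        self.tag = tag


FLOWS = {}  # name -> procedure; records store the name, never the procedure
STORE = {}  # flow id -> plain-data record


def drive(flow, input_value, log):
    """Re-run flow from the start, answering suspends from the replay log."""
    pending = list(log)

    def suspend(tag):
        if pending:  # already resolved earlier: replay the logged value
            logged_tag, value = pending.pop(0)
            if logged_tag != tag:
                raise RuntimeError("replay diverged from the log")
            return value
        raise Suspended(tag)  # first unresolved suspend: escape to the driver

    try:
        return "done", flow(input_value, suspend)
    except Suspended as s:
        return "suspended", s.tag


def _redrive(flow_id):
    rec = STORE[flow_id]
    status, payload = drive(FLOWS[rec["flow"]], rec["input"], rec["log"])
    rec["status"], rec["payload"] = status, payload
    return payload if status == "done" else ("flow-suspended", flow_id, payload)


def start(name, input_value):
    flow_id = f"f{len(STORE) + 1}"
    STORE[flow_id] = {"flow": name, "input": input_value, "log": [],
                      "status": "new", "payload": None}
    return _redrive(flow_id)


def resume(flow_id, value):
    rec = STORE[flow_id]
    if rec["status"] != "suspended":  # rejects done and cancelled flows
        raise ValueError(f"{flow_id} is {rec['status']}, not resumable")
    rec["log"].append([rec["payload"], value])  # record (tag value), then re-drive
    return _redrive(flow_id)


def cancel(flow_id):
    STORE[flow_id]["status"] = "cancelled"


def order(total, suspend):
    if not suspend("payment"):
        return "rejected"
    return f"shipped {total}" if suspend("approval") else "held"


FLOWS["order"] = order
first = start("order", 30)

snapshot = json.dumps(STORE)  # export: the store is plain data
STORE.clear()                 # simulated crash wipes the store
STORE.update(json.loads(snapshot))

second = resume("f1", True)   # replays nothing yet, then stops at "approval"
final = resume("f1", True)    # replays "payment", resolves "approval", completes
```

The flow body must be deterministic between suspends for this to work: every re-drive re-executes the code before the current suspend point, and only the logged answers are reused.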

## Phase 4 — Distributed nodes via fed-sx

- [ ] `(remote-node addr fn args)` — execute node on a federation peer
- [ ] failure semantics — retry on different peer, fall through to local
- [ ] persistence across instances — flow state replicates via fed-sx
- [ ] handoff — flow started here can resume on a peer if the local instance is down
- [ ] `lib/flow/tests/distributed.sx` — federated flow scenarios (mock fed-sx in tests)
- [x] `(remote-node addr fn)` — execute a node on a federation peer. Transport is
  the fed-sx boundary, MOCKED via a peer registry (`flow-peer-register!`); raises
  `flow-remote-unreachable` / `flow-remote-no-fn`. Composes with sequence, suspend,
  retry. `tests/distributed.sx`, 7 cases.
- [x] failure semantics — `(remote-failover addrs fn local)` tries each peer in
  order, moves to the next on any raised error, and runs the `local` node if every
  peer fails. 6 tests.
- [x] persistence across instances — `(flow-replicate-to addr)` copies this
  instance's store (the plain-data export) to a peer's replica slot;
  `(flow-restore-from addr)` imports it. Same mechanism as crash recovery, across
  instances.
- [x] handoff — a flow started here resumes on a peer after the local instance dies:
  replicate → wipe local store → restore on peer → `flow/resume`. The replay log
  (and thus all resolved suspends) survives the move.
- [x] `lib/flow/tests/distributed.sx` — 19 cases: remote-node, failover,
  replication, handoff (including replay-log survival across the move)
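
The failover rule (try peers in order, fall through to local) can be modelled with a mock peer registry, much as the tests mock the fed-sx boundary. This Python sketch is illustrative only; the registry layout and the exception class names are assumptions, and functions are looked up by name on the peer.

```python
class RemoteUnreachable(Exception):
    """No peer is registered at the address."""


class RemoteNoFn(Exception):
    """The peer is up but does not export the requested function."""


PEERS = {}  # addr -> {fn_name: callable}; an absent addr counts as unreachable


def peer_register(addr, fns):
    PEERS[addr] = fns


def remote_node(addr, fn_name):
    """Node that runs fn_name on the peer at addr."""
    def run(value):
        if addr not in PEERS:
            raise RemoteUnreachable(addr)
        if fn_name not in PEERS[addr]:
            raise RemoteNoFn(fn_name)
        return PEERS[addr][fn_name](value)
    return run


def remote_failover(addrs, fn_name, local):
    """Try each peer in order; on any raised error move on; run local last."""
    def run(value):
        for addr in addrs:
            try:
                return remote_node(addr, fn_name)(value)
            except Exception:
                continue
        return local(value)
    return run


peer_register("peer-b", {"double": lambda x: x * 2})

# peer-a is down, so peer-b answers
via_peer = remote_failover(["peer-a", "peer-b"], "double", lambda x: ("local", x))(5)
# every peer fails (unreachable, then missing fn), so the local node runs
via_local = remote_failover(["peer-a", "peer-b"], "triple", lambda x: x * 3)(5)
```

Since a failover node is still an ordinary one-argument node, it composes with the other combinators in the same way `remote-node` does.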

## Phase 5 — Operational API + combinator library

The four roadmap phases are complete; this phase rounds out the engine into
something operators and authors actually use. Accumulation, not a rewrite.

- [x] introspection API — `flow/status id`, `flow/result id`, `flow/list`,
  `flow/pending` (operator view of what each suspended flow awaits). 12 tests in
  `tests/api.sx`.
- [x] store hygiene — `flow/gc` drops terminal (done/cancelled) records keeping
  live suspended flows (returns count); `flow/forget id` drops one terminal record
  and refuses live flows. Bounds unbounded store growth. 9 tests in `tests/hygiene.sx`.
- [x] `tap` — side-effecting pass-through node (logging/metrics) that returns input
- [x] `recover` — complement to try-catch for the fail-VALUE channel: run node; if it
  yields `(fail ...)`, run a recovery node on the reason
- [x] `map-flow` — run a flow per item of a list, join results (sequential)
- [x] `flow-while` / `flow-until` — bounded iteration: re-run body threading the
  value while/until pred holds, capped at `max` steps (deterministic bound)
- [x] `lib/flow/tests/api.sx` (12) + `lib/flow/tests/combinators.sx` (17)
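
A short Python model of three of these combinators, for orientation. It is a sketch under assumptions: argument order is hypothetical, `flow_until` is modelled as the negation of `flow_while` (checking the predicate before each step), and hitting the cap simply returns the current value, which the notes above do not specify.

```python
def tap(effect):
    """Run a side effect on the value, then pass the value through unchanged."""
    def run(value):
        effect(value)
        return value
    return run


def map_flow(flow):
    """Run flow once per list item, in order, and join the results into a list."""
    def run(items):
        return [flow(item) for item in items]
    return run


def flow_while(pred, body, max_steps):
    """Re-run body, threading the value, while pred holds; at most max_steps runs."""
    def run(value):
        steps = 0
        while steps < max_steps and pred(value):
            value = body(value)
            steps += 1
        return value  # assumption: reaching the cap returns the current value
    return run


def flow_until(pred, body, max_steps):
    """Re-run body until pred holds (assumption: pred is checked before each step)."""
    return flow_while(lambda v: not pred(v), body, max_steps)


seen = []
tapped = tap(seen.append)(7)                                   # 7, and seen == [7]
doubled = map_flow(lambda x: x * 2)([1, 2, 3])                 # [2, 4, 6]
halved = flow_while(lambda v: v > 1, lambda v: v // 2, 10)(40) # 40,20,10,5,2,1
capped = flow_while(lambda v: True, lambda v: v + 1, 3)(0)     # stops after 3 steps
grown = flow_until(lambda v: v >= 100, lambda v: v * 3, 10)(2) # 2,6,18,54,162
```

The step cap is what keeps iteration deterministic and replay-safe: a loop whose predicate never turns false still terminates after `max_steps` runs.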

## Phase 6 — Railway-oriented composition

Make the `(fail reason)` value channel compose into real validation/ETL pipelines.

- [x] `attempt` — like `sequence`, but short-circuits at the first node that returns
  a `(fail ...)` value, returning that failure (the railway track). Pairs with
  `recover` for the rejoin.
- [x] `lib/flow/tests/railway.sx` — 10 cases: fail short-circuiting, no-run-after-failure,
  recover rejoin, validation pipeline reporting the failing stage
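
The railway pattern is compact enough to show whole. In this Python sketch, `Fail` stands in for the `(fail reason)` value and `attempt`/`recover` follow the descriptions above; the validation stages and the `recover(node, handler)` argument order are made up for the example.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Fail:
    """A failure carried as ordinary data (the fail-value channel)."""
    reason: Any


def failed(value):
    return isinstance(value, Fail)


def attempt(*nodes):
    """Like sequence, but stop at the first node that returns a Fail."""
    def run(value):
        for node in nodes:
            value = node(value)
            if failed(value):
                return value  # later nodes never run
        return value
    return run


def recover(node, handler):
    """Run node; if it yields a Fail, run handler on the reason (the rejoin)."""
    def run(value):
        out = node(value)
        return handler(out.reason) if failed(out) else out
    return run


ran = []  # records which stages actually executed

def non_empty(s):
    ran.append("non_empty")
    return s if s else Fail("empty input")

def parse_int(s):
    ran.append("parse_int")
    return int(s) if s.isdigit() else Fail("not a number")

def positive(n):
    ran.append("positive")
    return n if n > 0 else Fail("must be positive")


validate = attempt(non_empty, parse_int, positive)

ok = validate("42")
ran.clear()
bad = validate("abc")          # fails at parse_int; positive never runs
stages_for_bad = list(ran)
safe = recover(validate, lambda reason: 0)("abc")
```

The failing stage is visible in the returned reason, which is what lets a validation pipeline report where the input was rejected.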

## Phase 8 — Host integration ABI (art-dag / human-in-the-loop)

`suspend` is the seam to the outside world, but a bare tag is an ad-hoc convention.
This phase defines a stable request/response contract a host (an art-dag driver, a
review UI) codes against — so flow can orchestrate art-dag with human decision
points later without reverse-engineering tag shapes. `lib/flow/host.sx`.

- [x] `(request kind payload)` — suspend with a typed `(flow-request kind payload)`
  envelope; evaluates to the host's resume value. `await-human`/`await-render`/
  `await-effect` sugar.
- [x] `(flow-host-requests)` — the host work queue: `(id kind payload)` for every
  suspended flow waiting on a host request; `request?`/`request-kind`/
  `request-payload` parse a tag.
- [x] `(flow-drive-host dispatch)` / `(flow-run-host dispatch maxticks)` — reference
  host driver: the host supplies only a `(kind payload) -> answer` dispatch fn; the
  loop drains pending requests and resumes until quiescent (bounded).
- [x] `lib/flow/tests/host.sx` — 15 cases incl. the art-dag-shaped driver loop
  (render → human-review → publish) run both manually and via `flow-run-host`.
- Contract (documented in `host.sx` + README): the host owns IO + persistence; a
  flow never does IO, it only `request`s; the host performs the effect and feeds the
  result back via resume (logged, so not re-run on recovery). NOT done here (host
  side, out of `lib/flow` scope): the real Celery/IPFS bridge and a persistent store
  backend — those live in the art-dag integration, coding against this ABI.
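
The host contract (the flow only `request`s, the host dispatches and resumes) can be modelled as a bounded driver loop. The Python sketch below is a simplification under assumptions: it drives a single flow rather than draining the store-wide queue that `flow-host-requests` exposes, passes `request` to the flow as an argument, and the request kinds and return shapes are illustrative.

```python
class Suspended(Exception):
    """Escape from the flow carrying a (flow-request kind payload) envelope."""
    def __init__(self, tag):
        super().__init__(tag)
        self.tag = tag


def run_host(flow, input_value, dispatch, max_ticks):
    """Answer the flow's requests via dispatch until it completes.

    Each tick dispatches one request and appends the answer to the replay
    log, so a re-drive never performs the same effect twice. Stops after
    max_ticks dispatches (bounded).
    """
    log = []
    ticks = 0
    while True:
        pending = list(log)

        def request(kind, payload):
            tag = ("flow-request", kind, payload)
            if pending:  # already answered: replay from the log
                logged_tag, answer = pending.pop(0)
                if logged_tag != tag:
                    raise RuntimeError("replay diverged from the log")
                return answer
            raise Suspended(tag)

        try:
            return "done", flow(input_value, request)
        except Suspended as s:
            if ticks == max_ticks:
                return "suspended", s.tag
            _, kind, payload = s.tag
            log.append((s.tag, dispatch(kind, payload)))  # host performs the effect
            ticks += 1


def art_flow(prompt, request):
    image = request("render", prompt)
    if request("human", image) != "approve":
        return ("rejected", image)
    return ("published", request("effect", ("publish", image)))


effects = []  # every effect the host actually performed

def dispatch(kind, payload):
    effects.append(kind)
    if kind == "render":
        return f"img({payload})"
    if kind == "human":
        return "approve"
    if kind == "effect":
        return "ok"
    raise ValueError(kind)


outcome = run_host(art_flow, "sunset", dispatch, 10)
performed = list(effects)  # each effect ran once despite four drives of the flow
stalled = run_host(art_flow, "sunset", dispatch, 1)  # bound reached after one dispatch
```

Keeping IO in `dispatch` means the flow body stays pure and replayable, which is the property the recovery and handoff phases depend on.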

## Phase 7 — End-to-end integration

Prove the phases compose: realistic flows exercising attempt + suspend + branch +
remote-node + crash-recovery + handoff + introspection together.

- [x] `lib/flow/tests/integration.sx` — 10 cases: an order-processing flow (validate
  → payment suspend → branch → ledger federation) and an onboarding flow, run through
  the full lifecycle including a simulated crash and a peer handoff mid-flow, plus
  introspection (`flow/pending`/`status`/`result`) during the flow's life

## Progress log

(loop fills this in)
- **Phase 1 (combinators + sequential runtime).** Flow built as a Scheme prelude
  loaded onto `scheme-standard-env`: a flow is a Scheme procedure `input -> output`,
  so the whole flow runs inside the interpreter (sets up Phase 3 call/cc suspend).
  Combinators `flow-node`/`flow-id`/`flow-const`/`sequence`/`parallel`/`defflow` in
  `spec.sx`; `flow/start` + SX helpers (`flow-make-env`/`flow-run`) in `api.sx`.
  18/18 in `tests/basic.sx`. Substrate constraints found: dotted rest params
  `(a . rest)` and named `let` are unsupported in `lib/scheme/eval.sx`, so
  combinators use `(lambda args ...)` variadics + top-level recursion. Scheme
  strings come back boxed as `{:scm-string "..."}` — unwrap with `(get s :scm-string)`.

- **Phases 2-4.** Control flow (branch/retry/timeout/try-catch + fail-value error
  model), then the showcase: durable suspend/resume. Guest call/cc is escape-only
  (re-entry hangs), so resume uses **deterministic replay** — re-run the flow,
  replaying resolved suspends from a `(tag value)` log; only plain data persists, so
  flows survive a wiped store (crash recovery) and a move to another instance
  (replication + handoff). Phase 4 models the fed-sx boundary with a mock peer
  registry. Timeout is a cooperative step budget (no wall clock in pure SX). Test
  harness reuses one env with a per-test reset for speed.

- **Phases 5-7 + docs.** Operational API (introspection, hygiene), combinator
  library (tap/recover/map-flow/while/until), railway `attempt`, end-to-end
  integration suite, and `lib/flow/README.md` (full API reference + replay-semantics
  contract). **151/151 across 10 suites.** Conformance sx_server timeout raised to
  540s for the 10-suite run under shared-machine CPU contention.

## Blockers

(loop fills this in)
(none)