;; lib/flow/spec.sx — flow combinators as a Scheme prelude.
;;
;; A flow is a Scheme procedure of one argument: the upstream value.
;;   node : input -> output
;; A leaf node ignoring its argument is effectively a thunk. Combinators
;; build composite nodes out of child nodes. The whole flow runs INSIDE the
;; Scheme interpreter.
;;
;; Phase 1 combinators (flow-combinators-src):
;;   flow-node / flow-id / flow-const / sequence / parallel / defflow
;;   defflow both binds the flow and registers it by name (flow-register!, in
;;   store.sx) so it can be re-resolved after a process restart.
;;   map-flow (Phase 5): run a node over each item of a list input, join results.
;;   flow-while / flow-until (Phase 5): bounded iteration — re-run body, threading
;;   the value, while/until pred holds, up to `max` steps (deterministic bound; no
;;   unbounded loops in pure SX).
;;
;; Phase 2 combinators (flow-control-src):
;;   branch / fail / failed? / fail-reason / try-catch / retry / timeout / tick
;;   tap (Phase 5): side-effecting pass-through (returns input unchanged).
;;   recover (Phase 5): the fail-VALUE counterpart of try-catch.
;;   attempt (Phase 6): railway sequence — thread nodes left-to-right but stop at
;;   the first node that returns a (fail ...) value, returning that failure.
;;
;; Phase 3 suspend core (flow-suspend-src):
;;   The guest Scheme's call/cc is ESCAPE-ONLY (re-invoking a captured k after it
;;   returns hangs the runtime), so suspend/resume CANNOT re-enter a continuation.
;;   Instead, durability uses DETERMINISTIC REPLAY: a flow re-runs from the start
;;   on each resume; suspend points that have already been resolved replay their
;;   logged value, and the first unresolved suspend escapes back to the driver.
;;   The entire persisted state is the replay log (plain (tag value) data), which
;;   survives process restart — no live continuation is ever serialized.
;;
;;   (suspend tag) — if tag is in the replay log, return its value; else escape
;;   to the driver as (flow-suspended tag).
;;   Tags must be unique & deterministic across replays. ALL
;;   effects/non-determinism must go through suspend so their results are
;;   logged (otherwise they re-run on every replay).
;;   (flow-drive flow input log) — run flow with the given replay log; returns
;;   (flow-done result) or (flow-suspended tag).

;; Guest-Scheme source for the basic combinators.
;;   (flow-node f)              — returns f unchanged.
;;   (flow-id input)            — returns its input.
;;   (flow-const v)             — node that ignores its input and returns v.
;;   (sequence n ...)           — threads the input through the nodes left to
;;                                right; with no nodes the input is returned.
;;   (parallel n ...)           — applies every node to the same input and
;;                                returns the list of results (via map).
;;   (map-flow node)            — node that maps `node` over its list input.
;;   (flow-while pred body max) — re-applies body while (pred value) holds,
;;                                at most `max` times; max <= 0 returns input.
;;   (flow-until pred body max) — re-applies body until (pred value) holds,
;;                                at most `max` times; max <= 0 returns input.
;;   (defflow nm body)          — defines nm and calls
;;                                (flow-register! 'nm nm). flow-register! is
;;                                not defined in this string; it must already
;;                                be bound in the target environment.
(define flow-combinators-src
  "(define (flow-node f) f)\n
   (define (flow-id input) input)\n
   (define (flow-const v) (lambda (input) v))\n
   (define (flow-seq-step ns v)\n
     (if (null? ns) v (flow-seq-step (cdr ns) ((car ns) v))))\n
   (define sequence (lambda ns (lambda (input) (flow-seq-step ns input))))\n
   (define parallel (lambda ns (lambda (input) (map (lambda (n) (n input)) ns))))\n
   (define (map-flow node) (lambda (items) (map node items)))\n
   (define (flow-while-step pred body input n)\n
     (if (<= n 0)\n
       input\n
       (if (pred input) (flow-while-step pred body (body input) (- n 1)) input)))\n
   (define (flow-while pred body max) (lambda (input) (flow-while-step pred body input max)))\n
   (define (flow-until-step pred body input n)\n
     (if (<= n 0)\n
       input\n
       (if (pred input) input (flow-until-step pred body (body input) (- n 1)))))\n
   (define (flow-until pred body max) (lambda (input) (flow-until-step pred body input max)))\n
   (define-syntax defflow\n
     (syntax-rules ()\n
       ((defflow nm body)\n
         (begin (define nm body) (flow-register! (quote nm) nm)))))")

;; Guest-Scheme source for control-flow combinators.
;;   (branch pred then else)    — runs `then` or `else` on the input depending
;;                                on (pred input).
;;   (fail reason)              — builds the failure VALUE (flow-fail reason).
;;   (failed? x)                — #t when x is a pair whose car is flow-fail.
;;   (fail-reason x)            — second element of a failure value.
;;   (recover node handler)     — if node returns a failure value, returns
;;                                (handler reason); otherwise node's result.
;;   (tap effect)               — calls (effect input), then returns input.
;;   (attempt n ...)            — like sequence, but stops as soon as the
;;                                threaded value is a failure value (this
;;                                includes the initial input).
;;   (try-catch node handler)   — runs node; any RAISED condition e is turned
;;                                into (handler e).
;;   (retry n node)             — re-runs node after a raised condition; when
;;                                n <= 1 the condition is re-raised, so node
;;                                runs at most n times for n >= 1.
;;   (tick)                     — returns 0 while flow-timeout-budget is
;;                                negative (the initial value is -1).
;;                                Otherwise it decrements the budget, raises
;;                                flow-timeout if it drops below zero, and
;;                                else returns the remaining budget.
;;   (timeout budget node)      — runs node with flow-timeout-budget set to
;;                                `budget`; the previous budget is restored on
;;                                normal return and before re-raising.
;; NOTE(review): a (suspend ...) escape taken inside (timeout ...) looks like
;; it bypasses both restore paths, leaving flow-timeout-budget at the inner
;; value. This depends on how the guest's guard interacts with call/cc
;; escapes — confirm.
(define flow-control-src
  "(define (branch pred then else)\n
     (lambda (input) (if (pred input) (then input) (else input))))\n
   (define (fail reason) (list (quote flow-fail) reason))\n
   (define (failed? x) (and (pair? x) (eq? (car x) (quote flow-fail))))\n
   (define (fail-reason x) (car (cdr x)))\n
   (define (recover node handler)\n
     (lambda (input)\n
       (let ((r (node input)))\n
         (if (failed? r) (handler (fail-reason r)) r))))\n
   (define (tap effect)\n
     (lambda (input) (begin (effect input) input)))\n
   (define (flow-attempt-step ns v)\n
     (if (failed? v)\n
       v\n
       (if (null? ns) v (flow-attempt-step (cdr ns) ((car ns) v)))))\n
   (define attempt (lambda ns (lambda (input) (flow-attempt-step ns input))))\n
   (define (try-catch node handler)\n
     (lambda (input) (guard (e (#t (handler e))) (node input))))\n
   (define (flow-retry-step n node input)\n
     (guard (e (#t (if (<= n 1) (raise e) (flow-retry-step (- n 1) node input))))\n
       (node input)))\n
   (define (retry n node) (lambda (input) (flow-retry-step n node input)))\n
   (define flow-timeout-budget -1)\n
   (define (tick)\n
     (if (< flow-timeout-budget 0)\n
       0\n
       (begin\n
         (set! flow-timeout-budget (- flow-timeout-budget 1))\n
         (if (< flow-timeout-budget 0)\n
           (raise (quote flow-timeout))\n
           flow-timeout-budget))))\n
   (define (timeout budget node)\n
     (lambda (input)\n
       (let ((saved flow-timeout-budget))\n
         (set! flow-timeout-budget budget)\n
         (guard (e (#t (begin (set! flow-timeout-budget saved) (raise e))))\n
           (let ((result (node input)))\n
             (set! flow-timeout-budget saved)\n
             result)))))")

;; Guest-Scheme source for the suspend/replay core.
;;   flow-replay-log            — global list of (tag value) entries, set by
;;                                flow-drive.
;;   flow-suspend-k             — escape continuation of the ACTIVE flow-drive,
;;                                or #f when no drive is running.
;;   (flow-log-lookup tag log)  — returns (#t value) for the first entry whose
;;                                tag is eq? to `tag`, else (#f #f). The
;;                                two-element result distinguishes a logged #f
;;                                from a miss.
;;   (suspend tag)              — returns the logged value for tag; otherwise
;;                                escapes to the active drive with
;;                                (flow-suspended tag). Raises
;;                                flow-suspend-outside-drive when no drive is
;;                                active.
;;   (flow-drive flow input log)— installs `log`, runs (flow input) under an
;;                                escape continuation and returns
;;                                (flow-done result) or (flow-suspended tag).
;; Fix: flow-drive used to leave flow-suspend-k pointing at its own, already
;; returned, continuation. A later (suspend ...) on an unlogged tag would then
;; re-invoke that continuation, which hangs the runtime because the guest's
;; call/cc is escape-only. flow-drive now restores the previous value of
;; flow-suspend-k on every exit path (normal return, suspend escape, re-raised
;; condition), and suspend raises a clear condition instead of invoking #f or
;; a stale continuation.
;; NOTE(review): tags are compared with eq?, so they are presumably symbols —
;; confirm against callers before using strings or numbers as tags.
(define flow-suspend-src
  "(define flow-replay-log (list))\n
   (define flow-suspend-k #f)\n
   (define (flow-log-lookup tag log)\n
     (if (null? log)\n
       (list #f #f)\n
       (if (eq? (car (car log)) tag)\n
         (list #t (car (cdr (car log))))\n
         (flow-log-lookup tag (cdr log)))))\n
   (define (suspend tag)\n
     (let ((hit (flow-log-lookup tag flow-replay-log)))\n
       (if (car hit)\n
         (car (cdr hit))\n
         (if flow-suspend-k\n
           (flow-suspend-k (list (quote flow-suspended) tag))\n
           (raise (quote flow-suspend-outside-drive))))))\n
   (define (flow-drive flow input log)\n
     (set! flow-replay-log log)\n
     (let ((saved-k flow-suspend-k))\n
       (let ((outcome\n
               (guard (e (#t (begin (set! flow-suspend-k saved-k) (raise e))))\n
                 (call/cc\n
                   (lambda (k)\n
                     (set! flow-suspend-k k)\n
                     (list (quote flow-done) (flow input)))))))\n
         (set! flow-suspend-k saved-k)\n
         outcome)))")

;; Load the whole flow prelude into a guest-Scheme environment.
;; Parses and evaluates the three source strings in order (combinators,
;; control, suspend) with scheme-parse-all / scheme-eval-program, then returns
;; `env` so calls can be chained.
(define flow-load-combinators!
  (fn (env)
    (begin
      (scheme-eval-program (scheme-parse-all flow-combinators-src) env)
      (scheme-eval-program (scheme-parse-all flow-control-src) env)
      (scheme-eval-program (scheme-parse-all flow-suspend-src) env)
      env)))