;; lib/flow/spec.sx — flow combinators as a Scheme prelude. ;; ;; A flow is a Scheme procedure of one argument: the upstream value. ;; node : input -> output ;; A leaf node ignoring its argument is effectively a thunk. Combinators ;; build composite nodes out of child nodes. The whole flow runs INSIDE the ;; Scheme interpreter. ;; ;; Phase 1 combinators (flow-combinators-src): ;; flow-node / flow-id / flow-const / sequence / parallel / defflow ;; defflow both binds the flow and registers it by name (flow-register!, in ;; store.sx) so it can be re-resolved after a process restart. ;; ;; Phase 2 combinators (flow-control-src): ;; branch / fail / failed? / fail-reason / try-catch / retry / timeout / tick ;; ;; Phase 3 suspend core (flow-suspend-src): ;; The guest Scheme's call/cc is ESCAPE-ONLY (re-invoking a captured k after it ;; returns hangs the runtime), so suspend/resume CANNOT re-enter a continuation. ;; Instead, durability uses DETERMINISTIC REPLAY: a flow re-runs from the start ;; on each resume; suspend points that have already been resolved replay their ;; logged value, and the first unresolved suspend escapes back to the driver. ;; The entire persisted state is the replay log (plain (tag value) data), which ;; survives process restart — no live continuation is ever serialized. ;; ;; (suspend tag) — if tag is in the replay log, return its value; else escape ;; to the driver as (flow-suspended tag). tags must be unique & deterministic ;; across replays. ALL effects/non-determinism must go through suspend so their ;; results are logged (otherwise they re-run on every replay). ;; (flow-drive flow input log) — run flow with the given replay log; returns ;; (flow-done result) or (flow-suspended tag). (define flow-combinators-src "(define (flow-node f) f)\n (define (flow-id input) input)\n (define (flow-const v) (lambda (input) v))\n (define (flow-seq-step ns v)\n (if (null? ns) v (flow-seq-step (cdr ns) ((car ns) v))))\n (define sequence (lambda ns (lambda (input) (flow-seq-step ns input))))\n (define parallel (lambda ns (lambda (input) (map (lambda (n) (n input)) ns))))\n (define-syntax defflow\n (syntax-rules ()\n ((defflow nm body)\n (begin (define nm body) (flow-register! (quote nm) nm)))))") (define flow-control-src "(define (branch pred then else)\n (lambda (input) (if (pred input) (then input) (else input))))\n (define (fail reason) (list (quote flow-fail) reason))\n (define (failed? x) (and (pair? x) (eq? (car x) (quote flow-fail))))\n (define (fail-reason x) (car (cdr x)))\n (define (try-catch node handler)\n (lambda (input) (guard (e (#t (handler e))) (node input))))\n (define (flow-retry-step n node input)\n (guard (e (#t (if (<= n 1) (raise e) (flow-retry-step (- n 1) node input))))\n (node input)))\n (define (retry n node) (lambda (input) (flow-retry-step n node input)))\n (define flow-timeout-budget -1)\n (define (tick)\n (if (< flow-timeout-budget 0)\n 0\n (begin\n (set! flow-timeout-budget (- flow-timeout-budget 1))\n (if (< flow-timeout-budget 0)\n (raise (quote flow-timeout))\n flow-timeout-budget))))\n (define (timeout budget node)\n (lambda (input)\n (let ((saved flow-timeout-budget))\n (set! flow-timeout-budget budget)\n (guard (e (#t (begin (set! flow-timeout-budget saved) (raise e))))\n (let ((result (node input)))\n (set! flow-timeout-budget saved)\n result)))))") (define flow-suspend-src "(define flow-replay-log (list))\n (define flow-suspend-k #f)\n (define (flow-log-lookup tag log)\n (if (null? log)\n (list #f #f)\n (if (eq? (car (car log)) tag)\n (list #t (car (cdr (car log))))\n (flow-log-lookup tag (cdr log)))))\n (define (suspend tag)\n (let ((hit (flow-log-lookup tag flow-replay-log)))\n (if (car hit)\n (car (cdr hit))\n (flow-suspend-k (list (quote flow-suspended) tag)))))\n (define (flow-drive flow input log)\n (set! flow-replay-log log)\n (call/cc\n (lambda (k)\n (set! flow-suspend-k k)\n (list (quote flow-done) (flow input)))))") (define flow-load-combinators! (fn (env) (begin (scheme-eval-program (scheme-parse-all flow-combinators-src) env) (scheme-eval-program (scheme-parse-all flow-control-src) env) (scheme-eval-program (scheme-parse-all flow-suspend-src) env) env)))