Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 55s
flow/status id -> done|suspended|cancelled|unknown; flow/result id -> value or error; flow/list -> (id status) per flow; flow/pending -> (id waiting-tag) for suspended flows (operator view of what each awaits). Pure store introspection. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
41 lines
7.4 KiB
Plaintext
41 lines
7.4 KiB
Plaintext
;; lib/flow/store.sx — durable flow store + lifecycle + crash recovery (Phase 3).
;;
;; The store maps flow-id -> record (one entry per id, newest wins). A record is:
;;   (name proc input log status payload)
;;     name    — registered flow name (symbol) or #f for an anonymous flow
;;     proc    — the live flow procedure (#f after export; re-resolved by name)
;;     input   — the original start input (plain data)
;;     log     — replay log: list of (tag value) entries (plain data)
;;     status  — done | suspended | cancelled
;;     payload — result (done) | waiting-tag (suspended) | #f (cancelled)
;;
;; A record is SERIALIZABLE once its proc is nulled (flow-store-export): name,
;; input, and log are plain data. On restart the flow definitions are reloaded
;; (defflow re-registers names), the store is reimported, and resume re-resolves
;; the proc by name — no live continuation is ever persisted.
;;
;; Lifecycle (all use deterministic replay via flow-drive — see spec.sx):
;;   (flow/start flow input) — raw result if it completes (backward compatible),
;;                             else (flow-suspended id tag).
;;   (flow/resume id value)  — append (tag value) to the log and re-drive.
;;   (flow/cancel id)        — mark cancelled; a later resume is rejected.
;; Crash recovery:
;;   (flow-store-export)     — store as plain data (procs nulled)
;;   (flow-store-import! d)  — replace the store from exported data
;;   (flow-resumable-ids)    — ids of suspended (resumable) flows
;; Introspection (Phase 5):
;;   (flow/status id)        — done | suspended | cancelled | unknown
;;   (flow/result id)        — result if done, else (flow-error reason)
;;   (flow/list)             — list of (id status) for every flow
;;   (flow/pending)          — list of (id waiting-tag) for suspended flows

;; flow-store-src — Scheme source text for the flow store, kept as a string so
;; it can be parsed and evaluated into a Scheme environment at load time.
;;
;; The program defines, in order:
;;   * registry      — flow-registry, flow-register!, flow-lookup, flow-name-of:
;;                     an association list of (name proc); newest entry is found first.
;;   * store         — flow-store, flow-next-id, flow-store-put!/-get: an association
;;                     list of (id record); put! removes any older entry for the id.
;;                     flow-store-get returns the empty list when the id is absent.
;;   * records       — flow-mk-rec and the flow-rec-* accessors over the 6-element
;;                     list (name proc input log status payload); flow-rec-resolve
;;                     prefers the proc currently registered under the record's
;;                     name and falls back to the stored proc.
;;   * lifecycle     — flow/start, flow/resume, flow/cancel (all re-drive through
;;                     flow-drive, which this program expects to be defined already).
;;   * recovery      — flow-store-export (procs replaced by #f), flow-store-import!
;;                     (also resets flow-next-id to the largest imported id),
;;                     flow-resumable-ids.
;;   * introspection — flow/status, flow/result, flow/list, flow/pending.
;;
;; flow/resume error results: (flow-error no-such-flow), (flow-error not-suspended)
;; and (flow-error unresolved-flow). The last one is returned when the record's
;; procedure cannot be resolved — the stored proc is #f (as it is after an
;; export/import round trip) and no flow is registered under the record's name,
;; e.g. an anonymous flow or a definition that was not reloaded. In that case the
;; record is left untouched instead of handing #f to flow-drive.
;;
;; The string is kept on a single line with \n escapes between forms.
(define
  flow-store-src
  "(define flow-registry (list))\n (define (flow-register! name proc) (set! flow-registry (cons (list name proc) flow-registry)))\n (define (flow-lookup-in name reg)\n (if (null? reg)\n #f\n (if (eq? (car (car reg)) name) (car (cdr (car reg))) (flow-lookup-in name (cdr reg)))))\n (define (flow-lookup name) (flow-lookup-in name flow-registry))\n (define (flow-name-of proc reg)\n (if (null? reg)\n #f\n (if (eq? (car (cdr (car reg))) proc) (car (car reg)) (flow-name-of proc (cdr reg)))))\n\n (define flow-store (list))\n (define flow-next-id 0)\n (define (flow-store-remove id store)\n (if (null? store)\n (list)\n (if (= (car (car store)) id)\n (flow-store-remove id (cdr store))\n (cons (car store) (flow-store-remove id (cdr store))))))\n (define (flow-store-put! id rec) (set! flow-store (cons (list id rec) (flow-store-remove id flow-store))))\n (define (flow-store-find id store)\n (if (null? store)\n (list)\n (if (= (car (car store)) id)\n (car (cdr (car store)))\n (flow-store-find id (cdr store)))))\n (define (flow-store-get id) (flow-store-find id flow-store))\n\n (define (flow-mk-rec name proc input log status payload)\n (list name proc input log status payload))\n (define (flow-rec-name r) (car r))\n (define (flow-rec-proc r) (car (cdr r)))\n (define (flow-rec-input r) (car (cdr (cdr r))))\n (define (flow-rec-log r) (car (cdr (cdr (cdr r)))))\n (define (flow-rec-status r) (car (cdr (cdr (cdr (cdr r))))))\n (define (flow-rec-payload r) (car (cdr (cdr (cdr (cdr (cdr r)))))))\n (define (flow-rec-resolve rec)\n (let ((byname (flow-lookup (flow-rec-name rec))))\n (if byname byname (flow-rec-proc rec))))\n\n (define (flow-outcome id name proc input log outcome)\n (if (eq? (car outcome) (quote flow-done))\n (begin\n (flow-store-put! id (flow-mk-rec name proc input log (quote done) (car (cdr outcome))))\n (car (cdr outcome)))\n (begin\n (flow-store-put! id (flow-mk-rec name proc input log (quote suspended) (car (cdr outcome))))\n (list (quote flow-suspended) id (car (cdr outcome))))))\n (define (flow/start flow input)\n (set! flow-next-id (+ flow-next-id 1))\n (flow-outcome flow-next-id (flow-name-of flow flow-registry) flow input (list)\n (flow-drive flow input (list))))\n (define (flow/resume id value)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (if (eq? (flow-rec-status rec) (quote suspended))\n (let ((proc (flow-rec-resolve rec)))\n (if proc\n (let ((newlog (cons (list (flow-rec-payload rec) value) (flow-rec-log rec))))\n (flow-outcome id (flow-rec-name rec) proc (flow-rec-input rec) newlog\n (flow-drive proc (flow-rec-input rec) newlog)))\n (list (quote flow-error) (quote unresolved-flow))))\n (list (quote flow-error) (quote not-suspended))))))\n (define (flow/cancel id)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (begin\n (flow-store-put! id\n (flow-mk-rec (flow-rec-name rec) (flow-rec-proc rec) (flow-rec-input rec)\n (flow-rec-log rec) (quote cancelled) #f))\n (list (quote flow-cancelled) id)))))\n\n (define (flow-export-entry entry)\n (let ((rec (car (cdr entry))))\n (list (car entry)\n (flow-mk-rec (flow-rec-name rec) #f (flow-rec-input rec)\n (flow-rec-log rec) (flow-rec-status rec) (flow-rec-payload rec)))))\n (define (flow-export-map store)\n (if (null? store) (list) (cons (flow-export-entry (car store)) (flow-export-map (cdr store)))))\n (define (flow-store-export) (flow-export-map flow-store))\n (define (flow-max-id store m)\n (if (null? store) m (flow-max-id (cdr store) (if (> (car (car store)) m) (car (car store)) m))))\n (define (flow-store-import! data)\n (begin (set! flow-store data) (set! flow-next-id (flow-max-id data 0))))\n (define (flow-collect-resumable store)\n (if (null? store)\n (list)\n (if (eq? (flow-rec-status (car (cdr (car store)))) (quote suspended))\n (cons (car (car store)) (flow-collect-resumable (cdr store)))\n (flow-collect-resumable (cdr store)))))\n (define (flow-resumable-ids) (flow-collect-resumable flow-store))\n\n (define (flow/status id)\n (let ((rec (flow-store-get id)))\n (if (null? rec) (quote unknown) (flow-rec-status rec))))\n (define (flow/result id)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (if (eq? (flow-rec-status rec) (quote done))\n (flow-rec-payload rec)\n (list (quote flow-error) (quote not-done))))))\n (define (flow-list-step store)\n (if (null? store)\n (list)\n (cons (list (car (car store)) (flow-rec-status (car (cdr (car store)))))\n (flow-list-step (cdr store)))))\n (define (flow/list) (flow-list-step flow-store))\n (define (flow-pending-step store)\n (if (null? store)\n (list)\n (if (eq? (flow-rec-status (car (cdr (car store)))) (quote suspended))\n (cons (list (car (car store)) (flow-rec-payload (car (cdr (car store)))))\n (flow-pending-step (cdr store)))\n (flow-pending-step (cdr store)))))\n (define (flow/pending) (flow-pending-step flow-store))")
|
|
|
|
;; flow-load-store! — load the flow store program into an environment.
;;   env — the environment handed to scheme-eval-program.
;; Parses flow-store-src with scheme-parse-all, evaluates the parsed program
;; in env, and returns env so the call can be chained.
(define
  flow-load-store!
  (fn
    (env)
    (let
      ((program (scheme-parse-all flow-store-src)))
      (begin (scheme-eval-program program env) env))))
|