Files
rose-ash/lib/flow/store.sx
giles 97c7623743
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 58s
flow: crash recovery — store export/import + resumable scan + 8 tests (Phase 3 complete)
Records are name-keyed (defflow registers names); flow-store-export nulls live
procs to plain data, flow-store-import! restores, flow-resumable-ids scans for
paused flows. Resume re-resolves the proc by name, so a flow survives a wiped
store (simulated restart). The whole durable model persists only plain data.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 17:25:47 +00:00

36 lines
6.0 KiB
Plaintext

;; lib/flow/store.sx — durable flow store + lifecycle + crash recovery (Phase 3).
;;
;; The store maps flow-id -> record (one entry per id, newest wins). A record is:
;; (name proc input log status payload)
;; name — registered flow name (symbol) or #f for an anonymous flow
;; proc — the live flow procedure (#f after export; re-resolved by name)
;; input — the original start input (plain data)
;; log — replay log: list of (tag value) entries (plain data)
;; status — done | suspended | cancelled
;; payload — result (done) | waiting-tag (suspended) | #f (cancelled)
;;
;; A record is SERIALIZABLE once its proc is nulled (flow-store-export): name,
;; input, and log are plain data. On restart the flow definitions are reloaded
;; (defflow re-registers names), the store is reimported, and resume re-resolves
;; the proc by name — no live continuation is ever persisted. An anonymous flow
;; (name #f) has no name to re-resolve, so once its proc has been nulled by an
;; export it cannot be resumed.
;;
;; Lifecycle (all use deterministic replay via flow-drive — see spec.sx):
;; (flow/start flow input) — raw result if it completes (backward compatible),
;; else (flow-suspended id tag).
;; (flow/resume id value) — cons (tag value) onto the front of the log (newest
;; entry first) and re-drive.
;; (flow/cancel id) — mark cancelled; a later resume is rejected.
;; Crash recovery:
;; (flow-store-export) — store as plain data (procs nulled)
;; (flow-store-import! d) — replace the store from exported data
;; (flow-resumable-ids) — ids of suspended (resumable) flows
;; flow-store-src — Scheme source text of the flow store, held as one string.
;; flow-load-store! parses this text and evaluates it in a Scheme environment.
;; flow-drive is referenced by the text but not defined in it, so it must
;; already be bound in the environment the text is evaluated in.
;;
;; Definitions the text installs:
;;   registry   — flow-registry, flow-register!, flow-lookup, flow-name-of
;;   store      — flow-store, flow-next-id, flow-store-put!, flow-store-get
;;   records    — flow-mk-rec, flow-rec-* accessors, flow-rec-resolve
;;   lifecycle  — flow/start, flow/resume, flow/cancel
;;   recovery   — flow-store-export, flow-store-import!, flow-resumable-ids
;;
;; Error results are plain lists:
;;   (flow-error no-such-flow)    — resume/cancel of an id not in the store
;;   (flow-error not-suspended)   — resume of a record whose status is not suspended
;;   (flow-error unresolved-flow) — resume of a suspended record whose procedure
;;                                  can be found neither by name in the registry
;;                                  nor in the record itself (e.g. a record with
;;                                  proc #f after export/import whose name is #f
;;                                  or is not registered). Without this guard
;;                                  flow/resume handed #f to flow-drive.
;;
;; The string is kept on a single line with \n escapes; whitespace inside it is
;; only token separation for the Scheme parser.
(define
  flow-store-src
  "(define flow-registry (list))\n (define (flow-register! name proc) (set! flow-registry (cons (list name proc) flow-registry)))\n (define (flow-lookup-in name reg)\n (if (null? reg)\n #f\n (if (eq? (car (car reg)) name) (car (cdr (car reg))) (flow-lookup-in name (cdr reg)))))\n (define (flow-lookup name) (flow-lookup-in name flow-registry))\n (define (flow-name-of proc reg)\n (if (null? reg)\n #f\n (if (eq? (car (cdr (car reg))) proc) (car (car reg)) (flow-name-of proc (cdr reg)))))\n\n (define flow-store (list))\n (define flow-next-id 0)\n (define (flow-store-remove id store)\n (if (null? store)\n (list)\n (if (= (car (car store)) id)\n (flow-store-remove id (cdr store))\n (cons (car store) (flow-store-remove id (cdr store))))))\n (define (flow-store-put! id rec) (set! flow-store (cons (list id rec) (flow-store-remove id flow-store))))\n (define (flow-store-find id store)\n (if (null? store)\n (list)\n (if (= (car (car store)) id)\n (car (cdr (car store)))\n (flow-store-find id (cdr store)))))\n (define (flow-store-get id) (flow-store-find id flow-store))\n\n (define (flow-mk-rec name proc input log status payload)\n (list name proc input log status payload))\n (define (flow-rec-name r) (car r))\n (define (flow-rec-proc r) (car (cdr r)))\n (define (flow-rec-input r) (car (cdr (cdr r))))\n (define (flow-rec-log r) (car (cdr (cdr (cdr r)))))\n (define (flow-rec-status r) (car (cdr (cdr (cdr (cdr r))))))\n (define (flow-rec-payload r) (car (cdr (cdr (cdr (cdr (cdr r)))))))\n (define (flow-rec-resolve rec)\n (let ((byname (flow-lookup (flow-rec-name rec))))\n (if byname byname (flow-rec-proc rec))))\n\n (define (flow-outcome id name proc input log outcome)\n (if (eq? (car outcome) (quote flow-done))\n (begin\n (flow-store-put! id (flow-mk-rec name proc input log (quote done) (car (cdr outcome))))\n (car (cdr outcome)))\n (begin\n (flow-store-put! id (flow-mk-rec name proc input log (quote suspended) (car (cdr outcome))))\n (list (quote flow-suspended) id (car (cdr outcome))))))\n (define (flow/start flow input)\n (set! flow-next-id (+ flow-next-id 1))\n (flow-outcome flow-next-id (flow-name-of flow flow-registry) flow input (list)\n (flow-drive flow input (list))))\n (define (flow/resume id value)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (if (eq? (flow-rec-status rec) (quote suspended))\n (let ((proc (flow-rec-resolve rec)))\n (if proc\n (let ((newlog (cons (list (flow-rec-payload rec) value) (flow-rec-log rec))))\n (flow-outcome id (flow-rec-name rec) proc (flow-rec-input rec) newlog\n (flow-drive proc (flow-rec-input rec) newlog)))\n (list (quote flow-error) (quote unresolved-flow))))\n (list (quote flow-error) (quote not-suspended))))))\n (define (flow/cancel id)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (begin\n (flow-store-put! id\n (flow-mk-rec (flow-rec-name rec) (flow-rec-proc rec) (flow-rec-input rec)\n (flow-rec-log rec) (quote cancelled) #f))\n (list (quote flow-cancelled) id)))))\n\n (define (flow-export-entry entry)\n (let ((rec (car (cdr entry))))\n (list (car entry)\n (flow-mk-rec (flow-rec-name rec) #f (flow-rec-input rec)\n (flow-rec-log rec) (flow-rec-status rec) (flow-rec-payload rec)))))\n (define (flow-export-map store)\n (if (null? store) (list) (cons (flow-export-entry (car store)) (flow-export-map (cdr store)))))\n (define (flow-store-export) (flow-export-map flow-store))\n (define (flow-max-id store m)\n (if (null? store) m (flow-max-id (cdr store) (if (> (car (car store)) m) (car (car store)) m))))\n (define (flow-store-import! data)\n (begin (set! flow-store data) (set! flow-next-id (flow-max-id data 0))))\n (define (flow-collect-resumable store)\n (if (null? store)\n (list)\n (if (eq? (flow-rec-status (car (cdr (car store)))) (quote suspended))\n (cons (car (car store)) (flow-collect-resumable (cdr store)))\n (flow-collect-resumable (cdr store)))))\n (define (flow-resumable-ids) (flow-collect-resumable flow-store))")
;; flow-load-store! — install the flow store definitions into a Scheme
;; environment.
;;   env — the environment handed to scheme-eval-program.
;; Parses flow-store-src with scheme-parse-all, evaluates the parsed program in
;; env, and returns that same env so the call can be chained.
(define
  flow-load-store!
  (fn
    (env)
    (let
      ((program (scheme-parse-all flow-store-src)))
      (begin (scheme-eval-program program env) env))))