diff --git a/lib/flow/conformance.sh b/lib/flow/conformance.sh index 5d0e7b6b..787c9c9e 100755 --- a/lib/flow/conformance.sh +++ b/lib/flow/conformance.sh @@ -30,6 +30,7 @@ SUITES=( "combinators flow-cmb-tests-run! lib/flow/tests/combinators.sx" "railway flow-rail-tests-run! lib/flow/tests/railway.sx" "integration flow-int-tests-run! lib/flow/tests/integration.sx" + "hygiene flow-hyg-tests-run! lib/flow/tests/hygiene.sx" ) TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT @@ -56,7 +57,7 @@ emit_eval () { echo "(epoch $EPOCH)"; echo "(eval \"$1\")"; EPOCH=$((EPOCH+1)); done } > "$TMPFILE" -OUTPUT=$(timeout 300 "$SX_SERVER" < "$TMPFILE" 2>&1 || true) +OUTPUT=$(timeout 540 "$SX_SERVER" < "$TMPFILE" 2>&1 || true) TOTAL_PASS=0 TOTAL_FAIL=0 diff --git a/lib/flow/scoreboard.json b/lib/flow/scoreboard.json index b8cff6b9..012c8bfb 100644 --- a/lib/flow/scoreboard.json +++ b/lib/flow/scoreboard.json @@ -1,6 +1,6 @@ { - "total": 142, - "passed": 142, + "total": 151, + "passed": 151, "failed": 0, "suites": { "basic": { "passed": 18, "total": 18 }, @@ -11,7 +11,8 @@ "api": { "passed": 12, "total": 12 }, "combinators": { "passed": 17, "total": 17 }, "railway": { "passed": 10, "total": 10 }, - "integration": { "passed": 10, "total": 10 } + "integration": { "passed": 10, "total": 10 }, + "hygiene": { "passed": 9, "total": 9 } }, "phases": { "phase1": "done", "phase2": "done", "phase3": "done", "phase4": "done", "phase5": "done", "phase6": "done", "phase7": "done" } } diff --git a/lib/flow/scoreboard.md b/lib/flow/scoreboard.md index 0f539a36..41eaf45d 100644 --- a/lib/flow/scoreboard.md +++ b/lib/flow/scoreboard.md @@ -1,6 +1,6 @@ # flow-on-sx Scoreboard -**All tests pass: 142 / 142 across 9 suites. Phases 1-7 complete.** +**All tests pass: 151 / 151 across 10 suites. Phases 1-7 complete.** `bash lib/flow/conformance.sh` @@ -17,6 +17,7 @@ | combinators | 17 | Phase 5: `tap`, `recover` (fail-value), `map-flow` fan-over-list, `flow-while`/`flow-until` bounded iteration | | railway | 10 | Phase 6: `attempt` — fail-value short-circuiting sequence + recover rejoin | | integration | 10 | Phase 7: end-to-end order + onboarding flows composing every phase (suspend, branch, federation, crash recovery, handoff, introspection) | +| hygiene | 9 | Phase 5: `flow/gc` (prune terminal flows), `flow/forget` (drop one terminal record) | ## Architecture diff --git a/lib/flow/store.sx b/lib/flow/store.sx index b88e6c5b..abd75360 100644 --- a/lib/flow/store.sx +++ b/lib/flow/store.sx @@ -28,10 +28,15 @@ ;; (flow/result id) — result if done, else (flow-error reason) ;; (flow/list) — list of (id status) for every flow ;; (flow/pending) — list of (id waiting-tag) for suspended flows +;; Store hygiene (Phase 5): +;; (flow/gc) — drop all terminal (done/cancelled) records, keeping +;; suspended (live) flows; returns the count removed +;; (flow/forget id) — drop one TERMINAL record (#t); refuses to forget a +;; still-suspended flow or an unknown id (#f) (define flow-store-src - "(define flow-registry (list))\n (define (flow-register! name proc) (set! flow-registry (cons (list name proc) flow-registry)))\n (define (flow-lookup-in name reg)\n (if (null? reg)\n #f\n (if (eq? (car (car reg)) name) (car (cdr (car reg))) (flow-lookup-in name (cdr reg)))))\n (define (flow-lookup name) (flow-lookup-in name flow-registry))\n (define (flow-name-of proc reg)\n (if (null? reg)\n #f\n (if (eq? (car (cdr (car reg))) proc) (car (car reg)) (flow-name-of proc (cdr reg)))))\n\n (define flow-store (list))\n (define flow-next-id 0)\n (define (flow-store-remove id store)\n (if (null? store)\n (list)\n (if (= (car (car store)) id)\n (flow-store-remove id (cdr store))\n (cons (car store) (flow-store-remove id (cdr store))))))\n (define (flow-store-put! id rec) (set! flow-store (cons (list id rec) (flow-store-remove id flow-store))))\n (define (flow-store-find id store)\n (if (null? store)\n (list)\n (if (= (car (car store)) id)\n (car (cdr (car store)))\n (flow-store-find id (cdr store)))))\n (define (flow-store-get id) (flow-store-find id flow-store))\n\n (define (flow-mk-rec name proc input log status payload)\n (list name proc input log status payload))\n (define (flow-rec-name r) (car r))\n (define (flow-rec-proc r) (car (cdr r)))\n (define (flow-rec-input r) (car (cdr (cdr r))))\n (define (flow-rec-log r) (car (cdr (cdr (cdr r)))))\n (define (flow-rec-status r) (car (cdr (cdr (cdr (cdr r))))))\n (define (flow-rec-payload r) (car (cdr (cdr (cdr (cdr (cdr r)))))))\n (define (flow-rec-resolve rec)\n (let ((byname (flow-lookup (flow-rec-name rec))))\n (if byname byname (flow-rec-proc rec))))\n\n (define (flow-outcome id name proc input log outcome)\n (if (eq? (car outcome) (quote flow-done))\n (begin\n (flow-store-put! id (flow-mk-rec name proc input log (quote done) (car (cdr outcome))))\n (car (cdr outcome)))\n (begin\n (flow-store-put! id (flow-mk-rec name proc input log (quote suspended) (car (cdr outcome))))\n (list (quote flow-suspended) id (car (cdr outcome))))))\n (define (flow/start flow input)\n (set! flow-next-id (+ flow-next-id 1))\n (flow-outcome flow-next-id (flow-name-of flow flow-registry) flow input (list)\n (flow-drive flow input (list))))\n (define (flow/resume id value)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (if (eq? (flow-rec-status rec) (quote suspended))\n (let ((proc (flow-rec-resolve rec)))\n (let ((newlog (cons (list (flow-rec-payload rec) value) (flow-rec-log rec))))\n (flow-outcome id (flow-rec-name rec) proc (flow-rec-input rec) newlog\n (flow-drive proc (flow-rec-input rec) newlog))))\n (list (quote flow-error) (quote not-suspended))))))\n (define (flow/cancel id)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (begin\n (flow-store-put! id\n (flow-mk-rec (flow-rec-name rec) (flow-rec-proc rec) (flow-rec-input rec)\n (flow-rec-log rec) (quote cancelled) #f))\n (list (quote flow-cancelled) id)))))\n\n (define (flow-export-entry entry)\n (let ((rec (car (cdr entry))))\n (list (car entry)\n (flow-mk-rec (flow-rec-name rec) #f (flow-rec-input rec)\n (flow-rec-log rec) (flow-rec-status rec) (flow-rec-payload rec)))))\n (define (flow-export-map store)\n (if (null? store) (list) (cons (flow-export-entry (car store)) (flow-export-map (cdr store)))))\n (define (flow-store-export) (flow-export-map flow-store))\n (define (flow-max-id store m)\n (if (null? store) m (flow-max-id (cdr store) (if (> (car (car store)) m) (car (car store)) m))))\n (define (flow-store-import! data)\n (begin (set! flow-store data) (set! flow-next-id (flow-max-id data 0))))\n (define (flow-collect-resumable store)\n (if (null? store)\n (list)\n (if (eq? (flow-rec-status (car (cdr (car store)))) (quote suspended))\n (cons (car (car store)) (flow-collect-resumable (cdr store)))\n (flow-collect-resumable (cdr store)))))\n (define (flow-resumable-ids) (flow-collect-resumable flow-store))\n\n (define (flow/status id)\n (let ((rec (flow-store-get id)))\n (if (null? rec) (quote unknown) (flow-rec-status rec))))\n (define (flow/result id)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (if (eq? (flow-rec-status rec) (quote done))\n (flow-rec-payload rec)\n (list (quote flow-error) (quote not-done))))))\n (define (flow-list-step store)\n (if (null? store)\n (list)\n (cons (list (car (car store)) (flow-rec-status (car (cdr (car store)))))\n (flow-list-step (cdr store)))))\n (define (flow/list) (flow-list-step flow-store))\n (define (flow-pending-step store)\n (if (null? store)\n (list)\n (if (eq? (flow-rec-status (car (cdr (car store)))) (quote suspended))\n (cons (list (car (car store)) (flow-rec-payload (car (cdr (car store)))))\n (flow-pending-step (cdr store)))\n (flow-pending-step (cdr store)))))\n (define (flow/pending) (flow-pending-step flow-store))") + "(define flow-registry (list))\n (define (flow-register! name proc) (set! flow-registry (cons (list name proc) flow-registry)))\n (define (flow-lookup-in name reg)\n (if (null? reg)\n #f\n (if (eq? (car (car reg)) name) (car (cdr (car reg))) (flow-lookup-in name (cdr reg)))))\n (define (flow-lookup name) (flow-lookup-in name flow-registry))\n (define (flow-name-of proc reg)\n (if (null? reg)\n #f\n (if (eq? (car (cdr (car reg))) proc) (car (car reg)) (flow-name-of proc (cdr reg)))))\n\n (define flow-store (list))\n (define flow-next-id 0)\n (define (flow-store-remove id store)\n (if (null? store)\n (list)\n (if (= (car (car store)) id)\n (flow-store-remove id (cdr store))\n (cons (car store) (flow-store-remove id (cdr store))))))\n (define (flow-store-put! id rec) (set! flow-store (cons (list id rec) (flow-store-remove id flow-store))))\n (define (flow-store-find id store)\n (if (null? store)\n (list)\n (if (= (car (car store)) id)\n (car (cdr (car store)))\n (flow-store-find id (cdr store)))))\n (define (flow-store-get id) (flow-store-find id flow-store))\n\n (define (flow-mk-rec name proc input log status payload)\n (list name proc input log status payload))\n (define (flow-rec-name r) (car r))\n (define (flow-rec-proc r) (car (cdr r)))\n (define (flow-rec-input r) (car (cdr (cdr r))))\n (define (flow-rec-log r) (car (cdr (cdr (cdr r)))))\n (define (flow-rec-status r) (car (cdr (cdr (cdr (cdr r))))))\n (define (flow-rec-payload r) (car (cdr (cdr (cdr (cdr (cdr r)))))))\n (define (flow-rec-resolve rec)\n (let ((byname (flow-lookup (flow-rec-name rec))))\n (if byname byname (flow-rec-proc rec))))\n\n (define (flow-outcome id name proc input log outcome)\n (if (eq? (car outcome) (quote flow-done))\n (begin\n (flow-store-put! id (flow-mk-rec name proc input log (quote done) (car (cdr outcome))))\n (car (cdr outcome)))\n (begin\n (flow-store-put! id (flow-mk-rec name proc input log (quote suspended) (car (cdr outcome))))\n (list (quote flow-suspended) id (car (cdr outcome))))))\n (define (flow/start flow input)\n (set! flow-next-id (+ flow-next-id 1))\n (flow-outcome flow-next-id (flow-name-of flow flow-registry) flow input (list)\n (flow-drive flow input (list))))\n (define (flow/resume id value)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (if (eq? (flow-rec-status rec) (quote suspended))\n (let ((proc (flow-rec-resolve rec)))\n (let ((newlog (cons (list (flow-rec-payload rec) value) (flow-rec-log rec))))\n (flow-outcome id (flow-rec-name rec) proc (flow-rec-input rec) newlog\n (flow-drive proc (flow-rec-input rec) newlog))))\n (list (quote flow-error) (quote not-suspended))))))\n (define (flow/cancel id)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (begin\n (flow-store-put! id\n (flow-mk-rec (flow-rec-name rec) (flow-rec-proc rec) (flow-rec-input rec)\n (flow-rec-log rec) (quote cancelled) #f))\n (list (quote flow-cancelled) id)))))\n\n (define (flow-export-entry entry)\n (let ((rec (car (cdr entry))))\n (list (car entry)\n (flow-mk-rec (flow-rec-name rec) #f (flow-rec-input rec)\n (flow-rec-log rec) (flow-rec-status rec) (flow-rec-payload rec)))))\n (define (flow-export-map store)\n (if (null? store) (list) (cons (flow-export-entry (car store)) (flow-export-map (cdr store)))))\n (define (flow-store-export) (flow-export-map flow-store))\n (define (flow-max-id store m)\n (if (null? store) m (flow-max-id (cdr store) (if (> (car (car store)) m) (car (car store)) m))))\n (define (flow-store-import! data)\n (begin (set! flow-store data) (set! flow-next-id (flow-max-id data 0))))\n (define (flow-collect-resumable store)\n (if (null? store)\n (list)\n (if (eq? (flow-rec-status (car (cdr (car store)))) (quote suspended))\n (cons (car (car store)) (flow-collect-resumable (cdr store)))\n (flow-collect-resumable (cdr store)))))\n (define (flow-resumable-ids) (flow-collect-resumable flow-store))\n\n (define (flow/status id)\n (let ((rec (flow-store-get id)))\n (if (null? rec) (quote unknown) (flow-rec-status rec))))\n (define (flow/result id)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n (list (quote flow-error) (quote no-such-flow))\n (if (eq? (flow-rec-status rec) (quote done))\n (flow-rec-payload rec)\n (list (quote flow-error) (quote not-done))))))\n (define (flow-list-step store)\n (if (null? store)\n (list)\n (cons (list (car (car store)) (flow-rec-status (car (cdr (car store)))))\n (flow-list-step (cdr store)))))\n (define (flow/list) (flow-list-step flow-store))\n (define (flow-pending-step store)\n (if (null? store)\n (list)\n (if (eq? (flow-rec-status (car (cdr (car store)))) (quote suspended))\n (cons (list (car (car store)) (flow-rec-payload (car (cdr (car store)))))\n (flow-pending-step (cdr store)))\n (flow-pending-step (cdr store)))))\n (define (flow/pending) (flow-pending-step flow-store))\n\n (define (flow-store-count store) (if (null? store) 0 (+ 1 (flow-store-count (cdr store)))))\n (define (flow-gc-keep store)\n (if (null? store)\n (list)\n (if (eq? (flow-rec-status (car (cdr (car store)))) (quote suspended))\n (cons (car store) (flow-gc-keep (cdr store)))\n (flow-gc-keep (cdr store)))))\n (define (flow/gc)\n (let ((before (flow-store-count flow-store)))\n (set! flow-store (flow-gc-keep flow-store))\n (- before (flow-store-count flow-store))))\n (define (flow/forget id)\n (let ((rec (flow-store-get id)))\n (if (null? rec)\n #f\n (if (eq? (flow-rec-status rec) (quote suspended))\n #f\n (begin (set! flow-store (flow-store-remove id flow-store)) #t)))))") (define flow-load-store! diff --git a/lib/flow/tests/hygiene.sx b/lib/flow/tests/hygiene.sx new file mode 100644 index 00000000..a53122f8 --- /dev/null +++ b/lib/flow/tests/hygiene.sx @@ -0,0 +1,67 @@ +;; lib/flow/tests/hygiene.sx — Phase 5: store hygiene (flow/gc, flow/forget). + +(define flow-hyg-pass 0) +(define flow-hyg-fail 0) +(define flow-hyg-fails (list)) + +(define + flow-hyg-test + (fn + (name actual expected) + (if + (= actual expected) + (set! flow-hyg-pass (+ flow-hyg-pass 1)) + (begin + (set! flow-hyg-fail (+ flow-hyg-fail 1)) + (append! flow-hyg-fails {:name name :expected expected :actual actual}))))) + +(define flow-h (fn (src) (flow-run src))) + +;; ── flow/gc ───────────────────────────────────────────────────── +(flow-hyg-test + "gc: empty store removes nothing" + (flow-h "(flow/gc)") + 0) +(flow-hyg-test + "gc: removes a done flow, keeps a suspended one" + (flow-h + "(defflow w (lambda (x) (suspend (quote q)))) (flow/start w 0) (flow/start (lambda (x) x) 5) (define removed (flow/gc)) (list removed (flow/list))") + (list 1 (list (list 1 "suspended")))) +(flow-hyg-test + "gc: removes a cancelled flow" + (flow-h + "(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/cancel id) (flow/gc)") + 1) +(flow-hyg-test + "gc: a kept suspended flow is still resumable" + (flow-h + "(defflow w (sequence (lambda (x) (suspend (quote q))) (lambda (v) (* v 2)))) (define id (car (cdr (flow/start w 0)))) (flow/start (lambda (x) x) 1) (flow/gc) (flow/resume id 21)") + 42) +(flow-hyg-test + "gc: counts every terminal flow it drops" + (flow-h + "(flow/start (lambda (x) x) 1) (flow/start (lambda (x) x) 2) (defflow w (lambda (x) (suspend (quote q)))) (flow/start w 0) (flow/gc)") + 2) + +;; ── flow/forget ───────────────────────────────────────────────── +(flow-hyg-test + "forget: drops a completed flow" + (flow-h + "(defflow w (sequence (lambda (x) (suspend (quote q))) (lambda (v) v))) (define id (car (cdr (flow/start w 0)))) (flow/resume id 7) (list (flow/forget id) (flow/status id))") + (list true "unknown")) +(flow-hyg-test + "forget: refuses to drop a live (suspended) flow" + (flow-h + "(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (list (flow/forget id) (flow/status id))") + (list false "suspended")) +(flow-hyg-test + "forget: drops a cancelled flow" + (flow-h + "(defflow w (lambda (x) (suspend (quote q)))) (define id (car (cdr (flow/start w 0)))) (flow/cancel id) (list (flow/forget id) (flow/status id))") + (list true "unknown")) +(flow-hyg-test + "forget: unknown id yields false" + (flow-h "(flow/forget 999)") + false) + +(define flow-hyg-tests-run! (fn () {:total (+ flow-hyg-pass flow-hyg-fail) :passed flow-hyg-pass :failed flow-hyg-fail :fails flow-hyg-fails})) diff --git a/plans/flow-on-sx.md b/plans/flow-on-sx.md index 95a8a57b..108e74f9 100644 --- a/plans/flow-on-sx.md +++ b/plans/flow-on-sx.md @@ -16,7 +16,7 @@ federation extension via fed-sx for remote-node execution. ## Status (rolling) -`bash lib/flow/conformance.sh` → **142/142** (Phases 1-7 complete) +`bash lib/flow/conformance.sh` → **151/151** (Phases 1-7 complete; +store hygiene) ## Ground rules @@ -141,6 +141,9 @@ something operators and authors actually use. Accumulation, not a rewrite. - [x] introspection API — `flow/status id`, `flow/result id`, `flow/list`, `flow/pending` (operator view of what each suspended flow awaits). 12 tests in `tests/api.sx`. +- [x] store hygiene — `flow/gc` drops terminal (done/cancelled) records keeping + live suspended flows (returns count); `flow/forget id` drops one terminal record + and refuses live flows. Bounds unbounded store growth. 9 tests in `tests/hygiene.sx`. - [x] `tap` — side-effecting pass-through node (logging/metrics) that returns input - [x] `recover` — complement to try-catch for the fail-VALUE channel: run node; if it yields `(fail ...)`, run a recovery node on the reason