host: correct the seam's async-completion contract + prove it (2nd review)
Second review of the (core) seam caught a subtle one — and that my first 'fix' was itself wrong.
The async completion of a SUSPENDED durable flow happens AFTER the synchronous process call has
returned, so an :emit captured in the run env would be stale. The correct seam is construction-
wiring: a durable runner is wired to the transport's INBOUND channel at construction and injects
its completion activity there, out-of-band; a later behavior/pump drains it → effects flow. So the
engine code was already right (pump is the async re-entry seam); only the contract comment was
wrong — corrected. New test proves the loop: process(wait) suspends (no effect), then pump drains
the out-of-band completion → the flow's digest effect flows. Also clarified: dedup is per-
invocation (global idempotency = emitter fire-once + durable inbox); retry is flow-level; the
engine-facing runner result is {:status :effects :resume :error} (:results is runner-internal).
behavior 10/10 (+ async-completion). No engine change — comment + test only.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -110,6 +110,30 @@
|
||||
(list (len (get tr :emitted)) (len (get tr :ran)) (len (get tr :effects))))
|
||||
(list 2 2 2))
|
||||
|
||||
;; ── async completion: a durable runner SUSPENDS, then (wired to an inbox at construction) injects
|
||||
;; a completion activity OUT-OF-BAND; a later pump drains it → the flow's effect flows. Proves the
|
||||
;; suspend→resume→complete loop closes via the transport's INBOUND channel, not the sync run env. ──
|
||||
(define ba-inbox (list)) ;; the durable runner's out-of-band inbox
|
||||
(define ba-transport {:emit (fn (a) nil) ;; outbound log — SEPARATE from the inbox
|
||||
:deliver (fn () (let ((batch ba-inbox)) (begin (set! ba-inbox (list)) batch)))})
|
||||
(define ba-runner
|
||||
{:run (fn (dag env)
|
||||
(if (= dag "wait-dag")
|
||||
(begin (set! ba-inbox (concat ba-inbox (list {:verb "resumed" :actor "a"}))) ;; timer fires (simulated)
|
||||
{:status "suspended" :resume "morning"})
|
||||
{:status "done" :effects (list {:kind "digest"})}))})
|
||||
(define ba-triggers {:register! (fn (s d h) nil)
|
||||
:match (fn (a) (cond ((= (get a :verb) "wait") (list {:dag "wait-dag"}))
|
||||
((= (get a :verb) "resumed") (list {:dag "resume-dag"}))
|
||||
(else (list))))})
|
||||
(define ba-engine (behavior/make-engine {:triggers ba-triggers :runner ba-runner :transport ba-transport :driver be-driver}))
|
||||
(host-be-test "async completion — suspend, then a later pump drains the out-of-band completion → effect"
|
||||
(let ((t1 (behavior/process ba-engine {:verb "wait" :actor "a"})))
|
||||
(let ((t2 (behavior/pump ba-engine)))
|
||||
(list (len (get t1 :suspended)) (len (get t1 :effects))
|
||||
(len (get t2 :emitted)) (get (first (get t2 :effects)) :kind))))
|
||||
(list 1 0 1 "digest"))
|
||||
|
||||
(define host-be-tests-run!
|
||||
(fn ()
|
||||
{:total (+ host-be-pass host-be-fail)
|
||||
|
||||
Reference in New Issue
Block a user