Files
rose-ash/lib/host/tests/behavior.sx
giles 6ed523623b host: correct the seam's async-completion contract + prove it (2nd review)
Second review of the (core) seam caught a subtle one — and that my first 'fix' was itself wrong.
The async completion of a SUSPENDED durable flow happens AFTER the synchronous process call has
returned, so an :emit captured in the run env would be stale. The correct seam is construction-
wiring: a durable runner is wired to the transport's INBOUND channel at construction and injects
its completion activity there, out-of-band; a later behavior/pump drains it → effects flow. So the
engine code was already right (pump is the async re-entry seam); only the contract comment was
wrong — corrected. New test proves the loop: process(wait) suspends (no effect), then pump drains
the out-of-band completion → the flow's digest effect flows. Also clarified: dedup is per-
invocation (global idempotency = emitter fire-once + durable inbox); retry is flow-level; the
engine-facing runner result is {:status :effects :resume :error} (:results is runner-internal).

behavior 10/10 (+ async-completion). No engine change — comment + test only.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-07-02 13:55:32 +00:00

141 lines
8.9 KiB
Plaintext

;; lib/host/tests/behavior.sx — the adapter seam (lib/host/behavior.sx). MOCK adapters prove the
;; pipeline contract (emit → match → run → dispatch → loop-closure) independent of any substrate.
(define host-be-pass 0)
(define host-be-fail 0)
(define host-be-fails (list))
(define host-be-test
(fn (name actual expected)
(if (= actual expected)
(set! host-be-pass (+ host-be-pass 1))
(begin
(set! host-be-fail (+ host-be-fail 1))
(append! host-be-fails {:name name :actual actual :expected expected})))))
;; ── mock adapters: a publish activity fires a runner that emits one notify effect ──
(define be-transport {:emit (fn (a) nil) :deliver (fn () (list))})
(define be-triggers
{:register! (fn (spec dag hint) nil)
:match (fn (a) (if (= (get a :verb) "publish") (list {:dag "publish-dag"}) (list)))})
(define be-runner
{:run (fn (dag env)
(if (= dag "publish-dag")
{:status "done" :effects (list {:kind "notify" :to (get (get env :activity) :actor)})}
{:status "done" :effects (list)}))})
(define be-driver {:dispatch (fn (eff) (list))}) ;; no follow-up activities
(define be-engine (behavior/make-engine {:triggers be-triggers :runner be-runner :transport be-transport :driver be-driver}))
(host-be-test "publish activity → trigger → runner → effect flows through the seam"
(let ((tr (behavior/process be-engine {:verb "publish" :actor "alice" :object "cid1"})))
(list (len (get tr :emitted)) (len (get tr :ran)) (len (get tr :effects))
(get (first (get tr :effects)) :kind) (get (first (get tr :effects)) :to)))
(list 1 1 1 "notify" "alice"))
(host-be-test "a non-matching activity fires nothing (log complete, execution precise)"
(let ((tr (behavior/process be-engine {:verb "draft" :actor "bob" :object "cid2"})))
(list (len (get tr :emitted)) (len (get tr :ran)) (len (get tr :effects))))
(list 1 0 0))
;; ── loop closure: an effect's driver emits a NEW activity that re-triggers a second DAG ──
(define bl-triggers
{:register! (fn (spec dag hint) nil)
:match (fn (a) (cond ((= (get a :verb) "publish") (list {:dag "pub"}))
((= (get a :verb) "followup") (list {:dag "fu"}))
(else (list))))})
(define bl-runner
{:run (fn (dag env) (cond ((= dag "pub") {:effects (list {:kind "chain"})})
((= dag "fu") {:effects (list {:kind "done"})})
(else {:effects (list)})))})
(define bl-driver
{:dispatch (fn (eff) (if (= (get eff :kind) "chain")
(list {:verb "followup" :actor "a" :object "c2"}) (list)))})
(define bl-engine (behavior/make-engine {:triggers bl-triggers :runner bl-runner :transport be-transport :driver bl-driver}))
(host-be-test "loop closure — an effect emits a new activity that re-triggers, bounded"
(let ((tr (behavior/process bl-engine {:verb "publish" :actor "a" :object "c1"})))
(list (len (get tr :emitted)) (len (get tr :ran)) (map (fn (e) (get e :kind)) (get tr :effects))))
(list 2 2 (list "chain" "done")))
;; ── an unbounded loop is depth-guarded (terminates instead of running forever) ──
(define bi-triggers {:register! (fn (s d h) nil) :match (fn (a) (if (= (get a :verb) "loop") (list {:dag "l"}) (list)))})
(define bi-runner {:run (fn (dag env) {:effects (list {:kind "again"})})})
(define bi-driver {:dispatch (fn (eff) (list {:verb "loop" :actor "a" :object "c"}))})
(define bi-engine (behavior/make-engine {:triggers bi-triggers :runner bi-runner :transport be-transport :driver bi-driver}))
(host-be-test "an unbounded loop is depth-guarded (terminates)"
(let ((tr (behavior/process bi-engine {:verb "loop" :actor "a" :object "c"})))
(and (> (len (get tr :emitted)) 1) (<= (len (get tr :emitted)) 10)))
true)
;; ── status branch: a SUSPENDED runner is recorded (no effects yet); a FAILED one too ──
(define bs-triggers {:register! (fn (s d h) nil) :match (fn (a) (if (= (get a :verb) "wait") (list {:dag "w"}) (list)))})
(define bs-sus-runner {:run (fn (dag env) {:status "suspended" :resume "morning"})})
(define bs-sus-engine (behavior/make-engine {:triggers bs-triggers :runner bs-sus-runner :transport be-transport :driver be-driver}))
(host-be-test "a suspended runner is recorded (+ :resume), no effects dispatched (durable path)"
(let ((tr (behavior/process bs-sus-engine {:verb "wait" :actor "a"})))
(list (len (get tr :suspended)) (len (get tr :effects)) (get (first (get tr :suspended)) :resume)))
(list 1 0 "morning"))
(define bs-fail-runner {:run (fn (dag env) {:status "failed" :error "boom"})})
(define bs-fail-engine (behavior/make-engine {:triggers bs-triggers :runner bs-fail-runner :transport be-transport :driver be-driver}))
(host-be-test "a failed runner is recorded (+ :error) for retry/dead-letter"
(let ((tr (behavior/process bs-fail-engine {:verb "wait" :actor "a"})))
(list (len (get tr :failed)) (len (get tr :effects)) (get (first (get tr :failed)) :error)))
(list 1 0 "boom"))
;; ── dedup: the same activity (by content :id) is processed ONCE even if re-emitted ──
(define bd-triggers {:register! (fn (s d h) nil) :match (fn (a) (if (= (get a :verb) "x") (list {:dag "d"}) (list)))})
(define bd-runner {:run (fn (dag env) {:status "done" :effects (list {:kind "re"})})})
(define bd-driver {:dispatch (fn (eff) (list {:verb "x" :id "same-id" :actor "a"}))}) ;; re-emits the SAME id
(define bd-engine (behavior/make-engine {:triggers bd-triggers :runner bd-runner :transport be-transport :driver bd-driver}))
(host-be-test "dedup — the same activity (by :id) processes once (a cycle caught by identity)"
(let ((tr (behavior/process bd-engine {:verb "x" :id "same-id" :actor "a"})))
(list (len (get tr :emitted)) (len (get tr :ran))))
(list 1 1))
;; ── env: the runner receives the injected :effects (external reads) + per-activity :ctx ──
(define benv-runner
{:run (fn (dag env) {:status "done"
:effects (list {:kind "saw" :ctx (get (get env :ctx) :state)
:ff (not (nil? (get (get env :effects) :fetch-followers)))})})})
(define benv-engine (behavior/make-engine
{:triggers bs-triggers :runner benv-runner :transport be-transport :driver be-driver
:effects {:fetch-followers (fn (x) (list 1 2 3))} :ctx-of (fn (a) {:state "loaded"})}))
(host-be-test "the runner env carries injected :effects + per-activity :ctx"
(let ((eff (first (get (behavior/process benv-engine {:verb "wait" :actor "a"}) :effects))))
(list (get eff :ctx) (get eff :ff)))
(list "loaded" true))
;; ── pump: drain transport.deliver → process each inbound activity (peers / async completions) ──
(define bp-transport {:emit (fn (a) nil) :deliver (fn () (list {:verb "publish" :actor "a"} {:verb "publish" :actor "b"}))})
(define bp-engine (behavior/make-engine {:triggers be-triggers :runner be-runner :transport bp-transport :driver be-driver}))
(host-be-test "pump drains transport.deliver → processes each inbound activity"
(let ((tr (behavior/pump bp-engine)))
(list (len (get tr :emitted)) (len (get tr :ran)) (len (get tr :effects))))
(list 2 2 2))
;; ── async completion: a durable runner SUSPENDS, then (wired to an inbox at construction) injects
;; a completion activity OUT-OF-BAND; a later pump drains it → the flow's effect flows. Proves the
;; suspend→resume→complete loop closes via the transport's INBOUND channel, not the sync run env. ──
(define ba-inbox (list)) ;; the durable runner's out-of-band inbox
(define ba-transport {:emit (fn (a) nil) ;; outbound log — SEPARATE from the inbox
:deliver (fn () (let ((batch ba-inbox)) (begin (set! ba-inbox (list)) batch)))})
(define ba-runner
{:run (fn (dag env)
(if (= dag "wait-dag")
(begin (set! ba-inbox (concat ba-inbox (list {:verb "resumed" :actor "a"}))) ;; timer fires (simulated)
{:status "suspended" :resume "morning"})
{:status "done" :effects (list {:kind "digest"})}))})
(define ba-triggers {:register! (fn (s d h) nil)
:match (fn (a) (cond ((= (get a :verb) "wait") (list {:dag "wait-dag"}))
((= (get a :verb) "resumed") (list {:dag "resume-dag"}))
(else (list))))})
(define ba-engine (behavior/make-engine {:triggers ba-triggers :runner ba-runner :transport ba-transport :driver be-driver}))
(host-be-test "async completion — suspend, then a later pump drains the out-of-band completion → effect"
(let ((t1 (behavior/process ba-engine {:verb "wait" :actor "a"})))
(let ((t2 (behavior/pump ba-engine)))
(list (len (get t1 :suspended)) (len (get t1 :effects))
(len (get t2 :emitted)) (get (first (get t2 :effects)) :kind))))
(list 1 0 1 "digest"))
(define host-be-tests-run!
(fn ()
{:total (+ host-be-pass host-be-fail)
:passed host-be-pass :failed host-be-fail :fails host-be-fails}))