From 6ed523623bccbf4d3b0628e0fc42be3bf4e9d1ea Mon Sep 17 00:00:00 2001 From: giles Date: Thu, 2 Jul 2026 13:55:32 +0000 Subject: [PATCH] host: correct the seam's async-completion contract + prove it (2nd review) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Second review of the (core) seam caught a subtle one — and that my first 'fix' was itself wrong. The async completion of a SUSPENDED durable flow happens AFTER the synchronous process call has returned, so an :emit captured in the run env would be stale. The correct seam is construction- wiring: a durable runner is wired to the transport's INBOUND channel at construction and injects its completion activity there, out-of-band; a later behavior/pump drains it → effects flow. So the engine code was already right (pump is the async re-entry seam); only the contract comment was wrong — corrected. New test proves the loop: process(wait) suspends (no effect), then pump drains the out-of-band completion → the flow's digest effect flows. Also clarified: dedup is per- invocation (global idempotency = emitter fire-once + durable inbox); retry is flow-level; the engine-facing runner result is {:status :effects :resume :error} (:results is runner-internal). behavior 10/10 (+ async-completion). No engine change — comment + test only. Co-Authored-By: Claude Opus 4.8 --- lib/host/behavior.sx | 25 +++++++++++++++++-------- lib/host/tests/behavior.sx | 24 ++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 8 deletions(-) diff --git a/lib/host/behavior.sx b/lib/host/behavior.sx index 1d64237c..fc44278a 100644 --- a/lib/host/behavior.sx +++ b/lib/host/behavior.sx @@ -11,15 +11,24 @@ ;; The engine bundles the four (+ optional :effects injected external-read interfaces and a ;; :ctx-of (fn activity -> ctx)). behavior/process folds an activity through the pipeline — ;; emit → match triggers → run each DAG → branch on :status → (done) dispatch each effect-as-data -;; → recurse on new activities. Substrate-agnostic: +;; → recurse on new activities. The engine-facing runner result is {:status :effects :resume +;; :error} (anything else — e.g. artdag's memoized node :results — is runner-internal). +;; Substrate-agnostic: ;; - :status "done" → dispatch effects (synchronous op-table runner). -;; - :status "suspended" → record it (+ :resume token); NO effects yet — a durable (Erlang) -;; runner holds the flow, and its later completion re-enters as a NEW activity via behavior/pump. -;; - :status "failed" → record it (+ :error) for retry/dead-letter (celery-sx). -;; DEDUP: activities are content-addressed; each :id processes once (a cycle is caught by identity, -;; not just the depth guard). INBOUND: behavior/pump drains transport.deliver (peer activities + -;; async runner completions). The env carries the injected :effects + per-activity :ctx so real -;; DAGs (e.g. an injected fetch_followers, deterministic for replay) can run. +;; - :status "suspended" → record it (+ :resume token); NO effects yet. A durable (Erlang) runner +;; HOLDS the flow OFF-ENGINE; a timer/event resumes it OUT-OF-BAND — AFTER this process call has +;; returned. On completion the runner injects a completion ACTIVITY into the transport's INBOUND +;; channel (the runner is wired to it at CONSTRUCTION — the sync run env is long gone by then), +;; and a later behavior/pump drains it → effects flow. The effect-as-data model is intact: the +;; completion is just another inbound activity. Without that wiring a suspended flow is a dead end. +;; - :status "failed" → record it (+ :error) for retry/dead-letter (celery-sx). Retry is +;; FLOW-level (re-run the runner for that binding), NOT activity re-entry (dedup would block it). +;; DEDUP: a cycle within ONE process/pump is caught by activity :id (identity, not just the depth +;; guard). This is PER-INVOCATION, not global — global idempotency is the EMITTER's job (fire-once +;; on the transition) + a DURABLE inbox for federation (cross-pump re-delivery isn't caught here). +;; The env carries injected :effects (external reads, deterministic for replay) + per-activity :ctx. +;; The ASYNC RE-ENTRY seam is the transport's inbound channel + pump — NOT the run env (which is +;; synchronous). Runners that complete out-of-band are wired to that channel at construction. (define behavior/make-engine (fn (adapters) adapters)) ;; {:triggers :runner :transport :driver :effects? :ctx-of?} (define behavior/-triggers (fn (e) (get e :triggers))) diff --git a/lib/host/tests/behavior.sx b/lib/host/tests/behavior.sx index afe1f032..070bc766 100644 --- a/lib/host/tests/behavior.sx +++ b/lib/host/tests/behavior.sx @@ -110,6 +110,30 @@ (list (len (get tr :emitted)) (len (get tr :ran)) (len (get tr :effects)))) (list 2 2 2)) +;; ── async completion: a durable runner SUSPENDS, then (wired to an inbox at construction) injects +;; a completion activity OUT-OF-BAND; a later pump drains it → the flow's effect flows. Proves the +;; suspend→resume→complete loop closes via the transport's INBOUND channel, not the sync run env. ── +(define ba-inbox (list)) ;; the durable runner's out-of-band inbox +(define ba-transport {:emit (fn (a) nil) ;; outbound log — SEPARATE from the inbox + :deliver (fn () (let ((batch ba-inbox)) (begin (set! ba-inbox (list)) batch)))}) +(define ba-runner + {:run (fn (dag env) + (if (= dag "wait-dag") + (begin (set! ba-inbox (concat ba-inbox (list {:verb "resumed" :actor "a"}))) ;; timer fires (simulated) + {:status "suspended" :resume "morning"}) + {:status "done" :effects (list {:kind "digest"})}))}) +(define ba-triggers {:register! (fn (s d h) nil) + :match (fn (a) (cond ((= (get a :verb) "wait") (list {:dag "wait-dag"})) + ((= (get a :verb) "resumed") (list {:dag "resume-dag"})) + (else (list))))}) +(define ba-engine (behavior/make-engine {:triggers ba-triggers :runner ba-runner :transport ba-transport :driver be-driver})) +(host-be-test "async completion — suspend, then a later pump drains the out-of-band completion → effect" + (let ((t1 (behavior/process ba-engine {:verb "wait" :actor "a"}))) + (let ((t2 (behavior/pump ba-engine))) + (list (len (get t1 :suspended)) (len (get t1 :effects)) + (len (get t2 :emitted)) (get (first (get t2 :effects)) :kind)))) + (list 1 0 1 "digest")) + (define host-be-tests-run! (fn () {:total (+ host-be-pass host-be-fail)