From e4fc66bfeb8c2f7d89ec7f4e2e8f3bd7e2a05ff6 Mon Sep 17 00:00:00 2001 From: giles Date: Thu, 2 Jul 2026 13:50:41 +0000 Subject: [PATCH] host: enrich the adapter seam to be substrate-agnostic (review fixes) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After review, the seam was only synchronous-complete; the durable/celery-sx runners couldn't plug in cleanly. Additive fixes (pipeline unchanged): (1) :status branch in run-binding — 'done' dispatches effects, 'suspended' records the flow + :resume (a durable runner holds it; completion re-enters as a new activity via pump), 'failed' records + :error for retry/dead-letter. (2) richer runner env — :ctx (per-activity, via engine :ctx-of) + injected :effects (external-read interfaces, e.g. a deterministic fetch_followers). (3) dedup by content :id — a cycle is caught by identity, not just the depth guard. (4) behavior/pump — drain transport.deliver for inbound (peer activities + async runner completions), sharing one trace so dedup spans the batch. behavior 9/9 (+ suspended/failed/dedup/env/pump); full host conformance 580/580. Co-Authored-By: Claude Opus 4.8 --- lib/host/behavior.sx | 91 +++++++++++++++++++++---------- lib/host/tests/behavior.sx | 46 ++++++++++++++++ plans/business-logic-fed-flows.md | 2 +- 3 files changed, 110 insertions(+), 29 deletions(-) diff --git a/lib/host/behavior.sx b/lib/host/behavior.sx index 301ecd9a..1d64237c 100644 --- a/lib/host/behavior.sx +++ b/lib/host/behavior.sx @@ -3,35 +3,57 @@ ;; behavior DAG; everything between is a swappable adapter, each a dict-of-functions: ;; ;; trigger-registry {:register! (fn spec dag hint) :match (fn activity -> [binding])} -;; runner {:run (fn dag env -> {:status :results :effects :resume})} +;; runner {:run (fn dag env -> {:status "done"|"suspended"|"failed" :results :effects +;; :resume :error})} ;; transport {:emit (fn activity) :deliver (fn -> [activity])} ;; driver {:dispatch (fn effect -> [activity])} ;; may emit NEW activities ;; -;; An engine bundles the four. behavior/process folds an activity through the pipeline — -;; emit → match triggers → run each DAG → dispatch each effect → recurse on new activities -;; (loop closure, depth-guarded). Returns a TRACE {:emitted :ran :effects} for observation. -;; Runner/transport/registry/driver are all injected, so the same DAG + engine run over the -;; synchronous op-table runner, the Erlang durable runner, celery-sx, fed-sx transport, etc. +;; The engine bundles the four (+ optional :effects injected external-read interfaces and a +;; :ctx-of (fn activity -> ctx)). behavior/process folds an activity through the pipeline — +;; emit → match triggers → run each DAG → branch on :status → (done) dispatch each effect-as-data +;; → recurse on new activities. Substrate-agnostic: +;; - :status "done" → dispatch effects (synchronous op-table runner). +;; - :status "suspended" → record it (+ :resume token); NO effects yet — a durable (Erlang) +;; runner holds the flow, and its later completion re-enters as a NEW activity via behavior/pump. +;; - :status "failed" → record it (+ :error) for retry/dead-letter (celery-sx). +;; DEDUP: activities are content-addressed; each :id processes once (a cycle is caught by identity, +;; not just the depth guard). INBOUND: behavior/pump drains transport.deliver (peer activities + +;; async runner completions). The env carries the injected :effects + per-activity :ctx so real +;; DAGs (e.g. an injected fetch_followers, deterministic for replay) can run. -(define behavior/make-engine (fn (adapters) adapters)) ;; {:triggers :runner :transport :driver} +(define behavior/make-engine (fn (adapters) adapters)) ;; {:triggers :runner :transport :driver :effects? :ctx-of?} (define behavior/-triggers (fn (e) (get e :triggers))) (define behavior/-runner (fn (e) (get e :runner))) (define behavior/-transport (fn (e) (get e :transport))) (define behavior/-driver (fn (e) (get e :driver))) -(define behavior/-max-depth 8) ;; loop-closure guard +(define behavior/-effects (fn (e) (or (get e :effects) {}))) +(define behavior/-ctx-of (fn (e a) (let ((f (get e :ctx-of))) (if (nil? f) nil (f a))))) +(define behavior/-max-depth 8) ;; loop-closure backstop (dedup is the real guard) +(define behavior/-empty-trace + {:emitted (list) :ran (list) :effects (list) :suspended (list) :failed (list) :seen (list)}) -;; run one trigger binding: execute its DAG with the activity env, then dispatch each effect. +;; run one trigger binding: execute its DAG with the FULL env, then branch on :status. (define behavior/-run-binding (fn (engine activity binding depth acc) - (let ((result ((get (behavior/-runner engine) :run) - (get binding :dag) - {:activity activity :actor (get activity :actor) :binding binding}))) - (reduce - (fn (a eff) - (behavior/-dispatch-effect engine eff depth - (assoc a :effects (concat (get a :effects) (list eff))))) - (assoc acc :ran (concat (get acc :ran) (list result))) - (or (get result :effects) (list)))))) + (let ((env {:activity activity :actor (get activity :actor) + :ctx (behavior/-ctx-of engine activity) :effects (behavior/-effects engine) + :binding binding})) + (let ((result ((get (behavior/-runner engine) :run) (get binding :dag) env))) + (let ((acc1 (assoc acc :ran (concat (get acc :ran) (list result)))) + (status (or (get result :status) "done"))) + (cond + ((= status "suspended") + (assoc acc1 :suspended (concat (get acc1 :suspended) + (list {:dag (get binding :dag) :resume (get result :resume) :activity activity})))) + ((= status "failed") + (assoc acc1 :failed (concat (get acc1 :failed) + (list {:dag (get binding :dag) :error (get result :error) :activity activity})))) + (else + (reduce + (fn (a eff) + (behavior/-dispatch-effect engine eff depth + (assoc a :effects (concat (get a :effects) (list eff))))) + acc1 (or (get result :effects) (list)))))))))) ;; dispatch one effect via the driver; recurse on any NEW activities it emits (the loop closes). (define behavior/-dispatch-effect @@ -41,18 +63,31 @@ acc (or ((get (behavior/-driver engine) :dispatch) eff) (list))))) -;; one step: emit the activity, match triggers, run each binding. Depth-guarded. +;; one step: DEDUP by activity :id → emit → match triggers → run each binding. Depth-guarded. (define behavior/-step (fn (engine activity depth acc) - (if (> depth behavior/-max-depth) acc - (begin - ((get (behavior/-transport engine) :emit) activity) - (reduce - (fn (a binding) (behavior/-run-binding engine activity binding depth a)) - (assoc acc :emitted (concat (get acc :emitted) (list activity))) - (or ((get (behavior/-triggers engine) :match) activity) (list))))))) + (let ((aid (get activity :id))) + (if (or (> depth behavior/-max-depth) + (and (not (nil? aid)) (contains? (get acc :seen) aid))) + acc + (let ((acc0 (if (nil? aid) acc (assoc acc :seen (concat (get acc :seen) (list aid)))))) + (begin + ((get (behavior/-transport engine) :emit) activity) + (reduce + (fn (a binding) (behavior/-run-binding engine activity binding depth a)) + (assoc acc0 :emitted (concat (get acc0 :emitted) (list activity))) + (or ((get (behavior/-triggers engine) :match) activity) (list))))))))) -;; process an activity through the whole seam. Returns the trace. +;; process one activity through the whole seam. Returns the trace. (define behavior/process (fn (engine activity) - (behavior/-step engine activity 0 {:emitted (list) :ran (list) :effects (list)}))) + (behavior/-step engine activity 0 behavior/-empty-trace))) + +;; drain INBOUND activities (peer deliveries + async runner completions) through the engine, +;; sharing one trace so dedup applies across the batch. +(define behavior/pump + (fn (engine) + (reduce + (fn (acc activity) (behavior/-step engine activity 0 acc)) + behavior/-empty-trace + (or ((get (behavior/-transport engine) :deliver)) (list))))) diff --git a/lib/host/tests/behavior.sx b/lib/host/tests/behavior.sx index ce5c3814..afe1f032 100644 --- a/lib/host/tests/behavior.sx +++ b/lib/host/tests/behavior.sx @@ -64,6 +64,52 @@ (and (> (len (get tr :emitted)) 1) (<= (len (get tr :emitted)) 10))) true) +;; ── status branch: a SUSPENDED runner is recorded (no effects yet); a FAILED one too ── +(define bs-triggers {:register! (fn (s d h) nil) :match (fn (a) (if (= (get a :verb) "wait") (list {:dag "w"}) (list)))}) +(define bs-sus-runner {:run (fn (dag env) {:status "suspended" :resume "morning"})}) +(define bs-sus-engine (behavior/make-engine {:triggers bs-triggers :runner bs-sus-runner :transport be-transport :driver be-driver})) +(host-be-test "a suspended runner is recorded (+ :resume), no effects dispatched (durable path)" + (let ((tr (behavior/process bs-sus-engine {:verb "wait" :actor "a"}))) + (list (len (get tr :suspended)) (len (get tr :effects)) (get (first (get tr :suspended)) :resume))) + (list 1 0 "morning")) +(define bs-fail-runner {:run (fn (dag env) {:status "failed" :error "boom"})}) +(define bs-fail-engine (behavior/make-engine {:triggers bs-triggers :runner bs-fail-runner :transport be-transport :driver be-driver})) +(host-be-test "a failed runner is recorded (+ :error) for retry/dead-letter" + (let ((tr (behavior/process bs-fail-engine {:verb "wait" :actor "a"}))) + (list (len (get tr :failed)) (len (get tr :effects)) (get (first (get tr :failed)) :error))) + (list 1 0 "boom")) + +;; ── dedup: the same activity (by content :id) is processed ONCE even if re-emitted ── +(define bd-triggers {:register! (fn (s d h) nil) :match (fn (a) (if (= (get a :verb) "x") (list {:dag "d"}) (list)))}) +(define bd-runner {:run (fn (dag env) {:status "done" :effects (list {:kind "re"})})}) +(define bd-driver {:dispatch (fn (eff) (list {:verb "x" :id "same-id" :actor "a"}))}) ;; re-emits the SAME id +(define bd-engine (behavior/make-engine {:triggers bd-triggers :runner bd-runner :transport be-transport :driver bd-driver})) +(host-be-test "dedup — the same activity (by :id) processes once (a cycle caught by identity)" + (let ((tr (behavior/process bd-engine {:verb "x" :id "same-id" :actor "a"}))) + (list (len (get tr :emitted)) (len (get tr :ran)))) + (list 1 1)) + +;; ── env: the runner receives the injected :effects (external reads) + per-activity :ctx ── +(define benv-runner + {:run (fn (dag env) {:status "done" + :effects (list {:kind "saw" :ctx (get (get env :ctx) :state) + :ff (not (nil? (get (get env :effects) :fetch-followers)))})})}) +(define benv-engine (behavior/make-engine + {:triggers bs-triggers :runner benv-runner :transport be-transport :driver be-driver + :effects {:fetch-followers (fn (x) (list 1 2 3))} :ctx-of (fn (a) {:state "loaded"})})) +(host-be-test "the runner env carries injected :effects + per-activity :ctx" + (let ((eff (first (get (behavior/process benv-engine {:verb "wait" :actor "a"}) :effects)))) + (list (get eff :ctx) (get eff :ff))) + (list "loaded" true)) + +;; ── pump: drain transport.deliver → process each inbound activity (peers / async completions) ── +(define bp-transport {:emit (fn (a) nil) :deliver (fn () (list {:verb "publish" :actor "a"} {:verb "publish" :actor "b"}))}) +(define bp-engine (behavior/make-engine {:triggers be-triggers :runner be-runner :transport bp-transport :driver be-driver})) +(host-be-test "pump drains transport.deliver → processes each inbound activity" + (let ((tr (behavior/pump bp-engine))) + (list (len (get tr :emitted)) (len (get tr :ran)) (len (get tr :effects)))) + (list 2 2 2)) + (define host-be-tests-run! (fn () {:total (+ host-be-pass host-be-fail) diff --git a/plans/business-logic-fed-flows.md b/plans/business-logic-fed-flows.md index 15b07f09..e15e6a96 100644 --- a/plans/business-logic-fed-flows.md +++ b/plans/business-logic-fed-flows.md @@ -108,7 +108,7 @@ trigger-registry ← next/ trigger_registry OR a local SX matcher · runner ← (sync) / next/ flow_dispatch (durable) / celery-sx · transport ← artdag/federation (transport injected) / next/ delivery / in-process · driver ← host writes / email / append-activity. -**Build order:** (a) DONE — the seam as SX contracts + a reference engine wired to MOCK adapters, tested +**Build order:** (a) DONE — the seam as SX contracts + reference engine, tested. Substrate-agnostic after review: :status branch (done/suspended/failed), injected env (:ctx + :effects), dedup by activity :id, behavior/pump for inbound (peer + async completions). behavior 9/9. (process a mock activity → effect flows → loop closes). (b) P0 then supplies the REAL adapters (publish activity, local-SX trigger, sync op-table runner over a publish-DAG, host driver).