;; lib/host/behavior.sx — the ADAPTER SEAM for business-logic-as-composition. ;; (plans/business-logic-fed-flows.md.) The invariant is an ACTIVITY (state-change event) + a ;; behavior DAG; everything between is a swappable adapter, each a dict-of-functions: ;; ;; trigger-registry {:register! (fn spec dag hint) :match (fn activity -> [binding])} ;; runner {:run (fn dag env -> {:status "done"|"suspended"|"failed" :results :effects ;; :resume :error})} ;; transport {:emit (fn activity) :deliver (fn -> [activity])} ;; driver {:dispatch (fn effect -> [activity])} ;; may emit NEW activities ;; ;; The engine bundles the four (+ optional :effects injected external-read interfaces and a ;; :ctx-of (fn activity -> ctx)). behavior/process folds an activity through the pipeline — ;; emit → match triggers → run each DAG → branch on :status → (done) dispatch each effect-as-data ;; → recurse on new activities. The engine-facing runner result is {:status :effects :resume ;; :error} (anything else — e.g. artdag's memoized node :results — is runner-internal). ;; Substrate-agnostic: ;; - :status "done" → dispatch effects (synchronous op-table runner). ;; - :status "suspended" → record it (+ :resume token); NO effects yet. A durable (Erlang) runner ;; HOLDS the flow OFF-ENGINE; a timer/event resumes it OUT-OF-BAND — AFTER this process call has ;; returned. On completion the runner injects a completion ACTIVITY into the transport's INBOUND ;; channel (the runner is wired to it at CONSTRUCTION — the sync run env is long gone by then), ;; and a later behavior/pump drains it → effects flow. The effect-as-data model is intact: the ;; completion is just another inbound activity. Without that wiring a suspended flow is a dead end. ;; - :status "failed" → record it (+ :error) for retry/dead-letter (celery-sx). Retry is ;; FLOW-level (re-run the runner for that binding), NOT activity re-entry (dedup would block it). ;; DEDUP: a cycle within ONE process/pump is caught by activity :id (identity, not just the depth ;; guard). This is PER-INVOCATION, not global — global idempotency is the EMITTER's job (fire-once ;; on the transition) + a DURABLE inbox for federation (cross-pump re-delivery isn't caught here). ;; The env carries injected :effects (external reads, deterministic for replay) + per-activity :ctx. ;; The ASYNC RE-ENTRY seam is the transport's inbound channel + pump — NOT the run env (which is ;; synchronous). Runners that complete out-of-band are wired to that channel at construction. (define behavior/make-engine (fn (adapters) adapters)) ;; {:triggers :runner :transport :driver :effects? :ctx-of?} (define behavior/-triggers (fn (e) (get e :triggers))) (define behavior/-runner (fn (e) (get e :runner))) (define behavior/-transport (fn (e) (get e :transport))) (define behavior/-driver (fn (e) (get e :driver))) (define behavior/-effects (fn (e) (or (get e :effects) {}))) (define behavior/-ctx-of (fn (e a) (let ((f (get e :ctx-of))) (if (nil? f) nil (f a))))) (define behavior/-max-depth 8) ;; loop-closure backstop (dedup is the real guard) (define behavior/-empty-trace {:emitted (list) :ran (list) :effects (list) :suspended (list) :failed (list) :seen (list)}) ;; run one trigger binding: execute its DAG with the FULL env, then branch on :status. (define behavior/-run-binding (fn (engine activity binding depth acc) (let ((env {:activity activity :actor (get activity :actor) :ctx (behavior/-ctx-of engine activity) :effects (behavior/-effects engine) :binding binding})) (let ((result ((get (behavior/-runner engine) :run) (get binding :dag) env))) (let ((acc1 (assoc acc :ran (concat (get acc :ran) (list result)))) (status (or (get result :status) "done"))) (cond ((= status "suspended") (assoc acc1 :suspended (concat (get acc1 :suspended) (list {:dag (get binding :dag) :resume (get result :resume) :activity activity})))) ((= status "failed") (assoc acc1 :failed (concat (get acc1 :failed) (list {:dag (get binding :dag) :error (get result :error) :activity activity})))) (else (reduce (fn (a eff) (behavior/-dispatch-effect engine eff depth (assoc a :effects (concat (get a :effects) (list eff))))) acc1 (or (get result :effects) (list)))))))))) ;; dispatch one effect via the driver; recurse on any NEW activities it emits (the loop closes). (define behavior/-dispatch-effect (fn (engine eff depth acc) (reduce (fn (a na) (behavior/-step engine na (+ depth 1) a)) acc (or ((get (behavior/-driver engine) :dispatch) eff) (list))))) ;; one step: DEDUP by activity :id → emit → match triggers → run each binding. Depth-guarded. (define behavior/-step (fn (engine activity depth acc) (let ((aid (get activity :id))) (if (or (> depth behavior/-max-depth) (and (not (nil? aid)) (contains? (get acc :seen) aid))) acc (let ((acc0 (if (nil? aid) acc (assoc acc :seen (concat (get acc :seen) (list aid)))))) (begin ((get (behavior/-transport engine) :emit) activity) (reduce (fn (a binding) (behavior/-run-binding engine activity binding depth a)) (assoc acc0 :emitted (concat (get acc0 :emitted) (list activity))) (or ((get (behavior/-triggers engine) :match) activity) (list))))))))) ;; process one activity through the whole seam. Returns the trace. (define behavior/process (fn (engine activity) (behavior/-step engine activity 0 behavior/-empty-trace))) ;; drain INBOUND activities (peer deliveries + async runner completions) through the engine, ;; sharing one trace so dedup applies across the batch. (define behavior/pump (fn (engine) (reduce (fn (acc activity) (behavior/-step engine activity 0 acc)) behavior/-empty-trace (or ((get (behavior/-transport engine) :deliver)) (list)))))