host: enrich the adapter seam to be substrate-agnostic (review fixes)
After review, the seam was only synchronous-complete; the durable/celery-sx runners couldn't plug in cleanly. Additive fixes (pipeline unchanged): (1) :status branch in run-binding — 'done' dispatches effects, 'suspended' records the flow + :resume (a durable runner holds it; completion re-enters as a new activity via pump), 'failed' records + :error for retry/dead-letter. (2) richer runner env — :ctx (per-activity, via engine :ctx-of) + injected :effects (external-read interfaces, e.g. a deterministic fetch_followers). (3) dedup by content :id — a cycle is caught by identity, not just the depth guard. (4) behavior/pump — drain transport.deliver for inbound (peer activities + async runner completions), sharing one trace so dedup spans the batch. behavior 9/9 (+ suspended/failed/dedup/env/pump); full host conformance 580/580. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -3,35 +3,57 @@
|
||||
;; behavior DAG; everything between is a swappable adapter, each a dict-of-functions:
|
||||
;;
|
||||
;; trigger-registry {:register! (fn spec dag hint) :match (fn activity -> [binding])}
|
||||
;; runner {:run (fn dag env -> {:status :results :effects :resume})}
|
||||
;; runner {:run (fn dag env -> {:status "done"|"suspended"|"failed" :results :effects
|
||||
;; :resume :error})}
|
||||
;; transport {:emit (fn activity) :deliver (fn -> [activity])}
|
||||
;; driver {:dispatch (fn effect -> [activity])} ;; may emit NEW activities
|
||||
;;
|
||||
;; An engine bundles the four. behavior/process folds an activity through the pipeline —
|
||||
;; emit → match triggers → run each DAG → dispatch each effect → recurse on new activities
|
||||
;; (loop closure, depth-guarded). Returns a TRACE {:emitted :ran :effects} for observation.
|
||||
;; Runner/transport/registry/driver are all injected, so the same DAG + engine run over the
|
||||
;; synchronous op-table runner, the Erlang durable runner, celery-sx, fed-sx transport, etc.
|
||||
;; The engine bundles the four (+ optional :effects injected external-read interfaces and a
|
||||
;; :ctx-of (fn activity -> ctx)). behavior/process folds an activity through the pipeline —
|
||||
;; emit → match triggers → run each DAG → branch on :status → (done) dispatch each effect-as-data
|
||||
;; → recurse on new activities. Substrate-agnostic:
|
||||
;; - :status "done" → dispatch effects (synchronous op-table runner).
|
||||
;; - :status "suspended" → record it (+ :resume token); NO effects yet — a durable (Erlang)
|
||||
;; runner holds the flow, and its later completion re-enters as a NEW activity via behavior/pump.
|
||||
;; - :status "failed" → record it (+ :error) for retry/dead-letter (celery-sx).
|
||||
;; DEDUP: activities are content-addressed; each :id processes once (a cycle is caught by identity,
|
||||
;; not just the depth guard). INBOUND: behavior/pump drains transport.deliver (peer activities +
|
||||
;; async runner completions). The env carries the injected :effects + per-activity :ctx so real
|
||||
;; DAGs (e.g. an injected fetch_followers, deterministic for replay) can run.
|
||||
|
||||
(define behavior/make-engine (fn (adapters) adapters)) ;; {:triggers :runner :transport :driver}
|
||||
(define behavior/make-engine (fn (adapters) adapters)) ;; {:triggers :runner :transport :driver :effects? :ctx-of?}
|
||||
(define behavior/-triggers (fn (e) (get e :triggers)))
|
||||
(define behavior/-runner (fn (e) (get e :runner)))
|
||||
(define behavior/-transport (fn (e) (get e :transport)))
|
||||
(define behavior/-driver (fn (e) (get e :driver)))
|
||||
(define behavior/-max-depth 8) ;; loop-closure guard
|
||||
(define behavior/-effects (fn (e) (or (get e :effects) {})))
|
||||
(define behavior/-ctx-of (fn (e a) (let ((f (get e :ctx-of))) (if (nil? f) nil (f a)))))
|
||||
(define behavior/-max-depth 8) ;; loop-closure backstop (dedup is the real guard)
|
||||
(define behavior/-empty-trace
|
||||
{:emitted (list) :ran (list) :effects (list) :suspended (list) :failed (list) :seen (list)})
|
||||
|
||||
;; run one trigger binding: execute its DAG with the activity env, then dispatch each effect.
|
||||
;; run one trigger binding: execute its DAG with the FULL env, then branch on :status.
|
||||
(define behavior/-run-binding
|
||||
(fn (engine activity binding depth acc)
|
||||
(let ((result ((get (behavior/-runner engine) :run)
|
||||
(get binding :dag)
|
||||
{:activity activity :actor (get activity :actor) :binding binding})))
|
||||
(reduce
|
||||
(fn (a eff)
|
||||
(behavior/-dispatch-effect engine eff depth
|
||||
(assoc a :effects (concat (get a :effects) (list eff)))))
|
||||
(assoc acc :ran (concat (get acc :ran) (list result)))
|
||||
(or (get result :effects) (list))))))
|
||||
(let ((env {:activity activity :actor (get activity :actor)
|
||||
:ctx (behavior/-ctx-of engine activity) :effects (behavior/-effects engine)
|
||||
:binding binding}))
|
||||
(let ((result ((get (behavior/-runner engine) :run) (get binding :dag) env)))
|
||||
(let ((acc1 (assoc acc :ran (concat (get acc :ran) (list result))))
|
||||
(status (or (get result :status) "done")))
|
||||
(cond
|
||||
((= status "suspended")
|
||||
(assoc acc1 :suspended (concat (get acc1 :suspended)
|
||||
(list {:dag (get binding :dag) :resume (get result :resume) :activity activity}))))
|
||||
((= status "failed")
|
||||
(assoc acc1 :failed (concat (get acc1 :failed)
|
||||
(list {:dag (get binding :dag) :error (get result :error) :activity activity}))))
|
||||
(else
|
||||
(reduce
|
||||
(fn (a eff)
|
||||
(behavior/-dispatch-effect engine eff depth
|
||||
(assoc a :effects (concat (get a :effects) (list eff)))))
|
||||
acc1 (or (get result :effects) (list))))))))))
|
||||
|
||||
;; dispatch one effect via the driver; recurse on any NEW activities it emits (the loop closes).
|
||||
(define behavior/-dispatch-effect
|
||||
@@ -41,18 +63,31 @@
|
||||
acc
|
||||
(or ((get (behavior/-driver engine) :dispatch) eff) (list)))))
|
||||
|
||||
;; one step: emit the activity, match triggers, run each binding. Depth-guarded.
|
||||
;; one step: DEDUP by activity :id → emit → match triggers → run each binding. Depth-guarded.
|
||||
(define behavior/-step
|
||||
(fn (engine activity depth acc)
|
||||
(if (> depth behavior/-max-depth) acc
|
||||
(begin
|
||||
((get (behavior/-transport engine) :emit) activity)
|
||||
(reduce
|
||||
(fn (a binding) (behavior/-run-binding engine activity binding depth a))
|
||||
(assoc acc :emitted (concat (get acc :emitted) (list activity)))
|
||||
(or ((get (behavior/-triggers engine) :match) activity) (list)))))))
|
||||
(let ((aid (get activity :id)))
|
||||
(if (or (> depth behavior/-max-depth)
|
||||
(and (not (nil? aid)) (contains? (get acc :seen) aid)))
|
||||
acc
|
||||
(let ((acc0 (if (nil? aid) acc (assoc acc :seen (concat (get acc :seen) (list aid))))))
|
||||
(begin
|
||||
((get (behavior/-transport engine) :emit) activity)
|
||||
(reduce
|
||||
(fn (a binding) (behavior/-run-binding engine activity binding depth a))
|
||||
(assoc acc0 :emitted (concat (get acc0 :emitted) (list activity)))
|
||||
(or ((get (behavior/-triggers engine) :match) activity) (list)))))))))
|
||||
|
||||
;; process an activity through the whole seam. Returns the trace.
|
||||
;; process one activity through the whole seam. Returns the trace.
|
||||
(define behavior/process
|
||||
(fn (engine activity)
|
||||
(behavior/-step engine activity 0 {:emitted (list) :ran (list) :effects (list)})))
|
||||
(behavior/-step engine activity 0 behavior/-empty-trace)))
|
||||
|
||||
;; drain INBOUND activities (peer deliveries + async runner completions) through the engine,
|
||||
;; sharing one trace so dedup applies across the batch.
|
||||
(define behavior/pump
|
||||
(fn (engine)
|
||||
(reduce
|
||||
(fn (acc activity) (behavior/-step engine activity 0 acc))
|
||||
behavior/-empty-trace
|
||||
(or ((get (behavior/-transport engine) :deliver)) (list)))))
|
||||
|
||||
Reference in New Issue
Block a user