lib/host/ra.sx — a PURE-SX seam runner (advertises {effect,branch,each,suspend}) with an INJECTED
erl-eval (real = er-to-sx-deep ∘ erlang-eval-ast; mock in unit tests), so it loads in the plain host
(Erlang refs resolve lazily inside lambdas) and is unit-testable without the Erlang runtime.
host/ra--{atom,bin,erl-src,start-expr,resume-expr,parse,make-runner,resume,real-eval}: marshals our
canonical activity → Erlang source (CID as <<"…">> binary, atoms single-quoted), starts a named
next/ flow via flow_store, parses (ok Id (flow_done V))→{:status done :effects V :flow-id} /
(ok Id (flow_suspended T))→{:status suspended :resume {:id :tag}}.
DUAL-RUNNER ROUTING (flows.sx): host/flow--required-caps now handles a {:erl-flow :needs} DAG
(declared caps, since a foreign flow can't be introspected); host/flow--select-runner picks the
cheapest runner whose capabilities cover the DAG's needs. The capability model is now REAL with two
runners — an {effect,branch} composition lands on exec-runner; a {suspend} DAG routes to RA.
Verified: ra 9/9 (mock erl-eval) + plans/ra-integration.sh 4/4 (the REAL module driving live
flow_store: urgent→done, newsletter→suspended with a resume handle, digest_sent effect-as-data).
Full host conformance 607/607; next/tests/triggers_e2e.sh 10/10 baseline intact.
FINDING → RA-LIVE deferred: gen_servers don't persist across separate erlang-eval-ast calls (flow
README), so true cross-call suspend/resume needs a PERSISTENT next/ kernel process. The runner +
marshalling + suspend/resume mechanics are proven; RA-live is process lifecycle + wiring, documented.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
64 lines
4.1 KiB
Plaintext
;; lib/host/flows.sx — behavior DAGs + CAPABILITY-typed nodes / capability-advertising runners
;; (plans/business-logic-fed-flows.md). P0.2 finding: a SYNCHRONOUS business flow is an EXECUTE-FOLD
;; composition (host/execute.sx: effect/alt/each — content-addressed control flow), NOT an artdag
;; DATAFLOW DAG (which has no branch). Both are "content-addressed op-DAGs" — two instances of one
;; abstraction, run very differently: the execute-fold runner (control flow, synchronous) vs the
;; artdag runner (dataflow, memoized/parallel). The DIFFERENCE is which capabilities their nodes
;; need. A node declares its capability; a runner ADVERTISES what it supports; the binder checks
;; required ⊆ advertised (fail fast); so the sync/durable/distributed choice is DERIVED from the DAG.
;; NOTE (plan phase AX): this execute-fold-vs-artdag split is a capability SNAPSHOT, not a boundary
;; — artdag MAY grow +{effect,branch,each} node-kinds and business logic then migrates onto it (to
;; inherit content-addressed memoization / optimize / FEDERATION). The capability model makes that
;; migration seamless; the execute-fold stays the lean default for cheap synchronous flows.

;; ── capability typing: a node kind → the capability it needs ──────────
(define host/flow--node-cap
  (fn (h)
    (cond ((= h "effect") "effect")
          ((= h "alt") "branch")
          ((= h "each") "each")
          ((= h "wait") "suspend") ;; a timer/suspend node — the execute-fold canNOT run it
          (else nil))))
(define host/flow--uniq-concat
  (fn (a b) (reduce (fn (acc x) (if (contains? acc x) acc (concat acc (list x)))) a b)))
;; the capability SET a DAG requires. An SX composition: the union of its nodes' caps (walked). A
;; durable/opaque DAG (e.g. {:erl-flow <name> :needs (…)}) DECLARES its caps via :needs — the runner
;; can't introspect a foreign flow, so it states what it needs (e.g. {suspend} for a wait).
(define host/flow--required-caps
  (fn (node)
    (cond
      ((and (= (type-of node) "dict") (get node :needs)) (get node :needs))
      ((not (= (type-of node) "list")) (list))
      (else
        (let ((self (host/flow--node-cap (str (first node))))
              (kids (reduce (fn (acc c) (host/flow--uniq-concat acc (host/flow--required-caps c)))
                            (list) (rest node))))
          (if (nil? self) kids (host/flow--uniq-concat (list self) kids)))))))
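;; Illustrative sketch only (not from the original file): how the walk above unions capabilities.
;; The node shapes are hypothetical. They assume a composition is a list whose head names the node
;; kind, and that reduce/concat/contains? behave the way the definitions above rely on. The demo/
;; names are made up for this example.
(define demo/caps-composition
  (host/flow--required-caps (list "alt" (list "effect" "a") (list "effect" "b"))))
;; under those assumptions this should be ("branch" "effect"): "alt" contributes branch, and the
;; two effect children contribute effect once (uniq-concat drops the repeat)
(define demo/caps-foreign
  (host/flow--required-caps {:erl-flow "newsletter" :needs (list "suspend")}))
;; should be ("suspend"): read straight from :needs, with no walk into the foreign flow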
(define host/flow--subset? (fn (a b) (reduce (fn (ok x) (and ok (contains? b x))) true a)))

;; DERIVE the runner from a fleet: the FIRST runner (in ladder order — cheapest first) whose
;; advertised capabilities cover the DAG's required set. nil if none fits (a hard bind error).
;; This is the dual-runner routing: an {effect,branch}-only DAG lands on the exec-runner; a DAG
;; needing {suspend} skips past it to RA. Same DAGs, runner chosen by need — no human hint.
(define host/flow--select-runner
  (fn (runners dag)
    (let ((need (host/flow--required-caps dag)))
      (reduce (fn (acc r) (if (and (nil? acc) (host/flow--subset? need (get r :capabilities))) r acc))
              nil runners))))

;; ── the SYNCHRONOUS op-table runner = the execute-fold ────────────────
;; a seam runner {:capabilities :run}. It ADVERTISES {effect, branch, each} — the execute-fold
;; vocabulary. run: fold the composition (dag) against the env's :ctx → the effect log (as data).
(define host/flow--exec-runner
  {:capabilities (list "effect" "branch" "each")
   :run (fn (dag env) {:status "done" :effects (host/exec-run dag (or (get env :ctx) {}))})})

;; DERIVE the runner: bind a DAG to a runner iff its required capabilities ⊆ the runner's advertised.
;; Fails fast (a {:bind-error …}) rather than mysteriously at run time. This is where "simple in SX
;; / durable in Erlang / distributed in celery-sx" becomes a checkable property of the DAG.
(define host/flow--bind
  (fn (runner dag)
    (let ((need (host/flow--required-caps dag)) (have (get runner :capabilities)))
      (if (host/flow--subset? need have) {:ok true :runner runner}
          {:ok false :bind-error {:needs need :has have}}))))
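;; Illustrative sketch only (not from the original file): routing and binding with a two-runner
;; fleet. demo/ra-like is a hypothetical stand-in that only advertises the capability set the RA
;; runner is described as having; its :run is a placeholder, not the real lib/host/ra.sx runner.
;; The flow name "newsletter" and every demo/ name are assumptions made for this example.
(define demo/ra-like
  {:capabilities (list "effect" "branch" "each" "suspend")
   :run (fn (dag env) {:status "suspended"})})
(define demo/fleet (list host/flow--exec-runner demo/ra-like)) ;; ladder order: cheapest first
(define demo/foreign {:erl-flow "newsletter" :needs (list "suspend")})
;; the DAG needs {suspend}; the exec-runner does not advertise it, so selection should skip past
;; it and return demo/ra-like
(define demo/picked (host/flow--select-runner demo/fleet demo/foreign))
;; binding the same DAG to the exec-runner directly should fail fast with
;; {:ok false :bind-error {:needs ("suspend") :has ("effect" "branch" "each")}}
(define demo/bound (host/flow--bind host/flow--exec-runner demo/foreign))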