host RA: the Erlang durable runner adapter — built + tested (module + integration)

lib/host/ra.sx — a PURE-SX seam runner (advertises {effect,branch,each,suspend}) with an INJECTED
erl-eval (real = er-to-sx-deep ∘ erlang-eval-ast; mock in unit tests), so it loads in the plain host
(Erlang refs resolve lazily inside lambdas) and is unit-testable without the Erlang runtime.
host/ra--{atom,bin,erl-src,start-expr,resume-expr,parse,make-runner,resume,real-eval}: marshals our
canonical activity → Erlang source (CID as <<"…">> binary, atoms single-quoted), starts a named
next/ flow via flow_store, parses (ok Id (flow_done V))→{:status done :effects V :flow-id} /
(ok Id (flow_suspended T))→{:status suspended :resume {:id :tag}}.

DUAL-RUNNER ROUTING (flows.sx): host/flow--required-caps now handles a {:erl-flow :needs} DAG
(declared caps, since a foreign flow can't be introspected); host/flow--select-runner picks the
cheapest runner whose capabilities cover the DAG's needs. The capability model is now REAL with two
runners — an {effect,branch} composition lands on exec-runner; a {suspend} DAG routes to RA.

Verified: ra 9/9 (mock erl-eval) + plans/ra-integration.sh 4/4 (the REAL module driving live
flow_store: urgent→done, newsletter→suspended with a resume handle, digest_sent effect-as-data).
Full host conformance 607/607; next/tests/triggers_e2e.sh 10/10 baseline intact.

FINDING → RA-LIVE deferred: gen_servers don't persist across separate erlang-eval-ast calls (flow
README), so true cross-call suspend/resume needs a PERSISTENT next/ kernel process. The runner +
marshalling + suspend/resume mechanics are proven; RA-live is process lifecycle + wiring, documented.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-07-02 16:20:36 +00:00
parent 17602e597f
commit c21be815f3
7 changed files with 245 additions and 14 deletions

View File

@@ -96,6 +96,7 @@ MODULES=(
"lib/host/execute.sx"
"lib/host/behavior.sx"
"lib/host/flows.sx"
"lib/host/ra.sx"
"lib/host/htmlsx.sx"
"lib/host/blog.sx"
"lib/host/page.sx"
@@ -116,6 +117,7 @@ SUITES=(
"htmlsx host-ht-tests-run! lib/host/tests/htmlsx.sx"
"behavior host-be-tests-run! lib/host/tests/behavior.sx"
"flows host-fl-tests-run! lib/host/tests/flows.sx"
"ra host-ra-tests-run! lib/host/tests/ra.sx"
"compose host-cp-tests-run! lib/host/tests/compose.sx"
"execute host-ex-tests-run! lib/host/tests/execute.sx"
"session host-se-tests-run! lib/host/tests/session.sx"

View File

@@ -21,16 +21,31 @@
(else nil))))
(define host/flow--uniq-concat
(fn (a b) (reduce (fn (acc x) (if (contains? acc x) acc (concat acc (list x)))) a b)))
;; the capability SET a composition requires — the union of its nodes' caps (walked recursively).
;; the capability SET a DAG requires. An SX composition: the union of its nodes' caps (walked). A
;; durable/opaque DAG (e.g. {:erl-flow <name> :needs (…)}) DECLARES its caps via :needs — the runner
;; can't introspect a foreign flow, so it states what it needs (e.g. {suspend} for a wait).
(define host/flow--required-caps
(fn (node)
(if (not (= (type-of node) "list")) (list)
(let ((self (host/flow--node-cap (str (first node))))
(kids (reduce (fn (acc c) (host/flow--uniq-concat acc (host/flow--required-caps c)))
(list) (rest node))))
(if (nil? self) kids (host/flow--uniq-concat (list self) kids))))))
(cond
((and (= (type-of node) "dict") (get node :needs)) (get node :needs))
((not (= (type-of node) "list")) (list))
(else
(let ((self (host/flow--node-cap (str (first node))))
(kids (reduce (fn (acc c) (host/flow--uniq-concat acc (host/flow--required-caps c)))
(list) (rest node))))
(if (nil? self) kids (host/flow--uniq-concat (list self) kids)))))))
(define host/flow--subset? (fn (a b) (reduce (fn (ok x) (and ok (contains? b x))) true a)))
;; DERIVE the runner from a fleet: the FIRST runner (in ladder order — cheapest first) whose
;; advertised capabilities cover the DAG's required set. nil if none fits (a hard bind error).
;; This is the dual-runner routing: an {effect,branch}-only DAG lands on the exec-runner; a DAG
;; needing {suspend} skips past it to RA. Same DAGs, runner chosen by need — no human hint.
(define host/flow--select-runner
(fn (runners dag)
(let ((need (host/flow--required-caps dag)))
(reduce (fn (acc r) (if (and (nil? acc) (host/flow--subset? need (get r :capabilities))) r acc))
nil runners))))
;; ── the SYNCHRONOUS op-table runner = the execute-fold ────────────────
;; a seam runner {:capabilities :run}. It ADVERTISES {effect, branch, each} — the execute-fold
;; vocabulary. run: fold the composition (dag) against the env's :ctx → the effect log (as data).

63
lib/host/ra.sx Normal file
View File

@@ -0,0 +1,63 @@
;; lib/host/ra.sx — RA: the ERLANG (durable) RUNNER ADAPTER (plans/business-logic-fed-flows.md).
;; A seam runner (behavior.sx) that runs a behavior as a DURABLE next/flow (flow_store), so a flow
;; can SUSPEND (wait-until-morning) and RESUME — the capability {suspend} the execute-fold runner
;; lacks. Spike-proven (plans/ra-spike.sh, 4/4): our canonical activity marshals to Erlang, drives
;; blog_publish_digest, and completes a suspend→resume cycle with no er-scheduler deadlock.
;;
;; PURE SX + INJECTED erl-eval. host/ra--make-runner takes an `erl-eval` fn (src -> clean SX value).
;; The REAL one (host/ra--real-eval) wraps (er-to-sx-deep (erlang-eval-ast src)) and only works in a
;; process with the Erlang runtime + next/flow loaded; unit tests inject a MOCK. So this module loads
;; in the plain host (the Erlang refs sit inside a lambda, resolved lazily at call time) and is
;; testable without the Erlang substrate. A durable behavior DAG is {:erl-flow <name> :needs <caps>}.
(define host/ra--atom (fn (s) (str "'" s "'"))) ;; single-quoted Erlang atom (special-char safe)
(define host/ra--bin (fn (s) (str "<<\"" s "\">>"))) ;; Erlang binary (no /utf8 — unsupported)
;; our canonical activity dict -> the Erlang activity-proplist SOURCE the flows consume.
(define host/ra--erl-src
(fn (a)
(str "[{type, " (host/ra--atom (get a :verb)) "}, {actor, " (host/ra--atom (get a :actor)) "}, "
"{id, " (host/ra--bin (get a :id)) "}, "
"{object, [{type, " (host/ra--atom (get a :object-type)) "}, "
"{category, " (host/ra--atom (get a :category)) "}]}]")))
;; the Erlang expression that starts a named durable flow with the marshalled activity env.
(define host/ra--start-expr
(fn (flow-name activity)
(str "flow_store:start(" flow-name ", [{activity, " (host/ra--erl-src activity)
"}, {actor, " (host/ra--atom (get activity :actor)) "}])")))
;; the Erlang expression that resumes a suspended instance with an effect result (async re-entry).
(define host/ra--resume-expr
(fn (id result) (str "flow_store:resume(" id ", " (host/ra--atom result) ")")))
;; map flow_store's result (er-to-sx-deep'd) onto the seam runner contract.
;; (ok Id (flow_done V…)) -> {:status "done" :effects (V…) :flow-id Id}
;; (ok Id (flow_suspended T)) -> {:status "suspended" :resume {:id Id :tag T}}
(define host/ra--parse
(fn (r)
(if (or (not (= (type-of r) "list")) (not (= (str (first r)) "ok")))
{:status "failed" :error r}
(let ((id (first (rest r))) (outcome (first (rest (rest r)))))
(let ((kind (str (first outcome))))
(cond
((= kind "flow_done")
{:status "done" :effects (rest outcome) :flow-id id})
((= kind "flow_suspended")
{:status "suspended" :resume {:id id :tag (str (first (rest outcome)))}})
(else {:status "failed" :error r})))))))
;; the RA runner — a seam {:capabilities :run}. Advertises {effect, branch, each, suspend}. :run
;; marshals the activity, starts the DAG's named flow via the injected erl-eval, parses the result.
(define host/ra--make-runner
(fn (erl-eval)
{:capabilities (list "effect" "branch" "each" "suspend")
:run (fn (dag env)
(host/ra--parse (erl-eval (host/ra--start-expr (get dag :erl-flow) (get env :activity)))))}))
;; resume a suspended instance out-of-band (the async re-entry path) — re-drive + parse.
(define host/ra--resume
(fn (erl-eval id result) (host/ra--parse (erl-eval (host/ra--resume-expr id result)))))
;; the REAL erl-eval — ONLY works where the Erlang runtime + next/flow are loaded (refs resolved
;; lazily at call time, so defining it here is harmless in the plain host).
(define host/ra--real-eval (fn (src) (er-to-sx-deep (erlang-eval-ast src))))

View File

@@ -111,6 +111,7 @@ MODULES=(
"lib/host/execute.sx"
"lib/host/behavior.sx"
"lib/host/flows.sx"
"lib/host/ra.sx"
"lib/host/htmlsx.sx"
"lib/host/blog.sx"
"lib/host/server.sx"

62
lib/host/tests/ra.sx Normal file
View File

@@ -0,0 +1,62 @@
;; lib/host/tests/ra.sx — the RA (Erlang durable) runner adapter (lib/host/ra.sx), unit-tested with
;; a MOCK erl-eval (no Erlang runtime needed). The REAL dispatch is proven in plans/ra-spike.sh.
(define host-ra-pass 0)
(define host-ra-fail 0)
(define host-ra-fails (list))
(define host-ra-test
(fn (name actual expected)
(if (= actual expected)
(set! host-ra-pass (+ host-ra-pass 1))
(begin (set! host-ra-fail (+ host-ra-fail 1))
(append! host-ra-fails {:name name :actual actual :expected expected})))))
(define ra-urgent {:verb "create" :actor "site" :id "u1" :object-type "article" :category "urgent"})
(define ra-news {:verb "create" :actor "site" :id "n1" :object-type "article" :category "newsletter"})
;; ── marshalling: our canonical activity → Erlang source ──
(host-ra-test "erl-src marshals the activity → Erlang activity-proplist source"
(host/ra--erl-src ra-urgent)
"[{type, 'create'}, {actor, 'site'}, {id, <<\"u1\">>}, {object, [{type, 'article'}, {category, 'urgent'}]}]")
(host-ra-test "start-expr wraps it in flow_store:start with the env"
(host/ra--start-expr "bd" ra-urgent)
"flow_store:start(bd, [{activity, [{type, 'create'}, {actor, 'site'}, {id, <<\"u1\">>}, {object, [{type, 'article'}, {category, 'urgent'}]}]}, {actor, 'site'}])")
;; ── result parsing: flow_store's er-to-sx-deep'd result → the seam runner contract ──
(host-ra-test "parse DONE → {:status done :effects … :flow-id}"
(let ((p (host/ra--parse (list "ok" 1 (list "flow_done" (list "digest_sent"))))))
(list (get p :status) (get p :flow-id)))
(list "done" 1))
(host-ra-test "parse SUSPENDED → {:status suspended :resume {:id :tag}}"
(let ((p (host/ra--parse (list "ok" 1 (list "flow_suspended" "morning")))))
(list (get p :status) (get (get p :resume) :id) (get (get p :resume) :tag)))
(list "suspended" 1 "morning"))
(host-ra-test "parse garbage → failed"
(get (host/ra--parse (list "error" "boom")) :status) "failed")
;; ── the runner (MOCK erl-eval keyed on the marshalled src) ──
(define ra-mock
(fn (src) (if (>= (index-of src "newsletter") 0)
(list "ok" 7 (list "flow_suspended" "morning"))
(list "ok" 3 (list "flow_done" (list "digest_sent"))))))
(define ra-runner (host/ra--make-runner ra-mock))
(host-ra-test "RA runner advertises {effect, branch, each, suspend}"
(get ra-runner :capabilities) (list "effect" "branch" "each" "suspend"))
(host-ra-test "RA runner: urgent → done; newsletter → suspended (via the injected erl-eval)"
(list (get ((get ra-runner :run) {:erl-flow "bd"} {:activity ra-urgent}) :status)
(get ((get ra-runner :run) {:erl-flow "bd"} {:activity ra-news}) :status))
(list "done" "suspended"))
(host-ra-test "RA resume drives a suspended instance (async re-entry)"
(get (host/ra--resume ra-mock 7 "morning_ts") :status) "done")
;; ── dual-runner routing: the capability model, now REAL (2 runners) ──
(host-ra-test "select-runner: {effect,branch} composition → exec-runner; {suspend} dag → RA"
(let ((fleet (list host/flow--exec-runner ra-runner)))
(list (get (host/flow--select-runner fleet (quote (alt (when (eq "k" "v") (effect a)) (else (effect b))))) :capabilities)
(get (host/flow--select-runner fleet {:erl-flow "bd" :needs (list "effect" "branch" "suspend")}) :capabilities)))
(list (list "effect" "branch" "each") (list "effect" "branch" "each" "suspend")))
(define host-ra-tests-run!
(fn ()
{:total (+ host-ra-pass host-ra-fail)
:passed host-ra-pass :failed host-ra-fail :fails host-ra-fails}))