;; lib/host/ra.sx — RA: the ERLANG (durable) RUNNER ADAPTER (plans/business-logic-fed-flows.md).
;; A seam runner (behavior.sx) that runs a behavior as a DURABLE next/flow (flow_store), so a flow
;; can SUSPEND (wait-until-morning) and RESUME — the capability {suspend} the execute-fold runner
;; lacks. Spike-proven (plans/ra-spike.sh, 4/4): our canonical activity marshals to Erlang, drives
;; blog_publish_digest, and completes a suspend→resume cycle with no er-scheduler deadlock.
;;
;; PURE SX + INJECTED erl-eval. host/ra--make-runner takes an `erl-eval` fn (src -> clean SX value).
;; The REAL one (host/ra--real-eval) wraps (er-to-sx-deep (erlang-eval-ast src)) and only works in a
;; process with the Erlang runtime + next/flow loaded; unit tests inject a MOCK. So this module loads
;; in the plain host (the Erlang refs sit inside a lambda, resolved lazily at call time) and is
;; testable without the Erlang substrate. A durable behavior DAG is {:erl-flow <flow-name> :needs <…>}.

;; Render `s` as a single-quoted Erlang atom: 'foo'.
;; NOTE(review): `s` is spliced verbatim — an embedded ' or \ is NOT escaped here, so this is only
;; safe for values that cannot contain them. Confirm callers pass trusted/validated names.
(define host/ra--atom
  (fn (s)
    (let ((tick "'"))
      (str tick s tick))))

;; Render `s` as an Erlang binary literal: <<"foo">> (no /utf8 — unsupported).
;; NOTE(review): as with atoms, an embedded " or \ in `s` is not escaped.
(define host/ra--bin
  (fn (s)
    (str "<<" "\"" s "\"" ">>")))

;; Marshal our canonical activity dict `a` into the Erlang activity-proplist SOURCE text the flows
;; consume. Reads :verb, :actor, :id, :object-type and :category from `a`; :id becomes a binary,
;; every other field becomes a quoted atom.
(define host/ra--erl-src
  (fn (a)
    (let ((verb     (host/ra--atom (get a :verb)))
          (actor    (host/ra--atom (get a :actor)))
          (id       (host/ra--bin  (get a :id)))
          (obj-type (host/ra--atom (get a :object-type)))
          (category (host/ra--atom (get a :category))))
      (str "[{type, " verb "}, {actor, " actor "}, "
           "{id, " id "}, "
           "{object, [{type, " obj-type "}, "
           "{category, " category "}]}]"))))

;; Build the Erlang expression that starts the durable flow `flow-name` with an env proplist holding
;; the marshalled activity and its actor. `flow-name` is spliced into the source as-is (unquoted).
(define host/ra--start-expr
  (fn (flow-name activity)
    (let ((activity-src (host/ra--erl-src activity))
          (actor-atom   (host/ra--atom (get activity :actor))))
      (str "flow_store:start(" flow-name
           ", [{activity, " activity-src
           "}, {actor, " actor-atom "}])"))))

;; the Erlang expression that resumes a suspended instance with an effect result (async re-entry).
;; Builds `flow_store:resume(<id>, '<result>')`. `id` is spliced into the source verbatim; `result`
;; is rendered as a quoted atom via host/ra--atom.
(define host/ra--resume-expr
  (fn (id result)
    (str "flow_store:resume(" id ", " (host/ra--atom result) ")")))

;; map flow_store's result (er-to-sx-deep'd) onto the seam runner contract.
;;   (ok Id (flow_done V…))     -> {:status "done" :effects (V…) :flow-id Id}
;;   (ok Id (flow_suspended T)) -> {:status "suspended" :resume {:id Id :tag T}}
;; Anything else -> {:status "failed" :error r}. That includes a non-list `r`, a head other than
;; `ok`, an unknown outcome kind, and — guarded explicitly below — an `ok` result whose third
;; element is missing or is not a list, so `first` is never applied to a non-list outcome.
(define host/ra--parse
  (fn (r)
    (let ((failed {:status "failed" :error r}))
      (if (or (not (= (type-of r) "list"))
              (not (= (str (first r)) "ok")))
        failed
        (let ((id (first (rest r)))
              (outcome (first (rest (rest r)))))
          ;; total-parser guard: (ok Id) / (ok Id atom) must fail cleanly, not reach (first outcome)
          (if (not (= (type-of outcome) "list"))
            failed
            (let ((kind (str (first outcome))))
              (cond
                ((= kind "flow_done")
                 {:status "done" :effects (rest outcome) :flow-id id})
                ((= kind "flow_suspended")
                 {:status "suspended"
                  :resume {:id id :tag (str (first (rest outcome)))}})
                (else failed)))))))))

;; the RA runner — a seam {:capabilities :run}. Advertises {effect, branch, each, suspend}. :run
;; marshals the activity (env's :activity), starts the DAG's named flow (dag's :erl-flow) via the
;; injected erl-eval, and parses the result with host/ra--parse.
(define host/ra--make-runner
  (fn (erl-eval)
    {:capabilities (list "effect" "branch" "each" "suspend")
     :run (fn (dag env)
            (host/ra--parse
              (erl-eval (host/ra--start-expr (get dag :erl-flow) (get env :activity)))))}))

;; resume a suspended instance out-of-band (the async re-entry path) — re-drive + parse.
(define host/ra--resume
  (fn (erl-eval id result)
    (host/ra--parse (erl-eval (host/ra--resume-expr id result)))))

;; the REAL erl-eval — ONLY works where the Erlang runtime + next/flow are loaded (refs resolved
;; lazily at call time, so defining it here is harmless in the plain host).
(define host/ra--real-eval
  (fn (src)
    (er-to-sx-deep (erlang-eval-ast src))))

;; ── RA-LIVE: drive the persistent durable-execution KERNEL SERVICE over HTTP ──────────
;; next/kernel/host_kernel serves GET /flow/start/<category> → "<id>:<status>" and
;; GET /flow/resume/<id> → "resume:<status>". http-request returns {"status" "headers" "body"};
;; we take the body.
;; So RA-live's runner is the SAME seam contract, executing durably ON A REMOTE
;; PROCESS — a suspend returns the kernel instance id as the :resume handle for a later resume.

;; GET `url` (empty header dict, empty request body) and return the "body" entry of the response.
(define host/ra--http-get
  (fn (url)
    (let ((response (http-request "GET" url {} "")))
      (get response "body"))))

;; Map a kernel start reply onto the runner contract. The text before the first ":" is the
;; instance id; the text after it is matched by prefix:
;;   "suspended…" -> {:status "suspended" :resume {:id id}}
;;   "done…"      -> {:status "done" :effects ({:verb "digest" :args (id)})}
;; No ":" at all, or any other status -> {:status "failed" :error body}.
(define host/ra--parse-kernel
  (fn (body)
    (let ((colon (index-of body ":")))
      (if (< colon 0)
        {:status "failed" :error body}
        (let ((id (substr body 0 colon))
              (tail-start (+ colon 1)))
          ;; everything after the colon, taken as (start, remaining-length)
          (let ((status (substr body tail-start (- (len body) tail-start))))
            (if (starts-with? status "suspended")
              {:status "suspended" :resume {:id id}}
              (if (starts-with? status "done")
                {:status "done" :effects (list {:verb "digest" :args (list id)})}
                {:status "failed" :error body}))))))))

;; a KERNEL runner — a seam runner that dispatches a durable behavior to the persistent kernel at
;; `base` (e.g. "http://sx_kernel:8930"). :run ignores the DAG itself and starts the flow selected
;; by the activity's :category (GET <base>/flow/start/<category>). Advertises effect/branch/each/
;; suspend; per the original note, `suspend` is what makes select-runner route durable DAGs here
;; (select-runner is not defined in this file).
(define host/ra--make-kernel-runner
  (fn (base)
    {:capabilities (list "effect" "branch" "each" "suspend")
     :run (fn (dag env)
            (let ((category (get (get env :activity) :category)))
              (host/ra--parse-kernel
                (host/ra--http-get (str base "/flow/start/" category)))))}))

;; resume a suspended kernel instance (the async boundary) → the runner contract.
;; Issues GET <base>/flow/resume/<id> and hands the body to host/ra--parse-kernel-resume.
(define host/ra--kernel-resume
  (fn (base id)
    (let ((url (str base "/flow/resume/" id)))
      (host/ra--parse-kernel-resume (host/ra--http-get url)))))

;; Map a kernel resume reply onto the runner contract: a body starting with "resume:done" is a
;; completed flow (one "digest-sent" effect with no args); every other body is a failure.
(define host/ra--parse-kernel-resume
  (fn (body)
    (if (not (starts-with? body "resume:done"))
      {:status "failed" :error body}
      {:status "done" :effects (list {:verb "digest-sent" :args (list)})})))