Files
rose-ash/lib/host/ra.sx
giles cb0d866002 RA-live: durable business logic in production — host drives the kernel service (LIVE)
Steps 1+2 of RA-live/TA-live, live-verified end-to-end on blog.rose-ash.com.

(1) DEPLOY: docker-compose.dev-sx-host.yml gains an sx_kernel service running next/kernel/serve.sh
(the durable-execution kernel), SX_HTTP_HOST=0.0.0.0 so the host container reaches it at
http://sx_kernel:8930.

(2) HOST AS CLIENT: lib/host/ra.sx gains a KERNEL runner — host/ra--make-kernel-runner drives the
kernel over HTTP (http-request, native primitive; returns {status headers body}). It advertises
{effect,branch,each,suspend}, so select-runner routes a durable DAG to it. host/blog.sx: the DAG
registry + runner fleet are now mutable (register-dag!/add-runner!); emit! records SUSPENSIONS in a
durable pending log; /flows shows suspended instances with a resume link (?resume=<id>) driving
host/ra--kernel-resume. serve.sh wires it: set kernel-base, add the kernel runner, register the
durable 'blog-digest' DAG, declare a DURABLE behavior on article (create→publish SYNC, update→
blog-digest DURABLE), add a 'category' field.

LIVE PROOF: editing a published newsletter article → Update → routes to the kernel runner → POST
/flow/start/newsletter → kernel SUSPENDS (instance 5, shown pending on /flows) → /flows?resume=5 →
host re-drives the kernel → DONE → digest-sent effect + pending cleared. Durable suspend/resume
across separate HTTP requests, on a deployed persistent kernel. urgent edits complete immediately
(digest). http-request works in the serving context. blog 217/217, full conformance green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-07-02 19:31:33 +00:00

98 lines
5.7 KiB
Plaintext

;; lib/host/ra.sx — RA: the ERLANG (durable) RUNNER ADAPTER (plans/business-logic-fed-flows.md).
;; A seam runner (behavior.sx) that runs a behavior as a DURABLE next/flow (flow_store), so a flow
;; can SUSPEND (wait-until-morning) and RESUME — the capability {suspend} the execute-fold runner
;; lacks. Spike-proven (plans/ra-spike.sh, 4/4): our canonical activity marshals to Erlang, drives
;; blog_publish_digest, and completes a suspend→resume cycle with no er-scheduler deadlock.
;;
;; PURE SX + INJECTED erl-eval. host/ra--make-runner takes an `erl-eval` fn (src -> clean SX value).
;; The REAL one (host/ra--real-eval) wraps (er-to-sx-deep (erlang-eval-ast src)) and only works in a
;; process with the Erlang runtime + next/flow loaded; unit tests inject a MOCK. So this module loads
;; in the plain host (the Erlang refs sit inside a lambda, resolved lazily at call time) and is
;; testable without the Erlang substrate. A durable behavior DAG is {:erl-flow <name> :needs <caps>}.
;; Backslash-escape every occurrence of the one-character string `ch` in the string `s`.
;; Copies the text up to the next hit, emits "\" followed by `ch`, then recurses on the remainder.
(define host/ra--escape-char
  (fn (s ch)
    (let ((i (index-of s ch)))
      (if (< i 0)
        s
        (str (substr s 0 i) "\\" ch
             (host/ra--escape-char (substr s (+ i 1) (- (len s) (+ i 1))) ch))))))
;; Escape `s` for embedding inside an Erlang literal delimited by `delim`. Backslashes are
;; escaped FIRST so the backslashes added for the delimiter are not escaped a second time.
;; `s` is coerced with str, so non-string values render the same way they did before.
;; NOTE(review): assumes the Erlang reader behind erl-eval honours the standard \' \" \\ escapes — confirm.
(define host/ra--erl-escape
  (fn (s delim)
    (host/ra--escape-char (host/ra--escape-char (str s) "\\") delim)))
;; single-quoted Erlang atom; embedded ' and \ are escaped so the value cannot end the atom early.
(define host/ra--atom (fn (s) (str "'" (host/ra--erl-escape s "'") "'")))
;; Erlang binary (no /utf8 — unsupported); embedded " and \ are escaped so the value cannot end the string early.
(define host/ra--bin (fn (s) (str "<<\"" (host/ra--erl-escape s "\"") "\">>")))
;; Marshal our canonical activity dict `a` into the SOURCE TEXT of the Erlang activity proplist
;; the flows consume:
;;   [{type, Verb}, {actor, Actor}, {id, <<"Id">>}, {object, [{type, ObjType}, {category, Cat}]}]
;; Reads :verb, :actor, :object-type and :category (rendered as atoms) and :id (rendered as a binary).
(define host/ra--erl-src
  (fn (a)
    (let ((verb (host/ra--atom (get a :verb)))
          (actor (host/ra--atom (get a :actor)))
          (id (host/ra--bin (get a :id)))
          (object-type (host/ra--atom (get a :object-type)))
          (category (host/ra--atom (get a :category))))
      (str "[{type, " verb "}, {actor, " actor "}, {id, " id "}, "
           "{object, [{type, " object-type "}, {category, " category "}]}]"))))
;; Build the Erlang expression text that starts the durable flow `flow-name`, passing an env
;; proplist that holds the marshalled activity plus the activity's actor.
;; NOTE(review): `flow-name` is spliced in verbatim (not atom-quoted) — presumably callers pass a
;; plain lowercase Erlang atom; verify against the DAG registry.
(define host/ra--start-expr
  (fn (flow-name activity)
    (let ((activity-src (host/ra--erl-src activity))
          (actor (host/ra--atom (get activity :actor))))
      (str "flow_store:start(" flow-name ", [{activity, " activity-src "}, {actor, " actor "}])"))))
;; Build the Erlang expression text that resumes suspended instance `id`, handing it the effect
;; `result` as an atom (the async re-entry path).
(define host/ra--resume-expr
  (fn (id result)
    (let ((outcome (host/ra--atom result)))
      (str "flow_store:resume(" id ", " outcome ")"))))
;; Map flow_store's result (already er-to-sx-deep'd) onto the seam runner contract.
;;   (ok Id (flow_done V…))      -> {:status "done" :effects (V…) :flow-id Id}
;;   (ok Id (flow_suspended T))  -> {:status "suspended" :resume {:id Id :tag T}}
;;   anything else               -> {:status "failed" :error r}
;; "Anything else" covers a non-list result, a head other than ok, an ok result whose outcome is
;; missing or is not itself a list, and an unrecognised outcome kind.
(define host/ra--parse
  (fn (r)
    (if (or (not (= (type-of r) "list")) (not (= (str (first r)) "ok")))
      {:status "failed" :error r}
      (let ((id (first (rest r))) (outcome (first (rest (rest r)))))
        (if (not (= (type-of outcome) "list"))
          ;; malformed ok result: report failure instead of calling `first` on a non-list outcome.
          {:status "failed" :error r}
          (let ((kind (str (first outcome))))
            (cond
              ((= kind "flow_done")
               {:status "done" :effects (rest outcome) :flow-id id})
              ((= kind "flow_suspended")
               {:status "suspended" :resume {:id id :tag (str (first (rest outcome)))}})
              (else {:status "failed" :error r}))))))))
;; Build the RA runner: a seam dict {:capabilities :run} advertising {effect, branch, each, suspend}.
;; :run takes (dag env), builds the start expression for the DAG's :erl-flow from env's :activity,
;; evaluates it through the injected `erl-eval`, and parses the result into the runner contract.
(define host/ra--make-runner
  (fn (erl-eval)
    {:capabilities (list "effect" "branch" "each" "suspend")
     :run (fn (dag env)
            (let ((src (host/ra--start-expr (get dag :erl-flow) (get env :activity))))
              (host/ra--parse (erl-eval src))))}))
;; Resume suspended instance `id` out-of-band (the async re-entry path): build the resume
;; expression, run it through the injected `erl-eval`, and parse the outcome like a start result.
(define host/ra--resume
  (fn (erl-eval id result)
    (let ((src (host/ra--resume-expr id result)))
      (host/ra--parse (erl-eval src)))))
;; The REAL erl-eval: evaluate Erlang source `src` with erlang-eval-ast and convert the resulting
;; term with er-to-sx-deep. ONLY works where the Erlang runtime + next/flow are loaded; both names
;; are resolved lazily at call time, so defining this in the plain host is harmless.
(define host/ra--real-eval
  (fn (src)
    (let ((term (erlang-eval-ast src)))
      (er-to-sx-deep term))))
;; ── RA-LIVE: drive the persistent durable-execution KERNEL SERVICE over HTTP ──────────
;; next/kernel/host_kernel serves GET /flow/start/<category> → "<instance-id>:<status>" and
;; GET /flow/resume/<id> → "resume:<status>". http-request returns {"status" "headers" "body"};
;; we take the body. So RA-live's runner is the SAME seam contract, executing durably ON A REMOTE
;; PROCESS — a suspend returns the kernel instance id as the :resume handle for a later resume.
;; Issue a GET for `url` through the http-request primitive and return the "body" entry of its
;; response. The empty dict and empty string are presumably the request headers and payload —
;; TODO confirm against the primitive's signature.
(define host/ra--http-get
  (fn (url)
    (let ((response (http-request "GET" url {} "")))
      (get response "body"))))
;; Turn a kernel /flow/start reply of the form "<instance-id>:<status>" into the seam runner contract.
;;   status starts with "suspended" -> {:status "suspended" :resume {:id <instance-id>}}
;;   status starts with "done"      -> {:status "done" :effects ({:verb "digest" :args (<instance-id>)})}
;;   no ":" or any other status     -> {:status "failed" :error body}
;; `body` is coerced with str before the string primitives run, so a non-string body (e.g. a
;; response that carried no "body" entry) is reported as failed instead of being handed to index-of.
(define host/ra--parse-kernel
  (fn (body)
    (let ((text (str body)))
      (let ((i (index-of text ":")))
        (if (< i 0)
          {:status "failed" :error body}
          ;; split at the first ":" — everything before is the id, everything after is the status.
          (let ((id (substr text 0 i))
                (status (substr text (+ i 1) (- (len text) (+ i 1)))))
            (cond
              ((starts-with? status "suspended") {:status "suspended" :resume {:id id}})
              ((starts-with? status "done") {:status "done" :effects (list {:verb "digest" :args (list id)})})
              (else {:status "failed" :error body}))))))))
;; Build a KERNEL runner: a seam runner that dispatches a durable behavior to the persistent kernel
;; at `base` (e.g. "http://sx_kernel:8930"). Advertises {effect, branch, each, suspend}; the
;; suspend capability is what lets select-runner route durable DAGs here.
;; :run takes (dag env) but consults only env: it GETs <base>/flow/start/<category>, where
;; category is read from env's :activity, and parses the reply into the runner contract.
(define host/ra--make-kernel-runner
  (fn (base)
    {:capabilities (list "effect" "branch" "each" "suspend")
     :run (fn (dag env)
            (let ((category (get (get env :activity) :category)))
              (let ((url (str base "/flow/start/" category)))
                (host/ra--parse-kernel (host/ra--http-get url)))))}))
;; Resume suspended kernel instance `id` (the async boundary): GET <base>/flow/resume/<id> and
;; map the reply onto the runner contract.
(define host/ra--kernel-resume
  (fn (base id)
    (let ((url (str base "/flow/resume/" id)))
      (host/ra--parse-kernel-resume (host/ra--http-get url)))))
;; Map a kernel /flow/resume reply onto the seam runner contract.
;;   reply starts with "resume:done" -> {:status "done" :effects ({:verb "digest-sent" :args ()})}
;;   anything else                   -> {:status "failed" :error body}
;; `body` is coerced with str before the prefix test, so a non-string body (e.g. a response that
;; carried no "body" entry) is reported as failed instead of being handed to starts-with?.
;; NOTE(review): every reply other than done maps to failed, which would include a flow that
;; suspends again — confirm that is intended.
(define host/ra--parse-kernel-resume
  (fn (body)
    (if (starts-with? (str body) "resume:done")
      {:status "done" :effects (list {:verb "digest-sent" :args (list)})}
      {:status "failed" :error body})))