Steps 1+2 of RA-live/TA-live, live-verified end-to-end on blog.rose-ash.com. (1) DEPLOY: docker-compose.dev-sx-host.yml gains an sx_kernel service running next/kernel/serve.sh (the durable-execution kernel), SX_HTTP_HOST=0.0.0.0 so the host container reaches it at http://sx_kernel:8930. (2) HOST AS CLIENT: lib/host/ra.sx gains a KERNEL runner — host/ra--make-kernel-runner drives the kernel over HTTP (http-request, native primitive; returns {status headers body}). It advertises {effect,branch,each,suspend}, so select-runner routes a durable DAG to it. host/blog.sx: the DAG registry + runner fleet are now mutable (register-dag!/add-runner!); emit! records SUSPENSIONS in a durable pending log; /flows shows suspended instances with a resume link (?resume=<id>) driving host/ra--kernel-resume. serve.sh wires it: set kernel-base, add the kernel runner, register the durable 'blog-digest' DAG, declare a DURABLE behavior on article (create→publish SYNC, update→ blog-digest DURABLE), add a 'category' field. LIVE PROOF: editing a published newsletter article → Update → routes to the kernel runner → POST /flow/start/newsletter → kernel SUSPENDS (instance 5, shown pending on /flows) → /flows?resume=5 → host re-drives the kernel → DONE → digest-sent effect + pending cleared. Durable suspend/resume across separate HTTP requests, on a deployed persistent kernel. urgent edits complete immediately (digest). http-request works in the serving context. blog 217/217, full conformance green. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
98 lines
5.7 KiB
Plaintext
98 lines
5.7 KiB
Plaintext
;; lib/host/ra.sx — RA: the ERLANG (durable) RUNNER ADAPTER (plans/business-logic-fed-flows.md).
|
|
;; A seam runner (behavior.sx) that runs a behavior as a DURABLE next/flow (flow_store), so a flow
|
|
;; can SUSPEND (wait-until-morning) and RESUME — the capability {suspend} the execute-fold runner
|
|
;; lacks. Spike-proven (plans/ra-spike.sh, 4/4): our canonical activity marshals to Erlang, drives
|
|
;; blog_publish_digest, and completes a suspend→resume cycle with no er-scheduler deadlock.
|
|
;;
|
|
;; PURE SX + INJECTED erl-eval. host/ra--make-runner takes an `erl-eval` fn (src -> clean SX value).
|
|
;; The REAL one (host/ra--real-eval) wraps (er-to-sx-deep (erlang-eval-ast src)) and only works in a
|
|
;; process with the Erlang runtime + next/flow loaded; unit tests inject a MOCK. So this module loads
|
|
;; in the plain host (the Erlang refs sit inside a lambda, resolved lazily at call time) and is
|
|
;; testable without the Erlang substrate. A durable behavior DAG is {:erl-flow <name> :needs <caps>}.
|
|
|
|
;; Render `s` as a single-quoted Erlang atom literal, e.g. create -> 'create'.
;; `s` is coerced with `str`. Backslashes and single quotes inside it are backslash-escaped
;; (backslashes FIRST, so the escapes added for quotes are not themselves re-escaped); without
;; this a value containing ' would close the atom early and splice text into the generated
;; Erlang source. Values without those two characters render exactly as before.
(define host/ra--atom
  (fn (s)
    (str "'" (replace (replace (str s) "\\" "\\\\") "'" "\\'") "'")))
|
|
;; Render `s` as an Erlang binary literal <<"...">> (no /utf8 suffix — unsupported).
;; `s` is coerced with `str`. Backslashes and double quotes inside it are backslash-escaped
;; (backslashes FIRST) so a value containing " cannot terminate the string literal early and
;; inject text into the generated Erlang source. Values without those characters are unchanged.
(define host/ra--bin
  (fn (s)
    (str "<<\"" (replace (replace (str s) "\\" "\\\\") "\"" "\\\"") "\">>")))
|
|
|
|
;; Marshal our canonical activity dict `a` into the SOURCE TEXT of the Erlang activity proplist
;; the flows consume. Reads :verb, :actor, :id, :object-type and :category from `a`; :id is
;; rendered as a binary, every other field as a quoted atom.
(define host/ra--erl-src
  (fn (a)
    (let ((verb     (host/ra--atom (get a :verb)))
          (actor    (host/ra--atom (get a :actor)))
          (id       (host/ra--bin (get a :id)))
          (obj-type (host/ra--atom (get a :object-type)))
          (category (host/ra--atom (get a :category))))
      (str "[{type, " verb "}, {actor, " actor "}, {id, " id "}, "
           "{object, [{type, " obj-type "}, {category, " category "}]}]"))))
|
|
|
|
;; Build the Erlang expression (as source text) that starts the durable flow `flow-name`.
;; The flow env is a proplist holding the marshalled activity plus the actor as a quoted atom.
;; `flow-name` is spliced in verbatim (not quoted here).
(define host/ra--start-expr
  (fn (flow-name activity)
    (let ((env-activity (host/ra--erl-src activity))
          (env-actor    (host/ra--atom (get activity :actor))))
      (str "flow_store:start(" flow-name
           ", [{activity, " env-activity "}, {actor, " env-actor "}])"))))
|
|
;; Build the Erlang expression (as source text) that resumes suspended instance `id`, handing it
;; `result` as a quoted atom (the async re-entry path). `id` is spliced in verbatim.
(define host/ra--resume-expr
  (fn (id result)
    (let ((outcome (host/ra--atom result)))
      (str "flow_store:resume(" id ", " outcome ")"))))
|
|
|
|
;; Translate flow_store's (er-to-sx-deep'd) result into the seam runner contract:
;;   (ok Id (flow_done V…))      -> {:status "done" :effects (V…) :flow-id Id}
;;   (ok Id (flow_suspended T))  -> {:status "suspended" :resume {:id Id :tag T}}
;;   anything else               -> {:status "failed" :error <the raw result>}
;; Heads are compared via `str`, so the tags are matched by their printed names.
(define host/ra--parse
  (fn (r)
    (if (and (= (type-of r) "list") (= (str (first r)) "ok"))
      (let ((payload (rest r)))
        (let ((flow-id (first payload))
              (outcome (first (rest payload))))
          (let ((tag (str (first outcome))))
            (cond
              ((= tag "flow_suspended")
               {:status "suspended" :resume {:id flow-id :tag (str (first (rest outcome)))}})
              ((= tag "flow_done")
               {:status "done" :effects (rest outcome) :flow-id flow-id})
              (else {:status "failed" :error r})))))
      {:status "failed" :error r})))
|
|
|
|
;; Build the RA runner: a seam dict {:capabilities :run} advertising
;; {effect, branch, each, suspend}. Its :run takes (dag env), marshals env's :activity into a
;; start expression for the flow named by dag's :erl-flow, evaluates it through the injected
;; `erl-eval`, and parses the result into the runner contract.
(define host/ra--make-runner
  (fn (erl-eval)
    (let ((run-durable
            (fn (dag env)
              (let ((src (host/ra--start-expr (get dag :erl-flow) (get env :activity))))
                (host/ra--parse (erl-eval src))))))
      {:capabilities (list "effect" "branch" "each" "suspend")
       :run run-durable})))
|
|
|
|
;; Out-of-band resume (the async re-entry path): build the resume expression for instance `id`
;; with `result`, evaluate it through the injected `erl-eval`, and parse the outcome into the
;; runner contract.
(define host/ra--resume
  (fn (erl-eval id result)
    (let ((raw (erl-eval (host/ra--resume-expr id result))))
      (host/ra--parse raw))))
|
|
|
|
;; The REAL erl-eval: feeds `src` to erlang-eval-ast and converts the result with er-to-sx-deep.
;; Both names are referenced only inside the lambda body, so they are looked up when the function
;; is CALLED — defining it in the plain host (no Erlang runtime + next/flow loaded) is harmless;
;; calling it there is not.
(define host/ra--real-eval
  (fn (src)
    (let ((evaluated (erlang-eval-ast src)))
      (er-to-sx-deep evaluated))))
|
|
|
|
;; ── RA-LIVE: drive the persistent durable-execution KERNEL SERVICE over HTTP ──────────
|
|
;; next/kernel/host_kernel serves GET /flow/start/<category> → "<instance-id>:<status>" and
|
|
;; GET /flow/resume/<id> → "resume:<status>". http-request returns {"status" "headers" "body"};
|
|
;; we take the body. So RA-live's runner is the SAME seam contract, executing durably ON A REMOTE
|
|
;; PROCESS — a suspend returns the kernel instance id as the :resume handle for a later resume.
|
|
;; Issue a GET to `url` (empty header dict, empty request body) and return only the "body" entry
;; of the response; the response's other entries are not inspected here.
(define host/ra--http-get
  (fn (url)
    (let ((response (http-request "GET" url {} "")))
      (get response "body"))))
|
|
;; Parse the kernel's start reply "<instance-id>:<status>" into the seam runner contract.
;;   "<id>:suspended…" -> {:status "suspended" :resume {:id <id>}}
;;   "<id>:done…"      -> {:status "done" :effects ({:verb "digest" :args (<id>)})}
;;   anything else     -> {:status "failed" :error <body>}
;; The id is the text before the FIRST ":"; the status is matched by prefix, so trailing text
;; (e.g. a newline) after the status word is tolerated.
;; Robustness: a non-string body (e.g. the response carried no "body" entry) and a reply with an
;; empty id (":" at position 0, which would produce an unusable resume handle) both map to
;; "failed" instead of reaching the string primitives / yielding a blank id.
(define host/ra--parse-kernel
  (fn (body)
    (if (not (= (type-of body) "string"))
      {:status "failed" :error body}
      (let ((i (index-of body ":")))
        ;; i < 0: no separator at all; i = 0: separator present but the id is empty.
        (if (< i 1)
          {:status "failed" :error body}
          (let ((id (substr body 0 i))
                (status (substr body (+ i 1) (- (len body) (+ i 1)))))
            (cond
              ((starts-with? status "suspended") {:status "suspended" :resume {:id id}})
              ((starts-with? status "done") {:status "done" :effects (list {:verb "digest" :args (list id)})})
              (else {:status "failed" :error body}))))))))
|
|
;; Build a KERNEL runner: a seam runner that hands a durable behavior to the persistent kernel
;; service at `base` (e.g. "http://sx_kernel:8930"). Advertises {effect, branch, each, suspend};
;; the {suspend} capability is what lets select-runner route durable DAGs here.
;; :run takes (dag env) but only reads env: it GETs <base>/flow/start/<category>, where the
;; category comes from env's :activity, and parses the reply into the runner contract.
(define host/ra--make-kernel-runner
  (fn (base)
    (let ((start-on-kernel
            (fn (dag env)
              (let ((category (get (get env :activity) :category)))
                (let ((url (str base "/flow/start/" category)))
                  (host/ra--parse-kernel (host/ra--http-get url)))))))
      {:capabilities (list "effect" "branch" "each" "suspend")
       :run start-on-kernel})))
|
|
;; Resume suspended kernel instance `id` (the async boundary): GET <base>/flow/resume/<id> and
;; map the reply onto the runner contract via host/ra--parse-kernel-resume.
(define host/ra--kernel-resume
  (fn (base id)
    (let ((url (str base "/flow/resume/" id)))
      (host/ra--parse-kernel-resume (host/ra--http-get url)))))
|
|
;; Parse the kernel's resume reply into the seam runner contract.
;;   body starting with "resume:done" -> {:status "done" :effects ({:verb "digest-sent" :args ()})}
;;   anything else                    -> {:status "failed" :error <body>}
;; Robustness: a non-string body (e.g. the response carried no "body" entry) is reported as
;; "failed" rather than being passed to starts-with?.
(define host/ra--parse-kernel-resume
  (fn (body)
    (if (and (= (type-of body) "string") (starts-with? body "resume:done"))
      {:status "done" :effects (list {:verb "digest-sent" :args (list)})}
      {:status "failed" :error body})))
|