From cb0d866002b5fe0d5608c5325a99dd63c502ebbd Mon Sep 17 00:00:00 2001 From: giles Date: Thu, 2 Jul 2026 19:31:33 +0000 Subject: [PATCH] =?UTF-8?q?RA-live:=20durable=20business=20logic=20in=20pr?= =?UTF-8?q?oduction=20=E2=80=94=20host=20drives=20the=20kernel=20service?= =?UTF-8?q?=20(LIVE)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Steps 1+2 of RA-live/TA-live, live-verified end-to-end on blog.rose-ash.com. (1) DEPLOY: docker-compose.dev-sx-host.yml gains an sx_kernel service running next/kernel/serve.sh (the durable-execution kernel), SX_HTTP_HOST=0.0.0.0 so the host container reaches it at http://sx_kernel:8930. (2) HOST AS CLIENT: lib/host/ra.sx gains a KERNEL runner — host/ra--make-kernel-runner drives the kernel over HTTP (http-request, native primitive; returns {status headers body}). It advertises {effect,branch,each,suspend}, so select-runner routes a durable DAG to it. host/blog.sx: the DAG registry + runner fleet are now mutable (register-dag!/add-runner!); emit! records SUSPENSIONS in a durable pending log; /flows shows suspended instances with a resume link (?resume=) driving host/ra--kernel-resume. serve.sh wires it: set kernel-base, add the kernel runner, register the durable 'blog-digest' DAG, declare a DURABLE behavior on article (create→publish SYNC, update→ blog-digest DURABLE), add a 'category' field. LIVE PROOF: editing a published newsletter article → Update → routes to the kernel runner → POST /flow/start/newsletter → kernel SUSPENDS (instance 5, shown pending on /flows) → /flows?resume=5 → host re-drives the kernel → DONE → digest-sent effect + pending cleared. Durable suspend/resume across separate HTTP requests, on a deployed persistent kernel. urgent edits complete immediately (digest). http-request works in the serving context. blog 217/217, full conformance green. Co-Authored-By: Claude Opus 4.8 --- docker-compose.dev-sx-host.yml | 23 ++++++++++ lib/host/blog.sx | 80 ++++++++++++++++++++++++++++------ lib/host/ra.sx | 34 +++++++++++++++ lib/host/serve.sh | 25 +++++++++++ 4 files changed, 149 insertions(+), 13 deletions(-) diff --git a/docker-compose.dev-sx-host.yml b/docker-compose.dev-sx-host.yml index 9aad03ec..c3a47f06 100644 --- a/docker-compose.dev-sx-host.yml +++ b/docker-compose.dev-sx-host.yml @@ -53,6 +53,29 @@ services: - default restart: unless-stopped + # The durable-execution KERNEL (next/kernel/host_kernel.erl) — a persistent next/ service holding + # flow_store across requests (RA-live substrate). The host reaches it at http://sx_kernel:8930 over + # the shared `default` network. SX_HTTP_HOST=0.0.0.0 so the bind is reachable cross-container. + sx_kernel: + image: registry.rose-ash.com:5000/sx_docs:latest + container_name: sx-dev-sx_kernel-1 + entrypoint: ["bash", "/app/next/kernel/serve.sh"] + working_dir: /app + environment: + SX_PROJECT_DIR: /app + SX_SERVER: /app/bin/sx_server + KERNEL_PORT: "8930" + SX_HTTP_HOST: "0.0.0.0" + OCAMLRUNPARAM: "b" + volumes: + - ./spec:/app/spec:ro + - ./lib:/app/lib:ro + - ./next:/app/next:ro + - ./hosts/ocaml/_build/default/bin/sx_server.exe:/app/bin/sx_server:ro + networks: + - default + restart: unless-stopped + networks: externalnet: external: true diff --git a/lib/host/blog.sx b/lib/host/blog.sx index a6be0565..ebb373e3 100644 --- a/lib/host/blog.sx +++ b/lib/host/blog.sx @@ -182,8 +182,12 @@ ;; {effect,branch} composition → exec-runner; a {suspend} DAG → RA once RA-live adds it to the fleet. (define host/blog--dag-registry {"publish" host/blog--publish-dag}) ;; name -> behavior DAG (define host/blog--dag-of (fn (name) (get host/blog--dag-registry name))) -;; the runner fleet, cheapest-first. exec-runner only until RA-live stands up a persistent kernel. +(define host/blog--register-dag! (fn (name dag) (set! host/blog--dag-registry (assoc host/blog--dag-registry name dag)))) +;; the runner fleet, cheapest-first. exec-runner locally; serve.sh appends the KERNEL runner (RA-live) +;; so a {suspend} DAG routes to the durable kernel. host/blog--kernel-base is the kernel URL (serve-set). (define host/blog--runner-fleet (list host/flow--exec-runner)) +(define host/blog--add-runner! (fn (r) (set! host/blog--runner-fleet (concat host/blog--runner-fleet (list r))))) +(define host/blog--kernel-base "") ;; per-type behavior declaration, stored on the type-post (string-keyed → persist-safe). (define host/blog--type-behavior (fn (type) (or (get (host/blog-get type) :behavior) (list)))) (define host/blog--set-type-behavior! @@ -264,9 +268,32 @@ (behavior/make-engine {:triggers host/blog--triggers :runner host/flow--exec-runner :transport host/blog--transport :driver host/blog--driver :ctx-of host/blog--publish-ctx})) -;; P2: EMIT any activity through the seam — it is LOGGED (the event source, via the transport) and -;; matched against the behavior registry (firing any declared behavior). Returns the trace, or nil. -(define host/blog--emit! (fn (a) (if (nil? a) nil (behavior/process host/blog--publish-engine a)))) +;; RA-live: suspended durable instances awaiting resume (the async boundary). Durable, string-keyed. +(define host/blog--pending-log (list)) +(define host/blog--pendinglog-key "pendinglog") +(define host/blog--record-pending! + (fn (a s) + (let ((rec {"id" (get (get s :resume) :id) "slug" (or (get a :slug) "") + "verb" (get a :verb) "category" (or (get a :category) "")})) + (begin + (set! host/blog--pending-log (concat host/blog--pending-log (list rec))) + (persist/backend-kv-put host/blog-store host/blog--pendinglog-key host/blog--pending-log))))) +(define host/blog--drop-pending! + (fn (id) + (begin + (set! host/blog--pending-log (filter (fn (p) (not (= (get p "id") id))) host/blog--pending-log)) + (persist/backend-kv-put host/blog-store host/blog--pendinglog-key host/blog--pending-log)))) +(define host/blog-load-pendinglog! + (fn () + (let ((v (persist/backend-kv-get host/blog-store host/blog--pendinglog-key))) + (when (and v (= (type-of v) "list")) (set! host/blog--pending-log v))))) +;; P2: EMIT any activity through the seam — LOGGED (event source) + matched (fires behaviors). A +;; durable runner that SUSPENDS records its kernel instance in the pending log for a later resume. +(define host/blog--emit! + (fn (a) + (if (nil? a) nil + (let ((tr (behavior/process host/blog--publish-engine a))) + (begin (for-each (fn (s) (host/blog--record-pending! a s)) (get tr :suspended)) tr))))) ;; a slug's content CHANGE → the right verb: draft→published = Create (first publish); published→ ;; published = Update (a subsequent edit). Draft↔draft emits nothing (unobservable). Fire-once on the ;; create transition; an identical re-edit dedups (same verb:cid id). @@ -2786,25 +2813,52 @@ (unquote (cons (quote ul) issue-items)) (p (a :href (unquote (str "/" slug "/edit")) "Back")))))))))))))) -;; ── /flows — P0.3 acceptance surface ───────────────────────────────── -;; What the publish workflows DID: each effect-as-data record the host driver dispatched, produced -;; by running the on-publish DAG through the seam. Publishing a post appends here (live proof the -;; behavior engine fired). Public read. +;; ── RA-live: resume a suspended durable KERNEL instance (the async boundary) ────────── +;; The kernel held the flow suspended across requests; resuming re-drives it to completion, records +;; the effects, and clears it from the pending log. Driven here by a link (a timer/queue would in prod). +(define host/blog--resume-pending! + (fn (id) + (let ((r (host/ra--kernel-resume host/blog--kernel-base id))) + (begin + (when (= (get r :status) "done") + (begin + (for-each (fn (eff) + (set! host/blog--flow-log + (concat host/blog--flow-log (list {"verb" (get eff :verb) "args" (get eff :args)})))) + (or (get r :effects) (list))) + (persist/backend-kv-put host/blog-store host/blog--flowlog-key host/blog--flow-log) + (host/blog--drop-pending! id))) + r)))) +;; ── /flows — the behavior surface: what fired + what's SUSPENDED (RA-live). ?resume= resumes. ─ (define host/blog-flows (fn (req) - (host/blog--resp req 200 - (host/blog--page req "Flows" + (let ((rid (dream-query-param req "resume"))) + (begin + (when (and rid (not (= rid ""))) (host/blog--resume-pending! rid)) + (host/blog--resp req 200 + (host/blog--page req "Flows" (quasiquote (div (h1 "Flows") - (p "Effect-as-data from publish workflows — the seam: on-publish → publish-DAG → effects.") + (p "Effect-as-data from behavior workflows — the seam: activity → DAG → runner → effects.") + (h3 :style "font-size:1em;margin:1em 0 0.3em" "Suspended (durable, on the kernel)") + (unquote + (if (= (len host/blog--pending-log) 0) + (quote (p :style "color:#999;margin:0" (em "None suspended."))) + (cons (quote ul) + (map (fn (p) + (quasiquote (li (unquote (str "instance " (get p "id") " — " (get p "slug") + " (" (get p "category") ") ")) " " + (a :href (unquote (str "/flows?resume=" (get p "id"))) "resume")))) + host/blog--pending-log)))) + (h3 :style "font-size:1em;margin:1em 0 0.3em" "Effects") (unquote (if (= (len host/blog--flow-log) 0) - (quote (p (em "No flows yet — publish a post to fire the on-publish DAG."))) + (quote (p :style "color:#999;margin:0" (em "No effects yet."))) (cons (quote ul) (map (fn (e) (quasiquote (li (strong (unquote (get e "verb"))) " " (unquote (if (> (len (get e "args")) 0) (str (first (get e "args"))) ""))))) - host/blog--flow-log)))))))))) + host/blog--flow-log)))))))))))) ;; ── /activities — P2: the EVENT SOURCE ─────────────────────────────── ;; Every observable state change emitted as a canonical activity (Create/Update on content, diff --git a/lib/host/ra.sx b/lib/host/ra.sx index f249ce76..43f8d060 100644 --- a/lib/host/ra.sx +++ b/lib/host/ra.sx @@ -61,3 +61,37 @@ ;; the REAL erl-eval — ONLY works where the Erlang runtime + next/flow are loaded (refs resolved ;; lazily at call time, so defining it here is harmless in the plain host). (define host/ra--real-eval (fn (src) (er-to-sx-deep (erlang-eval-ast src)))) + +;; ── RA-LIVE: drive the persistent durable-execution KERNEL SERVICE over HTTP ────────── +;; next/kernel/host_kernel serves GET /flow/start/ → ":" and +;; GET /flow/resume/ → "resume:". http-request returns {"status" "headers" "body"}; +;; we take the body. So RA-live's runner is the SAME seam contract, executing durably ON A REMOTE +;; PROCESS — a suspend returns the kernel instance id as the :resume handle for a later resume. +(define host/ra--http-get (fn (url) (get (http-request "GET" url {} "") "body"))) +(define host/ra--parse-kernel + (fn (body) + (let ((i (index-of body ":"))) + (if (< i 0) {:status "failed" :error body} + (let ((id (substr body 0 i)) (status (substr body (+ i 1) (- (len body) (+ i 1))))) + (cond + ((starts-with? status "suspended") {:status "suspended" :resume {:id id}}) + ((starts-with? status "done") {:status "done" :effects (list {:verb "digest" :args (list id)})}) + (else {:status "failed" :error body}))))))) +;; a KERNEL runner — a seam runner that dispatches a durable behavior to the persistent kernel at +;; `base` (e.g. "http://sx_kernel:8930"), branching on the activity's category. Advertises {suspend}, +;; so select-runner routes durable DAGs here. +(define host/ra--make-kernel-runner + (fn (base) + {:capabilities (list "effect" "branch" "each" "suspend") + :run (fn (dag env) + (host/ra--parse-kernel + (host/ra--http-get (str base "/flow/start/" (get (get env :activity) :category)))))})) +;; resume a suspended kernel instance (the async boundary) → the runner contract. +(define host/ra--kernel-resume + (fn (base id) + (host/ra--parse-kernel-resume (host/ra--http-get (str base "/flow/resume/" id))))) +(define host/ra--parse-kernel-resume + (fn (body) + (if (starts-with? body "resume:done") + {:status "done" :effects (list {:verb "digest-sent" :args (list)})} + {:status "failed" :error body}))) diff --git a/lib/host/serve.sh b/lib/host/serve.sh index 0b566198..5235d94e 100755 --- a/lib/host/serve.sh +++ b/lib/host/serve.sh @@ -190,11 +190,36 @@ EPOCH=1 echo "(epoch $EPOCH)" echo "(eval \"(host/blog-seed-types!)\")" EPOCH=$((EPOCH+1)) + # RA-LIVE: point at the durable-execution KERNEL, add it to the runner fleet, register the durable + # "blog-digest" DAG (needs {suspend} → routes to the kernel), and declare a DURABLE behavior on the + # article: an UPDATE (a published edit) runs the kernel flow (newsletter suspends until resumed, + # urgent completes now). create→publish stays SYNCHRONOUS (exec-fold). Then reload the registry + + # the pending log. host/blog--kernel-base is a mutable define set here. + echo "(epoch $EPOCH)" + echo "(eval \"(set! host/blog--kernel-base \\\"http://sx_kernel:8930\\\")\")" + EPOCH=$((EPOCH+1)) + echo "(epoch $EPOCH)" + echo "(eval \"(host/blog--add-runner! (host/ra--make-kernel-runner host/blog--kernel-base))\")" + EPOCH=$((EPOCH+1)) + echo "(epoch $EPOCH)" + echo "(eval \"(host/blog--register-dag! \\\"blog-digest\\\" {:erl-flow \\\"blog_digest\\\" :needs (list \\\"effect\\\" \\\"branch\\\" \\\"suspend\\\")})\")" + EPOCH=$((EPOCH+1)) + echo "(epoch $EPOCH)" + echo "(eval \"(host/blog--set-type-behavior! \\\"article\\\" (list {\\\"verb\\\" \\\"create\\\" \\\"type\\\" \\\"article\\\" \\\"dag\\\" \\\"publish\\\"} {\\\"verb\\\" \\\"update\\\" \\\"type\\\" \\\"article\\\" \\\"dag\\\" \\\"blog-digest\\\"}))\")" + EPOCH=$((EPOCH+1)) + # Give the article type a "category" field so the edit form can set newsletter/urgent — the + # branch the durable kernel flow keys on (newsletter suspends until resumed). + echo "(epoch $EPOCH)" + echo "(eval \"(host/blog--set-fields! \\\"article\\\" (list {:name \\\"category\\\" :type \\\"text\\\"}))\")" + EPOCH=$((EPOCH+1)) # P1: gather the types' declared :behavior bindings into the registry the trigger match # consults (so publishing an article fires its declared on-publish DAG, runner derived). echo "(epoch $EPOCH)" echo "(eval \"(host/blog--load-behaviors!)\")" EPOCH=$((EPOCH+1)) + echo "(epoch $EPOCH)" + echo "(eval \"(host/blog-load-pendinglog!)\")" + EPOCH=$((EPOCH+1)) # Seed a live demo of the composition fold (plans/composition-objects.md): /compose-demo # is one composition object rendered by host/comp-render — renders differently by context. echo "(epoch $EPOCH)"