RA-live: durable business logic in production — host drives the kernel service (LIVE)
Steps 1+2 of RA-live/TA-live, live-verified end-to-end on blog.rose-ash.com. (1) DEPLOY: docker-compose.dev-sx-host.yml gains an sx_kernel service running next/kernel/serve.sh (the durable-execution kernel), SX_HTTP_HOST=0.0.0.0 so the host container reaches it at http://sx_kernel:8930. (2) HOST AS CLIENT: lib/host/ra.sx gains a KERNEL runner — host/ra--make-kernel-runner drives the kernel over HTTP (http-request, native primitive; returns {status headers body}). It advertises {effect,branch,each,suspend}, so select-runner routes a durable DAG to it. host/blog.sx: the DAG registry + runner fleet are now mutable (register-dag!/add-runner!); emit! records SUSPENSIONS in a durable pending log; /flows shows suspended instances with a resume link (?resume=<id>) driving host/ra--kernel-resume. serve.sh wires it: set kernel-base, add the kernel runner, register the durable 'blog-digest' DAG, declare a DURABLE behavior on article (create→publish SYNC, update→ blog-digest DURABLE), add a 'category' field. LIVE PROOF: editing a published newsletter article → Update → routes to the kernel runner → POST /flow/start/newsletter → kernel SUSPENDS (instance 5, shown pending on /flows) → /flows?resume=5 → host re-drives the kernel → DONE → digest-sent effect + pending cleared. Durable suspend/resume across separate HTTP requests, on a deployed persistent kernel. urgent edits complete immediately (digest). http-request works in the serving context. blog 217/217, full conformance green. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -182,8 +182,12 @@
|
||||
;; {effect,branch} composition → exec-runner; a {suspend} DAG → RA once RA-live adds it to the fleet.
|
||||
(define host/blog--dag-registry {"publish" host/blog--publish-dag}) ;; name -> behavior DAG
|
||||
(define host/blog--dag-of (fn (name) (get host/blog--dag-registry name)))
|
||||
;; the runner fleet, cheapest-first. exec-runner only until RA-live stands up a persistent kernel.
|
||||
(define host/blog--register-dag! (fn (name dag) (set! host/blog--dag-registry (assoc host/blog--dag-registry name dag))))
|
||||
;; the runner fleet, cheapest-first. exec-runner locally; serve.sh appends the KERNEL runner (RA-live)
|
||||
;; so a {suspend} DAG routes to the durable kernel. host/blog--kernel-base is the kernel URL (serve-set).
|
||||
(define host/blog--runner-fleet (list host/flow--exec-runner))
|
||||
(define host/blog--add-runner! (fn (r) (set! host/blog--runner-fleet (concat host/blog--runner-fleet (list r)))))
|
||||
(define host/blog--kernel-base "")
|
||||
;; per-type behavior declaration, stored on the type-post (string-keyed → persist-safe).
|
||||
(define host/blog--type-behavior (fn (type) (or (get (host/blog-get type) :behavior) (list))))
|
||||
(define host/blog--set-type-behavior!
|
||||
@@ -264,9 +268,32 @@
|
||||
(behavior/make-engine {:triggers host/blog--triggers :runner host/flow--exec-runner
|
||||
:transport host/blog--transport :driver host/blog--driver
|
||||
:ctx-of host/blog--publish-ctx}))
|
||||
;; P2: EMIT any activity through the seam — it is LOGGED (the event source, via the transport) and
|
||||
;; matched against the behavior registry (firing any declared behavior). Returns the trace, or nil.
|
||||
(define host/blog--emit! (fn (a) (if (nil? a) nil (behavior/process host/blog--publish-engine a))))
|
||||
;; RA-live: suspended durable instances awaiting resume (the async boundary). Durable, string-keyed.
|
||||
(define host/blog--pending-log (list))
|
||||
(define host/blog--pendinglog-key "pendinglog")
|
||||
(define host/blog--record-pending!
|
||||
(fn (a s)
|
||||
(let ((rec {"id" (get (get s :resume) :id) "slug" (or (get a :slug) "")
|
||||
"verb" (get a :verb) "category" (or (get a :category) "")}))
|
||||
(begin
|
||||
(set! host/blog--pending-log (concat host/blog--pending-log (list rec)))
|
||||
(persist/backend-kv-put host/blog-store host/blog--pendinglog-key host/blog--pending-log)))))
|
||||
(define host/blog--drop-pending!
|
||||
(fn (id)
|
||||
(begin
|
||||
(set! host/blog--pending-log (filter (fn (p) (not (= (get p "id") id))) host/blog--pending-log))
|
||||
(persist/backend-kv-put host/blog-store host/blog--pendinglog-key host/blog--pending-log))))
|
||||
(define host/blog-load-pendinglog!
|
||||
(fn ()
|
||||
(let ((v (persist/backend-kv-get host/blog-store host/blog--pendinglog-key)))
|
||||
(when (and v (= (type-of v) "list")) (set! host/blog--pending-log v)))))
|
||||
;; P2: EMIT any activity through the seam — LOGGED (event source) + matched (fires behaviors). A
|
||||
;; durable runner that SUSPENDS records its kernel instance in the pending log for a later resume.
|
||||
(define host/blog--emit!
|
||||
(fn (a)
|
||||
(if (nil? a) nil
|
||||
(let ((tr (behavior/process host/blog--publish-engine a)))
|
||||
(begin (for-each (fn (s) (host/blog--record-pending! a s)) (get tr :suspended)) tr)))))
|
||||
;; a slug's content CHANGE → the right verb: draft→published = Create (first publish); published→
|
||||
;; published = Update (a subsequent edit). Draft↔draft emits nothing (unobservable). Fire-once on the
|
||||
;; create transition; an identical re-edit dedups (same verb:cid id).
|
||||
@@ -2786,25 +2813,52 @@
|
||||
(unquote (cons (quote ul) issue-items))
|
||||
(p (a :href (unquote (str "/" slug "/edit")) "Back"))))))))))))))
|
||||
|
||||
;; ── /flows — P0.3 acceptance surface ─────────────────────────────────
|
||||
;; What the publish workflows DID: each effect-as-data record the host driver dispatched, produced
|
||||
;; by running the on-publish DAG through the seam. Publishing a post appends here (live proof the
|
||||
;; behavior engine fired). Public read.
|
||||
;; ── RA-live: resume a suspended durable KERNEL instance (the async boundary) ──────────
|
||||
;; The kernel held the flow suspended across requests; resuming re-drives it to completion, records
|
||||
;; the effects, and clears it from the pending log. Driven here by a link (a timer/queue would in prod).
|
||||
(define host/blog--resume-pending!
|
||||
(fn (id)
|
||||
(let ((r (host/ra--kernel-resume host/blog--kernel-base id)))
|
||||
(begin
|
||||
(when (= (get r :status) "done")
|
||||
(begin
|
||||
(for-each (fn (eff)
|
||||
(set! host/blog--flow-log
|
||||
(concat host/blog--flow-log (list {"verb" (get eff :verb) "args" (get eff :args)}))))
|
||||
(or (get r :effects) (list)))
|
||||
(persist/backend-kv-put host/blog-store host/blog--flowlog-key host/blog--flow-log)
|
||||
(host/blog--drop-pending! id)))
|
||||
r))))
|
||||
;; ── /flows — the behavior surface: what fired + what's SUSPENDED (RA-live). ?resume=<id> resumes. ─
|
||||
(define host/blog-flows
|
||||
(fn (req)
|
||||
(host/blog--resp req 200
|
||||
(host/blog--page req "Flows"
|
||||
(let ((rid (dream-query-param req "resume")))
|
||||
(begin
|
||||
(when (and rid (not (= rid ""))) (host/blog--resume-pending! rid))
|
||||
(host/blog--resp req 200
|
||||
(host/blog--page req "Flows"
|
||||
(quasiquote
|
||||
(div (h1 "Flows")
|
||||
(p "Effect-as-data from publish workflows — the seam: on-publish → publish-DAG → effects.")
|
||||
(p "Effect-as-data from behavior workflows — the seam: activity → DAG → runner → effects.")
|
||||
(h3 :style "font-size:1em;margin:1em 0 0.3em" "Suspended (durable, on the kernel)")
|
||||
(unquote
|
||||
(if (= (len host/blog--pending-log) 0)
|
||||
(quote (p :style "color:#999;margin:0" (em "None suspended.")))
|
||||
(cons (quote ul)
|
||||
(map (fn (p)
|
||||
(quasiquote (li (unquote (str "instance " (get p "id") " — " (get p "slug")
|
||||
" (" (get p "category") ") ")) " "
|
||||
(a :href (unquote (str "/flows?resume=" (get p "id"))) "resume"))))
|
||||
host/blog--pending-log))))
|
||||
(h3 :style "font-size:1em;margin:1em 0 0.3em" "Effects")
|
||||
(unquote
|
||||
(if (= (len host/blog--flow-log) 0)
|
||||
(quote (p (em "No flows yet — publish a post to fire the on-publish DAG.")))
|
||||
(quote (p :style "color:#999;margin:0" (em "No effects yet.")))
|
||||
(cons (quote ul)
|
||||
(map (fn (e)
|
||||
(quasiquote (li (strong (unquote (get e "verb"))) " "
|
||||
(unquote (if (> (len (get e "args")) 0) (str (first (get e "args"))) "")))))
|
||||
host/blog--flow-log))))))))))
|
||||
host/blog--flow-log))))))))))))
|
||||
|
||||
;; ── /activities — P2: the EVENT SOURCE ───────────────────────────────
|
||||
;; Every observable state change emitted as a canonical activity (Create/Update on content,
|
||||
|
||||
@@ -61,3 +61,37 @@
|
||||
;; the REAL erl-eval — ONLY works where the Erlang runtime + next/flow are loaded (refs resolved
|
||||
;; lazily at call time, so defining it here is harmless in the plain host).
|
||||
(define host/ra--real-eval (fn (src) (er-to-sx-deep (erlang-eval-ast src))))
|
||||
|
||||
;; ── RA-LIVE: drive the persistent durable-execution KERNEL SERVICE over HTTP ──────────
|
||||
;; next/kernel/host_kernel serves GET /flow/start/<category> → "<instance-id>:<status>" and
|
||||
;; GET /flow/resume/<id> → "resume:<status>". http-request returns {"status" "headers" "body"};
|
||||
;; we take the body. So RA-live's runner is the SAME seam contract, executing durably ON A REMOTE
|
||||
;; PROCESS — a suspend returns the kernel instance id as the :resume handle for a later resume.
|
||||
(define host/ra--http-get (fn (url) (get (http-request "GET" url {} "") "body")))
|
||||
(define host/ra--parse-kernel
|
||||
(fn (body)
|
||||
(let ((i (index-of body ":")))
|
||||
(if (< i 0) {:status "failed" :error body}
|
||||
(let ((id (substr body 0 i)) (status (substr body (+ i 1) (- (len body) (+ i 1)))))
|
||||
(cond
|
||||
((starts-with? status "suspended") {:status "suspended" :resume {:id id}})
|
||||
((starts-with? status "done") {:status "done" :effects (list {:verb "digest" :args (list id)})})
|
||||
(else {:status "failed" :error body})))))))
|
||||
;; a KERNEL runner — a seam runner that dispatches a durable behavior to the persistent kernel at
|
||||
;; `base` (e.g. "http://sx_kernel:8930"), branching on the activity's category. Advertises {suspend},
|
||||
;; so select-runner routes durable DAGs here.
|
||||
(define host/ra--make-kernel-runner
|
||||
(fn (base)
|
||||
{:capabilities (list "effect" "branch" "each" "suspend")
|
||||
:run (fn (dag env)
|
||||
(host/ra--parse-kernel
|
||||
(host/ra--http-get (str base "/flow/start/" (get (get env :activity) :category)))))}))
|
||||
;; resume a suspended kernel instance (the async boundary) → the runner contract.
|
||||
(define host/ra--kernel-resume
|
||||
(fn (base id)
|
||||
(host/ra--parse-kernel-resume (host/ra--http-get (str base "/flow/resume/" id)))))
|
||||
(define host/ra--parse-kernel-resume
|
||||
(fn (body)
|
||||
(if (starts-with? body "resume:done")
|
||||
{:status "done" :effects (list {:verb "digest-sent" :args (list)})}
|
||||
{:status "failed" :error body})))
|
||||
|
||||
@@ -190,11 +190,36 @@ EPOCH=1
|
||||
echo "(epoch $EPOCH)"
|
||||
echo "(eval \"(host/blog-seed-types!)\")"
|
||||
EPOCH=$((EPOCH+1))
|
||||
# RA-LIVE: point at the durable-execution KERNEL, add it to the runner fleet, register the durable
|
||||
# "blog-digest" DAG (needs {suspend} → routes to the kernel), and declare a DURABLE behavior on the
|
||||
# article: an UPDATE (a published edit) runs the kernel flow (newsletter suspends until resumed,
|
||||
# urgent completes now). create→publish stays SYNCHRONOUS (exec-fold). Then reload the registry +
|
||||
# the pending log. host/blog--kernel-base is a mutable define set here.
|
||||
echo "(epoch $EPOCH)"
|
||||
echo "(eval \"(set! host/blog--kernel-base \\\"http://sx_kernel:8930\\\")\")"
|
||||
EPOCH=$((EPOCH+1))
|
||||
echo "(epoch $EPOCH)"
|
||||
echo "(eval \"(host/blog--add-runner! (host/ra--make-kernel-runner host/blog--kernel-base))\")"
|
||||
EPOCH=$((EPOCH+1))
|
||||
echo "(epoch $EPOCH)"
|
||||
echo "(eval \"(host/blog--register-dag! \\\"blog-digest\\\" {:erl-flow \\\"blog_digest\\\" :needs (list \\\"effect\\\" \\\"branch\\\" \\\"suspend\\\")})\")"
|
||||
EPOCH=$((EPOCH+1))
|
||||
echo "(epoch $EPOCH)"
|
||||
echo "(eval \"(host/blog--set-type-behavior! \\\"article\\\" (list {\\\"verb\\\" \\\"create\\\" \\\"type\\\" \\\"article\\\" \\\"dag\\\" \\\"publish\\\"} {\\\"verb\\\" \\\"update\\\" \\\"type\\\" \\\"article\\\" \\\"dag\\\" \\\"blog-digest\\\"}))\")"
|
||||
EPOCH=$((EPOCH+1))
|
||||
# Give the article type a "category" field so the edit form can set newsletter/urgent — the
|
||||
# branch the durable kernel flow keys on (newsletter suspends until resumed).
|
||||
echo "(epoch $EPOCH)"
|
||||
echo "(eval \"(host/blog--set-fields! \\\"article\\\" (list {:name \\\"category\\\" :type \\\"text\\\"}))\")"
|
||||
EPOCH=$((EPOCH+1))
|
||||
# P1: gather the types' declared :behavior bindings into the registry the trigger match
|
||||
# consults (so publishing an article fires its declared on-publish DAG, runner derived).
|
||||
echo "(epoch $EPOCH)"
|
||||
echo "(eval \"(host/blog--load-behaviors!)\")"
|
||||
EPOCH=$((EPOCH+1))
|
||||
echo "(epoch $EPOCH)"
|
||||
echo "(eval \"(host/blog-load-pendinglog!)\")"
|
||||
EPOCH=$((EPOCH+1))
|
||||
# Seed a live demo of the composition fold (plans/composition-objects.md): /compose-demo
|
||||
# is one composition object rendered by host/comp-render — renders differently by context.
|
||||
echo "(epoch $EPOCH)"
|
||||
|
||||
Reference in New Issue
Block a user