diff --git a/lib/host/conformance.sh b/lib/host/conformance.sh index 0aabeba9..e7efc1d2 100755 --- a/lib/host/conformance.sh +++ b/lib/host/conformance.sh @@ -96,6 +96,7 @@ MODULES=( "lib/host/execute.sx" "lib/host/behavior.sx" "lib/host/flows.sx" + "lib/host/ra.sx" "lib/host/htmlsx.sx" "lib/host/blog.sx" "lib/host/page.sx" @@ -116,6 +117,7 @@ SUITES=( "htmlsx host-ht-tests-run! lib/host/tests/htmlsx.sx" "behavior host-be-tests-run! lib/host/tests/behavior.sx" "flows host-fl-tests-run! lib/host/tests/flows.sx" + "ra host-ra-tests-run! lib/host/tests/ra.sx" "compose host-cp-tests-run! lib/host/tests/compose.sx" "execute host-ex-tests-run! lib/host/tests/execute.sx" "session host-se-tests-run! lib/host/tests/session.sx" diff --git a/lib/host/flows.sx b/lib/host/flows.sx index 7c2a36f0..e7c5b311 100644 --- a/lib/host/flows.sx +++ b/lib/host/flows.sx @@ -21,16 +21,31 @@ (else nil)))) (define host/flow--uniq-concat (fn (a b) (reduce (fn (acc x) (if (contains? acc x) acc (concat acc (list x)))) a b))) -;; the capability SET a composition requires — the union of its nodes' caps (walked recursively). +;; the capability SET a DAG requires. An SX composition: the union of its nodes' caps (walked). A +;; durable/opaque DAG (e.g. {:erl-flow :needs (…)}) DECLARES its caps via :needs — the runner +;; can't introspect a foreign flow, so it states what it needs (e.g. {suspend} for a wait). (define host/flow--required-caps (fn (node) - (if (not (= (type-of node) "list")) (list) - (let ((self (host/flow--node-cap (str (first node)))) - (kids (reduce (fn (acc c) (host/flow--uniq-concat acc (host/flow--required-caps c))) - (list) (rest node)))) - (if (nil? self) kids (host/flow--uniq-concat (list self) kids)))))) + (cond + ((and (= (type-of node) "dict") (get node :needs)) (get node :needs)) + ((not (= (type-of node) "list")) (list)) + (else + (let ((self (host/flow--node-cap (str (first node)))) + (kids (reduce (fn (acc c) (host/flow--uniq-concat acc (host/flow--required-caps c))) + (list) (rest node)))) + (if (nil? self) kids (host/flow--uniq-concat (list self) kids))))))) (define host/flow--subset? (fn (a b) (reduce (fn (ok x) (and ok (contains? b x))) true a))) +;; DERIVE the runner from a fleet: the FIRST runner (in ladder order — cheapest first) whose +;; advertised capabilities cover the DAG's required set. nil if none fits (a hard bind error). +;; This is the dual-runner routing: an {effect,branch}-only DAG lands on the exec-runner; a DAG +;; needing {suspend} skips past it to RA. Same DAGs, runner chosen by need — no human hint. +(define host/flow--select-runner + (fn (runners dag) + (let ((need (host/flow--required-caps dag))) + (reduce (fn (acc r) (if (and (nil? acc) (host/flow--subset? need (get r :capabilities))) r acc)) + nil runners)))) + ;; ── the SYNCHRONOUS op-table runner = the execute-fold ──────────────── ;; a seam runner {:capabilities :run}. It ADVERTISES {effect, branch, each} — the execute-fold ;; vocabulary. run: fold the composition (dag) against the env's :ctx → the effect log (as data). diff --git a/lib/host/ra.sx b/lib/host/ra.sx new file mode 100644 index 00000000..f249ce76 --- /dev/null +++ b/lib/host/ra.sx @@ -0,0 +1,63 @@ +;; lib/host/ra.sx — RA: the ERLANG (durable) RUNNER ADAPTER (plans/business-logic-fed-flows.md). +;; A seam runner (behavior.sx) that runs a behavior as a DURABLE next/flow (flow_store), so a flow +;; can SUSPEND (wait-until-morning) and RESUME — the capability {suspend} the execute-fold runner +;; lacks. Spike-proven (plans/ra-spike.sh, 4/4): our canonical activity marshals to Erlang, drives +;; blog_publish_digest, and completes a suspend→resume cycle with no er-scheduler deadlock. +;; +;; PURE SX + INJECTED erl-eval. host/ra--make-runner takes an `erl-eval` fn (src -> clean SX value). +;; The REAL one (host/ra--real-eval) wraps (er-to-sx-deep (erlang-eval-ast src)) and only works in a +;; process with the Erlang runtime + next/flow loaded; unit tests inject a MOCK. So this module loads +;; in the plain host (the Erlang refs sit inside a lambda, resolved lazily at call time) and is +;; testable without the Erlang substrate. A durable behavior DAG is {:erl-flow :needs }. + +(define host/ra--atom (fn (s) (str "'" s "'"))) ;; single-quoted Erlang atom (special-char safe) +(define host/ra--bin (fn (s) (str "<<\"" s "\">>"))) ;; Erlang binary (no /utf8 — unsupported) + +;; our canonical activity dict -> the Erlang activity-proplist SOURCE the flows consume. +(define host/ra--erl-src + (fn (a) + (str "[{type, " (host/ra--atom (get a :verb)) "}, {actor, " (host/ra--atom (get a :actor)) "}, " + "{id, " (host/ra--bin (get a :id)) "}, " + "{object, [{type, " (host/ra--atom (get a :object-type)) "}, " + "{category, " (host/ra--atom (get a :category)) "}]}]"))) + +;; the Erlang expression that starts a named durable flow with the marshalled activity env. +(define host/ra--start-expr + (fn (flow-name activity) + (str "flow_store:start(" flow-name ", [{activity, " (host/ra--erl-src activity) + "}, {actor, " (host/ra--atom (get activity :actor)) "}])"))) +;; the Erlang expression that resumes a suspended instance with an effect result (async re-entry). +(define host/ra--resume-expr + (fn (id result) (str "flow_store:resume(" id ", " (host/ra--atom result) ")"))) + +;; map flow_store's result (er-to-sx-deep'd) onto the seam runner contract. +;; (ok Id (flow_done V…)) -> {:status "done" :effects (V…) :flow-id Id} +;; (ok Id (flow_suspended T)) -> {:status "suspended" :resume {:id Id :tag T}} +(define host/ra--parse + (fn (r) + (if (or (not (= (type-of r) "list")) (not (= (str (first r)) "ok"))) + {:status "failed" :error r} + (let ((id (first (rest r))) (outcome (first (rest (rest r))))) + (let ((kind (str (first outcome)))) + (cond + ((= kind "flow_done") + {:status "done" :effects (rest outcome) :flow-id id}) + ((= kind "flow_suspended") + {:status "suspended" :resume {:id id :tag (str (first (rest outcome)))}}) + (else {:status "failed" :error r}))))))) + +;; the RA runner — a seam {:capabilities :run}. Advertises {effect, branch, each, suspend}. :run +;; marshals the activity, starts the DAG's named flow via the injected erl-eval, parses the result. +(define host/ra--make-runner + (fn (erl-eval) + {:capabilities (list "effect" "branch" "each" "suspend") + :run (fn (dag env) + (host/ra--parse (erl-eval (host/ra--start-expr (get dag :erl-flow) (get env :activity)))))})) + +;; resume a suspended instance out-of-band (the async re-entry path) — re-drive + parse. +(define host/ra--resume + (fn (erl-eval id result) (host/ra--parse (erl-eval (host/ra--resume-expr id result))))) + +;; the REAL erl-eval — ONLY works where the Erlang runtime + next/flow are loaded (refs resolved +;; lazily at call time, so defining it here is harmless in the plain host). +(define host/ra--real-eval (fn (src) (er-to-sx-deep (erlang-eval-ast src)))) diff --git a/lib/host/serve.sh b/lib/host/serve.sh index 7a9d30f6..02ebac93 100755 --- a/lib/host/serve.sh +++ b/lib/host/serve.sh @@ -111,6 +111,7 @@ MODULES=( "lib/host/execute.sx" "lib/host/behavior.sx" "lib/host/flows.sx" + "lib/host/ra.sx" "lib/host/htmlsx.sx" "lib/host/blog.sx" "lib/host/server.sx" diff --git a/lib/host/tests/ra.sx b/lib/host/tests/ra.sx new file mode 100644 index 00000000..90967e42 --- /dev/null +++ b/lib/host/tests/ra.sx @@ -0,0 +1,62 @@ +;; lib/host/tests/ra.sx — the RA (Erlang durable) runner adapter (lib/host/ra.sx), unit-tested with +;; a MOCK erl-eval (no Erlang runtime needed). The REAL dispatch is proven in plans/ra-spike.sh. + +(define host-ra-pass 0) +(define host-ra-fail 0) +(define host-ra-fails (list)) +(define host-ra-test + (fn (name actual expected) + (if (= actual expected) + (set! host-ra-pass (+ host-ra-pass 1)) + (begin (set! host-ra-fail (+ host-ra-fail 1)) + (append! host-ra-fails {:name name :actual actual :expected expected}))))) + +(define ra-urgent {:verb "create" :actor "site" :id "u1" :object-type "article" :category "urgent"}) +(define ra-news {:verb "create" :actor "site" :id "n1" :object-type "article" :category "newsletter"}) + +;; ── marshalling: our canonical activity → Erlang source ── +(host-ra-test "erl-src marshals the activity → Erlang activity-proplist source" + (host/ra--erl-src ra-urgent) + "[{type, 'create'}, {actor, 'site'}, {id, <<\"u1\">>}, {object, [{type, 'article'}, {category, 'urgent'}]}]") +(host-ra-test "start-expr wraps it in flow_store:start with the env" + (host/ra--start-expr "bd" ra-urgent) + "flow_store:start(bd, [{activity, [{type, 'create'}, {actor, 'site'}, {id, <<\"u1\">>}, {object, [{type, 'article'}, {category, 'urgent'}]}]}, {actor, 'site'}])") + +;; ── result parsing: flow_store's er-to-sx-deep'd result → the seam runner contract ── +(host-ra-test "parse DONE → {:status done :effects … :flow-id}" + (let ((p (host/ra--parse (list "ok" 1 (list "flow_done" (list "digest_sent")))))) + (list (get p :status) (get p :flow-id))) + (list "done" 1)) +(host-ra-test "parse SUSPENDED → {:status suspended :resume {:id :tag}}" + (let ((p (host/ra--parse (list "ok" 1 (list "flow_suspended" "morning"))))) + (list (get p :status) (get (get p :resume) :id) (get (get p :resume) :tag))) + (list "suspended" 1 "morning")) +(host-ra-test "parse garbage → failed" + (get (host/ra--parse (list "error" "boom")) :status) "failed") + +;; ── the runner (MOCK erl-eval keyed on the marshalled src) ── +(define ra-mock + (fn (src) (if (>= (index-of src "newsletter") 0) + (list "ok" 7 (list "flow_suspended" "morning")) + (list "ok" 3 (list "flow_done" (list "digest_sent")))))) +(define ra-runner (host/ra--make-runner ra-mock)) +(host-ra-test "RA runner advertises {effect, branch, each, suspend}" + (get ra-runner :capabilities) (list "effect" "branch" "each" "suspend")) +(host-ra-test "RA runner: urgent → done; newsletter → suspended (via the injected erl-eval)" + (list (get ((get ra-runner :run) {:erl-flow "bd"} {:activity ra-urgent}) :status) + (get ((get ra-runner :run) {:erl-flow "bd"} {:activity ra-news}) :status)) + (list "done" "suspended")) +(host-ra-test "RA resume drives a suspended instance (async re-entry)" + (get (host/ra--resume ra-mock 7 "morning_ts") :status) "done") + +;; ── dual-runner routing: the capability model, now REAL (2 runners) ── +(host-ra-test "select-runner: {effect,branch} composition → exec-runner; {suspend} dag → RA" + (let ((fleet (list host/flow--exec-runner ra-runner))) + (list (get (host/flow--select-runner fleet (quote (alt (when (eq "k" "v") (effect a)) (else (effect b))))) :capabilities) + (get (host/flow--select-runner fleet {:erl-flow "bd" :needs (list "effect" "branch" "suspend")}) :capabilities))) + (list (list "effect" "branch" "each") (list "effect" "branch" "each" "suspend"))) + +(define host-ra-tests-run! + (fn () + {:total (+ host-ra-pass host-ra-fail) + :passed host-ra-pass :failed host-ra-fail :fails host-ra-fails})) diff --git a/plans/business-logic-fed-flows.md b/plans/business-logic-fed-flows.md index 5b9c34ce..eacf9c8a 100644 --- a/plans/business-logic-fed-flows.md +++ b/plans/business-logic-fed-flows.md @@ -225,14 +225,29 @@ driven from SX. KEY FINDINGS for the build: erlang-eval-ast returns Erlang TERMS raw, atoms as {:tag atom :name …}) — the runner must parse results, not assume :name; flow_store start→{done,V}|{suspended,Tag}, resume(Id,Res) maps 1:1 onto {:status done|suspended :effects :resume}; the flow instance Id is the resume handle. -- [ ] Build the RA runner (a seam {:capabilities #{effect,branch,each,suspend} :run}): marshal via - host/blog--activity->erl → Erlang source; flow_store:start (or apply_triggers) via erlang-eval-ast; - parse {done,V}→{:status done :effects V} / {suspended,Tag}→{:status suspended :resume {id,tag}}. -- [ ] REMAINING (beyond the spike): (a) load the Erlang runtime + next/flow into the serving process - (big serve.sh dependency) OR run RA out-of-process; (b) DEBT #3 — async boundary: dispatch off the - request path, a background loop drives resume + behavior/pump; (c) real CID→binary marshalling - (spike used atom ids); (d) structured result parsing (Erlang term → SX effects). Baseline still - green: next/tests/triggers_e2e.sh 10/10. +- [x] **RA RUNNER BUILT + TESTED (module + integration) 2026-07-02.** lib/host/ra.sx — a PURE-SX + seam runner (advertises {effect,branch,each,suspend}) with an INJECTED erl-eval (real = + er-to-sx-deep ∘ erlang-eval-ast; mock in unit tests), so it loads in the plain host and is testable + without the Erlang runtime. host/ra--{atom,bin,erl-src,start-expr,resume-expr,parse,make-runner, + resume,real-eval}: marshals our canonical activity → Erlang source (CID as <<"…">> binary, atoms + single-quoted), starts flow_store, parses (ok Id (flow_done V))→{:status done :effects V :flow-id} / + (ok Id (flow_suspended T))→{:status suspended :resume {:id :tag}}. DUAL-RUNNER ROUTING in flows.sx: + host/flow--required-caps handles a {:erl-flow :needs} DAG (declared caps); host/flow--select-runner + picks the cheapest runner covering the DAG's needs — the capability model is now REAL (2 runners: + an {effect,branch} composition → exec-runner; a {suspend} DAG → RA). ra 9/9 (mock) + plans/ + ra-integration.sh 4/4 (the REAL module driving live flow_store: urgent→done, newsletter→suspended + with resume handle, effect-as-data carried). Full host conformance green. next/tests/triggers_e2e.sh + 10/10 baseline intact. +- [ ] **RA-LIVE (deferred — the deployment step, prerequisite now PRECISE).** KEY FINDING: gen_servers + do NOT persist across separate erlang-eval-ast calls (flow README: "the scheduler doesn't preserve + spawned processes across separate erlang-eval-ast invocations"). So a boot-per-call proves the + module (done), but TRUE async (suspend → return the request → resume LATER in another call) needs a + PERSISTENT next/ kernel PROCESS holding flow_store — the async boundary (DEBT #3) is deeper than + "off the request path". REMAINING: (a) stand up a long-lived next/ kernel (nx_kernel/http_server + already run persistently for TCP) that RA talks to; (b) wire a DURABLE behavior binding ({:erl-flow + "blog_digest" :needs (effect branch suspend)}) into the live publish engine, routed to RA via + select-runner; (c) the resumed completion re-enters via the transport inbound + behavior/pump. + The runner + marshalling + suspend/resume mechanics are all proven; this is process lifecycle + wiring. ## TA — the FED-SX TRANSPORT adapter ← federation proper - [ ] A seam transport over next/ delivery: :emit → outbox → peers; :deliver → inbox. A remote @@ -267,6 +282,16 @@ covers everything until a DAG's cost/latency/placement forces the substrate. activities), so business logic can change state, which federates, which triggers more flows. ## Progress log (newest first) +- 2026-07-02 — RA RUNNER BUILT + tested (module + integration). lib/host/ra.sx = a pure-SX seam + runner with injected erl-eval (loads in the plain host, mock-testable); marshals our activity → + Erlang, drives flow_store, parses done/suspended → the runner contract. Dual-runner ROUTING in + flows.sx (host/flow--select-runner + required-caps for {:erl-flow :needs} DAGs) makes the capability + model REAL (2 runners). ra 9/9 (mock) + plans/ra-integration.sh 4/4 (REAL module → live flow_store). + Full host conformance 607/607. FINDING: gen_servers don't persist across erlang-eval-ast calls, so + RA-LIVE (true cross-call suspend/resume) needs a persistent next/ kernel process — the async + boundary is deeper than "off the request path". Runner mechanics fully proven; RA-live = lifecycle + + wiring. NEXT: RA-live (persistent kernel + a durable binding wired to RA), or P1 (capability model + is now real, so it's no longer vacuous). - 2026-07-02 — RA SPIKE DONE → RA is VIABLE (plans/ra-spike.sh, 4/4). From SX: our canonical activity serializes to valid Erlang, drives blog_publish_digest through flow_store (done + suspend + resume), no er-scheduler deadlock. De-risks the whole durable/federated half. Findings: erlang-eval-ast diff --git a/plans/ra-integration.sh b/plans/ra-integration.sh new file mode 100755 index 00000000..491352d8 --- /dev/null +++ b/plans/ra-integration.sh @@ -0,0 +1,63 @@ +#!/usr/bin/env bash +# RA INTEGRATION — the REAL lib/host/ra.sx runner (host/ra--make-runner + host/ra--real-eval) +# driving next/'s live flow_store durable flow, end-to-end. (ra-spike.sh proved the path with +# inline Erlang; this proves the MODULE.) urgent→done, newsletter→suspended, resume→done. +set -uo pipefail +cd "$(git rev-parse --show-toplevel)" +SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}" +[ -x "$SX_SERVER" ] || SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe" +PASS=0; FAIL=0 +TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT + +cat > "$TMPFILE" <<'EPOCHS' +(epoch 1) +(load "lib/erlang/tokenizer.sx") +(load "lib/erlang/parser.sx") +(load "lib/erlang/parser-core.sx") +(load "lib/erlang/parser-expr.sx") +(load "lib/erlang/parser-module.sx") +(load "lib/erlang/transpile.sx") +(load "lib/erlang/runtime.sx") +(load "lib/erlang/vm/dispatcher.sx") +(load "lib/host/ra.sx") +(epoch 2) +(eval "(er-load-gen-server!)") +(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)") +(eval "(get (erlang-load-module (file-read \"next/flow/flow.erl\")) :name)") +(eval "(get (erlang-load-module (file-read \"next/flow/flow_spec.erl\")) :name)") +(eval "(get (erlang-load-module (file-read \"next/flow/flow_store.erl\")) :name)") +(eval "(get (erlang-load-module (file-read \"next/flow/flows/blog_publish_digest.erl\")) :name)") +(epoch 3) +;; gen_servers don't persist across separate erlang-eval-ast calls (flow README), so the injected +;; erl-eval boots the store + registers the flow inline on EVERY call (like the e2e). This proves +;; the MODULE's marshalling/dispatch/parse against the REAL flow; TRUE cross-call resume (suspend, +;; return, resume later) needs a PERSISTENT next/ kernel process — the RA-live deployment step. +(eval "(define ra/boot-eval (fn (src) (er-to-sx-deep (erlang-eval-ast (str \"flow_store:start_link(), FF = fun(_) -> [f1, f2, f3] end, flow_store:register_flow(bd, blog_publish_digest:build([{fetch_followers, FF}])), \" src)))))") +(eval "(define ra/R (host/ra--make-runner ra/boot-eval))") +(eval "(define ra/urgent {:verb \"create\" :actor \"alice\" :id \"u1\" :object-type \"article\" :category \"urgent\"})") +(eval "(define ra/news {:verb \"create\" :actor \"alice\" :id \"n1\" :object-type \"article\" :category \"newsletter\"})") +;; ── urgent: the real MODULE runner → done ── +(epoch 10) +(eval "(get ((get ra/R :run) {:erl-flow \"bd\"} {:activity ra/urgent}) :status)") +;; ── newsletter: the real MODULE runner → suspended (durable wait) ── +(epoch 20) +(eval "(get ((get ra/R :run) {:erl-flow \"bd\"} {:activity ra/news}) :status)") +;; ── the suspended result carries a resume handle {:id :tag morning} ── +(epoch 21) +(eval "(str (get (get ((get ra/R :run) {:erl-flow \"bd\"} {:activity ra/news}) :resume) :tag))") +;; ── the done result carries the flow's effect-as-data (digest_sent) ── +(epoch 30) +(eval "(str (first (get ((get ra/R :run) {:erl-flow \"bd\"} {:activity ra/urgent}) :effects)))") +EPOCHS + +OUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) +grab() { echo "$OUT" | awk -v e="$1" '$0 ~ "^\\(ok " e " "{print;exit} $0 ~ "^\\(ok-len " e " "{getline;print;exit} $0 ~ "^\\(error " e " "{print;exit}'; } +ck() { local a; a=$(grab "$1"); if echo "$a" | grep -qF -- "$2"; then PASS=$((PASS+1)); echo " ok [$3]"; else FAIL=$((FAIL+1)); echo " FAIL [$3] want '$2' got: $a"; fi; } + +echo "── RA integration (the real host/ra.sx module) ──────" +ck 10 "done" "urgent → real RA runner → done" +ck 20 "suspended" "newsletter → real RA runner → suspended" +ck 21 "morning" "suspended result carries a resume handle (:tag morning)" +ck 30 "digest_sent" "done result carries the flow's effect-as-data" +echo "─────────────────────────────────────────────────────" +echo "PASS=$PASS FAIL=$FAIL"