From 17602e597fdaf184e65a3beade5bf50debd9350e Mon Sep 17 00:00:00 2001 From: giles Date: Thu, 2 Jul 2026 15:33:48 +0000 Subject: [PATCH] =?UTF-8?q?RA=20spike=20=E2=80=94=20the=20Erlang=20durable?= =?UTF-8?q?=20runner=20is=20VIABLE=20(4/4)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Narrow spike (plans/ra-spike.sh) de-risking the durable/federated half before building P1/P2. From the SX side it proves the whole RA path: (1) our canonical activity dict serializes to a valid Erlang activity-proplist source (the marshaller shape = host/blog--activity->erl); (2) it drives pipeline:apply_triggers → blog_publish_digest → done + 3 emails (urgent sync branch); (3) the newsletter activity SUSPENDS on the morning timer (status =:= {ok,{suspended,morning}}); (4) flow_store:resume completes it → 3 emails (the async cycle closes); (5) NO er-scheduler deadlock — flow-on-erlang's railway threading holds when driven from SX. Findings recorded in the plan for the full build: erlang-eval-ast returns Erlang TERMS directly (integers raw, atoms as {:tag atom :name …}) so the runner must parse results (not assume :name); flow_store start→{done,V}|{suspended,Tag} + resume(Id,Res) maps 1:1 onto the seam's runner contract {:status done|suspended :effects :resume}; the instance Id is the resume handle. Remaining for full RA: load the Erlang runtime into the serving process (or out-of-process), the async dispatch boundary (DEBT #3), CID→binary marshalling, structured result parsing. Co-Authored-By: Claude Opus 4.8 --- plans/business-logic-fed-flows.md | 33 ++++++++++++---- plans/ra-spike.sh | 66 +++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 7 deletions(-) create mode 100755 plans/ra-spike.sh diff --git a/plans/business-logic-fed-flows.md b/plans/business-logic-fed-flows.md index 9fdf385b..5b9c34ce 100644 --- a/plans/business-logic-fed-flows.md +++ b/plans/business-logic-fed-flows.md @@ -214,13 +214,25 @@ without touching the DAG or the wiring. Add/Remove. Emit canonical activities into the transport log. Define the delta summary. ## RA — the ERLANG (durable) RUNNER adapter ← the old "fed-sx spike", now an adapter - -- [ ] A seam runner wrapping next/'s flow_dispatch + flow_store: marshal the canonical activity → - next/'s proplist, dispatch to a named flow (blog_publish_digest), map the result back to - {:status done|suspended :effects :resume}. On suspend, wire the resumed completion → the transport - inbound (pump). Baseline: next/tests/triggers_e2e.sh 10/10. RISK: er-scheduler context - (project_fed_prims_http_listen_scheduler); keep flows effect-as-data (no blocking). SPIKE the - SX→erlang-on-sx call in isolation first. + +**SPIKE DONE 2026-07-02 — RA is VIABLE (plans/ra-spike.sh, 4/4).** Proven from the SX side: +(1) our canonical activity dict → Erlang activity-proplist SOURCE (ra/erl-src ≈ host/blog--activity->erl) +is valid Erlang the flow consumes; (2) it drives pipeline:apply_triggers → blog_publish_digest → done ++ 3 emails (urgent sync branch); (3) the newsletter activity SUSPENDS on the morning timer +(status =:= {ok,{suspended,morning}}); (4) flow_store:resume(Id, morning_ts) COMPLETES it → 3 emails +(the async cycle); (5) NO er-scheduler deadlock — flow-on-erlang's railway threading holds when +driven from SX. KEY FINDINGS for the build: erlang-eval-ast returns Erlang TERMS directly (integers +raw, atoms as {:tag atom :name …}) — the runner must parse results, not assume :name; flow_store +start→{done,V}|{suspended,Tag}, resume(Id,Res) maps 1:1 onto {:status done|suspended :effects :resume}; +the flow instance Id is the resume handle. +- [ ] Build the RA runner (a seam {:capabilities #{effect,branch,each,suspend} :run}): marshal via + host/blog--activity->erl → Erlang source; flow_store:start (or apply_triggers) via erlang-eval-ast; + parse {done,V}→{:status done :effects V} / {suspended,Tag}→{:status suspended :resume {id,tag}}. +- [ ] REMAINING (beyond the spike): (a) load the Erlang runtime + next/flow into the serving process + (big serve.sh dependency) OR run RA out-of-process; (b) DEBT #3 — async boundary: dispatch off the + request path, a background loop drives resume + behavior/pump; (c) real CID→binary marshalling + (spike used atom ids); (d) structured result parsing (Erlang term → SX effects). Baseline still + green: next/tests/triggers_e2e.sh 10/10. ## TA — the FED-SX TRANSPORT adapter ← federation proper - [ ] A seam transport over next/ delivery: :emit → outbox → peers; :deliver → inbox. A remote @@ -255,6 +267,13 @@ covers everything until a DAG's cost/latency/placement forces the substrate. activities), so business logic can change state, which federates, which triggers more flows. ## Progress log (newest first) +- 2026-07-02 — RA SPIKE DONE → RA is VIABLE (plans/ra-spike.sh, 4/4). From SX: our canonical activity + serializes to valid Erlang, drives blog_publish_digest through flow_store (done + suspend + resume), + no er-scheduler deadlock. De-risks the whole durable/federated half. Findings: erlang-eval-ast + returns terms directly (parse, don't assume :name); flow_store start/resume maps 1:1 onto the runner + contract. Remaining for full RA: load the Erlang runtime into serving (or out-of-process), the async + dispatch boundary (DEBT #3), CID→binary marshalling, structured result parsing. NEXT: full RA, or + P1 now that a real 2nd runner is proven reachable. - 2026-07-02 — P0.4 DONE + LIVE-VERIFIED → **P0 COMPLETE**. Canonical seam activity shape ({:verb :object=cid :object-type :slug :category :delta :id}); consumers reconciled (trigger match, publish-ctx); host/blog--activity->erl marshaller staged for RA. Published live → /flows fired diff --git a/plans/ra-spike.sh b/plans/ra-spike.sh new file mode 100755 index 00000000..35e42c6b --- /dev/null +++ b/plans/ra-spike.sh @@ -0,0 +1,66 @@ +#!/usr/bin/env bash +# RA SPIKE — can an SX-side runner drive next/'s durable Erlang flow (blog_publish_digest) +# through flow_store, using OUR canonical activity (serialized from SX), incl. suspend→resume? +set -uo pipefail +cd "$(git rev-parse --show-toplevel)" +SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}" +[ -x "$SX_SERVER" ] || SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe" +PASS=0; FAIL=0 +TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT + +cat > "$TMPFILE" <<'EPOCHS' +(epoch 1) +(load "lib/erlang/tokenizer.sx") +(load "lib/erlang/parser.sx") +(load "lib/erlang/parser-core.sx") +(load "lib/erlang/parser-expr.sx") +(load "lib/erlang/parser-module.sx") +(load "lib/erlang/transpile.sx") +(load "lib/erlang/runtime.sx") +(load "lib/erlang/vm/dispatcher.sx") +(epoch 2) +(eval "(er-load-gen-server!)") +(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)") +(eval "(get (erlang-load-module (file-read \"next/flow/flow.erl\")) :name)") +(eval "(get (erlang-load-module (file-read \"next/flow/flow_spec.erl\")) :name)") +(eval "(get (erlang-load-module (file-read \"next/flow/flow_store.erl\")) :name)") +(eval "(get (erlang-load-module (file-read \"next/kernel/trigger_registry.erl\")) :name)") +(eval "(get (erlang-load-module (file-read \"next/kernel/flow_dispatch.erl\")) :name)") +(eval "(get (erlang-load-module (file-read \"next/kernel/pipeline.erl\")) :name)") +(epoch 3) +(eval "(get (erlang-load-module (file-read \"next/flow/flows/blog_publish_digest.erl\")) :name)") +(epoch 4) +;; the SX-side RA marshaller: our canonical activity dict -> the Erlang activity proplist SOURCE. +;; (id emitted as a bare atom for the spike; a real CID marshals to a binary.) +(eval "(define ra/erl-src (fn (a) (str \"[{type, \" (get a :verb) \"}, {actor, \" (get a :actor) \"}, {id, \" (get a :id) \"}, {object, [{type, \" (get a :object-type) \"}, {category, \" (get a :category) \"}]}]\")))") +;; boot: registry + store + the flow (injected 3-follower fetch) + an on-Article trigger. +(eval "(define ra/boot \"trigger_registry:start_link(), flow_store:start_link(), FF = fun(_) -> [f1, f2, f3] end, Flow = blog_publish_digest:build([{fetch_followers, FF}]), flow_store:register_flow(blog_digest, Flow), Guard = fun(A, _) -> case envelope:get_field(object, A) of {ok, O} -> envelope:get_field(type, O) =:= {ok, article}; _ -> false end end, trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, blog_digest, Guard, any)), Cfg = [{trigger_registry, trigger_registry}], AS = [{actor_id, alice}],\")") +;; OUR canonical activities (host/blog--activity->erl shape) — urgent + newsletter. +(eval "(define ra/urgent {:verb \"create\" :actor \"alice\" :id \"u1\" :object-type \"article\" :category \"urgent\"})") +(eval "(define ra/news {:verb \"create\" :actor \"alice\" :id \"n1\" :object-type \"article\" :category \"newsletter\"})") + +;; ── urgent: our serialized activity drives the flow → done, 3 emails (sync branch) ── +(epoch 10) +(eval "(get (erlang-eval-ast (str ra/boot \" pipeline:apply_triggers(\" (ra/erl-src ra/urgent) \", AS, Cfg), {ok, {done, {digest_sent, Emails, _}}} = flow_store:status(1), length(Emails) =:= 3\")) :name)") +;; ── newsletter: SUSPENDS on the morning timer (durable, off-request) ── +(epoch 20) +(eval "(get (erlang-eval-ast (str ra/boot \" pipeline:apply_triggers(\" (ra/erl-src ra/news) \", AS, Cfg), flow_store:status(1) =:= {ok, {suspended, morning}}\")) :name)") +;; ── newsletter: RESUME the suspended instance → completes, 3 emails (the async cycle) ── +(epoch 21) +(eval "(get (erlang-eval-ast (str ra/boot \" pipeline:apply_triggers(\" (ra/erl-src ra/news) \", AS, Cfg), {ok, {flow_done, {digest_sent, Emails, _}}} = flow_store:resume(1, morning_ts), length(Emails) =:= 3\")) :name)") +;; ── the marshaller output itself (prove the SX serializer produces the right Erlang source) ── +(epoch 30) +(eval "(ra/erl-src ra/urgent)") +EPOCHS + +OUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) +grab() { echo "$OUT" | awk -v e="$1" '$0 ~ "^\\(ok " e " "{print;exit} $0 ~ "^\\(ok-len " e " "{getline;print;exit} $0 ~ "^\\(error " e " "{print;exit}'; } +ck() { local a; a=$(grab "$1"); if echo "$a" | grep -qF -- "$2"; then PASS=$((PASS+1)); echo " ok [$3]"; else FAIL=$((FAIL+1)); echo " FAIL [$3] want '$2' got: $a"; fi; } + +echo "── RA spike ─────────────────────────────────────────" +ck 10 "true" "urgent: SX activity → flow → done, 3 emails" +ck 20 "true" "newsletter: SX activity → flow SUSPENDS (morning)" +ck 21 "true" "newsletter: RESUME → completes, 3 emails (async cycle)" +ck 30 "category, urgent" "marshaller emits valid Erlang activity source" +echo "─────────────────────────────────────────────────────" +echo "PASS=$PASS FAIL=$FAIL"