RA spike — the Erlang durable runner is VIABLE (4/4)

Narrow spike (plans/ra-spike.sh) de-risking the durable/federated half before building P1/P2. From
the SX side it proves the whole RA path: (1) our canonical activity dict serializes to a valid
Erlang activity-proplist source (the marshaller shape = host/blog--activity->erl); (2) it drives
pipeline:apply_triggers → blog_publish_digest → done + 3 emails (urgent sync branch); (3) the
newsletter activity SUSPENDS on the morning timer (status =:= {ok,{suspended,morning}}); (4)
flow_store:resume completes it → 3 emails (the async cycle closes); (5) NO er-scheduler deadlock —
flow-on-erlang's railway threading holds when driven from SX.

Findings recorded in the plan for the full build: erlang-eval-ast returns Erlang TERMS directly
(integers raw, atoms as {:tag atom :name …}) so the runner must parse results (not assume :name);
flow_store start→{done,V}|{suspended,Tag} + resume(Id,Res) maps 1:1 onto the seam's runner contract
{:status done|suspended :effects :resume}; the instance Id is the resume handle. Remaining for full
RA: load the Erlang runtime into the serving process (or out-of-process), the async dispatch
boundary (DEBT #3), CID→binary marshalling, structured result parsing.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-07-02 15:33:48 +00:00
parent a5d43246e0
commit 17602e597f
2 changed files with 92 additions and 7 deletions

View File

@@ -214,13 +214,25 @@ without touching the DAG or the wiring.
Add/Remove. Emit canonical activities into the transport log. Define the delta summary. Add/Remove. Emit canonical activities into the transport log. Define the delta summary.
## RA — the ERLANG (durable) RUNNER adapter ← the old "fed-sx spike", now an adapter ## RA — the ERLANG (durable) RUNNER adapter ← the old "fed-sx spike", now an adapter
<!-- PREREQ (review): move dispatch OFF the request path (DEBT #3) — background loop calls behavior/pump; suspend can't block a request. Do a NARROW SPIKE first. --> <!-- PREREQ (review): move dispatch OFF the request path (DEBT #3) — background loop calls behavior/pump; suspend can't block a request. -->
- [ ] A seam runner wrapping next/'s flow_dispatch + flow_store: marshal the canonical activity → **SPIKE DONE 2026-07-02 — RA is VIABLE (plans/ra-spike.sh, 4/4).** Proven from the SX side:
next/'s proplist, dispatch to a named flow (blog_publish_digest), map the result back to (1) our canonical activity dict → Erlang activity-proplist SOURCE (ra/erl-src ≈ host/blog--activity->erl)
{:status done|suspended :effects :resume}. On suspend, wire the resumed completion → the transport is valid Erlang the flow consumes; (2) it drives pipeline:apply_triggers → blog_publish_digest → done
inbound (pump). Baseline: next/tests/triggers_e2e.sh 10/10. RISK: er-scheduler context + 3 emails (urgent sync branch); (3) the newsletter activity SUSPENDS on the morning timer
(project_fed_prims_http_listen_scheduler); keep flows effect-as-data (no blocking). SPIKE the (status =:= {ok,{suspended,morning}}); (4) flow_store:resume(Id, morning_ts) COMPLETES it → 3 emails
SX→erlang-on-sx call in isolation first. (the async cycle); (5) NO er-scheduler deadlock — flow-on-erlang's railway threading holds when
driven from SX. KEY FINDINGS for the build: erlang-eval-ast returns Erlang TERMS directly (integers
raw, atoms as {:tag atom :name …}) — the runner must parse results, not assume :name; flow_store
start→{done,V}|{suspended,Tag}, resume(Id,Res) maps 1:1 onto {:status done|suspended :effects :resume};
the flow instance Id is the resume handle.
- [ ] Build the RA runner (a seam {:capabilities #{effect,branch,each,suspend} :run}): marshal via
host/blog--activity->erl → Erlang source; flow_store:start (or apply_triggers) via erlang-eval-ast;
parse {done,V}→{:status done :effects V} / {suspended,Tag}→{:status suspended :resume {id,tag}}.
- [ ] REMAINING (beyond the spike): (a) load the Erlang runtime + next/flow into the serving process
(big serve.sh dependency) OR run RA out-of-process; (b) DEBT #3 — async boundary: dispatch off the
request path, a background loop drives resume + behavior/pump; (c) real CID→binary marshalling
(spike used atom ids); (d) structured result parsing (Erlang term → SX effects). Baseline still
green: next/tests/triggers_e2e.sh 10/10.
## TA — the FED-SX TRANSPORT adapter ← federation proper ## TA — the FED-SX TRANSPORT adapter ← federation proper
- [ ] A seam transport over next/ delivery: :emit → outbox → peers; :deliver → inbox. A remote - [ ] A seam transport over next/ delivery: :emit → outbox → peers; :deliver → inbox. A remote
@@ -255,6 +267,13 @@ covers everything until a DAG's cost/latency/placement forces the substrate.
activities), so business logic can change state, which federates, which triggers more flows. activities), so business logic can change state, which federates, which triggers more flows.
## Progress log (newest first) ## Progress log (newest first)
- 2026-07-02 — RA SPIKE DONE → RA is VIABLE (plans/ra-spike.sh, 4/4). From SX: our canonical activity
serializes to valid Erlang, drives blog_publish_digest through flow_store (done + suspend + resume),
no er-scheduler deadlock. De-risks the whole durable/federated half. Findings: erlang-eval-ast
returns terms directly (parse, don't assume :name); flow_store start/resume maps 1:1 onto the runner
contract. Remaining for full RA: load the Erlang runtime into serving (or out-of-process), the async
dispatch boundary (DEBT #3), CID→binary marshalling, structured result parsing. NEXT: full RA, or
P1 now that a real 2nd runner is proven reachable.
- 2026-07-02 — P0.4 DONE + LIVE-VERIFIED → **P0 COMPLETE**. Canonical seam activity shape - 2026-07-02 — P0.4 DONE + LIVE-VERIFIED → **P0 COMPLETE**. Canonical seam activity shape
({:verb :object=cid :object-type :slug :category :delta :id}); consumers reconciled (trigger match, ({:verb :object=cid :object-type :slug :category :delta :id}); consumers reconciled (trigger match,
publish-ctx); host/blog--activity->erl marshaller staged for RA. Published live → /flows fired publish-ctx); host/blog--activity->erl marshaller staged for RA. Published live → /flows fired

66
plans/ra-spike.sh Executable file
View File

@@ -0,0 +1,66 @@
#!/usr/bin/env bash
# RA SPIKE — can an SX-side runner drive next/'s durable Erlang flow (blog_publish_digest)
# through flow_store, using OUR canonical activity (serialized from SX), incl. suspend→resume?
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
[ -x "$SX_SERVER" ] || SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
PASS=0; FAIL=0
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
cat > "$TMPFILE" <<'EPOCHS'
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(er-load-gen-server!)")
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/flow/flow.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/flow/flow_spec.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/flow/flow_store.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/kernel/trigger_registry.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/kernel/flow_dispatch.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/kernel/pipeline.erl\")) :name)")
(epoch 3)
(eval "(get (erlang-load-module (file-read \"next/flow/flows/blog_publish_digest.erl\")) :name)")
(epoch 4)
;; the SX-side RA marshaller: our canonical activity dict -> the Erlang activity proplist SOURCE.
;; (id emitted as a bare atom for the spike; a real CID marshals to a binary.)
(eval "(define ra/erl-src (fn (a) (str \"[{type, \" (get a :verb) \"}, {actor, \" (get a :actor) \"}, {id, \" (get a :id) \"}, {object, [{type, \" (get a :object-type) \"}, {category, \" (get a :category) \"}]}]\")))")
;; boot: registry + store + the flow (injected 3-follower fetch) + an on-Article trigger.
(eval "(define ra/boot \"trigger_registry:start_link(), flow_store:start_link(), FF = fun(_) -> [f1, f2, f3] end, Flow = blog_publish_digest:build([{fetch_followers, FF}]), flow_store:register_flow(blog_digest, Flow), Guard = fun(A, _) -> case envelope:get_field(object, A) of {ok, O} -> envelope:get_field(type, O) =:= {ok, article}; _ -> false end end, trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, blog_digest, Guard, any)), Cfg = [{trigger_registry, trigger_registry}], AS = [{actor_id, alice}],\")")
;; OUR canonical activities (host/blog--activity->erl shape) — urgent + newsletter.
(eval "(define ra/urgent {:verb \"create\" :actor \"alice\" :id \"u1\" :object-type \"article\" :category \"urgent\"})")
(eval "(define ra/news {:verb \"create\" :actor \"alice\" :id \"n1\" :object-type \"article\" :category \"newsletter\"})")
;; ── urgent: our serialized activity drives the flow → done, 3 emails (sync branch) ──
(epoch 10)
(eval "(get (erlang-eval-ast (str ra/boot \" pipeline:apply_triggers(\" (ra/erl-src ra/urgent) \", AS, Cfg), {ok, {done, {digest_sent, Emails, _}}} = flow_store:status(1), length(Emails) =:= 3\")) :name)")
;; ── newsletter: SUSPENDS on the morning timer (durable, off-request) ──
(epoch 20)
(eval "(get (erlang-eval-ast (str ra/boot \" pipeline:apply_triggers(\" (ra/erl-src ra/news) \", AS, Cfg), flow_store:status(1) =:= {ok, {suspended, morning}}\")) :name)")
;; ── newsletter: RESUME the suspended instance → completes, 3 emails (the async cycle) ──
(epoch 21)
(eval "(get (erlang-eval-ast (str ra/boot \" pipeline:apply_triggers(\" (ra/erl-src ra/news) \", AS, Cfg), {ok, {flow_done, {digest_sent, Emails, _}}} = flow_store:resume(1, morning_ts), length(Emails) =:= 3\")) :name)")
;; ── the marshaller output itself (prove the SX serializer produces the right Erlang source) ──
(epoch 30)
(eval "(ra/erl-src ra/urgent)")
EPOCHS
OUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
grab() { echo "$OUT" | awk -v e="$1" '$0 ~ "^\\(ok " e " "{print;exit} $0 ~ "^\\(ok-len " e " "{getline;print;exit} $0 ~ "^\\(error " e " "{print;exit}'; }
ck() { local a; a=$(grab "$1"); if echo "$a" | grep -qF -- "$2"; then PASS=$((PASS+1)); echo " ok [$3]"; else FAIL=$((FAIL+1)); echo " FAIL [$3] want '$2' got: $a"; fi; }
echo "── RA spike ─────────────────────────────────────────"
ck 10 "true" "urgent: SX activity → flow → done, 3 emails"
ck 20 "true" "newsletter: SX activity → flow SUSPENDS (morning)"
ck 21 "true" "newsletter: RESUME → completes, 3 emails (async cycle)"
ck 30 "category, urgent" "marshaller emits valid Erlang activity source"
echo "─────────────────────────────────────────────────────"
echo "PASS=$PASS FAIL=$FAIL"