From 22e2835bdb4486c411068db21f91998b4cf59bee Mon Sep 17 00:00:00 2001 From: giles Date: Thu, 2 Jul 2026 17:11:47 +0000 Subject: [PATCH] =?UTF-8?q?next:=20the=20real=20durable-execution=20KERNEL?= =?UTF-8?q?=20SERVICE=20(host=5Fkernel)=20=E2=80=94=20RA-live=20substrate?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Promotes the persistent-kernel spike into a real service. next/kernel/host_kernel.erl: boots flow_store, registers named behavior flows (blog_digest), then blocks in http:listen so the er-scheduler + gen_server stay alive across requests. Parameterised flow routes (paths matched by byte prefix — binary =:= is buggy): GET /flow/start/ starts the flow with that category and returns ':' (suspended|done); GET /flow/resume/ resumes that instance. Path plumbing (starts_with / last_seg / field) is byte-level for portability. next/kernel/serve.sh: the persistent service launcher (container entrypoint / local) — loads the runtime + next/flow + the kernel, then host_kernel:start(); sleep infinity holds stdin so the listener serves forever. next/tests/host_kernel.sh: drives it over HTTP — 4/4: newsletter → instance 1 SUSPENDED, urgent → 2 DONE, draft → 3 DONE (skipped), resume 1 in a SEPARATE request → DONE (durable state persists across requests). serve.sh launcher verified live (bind + start + resume). This is the RA-live substrate: a working durable-execution service the host drives over HTTP. Remaining for RA-live: deploy it (a container/placement), point host/ra.sx's real-eval at it (POST /flow instead of in-process erlang-eval-ast), route a durable binding to RA. TA-live adds inbox/ outbox routes on the same kernel. Co-Authored-By: Claude Opus 4.8 --- next/kernel/host_kernel.erl | 71 +++++++++++++++++++++++++++++++++++++ next/kernel/serve.sh | 40 +++++++++++++++++++++ next/tests/host_kernel.sh | 67 ++++++++++++++++++++++++++++++++++ 3 files changed, 178 insertions(+) create mode 100644 next/kernel/host_kernel.erl create mode 100755 next/kernel/serve.sh create mode 100755 next/tests/host_kernel.sh diff --git a/next/kernel/host_kernel.erl b/next/kernel/host_kernel.erl new file mode 100644 index 00000000..f39e7c92 --- /dev/null +++ b/next/kernel/host_kernel.erl @@ -0,0 +1,71 @@ +%% next/kernel/host_kernel.erl — the persistent DURABLE-EXECUTION kernel the host talks to (RA-live). +%% A long-lived next/ service: boots flow_store + registers named behavior flows, then blocks in +%% http:listen so the er-scheduler (and flow_store's gen_server) stay alive across requests. The host +%% drives durable flows over HTTP; instances persist between requests (spike-proven). +%% +%% Routes (path-parameterised — binary =:= is buggy in this port, so paths are matched by byte +%% prefix, not equality): +%;; GET /flow/start/ -> start blog_digest with that category -> ":" +%% (status = suspended | done) +%% GET /flow/resume/ -> resume instance (morning timer) -> "resume:" +%% This is the substrate for RA-live; TA-live adds inbox/outbox routes on the same kernel. +-module(host_kernel). +-export([start/1]). + +start(Port) -> + flow_store:start_link(), + FF = fun (_) -> [f1, f2, f3] end, + flow_store:register_flow(blog_digest, blog_publish_digest:build([{fetch_followers, FF}])), + http:listen(Port, fun (Req) -> route(Req) end). + +route(Req) -> + [{status, 200}, {headers, []}, {body, respond(field(path, Req))}]. + +respond(P) -> + case starts_with(P, [47,102,108,111,119,47,115,116,97,114,116,47]) of % "/flow/start/" + true -> do_start(last_seg(P)); + false -> + case starts_with(P, [47,102,108,111,119,47,114,101,115,117,109,101,47]) of % "/flow/resume/" + true -> do_resume(last_seg(P)); + false -> <<"path:unknown">> + end + end. + +%% start blog_digest with the path's -> ":suspended" | ":done" +do_start(CatChars) -> + Cat = list_to_atom(CatChars), + Env = [{activity, [{type, create}, {actor, site}, {id, <<110,49>>}, + {object, [{type, article}, {category, Cat}]}]}, + {actor, site}], + case flow_store:start(blog_digest, Env) of + {ok, Id, {flow_suspended, _}} -> id_status(Id, [115,117,115,112,101,110,100,101,100]); % "suspended" + {ok, Id, {flow_done, _}} -> id_status(Id, [100,111,110,101]); % "done" + _ -> <<"start:other">> + end. + +%% resume instance -> "resume:done" | "resume:other" +do_resume(IdChars) -> + case flow_store:resume(list_to_integer(IdChars), morning_ts) of + {ok, {flow_done, _}} -> <<"resume:done">>; + {flow_done, _} -> <<"resume:done">>; + _ -> <<"resume:other">> + end. + +%% ":" as a response binary +id_status(Id, StatusChars) -> + list_to_binary(integer_to_list(Id) ++ [58] ++ StatusChars). % ++ ":" ++ + +%% ── plumbing (byte-level; string-literal charlists avoided for portability) ── +starts_with(Bin, Prefix) -> starts_with_(binary_to_list(Bin), Prefix). +starts_with_(_, []) -> true; +starts_with_([C | T], [C | P]) -> starts_with_(T, P); +starts_with_(_, _) -> false. + +last_seg(Bin) -> last_seg_(binary_to_list(Bin), []). +last_seg_([], Acc) -> lists:reverse(Acc); +last_seg_([47 | T], _) -> last_seg_(T, []); % reset at each '/' +last_seg_([C | T], Acc) -> last_seg_(T, [C | Acc]). + +field(K, [{K, V} | _]) -> V; +field(K, [_ | Rest]) -> field(K, Rest); +field(_, []) -> nil. diff --git a/next/kernel/serve.sh b/next/kernel/serve.sh new file mode 100755 index 00000000..7a110ed2 --- /dev/null +++ b/next/kernel/serve.sh @@ -0,0 +1,40 @@ +#!/usr/bin/env bash +# next/kernel/serve.sh — run host_kernel as a PERSISTENT durable-execution service (RA-live substrate). +# Loads the Erlang-on-SX runtime + next/flow + the kernel, then host_kernel:start($KERNEL_PORT) blocks +# in http:listen forever — keeping the er-scheduler + flow_store's gen_server alive so durable flow +# instances survive across requests. Doubles as a container entrypoint and a local launcher. The host's +# RA runner drives it over HTTP (GET /flow/start/, GET /flow/resume/). +set -uo pipefail +cd "${SX_PROJECT_DIR:-$(git rev-parse --show-toplevel 2>/dev/null || echo .)}" + +SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}" +[ -x "$SX_SERVER" ] || SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe" +if [ ! -x "$SX_SERVER" ]; then echo "ERROR: sx_server.exe not found." >&2; exit 1; fi +PORT="${KERNEL_PORT:-8930}" + +# The epoch stream loads everything then calls start (which blocks in http:listen). `sleep infinity` +# holds stdin open so EOF doesn't exit(0) before/while the listener serves. +{ + cat </dev/null || true; wait "$SXPID" 2>/dev/null || true; } + [ -n "${HOLDPID:-}" ] && { kill -KILL "$HOLDPID" 2>/dev/null || true; wait "$HOLDPID" 2>/dev/null || true; } + rm -f "$EPOCH_FILE" "$LOG_FILE" +} +trap cleanup EXIT + +cat > "$EPOCH_FILE" < "$FIFO" & +HOLDPID=$! +"$SX_SERVER" < "$FIFO" > "$LOG_FILE" 2>&1 & +SXPID=$! +rm -f "$FIFO" + +echo "── waiting for host_kernel to bind :$PORT ──" +BOUND="" +for i in $(seq 1 300); do + if (exec 3<>/dev/tcp/127.0.0.1/$PORT) 2>/dev/null; then BOUND=1; exec 3>&- 3<&-; echo "bound (iter $i)"; break; fi + sleep 1 +done +if [ -z "$BOUND" ]; then echo "FAIL: never bound"; tail -25 "$LOG_FILE"; exit 1; fi + +ck() { local got; got=$(curl -s -m 8 "http://127.0.0.1:$PORT$1"); if echo "$got" | grep -q "$2"; then PASS=$((PASS+1)); echo " ok [$3] $1 → $got"; else FAIL=$((FAIL+1)); echo " FAIL [$3] $1 → '$got' (want '$2')"; fi; } + +echo "── driving the kernel over HTTP ──" +ck /flow/start/newsletter "1:suspended" "newsletter → instance 1, SUSPENDED (durable wait)" +ck /flow/start/urgent "2:done" "urgent → instance 2, DONE (sync branch)" +ck /flow/start/draft "3:done" "draft → instance 3, DONE (skipped)" +ck /flow/resume/1 "resume:done" "resume instance 1 (SEPARATE request) → DONE (persisted)" + +echo "─────────────────────────────────────────────────────" +echo "PASS=$PASS FAIL=$FAIL" +[ "$FAIL" -eq 0 ] || { echo "--- kernel log tail ---"; tail -20 "$LOG_FILE"; }