next: the real durable-execution KERNEL SERVICE (host_kernel) — RA-live substrate
Promotes the persistent-kernel spike into a real service. next/kernel/host_kernel.erl: boots flow_store, registers named behavior flows (blog_digest), then blocks in http:listen so the er-scheduler + gen_server stay alive across requests. Parameterised flow routes (paths matched by byte prefix — binary =:= is buggy): GET /flow/start/<category> starts the flow with that category and returns '<InstanceId>:<status>' (suspended|done); GET /flow/resume/<id> resumes that instance. Path plumbing (starts_with / last_seg / field) is byte-level for portability. next/kernel/serve.sh: the persistent service launcher (container entrypoint / local) — loads the runtime + next/flow + the kernel, then host_kernel:start(); sleep infinity holds stdin so the listener serves forever. next/tests/host_kernel.sh: drives it over HTTP — 4/4: newsletter → instance 1 SUSPENDED, urgent → 2 DONE, draft → 3 DONE (skipped), resume 1 in a SEPARATE request → DONE (durable state persists across requests). serve.sh launcher verified live (bind + start + resume). This is the RA-live substrate: a working durable-execution service the host drives over HTTP. Remaining for RA-live: deploy it (a container/placement), point host/ra.sx's real-eval at it (POST /flow instead of in-process erlang-eval-ast), route a durable binding to RA. TA-live adds inbox/ outbox routes on the same kernel. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
71
next/kernel/host_kernel.erl
Normal file
71
next/kernel/host_kernel.erl
Normal file
@@ -0,0 +1,71 @@
|
||||
%% next/kernel/host_kernel.erl — the persistent DURABLE-EXECUTION kernel the host talks to (RA-live).
|
||||
%% A long-lived next/ service: boots flow_store + registers named behavior flows, then blocks in
|
||||
%% http:listen so the er-scheduler (and flow_store's gen_server) stay alive across requests. The host
|
||||
%% drives durable flows over HTTP; instances persist between requests (spike-proven).
|
||||
%%
|
||||
%% Routes (path-parameterised — binary =:= is buggy in this port, so paths are matched by byte
|
||||
%% prefix, not equality):
|
||||
%;; GET /flow/start/<category> -> start blog_digest with that category -> "<InstanceId>:<status>"
|
||||
%% (status = suspended | done)
|
||||
%% GET /flow/resume/<id> -> resume instance <id> (morning timer) -> "resume:<status>"
|
||||
%% This is the substrate for RA-live; TA-live adds inbox/outbox routes on the same kernel.
|
||||
-module(host_kernel).
|
||||
-export([start/1]).
|
||||
|
||||
start(Port) ->
|
||||
flow_store:start_link(),
|
||||
FF = fun (_) -> [f1, f2, f3] end,
|
||||
flow_store:register_flow(blog_digest, blog_publish_digest:build([{fetch_followers, FF}])),
|
||||
http:listen(Port, fun (Req) -> route(Req) end).
|
||||
|
||||
route(Req) ->
|
||||
[{status, 200}, {headers, []}, {body, respond(field(path, Req))}].
|
||||
|
||||
respond(P) ->
|
||||
case starts_with(P, [47,102,108,111,119,47,115,116,97,114,116,47]) of % "/flow/start/"
|
||||
true -> do_start(last_seg(P));
|
||||
false ->
|
||||
case starts_with(P, [47,102,108,111,119,47,114,101,115,117,109,101,47]) of % "/flow/resume/"
|
||||
true -> do_resume(last_seg(P));
|
||||
false -> <<"path:unknown">>
|
||||
end
|
||||
end.
|
||||
|
||||
%% start blog_digest with the path's <category> -> "<Id>:suspended" | "<Id>:done"
|
||||
do_start(CatChars) ->
|
||||
Cat = list_to_atom(CatChars),
|
||||
Env = [{activity, [{type, create}, {actor, site}, {id, <<110,49>>},
|
||||
{object, [{type, article}, {category, Cat}]}]},
|
||||
{actor, site}],
|
||||
case flow_store:start(blog_digest, Env) of
|
||||
{ok, Id, {flow_suspended, _}} -> id_status(Id, [115,117,115,112,101,110,100,101,100]); % "suspended"
|
||||
{ok, Id, {flow_done, _}} -> id_status(Id, [100,111,110,101]); % "done"
|
||||
_ -> <<"start:other">>
|
||||
end.
|
||||
|
||||
%% resume instance <id> -> "resume:done" | "resume:other"
|
||||
do_resume(IdChars) ->
|
||||
case flow_store:resume(list_to_integer(IdChars), morning_ts) of
|
||||
{ok, {flow_done, _}} -> <<"resume:done">>;
|
||||
{flow_done, _} -> <<"resume:done">>;
|
||||
_ -> <<"resume:other">>
|
||||
end.
|
||||
|
||||
%% "<Id>:<statusChars>" as a response binary
|
||||
id_status(Id, StatusChars) ->
|
||||
list_to_binary(integer_to_list(Id) ++ [58] ++ StatusChars). % ++ ":" ++
|
||||
|
||||
%% ── plumbing (byte-level; string-literal charlists avoided for portability) ──
|
||||
starts_with(Bin, Prefix) -> starts_with_(binary_to_list(Bin), Prefix).
|
||||
starts_with_(_, []) -> true;
|
||||
starts_with_([C | T], [C | P]) -> starts_with_(T, P);
|
||||
starts_with_(_, _) -> false.
|
||||
|
||||
last_seg(Bin) -> last_seg_(binary_to_list(Bin), []).
|
||||
last_seg_([], Acc) -> lists:reverse(Acc);
|
||||
last_seg_([47 | T], _) -> last_seg_(T, []); % reset at each '/'
|
||||
last_seg_([C | T], Acc) -> last_seg_(T, [C | Acc]).
|
||||
|
||||
field(K, [{K, V} | _]) -> V;
|
||||
field(K, [_ | Rest]) -> field(K, Rest);
|
||||
field(_, []) -> nil.
|
||||
40
next/kernel/serve.sh
Executable file
40
next/kernel/serve.sh
Executable file
@@ -0,0 +1,40 @@
|
||||
#!/usr/bin/env bash
|
||||
# next/kernel/serve.sh — run host_kernel as a PERSISTENT durable-execution service (RA-live substrate).
|
||||
# Loads the Erlang-on-SX runtime + next/flow + the kernel, then host_kernel:start($KERNEL_PORT) blocks
|
||||
# in http:listen forever — keeping the er-scheduler + flow_store's gen_server alive so durable flow
|
||||
# instances survive across requests. Doubles as a container entrypoint and a local launcher. The host's
|
||||
# RA runner drives it over HTTP (GET /flow/start/<category>, GET /flow/resume/<id>).
|
||||
set -uo pipefail
|
||||
cd "${SX_PROJECT_DIR:-$(git rev-parse --show-toplevel 2>/dev/null || echo .)}"
|
||||
|
||||
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
|
||||
[ -x "$SX_SERVER" ] || SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
|
||||
if [ ! -x "$SX_SERVER" ]; then echo "ERROR: sx_server.exe not found." >&2; exit 1; fi
|
||||
PORT="${KERNEL_PORT:-8930}"
|
||||
|
||||
# The epoch stream loads everything then calls start (which blocks in http:listen). `sleep infinity`
|
||||
# holds stdin open so EOF doesn't exit(0) before/while the listener serves.
|
||||
{
|
||||
cat <<EPOCHS
|
||||
(epoch 1)
|
||||
(load "lib/erlang/tokenizer.sx")
|
||||
(load "lib/erlang/parser.sx")
|
||||
(load "lib/erlang/parser-core.sx")
|
||||
(load "lib/erlang/parser-expr.sx")
|
||||
(load "lib/erlang/parser-module.sx")
|
||||
(load "lib/erlang/transpile.sx")
|
||||
(load "lib/erlang/runtime.sx")
|
||||
(load "lib/erlang/vm/dispatcher.sx")
|
||||
(epoch 2)
|
||||
(eval "(er-load-gen-server!)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/flow/flow.erl\")) :name)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/flow/flow_spec.erl\")) :name)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/flow/flow_store.erl\")) :name)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/flow/flows/blog_publish_digest.erl\")) :name)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/host_kernel.erl\")) :name)")
|
||||
(epoch 3)
|
||||
(eval "(erlang-eval-ast \"host_kernel:start($PORT)\")")
|
||||
EPOCHS
|
||||
sleep infinity
|
||||
} | exec "$SX_SERVER"
|
||||
67
next/tests/host_kernel.sh
Executable file
67
next/tests/host_kernel.sh
Executable file
@@ -0,0 +1,67 @@
|
||||
#!/usr/bin/env bash
|
||||
# next/tests/host_kernel.sh — the durable-execution kernel (host_kernel.erl) over HTTP.
|
||||
# Boots it as a background sx_server, then drives it with curl: category branching (newsletter →
|
||||
# suspend, urgent/draft → done), sequential instance ids, and RESUME of a suspended instance in a
|
||||
# SEPARATE request (durable state persists). This is the RA-live substrate.
|
||||
set -uo pipefail
|
||||
cd "$(git rev-parse --show-toplevel)"
|
||||
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
|
||||
[ -x "$SX_SERVER" ] || SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
|
||||
PORT=51878
|
||||
PASS=0; FAIL=0
|
||||
EPOCH_FILE=$(mktemp); LOG_FILE=$(mktemp)
|
||||
cleanup() {
|
||||
[ -n "${SXPID:-}" ] && { kill -KILL "$SXPID" 2>/dev/null || true; wait "$SXPID" 2>/dev/null || true; }
|
||||
[ -n "${HOLDPID:-}" ] && { kill -KILL "$HOLDPID" 2>/dev/null || true; wait "$HOLDPID" 2>/dev/null || true; }
|
||||
rm -f "$EPOCH_FILE" "$LOG_FILE"
|
||||
}
|
||||
trap cleanup EXIT
|
||||
|
||||
cat > "$EPOCH_FILE" <<EPOCHS
|
||||
(epoch 1)
|
||||
(load "lib/erlang/tokenizer.sx")
|
||||
(load "lib/erlang/parser.sx")
|
||||
(load "lib/erlang/parser-core.sx")
|
||||
(load "lib/erlang/parser-expr.sx")
|
||||
(load "lib/erlang/parser-module.sx")
|
||||
(load "lib/erlang/transpile.sx")
|
||||
(load "lib/erlang/runtime.sx")
|
||||
(load "lib/erlang/vm/dispatcher.sx")
|
||||
(epoch 2)
|
||||
(eval "(er-load-gen-server!)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/flow/flow.erl\")) :name)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/flow/flow_spec.erl\")) :name)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/flow/flow_store.erl\")) :name)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/flow/flows/blog_publish_digest.erl\")) :name)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/host_kernel.erl\")) :name)")
|
||||
(epoch 3)
|
||||
(eval "(erlang-eval-ast \"host_kernel:start($PORT)\")")
|
||||
EPOCHS
|
||||
|
||||
FIFO=$(mktemp -u); mkfifo "$FIFO"
|
||||
( cat "$EPOCH_FILE"; sleep 180 ) > "$FIFO" &
|
||||
HOLDPID=$!
|
||||
"$SX_SERVER" < "$FIFO" > "$LOG_FILE" 2>&1 &
|
||||
SXPID=$!
|
||||
rm -f "$FIFO"
|
||||
|
||||
echo "── waiting for host_kernel to bind :$PORT ──"
|
||||
BOUND=""
|
||||
for i in $(seq 1 300); do
|
||||
if (exec 3<>/dev/tcp/127.0.0.1/$PORT) 2>/dev/null; then BOUND=1; exec 3>&- 3<&-; echo "bound (iter $i)"; break; fi
|
||||
sleep 1
|
||||
done
|
||||
if [ -z "$BOUND" ]; then echo "FAIL: never bound"; tail -25 "$LOG_FILE"; exit 1; fi
|
||||
|
||||
ck() { local got; got=$(curl -s -m 8 "http://127.0.0.1:$PORT$1"); if echo "$got" | grep -q "$2"; then PASS=$((PASS+1)); echo " ok [$3] $1 → $got"; else FAIL=$((FAIL+1)); echo " FAIL [$3] $1 → '$got' (want '$2')"; fi; }
|
||||
|
||||
echo "── driving the kernel over HTTP ──"
|
||||
ck /flow/start/newsletter "1:suspended" "newsletter → instance 1, SUSPENDED (durable wait)"
|
||||
ck /flow/start/urgent "2:done" "urgent → instance 2, DONE (sync branch)"
|
||||
ck /flow/start/draft "3:done" "draft → instance 3, DONE (skipped)"
|
||||
ck /flow/resume/1 "resume:done" "resume instance 1 (SEPARATE request) → DONE (persisted)"
|
||||
|
||||
echo "─────────────────────────────────────────────────────"
|
||||
echo "PASS=$PASS FAIL=$FAIL"
|
||||
[ "$FAIL" -eq 0 ] || { echo "--- kernel log tail ---"; tail -20 "$LOG_FILE"; }
|
||||
Reference in New Issue
Block a user