Compare commits
9 Commits
loops/fed-
...
loops/fed-
| Author | SHA1 | Date | |
|---|---|---|---|
| 6c9b96390f | |||
| 6b4850b34e | |||
| fc6a47ad62 | |||
| 8b3d92ed5f | |||
| bba2d7e5cd | |||
| 89dd23c287 | |||
| 441a895737 | |||
| 8d54028c7f | |||
| 5959a97dca |
91
next/flow/README.md
Normal file
91
next/flow/README.md
Normal file
@@ -0,0 +1,91 @@
|
||||
# flow-on-erlang — durable workflows in the fed-sx runtime
|
||||
|
||||
A native Erlang-on-SX port of the Scheme flow engine (`lib/flow`), so
|
||||
the fed-sx kernel can fan arriving activities out into durable,
|
||||
branching, multi-step business flows **in its own runtime** — no
|
||||
cross-guest FFI, no marshalling, no Scheme dependency. The seed of a
|
||||
real engine that can later supersede the Scheme one for substrate use.
|
||||
|
||||
Run the suite: `bash next/flow/conformance.sh` → engine conformance.
|
||||
|
||||
## Model
|
||||
|
||||
A **flow** is an Erlang `fun(Ctx) -> Ctx`. Combinators (`flow_spec`)
|
||||
compose flows; user code stays value-level (the functions you hand to
|
||||
`flow_node`/`branch`/… take and return plain values). A flow that
|
||||
ignores its input is a thunk; composition *is* function composition.
|
||||
|
||||
```erlang
|
||||
F = flow_spec:sequence([
|
||||
flow_spec:flow_node(fun(Draft) -> Draft + 1 end),
|
||||
flow_spec:branch(fun(P) -> P >= 3 end,
|
||||
flow_spec:flow_const(ok),
|
||||
flow_spec:flow_const(rejected))]),
|
||||
flow:run(F, 2) %% => {flow_done, ok}
|
||||
```
|
||||
|
||||
## Durability — deterministic replay
|
||||
|
||||
Same semantics as the Scheme engine: a flow re-runs from the top on
|
||||
every resume; effects/non-determinism go through `flow:suspend/1`,
|
||||
whose resolved values are logged; an already-resolved suspend replays
|
||||
its logged value, and the first unresolved suspend short-circuits back
|
||||
to the driver. The persisted state is the **replay log** — plain
|
||||
`[{Tag, Value}]` data — so nothing live (no continuation, no process)
|
||||
is ever serialized; an instance survives restart by re-driving its
|
||||
named flow against its log.
|
||||
|
||||
```erlang
|
||||
flow_store:register_flow(publish, F),
|
||||
{ok, Id, R} = flow_store:start(publish, Draft), %% R = {flow_suspended, Tag} | {flow_done, V}
|
||||
%% ... driver performs the effect for Tag, then:
|
||||
flow_store:resume(Id, EffectResult) %% re-drives; completes or suspends again
|
||||
```
|
||||
|
||||
## Why railway threading instead of call/cc + a global
|
||||
|
||||
The Scheme engine uses an escape-only `call/cc` plus a mutable global
|
||||
replay log. This Erlang-on-SX runtime can't do either, and has a third
|
||||
sharp edge:
|
||||
|
||||
- **No re-enterable continuation** — but suspend only needs to *escape*,
|
||||
which Erlang `throw` could do …
|
||||
- **… except a blocking `receive` / `gen_server:call` inside a `try`
|
||||
deadlocks** the cooperative scheduler. So `suspend` must not consult
|
||||
the log via a registry process while inside a `try`.
|
||||
- **No process dictionary** — so there is no ambient per-process slot to
|
||||
stash the replay log in.
|
||||
|
||||
The resolution: thread the replay log through a railway-style **context**
|
||||
and make `suspend` *short-circuit* (like a `fail` value) rather than
|
||||
throw. No ambient state, no throw, no gen_server in the hot path —
|
||||
purely functional, which sidesteps all three constraints. The driver
|
||||
(`flow_store`) is the only stateful part, and it calls the pure
|
||||
`flow:drive/3` from inside `handle_call`, never wrapping a blocking
|
||||
receive.
|
||||
|
||||
A `Ctx` is `{flow_cont, Value, Log}` (running) or `{flow_susp, Tag,
|
||||
Log}` (short-circuited); every combinator passes a suspended context
|
||||
straight through.
|
||||
|
||||
## Modules
|
||||
|
||||
| Module | Role |
|
||||
|---|---|
|
||||
| `flow.erl` | pure replay driver: `drive/3`, `run/2`, `suspend/1`, the `Ctx` constructors/accessors |
|
||||
| `flow_spec.erl` | combinator algebra: leaves, `sequence`/`parallel`/`map_flow`, `flow_while`/`flow_until`, `branch`, railway `fail`/`recover`/`attempt`, `tap`, `try_catch`/`retry` |
|
||||
| `flow_store.erl` | durable gen_server: named-flow registry + instance table + `start`/`resume`/`status` |
|
||||
|
||||
## Consumed by
|
||||
|
||||
The fed-sx kernel's trigger fan-out (`pipeline.erl` + `flow_dispatch`)
|
||||
starts named flows from arriving activities; see
|
||||
`plans/fed-sx-host-types.md` and the triggers phases.
|
||||
|
||||
## Not yet (later layers)
|
||||
|
||||
- Persisting instance logs to the kernel's durable on-disk log (the
|
||||
data shape is already restart-ready; only the backing is in-memory).
|
||||
- `parallel` with multiple independent suspends resolving concurrently
|
||||
(current `parallel` is sequential under one shared log).
|
||||
- Full parity with the Scheme engine's distributed/remote nodes.
|
||||
206
next/flow/conformance.sh
Executable file
206
next/flow/conformance.sh
Executable file
@@ -0,0 +1,206 @@
|
||||
#!/usr/bin/env bash
|
||||
# next/flow/conformance.sh — flow-on-erlang engine conformance.
|
||||
#
|
||||
# Exercises the native Erlang-on-SX durable workflow engine
|
||||
# (next/flow/{flow,flow_spec,flow_store}.erl): the combinator algebra,
|
||||
# the deterministic-replay suspend/resume core, and the durable store.
|
||||
# This is the gate for the engine, replacing lib/flow/conformance.sh
|
||||
# (the Scheme engine) for the fed-sx substrate — the kernel's trigger
|
||||
# fan-out drives flows in its own runtime, with no cross-guest FFI.
|
||||
|
||||
set -uo pipefail
|
||||
cd "$(git rev-parse --show-toplevel)"
|
||||
|
||||
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
|
||||
fi
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
echo "ERROR: sx_server.exe not found." >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
VERBOSE="${1:-}"
|
||||
PASS=0; FAIL=0; ERRORS=""
|
||||
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
|
||||
|
||||
# Common combinator shorthands built per-epoch (Erlang locals don't
|
||||
# survive across erlang-eval-ast calls; the gen_server state does).
|
||||
N1='flow_spec:flow_node(fun(X) -> X + 1 end)'
|
||||
N2='flow_spec:flow_node(fun(X) -> X * 2 end)'
|
||||
SUSP_FLOW='flow_spec:sequence([flow_spec:flow_node(fun(X) -> X + 1 end), flow:suspend(wait1), flow_spec:flow_node(fun(V) -> {resumed, V} end)])'
|
||||
TWO_SUSP='flow_spec:sequence([flow:suspend(a), flow_spec:flow_node(fun(V) -> V * 10 end), flow:suspend(b), flow_spec:flow_node(fun(V) -> V + 1 end)])'
|
||||
|
||||
cat > "$TMPFILE" <<EPOCHS
|
||||
(epoch 1)
|
||||
(load "lib/erlang/tokenizer.sx")
|
||||
(load "lib/erlang/parser.sx")
|
||||
(load "lib/erlang/parser-core.sx")
|
||||
(load "lib/erlang/parser-expr.sx")
|
||||
(load "lib/erlang/parser-module.sx")
|
||||
(load "lib/erlang/transpile.sx")
|
||||
(load "lib/erlang/runtime.sx")
|
||||
(load "lib/erlang/vm/dispatcher.sx")
|
||||
(epoch 2)
|
||||
(eval "(er-load-gen-server!)")
|
||||
(epoch 3)
|
||||
(eval "(get (erlang-load-module (file-read \"next/flow/flow.erl\")) :name)")
|
||||
(epoch 4)
|
||||
(eval "(get (erlang-load-module (file-read \"next/flow/flow_spec.erl\")) :name)")
|
||||
(epoch 5)
|
||||
(eval "(get (erlang-load-module (file-read \"next/flow/flow_store.erl\")) :name)")
|
||||
|
||||
;; ── leaves ─────────────────────────────────────────────────
|
||||
(epoch 10)
|
||||
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:flow_id(), 7) =:= {flow_done, 7}\") :name)")
|
||||
(epoch 11)
|
||||
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:flow_const(k), 7) =:= {flow_done, k}\") :name)")
|
||||
(epoch 12)
|
||||
(eval "(get (erlang-eval-ast \"flow:run(${N1}, 41) =:= {flow_done, 42}\") :name)")
|
||||
|
||||
;; ── threading / fan-out / iteration ────────────────────────
|
||||
(epoch 20)
|
||||
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:sequence([${N1}, ${N2}]), 3) =:= {flow_done, 8}\") :name)")
|
||||
(epoch 21)
|
||||
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:parallel([flow_spec:flow_const(a), flow_spec:flow_const(b)]), 0) =:= {flow_done, [a, b]}\") :name)")
|
||||
(epoch 22)
|
||||
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:map_flow(${N1}), [1, 2, 3]) =:= {flow_done, [2, 3, 4]}\") :name)")
|
||||
(epoch 23)
|
||||
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:flow_while(fun(X) -> X < 10 end, ${N1}, 100), 0) =:= {flow_done, 10}\") :name)")
|
||||
(epoch 24)
|
||||
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:flow_until(fun(X) -> X >= 5 end, ${N1}, 100), 0) =:= {flow_done, 5}\") :name)")
|
||||
(epoch 25)
|
||||
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:flow_while(fun(_) -> true end, ${N1}, 3), 0) =:= {flow_done, 3}\") :name)")
|
||||
|
||||
;; ── branching ──────────────────────────────────────────────
|
||||
(epoch 30)
|
||||
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:branch(fun(X) -> X > 0 end, flow_spec:flow_const(pos), flow_spec:flow_const(neg)), 5) =:= {flow_done, pos}\") :name)")
|
||||
(epoch 31)
|
||||
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:branch(fun(X) -> X > 0 end, flow_spec:flow_const(pos), flow_spec:flow_const(neg)), -5) =:= {flow_done, neg}\") :name)")
|
||||
|
||||
;; ── railway failure ────────────────────────────────────────
|
||||
(epoch 40)
|
||||
(eval "(get (erlang-eval-ast \"flow_spec:failed(flow_spec:fail(x)) andalso (flow_spec:failed(42) =:= false)\") :name)")
|
||||
(epoch 41)
|
||||
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:attempt([flow_spec:flow_node(fun(_) -> flow_spec:fail(boom) end), flow_spec:flow_node(fun(_) -> 999 end)]), 0) =:= {flow_done, {flow_fail, boom}}\") :name)")
|
||||
(epoch 42)
|
||||
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:attempt([${N1}, ${N2}]), 3) =:= {flow_done, 8}\") :name)")
|
||||
(epoch 43)
|
||||
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:recover(flow_spec:flow_node(fun(_) -> flow_spec:fail(bad) end), fun(R) -> {ok, R} end), 0) =:= {flow_done, {ok, bad}}\") :name)")
|
||||
|
||||
;; ── effects / exceptions ───────────────────────────────────
|
||||
(epoch 50)
|
||||
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:tap(fun(_) -> ok end), 7) =:= {flow_done, 7}\") :name)")
|
||||
(epoch 51)
|
||||
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:try_catch(flow_spec:flow_node(fun(_) -> throw(oops) end), fun(E) -> {caught, E} end), 0) =:= {flow_done, {caught, oops}}\") :name)")
|
||||
(epoch 52)
|
||||
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:retry(5, flow_spec:flow_node(fun(X) -> X + 1 end)), 1) =:= {flow_done, 2}\") :name)")
|
||||
|
||||
;; ── suspend / replay (deterministic-replay core) ───────────
|
||||
(epoch 60)
|
||||
(eval "(get (erlang-eval-ast \"flow:run(${SUSP_FLOW}, 0) =:= {flow_suspended, wait1}\") :name)")
|
||||
(epoch 61)
|
||||
(eval "(get (erlang-eval-ast \"flow:drive(${SUSP_FLOW}, 0, [{wait1, 99}]) =:= {flow_done, {resumed, 99}}\") :name)")
|
||||
(epoch 62)
|
||||
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:sequence([flow:suspend(a), flow:suspend(b)]), 0) =:= {flow_suspended, a}\") :name)")
|
||||
;; wait/1 — timer-style suspend that PRESERVES the value on resume
|
||||
(epoch 63)
|
||||
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:sequence([flow:wait(t), flow_spec:flow_node(fun(X) -> X + 1 end)]), 5) =:= {flow_suspended, t}\") :name)")
|
||||
(epoch 64)
|
||||
(eval "(get (erlang-eval-ast \"flow:drive(flow_spec:sequence([flow:wait(t), flow_spec:flow_node(fun(X) -> X + 1 end)]), 5, [{t, ignored}]) =:= {flow_done, 6}\") :name)")
|
||||
|
||||
;; ── durable store: registry ────────────────────────────────
|
||||
(epoch 70)
|
||||
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(f1, ${N1}), flow_store:resolve_flow(f1) =/= not_found andalso flow_store:registered_flows() =:= [f1]\") :name)")
|
||||
(epoch 71)
|
||||
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:resolve_flow(ghost) =:= not_found\") :name)")
|
||||
|
||||
;; ── durable store: start / resume ──────────────────────────
|
||||
;; one-shot flow runs to completion on start
|
||||
(epoch 80)
|
||||
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(done1, ${N1}), flow_store:start(done1, 41) =:= {ok, 1, {flow_done, 42}}\") :name)")
|
||||
;; suspending flow: start suspends, resume completes
|
||||
(epoch 81)
|
||||
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(s1, ${SUSP_FLOW}), {ok, Id, R} = flow_store:start(s1, 10), R =:= {flow_suspended, wait1} andalso flow_store:status(Id) =:= {ok, {suspended, wait1}}\") :name)")
|
||||
(epoch 82)
|
||||
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(s1, ${SUSP_FLOW}), {ok, Id, _} = flow_store:start(s1, 10), flow_store:resume(Id, 99) =:= {ok, {flow_done, {resumed, 99}}}\") :name)")
|
||||
(epoch 83)
|
||||
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(s1, ${SUSP_FLOW}), {ok, Id, _} = flow_store:start(s1, 10), flow_store:resume(Id, 99), flow_store:status(Id) =:= {ok, {done, {resumed, 99}}}\") :name)")
|
||||
;; two-suspend flow: resume chain accumulates the replay log
|
||||
(epoch 84)
|
||||
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(s2, ${TWO_SUSP}), {ok, Id, _} = flow_store:start(s2, 0), {ok, R1} = flow_store:resume(Id, 5), R2 = flow_store:resume(Id, 7), R1 =:= {flow_suspended, b} andalso R2 =:= {ok, {flow_done, 8}}\") :name)")
|
||||
;; error paths
|
||||
(epoch 85)
|
||||
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:start(ghost, 0) =:= {error, no_such_flow}\") :name)")
|
||||
(epoch 86)
|
||||
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:resume(999, x) =:= {error, no_such_instance}\") :name)")
|
||||
(epoch 87)
|
||||
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(done1, ${N1}), {ok, Id, _} = flow_store:start(done1, 0), flow_store:resume(Id, x) =:= {error, already_done}\") :name)")
|
||||
EPOCHS
|
||||
|
||||
OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
|
||||
|
||||
check() {
|
||||
local epoch="$1" desc="$2" expected="$3"
|
||||
local actual
|
||||
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
|
||||
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
|
||||
$0 ~ "^\\(ok " e " " { print; exit }
|
||||
$0 ~ "^\\(error " e " " { print; exit }
|
||||
')
|
||||
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
|
||||
if echo "$actual" | grep -qF -- "$expected"; then
|
||||
PASS=$((PASS+1))
|
||||
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
|
||||
else
|
||||
FAIL=$((FAIL+1))
|
||||
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
|
||||
"
|
||||
fi
|
||||
}
|
||||
|
||||
check 3 "flow module loaded" "flow"
|
||||
check 4 "flow_spec module loaded" "flow_spec"
|
||||
check 5 "flow_store module loaded" "flow_store"
|
||||
check 10 "flow_id" "true"
|
||||
check 11 "flow_const" "true"
|
||||
check 12 "flow_node" "true"
|
||||
check 20 "sequence threads left-to-right" "true"
|
||||
check 21 "parallel fans out" "true"
|
||||
check 22 "map_flow over a list" "true"
|
||||
check 23 "flow_while bounded by pred" "true"
|
||||
check 24 "flow_until bounded by pred" "true"
|
||||
check 25 "flow_while bounded by max" "true"
|
||||
check 30 "branch then-arm" "true"
|
||||
check 31 "branch else-arm" "true"
|
||||
check 40 "failed? predicate" "true"
|
||||
check 41 "attempt stops at first fail" "true"
|
||||
check 42 "attempt threads on success" "true"
|
||||
check 43 "recover handles fail value" "true"
|
||||
check 50 "tap pass-through" "true"
|
||||
check 51 "try_catch catches a raise" "true"
|
||||
check 52 "retry runs node" "true"
|
||||
check 60 "suspend miss short-circuits" "true"
|
||||
check 61 "suspend replay completes" "true"
|
||||
check 62 "first of two suspends wins" "true"
|
||||
check 63 "wait short-circuits on miss" "true"
|
||||
check 64 "wait preserves value on resume" "true"
|
||||
check 70 "register + resolve + list" "true"
|
||||
check 71 "resolve unknown -> not_found" "true"
|
||||
check 80 "start one-shot -> done" "true"
|
||||
check 81 "start suspends + status" "true"
|
||||
check 82 "resume completes" "true"
|
||||
check 83 "status after resume = done" "true"
|
||||
check 84 "two-suspend resume chain" "true"
|
||||
check 85 "start unknown -> no_such_flow" "true"
|
||||
check 86 "resume unknown -> no_such_instance" "true"
|
||||
check 87 "resume a done flow -> already_done" "true"
|
||||
|
||||
TOTAL=$((PASS+FAIL))
|
||||
if [ $FAIL -eq 0 ]; then
|
||||
echo "ok $PASS/$TOTAL flow-on-erlang engine tests passed"
|
||||
else
|
||||
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
|
||||
echo "$ERRORS"
|
||||
fi
|
||||
[ $FAIL -eq 0 ]
|
||||
102
next/flow/flow.erl
Normal file
102
next/flow/flow.erl
Normal file
@@ -0,0 +1,102 @@
|
||||
-module(flow).
|
||||
-export([drive/3, run/2,
|
||||
cont/2, susp/2, is_susp/1, ctx_value/1, ctx_log/1,
|
||||
suspend/1, wait/1, log_lookup/2]).
|
||||
|
||||
%% flow-on-erlang — the deterministic-replay core. A native Erlang port
|
||||
%% of the Scheme flow engine (lib/flow), so the fed-sx kernel can fan
|
||||
%% activities out into durable business flows in its own runtime (no
|
||||
%% cross-guest FFI).
|
||||
%%
|
||||
%% Durability model — identical semantics to the Scheme engine, but
|
||||
%% adapted to this Erlang-on-SX runtime, which has three hard
|
||||
%% constraints the Scheme host doesn't: no escape continuation that can
|
||||
%% be re-entered, no process dictionary, and (critically) a blocking
|
||||
%% `receive` / `gen_server:call` inside a `try` deadlocks the
|
||||
%% cooperative scheduler. So instead of the Scheme engine's
|
||||
%% mutable-global + call/cc-escape, the replay log is THREADED through a
|
||||
%% railway-style context and `suspend` SHORT-CIRCUITS (like a fail
|
||||
%% value) rather than throwing. No ambient state, no throw, no
|
||||
%% gen_server — purely functional, which sidesteps every constraint.
|
||||
%%
|
||||
%% A node is `fun(Ctx) -> Ctx`. A Ctx is one of:
|
||||
%% {flow_cont, Value, Log} — running; Value is the current value
|
||||
%% {flow_susp, Tag, Log} — short-circuited at suspend Tag
|
||||
%% Log is the replay log: [{Tag, ResolvedValue}, ...]. Combinators
|
||||
%% (flow_spec) thread Ctx and pass {flow_susp,...} straight through, so
|
||||
%% once a flow suspends nothing downstream runs.
|
||||
%%
|
||||
%% suspend/1 is the load-bearing primitive: a node that, given the
|
||||
%% running Ctx, looks Tag up in the replay log. A hit replaces the
|
||||
%% current value with the logged value and continues; a miss
|
||||
%% short-circuits to {flow_susp, Tag, Log}. ALL effects/non-determinism
|
||||
%% go through a suspend node so they run once — in the driver, between
|
||||
%% drives — and their results are logged, never re-run on replay. Tags
|
||||
%% must be unique and deterministic across replays.
|
||||
|
||||
%% ── context constructors / accessors ────────────────────────────
|
||||
|
||||
cont(Value, Log) -> {flow_cont, Value, Log}.
|
||||
susp(Tag, Log) -> {flow_susp, Tag, Log}.
|
||||
|
||||
is_susp({flow_susp, _, _}) -> true;
|
||||
is_susp(_) -> false.
|
||||
|
||||
ctx_value({flow_cont, Value, _}) -> Value;
|
||||
ctx_value({flow_susp, _, _}) -> undefined.
|
||||
|
||||
ctx_log({flow_cont, _, Log}) -> Log;
|
||||
ctx_log({flow_susp, _, Log}) -> Log.
|
||||
|
||||
%% ── suspend node ────────────────────────────────────────────────
|
||||
|
||||
suspend(Tag) ->
|
||||
fun (Ctx) ->
|
||||
case Ctx of
|
||||
{flow_susp, _, _} -> Ctx;
|
||||
{flow_cont, _Value, Log} ->
|
||||
case log_lookup(Tag, Log) of
|
||||
{ok, Resolved} -> {flow_cont, Resolved, Log};
|
||||
miss -> {flow_susp, Tag, Log}
|
||||
end
|
||||
end
|
||||
end.
|
||||
|
||||
%% wait(Tag) — a timer-style suspend that PRESERVES the current value
|
||||
%% instead of replacing it with the resolved one. Use it for pure
|
||||
%% waits ("resume in the morning") where the resume is just a signal,
|
||||
%% not a result: on the first pass it short-circuits like suspend; once
|
||||
%% Tag is in the log the value flows through unchanged, so downstream
|
||||
%% steps still see the value (e.g. the env) they had before the wait.
|
||||
wait(Tag) ->
|
||||
fun (Ctx) ->
|
||||
case Ctx of
|
||||
{flow_susp, _, _} -> Ctx;
|
||||
{flow_cont, Value, Log} ->
|
||||
case log_lookup(Tag, Log) of
|
||||
{ok, _} -> {flow_cont, Value, Log};
|
||||
miss -> {flow_susp, Tag, Log}
|
||||
end
|
||||
end
|
||||
end.
|
||||
|
||||
log_lookup(_, []) -> miss;
|
||||
log_lookup(Tag, [{Tag, Value} | _]) -> {ok, Value};
|
||||
log_lookup(Tag, [_ | Rest]) -> log_lookup(Tag, Rest).
|
||||
|
||||
%% ── driver ──────────────────────────────────────────────────────
|
||||
|
||||
%% drive(Flow, Input, Log) — run Flow under the replay Log.
|
||||
%% {flow_done, Result} — flow completed
|
||||
%% {flow_suspended, Tag} — flow short-circuited at an unresolved
|
||||
%% suspend; the driver resolves Tag, appends
|
||||
%% {Tag, Value} to Log, and re-drives.
|
||||
drive(Flow, Input, Log) ->
|
||||
case Flow({flow_cont, Input, Log}) of
|
||||
{flow_cont, Result, _} -> {flow_done, Result};
|
||||
{flow_susp, Tag, _} -> {flow_suspended, Tag}
|
||||
end.
|
||||
|
||||
%% run(Flow, Input) — drive with an empty replay log.
|
||||
run(Flow, Input) ->
|
||||
drive(Flow, Input, []).
|
||||
240
next/flow/flow_spec.erl
Normal file
240
next/flow/flow_spec.erl
Normal file
@@ -0,0 +1,240 @@
|
||||
-module(flow_spec).
|
||||
-export([flow_node/1, flow_id/0, flow_const/1,
|
||||
sequence/1, parallel/1, map_flow/1,
|
||||
flow_while/3, flow_until/3,
|
||||
branch/3, fail/1, failed/1, fail_reason/1,
|
||||
recover/2, tap/1, attempt/1, try_catch/2, retry/2]).
|
||||
|
||||
%% flow-on-erlang combinators — a native port of lib/flow/spec.sx,
|
||||
%% adapted to the railway-threaded context model in flow.erl. A node is
|
||||
%% `fun(Ctx) -> Ctx`; every combinator passes a {flow_susp,...} context
|
||||
%% straight through, so once a flow suspends nothing downstream runs.
|
||||
%% User code stays value-level: the predicates/functions handed to
|
||||
%% flow_node / branch / etc. take and return plain values, and the
|
||||
%% combinator threads them into the context.
|
||||
%%
|
||||
%% Variadic Scheme forms (sequence, parallel, attempt) take an explicit
|
||||
%% list here — the one idiom difference from the Scheme engine. Effects
|
||||
%% must go through a flow:suspend/1 node so they run once (in the
|
||||
%% driver) and replay from the log; `tap` is only for replay-safe
|
||||
%% effects (e.g. tracing).
|
||||
|
||||
%% ── leaves ──────────────────────────────────────────────────────
|
||||
|
||||
%% flow_node(F) — lift a value function F :: Value -> Value into a node.
|
||||
flow_node(F) ->
|
||||
fun (Ctx) ->
|
||||
case flow:is_susp(Ctx) of
|
||||
true -> Ctx;
|
||||
false -> flow:cont(F(flow:ctx_value(Ctx)), flow:ctx_log(Ctx))
|
||||
end
|
||||
end.
|
||||
|
||||
flow_id() ->
|
||||
fun (Ctx) -> Ctx end.
|
||||
|
||||
flow_const(V) ->
|
||||
fun (Ctx) ->
|
||||
case flow:is_susp(Ctx) of
|
||||
true -> Ctx;
|
||||
false -> flow:cont(V, flow:ctx_log(Ctx))
|
||||
end
|
||||
end.
|
||||
|
||||
%% ── threading / fan-out / iteration ─────────────────────────────
|
||||
|
||||
%% sequence(Nodes) — thread the context left-to-right. Each node
|
||||
%% self-guards on suspension, so a suspended context flows through
|
||||
%% untouched.
|
||||
sequence(Nodes) ->
|
||||
fun (Ctx) -> seq_step(Nodes, Ctx) end.
|
||||
|
||||
seq_step([], Ctx) -> Ctx;
|
||||
seq_step([N | Ns], Ctx) -> seq_step(Ns, N(Ctx)).
|
||||
|
||||
%% parallel(Nodes) — fan the input value to every node, join results
|
||||
%% into a list (sequential evaluation under one shared replay log).
|
||||
%% First child to suspend short-circuits the whole parallel.
|
||||
parallel(Nodes) ->
|
||||
fun (Ctx) ->
|
||||
case flow:is_susp(Ctx) of
|
||||
true -> Ctx;
|
||||
false -> par_step(Nodes, flow:ctx_value(Ctx), flow:ctx_log(Ctx), [])
|
||||
end
|
||||
end.
|
||||
|
||||
par_step([], _Input, Log, Acc) ->
|
||||
flow:cont(lists:reverse(Acc), Log);
|
||||
par_step([N | Ns], Input, Log, Acc) ->
|
||||
R = N(flow:cont(Input, Log)),
|
||||
case flow:is_susp(R) of
|
||||
true -> R;
|
||||
false -> par_step(Ns, Input, Log, [flow:ctx_value(R) | Acc])
|
||||
end.
|
||||
|
||||
%% map_flow(Node) — run Node over each item of a list input value.
|
||||
map_flow(Node) ->
|
||||
fun (Ctx) ->
|
||||
case flow:is_susp(Ctx) of
|
||||
true -> Ctx;
|
||||
false -> map_step(Node, flow:ctx_value(Ctx), flow:ctx_log(Ctx), [])
|
||||
end
|
||||
end.
|
||||
|
||||
map_step(_, [], Log, Acc) ->
|
||||
flow:cont(lists:reverse(Acc), Log);
|
||||
map_step(Node, [I | Is], Log, Acc) ->
|
||||
R = Node(flow:cont(I, Log)),
|
||||
case flow:is_susp(R) of
|
||||
true -> R;
|
||||
false -> map_step(Node, Is, Log, [flow:ctx_value(R) | Acc])
|
||||
end.
|
||||
|
||||
%% flow_while(Pred, Body, Max) — re-run Body (a node), threading the
|
||||
%% context, while Pred(value) holds, up to Max steps. Pred :: Value ->
|
||||
%% bool; Body :: node.
|
||||
flow_while(Pred, Body, Max) ->
|
||||
fun (Ctx) -> while_step(Pred, Body, Ctx, Max) end.
|
||||
|
||||
while_step(_, _, Ctx, N) when N =< 0 -> Ctx;
|
||||
while_step(Pred, Body, Ctx, N) ->
|
||||
case flow:is_susp(Ctx) of
|
||||
true -> Ctx;
|
||||
false ->
|
||||
case Pred(flow:ctx_value(Ctx)) of
|
||||
true -> while_step(Pred, Body, Body(Ctx), N - 1);
|
||||
_ -> Ctx
|
||||
end
|
||||
end.
|
||||
|
||||
%% flow_until(Pred, Body, Max) — re-run Body until Pred(value) holds.
|
||||
flow_until(Pred, Body, Max) ->
|
||||
fun (Ctx) -> until_step(Pred, Body, Ctx, Max) end.
|
||||
|
||||
until_step(_, _, Ctx, N) when N =< 0 -> Ctx;
|
||||
until_step(Pred, Body, Ctx, N) ->
|
||||
case flow:is_susp(Ctx) of
|
||||
true -> Ctx;
|
||||
false ->
|
||||
case Pred(flow:ctx_value(Ctx)) of
|
||||
true -> Ctx;
|
||||
_ -> until_step(Pred, Body, Body(Ctx), N - 1)
|
||||
end
|
||||
end.
|
||||
|
||||
%% ── branching ───────────────────────────────────────────────────
|
||||
|
||||
%% branch(Pred, Then, Else) — Pred :: Value -> bool; Then/Else :: node.
|
||||
branch(Pred, Then, Else) ->
|
||||
fun (Ctx) ->
|
||||
case flow:is_susp(Ctx) of
|
||||
true -> Ctx;
|
||||
false ->
|
||||
case Pred(flow:ctx_value(Ctx)) of
|
||||
true -> Then(Ctx);
|
||||
_ -> Else(Ctx)
|
||||
end
|
||||
end
|
||||
end.
|
||||
|
||||
%% ── railway-style failure (values, not exceptions) ──────────────
|
||||
|
||||
fail(Reason) -> {flow_fail, Reason}.
|
||||
|
||||
failed({flow_fail, _}) -> true;
|
||||
failed(_) -> false.
|
||||
|
||||
fail_reason({flow_fail, R}) -> R.
|
||||
|
||||
%% recover(Node, Handler) — if Node yields a fail VALUE, run Handler on
|
||||
%% the reason; else pass through. Handler :: Reason -> Value.
|
||||
recover(Node, Handler) ->
|
||||
fun (Ctx) ->
|
||||
R = Node(Ctx),
|
||||
case flow:is_susp(R) of
|
||||
true -> R;
|
||||
false ->
|
||||
V = flow:ctx_value(R),
|
||||
case failed(V) of
|
||||
true -> flow:cont(Handler(fail_reason(V)), flow:ctx_log(R));
|
||||
false -> R
|
||||
end
|
||||
end
|
||||
end.
|
||||
|
||||
%% tap(Effect) — replay-safe side-effecting pass-through (returns the
|
||||
%% input value unchanged). Effect :: Value -> any.
|
||||
tap(Effect) ->
|
||||
fun (Ctx) ->
|
||||
case flow:is_susp(Ctx) of
|
||||
true -> Ctx;
|
||||
false -> Effect(flow:ctx_value(Ctx)), Ctx
|
||||
end
|
||||
end.
|
||||
|
||||
%% attempt(Nodes) — railway sequence: thread left-to-right but stop at
|
||||
%% the first node whose value is a fail, returning that failure.
|
||||
attempt(Nodes) ->
|
||||
fun (Ctx) -> attempt_step(Nodes, Ctx) end.
|
||||
|
||||
attempt_step([], Ctx) -> Ctx;
|
||||
attempt_step([N | Ns], Ctx) ->
|
||||
case flow:is_susp(Ctx) of
|
||||
true -> Ctx;
|
||||
false ->
|
||||
case failed(flow:ctx_value(Ctx)) of
|
||||
true -> Ctx;
|
||||
false -> attempt_step(Ns, N(Ctx))
|
||||
end
|
||||
end.
|
||||
|
||||
%% ── exception-style control ─────────────────────────────────────
|
||||
%% Nodes are pure (effects go through suspend, run by the driver), so a
|
||||
%% try around a node never wraps a blocking receive — safe in this
|
||||
%% runtime.
|
||||
|
||||
%% try_catch(Node, Handler) — run Node; if it raises, run Handler on the
|
||||
%% exception. Handler :: Exception -> Value.
|
||||
try_catch(Node, Handler) ->
|
||||
fun (Ctx) ->
|
||||
case flow:is_susp(Ctx) of
|
||||
true -> Ctx;
|
||||
false ->
|
||||
Log = flow:ctx_log(Ctx),
|
||||
try Node(Ctx) of
|
||||
R -> R
|
||||
catch
|
||||
throw:E -> flow:cont(Handler(E), Log);
|
||||
error:E -> flow:cont(Handler(E), Log);
|
||||
exit:E -> flow:cont(Handler(E), Log)
|
||||
end
|
||||
end
|
||||
end.
|
||||
|
||||
%% retry(N, Node) — run Node, retrying up to N attempts on a raise.
|
||||
retry(N, Node) ->
|
||||
fun (Ctx) -> retry_step(N, Node, Ctx) end.
|
||||
|
||||
retry_step(N, Node, Ctx) ->
|
||||
case flow:is_susp(Ctx) of
|
||||
true -> Ctx;
|
||||
false ->
|
||||
try Node(Ctx) of
|
||||
R -> R
|
||||
catch
|
||||
throw:Reason -> retry_reraise(N, Node, Ctx, throw, Reason);
|
||||
error:Reason -> retry_reraise(N, Node, Ctx, error, Reason);
|
||||
exit:Reason -> retry_reraise(N, Node, Ctx, exit, Reason)
|
||||
end
|
||||
end.
|
||||
|
||||
retry_reraise(N, Node, Ctx, Class, Reason) ->
|
||||
case N =< 1 of
|
||||
false -> retry_step(N - 1, Node, Ctx);
|
||||
true ->
|
||||
case Class of
|
||||
throw -> throw(Reason);
|
||||
error -> erlang:error(Reason);
|
||||
exit -> exit(Reason)
|
||||
end
|
||||
end.
|
||||
161
next/flow/flow_store.erl
Normal file
161
next/flow/flow_store.erl
Normal file
@@ -0,0 +1,161 @@
|
||||
-module(flow_store).
|
||||
-export([start_link/0, start_link/1, stop/0,
|
||||
register_flow/2, resolve_flow/1, registered_flows/0,
|
||||
start/2, resume/2, status/1, instances/0]).
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% flow-on-erlang durable store — the named-flow registry plus the
|
||||
%% instance table that makes suspend/resume durable. flow.erl is the
|
||||
%% pure replay driver; this gen_server is the stateful shell around it,
|
||||
%% holding the registry (so triggers can reference flows by name, and
|
||||
%% so an instance can be re-resolved + replayed after a restart) and
|
||||
%% each instance's accumulated replay log.
|
||||
%%
|
||||
%% Crucially the driver stays OUT of any blocking context: start/resume
|
||||
%% call flow:drive/3 (pure — no receive, no gen_server:call) from inside
|
||||
%% handle_call, and the only message-passing is the caller's
|
||||
%% gen_server:call into this store. (A blocking receive inside a `try`
|
||||
%% deadlocks this cooperative scheduler, so the engine never does one.)
|
||||
%%
|
||||
%% State: {Registry, Instances, NextId}
|
||||
%% Registry = [{Name, FlowFun}, ...]
|
||||
%% Instances = [{Id, {Name, Input, Log, Status}}, ...]
|
||||
%% Status = {suspended, Tag} | {done, Result}
|
||||
%% Log = [{Tag, ResolvedValue}, ...] (the replay log — plain
|
||||
%% data, so an instance is fully described by its log and
|
||||
%% survives process restart by re-driving the named flow)
|
||||
%%
|
||||
%% v1 backs the store in gen_server memory; persisting the instance
|
||||
%% logs to the kernel's durable log (so flows survive an OS restart) is
|
||||
%% a later layer — the data shape is already restart-ready.
|
||||
|
||||
start_link() ->
|
||||
start_link([]).
|
||||
|
||||
start_link(InitialFlows) ->
|
||||
Pid = gen_server:start_link(flow_store, [InitialFlows]),
|
||||
erlang:register(flow_store, Pid),
|
||||
Pid.
|
||||
|
||||
stop() ->
|
||||
R = gen_server:call(flow_store, '$gen_stop'),
|
||||
erlang:unregister(flow_store),
|
||||
R.
|
||||
|
||||
%% register_flow(Name, Flow) — register a named flow (a node fun). Named
|
||||
%% rather than `register` to avoid the erlang:register/2 auto-import.
|
||||
register_flow(Name, Flow) ->
|
||||
gen_server:call(flow_store, {register_flow, Name, Flow}).
|
||||
|
||||
resolve_flow(Name) ->
|
||||
gen_server:call(flow_store, {resolve_flow, Name}).
|
||||
|
||||
registered_flows() ->
|
||||
gen_server:call(flow_store, registered_flows).
|
||||
|
||||
%% start(Name, Input) -> {ok, Id, Result} | {error, no_such_flow}.
|
||||
%% Result is {flow_done, V} | {flow_suspended, Tag}; the instance is
|
||||
%% recorded either way so a suspended flow can be resumed by Id.
|
||||
start(Name, Input) ->
|
||||
gen_server:call(flow_store, {start, Name, Input}).
|
||||
|
||||
%% resume(Id, Value) -> {ok, Result} | {error, Reason}. Resolves the
|
||||
%% instance's current suspend tag with Value (appends {Tag, Value} to
|
||||
%% its replay log) and re-drives from the top.
|
||||
resume(Id, Value) ->
|
||||
gen_server:call(flow_store, {resume, Id, Value}).
|
||||
|
||||
%% status(Id) -> {ok, {suspended, Tag}} | {ok, {done, Result}} | not_found
|
||||
status(Id) ->
|
||||
gen_server:call(flow_store, {status, Id}).
|
||||
|
||||
instances() ->
|
||||
gen_server:call(flow_store, instances).
|
||||
|
||||
%% ── gen_server ──────────────────────────────────────────────────
|
||||
|
||||
init([InitialFlows]) ->
|
||||
{ok, {InitialFlows, [], 1}}.
|
||||
|
||||
handle_call({register_flow, Name, Flow}, _From, {Reg, Ins, N}) ->
|
||||
{reply, ok, {set_keyed(Name, Flow, Reg), Ins, N}};
|
||||
handle_call({resolve_flow, Name}, _From, {Reg, Ins, N}) ->
|
||||
{reply, find_keyed(Name, Reg), {Reg, Ins, N}};
|
||||
handle_call(registered_flows, _From, {Reg, Ins, N}) ->
|
||||
{reply, [Name || {Name, _} <- Reg], {Reg, Ins, N}};
|
||||
handle_call({start, Name, Input}, _From, {Reg, Ins, N}) ->
|
||||
case find_keyed(Name, Reg) of
|
||||
not_found ->
|
||||
{reply, {error, no_such_flow}, {Reg, Ins, N}};
|
||||
{ok, Flow} ->
|
||||
case safe_drive(Flow, Input, []) of
|
||||
{ok, R} ->
|
||||
Status = result_status(R),
|
||||
Ins2 = set_keyed(N, {Name, Input, [], Status}, Ins),
|
||||
{reply, {ok, N, R}, {Reg, Ins2, N + 1}};
|
||||
{error, Crash} ->
|
||||
{reply, {error, {flow_crashed, Crash}}, {Reg, Ins, N}}
|
||||
end
|
||||
end;
|
||||
handle_call({resume, Id, Value}, _From, {Reg, Ins, N}) ->
|
||||
case find_keyed(Id, Ins) of
|
||||
not_found ->
|
||||
{reply, {error, no_such_instance}, {Reg, Ins, N}};
|
||||
{ok, {_Name, _Input, _Log, {done, _}}} ->
|
||||
{reply, {error, already_done}, {Reg, Ins, N}};
|
||||
{ok, {Name, Input, Log, {suspended, Tag}}} ->
|
||||
case find_keyed(Name, Reg) of
|
||||
not_found ->
|
||||
{reply, {error, no_such_flow}, {Reg, Ins, N}};
|
||||
{ok, Flow} ->
|
||||
NewLog = log_append(Log, Tag, Value),
|
||||
case safe_drive(Flow, Input, NewLog) of
|
||||
{ok, R} ->
|
||||
Status = result_status(R),
|
||||
Ins2 = set_keyed(Id, {Name, Input, NewLog, Status}, Ins),
|
||||
{reply, {ok, R}, {Reg, Ins2, N}};
|
||||
{error, Crash} ->
|
||||
{reply, {error, {flow_crashed, Crash}}, {Reg, Ins, N}}
|
||||
end
|
||||
end
|
||||
end;
|
||||
handle_call({status, Id}, _From, {Reg, Ins, N}) ->
|
||||
case find_keyed(Id, Ins) of
|
||||
{ok, {_Name, _Input, _Log, Status}} -> {reply, {ok, Status}, {Reg, Ins, N}};
|
||||
not_found -> {reply, not_found, {Reg, Ins, N}}
|
||||
end;
|
||||
handle_call(instances, _From, {Reg, Ins, N}) ->
|
||||
{reply, [Id || {Id, _} <- Ins], {Reg, Ins, N}}.
|
||||
|
||||
handle_cast(_, S) -> {noreply, S}.
|
||||
|
||||
handle_info(_, S) -> {noreply, S}.
|
||||
|
||||
%% ── helpers ─────────────────────────────────────────────────────
|
||||
|
||||
result_status({flow_done, R}) -> {done, R};
|
||||
result_status({flow_suspended, T}) -> {suspended, T}.
|
||||
|
||||
%% safe_drive/3 — flow:drive is pure (no blocking receive), so a `try`
|
||||
%% around it is safe in this runtime and isolates a flow whose step
|
||||
%% raises: the store returns {error, {flow_crashed, _}} instead of the
|
||||
%% gen_server crashing, keeping one bad flow from taking down others.
|
||||
safe_drive(Flow, Input, Log) ->
|
||||
try {ok, flow:drive(Flow, Input, Log)}
|
||||
catch
|
||||
throw:R -> {error, {throw, R}};
|
||||
error:R -> {error, {error, R}};
|
||||
exit:R -> {error, {exit, R}}
|
||||
end.
|
||||
|
||||
log_append([], Tag, Value) -> [{Tag, Value}];
|
||||
log_append([H | T], Tag, Value) -> [H | log_append(T, Tag, Value)].
|
||||
|
||||
find_keyed(_, []) -> not_found;
|
||||
find_keyed(K, [{K, V} | _]) -> {ok, V};
|
||||
find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest).
|
||||
|
||||
set_keyed(K, V, []) -> [{K, V}];
|
||||
set_keyed(K, V, [{K, _} | Rest]) -> [{K, V} | Rest];
|
||||
set_keyed(K, V, [P | Rest]) -> [P | set_keyed(K, V, Rest)].
|
||||
81
next/flow/flows/blog_publish_digest.erl
Normal file
81
next/flow/flows/blog_publish_digest.erl
Normal file
@@ -0,0 +1,81 @@
|
||||
-module(blog_publish_digest).
|
||||
-export([build/1]).
|
||||
|
||||
%% A motivating multi-step business flow for the fed-sx-triggers e2e:
|
||||
%% when an Article is published, decide a batch policy by category,
|
||||
%% (for newsletters) wait until morning, fetch the author's followers,
|
||||
%% build a digest email for each, and emit a DigestSent activity — the
|
||||
%% flow's own output, which a driver appends, closing the loop so it can
|
||||
%% trigger downstream flows.
|
||||
%%
|
||||
%% Demonstrates: a branch on an activity field (:category), a timer
|
||||
%% suspension (flow:wait/1, resumed by advancing the clock), an injected
|
||||
%% effect (fetch_followers), and a follow-up activity emit.
|
||||
%%
|
||||
%% Effect-as-data: a flow runs inside flow_store's drive, where a
|
||||
%% blocking call (e.g. into nx_kernel) would deadlock this scheduler, so
|
||||
%% the flow does NOT perform IO itself. It DESCRIBES the effects in its
|
||||
%% result — {digest_sent, Emails, DigestActivityObject} — and the driver
|
||||
%% (the fan-out caller) dispatches the emails and appends the DigestSent
|
||||
%% activity. fetch_followers is injected (the one external read) as a
|
||||
%% pure function so the e2e can supply a deterministic list.
|
||||
%%
|
||||
%% Input env (from flow_dispatch): [{activity, A}, {actor, Actor}, ...].
|
||||
%% Result: {digest_sent, [Email], DigestObject} | skipped.
|
||||
|
||||
build(Effects) ->
|
||||
FetchFollowers = field(fetch_followers, Effects),
|
||||
flow_spec:branch(
|
||||
fun (Env) -> is_article(Env) end,
|
||||
flow_spec:branch(
|
||||
fun (Env) -> category_is(Env, newsletter) end,
|
||||
%% newsletter: hold until morning, then send + emit
|
||||
flow_spec:sequence([flow:wait(morning), send_emit(FetchFollowers)]),
|
||||
flow_spec:branch(
|
||||
fun (Env) -> category_is(Env, urgent) end,
|
||||
%% urgent: send + emit now (no wait)
|
||||
send_emit(FetchFollowers),
|
||||
%% any other category: skip
|
||||
flow_spec:flow_const(skipped))),
|
||||
%% not an Article: skip
|
||||
flow_spec:flow_const(skipped)).
|
||||
|
||||
%% send_emit(FetchFollowers) — the terminal step: build one digest email
|
||||
%% per follower and the DigestSent emit object. Pure given the injected
|
||||
%% follower list, so it is replay-safe (and it sits after the only
|
||||
%% suspend point, so it runs exactly once).
|
||||
send_emit(FetchFollowers) ->
|
||||
flow_spec:flow_node(
|
||||
fun (Env) ->
|
||||
Activity = env_activity(Env),
|
||||
Actor = env_actor(Env),
|
||||
ArtId = activity_id(Activity),
|
||||
Followers = FetchFollowers(Actor),
|
||||
Emails = [ [{to, F}, {article, ArtId}] || F <- Followers ],
|
||||
Digest = [{type, digest_sent},
|
||||
{for, ArtId},
|
||||
{follower_count, length(Followers)}],
|
||||
{digest_sent, Emails, Digest}
|
||||
end).
|
||||
|
||||
%% ── predicates / accessors ──────────────────────────────────────
|
||||
|
||||
is_article(Env) ->
|
||||
object_type(object_of(env_activity(Env))) =:= article.
|
||||
|
||||
category_is(Env, Cat) ->
|
||||
object_category(object_of(env_activity(Env))) =:= Cat.
|
||||
|
||||
env_activity(Env) -> field(activity, Env).
|
||||
env_actor(Env) -> field(actor, Env).
|
||||
|
||||
object_of(Activity) -> field(object, Activity).
|
||||
object_type(Obj) -> field(type, Obj).
|
||||
object_category(Obj) -> field(category, Obj).
|
||||
activity_id(Activity) -> field(id, Activity).
|
||||
|
||||
field(Key, Proplist) ->
|
||||
case envelope:get_field(Key, Proplist) of
|
||||
{ok, V} -> V;
|
||||
_ -> undefined
|
||||
end.
|
||||
33
next/genesis/activity-types/define_trigger.sx
Normal file
33
next/genesis/activity-types/define_trigger.sx
Normal file
@@ -0,0 +1,33 @@
|
||||
;; next/genesis/activity-types/define_trigger.sx
|
||||
;;
|
||||
;; Bootstrap definition of the DefineTrigger verb per
|
||||
;; plans/agent-briefings/fed-sx-triggers-loop.md (Phase 1) and
|
||||
;; plans/fed-sx-design.md §13. Read as data by the bundler
|
||||
;; (bootstrap.erl) — never evaluated as code.
|
||||
;;
|
||||
;; DefineTrigger binds an activity-type to a flow. When a matching
|
||||
;; activity is appended to the log, the kernel's trigger fan-out
|
||||
;; (pipeline.erl, post-append) looks the type up in the trigger
|
||||
;; registry and starts the named flow with the activity as input.
|
||||
;; The activity's :object is the binding record:
|
||||
;; {:activity-type "Create" ;; the verb to fire on
|
||||
;; :flow-name "blog-publish-digest"
|
||||
;; :guard <optional predicate> ;; discriminator
|
||||
;; :actor-scope <optional actor id>} ;; default: any
|
||||
;;
|
||||
;; The schema validates the *activity* shape: :object present with
|
||||
;; string :activity-type and :flow-name. The optional :guard lets one
|
||||
;; type bind to multiple flows with discriminators; it is resolved to
|
||||
;; an Erlang predicate at registration time (trigger_registry), not
|
||||
;; carried in the pure-predicate schema here. Schema bodies use nested
|
||||
;; `get` (not keyword-threading) so the predicate is evaluatable.
|
||||
(DefineActivity
|
||||
:name "DefineTrigger"
|
||||
:doc "Bind an activity-type to a flow. :object carries :activity-type, :flow-name, and optional :guard and :actor-scope."
|
||||
:schema (fn
|
||||
(act)
|
||||
(and
|
||||
(not (nil? (get act :object)))
|
||||
(string? (get (get act :object) :activity-type))
|
||||
(string? (get (get act :object) :flow-name))))
|
||||
:semantics (fn (state act) state))
|
||||
34
next/genesis/activity-types/define_type.sx
Normal file
34
next/genesis/activity-types/define_type.sx
Normal file
@@ -0,0 +1,34 @@
|
||||
;; next/genesis/activity-types/define_type.sx
|
||||
;;
|
||||
;; Bootstrap definition of the DefineType verb per
|
||||
;; plans/fed-sx-host-types.md (host-type federation, Phase 1).
|
||||
;; Read as data by the bundler (bootstrap.erl) — never evaluated as
|
||||
;; code. The :schema and :semantics bodies are SX source.
|
||||
;;
|
||||
;; DefineType declares a refinement type. The activity's :object is
|
||||
;; the type record:
|
||||
;; {:name "Post" ;; the type's display name
|
||||
;; :fields (...) ;; optional field descriptors
|
||||
;; :refinement-schema (fn (obj) ...) ;; predicate over instances
|
||||
;; :instance-type "Note"} ;; base object-type it refines
|
||||
;;
|
||||
;; The schema below validates the *activity* shape: :object present,
|
||||
;; :name a string, :fields (when present) a list. The richer
|
||||
;; per-field shape check and the registry registration land with the
|
||||
;; peer_types cache (Phase 2) — at this phase the form is pure data.
|
||||
;;
|
||||
;; Schema bodies use nested `get` rather than keyword-threading so
|
||||
;; the predicate is directly evaluatable (keywords are not callable
|
||||
;; getters in the kernel; `(-> d :k)` is not a get).
|
||||
(DefineActivity
|
||||
:name "DefineType"
|
||||
:doc "Declare a refinement type. :object carries :name, optional :fields, :refinement-schema, and :instance-type."
|
||||
:schema (fn
|
||||
(act)
|
||||
(and
|
||||
(not (nil? (get act :object)))
|
||||
(string? (get (get act :object) :name))
|
||||
(or
|
||||
(nil? (get (get act :object) :fields))
|
||||
(list? (get (get act :object) :fields)))))
|
||||
:semantics (fn (state act) state))
|
||||
31
next/genesis/activity-types/subtype_of.sx
Normal file
31
next/genesis/activity-types/subtype_of.sx
Normal file
@@ -0,0 +1,31 @@
|
||||
;; next/genesis/activity-types/subtype_of.sx
|
||||
;;
|
||||
;; Bootstrap definition of the SubtypeOf verb per
|
||||
;; plans/fed-sx-host-types.md (host-type federation, Phase 1).
|
||||
;; Read as data by the bundler (bootstrap.erl) — never evaluated as
|
||||
;; code. The :schema and :semantics bodies are SX source.
|
||||
;;
|
||||
;; SubtypeOf records a hierarchy edge between two previously-defined
|
||||
;; types. The activity's :object is the relation record:
|
||||
;; {:child-type-cid "bafy...child"
|
||||
;; :parent-type-cid "bafy...parent"}
|
||||
;;
|
||||
;; The schema validates the *activity* shape: both CIDs present and
|
||||
;; string-typed. Verifying that each CID names a previously-defined
|
||||
;; type is a registry concern (it needs the type index that lands
|
||||
;; with peer_types in Phase 2), so it is deliberately out of the
|
||||
;; pure-predicate schema here — adding the edge to the hierarchy
|
||||
;; index is the :semantics' job once the registry surface exists.
|
||||
;;
|
||||
;; Schema bodies use nested `get` rather than keyword-threading so
|
||||
;; the predicate is directly evaluatable.
|
||||
(DefineActivity
|
||||
:name "SubtypeOf"
|
||||
:doc "Record a subtype edge. :object carries :child-type-cid and :parent-type-cid, both type CIDs."
|
||||
:schema (fn
|
||||
(act)
|
||||
(and
|
||||
(not (nil? (get act :object)))
|
||||
(string? (get (get act :object) :child-type-cid))
|
||||
(string? (get (get act :object) :parent-type-cid))))
|
||||
:semantics (fn (state act) state))
|
||||
@@ -22,7 +22,10 @@
|
||||
"activity-types/update.sx"
|
||||
"activity-types/delete.sx"
|
||||
"activity-types/announce.sx"
|
||||
"activity-types/endorse.sx")
|
||||
"activity-types/endorse.sx"
|
||||
"activity-types/define_type.sx"
|
||||
"activity-types/subtype_of.sx"
|
||||
"activity-types/define_trigger.sx")
|
||||
:object-types ("object-types/sx-artifact.sx"
|
||||
"object-types/note.sx"
|
||||
"object-types/tombstone.sx"
|
||||
|
||||
118
next/kernel/discovery_type_fetch.erl
Normal file
118
next/kernel/discovery_type_fetch.erl
Normal file
@@ -0,0 +1,118 @@
|
||||
-module(discovery_type_fetch).
|
||||
-export([make_fetch_fn/0, make_fetch_fn/1,
|
||||
fetch/2,
|
||||
type_doc_url/2,
|
||||
resolve_type_url/2,
|
||||
accept_header/0]).
|
||||
|
||||
%% Live type-doc fetch for peer_types — host-type federation Step 3,
|
||||
%% the sibling of discovery_fetch.erl. peer_types:lookup_or_fetch/3
|
||||
%% calls a Cfg-supplied type_fetch_fn :: fun ((TypeCid, Cfg) -> {ok,
|
||||
%% Bytes} | {error, _}) on a cache miss; this module produces that
|
||||
%% closure for live federation. It GETs <base>/types/<cid> with an
|
||||
%% Accept header that asks for the type-doc format (http_server.erl
|
||||
%% Step 3) and returns the RAW response bytes — peer_types decodes
|
||||
%% them via term_codec into the TypeRecord. (This is the one shape
|
||||
%% difference from discovery_fetch, whose closure returns an already-
|
||||
%% decoded actor-state: there the cache stores the decoded AS, here
|
||||
%% peer_types owns the decode so the type-doc wire format lives in one
|
||||
%% place — the /types/ route encodes, peer_types decodes.)
|
||||
%%
|
||||
%% Cfg shape (parallels discovery_fetch's peer URL resolution):
|
||||
%% {type_url, [{TypeCid, BaseUrl}, ...]}
|
||||
%% {type_url_fn, fun ((TypeCid) -> {ok, BaseUrl} | not_found)}
|
||||
%%
|
||||
%% BaseUrl shape: <<"http://host:port">> (no trailing slash; this
|
||||
%% module appends the path). TypeCid is the type's CID bytes.
|
||||
%%
|
||||
%% Outcomes:
|
||||
%% 2xx -> {ok, Bytes}
|
||||
%% non-2xx -> {error, {status, N}}
|
||||
%% resolver miss -> {error, no_type_url}
|
||||
%% transport -> {error, Reason}
|
||||
|
||||
%% ── Accept header ────────────────────────────────────────────
|
||||
%% "application/vnd.fed-sx.type-doc" — same MIME http_server's
|
||||
%% content_type_for(type_doc) emits, so the Accept negotiation routes
|
||||
%% the served bytes to the term_codec-encoded TypeRecord arm.
|
||||
accept_header() ->
|
||||
<<97,112,112,108,105,99,97,116,105,111,110,47,
|
||||
118,110,100,46,102,101,100,45,115,120,46,
|
||||
116,121,112,101,45,100,111,99>>.
|
||||
|
||||
%% ── public API ───────────────────────────────────────────────
|
||||
|
||||
%% make_fetch_fn/0 — the fun/2 peer_types:lookup_or_fetch calls. It
|
||||
%% reads the type-URL resolver out of the Cfg passed at call time, so
|
||||
%% the same Cfg threads through peer_types and this closure.
|
||||
make_fetch_fn() ->
|
||||
fun (TypeCid, Cfg) ->
|
||||
case resolve_type_url(TypeCid, Cfg) of
|
||||
{error, R} -> {error, R};
|
||||
{ok, BaseUrl} -> fetch(type_doc_url(BaseUrl, TypeCid), Cfg)
|
||||
end
|
||||
end.
|
||||
|
||||
%% make_fetch_fn/1 — variant that closes over a static Cfg for the
|
||||
%% resolver while still honouring the call-time Cfg for transport.
|
||||
%% Lets a caller bake the type_url map once and reuse the closure.
|
||||
make_fetch_fn(StaticCfg) ->
|
||||
fun (TypeCid, Cfg) ->
|
||||
case resolve_type_url(TypeCid, StaticCfg) of
|
||||
{error, R} -> {error, R};
|
||||
{ok, BaseUrl} -> fetch(type_doc_url(BaseUrl, TypeCid), Cfg)
|
||||
end
|
||||
end.
|
||||
|
||||
fetch(Url, _Cfg) ->
|
||||
AcceptKey = <<97,99,99,101,112,116>>, % "accept"
|
||||
Headers = [{AcceptKey, accept_header()}],
|
||||
try httpc:request(Url, get, Headers, <<>>) of
|
||||
{ok, Status, _H, Body} when Status >= 200, Status < 300 ->
|
||||
{ok, Body};
|
||||
{ok, Status, _H, _B} ->
|
||||
{error, {status, Status}};
|
||||
Other ->
|
||||
{error, {bad_response, Other}}
|
||||
catch
|
||||
error:Reason -> {error, Reason}
|
||||
end.
|
||||
|
||||
%% type_doc_url/2 — <BaseUrl>/types/<cid>. TypeCid is the cid bytes,
|
||||
%% appended verbatim as the path segment (matches the "/types/" prefix
|
||||
%% http_server.erl registers).
|
||||
type_doc_url(BaseUrl, TypeCid) when is_binary(TypeCid) ->
|
||||
%% "/types/" — 7 bytes
|
||||
Prefix = <<47,116,121,112,101,115,47>>,
|
||||
<<BaseUrl/binary, Prefix/binary, TypeCid/binary>>.
|
||||
|
||||
%% resolve_type_url/2 — map a TypeCid to its serving node's base URL.
|
||||
%% type_url_fn (a 1-arity closure) takes precedence over the static
|
||||
%% type_url proplist; absent both -> {error, no_type_url}.
|
||||
resolve_type_url(TypeCid, Cfg) ->
|
||||
case field(type_url_fn, Cfg) of
|
||||
Fn when is_function(Fn, 1) ->
|
||||
case Fn(TypeCid) of
|
||||
{ok, BaseUrl} -> {ok, BaseUrl};
|
||||
_ -> {error, no_type_url}
|
||||
end;
|
||||
_ ->
|
||||
case field(type_url, Cfg) of
|
||||
nil -> {error, no_type_url};
|
||||
Map ->
|
||||
case find_keyed(TypeCid, Map) of
|
||||
{ok, BaseUrl} -> {ok, BaseUrl};
|
||||
_ -> {error, no_type_url}
|
||||
end
|
||||
end
|
||||
end.
|
||||
|
||||
%% ── helpers ──────────────────────────────────────────────────
|
||||
|
||||
field(K, [{K, V} | _]) -> V;
|
||||
field(K, [_ | Rest]) -> field(K, Rest);
|
||||
field(_, []) -> nil.
|
||||
|
||||
find_keyed(_, []) -> {error, not_found};
|
||||
find_keyed(K, [{K, V} | _]) -> {ok, V};
|
||||
find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest).
|
||||
76
next/kernel/flow_dispatch.erl
Normal file
76
next/kernel/flow_dispatch.erl
Normal file
@@ -0,0 +1,76 @@
|
||||
-module(flow_dispatch).
|
||||
-export([start/4, guard_passes/3]).
|
||||
|
||||
%% Bridge from "an activity matched a trigger" to "a flow started with
|
||||
%% that activity as input" (fed-sx-triggers Phase 3). A NATIVE call into
|
||||
%% next/flow (flow_store) — the engine is Erlang-on-SX too, so there is
|
||||
%% no cross-guest FFI: the kernel and the workflow engine share one
|
||||
%% runtime.
|
||||
%%
|
||||
%% start(Spec, Activity, ActorState, Cfg)
|
||||
%% -> {ok, FlowId, {ActivityCid, TriggerCid, FlowId}} (audit triple)
|
||||
%% | {error, Reason}
|
||||
%%
|
||||
%% The flow named in Spec is started with the activity bound into its
|
||||
%% input environment, so flow steps can read the activity, the actor id,
|
||||
%% and the trigger cid (the audit chain). Flow-start failures — an
|
||||
%% unknown flow name, or a crashing first step (flow_store isolates the
|
||||
%% raise) — come back as {error, Reason}, never raised, so the fan-out
|
||||
%% caller is insulated from one flow's failure.
|
||||
|
||||
start(Spec, Activity, ActorState, _Cfg) ->
|
||||
FlowName = trigger_registry:spec_flow_name(Spec),
|
||||
TriggerCid = trigger_registry:spec_cid(Spec),
|
||||
ActivityCid = activity_cid(Activity),
|
||||
Input = [{activity, Activity},
|
||||
{actor, actor_id_of(ActorState, Activity)},
|
||||
{trigger_cid, TriggerCid}],
|
||||
case flow_store:start(FlowName, Input) of
|
||||
{ok, FlowId, _Result} ->
|
||||
{ok, FlowId, {ActivityCid, TriggerCid, FlowId}};
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
%% guard_passes(Spec, Activity, ActorState) — a spec fires when its
|
||||
%% actor-scope admits the activity's actor AND its guard (if any)
|
||||
%% returns true. An `any` scope and an `undefined` guard always pass;
|
||||
%% the guard lets one activity-type bind multiple flows with
|
||||
%% discriminators.
|
||||
guard_passes(Spec, Activity, ActorState) ->
|
||||
scope_ok(trigger_registry:spec_actor_scope(Spec), Activity) andalso
|
||||
guard_ok(trigger_registry:spec_guard(Spec), Activity, ActorState).
|
||||
|
||||
scope_ok(any, _Activity) -> true;
|
||||
scope_ok(Scope, Activity) ->
|
||||
case envelope:get_field(actor, Activity) of
|
||||
{ok, Scope} -> true;
|
||||
_ -> false
|
||||
end.
|
||||
|
||||
guard_ok(undefined, _Activity, _ActorState) -> true;
|
||||
guard_ok(Guard, Activity, ActorState) when is_function(Guard, 2) ->
|
||||
Guard(Activity, ActorState);
|
||||
guard_ok(_, _, _) -> false.
|
||||
|
||||
%% ── helpers ─────────────────────────────────────────────────────
|
||||
|
||||
activity_cid(Activity) ->
|
||||
case envelope:get_field(id, Activity) of
|
||||
{ok, Cid} -> Cid;
|
||||
_ -> undefined
|
||||
end.
|
||||
|
||||
%% actor_id_of/2 — prefer the receiving actor's id (ActorState carries
|
||||
%% {actor_id, _}); fall back to the activity's :actor. Reading
|
||||
%% ActorState as a proplist keeps this decoupled from actor_state's
|
||||
%% internal shape and testable with a plain [{actor_id, _}] stand-in.
|
||||
actor_id_of(ActorState, Activity) ->
|
||||
case envelope:get_field(actor_id, ActorState) of
|
||||
{ok, Id} -> Id;
|
||||
_ ->
|
||||
case envelope:get_field(actor, Activity) of
|
||||
{ok, A} -> A;
|
||||
_ -> undefined
|
||||
end
|
||||
end.
|
||||
@@ -4,6 +4,7 @@
|
||||
welcome_body/0, capabilities_body/0,
|
||||
capabilities_path/0,
|
||||
match_prefix/2, actors_prefix/0, actor_doc_response/1,
|
||||
types_prefix/0, type_doc_response_for/2,
|
||||
artifacts_prefix/0, artifact_response/1,
|
||||
projections_list_path/0, projections_prefix/0,
|
||||
projections_list_response/0, projection_response/1,
|
||||
@@ -156,7 +157,12 @@ dispatch(<<71, 69, 84>>, Path, F, Cfg) ->
|
||||
{ok, Name} when byte_size(Name) > 0 ->
|
||||
projection_response_for(Name, F);
|
||||
_ ->
|
||||
not_found_response()
|
||||
case match_prefix(types_prefix(), Path) of
|
||||
{ok, Cid} when byte_size(Cid) > 0 ->
|
||||
type_doc_response_for(Cid, Cfg);
|
||||
_ ->
|
||||
not_found_response()
|
||||
end
|
||||
end
|
||||
end
|
||||
end;
|
||||
@@ -289,6 +295,10 @@ artifact_response(Cid) ->
|
||||
Body = <<Pre/binary, Cid/binary, 10>>,
|
||||
ok_response(Body).
|
||||
|
||||
%% "/types/" — 7 bytes: 47 116 121 112 101 115 47 (host-type fed Step 3)
|
||||
types_prefix() ->
|
||||
<<47,116,121,112,101,115,47>>.
|
||||
|
||||
%% "/projections" — 12 bytes (no trailing slash; the list endpoint)
|
||||
projections_list_path() ->
|
||||
<<47,112,114,111,106,101,99,116,105,111,110,115>>.
|
||||
@@ -488,9 +498,20 @@ actor_doc_prefix() ->
|
||||
118,110,100,46,102,101,100,45,115,120,46,
|
||||
97,99,116,111,114,45,100,111,99>>.
|
||||
|
||||
%% "application/vnd.fed-sx.type-doc" — 31 bytes (host-type fed Step 3).
|
||||
%% Distinct from actor-doc: the body is a term_codec-encoded
|
||||
%% TypeRecord (peer_types cache entry), not a peer-actor-state.
|
||||
type_doc_prefix() ->
|
||||
<<97,112,112,108,105,99,97,116,105,111,110,47,
|
||||
118,110,100,46,102,101,100,45,115,120,46,
|
||||
116,121,112,101,45,100,111,99>>.
|
||||
|
||||
accept_format(nil) -> text;
|
||||
accept_format(<<>>) -> text;
|
||||
accept_format(V) when is_binary(V) ->
|
||||
case match_prefix(type_doc_prefix(), V) of
|
||||
{ok, _} -> type_doc;
|
||||
_ ->
|
||||
case match_prefix(actor_doc_prefix(), V) of
|
||||
{ok, _} -> actor_doc;
|
||||
_ ->
|
||||
@@ -510,6 +531,7 @@ accept_format(V) when is_binary(V) ->
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end;
|
||||
accept_format(_) -> text.
|
||||
|
||||
@@ -586,6 +608,11 @@ content_type_for(actor_doc) ->
|
||||
<<97,112,112,108,105,99,97,116,105,111,110,47,
|
||||
118,110,100,46,102,101,100,45,115,120,46,
|
||||
97,99,116,111,114,45,100,111,99>>;
|
||||
%% "application/vnd.fed-sx.type-doc" — 31 bytes (host-type fed Step 3).
|
||||
content_type_for(type_doc) ->
|
||||
<<97,112,112,108,105,99,97,116,105,111,110,47,
|
||||
118,110,100,46,102,101,100,45,115,120,46,
|
||||
116,121,112,101,45,100,111,99>>;
|
||||
content_type_for(_) ->
|
||||
content_type_for(text).
|
||||
|
||||
@@ -714,6 +741,42 @@ kernel_actor_state(_Kernel, Id) ->
|
||||
_ -> nil
|
||||
end.
|
||||
|
||||
%% ── host-type fed Step 3: GET /types/<cid> ──────────────────────
|
||||
%%
|
||||
%% Serves a TypeRecord the node has cached (its own published types or
|
||||
%% types fetched from peers) so a federated peer running
|
||||
%% discovery_type_fetch can decode it directly into the shape
|
||||
%% peer_types + the object-schema pipeline stage consume. The wire
|
||||
%% body is term_codec:encode(TypeRecord) under the
|
||||
%% application/vnd.fed-sx.type-doc content type; a cache miss is a 404.
|
||||
%%
|
||||
%% Cid is the path segment after "/types/" (the type's CID bytes). Cfg
|
||||
%% carries `{peer_types, peer_types}` to opt the route into the cache —
|
||||
%% absent (or the gen_server down) short-circuits to 404, matching the
|
||||
%% kernel_actor_state guard for the actor-doc route. This port can't
|
||||
%% dispatch `Mod:Fun` on a variable module, so the registered
|
||||
%% `peer_types` atom is hardcoded; the Cfg field flags "no cache wired".
|
||||
|
||||
type_doc_response_for(Cid, Cfg) ->
|
||||
case type_record_for(Cfg, Cid) of
|
||||
nil -> not_found_response();
|
||||
TR -> ok_response(term_codec:encode(TR), type_doc)
|
||||
end.
|
||||
|
||||
type_record_for(Cfg, Cid) ->
|
||||
case field(peer_types, Cfg) of
|
||||
nil -> nil;
|
||||
_ ->
|
||||
case erlang:whereis(peer_types) of
|
||||
undefined -> nil;
|
||||
_ ->
|
||||
case peer_types:lookup(Cid) of
|
||||
{ok, TR} -> TR;
|
||||
_ -> nil
|
||||
end
|
||||
end
|
||||
end.
|
||||
|
||||
%% ── Step 4a: per-actor sub-resource stubs ──────────────────────
|
||||
%% Per design §16.1 each actor has /outbox /inbox /followers
|
||||
%% /following routes. v1 returns text-stub bodies so route resolution
|
||||
|
||||
180
next/kernel/peer_types.erl
Normal file
180
next/kernel/peer_types.erl
Normal file
@@ -0,0 +1,180 @@
|
||||
-module(peer_types).
|
||||
-export([new/0, lookup/2, store/3, evict/2, types/1,
|
||||
lookup_or_fetch/3, decode_type_doc/1,
|
||||
start_link/0, start_link/1, stop/0,
|
||||
put/2, lookup/1, state_for/1, known_types/0,
|
||||
lookup_or_fetch/2, evict/1]).
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% Peer-types cache — receiver-side mirror of peer_actors.erl, for
|
||||
%% host-type federation (plans/fed-sx-host-types.md, Phase 2). When an
|
||||
%% inbound activity references a refinement type the local node hasn't
|
||||
%% seen, the object-schema validation stage (Phase 4) needs that
|
||||
%% type's record — its :refinement-schema and field shape — to vet the
|
||||
%% inner object. Re-fetching the type doc on every inbound would be
|
||||
%% wasteful, so we cache the TypeRecord keyed by its content-address.
|
||||
%%
|
||||
%% State shape (pure-functional):
|
||||
%% [{TypeCidBytes, TypeRecord}, ...]
|
||||
%%
|
||||
%% TypeCidBytes is the type's CID (a binary). TypeRecord is the parsed
|
||||
%% DefineType envelope's :object payload — a proplist carrying :name,
|
||||
%% :fields, :refinement-schema, :instance-type. Refinement schemas are
|
||||
%% immutable per CID (an updated type is a new CID), so cache entries
|
||||
%% never go stale — TTL-free, like peer_actors' v2 entries.
|
||||
%%
|
||||
%% lookup_or_fetch is the load-bearing entry point: a miss invokes a
|
||||
%% Cfg-supplied closure to fetch the type doc over the wire. Per the
|
||||
%% design the closure has shape
|
||||
%% type_fetch_fn :: fun ((TypeCid, Cfg) -> {ok, Bytes} | {error, _})
|
||||
%% returning the term_codec-encoded type-doc bytes; lookup_or_fetch
|
||||
%% decodes them into the TypeRecord and caches it. Keeping the
|
||||
%% transport in the closure (Phase 3's discovery_type_fetch) keeps
|
||||
%% peer_types testable with a mocked fetch — same split as
|
||||
%% peer_actors / discovery_fetch.
|
||||
%%
|
||||
%% gen_server wrapper registers under the atom `peer_types` so the
|
||||
%% pipeline + http_server handlers can reach it without threading a
|
||||
%% Pid through Cfg.
|
||||
|
||||
%% ── Pure-functional API ─────────────────────────────────────────
|
||||
|
||||
new() -> [].
|
||||
|
||||
lookup(TypeCid, State) ->
|
||||
case find_keyed(TypeCid, State) of
|
||||
{ok, TR} -> {ok, TR};
|
||||
{error, _} -> not_found
|
||||
end.
|
||||
|
||||
store(TypeCid, TR, State) ->
|
||||
set_keyed(TypeCid, TR, State).
|
||||
|
||||
evict(TypeCid, State) ->
|
||||
delete_keyed(TypeCid, State).
|
||||
|
||||
types(State) -> [Cid || {Cid, _TR} <- State].
|
||||
|
||||
%% lookup_or_fetch/3 — cache hit returns {ok, TR, State} unchanged.
|
||||
%% Cache miss pulls the type_fetch_fn out of Cfg and calls it with
|
||||
%% (TypeCid, Cfg); a {ok, Bytes} reply is decoded via term_codec into
|
||||
%% the TypeRecord, which is then stored. Failures (no fn, fetch error,
|
||||
%% bad bytes) do NOT poison the cache so the caller can retry.
|
||||
%%
|
||||
%% no type_fetch_fn in Cfg -> {error, no_fetch_fn, State}
|
||||
%% fn -> {ok, Bytes}, decodable -> {ok, TR, store(...)}
|
||||
%% fn -> {ok, Bytes}, bad bytes -> {error, bad_type_doc, State}
|
||||
%% fn -> {error, Reason} -> {error, Reason, State}
|
||||
%% fn -> Other -> {error, {bad_fetch_return, Other}, State}
|
||||
|
||||
lookup_or_fetch(TypeCid, Cfg, State) ->
|
||||
case find_keyed(TypeCid, State) of
|
||||
{ok, TR} -> {ok, TR, State};
|
||||
{error, _} -> fetch_and_store(TypeCid, Cfg, State)
|
||||
end.
|
||||
|
||||
fetch_and_store(TypeCid, Cfg, State) ->
|
||||
case field(type_fetch_fn, Cfg) of
|
||||
nil -> {error, no_fetch_fn, State};
|
||||
Fn when is_function(Fn, 2) ->
|
||||
case Fn(TypeCid, Cfg) of
|
||||
{ok, Bytes} ->
|
||||
case decode_type_doc(Bytes) of
|
||||
{ok, TR} -> {ok, TR, store(TypeCid, TR, State)};
|
||||
{error, R} -> {error, R, State}
|
||||
end;
|
||||
{error, Reason} -> {error, Reason, State};
|
||||
Other -> {error, {bad_fetch_return, Other}, State}
|
||||
end;
|
||||
_ -> {error, bad_fetch_fn_cfg, State}
|
||||
end.
|
||||
|
||||
%% decode_type_doc/1 — round the wire body back through term_codec.
|
||||
%% The on-wire form is term_codec:encode(TypeRecord) (Phase 3's
|
||||
%% /types/<cid> route), so a clean decode yields the proplist TR.
|
||||
decode_type_doc(Bytes) ->
|
||||
case term_codec:decode(Bytes) of
|
||||
{ok, TR, _} when is_list(TR) -> {ok, TR};
|
||||
_ -> {error, bad_type_doc}
|
||||
end.
|
||||
|
||||
%% ── gen_server wrapper ──────────────────────────────────────────
|
||||
|
||||
start_link() ->
|
||||
start_link([]).
|
||||
|
||||
start_link(InitialState) ->
|
||||
Pid = gen_server:start_link(peer_types, [InitialState]),
|
||||
erlang:register(peer_types, Pid),
|
||||
Pid.
|
||||
|
||||
stop() ->
|
||||
R = gen_server:call(peer_types, '$gen_stop'),
|
||||
erlang:unregister(peer_types),
|
||||
R.
|
||||
|
||||
%% put/2 — store a TypeRecord under its CID. Mirrors store_srv.
|
||||
put(TypeCid, TR) ->
|
||||
gen_server:call(peer_types, {put, TypeCid, TR}).
|
||||
|
||||
%% lookup/1 — cache read. {ok, TR} | not_found.
|
||||
lookup(TypeCid) ->
|
||||
gen_server:call(peer_types, {lookup, TypeCid}).
|
||||
|
||||
%% state_for/1 — alias of lookup/1, named to match peer_actors'
|
||||
%% state_for accessor used by http_server's kernel bridge.
|
||||
state_for(TypeCid) ->
|
||||
gen_server:call(peer_types, {lookup, TypeCid}).
|
||||
|
||||
known_types() ->
|
||||
gen_server:call(peer_types, get_types).
|
||||
|
||||
evict(TypeCid) ->
|
||||
gen_server:call(peer_types, {evict, TypeCid}).
|
||||
|
||||
%% lookup_or_fetch/2 — gen_server form. Cfg carries the type_fetch_fn.
|
||||
%% Reply is {ok, TR} on hit-or-fetched, {error, Reason} otherwise.
|
||||
lookup_or_fetch(TypeCid, Cfg) ->
|
||||
gen_server:call(peer_types, {lookup_or_fetch, TypeCid, Cfg}).
|
||||
|
||||
%% gen_server callbacks
|
||||
|
||||
init([InitialState]) ->
|
||||
{ok, InitialState}.
|
||||
|
||||
handle_call({put, TypeCid, TR}, _From, State) ->
|
||||
{reply, ok, store(TypeCid, TR, State)};
|
||||
handle_call({lookup, TypeCid}, _From, State) ->
|
||||
{reply, lookup(TypeCid, State), State};
|
||||
handle_call({lookup_or_fetch, TypeCid, Cfg}, _From, State) ->
|
||||
case lookup_or_fetch(TypeCid, Cfg, State) of
|
||||
{ok, TR, NewState} -> {reply, {ok, TR}, NewState};
|
||||
{error, Reason, Same} -> {reply, {error, Reason}, Same}
|
||||
end;
|
||||
handle_call(get_types, _From, State) ->
|
||||
{reply, types(State), State};
|
||||
handle_call({evict, TypeCid}, _From, State) ->
|
||||
{reply, ok, evict(TypeCid, State)}.
|
||||
|
||||
handle_cast(_, S) -> {noreply, S}.
|
||||
|
||||
handle_info(_, S) -> {noreply, S}.
|
||||
|
||||
%% ── Internal helpers ────────────────────────────────────────────
|
||||
|
||||
field(K, [{K, V} | _]) -> V;
|
||||
field(K, [_ | Rest]) -> field(K, Rest);
|
||||
field(_, []) -> nil.
|
||||
|
||||
find_keyed(_, []) -> {error, not_found};
|
||||
find_keyed(K, [{K, V} | _]) -> {ok, V};
|
||||
find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest).
|
||||
|
||||
set_keyed(K, V, []) -> [{K, V}];
|
||||
set_keyed(K, V, [{K, _} | Rest]) -> [{K, V} | Rest];
|
||||
set_keyed(K, V, [P | Rest]) -> [P | set_keyed(K, V, Rest)].
|
||||
|
||||
delete_keyed(_, []) -> [];
|
||||
delete_keyed(K, [{K, _} | Rest]) -> Rest;
|
||||
delete_keyed(K, [P | Rest]) -> [P | delete_keyed(K, Rest)].
|
||||
@@ -6,7 +6,9 @@
|
||||
stage_envelope/1,
|
||||
stage_signature/1, stage_signature/2,
|
||||
stage_replay/1, stage_replay/2,
|
||||
stage_schema/1, stage_schema/2]).
|
||||
stage_schema/1, stage_schema/2,
|
||||
apply_object_schema/2, stage_object_schema/1,
|
||||
apply_triggers/3]).
|
||||
|
||||
%% Validation pipeline per design §14.
|
||||
%%
|
||||
@@ -165,3 +167,233 @@ check_object_schema(Activity, SchemaFn) ->
|
||||
|
||||
stage_schema(SchemaLookup) ->
|
||||
fun (Activity) -> stage_schema(Activity, SchemaLookup) end.
|
||||
|
||||
%% ── host-type fed Step 4: object-schema validation stage ────────
|
||||
%%
|
||||
%% apply_object_schema/2 — when an inbound activity's :object declares
|
||||
%% a refinement type ({type, TypeName} on the object), resolve that
|
||||
%% type's record and apply its refinement schema to the object's
|
||||
%% :field_values. Sits between activity-type (stage_schema) validation
|
||||
%% and the kernel append; rejects the activity on schema-fail.
|
||||
%%
|
||||
%% Resolution mirrors the design note: TypeName -> TypeCid via Cfg's
|
||||
%% `type_index` ([{TypeName, TypeCid}, ...], the local Define-name
|
||||
%% index), then TypeCid -> TypeRecord via peer_types:lookup_or_fetch/2
|
||||
%% (a local cache hit, or a wire fetch through the Cfg type_fetch_fn).
|
||||
%%
|
||||
%% Outcomes:
|
||||
%% object has no {type, _} -> ok (no schema applies)
|
||||
%% TypeName not in type_index -> ok (undeclared type;
|
||||
%% open-world default)
|
||||
%% record resolved, schema passes -> ok
|
||||
%% record resolved, schema fails -> {error, {validation_failed,
|
||||
%% object_schema}}
|
||||
%% record unresolvable (cache miss + -> strict_object_schema:
|
||||
%% fetch failure / no peer_types) true -> {error, ...}
|
||||
%% false -> ok (skipped)
|
||||
%%
|
||||
%% Default strict_object_schema = false: a node only blocks on an
|
||||
%% unresolvable type when it opts into airtight validation via Cfg
|
||||
%% {strict_object_schema, true}. The non-strict skip is where a
|
||||
%% `validation_skipped` log entry belongs (left to the caller's logger
|
||||
%% so this stage keeps the ok | {error, _} contract run_stages wants).
|
||||
%%
|
||||
%% A TypeRecord's refinement schema is either a 1-arity Erlang
|
||||
%% predicate over the field-values (the substrate stand-in, for
|
||||
%% locally-defined types) or a data constraint {required, [Field, ...]}
|
||||
%% (term_codec-safe, so a wire-fetched TypeRecord can still validate).
|
||||
|
||||
apply_object_schema(Activity, Cfg) ->
|
||||
case object_type_name(Activity) of
|
||||
none -> ok;
|
||||
{ok, TypeName} ->
|
||||
case type_cid_for(TypeName, Cfg) of
|
||||
none -> ok;
|
||||
{ok, TypeCid} ->
|
||||
case resolve_type_record(TypeCid, Cfg) of
|
||||
{ok, TR} -> check_object_against(Activity, TR);
|
||||
{error, _} -> on_unresolved_type(Cfg)
|
||||
end
|
||||
end
|
||||
end.
|
||||
|
||||
stage_object_schema(Cfg) ->
|
||||
fun (Activity) -> apply_object_schema(Activity, Cfg) end.
|
||||
|
||||
object_type_name(Activity) ->
|
||||
case envelope:get_field(object, Activity) of
|
||||
{ok, Obj} when is_list(Obj) ->
|
||||
case envelope:get_field(type, Obj) of
|
||||
{ok, T} -> {ok, T};
|
||||
_ -> none
|
||||
end;
|
||||
_ -> none
|
||||
end.
|
||||
|
||||
object_field_values(Activity) ->
|
||||
case envelope:get_field(object, Activity) of
|
||||
{ok, Obj} when is_list(Obj) ->
|
||||
case envelope:get_field(field_values, Obj) of
|
||||
{ok, FV} -> FV;
|
||||
_ -> []
|
||||
end;
|
||||
_ -> []
|
||||
end.
|
||||
|
||||
type_cid_for(TypeName, Cfg) ->
|
||||
case stage_field(type_index, Cfg) of
|
||||
nil -> none;
|
||||
Index ->
|
||||
case find_keyed(TypeName, Index) of
|
||||
{ok, Cid} -> {ok, Cid};
|
||||
_ -> none
|
||||
end
|
||||
end.
|
||||
|
||||
resolve_type_record(TypeCid, Cfg) ->
|
||||
case stage_field(peer_types, Cfg) of
|
||||
nil -> {error, no_peer_types};
|
||||
_ ->
|
||||
case erlang:whereis(peer_types) of
|
||||
undefined -> {error, peer_types_down};
|
||||
_ -> peer_types:lookup_or_fetch(TypeCid, Cfg)
|
||||
end
|
||||
end.
|
||||
|
||||
on_unresolved_type(Cfg) ->
|
||||
case stage_field(strict_object_schema, Cfg) of
|
||||
true -> {error, {validation_failed, object_schema}};
|
||||
_ -> ok
|
||||
end.
|
||||
|
||||
check_object_against(Activity, TR) ->
|
||||
case stage_field(refinement_schema, TR) of
|
||||
nil -> ok;
|
||||
Schema -> apply_refinement(Schema, object_field_values(Activity))
|
||||
end.
|
||||
|
||||
apply_refinement(Fn, FieldValues) when is_function(Fn, 1) ->
|
||||
case Fn(FieldValues) of
|
||||
true -> ok;
|
||||
_ -> {error, {validation_failed, object_schema}}
|
||||
end;
|
||||
apply_refinement({required, Fields}, FieldValues) ->
|
||||
case all_present(Fields, FieldValues) of
|
||||
true -> ok;
|
||||
false -> {error, {validation_failed, object_schema}}
|
||||
end;
|
||||
apply_refinement(_, _) -> ok.
|
||||
|
||||
all_present([], _) -> true;
|
||||
all_present([F | Rest], FV) ->
|
||||
case has_key(F, FV) of
|
||||
true -> all_present(Rest, FV);
|
||||
false -> false
|
||||
end.
|
||||
|
||||
has_key(_, []) -> false;
|
||||
has_key(K, [{K, _} | _]) -> true;
|
||||
has_key(K, [_ | Rest]) -> has_key(K, Rest).
|
||||
|
||||
stage_field(K, [{K, V} | _]) -> V;
|
||||
stage_field(K, [_ | Rest]) -> stage_field(K, Rest);
|
||||
stage_field(_, []) -> nil.
|
||||
|
||||
find_keyed(_, []) -> {error, not_found};
|
||||
find_keyed(K, [{K, V} | _]) -> {ok, V};
|
||||
find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest).
|
||||
|
||||
%% ── fed-sx triggers Step 2: post-append fan-out ─────────────────
|
||||
%%
|
||||
%% apply_triggers/3 — fires the durable flows bound to an activity's
|
||||
%% type AFTER it has been accepted and appended (rejected activities
|
||||
%% never reach here, so a flow only runs for an activity that really
|
||||
%% landed). For each spec the activity's type is bound to, the spec
|
||||
%% must pass its guard/actor-scope, and its {ActivityCid, TriggerCid}
|
||||
%% pair must not already have fired (federation can deliver the same
|
||||
%% activity twice via different peers — dedup is keyed on that pair,
|
||||
%% read from the receiving actor's :triggers_fired). Surviving specs are
|
||||
%% dispatched via flow_dispatch:start (a native flow_store:start), which
|
||||
%% never raises.
|
||||
%%
|
||||
%% Returns {ok, Results} where Results is one
|
||||
%% {ActivityCid, TriggerCid, {ok, FlowId} | {error, Reason}}
|
||||
%% per spec actually dispatched (guard-passed, not a duplicate). The
|
||||
%% kernel folds the {ActivityCid, TriggerCid} pairs into the actor's
|
||||
%% :triggers_fired (dedup) and the audit triples into its projection.
|
||||
%% No matching/ready registry yields {ok, []}.
|
||||
%%
|
||||
%% Cfg gates the fan-out on {trigger_registry, trigger_registry} (the
|
||||
%% registered gen_server), mirroring the object-schema stage's
|
||||
%% {peer_types, _} gate. apply_triggers must NOT be called inside a
|
||||
%% `try` — flow_dispatch does gen_server:calls, and a blocking call
|
||||
%% inside a try deadlocks this scheduler; the fan-out runs after append,
|
||||
%% in its own step, so this is naturally satisfied.
|
||||
|
||||
apply_triggers(Activity, ActorState, Cfg) ->
|
||||
case trigger_registry_ready(Cfg) of
|
||||
false -> {ok, []};
|
||||
true ->
|
||||
Type = activity_type_of(Activity),
|
||||
Specs = trigger_registry:lookup(Type),
|
||||
ActCid = trigger_activity_cid(Activity),
|
||||
Fired = field_or_default(triggers_fired, ActorState, []),
|
||||
fire_each(Specs, Activity, ActorState, ActCid, Fired, Cfg, [])
|
||||
end.
|
||||
|
||||
trigger_registry_ready(Cfg) ->
|
||||
case stage_field(trigger_registry, Cfg) of
|
||||
nil -> false;
|
||||
_ ->
|
||||
case erlang:whereis(trigger_registry) of
|
||||
undefined -> false;
|
||||
_ -> true
|
||||
end
|
||||
end.
|
||||
|
||||
fire_each([], _A, _AS, _ACid, _Fired, _Cfg, Acc) ->
|
||||
{ok, lists:reverse(Acc)};
|
||||
fire_each([Spec | Rest], A, AS, ACid, Fired, Cfg, Acc) ->
|
||||
TCid = trigger_registry:spec_cid(Spec),
|
||||
Pair = {ACid, TCid},
|
||||
AlreadyFired = pair_member(Pair, Fired) orelse acc_member(Pair, Acc),
|
||||
Pass = (not AlreadyFired) andalso flow_dispatch:guard_passes(Spec, A, AS),
|
||||
case Pass of
|
||||
false ->
|
||||
fire_each(Rest, A, AS, ACid, Fired, Cfg, Acc);
|
||||
true ->
|
||||
Outcome = case flow_dispatch:start(Spec, A, AS, Cfg) of
|
||||
{ok, FlowId, _Audit} -> {ok, FlowId};
|
||||
{error, Reason} -> {error, Reason}
|
||||
end,
|
||||
fire_each(Rest, A, AS, ACid, Fired, Cfg, [{ACid, TCid, Outcome} | Acc])
|
||||
end.
|
||||
|
||||
activity_type_of(Activity) ->
|
||||
case envelope:get_field(type, Activity) of
|
||||
{ok, Type} -> Type;
|
||||
_ -> undefined
|
||||
end.
|
||||
|
||||
trigger_activity_cid(Activity) ->
|
||||
case envelope:get_field(id, Activity) of
|
||||
{ok, Cid} -> Cid;
|
||||
_ -> undefined
|
||||
end.
|
||||
|
||||
field_or_default(Key, Proplist, Default) ->
|
||||
case envelope:get_field(Key, Proplist) of
|
||||
{ok, V} -> V;
|
||||
_ -> Default
|
||||
end.
|
||||
|
||||
%% pair_member/2 — {ACid, TCid} present in a [{ACid, TCid}] fired list.
|
||||
pair_member(_, []) -> false;
|
||||
pair_member(P, [P | _]) -> true;
|
||||
pair_member(P, [_ | Rest]) -> pair_member(P, Rest).
|
||||
|
||||
%% acc_member/2 — {ACid, TCid} already dispatched this call (Acc holds
|
||||
%% {ACid, TCid, Outcome} triples).
|
||||
acc_member(_, []) -> false;
|
||||
acc_member({A, T}, [{A, T, _} | _]) -> true;
|
||||
acc_member(P, [_ | Rest]) -> acc_member(P, Rest).
|
||||
|
||||
180
next/kernel/trigger_registry.erl
Normal file
180
next/kernel/trigger_registry.erl
Normal file
@@ -0,0 +1,180 @@
|
||||
-module(trigger_registry).
|
||||
-export([new/0, add/3, remove/2, lookup/2, all/1, fold/2, fold_fn/0,
|
||||
mk_spec/4, spec_cid/1, spec_flow_name/1, spec_guard/1,
|
||||
spec_actor_scope/1,
|
||||
start_link/0, start_link/1, stop/0,
|
||||
add/2, remove/1, lookup/1, all_triggers/0]).
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% Trigger registry — binds activity-types to durable flows
|
||||
%% (plans/agent-briefings/fed-sx-triggers-loop.md, Phase 1). When an
|
||||
%% activity is appended, the kernel's post-append fan-out
|
||||
%% (pipeline.erl, Phase 2) looks the activity's type up here and starts
|
||||
%% each registered flow. Mirrors the peer_actors / peer_types shape: a
|
||||
%% pure-functional core plus a registered gen_server, hydrated on start
|
||||
%% from a fold over DefineTrigger activities.
|
||||
%%
|
||||
%% State shape (pure-functional):
|
||||
%% [{ActivityType, [Spec, ...]}, ...]
|
||||
%% Multiple triggers may bind the same activity-type; they fire
|
||||
%% independently. A Spec is a 4-tuple:
|
||||
%% {TriggerCid, FlowName, Guard, ActorScope}
|
||||
%% TriggerCid — content-address of the DefineTrigger activity
|
||||
%% (dedup + audit); `undefined` if not yet addressed.
|
||||
%% FlowName — the flow_store-registered flow to start.
|
||||
%% Guard — fun ((Activity, ActorState) -> bool) | undefined.
|
||||
%% Lets one type bind multiple flows with
|
||||
%% discriminators ("only Articles in :newsletter").
|
||||
%% Resolved to a fun at registration; not carried over
|
||||
%% the wire (term_codec can't encode funs).
|
||||
%% ActorScope — an actor id the trigger is scoped to, or `any`.
|
||||
|
||||
%% ── Spec constructor / accessors ────────────────────────────────
|
||||
|
||||
mk_spec(TriggerCid, FlowName, Guard, ActorScope) ->
|
||||
{TriggerCid, FlowName, Guard, ActorScope}.
|
||||
|
||||
spec_cid({Cid, _, _, _}) -> Cid.
|
||||
spec_flow_name({_, FlowName, _, _}) -> FlowName.
|
||||
spec_guard({_, _, Guard, _}) -> Guard.
|
||||
spec_actor_scope({_, _, _, Scope}) -> Scope.
|
||||
|
||||
%% ── Pure-functional API ─────────────────────────────────────────
|
||||
|
||||
new() -> [].
|
||||
|
||||
%% add(ActivityType, Spec, State) — append Spec to ActivityType's list.
|
||||
add(ActivityType, Spec, State) ->
|
||||
Existing = lookup(ActivityType, State),
|
||||
set_keyed(ActivityType, append1(Existing, Spec), State).
|
||||
|
||||
%% remove(TriggerCid, State) — drop every spec carrying TriggerCid,
|
||||
%% across all activity-types; empties are pruned.
|
||||
remove(TriggerCid, State) ->
|
||||
prune([{T, drop_cid(TriggerCid, Specs)} || {T, Specs} <- State]).
|
||||
|
||||
%% lookup(ActivityType, State) — the specs bound to ActivityType ([] if
|
||||
%% none).
|
||||
lookup(ActivityType, State) ->
|
||||
case find_keyed(ActivityType, State) of
|
||||
{ok, Specs} -> Specs;
|
||||
not_found -> []
|
||||
end.
|
||||
|
||||
all(State) -> State.
|
||||
|
||||
%% ── Hydration fold ──────────────────────────────────────────────
|
||||
%%
|
||||
%% fold(Activity, State) — register the binding carried by a
|
||||
%% DefineTrigger activity. Replaying the actor log through this fold
|
||||
%% rebuilds the registry after a restart (same content-addressing
|
||||
%% discipline as define_registry). A non-DefineTrigger activity passes
|
||||
%% through untouched.
|
||||
|
||||
fold(Activity, State) ->
|
||||
case envelope:get_field(type, Activity) of
|
||||
{ok, define_trigger} -> fold_trigger(Activity, State);
|
||||
_ -> State
|
||||
end.
|
||||
|
||||
fold_trigger(Activity, State) ->
|
||||
case envelope:get_field(object, Activity) of
|
||||
{ok, Obj} ->
|
||||
case binding_of(Activity, Obj) of
|
||||
{ok, AType, Spec} -> add(AType, Spec, State);
|
||||
not_a_binding -> State
|
||||
end;
|
||||
_ -> State
|
||||
end.
|
||||
|
||||
binding_of(Activity, Obj) ->
|
||||
case envelope:get_field(activity_type, Obj) of
|
||||
{ok, AType} ->
|
||||
case envelope:get_field(flow_name, Obj) of
|
||||
{ok, FlowName} ->
|
||||
Guard = field_or(guard, Obj, undefined),
|
||||
Scope = field_or(actor_scope, Obj, any),
|
||||
Cid = field_or(id, Activity, undefined),
|
||||
{ok, AType, mk_spec(Cid, FlowName, Guard, Scope)};
|
||||
_ -> not_a_binding
|
||||
end;
|
||||
_ -> not_a_binding
|
||||
end.
|
||||
|
||||
%% fold_fn/0 — a 2-arity fun the projection scheduler can plant.
|
||||
fold_fn() ->
|
||||
fun (Activity, State) -> fold(Activity, State) end.
|
||||
|
||||
%% ── gen_server wrapper ──────────────────────────────────────────
|
||||
|
||||
start_link() ->
|
||||
start_link([]).
|
||||
|
||||
start_link(InitialState) ->
|
||||
Pid = gen_server:start_link(trigger_registry, [InitialState]),
|
||||
erlang:register(trigger_registry, Pid),
|
||||
Pid.
|
||||
|
||||
stop() ->
|
||||
R = gen_server:call(trigger_registry, '$gen_stop'),
|
||||
erlang:unregister(trigger_registry),
|
||||
R.
|
||||
|
||||
add(ActivityType, Spec) ->
|
||||
gen_server:call(trigger_registry, {add, ActivityType, Spec}).
|
||||
|
||||
remove(TriggerCid) ->
|
||||
gen_server:call(trigger_registry, {remove, TriggerCid}).
|
||||
|
||||
lookup(ActivityType) ->
|
||||
gen_server:call(trigger_registry, {lookup, ActivityType}).
|
||||
|
||||
all_triggers() ->
|
||||
gen_server:call(trigger_registry, all_triggers).
|
||||
|
||||
init([InitialState]) ->
|
||||
{ok, InitialState}.
|
||||
|
||||
handle_call({add, ActivityType, Spec}, _From, State) ->
|
||||
{reply, ok, add(ActivityType, Spec, State)};
|
||||
handle_call({remove, TriggerCid}, _From, State) ->
|
||||
{reply, ok, remove(TriggerCid, State)};
|
||||
handle_call({lookup, ActivityType}, _From, State) ->
|
||||
{reply, lookup(ActivityType, State), State};
|
||||
handle_call(all_triggers, _From, State) ->
|
||||
{reply, State, State}.
|
||||
|
||||
handle_cast(_, S) -> {noreply, S}.
|
||||
|
||||
handle_info(_, S) -> {noreply, S}.
|
||||
|
||||
%% ── helpers ─────────────────────────────────────────────────────
|
||||
|
||||
field_or(Key, Proplist, Default) ->
|
||||
case envelope:get_field(Key, Proplist) of
|
||||
{ok, V} -> V;
|
||||
_ -> Default
|
||||
end.
|
||||
|
||||
drop_cid(_, []) -> [];
|
||||
drop_cid(Cid, [Spec | Rest]) ->
|
||||
case spec_cid(Spec) of
|
||||
Cid -> drop_cid(Cid, Rest);
|
||||
_ -> [Spec | drop_cid(Cid, Rest)]
|
||||
end.
|
||||
|
||||
prune([]) -> [];
|
||||
prune([{_, []} | Rest]) -> prune(Rest);
|
||||
prune([P | Rest]) -> [P | prune(Rest)].
|
||||
|
||||
append1([], X) -> [X];
|
||||
append1([H | T], X) -> [H | append1(T, X)].
|
||||
|
||||
find_keyed(_, []) -> not_found;
|
||||
find_keyed(K, [{K, V} | _]) -> {ok, V};
|
||||
find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest).
|
||||
|
||||
set_keyed(K, V, []) -> [{K, V}];
|
||||
set_keyed(K, V, [{K, _} | Rest]) -> [{K, V} | Rest];
|
||||
set_keyed(K, V, [P | Rest]) -> [P | set_keyed(K, V, Rest)].
|
||||
@@ -79,7 +79,7 @@ cat > "$TMPFILE" <<'EPOCHS'
|
||||
(eval "(get (erlang-eval-ast \"R = bootstrap:read_genesis(), {ok, S1} = bootstrap:load_genesis(R), {ok, S2} = bootstrap:load_genesis(R), cid:to_string(S1) =:= cid:to_string(S2)\") :name)")
|
||||
EPOCHS
|
||||
|
||||
OUTPUT=$(timeout 300 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
|
||||
OUTPUT=$(timeout 590 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
|
||||
|
||||
check() {
|
||||
local epoch="$1" desc="$2" expected="$3"
|
||||
@@ -106,7 +106,7 @@ check 10 "strip suffix create.sx -> create" "true"
|
||||
check 11 "strip suffix hello unchanged" "true"
|
||||
check 12 "strip suffix .sx -> empty" "true"
|
||||
check 13 "load_genesis rejects bad shape" "ok"
|
||||
check 20 "loaded activity_types count = 5" "5"
|
||||
check 20 "loaded activity_types count = 8" "8"
|
||||
check 21 "loaded object_types count = 13" "13"
|
||||
check 22 "loaded projections count = 7" "7"
|
||||
check 23 "loaded validators count = 3" "3"
|
||||
|
||||
@@ -99,8 +99,8 @@ check() {
|
||||
check 2 "gen_server loaded" "gen_server"
|
||||
check 3 "registry loaded" "registry"
|
||||
check 4 "bootstrap loaded" "bootstrap"
|
||||
check 10 "populate returns total 36" "36"
|
||||
check 20 "activity_types count = 5" "5"
|
||||
check 10 "populate returns total 39" "39"
|
||||
check 20 "activity_types count = 8" "8"
|
||||
check 21 "object_types count = 13" "13"
|
||||
check 22 "projections count = 7" "7"
|
||||
check 23 "validators count = 3" "3"
|
||||
|
||||
@@ -102,7 +102,7 @@ check 10 "sections/0 length" "7"
|
||||
check 11 "ends_with_sx create.sx" "true"
|
||||
check 12 "ends_with_sx hello" "false"
|
||||
check 13 "ends_with_sx empty" "false"
|
||||
check 20 "section activity_types count" "5"
|
||||
check 20 "section activity_types count" "8"
|
||||
check 21 "section object_types count" "13"
|
||||
check 22 "section projections count" "7"
|
||||
check 23 "section validators count" "3"
|
||||
@@ -111,7 +111,7 @@ check 25 "section sig_suites count" "2"
|
||||
check 26 "section audience count" "3"
|
||||
check 30 "read_genesis returns 7 sections" "7"
|
||||
check 31 "first section name" "activity_types"
|
||||
check 32 "first section entry count" "5"
|
||||
check 32 "first section entry count" "8"
|
||||
|
||||
TOTAL=$((PASS+FAIL))
|
||||
if [ $FAIL -eq 0 ]; then
|
||||
|
||||
@@ -54,6 +54,12 @@ cat > "$TMPFILE" <<EPOCHS
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/nx_kernel.erl\")) :name)")
|
||||
(epoch 10)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/bootstrap.erl\")) :name)")
|
||||
;; outbox:publish computes a delivery set via follower_graph + delivery
|
||||
;; (compute_delivery_set/3) — load both so the publish path resolves.
|
||||
(epoch 11)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/follower_graph.erl\")) :name)")
|
||||
(epoch 12)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/delivery.erl\")) :name)")
|
||||
|
||||
;; bootstrap:start returns a Pid
|
||||
(epoch 20)
|
||||
@@ -115,10 +121,10 @@ check() {
|
||||
|
||||
check 10 "bootstrap module loaded" "bootstrap"
|
||||
check 20 "whereis(nx_kernel) is Pid" "true"
|
||||
check 21 "activity_types count = 5" "5"
|
||||
check 21 "activity_types count = 8" "8"
|
||||
check 22 "object_types count = 13" "13"
|
||||
check 23 "projections count = 7" "7"
|
||||
check 24 "total entries = 36" "36"
|
||||
check 24 "total entries = 39" "39"
|
||||
check 25 "fresh log_tip = 0" "0"
|
||||
check 26 "publish advances tip to 1" "1"
|
||||
check 27 "actor_id = alice" "true"
|
||||
|
||||
99
next/tests/define_trigger.sh
Executable file
99
next/tests/define_trigger.sh
Executable file
@@ -0,0 +1,99 @@
|
||||
#!/usr/bin/env bash
|
||||
# next/tests/define_trigger.sh — fed-sx triggers Phase 1 (verb).
|
||||
#
|
||||
# The DefineTrigger genesis verb
|
||||
# (next/genesis/activity-types/define_trigger.sx) binds an activity-type
|
||||
# to a flow. This suite confirms it parses with the expected
|
||||
# DefineActivity head + :name, that its :schema accepts a well-formed
|
||||
# binding and rejects malformed ones, and that a DefineTrigger envelope
|
||||
# round-trips through term_codec.
|
||||
|
||||
set -uo pipefail
|
||||
cd "$(git rev-parse --show-toplevel)"
|
||||
|
||||
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
|
||||
fi
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
echo "ERROR: sx_server.exe not found." >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
VERBOSE="${1:-}"
|
||||
PASS=0; FAIL=0; ERRORS=""
|
||||
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
|
||||
|
||||
SCH='(eval-expr (get (apply dict (rest (parse (file-read \"next/genesis/activity-types/define_trigger.sx\")))) :schema))'
|
||||
|
||||
cat > "$TMPFILE" <<EPOCHS
|
||||
(epoch 1)
|
||||
(load "lib/erlang/tokenizer.sx")
|
||||
(load "lib/erlang/parser.sx")
|
||||
(load "lib/erlang/parser-core.sx")
|
||||
(load "lib/erlang/parser-expr.sx")
|
||||
(load "lib/erlang/parser-module.sx")
|
||||
(load "lib/erlang/transpile.sx")
|
||||
(load "lib/erlang/runtime.sx")
|
||||
(load "lib/erlang/vm/dispatcher.sx")
|
||||
(epoch 2)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/term_codec.erl\")) :name)")
|
||||
|
||||
;; ── parse / shape ──────────────────────────────────────────
|
||||
(epoch 10)
|
||||
(eval "(first (parse (file-read \"next/genesis/activity-types/define_trigger.sx\")))")
|
||||
(epoch 11)
|
||||
(eval "(get (apply dict (rest (parse (file-read \"next/genesis/activity-types/define_trigger.sx\")))) :name)")
|
||||
|
||||
;; ── schema accept / reject ─────────────────────────────────
|
||||
;; valid binding: string :activity-type + :flow-name -> true
|
||||
(epoch 20)
|
||||
(eval "(define sch ${SCH}) (sch (dict :object (dict :activity-type \"Create\" :flow-name \"blog-publish-digest\")))")
|
||||
;; reject: missing :activity-type -> false
|
||||
(epoch 21)
|
||||
(eval "(define sch ${SCH}) (sch (dict :object (dict :flow-name \"f\")))")
|
||||
;; reject: missing :flow-name -> false
|
||||
(epoch 22)
|
||||
(eval "(define sch ${SCH}) (sch (dict :object (dict :activity-type \"Create\")))")
|
||||
|
||||
;; ── envelope round-trip through term_codec ─────────────────
|
||||
(epoch 30)
|
||||
(eval "(get (erlang-eval-ast \"A = [{type, define_trigger}, {actor, alice}, {object, [{activity_type, create}, {flow_name, blog_publish_digest}]}], {ok, D, _} = term_codec:decode(term_codec:encode(A)), D =:= A\") :name)")
|
||||
EPOCHS
|
||||
|
||||
OUTPUT=$(timeout 180 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
|
||||
|
||||
check() {
|
||||
local epoch="$1" desc="$2" expected="$3"
|
||||
local actual
|
||||
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
|
||||
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
|
||||
$0 ~ "^\\(ok " e " " { print; exit }
|
||||
$0 ~ "^\\(error " e " " { print; exit }
|
||||
')
|
||||
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
|
||||
if echo "$actual" | grep -qF -- "$expected"; then
|
||||
PASS=$((PASS+1))
|
||||
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
|
||||
else
|
||||
FAIL=$((FAIL+1))
|
||||
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
|
||||
"
|
||||
fi
|
||||
}
|
||||
|
||||
check 10 "define_trigger.sx head form" "DefineActivity"
|
||||
check 11 "define_trigger.sx name" "DefineTrigger"
|
||||
check 20 "schema accepts valid binding" "true"
|
||||
check 21 "schema rejects missing type" "false"
|
||||
check 22 "schema rejects missing flow-name" "false"
|
||||
check 30 "DefineTrigger envelope round-trips" "true"
|
||||
|
||||
TOTAL=$((PASS+FAIL))
|
||||
if [ $FAIL -eq 0 ]; then
|
||||
echo "ok $PASS/$TOTAL next/tests/define_trigger.sh passed"
|
||||
else
|
||||
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
|
||||
echo "$ERRORS"
|
||||
fi
|
||||
[ $FAIL -eq 0 ]
|
||||
110
next/tests/define_type.sh
Executable file
110
next/tests/define_type.sh
Executable file
@@ -0,0 +1,110 @@
|
||||
#!/usr/bin/env bash
|
||||
# next/tests/define_type.sh — host-type federation Phase 1 acceptance.
|
||||
#
|
||||
# The DefineType genesis verb (next/genesis/activity-types/define_type.sx)
|
||||
# declares a refinement type. This suite confirms:
|
||||
# - the file parses with the expected DefineActivity head + :name
|
||||
# - the :schema predicate accepts a well-formed type-definition
|
||||
# activity and rejects malformed ones (missing :name, non-list
|
||||
# :fields)
|
||||
# - a DefineType envelope round-trips through term_codec
|
||||
#
|
||||
# Schema bodies are SX source; we eval them with `eval-expr` and call
|
||||
# the resulting lambda directly (note: `apply` does not spread into
|
||||
# SX lambdas in this kernel, and keyword-getters are not callable —
|
||||
# the schema uses nested `get`). 7 cases.
|
||||
|
||||
set -uo pipefail
|
||||
cd "$(git rev-parse --show-toplevel)"
|
||||
|
||||
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
|
||||
fi
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
echo "ERROR: sx_server.exe not found." >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
VERBOSE="${1:-}"
|
||||
PASS=0; FAIL=0; ERRORS=""
|
||||
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
|
||||
|
||||
# The schema fn, evaluated from the genesis file into a lambda.
|
||||
SCH='(eval-expr (get (apply dict (rest (parse (file-read \"next/genesis/activity-types/define_type.sx\")))) :schema))'
|
||||
|
||||
cat > "$TMPFILE" <<EPOCHS
|
||||
(epoch 1)
|
||||
(load "lib/erlang/tokenizer.sx")
|
||||
(load "lib/erlang/parser.sx")
|
||||
(load "lib/erlang/parser-core.sx")
|
||||
(load "lib/erlang/parser-expr.sx")
|
||||
(load "lib/erlang/parser-module.sx")
|
||||
(load "lib/erlang/transpile.sx")
|
||||
(load "lib/erlang/runtime.sx")
|
||||
(load "lib/erlang/vm/dispatcher.sx")
|
||||
(epoch 2)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/term_codec.erl\")) :name)")
|
||||
|
||||
;; ── parse / shape ──────────────────────────────────────────
|
||||
(epoch 10)
|
||||
(eval "(first (parse (file-read \"next/genesis/activity-types/define_type.sx\")))")
|
||||
(epoch 11)
|
||||
(eval "(get (apply dict (rest (parse (file-read \"next/genesis/activity-types/define_type.sx\")))) :name)")
|
||||
|
||||
;; ── schema accept / reject ─────────────────────────────────
|
||||
;; valid: :object with string :name and list :fields -> true
|
||||
(epoch 20)
|
||||
(eval "(define sch ${SCH}) (sch (dict :object (dict :name \"Post\" :fields (list))))")
|
||||
;; valid: :fields omitted (optional) -> true
|
||||
(epoch 21)
|
||||
(eval "(define sch ${SCH}) (sch (dict :object (dict :name \"Post\")))")
|
||||
;; reject: missing :name -> false
|
||||
(epoch 22)
|
||||
(eval "(define sch ${SCH}) (sch (dict :object (dict :fields (list))))")
|
||||
;; reject: :fields present but not a list -> false
|
||||
(epoch 23)
|
||||
(eval "(define sch ${SCH}) (sch (dict :object (dict :name \"Post\" :fields \"notalist\")))")
|
||||
|
||||
;; ── envelope round-trip through term_codec ─────────────────
|
||||
(epoch 30)
|
||||
(eval "(get (erlang-eval-ast \"A = [{type, define_type}, {actor, alice}, {object, [{name, <<80,111,115,116>>}, {instance_type, <<78,111,116,101>>}]}], {ok, D, _} = term_codec:decode(term_codec:encode(A)), D =:= A\") :name)")
|
||||
EPOCHS
|
||||
|
||||
OUTPUT=$(timeout 180 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
|
||||
|
||||
check() {
|
||||
local epoch="$1" desc="$2" expected="$3"
|
||||
local actual
|
||||
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
|
||||
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
|
||||
$0 ~ "^\\(ok " e " " { print; exit }
|
||||
$0 ~ "^\\(error " e " " { print; exit }
|
||||
')
|
||||
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
|
||||
if echo "$actual" | grep -qF -- "$expected"; then
|
||||
PASS=$((PASS+1))
|
||||
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
|
||||
else
|
||||
FAIL=$((FAIL+1))
|
||||
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
|
||||
"
|
||||
fi
|
||||
}
|
||||
|
||||
check 10 "define_type.sx head form" "DefineActivity"
|
||||
check 11 "define_type.sx name" "DefineType"
|
||||
check 20 "schema accepts valid type def" "true"
|
||||
check 21 "schema accepts omitted :fields" "true"
|
||||
check 22 "schema rejects missing :name" "false"
|
||||
check 23 "schema rejects non-list :fields" "false"
|
||||
check 30 "DefineType envelope round-trips" "true"
|
||||
|
||||
TOTAL=$((PASS+FAIL))
|
||||
if [ $FAIL -eq 0 ]; then
|
||||
echo "ok $PASS/$TOTAL next/tests/define_type.sh passed"
|
||||
else
|
||||
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
|
||||
echo "$ERRORS"
|
||||
fi
|
||||
[ $FAIL -eq 0 ]
|
||||
176
next/tests/discovery_type_fetch.sh
Executable file
176
next/tests/discovery_type_fetch.sh
Executable file
@@ -0,0 +1,176 @@
|
||||
#!/usr/bin/env bash
|
||||
# next/tests/discovery_type_fetch.sh — host-type federation Phase 3.
|
||||
#
|
||||
# Client side of the type-doc wire: discovery_type_fetch builds the
|
||||
# fun/2 closure peer_types:lookup_or_fetch calls on a cache miss. It
|
||||
# GETs <base>/types/<cid> with the type-doc Accept header and returns
|
||||
# the RAW response bytes (peer_types decodes them via term_codec).
|
||||
# Exercised end-to-end against a background python http server that
|
||||
# serves hand-crafted term_codec bytes, so we test the wire — not just
|
||||
# an in-process call.
|
||||
|
||||
set -uo pipefail
|
||||
cd "$(git rev-parse --show-toplevel)"
|
||||
|
||||
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
|
||||
fi
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
echo "ERROR: sx_server.exe not found." >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
VERBOSE="${1:-}"
|
||||
PASS=0; FAIL=0; ERRORS=""
|
||||
|
||||
# ── live stub server ─────────────────────────────────────────
|
||||
# GET /types/bafy1 -> 200 with term_codec-encoded TypeRecord
|
||||
# TR = [{name, <<"Post">>}, {instance_type, <<"Note">>}]
|
||||
# GET anything else -> 404
|
||||
PORT=$(python3 -c 'import socket;s=socket.socket();s.bind(("127.0.0.1",0));print(s.getsockname()[1]);s.close()')
|
||||
SRVROOT=$(mktemp -d)
|
||||
PYSRV="$SRVROOT/srv.py"
|
||||
cat > "$PYSRV" <<'PY'
|
||||
import sys, http.server, socketserver
|
||||
|
||||
PORT = int(sys.argv[1])
|
||||
|
||||
# term_codec encoding (mirror of next/kernel/term_codec.erl).
|
||||
def enc_atom(s):
|
||||
b = s.encode()
|
||||
return f"a{len(b)}:".encode() + b
|
||||
def enc_bin(b):
|
||||
return f"b{len(b)}:".encode() + b
|
||||
def enc_tuple(items):
|
||||
return f"t{len(items)}:".encode() + b"".join(items)
|
||||
def enc_list(items):
|
||||
return f"l{len(items)}:".encode() + b"".join(items)
|
||||
|
||||
# [{name, <<"Post">>}, {instance_type, <<"Note">>}]
|
||||
TYPEDOC = enc_list([
|
||||
enc_tuple([enc_atom("name"), enc_bin(b"Post")]),
|
||||
enc_tuple([enc_atom("instance_type"), enc_bin(b"Note")]),
|
||||
])
|
||||
|
||||
class H(http.server.BaseHTTPRequestHandler):
|
||||
def do_GET(self):
|
||||
if self.path == "/types/bafy1":
|
||||
self.send_response(200)
|
||||
self.send_header('content-type','application/vnd.fed-sx.type-doc')
|
||||
self.send_header('content-length', str(len(TYPEDOC)))
|
||||
self.end_headers()
|
||||
self.wfile.write(TYPEDOC)
|
||||
else:
|
||||
self.send_response(404); self.end_headers(); self.wfile.write(b'not found')
|
||||
def log_message(self, fmt, *args): pass
|
||||
|
||||
with socketserver.TCPServer(("127.0.0.1", PORT), H) as srv:
|
||||
srv.serve_forever()
|
||||
PY
|
||||
python3 "$PYSRV" "$PORT" >/dev/null 2>&1 &
|
||||
SRV_PID=$!
|
||||
TMPFILE=$(mktemp)
|
||||
trap "rm -rf $SRVROOT $TMPFILE; kill $SRV_PID 2>/dev/null || true" EXIT
|
||||
for _ in 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15; do
|
||||
if curl -fsS "http://127.0.0.1:$PORT/types/bafy1" >/dev/null 2>&1; then break; fi
|
||||
sleep 0.2
|
||||
done
|
||||
|
||||
bytes_of() { python3 -c "import sys; print(','.join(str(b) for b in sys.argv[1].encode()))" "$1"; }
|
||||
URL_BASE_BYTES=$(bytes_of "http://127.0.0.1:$PORT")
|
||||
|
||||
cat > "$TMPFILE" <<'EPOCHS'
|
||||
(epoch 1)
|
||||
(load "lib/erlang/tokenizer.sx")
|
||||
(load "lib/erlang/parser.sx")
|
||||
(load "lib/erlang/parser-core.sx")
|
||||
(load "lib/erlang/parser-expr.sx")
|
||||
(load "lib/erlang/parser-module.sx")
|
||||
(load "lib/erlang/transpile.sx")
|
||||
(load "lib/erlang/runtime.sx")
|
||||
(load "lib/erlang/vm/dispatcher.sx")
|
||||
(epoch 2)
|
||||
(eval "(er-load-gen-server!)")
|
||||
(epoch 3)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/term_codec.erl\")) :name)")
|
||||
(epoch 4)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/peer_types.erl\")) :name)")
|
||||
(epoch 5)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/discovery_type_fetch.erl\")) :name)")
|
||||
|
||||
;; accept_header is the 31-byte type-doc MIME
|
||||
(epoch 10)
|
||||
(eval "(get (erlang-eval-ast \"byte_size(discovery_type_fetch:accept_header()) =:= 31\") :name)")
|
||||
|
||||
;; type_doc_url builds <base>/types/bafy1
|
||||
(epoch 11)
|
||||
(eval "(get (erlang-eval-ast \"U = discovery_type_fetch:type_doc_url(<<__URL_BASE__>>, <<98,97,102,121,49>>), U =:= <<__URL_BASE__,47,116,121,112,101,115,47,98,97,102,121,49>>\") :name)")
|
||||
|
||||
;; resolve_type_url via the static type_url proplist
|
||||
(epoch 12)
|
||||
(eval "(get (erlang-eval-ast \"discovery_type_fetch:resolve_type_url(<<98,97,102,121,49>>, [{type_url, [{<<98,97,102,121,49>>, <<__URL_BASE__>>}]}]) =:= {ok, <<__URL_BASE__>>}\") :name)")
|
||||
|
||||
;; fetch live -> {ok, Bytes} that decode to the TypeRecord
|
||||
(epoch 13)
|
||||
(eval "(get (erlang-eval-ast \"R = discovery_type_fetch:fetch(<<__URL_BASE__,47,116,121,112,101,115,47,98,97,102,121,49>>, []), case R of {ok, B} -> {ok, TR, _} = term_codec:decode(B), TR =:= [{name, <<80,111,115,116>>}, {instance_type, <<78,111,116,101>>}]; _ -> false end\") :name)")
|
||||
|
||||
;; closure from make_fetch_fn/0 dispatches and returns raw bytes
|
||||
(epoch 14)
|
||||
(eval "(get (erlang-eval-ast \"Fn = discovery_type_fetch:make_fetch_fn(), Cfg = [{type_url, [{<<98,97,102,121,49>>, <<__URL_BASE__>>}]}], case Fn(<<98,97,102,121,49>>, Cfg) of {ok, B} -> {ok, TR, _} = term_codec:decode(B), TR =:= [{name, <<80,111,115,116>>}, {instance_type, <<78,111,116,101>>}]; _ -> false end\") :name)")
|
||||
|
||||
;; closure with no resolver -> {error, no_type_url}
|
||||
(epoch 15)
|
||||
(eval "(get (erlang-eval-ast \"Fn = discovery_type_fetch:make_fetch_fn(), case Fn(<<98,97,102,121,49>>, []) of {error, no_type_url} -> true; _ -> false end\") :name)")
|
||||
|
||||
;; fetch on an unknown cid path -> {error, {status, 404}}
|
||||
(epoch 16)
|
||||
(eval "(get (erlang-eval-ast \"R = discovery_type_fetch:fetch(<<__URL_BASE__,47,116,121,112,101,115,47,122,122,122>>, []), case R of {error, {status, 404}} -> true; _ -> false end\") :name)")
|
||||
|
||||
;; end-to-end: peer_types:lookup_or_fetch uses the closure, decodes,
|
||||
;; and writes the TypeRecord into the cache
|
||||
(epoch 17)
|
||||
(eval "(get (erlang-eval-ast \"Fn = discovery_type_fetch:make_fetch_fn(), Cfg = [{type_fetch_fn, Fn}, {type_url, [{<<98,97,102,121,49>>, <<__URL_BASE__>>}]}], case peer_types:lookup_or_fetch(<<98,97,102,121,49>>, Cfg, peer_types:new()) of {ok, TR, S} -> TR =:= [{name, <<80,111,115,116>>}, {instance_type, <<78,111,116,101>>}] andalso peer_types:types(S) =:= [<<98,97,102,121,49>>]; _ -> false end\") :name)")
|
||||
EPOCHS
|
||||
|
||||
sed -i "s|__URL_BASE__|${URL_BASE_BYTES}|g" "$TMPFILE"
|
||||
|
||||
OUTPUT=$(timeout 300 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
|
||||
|
||||
check() {
|
||||
local epoch="$1" desc="$2" expected="$3"
|
||||
local actual
|
||||
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
|
||||
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
|
||||
$0 ~ "^\\(ok " e " " { print; exit }
|
||||
$0 ~ "^\\(error " e " " { print; exit }
|
||||
')
|
||||
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
|
||||
if echo "$actual" | grep -qF -- "$expected"; then
|
||||
PASS=$((PASS+1))
|
||||
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
|
||||
else
|
||||
FAIL=$((FAIL+1))
|
||||
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
|
||||
"
|
||||
fi
|
||||
}
|
||||
|
||||
check 5 "discovery_type_fetch loaded" "discovery_type_fetch"
|
||||
check 10 "accept_header is 31-byte type-doc" "true"
|
||||
check 11 "type_doc_url builds /types/<cid>" "true"
|
||||
check 12 "resolve_type_url via type_url map" "true"
|
||||
check 13 "fetch live -> raw bytes decode to TR" "true"
|
||||
check 14 "closure -> raw bytes decode to TR" "true"
|
||||
check 15 "closure no resolver -> no_type_url" "true"
|
||||
check 16 "fetch 404 path -> {status, 404}" "true"
|
||||
check 17 "lookup_or_fetch caches fetched TR" "true"
|
||||
|
||||
TOTAL=$((PASS+FAIL))
|
||||
if [ $FAIL -eq 0 ]; then
|
||||
echo "ok $PASS/$TOTAL next/tests/discovery_type_fetch.sh passed"
|
||||
else
|
||||
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
|
||||
echo "$ERRORS"
|
||||
fi
|
||||
[ $FAIL -eq 0 ]
|
||||
130
next/tests/flow_dispatch.sh
Executable file
130
next/tests/flow_dispatch.sh
Executable file
@@ -0,0 +1,130 @@
|
||||
#!/usr/bin/env bash
|
||||
# next/tests/flow_dispatch.sh — fed-sx triggers Phase 3.
|
||||
#
|
||||
# flow_dispatch bridges a matched trigger to a started flow — a native
|
||||
# flow_store:start (the engine is Erlang-on-SX too, no FFI). Confirms
|
||||
# guard/actor-scope gating, the audit triple, synchronous first-step
|
||||
# execution, suspend/resume of a started instance, a branch on an
|
||||
# activity field, and graceful handling of an unknown flow name.
|
||||
|
||||
set -uo pipefail
|
||||
cd "$(git rev-parse --show-toplevel)"
|
||||
|
||||
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
|
||||
fi
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
echo "ERROR: sx_server.exe not found." >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
VERBOSE="${1:-}"
|
||||
PASS=0; FAIL=0; ERRORS=""
|
||||
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
|
||||
|
||||
# Activity (Create of a Note by alice), receiving actor-state, and a
|
||||
# couple of flows: `capture` echoes the activity's type out of the
|
||||
# flow's input env; `wait_flow` suspends then wraps the resumed value;
|
||||
# `cat_flow` branches on the inner object's :type.
|
||||
ACT='[{type, create}, {actor, alice}, {id, <<97,99,105,100>>}, {object, [{type, note}]}]'
|
||||
AS='[{actor_id, alice}]'
|
||||
CAP='flow_spec:flow_node(fun(In) -> {ok, A} = envelope:get_field(activity, In), {ok, T} = envelope:get_field(type, A), T end)'
|
||||
WAITF='flow_spec:sequence([flow:suspend(w), flow_spec:flow_node(fun(V) -> {got, V} end)])'
|
||||
CATF='flow_spec:branch(fun(In) -> {ok, A} = envelope:get_field(activity, In), {ok, O} = envelope:get_field(object, A), envelope:get_field(type, O) =:= {ok, note} end, flow_spec:flow_const(is_note), flow_spec:flow_const(not_note))'
|
||||
|
||||
cat > "$TMPFILE" <<EPOCHS
|
||||
(epoch 1)
|
||||
(load "lib/erlang/tokenizer.sx")
|
||||
(load "lib/erlang/parser.sx")
|
||||
(load "lib/erlang/parser-core.sx")
|
||||
(load "lib/erlang/parser-expr.sx")
|
||||
(load "lib/erlang/parser-module.sx")
|
||||
(load "lib/erlang/transpile.sx")
|
||||
(load "lib/erlang/runtime.sx")
|
||||
(load "lib/erlang/vm/dispatcher.sx")
|
||||
(epoch 2)
|
||||
(eval "(er-load-gen-server!)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/flow/flow.erl\")) :name)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/flow/flow_spec.erl\")) :name)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/flow/flow_store.erl\")) :name)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/trigger_registry.erl\")) :name)")
|
||||
(epoch 3)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/flow_dispatch.erl\")) :name)")
|
||||
|
||||
;; ── guard / actor-scope gating ─────────────────────────────
|
||||
(epoch 10)
|
||||
(eval "(get (erlang-eval-ast \"flow_dispatch:guard_passes(trigger_registry:mk_spec(c, f, undefined, any), ${ACT}, ${AS})\") :name)")
|
||||
(epoch 11)
|
||||
(eval "(get (erlang-eval-ast \"flow_dispatch:guard_passes(trigger_registry:mk_spec(c, f, fun(_, _) -> false end, any), ${ACT}, ${AS}) =:= false\") :name)")
|
||||
(epoch 12)
|
||||
(eval "(get (erlang-eval-ast \"flow_dispatch:guard_passes(trigger_registry:mk_spec(c, f, fun(A, _) -> envelope:get_field(actor, A) =:= {ok, alice} end, any), ${ACT}, ${AS})\") :name)")
|
||||
(epoch 13)
|
||||
(eval "(get (erlang-eval-ast \"flow_dispatch:guard_passes(trigger_registry:mk_spec(c, f, undefined, alice), ${ACT}, ${AS})\") :name)")
|
||||
(epoch 14)
|
||||
(eval "(get (erlang-eval-ast \"flow_dispatch:guard_passes(trigger_registry:mk_spec(c, f, undefined, bob), ${ACT}, ${AS}) =:= false\") :name)")
|
||||
|
||||
;; ── start: audit triple + synchronous first step ───────────
|
||||
(epoch 20)
|
||||
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(capture, ${CAP}), flow_dispatch:start(trigger_registry:mk_spec(<<116,99>>, capture, undefined, any), ${ACT}, ${AS}, []) =:= {ok, 1, {<<97,99,105,100>>, <<116,99>>, 1}}\") :name)")
|
||||
(epoch 21)
|
||||
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(capture, ${CAP}), {ok, FlowId, _} = flow_dispatch:start(trigger_registry:mk_spec(<<116,99>>, capture, undefined, any), ${ACT}, ${AS}, []), flow_store:status(FlowId) =:= {ok, {done, create}}\") :name)")
|
||||
|
||||
;; ── unknown flow name -> {error, no_such_flow}, no crash ────
|
||||
(epoch 30)
|
||||
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_dispatch:start(trigger_registry:mk_spec(<<116,99>>, ghostflow, undefined, any), ${ACT}, ${AS}, []) =:= {error, no_such_flow}\") :name)")
|
||||
|
||||
;; ── started instance suspends; resume completes ────────────
|
||||
(epoch 40)
|
||||
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(wait_flow, ${WAITF}), {ok, FlowId, _} = flow_dispatch:start(trigger_registry:mk_spec(<<116,99>>, wait_flow, undefined, any), ${ACT}, ${AS}, []), S1 = flow_store:status(FlowId), R = flow_store:resume(FlowId, 7), S1 =:= {ok, {suspended, w}} andalso R =:= {ok, {flow_done, {got, 7}}}\") :name)")
|
||||
|
||||
;; ── branch on an activity field (both branches) ────────────
|
||||
(epoch 50)
|
||||
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(cat_flow, ${CATF}), {ok, FlowId, _} = flow_dispatch:start(trigger_registry:mk_spec(<<116,99>>, cat_flow, undefined, any), ${ACT}, ${AS}, []), flow_store:status(FlowId) =:= {ok, {done, is_note}}\") :name)")
|
||||
(epoch 51)
|
||||
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(cat_flow, ${CATF}), {ok, FlowId, _} = flow_dispatch:start(trigger_registry:mk_spec(<<116,99>>, cat_flow, undefined, any), [{type, create}, {actor, alice}, {id, <<120>>}, {object, [{type, article}]}], ${AS}, []), flow_store:status(FlowId) =:= {ok, {done, not_note}}\") :name)")
|
||||
EPOCHS
|
||||
|
||||
OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
|
||||
|
||||
check() {
|
||||
local epoch="$1" desc="$2" expected="$3"
|
||||
local actual
|
||||
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
|
||||
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
|
||||
$0 ~ "^\\(ok " e " " { print; exit }
|
||||
$0 ~ "^\\(error " e " " { print; exit }
|
||||
')
|
||||
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
|
||||
if echo "$actual" | grep -qF -- "$expected"; then
|
||||
PASS=$((PASS+1))
|
||||
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
|
||||
else
|
||||
FAIL=$((FAIL+1))
|
||||
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
|
||||
"
|
||||
fi
|
||||
}
|
||||
|
||||
check 3 "flow_dispatch module loaded" "flow_dispatch"
|
||||
check 10 "undefined guard + any scope pass" "true"
|
||||
check 11 "guard false -> no pass" "true"
|
||||
check 12 "guard true on activity field" "true"
|
||||
check 13 "actor-scope match passes" "true"
|
||||
check 14 "actor-scope mismatch fails" "true"
|
||||
check 20 "start returns audit triple" "true"
|
||||
check 21 "first step runs synchronously" "true"
|
||||
check 30 "unknown flow -> no_such_flow" "true"
|
||||
check 40 "started flow suspends + resumes" "true"
|
||||
check 50 "branch then-arm (is_note)" "true"
|
||||
check 51 "branch else-arm (not_note)" "true"
|
||||
|
||||
TOTAL=$((PASS+FAIL))
|
||||
if [ $FAIL -eq 0 ]; then
|
||||
echo "ok $PASS/$TOTAL next/tests/flow_dispatch.sh passed"
|
||||
else
|
||||
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
|
||||
echo "$ERRORS"
|
||||
fi
|
||||
[ $FAIL -eq 0 ]
|
||||
@@ -48,6 +48,18 @@ cat > "$TMPFILE" <<'EPOCHS'
|
||||
(eval "(first (parse (file-read \"next/genesis/activity-types/endorse.sx\")))")
|
||||
(epoch 200)
|
||||
(eval "(get (apply dict (rest (parse (file-read \"next/genesis/activity-types/endorse.sx\")))) :name)")
|
||||
(epoch 201)
|
||||
(eval "(first (parse (file-read \"next/genesis/activity-types/define_type.sx\")))")
|
||||
(epoch 202)
|
||||
(eval "(get (apply dict (rest (parse (file-read \"next/genesis/activity-types/define_type.sx\")))) :name)")
|
||||
(epoch 203)
|
||||
(eval "(first (parse (file-read \"next/genesis/activity-types/subtype_of.sx\")))")
|
||||
(epoch 204)
|
||||
(eval "(get (apply dict (rest (parse (file-read \"next/genesis/activity-types/subtype_of.sx\")))) :name)")
|
||||
(epoch 205)
|
||||
(eval "(first (parse (file-read \"next/genesis/activity-types/define_trigger.sx\")))")
|
||||
(epoch 206)
|
||||
(eval "(get (apply dict (rest (parse (file-read \"next/genesis/activity-types/define_trigger.sx\")))) :name)")
|
||||
(epoch 19)
|
||||
(eval "(len (get (apply dict (rest (parse (file-read \"next/genesis/manifest.sx\")))) :activity-types))")
|
||||
(epoch 30)
|
||||
@@ -180,7 +192,13 @@ check 27 "announce.sx head form" "DefineActivity"
|
||||
check 28 "announce.sx name is Announce" "Announce"
|
||||
check 29 "endorse.sx head form" "DefineActivity"
|
||||
check 200 "endorse.sx name is Endorse" "Endorse"
|
||||
check 19 "manifest has 5 activity-types" "5"
|
||||
check 201 "define_type.sx head form" "DefineActivity"
|
||||
check 202 "define_type.sx name" "DefineType"
|
||||
check 203 "subtype_of.sx head form" "DefineActivity"
|
||||
check 204 "subtype_of.sx name" "SubtypeOf"
|
||||
check 205 "define_trigger.sx head form" "DefineActivity"
|
||||
check 206 "define_trigger.sx name" "DefineTrigger"
|
||||
check 19 "manifest has 8 activity-types" "8"
|
||||
check 30 "sx-artifact.sx head form" "DefineObject"
|
||||
check 31 "sx-artifact.sx name" "SXArtifact"
|
||||
check 32 "note.sx name" "Note"
|
||||
|
||||
154
next/tests/object_schema.sh
Executable file
154
next/tests/object_schema.sh
Executable file
@@ -0,0 +1,154 @@
|
||||
#!/usr/bin/env bash
|
||||
# next/tests/object_schema.sh — host-type federation Phase 4.
|
||||
#
|
||||
# pipeline:apply_object_schema/2 validates an inbound activity's inner
|
||||
# object against its declared refinement type. The type is resolved
|
||||
# TypeName -> TypeCid (Cfg type_index) -> TypeRecord
|
||||
# (peer_types:lookup_or_fetch, a local hit or a wire fetch), then the
|
||||
# record's refinement schema is applied to the object's :field_values.
|
||||
# Default strict_object_schema = false: an unresolvable type is let
|
||||
# through; opt-in strict rejects.
|
||||
#
|
||||
# Refinement schemas are either a 1-arity Erlang predicate (the
|
||||
# substrate stand-in, locally stored) or a term_codec-safe
|
||||
# {required, [Field,...]} constraint (so a wire-fetched record still
|
||||
# validates). Both are exercised here.
|
||||
|
||||
set -uo pipefail
|
||||
cd "$(git rev-parse --show-toplevel)"
|
||||
|
||||
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
|
||||
fi
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
echo "ERROR: sx_server.exe not found." >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
VERBOSE="${1:-}"
|
||||
PASS=0; FAIL=0; ERRORS=""
|
||||
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
|
||||
|
||||
# Cid is the Post type's CID; TRdata carries a data-form refinement
|
||||
# (object must have a `title` field), TRfun the Erlang-predicate form.
|
||||
# ActValid's object has :title, ActFail's doesn't, ActNoType's object
|
||||
# declares no type, ActUnknown's type isn't in the index. PostName is
|
||||
# <<"Post">>, title "Hi" = <<72,105>>. Index maps name -> Cid.
|
||||
SETUP='Cid = <<98,97,102,121,80>>, PostName = <<80,111,115,116>>, TRdata = [{name, PostName}, {refinement_schema, {required, [title]}}], TRfun = [{name, PostName}, {refinement_schema, fun(FV) -> case FV of [{title, _} | _] -> true; _ -> false end end}], ObjValid = [{type, PostName}, {field_values, [{title, <<72,105>>}, {body, <<104,105>>}]}], ObjFail = [{type, PostName}, {field_values, [{body, <<104,105>>}]}], ActValid = [{type, create}, {actor, alice}, {object, ObjValid}], ActFail = [{type, create}, {actor, alice}, {object, ObjFail}], ActNoType = [{type, create}, {actor, alice}, {object, [{field_values, [{title, <<72,105>>}]}]}], ActUnknown = [{type, create}, {actor, alice}, {object, [{type, <<82,101,112,108,121>>}, {field_values, [{title, <<72,105>>}]}]}], Index = [{PostName, Cid}], FAIL = {error, {validation_failed, object_schema}},'
|
||||
|
||||
cat > "$TMPFILE" <<EPOCHS
|
||||
(epoch 1)
|
||||
(load "lib/erlang/tokenizer.sx")
|
||||
(load "lib/erlang/parser.sx")
|
||||
(load "lib/erlang/parser-core.sx")
|
||||
(load "lib/erlang/parser-expr.sx")
|
||||
(load "lib/erlang/parser-module.sx")
|
||||
(load "lib/erlang/transpile.sx")
|
||||
(load "lib/erlang/runtime.sx")
|
||||
(load "lib/erlang/vm/dispatcher.sx")
|
||||
(epoch 2)
|
||||
(eval "(er-load-gen-server!)")
|
||||
(epoch 3)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
|
||||
(epoch 4)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/term_codec.erl\")) :name)")
|
||||
(epoch 5)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/peer_types.erl\")) :name)")
|
||||
(epoch 6)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/pipeline.erl\")) :name)")
|
||||
|
||||
;; local registry match + valid object -> accepted
|
||||
(epoch 10)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRdata), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActValid, Cfg) =:= ok\") :name)")
|
||||
;; local match + refinement-failing object -> rejected
|
||||
(epoch 11)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRdata), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActFail, Cfg) =:= FAIL\") :name)")
|
||||
|
||||
;; type not cached, fetch succeeds -> validates against fetched record
|
||||
(epoch 12)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), Cfg = [{peer_types, peer_types}, {type_index, Index}, {type_fetch_fn, fun(_, _) -> {ok, term_codec:encode(TRdata)} end}], pipeline:apply_object_schema(ActValid, Cfg) =:= ok\") :name)")
|
||||
;; fetched record, failing object -> rejected
|
||||
(epoch 13)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), Cfg = [{peer_types, peer_types}, {type_index, Index}, {type_fetch_fn, fun(_, _) -> {ok, term_codec:encode(TRdata)} end}], pipeline:apply_object_schema(ActFail, Cfg) =:= FAIL\") :name)")
|
||||
|
||||
;; unknown type, fetch fails, strict not set -> accepted (skipped)
|
||||
(epoch 14)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), Cfg = [{peer_types, peer_types}, {type_index, Index}, {type_fetch_fn, fun(_, _) -> {error, http_404} end}], pipeline:apply_object_schema(ActValid, Cfg) =:= ok\") :name)")
|
||||
;; unknown type, fetch fails, strict set -> rejected
|
||||
(epoch 15)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), Cfg = [{peer_types, peer_types}, {type_index, Index}, {type_fetch_fn, fun(_, _) -> {error, http_404} end}, {strict_object_schema, true}], pipeline:apply_object_schema(ActValid, Cfg) =:= FAIL\") :name)")
|
||||
;; no peer_types cfg at all, non-strict -> accepted (skipped)
|
||||
(epoch 16)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} Cfg = [{type_index, Index}], pipeline:apply_object_schema(ActValid, Cfg) =:= ok\") :name)")
|
||||
;; no peer_types cfg, strict -> rejected
|
||||
(epoch 17)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} Cfg = [{type_index, Index}, {strict_object_schema, true}], pipeline:apply_object_schema(ActValid, Cfg) =:= FAIL\") :name)")
|
||||
|
||||
;; object without inner {type, _} -> skipped (accepted)
|
||||
(epoch 18)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRdata), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActNoType, Cfg) =:= ok\") :name)")
|
||||
;; object type not in the local index -> skipped (open-world)
|
||||
(epoch 19)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRdata), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActUnknown, Cfg) =:= ok\") :name)")
|
||||
|
||||
;; Erlang-predicate refinement schema: valid -> ok, failing -> reject
|
||||
(epoch 20)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRfun), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActValid, Cfg) =:= ok\") :name)")
|
||||
(epoch 21)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRfun), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActFail, Cfg) =:= FAIL\") :name)")
|
||||
|
||||
;; type known but record carries no refinement schema -> accepted
|
||||
(epoch 22)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, [{name, PostName}]), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActFail, Cfg) =:= ok\") :name)")
|
||||
|
||||
;; stage_object_schema/1 yields a 1-arity stage usable by run_stages
|
||||
(epoch 23)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRdata), Cfg = [{peer_types, peer_types}, {type_index, Index}], Stage = pipeline:stage_object_schema(Cfg), is_function(Stage, 1) andalso pipeline:run_stages(ActValid, [Stage]) =:= ok andalso pipeline:run_stages(ActFail, [Stage]) =:= FAIL\") :name)")
|
||||
EPOCHS
|
||||
|
||||
OUTPUT=$(timeout 300 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
|
||||
|
||||
check() {
|
||||
local epoch="$1" desc="$2" expected="$3"
|
||||
local actual
|
||||
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
|
||||
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
|
||||
$0 ~ "^\\(ok " e " " { print; exit }
|
||||
$0 ~ "^\\(error " e " " { print; exit }
|
||||
')
|
||||
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
|
||||
if echo "$actual" | grep -qF -- "$expected"; then
|
||||
PASS=$((PASS+1))
|
||||
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
|
||||
else
|
||||
FAIL=$((FAIL+1))
|
||||
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
|
||||
"
|
||||
fi
|
||||
}
|
||||
|
||||
check 6 "pipeline module loaded" "pipeline"
|
||||
check 10 "local match + valid -> accepted" "true"
|
||||
check 11 "local match + failing -> rejected" "true"
|
||||
check 12 "fetch ok -> validates fetched record" "true"
|
||||
check 13 "fetched record + failing -> rejected" "true"
|
||||
check 14 "fetch fail, non-strict -> accepted" "true"
|
||||
check 15 "fetch fail, strict -> rejected" "true"
|
||||
check 16 "no peer_types, non-strict -> accepted" "true"
|
||||
check 17 "no peer_types, strict -> rejected" "true"
|
||||
check 18 "object without type -> skipped" "true"
|
||||
check 19 "type not in index -> skipped" "true"
|
||||
check 20 "fun schema valid -> accepted" "true"
|
||||
check 21 "fun schema failing -> rejected" "true"
|
||||
check 22 "no refinement schema -> accepted" "true"
|
||||
check 23 "stage_object_schema composes" "true"
|
||||
|
||||
TOTAL=$((PASS+FAIL))
|
||||
if [ $FAIL -eq 0 ]; then
|
||||
echo "ok $PASS/$TOTAL next/tests/object_schema.sh passed"
|
||||
else
|
||||
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
|
||||
echo "$ERRORS"
|
||||
fi
|
||||
[ $FAIL -eq 0 ]
|
||||
155
next/tests/peer_types.sh
Executable file
155
next/tests/peer_types.sh
Executable file
@@ -0,0 +1,155 @@
|
||||
#!/usr/bin/env bash
|
||||
# next/tests/peer_types.sh — host-type federation Phase 2 acceptance.
|
||||
#
|
||||
# Receiver-side peer-types cache (next/kernel/peer_types.erl), a mirror
|
||||
# of peer_actors keyed by type CID. Tracks {TypeCidBytes, TypeRecord}
|
||||
# pairs so the object-schema validation stage can vet inbound objects
|
||||
# against a fetched-once refinement type. lookup_or_fetch pulls a
|
||||
# Cfg-supplied type_fetch_fn on a miss, decodes the returned wire bytes
|
||||
# via term_codec, and caches the TypeRecord.
|
||||
|
||||
set -uo pipefail
|
||||
cd "$(git rev-parse --show-toplevel)"
|
||||
|
||||
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
|
||||
fi
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
echo "ERROR: sx_server.exe not found." >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
VERBOSE="${1:-}"
|
||||
PASS=0; FAIL=0; ERRORS=""
|
||||
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
|
||||
|
||||
# TR1/TR2 are TypeRecords (the DefineType :object payloads). Doc1 is
|
||||
# TR1's on-wire form (term_codec). FetchOk serves Doc1 for Cid1;
|
||||
# FetchBad returns undecodable bytes. CfgOk/CfgBad/CfgNone vary the
|
||||
# type_fetch_fn slot.
|
||||
SETUP='Cid1 = <<98,97,102,121,49>>, Cid2 = <<98,97,102,121,50>>, TR1 = [{name, <<80,111,115,116>>}, {instance_type, <<78,111,116,101>>}], TR2 = [{name, <<82,101,112,108,121>>}], Doc1 = term_codec:encode(TR1), FetchOk = fun(C, _) -> case C =:= Cid1 of true -> {ok, Doc1}; false -> {error, not_found} end end, FetchBad = fun(_, _) -> {ok, <<255>>} end, CfgOk = [{type_fetch_fn, FetchOk}], CfgBad = [{type_fetch_fn, FetchBad}], CfgNone = [],'
|
||||
|
||||
cat > "$TMPFILE" <<EPOCHS
|
||||
(epoch 1)
|
||||
(load "lib/erlang/tokenizer.sx")
|
||||
(load "lib/erlang/parser.sx")
|
||||
(load "lib/erlang/parser-core.sx")
|
||||
(load "lib/erlang/parser-expr.sx")
|
||||
(load "lib/erlang/parser-module.sx")
|
||||
(load "lib/erlang/transpile.sx")
|
||||
(load "lib/erlang/runtime.sx")
|
||||
(load "lib/erlang/vm/dispatcher.sx")
|
||||
(epoch 2)
|
||||
(eval "(er-load-gen-server!)")
|
||||
(epoch 3)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/term_codec.erl\")) :name)")
|
||||
(epoch 4)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/peer_types.erl\")) :name)")
|
||||
|
||||
;; ── pure API ───────────────────────────────────────────────
|
||||
;; new/0 -> []
|
||||
(epoch 10)
|
||||
(eval "(get (erlang-eval-ast \"peer_types:new() =:= []\") :name)")
|
||||
;; lookup miss -> not_found
|
||||
(epoch 11)
|
||||
(eval "(get (erlang-eval-ast \"peer_types:lookup(<<1>>, peer_types:new()) =:= not_found\") :name)")
|
||||
;; store + lookup round-trip
|
||||
(epoch 12)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} S = peer_types:store(Cid1, TR1, peer_types:new()), peer_types:lookup(Cid1, S) =:= {ok, TR1}\") :name)")
|
||||
;; types/1 lists CIDs in insertion order
|
||||
(epoch 13)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} S = peer_types:store(Cid2, TR2, peer_types:store(Cid1, TR1, peer_types:new())), peer_types:types(S) =:= [Cid1, Cid2]\") :name)")
|
||||
;; evict removes the entry
|
||||
(epoch 14)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} S = peer_types:evict(Cid1, peer_types:store(Cid1, TR1, peer_types:new())), peer_types:lookup(Cid1, S) =:= not_found\") :name)")
|
||||
|
||||
;; ── lookup_or_fetch (pure) ─────────────────────────────────
|
||||
;; miss -> fetch via Cfg.fn, decode bytes, cache TR
|
||||
(epoch 20)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} case peer_types:lookup_or_fetch(Cid1, CfgOk, peer_types:new()) of {ok, TR1, [{Cid1, TR1}]} -> ok; _ -> bad end\") :name)")
|
||||
;; hit -> returns cached without calling fetch
|
||||
(epoch 21)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} S = peer_types:store(Cid1, TR1, peer_types:new()), case peer_types:lookup_or_fetch(Cid1, CfgBad, S) of {ok, TR1, S} -> ok; _ -> bad end\") :name)")
|
||||
;; no type_fetch_fn -> {error, no_fetch_fn}, cache untouched
|
||||
(epoch 22)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} case peer_types:lookup_or_fetch(Cid1, CfgNone, peer_types:new()) of {error, no_fetch_fn, []} -> ok; _ -> bad end\") :name)")
|
||||
;; fetch error does NOT poison the cache
|
||||
(epoch 23)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} BadCfg = [{type_fetch_fn, fun(_, _) -> {error, http_404} end}], case peer_types:lookup_or_fetch(Cid1, BadCfg, peer_types:new()) of {error, http_404, []} -> ok; _ -> bad end\") :name)")
|
||||
;; undecodable bytes -> {error, bad_type_doc}
|
||||
(epoch 24)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} case peer_types:lookup_or_fetch(Cid1, CfgBad, peer_types:new()) of {error, bad_type_doc, []} -> ok; _ -> bad end\") :name)")
|
||||
|
||||
;; ── gen_server API ─────────────────────────────────────────
|
||||
;; start_link + put + lookup round-trip
|
||||
(epoch 30)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid1, TR1), peer_types:lookup(Cid1) =:= {ok, TR1}\") :name)")
|
||||
;; lookup miss -> not_found
|
||||
(epoch 31)
|
||||
(eval "(get (erlang-eval-ast \"peer_types:start_link(), peer_types:lookup(<<9>>) =:= not_found\") :name)")
|
||||
;; state_for is an alias of lookup
|
||||
(epoch 32)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid1, TR1), peer_types:state_for(Cid1) =:= {ok, TR1}\") :name)")
|
||||
;; known_types lists stored CIDs
|
||||
(epoch 33)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid1, TR1), peer_types:put(Cid2, TR2), peer_types:known_types() =:= [Cid1, Cid2]\") :name)")
|
||||
;; lookup_or_fetch miss fetches + caches
|
||||
(epoch 34)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), R = peer_types:lookup_or_fetch(Cid1, CfgOk), R =:= {ok, TR1} andalso peer_types:known_types() =:= [Cid1]\") :name)")
|
||||
;; lookup_or_fetch with no fn -> {error, no_fetch_fn}, pristine
|
||||
(epoch 35)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), R = peer_types:lookup_or_fetch(Cid1, CfgNone), R =:= {error, no_fetch_fn} andalso peer_types:known_types() =:= []\") :name)")
|
||||
;; start_link/1 pre-populates the cache
|
||||
(epoch 36)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link([{Cid1, TR1}]), peer_types:lookup(Cid1) =:= {ok, TR1}\") :name)")
|
||||
EPOCHS
|
||||
|
||||
OUTPUT=$(timeout 300 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
|
||||
|
||||
check() {
|
||||
local epoch="$1" desc="$2" expected="$3"
|
||||
local actual
|
||||
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
|
||||
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
|
||||
$0 ~ "^\\(ok " e " " { print; exit }
|
||||
$0 ~ "^\\(error " e " " { print; exit }
|
||||
')
|
||||
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
|
||||
if echo "$actual" | grep -qF -- "$expected"; then
|
||||
PASS=$((PASS+1))
|
||||
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
|
||||
else
|
||||
FAIL=$((FAIL+1))
|
||||
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
|
||||
"
|
||||
fi
|
||||
}
|
||||
|
||||
check 4 "peer_types module loaded" "peer_types"
|
||||
check 10 "new/0 -> []" "true"
|
||||
check 11 "lookup miss -> not_found" "true"
|
||||
check 12 "store + lookup round-trip" "true"
|
||||
check 13 "types/1 lists in insertion order" "true"
|
||||
check 14 "evict removes entry" "true"
|
||||
check 20 "lookup_or_fetch miss fetches" "ok"
|
||||
check 21 "lookup_or_fetch hit skips fetch" "ok"
|
||||
check 22 "no fetch_fn -> no_fetch_fn" "ok"
|
||||
check 23 "fetch error doesn't poison" "ok"
|
||||
check 24 "undecodable bytes -> bad_type_doc" "ok"
|
||||
check 30 "gen_server put + lookup" "true"
|
||||
check 31 "gen_server lookup miss" "true"
|
||||
check 32 "gen_server state_for alias" "true"
|
||||
check 33 "gen_server known_types lists" "true"
|
||||
check 34 "gen_server fetch + cache" "true"
|
||||
check 35 "gen_server no fn -> pristine" "true"
|
||||
check 36 "start_link/1 pre-populates" "true"
|
||||
|
||||
TOTAL=$((PASS+FAIL))
|
||||
if [ $FAIL -eq 0 ]; then
|
||||
echo "ok $PASS/$TOTAL next/tests/peer_types.sh passed"
|
||||
else
|
||||
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
|
||||
echo "$ERRORS"
|
||||
fi
|
||||
[ $FAIL -eq 0 ]
|
||||
140
next/tests/peer_types_route.sh
Executable file
140
next/tests/peer_types_route.sh
Executable file
@@ -0,0 +1,140 @@
|
||||
#!/usr/bin/env bash
|
||||
# next/tests/peer_types_route.sh — host-type federation Phase 3.
|
||||
#
|
||||
# Server side of the type-doc wire: http_server serves
|
||||
# GET /types/<cid> Accept: application/vnd.fed-sx.type-doc
|
||||
# as the term_codec-encoded TypeRecord pulled from the peer_types
|
||||
# cache; 404 if the cid isn't cached. Exercised via http_server:route
|
||||
# in-process (the established pattern — see http_actors.sh) so the
|
||||
# route resolution + content negotiation are tested without a live
|
||||
# socket. The peer_types gen_server holds the cache across epochs.
|
||||
|
||||
set -uo pipefail
|
||||
cd "$(git rev-parse --show-toplevel)"
|
||||
|
||||
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
|
||||
fi
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
echo "ERROR: sx_server.exe not found." >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
VERBOSE="${1:-}"
|
||||
PASS=0; FAIL=0; ERRORS=""
|
||||
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
|
||||
|
||||
# TR is the served TypeRecord, Cid its key. AccV is the type-doc
|
||||
# Accept header value, CT the content-type key. Cfg opts the route
|
||||
# into the peer_types cache. ReqHit / ReqMiss / ReqEmpty / ReqPost
|
||||
# vary the request line.
|
||||
SETUP='TR = [{name, <<80,111,115,116>>}, {instance_type, <<78,111,116,101>>}], Cid = <<98,97,102,121,49>>, peer_types:start_link(), peer_types:put(Cid, TR), AcK = <<97,99,99,101,112,116>>, AcV = <<97,112,112,108,105,99,97,116,105,111,110,47,118,110,100,46,102,101,100,45,115,120,46,116,121,112,101,45,100,111,99>>, Hs = [{AcK, AcV}], Cfg = [{peer_types, peer_types}],'
|
||||
|
||||
cat > "$TMPFILE" <<EPOCHS
|
||||
(epoch 1)
|
||||
(load "lib/erlang/tokenizer.sx")
|
||||
(load "lib/erlang/parser.sx")
|
||||
(load "lib/erlang/parser-core.sx")
|
||||
(load "lib/erlang/parser-expr.sx")
|
||||
(load "lib/erlang/parser-module.sx")
|
||||
(load "lib/erlang/transpile.sx")
|
||||
(load "lib/erlang/runtime.sx")
|
||||
(load "lib/erlang/vm/dispatcher.sx")
|
||||
(epoch 2)
|
||||
(eval "(er-load-gen-server!)")
|
||||
(epoch 3)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
|
||||
(epoch 4)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/term_codec.erl\")) :name)")
|
||||
(epoch 5)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/peer_types.erl\")) :name)")
|
||||
(epoch 6)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/http_server.erl\")) :name)")
|
||||
|
||||
;; ── negotiation + prefix primitives ────────────────────────
|
||||
;; Accept: type-doc negotiates to the type_doc format atom
|
||||
(epoch 10)
|
||||
(eval "(get (erlang-eval-ast \"http_server:accept_format(<<97,112,112,108,105,99,97,116,105,111,110,47,118,110,100,46,102,101,100,45,115,120,46,116,121,112,101,45,100,111,99>>) =:= type_doc\") :name)")
|
||||
;; type_doc content type is 31 bytes
|
||||
(epoch 11)
|
||||
(eval "(get (erlang-eval-ast \"byte_size(http_server:content_type_for(type_doc)) =:= 31\") :name)")
|
||||
;; types_prefix is "/types/" — 7 bytes
|
||||
(epoch 12)
|
||||
(eval "(get (erlang-eval-ast \"byte_size(http_server:types_prefix()) =:= 7\") :name)")
|
||||
|
||||
;; ── GET /types/<cid> ───────────────────────────────────────
|
||||
;; cache hit -> 200
|
||||
(epoch 20)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<71,69,84>>}, {path, <<47,116,121,112,101,115,47,98,97,102,121,49>>}, {headers, Hs}], R = http_server:route(Req, Cfg), {ok, S} = envelope:get_field(status, R), S =:= 200\") :name)")
|
||||
;; body decodes back to the stored TypeRecord
|
||||
(epoch 21)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<71,69,84>>}, {path, <<47,116,121,112,101,115,47,98,97,102,121,49>>}, {headers, Hs}], R = http_server:route(Req, Cfg), {ok, B} = envelope:get_field(body, R), {ok, DTR, _} = term_codec:decode(B), DTR =:= TR\") :name)")
|
||||
;; response carries the type-doc content type
|
||||
(epoch 22)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<71,69,84>>}, {path, <<47,116,121,112,101,115,47,98,97,102,121,49>>}, {headers, Hs}], R = http_server:route(Req, Cfg), {ok, Hdrs} = envelope:get_field(headers, R), {_CTK, CTV} = hd(Hdrs), CTV =:= http_server:content_type_for(type_doc)\") :name)")
|
||||
;; type_doc_response_for/2 direct: known cid -> 200
|
||||
(epoch 23)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} R = http_server:type_doc_response_for(Cid, Cfg), {ok, S} = envelope:get_field(status, R), S =:= 200\") :name)")
|
||||
|
||||
;; ── misses + wrong method ──────────────────────────────────
|
||||
;; unknown cid -> 404
|
||||
(epoch 30)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<71,69,84>>}, {path, <<47,116,121,112,101,115,47,122,122,122>>}, {headers, Hs}], R = http_server:route(Req, Cfg), {ok, S} = envelope:get_field(status, R), S =:= 404\") :name)")
|
||||
;; empty cid (GET /types/) -> 404
|
||||
(epoch 31)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<71,69,84>>}, {path, <<47,116,121,112,101,115,47>>}, {headers, Hs}], R = http_server:route(Req, Cfg), {ok, S} = envelope:get_field(status, R), S =:= 404\") :name)")
|
||||
;; no peer_types cfg -> 404 even for a known cid
|
||||
(epoch 32)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<71,69,84>>}, {path, <<47,116,121,112,101,115,47,98,97,102,121,49>>}, {headers, Hs}], R = http_server:route(Req, []), {ok, S} = envelope:get_field(status, R), S =:= 404\") :name)")
|
||||
;; POST /types/<cid> -> 404 (only GET serves type docs)
|
||||
(epoch 33)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<80,79,83,84>>}, {path, <<47,116,121,112,101,115,47,98,97,102,121,49>>}, {headers, Hs}], R = http_server:route(Req, Cfg), {ok, S} = envelope:get_field(status, R), S =:= 404\") :name)")
|
||||
;; existing routes intact: GET / still 200
|
||||
(epoch 34)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<71,69,84>>}, {path, <<47>>}], R = http_server:route(Req, Cfg), {ok, S} = envelope:get_field(status, R), S =:= 200\") :name)")
|
||||
EPOCHS
|
||||
|
||||
OUTPUT=$(timeout 300 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
|
||||
|
||||
check() {
|
||||
local epoch="$1" desc="$2" expected="$3"
|
||||
local actual
|
||||
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
|
||||
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
|
||||
$0 ~ "^\\(ok " e " " { print; exit }
|
||||
$0 ~ "^\\(error " e " " { print; exit }
|
||||
')
|
||||
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
|
||||
if echo "$actual" | grep -qF -- "$expected"; then
|
||||
PASS=$((PASS+1))
|
||||
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
|
||||
else
|
||||
FAIL=$((FAIL+1))
|
||||
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
|
||||
"
|
||||
fi
|
||||
}
|
||||
|
||||
check 6 "http_server module loaded" "http_server"
|
||||
check 10 "Accept type-doc -> type_doc" "true"
|
||||
check 11 "type_doc content type = 31 bytes" "true"
|
||||
check 12 "types_prefix = 7 bytes" "true"
|
||||
check 20 "GET /types/<cid> hit -> 200" "true"
|
||||
check 21 "body decodes to TypeRecord" "true"
|
||||
check 22 "response is type-doc content type" "true"
|
||||
check 23 "type_doc_response_for hit -> 200" "true"
|
||||
check 30 "unknown cid -> 404" "true"
|
||||
check 31 "empty cid -> 404" "true"
|
||||
check 32 "no peer_types cfg -> 404" "true"
|
||||
check 33 "POST /types/<cid> -> 404" "true"
|
||||
check 34 "existing GET / route intact" "true"
|
||||
|
||||
TOTAL=$((PASS+FAIL))
|
||||
if [ $FAIL -eq 0 ]; then
|
||||
echo "ok $PASS/$TOTAL next/tests/peer_types_route.sh passed"
|
||||
else
|
||||
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
|
||||
echo "$ERRORS"
|
||||
fi
|
||||
[ $FAIL -eq 0 ]
|
||||
129
next/tests/pipeline_triggers.sh
Executable file
129
next/tests/pipeline_triggers.sh
Executable file
@@ -0,0 +1,129 @@
|
||||
#!/usr/bin/env bash
|
||||
# next/tests/pipeline_triggers.sh — fed-sx triggers Phase 2.
|
||||
#
|
||||
# pipeline:apply_triggers/3 is the post-append fan-out: a successfully
|
||||
# appended activity has its type looked up in the trigger registry, and
|
||||
# each surviving spec (guard + actor-scope pass, not already fired) is
|
||||
# dispatched to a durable flow. Confirms lookup -> dispatch, no-match,
|
||||
# guard rejection, {activity,trigger}-cid dedup, multi-bind, graceful
|
||||
# handling of an unknown flow and a crashing flow, and the cfg gate.
|
||||
|
||||
set -uo pipefail
|
||||
cd "$(git rev-parse --show-toplevel)"
|
||||
|
||||
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
|
||||
fi
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
echo "ERROR: sx_server.exe not found." >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
VERBOSE="${1:-}"
|
||||
PASS=0; FAIL=0; ERRORS=""
|
||||
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
|
||||
|
||||
ACT='[{type, create}, {actor, alice}, {id, <<97,99,105,100>>}, {object, [{type, note}]}]'
|
||||
AS='[{actor_id, alice}]'
|
||||
CFG='[{trigger_registry, trigger_registry}]'
|
||||
DONEF='flow_spec:flow_const(ran)'
|
||||
BOOMF='flow_spec:flow_node(fun(_) -> error(kaboom) end)'
|
||||
|
||||
cat > "$TMPFILE" <<EPOCHS
|
||||
(epoch 1)
|
||||
(load "lib/erlang/tokenizer.sx")
|
||||
(load "lib/erlang/parser.sx")
|
||||
(load "lib/erlang/parser-core.sx")
|
||||
(load "lib/erlang/parser-expr.sx")
|
||||
(load "lib/erlang/parser-module.sx")
|
||||
(load "lib/erlang/transpile.sx")
|
||||
(load "lib/erlang/runtime.sx")
|
||||
(load "lib/erlang/vm/dispatcher.sx")
|
||||
(epoch 2)
|
||||
(eval "(er-load-gen-server!)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/flow/flow.erl\")) :name)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/flow/flow_spec.erl\")) :name)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/flow/flow_store.erl\")) :name)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/trigger_registry.erl\")) :name)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/flow_dispatch.erl\")) :name)")
|
||||
(epoch 3)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/pipeline.erl\")) :name)")
|
||||
|
||||
;; ── lookup -> dispatch ─────────────────────────────────────
|
||||
(epoch 10)
|
||||
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, ranflow, undefined, any)), flow_store:start_link(), flow_store:register_flow(ranflow, ${DONEF}), pipeline:apply_triggers(${ACT}, ${AS}, ${CFG}) =:= {ok, [{<<97,99,105,100>>, <<116,99>>, {ok, 1}}]}\") :name)")
|
||||
;; the dispatched flow really ran (instance recorded done)
|
||||
(epoch 11)
|
||||
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, ranflow, undefined, any)), flow_store:start_link(), flow_store:register_flow(ranflow, ${DONEF}), pipeline:apply_triggers(${ACT}, ${AS}, ${CFG}), flow_store:status(1) =:= {ok, {done, ran}}\") :name)")
|
||||
|
||||
;; ── no matching trigger -> no dispatch ─────────────────────
|
||||
(epoch 20)
|
||||
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), flow_store:start_link(), pipeline:apply_triggers(${ACT}, ${AS}, ${CFG}) =:= {ok, []}\") :name)")
|
||||
|
||||
;; ── guard returns false -> no dispatch ─────────────────────
|
||||
(epoch 30)
|
||||
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, ranflow, fun(_, _) -> false end, any)), flow_store:start_link(), flow_store:register_flow(ranflow, ${DONEF}), pipeline:apply_triggers(${ACT}, ${AS}, ${CFG}) =:= {ok, []}\") :name)")
|
||||
|
||||
;; ── dedup: already-fired {activity,trigger} pair -> skipped ─
|
||||
(epoch 40)
|
||||
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, ranflow, undefined, any)), flow_store:start_link(), flow_store:register_flow(ranflow, ${DONEF}), pipeline:apply_triggers(${ACT}, [{actor_id, alice}, {triggers_fired, [{<<97,99,105,100>>, <<116,99>>}]}], ${CFG}) =:= {ok, []}\") :name)")
|
||||
|
||||
;; ── multiple triggers for the same type -> each dispatched ─
|
||||
(epoch 50)
|
||||
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:add(create, trigger_registry:mk_spec(<<116,49>>, ranflow, undefined, any)), trigger_registry:add(create, trigger_registry:mk_spec(<<116,50>>, ranflow, undefined, any)), flow_store:start_link(), flow_store:register_flow(ranflow, ${DONEF}), {ok, Rs} = pipeline:apply_triggers(${ACT}, ${AS}, ${CFG}), length(Rs) =:= 2\") :name)")
|
||||
|
||||
;; ── unknown flow name -> {error, _} in results, no crash ───
|
||||
(epoch 60)
|
||||
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, ghostflow, undefined, any)), flow_store:start_link(), pipeline:apply_triggers(${ACT}, ${AS}, ${CFG}) =:= {ok, [{<<97,99,105,100>>, <<116,99>>, {error, no_such_flow}}]}\") :name)")
|
||||
|
||||
;; ── crashing flow -> isolated as {error, {flow_crashed, _}} ─
|
||||
(epoch 61)
|
||||
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, boom, undefined, any)), flow_store:start_link(), flow_store:register_flow(boom, ${BOOMF}), {ok, [{_, _, Outcome}]} = pipeline:apply_triggers(${ACT}, ${AS}, ${CFG}), case Outcome of {error, {flow_crashed, _}} -> true; _ -> false end\") :name)")
|
||||
|
||||
;; ── no trigger_registry cfg -> {ok, []} ────────────────────
|
||||
(epoch 70)
|
||||
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, ranflow, undefined, any)), flow_store:start_link(), flow_store:register_flow(ranflow, ${DONEF}), pipeline:apply_triggers(${ACT}, ${AS}, []) =:= {ok, []}\") :name)")
|
||||
EPOCHS
|
||||
|
||||
OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
|
||||
|
||||
check() {
|
||||
local epoch="$1" desc="$2" expected="$3"
|
||||
local actual
|
||||
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
|
||||
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
|
||||
$0 ~ "^\\(ok " e " " { print; exit }
|
||||
$0 ~ "^\\(error " e " " { print; exit }
|
||||
')
|
||||
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
|
||||
if echo "$actual" | grep -qF -- "$expected"; then
|
||||
PASS=$((PASS+1))
|
||||
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
|
||||
else
|
||||
FAIL=$((FAIL+1))
|
||||
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
|
||||
"
|
||||
fi
|
||||
}
|
||||
|
||||
check 3 "pipeline module loaded" "pipeline"
|
||||
check 10 "lookup -> dispatch (audit)" "true"
|
||||
check 11 "dispatched flow actually ran" "true"
|
||||
check 20 "no matching trigger -> no dispatch" "true"
|
||||
check 30 "guard false -> no dispatch" "true"
|
||||
check 40 "dedup already-fired -> skipped" "true"
|
||||
check 50 "multi-bind: each dispatched" "true"
|
||||
check 60 "unknown flow -> error in results" "true"
|
||||
check 61 "crashing flow isolated" "true"
|
||||
check 70 "no registry cfg -> no dispatch" "true"
|
||||
|
||||
TOTAL=$((PASS+FAIL))
|
||||
if [ $FAIL -eq 0 ]; then
|
||||
echo "ok $PASS/$TOTAL next/tests/pipeline_triggers.sh passed"
|
||||
else
|
||||
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
|
||||
echo "$ERRORS"
|
||||
fi
|
||||
[ $FAIL -eq 0 ]
|
||||
103
next/tests/subtype_of.sh
Executable file
103
next/tests/subtype_of.sh
Executable file
@@ -0,0 +1,103 @@
|
||||
#!/usr/bin/env bash
|
||||
# next/tests/subtype_of.sh — host-type federation Phase 1 acceptance.
|
||||
#
|
||||
# The SubtypeOf genesis verb (next/genesis/activity-types/subtype_of.sx)
|
||||
# records a hierarchy edge between two previously-defined types. This
|
||||
# suite confirms:
|
||||
# - the file parses with the expected DefineActivity head + :name
|
||||
# - the :schema predicate accepts an edge carrying both CIDs and
|
||||
# rejects edges missing either side
|
||||
# - a SubtypeOf envelope round-trips through term_codec
|
||||
#
|
||||
# Schema bodies are SX source; we eval them with `eval-expr` and call
|
||||
# the resulting lambda directly. 7 cases.
|
||||
|
||||
set -uo pipefail
|
||||
cd "$(git rev-parse --show-toplevel)"
|
||||
|
||||
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
|
||||
fi
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
echo "ERROR: sx_server.exe not found." >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
VERBOSE="${1:-}"
|
||||
PASS=0; FAIL=0; ERRORS=""
|
||||
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
|
||||
|
||||
SCH='(eval-expr (get (apply dict (rest (parse (file-read \"next/genesis/activity-types/subtype_of.sx\")))) :schema))'
|
||||
|
||||
cat > "$TMPFILE" <<EPOCHS
|
||||
(epoch 1)
|
||||
(load "lib/erlang/tokenizer.sx")
|
||||
(load "lib/erlang/parser.sx")
|
||||
(load "lib/erlang/parser-core.sx")
|
||||
(load "lib/erlang/parser-expr.sx")
|
||||
(load "lib/erlang/parser-module.sx")
|
||||
(load "lib/erlang/transpile.sx")
|
||||
(load "lib/erlang/runtime.sx")
|
||||
(load "lib/erlang/vm/dispatcher.sx")
|
||||
(epoch 2)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/term_codec.erl\")) :name)")
|
||||
|
||||
;; ── parse / shape ──────────────────────────────────────────
|
||||
(epoch 10)
|
||||
(eval "(first (parse (file-read \"next/genesis/activity-types/subtype_of.sx\")))")
|
||||
(epoch 11)
|
||||
(eval "(get (apply dict (rest (parse (file-read \"next/genesis/activity-types/subtype_of.sx\")))) :name)")
|
||||
|
||||
;; ── schema accept / reject ─────────────────────────────────
|
||||
;; valid: both CIDs present + strings -> true
|
||||
(epoch 20)
|
||||
(eval "(define sch ${SCH}) (sch (dict :object (dict :child-type-cid \"bafyChild\" :parent-type-cid \"bafyParent\")))")
|
||||
;; reject: missing :child-type-cid -> false
|
||||
(epoch 21)
|
||||
(eval "(define sch ${SCH}) (sch (dict :object (dict :parent-type-cid \"bafyParent\")))")
|
||||
;; reject: missing :parent-type-cid -> false
|
||||
(epoch 22)
|
||||
(eval "(define sch ${SCH}) (sch (dict :object (dict :child-type-cid \"bafyChild\")))")
|
||||
|
||||
;; ── envelope round-trip through term_codec ─────────────────
|
||||
(epoch 30)
|
||||
(eval "(get (erlang-eval-ast \"A = [{type, subtype_of}, {actor, alice}, {object, [{child_type_cid, <<99,104>>}, {parent_type_cid, <<112,97>>}]}], {ok, D, _} = term_codec:decode(term_codec:encode(A)), D =:= A\") :name)")
|
||||
EPOCHS
|
||||
|
||||
OUTPUT=$(timeout 180 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
|
||||
|
||||
check() {
|
||||
local epoch="$1" desc="$2" expected="$3"
|
||||
local actual
|
||||
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
|
||||
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
|
||||
$0 ~ "^\\(ok " e " " { print; exit }
|
||||
$0 ~ "^\\(error " e " " { print; exit }
|
||||
')
|
||||
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
|
||||
if echo "$actual" | grep -qF -- "$expected"; then
|
||||
PASS=$((PASS+1))
|
||||
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
|
||||
else
|
||||
FAIL=$((FAIL+1))
|
||||
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
|
||||
"
|
||||
fi
|
||||
}
|
||||
|
||||
check 10 "subtype_of.sx head form" "DefineActivity"
|
||||
check 11 "subtype_of.sx name" "SubtypeOf"
|
||||
check 20 "schema accepts edge with 2 CIDs" "true"
|
||||
check 21 "schema rejects missing child CID" "false"
|
||||
check 22 "schema rejects missing parent CID" "false"
|
||||
check 30 "SubtypeOf envelope round-trips" "true"
|
||||
|
||||
TOTAL=$((PASS+FAIL))
|
||||
if [ $FAIL -eq 0 ]; then
|
||||
echo "ok $PASS/$TOTAL next/tests/subtype_of.sh passed"
|
||||
else
|
||||
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
|
||||
echo "$ERRORS"
|
||||
fi
|
||||
[ $FAIL -eq 0 ]
|
||||
143
next/tests/trigger_registry.sh
Executable file
143
next/tests/trigger_registry.sh
Executable file
@@ -0,0 +1,143 @@
|
||||
#!/usr/bin/env bash
|
||||
# next/tests/trigger_registry.sh — fed-sx triggers Phase 1 (registry).
|
||||
#
|
||||
# trigger_registry binds activity-types to durable flows. The kernel's
|
||||
# post-append fan-out (Phase 2) looks an arriving activity's type up
|
||||
# here and starts each registered flow. Mirrors peer_actors / peer_types:
|
||||
# a pure core + a gen_server, hydrated from a fold over DefineTrigger
|
||||
# activities.
|
||||
|
||||
set -uo pipefail
|
||||
cd "$(git rev-parse --show-toplevel)"
|
||||
|
||||
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
|
||||
fi
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
echo "ERROR: sx_server.exe not found." >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
VERBOSE="${1:-}"
|
||||
PASS=0; FAIL=0; ERRORS=""
|
||||
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
|
||||
|
||||
# Spec1/Spec2 bind activity-type `create`. TrigAct/TrigAct2 are
|
||||
# DefineTrigger activities the fold hydrates from.
|
||||
SETUP='S1 = trigger_registry:mk_spec(<<99,49>>, flow_a, undefined, any), S2 = trigger_registry:mk_spec(<<99,50>>, flow_b, undefined, any), TrigAct = [{type, define_trigger}, {actor, alice}, {id, <<99,49>>}, {object, [{activity_type, create}, {flow_name, flow_a}]}], TrigAct2 = [{type, define_trigger}, {actor, alice}, {id, <<99,50>>}, {object, [{activity_type, follow}, {flow_name, flow_c}]}], Note = [{type, note}, {actor, alice}, {object, [{content, hi}]}],'
|
||||
|
||||
cat > "$TMPFILE" <<EPOCHS
|
||||
(epoch 1)
|
||||
(load "lib/erlang/tokenizer.sx")
|
||||
(load "lib/erlang/parser.sx")
|
||||
(load "lib/erlang/parser-core.sx")
|
||||
(load "lib/erlang/parser-expr.sx")
|
||||
(load "lib/erlang/parser-module.sx")
|
||||
(load "lib/erlang/transpile.sx")
|
||||
(load "lib/erlang/runtime.sx")
|
||||
(load "lib/erlang/vm/dispatcher.sx")
|
||||
(epoch 2)
|
||||
(eval "(er-load-gen-server!)")
|
||||
(epoch 3)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
|
||||
(epoch 4)
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/trigger_registry.erl\")) :name)")
|
||||
|
||||
;; ── pure core ──────────────────────────────────────────────
|
||||
(epoch 10)
|
||||
(eval "(get (erlang-eval-ast \"trigger_registry:new() =:= []\") :name)")
|
||||
;; add + lookup round-trip
|
||||
(epoch 11)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} St = trigger_registry:add(create, S1, trigger_registry:new()), trigger_registry:lookup(create, St) =:= [S1]\") :name)")
|
||||
;; lookup with no match -> []
|
||||
(epoch 12)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} trigger_registry:lookup(create, trigger_registry:new()) =:= []\") :name)")
|
||||
;; multi-bind: two specs on the same activity-type, both returned in order
|
||||
(epoch 13)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} St = trigger_registry:add(create, S2, trigger_registry:add(create, S1, trigger_registry:new())), trigger_registry:lookup(create, St) =:= [S1, S2]\") :name)")
|
||||
;; remove by trigger cid
|
||||
(epoch 14)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} St = trigger_registry:add(create, S2, trigger_registry:add(create, S1, trigger_registry:new())), trigger_registry:lookup(create, trigger_registry:remove(<<99,49>>, St)) =:= [S2]\") :name)")
|
||||
;; remove last spec for a type prunes the type
|
||||
(epoch 15)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} St = trigger_registry:add(create, S1, trigger_registry:new()), trigger_registry:remove(<<99,49>>, St) =:= []\") :name)")
|
||||
;; spec accessors
|
||||
(epoch 16)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} {trigger_registry:spec_cid(S1), trigger_registry:spec_flow_name(S1), trigger_registry:spec_guard(S1), trigger_registry:spec_actor_scope(S1)} =:= {<<99,49>>, flow_a, undefined, any}\") :name)")
|
||||
|
||||
;; ── hydration fold ─────────────────────────────────────────
|
||||
;; a DefineTrigger activity registers its binding
|
||||
(epoch 20)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} St = trigger_registry:fold(TrigAct, trigger_registry:new()), trigger_registry:lookup(create, St) =:= [trigger_registry:mk_spec(<<99,49>>, flow_a, undefined, any)]\") :name)")
|
||||
;; a non-trigger activity passes through untouched
|
||||
(epoch 21)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} trigger_registry:fold(Note, trigger_registry:new()) =:= []\") :name)")
|
||||
;; folding several Trigger activities rebuilds the whole registry
|
||||
(epoch 22)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} St = trigger_registry:fold(TrigAct2, trigger_registry:fold(TrigAct, trigger_registry:new())), {trigger_registry:lookup(create, St), trigger_registry:lookup(follow, St)} =:= {[trigger_registry:mk_spec(<<99,49>>, flow_a, undefined, any)], [trigger_registry:mk_spec(<<99,50>>, flow_c, undefined, any)]}\") :name)")
|
||||
;; fold_fn/0 is a 2-arity fun
|
||||
(epoch 23)
|
||||
(eval "(get (erlang-eval-ast \"is_function(trigger_registry:fold_fn(), 2)\") :name)")
|
||||
|
||||
;; ── gen_server ─────────────────────────────────────────────
|
||||
(epoch 30)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} trigger_registry:start_link(), trigger_registry:add(create, S1), trigger_registry:lookup(create) =:= [S1]\") :name)")
|
||||
(epoch 31)
|
||||
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:lookup(create) =:= []\") :name)")
|
||||
(epoch 32)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} trigger_registry:start_link(), trigger_registry:add(create, S1), trigger_registry:add(create, S2), trigger_registry:remove(<<99,49>>), trigger_registry:lookup(create) =:= [S2]\") :name)")
|
||||
(epoch 33)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} trigger_registry:start_link(), trigger_registry:add(create, S1), trigger_registry:add(follow, S2), trigger_registry:all_triggers() =:= [{create, [S1]}, {follow, [S2]}]\") :name)")
|
||||
;; start_link/1 pre-populates from a hydrated state
|
||||
(epoch 34)
|
||||
(eval "(get (erlang-eval-ast \"${SETUP} St = trigger_registry:fold(TrigAct, trigger_registry:new()), trigger_registry:start_link(St), trigger_registry:lookup(create) =:= [trigger_registry:mk_spec(<<99,49>>, flow_a, undefined, any)]\") :name)")
|
||||
EPOCHS
|
||||
|
||||
OUTPUT=$(timeout 300 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
|
||||
|
||||
check() {
|
||||
local epoch="$1" desc="$2" expected="$3"
|
||||
local actual
|
||||
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
|
||||
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
|
||||
$0 ~ "^\\(ok " e " " { print; exit }
|
||||
$0 ~ "^\\(error " e " " { print; exit }
|
||||
')
|
||||
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
|
||||
if echo "$actual" | grep -qF -- "$expected"; then
|
||||
PASS=$((PASS+1))
|
||||
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
|
||||
else
|
||||
FAIL=$((FAIL+1))
|
||||
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
|
||||
"
|
||||
fi
|
||||
}
|
||||
|
||||
check 4 "trigger_registry module loaded" "trigger_registry"
|
||||
check 10 "new/0 -> []" "true"
|
||||
check 11 "add + lookup round-trip" "true"
|
||||
check 12 "lookup no match -> []" "true"
|
||||
check 13 "multi-bind same type, ordered" "true"
|
||||
check 14 "remove by trigger cid" "true"
|
||||
check 15 "remove last prunes the type" "true"
|
||||
check 16 "spec accessors" "true"
|
||||
check 20 "fold registers a binding" "true"
|
||||
check 21 "fold non-trigger passes through" "true"
|
||||
check 22 "fold hydration rebuilds registry" "true"
|
||||
check 23 "fold_fn/0 is fun/2" "true"
|
||||
check 30 "gen_server add + lookup" "true"
|
||||
check 31 "gen_server lookup no match -> []" "true"
|
||||
check 32 "gen_server remove" "true"
|
||||
check 33 "gen_server all_triggers" "true"
|
||||
check 34 "start_link/1 pre-populates" "true"
|
||||
|
||||
TOTAL=$((PASS+FAIL))
|
||||
if [ $FAIL -eq 0 ]; then
|
||||
echo "ok $PASS/$TOTAL next/tests/trigger_registry.sh passed"
|
||||
else
|
||||
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
|
||||
echo "$ERRORS"
|
||||
fi
|
||||
[ $FAIL -eq 0 ]
|
||||
134
next/tests/triggers_e2e.sh
Executable file
134
next/tests/triggers_e2e.sh
Executable file
@@ -0,0 +1,134 @@
|
||||
#!/usr/bin/env bash
|
||||
# next/tests/triggers_e2e.sh — fed-sx triggers Phase 4 (end-to-end).
|
||||
#
|
||||
# The motivating blog-publish-digest flow, driven the whole way: a
|
||||
# trigger binds Article-creates to the flow; the post-append fan-out
|
||||
# starts it; the flow branches on :category, (for newsletters) suspends
|
||||
# on a morning timer, fetches followers (injected), and emits a
|
||||
# DigestSent activity object. Effect-as-data: the flow returns the
|
||||
# emails + DigestSent object (a driver would dispatch/append them) since
|
||||
# a flow can't call kernel gen_servers from inside the drive.
|
||||
#
|
||||
# Each epoch starts fresh gen_servers so instance ids are deterministic.
|
||||
|
||||
set -uo pipefail
|
||||
cd "$(git rev-parse --show-toplevel)"
|
||||
|
||||
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
|
||||
fi
|
||||
if [ ! -x "$SX_SERVER" ]; then
|
||||
echo "ERROR: sx_server.exe not found." >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
VERBOSE="${1:-}"
|
||||
PASS=0; FAIL=0; ERRORS=""
|
||||
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
|
||||
|
||||
# Bring-up shared by every case: registry + store, a 3-follower mock,
|
||||
# the flow registered as blog_digest, and a trigger binding `create`
|
||||
# to it guarded on "the object is an Article". Cfg/AS as the fan-out
|
||||
# expects. Activities differ by :category (urgent / newsletter / draft)
|
||||
# plus a non-Article note.
|
||||
BOOT='trigger_registry:start_link(), flow_store:start_link(), FF = fun(_) -> [f1, f2, f3] end, Flow = blog_publish_digest:build([{fetch_followers, FF}]), flow_store:register_flow(blog_digest, Flow), Guard = fun(A, _) -> case envelope:get_field(object, A) of {ok, O} -> envelope:get_field(type, O) =:= {ok, article}; _ -> false end end, trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, blog_digest, Guard, any)), Cfg = [{trigger_registry, trigger_registry}], AS = [{actor_id, alice}],'
|
||||
URGENT='[{type, create}, {actor, alice}, {id, <<117,49>>}, {object, [{type, article}, {category, urgent}]}]'
|
||||
NEWS='[{type, create}, {actor, alice}, {id, <<110,49>>}, {object, [{type, article}, {category, newsletter}]}]'
|
||||
DRAFT='[{type, create}, {actor, alice}, {id, <<100,49>>}, {object, [{type, article}, {category, draft}]}]'
|
||||
NOTE='[{type, create}, {actor, alice}, {id, <<120,49>>}, {object, [{type, note}]}]'
|
||||
|
||||
cat > "$TMPFILE" <<EPOCHS
|
||||
(epoch 1)
|
||||
(load "lib/erlang/tokenizer.sx")
|
||||
(load "lib/erlang/parser.sx")
|
||||
(load "lib/erlang/parser-core.sx")
|
||||
(load "lib/erlang/parser-expr.sx")
|
||||
(load "lib/erlang/parser-module.sx")
|
||||
(load "lib/erlang/transpile.sx")
|
||||
(load "lib/erlang/runtime.sx")
|
||||
(load "lib/erlang/vm/dispatcher.sx")
|
||||
(epoch 2)
|
||||
(eval "(er-load-gen-server!)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/flow/flow.erl\")) :name)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/flow/flow_spec.erl\")) :name)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/flow/flow_store.erl\")) :name)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/trigger_registry.erl\")) :name)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/flow_dispatch.erl\")) :name)")
|
||||
(eval "(get (erlang-load-module (file-read \"next/kernel/pipeline.erl\")) :name)")
|
||||
(epoch 3)
|
||||
(eval "(get (erlang-load-module (file-read \"next/flow/flows/blog_publish_digest.erl\")) :name)")
|
||||
|
||||
;; ── urgent: fans out, completes in one cycle, 3 emails ─────
|
||||
(epoch 10)
|
||||
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${URGENT}, AS, Cfg) =:= {ok, [{<<117,49>>, <<116,99>>, {ok, 1}}]}\") :name)")
|
||||
(epoch 11)
|
||||
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${URGENT}, AS, Cfg), {ok, {done, {digest_sent, Emails, _}}} = flow_store:status(1), length(Emails) =:= 3\") :name)")
|
||||
;; DigestSent emit object is well-formed (type, for the article, count)
|
||||
(epoch 12)
|
||||
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${URGENT}, AS, Cfg), {ok, {done, {digest_sent, _, Digest}}} = flow_store:status(1), Digest =:= [{type, digest_sent}, {for, <<117,49>>}, {follower_count, 3}]\") :name)")
|
||||
|
||||
;; ── newsletter: suspends on the morning timer, then resumes ─
|
||||
(epoch 20)
|
||||
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${NEWS}, AS, Cfg), flow_store:status(1) =:= {ok, {suspended, morning}}\") :name)")
|
||||
;; advancing the clock (resume the timer) drives it to completion
|
||||
(epoch 21)
|
||||
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${NEWS}, AS, Cfg), {ok, {flow_done, {digest_sent, Emails, _}}} = flow_store:resume(1, morning_ts), length(Emails) =:= 3\") :name)")
|
||||
;; before resume no digest exists (still suspended, not done)
|
||||
(epoch 22)
|
||||
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${NEWS}, AS, Cfg), case flow_store:status(1) of {ok, {done, _}} -> false; {ok, {suspended, morning}} -> true; _ -> false end\") :name)")
|
||||
|
||||
;; ── draft: the :else branch, no emails, no DigestSent ──────
|
||||
(epoch 30)
|
||||
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${DRAFT}, AS, Cfg), flow_store:status(1) =:= {ok, {done, skipped}}\") :name)")
|
||||
|
||||
;; ── non-Article note: guard rejects, no flow dispatched ────
|
||||
(epoch 40)
|
||||
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${NOTE}, AS, Cfg) =:= {ok, []}\") :name)")
|
||||
|
||||
;; ── dedup: the same activity arriving twice fires once ─────
|
||||
(epoch 50)
|
||||
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${URGENT}, [{actor_id, alice}, {triggers_fired, [{<<117,49>>, <<116,99>>}]}], Cfg) =:= {ok, []}\") :name)")
|
||||
EPOCHS
|
||||
|
||||
OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
|
||||
|
||||
check() {
|
||||
local epoch="$1" desc="$2" expected="$3"
|
||||
local actual
|
||||
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
|
||||
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
|
||||
$0 ~ "^\\(ok " e " " { print; exit }
|
||||
$0 ~ "^\\(error " e " " { print; exit }
|
||||
')
|
||||
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
|
||||
if echo "$actual" | grep -qF -- "$expected"; then
|
||||
PASS=$((PASS+1))
|
||||
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
|
||||
else
|
||||
FAIL=$((FAIL+1))
|
||||
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
|
||||
"
|
||||
fi
|
||||
}
|
||||
|
||||
check 3 "blog_publish_digest loaded" "blog_publish_digest"
|
||||
check 10 "urgent fans out (audit triple)" "true"
|
||||
check 11 "urgent: 3 emails dispatched" "true"
|
||||
check 12 "urgent: DigestSent object emitted" "true"
|
||||
check 20 "newsletter suspends on timer" "true"
|
||||
check 21 "newsletter resumes -> 3 emails" "true"
|
||||
check 22 "no digest before resume" "true"
|
||||
check 30 "draft -> else branch, skipped" "true"
|
||||
check 40 "non-Article note -> guard rejects" "true"
|
||||
check 50 "duplicate activity fires once" "true"
|
||||
|
||||
TOTAL=$((PASS+FAIL))
|
||||
if [ $FAIL -eq 0 ]; then
|
||||
echo "ok $PASS/$TOTAL next/tests/triggers_e2e.sh passed"
|
||||
else
|
||||
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
|
||||
echo "$ERRORS"
|
||||
fi
|
||||
[ $FAIL -eq 0 ]
|
||||
211
plans/agent-briefings/host-fed-sx-adapter-loop.md
Normal file
211
plans/agent-briefings/host-fed-sx-adapter-loop.md
Normal file
@@ -0,0 +1,211 @@
|
||||
; -*- mode: markdown -*-
|
||||
# loops/host — fed-sx adapter slice (host side of host-type federation)
|
||||
|
||||
Scoped briefing for the follow-up that wires `loops/host`'s SX/dream
|
||||
front door to the fed-sx kernel substrate landed by
|
||||
`loops/fed-sx-types`. Companion to `plans/fed-sx-host-types.md` (the
|
||||
substrate design + public surface). This is the build sheet for the
|
||||
host-side adapters the substrate loop deliberately deferred.
|
||||
|
||||
```
|
||||
description: loops/host — fed-sx adapter (publish/serve/ingest typed posts)
|
||||
subagent_type: general-purpose
|
||||
run_in_background: true
|
||||
isolation: worktree # worktree at /root/rose-ash-loops/host
|
||||
```
|
||||
|
||||
## Why this is small now
|
||||
|
||||
The substrate is done and tested (`origin/loops/fed-sx-types`, 4
|
||||
phases). And the host already has *most of a type system*:
|
||||
`lib/host/blog.sx` models a **type as a post** with a content-address
|
||||
`:cid`, a `:schema` (`{:required [...]}`), `:fields`, `:template`, and a
|
||||
**`subtype-of` graph over lib/relations**. So this loop is not building
|
||||
a type model — it is **projecting the host's existing one onto fed-sx**
|
||||
and ingesting peers' types back. The pieces line up almost 1:1:
|
||||
|
||||
| host (lib/host/blog.sx) | fed-sx (next/kernel) |
|
||||
|--------------------------------------------|------------------------------------------|
|
||||
| a type-post + `:schema {:required [...]}` | `DefineType` activity + refinement `{required, [...]}` |
|
||||
| `subtype-of` edge (lib/relations) | `SubtypeOf` activity |
|
||||
| `host/blog-by-cid` / `host/blog-type-defs` | `peer_types` cache + `GET /types/<cid>` |
|
||||
| `host/blog-type-issues` (local validate) | `pipeline:apply_object_schema/2` (inbound) |
|
||||
|
||||
The host's `{:required [...]}` schema maps **directly** onto the
|
||||
term_codec-safe `{required, [Field,...]}` refinement form the substrate
|
||||
already validates — so schema translation is nearly trivial. Derive
|
||||
both validators from the *same* schema data to avoid drift.
|
||||
|
||||
## Scope
|
||||
|
||||
**In scope** — `lib/host/**` only (the mirror of fed-sx-types'
|
||||
`next/**` only). New: `lib/host/fed_sx_outbox.sx`,
|
||||
`lib/host/fed_sx_inbox.sx` (and a small shared `lib/host/fed_sx.sx`
|
||||
bridge + a `host/types-routes`), plus `serve.sh` bring-up wiring and
|
||||
`lib/host/tests/`.
|
||||
|
||||
**Out of scope** — `next/**`. The substrate is frozen public surface;
|
||||
do **not** edit `next/kernel/**` or the genesis verbs from here. If a
|
||||
gap is found, file it against fed-sx-types, don't patch across the
|
||||
boundary. **Hard line: do not edit `next/`.**
|
||||
|
||||
## Branch base
|
||||
|
||||
Start from `loops/host`, then bring in the substrate (clean, additive —
|
||||
disjoint paths, no conflicts):
|
||||
|
||||
```bash
|
||||
cd /root/rose-ash-loops/host
|
||||
git fetch origin
|
||||
git merge origin/loops/fed-sx-types # adds next/kernel + genesis + plan doc
|
||||
```
|
||||
|
||||
After the merge the worktree has both `lib/host/**` (host) and
|
||||
`next/kernel/**` (fed-sx substrate) on one branch.
|
||||
|
||||
## Phase 0 — settle the runtime boundary (DECIDE FIRST, blocks all else)
|
||||
|
||||
`lib/host` is **pure same-runtime SX** (one `sx_server.exe`: stdlib →
|
||||
R7RS → APL → Datalog → ACL → Relations → Feed → Persist → Dream →
|
||||
Host, per `serve.sh`). The fed-sx kernel is **Erlang-on-SX** on the
|
||||
er-scheduler (`erlang-load-module` + gen_servers: `peer_types`,
|
||||
`nx_kernel`). The host's dream handlers run on the native `http-listen`
|
||||
accept loop — **outside** the er-scheduler. Calling a kernel
|
||||
`gen_server:call` synchronously from a native-thread handler hits the
|
||||
known scheduler-context deadlock (see
|
||||
`plans/fed-sx-design.md` §, and the fed-prims http-listen note: a
|
||||
handler on `Thread.create` outside er-sched can't complete a
|
||||
`gen_server:call → receive`).
|
||||
|
||||
Two architectures; **the loop's first deliverable is choosing one with
|
||||
a tiny spike**:
|
||||
|
||||
- **Option A — in-process Erlang bridge.** Host's sx_server also
|
||||
`erlang-load-module`s the kernel and calls it directly. Pro: one
|
||||
process, no serialization. Con: the deadlock above — kernel calls
|
||||
must be marshalled onto the er-scheduler or restricted to pure
|
||||
(non-gen_server) functions. Fragile; not recommended.
|
||||
|
||||
- **Option B — HTTP boundary (RECOMMENDED).** Run the fed-sx kernel
|
||||
with its own `http_server`/`http-listen` loop (it already has the
|
||||
whole route surface, and the m2 two-instance smoke test proves
|
||||
HTTP federation between fed-sx nodes). The host talks to its local
|
||||
fed-sx node **over localhost HTTP using the wire it already speaks**
|
||||
(term_codec / activity+json / type-doc). This is literally what
|
||||
federation is — the host is just another peer to its own node. An
|
||||
`httpc`/localhost call from a native-thread host handler does **not**
|
||||
touch the er-scheduler, so the deadlock never arises; the kernel's
|
||||
own listen handler runs the gen_server calls within er-sched context.
|
||||
Works whether the kernel is a sidecar process or spawned on an
|
||||
er-scheduler process inside the host's sx_server (two ports, one
|
||||
process). Pro: clean, reuses the fully-tested surface, no deadlock.
|
||||
Con: serialization + lifecycle coordination.
|
||||
|
||||
**Recommendation: Option B.** Spike: host handler → `httpc` POST to the
|
||||
local kernel's `/activity` → 200/cid back, with no hang. Lock the
|
||||
decision before Phase 1.
|
||||
|
||||
## Phase 1 — outbox: project host types → DefineType / SubtypeOf
|
||||
|
||||
`lib/host/fed_sx_outbox.sx`. When a host type-post is created/updated
|
||||
(`host/blog-put!` path), project it and publish to the local fed-sx
|
||||
node:
|
||||
|
||||
- type-post → `DefineType` activity: `:object` = `{name: slug,
|
||||
fields: (host/blog-fields-of slug), refinement-schema:
|
||||
(host/blog-schema-of slug), instance-type: <base>}`. The host
|
||||
`{:required [...]}` becomes the substrate `{required, [...]}` form
|
||||
verbatim.
|
||||
- each `subtype-of` edge (`relations/parents` over `"subtype-of"`) →
|
||||
a `SubtypeOf` activity `{child-type-cid, parent-type-cid}`.
|
||||
- publish via Phase 0's transport (POST `/activity` to the local node,
|
||||
authed with the node's publish token).
|
||||
|
||||
**Key open decision — the type CID.** The host computes `:cid` via
|
||||
`host/blog--cid-of` (double-hash over the canonical record); fed-sx
|
||||
keys `peer_types` by a `TypeCid`. Either:
|
||||
(a) **adopt the host `:cid` as the fed-sx TypeCid** — one identity,
|
||||
no reconciliation, but peers can't content-verify it from the
|
||||
wire bytes; or
|
||||
(b) **let the kernel content-address the TypeRecord** — verifiable,
|
||||
but the host must keep a `slug → fed-sx-cid` map (and
|
||||
`SubtypeOf` edges must reference fed-sx CIDs, not host CIDs).
|
||||
Pick one and document it in `plans/fed-sx-host-types.md`. (a) is
|
||||
simpler and probably right for v1; revisit when cross-node verification
|
||||
matters.
|
||||
|
||||
## Phase 2 — inbox: ingest peers' types + validate typed objects
|
||||
|
||||
`lib/host/fed_sx_inbox.sx`. Inbound from the local node's inbox:
|
||||
|
||||
- inbound `DefineType` → `peer_types:put` (cache it). Decide whether to
|
||||
also **materialize** it as a host post (`host/blog-put!` +
|
||||
`host/blog--set-schema!`) or keep federation-only types out of the
|
||||
local blog (recommended for v1: cache-only, materialize on demand).
|
||||
- inbound `SubtypeOf` → record the edge (peer_types hierarchy and/or
|
||||
`host/blog-relate! child parent "subtype-of"` if materialized).
|
||||
- inbound typed `Create` (a post that `is-a` some refinement type) →
|
||||
the kernel inbound pipeline runs `pipeline:apply_object_schema/2`
|
||||
(configured with a `type_index` + `{peer_types, peer_types}` +
|
||||
`type_fetch_fn`), so a typed object is validated against its declared
|
||||
type **before** the host sees it. Choose `strict_object_schema`
|
||||
per-node (default false = open-world).
|
||||
|
||||
**Avoid double-validation drift:** the host already has
|
||||
`host/blog-type-issues`. Let the **kernel validate federation inbound**
|
||||
and the **host validate local writes**, both deriving from the same
|
||||
`{:required [...]}` schema data — don't fork the rules.
|
||||
|
||||
## Phase 3 — serve + bring-up wiring
|
||||
|
||||
- **Serve `GET /types/<cid>`** on the host front door. Either proxy to
|
||||
the kernel's `/types/<cid>` (Option B keeps one source of truth), or
|
||||
serve directly from `host/blog-by-cid` + the projected TypeRecord.
|
||||
Hook as `(dream-get "/types/:cid" host/types-by-cid)` and add
|
||||
`host/types-routes` to the `host/serve` list (per `router.sx` /
|
||||
`serve.sh` pattern).
|
||||
- **Bring-up** in `serve.sh`: start the fed-sx node (Phase 0 transport),
|
||||
start `peer_types`, configure `type_fetch_fn =
|
||||
discovery_type_fetch:make_fetch_fn()` + a `type_url` resolver, and on
|
||||
startup project existing host types (Phase 1) so the node is
|
||||
type-aware from boot. Gate writes behind the existing
|
||||
`host/require-auth` / `host/require-permission` middleware, same as
|
||||
the relations write routes.
|
||||
|
||||
## Phase 4 — end-to-end round-trip test
|
||||
|
||||
Two nodes (host A + host B, or host + a sidecar fed-sx node): A defines
|
||||
a refinement type → B fetches the type-doc via `GET /types/<cid>` → B
|
||||
ingests an inbound typed object and `apply_object_schema` accepts the
|
||||
valid one / rejects a refinement-failing one. Mirror the m2
|
||||
two-instance smoke test style. Plus per-phase suites in
|
||||
`lib/host/tests/` (the host runs its own `conformance.sh`).
|
||||
|
||||
## Tests discipline
|
||||
|
||||
- The host's `lib/host/conformance.sh` green before AND after every
|
||||
commit. `lib/host` is **LIVE at blog.rose-ash.com** — pushing
|
||||
`loops/host` reloads dev, so treat pushes as deliberate.
|
||||
- Commits scoped to `lib/host/**` (+ `plans/fed-sx-host-types.md` as
|
||||
decisions ratify). Do **not** edit `next/**`.
|
||||
- One commit per phase; smaller intermediate commits fine if each
|
||||
leaves the gate green. The Phase-0 spike can be its own commit.
|
||||
|
||||
## Done when
|
||||
|
||||
- A host type round-trips: defined locally → published as
|
||||
`DefineType`/`SubtypeOf` → fetchable at `GET /types/<cid>` → a peer
|
||||
validates an inbound typed object against it.
|
||||
- `peer_types` is populated from inbound `DefineType`, and the inbound
|
||||
pipeline rejects refinement-failing typed objects (strict node).
|
||||
- The runtime-boundary decision and the type-CID decision are recorded
|
||||
in `plans/fed-sx-host-types.md`.
|
||||
- Host `conformance.sh` + the new fed-sx adapter suites green.
|
||||
|
||||
## Parallel-safety with loops/fed-sx-types
|
||||
|
||||
That loop owns `next/**` and is feature-complete; this loop owns
|
||||
`lib/host/**`. Disjoint surfaces — they meet only at the merge that
|
||||
brings the substrate in (Branch base, above). If this loop needs a
|
||||
substrate change, file it against fed-sx-types rather than editing
|
||||
`next/` here.
|
||||
@@ -1296,6 +1296,32 @@ inbox + pull from outbox. SSE is convenience, not protocol.
|
||||
unknown verbs are stored-but-not-projected — safe by default, with explicit
|
||||
operator control over what extensions load.
|
||||
|
||||
### 13.10 Activity-driven flow triggers (kernel convention)
|
||||
|
||||
Beyond projections (which fold an activity into read-model state), the kernel
|
||||
supports firing **durable business flows** off arriving activities — the
|
||||
"something happened → here is what we DO about it" half of the model. The
|
||||
convention (substrate landed in `loops/fed-sx-types`, Phases 5–8):
|
||||
|
||||
- A `DefineTrigger{activity-type, flow-name, guard?, actor-scope?}` activity binds
|
||||
an activity-type to a named flow. `trigger_registry` hydrates from a fold over
|
||||
these (restart-safe, same content-addressing as `define-registry`).
|
||||
- Fan-out runs **after** the kernel append, as the last pipeline step (§14):
|
||||
`envelope → signature → activity-type schema → object schema → append → trigger
|
||||
fan-out`. Only accepted activities fire flows; rejected ones never trigger.
|
||||
- Fan-out is deduped per `{activity-cid, trigger-cid}` (federation can deliver the
|
||||
same activity twice via different peers) using the actor's `:triggers_fired`
|
||||
field, and is failure-isolated: one flow's failure never blocks the append or
|
||||
the other flows.
|
||||
- Flows run on **flow-on-erlang** (`next/flow/`), a native Erlang-on-SX durable
|
||||
workflow engine (deterministic-replay suspend/resume; combinator algebra
|
||||
mirrored from the Scheme `lib/flow`). It runs in the kernel's own runtime, so
|
||||
the fan-out is a direct call — no cross-guest bridge. Because a flow runs inside
|
||||
the engine's drive (where a blocking kernel call would deadlock the cooperative
|
||||
scheduler), flows are **pure and describe effects as data** (their output, or a
|
||||
`suspend`); a driver outside the flow performs IO and appends any follow-up
|
||||
activity — which can in turn trigger further flows.
|
||||
|
||||
## 14. Validation pipeline
|
||||
|
||||
Every activity entering the substrate (whether published locally or received from a
|
||||
|
||||
112
plans/fed-sx-host-types.md
Normal file
112
plans/fed-sx-host-types.md
Normal file
@@ -0,0 +1,112 @@
|
||||
; -*- mode: markdown -*-
|
||||
# fed-sx host-type federation — substrate design + build log
|
||||
|
||||
How a host's typed-post graph (refinement types declared in
|
||||
`lib/host`'s metamodel) flows across fed-sx nodes: a type is published
|
||||
as a content-addressed `DefineType` activity, peers cache its record,
|
||||
serve it over the wire, and validate inbound objects against the
|
||||
declared refinement schema before appending them.
|
||||
|
||||
This document is both the design and the running build log for
|
||||
`loops/fed-sx-types`. The companion build sheet is
|
||||
`plans/agent-briefings/fed-sx-types-loop.md`.
|
||||
|
||||
## Vocabulary
|
||||
|
||||
- **Type record** — `{name, fields, refinement-schema, instance-type}`.
|
||||
The parsed `:object` payload of a `DefineType` activity. Immutable
|
||||
per CID: an updated type is a new CID (no in-place evolution).
|
||||
- **Type CID** — content-address of the type record's wire form. The
|
||||
stable handle a `SubtypeOf` edge or an object's `{type, _}` field
|
||||
references.
|
||||
- **Refinement schema** — a predicate over an object's field-values;
|
||||
the extra constraint a refinement type adds on top of its base
|
||||
`instance-type` (e.g. a `Post` is a `Note` whose `:title` is a
|
||||
non-empty string).
|
||||
|
||||
## Scope
|
||||
|
||||
Substrate side only — everything under `next/**`. The host-side
|
||||
adapters (`lib/host/fed_sx_outbox.sx`, `lib/host/fed_sx_inbox.sx`)
|
||||
are a deliberate follow-up that consumes this branch's public surface
|
||||
(`DefineType` / `SubtypeOf` verbs, `peer_types`, the `/types/<cid>`
|
||||
route) once `loops/host`'s metamodel settles. **This loop does not
|
||||
touch `lib/host/`.**
|
||||
|
||||
## Steps
|
||||
|
||||
### Step 1 — `DefineType` + `SubtypeOf` genesis activity-types — DONE
|
||||
|
||||
New `DefineActivity`-form genesis files, parsed as data by
|
||||
`bootstrap.erl` at startup (no kernel change yet):
|
||||
|
||||
- `next/genesis/activity-types/define_type.sx` — declares the
|
||||
`DefineType` verb. `:schema` accepts an activity whose `:object`
|
||||
carries a string `:name` and an optional list `:fields`.
|
||||
- `next/genesis/activity-types/subtype_of.sx` — declares the
|
||||
`SubtypeOf` verb. `:schema` accepts an `:object` carrying both
|
||||
`:child-type-cid` and `:parent-type-cid` as strings.
|
||||
|
||||
Schema bodies are SX source written with nested `get` (not
|
||||
keyword-threading) so they are directly evaluatable: keywords are not
|
||||
callable getters in the kernel and `(-> d :k)` does not get. Both are
|
||||
registered in `next/genesis/manifest.sx` (activity-types now 7) and the
|
||||
bundle counts in the bootstrap suites were bumped accordingly.
|
||||
|
||||
Tests: `next/tests/define_type.sh`, `next/tests/subtype_of.sh` — parse
|
||||
shape, schema accept/reject, and a `term_codec` envelope round-trip.
|
||||
|
||||
### Step 2 — `peer_types.erl` receiver-side cache — DONE
|
||||
|
||||
`next/kernel/peer_types.erl`, a mirror of `peer_actors.erl` keyed by
|
||||
type CID. State `[{TypeCidBytes, TypeRecord}, ...]`. Pure API
|
||||
(`new/2`-threaded `lookup`/`store`/`evict`/`types`/`lookup_or_fetch`)
|
||||
plus a registered gen_server (`put`, `lookup`, `state_for`,
|
||||
`known_types`, `lookup_or_fetch`). On a miss `lookup_or_fetch` pulls a
|
||||
Cfg-supplied `type_fetch_fn :: fun ((TypeCid, Cfg) -> {ok, Bytes} |
|
||||
{error, _})`, decodes the wire bytes via `term_codec`, and caches the
|
||||
record. No fn → `{error, no_fetch_fn}`; fetch error or bad bytes do not
|
||||
poison the cache. Test: `next/tests/peer_types.sh`.
|
||||
|
||||
### Step 3 — `/types/<cid>` route + `discovery_type_fetch.erl` — DONE
|
||||
|
||||
`http_server.erl` serves `GET /types/<cid>` with
|
||||
`Accept: application/vnd.fed-sx.type-doc`: the cached TypeRecord
|
||||
`term_codec`-encoded, 404 if not cached. `discovery_type_fetch.erl`
|
||||
holds the live-HTTP closure that `peer_types:lookup_or_fetch` calls.
|
||||
Tests: `next/tests/peer_types_route.sh`, `next/tests/discovery_type_fetch.sh`.
|
||||
|
||||
### Step 4 — object-schema validation stage in `pipeline.erl` — DONE
|
||||
|
||||
`pipeline:apply_object_schema/2` (+ `stage_object_schema/1` factory)
|
||||
sits between activity-type validation and the kernel append. When an
|
||||
inbound object carries `{type, TypeName}`, resolve the TypeRecord
|
||||
(Cfg `type_index`: TypeName → TypeCid; then
|
||||
`peer_types:lookup_or_fetch/2`) and apply its refinement schema to the
|
||||
object's `:field_values`. The schema is either a 1-arity Erlang
|
||||
predicate (the substrate stand-in, for locally-defined types) or a
|
||||
term_codec-safe `{required, [Field, ...]}` data constraint (so a
|
||||
wire-fetched record validates too). Default `strict_object_schema =
|
||||
false`: an unresolvable type is let through (the non-strict skip is
|
||||
where a `validation_skipped` log belongs); opt-in strict rejects.
|
||||
Objects with no declared type, and type names absent from the local
|
||||
index, are skipped (open-world). Test: `next/tests/object_schema.sh`.
|
||||
|
||||
## Out of scope (deliberately)
|
||||
|
||||
- Host-side outbox/inbox adapters (`lib/host/**`).
|
||||
- Type evolution / version migration — schemas are immutable per CID;
|
||||
the "name → currently-valid CID" routing layer is a separate problem.
|
||||
- Subtype-of unification / rendering across nodes — the graph data
|
||||
lands via `SubtypeOf` activities; dedup/display is a consumer concern.
|
||||
|
||||
## What the host-side adapter loop gets
|
||||
|
||||
Once all four steps land, the follow-up `loops/host` adapter work can
|
||||
treat the following as stable public surface:
|
||||
|
||||
- `DefineType` / `SubtypeOf` activity verbs (publish a type, link two).
|
||||
- `peer_types` gen_server (cache a peer's type, look it up).
|
||||
- `GET /types/<cid>` (serve a type the node knows).
|
||||
- `pipeline`'s object-schema stage (inbound objects validated against
|
||||
their declared refinement type when resolvable).
|
||||
Reference in New Issue
Block a user