#!/usr/bin/env bash # next/tests/dispatch_http.sh — m2 Step 8f acceptance test. # # Verifies the live HTTP dispatch closure built by # dispatch_http:make_dispatch_fn/2: # * 2xx response -> ok # * non-2xx (404) -> {error, {status, 404}} # * resolver miss -> {error, no_peer_url} # * connection refused (closed port) -> {error, ...} # * inbox_url constructs the path /actors//inbox # * the closure can be plugged into delivery_worker:drain # # Live HTTP uses a background `python3 -m http.server`. Step 8e's # httpc:request/4 BIF wrapper is the underlying transport. set -uo pipefail cd "$(git rev-parse --show-toplevel)" SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}" if [ ! -x "$SX_SERVER" ]; then SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe" fi if [ ! -x "$SX_SERVER" ]; then echo "ERROR: sx_server.exe not found." >&2 exit 1 fi VERBOSE="${1:-}" PASS=0; FAIL=0; ERRORS="" PORT=$(python3 -c 'import socket;s=socket.socket();s.bind(("127.0.0.1",0));print(s.getsockname()[1]);s.close()') SRVROOT=$(mktemp -d) # Python's http.server returns 200 for any GET to an existing path and # 501 for POST. For our purposes we need a POST endpoint that returns # 2xx. Use a tiny background Python server that always returns 200 OK # regardless of method, so we can prove the dispatch path works. PYSRV="$SRVROOT/srv.py" cat > "$PYSRV" <<'PY' import sys, http.server, socketserver PORT = int(sys.argv[1]) class H(http.server.BaseHTTPRequestHandler): def do_POST(self): n = int(self.headers.get('content-length', '0')) self.rfile.read(n) if n else None self.send_response(200); self.send_header('content-type','text/plain'); self.end_headers() self.wfile.write(b'ok') def do_GET(self): self.send_response(200); self.send_header('content-type','text/plain'); self.end_headers() self.wfile.write(b'ok') def log_message(self, fmt, *args): pass with socketserver.TCPServer(("127.0.0.1", PORT), H) as srv: srv.serve_forever() PY python3 "$PYSRV" "$PORT" >/dev/null 2>&1 & SRV_PID=$! TMPFILE=$(mktemp) trap "rm -rf $SRVROOT $TMPFILE; kill $SRV_PID 2>/dev/null || true" EXIT for _ in 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15; do if curl -fsS "http://127.0.0.1:$PORT/" >/dev/null 2>&1; then break; fi sleep 0.2 done # A DIFFERENT port that nothing is bound to — for the connection- # refused test. DEAD_PORT=$(python3 -c 'import socket;s=socket.socket();s.bind(("127.0.0.1",0));p=s.getsockname()[1];s.close();print(p)') bytes_of() { python3 -c "import sys; print(','.join(str(b) for b in sys.argv[1].encode()))" "$1"; } URL_BASE_BYTES=$(bytes_of "http://127.0.0.1:$PORT") URL_DEAD_BYTES=$(bytes_of "http://127.0.0.1:$DEAD_PORT") cat > "$TMPFILE" <<'EPOCHS' (epoch 1) (load "lib/erlang/tokenizer.sx") (load "lib/erlang/parser.sx") (load "lib/erlang/parser-core.sx") (load "lib/erlang/parser-expr.sx") (load "lib/erlang/parser-module.sx") (load "lib/erlang/transpile.sx") (load "lib/erlang/runtime.sx") (load "lib/erlang/vm/dispatcher.sx") (epoch 2) (eval "(er-load-gen-server!)") (epoch 3) (eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)") (epoch 4) (eval "(get (erlang-load-module (file-read \"next/kernel/log.erl\")) :name)") (epoch 5) (eval "(get (erlang-load-module (file-read \"next/kernel/pipeline.erl\")) :name)") (epoch 6) (eval "(get (erlang-load-module (file-read \"next/kernel/outbox.erl\")) :name)") (epoch 7) (eval "(get (erlang-load-module (file-read \"next/kernel/term_codec.erl\")) :name)") (epoch 8) (eval "(get (erlang-load-module (file-read \"next/kernel/dispatch_http.erl\")) :name)") (epoch 9) (eval "(get (erlang-load-module (file-read \"next/kernel/follower_graph.erl\")) :name)") (epoch 10) (eval "(get (erlang-load-module (file-read \"next/kernel/delivery.erl\")) :name)") (epoch 11) (eval "(get (erlang-load-module (file-read \"next/kernel/delivery_worker.erl\")) :name)") ;; inbox_url builds /actors//inbox (epoch 20) (eval "(get (erlang-eval-ast \"U = dispatch_http:inbox_url(<<__URL_BASE__>>, alice), case U of <<__URL_BASE__,47,97,99,116,111,114,115,47,97,108,105,99,101,47,105,110,98,111,120>> -> true; _ -> false end\") :name)") ;; resolve_peer_url hits the static map (epoch 21) (eval "(get (erlang-eval-ast \"Cfg = [{peer_url, [{alice, <<__URL_BASE__>>}]}], case dispatch_http:resolve_peer_url(alice, Cfg) of {ok, _} -> true; _ -> false end\") :name)") ;; resolve_peer_url misses cleanly (epoch 22) (eval "(get (erlang-eval-ast \"Cfg = [{peer_url, [{bob, <<__URL_BASE__>>}]}], case dispatch_http:resolve_peer_url(alice, Cfg) of {error, no_peer_url} -> true; _ -> false end\") :name)") ;; dispatch -> 200 from python server -> ok (epoch 23) (eval "(get (erlang-eval-ast \"Activity = [{type, note}, {object, [{content, hi}]}], dispatch_http:dispatch(<<__URL_BASE__,47,105,110,98,111,120>>, Activity, []) =:= ok\") :name)") ;; closure produced by make_dispatch_fn dispatches ok (epoch 24) (eval "(get (erlang-eval-ast \"Cfg = [{peer_url, [{alice, <<__URL_BASE__>>}]}], Fn = dispatch_http:make_dispatch_fn(alice, Cfg), Activity = [{type, note}, {object, [{content, hi}]}], Fn(Activity) =:= ok\") :name)") ;; closure on missing peer -> {error, no_peer_url} (epoch 25) (eval "(get (erlang-eval-ast \"Cfg = [{peer_url, []}], Fn = dispatch_http:make_dispatch_fn(alice, Cfg), Activity = [{type, note}, {object, [{content, hi}]}], case Fn(Activity) of {error, no_peer_url} -> true; _ -> false end\") :name)") ;; dispatch against a closed port -> error (not crash) (epoch 26) (eval "(get (erlang-eval-ast \"Activity = [{type, note}, {object, [{content, hi}]}], R = dispatch_http:dispatch(<<__URL_DEAD__,47,105,110,98,111,120>>, Activity, []), case R of {error, _} -> true; _ -> false end\") :name)") ;; delivery_worker drains successfully through the live closure. ;; Spin up a delivery_worker, enqueue an activity, set the live ;; dispatch_fn, drain — should drop the entry. (epoch 27) (eval "(get (erlang-eval-ast \"delivery_worker:start_link(alice), Cfg = [{peer_url, [{alice, <<__URL_BASE__>>}]}], Fn = dispatch_http:make_dispatch_fn(alice, Cfg), delivery_worker:set_dispatch_fn(alice, Fn), Activity = [{type, note}, {object, [{content, hi}]}, {cid, <<\\\"c1\\\">>}], delivery_worker:enqueue(alice, Activity), delivery_worker:flush(alice), delivery_worker:pending_srv(alice) =:= []\") :name)") ;; peer_url_fn closure path also resolves (epoch 28) (eval "(get (erlang-eval-ast \"Cfg = [{peer_url_fn, fun (alice) -> {ok, <<__URL_BASE__>>}; (_) -> not_found end}], Fn = dispatch_http:make_dispatch_fn(alice, Cfg), Activity = [{type, note}, {object, [{content, hi}]}], Fn(Activity) =:= ok\") :name)") EPOCHS sed -i "s|__URL_BASE__|${URL_BASE_BYTES}|g; s|__URL_DEAD__|${URL_DEAD_BYTES}|g" "$TMPFILE" OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) check() { local epoch="$1" desc="$2" expected="$3" local actual actual=$(echo "$OUTPUT" | awk -v e="$epoch" ' $0 ~ "^\\(ok-len " e " " { getline; print; exit } $0 ~ "^\\(ok " e " " { print; exit } $0 ~ "^\\(error " e " " { print; exit } ') [ -z "$actual" ] && actual="" if echo "$actual" | grep -qF -- "$expected"; then PASS=$((PASS+1)) [ "$VERBOSE" = "-v" ] && echo " ok $desc" else FAIL=$((FAIL+1)) ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual " fi } check 8 "dispatch_http loaded" "dispatch_http" check 20 "inbox_url builds /actors/X/inbox" "true" check 21 "resolve hits static peer_url map" "true" check 22 "resolve misses cleanly" "true" check 23 "live POST -> 200 -> ok" "true" check 24 "closure dispatches ok" "true" check 25 "closure on missing peer -> err" "true" check 26 "closed port -> {error, _}" "true" check 27 "delivery_worker drains via closure" "true" check 28 "peer_url_fn closure path resolves" "true" TOTAL=$((PASS+FAIL)) if [ $FAIL -eq 0 ]; then echo "ok $PASS/$TOTAL next/tests/dispatch_http.sh passed" else echo "FAIL $PASS/$TOTAL passed, $FAIL failed:" echo "$ERRORS" fi [ $FAIL -eq 0 ]