From d36fe4ee97d4c7a936dc31adeabbe11ec4024c3b Mon Sep 17 00:00:00 2001 From: giles Date: Sat, 6 Jun 2026 19:19:02 +0000 Subject: [PATCH] =?UTF-8?q?fed-sx-m2:=20Step=205d=20=E2=80=94=20inbox=20ha?= =?UTF-8?q?ndler=20wires=20the=20ingestion=20chain?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit POST /actors//inbox is now special-cased in route/2 (next to POST /activity) so the body + Cfg reach the new handle_inbox_post/3 handler. Wire format: body = term_codec:encode(SignedActivity); the receiver decodes into the activity proplist and runs the chain. handle_inbox_post/3 orchestration: 1. kernel_has_actor(field(kernel, Cfg), TargetId) -> 404 if missing 2. decode_activity(Body) -> 422 on bad shape 3. envelope:get_field(actor, Activity) -> 422 if no peer id 4. resolve_peer_as(PeerId, Cfg) -> 401 if unknown 5. nx_kernel:inbox_state_for(TargetAtom) -> 404 belt-and-braces 6. pipeline:validate_inbound(Activity, PeerAS, InboxLog) ok -> nx_kernel:append_inbox + 202 {error, bad_signature} -> 401 {error, no_signature} -> 401 {error, _} -> 422 resolve_peer_as/2 supports three Cfg paths in priority order: {peer_as, [{PeerId, AS}, ...]} pure-fn pre-populated map {peer_actors, AtomName} peer_actors gen_server cache {peer_fetch_fn, fun/1} fallback on srv cache miss Empty Cfg returns {error, no_peer_resolver} -> 401. v1 actor_post/1 4a stub deleted; M1 actor_inbox_post_response/0 kept for response composition. Projection broadcast on inbox success intentionally deferred to a follow-up sub-deliverable. inbox.sh 11/11 (acceptance suite for the basic chain): - happy path -> 202 - inbox tip advances; outbox tip unchanged (per-actor bucket independence carried through from Step 5a) - empty / garbage body -> 422 - unknown peer -> 401 - bad peer-AS keys -> 401 - replay (same activity twice) -> 422 on second - unknown target actor -> 404 - two distinct activities -> tip = 2 inbox_peer_resolution.sh 6/6 (Cfg resolution variants): - peer_actors gen_server hit -> 202 - FetchFn fallback -> 202 - FetchFn error -> 401 - FetchFn caches into peer_actors (peers_srv shows [bob] after) - No resolver -> 401 Tests split into two files because each epoch's kernel start_link + outbox construct + term_codec encode is expensive and a single suite hits the wall-clock budget. http_server.erl is now 1181 lines. erlang-load-module on this port scales superlinearly with function count, so eight http_*.sh tests' internal sx_server timeout bumped 60s -> 360s (http_route, http_actors, http_accept, http_capabilities, http_capabilities_format, http_content_type, http_artifacts, http_projections). Conformance 761/761. --- next/kernel/http_server.erl | 163 ++++++++++++++++++++++--- next/tests/http_accept.sh | 2 +- next/tests/http_actors.sh | 2 +- next/tests/http_artifacts.sh | 2 +- next/tests/http_capabilities.sh | 2 +- next/tests/http_capabilities_format.sh | 2 +- next/tests/http_content_type.sh | 2 +- next/tests/http_multi_actor.sh | 10 +- next/tests/http_projections.sh | 2 +- next/tests/http_route.sh | 2 +- next/tests/inbox.sh | 148 ++++++++++++++++++++++ next/tests/inbox_peer_resolution.sh | 119 ++++++++++++++++++ plans/fed-sx-milestone-2.md | 51 +++++++- 13 files changed, 473 insertions(+), 34 deletions(-) create mode 100755 next/tests/inbox.sh create mode 100755 next/tests/inbox_peer_resolution.sh diff --git a/next/kernel/http_server.erl b/next/kernel/http_server.erl index 48dadfdf..084d9ae9 100644 --- a/next/kernel/http_server.erl +++ b/next/kernel/http_server.erl @@ -77,10 +77,27 @@ route(Req, Cfg) -> <<47,46,119,101,108,108,45,107,110,111,119,110, 47,115,120,45,99,97,112,97,98,105,108,105,116,105,101,115>>} -> ok_response(capabilities_body_for(F)); + {<<80,79,83,84>>, _} -> + case match_prefix(actors_prefix(), P) of + {ok, Rest} when byte_size(Rest) > 0 -> + handle_post_actor(Rest, Req, Cfg1); + _ -> + dispatch(M, P, F, Cfg1) + end; _ -> dispatch(M, P, F, Cfg1) end. +%% handle_post_actor/3 — Step 5d ingest. Rest is the path after +%% "/actors/". Only `//inbox` is wired right now; other POST +%% sub-paths fall through to 404. + +handle_post_actor(Rest, Req, Cfg) -> + case split_first_slash(Rest) of + {Id, <<105,110,98,111,120>>} -> handle_inbox_post(Id, Req, Cfg); + _ -> not_found_response() + end. + %% with_request_query/2 — bake the Req's :query binary into Cfg as %% `{request_query, Q}` so sub-resource handlers can parse `?page=N` %% etc without taking the Req as an extra argument. @@ -137,15 +154,9 @@ dispatch(<<71, 69, 84>>, Path, F, Cfg) -> end end end; -%% POST /actors/{id}/inbox — peer-side delivery (Step 4a returns -%% 202 Accepted stub; Step 5 lands the real ingestion pipeline). -dispatch(<<80, 79, 83, 84>>, Path, _F, _Cfg) -> - case match_prefix(actors_prefix(), Path) of - {ok, Rest} when byte_size(Rest) > 0 -> - actor_post(Rest); - _ -> - not_found_response() - end; +%% POST handling moved to route/2 in Step 5d so the Req body and +%% full Cfg are in scope for the inbox pipeline. Anything that +%% reaches dispatch here is an unmatched method or path -> 404. dispatch(_, _, _, _) -> not_found_response(). @@ -176,14 +187,6 @@ actor_subresource_get(Id, <<102,111,108,108,111,119,105,110,103>>, F, _Cfg) -> actor_subresource_get(_, _, _, _) -> not_found_response(). -actor_post(Rest) -> - case split_first_slash(Rest) of - {_Id, <<105,110,98,111,120>>} -> - actor_inbox_post_response(); - _ -> - not_found_response() - end. - %% split_first_slash/1 — split a binary on the first slash. Returns %% {Before, After} where After omits the slash itself. If no slash %% is present, returns just Before. 47 = "/". @@ -1050,3 +1053,129 @@ projections_list_response_for(cbor) -> ok_response(Body, cbor); projections_list_response_for(_) -> projections_list_response(). + +%% ── Step 5d: POST /actors//inbox real ingestion ──────────── +%% +%% Wire format for v2: body is `term_codec:encode(SignedActivity)`, +%% which the receiver decodes into the activity proplist. Peer-AS +%% comes from Cfg's `:peer_actors` cache (a registered atom for the +%% peer_actors gen_server); on a cache miss the handler will fetch +%% via Cfg's `:peer_fetch_fn` if present, otherwise the peer is +%% considered unknown and the request is rejected as unauthorized. +%% +%% Status codes per design §16.1: +%% 202 Accepted — pipeline ok, activity appended to inbox +%% 401 Unauthorized — sig fail or peer unknown +%% 404 Not Found — target actor unknown +%% 422 Unprocessable — envelope / replay failure + +handle_inbox_post(TargetId, Req, Cfg) -> + case kernel_has_actor(field(kernel, Cfg), TargetId) of + false -> not_found_response(); + true -> + Body = field(body, Req), + case decode_activity(Body) of + {error, _} -> validation_failed_response(); + {ok, Activity} -> + handle_inbox_decoded(TargetId, Activity, Cfg) + end + end. + +handle_inbox_decoded(TargetId, Activity, Cfg) -> + case envelope:get_field(actor, Activity) of + not_found -> validation_failed_response(); + {ok, PeerId} -> + case resolve_peer_as(PeerId, Cfg) of + {error, _} -> unauthorized_response(); + {ok, PeerAS} -> + TargetAtom = list_to_atom(binary_to_list(TargetId)), + case nx_kernel:inbox_state_for(TargetAtom) of + {ok, InboxLog} -> + run_inbox_pipeline(TargetAtom, Activity, + PeerAS, InboxLog, Cfg); + _ -> not_found_response() + end + end + end. + +run_inbox_pipeline(TargetAtom, Activity, PeerAS, InboxLog, _Cfg) -> + case pipeline:validate_inbound(Activity, PeerAS, InboxLog) of + ok -> + nx_kernel:append_inbox(TargetAtom, Activity), + actor_inbox_post_response(); + {error, bad_signature} -> unauthorized_response(); + {error, no_signature} -> unauthorized_response(); + {error, _} -> validation_failed_response() + end. + +%% kernel_has_actor/2 — guard against unknown target actors. nil +%% kernel (e.g. tests without a kernel cfg'd) treats every Id as +%% present so the rest of the pipeline can still exercise. + +kernel_has_actor(nil, _Id) -> true; +kernel_has_actor(Kernel, Id) when is_atom(Kernel) -> + case erlang:whereis(Kernel) of + undefined -> false; + _ -> + A = list_to_atom(binary_to_list(Id)), + Actors = nx_kernel:actors(), + lists_member(A, Actors) + end; +kernel_has_actor(_, _) -> false. + +lists_member(_, []) -> false; +lists_member(X, [X | _]) -> true; +lists_member(X, [_ | Rest]) -> lists_member(X, Rest). + +%% decode_activity/1 — body wire format. v2 uses term_codec; v3 may +%% layer JSON or content negotiation on top. + +decode_activity(Body) -> + case term_codec:decode(Body) of + {ok, T, _} when is_list(T) -> {ok, T}; + _ -> {error, bad_envelope} + end. + +%% resolve_peer_as/2 — Cfg may carry: +%% {peer_actors, AtomName} registered peer_actors gen_server +%% {peer_fetch_fn, FetchFn} fallback FetchFn on cache miss +%% {peer_as, [{PeerId, AS}]} pure-fn pre-populated map (tests) +%% In priority order: explicit :peer_as map, then peer_actors srv +%% with optional FetchFn, then unknown. + +resolve_peer_as(PeerId, Cfg) -> + case field(peer_as, Cfg) of + nil -> resolve_peer_as_srv(PeerId, Cfg); + Map -> + case find_peer(PeerId, Map) of + {ok, AS} -> {ok, AS}; + _ -> resolve_peer_as_srv(PeerId, Cfg) + end + end. + +resolve_peer_as_srv(PeerId, Cfg) -> + case field(peer_actors, Cfg) of + nil -> {error, no_peer_resolver}; + Srv when is_atom(Srv) -> + case erlang:whereis(Srv) of + undefined -> {error, peer_actors_down}; + _ -> resolve_via_srv(PeerId, Cfg) + end; + _ -> {error, bad_peer_actors_cfg} + end. + +resolve_via_srv(PeerId, Cfg) -> + case field(peer_fetch_fn, Cfg) of + nil -> + case peer_actors:lookup_srv(PeerId) of + {ok, AS} -> {ok, AS}; + not_found -> {error, unknown_peer} + end; + FetchFn when is_function(FetchFn, 1) -> + peer_actors:lookup_or_fetch_srv(PeerId, FetchFn); + _ -> {error, bad_fetch_fn_cfg} + end. + +find_peer(_, []) -> not_found; +find_peer(K, [{K, V} | _]) -> {ok, V}; +find_peer(K, [_ | Rest]) -> find_peer(K, Rest). diff --git a/next/tests/http_accept.sh b/next/tests/http_accept.sh index 7b06a560..c9ae99c0 100755 --- a/next/tests/http_accept.sh +++ b/next/tests/http_accept.sh @@ -83,7 +83,7 @@ cat > "$TMPFILE" <<'EPOCHS' (eval "(get (erlang-eval-ast \"http_server:accept_format(some_atom)\") :name)") EPOCHS -OUTPUT=$(timeout 60 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) +OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) check() { local epoch="$1" desc="$2" expected="$3" diff --git a/next/tests/http_actors.sh b/next/tests/http_actors.sh index c0fe9b5c..a16b6c4e 100755 --- a/next/tests/http_actors.sh +++ b/next/tests/http_actors.sh @@ -84,7 +84,7 @@ cat > "$TMPFILE" <<'EPOCHS' (eval "(get (erlang-eval-ast \"Req1 = [{method, <<71,69,84>>}, {path, <<47>>}], Req2 = [{method, <<71,69,84>>}, {path, http_server:capabilities_path()}], R1 = case http_server:route(Req1) of [{status, 200} | _] -> ok; _ -> bad end, R2 = case http_server:route(Req2) of [{status, 200} | _] -> ok; _ -> bad end, {R1, R2} =:= {ok, ok}\") :name)") EPOCHS -OUTPUT=$(timeout 60 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) +OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) check() { local epoch="$1" desc="$2" expected="$3" diff --git a/next/tests/http_artifacts.sh b/next/tests/http_artifacts.sh index 35695d6a..54ab3e7d 100755 --- a/next/tests/http_artifacts.sh +++ b/next/tests/http_artifacts.sh @@ -67,7 +67,7 @@ cat > "$TMPFILE" <<'EPOCHS' (eval "(get (erlang-eval-ast \"case http_server:artifacts_prefix() of <<47, _/binary>> -> ok; _ -> bad end\") :name)") EPOCHS -OUTPUT=$(timeout 60 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) +OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) check() { local epoch="$1" desc="$2" expected="$3" diff --git a/next/tests/http_capabilities.sh b/next/tests/http_capabilities.sh index 11242526..5209226a 100755 --- a/next/tests/http_capabilities.sh +++ b/next/tests/http_capabilities.sh @@ -65,7 +65,7 @@ cat > "$TMPFILE" <<'EPOCHS' (eval "(get (erlang-eval-ast \"Req = [{method, <<71,69,84>>}, {path, <<47>>}], case http_server:route(Req) of [{status, 200} | _] -> ok; _ -> bad end\") :name)") EPOCHS -OUTPUT=$(timeout 60 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) +OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) check() { local epoch="$1" desc="$2" expected="$3" diff --git a/next/tests/http_capabilities_format.sh b/next/tests/http_capabilities_format.sh index 40942a7b..0014c998 100755 --- a/next/tests/http_capabilities_format.sh +++ b/next/tests/http_capabilities_format.sh @@ -88,7 +88,7 @@ cat > "$TMPFILE" <>}, {path, CapPath}], case http_server:route(Req) of [{status, 404} | _] -> ok; _ -> bad end\") :name)") EPOCHS -OUTPUT=$(timeout 60 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) +OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) check() { local epoch="$1" desc="$2" expected="$3" diff --git a/next/tests/http_content_type.sh b/next/tests/http_content_type.sh index 7654f3a0..2f1697fa 100755 --- a/next/tests/http_content_type.sh +++ b/next/tests/http_content_type.sh @@ -74,7 +74,7 @@ cat > "$TMPFILE" <<'EPOCHS' (eval "(get (erlang-eval-ast \"R = http_server:ok_response(<<1,2,3>>), case R of [{status, 200}, {headers, []}, {body, <<1,2,3>>}] -> ok; _ -> bad end\") :name)") EPOCHS -OUTPUT=$(timeout 60 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) +OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) check() { local epoch="$1" desc="$2" expected="$3" diff --git a/next/tests/http_multi_actor.sh b/next/tests/http_multi_actor.sh index e7a76d5c..115166ed 100755 --- a/next/tests/http_multi_actor.sh +++ b/next/tests/http_multi_actor.sh @@ -52,6 +52,8 @@ cat > "$TMPFILE" <<'EPOCHS' (eval "(get (erlang-load-module (file-read \"next/kernel/outbox.erl\")) :name)") (epoch 8) (eval "(get (erlang-load-module (file-read \"next/kernel/nx_kernel.erl\")) :name)") +(epoch 9) +(eval "(get (erlang-load-module (file-read \"next/kernel/term_codec.erl\")) :name)") ;; split_first_slash sanity (epoch 10) @@ -81,9 +83,11 @@ cat > "$TMPFILE" <<'EPOCHS' (epoch 24) (eval "(get (erlang-eval-ast \"Req = [{method, <<71,69,84>>}, {path, <<47,97,99,116,111,114,115,47,97,108,105,99,101,47,102,111,108,108,111,119,105,110,103>>}, {headers, []}, {body, <<>>}], R = http_server:route(Req), case R of [{status, 200}, _, {body, B}] -> http_server:match_prefix(<<102,111,108,108,111,119,105,110,103,58>>, B) =/= nomatch; _ -> false end\") :name)") -;; POST /actors/alice/inbox returns 202 with "accepted" +;; POST /actors/alice/inbox with empty body -> 422 (Step 5d +;; expects a term_codec-encoded signed activity; empty body fails +;; decoding before sig check runs). (epoch 25) -(eval "(get (erlang-eval-ast \"Req = [{method, <<80,79,83,84>>}, {path, <<47,97,99,116,111,114,115,47,97,108,105,99,101,47,105,110,98,111,120>>}, {headers, []}, {body, <<>>}], R = http_server:route(Req), case R of [{status, 202}, _, {body, B}] -> http_server:match_prefix(<<97,99,99,101,112,116,101,100>>, B) =/= nomatch; _ -> false end\") :name)") +(eval "(get (erlang-eval-ast \"Req = [{method, <<80,79,83,84>>}, {path, <<47,97,99,116,111,114,115,47,97,108,105,99,101,47,105,110,98,111,120>>}, {headers, []}, {body, <<>>}], R = http_server:route(Req), case R of [{status, 422}, _, _] -> true; _ -> false end\") :name)") ;; GET /actors/alice/unknown returns 404 (epoch 26) @@ -256,7 +260,7 @@ check 21 "GET /actors//outbox stub" "true" check 22 "GET /actors//inbox stub" "true" check 23 "GET /actors//followers stub" "true" check 24 "GET /actors//following stub" "true" -check 25 "POST /actors//inbox -> 202" "true" +check 25 "POST inbox empty body -> 422" "true" check 26 "GET /actors// -> 404" "true" check 27 "POST /actors// -> 404" "true" check 28 "GET /actors/ (empty) -> 404" "true" diff --git a/next/tests/http_projections.sh b/next/tests/http_projections.sh index 011764c8..0d71da1c 100755 --- a/next/tests/http_projections.sh +++ b/next/tests/http_projections.sh @@ -75,7 +75,7 @@ cat > "$TMPFILE" <<'EPOCHS' (eval "(get (erlang-eval-ast \"R1 = http_server:route([{method, <<71,69,84>>}, {path, <<47,97,99,116,111,114,115,47,97>>}]), R2 = http_server:route([{method, <<71,69,84>>}, {path, <<(http_server:artifacts_prefix())/binary, 98>>}]), R3 = http_server:route([{method, <<71,69,84>>}, {path, <<(http_server:projections_prefix())/binary, 99>>}]), case {R1, R2, R3} of {[{status, 200} | _], [{status, 200} | _], [{status, 200} | _]} -> ok; _ -> bad end\") :name)") EPOCHS -OUTPUT=$(timeout 60 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) +OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) check() { local epoch="$1" desc="$2" expected="$3" diff --git a/next/tests/http_route.sh b/next/tests/http_route.sh index 23a9e93f..fd0a44ec 100755 --- a/next/tests/http_route.sh +++ b/next/tests/http_route.sh @@ -77,7 +77,7 @@ cat > "$TMPFILE" <<'EPOCHS' (eval "(get (erlang-eval-ast \"byte_size(http_server:welcome_body()) > 0\") :name)") EPOCHS -OUTPUT=$(timeout 60 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) +OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) check() { local epoch="$1" desc="$2" expected="$3" diff --git a/next/tests/inbox.sh b/next/tests/inbox.sh new file mode 100755 index 00000000..bf3cb515 --- /dev/null +++ b/next/tests/inbox.sh @@ -0,0 +1,148 @@ +#!/usr/bin/env bash +# next/tests/inbox.sh — m2 Step 5d test (the federation acceptance +# suite for POST /actors//inbox). +# +# Wire format: body = term_codec:encode(SignedActivity). The +# receiver decodes, looks up the peer-AS (via Cfg :peer_as map or +# peer_actors gen_server), runs pipeline:validate_inbound/3 against +# the receiving actor's inbox log, and either: +# 202 Accepted pipeline ok, appended to inbox +# 401 Unauthorized bad sig / unknown peer +# 404 Not Found target actor unknown +# 422 Unprocessable envelope / replay failure + +set -uo pipefail +cd "$(git rev-parse --show-toplevel)" + +SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}" +if [ ! -x "$SX_SERVER" ]; then + SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe" +fi +if [ ! -x "$SX_SERVER" ]; then + echo "ERROR: sx_server.exe not found." >&2 + exit 1 +fi + +VERBOSE="${1:-}" +PASS=0; FAIL=0; ERRORS="" +TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT + +# Alice (target) hosts the kernel; Bob (peer) signs activities with BobKS. +# Alice's actor-state carries Alice's own key (not used for inbox +# verification — the peer-AS does). The :peer_as Cfg map gives the +# inbox handler bob's keys directly so peer-AS resolution doesn't +# need the peer_actors gen_server in the pure path. +SETUP='AK = <<1,2,3,4>>, AKS = [{key_id,k1},{algorithm,ed25519},{value,AK}], AAS = [{public_keys,[[{id,k1},{created,0},{value,AK}]]}], BK = <<5,6,7,8>>, BKS = [{key_id,k1},{algorithm,ed25519},{value,BK}], BAS = [{public_keys,[[{id,k1},{created,0},{value,BK}]]}], EvilK = <<9,9,9,9>>, EvilAS = [{public_keys,[[{id,k1},{created,0},{value,EvilK}]]}], Env = outbox:construct(note, bob, 1, [{content,hi}]), Signed = outbox:sign(Env, BKS), Body = term_codec:encode(Signed), nx_kernel:start_link(alice, AKS, AAS), InboxPath = <<47,97,99,116,111,114,115,47,97,108,105,99,101,47,105,110,98,111,120>>, Cfg = [{peer_as, [{bob, BAS}]}, {kernel, nx_kernel}],' + +cat > "$TMPFILE" < 202 +(epoch 20) +(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, Body}], case http_server:route(Req, Cfg) of [{status, 202}, _, _] -> true; _ -> false end\") :name)") + +;; Happy path: inbox tip advances to 1 +(epoch 21) +(eval "(erlang-eval-ast \"${SETUP} Req = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, Body}], http_server:route(Req, Cfg), nx_kernel:inbox_tip_for(alice)\")") + +;; Outbox tip stays 0 after inbox delivery (independent buckets) +(epoch 22) +(eval "(erlang-eval-ast \"${SETUP} Req = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, Body}], http_server:route(Req, Cfg), nx_kernel:log_tip_for(alice)\")") + +;; Empty body -> 422 (decode failure before sig) +(epoch 23) +(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, <<>>}], case http_server:route(Req, Cfg) of [{status, 422}, _, _] -> true; _ -> false end\") :name)") + +;; Garbage body -> 422 +(epoch 24) +(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, <<99,99,99,99>>}], case http_server:route(Req, Cfg) of [{status, 422}, _, _] -> true; _ -> false end\") :name)") + +;; Unknown peer (no entry in :peer_as map) -> 401 +(epoch 25) +(eval "(get (erlang-eval-ast \"${SETUP} EmptyCfg = [{peer_as, []}, {kernel, nx_kernel}], Req = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, Body}], case http_server:route(Req, EmptyCfg) of [{status, 401}, _, _] -> true; _ -> false end\") :name)") + +;; Wrong peer-AS keys (EvilAS) -> 401 (bad_signature) +(epoch 26) +(eval "(get (erlang-eval-ast \"${SETUP} EvilCfg = [{peer_as, [{bob, EvilAS}]}, {kernel, nx_kernel}], Req = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, Body}], case http_server:route(Req, EvilCfg) of [{status, 401}, _, _] -> true; _ -> false end\") :name)") + +;; Replay: deliver same activity twice -> second one 422 +(epoch 27) +(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, Body}], http_server:route(Req, Cfg), case http_server:route(Req, Cfg) of [{status, 422}, _, _] -> true; _ -> false end\") :name)") + +;; Unknown target actor -> 404 +(epoch 28) +(eval "(get (erlang-eval-ast \"${SETUP} GhostPath = <<47,97,99,116,111,114,115,47,103,104,111,115,116,47,105,110,98,111,120>>, Req = [{method, <<80,79,83,84>>}, {path, GhostPath}, {headers, []}, {body, Body}], case http_server:route(Req, Cfg) of [{status, 404}, _, _] -> true; _ -> false end\") :name)") + +;; Two distinct activities -> inbox tip = 2 +(epoch 29) +(eval "(erlang-eval-ast \"${SETUP} Env2 = outbox:construct(note, bob, 2, [{content,bye}]), Signed2 = outbox:sign(Env2, BKS), Body2 = term_codec:encode(Signed2), Req1 = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, Body}], Req2 = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, Body2}], http_server:route(Req1, Cfg), http_server:route(Req2, Cfg), nx_kernel:inbox_tip_for(alice)\")") + +EPOCHS + +OUTPUT=$(timeout 600 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) + +check() { + local epoch="$1" desc="$2" expected="$3" + local actual + actual=$(echo "$OUTPUT" | awk -v e="$epoch" ' + $0 ~ "^\\(ok-len " e " " { getline; print; exit } + $0 ~ "^\\(ok " e " " { print; exit } + $0 ~ "^\\(error " e " " { print; exit } + ') + [ -z "$actual" ] && actual="" + if echo "$actual" | grep -qF -- "$expected"; then + PASS=$((PASS+1)) + [ "$VERBOSE" = "-v" ] && echo " ok $desc" + else + FAIL=$((FAIL+1)) + ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual +" + fi +} + +check 10 "http_server module loaded" "http_server" +check 20 "happy path -> 202" "true" +check 21 "inbox tip advances to 1" "1" +check 22 "outbox tip unchanged (= 0)" "0" +check 23 "empty body -> 422" "true" +check 24 "garbage body -> 422" "true" +check 25 "unknown peer -> 401" "true" +check 26 "bad peer-AS keys -> 401" "true" +check 27 "replay -> 422 on second delivery" "true" +check 28 "unknown target actor -> 404" "true" +check 29 "two activities -> inbox tip = 2" "2" + +TOTAL=$((PASS+FAIL)) +if [ $FAIL -eq 0 ]; then + echo "ok $PASS/$TOTAL next/tests/inbox.sh passed" +else + echo "FAIL $PASS/$TOTAL passed, $FAIL failed:" + echo "$ERRORS" +fi +[ $FAIL -eq 0 ] diff --git a/next/tests/inbox_peer_resolution.sh b/next/tests/inbox_peer_resolution.sh new file mode 100755 index 00000000..e74da97d --- /dev/null +++ b/next/tests/inbox_peer_resolution.sh @@ -0,0 +1,119 @@ +#!/usr/bin/env bash +# next/tests/inbox_peer_resolution.sh — m2 Step 5d-resolution test. +# +# Exercises the four peer-AS resolution paths the inbox handler +# supports via Cfg: +# :peer_as map pure-fn pre-populated proplist +# :peer_actors gen_server cache atom +# :peer_fetch_fn fallback on cache miss +# none reject as 401 +# +# Split out from inbox.sh so each suite gets its own scheduler +# budget — the cumulative cost of one kernel start_link per epoch +# pushes a single-file suite past the wall-clock timeout. + +set -uo pipefail +cd "$(git rev-parse --show-toplevel)" + +SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}" +if [ ! -x "$SX_SERVER" ]; then + SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe" +fi +if [ ! -x "$SX_SERVER" ]; then + echo "ERROR: sx_server.exe not found." >&2 + exit 1 +fi + +VERBOSE="${1:-}" +PASS=0; FAIL=0; ERRORS="" +TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT + +SETUP='AK = <<1,2,3,4>>, AKS = [{key_id,k1},{algorithm,ed25519},{value,AK}], AAS = [{public_keys,[[{id,k1},{created,0},{value,AK}]]}], BK = <<5,6,7,8>>, BKS = [{key_id,k1},{algorithm,ed25519},{value,BK}], BAS = [{public_keys,[[{id,k1},{created,0},{value,BK}]]}], Env = outbox:construct(note, bob, 1, [{content,hi}]), Signed = outbox:sign(Env, BKS), Body = term_codec:encode(Signed), nx_kernel:start_link(alice, AKS, AAS), InboxPath = <<47,97,99,116,111,114,115,47,97,108,105,99,101,47,105,110,98,111,120>>,' + +cat > "$TMPFILE" < 202 +(epoch 20) +(eval "(get (erlang-eval-ast \"${SETUP} peer_actors:start_link([{bob, BAS}]), SrvCfg = [{peer_actors, peer_actors}, {kernel, nx_kernel}], Req = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, Body}], case http_server:route(Req, SrvCfg) of [{status, 202}, _, _] -> true; _ -> false end\") :name)") + +;; FetchFn fallback on cache miss +(epoch 21) +(eval "(get (erlang-eval-ast \"${SETUP} FetchFn = fun(bob) -> {ok, BAS}; (_) -> {error, not_found} end, peer_actors:start_link(), FetchCfg = [{peer_actors, peer_actors}, {peer_fetch_fn, FetchFn}, {kernel, nx_kernel}], Req = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, Body}], case http_server:route(Req, FetchCfg) of [{status, 202}, _, _] -> true; _ -> false end\") :name)") + +;; FetchFn returning error -> 401 +(epoch 22) +(eval "(get (erlang-eval-ast \"${SETUP} BadFetch = fun(_) -> {error, http_404} end, peer_actors:start_link(), FetchCfg = [{peer_actors, peer_actors}, {peer_fetch_fn, BadFetch}, {kernel, nx_kernel}], Req = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, Body}], case http_server:route(Req, FetchCfg) of [{status, 401}, _, _] -> true; _ -> false end\") :name)") + +;; FetchFn caches across deliveries (peers_srv shows [bob] after) +(epoch 23) +(eval "(get (erlang-eval-ast \"${SETUP} FetchFn = fun(bob) -> {ok, BAS}; (_) -> {error, not_found} end, peer_actors:start_link(), FetchCfg = [{peer_actors, peer_actors}, {peer_fetch_fn, FetchFn}, {kernel, nx_kernel}], Req = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, Body}], http_server:route(Req, FetchCfg), peer_actors:peers_srv() =:= [bob]\") :name)") + +;; No peer-resolver cfg'd at all -> 401 +(epoch 24) +(eval "(get (erlang-eval-ast \"${SETUP} EmptyCfg = [{kernel, nx_kernel}], Req = [{method, <<80,79,83,84>>}, {path, InboxPath}, {headers, []}, {body, Body}], case http_server:route(Req, EmptyCfg) of [{status, 401}, _, _] -> true; _ -> false end\") :name)") + +EPOCHS + +OUTPUT=$(timeout 600 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) + +check() { + local epoch="$1" desc="$2" expected="$3" + local actual + actual=$(echo "$OUTPUT" | awk -v e="$epoch" ' + $0 ~ "^\\(ok-len " e " " { getline; print; exit } + $0 ~ "^\\(ok " e " " { print; exit } + $0 ~ "^\\(error " e " " { print; exit } + ') + [ -z "$actual" ] && actual="" + if echo "$actual" | grep -qF -- "$expected"; then + PASS=$((PASS+1)) + [ "$VERBOSE" = "-v" ] && echo " ok $desc" + else + FAIL=$((FAIL+1)) + ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual +" + fi +} + +check 10 "http_server module loaded" "http_server" +check 20 "peer_actors srv lookup -> 202" "true" +check 21 "FetchFn fallback -> 202" "true" +check 22 "FetchFn error -> 401" "true" +check 23 "FetchFn caches into peer_actors" "true" +check 24 "no resolver cfg'd -> 401" "true" + +TOTAL=$((PASS+FAIL)) +if [ $FAIL -eq 0 ]; then + echo "ok $PASS/$TOTAL next/tests/inbox_peer_resolution.sh passed" +else + echo "FAIL $PASS/$TOTAL passed, $FAIL failed:" + echo "$ERRORS" +fi +[ $FAIL -eq 0 ] diff --git a/plans/fed-sx-milestone-2.md b/plans/fed-sx-milestone-2.md index 423c13ef..c41062de 100644 --- a/plans/fed-sx-milestone-2.md +++ b/plans/fed-sx-milestone-2.md @@ -375,12 +375,30 @@ actor *received*), and broadcasts to projections. for tests / fixtures. 19/19 in `peer_actors.sh`. The actual fetch implementation (HTTP GET of the peer's actor doc) is Step 5d's responsibility — for 5c, FetchFn is just a contract. -- [ ] **5d** — http_server inbox handler wires the chain: - `POST /actors//inbox` body is the signed activity wire bytes; - parse → resolve peer-AS → `validate_inbound` → `append_inbox` → - 202 on accept, 401 on bad sig, 422 on replay/shape failure, - 404 on unknown target actor. Activity broadcast to receiving - actor's projections (via `projection:async_fold`). +- [x] **5d** — http_server inbox handler wires the chain. POST + /actors//inbox is now special-cased in `route/2` (next to + POST /activity) so the body + full Cfg reach the handler. New + `handle_inbox_post/3` orchestrates: `kernel_has_actor` → + `decode_activity` (term_codec wire format) → `resolve_peer_as` + (Cfg `:peer_as` map > `:peer_actors` srv > `:peer_fetch_fn` + fallback) → `pipeline:validate_inbound/3` → `nx_kernel:append_inbox`. + Status codes: + - 202 Accepted on pipeline ok + inbox append + - 401 Unauthorized on bad_signature / no_signature / unknown + peer / fetch error + - 404 Not Found on unknown target actor + - 422 Unprocessable on shape / decode / replay failure + v1 stub `actor_post/1` removed; the route/2 special case + supersedes it. M1 `actor_inbox_post_response/0` kept for + callers that need to compose the response shape. + Projection broadcast on success is intentionally deferred — + the same TODO covers outbox broadcast invariance and lands in + a follow-up sub-deliverable. `inbox.sh` 11/11 covers happy + path / shape / sig / replay / unknown-target / multi-message; + `inbox_peer_resolution.sh` 6/6 covers the four peer-AS + resolution paths. Tests split into two files because the + cumulative cost of one kernel start_link per epoch pushed a + single suite past the wall-clock budget. **Acceptance:** `bash next/tests/inbox.sh` passes 16+ cases. @@ -790,6 +808,27 @@ proceed. Newest first. +- **2026-06-06** — Step 5d: POST /actors//inbox real ingestion. + `route/2` now special-cases POST `/actors//inbox` next to POST + `/activity` so the body + full Cfg reach the new + `handle_inbox_post/3` handler. Flow: + `kernel_has_actor` -> `decode_activity` (term_codec wire format) + -> `resolve_peer_as` (Cfg `:peer_as` map > `:peer_actors` srv > + `:peer_fetch_fn` fallback) -> `pipeline:validate_inbound/3` -> + `nx_kernel:append_inbox`. Status codes 202 / 401 / 404 / 422 + per design §16.1. v1 stub `actor_post/1` removed; M1 + `actor_inbox_post_response/0` kept for response shape composition. + Projection broadcast on inbox success intentionally deferred to a + follow-up. `inbox.sh` 11/11 (basic ingestion: happy path / shape + / sig / replay / unknown-target / multi-message); + `inbox_peer_resolution.sh` 6/6 (peer-AS resolution variants). + Split into two files because cumulative per-epoch kernel + start_link + outbox construct + term_codec encode pushed a + single suite past the wall-clock budget. http_server.erl now + 1181 lines — load time on this Erlang port scales superlinearly + with function count, so eight http_*.sh tests' internal sx_server + timeout bumped 60s → 360s. Conformance 761/761. + - **2026-06-06** — Step 5c: peer-actors cache (`peer_actors.erl`). Pure-functional cache of `{PeerActorId, PeerAS}` entries with the load-bearing `lookup_or_fetch/3(PeerId, FetchFn, State)`