fed-sx-m2: Step 5d — inbox handler wires the ingestion chain
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 38s
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 38s
POST /actors/<id>/inbox is now special-cased in route/2 (next to
POST /activity) so the body + Cfg reach the new handle_inbox_post/3
handler.
Wire format: body = term_codec:encode(SignedActivity); the receiver
decodes into the activity proplist and runs the chain.
handle_inbox_post/3 orchestration:
1. kernel_has_actor(field(kernel, Cfg), TargetId) -> 404 if missing
2. decode_activity(Body) -> 422 on bad shape
3. envelope:get_field(actor, Activity) -> 422 if no peer id
4. resolve_peer_as(PeerId, Cfg) -> 401 if unknown
5. nx_kernel:inbox_state_for(TargetAtom) -> 404 belt-and-braces
6. pipeline:validate_inbound(Activity, PeerAS, InboxLog)
ok -> nx_kernel:append_inbox + 202
{error, bad_signature} -> 401
{error, no_signature} -> 401
{error, _} -> 422
resolve_peer_as/2 supports three Cfg paths in priority order:
{peer_as, [{PeerId, AS}, ...]} pure-fn pre-populated map
{peer_actors, AtomName} peer_actors gen_server cache
{peer_fetch_fn, fun/1} fallback on srv cache miss
Empty Cfg returns {error, no_peer_resolver} -> 401.
v1 actor_post/1 4a stub deleted; M1 actor_inbox_post_response/0
kept for response composition.
Projection broadcast on inbox success intentionally deferred to a
follow-up sub-deliverable.
inbox.sh 11/11 (acceptance suite for the basic chain):
- happy path -> 202
- inbox tip advances; outbox tip unchanged (per-actor bucket
independence carried through from Step 5a)
- empty / garbage body -> 422
- unknown peer -> 401
- bad peer-AS keys -> 401
- replay (same activity twice) -> 422 on second
- unknown target actor -> 404
- two distinct activities -> tip = 2
inbox_peer_resolution.sh 6/6 (Cfg resolution variants):
- peer_actors gen_server hit -> 202
- FetchFn fallback -> 202
- FetchFn error -> 401
- FetchFn caches into peer_actors (peers_srv shows [bob] after)
- No resolver -> 401
Tests split into two files because each epoch's kernel start_link
+ outbox construct + term_codec encode is expensive and a single
suite hits the wall-clock budget.
http_server.erl is now 1181 lines. erlang-load-module on this port
scales superlinearly with function count, so eight http_*.sh tests'
internal sx_server timeout bumped 60s -> 360s (http_route,
http_actors, http_accept, http_capabilities, http_capabilities_format,
http_content_type, http_artifacts, http_projections).
Conformance 761/761.
This commit is contained in:
@@ -77,10 +77,27 @@ route(Req, Cfg) ->
|
||||
<<47,46,119,101,108,108,45,107,110,111,119,110,
|
||||
47,115,120,45,99,97,112,97,98,105,108,105,116,105,101,115>>} ->
|
||||
ok_response(capabilities_body_for(F));
|
||||
{<<80,79,83,84>>, _} ->
|
||||
case match_prefix(actors_prefix(), P) of
|
||||
{ok, Rest} when byte_size(Rest) > 0 ->
|
||||
handle_post_actor(Rest, Req, Cfg1);
|
||||
_ ->
|
||||
dispatch(M, P, F, Cfg1)
|
||||
end;
|
||||
_ ->
|
||||
dispatch(M, P, F, Cfg1)
|
||||
end.
|
||||
|
||||
%% handle_post_actor/3 — Step 5d ingest. Rest is the path after
|
||||
%% "/actors/". Only `/<id>/inbox` is wired right now; other POST
|
||||
%% sub-paths fall through to 404.
|
||||
|
||||
handle_post_actor(Rest, Req, Cfg) ->
|
||||
case split_first_slash(Rest) of
|
||||
{Id, <<105,110,98,111,120>>} -> handle_inbox_post(Id, Req, Cfg);
|
||||
_ -> not_found_response()
|
||||
end.
|
||||
|
||||
%% with_request_query/2 — bake the Req's :query binary into Cfg as
|
||||
%% `{request_query, Q}` so sub-resource handlers can parse `?page=N`
|
||||
%% etc without taking the Req as an extra argument.
|
||||
@@ -137,15 +154,9 @@ dispatch(<<71, 69, 84>>, Path, F, Cfg) ->
|
||||
end
|
||||
end
|
||||
end;
|
||||
%% POST /actors/{id}/inbox — peer-side delivery (Step 4a returns
|
||||
%% 202 Accepted stub; Step 5 lands the real ingestion pipeline).
|
||||
dispatch(<<80, 79, 83, 84>>, Path, _F, _Cfg) ->
|
||||
case match_prefix(actors_prefix(), Path) of
|
||||
{ok, Rest} when byte_size(Rest) > 0 ->
|
||||
actor_post(Rest);
|
||||
_ ->
|
||||
not_found_response()
|
||||
end;
|
||||
%% POST handling moved to route/2 in Step 5d so the Req body and
|
||||
%% full Cfg are in scope for the inbox pipeline. Anything that
|
||||
%% reaches dispatch here is an unmatched method or path -> 404.
|
||||
dispatch(_, _, _, _) ->
|
||||
not_found_response().
|
||||
|
||||
@@ -176,14 +187,6 @@ actor_subresource_get(Id, <<102,111,108,108,111,119,105,110,103>>, F, _Cfg) ->
|
||||
actor_subresource_get(_, _, _, _) ->
|
||||
not_found_response().
|
||||
|
||||
actor_post(Rest) ->
|
||||
case split_first_slash(Rest) of
|
||||
{_Id, <<105,110,98,111,120>>} ->
|
||||
actor_inbox_post_response();
|
||||
_ ->
|
||||
not_found_response()
|
||||
end.
|
||||
|
||||
%% split_first_slash/1 — split a binary on the first slash. Returns
|
||||
%% {Before, After} where After omits the slash itself. If no slash
|
||||
%% is present, returns just Before. 47 = "/".
|
||||
@@ -1050,3 +1053,129 @@ projections_list_response_for(cbor) ->
|
||||
ok_response(Body, cbor);
|
||||
projections_list_response_for(_) ->
|
||||
projections_list_response().
|
||||
|
||||
%% ── Step 5d: POST /actors/<id>/inbox real ingestion ────────────
|
||||
%%
|
||||
%% Wire format for v2: body is `term_codec:encode(SignedActivity)`,
|
||||
%% which the receiver decodes into the activity proplist. Peer-AS
|
||||
%% comes from Cfg's `:peer_actors` cache (a registered atom for the
|
||||
%% peer_actors gen_server); on a cache miss the handler will fetch
|
||||
%% via Cfg's `:peer_fetch_fn` if present, otherwise the peer is
|
||||
%% considered unknown and the request is rejected as unauthorized.
|
||||
%%
|
||||
%% Status codes per design §16.1:
|
||||
%% 202 Accepted — pipeline ok, activity appended to inbox
|
||||
%% 401 Unauthorized — sig fail or peer unknown
|
||||
%% 404 Not Found — target actor unknown
|
||||
%% 422 Unprocessable — envelope / replay failure
|
||||
|
||||
handle_inbox_post(TargetId, Req, Cfg) ->
|
||||
case kernel_has_actor(field(kernel, Cfg), TargetId) of
|
||||
false -> not_found_response();
|
||||
true ->
|
||||
Body = field(body, Req),
|
||||
case decode_activity(Body) of
|
||||
{error, _} -> validation_failed_response();
|
||||
{ok, Activity} ->
|
||||
handle_inbox_decoded(TargetId, Activity, Cfg)
|
||||
end
|
||||
end.
|
||||
|
||||
handle_inbox_decoded(TargetId, Activity, Cfg) ->
|
||||
case envelope:get_field(actor, Activity) of
|
||||
not_found -> validation_failed_response();
|
||||
{ok, PeerId} ->
|
||||
case resolve_peer_as(PeerId, Cfg) of
|
||||
{error, _} -> unauthorized_response();
|
||||
{ok, PeerAS} ->
|
||||
TargetAtom = list_to_atom(binary_to_list(TargetId)),
|
||||
case nx_kernel:inbox_state_for(TargetAtom) of
|
||||
{ok, InboxLog} ->
|
||||
run_inbox_pipeline(TargetAtom, Activity,
|
||||
PeerAS, InboxLog, Cfg);
|
||||
_ -> not_found_response()
|
||||
end
|
||||
end
|
||||
end.
|
||||
|
||||
run_inbox_pipeline(TargetAtom, Activity, PeerAS, InboxLog, _Cfg) ->
|
||||
case pipeline:validate_inbound(Activity, PeerAS, InboxLog) of
|
||||
ok ->
|
||||
nx_kernel:append_inbox(TargetAtom, Activity),
|
||||
actor_inbox_post_response();
|
||||
{error, bad_signature} -> unauthorized_response();
|
||||
{error, no_signature} -> unauthorized_response();
|
||||
{error, _} -> validation_failed_response()
|
||||
end.
|
||||
|
||||
%% kernel_has_actor/2 — guard against unknown target actors. nil
|
||||
%% kernel (e.g. tests without a kernel cfg'd) treats every Id as
|
||||
%% present so the rest of the pipeline can still exercise.
|
||||
|
||||
kernel_has_actor(nil, _Id) -> true;
|
||||
kernel_has_actor(Kernel, Id) when is_atom(Kernel) ->
|
||||
case erlang:whereis(Kernel) of
|
||||
undefined -> false;
|
||||
_ ->
|
||||
A = list_to_atom(binary_to_list(Id)),
|
||||
Actors = nx_kernel:actors(),
|
||||
lists_member(A, Actors)
|
||||
end;
|
||||
kernel_has_actor(_, _) -> false.
|
||||
|
||||
lists_member(_, []) -> false;
|
||||
lists_member(X, [X | _]) -> true;
|
||||
lists_member(X, [_ | Rest]) -> lists_member(X, Rest).
|
||||
|
||||
%% decode_activity/1 — body wire format. v2 uses term_codec; v3 may
|
||||
%% layer JSON or content negotiation on top.
|
||||
|
||||
decode_activity(Body) ->
|
||||
case term_codec:decode(Body) of
|
||||
{ok, T, _} when is_list(T) -> {ok, T};
|
||||
_ -> {error, bad_envelope}
|
||||
end.
|
||||
|
||||
%% resolve_peer_as/2 — Cfg may carry:
|
||||
%% {peer_actors, AtomName} registered peer_actors gen_server
|
||||
%% {peer_fetch_fn, FetchFn} fallback FetchFn on cache miss
|
||||
%% {peer_as, [{PeerId, AS}]} pure-fn pre-populated map (tests)
|
||||
%% In priority order: explicit :peer_as map, then peer_actors srv
|
||||
%% with optional FetchFn, then unknown.
|
||||
|
||||
resolve_peer_as(PeerId, Cfg) ->
|
||||
case field(peer_as, Cfg) of
|
||||
nil -> resolve_peer_as_srv(PeerId, Cfg);
|
||||
Map ->
|
||||
case find_peer(PeerId, Map) of
|
||||
{ok, AS} -> {ok, AS};
|
||||
_ -> resolve_peer_as_srv(PeerId, Cfg)
|
||||
end
|
||||
end.
|
||||
|
||||
resolve_peer_as_srv(PeerId, Cfg) ->
|
||||
case field(peer_actors, Cfg) of
|
||||
nil -> {error, no_peer_resolver};
|
||||
Srv when is_atom(Srv) ->
|
||||
case erlang:whereis(Srv) of
|
||||
undefined -> {error, peer_actors_down};
|
||||
_ -> resolve_via_srv(PeerId, Cfg)
|
||||
end;
|
||||
_ -> {error, bad_peer_actors_cfg}
|
||||
end.
|
||||
|
||||
resolve_via_srv(PeerId, Cfg) ->
|
||||
case field(peer_fetch_fn, Cfg) of
|
||||
nil ->
|
||||
case peer_actors:lookup_srv(PeerId) of
|
||||
{ok, AS} -> {ok, AS};
|
||||
not_found -> {error, unknown_peer}
|
||||
end;
|
||||
FetchFn when is_function(FetchFn, 1) ->
|
||||
peer_actors:lookup_or_fetch_srv(PeerId, FetchFn);
|
||||
_ -> {error, bad_fetch_fn_cfg}
|
||||
end.
|
||||
|
||||
find_peer(_, []) -> not_found;
|
||||
find_peer(K, [{K, V} | _]) -> {ok, V};
|
||||
find_peer(K, [_ | Rest]) -> find_peer(K, Rest).
|
||||
|
||||
Reference in New Issue
Block a user