fed-sx-m2: Step 6b — wire follower_graph fold to inbox handler
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 35s

http_server.erl run_inbox_pipeline now calls
broadcast_to_inbox_projections/2 after a successful
nx_kernel:append_inbox. Cfg may carry {inbox_projections,
[Name, ...]} listing projection gen_servers that should see every
successfully-ingested inbound activity. Each gets the activity via
projection:async_fold/2 — fire-and-forget so the inbox handler
doesn't block on fold processing. Empty / absent
:inbox_projections is a no-op (back-compat with Step 5d callers).

v2 leaves the routing field global (every inbound activity goes
to every named projection); per-actor projection wiring is a
forward-looking follow-up.

9/9 in next/tests/follow_lifecycle.sh:
  - Follow ingestion -> 202
  - follower_graph state: alice.pending_inbound = [bob]
  - follower_graph state: bob.pending_outbound = [alice]
  - inbox tip advances to 1 (Step 5a invariant preserved)
  - no inbox_projections Cfg -> projection state stays empty
  - end-to-end: Follow + Accept fold converges to
    alice.followers = [bob] and bob.following = [alice]
    (Accept fed via projection:async_fold for v2 — auto-Accept
    publish is Step 6c)
  - bad-sig inbound short-circuits before broadcast
  - two distinct peer Follows accumulate

bootstrap_start.sh internal sx_server timeout bumped 300s -> 600s
to match the cumulative cost trend other tests are seeing on this
port. (bootstrap_start doesn't load http_server but loads bootstrap
+ the full genesis bundle + 9 kernel modules — same cumulative
compile budget.)

Conformance 761/761.
This commit is contained in:
2026-06-06 21:59:43 +00:00
parent e890380a1a
commit 1d83120918
4 changed files with 197 additions and 7 deletions

View File

@@ -1098,16 +1098,40 @@ handle_inbox_decoded(TargetId, Activity, Cfg) ->
end
end.
run_inbox_pipeline(TargetAtom, Activity, PeerAS, InboxLog, _Cfg) ->
run_inbox_pipeline(TargetAtom, Activity, PeerAS, InboxLog, Cfg) ->
case pipeline:validate_inbound(Activity, PeerAS, InboxLog) of
ok ->
nx_kernel:append_inbox(TargetAtom, Activity),
broadcast_to_inbox_projections(Activity, Cfg),
actor_inbox_post_response();
{error, bad_signature} -> unauthorized_response();
{error, no_signature} -> unauthorized_response();
{error, _} -> validation_failed_response()
end.
%% broadcast_to_inbox_projections/2 — Step 6b. Cfg may carry
%% `{inbox_projections, [Name, ...]}` listing projection gen_servers
%% that should see every successfully-ingested inbound activity.
%% Casts via projection:async_fold/2 — fire-and-forget so the inbox
%% handler doesn't block on projection processing.
%%
%% No-op when the field is absent. v2 v2 layers per-actor projection
%% routing on top (each actor's bucket can carry its own projection
%% list); for now the field is global.
broadcast_to_inbox_projections(Activity, Cfg) ->
case field(inbox_projections, Cfg) of
nil -> ok;
Names when is_list(Names) ->
broadcast_each(Activity, Names);
_ -> ok
end.
broadcast_each(_, []) -> ok;
broadcast_each(Activity, [Name | Rest]) ->
projection:async_fold(Name, Activity),
broadcast_each(Activity, Rest).
%% kernel_has_actor/2 — guard against unknown target actors. nil
%% kernel (e.g. tests without a kernel cfg'd) treats every Id as
%% present so the rest of the pipeline can still exercise.