Files
rose-ash/next/kernel/pipeline.erl
giles d103ecb863
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 23s
fed-sx-m2: Step 5b — pipeline:validate_inbound/3 + 14 tests
New federation inbound pipeline that runs envelope-shape -> peer
signature -> replay against the receiving actor's inbox log.

pipeline.erl additions:
  validate_inbound/3(Activity, PeerActorState, InboxLog)
      runs inbound_stages(PeerAS, InboxLog) and halts on first
      failure (existing run_stages/2 driver). Returns ok |
      {error, Reason}.
  inbound_stages/2(PeerAS, InboxLog)
      [stage_envelope, stage_signature(PeerAS), stage_replay(InboxLog)]

M1's validate_inbound/1 and the static inbound_stages/0 (envelope-
only) are preserved — outbox-side callers don't have to re-key on
a peer-AS they don't have.

Signature verification routes through the peer's actor-state
:public_keys (NOT the local kernel's actor-state). Peer-AS
resolution is the caller's responsibility for 5b; Step 5c wires
the peer-actors cache lookup.

14 cases in next/tests/inbox_pipeline.sh:
  - happy path: valid signed activity + correct peer AS + empty
    inbox -> ok
  - bad envelope shape -> {error, _} (stage_envelope rejects)
  - unsigned activity -> stage_envelope rejects on
    {missing_field, signature} before sig runs
  - wrong peer AS (peer's claimed key bytes differ from real) ->
    {error, bad_signature}
  - replay: inbox already contains the same activity -> {error, replay}
  - inbox with a different activity doesn't trigger replay
  - inbound_stages/2 returns exactly 3 stages
  - inbound_stages/0 still returns 1 stage
  - validate_inbound/1 still works
  - shape failure short-circuits before sig
  - sig failure short-circuits before replay
  - two distinct activities both verify against empty inbox
  - inbox-of-one doesn't replay the other

Conformance 761/761. 130/130 across 10 Step-5-adjacent suites
(pipeline_envelope, pipeline_signature, pipeline_replay,
pipeline_driver, inbox_pipeline, inbox_bucket, nx_kernel_multi,
bootstrap_start, http_publish, outbox_publish, smoke_app_pure).
2026-06-06 16:22:47 +00:00

168 lines
6.5 KiB
Erlang

-module(pipeline).
-export([run_stages/2,
validate_inbound/1, validate_inbound/3,
validate_outbound/1,
inbound_stages/0, inbound_stages/2, outbound_stages/0,
stage_envelope/1,
stage_signature/1, stage_signature/2,
stage_replay/1, stage_replay/2,
stage_schema/1, stage_schema/2]).
%% Validation pipeline per design §14.
%%
%% A stage is a 1-arity fun `(Activity) -> ok | {error, Reason}`.
%% The driver folds the activity through the stage list, halting
%% on the first error. The pure-functional driver itself takes a
%% stage list directly so tests can inject ad-hoc stage sequences
%% without depending on the bundled inbound/outbound lists.
%%
%% Inbound pipeline (full set per design §14): envelope, signature,
%% replay, audience, activity_schema, object_schema, content_validators,
%% capabilities, trust. Outbound is a subset (no replay, no trust;
%% auth handled at the HTTP layer).
%%
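%% The driver was introduced in sub-deliverable 6a; the concrete stages
%% defined below (envelope, signature, replay, schema) were added later.
%% The static lists (inbound_stages/0, outbound_stages/0) currently run
%% only the envelope stage; inbound_stages/2 adds signature and replay.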
%% run_stages/2 — fold Activity through Stages in list order.
%%
%% A stage is called only if every earlier stage returned ok. The first
%% {error, _} is returned unchanged and the remaining stages are never
%% invoked. An empty stage list accepts the activity (ok). Any other
%% stage result is a contract violation and crashes (case_clause).
-spec run_stages(term(), [fun((term()) -> ok | {error, term()})]) ->
          ok | {error, term()}.
run_stages(_Activity, []) ->
    ok;
run_stages(Activity, [Stage | Rest]) ->
    case Stage(Activity) of
        ok -> run_stages(Activity, Rest);
        {error, _} = Failure -> Failure
    end.
%% validate_inbound/1 — envelope-only inbound validation using the
%% static inbound_stages/0 list; needs no peer actor-state or inbox log.
validate_inbound(Activity) ->
    Stages = inbound_stages(),
    run_stages(Activity, Stages).
%% validate_inbound/3 — Step 5b federation inbound pipeline.
%%
%% Arguments:
%%   Activity       - the signed envelope exactly as received from the peer.
%%   PeerActorState - the sending peer's actor-state proplist; its
%%                    :public_keys are what the signature is checked
%%                    against. Resolving it is the caller's job (for v2:
%%                    a peer-actors cache in Step 5c, or a two-instance
%%                    test fixture).
%%   InboxLog       - the receiving actor's :actor_inbox log state;
%%                    stage_replay uses it to reject an already-seen :id.
%%
%% Stage order (design §13.2 + §14): envelope shape, then peer signature,
%% then replay against the inbox. The first failing stage's
%% {error, Reason} is returned and later stages are skipped; ok means all
%% three passed. Audience / schema / capabilities / trust stages are
%% deferred to v3.
validate_inbound(Activity, PeerActorState, InboxLog) ->
    Stages = inbound_stages(PeerActorState, InboxLog),
    run_stages(Activity, Stages).
%% validate_outbound/1 — run the outbound stage list over Activity,
%% halting at the first failing stage.
validate_outbound(Activity) ->
    Stages = outbound_stages(),
    run_stages(Activity, Stages).
%% inbound_stages/0 — static, envelope-only inbound stage list. Kept for
%% validate_inbound/1 callers that have no peer actor-state to verify a
%% signature against.
inbound_stages() ->
    [fun stage_envelope/1].
%% inbound_stages/2 — ordered federation inbound stage list:
%% envelope shape, then signature against PeerActorState, then replay
%% against InboxLog. The two stateful stages are built here because
%% their inputs only exist at call time.
inbound_stages(PeerActorState, InboxLog) ->
    SignatureStage = stage_signature(PeerActorState),
    ReplayStage = stage_replay(InboxLog),
    [fun stage_envelope/1, SignatureStage, ReplayStage].
%% outbound_stages/0 — outbound stage list; currently just the envelope
%% shape check.
outbound_stages() ->
    [fun stage_envelope/1].
%% ── Concrete stages ─────────────────────────────────────────────
%% stage_envelope/1 — envelope shape-check stage.
%% envelope:validate_shape/1 already answers ok | {error, Reason}, which
%% is exactly the stage contract run_stages/2 expects, so its result is
%% passed through untouched.
stage_envelope(Activity) ->
    envelope:validate_shape(Activity).
%% stage_signature/2 — direct check: verify Activity's signature against
%% ActorState by wrapping envelope:verify_signature/2 (Step 2c). Useful
%% for tests and for callers that already hold the actor-state.
stage_signature(Activity, ActorState) ->
    envelope:verify_signature(Activity, ActorState).

%% stage_signature/1 — factory: closes over ActorState and returns a
%% 1-arity stage fun that run_stages/2 can fold.
%%
%% Signature checking needs an actor-state that does not exist when a
%% static list is built. That is why inbound_stages/0 leaves it out and
%% inbound_stages/2 builds the stage through this factory at call time.
%%
%% Delegates to stage_signature/2, as stage_replay/1 and stage_schema/1
%% do with their 2-arity forms, so the verification call lives in one
%% place.
stage_signature(ActorState) ->
    fun (Activity) -> stage_signature(Activity, ActorState) end.
%% stage_replay/2 — replay defence. Looks for Activity's :id among the
%% entries of LogState (obtained via log:entries/1) and answers:
%%   ok               no log entry carries that :id (the activity is new)
%%   {error, replay}  some log entry already carries the same :id
%%   {error, no_id}   Activity has no :id field at all
%% The lookup is a linear scan of the entries. The Step 7 projection
%% scheduler is expected to maintain a CID index that makes it O(1).
-spec stage_replay(term(), term()) -> ok | {error, replay | no_id}.
stage_replay(Activity, LogState) ->
    case envelope:get_field(id, Activity) of
        {ok, Id} ->
            Entries = log:entries(LogState),
            case log_has_id(Id, Entries) of
                false -> ok;
                true -> {error, replay}
            end;
        not_found ->
            {error, no_id}
    end.
%% stage_replay/1 — factory: closes over LogState and returns the 1-arity
%% replay stage that inbound_stages/2 places after the signature stage.
stage_replay(LogState) ->
    fun (Activity) ->
        stage_replay(Activity, LogState)
    end.
%% log_has_id/2 — true as soon as some entry's :id is exactly (=:=)
%% equal to Id; false for an empty list or when nothing matches.
%% Entries without an :id are simply skipped.
log_has_id(Id, Entries) ->
    lists:any(fun (Entry) -> envelope:get_field(id, Entry) =:= {ok, Id} end,
              Entries).
%% stage_schema/2 — check Activity's :object against the schema that the
%% caller registers for Activity's :type.
%%
%% SchemaLookup is fun((Type) -> {ok, SchemaFn} | not_found), and
%% SchemaFn is fun((Object) -> boolean()). Outcomes:
%%   {error, no_type}          Activity has no :type field
%%   ok                        no schema is registered for the :type.
%%                             This is the open-world default, so types
%%                             the kernel has not learned yet are not
%%                             blocked; strict-world comes later in
%%                             milestone 2.
%%   ok                        Activity has no :object (some verbs carry
%%                             none), so there is nothing to check
%%   ok                        SchemaFn returned true for the object
%%   {error, schema_mismatch}  SchemaFn returned false
%%
%% Plain Erlang funs stand in for the SX-source :schema bodies stored in
%% the genesis bundle. Once an SX-source eval bridge exists, the same
%% stage shape will dispatch through it with no API change.
-spec stage_schema(term(),
                   fun((term()) -> {ok, fun((term()) -> boolean())} | not_found)) ->
          ok | {error, no_type | schema_mismatch}.
stage_schema(Activity, SchemaLookup) ->
    case envelope:get_field(type, Activity) of
        {ok, Type} ->
            case SchemaLookup(Type) of
                {ok, SchemaFn} -> check_object_schema(Activity, SchemaFn);
                not_found -> ok
            end;
        not_found ->
            {error, no_type}
    end.
%% check_object_schema/2 — apply SchemaFn to Activity's :object.
%% A missing :object passes (ok), a true verdict passes (ok), and a false
%% verdict yields {error, schema_mismatch}.
check_object_schema(Activity, SchemaFn) ->
    case envelope:get_field(object, Activity) of
        {ok, Obj} ->
            case SchemaFn(Obj) of
                false -> {error, schema_mismatch};
                true -> ok
            end;
        not_found ->
            ok
    end.
%% stage_schema/1 — factory: closes over SchemaLookup and returns a
%% 1-arity schema stage suitable for run_stages/2.
stage_schema(SchemaLookup) ->
    fun (Activity) ->
        stage_schema(Activity, SchemaLookup)
    end.