Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 30s
136 lines
5.1 KiB
Erlang
136 lines
5.1 KiB
Erlang
-module(pipeline).
|
|
-export([run_stages/2,
|
|
validate_inbound/1, validate_outbound/1,
|
|
inbound_stages/0, outbound_stages/0,
|
|
stage_envelope/1,
|
|
stage_signature/1, stage_signature/2,
|
|
stage_replay/1, stage_replay/2,
|
|
stage_schema/1, stage_schema/2]).
|
|
|
|
%% Validation pipeline per design §14.
|
|
%%
|
|
%% A stage is a 1-arity fun `(Activity) -> ok | {error, Reason}`.
|
|
%% The driver folds the activity through the stage list, halting
|
|
%% on the first error. The pure-functional driver itself takes a
|
|
%% stage list directly so tests can inject ad-hoc stage sequences
|
|
%% without depending on the bundled inbound/outbound lists.
|
|
%%
|
|
%% Inbound pipeline (full set per design §14): envelope, signature,
|
|
%% replay, audience, activity_schema, object_schema, content_validators,
|
|
%% capabilities, trust. Outbound is a subset (no replay, no trust;
|
|
%% auth handled at the HTTP layer).
|
|
%%
|
|
%% This sub-deliverable (6a) wires only the driver and the empty
|
|
%% stage lists. Concrete stages land in 6b-6c.
|
|
|
|
run_stages(_Activity, []) -> ok;
|
|
run_stages(Activity, [Stage | Rest]) ->
|
|
Result = Stage(Activity),
|
|
case Result of
|
|
ok -> run_stages(Activity, Rest);
|
|
{error, _} -> Result
|
|
end.
|
|
|
|
validate_inbound(Activity) ->
|
|
run_stages(Activity, inbound_stages()).
|
|
|
|
validate_outbound(Activity) ->
|
|
run_stages(Activity, outbound_stages()).
|
|
|
|
inbound_stages() ->
|
|
[fun (A) -> stage_envelope(A) end].
|
|
|
|
outbound_stages() ->
|
|
[fun (A) -> stage_envelope(A) end].
|
|
|
|
%% ── Concrete stages ─────────────────────────────────────────────
|
|
|
|
%% stage_envelope/1 — wrap envelope:validate_shape/1. The pipeline
|
|
%% driver expects ok | {error, R}; validate_shape returns exactly
|
|
%% that, so delegation is direct.
|
|
stage_envelope(Activity) ->
|
|
envelope:validate_shape(Activity).
|
|
|
|
%% stage_signature/2 — direct (Activity, ActorState) check. Wraps
|
|
%% envelope:verify_signature/2 from Step 2c. Useful for tests and
|
|
%% for callers that already have ActorState in scope.
|
|
stage_signature(Activity, ActorState) ->
|
|
envelope:verify_signature(Activity, ActorState).
|
|
|
|
%% stage_signature/1 — factory: takes the ActorState and returns a
|
|
%% 1-arity stage fun the pipeline driver can fold. This is how
|
|
%% signature checking gets composed into a stage list at runtime
|
|
%% (the static `inbound_stages/0` list omits it precisely because
|
|
%% ActorState isn't available at static-list build time).
|
|
stage_signature(ActorState) ->
|
|
fun (Activity) -> envelope:verify_signature(Activity, ActorState) end.
|
|
|
|
%% stage_replay/2 — checks the in-memory log for an existing
|
|
%% activity with the same :id. Returns ok if the activity is new,
|
|
%% `{error, replay}` if the log already carries it, `{error, no_id}`
|
|
%% if the activity has no :id field. The check is linear scan of
|
|
%% log entries; the projection scheduler (Step 7) will eventually
|
|
%% maintain a CID index that turns this into O(1).
|
|
stage_replay(Activity, LogState) ->
|
|
case envelope:get_field(id, Activity) of
|
|
not_found -> {error, no_id};
|
|
{ok, Id} ->
|
|
case log_has_id(Id, log:entries(LogState)) of
|
|
true -> {error, replay};
|
|
false -> ok
|
|
end
|
|
end.
|
|
|
|
stage_replay(LogState) ->
|
|
fun (Activity) -> stage_replay(Activity, LogState) end.
|
|
|
|
log_has_id(_, []) -> false;
|
|
log_has_id(Id, [Act | Rest]) ->
|
|
case envelope:get_field(id, Act) of
|
|
{ok, Id} -> true;
|
|
_ -> log_has_id(Id, Rest)
|
|
end.
|
|
|
|
%% stage_schema/2 — validates the activity's :object against the
|
|
%% schema registered for its :type. SchemaLookup is a caller-
|
|
%% supplied fun (Type) -> {ok, SchemaFn} | not_found; SchemaFn is
|
|
%% itself a fun (Object) -> bool. Returns:
|
|
%% ok when the schema accepts the object
|
|
%% {error, no_type} when the activity has no :type
|
|
%% {error, schema_mismatch} when SchemaFn returned false
|
|
%%
|
|
%% Open-world default: an unregistered Type returns ok so the
|
|
%% pipeline doesn't block activities the kernel hasn't yet learned
|
|
%% about. Tightening to strict-world happens later in milestone 2.
|
|
%%
|
|
%% Activities with no :object skip the schema check (some verbs
|
|
%% legitimately carry no object).
|
|
%%
|
|
%% The Erlang-fun shape is the substrate-friendly stand-in for the
|
|
%% SX-source :schema bodies stored in the genesis bundle. Once an
|
|
%% SX-source eval bridge exists, the same stage shape will dispatch
|
|
%% through it instead — no API change.
|
|
stage_schema(Activity, SchemaLookup) ->
|
|
case envelope:get_field(type, Activity) of
|
|
not_found -> {error, no_type};
|
|
{ok, Type} ->
|
|
case SchemaLookup(Type) of
|
|
not_found -> ok;
|
|
{ok, SchemaFn} ->
|
|
check_object_schema(Activity, SchemaFn)
|
|
end
|
|
end.
|
|
|
|
check_object_schema(Activity, SchemaFn) ->
|
|
case envelope:get_field(object, Activity) of
|
|
not_found -> ok;
|
|
{ok, Obj} ->
|
|
case SchemaFn(Obj) of
|
|
true -> ok;
|
|
false -> {error, schema_mismatch}
|
|
end
|
|
end.
|
|
|
|
stage_schema(SchemaLookup) ->
|
|
fun (Activity) -> stage_schema(Activity, SchemaLookup) end.
|