-module(pipeline). -export([run_stages/2, validate_inbound/1, validate_outbound/1, inbound_stages/0, outbound_stages/0, stage_envelope/1, stage_signature/1, stage_signature/2, stage_replay/1, stage_replay/2, stage_schema/1, stage_schema/2]). %% Validation pipeline per design §14. %% %% A stage is a 1-arity fun `(Activity) -> ok | {error, Reason}`. %% The driver folds the activity through the stage list, halting %% on the first error. The pure-functional driver itself takes a %% stage list directly so tests can inject ad-hoc stage sequences %% without depending on the bundled inbound/outbound lists. %% %% Inbound pipeline (full set per design §14): envelope, signature, %% replay, audience, activity_schema, object_schema, content_validators, %% capabilities, trust. Outbound is a subset (no replay, no trust; %% auth handled at the HTTP layer). %% %% This sub-deliverable (6a) wires only the driver and the empty %% stage lists. Concrete stages land in 6b-6c. run_stages(_Activity, []) -> ok; run_stages(Activity, [Stage | Rest]) -> Result = Stage(Activity), case Result of ok -> run_stages(Activity, Rest); {error, _} -> Result end. validate_inbound(Activity) -> run_stages(Activity, inbound_stages()). validate_outbound(Activity) -> run_stages(Activity, outbound_stages()). inbound_stages() -> [fun (A) -> stage_envelope(A) end]. outbound_stages() -> [fun (A) -> stage_envelope(A) end]. %% ── Concrete stages ───────────────────────────────────────────── %% stage_envelope/1 — wrap envelope:validate_shape/1. The pipeline %% driver expects ok | {error, R}; validate_shape returns exactly %% that, so delegation is direct. stage_envelope(Activity) -> envelope:validate_shape(Activity). %% stage_signature/2 — direct (Activity, ActorState) check. Wraps %% envelope:verify_signature/2 from Step 2c. Useful for tests and %% for callers that already have ActorState in scope. stage_signature(Activity, ActorState) -> envelope:verify_signature(Activity, ActorState). %% stage_signature/1 — factory: takes the ActorState and returns a %% 1-arity stage fun the pipeline driver can fold. This is how %% signature checking gets composed into a stage list at runtime %% (the static `inbound_stages/0` list omits it precisely because %% ActorState isn't available at static-list build time). stage_signature(ActorState) -> fun (Activity) -> envelope:verify_signature(Activity, ActorState) end. %% stage_replay/2 — checks the in-memory log for an existing %% activity with the same :id. Returns ok if the activity is new, %% `{error, replay}` if the log already carries it, `{error, no_id}` %% if the activity has no :id field. The check is linear scan of %% log entries; the projection scheduler (Step 7) will eventually %% maintain a CID index that turns this into O(1). stage_replay(Activity, LogState) -> case envelope:get_field(id, Activity) of not_found -> {error, no_id}; {ok, Id} -> case log_has_id(Id, log:entries(LogState)) of true -> {error, replay}; false -> ok end end. stage_replay(LogState) -> fun (Activity) -> stage_replay(Activity, LogState) end. log_has_id(_, []) -> false; log_has_id(Id, [Act | Rest]) -> case envelope:get_field(id, Act) of {ok, Id} -> true; _ -> log_has_id(Id, Rest) end. %% stage_schema/2 — validates the activity's :object against the %% schema registered for its :type. SchemaLookup is a caller- %% supplied fun (Type) -> {ok, SchemaFn} | not_found; SchemaFn is %% itself a fun (Object) -> bool. Returns: %% ok when the schema accepts the object %% {error, no_type} when the activity has no :type %% {error, schema_mismatch} when SchemaFn returned false %% %% Open-world default: an unregistered Type returns ok so the %% pipeline doesn't block activities the kernel hasn't yet learned %% about. Tightening to strict-world happens later in milestone 2. %% %% Activities with no :object skip the schema check (some verbs %% legitimately carry no object). %% %% The Erlang-fun shape is the substrate-friendly stand-in for the %% SX-source :schema bodies stored in the genesis bundle. Once an %% SX-source eval bridge exists, the same stage shape will dispatch %% through it instead — no API change. stage_schema(Activity, SchemaLookup) -> case envelope:get_field(type, Activity) of not_found -> {error, no_type}; {ok, Type} -> case SchemaLookup(Type) of not_found -> ok; {ok, SchemaFn} -> check_object_schema(Activity, SchemaFn) end end. check_object_schema(Activity, SchemaFn) -> case envelope:get_field(object, Activity) of not_found -> ok; {ok, Obj} -> case SchemaFn(Obj) of true -> ok; false -> {error, schema_mismatch} end end. stage_schema(SchemaLookup) -> fun (Activity) -> stage_schema(Activity, SchemaLookup) end.