diff --git a/next/kernel/pipeline.erl b/next/kernel/pipeline.erl index ba3ace63..90ab913b 100644 --- a/next/kernel/pipeline.erl +++ b/next/kernel/pipeline.erl @@ -6,7 +6,8 @@ stage_envelope/1, stage_signature/1, stage_signature/2, stage_replay/1, stage_replay/2, - stage_schema/1, stage_schema/2]). + stage_schema/1, stage_schema/2, + apply_object_schema/2, stage_object_schema/1]). %% Validation pipeline per design §14. %% @@ -165,3 +166,138 @@ check_object_schema(Activity, SchemaFn) -> stage_schema(SchemaLookup) -> fun (Activity) -> stage_schema(Activity, SchemaLookup) end. + +%% ── host-type fed Step 4: object-schema validation stage ──────── +%% +%% apply_object_schema/2 — when an inbound activity's :object declares +%% a refinement type ({type, TypeName} on the object), resolve that +%% type's record and apply its refinement schema to the object's +%% :field_values. Sits between activity-type (stage_schema) validation +%% and the kernel append; rejects the activity on schema-fail. +%% +%% Resolution mirrors the design note: TypeName -> TypeCid via Cfg's +%% `type_index` ([{TypeName, TypeCid}, ...], the local Define-name +%% index), then TypeCid -> TypeRecord via peer_types:lookup_or_fetch/2 +%% (a local cache hit, or a wire fetch through the Cfg type_fetch_fn). +%% +%% Outcomes: +%% object has no {type, _} -> ok (no schema applies) +%% TypeName not in type_index -> ok (undeclared type; +%% open-world default) +%% record resolved, schema passes -> ok +%% record resolved, schema fails -> {error, {validation_failed, +%% object_schema}} +%% record unresolvable (cache miss + -> strict_object_schema: +%% fetch failure / no peer_types) true -> {error, ...} +%% false -> ok (skipped) +%% +%% Default strict_object_schema = false: a node only blocks on an +%% unresolvable type when it opts into airtight validation via Cfg +%% {strict_object_schema, true}. The non-strict skip is where a +%% `validation_skipped` log entry belongs (left to the caller's logger +%% so this stage keeps the ok | {error, _} contract run_stages wants). +%% +%% A TypeRecord's refinement schema is either a 1-arity Erlang +%% predicate over the field-values (the substrate stand-in, for +%% locally-defined types) or a data constraint {required, [Field, ...]} +%% (term_codec-safe, so a wire-fetched TypeRecord can still validate). + +apply_object_schema(Activity, Cfg) -> + case object_type_name(Activity) of + none -> ok; + {ok, TypeName} -> + case type_cid_for(TypeName, Cfg) of + none -> ok; + {ok, TypeCid} -> + case resolve_type_record(TypeCid, Cfg) of + {ok, TR} -> check_object_against(Activity, TR); + {error, _} -> on_unresolved_type(Cfg) + end + end + end. + +stage_object_schema(Cfg) -> + fun (Activity) -> apply_object_schema(Activity, Cfg) end. + +object_type_name(Activity) -> + case envelope:get_field(object, Activity) of + {ok, Obj} when is_list(Obj) -> + case envelope:get_field(type, Obj) of + {ok, T} -> {ok, T}; + _ -> none + end; + _ -> none + end. + +object_field_values(Activity) -> + case envelope:get_field(object, Activity) of + {ok, Obj} when is_list(Obj) -> + case envelope:get_field(field_values, Obj) of + {ok, FV} -> FV; + _ -> [] + end; + _ -> [] + end. + +type_cid_for(TypeName, Cfg) -> + case stage_field(type_index, Cfg) of + nil -> none; + Index -> + case find_keyed(TypeName, Index) of + {ok, Cid} -> {ok, Cid}; + _ -> none + end + end. + +resolve_type_record(TypeCid, Cfg) -> + case stage_field(peer_types, Cfg) of + nil -> {error, no_peer_types}; + _ -> + case erlang:whereis(peer_types) of + undefined -> {error, peer_types_down}; + _ -> peer_types:lookup_or_fetch(TypeCid, Cfg) + end + end. + +on_unresolved_type(Cfg) -> + case stage_field(strict_object_schema, Cfg) of + true -> {error, {validation_failed, object_schema}}; + _ -> ok + end. + +check_object_against(Activity, TR) -> + case stage_field(refinement_schema, TR) of + nil -> ok; + Schema -> apply_refinement(Schema, object_field_values(Activity)) + end. + +apply_refinement(Fn, FieldValues) when is_function(Fn, 1) -> + case Fn(FieldValues) of + true -> ok; + _ -> {error, {validation_failed, object_schema}} + end; +apply_refinement({required, Fields}, FieldValues) -> + case all_present(Fields, FieldValues) of + true -> ok; + false -> {error, {validation_failed, object_schema}} + end; +apply_refinement(_, _) -> ok. + +all_present([], _) -> true; +all_present([F | Rest], FV) -> + case has_key(F, FV) of + true -> all_present(Rest, FV); + false -> false + end. + +has_key(_, []) -> false; +has_key(K, [{K, _} | _]) -> true; +has_key(K, [_ | Rest]) -> has_key(K, Rest). + +stage_field(K, [{K, V} | _]) -> V; +stage_field(K, [_ | Rest]) -> stage_field(K, Rest); +stage_field(_, []) -> nil. + +find_keyed(_, []) -> {error, not_found}; +find_keyed(K, [{K, V} | _]) -> {ok, V}; +find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest). diff --git a/next/tests/object_schema.sh b/next/tests/object_schema.sh new file mode 100755 index 00000000..07a5f0fa --- /dev/null +++ b/next/tests/object_schema.sh @@ -0,0 +1,154 @@ +#!/usr/bin/env bash +# next/tests/object_schema.sh — host-type federation Phase 4. +# +# pipeline:apply_object_schema/2 validates an inbound activity's inner +# object against its declared refinement type. The type is resolved +# TypeName -> TypeCid (Cfg type_index) -> TypeRecord +# (peer_types:lookup_or_fetch, a local hit or a wire fetch), then the +# record's refinement schema is applied to the object's :field_values. +# Default strict_object_schema = false: an unresolvable type is let +# through; opt-in strict rejects. +# +# Refinement schemas are either a 1-arity Erlang predicate (the +# substrate stand-in, locally stored) or a term_codec-safe +# {required, [Field,...]} constraint (so a wire-fetched record still +# validates). Both are exercised here. + +set -uo pipefail +cd "$(git rev-parse --show-toplevel)" + +SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}" +if [ ! -x "$SX_SERVER" ]; then + SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe" +fi +if [ ! -x "$SX_SERVER" ]; then + echo "ERROR: sx_server.exe not found." >&2 + exit 1 +fi + +VERBOSE="${1:-}" +PASS=0; FAIL=0; ERRORS="" +TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT + +# Cid is the Post type's CID; TRdata carries a data-form refinement +# (object must have a `title` field), TRfun the Erlang-predicate form. +# ActValid's object has :title, ActFail's doesn't, ActNoType's object +# declares no type, ActUnknown's type isn't in the index. PostName is +# <<"Post">>, title "Hi" = <<72,105>>. Index maps name -> Cid. +SETUP='Cid = <<98,97,102,121,80>>, PostName = <<80,111,115,116>>, TRdata = [{name, PostName}, {refinement_schema, {required, [title]}}], TRfun = [{name, PostName}, {refinement_schema, fun(FV) -> case FV of [{title, _} | _] -> true; _ -> false end end}], ObjValid = [{type, PostName}, {field_values, [{title, <<72,105>>}, {body, <<104,105>>}]}], ObjFail = [{type, PostName}, {field_values, [{body, <<104,105>>}]}], ActValid = [{type, create}, {actor, alice}, {object, ObjValid}], ActFail = [{type, create}, {actor, alice}, {object, ObjFail}], ActNoType = [{type, create}, {actor, alice}, {object, [{field_values, [{title, <<72,105>>}]}]}], ActUnknown = [{type, create}, {actor, alice}, {object, [{type, <<82,101,112,108,121>>}, {field_values, [{title, <<72,105>>}]}]}], Index = [{PostName, Cid}], FAIL = {error, {validation_failed, object_schema}},' + +cat > "$TMPFILE" < accepted +(epoch 10) +(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRdata), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActValid, Cfg) =:= ok\") :name)") +;; local match + refinement-failing object -> rejected +(epoch 11) +(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRdata), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActFail, Cfg) =:= FAIL\") :name)") + +;; type not cached, fetch succeeds -> validates against fetched record +(epoch 12) +(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), Cfg = [{peer_types, peer_types}, {type_index, Index}, {type_fetch_fn, fun(_, _) -> {ok, term_codec:encode(TRdata)} end}], pipeline:apply_object_schema(ActValid, Cfg) =:= ok\") :name)") +;; fetched record, failing object -> rejected +(epoch 13) +(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), Cfg = [{peer_types, peer_types}, {type_index, Index}, {type_fetch_fn, fun(_, _) -> {ok, term_codec:encode(TRdata)} end}], pipeline:apply_object_schema(ActFail, Cfg) =:= FAIL\") :name)") + +;; unknown type, fetch fails, strict not set -> accepted (skipped) +(epoch 14) +(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), Cfg = [{peer_types, peer_types}, {type_index, Index}, {type_fetch_fn, fun(_, _) -> {error, http_404} end}], pipeline:apply_object_schema(ActValid, Cfg) =:= ok\") :name)") +;; unknown type, fetch fails, strict set -> rejected +(epoch 15) +(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), Cfg = [{peer_types, peer_types}, {type_index, Index}, {type_fetch_fn, fun(_, _) -> {error, http_404} end}, {strict_object_schema, true}], pipeline:apply_object_schema(ActValid, Cfg) =:= FAIL\") :name)") +;; no peer_types cfg at all, non-strict -> accepted (skipped) +(epoch 16) +(eval "(get (erlang-eval-ast \"${SETUP} Cfg = [{type_index, Index}], pipeline:apply_object_schema(ActValid, Cfg) =:= ok\") :name)") +;; no peer_types cfg, strict -> rejected +(epoch 17) +(eval "(get (erlang-eval-ast \"${SETUP} Cfg = [{type_index, Index}, {strict_object_schema, true}], pipeline:apply_object_schema(ActValid, Cfg) =:= FAIL\") :name)") + +;; object without inner {type, _} -> skipped (accepted) +(epoch 18) +(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRdata), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActNoType, Cfg) =:= ok\") :name)") +;; object type not in the local index -> skipped (open-world) +(epoch 19) +(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRdata), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActUnknown, Cfg) =:= ok\") :name)") + +;; Erlang-predicate refinement schema: valid -> ok, failing -> reject +(epoch 20) +(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRfun), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActValid, Cfg) =:= ok\") :name)") +(epoch 21) +(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRfun), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActFail, Cfg) =:= FAIL\") :name)") + +;; type known but record carries no refinement schema -> accepted +(epoch 22) +(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, [{name, PostName}]), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActFail, Cfg) =:= ok\") :name)") + +;; stage_object_schema/1 yields a 1-arity stage usable by run_stages +(epoch 23) +(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRdata), Cfg = [{peer_types, peer_types}, {type_index, Index}], Stage = pipeline:stage_object_schema(Cfg), is_function(Stage, 1) andalso pipeline:run_stages(ActValid, [Stage]) =:= ok andalso pipeline:run_stages(ActFail, [Stage]) =:= FAIL\") :name)") +EPOCHS + +OUTPUT=$(timeout 300 "$SX_SERVER" < "$TMPFILE" 2>/dev/null) + +check() { + local epoch="$1" desc="$2" expected="$3" + local actual + actual=$(echo "$OUTPUT" | awk -v e="$epoch" ' + $0 ~ "^\\(ok-len " e " " { getline; print; exit } + $0 ~ "^\\(ok " e " " { print; exit } + $0 ~ "^\\(error " e " " { print; exit } + ') + [ -z "$actual" ] && actual="" + if echo "$actual" | grep -qF -- "$expected"; then + PASS=$((PASS+1)) + [ "$VERBOSE" = "-v" ] && echo " ok $desc" + else + FAIL=$((FAIL+1)) + ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual +" + fi +} + +check 6 "pipeline module loaded" "pipeline" +check 10 "local match + valid -> accepted" "true" +check 11 "local match + failing -> rejected" "true" +check 12 "fetch ok -> validates fetched record" "true" +check 13 "fetched record + failing -> rejected" "true" +check 14 "fetch fail, non-strict -> accepted" "true" +check 15 "fetch fail, strict -> rejected" "true" +check 16 "no peer_types, non-strict -> accepted" "true" +check 17 "no peer_types, strict -> rejected" "true" +check 18 "object without type -> skipped" "true" +check 19 "type not in index -> skipped" "true" +check 20 "fun schema valid -> accepted" "true" +check 21 "fun schema failing -> rejected" "true" +check 22 "no refinement schema -> accepted" "true" +check 23 "stage_object_schema composes" "true" + +TOTAL=$((PASS+FAIL)) +if [ $FAIL -eq 0 ]; then + echo "ok $PASS/$TOTAL next/tests/object_schema.sh passed" +else + echo "FAIL $PASS/$TOTAL passed, $FAIL failed:" + echo "$ERRORS" +fi +[ $FAIL -eq 0 ] diff --git a/plans/fed-sx-host-types.md b/plans/fed-sx-host-types.md index 69ad2806..3c1a9505 100644 --- a/plans/fed-sx-host-types.md +++ b/plans/fed-sx-host-types.md @@ -68,7 +68,7 @@ Cfg-supplied `type_fetch_fn :: fun ((TypeCid, Cfg) -> {ok, Bytes} | record. No fn → `{error, no_fetch_fn}`; fetch error or bad bytes do not poison the cache. Test: `next/tests/peer_types.sh`. -### Step 3 — `/types/` route + `discovery_type_fetch.erl` — TODO +### Step 3 — `/types/` route + `discovery_type_fetch.erl` — DONE `http_server.erl` serves `GET /types/` with `Accept: application/vnd.fed-sx.type-doc`: the cached TypeRecord @@ -76,15 +76,21 @@ poison the cache. Test: `next/tests/peer_types.sh`. holds the live-HTTP closure that `peer_types:lookup_or_fetch` calls. Tests: `next/tests/peer_types_route.sh`, `next/tests/discovery_type_fetch.sh`. -### Step 4 — object-schema validation stage in `pipeline.erl` — TODO +### Step 4 — object-schema validation stage in `pipeline.erl` — DONE -A new `apply_object_schema/2` stage between activity-type validation -and the kernel append. When an inbound object carries `{type, TypeName}`, -resolve the TypeRecord (local Define-name index → CID → -`peer_types:lookup_or_fetch`) and apply its refinement schema to the -object's field-values. Default `strict_object_schema = false`: an -unresolvable type is let through with a `validation_skipped` log; -opt-in strict mode rejects. Test: `next/tests/object_schema.sh`. +`pipeline:apply_object_schema/2` (+ `stage_object_schema/1` factory) +sits between activity-type validation and the kernel append. When an +inbound object carries `{type, TypeName}`, resolve the TypeRecord +(Cfg `type_index`: TypeName → TypeCid; then +`peer_types:lookup_or_fetch/2`) and apply its refinement schema to the +object's `:field_values`. The schema is either a 1-arity Erlang +predicate (the substrate stand-in, for locally-defined types) or a +term_codec-safe `{required, [Field, ...]}` data constraint (so a +wire-fetched record validates too). Default `strict_object_schema = +false`: an unresolvable type is let through (the non-strict skip is +where a `validation_skipped` log belongs); opt-in strict rejects. +Objects with no declared type, and type names absent from the local +index, are skipped (open-world). Test: `next/tests/object_schema.sh`. ## Out of scope (deliberately)