Files
rose-ash/next/kernel/envelope.erl
giles 53b4a4c1fd
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 31s
fed-sx-m1: Step 2c — envelope:verify_signature/2 (time-aware key lookup + HMAC stand-in) + 11 tests
2026-05-26 21:00:39 +00:00

178 lines
6.3 KiB
Erlang

-module(envelope).
-export([validate_shape/1, get_field/2, canonical_bytes/1, verify_signature/2]).
%% Activity envelope per design §3.1.
%%
%% Erlang maps (#{...}) are not supported by this port, so envelopes
%% are represented as property lists of {atom_key, value} pairs. This
%% port's binary syntax also can't carry string literals; values that
%% would naturally be binaries in real Erlang are kept as atoms or
%% integer-segment binaries in the test corpus.
%%
%% Required fields: id, type, actor, published, signature.
%% The signature value is itself a property list with key_id,
%% algorithm, value.
%%
%% validate_shape/1 returns ok | {error, Reason}. Reasons:
%% not_a_proplist
%% {missing_field, FieldName}
%% {bad_signature, BadSigReason}
%%
%% get_field/2 returns {ok, Value} | not_found.
validate_shape(Env) when is_list(Env) ->
case check_required([id, type, actor, published, signature], Env) of
ok -> validate_signature_shape(Env);
Err -> Err
end;
validate_shape(_) ->
{error, not_a_proplist}.
get_field(_, []) -> not_found;
get_field(K, [{K, V} | _]) -> {ok, V};
get_field(K, [_ | Rest]) -> get_field(K, Rest).
check_required([], _) -> ok;
check_required([F | Rest], Env) ->
case get_field(F, Env) of
{ok, _} -> check_required(Rest, Env);
not_found -> {error, {missing_field, F}}
end.
validate_signature_shape(Env) ->
{ok, Sig} = get_field(signature, Env),
case is_list(Sig) of
true ->
case check_required([key_id, algorithm, value], Sig) of
ok -> ok;
{error, {missing_field, F}} ->
{error, {bad_signature, {missing_field, F}}}
end;
false ->
{error, {bad_signature, not_a_proplist}}
end.
%% canonical_bytes/1 — the byte string the signature covers.
%%
%% Real fed-sx will use dag-cbor over a JSON-LD-canonicalised form
%% (design §3.2). For milestone 1 we stand in for that with the host
%% BIF `cid:to_string/1`, which produces a CIDv1 over the deterministic
%% textual form of the term. Two prior steps make this work:
%% 1. The signature pair is stripped (sig covers everything except
%% itself).
%% 2. The top-level property list is sorted by key so field order in
%% the source envelope is not load-bearing.
%%
%% The result is an Erlang binary suitable as the sig-cover input.
canonical_bytes(Env) when is_list(Env) ->
Stripped = strip_signature(Env),
Sorted = sort_pairs(Stripped),
cid:to_string(Sorted).
strip_signature([]) -> [];
strip_signature([{signature, _} | Rest]) -> strip_signature(Rest);
strip_signature([P | Rest]) -> [P | strip_signature(Rest)].
sort_pairs([]) -> [];
sort_pairs([H | T]) -> insert_pair(H, sort_pairs(T)).
insert_pair(P, []) -> [P];
insert_pair({K1, V1}, [{K2, V2} | Rest]) ->
case K1 < K2 of
true -> [{K1, V1}, {K2, V2} | Rest];
false -> [{K2, V2} | insert_pair({K1, V1}, Rest)]
end.
%% verify_signature/2 — time-aware sig verification per design §9.6.
%%
%% Activity carries a `signature` proplist with `key_id`, `algorithm`,
%% `value`. ActorState carries `public_keys` — a list of key proplists
%% with `id`, `created`, optionally `superseded_at`, and `value` (the
%% key material).
%%
%% A key is active at time T iff `created =< T` AND
%% (no `superseded_at` OR T < `superseded_at`). Verification picks the
%% first matching active key whose `id == signature.key_id` at the
%% activity's `published` timestamp, then recomputes the MAC
%% `crypto:hash(sha256, <<KeyMaterial/binary, CanonicalBytes/binary>>)`
%% and compares it to `signature.value`.
%%
%% Returns ok | {error, Reason}. Reasons:
%% no_signature | no_key_id | no_published | no_keys |
%% no_active_key | bad_signature
%%
%% Real RSA-SHA256 / Ed25519 verification is deferred to milestone 2:
%% Phase 8 only ships `crypto:hash/2`, so we stand in with an HMAC-shaped
%% MAC that exercises the same key-lookup and canonical-bytes pipeline.
verify_signature(Activity, ActorState) ->
case get_field(signature, Activity) of
not_found -> {error, no_signature};
{ok, Sig} ->
case get_field(key_id, Sig) of
not_found -> {error, no_key_id};
{ok, KeyId} ->
case get_field(published, Activity) of
not_found -> {error, no_published};
{ok, Published} ->
verify_with_keys(Activity, Sig, KeyId,
Published, ActorState)
end
end
end.
verify_with_keys(Activity, Sig, KeyId, Published, ActorState) ->
case get_field(public_keys, ActorState) of
not_found -> {error, no_keys};
{ok, Keys} ->
case find_active_key(KeyId, Published, Keys) of
not_found -> {error, no_active_key};
{ok, Key} -> verify_mac(Activity, Sig, Key)
end
end.
find_active_key(_, _, []) -> not_found;
find_active_key(KeyId, Now, [Key | Rest]) ->
case is_matching_active_key(Key, KeyId, Now) of
true -> {ok, Key};
false -> find_active_key(KeyId, Now, Rest)
end.
is_matching_active_key(Key, WantId, Now) ->
case get_field(id, Key) of
{ok, WantId} -> is_active_at(Key, Now);
_ -> false
end.
is_active_at(Key, Now) ->
case get_field(created, Key) of
not_found -> false;
{ok, Created} ->
case Now >= Created of
false -> false;
true ->
case get_field(superseded_at, Key) of
not_found -> true;
{ok, SupAt} -> Now < SupAt
end
end
end.
verify_mac(Activity, Sig, Key) ->
case get_field(value, Sig) of
not_found -> {error, bad_signature};
{ok, SigValue} ->
case get_field(value, Key) of
not_found -> {error, bad_signature};
{ok, KeyMat} ->
Bytes = canonical_bytes(Activity),
Computed = crypto:hash(sha256,
<<KeyMat/binary, Bytes/binary>>),
case SigValue =:= Computed of
true -> ok;
false -> {error, bad_signature}
end
end
end.