Files
rose-ash/next/kernel/outbox.erl
giles 1ea47681b2
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 28s
fed-sx-m1: Step 7c — outbox:publish broadcasts to projection processes + 14 tests
2026-05-28 06:57:36 +00:00

117 lines
4.6 KiB
Erlang

-module(outbox).
-export([construct/4, sign/2, cid_of/1, publish/2]).
%% Outbox envelope construction + signing per design §3.1.
%%
%% construct/4 builds an unsigned activity envelope from caller-supplied
%% (Type, ActorId, Published, Object). The envelope's `:id` field is
%% derived from the host `cid:to_string` BIF over a skeleton tag, so
%% recipients can address the activity by its content hash. The
%% returned property list is the canonical key-sorted form that
%% `envelope:canonical_bytes/1` operates on.
%%
%% sign/2 takes the unsigned envelope plus a KeySpec proplist that
%% mirrors a `public_keys` entry: `[{key_id, _}, {algorithm, _},
%% {value, KeyMaterial}]`. It computes the v1 HMAC stand-in
%% `crypto:hash(sha256, <<KeyMaterial/binary, CanonicalBytes/binary>>)`
%% — the same scheme `envelope:verify_signature/2` checks — and
%% appends a `:signature` pair.
%%
%% Real Ed25519 / RSA signing arrives in milestone 2 once
%% `crypto:sign_ed25519/2` BIFs land; the API shape doesn't change.
%% construct/4 — assemble an unsigned activity envelope.
%%
%% Type and ActorId are atoms. Published is an integer timestamp
%% handed in by the caller (this port has no clock BIF, so the HTTP
%% layer / outbox:publish caller injects it). Object is any term,
%% a property list of inner fields included.
%%
%% The `id` value is `cid:to_string/1` applied to the id-less field
%% list tagged `activity_envelope`. The returned property list has
%% its keys in sorted order: actor, id, object, published, type.
construct(Type, ActorId, Published, Object) ->
    ActorPair = {actor, ActorId},
    %% Fields that follow `id` in key order; shared by the hashed
    %% skeleton and the final envelope.
    Tail = [{object, Object}, {published, Published}, {type, Type}],
    Cid = cid:to_string({activity_envelope, [ActorPair | Tail]}),
    [ActorPair, {id, Cid} | Tail].
%% sign/2 — return Envelope with a trailing `signature` pair.
%%
%% KeySpec must supply key_id, algorithm and value (the key
%% material); each lookup is matched against {ok, _}, so a missing
%% field crashes here rather than yielding a partial signature.
%% The signature value is the sha256 digest of the key material
%% immediately followed by the envelope's canonical bytes.
sign(Envelope, KeySpec) ->
    {ok, KeyId} = envelope:get_field(key_id, KeySpec),
    {ok, Algorithm} = envelope:get_field(algorithm, KeySpec),
    {ok, KeyMaterial} = envelope:get_field(value, KeySpec),
    Canonical = envelope:canonical_bytes(Envelope),
    Digest = crypto:hash(sha256, <<KeyMaterial/binary, Canonical/binary>>),
    Envelope ++ [{signature, [{algorithm, Algorithm},
                              {key_id, KeyId},
                              {value, Digest}]}].
%% cid_of/1 — return the value stored under `id` in an envelope.
%% Saves callers from carrying the CID alongside the envelope when
%% they need both. The {ok, _} match is deliberate: an envelope
%% without an id crashes instead of returning a placeholder.
cid_of(Envelope) ->
    {ok, Cid} = envelope:get_field(id, Envelope),
    Cid.
%% publish/2 — the outbound activity pipeline orchestrator.
%%
%% Request shape: [{type, T}, {object, O}]
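%% Context shape: [{actor_id, A}, {published, P}, {key_spec, KS},
%% {actor_state, AS}, {log, L}, {projections, Ps}]
%% `projections` is optional: a list of projection names to
%% broadcast the signed activity to (absent or [] means none).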
%%
%% Returns:
%% {ok, [{cid, Cid}, {activity, Signed}], NewLog} — happy path
%% {error, Reason, LogState} — validation halted
%%
%% Stages run in order: envelope shape, signature, replay. The
%% replay check uses the log state pre-append, so if the caller
%% publishes the same Request twice with the same Published
%% timestamp the second call halts with {error, replay, _}.
%%
%% Projection dispatch: once the activity has been appended to the
%% log, `Signed` is broadcast (fire-and-forget) to every projection
%% named under the `projections` Context key. A rejected activity
%% is neither appended nor broadcast.
publish(Request, Context) ->
    %% Request carries what is being published; Context carries who
    %% publishes it, when, with which key, and against which log.
    ActivityType = envelope_field(type, Request),
    Payload = envelope_field(object, Request),
    Actor = envelope_field(actor_id, Context),
    Timestamp = envelope_field(published, Context),
    SigningKey = envelope_field(key_spec, Context),
    Log0 = envelope_field(log, Context),
    %% Every stage inspects the signed envelope, so sign up front.
    Signed = sign(construct(ActivityType, Actor, Timestamp, Payload),
                  SigningKey),
    Checks = [fun (Activity) -> pipeline:stage_envelope(Activity) end,
              pipeline:stage_signature(envelope_field(actor_state, Context)),
              %% Replay is judged against the log as it was before
              %% this activity is appended.
              pipeline:stage_replay(Log0)],
    case pipeline:run_stages(Signed, Checks) of
        {error, Reason} ->
            %% Rejected: hand back the untouched log.
            {error, Reason, Log0};
        ok ->
            {ok, Log1, _Seq} = log:append(Log0, Signed),
            %% Projections only ever see activities that were logged.
            broadcast(Signed, envelope_field(projections, Context)),
            {ok, [{cid, cid_of(Signed)}, {activity, Signed}], Log1}
    end.
%% broadcast/2 — hand Activity to each named projection, in list
%% order, through projection:async_fold/2, then return ok.
%% The second argument may be nil (key absent from the Context) or
%% [] — both mean "nothing to notify", so publishing never depends
%% on projections existing. Activity is the signed envelope, i.e.
%% the same value that was appended to the log.
broadcast(Activity, [Projection | Others]) ->
    projection:async_fold(Projection, Activity),
    broadcast(Activity, Others);
broadcast(_Activity, []) ->
    ok;
broadcast(_Activity, nil) ->
    ok.
%% envelope_field/2 — lenient lookup used by publish/2: the value
%% stored under Key in Fields, or the atom nil when the key is
%% absent. Any other reply from envelope:get_field/2 is unexpected
%% and crashes with case_clause.
envelope_field(Key, Fields) ->
    case envelope:get_field(Key, Fields) of
        not_found -> nil;
        {ok, Value} -> Value
    end.