Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 12m51s
outbox:publish/2 now computes the audience-resolved delivery set
after sign + log and stashes it in the Result proplist as
{delivery_set, [ActorId, ...]}. Step 8's delivery-queue worker
reads it off the publish result.
New compute_delivery_set/3(Request, Signed, Context):
- Pulls :follower_graph from Context (defaults to empty graph)
- Calls recipients_envelope/2 to synthesise a minimal envelope
from Request's :to / :cc + Signed's :actor
- Routes through delivery:delivery_set/3 unchanged
The envelope construct/4 surface doesn't carry :to / :cc (only
type / actor / published / object), and changing that ripples
through every envelope shape test. recipients_envelope/2 keeps
the compute boundary local to outbox.
4 new cases in outbox_publish.sh (17/17 total):
- Result :delivery_set empty default
- explicit :to -> [bob] in set
- followers symbol expands via Context :follower_graph
- self-suppression (alice in :to drops to [bob])
Module loads rebumped: follower_graph + delivery added as
dependencies; outbox shifts from epoch 5 to epoch 7. Internal
sx_server timeout bumped 240s -> 480s to fit the larger module
set.
Step 7 fully closed (7a delivery module + 7b public expansion
+ 7c outbox integration). Federation now has the end-to-end
audience resolution: an outbound activity's :to / :cc plus any
follower_graph expansion becomes a deduped recipient list ready
for Step 8 to dispatch.
Conformance running + adjacent gate running.
157 lines
6.2 KiB
Erlang
157 lines
6.2 KiB
Erlang
-module(outbox).
|
|
-export([construct/4, sign/2, cid_of/1, publish/2]).
|
|
|
|
%% Outbox envelope construction + signing per design §3.1.
|
|
%%
|
|
%% construct/4 builds an unsigned activity envelope from caller-supplied
|
|
%% (Type, ActorId, Published, Object). The envelope's `:id` field is
|
|
%% derived from the host `cid:to_string` BIF over a skeleton tag, so
|
|
%% recipients can address the activity by its content hash. The
|
|
%% returned property list is the canonical key-sorted form that
|
|
%% `envelope:canonical_bytes/1` operates on.
|
|
%%
|
|
%% sign/2 takes the unsigned envelope plus a KeySpec proplist that
|
|
%% mirrors a `public_keys` entry: `[{key_id, _}, {algorithm, _},
|
|
%% {value, KeyMaterial}]`. It computes the v1 HMAC stand-in
|
|
%% `crypto:hash(sha256, <<KeyMaterial/binary, CanonicalBytes/binary>>)`
|
|
%% — the same scheme `envelope:verify_signature/2` checks — and
|
|
%% appends a `:signature` pair.
|
|
%%
|
|
%% Real Ed25519 / RSA signing arrives in milestone 2 once
|
|
%% `crypto:sign_ed25519/2` BIFs land; the API shape doesn't change.
|
|
|
|
%% construct/4 — Type and ActorId are atoms; Published is an
|
|
%% integer timestamp the caller supplies (no clock BIF in this
|
|
%% port; the HTTP layer / outbox:publish caller injects it).
|
|
%% Object can be any term, including a property list of inner
|
|
%% fields.
|
|
construct(Type, ActorId, Published, Object) ->
    %% Bind each pair once; the same four pairs feed both the CID
    %% skeleton and the returned envelope.
    ActorPair = {actor, ActorId},
    ObjectPair = {object, Object},
    PublishedPair = {published, Published},
    TypePair = {type, Type},
    %% The id is computed over the id-less skeleton wrapped in the
    %% `activity_envelope` tag.
    IdSource = {activity_envelope,
                [ActorPair, ObjectPair, PublishedPair, TypePair]},
    IdPair = {id, cid:to_string(IdSource)},
    %% Returned in key order: actor, id, object, published, type.
    [ActorPair, IdPair, ObjectPair, PublishedPair, TypePair].
|
|
|
|
%% sign/2 — KeySpec carries key_id, algorithm, value (key material).
|
|
sign(Envelope, KeySpec) ->
    %% Assertive matches: any lookup that does not return {ok, _}
    %% fails here with badmatch rather than producing a bad signature.
    {ok, KeyId} = envelope:get_field(key_id, KeySpec),
    {ok, Algorithm} = envelope:get_field(algorithm, KeySpec),
    {ok, KeyMaterial} = envelope:get_field(value, KeySpec),
    Canonical = envelope:canonical_bytes(Envelope),
    %% v1 stand-in: SHA-256 over key material followed by the
    %% canonical bytes of the unsigned envelope.
    Digest = crypto:hash(sha256, <<KeyMaterial/binary, Canonical/binary>>),
    %% The signature pair is appended after the existing fields.
    Envelope ++ [{signature, [{algorithm, Algorithm},
                              {key_id, KeyId},
                              {value, Digest}]}].
|
|
|
|
%% cid_of/1 — extract the :id field from a constructed envelope.
|
|
%% Convenience for callers that don't want to thread the CID
|
|
%% separately when both the envelope and its ID matter.
|
|
cid_of(Envelope) ->
    %% Assertive match: an envelope whose `id` lookup does not yield
    %% {ok, _} fails with badmatch.
    {ok, Cid} = envelope:get_field(id, Envelope),
    Cid.
|
|
|
|
%% publish/2 — the outbound activity pipeline orchestrator.
|
|
%%
|
|
%% Request shape: [{type, T}, {object, O}] plus optional {to, To} / {cc, Cc}
|
|
%% Context shape: [{actor_id, A}, {published, P}, {key_spec, KS},
%%                 {actor_state, AS}, {log, L}]
%%                 plus optional {projections, [Name, ...]} and
%%                 {follower_graph, Graph}
|
|
%%
|
|
%% Returns:
|
|
%%   {ok, [{cid, Cid}, {activity, Signed}, {delivery_set, DeliverySet}], NewLog} — happy path
|
|
%% {error, Reason, LogState} — validation halted
|
|
%%
|
|
%% Stages run in order: envelope shape, signature, replay. The
|
|
%% replay check uses the log state pre-append, so if the caller
|
|
%% publishes the same Request twice with the same Published
|
|
%% timestamp the second call halts with {error, replay, _}.
|
|
%%
|
|
%% After a successful append, the signed envelope is handed to
%% broadcast/2 with Context's optional `:projections` list, and
%% compute_delivery_set/3 resolves the recipient list that is
%% returned under `:delivery_set`.
|
|
|
|
publish(Request, Context) ->
    %% Gather the inputs; a missing field comes back as `nil`.
    ActivityType = envelope_field(type, Request),
    Payload = envelope_field(object, Request),
    Actor = envelope_field(actor_id, Context),
    Timestamp = envelope_field(published, Context),
    SigningKey = envelope_field(key_spec, Context),
    ActorState = envelope_field(actor_state, Context),
    Log0 = envelope_field(log, Context),
    %% Build and sign first: the validation stages run against the
    %% signed envelope.
    Signed = sign(construct(ActivityType, Actor, Timestamp, Payload),
                  SigningKey),
    Validators = [fun (Activity) -> pipeline:stage_envelope(Activity) end,
                  pipeline:stage_signature(ActorState),
                  %% Replay is checked against the pre-append log state.
                  pipeline:stage_replay(Log0)],
    case pipeline:run_stages(Signed, Validators) of
        ok ->
            {ok, Log1, _Seq} = log:append(Log0, Signed),
            broadcast(Signed, envelope_field(projections, Context)),
            Recipients = compute_delivery_set(Request, Signed, Context),
            {ok,
             [{cid, cid_of(Signed)},
              {activity, Signed},
              {delivery_set, Recipients}],
             Log1};
        {error, Reason} ->
            %% Validation halted: hand back the log state unchanged.
            {error, Reason, Log0}
    end.
|
|
|
|
%% compute_delivery_set/3 — Step 7c. Pulls the audience-resolved
|
|
%% recipient list off the Request's `:to` / `:cc` fields (the
|
|
%% envelope itself doesn't carry them — construct/4 only takes
|
|
%% type / actor / published / object). Context's optional
|
|
%% `:follower_graph` field carries a follower_graph state for
|
|
%% `public` / `followers` audience expansion; absent -> empty graph,
|
|
%% so explicit `:to` / `:cc` lists still resolve. Synthesises a
|
|
%% recipient-shaped envelope from Request + Signed so the existing
|
|
%% delivery:delivery_set/3 (which reads `:actor`, `:to`, `:cc`) can
|
|
%% process it as-is.
|
|
%%
|
|
%% Step 8's delivery-queue worker reads `{delivery_set, [ActorId, ...]}`
|
|
%% off the publish result and routes one HTTP POST per entry.
|
|
|
|
compute_delivery_set(Request, Signed, Context) ->
    FollowerGraph = graph_or_empty(envelope_field(follower_graph, Context)),
    AudienceEnvelope = recipients_envelope(Request, Signed),
    %% NOTE(review): the meaning of the empty-list second argument is
    %% defined by delivery:delivery_set/3 — confirm against that module.
    delivery:delivery_set(AudienceEnvelope, [], FollowerGraph).

%% graph_or_empty/1 — substitute a fresh follower graph when the
%% Context did not supply one.
graph_or_empty(nil) -> follower_graph:new();
graph_or_empty(Graph) -> Graph.
|
|
|
|
recipients_envelope(Request, Signed) ->
    %% The actor comes from the signed envelope; the audience fields
    %% come from the original Request. Absent fields are omitted.
    ActorPart = optional_pair(actor, Signed),
    ToPart = optional_pair(to, Request),
    CcPart = optional_pair(cc, Request),
    ActorPart ++ ToPart ++ CcPart.

%% optional_pair/2 — a one-element proplist when Key is present in
%% Source, otherwise the empty list.
optional_pair(Key, Source) ->
    case envelope:get_field(Key, Source) of
        {ok, Value} -> [{Key, Value}];
        _ -> []
    end.
|
|
|
|
%% broadcast/2 — fire-and-forget cast to each named projection.
|
|
%% Missing/nil/empty list is a no-op; the publish API does not
|
|
%% require projections to exist. Activity is the post-sign Signed
|
|
%% envelope (same value that landed in the log).
|
|
%% No projections configured (field absent from Context): nothing to do.
broadcast(_Signed, nil) ->
    ok;
%% End of the list, or an explicitly empty list.
broadcast(_Signed, []) ->
    ok;
%% Hand the activity to the first projection, then recurse on the rest.
broadcast(Signed, [Projection | Remaining]) ->
    projection:async_fold(Projection, Signed),
    broadcast(Signed, Remaining).
|
|
|
|
%% envelope_field/2 — look Key up via envelope:get_field/2, returning
%% the bare value, or `nil` when the lookup reports not_found.
envelope_field(Key, Proplist) ->
    Lookup = envelope:get_field(Key, Proplist),
    case Lookup of
        {ok, Value} -> Value;
        not_found -> nil
    end.
|
|
|