fed-sx-types Phase 7: pipeline trigger fan-out + flow_dispatch
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 44s

The post-append fan-out that fires durable flows from arriving
activities (fed-sx-triggers-loop.md Phases 2+3), native into next/flow
— no cross-guest FFI.

- pipeline.erl: apply_triggers/3 runs AFTER the kernel append (rejected
  activities never reach it). It looks the activity's type up in the
  trigger registry, drops specs whose guard/actor-scope fails or whose
  {activity_cid, trigger_cid} pair already fired (federation can deliver
  the same activity twice — dedup is keyed on that pair, read from the
  actor's :triggers_fired), and dispatches the rest. Returns the audit
  triples for the kernel to fold into :triggers_fired + its projection.
  Must not be called inside a `try` (it does gen_server:calls, which
  deadlock the scheduler inside a try); running post-append in its own
  step satisfies that.
- flow_dispatch.erl: bridges a matched trigger to flow_store:start, with
  the activity bound into the flow's input env. guard_passes/3 gates on
  actor-scope + guard. Failures (unknown flow, crashing first step) come
  back as {error, _}, never raised — one flow can't take down the rest.
- flow_store.erl: drive wrapped in try (the drive is pure, so the try is
  safe) so a flow whose step raises yields {error, {flow_crashed, _}}
  instead of crashing the store.

Tests: flow_dispatch.sh (12), pipeline_triggers.sh (10). lib/erlang
771/771, next/flow 34/34.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-30 18:22:50 +00:00
parent fc6a47ad62
commit 6b4850b34e
5 changed files with 460 additions and 9 deletions

View File

@@ -89,10 +89,14 @@ handle_call({start, Name, Input}, _From, {Reg, Ins, N}) ->
not_found ->
{reply, {error, no_such_flow}, {Reg, Ins, N}};
{ok, Flow} ->
R = flow:drive(Flow, Input, []),
Status = result_status(R),
Ins2 = set_keyed(N, {Name, Input, [], Status}, Ins),
{reply, {ok, N, R}, {Reg, Ins2, N + 1}}
case safe_drive(Flow, Input, []) of
{ok, R} ->
Status = result_status(R),
Ins2 = set_keyed(N, {Name, Input, [], Status}, Ins),
{reply, {ok, N, R}, {Reg, Ins2, N + 1}};
{error, Crash} ->
{reply, {error, {flow_crashed, Crash}}, {Reg, Ins, N}}
end
end;
handle_call({resume, Id, Value}, _From, {Reg, Ins, N}) ->
case find_keyed(Id, Ins) of
@@ -106,10 +110,14 @@ handle_call({resume, Id, Value}, _From, {Reg, Ins, N}) ->
{reply, {error, no_such_flow}, {Reg, Ins, N}};
{ok, Flow} ->
NewLog = log_append(Log, Tag, Value),
R = flow:drive(Flow, Input, NewLog),
Status = result_status(R),
Ins2 = set_keyed(Id, {Name, Input, NewLog, Status}, Ins),
{reply, {ok, R}, {Reg, Ins2, N}}
case safe_drive(Flow, Input, NewLog) of
{ok, R} ->
Status = result_status(R),
Ins2 = set_keyed(Id, {Name, Input, NewLog, Status}, Ins),
{reply, {ok, R}, {Reg, Ins2, N}};
{error, Crash} ->
{reply, {error, {flow_crashed, Crash}}, {Reg, Ins, N}}
end
end
end;
handle_call({status, Id}, _From, {Reg, Ins, N}) ->
@@ -129,6 +137,18 @@ handle_info(_, S) -> {noreply, S}.
result_status({flow_done, R}) -> {done, R};
result_status({flow_suspended, T}) -> {suspended, T}.
%% safe_drive/3 — flow:drive is pure (no blocking receive), so a `try`
%% around it is safe in this runtime and isolates a flow whose step
%% raises: the store returns {error, {flow_crashed, _}} instead of the
%% gen_server crashing, keeping one bad flow from taking down others.
safe_drive(Flow, Input, Log) ->
try {ok, flow:drive(Flow, Input, Log)}
catch
throw:R -> {error, {throw, R}};
error:R -> {error, {error, R}};
exit:R -> {error, {exit, R}}
end.
log_append([], Tag, Value) -> [{Tag, Value}];
log_append([H | T], Tag, Value) -> [H | log_append(T, Tag, Value)].