4 untracked agent-briefing docs from the fed-sx-m1 worktree (merged branch loops/fed-sx-m2), saved here so they survive the worktree cleanup.
15 KiB
; -- mode: markdown --
loops/fed-sx-triggers — activity-driven flow triggers
Scoped briefing for a focused iteration loop that wires fed-sx
activities to durable business flows on top of lib/flow/. Every
arriving activity (federation OR local publish) can fan out into
one or more registered flows; the flow engine handles multi-step
workflows, branches, side effects, retries, and human gates.
Companion to plans/fed-sx-design.md §13 (delivery / projection
model) and plans/flow-on-sx.md (flow engine surface). This is
the build sheet for the substrate side of "an activity fires a
business flow."
description: loops/fed-sx-triggers — activity → flow dispatch (substrate)
subagent_type: general-purpose
run_in_background: true
isolation: worktree # /root/rose-ash-loops/fed-sx-triggers
The mental model
activity arrives ─→ pipeline validates ─→ kernel appends ─→ fan-out
│
trigger_registry lookup
│
┌───────────────┬───────────────────┴────────┐
↓ ↓ ↓
flow:start(F1) flow:start(F2) flow:start(F3)
│ │ │
(suspend, (sync side (multi-step:
resume on effect: payment → ship
timer) send email) → notify, with
branches)
Activities are the unit of "something happened." Flows are the unit of "what we DO about it" — composable, durable, branching.
Concrete examples (motivating)
- Blog publish (
Createfor anArticle): index in search, send digest to followers (per-actor preference dictates batching), push to federation peers, invalidate caches. Branch: if the article references unpublished drafts, hold publication until they land. - Order placed (
Createfor anOrder): charge payment → on fail, cancel + notify; on success, allocate inventory → on out-of-stock, refund + notify; on success, schedule shipping- send confirmation + start a follow-up flow for "ask for review in 14 days."
- Comment posted: run moderation pipeline → on fail, hold for human review (suspend until a moderator activity arrives); on pass, notify the post author + bump comment count + check for follow-up triggers.
- Membership renews: bill the card → on fail, retry 3 days later; on terminal fail, downgrade access + notify. Suspend for 364 days, then fire again.
All of these are 5-20 step flows with branches, retries,
suspensions on external events, and human gates. They are
lib/flow/ territory — m2's delivery_worker retry is much
simpler (one HTTP call, fixed backoff) and lives in
next/kernel/, not here.
Scope
In scope — next/** only. Four pieces:
DefineTriggergenesis activity-type +trigger_registry.erl- Trigger fan-out stage in
pipeline.erl(post-append) flow_dispatch.erl— bridges activity →lib/flow/start call- End-to-end integration test exercising a multi-step flow with one branch and one suspension
In scope to IMPORT from — lib/flow/** is a public dependency.
This loop calls flow:start/2, flow:resume/2, etc. as a
consumer; it does not modify lib/flow/.
Out of scope — lib/host/**, lib/erlang/**, bin/sx_server.ml,
any lib/<other>/ directory. Touching them would conflict with
parallel loops.
Parallel-safety with loops/fed-sx-types — that loop also
edits pipeline.erl (adding apply_object_schema/2 between
activity-type validation and append). This loop adds a fan-out
stage AFTER append. Different positions in the pipeline. Resolve
by inserting both stages in a single commit OR by branching this
loop from fed-sx-types' tip once fed-sx-types completes.
Default: branch from fed-sx-types' tip. If fed-sx-types is
mid-flight, wait for it to land Phase 4 before starting this
loop's Phase 2.
Branch base
origin/loops/fed-sx-types if it exists and has Phase 4 landed.
Else origin/loops/fed-sx-m2 (has all m2 substrate + send_after
- http-listen + peer_actors pattern). Either base has flow-on-sx
available via
lib/flow/**.
Phase 1 — DefineTrigger + trigger_registry.erl
next/genesis/activity-types/define_trigger.sx
Verb declaration. The :object payload binds an activity-type
(possibly per-actor, possibly with a guard predicate) to a flow
name:
(DefineTrigger
:name "Trigger"
:doc "Bind an activity-type to a flow. When a matching activity
is appended, fan out to flow:start with the activity as
input."
:schema (fn (act)
(and (string? (-> act :object :activity-type))
(string? (-> act :object :flow-name))
;; guard is optional — a (fn (activity) ...) returning bool
(or (nil? (-> act :object :guard))
(callable? (-> act :object :guard)))))
:semantics (fn (state act)
;; Folds into the actor's :triggers projection — registry on
;; restart hydrates from this.
(trigger-state-add state act)))
The :guard lets one type bind to multiple flows with
discriminators ("only fire EmailDigest for Articles with
:category "newsletter""). Optional; default true.
next/kernel/trigger_registry.erl
In-memory registry, hydrated from the actor's trigger projection
on start. Mirrors peer_actors.erl / peer_types.erl shape:
trigger_registry:start_link(_) -> Pid
trigger_registry:add(ActivityType, Spec) -> ok
trigger_registry:remove(Trigger Cid) -> ok
trigger_registry:lookup(ActivityType) -> [Spec]
trigger_registry:all_triggers() -> [{ActivityType, [Spec]}]
Where Spec is {trigger_cid, flow_name, guard_fn|undefined, actor_scope|any}. Multiple triggers per activity-type are
supported and fire independently.
Tests — next/tests/define_trigger.sh + next/tests/trigger_registry.sh
- Verb round-trips through term_codec
- Schema accepts well-formed, rejects missing fields
- Registry: add/lookup/remove, lookup with no matches → [], multi-bind one activity-type → list of specs
- Restart hydration from a fold over Trigger activities
Phase 1 acceptance: 12-14 tests.
Phase 2 — Pipeline fan-out stage
In next/kernel/pipeline.erl, add a stage after the kernel
append (not before — fan-out fires only on successfully accepted
activities; rejected activities don't trigger flows):
%% Pipeline (revised):
%% 1. envelope shape
%% 2. signature against peer-AS
%% 3. activity-type :schema
%% 4. object-schema validation (fed-sx-types Phase 4)
%% 5. kernel append
%% 6. trigger fan-out (THIS PHASE)
apply_triggers(Activity, ActorState, Cfg) ->
Type = activity_type_atom(Activity),
Specs = trigger_registry:lookup(Type),
Matching = [S || S <- Specs, guard_passes(S, Activity, ActorState)],
[flow_dispatch:start(Spec, Activity, ActorState, Cfg)
|| Spec <- Matching],
ok.
apply_triggers is fire-and-forget from the pipeline's POV —
the kernel response doesn't wait for flows to complete. Each
spawned flow is its own erlang process; failures are isolated
to that one flow.
Idempotency
Federation deduplication is a known weak spot: the same activity
can arrive twice via different peers. Fan-out must therefore
de-duplicate per {activity-cid, trigger-cid} pair before
calling flow_dispatch. The :triggers_fired actor-state field
holds [{activity_cid, trigger_cid}] of historical fires; before
calling flow_dispatch, check membership; on dispatch, add.
Tests — next/tests/pipeline_triggers.sh
- Activity append → trigger lookup → flow_dispatch invoked
- Activity with no matching trigger → no dispatch
- Guard returns false → no dispatch
- Same activity appended twice → flow_dispatch invoked once
- Failed flow_dispatch (raises) does not block the kernel append
- Multiple triggers for same activity-type → each dispatched
Phase 2 acceptance: 10-12 tests.
Phase 3 — flow_dispatch.erl
The bridge from "an activity matched a trigger" to "a flow
started with that activity as input." Calls into lib/flow/'s
public API.
%% flow_dispatch:start/4 starts the flow named in Spec with
%% the activity bound as :activity in the flow's input env. The
%% started flow's id is returned (or {error, _} on failure).
flow_dispatch:start(Spec, Activity, ActorState, Cfg) ->
FlowName = field(flow_name, Spec),
Input = [{activity, Activity},
{actor, actor_state:actor_id(ActorState)},
{trigger_cid, field(trigger_cid, Spec)}],
case flow:start(FlowName, Input) of
{ok, FlowId} -> {ok, FlowId};
{error, Reason} -> log_flow_start_failure(Spec, Activity, Reason),
{error, Reason}
end.
The Input proplist becomes the flow's initial environment.
Flow steps can read the activity, the actor id, and the trigger
cid (for audit chain).
Audit chain
Every flow started from an activity records its {activity_cid, trigger_cid, flow_id} triple to the actor's projection. Replaying
the actor log + the trigger registry reconstructs which flows
fired for what reasons. Same content-addressing discipline as the
rest of fed-sx.
Tests — next/tests/flow_dispatch.sh
- Successful flow start records audit triple
- Flow names that don't resolve in the flow registry → log + return
{error, no_such_flow}; don't crash - Flow's first step runs synchronously before flow_dispatch returns (proves the activity payload landed in the flow's env)
- Multi-step flow: trigger + first step + suspend on
flow:wait_for_message; resume on synthetic message → second step completes - Branch test: flow has if/else gate on activity field; both branches exercised
Phase 3 acceptance: 10-14 tests.
Phase 4 — End-to-end integration: blog-publish digest
A motivating, multi-step business flow that ties everything
together. Lives at next/genesis/flows/blog-publish-digest.sx
as a flow definition consumable by lib/flow/. Demonstrates:
- Multi-step (3+ named steps)
- Branch on activity field (article :category)
- Side effect (mock email sender via
:dispatch_fnCfg hook) - Suspension on a timer (e.g. "send digest at 9am the next day")
- Completion records a
DigestSentactivity (closing the loop — the flow's own output is itself an activity, which can in turn trigger downstream flows)
next/genesis/flows/blog-publish-digest.sx
(defflow blog-publish-digest
(step :validate
(let ((art (-> $input :activity :object)))
(if (= (-> art :type) "Article")
(continue :decide-batch art)
(done :skipped))))
(step :decide-batch
(fn (art)
(case (-> art :category)
:newsletter (continue :batch-until-morning art)
:urgent (continue :send-now art)
:else (done :skipped))))
(step :batch-until-morning
(fn (art)
(wait-until (next-morning-utc))
(continue :send-now art)))
(step :send-now
(fn (art)
(let ((followers (fetch-followers (-> $input :actor))))
(for-each follower followers
(dispatch-email follower art))
(continue :emit-digest-sent art))))
(step :emit-digest-sent
(fn (art)
(publish-activity
(Create :object {:type "DigestSent"
:for (-> art :id)
:follower-count (-> art :follower-count)}))
(done :ok))))
The exact defflow syntax is what lib/flow/ already provides —
mirror its existing test fixtures (lib/flow/tests/programs/*.sx)
for the canonical surface.
Integration test — next/tests/triggers_e2e.sh
End-to-end:
- Bootstrap an actor with a
Triggeractivity bindingArticle→blog-publish-digest. - Mock
dispatch_emailandfetch_followersvia Cfg hooks. - Publish a Create-Article activity with
:category :urgent. - Assert: flow ran to completion within one append cycle, three
emails dispatched,
DigestSentactivity appended. - Publish a Create-Article with
:category :newsletter. - Assert: flow suspended after
:decide-batch; advance the logical clock 16 hours; flow resumes; emails sent;DigestSentappended. - Publish a Create-Article with
:category :draft. - Assert: flow took the
:elsebranch, no emails, noDigestSent.
Phase 4 acceptance: 8-10 e2e tests.
Out-of-scope deliberately
- In-process local subscriber path on delivery_worker. Right
now
delivery_workerdispatches over HTTP; an "internal sink" abstraction would let activities fan out via the delivery_set to a local consumer. That's a separate, smaller change indelivery_worker.erl+outbox.erl. Keep it out of this loop to keep the scope tight. - Per-flow auth / capability gates. Triggers fire flows with the activity's actor identity. Authorisation of what flows can publish what activities is a separate concern handled by the existing pipeline.
- Flow versioning / hot-reload. When a flow definition changes, in-flight instances continue with the version they started on. New instances pick up the new version. flow-on-sx handles this; nothing to do here.
- Cross-actor triggers. A trigger lives on the actor it's defined for. "When ANY actor publishes an Article, fire this flow" is a fan-in pattern that needs a different design.
Tests discipline
bash lib/erlang/conformance.shgreen before AND after every commit (substrate not touched, should stay 771/771).bash lib/flow/conformance.shgreen before AND after (the flow engine is a dependency we use as-is — if our changes break it, we did something wrong).- Commits scoped to
next/**andplans/. No edits tolib/flow/**,lib/host/**,lib/erlang/**,bin/sx_server.ml, or otherlib/<lang>/**. - One commit per phase as outlined.
- Push to
origin/loops/fed-sx-triggersafter each commit.
Done when
- Phases 1-4 land with their acceptance tests green.
- The blog-publish-digest e2e test demonstrates a multi-step flow with a branch, a suspension, a side effect, and a follow-up activity emit, all driven by one trigger arriving in the pipeline.
plans/fed-sx-design.mdupdated (small note in §13) describing the trigger fan-out stage as a kernel-level convention.
When this loop closes, the substrate supports declaring business
flows as data, binding them to activity-types as Define*
activities, and seeing them fire automatically whenever the
matching activity lands — locally OR via federation. The whole
"event-driven CQRS" pattern that the user asked about works
end-to-end with lib/flow/ doing the actual workflow lifting.
Parallel safety with other loops
loops/host— no overlap (next/ only).loops/fed-sx-types— pipeline.erl overlap. Branch base on fed-sx-types' tip OR coordinate via a single combined pipeline-stage insertion commit.loops/erlang— no overlap (lib/erlang/ untouched).loops/flow— no overlap (lib/flow/ consumed as a dependency, never edited).