rose-ash/plans/agent-briefings/fed-sx-triggers-loop.md
giles 4c0a48834e preserve fed-sx-m1 loop briefings before pruning its worktree
4 untracked agent-briefing docs from the fed-sx-m1 worktree (merged branch loops/fed-sx-m2),
saved here so they survive the worktree cleanup.
2026-07-02 12:25:50 +00:00


; -*- mode: markdown -*-

loops/fed-sx-triggers — activity-driven flow triggers

Scoped briefing for a focused iteration loop that wires fed-sx activities to durable business flows on top of lib/flow/. Every arriving activity (federation OR local publish) can fan out into one or more registered flows; the flow engine handles multi-step workflows, branches, side effects, retries, and human gates.

Companion to plans/fed-sx-design.md §13 (delivery / projection model) and plans/flow-on-sx.md (flow engine surface). This is the build sheet for the substrate side of "an activity fires a business flow."

description: loops/fed-sx-triggers — activity → flow dispatch (substrate)
subagent_type: general-purpose
run_in_background: true
isolation: worktree     # /root/rose-ash-loops/fed-sx-triggers

The mental model

activity arrives ─→ pipeline validates ─→ kernel appends ─→ fan-out
                                                                  │
                                              trigger_registry lookup
                                                                  │
                              ┌───────────────┬───────────────────┴────────┐
                              ↓               ↓                            ↓
                          flow:start(F1)  flow:start(F2)   flow:start(F3)
                              │               │                            │
                            (suspend,     (sync side       (multi-step:
                             resume on     effect:          payment → ship
                             timer)        send email)      → notify, with
                                                            branches)

Activities are the unit of "something happened." Flows are the unit of "what we DO about it" — composable, durable, branching.

Concrete examples (motivating)

  • Blog publish (Create for an Article): index in search, send digest to followers (per-actor preference dictates batching), push to federation peers, invalidate caches. Branch: if the article references unpublished drafts, hold publication until they land.
  • Order placed (Create for an Order): charge payment → on fail, cancel + notify; on success, allocate inventory → on out-of-stock, refund + notify; on success, schedule shipping + send confirmation + start a follow-up flow for "ask for review in 14 days."
  • Comment posted: run moderation pipeline → on fail, hold for human review (suspend until a moderator activity arrives); on pass, notify the post author + bump comment count + check for follow-up triggers.
  • Membership renews: bill the card → on fail, retry 3 days later; on terminal fail, downgrade access + notify. Suspend for 364 days, then fire again.

All of these are 5-20 step flows with branches, retries, suspensions on external events, and human gates. They are lib/flow/ territory — m2's delivery_worker retry is much simpler (one HTTP call, fixed backoff) and lives in next/kernel/, not here.

Scope

In scope: next/** only. Four pieces:

  1. DefineTrigger genesis activity-type + trigger_registry.erl
  2. Trigger fan-out stage in pipeline.erl (post-append)
  3. flow_dispatch.erl — bridges activity → lib/flow/ start call
  4. End-to-end integration test exercising a multi-step flow with one branch and one suspension

In scope to IMPORT from: lib/flow/** is a public dependency. This loop calls flow:start/2, flow:resume/2, etc. as a consumer; it does not modify lib/flow/.

Out of scope: lib/host/**, lib/erlang/**, bin/sx_server.ml, any lib/<other>/ directory. Touching them would conflict with parallel loops.

Parallel-safety with loops/fed-sx-types — that loop also edits pipeline.erl (adding apply_object_schema/2 between activity-type validation and append). This loop adds a fan-out stage AFTER append. Different positions in the pipeline. Resolve by inserting both stages in a single commit OR by branching this loop from fed-sx-types' tip once fed-sx-types completes. Default: branch from fed-sx-types' tip. If fed-sx-types is mid-flight, wait for it to land Phase 4 before starting this loop's Phase 2.

Branch base

origin/loops/fed-sx-types if it exists and has Phase 4 landed. Else origin/loops/fed-sx-m2 (has all m2 substrate + send_after + http-listen + peer_actors pattern). Either base has flow-on-sx available via lib/flow/**.

Phase 1 — DefineTrigger + trigger_registry.erl

next/genesis/activity-types/define_trigger.sx

Verb declaration. The :object payload binds an activity-type (possibly per-actor, possibly with a guard predicate) to a flow name:

(DefineTrigger
  :name "Trigger"
  :doc "Bind an activity-type to a flow. When a matching activity
        is appended, fan out to flow:start with the activity as
        input."
  :schema (fn (act)
    (and (string? (-> act :object :activity-type))
         (string? (-> act :object :flow-name))
         ;; guard is optional — a (fn (activity) ...) returning bool
         (or (nil? (-> act :object :guard))
             (callable? (-> act :object :guard)))))
  :semantics (fn (state act)
    ;; Folds into the actor's :triggers projection — registry on
    ;; restart hydrates from this.
    (trigger-state-add state act)))

The :guard lets one activity-type bind to multiple flows with discriminators (for example, "only fire EmailDigest for Articles whose :category is newsletter"). It is optional; a missing guard is treated as always true.

next/kernel/trigger_registry.erl

In-memory registry, hydrated from the actor's trigger projection on start. Mirrors peer_actors.erl / peer_types.erl shape:

trigger_registry:start_link(_)                -> Pid
trigger_registry:add(ActivityType, Spec)      -> ok
trigger_registry:remove(TriggerCid)           -> ok
trigger_registry:lookup(ActivityType)         -> [Spec]
trigger_registry:all_triggers()               -> [{ActivityType, [Spec]}]

Where Spec is {trigger_cid, flow_name, guard_fn|undefined, actor_scope|any}. Multiple triggers per activity-type are supported and fire independently.
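The registry semantics above (multi-bind, empty list on no match, removal by trigger cid) can be sketched as a pure data structure. This is a minimal illustration in standard Erlang/OTP, not the actual trigger_registry.erl: the module name trigger_registry_sketch, the explicit Reg argument, and the map representation are assumptions, and the real module would presumably hold this state behind the process returned by start_link.

```erlang
-module(trigger_registry_sketch).
-export([new/0, add/3, remove/2, lookup/2, all_triggers/1]).

%% Spec shape taken from the briefing:
%%   {TriggerCid, FlowName, GuardFn | undefined, ActorScope | any}
%% ASSUMPTION: the registry is a map from activity-type to [Spec].

%% An empty registry.
new() -> #{}.

%% Append Spec under ActivityType, preserving insertion order.
add(ActivityType, Spec, Reg) ->
    Existing = maps:get(ActivityType, Reg, []),
    Reg#{ActivityType => Existing ++ [Spec]}.

%% Drop every spec whose trigger cid matches, then prune
%% activity-types that no longer have any specs.
remove(TriggerCid, Reg) ->
    Filtered = maps:map(
        fun(_Type, Specs) ->
            [S || S <- Specs, element(1, S) =/= TriggerCid]
        end, Reg),
    maps:filter(fun(_Type, Specs) -> Specs =/= [] end, Filtered).

%% No match yields [], as the Phase 1 registry tests expect.
lookup(ActivityType, Reg) ->
    maps:get(ActivityType, Reg, []).

%% Every binding as [{ActivityType, [Spec]}].
all_triggers(Reg) ->
    maps:to_list(Reg).
```

Keeping the core pure makes restart hydration a plain fold of add/3 over the Trigger activities in the actor's projection.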

Tests — next/tests/define_trigger.sh + next/tests/trigger_registry.sh

  • Verb round-trips through term_codec
  • Schema accepts well-formed, rejects missing fields
  • Registry: add/lookup/remove, lookup with no matches → [], multi-bind one activity-type → list of specs
  • Restart hydration from a fold over Trigger activities

Phase 1 acceptance: 12-14 tests.

Phase 2 — Pipeline fan-out stage

In next/kernel/pipeline.erl, add a stage after the kernel append (not before — fan-out fires only on successfully accepted activities; rejected activities don't trigger flows):

%% Pipeline (revised):
%%   1. envelope shape
%%   2. signature against peer-AS
%%   3. activity-type :schema
%%   4. object-schema validation       (fed-sx-types Phase 4)
%%   5. kernel append
%%   6. trigger fan-out                (THIS PHASE)

apply_triggers(Activity, ActorState, Cfg) ->
    Type = activity_type_atom(Activity),
    Specs = trigger_registry:lookup(Type),
    Matching = [S || S <- Specs, guard_passes(S, Activity, ActorState)],
    [flow_dispatch:start(Spec, Activity, ActorState, Cfg)
        || Spec <- Matching],
    ok.

apply_triggers is fire-and-forget from the pipeline's point of view: the kernel response doesn't wait for flows to complete. Each spawned flow is its own Erlang process; failures are isolated to that one flow.
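The two helpers this stage leans on, guard evaluation and isolated dispatch, might look roughly like the following. It is a sketch in standard Erlang/OTP under stated assumptions: the module name trigger_guard_sketch and the function dispatch_isolated/2 are hypothetical, treating a crashing guard as "do not fire" is one possible policy rather than a documented one, and the real isolation boundary may sit inside lib/flow/ instead of a bare spawn.

```erlang
-module(trigger_guard_sketch).
-export([guard_passes/3, dispatch_isolated/2]).

%% A missing guard means "always fire" (the briefing's default).
guard_passes({_Cid, _Flow, undefined, _Scope}, _Activity, _ActorState) ->
    true;
guard_passes({_Cid, _Flow, Guard, _Scope}, Activity, _ActorState)
  when is_function(Guard, 1) ->
    %% ASSUMPTION: a crashing or non-boolean guard counts as false,
    %% so a bad guard can never take the pipeline down.
    try Guard(Activity) =:= true
    catch _:_ -> false
    end.

%% Run DispatchFun(Spec) in its own unlinked process per spec, so a
%% crash in one dispatch cannot affect the caller or its siblings.
%% ASSUMPTION: DispatchFun stands in for a call to flow_dispatch:start/4.
dispatch_isolated(DispatchFun, Specs) when is_function(DispatchFun, 1) ->
    lists:foreach(
        fun(Spec) -> spawn(fun() -> DispatchFun(Spec) end) end,
        Specs),
    ok.
```

Because the spawned processes are unlinked, the caller returns ok immediately and never observes a dispatch failure, which matches the "does not block the kernel append" test in this phase.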

Idempotency

Federation deduplication is a known weak spot: the same activity can arrive twice via different peers. Fan-out must therefore de-duplicate per {activity-cid, trigger-cid} pair before calling flow_dispatch. The :triggers_fired actor-state field holds [{activity_cid, trigger_cid}] of historical fires; before calling flow_dispatch, check membership; on dispatch, add.
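The check-then-add step described above can be sketched as a pure function over the :triggers_fired list. The module name trigger_dedup_sketch, the function fire_once/3, and the dispatch/skip return atoms are hypothetical names chosen for illustration; only the {activity_cid, trigger_cid} pair shape comes from this briefing.

```erlang
-module(trigger_dedup_sketch).
-export([fire_once/3]).

%% Fired is the :triggers_fired list of {ActivityCid, TriggerCid} pairs.
%% Returns {dispatch, NewFired} the first time a pair is seen and
%% {skip, Fired} on every later arrival of the same pair.
fire_once(ActivityCid, TriggerCid, Fired) ->
    Key = {ActivityCid, TriggerCid},
    case lists:member(Key, Fired) of
        true  -> {skip, Fired};
        false -> {dispatch, [Key | Fired]}
    end.
```

The key is the pair rather than the activity cid alone, so a second trigger bound to the same activity still fires. A plain list keeps the sketch short; membership checks are linear in the history, so a set-like structure may be worth considering if the fired history grows large.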

Tests — next/tests/pipeline_triggers.sh

  • Activity append → trigger lookup → flow_dispatch invoked
  • Activity with no matching trigger → no dispatch
  • Guard returns false → no dispatch
  • Same activity appended twice → flow_dispatch invoked once
  • Failed flow_dispatch (raises) does not block the kernel append
  • Multiple triggers for same activity-type → each dispatched

Phase 2 acceptance: 10-12 tests.

Phase 3 — flow_dispatch.erl

The bridge from "an activity matched a trigger" to "a flow started with that activity as input." Calls into lib/flow/'s public API.

%% flow_dispatch:start/4 starts the flow named in Spec with
%% the activity bound as :activity in the flow's input env. The
%% started flow's id is returned (or {error, _} on failure).

start(Spec, Activity, ActorState, _Cfg) ->
    FlowName = field(flow_name, Spec),
    Input    = [{activity, Activity},
                {actor, actor_state:actor_id(ActorState)},
                {trigger_cid, field(trigger_cid, Spec)}],
    case flow:start(FlowName, Input) of
        {ok, FlowId}    -> {ok, FlowId};
        {error, Reason} -> log_flow_start_failure(Spec, Activity, Reason),
                           {error, Reason}
    end.

The Input proplist becomes the flow's initial environment. Flow steps can read the activity, the actor id, and the trigger cid (for audit chain).

Audit chain

Every flow started from an activity records its {activity_cid, trigger_cid, flow_id} triple to the actor's projection. Replaying the actor log + the trigger registry reconstructs which flows fired for what reasons. Same content-addressing discipline as the rest of fed-sx.
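As a rough illustration of the audit triple, the sketch below records {activity_cid, trigger_cid, flow_id} entries and answers "which flows fired for this activity?". The module name trigger_audit_sketch, both function names, and the plain-list representation are assumptions; how the triple is actually folded into the actor's projection is not specified here.

```erlang
-module(trigger_audit_sketch).
-export([record/4, flows_for/2]).

%% Prepend one {ActivityCid, TriggerCid, FlowId} triple to the audit list.
record(ActivityCid, TriggerCid, FlowId, Audit) ->
    [{ActivityCid, TriggerCid, FlowId} | Audit].

%% Every {TriggerCid, FlowId} that fired for ActivityCid, newest first
%% (because record/4 prepends).
flows_for(ActivityCid, Audit) ->
    [{T, F} || {A, T, F} <- Audit, A =:= ActivityCid].
```

Since each entry is keyed by content-addressed ids, replaying the same log should rebuild the same list.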

Tests — next/tests/flow_dispatch.sh

  • Successful flow start records audit triple
  • Flow names that don't resolve in the flow registry → log + return {error, no_such_flow}; don't crash
  • Flow's first step runs synchronously before flow_dispatch returns (proves the activity payload landed in the flow's env)
  • Multi-step flow: trigger + first step + suspend on flow:wait_for_message; resume on synthetic message → second step completes
  • Branch test: flow has if/else gate on activity field; both branches exercised

Phase 3 acceptance: 10-14 tests.

Phase 4 — End-to-end integration: blog-publish digest

A motivating, multi-step business flow that ties everything together. Lives at next/genesis/flows/blog-publish-digest.sx as a flow definition consumable by lib/flow/. Demonstrates:

  • Multi-step (3+ named steps)
  • Branch on activity field (article :category)
  • Side effect (mock email sender via :dispatch_fn Cfg hook)
  • Suspension on a timer (e.g. "send digest at 9am the next day")
  • Completion records a DigestSent activity (closing the loop — the flow's own output is itself an activity, which can in turn trigger downstream flows)

next/genesis/flows/blog-publish-digest.sx

(defflow blog-publish-digest
  (step :validate
    (let ((art (-> $input :activity :object)))
      (if (= (-> art :type) "Article")
        (continue :decide-batch art)
        (done :skipped))))
  (step :decide-batch
    (fn (art)
      (case (-> art :category)
        :newsletter   (continue :batch-until-morning art)
        :urgent       (continue :send-now art)
        :else         (done :skipped))))
  (step :batch-until-morning
    (fn (art)
      (wait-until (next-morning-utc))
      (continue :send-now art)))
  (step :send-now
    (fn (art)
      (let ((followers (fetch-followers (-> $input :actor))))
        (for-each follower followers
          (dispatch-email follower art))
        (continue :emit-digest-sent art))))
  (step :emit-digest-sent
    (fn (art)
      (publish-activity
        (Create :object {:type "DigestSent"
                         :for (-> art :id)
                         :follower-count (-> art :follower-count)}))
      (done :ok))))

The exact defflow syntax is what lib/flow/ already provides — mirror its existing test fixtures (lib/flow/tests/programs/*.sx) for the canonical surface.

Integration test — next/tests/triggers_e2e.sh

End-to-end:

  1. Bootstrap an actor with a Trigger activity binding Article → blog-publish-digest.
  2. Mock dispatch_email and fetch_followers via Cfg hooks.
  3. Publish a Create-Article activity with :category :urgent.
  4. Assert: flow ran to completion within one append cycle, three emails dispatched, DigestSent activity appended.
  5. Publish a Create-Article with :category :newsletter.
  6. Assert: flow suspended after :decide-batch; advance the logical clock 16 hours; flow resumes; emails sent; DigestSent appended.
  7. Publish a Create-Article with :category :draft.
  8. Assert: flow took the :else branch, no emails, no DigestSent.

Phase 4 acceptance: 8-10 e2e tests.
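The mocked hooks in step 2 could be wired up along these lines, so that the "three emails dispatched" assertion becomes a mailbox check in the test process. This is a sketch only: Cfg being a map, the fetch_followers_fn key, and the module and function names are assumptions; the only name taken from this briefing is the dispatch_fn hook.

```erlang
-module(triggers_e2e_mocks_sketch).
-export([mock_cfg/2, collected_emails/0]).

%% Build a Cfg whose hooks never touch the network.
%% ASSUMPTION: Cfg is a map and the hooks are funs stored under
%% dispatch_fn and fetch_followers_fn (the latter key is hypothetical).
mock_cfg(TestPid, Followers) ->
    #{dispatch_fn =>
          fun(Follower, Article) ->
              %% Report each "sent" email back to the test process.
              TestPid ! {email, Follower, Article},
              ok
          end,
      fetch_followers_fn =>
          fun(_ActorId) -> Followers end}.

%% Drain every {email, _, _} message currently in the caller's
%% mailbox, in arrival order, without blocking.
collected_emails() ->
    receive
        {email, Follower, Article} ->
            [{Follower, Article} | collected_emails()]
    after 0 ->
        []
    end.
```

A test would build the Cfg with self() as TestPid and a three-element follower list, publish the activity, and then assert on the length of collected_emails().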

Out-of-scope deliberately

  • In-process local subscriber path on delivery_worker. Right now delivery_worker dispatches over HTTP; an "internal sink" abstraction would let activities fan out via the delivery_set to a local consumer. That's a separate, smaller change in delivery_worker.erl + outbox.erl. Keep it out of this loop to keep the scope tight.
  • Per-flow auth / capability gates. Triggers fire flows with the activity's actor identity. Authorisation of which flows can publish which activities is a separate concern handled by the existing pipeline.
  • Flow versioning / hot-reload. When a flow definition changes, in-flight instances continue with the version they started on. New instances pick up the new version. flow-on-sx handles this; nothing to do here.
  • Cross-actor triggers. A trigger lives on the actor it's defined for. "When ANY actor publishes an Article, fire this flow" is a fan-in pattern that needs a different design.

Tests discipline

  • bash lib/erlang/conformance.sh green before AND after every commit (substrate not touched, should stay 771/771).
  • bash lib/flow/conformance.sh green before AND after (the flow engine is a dependency we use as-is — if our changes break it, we did something wrong).
  • Commits scoped to next/** and plans/. No edits to lib/flow/**, lib/host/**, lib/erlang/**, bin/sx_server.ml, or other lib/<lang>/**.
  • One commit per phase as outlined.
  • Push to origin/loops/fed-sx-triggers after each commit.

Done when

  • Phases 1-4 land with their acceptance tests green.
  • The blog-publish-digest e2e test demonstrates a multi-step flow with a branch, a suspension, a side effect, and a follow-up activity emit, all driven by one trigger arriving in the pipeline.
  • plans/fed-sx-design.md updated (small note in §13) describing the trigger fan-out stage as a kernel-level convention.

When this loop closes, the substrate supports declaring business flows as data, binding them to activity-types as Define* activities, and seeing them fire automatically whenever the matching activity lands — locally OR via federation. The whole "event-driven CQRS" pattern that the user asked about works end-to-end with lib/flow/ doing the actual workflow lifting.

Parallel safety with other loops

  • loops/host — no overlap (next/ only).
  • loops/fed-sx-types — pipeline.erl overlap. Branch base on fed-sx-types' tip OR coordinate via a single combined pipeline-stage insertion commit.
  • loops/erlang — no overlap (lib/erlang/ untouched).
  • loops/flow — no overlap (lib/flow/ consumed as a dependency, never edited).