Compare commits

...

13 Commits

Author SHA1 Message Date
6c9b96390f fed-sx-types Phase 8: blog-publish-digest e2e + flow:wait
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 58s
The motivating end-to-end demonstration (fed-sx-triggers-loop.md Phase
4): one trigger arriving in the pipeline drives a multi-step business
flow with a branch, a timer suspension, an injected effect, and a
follow-up activity emit — all in the kernel's own runtime.

- flow.erl: flow:wait/1 — a timer-style suspend that PRESERVES the value
  on resume (vs flow:suspend/1, which returns the logged result), so a
  "wait until morning" step lets the env flow through to later steps.
- next/flow/flows/blog_publish_digest.erl: the flow. Branches on the
  article :category (newsletter -> wait-until-morning -> send + emit;
  urgent -> send + emit now; else -> skip), fetches followers (injected),
  builds a digest email per follower, and emits a DigestSent activity
  OBJECT. Effect-as-data: a flow can't call kernel gen_servers from
  inside the drive (a blocking call there deadlocks the scheduler), so
  it returns the emails + DigestSent object for a driver to dispatch and
  append — which can then trigger downstream flows, closing the loop.

Test: triggers_e2e.sh (10) — urgent completes in one cycle with 3 emails
+ a DigestSent object; newsletter suspends on the morning timer, then
resumes to the same on "advancing the clock"; draft takes the else
branch (no emails); a non-Article note is rejected by the guard; a
duplicate activity fires once. flow:wait covered in next/flow (36/36).

plans/fed-sx-design.md §13.10 documents the trigger fan-out as a
kernel convention. lib/erlang 771/771.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 18:31:26 +00:00
6b4850b34e fed-sx-types Phase 7: pipeline trigger fan-out + flow_dispatch
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 44s
The post-append fan-out that fires durable flows from arriving
activities (fed-sx-triggers-loop.md Phases 2+3), native into next/flow
— no cross-guest FFI.

- pipeline.erl: apply_triggers/3 runs AFTER the kernel append (rejected
  activities never reach it). It looks the activity's type up in the
  trigger registry, drops specs whose guard/actor-scope fails or whose
  {activity_cid, trigger_cid} pair already fired (federation can deliver
  the same activity twice — dedup is keyed on that pair, read from the
  actor's :triggers_fired), and dispatches the rest. Returns the audit
  triples for the kernel to fold into :triggers_fired + its projection.
  Must not be called inside a `try` (it does gen_server:calls, which
  deadlock the scheduler inside a try); running post-append in its own
  step satisfies that.
- flow_dispatch.erl: bridges a matched trigger to flow_store:start, with
  the activity bound into the flow's input env. guard_passes/3 gates on
  actor-scope + guard. Failures (unknown flow, crashing first step) come
  back as {error, _}, never raised — one flow can't take down the rest.
- flow_store.erl: drive wrapped in try (the drive is pure, so the try is
  safe) so a flow whose step raises yields {error, {flow_crashed, _}}
  instead of crashing the store.

Tests: flow_dispatch.sh (12), pipeline_triggers.sh (10). lib/erlang
771/771, next/flow 34/34.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 18:22:50 +00:00
fc6a47ad62 fed-sx-types Phase 6: DefineTrigger verb + trigger_registry
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 53s
The trigger declaration layer (fed-sx-triggers-loop.md Phase 1): bind an
activity-type to a durable flow so an arriving activity can fan out into
a business flow.

- next/genesis/activity-types/define_trigger.sx — the DefineTrigger verb
  (DefineActivity form, nested-get schema). :object carries
  :activity-type, :flow-name, optional :guard / :actor-scope.
- next/kernel/trigger_registry.erl — pure core + registered gen_server,
  mirroring peer_actors/peer_types. Keyed by activity-type, multiple
  specs per type fire independently. Spec = {TriggerCid, FlowName,
  Guard, ActorScope}. Hydrates on start from a fold over DefineTrigger
  activities (restart-safe, same content-addressing as define_registry).

Manifest activity-types 7->8 (total bundle 38->39); the four bootstrap
count suites + genesis_parse bumped, and bootstrap_load's internal
timeout raised (the larger bundle's double cid:to_string was truncating).

Tests: define_trigger.sh (6), trigger_registry.sh (17). lib/erlang
771/771 + next/flow 34/34 untouched.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 18:18:22 +00:00
8b3d92ed5f fed-sx-types Phase 5: flow-on-erlang engine core (next/flow/)
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 49s
A native Erlang-on-SX durable workflow engine, so the fed-sx kernel can
fan activities out into business flows in its own runtime — no cross-
guest FFI to the Scheme lib/flow, no marshalling, no Scheme dependency.
The seed of a real engine (chosen over bridging Scheme flow) that can
later supersede it for substrate use.

- flow.erl — the deterministic-replay driver. Same durability model as
  the Scheme engine (re-run from the top; effects go through suspend;
  the replay log is plain [{Tag,Value}] data, restart-ready), but
  adapted to three hard runtime constraints: no re-enterable
  continuation, no process dictionary, and a blocking receive inside a
  `try` deadlocks the cooperative scheduler. Resolution: thread the log
  through a railway-style context and make suspend SHORT-CIRCUIT (like a
  fail value) instead of throwing — purely functional, sidesteps all
  three. Ctx = {flow_cont,V,Log} | {flow_susp,Tag,Log}.
- flow_spec.erl — combinator algebra mirrored from lib/flow/spec.sx:
  leaves, sequence/parallel/map_flow, flow_while/flow_until, branch,
  railway fail/recover/attempt, tap, try_catch/retry.
- flow_store.erl — durable gen_server: named-flow registry + instance
  table + start/resume/status. Drives the pure flow from handle_call,
  so no gen_server:call is ever inside the replay try-path.

Gate: next/flow/conformance.sh — 34/34. lib/erlang untouched (771/771).
See next/flow/README.md for the model + why railway threading.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 17:51:15 +00:00
bba2d7e5cd fed-sx-types: briefing for the host-side fed-sx adapter loop
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 42s
Companion to plans/fed-sx-host-types.md. Build sheet for the deferred
lib/host adapter slice (fed_sx_outbox / fed_sx_inbox): projects the
host's existing type-post metamodel (blog.sx: :cid, :schema, subtype-of
graph) onto the fed-sx DefineType/SubtypeOf verbs, ingests peers' types
into peer_types, validates inbound typed objects via
pipeline:apply_object_schema/2, and serves GET /types/<cid>.

Surfaces the two gating decisions for loops/host: the SX-host <->
Erlang-on-SX runtime boundary (recommends an HTTP boundary to dodge the
er-scheduler gen_server:call deadlock) and the type-CID identity choice.
Scope is the inverse of this loop: lib/host/** only, no next/ edits.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 17:05:53 +00:00
89dd23c287 fed-sx-types Phase 4: object-schema validation stage in pipeline
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 54s
pipeline:apply_object_schema/2 (+ stage_object_schema/1 factory) — the
object-schema stage between activity-type validation and the kernel
append (plans/fed-sx-host-types.md step 4). When an inbound activity's
:object declares a refinement type ({type, TypeName}), resolve it
(Cfg type_index: TypeName -> TypeCid; then peer_types:lookup_or_fetch/2,
a local hit or a wire fetch) and apply the record's refinement schema
to the object's :field_values, rejecting on schema-fail with
{error, {validation_failed, object_schema}}.

The schema is either a 1-arity Erlang predicate (substrate stand-in,
locally stored) or a term_codec-safe {required, [Field,...]} constraint
(so a wire-fetched record validates too). Default
strict_object_schema = false: an unresolvable type is let through (the
skip is where a validation_skipped log belongs); strict rejects.
Objects with no declared type, and names absent from the local index,
are skipped (open-world).

Test: next/tests/object_schema.sh (15) — local hit, wire fetch, fetch
failure strict/non-strict, no peer_types, untyped object, undeclared
name, fun + data schema forms, no-schema record, stage composition.

No regression: pipeline_signature, pipeline_driver green. Plan doc
steps 1-4 marked done.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 15:50:45 +00:00
441a895737 fed-sx-types Phase 3: /types/<cid> route + discovery_type_fetch
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 51s
Wire format for serving + fetching type docs (plans/fed-sx-host-types.md
step 3).

http_server.erl:
- new type_doc Accept format + content type
  (application/vnd.fed-sx.type-doc), distinct from actor-doc.
- GET /types/<cid> -> the cached TypeRecord term_codec-encoded, 404 if
  not in the peer_types cache. Reads peer_types via a Cfg
  {peer_types, peer_types} guard (hardcoded registered atom, mirroring
  the actor-doc route's kernel guard).

discovery_type_fetch.erl — sibling of discovery_fetch. make_fetch_fn
produces the fun/2 peer_types:lookup_or_fetch calls: GET
<base>/types/<cid> with the type-doc Accept header, returning the RAW
bytes (peer_types owns the term_codec decode, so the wire format lives
in one place — the route encodes, the cache decodes). Cfg carries
type_url / type_url_fn for TypeCid -> base URL resolution.

Tests: next/tests/peer_types_route.sh (13, in-process route dispatch),
next/tests/discovery_type_fetch.sh (9, closure vs a python type-doc
stub, end-to-end through peer_types:lookup_or_fetch).

No regression: http_accept, http_actors, http_get_format,
discovery_fetch all still green. Conformance 771/771.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 15:48:33 +00:00
8d54028c7f fed-sx-types Phase 2: peer_types.erl receiver-side cache
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 39s
next/kernel/peer_types.erl — a mirror of peer_actors keyed by type CID
(plans/fed-sx-host-types.md step 2). State [{TypeCidBytes, TypeRecord}],
where TypeRecord is the parsed DefineType :object payload. Refinement
schemas are immutable per CID, so cache entries never go stale.

Pure API: new/0, lookup/2, store/3, evict/2, types/1, lookup_or_fetch/3.
gen_server API (registered `peer_types`): put/2, lookup/1, state_for/1,
known_types/0, lookup_or_fetch/2, start_link/0,1.

lookup_or_fetch pulls a Cfg-supplied
  type_fetch_fn :: fun ((TypeCid, Cfg) -> {ok, Bytes} | {error, _})
on a miss, decodes the wire bytes via term_codec into the TypeRecord,
and caches it. No fn -> {error, no_fetch_fn}; fetch error / bad bytes
don't poison the cache (caller can retry). Keeping transport in the
closure (Phase 3 discovery_type_fetch) keeps the cache testable.

Test: next/tests/peer_types.sh (18) — pure + gen_server surface, fetch
miss/hit, no-fn, error-no-poison, undecodable-bytes, prepopulate.

Conformance 771/771.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 15:30:47 +00:00
5959a97dca fed-sx-types Phase 1: DefineType + SubtypeOf genesis verbs
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 21s
Two new DefineActivity-form genesis activity-types for host-type
federation (plans/fed-sx-host-types.md step 1):

- next/genesis/activity-types/define_type.sx — DefineType verb; schema
  accepts an :object with a string :name and optional list :fields.
- next/genesis/activity-types/subtype_of.sx — SubtypeOf verb; schema
  accepts an :object carrying string :child-type-cid + :parent-type-cid.

Schema bodies use nested `get` (not keyword-threading) so they are
directly evaluatable — keywords are not callable getters in the kernel.
Both registered in manifest.sx (activity-types now 7); the four bootstrap
suites' bundle counts bumped (5->7, total 36->38).

Tests: next/tests/define_type.sh (7), subtype_of.sh (6) — parse shape,
schema accept/reject, term_codec envelope round-trip.

Also load follower_graph + delivery in bootstrap_start.sh: its check-26
publish path exercises outbox:compute_delivery_set/3 (follower_graph:new
+ delivery:delivery_set), which an m2 substrate change had left unloaded
in that suite — a pre-existing red unrelated to the count bump.

Conformance 771/771; all touched next/tests green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 15:30:21 +00:00
4da2a98c30 fed-sx-m2: Step 8b-timer — live retry-loop wiring on send_after
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 44s
Wires the delivery_worker's retry loop on top of the
erlang:send_after / cancel_timer primitives just landed on
loops/erlang (3709460d, 98b0104c, 779e53b2 — cherry-picked here
since origin/architecture hasn't caught up yet).

Surface:
- new :timers [{Cid, Ref}] state field tracks live timer refs
- handle_call(flush): drain (existing semantics) + arm_retry_timer
  per retried Cid (computes backoff slot from the now-bumped attempt
  count, sets next_retry_at, send_after self-cast). Reply shape
  unchanged.
- handle_info({retry, Cid}, S): redrives that one Cid through
  deliver_one_pure. Success → record_success_pure + clear pending.
  Failure → schedule_retry_for (which bumps attempts, dead-letters on
  slot 6, or arms next slot).
- cancel_timer_for/2 before arming a new timer so stale timers don't
  keep the scheduler's run loop alive after the work is done.
- state_srv/1 + timer_ref_for/2 for test introspection.

5/5 in new delivery_retry_timer.sh; existing delivery_worker.sh
17/17 and delivery_retry.sh 11/11 still green. Conformance gate
771/771 (was 761/761; the +10 is the cherry-picked send_after
suite).

Closes Blockers #3. m2 is now feature-complete.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-30 14:05:31 +00:00
779e53b2a8 erlang: send_after to registered name + gen_server timeout returns (T5+T6, 771/771)
T5 — send_after addresses a registered atom name; the delayed message
lands in that process's mailbox (destination resolved at fire time,
dead/unregistered targets drop silently).

T6 — gen_server loop now handles the {reply,R,S,T} / {noreply,S,T}
timeout-bearing callback returns by scheduling {timeout} to itself via
send_after; handle_info({timeout}, S) fires when no other message
arrives first. Sanity-checks the library hookup.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 13:10:46 +00:00
d09c0048c7 erlang: send_after deadline-ordering + cancel-of-fired tests (T3+T4, 769/769)
T3 — concurrent timers fire in deadline order, not schedule order
(scheduler jumps the clock to the earliest pending deadline each
time the runnable queue drains). T4 — cancel_timer on an
already-fired timer returns the atom false.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 13:10:46 +00:00
3dbb3e318a erlang: erlang:send_after/3 + cancel_timer/1 + monotonic_time (T1+T2, 766/766)
Logical-clock timer wheel in the scheduler. send_after schedules a
message-delivery event at an absolute deadline (clock + Time ms);
cancel_timer marks a live timer cancelled and reports remaining ms,
or false. Time advances only when the runnable queue drains, jumping
to the earliest pending deadline (deterministic, no wall clock).

monotonic_time/0,1 exposes the logical ms clock.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 13:10:46 +00:00
44 changed files with 4438 additions and 52 deletions

View File

@@ -38,6 +38,7 @@ SUITES=(
"fib|er-fib-test-pass|er-fib-test-count"
"ffi|er-ffi-test-pass|er-ffi-test-count"
"vm|er-vm-test-pass|er-vm-test-count"
"send_after|er-sa-test-pass|er-sa-test-count"
)
cat > "$TMPFILE" << 'EPOCHS'
@@ -61,6 +62,7 @@ cat > "$TMPFILE" << 'EPOCHS'
(load "lib/erlang/vm/dispatcher.sx")
(load "lib/erlang/tests/ffi.sx")
(load "lib/erlang/tests/vm.sx")
(load "lib/erlang/tests/send_after.sx")
(epoch 100)
(eval "(list er-test-pass er-test-count)")
(epoch 101)
@@ -83,6 +85,8 @@ cat > "$TMPFILE" << 'EPOCHS'
(eval "(list er-ffi-test-pass er-ffi-test-count)")
(epoch 110)
(eval "(list er-vm-test-pass er-vm-test-count)")
(epoch 111)
(eval "(list er-sa-test-pass er-sa-test-count)")
EPOCHS
timeout 600 "$SX_SERVER" < "$TMPFILE" > "$OUTFILE" 2>&1

View File

@@ -135,6 +135,56 @@
(dict-set! s :next-ref (+ n 1))
(er-mk-ref n)))))
;; ── logical clock + timer wheel ──────────────────────────────────
;; The scheduler runs a synchronous model: logical time advances only
;; when the runnable queue drains (see `er-sched-advance-time!`). The
;; clock is in milliseconds, monotonic, never derived from wall time
;; — deterministic and time-travel-safe. `send_after` schedules a
;; message-delivery event at an absolute deadline; `receive after Ms`
;; schedules a timeout event the same way. When no process is runnable
;; the scheduler jumps the clock to the earliest pending deadline and
;; fires that single event, then re-runs.
(define er-clock (fn () (get (er-sched) :clock)))
;; Advance the clock to `ms`, but never backwards (monotonicity).
(define
er-clock-set!
(fn (ms) (dict-set! (er-sched) :clock (max (er-clock) ms))))
(define er-sched-timers (fn () (get (er-sched) :timers)))
;; Register a timer event. `dest` is a pid or registered-atom value,
;; resolved to a live process at fire time. Returns the timer ref.
(define
er-timer-add!
(fn
(deadline dest msg ref)
(append!
(er-sched-timers)
{:ref ref :deadline deadline :dest dest :msg msg :alive true})
ref))
;; Find the live timer with the given ref, or nil.
(define
er-timer-find-alive
(fn
(ref)
(let
((ts (er-sched-timers)) (found (list nil)))
(for-each
(fn
(i)
(let
((t (nth ts i)))
(when
(and
(= (nth found 0) nil)
(get t :alive)
(er-ref-equal? (get t :ref) ref))
(set-nth! found 0 t))))
(range 0 (len ts)))
(nth found 0))))
;; ── scheduler state ──────────────────────────────────────────────
(define er-scheduler (list nil))
@@ -151,6 +201,8 @@
:processes {}
:registered {}
:ets {}
:clock 0
:timers (list)
:runnable (er-q-new)})))
(define er-sched (fn () (nth er-scheduler 0)))
@@ -217,6 +269,7 @@
:trap-exit false
:has-timeout false
:timed-out false
:timeout-deadline nil
:exit-reason nil}))
(dict-set! (er-sched-processes) (er-pid-key pid) proc)
(er-sched-enqueue! pid)
@@ -456,6 +509,69 @@
(error "Erlang: make_ref/0: arity")
(er-ref-new!))))
;; ── timer BIFs ───────────────────────────────────────────────────
;; erlang:send_after(Time, Dest, Msg) -> Ref
;; Schedules Msg to be delivered to Dest after Time ms (logical).
;; Time must be a non-negative integer; Dest a pid or registered
;; atom name. Returns a fresh timer reference.
(define
er-bif-send-after
(fn
(vs)
(let
((time (nth vs 0)) (dest (nth vs 1)) (msg (nth vs 2)))
(cond
(not (and (= (type-of time) "number") (>= time 0)))
(raise (er-mk-error-marker (er-mk-atom "badarg")))
(not (or (er-pid? dest) (er-atom? dest)))
(raise (er-mk-error-marker (er-mk-atom "badarg")))
:else
(er-timer-add!
(+ (er-clock) (truncate time))
dest
msg
(er-ref-new!))))))
;; erlang:cancel_timer(Ref) -> RemainingMs | false
;; For a live (not-yet-fired) timer, marks it cancelled and returns
;; the milliseconds left until its deadline. For an already-fired,
;; already-cancelled, or unknown ref, returns the atom `false`.
(define
er-bif-cancel-timer
(fn
(vs)
(let
((ref (nth vs 0)))
(cond
(not (er-ref? ref))
(raise (er-mk-error-marker (er-mk-atom "badarg")))
:else
(let
((t (er-timer-find-alive ref)))
(cond
(= t nil) (er-mk-atom "false")
:else (do
(dict-set! t :alive false)
(max 0 (- (get t :deadline) (er-clock))))))))))
;; erlang:monotonic_time() | erlang:monotonic_time(Unit) -> Integer
;; Returns the scheduler's logical monotonic clock in milliseconds.
;; Unit (millisecond / second / native) is accepted for API
;; compatibility; all units report from the same ms-resolution clock.
(define
er-bif-monotonic-time
(fn
(vs)
(cond
(= (len vs) 0) (er-clock)
(and (= (len vs) 1) (er-atom? (nth vs 0)))
(let
((unit (get (nth vs 0) :name)))
(cond
(= unit "second") (truncate (/ (er-clock) 1000))
:else (er-clock)))
:else (raise (er-mk-error-marker (er-mk-atom "badarg"))))))
;; Add `target` to `pid`'s :links list if not already there.
(define
er-link-add-one!
@@ -664,37 +780,122 @@
(cond
(not (= pid nil))
(do (er-sched-step! pid) (er-sched-run-all!))
;; Queue empty — fire one pending receive-with-timeout and go again.
(er-sched-fire-one-timeout!) (er-sched-run-all!)
;; Queue empty — advance logical time to the next pending
;; deadline (timer delivery or receive-timeout) and go again.
(er-sched-advance-time!) (er-sched-run-all!)
:else nil))))
;; Wake one waiting process whose receive had an `after Ms` clause.
;; Returns true if one fired. In our synchronous model "time passes"
;; once the runnable queue drains — timeouts only fire then.
;; ── time advance ─────────────────────────────────────────────────
;; Called when the runnable queue is empty. Two kinds of pending event
;; carry a deadline: live `send_after` timers and waiting processes in
;; a `receive ... after Ms` block. Find the single earliest deadline
;; across both, jump the clock to it, and fire just that one event
;; (timer wins ties — a message delivered exactly at the timeout
;; arrives "first"). Returns true if an event fired, false when there
;; is nothing left to wake (genuine idle / termination).
(define
er-sched-fire-one-timeout!
er-sched-advance-time!
(fn
()
(let
((ks (keys (er-sched-processes))) (fired (list false)))
((best (er-sched-next-event)))
(cond
(= best nil) false
:else (do
(er-clock-set! (get best :deadline))
(cond
(= (get best :kind) "timer")
(er-timer-fire! (get best :timer))
:else (er-recv-timeout-fire! (get best :proc)))
true)))))
;; Scan timers and waiting-with-timeout processes for the earliest
;; deadline. Returns {:kind "timer"|"recv" :deadline D ...} or nil.
(define
er-sched-next-event
(fn
()
(let
((best (list nil)))
(for-each
(fn
(i)
(let
((t (nth (er-sched-timers) i)))
(when
(get t :alive)
(er-event-consider!
best
{:kind "timer" :deadline (get t :deadline) :timer t}))))
(range 0 (len (er-sched-timers))))
(for-each
(fn
(k)
(when
(not (nth fired 0))
(let
((p (get (er-sched-processes) k)))
(when
(and
(= (get p :state) "waiting")
(get p :has-timeout))
(dict-set! p :timed-out true)
(dict-set! p :has-timeout false)
(dict-set! p :state "runnable")
(er-sched-enqueue! (get p :pid))
(set-nth! fired 0 true)))))
ks)
(nth fired 0))))
(let
((p (get (er-sched-processes) k)))
(when
(and (= (get p :state) "waiting") (get p :has-timeout))
(er-event-consider!
best
{:kind "recv"
:deadline (get p :timeout-deadline)
:proc p}))))
(keys (er-sched-processes)))
(nth best 0))))
;; Keep the earlier-deadline candidate in the single-cell `best`.
;; Strictly-earlier replaces; equal deadlines keep the incumbent so a
;; timer registered first (and timers over recv-timeouts) win ties.
(define
er-event-consider!
(fn
(best cand)
(when
(or
(= (nth best 0) nil)
(< (get cand :deadline) (get (nth best 0) :deadline)))
(set-nth! best 0 cand))))
;; Deliver a fired timer's message to its destination and retire it.
;; Destination is resolved at fire time; a dead/missing target (or an
;; unregistered name) silently drops the message, as in real Erlang.
(define
er-timer-fire!
(fn
(t)
(dict-set! t :alive false)
(let
((pid (er-timer-resolve-dest (get t :dest))))
(when
(and (not (= pid nil)) (er-proc-exists? pid))
(er-proc-mailbox-push! pid (get t :msg))
(when
(= (er-proc-field pid :state) "waiting")
(er-proc-set! pid :state "runnable")
(er-sched-enqueue! pid))))))
;; Non-raising destination resolver for timer delivery.
(define
er-timer-resolve-dest
(fn
(v)
(cond
(er-pid? v) v
(er-atom? v)
(let
((name (get v :name)))
(if (dict-has? (er-registered) name) (get (er-registered) name) nil))
:else nil)))
;; Wake a process whose `receive ... after Ms` deadline elapsed.
(define
er-recv-timeout-fire!
(fn
(p)
(dict-set! p :timed-out true)
(dict-set! p :has-timeout false)
(dict-set! p :state "runnable")
(er-sched-enqueue! (get p :pid))))
(define
er-sched-step!
@@ -1177,8 +1378,15 @@
{reply, Reply, NewState} ->
From ! {Ref, Reply},
gen_server:loop(Mod, NewState);
{reply, Reply, NewState, Timeout} ->
From ! {Ref, Reply},
erlang:send_after(Timeout, self(), {timeout}),
gen_server:loop(Mod, NewState);
{noreply, NewState} ->
gen_server:loop(Mod, NewState);
{noreply, NewState, Timeout} ->
erlang:send_after(Timeout, self(), {timeout}),
gen_server:loop(Mod, NewState);
{stop, Reason, Reply, NewState} ->
From ! {Ref, Reply},
exit(Reason)
@@ -1186,11 +1394,17 @@
{'$gen_cast', Msg} ->
case Mod:handle_cast(Msg, State) of
{noreply, NewState} -> gen_server:loop(Mod, NewState);
{noreply, NewState, Timeout} ->
erlang:send_after(Timeout, self(), {timeout}),
gen_server:loop(Mod, NewState);
{stop, Reason, NewState} -> exit(Reason)
end;
Other ->
case Mod:handle_info(Other, State) of
{noreply, NewState} -> gen_server:loop(Mod, NewState);
{noreply, NewState, Timeout} ->
erlang:send_after(Timeout, self(), {timeout}),
gen_server:loop(Mod, NewState);
{stop, Reason, NewState} -> exit(Reason)
end
end.")
@@ -1785,6 +1999,10 @@
(er-register-bif! "erlang" "exit" 1 er-bif-exit)
(er-register-bif! "erlang" "exit" 2 er-bif-exit)
(er-register-bif! "erlang" "make_ref" 0 er-bif-make-ref)
(er-register-bif! "erlang" "send_after" 3 er-bif-send-after)
(er-register-bif! "erlang" "cancel_timer" 1 er-bif-cancel-timer)
(er-register-bif! "erlang" "monotonic_time" 0 er-bif-monotonic-time)
(er-register-bif! "erlang" "monotonic_time" 1 er-bif-monotonic-time)
(er-register-bif! "erlang" "link" 1 er-bif-link)
(er-register-bif! "erlang" "unlink" 1 er-bif-unlink)
(er-register-bif! "erlang" "monitor" 2 er-bif-monitor)

View File

@@ -1,7 +1,7 @@
{
"language": "erlang",
"total_pass": 761,
"total": 761,
"total_pass": 771,
"total": 771,
"suites": [
{"name":"tokenize","pass":62,"total":62,"status":"ok"},
{"name":"parse","pass":52,"total":52,"status":"ok"},
@@ -13,6 +13,7 @@
{"name":"echo","pass":7,"total":7,"status":"ok"},
{"name":"fib","pass":8,"total":8,"status":"ok"},
{"name":"ffi","pass":37,"total":37,"status":"ok"},
{"name":"vm","pass":78,"total":78,"status":"ok"}
{"name":"vm","pass":78,"total":78,"status":"ok"},
{"name":"send_after","pass":10,"total":10,"status":"ok"}
]
}

View File

@@ -1,6 +1,6 @@
# Erlang-on-SX Scoreboard
**Total: 761 / 761 tests passing**
**Total: 771 / 771 tests passing**
| | Suite | Pass | Total |
|---|---|---|---|
@@ -15,6 +15,7 @@
| ✅ | fib | 8 | 8 |
| ✅ | ffi | 37 | 37 |
| ✅ | vm | 78 | 78 |
| ✅ | send_after | 10 | 10 |
Generated by `lib/erlang/conformance.sh`.

View File

@@ -0,0 +1,163 @@
;; erlang:send_after / cancel_timer — timer primitives.
;;
;; A process schedules a message to itself (or another pid / registered
;; name) after N logical milliseconds. `cancel_timer` removes a pending
;; timer and reports the time left. These are the same primitives the
;; gen_server library uses to implement `{noreply, State, Timeout}`.
;;
;; The scheduler runs a synchronous logical clock (see runtime.sx
;; `er-sched-advance-time!`): time advances only when the runnable
;; queue drains, jumping to the earliest pending deadline. That makes
;; delivery deterministic and time-travel-safe — no wall clock.
(define er-sa-test-count 0)
(define er-sa-test-pass 0)
(define er-sa-test-fails (list))
(define
er-sa-test
(fn
(name actual expected)
(set! er-sa-test-count (+ er-sa-test-count 1))
(if
(= actual expected)
(set! er-sa-test-pass (+ er-sa-test-pass 1))
(append!
er-sa-test-fails
{:actual actual :expected expected :name name}))))
(define er-sa-pred
(fn (name actual) (er-sa-test name (if actual true false) true)))
(define sa-ev erlang-eval-ast)
;; ── T1 — schedule a self-message, receive it after the deadline ──
;; send_after returns a reference handle.
(er-sa-pred
"T1 send_after returns a ref"
(er-ref?
(sa-ev "erlang:send_after(50, self(), hello)")))
;; The scheduled message lands and a plain receive picks it up.
(er-sa-test
"T1 delivered message received"
(get
(sa-ev
"erlang:send_after(50, self(), hello),
receive M -> M end")
:name)
"hello")
;; Logical time advances exactly to the timer deadline (50ms) by the
;; time the message is received — round-trip latency well under 100ms.
(er-sa-test
"T1 clock at deadline on receipt"
(sa-ev
"erlang:send_after(50, self(), hello),
receive hello -> erlang:monotonic_time() end")
50)
;; ── T2 — cancel_timer returns remaining ms; message never arrives ──
;; Cancel immediately after scheduling: clock has not advanced, so the
;; full duration (~1000ms) is reported as remaining.
(er-sa-test
"T2 cancel returns remaining ms"
(sa-ev
"Ref = erlang:send_after(1000, self(), late),
erlang:cancel_timer(Ref)")
1000)
;; The cancelled timer never delivers — the receive falls through to
;; its `after` clause and returns `none`.
(er-sa-test
"T2 cancelled message never arrives"
(get
(sa-ev
"Ref = erlang:send_after(1000, self(), late),
erlang:cancel_timer(Ref),
receive late -> got after 50 -> none end")
:name)
"none")
;; ── T3 — multiple timers fire in deadline order, not schedule order ──
;; `b` is scheduled first (deadline 80) but `a` second (deadline 20).
;; Two plain receives drain the mailbox in arrival order — and arrival
;; is governed by deadline, so the first message out is `a`.
(er-sa-test
"T3 timers fire in deadline order"
(er-format-value
(sa-ev
"erlang:send_after(80, self(), b),
erlang:send_after(20, self(), a),
X = receive M1 -> M1 end,
Y = receive M2 -> M2 end,
{X, Y}"))
"{a,b}")
;; A selective receive on `a` matches the earlier-deadline timer even
;; though `b` was scheduled first.
(er-sa-test
"T3 selective receive picks earliest deadline"
(get
(sa-ev
"erlang:send_after(80, self(), b),
erlang:send_after(20, self(), a),
receive a -> first end")
:name)
"first")
;; ── T4 — cancel_timer on an already-fired timer returns false ──────
;; Once `x` has been received the timer has fired; cancelling its ref
;; now yields the atom `false`.
(er-sa-test
"T4 cancel of fired timer is false"
(get
(sa-ev
"Ref = erlang:send_after(20, self(), x),
receive x -> ok end,
erlang:cancel_timer(Ref)")
:name)
"false")
;; ── T5 — send_after to a registered atom name ──────────────────────
;; A second process registers itself as `srv`; the timer addresses it
;; by name, and the delayed message lands in that process's mailbox.
;; The server forwards what it got back to the parent for inspection.
(er-sa-test
"T5 timer delivers to registered name"
(get
(sa-ev
"Me = self(),
Pid = spawn(fun () -> receive M -> Me ! {got, M} end end),
register(srv, Pid),
erlang:send_after(20, srv, ping),
receive {got, X} -> X end")
:name)
"ping")
;; ── T6 — gen_server {noreply, State, Timeout} hookup ───────────────
;; A gen_server that, on the `arm` cast, returns {noreply, S, 100}.
;; The library schedules {timeout} to itself via send_after; when no
;; other message arrives first, handle_info({timeout}, S) fires. The
;; handler signals the parent so we can confirm the timeout landed.
(do
(er-load-gen-server!)
(erlang-load-module
"-module(sa_tmo).
init(Me) -> {ok, Me}.
handle_call(_R, _F, S) -> {reply, ok, S}.
handle_cast(arm, Me) -> {noreply, Me, 100}.
handle_info({timeout}, Me) -> Me ! fired, {noreply, Me};
handle_info(_M, S) -> {noreply, S}.")
nil)
(er-sa-test
"T6 gen_server timeout fires handle_info"
(get
(sa-ev
"Me = self(),
P = gen_server:start_link(sa_tmo, Me),
gen_server:cast(P, arm),
receive fired -> ok after 5000 -> timeout end")
:name)
"ok")

View File

@@ -1147,7 +1147,7 @@
(and (er-atom? ms) (= (get ms :name) "infinity"))
(er-eval-receive-loop node pid env)
(= ms 0) (er-eval-receive-poll node pid env)
:else (er-eval-receive-timed node pid env)))))
:else (er-eval-receive-timed node pid env (+ (er-clock) ms))))))
;; after 0 — poll once; on no match, run the after-body immediately.
(define
@@ -1161,12 +1161,15 @@
(get r :value)
(er-eval-body (get node :after-body) env)))))
;; after Ms — suspend; on resume check :timed-out. When the scheduler
;; runs out of other work it fires one pending timeout per round.
;; after Ms — suspend with an absolute `deadline` (logical ms). On
;; resume check :timed-out: the scheduler fires the earliest pending
;; deadline once the runnable queue drains. A non-matching message can
;; wake the process early; it re-suspends on the SAME deadline so the
;; timeout window is not extended.
(define
er-eval-receive-timed
(fn
(node pid env)
(node pid env deadline)
(let
((r (er-try-receive (get node :clauses) pid env)))
(if
@@ -1174,6 +1177,7 @@
(get r :value)
(do
(er-proc-set! pid :has-timeout true)
(er-proc-set! pid :timeout-deadline deadline)
(call/cc
(fn
(k)
@@ -1186,7 +1190,7 @@
(er-proc-set! pid :timed-out false)
(er-proc-set! pid :has-timeout false)
(er-eval-body (get node :after-body) env))
(er-eval-receive-timed node pid env)))))))
(er-eval-receive-timed node pid env deadline)))))))
;; Scan mailbox in arrival order. For each msg, try every clause.
;; On first match: remove that msg from mailbox and return body value.

91
next/flow/README.md Normal file
View File

@@ -0,0 +1,91 @@
# flow-on-erlang — durable workflows in the fed-sx runtime
A native Erlang-on-SX port of the Scheme flow engine (`lib/flow`), so
the fed-sx kernel can fan arriving activities out into durable,
branching, multi-step business flows **in its own runtime** — no
cross-guest FFI, no marshalling, no Scheme dependency. The seed of a
real engine that can later supersede the Scheme one for substrate use.
Run the suite: `bash next/flow/conformance.sh` → engine conformance.
## Model
A **flow** is an Erlang `fun(Ctx) -> Ctx`. Combinators (`flow_spec`)
compose flows; user code stays value-level (the functions you hand to
`flow_node`/`branch`/… take and return plain values). A flow that
ignores its input is a thunk; composition *is* function composition.
```erlang
F = flow_spec:sequence([
flow_spec:flow_node(fun(Draft) -> Draft + 1 end),
flow_spec:branch(fun(P) -> P >= 3 end,
flow_spec:flow_const(ok),
flow_spec:flow_const(rejected))]),
flow:run(F, 2) %% => {flow_done, ok}
```
## Durability — deterministic replay
Same semantics as the Scheme engine: a flow re-runs from the top on
every resume; effects/non-determinism go through `flow:suspend/1`,
whose resolved values are logged; an already-resolved suspend replays
its logged value, and the first unresolved suspend short-circuits back
to the driver. The persisted state is the **replay log** — plain
`[{Tag, Value}]` data — so nothing live (no continuation, no process)
is ever serialized; an instance survives restart by re-driving its
named flow against its log.
```erlang
flow_store:register_flow(publish, F),
{ok, Id, R} = flow_store:start(publish, Draft), %% R = {flow_suspended, Tag} | {flow_done, V}
%% ... driver performs the effect for Tag, then:
flow_store:resume(Id, EffectResult) %% re-drives; completes or suspends again
```
## Why railway threading instead of call/cc + a global
The Scheme engine uses an escape-only `call/cc` plus a mutable global
replay log. This Erlang-on-SX runtime can't do either, and has a third
sharp edge:
- **No re-enterable continuation** — but suspend only needs to *escape*,
which Erlang `throw` could do …
- **… except a blocking `receive` / `gen_server:call` inside a `try`
deadlocks** the cooperative scheduler. So `suspend` must not consult
the log via a registry process while inside a `try`.
- **No process dictionary** — so there is no ambient per-process slot to
stash the replay log in.
The resolution: thread the replay log through a railway-style **context**
and make `suspend` *short-circuit* (like a `fail` value) rather than
throw. No ambient state, no throw, no gen_server in the hot path —
purely functional, which sidesteps all three constraints. The driver
(`flow_store`) is the only stateful part, and it calls the pure
`flow:drive/3` from inside `handle_call`, never wrapping a blocking
receive.
A `Ctx` is `{flow_cont, Value, Log}` (running) or `{flow_susp, Tag,
Log}` (short-circuited); every combinator passes a suspended context
straight through.
## Modules
| Module | Role |
|---|---|
| `flow.erl` | pure replay driver: `drive/3`, `run/2`, `suspend/1`, the `Ctx` constructors/accessors |
| `flow_spec.erl` | combinator algebra: leaves, `sequence`/`parallel`/`map_flow`, `flow_while`/`flow_until`, `branch`, railway `fail`/`recover`/`attempt`, `tap`, `try_catch`/`retry` |
| `flow_store.erl` | durable gen_server: named-flow registry + instance table + `start`/`resume`/`status` |
## Consumed by
The fed-sx kernel's trigger fan-out (`pipeline.erl` + `flow_dispatch`)
starts named flows from arriving activities; see
`plans/fed-sx-host-types.md` and the triggers phases.
## Not yet (later layers)
- Persisting instance logs to the kernel's durable on-disk log (the
data shape is already restart-ready; only the backing is in-memory).
- `parallel` with multiple independent suspends resolving concurrently
(current `parallel` is sequential under one shared log).
- Full parity with the Scheme engine's distributed/remote nodes.

206
next/flow/conformance.sh Executable file
View File

@@ -0,0 +1,206 @@
#!/usr/bin/env bash
# next/flow/conformance.sh — flow-on-erlang engine conformance.
#
# Exercises the native Erlang-on-SX durable workflow engine
# (next/flow/{flow,flow_spec,flow_store}.erl): the combinator algebra,
# the deterministic-replay suspend/resume core, and the durable store.
# This is the gate for the engine, replacing lib/flow/conformance.sh
# (the Scheme engine) for the fed-sx substrate — the kernel's trigger
# fan-out drives flows in its own runtime, with no cross-guest FFI.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
# Common combinator shorthands built per-epoch (Erlang locals don't
# survive across erlang-eval-ast calls; the gen_server state does).
N1='flow_spec:flow_node(fun(X) -> X + 1 end)'
N2='flow_spec:flow_node(fun(X) -> X * 2 end)'
SUSP_FLOW='flow_spec:sequence([flow_spec:flow_node(fun(X) -> X + 1 end), flow:suspend(wait1), flow_spec:flow_node(fun(V) -> {resumed, V} end)])'
TWO_SUSP='flow_spec:sequence([flow:suspend(a), flow_spec:flow_node(fun(V) -> V * 10 end), flow:suspend(b), flow_spec:flow_node(fun(V) -> V + 1 end)])'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(er-load-gen-server!)")
(epoch 3)
(eval "(get (erlang-load-module (file-read \"next/flow/flow.erl\")) :name)")
(epoch 4)
(eval "(get (erlang-load-module (file-read \"next/flow/flow_spec.erl\")) :name)")
(epoch 5)
(eval "(get (erlang-load-module (file-read \"next/flow/flow_store.erl\")) :name)")
;; ── leaves ─────────────────────────────────────────────────
(epoch 10)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:flow_id(), 7) =:= {flow_done, 7}\") :name)")
(epoch 11)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:flow_const(k), 7) =:= {flow_done, k}\") :name)")
(epoch 12)
(eval "(get (erlang-eval-ast \"flow:run(${N1}, 41) =:= {flow_done, 42}\") :name)")
;; ── threading / fan-out / iteration ────────────────────────
(epoch 20)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:sequence([${N1}, ${N2}]), 3) =:= {flow_done, 8}\") :name)")
(epoch 21)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:parallel([flow_spec:flow_const(a), flow_spec:flow_const(b)]), 0) =:= {flow_done, [a, b]}\") :name)")
(epoch 22)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:map_flow(${N1}), [1, 2, 3]) =:= {flow_done, [2, 3, 4]}\") :name)")
(epoch 23)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:flow_while(fun(X) -> X < 10 end, ${N1}, 100), 0) =:= {flow_done, 10}\") :name)")
(epoch 24)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:flow_until(fun(X) -> X >= 5 end, ${N1}, 100), 0) =:= {flow_done, 5}\") :name)")
(epoch 25)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:flow_while(fun(_) -> true end, ${N1}, 3), 0) =:= {flow_done, 3}\") :name)")
;; ── branching ──────────────────────────────────────────────
(epoch 30)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:branch(fun(X) -> X > 0 end, flow_spec:flow_const(pos), flow_spec:flow_const(neg)), 5) =:= {flow_done, pos}\") :name)")
(epoch 31)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:branch(fun(X) -> X > 0 end, flow_spec:flow_const(pos), flow_spec:flow_const(neg)), -5) =:= {flow_done, neg}\") :name)")
;; ── railway failure ────────────────────────────────────────
(epoch 40)
(eval "(get (erlang-eval-ast \"flow_spec:failed(flow_spec:fail(x)) andalso (flow_spec:failed(42) =:= false)\") :name)")
(epoch 41)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:attempt([flow_spec:flow_node(fun(_) -> flow_spec:fail(boom) end), flow_spec:flow_node(fun(_) -> 999 end)]), 0) =:= {flow_done, {flow_fail, boom}}\") :name)")
(epoch 42)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:attempt([${N1}, ${N2}]), 3) =:= {flow_done, 8}\") :name)")
(epoch 43)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:recover(flow_spec:flow_node(fun(_) -> flow_spec:fail(bad) end), fun(R) -> {ok, R} end), 0) =:= {flow_done, {ok, bad}}\") :name)")
;; ── effects / exceptions ───────────────────────────────────
(epoch 50)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:tap(fun(_) -> ok end), 7) =:= {flow_done, 7}\") :name)")
(epoch 51)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:try_catch(flow_spec:flow_node(fun(_) -> throw(oops) end), fun(E) -> {caught, E} end), 0) =:= {flow_done, {caught, oops}}\") :name)")
(epoch 52)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:retry(5, flow_spec:flow_node(fun(X) -> X + 1 end)), 1) =:= {flow_done, 2}\") :name)")
;; ── suspend / replay (deterministic-replay core) ───────────
(epoch 60)
(eval "(get (erlang-eval-ast \"flow:run(${SUSP_FLOW}, 0) =:= {flow_suspended, wait1}\") :name)")
(epoch 61)
(eval "(get (erlang-eval-ast \"flow:drive(${SUSP_FLOW}, 0, [{wait1, 99}]) =:= {flow_done, {resumed, 99}}\") :name)")
(epoch 62)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:sequence([flow:suspend(a), flow:suspend(b)]), 0) =:= {flow_suspended, a}\") :name)")
;; wait/1 — timer-style suspend that PRESERVES the value on resume
(epoch 63)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:sequence([flow:wait(t), flow_spec:flow_node(fun(X) -> X + 1 end)]), 5) =:= {flow_suspended, t}\") :name)")
(epoch 64)
(eval "(get (erlang-eval-ast \"flow:drive(flow_spec:sequence([flow:wait(t), flow_spec:flow_node(fun(X) -> X + 1 end)]), 5, [{t, ignored}]) =:= {flow_done, 6}\") :name)")
;; ── durable store: registry ────────────────────────────────
(epoch 70)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(f1, ${N1}), flow_store:resolve_flow(f1) =/= not_found andalso flow_store:registered_flows() =:= [f1]\") :name)")
(epoch 71)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:resolve_flow(ghost) =:= not_found\") :name)")
;; ── durable store: start / resume ──────────────────────────
;; one-shot flow runs to completion on start
(epoch 80)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(done1, ${N1}), flow_store:start(done1, 41) =:= {ok, 1, {flow_done, 42}}\") :name)")
;; suspending flow: start suspends, resume completes
(epoch 81)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(s1, ${SUSP_FLOW}), {ok, Id, R} = flow_store:start(s1, 10), R =:= {flow_suspended, wait1} andalso flow_store:status(Id) =:= {ok, {suspended, wait1}}\") :name)")
(epoch 82)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(s1, ${SUSP_FLOW}), {ok, Id, _} = flow_store:start(s1, 10), flow_store:resume(Id, 99) =:= {ok, {flow_done, {resumed, 99}}}\") :name)")
(epoch 83)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(s1, ${SUSP_FLOW}), {ok, Id, _} = flow_store:start(s1, 10), flow_store:resume(Id, 99), flow_store:status(Id) =:= {ok, {done, {resumed, 99}}}\") :name)")
;; two-suspend flow: resume chain accumulates the replay log
(epoch 84)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(s2, ${TWO_SUSP}), {ok, Id, _} = flow_store:start(s2, 0), {ok, R1} = flow_store:resume(Id, 5), R2 = flow_store:resume(Id, 7), R1 =:= {flow_suspended, b} andalso R2 =:= {ok, {flow_done, 8}}\") :name)")
;; error paths
(epoch 85)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:start(ghost, 0) =:= {error, no_such_flow}\") :name)")
(epoch 86)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:resume(999, x) =:= {error, no_such_instance}\") :name)")
(epoch 87)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(done1, ${N1}), {ok, Id, _} = flow_store:start(done1, 0), flow_store:resume(Id, x) =:= {error, already_done}\") :name)")
EPOCHS
OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 3 "flow module loaded" "flow"
check 4 "flow_spec module loaded" "flow_spec"
check 5 "flow_store module loaded" "flow_store"
check 10 "flow_id" "true"
check 11 "flow_const" "true"
check 12 "flow_node" "true"
check 20 "sequence threads left-to-right" "true"
check 21 "parallel fans out" "true"
check 22 "map_flow over a list" "true"
check 23 "flow_while bounded by pred" "true"
check 24 "flow_until bounded by pred" "true"
check 25 "flow_while bounded by max" "true"
check 30 "branch then-arm" "true"
check 31 "branch else-arm" "true"
check 40 "failed? predicate" "true"
check 41 "attempt stops at first fail" "true"
check 42 "attempt threads on success" "true"
check 43 "recover handles fail value" "true"
check 50 "tap pass-through" "true"
check 51 "try_catch catches a raise" "true"
check 52 "retry runs node" "true"
check 60 "suspend miss short-circuits" "true"
check 61 "suspend replay completes" "true"
check 62 "first of two suspends wins" "true"
check 63 "wait short-circuits on miss" "true"
check 64 "wait preserves value on resume" "true"
check 70 "register + resolve + list" "true"
check 71 "resolve unknown -> not_found" "true"
check 80 "start one-shot -> done" "true"
check 81 "start suspends + status" "true"
check 82 "resume completes" "true"
check 83 "status after resume = done" "true"
check 84 "two-suspend resume chain" "true"
check 85 "start unknown -> no_such_flow" "true"
check 86 "resume unknown -> no_such_instance" "true"
check 87 "resume a done flow -> already_done" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL flow-on-erlang engine tests passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

102
next/flow/flow.erl Normal file
View File

@@ -0,0 +1,102 @@
-module(flow).
-export([drive/3, run/2,
cont/2, susp/2, is_susp/1, ctx_value/1, ctx_log/1,
suspend/1, wait/1, log_lookup/2]).
%% flow-on-erlang — the deterministic-replay core. A native Erlang port
%% of the Scheme flow engine (lib/flow), so the fed-sx kernel can fan
%% activities out into durable business flows in its own runtime (no
%% cross-guest FFI).
%%
%% Durability model — identical semantics to the Scheme engine, but
%% adapted to this Erlang-on-SX runtime, which has three hard
%% constraints the Scheme host doesn't: no escape continuation that can
%% be re-entered, no process dictionary, and (critically) a blocking
%% `receive` / `gen_server:call` inside a `try` deadlocks the
%% cooperative scheduler. So instead of the Scheme engine's
%% mutable-global + call/cc-escape, the replay log is THREADED through a
%% railway-style context and `suspend` SHORT-CIRCUITS (like a fail
%% value) rather than throwing. No ambient state, no throw, no
%% gen_server — purely functional, which sidesteps every constraint.
%%
%% A node is `fun(Ctx) -> Ctx`. A Ctx is one of:
%% {flow_cont, Value, Log} — running; Value is the current value
%% {flow_susp, Tag, Log} — short-circuited at suspend Tag
%% Log is the replay log: [{Tag, ResolvedValue}, ...]. Combinators
%% (flow_spec) thread Ctx and pass {flow_susp,...} straight through, so
%% once a flow suspends nothing downstream runs.
%%
%% suspend/1 is the load-bearing primitive: a node that, given the
%% running Ctx, looks Tag up in the replay log. A hit replaces the
%% current value with the logged value and continues; a miss
%% short-circuits to {flow_susp, Tag, Log}. ALL effects/non-determinism
%% go through a suspend node so they run once — in the driver, between
%% drives — and their results are logged, never re-run on replay. Tags
%% must be unique and deterministic across replays.
%% ── context constructors / accessors ────────────────────────────
cont(Value, Log) -> {flow_cont, Value, Log}.
susp(Tag, Log) -> {flow_susp, Tag, Log}.
is_susp({flow_susp, _, _}) -> true;
is_susp(_) -> false.
ctx_value({flow_cont, Value, _}) -> Value;
ctx_value({flow_susp, _, _}) -> undefined.
ctx_log({flow_cont, _, Log}) -> Log;
ctx_log({flow_susp, _, Log}) -> Log.
%% ── suspend node ────────────────────────────────────────────────
suspend(Tag) ->
fun (Ctx) ->
case Ctx of
{flow_susp, _, _} -> Ctx;
{flow_cont, _Value, Log} ->
case log_lookup(Tag, Log) of
{ok, Resolved} -> {flow_cont, Resolved, Log};
miss -> {flow_susp, Tag, Log}
end
end
end.
%% wait(Tag) — a timer-style suspend that PRESERVES the current value
%% instead of replacing it with the resolved one. Use it for pure
%% waits ("resume in the morning") where the resume is just a signal,
%% not a result: on the first pass it short-circuits like suspend; once
%% Tag is in the log the value flows through unchanged, so downstream
%% steps still see the value (e.g. the env) they had before the wait.
wait(Tag) ->
fun (Ctx) ->
case Ctx of
{flow_susp, _, _} -> Ctx;
{flow_cont, Value, Log} ->
case log_lookup(Tag, Log) of
{ok, _} -> {flow_cont, Value, Log};
miss -> {flow_susp, Tag, Log}
end
end
end.
log_lookup(_, []) -> miss;
log_lookup(Tag, [{Tag, Value} | _]) -> {ok, Value};
log_lookup(Tag, [_ | Rest]) -> log_lookup(Tag, Rest).
%% ── driver ──────────────────────────────────────────────────────
%% drive(Flow, Input, Log) — run Flow under the replay Log.
%% {flow_done, Result} — flow completed
%% {flow_suspended, Tag} — flow short-circuited at an unresolved
%% suspend; the driver resolves Tag, appends
%% {Tag, Value} to Log, and re-drives.
drive(Flow, Input, Log) ->
case Flow({flow_cont, Input, Log}) of
{flow_cont, Result, _} -> {flow_done, Result};
{flow_susp, Tag, _} -> {flow_suspended, Tag}
end.
%% run(Flow, Input) — drive with an empty replay log.
run(Flow, Input) ->
drive(Flow, Input, []).

240
next/flow/flow_spec.erl Normal file
View File

@@ -0,0 +1,240 @@
-module(flow_spec).
-export([flow_node/1, flow_id/0, flow_const/1,
sequence/1, parallel/1, map_flow/1,
flow_while/3, flow_until/3,
branch/3, fail/1, failed/1, fail_reason/1,
recover/2, tap/1, attempt/1, try_catch/2, retry/2]).
%% flow-on-erlang combinators — a native port of lib/flow/spec.sx,
%% adapted to the railway-threaded context model in flow.erl. A node is
%% `fun(Ctx) -> Ctx`; every combinator passes a {flow_susp,...} context
%% straight through, so once a flow suspends nothing downstream runs.
%% User code stays value-level: the predicates/functions handed to
%% flow_node / branch / etc. take and return plain values, and the
%% combinator threads them into the context.
%%
%% Variadic Scheme forms (sequence, parallel, attempt) take an explicit
%% list here — the one idiom difference from the Scheme engine. Effects
%% must go through a flow:suspend/1 node so they run once (in the
%% driver) and replay from the log; `tap` is only for replay-safe
%% effects (e.g. tracing).
%% ── leaves ──────────────────────────────────────────────────────
%% flow_node(F) — lift a value function F :: Value -> Value into a node.
flow_node(F) ->
fun (Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false -> flow:cont(F(flow:ctx_value(Ctx)), flow:ctx_log(Ctx))
end
end.
flow_id() ->
fun (Ctx) -> Ctx end.
flow_const(V) ->
fun (Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false -> flow:cont(V, flow:ctx_log(Ctx))
end
end.
%% ── threading / fan-out / iteration ─────────────────────────────
%% sequence(Nodes) — thread the context left-to-right. Each node
%% self-guards on suspension, so a suspended context flows through
%% untouched.
sequence(Nodes) ->
fun (Ctx) -> seq_step(Nodes, Ctx) end.
seq_step([], Ctx) -> Ctx;
seq_step([N | Ns], Ctx) -> seq_step(Ns, N(Ctx)).
%% parallel(Nodes) — fan the input value to every node, join results
%% into a list (sequential evaluation under one shared replay log).
%% First child to suspend short-circuits the whole parallel.
parallel(Nodes) ->
fun (Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false -> par_step(Nodes, flow:ctx_value(Ctx), flow:ctx_log(Ctx), [])
end
end.
par_step([], _Input, Log, Acc) ->
flow:cont(lists:reverse(Acc), Log);
par_step([N | Ns], Input, Log, Acc) ->
R = N(flow:cont(Input, Log)),
case flow:is_susp(R) of
true -> R;
false -> par_step(Ns, Input, Log, [flow:ctx_value(R) | Acc])
end.
%% map_flow(Node) — run Node over each item of a list input value.
map_flow(Node) ->
fun (Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false -> map_step(Node, flow:ctx_value(Ctx), flow:ctx_log(Ctx), [])
end
end.
map_step(_, [], Log, Acc) ->
flow:cont(lists:reverse(Acc), Log);
map_step(Node, [I | Is], Log, Acc) ->
R = Node(flow:cont(I, Log)),
case flow:is_susp(R) of
true -> R;
false -> map_step(Node, Is, Log, [flow:ctx_value(R) | Acc])
end.
%% flow_while(Pred, Body, Max) — re-run Body (a node), threading the
%% context, while Pred(value) holds, up to Max steps. Pred :: Value ->
%% bool; Body :: node.
flow_while(Pred, Body, Max) ->
fun (Ctx) -> while_step(Pred, Body, Ctx, Max) end.
while_step(_, _, Ctx, N) when N =< 0 -> Ctx;
while_step(Pred, Body, Ctx, N) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false ->
case Pred(flow:ctx_value(Ctx)) of
true -> while_step(Pred, Body, Body(Ctx), N - 1);
_ -> Ctx
end
end.
%% flow_until(Pred, Body, Max) — re-run Body until Pred(value) holds.
flow_until(Pred, Body, Max) ->
fun (Ctx) -> until_step(Pred, Body, Ctx, Max) end.
until_step(_, _, Ctx, N) when N =< 0 -> Ctx;
until_step(Pred, Body, Ctx, N) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false ->
case Pred(flow:ctx_value(Ctx)) of
true -> Ctx;
_ -> until_step(Pred, Body, Body(Ctx), N - 1)
end
end.
%% ── branching ───────────────────────────────────────────────────
%% branch(Pred, Then, Else) — Pred :: Value -> bool; Then/Else :: node.
branch(Pred, Then, Else) ->
fun (Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false ->
case Pred(flow:ctx_value(Ctx)) of
true -> Then(Ctx);
_ -> Else(Ctx)
end
end
end.
%% ── railway-style failure (values, not exceptions) ──────────────
fail(Reason) -> {flow_fail, Reason}.
failed({flow_fail, _}) -> true;
failed(_) -> false.
fail_reason({flow_fail, R}) -> R.
%% recover(Node, Handler) — if Node yields a fail VALUE, run Handler on
%% the reason; else pass through. Handler :: Reason -> Value.
recover(Node, Handler) ->
fun (Ctx) ->
R = Node(Ctx),
case flow:is_susp(R) of
true -> R;
false ->
V = flow:ctx_value(R),
case failed(V) of
true -> flow:cont(Handler(fail_reason(V)), flow:ctx_log(R));
false -> R
end
end
end.
%% tap(Effect) — replay-safe side-effecting pass-through (returns the
%% input value unchanged). Effect :: Value -> any.
tap(Effect) ->
fun (Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false -> Effect(flow:ctx_value(Ctx)), Ctx
end
end.
%% attempt(Nodes) — railway sequence: thread left-to-right but stop at
%% the first node whose value is a fail, returning that failure.
attempt(Nodes) ->
fun (Ctx) -> attempt_step(Nodes, Ctx) end.
attempt_step([], Ctx) -> Ctx;
attempt_step([N | Ns], Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false ->
case failed(flow:ctx_value(Ctx)) of
true -> Ctx;
false -> attempt_step(Ns, N(Ctx))
end
end.
%% ── exception-style control ─────────────────────────────────────
%% Nodes are pure (effects go through suspend, run by the driver), so a
%% try around a node never wraps a blocking receive — safe in this
%% runtime.
%% try_catch(Node, Handler) — run Node; if it raises, run Handler on the
%% exception. Handler :: Exception -> Value.
try_catch(Node, Handler) ->
fun (Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false ->
Log = flow:ctx_log(Ctx),
try Node(Ctx) of
R -> R
catch
throw:E -> flow:cont(Handler(E), Log);
error:E -> flow:cont(Handler(E), Log);
exit:E -> flow:cont(Handler(E), Log)
end
end
end.
%% retry(N, Node) — run Node, retrying up to N attempts on a raise.
retry(N, Node) ->
fun (Ctx) -> retry_step(N, Node, Ctx) end.
retry_step(N, Node, Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false ->
try Node(Ctx) of
R -> R
catch
throw:Reason -> retry_reraise(N, Node, Ctx, throw, Reason);
error:Reason -> retry_reraise(N, Node, Ctx, error, Reason);
exit:Reason -> retry_reraise(N, Node, Ctx, exit, Reason)
end
end.
retry_reraise(N, Node, Ctx, Class, Reason) ->
case N =< 1 of
false -> retry_step(N - 1, Node, Ctx);
true ->
case Class of
throw -> throw(Reason);
error -> erlang:error(Reason);
exit -> exit(Reason)
end
end.

161
next/flow/flow_store.erl Normal file
View File

@@ -0,0 +1,161 @@
-module(flow_store).
-export([start_link/0, start_link/1, stop/0,
register_flow/2, resolve_flow/1, registered_flows/0,
start/2, resume/2, status/1, instances/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-behaviour(gen_server).
%% flow-on-erlang durable store — the named-flow registry plus the
%% instance table that makes suspend/resume durable. flow.erl is the
%% pure replay driver; this gen_server is the stateful shell around it,
%% holding the registry (so triggers can reference flows by name, and
%% so an instance can be re-resolved + replayed after a restart) and
%% each instance's accumulated replay log.
%%
%% Crucially the driver stays OUT of any blocking context: start/resume
%% call flow:drive/3 (pure — no receive, no gen_server:call) from inside
%% handle_call, and the only message-passing is the caller's
%% gen_server:call into this store. (A blocking receive inside a `try`
%% deadlocks this cooperative scheduler, so the engine never does one.)
%%
%% State: {Registry, Instances, NextId}
%% Registry = [{Name, FlowFun}, ...]
%% Instances = [{Id, {Name, Input, Log, Status}}, ...]
%% Status = {suspended, Tag} | {done, Result}
%% Log = [{Tag, ResolvedValue}, ...] (the replay log — plain
%% data, so an instance is fully described by its log and
%% survives process restart by re-driving the named flow)
%%
%% v1 backs the store in gen_server memory; persisting the instance
%% logs to the kernel's durable log (so flows survive an OS restart) is
%% a later layer — the data shape is already restart-ready.
start_link() ->
start_link([]).
start_link(InitialFlows) ->
Pid = gen_server:start_link(flow_store, [InitialFlows]),
erlang:register(flow_store, Pid),
Pid.
stop() ->
R = gen_server:call(flow_store, '$gen_stop'),
erlang:unregister(flow_store),
R.
%% register_flow(Name, Flow) — register a named flow (a node fun). Named
%% rather than `register` to avoid the erlang:register/2 auto-import.
register_flow(Name, Flow) ->
gen_server:call(flow_store, {register_flow, Name, Flow}).
resolve_flow(Name) ->
gen_server:call(flow_store, {resolve_flow, Name}).
registered_flows() ->
gen_server:call(flow_store, registered_flows).
%% start(Name, Input) -> {ok, Id, Result} | {error, no_such_flow}.
%% Result is {flow_done, V} | {flow_suspended, Tag}; the instance is
%% recorded either way so a suspended flow can be resumed by Id.
start(Name, Input) ->
gen_server:call(flow_store, {start, Name, Input}).
%% resume(Id, Value) -> {ok, Result} | {error, Reason}. Resolves the
%% instance's current suspend tag with Value (appends {Tag, Value} to
%% its replay log) and re-drives from the top.
resume(Id, Value) ->
gen_server:call(flow_store, {resume, Id, Value}).
%% status(Id) -> {ok, {suspended, Tag}} | {ok, {done, Result}} | not_found
status(Id) ->
gen_server:call(flow_store, {status, Id}).
instances() ->
gen_server:call(flow_store, instances).
%% ── gen_server ──────────────────────────────────────────────────
init([InitialFlows]) ->
{ok, {InitialFlows, [], 1}}.
handle_call({register_flow, Name, Flow}, _From, {Reg, Ins, N}) ->
{reply, ok, {set_keyed(Name, Flow, Reg), Ins, N}};
handle_call({resolve_flow, Name}, _From, {Reg, Ins, N}) ->
{reply, find_keyed(Name, Reg), {Reg, Ins, N}};
handle_call(registered_flows, _From, {Reg, Ins, N}) ->
{reply, [Name || {Name, _} <- Reg], {Reg, Ins, N}};
handle_call({start, Name, Input}, _From, {Reg, Ins, N}) ->
case find_keyed(Name, Reg) of
not_found ->
{reply, {error, no_such_flow}, {Reg, Ins, N}};
{ok, Flow} ->
case safe_drive(Flow, Input, []) of
{ok, R} ->
Status = result_status(R),
Ins2 = set_keyed(N, {Name, Input, [], Status}, Ins),
{reply, {ok, N, R}, {Reg, Ins2, N + 1}};
{error, Crash} ->
{reply, {error, {flow_crashed, Crash}}, {Reg, Ins, N}}
end
end;
handle_call({resume, Id, Value}, _From, {Reg, Ins, N}) ->
case find_keyed(Id, Ins) of
not_found ->
{reply, {error, no_such_instance}, {Reg, Ins, N}};
{ok, {_Name, _Input, _Log, {done, _}}} ->
{reply, {error, already_done}, {Reg, Ins, N}};
{ok, {Name, Input, Log, {suspended, Tag}}} ->
case find_keyed(Name, Reg) of
not_found ->
{reply, {error, no_such_flow}, {Reg, Ins, N}};
{ok, Flow} ->
NewLog = log_append(Log, Tag, Value),
case safe_drive(Flow, Input, NewLog) of
{ok, R} ->
Status = result_status(R),
Ins2 = set_keyed(Id, {Name, Input, NewLog, Status}, Ins),
{reply, {ok, R}, {Reg, Ins2, N}};
{error, Crash} ->
{reply, {error, {flow_crashed, Crash}}, {Reg, Ins, N}}
end
end
end;
handle_call({status, Id}, _From, {Reg, Ins, N}) ->
case find_keyed(Id, Ins) of
{ok, {_Name, _Input, _Log, Status}} -> {reply, {ok, Status}, {Reg, Ins, N}};
not_found -> {reply, not_found, {Reg, Ins, N}}
end;
handle_call(instances, _From, {Reg, Ins, N}) ->
{reply, [Id || {Id, _} <- Ins], {Reg, Ins, N}}.
handle_cast(_, S) -> {noreply, S}.
handle_info(_, S) -> {noreply, S}.
%% ── helpers ─────────────────────────────────────────────────────
result_status({flow_done, R}) -> {done, R};
result_status({flow_suspended, T}) -> {suspended, T}.
%% safe_drive/3 — flow:drive is pure (no blocking receive), so a `try`
%% around it is safe in this runtime and isolates a flow whose step
%% raises: the store returns {error, {flow_crashed, _}} instead of the
%% gen_server crashing, keeping one bad flow from taking down others.
safe_drive(Flow, Input, Log) ->
try {ok, flow:drive(Flow, Input, Log)}
catch
throw:R -> {error, {throw, R}};
error:R -> {error, {error, R}};
exit:R -> {error, {exit, R}}
end.
log_append([], Tag, Value) -> [{Tag, Value}];
log_append([H | T], Tag, Value) -> [H | log_append(T, Tag, Value)].
find_keyed(_, []) -> not_found;
find_keyed(K, [{K, V} | _]) -> {ok, V};
find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest).
set_keyed(K, V, []) -> [{K, V}];
set_keyed(K, V, [{K, _} | Rest]) -> [{K, V} | Rest];
set_keyed(K, V, [P | Rest]) -> [P | set_keyed(K, V, Rest)].

View File

@@ -0,0 +1,81 @@
-module(blog_publish_digest).
-export([build/1]).
%% A motivating multi-step business flow for the fed-sx-triggers e2e:
%% when an Article is published, decide a batch policy by category,
%% (for newsletters) wait until morning, fetch the author's followers,
%% build a digest email for each, and emit a DigestSent activity — the
%% flow's own output, which a driver appends, closing the loop so it can
%% trigger downstream flows.
%%
%% Demonstrates: a branch on an activity field (:category), a timer
%% suspension (flow:wait/1, resumed by advancing the clock), an injected
%% effect (fetch_followers), and a follow-up activity emit.
%%
%% Effect-as-data: a flow runs inside flow_store's drive, where a
%% blocking call (e.g. into nx_kernel) would deadlock this scheduler, so
%% the flow does NOT perform IO itself. It DESCRIBES the effects in its
%% result — {digest_sent, Emails, DigestActivityObject} — and the driver
%% (the fan-out caller) dispatches the emails and appends the DigestSent
%% activity. fetch_followers is injected (the one external read) as a
%% pure function so the e2e can supply a deterministic list.
%%
%% Input env (from flow_dispatch): [{activity, A}, {actor, Actor}, ...].
%% Result: {digest_sent, [Email], DigestObject} | skipped.
build(Effects) ->
FetchFollowers = field(fetch_followers, Effects),
flow_spec:branch(
fun (Env) -> is_article(Env) end,
flow_spec:branch(
fun (Env) -> category_is(Env, newsletter) end,
%% newsletter: hold until morning, then send + emit
flow_spec:sequence([flow:wait(morning), send_emit(FetchFollowers)]),
flow_spec:branch(
fun (Env) -> category_is(Env, urgent) end,
%% urgent: send + emit now (no wait)
send_emit(FetchFollowers),
%% any other category: skip
flow_spec:flow_const(skipped))),
%% not an Article: skip
flow_spec:flow_const(skipped)).
%% send_emit(FetchFollowers) — the terminal step: build one digest email
%% per follower and the DigestSent emit object. Pure given the injected
%% follower list, so it is replay-safe (and it sits after the only
%% suspend point, so it runs exactly once).
send_emit(FetchFollowers) ->
flow_spec:flow_node(
fun (Env) ->
Activity = env_activity(Env),
Actor = env_actor(Env),
ArtId = activity_id(Activity),
Followers = FetchFollowers(Actor),
Emails = [ [{to, F}, {article, ArtId}] || F <- Followers ],
Digest = [{type, digest_sent},
{for, ArtId},
{follower_count, length(Followers)}],
{digest_sent, Emails, Digest}
end).
%% ── predicates / accessors ──────────────────────────────────────
is_article(Env) ->
object_type(object_of(env_activity(Env))) =:= article.
category_is(Env, Cat) ->
object_category(object_of(env_activity(Env))) =:= Cat.
env_activity(Env) -> field(activity, Env).
env_actor(Env) -> field(actor, Env).
object_of(Activity) -> field(object, Activity).
object_type(Obj) -> field(type, Obj).
object_category(Obj) -> field(category, Obj).
activity_id(Activity) -> field(id, Activity).
field(Key, Proplist) ->
case envelope:get_field(Key, Proplist) of
{ok, V} -> V;
_ -> undefined
end.

View File

@@ -0,0 +1,33 @@
;; next/genesis/activity-types/define_trigger.sx
;;
;; Bootstrap definition of the DefineTrigger verb per
;; plans/agent-briefings/fed-sx-triggers-loop.md (Phase 1) and
;; plans/fed-sx-design.md §13. Read as data by the bundler
;; (bootstrap.erl) — never evaluated as code.
;;
;; DefineTrigger binds an activity-type to a flow. When a matching
;; activity is appended to the log, the kernel's trigger fan-out
;; (pipeline.erl, post-append) looks the type up in the trigger
;; registry and starts the named flow with the activity as input.
;; The activity's :object is the binding record:
;; {:activity-type "Create" ;; the verb to fire on
;; :flow-name "blog-publish-digest"
;; :guard <optional predicate> ;; discriminator
;; :actor-scope <optional actor id>} ;; default: any
;;
;; The schema validates the *activity* shape: :object present with
;; string :activity-type and :flow-name. The optional :guard lets one
;; type bind to multiple flows with discriminators; it is resolved to
;; an Erlang predicate at registration time (trigger_registry), not
;; carried in the pure-predicate schema here. Schema bodies use nested
;; `get` (not keyword-threading) so the predicate is evaluatable.
(DefineActivity
:name "DefineTrigger"
:doc "Bind an activity-type to a flow. :object carries :activity-type, :flow-name, and optional :guard and :actor-scope."
:schema (fn
(act)
(and
(not (nil? (get act :object)))
(string? (get (get act :object) :activity-type))
(string? (get (get act :object) :flow-name))))
:semantics (fn (state act) state))

View File

@@ -0,0 +1,34 @@
;; next/genesis/activity-types/define_type.sx
;;
;; Bootstrap definition of the DefineType verb per
;; plans/fed-sx-host-types.md (host-type federation, Phase 1).
;; Read as data by the bundler (bootstrap.erl) — never evaluated as
;; code. The :schema and :semantics bodies are SX source.
;;
;; DefineType declares a refinement type. The activity's :object is
;; the type record:
;; {:name "Post" ;; the type's display name
;; :fields (...) ;; optional field descriptors
;; :refinement-schema (fn (obj) ...) ;; predicate over instances
;; :instance-type "Note"} ;; base object-type it refines
;;
;; The schema below validates the *activity* shape: :object present,
;; :name a string, :fields (when present) a list. The richer
;; per-field shape check and the registry registration land with the
;; peer_types cache (Phase 2) — at this phase the form is pure data.
;;
;; Schema bodies use nested `get` rather than keyword-threading so
;; the predicate is directly evaluatable (keywords are not callable
;; getters in the kernel; `(-> d :k)` is not a get).
(DefineActivity
:name "DefineType"
:doc "Declare a refinement type. :object carries :name, optional :fields, :refinement-schema, and :instance-type."
:schema (fn
(act)
(and
(not (nil? (get act :object)))
(string? (get (get act :object) :name))
(or
(nil? (get (get act :object) :fields))
(list? (get (get act :object) :fields)))))
:semantics (fn (state act) state))

View File

@@ -0,0 +1,31 @@
;; next/genesis/activity-types/subtype_of.sx
;;
;; Bootstrap definition of the SubtypeOf verb per
;; plans/fed-sx-host-types.md (host-type federation, Phase 1).
;; Read as data by the bundler (bootstrap.erl) — never evaluated as
;; code. The :schema and :semantics bodies are SX source.
;;
;; SubtypeOf records a hierarchy edge between two previously-defined
;; types. The activity's :object is the relation record:
;; {:child-type-cid "bafy...child"
;; :parent-type-cid "bafy...parent"}
;;
;; The schema validates the *activity* shape: both CIDs present and
;; string-typed. Verifying that each CID names a previously-defined
;; type is a registry concern (it needs the type index that lands
;; with peer_types in Phase 2), so it is deliberately out of the
;; pure-predicate schema here — adding the edge to the hierarchy
;; index is the :semantics' job once the registry surface exists.
;;
;; Schema bodies use nested `get` rather than keyword-threading so
;; the predicate is directly evaluatable.
(DefineActivity
:name "SubtypeOf"
:doc "Record a subtype edge. :object carries :child-type-cid and :parent-type-cid, both type CIDs."
:schema (fn
(act)
(and
(not (nil? (get act :object)))
(string? (get (get act :object) :child-type-cid))
(string? (get (get act :object) :parent-type-cid))))
:semantics (fn (state act) state))

View File

@@ -22,7 +22,10 @@
"activity-types/update.sx"
"activity-types/delete.sx"
"activity-types/announce.sx"
"activity-types/endorse.sx")
"activity-types/endorse.sx"
"activity-types/define_type.sx"
"activity-types/subtype_of.sx"
"activity-types/define_trigger.sx")
:object-types ("object-types/sx-artifact.sx"
"object-types/note.sx"
"object-types/tombstone.sx"

View File

@@ -5,9 +5,10 @@
backoff_for/1, schedule_for/1,
record_failure_pure/3, record_success_pure/2,
next_due_pure/2, attempts_for/2, next_retry_at/2,
dead_letter_list/1,
dead_letter_list/1, timer_ref_for/2,
start_link/1, start_link/2, stop/1,
enqueue/2, flush/1, pending_srv/1, set_dispatch_fn/2]).
enqueue/2, flush/1, pending_srv/1, set_dispatch_fn/2,
state_srv/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
%% Outbound delivery worker per design §13.4. One gen_server per
@@ -49,6 +50,7 @@ new(PeerId) ->
{attempts, []},
{next_retry, []},
{dead_letter, []},
{timers, []},
{dispatch_fn, undefined}].
pending(State) -> field(pending, State).
@@ -183,6 +185,16 @@ next_retry_at(Cid, State) ->
dead_letter_list(State) -> field(dead_letter, State).
%% Step 8b-timer: per-cid timer ref accessor. Exposed for tests so
%% they can assert a retry timer was scheduled (or wasn't, after a
%% success / dead-letter). Returns the live Ref or undefined.
timer_ref_for(Cid, State) ->
case find_keyed(Cid, field(timers, State)) of
{ok, Ref} -> Ref;
_ -> undefined
end.
move_to_dead_letter(Cid, State) ->
Pending = field(pending, State),
{Match, Rest} = take_by_cid(Cid, Pending, [], []),
@@ -229,6 +241,13 @@ pending_srv(PeerId) ->
set_dispatch_fn(PeerId, Fn) ->
gen_server:call(PeerId, {set_dispatch_fn, Fn}).
%% Step 8b-timer: return the worker's full state so tests can use the
%% pure introspection functions (attempts_for / next_retry_at /
%% timer_ref_for / dead_letter_list) against it.
state_srv(PeerId) ->
gen_server:call(PeerId, get_state).
%% gen_server callbacks
init([PeerId, DispatchFn]) ->
@@ -238,17 +257,138 @@ init([PeerId, DispatchFn]) ->
handle_call({enqueue, Activity}, _From, State) ->
{reply, ok, enqueue_pure(field(peer, State), Activity, State)};
handle_call(flush, _From, State) ->
{NewState, Delivered, Retry} = drain_pure(State),
%% Step 8b-timer: drain (which already bumps :attempts via
%% bump_attempt on each failed deliver), then for each retried
%% Cid compute the backoff slot from the now-current attempt
%% count, set NextRetryAt, and arm a send_after self-cast.
%% handle_info({retry, Cid}, ...) fires when the slot elapses.
%% Reply shape unchanged.
{DrainState, Delivered, Retry} = drain_pure(State),
Now = monotonic_seconds(),
NewState = lists:foldl(
fun(Cid, S) -> arm_retry_timer(Cid, Now, S) end,
DrainState, Retry),
{reply, {ok, Delivered, Retry}, NewState};
handle_call(get_pending, _From, State) ->
{reply, field(pending, State), State};
handle_call(get_state, _From, State) ->
{reply, State, State};
handle_call({set_dispatch_fn, Fn}, _From, State) ->
{reply, ok, set_field(dispatch_fn, Fn, State)}.
handle_cast(_, S) -> {noreply, S}.
%% Step 8b-timer: a retry timer fired. Pull the activity by Cid from
%% the pending queue (it might have been drained meanwhile by a
%% concurrent flush — if so, we just clear bookkeeping and exit).
%% Run deliver_one_pure: success clears retry state; failure bumps
%% the counter and schedules the next slot — or dead-letters if the
%% sixth attempt failed.
handle_info({retry, Cid}, State) ->
%% Clear the timer ref we just consumed.
State0 = clear_timer_ref(Cid, State),
case take_by_cid(Cid, field(pending, State0), [], 0) of
{none, _} ->
%% Already drained / dead-lettered. Clear any stale
%% bookkeeping in case the cid is half-tracked.
{noreply, record_success_pure(Cid, State0)};
{Activity, Rest} ->
case deliver_one_pure(Activity, State0) of
{ok, _} ->
State1 = set_field(pending, Rest, State0),
State2 = record_success_pure(Cid, State1),
{noreply, State2};
{error, _, _} ->
%% Keep the activity in pending; record_failure
%% leaves :pending alone (or dead-letters it on
%% slot 6).
Now = monotonic_seconds(),
State1 = schedule_retry_for(Cid, Now, State0),
{noreply, State1}
end
end;
handle_info(_, S) -> {noreply, S}.
%% Step 8b-timer helpers ────────────────────────────────────────────
%% arm_retry_timer/3 — POST-DRAIN form. Used from handle_call(flush)
%% after drain_pure has already bumped :attempts via bump_attempt.
%% Sets next_retry_at = Now + backoff(attempts) and schedules the
%% send_after self-cast. On the dead-letter slot (attempt 6), moves
%% the activity from :pending to :dead_letter and arms no timer.
arm_retry_timer(Cid, Now, State) ->
State0 = cancel_timer_for(Cid, State),
Attempts = attempts_for(Cid, State0),
case backoff_for(Attempts) of
dead_letter ->
move_to_dead_letter(Cid, State0);
Seconds ->
NextAt = Now + Seconds,
NR = field(next_retry, State0),
State1 = set_field(next_retry, set_keyed(Cid, NextAt, NR), State0),
Ms = Seconds * 1000,
Ref = erlang:send_after(Ms, self(), {retry, Cid}),
Timers = field(timers, State1),
set_field(timers, set_keyed(Cid, Ref, Timers), State1)
end.
%% schedule_retry_for/3 — POST-RETRY-ATTEMPT form. Used from
%% handle_info({retry, Cid}, ...) when the retry attempt failed.
%% Bookkeep one failure and arm the next retry timer (or promote
%% to dead-letter, in which case no timer is needed).
schedule_retry_for(Cid, Now, State) ->
%% Cancel any in-flight timer for this Cid before scheduling a new
%% one. Without the cancel a stale timer can still fire after
%% record_success has cleared the cid, the handle_info no-match
%% branch silently absorbs it — but it keeps the scheduler's
%% run-loop alive long after the work is done. A pure clear (no
%% cancel) is fine when the timer's own firing brought us here,
%% so the explicit cancel only matters for the flush path.
State0 = cancel_timer_for(Cid, State),
State1 = record_failure_pure(Cid, Now, State0),
Attempts = attempts_for(Cid, State1),
case backoff_for(Attempts) of
dead_letter ->
State1;
Seconds ->
Ms = Seconds * 1000,
Ref = erlang:send_after(Ms, self(), {retry, Cid}),
Timers = field(timers, State1),
set_field(timers, set_keyed(Cid, Ref, Timers), State1)
end.
%% Cancel the live timer for Cid (if any) and clear it from :timers.
%% Idempotent — silent no-op if there isn't one.
cancel_timer_for(Cid, State) ->
Timers = field(timers, State),
case find_keyed(Cid, Timers) of
{ok, Ref} ->
erlang:cancel_timer(Ref),
set_field(timers, del_keyed(Cid, Timers), State);
_ -> State
end.
%% Drop the :timers entry for Cid without calling cancel_timer — used
%% when the timer's own firing brought us into handle_info and the ref
%% is already consumed.
clear_timer_ref(Cid, State) ->
Timers = field(timers, State),
case find_keyed(Cid, Timers) of
{ok, _Ref} -> set_field(timers, del_keyed(Cid, Timers), State);
_ -> State
end.
%% Step 8b-timer: bookkeeping uses seconds (matches backoff_for /
%% record_failure_pure / next_retry_at). The monotonic clock reports
%% ms; we floor to seconds here to keep all the comparisons aligned.
monotonic_seconds() -> erlang:monotonic_time() div 1000.
%% ── Internal ────────────────────────────────────────────────────
activity_cid(Activity) ->

View File

@@ -0,0 +1,118 @@
-module(discovery_type_fetch).
-export([make_fetch_fn/0, make_fetch_fn/1,
fetch/2,
type_doc_url/2,
resolve_type_url/2,
accept_header/0]).
%% Live type-doc fetch for peer_types — host-type federation Step 3,
%% the sibling of discovery_fetch.erl. peer_types:lookup_or_fetch/3
%% calls a Cfg-supplied type_fetch_fn :: fun ((TypeCid, Cfg) -> {ok,
%% Bytes} | {error, _}) on a cache miss; this module produces that
%% closure for live federation. It GETs <base>/types/<cid> with an
%% Accept header that asks for the type-doc format (http_server.erl
%% Step 3) and returns the RAW response bytes — peer_types decodes
%% them via term_codec into the TypeRecord. (This is the one shape
%% difference from discovery_fetch, whose closure returns an already-
%% decoded actor-state: there the cache stores the decoded AS, here
%% peer_types owns the decode so the type-doc wire format lives in one
%% place — the /types/ route encodes, peer_types decodes.)
%%
%% Cfg shape (parallels discovery_fetch's peer URL resolution):
%% {type_url, [{TypeCid, BaseUrl}, ...]}
%% {type_url_fn, fun ((TypeCid) -> {ok, BaseUrl} | not_found)}
%%
%% BaseUrl shape: <<"http://host:port">> (no trailing slash; this
%% module appends the path). TypeCid is the type's CID bytes.
%%
%% Outcomes:
%% 2xx -> {ok, Bytes}
%% non-2xx -> {error, {status, N}}
%% resolver miss -> {error, no_type_url}
%% transport -> {error, Reason}
%% ── Accept header ────────────────────────────────────────────
%% "application/vnd.fed-sx.type-doc" — same MIME http_server's
%% content_type_for(type_doc) emits, so the Accept negotiation routes
%% the served bytes to the term_codec-encoded TypeRecord arm.
accept_header() ->
<<97,112,112,108,105,99,97,116,105,111,110,47,
118,110,100,46,102,101,100,45,115,120,46,
116,121,112,101,45,100,111,99>>.
%% ── public API ───────────────────────────────────────────────
%% make_fetch_fn/0 — the fun/2 peer_types:lookup_or_fetch calls. It
%% reads the type-URL resolver out of the Cfg passed at call time, so
%% the same Cfg threads through peer_types and this closure.
make_fetch_fn() ->
fun (TypeCid, Cfg) ->
case resolve_type_url(TypeCid, Cfg) of
{error, R} -> {error, R};
{ok, BaseUrl} -> fetch(type_doc_url(BaseUrl, TypeCid), Cfg)
end
end.
%% make_fetch_fn/1 — variant that closes over a static Cfg for the
%% resolver while still honouring the call-time Cfg for transport.
%% Lets a caller bake the type_url map once and reuse the closure.
make_fetch_fn(StaticCfg) ->
fun (TypeCid, Cfg) ->
case resolve_type_url(TypeCid, StaticCfg) of
{error, R} -> {error, R};
{ok, BaseUrl} -> fetch(type_doc_url(BaseUrl, TypeCid), Cfg)
end
end.
fetch(Url, _Cfg) ->
AcceptKey = <<97,99,99,101,112,116>>, % "accept"
Headers = [{AcceptKey, accept_header()}],
try httpc:request(Url, get, Headers, <<>>) of
{ok, Status, _H, Body} when Status >= 200, Status < 300 ->
{ok, Body};
{ok, Status, _H, _B} ->
{error, {status, Status}};
Other ->
{error, {bad_response, Other}}
catch
error:Reason -> {error, Reason}
end.
%% type_doc_url/2 — <BaseUrl>/types/<cid>. TypeCid is the cid bytes,
%% appended verbatim as the path segment (matches the "/types/" prefix
%% http_server.erl registers).
type_doc_url(BaseUrl, TypeCid) when is_binary(TypeCid) ->
%% "/types/" — 7 bytes
Prefix = <<47,116,121,112,101,115,47>>,
<<BaseUrl/binary, Prefix/binary, TypeCid/binary>>.
%% resolve_type_url/2 — map a TypeCid to its serving node's base URL.
%% type_url_fn (a 1-arity closure) takes precedence over the static
%% type_url proplist; absent both -> {error, no_type_url}.
resolve_type_url(TypeCid, Cfg) ->
case field(type_url_fn, Cfg) of
Fn when is_function(Fn, 1) ->
case Fn(TypeCid) of
{ok, BaseUrl} -> {ok, BaseUrl};
_ -> {error, no_type_url}
end;
_ ->
case field(type_url, Cfg) of
nil -> {error, no_type_url};
Map ->
case find_keyed(TypeCid, Map) of
{ok, BaseUrl} -> {ok, BaseUrl};
_ -> {error, no_type_url}
end
end
end.
%% ── helpers ──────────────────────────────────────────────────
field(K, [{K, V} | _]) -> V;
field(K, [_ | Rest]) -> field(K, Rest);
field(_, []) -> nil.
find_keyed(_, []) -> {error, not_found};
find_keyed(K, [{K, V} | _]) -> {ok, V};
find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest).

View File

@@ -0,0 +1,76 @@
-module(flow_dispatch).
-export([start/4, guard_passes/3]).
%% Bridge from "an activity matched a trigger" to "a flow started with
%% that activity as input" (fed-sx-triggers Phase 3). A NATIVE call into
%% next/flow (flow_store) — the engine is Erlang-on-SX too, so there is
%% no cross-guest FFI: the kernel and the workflow engine share one
%% runtime.
%%
%% start(Spec, Activity, ActorState, Cfg)
%% -> {ok, FlowId, {ActivityCid, TriggerCid, FlowId}} (audit triple)
%% | {error, Reason}
%%
%% The flow named in Spec is started with the activity bound into its
%% input environment, so flow steps can read the activity, the actor id,
%% and the trigger cid (the audit chain). Flow-start failures — an
%% unknown flow name, or a crashing first step (flow_store isolates the
%% raise) — come back as {error, Reason}, never raised, so the fan-out
%% caller is insulated from one flow's failure.
start(Spec, Activity, ActorState, _Cfg) ->
FlowName = trigger_registry:spec_flow_name(Spec),
TriggerCid = trigger_registry:spec_cid(Spec),
ActivityCid = activity_cid(Activity),
Input = [{activity, Activity},
{actor, actor_id_of(ActorState, Activity)},
{trigger_cid, TriggerCid}],
case flow_store:start(FlowName, Input) of
{ok, FlowId, _Result} ->
{ok, FlowId, {ActivityCid, TriggerCid, FlowId}};
{error, Reason} ->
{error, Reason}
end.
%% guard_passes(Spec, Activity, ActorState) — a spec fires when its
%% actor-scope admits the activity's actor AND its guard (if any)
%% returns true. An `any` scope and an `undefined` guard always pass;
%% the guard lets one activity-type bind multiple flows with
%% discriminators.
guard_passes(Spec, Activity, ActorState) ->
scope_ok(trigger_registry:spec_actor_scope(Spec), Activity) andalso
guard_ok(trigger_registry:spec_guard(Spec), Activity, ActorState).
scope_ok(any, _Activity) -> true;
scope_ok(Scope, Activity) ->
case envelope:get_field(actor, Activity) of
{ok, Scope} -> true;
_ -> false
end.
guard_ok(undefined, _Activity, _ActorState) -> true;
guard_ok(Guard, Activity, ActorState) when is_function(Guard, 2) ->
Guard(Activity, ActorState);
guard_ok(_, _, _) -> false.
%% ── helpers ─────────────────────────────────────────────────────
activity_cid(Activity) ->
case envelope:get_field(id, Activity) of
{ok, Cid} -> Cid;
_ -> undefined
end.
%% actor_id_of/2 — prefer the receiving actor's id (ActorState carries
%% {actor_id, _}); fall back to the activity's :actor. Reading
%% ActorState as a proplist keeps this decoupled from actor_state's
%% internal shape and testable with a plain [{actor_id, _}] stand-in.
actor_id_of(ActorState, Activity) ->
case envelope:get_field(actor_id, ActorState) of
{ok, Id} -> Id;
_ ->
case envelope:get_field(actor, Activity) of
{ok, A} -> A;
_ -> undefined
end
end.

View File

@@ -4,6 +4,7 @@
welcome_body/0, capabilities_body/0,
capabilities_path/0,
match_prefix/2, actors_prefix/0, actor_doc_response/1,
types_prefix/0, type_doc_response_for/2,
artifacts_prefix/0, artifact_response/1,
projections_list_path/0, projections_prefix/0,
projections_list_response/0, projection_response/1,
@@ -156,7 +157,12 @@ dispatch(<<71, 69, 84>>, Path, F, Cfg) ->
{ok, Name} when byte_size(Name) > 0 ->
projection_response_for(Name, F);
_ ->
not_found_response()
case match_prefix(types_prefix(), Path) of
{ok, Cid} when byte_size(Cid) > 0 ->
type_doc_response_for(Cid, Cfg);
_ ->
not_found_response()
end
end
end
end;
@@ -289,6 +295,10 @@ artifact_response(Cid) ->
Body = <<Pre/binary, Cid/binary, 10>>,
ok_response(Body).
%% "/types/" — 7 bytes: 47 116 121 112 101 115 47 (host-type fed Step 3)
types_prefix() ->
<<47,116,121,112,101,115,47>>.
%% "/projections" — 12 bytes (no trailing slash; the list endpoint)
projections_list_path() ->
<<47,112,114,111,106,101,99,116,105,111,110,115>>.
@@ -488,9 +498,20 @@ actor_doc_prefix() ->
118,110,100,46,102,101,100,45,115,120,46,
97,99,116,111,114,45,100,111,99>>.
%% "application/vnd.fed-sx.type-doc" — 31 bytes (host-type fed Step 3).
%% Distinct from actor-doc: the body is a term_codec-encoded
%% TypeRecord (peer_types cache entry), not a peer-actor-state.
type_doc_prefix() ->
<<97,112,112,108,105,99,97,116,105,111,110,47,
118,110,100,46,102,101,100,45,115,120,46,
116,121,112,101,45,100,111,99>>.
accept_format(nil) -> text;
accept_format(<<>>) -> text;
accept_format(V) when is_binary(V) ->
case match_prefix(type_doc_prefix(), V) of
{ok, _} -> type_doc;
_ ->
case match_prefix(actor_doc_prefix(), V) of
{ok, _} -> actor_doc;
_ ->
@@ -510,6 +531,7 @@ accept_format(V) when is_binary(V) ->
end
end
end
end
end;
accept_format(_) -> text.
@@ -586,6 +608,11 @@ content_type_for(actor_doc) ->
<<97,112,112,108,105,99,97,116,105,111,110,47,
118,110,100,46,102,101,100,45,115,120,46,
97,99,116,111,114,45,100,111,99>>;
%% "application/vnd.fed-sx.type-doc" — 31 bytes (host-type fed Step 3).
content_type_for(type_doc) ->
<<97,112,112,108,105,99,97,116,105,111,110,47,
118,110,100,46,102,101,100,45,115,120,46,
116,121,112,101,45,100,111,99>>;
content_type_for(_) ->
content_type_for(text).
@@ -714,6 +741,42 @@ kernel_actor_state(_Kernel, Id) ->
_ -> nil
end.
%% ── host-type fed Step 3: GET /types/<cid> ──────────────────────
%%
%% Serves a TypeRecord the node has cached (its own published types or
%% types fetched from peers) so a federated peer running
%% discovery_type_fetch can decode it directly into the shape
%% peer_types + the object-schema pipeline stage consume. The wire
%% body is term_codec:encode(TypeRecord) under the
%% application/vnd.fed-sx.type-doc content type; a cache miss is a 404.
%%
%% Cid is the path segment after "/types/" (the type's CID bytes). Cfg
%% carries `{peer_types, peer_types}` to opt the route into the cache —
%% absent (or the gen_server down) short-circuits to 404, matching the
%% kernel_actor_state guard for the actor-doc route. This port can't
%% dispatch `Mod:Fun` on a variable module, so the registered
%% `peer_types` atom is hardcoded; the Cfg field flags "no cache wired".
type_doc_response_for(Cid, Cfg) ->
case type_record_for(Cfg, Cid) of
nil -> not_found_response();
TR -> ok_response(term_codec:encode(TR), type_doc)
end.
type_record_for(Cfg, Cid) ->
case field(peer_types, Cfg) of
nil -> nil;
_ ->
case erlang:whereis(peer_types) of
undefined -> nil;
_ ->
case peer_types:lookup(Cid) of
{ok, TR} -> TR;
_ -> nil
end
end
end.
%% ── Step 4a: per-actor sub-resource stubs ──────────────────────
%% Per design §16.1 each actor has /outbox /inbox /followers
%% /following routes. v1 returns text-stub bodies so route resolution

180
next/kernel/peer_types.erl Normal file
View File

@@ -0,0 +1,180 @@
-module(peer_types).
-export([new/0, lookup/2, store/3, evict/2, types/1,
lookup_or_fetch/3, decode_type_doc/1,
start_link/0, start_link/1, stop/0,
put/2, lookup/1, state_for/1, known_types/0,
lookup_or_fetch/2, evict/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-behaviour(gen_server).
%% Peer-types cache — receiver-side mirror of peer_actors.erl, for
%% host-type federation (plans/fed-sx-host-types.md, Phase 2). When an
%% inbound activity references a refinement type the local node hasn't
%% seen, the object-schema validation stage (Phase 4) needs that
%% type's record — its :refinement-schema and field shape — to vet the
%% inner object. Re-fetching the type doc on every inbound would be
%% wasteful, so we cache the TypeRecord keyed by its content-address.
%%
%% State shape (pure-functional):
%% [{TypeCidBytes, TypeRecord}, ...]
%%
%% TypeCidBytes is the type's CID (a binary). TypeRecord is the parsed
%% DefineType envelope's :object payload — a proplist carrying :name,
%% :fields, :refinement-schema, :instance-type. Refinement schemas are
%% immutable per CID (an updated type is a new CID), so cache entries
%% never go stale — TTL-free, like peer_actors' v2 entries.
%%
%% lookup_or_fetch is the load-bearing entry point: a miss invokes a
%% Cfg-supplied closure to fetch the type doc over the wire. Per the
%% design the closure has shape
%% type_fetch_fn :: fun ((TypeCid, Cfg) -> {ok, Bytes} | {error, _})
%% returning the term_codec-encoded type-doc bytes; lookup_or_fetch
%% decodes them into the TypeRecord and caches it. Keeping the
%% transport in the closure (Phase 3's discovery_type_fetch) keeps
%% peer_types testable with a mocked fetch — same split as
%% peer_actors / discovery_fetch.
%%
%% gen_server wrapper registers under the atom `peer_types` so the
%% pipeline + http_server handlers can reach it without threading a
%% Pid through Cfg.
%% ── Pure-functional API ─────────────────────────────────────────
new() -> [].
lookup(TypeCid, State) ->
case find_keyed(TypeCid, State) of
{ok, TR} -> {ok, TR};
{error, _} -> not_found
end.
store(TypeCid, TR, State) ->
set_keyed(TypeCid, TR, State).
evict(TypeCid, State) ->
delete_keyed(TypeCid, State).
types(State) -> [Cid || {Cid, _TR} <- State].
%% lookup_or_fetch/3 — cache hit returns {ok, TR, State} unchanged.
%% Cache miss pulls the type_fetch_fn out of Cfg and calls it with
%% (TypeCid, Cfg); a {ok, Bytes} reply is decoded via term_codec into
%% the TypeRecord, which is then stored. Failures (no fn, fetch error,
%% bad bytes) do NOT poison the cache so the caller can retry.
%%
%% no type_fetch_fn in Cfg -> {error, no_fetch_fn, State}
%% fn -> {ok, Bytes}, decodable -> {ok, TR, store(...)}
%% fn -> {ok, Bytes}, bad bytes -> {error, bad_type_doc, State}
%% fn -> {error, Reason} -> {error, Reason, State}
%% fn -> Other -> {error, {bad_fetch_return, Other}, State}
lookup_or_fetch(TypeCid, Cfg, State) ->
case find_keyed(TypeCid, State) of
{ok, TR} -> {ok, TR, State};
{error, _} -> fetch_and_store(TypeCid, Cfg, State)
end.
fetch_and_store(TypeCid, Cfg, State) ->
case field(type_fetch_fn, Cfg) of
nil -> {error, no_fetch_fn, State};
Fn when is_function(Fn, 2) ->
case Fn(TypeCid, Cfg) of
{ok, Bytes} ->
case decode_type_doc(Bytes) of
{ok, TR} -> {ok, TR, store(TypeCid, TR, State)};
{error, R} -> {error, R, State}
end;
{error, Reason} -> {error, Reason, State};
Other -> {error, {bad_fetch_return, Other}, State}
end;
_ -> {error, bad_fetch_fn_cfg, State}
end.
%% decode_type_doc/1 — round the wire body back through term_codec.
%% The on-wire form is term_codec:encode(TypeRecord) (Phase 3's
%% /types/<cid> route), so a clean decode yields the proplist TR.
decode_type_doc(Bytes) ->
case term_codec:decode(Bytes) of
{ok, TR, _} when is_list(TR) -> {ok, TR};
_ -> {error, bad_type_doc}
end.
%% ── gen_server wrapper ──────────────────────────────────────────
start_link() ->
start_link([]).
start_link(InitialState) ->
Pid = gen_server:start_link(peer_types, [InitialState]),
erlang:register(peer_types, Pid),
Pid.
stop() ->
R = gen_server:call(peer_types, '$gen_stop'),
erlang:unregister(peer_types),
R.
%% put/2 — store a TypeRecord under its CID. Mirrors store_srv.
put(TypeCid, TR) ->
gen_server:call(peer_types, {put, TypeCid, TR}).
%% lookup/1 — cache read. {ok, TR} | not_found.
lookup(TypeCid) ->
gen_server:call(peer_types, {lookup, TypeCid}).
%% state_for/1 — alias of lookup/1, named to match peer_actors'
%% state_for accessor used by http_server's kernel bridge.
state_for(TypeCid) ->
gen_server:call(peer_types, {lookup, TypeCid}).
known_types() ->
gen_server:call(peer_types, get_types).
evict(TypeCid) ->
gen_server:call(peer_types, {evict, TypeCid}).
%% lookup_or_fetch/2 — gen_server form. Cfg carries the type_fetch_fn.
%% Reply is {ok, TR} on hit-or-fetched, {error, Reason} otherwise.
lookup_or_fetch(TypeCid, Cfg) ->
gen_server:call(peer_types, {lookup_or_fetch, TypeCid, Cfg}).
%% gen_server callbacks
init([InitialState]) ->
{ok, InitialState}.
handle_call({put, TypeCid, TR}, _From, State) ->
{reply, ok, store(TypeCid, TR, State)};
handle_call({lookup, TypeCid}, _From, State) ->
{reply, lookup(TypeCid, State), State};
handle_call({lookup_or_fetch, TypeCid, Cfg}, _From, State) ->
case lookup_or_fetch(TypeCid, Cfg, State) of
{ok, TR, NewState} -> {reply, {ok, TR}, NewState};
{error, Reason, Same} -> {reply, {error, Reason}, Same}
end;
handle_call(get_types, _From, State) ->
{reply, types(State), State};
handle_call({evict, TypeCid}, _From, State) ->
{reply, ok, evict(TypeCid, State)}.
handle_cast(_, S) -> {noreply, S}.
handle_info(_, S) -> {noreply, S}.
%% ── Internal helpers ────────────────────────────────────────────
field(K, [{K, V} | _]) -> V;
field(K, [_ | Rest]) -> field(K, Rest);
field(_, []) -> nil.
find_keyed(_, []) -> {error, not_found};
find_keyed(K, [{K, V} | _]) -> {ok, V};
find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest).
set_keyed(K, V, []) -> [{K, V}];
set_keyed(K, V, [{K, _} | Rest]) -> [{K, V} | Rest];
set_keyed(K, V, [P | Rest]) -> [P | set_keyed(K, V, Rest)].
delete_keyed(_, []) -> [];
delete_keyed(K, [{K, _} | Rest]) -> Rest;
delete_keyed(K, [P | Rest]) -> [P | delete_keyed(K, Rest)].

View File

@@ -6,7 +6,9 @@
stage_envelope/1,
stage_signature/1, stage_signature/2,
stage_replay/1, stage_replay/2,
stage_schema/1, stage_schema/2]).
stage_schema/1, stage_schema/2,
apply_object_schema/2, stage_object_schema/1,
apply_triggers/3]).
%% Validation pipeline per design §14.
%%
@@ -165,3 +167,233 @@ check_object_schema(Activity, SchemaFn) ->
stage_schema(SchemaLookup) ->
fun (Activity) -> stage_schema(Activity, SchemaLookup) end.
%% ── host-type fed Step 4: object-schema validation stage ────────
%%
%% apply_object_schema/2 — when an inbound activity's :object declares
%% a refinement type ({type, TypeName} on the object), resolve that
%% type's record and apply its refinement schema to the object's
%% :field_values. Sits between activity-type (stage_schema) validation
%% and the kernel append; rejects the activity on schema-fail.
%%
%% Resolution mirrors the design note: TypeName -> TypeCid via Cfg's
%% `type_index` ([{TypeName, TypeCid}, ...], the local Define-name
%% index), then TypeCid -> TypeRecord via peer_types:lookup_or_fetch/2
%% (a local cache hit, or a wire fetch through the Cfg type_fetch_fn).
%%
%% Outcomes:
%% object has no {type, _} -> ok (no schema applies)
%% TypeName not in type_index -> ok (undeclared type;
%% open-world default)
%% record resolved, schema passes -> ok
%% record resolved, schema fails -> {error, {validation_failed,
%% object_schema}}
%% record unresolvable (cache miss + -> strict_object_schema:
%% fetch failure / no peer_types) true -> {error, ...}
%% false -> ok (skipped)
%%
%% Default strict_object_schema = false: a node only blocks on an
%% unresolvable type when it opts into airtight validation via Cfg
%% {strict_object_schema, true}. The non-strict skip is where a
%% `validation_skipped` log entry belongs (left to the caller's logger
%% so this stage keeps the ok | {error, _} contract run_stages wants).
%%
%% A TypeRecord's refinement schema is either a 1-arity Erlang
%% predicate over the field-values (the substrate stand-in, for
%% locally-defined types) or a data constraint {required, [Field, ...]}
%% (term_codec-safe, so a wire-fetched TypeRecord can still validate).
apply_object_schema(Activity, Cfg) ->
case object_type_name(Activity) of
none -> ok;
{ok, TypeName} ->
case type_cid_for(TypeName, Cfg) of
none -> ok;
{ok, TypeCid} ->
case resolve_type_record(TypeCid, Cfg) of
{ok, TR} -> check_object_against(Activity, TR);
{error, _} -> on_unresolved_type(Cfg)
end
end
end.
stage_object_schema(Cfg) ->
fun (Activity) -> apply_object_schema(Activity, Cfg) end.
object_type_name(Activity) ->
case envelope:get_field(object, Activity) of
{ok, Obj} when is_list(Obj) ->
case envelope:get_field(type, Obj) of
{ok, T} -> {ok, T};
_ -> none
end;
_ -> none
end.
object_field_values(Activity) ->
case envelope:get_field(object, Activity) of
{ok, Obj} when is_list(Obj) ->
case envelope:get_field(field_values, Obj) of
{ok, FV} -> FV;
_ -> []
end;
_ -> []
end.
type_cid_for(TypeName, Cfg) ->
case stage_field(type_index, Cfg) of
nil -> none;
Index ->
case find_keyed(TypeName, Index) of
{ok, Cid} -> {ok, Cid};
_ -> none
end
end.
resolve_type_record(TypeCid, Cfg) ->
case stage_field(peer_types, Cfg) of
nil -> {error, no_peer_types};
_ ->
case erlang:whereis(peer_types) of
undefined -> {error, peer_types_down};
_ -> peer_types:lookup_or_fetch(TypeCid, Cfg)
end
end.
on_unresolved_type(Cfg) ->
case stage_field(strict_object_schema, Cfg) of
true -> {error, {validation_failed, object_schema}};
_ -> ok
end.
check_object_against(Activity, TR) ->
case stage_field(refinement_schema, TR) of
nil -> ok;
Schema -> apply_refinement(Schema, object_field_values(Activity))
end.
apply_refinement(Fn, FieldValues) when is_function(Fn, 1) ->
case Fn(FieldValues) of
true -> ok;
_ -> {error, {validation_failed, object_schema}}
end;
apply_refinement({required, Fields}, FieldValues) ->
case all_present(Fields, FieldValues) of
true -> ok;
false -> {error, {validation_failed, object_schema}}
end;
apply_refinement(_, _) -> ok.
all_present([], _) -> true;
all_present([F | Rest], FV) ->
case has_key(F, FV) of
true -> all_present(Rest, FV);
false -> false
end.
has_key(_, []) -> false;
has_key(K, [{K, _} | _]) -> true;
has_key(K, [_ | Rest]) -> has_key(K, Rest).
stage_field(K, [{K, V} | _]) -> V;
stage_field(K, [_ | Rest]) -> stage_field(K, Rest);
stage_field(_, []) -> nil.
find_keyed(_, []) -> {error, not_found};
find_keyed(K, [{K, V} | _]) -> {ok, V};
find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest).
%% ── fed-sx triggers Step 2: post-append fan-out ─────────────────
%%
%% apply_triggers/3 — fires the durable flows bound to an activity's
%% type AFTER it has been accepted and appended (rejected activities
%% never reach here, so a flow only runs for an activity that really
%% landed). For each spec the activity's type is bound to, the spec
%% must pass its guard/actor-scope, and its {ActivityCid, TriggerCid}
%% pair must not already have fired (federation can deliver the same
%% activity twice via different peers — dedup is keyed on that pair,
%% read from the receiving actor's :triggers_fired). Surviving specs are
%% dispatched via flow_dispatch:start (a native flow_store:start), which
%% never raises.
%%
%% Returns {ok, Results} where Results is one
%% {ActivityCid, TriggerCid, {ok, FlowId} | {error, Reason}}
%% per spec actually dispatched (guard-passed, not a duplicate). The
%% kernel folds the {ActivityCid, TriggerCid} pairs into the actor's
%% :triggers_fired (dedup) and the audit triples into its projection.
%% No matching/ready registry yields {ok, []}.
%%
%% Cfg gates the fan-out on {trigger_registry, trigger_registry} (the
%% registered gen_server), mirroring the object-schema stage's
%% {peer_types, _} gate. apply_triggers must NOT be called inside a
%% `try` — flow_dispatch does gen_server:calls, and a blocking call
%% inside a try deadlocks this scheduler; the fan-out runs after append,
%% in its own step, so this is naturally satisfied.
apply_triggers(Activity, ActorState, Cfg) ->
case trigger_registry_ready(Cfg) of
false -> {ok, []};
true ->
Type = activity_type_of(Activity),
Specs = trigger_registry:lookup(Type),
ActCid = trigger_activity_cid(Activity),
Fired = field_or_default(triggers_fired, ActorState, []),
fire_each(Specs, Activity, ActorState, ActCid, Fired, Cfg, [])
end.
trigger_registry_ready(Cfg) ->
case stage_field(trigger_registry, Cfg) of
nil -> false;
_ ->
case erlang:whereis(trigger_registry) of
undefined -> false;
_ -> true
end
end.
fire_each([], _A, _AS, _ACid, _Fired, _Cfg, Acc) ->
{ok, lists:reverse(Acc)};
fire_each([Spec | Rest], A, AS, ACid, Fired, Cfg, Acc) ->
TCid = trigger_registry:spec_cid(Spec),
Pair = {ACid, TCid},
AlreadyFired = pair_member(Pair, Fired) orelse acc_member(Pair, Acc),
Pass = (not AlreadyFired) andalso flow_dispatch:guard_passes(Spec, A, AS),
case Pass of
false ->
fire_each(Rest, A, AS, ACid, Fired, Cfg, Acc);
true ->
Outcome = case flow_dispatch:start(Spec, A, AS, Cfg) of
{ok, FlowId, _Audit} -> {ok, FlowId};
{error, Reason} -> {error, Reason}
end,
fire_each(Rest, A, AS, ACid, Fired, Cfg, [{ACid, TCid, Outcome} | Acc])
end.
activity_type_of(Activity) ->
case envelope:get_field(type, Activity) of
{ok, Type} -> Type;
_ -> undefined
end.
trigger_activity_cid(Activity) ->
case envelope:get_field(id, Activity) of
{ok, Cid} -> Cid;
_ -> undefined
end.
field_or_default(Key, Proplist, Default) ->
case envelope:get_field(Key, Proplist) of
{ok, V} -> V;
_ -> Default
end.
%% pair_member/2 — {ACid, TCid} present in a [{ACid, TCid}] fired list.
pair_member(_, []) -> false;
pair_member(P, [P | _]) -> true;
pair_member(P, [_ | Rest]) -> pair_member(P, Rest).
%% acc_member/2 — {ACid, TCid} already dispatched this call (Acc holds
%% {ACid, TCid, Outcome} triples).
acc_member(_, []) -> false;
acc_member({A, T}, [{A, T, _} | _]) -> true;
acc_member(P, [_ | Rest]) -> acc_member(P, Rest).

View File

@@ -0,0 +1,180 @@
-module(trigger_registry).
-export([new/0, add/3, remove/2, lookup/2, all/1, fold/2, fold_fn/0,
mk_spec/4, spec_cid/1, spec_flow_name/1, spec_guard/1,
spec_actor_scope/1,
start_link/0, start_link/1, stop/0,
add/2, remove/1, lookup/1, all_triggers/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-behaviour(gen_server).
%% Trigger registry — binds activity-types to durable flows
%% (plans/agent-briefings/fed-sx-triggers-loop.md, Phase 1). When an
%% activity is appended, the kernel's post-append fan-out
%% (pipeline.erl, Phase 2) looks the activity's type up here and starts
%% each registered flow. Mirrors the peer_actors / peer_types shape: a
%% pure-functional core plus a registered gen_server, hydrated on start
%% from a fold over DefineTrigger activities.
%%
%% State shape (pure-functional):
%% [{ActivityType, [Spec, ...]}, ...]
%% Multiple triggers may bind the same activity-type; they fire
%% independently. A Spec is a 4-tuple:
%% {TriggerCid, FlowName, Guard, ActorScope}
%% TriggerCid — content-address of the DefineTrigger activity
%% (dedup + audit); `undefined` if not yet addressed.
%% FlowName — the flow_store-registered flow to start.
%% Guard — fun ((Activity, ActorState) -> bool) | undefined.
%% Lets one type bind multiple flows with
%% discriminators ("only Articles in :newsletter").
%% Resolved to a fun at registration; not carried over
%% the wire (term_codec can't encode funs).
%% ActorScope — an actor id the trigger is scoped to, or `any`.
%% ── Spec constructor / accessors ────────────────────────────────
mk_spec(TriggerCid, FlowName, Guard, ActorScope) ->
{TriggerCid, FlowName, Guard, ActorScope}.
spec_cid({Cid, _, _, _}) -> Cid.
spec_flow_name({_, FlowName, _, _}) -> FlowName.
spec_guard({_, _, Guard, _}) -> Guard.
spec_actor_scope({_, _, _, Scope}) -> Scope.
%% ── Pure-functional API ─────────────────────────────────────────
new() -> [].
%% add(ActivityType, Spec, State) — append Spec to ActivityType's list.
add(ActivityType, Spec, State) ->
Existing = lookup(ActivityType, State),
set_keyed(ActivityType, append1(Existing, Spec), State).
%% remove(TriggerCid, State) — drop every spec carrying TriggerCid,
%% across all activity-types; empties are pruned.
remove(TriggerCid, State) ->
prune([{T, drop_cid(TriggerCid, Specs)} || {T, Specs} <- State]).
%% lookup(ActivityType, State) — the specs bound to ActivityType ([] if
%% none).
lookup(ActivityType, State) ->
case find_keyed(ActivityType, State) of
{ok, Specs} -> Specs;
not_found -> []
end.
all(State) -> State.
%% ── Hydration fold ──────────────────────────────────────────────
%%
%% fold(Activity, State) — register the binding carried by a
%% DefineTrigger activity. Replaying the actor log through this fold
%% rebuilds the registry after a restart (same content-addressing
%% discipline as define_registry). A non-DefineTrigger activity passes
%% through untouched.
fold(Activity, State) ->
case envelope:get_field(type, Activity) of
{ok, define_trigger} -> fold_trigger(Activity, State);
_ -> State
end.
fold_trigger(Activity, State) ->
case envelope:get_field(object, Activity) of
{ok, Obj} ->
case binding_of(Activity, Obj) of
{ok, AType, Spec} -> add(AType, Spec, State);
not_a_binding -> State
end;
_ -> State
end.
binding_of(Activity, Obj) ->
case envelope:get_field(activity_type, Obj) of
{ok, AType} ->
case envelope:get_field(flow_name, Obj) of
{ok, FlowName} ->
Guard = field_or(guard, Obj, undefined),
Scope = field_or(actor_scope, Obj, any),
Cid = field_or(id, Activity, undefined),
{ok, AType, mk_spec(Cid, FlowName, Guard, Scope)};
_ -> not_a_binding
end;
_ -> not_a_binding
end.
%% fold_fn/0 — a 2-arity fun the projection scheduler can plant.
fold_fn() ->
fun (Activity, State) -> fold(Activity, State) end.
%% ── gen_server wrapper ──────────────────────────────────────────
start_link() ->
start_link([]).
start_link(InitialState) ->
Pid = gen_server:start_link(trigger_registry, [InitialState]),
erlang:register(trigger_registry, Pid),
Pid.
stop() ->
R = gen_server:call(trigger_registry, '$gen_stop'),
erlang:unregister(trigger_registry),
R.
add(ActivityType, Spec) ->
gen_server:call(trigger_registry, {add, ActivityType, Spec}).
remove(TriggerCid) ->
gen_server:call(trigger_registry, {remove, TriggerCid}).
lookup(ActivityType) ->
gen_server:call(trigger_registry, {lookup, ActivityType}).
all_triggers() ->
gen_server:call(trigger_registry, all_triggers).
init([InitialState]) ->
{ok, InitialState}.
handle_call({add, ActivityType, Spec}, _From, State) ->
{reply, ok, add(ActivityType, Spec, State)};
handle_call({remove, TriggerCid}, _From, State) ->
{reply, ok, remove(TriggerCid, State)};
handle_call({lookup, ActivityType}, _From, State) ->
{reply, lookup(ActivityType, State), State};
handle_call(all_triggers, _From, State) ->
{reply, State, State}.
handle_cast(_, S) -> {noreply, S}.
handle_info(_, S) -> {noreply, S}.
%% ── helpers ─────────────────────────────────────────────────────
field_or(Key, Proplist, Default) ->
case envelope:get_field(Key, Proplist) of
{ok, V} -> V;
_ -> Default
end.
drop_cid(_, []) -> [];
drop_cid(Cid, [Spec | Rest]) ->
case spec_cid(Spec) of
Cid -> drop_cid(Cid, Rest);
_ -> [Spec | drop_cid(Cid, Rest)]
end.
prune([]) -> [];
prune([{_, []} | Rest]) -> prune(Rest);
prune([P | Rest]) -> [P | prune(Rest)].
append1([], X) -> [X];
append1([H | T], X) -> [H | append1(T, X)].
find_keyed(_, []) -> not_found;
find_keyed(K, [{K, V} | _]) -> {ok, V};
find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest).
set_keyed(K, V, []) -> [{K, V}];
set_keyed(K, V, [{K, _} | Rest]) -> [{K, V} | Rest];
set_keyed(K, V, [P | Rest]) -> [P | set_keyed(K, V, Rest)].

View File

@@ -79,7 +79,7 @@ cat > "$TMPFILE" <<'EPOCHS'
(eval "(get (erlang-eval-ast \"R = bootstrap:read_genesis(), {ok, S1} = bootstrap:load_genesis(R), {ok, S2} = bootstrap:load_genesis(R), cid:to_string(S1) =:= cid:to_string(S2)\") :name)")
EPOCHS
OUTPUT=$(timeout 300 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
OUTPUT=$(timeout 590 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
@@ -106,7 +106,7 @@ check 10 "strip suffix create.sx -> create" "true"
check 11 "strip suffix hello unchanged" "true"
check 12 "strip suffix .sx -> empty" "true"
check 13 "load_genesis rejects bad shape" "ok"
check 20 "loaded activity_types count = 5" "5"
check 20 "loaded activity_types count = 8" "8"
check 21 "loaded object_types count = 13" "13"
check 22 "loaded projections count = 7" "7"
check 23 "loaded validators count = 3" "3"

View File

@@ -99,8 +99,8 @@ check() {
check 2 "gen_server loaded" "gen_server"
check 3 "registry loaded" "registry"
check 4 "bootstrap loaded" "bootstrap"
check 10 "populate returns total 36" "36"
check 20 "activity_types count = 5" "5"
check 10 "populate returns total 39" "39"
check 20 "activity_types count = 8" "8"
check 21 "object_types count = 13" "13"
check 22 "projections count = 7" "7"
check 23 "validators count = 3" "3"

View File

@@ -102,7 +102,7 @@ check 10 "sections/0 length" "7"
check 11 "ends_with_sx create.sx" "true"
check 12 "ends_with_sx hello" "false"
check 13 "ends_with_sx empty" "false"
check 20 "section activity_types count" "5"
check 20 "section activity_types count" "8"
check 21 "section object_types count" "13"
check 22 "section projections count" "7"
check 23 "section validators count" "3"
@@ -111,7 +111,7 @@ check 25 "section sig_suites count" "2"
check 26 "section audience count" "3"
check 30 "read_genesis returns 7 sections" "7"
check 31 "first section name" "activity_types"
check 32 "first section entry count" "5"
check 32 "first section entry count" "8"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then

View File

@@ -54,6 +54,12 @@ cat > "$TMPFILE" <<EPOCHS
(eval "(get (erlang-load-module (file-read \"next/kernel/nx_kernel.erl\")) :name)")
(epoch 10)
(eval "(get (erlang-load-module (file-read \"next/kernel/bootstrap.erl\")) :name)")
;; outbox:publish computes a delivery set via follower_graph + delivery
;; (compute_delivery_set/3) — load both so the publish path resolves.
(epoch 11)
(eval "(get (erlang-load-module (file-read \"next/kernel/follower_graph.erl\")) :name)")
(epoch 12)
(eval "(get (erlang-load-module (file-read \"next/kernel/delivery.erl\")) :name)")
;; bootstrap:start returns a Pid
(epoch 20)
@@ -115,10 +121,10 @@ check() {
check 10 "bootstrap module loaded" "bootstrap"
check 20 "whereis(nx_kernel) is Pid" "true"
check 21 "activity_types count = 5" "5"
check 21 "activity_types count = 8" "8"
check 22 "object_types count = 13" "13"
check 23 "projections count = 7" "7"
check 24 "total entries = 36" "36"
check 24 "total entries = 39" "39"
check 25 "fresh log_tip = 0" "0"
check 26 "publish advances tip to 1" "1"
check 27 "actor_id = alice" "true"

99
next/tests/define_trigger.sh Executable file
View File

@@ -0,0 +1,99 @@
#!/usr/bin/env bash
# next/tests/define_trigger.sh — fed-sx triggers Phase 1 (verb).
#
# The DefineTrigger genesis verb
# (next/genesis/activity-types/define_trigger.sx) binds an activity-type
# to a flow. This suite confirms it parses with the expected
# DefineActivity head + :name, that its :schema accepts a well-formed
# binding and rejects malformed ones, and that a DefineTrigger envelope
# round-trips through term_codec.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
SCH='(eval-expr (get (apply dict (rest (parse (file-read \"next/genesis/activity-types/define_trigger.sx\")))) :schema))'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(get (erlang-load-module (file-read \"next/kernel/term_codec.erl\")) :name)")
;; ── parse / shape ──────────────────────────────────────────
(epoch 10)
(eval "(first (parse (file-read \"next/genesis/activity-types/define_trigger.sx\")))")
(epoch 11)
(eval "(get (apply dict (rest (parse (file-read \"next/genesis/activity-types/define_trigger.sx\")))) :name)")
;; ── schema accept / reject ─────────────────────────────────
;; valid binding: string :activity-type + :flow-name -> true
(epoch 20)
(eval "(define sch ${SCH}) (sch (dict :object (dict :activity-type \"Create\" :flow-name \"blog-publish-digest\")))")
;; reject: missing :activity-type -> false
(epoch 21)
(eval "(define sch ${SCH}) (sch (dict :object (dict :flow-name \"f\")))")
;; reject: missing :flow-name -> false
(epoch 22)
(eval "(define sch ${SCH}) (sch (dict :object (dict :activity-type \"Create\")))")
;; ── envelope round-trip through term_codec ─────────────────
(epoch 30)
(eval "(get (erlang-eval-ast \"A = [{type, define_trigger}, {actor, alice}, {object, [{activity_type, create}, {flow_name, blog_publish_digest}]}], {ok, D, _} = term_codec:decode(term_codec:encode(A)), D =:= A\") :name)")
EPOCHS
OUTPUT=$(timeout 180 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 10 "define_trigger.sx head form" "DefineActivity"
check 11 "define_trigger.sx name" "DefineTrigger"
check 20 "schema accepts valid binding" "true"
check 21 "schema rejects missing type" "false"
check 22 "schema rejects missing flow-name" "false"
check 30 "DefineTrigger envelope round-trips" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/define_trigger.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

110
next/tests/define_type.sh Executable file
View File

@@ -0,0 +1,110 @@
#!/usr/bin/env bash
# next/tests/define_type.sh — host-type federation Phase 1 acceptance.
#
# The DefineType genesis verb (next/genesis/activity-types/define_type.sx)
# declares a refinement type. This suite confirms:
# - the file parses with the expected DefineActivity head + :name
# - the :schema predicate accepts a well-formed type-definition
# activity and rejects malformed ones (missing :name, non-list
# :fields)
# - a DefineType envelope round-trips through term_codec
#
# Schema bodies are SX source; we eval them with `eval-expr` and call
# the resulting lambda directly (note: `apply` does not spread into
# SX lambdas in this kernel, and keyword-getters are not callable —
# the schema uses nested `get`). 7 cases.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
# The schema fn, evaluated from the genesis file into a lambda.
SCH='(eval-expr (get (apply dict (rest (parse (file-read \"next/genesis/activity-types/define_type.sx\")))) :schema))'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(get (erlang-load-module (file-read \"next/kernel/term_codec.erl\")) :name)")
;; ── parse / shape ──────────────────────────────────────────
(epoch 10)
(eval "(first (parse (file-read \"next/genesis/activity-types/define_type.sx\")))")
(epoch 11)
(eval "(get (apply dict (rest (parse (file-read \"next/genesis/activity-types/define_type.sx\")))) :name)")
;; ── schema accept / reject ─────────────────────────────────
;; valid: :object with string :name and list :fields -> true
(epoch 20)
(eval "(define sch ${SCH}) (sch (dict :object (dict :name \"Post\" :fields (list))))")
;; valid: :fields omitted (optional) -> true
(epoch 21)
(eval "(define sch ${SCH}) (sch (dict :object (dict :name \"Post\")))")
;; reject: missing :name -> false
(epoch 22)
(eval "(define sch ${SCH}) (sch (dict :object (dict :fields (list))))")
;; reject: :fields present but not a list -> false
(epoch 23)
(eval "(define sch ${SCH}) (sch (dict :object (dict :name \"Post\" :fields \"notalist\")))")
;; ── envelope round-trip through term_codec ─────────────────
(epoch 30)
(eval "(get (erlang-eval-ast \"A = [{type, define_type}, {actor, alice}, {object, [{name, <<80,111,115,116>>}, {instance_type, <<78,111,116,101>>}]}], {ok, D, _} = term_codec:decode(term_codec:encode(A)), D =:= A\") :name)")
EPOCHS
OUTPUT=$(timeout 180 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 10 "define_type.sx head form" "DefineActivity"
check 11 "define_type.sx name" "DefineType"
check 20 "schema accepts valid type def" "true"
check 21 "schema accepts omitted :fields" "true"
check 22 "schema rejects missing :name" "false"
check 23 "schema rejects non-list :fields" "false"
check 30 "DefineType envelope round-trips" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/define_type.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

View File

@@ -0,0 +1,131 @@
#!/usr/bin/env bash
# next/tests/delivery_retry_timer.sh — m2 Step 8b-timer.
#
# Live timer wiring on the delivery_worker gen_server. The pure
# bookkeeping is covered by delivery_retry.sh — this suite proves the
# erlang:send_after / cancel_timer wiring fires retries from the
# scheduler's logical clock without anyone calling drain by hand.
#
# Substrate dependency: erlang:send_after/3 + cancel_timer/1 +
# monotonic_time/0,1 — landed via cherry-pick from loops/erlang
# (commits 3709460d / 98b0104c / 779e53b2 on this branch).
#
# Test discipline: every test cancels its leftover timer before
# returning. If we don't, the scheduler keeps the run loop alive
# advancing time through the full backoff chain (30s → 5m → 30m →
# 6h → 24h), and each tick costs ~10s of wall time inside the
# Erlang-on-SX VM. Canceling the trailing timer is the difference
# between a 25s test and a 60s+ test.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
# A canned activity with cid <<1,2,3>>.
SETUP='Act = [{id, <<1,2,3>>}, {type, note}, {actor, alice}], FailFn = fun(_) -> {error, transient} end,'
# Convenience: cancel any leftover timer for cid <<1,2,3>> on Peer.
# Prevents the scheduler from grinding through 30s/5m/30m/6h/24h of
# retries between epochs.
CANCEL='CancelLeftover = fun(Peer) -> SS = delivery_worker:state_srv(Peer), case delivery_worker:timer_ref_for(<<1,2,3>>, SS) of undefined -> ok; LRef -> erlang:cancel_timer(LRef), ok end end,'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(er-load-gen-server!)")
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/kernel/delivery_worker.erl\")) :name)")
;; T1 — a failing flush schedules a retry timer. timer_ref_for
;; returns a live Ref (not undefined). Then cancel before
;; returning so the scheduler doesn't grind the full backoff
;; chain trying to retry.
(epoch 10)
(eval "(get (erlang-eval-ast \"${SETUP}${CANCEL} delivery_worker:start_link(bob, FailFn), delivery_worker:enqueue(bob, Act), {ok, [], [<<1,2,3>>]} = delivery_worker:flush(bob), S = delivery_worker:state_srv(bob), Ref = delivery_worker:timer_ref_for(<<1,2,3>>, S), Result = is_reference(Ref), CancelLeftover(bob), Result\") :name)")
;; T2 — initial flush bumps the attempt counter to 1; next_retry_at
;; gets set; cancel the timer before returning.
(epoch 11)
(eval "(get (erlang-eval-ast \"${SETUP}${CANCEL} delivery_worker:start_link(bob, FailFn), delivery_worker:enqueue(bob, Act), delivery_worker:flush(bob), S = delivery_worker:state_srv(bob), Result = delivery_worker:attempts_for(<<1,2,3>>, S) =:= 1, CancelLeftover(bob), Result\") :name)")
;; T3 — advancing the logical clock past the 30s backoff fires the
;; timer; handle_info({retry, Cid}) bumps attempts to 2 and arms
;; the next slot (backoff(2)=300s). Then cancel the new timer.
(epoch 12)
(eval "(get (erlang-eval-ast \"${SETUP}${CANCEL} delivery_worker:start_link(bob, FailFn), delivery_worker:enqueue(bob, Act), delivery_worker:flush(bob), receive after 31000 -> ok end, S = delivery_worker:state_srv(bob), Result = delivery_worker:attempts_for(<<1,2,3>>, S) =:= 2, CancelLeftover(bob), Result\") :name)")
;; T4 — after the retry fires the worker has armed a fresh timer
;; for the next backoff slot. Confirm it's a live ref, then
;; cancel it.
(epoch 13)
(eval "(get (erlang-eval-ast \"${SETUP}${CANCEL} delivery_worker:start_link(bob, FailFn), delivery_worker:enqueue(bob, Act), delivery_worker:flush(bob), receive after 31000 -> ok end, S = delivery_worker:state_srv(bob), Result = is_reference(delivery_worker:timer_ref_for(<<1,2,3>>, S)), CancelLeftover(bob), Result\") :name)")
;; T5 — successful retry path. Dispatch fails twice then succeeds
;; (ets-backed counter). After two backoff slots elapse
;; (30s, then 300s), the third attempt succeeds and
;; record_success_pure clears the per-cid bookkeeping. No new
;; timer is scheduled, so the scheduler terminates naturally.
(epoch 14)
(eval "(get (erlang-eval-ast \"${SETUP} ets:new(rt_ctr, [named_table, public]), ets:insert(rt_ctr, {n, 0}), Mixed = fun(_) -> [{n, N}] = ets:lookup(rt_ctr, n), ets:insert(rt_ctr, {n, N+1}), case N < 2 of true -> {error, transient}; false -> ok end end, delivery_worker:start_link(carol, Mixed), delivery_worker:enqueue(carol, Act), delivery_worker:flush(carol), receive after 31000 -> ok end, receive after 301000 -> ok end, S = delivery_worker:state_srv(carol), delivery_worker:pending(S) =:= [] andalso delivery_worker:attempts_for(<<1,2,3>>, S) =:= 0 andalso delivery_worker:timer_ref_for(<<1,2,3>>, S) =:= undefined\") :name)")
EPOCHS
OUTPUT=$(timeout 900 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 10 "T1 flush schedules a timer" "true"
check 11 "T2 initial flush bumps attempts to 1" "true"
check 12 "T3 timer fires; attempts=2" "true"
check 13 "T4 retry rearms next timer" "true"
check 14 "T5 success clears retry state" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/delivery_retry_timer.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
if [ "$VERBOSE" = "-v" ]; then
echo "--- sx_server output ---"
echo "$OUTPUT" | tail -40
echo "---"
fi
fi
[ $FAIL -eq 0 ]

View File

@@ -0,0 +1,176 @@
#!/usr/bin/env bash
# next/tests/discovery_type_fetch.sh — host-type federation Phase 3.
#
# Client side of the type-doc wire: discovery_type_fetch builds the
# fun/2 closure peer_types:lookup_or_fetch calls on a cache miss. It
# GETs <base>/types/<cid> with the type-doc Accept header and returns
# the RAW response bytes (peer_types decodes them via term_codec).
# Exercised end-to-end against a background python http server that
# serves hand-crafted term_codec bytes, so we test the wire — not just
# an in-process call.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
# ── live stub server ─────────────────────────────────────────
# GET /types/bafy1 -> 200 with term_codec-encoded TypeRecord
# TR = [{name, <<"Post">>}, {instance_type, <<"Note">>}]
# GET anything else -> 404
PORT=$(python3 -c 'import socket;s=socket.socket();s.bind(("127.0.0.1",0));print(s.getsockname()[1]);s.close()')
SRVROOT=$(mktemp -d)
PYSRV="$SRVROOT/srv.py"
cat > "$PYSRV" <<'PY'
import sys, http.server, socketserver
PORT = int(sys.argv[1])
# term_codec encoding (mirror of next/kernel/term_codec.erl).
def enc_atom(s):
b = s.encode()
return f"a{len(b)}:".encode() + b
def enc_bin(b):
return f"b{len(b)}:".encode() + b
def enc_tuple(items):
return f"t{len(items)}:".encode() + b"".join(items)
def enc_list(items):
return f"l{len(items)}:".encode() + b"".join(items)
# [{name, <<"Post">>}, {instance_type, <<"Note">>}]
TYPEDOC = enc_list([
enc_tuple([enc_atom("name"), enc_bin(b"Post")]),
enc_tuple([enc_atom("instance_type"), enc_bin(b"Note")]),
])
class H(http.server.BaseHTTPRequestHandler):
def do_GET(self):
if self.path == "/types/bafy1":
self.send_response(200)
self.send_header('content-type','application/vnd.fed-sx.type-doc')
self.send_header('content-length', str(len(TYPEDOC)))
self.end_headers()
self.wfile.write(TYPEDOC)
else:
self.send_response(404); self.end_headers(); self.wfile.write(b'not found')
def log_message(self, fmt, *args): pass
with socketserver.TCPServer(("127.0.0.1", PORT), H) as srv:
srv.serve_forever()
PY
python3 "$PYSRV" "$PORT" >/dev/null 2>&1 &
SRV_PID=$!
TMPFILE=$(mktemp)
trap "rm -rf $SRVROOT $TMPFILE; kill $SRV_PID 2>/dev/null || true" EXIT
for _ in 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15; do
if curl -fsS "http://127.0.0.1:$PORT/types/bafy1" >/dev/null 2>&1; then break; fi
sleep 0.2
done
bytes_of() { python3 -c "import sys; print(','.join(str(b) for b in sys.argv[1].encode()))" "$1"; }
URL_BASE_BYTES=$(bytes_of "http://127.0.0.1:$PORT")
cat > "$TMPFILE" <<'EPOCHS'
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(er-load-gen-server!)")
(epoch 3)
(eval "(get (erlang-load-module (file-read \"next/kernel/term_codec.erl\")) :name)")
(epoch 4)
(eval "(get (erlang-load-module (file-read \"next/kernel/peer_types.erl\")) :name)")
(epoch 5)
(eval "(get (erlang-load-module (file-read \"next/kernel/discovery_type_fetch.erl\")) :name)")
;; accept_header is the 31-byte type-doc MIME
(epoch 10)
(eval "(get (erlang-eval-ast \"byte_size(discovery_type_fetch:accept_header()) =:= 31\") :name)")
;; type_doc_url builds <base>/types/bafy1
(epoch 11)
(eval "(get (erlang-eval-ast \"U = discovery_type_fetch:type_doc_url(<<__URL_BASE__>>, <<98,97,102,121,49>>), U =:= <<__URL_BASE__,47,116,121,112,101,115,47,98,97,102,121,49>>\") :name)")
;; resolve_type_url via the static type_url proplist
(epoch 12)
(eval "(get (erlang-eval-ast \"discovery_type_fetch:resolve_type_url(<<98,97,102,121,49>>, [{type_url, [{<<98,97,102,121,49>>, <<__URL_BASE__>>}]}]) =:= {ok, <<__URL_BASE__>>}\") :name)")
;; fetch live -> {ok, Bytes} that decode to the TypeRecord
(epoch 13)
(eval "(get (erlang-eval-ast \"R = discovery_type_fetch:fetch(<<__URL_BASE__,47,116,121,112,101,115,47,98,97,102,121,49>>, []), case R of {ok, B} -> {ok, TR, _} = term_codec:decode(B), TR =:= [{name, <<80,111,115,116>>}, {instance_type, <<78,111,116,101>>}]; _ -> false end\") :name)")
;; closure from make_fetch_fn/0 dispatches and returns raw bytes
(epoch 14)
(eval "(get (erlang-eval-ast \"Fn = discovery_type_fetch:make_fetch_fn(), Cfg = [{type_url, [{<<98,97,102,121,49>>, <<__URL_BASE__>>}]}], case Fn(<<98,97,102,121,49>>, Cfg) of {ok, B} -> {ok, TR, _} = term_codec:decode(B), TR =:= [{name, <<80,111,115,116>>}, {instance_type, <<78,111,116,101>>}]; _ -> false end\") :name)")
;; closure with no resolver -> {error, no_type_url}
(epoch 15)
(eval "(get (erlang-eval-ast \"Fn = discovery_type_fetch:make_fetch_fn(), case Fn(<<98,97,102,121,49>>, []) of {error, no_type_url} -> true; _ -> false end\") :name)")
;; fetch on an unknown cid path -> {error, {status, 404}}
(epoch 16)
(eval "(get (erlang-eval-ast \"R = discovery_type_fetch:fetch(<<__URL_BASE__,47,116,121,112,101,115,47,122,122,122>>, []), case R of {error, {status, 404}} -> true; _ -> false end\") :name)")
;; end-to-end: peer_types:lookup_or_fetch uses the closure, decodes,
;; and writes the TypeRecord into the cache
(epoch 17)
(eval "(get (erlang-eval-ast \"Fn = discovery_type_fetch:make_fetch_fn(), Cfg = [{type_fetch_fn, Fn}, {type_url, [{<<98,97,102,121,49>>, <<__URL_BASE__>>}]}], case peer_types:lookup_or_fetch(<<98,97,102,121,49>>, Cfg, peer_types:new()) of {ok, TR, S} -> TR =:= [{name, <<80,111,115,116>>}, {instance_type, <<78,111,116,101>>}] andalso peer_types:types(S) =:= [<<98,97,102,121,49>>]; _ -> false end\") :name)")
EPOCHS
sed -i "s|__URL_BASE__|${URL_BASE_BYTES}|g" "$TMPFILE"
OUTPUT=$(timeout 300 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 5 "discovery_type_fetch loaded" "discovery_type_fetch"
check 10 "accept_header is 31-byte type-doc" "true"
check 11 "type_doc_url builds /types/<cid>" "true"
check 12 "resolve_type_url via type_url map" "true"
check 13 "fetch live -> raw bytes decode to TR" "true"
check 14 "closure -> raw bytes decode to TR" "true"
check 15 "closure no resolver -> no_type_url" "true"
check 16 "fetch 404 path -> {status, 404}" "true"
check 17 "lookup_or_fetch caches fetched TR" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/discovery_type_fetch.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

130
next/tests/flow_dispatch.sh Executable file
View File

@@ -0,0 +1,130 @@
#!/usr/bin/env bash
# next/tests/flow_dispatch.sh — fed-sx triggers Phase 3.
#
# flow_dispatch bridges a matched trigger to a started flow — a native
# flow_store:start (the engine is Erlang-on-SX too, no FFI). Confirms
# guard/actor-scope gating, the audit triple, synchronous first-step
# execution, suspend/resume of a started instance, a branch on an
# activity field, and graceful handling of an unknown flow name.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
# Activity (Create of a Note by alice), receiving actor-state, and a
# couple of flows: `capture` echoes the activity's type out of the
# flow's input env; `wait_flow` suspends then wraps the resumed value;
# `cat_flow` branches on the inner object's :type.
ACT='[{type, create}, {actor, alice}, {id, <<97,99,105,100>>}, {object, [{type, note}]}]'
AS='[{actor_id, alice}]'
CAP='flow_spec:flow_node(fun(In) -> {ok, A} = envelope:get_field(activity, In), {ok, T} = envelope:get_field(type, A), T end)'
WAITF='flow_spec:sequence([flow:suspend(w), flow_spec:flow_node(fun(V) -> {got, V} end)])'
CATF='flow_spec:branch(fun(In) -> {ok, A} = envelope:get_field(activity, In), {ok, O} = envelope:get_field(object, A), envelope:get_field(type, O) =:= {ok, note} end, flow_spec:flow_const(is_note), flow_spec:flow_const(not_note))'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(er-load-gen-server!)")
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/flow/flow.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/flow/flow_spec.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/flow/flow_store.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/kernel/trigger_registry.erl\")) :name)")
(epoch 3)
(eval "(get (erlang-load-module (file-read \"next/kernel/flow_dispatch.erl\")) :name)")
;; ── guard / actor-scope gating ─────────────────────────────
(epoch 10)
(eval "(get (erlang-eval-ast \"flow_dispatch:guard_passes(trigger_registry:mk_spec(c, f, undefined, any), ${ACT}, ${AS})\") :name)")
(epoch 11)
(eval "(get (erlang-eval-ast \"flow_dispatch:guard_passes(trigger_registry:mk_spec(c, f, fun(_, _) -> false end, any), ${ACT}, ${AS}) =:= false\") :name)")
(epoch 12)
(eval "(get (erlang-eval-ast \"flow_dispatch:guard_passes(trigger_registry:mk_spec(c, f, fun(A, _) -> envelope:get_field(actor, A) =:= {ok, alice} end, any), ${ACT}, ${AS})\") :name)")
(epoch 13)
(eval "(get (erlang-eval-ast \"flow_dispatch:guard_passes(trigger_registry:mk_spec(c, f, undefined, alice), ${ACT}, ${AS})\") :name)")
(epoch 14)
(eval "(get (erlang-eval-ast \"flow_dispatch:guard_passes(trigger_registry:mk_spec(c, f, undefined, bob), ${ACT}, ${AS}) =:= false\") :name)")
;; ── start: audit triple + synchronous first step ───────────
(epoch 20)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(capture, ${CAP}), flow_dispatch:start(trigger_registry:mk_spec(<<116,99>>, capture, undefined, any), ${ACT}, ${AS}, []) =:= {ok, 1, {<<97,99,105,100>>, <<116,99>>, 1}}\") :name)")
(epoch 21)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(capture, ${CAP}), {ok, FlowId, _} = flow_dispatch:start(trigger_registry:mk_spec(<<116,99>>, capture, undefined, any), ${ACT}, ${AS}, []), flow_store:status(FlowId) =:= {ok, {done, create}}\") :name)")
;; ── unknown flow name -> {error, no_such_flow}, no crash ────
(epoch 30)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_dispatch:start(trigger_registry:mk_spec(<<116,99>>, ghostflow, undefined, any), ${ACT}, ${AS}, []) =:= {error, no_such_flow}\") :name)")
;; ── started instance suspends; resume completes ────────────
(epoch 40)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(wait_flow, ${WAITF}), {ok, FlowId, _} = flow_dispatch:start(trigger_registry:mk_spec(<<116,99>>, wait_flow, undefined, any), ${ACT}, ${AS}, []), S1 = flow_store:status(FlowId), R = flow_store:resume(FlowId, 7), S1 =:= {ok, {suspended, w}} andalso R =:= {ok, {flow_done, {got, 7}}}\") :name)")
;; ── branch on an activity field (both branches) ────────────
(epoch 50)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(cat_flow, ${CATF}), {ok, FlowId, _} = flow_dispatch:start(trigger_registry:mk_spec(<<116,99>>, cat_flow, undefined, any), ${ACT}, ${AS}, []), flow_store:status(FlowId) =:= {ok, {done, is_note}}\") :name)")
(epoch 51)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(cat_flow, ${CATF}), {ok, FlowId, _} = flow_dispatch:start(trigger_registry:mk_spec(<<116,99>>, cat_flow, undefined, any), [{type, create}, {actor, alice}, {id, <<120>>}, {object, [{type, article}]}], ${AS}, []), flow_store:status(FlowId) =:= {ok, {done, not_note}}\") :name)")
EPOCHS
OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 3 "flow_dispatch module loaded" "flow_dispatch"
check 10 "undefined guard + any scope pass" "true"
check 11 "guard false -> no pass" "true"
check 12 "guard true on activity field" "true"
check 13 "actor-scope match passes" "true"
check 14 "actor-scope mismatch fails" "true"
check 20 "start returns audit triple" "true"
check 21 "first step runs synchronously" "true"
check 30 "unknown flow -> no_such_flow" "true"
check 40 "started flow suspends + resumes" "true"
check 50 "branch then-arm (is_note)" "true"
check 51 "branch else-arm (not_note)" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/flow_dispatch.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

View File

@@ -48,6 +48,18 @@ cat > "$TMPFILE" <<'EPOCHS'
(eval "(first (parse (file-read \"next/genesis/activity-types/endorse.sx\")))")
(epoch 200)
(eval "(get (apply dict (rest (parse (file-read \"next/genesis/activity-types/endorse.sx\")))) :name)")
(epoch 201)
(eval "(first (parse (file-read \"next/genesis/activity-types/define_type.sx\")))")
(epoch 202)
(eval "(get (apply dict (rest (parse (file-read \"next/genesis/activity-types/define_type.sx\")))) :name)")
(epoch 203)
(eval "(first (parse (file-read \"next/genesis/activity-types/subtype_of.sx\")))")
(epoch 204)
(eval "(get (apply dict (rest (parse (file-read \"next/genesis/activity-types/subtype_of.sx\")))) :name)")
(epoch 205)
(eval "(first (parse (file-read \"next/genesis/activity-types/define_trigger.sx\")))")
(epoch 206)
(eval "(get (apply dict (rest (parse (file-read \"next/genesis/activity-types/define_trigger.sx\")))) :name)")
(epoch 19)
(eval "(len (get (apply dict (rest (parse (file-read \"next/genesis/manifest.sx\")))) :activity-types))")
(epoch 30)
@@ -180,7 +192,13 @@ check 27 "announce.sx head form" "DefineActivity"
check 28 "announce.sx name is Announce" "Announce"
check 29 "endorse.sx head form" "DefineActivity"
check 200 "endorse.sx name is Endorse" "Endorse"
check 19 "manifest has 5 activity-types" "5"
check 201 "define_type.sx head form" "DefineActivity"
check 202 "define_type.sx name" "DefineType"
check 203 "subtype_of.sx head form" "DefineActivity"
check 204 "subtype_of.sx name" "SubtypeOf"
check 205 "define_trigger.sx head form" "DefineActivity"
check 206 "define_trigger.sx name" "DefineTrigger"
check 19 "manifest has 8 activity-types" "8"
check 30 "sx-artifact.sx head form" "DefineObject"
check 31 "sx-artifact.sx name" "SXArtifact"
check 32 "note.sx name" "Note"

154
next/tests/object_schema.sh Executable file
View File

@@ -0,0 +1,154 @@
#!/usr/bin/env bash
# next/tests/object_schema.sh — host-type federation Phase 4.
#
# pipeline:apply_object_schema/2 validates an inbound activity's inner
# object against its declared refinement type. The type is resolved
# TypeName -> TypeCid (Cfg type_index) -> TypeRecord
# (peer_types:lookup_or_fetch, a local hit or a wire fetch), then the
# record's refinement schema is applied to the object's :field_values.
# Default strict_object_schema = false: an unresolvable type is let
# through; opt-in strict rejects.
#
# Refinement schemas are either a 1-arity Erlang predicate (the
# substrate stand-in, locally stored) or a term_codec-safe
# {required, [Field,...]} constraint (so a wire-fetched record still
# validates). Both are exercised here.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
# Cid is the Post type's CID; TRdata carries a data-form refinement
# (object must have a `title` field), TRfun the Erlang-predicate form.
# ActValid's object has :title, ActFail's doesn't, ActNoType's object
# declares no type, ActUnknown's type isn't in the index. PostName is
# <<"Post">>, title "Hi" = <<72,105>>. Index maps name -> Cid.
SETUP='Cid = <<98,97,102,121,80>>, PostName = <<80,111,115,116>>, TRdata = [{name, PostName}, {refinement_schema, {required, [title]}}], TRfun = [{name, PostName}, {refinement_schema, fun(FV) -> case FV of [{title, _} | _] -> true; _ -> false end end}], ObjValid = [{type, PostName}, {field_values, [{title, <<72,105>>}, {body, <<104,105>>}]}], ObjFail = [{type, PostName}, {field_values, [{body, <<104,105>>}]}], ActValid = [{type, create}, {actor, alice}, {object, ObjValid}], ActFail = [{type, create}, {actor, alice}, {object, ObjFail}], ActNoType = [{type, create}, {actor, alice}, {object, [{field_values, [{title, <<72,105>>}]}]}], ActUnknown = [{type, create}, {actor, alice}, {object, [{type, <<82,101,112,108,121>>}, {field_values, [{title, <<72,105>>}]}]}], Index = [{PostName, Cid}], FAIL = {error, {validation_failed, object_schema}},'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(er-load-gen-server!)")
(epoch 3)
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
(epoch 4)
(eval "(get (erlang-load-module (file-read \"next/kernel/term_codec.erl\")) :name)")
(epoch 5)
(eval "(get (erlang-load-module (file-read \"next/kernel/peer_types.erl\")) :name)")
(epoch 6)
(eval "(get (erlang-load-module (file-read \"next/kernel/pipeline.erl\")) :name)")
;; local registry match + valid object -> accepted
(epoch 10)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRdata), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActValid, Cfg) =:= ok\") :name)")
;; local match + refinement-failing object -> rejected
(epoch 11)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRdata), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActFail, Cfg) =:= FAIL\") :name)")
;; type not cached, fetch succeeds -> validates against fetched record
(epoch 12)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), Cfg = [{peer_types, peer_types}, {type_index, Index}, {type_fetch_fn, fun(_, _) -> {ok, term_codec:encode(TRdata)} end}], pipeline:apply_object_schema(ActValid, Cfg) =:= ok\") :name)")
;; fetched record, failing object -> rejected
(epoch 13)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), Cfg = [{peer_types, peer_types}, {type_index, Index}, {type_fetch_fn, fun(_, _) -> {ok, term_codec:encode(TRdata)} end}], pipeline:apply_object_schema(ActFail, Cfg) =:= FAIL\") :name)")
;; unknown type, fetch fails, strict not set -> accepted (skipped)
(epoch 14)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), Cfg = [{peer_types, peer_types}, {type_index, Index}, {type_fetch_fn, fun(_, _) -> {error, http_404} end}], pipeline:apply_object_schema(ActValid, Cfg) =:= ok\") :name)")
;; unknown type, fetch fails, strict set -> rejected
(epoch 15)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), Cfg = [{peer_types, peer_types}, {type_index, Index}, {type_fetch_fn, fun(_, _) -> {error, http_404} end}, {strict_object_schema, true}], pipeline:apply_object_schema(ActValid, Cfg) =:= FAIL\") :name)")
;; no peer_types cfg at all, non-strict -> accepted (skipped)
(epoch 16)
(eval "(get (erlang-eval-ast \"${SETUP} Cfg = [{type_index, Index}], pipeline:apply_object_schema(ActValid, Cfg) =:= ok\") :name)")
;; no peer_types cfg, strict -> rejected
(epoch 17)
(eval "(get (erlang-eval-ast \"${SETUP} Cfg = [{type_index, Index}, {strict_object_schema, true}], pipeline:apply_object_schema(ActValid, Cfg) =:= FAIL\") :name)")
;; object without inner {type, _} -> skipped (accepted)
(epoch 18)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRdata), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActNoType, Cfg) =:= ok\") :name)")
;; object type not in the local index -> skipped (open-world)
(epoch 19)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRdata), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActUnknown, Cfg) =:= ok\") :name)")
;; Erlang-predicate refinement schema: valid -> ok, failing -> reject
(epoch 20)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRfun), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActValid, Cfg) =:= ok\") :name)")
(epoch 21)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRfun), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActFail, Cfg) =:= FAIL\") :name)")
;; type known but record carries no refinement schema -> accepted
(epoch 22)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, [{name, PostName}]), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActFail, Cfg) =:= ok\") :name)")
;; stage_object_schema/1 yields a 1-arity stage usable by run_stages
(epoch 23)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRdata), Cfg = [{peer_types, peer_types}, {type_index, Index}], Stage = pipeline:stage_object_schema(Cfg), is_function(Stage, 1) andalso pipeline:run_stages(ActValid, [Stage]) =:= ok andalso pipeline:run_stages(ActFail, [Stage]) =:= FAIL\") :name)")
EPOCHS
OUTPUT=$(timeout 300 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 6 "pipeline module loaded" "pipeline"
check 10 "local match + valid -> accepted" "true"
check 11 "local match + failing -> rejected" "true"
check 12 "fetch ok -> validates fetched record" "true"
check 13 "fetched record + failing -> rejected" "true"
check 14 "fetch fail, non-strict -> accepted" "true"
check 15 "fetch fail, strict -> rejected" "true"
check 16 "no peer_types, non-strict -> accepted" "true"
check 17 "no peer_types, strict -> rejected" "true"
check 18 "object without type -> skipped" "true"
check 19 "type not in index -> skipped" "true"
check 20 "fun schema valid -> accepted" "true"
check 21 "fun schema failing -> rejected" "true"
check 22 "no refinement schema -> accepted" "true"
check 23 "stage_object_schema composes" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/object_schema.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

155
next/tests/peer_types.sh Executable file
View File

@@ -0,0 +1,155 @@
#!/usr/bin/env bash
# next/tests/peer_types.sh — host-type federation Phase 2 acceptance.
#
# Receiver-side peer-types cache (next/kernel/peer_types.erl), a mirror
# of peer_actors keyed by type CID. Tracks {TypeCidBytes, TypeRecord}
# pairs so the object-schema validation stage can vet inbound objects
# against a fetched-once refinement type. lookup_or_fetch pulls a
# Cfg-supplied type_fetch_fn on a miss, decodes the returned wire bytes
# via term_codec, and caches the TypeRecord.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
# TR1/TR2 are TypeRecords (the DefineType :object payloads). Doc1 is
# TR1's on-wire form (term_codec). FetchOk serves Doc1 for Cid1;
# FetchBad returns undecodable bytes. CfgOk/CfgBad/CfgNone vary the
# type_fetch_fn slot.
SETUP='Cid1 = <<98,97,102,121,49>>, Cid2 = <<98,97,102,121,50>>, TR1 = [{name, <<80,111,115,116>>}, {instance_type, <<78,111,116,101>>}], TR2 = [{name, <<82,101,112,108,121>>}], Doc1 = term_codec:encode(TR1), FetchOk = fun(C, _) -> case C =:= Cid1 of true -> {ok, Doc1}; false -> {error, not_found} end end, FetchBad = fun(_, _) -> {ok, <<255>>} end, CfgOk = [{type_fetch_fn, FetchOk}], CfgBad = [{type_fetch_fn, FetchBad}], CfgNone = [],'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(er-load-gen-server!)")
(epoch 3)
(eval "(get (erlang-load-module (file-read \"next/kernel/term_codec.erl\")) :name)")
(epoch 4)
(eval "(get (erlang-load-module (file-read \"next/kernel/peer_types.erl\")) :name)")
;; ── pure API ───────────────────────────────────────────────
;; new/0 -> []
(epoch 10)
(eval "(get (erlang-eval-ast \"peer_types:new() =:= []\") :name)")
;; lookup miss -> not_found
(epoch 11)
(eval "(get (erlang-eval-ast \"peer_types:lookup(<<1>>, peer_types:new()) =:= not_found\") :name)")
;; store + lookup round-trip
(epoch 12)
(eval "(get (erlang-eval-ast \"${SETUP} S = peer_types:store(Cid1, TR1, peer_types:new()), peer_types:lookup(Cid1, S) =:= {ok, TR1}\") :name)")
;; types/1 lists CIDs in insertion order
(epoch 13)
(eval "(get (erlang-eval-ast \"${SETUP} S = peer_types:store(Cid2, TR2, peer_types:store(Cid1, TR1, peer_types:new())), peer_types:types(S) =:= [Cid1, Cid2]\") :name)")
;; evict removes the entry
(epoch 14)
(eval "(get (erlang-eval-ast \"${SETUP} S = peer_types:evict(Cid1, peer_types:store(Cid1, TR1, peer_types:new())), peer_types:lookup(Cid1, S) =:= not_found\") :name)")
;; ── lookup_or_fetch (pure) ─────────────────────────────────
;; miss -> fetch via Cfg.fn, decode bytes, cache TR
(epoch 20)
(eval "(get (erlang-eval-ast \"${SETUP} case peer_types:lookup_or_fetch(Cid1, CfgOk, peer_types:new()) of {ok, TR1, [{Cid1, TR1}]} -> ok; _ -> bad end\") :name)")
;; hit -> returns cached without calling fetch
(epoch 21)
(eval "(get (erlang-eval-ast \"${SETUP} S = peer_types:store(Cid1, TR1, peer_types:new()), case peer_types:lookup_or_fetch(Cid1, CfgBad, S) of {ok, TR1, S} -> ok; _ -> bad end\") :name)")
;; no type_fetch_fn -> {error, no_fetch_fn}, cache untouched
(epoch 22)
(eval "(get (erlang-eval-ast \"${SETUP} case peer_types:lookup_or_fetch(Cid1, CfgNone, peer_types:new()) of {error, no_fetch_fn, []} -> ok; _ -> bad end\") :name)")
;; fetch error does NOT poison the cache
(epoch 23)
(eval "(get (erlang-eval-ast \"${SETUP} BadCfg = [{type_fetch_fn, fun(_, _) -> {error, http_404} end}], case peer_types:lookup_or_fetch(Cid1, BadCfg, peer_types:new()) of {error, http_404, []} -> ok; _ -> bad end\") :name)")
;; undecodable bytes -> {error, bad_type_doc}
(epoch 24)
(eval "(get (erlang-eval-ast \"${SETUP} case peer_types:lookup_or_fetch(Cid1, CfgBad, peer_types:new()) of {error, bad_type_doc, []} -> ok; _ -> bad end\") :name)")
;; ── gen_server API ─────────────────────────────────────────
;; start_link + put + lookup round-trip
(epoch 30)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid1, TR1), peer_types:lookup(Cid1) =:= {ok, TR1}\") :name)")
;; lookup miss -> not_found
(epoch 31)
(eval "(get (erlang-eval-ast \"peer_types:start_link(), peer_types:lookup(<<9>>) =:= not_found\") :name)")
;; state_for is an alias of lookup
(epoch 32)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid1, TR1), peer_types:state_for(Cid1) =:= {ok, TR1}\") :name)")
;; known_types lists stored CIDs
(epoch 33)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid1, TR1), peer_types:put(Cid2, TR2), peer_types:known_types() =:= [Cid1, Cid2]\") :name)")
;; lookup_or_fetch miss fetches + caches
(epoch 34)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), R = peer_types:lookup_or_fetch(Cid1, CfgOk), R =:= {ok, TR1} andalso peer_types:known_types() =:= [Cid1]\") :name)")
;; lookup_or_fetch with no fn -> {error, no_fetch_fn}, pristine
(epoch 35)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), R = peer_types:lookup_or_fetch(Cid1, CfgNone), R =:= {error, no_fetch_fn} andalso peer_types:known_types() =:= []\") :name)")
;; start_link/1 pre-populates the cache
(epoch 36)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link([{Cid1, TR1}]), peer_types:lookup(Cid1) =:= {ok, TR1}\") :name)")
EPOCHS
OUTPUT=$(timeout 300 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 4 "peer_types module loaded" "peer_types"
check 10 "new/0 -> []" "true"
check 11 "lookup miss -> not_found" "true"
check 12 "store + lookup round-trip" "true"
check 13 "types/1 lists in insertion order" "true"
check 14 "evict removes entry" "true"
check 20 "lookup_or_fetch miss fetches" "ok"
check 21 "lookup_or_fetch hit skips fetch" "ok"
check 22 "no fetch_fn -> no_fetch_fn" "ok"
check 23 "fetch error doesn't poison" "ok"
check 24 "undecodable bytes -> bad_type_doc" "ok"
check 30 "gen_server put + lookup" "true"
check 31 "gen_server lookup miss" "true"
check 32 "gen_server state_for alias" "true"
check 33 "gen_server known_types lists" "true"
check 34 "gen_server fetch + cache" "true"
check 35 "gen_server no fn -> pristine" "true"
check 36 "start_link/1 pre-populates" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/peer_types.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

140
next/tests/peer_types_route.sh Executable file
View File

@@ -0,0 +1,140 @@
#!/usr/bin/env bash
# next/tests/peer_types_route.sh — host-type federation Phase 3.
#
# Server side of the type-doc wire: http_server serves
# GET /types/<cid> Accept: application/vnd.fed-sx.type-doc
# as the term_codec-encoded TypeRecord pulled from the peer_types
# cache; 404 if the cid isn't cached. Exercised via http_server:route
# in-process (the established pattern — see http_actors.sh) so the
# route resolution + content negotiation are tested without a live
# socket. The peer_types gen_server holds the cache across epochs.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
# TR is the served TypeRecord, Cid its key. AccV is the type-doc
# Accept header value, CT the content-type key. Cfg opts the route
# into the peer_types cache. ReqHit / ReqMiss / ReqEmpty / ReqPost
# vary the request line.
SETUP='TR = [{name, <<80,111,115,116>>}, {instance_type, <<78,111,116,101>>}], Cid = <<98,97,102,121,49>>, peer_types:start_link(), peer_types:put(Cid, TR), AcK = <<97,99,99,101,112,116>>, AcV = <<97,112,112,108,105,99,97,116,105,111,110,47,118,110,100,46,102,101,100,45,115,120,46,116,121,112,101,45,100,111,99>>, Hs = [{AcK, AcV}], Cfg = [{peer_types, peer_types}],'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(er-load-gen-server!)")
(epoch 3)
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
(epoch 4)
(eval "(get (erlang-load-module (file-read \"next/kernel/term_codec.erl\")) :name)")
(epoch 5)
(eval "(get (erlang-load-module (file-read \"next/kernel/peer_types.erl\")) :name)")
(epoch 6)
(eval "(get (erlang-load-module (file-read \"next/kernel/http_server.erl\")) :name)")
;; ── negotiation + prefix primitives ────────────────────────
;; Accept: type-doc negotiates to the type_doc format atom
(epoch 10)
(eval "(get (erlang-eval-ast \"http_server:accept_format(<<97,112,112,108,105,99,97,116,105,111,110,47,118,110,100,46,102,101,100,45,115,120,46,116,121,112,101,45,100,111,99>>) =:= type_doc\") :name)")
;; type_doc content type is 31 bytes
(epoch 11)
(eval "(get (erlang-eval-ast \"byte_size(http_server:content_type_for(type_doc)) =:= 31\") :name)")
;; types_prefix is "/types/" — 7 bytes
(epoch 12)
(eval "(get (erlang-eval-ast \"byte_size(http_server:types_prefix()) =:= 7\") :name)")
;; ── GET /types/<cid> ───────────────────────────────────────
;; cache hit -> 200
(epoch 20)
(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<71,69,84>>}, {path, <<47,116,121,112,101,115,47,98,97,102,121,49>>}, {headers, Hs}], R = http_server:route(Req, Cfg), {ok, S} = envelope:get_field(status, R), S =:= 200\") :name)")
;; body decodes back to the stored TypeRecord
(epoch 21)
(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<71,69,84>>}, {path, <<47,116,121,112,101,115,47,98,97,102,121,49>>}, {headers, Hs}], R = http_server:route(Req, Cfg), {ok, B} = envelope:get_field(body, R), {ok, DTR, _} = term_codec:decode(B), DTR =:= TR\") :name)")
;; response carries the type-doc content type
(epoch 22)
(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<71,69,84>>}, {path, <<47,116,121,112,101,115,47,98,97,102,121,49>>}, {headers, Hs}], R = http_server:route(Req, Cfg), {ok, Hdrs} = envelope:get_field(headers, R), {_CTK, CTV} = hd(Hdrs), CTV =:= http_server:content_type_for(type_doc)\") :name)")
;; type_doc_response_for/2 direct: known cid -> 200
(epoch 23)
(eval "(get (erlang-eval-ast \"${SETUP} R = http_server:type_doc_response_for(Cid, Cfg), {ok, S} = envelope:get_field(status, R), S =:= 200\") :name)")
;; ── misses + wrong method ──────────────────────────────────
;; unknown cid -> 404
(epoch 30)
(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<71,69,84>>}, {path, <<47,116,121,112,101,115,47,122,122,122>>}, {headers, Hs}], R = http_server:route(Req, Cfg), {ok, S} = envelope:get_field(status, R), S =:= 404\") :name)")
;; empty cid (GET /types/) -> 404
(epoch 31)
(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<71,69,84>>}, {path, <<47,116,121,112,101,115,47>>}, {headers, Hs}], R = http_server:route(Req, Cfg), {ok, S} = envelope:get_field(status, R), S =:= 404\") :name)")
;; no peer_types cfg -> 404 even for a known cid
(epoch 32)
(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<71,69,84>>}, {path, <<47,116,121,112,101,115,47,98,97,102,121,49>>}, {headers, Hs}], R = http_server:route(Req, []), {ok, S} = envelope:get_field(status, R), S =:= 404\") :name)")
;; POST /types/<cid> -> 404 (only GET serves type docs)
(epoch 33)
(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<80,79,83,84>>}, {path, <<47,116,121,112,101,115,47,98,97,102,121,49>>}, {headers, Hs}], R = http_server:route(Req, Cfg), {ok, S} = envelope:get_field(status, R), S =:= 404\") :name)")
;; existing routes intact: GET / still 200
(epoch 34)
(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<71,69,84>>}, {path, <<47>>}], R = http_server:route(Req, Cfg), {ok, S} = envelope:get_field(status, R), S =:= 200\") :name)")
EPOCHS
OUTPUT=$(timeout 300 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 6 "http_server module loaded" "http_server"
check 10 "Accept type-doc -> type_doc" "true"
check 11 "type_doc content type = 31 bytes" "true"
check 12 "types_prefix = 7 bytes" "true"
check 20 "GET /types/<cid> hit -> 200" "true"
check 21 "body decodes to TypeRecord" "true"
check 22 "response is type-doc content type" "true"
check 23 "type_doc_response_for hit -> 200" "true"
check 30 "unknown cid -> 404" "true"
check 31 "empty cid -> 404" "true"
check 32 "no peer_types cfg -> 404" "true"
check 33 "POST /types/<cid> -> 404" "true"
check 34 "existing GET / route intact" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/peer_types_route.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

129
next/tests/pipeline_triggers.sh Executable file
View File

@@ -0,0 +1,129 @@
#!/usr/bin/env bash
# next/tests/pipeline_triggers.sh — fed-sx triggers Phase 2.
#
# pipeline:apply_triggers/3 is the post-append fan-out: a successfully
# appended activity has its type looked up in the trigger registry, and
# each surviving spec (guard + actor-scope pass, not already fired) is
# dispatched to a durable flow. Confirms lookup -> dispatch, no-match,
# guard rejection, {activity,trigger}-cid dedup, multi-bind, graceful
# handling of an unknown flow and a crashing flow, and the cfg gate.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
ACT='[{type, create}, {actor, alice}, {id, <<97,99,105,100>>}, {object, [{type, note}]}]'
AS='[{actor_id, alice}]'
CFG='[{trigger_registry, trigger_registry}]'
DONEF='flow_spec:flow_const(ran)'
BOOMF='flow_spec:flow_node(fun(_) -> error(kaboom) end)'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(er-load-gen-server!)")
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/flow/flow.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/flow/flow_spec.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/flow/flow_store.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/kernel/trigger_registry.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/kernel/flow_dispatch.erl\")) :name)")
(epoch 3)
(eval "(get (erlang-load-module (file-read \"next/kernel/pipeline.erl\")) :name)")
;; ── lookup -> dispatch ─────────────────────────────────────
(epoch 10)
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, ranflow, undefined, any)), flow_store:start_link(), flow_store:register_flow(ranflow, ${DONEF}), pipeline:apply_triggers(${ACT}, ${AS}, ${CFG}) =:= {ok, [{<<97,99,105,100>>, <<116,99>>, {ok, 1}}]}\") :name)")
;; the dispatched flow really ran (instance recorded done)
(epoch 11)
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, ranflow, undefined, any)), flow_store:start_link(), flow_store:register_flow(ranflow, ${DONEF}), pipeline:apply_triggers(${ACT}, ${AS}, ${CFG}), flow_store:status(1) =:= {ok, {done, ran}}\") :name)")
;; ── no matching trigger -> no dispatch ─────────────────────
(epoch 20)
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), flow_store:start_link(), pipeline:apply_triggers(${ACT}, ${AS}, ${CFG}) =:= {ok, []}\") :name)")
;; ── guard returns false -> no dispatch ─────────────────────
(epoch 30)
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, ranflow, fun(_, _) -> false end, any)), flow_store:start_link(), flow_store:register_flow(ranflow, ${DONEF}), pipeline:apply_triggers(${ACT}, ${AS}, ${CFG}) =:= {ok, []}\") :name)")
;; ── dedup: already-fired {activity,trigger} pair -> skipped ─
(epoch 40)
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, ranflow, undefined, any)), flow_store:start_link(), flow_store:register_flow(ranflow, ${DONEF}), pipeline:apply_triggers(${ACT}, [{actor_id, alice}, {triggers_fired, [{<<97,99,105,100>>, <<116,99>>}]}], ${CFG}) =:= {ok, []}\") :name)")
;; ── multiple triggers for the same type -> each dispatched ─
(epoch 50)
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:add(create, trigger_registry:mk_spec(<<116,49>>, ranflow, undefined, any)), trigger_registry:add(create, trigger_registry:mk_spec(<<116,50>>, ranflow, undefined, any)), flow_store:start_link(), flow_store:register_flow(ranflow, ${DONEF}), {ok, Rs} = pipeline:apply_triggers(${ACT}, ${AS}, ${CFG}), length(Rs) =:= 2\") :name)")
;; ── unknown flow name -> {error, _} in results, no crash ───
(epoch 60)
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, ghostflow, undefined, any)), flow_store:start_link(), pipeline:apply_triggers(${ACT}, ${AS}, ${CFG}) =:= {ok, [{<<97,99,105,100>>, <<116,99>>, {error, no_such_flow}}]}\") :name)")
;; ── crashing flow -> isolated as {error, {flow_crashed, _}} ─
(epoch 61)
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, boom, undefined, any)), flow_store:start_link(), flow_store:register_flow(boom, ${BOOMF}), {ok, [{_, _, Outcome}]} = pipeline:apply_triggers(${ACT}, ${AS}, ${CFG}), case Outcome of {error, {flow_crashed, _}} -> true; _ -> false end\") :name)")
;; ── no trigger_registry cfg -> {ok, []} ────────────────────
(epoch 70)
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, ranflow, undefined, any)), flow_store:start_link(), flow_store:register_flow(ranflow, ${DONEF}), pipeline:apply_triggers(${ACT}, ${AS}, []) =:= {ok, []}\") :name)")
EPOCHS
OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 3 "pipeline module loaded" "pipeline"
check 10 "lookup -> dispatch (audit)" "true"
check 11 "dispatched flow actually ran" "true"
check 20 "no matching trigger -> no dispatch" "true"
check 30 "guard false -> no dispatch" "true"
check 40 "dedup already-fired -> skipped" "true"
check 50 "multi-bind: each dispatched" "true"
check 60 "unknown flow -> error in results" "true"
check 61 "crashing flow isolated" "true"
check 70 "no registry cfg -> no dispatch" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/pipeline_triggers.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

103
next/tests/subtype_of.sh Executable file
View File

@@ -0,0 +1,103 @@
#!/usr/bin/env bash
# next/tests/subtype_of.sh — host-type federation Phase 1 acceptance.
#
# The SubtypeOf genesis verb (next/genesis/activity-types/subtype_of.sx)
# records a hierarchy edge between two previously-defined types. This
# suite confirms:
# - the file parses with the expected DefineActivity head + :name
# - the :schema predicate accepts an edge carrying both CIDs and
# rejects edges missing either side
# - a SubtypeOf envelope round-trips through term_codec
#
# Schema bodies are SX source; we eval them with `eval-expr` and call
# the resulting lambda directly. 7 cases.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
SCH='(eval-expr (get (apply dict (rest (parse (file-read \"next/genesis/activity-types/subtype_of.sx\")))) :schema))'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(get (erlang-load-module (file-read \"next/kernel/term_codec.erl\")) :name)")
;; ── parse / shape ──────────────────────────────────────────
(epoch 10)
(eval "(first (parse (file-read \"next/genesis/activity-types/subtype_of.sx\")))")
(epoch 11)
(eval "(get (apply dict (rest (parse (file-read \"next/genesis/activity-types/subtype_of.sx\")))) :name)")
;; ── schema accept / reject ─────────────────────────────────
;; valid: both CIDs present + strings -> true
(epoch 20)
(eval "(define sch ${SCH}) (sch (dict :object (dict :child-type-cid \"bafyChild\" :parent-type-cid \"bafyParent\")))")
;; reject: missing :child-type-cid -> false
(epoch 21)
(eval "(define sch ${SCH}) (sch (dict :object (dict :parent-type-cid \"bafyParent\")))")
;; reject: missing :parent-type-cid -> false
(epoch 22)
(eval "(define sch ${SCH}) (sch (dict :object (dict :child-type-cid \"bafyChild\")))")
;; ── envelope round-trip through term_codec ─────────────────
(epoch 30)
(eval "(get (erlang-eval-ast \"A = [{type, subtype_of}, {actor, alice}, {object, [{child_type_cid, <<99,104>>}, {parent_type_cid, <<112,97>>}]}], {ok, D, _} = term_codec:decode(term_codec:encode(A)), D =:= A\") :name)")
EPOCHS
OUTPUT=$(timeout 180 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 10 "subtype_of.sx head form" "DefineActivity"
check 11 "subtype_of.sx name" "SubtypeOf"
check 20 "schema accepts edge with 2 CIDs" "true"
check 21 "schema rejects missing child CID" "false"
check 22 "schema rejects missing parent CID" "false"
check 30 "SubtypeOf envelope round-trips" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/subtype_of.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

143
next/tests/trigger_registry.sh Executable file
View File

@@ -0,0 +1,143 @@
#!/usr/bin/env bash
# next/tests/trigger_registry.sh — fed-sx triggers Phase 1 (registry).
#
# trigger_registry binds activity-types to durable flows. The kernel's
# post-append fan-out (Phase 2) looks an arriving activity's type up
# here and starts each registered flow. Mirrors peer_actors / peer_types:
# a pure core + a gen_server, hydrated from a fold over DefineTrigger
# activities.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
# Spec1/Spec2 bind activity-type `create`. TrigAct/TrigAct2 are
# DefineTrigger activities the fold hydrates from.
SETUP='S1 = trigger_registry:mk_spec(<<99,49>>, flow_a, undefined, any), S2 = trigger_registry:mk_spec(<<99,50>>, flow_b, undefined, any), TrigAct = [{type, define_trigger}, {actor, alice}, {id, <<99,49>>}, {object, [{activity_type, create}, {flow_name, flow_a}]}], TrigAct2 = [{type, define_trigger}, {actor, alice}, {id, <<99,50>>}, {object, [{activity_type, follow}, {flow_name, flow_c}]}], Note = [{type, note}, {actor, alice}, {object, [{content, hi}]}],'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(er-load-gen-server!)")
(epoch 3)
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
(epoch 4)
(eval "(get (erlang-load-module (file-read \"next/kernel/trigger_registry.erl\")) :name)")
;; ── pure core ──────────────────────────────────────────────
(epoch 10)
(eval "(get (erlang-eval-ast \"trigger_registry:new() =:= []\") :name)")
;; add + lookup round-trip
(epoch 11)
(eval "(get (erlang-eval-ast \"${SETUP} St = trigger_registry:add(create, S1, trigger_registry:new()), trigger_registry:lookup(create, St) =:= [S1]\") :name)")
;; lookup with no match -> []
(epoch 12)
(eval "(get (erlang-eval-ast \"${SETUP} trigger_registry:lookup(create, trigger_registry:new()) =:= []\") :name)")
;; multi-bind: two specs on the same activity-type, both returned in order
(epoch 13)
(eval "(get (erlang-eval-ast \"${SETUP} St = trigger_registry:add(create, S2, trigger_registry:add(create, S1, trigger_registry:new())), trigger_registry:lookup(create, St) =:= [S1, S2]\") :name)")
;; remove by trigger cid
(epoch 14)
(eval "(get (erlang-eval-ast \"${SETUP} St = trigger_registry:add(create, S2, trigger_registry:add(create, S1, trigger_registry:new())), trigger_registry:lookup(create, trigger_registry:remove(<<99,49>>, St)) =:= [S2]\") :name)")
;; remove last spec for a type prunes the type
(epoch 15)
(eval "(get (erlang-eval-ast \"${SETUP} St = trigger_registry:add(create, S1, trigger_registry:new()), trigger_registry:remove(<<99,49>>, St) =:= []\") :name)")
;; spec accessors
(epoch 16)
(eval "(get (erlang-eval-ast \"${SETUP} {trigger_registry:spec_cid(S1), trigger_registry:spec_flow_name(S1), trigger_registry:spec_guard(S1), trigger_registry:spec_actor_scope(S1)} =:= {<<99,49>>, flow_a, undefined, any}\") :name)")
;; ── hydration fold ─────────────────────────────────────────
;; a DefineTrigger activity registers its binding
(epoch 20)
(eval "(get (erlang-eval-ast \"${SETUP} St = trigger_registry:fold(TrigAct, trigger_registry:new()), trigger_registry:lookup(create, St) =:= [trigger_registry:mk_spec(<<99,49>>, flow_a, undefined, any)]\") :name)")
;; a non-trigger activity passes through untouched
(epoch 21)
(eval "(get (erlang-eval-ast \"${SETUP} trigger_registry:fold(Note, trigger_registry:new()) =:= []\") :name)")
;; folding several Trigger activities rebuilds the whole registry
(epoch 22)
(eval "(get (erlang-eval-ast \"${SETUP} St = trigger_registry:fold(TrigAct2, trigger_registry:fold(TrigAct, trigger_registry:new())), {trigger_registry:lookup(create, St), trigger_registry:lookup(follow, St)} =:= {[trigger_registry:mk_spec(<<99,49>>, flow_a, undefined, any)], [trigger_registry:mk_spec(<<99,50>>, flow_c, undefined, any)]}\") :name)")
;; fold_fn/0 is a 2-arity fun
(epoch 23)
(eval "(get (erlang-eval-ast \"is_function(trigger_registry:fold_fn(), 2)\") :name)")
;; ── gen_server ─────────────────────────────────────────────
(epoch 30)
(eval "(get (erlang-eval-ast \"${SETUP} trigger_registry:start_link(), trigger_registry:add(create, S1), trigger_registry:lookup(create) =:= [S1]\") :name)")
(epoch 31)
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:lookup(create) =:= []\") :name)")
(epoch 32)
(eval "(get (erlang-eval-ast \"${SETUP} trigger_registry:start_link(), trigger_registry:add(create, S1), trigger_registry:add(create, S2), trigger_registry:remove(<<99,49>>), trigger_registry:lookup(create) =:= [S2]\") :name)")
(epoch 33)
(eval "(get (erlang-eval-ast \"${SETUP} trigger_registry:start_link(), trigger_registry:add(create, S1), trigger_registry:add(follow, S2), trigger_registry:all_triggers() =:= [{create, [S1]}, {follow, [S2]}]\") :name)")
;; start_link/1 pre-populates from a hydrated state
(epoch 34)
(eval "(get (erlang-eval-ast \"${SETUP} St = trigger_registry:fold(TrigAct, trigger_registry:new()), trigger_registry:start_link(St), trigger_registry:lookup(create) =:= [trigger_registry:mk_spec(<<99,49>>, flow_a, undefined, any)]\") :name)")
EPOCHS
OUTPUT=$(timeout 300 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 4 "trigger_registry module loaded" "trigger_registry"
check 10 "new/0 -> []" "true"
check 11 "add + lookup round-trip" "true"
check 12 "lookup no match -> []" "true"
check 13 "multi-bind same type, ordered" "true"
check 14 "remove by trigger cid" "true"
check 15 "remove last prunes the type" "true"
check 16 "spec accessors" "true"
check 20 "fold registers a binding" "true"
check 21 "fold non-trigger passes through" "true"
check 22 "fold hydration rebuilds registry" "true"
check 23 "fold_fn/0 is fun/2" "true"
check 30 "gen_server add + lookup" "true"
check 31 "gen_server lookup no match -> []" "true"
check 32 "gen_server remove" "true"
check 33 "gen_server all_triggers" "true"
check 34 "start_link/1 pre-populates" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/trigger_registry.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

134
next/tests/triggers_e2e.sh Executable file
View File

@@ -0,0 +1,134 @@
#!/usr/bin/env bash
# next/tests/triggers_e2e.sh — fed-sx triggers Phase 4 (end-to-end).
#
# The motivating blog-publish-digest flow, driven the whole way: a
# trigger binds Article-creates to the flow; the post-append fan-out
# starts it; the flow branches on :category, (for newsletters) suspends
# on a morning timer, fetches followers (injected), and emits a
# DigestSent activity object. Effect-as-data: the flow returns the
# emails + DigestSent object (a driver would dispatch/append them) since
# a flow can't call kernel gen_servers from inside the drive.
#
# Each epoch starts fresh gen_servers so instance ids are deterministic.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
# Bring-up shared by every case: registry + store, a 3-follower mock,
# the flow registered as blog_digest, and a trigger binding `create`
# to it guarded on "the object is an Article". Cfg/AS as the fan-out
# expects. Activities differ by :category (urgent / newsletter / draft)
# plus a non-Article note.
BOOT='trigger_registry:start_link(), flow_store:start_link(), FF = fun(_) -> [f1, f2, f3] end, Flow = blog_publish_digest:build([{fetch_followers, FF}]), flow_store:register_flow(blog_digest, Flow), Guard = fun(A, _) -> case envelope:get_field(object, A) of {ok, O} -> envelope:get_field(type, O) =:= {ok, article}; _ -> false end end, trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, blog_digest, Guard, any)), Cfg = [{trigger_registry, trigger_registry}], AS = [{actor_id, alice}],'
URGENT='[{type, create}, {actor, alice}, {id, <<117,49>>}, {object, [{type, article}, {category, urgent}]}]'
NEWS='[{type, create}, {actor, alice}, {id, <<110,49>>}, {object, [{type, article}, {category, newsletter}]}]'
DRAFT='[{type, create}, {actor, alice}, {id, <<100,49>>}, {object, [{type, article}, {category, draft}]}]'
NOTE='[{type, create}, {actor, alice}, {id, <<120,49>>}, {object, [{type, note}]}]'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(er-load-gen-server!)")
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/flow/flow.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/flow/flow_spec.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/flow/flow_store.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/kernel/trigger_registry.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/kernel/flow_dispatch.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/kernel/pipeline.erl\")) :name)")
(epoch 3)
(eval "(get (erlang-load-module (file-read \"next/flow/flows/blog_publish_digest.erl\")) :name)")
;; ── urgent: fans out, completes in one cycle, 3 emails ─────
(epoch 10)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${URGENT}, AS, Cfg) =:= {ok, [{<<117,49>>, <<116,99>>, {ok, 1}}]}\") :name)")
(epoch 11)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${URGENT}, AS, Cfg), {ok, {done, {digest_sent, Emails, _}}} = flow_store:status(1), length(Emails) =:= 3\") :name)")
;; DigestSent emit object is well-formed (type, for the article, count)
(epoch 12)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${URGENT}, AS, Cfg), {ok, {done, {digest_sent, _, Digest}}} = flow_store:status(1), Digest =:= [{type, digest_sent}, {for, <<117,49>>}, {follower_count, 3}]\") :name)")
;; ── newsletter: suspends on the morning timer, then resumes ─
(epoch 20)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${NEWS}, AS, Cfg), flow_store:status(1) =:= {ok, {suspended, morning}}\") :name)")
;; advancing the clock (resume the timer) drives it to completion
(epoch 21)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${NEWS}, AS, Cfg), {ok, {flow_done, {digest_sent, Emails, _}}} = flow_store:resume(1, morning_ts), length(Emails) =:= 3\") :name)")
;; before resume no digest exists (still suspended, not done)
(epoch 22)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${NEWS}, AS, Cfg), case flow_store:status(1) of {ok, {done, _}} -> false; {ok, {suspended, morning}} -> true; _ -> false end\") :name)")
;; ── draft: the :else branch, no emails, no DigestSent ──────
(epoch 30)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${DRAFT}, AS, Cfg), flow_store:status(1) =:= {ok, {done, skipped}}\") :name)")
;; ── non-Article note: guard rejects, no flow dispatched ────
(epoch 40)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${NOTE}, AS, Cfg) =:= {ok, []}\") :name)")
;; ── dedup: the same activity arriving twice fires once ─────
(epoch 50)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${URGENT}, [{actor_id, alice}, {triggers_fired, [{<<117,49>>, <<116,99>>}]}], Cfg) =:= {ok, []}\") :name)")
EPOCHS
OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 3 "blog_publish_digest loaded" "blog_publish_digest"
check 10 "urgent fans out (audit triple)" "true"
check 11 "urgent: 3 emails dispatched" "true"
check 12 "urgent: DigestSent object emitted" "true"
check 20 "newsletter suspends on timer" "true"
check 21 "newsletter resumes -> 3 emails" "true"
check 22 "no digest before resume" "true"
check 30 "draft -> else branch, skipped" "true"
check 40 "non-Article note -> guard rejects" "true"
check 50 "duplicate activity fires once" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/triggers_e2e.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

View File

@@ -0,0 +1,211 @@
; -*- mode: markdown -*-
# loops/host — fed-sx adapter slice (host side of host-type federation)
Scoped briefing for the follow-up that wires `loops/host`'s SX/dream
front door to the fed-sx kernel substrate landed by
`loops/fed-sx-types`. Companion to `plans/fed-sx-host-types.md` (the
substrate design + public surface). This is the build sheet for the
host-side adapters the substrate loop deliberately deferred.
```
description: loops/host — fed-sx adapter (publish/serve/ingest typed posts)
subagent_type: general-purpose
run_in_background: true
isolation: worktree # worktree at /root/rose-ash-loops/host
```
## Why this is small now
The substrate is done and tested (`origin/loops/fed-sx-types`, 4
phases). And the host already has *most of a type system*:
`lib/host/blog.sx` models a **type as a post** with a content-address
`:cid`, a `:schema` (`{:required [...]}`), `:fields`, `:template`, and a
**`subtype-of` graph over lib/relations**. So this loop is not building
a type model — it is **projecting the host's existing one onto fed-sx**
and ingesting peers' types back. The pieces line up almost 1:1:
| host (lib/host/blog.sx) | fed-sx (next/kernel) |
|--------------------------------------------|------------------------------------------|
| a type-post + `:schema {:required [...]}` | `DefineType` activity + refinement `{required, [...]}` |
| `subtype-of` edge (lib/relations) | `SubtypeOf` activity |
| `host/blog-by-cid` / `host/blog-type-defs` | `peer_types` cache + `GET /types/<cid>` |
| `host/blog-type-issues` (local validate) | `pipeline:apply_object_schema/2` (inbound) |
The host's `{:required [...]}` schema maps **directly** onto the
term_codec-safe `{required, [Field,...]}` refinement form the substrate
already validates — so schema translation is nearly trivial. Derive
both validators from the *same* schema data to avoid drift.
## Scope
**In scope**`lib/host/**` only (the mirror of fed-sx-types'
`next/**` only). New: `lib/host/fed_sx_outbox.sx`,
`lib/host/fed_sx_inbox.sx` (and a small shared `lib/host/fed_sx.sx`
bridge + a `host/types-routes`), plus `serve.sh` bring-up wiring and
`lib/host/tests/`.
**Out of scope**`next/**`. The substrate is frozen public surface;
do **not** edit `next/kernel/**` or the genesis verbs from here. If a
gap is found, file it against fed-sx-types, don't patch across the
boundary. **Hard line: do not edit `next/`.**
## Branch base
Start from `loops/host`, then bring in the substrate (clean, additive —
disjoint paths, no conflicts):
```bash
cd /root/rose-ash-loops/host
git fetch origin
git merge origin/loops/fed-sx-types # adds next/kernel + genesis + plan doc
```
After the merge the worktree has both `lib/host/**` (host) and
`next/kernel/**` (fed-sx substrate) on one branch.
## Phase 0 — settle the runtime boundary (DECIDE FIRST, blocks all else)
`lib/host` is **pure same-runtime SX** (one `sx_server.exe`: stdlib →
R7RS → APL → Datalog → ACL → Relations → Feed → Persist → Dream →
Host, per `serve.sh`). The fed-sx kernel is **Erlang-on-SX** on the
er-scheduler (`erlang-load-module` + gen_servers: `peer_types`,
`nx_kernel`). The host's dream handlers run on the native `http-listen`
accept loop — **outside** the er-scheduler. Calling a kernel
`gen_server:call` synchronously from a native-thread handler hits the
known scheduler-context deadlock (see
`plans/fed-sx-design.md` §, and the fed-prims http-listen note: a
handler on `Thread.create` outside er-sched can't complete a
`gen_server:call → receive`).
Two architectures; **the loop's first deliverable is choosing one with
a tiny spike**:
- **Option A — in-process Erlang bridge.** Host's sx_server also
`erlang-load-module`s the kernel and calls it directly. Pro: one
process, no serialization. Con: the deadlock above — kernel calls
must be marshalled onto the er-scheduler or restricted to pure
(non-gen_server) functions. Fragile; not recommended.
- **Option B — HTTP boundary (RECOMMENDED).** Run the fed-sx kernel
with its own `http_server`/`http-listen` loop (it already has the
whole route surface, and the m2 two-instance smoke test proves
HTTP federation between fed-sx nodes). The host talks to its local
fed-sx node **over localhost HTTP using the wire it already speaks**
(term_codec / activity+json / type-doc). This is literally what
federation is — the host is just another peer to its own node. An
`httpc`/localhost call from a native-thread host handler does **not**
touch the er-scheduler, so the deadlock never arises; the kernel's
own listen handler runs the gen_server calls within er-sched context.
Works whether the kernel is a sidecar process or spawned on an
er-scheduler process inside the host's sx_server (two ports, one
process). Pro: clean, reuses the fully-tested surface, no deadlock.
Con: serialization + lifecycle coordination.
**Recommendation: Option B.** Spike: host handler → `httpc` POST to the
local kernel's `/activity` → 200/cid back, with no hang. Lock the
decision before Phase 1.
## Phase 1 — outbox: project host types → DefineType / SubtypeOf
`lib/host/fed_sx_outbox.sx`. When a host type-post is created/updated
(`host/blog-put!` path), project it and publish to the local fed-sx
node:
- type-post → `DefineType` activity: `:object` = `{name: slug,
fields: (host/blog-fields-of slug), refinement-schema:
(host/blog-schema-of slug), instance-type: <base>}`. The host
`{:required [...]}` becomes the substrate `{required, [...]}` form
verbatim.
- each `subtype-of` edge (`relations/parents` over `"subtype-of"`) →
a `SubtypeOf` activity `{child-type-cid, parent-type-cid}`.
- publish via Phase 0's transport (POST `/activity` to the local node,
authed with the node's publish token).
**Key open decision — the type CID.** The host computes `:cid` via
`host/blog--cid-of` (double-hash over the canonical record); fed-sx
keys `peer_types` by a `TypeCid`. Either:
(a) **adopt the host `:cid` as the fed-sx TypeCid** — one identity,
no reconciliation, but peers can't content-verify it from the
wire bytes; or
(b) **let the kernel content-address the TypeRecord** — verifiable,
but the host must keep a `slug → fed-sx-cid` map (and
`SubtypeOf` edges must reference fed-sx CIDs, not host CIDs).
Pick one and document it in `plans/fed-sx-host-types.md`. (a) is
simpler and probably right for v1; revisit when cross-node verification
matters.
## Phase 2 — inbox: ingest peers' types + validate typed objects
`lib/host/fed_sx_inbox.sx`. Inbound from the local node's inbox:
- inbound `DefineType` → `peer_types:put` (cache it). Decide whether to
also **materialize** it as a host post (`host/blog-put!` +
`host/blog--set-schema!`) or keep federation-only types out of the
local blog (recommended for v1: cache-only, materialize on demand).
- inbound `SubtypeOf` → record the edge (peer_types hierarchy and/or
`host/blog-relate! child parent "subtype-of"` if materialized).
- inbound typed `Create` (a post that `is-a` some refinement type) →
the kernel inbound pipeline runs `pipeline:apply_object_schema/2`
(configured with a `type_index` + `{peer_types, peer_types}` +
`type_fetch_fn`), so a typed object is validated against its declared
type **before** the host sees it. Choose `strict_object_schema`
per-node (default false = open-world).
**Avoid double-validation drift:** the host already has
`host/blog-type-issues`. Let the **kernel validate federation inbound**
and the **host validate local writes**, both deriving from the same
`{:required [...]}` schema data — don't fork the rules.
## Phase 3 — serve + bring-up wiring
- **Serve `GET /types/<cid>`** on the host front door. Either proxy to
the kernel's `/types/<cid>` (Option B keeps one source of truth), or
serve directly from `host/blog-by-cid` + the projected TypeRecord.
Hook as `(dream-get "/types/:cid" host/types-by-cid)` and add
`host/types-routes` to the `host/serve` list (per `router.sx` /
`serve.sh` pattern).
- **Bring-up** in `serve.sh`: start the fed-sx node (Phase 0 transport),
start `peer_types`, configure `type_fetch_fn =
discovery_type_fetch:make_fetch_fn()` + a `type_url` resolver, and on
startup project existing host types (Phase 1) so the node is
type-aware from boot. Gate writes behind the existing
`host/require-auth` / `host/require-permission` middleware, same as
the relations write routes.
## Phase 4 — end-to-end round-trip test
Two nodes (host A + host B, or host + a sidecar fed-sx node): A defines
a refinement type → B fetches the type-doc via `GET /types/<cid>` → B
ingests an inbound typed object and `apply_object_schema` accepts the
valid one / rejects a refinement-failing one. Mirror the m2
two-instance smoke test style. Plus per-phase suites in
`lib/host/tests/` (the host runs its own `conformance.sh`).
## Tests discipline
- The host's `lib/host/conformance.sh` green before AND after every
commit. `lib/host` is **LIVE at blog.rose-ash.com** — pushing
`loops/host` reloads dev, so treat pushes as deliberate.
- Commits scoped to `lib/host/**` (+ `plans/fed-sx-host-types.md` as
decisions ratify). Do **not** edit `next/**`.
- One commit per phase; smaller intermediate commits fine if each
leaves the gate green. The Phase-0 spike can be its own commit.
## Done when
- A host type round-trips: defined locally → published as
`DefineType`/`SubtypeOf` → fetchable at `GET /types/<cid>` → a peer
validates an inbound typed object against it.
- `peer_types` is populated from inbound `DefineType`, and the inbound
pipeline rejects refinement-failing typed objects (strict node).
- The runtime-boundary decision and the type-CID decision are recorded
in `plans/fed-sx-host-types.md`.
- Host `conformance.sh` + the new fed-sx adapter suites green.
## Parallel-safety with loops/fed-sx-types
That loop owns `next/**` and is feature-complete; this loop owns
`lib/host/**`. Disjoint surfaces — they meet only at the merge that
brings the substrate in (Branch base, above). If this loop needs a
substrate change, file it against fed-sx-types rather than editing
`next/` here.

View File

@@ -1296,6 +1296,32 @@ inbox + pull from outbox. SSE is convenience, not protocol.
unknown verbs are stored-but-not-projected — safe by default, with explicit
operator control over what extensions load.
### 13.10 Activity-driven flow triggers (kernel convention)
Beyond projections (which fold an activity into read-model state), the kernel
supports firing **durable business flows** off arriving activities — the
"something happened → here is what we DO about it" half of the model. The
convention (substrate landed in `loops/fed-sx-types`, Phases 58):
- A `DefineTrigger{activity-type, flow-name, guard?, actor-scope?}` activity binds
an activity-type to a named flow. `trigger_registry` hydrates from a fold over
these (restart-safe, same content-addressing as `define-registry`).
- Fan-out runs **after** the kernel append, as the last pipeline step (§14):
`envelope → signature → activity-type schema → object schema → append → trigger
fan-out`. Only accepted activities fire flows; rejected ones never trigger.
- Fan-out is deduped per `{activity-cid, trigger-cid}` (federation can deliver the
same activity twice via different peers) using the actor's `:triggers_fired`
field, and is failure-isolated: one flow's failure never blocks the append or
the other flows.
- Flows run on **flow-on-erlang** (`next/flow/`), a native Erlang-on-SX durable
workflow engine (deterministic-replay suspend/resume; combinator algebra
mirrored from the Scheme `lib/flow`). It runs in the kernel's own runtime, so
the fan-out is a direct call — no cross-guest bridge. Because a flow runs inside
the engine's drive (where a blocking kernel call would deadlock the cooperative
scheduler), flows are **pure and describe effects as data** (their output, or a
`suspend`); a driver outside the flow performs IO and appends any follow-up
activity — which can in turn trigger further flows.
## 14. Validation pipeline
Every activity entering the substrate (whether published locally or received from a

112
plans/fed-sx-host-types.md Normal file
View File

@@ -0,0 +1,112 @@
; -*- mode: markdown -*-
# fed-sx host-type federation — substrate design + build log
How a host's typed-post graph (refinement types declared in
`lib/host`'s metamodel) flows across fed-sx nodes: a type is published
as a content-addressed `DefineType` activity, peers cache its record,
serve it over the wire, and validate inbound objects against the
declared refinement schema before appending them.
This document is both the design and the running build log for
`loops/fed-sx-types`. The companion build sheet is
`plans/agent-briefings/fed-sx-types-loop.md`.
## Vocabulary
- **Type record** — `{name, fields, refinement-schema, instance-type}`.
The parsed `:object` payload of a `DefineType` activity. Immutable
per CID: an updated type is a new CID (no in-place evolution).
- **Type CID** — content-address of the type record's wire form. The
stable handle a `SubtypeOf` edge or an object's `{type, _}` field
references.
- **Refinement schema** — a predicate over an object's field-values;
the extra constraint a refinement type adds on top of its base
`instance-type` (e.g. a `Post` is a `Note` whose `:title` is a
non-empty string).
## Scope
Substrate side only — everything under `next/**`. The host-side
adapters (`lib/host/fed_sx_outbox.sx`, `lib/host/fed_sx_inbox.sx`)
are a deliberate follow-up that consumes this branch's public surface
(`DefineType` / `SubtypeOf` verbs, `peer_types`, the `/types/<cid>`
route) once `loops/host`'s metamodel settles. **This loop does not
touch `lib/host/`.**
## Steps
### Step 1 — `DefineType` + `SubtypeOf` genesis activity-types — DONE
New `DefineActivity`-form genesis files, parsed as data by
`bootstrap.erl` at startup (no kernel change yet):
- `next/genesis/activity-types/define_type.sx` — declares the
`DefineType` verb. `:schema` accepts an activity whose `:object`
carries a string `:name` and an optional list `:fields`.
- `next/genesis/activity-types/subtype_of.sx` — declares the
`SubtypeOf` verb. `:schema` accepts an `:object` carrying both
`:child-type-cid` and `:parent-type-cid` as strings.
Schema bodies are SX source written with nested `get` (not
keyword-threading) so they are directly evaluatable: keywords are not
callable getters in the kernel and `(-> d :k)` does not get. Both are
registered in `next/genesis/manifest.sx` (activity-types now 7) and the
bundle counts in the bootstrap suites were bumped accordingly.
Tests: `next/tests/define_type.sh`, `next/tests/subtype_of.sh` — parse
shape, schema accept/reject, and a `term_codec` envelope round-trip.
### Step 2 — `peer_types.erl` receiver-side cache — DONE
`next/kernel/peer_types.erl`, a mirror of `peer_actors.erl` keyed by
type CID. State `[{TypeCidBytes, TypeRecord}, ...]`. Pure API
(`new/2`-threaded `lookup`/`store`/`evict`/`types`/`lookup_or_fetch`)
plus a registered gen_server (`put`, `lookup`, `state_for`,
`known_types`, `lookup_or_fetch`). On a miss `lookup_or_fetch` pulls a
Cfg-supplied `type_fetch_fn :: fun ((TypeCid, Cfg) -> {ok, Bytes} |
{error, _})`, decodes the wire bytes via `term_codec`, and caches the
record. No fn → `{error, no_fetch_fn}`; fetch error or bad bytes do not
poison the cache. Test: `next/tests/peer_types.sh`.
### Step 3 — `/types/<cid>` route + `discovery_type_fetch.erl` — DONE
`http_server.erl` serves `GET /types/<cid>` with
`Accept: application/vnd.fed-sx.type-doc`: the cached TypeRecord
`term_codec`-encoded, 404 if not cached. `discovery_type_fetch.erl`
holds the live-HTTP closure that `peer_types:lookup_or_fetch` calls.
Tests: `next/tests/peer_types_route.sh`, `next/tests/discovery_type_fetch.sh`.
### Step 4 — object-schema validation stage in `pipeline.erl` — DONE
`pipeline:apply_object_schema/2` (+ `stage_object_schema/1` factory)
sits between activity-type validation and the kernel append. When an
inbound object carries `{type, TypeName}`, resolve the TypeRecord
(Cfg `type_index`: TypeName → TypeCid; then
`peer_types:lookup_or_fetch/2`) and apply its refinement schema to the
object's `:field_values`. The schema is either a 1-arity Erlang
predicate (the substrate stand-in, for locally-defined types) or a
term_codec-safe `{required, [Field, ...]}` data constraint (so a
wire-fetched record validates too). Default `strict_object_schema =
false`: an unresolvable type is let through (the non-strict skip is
where a `validation_skipped` log belongs); opt-in strict rejects.
Objects with no declared type, and type names absent from the local
index, are skipped (open-world). Test: `next/tests/object_schema.sh`.
## Out of scope (deliberately)
- Host-side outbox/inbox adapters (`lib/host/**`).
- Type evolution / version migration — schemas are immutable per CID;
the "name → currently-valid CID" routing layer is a separate problem.
- Subtype-of unification / rendering across nodes — the graph data
lands via `SubtypeOf` activities; dedup/display is a consumer concern.
## What the host-side adapter loop gets
Once all four steps land, the follow-up `loops/host` adapter work can
treat the following as stable public surface:
- `DefineType` / `SubtypeOf` activity verbs (publish a type, link two).
- `peer_types` gen_server (cache a peer's type, look it up).
- `GET /types/<cid>` (serve a type the node knows).
- `pipeline`'s object-schema stage (inbound objects validated against
their declared refinement type when resolvable).

View File

@@ -562,10 +562,24 @@ a dead-letter list visible via `/admin/dead-letter`.
is cleared from `:next_retry`. `record_success_pure` clears
both. `next_due_pure` returns cids whose retry time has
passed. 11 cases in `delivery_retry.sh`.
- [ ] **8b-timer** — Erlang-side timer wiring (`erlang:send_after`
self-cast or equivalent). Needs the same substrate primitive
that `gen_server` uses for `timeout` returns. Defer behind
substrate gap discovery for now — see Blockers.
- [x] **8b-timer** — Erlang-side timer wiring on the
`delivery_worker` gen_server. handle_call(flush) drains then
arms a `send_after` self-cast per retried Cid (backoff from
the now-bumped attempt counter); handle_info({retry, Cid})
redrives that single Cid through deliver_one_pure. Success
clears bookkeeping via record_success; failure bumps attempts
via record_failure_pure and arms the next backoff slot — or
promotes to dead-letter on the 6th attempt and stops arming.
A `:timers [{Cid, Ref}]` state field tracks live refs so
schedule_retry_for can cancel the previous one before arming
the next (otherwise stale timers keep the scheduler's run
loop alive long after the work is done). 5/5 in
`delivery_retry_timer.sh`: T1 timer scheduled, T2 attempts=1,
T3 retry fires + attempts=2, T4 next timer rearmed, T5 ets-
counter dispatch (fail/fail/ok) lands in 3 attempts and
clears state. Substrate dependency landed via cherry-pick
from `loops/erlang` (3709460d / 98b0104c / 779e53b2) until
`loops/erlang` → architecture catches up.
- [x] **8c** — Delivery-state projection
(`next/kernel/delivery_state.erl`). Folds delivery events into
per-peer worker-shaped snapshots so the outbound queue survives
@@ -1105,8 +1119,16 @@ proceed.
through `delivery_worker`) and Step 10c (peer-actor doc
fetch in `peer_actors`) are now unblocked.
3. **`erlang:send_after`-style timer primitive** — discovered
during Step 8b prep. The retry loop needs a way for the
3. **`erlang:send_after`-style timer primitive** — ~~discovered
during Step 8b prep~~ **RESOLVED 2026-06-30** via the
`loops/erlang` `send_after`/`cancel_timer`/`monotonic_time`
work landing on `origin/loops/erlang` (commits 3709460d,
98b0104c, b10e55f0; 766/766 → 771/771). m2 cherry-picked all
three onto this branch so 8b-timer could land without waiting
for `loops/erlang` → architecture; the cherry-picks fall away
as no-op duplicates when architecture catches up. Original
diagnosis preserved below for the audit trail.
The retry loop needs a way for the
delivery_worker to wake itself up after `backoff_for(N)`
seconds. Erlang's `erlang:send_after/3` is the standard
primitive; this port doesn't seem to register it (looked at
@@ -1241,6 +1263,31 @@ proceed.
Newest first.
- **2026-06-30** — Step 8b-timer closed. Cherry-picked the three
`loops/erlang` send_after commits onto m2 (3709460d, 98b0104c,
779e53b2 — the substrate landed standalone on origin/loops/erlang
earlier and hadn't propagated to origin/architecture yet). Wired
the live timer loop in `next/kernel/delivery_worker.erl`: a
`:timers [{Cid, Ref}]` state field; `handle_call(flush)` drains
then arms a `send_after` self-cast per retried Cid; the new
`handle_info({retry, Cid})` callback redrives that one Cid through
`deliver_one_pure` and either records success / clears state, or
bumps and arms the next backoff slot (or dead-letters on the 6th
attempt). Two arm-paths split — `arm_retry_timer` (post-drain,
attempts already bumped) vs `schedule_retry_for` (post-retry
attempt, needs to bump). `cancel_timer_for/1` clears the previous
timer before arming the next so stale timers don't keep the
scheduler's run loop alive after the work is done. Two new public
APIs for tests: `state_srv/1` returns the worker's full state,
`timer_ref_for/2` looks up a Cid's live ref. 5/5 in new
`delivery_retry_timer.sh` (T1 timer scheduled, T2 attempts=1, T3
retry fires + attempts=2, T4 next timer rearmed, T5 ets-counter
dispatch fail/fail/ok lands in 3 attempts and clears state).
Existing `delivery_worker.sh` 17/17 and `delivery_retry.sh` 11/11
still green. Conformance gate 771/771 (was 761/761; the +10 is
the cherry-picked send_after suite). Blockers #3 RESOLVED.
Reply shape of `flush` unchanged; no caller updates needed.
- **2026-06-28** — Merge-prep pass. Conformance 761/761 still green
on m2 tip `cd0de8cb`. Both smoke tests still pass cold:
`next/tests/smoke_kernel_route.sh` 6/6 (port 54471, listener up