Compare commits

..

9 Commits

Author SHA1 Message Date
6c9b96390f fed-sx-types Phase 8: blog-publish-digest e2e + flow:wait
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 58s
The motivating end-to-end demonstration (fed-sx-triggers-loop.md Phase
4): one trigger arriving in the pipeline drives a multi-step business
flow with a branch, a timer suspension, an injected effect, and a
follow-up activity emit — all in the kernel's own runtime.

- flow.erl: flow:wait/1 — a timer-style suspend that PRESERVES the value
  on resume (vs flow:suspend/1, which returns the logged result), so a
  "wait until morning" step lets the env flow through to later steps.
- next/flow/flows/blog_publish_digest.erl: the flow. Branches on the
  article :category (newsletter -> wait-until-morning -> send + emit;
  urgent -> send + emit now; else -> skip), fetches followers (injected),
  builds a digest email per follower, and emits a DigestSent activity
  OBJECT. Effect-as-data: a flow can't call kernel gen_servers from
  inside the drive (a blocking call there deadlocks the scheduler), so
  it returns the emails + DigestSent object for a driver to dispatch and
  append — which can then trigger downstream flows, closing the loop.

Test: triggers_e2e.sh (10) — urgent completes in one cycle with 3 emails
+ a DigestSent object; newsletter suspends on the morning timer, then
resumes to the same on "advancing the clock"; draft takes the else
branch (no emails); a non-Article note is rejected by the guard; a
duplicate activity fires once. flow:wait covered in next/flow (36/36).

plans/fed-sx-design.md §13.10 documents the trigger fan-out as a
kernel convention. lib/erlang 771/771.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 18:31:26 +00:00
6b4850b34e fed-sx-types Phase 7: pipeline trigger fan-out + flow_dispatch
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 44s
The post-append fan-out that fires durable flows from arriving
activities (fed-sx-triggers-loop.md Phases 2+3), native into next/flow
— no cross-guest FFI.

- pipeline.erl: apply_triggers/3 runs AFTER the kernel append (rejected
  activities never reach it). It looks the activity's type up in the
  trigger registry, drops specs whose guard/actor-scope fails or whose
  {activity_cid, trigger_cid} pair already fired (federation can deliver
  the same activity twice — dedup is keyed on that pair, read from the
  actor's :triggers_fired), and dispatches the rest. Returns the audit
  triples for the kernel to fold into :triggers_fired + its projection.
  Must not be called inside a `try` (it does gen_server:calls, which
  deadlock the scheduler inside a try); running post-append in its own
  step satisfies that.
- flow_dispatch.erl: bridges a matched trigger to flow_store:start, with
  the activity bound into the flow's input env. guard_passes/3 gates on
  actor-scope + guard. Failures (unknown flow, crashing first step) come
  back as {error, _}, never raised — one flow can't take down the rest.
- flow_store.erl: drive wrapped in try (the drive is pure, so the try is
  safe) so a flow whose step raises yields {error, {flow_crashed, _}}
  instead of crashing the store.

Tests: flow_dispatch.sh (12), pipeline_triggers.sh (10). lib/erlang
771/771, next/flow 34/34.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 18:22:50 +00:00
fc6a47ad62 fed-sx-types Phase 6: DefineTrigger verb + trigger_registry
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 53s
The trigger declaration layer (fed-sx-triggers-loop.md Phase 1): bind an
activity-type to a durable flow so an arriving activity can fan out into
a business flow.

- next/genesis/activity-types/define_trigger.sx — the DefineTrigger verb
  (DefineActivity form, nested-get schema). :object carries
  :activity-type, :flow-name, optional :guard / :actor-scope.
- next/kernel/trigger_registry.erl — pure core + registered gen_server,
  mirroring peer_actors/peer_types. Keyed by activity-type, multiple
  specs per type fire independently. Spec = {TriggerCid, FlowName,
  Guard, ActorScope}. Hydrates on start from a fold over DefineTrigger
  activities (restart-safe, same content-addressing as define_registry).

Manifest activity-types 7->8 (total bundle 38->39); the four bootstrap
count suites + genesis_parse bumped, and bootstrap_load's internal
timeout raised (the larger bundle's double cid:to_string was truncating).

Tests: define_trigger.sh (6), trigger_registry.sh (17). lib/erlang
771/771 + next/flow 34/34 untouched.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 18:18:22 +00:00
8b3d92ed5f fed-sx-types Phase 5: flow-on-erlang engine core (next/flow/)
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 49s
A native Erlang-on-SX durable workflow engine, so the fed-sx kernel can
fan activities out into business flows in its own runtime — no cross-
guest FFI to the Scheme lib/flow, no marshalling, no Scheme dependency.
The seed of a real engine (chosen over bridging Scheme flow) that can
later supersede it for substrate use.

- flow.erl — the deterministic-replay driver. Same durability model as
  the Scheme engine (re-run from the top; effects go through suspend;
  the replay log is plain [{Tag,Value}] data, restart-ready), but
  adapted to three hard runtime constraints: no re-enterable
  continuation, no process dictionary, and a blocking receive inside a
  `try` deadlocks the cooperative scheduler. Resolution: thread the log
  through a railway-style context and make suspend SHORT-CIRCUIT (like a
  fail value) instead of throwing — purely functional, sidesteps all
  three. Ctx = {flow_cont,V,Log} | {flow_susp,Tag,Log}.
- flow_spec.erl — combinator algebra mirrored from lib/flow/spec.sx:
  leaves, sequence/parallel/map_flow, flow_while/flow_until, branch,
  railway fail/recover/attempt, tap, try_catch/retry.
- flow_store.erl — durable gen_server: named-flow registry + instance
  table + start/resume/status. Drives the pure flow from handle_call,
  so no gen_server:call is ever inside the replay try-path.

Gate: next/flow/conformance.sh — 34/34. lib/erlang untouched (771/771).
See next/flow/README.md for the model + why railway threading.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 17:51:15 +00:00
bba2d7e5cd fed-sx-types: briefing for the host-side fed-sx adapter loop
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 42s
Companion to plans/fed-sx-host-types.md. Build sheet for the deferred
lib/host adapter slice (fed_sx_outbox / fed_sx_inbox): projects the
host's existing type-post metamodel (blog.sx: :cid, :schema, subtype-of
graph) onto the fed-sx DefineType/SubtypeOf verbs, ingests peers' types
into peer_types, validates inbound typed objects via
pipeline:apply_object_schema/2, and serves GET /types/<cid>.

Surfaces the two gating decisions for loops/host: the SX-host <->
Erlang-on-SX runtime boundary (recommends an HTTP boundary to dodge the
er-scheduler gen_server:call deadlock) and the type-CID identity choice.
Scope is the inverse of this loop: lib/host/** only, no next/ edits.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 17:05:53 +00:00
89dd23c287 fed-sx-types Phase 4: object-schema validation stage in pipeline
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 54s
pipeline:apply_object_schema/2 (+ stage_object_schema/1 factory) — the
object-schema stage between activity-type validation and the kernel
append (plans/fed-sx-host-types.md step 4). When an inbound activity's
:object declares a refinement type ({type, TypeName}), resolve it
(Cfg type_index: TypeName -> TypeCid; then peer_types:lookup_or_fetch/2,
a local hit or a wire fetch) and apply the record's refinement schema
to the object's :field_values, rejecting on schema-fail with
{error, {validation_failed, object_schema}}.

The schema is either a 1-arity Erlang predicate (substrate stand-in,
locally stored) or a term_codec-safe {required, [Field,...]} constraint
(so a wire-fetched record validates too). Default
strict_object_schema = false: an unresolvable type is let through (the
skip is where a validation_skipped log belongs); strict rejects.
Objects with no declared type, and names absent from the local index,
are skipped (open-world).

Test: next/tests/object_schema.sh (15) — local hit, wire fetch, fetch
failure strict/non-strict, no peer_types, untyped object, undeclared
name, fun + data schema forms, no-schema record, stage composition.

No regression: pipeline_signature, pipeline_driver green. Plan doc
steps 1-4 marked done.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 15:50:45 +00:00
441a895737 fed-sx-types Phase 3: /types/<cid> route + discovery_type_fetch
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 51s
Wire format for serving + fetching type docs (plans/fed-sx-host-types.md
step 3).

http_server.erl:
- new type_doc Accept format + content type
  (application/vnd.fed-sx.type-doc), distinct from actor-doc.
- GET /types/<cid> -> the cached TypeRecord term_codec-encoded, 404 if
  not in the peer_types cache. Reads peer_types via a Cfg
  {peer_types, peer_types} guard (hardcoded registered atom, mirroring
  the actor-doc route's kernel guard).

discovery_type_fetch.erl — sibling of discovery_fetch. make_fetch_fn
produces the fun/2 peer_types:lookup_or_fetch calls: GET
<base>/types/<cid> with the type-doc Accept header, returning the RAW
bytes (peer_types owns the term_codec decode, so the wire format lives
in one place — the route encodes, the cache decodes). Cfg carries
type_url / type_url_fn for TypeCid -> base URL resolution.

Tests: next/tests/peer_types_route.sh (13, in-process route dispatch),
next/tests/discovery_type_fetch.sh (9, closure vs a python type-doc
stub, end-to-end through peer_types:lookup_or_fetch).

No regression: http_accept, http_actors, http_get_format,
discovery_fetch all still green. Conformance 771/771.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 15:48:33 +00:00
8d54028c7f fed-sx-types Phase 2: peer_types.erl receiver-side cache
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 39s
next/kernel/peer_types.erl — a mirror of peer_actors keyed by type CID
(plans/fed-sx-host-types.md step 2). State [{TypeCidBytes, TypeRecord}],
where TypeRecord is the parsed DefineType :object payload. Refinement
schemas are immutable per CID, so cache entries never go stale.

Pure API: new/0, lookup/2, store/3, evict/2, types/1, lookup_or_fetch/3.
gen_server API (registered `peer_types`): put/2, lookup/1, state_for/1,
known_types/0, lookup_or_fetch/2, start_link/0,1.

lookup_or_fetch pulls a Cfg-supplied
  type_fetch_fn :: fun ((TypeCid, Cfg) -> {ok, Bytes} | {error, _})
on a miss, decodes the wire bytes via term_codec into the TypeRecord,
and caches it. No fn -> {error, no_fetch_fn}; fetch error / bad bytes
don't poison the cache (caller can retry). Keeping transport in the
closure (Phase 3 discovery_type_fetch) keeps the cache testable.

Test: next/tests/peer_types.sh (18) — pure + gen_server surface, fetch
miss/hit, no-fn, error-no-poison, undecodable-bytes, prepopulate.

Conformance 771/771.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 15:30:47 +00:00
5959a97dca fed-sx-types Phase 1: DefineType + SubtypeOf genesis verbs
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 21s
Two new DefineActivity-form genesis activity-types for host-type
federation (plans/fed-sx-host-types.md step 1):

- next/genesis/activity-types/define_type.sx — DefineType verb; schema
  accepts an :object with a string :name and optional list :fields.
- next/genesis/activity-types/subtype_of.sx — SubtypeOf verb; schema
  accepts an :object carrying string :child-type-cid + :parent-type-cid.

Schema bodies use nested `get` (not keyword-threading) so they are
directly evaluatable — keywords are not callable getters in the kernel.
Both registered in manifest.sx (activity-types now 7); the four bootstrap
suites' bundle counts bumped (5->7, total 36->38).

Tests: next/tests/define_type.sh (7), subtype_of.sh (6) — parse shape,
schema accept/reject, term_codec envelope round-trip.

Also load follower_graph + delivery in bootstrap_start.sh: its check-26
publish path exercises outbox:compute_delivery_set/3 (follower_graph:new
+ delivery:delivery_set), which an m2 substrate change had left unloaded
in that suite — a pre-existing red unrelated to the count bump.

Conformance 771/771; all touched next/tests green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 15:30:21 +00:00
35 changed files with 3689 additions and 12 deletions

91
next/flow/README.md Normal file
View File

@@ -0,0 +1,91 @@
# flow-on-erlang — durable workflows in the fed-sx runtime
A native Erlang-on-SX port of the Scheme flow engine (`lib/flow`), so
the fed-sx kernel can fan arriving activities out into durable,
branching, multi-step business flows **in its own runtime** — no
cross-guest FFI, no marshalling, no Scheme dependency. The seed of a
real engine that can later supersede the Scheme one for substrate use.
Run the suite: `bash next/flow/conformance.sh` → engine conformance.
## Model
A **flow** is an Erlang `fun(Ctx) -> Ctx`. Combinators (`flow_spec`)
compose flows; user code stays value-level (the functions you hand to
`flow_node`/`branch`/… take and return plain values). A flow that
ignores its input is a thunk; composition *is* function composition.
```erlang
F = flow_spec:sequence([
flow_spec:flow_node(fun(Draft) -> Draft + 1 end),
flow_spec:branch(fun(P) -> P >= 3 end,
flow_spec:flow_const(ok),
flow_spec:flow_const(rejected))]),
flow:run(F, 2) %% => {flow_done, ok}
```
## Durability — deterministic replay
Same semantics as the Scheme engine: a flow re-runs from the top on
every resume; effects/non-determinism go through `flow:suspend/1`,
whose resolved values are logged; an already-resolved suspend replays
its logged value, and the first unresolved suspend short-circuits back
to the driver. The persisted state is the **replay log** — plain
`[{Tag, Value}]` data — so nothing live (no continuation, no process)
is ever serialized; an instance survives restart by re-driving its
named flow against its log.
```erlang
flow_store:register_flow(publish, F),
{ok, Id, R} = flow_store:start(publish, Draft), %% R = {flow_suspended, Tag} | {flow_done, V}
%% ... driver performs the effect for Tag, then:
flow_store:resume(Id, EffectResult) %% re-drives; completes or suspends again
```
## Why railway threading instead of call/cc + a global
The Scheme engine uses an escape-only `call/cc` plus a mutable global
replay log. This Erlang-on-SX runtime can't do either, and has a third
sharp edge:
- **No re-enterable continuation** — but suspend only needs to *escape*,
which Erlang `throw` could do …
- **… except a blocking `receive` / `gen_server:call` inside a `try`
deadlocks** the cooperative scheduler. So `suspend` must not consult
the log via a registry process while inside a `try`.
- **No process dictionary** — so there is no ambient per-process slot to
stash the replay log in.
The resolution: thread the replay log through a railway-style **context**
and make `suspend` *short-circuit* (like a `fail` value) rather than
throw. No ambient state, no throw, no gen_server in the hot path —
purely functional, which sidesteps all three constraints. The driver
(`flow_store`) is the only stateful part, and it calls the pure
`flow:drive/3` from inside `handle_call`, never wrapping a blocking
receive.
A `Ctx` is `{flow_cont, Value, Log}` (running) or `{flow_susp, Tag,
Log}` (short-circuited); every combinator passes a suspended context
straight through.
## Modules
| Module | Role |
|---|---|
| `flow.erl` | pure replay driver: `drive/3`, `run/2`, `suspend/1`, the `Ctx` constructors/accessors |
| `flow_spec.erl` | combinator algebra: leaves, `sequence`/`parallel`/`map_flow`, `flow_while`/`flow_until`, `branch`, railway `fail`/`recover`/`attempt`, `tap`, `try_catch`/`retry` |
| `flow_store.erl` | durable gen_server: named-flow registry + instance table + `start`/`resume`/`status` |
## Consumed by
The fed-sx kernel's trigger fan-out (`pipeline.erl` + `flow_dispatch`)
starts named flows from arriving activities; see
`plans/fed-sx-host-types.md` and the triggers phases.
## Not yet (later layers)
- Persisting instance logs to the kernel's durable on-disk log (the
data shape is already restart-ready; only the backing is in-memory).
- `parallel` with multiple independent suspends resolving concurrently
(current `parallel` is sequential under one shared log).
- Full parity with the Scheme engine's distributed/remote nodes.

206
next/flow/conformance.sh Executable file
View File

@@ -0,0 +1,206 @@
#!/usr/bin/env bash
# next/flow/conformance.sh — flow-on-erlang engine conformance.
#
# Exercises the native Erlang-on-SX durable workflow engine
# (next/flow/{flow,flow_spec,flow_store}.erl): the combinator algebra,
# the deterministic-replay suspend/resume core, and the durable store.
# This is the gate for the engine, replacing lib/flow/conformance.sh
# (the Scheme engine) for the fed-sx substrate — the kernel's trigger
# fan-out drives flows in its own runtime, with no cross-guest FFI.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
# Common combinator shorthands built per-epoch (Erlang locals don't
# survive across erlang-eval-ast calls; the gen_server state does).
N1='flow_spec:flow_node(fun(X) -> X + 1 end)'
N2='flow_spec:flow_node(fun(X) -> X * 2 end)'
SUSP_FLOW='flow_spec:sequence([flow_spec:flow_node(fun(X) -> X + 1 end), flow:suspend(wait1), flow_spec:flow_node(fun(V) -> {resumed, V} end)])'
TWO_SUSP='flow_spec:sequence([flow:suspend(a), flow_spec:flow_node(fun(V) -> V * 10 end), flow:suspend(b), flow_spec:flow_node(fun(V) -> V + 1 end)])'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(er-load-gen-server!)")
(epoch 3)
(eval "(get (erlang-load-module (file-read \"next/flow/flow.erl\")) :name)")
(epoch 4)
(eval "(get (erlang-load-module (file-read \"next/flow/flow_spec.erl\")) :name)")
(epoch 5)
(eval "(get (erlang-load-module (file-read \"next/flow/flow_store.erl\")) :name)")
;; ── leaves ─────────────────────────────────────────────────
(epoch 10)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:flow_id(), 7) =:= {flow_done, 7}\") :name)")
(epoch 11)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:flow_const(k), 7) =:= {flow_done, k}\") :name)")
(epoch 12)
(eval "(get (erlang-eval-ast \"flow:run(${N1}, 41) =:= {flow_done, 42}\") :name)")
;; ── threading / fan-out / iteration ────────────────────────
(epoch 20)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:sequence([${N1}, ${N2}]), 3) =:= {flow_done, 8}\") :name)")
(epoch 21)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:parallel([flow_spec:flow_const(a), flow_spec:flow_const(b)]), 0) =:= {flow_done, [a, b]}\") :name)")
(epoch 22)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:map_flow(${N1}), [1, 2, 3]) =:= {flow_done, [2, 3, 4]}\") :name)")
(epoch 23)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:flow_while(fun(X) -> X < 10 end, ${N1}, 100), 0) =:= {flow_done, 10}\") :name)")
(epoch 24)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:flow_until(fun(X) -> X >= 5 end, ${N1}, 100), 0) =:= {flow_done, 5}\") :name)")
(epoch 25)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:flow_while(fun(_) -> true end, ${N1}, 3), 0) =:= {flow_done, 3}\") :name)")
;; ── branching ──────────────────────────────────────────────
(epoch 30)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:branch(fun(X) -> X > 0 end, flow_spec:flow_const(pos), flow_spec:flow_const(neg)), 5) =:= {flow_done, pos}\") :name)")
(epoch 31)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:branch(fun(X) -> X > 0 end, flow_spec:flow_const(pos), flow_spec:flow_const(neg)), -5) =:= {flow_done, neg}\") :name)")
;; ── railway failure ────────────────────────────────────────
(epoch 40)
(eval "(get (erlang-eval-ast \"flow_spec:failed(flow_spec:fail(x)) andalso (flow_spec:failed(42) =:= false)\") :name)")
(epoch 41)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:attempt([flow_spec:flow_node(fun(_) -> flow_spec:fail(boom) end), flow_spec:flow_node(fun(_) -> 999 end)]), 0) =:= {flow_done, {flow_fail, boom}}\") :name)")
(epoch 42)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:attempt([${N1}, ${N2}]), 3) =:= {flow_done, 8}\") :name)")
(epoch 43)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:recover(flow_spec:flow_node(fun(_) -> flow_spec:fail(bad) end), fun(R) -> {ok, R} end), 0) =:= {flow_done, {ok, bad}}\") :name)")
;; ── effects / exceptions ───────────────────────────────────
(epoch 50)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:tap(fun(_) -> ok end), 7) =:= {flow_done, 7}\") :name)")
(epoch 51)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:try_catch(flow_spec:flow_node(fun(_) -> throw(oops) end), fun(E) -> {caught, E} end), 0) =:= {flow_done, {caught, oops}}\") :name)")
(epoch 52)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:retry(5, flow_spec:flow_node(fun(X) -> X + 1 end)), 1) =:= {flow_done, 2}\") :name)")
;; ── suspend / replay (deterministic-replay core) ───────────
(epoch 60)
(eval "(get (erlang-eval-ast \"flow:run(${SUSP_FLOW}, 0) =:= {flow_suspended, wait1}\") :name)")
(epoch 61)
(eval "(get (erlang-eval-ast \"flow:drive(${SUSP_FLOW}, 0, [{wait1, 99}]) =:= {flow_done, {resumed, 99}}\") :name)")
(epoch 62)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:sequence([flow:suspend(a), flow:suspend(b)]), 0) =:= {flow_suspended, a}\") :name)")
;; wait/1 — timer-style suspend that PRESERVES the value on resume
(epoch 63)
(eval "(get (erlang-eval-ast \"flow:run(flow_spec:sequence([flow:wait(t), flow_spec:flow_node(fun(X) -> X + 1 end)]), 5) =:= {flow_suspended, t}\") :name)")
(epoch 64)
(eval "(get (erlang-eval-ast \"flow:drive(flow_spec:sequence([flow:wait(t), flow_spec:flow_node(fun(X) -> X + 1 end)]), 5, [{t, ignored}]) =:= {flow_done, 6}\") :name)")
;; ── durable store: registry ────────────────────────────────
(epoch 70)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(f1, ${N1}), flow_store:resolve_flow(f1) =/= not_found andalso flow_store:registered_flows() =:= [f1]\") :name)")
(epoch 71)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:resolve_flow(ghost) =:= not_found\") :name)")
;; ── durable store: start / resume ──────────────────────────
;; one-shot flow runs to completion on start
(epoch 80)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(done1, ${N1}), flow_store:start(done1, 41) =:= {ok, 1, {flow_done, 42}}\") :name)")
;; suspending flow: start suspends, resume completes
(epoch 81)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(s1, ${SUSP_FLOW}), {ok, Id, R} = flow_store:start(s1, 10), R =:= {flow_suspended, wait1} andalso flow_store:status(Id) =:= {ok, {suspended, wait1}}\") :name)")
(epoch 82)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(s1, ${SUSP_FLOW}), {ok, Id, _} = flow_store:start(s1, 10), flow_store:resume(Id, 99) =:= {ok, {flow_done, {resumed, 99}}}\") :name)")
(epoch 83)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(s1, ${SUSP_FLOW}), {ok, Id, _} = flow_store:start(s1, 10), flow_store:resume(Id, 99), flow_store:status(Id) =:= {ok, {done, {resumed, 99}}}\") :name)")
;; two-suspend flow: resume chain accumulates the replay log
(epoch 84)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(s2, ${TWO_SUSP}), {ok, Id, _} = flow_store:start(s2, 0), {ok, R1} = flow_store:resume(Id, 5), R2 = flow_store:resume(Id, 7), R1 =:= {flow_suspended, b} andalso R2 =:= {ok, {flow_done, 8}}\") :name)")
;; error paths
(epoch 85)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:start(ghost, 0) =:= {error, no_such_flow}\") :name)")
(epoch 86)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:resume(999, x) =:= {error, no_such_instance}\") :name)")
(epoch 87)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(done1, ${N1}), {ok, Id, _} = flow_store:start(done1, 0), flow_store:resume(Id, x) =:= {error, already_done}\") :name)")
EPOCHS
OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 3 "flow module loaded" "flow"
check 4 "flow_spec module loaded" "flow_spec"
check 5 "flow_store module loaded" "flow_store"
check 10 "flow_id" "true"
check 11 "flow_const" "true"
check 12 "flow_node" "true"
check 20 "sequence threads left-to-right" "true"
check 21 "parallel fans out" "true"
check 22 "map_flow over a list" "true"
check 23 "flow_while bounded by pred" "true"
check 24 "flow_until bounded by pred" "true"
check 25 "flow_while bounded by max" "true"
check 30 "branch then-arm" "true"
check 31 "branch else-arm" "true"
check 40 "failed? predicate" "true"
check 41 "attempt stops at first fail" "true"
check 42 "attempt threads on success" "true"
check 43 "recover handles fail value" "true"
check 50 "tap pass-through" "true"
check 51 "try_catch catches a raise" "true"
check 52 "retry runs node" "true"
check 60 "suspend miss short-circuits" "true"
check 61 "suspend replay completes" "true"
check 62 "first of two suspends wins" "true"
check 63 "wait short-circuits on miss" "true"
check 64 "wait preserves value on resume" "true"
check 70 "register + resolve + list" "true"
check 71 "resolve unknown -> not_found" "true"
check 80 "start one-shot -> done" "true"
check 81 "start suspends + status" "true"
check 82 "resume completes" "true"
check 83 "status after resume = done" "true"
check 84 "two-suspend resume chain" "true"
check 85 "start unknown -> no_such_flow" "true"
check 86 "resume unknown -> no_such_instance" "true"
check 87 "resume a done flow -> already_done" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL flow-on-erlang engine tests passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

102
next/flow/flow.erl Normal file
View File

@@ -0,0 +1,102 @@
-module(flow).
-export([drive/3, run/2,
cont/2, susp/2, is_susp/1, ctx_value/1, ctx_log/1,
suspend/1, wait/1, log_lookup/2]).
%% flow-on-erlang — the deterministic-replay core. A native Erlang port
%% of the Scheme flow engine (lib/flow), so the fed-sx kernel can fan
%% activities out into durable business flows in its own runtime (no
%% cross-guest FFI).
%%
%% Durability model — identical semantics to the Scheme engine, but
%% adapted to this Erlang-on-SX runtime, which has three hard
%% constraints the Scheme host doesn't: no escape continuation that can
%% be re-entered, no process dictionary, and (critically) a blocking
%% `receive` / `gen_server:call` inside a `try` deadlocks the
%% cooperative scheduler. So instead of the Scheme engine's
%% mutable-global + call/cc-escape, the replay log is THREADED through a
%% railway-style context and `suspend` SHORT-CIRCUITS (like a fail
%% value) rather than throwing. No ambient state, no throw, no
%% gen_server — purely functional, which sidesteps every constraint.
%%
%% A node is `fun(Ctx) -> Ctx`. A Ctx is one of:
%% {flow_cont, Value, Log} — running; Value is the current value
%% {flow_susp, Tag, Log} — short-circuited at suspend Tag
%% Log is the replay log: [{Tag, ResolvedValue}, ...]. Combinators
%% (flow_spec) thread Ctx and pass {flow_susp,...} straight through, so
%% once a flow suspends nothing downstream runs.
%%
%% suspend/1 is the load-bearing primitive: a node that, given the
%% running Ctx, looks Tag up in the replay log. A hit replaces the
%% current value with the logged value and continues; a miss
%% short-circuits to {flow_susp, Tag, Log}. ALL effects/non-determinism
%% go through a suspend node so they run once — in the driver, between
%% drives — and their results are logged, never re-run on replay. Tags
%% must be unique and deterministic across replays.
%% ── context constructors / accessors ────────────────────────────
cont(Value, Log) -> {flow_cont, Value, Log}.
susp(Tag, Log) -> {flow_susp, Tag, Log}.
is_susp({flow_susp, _, _}) -> true;
is_susp(_) -> false.
ctx_value({flow_cont, Value, _}) -> Value;
ctx_value({flow_susp, _, _}) -> undefined.
ctx_log({flow_cont, _, Log}) -> Log;
ctx_log({flow_susp, _, Log}) -> Log.
%% ── suspend node ────────────────────────────────────────────────
suspend(Tag) ->
fun (Ctx) ->
case Ctx of
{flow_susp, _, _} -> Ctx;
{flow_cont, _Value, Log} ->
case log_lookup(Tag, Log) of
{ok, Resolved} -> {flow_cont, Resolved, Log};
miss -> {flow_susp, Tag, Log}
end
end
end.
%% wait(Tag) — a timer-style suspend that PRESERVES the current value
%% instead of replacing it with the resolved one. Use it for pure
%% waits ("resume in the morning") where the resume is just a signal,
%% not a result: on the first pass it short-circuits like suspend; once
%% Tag is in the log the value flows through unchanged, so downstream
%% steps still see the value (e.g. the env) they had before the wait.
wait(Tag) ->
fun (Ctx) ->
case Ctx of
{flow_susp, _, _} -> Ctx;
{flow_cont, Value, Log} ->
case log_lookup(Tag, Log) of
{ok, _} -> {flow_cont, Value, Log};
miss -> {flow_susp, Tag, Log}
end
end
end.
log_lookup(_, []) -> miss;
log_lookup(Tag, [{Tag, Value} | _]) -> {ok, Value};
log_lookup(Tag, [_ | Rest]) -> log_lookup(Tag, Rest).
%% ── driver ──────────────────────────────────────────────────────
%% drive(Flow, Input, Log) — run Flow under the replay Log.
%% {flow_done, Result} — flow completed
%% {flow_suspended, Tag} — flow short-circuited at an unresolved
%% suspend; the driver resolves Tag, appends
%% {Tag, Value} to Log, and re-drives.
drive(Flow, Input, Log) ->
case Flow({flow_cont, Input, Log}) of
{flow_cont, Result, _} -> {flow_done, Result};
{flow_susp, Tag, _} -> {flow_suspended, Tag}
end.
%% run(Flow, Input) — drive with an empty replay log.
run(Flow, Input) ->
drive(Flow, Input, []).

240
next/flow/flow_spec.erl Normal file
View File

@@ -0,0 +1,240 @@
-module(flow_spec).
-export([flow_node/1, flow_id/0, flow_const/1,
sequence/1, parallel/1, map_flow/1,
flow_while/3, flow_until/3,
branch/3, fail/1, failed/1, fail_reason/1,
recover/2, tap/1, attempt/1, try_catch/2, retry/2]).
%% flow-on-erlang combinators — a native port of lib/flow/spec.sx,
%% adapted to the railway-threaded context model in flow.erl. A node is
%% `fun(Ctx) -> Ctx`; every combinator passes a {flow_susp,...} context
%% straight through, so once a flow suspends nothing downstream runs.
%% User code stays value-level: the predicates/functions handed to
%% flow_node / branch / etc. take and return plain values, and the
%% combinator threads them into the context.
%%
%% Variadic Scheme forms (sequence, parallel, attempt) take an explicit
%% list here — the one idiom difference from the Scheme engine. Effects
%% must go through a flow:suspend/1 node so they run once (in the
%% driver) and replay from the log; `tap` is only for replay-safe
%% effects (e.g. tracing).
%% ── leaves ──────────────────────────────────────────────────────
%% flow_node(F) — lift a value function F :: Value -> Value into a node.
flow_node(F) ->
fun (Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false -> flow:cont(F(flow:ctx_value(Ctx)), flow:ctx_log(Ctx))
end
end.
flow_id() ->
fun (Ctx) -> Ctx end.
flow_const(V) ->
fun (Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false -> flow:cont(V, flow:ctx_log(Ctx))
end
end.
%% ── threading / fan-out / iteration ─────────────────────────────
%% sequence(Nodes) — thread the context left-to-right. Each node
%% self-guards on suspension, so a suspended context flows through
%% untouched.
sequence(Nodes) ->
fun (Ctx) -> seq_step(Nodes, Ctx) end.
seq_step([], Ctx) -> Ctx;
seq_step([N | Ns], Ctx) -> seq_step(Ns, N(Ctx)).
%% parallel(Nodes) — fan the input value to every node, join results
%% into a list (sequential evaluation under one shared replay log).
%% First child to suspend short-circuits the whole parallel.
parallel(Nodes) ->
fun (Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false -> par_step(Nodes, flow:ctx_value(Ctx), flow:ctx_log(Ctx), [])
end
end.
par_step([], _Input, Log, Acc) ->
flow:cont(lists:reverse(Acc), Log);
par_step([N | Ns], Input, Log, Acc) ->
R = N(flow:cont(Input, Log)),
case flow:is_susp(R) of
true -> R;
false -> par_step(Ns, Input, Log, [flow:ctx_value(R) | Acc])
end.
%% map_flow(Node) — run Node over each item of a list input value.
map_flow(Node) ->
fun (Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false -> map_step(Node, flow:ctx_value(Ctx), flow:ctx_log(Ctx), [])
end
end.
map_step(_, [], Log, Acc) ->
flow:cont(lists:reverse(Acc), Log);
map_step(Node, [I | Is], Log, Acc) ->
R = Node(flow:cont(I, Log)),
case flow:is_susp(R) of
true -> R;
false -> map_step(Node, Is, Log, [flow:ctx_value(R) | Acc])
end.
%% flow_while(Pred, Body, Max) — re-run Body (a node), threading the
%% context, while Pred(value) holds, up to Max steps. Pred :: Value ->
%% bool; Body :: node.
flow_while(Pred, Body, Max) ->
fun (Ctx) -> while_step(Pred, Body, Ctx, Max) end.
while_step(_, _, Ctx, N) when N =< 0 -> Ctx;
while_step(Pred, Body, Ctx, N) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false ->
case Pred(flow:ctx_value(Ctx)) of
true -> while_step(Pred, Body, Body(Ctx), N - 1);
_ -> Ctx
end
end.
%% flow_until(Pred, Body, Max) — re-run Body until Pred(value) holds.
flow_until(Pred, Body, Max) ->
fun (Ctx) -> until_step(Pred, Body, Ctx, Max) end.
until_step(_, _, Ctx, N) when N =< 0 -> Ctx;
until_step(Pred, Body, Ctx, N) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false ->
case Pred(flow:ctx_value(Ctx)) of
true -> Ctx;
_ -> until_step(Pred, Body, Body(Ctx), N - 1)
end
end.
%% ── branching ───────────────────────────────────────────────────
%% branch(Pred, Then, Else) — Pred :: Value -> bool; Then/Else :: node.
branch(Pred, Then, Else) ->
fun (Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false ->
case Pred(flow:ctx_value(Ctx)) of
true -> Then(Ctx);
_ -> Else(Ctx)
end
end
end.
%% ── railway-style failure (values, not exceptions) ──────────────
fail(Reason) -> {flow_fail, Reason}.
failed({flow_fail, _}) -> true;
failed(_) -> false.
fail_reason({flow_fail, R}) -> R.
%% recover(Node, Handler) — if Node yields a fail VALUE, run Handler on
%% the reason; else pass through. Handler :: Reason -> Value.
recover(Node, Handler) ->
fun (Ctx) ->
R = Node(Ctx),
case flow:is_susp(R) of
true -> R;
false ->
V = flow:ctx_value(R),
case failed(V) of
true -> flow:cont(Handler(fail_reason(V)), flow:ctx_log(R));
false -> R
end
end
end.
%% tap(Effect) — replay-safe side-effecting pass-through (returns the
%% input value unchanged). Effect :: Value -> any.
tap(Effect) ->
fun (Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false -> Effect(flow:ctx_value(Ctx)), Ctx
end
end.
%% attempt(Nodes) — railway sequence: thread left-to-right but stop at
%% the first node whose value is a fail, returning that failure.
attempt(Nodes) ->
fun (Ctx) -> attempt_step(Nodes, Ctx) end.
attempt_step([], Ctx) -> Ctx;
attempt_step([N | Ns], Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false ->
case failed(flow:ctx_value(Ctx)) of
true -> Ctx;
false -> attempt_step(Ns, N(Ctx))
end
end.
%% ── exception-style control ─────────────────────────────────────
%% Nodes are pure (effects go through suspend, run by the driver), so a
%% try around a node never wraps a blocking receive — safe in this
%% runtime.
%% try_catch(Node, Handler) — run Node; if it raises, run Handler on the
%% exception. Handler :: Exception -> Value.
try_catch(Node, Handler) ->
fun (Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false ->
Log = flow:ctx_log(Ctx),
try Node(Ctx) of
R -> R
catch
throw:E -> flow:cont(Handler(E), Log);
error:E -> flow:cont(Handler(E), Log);
exit:E -> flow:cont(Handler(E), Log)
end
end
end.
%% retry(N, Node) — run Node, retrying up to N attempts on a raise.
retry(N, Node) ->
fun (Ctx) -> retry_step(N, Node, Ctx) end.
retry_step(N, Node, Ctx) ->
case flow:is_susp(Ctx) of
true -> Ctx;
false ->
try Node(Ctx) of
R -> R
catch
throw:Reason -> retry_reraise(N, Node, Ctx, throw, Reason);
error:Reason -> retry_reraise(N, Node, Ctx, error, Reason);
exit:Reason -> retry_reraise(N, Node, Ctx, exit, Reason)
end
end.
retry_reraise(N, Node, Ctx, Class, Reason) ->
case N =< 1 of
false -> retry_step(N - 1, Node, Ctx);
true ->
case Class of
throw -> throw(Reason);
error -> erlang:error(Reason);
exit -> exit(Reason)
end
end.

161
next/flow/flow_store.erl Normal file
View File

@@ -0,0 +1,161 @@
-module(flow_store).
-export([start_link/0, start_link/1, stop/0,
register_flow/2, resolve_flow/1, registered_flows/0,
start/2, resume/2, status/1, instances/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-behaviour(gen_server).
%% flow-on-erlang durable store — the named-flow registry plus the
%% instance table that makes suspend/resume durable. flow.erl is the
%% pure replay driver; this gen_server is the stateful shell around it,
%% holding the registry (so triggers can reference flows by name, and
%% so an instance can be re-resolved + replayed after a restart) and
%% each instance's accumulated replay log.
%%
%% Crucially the driver stays OUT of any blocking context: start/resume
%% call flow:drive/3 (pure — no receive, no gen_server:call) from inside
%% handle_call, and the only message-passing is the caller's
%% gen_server:call into this store. (A blocking receive inside a `try`
%% deadlocks this cooperative scheduler, so the engine never does one.)
%%
%% State: {Registry, Instances, NextId}
%% Registry = [{Name, FlowFun}, ...]
%% Instances = [{Id, {Name, Input, Log, Status}}, ...]
%% Status = {suspended, Tag} | {done, Result}
%% Log = [{Tag, ResolvedValue}, ...] (the replay log — plain
%% data, so an instance is fully described by its log and
%% survives process restart by re-driving the named flow)
%%
%% v1 backs the store in gen_server memory; persisting the instance
%% logs to the kernel's durable log (so flows survive an OS restart) is
%% a later layer — the data shape is already restart-ready.
start_link() ->
start_link([]).
start_link(InitialFlows) ->
Pid = gen_server:start_link(flow_store, [InitialFlows]),
erlang:register(flow_store, Pid),
Pid.
stop() ->
R = gen_server:call(flow_store, '$gen_stop'),
erlang:unregister(flow_store),
R.
%% register_flow(Name, Flow) — register a named flow (a node fun). Named
%% rather than `register` to avoid the erlang:register/2 auto-import.
register_flow(Name, Flow) ->
gen_server:call(flow_store, {register_flow, Name, Flow}).
resolve_flow(Name) ->
gen_server:call(flow_store, {resolve_flow, Name}).
registered_flows() ->
gen_server:call(flow_store, registered_flows).
%% start(Name, Input) -> {ok, Id, Result} | {error, no_such_flow}.
%% Result is {flow_done, V} | {flow_suspended, Tag}; the instance is
%% recorded either way so a suspended flow can be resumed by Id.
start(Name, Input) ->
gen_server:call(flow_store, {start, Name, Input}).
%% resume(Id, Value) -> {ok, Result} | {error, Reason}. Resolves the
%% instance's current suspend tag with Value (appends {Tag, Value} to
%% its replay log) and re-drives from the top.
resume(Id, Value) ->
gen_server:call(flow_store, {resume, Id, Value}).
%% status(Id) -> {ok, {suspended, Tag}} | {ok, {done, Result}} | not_found
status(Id) ->
gen_server:call(flow_store, {status, Id}).
instances() ->
gen_server:call(flow_store, instances).
%% ── gen_server ──────────────────────────────────────────────────
init([InitialFlows]) ->
{ok, {InitialFlows, [], 1}}.
handle_call({register_flow, Name, Flow}, _From, {Reg, Ins, N}) ->
{reply, ok, {set_keyed(Name, Flow, Reg), Ins, N}};
handle_call({resolve_flow, Name}, _From, {Reg, Ins, N}) ->
{reply, find_keyed(Name, Reg), {Reg, Ins, N}};
handle_call(registered_flows, _From, {Reg, Ins, N}) ->
{reply, [Name || {Name, _} <- Reg], {Reg, Ins, N}};
handle_call({start, Name, Input}, _From, {Reg, Ins, N}) ->
case find_keyed(Name, Reg) of
not_found ->
{reply, {error, no_such_flow}, {Reg, Ins, N}};
{ok, Flow} ->
case safe_drive(Flow, Input, []) of
{ok, R} ->
Status = result_status(R),
Ins2 = set_keyed(N, {Name, Input, [], Status}, Ins),
{reply, {ok, N, R}, {Reg, Ins2, N + 1}};
{error, Crash} ->
{reply, {error, {flow_crashed, Crash}}, {Reg, Ins, N}}
end
end;
handle_call({resume, Id, Value}, _From, {Reg, Ins, N}) ->
case find_keyed(Id, Ins) of
not_found ->
{reply, {error, no_such_instance}, {Reg, Ins, N}};
{ok, {_Name, _Input, _Log, {done, _}}} ->
{reply, {error, already_done}, {Reg, Ins, N}};
{ok, {Name, Input, Log, {suspended, Tag}}} ->
case find_keyed(Name, Reg) of
not_found ->
{reply, {error, no_such_flow}, {Reg, Ins, N}};
{ok, Flow} ->
NewLog = log_append(Log, Tag, Value),
case safe_drive(Flow, Input, NewLog) of
{ok, R} ->
Status = result_status(R),
Ins2 = set_keyed(Id, {Name, Input, NewLog, Status}, Ins),
{reply, {ok, R}, {Reg, Ins2, N}};
{error, Crash} ->
{reply, {error, {flow_crashed, Crash}}, {Reg, Ins, N}}
end
end
end;
handle_call({status, Id}, _From, {Reg, Ins, N}) ->
case find_keyed(Id, Ins) of
{ok, {_Name, _Input, _Log, Status}} -> {reply, {ok, Status}, {Reg, Ins, N}};
not_found -> {reply, not_found, {Reg, Ins, N}}
end;
handle_call(instances, _From, {Reg, Ins, N}) ->
{reply, [Id || {Id, _} <- Ins], {Reg, Ins, N}}.
handle_cast(_, S) -> {noreply, S}.
handle_info(_, S) -> {noreply, S}.
%% ── helpers ─────────────────────────────────────────────────────
result_status({flow_done, R}) -> {done, R};
result_status({flow_suspended, T}) -> {suspended, T}.
%% safe_drive/3 — flow:drive is pure (no blocking receive), so a `try`
%% around it is safe in this runtime and isolates a flow whose step
%% raises: the store returns {error, {flow_crashed, _}} instead of the
%% gen_server crashing, keeping one bad flow from taking down others.
safe_drive(Flow, Input, Log) ->
try {ok, flow:drive(Flow, Input, Log)}
catch
throw:R -> {error, {throw, R}};
error:R -> {error, {error, R}};
exit:R -> {error, {exit, R}}
end.
log_append([], Tag, Value) -> [{Tag, Value}];
log_append([H | T], Tag, Value) -> [H | log_append(T, Tag, Value)].
find_keyed(_, []) -> not_found;
find_keyed(K, [{K, V} | _]) -> {ok, V};
find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest).
set_keyed(K, V, []) -> [{K, V}];
set_keyed(K, V, [{K, _} | Rest]) -> [{K, V} | Rest];
set_keyed(K, V, [P | Rest]) -> [P | set_keyed(K, V, Rest)].

View File

@@ -0,0 +1,81 @@
-module(blog_publish_digest).
-export([build/1]).
%% A motivating multi-step business flow for the fed-sx-triggers e2e:
%% when an Article is published, decide a batch policy by category,
%% (for newsletters) wait until morning, fetch the author's followers,
%% build a digest email for each, and emit a DigestSent activity — the
%% flow's own output, which a driver appends, closing the loop so it can
%% trigger downstream flows.
%%
%% Demonstrates: a branch on an activity field (:category), a timer
%% suspension (flow:wait/1, resumed by advancing the clock), an injected
%% effect (fetch_followers), and a follow-up activity emit.
%%
%% Effect-as-data: a flow runs inside flow_store's drive, where a
%% blocking call (e.g. into nx_kernel) would deadlock this scheduler, so
%% the flow does NOT perform IO itself. It DESCRIBES the effects in its
%% result — {digest_sent, Emails, DigestActivityObject} — and the driver
%% (the fan-out caller) dispatches the emails and appends the DigestSent
%% activity. fetch_followers is injected (the one external read) as a
%% pure function so the e2e can supply a deterministic list.
%%
%% Input env (from flow_dispatch): [{activity, A}, {actor, Actor}, ...].
%% Result: {digest_sent, [Email], DigestObject} | skipped.
build(Effects) ->
FetchFollowers = field(fetch_followers, Effects),
flow_spec:branch(
fun (Env) -> is_article(Env) end,
flow_spec:branch(
fun (Env) -> category_is(Env, newsletter) end,
%% newsletter: hold until morning, then send + emit
flow_spec:sequence([flow:wait(morning), send_emit(FetchFollowers)]),
flow_spec:branch(
fun (Env) -> category_is(Env, urgent) end,
%% urgent: send + emit now (no wait)
send_emit(FetchFollowers),
%% any other category: skip
flow_spec:flow_const(skipped))),
%% not an Article: skip
flow_spec:flow_const(skipped)).
%% send_emit(FetchFollowers) — the terminal step: build one digest email
%% per follower and the DigestSent emit object. Pure given the injected
%% follower list, so it is replay-safe (and it sits after the only
%% suspend point, so it runs exactly once).
send_emit(FetchFollowers) ->
flow_spec:flow_node(
fun (Env) ->
Activity = env_activity(Env),
Actor = env_actor(Env),
ArtId = activity_id(Activity),
Followers = FetchFollowers(Actor),
Emails = [ [{to, F}, {article, ArtId}] || F <- Followers ],
Digest = [{type, digest_sent},
{for, ArtId},
{follower_count, length(Followers)}],
{digest_sent, Emails, Digest}
end).
%% ── predicates / accessors ──────────────────────────────────────
is_article(Env) ->
object_type(object_of(env_activity(Env))) =:= article.
category_is(Env, Cat) ->
object_category(object_of(env_activity(Env))) =:= Cat.
env_activity(Env) -> field(activity, Env).
env_actor(Env) -> field(actor, Env).
object_of(Activity) -> field(object, Activity).
object_type(Obj) -> field(type, Obj).
object_category(Obj) -> field(category, Obj).
activity_id(Activity) -> field(id, Activity).
field(Key, Proplist) ->
case envelope:get_field(Key, Proplist) of
{ok, V} -> V;
_ -> undefined
end.

View File

@@ -0,0 +1,33 @@
;; next/genesis/activity-types/define_trigger.sx
;;
;; Bootstrap definition of the DefineTrigger verb per
;; plans/agent-briefings/fed-sx-triggers-loop.md (Phase 1) and
;; plans/fed-sx-design.md §13. Read as data by the bundler
;; (bootstrap.erl) — never evaluated as code.
;;
;; DefineTrigger binds an activity-type to a flow. When a matching
;; activity is appended to the log, the kernel's trigger fan-out
;; (pipeline.erl, post-append) looks the type up in the trigger
;; registry and starts the named flow with the activity as input.
;; The activity's :object is the binding record:
;; {:activity-type "Create" ;; the verb to fire on
;; :flow-name "blog-publish-digest"
;; :guard <optional predicate> ;; discriminator
;; :actor-scope <optional actor id>} ;; default: any
;;
;; The schema validates the *activity* shape: :object present with
;; string :activity-type and :flow-name. The optional :guard lets one
;; type bind to multiple flows with discriminators; it is resolved to
;; an Erlang predicate at registration time (trigger_registry), not
;; carried in the pure-predicate schema here. Schema bodies use nested
;; `get` (not keyword-threading) so the predicate is evaluatable.
(DefineActivity
:name "DefineTrigger"
:doc "Bind an activity-type to a flow. :object carries :activity-type, :flow-name, and optional :guard and :actor-scope."
:schema (fn
(act)
(and
(not (nil? (get act :object)))
(string? (get (get act :object) :activity-type))
(string? (get (get act :object) :flow-name))))
:semantics (fn (state act) state))

View File

@@ -0,0 +1,34 @@
;; next/genesis/activity-types/define_type.sx
;;
;; Bootstrap definition of the DefineType verb per
;; plans/fed-sx-host-types.md (host-type federation, Phase 1).
;; Read as data by the bundler (bootstrap.erl) — never evaluated as
;; code. The :schema and :semantics bodies are SX source.
;;
;; DefineType declares a refinement type. The activity's :object is
;; the type record:
;; {:name "Post" ;; the type's display name
;; :fields (...) ;; optional field descriptors
;; :refinement-schema (fn (obj) ...) ;; predicate over instances
;; :instance-type "Note"} ;; base object-type it refines
;;
;; The schema below validates the *activity* shape: :object present,
;; :name a string, :fields (when present) a list. The richer
;; per-field shape check and the registry registration land with the
;; peer_types cache (Phase 2) — at this phase the form is pure data.
;;
;; Schema bodies use nested `get` rather than keyword-threading so
;; the predicate is directly evaluatable (keywords are not callable
;; getters in the kernel; `(-> d :k)` is not a get).
(DefineActivity
:name "DefineType"
:doc "Declare a refinement type. :object carries :name, optional :fields, :refinement-schema, and :instance-type."
:schema (fn
(act)
(and
(not (nil? (get act :object)))
(string? (get (get act :object) :name))
(or
(nil? (get (get act :object) :fields))
(list? (get (get act :object) :fields)))))
:semantics (fn (state act) state))

View File

@@ -0,0 +1,31 @@
;; next/genesis/activity-types/subtype_of.sx
;;
;; Bootstrap definition of the SubtypeOf verb per
;; plans/fed-sx-host-types.md (host-type federation, Phase 1).
;; Read as data by the bundler (bootstrap.erl) — never evaluated as
;; code. The :schema and :semantics bodies are SX source.
;;
;; SubtypeOf records a hierarchy edge between two previously-defined
;; types. The activity's :object is the relation record:
;; {:child-type-cid "bafy...child"
;; :parent-type-cid "bafy...parent"}
;;
;; The schema validates the *activity* shape: both CIDs present and
;; string-typed. Verifying that each CID names a previously-defined
;; type is a registry concern (it needs the type index that lands
;; with peer_types in Phase 2), so it is deliberately out of the
;; pure-predicate schema here — adding the edge to the hierarchy
;; index is the :semantics' job once the registry surface exists.
;;
;; Schema bodies use nested `get` rather than keyword-threading so
;; the predicate is directly evaluatable.
(DefineActivity
:name "SubtypeOf"
:doc "Record a subtype edge. :object carries :child-type-cid and :parent-type-cid, both type CIDs."
:schema (fn
(act)
(and
(not (nil? (get act :object)))
(string? (get (get act :object) :child-type-cid))
(string? (get (get act :object) :parent-type-cid))))
:semantics (fn (state act) state))

View File

@@ -22,7 +22,10 @@
"activity-types/update.sx"
"activity-types/delete.sx"
"activity-types/announce.sx"
"activity-types/endorse.sx")
"activity-types/endorse.sx"
"activity-types/define_type.sx"
"activity-types/subtype_of.sx"
"activity-types/define_trigger.sx")
:object-types ("object-types/sx-artifact.sx"
"object-types/note.sx"
"object-types/tombstone.sx"

View File

@@ -0,0 +1,118 @@
-module(discovery_type_fetch).
-export([make_fetch_fn/0, make_fetch_fn/1,
fetch/2,
type_doc_url/2,
resolve_type_url/2,
accept_header/0]).
%% Live type-doc fetch for peer_types — host-type federation Step 3,
%% the sibling of discovery_fetch.erl. peer_types:lookup_or_fetch/3
%% calls a Cfg-supplied type_fetch_fn :: fun ((TypeCid, Cfg) -> {ok,
%% Bytes} | {error, _}) on a cache miss; this module produces that
%% closure for live federation. It GETs <base>/types/<cid> with an
%% Accept header that asks for the type-doc format (http_server.erl
%% Step 3) and returns the RAW response bytes — peer_types decodes
%% them via term_codec into the TypeRecord. (This is the one shape
%% difference from discovery_fetch, whose closure returns an already-
%% decoded actor-state: there the cache stores the decoded AS, here
%% peer_types owns the decode so the type-doc wire format lives in one
%% place — the /types/ route encodes, peer_types decodes.)
%%
%% Cfg shape (parallels discovery_fetch's peer URL resolution):
%% {type_url, [{TypeCid, BaseUrl}, ...]}
%% {type_url_fn, fun ((TypeCid) -> {ok, BaseUrl} | not_found)}
%%
%% BaseUrl shape: <<"http://host:port">> (no trailing slash; this
%% module appends the path). TypeCid is the type's CID bytes.
%%
%% Outcomes:
%% 2xx -> {ok, Bytes}
%% non-2xx -> {error, {status, N}}
%% resolver miss -> {error, no_type_url}
%% transport -> {error, Reason}
%% ── Accept header ────────────────────────────────────────────
%% "application/vnd.fed-sx.type-doc" — same MIME http_server's
%% content_type_for(type_doc) emits, so the Accept negotiation routes
%% the served bytes to the term_codec-encoded TypeRecord arm.
accept_header() ->
<<97,112,112,108,105,99,97,116,105,111,110,47,
118,110,100,46,102,101,100,45,115,120,46,
116,121,112,101,45,100,111,99>>.
%% ── public API ───────────────────────────────────────────────
%% make_fetch_fn/0 — the fun/2 peer_types:lookup_or_fetch calls. It
%% reads the type-URL resolver out of the Cfg passed at call time, so
%% the same Cfg threads through peer_types and this closure.
make_fetch_fn() ->
fun (TypeCid, Cfg) ->
case resolve_type_url(TypeCid, Cfg) of
{error, R} -> {error, R};
{ok, BaseUrl} -> fetch(type_doc_url(BaseUrl, TypeCid), Cfg)
end
end.
%% make_fetch_fn/1 — variant that closes over a static Cfg for the
%% resolver while still honouring the call-time Cfg for transport.
%% Lets a caller bake the type_url map once and reuse the closure.
make_fetch_fn(StaticCfg) ->
fun (TypeCid, Cfg) ->
case resolve_type_url(TypeCid, StaticCfg) of
{error, R} -> {error, R};
{ok, BaseUrl} -> fetch(type_doc_url(BaseUrl, TypeCid), Cfg)
end
end.
fetch(Url, _Cfg) ->
AcceptKey = <<97,99,99,101,112,116>>, % "accept"
Headers = [{AcceptKey, accept_header()}],
try httpc:request(Url, get, Headers, <<>>) of
{ok, Status, _H, Body} when Status >= 200, Status < 300 ->
{ok, Body};
{ok, Status, _H, _B} ->
{error, {status, Status}};
Other ->
{error, {bad_response, Other}}
catch
error:Reason -> {error, Reason}
end.
%% type_doc_url/2 — <BaseUrl>/types/<cid>. TypeCid is the cid bytes,
%% appended verbatim as the path segment (matches the "/types/" prefix
%% http_server.erl registers).
type_doc_url(BaseUrl, TypeCid) when is_binary(TypeCid) ->
%% "/types/" — 7 bytes
Prefix = <<47,116,121,112,101,115,47>>,
<<BaseUrl/binary, Prefix/binary, TypeCid/binary>>.
%% resolve_type_url/2 — map a TypeCid to its serving node's base URL.
%% type_url_fn (a 1-arity closure) takes precedence over the static
%% type_url proplist; absent both -> {error, no_type_url}.
resolve_type_url(TypeCid, Cfg) ->
case field(type_url_fn, Cfg) of
Fn when is_function(Fn, 1) ->
case Fn(TypeCid) of
{ok, BaseUrl} -> {ok, BaseUrl};
_ -> {error, no_type_url}
end;
_ ->
case field(type_url, Cfg) of
nil -> {error, no_type_url};
Map ->
case find_keyed(TypeCid, Map) of
{ok, BaseUrl} -> {ok, BaseUrl};
_ -> {error, no_type_url}
end
end
end.
%% ── helpers ──────────────────────────────────────────────────
field(K, [{K, V} | _]) -> V;
field(K, [_ | Rest]) -> field(K, Rest);
field(_, []) -> nil.
find_keyed(_, []) -> {error, not_found};
find_keyed(K, [{K, V} | _]) -> {ok, V};
find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest).

View File

@@ -0,0 +1,76 @@
-module(flow_dispatch).
-export([start/4, guard_passes/3]).
%% Bridge from "an activity matched a trigger" to "a flow started with
%% that activity as input" (fed-sx-triggers Phase 3). A NATIVE call into
%% next/flow (flow_store) — the engine is Erlang-on-SX too, so there is
%% no cross-guest FFI: the kernel and the workflow engine share one
%% runtime.
%%
%% start(Spec, Activity, ActorState, Cfg)
%% -> {ok, FlowId, {ActivityCid, TriggerCid, FlowId}} (audit triple)
%% | {error, Reason}
%%
%% The flow named in Spec is started with the activity bound into its
%% input environment, so flow steps can read the activity, the actor id,
%% and the trigger cid (the audit chain). Flow-start failures — an
%% unknown flow name, or a crashing first step (flow_store isolates the
%% raise) — come back as {error, Reason}, never raised, so the fan-out
%% caller is insulated from one flow's failure.
start(Spec, Activity, ActorState, _Cfg) ->
FlowName = trigger_registry:spec_flow_name(Spec),
TriggerCid = trigger_registry:spec_cid(Spec),
ActivityCid = activity_cid(Activity),
Input = [{activity, Activity},
{actor, actor_id_of(ActorState, Activity)},
{trigger_cid, TriggerCid}],
case flow_store:start(FlowName, Input) of
{ok, FlowId, _Result} ->
{ok, FlowId, {ActivityCid, TriggerCid, FlowId}};
{error, Reason} ->
{error, Reason}
end.
%% guard_passes(Spec, Activity, ActorState) — a spec fires when its
%% actor-scope admits the activity's actor AND its guard (if any)
%% returns true. An `any` scope and an `undefined` guard always pass;
%% the guard lets one activity-type bind multiple flows with
%% discriminators.
guard_passes(Spec, Activity, ActorState) ->
scope_ok(trigger_registry:spec_actor_scope(Spec), Activity) andalso
guard_ok(trigger_registry:spec_guard(Spec), Activity, ActorState).
scope_ok(any, _Activity) -> true;
scope_ok(Scope, Activity) ->
case envelope:get_field(actor, Activity) of
{ok, Scope} -> true;
_ -> false
end.
guard_ok(undefined, _Activity, _ActorState) -> true;
guard_ok(Guard, Activity, ActorState) when is_function(Guard, 2) ->
Guard(Activity, ActorState);
guard_ok(_, _, _) -> false.
%% ── helpers ─────────────────────────────────────────────────────
activity_cid(Activity) ->
case envelope:get_field(id, Activity) of
{ok, Cid} -> Cid;
_ -> undefined
end.
%% actor_id_of/2 — prefer the receiving actor's id (ActorState carries
%% {actor_id, _}); fall back to the activity's :actor. Reading
%% ActorState as a proplist keeps this decoupled from actor_state's
%% internal shape and testable with a plain [{actor_id, _}] stand-in.
actor_id_of(ActorState, Activity) ->
case envelope:get_field(actor_id, ActorState) of
{ok, Id} -> Id;
_ ->
case envelope:get_field(actor, Activity) of
{ok, A} -> A;
_ -> undefined
end
end.

View File

@@ -4,6 +4,7 @@
welcome_body/0, capabilities_body/0,
capabilities_path/0,
match_prefix/2, actors_prefix/0, actor_doc_response/1,
types_prefix/0, type_doc_response_for/2,
artifacts_prefix/0, artifact_response/1,
projections_list_path/0, projections_prefix/0,
projections_list_response/0, projection_response/1,
@@ -156,7 +157,12 @@ dispatch(<<71, 69, 84>>, Path, F, Cfg) ->
{ok, Name} when byte_size(Name) > 0 ->
projection_response_for(Name, F);
_ ->
not_found_response()
case match_prefix(types_prefix(), Path) of
{ok, Cid} when byte_size(Cid) > 0 ->
type_doc_response_for(Cid, Cfg);
_ ->
not_found_response()
end
end
end
end;
@@ -289,6 +295,10 @@ artifact_response(Cid) ->
Body = <<Pre/binary, Cid/binary, 10>>,
ok_response(Body).
%% "/types/" — 7 bytes: 47 116 121 112 101 115 47 (host-type fed Step 3)
types_prefix() ->
<<47,116,121,112,101,115,47>>.
%% "/projections" — 12 bytes (no trailing slash; the list endpoint)
projections_list_path() ->
<<47,112,114,111,106,101,99,116,105,111,110,115>>.
@@ -488,9 +498,20 @@ actor_doc_prefix() ->
118,110,100,46,102,101,100,45,115,120,46,
97,99,116,111,114,45,100,111,99>>.
%% "application/vnd.fed-sx.type-doc" — 31 bytes (host-type fed Step 3).
%% Distinct from actor-doc: the body is a term_codec-encoded
%% TypeRecord (peer_types cache entry), not a peer-actor-state.
type_doc_prefix() ->
<<97,112,112,108,105,99,97,116,105,111,110,47,
118,110,100,46,102,101,100,45,115,120,46,
116,121,112,101,45,100,111,99>>.
accept_format(nil) -> text;
accept_format(<<>>) -> text;
accept_format(V) when is_binary(V) ->
case match_prefix(type_doc_prefix(), V) of
{ok, _} -> type_doc;
_ ->
case match_prefix(actor_doc_prefix(), V) of
{ok, _} -> actor_doc;
_ ->
@@ -510,6 +531,7 @@ accept_format(V) when is_binary(V) ->
end
end
end
end
end;
accept_format(_) -> text.
@@ -586,6 +608,11 @@ content_type_for(actor_doc) ->
<<97,112,112,108,105,99,97,116,105,111,110,47,
118,110,100,46,102,101,100,45,115,120,46,
97,99,116,111,114,45,100,111,99>>;
%% "application/vnd.fed-sx.type-doc" — 31 bytes (host-type fed Step 3).
content_type_for(type_doc) ->
<<97,112,112,108,105,99,97,116,105,111,110,47,
118,110,100,46,102,101,100,45,115,120,46,
116,121,112,101,45,100,111,99>>;
content_type_for(_) ->
content_type_for(text).
@@ -714,6 +741,42 @@ kernel_actor_state(_Kernel, Id) ->
_ -> nil
end.
%% ── host-type fed Step 3: GET /types/<cid> ──────────────────────
%%
%% Serves a TypeRecord the node has cached (its own published types or
%% types fetched from peers) so a federated peer running
%% discovery_type_fetch can decode it directly into the shape
%% peer_types + the object-schema pipeline stage consume. The wire
%% body is term_codec:encode(TypeRecord) under the
%% application/vnd.fed-sx.type-doc content type; a cache miss is a 404.
%%
%% Cid is the path segment after "/types/" (the type's CID bytes). Cfg
%% carries `{peer_types, peer_types}` to opt the route into the cache —
%% absent (or the gen_server down) short-circuits to 404, matching the
%% kernel_actor_state guard for the actor-doc route. This port can't
%% dispatch `Mod:Fun` on a variable module, so the registered
%% `peer_types` atom is hardcoded; the Cfg field flags "no cache wired".
type_doc_response_for(Cid, Cfg) ->
case type_record_for(Cfg, Cid) of
nil -> not_found_response();
TR -> ok_response(term_codec:encode(TR), type_doc)
end.
type_record_for(Cfg, Cid) ->
case field(peer_types, Cfg) of
nil -> nil;
_ ->
case erlang:whereis(peer_types) of
undefined -> nil;
_ ->
case peer_types:lookup(Cid) of
{ok, TR} -> TR;
_ -> nil
end
end
end.
%% ── Step 4a: per-actor sub-resource stubs ──────────────────────
%% Per design §16.1 each actor has /outbox /inbox /followers
%% /following routes. v1 returns text-stub bodies so route resolution

180
next/kernel/peer_types.erl Normal file
View File

@@ -0,0 +1,180 @@
-module(peer_types).
-export([new/0, lookup/2, store/3, evict/2, types/1,
lookup_or_fetch/3, decode_type_doc/1,
start_link/0, start_link/1, stop/0,
put/2, lookup/1, state_for/1, known_types/0,
lookup_or_fetch/2, evict/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-behaviour(gen_server).
%% Peer-types cache — receiver-side mirror of peer_actors.erl, for
%% host-type federation (plans/fed-sx-host-types.md, Phase 2). When an
%% inbound activity references a refinement type the local node hasn't
%% seen, the object-schema validation stage (Phase 4) needs that
%% type's record — its :refinement-schema and field shape — to vet the
%% inner object. Re-fetching the type doc on every inbound would be
%% wasteful, so we cache the TypeRecord keyed by its content-address.
%%
%% State shape (pure-functional):
%% [{TypeCidBytes, TypeRecord}, ...]
%%
%% TypeCidBytes is the type's CID (a binary). TypeRecord is the parsed
%% DefineType envelope's :object payload — a proplist carrying :name,
%% :fields, :refinement-schema, :instance-type. Refinement schemas are
%% immutable per CID (an updated type is a new CID), so cache entries
%% never go stale — TTL-free, like peer_actors' v2 entries.
%%
%% lookup_or_fetch is the load-bearing entry point: a miss invokes a
%% Cfg-supplied closure to fetch the type doc over the wire. Per the
%% design the closure has shape
%% type_fetch_fn :: fun ((TypeCid, Cfg) -> {ok, Bytes} | {error, _})
%% returning the term_codec-encoded type-doc bytes; lookup_or_fetch
%% decodes them into the TypeRecord and caches it. Keeping the
%% transport in the closure (Phase 3's discovery_type_fetch) keeps
%% peer_types testable with a mocked fetch — same split as
%% peer_actors / discovery_fetch.
%%
%% gen_server wrapper registers under the atom `peer_types` so the
%% pipeline + http_server handlers can reach it without threading a
%% Pid through Cfg.
%% ── Pure-functional API ─────────────────────────────────────────
new() -> [].
lookup(TypeCid, State) ->
case find_keyed(TypeCid, State) of
{ok, TR} -> {ok, TR};
{error, _} -> not_found
end.
store(TypeCid, TR, State) ->
set_keyed(TypeCid, TR, State).
evict(TypeCid, State) ->
delete_keyed(TypeCid, State).
types(State) -> [Cid || {Cid, _TR} <- State].
%% lookup_or_fetch/3 — cache hit returns {ok, TR, State} unchanged.
%% Cache miss pulls the type_fetch_fn out of Cfg and calls it with
%% (TypeCid, Cfg); a {ok, Bytes} reply is decoded via term_codec into
%% the TypeRecord, which is then stored. Failures (no fn, fetch error,
%% bad bytes) do NOT poison the cache so the caller can retry.
%%
%% no type_fetch_fn in Cfg -> {error, no_fetch_fn, State}
%% fn -> {ok, Bytes}, decodable -> {ok, TR, store(...)}
%% fn -> {ok, Bytes}, bad bytes -> {error, bad_type_doc, State}
%% fn -> {error, Reason} -> {error, Reason, State}
%% fn -> Other -> {error, {bad_fetch_return, Other}, State}
lookup_or_fetch(TypeCid, Cfg, State) ->
case find_keyed(TypeCid, State) of
{ok, TR} -> {ok, TR, State};
{error, _} -> fetch_and_store(TypeCid, Cfg, State)
end.
fetch_and_store(TypeCid, Cfg, State) ->
case field(type_fetch_fn, Cfg) of
nil -> {error, no_fetch_fn, State};
Fn when is_function(Fn, 2) ->
case Fn(TypeCid, Cfg) of
{ok, Bytes} ->
case decode_type_doc(Bytes) of
{ok, TR} -> {ok, TR, store(TypeCid, TR, State)};
{error, R} -> {error, R, State}
end;
{error, Reason} -> {error, Reason, State};
Other -> {error, {bad_fetch_return, Other}, State}
end;
_ -> {error, bad_fetch_fn_cfg, State}
end.
%% decode_type_doc/1 — round the wire body back through term_codec.
%% The on-wire form is term_codec:encode(TypeRecord) (Phase 3's
%% /types/<cid> route), so a clean decode yields the proplist TR.
decode_type_doc(Bytes) ->
case term_codec:decode(Bytes) of
{ok, TR, _} when is_list(TR) -> {ok, TR};
_ -> {error, bad_type_doc}
end.
%% ── gen_server wrapper ──────────────────────────────────────────
start_link() ->
start_link([]).
start_link(InitialState) ->
Pid = gen_server:start_link(peer_types, [InitialState]),
erlang:register(peer_types, Pid),
Pid.
stop() ->
R = gen_server:call(peer_types, '$gen_stop'),
erlang:unregister(peer_types),
R.
%% put/2 — store a TypeRecord under its CID. Mirrors store_srv.
put(TypeCid, TR) ->
gen_server:call(peer_types, {put, TypeCid, TR}).
%% lookup/1 — cache read. {ok, TR} | not_found.
lookup(TypeCid) ->
gen_server:call(peer_types, {lookup, TypeCid}).
%% state_for/1 — alias of lookup/1, named to match peer_actors'
%% state_for accessor used by http_server's kernel bridge.
state_for(TypeCid) ->
gen_server:call(peer_types, {lookup, TypeCid}).
known_types() ->
gen_server:call(peer_types, get_types).
evict(TypeCid) ->
gen_server:call(peer_types, {evict, TypeCid}).
%% lookup_or_fetch/2 — gen_server form. Cfg carries the type_fetch_fn.
%% Reply is {ok, TR} on hit-or-fetched, {error, Reason} otherwise.
lookup_or_fetch(TypeCid, Cfg) ->
gen_server:call(peer_types, {lookup_or_fetch, TypeCid, Cfg}).
%% gen_server callbacks
init([InitialState]) ->
{ok, InitialState}.
handle_call({put, TypeCid, TR}, _From, State) ->
{reply, ok, store(TypeCid, TR, State)};
handle_call({lookup, TypeCid}, _From, State) ->
{reply, lookup(TypeCid, State), State};
handle_call({lookup_or_fetch, TypeCid, Cfg}, _From, State) ->
case lookup_or_fetch(TypeCid, Cfg, State) of
{ok, TR, NewState} -> {reply, {ok, TR}, NewState};
{error, Reason, Same} -> {reply, {error, Reason}, Same}
end;
handle_call(get_types, _From, State) ->
{reply, types(State), State};
handle_call({evict, TypeCid}, _From, State) ->
{reply, ok, evict(TypeCid, State)}.
handle_cast(_, S) -> {noreply, S}.
handle_info(_, S) -> {noreply, S}.
%% ── Internal helpers ────────────────────────────────────────────
field(K, [{K, V} | _]) -> V;
field(K, [_ | Rest]) -> field(K, Rest);
field(_, []) -> nil.
find_keyed(_, []) -> {error, not_found};
find_keyed(K, [{K, V} | _]) -> {ok, V};
find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest).
set_keyed(K, V, []) -> [{K, V}];
set_keyed(K, V, [{K, _} | Rest]) -> [{K, V} | Rest];
set_keyed(K, V, [P | Rest]) -> [P | set_keyed(K, V, Rest)].
delete_keyed(_, []) -> [];
delete_keyed(K, [{K, _} | Rest]) -> Rest;
delete_keyed(K, [P | Rest]) -> [P | delete_keyed(K, Rest)].

View File

@@ -6,7 +6,9 @@
stage_envelope/1,
stage_signature/1, stage_signature/2,
stage_replay/1, stage_replay/2,
stage_schema/1, stage_schema/2]).
stage_schema/1, stage_schema/2,
apply_object_schema/2, stage_object_schema/1,
apply_triggers/3]).
%% Validation pipeline per design §14.
%%
@@ -165,3 +167,233 @@ check_object_schema(Activity, SchemaFn) ->
stage_schema(SchemaLookup) ->
fun (Activity) -> stage_schema(Activity, SchemaLookup) end.
%% ── host-type fed Step 4: object-schema validation stage ────────
%%
%% apply_object_schema/2 — when an inbound activity's :object declares
%% a refinement type ({type, TypeName} on the object), resolve that
%% type's record and apply its refinement schema to the object's
%% :field_values. Sits between activity-type (stage_schema) validation
%% and the kernel append; rejects the activity on schema-fail.
%%
%% Resolution mirrors the design note: TypeName -> TypeCid via Cfg's
%% `type_index` ([{TypeName, TypeCid}, ...], the local Define-name
%% index), then TypeCid -> TypeRecord via peer_types:lookup_or_fetch/2
%% (a local cache hit, or a wire fetch through the Cfg type_fetch_fn).
%%
%% Outcomes:
%% object has no {type, _} -> ok (no schema applies)
%% TypeName not in type_index -> ok (undeclared type;
%% open-world default)
%% record resolved, schema passes -> ok
%% record resolved, schema fails -> {error, {validation_failed,
%% object_schema}}
%% record unresolvable (cache miss + -> strict_object_schema:
%% fetch failure / no peer_types) true -> {error, ...}
%% false -> ok (skipped)
%%
%% Default strict_object_schema = false: a node only blocks on an
%% unresolvable type when it opts into airtight validation via Cfg
%% {strict_object_schema, true}. The non-strict skip is where a
%% `validation_skipped` log entry belongs (left to the caller's logger
%% so this stage keeps the ok | {error, _} contract run_stages wants).
%%
%% A TypeRecord's refinement schema is either a 1-arity Erlang
%% predicate over the field-values (the substrate stand-in, for
%% locally-defined types) or a data constraint {required, [Field, ...]}
%% (term_codec-safe, so a wire-fetched TypeRecord can still validate).
apply_object_schema(Activity, Cfg) ->
case object_type_name(Activity) of
none -> ok;
{ok, TypeName} ->
case type_cid_for(TypeName, Cfg) of
none -> ok;
{ok, TypeCid} ->
case resolve_type_record(TypeCid, Cfg) of
{ok, TR} -> check_object_against(Activity, TR);
{error, _} -> on_unresolved_type(Cfg)
end
end
end.
stage_object_schema(Cfg) ->
fun (Activity) -> apply_object_schema(Activity, Cfg) end.
object_type_name(Activity) ->
case envelope:get_field(object, Activity) of
{ok, Obj} when is_list(Obj) ->
case envelope:get_field(type, Obj) of
{ok, T} -> {ok, T};
_ -> none
end;
_ -> none
end.
object_field_values(Activity) ->
case envelope:get_field(object, Activity) of
{ok, Obj} when is_list(Obj) ->
case envelope:get_field(field_values, Obj) of
{ok, FV} -> FV;
_ -> []
end;
_ -> []
end.
type_cid_for(TypeName, Cfg) ->
case stage_field(type_index, Cfg) of
nil -> none;
Index ->
case find_keyed(TypeName, Index) of
{ok, Cid} -> {ok, Cid};
_ -> none
end
end.
resolve_type_record(TypeCid, Cfg) ->
case stage_field(peer_types, Cfg) of
nil -> {error, no_peer_types};
_ ->
case erlang:whereis(peer_types) of
undefined -> {error, peer_types_down};
_ -> peer_types:lookup_or_fetch(TypeCid, Cfg)
end
end.
on_unresolved_type(Cfg) ->
case stage_field(strict_object_schema, Cfg) of
true -> {error, {validation_failed, object_schema}};
_ -> ok
end.
check_object_against(Activity, TR) ->
case stage_field(refinement_schema, TR) of
nil -> ok;
Schema -> apply_refinement(Schema, object_field_values(Activity))
end.
apply_refinement(Fn, FieldValues) when is_function(Fn, 1) ->
case Fn(FieldValues) of
true -> ok;
_ -> {error, {validation_failed, object_schema}}
end;
apply_refinement({required, Fields}, FieldValues) ->
case all_present(Fields, FieldValues) of
true -> ok;
false -> {error, {validation_failed, object_schema}}
end;
apply_refinement(_, _) -> ok.
all_present([], _) -> true;
all_present([F | Rest], FV) ->
case has_key(F, FV) of
true -> all_present(Rest, FV);
false -> false
end.
has_key(_, []) -> false;
has_key(K, [{K, _} | _]) -> true;
has_key(K, [_ | Rest]) -> has_key(K, Rest).
stage_field(K, [{K, V} | _]) -> V;
stage_field(K, [_ | Rest]) -> stage_field(K, Rest);
stage_field(_, []) -> nil.
find_keyed(_, []) -> {error, not_found};
find_keyed(K, [{K, V} | _]) -> {ok, V};
find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest).
%% ── fed-sx triggers Step 2: post-append fan-out ─────────────────
%%
%% apply_triggers/3 — fires the durable flows bound to an activity's
%% type AFTER it has been accepted and appended (rejected activities
%% never reach here, so a flow only runs for an activity that really
%% landed). For each spec the activity's type is bound to, the spec
%% must pass its guard/actor-scope, and its {ActivityCid, TriggerCid}
%% pair must not already have fired (federation can deliver the same
%% activity twice via different peers — dedup is keyed on that pair,
%% read from the receiving actor's :triggers_fired). Surviving specs are
%% dispatched via flow_dispatch:start (a native flow_store:start), which
%% never raises.
%%
%% Returns {ok, Results} where Results is one
%% {ActivityCid, TriggerCid, {ok, FlowId} | {error, Reason}}
%% per spec actually dispatched (guard-passed, not a duplicate). The
%% kernel folds the {ActivityCid, TriggerCid} pairs into the actor's
%% :triggers_fired (dedup) and the audit triples into its projection.
%% No matching/ready registry yields {ok, []}.
%%
%% Cfg gates the fan-out on {trigger_registry, trigger_registry} (the
%% registered gen_server), mirroring the object-schema stage's
%% {peer_types, _} gate. apply_triggers must NOT be called inside a
%% `try` — flow_dispatch does gen_server:calls, and a blocking call
%% inside a try deadlocks this scheduler; the fan-out runs after append,
%% in its own step, so this is naturally satisfied.
apply_triggers(Activity, ActorState, Cfg) ->
case trigger_registry_ready(Cfg) of
false -> {ok, []};
true ->
Type = activity_type_of(Activity),
Specs = trigger_registry:lookup(Type),
ActCid = trigger_activity_cid(Activity),
Fired = field_or_default(triggers_fired, ActorState, []),
fire_each(Specs, Activity, ActorState, ActCid, Fired, Cfg, [])
end.
trigger_registry_ready(Cfg) ->
case stage_field(trigger_registry, Cfg) of
nil -> false;
_ ->
case erlang:whereis(trigger_registry) of
undefined -> false;
_ -> true
end
end.
fire_each([], _A, _AS, _ACid, _Fired, _Cfg, Acc) ->
{ok, lists:reverse(Acc)};
fire_each([Spec | Rest], A, AS, ACid, Fired, Cfg, Acc) ->
TCid = trigger_registry:spec_cid(Spec),
Pair = {ACid, TCid},
AlreadyFired = pair_member(Pair, Fired) orelse acc_member(Pair, Acc),
Pass = (not AlreadyFired) andalso flow_dispatch:guard_passes(Spec, A, AS),
case Pass of
false ->
fire_each(Rest, A, AS, ACid, Fired, Cfg, Acc);
true ->
Outcome = case flow_dispatch:start(Spec, A, AS, Cfg) of
{ok, FlowId, _Audit} -> {ok, FlowId};
{error, Reason} -> {error, Reason}
end,
fire_each(Rest, A, AS, ACid, Fired, Cfg, [{ACid, TCid, Outcome} | Acc])
end.
activity_type_of(Activity) ->
case envelope:get_field(type, Activity) of
{ok, Type} -> Type;
_ -> undefined
end.
trigger_activity_cid(Activity) ->
case envelope:get_field(id, Activity) of
{ok, Cid} -> Cid;
_ -> undefined
end.
field_or_default(Key, Proplist, Default) ->
case envelope:get_field(Key, Proplist) of
{ok, V} -> V;
_ -> Default
end.
%% pair_member/2 — {ACid, TCid} present in a [{ACid, TCid}] fired list.
pair_member(_, []) -> false;
pair_member(P, [P | _]) -> true;
pair_member(P, [_ | Rest]) -> pair_member(P, Rest).
%% acc_member/2 — {ACid, TCid} already dispatched this call (Acc holds
%% {ACid, TCid, Outcome} triples).
acc_member(_, []) -> false;
acc_member({A, T}, [{A, T, _} | _]) -> true;
acc_member(P, [_ | Rest]) -> acc_member(P, Rest).

View File

@@ -0,0 +1,180 @@
-module(trigger_registry).
-export([new/0, add/3, remove/2, lookup/2, all/1, fold/2, fold_fn/0,
mk_spec/4, spec_cid/1, spec_flow_name/1, spec_guard/1,
spec_actor_scope/1,
start_link/0, start_link/1, stop/0,
add/2, remove/1, lookup/1, all_triggers/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-behaviour(gen_server).
%% Trigger registry — binds activity-types to durable flows
%% (plans/agent-briefings/fed-sx-triggers-loop.md, Phase 1). When an
%% activity is appended, the kernel's post-append fan-out
%% (pipeline.erl, Phase 2) looks the activity's type up here and starts
%% each registered flow. Mirrors the peer_actors / peer_types shape: a
%% pure-functional core plus a registered gen_server, hydrated on start
%% from a fold over DefineTrigger activities.
%%
%% State shape (pure-functional):
%% [{ActivityType, [Spec, ...]}, ...]
%% Multiple triggers may bind the same activity-type; they fire
%% independently. A Spec is a 4-tuple:
%% {TriggerCid, FlowName, Guard, ActorScope}
%% TriggerCid — content-address of the DefineTrigger activity
%% (dedup + audit); `undefined` if not yet addressed.
%% FlowName — the flow_store-registered flow to start.
%% Guard — fun ((Activity, ActorState) -> bool) | undefined.
%% Lets one type bind multiple flows with
%% discriminators ("only Articles in :newsletter").
%% Resolved to a fun at registration; not carried over
%% the wire (term_codec can't encode funs).
%% ActorScope — an actor id the trigger is scoped to, or `any`.
%% ── Spec constructor / accessors ────────────────────────────────
mk_spec(TriggerCid, FlowName, Guard, ActorScope) ->
{TriggerCid, FlowName, Guard, ActorScope}.
spec_cid({Cid, _, _, _}) -> Cid.
spec_flow_name({_, FlowName, _, _}) -> FlowName.
spec_guard({_, _, Guard, _}) -> Guard.
spec_actor_scope({_, _, _, Scope}) -> Scope.
%% ── Pure-functional API ─────────────────────────────────────────
new() -> [].
%% add(ActivityType, Spec, State) — append Spec to ActivityType's list.
add(ActivityType, Spec, State) ->
Existing = lookup(ActivityType, State),
set_keyed(ActivityType, append1(Existing, Spec), State).
%% remove(TriggerCid, State) — drop every spec carrying TriggerCid,
%% across all activity-types; empties are pruned.
remove(TriggerCid, State) ->
prune([{T, drop_cid(TriggerCid, Specs)} || {T, Specs} <- State]).
%% lookup(ActivityType, State) — the specs bound to ActivityType ([] if
%% none).
lookup(ActivityType, State) ->
case find_keyed(ActivityType, State) of
{ok, Specs} -> Specs;
not_found -> []
end.
all(State) -> State.
%% ── Hydration fold ──────────────────────────────────────────────
%%
%% fold(Activity, State) — register the binding carried by a
%% DefineTrigger activity. Replaying the actor log through this fold
%% rebuilds the registry after a restart (same content-addressing
%% discipline as define_registry). A non-DefineTrigger activity passes
%% through untouched.
fold(Activity, State) ->
case envelope:get_field(type, Activity) of
{ok, define_trigger} -> fold_trigger(Activity, State);
_ -> State
end.
fold_trigger(Activity, State) ->
case envelope:get_field(object, Activity) of
{ok, Obj} ->
case binding_of(Activity, Obj) of
{ok, AType, Spec} -> add(AType, Spec, State);
not_a_binding -> State
end;
_ -> State
end.
binding_of(Activity, Obj) ->
case envelope:get_field(activity_type, Obj) of
{ok, AType} ->
case envelope:get_field(flow_name, Obj) of
{ok, FlowName} ->
Guard = field_or(guard, Obj, undefined),
Scope = field_or(actor_scope, Obj, any),
Cid = field_or(id, Activity, undefined),
{ok, AType, mk_spec(Cid, FlowName, Guard, Scope)};
_ -> not_a_binding
end;
_ -> not_a_binding
end.
%% fold_fn/0 — a 2-arity fun the projection scheduler can plant.
fold_fn() ->
fun (Activity, State) -> fold(Activity, State) end.
%% ── gen_server wrapper ──────────────────────────────────────────
start_link() ->
start_link([]).
start_link(InitialState) ->
Pid = gen_server:start_link(trigger_registry, [InitialState]),
erlang:register(trigger_registry, Pid),
Pid.
stop() ->
R = gen_server:call(trigger_registry, '$gen_stop'),
erlang:unregister(trigger_registry),
R.
add(ActivityType, Spec) ->
gen_server:call(trigger_registry, {add, ActivityType, Spec}).
remove(TriggerCid) ->
gen_server:call(trigger_registry, {remove, TriggerCid}).
lookup(ActivityType) ->
gen_server:call(trigger_registry, {lookup, ActivityType}).
all_triggers() ->
gen_server:call(trigger_registry, all_triggers).
init([InitialState]) ->
{ok, InitialState}.
handle_call({add, ActivityType, Spec}, _From, State) ->
{reply, ok, add(ActivityType, Spec, State)};
handle_call({remove, TriggerCid}, _From, State) ->
{reply, ok, remove(TriggerCid, State)};
handle_call({lookup, ActivityType}, _From, State) ->
{reply, lookup(ActivityType, State), State};
handle_call(all_triggers, _From, State) ->
{reply, State, State}.
handle_cast(_, S) -> {noreply, S}.
handle_info(_, S) -> {noreply, S}.
%% ── helpers ─────────────────────────────────────────────────────
field_or(Key, Proplist, Default) ->
case envelope:get_field(Key, Proplist) of
{ok, V} -> V;
_ -> Default
end.
drop_cid(_, []) -> [];
drop_cid(Cid, [Spec | Rest]) ->
case spec_cid(Spec) of
Cid -> drop_cid(Cid, Rest);
_ -> [Spec | drop_cid(Cid, Rest)]
end.
prune([]) -> [];
prune([{_, []} | Rest]) -> prune(Rest);
prune([P | Rest]) -> [P | prune(Rest)].
append1([], X) -> [X];
append1([H | T], X) -> [H | append1(T, X)].
find_keyed(_, []) -> not_found;
find_keyed(K, [{K, V} | _]) -> {ok, V};
find_keyed(K, [_ | Rest]) -> find_keyed(K, Rest).
set_keyed(K, V, []) -> [{K, V}];
set_keyed(K, V, [{K, _} | Rest]) -> [{K, V} | Rest];
set_keyed(K, V, [P | Rest]) -> [P | set_keyed(K, V, Rest)].

View File

@@ -79,7 +79,7 @@ cat > "$TMPFILE" <<'EPOCHS'
(eval "(get (erlang-eval-ast \"R = bootstrap:read_genesis(), {ok, S1} = bootstrap:load_genesis(R), {ok, S2} = bootstrap:load_genesis(R), cid:to_string(S1) =:= cid:to_string(S2)\") :name)")
EPOCHS
OUTPUT=$(timeout 300 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
OUTPUT=$(timeout 590 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
@@ -106,7 +106,7 @@ check 10 "strip suffix create.sx -> create" "true"
check 11 "strip suffix hello unchanged" "true"
check 12 "strip suffix .sx -> empty" "true"
check 13 "load_genesis rejects bad shape" "ok"
check 20 "loaded activity_types count = 5" "5"
check 20 "loaded activity_types count = 8" "8"
check 21 "loaded object_types count = 13" "13"
check 22 "loaded projections count = 7" "7"
check 23 "loaded validators count = 3" "3"

View File

@@ -99,8 +99,8 @@ check() {
check 2 "gen_server loaded" "gen_server"
check 3 "registry loaded" "registry"
check 4 "bootstrap loaded" "bootstrap"
check 10 "populate returns total 36" "36"
check 20 "activity_types count = 5" "5"
check 10 "populate returns total 39" "39"
check 20 "activity_types count = 8" "8"
check 21 "object_types count = 13" "13"
check 22 "projections count = 7" "7"
check 23 "validators count = 3" "3"

View File

@@ -102,7 +102,7 @@ check 10 "sections/0 length" "7"
check 11 "ends_with_sx create.sx" "true"
check 12 "ends_with_sx hello" "false"
check 13 "ends_with_sx empty" "false"
check 20 "section activity_types count" "5"
check 20 "section activity_types count" "8"
check 21 "section object_types count" "13"
check 22 "section projections count" "7"
check 23 "section validators count" "3"
@@ -111,7 +111,7 @@ check 25 "section sig_suites count" "2"
check 26 "section audience count" "3"
check 30 "read_genesis returns 7 sections" "7"
check 31 "first section name" "activity_types"
check 32 "first section entry count" "5"
check 32 "first section entry count" "8"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then

View File

@@ -54,6 +54,12 @@ cat > "$TMPFILE" <<EPOCHS
(eval "(get (erlang-load-module (file-read \"next/kernel/nx_kernel.erl\")) :name)")
(epoch 10)
(eval "(get (erlang-load-module (file-read \"next/kernel/bootstrap.erl\")) :name)")
;; outbox:publish computes a delivery set via follower_graph + delivery
;; (compute_delivery_set/3) — load both so the publish path resolves.
(epoch 11)
(eval "(get (erlang-load-module (file-read \"next/kernel/follower_graph.erl\")) :name)")
(epoch 12)
(eval "(get (erlang-load-module (file-read \"next/kernel/delivery.erl\")) :name)")
;; bootstrap:start returns a Pid
(epoch 20)
@@ -115,10 +121,10 @@ check() {
check 10 "bootstrap module loaded" "bootstrap"
check 20 "whereis(nx_kernel) is Pid" "true"
check 21 "activity_types count = 5" "5"
check 21 "activity_types count = 8" "8"
check 22 "object_types count = 13" "13"
check 23 "projections count = 7" "7"
check 24 "total entries = 36" "36"
check 24 "total entries = 39" "39"
check 25 "fresh log_tip = 0" "0"
check 26 "publish advances tip to 1" "1"
check 27 "actor_id = alice" "true"

99
next/tests/define_trigger.sh Executable file
View File

@@ -0,0 +1,99 @@
#!/usr/bin/env bash
# next/tests/define_trigger.sh — fed-sx triggers Phase 1 (verb).
#
# The DefineTrigger genesis verb
# (next/genesis/activity-types/define_trigger.sx) binds an activity-type
# to a flow. This suite confirms it parses with the expected
# DefineActivity head + :name, that its :schema accepts a well-formed
# binding and rejects malformed ones, and that a DefineTrigger envelope
# round-trips through term_codec.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
SCH='(eval-expr (get (apply dict (rest (parse (file-read \"next/genesis/activity-types/define_trigger.sx\")))) :schema))'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(get (erlang-load-module (file-read \"next/kernel/term_codec.erl\")) :name)")
;; ── parse / shape ──────────────────────────────────────────
(epoch 10)
(eval "(first (parse (file-read \"next/genesis/activity-types/define_trigger.sx\")))")
(epoch 11)
(eval "(get (apply dict (rest (parse (file-read \"next/genesis/activity-types/define_trigger.sx\")))) :name)")
;; ── schema accept / reject ─────────────────────────────────
;; valid binding: string :activity-type + :flow-name -> true
(epoch 20)
(eval "(define sch ${SCH}) (sch (dict :object (dict :activity-type \"Create\" :flow-name \"blog-publish-digest\")))")
;; reject: missing :activity-type -> false
(epoch 21)
(eval "(define sch ${SCH}) (sch (dict :object (dict :flow-name \"f\")))")
;; reject: missing :flow-name -> false
(epoch 22)
(eval "(define sch ${SCH}) (sch (dict :object (dict :activity-type \"Create\")))")
;; ── envelope round-trip through term_codec ─────────────────
(epoch 30)
(eval "(get (erlang-eval-ast \"A = [{type, define_trigger}, {actor, alice}, {object, [{activity_type, create}, {flow_name, blog_publish_digest}]}], {ok, D, _} = term_codec:decode(term_codec:encode(A)), D =:= A\") :name)")
EPOCHS
OUTPUT=$(timeout 180 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 10 "define_trigger.sx head form" "DefineActivity"
check 11 "define_trigger.sx name" "DefineTrigger"
check 20 "schema accepts valid binding" "true"
check 21 "schema rejects missing type" "false"
check 22 "schema rejects missing flow-name" "false"
check 30 "DefineTrigger envelope round-trips" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/define_trigger.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

110
next/tests/define_type.sh Executable file
View File

@@ -0,0 +1,110 @@
#!/usr/bin/env bash
# next/tests/define_type.sh — host-type federation Phase 1 acceptance.
#
# The DefineType genesis verb (next/genesis/activity-types/define_type.sx)
# declares a refinement type. This suite confirms:
# - the file parses with the expected DefineActivity head + :name
# - the :schema predicate accepts a well-formed type-definition
# activity and rejects malformed ones (missing :name, non-list
# :fields)
# - a DefineType envelope round-trips through term_codec
#
# Schema bodies are SX source; we eval them with `eval-expr` and call
# the resulting lambda directly (note: `apply` does not spread into
# SX lambdas in this kernel, and keyword-getters are not callable —
# the schema uses nested `get`). 7 cases.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
# The schema fn, evaluated from the genesis file into a lambda.
SCH='(eval-expr (get (apply dict (rest (parse (file-read \"next/genesis/activity-types/define_type.sx\")))) :schema))'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(get (erlang-load-module (file-read \"next/kernel/term_codec.erl\")) :name)")
;; ── parse / shape ──────────────────────────────────────────
(epoch 10)
(eval "(first (parse (file-read \"next/genesis/activity-types/define_type.sx\")))")
(epoch 11)
(eval "(get (apply dict (rest (parse (file-read \"next/genesis/activity-types/define_type.sx\")))) :name)")
;; ── schema accept / reject ─────────────────────────────────
;; valid: :object with string :name and list :fields -> true
(epoch 20)
(eval "(define sch ${SCH}) (sch (dict :object (dict :name \"Post\" :fields (list))))")
;; valid: :fields omitted (optional) -> true
(epoch 21)
(eval "(define sch ${SCH}) (sch (dict :object (dict :name \"Post\")))")
;; reject: missing :name -> false
(epoch 22)
(eval "(define sch ${SCH}) (sch (dict :object (dict :fields (list))))")
;; reject: :fields present but not a list -> false
(epoch 23)
(eval "(define sch ${SCH}) (sch (dict :object (dict :name \"Post\" :fields \"notalist\")))")
;; ── envelope round-trip through term_codec ─────────────────
(epoch 30)
(eval "(get (erlang-eval-ast \"A = [{type, define_type}, {actor, alice}, {object, [{name, <<80,111,115,116>>}, {instance_type, <<78,111,116,101>>}]}], {ok, D, _} = term_codec:decode(term_codec:encode(A)), D =:= A\") :name)")
EPOCHS
OUTPUT=$(timeout 180 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 10 "define_type.sx head form" "DefineActivity"
check 11 "define_type.sx name" "DefineType"
check 20 "schema accepts valid type def" "true"
check 21 "schema accepts omitted :fields" "true"
check 22 "schema rejects missing :name" "false"
check 23 "schema rejects non-list :fields" "false"
check 30 "DefineType envelope round-trips" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/define_type.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

View File

@@ -0,0 +1,176 @@
#!/usr/bin/env bash
# next/tests/discovery_type_fetch.sh — host-type federation Phase 3.
#
# Client side of the type-doc wire: discovery_type_fetch builds the
# fun/2 closure peer_types:lookup_or_fetch calls on a cache miss. It
# GETs <base>/types/<cid> with the type-doc Accept header and returns
# the RAW response bytes (peer_types decodes them via term_codec).
# Exercised end-to-end against a background python http server that
# serves hand-crafted term_codec bytes, so we test the wire — not just
# an in-process call.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
# ── live stub server ─────────────────────────────────────────
# GET /types/bafy1 -> 200 with term_codec-encoded TypeRecord
# TR = [{name, <<"Post">>}, {instance_type, <<"Note">>}]
# GET anything else -> 404
PORT=$(python3 -c 'import socket;s=socket.socket();s.bind(("127.0.0.1",0));print(s.getsockname()[1]);s.close()')
SRVROOT=$(mktemp -d)
PYSRV="$SRVROOT/srv.py"
cat > "$PYSRV" <<'PY'
import sys, http.server, socketserver
PORT = int(sys.argv[1])
# term_codec encoding (mirror of next/kernel/term_codec.erl).
def enc_atom(s):
b = s.encode()
return f"a{len(b)}:".encode() + b
def enc_bin(b):
return f"b{len(b)}:".encode() + b
def enc_tuple(items):
return f"t{len(items)}:".encode() + b"".join(items)
def enc_list(items):
return f"l{len(items)}:".encode() + b"".join(items)
# [{name, <<"Post">>}, {instance_type, <<"Note">>}]
TYPEDOC = enc_list([
enc_tuple([enc_atom("name"), enc_bin(b"Post")]),
enc_tuple([enc_atom("instance_type"), enc_bin(b"Note")]),
])
class H(http.server.BaseHTTPRequestHandler):
def do_GET(self):
if self.path == "/types/bafy1":
self.send_response(200)
self.send_header('content-type','application/vnd.fed-sx.type-doc')
self.send_header('content-length', str(len(TYPEDOC)))
self.end_headers()
self.wfile.write(TYPEDOC)
else:
self.send_response(404); self.end_headers(); self.wfile.write(b'not found')
def log_message(self, fmt, *args): pass
with socketserver.TCPServer(("127.0.0.1", PORT), H) as srv:
srv.serve_forever()
PY
python3 "$PYSRV" "$PORT" >/dev/null 2>&1 &
SRV_PID=$!
TMPFILE=$(mktemp)
trap "rm -rf $SRVROOT $TMPFILE; kill $SRV_PID 2>/dev/null || true" EXIT
for _ in 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15; do
if curl -fsS "http://127.0.0.1:$PORT/types/bafy1" >/dev/null 2>&1; then break; fi
sleep 0.2
done
bytes_of() { python3 -c "import sys; print(','.join(str(b) for b in sys.argv[1].encode()))" "$1"; }
URL_BASE_BYTES=$(bytes_of "http://127.0.0.1:$PORT")
cat > "$TMPFILE" <<'EPOCHS'
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(er-load-gen-server!)")
(epoch 3)
(eval "(get (erlang-load-module (file-read \"next/kernel/term_codec.erl\")) :name)")
(epoch 4)
(eval "(get (erlang-load-module (file-read \"next/kernel/peer_types.erl\")) :name)")
(epoch 5)
(eval "(get (erlang-load-module (file-read \"next/kernel/discovery_type_fetch.erl\")) :name)")
;; accept_header is the 31-byte type-doc MIME
(epoch 10)
(eval "(get (erlang-eval-ast \"byte_size(discovery_type_fetch:accept_header()) =:= 31\") :name)")
;; type_doc_url builds <base>/types/bafy1
(epoch 11)
(eval "(get (erlang-eval-ast \"U = discovery_type_fetch:type_doc_url(<<__URL_BASE__>>, <<98,97,102,121,49>>), U =:= <<__URL_BASE__,47,116,121,112,101,115,47,98,97,102,121,49>>\") :name)")
;; resolve_type_url via the static type_url proplist
(epoch 12)
(eval "(get (erlang-eval-ast \"discovery_type_fetch:resolve_type_url(<<98,97,102,121,49>>, [{type_url, [{<<98,97,102,121,49>>, <<__URL_BASE__>>}]}]) =:= {ok, <<__URL_BASE__>>}\") :name)")
;; fetch live -> {ok, Bytes} that decode to the TypeRecord
(epoch 13)
(eval "(get (erlang-eval-ast \"R = discovery_type_fetch:fetch(<<__URL_BASE__,47,116,121,112,101,115,47,98,97,102,121,49>>, []), case R of {ok, B} -> {ok, TR, _} = term_codec:decode(B), TR =:= [{name, <<80,111,115,116>>}, {instance_type, <<78,111,116,101>>}]; _ -> false end\") :name)")
;; closure from make_fetch_fn/0 dispatches and returns raw bytes
(epoch 14)
(eval "(get (erlang-eval-ast \"Fn = discovery_type_fetch:make_fetch_fn(), Cfg = [{type_url, [{<<98,97,102,121,49>>, <<__URL_BASE__>>}]}], case Fn(<<98,97,102,121,49>>, Cfg) of {ok, B} -> {ok, TR, _} = term_codec:decode(B), TR =:= [{name, <<80,111,115,116>>}, {instance_type, <<78,111,116,101>>}]; _ -> false end\") :name)")
;; closure with no resolver -> {error, no_type_url}
(epoch 15)
(eval "(get (erlang-eval-ast \"Fn = discovery_type_fetch:make_fetch_fn(), case Fn(<<98,97,102,121,49>>, []) of {error, no_type_url} -> true; _ -> false end\") :name)")
;; fetch on an unknown cid path -> {error, {status, 404}}
(epoch 16)
(eval "(get (erlang-eval-ast \"R = discovery_type_fetch:fetch(<<__URL_BASE__,47,116,121,112,101,115,47,122,122,122>>, []), case R of {error, {status, 404}} -> true; _ -> false end\") :name)")
;; end-to-end: peer_types:lookup_or_fetch uses the closure, decodes,
;; and writes the TypeRecord into the cache
(epoch 17)
(eval "(get (erlang-eval-ast \"Fn = discovery_type_fetch:make_fetch_fn(), Cfg = [{type_fetch_fn, Fn}, {type_url, [{<<98,97,102,121,49>>, <<__URL_BASE__>>}]}], case peer_types:lookup_or_fetch(<<98,97,102,121,49>>, Cfg, peer_types:new()) of {ok, TR, S} -> TR =:= [{name, <<80,111,115,116>>}, {instance_type, <<78,111,116,101>>}] andalso peer_types:types(S) =:= [<<98,97,102,121,49>>]; _ -> false end\") :name)")
EPOCHS
sed -i "s|__URL_BASE__|${URL_BASE_BYTES}|g" "$TMPFILE"
OUTPUT=$(timeout 300 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 5 "discovery_type_fetch loaded" "discovery_type_fetch"
check 10 "accept_header is 31-byte type-doc" "true"
check 11 "type_doc_url builds /types/<cid>" "true"
check 12 "resolve_type_url via type_url map" "true"
check 13 "fetch live -> raw bytes decode to TR" "true"
check 14 "closure -> raw bytes decode to TR" "true"
check 15 "closure no resolver -> no_type_url" "true"
check 16 "fetch 404 path -> {status, 404}" "true"
check 17 "lookup_or_fetch caches fetched TR" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/discovery_type_fetch.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

130
next/tests/flow_dispatch.sh Executable file
View File

@@ -0,0 +1,130 @@
#!/usr/bin/env bash
# next/tests/flow_dispatch.sh — fed-sx triggers Phase 3.
#
# flow_dispatch bridges a matched trigger to a started flow — a native
# flow_store:start (the engine is Erlang-on-SX too, no FFI). Confirms
# guard/actor-scope gating, the audit triple, synchronous first-step
# execution, suspend/resume of a started instance, a branch on an
# activity field, and graceful handling of an unknown flow name.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
# Activity (Create of a Note by alice), receiving actor-state, and a
# couple of flows: `capture` echoes the activity's type out of the
# flow's input env; `wait_flow` suspends then wraps the resumed value;
# `cat_flow` branches on the inner object's :type.
ACT='[{type, create}, {actor, alice}, {id, <<97,99,105,100>>}, {object, [{type, note}]}]'
AS='[{actor_id, alice}]'
CAP='flow_spec:flow_node(fun(In) -> {ok, A} = envelope:get_field(activity, In), {ok, T} = envelope:get_field(type, A), T end)'
WAITF='flow_spec:sequence([flow:suspend(w), flow_spec:flow_node(fun(V) -> {got, V} end)])'
CATF='flow_spec:branch(fun(In) -> {ok, A} = envelope:get_field(activity, In), {ok, O} = envelope:get_field(object, A), envelope:get_field(type, O) =:= {ok, note} end, flow_spec:flow_const(is_note), flow_spec:flow_const(not_note))'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(er-load-gen-server!)")
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/flow/flow.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/flow/flow_spec.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/flow/flow_store.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/kernel/trigger_registry.erl\")) :name)")
(epoch 3)
(eval "(get (erlang-load-module (file-read \"next/kernel/flow_dispatch.erl\")) :name)")
;; ── guard / actor-scope gating ─────────────────────────────
(epoch 10)
(eval "(get (erlang-eval-ast \"flow_dispatch:guard_passes(trigger_registry:mk_spec(c, f, undefined, any), ${ACT}, ${AS})\") :name)")
(epoch 11)
(eval "(get (erlang-eval-ast \"flow_dispatch:guard_passes(trigger_registry:mk_spec(c, f, fun(_, _) -> false end, any), ${ACT}, ${AS}) =:= false\") :name)")
(epoch 12)
(eval "(get (erlang-eval-ast \"flow_dispatch:guard_passes(trigger_registry:mk_spec(c, f, fun(A, _) -> envelope:get_field(actor, A) =:= {ok, alice} end, any), ${ACT}, ${AS})\") :name)")
(epoch 13)
(eval "(get (erlang-eval-ast \"flow_dispatch:guard_passes(trigger_registry:mk_spec(c, f, undefined, alice), ${ACT}, ${AS})\") :name)")
(epoch 14)
(eval "(get (erlang-eval-ast \"flow_dispatch:guard_passes(trigger_registry:mk_spec(c, f, undefined, bob), ${ACT}, ${AS}) =:= false\") :name)")
;; ── start: audit triple + synchronous first step ───────────
(epoch 20)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(capture, ${CAP}), flow_dispatch:start(trigger_registry:mk_spec(<<116,99>>, capture, undefined, any), ${ACT}, ${AS}, []) =:= {ok, 1, {<<97,99,105,100>>, <<116,99>>, 1}}\") :name)")
(epoch 21)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(capture, ${CAP}), {ok, FlowId, _} = flow_dispatch:start(trigger_registry:mk_spec(<<116,99>>, capture, undefined, any), ${ACT}, ${AS}, []), flow_store:status(FlowId) =:= {ok, {done, create}}\") :name)")
;; ── unknown flow name -> {error, no_such_flow}, no crash ────
(epoch 30)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_dispatch:start(trigger_registry:mk_spec(<<116,99>>, ghostflow, undefined, any), ${ACT}, ${AS}, []) =:= {error, no_such_flow}\") :name)")
;; ── started instance suspends; resume completes ────────────
(epoch 40)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(wait_flow, ${WAITF}), {ok, FlowId, _} = flow_dispatch:start(trigger_registry:mk_spec(<<116,99>>, wait_flow, undefined, any), ${ACT}, ${AS}, []), S1 = flow_store:status(FlowId), R = flow_store:resume(FlowId, 7), S1 =:= {ok, {suspended, w}} andalso R =:= {ok, {flow_done, {got, 7}}}\") :name)")
;; ── branch on an activity field (both branches) ────────────
(epoch 50)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(cat_flow, ${CATF}), {ok, FlowId, _} = flow_dispatch:start(trigger_registry:mk_spec(<<116,99>>, cat_flow, undefined, any), ${ACT}, ${AS}, []), flow_store:status(FlowId) =:= {ok, {done, is_note}}\") :name)")
(epoch 51)
(eval "(get (erlang-eval-ast \"flow_store:start_link(), flow_store:register_flow(cat_flow, ${CATF}), {ok, FlowId, _} = flow_dispatch:start(trigger_registry:mk_spec(<<116,99>>, cat_flow, undefined, any), [{type, create}, {actor, alice}, {id, <<120>>}, {object, [{type, article}]}], ${AS}, []), flow_store:status(FlowId) =:= {ok, {done, not_note}}\") :name)")
EPOCHS
OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 3 "flow_dispatch module loaded" "flow_dispatch"
check 10 "undefined guard + any scope pass" "true"
check 11 "guard false -> no pass" "true"
check 12 "guard true on activity field" "true"
check 13 "actor-scope match passes" "true"
check 14 "actor-scope mismatch fails" "true"
check 20 "start returns audit triple" "true"
check 21 "first step runs synchronously" "true"
check 30 "unknown flow -> no_such_flow" "true"
check 40 "started flow suspends + resumes" "true"
check 50 "branch then-arm (is_note)" "true"
check 51 "branch else-arm (not_note)" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/flow_dispatch.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

View File

@@ -48,6 +48,18 @@ cat > "$TMPFILE" <<'EPOCHS'
(eval "(first (parse (file-read \"next/genesis/activity-types/endorse.sx\")))")
(epoch 200)
(eval "(get (apply dict (rest (parse (file-read \"next/genesis/activity-types/endorse.sx\")))) :name)")
(epoch 201)
(eval "(first (parse (file-read \"next/genesis/activity-types/define_type.sx\")))")
(epoch 202)
(eval "(get (apply dict (rest (parse (file-read \"next/genesis/activity-types/define_type.sx\")))) :name)")
(epoch 203)
(eval "(first (parse (file-read \"next/genesis/activity-types/subtype_of.sx\")))")
(epoch 204)
(eval "(get (apply dict (rest (parse (file-read \"next/genesis/activity-types/subtype_of.sx\")))) :name)")
(epoch 205)
(eval "(first (parse (file-read \"next/genesis/activity-types/define_trigger.sx\")))")
(epoch 206)
(eval "(get (apply dict (rest (parse (file-read \"next/genesis/activity-types/define_trigger.sx\")))) :name)")
(epoch 19)
(eval "(len (get (apply dict (rest (parse (file-read \"next/genesis/manifest.sx\")))) :activity-types))")
(epoch 30)
@@ -180,7 +192,13 @@ check 27 "announce.sx head form" "DefineActivity"
check 28 "announce.sx name is Announce" "Announce"
check 29 "endorse.sx head form" "DefineActivity"
check 200 "endorse.sx name is Endorse" "Endorse"
check 19 "manifest has 5 activity-types" "5"
check 201 "define_type.sx head form" "DefineActivity"
check 202 "define_type.sx name" "DefineType"
check 203 "subtype_of.sx head form" "DefineActivity"
check 204 "subtype_of.sx name" "SubtypeOf"
check 205 "define_trigger.sx head form" "DefineActivity"
check 206 "define_trigger.sx name" "DefineTrigger"
check 19 "manifest has 8 activity-types" "8"
check 30 "sx-artifact.sx head form" "DefineObject"
check 31 "sx-artifact.sx name" "SXArtifact"
check 32 "note.sx name" "Note"

154
next/tests/object_schema.sh Executable file
View File

@@ -0,0 +1,154 @@
#!/usr/bin/env bash
# next/tests/object_schema.sh — host-type federation Phase 4.
#
# pipeline:apply_object_schema/2 validates an inbound activity's inner
# object against its declared refinement type. The type is resolved
# TypeName -> TypeCid (Cfg type_index) -> TypeRecord
# (peer_types:lookup_or_fetch, a local hit or a wire fetch), then the
# record's refinement schema is applied to the object's :field_values.
# Default strict_object_schema = false: an unresolvable type is let
# through; opt-in strict rejects.
#
# Refinement schemas are either a 1-arity Erlang predicate (the
# substrate stand-in, locally stored) or a term_codec-safe
# {required, [Field,...]} constraint (so a wire-fetched record still
# validates). Both are exercised here.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
# Cid is the Post type's CID; TRdata carries a data-form refinement
# (object must have a `title` field), TRfun the Erlang-predicate form.
# ActValid's object has :title, ActFail's doesn't, ActNoType's object
# declares no type, ActUnknown's type isn't in the index. PostName is
# <<"Post">>, title "Hi" = <<72,105>>. Index maps name -> Cid.
SETUP='Cid = <<98,97,102,121,80>>, PostName = <<80,111,115,116>>, TRdata = [{name, PostName}, {refinement_schema, {required, [title]}}], TRfun = [{name, PostName}, {refinement_schema, fun(FV) -> case FV of [{title, _} | _] -> true; _ -> false end end}], ObjValid = [{type, PostName}, {field_values, [{title, <<72,105>>}, {body, <<104,105>>}]}], ObjFail = [{type, PostName}, {field_values, [{body, <<104,105>>}]}], ActValid = [{type, create}, {actor, alice}, {object, ObjValid}], ActFail = [{type, create}, {actor, alice}, {object, ObjFail}], ActNoType = [{type, create}, {actor, alice}, {object, [{field_values, [{title, <<72,105>>}]}]}], ActUnknown = [{type, create}, {actor, alice}, {object, [{type, <<82,101,112,108,121>>}, {field_values, [{title, <<72,105>>}]}]}], Index = [{PostName, Cid}], FAIL = {error, {validation_failed, object_schema}},'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(er-load-gen-server!)")
(epoch 3)
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
(epoch 4)
(eval "(get (erlang-load-module (file-read \"next/kernel/term_codec.erl\")) :name)")
(epoch 5)
(eval "(get (erlang-load-module (file-read \"next/kernel/peer_types.erl\")) :name)")
(epoch 6)
(eval "(get (erlang-load-module (file-read \"next/kernel/pipeline.erl\")) :name)")
;; local registry match + valid object -> accepted
(epoch 10)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRdata), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActValid, Cfg) =:= ok\") :name)")
;; local match + refinement-failing object -> rejected
(epoch 11)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRdata), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActFail, Cfg) =:= FAIL\") :name)")
;; type not cached, fetch succeeds -> validates against fetched record
(epoch 12)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), Cfg = [{peer_types, peer_types}, {type_index, Index}, {type_fetch_fn, fun(_, _) -> {ok, term_codec:encode(TRdata)} end}], pipeline:apply_object_schema(ActValid, Cfg) =:= ok\") :name)")
;; fetched record, failing object -> rejected
(epoch 13)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), Cfg = [{peer_types, peer_types}, {type_index, Index}, {type_fetch_fn, fun(_, _) -> {ok, term_codec:encode(TRdata)} end}], pipeline:apply_object_schema(ActFail, Cfg) =:= FAIL\") :name)")
;; unknown type, fetch fails, strict not set -> accepted (skipped)
(epoch 14)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), Cfg = [{peer_types, peer_types}, {type_index, Index}, {type_fetch_fn, fun(_, _) -> {error, http_404} end}], pipeline:apply_object_schema(ActValid, Cfg) =:= ok\") :name)")
;; unknown type, fetch fails, strict set -> rejected
(epoch 15)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), Cfg = [{peer_types, peer_types}, {type_index, Index}, {type_fetch_fn, fun(_, _) -> {error, http_404} end}, {strict_object_schema, true}], pipeline:apply_object_schema(ActValid, Cfg) =:= FAIL\") :name)")
;; no peer_types cfg at all, non-strict -> accepted (skipped)
(epoch 16)
(eval "(get (erlang-eval-ast \"${SETUP} Cfg = [{type_index, Index}], pipeline:apply_object_schema(ActValid, Cfg) =:= ok\") :name)")
;; no peer_types cfg, strict -> rejected
(epoch 17)
(eval "(get (erlang-eval-ast \"${SETUP} Cfg = [{type_index, Index}, {strict_object_schema, true}], pipeline:apply_object_schema(ActValid, Cfg) =:= FAIL\") :name)")
;; object without inner {type, _} -> skipped (accepted)
(epoch 18)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRdata), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActNoType, Cfg) =:= ok\") :name)")
;; object type not in the local index -> skipped (open-world)
(epoch 19)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRdata), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActUnknown, Cfg) =:= ok\") :name)")
;; Erlang-predicate refinement schema: valid -> ok, failing -> reject
(epoch 20)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRfun), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActValid, Cfg) =:= ok\") :name)")
(epoch 21)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRfun), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActFail, Cfg) =:= FAIL\") :name)")
;; type known but record carries no refinement schema -> accepted
(epoch 22)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, [{name, PostName}]), Cfg = [{peer_types, peer_types}, {type_index, Index}], pipeline:apply_object_schema(ActFail, Cfg) =:= ok\") :name)")
;; stage_object_schema/1 yields a 1-arity stage usable by run_stages
(epoch 23)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid, TRdata), Cfg = [{peer_types, peer_types}, {type_index, Index}], Stage = pipeline:stage_object_schema(Cfg), is_function(Stage, 1) andalso pipeline:run_stages(ActValid, [Stage]) =:= ok andalso pipeline:run_stages(ActFail, [Stage]) =:= FAIL\") :name)")
EPOCHS
OUTPUT=$(timeout 300 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 6 "pipeline module loaded" "pipeline"
check 10 "local match + valid -> accepted" "true"
check 11 "local match + failing -> rejected" "true"
check 12 "fetch ok -> validates fetched record" "true"
check 13 "fetched record + failing -> rejected" "true"
check 14 "fetch fail, non-strict -> accepted" "true"
check 15 "fetch fail, strict -> rejected" "true"
check 16 "no peer_types, non-strict -> accepted" "true"
check 17 "no peer_types, strict -> rejected" "true"
check 18 "object without type -> skipped" "true"
check 19 "type not in index -> skipped" "true"
check 20 "fun schema valid -> accepted" "true"
check 21 "fun schema failing -> rejected" "true"
check 22 "no refinement schema -> accepted" "true"
check 23 "stage_object_schema composes" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/object_schema.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

155
next/tests/peer_types.sh Executable file
View File

@@ -0,0 +1,155 @@
#!/usr/bin/env bash
# next/tests/peer_types.sh — host-type federation Phase 2 acceptance.
#
# Receiver-side peer-types cache (next/kernel/peer_types.erl), a mirror
# of peer_actors keyed by type CID. Tracks {TypeCidBytes, TypeRecord}
# pairs so the object-schema validation stage can vet inbound objects
# against a fetched-once refinement type. lookup_or_fetch pulls a
# Cfg-supplied type_fetch_fn on a miss, decodes the returned wire bytes
# via term_codec, and caches the TypeRecord.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
# TR1/TR2 are TypeRecords (the DefineType :object payloads). Doc1 is
# TR1's on-wire form (term_codec). FetchOk serves Doc1 for Cid1;
# FetchBad returns undecodable bytes. CfgOk/CfgBad/CfgNone vary the
# type_fetch_fn slot.
SETUP='Cid1 = <<98,97,102,121,49>>, Cid2 = <<98,97,102,121,50>>, TR1 = [{name, <<80,111,115,116>>}, {instance_type, <<78,111,116,101>>}], TR2 = [{name, <<82,101,112,108,121>>}], Doc1 = term_codec:encode(TR1), FetchOk = fun(C, _) -> case C =:= Cid1 of true -> {ok, Doc1}; false -> {error, not_found} end end, FetchBad = fun(_, _) -> {ok, <<255>>} end, CfgOk = [{type_fetch_fn, FetchOk}], CfgBad = [{type_fetch_fn, FetchBad}], CfgNone = [],'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(er-load-gen-server!)")
(epoch 3)
(eval "(get (erlang-load-module (file-read \"next/kernel/term_codec.erl\")) :name)")
(epoch 4)
(eval "(get (erlang-load-module (file-read \"next/kernel/peer_types.erl\")) :name)")
;; ── pure API ───────────────────────────────────────────────
;; new/0 -> []
(epoch 10)
(eval "(get (erlang-eval-ast \"peer_types:new() =:= []\") :name)")
;; lookup miss -> not_found
(epoch 11)
(eval "(get (erlang-eval-ast \"peer_types:lookup(<<1>>, peer_types:new()) =:= not_found\") :name)")
;; store + lookup round-trip
(epoch 12)
(eval "(get (erlang-eval-ast \"${SETUP} S = peer_types:store(Cid1, TR1, peer_types:new()), peer_types:lookup(Cid1, S) =:= {ok, TR1}\") :name)")
;; types/1 lists CIDs in insertion order
(epoch 13)
(eval "(get (erlang-eval-ast \"${SETUP} S = peer_types:store(Cid2, TR2, peer_types:store(Cid1, TR1, peer_types:new())), peer_types:types(S) =:= [Cid1, Cid2]\") :name)")
;; evict removes the entry
(epoch 14)
(eval "(get (erlang-eval-ast \"${SETUP} S = peer_types:evict(Cid1, peer_types:store(Cid1, TR1, peer_types:new())), peer_types:lookup(Cid1, S) =:= not_found\") :name)")
;; ── lookup_or_fetch (pure) ─────────────────────────────────
;; miss -> fetch via Cfg.fn, decode bytes, cache TR
(epoch 20)
(eval "(get (erlang-eval-ast \"${SETUP} case peer_types:lookup_or_fetch(Cid1, CfgOk, peer_types:new()) of {ok, TR1, [{Cid1, TR1}]} -> ok; _ -> bad end\") :name)")
;; hit -> returns cached without calling fetch
(epoch 21)
(eval "(get (erlang-eval-ast \"${SETUP} S = peer_types:store(Cid1, TR1, peer_types:new()), case peer_types:lookup_or_fetch(Cid1, CfgBad, S) of {ok, TR1, S} -> ok; _ -> bad end\") :name)")
;; no type_fetch_fn -> {error, no_fetch_fn}, cache untouched
(epoch 22)
(eval "(get (erlang-eval-ast \"${SETUP} case peer_types:lookup_or_fetch(Cid1, CfgNone, peer_types:new()) of {error, no_fetch_fn, []} -> ok; _ -> bad end\") :name)")
;; fetch error does NOT poison the cache
(epoch 23)
(eval "(get (erlang-eval-ast \"${SETUP} BadCfg = [{type_fetch_fn, fun(_, _) -> {error, http_404} end}], case peer_types:lookup_or_fetch(Cid1, BadCfg, peer_types:new()) of {error, http_404, []} -> ok; _ -> bad end\") :name)")
;; undecodable bytes -> {error, bad_type_doc}
(epoch 24)
(eval "(get (erlang-eval-ast \"${SETUP} case peer_types:lookup_or_fetch(Cid1, CfgBad, peer_types:new()) of {error, bad_type_doc, []} -> ok; _ -> bad end\") :name)")
;; ── gen_server API ─────────────────────────────────────────
;; start_link + put + lookup round-trip
(epoch 30)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid1, TR1), peer_types:lookup(Cid1) =:= {ok, TR1}\") :name)")
;; lookup miss -> not_found
(epoch 31)
(eval "(get (erlang-eval-ast \"peer_types:start_link(), peer_types:lookup(<<9>>) =:= not_found\") :name)")
;; state_for is an alias of lookup
(epoch 32)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid1, TR1), peer_types:state_for(Cid1) =:= {ok, TR1}\") :name)")
;; known_types lists stored CIDs
(epoch 33)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), peer_types:put(Cid1, TR1), peer_types:put(Cid2, TR2), peer_types:known_types() =:= [Cid1, Cid2]\") :name)")
;; lookup_or_fetch miss fetches + caches
(epoch 34)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), R = peer_types:lookup_or_fetch(Cid1, CfgOk), R =:= {ok, TR1} andalso peer_types:known_types() =:= [Cid1]\") :name)")
;; lookup_or_fetch with no fn -> {error, no_fetch_fn}, pristine
(epoch 35)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link(), R = peer_types:lookup_or_fetch(Cid1, CfgNone), R =:= {error, no_fetch_fn} andalso peer_types:known_types() =:= []\") :name)")
;; start_link/1 pre-populates the cache
(epoch 36)
(eval "(get (erlang-eval-ast \"${SETUP} peer_types:start_link([{Cid1, TR1}]), peer_types:lookup(Cid1) =:= {ok, TR1}\") :name)")
EPOCHS
OUTPUT=$(timeout 300 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 4 "peer_types module loaded" "peer_types"
check 10 "new/0 -> []" "true"
check 11 "lookup miss -> not_found" "true"
check 12 "store + lookup round-trip" "true"
check 13 "types/1 lists in insertion order" "true"
check 14 "evict removes entry" "true"
check 20 "lookup_or_fetch miss fetches" "ok"
check 21 "lookup_or_fetch hit skips fetch" "ok"
check 22 "no fetch_fn -> no_fetch_fn" "ok"
check 23 "fetch error doesn't poison" "ok"
check 24 "undecodable bytes -> bad_type_doc" "ok"
check 30 "gen_server put + lookup" "true"
check 31 "gen_server lookup miss" "true"
check 32 "gen_server state_for alias" "true"
check 33 "gen_server known_types lists" "true"
check 34 "gen_server fetch + cache" "true"
check 35 "gen_server no fn -> pristine" "true"
check 36 "start_link/1 pre-populates" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/peer_types.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

140
next/tests/peer_types_route.sh Executable file
View File

@@ -0,0 +1,140 @@
#!/usr/bin/env bash
# next/tests/peer_types_route.sh — host-type federation Phase 3.
#
# Server side of the type-doc wire: http_server serves
# GET /types/<cid> Accept: application/vnd.fed-sx.type-doc
# as the term_codec-encoded TypeRecord pulled from the peer_types
# cache; 404 if the cid isn't cached. Exercised via http_server:route
# in-process (the established pattern — see http_actors.sh) so the
# route resolution + content negotiation are tested without a live
# socket. The peer_types gen_server holds the cache across epochs.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
# TR is the served TypeRecord, Cid its key. AccV is the type-doc
# Accept header value, CT the content-type key. Cfg opts the route
# into the peer_types cache. ReqHit / ReqMiss / ReqEmpty / ReqPost
# vary the request line.
SETUP='TR = [{name, <<80,111,115,116>>}, {instance_type, <<78,111,116,101>>}], Cid = <<98,97,102,121,49>>, peer_types:start_link(), peer_types:put(Cid, TR), AcK = <<97,99,99,101,112,116>>, AcV = <<97,112,112,108,105,99,97,116,105,111,110,47,118,110,100,46,102,101,100,45,115,120,46,116,121,112,101,45,100,111,99>>, Hs = [{AcK, AcV}], Cfg = [{peer_types, peer_types}],'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(er-load-gen-server!)")
(epoch 3)
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
(epoch 4)
(eval "(get (erlang-load-module (file-read \"next/kernel/term_codec.erl\")) :name)")
(epoch 5)
(eval "(get (erlang-load-module (file-read \"next/kernel/peer_types.erl\")) :name)")
(epoch 6)
(eval "(get (erlang-load-module (file-read \"next/kernel/http_server.erl\")) :name)")
;; ── negotiation + prefix primitives ────────────────────────
;; Accept: type-doc negotiates to the type_doc format atom
(epoch 10)
(eval "(get (erlang-eval-ast \"http_server:accept_format(<<97,112,112,108,105,99,97,116,105,111,110,47,118,110,100,46,102,101,100,45,115,120,46,116,121,112,101,45,100,111,99>>) =:= type_doc\") :name)")
;; type_doc content type is 31 bytes
(epoch 11)
(eval "(get (erlang-eval-ast \"byte_size(http_server:content_type_for(type_doc)) =:= 31\") :name)")
;; types_prefix is "/types/" — 7 bytes
(epoch 12)
(eval "(get (erlang-eval-ast \"byte_size(http_server:types_prefix()) =:= 7\") :name)")
;; ── GET /types/<cid> ───────────────────────────────────────
;; cache hit -> 200
(epoch 20)
(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<71,69,84>>}, {path, <<47,116,121,112,101,115,47,98,97,102,121,49>>}, {headers, Hs}], R = http_server:route(Req, Cfg), {ok, S} = envelope:get_field(status, R), S =:= 200\") :name)")
;; body decodes back to the stored TypeRecord
(epoch 21)
(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<71,69,84>>}, {path, <<47,116,121,112,101,115,47,98,97,102,121,49>>}, {headers, Hs}], R = http_server:route(Req, Cfg), {ok, B} = envelope:get_field(body, R), {ok, DTR, _} = term_codec:decode(B), DTR =:= TR\") :name)")
;; response carries the type-doc content type
(epoch 22)
(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<71,69,84>>}, {path, <<47,116,121,112,101,115,47,98,97,102,121,49>>}, {headers, Hs}], R = http_server:route(Req, Cfg), {ok, Hdrs} = envelope:get_field(headers, R), {_CTK, CTV} = hd(Hdrs), CTV =:= http_server:content_type_for(type_doc)\") :name)")
;; type_doc_response_for/2 direct: known cid -> 200
(epoch 23)
(eval "(get (erlang-eval-ast \"${SETUP} R = http_server:type_doc_response_for(Cid, Cfg), {ok, S} = envelope:get_field(status, R), S =:= 200\") :name)")
;; ── misses + wrong method ──────────────────────────────────
;; unknown cid -> 404
(epoch 30)
(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<71,69,84>>}, {path, <<47,116,121,112,101,115,47,122,122,122>>}, {headers, Hs}], R = http_server:route(Req, Cfg), {ok, S} = envelope:get_field(status, R), S =:= 404\") :name)")
;; empty cid (GET /types/) -> 404
(epoch 31)
(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<71,69,84>>}, {path, <<47,116,121,112,101,115,47>>}, {headers, Hs}], R = http_server:route(Req, Cfg), {ok, S} = envelope:get_field(status, R), S =:= 404\") :name)")
;; no peer_types cfg -> 404 even for a known cid
(epoch 32)
(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<71,69,84>>}, {path, <<47,116,121,112,101,115,47,98,97,102,121,49>>}, {headers, Hs}], R = http_server:route(Req, []), {ok, S} = envelope:get_field(status, R), S =:= 404\") :name)")
;; POST /types/<cid> -> 404 (only GET serves type docs)
(epoch 33)
(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<80,79,83,84>>}, {path, <<47,116,121,112,101,115,47,98,97,102,121,49>>}, {headers, Hs}], R = http_server:route(Req, Cfg), {ok, S} = envelope:get_field(status, R), S =:= 404\") :name)")
;; existing routes intact: GET / still 200
(epoch 34)
(eval "(get (erlang-eval-ast \"${SETUP} Req = [{method, <<71,69,84>>}, {path, <<47>>}], R = http_server:route(Req, Cfg), {ok, S} = envelope:get_field(status, R), S =:= 200\") :name)")
EPOCHS
OUTPUT=$(timeout 300 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 6 "http_server module loaded" "http_server"
check 10 "Accept type-doc -> type_doc" "true"
check 11 "type_doc content type = 31 bytes" "true"
check 12 "types_prefix = 7 bytes" "true"
check 20 "GET /types/<cid> hit -> 200" "true"
check 21 "body decodes to TypeRecord" "true"
check 22 "response is type-doc content type" "true"
check 23 "type_doc_response_for hit -> 200" "true"
check 30 "unknown cid -> 404" "true"
check 31 "empty cid -> 404" "true"
check 32 "no peer_types cfg -> 404" "true"
check 33 "POST /types/<cid> -> 404" "true"
check 34 "existing GET / route intact" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/peer_types_route.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

129
next/tests/pipeline_triggers.sh Executable file
View File

@@ -0,0 +1,129 @@
#!/usr/bin/env bash
# next/tests/pipeline_triggers.sh — fed-sx triggers Phase 2.
#
# pipeline:apply_triggers/3 is the post-append fan-out: a successfully
# appended activity has its type looked up in the trigger registry, and
# each surviving spec (guard + actor-scope pass, not already fired) is
# dispatched to a durable flow. Confirms lookup -> dispatch, no-match,
# guard rejection, {activity,trigger}-cid dedup, multi-bind, graceful
# handling of an unknown flow and a crashing flow, and the cfg gate.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
ACT='[{type, create}, {actor, alice}, {id, <<97,99,105,100>>}, {object, [{type, note}]}]'
AS='[{actor_id, alice}]'
CFG='[{trigger_registry, trigger_registry}]'
DONEF='flow_spec:flow_const(ran)'
BOOMF='flow_spec:flow_node(fun(_) -> error(kaboom) end)'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(er-load-gen-server!)")
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/flow/flow.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/flow/flow_spec.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/flow/flow_store.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/kernel/trigger_registry.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/kernel/flow_dispatch.erl\")) :name)")
(epoch 3)
(eval "(get (erlang-load-module (file-read \"next/kernel/pipeline.erl\")) :name)")
;; ── lookup -> dispatch ─────────────────────────────────────
(epoch 10)
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, ranflow, undefined, any)), flow_store:start_link(), flow_store:register_flow(ranflow, ${DONEF}), pipeline:apply_triggers(${ACT}, ${AS}, ${CFG}) =:= {ok, [{<<97,99,105,100>>, <<116,99>>, {ok, 1}}]}\") :name)")
;; the dispatched flow really ran (instance recorded done)
(epoch 11)
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, ranflow, undefined, any)), flow_store:start_link(), flow_store:register_flow(ranflow, ${DONEF}), pipeline:apply_triggers(${ACT}, ${AS}, ${CFG}), flow_store:status(1) =:= {ok, {done, ran}}\") :name)")
;; ── no matching trigger -> no dispatch ─────────────────────
(epoch 20)
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), flow_store:start_link(), pipeline:apply_triggers(${ACT}, ${AS}, ${CFG}) =:= {ok, []}\") :name)")
;; ── guard returns false -> no dispatch ─────────────────────
(epoch 30)
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, ranflow, fun(_, _) -> false end, any)), flow_store:start_link(), flow_store:register_flow(ranflow, ${DONEF}), pipeline:apply_triggers(${ACT}, ${AS}, ${CFG}) =:= {ok, []}\") :name)")
;; ── dedup: already-fired {activity,trigger} pair -> skipped ─
(epoch 40)
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, ranflow, undefined, any)), flow_store:start_link(), flow_store:register_flow(ranflow, ${DONEF}), pipeline:apply_triggers(${ACT}, [{actor_id, alice}, {triggers_fired, [{<<97,99,105,100>>, <<116,99>>}]}], ${CFG}) =:= {ok, []}\") :name)")
;; ── multiple triggers for the same type -> each dispatched ─
(epoch 50)
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:add(create, trigger_registry:mk_spec(<<116,49>>, ranflow, undefined, any)), trigger_registry:add(create, trigger_registry:mk_spec(<<116,50>>, ranflow, undefined, any)), flow_store:start_link(), flow_store:register_flow(ranflow, ${DONEF}), {ok, Rs} = pipeline:apply_triggers(${ACT}, ${AS}, ${CFG}), length(Rs) =:= 2\") :name)")
;; ── unknown flow name -> {error, _} in results, no crash ───
(epoch 60)
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, ghostflow, undefined, any)), flow_store:start_link(), pipeline:apply_triggers(${ACT}, ${AS}, ${CFG}) =:= {ok, [{<<97,99,105,100>>, <<116,99>>, {error, no_such_flow}}]}\") :name)")
;; ── crashing flow -> isolated as {error, {flow_crashed, _}} ─
(epoch 61)
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, boom, undefined, any)), flow_store:start_link(), flow_store:register_flow(boom, ${BOOMF}), {ok, [{_, _, Outcome}]} = pipeline:apply_triggers(${ACT}, ${AS}, ${CFG}), case Outcome of {error, {flow_crashed, _}} -> true; _ -> false end\") :name)")
;; ── no trigger_registry cfg -> {ok, []} ────────────────────
(epoch 70)
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, ranflow, undefined, any)), flow_store:start_link(), flow_store:register_flow(ranflow, ${DONEF}), pipeline:apply_triggers(${ACT}, ${AS}, []) =:= {ok, []}\") :name)")
EPOCHS
OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 3 "pipeline module loaded" "pipeline"
check 10 "lookup -> dispatch (audit)" "true"
check 11 "dispatched flow actually ran" "true"
check 20 "no matching trigger -> no dispatch" "true"
check 30 "guard false -> no dispatch" "true"
check 40 "dedup already-fired -> skipped" "true"
check 50 "multi-bind: each dispatched" "true"
check 60 "unknown flow -> error in results" "true"
check 61 "crashing flow isolated" "true"
check 70 "no registry cfg -> no dispatch" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/pipeline_triggers.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

103
next/tests/subtype_of.sh Executable file
View File

@@ -0,0 +1,103 @@
#!/usr/bin/env bash
# next/tests/subtype_of.sh — host-type federation Phase 1 acceptance.
#
# The SubtypeOf genesis verb (next/genesis/activity-types/subtype_of.sx)
# records a hierarchy edge between two previously-defined types. This
# suite confirms:
# - the file parses with the expected DefineActivity head + :name
# - the :schema predicate accepts an edge carrying both CIDs and
# rejects edges missing either side
# - a SubtypeOf envelope round-trips through term_codec
#
# Schema bodies are SX source; we eval them with `eval-expr` and call
# the resulting lambda directly. 7 cases.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
SCH='(eval-expr (get (apply dict (rest (parse (file-read \"next/genesis/activity-types/subtype_of.sx\")))) :schema))'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(get (erlang-load-module (file-read \"next/kernel/term_codec.erl\")) :name)")
;; ── parse / shape ──────────────────────────────────────────
(epoch 10)
(eval "(first (parse (file-read \"next/genesis/activity-types/subtype_of.sx\")))")
(epoch 11)
(eval "(get (apply dict (rest (parse (file-read \"next/genesis/activity-types/subtype_of.sx\")))) :name)")
;; ── schema accept / reject ─────────────────────────────────
;; valid: both CIDs present + strings -> true
(epoch 20)
(eval "(define sch ${SCH}) (sch (dict :object (dict :child-type-cid \"bafyChild\" :parent-type-cid \"bafyParent\")))")
;; reject: missing :child-type-cid -> false
(epoch 21)
(eval "(define sch ${SCH}) (sch (dict :object (dict :parent-type-cid \"bafyParent\")))")
;; reject: missing :parent-type-cid -> false
(epoch 22)
(eval "(define sch ${SCH}) (sch (dict :object (dict :child-type-cid \"bafyChild\")))")
;; ── envelope round-trip through term_codec ─────────────────
(epoch 30)
(eval "(get (erlang-eval-ast \"A = [{type, subtype_of}, {actor, alice}, {object, [{child_type_cid, <<99,104>>}, {parent_type_cid, <<112,97>>}]}], {ok, D, _} = term_codec:decode(term_codec:encode(A)), D =:= A\") :name)")
EPOCHS
OUTPUT=$(timeout 180 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 10 "subtype_of.sx head form" "DefineActivity"
check 11 "subtype_of.sx name" "SubtypeOf"
check 20 "schema accepts edge with 2 CIDs" "true"
check 21 "schema rejects missing child CID" "false"
check 22 "schema rejects missing parent CID" "false"
check 30 "SubtypeOf envelope round-trips" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/subtype_of.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

143
next/tests/trigger_registry.sh Executable file
View File

@@ -0,0 +1,143 @@
#!/usr/bin/env bash
# next/tests/trigger_registry.sh — fed-sx triggers Phase 1 (registry).
#
# trigger_registry binds activity-types to durable flows. The kernel's
# post-append fan-out (Phase 2) looks an arriving activity's type up
# here and starts each registered flow. Mirrors peer_actors / peer_types:
# a pure core + a gen_server, hydrated from a fold over DefineTrigger
# activities.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
# Spec1/Spec2 bind activity-type `create`. TrigAct/TrigAct2 are
# DefineTrigger activities the fold hydrates from.
SETUP='S1 = trigger_registry:mk_spec(<<99,49>>, flow_a, undefined, any), S2 = trigger_registry:mk_spec(<<99,50>>, flow_b, undefined, any), TrigAct = [{type, define_trigger}, {actor, alice}, {id, <<99,49>>}, {object, [{activity_type, create}, {flow_name, flow_a}]}], TrigAct2 = [{type, define_trigger}, {actor, alice}, {id, <<99,50>>}, {object, [{activity_type, follow}, {flow_name, flow_c}]}], Note = [{type, note}, {actor, alice}, {object, [{content, hi}]}],'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(er-load-gen-server!)")
(epoch 3)
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
(epoch 4)
(eval "(get (erlang-load-module (file-read \"next/kernel/trigger_registry.erl\")) :name)")
;; ── pure core ──────────────────────────────────────────────
(epoch 10)
(eval "(get (erlang-eval-ast \"trigger_registry:new() =:= []\") :name)")
;; add + lookup round-trip
(epoch 11)
(eval "(get (erlang-eval-ast \"${SETUP} St = trigger_registry:add(create, S1, trigger_registry:new()), trigger_registry:lookup(create, St) =:= [S1]\") :name)")
;; lookup with no match -> []
(epoch 12)
(eval "(get (erlang-eval-ast \"${SETUP} trigger_registry:lookup(create, trigger_registry:new()) =:= []\") :name)")
;; multi-bind: two specs on the same activity-type, both returned in order
(epoch 13)
(eval "(get (erlang-eval-ast \"${SETUP} St = trigger_registry:add(create, S2, trigger_registry:add(create, S1, trigger_registry:new())), trigger_registry:lookup(create, St) =:= [S1, S2]\") :name)")
;; remove by trigger cid
(epoch 14)
(eval "(get (erlang-eval-ast \"${SETUP} St = trigger_registry:add(create, S2, trigger_registry:add(create, S1, trigger_registry:new())), trigger_registry:lookup(create, trigger_registry:remove(<<99,49>>, St)) =:= [S2]\") :name)")
;; remove last spec for a type prunes the type
(epoch 15)
(eval "(get (erlang-eval-ast \"${SETUP} St = trigger_registry:add(create, S1, trigger_registry:new()), trigger_registry:remove(<<99,49>>, St) =:= []\") :name)")
;; spec accessors
(epoch 16)
(eval "(get (erlang-eval-ast \"${SETUP} {trigger_registry:spec_cid(S1), trigger_registry:spec_flow_name(S1), trigger_registry:spec_guard(S1), trigger_registry:spec_actor_scope(S1)} =:= {<<99,49>>, flow_a, undefined, any}\") :name)")
;; ── hydration fold ─────────────────────────────────────────
;; a DefineTrigger activity registers its binding
(epoch 20)
(eval "(get (erlang-eval-ast \"${SETUP} St = trigger_registry:fold(TrigAct, trigger_registry:new()), trigger_registry:lookup(create, St) =:= [trigger_registry:mk_spec(<<99,49>>, flow_a, undefined, any)]\") :name)")
;; a non-trigger activity passes through untouched
(epoch 21)
(eval "(get (erlang-eval-ast \"${SETUP} trigger_registry:fold(Note, trigger_registry:new()) =:= []\") :name)")
;; folding several Trigger activities rebuilds the whole registry
(epoch 22)
(eval "(get (erlang-eval-ast \"${SETUP} St = trigger_registry:fold(TrigAct2, trigger_registry:fold(TrigAct, trigger_registry:new())), {trigger_registry:lookup(create, St), trigger_registry:lookup(follow, St)} =:= {[trigger_registry:mk_spec(<<99,49>>, flow_a, undefined, any)], [trigger_registry:mk_spec(<<99,50>>, flow_c, undefined, any)]}\") :name)")
;; fold_fn/0 is a 2-arity fun
(epoch 23)
(eval "(get (erlang-eval-ast \"is_function(trigger_registry:fold_fn(), 2)\") :name)")
;; ── gen_server ─────────────────────────────────────────────
(epoch 30)
(eval "(get (erlang-eval-ast \"${SETUP} trigger_registry:start_link(), trigger_registry:add(create, S1), trigger_registry:lookup(create) =:= [S1]\") :name)")
(epoch 31)
(eval "(get (erlang-eval-ast \"trigger_registry:start_link(), trigger_registry:lookup(create) =:= []\") :name)")
(epoch 32)
(eval "(get (erlang-eval-ast \"${SETUP} trigger_registry:start_link(), trigger_registry:add(create, S1), trigger_registry:add(create, S2), trigger_registry:remove(<<99,49>>), trigger_registry:lookup(create) =:= [S2]\") :name)")
(epoch 33)
(eval "(get (erlang-eval-ast \"${SETUP} trigger_registry:start_link(), trigger_registry:add(create, S1), trigger_registry:add(follow, S2), trigger_registry:all_triggers() =:= [{create, [S1]}, {follow, [S2]}]\") :name)")
;; start_link/1 pre-populates from a hydrated state
(epoch 34)
(eval "(get (erlang-eval-ast \"${SETUP} St = trigger_registry:fold(TrigAct, trigger_registry:new()), trigger_registry:start_link(St), trigger_registry:lookup(create) =:= [trigger_registry:mk_spec(<<99,49>>, flow_a, undefined, any)]\") :name)")
EPOCHS
OUTPUT=$(timeout 300 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 4 "trigger_registry module loaded" "trigger_registry"
check 10 "new/0 -> []" "true"
check 11 "add + lookup round-trip" "true"
check 12 "lookup no match -> []" "true"
check 13 "multi-bind same type, ordered" "true"
check 14 "remove by trigger cid" "true"
check 15 "remove last prunes the type" "true"
check 16 "spec accessors" "true"
check 20 "fold registers a binding" "true"
check 21 "fold non-trigger passes through" "true"
check 22 "fold hydration rebuilds registry" "true"
check 23 "fold_fn/0 is fun/2" "true"
check 30 "gen_server add + lookup" "true"
check 31 "gen_server lookup no match -> []" "true"
check 32 "gen_server remove" "true"
check 33 "gen_server all_triggers" "true"
check 34 "start_link/1 pre-populates" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/trigger_registry.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

134
next/tests/triggers_e2e.sh Executable file
View File

@@ -0,0 +1,134 @@
#!/usr/bin/env bash
# next/tests/triggers_e2e.sh — fed-sx triggers Phase 4 (end-to-end).
#
# The motivating blog-publish-digest flow, driven the whole way: a
# trigger binds Article-creates to the flow; the post-append fan-out
# starts it; the flow branches on :category, (for newsletters) suspends
# on a morning timer, fetches followers (injected), and emits a
# DigestSent activity object. Effect-as-data: the flow returns the
# emails + DigestSent object (a driver would dispatch/append them) since
# a flow can't call kernel gen_servers from inside the drive.
#
# Each epoch starts fresh gen_servers so instance ids are deterministic.
set -uo pipefail
cd "$(git rev-parse --show-toplevel)"
SX_SERVER="${SX_SERVER:-hosts/ocaml/_build/default/bin/sx_server.exe}"
if [ ! -x "$SX_SERVER" ]; then
SX_SERVER="/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe"
fi
if [ ! -x "$SX_SERVER" ]; then
echo "ERROR: sx_server.exe not found." >&2
exit 1
fi
VERBOSE="${1:-}"
PASS=0; FAIL=0; ERRORS=""
TMPFILE=$(mktemp); trap "rm -f $TMPFILE" EXIT
# Bring-up shared by every case: registry + store, a 3-follower mock,
# the flow registered as blog_digest, and a trigger binding `create`
# to it guarded on "the object is an Article". Cfg/AS as the fan-out
# expects. Activities differ by :category (urgent / newsletter / draft)
# plus a non-Article note.
BOOT='trigger_registry:start_link(), flow_store:start_link(), FF = fun(_) -> [f1, f2, f3] end, Flow = blog_publish_digest:build([{fetch_followers, FF}]), flow_store:register_flow(blog_digest, Flow), Guard = fun(A, _) -> case envelope:get_field(object, A) of {ok, O} -> envelope:get_field(type, O) =:= {ok, article}; _ -> false end end, trigger_registry:add(create, trigger_registry:mk_spec(<<116,99>>, blog_digest, Guard, any)), Cfg = [{trigger_registry, trigger_registry}], AS = [{actor_id, alice}],'
URGENT='[{type, create}, {actor, alice}, {id, <<117,49>>}, {object, [{type, article}, {category, urgent}]}]'
NEWS='[{type, create}, {actor, alice}, {id, <<110,49>>}, {object, [{type, article}, {category, newsletter}]}]'
DRAFT='[{type, create}, {actor, alice}, {id, <<100,49>>}, {object, [{type, article}, {category, draft}]}]'
NOTE='[{type, create}, {actor, alice}, {id, <<120,49>>}, {object, [{type, note}]}]'
cat > "$TMPFILE" <<EPOCHS
(epoch 1)
(load "lib/erlang/tokenizer.sx")
(load "lib/erlang/parser.sx")
(load "lib/erlang/parser-core.sx")
(load "lib/erlang/parser-expr.sx")
(load "lib/erlang/parser-module.sx")
(load "lib/erlang/transpile.sx")
(load "lib/erlang/runtime.sx")
(load "lib/erlang/vm/dispatcher.sx")
(epoch 2)
(eval "(er-load-gen-server!)")
(eval "(get (erlang-load-module (file-read \"next/kernel/envelope.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/flow/flow.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/flow/flow_spec.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/flow/flow_store.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/kernel/trigger_registry.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/kernel/flow_dispatch.erl\")) :name)")
(eval "(get (erlang-load-module (file-read \"next/kernel/pipeline.erl\")) :name)")
(epoch 3)
(eval "(get (erlang-load-module (file-read \"next/flow/flows/blog_publish_digest.erl\")) :name)")
;; ── urgent: fans out, completes in one cycle, 3 emails ─────
(epoch 10)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${URGENT}, AS, Cfg) =:= {ok, [{<<117,49>>, <<116,99>>, {ok, 1}}]}\") :name)")
(epoch 11)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${URGENT}, AS, Cfg), {ok, {done, {digest_sent, Emails, _}}} = flow_store:status(1), length(Emails) =:= 3\") :name)")
;; DigestSent emit object is well-formed (type, for the article, count)
(epoch 12)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${URGENT}, AS, Cfg), {ok, {done, {digest_sent, _, Digest}}} = flow_store:status(1), Digest =:= [{type, digest_sent}, {for, <<117,49>>}, {follower_count, 3}]\") :name)")
;; ── newsletter: suspends on the morning timer, then resumes ─
(epoch 20)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${NEWS}, AS, Cfg), flow_store:status(1) =:= {ok, {suspended, morning}}\") :name)")
;; advancing the clock (resume the timer) drives it to completion
(epoch 21)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${NEWS}, AS, Cfg), {ok, {flow_done, {digest_sent, Emails, _}}} = flow_store:resume(1, morning_ts), length(Emails) =:= 3\") :name)")
;; before resume no digest exists (still suspended, not done)
(epoch 22)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${NEWS}, AS, Cfg), case flow_store:status(1) of {ok, {done, _}} -> false; {ok, {suspended, morning}} -> true; _ -> false end\") :name)")
;; ── draft: the :else branch, no emails, no DigestSent ──────
(epoch 30)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${DRAFT}, AS, Cfg), flow_store:status(1) =:= {ok, {done, skipped}}\") :name)")
;; ── non-Article note: guard rejects, no flow dispatched ────
(epoch 40)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${NOTE}, AS, Cfg) =:= {ok, []}\") :name)")
;; ── dedup: the same activity arriving twice fires once ─────
(epoch 50)
(eval "(get (erlang-eval-ast \"${BOOT} pipeline:apply_triggers(${URGENT}, [{actor_id, alice}, {triggers_fired, [{<<117,49>>, <<116,99>>}]}], Cfg) =:= {ok, []}\") :name)")
EPOCHS
OUTPUT=$(timeout 360 "$SX_SERVER" < "$TMPFILE" 2>/dev/null)
check() {
local epoch="$1" desc="$2" expected="$3"
local actual
actual=$(echo "$OUTPUT" | awk -v e="$epoch" '
$0 ~ "^\\(ok-len " e " " { getline; print; exit }
$0 ~ "^\\(ok " e " " { print; exit }
$0 ~ "^\\(error " e " " { print; exit }
')
[ -z "$actual" ] && actual="<no output for epoch $epoch>"
if echo "$actual" | grep -qF -- "$expected"; then
PASS=$((PASS+1))
[ "$VERBOSE" = "-v" ] && echo " ok $desc"
else
FAIL=$((FAIL+1))
ERRORS+=" FAIL [$desc] (epoch $epoch) expected: $expected | actual: $actual
"
fi
}
check 3 "blog_publish_digest loaded" "blog_publish_digest"
check 10 "urgent fans out (audit triple)" "true"
check 11 "urgent: 3 emails dispatched" "true"
check 12 "urgent: DigestSent object emitted" "true"
check 20 "newsletter suspends on timer" "true"
check 21 "newsletter resumes -> 3 emails" "true"
check 22 "no digest before resume" "true"
check 30 "draft -> else branch, skipped" "true"
check 40 "non-Article note -> guard rejects" "true"
check 50 "duplicate activity fires once" "true"
TOTAL=$((PASS+FAIL))
if [ $FAIL -eq 0 ]; then
echo "ok $PASS/$TOTAL next/tests/triggers_e2e.sh passed"
else
echo "FAIL $PASS/$TOTAL passed, $FAIL failed:"
echo "$ERRORS"
fi
[ $FAIL -eq 0 ]

View File

@@ -0,0 +1,211 @@
; -*- mode: markdown -*-
# loops/host — fed-sx adapter slice (host side of host-type federation)
Scoped briefing for the follow-up that wires `loops/host`'s SX/dream
front door to the fed-sx kernel substrate landed by
`loops/fed-sx-types`. Companion to `plans/fed-sx-host-types.md` (the
substrate design + public surface). This is the build sheet for the
host-side adapters the substrate loop deliberately deferred.
```
description: loops/host — fed-sx adapter (publish/serve/ingest typed posts)
subagent_type: general-purpose
run_in_background: true
isolation: worktree # worktree at /root/rose-ash-loops/host
```
## Why this is small now
The substrate is done and tested (`origin/loops/fed-sx-types`, 4
phases). And the host already has *most of a type system*:
`lib/host/blog.sx` models a **type as a post** with a content-address
`:cid`, a `:schema` (`{:required [...]}`), `:fields`, `:template`, and a
**`subtype-of` graph over lib/relations**. So this loop is not building
a type model — it is **projecting the host's existing one onto fed-sx**
and ingesting peers' types back. The pieces line up almost 1:1:
| host (lib/host/blog.sx) | fed-sx (next/kernel) |
|--------------------------------------------|------------------------------------------|
| a type-post + `:schema {:required [...]}` | `DefineType` activity + refinement `{required, [...]}` |
| `subtype-of` edge (lib/relations) | `SubtypeOf` activity |
| `host/blog-by-cid` / `host/blog-type-defs` | `peer_types` cache + `GET /types/<cid>` |
| `host/blog-type-issues` (local validate) | `pipeline:apply_object_schema/2` (inbound) |
The host's `{:required [...]}` schema maps **directly** onto the
term_codec-safe `{required, [Field,...]}` refinement form the substrate
already validates — so schema translation is nearly trivial. Derive
both validators from the *same* schema data to avoid drift.
## Scope
**In scope**`lib/host/**` only (the mirror of fed-sx-types'
`next/**` only). New: `lib/host/fed_sx_outbox.sx`,
`lib/host/fed_sx_inbox.sx` (and a small shared `lib/host/fed_sx.sx`
bridge + a `host/types-routes`), plus `serve.sh` bring-up wiring and
`lib/host/tests/`.
**Out of scope**`next/**`. The substrate is frozen public surface;
do **not** edit `next/kernel/**` or the genesis verbs from here. If a
gap is found, file it against fed-sx-types, don't patch across the
boundary. **Hard line: do not edit `next/`.**
## Branch base
Start from `loops/host`, then bring in the substrate (clean, additive —
disjoint paths, no conflicts):
```bash
cd /root/rose-ash-loops/host
git fetch origin
git merge origin/loops/fed-sx-types # adds next/kernel + genesis + plan doc
```
After the merge the worktree has both `lib/host/**` (host) and
`next/kernel/**` (fed-sx substrate) on one branch.
## Phase 0 — settle the runtime boundary (DECIDE FIRST, blocks all else)
`lib/host` is **pure same-runtime SX** (one `sx_server.exe`: stdlib →
R7RS → APL → Datalog → ACL → Relations → Feed → Persist → Dream →
Host, per `serve.sh`). The fed-sx kernel is **Erlang-on-SX** on the
er-scheduler (`erlang-load-module` + gen_servers: `peer_types`,
`nx_kernel`). The host's dream handlers run on the native `http-listen`
accept loop — **outside** the er-scheduler. Calling a kernel
`gen_server:call` synchronously from a native-thread handler hits the
known scheduler-context deadlock (see
`plans/fed-sx-design.md` §, and the fed-prims http-listen note: a
handler on `Thread.create` outside er-sched can't complete a
`gen_server:call → receive`).
Two architectures; **the loop's first deliverable is choosing one with
a tiny spike**:
- **Option A — in-process Erlang bridge.** Host's sx_server also
`erlang-load-module`s the kernel and calls it directly. Pro: one
process, no serialization. Con: the deadlock above — kernel calls
must be marshalled onto the er-scheduler or restricted to pure
(non-gen_server) functions. Fragile; not recommended.
- **Option B — HTTP boundary (RECOMMENDED).** Run the fed-sx kernel
with its own `http_server`/`http-listen` loop (it already has the
whole route surface, and the m2 two-instance smoke test proves
HTTP federation between fed-sx nodes). The host talks to its local
fed-sx node **over localhost HTTP using the wire it already speaks**
(term_codec / activity+json / type-doc). This is literally what
federation is — the host is just another peer to its own node. An
`httpc`/localhost call from a native-thread host handler does **not**
touch the er-scheduler, so the deadlock never arises; the kernel's
own listen handler runs the gen_server calls within er-sched context.
Works whether the kernel is a sidecar process or spawned on an
er-scheduler process inside the host's sx_server (two ports, one
process). Pro: clean, reuses the fully-tested surface, no deadlock.
Con: serialization + lifecycle coordination.
**Recommendation: Option B.** Spike: host handler → `httpc` POST to the
local kernel's `/activity` → 200/cid back, with no hang. Lock the
decision before Phase 1.
## Phase 1 — outbox: project host types → DefineType / SubtypeOf
`lib/host/fed_sx_outbox.sx`. When a host type-post is created/updated
(`host/blog-put!` path), project it and publish to the local fed-sx
node:
- type-post → `DefineType` activity: `:object` = `{name: slug,
fields: (host/blog-fields-of slug), refinement-schema:
(host/blog-schema-of slug), instance-type: <base>}`. The host
`{:required [...]}` becomes the substrate `{required, [...]}` form
verbatim.
- each `subtype-of` edge (`relations/parents` over `"subtype-of"`) →
a `SubtypeOf` activity `{child-type-cid, parent-type-cid}`.
- publish via Phase 0's transport (POST `/activity` to the local node,
authed with the node's publish token).
**Key open decision — the type CID.** The host computes `:cid` via
`host/blog--cid-of` (double-hash over the canonical record); fed-sx
keys `peer_types` by a `TypeCid`. Either:
(a) **adopt the host `:cid` as the fed-sx TypeCid** — one identity,
no reconciliation, but peers can't content-verify it from the
wire bytes; or
(b) **let the kernel content-address the TypeRecord** — verifiable,
but the host must keep a `slug → fed-sx-cid` map (and
`SubtypeOf` edges must reference fed-sx CIDs, not host CIDs).
Pick one and document it in `plans/fed-sx-host-types.md`. (a) is
simpler and probably right for v1; revisit when cross-node verification
matters.
## Phase 2 — inbox: ingest peers' types + validate typed objects
`lib/host/fed_sx_inbox.sx`. Inbound from the local node's inbox:
- inbound `DefineType` → `peer_types:put` (cache it). Decide whether to
also **materialize** it as a host post (`host/blog-put!` +
`host/blog--set-schema!`) or keep federation-only types out of the
local blog (recommended for v1: cache-only, materialize on demand).
- inbound `SubtypeOf` → record the edge (peer_types hierarchy and/or
`host/blog-relate! child parent "subtype-of"` if materialized).
- inbound typed `Create` (a post that `is-a` some refinement type) →
the kernel inbound pipeline runs `pipeline:apply_object_schema/2`
(configured with a `type_index` + `{peer_types, peer_types}` +
`type_fetch_fn`), so a typed object is validated against its declared
type **before** the host sees it. Choose `strict_object_schema`
per-node (default false = open-world).
**Avoid double-validation drift:** the host already has
`host/blog-type-issues`. Let the **kernel validate federation inbound**
and the **host validate local writes**, both deriving from the same
`{:required [...]}` schema data — don't fork the rules.
## Phase 3 — serve + bring-up wiring
- **Serve `GET /types/<cid>`** on the host front door. Either proxy to
the kernel's `/types/<cid>` (Option B keeps one source of truth), or
serve directly from `host/blog-by-cid` + the projected TypeRecord.
Hook as `(dream-get "/types/:cid" host/types-by-cid)` and add
`host/types-routes` to the `host/serve` list (per `router.sx` /
`serve.sh` pattern).
- **Bring-up** in `serve.sh`: start the fed-sx node (Phase 0 transport),
start `peer_types`, configure `type_fetch_fn =
discovery_type_fetch:make_fetch_fn()` + a `type_url` resolver, and on
startup project existing host types (Phase 1) so the node is
type-aware from boot. Gate writes behind the existing
`host/require-auth` / `host/require-permission` middleware, same as
the relations write routes.
## Phase 4 — end-to-end round-trip test
Two nodes (host A + host B, or host + a sidecar fed-sx node): A defines
a refinement type → B fetches the type-doc via `GET /types/<cid>` → B
ingests an inbound typed object and `apply_object_schema` accepts the
valid one / rejects a refinement-failing one. Mirror the m2
two-instance smoke test style. Plus per-phase suites in
`lib/host/tests/` (the host runs its own `conformance.sh`).
## Tests discipline
- The host's `lib/host/conformance.sh` green before AND after every
commit. `lib/host` is **LIVE at blog.rose-ash.com** — pushing
`loops/host` reloads dev, so treat pushes as deliberate.
- Commits scoped to `lib/host/**` (+ `plans/fed-sx-host-types.md` as
decisions ratify). Do **not** edit `next/**`.
- One commit per phase; smaller intermediate commits fine if each
leaves the gate green. The Phase-0 spike can be its own commit.
## Done when
- A host type round-trips: defined locally → published as
`DefineType`/`SubtypeOf` → fetchable at `GET /types/<cid>` → a peer
validates an inbound typed object against it.
- `peer_types` is populated from inbound `DefineType`, and the inbound
pipeline rejects refinement-failing typed objects (strict node).
- The runtime-boundary decision and the type-CID decision are recorded
in `plans/fed-sx-host-types.md`.
- Host `conformance.sh` + the new fed-sx adapter suites green.
## Parallel-safety with loops/fed-sx-types
That loop owns `next/**` and is feature-complete; this loop owns
`lib/host/**`. Disjoint surfaces — they meet only at the merge that
brings the substrate in (Branch base, above). If this loop needs a
substrate change, file it against fed-sx-types rather than editing
`next/` here.

View File

@@ -1296,6 +1296,32 @@ inbox + pull from outbox. SSE is convenience, not protocol.
unknown verbs are stored-but-not-projected — safe by default, with explicit
operator control over what extensions load.
### 13.10 Activity-driven flow triggers (kernel convention)
Beyond projections (which fold an activity into read-model state), the kernel
supports firing **durable business flows** off arriving activities — the
"something happened → here is what we DO about it" half of the model. The
convention (substrate landed in `loops/fed-sx-types`, Phases 58):
- A `DefineTrigger{activity-type, flow-name, guard?, actor-scope?}` activity binds
an activity-type to a named flow. `trigger_registry` hydrates from a fold over
these (restart-safe, same content-addressing as `define-registry`).
- Fan-out runs **after** the kernel append, as the last pipeline step (§14):
`envelope → signature → activity-type schema → object schema → append → trigger
fan-out`. Only accepted activities fire flows; rejected ones never trigger.
- Fan-out is deduped per `{activity-cid, trigger-cid}` (federation can deliver the
same activity twice via different peers) using the actor's `:triggers_fired`
field, and is failure-isolated: one flow's failure never blocks the append or
the other flows.
- Flows run on **flow-on-erlang** (`next/flow/`), a native Erlang-on-SX durable
workflow engine (deterministic-replay suspend/resume; combinator algebra
mirrored from the Scheme `lib/flow`). It runs in the kernel's own runtime, so
the fan-out is a direct call — no cross-guest bridge. Because a flow runs inside
the engine's drive (where a blocking kernel call would deadlock the cooperative
scheduler), flows are **pure and describe effects as data** (their output, or a
`suspend`); a driver outside the flow performs IO and appends any follow-up
activity — which can in turn trigger further flows.
## 14. Validation pipeline
Every activity entering the substrate (whether published locally or received from a

112
plans/fed-sx-host-types.md Normal file
View File

@@ -0,0 +1,112 @@
; -*- mode: markdown -*-
# fed-sx host-type federation — substrate design + build log
How a host's typed-post graph (refinement types declared in
`lib/host`'s metamodel) flows across fed-sx nodes: a type is published
as a content-addressed `DefineType` activity, peers cache its record,
serve it over the wire, and validate inbound objects against the
declared refinement schema before appending them.
This document is both the design and the running build log for
`loops/fed-sx-types`. The companion build sheet is
`plans/agent-briefings/fed-sx-types-loop.md`.
## Vocabulary
- **Type record** — `{name, fields, refinement-schema, instance-type}`.
The parsed `:object` payload of a `DefineType` activity. Immutable
per CID: an updated type is a new CID (no in-place evolution).
- **Type CID** — content-address of the type record's wire form. The
stable handle a `SubtypeOf` edge or an object's `{type, _}` field
references.
- **Refinement schema** — a predicate over an object's field-values;
the extra constraint a refinement type adds on top of its base
`instance-type` (e.g. a `Post` is a `Note` whose `:title` is a
non-empty string).
## Scope
Substrate side only — everything under `next/**`. The host-side
adapters (`lib/host/fed_sx_outbox.sx`, `lib/host/fed_sx_inbox.sx`)
are a deliberate follow-up that consumes this branch's public surface
(`DefineType` / `SubtypeOf` verbs, `peer_types`, the `/types/<cid>`
route) once `loops/host`'s metamodel settles. **This loop does not
touch `lib/host/`.**
## Steps
### Step 1 — `DefineType` + `SubtypeOf` genesis activity-types — DONE
New `DefineActivity`-form genesis files, parsed as data by
`bootstrap.erl` at startup (no kernel change yet):
- `next/genesis/activity-types/define_type.sx` — declares the
`DefineType` verb. `:schema` accepts an activity whose `:object`
carries a string `:name` and an optional list `:fields`.
- `next/genesis/activity-types/subtype_of.sx` — declares the
`SubtypeOf` verb. `:schema` accepts an `:object` carrying both
`:child-type-cid` and `:parent-type-cid` as strings.
Schema bodies are SX source written with nested `get` (not
keyword-threading) so they are directly evaluatable: keywords are not
callable getters in the kernel and `(-> d :k)` does not get. Both are
registered in `next/genesis/manifest.sx` (activity-types now 7) and the
bundle counts in the bootstrap suites were bumped accordingly.
Tests: `next/tests/define_type.sh`, `next/tests/subtype_of.sh` — parse
shape, schema accept/reject, and a `term_codec` envelope round-trip.
### Step 2 — `peer_types.erl` receiver-side cache — DONE
`next/kernel/peer_types.erl`, a mirror of `peer_actors.erl` keyed by
type CID. State `[{TypeCidBytes, TypeRecord}, ...]`. Pure API
(`new/2`-threaded `lookup`/`store`/`evict`/`types`/`lookup_or_fetch`)
plus a registered gen_server (`put`, `lookup`, `state_for`,
`known_types`, `lookup_or_fetch`). On a miss `lookup_or_fetch` pulls a
Cfg-supplied `type_fetch_fn :: fun ((TypeCid, Cfg) -> {ok, Bytes} |
{error, _})`, decodes the wire bytes via `term_codec`, and caches the
record. No fn → `{error, no_fetch_fn}`; fetch error or bad bytes do not
poison the cache. Test: `next/tests/peer_types.sh`.
### Step 3 — `/types/<cid>` route + `discovery_type_fetch.erl` — DONE
`http_server.erl` serves `GET /types/<cid>` with
`Accept: application/vnd.fed-sx.type-doc`: the cached TypeRecord
`term_codec`-encoded, 404 if not cached. `discovery_type_fetch.erl`
holds the live-HTTP closure that `peer_types:lookup_or_fetch` calls.
Tests: `next/tests/peer_types_route.sh`, `next/tests/discovery_type_fetch.sh`.
### Step 4 — object-schema validation stage in `pipeline.erl` — DONE
`pipeline:apply_object_schema/2` (+ `stage_object_schema/1` factory)
sits between activity-type validation and the kernel append. When an
inbound object carries `{type, TypeName}`, resolve the TypeRecord
(Cfg `type_index`: TypeName → TypeCid; then
`peer_types:lookup_or_fetch/2`) and apply its refinement schema to the
object's `:field_values`. The schema is either a 1-arity Erlang
predicate (the substrate stand-in, for locally-defined types) or a
term_codec-safe `{required, [Field, ...]}` data constraint (so a
wire-fetched record validates too). Default `strict_object_schema =
false`: an unresolvable type is let through (the non-strict skip is
where a `validation_skipped` log belongs); opt-in strict rejects.
Objects with no declared type, and type names absent from the local
index, are skipped (open-world). Test: `next/tests/object_schema.sh`.
## Out of scope (deliberately)
- Host-side outbox/inbox adapters (`lib/host/**`).
- Type evolution / version migration — schemas are immutable per CID;
the "name → currently-valid CID" routing layer is a separate problem.
- Subtype-of unification / rendering across nodes — the graph data
lands via `SubtypeOf` activities; dedup/display is a consumer concern.
## What the host-side adapter loop gets
Once all four steps land, the follow-up `loops/host` adapter work can
treat the following as stable public surface:
- `DefineType` / `SubtypeOf` activity verbs (publish a type, link two).
- `peer_types` gen_server (cache a peer's type, look it up).
- `GET /types/<cid>` (serve a type the node knows).
- `pipeline`'s object-schema stage (inbound objects validated against
their declared refinement type when resolvable).