diff --git a/lib/events/conformance.conf b/lib/events/conformance.conf index 52719eaf..05158f07 100644 --- a/lib/events/conformance.conf +++ b/lib/events/conformance.conf @@ -27,6 +27,18 @@ PRELOADS=( lib/persist/api.sx lib/events/booking.sx lib/events/ticket.sx + lib/guest/lex.sx + lib/guest/reflective/env.sx + lib/guest/reflective/quoting.sx + lib/scheme/parser.sx + lib/scheme/eval.sx + lib/scheme/runtime.sx + lib/flow/spec.sx + lib/flow/store.sx + lib/flow/remote.sx + lib/flow/host.sx + lib/flow/api.sx + lib/events/notify.sx lib/events/api.sx ) @@ -36,4 +48,5 @@ SUITES=( "api:lib/events/tests/api.sx:(ev-api-tests-run!)" "booking:lib/events/tests/booking.sx:(ev-booking-tests-run!)" "ticket:lib/events/tests/ticket.sx:(ev-ticket-tests-run!)" + "notify:lib/events/tests/notify.sx:(ev-notify-tests-run!)" ) diff --git a/lib/events/notify.sx b/lib/events/notify.sx new file mode 100644 index 00000000..914aa184 --- /dev/null +++ b/lib/events/notify.sx @@ -0,0 +1,38 @@ +;; lib/events/notify.sx — durable notification delivery flows over an injected +;; transport (lib/flow). +;; +;; Reminders and digests are durable `flow`s: a flow `request`s delivery (a +;; suspend point), the HOST performs the actual send via an injected `dispatch` +;; (the transport — email/push/etc.), and resumes the flow with the outcome. +;; Because flow uses deterministic replay, a completed delivery is never re-run +;; on recovery; the host owns IO and persistence. +;; +;; Delivery is AT-LEAST-ONCE with idempotency. Each message carries an id (the +;; idempotency key). Two protections stop double-delivery: +;; 1. The transport dedups by id — a re-send of a delivered id is a no-op +;; that still reports ok, so a retry never produces two pings. +;; 2. flow's replay log records each resolved request, so recovery replays the +;; logged outcome instead of re-issuing the send. +;; +;; Retry/backoff rides flow suspend/resume: each attempt issues a request with a +;; DISTINCT tag `(deliver )` — distinct tags keep deterministic replay +;; correct across retries. The dispatch returns (ok info) to finish or +;; (retry reason) to try again, bounded by `maxn` (then (failed id reason)). +;; +;; A message is a 3-element list (id recipient body). The transport is generic +;; and injected — when feed/notify lands, both consumers share one transport, +;; so this delivery core is a candidate for extraction to `delivery-on-sx`. +;; +;; The Scheme flow source below loads into a flow env (see lib/flow/api.sx). +;; `ev/notify-run` prepends it to a caller program and evaluates in the shared +;; flow env. + +(define + ev-notify-flows-src + "(define (ev-msg-id m) (car m))\n (define (ev-msg-recipient m) (car (cdr m)))\n (define (ev-msg-body m) (car (cdr (cdr m))))\n (define (ev-mem x xs)\n (if (null? xs) #f (if (equal? x (car xs)) #t (ev-mem x (cdr xs)))))\n (define (ev-notify-attempt m n maxn)\n (let ((r (request (list (quote deliver) (ev-msg-id m) n) m)))\n (if (eq? (car r) (quote ok))\n (list (quote delivered) (ev-msg-id m) n)\n (if (>= n maxn)\n (list (quote failed) (ev-msg-id m) (car (cdr r)))\n (ev-notify-attempt m (+ n 1) maxn)))))\n (define (ev-deliver-reminder maxn)\n (flow-node (lambda (m) (ev-notify-attempt m 1 maxn))))\n (define (ev-digest-step ms maxn)\n (if (null? ms)\n (list)\n (cons (ev-notify-attempt (car ms) 1 maxn)\n (ev-digest-step (cdr ms) maxn))))\n (define (ev-deliver-digest maxn)\n (flow-node (lambda (ms) (ev-digest-step ms maxn))))") + +;; Run a Scheme flow program with the notify flows preloaded, in the shared +;; flow env. Returns the program's value (SX-native). +(define + ev/notify-run + (fn (prog) (flow-run (str ev-notify-flows-src "\n" prog)))) diff --git a/lib/events/scoreboard.json b/lib/events/scoreboard.json index 5568a285..beea384f 100644 --- a/lib/events/scoreboard.json +++ b/lib/events/scoreboard.json @@ -1,14 +1,15 @@ { "lang": "events", - "total_passed": 175, + "total_passed": 182, "total_failed": 0, - "total": 175, + "total": 182, "suites": [ {"name":"calendar","passed":37,"failed":0,"total":37}, {"name":"availability","passed":22,"failed":0,"total":22}, {"name":"api","passed":24,"failed":0,"total":24}, {"name":"booking","passed":61,"failed":0,"total":61}, - {"name":"ticket","passed":31,"failed":0,"total":31} + {"name":"ticket","passed":31,"failed":0,"total":31}, + {"name":"notify","passed":7,"failed":0,"total":7} ], - "generated": "2026-06-07T03:33:46+00:00" + "generated": "2026-06-07T04:02:26+00:00" } diff --git a/lib/events/scoreboard.md b/lib/events/scoreboard.md index e9847412..59dd18db 100644 --- a/lib/events/scoreboard.md +++ b/lib/events/scoreboard.md @@ -1,6 +1,6 @@ # events scoreboard -**175 / 175 passing** (0 failure(s)). +**182 / 182 passing** (0 failure(s)). | Suite | Passed | Total | Status | |-------|--------|-------|--------| @@ -9,3 +9,4 @@ | api | 24 | 24 | ok | | booking | 61 | 61 | ok | | ticket | 31 | 31 | ok | +| notify | 7 | 7 | ok | diff --git a/lib/events/tests/notify.sx b/lib/events/tests/notify.sx new file mode 100644 index 00000000..c81745a3 --- /dev/null +++ b/lib/events/tests/notify.sx @@ -0,0 +1,77 @@ +;; lib/events/tests/notify.sx — durable notification delivery flows. + +(define ev-nt-pass 0) +(define ev-nt-fail 0) +(define ev-nt-failures (list)) + +(define + ev-nt-check! + (fn + (name got expected) + (if + (= got expected) + (set! ev-nt-pass (+ ev-nt-pass 1)) + (do + (set! ev-nt-fail (+ ev-nt-fail 1)) + (append! + ev-nt-failures + (str name "\n expected: " expected "\n got: " got)))))) + +;; Each case runs a Scheme flow program (notify flows preloaded) and asserts on +;; the SX-native result. Scheme symbols come back as strings. +(define + ev-nt-run-all! + (fn + () + (do + (ev-nt-check! + "reminder delivers on the first attempt" + (ev/notify-run + "(define s (flow/start (ev-deliver-reminder 3) (list (quote m1) (quote alice) (quote hello))))\n (flow-run-host (lambda (k p) (list (quote ok) (quote sent))) 5)\n (list (flow/status (car (cdr s))) (flow/result (car (cdr s))))") + (list "done" (list "delivered" "m1" 1))) + (ev-nt-check! + "reminder retries a transient failure then delivers" + (ev/notify-run + "(define hits 0)\n (define s (flow/start (ev-deliver-reminder 3) (list (quote m1) (quote bob) (quote hi))))\n (flow-run-host (lambda (k p) (begin (set! hits (+ hits 1)) (if (< hits 2) (list (quote retry) (quote down)) (list (quote ok) (quote sent))))) 10)\n (list (flow/result (car (cdr s))) hits)") + (list (list "delivered" "m1" 2) 2)) + (ev-nt-check! + "reminder gives up after maxn attempts" + (ev/notify-run + "(define s (flow/start (ev-deliver-reminder 2) (list (quote m1) (quote x) (quote y))))\n (flow-run-host (lambda (k p) (list (quote retry) (quote down))) 10)\n (flow/result (car (cdr s)))") + (list "failed" "m1" "down")) + (ev-nt-check! + "redelivery of the same id sends only once (at-least-once, idempotent)" + (ev/notify-run + "(define sent (list)) (define deliveries 0)\n (define (xport k p)\n (let ((id (ev-msg-id p)))\n (if (ev-mem id sent)\n (list (quote ok) (quote duplicate))\n (begin (set! sent (cons id sent)) (set! deliveries (+ deliveries 1)) (list (quote ok) (quote sent))))))\n (define s1 (flow/start (ev-deliver-reminder 3) (list (quote m1) (quote a) (quote hi))))\n (flow-run-host xport 5)\n (define s2 (flow/start (ev-deliver-reminder 3) (list (quote m1) (quote a) (quote hi))))\n (flow-run-host xport 5)\n (list deliveries (flow/result (car (cdr s2))))") + (list 1 (list "delivered" "m1" 1))) + (ev-nt-check! + "digest delivers every message in the batch" + (ev/notify-run + "(define s (flow/start (ev-deliver-digest 3) (list (list (quote a) (quote u1) (quote hi)) (list (quote b) (quote u2) (quote yo)))))\n (flow-run-host (lambda (k p) (list (quote ok) (quote sent))) 10)\n (flow/result (car (cdr s)))") + (list + (list "delivered" "a" 1) + (list "delivered" "b" 1))) + (ev-nt-check! + "digest reports per-message outcomes independently" + (ev/notify-run + "(define (xport k p)\n (let ((id (ev-msg-id p)))\n (if (equal? id (quote b)) (list (quote retry) (quote flaky)) (list (quote ok) (quote sent)))))\n (define s (flow/start (ev-deliver-digest 2) (list (list (quote a) (quote u1) (quote hi)) (list (quote b) (quote u2) (quote yo)) (list (quote c) (quote u3) (quote ya)))))\n (flow-run-host xport 12)\n (flow/result (car (cdr s)))") + (list + (list "delivered" "a" 1) + (list "failed" "b" "flaky") + (list "delivered" "c" 1))) + (ev-nt-check! + "delivery suspends until the transport responds" + (ev/notify-run + "(define s (flow/start (ev-deliver-reminder 3) (list (quote m1) (quote a) (quote hi))))\n (flow/status (car (cdr s)))") + "suspended")))) + +(define + ev-notify-tests-run! + (fn + () + (do + (set! ev-nt-pass 0) + (set! ev-nt-fail 0) + (set! ev-nt-failures (list)) + (ev-nt-run-all!) + {:failures ev-nt-failures :total (+ ev-nt-pass ev-nt-fail) :passed ev-nt-pass :failed ev-nt-fail}))) diff --git a/plans/events-on-sx.md b/plans/events-on-sx.md index 72db93e1..63fed601 100644 --- a/plans/events-on-sx.md +++ b/plans/events-on-sx.md @@ -18,7 +18,7 @@ capacity rules, transactional booking, and a flow-driven notification dispatcher ## Status (rolling) -`bash lib/events/conformance.sh` → **175/175** (Phase 1 + Phase 2 complete: booking/holds/paid-ticket contract) +`bash lib/events/conformance.sh` → **182/182** (Phases 1-2 + Phase 3 notification delivery flows) ## Ground rules @@ -70,11 +70,13 @@ lib/events/api.sx ── (events/schedule) (events/book) (events/agenda) ── - [x] tests: capacity edge, double-book guard, conflict detection ## Phase 3 — Notification delivery (flow) -- [ ] `notify.sx` — reminder/digest flows over injected transport -- [ ] retry/backoff on transport failure (flow suspend/resume) -- [ ] tests: delivery success, retry path, idempotent re-send +- [x] `notify.sx` — reminder/digest flows over injected transport +- [x] retry/backoff on transport failure (flow suspend/resume) +- [x] tests: delivery success, retry path, idempotent re-send +- [ ] wire reminders to occurrences (schedule "starts in 1h" from agenda) - [ ] NOTE: shared with `feed/notify` — candidate for later extraction to a - `delivery-on-sx` once a second consumer is real + `delivery-on-sx` once a second consumer is real. **Delivery core + (request→dispatch→resume, idempotent, bounded retry) is the extraction seam.** ## Phase 4 — Federation - [ ] cross-instance events (peer calendar) — trust-gated stub @@ -82,6 +84,18 @@ lib/events/api.sx ── (events/schedule) (events/book) (events/agenda) ── ## Progress log +- 2026-06-07 — **Phase 3 start: notification delivery flows.** `notify.sx`: + reminders + digests as durable `flow`s over an INJECTED transport (the host + `dispatch`). A flow `request`s delivery (suspend), the host sends and resumes + with the outcome; flow's replay log means a completed send is never re-run on + recovery. At-least-once + idempotent: messages carry an id; the transport + dedups (re-send is a no-op that still reports ok) and replay logs each + outcome. Retry rides suspend/resume — each attempt uses a DISTINCT tag + `(deliver )` so replay stays correct; dispatch returns (ok) / + (retry reason), bounded by maxn → (failed id reason). Digest delivers a batch + with independent per-message outcomes. Authored as Scheme flow source run via + `ev/notify-run` (scheme + flow substrate preloaded). +7 tests, 182/182 green. + Delivery core is the `delivery-on-sx` extraction seam for feed/notify. - 2026-06-07 — **Phase 2 complete: paid-ticket contract.** `ticket.sx` defines the two wire messages between events and commerce — `checkout-request` (events→commerce) and `payment-result` (commerce→events, :paid/:failed/