events: notification delivery flows on lib/flow + 7 tests (Phase 3 start)
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 34s
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 34s
notify.sx: reminders + digests as durable flows over an injected transport. A flow requests delivery (suspend); the host dispatch sends and resumes with the outcome. At-least-once + idempotent (transport dedups by msg id; replay logs outcomes). Retry rides suspend/resume with distinct per-attempt tags, bounded by maxn. Digest delivers a batch with per-message outcomes. 182/182 green. Delivery core is the delivery-on-sx extraction seam. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -27,6 +27,18 @@ PRELOADS=(
|
||||
lib/persist/api.sx
|
||||
lib/events/booking.sx
|
||||
lib/events/ticket.sx
|
||||
lib/guest/lex.sx
|
||||
lib/guest/reflective/env.sx
|
||||
lib/guest/reflective/quoting.sx
|
||||
lib/scheme/parser.sx
|
||||
lib/scheme/eval.sx
|
||||
lib/scheme/runtime.sx
|
||||
lib/flow/spec.sx
|
||||
lib/flow/store.sx
|
||||
lib/flow/remote.sx
|
||||
lib/flow/host.sx
|
||||
lib/flow/api.sx
|
||||
lib/events/notify.sx
|
||||
lib/events/api.sx
|
||||
)
|
||||
|
||||
@@ -36,4 +48,5 @@ SUITES=(
|
||||
"api:lib/events/tests/api.sx:(ev-api-tests-run!)"
|
||||
"booking:lib/events/tests/booking.sx:(ev-booking-tests-run!)"
|
||||
"ticket:lib/events/tests/ticket.sx:(ev-ticket-tests-run!)"
|
||||
"notify:lib/events/tests/notify.sx:(ev-notify-tests-run!)"
|
||||
)
|
||||
|
||||
38
lib/events/notify.sx
Normal file
38
lib/events/notify.sx
Normal file
@@ -0,0 +1,38 @@
|
||||
;; lib/events/notify.sx — durable notification delivery flows over an injected
|
||||
;; transport (lib/flow).
|
||||
;;
|
||||
;; Reminders and digests are durable `flow`s: a flow `request`s delivery (a
|
||||
;; suspend point), the HOST performs the actual send via an injected `dispatch`
|
||||
;; (the transport — email/push/etc.), and resumes the flow with the outcome.
|
||||
;; Because flow uses deterministic replay, a completed delivery is never re-run
|
||||
;; on recovery; the host owns IO and persistence.
|
||||
;;
|
||||
;; Delivery is AT-LEAST-ONCE with idempotency. Each message carries an id (the
|
||||
;; idempotency key). Two protections stop double-delivery:
|
||||
;; 1. The transport dedups by id — a re-send of a delivered id is a no-op
|
||||
;; that still reports ok, so a retry never produces two pings.
|
||||
;; 2. flow's replay log records each resolved request, so recovery replays the
|
||||
;; logged outcome instead of re-issuing the send.
|
||||
;;
|
||||
;; Retry/backoff rides flow suspend/resume: each attempt issues a request with a
|
||||
;; DISTINCT tag `(deliver <id> <n>)` — distinct tags keep deterministic replay
|
||||
;; correct across retries. The dispatch returns (ok info) to finish or
|
||||
;; (retry reason) to try again, bounded by `maxn` (then (failed id reason)).
|
||||
;;
|
||||
;; A message is a 3-element list (id recipient body). The transport is generic
|
||||
;; and injected — when feed/notify lands, both consumers share one transport,
|
||||
;; so this delivery core is a candidate for extraction to `delivery-on-sx`.
|
||||
;;
|
||||
;; The Scheme flow source below loads into a flow env (see lib/flow/api.sx).
|
||||
;; `ev/notify-run` prepends it to a caller program and evaluates in the shared
|
||||
;; flow env.
|
||||
|
||||
(define
|
||||
ev-notify-flows-src
|
||||
"(define (ev-msg-id m) (car m))\n (define (ev-msg-recipient m) (car (cdr m)))\n (define (ev-msg-body m) (car (cdr (cdr m))))\n (define (ev-mem x xs)\n (if (null? xs) #f (if (equal? x (car xs)) #t (ev-mem x (cdr xs)))))\n (define (ev-notify-attempt m n maxn)\n (let ((r (request (list (quote deliver) (ev-msg-id m) n) m)))\n (if (eq? (car r) (quote ok))\n (list (quote delivered) (ev-msg-id m) n)\n (if (>= n maxn)\n (list (quote failed) (ev-msg-id m) (car (cdr r)))\n (ev-notify-attempt m (+ n 1) maxn)))))\n (define (ev-deliver-reminder maxn)\n (flow-node (lambda (m) (ev-notify-attempt m 1 maxn))))\n (define (ev-digest-step ms maxn)\n (if (null? ms)\n (list)\n (cons (ev-notify-attempt (car ms) 1 maxn)\n (ev-digest-step (cdr ms) maxn))))\n (define (ev-deliver-digest maxn)\n (flow-node (lambda (ms) (ev-digest-step ms maxn))))")
|
||||
|
||||
;; Run a Scheme flow program with the notify flows preloaded, in the shared
|
||||
;; flow env. Returns the program's value (SX-native).
|
||||
(define
|
||||
ev/notify-run
|
||||
(fn (prog) (flow-run (str ev-notify-flows-src "\n" prog))))
|
||||
@@ -1,14 +1,15 @@
|
||||
{
|
||||
"lang": "events",
|
||||
"total_passed": 175,
|
||||
"total_passed": 182,
|
||||
"total_failed": 0,
|
||||
"total": 175,
|
||||
"total": 182,
|
||||
"suites": [
|
||||
{"name":"calendar","passed":37,"failed":0,"total":37},
|
||||
{"name":"availability","passed":22,"failed":0,"total":22},
|
||||
{"name":"api","passed":24,"failed":0,"total":24},
|
||||
{"name":"booking","passed":61,"failed":0,"total":61},
|
||||
{"name":"ticket","passed":31,"failed":0,"total":31}
|
||||
{"name":"ticket","passed":31,"failed":0,"total":31},
|
||||
{"name":"notify","passed":7,"failed":0,"total":7}
|
||||
],
|
||||
"generated": "2026-06-07T03:33:46+00:00"
|
||||
"generated": "2026-06-07T04:02:26+00:00"
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# events scoreboard
|
||||
|
||||
**175 / 175 passing** (0 failure(s)).
|
||||
**182 / 182 passing** (0 failure(s)).
|
||||
|
||||
| Suite | Passed | Total | Status |
|
||||
|-------|--------|-------|--------|
|
||||
@@ -9,3 +9,4 @@
|
||||
| api | 24 | 24 | ok |
|
||||
| booking | 61 | 61 | ok |
|
||||
| ticket | 31 | 31 | ok |
|
||||
| notify | 7 | 7 | ok |
|
||||
|
||||
77
lib/events/tests/notify.sx
Normal file
77
lib/events/tests/notify.sx
Normal file
@@ -0,0 +1,77 @@
|
||||
;; lib/events/tests/notify.sx — durable notification delivery flows.
|
||||
|
||||
(define ev-nt-pass 0)
|
||||
(define ev-nt-fail 0)
|
||||
(define ev-nt-failures (list))
|
||||
|
||||
(define
|
||||
ev-nt-check!
|
||||
(fn
|
||||
(name got expected)
|
||||
(if
|
||||
(= got expected)
|
||||
(set! ev-nt-pass (+ ev-nt-pass 1))
|
||||
(do
|
||||
(set! ev-nt-fail (+ ev-nt-fail 1))
|
||||
(append!
|
||||
ev-nt-failures
|
||||
(str name "\n expected: " expected "\n got: " got))))))
|
||||
|
||||
;; Each case runs a Scheme flow program (notify flows preloaded) and asserts on
|
||||
;; the SX-native result. Scheme symbols come back as strings.
|
||||
(define
|
||||
ev-nt-run-all!
|
||||
(fn
|
||||
()
|
||||
(do
|
||||
(ev-nt-check!
|
||||
"reminder delivers on the first attempt"
|
||||
(ev/notify-run
|
||||
"(define s (flow/start (ev-deliver-reminder 3) (list (quote m1) (quote alice) (quote hello))))\n (flow-run-host (lambda (k p) (list (quote ok) (quote sent))) 5)\n (list (flow/status (car (cdr s))) (flow/result (car (cdr s))))")
|
||||
(list "done" (list "delivered" "m1" 1)))
|
||||
(ev-nt-check!
|
||||
"reminder retries a transient failure then delivers"
|
||||
(ev/notify-run
|
||||
"(define hits 0)\n (define s (flow/start (ev-deliver-reminder 3) (list (quote m1) (quote bob) (quote hi))))\n (flow-run-host (lambda (k p) (begin (set! hits (+ hits 1)) (if (< hits 2) (list (quote retry) (quote down)) (list (quote ok) (quote sent))))) 10)\n (list (flow/result (car (cdr s))) hits)")
|
||||
(list (list "delivered" "m1" 2) 2))
|
||||
(ev-nt-check!
|
||||
"reminder gives up after maxn attempts"
|
||||
(ev/notify-run
|
||||
"(define s (flow/start (ev-deliver-reminder 2) (list (quote m1) (quote x) (quote y))))\n (flow-run-host (lambda (k p) (list (quote retry) (quote down))) 10)\n (flow/result (car (cdr s)))")
|
||||
(list "failed" "m1" "down"))
|
||||
(ev-nt-check!
|
||||
"redelivery of the same id sends only once (at-least-once, idempotent)"
|
||||
(ev/notify-run
|
||||
"(define sent (list)) (define deliveries 0)\n (define (xport k p)\n (let ((id (ev-msg-id p)))\n (if (ev-mem id sent)\n (list (quote ok) (quote duplicate))\n (begin (set! sent (cons id sent)) (set! deliveries (+ deliveries 1)) (list (quote ok) (quote sent))))))\n (define s1 (flow/start (ev-deliver-reminder 3) (list (quote m1) (quote a) (quote hi))))\n (flow-run-host xport 5)\n (define s2 (flow/start (ev-deliver-reminder 3) (list (quote m1) (quote a) (quote hi))))\n (flow-run-host xport 5)\n (list deliveries (flow/result (car (cdr s2))))")
|
||||
(list 1 (list "delivered" "m1" 1)))
|
||||
(ev-nt-check!
|
||||
"digest delivers every message in the batch"
|
||||
(ev/notify-run
|
||||
"(define s (flow/start (ev-deliver-digest 3) (list (list (quote a) (quote u1) (quote hi)) (list (quote b) (quote u2) (quote yo)))))\n (flow-run-host (lambda (k p) (list (quote ok) (quote sent))) 10)\n (flow/result (car (cdr s)))")
|
||||
(list
|
||||
(list "delivered" "a" 1)
|
||||
(list "delivered" "b" 1)))
|
||||
(ev-nt-check!
|
||||
"digest reports per-message outcomes independently"
|
||||
(ev/notify-run
|
||||
"(define (xport k p)\n (let ((id (ev-msg-id p)))\n (if (equal? id (quote b)) (list (quote retry) (quote flaky)) (list (quote ok) (quote sent)))))\n (define s (flow/start (ev-deliver-digest 2) (list (list (quote a) (quote u1) (quote hi)) (list (quote b) (quote u2) (quote yo)) (list (quote c) (quote u3) (quote ya)))))\n (flow-run-host xport 12)\n (flow/result (car (cdr s)))")
|
||||
(list
|
||||
(list "delivered" "a" 1)
|
||||
(list "failed" "b" "flaky")
|
||||
(list "delivered" "c" 1)))
|
||||
(ev-nt-check!
|
||||
"delivery suspends until the transport responds"
|
||||
(ev/notify-run
|
||||
"(define s (flow/start (ev-deliver-reminder 3) (list (quote m1) (quote a) (quote hi))))\n (flow/status (car (cdr s)))")
|
||||
"suspended"))))
|
||||
|
||||
(define
|
||||
ev-notify-tests-run!
|
||||
(fn
|
||||
()
|
||||
(do
|
||||
(set! ev-nt-pass 0)
|
||||
(set! ev-nt-fail 0)
|
||||
(set! ev-nt-failures (list))
|
||||
(ev-nt-run-all!)
|
||||
{:failures ev-nt-failures :total (+ ev-nt-pass ev-nt-fail) :passed ev-nt-pass :failed ev-nt-fail})))
|
||||
@@ -18,7 +18,7 @@ capacity rules, transactional booking, and a flow-driven notification dispatcher
|
||||
|
||||
## Status (rolling)
|
||||
|
||||
`bash lib/events/conformance.sh` → **175/175** (Phase 1 + Phase 2 complete: booking/holds/paid-ticket contract)
|
||||
`bash lib/events/conformance.sh` → **182/182** (Phases 1-2 + Phase 3 notification delivery flows)
|
||||
|
||||
## Ground rules
|
||||
|
||||
@@ -70,11 +70,13 @@ lib/events/api.sx ── (events/schedule) (events/book) (events/agenda) ──
|
||||
- [x] tests: capacity edge, double-book guard, conflict detection
|
||||
|
||||
## Phase 3 — Notification delivery (flow)
|
||||
- [ ] `notify.sx` — reminder/digest flows over injected transport
|
||||
- [ ] retry/backoff on transport failure (flow suspend/resume)
|
||||
- [ ] tests: delivery success, retry path, idempotent re-send
|
||||
- [x] `notify.sx` — reminder/digest flows over injected transport
|
||||
- [x] retry/backoff on transport failure (flow suspend/resume)
|
||||
- [x] tests: delivery success, retry path, idempotent re-send
|
||||
- [ ] wire reminders to occurrences (schedule "starts in 1h" from agenda)
|
||||
- [ ] NOTE: shared with `feed/notify` — candidate for later extraction to a
|
||||
`delivery-on-sx` once a second consumer is real
|
||||
`delivery-on-sx` once a second consumer is real. **Delivery core
|
||||
(request→dispatch→resume, idempotent, bounded retry) is the extraction seam.**
|
||||
|
||||
## Phase 4 — Federation
|
||||
- [ ] cross-instance events (peer calendar) — trust-gated stub
|
||||
@@ -82,6 +84,18 @@ lib/events/api.sx ── (events/schedule) (events/book) (events/agenda) ──
|
||||
|
||||
## Progress log
|
||||
|
||||
- 2026-06-07 — **Phase 3 start: notification delivery flows.** `notify.sx`:
|
||||
reminders + digests as durable `flow`s over an INJECTED transport (the host
|
||||
`dispatch`). A flow `request`s delivery (suspend), the host sends and resumes
|
||||
with the outcome; flow's replay log means a completed send is never re-run on
|
||||
recovery. At-least-once + idempotent: messages carry an id; the transport
|
||||
dedups (re-send is a no-op that still reports ok) and replay logs each
|
||||
outcome. Retry rides suspend/resume — each attempt uses a DISTINCT tag
|
||||
`(deliver <id> <n>)` so replay stays correct; dispatch returns (ok) /
|
||||
(retry reason), bounded by maxn → (failed id reason). Digest delivers a batch
|
||||
with independent per-message outcomes. Authored as Scheme flow source run via
|
||||
`ev/notify-run` (scheme + flow substrate preloaded). +7 tests, 182/182 green.
|
||||
Delivery core is the `delivery-on-sx` extraction seam for feed/notify.
|
||||
- 2026-06-07 — **Phase 2 complete: paid-ticket contract.** `ticket.sx` defines
|
||||
the two wire messages between events and commerce — `checkout-request`
|
||||
(events→commerce) and `payment-result` (commerce→events, :paid/:failed/
|
||||
|
||||
Reference in New Issue
Block a user