From 1446eaaa47a0058b0eff211ea1f69afd171aaca0 Mon Sep 17 00:00:00 2001 From: giles Date: Sun, 7 Jun 2026 11:52:00 +0000 Subject: [PATCH] events: end-to-end delivery pipeline (derivation -> notify flow) + 8 tests ev/deliver-messages bridges SX notification messages to the Scheme notify flow: each (id recipient body) is serialized to s-expr text, spliced as quoted data into the digest-flow program, delivered over an injected transport, and results unboxed. Integration suite drives all three derivations (reminders / booking-notify / reschedule) through delivery end to end; empty batch guarded (empty digest completes without suspending). 303/303 green. Co-Authored-By: Claude Opus 4.8 (1M context) --- lib/events/conformance.conf | 1 + lib/events/notify.sx | 59 +++++++++++++ lib/events/scoreboard.json | 9 +- lib/events/scoreboard.md | 3 +- lib/events/tests/integration.sx | 144 ++++++++++++++++++++++++++++++++ plans/events-on-sx.md | 14 +++- 6 files changed, 224 insertions(+), 6 deletions(-) create mode 100644 lib/events/tests/integration.sx diff --git a/lib/events/conformance.conf b/lib/events/conformance.conf index 78767d29..f8954faf 100644 --- a/lib/events/conformance.conf +++ b/lib/events/conformance.conf @@ -57,4 +57,5 @@ SUITES=( "notify:lib/events/tests/notify.sx:(ev-notify-tests-run!)" "reminders:lib/events/tests/reminders.sx:(ev-reminders-tests-run!)" "federation:lib/events/tests/federation.sx:(ev-federation-tests-run!)" + "integration:lib/events/tests/integration.sx:(ev-integration-tests-run!)" ) diff --git a/lib/events/notify.sx b/lib/events/notify.sx index 914aa184..e2bc64e9 100644 --- a/lib/events/notify.sx +++ b/lib/events/notify.sx @@ -36,3 +36,62 @@ (define ev/notify-run (fn (prog) (flow-run (str ev-notify-flows-src "\n" prog)))) + +;; ---- end-to-end delivery: SX messages -> the notify flow ---- +;; Bridges the SX notification-derivation modules (reminders / booking-notify / +;; reschedule) to the durable delivery flow. An SX message (id recipient body) +;; is serialized to s-expression text and spliced into the Scheme program as +;; quoted data, then the digest flow delivers the batch over an injected +;; transport. Strings round-trip through the guest Scheme as {:scm-string ...} +;; boxes; results are unboxed back to plain SX. + +;; A default transport (Scheme source): always reports delivered. +(define ev-notify-ok-transport "(lambda (k p) (list (quote ok) (quote sent)))") + +(define + ev-notify-join + (fn + (parts sep) + (if + (empty? parts) + "" + (reduce (fn (acc p) (str acc sep p)) (first parts) (rest parts))))) + +(define ev-msg->quoted (fn (m) (str "(quote " (serialize m) ")"))) + +(define + ev-msgs->scheme + (fn + (msgs) + (str "(list " (ev-notify-join (map ev-msg->quoted msgs) " ") ")"))) + +(define + ev-unbox-str + (fn + (x) + (if (and (dict? x) (has-key? x :scm-string)) (get x :scm-string) x))) + +(define + ev-unbox-result + (fn (r) (map (fn (item) (map ev-unbox-str item)) r))) + +;; Deliver a list of SX messages through the digest flow over `transport-src` +;; (a Scheme (kind payload) -> (ok ..)|(retry reason) lambda source). `maxn` +;; bounds retries per message, `maxticks` bounds host service ticks. Returns the +;; per-message outcomes unboxed: (("delivered"|"failed" ) ...) +(define + ev/deliver-messages + (fn + (msgs transport-src maxn maxticks) + (ev-unbox-result + (ev/notify-run + (str + "(define msgs " + (ev-msgs->scheme msgs) + ") (if (null? msgs) (list) (let ((s (flow/start (ev-deliver-digest " + maxn + ") msgs))) (begin (flow-run-host " + transport-src + " " + maxticks + ") (flow/result (car (cdr s))))))"))))) diff --git a/lib/events/scoreboard.json b/lib/events/scoreboard.json index 35a7ffb0..18e86561 100644 --- a/lib/events/scoreboard.json +++ b/lib/events/scoreboard.json @@ -1,8 +1,8 @@ { "lang": "events", - "total_passed": 295, + "total_passed": 303, "total_failed": 0, - "total": 295, + "total": 303, "suites": [ {"name":"calendar","passed":51,"failed":0,"total":51}, {"name":"timezone","passed":17,"failed":0,"total":17}, @@ -13,7 +13,8 @@ {"name":"ticket","passed":31,"failed":0,"total":31}, {"name":"notify","passed":7,"failed":0,"total":7}, {"name":"reminders","passed":21,"failed":0,"total":21}, - {"name":"federation","passed":29,"failed":0,"total":29} + {"name":"federation","passed":29,"failed":0,"total":29}, + {"name":"integration","passed":8,"failed":0,"total":8} ], - "generated": "2026-06-07T09:30:28+00:00" + "generated": "2026-06-07T11:51:43+00:00" } diff --git a/lib/events/scoreboard.md b/lib/events/scoreboard.md index 28d99a92..15f6602f 100644 --- a/lib/events/scoreboard.md +++ b/lib/events/scoreboard.md @@ -1,6 +1,6 @@ # events scoreboard -**295 / 295 passing** (0 failure(s)). +**303 / 303 passing** (0 failure(s)). | Suite | Passed | Total | Status | |-------|--------|-------|--------| @@ -14,3 +14,4 @@ | notify | 7 | 7 | ok | | reminders | 21 | 21 | ok | | federation | 29 | 29 | ok | +| integration | 8 | 8 | ok | diff --git a/lib/events/tests/integration.sx b/lib/events/tests/integration.sx new file mode 100644 index 00000000..bfe2eb71 --- /dev/null +++ b/lib/events/tests/integration.sx @@ -0,0 +1,144 @@ +;; lib/events/tests/integration.sx — end-to-end pipeline: derive notification +;; messages (SX) -> deliver them through the durable notify flow (Scheme). + +(define ev-it-pass 0) +(define ev-it-fail 0) +(define ev-it-failures (list)) + +(define + ev-it-check! + (fn + (name got expected) + (if + (= got expected) + (set! ev-it-pass (+ ev-it-pass 1)) + (do + (set! ev-it-fail (+ ev-it-fail 1)) + (append! + ev-it-failures + (str name "\n expected: " expected "\n got: " got)))))) + +(define ev-it-status (fn (outcome) (first outcome))) +(define ev-it-id (fn (outcome) (first (rest outcome)))) + +;; A store with a weekly class; nia + ola booked into the first occurrence. +(define + ev-it-setup + (fn + (b) + (let + ((store (ev/schedule (ev/empty) (quote yoga) (ev-dt 2026 6 1 18 0) 60 {:freq :weekly :count 4 :byday (list 0 2)} 20))) + (let + ((occ1 (ev-occ (quote yoga) (ev-dt 2026 6 1 18 0) 60))) + (do + (ev/book-occ! b store (quote nia) occ1) + (ev/book-occ! b store (quote ola) occ1) + store))))) + +(define + ev-it-run-all! + (fn + () + (do + (let + ((b (persist/open))) + (let + ((store (ev-it-setup b))) + (let + ((reminders (ev/agenda-reminders b store (ev-date 2026 6 1) (ev-date 2026 7 1) 60))) + (let + ((msgs (map ev/reminder->msg reminders)) + (outcomes + (ev/deliver-messages + (map ev/reminder->msg reminders) + ev-notify-ok-transport + 3 + 20))) + (do + (ev-it-check! + "every booked attendee's reminder is delivered" + (map ev-it-status outcomes) + (list "delivered" "delivered")) + (ev-it-check! + "one delivery per derived reminder" + (len outcomes) + (len msgs)) + (ev-it-check! + "delivered ids match the reminder idempotency keys" + (map ev-it-id outcomes) + (map (fn (r) (get r :id)) reminders))))))) + (let + ((b (persist/open))) + (let + ((store (ev-it-setup b))) + (let + ((msgs (map ev/reminder->msg (ev/agenda-reminders b store (ev-date 2026 6 1) (ev-date 2026 7 1) 60)))) + (ev-it-check! + "a permanently-failing transport reports failed deliveries" + (map + ev-it-status + (ev/deliver-messages + msgs + "(lambda (k p) (list (quote retry) (quote down)))" + 2 + 20)) + (list "failed" "failed"))))) + (let + ((b (persist/open))) + (do + (ev/book! b "occ" 1 (quote nia)) + (ev/waitlist! b "occ" 1 (quote ola)) + (ev/cancel-promote! b "occ" 1 (quote nia)) + (let + ((promoted (ev/notify-of-kind (ev/booking-notifications b "occ" (quote yoga)) :promoted))) + (let + ((outcomes (ev/deliver-messages (map ev/booking-notify->msg promoted) ev-notify-ok-transport 3 12))) + (do + (ev-it-check! + "the waitlist-promotion notification is delivered" + (map ev-it-status outcomes) + (list "delivered")) + (ev-it-check! + "exactly one promotion was delivered" + (len outcomes) + 1)))))) + (let + ((b (persist/open))) + (let + ((ev (ev-event (quote yoga) (ev-dt 2026 6 1 18 0) 60 {:freq :daily :count 3} 20))) + (do + (ev/book-occ! + b + (ev/add-event (ev/empty) ev) + (quote nia) + (ev-occ + (quote yoga) + (ev-dt 2026 6 2 18 0) + 60)) + (let + ((moved (ev-with-override ev (ev-dt 2026 6 2 18 0) (ev-dt 2026 6 2 20 0) 60))) + (let + ((outcomes (ev/deliver-messages (map ev/reschedule-notify->msg (ev/reschedule-notifications b moved)) ev-notify-ok-transport 3 12))) + (ev-it-check! + "the reschedule notice is delivered to the booked attendee" + (map ev-it-status outcomes) + (list "delivered"))))))) + (ev-it-check! + "delivering no messages yields no outcomes" + (ev/deliver-messages + (list) + ev-notify-ok-transport + 3 + 12) + (list))))) + +(define + ev-integration-tests-run! + (fn + () + (do + (set! ev-it-pass 0) + (set! ev-it-fail 0) + (set! ev-it-failures (list)) + (ev-it-run-all!) + {:failures ev-it-failures :total (+ ev-it-pass ev-it-fail) :passed ev-it-pass :failed ev-it-fail}))) diff --git a/plans/events-on-sx.md b/plans/events-on-sx.md index 385129fa..5c62cfd4 100644 --- a/plans/events-on-sx.md +++ b/plans/events-on-sx.md @@ -18,7 +18,7 @@ capacity rules, transactional booking, and a flow-driven notification dispatcher ## Status (rolling) -`bash lib/events/conformance.sh` → **295/295** (Phases 1-4 + 8 ext: fed f/b, waitlist, EXDATE/RDATE, overrides, booking/reschedule-notify, fed transport, timezones+DST) +`bash lib/events/conformance.sh` → **303/303** (Phases 1-4 + 9 ext: fed f/b, waitlist, EXDATE/RDATE, overrides, booking/reschedule-notify, fed transport, timezones+DST, e2e delivery pipeline) ## Ground rules @@ -74,6 +74,8 @@ lib/events/api.sx ── (events/schedule) (events/book) (events/agenda) ── - [x] retry/backoff on transport failure (flow suspend/resume) - [x] tests: delivery success, retry path, idempotent re-send - [x] wire reminders to occurrences (`reminders.sx` — derive from agenda + roster) +- [x] end-to-end pipeline: derive (reminders/booking/reschedule) → deliver via + the notify flow (`ev/deliver-messages`, SX→Scheme bridge) - [ ] NOTE: shared with `feed/notify` — candidate for later extraction to a `delivery-on-sx` once a second consumer is real. **Delivery core (request→dispatch→resume, idempotent, bounded retry) is the extraction seam.** @@ -86,6 +88,16 @@ lib/events/api.sx ── (events/schedule) (events/book) (events/agenda) ── ## Progress log +- 2026-06-07 — End-to-end delivery pipeline (closes the derivation↔delivery + gap). `ev/deliver-messages` bridges SX notification messages to the Scheme + notify flow: each (id recipient body) is `serialize`d to s-expression text, + spliced as quoted data into the digest-flow program, delivered over an + injected transport-src, and results unboxed ({:scm-string}→str). New + integration suite drives all three derivations through delivery: reminders → + delivered (ids = idempotency keys), transient-fail transport → failed, + waitlist-promotion notification → delivered, reschedule notice → delivered, + empty batch → empty (guarded: an empty digest completes without suspending). + +8 tests, 303/303 green. - 2026-06-07 — Timezone + DST support (user request). `timezone.sx`: a tz maps wall-clock LOCAL ↔ absolute UTC (offset = local-utc). :fixed (constant) and :dst (std/dst offsets + two UTC transition rules, e.g. EU last-Sun-Mar/Oct