Merge loops/events into architecture: events-on-sx end-to-end delivery pipeline (303 tests, 12 suites)

Adds the SX->Scheme delivery bridge (ev/deliver-messages): notification-
derivation modules (reminders/booking-lifecycle/reschedule) now flow through
the durable notify flow end to end, with an integration suite covering
delivery success, transient-failure, and empty-batch paths.
This commit is contained in:
2026-06-07 12:25:34 +00:00
6 changed files with 224 additions and 6 deletions

View File

@@ -57,4 +57,5 @@ SUITES=(
"notify:lib/events/tests/notify.sx:(ev-notify-tests-run!)"
"reminders:lib/events/tests/reminders.sx:(ev-reminders-tests-run!)"
"federation:lib/events/tests/federation.sx:(ev-federation-tests-run!)"
"integration:lib/events/tests/integration.sx:(ev-integration-tests-run!)"
)

View File

@@ -36,3 +36,62 @@
(define
ev/notify-run
(fn (prog) (flow-run (str ev-notify-flows-src "\n" prog))))
;; ---- end-to-end delivery: SX messages -> the notify flow ----
;; Bridges the SX notification-derivation modules (reminders / booking-notify /
;; reschedule) to the durable delivery flow. An SX message (id recipient body)
;; is serialized to s-expression text and spliced into the Scheme program as
;; quoted data, then the digest flow delivers the batch over an injected
;; transport. Strings round-trip through the guest Scheme as {:scm-string ...}
;; boxes; results are unboxed back to plain SX.
;; A default transport (Scheme source): always reports delivered.
(define ev-notify-ok-transport "(lambda (k p) (list (quote ok) (quote sent)))")
(define
ev-notify-join
(fn
(parts sep)
(if
(empty? parts)
""
(reduce (fn (acc p) (str acc sep p)) (first parts) (rest parts)))))
(define ev-msg->quoted (fn (m) (str "(quote " (serialize m) ")")))
(define
ev-msgs->scheme
(fn
(msgs)
(str "(list " (ev-notify-join (map ev-msg->quoted msgs) " ") ")")))
(define
ev-unbox-str
(fn
(x)
(if (and (dict? x) (has-key? x :scm-string)) (get x :scm-string) x)))
(define
ev-unbox-result
(fn (r) (map (fn (item) (map ev-unbox-str item)) r)))
;; Deliver a list of SX messages through the digest flow over `transport-src`
;; (a Scheme (kind payload) -> (ok ..)|(retry reason) lambda source). `maxn`
;; bounds retries per message, `maxticks` bounds host service ticks. Returns the
;; per-message outcomes unboxed: (("delivered"|"failed" <id> <n-or-reason>) ...)
(define
ev/deliver-messages
(fn
(msgs transport-src maxn maxticks)
(ev-unbox-result
(ev/notify-run
(str
"(define msgs "
(ev-msgs->scheme msgs)
") (if (null? msgs) (list) (let ((s (flow/start (ev-deliver-digest "
maxn
") msgs))) (begin (flow-run-host "
transport-src
" "
maxticks
") (flow/result (car (cdr s))))))")))))

View File

@@ -1,8 +1,8 @@
{
"lang": "events",
"total_passed": 295,
"total_passed": 303,
"total_failed": 0,
"total": 295,
"total": 303,
"suites": [
{"name":"calendar","passed":51,"failed":0,"total":51},
{"name":"timezone","passed":17,"failed":0,"total":17},
@@ -13,7 +13,8 @@
{"name":"ticket","passed":31,"failed":0,"total":31},
{"name":"notify","passed":7,"failed":0,"total":7},
{"name":"reminders","passed":21,"failed":0,"total":21},
{"name":"federation","passed":29,"failed":0,"total":29}
{"name":"federation","passed":29,"failed":0,"total":29},
{"name":"integration","passed":8,"failed":0,"total":8}
],
"generated": "2026-06-07T09:30:28+00:00"
"generated": "2026-06-07T11:51:43+00:00"
}

View File

@@ -1,6 +1,6 @@
# events scoreboard
**295 / 295 passing** (0 failure(s)).
**303 / 303 passing** (0 failure(s)).
| Suite | Passed | Total | Status |
|-------|--------|-------|--------|
@@ -14,3 +14,4 @@
| notify | 7 | 7 | ok |
| reminders | 21 | 21 | ok |
| federation | 29 | 29 | ok |
| integration | 8 | 8 | ok |

View File

@@ -0,0 +1,144 @@
;; lib/events/tests/integration.sx — end-to-end pipeline: derive notification
;; messages (SX) -> deliver them through the durable notify flow (Scheme).
(define ev-it-pass 0)
(define ev-it-fail 0)
(define ev-it-failures (list))
(define
ev-it-check!
(fn
(name got expected)
(if
(= got expected)
(set! ev-it-pass (+ ev-it-pass 1))
(do
(set! ev-it-fail (+ ev-it-fail 1))
(append!
ev-it-failures
(str name "\n expected: " expected "\n got: " got))))))
(define ev-it-status (fn (outcome) (first outcome)))
(define ev-it-id (fn (outcome) (first (rest outcome))))
;; A store with a weekly class; nia + ola booked into the first occurrence.
(define
ev-it-setup
(fn
(b)
(let
((store (ev/schedule (ev/empty) (quote yoga) (ev-dt 2026 6 1 18 0) 60 {:freq :weekly :count 4 :byday (list 0 2)} 20)))
(let
((occ1 (ev-occ (quote yoga) (ev-dt 2026 6 1 18 0) 60)))
(do
(ev/book-occ! b store (quote nia) occ1)
(ev/book-occ! b store (quote ola) occ1)
store)))))
(define
ev-it-run-all!
(fn
()
(do
(let
((b (persist/open)))
(let
((store (ev-it-setup b)))
(let
((reminders (ev/agenda-reminders b store (ev-date 2026 6 1) (ev-date 2026 7 1) 60)))
(let
((msgs (map ev/reminder->msg reminders))
(outcomes
(ev/deliver-messages
(map ev/reminder->msg reminders)
ev-notify-ok-transport
3
20)))
(do
(ev-it-check!
"every booked attendee's reminder is delivered"
(map ev-it-status outcomes)
(list "delivered" "delivered"))
(ev-it-check!
"one delivery per derived reminder"
(len outcomes)
(len msgs))
(ev-it-check!
"delivered ids match the reminder idempotency keys"
(map ev-it-id outcomes)
(map (fn (r) (get r :id)) reminders)))))))
(let
((b (persist/open)))
(let
((store (ev-it-setup b)))
(let
((msgs (map ev/reminder->msg (ev/agenda-reminders b store (ev-date 2026 6 1) (ev-date 2026 7 1) 60))))
(ev-it-check!
"a permanently-failing transport reports failed deliveries"
(map
ev-it-status
(ev/deliver-messages
msgs
"(lambda (k p) (list (quote retry) (quote down)))"
2
20))
(list "failed" "failed")))))
(let
((b (persist/open)))
(do
(ev/book! b "occ" 1 (quote nia))
(ev/waitlist! b "occ" 1 (quote ola))
(ev/cancel-promote! b "occ" 1 (quote nia))
(let
((promoted (ev/notify-of-kind (ev/booking-notifications b "occ" (quote yoga)) :promoted)))
(let
((outcomes (ev/deliver-messages (map ev/booking-notify->msg promoted) ev-notify-ok-transport 3 12)))
(do
(ev-it-check!
"the waitlist-promotion notification is delivered"
(map ev-it-status outcomes)
(list "delivered"))
(ev-it-check!
"exactly one promotion was delivered"
(len outcomes)
1))))))
(let
((b (persist/open)))
(let
((ev (ev-event (quote yoga) (ev-dt 2026 6 1 18 0) 60 {:freq :daily :count 3} 20)))
(do
(ev/book-occ!
b
(ev/add-event (ev/empty) ev)
(quote nia)
(ev-occ
(quote yoga)
(ev-dt 2026 6 2 18 0)
60))
(let
((moved (ev-with-override ev (ev-dt 2026 6 2 18 0) (ev-dt 2026 6 2 20 0) 60)))
(let
((outcomes (ev/deliver-messages (map ev/reschedule-notify->msg (ev/reschedule-notifications b moved)) ev-notify-ok-transport 3 12)))
(ev-it-check!
"the reschedule notice is delivered to the booked attendee"
(map ev-it-status outcomes)
(list "delivered")))))))
(ev-it-check!
"delivering no messages yields no outcomes"
(ev/deliver-messages
(list)
ev-notify-ok-transport
3
12)
(list)))))
(define
ev-integration-tests-run!
(fn
()
(do
(set! ev-it-pass 0)
(set! ev-it-fail 0)
(set! ev-it-failures (list))
(ev-it-run-all!)
{:failures ev-it-failures :total (+ ev-it-pass ev-it-fail) :passed ev-it-pass :failed ev-it-fail})))

View File

@@ -18,7 +18,7 @@ capacity rules, transactional booking, and a flow-driven notification dispatcher
## Status (rolling)
`bash lib/events/conformance.sh`**295/295** (Phases 1-4 + 8 ext: fed f/b, waitlist, EXDATE/RDATE, overrides, booking/reschedule-notify, fed transport, timezones+DST)
`bash lib/events/conformance.sh`**303/303** (Phases 1-4 + 9 ext: fed f/b, waitlist, EXDATE/RDATE, overrides, booking/reschedule-notify, fed transport, timezones+DST, e2e delivery pipeline)
## Ground rules
@@ -74,6 +74,8 @@ lib/events/api.sx ── (events/schedule) (events/book) (events/agenda) ──
- [x] retry/backoff on transport failure (flow suspend/resume)
- [x] tests: delivery success, retry path, idempotent re-send
- [x] wire reminders to occurrences (`reminders.sx` — derive from agenda + roster)
- [x] end-to-end pipeline: derive (reminders/booking/reschedule) → deliver via
the notify flow (`ev/deliver-messages`, SX→Scheme bridge)
- [ ] NOTE: shared with `feed/notify` — candidate for later extraction to a
`delivery-on-sx` once a second consumer is real. **Delivery core
(request→dispatch→resume, idempotent, bounded retry) is the extraction seam.**
@@ -86,6 +88,16 @@ lib/events/api.sx ── (events/schedule) (events/book) (events/agenda) ──
## Progress log
- 2026-06-07 — End-to-end delivery pipeline (closes the derivation↔delivery
gap). `ev/deliver-messages` bridges SX notification messages to the Scheme
notify flow: each (id recipient body) is `serialize`d to s-expression text,
spliced as quoted data into the digest-flow program, delivered over an
injected transport-src, and results unboxed ({:scm-string}→str). New
integration suite drives all three derivations through delivery: reminders →
delivered (ids = idempotency keys), transient-fail transport → failed,
waitlist-promotion notification → delivered, reschedule notice → delivered,
empty batch → empty (guarded: an empty digest completes without suspending).
+8 tests, 303/303 green.
- 2026-06-07 — Timezone + DST support (user request). `timezone.sx`: a tz maps
wall-clock LOCAL ↔ absolute UTC (offset = local-utc). :fixed (constant) and
:dst (std/dst offsets + two UTC transition rules, e.g. EU last-Sun-Mar/Oct