Files
rose-ash/lib/events/notify.sx
giles 1446eaaa47
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 1m1s
events: end-to-end delivery pipeline (derivation -> notify flow) + 8 tests
ev/deliver-messages bridges SX notification messages to the Scheme notify
flow: each (id recipient body) is serialized to s-expr text, spliced as quoted
data into the digest-flow program, delivered over an injected transport, and
results unboxed. Integration suite drives all three derivations (reminders /
booking-notify / reschedule) through delivery end to end; empty batch guarded
(empty digest completes without suspending). 303/303 green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 11:52:00 +00:00

98 lines
4.7 KiB
Plaintext

;; lib/events/notify.sx — durable notification delivery flows over an injected
;; transport (lib/flow).
;;
;; Reminders and digests are durable `flow`s: a flow `request`s delivery (a
;; suspend point), the HOST performs the actual send via an injected `dispatch`
;; (the transport — email/push/etc.), and resumes the flow with the outcome.
;; Because flow uses deterministic replay, a completed delivery is never re-run
;; on recovery; the host owns IO and persistence.
;;
;; Delivery is AT-LEAST-ONCE with idempotency. Each message carries an id (the
;; idempotency key). Two protections stop double-delivery:
;; 1. The transport dedups by id — a re-send of a delivered id is a no-op
;; that still reports ok, so a retry never produces two pings.
;; 2. flow's replay log records each resolved request, so recovery replays the
;; logged outcome instead of re-issuing the send.
;;
;; Retry/backoff rides flow suspend/resume: each attempt issues a request with a
;; DISTINCT tag `(deliver <id> <n>)` — distinct tags keep deterministic replay
;; correct across retries. The dispatch returns (ok info) to finish or
;; (retry reason) to try again, bounded by `maxn` (then (failed id reason)).
;;
;; A message is a 3-element list (id recipient body). The transport is generic
;; and injected — when feed/notify lands, both consumers share one transport,
;; so this delivery core is a candidate for extraction to `delivery-on-sx`.
;;
;; The Scheme flow source below loads into a flow env (see lib/flow/api.sx).
;; `ev/notify-run` prepends it to a caller program and evaluates in the shared
;; flow env.
(define
ev-notify-flows-src
"(define (ev-msg-id m) (car m))\n (define (ev-msg-recipient m) (car (cdr m)))\n (define (ev-msg-body m) (car (cdr (cdr m))))\n (define (ev-mem x xs)\n (if (null? xs) #f (if (equal? x (car xs)) #t (ev-mem x (cdr xs)))))\n (define (ev-notify-attempt m n maxn)\n (let ((r (request (list (quote deliver) (ev-msg-id m) n) m)))\n (if (eq? (car r) (quote ok))\n (list (quote delivered) (ev-msg-id m) n)\n (if (>= n maxn)\n (list (quote failed) (ev-msg-id m) (car (cdr r)))\n (ev-notify-attempt m (+ n 1) maxn)))))\n (define (ev-deliver-reminder maxn)\n (flow-node (lambda (m) (ev-notify-attempt m 1 maxn))))\n (define (ev-digest-step ms maxn)\n (if (null? ms)\n (list)\n (cons (ev-notify-attempt (car ms) 1 maxn)\n (ev-digest-step (cdr ms) maxn))))\n (define (ev-deliver-digest maxn)\n (flow-node (lambda (ms) (ev-digest-step ms maxn))))")
;; Run a Scheme flow program with the notify flows preloaded, in the shared
;; flow env. Returns the program's value (SX-native).
(define
ev/notify-run
(fn (prog) (flow-run (str ev-notify-flows-src "\n" prog))))
;; ---- end-to-end delivery: SX messages -> the notify flow ----
;; Bridges the SX notification-derivation modules (reminders / booking-notify /
;; reschedule) to the durable delivery flow. An SX message (id recipient body)
;; is serialized to s-expression text and spliced into the Scheme program as
;; quoted data, then the digest flow delivers the batch over an injected
;; transport. Strings round-trip through the guest Scheme as {:scm-string ...}
;; boxes; results are unboxed back to plain SX.
;; A default transport (Scheme source): always reports delivered.
(define ev-notify-ok-transport "(lambda (k p) (list (quote ok) (quote sent)))")
(define
ev-notify-join
(fn
(parts sep)
(if
(empty? parts)
""
(reduce (fn (acc p) (str acc sep p)) (first parts) (rest parts)))))
(define ev-msg->quoted (fn (m) (str "(quote " (serialize m) ")")))
(define
ev-msgs->scheme
(fn
(msgs)
(str "(list " (ev-notify-join (map ev-msg->quoted msgs) " ") ")")))
(define
ev-unbox-str
(fn
(x)
(if (and (dict? x) (has-key? x :scm-string)) (get x :scm-string) x)))
(define
ev-unbox-result
(fn (r) (map (fn (item) (map ev-unbox-str item)) r)))
;; Deliver a list of SX messages through the digest flow over `transport-src`
;; (a Scheme (kind payload) -> (ok ..)|(retry reason) lambda source). `maxn`
;; bounds retries per message, `maxticks` bounds host service ticks. Returns the
;; per-message outcomes unboxed: (("delivered"|"failed" <id> <n-or-reason>) ...)
(define
ev/deliver-messages
(fn
(msgs transport-src maxn maxticks)
(ev-unbox-result
(ev/notify-run
(str
"(define msgs "
(ev-msgs->scheme msgs)
") (if (null? msgs) (list) (let ((s (flow/start (ev-deliver-digest "
maxn
") msgs))) (begin (flow-run-host "
transport-src
" "
maxticks
") (flow/result (car (cdr s))))))")))))