Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 1m1s
ev/deliver-messages bridges SX notification messages to the Scheme notify flow: each (id recipient body) is serialized to s-expr text, spliced as quoted data into the digest-flow program, delivered over an injected transport, and results unboxed. Integration suite drives all three derivations (reminders / booking-notify / reschedule) through delivery end to end; empty batch guarded (empty digest completes without suspending). 303/303 green. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
98 lines
4.7 KiB
Plaintext
98 lines
4.7 KiB
Plaintext
;; lib/events/notify.sx — durable notification delivery flows over an injected
|
|
;; transport (lib/flow).
|
|
;;
|
|
;; Reminders and digests are durable `flow`s: a flow `request`s delivery (a
|
|
;; suspend point), the HOST performs the actual send via an injected `dispatch`
|
|
;; (the transport — email/push/etc.), and resumes the flow with the outcome.
|
|
;; Because flow uses deterministic replay, a completed delivery is never re-run
|
|
;; on recovery; the host owns IO and persistence.
|
|
;;
|
|
;; Delivery is AT-LEAST-ONCE with idempotency. Each message carries an id (the
|
|
;; idempotency key). Two protections stop double-delivery:
|
|
;; 1. The transport dedups by id — a re-send of a delivered id is a no-op
|
|
;; that still reports ok, so a retry never produces two pings.
|
|
;; 2. flow's replay log records each resolved request, so recovery replays the
|
|
;; logged outcome instead of re-issuing the send.
|
|
;;
|
|
;; Retry/backoff rides flow suspend/resume: each attempt issues a request with a
|
|
;; DISTINCT tag `(deliver <id> <n>)` — distinct tags keep deterministic replay
|
|
;; correct across retries. The dispatch returns (ok info) to finish or
|
|
;; (retry reason) to try again, bounded by `maxn` (then (failed id reason)).
|
|
;;
|
|
;; A message is a 3-element list (id recipient body). The transport is generic
|
|
;; and injected — when feed/notify lands, both consumers share one transport,
|
|
;; so this delivery core is a candidate for extraction to `delivery-on-sx`.
|
|
;;
|
|
;; The Scheme flow source below loads into a flow env (see lib/flow/api.sx).
|
|
;; `ev/notify-run` prepends it to a caller program and evaluates in the shared
|
|
;; flow env.
|
|
|
|
(define
|
|
ev-notify-flows-src
|
|
"(define (ev-msg-id m) (car m))\n (define (ev-msg-recipient m) (car (cdr m)))\n (define (ev-msg-body m) (car (cdr (cdr m))))\n (define (ev-mem x xs)\n (if (null? xs) #f (if (equal? x (car xs)) #t (ev-mem x (cdr xs)))))\n (define (ev-notify-attempt m n maxn)\n (let ((r (request (list (quote deliver) (ev-msg-id m) n) m)))\n (if (eq? (car r) (quote ok))\n (list (quote delivered) (ev-msg-id m) n)\n (if (>= n maxn)\n (list (quote failed) (ev-msg-id m) (car (cdr r)))\n (ev-notify-attempt m (+ n 1) maxn)))))\n (define (ev-deliver-reminder maxn)\n (flow-node (lambda (m) (ev-notify-attempt m 1 maxn))))\n (define (ev-digest-step ms maxn)\n (if (null? ms)\n (list)\n (cons (ev-notify-attempt (car ms) 1 maxn)\n (ev-digest-step (cdr ms) maxn))))\n (define (ev-deliver-digest maxn)\n (flow-node (lambda (ms) (ev-digest-step ms maxn))))")
|
|
|
|
;; Run a Scheme flow program with the notify flows preloaded, in the shared
|
|
;; flow env. Returns the program's value (SX-native).
|
|
(define
|
|
ev/notify-run
|
|
(fn (prog) (flow-run (str ev-notify-flows-src "\n" prog))))
|
|
|
|
;; ---- end-to-end delivery: SX messages -> the notify flow ----
|
|
;; Bridges the SX notification-derivation modules (reminders / booking-notify /
|
|
;; reschedule) to the durable delivery flow. An SX message (id recipient body)
|
|
;; is serialized to s-expression text and spliced into the Scheme program as
|
|
;; quoted data, then the digest flow delivers the batch over an injected
|
|
;; transport. Strings round-trip through the guest Scheme as {:scm-string ...}
|
|
;; boxes; results are unboxed back to plain SX.
|
|
|
|
;; A default transport (Scheme source): always reports delivered.
|
|
(define ev-notify-ok-transport "(lambda (k p) (list (quote ok) (quote sent)))")
|
|
|
|
(define
|
|
ev-notify-join
|
|
(fn
|
|
(parts sep)
|
|
(if
|
|
(empty? parts)
|
|
""
|
|
(reduce (fn (acc p) (str acc sep p)) (first parts) (rest parts)))))
|
|
|
|
(define ev-msg->quoted (fn (m) (str "(quote " (serialize m) ")")))
|
|
|
|
(define
|
|
ev-msgs->scheme
|
|
(fn
|
|
(msgs)
|
|
(str "(list " (ev-notify-join (map ev-msg->quoted msgs) " ") ")")))
|
|
|
|
(define
|
|
ev-unbox-str
|
|
(fn
|
|
(x)
|
|
(if (and (dict? x) (has-key? x :scm-string)) (get x :scm-string) x)))
|
|
|
|
(define
|
|
ev-unbox-result
|
|
(fn (r) (map (fn (item) (map ev-unbox-str item)) r)))
|
|
|
|
;; Deliver a list of SX messages through the digest flow over `transport-src`
|
|
;; (a Scheme (kind payload) -> (ok ..)|(retry reason) lambda source). `maxn`
|
|
;; bounds retries per message, `maxticks` bounds host service ticks. Returns the
|
|
;; per-message outcomes unboxed: (("delivered"|"failed" <id> <n-or-reason>) ...)
|
|
(define
|
|
ev/deliver-messages
|
|
(fn
|
|
(msgs transport-src maxn maxticks)
|
|
(ev-unbox-result
|
|
(ev/notify-run
|
|
(str
|
|
"(define msgs "
|
|
(ev-msgs->scheme msgs)
|
|
") (if (null? msgs) (list) (let ((s (flow/start (ev-deliver-digest "
|
|
maxn
|
|
") msgs))) (begin (flow-run-host "
|
|
transport-src
|
|
" "
|
|
maxticks
|
|
") (flow/result (car (cdr s))))))")))))
|