;; lib/events/notify.sx — durable notification delivery flows over an injected
;; transport (lib/flow).
;;
;; Reminders and digests are durable `flow`s: a flow `request`s delivery (a
;; suspend point), the HOST performs the actual send via an injected `dispatch`
;; (the transport — email/push/etc.), and resumes the flow with the outcome.
;; Because flow uses deterministic replay, a completed delivery is never re-run
;; on recovery; the host owns IO and persistence.
;;
;; Delivery is AT-LEAST-ONCE with idempotency. Each message carries an id (the
;; idempotency key). Two protections stop double-delivery:
;;   1. The transport dedups by id — a re-send of a delivered id is a no-op
;;      that still reports ok, so a retry never produces two pings.
;;   2. flow's replay log records each resolved request, so recovery replays the
;;      logged outcome instead of re-issuing the send.
;;
;; Retry/backoff rides flow suspend/resume: each attempt issues a request with a
;; DISTINCT tag `(deliver <id> <n>)` — distinct tags keep deterministic replay
;; correct across retries. The dispatch returns (ok info) to finish or
;; (retry reason) to try again, bounded by `maxn` (then (failed id reason)).
;;
;; A message is a 3-element list (id recipient body). The transport is generic
;; and injected — when feed/notify lands, both consumers share one transport,
;; so this delivery core is a candidate for extraction to `delivery-on-sx`.
;;
;; The Scheme flow source below loads into a flow env (see lib/flow/api.sx).
;; `ev/notify-run` prepends it to a caller program and evaluates in the shared
;; flow env.

;; Scheme source text for the notify flows. It is assembled from one fragment
;; per Scheme line; the concatenation is exactly the program text that
;; `ev/notify-run` hands to `flow-run`.
(define ev-notify-flows-src
  (str
    ;; Message accessors: a message is the list (id recipient body).
    "(define (ev-msg-id m) (car m))\n"
    " (define (ev-msg-recipient m) (car (cdr m)))\n"
    " (define (ev-msg-body m) (car (cdr (cdr m))))\n"
    ;; Membership test using equal?.
    " (define (ev-mem x xs)\n"
    " (if (null? xs) #f (if (equal? x (car xs)) #t (ev-mem x (cdr xs)))))\n"
    ;; One delivery attempt: request with tag (deliver id n) and the message as
    ;; payload. An `ok` reply yields (delivered id n); any other reply retries
    ;; with n+1 until n >= maxn, then yields (failed id <second reply element>).
    " (define (ev-notify-attempt m n maxn)\n"
    " (let ((r (request (list (quote deliver) (ev-msg-id m) n) m)))\n"
    " (if (eq? (car r) (quote ok))\n"
    " (list (quote delivered) (ev-msg-id m) n)\n"
    " (if (>= n maxn)\n"
    " (list (quote failed) (ev-msg-id m) (car (cdr r)))\n"
    " (ev-notify-attempt m (+ n 1) maxn)))))\n"
    ;; Flow node delivering a single message, attempts numbered from 1.
    " (define (ev-deliver-reminder maxn)\n"
    " (flow-node (lambda (m) (ev-notify-attempt m 1 maxn))))\n"
    ;; Deliver each message of a batch in order, collecting the outcomes.
    " (define (ev-digest-step ms maxn)\n"
    " (if (null? ms)\n"
    " (list)\n"
    " (cons (ev-notify-attempt (car ms) 1 maxn)\n"
    " (ev-digest-step (cdr ms) maxn))))\n"
    ;; Flow node delivering a whole batch of messages.
    " (define (ev-deliver-digest maxn)\n"
    " (flow-node (lambda (ms) (ev-digest-step ms maxn))))"))

;; Evaluate the Scheme program text `prog` with the notify flows placed in
;; front of it (separated by a newline) via `flow-run`, and return whatever
;; `flow-run` returns.
(define ev/notify-run
  (fn (prog)
    (flow-run
      (str ev-notify-flows-src "\n" prog))))

;; ---- end-to-end delivery: SX messages -> the notify flow ----
;; Bridges the SX notification-derivation modules (reminders / booking-notify /
;; reschedule) to the durable delivery flow. An SX message (id recipient body)
;; is serialized to s-expression text and spliced into the Scheme program as
;; quoted data, then the digest flow delivers the batch over an injected
;; transport. Strings round-trip through the guest Scheme as {:scm-string ...}
;; boxes; results are unboxed back to plain SX.

;; Default transport, as Scheme source: a two-argument lambda that ignores its
;; arguments and always answers (ok sent).
(define ev-notify-ok-transport
  "(lambda (k p) (list (quote ok) (quote sent)))")

;; Concatenate `parts` with `sep` between neighbours. An empty list gives "";
;; otherwise the first part seeds the fold and each later part is appended
;; after a separator.
(define ev-notify-join
  (fn (parts sep)
    (if (empty? parts)
      ""
      (reduce
        (fn (joined piece) (str joined sep piece))
        (first parts)
        (rest parts)))))

;; Render one SX message as Scheme text: its serialized form wrapped in a
;; (quote ...) form.
(define ev-msg->quoted
  (fn (m)
    (str "(quote " (serialize m) ")")))

;; Render a list of SX messages as the text of a Scheme (list ...) expression
;; whose elements are the quoted messages, separated by single spaces.
(define ev-msgs->scheme
  (fn (msgs)
    (str
      "(list "
      (ev-notify-join (map ev-msg->quoted msgs) " ")
      ")")))

;; Unwrap a boxed guest string: a dict carrying :scm-string yields that entry;
;; every other value is returned unchanged.
(define ev-unbox-str
  (fn (x)
    (if (dict? x)
      (if (has-key? x :scm-string)
        (get x :scm-string)
        x)
      x)))

;; Unbox every element of a single outcome list.
(define ev-unbox-item
  (fn (item)
    (map ev-unbox-str item)))

;; Unbox a list of outcome lists, one level deep in each.
(define ev-unbox-result
  (fn (r)
    (map ev-unbox-item r)))

;; Deliver a list of SX messages through the digest flow over `transport-src`
;; (a Scheme (kind payload) -> (ok ..)|(retry reason) lambda source).
;; `maxn` bounds retries per message, `maxticks` bounds host service ticks.
;; Returns the per-message outcomes unboxed:
;;   (("delivered"|"failed" <id> <attempt-or-reason>) ...)
;;
;; The generated Scheme program binds `msgs` to the quoted messages. With no
;; messages it evaluates to an empty list. Otherwise it starts the digest flow
;; with flow/start, drives it with flow-run-host using the given transport and
;; tick bound, and reads the outcome with flow/result applied to the second
;; element of what flow/start returned. `maxn`, `transport-src` and `maxticks`
;; are spliced into the program text via `str`.
(define ev/deliver-messages
  (fn (msgs transport-src maxn maxticks)
    (ev-unbox-result
      (ev/notify-run
        (str
          "(define msgs "
          (ev-msgs->scheme msgs)
          ") (if (null? msgs) (list) "
          "(let ((s (flow/start (ev-deliver-digest "
          maxn
          ") msgs))) "
          "(begin (flow-run-host "
          transport-src
          " "
          maxticks
          ") (flow/result (car (cdr s))))))")))))