H6: durable activity dedup — same :id processed at most once, ever (TDD)
Failing tests first (3 red: a redelivered activity reran its behavior — behavior/process starts from an empty trace, so dedup evaporated per call). host/blog--process-local! now atomically claims the :id on persist stream 'activities:processed' via ev/book! (the same append-expect acquire as seats/votes) and returns a :deduped trace on duplicates. Store-backed → survives outbox retries AND restarts. Prerequisite for non-idempotent effects (payment). Id-less activities process unchecked. blog suite 250/250 (+3). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -385,10 +385,22 @@
|
||||
(when (and v (= (type-of v) "list")) (set! host/blog--pending-log v)))))
|
||||
;; P2/TA-live: process an activity through the seam locally (fire behaviors + record suspensions).
|
||||
;; Shared by emit! (our own state changes) and receive! (a peer's, arriving via /inbox).
|
||||
;; H6: DURABLE idempotency — an activity :id is processed AT MOST ONCE, ever. The id is claimed
|
||||
;; atomically on the persist stream "activities:processed" (ev/book! — the same append-expect
|
||||
;; acquire as seats/votes), so outbox redelivery and restart replay can't rerun behaviors. This is
|
||||
;; the prerequisite for non-idempotent effects (payment). An id-less activity processes unchecked.
|
||||
(define host/blog--processed-stream "activities:processed")
|
||||
(define host/blog--claim-activity?
|
||||
(fn (aid)
|
||||
(or (nil? aid) (= aid "")
|
||||
(= (get (ev/book! host/blog-store host/blog--processed-stream 1000000000 aid) :status) :booked))))
|
||||
(define host/blog--process-local!
|
||||
(fn (a)
|
||||
(let ((tr (behavior/process host/blog--publish-engine a)))
|
||||
(begin (for-each (fn (s) (host/blog--record-pending! a s)) (get tr :suspended)) tr))))
|
||||
(if (not (host/blog--claim-activity? (get a :id)))
|
||||
{:emitted (list) :ran (list) :effects (list) :suspended (list) :failed (list)
|
||||
:seen (list (get a :id)) :deduped true}
|
||||
(let ((tr (behavior/process host/blog--publish-engine a)))
|
||||
(begin (for-each (fn (s) (host/blog--record-pending! a s)) (get tr :suspended)) tr)))))
|
||||
;; ── TA-live: the durable OUTBOX (fed-sx reliability) ──────────────────
|
||||
;; Emitted activities are QUEUED per-peer (durable) and delivered BEST-EFFORT. A peer being DOWN
|
||||
;; does NOT fail the local emit — delivery is GUARDED, and a failed item stays queued for retry (on
|
||||
|
||||
@@ -1504,6 +1504,36 @@
|
||||
(set! host/blog--shop-base host-bl-h5-shop-was)
|
||||
(set! host/blog--mint-ticket host-bl-h5-mint-was)
|
||||
|
||||
;; ── HARDENING H6: DURABLE activity dedup — same :id processed at most once, store-backed ──
|
||||
;; behavior/process starts from an empty trace each call, so redelivery (outbox retry, restart
|
||||
;; replay) reran behaviors. Now process-local! atomically claims the id on stream
|
||||
;; "activities:processed" (ev/book! — same acquire as seats/votes) and skips duplicates.
|
||||
;; Prerequisite for any NON-idempotent effect (payment).
|
||||
(host/blog-use-store! (persist/open))
|
||||
(host/blog-seed! "h6type" "h6type" "(article (h1 \"t\"))" "published")
|
||||
(host/blog--register-dag! "h6-dag" (quote (effect h6-ping (field "slug"))))
|
||||
(host/blog--set-type-behavior! "h6type" (list {"verb" "ping" "type" "h6type" "dag" "h6-dag"}))
|
||||
(host/blog--load-behaviors!)
|
||||
(set! host/blog--flow-log (list))
|
||||
(define host-bl-h6-act
|
||||
{:verb "ping" :actor "test" :object "h6x" :object-type "h6type" :slug "h6x"
|
||||
:delta "ping" :id "ping:h6x"})
|
||||
|
||||
(host-bl-test "H6: same activity id processed twice -> behavior runs ONCE"
|
||||
(begin
|
||||
(host/blog--process-local! host-bl-h6-act)
|
||||
(host/blog--process-local! host-bl-h6-act) ;; redelivery
|
||||
(len (filter (fn (e) (= (get e "verb") "h6-ping")) host/blog--flow-log)))
|
||||
1)
|
||||
(host-bl-test "H6: the processed id is on the store (survives restarts)"
|
||||
(contains? (ev/roster host/blog-store "activities:processed") "ping:h6x")
|
||||
true)
|
||||
(host-bl-test "H6: a DIFFERENT id still processes"
|
||||
(begin
|
||||
(host/blog--process-local! (assoc host-bl-h6-act :id "ping:h6y" :slug "h6y" :object "h6y"))
|
||||
(len (filter (fn (e) (= (get e "verb") "h6-ping")) host/blog--flow-log)))
|
||||
2)
|
||||
|
||||
(define
|
||||
host-bl-tests-run!
|
||||
(fn ()
|
||||
|
||||
Reference in New Issue
Block a user