; persist/project — a projection folds a stream's events into a read model. ; A projection state is {:value v :seq s} where s is the last seq folded in, ; so a projection can resume incrementally from where it left off (replay only ; the tail). step : (value event) -> value. Determinism: step must be pure — ; time lives on the event (event-at), never a clock here. ; Requires: lib/persist/event.sx, lib/persist/log.sx. ; fold the tail (events with seq > prior's seq) onto a prior projection state (define persist/project-resume (fn (b stream step prior) (let ((tail (persist/read-from b stream (+ 1 (get prior :seq))))) (reduce (fn (acc e) {:value (step (get acc :value) e) :seq (persist/event-seq e)}) prior tail)))) ; project the whole stream from seed (define persist/project (fn (b stream step seed) (persist/project-resume b stream step {:value seed :seq 0}))) (define persist/project-value (fn (p) (get p :value))) (define persist/project-seq (fn (p) (get p :seq))) ; convenience: project and return just the value (define persist/project-fold (fn (b stream step seed) (persist/project-value (persist/project b stream step seed))))