; feed/fed — federation. Outbound: a local post fans out, then splits into local
; vs remote inboxes; remote events are handed to an injected send-fn. Inbound:
; peer activities merge into the local stream, deduped. Backfill: pull peer
; history via an injected fetch-fn and merge.
;
; remote? / send-fn / fetch-fn are injected so real fed-sx transport wires in here
; without feed depending on it.
;
; Requires: lib/feed/normalize.sx, lib/feed/stream.sx, lib/feed/fanout.sx,
; lib/feed/dedupe.sx.

; --- merge / ingest ---------------------------------------------------------

; Build one stream holding the items of s1 followed by the items of s2.
; No deduplication happens here; see feed/ingest for that.
(define
  feed/merge
  (fn (s1 s2)
    (let ((combined (append (feed/items s1) (feed/items s2))))
      (feed/stream combined))))

; Merge a peer stream into the local one, then run the merged result through
; feed/dedupe-activities to drop (actor verb object) duplicates.
(define
  feed/ingest
  (fn (local peer)
    (let ((merged (feed/merge local peer)))
      (feed/dedupe-activities merged))))

; --- inbound ----------------------------------------------------------------

; A peer pushes raw activities to the local inbox: each one is passed through
; feed/normalize, the results are wrapped as a stream, and that stream is
; ingested into `local`.
(define
  feed/inbound
  (fn (local raw-activities)
    (let ((peer-stream (feed/stream (map feed/normalize raw-activities))))
      (feed/ingest local peer-stream))))

; Backfill on subscribe: call the injected fetch-fn with peer-id and treat
; whatever it returns as raw inbound activities for `local`.
(define
  feed/backfill
  (fn (local fetch-fn peer-id)
    (let ((history (fetch-fn peer-id)))
      (feed/inbound local history))))

; --- outbound ---------------------------------------------------------------

; Split an inbox into local vs remote deliveries. `remote?` is a predicate
; applied to each event's :to value. Returns a dict:
;   :local   events whose :to is not remote
;   :remote  events whose :to is remote
; The inbox is filtered twice, so remote? is consulted in both passes.
(define
  feed/partition-inbox
  (fn (inbox remote?)
    (let ((to-remote? (fn (event) (remote? (get event :to)))))
      {:local (feed/filter inbox (fn (event) (not (to-remote? event))))
       :remote (feed/filter inbox to-remote?)})))

; Fan a stream out over the graph with feed/fanout, then partition the
; resulting inbox by locality. Returns the {:local … :remote …} dict produced
; by feed/partition-inbox.
(define
  feed/federate
  (fn (stream graph remote?)
    (let ((fanned (feed/fanout stream graph)))
      (feed/partition-inbox fanned remote?))))

; Deliver: federate the stream, call (send-fn to activity) for every event in
; the remote partition, and return the local partition for the caller to
; enqueue. send-fn is invoked for its effect only; its result is discarded.
(define
  feed/deliver
  (fn (stream graph remote? send-fn)
    (let ((halves (feed/federate stream graph remote?)))
      (let ((outbound (feed/items (get halves :remote))))
        (begin
          (for-each
            (fn (event) (send-fn (get event :to) (get event :activity)))
            outbound)
          (get halves :local))))))