events: injected federation transport (fed-sx-ready) + 6 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 41s

fetch abstracts how a peer's agenda arrives: (fetch peer-id ws we) ->
{:status :ok :occurrences} | {:status :error}. ev/federated-agenda-via merges
local + trusted peers fetched via the transport; unreachable peers degrade
gracefully. ev/peer-fetch = in-process adapter; ev/federation-status reports
reachability. A real fed-sx transport drops in unchanged. 278/278 green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-07 08:12:37 +00:00
parent 07e4cb5f4a
commit c991c7c3d3
5 changed files with 142 additions and 8 deletions

View File

@@ -163,3 +163,70 @@
(some
(fn (iv) (ev-fed-overlaps? iv qs qe))
(ev/federated-busy local-db peers trust actor)))))
;; ---- injected transport (real fed-sx / signed fetch) ----
;; The in-process merge above expands a peer's local :store directly. In
;; production a peer's agenda arrives over a transport. `fetch` abstracts that:
;; (fetch peer-id ws we) -> {:status :ok :occurrences (...)} | {:status :error :reason ...}
;; The same merge works for any transport; an unreachable peer (:error) is
;; skipped (graceful degradation), never breaking the agenda.
(define
ev-find-peer
(fn
(peers pid)
(cond
((empty? peers) nil)
((= (ev/peer-id (first peers)) pid) (first peers))
(else (ev-find-peer (rest peers) pid)))))
;; In-process transport adapter: resolves a peer-id against a peer list and
;; expands its :store. Lets the in-process model run through the same `fetch`
;; interface a remote transport implements.
(define
ev/peer-fetch
(fn
(peers)
(fn
(pid ws we)
(let
((p (ev-find-peer peers pid)))
(if
(nil? p)
{:status :error :reason :unknown-peer}
{:status :ok :occurrences (ev/agenda (ev/peer-store p) ws we)})))))
;; Local agenda (:local) merged with each trusted peer's agenda fetched via the
;; injected `fetch` transport, sorted by start, tagged with :origin. Peers that
;; fail to fetch contribute nothing.
(define
ev/federated-agenda-via
(fn
(local-store trusted-ids ws we fetch)
(let
((acc (list)))
(begin
(for-each
(fn (o) (append! acc o))
(ev-tag-origin (ev/agenda local-store ws we) :local))
(for-each
(fn
(pid)
(let
((res (fetch pid ws we)))
(when
(= (get res :status) :ok)
(for-each
(fn (o) (append! acc o))
(ev-tag-origin (get res :occurrences) pid)))))
trusted-ids)
(ev-fed-sort acc)))))
;; Reachability report: ((peer-id :ok|:error) ...) for the trusted peers.
(define
ev/federation-status
(fn
(trusted-ids ws we fetch)
(map
(fn (pid) (list pid (get (fetch pid ws we) :status)))
trusted-ids)))