;; lib/flow/remote.sx — distributed nodes via fed-sx (Phase 4).
;;
;; A node can execute on a federation peer. The transport is the fed-sx boundary;
;; it is MOCKED in tests by a peer registry mapping addr -> function table. In
;; production flow-transport would issue a fed-sx call; here it dispatches locally.
;;
;; Registry keys (peer addresses, function names, replica addresses) are compared
;; with equal?, so symbols, strings and structured addresses all work as keys.
;;
;;   (flow-peer-register! addr table) — register a mock peer. table is a list of
;;       (fn-name proc) entries — the functions that peer exposes. Registering the
;;       same addr again shadows the earlier entry (lookups return the newest).
;;   (flow-transport addr fn input)   — invoke fn on the peer with input. Raises
;;       the symbol flow-remote-unreachable if the addr is unknown, and the symbol
;;       flow-remote-no-fn if the peer does not expose fn.
;;   (remote-node addr fn)            — a node that runs fn on the peer at addr.
;;   (remote-failover addrs fn local) — try fn on each peer in addrs in order; on
;;       ANY raised condition (transport errors and errors raised by the peer
;;       function alike) move to the next peer; if every peer fails, run the
;;       `local` node on the same input as a fallback.
;;
;; Persistence across instances + handoff. Each instance runs the same flow
;; definitions, so the only thing that needs to cross the wire is the (plain-data)
;; store — exactly flow-store-export from store.sx. Replication pushes that export
;; to a peer's replica slot; handoff = restore the replica on the peer and resume.
;;
;;   (flow-replicate-to addr)  — copy this instance's store to peer addr's replica
;;       slot, replacing any replica previously stored for addr.
;;   (flow-restore-from addr)  — import the replica from peer addr (#t / #f)
;;   (flow-replica-get addr)   — the raw replicated store at addr (or #f)
;;
;; The Scheme program below is kept as a single string with \n escapes; it is
;; parsed and evaluated into a Scheme environment by flow-load-remote!.

(define flow-remote-src "(define flow-peers (list))\n(define (flow-assoc key alist)\n  (if (null? alist)\n    #f\n    (if (equal? (car (car alist)) key)\n      (car (cdr (car alist)))\n      (flow-assoc key (cdr alist)))))\n(define (flow-peer-register! addr table)\n  (set! flow-peers (cons (list addr table) flow-peers)))\n(define (flow-transport addr fn input)\n  (let ((table (flow-assoc addr flow-peers)))\n    (if table\n      (let ((proc (flow-assoc fn table)))\n        (if proc (proc input) (raise (quote flow-remote-no-fn))))\n      (raise (quote flow-remote-unreachable)))))\n(define (remote-node addr fn) (lambda (input) (flow-transport addr fn input)))\n(define (flow-failover-step addrs fn input local)\n  (if (null? addrs)\n    (local input)\n    (guard (e (#t (flow-failover-step (cdr addrs) fn input local)))\n      (flow-transport (car addrs) fn input))))\n(define (remote-failover addrs fn local)\n  (lambda (input) (flow-failover-step addrs fn input local)))\n\n(define flow-replicas (list))\n(define (flow-replicas-remove addr reps)\n  (if (null? reps)\n    (list)\n    (if (equal? (car (car reps)) addr)\n      (flow-replicas-remove addr (cdr reps))\n      (cons (car reps) (flow-replicas-remove addr (cdr reps))))))\n(define (flow-replicate-to addr)\n  (set! flow-replicas (cons (list addr (flow-store-export)) (flow-replicas-remove addr flow-replicas))))\n(define (flow-replica-get addr) (flow-assoc addr flow-replicas))\n(define (flow-restore-from addr)\n  (let ((data (flow-replica-get addr)))\n    (if data (begin (flow-store-import! data) #t) #f)))")

;; (flow-load-remote! env) — parse flow-remote-src and evaluate every form in the
;; Scheme environment env, installing the definitions above. Returns env so the
;; loader can be chained with other loaders.
(define flow-load-remote!
  (fn (env)
    (begin
      (scheme-eval-program (scheme-parse-all flow-remote-src) env)
      env)))