federation production layer: actor model + follower graph + delivery timer + signatures (LIVE)
The full fed-sx production layer, live-verified across A (blog.rose-ash.com) and B (sx_host_b).
ACTOR MODEL + FOLLOWER GRAPH: activities carry a real :actor (SX_ACTOR); delivery targets FOLLOWERS,
not a static peer list. A peer subscribes by POSTing {verb:follow, actor, base} to /inbox
(host/blog--add-follower!); B follows A at boot (SX_FOLLOW) so A delivers to B. host/blog--{actor,
self-base, followers, follow!, delivery-bases} + durable followers store.
BACKGROUND DELIVERY TIMER: serve.sh's detached _fed_delivery_loop hits GET /fed-tick every 15s
(over /dev/tcp) → re-follow (idempotent, recovers a target that was down at boot) + flush the durable
outbox. Federation is eventually-consistent, not best-effort-at-emit.
SIGNATURE VERIFICATION: every federated POST is signed (host/blog--fed-sign = dr/sess-sig shared-secret
MAC over the body, SX_FED_SECRET); /inbox rejects a bad/missing signature with 403 (empty secret =
open). Applies to both follows and activity delivery.
PUBLIC DOMAIN: B joins externalnet so Caddy CAN reverse_proxy a subdomain to it — the DNS + Caddy
route itself is external ops config (no local Caddyfile).
LIVE PROOF: B follows A (followers:1); publish on A → SIGNED delivery to follower B → B verifies +
fires validate+notify; a forged POST (bad x-fed-sig) → 403; B down → publish queues → the background
timer auto-delivers the backlog when B returns (no manual flush). blog 218/218, full conformance green.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -132,7 +132,7 @@
|
||||
(let ((r (host/blog-get slug)))
|
||||
(if (nil? r) nil
|
||||
(let ((cid (host/blog-cid slug)))
|
||||
{:verb verb :actor "site"
|
||||
{:verb verb :actor host/blog--actor
|
||||
:object cid :object-type (host/blog--post-type slug)
|
||||
:slug slug :category (host/blog--post-category slug)
|
||||
:delta verb :id (str verb ":" cid)})))))
|
||||
@@ -143,7 +143,7 @@
|
||||
;; id would false-dedup across different edges on the same object.
|
||||
(define host/blog--relation-activity
|
||||
(fn (verb src kind dst)
|
||||
{:verb verb :actor "site"
|
||||
{:verb verb :actor host/blog--actor
|
||||
:object src :object-type (host/blog--post-type src)
|
||||
:relation kind :target dst
|
||||
:delta (str verb " " kind " " dst)
|
||||
@@ -188,9 +188,47 @@
|
||||
(define host/blog--runner-fleet (list host/flow--exec-runner))
|
||||
(define host/blog--add-runner! (fn (r) (set! host/blog--runner-fleet (concat host/blog--runner-fleet (list r)))))
|
||||
(define host/blog--kernel-base "")
|
||||
;; TA-live: peer base URLs — emitted activities federate to each peer's /inbox (serve-set from SX_PEERS).
|
||||
(define host/blog--peers (list))
|
||||
(define host/blog--set-peers! (fn (ps) (set! host/blog--peers ps)))
|
||||
;; ── the ACTOR MODEL — who WE are + who FOLLOWS us (fed-sx delivery is FOLLOWER-based, not a static
|
||||
;; peer list). An emitted activity carries our :actor and delivers to our followers' inboxes. A peer
|
||||
;; FOLLOWS us by POSTing a {verb:follow, actor, base} to our /inbox; we add it to our followers.
|
||||
(define host/blog--actor "site") ;; our actor id (serve-set from SX_ACTOR)
|
||||
(define host/blog--self-base "") ;; our base URL — followers POST to <base>/inbox (serve-set)
|
||||
(define host/blog--set-actor! (fn (a base) (begin (set! host/blog--actor a) (set! host/blog--self-base base))))
|
||||
(define host/blog--followers (list)) ;; [{actor, base}] — who follows us (we deliver here)
|
||||
(define host/blog--followers-key "followers")
|
||||
(define host/blog-load-followers!
|
||||
(fn () (let ((v (persist/backend-kv-get host/blog-store host/blog--followers-key)))
|
||||
(when (and v (= (type-of v) "list")) (set! host/blog--followers v)))))
|
||||
(define host/blog--follows? (fn (base) (some (fn (f) (= (get f "base") base)) host/blog--followers)))
|
||||
(define host/blog--add-follower!
|
||||
(fn (actor base)
|
||||
(when (and base (not (= base "")) (not (host/blog--follows? base)))
|
||||
(begin (set! host/blog--followers (concat host/blog--followers (list {"actor" actor "base" base})))
|
||||
(persist/backend-kv-put host/blog-store host/blog--followers-key host/blog--followers)))))
|
||||
(define host/blog--delivery-bases (fn () (map (fn (f) (get f "base")) host/blog--followers)))
|
||||
;; the actor we follow at boot / on each tick (serve-set from SX_FOLLOW; idempotent re-follow).
|
||||
(define host/blog--follow-target "")
|
||||
(define host/blog--set-follow-target! (fn (t) (set! host/blog--follow-target t)))
|
||||
;; ── federation SIGNATURE: every POST we send is signed (shared-secret MAC over the body); /inbox
|
||||
;; verifies before accepting. An empty secret disables verification (back-compat / open demo). ──
|
||||
(define host/blog--fed-secret "")
|
||||
(define host/blog--set-fed-secret! (fn (s) (set! host/blog--fed-secret s)))
|
||||
(define host/blog--fed-sign (fn (body) (if (= host/blog--fed-secret "") "" (dr/sess-sig host/blog--fed-secret body))))
|
||||
;; POST a wire to a peer's /inbox WITH a signature header (may raise; callers guard).
|
||||
(define host/blog--fed-post
|
||||
(fn (base wire)
|
||||
(http-request "POST" (str base "/inbox")
|
||||
{"content-type" "text/plain" "x-fed-sig" (host/blog--fed-sign wire)} wire)))
|
||||
;; verify an inbound POST's signature (accept if no secret configured, else require a matching MAC).
|
||||
(define host/blog--fed-verify?
|
||||
(fn (req body)
|
||||
(or (= host/blog--fed-secret "")
|
||||
(= (str (or (dream-header req "x-fed-sig") "")) (dr/sess-sig host/blog--fed-secret body)))))
|
||||
;; FOLLOW another actor: POST a follow to its /inbox announcing OUR actor + base, so it delivers to us.
|
||||
(define host/blog--follow!
|
||||
(fn (target-base)
|
||||
(guard (e (true false))
|
||||
(begin (host/blog--fed-post target-base (serialize {"verb" "follow" "actor" host/blog--actor "base" host/blog--self-base})) true))))
|
||||
;; per-type behavior declaration, stored on the type-post (string-keyed → persist-safe).
|
||||
(define host/blog--type-behavior (fn (type) (or (get (host/blog-get type) :behavior) (list))))
|
||||
(define host/blog--set-type-behavior!
|
||||
@@ -309,13 +347,13 @@
|
||||
(define host/blog--enqueue-outbox!
|
||||
(fn (a)
|
||||
(begin
|
||||
(for-each (fn (peer) (set! host/blog--outbox
|
||||
(concat host/blog--outbox (list {"peer" peer "wire" (host/ta--serialize a)}))))
|
||||
host/blog--peers)
|
||||
(for-each (fn (base) (set! host/blog--outbox
|
||||
(concat host/blog--outbox (list {"peer" base "wire" (host/ta--serialize a)}))))
|
||||
(host/blog--delivery-bases))
|
||||
(host/blog--outbox-persist!))))
|
||||
;; guarded delivery: POST the wire; a connection failure returns false (item kept), never raises.
|
||||
;; guarded, SIGNED delivery: POST the wire; a connection failure returns false (item kept), never raises.
|
||||
(define host/blog--try-deliver
|
||||
(fn (peer wire) (guard (e (true false)) (begin (host/ta--post peer wire) true))))
|
||||
(fn (peer wire) (guard (e (true false)) (begin (host/blog--fed-post peer wire) true))))
|
||||
;; deliver every pending item; KEEP the ones that failed (peer down) for the next retry.
|
||||
(define host/blog--flush-outbox!
|
||||
(fn ()
|
||||
@@ -2866,6 +2904,16 @@
|
||||
(persist/backend-kv-put host/blog-store host/blog--flowlog-key host/blog--flow-log)
|
||||
(host/blog--drop-pending! id)))
|
||||
r))))
|
||||
;; ── /fed-tick — the background federation worker (hit periodically by serve.sh's detached loop) ──
|
||||
;; Re-follows our target (idempotent — recovers if it was down at boot) and flushes the durable
|
||||
;; outbox (delivers any backlog to followers who are now up). This is the delivery TIMER.
|
||||
(define host/blog-fed-tick
|
||||
(fn (req)
|
||||
(begin
|
||||
(when (not (= host/blog--follow-target "")) (host/blog--follow! host/blog--follow-target))
|
||||
(host/blog--flush-outbox!)
|
||||
(host/ok {:outbox (len host/blog--outbox) :followers (len host/blog--followers)}))))
|
||||
|
||||
;; ── /flows — the behavior surface: what fired + what's SUSPENDED (RA-live). ?resume=<id> resumes. ─
|
||||
(define host/blog-flows
|
||||
(fn (req)
|
||||
@@ -2930,8 +2978,16 @@
|
||||
;; (host/blog--receive! is defined with emit! above — process-local only, no re-federation.)
|
||||
(define host/blog-inbox
|
||||
(fn (req)
|
||||
(let ((a (host/ta--deserialize (dream-body req))))
|
||||
(begin (host/blog--receive! a) (host/ok {:received (or (get a :id) "")})))))
|
||||
(let ((body (dream-body req)))
|
||||
(if (not (host/blog--fed-verify? req body))
|
||||
(host/error 403 "bad federation signature") ;; reject unsigned/forged POSTs
|
||||
(let ((w (parse-safe body)))
|
||||
(if (= (get w "verb") "follow")
|
||||
;; a FOLLOW: register the sender as a follower (deliver our activities to its inbox).
|
||||
(begin (host/blog--add-follower! (get w "actor") (get w "base")) (host/ok {:followed (get w "actor")}))
|
||||
;; a regular activity: process it (fires our behaviors on the peer's event).
|
||||
(let ((a (host/ta--wire->activity w)))
|
||||
(begin (host/blog--receive! a) (host/ok {:received (or (get a :id) "")})))))))))
|
||||
|
||||
;; ── routes ──────────────────────────────────────────────────────────
|
||||
;; Public reads + the create form. /, /posts, /new BEFORE /:slug (catch-all).
|
||||
@@ -2946,6 +3002,7 @@
|
||||
(dream-get "/meta" host/blog-meta-index)
|
||||
(dream-get "/workflow-demo" host/blog-workflow-demo)
|
||||
(dream-get "/flows" host/blog-flows)
|
||||
(dream-get "/fed-tick" host/blog-fed-tick)
|
||||
(dream-get "/activities" host/blog-activities)
|
||||
(dream-get "/:slug/source" host/blog-source)
|
||||
(dream-get "/:slug/relate-options" host/blog-relate-options)
|
||||
|
||||
@@ -49,6 +49,24 @@ _warm_serving_jit() {
|
||||
}
|
||||
_warm_serving_jit &
|
||||
|
||||
# TA-live: the background FEDERATION DELIVERY loop. Every 15s, hit /fed-tick (the container has no
|
||||
# curl, so speak HTTP over /dev/tcp), which re-follows our target (idempotent — recovers if it was
|
||||
# down at boot) and flushes the durable outbox (delivers backlog to followers who are now up). This
|
||||
# is the delivery TIMER — federation is eventually-consistent, not best-effort-at-emit-only.
|
||||
_fed_delivery_loop() {
|
||||
for _ in $(seq 1 240); do
|
||||
case "$( { exec 3<>/dev/tcp/127.0.0.1/"$PORT" && printf 'GET /health HTTP/1.0\r\nHost: x\r\n\r\n' >&3 && head -1 <&3; exec 3>&- 3<&-; } 2>/dev/null )" in
|
||||
*200*) break ;;
|
||||
esac
|
||||
sleep 0.5
|
||||
done
|
||||
while true; do
|
||||
{ exec 3<>/dev/tcp/127.0.0.1/"$PORT" && printf 'GET /fed-tick HTTP/1.0\r\nHost: x\r\n\r\n' >&3 && cat <&3 >/dev/null; exec 3>&- 3<&-; } 2>/dev/null
|
||||
sleep 15
|
||||
done
|
||||
}
|
||||
_fed_delivery_loop &
|
||||
|
||||
# Modules: every load line from conformance.sh's MODULES list, minus the ledger
|
||||
# (not needed to serve). server.sx supplies host/serve.
|
||||
MODULES=(
|
||||
@@ -220,15 +238,22 @@ EPOCH=1
|
||||
echo "(epoch $EPOCH)"
|
||||
echo "(eval \"(host/blog-load-pendinglog!)\")"
|
||||
EPOCH=$((EPOCH+1))
|
||||
# TA-live: federate emitted activities to peer /inbox endpoints (comma-separated SX_PEERS, e.g.
|
||||
# "http://sx_host_b:8000"). Empty by default (no federation). A peer that receives does NOT
|
||||
# re-federate, so an acyclic peer graph doesn't loop.
|
||||
PEERS_SX="(list"
|
||||
IFS=',' read -ra _PEER_ARR <<< "${SX_PEERS:-}"
|
||||
for _p in "${_PEER_ARR[@]:-}"; do [ -n "$_p" ] && PEERS_SX="$PEERS_SX \\\"$_p\\\""; done
|
||||
PEERS_SX="$PEERS_SX)"
|
||||
# TA-live ACTOR MODEL: our actor id (SX_ACTOR) + base URL (SX_SELF_URL — followers POST to
|
||||
# <base>/inbox). Load our persisted followers. If SX_FOLLOW is set, follow that actor's base at
|
||||
# boot (POST a follow to its /inbox), so it delivers ITS activities to us. Follower-based delivery
|
||||
# replaces the static peer list; the background loop re-follows (idempotent) if the target was down.
|
||||
echo "(epoch $EPOCH)"
|
||||
echo "(eval \"(host/blog--set-peers! $PEERS_SX)\")"
|
||||
echo "(eval \"(host/blog--set-actor! \\\"${SX_ACTOR:-site}\\\" \\\"${SX_SELF_URL:-}\\\")\")"
|
||||
EPOCH=$((EPOCH+1))
|
||||
echo "(epoch $EPOCH)"
|
||||
echo "(eval \"(host/blog-load-followers!)\")"
|
||||
EPOCH=$((EPOCH+1))
|
||||
echo "(epoch $EPOCH)"
|
||||
echo "(eval \"(host/blog--set-follow-target! \\\"${SX_FOLLOW:-}\\\")\")"
|
||||
EPOCH=$((EPOCH+1))
|
||||
# TA-live: the shared federation secret (peers sign every POST; /inbox verifies). Empty = open.
|
||||
echo "(epoch $EPOCH)"
|
||||
echo "(eval \"(host/blog--set-fed-secret! \\\"${SX_FED_SECRET:-}\\\")\")"
|
||||
EPOCH=$((EPOCH+1))
|
||||
# TA-live: rebuild the durable outbox + RETRY any deliveries that were pending from before a
|
||||
# restart (a peer that was down gets its backlog once it + we are back up).
|
||||
|
||||
Reference in New Issue
Block a user