TA-live: real A→B federation over HTTP + a durable outbox (LIVE-VERIFIED)

Step 3 — federation, live-verified with TWO real host instances.

- host/ta.sx: host/ta--post/make-http-wire/federate (POST a serialized activity to a peer's /inbox
  over real HTTP). host/blog.sx: POST /inbox (host/blog-inbox → receive! → process locally, does NOT
  re-federate — no loops).
- DURABLE OUTBOX (fed-sx reliability, after the user asked 'if B is down does it still work?'):
  emit! processes locally (always succeeds), QUEUES per-peer to a persisted outbox, delivers
  best-effort. A peer being DOWN no longer fails the publish — delivery is GUARDED (SX guard catches
  the http-request connection error), failed items stay queued and retry on next emit / on boot /
  manual /flows?flush=1. /flows shows the outbox depth.
- serve.sh: SX_PEERS → peers; boot load+flush of the outbox. docker-compose: a 2nd host sx_host_b
  (peer B, own store, no peers).

LIVE PROOF: (1) a peer POSTs create/article to blog.rose-ash.com/inbox → A fires validate+notify.
(2) publish on A → federates to B → B fires ITS behaviors on A's activity (B's /flows + /activities).
(3) RESILIENCE: publish with B DOWN → A returns 303 (was 500) + queues; start B + flush → B receives
the backlog + fires. blog 218/218 (+TA receive test), full host conformance green.

A = blog.rose-ash.com (public/Caddy); B = sx_host_b (internal docker DNS only, no public domain).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-07-02 19:47:07 +00:00
parent cb0d866002
commit afb9ce5e90
6 changed files with 155 additions and 11 deletions

View File

@@ -188,6 +188,9 @@
(define host/blog--runner-fleet (list host/flow--exec-runner))
(define host/blog--add-runner! (fn (r) (set! host/blog--runner-fleet (concat host/blog--runner-fleet (list r)))))
(define host/blog--kernel-base "")
;; TA-live: peer base URLs — emitted activities federate to each peer's /inbox (serve-set from SX_PEERS).
(define host/blog--peers (list))
(define host/blog--set-peers! (fn (ps) (set! host/blog--peers ps)))
;; per-type behavior declaration, stored on the type-post (string-keyed → persist-safe).
(define host/blog--type-behavior (fn (type) (or (get (host/blog-get type) :behavior) (list))))
(define host/blog--set-type-behavior!
@@ -287,13 +290,47 @@
(fn ()
(let ((v (persist/backend-kv-get host/blog-store host/blog--pendinglog-key)))
(when (and v (= (type-of v) "list")) (set! host/blog--pending-log v)))))
;; P2: EMIT any activity through the seam — LOGGED (event source) + matched (fires behaviors). A
;; durable runner that SUSPENDS records its kernel instance in the pending log for a later resume.
;; P2/TA-live: process an activity through the seam locally (fire behaviors + record suspensions).
;; Shared by emit! (our own state changes) and receive! (a peer's, arriving via /inbox).
(define host/blog--process-local!
(fn (a)
(let ((tr (behavior/process host/blog--publish-engine a)))
(begin (for-each (fn (s) (host/blog--record-pending! a s)) (get tr :suspended)) tr))))
;; ── TA-live: the durable OUTBOX (fed-sx reliability) ──────────────────
;; Emitted activities are QUEUED per-peer (durable) and delivered BEST-EFFORT. A peer being DOWN
;; does NOT fail the local emit — delivery is GUARDED, and a failed item stays queued for retry (on
;; the next emit + on boot). This is the ActivityPub/fed-sx model, vs the fragile direct POST.
(define host/blog--outbox (list)) ;; pending {peer, wire} deliveries
(define host/blog--outbox-key "outbox")
(define host/blog-load-outbox!
(fn () (let ((v (persist/backend-kv-get host/blog-store host/blog--outbox-key)))
(when (and v (= (type-of v) "list")) (set! host/blog--outbox v)))))
(define host/blog--outbox-persist! (fn () (persist/backend-kv-put host/blog-store host/blog--outbox-key host/blog--outbox)))
(define host/blog--enqueue-outbox!
(fn (a)
(begin
(for-each (fn (peer) (set! host/blog--outbox
(concat host/blog--outbox (list {"peer" peer "wire" (host/ta--serialize a)}))))
host/blog--peers)
(host/blog--outbox-persist!))))
;; guarded delivery: POST the wire; a connection failure returns false (item kept), never raises.
(define host/blog--try-deliver
(fn (peer wire) (guard (e (true false)) (begin (host/ta--post peer wire) true))))
;; deliver every pending item; KEEP the ones that failed (peer down) for the next retry.
(define host/blog--flush-outbox!
(fn ()
(begin
(set! host/blog--outbox
(filter (fn (item) (not (host/blog--try-deliver (get item "peer") (get item "wire")))) host/blog--outbox))
(host/blog--outbox-persist!))))
;; EMIT our own state change: process locally (ALWAYS succeeds), QUEUE to the outbox, best-effort flush.
(define host/blog--emit!
(fn (a)
(if (nil? a) nil
(let ((tr (behavior/process host/blog--publish-engine a)))
(begin (for-each (fn (s) (host/blog--record-pending! a s)) (get tr :suspended)) tr)))))
(let ((tr (host/blog--process-local! a)))
(begin (host/blog--enqueue-outbox! a) (host/blog--flush-outbox!) tr)))))
;; RECEIVE a peer's activity: process locally only — do NOT re-federate (avoids federation loops).
(define host/blog--receive! (fn (a) (if (nil? a) nil (host/blog--process-local! a))))
;; a slug's content CHANGE → the right verb: draft→published = Create (first publish); published→
;; published = Update (a subsequent edit). Draft↔draft emits nothing (unobservable). Fire-once on the
;; create transition; an identical re-edit dedups (same verb:cid id).
@@ -2835,11 +2872,15 @@
(let ((rid (dream-query-param req "resume")))
(begin
(when (and rid (not (= rid ""))) (host/blog--resume-pending! rid))
(when (dream-query-param req "flush") (host/blog--flush-outbox!))
(host/blog--resp req 200
(host/blog--page req "Flows"
(quasiquote
(div (h1 "Flows")
(p "Effect-as-data from behavior workflows — the seam: activity → DAG → runner → effects.")
(p :style "font-size:0.9em;color:#555"
(unquote (str "Federation outbox: " (len host/blog--outbox) " pending delivery(ies) ")) " "
(a :href "/flows?flush=1" "flush"))
(h3 :style "font-size:1em;margin:1em 0 0.3em" "Suspended (durable, on the kernel)")
(unquote
(if (= (len host/blog--pending-log) 0)
@@ -2881,11 +2922,23 @@
(unquote (str " — " (get a "delta"))))))
host/blog--activity-log))))))))))
;; ── TA-live: the federation INBOX ────────────────────────────────────
;; A peer POSTs a serialized activity here (fed-sx over HTTP); we deserialize it and run it through
;; OUR engine — so a REMOTE instance's state change fires THIS instance's behaviors (and logs as a
;; received event, and can suspend on our kernel). Public for the demo; prod verifies the peer's
;; signature before accepting. This is the receive side of TA — "everything works over fed-sx", live.
;; (host/blog--receive! is defined with emit! above — process-local only, no re-federation.)
(define host/blog-inbox
(fn (req)
(let ((a (host/ta--deserialize (dream-body req))))
(begin (host/blog--receive! a) (host/ok {:received (or (get a :id) "")})))))
;; ── routes ──────────────────────────────────────────────────────────
;; Public reads + the create form. /, /posts, /new BEFORE /:slug (catch-all).
;; MUST be mounted LAST in the app so domain routes (/feed, /health) win.
(define host/blog-routes
(list
(dream-post "/inbox" host/blog-inbox)
(dream-get "/" host/blog-home)
(dream-get "/posts" host/blog-index)
(dream-get "/new" host/blog-new-form)

View File

@@ -220,6 +220,24 @@ EPOCH=1
echo "(epoch $EPOCH)"
echo "(eval \"(host/blog-load-pendinglog!)\")"
EPOCH=$((EPOCH+1))
# TA-live: federate emitted activities to peer /inbox endpoints (comma-separated SX_PEERS, e.g.
# "http://sx_host_b:8000"). Empty by default (no federation). A peer that receives does NOT
# re-federate, so an acyclic peer graph doesn't loop.
PEERS_SX="(list"
IFS=',' read -ra _PEER_ARR <<< "${SX_PEERS:-}"
for _p in "${_PEER_ARR[@]:-}"; do [ -n "$_p" ] && PEERS_SX="$PEERS_SX \\\"$_p\\\""; done
PEERS_SX="$PEERS_SX)"
echo "(epoch $EPOCH)"
echo "(eval \"(host/blog--set-peers! $PEERS_SX)\")"
EPOCH=$((EPOCH+1))
# TA-live: rebuild the durable outbox + RETRY any deliveries that were pending from before a
# restart (a peer that was down gets its backlog once it + we are back up).
echo "(epoch $EPOCH)"
echo "(eval \"(host/blog-load-outbox!)\")"
EPOCH=$((EPOCH+1))
echo "(epoch $EPOCH)"
echo "(eval \"(host/blog--flush-outbox!)\")"
EPOCH=$((EPOCH+1))
# Seed a live demo of the composition fold (plans/composition-objects.md): /compose-demo
# is one composition object rendered by host/comp-render — renders differently by context.
echo "(epoch $EPOCH)"

View File

@@ -39,3 +39,17 @@
(let ((q (list)))
{:send (fn (s) (set! q (concat q (list s))))
:recv (fn () (let ((batch q)) (begin (set! q (list)) batch)))})))
;; TA-LIVE: an HTTP fed-wire — :send POSTs a serialized activity to a PEER's /inbox over real HTTP
;; (http-request, native primitive). :recv is unused: a peer's /inbox route pushes received
;; activities straight into its engine (host/blog--receive!), so delivery is push, not poll. This is
;; the fed-sx transport in production — an activity emitted here fires a REMOTE instance's behaviors.
;; POST a pre-serialized wire string to a peer's /inbox (may raise on connection failure — callers
;; that must not fail the local emit wrap this in a guard, per the durable-outbox pattern).
(define host/ta--post (fn (peer-base s) (http-request "POST" (str peer-base "/inbox") {"content-type" "text/plain"} s)))
(define host/ta--make-http-wire
(fn (peer-base)
{:send (fn (s) (host/ta--post peer-base s))
:recv (fn () (list))}))
;; serialize an activity + POST it to a peer (direct; the outbox path serializes-then-queues instead).
(define host/ta--federate (fn (peer-base a) (host/ta--post peer-base (host/ta--serialize a))))

View File

@@ -1243,6 +1243,13 @@
(begin (set! host/blog--activity-log (list)) (host/blog-load-activitylog!)
(list before (len host/blog--activity-log)))))
(list 1 1))
;; TA-live: a RECEIVED activity (a peer's, arriving via /inbox) fires OUR behaviors through the engine.
(host-bl-test "TA-live: a received create/article activity fires our on-create behavior"
(begin
(set! host/blog--flow-log (list))
(host/blog--receive! {:verb "create" :object-type "article" :category "urgent" :slug "remote1" :id "create:remote1"})
(map (fn (e) (get e "verb")) host/blog--flow-log))
(list "validate" "notify"))
;; P0.2: the publish WORKFLOW as an execute-fold DAG — branches on category, needs {effect,branch},
;; binds to the synchronous execute-fold runner (derived, not chosen).
(host-bl-test "publish-DAG: category branch (newsletter→digest) via the execute-fold"