diff --git a/docker-compose.dev-sx-host.yml b/docker-compose.dev-sx-host.yml index c3a47f06..f6fadb57 100644 --- a/docker-compose.dev-sx-host.yml +++ b/docker-compose.dev-sx-host.yml @@ -34,10 +34,13 @@ services: # 5.4x faster (1m43s -> 19s). Default-OFF gate, opt in here. SX_SERVING_JIT: "1" OCAMLRUNPARAM: "b" + # TA-live: federate emitted activities to peer B's /inbox (real fed-sx over HTTP). + SX_PEERS: "http://sx_host_b:8000" volumes: # SX source (hot-reload on container restart) - ./spec:/app/spec:ro - ./lib:/app/lib:ro + - ./next:/app/next:ro - ./web:/app/web:ro # Client assets for the blog SPA: the WASM OCaml kernel + sx-platform + the # web-stack modules, served by lib/host/static.sx at /static/**. @@ -76,6 +79,38 @@ services: - default restart: unless-stopped + # A second host instance — a federation PEER (B). Host A federates its emitted activities to B's + # /inbox; B's engine fires ITS OWN behaviors on A's state changes ("everything works over fed-sx"). + # B has its own durable store + no peers (receives without re-federating). Reached on the default + # network only (not exposed via Caddy). + sx_host_b: + image: registry.rose-ash.com:5000/sx_docs:latest + container_name: sx-dev-sx_host_b-1 + entrypoint: ["bash", "/app/lib/host/serve.sh"] + working_dir: /app + environment: + SX_PROJECT_DIR: /app + SX_SERVER: /app/bin/sx_server + HOST_PORT: "8000" + SX_HTTP_HOST: "0.0.0.0" + SX_PERSIST_DIR: /data/persist + SX_ADMIN_USER: admin + SX_ADMIN_PASSWORD: "sx-host-b-camper-2026" + SX_SESSION_SECRET: "ta-host-b-sess-9d2e1f" + SX_SERVING_JIT: "1" + OCAMLRUNPARAM: "b" + volumes: + - ./spec:/app/spec:ro + - ./lib:/app/lib:ro + - ./next:/app/next:ro + - ./web:/app/web:ro + - ./shared/static:/app/shared/static:ro + - ./hosts/ocaml/_build/default/bin/sx_server.exe:/app/bin/sx_server:ro + - /root/sx-host-b-persist:/data/persist + networks: + - default + restart: unless-stopped + networks: externalnet: external: true diff --git a/lib/host/blog.sx b/lib/host/blog.sx index ebb373e3..9dbf4e0b 100644 --- a/lib/host/blog.sx +++ b/lib/host/blog.sx @@ -188,6 +188,9 @@ (define host/blog--runner-fleet (list host/flow--exec-runner)) (define host/blog--add-runner! (fn (r) (set! host/blog--runner-fleet (concat host/blog--runner-fleet (list r))))) (define host/blog--kernel-base "") +;; TA-live: peer base URLs — emitted activities federate to each peer's /inbox (serve-set from SX_PEERS). +(define host/blog--peers (list)) +(define host/blog--set-peers! (fn (ps) (set! host/blog--peers ps))) ;; per-type behavior declaration, stored on the type-post (string-keyed → persist-safe). (define host/blog--type-behavior (fn (type) (or (get (host/blog-get type) :behavior) (list)))) (define host/blog--set-type-behavior! @@ -287,13 +290,47 @@ (fn () (let ((v (persist/backend-kv-get host/blog-store host/blog--pendinglog-key))) (when (and v (= (type-of v) "list")) (set! host/blog--pending-log v))))) -;; P2: EMIT any activity through the seam — LOGGED (event source) + matched (fires behaviors). A -;; durable runner that SUSPENDS records its kernel instance in the pending log for a later resume. +;; P2/TA-live: process an activity through the seam locally (fire behaviors + record suspensions). +;; Shared by emit! (our own state changes) and receive! (a peer's, arriving via /inbox). +(define host/blog--process-local! + (fn (a) + (let ((tr (behavior/process host/blog--publish-engine a))) + (begin (for-each (fn (s) (host/blog--record-pending! a s)) (get tr :suspended)) tr)))) +;; ── TA-live: the durable OUTBOX (fed-sx reliability) ────────────────── +;; Emitted activities are QUEUED per-peer (durable) and delivered BEST-EFFORT. A peer being DOWN +;; does NOT fail the local emit — delivery is GUARDED, and a failed item stays queued for retry (on +;; the next emit + on boot). This is the ActivityPub/fed-sx model, vs the fragile direct POST. +(define host/blog--outbox (list)) ;; pending {peer, wire} deliveries +(define host/blog--outbox-key "outbox") +(define host/blog-load-outbox! + (fn () (let ((v (persist/backend-kv-get host/blog-store host/blog--outbox-key))) + (when (and v (= (type-of v) "list")) (set! host/blog--outbox v))))) +(define host/blog--outbox-persist! (fn () (persist/backend-kv-put host/blog-store host/blog--outbox-key host/blog--outbox))) +(define host/blog--enqueue-outbox! + (fn (a) + (begin + (for-each (fn (peer) (set! host/blog--outbox + (concat host/blog--outbox (list {"peer" peer "wire" (host/ta--serialize a)})))) + host/blog--peers) + (host/blog--outbox-persist!)))) +;; guarded delivery: POST the wire; a connection failure returns false (item kept), never raises. +(define host/blog--try-deliver + (fn (peer wire) (guard (e (true false)) (begin (host/ta--post peer wire) true)))) +;; deliver every pending item; KEEP the ones that failed (peer down) for the next retry. +(define host/blog--flush-outbox! + (fn () + (begin + (set! host/blog--outbox + (filter (fn (item) (not (host/blog--try-deliver (get item "peer") (get item "wire")))) host/blog--outbox)) + (host/blog--outbox-persist!)))) +;; EMIT our own state change: process locally (ALWAYS succeeds), QUEUE to the outbox, best-effort flush. (define host/blog--emit! (fn (a) (if (nil? a) nil - (let ((tr (behavior/process host/blog--publish-engine a))) - (begin (for-each (fn (s) (host/blog--record-pending! a s)) (get tr :suspended)) tr))))) + (let ((tr (host/blog--process-local! a))) + (begin (host/blog--enqueue-outbox! a) (host/blog--flush-outbox!) tr))))) +;; RECEIVE a peer's activity: process locally only — do NOT re-federate (avoids federation loops). +(define host/blog--receive! (fn (a) (if (nil? a) nil (host/blog--process-local! a)))) ;; a slug's content CHANGE → the right verb: draft→published = Create (first publish); published→ ;; published = Update (a subsequent edit). Draft↔draft emits nothing (unobservable). Fire-once on the ;; create transition; an identical re-edit dedups (same verb:cid id). @@ -2835,11 +2872,15 @@ (let ((rid (dream-query-param req "resume"))) (begin (when (and rid (not (= rid ""))) (host/blog--resume-pending! rid)) + (when (dream-query-param req "flush") (host/blog--flush-outbox!)) (host/blog--resp req 200 (host/blog--page req "Flows" (quasiquote (div (h1 "Flows") (p "Effect-as-data from behavior workflows — the seam: activity → DAG → runner → effects.") + (p :style "font-size:0.9em;color:#555" + (unquote (str "Federation outbox: " (len host/blog--outbox) " pending delivery(ies) ")) " " + (a :href "/flows?flush=1" "flush")) (h3 :style "font-size:1em;margin:1em 0 0.3em" "Suspended (durable, on the kernel)") (unquote (if (= (len host/blog--pending-log) 0) @@ -2881,11 +2922,23 @@ (unquote (str " — " (get a "delta")))))) host/blog--activity-log)))))))))) +;; ── TA-live: the federation INBOX ──────────────────────────────────── +;; A peer POSTs a serialized activity here (fed-sx over HTTP); we deserialize it and run it through +;; OUR engine — so a REMOTE instance's state change fires THIS instance's behaviors (and logs as a +;; received event, and can suspend on our kernel). Public for the demo; prod verifies the peer's +;; signature before accepting. This is the receive side of TA — "everything works over fed-sx", live. +;; (host/blog--receive! is defined with emit! above — process-local only, no re-federation.) +(define host/blog-inbox + (fn (req) + (let ((a (host/ta--deserialize (dream-body req)))) + (begin (host/blog--receive! a) (host/ok {:received (or (get a :id) "")}))))) + ;; ── routes ────────────────────────────────────────────────────────── ;; Public reads + the create form. /, /posts, /new BEFORE /:slug (catch-all). ;; MUST be mounted LAST in the app so domain routes (/feed, /health) win. (define host/blog-routes (list + (dream-post "/inbox" host/blog-inbox) (dream-get "/" host/blog-home) (dream-get "/posts" host/blog-index) (dream-get "/new" host/blog-new-form) diff --git a/lib/host/serve.sh b/lib/host/serve.sh index 5235d94e..0b18c9ea 100755 --- a/lib/host/serve.sh +++ b/lib/host/serve.sh @@ -220,6 +220,24 @@ EPOCH=1 echo "(epoch $EPOCH)" echo "(eval \"(host/blog-load-pendinglog!)\")" EPOCH=$((EPOCH+1)) + # TA-live: federate emitted activities to peer /inbox endpoints (comma-separated SX_PEERS, e.g. + # "http://sx_host_b:8000"). Empty by default (no federation). A peer that receives does NOT + # re-federate, so an acyclic peer graph doesn't loop. + PEERS_SX="(list" + IFS=',' read -ra _PEER_ARR <<< "${SX_PEERS:-}" + for _p in "${_PEER_ARR[@]:-}"; do [ -n "$_p" ] && PEERS_SX="$PEERS_SX \\\"$_p\\\""; done + PEERS_SX="$PEERS_SX)" + echo "(epoch $EPOCH)" + echo "(eval \"(host/blog--set-peers! $PEERS_SX)\")" + EPOCH=$((EPOCH+1)) + # TA-live: rebuild the durable outbox + RETRY any deliveries that were pending from before a + # restart (a peer that was down gets its backlog once it + we are back up). + echo "(epoch $EPOCH)" + echo "(eval \"(host/blog-load-outbox!)\")" + EPOCH=$((EPOCH+1)) + echo "(epoch $EPOCH)" + echo "(eval \"(host/blog--flush-outbox!)\")" + EPOCH=$((EPOCH+1)) # Seed a live demo of the composition fold (plans/composition-objects.md): /compose-demo # is one composition object rendered by host/comp-render — renders differently by context. echo "(epoch $EPOCH)" diff --git a/lib/host/ta.sx b/lib/host/ta.sx index 1689e688..ec49f586 100644 --- a/lib/host/ta.sx +++ b/lib/host/ta.sx @@ -39,3 +39,17 @@ (let ((q (list))) {:send (fn (s) (set! q (concat q (list s)))) :recv (fn () (let ((batch q)) (begin (set! q (list)) batch)))}))) + +;; TA-LIVE: an HTTP fed-wire — :send POSTs a serialized activity to a PEER's /inbox over real HTTP +;; (http-request, native primitive). :recv is unused: a peer's /inbox route pushes received +;; activities straight into its engine (host/blog--receive!), so delivery is push, not poll. This is +;; the fed-sx transport in production — an activity emitted here fires a REMOTE instance's behaviors. +;; POST a pre-serialized wire string to a peer's /inbox (may raise on connection failure — callers +;; that must not fail the local emit wrap this in a guard, per the durable-outbox pattern). +(define host/ta--post (fn (peer-base s) (http-request "POST" (str peer-base "/inbox") {"content-type" "text/plain"} s))) +(define host/ta--make-http-wire + (fn (peer-base) + {:send (fn (s) (host/ta--post peer-base s)) + :recv (fn () (list))})) +;; serialize an activity + POST it to a peer (direct; the outbox path serializes-then-queues instead). +(define host/ta--federate (fn (peer-base a) (host/ta--post peer-base (host/ta--serialize a)))) diff --git a/lib/host/tests/blog.sx b/lib/host/tests/blog.sx index bfe1bced..510ee620 100644 --- a/lib/host/tests/blog.sx +++ b/lib/host/tests/blog.sx @@ -1243,6 +1243,13 @@ (begin (set! host/blog--activity-log (list)) (host/blog-load-activitylog!) (list before (len host/blog--activity-log))))) (list 1 1)) +;; TA-live: a RECEIVED activity (a peer's, arriving via /inbox) fires OUR behaviors through the engine. +(host-bl-test "TA-live: a received create/article activity fires our on-create behavior" + (begin + (set! host/blog--flow-log (list)) + (host/blog--receive! {:verb "create" :object-type "article" :category "urgent" :slug "remote1" :id "create:remote1"}) + (map (fn (e) (get e "verb")) host/blog--flow-log)) + (list "validate" "notify")) ;; P0.2: the publish WORKFLOW as an execute-fold DAG — branches on category, needs {effect,branch}, ;; binds to the synchronous execute-fold runner (derived, not chosen). (host-bl-test "publish-DAG: category branch (newsletter→digest) via the execute-fold" diff --git a/plans/business-logic-fed-flows.md b/plans/business-logic-fed-flows.md index 548e5af7..ca299610 100644 --- a/plans/business-logic-fed-flows.md +++ b/plans/business-logic-fed-flows.md @@ -290,13 +290,22 @@ the flow instance Id is the resume handle. behavior/pump delivers + processes it → B's engine fires ITS behavior on A's activity; DIRECTIONAL (B re-emits to its own outbox, not back into the inbox — no loop). This is "everything works over fed-sx" proven at the seam. Full host conformance green. -- [ ] **TA-LIVE (deferred — same shape as RA-live).** Swap the mem-wire for the REAL next/ delivery - wire (outbox → http_server → peer inbox). Needs: (a) a PERSISTENT next/ kernel process (gen_servers - don't survive across erlang-eval-ast calls — the RA-live finding; next/ outbox/http_server are the - persistent side); (b) the ACTOR MODEL real (:actor is a "site" placeholder — peer_actors / - follower_graph / per-author identity decide WHO the out-wire delivers to); (c) push /activities - (the P2 event source) onto the out-wire. RISK: next/ delivery M2 blockers (er-scheduler context). - The transport contract + serialization + the loop are proven; TA-live is the wire impl + placement. +- [x] **TA-LIVE DONE + LIVE-VERIFIED 2026-07-02 (real two-instance A→B federation over HTTP).** The + wire is the HOST's own http-request (not next/ delivery — simpler + already persistent). host/ta-- + {post, make-http-wire, federate}; the host gains a POST /inbox (host/blog-inbox → host/blog--receive! + → process locally, does NOT re-federate). A DURABLE OUTBOX (host/blog--outbox, persisted) gives fed- + sx RELIABILITY: emit! processes locally (always succeeds), QUEUES per-peer, and delivers best-effort + — a peer being DOWN does not fail the local publish (delivery is guarded; failed items stay queued + + retry on next emit / on boot / manual /flows?flush=1). serve.sh: SX_PEERS → host/blog--set-peers!, + boot load+flush. docker-compose: a 2nd host `sx_host_b` (its own store, no peers) as peer B. + LIVE PROOF: (1) a peer POSTs a create/article to blog.rose-ash.com/inbox → A fires validate+notify. + (2) publish on A → federates to B → B's /flows fires validate+notify on A's activity; B's /activities + shows the received create. (3) RESILIENCE — publish with B DOWN → A returns 303 (was 500), activity + queued; start B + flush → B receives the backlog + fires. blog 218/218, conformance green. + NOTE on placement/domains: A = blog.rose-ash.com (Caddy/externalnet); B = sx_host_b, internal-only + (docker DNS, no public domain) — a real peer would get its own Caddy subdomain. FUTURE: the actor + model (:actor "site" placeholder → follower_graph decides WHO to deliver to); a background delivery + loop (currently retry is opportunistic on emit/boot/flush, not a timer); signature verification on /inbox. ## AX — artdag GROWS control-flow (business logic MIGRATES to artdag) [DEMAND-DRIVEN] Today artdag is pure dataflow and the execute-fold is the synchronous control-flow runner. That's @@ -325,6 +334,14 @@ covers everything until a DAG's cost/latency/placement forces the substrate. activities), so business logic can change state, which federates, which triggers more flows. ## Progress log (newest first) +- 2026-07-02 — RA-LIVE + TA-LIVE DONE + LIVE-VERIFIED. (1) sx_kernel container (durable-execution + service) deployed; the host's RA kernel-runner drives it over HTTP — editing a newsletter article → + durable Update → kernel SUSPENDS (pending) → /flows?resume → done. (2) TA federation: host POST + /inbox receives peers' activities + fires; a 2nd instance sx_host_b (peer B) — publish on A → + federates to B → B fires ITS behaviors on A's activity. (3) DURABLE OUTBOX for fed-sx reliability + (user-driven): B down → A's publish still succeeds (303, was 500) + queues; B up + flush → backlog + delivers. The whole distributed half is LIVE. Remaining polish: actor model, background delivery + timer, /inbox signature verify, expose B on a public domain. - 2026-07-02 — the REAL KERNEL SERVICE built (next/kernel/host_kernel.erl + serve.sh + tests/ host_kernel.sh, 4/4 over HTTP). A persistent durable-execution service: flow_store + named-flow registry, parameterised flow routes (GET /flow/start/ → ":", GET