host TA: the fed-sx transport adapter — federation loop proven at the seam
lib/host/ta.sx — a seam transport {:emit :deliver} over a DIRECTIONAL wire (out = outbox→followers,
in = inbox←follows). The transport is the SERIALIZATION boundary: activities cross the wire as
SX-source strings (host/ta--serialize/deserialize map the keyword-keyed activity ↔ a flat
string-keyed wire form of the P2 activity fields). host/ta--make-transport(out-wire, in-wire) +
host/ta--make-mem-wire (an in-memory directional queue for tests).
Proven (ta 5/5): content + relation activities round-trip through the wire; the FEDERATION LOOP —
instance A emits an activity → the wire carries it → instance B's behavior/pump delivers + processes
it → B's engine fires ITS behavior on A's activity; DIRECTIONAL (B re-emits to its own outbox, not
back into the inbox — no loop). 'Everything works over fed-sx', proven at the seam.
TA-live (deferred, same shape as RA-live): swap the mem-wire for the real next/ delivery wire —
needs a PERSISTENT next/ kernel (gen_servers don't survive across erlang-eval-ast calls) + the ACTOR
MODEL (peer_actors/follower_graph decide who the out-wire delivers to) + pushing /activities onto it.
Full host conformance green (+ta 5).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
68
lib/host/tests/ta.sx
Normal file
68
lib/host/tests/ta.sx
Normal file
@@ -0,0 +1,68 @@
|
||||
;; lib/host/tests/ta.sx — TA the fed-sx transport adapter (lib/host/ta.sx). Proves the federation
|
||||
;; loop with an in-memory wire: an activity emitted on instance A crosses to instance B and fires
|
||||
;; B's engine. The REAL wire (next/ delivery) is TA-live (deferred, persistent-kernel prerequisite).
|
||||
|
||||
(define host-ta-pass 0)
|
||||
(define host-ta-fail 0)
|
||||
(define host-ta-fails (list))
|
||||
(define host-ta-test
|
||||
(fn (name actual expected)
|
||||
(if (= actual expected)
|
||||
(set! host-ta-pass (+ host-ta-pass 1))
|
||||
(begin (set! host-ta-fail (+ host-ta-fail 1))
|
||||
(append! host-ta-fails {:name name :actual actual :expected expected})))))
|
||||
|
||||
;; ── serialization: activities cross the wire as SX-source and round-trip ──
|
||||
(host-ta-test "a content activity round-trips through the wire (serialize → deserialize)"
|
||||
(let ((r (host/ta--deserialize (host/ta--serialize
|
||||
{:verb "create" :actor "alice" :object "cid1" :object-type "article"
|
||||
:slug "post1" :category "urgent" :delta "create" :id "create:cid1"}))))
|
||||
(list (get r :verb) (get r :object-type) (get r :slug) (get r :category) (get r :id)))
|
||||
(list "create" "article" "post1" "urgent" "create:cid1"))
|
||||
(host-ta-test "a relation activity round-trips (relation + target preserved)"
|
||||
(let ((r (host/ta--deserialize (host/ta--serialize
|
||||
{:verb "add" :actor "site" :object "p1" :object-type "article"
|
||||
:relation "tagged" :target "urgent" :delta "add tagged urgent" :id "add:p1:tagged:urgent"}))))
|
||||
(list (get r :verb) (get r :relation) (get r :target) (get r :id)))
|
||||
(list "add" "tagged" "urgent" "add:p1:tagged:urgent"))
|
||||
|
||||
;; ── the in-memory wire accumulates + drains ──
|
||||
(host-ta-test "mem-wire: send accumulates, recv drains"
|
||||
(let ((w (host/ta--make-mem-wire)))
|
||||
(begin ((get w :send) "a") ((get w :send) "b")
|
||||
(let ((batch ((get w :recv)))) (list (len batch) (len ((get w :recv)))))))
|
||||
(list 2 0))
|
||||
|
||||
;; ── the federation LOOP: A emits → wire → B pump → B's behavior fires on A's activity ──
|
||||
(define ta-wire-ab (host/ta--make-mem-wire)) ;; A's outbox → B's inbox
|
||||
(define ta-sink (host/ta--make-mem-wire)) ;; B's outbox (its own followers; ignored here)
|
||||
(define ta-empty (host/ta--make-mem-wire)) ;; A's inbox (empty)
|
||||
(define ta-A (host/ta--make-transport ta-wire-ab ta-empty))
|
||||
(define ta-fired (list))
|
||||
(define ta-B-engine
|
||||
(behavior/make-engine
|
||||
{:triggers {:register! (fn (s d h) nil)
|
||||
:match (fn (a) (if (= (get a :verb) "create") (list {:dag "peer-dag"}) (list)))}
|
||||
:runner {:capabilities (list)
|
||||
:run (fn (dag env) {:status "done"
|
||||
:effects (list {:kind "peer-fired" :for (get (get env :activity) :id)})})}
|
||||
:transport (host/ta--make-transport ta-sink ta-wire-ab) ;; B delivers from wire-ab, emits to sink
|
||||
:driver {:dispatch (fn (eff) (begin (set! ta-fired (concat ta-fired (list eff))) (list)))}}))
|
||||
(host-ta-test "federation loop: A emits → wire → B pump → B's behavior fires on A's activity"
|
||||
(begin
|
||||
(set! ta-fired (list))
|
||||
((get ta-A :emit) {:verb "create" :actor "alice" :object "cid1" :object-type "article" :id "create:cid1"})
|
||||
(let ((trace (behavior/pump ta-B-engine)))
|
||||
(list (len (get trace :emitted)) (get (first ta-fired) :kind) (get (first ta-fired) :for))))
|
||||
(list 1 "peer-fired" "create:cid1"))
|
||||
(host-ta-test "directional: B re-emits to its OWN outbox, not back into the inbox (no loop)"
|
||||
(begin
|
||||
((get ta-A :emit) {:verb "create" :actor "a" :object "c2" :object-type "article" :id "create:c2"})
|
||||
(behavior/pump ta-B-engine) ;; delivers + fires; re-emit → sink
|
||||
(len (get (behavior/pump ta-B-engine) :emitted))) ;; wire-ab now empty → nothing
|
||||
0)
|
||||
|
||||
(define host-ta-tests-run!
|
||||
(fn ()
|
||||
{:total (+ host-ta-pass host-ta-fail)
|
||||
:passed host-ta-pass :failed host-ta-fail :fails host-ta-fails}))
|
||||
Reference in New Issue
Block a user