agentic-sx Phase 4: durable — agent sessions as durable flow workflows (TDD)

Deterministic replay IS the durability mechanism: every transition re-runs a
self-contained flow program (defflow source + flow/start + replay of all
recorded resume values), so the only durable state is {:flow :input :resumes}
in persist kv — restart-safe by construction (fresh space handles over the
same backend resume mid-flight runs). fork-an-agent-run = copy the record;
the two replays diverge independently. Effects are data (suspend tags +
typed request envelopes surface as plain SX); transitions ride the Phase-3
trace buffer so session history travels with the next commit. Guest numeric
results compared with = per house convention. 43/43 (196/196 total).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
2026-07-03 13:13:50 +00:00
parent b92095ccaf
commit c66ee35010
5 changed files with 523 additions and 5 deletions

View File

@@ -13,7 +13,7 @@ if [ ! -x "$SX_SERVER" ]; then
exit 1 exit 1
fi fi
SUITES=(schema branch trace) SUITES=(schema branch trace durable)
OUT_JSON="lib/agentic/scoreboard.json" OUT_JSON="lib/agentic/scoreboard.json"
OUT_MD="lib/agentic/scoreboard.md" OUT_MD="lib/agentic/scoreboard.md"

235
lib/agentic/durable.sx Normal file
View File

@@ -0,0 +1,235 @@
; lib/agentic/durable.sx — agentic-sx Phase 4: long agent sessions as
; DURABLE flow workflows. Deterministic replay IS the durability mechanism:
; every transition re-runs a self-contained flow program (the session's
; defflow source + flow/start + a replay of every recorded resume value),
; so the only durable state is a plain record {:flow :input :resumes ...}
; in the persist kv store — restart-safe by construction, and forking an
; agent run is literally copying the record (both replays then diverge).
; Effects are data: a suspended session exposes its suspend tag / typed
; (request kind payload) envelope to the host as plain SX values.
; Transitions also land in the agent's Phase-3 trace buffer, so the session
; history travels with the agent's next commit.
; Convention for session flows: suspend tags are quoted symbols, decision
; values are numbers/strings/lists of those (see agentic/scm-lit).
; Requires: lib/agentic/trace.sx (and its deps), lib/flow/* (+ scheme stack).
; ---- SX -> Scheme literal (numbers, strings, booleans, lists; nil = ()) ----
(define
agentic/scm-lit
(fn
(v)
(cond
((nil? v) "(list)")
((= v true) "#t")
((= v false) "#f")
((number? v) (str v))
((string? v) (str "\"" v "\""))
((list? v) (str "(list " (join " " (map agentic/scm-lit v)) ")"))
(else "(list)"))))
; ---- Scheme -> SX: unbox {:scm-string ...} recursively ----
(define
agentic/scm-out
(fn
(v)
(cond
((and (dict? v) (has-key? v :scm-string)) (get v :scm-string))
((list? v) (map agentic/scm-out v))
(else v))))
; ---- kv keys (namespaced under the repo prefix) ----
(define
agentic/session-def-key
(fn
(sp name)
(str (get (agentic/space-repo sp) :prefix) "/session-def/" name)))
(define
agentic/session-key
(fn
(sp agent)
(str (get (agentic/space-repo sp) :prefix) "/session/" agent)))
; ---- durable session flow definitions ----
(define
agentic/defsession!
(fn
(sp name scheme-src)
(begin
(persist/kv-put
(git/repo-db (agentic/space-repo sp))
(agentic/session-def-key sp name)
scheme-src)
name)))
(define
agentic/session-def
(fn
(sp name)
(persist/kv-get
(git/repo-db (agentic/space-repo sp))
(agentic/session-def-key sp name))))
(define
agentic/session-record
(fn
(sp agent)
(persist/kv-get
(git/repo-db (agentic/space-repo sp))
(agentic/session-key sp agent))))
; ---- one self-contained replay program per transition ----
; a fresh flow-run resets the flow store, so the started flow is always id 1
(define
agentic/session-program
(fn
(defs name input resumes)
(str
defs
"\n"
"(define s0 (flow/start "
name
" "
(agentic/scm-lit input)
"))\n"
(join
"\n"
(map
(fn (v) (str "(flow/resume 1 " (agentic/scm-lit v) ")"))
resumes))
"\n(list (flow/status 1) (flow/pending) (flow/result 1))")))
; replay the record, derive {:status :tag/:result}, persist record+state
(define
agentic/session-transition!
(fn
(sp agent record)
(let
((defs (agentic/session-def sp (get record :flow))))
(if
(nil? defs)
{:flow (get record :flow) :error "no-such-session-flow"}
(let
((out (flow-run (agentic/session-program defs (get record :flow) (get record :input) (get record :resumes)))))
(let
((status (agentic/scm-out (nth out 0)))
(pending (agentic/scm-out (nth out 1))))
(let
((state (cond ((= status "done") {:status "done" :result (agentic/scm-out (nth out 2))}) ((= status "suspended") {:tag (nth (nth pending 0) 1) :status "suspended"}) (else {:status status}))))
(begin
(persist/kv-put
(git/repo-db (agentic/space-repo sp))
(agentic/session-key sp agent)
(merge {:flow (get record :flow) :resumes (get record :resumes) :input (get record :input)} state))
state))))))))
; ---- lifecycle ----
(define
agentic/session-start!
(fn
(sp agent flow-name input)
(if
(nil? (agentic/head sp agent))
{:agent agent :error "no-such-agent"}
(let
((state (agentic/session-transition! sp agent {:flow flow-name :resumes (list) :input input})))
(begin
(if
(has-key? state :error)
nil
(agentic/trace! sp agent "session" (str "start " flow-name)))
state)))))
(define
agentic/session-resume!
(fn
(sp agent value)
(let
((rec (agentic/session-record sp agent)))
(cond
((nil? rec) {:agent agent :error "no-session"})
((not (= (get rec :status) "suspended")) {:agent agent :error "not-suspended"})
(else
(let
((state (agentic/session-transition! sp agent {:flow (get rec :flow) :resumes (append (get rec :resumes) (list value)) :input (get rec :input)})))
(begin
(if
(has-key? state :error)
nil
(agentic/trace!
sp
agent
"session"
(str "resume " (agentic/scm-lit value))))
state)))))))
(define
agentic/session-status
(fn
(sp agent)
(let
((r (agentic/session-record sp agent)))
(if (nil? r) "none" (get r :status)))))
(define
agentic/session-pending
(fn
(sp agent)
(let
((r (agentic/session-record sp agent)))
(if
(and (dict? r) (= (get r :status) "suspended"))
(get r :tag)
nil))))
(define
agentic/session-result
(fn
(sp agent)
(let
((r (agentic/session-record sp agent)))
(if (and (dict? r) (= (get r :status) "done")) (get r :result) nil))))
; ---- fork-an-agent-run: copy the record, replay rebuilds the run ----
; to-agent must already be spawned (branch fork) and session-free
(define
agentic/session-fork!
(fn
(sp from-agent to-agent)
(let
((rec (agentic/session-record sp from-agent)))
(cond
((nil? rec) {:agent from-agent :error "no-session"})
((nil? (agentic/head sp to-agent)) {:agent to-agent :error "no-such-agent"})
((not (nil? (agentic/session-record sp to-agent))) {:agent to-agent :error "session-exists"})
(else
(let
((state (agentic/session-transition! sp to-agent {:flow (get rec :flow) :resumes (get rec :resumes) :input (get rec :input)})))
(begin
(if
(has-key? state :error)
nil
(agentic/trace!
sp
to-agent
"session"
(str "fork " from-agent)))
state)))))))
; ---- effect-as-data helpers over (request kind payload) envelopes ----
(define
agentic/effect-request?
(fn
(tag)
(and
(list? tag)
(= (len tag) 3)
(= (nth tag 0) "flow-request"))))
(define
agentic/effect-kind
(fn (tag) (if (agentic/effect-request? tag) (nth tag 1) nil)))
(define
agentic/effect-payload
(fn (tag) (if (agentic/effect-request? tag) (nth tag 2) nil)))

View File

@@ -2,9 +2,10 @@
"suites": { "suites": {
"schema": {"pass": 65, "fail": 0}, "schema": {"pass": 65, "fail": 0},
"branch": {"pass": 53, "fail": 0}, "branch": {"pass": 53, "fail": 0},
"trace": {"pass": 35, "fail": 0} "trace": {"pass": 35, "fail": 0},
"durable": {"pass": 43, "fail": 0}
}, },
"total_pass": 153, "total_pass": 196,
"total_fail": 0, "total_fail": 0,
"total": 153 "total": 196
} }

View File

@@ -7,4 +7,5 @@ _Generated by `lib/agentic/conformance.sh`_
| schema | 65 | 0 | 65 | | schema | 65 | 0 | 65 |
| branch | 53 | 0 | 53 | | branch | 53 | 0 | 53 |
| trace | 35 | 0 | 35 | | trace | 35 | 0 | 35 |
| **Total** | **153** | **0** | **153** | | durable | 43 | 0 | 43 |
| **Total** | **196** | **0** | **196** |

View File

@@ -0,0 +1,281 @@
; Phase 4 — durable: agent sessions as durable flow workflows. Fixture story:
; worker-1 runs the two-suspend "triage" flow to completion; worker-1b proves
; restart-safety (fresh space handles over the same backend, resume across
; the restart); worker-2's mid-flight run is FORKED to worker-2b and the two
; replays diverge; worker-3 exercises typed (request kind payload) effects
; and the trace-buffer composition with Phase 3.
; NOTE: numbers computed inside the guest are compared with = (numeric
; equality), not equal? — guest numerics box differently at the boundary.
(define agd-db (persist/mem-backend))
(define agd-sp (agentic/space agd-db "agentic-durable-test"))
(define
agd-a
(agentic/spawn!
agd-sp
"worker-1"
(agentic/briefing "long task" "run a durable session" {})))
(define
agd-b
(agentic/spawn-from!
agd-sp
"worker-1b"
(agentic/briefing "second worker" "restart survivor" {})
"worker-1"))
(agentic/defsession!
agd-sp
"triage"
"(defflow triage (sequence (lambda (x) (+ x (suspend (quote ask-priority)))) (lambda (y) (* y (suspend (quote ask-factor))))))")
; ---- literals across the guest boundary ----
(agentic-test "scm-lit numbers" (agentic/scm-lit 42) "42")
(agentic-test "scm-lit strings" (agentic/scm-lit "hi") "\"hi\"")
(agentic-test
"scm-lit lists nest"
(agentic/scm-lit (list 1 "a"))
"(list 1 \"a\")")
(agentic-test
"scm-out unboxes scheme strings"
(agentic/scm-out {:scm-string "x"})
"x")
; ---- lifecycle: start / suspend / resume / done ----
(agentic-test
"session flow source is durable"
(starts-with? (agentic/session-def agd-sp "triage") "(defflow")
true)
(agentic-test
"no session before start"
(agentic/session-status agd-sp "worker-1")
"none")
(define agd-s1 (agentic/session-start! agd-sp "worker-1" "triage" 10))
(agentic-test
"start suspends at the first effect"
(get agd-s1 :status)
"suspended")
(agentic-test "the suspend tag is data" (get agd-s1 :tag) "ask-priority")
(agentic-test
"session-status tracks the suspension"
(agentic/session-status agd-sp "worker-1")
"suspended")
(agentic-test
"session-pending exposes the tag"
(agentic/session-pending agd-sp "worker-1")
"ask-priority")
(agentic-test
"start on unknown agent fails"
(get (agentic/session-start! agd-sp "ghost" "triage" 1) :error)
"no-such-agent")
(agentic-test
"start with unknown flow fails"
(get
(agentic/session-start! agd-sp "worker-1b" "frobnicate" 1)
:error)
"no-such-session-flow")
(agentic-test
"a failed start leaves no session"
(agentic/session-status agd-sp "worker-1b")
"none")
(define agd-s2 (agentic/session-resume! agd-sp "worker-1" 5))
(agentic-test
"resume replays to the next effect"
(get agd-s2 :tag)
"ask-factor")
(agentic-test
"resume on session-less agent fails"
(get (agentic/session-resume! agd-sp "worker-1b" 1) :error)
"no-session")
(define agd-s3 (agentic/session-resume! agd-sp "worker-1" 3))
(agentic-test
"final resume completes the session"
(get agd-s3 :status)
"done")
(agentic-test
"deterministic replay computes the result"
(= (get agd-s3 :result) 45)
true)
(agentic-test
"session-status done"
(agentic/session-status agd-sp "worker-1")
"done")
(agentic-test
"session-result reads back"
(= (agentic/session-result agd-sp "worker-1") 45)
true)
(agentic-test
"resume after done fails"
(get (agentic/session-resume! agd-sp "worker-1" 9) :error)
"not-suspended")
(agentic-test
"the record keeps the full replay history"
(=
(get (agentic/session-record agd-sp "worker-1") :resumes)
(list 5 3))
true)
; ---- restart: a fresh space handle over the same backend ----
(define agd-sp2 (agentic/space agd-db "agentic-durable-test"))
(agentic-test
"restart sees the finished session"
(agentic/session-status agd-sp2 "worker-1")
"done")
(agentic-test
"restart sees the result"
(= (agentic/session-result agd-sp2 "worker-1") 45)
true)
(define
agd-s4
(agentic/session-start! agd-sp "worker-1b" "triage" 100))
(define agd-sp3 (agentic/space agd-db "agentic-durable-test"))
(agentic-test
"restart mid-flight stays suspended"
(agentic/session-status agd-sp3 "worker-1b")
"suspended")
(agentic-test
"resume across the restart replays deterministically"
(get (agentic/session-resume! agd-sp3 "worker-1b" 2) :tag)
"ask-factor")
(agentic-test
"the resumed run completes across the restart"
(=
(get (agentic/session-resume! agd-sp3 "worker-1b" 7) :result)
714)
true)
; ---- fork-an-agent-run: copy the record, replays diverge ----
(define
agd-w2
(agentic/spawn!
agd-sp
"worker-2"
(agentic/briefing "explore" "mainline run" {})))
(define
agd-w2b
(agentic/spawn-from!
agd-sp
"worker-2b"
(agentic/briefing "explore alt" "forked run" {})
"worker-2"))
(define agd-f0 (agentic/session-start! agd-sp "worker-2" "triage" 10))
(define agd-f1 (agentic/session-resume! agd-sp "worker-2" 5))
(define agd-fork (agentic/session-fork! agd-sp "worker-2" "worker-2b"))
(agentic-test
"fork replays to the same suspended state"
(get agd-fork :tag)
"ask-factor")
(agentic-test
"forked session is live"
(agentic/session-status agd-sp "worker-2b")
"suspended")
(agentic-test
"forked history is copied"
(=
(get (agentic/session-record agd-sp "worker-2b") :resumes)
(list 5))
true)
(agentic-test
"mainline resumes its own way"
(=
(get (agentic/session-resume! agd-sp "worker-2" 3) :result)
45)
true)
(agentic-test
"fork diverges independently"
(=
(get (agentic/session-resume! agd-sp "worker-2b" 100) :result)
1500)
true)
(agentic-test
"the fork's divergence never touches the mainline"
(= (agentic/session-result agd-sp "worker-2") 45)
true)
(agentic-test
"fork needs an existing session"
(get (agentic/session-fork! agd-sp "worker-1x" "worker-2b") :error)
"no-session")
(agentic-test
"fork target must be spawned"
(get (agentic/session-fork! agd-sp "worker-2" "ghost") :error)
"no-such-agent")
(agentic-test
"fork refuses to clobber a session"
(get (agentic/session-fork! agd-sp "worker-2" "worker-2b") :error)
"session-exists")
; ---- typed effects: (request kind payload) envelopes as data ----
(agentic/defsession!
agd-sp
"review-loop"
"(defflow review-loop (sequence (lambda (x) (await-human (list (quote approve?) x))) (branch (lambda (d) (eq? d 1)) (flow-const (quote shipped)) (flow-const (quote parked)))))")
(define
agd-w3
(agentic/spawn!
agd-sp
"worker-3"
(agentic/briefing "ship it" "review then ship" {})))
(define
agd-r1
(agentic/session-start! agd-sp "worker-3" "review-loop" 7))
(agentic-test
"request effects are typed envelopes"
(agentic/effect-request? (get agd-r1 :tag))
true)
(agentic-test "effect kind" (agentic/effect-kind (get agd-r1 :tag)) "human")
(agentic-test
"effect payload"
(=
(agentic/effect-payload (get agd-r1 :tag))
(list "approve?" 7))
true)
(agentic-test
"plain tags are not request envelopes"
(agentic/effect-request? "ask-priority")
false)
(agentic-test
"the human decision resumes the session"
(get (agentic/session-resume! agd-sp "worker-3" 1) :result)
"shipped")
; ---- composition with Phase 3: transitions ride the trace buffer ----
(agentic-test
"session transitions land in the trace buffer"
(len
(filter
(fn (e) (= (get e :kind) "session"))
(agentic/trace-pending agd-sp "worker-3")))
2)
(define
agd-c
(agentic/commit-with-trace!
agd-sp
"worker-3"
"decision"
(assoc {} "ship.md" "approved\n")
{:message "shipped"}))
(agentic-test
"the session history travels with the commit"
(len (agentic/trace-entries (agentic/trace-for agd-sp (get agd-c :cid))))
2)
(agentic-test
"the bound trace records the session start"
(get
(nth
(agentic/trace-entries (agentic/trace-for agd-sp (get agd-c :cid)))
0)
:text)
"start review-loop")