Files
rose-ash/lib/agentic/durable.sx
giles c66ee35010 agentic-sx Phase 4: durable — agent sessions as durable flow workflows (TDD)
Deterministic replay IS the durability mechanism: every transition re-runs a
self-contained flow program (defflow source + flow/start + replay of all
recorded resume values), so the only durable state is {:flow :input :resumes}
in persist kv — restart-safe by construction (fresh space handles over the
same backend resume mid-flight runs). fork-an-agent-run = copy the record;
the two replays diverge independently. Effects are data (suspend tags +
typed request envelopes surface as plain SX); transitions ride the Phase-3
trace buffer so session history travels with the next commit. Guest numeric
results compared with = per house convention. 43/43 (196/196 total).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-03 13:13:50 +00:00

235 lines
7.1 KiB
Plaintext

; lib/agentic/durable.sx — agentic-sx Phase 4: long agent sessions as
; DURABLE flow workflows. Deterministic replay IS the durability mechanism:
; every transition re-runs a self-contained flow program (the session's
; defflow source + flow/start + a replay of every recorded resume value),
; so the only durable state is a plain record {:flow :input :resumes ...}
; in the persist kv store — restart-safe by construction, and forking an
; agent run is literally copying the record (both replays then diverge).
; Effects are data: a suspended session exposes its suspend tag / typed
; (request kind payload) envelope to the host as plain SX values.
; Transitions also land in the agent's Phase-3 trace buffer, so the session
; history travels with the agent's next commit.
; Convention for session flows: suspend tags are quoted symbols, decision
; values are numbers/strings/lists of those (see agentic/scm-lit).
; Requires: lib/agentic/trace.sx (and its deps), lib/flow/* (+ scheme stack).
; ---- SX -> Scheme literal (numbers, strings, booleans, lists; nil = ()) ----
(define
agentic/scm-lit
(fn
(v)
(cond
((nil? v) "(list)")
((= v true) "#t")
((= v false) "#f")
((number? v) (str v))
((string? v) (str "\"" v "\""))
((list? v) (str "(list " (join " " (map agentic/scm-lit v)) ")"))
(else "(list)"))))
; ---- Scheme -> SX: unbox {:scm-string ...} recursively ----
(define
agentic/scm-out
(fn
(v)
(cond
((and (dict? v) (has-key? v :scm-string)) (get v :scm-string))
((list? v) (map agentic/scm-out v))
(else v))))
; ---- kv keys (namespaced under the repo prefix) ----
(define
agentic/session-def-key
(fn
(sp name)
(str (get (agentic/space-repo sp) :prefix) "/session-def/" name)))
(define
agentic/session-key
(fn
(sp agent)
(str (get (agentic/space-repo sp) :prefix) "/session/" agent)))
; ---- durable session flow definitions ----
(define
agentic/defsession!
(fn
(sp name scheme-src)
(begin
(persist/kv-put
(git/repo-db (agentic/space-repo sp))
(agentic/session-def-key sp name)
scheme-src)
name)))
(define
agentic/session-def
(fn
(sp name)
(persist/kv-get
(git/repo-db (agentic/space-repo sp))
(agentic/session-def-key sp name))))
(define
agentic/session-record
(fn
(sp agent)
(persist/kv-get
(git/repo-db (agentic/space-repo sp))
(agentic/session-key sp agent))))
; ---- one self-contained replay program per transition ----
; a fresh flow-run resets the flow store, so the started flow is always id 1
(define
agentic/session-program
(fn
(defs name input resumes)
(str
defs
"\n"
"(define s0 (flow/start "
name
" "
(agentic/scm-lit input)
"))\n"
(join
"\n"
(map
(fn (v) (str "(flow/resume 1 " (agentic/scm-lit v) ")"))
resumes))
"\n(list (flow/status 1) (flow/pending) (flow/result 1))")))
; replay the record, derive {:status :tag/:result}, persist record+state
(define
agentic/session-transition!
(fn
(sp agent record)
(let
((defs (agentic/session-def sp (get record :flow))))
(if
(nil? defs)
{:flow (get record :flow) :error "no-such-session-flow"}
(let
((out (flow-run (agentic/session-program defs (get record :flow) (get record :input) (get record :resumes)))))
(let
((status (agentic/scm-out (nth out 0)))
(pending (agentic/scm-out (nth out 1))))
(let
((state (cond ((= status "done") {:status "done" :result (agentic/scm-out (nth out 2))}) ((= status "suspended") {:tag (nth (nth pending 0) 1) :status "suspended"}) (else {:status status}))))
(begin
(persist/kv-put
(git/repo-db (agentic/space-repo sp))
(agentic/session-key sp agent)
(merge {:flow (get record :flow) :resumes (get record :resumes) :input (get record :input)} state))
state))))))))
; ---- lifecycle ----
(define
agentic/session-start!
(fn
(sp agent flow-name input)
(if
(nil? (agentic/head sp agent))
{:agent agent :error "no-such-agent"}
(let
((state (agentic/session-transition! sp agent {:flow flow-name :resumes (list) :input input})))
(begin
(if
(has-key? state :error)
nil
(agentic/trace! sp agent "session" (str "start " flow-name)))
state)))))
(define
agentic/session-resume!
(fn
(sp agent value)
(let
((rec (agentic/session-record sp agent)))
(cond
((nil? rec) {:agent agent :error "no-session"})
((not (= (get rec :status) "suspended")) {:agent agent :error "not-suspended"})
(else
(let
((state (agentic/session-transition! sp agent {:flow (get rec :flow) :resumes (append (get rec :resumes) (list value)) :input (get rec :input)})))
(begin
(if
(has-key? state :error)
nil
(agentic/trace!
sp
agent
"session"
(str "resume " (agentic/scm-lit value))))
state)))))))
(define
agentic/session-status
(fn
(sp agent)
(let
((r (agentic/session-record sp agent)))
(if (nil? r) "none" (get r :status)))))
(define
agentic/session-pending
(fn
(sp agent)
(let
((r (agentic/session-record sp agent)))
(if
(and (dict? r) (= (get r :status) "suspended"))
(get r :tag)
nil))))
(define
agentic/session-result
(fn
(sp agent)
(let
((r (agentic/session-record sp agent)))
(if (and (dict? r) (= (get r :status) "done")) (get r :result) nil))))
; ---- fork-an-agent-run: copy the record, replay rebuilds the run ----
; to-agent must already be spawned (branch fork) and session-free
(define
agentic/session-fork!
(fn
(sp from-agent to-agent)
(let
((rec (agentic/session-record sp from-agent)))
(cond
((nil? rec) {:agent from-agent :error "no-session"})
((nil? (agentic/head sp to-agent)) {:agent to-agent :error "no-such-agent"})
((not (nil? (agentic/session-record sp to-agent))) {:agent to-agent :error "session-exists"})
(else
(let
((state (agentic/session-transition! sp to-agent {:flow (get rec :flow) :resumes (get rec :resumes) :input (get rec :input)})))
(begin
(if
(has-key? state :error)
nil
(agentic/trace!
sp
to-agent
"session"
(str "fork " from-agent)))
state)))))))
; ---- effect-as-data helpers over (request kind payload) envelopes ----
(define
agentic/effect-request?
(fn
(tag)
(and
(list? tag)
(= (len tag) 3)
(= (nth tag 0) "flow-request"))))
(define
agentic/effect-kind
(fn (tag) (if (agentic/effect-request? tag) (nth tag 1) nil)))
(define
agentic/effect-payload
(fn (tag) (if (agentic/effect-request? tag) (nth tag 2) nil)))