Deterministic replay IS the durability mechanism: every transition re-runs a
self-contained flow program (defflow source + flow/start + replay of all
recorded resume values), so the only durable state is {:flow :input :resumes}
in persist kv — restart-safe by construction (fresh space handles over the
same backend resume mid-flight runs). fork-an-agent-run = copy the record;
the two replays diverge independently. Effects are data (suspend tags +
typed request envelopes surface as plain SX); transitions ride the Phase-3
trace buffer so session history travels with the next commit. Guest numeric
results are compared with `=` per house convention. Tests: 43/43 passing (196/196 total).
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
235 lines
7.1 KiB
Plaintext
235 lines
7.1 KiB
Plaintext
; lib/agentic/durable.sx — agentic-sx Phase 4: long agent sessions as
|
|
; DURABLE flow workflows. Deterministic replay IS the durability mechanism:
|
|
; every transition re-runs a self-contained flow program (the session's
|
|
; defflow source + flow/start + a replay of every recorded resume value),
|
|
; so the only durable state is a plain record {:flow :input :resumes ...}
|
|
; in the persist kv store — restart-safe by construction, and forking an
|
|
; agent run is literally copying the record (both replays then diverge).
|
|
; Effects are data: a suspended session exposes its suspend tag / typed
|
|
; (request kind payload) envelope to the host as plain SX values.
|
|
; Transitions also land in the agent's Phase-3 trace buffer, so the session
|
|
; history travels with the agent's next commit.
|
|
; Convention for session flows: suspend tags are quoted symbols, decision
|
|
; values are numbers/strings/lists of those (see agentic/scm-lit).
|
|
; Requires: lib/agentic/trace.sx (and its deps), lib/flow/* (+ scheme stack).
|
|
|
|
; ---- SX -> Scheme literal (numbers, strings, booleans, lists; nil = ()) ----
|
|
; Render an SX value as Scheme source text for splicing into a replay program.
;   nil            -> "(list)"
;   true / false   -> "#t" / "#f"
;   number         -> its `str` rendering
;   string         -> a double-quoted literal with \ and " escaped
;   list           -> "(list e1 e2 ...)" with each element rendered recursively
;   anything else  -> "(list)" (unsupported values collapse to the empty list)
(define agentic/scm-lit
  (fn (v)
    (cond
      ((nil? v) "(list)")
      ((= v true) "#t")
      ((= v false) "#f")
      ((number? v) (str v))
      ; Escape backslashes first, then double quotes, so the backslashes
      ; introduced by quote-escaping are not themselves doubled. Without this
      ; a value containing " would terminate the literal early and corrupt
      ; (or inject code into) the generated program.
      ((string? v)
        (str "\""
             (replace (replace v "\\" "\\\\") "\"" "\\\"")
             "\""))
      ((list? v)
        (str "(list " (join " " (map agentic/scm-lit v)) ")"))
      (else "(list)"))))
|
|
|
|
; ---- Scheme -> SX: unbox {:scm-string ...} recursively ----
|
|
; Convert a value coming back from the Scheme side into plain SX:
; a dict carrying :scm-string is replaced by that string, lists are
; converted element by element, and every other value passes through.
(define agentic/scm-out
  (fn (v)
    (if (and (dict? v) (has-key? v :scm-string))
      (get v :scm-string)
      (if (list? v)
        (map (fn (item) (agentic/scm-out item)) v)
        v))))
|
|
|
|
; ---- kv keys (namespaced under the repo prefix) ----
|
|
; kv key for a named session flow definition:
; <repo :prefix>/session-def/<name>
(define agentic/session-def-key
  (fn (sp name)
    (let ((prefix (get (agentic/space-repo sp) :prefix)))
      (str prefix "/session-def/" name))))
|
|
|
|
; kv key for an agent's session record:
; <repo :prefix>/session/<agent>
(define agentic/session-key
  (fn (sp agent)
    (let ((prefix (get (agentic/space-repo sp) :prefix)))
      (str prefix "/session/" agent))))
|
|
|
|
; ---- durable session flow definitions ----
|
|
; Store the Scheme source of a session flow under its definition key in the
; repo's kv store. Returns `name`.
(define agentic/defsession!
  (fn (sp name scheme-src)
    (let ((db (git/repo-db (agentic/space-repo sp)))
          (key (agentic/session-def-key sp name)))
      (begin
        (persist/kv-put db key scheme-src)
        name))))
|
|
|
|
; Look up the stored source of the session flow `name` in the repo's kv
; store; the result is whatever persist/kv-get yields for that key.
(define agentic/session-def
  (fn (sp name)
    (let ((db (git/repo-db (agentic/space-repo sp)))
          (key (agentic/session-def-key sp name)))
      (persist/kv-get db key))))
|
|
|
|
; Fetch the persisted session record for `agent` from the repo's kv store;
; the result is whatever persist/kv-get yields for the agent's session key.
(define agentic/session-record
  (fn (sp agent)
    (let ((db (git/repo-db (agentic/space-repo sp)))
          (key (agentic/session-key sp agent)))
      (persist/kv-get db key))))
|
|
|
|
; ---- one self-contained replay program per transition ----
|
|
; a fresh flow-run resets the flow store, so the started flow is always id 1
|
|
; Assemble the replay program text for one transition:
;   1. the flow definitions (`defs`),
;   2. a (flow/start <name> <input>) bound to s0,
;   3. one (flow/resume 1 <value>) per recorded resume value, in order,
;   4. a final expression listing status, pending and result of flow 1.
; `input` and each resume value are rendered with agentic/scm-lit.
(define agentic/session-program
  (fn (defs name input resumes)
    (let ((start-form
            (str "(define s0 (flow/start " name " " (agentic/scm-lit input) "))\n"))
          (resume-forms
            (join "\n"
                  (map (fn (val) (str "(flow/resume 1 " (agentic/scm-lit val) ")"))
                       resumes))))
      (str defs
           "\n"
           start-form
           resume-forms
           "\n(list (flow/status 1) (flow/pending) (flow/result 1))"))))
|
|
|
|
; replay the record, derive {:status :tag/:result}, persist record+state
|
|
; Replay `record` ({:flow :input :resumes}) and persist the outcome.
; If the flow has no stored definition, returns
; {:flow <name> :error "no-such-session-flow"} and writes nothing.
; Otherwise runs the replay program and derives a state dict:
;   status "done"      -> {:status "done" :result <third element of the output>}
;   status "suspended" -> {:tag <second element of the first pending entry>
;                          :status "suspended"}
;   anything else      -> {:status <status>}
; The record merged with that state is stored under the agent's session key,
; and the state is returned.
(define agentic/session-transition!
  (fn (sp agent record)
    (let ((defs (agentic/session-def sp (get record :flow))))
      (if (not (nil? defs))
        (let ((out (flow-run
                     (agentic/session-program
                       defs
                       (get record :flow)
                       (get record :input)
                       (get record :resumes)))))
          (let ((status (agentic/scm-out (nth out 0)))
                (pending (agentic/scm-out (nth out 1))))
            (let ((state
                    (if (= status "done")
                      {:status "done" :result (agentic/scm-out (nth out 2))}
                      (if (= status "suspended")
                        {:tag (nth (nth pending 0) 1) :status "suspended"}
                        {:status status}))))
              (begin
                (persist/kv-put
                  (git/repo-db (agentic/space-repo sp))
                  (agentic/session-key sp agent)
                  (merge
                    {:flow (get record :flow) :resumes (get record :resumes) :input (get record :input)}
                    state))
                state))))
        {:flow (get record :flow) :error "no-such-session-flow"}))))
|
|
|
|
; ---- lifecycle ----
|
|
; Begin a session for `agent` running `flow-name` on `input`, with no resume
; values yet. Returns {:agent <agent> :error "no-such-agent"} when the agent
; has no head; otherwise returns the state from the first transition and, if
; that state carries no :error, records a "start <flow-name>" trace entry.
(define agentic/session-start!
  (fn (sp agent flow-name input)
    (if (not (nil? (agentic/head sp agent)))
      (let ((fresh {:flow flow-name :resumes (list) :input input}))
        (let ((state (agentic/session-transition! sp agent fresh)))
          (begin
            (if (not (has-key? state :error))
              (agentic/trace! sp agent "session" (str "start " flow-name))
              nil)
            state)))
      {:agent agent :error "no-such-agent"})))
|
|
|
|
; Feed `value` to the agent's suspended session.
; Errors (returned as dicts, nothing written):
;   no stored record             -> {:agent <agent> :error "no-session"}
;   record status not suspended  -> {:agent <agent> :error "not-suspended"}
; Otherwise appends `value` to the recorded resumes, replays via
; agentic/session-transition!, and returns the resulting state. A
; "resume <literal>" trace entry is recorded unless that state has an :error.
(define agentic/session-resume!
  (fn (sp agent value)
    (let ((rec (agentic/session-record sp agent)))
      (if (nil? rec)
        {:agent agent :error "no-session"}
        (if (not (= (get rec :status) "suspended"))
          {:agent agent :error "not-suspended"}
          (let ((next-rec {:flow (get rec :flow) :resumes (append (get rec :resumes) (list value)) :input (get rec :input)}))
            (let ((state (agentic/session-transition! sp agent next-rec)))
              (begin
                (if (not (has-key? state :error))
                  (agentic/trace! sp agent "session" (str "resume " (agentic/scm-lit value)))
                  nil)
                state))))))))
|
|
|
|
; The :status of the agent's stored session record, or "none" when there is
; no record.
(define agentic/session-status
  (fn (sp agent)
    (let ((rec (agentic/session-record sp agent)))
      (if (not (nil? rec))
        (get rec :status)
        "none"))))
|
|
|
|
; The :tag of the agent's session record when that record is a dict whose
; status is "suspended"; nil in every other case.
(define agentic/session-pending
  (fn (sp agent)
    (let ((rec (agentic/session-record sp agent)))
      (if (not (dict? rec))
        nil
        (if (= (get rec :status) "suspended")
          (get rec :tag)
          nil)))))
|
|
|
|
; The :result of the agent's session record when that record is a dict whose
; status is "done"; nil in every other case.
(define agentic/session-result
  (fn (sp agent)
    (let ((rec (agentic/session-record sp agent)))
      (if (not (dict? rec))
        nil
        (if (= (get rec :status) "done")
          (get rec :result)
          nil)))))
|
|
|
|
; ---- fork-an-agent-run: copy the record, replay rebuilds the run ----
|
|
; to-agent must already be spawned (branch fork) and session-free
|
|
; Copy from-agent's session (flow, input, resumes) onto to-agent by replaying
; it under to-agent's session key. Guards are checked in this order:
;   from-agent has no record       -> {:agent <from-agent> :error "no-session"}
;   to-agent has no head           -> {:agent <to-agent> :error "no-such-agent"}
;   to-agent already has a record  -> {:agent <to-agent> :error "session-exists"}
; On success returns the replayed state; a "fork <from-agent>" trace entry is
; recorded on to-agent unless that state has an :error.
(define agentic/session-fork!
  (fn (sp from-agent to-agent)
    (let ((rec (agentic/session-record sp from-agent)))
      (if (nil? rec)
        {:agent from-agent :error "no-session"}
        (if (nil? (agentic/head sp to-agent))
          {:agent to-agent :error "no-such-agent"}
          (if (not (nil? (agentic/session-record sp to-agent)))
            {:agent to-agent :error "session-exists"}
            (let ((copied {:flow (get rec :flow) :resumes (get rec :resumes) :input (get rec :input)}))
              (let ((state (agentic/session-transition! sp to-agent copied)))
                (begin
                  (if (not (has-key? state :error))
                    (agentic/trace! sp to-agent "session" (str "fork " from-agent))
                    nil)
                  state)))))))))
|
|
|
|
; ---- effect-as-data helpers over (request kind payload) envelopes ----
|
|
; True when `tag` is a three-element list whose first element equals
; "flow-request"; false otherwise.
(define agentic/effect-request?
  (fn (tag)
    (if (list? tag)
      (if (= (len tag) 3)
        (= (nth tag 0) "flow-request")
        false)
      false)))
|
|
|
|
; Second element of a request envelope (its kind), or nil when `tag` is not
; a request envelope according to agentic/effect-request?.
(define agentic/effect-kind
  (fn (tag)
    (if (not (agentic/effect-request? tag))
      nil
      (nth tag 1))))
|
|
|
|
; Third element of a request envelope (its payload), or nil when `tag` is
; not a request envelope according to agentic/effect-request?.
(define agentic/effect-payload
  (fn (tag)
    (if (not (agentic/effect-request? tag))
      nil
      (nth tag 2))))