; lib/agentic/durable.sx — agentic-sx Phase 4: long agent sessions as
; DURABLE flow workflows. Deterministic replay IS the durability mechanism:
; every transition re-runs a self-contained flow program (the session's
; defflow source + flow/start + a replay of every recorded resume value),
; so the only durable state is a plain record {:flow :input :resumes ...}
; in the persist kv store — restart-safe by construction, and forking an
; agent run is literally copying the record (both replays then diverge).
; Effects are data: a suspended session exposes its suspend tag / typed
; (request kind payload) envelope to the host as plain SX values.
; Transitions also land in the agent's Phase-3 trace buffer, so the session
; history travels with the agent's next commit.
; Convention for session flows: suspend tags are quoted symbols, decision
; values are numbers/strings/lists of those (see agentic/scm-lit).
; Requires: lib/agentic/trace.sx (and its deps), lib/flow/* (+ scheme stack).

; ---- SX -> Scheme literal (numbers, strings, booleans, lists; nil = ()) ----

; Escape a string for use inside a double-quoted Scheme string literal.
; Backslashes are doubled FIRST so the backslashes introduced for quotes
; are not themselves re-escaped.
; NOTE(review): assumes the Scheme reader accepts \" and \\ escapes — confirm.
(define
  agentic/scm-escape
  (fn (s) (replace (replace s "\\" "\\\\") "\"" "\\\"")))

; Render an SX value as Scheme source text.
;   nil          -> "(list)"
;   true / false -> "#t" / "#f"
;   number       -> (str v)
;   string       -> double-quoted, with embedded \ and " escaped
;   list         -> "(list e1 e2 ...)", elements rendered recursively
;   anything else-> "(list)"
; The string branch must escape: the result is spliced verbatim into the
; replay program built by agentic/session-program, so an unescaped quote
; would terminate the literal early and corrupt (or inject into) the program.
; NOTE(review): unsupported values (dicts, symbols, ...) silently become the
; empty list — confirm that is the intended fallback.
(define
  agentic/scm-lit
  (fn
    (v)
    (cond
      ((nil? v) "(list)")
      ((= v true) "#t")
      ((= v false) "#f")
      ((number? v) (str v))
      ((string? v) (str "\"" (agentic/scm-escape v) "\""))
      ((list? v) (str "(list " (join " " (map agentic/scm-lit v)) ")"))
      (else "(list)"))))

; ---- Scheme -> SX: unbox {:scm-string ...} recursively ----

; A dict carrying :scm-string is replaced by that key's value; lists are
; mapped element-wise; every other value is returned unchanged.
(define
  agentic/scm-out
  (fn
    (v)
    (cond
      ((and (dict? v) (has-key? v :scm-string)) (get v :scm-string))
      ((list? v) (map agentic/scm-out v))
      (else v))))

; ---- kv keys (namespaced under the repo prefix) ----

; Key under which the Scheme source of session flow `name` is stored.
(define
  agentic/session-def-key
  (fn
    (sp name)
    (str (get (agentic/space-repo sp) :prefix) "/session-def/" name)))

; Key under which `agent`'s session record is stored.
(define
  agentic/session-key
  (fn
    (sp agent)
    (str (get (agentic/space-repo sp) :prefix) "/session/" agent)))

; ---- durable session flow definitions ----

; Store `scheme-src` as the definition of session flow `name`; returns `name`.
(define
  agentic/defsession!
  (fn
    (sp name scheme-src)
    (begin
      (persist/kv-put
        (git/repo-db (agentic/space-repo sp))
        (agentic/session-def-key sp name)
        scheme-src)
      name)))

; Fetch the stored source for session flow `name`
; (agentic/session-transition! treats a nil result as "no such flow").
(define
  agentic/session-def
  (fn
    (sp name)
    (persist/kv-get
      (git/repo-db (agentic/space-repo sp))
      (agentic/session-def-key sp name))))

; Fetch `agent`'s persisted session record (callers treat nil as "no session").
(define
  agentic/session-record
  (fn
    (sp agent)
    (persist/kv-get
      (git/repo-db (agentic/space-repo sp))
      (agentic/session-key sp agent))))

; ---- one self-contained replay program per transition ----
; a fresh flow-run resets the flow store, so the started flow is always id 1

; Build the replay program text: the flow definitions, a flow/start of
; `name` with the literal `input`, one (flow/resume 1 <literal>) line per
; recorded resume value, and a final expression yielding
; (status pending result) for flow 1.
; NOTE(review): `name` is spliced in as-is (not passed through
; agentic/scm-lit) — presumably always a trusted flow identifier; confirm.
(define
  agentic/session-program
  (fn
    (defs name input resumes)
    (str
      defs
      "\n"
      "(define s0 (flow/start "
      name
      " "
      (agentic/scm-lit input)
      "))\n"
      (join
        "\n"
        (map
          (fn (v) (str "(flow/resume 1 " (agentic/scm-lit v) ")"))
          resumes))
      "\n(list (flow/status 1) (flow/pending) (flow/result 1))")))

; replay the record, derive {:status :tag/:result}, persist record+state
;
; Returns {:flow ... :error "no-such-session-flow"} (persisting nothing)
; when the record's flow has no stored definition. Otherwise runs the replay
; program and returns the derived state:
;   status "done"      -> {:status "done" :result <unboxed result>}
;   status "suspended" -> {:status "suspended" :tag <tag>}
;   any other status   -> {:status status}
; The record's :flow/:resumes/:input merged with that state is written under
; the agent's session key before the state is returned.
(define
  agentic/session-transition!
  (fn
    (sp agent record)
    (let
      ((defs (agentic/session-def sp (get record :flow))))
      (if
        (nil? defs)
        {:flow (get record :flow) :error "no-such-session-flow"}
        (let
          ((out
             (flow-run
               (agentic/session-program
                 defs
                 (get record :flow)
                 (get record :input)
                 (get record :resumes)))))
          (let
            ((status (agentic/scm-out (nth out 0)))
             (pending (agentic/scm-out (nth out 1))))
            (let
              ((state
                 (cond
                   ((= status "done")
                    {:status "done" :result (agentic/scm-out (nth out 2))})
                   ; The tag is element 1 of the first pending entry.
                   ; NOTE(review): assumes `pending` is non-empty whenever
                   ; the status is "suspended" — confirm.
                   ((= status "suspended")
                    {:tag (nth (nth pending 0) 1) :status "suspended"})
                   (else {:status status}))))
              (begin
                (persist/kv-put
                  (git/repo-db (agentic/space-repo sp))
                  (agentic/session-key sp agent)
                  (merge
                    {:flow (get record :flow)
                     :resumes (get record :resumes)
                     :input (get record :input)}
                    state))
                state))))))))

; ---- lifecycle ----

; Start flow `flow-name` with `input` as `agent`'s session.
; Returns {:agent ... :error "no-such-agent"} when the agent has no head;
; otherwise returns the transition state, and records a "start <flow-name>"
; trace entry unless that state carries :error.
(define
  agentic/session-start!
  (fn
    (sp agent flow-name input)
    (if
      (nil? (agentic/head sp agent))
      {:agent agent :error "no-such-agent"}
      (let
        ((state
           (agentic/session-transition!
             sp
             agent
             {:flow flow-name :resumes (list) :input input})))
        (begin
          (if
            (has-key? state :error)
            nil
            (agentic/trace! sp agent "session" (str "start " flow-name)))
          state)))))

; Resume `agent`'s suspended session with `value`.
; Errors: "no-session" when no record exists, "not-suspended" when the
; record's :status is not "suspended". Otherwise `value` is appended to the
; recorded resumes, the session is replayed, and a "resume <literal>" trace
; entry is recorded unless the resulting state carries :error.
(define
  agentic/session-resume!
  (fn
    (sp agent value)
    (let
      ((rec (agentic/session-record sp agent)))
      (cond
        ((nil? rec) {:agent agent :error "no-session"})
        ((not (= (get rec :status) "suspended"))
         {:agent agent :error "not-suspended"})
        (else
          (let
            ((state
               (agentic/session-transition!
                 sp
                 agent
                 {:flow (get rec :flow)
                  :resumes (append (get rec :resumes) (list value))
                  :input (get rec :input)})))
            (begin
              (if
                (has-key? state :error)
                nil
                (agentic/trace!
                  sp
                  agent
                  "session"
                  (str "resume " (agentic/scm-lit value))))
              state)))))))

; The record's :status, or "none" when the agent has no session record.
(define
  agentic/session-status
  (fn
    (sp agent)
    (let
      ((r (agentic/session-record sp agent)))
      (if (nil? r) "none" (get r :status)))))

; The record's :tag while the session is suspended, else nil.
(define
  agentic/session-pending
  (fn
    (sp agent)
    (let
      ((r (agentic/session-record sp agent)))
      (if
        (and (dict? r) (= (get r :status) "suspended"))
        (get r :tag)
        nil))))

; The record's :result once the session is done, else nil.
(define
  agentic/session-result
  (fn
    (sp agent)
    (let
      ((r (agentic/session-record sp agent)))
      (if (and (dict? r) (= (get r :status) "done")) (get r :result) nil))))

; ---- fork-an-agent-run: copy the record, replay rebuilds the run ----
; to-agent must already be spawned (branch fork) and session-free

; Errors: "no-session" (from-agent has no record), "no-such-agent"
; (to-agent has no head), "session-exists" (to-agent already has a record).
; Otherwise replays from-agent's :flow/:resumes/:input under to-agent and
; records a "fork <from-agent>" trace entry on to-agent unless the resulting
; state carries :error.
(define
  agentic/session-fork!
  (fn
    (sp from-agent to-agent)
    (let
      ((rec (agentic/session-record sp from-agent)))
      (cond
        ((nil? rec) {:agent from-agent :error "no-session"})
        ((nil? (agentic/head sp to-agent))
         {:agent to-agent :error "no-such-agent"})
        ((not (nil? (agentic/session-record sp to-agent)))
         {:agent to-agent :error "session-exists"})
        (else
          (let
            ((state
               (agentic/session-transition!
                 sp
                 to-agent
                 {:flow (get rec :flow)
                  :resumes (get rec :resumes)
                  :input (get rec :input)})))
            (begin
              (if
                (has-key? state :error)
                nil
                (agentic/trace!
                  sp
                  to-agent
                  "session"
                  (str "fork " from-agent)))
              state)))))))

; ---- effect-as-data helpers over (request kind payload) envelopes ----

; True when `tag` is a 3-element list whose first element equals
; "flow-request".
(define
  agentic/effect-request?
  (fn
    (tag)
    (and (list? tag) (= (len tag) 3) (= (nth tag 0) "flow-request"))))

; Element 1 of a request envelope, or nil when `tag` is not one.
(define
  agentic/effect-kind
  (fn (tag) (if (agentic/effect-request? tag) (nth tag 1) nil)))

; Element 2 of a request envelope, or nil when `tag` is not one.
(define
  agentic/effect-payload
  (fn (tag) (if (agentic/effect-request? tag) (nth tag 2) nil)))