rose-ash/lib/artdag/execute.sx
giles a2f4fb5e89
artdag: Phase 4 Execute — content-addressed memo + incremental recompute + 15 tests
execute.sx folds over a plan, runs each node via an injected runner (perform in
prod, an op table in tests), and memoizes results in a lib/persist kv backend
keyed by content-id. Incremental recompute falls out of content addressing:
a leaf change reassigns ids across its dirty closure, so a re-run cache-hits the
unchanged nodes and recomputes only the closure (nodes recomputed: cold 5 -> rerun 0 -> change 3).
Cross-dag subgraph sharing verified. Tests: execute 15/15, total 69/69.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 12:00:50 +00:00


; lib/artdag/execute.sx: Phase 4, interpret a plan with a content-addressed
; memo cache. A node's result is keyed by its content-id, so a node whose id is
; already in the cache is skipped (cache hit). Because changing a leaf changes
; the content-ids of its whole dirty closure, re-running recomputes exactly those
; nodes and cache-hits the rest; incremental recompute falls out of content
; addressing. Depends on dag.sx and plan.sx; the cache is a lib/persist/ backend.

; runner: (fn (op params input-results) -> result). The injected effect interface.
; In production this performs the op (perform -> JAX/IPFS adapter); in tests it
; dispatches a pure SX op over its already-computed input results.

; build a runner from a dict of op-name -> (fn (params inputs) -> result).
(define
  artdag/op-table-runner
  (fn (table) (fn (op params inputs) ((get table op) params inputs))))
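
; Usage sketch (illustrative only, kept as comments so nothing new is defined).
; Assumes op names are keywords and that a table can be written as the dict
; literal below; :sum and example-runner are hypothetical names, not part of
; artdag.
;   (define example-runner
;     (artdag/op-table-runner
;       {:sum (fn (params inputs) (reduce (fn (a b) (+ a b)) 0 inputs))}))
;   (example-runner :sum {} (list 1 2))   ; looks up :sum, then calls it with {} and (1 2)
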

; resolve an input id's result: this run's results first, then the warm cache.
(define
  artdag/-input-result
  (fn
    (results cache in)
    (if (has-key? results in) (get results in) (persist/kv-get cache in))))

; execute one node. On a cache hit, copy the cached result into this run's
; :results and record the id under :hits. On a miss, resolve the node's inputs,
; call the runner, store the result in the cache, and record the id under
; :recomputed.
(define
  artdag/-exec-node
  (fn
    (dag runner cache acc id)
    (let
      ((node (artdag/dag-get dag id)))
      (if
        (persist/kv-has? cache id)
        (assoc
          acc
          :results (assoc (get acc :results) id (persist/kv-get cache id))
          :hits (concat (get acc :hits) (list id)))
        (let
          ((inputs (map (fn (in) (artdag/-input-result (get acc :results) cache in)) (artdag/node-inputs node))))
          (let
            ((result (runner (artdag/node-op node) (artdag/node-params node) inputs)))
            (begin
              (persist/kv-put cache id result)
              (assoc
                acc
                :results (assoc (get acc :results) id result)
                :recomputed (concat (get acc :recomputed) (list id))))))))))

; execute a plan against a memo cache, returning {:results :recomputed :hits}.
(define
  artdag/execute
  (fn
    (dag plan runner cache)
    (reduce
      (fn (acc id) (artdag/-exec-node dag runner cache acc id))
      {:recomputed (list) :results {} :hits (list)}
      (artdag/plan-flatten plan))))

; full run over every node, unlimited width.
(define
  artdag/run
  (fn
    (dag runner cache)
    (artdag/execute dag (artdag/plan dag 0) runner cache)))

; incremental run: schedule only the dirty closure of the changed nodes.
(define
  artdag/run-dirty
  (fn
    (dag changed runner cache)
    (artdag/execute
      dag
      (artdag/plan-dirty dag changed 0)
      runner
      cache)))

; ---- result inspection ----
(define artdag/result-of (fn (exec id) (get (get exec :results) id)))

(define
  artdag/recomputed
  (fn (exec) (artdag/sort-strings (get exec :recomputed))))

(define artdag/recompute-count (fn (exec) (len (get exec :recomputed))))

(define artdag/hit-count (fn (exec) (len (get exec :hits))))
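
; Usage sketch (illustrative only, kept as comments). It shows how the memo
; cache is expected to behave across two full runs. Assumes `dag` was built
; with dag.sx, `cache` is an initially empty lib/persist kv backend, and
; `runner` is any (fn (op params inputs) -> result); none of those constructors
; appear in this file, and `cold` / `warm` are hypothetical names.
;   (define cold (artdag/run dag runner cache))
;   (artdag/recompute-count cold)   ; every planned node: the cache starts empty
;   (artdag/hit-count cold)         ; 0 under the same assumption
;   (define warm (artdag/run dag runner cache))
;   (artdag/recompute-count warm)   ; 0: every content-id is already cached
;   (artdag/hit-count warm)         ; one hit per id in the flattened plan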