artdag: Phase 4 Execute — content-addressed memo + incremental recompute + 15 tests
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 50s
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 50s
execute.sx folds a plan, runs each node via an injected runner (perform in prod, op-table in tests), and memoizes results in a lib/persist kv backend keyed by content-id. Incremental recompute falls out of content addressing: a leaf change reassigns ids across its dirty closure, so re-running hits the unchanged nodes and recomputes only the closure (cold 5 -> rerun 0 -> change 3). Cross-dag subgraph sharing verified. execute 15/15, total 69/69. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
82
lib/artdag/execute.sx
Normal file
82
lib/artdag/execute.sx
Normal file
@@ -0,0 +1,82 @@
|
||||
; lib/artdag/execute.sx — Phase 4: interpret a plan with a content-addressed
|
||||
; memo cache. A node's result is keyed by its content-id, so a node whose id is
|
||||
; already in the cache is skipped (cache hit). Because changing a leaf changes
|
||||
; the content-ids of its whole dirty closure, re-running recomputes exactly those
|
||||
; nodes and cache-hits the rest — incremental recompute falls out of content
|
||||
; addressing. Depends on dag.sx and plan.sx; the cache is a lib/persist/ backend.
|
||||
|
||||
; runner: (fn (op params input-results) -> result). The injected effect interface.
|
||||
; In production this performs the op (perform -> JAX/IPFS adapter); in tests it
|
||||
; dispatches a pure SX op over its already-computed input results.
|
||||
|
||||
; build a runner from a dict of op-name -> (fn (params inputs) -> result).
|
||||
(define
|
||||
artdag/op-table-runner
|
||||
(fn (table) (fn (op params inputs) ((get table op) params inputs))))
|
||||
|
||||
; resolve an input id's result: this run's results first, then the warm cache.
|
||||
(define
|
||||
artdag/-input-result
|
||||
(fn
|
||||
(results cache in)
|
||||
(if (has-key? results in) (get results in) (persist/kv-get cache in))))
|
||||
|
||||
(define
|
||||
artdag/-exec-node
|
||||
(fn
|
||||
(dag runner cache acc id)
|
||||
(let
|
||||
((node (artdag/dag-get dag id)))
|
||||
(if
|
||||
(persist/kv-has? cache id)
|
||||
(assoc
|
||||
acc
|
||||
:results (assoc (get acc :results) id (persist/kv-get cache id))
|
||||
:hits (concat (get acc :hits) (list id)))
|
||||
(let
|
||||
((inputs (map (fn (in) (artdag/-input-result (get acc :results) cache in)) (artdag/node-inputs node))))
|
||||
(let
|
||||
((result (runner (artdag/node-op node) (artdag/node-params node) inputs)))
|
||||
(begin
|
||||
(persist/kv-put cache id result)
|
||||
(assoc
|
||||
acc
|
||||
:results (assoc (get acc :results) id result)
|
||||
:recomputed (concat (get acc :recomputed) (list id))))))))))
|
||||
|
||||
; execute a plan against a memo cache, returning {:results :recomputed :hits}.
|
||||
(define
|
||||
artdag/execute
|
||||
(fn
|
||||
(dag plan runner cache)
|
||||
(reduce
|
||||
(fn (acc id) (artdag/-exec-node dag runner cache acc id))
|
||||
{:recomputed (list) :results {} :hits (list)}
|
||||
(artdag/plan-flatten plan))))
|
||||
|
||||
; full run over every node, unlimited width.
|
||||
(define
|
||||
artdag/run
|
||||
(fn
|
||||
(dag runner cache)
|
||||
(artdag/execute dag (artdag/plan dag 0) runner cache)))
|
||||
|
||||
; incremental run: schedule only the dirty closure of the changed nodes.
|
||||
(define
|
||||
artdag/run-dirty
|
||||
(fn
|
||||
(dag changed runner cache)
|
||||
(artdag/execute
|
||||
dag
|
||||
(artdag/plan-dirty dag changed 0)
|
||||
runner
|
||||
cache)))
|
||||
|
||||
; ---- result inspection ----
|
||||
|
||||
(define artdag/result-of (fn (exec id) (get (get exec :results) id)))
|
||||
(define
|
||||
artdag/recomputed
|
||||
(fn (exec) (artdag/sort-strings (get exec :recomputed))))
|
||||
(define artdag/recompute-count (fn (exec) (len (get exec :recomputed))))
|
||||
(define artdag/hit-count (fn (exec) (len (get exec :hits))))
|
||||
Reference in New Issue
Block a user