Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Failing after 50s
execute.sx folds a plan, runs each node via an injected runner (perform in prod, op-table in tests), and memoizes results in a lib/persist kv backend keyed by content-id. Incremental recompute falls out of content addressing: a leaf change reassigns ids across its dirty closure, so re-running hits the unchanged nodes and recomputes only the closure (cold 5 -> rerun 0 -> change 3). Cross-dag subgraph sharing verified. execute 15/15, total 69/69. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
83 lines
2.8 KiB
Plaintext
83 lines
2.8 KiB
Plaintext
; lib/artdag/execute.sx — Phase 4: interpret a plan with a content-addressed
|
|
; memo cache. A node's result is keyed by its content-id, so a node whose id is
|
|
; already in the cache is skipped (cache hit). Because changing a leaf changes
|
|
; the content-ids of its whole dirty closure, re-running recomputes exactly those
|
|
; nodes and cache-hits the rest — incremental recompute falls out of content
|
|
; addressing. Depends on dag.sx and plan.sx; the cache is a lib/persist/ backend.
|
|
|
|
; runner: (fn (op params input-results) -> result). The injected effect interface.
|
|
; In production this performs the op (perform -> JAX/IPFS adapter); in tests it
|
|
; dispatches a pure SX op over its already-computed input results.
|
|
|
|
; build a runner from a dict of op-name -> (fn (params inputs) -> result).
|
|
(define
|
|
artdag/op-table-runner
|
|
(fn (table) (fn (op params inputs) ((get table op) params inputs))))
|
|
|
|
; resolve an input id's result: this run's results first, then the warm cache.
|
|
(define
|
|
artdag/-input-result
|
|
(fn
|
|
(results cache in)
|
|
(if (has-key? results in) (get results in) (persist/kv-get cache in))))
|
|
|
|
(define
|
|
artdag/-exec-node
|
|
(fn
|
|
(dag runner cache acc id)
|
|
(let
|
|
((node (artdag/dag-get dag id)))
|
|
(if
|
|
(persist/kv-has? cache id)
|
|
(assoc
|
|
acc
|
|
:results (assoc (get acc :results) id (persist/kv-get cache id))
|
|
:hits (concat (get acc :hits) (list id)))
|
|
(let
|
|
((inputs (map (fn (in) (artdag/-input-result (get acc :results) cache in)) (artdag/node-inputs node))))
|
|
(let
|
|
((result (runner (artdag/node-op node) (artdag/node-params node) inputs)))
|
|
(begin
|
|
(persist/kv-put cache id result)
|
|
(assoc
|
|
acc
|
|
:results (assoc (get acc :results) id result)
|
|
:recomputed (concat (get acc :recomputed) (list id))))))))))
|
|
|
|
; execute a plan against a memo cache, returning {:results :recomputed :hits}.
|
|
(define
|
|
artdag/execute
|
|
(fn
|
|
(dag plan runner cache)
|
|
(reduce
|
|
(fn (acc id) (artdag/-exec-node dag runner cache acc id))
|
|
{:recomputed (list) :results {} :hits (list)}
|
|
(artdag/plan-flatten plan))))
|
|
|
|
; full run over every node, unlimited width.
|
|
(define
|
|
artdag/run
|
|
(fn
|
|
(dag runner cache)
|
|
(artdag/execute dag (artdag/plan dag 0) runner cache)))
|
|
|
|
; incremental run: schedule only the dirty closure of the changed nodes.
|
|
(define
|
|
artdag/run-dirty
|
|
(fn
|
|
(dag changed runner cache)
|
|
(artdag/execute
|
|
dag
|
|
(artdag/plan-dirty dag changed 0)
|
|
runner
|
|
cache)))
|
|
|
|
; ---- result inspection ----
|
|
|
|
(define artdag/result-of (fn (exec id) (get (get exec :results) id)))
|
|
(define
|
|
artdag/recomputed
|
|
(fn (exec) (artdag/sort-strings (get exec :recomputed))))
|
|
(define artdag/recompute-count (fn (exec) (len (get exec :recomputed))))
|
|
(define artdag/hit-count (fn (exec) (len (get exec :hits))))
|