diff --git a/lib/artdag/conformance.sh b/lib/artdag/conformance.sh
index 63c902ec..cb59c336 100755
--- a/lib/artdag/conformance.sh
+++ b/lib/artdag/conformance.sh
@@ -13,7 +13,7 @@ if [ ! -x "$SX_SERVER" ]; then
   exit 1
 fi
 
-SUITES=(dag analyze)
+SUITES=(dag analyze plan)
 
 OUT_JSON="lib/artdag/scoreboard.json"
 OUT_MD="lib/artdag/scoreboard.md"
@@ -38,6 +38,7 @@ run_suite() {
 (load "lib/datalog/api.sx")
 (load "lib/artdag/dag.sx")
 (load "lib/artdag/analyze.sx")
+(load "lib/artdag/plan.sx")
 (epoch 2)
 (eval "(define artdag-test-pass 0)")
 (eval "(define artdag-test-fail 0)")
diff --git a/lib/artdag/plan.sx b/lib/artdag/plan.sx
new file mode 100644
index 00000000..c0dde7a8
--- /dev/null
+++ b/lib/artdag/plan.sx
@@ -0,0 +1,100 @@
+; lib/artdag/plan.sx — Phase 3: schedule a DAG (or its dirty subset) into
+; topological batches under a max-parallelism cap. A batch is a set of nodes
+; whose deps are all satisfied by earlier batches, so they run in parallel.
+; cap <= 0 means unlimited width. Depends on dag.sx and analyze.sx.
+
+; inputs of id that also lie inside the scheduled set (out-of-set deps are
+; treated as already satisfied — e.g. clean cache hits in an incremental plan).
+(define
+  artdag/-deps-in ; -> list of the input ids of node id that are members of sset
+  (fn
+    (dag id sset) ; dag: graph to read; id: node whose inputs are examined; sset: ids being scheduled
+    (filter
+      (fn (in) (artdag/member? in sset)) ; drop inputs that are not being scheduled
+      (artdag/node-inputs (artdag/dag-get dag id))))) ; inputs of the node looked up under id
+
+(define
+  artdag/-ready-in ; -> one wave: the ids of sset that can be scheduled next
+  (fn
+    (dag sset placed) ; placed: ids already assigned to an earlier wave
+    (filter
+      (fn
+        (id)
+        (and
+          (not (artdag/member? id placed)) ; skip ids that are already scheduled
+          (artdag/all-in? (artdag/-deps-in dag id sset) placed))) ; in-set deps must all be placed (presumably what all-in? checks — confirm in dag.sx)
+      (artdag/sort-strings sset)))) ; candidates are visited in sort-strings order
+
+(define
+  artdag/-batch-loop ; -> batches extended by one wave per round until all of sset is placed
+  (fn
+    (dag sset placed batches) ; placed: ids scheduled so far; batches: waves built so far (plan-subset passes two empty lists)
+    (if
+      (= (len placed) (len sset)) ; finished when as many ids are placed as were requested
+      batches
+      (let
+        ((wave (artdag/-ready-in dag sset placed))) ; NOTE(review): if wave is empty (e.g. a dependency cycle inside sset) placed never grows and this recursion does not end — confirm inputs are always acyclic
+        (artdag/-batch-loop
+          dag
+          sset
+          (concat placed wave) ; the new wave counts as placed from the next round on
+          (concat batches (list wave))))))) ; append the wave as one more batch
+
+; split a wave into consecutive chunks of at most n (sorted order preserved).
+(define
+  artdag/-chunk ; -> list of sub-lists of xs, each at most n long, in the original order
+  (fn
+    (xs n) ; xs: one wave; n: max chunk length. NOTE(review): with n = 0 and non-empty xs the recursion would be handed the unchanged list (assuming slice is half-open); the only caller visible here guards with (<= cap 0)
+    (if
+      (<= (len xs) n) ; what is left fits in a single chunk
+      (list xs)
+      (cons
+        (slice xs 0 n) ; first n items form one chunk
+        (artdag/-chunk (slice xs n (len xs)) n))))) ; recurse on the remainder
+
+(define
+  artdag/-cap-split ; -> batches, with each batch replaced by its artdag/-chunk pieces when cap > 0
+  (fn
+    (batches cap)
+    (if
+      (<= cap 0) ; non-positive cap means unlimited: return batches untouched
+      batches
+      (reduce
+        (fn (acc b) (concat acc (artdag/-chunk b cap))) ; splice each batch's chunks onto the result
+        (list)
+        batches))))
+
+; schedule an explicit set of node-ids into capped topological batches.
+(define
+  artdag/plan-subset ; -> list of batches, each batch a list of node ids
+  (fn
+    (dag node-ids cap) ; node-ids: the ids to schedule; cap: max batch width, <= 0 for no limit
+    (artdag/-cap-split (artdag/-batch-loop dag node-ids (list) (list)) cap))) ; build uncapped waves first, then split the wide ones
+
+; full plan over every node in the dag.
+(define
+  artdag/plan
+  (fn (dag cap) (artdag/plan-subset dag (keys (artdag/dag-nodes dag)) cap))) ; schedules every key of (artdag/dag-nodes dag)
+
+; incremental plan: schedule only the dirty closure of the changed nodes.
+(define
+  artdag/plan-dirty
+  (fn
+    (dag changed cap) ; changed: ids handed to artdag/dirty-closure; cap: as in plan-subset
+    (artdag/plan-subset dag (artdag/dirty-closure dag changed) cap))) ; dirty-closure is defined outside this file
+
+; ---- plan inspection ----
+
+(define artdag/plan-batches (fn (plan) (len plan))) ; number of batches in plan
+
+(define
+  artdag/plan-width ; length of the longest batch; 0 for an empty plan
+  (fn
+    (plan)
+    (reduce (fn (m b) (if (> (len b) m) (len b) m)) 0 plan))) ; running maximum, seeded with 0
+
+(define
+  artdag/plan-flatten ; all batches concatenated into one list, batch order kept
+  (fn (plan) (reduce (fn (acc b) (concat acc b)) (list) plan)))
+
+(define artdag/plan-size (fn (plan) (len (artdag/plan-flatten plan)))) ; total ids across all batches
diff --git a/lib/artdag/scoreboard.json b/lib/artdag/scoreboard.json
index 46d9d6e8..cb0b8677 100644
--- a/lib/artdag/scoreboard.json
+++ b/lib/artdag/scoreboard.json
@@ -1,9 +1,10 @@
 {
   "suites": {
     "dag": {"pass": 20, "fail": 0},
-    "analyze": {"pass": 16, "fail": 0}
+    "analyze": {"pass": 16, "fail": 0},
+    "plan": {"pass": 18, "fail": 0}
   },
-  "total_pass": 36,
+  "total_pass": 54,
   "total_fail": 0,
-  "total": 36
+  "total": 54
 }
diff --git a/lib/artdag/scoreboard.md b/lib/artdag/scoreboard.md
index 43f3868d..30ab2fc4 100644
--- a/lib/artdag/scoreboard.md
+++ b/lib/artdag/scoreboard.md
@@ -6,4 +6,5 @@ _Generated by `lib/artdag/conformance.sh`_
 |-------|-----:|-----:|------:|
 | dag | 20 | 0 | 20 |
 | analyze | 16 | 0 | 16 |
-| **Total** | **36** | **0** | **36** |
+| 
plan | 18 | 0 | 18 |
+| **Total** | **54** | **0** | **54** |
diff --git a/lib/artdag/tests/plan.sx b/lib/artdag/tests/plan.sx
new file mode 100644
index 00000000..100155c6
--- /dev/null
+++ b/lib/artdag/tests/plan.sx
@@ -0,0 +1,122 @@
+; Phase 3 — Plan: topological batches under a parallelism cap, incremental plan.
+
+; diamond: a -> b, a -> c, (b,c) -> d
+(define
+  pl-D ; fixture dag used by the full-plan, cap-1 and dirty-plan tests
+  (artdag/build
+    (list
+      (list "a" "load" (list) {}) ; presumably (name op input-names params) — confirm against artdag/build
+      (list "b" "f" (list "a") {})
+      (list "c" "g" (list "a") {})
+      (list "d" "add" (list "b" "c") {} true)))) ; NOTE(review): the trailing true is interpreted by artdag/build, which is not visible here
+(define pl-a (artdag/dag-id pl-D "a")) ; ids are looked up by name; the expected plans below are written in terms of these ids
+(define pl-b (artdag/dag-id pl-D "b"))
+(define pl-c (artdag/dag-id pl-D "c"))
+(define pl-d (artdag/dag-id pl-D "d"))
+
+; wide: a -> b, c, e, f (four independent dependents)
+(define
+  pl-W ; fixture dag used by the "wide" tests
+  (artdag/build
+    (list
+      (list "a" "load" (list) {})
+      (list "b" "f" (list "a") {})
+      (list "c" "g" (list "a") {})
+      (list "e" "h" (list "a") {})
+      (list "f" "k" (list "a") {}))))
+
+; ---- full plan, unlimited width ----
+
+(artdag-test
+  "full plan: batch count"
+  (artdag/plan-batches (artdag/plan pl-D 0)) ; cap 0 = unlimited width
+  3) ; waves: [a] [b c] [d]
+
+(artdag-test
+  "full plan: schedules every node"
+  (artdag/plan-size (artdag/plan pl-D 0))
+  4) ; 1 + 2 + 1 ids
+
+(artdag-test
+  "full plan: first batch is the leaf"
+  (first (artdag/plan pl-D 0))
+  (list pl-a)) ; a has no inputs, so it is the only id ready at the start
+
+(artdag-test
+  "full plan: middle batch runs b,c in parallel"
+  (first (rest (artdag/plan pl-D 0)))
+  (artdag/sort-strings (list pl-b pl-c))) ; waves come out in sort-strings order, so the expectation is sorted the same way
+
+(artdag-test
+  "full plan: last batch is the sink"
+  (first (rest (rest (artdag/plan pl-D 0))))
+  (list pl-d)) ; d needs both b and c
+
+(artdag-test
+  "full plan: max width is 2"
+  (artdag/plan-width (artdag/plan pl-D 0))
+  2) ; widest wave is [b c]
+
+; ---- parallelism cap ----
+
+(artdag-test
+  "cap 1: width never exceeds 1"
+  (artdag/plan-width (artdag/plan pl-D 1))
+  1)
+
+(artdag-test
+  "cap 1: serializes into one node per batch"
+  (artdag/plan-batches (artdag/plan pl-D 1))
+  4) ; the [b c] wave is split in two, giving four single-id batches
+
+(artdag-test
+  "cap larger than widest wave is a no-op"
+  (artdag/plan pl-D 10)
+  (artdag/plan pl-D 0)) ; no wave is longer than 10, so nothing is split
+
+(artdag-test
+  "wide cap 2: width capped at 2"
+  (artdag/plan-width (artdag/plan pl-W 2))
+  2) ; the four-wide wave is cut into chunks of 2
+
+(artdag-test
+  "wide cap 2: leaf wave then two capped sub-batches"
+  (artdag/plan-batches (artdag/plan pl-W 2))
+  3) ; [a], then two chunks of two
+
+(artdag-test
+  "wide cap 2: still schedules all five nodes"
+  (artdag/plan-size (artdag/plan pl-W 2))
+  5) ; splitting a wave must not drop or repeat ids
+
+(artdag-test
+  "wide unlimited: single wave of four after leaf"
+  (artdag/plan-width (artdag/plan pl-W 0))
+  4) ; b, c, e and f each depend only on a
+
+; ---- incremental (dirty-only) plan ----
+
+(artdag-test
+  "dirty plan: schedules only the dirty closure"
+  (artdag/plan-size (artdag/plan-dirty pl-D (list pl-b) 0))
+  2) ; expects the closure of b to be {b, d}; the closure itself comes from artdag/dirty-closure, defined elsewhere
+
+(artdag-test
+  "dirty plan: b then d"
+  (artdag/plan-dirty pl-D (list pl-b) 0)
+  (list (list pl-b) (list pl-d))) ; d waits for b because b is inside the scheduled set
+
+(artdag-test
+  "dirty plan: clean deps treated as satisfied"
+  (first (artdag/plan-dirty pl-D (list pl-b) 0))
+  (list pl-b)) ; b's input a is outside the scheduled set, so it does not block b
+
+(artdag-test
+  "dirty plan: leaf change replans whole graph"
+  (artdag/plan-size (artdag/plan-dirty pl-D (list pl-a) 0))
+  4) ; expects every node of the diamond to be in the closure of a
+
+(artdag-test
+  "dirty plan: sink change is a single batch"
+  (artdag/plan-dirty pl-D (list pl-d) 0)
+  (list (list pl-d))) ; expects the closure of d to be just d
diff --git a/plans/artdag-on-sx.md b/plans/artdag-on-sx.md
index 0ce869c1..4d8ab8e7 100644
--- a/plans/artdag-on-sx.md
+++ b/plans/artdag-on-sx.md
@@ -30,7 +30,7 @@ edges.
 
## Status (rolling) -`bash lib/artdag/conformance.sh` → **36/36** (2 suites: dag, analyze) +`bash lib/artdag/conformance.sh` → **54/54** (3 suites: dag, analyze, plan) ## Ground rules @@ -97,10 +97,10 @@ lib/artdag/optimize.sx lib/artdag/federation.sx ## Phase 3 — Plan -- [ ] `lib/artdag/plan.sx` — schedule into topological **batches** (each batch's +- [x] `lib/artdag/plan.sx` — schedule into topological **batches** (each batch's nodes have all deps satisfied → run in parallel); respect a max-parallelism limit -- [ ] plan over the *dirty* subset only (incremental plan) -- [ ] `lib/artdag/tests/plan.sx` — batch correctness, parallelism cap, dirty-only plan +- [x] plan over the *dirty* subset only (incremental plan) +- [x] `lib/artdag/tests/plan.sx` — batch correctness, parallelism cap, dirty-only plan - [ ] (optional/later) miniKanren constraint scheduling — flag, don't block on it ## Phase 4 — Execute (incremental + memoized) @@ -136,6 +136,15 @@ lib/artdag/optimize.sx lib/artdag/federation.sx ## Progress log +- **Phase 3 — Plan** (plan suite 18/18, total 54/54). `lib/artdag/plan.sx`: + `artdag/plan` schedules a dag into Kahn-wave topological batches — each batch's + nodes have all in-scope deps satisfied by earlier batches, so they run in parallel. + A `cap` (>0) splits any wave wider than the cap into consecutive sub-batches; + `cap<=0` is unlimited. `artdag/plan-dirty` schedules only the dirty closure: deps + outside the scheduled set (clean cache hits) count as already satisfied, so a + mid-node change yields just `[[changed]…[downstream]]`. Inspection helpers + `plan-batches`/`plan-width`/`plan-size`/`plan-flatten`. + - **Phase 2 — Analyze on Datalog** (analyze suite 16/16, total 36/36). `lib/artdag/analyze.sx`: `artdag/edge-facts` projects each `(input-id, node-id)` pair to an `(edge ...)` fact; `artdag/analyze` builds a `dl-program-data` db with