Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Has been cancelled
412 lines
11 KiB
Plaintext
412 lines
11 KiB
Plaintext
;; Erlang runtime — scheduler, process records, mailbox queue.
|
|
;; Phase 3 foundation. spawn/send/receive build on these primitives.
|
|
;;
|
|
;; Scheduler state is a single global dict, stored in slot 0 of the
;; one-element list `er-scheduler`, holding:
|
|
;; :next-pid INT — counter for fresh pid allocation
|
|
;; :processes DICT — pid-key (string) -> process record
|
|
;; :runnable QUEUE — FIFO of pids ready to run
|
|
;; :current PID — pid currently executing, or nil
|
|
;;
|
|
;; A pid value is tagged: {:tag "pid" :id INT}. Pids compare by id.
|
|
;;
|
|
;; Process record fields:
|
|
;; :pid — this process's pid
|
|
;; :mailbox — queue of received messages (arrival order)
|
|
;; :state — "runnable" | "running" | "waiting" | "exiting" | "dead"
|
|
;; :continuation — saved k (for receive suspension); nil otherwise
|
|
;; :receive-pats — patterns the process is blocked on; nil otherwise
|
|
;; :trap-exit — bool
|
|
;; :links — list of pids
|
|
;; :monitors — list of {:ref :pid}
|
|
;; :env — Erlang env at the last yield
|
|
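;; :exit-reason — nil until the process exits
;; :exit-result — value returned by the process body on a normal exit;
;;                nil when the process ended through an exit marker
;; :initial-fun — fun applied on the first scheduler step (set by spawn)
;; :has-timeout — checked when firing timeouts: a waiting process with
;;                this set can be woken by the scheduler
;; :timed-out — set to true when the scheduler fires that timeout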
|
|
;;
|
|
;; Queue — amortised-O(1) FIFO with head-pointer + slab-compact:
|
|
;; {:items (list...) :head-idx INT}
|
|
|
|
;; ── queue ────────────────────────────────────────────────────────
|
|
;; Create an empty queue: no backing items, head pointer at slot 0.
(define er-q-new
  (fn ()
    {:head-idx 0 :items (list)}))
|
|
|
|
;; Append x at the tail of q's backing list (mutates the list in place).
(define er-q-push!
  (fn (q x)
    (let ((slab (get q :items)))
      (append! slab x))))
|
|
|
|
;; Remove and return the entry at the head of q, or nil when no live
;; entries remain. Advances :head-idx, then lets er-q-compact! decide
;; whether to trim the consumed prefix of the backing list.
(define er-q-pop!
  (fn (q)
    (let ((head (get q :head-idx)) (slab (get q :items)))
      (cond
        (>= head (len slab)) nil
        :else
          (let ((front (nth slab head)))
            (dict-set! q :head-idx (+ head 1))
            (er-q-compact! q)
            front)))))
|
|
|
|
;; Return the entry at the head of q without removing it; nil when the
;; head pointer has reached the end of the backing list.
(define er-q-peek
  (fn (q)
    (let ((head (get q :head-idx)) (slab (get q :items)))
      (cond
        (>= head (len slab)) nil
        :else (nth slab head)))))
|
|
|
|
;; Number of live entries: backing-list length minus the head pointer.
(define er-q-len
  (fn (q)
    (let ((total (len (get q :items))) (consumed (get q :head-idx)))
      (- total consumed))))
|
|
|
|
;; True when q has no live entries left.
(define er-q-empty?
  (fn (q)
    (= 0 (er-q-len q))))
|
|
|
|
;; Drop the consumed prefix of the backing list so popped entries don't
;; accumulate without bound. The copy costs O(live entries), so it only
;; runs once the head pointer has passed 128 AND the consumed prefix is at
;; least as long as the live tail. Checking the head pointer alone would
;; re-copy a long queue every ~128 pops, which is not amortised O(1).
(define er-q-compact!
  (fn (q)
    (let ((h (get q :head-idx)) (items (get q :items)))
      (when (> h 128)
        ;; (- (len items) h) is the number of live entries.
        (when (>= h (- (len items) h))
          (let ((live (list)))
            (for-each
              (fn (i) (append! live (nth items i)))
              (range h (len items)))
            (dict-set! q :items live)
            (dict-set! q :head-idx 0)))))))
|
|
|
|
;; Copy the live entries of q, head first, into a fresh list. The queue
;; itself is left untouched.
(define er-q-to-list
  (fn (q)
    (let ((slab (get q :items)) (start (get q :head-idx)) (snapshot (list)))
      (for-each
        (fn (idx) (append! snapshot (nth slab idx)))
        (range start (len slab)))
      snapshot)))
|
|
|
|
;; Read the i'th live entry (0 = current head) without popping.
;; Returns nil when i is negative or at/after the tail, so a bad index can
;; never reach slots in front of the head pointer that were already popped.
(define er-q-nth
  (fn (q i)
    (let ((h (get q :head-idx)) (items (get q :items)))
      (cond
        (> 0 i) nil
        (>= (+ h i) (len items)) nil
        :else (nth items (+ h i))))))
|
|
|
|
;; Remove the entry at logical index i (0 = current head). The surviving
;; live entries are copied into a fresh backing list, so :head-idx restarts
;; at 0. An i that matches no live entry removes nothing; the copy still
;; happens.
(define er-q-delete-at!
  (fn (q i)
    (let ((start (get q :head-idx)) (slab (get q :items)) (kept (list)))
      (let ((victim (+ start i)))
        (for-each
          (fn (slot)
            (when (not (= slot victim))
              (append! kept (nth slab slot))))
          (range start (len slab)))
        (dict-set! q :items kept)
        (dict-set! q :head-idx 0)))))
|
|
|
|
;; ── pids ─────────────────────────────────────────────────────────
|
|
;; Build the tagged pid value for the integer id.
(define er-mk-pid
  (fn (id)
    {:id id :tag "pid"}))

;; True when v carries the "pid" tag, as judged by er-is-tagged?.
(define er-pid?
  (fn (v)
    (er-is-tagged? v "pid")))

;; Integer id stored in a pid value.
(define er-pid-id
  (fn (pid)
    (get pid :id)))

;; String key for a pid: "p" followed by its id.
(define er-pid-key
  (fn (pid)
    (str "p" (get pid :id))))

;; Two values are the same pid when both are pids and their ids match.
(define er-pid-equal?
  (fn (a b)
    (and
      (er-pid? a)
      (er-pid? b)
      (= (get a :id) (get b :id)))))
|
|
|
|
;; ── scheduler state ──────────────────────────────────────────────
|
|
;; One-element list used as a mutable cell: slot 0 holds the scheduler
;; dict, or nil until er-sched-init! has run.
(define er-scheduler (list nil))

;; Install a fresh scheduler dict in er-scheduler: pid counter at 0, no
;; current process, no process records, an empty runnable queue.
(define er-sched-init!
  (fn ()
    (let ((fresh
            {:next-pid 0
             :current nil
             :processes {}
             :runnable (er-q-new)}))
      (set-nth! er-scheduler 0 fresh))))

;; The scheduler dict currently installed in er-scheduler.
(define er-sched
  (fn ()
    (nth er-scheduler 0)))
|
|
|
|
;; Allocate a fresh pid: wrap the scheduler's current :next-pid counter in
;; a pid value, then advance the counter by one.
(define er-pid-new!
  (fn ()
    (let ((sched (er-sched)))
      (let ((fresh-id (get sched :next-pid)))
        (let ((pid (er-mk-pid fresh-id)))
          (dict-set! sched :next-pid (+ fresh-id 1))
          pid)))))
|
|
|
|
;; The scheduler's runnable queue.
(define er-sched-runnable
  (fn ()
    (let ((sched (er-sched)))
      (get sched :runnable))))

;; The scheduler's dict of process records, keyed by er-pid-key strings.
(define er-sched-processes
  (fn ()
    (let ((sched (er-sched)))
      (get sched :processes))))

;; Put pid at the back of the runnable queue.
(define er-sched-enqueue!
  (fn (pid)
    (let ((run-q (er-sched-runnable)))
      (er-q-push! run-q pid))))

;; Take the pid at the front of the runnable queue; nil when it is empty.
(define er-sched-next-runnable!
  (fn ()
    (let ((run-q (er-sched-runnable)))
      (er-q-pop! run-q))))

;; How many pids are currently queued as runnable.
(define er-sched-runnable-count
  (fn ()
    (let ((run-q (er-sched-runnable)))
      (er-q-len run-q))))
|
|
|
|
;; Record pid (or nil) as the scheduler's :current process.
(define er-sched-set-current!
  (fn (pid)
    (let ((sched (er-sched)))
      (dict-set! sched :current pid))))

;; The scheduler's :current entry.
(define er-sched-current-pid
  (fn ()
    (let ((sched (er-sched)))
      (get sched :current))))

;; Number of process records held by the scheduler.
(define er-sched-process-count
  (fn ()
    (let ((procs (er-sched-processes)))
      (len (keys procs)))))
|
|
|
|
;; ── process records ──────────────────────────────────────────────
|
|
;; Create a process record for a freshly allocated pid, register it in the
;; scheduler's process dict, queue the pid as runnable, and return the
;; record. env is stored as the record's :env.
(define er-proc-new!
  (fn (env)
    (let ((fresh-pid (er-pid-new!)))
      (let ((record
              {:pid fresh-pid
               :env env
               :links (list)
               :mailbox (er-q-new)
               :state "runnable"
               :monitors (list)
               :continuation nil
               :receive-pats nil
               :trap-exit false
               :has-timeout false
               :timed-out false
               :exit-reason nil}))
        (dict-set! (er-sched-processes) (er-pid-key fresh-pid) record)
        (er-sched-enqueue! fresh-pid)
        record))))
|
|
|
|
;; Look up the process record stored under pid's key.
(define er-proc-get
  (fn (pid)
    (let ((procs (er-sched-processes)))
      (get procs (er-pid-key pid)))))

;; True when the scheduler's process dict has an entry for pid's key.
(define er-proc-exists?
  (fn (pid)
    (let ((procs (er-sched-processes)))
      (dict-has? procs (er-pid-key pid)))))

;; Read one field of pid's process record.
(define er-proc-field
  (fn (pid field)
    (let ((record (er-proc-get pid)))
      (get record field))))
|
|
|
|
;; Write val into field of pid's process record. Raises an error naming
;; the pid key when the lookup yields nil.
(define er-proc-set!
  (fn (pid field val)
    (let ((record (er-proc-get pid)))
      (cond
        (= record nil)
          (error (str "Erlang: no such process " (er-pid-key pid)))
        :else (dict-set! record field val)))))
|
|
|
|
;; Append msg to the mailbox queue of pid's process record.
(define er-proc-mailbox-push!
  (fn (pid msg)
    (let ((mbox (er-proc-field pid :mailbox)))
      (er-q-push! mbox msg))))

;; Number of messages waiting in pid's mailbox queue.
(define er-proc-mailbox-size
  (fn (pid)
    (let ((mbox (er-proc-field pid :mailbox)))
      (er-q-len mbox))))
|
|
|
|
;; Main process is always pid 0 (scheduler starts with next-pid 0 and
;; erlang-eval-ast calls er-proc-new! first). er-main-pid always returns
;; that pid; er-last-main-exit-reason returns nil if no process record
;; exists for it.
|
|
;; Builds a pid value with id 0 on every call; the scheduler is not
;; consulted.
(define er-main-pid
  (fn ()
    (er-mk-pid 0)))
|
|
|
|
;; :exit-reason of the main process record, or nil when the scheduler has
;; no record for the main pid.
(define er-last-main-exit-reason
  (fn ()
    (let ((main (er-main-pid)))
      (cond
        (er-proc-exists? main) (er-proc-field main :exit-reason)
        :else nil))))
|
|
|
|
;; ── process BIFs ────────────────────────────────────────────────
|
|
;; is_pid BIF: fetch the argument via er-bif-arg1, test it with er-pid?,
;; and convert the result with er-bool.
(define er-bif-is-pid
  (fn (vs)
    (let ((arg (er-bif-arg1 vs "is_pid")))
      (er-bool (er-pid? arg)))))
|
|
|
|
;; self/0 BIF: return the pid the scheduler has marked as current.
;; Errors when called with any arguments, or when no process is current.
(define er-bif-self
  (fn (vs)
    (if (= (len vs) 0)
      (let ((me (er-sched-current-pid)))
        (if (= me nil)
          (error "Erlang: self/0: no current process")
          me))
      (error "Erlang: self/0: arity"))))
|
|
|
|
;; spawn BIF dispatch on argument count: one argument spawns a fun via
;; er-spawn-fun; three arguments (module-based spawn) and every other
;; arity raise an error.
(define er-bif-spawn
  (fn (vs)
    (let ((argc (len vs)))
      (cond
        (= argc 1) (er-spawn-fun (nth vs 0))
        (= argc 3)
          (error
            "Erlang: spawn/3: module-based spawn deferred to Phase 5 (modules)")
        :else (error "Erlang: spawn: wrong arity")))))
|
|
|
|
;; Spawn a process for the fun value fv: create a process record with a
;; fresh env, store fv as its :initial-fun, and return the new pid.
;; Errors when fv is not a fun.
(define er-spawn-fun
  (fn (fv)
    (if (er-fun? fv)
      (let ((child (er-proc-new! (er-env-new))))
        (dict-set! child :initial-fun fv)
        (get child :pid))
      (error "Erlang: spawn/1: not a fun"))))
|
|
|
|
;; exit BIF dispatch on argument count: exit/1 raises an exit marker
;; carrying the reason; exit/2 and every other arity raise an error.
(define er-bif-exit
  (fn (vs)
    (let ((argc (len vs)))
      (cond
        (= argc 1) (raise (er-mk-exit-marker (nth vs 0)))
        (= argc 2)
          (error
            "Erlang: exit/2 (signal another process) deferred to Phase 4 (links)")
        :else (error "Erlang: exit: wrong arity")))))
|
|
|
|
;; ── scheduler loop ──────────────────────────────────────────────
|
|
;; Each scheduler step wraps the process body in `guard`. `receive`
|
|
;; with no match captures a `call/cc` continuation onto the proc
|
|
;; record and then `raise`s `er-suspend-marker`; the guard catches
|
|
;; the raise and the scheduler moves on. `exit/1` raises an exit
|
|
;; marker the same way. Resumption from a saved continuation also
|
|
;; runs under a fresh `guard` so a resumed receive that needs to
|
|
;; suspend again has a handler to unwind to. `shift`/`reset` aren't
|
|
;; usable here because SX's captured delimited continuations don't
|
|
;; re-establish their own reset boundary when invoked — a second
|
|
;; suspension during replay raises "shift without enclosing reset".
|
|
;; Value raised to signal that the running process has suspended.
(define er-suspend-marker {:tag "er-suspend-marker"})

;; True when v is a dict tagged "er-suspend-marker".
(define er-suspended?
  (fn (v)
    (if (= (type-of v) "dict")
      (= (get v :tag) "er-suspend-marker")
      false)))
|
|
|
|
;; True when v is a dict tagged "er-exit-marker".
(define er-exited?
  (fn (v)
    (if (= (type-of v) "dict")
      (= (get v :tag) "er-exit-marker")
      false)))

;; Build the exit marker that carries an exit reason.
(define er-mk-exit-marker
  (fn (reason)
    {:tag "er-exit-marker" :reason reason}))
|
|
|
|
;; Drive the scheduler until nothing is left to do: step each runnable pid
;; in queue order; when the queue is empty, try to fire one pending
;; timeout and continue if one fired; otherwise stop and return nil.
(define er-sched-run-all!
  (fn ()
    (let ((ready (er-sched-next-runnable!)))
      (if (= ready nil)
        ;; Queue empty — fire one pending receive-with-timeout and go again.
        (if (er-sched-fire-one-timeout!)
          (er-sched-run-all!)
          nil)
        (do (er-sched-step! ready) (er-sched-run-all!))))))
|
|
|
|
;; Wake at most one process that is "waiting" with :has-timeout set: mark
;; it :timed-out, clear :has-timeout, make it "runnable" and enqueue its
;; pid. Returns true if a process was woken, false otherwise. Called by
;; the scheduler loop once the runnable queue has drained.
(define er-sched-fire-one-timeout!
  (fn ()
    (let ((proc-keys (keys (er-sched-processes))) (fired-cell (list false)))
      (for-each
        (fn (key)
          ;; Once one process has been woken, the remaining keys are skipped.
          (when (not (nth fired-cell 0))
            (let ((proc (get (er-sched-processes) key)))
              (when (and (= (get proc :state) "waiting") (get proc :has-timeout))
                (dict-set! proc :timed-out true)
                (dict-set! proc :has-timeout false)
                (dict-set! proc :state "runnable")
                (er-sched-enqueue! (get proc :pid))
                (set-nth! fired-cell 0 true)))))
        proc-keys)
      (nth fired-cell 0))))
|
|
|
|
;; Run one scheduling step for pid. Marks it current and "running", then
;; either applies its :initial-fun (no saved continuation) or clears and
;; invokes the saved continuation. Suspend and exit markers raised by the
;; body are caught and recorded; any other outcome is the body's return
;; value. A suspended process is left as it is; otherwise the record is
;; marked "dead" with its exit reason and result. Finally the scheduler's
;; current pid is cleared.
;; NOTE(review): a raise that is neither marker matches no guard clause;
;; presumably it propagates, in which case the current-pid reset at the
;; end is skipped — confirm.
(define er-sched-step!
  (fn (pid)
    (er-sched-set-current! pid)
    (er-proc-set! pid :state "running")
    (let ((saved-k (er-proc-field pid :continuation))
          (outcome (list nil)))
      (guard
        (signal
          ((er-suspended? signal) (set-nth! outcome 0 signal))
          ((er-exited? signal) (set-nth! outcome 0 signal)))
        (set-nth!
          outcome
          0
          (if (= saved-k nil)
            (er-apply-fun (er-proc-field pid :initial-fun) (list))
            ;; Clear the saved continuation before resuming it.
            (do (er-proc-set! pid :continuation nil) (saved-k nil)))))
      (let ((res (nth outcome 0)))
        (cond
          (er-suspended? res) nil
          (er-exited? res)
            (do
              (er-proc-set! pid :state "dead")
              (er-proc-set! pid :exit-reason (get res :reason))
              (er-proc-set! pid :exit-result nil)
              (er-proc-set! pid :continuation nil))
          :else
            (do
              (er-proc-set! pid :state "dead")
              (er-proc-set! pid :exit-reason (er-mk-atom "normal"))
              (er-proc-set! pid :exit-result res)
              (er-proc-set! pid :continuation nil)))))
    (er-sched-set-current! nil)))
|