;; Erlang runtime — scheduler, process records, mailbox queue. ;; Phase 3 foundation. spawn/send/receive build on these primitives. ;; ;; Scheduler is a single global dict in `er-scheduler` holding: ;; :next-pid INT — counter for fresh pid allocation ;; :processes DICT — pid-key (string) -> process record ;; :runnable QUEUE — FIFO of pids ready to run ;; :current PID — pid currently executing, or nil ;; ;; A pid value is tagged: {:tag "pid" :id INT}. Pids compare by id. ;; ;; Process record fields: ;; :pid — this process's pid ;; :mailbox — queue of received messages (arrival order) ;; :state — "runnable" | "running" | "waiting" | "exiting" | "dead" ;; :continuation — saved k (for receive suspension); nil otherwise ;; :receive-pats — patterns the process is blocked on; nil otherwise ;; :trap-exit — bool ;; :links — list of pids ;; :monitors — list of {:ref :pid} ;; :env — Erlang env at the last yield ;; :exit-reason — nil until the process exits ;; ;; Queue — amortised-O(1) FIFO with head-pointer + slab-compact: ;; {:items (list...) :head-idx INT} ;; ── queue ──────────────────────────────────────────────────────── (define er-q-new (fn () {:head-idx 0 :items (list)})) (define er-q-push! (fn (q x) (append! (get q :items) x))) (define er-q-pop! (fn (q) (let ((h (get q :head-idx)) (items (get q :items))) (if (>= h (len items)) nil (let ((x (nth items h))) (dict-set! q :head-idx (+ h 1)) (er-q-compact! q) x))))) (define er-q-peek (fn (q) (let ((h (get q :head-idx)) (items (get q :items))) (if (>= h (len items)) nil (nth items h))))) (define er-q-len (fn (q) (- (len (get q :items)) (get q :head-idx)))) (define er-q-empty? (fn (q) (= (er-q-len q) 0))) ;; Compact the backing list when the head pointer gets large so the ;; queue doesn't grow without bound. Threshold chosen to amortise the ;; O(n) copy — pops are still amortised O(1). (define er-q-compact! (fn (q) (let ((h (get q :head-idx)) (items (get q :items))) (when (> h 128) (let ((new (list))) (for-each (fn (i) (append! new (nth items i))) (range h (len items))) (dict-set! q :items new) (dict-set! q :head-idx 0)))))) (define er-q-to-list (fn (q) (let ((h (get q :head-idx)) (items (get q :items)) (out (list))) (for-each (fn (i) (append! out (nth items i))) (range h (len items))) out))) ;; Read the i'th entry (relative to head) without popping. (define er-q-nth (fn (q i) (nth (get q :items) (+ (get q :head-idx) i)))) ;; Remove entry at logical index i, shift tail in. (define er-q-delete-at! (fn (q i) (let ((h (get q :head-idx)) (items (get q :items)) (new (list))) (for-each (fn (j) (when (not (= j (+ h i))) (append! new (nth items j)))) (range h (len items))) (dict-set! q :items new) (dict-set! q :head-idx 0)))) ;; ── pids ───────────────────────────────────────────────────────── (define er-mk-pid (fn (id) {:id id :tag "pid"})) (define er-pid? (fn (v) (er-is-tagged? v "pid"))) (define er-pid-id (fn (pid) (get pid :id))) (define er-pid-key (fn (pid) (str "p" (er-pid-id pid)))) (define er-pid-equal? (fn (a b) (and (er-pid? a) (er-pid? b) (= (er-pid-id a) (er-pid-id b))))) ;; ── refs ───────────────────────────────────────────────────────── (define er-mk-ref (fn (id) {:id id :tag "ref"})) (define er-ref? (fn (v) (er-is-tagged? v "ref"))) (define er-ref-equal? (fn (a b) (and (er-ref? a) (er-ref? b) (= (get a :id) (get b :id))))) (define er-ref-new! (fn () (let ((s (er-sched))) (let ((n (get s :next-ref))) (dict-set! s :next-ref (+ n 1)) (er-mk-ref n))))) ;; ── scheduler state ────────────────────────────────────────────── (define er-scheduler (list nil)) (define er-sched-init! (fn () (set-nth! er-scheduler 0 {:next-pid 0 :next-ref 0 :current nil :processes {} :runnable (er-q-new)}))) (define er-sched (fn () (nth er-scheduler 0))) (define er-pid-new! (fn () (let ((s (er-sched))) (let ((n (get s :next-pid))) (dict-set! s :next-pid (+ n 1)) (er-mk-pid n))))) (define er-sched-runnable (fn () (get (er-sched) :runnable))) (define er-sched-processes (fn () (get (er-sched) :processes))) (define er-sched-enqueue! (fn (pid) (er-q-push! (er-sched-runnable) pid))) (define er-sched-next-runnable! (fn () (er-q-pop! (er-sched-runnable)))) (define er-sched-runnable-count (fn () (er-q-len (er-sched-runnable)))) (define er-sched-set-current! (fn (pid) (dict-set! (er-sched) :current pid))) (define er-sched-current-pid (fn () (get (er-sched) :current))) (define er-sched-process-count (fn () (len (keys (er-sched-processes))))) ;; ── process records ────────────────────────────────────────────── (define er-proc-new! (fn (env) (let ((pid (er-pid-new!))) (let ((proc {:pid pid :env env :links (list) :mailbox (er-q-new) :state "runnable" :monitors (list) :monitored-by (list) :continuation nil :receive-pats nil :trap-exit false :has-timeout false :timed-out false :exit-reason nil})) (dict-set! (er-sched-processes) (er-pid-key pid) proc) (er-sched-enqueue! pid) proc)))) (define er-proc-get (fn (pid) (get (er-sched-processes) (er-pid-key pid)))) (define er-proc-exists? (fn (pid) (dict-has? (er-sched-processes) (er-pid-key pid)))) (define er-proc-field (fn (pid field) (get (er-proc-get pid) field))) (define er-proc-set! (fn (pid field val) (let ((p (er-proc-get pid))) (if (= p nil) (error (str "Erlang: no such process " (er-pid-key pid))) (dict-set! p field val))))) (define er-proc-mailbox-push! (fn (pid msg) (er-q-push! (er-proc-field pid :mailbox) msg))) (define er-proc-mailbox-size (fn (pid) (er-q-len (er-proc-field pid :mailbox)))) ;; Main process is always pid 0 (scheduler starts with next-pid 0 and ;; erlang-eval-ast calls er-proc-new! first). Returns nil if no eval ;; has run. (define er-main-pid (fn () (er-mk-pid 0))) (define er-last-main-exit-reason (fn () (if (er-proc-exists? (er-main-pid)) (er-proc-field (er-main-pid) :exit-reason) nil))) ;; ── process BIFs ──────────────────────────────────────────────── (define er-bif-is-pid (fn (vs) (er-bool (er-pid? (er-bif-arg1 vs "is_pid"))))) (define er-bif-self (fn (vs) (if (not (= (len vs) 0)) (error "Erlang: self/0: arity") (let ((pid (er-sched-current-pid))) (if (= pid nil) (error "Erlang: self/0: no current process") pid))))) (define er-bif-spawn (fn (vs) (cond (= (len vs) 1) (er-spawn-fun (nth vs 0)) (= (len vs) 3) (error "Erlang: spawn/3: module-based spawn deferred to Phase 5 (modules)") :else (error "Erlang: spawn: wrong arity")))) (define er-spawn-fun (fn (fv) (if (not (er-fun? fv)) (error "Erlang: spawn/1: not a fun") (let ((proc (er-proc-new! (er-env-new)))) (dict-set! proc :initial-fun fv) (get proc :pid))))) (define er-bif-exit (fn (vs) (cond (= (len vs) 1) (raise (er-mk-exit-marker (nth vs 0))) (= (len vs) 2) (error "Erlang: exit/2 (signal another process) deferred to next Phase 4 step (signal propagation)") :else (error "Erlang: exit: wrong arity")))) ;; ── links / monitors / refs ───────────────────────────────────── (define er-bif-is-reference (fn (vs) (er-bool (er-ref? (er-bif-arg1 vs "is_reference"))))) (define er-bif-process-flag (fn (vs) (if (not (= (len vs) 2)) (error "Erlang: process_flag/2: arity") (let ((flag (nth vs 0)) (val (nth vs 1)) (me (er-sched-current-pid))) (cond (and (er-atom? flag) (= (get flag :name) "trap_exit")) (let ((old (er-proc-field me :trap-exit))) (er-proc-set! me :trap-exit (er-truthy? val)) (er-bool old)) :else (error (str "Erlang: process_flag: unsupported flag '" (er-format-value flag) "'"))))))) (define er-bif-make-ref (fn (vs) (if (not (= (len vs) 0)) (error "Erlang: make_ref/0: arity") (er-ref-new!)))) ;; Add `target` to `pid`'s :links list if not already there. (define er-link-add-one! (fn (pid target) (let ((links (er-proc-field pid :links))) (when (not (er-link-has? links target)) (append! links target))))) (define er-link-has? (fn (links target) (cond (= (len links) 0) false (er-pid-equal? (nth links 0) target) true :else (er-link-has? (er-slice-list links 1) target)))) (define er-link-remove-one! (fn (pid target) (let ((old (er-proc-field pid :links)) (out (list))) (for-each (fn (i) (let ((p (nth old i))) (when (not (er-pid-equal? p target)) (append! out p)))) (range 0 (len old))) (er-proc-set! pid :links out)))) (define er-bif-link (fn (vs) (let ((target (er-bif-arg1 vs "link")) (me (er-sched-current-pid))) (cond (not (er-pid? target)) (error "Erlang: link: not a pid") (er-pid-equal? target me) (er-mk-atom "true") (not (er-proc-exists? target)) (raise (er-mk-exit-marker (er-mk-atom "noproc"))) :else (do (er-link-add-one! me target) (er-link-add-one! target me) (er-mk-atom "true")))))) (define er-bif-unlink (fn (vs) (let ((target (er-bif-arg1 vs "unlink")) (me (er-sched-current-pid))) (cond (not (er-pid? target)) (error "Erlang: unlink: not a pid") :else (do (er-link-remove-one! me target) (when (er-proc-exists? target) (er-link-remove-one! target me)) (er-mk-atom "true")))))) (define er-bif-monitor (fn (vs) (if (not (= (len vs) 2)) (error "Erlang: monitor/2: arity") (let ((kind (nth vs 0)) (target (nth vs 1)) (me (er-sched-current-pid))) (cond (not (and (er-atom? kind) (= (get kind :name) "process"))) (error "Erlang: monitor: only 'process' supported") (not (er-pid? target)) (error "Erlang: monitor: not a pid") :else (let ((ref (er-ref-new!))) (append! (er-proc-field me :monitors) {:ref ref :pid target}) (when (er-proc-exists? target) (append! (er-proc-field target :monitored-by) {:from me :ref ref})) ref)))))) (define er-bif-demonitor (fn (vs) (let ((ref (er-bif-arg1 vs "demonitor")) (me (er-sched-current-pid))) (if (not (er-ref? ref)) (error "Erlang: demonitor: not a reference") (do (er-demonitor-purge! me ref) (er-mk-atom "true")))))) (define er-demonitor-purge! (fn (me ref) (let ((old (er-proc-field me :monitors)) (out (list)) (target-ref (list nil))) (for-each (fn (i) (let ((m (nth old i))) (if (er-ref-equal? (get m :ref) ref) (set-nth! target-ref 0 (get m :pid)) (append! out m)))) (range 0 (len old))) (er-proc-set! me :monitors out) (when (and (not (= (nth target-ref 0) nil)) (er-proc-exists? (nth target-ref 0))) (let ((target (nth target-ref 0)) (oldby (er-proc-field (nth target-ref 0) :monitored-by)) (out2 (list))) (for-each (fn (i) (let ((m (nth oldby i))) (when (not (er-ref-equal? (get m :ref) ref)) (append! out2 m)))) (range 0 (len oldby))) (er-proc-set! target :monitored-by out2)))))) ;; ── scheduler loop ────────────────────────────────────────────── ;; Each scheduler step wraps the process body in `guard`. `receive` ;; with no match captures a `call/cc` continuation onto the proc ;; record and then `raise`s `er-suspend-marker`; the guard catches ;; the raise and the scheduler moves on. `exit/1` raises an exit ;; marker the same way. Resumption from a saved continuation also ;; runs under a fresh `guard` so a resumed receive that needs to ;; suspend again has a handler to unwind to. `shift`/`reset` aren't ;; usable here because SX's captured delimited continuations don't ;; re-establish their own reset boundary when invoked — a second ;; suspension during replay raises "shift without enclosing reset". (define er-suspend-marker {:tag "er-suspend-marker"}) (define er-suspended? (fn (v) (and (= (type-of v) "dict") (= (get v :tag) "er-suspend-marker")))) (define er-exited? (fn (v) (and (= (type-of v) "dict") (= (get v :tag) "er-exit-marker")))) (define er-mk-exit-marker (fn (reason) {:tag "er-exit-marker" :reason reason})) (define er-mk-throw-marker (fn (reason) {:tag "er-throw-marker" :reason reason})) (define er-mk-error-marker (fn (reason) {:tag "er-error-marker" :reason reason})) (define er-thrown? (fn (v) (and (= (type-of v) "dict") (= (get v :tag) "er-throw-marker")))) (define er-errored? (fn (v) (and (= (type-of v) "dict") (= (get v :tag) "er-error-marker")))) (define er-sched-run-all! (fn () (let ((pid (er-sched-next-runnable!))) (cond (not (= pid nil)) (do (er-sched-step! pid) (er-sched-run-all!)) ;; Queue empty — fire one pending receive-with-timeout and go again. (er-sched-fire-one-timeout!) (er-sched-run-all!) :else nil)))) ;; Wake one waiting process whose receive had an `after Ms` clause. ;; Returns true if one fired. In our synchronous model "time passes" ;; once the runnable queue drains — timeouts only fire then. (define er-sched-fire-one-timeout! (fn () (let ((ks (keys (er-sched-processes))) (fired (list false))) (for-each (fn (k) (when (not (nth fired 0)) (let ((p (get (er-sched-processes) k))) (when (and (= (get p :state) "waiting") (get p :has-timeout)) (dict-set! p :timed-out true) (dict-set! p :has-timeout false) (dict-set! p :state "runnable") (er-sched-enqueue! (get p :pid)) (set-nth! fired 0 true))))) ks) (nth fired 0)))) (define er-sched-step! (fn (pid) (cond (= (er-proc-field pid :state) "dead") nil :else (er-sched-step-alive! pid)))) (define er-sched-step-alive! (fn (pid) (er-sched-set-current! pid) (er-proc-set! pid :state "running") (let ((prev-k (er-proc-field pid :continuation)) (result-ref (list nil))) (guard (c ((er-suspended? c) (set-nth! result-ref 0 c)) ((er-exited? c) (set-nth! result-ref 0 c)) ((er-thrown? c) (set-nth! result-ref 0 (er-mk-exit-marker (er-mk-tuple (list (er-mk-atom "nocatch") (get c :reason)))))) ((er-errored? c) (set-nth! result-ref 0 (er-mk-exit-marker (get c :reason))))) (set-nth! result-ref 0 (if (= prev-k nil) (er-apply-fun (er-proc-field pid :initial-fun) (list)) (do (er-proc-set! pid :continuation nil) (prev-k nil))))) (let ((r (nth result-ref 0))) (cond (er-suspended? r) nil (er-exited? r) (do (er-proc-set! pid :state "dead") (er-proc-set! pid :exit-reason (get r :reason)) (er-proc-set! pid :exit-result nil) (er-proc-set! pid :continuation nil) (er-propagate-exit! pid (get r :reason))) :else (do (er-proc-set! pid :state "dead") (er-proc-set! pid :exit-reason (er-mk-atom "normal")) (er-proc-set! pid :exit-result r) (er-proc-set! pid :continuation nil) (er-propagate-exit! pid (er-mk-atom "normal")))))) (er-sched-set-current! nil))) ;; ── exit-signal propagation ───────────────────────────────────── ;; Called when `pid` finishes (normally or via exit). Walks the ;; process's `:monitored-by` and `:links` lists to deliver `{'DOWN'}` ;; messages and exit signals respectively. Linked processes without ;; `trap_exit` cascade-die with the same reason; those with ;; `trap_exit` true receive an `{'EXIT', From, Reason}` message. (define er-propagate-exit! (fn (pid reason) (er-fire-monitors! pid reason) (er-fire-links! pid reason))) (define er-fire-monitors! (fn (pid reason) (let ((mons (er-proc-field pid :monitored-by))) (for-each (fn (i) (let ((m (nth mons i))) (let ((from (get m :from)) (ref (get m :ref))) (when (and (er-proc-exists? from) (not (= (er-proc-field from :state) "dead"))) (let ((msg (er-mk-tuple (list (er-mk-atom "DOWN") ref (er-mk-atom "process") pid reason)))) (er-proc-mailbox-push! from msg) (when (= (er-proc-field from :state) "waiting") (er-proc-set! from :state "runnable") (er-sched-enqueue! from))))))) (range 0 (len mons)))))) (define er-fire-links! (fn (pid reason) (let ((links (er-proc-field pid :links)) (is-normal (er-is-atom-named? reason "normal"))) (for-each (fn (i) (let ((target (nth links i))) (when (and (er-proc-exists? target) (not (= (er-proc-field target :state) "dead"))) (let ((trap (er-proc-field target :trap-exit))) (cond trap (er-deliver-exit-msg! target pid reason) is-normal nil :else (er-cascade-exit! target reason)))))) (range 0 (len links)))))) (define er-deliver-exit-msg! (fn (target from reason) (let ((msg (er-mk-tuple (list (er-mk-atom "EXIT") from reason)))) (er-proc-mailbox-push! target msg) (when (= (er-proc-field target :state) "waiting") (er-proc-set! target :state "runnable") (er-sched-enqueue! target))))) (define er-cascade-exit! (fn (target reason) (er-proc-set! target :state "dead") (er-proc-set! target :exit-reason reason) (er-proc-set! target :exit-result nil) (er-proc-set! target :continuation nil) (er-propagate-exit! target reason))) ;; ── module registry ───────────────────────────────────────────── ;; Global mutable dict from module name -> module env (which itself ;; binds each function name to a fun value capturing the same env, so ;; sibling functions can call each other recursively). (define er-modules (list {})) (define er-modules-get (fn () (nth er-modules 0))) (define er-modules-reset! (fn () (set-nth! er-modules 0 {}))) ;; Load an Erlang module declaration. Source must start with ;; `-module(Name).` and contain function definitions. Functions ;; sharing a name (different arities) get their clauses concatenated ;; into a single fun value — `er-apply-fun-clauses` already filters ;; by arity, so multi-arity dispatch falls out for free. (define erlang-load-module (fn (src) (let ((module-ast (er-parse-module src))) (let ((mod-name (get module-ast :name)) (functions (get module-ast :functions)) (mod-env (er-env-new)) (by-name {})) (for-each (fn (i) (let ((f (nth functions i))) (let ((name (get f :name)) (clauses (get f :clauses))) (if (dict-has? by-name name) (let ((existing (get by-name name))) (for-each (fn (j) (append! existing (nth clauses j))) (range 0 (len clauses)))) (let ((init (list))) (for-each (fn (j) (append! init (nth clauses j))) (range 0 (len clauses))) (dict-set! by-name name init)))))) (range 0 (len functions))) (for-each (fn (k) (let ((all-clauses (get by-name k))) (er-env-bind! mod-env k (er-mk-fun all-clauses mod-env)))) (keys by-name)) (dict-set! (er-modules-get) mod-name mod-env) (er-mk-atom mod-name))))) (define er-apply-user-module (fn (mod name vs) (let ((mod-env (get (er-modules-get) mod))) (if (not (dict-has? mod-env name)) (raise (er-mk-error-marker (er-mk-tuple (list (er-mk-atom "undef") (er-mk-atom mod) (er-mk-atom name))))) (er-apply-fun (get mod-env name) vs)))))