erlang: send + selective receive via shift/reset (+13 tests)
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Has been cancelled
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Has been cancelled
This commit is contained in:
@@ -88,6 +88,26 @@
|
||||
(range h (len items)))
|
||||
out)))
|
||||
|
||||
;; Read the i'th entry (relative to head) without popping.
|
||||
(define
|
||||
er-q-nth
|
||||
(fn (q i) (nth (get q :items) (+ (get q :head-idx) i))))
|
||||
|
||||
;; Remove entry at logical index i, shift tail in.
|
||||
(define
|
||||
er-q-delete-at!
|
||||
(fn
|
||||
(q i)
|
||||
(let
|
||||
((h (get q :head-idx)) (items (get q :items)) (new (list)))
|
||||
(for-each
|
||||
(fn
|
||||
(j)
|
||||
(when (not (= j (+ h i))) (append! new (nth items j))))
|
||||
(range h (len items)))
|
||||
(dict-set! q :items new)
|
||||
(dict-set! q :head-idx 0))))
|
||||
|
||||
;; ── pids ─────────────────────────────────────────────────────────
|
||||
(define er-mk-pid (fn (id) {:id id :tag "pid"}))
|
||||
(define er-pid? (fn (v) (er-is-tagged? v "pid")))
|
||||
@@ -251,25 +271,57 @@
|
||||
(get proc :pid)))))
|
||||
|
||||
;; ── scheduler loop ──────────────────────────────────────────────
|
||||
;; Drain all runnable processes to completion. Synchronous — each
|
||||
;; spawned process runs its :initial-fun front-to-back with no yielding.
|
||||
;; receive-driven suspension arrives in the next roadmap step.
|
||||
;; Each process's entry runs inside a `reset`; `receive` uses `shift`
|
||||
;; to suspend (saving a continuation on the proc record). When a `!`
|
||||
;; delivers a message to a waiting process we re-enqueue it — the
|
||||
;; scheduler step invokes the saved continuation, which retries the
|
||||
;; receive against the updated mailbox.
|
||||
(define er-suspend-marker {:tag "er-suspend-marker"})
|
||||
|
||||
(define
|
||||
er-sched-drain!
|
||||
er-suspended?
|
||||
(fn
|
||||
(v)
|
||||
(and
|
||||
(= (type-of v) "dict")
|
||||
(= (get v :tag) "er-suspend-marker"))))
|
||||
|
||||
(define
|
||||
er-sched-run-all!
|
||||
(fn
|
||||
()
|
||||
(let
|
||||
((pid (er-sched-next-runnable!)))
|
||||
(when
|
||||
(not (= pid nil))
|
||||
(er-sched-set-current! pid)
|
||||
(er-proc-set! pid :state "running")
|
||||
(let
|
||||
((fv (er-proc-field pid :initial-fun)))
|
||||
(when
|
||||
(not (= fv nil))
|
||||
(er-apply-fun fv (list))))
|
||||
(er-proc-set! pid :state "dead")
|
||||
(er-proc-set! pid :exit-reason (er-mk-atom "normal"))
|
||||
(er-sched-set-current! nil)
|
||||
(er-sched-drain!)))))
|
||||
(er-sched-step! pid)
|
||||
(er-sched-run-all!)))))
|
||||
|
||||
(define
|
||||
er-sched-step!
|
||||
(fn
|
||||
(pid)
|
||||
(er-sched-set-current! pid)
|
||||
(er-proc-set! pid :state "running")
|
||||
(let
|
||||
((prev-k (er-proc-field pid :continuation))
|
||||
(result-ref (list nil)))
|
||||
(if
|
||||
(= prev-k nil)
|
||||
(set-nth!
|
||||
result-ref
|
||||
0
|
||||
(reset (er-apply-fun (er-proc-field pid :initial-fun) (list))))
|
||||
(do
|
||||
(er-proc-set! pid :continuation nil)
|
||||
(set-nth! result-ref 0 (prev-k nil))))
|
||||
(let
|
||||
((r (nth result-ref 0)))
|
||||
(cond
|
||||
(er-suspended? r) nil
|
||||
:else (do
|
||||
(er-proc-set! pid :state "dead")
|
||||
(er-proc-set! pid :exit-reason (er-mk-atom "normal"))
|
||||
(er-proc-set! pid :exit-result r)
|
||||
(er-proc-set! pid :continuation nil)))))
|
||||
(er-sched-set-current! nil)))
|
||||
|
||||
@@ -327,6 +327,48 @@
|
||||
(er-io-buffer-content))
|
||||
"true;true")
|
||||
|
||||
;; ── ! (send) + receive ──────────────────────────────────────────
|
||||
(er-eval-test "self-send + receive"
|
||||
(nm (ev "Me = self(), Me ! hello, receive Msg -> Msg end")) "hello")
|
||||
(er-eval-test "send returns msg"
|
||||
(nm (ev "Me = self(), Msg = Me ! ok, Me ! x, receive _ -> Msg end")) "ok")
|
||||
(er-eval-test "receive int"
|
||||
(ev "Me = self(), Me ! 42, receive N -> N + 1 end") 43)
|
||||
(er-eval-test "receive with pattern"
|
||||
(ev "Me = self(), Me ! {ok, 7}, receive {ok, V} -> V * 2 end") 14)
|
||||
(er-eval-test "receive with guard"
|
||||
(ev "Me = self(), Me ! 5, receive N when N > 0 -> positive end")
|
||||
(er-mk-atom "positive"))
|
||||
(er-eval-test "receive skips non-match"
|
||||
(nm (ev "Me = self(), Me ! wrong, Me ! right, receive right -> ok end"))
|
||||
"ok")
|
||||
(er-eval-test "receive selective leaves others"
|
||||
(nm (ev "Me = self(), Me ! a, Me ! b, receive b -> got_b end"))
|
||||
"got_b")
|
||||
(er-eval-test "two receives consume both"
|
||||
(ev "Me = self(), Me ! 1, Me ! 2, X = receive A -> A end, Y = receive B -> B end, X + Y") 3)
|
||||
|
||||
;; ── spawn + send + receive (real process communication) ─────────
|
||||
(er-eval-test "spawn sends back"
|
||||
(nm
|
||||
(ev "Me = self(), spawn(fun () -> Me ! pong end), receive pong -> got_pong end"))
|
||||
"got_pong")
|
||||
(er-eval-test "ping-pong"
|
||||
(do
|
||||
(er-io-flush!)
|
||||
(ev "Me = self(), Child = spawn(fun () -> receive {ping, From} -> From ! pong end end), Child ! {ping, Me}, receive pong -> io:format(\"pong~n\") end")
|
||||
(er-io-buffer-content))
|
||||
"pong\n")
|
||||
(er-eval-test "echo server"
|
||||
(ev "Me = self(), Echo = spawn(fun () -> receive {From, Msg} -> From ! Msg end end), Echo ! {Me, 99}, receive R -> R end") 99)
|
||||
|
||||
;; ── receive with multiple clauses ────────────────────────────────
|
||||
(er-eval-test "receive multi-clause"
|
||||
(nm (ev "Me = self(), Me ! foo, receive ok -> a; foo -> b; bar -> c end"))
|
||||
"b")
|
||||
(er-eval-test "receive nested tuple"
|
||||
(ev "Me = self(), Me ! {result, {ok, 42}}, receive {result, {ok, V}} -> V end") 42)
|
||||
|
||||
(define
|
||||
er-eval-test-summary
|
||||
(str "eval " er-eval-test-pass "/" er-eval-test-count))
|
||||
|
||||
@@ -64,20 +64,27 @@
|
||||
((body (er-parse-body st)))
|
||||
(er-sched-init!)
|
||||
(let
|
||||
((main (er-proc-new! (er-env-new))))
|
||||
(er-sched-next-runnable!)
|
||||
(er-sched-set-current! (get main :pid))
|
||||
(er-proc-set! (get main :pid) :state "running")
|
||||
((env (er-env-new)))
|
||||
(let
|
||||
((result (er-eval-body body (get main :env))))
|
||||
(er-proc-set! (get main :pid) :state "dead")
|
||||
(er-proc-set!
|
||||
(get main :pid)
|
||||
:exit-reason
|
||||
(er-mk-atom "normal"))
|
||||
(er-sched-set-current! nil)
|
||||
(er-sched-drain!)
|
||||
result))))))
|
||||
((main-fun
|
||||
(er-mk-fun
|
||||
(list
|
||||
{:patterns (list)
|
||||
:body body
|
||||
:guards (list)
|
||||
:name nil})
|
||||
env)))
|
||||
(let
|
||||
((main-proc (er-proc-new! env)))
|
||||
(dict-set! main-proc :initial-fun main-fun)
|
||||
(er-sched-run-all!)
|
||||
(let
|
||||
((main-pid (get main-proc :pid)))
|
||||
(if
|
||||
(not (= (er-proc-field main-pid :state) "dead"))
|
||||
(error
|
||||
"Erlang: deadlock — main process never terminated")
|
||||
(er-proc-field main-pid :exit-result))))))))))
|
||||
|
||||
(define
|
||||
er-eval-body
|
||||
@@ -113,6 +120,8 @@
|
||||
(= ty "case") (er-eval-case node env)
|
||||
(= ty "call") (er-eval-call node env)
|
||||
(= ty "fun") (er-eval-fun node env)
|
||||
(= ty "send") (er-eval-send node env)
|
||||
(= ty "receive") (er-eval-receive node env)
|
||||
(= ty "match") (er-eval-match node env)
|
||||
:else (error (str "Erlang eval: unsupported node type '" ty "'"))))))
|
||||
|
||||
@@ -917,3 +926,94 @@
|
||||
(append! out (er-format-value (nth elems i))))
|
||||
(range 1 (len elems)))
|
||||
(reduce str "" out)))))
|
||||
|
||||
;; ── send: Pid ! Msg ──────────────────────────────────────────────
|
||||
(define
|
||||
er-eval-send
|
||||
(fn
|
||||
(node env)
|
||||
(let
|
||||
((to-val (er-eval-expr (get node :to) env))
|
||||
(msg-val (er-eval-expr (get node :msg) env)))
|
||||
(if
|
||||
(not (er-pid? to-val))
|
||||
(error "Erlang: '!': target is not a pid")
|
||||
(do
|
||||
(when
|
||||
(er-proc-exists? to-val)
|
||||
(er-proc-mailbox-push! to-val msg-val)
|
||||
(when
|
||||
(= (er-proc-field to-val :state) "waiting")
|
||||
(er-proc-set! to-val :state "runnable")
|
||||
(er-sched-enqueue! to-val)))
|
||||
msg-val)))))
|
||||
|
||||
;; ── receive (selective, delimited-continuation suspension) ──────
|
||||
(define
|
||||
er-eval-receive
|
||||
(fn
|
||||
(node env)
|
||||
(let
|
||||
((pid (er-sched-current-pid)))
|
||||
(er-eval-receive-loop node pid env))))
|
||||
|
||||
(define
|
||||
er-eval-receive-loop
|
||||
(fn
|
||||
(node pid env)
|
||||
(let
|
||||
((r (er-try-receive (get node :clauses) pid env)))
|
||||
(if
|
||||
(get r :matched)
|
||||
(get r :value)
|
||||
(do
|
||||
(shift
|
||||
k
|
||||
(do
|
||||
(er-proc-set! pid :continuation k)
|
||||
(er-proc-set! pid :state "waiting")
|
||||
er-suspend-marker))
|
||||
(er-eval-receive-loop node pid env))))))
|
||||
|
||||
;; Scan mailbox in arrival order. For each msg, try every clause.
|
||||
;; On first match: remove that msg from mailbox and return body value.
|
||||
(define
|
||||
er-try-receive
|
||||
(fn
|
||||
(clauses pid env)
|
||||
(let
|
||||
((mbox (er-proc-field pid :mailbox)))
|
||||
(er-try-receive-loop clauses mbox env 0))))
|
||||
|
||||
(define
|
||||
er-try-receive-loop
|
||||
(fn
|
||||
(clauses mbox env i)
|
||||
(if
|
||||
(>= i (er-q-len mbox))
|
||||
{:matched false}
|
||||
(let
|
||||
((msg (er-q-nth mbox i))
|
||||
(cr (er-try-receive-clauses clauses msg env 0)))
|
||||
(if
|
||||
(get cr :matched)
|
||||
(do (er-q-delete-at! mbox i) cr)
|
||||
(er-try-receive-loop clauses mbox env (+ i 1)))))))
|
||||
|
||||
(define
|
||||
er-try-receive-clauses
|
||||
(fn
|
||||
(clauses msg env i)
|
||||
(if
|
||||
(>= i (len clauses))
|
||||
{:matched false}
|
||||
(let
|
||||
((c (nth clauses i)) (snap (er-env-copy env)))
|
||||
(if
|
||||
(and
|
||||
(er-match! (get c :pattern) msg env)
|
||||
(er-eval-guards (get c :guards) env))
|
||||
{:value (er-eval-body (get c :body) env) :matched true}
|
||||
(do
|
||||
(er-env-restore! env snap)
|
||||
(er-try-receive-clauses clauses msg env (+ i 1))))))))
|
||||
|
||||
Reference in New Issue
Block a user