erlang: receive...after Ms timeout clause (+9 tests)
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Has been cancelled
Some checks failed
Test, Build, and Deploy / test-build-deploy (push) Has been cancelled
This commit is contained in:
@@ -193,6 +193,8 @@
|
||||
:continuation nil
|
||||
:receive-pats nil
|
||||
:trap-exit false
|
||||
:has-timeout false
|
||||
:timed-out false
|
||||
:exit-reason nil}))
|
||||
(dict-set! (er-sched-processes) (er-pid-key pid) proc)
|
||||
(er-sched-enqueue! pid)
|
||||
@@ -292,10 +294,40 @@
|
||||
()
|
||||
(let
|
||||
((pid (er-sched-next-runnable!)))
|
||||
(when
|
||||
(cond
|
||||
(not (= pid nil))
|
||||
(er-sched-step! pid)
|
||||
(er-sched-run-all!)))))
|
||||
(do (er-sched-step! pid) (er-sched-run-all!))
|
||||
;; Queue empty — fire one pending receive-with-timeout and go again.
|
||||
(er-sched-fire-one-timeout!) (er-sched-run-all!)
|
||||
:else nil))))
|
||||
|
||||
;; Wake one waiting process whose receive had an `after Ms` clause.
|
||||
;; Returns true if one fired. In our synchronous model "time passes"
|
||||
;; once the runnable queue drains — timeouts only fire then.
|
||||
(define
|
||||
er-sched-fire-one-timeout!
|
||||
(fn
|
||||
()
|
||||
(let
|
||||
((ks (keys (er-sched-processes))) (fired (list false)))
|
||||
(for-each
|
||||
(fn
|
||||
(k)
|
||||
(when
|
||||
(not (nth fired 0))
|
||||
(let
|
||||
((p (get (er-sched-processes) k)))
|
||||
(when
|
||||
(and
|
||||
(= (get p :state) "waiting")
|
||||
(get p :has-timeout))
|
||||
(dict-set! p :timed-out true)
|
||||
(dict-set! p :has-timeout false)
|
||||
(dict-set! p :state "runnable")
|
||||
(er-sched-enqueue! (get p :pid))
|
||||
(set-nth! fired 0 true)))))
|
||||
ks)
|
||||
(nth fired 0))))
|
||||
|
||||
(define
|
||||
er-sched-step!
|
||||
|
||||
@@ -369,6 +369,37 @@
|
||||
(er-eval-test "receive nested tuple"
|
||||
(ev "Me = self(), Me ! {result, {ok, 42}}, receive {result, {ok, V}} -> V end") 42)
|
||||
|
||||
;; ── receive ... after ... ───────────────────────────────────────
|
||||
(er-eval-test "after 0 empty mailbox"
|
||||
(nm (ev "receive _ -> got after 0 -> timeout end"))
|
||||
"timeout")
|
||||
(er-eval-test "after 0 match wins"
|
||||
(nm (ev "Me = self(), Me ! ok, receive ok -> got after 0 -> timeout end"))
|
||||
"got")
|
||||
(er-eval-test "after 0 non-match fires timeout"
|
||||
(nm (ev "Me = self(), Me ! wrong, receive right -> got after 0 -> timeout end"))
|
||||
"timeout")
|
||||
(er-eval-test "after 0 leaves non-match"
|
||||
(ev "Me = self(), Me ! wrong, receive right -> got after 0 -> to end, receive X -> X end")
|
||||
(er-mk-atom "wrong"))
|
||||
(er-eval-test "after Ms no sender — timeout fires"
|
||||
(nm (ev "receive _ -> got after 100 -> timed_out end"))
|
||||
"timed_out")
|
||||
(er-eval-test "after Ms with sender — match wins"
|
||||
(nm (ev "Me = self(), spawn(fun () -> Me ! hi end), receive hi -> got after 100 -> to end"))
|
||||
"got")
|
||||
(er-eval-test "after Ms computed"
|
||||
(nm (ev "Ms = 50, receive _ -> got after Ms -> done end"))
|
||||
"done")
|
||||
(er-eval-test "after 0 body side effect"
|
||||
(do (er-io-flush!)
|
||||
(ev "receive _ -> ok after 0 -> io:format(\"to~n\") end")
|
||||
(er-io-buffer-content))
|
||||
"to\n")
|
||||
(er-eval-test "after zero poll selective"
|
||||
(ev "Me = self(), Me ! first, Me ! second, X = receive second -> got_second after 0 -> to end, Y = receive first -> got_first after 0 -> to end, {X, Y}")
|
||||
(er-mk-tuple (list (er-mk-atom "got_second") (er-mk-atom "got_first"))))
|
||||
|
||||
(define
|
||||
er-eval-test-summary
|
||||
(str "eval " er-eval-test-pass "/" er-eval-test-count))
|
||||
|
||||
@@ -954,8 +954,12 @@
|
||||
(fn
|
||||
(node env)
|
||||
(let
|
||||
((pid (er-sched-current-pid)))
|
||||
(er-eval-receive-loop node pid env))))
|
||||
((pid (er-sched-current-pid))
|
||||
(after-node (get node :after-ms)))
|
||||
(if
|
||||
(= after-node nil)
|
||||
(er-eval-receive-loop node pid env)
|
||||
(er-eval-receive-with-after node pid env after-node)))))
|
||||
|
||||
(define
|
||||
er-eval-receive-loop
|
||||
@@ -975,6 +979,57 @@
|
||||
er-suspend-marker))
|
||||
(er-eval-receive-loop node pid env))))))
|
||||
|
||||
(define
|
||||
er-eval-receive-with-after
|
||||
(fn
|
||||
(node pid env after-node)
|
||||
(let
|
||||
((ms (er-eval-expr after-node env)))
|
||||
(cond
|
||||
(and (er-atom? ms) (= (get ms :name) "infinity"))
|
||||
(er-eval-receive-loop node pid env)
|
||||
(= ms 0) (er-eval-receive-poll node pid env)
|
||||
:else (er-eval-receive-timed node pid env)))))
|
||||
|
||||
;; after 0 — poll once; on no match, run the after-body immediately.
|
||||
(define
|
||||
er-eval-receive-poll
|
||||
(fn
|
||||
(node pid env)
|
||||
(let
|
||||
((r (er-try-receive (get node :clauses) pid env)))
|
||||
(if
|
||||
(get r :matched)
|
||||
(get r :value)
|
||||
(er-eval-body (get node :after-body) env)))))
|
||||
|
||||
;; after Ms — suspend; on resume check :timed-out. When the scheduler
|
||||
;; runs out of other work it fires one pending timeout per round.
|
||||
(define
|
||||
er-eval-receive-timed
|
||||
(fn
|
||||
(node pid env)
|
||||
(let
|
||||
((r (er-try-receive (get node :clauses) pid env)))
|
||||
(if
|
||||
(get r :matched)
|
||||
(get r :value)
|
||||
(do
|
||||
(er-proc-set! pid :has-timeout true)
|
||||
(shift
|
||||
k
|
||||
(do
|
||||
(er-proc-set! pid :continuation k)
|
||||
(er-proc-set! pid :state "waiting")
|
||||
er-suspend-marker))
|
||||
(if
|
||||
(er-proc-field pid :timed-out)
|
||||
(do
|
||||
(er-proc-set! pid :timed-out false)
|
||||
(er-proc-set! pid :has-timeout false)
|
||||
(er-eval-body (get node :after-body) env))
|
||||
(er-eval-receive-timed node pid env)))))))
|
||||
|
||||
;; Scan mailbox in arrival order. For each msg, try every clause.
|
||||
;; On first match: remove that msg from mailbox and return body value.
|
||||
(define
|
||||
|
||||
Reference in New Issue
Block a user