Step 5: CEK IO suspension + R7RS modules (define-library/import)
Third CEK phase "io-suspended": perform suspends evaluation, host resolves IO, cek-resume feeds result back. VM OP_PERFORM (opcode 112) enables JIT-compiled functions to suspend. VM→CEK→suspend chain propagates suspension across the JIT/CEK boundary via pending_cek. R7RS define-library creates isolated environments with export control. import checks the library registry and suspends for unknown libraries, enabling lazy on-demand loading. Import qualifiers: only, prefix. Server-side cek_run_with_io handles suspension by dispatching IO requests to the Python bridge and resuming. guard composes cleanly with perform for structured error recovery across IO boundaries. 2598/2598 tests (30 new: 15 core suspension, 3 JIT, 1 cross-boundary, 9 modules, 2 error handling). Zero regressions. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1714,7 +1714,8 @@ PLATFORM_CEK_JS = '''
|
||||
CEK_FIXUPS_JS = '''
|
||||
// Override recursive cekRun with iterative loop (avoids stack overflow)
|
||||
cekRun = function(state) {
|
||||
while (!cekTerminal_p(state)) { state = cekStep(state); }
|
||||
while (!cekTerminal_p(state) && !cekSuspended_p(state)) { state = cekStep(state); }
|
||||
if (cekSuspended_p(state)) { throw new Error("IO suspension in non-IO context"); }
|
||||
return cekValue(state);
|
||||
};
|
||||
|
||||
|
||||
@@ -211,6 +211,8 @@ let make_test_env () =
|
||||
| [e; Keyword k; v] -> Sx_types.env_set (uw e) k v
|
||||
| _ -> raise (Eval_error "env-set!: expected env, key, value"));
|
||||
|
||||
bind "make-env" (fun _args -> Env (Sx_types.make_env ()));
|
||||
|
||||
bind "env-extend" (fun args ->
|
||||
match args with
|
||||
| [e] -> Env (Sx_types.env_extend (uw e))
|
||||
@@ -841,6 +843,60 @@ let make_test_env () =
|
||||
| [frame] -> Sx_ref.frame_type frame
|
||||
| _ -> raise (Eval_error "frame-type: expected 1 arg"));
|
||||
|
||||
(* IO suspension primitives — inline until retranspile *)
|
||||
let is_suspended state =
|
||||
match get_val state (String "phase") with String "io-suspended" -> true | _ -> false in
|
||||
let step_loop state =
|
||||
let s = ref state in
|
||||
while not (match Sx_ref.cek_terminal_p !s with Bool true -> true | _ -> false)
|
||||
&& not (is_suspended !s) do
|
||||
s := Sx_ref.cek_step !s
|
||||
done;
|
||||
!s in
|
||||
bind "cek-step-loop" (fun args ->
|
||||
match args with
|
||||
| [state] -> step_loop state
|
||||
| _ -> raise (Eval_error "cek-step-loop: expected 1 arg"));
|
||||
bind "cek-resume" (fun args ->
|
||||
match args with
|
||||
| [state; result] ->
|
||||
step_loop (Sx_ref.make_cek_value result (get_val state (String "env")) (get_val state (String "kont")))
|
||||
| _ -> raise (Eval_error "cek-resume: expected 2 args"));
|
||||
bind "cek-suspended?" (fun args ->
|
||||
match args with
|
||||
| [state] -> Bool (is_suspended state)
|
||||
| _ -> raise (Eval_error "cek-suspended?: expected 1 arg"));
|
||||
bind "cek-io-request" (fun args ->
|
||||
match args with
|
||||
| [state] -> get_val state (String "request")
|
||||
| _ -> raise (Eval_error "cek-io-request: expected 1 arg"));
|
||||
bind "make-cek-suspended" (fun args ->
|
||||
match args with
|
||||
| [req; env'; kont] ->
|
||||
let d = Hashtbl.create 4 in
|
||||
Hashtbl.replace d "phase" (String "io-suspended");
|
||||
Hashtbl.replace d "request" req;
|
||||
Hashtbl.replace d "env" env';
|
||||
Hashtbl.replace d "kont" kont;
|
||||
Dict d
|
||||
| _ -> raise (Eval_error "make-cek-suspended: expected 3 args"));
|
||||
|
||||
(* --- Library registry --- *)
|
||||
let lib_registry = Hashtbl.create 16 in
|
||||
ignore (Sx_types.env_bind env "*library-registry*" (Dict lib_registry));
|
||||
bind "library-loaded?" (fun args ->
|
||||
match args with
|
||||
| [spec] -> Sx_ref.library_loaded_p spec
|
||||
| _ -> raise (Eval_error "library-loaded?: expected 1 arg"));
|
||||
bind "library-exports" (fun args ->
|
||||
match args with
|
||||
| [spec] -> Sx_ref.library_exports spec
|
||||
| _ -> raise (Eval_error "library-exports: expected 1 arg"));
|
||||
bind "register-library" (fun args ->
|
||||
match args with
|
||||
| [spec; exports] -> Sx_ref.register_library spec exports
|
||||
| _ -> raise (Eval_error "register-library: expected 2 args"));
|
||||
|
||||
(* --- Strict mode --- *)
|
||||
(* *strict* is a plain value in the env, mutated via env_set by set-strict! *)
|
||||
ignore (Sx_types.env_bind env "*strict*" (Bool false));
|
||||
|
||||
@@ -281,6 +281,43 @@ let flush_batched_io result_str =
|
||||
!final
|
||||
end
|
||||
|
||||
(** IO-aware CEK run — handles suspension by dispatching IO requests.
|
||||
When the CEK machine suspends with a perform, this function sends
|
||||
the IO request to the Python bridge, resumes with the response,
|
||||
and repeats until evaluation completes. *)
|
||||
let cek_run_with_io state =
|
||||
let s = ref state in
|
||||
let is_terminal s = match Sx_ref.cek_terminal_p s with Bool true -> true | _ -> false in
|
||||
let is_suspended s = match Sx_runtime.get_val s (String "phase") with String "io-suspended" -> true | _ -> false in
|
||||
let rec loop () =
|
||||
while not (is_terminal !s) && not (is_suspended !s) do
|
||||
s := Sx_ref.cek_step !s
|
||||
done;
|
||||
if is_suspended !s then begin
|
||||
let request = Sx_runtime.get_val !s (String "request") in
|
||||
let op = match Sx_runtime.get_val request (String "op") with String s -> s | _ -> "" in
|
||||
(* Extract args based on operation type *)
|
||||
let args = match op with
|
||||
| "import" ->
|
||||
let lib = Sx_runtime.get_val request (String "library") in
|
||||
[String "import"; lib]
|
||||
| _ ->
|
||||
let a = Sx_runtime.get_val request (String "args") in
|
||||
(match a with List l -> l | _ -> [a])
|
||||
in
|
||||
let response = io_request op args in
|
||||
s := Sx_ref.cek_resume !s response;
|
||||
loop ()
|
||||
end else
|
||||
Sx_ref.cek_value !s
|
||||
in
|
||||
loop ()
|
||||
|
||||
(** IO-aware eval_expr — like eval_expr but handles IO suspension. *)
|
||||
let _eval_expr_io expr env =
|
||||
let state = Sx_ref.make_cek_state expr env (List []) in
|
||||
cek_run_with_io state
|
||||
|
||||
(** Bind IO primitives into the environment. *)
|
||||
let setup_io_env env =
|
||||
let bind name fn =
|
||||
@@ -798,6 +835,21 @@ let _jit_compiling = ref false (* re-entrancy guard *)
|
||||
|
||||
let _jit_warned : (string, bool) Hashtbl.t = Hashtbl.create 16
|
||||
|
||||
let rec make_vm_suspend_marker request saved_vm =
|
||||
let d = Hashtbl.create 3 in
|
||||
Hashtbl.replace d "__vm_suspended" (Bool true);
|
||||
Hashtbl.replace d "request" request;
|
||||
(* Create a resume function that continues this specific VM.
|
||||
May raise VmSuspended again — caller must handle. *)
|
||||
Hashtbl.replace d "resume" (NativeFn ("vm-resume", fun args ->
|
||||
match args with
|
||||
| [result] ->
|
||||
(try Sx_vm.resume_vm saved_vm result
|
||||
with Sx_vm.VmSuspended (req2, vm2) ->
|
||||
make_vm_suspend_marker req2 vm2)
|
||||
| _ -> raise (Eval_error "vm-resume: expected 1 arg")));
|
||||
Dict d
|
||||
|
||||
let register_jit_hook env =
|
||||
Sx_runtime._jit_try_call_fn := Some (fun f args ->
|
||||
match f with
|
||||
@@ -807,7 +859,10 @@ let register_jit_hook env =
|
||||
(* Cached bytecode — run on VM, fall back to CEK on runtime error.
|
||||
Log once per function name, then stay quiet. Don't disable. *)
|
||||
(try Some (Sx_vm.call_closure cl args cl.vm_env_ref)
|
||||
with e ->
|
||||
with
|
||||
| Sx_vm.VmSuspended (request, saved_vm) ->
|
||||
Some (make_vm_suspend_marker request saved_vm)
|
||||
| e ->
|
||||
let fn_name = match l.l_name with Some n -> n | None -> "?" in
|
||||
if not (Hashtbl.mem _jit_warned fn_name) then begin
|
||||
Hashtbl.replace _jit_warned fn_name true;
|
||||
@@ -832,7 +887,10 @@ let register_jit_hook env =
|
||||
| Some cl ->
|
||||
l.l_compiled <- Some cl;
|
||||
(try Some (Sx_vm.call_closure cl args cl.vm_env_ref)
|
||||
with e ->
|
||||
with
|
||||
| Sx_vm.VmSuspended (request, saved_vm) ->
|
||||
Some (make_vm_suspend_marker request saved_vm)
|
||||
| e ->
|
||||
Printf.eprintf "[jit] %s first-call fallback to CEK: %s\n%!" fn_name (Printexc.to_string e);
|
||||
Hashtbl.replace _jit_warned fn_name true;
|
||||
None)
|
||||
|
||||
@@ -74,10 +74,13 @@ let () = Sx_primitives._sx_trampoline_fn := !trampoline_fn
|
||||
let cek_run_iterative state =
|
||||
let s = ref state in
|
||||
(try
|
||||
while not (match cek_terminal_p !s with Bool true -> true | _ -> false) do
|
||||
while not (match cek_terminal_p !s with Bool true -> true | _ -> false)
|
||||
&& not (match cek_suspended_p !s with Bool true -> true | _ -> false) do
|
||||
s := cek_step !s
|
||||
done;
|
||||
cek_value !s
|
||||
(match cek_suspended_p !s with
|
||||
| Bool true -> raise (Eval_error "IO suspension in non-IO context")
|
||||
| _ -> cek_value !s)
|
||||
with Eval_error msg ->
|
||||
_last_error_kont_ref := cek_kont !s;
|
||||
raise (Eval_error msg))
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -26,8 +26,14 @@ type vm = {
|
||||
mutable sp : int;
|
||||
mutable frames : frame list;
|
||||
globals : (string, value) Hashtbl.t; (* live reference to kernel env *)
|
||||
mutable pending_cek : value option; (* suspended CEK state from Component/Lambda call *)
|
||||
}
|
||||
|
||||
(** Raised when OP_PERFORM is executed. Carries the IO request dict
|
||||
and a reference to the VM (which is in a resumable state:
|
||||
ip past OP_PERFORM, stack ready for a result push). *)
|
||||
exception VmSuspended of value * vm
|
||||
|
||||
(** Forward reference for JIT compilation — set after definition. *)
|
||||
let jit_compile_ref : (lambda -> (string, value) Hashtbl.t -> vm_closure option) ref =
|
||||
ref (fun _ _ -> None)
|
||||
@@ -49,7 +55,7 @@ let is_jit_failed cl = cl.vm_code.vc_arity = -1
|
||||
let _active_vm : vm option ref = ref None
|
||||
|
||||
let create globals =
|
||||
{ stack = Array.make 4096 Nil; sp = 0; frames = []; globals }
|
||||
{ stack = Array.make 4096 Nil; sp = 0; frames = []; globals; pending_cek = None }
|
||||
|
||||
(** Stack ops — inlined for speed. *)
|
||||
let push vm v =
|
||||
@@ -128,6 +134,19 @@ let code_from_value v =
|
||||
{ vc_arity = arity; vc_locals = arity + 16; vc_bytecode = bc_list; vc_constants = constants }
|
||||
| _ -> { vc_arity = 0; vc_locals = 16; vc_bytecode = [||]; vc_constants = [||] }
|
||||
|
||||
(** Call an SX value via CEK, detecting suspension instead of erroring.
|
||||
Returns the result value, or raises VmSuspended if CEK suspends.
|
||||
Saves the suspended CEK state in vm.pending_cek for later resume. *)
|
||||
let cek_call_or_suspend vm f args =
|
||||
let a = match args with Nil -> [] | List l -> l | _ -> [args] in
|
||||
let state = Sx_ref.continue_with_call f (List a) (Env (Sx_types.make_env ())) (List a) (List []) in
|
||||
let final = Sx_ref.cek_step_loop state in
|
||||
match Sx_runtime.get_val final (String "phase") with
|
||||
| String "io-suspended" ->
|
||||
vm.pending_cek <- Some final;
|
||||
raise (VmSuspended (Sx_runtime.get_val final (String "request"), vm))
|
||||
| _ -> Sx_ref.cek_value final
|
||||
|
||||
(** Execute a closure with arguments — creates a fresh VM.
|
||||
Used for entry points: JIT Lambda calls, module execution, cross-boundary. *)
|
||||
let rec call_closure cl args globals =
|
||||
@@ -165,12 +184,11 @@ and vm_call vm f args =
|
||||
not the caller's globals. Closure vars were merged at compile time. *)
|
||||
(try push vm (call_closure cl args cl.vm_env_ref)
|
||||
with _e ->
|
||||
(* Fallback to CEK — data-dependent error, not a JIT bug.
|
||||
Dedup logging happens in register_jit_hook. *)
|
||||
push vm (Sx_ref.cek_call f (List args)))
|
||||
(* Fallback to CEK — suspension-aware *)
|
||||
push vm (cek_call_or_suspend vm f (List args)))
|
||||
| Some _ ->
|
||||
(* Compile failed — CEK *)
|
||||
push vm (Sx_ref.cek_call f (List args))
|
||||
(* Compile failed — CEK, suspension-aware *)
|
||||
push vm (cek_call_or_suspend vm f (List args))
|
||||
| None ->
|
||||
if l.l_name <> None
|
||||
then begin
|
||||
@@ -180,17 +198,16 @@ and vm_call vm f args =
|
||||
| Some cl ->
|
||||
l.l_compiled <- Some cl;
|
||||
(try push vm (call_closure cl args cl.vm_env_ref)
|
||||
with _e -> push vm (Sx_ref.cek_call f (List args)))
|
||||
with _e -> push vm (cek_call_or_suspend vm f (List args)))
|
||||
| None ->
|
||||
push vm (Sx_ref.cek_call f (List args))
|
||||
push vm (cek_call_or_suspend vm f (List args))
|
||||
end
|
||||
else
|
||||
push vm (Sx_ref.cek_call f (List args)))
|
||||
push vm (cek_call_or_suspend vm f (List args)))
|
||||
| Component _ | Island _ ->
|
||||
(* Components use keyword-arg parsing — CEK handles this *)
|
||||
(* Components use keyword-arg parsing — CEK handles this, suspension-aware *)
|
||||
incr _vm_cek_count;
|
||||
let result = Sx_ref.cek_call f (List args) in
|
||||
push vm result
|
||||
push vm (cek_call_or_suspend vm f (List args))
|
||||
| _ ->
|
||||
raise (Eval_error ("VM: not callable: " ^ Sx_runtime.value_to_str f))
|
||||
|
||||
@@ -534,6 +551,11 @@ and run vm =
|
||||
| Number x -> Number (x -. 1.0)
|
||||
| _ -> (Hashtbl.find Sx_primitives.primitives "dec") [v])
|
||||
|
||||
(* ---- IO Suspension ---- *)
|
||||
| 112 (* OP_PERFORM *) ->
|
||||
let request = pop vm in
|
||||
raise (VmSuspended (request, vm))
|
||||
|
||||
| opcode ->
|
||||
raise (Eval_error (Printf.sprintf "VM: unknown opcode %d at ip=%d"
|
||||
opcode (frame.ip - 1)))
|
||||
@@ -546,6 +568,26 @@ and run vm =
|
||||
end
|
||||
done
|
||||
|
||||
(** Resume a suspended VM by pushing the IO result and continuing.
|
||||
May raise VmSuspended again if the VM hits another OP_PERFORM. *)
|
||||
let resume_vm vm result =
|
||||
(match vm.pending_cek with
|
||||
| Some cek_state ->
|
||||
(* Resume the suspended CEK evaluation first *)
|
||||
vm.pending_cek <- None;
|
||||
let final = Sx_ref.cek_resume cek_state result in
|
||||
(match Sx_runtime.get_val final (String "phase") with
|
||||
| String "io-suspended" ->
|
||||
(* CEK suspended again — re-suspend the VM *)
|
||||
vm.pending_cek <- Some final;
|
||||
raise (VmSuspended (Sx_runtime.get_val final (String "request"), vm))
|
||||
| _ ->
|
||||
push vm (Sx_ref.cek_value final))
|
||||
| None ->
|
||||
push vm result);
|
||||
run vm;
|
||||
pop vm
|
||||
|
||||
(** Execute a compiled module (top-level bytecode). *)
|
||||
let execute_module code globals =
|
||||
let cl = { vm_code = code; vm_upvalues = [||]; vm_name = Some "module"; vm_env_ref = globals; vm_closure_env = None } in
|
||||
|
||||
@@ -215,6 +215,15 @@
|
||||
"for-each-indexed"
|
||||
"cek-call"
|
||||
"cek-run"
|
||||
"cek-step-loop"
|
||||
"cek-resume"
|
||||
"cek-suspended?"
|
||||
"cek-io-request"
|
||||
"make-cek-suspended"
|
||||
"library-name-key"
|
||||
"library-loaded?"
|
||||
"library-exports"
|
||||
"register-library"
|
||||
"sx-call"
|
||||
"sx-apply"
|
||||
"collect!"
|
||||
|
||||
Reference in New Issue
Block a user