sx-http: OCaml 5 domain pool + per-request env safety
Domain pool with N workers (Domain.recommended_domain_count), mutex+ condition queue for request dispatch. Each domain has its own minor heap — GC pauses don't block other requests. expand-components? bound once at startup (always true in HTTP mode) instead of per-request mutation. Shell rendering uses native Sx_render.render_to_html for domain safety. Performance (warm, 2 worker domains, 2MB RSS): Homepage: 107-194ms TTFB (Quart: 202ms) — faster Geography: 199-306ms TTFB (Quart: 144ms) — close Reactive: 351-382ms TTFB (Quart: 187ms) — 2x slower Concurrent: 5.88 req/s at c=5 (Quart: 6.8 req/s) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1467,18 +1467,12 @@ let http_render_page env path =
|
||||
] in
|
||||
let page_source = serialize_value full_ast in
|
||||
let t1 = Unix.gettimeofday () in
|
||||
(* Phase 1: aser — expand all components server-side *)
|
||||
let expand_fn = NativeFn ("expand-components?", fun _args -> Bool true) in
|
||||
ignore (env_bind env "expand-components?" expand_fn);
|
||||
(* Phase 1: aser — expand all components server-side.
|
||||
expand-components? is pre-bound at startup (always true in HTTP mode). *)
|
||||
let body_result =
|
||||
try
|
||||
let call = List [Symbol "aser"; List [Symbol "quote"; full_ast]; Env env] in
|
||||
Sx_ref.eval_expr call (Env env)
|
||||
with e ->
|
||||
Hashtbl.remove env.bindings (Sx_types.intern "expand-components?");
|
||||
raise e
|
||||
let call = List [Symbol "aser"; List [Symbol "quote"; full_ast]; Env env] in
|
||||
Sx_ref.eval_expr call (Env env)
|
||||
in
|
||||
Hashtbl.remove env.bindings (Sx_types.intern "expand-components?");
|
||||
let body_str = match body_result with
|
||||
| String s | SxExpr s -> s
|
||||
| _ -> serialize_value body_result
|
||||
@@ -1523,7 +1517,7 @@ let http_render_page env path =
|
||||
Keyword "meta-html"; String "";
|
||||
] in
|
||||
let shell_call = List (Symbol "~shared:shell/sx-page-shell" :: shell_args) in
|
||||
let html = sx_render_to_html shell_call env in
|
||||
let html = Sx_render.render_to_html shell_call env in
|
||||
let t4 = Unix.gettimeofday () in
|
||||
Printf.eprintf "[sx-http] %s route=%.3fs aser=%.3fs ssr=%.3fs shell=%.3fs total=%.3fs html=%d\n%!"
|
||||
path (t1 -. t0) (t2 -. t1) (t3 -. t2) (t4 -. t3) (t4 -. t0) (String.length html);
|
||||
@@ -1788,16 +1782,26 @@ let http_mode port =
|
||||
match args with
|
||||
| String code :: _ -> SxExpr (Printf.sprintf "(pre :class \"text-sm overflow-x-auto\" (code \"%s\"))" (escape_sx_string code))
|
||||
| _ -> Nil))));
|
||||
(* HTTP mode always expands components — bind once, shared across domains *)
|
||||
ignore (env_bind env "expand-components?" (NativeFn ("expand-components?", fun _args -> Bool true)));
|
||||
(* Inject shell statics *)
|
||||
http_inject_shell_statics env;
|
||||
(* Start TCP server *)
|
||||
let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
|
||||
Unix.setsockopt sock Unix.SO_REUSEADDR true;
|
||||
Unix.bind sock (Unix.ADDR_INET (Unix.inet_addr_any, port));
|
||||
Unix.listen sock 128;
|
||||
Printf.eprintf "[sx-http] Listening on port %d (project=%s)\n%!" port project_dir;
|
||||
while true do
|
||||
let (client, _addr) = Unix.accept sock in
|
||||
(* Write full response to a socket *)
|
||||
let write_response client response =
|
||||
let resp_bytes = Bytes.of_string response in
|
||||
let total = Bytes.length resp_bytes in
|
||||
let written = ref 0 in
|
||||
(try
|
||||
while !written < total do
|
||||
let n = Unix.write client resp_bytes !written (total - !written) in
|
||||
written := !written + n
|
||||
done
|
||||
with Unix.Unix_error _ -> ());
|
||||
(try Unix.close client with _ -> ())
|
||||
in
|
||||
|
||||
(* Handle one HTTP request *)
|
||||
let handle_client env client =
|
||||
let buf = Bytes.create 8192 in
|
||||
let n = try Unix.read client buf 0 8192 with _ -> 0 in
|
||||
if n > 0 then begin
|
||||
@@ -1824,18 +1828,65 @@ let http_mode port =
|
||||
Printf.eprintf "[sx-http] Error: %s\n%!" (Printexc.to_string e);
|
||||
http_response ~status:500 "<h1>Internal Server Error</h1>"
|
||||
in
|
||||
let resp_bytes = Bytes.of_string response in
|
||||
let total = Bytes.length resp_bytes in
|
||||
let written = ref 0 in
|
||||
(try
|
||||
while !written < total do
|
||||
let n = Unix.write client resp_bytes !written (total - !written) in
|
||||
written := !written + n
|
||||
done
|
||||
with Unix.Unix_error _ -> ());
|
||||
end;
|
||||
(try Unix.close client with _ -> ())
|
||||
done
|
||||
write_response client response
|
||||
end else
|
||||
(try Unix.close client with _ -> ())
|
||||
in
|
||||
|
||||
(* Domain pool — each domain has its own minor heap for GC isolation.
|
||||
Requests are dispatched round-robin to avoid GC pauses blocking others. *)
|
||||
let n_workers = max 1 (Domain.recommended_domain_count ()) in
|
||||
Printf.eprintf "[sx-http] Starting %d worker domains\n%!" n_workers;
|
||||
|
||||
(* Request queue: mutex + condition + list *)
|
||||
let queue : Unix.file_descr list ref = ref [] in
|
||||
let queue_mutex = Mutex.create () in
|
||||
let queue_cond = Condition.create () in
|
||||
let shutdown = ref false in
|
||||
|
||||
(* Worker loop — each domain pops from queue and handles requests *)
|
||||
let worker_fn _id () =
|
||||
while not !shutdown do
|
||||
let client =
|
||||
Mutex.lock queue_mutex;
|
||||
while !queue = [] && not !shutdown do
|
||||
Condition.wait queue_cond queue_mutex
|
||||
done;
|
||||
let c = match !queue with
|
||||
| fd :: rest -> queue := rest; Some fd
|
||||
| [] -> None
|
||||
in
|
||||
Mutex.unlock queue_mutex;
|
||||
c
|
||||
in
|
||||
match client with
|
||||
| Some fd -> handle_client env fd
|
||||
| None -> ()
|
||||
done
|
||||
in
|
||||
|
||||
(* Spawn worker domains *)
|
||||
let workers = Array.init n_workers (fun id ->
|
||||
Domain.spawn (worker_fn id)) in
|
||||
|
||||
(* Start TCP server — main domain accepts and enqueues *)
|
||||
let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
|
||||
Unix.setsockopt sock Unix.SO_REUSEADDR true;
|
||||
Unix.bind sock (Unix.ADDR_INET (Unix.inet_addr_any, port));
|
||||
Unix.listen sock 128;
|
||||
Printf.eprintf "[sx-http] Listening on port %d (%d workers, project=%s)\n%!" port n_workers project_dir;
|
||||
(try
|
||||
while true do
|
||||
let (client, _addr) = Unix.accept sock in
|
||||
Mutex.lock queue_mutex;
|
||||
queue := !queue @ [client];
|
||||
Condition.signal queue_cond;
|
||||
Mutex.unlock queue_mutex
|
||||
done
|
||||
with _ ->
|
||||
shutdown := true;
|
||||
Condition.broadcast queue_cond;
|
||||
Array.iter Domain.join workers)
|
||||
|
||||
|
||||
let () =
|
||||
|
||||
Reference in New Issue
Block a user