Step 17: streaming render — chunked transfer, shell-first suspense, resolve scripts
Server (sx_server.ml): - Chunked HTTP transport (Transfer-Encoding: chunked) - Streaming page detection via scan_defpages (:stream true) - Shell-first render: outer layout + shell AST → aser → SSR → flush - Data resolution: evaluate :data, render :content per slot, flush __sxResolve scripts - AJAX streaming: synchronous eval + OOB swaps for SPA navigation - SX URL → flat path conversion for defpage matching - Error boundaries per resolve section - streaming-demo-data helper for the demo page Client (sx-platform.js): - Sx.resolveSuspense: finds [data-suspense] element, parses SX, renders to DOM - Fallback define for resolve-suspense when boot.sx imports fail in WASM - __sxPending drain on boot (queued resolves from before sx.js loads) - __sxResolve direct dispatch after boot Tests (streaming.spec.js): - 5 sandbox tests using real WASM kernel - Suspense placeholder rendering, __sxResolve replacement, independent slot resolution - Full layout with gutters, end-to-end resolve with streaming-demo/chunk components Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -149,6 +149,8 @@ let _scope_stacks = Sx_primitives._scope_stacks
|
||||
(* Populated from __app-config dict after SX files load. *)
|
||||
let _app_config : (string, value) Hashtbl.t option ref = ref None
|
||||
let _defpage_paths : string list ref = ref []
|
||||
(* Streaming pages: path → page name, for pages with :stream true *)
|
||||
let _streaming_pages : (string, string) Hashtbl.t = Hashtbl.create 8
|
||||
|
||||
let get_app_config key default =
|
||||
match !_app_config with
|
||||
@@ -1740,6 +1742,31 @@ let http_response ?(status=200) ?(content_type="text/html; charset=utf-8") body
|
||||
let http_redirect url =
|
||||
Printf.sprintf "HTTP/1.1 301 Moved Permanently\r\nLocation: %s\r\nContent-Length: 0\r\nConnection: keep-alive\r\n\r\n" url
|
||||
|
||||
(* Chunked transfer encoding helpers for streaming responses *)
|
||||
let http_chunked_header ?(status=200) ?(content_type="text/html; charset=utf-8") () =
|
||||
let status_text = match status with
|
||||
| 200 -> "OK" | 404 -> "Not Found" | 500 -> "Internal Server Error" | _ -> "Unknown" in
|
||||
Printf.sprintf "HTTP/1.1 %d %s\r\nContent-Type: %s\r\nTransfer-Encoding: chunked\r\nConnection: keep-alive\r\n\r\n"
|
||||
status status_text content_type
|
||||
|
||||
let write_chunk fd data =
|
||||
if String.length data > 0 then begin
|
||||
let chunk = Printf.sprintf "%x\r\n%s\r\n" (String.length data) data in
|
||||
let bytes = Bytes.of_string chunk in
|
||||
let total = Bytes.length bytes in
|
||||
let written = ref 0 in
|
||||
(try
|
||||
while !written < total do
|
||||
let n = Unix.write fd bytes !written (total - !written) in
|
||||
written := !written + n
|
||||
done
|
||||
with Unix.Unix_error _ -> ())
|
||||
end
|
||||
|
||||
let end_chunked fd =
|
||||
(try ignore (Unix.write_substring fd "0\r\n\r\n" 0 5) with Unix.Unix_error _ -> ());
|
||||
(try Unix.close fd with _ -> ())
|
||||
|
||||
let parse_http_request data =
|
||||
match String.index_opt data '\r' with
|
||||
| None -> (match String.index_opt data '\n' with
|
||||
@@ -1900,6 +1927,194 @@ let http_render_page env path headers =
|
||||
end
|
||||
end
|
||||
|
||||
(* JSON-encode a string for use in __sxResolve script tags *)
|
||||
let json_encode_string s =
|
||||
let buf = Buffer.create (String.length s + 16) in
|
||||
Buffer.add_char buf '"';
|
||||
String.iter (fun c -> match c with
|
||||
| '"' -> Buffer.add_string buf "\\\""
|
||||
| '\\' -> Buffer.add_string buf "\\\\"
|
||||
| '\n' -> Buffer.add_string buf "\\n"
|
||||
| '\r' -> Buffer.add_string buf "\\r"
|
||||
| '\t' -> Buffer.add_string buf "\\t"
|
||||
| c when Char.code c < 0x20 ->
|
||||
Buffer.add_string buf (Printf.sprintf "\\u%04x" (Char.code c))
|
||||
| c -> Buffer.add_char buf c
|
||||
) s;
|
||||
Buffer.add_char buf '"';
|
||||
Buffer.contents buf
|
||||
|
||||
(* Bootstrap script that queues resolves arriving before sx.js loads.
|
||||
Must match _SX_STREAMING_BOOTSTRAP in shared/sx/helpers.py *)
|
||||
let _sx_streaming_bootstrap =
|
||||
"<script>window.__sxPending=[];window.__sxResolve=function(i,s){\
|
||||
if(window.Sx&&Sx.resolveSuspense){Sx.resolveSuspense(i,s)}\
|
||||
else{window.__sxPending.push({id:i,sx:s})}}</script>"
|
||||
|
||||
(* Build a resolve script tag. Must match _SX_STREAMING_RESOLVE in helpers.py *)
|
||||
let sx_streaming_resolve_script id sx_source =
|
||||
Printf.sprintf "<script>window.__sxResolve&&window.__sxResolve(%s,%s)</script>"
|
||||
(json_encode_string id) (json_encode_string sx_source)
|
||||
|
||||
(* ====================================================================== *)
|
||||
(* Streaming page render — shell-first with chunked transfer encoding *)
|
||||
(* ====================================================================== *)
|
||||
|
||||
let http_render_page_streaming env path _headers fd page_name =
|
||||
let t0 = Unix.gettimeofday () in
|
||||
let page_def = try
|
||||
match env_get env ("page:" ^ page_name) with Dict d -> d | _ -> raise Not_found
|
||||
with _ ->
|
||||
Printf.eprintf "[sx-stream] page def not found: page:%s\n%!" page_name;
|
||||
let err = http_response ~status:500 "<h1>Streaming page def not found</h1>" in
|
||||
let bytes = Bytes.of_string err in
|
||||
(try ignore (Unix.write fd bytes 0 (Bytes.length bytes)) with _ -> ());
|
||||
(try Unix.close fd with _ -> ());
|
||||
raise Exit
|
||||
in
|
||||
|
||||
(* Extract streaming fields from page def *)
|
||||
let shell_ast = match Hashtbl.find_opt page_def "shell" with Some v -> v | None -> Nil in
|
||||
let data_ast = match Hashtbl.find_opt page_def "data" with Some v -> v | None -> Nil in
|
||||
let content_ast = match Hashtbl.find_opt page_def "content" with Some v -> v | None -> Nil in
|
||||
|
||||
(* Phase 1: Evaluate shell AST — contains ~suspense placeholders with fallbacks.
|
||||
The :shell expression already includes the inner layout (e.g. ~layouts/doc),
|
||||
so we only wrap in the outer layout (~shared:layout/app-body) for gutters.
|
||||
NO inner layout wrapping — shell_ast already has it. *)
|
||||
let shell_html = try
|
||||
let outer_layout = get_app_str "outer-layout" "~shared:layout/app-body" in
|
||||
let full_ast = List [Symbol outer_layout; Keyword "content"; shell_ast] in
|
||||
let page_source = serialize_value full_ast in
|
||||
(* aser → SSR *)
|
||||
let body_result =
|
||||
let call = List [Symbol "aser"; List [Symbol "quote"; full_ast]; Env env] in
|
||||
Sx_ref.eval_expr call (Env env) in
|
||||
let body_str = match body_result with
|
||||
| String s | SxExpr s -> s | _ -> serialize_value body_result in
|
||||
let body_html = try
|
||||
let body_expr = match Sx_parser.parse_all body_str with
|
||||
| [e] -> e | [] -> Nil | es -> List (Symbol "<>" :: es) in
|
||||
if env_has env "render-to-html" then
|
||||
let render_call = List [Symbol "render-to-html";
|
||||
List [Symbol "quote"; body_expr]; Env env] in
|
||||
(match Sx_ref.eval_expr render_call (Env env) with
|
||||
| String s | RawHTML s -> s | v -> Sx_runtime.value_to_str v)
|
||||
else Sx_render.sx_render_to_html env body_expr env
|
||||
with e -> Printf.eprintf "[sx-stream] SSR failed: %s\n%!" (Printexc.to_string e); "" in
|
||||
|
||||
(* Build full page shell with body HTML *)
|
||||
let get_shell name = try env_get env ("__shell-" ^ name) with _ -> Nil in
|
||||
let shell_args = [
|
||||
Keyword "title"; String (get_app_str "title" "SX"); Keyword "csrf"; String "";
|
||||
Keyword "page-sx"; String page_source;
|
||||
Keyword "body-html"; String body_html;
|
||||
Keyword "component-defs"; get_shell "component-defs";
|
||||
Keyword "component-hash"; get_shell "component-hash";
|
||||
Keyword "pages-sx"; get_shell "pages-sx";
|
||||
Keyword "sx-css"; get_shell "sx-css";
|
||||
Keyword "asset-url"; get_shell "asset-url";
|
||||
Keyword "wasm-hash"; get_shell "wasm-hash";
|
||||
Keyword "platform-hash"; get_shell "platform-hash";
|
||||
Keyword "sxbc-hash"; get_shell "sxbc-hash";
|
||||
Keyword "inline-css"; get_shell "inline-css";
|
||||
Keyword "inline-head-js"; get_shell "inline-head-js";
|
||||
Keyword "init-sx"; get_shell "init-sx";
|
||||
Keyword "meta-html"; String "";
|
||||
] in
|
||||
let shell_sym = get_app_str "shell" "~shared:shell/sx-page-shell" in
|
||||
let shell_call = List (Symbol shell_sym :: shell_args) in
|
||||
if env_has env "render-to-html" then
|
||||
let render_call = List [Symbol "render-to-html";
|
||||
List [Symbol "quote"; shell_call]; Env env] in
|
||||
(match Sx_ref.eval_expr render_call (Env env) with
|
||||
| String s | RawHTML s -> s | v -> Sx_runtime.value_to_str v)
|
||||
else Sx_render.sx_render_to_html env shell_call env
|
||||
with e ->
|
||||
Printf.eprintf "[sx-stream] shell render failed: %s\n%!" (Printexc.to_string e);
|
||||
"<html><body><h1>Streaming shell render failed</h1></body></html>"
|
||||
in
|
||||
let t1 = Unix.gettimeofday () in
|
||||
|
||||
(* Phase 2: Send chunked header + shell HTML *)
|
||||
let header = http_chunked_header () in
|
||||
let header_bytes = Bytes.of_string header in
|
||||
(try ignore (Unix.write fd header_bytes 0 (Bytes.length header_bytes)) with _ -> ());
|
||||
write_chunk fd shell_html;
|
||||
(* Bootstrap resolve script — must come after shell so suspense elements exist *)
|
||||
write_chunk fd _sx_streaming_bootstrap;
|
||||
let t2 = Unix.gettimeofday () in
|
||||
|
||||
(* Phase 3: Evaluate :data, render :content, flush resolve scripts *)
|
||||
let resolve_count = ref 0 in
|
||||
if data_ast <> Nil && content_ast <> Nil then begin
|
||||
(try
|
||||
let data_result = Sx_ref.eval_expr data_ast (Env env) in
|
||||
let t3_data = Unix.gettimeofday () in
|
||||
|
||||
(* Determine single-stream vs multi-stream *)
|
||||
let data_items = match data_result with
|
||||
| Dict _ -> [(data_result, "stream-content")]
|
||||
| List items ->
|
||||
List.map (fun item ->
|
||||
let stream_id = match item with
|
||||
| Dict d -> (match Hashtbl.find_opt d "stream-id" with
|
||||
| Some (String s) -> s | _ -> "stream-content")
|
||||
| _ -> "stream-content" in
|
||||
(item, stream_id)
|
||||
) items
|
||||
| _ ->
|
||||
Printf.eprintf "[sx-stream] :data returned %s, expected dict or list\n%!"
|
||||
(Sx_runtime.type_of data_result |> Sx_runtime.value_to_str);
|
||||
[]
|
||||
in
|
||||
|
||||
(* For each data item, bind values and render :content *)
|
||||
List.iter (fun (item, stream_id) ->
|
||||
(try
|
||||
(* Create fresh env with data bindings *)
|
||||
let content_env = { bindings = Hashtbl.create 16; parent = Some env } in
|
||||
(match item with
|
||||
| Dict d ->
|
||||
Hashtbl.iter (fun k v ->
|
||||
if k <> "stream-id" && k <> "__type" then begin
|
||||
(* Normalize: underscores → hyphens *)
|
||||
let norm_k = String.map (fun c -> if c = '_' then '-' else c) k in
|
||||
ignore (env_bind content_env norm_k v);
|
||||
if norm_k <> k then ignore (env_bind content_env k v)
|
||||
end
|
||||
) d
|
||||
| _ -> ());
|
||||
|
||||
(* aser :content in the data-bound env *)
|
||||
let content_result =
|
||||
let call = List [Symbol "aser"; List [Symbol "quote"; content_ast]; Env content_env] in
|
||||
Sx_ref.eval_expr call (Env content_env) in
|
||||
let sx_source = match content_result with
|
||||
| String s | SxExpr s -> s | _ -> serialize_value content_result in
|
||||
let resolve_script = sx_streaming_resolve_script stream_id sx_source in
|
||||
write_chunk fd resolve_script;
|
||||
incr resolve_count
|
||||
with e ->
|
||||
(* Error boundary: emit error fallback for this slot *)
|
||||
let msg = Printexc.to_string e in
|
||||
Printf.eprintf "[sx-stream] resolve error for %s: %s\n%!" stream_id msg;
|
||||
let error_sx = Printf.sprintf "(div :class \"text-rose-600 p-4 text-sm\" \"Error: %s\")"
|
||||
(String.map (fun c -> if c = '"' then '\'' else c) msg) in
|
||||
write_chunk fd (sx_streaming_resolve_script stream_id error_sx);
|
||||
incr resolve_count)
|
||||
) data_items;
|
||||
let t3 = Unix.gettimeofday () in
|
||||
Printf.eprintf "[sx-stream] %s shell=%.3fs flush=%.3fs data=%.3fs resolve=%.3fs total=%.3fs chunks=%d\n%!"
|
||||
path (t1 -. t0) (t2 -. t1) (t3_data -. t2) (t3 -. t3_data) (t3 -. t0) !resolve_count
|
||||
with e ->
|
||||
Printf.eprintf "[sx-stream] data eval failed: %s\n%!" (Printexc.to_string e))
|
||||
end else
|
||||
Printf.eprintf "[sx-stream] %s shell=%.3fs (no :data/:content)\n%!" path (t1 -. t0);
|
||||
|
||||
(* Phase 4: End chunked response *)
|
||||
end_chunked fd
|
||||
|
||||
(* ====================================================================== *)
|
||||
(* Static file serving + file hashing *)
|
||||
(* ====================================================================== *)
|
||||
@@ -2056,8 +2271,12 @@ let http_inject_shell_statics env static_dir sx_sxc =
|
||||
| Some v -> serialize_value v | _ -> "" in
|
||||
let has_data = match extract_kw "data" rest with
|
||||
| Some _ -> true | None -> false in
|
||||
let is_stream = match extract_kw "stream" rest with
|
||||
| Some (Symbol "true") | Some (Bool true) -> true | _ -> false in
|
||||
if path_val <> "" then begin
|
||||
_defpage_paths := path_val :: !_defpage_paths;
|
||||
if is_stream then
|
||||
Hashtbl.replace _streaming_pages path_val name;
|
||||
Buffer.add_string pages_buf
|
||||
(Printf.sprintf "{:name \"%s\" :path \"%s\" :auth \"public\" :has-data %s :content \"%s\"}\n"
|
||||
name path_val (if has_data then "true" else "false")
|
||||
@@ -2074,6 +2293,9 @@ let http_inject_shell_statics env static_dir sx_sxc =
|
||||
Printf.eprintf "[sx-http] pages-sx: %d bytes (%d lines)\n%!"
|
||||
(String.length pages_sx)
|
||||
(List.length (String.split_on_char '\n' pages_sx));
|
||||
if Hashtbl.length _streaming_pages > 0 then
|
||||
Printf.eprintf "[sx-http] streaming pages: %s\n%!"
|
||||
(String.concat ", " (Hashtbl.fold (fun p n acc -> (p ^ "→" ^ n) :: acc) _streaming_pages []));
|
||||
ignore (env_bind env "__shell-pages-sx" (String pages_sx));
|
||||
ignore (env_bind env "__shell-sx-css" (String sx_css));
|
||||
ignore (env_bind env "__shell-asset-url" (String "/static"));
|
||||
@@ -2514,6 +2736,23 @@ let http_setup_page_helpers env =
|
||||
with _ -> String (";; component " ^ name ^ " not found"))
|
||||
| _ -> raise (Eval_error "component-source: expected (name)"));
|
||||
|
||||
(* Streaming demo data helper — returns list of dicts for multi-stream test.
|
||||
Matches the Python async generator streaming-demo-data in sx_docs. *)
|
||||
bind "streaming-demo-data" (fun _args ->
|
||||
let mk_item id label color message time =
|
||||
let d = Hashtbl.create 8 in
|
||||
Hashtbl.replace d "stream-id" (String id);
|
||||
Hashtbl.replace d "stream_label" (String label);
|
||||
Hashtbl.replace d "stream_color" (String color);
|
||||
Hashtbl.replace d "stream_message" (String message);
|
||||
Hashtbl.replace d "stream_time" (String time);
|
||||
Dict d in
|
||||
List [
|
||||
mk_item "stream-fast" "Fast source" "emerald" "Resolved in ~0ms (sync)" "0ms";
|
||||
mk_item "stream-medium" "Medium source" "amber" "Resolved in ~0ms (sync)" "0ms";
|
||||
mk_item "stream-slow" "Slow source" "violet" "Resolved in ~0ms (sync)" "0ms";
|
||||
]);
|
||||
|
||||
ignore bind (* suppress unused warning *)
|
||||
|
||||
let http_mode port =
|
||||
@@ -3040,6 +3279,89 @@ let http_mode port =
|
||||
in
|
||||
write_response fd response; true
|
||||
end else if is_sx then begin
|
||||
(* Streaming pages: chunked transfer, bypass cache.
|
||||
Convert SX URL to flat defpage path:
|
||||
/sx/(geography.(isomorphism.streaming)) → /geography/isomorphism/streaming
|
||||
Strip prefix, remove parens, replace dots with slashes. *)
|
||||
let page_path =
|
||||
let raw = if String.length path > app_prefix_len
|
||||
&& String.sub path 0 app_prefix_len = app_prefix
|
||||
then String.sub path app_prefix_len (String.length path - app_prefix_len)
|
||||
else path in
|
||||
let buf = Buffer.create (String.length raw + 1) in
|
||||
Buffer.add_char buf '/';
|
||||
String.iter (fun c -> match c with
|
||||
| '(' | ')' -> ()
|
||||
| '.' -> Buffer.add_char buf '/'
|
||||
| c -> Buffer.add_char buf c
|
||||
) raw;
|
||||
Buffer.contents buf in
|
||||
let stream_page_name = Hashtbl.find_opt _streaming_pages page_path in
|
||||
if stream_page_name <> None then begin
|
||||
let sname = match stream_page_name with Some s -> s | None -> "" in
|
||||
if is_ajax then begin
|
||||
(* AJAX streaming: evaluate shell + data + content synchronously,
|
||||
return fully-resolved SX wire format (no chunked transfer). *)
|
||||
let response = try
|
||||
let page_def = match env_get env ("page:" ^ sname) with Dict d -> d | _ -> raise Not_found in
|
||||
let shell_ast = match Hashtbl.find_opt page_def "shell" with Some v -> v | None -> Nil in
|
||||
let data_ast = match Hashtbl.find_opt page_def "data" with Some v -> v | None -> Nil in
|
||||
let content_ast = match Hashtbl.find_opt page_def "content" with Some v -> v | None -> Nil in
|
||||
(* Evaluate shell — provides nav + suspense skeletons *)
|
||||
let shell_sx =
|
||||
let call = List [Symbol "aser"; List [Symbol "quote"; shell_ast]; Env env] in
|
||||
match Sx_ref.eval_expr call (Env env) with
|
||||
| String s | SxExpr s -> s | v -> serialize_value v in
|
||||
(* If we have data+content, resolve all slots and embed as OOB swaps *)
|
||||
let resolve_oob = if data_ast <> Nil && content_ast <> Nil then begin
|
||||
let data_result = try Sx_ref.eval_expr data_ast (Env env) with _ -> Nil in
|
||||
let data_items = match data_result with
|
||||
| Dict _ -> [(data_result, "stream-content")]
|
||||
| List items -> List.map (fun item ->
|
||||
let sid = match item with Dict d ->
|
||||
(match Hashtbl.find_opt d "stream-id" with Some (String s) -> s | _ -> "stream-content")
|
||||
| _ -> "stream-content" in (item, sid)) items
|
||||
| _ -> [] in
|
||||
let buf = Buffer.create 1024 in
|
||||
List.iter (fun (item, stream_id) ->
|
||||
try
|
||||
let cenv = { bindings = Hashtbl.create 16; parent = Some env } in
|
||||
(match item with Dict d ->
|
||||
Hashtbl.iter (fun k v ->
|
||||
if k <> "stream-id" && k <> "__type" then begin
|
||||
let nk = String.map (fun c -> if c = '_' then '-' else c) k in
|
||||
ignore (env_bind cenv nk v);
|
||||
if nk <> k then ignore (env_bind cenv k v)
|
||||
end) d | _ -> ());
|
||||
let cr = let call = List [Symbol "aser"; List [Symbol "quote"; content_ast]; Env cenv] in
|
||||
Sx_ref.eval_expr call (Env cenv) in
|
||||
let sx_src = match cr with String s | SxExpr s -> s | v -> serialize_value v in
|
||||
(* OOB swap: replace suspense placeholder contents *)
|
||||
Buffer.add_string buf
|
||||
(Printf.sprintf "(div :id \"sx-suspense-%s\" :data-suspense \"%s\" :sx-swap-oob \"innerHTML\" :style \"display:contents\" %s)"
|
||||
stream_id stream_id sx_src)
|
||||
with e ->
|
||||
Printf.eprintf "[sx-stream-ajax] resolve error %s: %s\n%!" stream_id (Printexc.to_string e)
|
||||
) data_items;
|
||||
Buffer.contents buf
|
||||
end else "" in
|
||||
http_response ~content_type:"text/sx; charset=utf-8" (shell_sx ^ resolve_oob)
|
||||
with e ->
|
||||
Printf.eprintf "[sx-stream-ajax] error for %s: %s\n%!" path (Printexc.to_string e);
|
||||
http_response ~status:500 ~content_type:"text/sx; charset=utf-8"
|
||||
(Printf.sprintf "(div :class \"p-4 text-rose-600\" \"Streaming page error: %s\")"
|
||||
(escape_sx_string (Printexc.to_string e)))
|
||||
in
|
||||
write_response fd response; true
|
||||
end else begin
|
||||
(* Full page streaming: chunked transfer encoding *)
|
||||
(try http_render_page_streaming env path [] fd sname
|
||||
with Exit -> () (* page def not found — already handled *)
|
||||
| e -> Printf.eprintf "[sx-stream] unexpected error for %s: %s\n%!" path (Printexc.to_string e);
|
||||
(try Unix.close fd with _ -> ()));
|
||||
true
|
||||
end
|
||||
end else
|
||||
let has_state_cookie = Hashtbl.mem _request_cookies "sx-home-stepper" in
|
||||
let cache_key = if is_ajax then "ajax:" ^ path else path in
|
||||
match (if has_state_cookie then None
|
||||
|
||||
Reference in New Issue
Block a user