Async IO in streaming render — staggered resolve with io-sleep

Server (sx_server.ml):
- eval_with_io: CEK evaluator with IO suspension handling (io-sleep, import)
- io-sleep platform primitive: raises CekPerformRequest, resolved by eval_with_io
- Streaming render uses eval_with_io for data + content evaluation
- Data items with "delay" field sleep before resolving (async streaming)
- Removed hardcoded streaming-demo-data — application logic belongs in .sx

Application (streaming-demo.sx):
- streaming-demo-data defined in SX: 3 items with 1s/3s/5s delays
- Each item has delay, stream-id, and display data fields
- Shell renders instantly, slots fill progressively as IO completes

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-11 16:19:21 +00:00
parent eaf5af4cd8
commit c850737c60
2 changed files with 118 additions and 22 deletions

View File

@@ -1956,6 +1956,49 @@ let sx_streaming_resolve_script id sx_source =
Printf.sprintf "<script>window.__sxResolve&&window.__sxResolve(%s,%s)</script>"
(json_encode_string id) (json_encode_string sx_source)
(* ====================================================================== *)
(* IO-resolving evaluator for streaming render *)
(* ====================================================================== *)
(* Evaluate an expression with IO suspension handling.
Steps the CEK machine; when it suspends on an IO request, resolves
the request locally and resumes. Supports: io-sleep, import, helper. *)
let eval_with_io expr env =
let state = ref (Sx_ref.cek_step_loop (Sx_ref.make_cek_state expr (Env env) Nil)) in
while sx_truthy (Sx_ref.cek_suspended_p !state) do
let request = Sx_ref.cek_io_request !state in
let op = match request with
| Dict d -> (match Hashtbl.find_opt d "op" with Some (String s) -> s | Some (Symbol s) -> s | _ -> "")
| _ -> "" in
let args = match request with
| Dict d -> (match Hashtbl.find_opt d "args" with Some v -> v | None -> Nil)
| _ -> Nil in
let result = match op with
| "io-sleep" | "sleep" ->
let ms = match args with
| List (Number n :: _) -> n
| Number n -> n
| _ -> 0.0 in
Unix.sleepf (ms /. 1000.0);
Nil
| "import" ->
(* Library import — delegate to existing import hook *)
(try
let lib_name = match request with
| Dict d -> (match Hashtbl.find_opt d "library" with Some v -> v | _ -> args)
| _ -> args in
ignore lib_name; (* import handled by _import_hook if registered *)
Nil
with _ -> Nil)
| _ ->
Printf.eprintf "[io] unhandled IO op: %s\n%!" op;
Nil
in
state := Sx_ref.cek_step_loop (Sx_ref.cek_resume !state result)
done;
if sx_truthy (Sx_ref.cek_terminal_p !state) then Sx_ref.cek_value !state
else Nil
(* ====================================================================== *)
(* Streaming page render — shell-first with chunked transfer encoding *)
(* ====================================================================== *)
@@ -2045,11 +2088,14 @@ let http_render_page_streaming env path _headers fd page_name =
write_chunk fd _sx_streaming_bootstrap;
let t2 = Unix.gettimeofday () in
(* Phase 3: Evaluate :data, render :content, flush resolve scripts *)
(* Phase 3: Evaluate :data, render :content, flush resolve scripts.
Uses eval_with_io so :data expressions can perform IO (e.g. sleep, fetch).
Each data item is resolved independently — IO in one item doesn't block others
from being flushed as they complete. *)
let resolve_count = ref 0 in
if data_ast <> Nil && content_ast <> Nil then begin
(try
let data_result = Sx_ref.eval_expr data_ast (Env env) in
let data_result = eval_with_io data_ast env in
let t3_data = Unix.gettimeofday () in
(* Determine single-stream vs multi-stream *)
@@ -2069,15 +2115,26 @@ let http_render_page_streaming env path _headers fd page_name =
[]
in
(* For each data item, bind values and render :content *)
(* For each data item, bind values and render :content.
If the item has a "delay" field, perform IO sleep before resolving.
Each item flushes its resolve script independently — the client sees
content appear progressively as each IO completes. *)
List.iter (fun (item, stream_id) ->
(try
(* IO sleep if delay specified — demonstrates async streaming *)
(match item with
| Dict d -> (match Hashtbl.find_opt d "delay" with
| Some (Number ms) when ms > 0.0 ->
Printf.eprintf "[sx-stream] %s: sleeping %.0fms for IO...\n%!" stream_id ms;
Unix.sleepf (ms /. 1000.0)
| _ -> ())
| _ -> ());
(* Create fresh env with data bindings *)
let content_env = { bindings = Hashtbl.create 16; parent = Some env } in
(match item with
| Dict d ->
Hashtbl.iter (fun k v ->
if k <> "stream-id" && k <> "__type" then begin
if k <> "stream-id" && k <> "__type" && k <> "delay" then begin
(* Normalize: underscores → hyphens *)
let norm_k = String.map (fun c -> if c = '_' then '-' else c) k in
ignore (env_bind content_env norm_k v);
@@ -2086,10 +2143,10 @@ let http_render_page_streaming env path _headers fd page_name =
) d
| _ -> ());
(* aser :content in the data-bound env *)
(* aser :content in the data-bound env — also with IO resolution *)
let content_result =
let call = List [Symbol "aser"; List [Symbol "quote"; content_ast]; Env content_env] in
Sx_ref.eval_expr call (Env content_env) in
eval_with_io call content_env in
let sx_source = match content_result with
| String s | SxExpr s -> s | _ -> serialize_value content_result in
let resolve_script = sx_streaming_resolve_script stream_id sx_source in
@@ -2736,22 +2793,16 @@ let http_setup_page_helpers env =
with _ -> String (";; component " ^ name ^ " not found"))
| _ -> raise (Eval_error "component-source: expected (name)"));
(* Streaming demo data helper — returns list of dicts for multi-stream test.
Matches the Python async generator streaming-demo-data in sx_docs. *)
bind "streaming-demo-data" (fun _args ->
let mk_item id label color message time =
let d = Hashtbl.create 8 in
Hashtbl.replace d "stream-id" (String id);
Hashtbl.replace d "stream_label" (String label);
Hashtbl.replace d "stream_color" (String color);
Hashtbl.replace d "stream_message" (String message);
Hashtbl.replace d "stream_time" (String time);
Dict d in
List [
mk_item "stream-fast" "Fast source" "emerald" "Resolved in ~0ms (sync)" "0ms";
mk_item "stream-medium" "Medium source" "amber" "Resolved in ~0ms (sync)" "0ms";
mk_item "stream-slow" "Slow source" "violet" "Resolved in ~0ms (sync)" "0ms";
]);
(* IO sleep primitive — used by streaming pages to simulate async IO delays.
Application code calls (io-sleep ms) which raises CekPerformRequest;
the streaming render's eval_with_io resolves it with Unix.sleepf. *)
bind "io-sleep" (fun args ->
let ms = match args with Number n :: _ -> n | _ -> 0.0 in
raise (Sx_types.CekPerformRequest (
let d = Hashtbl.create 2 in
Hashtbl.replace d "op" (String "io-sleep");
Hashtbl.replace d "args" (List [Number ms]);
Dict d)));
ignore bind (* suppress unused warning *)

View File

@@ -91,3 +91,48 @@
(li (code ":content") " expression is re-evaluated with each yield's bindings")
(li "Headers stream concurrently — independent of the data generator")
(li "Future: SSE/WebSocket for re-resolving slots after initial page load")))))
(define
streaming-demo-data
(fn
()
(list
(dict
"stream-id"
"stream-fast"
"delay"
1000
"stream_label"
"Fast source"
"stream_color"
"emerald"
"stream_message"
"Resolved in ~1s (async IO)"
"stream_time"
"~1s")
(dict
"stream-id"
"stream-medium"
"delay"
3000
"stream_label"
"Medium source"
"stream_color"
"amber"
"stream_message"
"Resolved in ~3s (async IO)"
"stream_time"
"~3s")
(dict
"stream-id"
"stream-slow"
"delay"
5000
"stream_label"
"Slow source"
"stream_color"
"violet"
"stream_message"
"Resolved in ~5s (async IO)"
"stream_time"
"~5s"))))