From c850737c6075dc753918bda53e873bcd8cded8af Mon Sep 17 00:00:00 2001 From: giles Date: Sat, 11 Apr 2026 16:19:21 +0000 Subject: [PATCH] =?UTF-8?q?Async=20IO=20in=20streaming=20render=20?= =?UTF-8?q?=E2=80=94=20staggered=20resolve=20with=20io-sleep?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Server (sx_server.ml): - eval_with_io: CEK evaluator with IO suspension handling (io-sleep, import) - io-sleep platform primitive: raises CekPerformRequest, resolved by eval_with_io - Streaming render uses eval_with_io for data + content evaluation - Data items with "delay" field sleep before resolving (async streaming) - Removed hardcoded streaming-demo-data — application logic belongs in .sx Application (streaming-demo.sx): - streaming-demo-data defined in SX: 3 items with 1s/3s/5s delays - Each item has delay, stream-id, and display data fields - Shell renders instantly, slots fill progressively as IO completes Co-Authored-By: Claude Opus 4.6 (1M context) --- hosts/ocaml/bin/sx_server.ml | 95 +++++++++++++++++++++++++++--------- sx/sx/streaming-demo.sx | 45 +++++++++++++++++ 2 files changed, 118 insertions(+), 22 deletions(-) diff --git a/hosts/ocaml/bin/sx_server.ml b/hosts/ocaml/bin/sx_server.ml index 164f3d4c..76b9a793 100644 --- a/hosts/ocaml/bin/sx_server.ml +++ b/hosts/ocaml/bin/sx_server.ml @@ -1956,6 +1956,49 @@ let sx_streaming_resolve_script id sx_source = Printf.sprintf "" (json_encode_string id) (json_encode_string sx_source) +(* ====================================================================== *) +(* IO-resolving evaluator for streaming render *) +(* ====================================================================== *) + +(* Evaluate an expression with IO suspension handling. + Steps the CEK machine; when it suspends on an IO request, resolves + the request locally and resumes. Supports: io-sleep, import, helper. *) +let eval_with_io expr env = + let state = ref (Sx_ref.cek_step_loop (Sx_ref.make_cek_state expr (Env env) Nil)) in + while sx_truthy (Sx_ref.cek_suspended_p !state) do + let request = Sx_ref.cek_io_request !state in + let op = match request with + | Dict d -> (match Hashtbl.find_opt d "op" with Some (String s) -> s | Some (Symbol s) -> s | _ -> "") + | _ -> "" in + let args = match request with + | Dict d -> (match Hashtbl.find_opt d "args" with Some v -> v | None -> Nil) + | _ -> Nil in + let result = match op with + | "io-sleep" | "sleep" -> + let ms = match args with + | List (Number n :: _) -> n + | Number n -> n + | _ -> 0.0 in + Unix.sleepf (ms /. 1000.0); + Nil + | "import" -> + (* Library import — delegate to existing import hook *) + (try + let lib_name = match request with + | Dict d -> (match Hashtbl.find_opt d "library" with Some v -> v | _ -> args) + | _ -> args in + ignore lib_name; (* import handled by _import_hook if registered *) + Nil + with _ -> Nil) + | _ -> + Printf.eprintf "[io] unhandled IO op: %s\n%!" op; + Nil + in + state := Sx_ref.cek_step_loop (Sx_ref.cek_resume !state result) + done; + if sx_truthy (Sx_ref.cek_terminal_p !state) then Sx_ref.cek_value !state + else Nil + (* ====================================================================== *) (* Streaming page render — shell-first with chunked transfer encoding *) (* ====================================================================== *) @@ -2045,11 +2088,14 @@ let http_render_page_streaming env path _headers fd page_name = write_chunk fd _sx_streaming_bootstrap; let t2 = Unix.gettimeofday () in - (* Phase 3: Evaluate :data, render :content, flush resolve scripts *) + (* Phase 3: Evaluate :data, render :content, flush resolve scripts. + Uses eval_with_io so :data expressions can perform IO (e.g. sleep, fetch). + Each data item is resolved independently — IO in one item doesn't block others + from being flushed as they complete. *) let resolve_count = ref 0 in if data_ast <> Nil && content_ast <> Nil then begin (try - let data_result = Sx_ref.eval_expr data_ast (Env env) in + let data_result = eval_with_io data_ast env in let t3_data = Unix.gettimeofday () in (* Determine single-stream vs multi-stream *) @@ -2069,15 +2115,26 @@ let http_render_page_streaming env path _headers fd page_name = [] in - (* For each data item, bind values and render :content *) + (* For each data item, bind values and render :content. + If the item has a "delay" field, perform IO sleep before resolving. + Each item flushes its resolve script independently — the client sees + content appear progressively as each IO completes. *) List.iter (fun (item, stream_id) -> (try + (* IO sleep if delay specified — demonstrates async streaming *) + (match item with + | Dict d -> (match Hashtbl.find_opt d "delay" with + | Some (Number ms) when ms > 0.0 -> + Printf.eprintf "[sx-stream] %s: sleeping %.0fms for IO...\n%!" stream_id ms; + Unix.sleepf (ms /. 1000.0) + | _ -> ()) + | _ -> ()); (* Create fresh env with data bindings *) let content_env = { bindings = Hashtbl.create 16; parent = Some env } in (match item with | Dict d -> Hashtbl.iter (fun k v -> - if k <> "stream-id" && k <> "__type" then begin + if k <> "stream-id" && k <> "__type" && k <> "delay" then begin (* Normalize: underscores → hyphens *) let norm_k = String.map (fun c -> if c = '_' then '-' else c) k in ignore (env_bind content_env norm_k v); @@ -2086,10 +2143,10 @@ let http_render_page_streaming env path _headers fd page_name = ) d | _ -> ()); - (* aser :content in the data-bound env *) + (* aser :content in the data-bound env — also with IO resolution *) let content_result = let call = List [Symbol "aser"; List [Symbol "quote"; content_ast]; Env content_env] in - Sx_ref.eval_expr call (Env content_env) in + eval_with_io call content_env in let sx_source = match content_result with | String s | SxExpr s -> s | _ -> serialize_value content_result in let resolve_script = sx_streaming_resolve_script stream_id sx_source in @@ -2736,22 +2793,16 @@ let http_setup_page_helpers env = with _ -> String (";; component " ^ name ^ " not found")) | _ -> raise (Eval_error "component-source: expected (name)")); - (* Streaming demo data helper — returns list of dicts for multi-stream test. - Matches the Python async generator streaming-demo-data in sx_docs. *) - bind "streaming-demo-data" (fun _args -> - let mk_item id label color message time = - let d = Hashtbl.create 8 in - Hashtbl.replace d "stream-id" (String id); - Hashtbl.replace d "stream_label" (String label); - Hashtbl.replace d "stream_color" (String color); - Hashtbl.replace d "stream_message" (String message); - Hashtbl.replace d "stream_time" (String time); - Dict d in - List [ - mk_item "stream-fast" "Fast source" "emerald" "Resolved in ~0ms (sync)" "0ms"; - mk_item "stream-medium" "Medium source" "amber" "Resolved in ~0ms (sync)" "0ms"; - mk_item "stream-slow" "Slow source" "violet" "Resolved in ~0ms (sync)" "0ms"; - ]); + (* IO sleep primitive — used by streaming pages to simulate async IO delays. + Application code calls (io-sleep ms) which raises CekPerformRequest; + the streaming render's eval_with_io resolves it with Unix.sleepf. *) + bind "io-sleep" (fun args -> + let ms = match args with Number n :: _ -> n | _ -> 0.0 in + raise (Sx_types.CekPerformRequest ( + let d = Hashtbl.create 2 in + Hashtbl.replace d "op" (String "io-sleep"); + Hashtbl.replace d "args" (List [Number ms]); + Dict d))); ignore bind (* suppress unused warning *) diff --git a/sx/sx/streaming-demo.sx b/sx/sx/streaming-demo.sx index e89e8c3b..4696dfe3 100644 --- a/sx/sx/streaming-demo.sx +++ b/sx/sx/streaming-demo.sx @@ -91,3 +91,48 @@ (li (code ":content") " expression is re-evaluated with each yield's bindings") (li "Headers stream concurrently — independent of the data generator") (li "Future: SSE/WebSocket for re-resolving slots after initial page load"))))) + +(define + streaming-demo-data + (fn + () + (list + (dict + "stream-id" + "stream-fast" + "delay" + 1000 + "stream_label" + "Fast source" + "stream_color" + "emerald" + "stream_message" + "Resolved in ~1s (async IO)" + "stream_time" + "~1s") + (dict + "stream-id" + "stream-medium" + "delay" + 3000 + "stream_label" + "Medium source" + "stream_color" + "amber" + "stream_message" + "Resolved in ~3s (async IO)" + "stream_time" + "~3s") + (dict + "stream-id" + "stream-slow" + "delay" + 5000 + "stream_label" + "Slow source" + "stream_color" + "violet" + "stream_message" + "Resolved in ~5s (async IO)" + "stream_time" + "~5s"))))