Phase 6: Streaming & Suspense — chunked HTML with suspense resolution
Server streams HTML shell with ~suspense placeholders immediately, then sends resolution <script> chunks as async IO completes. Browser renders loading skeletons instantly, replacing them with real content as data arrives via __sxResolve(). - defpage :stream true opts pages into streaming response - ~suspense component renders fallback with data-suspense attr - resolve-suspense in boot.sx (spec) + bootstrapped to sx-browser.js - __sxPending queue handles resolution before sx-browser.js loads - execute_page_streaming() async generator with concurrent IO tasks - Streaming demo page at /isomorphism/streaming with 1.5s simulated delay Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -309,6 +309,157 @@ async def execute_page(
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Streaming page execution (Phase 6: Streaming & Suspense)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def execute_page_streaming(
|
||||
page_def: PageDef,
|
||||
service_name: str,
|
||||
url_params: dict[str, Any] | None = None,
|
||||
):
|
||||
"""Execute a page with streaming response.
|
||||
|
||||
Returns an async generator that yields HTML chunks:
|
||||
1. HTML shell with suspense placeholders (immediate)
|
||||
2. Resolution <script> tags as IO completes
|
||||
3. Closing </body></html>
|
||||
|
||||
Each suspense placeholder renders a loading skeleton. As data and
|
||||
header IO resolve, the server streams inline scripts that call
|
||||
``__sxResolve(id, sx)`` to replace the placeholder content.
|
||||
"""
|
||||
import asyncio
|
||||
from .jinja_bridge import get_component_env, _get_request_context
|
||||
from .async_eval import async_eval
|
||||
from .page import get_template_context
|
||||
from .helpers import (
|
||||
_render_to_sx, sx_page_streaming_parts,
|
||||
sx_streaming_resolve_script,
|
||||
)
|
||||
from .parser import SxExpr, serialize as sx_serialize
|
||||
from .layouts import get_layout
|
||||
|
||||
if url_params is None:
|
||||
url_params = {}
|
||||
|
||||
env = dict(get_component_env())
|
||||
env.update(get_page_helpers(service_name))
|
||||
env.update(page_def.closure)
|
||||
for key, val in url_params.items():
|
||||
kebab = key.replace("_", "-")
|
||||
env[kebab] = val
|
||||
env[key] = val
|
||||
|
||||
ctx = _get_request_context()
|
||||
tctx = await get_template_context()
|
||||
|
||||
# Build fallback expressions
|
||||
if page_def.fallback_expr is not None:
|
||||
fallback_sx = sx_serialize(page_def.fallback_expr)
|
||||
else:
|
||||
fallback_sx = (
|
||||
'(div :class "p-8 animate-pulse"'
|
||||
' (div :class "h-8 bg-stone-200 rounded mb-4 w-1/3")'
|
||||
' (div :class "h-64 bg-stone-200 rounded"))'
|
||||
)
|
||||
header_fallback = '(div :class "h-12 bg-stone-200 animate-pulse")'
|
||||
|
||||
# Resolve layout
|
||||
layout = None
|
||||
layout_kwargs: dict[str, Any] = {}
|
||||
if page_def.layout is not None:
|
||||
if isinstance(page_def.layout, str):
|
||||
layout_name = page_def.layout
|
||||
elif isinstance(page_def.layout, list):
|
||||
from .types import Keyword as SxKeyword, Symbol as SxSymbol
|
||||
raw = page_def.layout
|
||||
first = raw[0]
|
||||
layout_name = (
|
||||
first.name if isinstance(first, (SxKeyword, SxSymbol))
|
||||
else str(first)
|
||||
)
|
||||
i = 1
|
||||
while i < len(raw):
|
||||
k = raw[i]
|
||||
if isinstance(k, SxKeyword) and i + 1 < len(raw):
|
||||
resolved = await async_eval(raw[i + 1], env, ctx)
|
||||
layout_kwargs[k.name.replace("-", "_")] = resolved
|
||||
i += 2
|
||||
else:
|
||||
i += 1
|
||||
else:
|
||||
layout_name = str(page_def.layout)
|
||||
layout = get_layout(layout_name)
|
||||
|
||||
# --- Concurrent IO tasks ---
|
||||
|
||||
async def _eval_data_and_content():
|
||||
data_env = dict(env)
|
||||
if page_def.data_expr is not None:
|
||||
data_result = await async_eval(page_def.data_expr, data_env, ctx)
|
||||
if isinstance(data_result, dict):
|
||||
for k, v in data_result.items():
|
||||
data_env[k.replace("_", "-")] = v
|
||||
content_sx = await _eval_slot(page_def.content_expr, data_env, ctx) if page_def.content_expr else ""
|
||||
filter_sx = await _eval_slot(page_def.filter_expr, data_env, ctx) if page_def.filter_expr else ""
|
||||
aside_sx = await _eval_slot(page_def.aside_expr, data_env, ctx) if page_def.aside_expr else ""
|
||||
menu_sx = await _eval_slot(page_def.menu_expr, data_env, ctx) if page_def.menu_expr else ""
|
||||
return content_sx, filter_sx, aside_sx, menu_sx
|
||||
|
||||
async def _eval_headers():
|
||||
if layout is None:
|
||||
return "", ""
|
||||
rows = await layout.full_headers(tctx, **layout_kwargs)
|
||||
menu = await layout.mobile_menu(tctx, **layout_kwargs)
|
||||
return rows, menu
|
||||
|
||||
data_task = asyncio.create_task(_eval_data_and_content())
|
||||
header_task = asyncio.create_task(_eval_headers())
|
||||
|
||||
# --- Build initial page SX with suspense placeholders ---
|
||||
|
||||
initial_page_sx = await _render_to_sx("app-body",
|
||||
header_rows=SxExpr(
|
||||
f'(~suspense :id "stream-headers" :fallback {header_fallback})'
|
||||
),
|
||||
content=SxExpr(
|
||||
f'(~suspense :id "stream-content" :fallback {fallback_sx})'
|
||||
),
|
||||
)
|
||||
|
||||
shell, tail = sx_page_streaming_parts(tctx, initial_page_sx)
|
||||
|
||||
# --- Yield initial shell + scripts ---
|
||||
yield shell + tail
|
||||
|
||||
# --- Yield resolution chunks in completion order ---
|
||||
tasks = {data_task: "data", header_task: "headers"}
|
||||
pending = set(tasks.keys())
|
||||
|
||||
while pending:
|
||||
done, pending = await asyncio.wait(
|
||||
pending, return_when=asyncio.FIRST_COMPLETED,
|
||||
)
|
||||
for task in done:
|
||||
label = tasks[task]
|
||||
try:
|
||||
result = task.result()
|
||||
except Exception as e:
|
||||
logger.error("Streaming %s task failed: %s", label, e)
|
||||
continue
|
||||
|
||||
if label == "data":
|
||||
content_sx, filter_sx, aside_sx, menu_sx = result
|
||||
yield sx_streaming_resolve_script("stream-content", content_sx)
|
||||
elif label == "headers":
|
||||
header_rows, header_menu = result
|
||||
if header_rows:
|
||||
yield sx_streaming_resolve_script("stream-headers", header_rows)
|
||||
|
||||
yield "\n</body>\n</html>"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Blueprint mounting
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -356,17 +507,30 @@ def mount_pages(bp: Any, service_name: str,
|
||||
|
||||
def _mount_one_page(bp: Any, service_name: str, page_def: PageDef) -> None:
|
||||
"""Mount a single PageDef as a GET route on the blueprint."""
|
||||
from quart import make_response
|
||||
from quart import make_response, Response
|
||||
|
||||
# Build the view function
|
||||
async def page_view(**kwargs: Any) -> Any:
|
||||
# Re-fetch the page from registry to support hot-reload of content
|
||||
current = get_page(service_name, page_def.name) or page_def
|
||||
result = await execute_page(current, service_name, url_params=kwargs)
|
||||
# If result is already a Response (from sx_response), return it
|
||||
if hasattr(result, "status_code"):
|
||||
return result
|
||||
return await make_response(result, 200)
|
||||
if page_def.stream:
|
||||
# Streaming response: yields HTML chunks as IO resolves
|
||||
async def page_view(**kwargs: Any) -> Any:
|
||||
from shared.browser.app.utils.htmx import is_htmx_request
|
||||
current = get_page(service_name, page_def.name) or page_def
|
||||
# Only stream for full page loads (not SX/HTMX requests)
|
||||
if is_htmx_request():
|
||||
result = await execute_page(current, service_name, url_params=kwargs)
|
||||
if hasattr(result, "status_code"):
|
||||
return result
|
||||
return await make_response(result, 200)
|
||||
# Streaming response
|
||||
gen = execute_page_streaming(current, service_name, url_params=kwargs)
|
||||
return Response(gen, content_type="text/html; charset=utf-8")
|
||||
else:
|
||||
# Standard non-streaming response
|
||||
async def page_view(**kwargs: Any) -> Any:
|
||||
current = get_page(service_name, page_def.name) or page_def
|
||||
result = await execute_page(current, service_name, url_params=kwargs)
|
||||
if hasattr(result, "status_code"):
|
||||
return result
|
||||
return await make_response(result, 200)
|
||||
|
||||
# Give the view function a unique name for Quart's routing
|
||||
page_view.__name__ = f"defpage_{page_def.name.replace('-', '_')}"
|
||||
|
||||
Reference in New Issue
Block a user