Fix streaming: split setup (needs context) from generator (just yields)
Async generator bodies don't execute until __anext__(), by which time the request context is gone. Restructure execute_page_streaming as a regular async function that does all context-dependent work (g, request, current_app access, layout resolution, task creation) while the context is live, then returns an inner async generator that only yields strings and awaits pre-created tasks. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -320,14 +320,10 @@ async def execute_page_streaming(
|
|||||||
):
|
):
|
||||||
"""Execute a page with streaming response.
|
"""Execute a page with streaming response.
|
||||||
|
|
||||||
Returns an async generator that yields HTML chunks:
|
All context-dependent setup (g, request, current_app access) runs in
|
||||||
1. HTML shell with suspense placeholders (immediate)
|
this regular async function — called while the request context is live.
|
||||||
2. Resolution <script> tags as IO completes
|
Returns an async generator that yields pre-computed HTML chunks and
|
||||||
3. Closing </body></html>
|
awaits already-created tasks (no further context access needed).
|
||||||
|
|
||||||
Each suspense placeholder renders a loading skeleton. As data and
|
|
||||||
header IO resolve, the server streams inline scripts that call
|
|
||||||
``__sxResolve(id, sx)`` to replace the placeholder content.
|
|
||||||
"""
|
"""
|
||||||
import asyncio
|
import asyncio
|
||||||
from .jinja_bridge import get_component_env, _get_request_context
|
from .jinja_bridge import get_component_env, _get_request_context
|
||||||
@@ -392,7 +388,7 @@ async def execute_page_streaming(
|
|||||||
layout_name = str(page_def.layout)
|
layout_name = str(page_def.layout)
|
||||||
layout = get_layout(layout_name)
|
layout = get_layout(layout_name)
|
||||||
|
|
||||||
# --- Concurrent IO tasks ---
|
# --- Launch concurrent IO tasks (inherit context via create_task) ---
|
||||||
|
|
||||||
async def _eval_data_and_content():
|
async def _eval_data_and_content():
|
||||||
data_env = dict(env)
|
data_env = dict(env)
|
||||||
@@ -417,7 +413,7 @@ async def execute_page_streaming(
|
|||||||
data_task = asyncio.create_task(_eval_data_and_content())
|
data_task = asyncio.create_task(_eval_data_and_content())
|
||||||
header_task = asyncio.create_task(_eval_headers())
|
header_task = asyncio.create_task(_eval_headers())
|
||||||
|
|
||||||
# --- Build initial page SX with suspense placeholders ---
|
# --- Build initial shell (still in request context) ---
|
||||||
|
|
||||||
initial_page_sx = await _render_to_sx("app-body",
|
initial_page_sx = await _render_to_sx("app-body",
|
||||||
header_rows=SxExpr(
|
header_rows=SxExpr(
|
||||||
@@ -430,34 +426,38 @@ async def execute_page_streaming(
|
|||||||
|
|
||||||
shell, tail = sx_page_streaming_parts(tctx, initial_page_sx)
|
shell, tail = sx_page_streaming_parts(tctx, initial_page_sx)
|
||||||
|
|
||||||
# --- Yield initial shell + scripts ---
|
# --- Return async generator that yields chunks ---
|
||||||
yield shell + tail
|
# No context access needed below — just awaiting tasks and yielding strings.
|
||||||
|
|
||||||
# --- Yield resolution chunks in completion order ---
|
async def _stream_chunks():
|
||||||
tasks = {data_task: "data", header_task: "headers"}
|
yield shell + tail
|
||||||
pending = set(tasks.keys())
|
|
||||||
|
|
||||||
while pending:
|
tasks = {data_task: "data", header_task: "headers"}
|
||||||
done, pending = await asyncio.wait(
|
pending = set(tasks.keys())
|
||||||
pending, return_when=asyncio.FIRST_COMPLETED,
|
|
||||||
)
|
|
||||||
for task in done:
|
|
||||||
label = tasks[task]
|
|
||||||
try:
|
|
||||||
result = task.result()
|
|
||||||
except Exception as e:
|
|
||||||
logger.error("Streaming %s task failed: %s", label, e)
|
|
||||||
continue
|
|
||||||
|
|
||||||
if label == "data":
|
while pending:
|
||||||
content_sx, filter_sx, aside_sx, menu_sx = result
|
done, pending = await asyncio.wait(
|
||||||
yield sx_streaming_resolve_script("stream-content", content_sx)
|
pending, return_when=asyncio.FIRST_COMPLETED,
|
||||||
elif label == "headers":
|
)
|
||||||
header_rows, header_menu = result
|
for task in done:
|
||||||
if header_rows:
|
label = tasks[task]
|
||||||
yield sx_streaming_resolve_script("stream-headers", header_rows)
|
try:
|
||||||
|
result = task.result()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Streaming %s task failed: %s", label, e)
|
||||||
|
continue
|
||||||
|
|
||||||
yield "\n</body>\n</html>"
|
if label == "data":
|
||||||
|
content_sx, filter_sx, aside_sx, menu_sx = result
|
||||||
|
yield sx_streaming_resolve_script("stream-content", content_sx)
|
||||||
|
elif label == "headers":
|
||||||
|
header_rows, header_menu = result
|
||||||
|
if header_rows:
|
||||||
|
yield sx_streaming_resolve_script("stream-headers", header_rows)
|
||||||
|
|
||||||
|
yield "\n</body>\n</html>"
|
||||||
|
|
||||||
|
return _stream_chunks()
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -507,7 +507,7 @@ def mount_pages(bp: Any, service_name: str,
|
|||||||
|
|
||||||
def _mount_one_page(bp: Any, service_name: str, page_def: PageDef) -> None:
|
def _mount_one_page(bp: Any, service_name: str, page_def: PageDef) -> None:
|
||||||
"""Mount a single PageDef as a GET route on the blueprint."""
|
"""Mount a single PageDef as a GET route on the blueprint."""
|
||||||
from quart import make_response, Response, stream_with_context
|
from quart import make_response, Response
|
||||||
|
|
||||||
if page_def.stream:
|
if page_def.stream:
|
||||||
# Streaming response: yields HTML chunks as IO resolves
|
# Streaming response: yields HTML chunks as IO resolves
|
||||||
@@ -520,15 +520,13 @@ def _mount_one_page(bp: Any, service_name: str, page_def: PageDef) -> None:
|
|||||||
if hasattr(result, "status_code"):
|
if hasattr(result, "status_code"):
|
||||||
return result
|
return result
|
||||||
return await make_response(result, 200)
|
return await make_response(result, 200)
|
||||||
# stream_with_context is a decorator — wrap the generator function
|
# execute_page_streaming does all context-dependent setup as a
|
||||||
# so app/request context is preserved across yields
|
# regular async function (while request context is live), then
|
||||||
@stream_with_context
|
# returns an async generator that only yields strings.
|
||||||
async def _stream():
|
gen = await execute_page_streaming(
|
||||||
async for chunk in execute_page_streaming(
|
current, service_name, url_params=kwargs,
|
||||||
current, service_name, url_params=kwargs,
|
)
|
||||||
):
|
return Response(gen, content_type="text/html; charset=utf-8")
|
||||||
yield chunk
|
|
||||||
return Response(_stream(), content_type="text/html; charset=utf-8")
|
|
||||||
else:
|
else:
|
||||||
# Standard non-streaming response
|
# Standard non-streaming response
|
||||||
async def page_view(**kwargs: Any) -> Any:
|
async def page_view(**kwargs: Any) -> Any:
|
||||||
|
|||||||
Reference in New Issue
Block a user