Restore stashed WIP: live streaming plan, forms, CI pipeline, streaming demo

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
commit 5a68046bd8
parent df1aa4e1d1
2026-03-07 22:07:59 +00:00
21 changed files with 1463 additions and 120 deletions


@@ -18,6 +18,7 @@ Usage::
from __future__ import annotations
import inspect
import logging
import os
from typing import Any
@@ -86,10 +87,15 @@ def register_page_helpers(service: str, helpers: dict[str, Any]) -> None:
for name in helpers:
validate_helper(service, name)
-# Wrap helpers to validate return values at the boundary
# Wrap helpers to validate return values at the boundary.
# Async generators pass through unwrapped — their yields are validated
# by the streaming infrastructure, not at the helper boundary.
wrapped: dict[str, Any] = {}
for name, fn in helpers.items():
-if asyncio.iscoroutinefunction(fn):
if inspect.isasyncgenfunction(fn):
# Async generator: pass through (streaming infra validates yields)
wrapped[name] = fn
elif asyncio.iscoroutinefunction(fn):
@functools.wraps(fn)
async def _async_wrap(*a, _fn=fn, _name=name, **kw):
result = await _fn(*a, **kw)
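# Why async generators can't take the _async_wrap treatment above: a plain
# "async def" wrapper is a coroutine function, so awaiting it hands back the
# generator object without iterating it, and per-yield validation never runs.
# A self-contained illustration (names are hypothetical):
async def _ticker():
    yield 1

async def _naive_wrap(*a, **kw):
    return _ticker(*a, **kw)

assert inspect.isasyncgenfunction(_ticker)
assert not inspect.isasyncgenfunction(_naive_wrap)  # wrapper loses the property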
@@ -168,6 +174,44 @@ async def _eval_slot(expr: Any, env: dict, ctx: Any) -> str:
return await async_eval_slot_to_sx(expr, env, ctx)
def _replace_suspense_sexp(sx: str, stream_id: str, replacement: str) -> str:
"""Replace a rendered ~suspense div in SX source with replacement content.
After _eval_slot, ~suspense expands to:
(div :id "sx-suspense-{id}" :data-suspense "{id}" :style "display:contents" ...)
This finds the balanced s-expression containing :data-suspense "{id}" and
replaces it with the given replacement string.
"""
marker = f':data-suspense "{stream_id}"'
idx = sx.find(marker)
if idx < 0:
return sx
# Walk backwards to find the opening paren of the containing (div ...)
start = sx.rfind("(", 0, idx)
if start < 0:
return sx
# Walk forward from start to find matching close paren (balanced)
depth = 0
i = start
while i < len(sx):
ch = sx[i]
if ch == "(":
depth += 1
elif ch == ")":
depth -= 1
if depth == 0:
return sx[:start] + replacement + sx[i + 1:]
elif ch == '"':
# Skip string contents
i += 1
while i < len(sx) and sx[i] != '"':
if sx[i] == "\\":
i += 1 # skip escaped char
i += 1
i += 1
return sx
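# A quick sanity check of the balanced replacement (illustrative input; the
# quote-skipping keeps parens inside attribute strings from affecting depth):
_sx = ('(main (div :id "sx-suspense-feed" :data-suspense "feed" '
       ':style "display:contents" (p "loading")))')
assert _replace_suspense_sexp(_sx, "feed", '(ul (li "item"))') == '(main (ul (li "item")))'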
async def execute_page(
page_def: PageDef,
service_name: str,
@@ -207,15 +251,47 @@ async def execute_page(
ctx = _get_request_context()
# Evaluate :data expression if present
_multi_stream_content = None
if page_def.data_expr is not None:
data_result = await async_eval(page_def.data_expr, env, ctx)
if isinstance(data_result, dict):
if hasattr(data_result, '__aiter__'):
# Multi-stream: consume generator, eval :content per chunk,
# combine into shell with resolved suspense slots.
chunks = []
async for chunk in data_result:
if not isinstance(chunk, dict):
continue
chunk = dict(chunk)
stream_id = chunk.pop("stream-id", "stream-content")
chunk_env = dict(env)
for k, v in chunk.items():
chunk_env[k.replace("_", "-")] = v
chunk_sx = await _eval_slot(page_def.content_expr, chunk_env, ctx) if page_def.content_expr else ""
chunks.append((stream_id, chunk_sx))
# Build content: if :shell exists, render it and inline resolved chunks
if page_def.shell_expr is not None:
shell_sx = await _eval_slot(page_def.shell_expr, env, ctx)
# Replace each rendered suspense div with resolved content.
# _eval_slot expands ~suspense into:
# (div :id "sx-suspense-X" :data-suspense "X" :style "display:contents" ...)
# We find the balanced s-expr containing :data-suspense "X" and replace it.
for stream_id, chunk_sx in chunks:
shell_sx = _replace_suspense_sexp(shell_sx, stream_id, chunk_sx)
_multi_stream_content = shell_sx
else:
# No shell: just concatenate all chunks in a fragment
parts = " ".join(sx for _, sx in chunks)
_multi_stream_content = f"(<> {parts})"
elif isinstance(data_result, dict):
# Merge with kebab-case keys so SX symbols can reference them
for k, v in data_result.items():
env[k.replace("_", "-")] = v
# Render content slot (required)
-content_sx = await _eval_slot(page_def.content_expr, env, ctx)
if _multi_stream_content is not None:
content_sx = _multi_stream_content
else:
content_sx = await _eval_slot(page_def.content_expr, env, ctx)
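# Shape of a multi-stream :data helper (hypothetical example): an async
# generator whose chunks name the suspense slot via "stream-id" and carry
# the env vars for that slot (keys are kebab-cased for SX symbols):
async def load_dashboard():
    stats = {"users": 42}  # stand-in for real IO
    yield {"stream-id": "stream-stats", "stats": stats}
    recent = ["a", "b"]  # stand-in for real IO
    yield {"stream-id": "stream-feed", "recent_items": recent}  # SX sees recent-items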
# Render optional slots
filter_sx = ""
@@ -391,25 +467,60 @@ async def execute_page_streaming(
# --- Launch concurrent IO tasks (inherit context via create_task) ---
_stream_queue: asyncio.Queue = asyncio.Queue()
_multi_stream = False
async def _eval_data_and_content():
-data_env = dict(env)
-if page_def.data_expr is not None:
-    data_result = await async_eval(page_def.data_expr, data_env, ctx)
-    if isinstance(data_result, dict):
-        for k, v in data_result.items():
-            data_env[k.replace("_", "-")] = v
-content_sx = await _eval_slot(page_def.content_expr, data_env, ctx) if page_def.content_expr else ""
-filter_sx = await _eval_slot(page_def.filter_expr, data_env, ctx) if page_def.filter_expr else ""
-aside_sx = await _eval_slot(page_def.aside_expr, data_env, ctx) if page_def.aside_expr else ""
-menu_sx = await _eval_slot(page_def.menu_expr, data_env, ctx) if page_def.menu_expr else ""
-return content_sx, filter_sx, aside_sx, menu_sx
"""Evaluate :data then :content.
If :data returns an async generator (multi-stream mode), iterate it
and push each (stream_id, content_sx) to _stream_queue incrementally.
The main stream loop drains the queue and sends resolve scripts as
items arrive — giving true staggered streaming.
"""
nonlocal _multi_stream
try:
data_env = dict(env)
if page_def.data_expr is not None:
data_result = await async_eval(page_def.data_expr, data_env, ctx)
# Async generator: multi-stream mode
if hasattr(data_result, '__aiter__'):
_multi_stream = True
async for chunk in data_result:
if not isinstance(chunk, dict):
continue
chunk = dict(chunk) # copy so pop doesn't mutate
stream_id = chunk.pop("stream-id", "stream-content")
chunk_env = dict(env)
for k, v in chunk.items():
chunk_env[k.replace("_", "-")] = v
content_sx = await _eval_slot(page_def.content_expr, chunk_env, ctx) if page_def.content_expr else ""
await _stream_queue.put(("data", stream_id, content_sx))
await _stream_queue.put(("data-done",))
return
if isinstance(data_result, dict):
for k, v in data_result.items():
data_env[k.replace("_", "-")] = v
content_sx = await _eval_slot(page_def.content_expr, data_env, ctx) if page_def.content_expr else ""
filter_sx = await _eval_slot(page_def.filter_expr, data_env, ctx) if page_def.filter_expr else ""
aside_sx = await _eval_slot(page_def.aside_expr, data_env, ctx) if page_def.aside_expr else ""
menu_sx = await _eval_slot(page_def.menu_expr, data_env, ctx) if page_def.menu_expr else ""
await _stream_queue.put(("data-single", content_sx, filter_sx, aside_sx, menu_sx))
except Exception as e:
logger.error("Streaming data task failed: %s", e)
await _stream_queue.put(("data-done",))
async def _eval_headers():
-if layout is None:
-    return "", ""
-rows = await layout.full_headers(tctx, **layout_kwargs)
-menu = await layout.mobile_menu(tctx, **layout_kwargs)
-return rows, menu
try:
if layout is None:
await _stream_queue.put(("headers", "", ""))
return
rows = await layout.full_headers(tctx, **layout_kwargs)
menu = await layout.mobile_menu(tctx, **layout_kwargs)
await _stream_queue.put(("headers", rows, menu))
except Exception as e:
logger.error("Streaming headers task failed: %s", e)
await _stream_queue.put(("headers", "", ""))
data_task = asyncio.create_task(_eval_data_and_content())
header_task = asyncio.create_task(_eval_headers())
@@ -419,7 +530,15 @@ async def execute_page_streaming(
# No dependency on sx-browser.js boot timing for the initial shell.
suspense_header_sx = f'(~suspense :id "stream-headers" :fallback {header_fallback})'
-suspense_content_sx = f'(~suspense :id "stream-content" :fallback {fallback_sx})'
# When :shell is provided, it renders directly as the content slot
# (it contains its own ~suspense for the data-dependent part).
# Otherwise, wrap the entire :content in a single suspense.
if page_def.shell_expr is not None:
shell_content_sx = await _eval_slot(page_def.shell_expr, env, ctx)
suspense_content_sx = shell_content_sx
else:
suspense_content_sx = f'(~suspense :id "stream-content" :fallback {fallback_sx})'
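# A :shell slot typically embeds one ~suspense per stream (hypothetical SX
# sketch; ~skeleton is illustrative, and each :id must match a "stream-id"
# yielded by the :data generator):
#   (div
#     (~suspense :id "stream-stats" :fallback (~skeleton))
#     (~suspense :id "stream-feed" :fallback (~skeleton)))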
initial_page_html = await _helpers_render_to_html("app-body",
header_rows=SxExpr(suspense_header_sx),
@@ -434,7 +553,10 @@ async def execute_page_streaming(
content_ref = ""
if page_def.content_expr is not None:
content_ref = sx_serialize(page_def.content_expr)
-page_sx_for_scan = f'(<> {layout_refs} {content_ref} (~app-body :header-rows {suspense_header_sx} :content {suspense_content_sx}))'
shell_ref = ""
if page_def.shell_expr is not None:
shell_ref = sx_serialize(page_def.shell_expr)
page_sx_for_scan = f'(<> {layout_refs} {content_ref} {shell_ref} (~app-body :header-rows {suspense_header_sx} :content {suspense_content_sx}))'
shell, tail = sx_page_streaming_parts(
tctx, initial_page_html, page_sx=page_sx_for_scan,
)
@@ -476,36 +598,244 @@ async def execute_page_streaming(
async def _stream_chunks():
yield shell + tail
-tasks = {data_task: "data", header_task: "headers"}
-pending = set(tasks.keys())
-while pending:
-    done, pending = await asyncio.wait(
-        pending, return_when=asyncio.FIRST_COMPLETED,
-    )
-    for task in done:
-        label = tasks[task]
-        try:
-            result = task.result()
-        except Exception as e:
-            logger.error("Streaming %s task failed: %s", label, e)
-            continue
-        if label == "data":
-            content_sx, filter_sx, aside_sx, menu_sx = result
-            extras = _extra_defs(content_sx)
-            yield sx_streaming_resolve_script("stream-content", content_sx, extras)
-        elif label == "headers":
-            header_rows, header_menu = result
# Both tasks push tagged items onto _stream_queue. We drain until
# both are done. Items: ("headers", rows, menu), ("data-single", ...),
# ("data", stream_id, sx), ("data-done",).
remaining = 2 # waiting for: headers + data
while remaining > 0:
item = await _stream_queue.get()
kind = item[0]
try:
if kind == "headers":
_, header_rows, header_menu = item
remaining -= 1
if header_rows:
extras = _extra_defs(header_rows)
yield sx_streaming_resolve_script("stream-headers", header_rows, extras)
elif kind == "data-single":
_, content_sx, filter_sx, aside_sx, menu_sx = item
remaining -= 1
extras = _extra_defs(content_sx)
yield sx_streaming_resolve_script("stream-content", content_sx, extras)
elif kind == "data":
_, stream_id, content_sx = item
extras = _extra_defs(content_sx)
yield sx_streaming_resolve_script(stream_id, content_sx, extras)
elif kind == "data-done":
remaining -= 1
except Exception as e:
logger.error("Streaming resolve failed for %s: %s", kind, e)
yield "\n</body>\n</html>"
return _stream_chunks()
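# The drain loop in miniature (self-contained sketch of the tagged-tuple
# protocol; payloads are illustrative):
import asyncio

async def _demo():
    q: asyncio.Queue = asyncio.Queue()
    await q.put(("headers", "<rows>", "<menu>"))
    await q.put(("data", "stream-stats", "(div ...)"))
    await q.put(("data-done",))
    remaining = 2  # headers + data
    while remaining > 0:
        item = await q.get()
        if item[0] in ("headers", "data-single", "data-done"):
            remaining -= 1
        print(item[0])  # -> headers, data, data-done

asyncio.run(_demo())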
async def execute_page_streaming_oob(
page_def: PageDef,
service_name: str,
url_params: dict[str, Any] | None = None,
):
"""Execute a streaming page for HTMX/SX requests.
Like execute_page_streaming but yields OOB SX swap format instead of a
full HTML document:
1. First yield: OOB SX with shell content (suspense skeletons) + CSS + defs
2. Subsequent yields: __sxResolve script tags as data resolves
The client uses streaming fetch (ReadableStream) to process the OOB swap
immediately and then execute resolve scripts as they arrive.
"""
import asyncio
from .jinja_bridge import get_component_env, _get_request_context
from .async_eval import async_eval
from .page import get_template_context
from .helpers import (
oob_page_sx,
sx_streaming_resolve_script,
components_for_request,
SxExpr,
)
from .parser import serialize as sx_serialize
from .layouts import get_layout
if url_params is None:
url_params = {}
env = dict(get_component_env())
env.update(get_page_helpers(service_name))
env.update(page_def.closure)
for key, val in url_params.items():
kebab = key.replace("_", "-")
env[kebab] = val
env[key] = val
ctx = _get_request_context()
# Evaluate shell with suspense skeletons (no data yet)
shell_sx = ""
if page_def.shell_expr is not None:
shell_sx = await _eval_slot(page_def.shell_expr, env, ctx)
# Build initial OOB response with shell as content
tctx = await get_template_context()
# Resolve layout for OOB headers
layout = None
layout_kwargs: dict[str, Any] = {}
if page_def.layout is not None:
if isinstance(page_def.layout, str):
layout_name = page_def.layout
elif isinstance(page_def.layout, list):
from .types import Keyword as SxKeyword, Symbol as SxSymbol
raw = page_def.layout
first = raw[0]
layout_name = (
first.name if isinstance(first, (SxKeyword, SxSymbol))
else str(first)
)
i = 1
while i < len(raw):
k = raw[i]
if isinstance(k, SxKeyword) and i + 1 < len(raw):
resolved = await async_eval(raw[i + 1], env, ctx)
layout_kwargs[k.name.replace("-", "_")] = resolved
i += 2
else:
i += 1
else:
layout_name = str(page_def.layout)
layout = get_layout(layout_name)
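# Layout spec forms this block accepts (values are illustrative):
#   :layout "admin"                  -> layout_name = "admin"
#   :layout (admin :sidebar false)   -> layout_name = "admin",
#                                       layout_kwargs = {"sidebar": False}
# Keyword arguments are evaluated in the page env, so they can reference
# url params or helpers.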
# Launch concurrent tasks
_stream_queue: asyncio.Queue = asyncio.Queue()
async def _eval_data():
try:
if page_def.data_expr is not None:
data_result = await async_eval(page_def.data_expr, env, ctx)
if hasattr(data_result, '__aiter__'):
async for chunk in data_result:
if not isinstance(chunk, dict):
continue
chunk = dict(chunk)
stream_id = chunk.pop("stream-id", "stream-content")
chunk_env = dict(env)
for k, v in chunk.items():
chunk_env[k.replace("_", "-")] = v
content_sx = await _eval_slot(page_def.content_expr, chunk_env, ctx) if page_def.content_expr else ""
await _stream_queue.put(("data", stream_id, content_sx))
await _stream_queue.put(("data-done",))
return
await _stream_queue.put(("data-done",))
except Exception as e:
logger.error("Streaming OOB data task failed: %s", e)
await _stream_queue.put(("data-done",))
async def _eval_oob_headers():
try:
if layout is not None:
oob_headers = await layout.oob_headers(tctx, **layout_kwargs)
await _stream_queue.put(("headers", oob_headers))
else:
await _stream_queue.put(("headers", ""))
except Exception as e:
logger.error("Streaming OOB headers task failed: %s", e)
await _stream_queue.put(("headers", ""))
data_task = asyncio.create_task(_eval_data())
header_task = asyncio.create_task(_eval_oob_headers())
# Build initial OOB body with shell content (skeletons in place)
oob_body = await oob_page_sx(
oobs="", # headers will arrive via resolve script
content=shell_sx,
)
# Prepend component definitions + CSS (like sx_response does)
from quart import request
comp_defs = components_for_request(oob_body)
body = oob_body
if comp_defs:
body = (f'<script type="text/sx" data-components>'
f'{comp_defs}</script>\n{body}')
from .css_registry import scan_classes_from_sx, lookup_rules, registry_loaded
if registry_loaded():
new_classes = scan_classes_from_sx(oob_body)
if comp_defs:
new_classes.update(scan_classes_from_sx(comp_defs))
known_raw = request.headers.get("SX-Css", "")
if known_raw:
from .css_registry import lookup_css_hash
if len(known_raw) <= 16:
looked_up = lookup_css_hash(known_raw)
known_classes = looked_up if looked_up is not None else set()
else:
known_classes = set(known_raw.split(","))
new_classes -= known_classes
if new_classes:
new_rules = lookup_rules(new_classes)
if new_rules:
body = f'<style data-sx-css>{new_rules}</style>\n{body}'
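# The SX-Css negotiation in miniature (illustrative values): the client
# advertises classes it already holds, either as a comma-separated list or
# as a short registry hash (<= 16 chars); only unseen rules are inlined.
known_classes = set("btn,card".split(","))
assert {"btn", "chart"} - known_classes == {"chart"}  # only .chart rules ship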
# Capture component env for extra defs in resolve chunks
from .jinja_bridge import components_for_page as _comp_scan
_base_scan = oob_body
def _extra_defs(sx_source: str) -> str:
from .deps import components_needed
comp_env = dict(get_component_env())
base_needed = components_needed(_base_scan, comp_env)
resolve_needed = components_needed(sx_source, comp_env)
extra = resolve_needed - base_needed
if not extra:
return ""
from .parser import serialize
from .types import Component
parts = []
for key, val in comp_env.items():
if isinstance(val, Component) and (f"~{val.name}" in extra or key in extra):
param_strs = ["&key"] + list(val.params)
if val.has_children:
param_strs.extend(["&rest", "children"])
params_sx = "(" + " ".join(param_strs) + ")"
body_sx = serialize(val.body, pretty=True)
parts.append(f"(defcomp ~{val.name} {params_sx} {body_sx})")
return "\n".join(parts)
# Yield chunks
async def _stream_oob_chunks():
# First chunk: OOB swap with skeletons
yield body
# Drain queue for resolve scripts
remaining = 2 # headers + data
while remaining > 0:
item = await _stream_queue.get()
kind = item[0]
try:
if kind == "headers":
_, oob_hdr = item
remaining -= 1
# Headers normally arrive via OOB swap attributes embedded in the SX
# content itself, so they need no resolve script. When the layout does
# return explicit header content, send a resolve for it.
if oob_hdr:
extras = _extra_defs(oob_hdr)
yield sx_streaming_resolve_script("stream-headers", oob_hdr, extras)
elif kind == "data":
_, stream_id, content_sx = item
extras = _extra_defs(content_sx)
yield sx_streaming_resolve_script(stream_id, content_sx, extras)
elif kind == "data-done":
remaining -= 1
except Exception as e:
logger.error("Streaming OOB resolve failed for %s: %s", kind, e)
return _stream_oob_chunks()
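# Consuming this endpoint incrementally from Python, e.g. in a test
# (hypothetical URL; httpx is used purely as an illustration):
import asyncio
import httpx

async def _consume():
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "GET", "http://localhost:8000/dashboard",
            headers={"HX-Request": "true"},  # routes to the OOB streaming branch
        ) as resp:
            async for chunk in resp.aiter_text():
                print(chunk)  # OOB shell first, then resolve scripts as data lands

asyncio.run(_consume())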
# ---------------------------------------------------------------------------
# Blueprint mounting
# ---------------------------------------------------------------------------
@@ -556,19 +886,17 @@ def _mount_one_page(bp: Any, service_name: str, page_def: PageDef) -> None:
from quart import make_response, Response
if page_def.stream:
-# Streaming response: yields HTML chunks as IO resolves
# Streaming response: yields chunks as IO resolves
async def page_view(**kwargs: Any) -> Any:
from shared.browser.app.utils.htmx import is_htmx_request
current = get_page(service_name, page_def.name) or page_def
-# Only stream for full page loads (not SX/HTMX requests)
if is_htmx_request():
-result = await execute_page(current, service_name, url_params=kwargs)
-if hasattr(result, "status_code"):
-    return result
-return await make_response(result, 200)
-# execute_page_streaming does all context-dependent setup as a
-# regular async function (while request context is live), then
-# returns an async generator that only yields strings.
# Streaming OOB: shell with skeletons first, then resolve scripts
gen = await execute_page_streaming_oob(
current, service_name, url_params=kwargs,
)
return Response(gen, content_type="text/sx; charset=utf-8")
# Full page streaming: HTML document with inline resolve scripts
gen = await execute_page_streaming(
current, service_name, url_params=kwargs,
)
@@ -696,7 +1024,14 @@ async def evaluate_page_data(
data_result = await async_eval(page_def.data_expr, env, ctx)
-# Kebab-case dict keys (matching execute_page line 214-215)
# Multi-stream: async generator can't be serialized as a single dict.
# Return nil to signal the client to fall back to server-side rendering.
if hasattr(data_result, '__aiter__'):
# Close the generator cleanly
await data_result.aclose()
return "nil"
# Kebab-case dict keys (matching execute_page)
if isinstance(data_result, dict):
data_result = {
k.replace("_", "-"): v for k, v in data_result.items()