"""SX docs page routes. Page GET routes are defined declaratively in sxc/pages/docs.sx via defpage. API endpoints are defined in sx/handlers/*.sx via defhandler. This file contains only SSE endpoints that need Python (async generators). """ from __future__ import annotations import asyncio from datetime import datetime from quart import Blueprint, Response def register(url_prefix: str = "/") -> Blueprint: bp = Blueprint("pages", __name__, url_prefix=url_prefix) # ------------------------------------------------------------------ # SSE — async generator, fundamentally not expressible in SX # ------------------------------------------------------------------ @bp.get("/sx/(geography.(hypermedia.(reference.(api.sse-time))))") async def ref_sse_time(): async def generate(): for _ in range(30): # stream for 60 seconds max now = datetime.now().strftime("%H:%M:%S") sx_src = f'(span :class "text-emerald-700 font-mono text-sm" "Server time: {now}")' yield f"event: time\ndata: {sx_src}\n\n" await asyncio.sleep(2) return Response(generate(), content_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) return bp