Moved flash-sale, settle-data, search-products/events/posts, and catalog endpoints from bp/pages/routes.py into sx/sx/handlers/reactive-api.sx. routes.py now contains only the SSE endpoint (async generators need Python). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
34 lines
1.3 KiB
Python
34 lines
1.3 KiB
Python
"""SX docs page routes.
|
|
|
|
Page GET routes are defined declaratively in sxc/pages/docs.sx via defpage.
|
|
API endpoints are defined in sx/handlers/*.sx via defhandler.
|
|
This file contains only SSE endpoints that need Python (async generators).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from datetime import datetime
|
|
|
|
from quart import Blueprint, Response
|
|
|
|
|
|
def register(url_prefix: str = "/") -> Blueprint:
    """Create the ``pages`` blueprint holding the Python-only SSE endpoint.

    Args:
        url_prefix: URL prefix passed straight through to the ``Blueprint``
            constructor.

    Returns:
        The ``Blueprint`` named ``"pages"`` with the SSE time route attached.
    """
    bp = Blueprint("pages", __name__, url_prefix=url_prefix)

    # ------------------------------------------------------------------
    # SSE — async generator, fundamentally not expressible in SX
    # ------------------------------------------------------------------

    @bp.get("/sx/(geography.(hypermedia.(reference.(api.sse-time))))")
    async def ref_sse_time():
        """Stream the server's current time as ``time`` server-sent events.

        Each event carries an SX ``span`` expression containing the time
        formatted as ``HH:MM:SS`` (naive ``datetime.now()``, i.e. the
        server's local clock).
        """
        tick_count = 30
        tick_seconds = 2  # 30 ticks x 2 s: stream for 60 seconds max

        async def generate():
            for _ in range(tick_count):
                now = datetime.now().strftime("%H:%M:%S")
                sx_src = f'(span :class "text-emerald-700 font-mono text-sm" "Server time: {now}")'
                yield f"event: time\ndata: {sx_src}\n\n"
                # Sleep after every event, including the last one, so the
                # connection stays open for the full tick interval.
                await asyncio.sleep(tick_seconds)

        response = Response(
            generate(),
            content_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
        )
        # The stream runs for about as long as Quart's default response
        # timeout, so the framework could cancel it mid-send. The generator is
        # already bounded by tick_count, so disabling the per-response timeout
        # is safe and lets the stream terminate cleanly on its own.
        response.timeout = None
        return response

    return bp
|