Files
mono/sx/bp/pages/routes.py
giles 213421516e Add SSE, response headers, view transitions, and 5 new sx attributes
Implement missing SxEngine features:
- SSE (sx-sse, sx-sse-swap) with EventSource management and auto-cleanup
- Response headers: SX-Trigger, SX-Retarget, SX-Reswap, SX-Redirect,
  SX-Refresh, SX-Location, SX-Replace-Url, SX-Trigger-After-Swap/Settle
- View Transitions API: transition:true swap modifier + global config
- every:<time> trigger for polling (setInterval)
- sx-replace-url (replaceState instead of pushState)
- sx-disabled-elt (disable elements during request)
- sx-prompt (window.prompt, value sent as SX-Prompt header)
- sx-params (filter form parameters: *, none, not x,y, x,y)

Adds docs (ATTR_DETAILS, BEHAVIOR_ATTRS, headers, events), demo
components in reference.sx, API endpoints (prompt-echo, sse-time),
and 27 new unit tests for engine logic.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-04 11:55:21 +00:00

905 lines
39 KiB
Python

"""SX docs page routes.
Page GET routes are defined declaratively in sxc/pages/docs.sx via defpage.
This file contains only redirect routes and example API endpoints.
"""
from __future__ import annotations
import asyncio
import json
import random
from datetime import datetime
from uuid import uuid4
from quart import Blueprint, Response, make_response, request
from shared.browser.app.csrf import csrf_exempt
# Blueprint factory: creates the "pages" blueprint mounted at ``url_prefix``.
# The route handlers that follow are registered on ``bp``.
def register(url_prefix: str = "/") -> Blueprint:
bp = Blueprint("pages", __name__, url_prefix=url_prefix)
# ------------------------------------------------------------------
# Example API endpoints (for live demos)
# ------------------------------------------------------------------
@bp.get("/examples/api/click")
async def api_click():
    """Render ``~click-result`` stamped with the current server time.

    The reply is a fragment holding the component call plus two
    ``_oob_code`` panels ("click-wire" and "click-comp").
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text

    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    call = f'(~click-result :time "{stamp}")'
    component_src = _component_source_text("click-result")
    wire_panel = _oob_code("click-wire", _full_wire_text(call, "click-result"))
    comp_panel = _oob_code("click-comp", component_src)
    return sx_response(f'(<> {call} {wire_panel} {comp_panel})')
@csrf_exempt
@bp.post("/examples/api/form")
async def api_form():
    """Echo the posted ``name`` field back inside ``~form-result``.

    ``name`` is client-supplied, so it is escaped for an SX string
    literal before being spliced into the generated source.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text
    form = await request.form
    name = form.get("name", "")
    # Backslashes are escaped before quotes: otherwise input ending in a
    # backslash would swallow the closing quote and escape the literal.
    escaped = name.replace('\\', '\\\\').replace('"', '\\"')
    sx_src = f'(~form-result :name "{escaped}")'
    comp_text = _component_source_text("form-result")
    wire_text = _full_wire_text(sx_src, "form-result")
    oob_wire = _oob_code("form-wire", wire_text)
    oob_comp = _oob_code("form-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
# Shared request counter for the polling demo (mutated by api_poll).
_poll_count = {"n": 0}


@bp.get("/examples/api/poll")
async def api_poll():
    """Bump the poll counter and render ``~poll-result``.

    The ``:count`` sent to the component is capped at 10; the stored
    counter itself keeps increasing.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text

    _poll_count["n"] += 1
    stamp = datetime.now().strftime("%H:%M:%S")
    shown = _poll_count["n"] if _poll_count["n"] < 10 else 10
    call = f'(~poll-result :time "{stamp}" :count {shown})'
    component_src = _component_source_text("poll-result")
    wire_panel = _oob_code("poll-wire", _full_wire_text(call, "poll-result"))
    comp_panel = _oob_code("poll-comp", component_src)
    return sx_response(f'(<> {call} {wire_panel} {comp_panel})')
@csrf_exempt
@bp.delete("/examples/api/delete/<item_id>")
async def api_delete(item_id: str):
    """Answer a row delete with no primary content.

    Only the two ``_oob_code`` panels ("delete-wire", "delete-comp") are
    returned; per the demo, the outerHTML swap of the empty primary
    response is what removes the row.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text

    note = f'(empty — row #{item_id} removed by outerHTML swap)'
    wire_panel_src = _full_wire_text(note)
    component_src = _component_source_text("delete-row")
    panels = (
        _oob_code("delete-wire", wire_panel_src),
        _oob_code("delete-comp", component_src),
    )
    return sx_response(f'(<> {panels[0]} {panels[1]})')
@bp.get("/examples/api/edit")
async def api_edit_form():
    """Render ``~inline-edit-form`` pre-filled with the ``value`` query arg.

    The value is client-supplied and is escaped for an SX string literal.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text
    value = request.args.get("value", "")
    # Escape backslashes first so a trailing "\" cannot neutralise the
    # closing quote of the literal.
    escaped = value.replace('\\', '\\\\').replace('"', '\\"')
    sx_src = f'(~inline-edit-form :value "{escaped}")'
    comp_text = _component_source_text("inline-edit-form")
    wire_text = _full_wire_text(sx_src, "inline-edit-form")
    oob_wire = _oob_code("edit-wire", wire_text)
    oob_comp = _oob_code("edit-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
@csrf_exempt
@bp.post("/examples/api/edit")
async def api_edit_save():
    """Render ``~inline-view`` showing the posted ``value`` field.

    The value is client-supplied and is escaped for an SX string literal.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text
    form = await request.form
    value = form.get("value", "")
    # Escape backslashes first so a trailing "\" cannot neutralise the
    # closing quote of the literal.
    escaped = value.replace('\\', '\\\\').replace('"', '\\"')
    sx_src = f'(~inline-view :value "{escaped}")'
    comp_text = _component_source_text("inline-view")
    wire_text = _full_wire_text(sx_src, "inline-view")
    oob_wire = _oob_code("edit-wire", wire_text)
    oob_comp = _oob_code("edit-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
@bp.get("/examples/api/edit/cancel")
async def api_edit_cancel():
    """Render ``~inline-view`` with the unchanged ``value`` query arg.

    The value is client-supplied and is escaped for an SX string literal.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text
    value = request.args.get("value", "")
    # Escape backslashes first so a trailing "\" cannot neutralise the
    # closing quote of the literal.
    escaped = value.replace('\\', '\\\\').replace('"', '\\"')
    sx_src = f'(~inline-view :value "{escaped}")'
    comp_text = _component_source_text("inline-view")
    wire_text = _full_wire_text(sx_src, "inline-view")
    oob_wire = _oob_code("edit-wire", wire_text)
    oob_comp = _oob_code("edit-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
@bp.get("/examples/api/oob")
async def api_oob():
    """Return the "Box A" content plus a ``:sx-swap-oob`` div for ``oob-box-b``.

    Both boxes carry the same server timestamp; an "oob-wire" panel with
    the wire text is appended.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _full_wire_text

    stamp = datetime.now().strftime("%H:%M:%S")
    # The timestamp paragraph is identical in both boxes.
    stamp_p = f' (p :class "text-sm text-stone-500" "at {stamp}")'
    fragment = "".join([
        '(<>',
        ' (p :class "text-emerald-600 font-medium" "Box A updated!")',
        stamp_p,
        ' (div :id "oob-box-b" :sx-swap-oob "innerHTML"',
        ' (p :class "text-violet-600 font-medium" "Box B updated via OOB!")',
        stamp_p,
        '))',
    ])
    wire_panel = _oob_code("oob-wire", _full_wire_text(fragment))
    return sx_response(f'(<> {fragment} {wire_panel})')
# --- Lazy Loading ---
@bp.get("/examples/api/lazy")
async def api_lazy():
    """Render ``~lazy-result`` with the current server time."""
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text

    stamp = datetime.now().strftime("%H:%M:%S")
    call = f'(~lazy-result :time "{stamp}")'
    component_src = _component_source_text("lazy-result")
    wire_panel = _oob_code("lazy-wire", _full_wire_text(call, "lazy-result"))
    comp_panel = _oob_code("lazy-comp", component_src)
    return sx_response(f'(<> {call} {wire_panel} {comp_panel})')
# --- Infinite Scroll ---
@bp.get("/examples/api/scroll")
async def api_scroll():
    """Return one page (5 items) of the infinite-scroll demo.

    Reads ``?page=`` (default 2). While the next page is <= 6 the reply
    ends with a sentinel div that requests the following page; after
    that it ends with an "All items loaded." marker.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _full_wire_text
    # type=int falls back to the default on a malformed value instead of
    # raising ValueError (an HTTP 500); clamp so item numbers stay positive.
    page = max(request.args.get("page", 2, type=int), 1)
    start = (page - 1) * 5 + 1
    next_page = page + 1
    items_html = " ".join(
        f'(div :class "px-4 py-3 border-b border-stone-100 text-sm text-stone-700" "Item {i} — loaded from page {page}")'
        for i in range(start, start + 5)
    )
    if next_page <= 6:
        sentinel = (
            f'(div :id "scroll-sentinel"'
            f' :sx-get "/examples/api/scroll?page={next_page}"'
            f' :sx-trigger "intersect once"'
            f' :sx-target "#scroll-items"'
            f' :sx-swap "beforeend"'
            f' :class "p-3 text-center text-stone-400 text-sm"'
            f' "Loading more...")'
        )
    else:
        sentinel = (
            '(div :class "p-3 text-center text-stone-500 text-sm font-medium"'
            ' "All items loaded.")'
        )
    sx_src = f'(<> {items_html} {sentinel})'
    wire_text = _full_wire_text(sx_src)
    oob_wire = _oob_code("scroll-wire", wire_text)
    return sx_response(f'(<> {sx_src} {oob_wire})')
# --- Progress Bar ---
# job id -> last reported percent, shared by the progress start/status routes.
_jobs: dict[str, int] = {}


@csrf_exempt
@bp.post("/examples/api/progress/start")
async def api_progress_start():
    """Register a new job at 0% and render its ``~progress-status``.

    The job id is the first 8 characters of a fresh UUID4 string.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text

    new_id = str(uuid4())[:8]
    _jobs[new_id] = 0
    call = f'(~progress-status :percent 0 :job-id "{new_id}")'
    component_src = _component_source_text("progress-status")
    wire_panel = _oob_code("progress-wire", _full_wire_text(call, "progress-status"))
    comp_panel = _oob_code("progress-comp", component_src)
    return sx_response(f'(<> {call} {wire_panel} {comp_panel})')
@bp.get("/examples/api/progress/status")
async def api_progress_status():
    """Advance the job named by ``?job=`` and render ``~progress-status``.

    Each call adds a random 15-30 points, capped at 100, and stores the
    result in ``_jobs``. Unknown job ids start from 0.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text
    job_id = request.args.get("job", "")
    current = _jobs.get(job_id, 0)
    current = min(current + random.randint(15, 30), 100)
    _jobs[job_id] = current
    # The job id comes straight from the query string, so escape it
    # (backslash first, then quote) before embedding it in an SX literal.
    safe_id = job_id.replace('\\', '\\\\').replace('"', '\\"')
    sx_src = f'(~progress-status :percent {current} :job-id "{safe_id}")'
    comp_text = _component_source_text("progress-status")
    wire_text = _full_wire_text(sx_src, "progress-status")
    oob_wire = _oob_code("progress-wire", wire_text)
    oob_comp = _oob_code("progress-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
# --- Active Search ---
@bp.get("/examples/api/search")
async def api_search():
    """Filter ``SEARCH_LANGUAGES`` by the ``q`` query arg.

    Matching is a case-insensitive substring test; an empty query
    returns every language. The query is echoed back to the component,
    escaped for an SX string literal.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text
    from content.pages import SEARCH_LANGUAGES
    q = request.args.get("q", "").strip().lower()
    if not q:
        results = SEARCH_LANGUAGES
    else:
        results = [lang for lang in SEARCH_LANGUAGES if q in lang.lower()]
    items_sx = " ".join(f'"{r}"' for r in results)
    # Backslash first, then quote, so a trailing "\" cannot escape the
    # closing quote of the literal.
    escaped_q = q.replace('\\', '\\\\').replace('"', '\\"')
    sx_src = f'(~search-results :items (list {items_sx}) :query "{escaped_q}")'
    comp_text = _component_source_text("search-results")
    wire_text = _full_wire_text(sx_src, "search-results")
    oob_wire = _oob_code("search-wire", wire_text)
    oob_comp = _oob_code("search-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
# --- Inline Validation ---
# Addresses the inline-validation demo reports as already registered.
_TAKEN_EMAILS = {"admin@example.com", "test@example.com", "user@example.com"}


@bp.get("/examples/api/validate")
async def api_validate():
    """Validate the ``email`` query arg and render the matching component.

    Renders ``~validation-error`` when the address is empty, malformed
    (no "@", or no "." after the last "@"), or listed in
    ``_TAKEN_EMAILS`` (case-insensitive); otherwise ``~validation-ok``.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text
    email = request.args.get("email", "").strip()
    # Client-supplied text: escape backslashes before quotes so the
    # value cannot terminate or escape the SX string literal.
    escaped = email.replace('\\', '\\\\').replace('"', '\\"')
    if not email:
        sx_src = '(~validation-error :message "Email is required")'
        comp_name = "validation-error"
    elif "@" not in email or "." not in email.split("@")[-1]:
        sx_src = '(~validation-error :message "Invalid email format")'
        comp_name = "validation-error"
    elif email.lower() in _TAKEN_EMAILS:
        sx_src = f'(~validation-error :message "{escaped} is already taken")'
        comp_name = "validation-error"
    else:
        sx_src = f'(~validation-ok :email "{escaped}")'
        comp_name = "validation-ok"
    comp_text = _component_source_text(comp_name)
    wire_text = _full_wire_text(sx_src, comp_name)
    oob_wire = _oob_code("validate-wire", wire_text)
    oob_comp = _oob_code("validate-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
@csrf_exempt
@bp.post("/examples/api/validate/submit")
async def api_validate_submit():
    """Acknowledge the validation form, or reject an empty/"@"-less email."""
    from shared.sx.helpers import sx_response
    form = await request.form
    email = form.get("email", "").strip()
    if not email or "@" not in email:
        return sx_response('(p :class "text-sm text-rose-600 mt-2" "Please enter a valid email.")')
    # Backslash first, then quote, so the posted value cannot escape the
    # SX string literal it is embedded in.
    escaped = email.replace('\\', '\\\\').replace('"', '\\"')
    return sx_response(f'(p :class "text-sm text-emerald-600 mt-2" "Form submitted with: {escaped}")')
# --- Value Select ---
@bp.get("/examples/api/values")
async def api_values():
    """Return ``option`` elements for the ``category`` query arg.

    Items come from ``VALUE_SELECT_DATA``; an unknown or empty category
    yields a single "No items" option.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _full_wire_text
    from content.pages import VALUE_SELECT_DATA

    category = request.args.get("category", "")
    option_nodes = [
        f'(option :value "{i}" "{i}")'
        for i in VALUE_SELECT_DATA.get(category, [])
    ]
    options_sx = " ".join(option_nodes) or '(option :value "" "No items")'
    # The wire panel shows the options wrapped in a fragment; the primary
    # response inlines them next to the panel.
    wire_panel = _oob_code("values-wire", _full_wire_text(f'(<> {options_sx})'))
    return sx_response(f'(<> {options_sx} {wire_panel})')
# --- Reset on Submit ---
@csrf_exempt
@bp.post("/examples/api/reset-submit")
async def api_reset_submit():
    """Render ``~reset-message`` for the posted ``message`` field.

    A blank message is shown as "(empty)". The text is client-supplied
    and is escaped for an SX string literal.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text
    form = await request.form
    msg = form.get("message", "").strip() or "(empty)"
    # Backslash first, then quote, so a trailing "\" cannot escape the
    # closing quote of the literal.
    escaped = msg.replace('\\', '\\\\').replace('"', '\\"')
    now = datetime.now().strftime("%H:%M:%S")
    sx_src = f'(~reset-message :message "{escaped}" :time "{now}")'
    comp_text = _component_source_text("reset-message")
    wire_text = _full_wire_text(sx_src, "reset-message")
    oob_wire = _oob_code("reset-wire", wire_text)
    oob_comp = _oob_code("reset-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
# --- Edit Row ---
# Mutable row store for the edit-row demo, keyed by row id.
_edit_rows: dict[str, dict] = {}


def _get_edit_rows() -> dict[str, dict]:
    """Return the row store, seeding it from ``EDIT_ROW_DATA`` while empty.

    Each seed row is copied, so edits do not touch the source data.
    """
    if _edit_rows:
        return _edit_rows
    from content.pages import EDIT_ROW_DATA

    _edit_rows.update((seed["id"], dict(seed)) for seed in EDIT_ROW_DATA)
    return _edit_rows
@bp.get("/examples/api/editrow/<row_id>")
async def api_editrow_form(row_id: str):
    """Render ``~edit-row-form`` for ``row_id``.

    An unknown id yields a blank row (empty name, price "0", stock "0").
    Row fields may hold user-edited text and the id comes from the URL,
    so every field is escaped for an SX string literal.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text

    def _esc(text) -> str:
        # Backslash first, then quote; str() mirrors f-string formatting.
        return str(text).replace('\\', '\\\\').replace('"', '\\"')

    rows = _get_edit_rows()
    row = rows.get(row_id, {"id": row_id, "name": "", "price": "0", "stock": "0"})
    rid, name, price, stock = (_esc(row[k]) for k in ("id", "name", "price", "stock"))
    sx_src = (f'(~edit-row-form :id "{rid}" :name "{name}"'
              f' :price "{price}" :stock "{stock}")')
    comp_text = _component_source_text("edit-row-form")
    wire_text = _full_wire_text(sx_src, "edit-row-form")
    oob_wire = _oob_code("editrow-wire", wire_text)
    oob_comp = _oob_code("editrow-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
@csrf_exempt
@bp.post("/examples/api/editrow/<row_id>")
async def api_editrow_save(row_id: str):
    """Store the posted name/price/stock for ``row_id``; render ``~edit-row-view``.

    The raw values are stored as posted. Only the copies embedded in the
    SX source are escaped for a string literal.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text

    def _esc(text) -> str:
        # Backslash first, then quote; str() mirrors f-string formatting.
        return str(text).replace('\\', '\\\\').replace('"', '\\"')

    form = await request.form
    rows = _get_edit_rows()
    rows[row_id] = {
        "id": row_id,
        "name": form.get("name", ""),
        "price": form.get("price", "0"),
        "stock": form.get("stock", "0"),
    }
    row = rows[row_id]
    rid, name, price, stock = (_esc(row[k]) for k in ("id", "name", "price", "stock"))
    sx_src = (f'(~edit-row-view :id "{rid}" :name "{name}"'
              f' :price "{price}" :stock "{stock}")')
    comp_text = _component_source_text("edit-row-view")
    wire_text = _full_wire_text(sx_src, "edit-row-view")
    oob_wire = _oob_code("editrow-wire", wire_text)
    oob_comp = _oob_code("editrow-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
@bp.get("/examples/api/editrow/<row_id>/cancel")
async def api_editrow_cancel(row_id: str):
    """Render ``~edit-row-view`` for ``row_id`` without changing it.

    An unknown id yields a blank row. Row fields may hold user-edited
    text and the id comes from the URL, so every field is escaped for an
    SX string literal.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text

    def _esc(text) -> str:
        # Backslash first, then quote; str() mirrors f-string formatting.
        return str(text).replace('\\', '\\\\').replace('"', '\\"')

    rows = _get_edit_rows()
    row = rows.get(row_id, {"id": row_id, "name": "", "price": "0", "stock": "0"})
    rid, name, price, stock = (_esc(row[k]) for k in ("id", "name", "price", "stock"))
    sx_src = (f'(~edit-row-view :id "{rid}" :name "{name}"'
              f' :price "{price}" :stock "{stock}")')
    comp_text = _component_source_text("edit-row-view")
    wire_text = _full_wire_text(sx_src, "edit-row-view")
    oob_wire = _oob_code("editrow-wire", wire_text)
    oob_comp = _oob_code("editrow-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
# --- Bulk Update ---
# Mutable user store for the bulk-update demo, keyed by user id.
_bulk_users: dict[str, dict] = {}


def _get_bulk_users() -> dict[str, dict]:
    """Return the user store, seeding it from ``BULK_USERS`` while empty.

    Each seed user is copied, so status changes do not touch the source data.
    """
    if _bulk_users:
        return _bulk_users
    from content.pages import BULK_USERS

    _bulk_users.update((seed["id"], dict(seed)) for seed in BULK_USERS)
    return _bulk_users
@csrf_exempt
@bp.post("/examples/api/bulk")
async def api_bulk():
    """Apply a bulk status change and re-render every ``~bulk-row``.

    ``?action=activate`` (the default) sets the posted ``ids`` to
    "active"; any other action sets them to "inactive". Ids that are not
    in the store are ignored.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text

    action = request.args.get("action", "activate")
    form = await request.form
    selected = form.getlist("ids")
    users = _get_bulk_users()
    if action == "activate":
        status = "active"
    else:
        status = "inactive"
    for uid in selected:
        target = users.get(uid)
        if target is not None:
            target["status"] = status
    row_calls = [
        f'(~bulk-row :id "{u["id"]}" :name "{u["name"]}"'
        f' :email "{u["email"]}" :status "{u["status"]}")'
        for u in users.values()
    ]
    fragment = f'(<> {" ".join(row_calls)})'
    component_src = _component_source_text("bulk-row")
    wire_panel = _oob_code("bulk-wire", _full_wire_text(fragment, "bulk-row"))
    comp_panel = _oob_code("bulk-comp", component_src)
    return sx_response(f'(<> {fragment} {wire_panel} {comp_panel})')
# --- Swap Positions ---
# Shared counter for the swap-positions demo (mutated by api_swap_log).
_swap_count = {"n": 0}


@csrf_exempt
@bp.post("/examples/api/swap-log")
async def api_swap_log():
    """Return one log entry plus a ``:sx-swap-oob`` counter update.

    ``?mode=`` (default "beforeend") is echoed in the entry text; it is
    client-supplied and therefore escaped for an SX string literal.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _full_wire_text
    mode = request.args.get("mode", "beforeend")
    # Backslash first, then quote, so the value cannot break out of the
    # literal it is embedded in.
    safe_mode = mode.replace('\\', '\\\\').replace('"', '\\"')
    _swap_count["n"] += 1
    now = datetime.now().strftime("%H:%M:%S")
    n = _swap_count["n"]
    entry = f'(div :class "px-3 py-2 text-sm text-stone-700" "[{now}] {safe_mode} (#{n})")'
    oob_counter = (
        f'(span :id "swap-counter" :sx-swap-oob "innerHTML"'
        f' :class "self-center text-sm text-stone-500" "Count: {n}")'
    )
    sx_src = f'(<> {entry} {oob_counter})'
    wire_text = _full_wire_text(sx_src)
    oob_wire = _oob_code("swap-wire", wire_text)
    return sx_response(f'(<> {sx_src} {oob_wire})')
# --- Select Filter (dashboard) ---
@bp.get("/examples/api/dashboard")
async def api_dashboard():
    """Return a three-section dashboard fragment (header, stats, footer).

    The sections carry the ids ``dash-header``, ``dash-stats`` and
    ``dash-footer``; header and footer show the current server time.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _full_wire_text

    stamp = datetime.now().strftime("%H:%M:%S")
    header = (
        ' (div :id "dash-header" :class "p-3 bg-violet-50 rounded mb-3"'
        ' (h4 :class "font-semibold text-violet-800" "Dashboard Header")'
        f' (p :class "text-sm text-violet-600" "Generated at {stamp}"))'
    )
    # (colour, figure, caption) for each stat card, in display order.
    cards = "".join(
        f' (div :class "p-3 bg-{colour}-50 rounded text-center"'
        f' (p :class "text-2xl font-bold text-{colour}-700" "{figure}")'
        f' (p :class "text-xs text-{colour}-600" "{caption}"))'
        for colour, figure, caption in (
            ("emerald", "142", "Users"),
            ("blue", "89", "Orders"),
            ("amber", "$4.2k", "Revenue"),
        )
    )
    stats = ' (div :id "dash-stats" :class "grid grid-cols-3 gap-3 mb-3"' + cards + ')'
    footer = (
        ' (div :id "dash-footer" :class "p-3 bg-stone-50 rounded"'
        f' (p :class "text-sm text-stone-500" "Last updated: {stamp}"))'
    )
    fragment = '(<>' + header + stats + footer + ')'
    wire_panel = _oob_code("filter-wire", _full_wire_text(fragment))
    return sx_response(f'(<> {fragment} {wire_panel})')
# --- Tabs ---
# Static SX source for each tab of the tabs demo, keyed by tab id.
_TAB_CONTENT = {
    "tab1": ('(div (p :class "text-stone-700" "Welcome to the Overview tab.")'
             ' (p :class "text-stone-500 text-sm mt-2"'
             ' "This is the default tab content loaded via sx-get."))'),
    "tab2": ('(div (p :class "text-stone-700" "Here are the details.")'
             ' (ul :class "mt-2 space-y-1 text-sm text-stone-600"'
             ' (li "Version: 1.0.0")'
             ' (li "Build: 2024-01-15")'
             ' (li "Engine: sx")))'),
    "tab3": ('(div (p :class "text-stone-700" "Recent history:")'
             ' (ol :class "mt-2 space-y-1 text-sm text-stone-600 list-decimal list-inside"'
             ' (li "Initial release")'
             ' (li "Added component caching")'
             ' (li "Wire format v2")))'),
}


@bp.get("/examples/api/tabs/<tab>")
async def api_tabs(tab: str):
    """Return the content for ``tab`` plus a ``:sx-swap-oob`` tab-button row.

    Unknown tab ids fall back to "tab1". The fallback is applied before
    the buttons are built, so the Overview button is marked active
    whenever Overview content is served.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _full_wire_text
    # Normalise first: previously an unknown id served tab1 content while
    # leaving every button inactive.
    if tab not in _TAB_CONTENT:
        tab = "tab1"
    sx_src = _TAB_CONTENT[tab]
    buttons = []
    for t, label in [("tab1", "Overview"), ("tab2", "Details"), ("tab3", "History")]:
        active = "true" if t == tab else "false"
        buttons.append(f'(~tab-btn :tab "{t}" :label "{label}" :active "{active}")')
    oob_tabs = (
        f'(div :id "tab-buttons" :sx-swap-oob "innerHTML"'
        f' :class "flex border-b border-stone-200"'
        f' {" ".join(buttons)})'
    )
    wire_text = _full_wire_text(f'(<> {sx_src} {oob_tabs})', "tab-btn")
    oob_wire = _oob_code("tabs-wire", wire_text)
    return sx_response(f'(<> {sx_src} {oob_tabs} {oob_wire})')
# --- Animations ---
@bp.get("/examples/api/animate")
async def api_animate():
    """Render ``~anim-result`` with a randomly chosen background class."""
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text

    palette = [
        "bg-violet-100",
        "bg-emerald-100",
        "bg-blue-100",
        "bg-amber-100",
        "bg-rose-100",
    ]
    picked = random.choice(palette)
    stamp = datetime.now().strftime("%H:%M:%S")
    call = f'(~anim-result :color "{picked}" :time "{stamp}")'
    component_src = _component_source_text("anim-result")
    wire_panel = _oob_code("anim-wire", _full_wire_text(call, "anim-result"))
    comp_panel = _oob_code("anim-comp", component_src)
    return sx_response(f'(<> {call} {wire_panel} {comp_panel})')
# --- Dialogs ---
@bp.get("/examples/api/dialog")
async def api_dialog():
    """Render the fixed ``~dialog-modal`` confirmation dialog."""
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text

    call = (
        '(~dialog-modal :title "Confirm Action" :message "Are you sure you want'
        ' to proceed? This is a demo dialog rendered entirely with sx components.")'
    )
    component_src = _component_source_text("dialog-modal")
    wire_panel = _oob_code("dialog-wire", _full_wire_text(call, "dialog-modal"))
    comp_panel = _oob_code("dialog-comp", component_src)
    return sx_response(f'(<> {call} {wire_panel} {comp_panel})')
@bp.get("/examples/api/dialog/close")
async def api_dialog_close():
    """Reply with only a "dialog-wire" panel noting the dialog was closed."""
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _full_wire_text

    wire_panel = _oob_code("dialog-wire", _full_wire_text("(empty — dialog closed)"))
    return sx_response(f'(<> {wire_panel})')
# --- Keyboard Shortcuts ---
# Key -> message shown by the keyboard-shortcut demo.
_KBD_ACTIONS = {
    "s": "Search panel activated",
    "n": "New item created",
    "h": "Help panel opened",
}


@bp.get("/examples/api/keyboard")
async def api_keyboard():
    """Render ``~kbd-result`` for the ``key`` query arg.

    Keys missing from ``_KBD_ACTIONS`` produce "Unknown key: <key>".
    Both the key and the derived message can contain client text, so
    both are escaped for an SX string literal.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text
    key = request.args.get("key", "")
    action = _KBD_ACTIONS.get(key, f"Unknown key: {key}")
    # Backslash first, then quote, so a trailing "\" cannot escape the
    # closing quote of either literal.
    escaped_action = action.replace('\\', '\\\\').replace('"', '\\"')
    escaped_key = key.replace('\\', '\\\\').replace('"', '\\"')
    sx_src = f'(~kbd-result :key "{escaped_key}" :action "{escaped_action}")'
    comp_text = _component_source_text("kbd-result")
    wire_text = _full_wire_text(sx_src, "kbd-result")
    oob_wire = _oob_code("kbd-wire", wire_text)
    oob_comp = _oob_code("kbd-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
# --- PUT / PATCH ---
# Mutable profile record for the PUT/PATCH demo.
_profile = {}


def _get_profile() -> dict:
    """Return the profile record, seeding it from ``PROFILE_DEFAULT`` while empty."""
    if _profile:
        return _profile
    from content.pages import PROFILE_DEFAULT

    _profile.update(PROFILE_DEFAULT)
    return _profile
@bp.get("/examples/api/putpatch/edit-all")
async def api_pp_edit_all():
    """Render ``~pp-form-full`` pre-filled from the stored profile.

    Profile fields can hold text saved by the PUT handler, so each one
    is escaped for an SX string literal.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text

    def _esc(text) -> str:
        # Backslash first, then quote; str() mirrors f-string formatting.
        return str(text).replace('\\', '\\\\').replace('"', '\\"')

    p = _get_profile()
    name, email, role = (_esc(p[k]) for k in ("name", "email", "role"))
    sx_src = f'(~pp-form-full :name "{name}" :email "{email}" :role "{role}")'
    comp_text = _component_source_text("pp-form-full")
    wire_text = _full_wire_text(sx_src, "pp-form-full")
    oob_wire = _oob_code("pp-wire", wire_text)
    oob_comp = _oob_code("pp-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
@csrf_exempt
@bp.put("/examples/api/putpatch")
async def api_pp_put():
    """Update the stored profile from the form and render ``~pp-view``.

    Fields absent from the form keep their current value. The raw values
    are stored as posted. Only the copies embedded in the SX source are
    escaped for a string literal.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text

    def _esc(text) -> str:
        # Backslash first, then quote; str() mirrors f-string formatting.
        return str(text).replace('\\', '\\\\').replace('"', '\\"')

    form = await request.form
    p = _get_profile()
    p["name"] = form.get("name", p["name"])
    p["email"] = form.get("email", p["email"])
    p["role"] = form.get("role", p["role"])
    name, email, role = (_esc(p[k]) for k in ("name", "email", "role"))
    sx_src = f'(~pp-view :name "{name}" :email "{email}" :role "{role}")'
    comp_text = _component_source_text("pp-view")
    wire_text = _full_wire_text(sx_src, "pp-view")
    oob_wire = _oob_code("pp-wire", wire_text)
    oob_comp = _oob_code("pp-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
@bp.get("/examples/api/putpatch/cancel")
async def api_pp_cancel():
    """Render ``~pp-view`` from the stored profile without changing it.

    Profile fields can hold text saved by the PUT handler, so each one
    is escaped for an SX string literal.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text

    def _esc(text) -> str:
        # Backslash first, then quote; str() mirrors f-string formatting.
        return str(text).replace('\\', '\\\\').replace('"', '\\"')

    p = _get_profile()
    name, email, role = (_esc(p[k]) for k in ("name", "email", "role"))
    sx_src = f'(~pp-view :name "{name}" :email "{email}" :role "{role}")'
    comp_text = _component_source_text("pp-view")
    wire_text = _full_wire_text(sx_src, "pp-view")
    oob_wire = _oob_code("pp-wire", wire_text)
    oob_comp = _oob_code("pp-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
# --- JSON Encoding ---
@csrf_exempt
@bp.post("/examples/api/json-echo")
async def api_json_echo():
    """Echo the request's JSON body and Content-Type via ``~json-result``.

    A missing or unparseable body is treated as ``{}`` and re-serialised
    with a 2-space indent. A missing content type is reported as "unknown".
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text
    data = await request.get_json(silent=True) or {}
    body = json.dumps(data, indent=2)
    ct = request.content_type or "unknown"
    escaped_body = body.replace('\\', '\\\\').replace('"', '\\"')
    # The Content-Type header is client-controlled too, so it gets the
    # same backslash-then-quote escaping as the body.
    escaped_ct = ct.replace('\\', '\\\\').replace('"', '\\"')
    sx_src = f'(~json-result :body "{escaped_body}" :content-type "{escaped_ct}")'
    comp_text = _component_source_text("json-result")
    wire_text = _full_wire_text(sx_src, "json-result")
    oob_wire = _oob_code("json-wire", wire_text)
    oob_comp = _oob_code("json-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
# --- Vals & Headers ---
@bp.get("/examples/api/echo-vals")
async def api_echo_vals():
    """Echo the query parameters as ``"key: value"`` items in ``~echo-result``.

    The ``_`` and ``sx-request`` parameters are omitted. Keys and values
    are client-supplied and are escaped for an SX string literal.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text

    def _esc(text) -> str:
        # Backslash first, then quote, so input cannot escape the literal.
        return str(text).replace('\\', '\\\\').replace('"', '\\"')

    vals = {k: v for k, v in request.args.items()
            if k not in ("_", "sx-request")}
    items_sx = " ".join(f'"{_esc(k)}: {_esc(v)}"' for k, v in vals.items())
    sx_src = f'(~echo-result :label "values" :items (list {items_sx}))'
    comp_text = _component_source_text("echo-result")
    wire_text = _full_wire_text(sx_src, "echo-result")
    oob_wire = _oob_code("vals-wire", wire_text)
    oob_comp = _oob_code("vals-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
@bp.get("/examples/api/echo-headers")
async def api_echo_headers():
    """Echo request headers whose name starts with "x-" (any case).

    Header names and values are client-supplied and are escaped for an
    SX string literal before being listed in ``~echo-result``.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text

    def _esc(text) -> str:
        # Backslash first, then quote, so input cannot escape the literal.
        return str(text).replace('\\', '\\\\').replace('"', '\\"')

    custom = {k: v for k, v in request.headers if k.lower().startswith("x-")}
    items_sx = " ".join(f'"{_esc(k)}: {_esc(v)}"' for k, v in custom.items())
    sx_src = f'(~echo-result :label "headers" :items (list {items_sx}))'
    comp_text = _component_source_text("echo-result")
    wire_text = _full_wire_text(sx_src, "echo-result")
    oob_wire = _oob_code("vals-wire", wire_text)
    oob_comp = _oob_code("vals-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
# --- Loading States ---
@bp.get("/examples/api/slow")
async def api_slow():
    """Wait two seconds, then render ``~loading-result`` with the time."""
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text

    await asyncio.sleep(2)
    # Timestamp is taken after the delay, so it reflects completion time.
    stamp = datetime.now().strftime("%H:%M:%S")
    call = f'(~loading-result :time "{stamp}")'
    component_src = _component_source_text("loading-result")
    wire_panel = _oob_code("loading-wire", _full_wire_text(call, "loading-result"))
    comp_panel = _oob_code("loading-comp", component_src)
    return sx_response(f'(<> {call} {wire_panel} {comp_panel})')
# --- Request Abort (sync replace) ---
@bp.get("/examples/api/slow-search")
async def api_slow_search():
    """Sleep a random 0.5-2.0 s, then echo ``q`` via ``~sync-result``.

    The delay is reported in whole milliseconds. ``q`` is client-supplied
    and is escaped for an SX string literal.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text
    delay = random.uniform(0.5, 2.0)
    await asyncio.sleep(delay)
    q = request.args.get("q", "").strip()
    delay_ms = int(delay * 1000)
    # Backslash first, then quote, so a trailing "\" cannot escape the
    # closing quote of the literal.
    escaped = q.replace('\\', '\\\\').replace('"', '\\"')
    sx_src = f'(~sync-result :query "{escaped}" :delay "{delay_ms}")'
    comp_text = _component_source_text("sync-result")
    wire_text = _full_wire_text(sx_src, "sync-result")
    oob_wire = _oob_code("sync-wire", wire_text)
    oob_comp = _oob_code("sync-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
# --- Retry ---
# Shared attempt counter for the retry demo (mutated by api_flaky).
_flaky = {"n": 0}


@bp.get("/examples/api/flaky")
async def api_flaky():
    """Fail with an empty 503 unless the attempt number is a multiple of 3.

    On success, ``~retry-result`` is rendered with the attempt number.
    """
    from shared.sx.helpers import sx_response
    from sxc.pages import _oob_code, _component_source_text, _full_wire_text

    _flaky["n"] += 1
    attempt = _flaky["n"]
    if attempt % 3:
        return Response("", status=503, content_type="text/plain")
    call = f'(~retry-result :attempt "{attempt}" :message "Success! The endpoint finally responded.")'
    component_src = _component_source_text("retry-result")
    wire_panel = _oob_code("retry-wire", _full_wire_text(call, "retry-result"))
    comp_panel = _oob_code("retry-comp", component_src)
    return sx_response(f'(<> {call} {wire_panel} {comp_panel})')
# ------------------------------------------------------------------
# Reference attribute detail API endpoints (for live demos)
# ------------------------------------------------------------------
def _ref_wire(wire_id: str, sx_src: str) -> str:
    """Build the OOB swap that shows the wire response text.

    The swap targets the id ``ref-wire-<wire_id>`` and is produced by
    ``_oob_code`` from ``sx_src``.
    """
    from sxc.pages import _oob_code

    target_id = f"ref-wire-{wire_id}"
    return _oob_code(target_id, sx_src)
@bp.get("/reference/api/time")
async def ref_time():
    """Return a span with the current server time, plus its "sx-get" wire panel."""
    from shared.sx.helpers import sx_response

    stamp = datetime.now().strftime("%H:%M:%S")
    span = f'(span :class "text-stone-800 text-sm" "Server time: " (strong "{stamp}"))'
    wire_panel = _ref_wire("sx-get", span)
    return sx_response(f'(<> {span} {wire_panel})')
@csrf_exempt
@bp.post("/reference/api/greet")
async def ref_greet():
    """Greet the posted ``name`` (or "stranger" when missing or empty).

    The name is client-supplied and is escaped for an SX string literal.
    """
    from shared.sx.helpers import sx_response
    form = await request.form
    name = form.get("name") or "stranger"
    # Backslash first, then quote; without this the value could close
    # the literal and inject arbitrary SX into the response.
    safe = name.replace('\\', '\\\\').replace('"', '\\"')
    sx_src = f'(span :class "text-stone-800 text-sm" "Hello, " (strong "{safe}") "!")'
    oob = _ref_wire("sx-post", sx_src)
    return sx_response(f'(<> {sx_src} {oob})')
@csrf_exempt
@bp.put("/reference/api/status")
async def ref_status():
    """Echo the posted ``status`` field (default "unknown").

    The value is client-supplied and is escaped for an SX string literal.
    """
    from shared.sx.helpers import sx_response
    form = await request.form
    status = form.get("status", "unknown")
    # Backslash first, then quote; without this the value could close
    # the literal and inject arbitrary SX into the response.
    safe = status.replace('\\', '\\\\').replace('"', '\\"')
    sx_src = f'(span :class "text-stone-700 text-sm" "Status: " (strong "{safe}") " — updated via PUT")'
    oob = _ref_wire("sx-put", sx_src)
    return sx_response(f'(<> {sx_src} {oob})')
@csrf_exempt
@bp.patch("/reference/api/theme")
async def ref_theme():
    """Echo the posted ``theme`` field (default "unknown") as a bare SX string.

    The value is client-supplied and is escaped for an SX string literal.
    """
    from shared.sx.helpers import sx_response
    form = await request.form
    theme = form.get("theme", "unknown")
    # Backslash first, then quote; without this the value could close
    # the literal and inject arbitrary SX into the response.
    safe = theme.replace('\\', '\\\\').replace('"', '\\"')
    sx_src = f'"{safe}"'
    oob = _ref_wire("sx-patch", sx_src)
    return sx_response(f'(<> {sx_src} {oob})')
@csrf_exempt
@bp.delete("/reference/api/item/<item_id>")
async def ref_delete(item_id: str):
    """Answer a delete with only the "sx-delete" wire panel.

    ``item_id`` is accepted from the URL but not used; the panel shows
    an empty SX string.
    """
    from shared.sx.helpers import sx_response

    wire_panel = _ref_wire("sx-delete", '""')
    return sx_response(f'(<> {wire_panel})')
@bp.get("/reference/api/trigger-search")
async def ref_trigger_search():
    """Echo the ``q`` query arg, or show a hint when it is empty.

    ``q`` is client-supplied and is escaped for an SX string literal.
    """
    from shared.sx.helpers import sx_response
    q = request.args.get("q", "")
    if not q:
        sx_src = '(span :class "text-stone-400 text-sm" "Start typing to trigger a search.")'
    else:
        # Backslash first, then quote; without this the value could
        # close the literal and inject arbitrary SX into the response.
        safe = q.replace('\\', '\\\\').replace('"', '\\"')
        sx_src = f'(span :class "text-stone-800 text-sm" "Results for: " (strong "{safe}"))'
    oob = _ref_wire("sx-trigger", sx_src)
    return sx_response(f'(<> {sx_src} {oob})')
@bp.get("/reference/api/swap-item")
async def ref_swap_item():
    """Return a "New item (<time>)" div, plus its "sx-swap" wire panel."""
    from shared.sx.helpers import sx_response

    stamp = datetime.now().strftime("%H:%M:%S")
    item = f'(div :class "text-sm text-violet-700" "New item (" "{stamp}" ")")'
    wire_panel = _ref_wire("sx-swap", item)
    return sx_response(f'(<> {item} {wire_panel})')
@bp.get("/reference/api/oob")
async def ref_oob():
    """Return a main span plus a ``:sx-swap-oob`` div for ``ref-oob-side``.

    Both parts show the same server timestamp.
    """
    from shared.sx.helpers import sx_response

    stamp = datetime.now().strftime("%H:%M:%S")
    fragment = "".join([
        '(<>',
        f' (span :class "text-emerald-700 text-sm" "Main updated at " "{stamp}")',
        ' (div :id "ref-oob-side" :sx-swap-oob "innerHTML"',
        f' (span :class "text-violet-700 text-sm" "OOB updated at " "{stamp}")))',
    ])
    wire_panel = _ref_wire("sx-swap-oob", fragment)
    return sx_response(f'(<> {fragment} {wire_panel})')
@bp.get("/reference/api/select-page")
async def ref_select_page():
    """Return a header/content/footer fragment for the select demo.

    The three divs have the ids ``the-header``, ``the-content`` and
    ``the-footer``; only the content div carries the server time.
    """
    from shared.sx.helpers import sx_response

    stamp = datetime.now().strftime("%H:%M:%S")
    header = ' (div :id "the-header" (h3 "Page header — not selected"))'
    content = (
        ' (div :id "the-content"'
        ' (span :class "text-emerald-700 text-sm"'
        f' "This fragment was selected from a larger response. Time: " "{stamp}"))'
    )
    footer = ' (div :id "the-footer" (p "Page footer — not selected"))'
    fragment = '(<>' + header + content + footer + ')'
    wire_panel = _ref_wire("sx-select", fragment)
    return sx_response(f'(<> {fragment} {wire_panel})')
@bp.get("/reference/api/slow-echo")
async def ref_slow_echo():
    """Sleep 0.8 s, then echo the ``q`` query arg.

    ``q`` is client-supplied and is escaped for an SX string literal.
    """
    from shared.sx.helpers import sx_response
    await asyncio.sleep(0.8)
    q = request.args.get("q", "")
    # Backslash first, then quote; without this the value could close
    # the literal and inject arbitrary SX into the response.
    safe = q.replace('\\', '\\\\').replace('"', '\\"')
    sx_src = f'(span :class "text-stone-800 text-sm" "Echo: " (strong "{safe}"))'
    oob = _ref_wire("sx-sync", sx_src)
    return sx_response(f'(<> {sx_src} {oob})')
@csrf_exempt
@bp.post("/reference/api/upload-name")
async def ref_upload_name():
    """Report the filename of the uploaded ``file`` part, or "(no file)".

    The filename is chosen by the client and is escaped for an SX
    string literal.
    """
    from shared.sx.helpers import sx_response
    files = await request.files
    f = files.get("file")
    name = f.filename if f else "(no file)"
    # str() keeps the previous f-string formatting for a missing
    # filename; then escape backslash first, quote second.
    safe = str(name).replace('\\', '\\\\').replace('"', '\\"')
    sx_src = f'(span :class "text-stone-800 text-sm" "Received: " (strong "{safe}"))'
    oob = _ref_wire("sx-encoding", sx_src)
    return sx_response(f'(<> {sx_src} {oob})')
@bp.get("/reference/api/echo-headers")
async def ref_echo_headers():
    """List request headers whose name starts with "x-" (any case).

    Header names and values are client-supplied and are escaped for an
    SX string literal. With no matching headers a placeholder span is
    returned.
    """
    from shared.sx.helpers import sx_response

    def _esc(text) -> str:
        # Backslash first, then quote, so input cannot escape the literal.
        return str(text).replace('\\', '\\\\').replace('"', '\\"')

    custom = [(k, v) for k, v in request.headers if k.lower().startswith("x-")]
    if not custom:
        sx_src = '(span :class "text-stone-400 text-sm" "No custom headers received.")'
    else:
        items = " ".join(
            f'(li (strong "{_esc(k)}") ": " "{_esc(v)}")' for k, v in custom)
        sx_src = f'(ul :class "text-sm text-stone-700 space-y-1" {items})'
    oob = _ref_wire("sx-headers", sx_src)
    return sx_response(f'(<> {sx_src} {oob})')
@bp.get("/reference/api/echo-vals")
async def ref_echo_vals_get():
    """List the query parameters of the request.

    Keys and values are client-supplied and are escaped for an SX
    string literal. With no parameters a placeholder span is returned.
    """
    from shared.sx.helpers import sx_response

    def _esc(text) -> str:
        # Backslash first, then quote, so input cannot escape the literal.
        return str(text).replace('\\', '\\\\').replace('"', '\\"')

    vals = list(request.args.items())
    if not vals:
        sx_src = '(span :class "text-stone-400 text-sm" "No values received.")'
    else:
        items = " ".join(
            f'(li (strong "{_esc(k)}") ": " "{_esc(v)}")' for k, v in vals)
        sx_src = f'(ul :class "text-sm text-stone-700 space-y-1" {items})'
    oob_include = _ref_wire("sx-include", sx_src)
    return sx_response(f'(<> {sx_src} {oob_include})')
@csrf_exempt
@bp.post("/reference/api/echo-vals")
async def ref_echo_vals_post():
    """List the form fields of the request.

    Keys and values are client-supplied and are escaped for an SX
    string literal. With no fields a placeholder span is returned.
    """
    from shared.sx.helpers import sx_response

    def _esc(text) -> str:
        # Backslash first, then quote, so input cannot escape the literal.
        return str(text).replace('\\', '\\\\').replace('"', '\\"')

    form = await request.form
    vals = list(form.items())
    if not vals:
        sx_src = '(span :class "text-stone-400 text-sm" "No values received.")'
    else:
        items = " ".join(
            f'(li (strong "{_esc(k)}") ": " "{_esc(v)}")' for k, v in vals)
        sx_src = f'(ul :class "text-sm text-stone-700 space-y-1" {items})'
    oob = _ref_wire("sx-vals", sx_src)
    return sx_response(f'(<> {sx_src} {oob})')
# Shared attempt counter for the reference retry demo (mutated by ref_flaky).
_ref_flaky = {"n": 0}


@bp.get("/reference/api/flaky")
async def ref_flaky():
    """Fail with an empty 503 unless the attempt number is a multiple of 3.

    On success, a span reporting the attempt number is returned together
    with its "sx-retry" wire panel.
    """
    from shared.sx.helpers import sx_response

    _ref_flaky["n"] += 1
    attempt = _ref_flaky["n"]
    if attempt % 3:
        return Response("", status=503, content_type="text/plain")
    span = f'(span :class "text-emerald-700 text-sm" "Success on attempt " "{attempt}" "!")'
    wire_panel = _ref_wire("sx-retry", span)
    return sx_response(f'(<> {span} {wire_panel})')
@bp.get("/reference/api/prompt-echo")
async def ref_prompt_echo():
    """Greet the value of the ``SX-Prompt`` request header.

    Falls back to "anonymous" when the header is absent. The header is
    client-supplied and is escaped for an SX string literal.
    """
    from shared.sx.helpers import sx_response
    name = request.headers.get("SX-Prompt", "anonymous")
    # Backslash first, then quote; without this the value could close
    # the literal and inject arbitrary SX into the response.
    safe = name.replace('\\', '\\\\').replace('"', '\\"')
    sx_src = f'(span :class "text-stone-800 text-sm" "Hello, " (strong "{safe}") "!")'
    oob = _ref_wire("sx-prompt", sx_src)
    return sx_response(f'(<> {sx_src} {oob})')
@bp.get("/reference/api/sse-time")
async def ref_sse_time():
    """Stream the server time as ``text/event-stream`` events named "time".

    One event is emitted every two seconds, 30 events in total, so the
    stream lasts at most about 60 seconds.
    """
    async def ticks():
        remaining = 30  # 30 events x 2 s sleep = 60 seconds max
        while remaining > 0:
            remaining -= 1
            stamp = datetime.now().strftime("%H:%M:%S")
            span = f'(span :class "text-emerald-700 font-mono text-sm" "Server time: {stamp}")'
            yield f"event: time\ndata: {span}\n\n"
            await asyncio.sleep(2)

    stream_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    return Response(ticks(), content_type="text/event-stream",
                    headers=stream_headers)
return bp