Loading: lazy loading, infinite scroll, progress bar Forms: active search, inline validation, value select, reset on submit Records: edit row, bulk update Swap/DOM: swap positions, select filter, tabs Display: animations, dialogs, keyboard shortcuts HTTP: PUT/PATCH, JSON encoding, vals & headers Resilience: loading states, request abort (sync replace), retry Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
835 lines
36 KiB
Python
835 lines
36 KiB
Python
"""SX docs page routes."""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import random
|
|
from datetime import datetime
|
|
from uuid import uuid4
|
|
|
|
from quart import Blueprint, Response, make_response, request
|
|
|
|
|
|
# Blueprint factory: creates the "pages" blueprint, attaches the route handlers
# defined below to it, and hands it back to the caller via the final `return bp`.
def register(url_prefix: str = "/") -> Blueprint:
|
|
# `url_prefix` is passed straight through to the Blueprint constructor.
bp = Blueprint("pages", __name__, url_prefix=url_prefix)
|
|
|
|
def _is_sx_request() -> bool:
    """Report whether the current request carries an SX or HX request header.

    Returns:
        ``True`` when ``SX-Request`` or ``HX-Request`` is present with a
        truthy (non-empty) value, ``False`` otherwise.
    """
    headers = request.headers
    if headers.get("SX-Request"):
        return True
    return bool(headers.get("HX-Request"))
|
|
|
|
# ------------------------------------------------------------------
|
|
# Home
|
|
# ------------------------------------------------------------------
|
|
|
|
@bp.get("/")
async def index():
    """Serve the home page.

    SX/HX requests (see ``_is_sx_request``) get the SX produced by
    ``home_oob_sx``; every other request gets the page rendered by
    ``render_home_page_sx`` with status 200.
    """
    if not _is_sx_request():
        from shared.sx.page import get_template_context
        from sxc.sx_components import render_home_page_sx

        page_ctx = await get_template_context()
        body = await render_home_page_sx(page_ctx)
        return await make_response(body, 200)

    from shared.sx.helpers import sx_response
    from sxc.sx_components import home_oob_sx

    return sx_response(await home_oob_sx())
|
|
|
|
# ------------------------------------------------------------------
|
|
# Docs
|
|
# ------------------------------------------------------------------
|
|
|
|
@bp.get("/docs/")
async def docs_index():
    """Redirect the bare docs URL to the introduction page."""
    from quart import redirect

    target = "/docs/introduction"
    return redirect(target)
|
|
|
|
@bp.get("/docs/<slug>")
async def docs_page(slug: str):
    """Serve the docs page identified by ``slug``.

    SX/HX requests get the SX produced by ``docs_oob_sx(slug)``; every other
    request gets the page rendered by ``render_docs_page_sx`` with status 200.
    """
    if not _is_sx_request():
        from shared.sx.page import get_template_context
        from sxc.sx_components import render_docs_page_sx

        page_ctx = await get_template_context()
        body = await render_docs_page_sx(page_ctx, slug)
        return await make_response(body, 200)

    from shared.sx.helpers import sx_response
    from sxc.sx_components import docs_oob_sx

    return sx_response(await docs_oob_sx(slug))
|
|
|
|
# ------------------------------------------------------------------
|
|
# Reference
|
|
# ------------------------------------------------------------------
|
|
|
|
@bp.get("/reference/")
async def reference_index():
    """Serve the reference landing page (the reference renderers with an empty slug).

    SX/HX requests get the SX produced by ``reference_oob_sx("")``; every
    other request gets the page rendered by ``render_reference_page_sx``
    with status 200.
    """
    if not _is_sx_request():
        from shared.sx.page import get_template_context
        from sxc.sx_components import render_reference_page_sx

        page_ctx = await get_template_context()
        body = await render_reference_page_sx(page_ctx, "")
        return await make_response(body, 200)

    from shared.sx.helpers import sx_response
    from sxc.sx_components import reference_oob_sx

    return sx_response(await reference_oob_sx(""))
|
|
|
|
@bp.get("/reference/<slug>")
async def reference_page(slug: str):
    """Serve the reference page identified by ``slug``.

    SX/HX requests get the SX produced by ``reference_oob_sx(slug)``; every
    other request gets the page rendered by ``render_reference_page_sx``
    with status 200.
    """
    if not _is_sx_request():
        from shared.sx.page import get_template_context
        from sxc.sx_components import render_reference_page_sx

        page_ctx = await get_template_context()
        body = await render_reference_page_sx(page_ctx, slug)
        return await make_response(body, 200)

    from shared.sx.helpers import sx_response
    from sxc.sx_components import reference_oob_sx

    return sx_response(await reference_oob_sx(slug))
|
|
|
|
# ------------------------------------------------------------------
|
|
# Protocols
|
|
# ------------------------------------------------------------------
|
|
|
|
@bp.get("/protocols/")
async def protocols_index():
    """Redirect the bare protocols URL to the wire-format page."""
    from quart import redirect

    target = "/protocols/wire-format"
    return redirect(target)
|
|
|
|
@bp.get("/protocols/<slug>")
async def protocol_page(slug: str):
    """Serve the protocol page identified by ``slug``.

    SX/HX requests get the SX produced by ``protocol_oob_sx(slug)``; every
    other request gets the page rendered by ``render_protocol_page_sx``
    with status 200.
    """
    if not _is_sx_request():
        from shared.sx.page import get_template_context
        from sxc.sx_components import render_protocol_page_sx

        page_ctx = await get_template_context()
        body = await render_protocol_page_sx(page_ctx, slug)
        return await make_response(body, 200)

    from shared.sx.helpers import sx_response
    from sxc.sx_components import protocol_oob_sx

    return sx_response(await protocol_oob_sx(slug))
|
|
|
|
# ------------------------------------------------------------------
|
|
# Examples
|
|
# ------------------------------------------------------------------
|
|
|
|
@bp.get("/examples/")
async def examples_index():
    """Redirect the bare examples URL to the click-to-load example."""
    from quart import redirect

    target = "/examples/click-to-load"
    return redirect(target)
|
|
|
|
@bp.get("/examples/<slug>")
async def examples_page(slug: str):
    """Serve the example page identified by ``slug``.

    SX/HX requests get the SX produced by ``examples_oob_sx(slug)``; every
    other request gets the page rendered by ``render_examples_page_sx``
    with status 200.
    """
    if not _is_sx_request():
        from shared.sx.page import get_template_context
        from sxc.sx_components import render_examples_page_sx

        page_ctx = await get_template_context()
        body = await render_examples_page_sx(page_ctx, slug)
        return await make_response(body, 200)

    from shared.sx.helpers import sx_response
    from sxc.sx_components import examples_oob_sx

    return sx_response(await examples_oob_sx(slug))
|
|
|
|
# ------------------------------------------------------------------
|
|
# Example API endpoints (for live demos)
|
|
# ------------------------------------------------------------------
|
|
|
|
@bp.get("/examples/api/click")
async def api_click():
    """Demo endpoint: respond with a ``~click-result`` stamped with the current time.

    The fragment also carries two ``_oob_code`` entries, one for
    ``click-wire`` (the wire text) and one for ``click-comp`` (the
    component source text).
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    fragment = f'(~click-result :time "{stamp}")'
    source_text = _component_source_text("click-result")
    wire = _full_wire_text(fragment, "click-result")
    wire_panel = _oob_code("click-wire", wire)
    source_panel = _oob_code("click-comp", source_text)
    return sx_response(f'(<> {fragment} {wire_panel} {source_panel})')
|
|
|
|
@bp.post("/examples/api/form")
async def api_form():
    """Demo endpoint: echo the submitted ``name`` form field in a ``~form-result``.

    The fragment also carries ``_oob_code`` entries for ``form-wire`` (wire
    text) and ``form-comp`` (component source text).
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    form = await request.form
    name = form.get("name", "")
    # Escape backslashes before quotes (the order api_json_echo uses). With
    # only quotes escaped, an input ending in "\" would escape the closing
    # quote of the SX string literal and corrupt the fragment.
    escaped = name.replace('\\', '\\\\').replace('"', '\\"')
    sx_src = f'(~form-result :name "{escaped}")'
    comp_text = _component_source_text("form-result")
    wire_text = _full_wire_text(sx_src, "form-result")
    oob_wire = _oob_code("form-wire", wire_text)
    oob_comp = _oob_code("form-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
|
|
|
|
# Number of poll requests served so far (shared by all callers).
_poll_count = {"n": 0}


@bp.get("/examples/api/poll")
async def api_poll():
    """Demo endpoint: report the current time and the poll count.

    Each call increments the shared counter; the value passed to
    ``~poll-result`` is capped at 10.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    _poll_count["n"] += 1
    clock = datetime.now().strftime("%H:%M:%S")
    shown = _poll_count["n"] if _poll_count["n"] < 10 else 10
    fragment = f'(~poll-result :time "{clock}" :count {shown})'
    source_text = _component_source_text("poll-result")
    wire = _full_wire_text(fragment, "poll-result")
    wire_panel = _oob_code("poll-wire", wire)
    source_panel = _oob_code("poll-comp", source_text)
    return sx_response(f'(<> {fragment} {wire_panel} {source_panel})')
|
|
|
|
@bp.delete("/examples/api/delete/<item_id>")
async def api_delete(item_id: str):
    """Demo endpoint: acknowledge deletion of row ``item_id``.

    No primary content is returned; the response holds only the
    ``delete-wire`` and ``delete-comp`` ``_oob_code`` entries.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    # The primary response is deliberately empty: the outerHTML swap removes
    # the row. The OOB entries exist only to show what happened.
    note = f'(empty — row #{item_id} removed by outerHTML swap)'
    wire = _full_wire_text(note)
    source_text = _component_source_text("delete-row")
    wire_panel = _oob_code("delete-wire", wire)
    source_panel = _oob_code("delete-comp", source_text)
    return sx_response(f'(<> {wire_panel} {source_panel})')
|
|
|
|
@bp.get("/examples/api/edit")
async def api_edit_form():
    """Demo endpoint: return a ``~inline-edit-form`` pre-filled from ``?value=``.

    The fragment also carries ``_oob_code`` entries for ``edit-wire`` and
    ``edit-comp``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    value = request.args.get("value", "")
    # Backslash first, then quote (as api_json_echo does); otherwise a value
    # ending in "\" escapes the closing quote of the SX string.
    escaped = value.replace('\\', '\\\\').replace('"', '\\"')
    sx_src = f'(~inline-edit-form :value "{escaped}")'
    comp_text = _component_source_text("inline-edit-form")
    wire_text = _full_wire_text(sx_src, "inline-edit-form")
    oob_wire = _oob_code("edit-wire", wire_text)
    oob_comp = _oob_code("edit-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
|
|
|
|
@bp.post("/examples/api/edit")
async def api_edit_save():
    """Demo endpoint: return a ``~inline-view`` showing the submitted ``value`` field.

    The fragment also carries ``_oob_code`` entries for ``edit-wire`` and
    ``edit-comp``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    form = await request.form
    value = form.get("value", "")
    # Backslash first, then quote (as api_json_echo does); otherwise a value
    # ending in "\" escapes the closing quote of the SX string.
    escaped = value.replace('\\', '\\\\').replace('"', '\\"')
    sx_src = f'(~inline-view :value "{escaped}")'
    comp_text = _component_source_text("inline-view")
    wire_text = _full_wire_text(sx_src, "inline-view")
    oob_wire = _oob_code("edit-wire", wire_text)
    oob_comp = _oob_code("edit-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
|
|
|
|
@bp.get("/examples/api/edit/cancel")
async def api_edit_cancel():
    """Demo endpoint: return a ``~inline-view`` showing the ``?value=`` query argument.

    The fragment also carries ``_oob_code`` entries for ``edit-wire`` and
    ``edit-comp``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    value = request.args.get("value", "")
    # Backslash first, then quote (as api_json_echo does); otherwise a value
    # ending in "\" escapes the closing quote of the SX string.
    escaped = value.replace('\\', '\\\\').replace('"', '\\"')
    sx_src = f'(~inline-view :value "{escaped}")'
    comp_text = _component_source_text("inline-view")
    wire_text = _full_wire_text(sx_src, "inline-view")
    oob_wire = _oob_code("edit-wire", wire_text)
    oob_comp = _oob_code("edit-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
|
|
|
|
@bp.get("/examples/api/oob")
async def api_oob():
    """Demo endpoint: update "Box A" directly and "Box B" through ``sx-swap-oob``.

    Both boxes are stamped with the same current time. The fragment also
    carries an ``_oob_code`` entry for ``oob-wire``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _full_wire_text

    clock = datetime.now().strftime("%H:%M:%S")
    # The timestamp paragraph is identical in both boxes.
    stamp_sx = f' (p :class "text-sm text-stone-500" "at {clock}")'
    box_a = ' (p :class "text-emerald-600 font-medium" "Box A updated!")' + stamp_sx
    box_b = (
        ' (div :id "oob-box-b" :sx-swap-oob "innerHTML"'
        ' (p :class "text-violet-600 font-medium" "Box B updated via OOB!")'
        + stamp_sx
        + ')'
    )
    fragment = '(<>' + box_a + box_b + ')'
    wire = _full_wire_text(fragment)
    wire_panel = _oob_code("oob-wire", wire)
    return sx_response(f'(<> {fragment} {wire_panel})')
|
|
|
|
# --- Lazy Loading ---
|
|
|
|
@bp.get("/examples/api/lazy")
async def api_lazy():
    """Demo endpoint: respond with a ``~lazy-result`` stamped with the current time.

    The fragment also carries ``_oob_code`` entries for ``lazy-wire`` and
    ``lazy-comp``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    clock = datetime.now().strftime("%H:%M:%S")
    fragment = f'(~lazy-result :time "{clock}")'
    source_text = _component_source_text("lazy-result")
    wire = _full_wire_text(fragment, "lazy-result")
    wire_panel = _oob_code("lazy-wire", wire)
    source_panel = _oob_code("lazy-comp", source_text)
    return sx_response(f'(<> {fragment} {wire_panel} {source_panel})')
|
|
|
|
# --- Infinite Scroll ---
|
|
|
|
@bp.get("/examples/api/scroll")
async def api_scroll():
    """Demo endpoint: return one page of five items for the infinite-scroll example.

    ``?page=`` selects the page (default 2). While the next page number is
    at most 6 the fragment ends with a ``scroll-sentinel`` div that requests
    the next page; after that it ends with an "All items loaded." marker.
    The fragment also carries an ``_oob_code`` entry for ``scroll-wire``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _full_wire_text

    # type=int makes a non-numeric ?page= fall back to the default instead of
    # raising ValueError (an HTTP 500). The clamp keeps page numbers below 1
    # from producing zero or negative item numbers.
    page = max(1, request.args.get("page", 2, type=int))
    start = (page - 1) * 5 + 1
    next_page = page + 1
    items_html = " ".join(
        f'(div :class "px-4 py-3 border-b border-stone-100 text-sm text-stone-700" "Item {i} — loaded from page {page}")'
        for i in range(start, start + 5)
    )
    if next_page <= 6:
        sentinel = (
            f'(div :id "scroll-sentinel"'
            f' :sx-get "/examples/api/scroll?page={next_page}"'
            f' :sx-trigger "intersect once"'
            f' :sx-target "#scroll-items"'
            f' :sx-swap "beforeend"'
            f' :class "p-3 text-center text-stone-400 text-sm"'
            f' "Loading more...")'
        )
    else:
        sentinel = (
            '(div :class "p-3 text-center text-stone-500 text-sm font-medium"'
            ' "All items loaded.")'
        )
    sx_src = f'(<> {items_html} {sentinel})'
    wire_text = _full_wire_text(sx_src)
    oob_wire = _oob_code("scroll-wire", wire_text)
    return sx_response(f'(<> {sx_src} {oob_wire})')
|
|
|
|
# --- Progress Bar ---
|
|
|
|
# Progress percentage per job id, kept in memory for the progress-bar demo.
_jobs: dict[str, int] = {}


@bp.post("/examples/api/progress/start")
async def api_progress_start():
    """Demo endpoint: register a new job at 0% and return its ``~progress-status``.

    The job id is the first eight characters of a fresh UUID4 string. The
    fragment also carries ``_oob_code`` entries for ``progress-wire`` and
    ``progress-comp``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    new_job = str(uuid4())[:8]
    _jobs[new_job] = 0
    fragment = f'(~progress-status :percent 0 :job-id "{new_job}")'
    source_text = _component_source_text("progress-status")
    wire = _full_wire_text(fragment, "progress-status")
    wire_panel = _oob_code("progress-wire", wire)
    source_panel = _oob_code("progress-comp", source_text)
    return sx_response(f'(<> {fragment} {wire_panel} {source_panel})')
|
|
|
|
@bp.get("/examples/api/progress/status")
async def api_progress_status():
    """Demo endpoint: advance the job named by ``?job=`` and return its status.

    Each call adds a random 15-30 points to the stored percentage, capped at
    100. A job id that is not in ``_jobs`` starts from 0. The fragment also
    carries ``_oob_code`` entries for ``progress-wire`` and ``progress-comp``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    job_id = request.args.get("job", "")
    current = _jobs.get(job_id, 0)
    current = min(current + random.randint(15, 30), 100)
    _jobs[job_id] = current
    # job_id is caller-supplied: escape it (backslash first, then quote, as
    # api_json_echo does) before placing it inside an SX string literal.
    escaped_job = job_id.replace('\\', '\\\\').replace('"', '\\"')
    sx_src = f'(~progress-status :percent {current} :job-id "{escaped_job}")'
    comp_text = _component_source_text("progress-status")
    wire_text = _full_wire_text(sx_src, "progress-status")
    oob_wire = _oob_code("progress-wire", wire_text)
    oob_comp = _oob_code("progress-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
|
|
|
|
# --- Active Search ---
|
|
|
|
@bp.get("/examples/api/search")
async def api_search():
    """Demo endpoint: filter ``SEARCH_LANGUAGES`` by the ``?q=`` query argument.

    The query is stripped and lower-cased. An empty query returns every
    language; otherwise a language matches when the query is a substring of
    its lower-cased name. The fragment also carries ``_oob_code`` entries for
    ``search-wire`` and ``search-comp``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text
    from content.pages import SEARCH_LANGUAGES

    q = request.args.get("q", "").strip().lower()
    if not q:
        results = SEARCH_LANGUAGES
    else:
        results = [lang for lang in SEARCH_LANGUAGES if q in lang.lower()]
    items_sx = " ".join(f'"{r}"' for r in results)
    # Backslash first, then quote (as api_json_echo does); otherwise a query
    # ending in "\" escapes the closing quote of the SX string.
    escaped_q = q.replace('\\', '\\\\').replace('"', '\\"')
    sx_src = f'(~search-results :items (list {items_sx}) :query "{escaped_q}")'
    comp_text = _component_source_text("search-results")
    wire_text = _full_wire_text(sx_src, "search-results")
    oob_wire = _oob_code("search-wire", wire_text)
    oob_comp = _oob_code("search-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
|
|
|
|
# --- Inline Validation ---
|
|
|
|
# Addresses the validation demo reports as already registered.
_TAKEN_EMAILS = {"admin@example.com", "test@example.com", "user@example.com"}


@bp.get("/examples/api/validate")
async def api_validate():
    """Demo endpoint: validate the ``?email=`` query argument.

    Returns ``~validation-error`` when the address is empty, has no ``@``,
    has no ``.`` after the last ``@``, or (case-insensitively) is in
    ``_TAKEN_EMAILS``; otherwise returns ``~validation-ok``. The fragment
    also carries ``_oob_code`` entries for ``validate-wire`` and
    ``validate-comp``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    email = request.args.get("email", "").strip()
    # Backslash first, then quote (as api_json_echo does); otherwise an
    # address ending in "\" escapes the closing quote of the SX string.
    escaped = email.replace('\\', '\\\\').replace('"', '\\"')
    if not email:
        sx_src = '(~validation-error :message "Email is required")'
        comp_name = "validation-error"
    elif "@" not in email or "." not in email.split("@")[-1]:
        sx_src = '(~validation-error :message "Invalid email format")'
        comp_name = "validation-error"
    elif email.lower() in _TAKEN_EMAILS:
        sx_src = f'(~validation-error :message "{escaped} is already taken")'
        comp_name = "validation-error"
    else:
        sx_src = f'(~validation-ok :email "{escaped}")'
        comp_name = "validation-ok"
    comp_text = _component_source_text(comp_name)
    wire_text = _full_wire_text(sx_src, comp_name)
    oob_wire = _oob_code("validate-wire", wire_text)
    oob_comp = _oob_code("validate-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
|
|
|
|
@bp.post("/examples/api/validate/submit")
async def api_validate_submit():
    """Demo endpoint: accept the validation form and confirm the submitted email.

    Returns an error paragraph when the ``email`` field is empty or has no
    ``@``; otherwise a confirmation paragraph echoing the address.
    """
    from shared.sx.helpers import sx_response

    form = await request.form
    email = form.get("email", "").strip()
    if not email or "@" not in email:
        return sx_response('(p :class "text-sm text-rose-600 mt-2" "Please enter a valid email.")')
    # Backslash first, then quote (as api_json_echo does); otherwise an
    # address ending in "\" escapes the closing quote of the SX string.
    escaped = email.replace('\\', '\\\\').replace('"', '\\"')
    return sx_response(f'(p :class "text-sm text-emerald-600 mt-2" "Form submitted with: {escaped}")')
|
|
|
|
# --- Value Select ---
|
|
|
|
@bp.get("/examples/api/values")
async def api_values():
    """Demo endpoint: return the ``~value-options`` for the ``?category=`` argument.

    Items are looked up in ``VALUE_SELECT_DATA``; an unknown category yields
    an empty list. The fragment also carries ``_oob_code`` entries for
    ``values-wire`` and ``values-comp``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text
    from content.pages import VALUE_SELECT_DATA

    category = request.args.get("category", "")
    options = VALUE_SELECT_DATA.get(category, [])
    quoted = [f'"{option}"' for option in options]
    option_list = " ".join(quoted)
    fragment = f'(~value-options :items (list {option_list}))'
    source_text = _component_source_text("value-options")
    wire = _full_wire_text(fragment, "value-options")
    wire_panel = _oob_code("values-wire", wire)
    source_panel = _oob_code("values-comp", source_text)
    return sx_response(f'(<> {fragment} {wire_panel} {source_panel})')
|
|
|
|
# --- Reset on Submit ---
|
|
|
|
@bp.post("/examples/api/reset-submit")
async def api_reset_submit():
    """Demo endpoint: echo the submitted ``message`` field with the current time.

    A blank message is reported as ``(empty)``. The fragment also carries
    ``_oob_code`` entries for ``reset-wire`` and ``reset-comp``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    form = await request.form
    msg = form.get("message", "").strip() or "(empty)"
    # Backslash first, then quote (as api_json_echo does); otherwise a
    # message ending in "\" escapes the closing quote of the SX string.
    escaped = msg.replace('\\', '\\\\').replace('"', '\\"')
    now = datetime.now().strftime("%H:%M:%S")
    sx_src = f'(~reset-message :message "{escaped}" :time "{now}")'
    comp_text = _component_source_text("reset-message")
    wire_text = _full_wire_text(sx_src, "reset-message")
    oob_wire = _oob_code("reset-wire", wire_text)
    oob_comp = _oob_code("reset-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
|
|
|
|
# --- Edit Row ---
|
|
|
|
# Editable rows keyed by row id; filled from EDIT_ROW_DATA on first access.
_edit_rows: dict[str, dict] = {}


def _get_edit_rows() -> dict[str, dict]:
    """Return the shared edit-row store, seeding it from ``EDIT_ROW_DATA`` when empty.

    Each seed row is copied with ``dict(...)`` before it is stored under its
    ``"id"``.
    """
    if _edit_rows:
        return _edit_rows

    from content.pages import EDIT_ROW_DATA

    _edit_rows.update((seed["id"], dict(seed)) for seed in EDIT_ROW_DATA)
    return _edit_rows
|
|
|
|
@bp.get("/examples/api/editrow/<row_id>")
async def api_editrow_form(row_id: str):
    """Demo endpoint: return the ``~edit-row-form`` for row ``row_id``.

    An unknown id yields a blank row (empty name, price and stock "0"). The
    fragment also carries ``_oob_code`` entries for ``editrow-wire`` and
    ``editrow-comp``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    def _esc(value) -> str:
        # Row fields may come from earlier form posts and row_id from the URL.
        # Escape backslash first, then quote (as api_json_echo does) so the
        # value cannot terminate its SX string literal.
        return str(value).replace('\\', '\\\\').replace('"', '\\"')

    rows = _get_edit_rows()
    row = rows.get(row_id, {"id": row_id, "name": "", "price": "0", "stock": "0"})
    sx_src = (f'(~edit-row-form :id "{_esc(row["id"])}" :name "{_esc(row["name"])}"'
              f' :price "{_esc(row["price"])}" :stock "{_esc(row["stock"])}")')
    comp_text = _component_source_text("edit-row-form")
    wire_text = _full_wire_text(sx_src, "edit-row-form")
    oob_wire = _oob_code("editrow-wire", wire_text)
    oob_comp = _oob_code("editrow-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
|
|
|
|
@bp.post("/examples/api/editrow/<row_id>")
async def api_editrow_save(row_id: str):
    """Demo endpoint: store the submitted row and return its ``~edit-row-view``.

    The row is replaced wholesale with the ``name``, ``price`` and ``stock``
    form fields (defaults: ``""``, ``"0"``, ``"0"``). Values are stored raw
    and escaped only when rendered. The fragment also carries ``_oob_code``
    entries for ``editrow-wire`` and ``editrow-comp``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    def _esc(value) -> str:
        # Form fields and row_id are caller-supplied. Escape backslash first,
        # then quote (as api_json_echo does) so the value cannot terminate
        # its SX string literal.
        return str(value).replace('\\', '\\\\').replace('"', '\\"')

    form = await request.form
    rows = _get_edit_rows()
    rows[row_id] = {
        "id": row_id,
        "name": form.get("name", ""),
        "price": form.get("price", "0"),
        "stock": form.get("stock", "0"),
    }
    row = rows[row_id]
    sx_src = (f'(~edit-row-view :id "{_esc(row["id"])}" :name "{_esc(row["name"])}"'
              f' :price "{_esc(row["price"])}" :stock "{_esc(row["stock"])}")')
    comp_text = _component_source_text("edit-row-view")
    wire_text = _full_wire_text(sx_src, "edit-row-view")
    oob_wire = _oob_code("editrow-wire", wire_text)
    oob_comp = _oob_code("editrow-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
|
|
|
|
@bp.get("/examples/api/editrow/<row_id>/cancel")
async def api_editrow_cancel(row_id: str):
    """Demo endpoint: return the stored ``~edit-row-view`` for row ``row_id``.

    An unknown id yields a blank row (empty name, price and stock "0"). The
    fragment also carries ``_oob_code`` entries for ``editrow-wire`` and
    ``editrow-comp``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    def _esc(value) -> str:
        # Row fields may come from earlier form posts and row_id from the URL.
        # Escape backslash first, then quote (as api_json_echo does) so the
        # value cannot terminate its SX string literal.
        return str(value).replace('\\', '\\\\').replace('"', '\\"')

    rows = _get_edit_rows()
    row = rows.get(row_id, {"id": row_id, "name": "", "price": "0", "stock": "0"})
    sx_src = (f'(~edit-row-view :id "{_esc(row["id"])}" :name "{_esc(row["name"])}"'
              f' :price "{_esc(row["price"])}" :stock "{_esc(row["stock"])}")')
    comp_text = _component_source_text("edit-row-view")
    wire_text = _full_wire_text(sx_src, "edit-row-view")
    oob_wire = _oob_code("editrow-wire", wire_text)
    oob_comp = _oob_code("editrow-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
|
|
|
|
# --- Bulk Update ---
|
|
|
|
# Users for the bulk-update demo keyed by id; filled from BULK_USERS on first access.
_bulk_users: dict[str, dict] = {}


def _get_bulk_users() -> dict[str, dict]:
    """Return the shared bulk-user store, seeding it from ``BULK_USERS`` when empty.

    Each seed user is copied with ``dict(...)`` before it is stored under its
    ``"id"``.
    """
    if _bulk_users:
        return _bulk_users

    from content.pages import BULK_USERS

    _bulk_users.update((seed["id"], dict(seed)) for seed in BULK_USERS)
    return _bulk_users
|
|
|
|
@bp.post("/examples/api/bulk")
async def api_bulk():
    """Demo endpoint: set the status of the selected users and return every row.

    ``?action=activate`` (the default) sets status ``active``; any other
    action sets ``inactive``. Ids in the ``ids`` form list that match no
    stored user are ignored. The fragment also carries ``_oob_code`` entries
    for ``bulk-wire`` and ``bulk-comp``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    requested = request.args.get("action", "activate")
    submitted = await request.form
    selected_ids = submitted.getlist("ids")
    directory = _get_bulk_users()
    if requested == "activate":
        status = "active"
    else:
        status = "inactive"
    for user_id in selected_ids:
        user = directory.get(user_id)
        if user is not None:
            user["status"] = status
    row_sx = [
        f'(~bulk-row :id "{u["id"]}" :name "{u["name"]}"'
        f' :email "{u["email"]}" :status "{u["status"]}")'
        for u in directory.values()
    ]
    fragment = f'(<> {" ".join(row_sx)})'
    source_text = _component_source_text("bulk-row")
    wire = _full_wire_text(fragment, "bulk-row")
    wire_panel = _oob_code("bulk-wire", wire)
    source_panel = _oob_code("bulk-comp", source_text)
    return sx_response(f'(<> {fragment} {wire_panel} {source_panel})')
|
|
|
|
# --- Swap Positions ---
|
|
|
|
# Number of swap-log requests served so far (shared by all callers).
_swap_count = {"n": 0}


@bp.post("/examples/api/swap-log")
async def api_swap_log():
    """Demo endpoint: return a log entry for the swap-positions example.

    The entry shows the current time, the ``?mode=`` argument (default
    ``beforeend``) and the running request count. A ``swap-counter`` span
    marked ``sx-swap-oob`` carries the same count, and the fragment also
    carries an ``_oob_code`` entry for ``swap-wire``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _full_wire_text

    mode = request.args.get("mode", "beforeend")
    # mode is caller-supplied: escape it (backslash first, then quote, as
    # api_json_echo does) before placing it inside an SX string literal.
    escaped_mode = mode.replace('\\', '\\\\').replace('"', '\\"')
    _swap_count["n"] += 1
    now = datetime.now().strftime("%H:%M:%S")
    n = _swap_count["n"]
    entry = f'(div :class "px-3 py-2 text-sm text-stone-700" "[{now}] {escaped_mode} (#{n})")'
    oob_counter = (
        f'(span :id "swap-counter" :sx-swap-oob "innerHTML"'
        f' :class "self-center text-sm text-stone-500" "Count: {n}")'
    )
    sx_src = f'(<> {entry} {oob_counter})'
    wire_text = _full_wire_text(sx_src)
    oob_wire = _oob_code("swap-wire", wire_text)
    return sx_response(f'(<> {sx_src} {oob_wire})')
|
|
|
|
# --- Select Filter (dashboard) ---
|
|
|
|
@bp.get("/examples/api/dashboard")
async def api_dashboard():
    """Demo endpoint: return the dashboard fragment for the select-filter example.

    The fragment holds three divs, ``dash-header``, ``dash-stats`` and
    ``dash-footer``. Header and footer are stamped with the current time;
    the stats are fixed demo numbers. It also carries an ``_oob_code`` entry
    for ``filter-wire``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _full_wire_text

    clock = datetime.now().strftime("%H:%M:%S")
    header = (
        ' (div :id "dash-header" :class "p-3 bg-violet-50 rounded mb-3"'
        ' (h4 :class "font-semibold text-violet-800" "Dashboard Header")'
        f' (p :class "text-sm text-violet-600" "Generated at {clock}"))'
    )
    stats = (
        ' (div :id "dash-stats" :class "grid grid-cols-3 gap-3 mb-3"'
        ' (div :class "p-3 bg-emerald-50 rounded text-center"'
        ' (p :class "text-2xl font-bold text-emerald-700" "142")'
        ' (p :class "text-xs text-emerald-600" "Users"))'
        ' (div :class "p-3 bg-blue-50 rounded text-center"'
        ' (p :class "text-2xl font-bold text-blue-700" "89")'
        ' (p :class "text-xs text-blue-600" "Orders"))'
        ' (div :class "p-3 bg-amber-50 rounded text-center"'
        ' (p :class "text-2xl font-bold text-amber-700" "$4.2k")'
        ' (p :class "text-xs text-amber-600" "Revenue")))'
    )
    footer = (
        ' (div :id "dash-footer" :class "p-3 bg-stone-50 rounded"'
        f' (p :class "text-sm text-stone-500" "Last updated: {clock}"))'
    )
    fragment = '(<>' + header + stats + footer + ')'
    wire = _full_wire_text(fragment)
    wire_panel = _oob_code("filter-wire", wire)
    return sx_response(f'(<> {fragment} {wire_panel})')
|
|
|
|
# --- Tabs ---
|
|
|
|
# Static SX content for each tab of the tabs demo, keyed by tab name.
_TAB_CONTENT = {
    "tab1": ('(div (p :class "text-stone-700" "Welcome to the Overview tab.")'
             ' (p :class "text-stone-500 text-sm mt-2"'
             ' "This is the default tab content loaded via sx-get."))'),
    "tab2": ('(div (p :class "text-stone-700" "Here are the details.")'
             ' (ul :class "mt-2 space-y-1 text-sm text-stone-600"'
             ' (li "Version: 1.0.0")'
             ' (li "Build: 2024-01-15")'
             ' (li "Engine: sx")))'),
    "tab3": ('(div (p :class "text-stone-700" "Recent history:")'
             ' (ol :class "mt-2 space-y-1 text-sm text-stone-600 list-decimal list-inside"'
             ' (li "Initial release")'
             ' (li "Added component caching")'
             ' (li "Wire format v2")))'),
}


@bp.get("/examples/api/tabs/<tab>")
async def api_tabs(tab: str):
    """Demo endpoint: return the content of ``tab`` plus the re-rendered tab buttons.

    An unknown tab name is treated as ``tab1``. The buttons are sent in a
    ``tab-buttons`` div marked ``sx-swap-oob``, with the selected tab's
    button flagged active. The fragment also carries an ``_oob_code`` entry
    for ``tabs-wire``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _full_wire_text

    # An unknown tab used to fall back to the tab1 content while leaving every
    # button inactive. Normalise the name so content and buttons agree.
    if tab not in _TAB_CONTENT:
        tab = "tab1"
    sx_src = _TAB_CONTENT[tab]
    buttons = []
    for t, label in [("tab1", "Overview"), ("tab2", "Details"), ("tab3", "History")]:
        active = "true" if t == tab else "false"
        buttons.append(f'(~tab-btn :tab "{t}" :label "{label}" :active "{active}")')
    oob_tabs = (
        f'(div :id "tab-buttons" :sx-swap-oob "innerHTML"'
        f' :class "flex border-b border-stone-200"'
        f' {" ".join(buttons)})'
    )
    wire_text = _full_wire_text(f'(<> {sx_src} {oob_tabs})', "tab-btn")
    oob_wire = _oob_code("tabs-wire", wire_text)
    return sx_response(f'(<> {sx_src} {oob_tabs} {oob_wire})')
|
|
|
|
# --- Animations ---
|
|
|
|
@bp.get("/examples/api/animate")
async def api_animate():
    """Demo endpoint: return an ``~anim-result`` with a random colour class and the time.

    The colour is picked from five fixed background classes. The fragment
    also carries ``_oob_code`` entries for ``anim-wire`` and ``anim-comp``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    palette = ["bg-violet-100", "bg-emerald-100", "bg-blue-100", "bg-amber-100", "bg-rose-100"]
    picked = random.choice(palette)
    clock = datetime.now().strftime("%H:%M:%S")
    fragment = f'(~anim-result :color "{picked}" :time "{clock}")'
    source_text = _component_source_text("anim-result")
    wire = _full_wire_text(fragment, "anim-result")
    wire_panel = _oob_code("anim-wire", wire)
    source_panel = _oob_code("anim-comp", source_text)
    return sx_response(f'(<> {fragment} {wire_panel} {source_panel})')
|
|
|
|
# --- Dialogs ---
|
|
|
|
@bp.get("/examples/api/dialog")
async def api_dialog():
    """Demo endpoint: return a fixed ``~dialog-modal`` confirmation dialog.

    The fragment also carries ``_oob_code`` entries for ``dialog-wire`` and
    ``dialog-comp``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    fragment = (
        '(~dialog-modal :title "Confirm Action" :message '
        '"Are you sure you want to proceed? This is a demo dialog rendered entirely with sx components.")'
    )
    source_text = _component_source_text("dialog-modal")
    wire = _full_wire_text(fragment, "dialog-modal")
    wire_panel = _oob_code("dialog-wire", wire)
    source_panel = _oob_code("dialog-comp", source_text)
    return sx_response(f'(<> {fragment} {wire_panel} {source_panel})')
|
|
|
|
@bp.get("/examples/api/dialog/close")
async def api_dialog_close():
    """Demo endpoint: respond to closing the dialog.

    No primary content is returned; the response holds only the
    ``dialog-wire`` ``_oob_code`` entry noting that the dialog closed.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _full_wire_text

    wire = _full_wire_text("(empty — dialog closed)")
    wire_panel = _oob_code("dialog-wire", wire)
    return sx_response(f'(<> {wire_panel})')
|
|
|
|
# --- Keyboard Shortcuts ---
|
|
|
|
# Message shown for each recognised shortcut key in the keyboard demo.
_KBD_ACTIONS = {
    "s": "Search panel activated",
    "n": "New item created",
    "h": "Help panel opened",
}


@bp.get("/examples/api/keyboard")
async def api_keyboard():
    """Demo endpoint: return the ``~kbd-result`` for the ``?key=`` argument.

    Keys in ``_KBD_ACTIONS`` map to their message; any other key yields
    ``Unknown key: <key>``. The fragment also carries ``_oob_code`` entries
    for ``kbd-wire`` and ``kbd-comp``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    key = request.args.get("key", "")
    action = _KBD_ACTIONS.get(key, f"Unknown key: {key}")
    # Backslash first, then quote (as api_json_echo does); otherwise a key
    # ending in "\" escapes the closing quote of the SX string.
    escaped_action = action.replace('\\', '\\\\').replace('"', '\\"')
    escaped_key = key.replace('\\', '\\\\').replace('"', '\\"')
    sx_src = f'(~kbd-result :key "{escaped_key}" :action "{escaped_action}")'
    comp_text = _component_source_text("kbd-result")
    wire_text = _full_wire_text(sx_src, "kbd-result")
    oob_wire = _oob_code("kbd-wire", wire_text)
    oob_comp = _oob_code("kbd-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
|
|
|
|
# --- PUT / PATCH ---
|
|
|
|
# Profile for the PUT/PATCH demo; filled from PROFILE_DEFAULT on first access.
_profile = {}


def _get_profile() -> dict:
    """Return the shared profile dict, seeding it from ``PROFILE_DEFAULT`` when empty."""
    if _profile:
        return _profile

    from content.pages import PROFILE_DEFAULT

    _profile.update(PROFILE_DEFAULT)
    return _profile
|
|
|
|
@bp.get("/examples/api/putpatch/edit-all")
async def api_pp_edit_all():
    """Demo endpoint: return the ``~pp-form-full`` pre-filled with the stored profile.

    The fragment also carries ``_oob_code`` entries for ``pp-wire`` and
    ``pp-comp``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    def _esc(value) -> str:
        # Profile fields may hold text from an earlier PUT. Escape backslash
        # first, then quote (as api_json_echo does) so the value cannot
        # terminate its SX string literal.
        return str(value).replace('\\', '\\\\').replace('"', '\\"')

    p = _get_profile()
    sx_src = (f'(~pp-form-full :name "{_esc(p["name"])}" :email "{_esc(p["email"])}"'
              f' :role "{_esc(p["role"])}")')
    comp_text = _component_source_text("pp-form-full")
    wire_text = _full_wire_text(sx_src, "pp-form-full")
    oob_wire = _oob_code("pp-wire", wire_text)
    oob_comp = _oob_code("pp-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
|
|
|
|
@bp.put("/examples/api/putpatch")
async def api_pp_put():
    """Demo endpoint: update the stored profile from the form and return its ``~pp-view``.

    ``name``, ``email`` and ``role`` are each overwritten when present in the
    form and left unchanged otherwise. Values are stored raw and escaped only
    when rendered. The fragment also carries ``_oob_code`` entries for
    ``pp-wire`` and ``pp-comp``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    def _esc(value) -> str:
        # Form fields are caller-supplied. Escape backslash first, then quote
        # (as api_json_echo does) so the value cannot terminate its SX string
        # literal.
        return str(value).replace('\\', '\\\\').replace('"', '\\"')

    form = await request.form
    p = _get_profile()
    p["name"] = form.get("name", p["name"])
    p["email"] = form.get("email", p["email"])
    p["role"] = form.get("role", p["role"])
    sx_src = (f'(~pp-view :name "{_esc(p["name"])}" :email "{_esc(p["email"])}"'
              f' :role "{_esc(p["role"])}")')
    comp_text = _component_source_text("pp-view")
    wire_text = _full_wire_text(sx_src, "pp-view")
    oob_wire = _oob_code("pp-wire", wire_text)
    oob_comp = _oob_code("pp-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
|
|
|
|
@bp.get("/examples/api/putpatch/cancel")
async def api_pp_cancel():
    """Demo endpoint: return the ``~pp-view`` of the stored profile without changing it.

    The fragment also carries ``_oob_code`` entries for ``pp-wire`` and
    ``pp-comp``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    def _esc(value) -> str:
        # Profile fields may hold text from an earlier PUT. Escape backslash
        # first, then quote (as api_json_echo does) so the value cannot
        # terminate its SX string literal.
        return str(value).replace('\\', '\\\\').replace('"', '\\"')

    p = _get_profile()
    sx_src = (f'(~pp-view :name "{_esc(p["name"])}" :email "{_esc(p["email"])}"'
              f' :role "{_esc(p["role"])}")')
    comp_text = _component_source_text("pp-view")
    wire_text = _full_wire_text(sx_src, "pp-view")
    oob_wire = _oob_code("pp-wire", wire_text)
    oob_comp = _oob_code("pp-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
|
|
|
|
# --- JSON Encoding ---
|
|
|
|
@bp.post("/examples/api/json-echo")
async def api_json_echo():
    """Demo endpoint: echo the request's JSON body and content type in a ``~json-result``.

    A body that is missing or does not parse as JSON is treated as ``{}``
    (``get_json(silent=True)``). The body is re-serialised with two-space
    indentation. The fragment also carries ``_oob_code`` entries for
    ``json-wire`` and ``json-comp``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    data = await request.get_json(silent=True) or {}
    body = json.dumps(data, indent=2)
    ct = request.content_type or "unknown"
    escaped_body = body.replace('\\', '\\\\').replace('"', '\\"')
    # The Content-Type header is caller-supplied too, so it gets the same
    # backslash-then-quote escaping as the body.
    escaped_ct = ct.replace('\\', '\\\\').replace('"', '\\"')
    sx_src = f'(~json-result :body "{escaped_body}" :content-type "{escaped_ct}")'
    comp_text = _component_source_text("json-result")
    wire_text = _full_wire_text(sx_src, "json-result")
    oob_wire = _oob_code("json-wire", wire_text)
    oob_comp = _oob_code("json-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
|
|
|
|
# --- Vals & Headers ---
|
|
|
|
@bp.get("/examples/api/echo-vals")
async def api_echo_vals():
    """Demo endpoint: echo the query arguments as ``key: value`` items.

    The ``_`` and ``sx-request`` arguments are omitted. The fragment is an
    ``~echo-result`` labelled ``values`` and also carries ``_oob_code``
    entries for ``vals-wire`` and ``vals-comp``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    def _quote(text: str) -> str:
        # Keys and values are caller-supplied. Escape backslash first, then
        # quote (as api_json_echo does) so an item cannot terminate its SX
        # string literal.
        return '"' + text.replace('\\', '\\\\').replace('"', '\\"') + '"'

    vals = {k: v for k, v in request.args.items()
            if k not in ("_", "sx-request")}
    items_sx = " ".join(_quote(f"{k}: {v}") for k, v in vals.items())
    sx_src = f'(~echo-result :label "values" :items (list {items_sx}))'
    comp_text = _component_source_text("echo-result")
    wire_text = _full_wire_text(sx_src, "echo-result")
    oob_wire = _oob_code("vals-wire", wire_text)
    oob_comp = _oob_code("vals-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
|
|
|
|
@bp.get("/examples/api/echo-headers")
async def api_echo_headers():
    """Demo endpoint: echo request headers whose name starts with ``x-`` (any case).

    The fragment is an ``~echo-result`` labelled ``headers`` with one
    ``name: value`` item per header. It also carries ``_oob_code`` entries
    for ``vals-wire`` and ``vals-comp``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    def _quote(text: str) -> str:
        # Header names and values are caller-supplied. Escape backslash first,
        # then quote (as api_json_echo does) so an item cannot terminate its
        # SX string literal.
        return '"' + text.replace('\\', '\\\\').replace('"', '\\"') + '"'

    custom = {k: v for k, v in request.headers if k.lower().startswith("x-")}
    items_sx = " ".join(_quote(f"{k}: {v}") for k, v in custom.items())
    sx_src = f'(~echo-result :label "headers" :items (list {items_sx}))'
    comp_text = _component_source_text("echo-result")
    wire_text = _full_wire_text(sx_src, "echo-result")
    oob_wire = _oob_code("vals-wire", wire_text)
    oob_comp = _oob_code("vals-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
|
|
|
|
# --- Loading States ---
|
|
|
|
@bp.get("/examples/api/slow")
async def api_slow():
    """Demo endpoint: wait two seconds, then return a ``~loading-result`` with the time.

    The fragment also carries ``_oob_code`` entries for ``loading-wire`` and
    ``loading-comp``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    pause_seconds = 2
    await asyncio.sleep(pause_seconds)
    clock = datetime.now().strftime("%H:%M:%S")
    fragment = f'(~loading-result :time "{clock}")'
    source_text = _component_source_text("loading-result")
    wire = _full_wire_text(fragment, "loading-result")
    wire_panel = _oob_code("loading-wire", wire)
    source_panel = _oob_code("loading-comp", source_text)
    return sx_response(f'(<> {fragment} {wire_panel} {source_panel})')
|
|
|
|
# --- Request Abort (sync replace) ---
|
|
|
|
@bp.get("/examples/api/slow-search")
async def api_slow_search():
    """Demo endpoint: echo ``?q=`` after a random 0.5-2.0 second delay.

    The ``~sync-result`` reports the stripped query and the delay in whole
    milliseconds. The fragment also carries ``_oob_code`` entries for
    ``sync-wire`` and ``sync-comp``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    delay = random.uniform(0.5, 2.0)
    await asyncio.sleep(delay)
    q = request.args.get("q", "").strip()
    delay_ms = int(delay * 1000)
    # Backslash first, then quote (as api_json_echo does); otherwise a query
    # ending in "\" escapes the closing quote of the SX string.
    escaped = q.replace('\\', '\\\\').replace('"', '\\"')
    sx_src = f'(~sync-result :query "{escaped}" :delay "{delay_ms}")'
    comp_text = _component_source_text("sync-result")
    wire_text = _full_wire_text(sx_src, "sync-result")
    oob_wire = _oob_code("sync-wire", wire_text)
    oob_comp = _oob_code("sync-comp", comp_text)
    return sx_response(f'(<> {sx_src} {oob_wire} {oob_comp})')
|
|
|
|
# --- Retry ---
|
|
|
|
# Number of requests the flaky endpoint has received so far.
_flaky = {"n": 0}


@bp.get("/examples/api/flaky")
async def api_flaky():
    """Demo endpoint: fail two requests out of every three.

    Each call increments the shared counter. Unless the count is a multiple
    of three the response is an empty 503; otherwise a ``~retry-result``
    reporting the attempt number is returned, together with ``_oob_code``
    entries for ``retry-wire`` and ``retry-comp``.
    """
    from shared.sx.helpers import sx_response
    from sxc.sx_components import _oob_code, _component_source_text, _full_wire_text

    _flaky["n"] += 1
    attempt = _flaky["n"]
    succeeded = attempt % 3 == 0
    if not succeeded:
        return Response("", status=503, content_type="text/plain")

    fragment = f'(~retry-result :attempt "{attempt}" :message "Success! The endpoint finally responded.")'
    source_text = _component_source_text("retry-result")
    wire = _full_wire_text(fragment, "retry-result")
    wire_panel = _oob_code("retry-wire", wire)
    source_panel = _oob_code("retry-comp", source_text)
    return sx_response(f'(<> {fragment} {wire_panel} {source_panel})')
|
|
|
|
# ------------------------------------------------------------------
|
|
# Essays
|
|
# ------------------------------------------------------------------
|
|
|
|
@bp.get("/essays/")
async def essays_index():
    """Redirect the bare essays URL to the "sx-sucks" essay."""
    from quart import redirect

    target = "/essays/sx-sucks"
    return redirect(target)
|
|
|
|
@bp.get("/essays/<slug>")
async def essay_page(slug: str):
    """Serve the essay identified by ``slug``.

    SX/HX requests get the SX produced by ``essay_oob_sx(slug)``; every other
    request gets the page rendered by ``render_essay_page_sx`` with status 200.
    """
    if not _is_sx_request():
        from shared.sx.page import get_template_context
        from sxc.sx_components import render_essay_page_sx

        page_ctx = await get_template_context()
        body = await render_essay_page_sx(page_ctx, slug)
        return await make_response(body, 200)

    from shared.sx.helpers import sx_response
    from sxc.sx_components import essay_oob_sx

    return sx_response(await essay_oob_sx(slug))
|
|
|
|
return bp
|