19 Commits

Author SHA1 Message Date
bc7a4a5128 Add cross-service URL functions and rights to base_context
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m22s
blog_url, market_url, cart_url, events_url and g.rights were only
available as Jinja globals, not in the ctx dict passed to sexp
helper functions. This caused all cross-service links in the header
system (post title, cart badge, admin cog, admin nav items) to
produce relative URLs resolving to the current service domain.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 23:19:42 +00:00
8e4c2c139e Fix duplicate menu rows on HTMX navigation between depth levels
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m39s
When navigating from a deeper page (e.g. day) to a shallower one
(e.g. calendar) via HTMX, orphaned header rows from the deeper page
persisted in the DOM because OOB swaps only replaced specific child
divs, not siblings. Fix by sending empty OOB swaps to clear all
header row IDs not present at the current depth.

Applied to events (calendars/calendar/day/entry/admin/slots) and
market (market_home/browse/product/admin). Also restore app_label
in root header.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 23:09:15 +00:00
db3f48ec75 Remove app_label text from root header, keep settings cog
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 3m6s
The word "settings" (app_label) was showing next to "Rose Ash 2.0"
in the top bar. Removed that label while restoring the settings cog
icon on the right side of the menu bar.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 23:03:46 +00:00
b40f3d124c Remove settings cog from root header bar
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m13s
The settings page is accessible via its own route; no need for a
persistent cog icon next to Rose Ash 2.0.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 22:59:33 +00:00
3809affcab Test dashboard: full menu system, all-service tests, filtering
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m11s
- Run tests for all 10 services via per-service pytest subprocesses
- Group results by service with section headers
- Clickable summary cards filter by outcome (passed/failed/errors/skipped)
- Service filter nav using ~nav-link buttons in menu bar
- Full menu integration: ~header-row + ~header-child + ~menu-row
- Show logo image via cart-mini rendering
- Mount full service directories in docker-compose for test access
- Add 24 unit test files across 9 services

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 22:54:25 +00:00
81e51ae7bc Fix settings cog URL: /settings/ not /admin/
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m50s
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 22:50:16 +00:00
b6119b7f04 Show settings cog on root header for admin users
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m30s
Pass settings_url and is_admin to header-row component so the blog
settings cog appears on the root header row for admin users across
all services. Links to blog /admin/.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 22:47:32 +00:00
75cb5d43b9 Apply generic admin header pattern to all events admin pages
Some checks failed
Build and Deploy / build-and-deploy (push) Has been cancelled
Events admin pages (calendars, calendar admin, day admin, entry admin,
slots, slot detail) now use shared post_admin_header_html with
selected="calendars". Container nav is fetched via fragments so post
header row matches other services.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 22:46:00 +00:00
f628b35fc3 Make post header row generic: admin cog + container_nav in shared helper
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m31s
Move admin cog generation and container_nav border wrapping from
blog-specific wrapper into shared post_header_html so all services
render identical post header rows. Blog, events, cart all delegate
to the shared helper now. Cart admin pages fetch container_nav_html
via fragments. Village Hall always links to blog.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 22:37:24 +00:00
2e4fbd5777 Remove extra cart header row from admin pages, use shared post header
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m21s
Cart admin pages (admin overview, payments) now use the same header
pattern as blog/market/events: root_header → post_header → admin_header.
The domain name appears via app_label on the root header instead of a
separate level-1 "cart" row.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 22:17:36 +00:00
b47ad6224b Unify post admin nav across all services
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m56s
Move post admin header into shared/sexp/helpers.py so blog, cart,
events, and market all render the same admin row with identical nav:
calendars | markets | payments | entries | data | edit | settings.

All links are external (cross-service). The selected item shows
highlighted on the right and as white text next to "admin" on the left.

- blog: delegates to shared helper, removes blog-specific nav builder
- cart: delegates to shared helper for payments admin
- events: adds shared admin row (selected=calendars) to calendar admin
- market: adds /<slug>/admin/ route + page_admin blueprint, delegates
  to shared helper (selected=markets). Fixes 404 on page-level admin.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 22:01:56 +00:00
2d08d6f787 Eliminate payments sub-admin row in cart, show selection on admin label
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m25s
Same pattern as blog: remove the level-3 payments header row, instead
show "payments" in white text next to "admin" on the admin row.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 21:35:02 +00:00
beebe559cd Show selected sub-page name in white next to admin label
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m29s
Appends e.g. "settings" in white text next to the admin shield icon
on the left side of the admin row, in addition to the highlighted
nav button on the right.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 21:28:27 +00:00
b63aa72efb Fix admin nav selection: use !important to override text-black
Some checks failed
Build and Deploy / build-and-deploy (push) Has been cancelled
The direct bg-stone-500 text-white classes were losing to text-black
in Tailwind specificity. Use !bg-stone-500 !text-white to ensure
selected admin nav items display correctly.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 21:27:02 +00:00
8cfa12de6b Eliminate post sub-admin rows, highlight active nav on admin row
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m15s
Remove the separate sub-admin header rows (data, entries, edit, settings)
that caused duplicate/stale rows on HTMX navigation and font styling breaks.
Instead, pass selected= to the admin row to highlight the active nav item
via aria-selected styling. External nav items (calendars, markets, payments)
also gain is-selected and select-colours support.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 20:40:03 +00:00
3dd62bd9bf Bigger text in test dashboard + add deliberate failing test
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m18s
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 20:34:19 +00:00
c926e5221d Fix test dashboard: use raw! for pre-rendered table rows
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m20s
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 20:32:40 +00:00
d62643312a Skip OAuth/auth for test service (public dashboard)
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m50s
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 20:24:07 +00:00
8852ab1108 Add test service to OAuth allowed clients
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m41s
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 20:09:13 +00:00
56 changed files with 2939 additions and 401 deletions

View File

@@ -44,7 +44,7 @@ from .services import (
SESSION_USER_KEY = "uid"
ACCOUNT_SESSION_KEY = "account_sid"
ALLOWED_CLIENTS = {"blog", "market", "cart", "events", "federation", "orders", "artdag", "artdag_l2"}
ALLOWED_CLIENTS = {"blog", "market", "cart", "events", "federation", "orders", "test", "artdag", "artdag_l2"}
def register(url_prefix="/auth"):

View File

View File

@@ -0,0 +1,39 @@
"""Unit tests for account auth operations."""
from __future__ import annotations
import pytest
from account.bp.auth.services.auth_operations import validate_email
class TestValidateEmail:
def test_valid_email(self):
ok, email = validate_email("user@example.com")
assert ok is True
assert email == "user@example.com"
def test_uppercase_lowered(self):
ok, email = validate_email("USER@EXAMPLE.COM")
assert ok is True
assert email == "user@example.com"
def test_whitespace_stripped(self):
ok, email = validate_email(" user@example.com ")
assert ok is True
assert email == "user@example.com"
def test_empty_string(self):
ok, email = validate_email("")
assert ok is False
def test_no_at_sign(self):
ok, email = validate_email("notanemail")
assert ok is False
def test_just_at(self):
ok, email = validate_email("@")
assert ok is True # has "@", passes the basic check
def test_spaces_only(self):
ok, email = validate_email(" ")
assert ok is False

View File

@@ -0,0 +1,164 @@
"""Unit tests for Ghost membership helpers."""
from __future__ import annotations
from datetime import datetime
import pytest
from account.services.ghost_membership import (
_iso, _to_str_or_none, _member_email,
_price_cents, _sanitize_member_payload,
)
class TestIso:
def test_none(self):
assert _iso(None) is None
def test_empty(self):
assert _iso("") is None
def test_z_suffix(self):
result = _iso("2024-06-15T12:00:00Z")
assert isinstance(result, datetime)
assert result.year == 2024
def test_offset(self):
result = _iso("2024-06-15T12:00:00+00:00")
assert isinstance(result, datetime)
class TestToStrOrNone:
def test_none(self):
assert _to_str_or_none(None) is None
def test_dict(self):
assert _to_str_or_none({"a": 1}) is None
def test_list(self):
assert _to_str_or_none([1, 2]) is None
def test_bytes(self):
assert _to_str_or_none(b"hello") is None
def test_empty_string(self):
assert _to_str_or_none("") is None
def test_whitespace_only(self):
assert _to_str_or_none(" ") is None
def test_valid_string(self):
assert _to_str_or_none("hello") == "hello"
def test_int(self):
assert _to_str_or_none(42) == "42"
def test_strips_whitespace(self):
assert _to_str_or_none(" hi ") == "hi"
def test_set(self):
assert _to_str_or_none({1, 2}) is None
def test_tuple(self):
assert _to_str_or_none((1,)) is None
def test_bytearray(self):
assert _to_str_or_none(bytearray(b"x")) is None
class TestMemberEmail:
def test_normal(self):
assert _member_email({"email": "USER@EXAMPLE.COM"}) == "user@example.com"
def test_none(self):
assert _member_email({"email": None}) is None
def test_empty(self):
assert _member_email({"email": ""}) is None
def test_whitespace(self):
assert _member_email({"email": " "}) is None
def test_missing_key(self):
assert _member_email({}) is None
def test_strips(self):
assert _member_email({"email": " a@b.com "}) == "a@b.com"
class TestPriceCents:
def test_valid(self):
assert _price_cents({"price": {"amount": 1500}}) == 1500
def test_string_amount(self):
assert _price_cents({"price": {"amount": "2000"}}) == 2000
def test_missing_price(self):
assert _price_cents({}) is None
def test_missing_amount(self):
assert _price_cents({"price": {}}) is None
def test_none_amount(self):
assert _price_cents({"price": {"amount": None}}) is None
def test_nested_none(self):
assert _price_cents({"price": None}) is None
class TestSanitizeMemberPayload:
def test_email_lowercased(self):
result = _sanitize_member_payload({"email": "USER@EXAMPLE.COM"})
assert result["email"] == "user@example.com"
def test_empty_email_excluded(self):
result = _sanitize_member_payload({"email": ""})
assert "email" not in result
def test_name_included(self):
result = _sanitize_member_payload({"name": "Alice"})
assert result["name"] == "Alice"
def test_note_included(self):
result = _sanitize_member_payload({"note": "VIP"})
assert result["note"] == "VIP"
def test_subscribed_bool(self):
result = _sanitize_member_payload({"subscribed": 1})
assert result["subscribed"] is True
def test_labels_with_id(self):
result = _sanitize_member_payload({
"labels": [{"id": "abc"}, {"name": "VIP"}]
})
assert result["labels"] == [{"id": "abc"}, {"name": "VIP"}]
def test_labels_empty_items_excluded(self):
result = _sanitize_member_payload({
"labels": [{"id": None, "name": None}]
})
assert "labels" not in result
def test_newsletters_with_id(self):
result = _sanitize_member_payload({
"newsletters": [{"id": "n1", "subscribed": True}]
})
assert result["newsletters"] == [{"subscribed": True, "id": "n1"}]
def test_newsletters_default_subscribed(self):
result = _sanitize_member_payload({
"newsletters": [{"name": "Weekly"}]
})
assert result["newsletters"][0]["subscribed"] is True
def test_dict_email_excluded(self):
result = _sanitize_member_payload({"email": {"bad": "input"}})
assert "email" not in result
def test_id_passthrough(self):
result = _sanitize_member_payload({"id": "ghost-member-123"})
assert result["id"] == "ghost-member-123"
def test_empty_payload(self):
result = _sanitize_member_payload({})
assert result == {}

View File

@@ -10,8 +10,12 @@
(defcomp ~blog-admin-label ()
(<> (i :class "fa fa-shield-halved" :aria-hidden "true") " admin"))
(defcomp ~blog-admin-nav-item (&key href nav-btn-class label)
(div :class "relative nav-group" (a :href href :class nav-btn-class label)))
(defcomp ~blog-admin-nav-item (&key href nav-btn-class label is-selected select-colours)
(div :class "relative nav-group"
(a :href href
:aria-selected (when is-selected "true")
:class (str (or nav-btn-class "justify-center cursor-pointer flex flex-row items-center gap-2 rounded bg-stone-200 text-black p-3") " " (or select-colours ""))
label)))
(defcomp ~blog-sub-settings-label (&key icon label)
(<> (i :class icon :aria-hidden "true") " " label))

View File

@@ -16,6 +16,7 @@ from shared.sexp.jinja_bridge import render, load_service_components
from shared.sexp.helpers import (
call_url, get_asset_url, root_header_html,
post_header_html as _shared_post_header_html,
post_admin_header_html as _shared_post_admin_header_html,
oob_header_html,
search_mobile_html, search_desktop_html,
full_page, oob_page,
@@ -50,107 +51,18 @@ def _blog_header_html(ctx: dict, *, oob: bool = False) -> str:
# ---------------------------------------------------------------------------
def _post_header_html(ctx: dict, *, oob: bool = False) -> str:
"""Build the post-level header row (blog-specific: container-nav wrapping + admin cog)."""
overrides: dict = {}
# Blog wraps container_nav_html in border styling
container_nav = ctx.get("container_nav_html", "")
if container_nav:
overrides["container_nav_html"] = render("blog-container-nav",
container_nav_html=container_nav,
)
# Admin cog link
from quart import url_for as qurl, request
post = ctx.get("post") or {}
slug = post.get("slug", "")
rights = ctx.get("rights") or {}
has_admin = rights.get("admin") if isinstance(rights, dict) else getattr(rights, "admin", False)
if has_admin and slug:
select_colours = ctx.get("select_colours", "")
styles = ctx.get("styles") or {}
nav_btn = styles.get("nav_button", "") if isinstance(styles, dict) else getattr(styles, "nav_button", "")
admin_href = qurl("blog.post.admin.admin", slug=slug)
is_admin_page = "/admin" in request.path
overrides["post_admin_nav_html"] = render("nav-link",
href=admin_href, hx_select="#main-panel", icon="fa fa-cog",
aclass=f"{nav_btn} {select_colours}",
select_colours=select_colours, is_selected=is_admin_page,
)
effective_ctx = {**ctx, **overrides} if overrides else ctx
return _shared_post_header_html(effective_ctx, oob=oob)
"""Build the post-level header row — delegates to shared helper."""
return _shared_post_header_html(ctx, oob=oob)
# ---------------------------------------------------------------------------
# Post admin header
# ---------------------------------------------------------------------------
def _post_admin_header_html(ctx: dict, *, oob: bool = False) -> str:
"""Post admin header row with admin icon and nav links."""
from quart import url_for as qurl
post = ctx.get("post") or {}
slug = post.get("slug", "")
hx_select = ctx.get("hx_select_search", "#main-panel")
select_colours = ctx.get("select_colours", "")
styles = ctx.get("styles") or {}
nav_btn = styles.get("nav_button", "") if isinstance(styles, dict) else getattr(styles, "nav_button", "")
admin_href = qurl("blog.post.admin.admin", slug=slug)
label_html = render("blog-admin-label")
nav_html = _post_admin_nav_html(ctx)
return render("menu-row",
id="post-admin-row", level=2,
link_href=admin_href, link_label_html=label_html,
nav_html=nav_html, child_id="post-admin-header-child", oob=oob,
)
def _post_admin_nav_html(ctx: dict) -> str:
"""Post admin desktop nav: calendars, markets, payments, entries, data, edit, settings."""
from quart import url_for as qurl
post = ctx.get("post") or {}
slug = post.get("slug", "")
hx_select = ctx.get("hx_select_search", "#main-panel")
select_colours = ctx.get("select_colours", "")
styles = ctx.get("styles") or {}
nav_btn = styles.get("nav_button", "") if isinstance(styles, dict) else getattr(styles, "nav_button", "")
parts = []
# External links to events / market services
events_url_fn = ctx.get("events_url")
market_url_fn = ctx.get("market_url")
if callable(events_url_fn):
for url_fn, path, label in [
(events_url_fn, f"/{slug}/admin/", "calendars"),
(market_url_fn, f"/{slug}/admin/", "markets"),
(ctx.get("cart_url"), f"/{slug}/admin/payments/", "payments"),
]:
if not callable(url_fn):
continue
href = url_fn(path)
parts.append(render("blog-admin-nav-item",
href=href, nav_btn_class=nav_btn, label=label,
))
# HTMX links
for endpoint, label in [
("blog.post.admin.entries", "entries"),
("blog.post.admin.data", "data"),
("blog.post.admin.edit", "edit"),
("blog.post.admin.settings", "settings"),
]:
href = qurl(endpoint, slug=slug)
parts.append(render("nav-link",
href=href, label=label, select_colours=select_colours,
))
return "".join(parts)
def _post_admin_header_html(ctx: dict, *, oob: bool = False, selected: str = "") -> str:
"""Post admin header row — delegates to shared helper."""
slug = (ctx.get("post") or {}).get("slug", "")
return _shared_post_admin_header_html(ctx, slug, oob=oob, selected=selected)
# ---------------------------------------------------------------------------
@@ -1415,32 +1327,16 @@ async def render_post_admin_oob(ctx: dict) -> str:
async def render_post_data_page(ctx: dict) -> str:
root_hdr = root_header_html(ctx)
post_hdr = _post_header_html(ctx)
admin_hdr = _post_admin_header_html(ctx)
from quart import url_for as qurl
slug = (ctx.get("post") or {}).get("slug", "")
data_hdr = _post_sub_admin_header_html(
"post_data-row", "post_data-header-child",
qurl("blog.post.admin.data", slug=slug),
"database", "data", ctx,
)
header_rows = root_hdr + post_hdr + admin_hdr + data_hdr
admin_hdr = _post_admin_header_html(ctx, selected="data")
header_rows = root_hdr + post_hdr + admin_hdr
content = ctx.get("data_html", "")
return full_page(ctx, header_rows_html=header_rows, content_html=content)
async def render_post_data_oob(ctx: dict) -> str:
admin_hdr_oob = _post_admin_header_html(ctx, oob=True)
from quart import url_for as qurl
slug = (ctx.get("post") or {}).get("slug", "")
data_hdr = _post_sub_admin_header_html(
"post_data-row", "post_data-header-child",
qurl("blog.post.admin.data", slug=slug),
"database", "data", ctx,
)
data_oob = _oob_header_html("post-admin-header-child", "post_data-header-child",
data_hdr)
admin_hdr_oob = _post_admin_header_html(ctx, oob=True, selected="data")
content = ctx.get("data_html", "")
return oob_page(ctx, oobs_html=admin_hdr_oob + data_oob, content_html=content)
return oob_page(ctx, oobs_html=admin_hdr_oob, content_html=content)
# ---- Post entries ----
@@ -1448,32 +1344,16 @@ async def render_post_data_oob(ctx: dict) -> str:
async def render_post_entries_page(ctx: dict) -> str:
root_hdr = root_header_html(ctx)
post_hdr = _post_header_html(ctx)
admin_hdr = _post_admin_header_html(ctx)
from quart import url_for as qurl
slug = (ctx.get("post") or {}).get("slug", "")
entries_hdr = _post_sub_admin_header_html(
"post_entries-row", "post_entries-header-child",
qurl("blog.post.admin.entries", slug=slug),
"clock", "entries", ctx,
)
header_rows = root_hdr + post_hdr + admin_hdr + entries_hdr
admin_hdr = _post_admin_header_html(ctx, selected="entries")
header_rows = root_hdr + post_hdr + admin_hdr
content = ctx.get("entries_html", "")
return full_page(ctx, header_rows_html=header_rows, content_html=content)
async def render_post_entries_oob(ctx: dict) -> str:
admin_hdr_oob = _post_admin_header_html(ctx, oob=True)
from quart import url_for as qurl
slug = (ctx.get("post") or {}).get("slug", "")
entries_hdr = _post_sub_admin_header_html(
"post_entries-row", "post_entries-header-child",
qurl("blog.post.admin.entries", slug=slug),
"clock", "entries", ctx,
)
entries_oob = _oob_header_html("post-admin-header-child", "post_entries-header-child",
entries_hdr)
admin_hdr_oob = _post_admin_header_html(ctx, oob=True, selected="entries")
content = ctx.get("entries_html", "")
return oob_page(ctx, oobs_html=admin_hdr_oob + entries_oob, content_html=content)
return oob_page(ctx, oobs_html=admin_hdr_oob, content_html=content)
# ---- Post edit ----
@@ -1481,15 +1361,8 @@ async def render_post_entries_oob(ctx: dict) -> str:
async def render_post_edit_page(ctx: dict) -> str:
root_hdr = root_header_html(ctx)
post_hdr = _post_header_html(ctx)
admin_hdr = _post_admin_header_html(ctx)
from quart import url_for as qurl
slug = (ctx.get("post") or {}).get("slug", "")
edit_hdr = _post_sub_admin_header_html(
"post_edit-row", "post_edit-header-child",
qurl("blog.post.admin.edit", slug=slug),
"pen-to-square", "edit", ctx,
)
header_rows = root_hdr + post_hdr + admin_hdr + edit_hdr
admin_hdr = _post_admin_header_html(ctx, selected="edit")
header_rows = root_hdr + post_hdr + admin_hdr
content = ctx.get("edit_html", "")
body_end = ctx.get("body_end_html", "")
return full_page(ctx, header_rows_html=header_rows, content_html=content,
@@ -1497,18 +1370,9 @@ async def render_post_edit_page(ctx: dict) -> str:
async def render_post_edit_oob(ctx: dict) -> str:
admin_hdr_oob = _post_admin_header_html(ctx, oob=True)
from quart import url_for as qurl
slug = (ctx.get("post") or {}).get("slug", "")
edit_hdr = _post_sub_admin_header_html(
"post_edit-row", "post_edit-header-child",
qurl("blog.post.admin.edit", slug=slug),
"pen-to-square", "edit", ctx,
)
edit_oob = _oob_header_html("post-admin-header-child", "post_edit-header-child",
edit_hdr)
admin_hdr_oob = _post_admin_header_html(ctx, oob=True, selected="edit")
content = ctx.get("edit_html", "")
return oob_page(ctx, oobs_html=admin_hdr_oob + edit_oob, content_html=content)
return oob_page(ctx, oobs_html=admin_hdr_oob, content_html=content)
# ---- Post settings ----
@@ -1516,32 +1380,16 @@ async def render_post_edit_oob(ctx: dict) -> str:
async def render_post_settings_page(ctx: dict) -> str:
root_hdr = root_header_html(ctx)
post_hdr = _post_header_html(ctx)
admin_hdr = _post_admin_header_html(ctx)
from quart import url_for as qurl
slug = (ctx.get("post") or {}).get("slug", "")
settings_hdr = _post_sub_admin_header_html(
"post_settings-row", "post_settings-header-child",
qurl("blog.post.admin.settings", slug=slug),
"cog", "settings", ctx,
)
header_rows = root_hdr + post_hdr + admin_hdr + settings_hdr
admin_hdr = _post_admin_header_html(ctx, selected="settings")
header_rows = root_hdr + post_hdr + admin_hdr
content = ctx.get("settings_html", "")
return full_page(ctx, header_rows_html=header_rows, content_html=content)
async def render_post_settings_oob(ctx: dict) -> str:
admin_hdr_oob = _post_admin_header_html(ctx, oob=True)
from quart import url_for as qurl
slug = (ctx.get("post") or {}).get("slug", "")
settings_hdr = _post_sub_admin_header_html(
"post_settings-row", "post_settings-header-child",
qurl("blog.post.admin.settings", slug=slug),
"cog", "settings", ctx,
)
settings_oob = _oob_header_html("post-admin-header-child", "post_settings-header-child",
settings_hdr)
admin_hdr_oob = _post_admin_header_html(ctx, oob=True, selected="settings")
content = ctx.get("settings_html", "")
return oob_page(ctx, oobs_html=admin_hdr_oob + settings_oob, content_html=content)
return oob_page(ctx, oobs_html=admin_hdr_oob, content_html=content)
# ---- Settings home ----

0
blog/tests/__init__.py Normal file
View File

View File

@@ -0,0 +1,44 @@
"""Unit tests for card fragment parser."""
from __future__ import annotations
import pytest
from blog.bp.blog.services.posts_data import _parse_card_fragments
class TestParseCardFragments:
def test_empty_string(self):
assert _parse_card_fragments("") == {}
def test_single_block(self):
html = '<!-- card-widget:42 --><div>card</div><!-- /card-widget:42 -->'
result = _parse_card_fragments(html)
assert result == {"42": "<div>card</div>"}
def test_multiple_blocks(self):
html = (
'<!-- card-widget:1 -->A<!-- /card-widget:1 -->'
'<!-- card-widget:2 -->B<!-- /card-widget:2 -->'
)
result = _parse_card_fragments(html)
assert result == {"1": "A", "2": "B"}
def test_empty_inner_skipped(self):
html = '<!-- card-widget:99 --> <!-- /card-widget:99 -->'
result = _parse_card_fragments(html)
assert result == {}
def test_multiline_content(self):
html = '<!-- card-widget:5 -->\n<p>line1</p>\n<p>line2</p>\n<!-- /card-widget:5 -->'
result = _parse_card_fragments(html)
assert "5" in result
assert "<p>line1</p>" in result["5"]
def test_mismatched_ids_not_captured(self):
html = '<!-- card-widget:1 -->content<!-- /card-widget:2 -->'
result = _parse_card_fragments(html)
assert result == {}
def test_no_markers(self):
html = '<div>no markers here</div>'
assert _parse_card_fragments(html) == {}

View File

@@ -0,0 +1,103 @@
"""Unit tests for Ghost sync helper functions."""
from __future__ import annotations
from datetime import datetime, timezone
from types import SimpleNamespace
import pytest
from blog.bp.blog.ghost.ghost_sync import _iso, _build_ap_post_data
class TestIso:
def test_none(self):
assert _iso(None) is None
def test_empty_string(self):
assert _iso("") is None
def test_z_suffix(self):
result = _iso("2024-01-15T10:30:00Z")
assert isinstance(result, datetime)
assert result.tzinfo is not None
assert result.year == 2024
assert result.month == 1
assert result.hour == 10
def test_offset_suffix(self):
result = _iso("2024-06-01T08:00:00+00:00")
assert isinstance(result, datetime)
assert result.hour == 8
class TestBuildApPostData:
def _post(self, **kwargs):
defaults = {
"title": "My Post",
"plaintext": "Some body text.",
"custom_excerpt": None,
"excerpt": None,
"feature_image": None,
"feature_image_alt": None,
"html": None,
}
defaults.update(kwargs)
return SimpleNamespace(**defaults)
def _tag(self, slug):
return SimpleNamespace(slug=slug)
def test_basic_post(self):
post = self._post()
result = _build_ap_post_data(post, "https://blog.example.com/post/", [])
assert result["name"] == "My Post"
assert "My Post" in result["content"]
assert "Some body text." in result["content"]
assert result["url"] == "https://blog.example.com/post/"
def test_no_title(self):
post = self._post(title=None)
result = _build_ap_post_data(post, "https://example.com/", [])
assert result["name"] == ""
def test_feature_image(self):
post = self._post(feature_image="https://img.com/photo.jpg",
feature_image_alt="A photo")
result = _build_ap_post_data(post, "https://example.com/", [])
assert "attachment" in result
assert result["attachment"][0]["url"] == "https://img.com/photo.jpg"
assert result["attachment"][0]["name"] == "A photo"
def test_inline_images_capped_at_4(self):
html = "".join(f'<img src="https://img.com/{i}.jpg">' for i in range(10))
post = self._post(html=html)
result = _build_ap_post_data(post, "https://example.com/", [])
assert len(result["attachment"]) == 4
def test_tags(self):
tags = [self._tag("my-tag"), self._tag("another")]
post = self._post()
result = _build_ap_post_data(post, "https://example.com/", tags)
assert "tag" in result
assert len(result["tag"]) == 2
assert result["tag"][0]["name"] == "#mytag" # dashes removed
assert result["tag"][0]["type"] == "Hashtag"
def test_hashtag_in_content(self):
tags = [self._tag("web-dev")]
post = self._post()
result = _build_ap_post_data(post, "https://example.com/", tags)
assert "#webdev" in result["content"]
def test_no_duplicate_images(self):
post = self._post(
feature_image="https://img.com/same.jpg",
html='<img src="https://img.com/same.jpg">',
)
result = _build_ap_post_data(post, "https://example.com/", [])
assert len(result["attachment"]) == 1
def test_multiline_body(self):
post = self._post(plaintext="Para one.\n\nPara two.\n\nPara three.")
result = _build_ap_post_data(post, "https://example.com/", [])
assert result["content"].count("<p>") >= 4 # title + 3 paras + read more

View File

@@ -0,0 +1,393 @@
"""Unit tests for the Lexical JSON → HTML renderer."""
from __future__ import annotations
import pytest
from blog.bp.blog.ghost.lexical_renderer import (
render_lexical, _wrap_format, _align_style,
_FORMAT_BOLD, _FORMAT_ITALIC, _FORMAT_STRIKETHROUGH,
_FORMAT_UNDERLINE, _FORMAT_CODE, _FORMAT_SUBSCRIPT,
_FORMAT_SUPERSCRIPT, _FORMAT_HIGHLIGHT,
)
# ---------------------------------------------------------------------------
# _wrap_format
# ---------------------------------------------------------------------------
class TestWrapFormat:
def test_no_format(self):
assert _wrap_format("hello", 0) == "hello"
def test_bold(self):
assert _wrap_format("x", _FORMAT_BOLD) == "<strong>x</strong>"
def test_italic(self):
assert _wrap_format("x", _FORMAT_ITALIC) == "<em>x</em>"
def test_strikethrough(self):
assert _wrap_format("x", _FORMAT_STRIKETHROUGH) == "<s>x</s>"
def test_underline(self):
assert _wrap_format("x", _FORMAT_UNDERLINE) == "<u>x</u>"
def test_code(self):
assert _wrap_format("x", _FORMAT_CODE) == "<code>x</code>"
def test_subscript(self):
assert _wrap_format("x", _FORMAT_SUBSCRIPT) == "<sub>x</sub>"
def test_superscript(self):
assert _wrap_format("x", _FORMAT_SUPERSCRIPT) == "<sup>x</sup>"
def test_highlight(self):
assert _wrap_format("x", _FORMAT_HIGHLIGHT) == "<mark>x</mark>"
def test_bold_italic(self):
result = _wrap_format("x", _FORMAT_BOLD | _FORMAT_ITALIC)
assert "<strong>" in result
assert "<em>" in result
def test_all_flags(self):
all_flags = (
_FORMAT_BOLD | _FORMAT_ITALIC | _FORMAT_STRIKETHROUGH |
_FORMAT_UNDERLINE | _FORMAT_CODE | _FORMAT_SUBSCRIPT |
_FORMAT_SUPERSCRIPT | _FORMAT_HIGHLIGHT
)
result = _wrap_format("x", all_flags)
for tag in ["strong", "em", "s", "u", "code", "sub", "sup", "mark"]:
assert f"<{tag}>" in result
assert f"</{tag}>" in result
# ---------------------------------------------------------------------------
# _align_style
# ---------------------------------------------------------------------------
class TestAlignStyle:
def test_no_format(self):
assert _align_style({}) == ""
def test_format_zero(self):
assert _align_style({"format": 0}) == ""
def test_left(self):
assert _align_style({"format": 1}) == ' style="text-align: left"'
def test_center(self):
assert _align_style({"format": 2}) == ' style="text-align: center"'
def test_right(self):
assert _align_style({"format": 3}) == ' style="text-align: right"'
def test_justify(self):
assert _align_style({"format": 4}) == ' style="text-align: justify"'
def test_string_format(self):
assert _align_style({"format": "center"}) == ' style="text-align: center"'
def test_unmapped_int(self):
assert _align_style({"format": 99}) == ""
# ---------------------------------------------------------------------------
# render_lexical — text nodes
# ---------------------------------------------------------------------------
class TestRenderLexicalText:
def test_empty_doc(self):
assert render_lexical({"root": {"children": []}}) == ""
def test_plain_text(self):
doc = {"root": {"children": [
{"type": "text", "text": "hello"}
]}}
assert render_lexical(doc) == "hello"
def test_html_escape(self):
doc = {"root": {"children": [
{"type": "text", "text": "<script>alert('xss')</script>"}
]}}
result = render_lexical(doc)
assert "<script>" not in result
assert "&lt;script&gt;" in result
def test_bold_text(self):
doc = {"root": {"children": [
{"type": "text", "text": "bold", "format": _FORMAT_BOLD}
]}}
assert render_lexical(doc) == "<strong>bold</strong>"
def test_string_input(self):
import json
doc = {"root": {"children": [{"type": "text", "text": "hi"}]}}
assert render_lexical(json.dumps(doc)) == "hi"
# ---------------------------------------------------------------------------
# render_lexical — block nodes
# ---------------------------------------------------------------------------
class TestRenderLexicalBlocks:
def test_paragraph(self):
doc = {"root": {"children": [
{"type": "paragraph", "children": [
{"type": "text", "text": "hello"}
]}
]}}
assert render_lexical(doc) == "<p>hello</p>"
def test_empty_paragraph(self):
doc = {"root": {"children": [
{"type": "paragraph", "children": []}
]}}
assert render_lexical(doc) == "<p><br></p>"
def test_heading_default(self):
doc = {"root": {"children": [
{"type": "heading", "children": [
{"type": "text", "text": "title"}
]}
]}}
assert render_lexical(doc) == "<h2>title</h2>"
def test_heading_h3(self):
doc = {"root": {"children": [
{"type": "heading", "tag": "h3", "children": [
{"type": "text", "text": "title"}
]}
]}}
assert render_lexical(doc) == "<h3>title</h3>"
def test_blockquote(self):
doc = {"root": {"children": [
{"type": "quote", "children": [
{"type": "text", "text": "quoted"}
]}
]}}
assert render_lexical(doc) == "<blockquote>quoted</blockquote>"
def test_linebreak(self):
doc = {"root": {"children": [{"type": "linebreak"}]}}
assert render_lexical(doc) == "<br>"
def test_horizontal_rule(self):
doc = {"root": {"children": [{"type": "horizontalrule"}]}}
assert render_lexical(doc) == "<hr>"
def test_unordered_list(self):
doc = {"root": {"children": [
{"type": "list", "listType": "bullet", "children": [
{"type": "listitem", "children": [
{"type": "text", "text": "item"}
]}
]}
]}}
assert render_lexical(doc) == "<ul><li>item</li></ul>"
def test_ordered_list(self):
doc = {"root": {"children": [
{"type": "list", "listType": "number", "children": [
{"type": "listitem", "children": [
{"type": "text", "text": "one"}
]}
]}
]}}
assert render_lexical(doc) == "<ol><li>one</li></ol>"
def test_ordered_list_custom_start(self):
doc = {"root": {"children": [
{"type": "list", "listType": "number", "start": 5, "children": []}
]}}
result = render_lexical(doc)
assert 'start="5"' in result
def test_link(self):
doc = {"root": {"children": [
{"type": "link", "url": "https://example.com", "children": [
{"type": "text", "text": "click"}
]}
]}}
result = render_lexical(doc)
assert 'href="https://example.com"' in result
assert "click" in result
def test_link_xss_url(self):
doc = {"root": {"children": [
{"type": "link", "url": 'javascript:alert("xss")', "children": []}
]}}
result = render_lexical(doc)
assert "javascript:alert(&quot;xss&quot;)" in result
# ---------------------------------------------------------------------------
# render_lexical — cards
# ---------------------------------------------------------------------------
class TestRenderLexicalCards:
def test_image(self):
doc = {"root": {"children": [
{"type": "image", "src": "photo.jpg", "alt": "A photo"}
]}}
result = render_lexical(doc)
assert "kg-image-card" in result
assert 'src="photo.jpg"' in result
assert 'alt="A photo"' in result
assert 'loading="lazy"' in result
def test_image_wide(self):
doc = {"root": {"children": [
{"type": "image", "src": "x.jpg", "cardWidth": "wide"}
]}}
assert "kg-width-wide" in render_lexical(doc)
def test_image_with_caption(self):
doc = {"root": {"children": [
{"type": "image", "src": "x.jpg", "caption": "Caption text"}
]}}
assert "<figcaption>Caption text</figcaption>" in render_lexical(doc)
def test_codeblock(self):
doc = {"root": {"children": [
{"type": "codeblock", "code": "print('hi')", "language": "python"}
]}}
result = render_lexical(doc)
assert 'class="language-python"' in result
assert "print(&#x27;hi&#x27;)" in result
def test_html_card(self):
doc = {"root": {"children": [
{"type": "html", "html": "<div>raw</div>"}
]}}
result = render_lexical(doc)
assert "<!--kg-card-begin: html-->" in result
assert "<div>raw</div>" in result
def test_markdown_card(self):
doc = {"root": {"children": [
{"type": "markdown", "markdown": "**bold**"}
]}}
result = render_lexical(doc)
assert "<!--kg-card-begin: markdown-->" in result
assert "<strong>bold</strong>" in result
def test_callout(self):
doc = {"root": {"children": [
{"type": "callout", "backgroundColor": "blue",
"calloutEmoji": "💡", "children": [
{"type": "text", "text": "Note"}
]}
]}}
result = render_lexical(doc)
assert "kg-callout-card-blue" in result
assert "💡" in result
def test_button(self):
doc = {"root": {"children": [
{"type": "button", "buttonText": "Click",
"buttonUrl": "https://example.com", "alignment": "left"}
]}}
result = render_lexical(doc)
assert "kg-align-left" in result
assert "Click" in result
def test_toggle(self):
doc = {"root": {"children": [
{"type": "toggle", "heading": "FAQ", "children": [
{"type": "text", "text": "Answer"}
]}
]}}
result = render_lexical(doc)
assert "kg-toggle-card" in result
assert "FAQ" in result
def test_audio_duration(self):
doc = {"root": {"children": [
{"type": "audio", "src": "a.mp3", "title": "Song", "duration": 185}
]}}
result = render_lexical(doc)
assert "3:05" in result
def test_audio_zero_duration(self):
doc = {"root": {"children": [
{"type": "audio", "src": "a.mp3", "duration": 0}
]}}
assert "0:00" in render_lexical(doc)
def test_video(self):
doc = {"root": {"children": [
{"type": "video", "src": "v.mp4", "loop": True}
]}}
result = render_lexical(doc)
assert "kg-video-card" in result
assert " loop" in result
def test_file_size_kb(self):
doc = {"root": {"children": [
{"type": "file", "src": "f.pdf", "fileName": "doc.pdf",
"fileSize": 512000} # 500 KB
]}}
assert "500 KB" in render_lexical(doc)
def test_file_size_mb(self):
doc = {"root": {"children": [
{"type": "file", "src": "f.zip", "fileName": "big.zip",
"fileSize": 5242880} # 5 MB
]}}
assert "5.0 MB" in render_lexical(doc)
def test_paywall(self):
doc = {"root": {"children": [{"type": "paywall"}]}}
assert render_lexical(doc) == "<!--members-only-->"
def test_embed(self):
doc = {"root": {"children": [
{"type": "embed", "html": "<iframe></iframe>",
"caption": "Video"}
]}}
result = render_lexical(doc)
assert "kg-embed-card" in result
assert "<figcaption>Video</figcaption>" in result
def test_bookmark(self):
doc = {"root": {"children": [
{"type": "bookmark", "url": "https://example.com",
"metadata": {"title": "Example", "description": "A site"}}
]}}
result = render_lexical(doc)
assert "kg-bookmark-card" in result
assert "Example" in result
def test_unknown_node_ignored(self):
doc = {"root": {"children": [
{"type": "unknown-future-thing"}
]}}
assert render_lexical(doc) == ""
def test_product_stars(self):
doc = {"root": {"children": [
{"type": "product", "productTitle": "Widget",
"rating": 3, "productDescription": "Nice"}
]}}
result = render_lexical(doc)
assert "kg-product-card" in result
assert result.count("kg-product-card-rating-active") == 3
def test_header_card(self):
doc = {"root": {"children": [
{"type": "header", "heading": "Welcome",
"size": "large", "style": "dark"}
]}}
result = render_lexical(doc)
assert "kg-header-card" in result
assert "kg-size-large" in result
assert "Welcome" in result
def test_signup_card(self):
doc = {"root": {"children": [
{"type": "signup", "heading": "Subscribe",
"buttonText": "Join", "style": "light"}
]}}
result = render_lexical(doc)
assert "kg-signup-card" in result
assert "Join" in result

View File

@@ -0,0 +1,83 @@
"""Unit tests for lexical document validator."""
from __future__ import annotations
import pytest
from blog.bp.blog.ghost.lexical_validator import (
validate_lexical, ALLOWED_NODE_TYPES,
)
class TestValidateLexical:
def test_valid_empty_doc(self):
ok, reason = validate_lexical({"root": {"type": "root", "children": []}})
assert ok is True
assert reason is None
def test_non_dict_input(self):
ok, reason = validate_lexical("not a dict")
assert ok is False
assert "JSON object" in reason
def test_list_input(self):
ok, reason = validate_lexical([])
assert ok is False
def test_missing_root(self):
ok, reason = validate_lexical({"foo": "bar"})
assert ok is False
assert "'root'" in reason
def test_root_not_dict(self):
ok, reason = validate_lexical({"root": "string"})
assert ok is False
def test_valid_paragraph(self):
doc = {"root": {"type": "root", "children": [
{"type": "paragraph", "children": [
{"type": "text", "text": "hello"}
]}
]}}
ok, _ = validate_lexical(doc)
assert ok is True
def test_disallowed_type(self):
doc = {"root": {"type": "root", "children": [
{"type": "script"}
]}}
ok, reason = validate_lexical(doc)
assert ok is False
assert "Disallowed node type: script" in reason
def test_nested_disallowed_type(self):
doc = {"root": {"type": "root", "children": [
{"type": "paragraph", "children": [
{"type": "list", "children": [
{"type": "evil-widget"}
]}
]}
]}}
ok, reason = validate_lexical(doc)
assert ok is False
assert "evil-widget" in reason
def test_node_without_type_allowed(self):
"""Nodes with type=None are allowed by _walk."""
doc = {"root": {"type": "root", "children": [
{"children": []} # no "type" key
]}}
ok, _ = validate_lexical(doc)
assert ok is True
def test_all_allowed_types(self):
"""Every type in the allowlist should pass."""
for node_type in ALLOWED_NODE_TYPES:
doc = {"root": {"type": "root", "children": [
{"type": node_type, "children": []}
]}}
ok, reason = validate_lexical(doc)
assert ok is True, f"{node_type} should be allowed but got: {reason}"
def test_allowed_types_count(self):
"""Sanity: at least 30 types in the allowlist."""
assert len(ALLOWED_NODE_TYPES) >= 30

View File

@@ -0,0 +1,50 @@
"""Unit tests for blog slugify utility."""
from __future__ import annotations
import pytest
from blog.bp.post.services.markets import slugify
class TestSlugify:
def test_basic_ascii(self):
assert slugify("Hello World") == "hello-world"
def test_unicode_stripped(self):
assert slugify("café") == "cafe"
def test_slashes_to_dashes(self):
assert slugify("foo/bar") == "foo-bar"
def test_special_chars(self):
assert slugify("foo!!bar") == "foo-bar"
def test_multiple_dashes_collapsed(self):
assert slugify("foo---bar") == "foo-bar"
def test_leading_trailing_dashes_stripped(self):
assert slugify("--foo--") == "foo"
def test_empty_string_fallback(self):
assert slugify("") == "market"
def test_none_fallback(self):
assert slugify(None) == "market"
def test_max_len_truncation(self):
result = slugify("a" * 300, max_len=10)
assert len(result) <= 10
def test_truncation_no_trailing_dash(self):
# "abcde-fgh" truncated to 5 should not end with dash
result = slugify("abcde fgh", max_len=5)
assert not result.endswith("-")
def test_already_clean(self):
assert slugify("hello-world") == "hello-world"
def test_numbers_preserved(self):
assert slugify("item-42") == "item-42"
def test_accented_characters(self):
assert slugify("über straße") == "uber-strae"

View File

@@ -8,11 +8,13 @@ from __future__ import annotations
import os
from typing import Any
from markupsafe import escape
from shared.sexp.jinja_bridge import render, load_service_components
from shared.sexp.helpers import (
call_url, root_header_html, search_desktop_html,
search_mobile_html, full_page, oob_page,
call_url, root_header_html, post_admin_header_html,
post_header_html as _shared_post_header_html,
search_desktop_html, search_mobile_html, full_page, oob_page,
)
from shared.infrastructure.urls import market_product_url, cart_url
@@ -24,6 +26,48 @@ load_service_components(os.path.dirname(os.path.dirname(__file__)))
# Header helpers
# ---------------------------------------------------------------------------
def _ensure_post_ctx(ctx: dict, page_post: Any) -> dict:
"""Ensure ctx has a 'post' dict from page_post DTO (for shared post_header_html)."""
if ctx.get("post") or not page_post:
return ctx
ctx = {**ctx, "post": {
"id": getattr(page_post, "id", None),
"slug": getattr(page_post, "slug", ""),
"title": getattr(page_post, "title", ""),
"feature_image": getattr(page_post, "feature_image", None),
}}
return ctx
async def _ensure_container_nav(ctx: dict) -> dict:
"""Fetch container_nav_html if not already present (for post header row)."""
if ctx.get("container_nav_html"):
return ctx
post = ctx.get("post") or {}
post_id = post.get("id")
slug = post.get("slug", "")
if not post_id:
return ctx
from shared.infrastructure.fragments import fetch_fragments
nav_params = {
"container_type": "page",
"container_id": str(post_id),
"post_slug": slug,
}
events_nav, market_nav = await fetch_fragments([
("events", "container-nav", nav_params),
("market", "container-nav", nav_params),
], required=False)
return {**ctx, "container_nav_html": events_nav + market_nav}
async def _post_header_html(ctx: dict, page_post: Any, *, oob: bool = False) -> str:
"""Build post-level header row from page_post DTO, using shared helper."""
ctx = _ensure_post_ctx(ctx, page_post)
ctx = await _ensure_container_nav(ctx)
return _shared_post_header_html(ctx, oob=oob)
def _cart_header_html(ctx: dict, *, oob: bool = False) -> str:
"""Build the cart section header row."""
return render(
@@ -722,23 +766,12 @@ async def render_checkout_error_page(ctx: dict, error: str | None = None, order:
# Page admin (/<page_slug>/admin/)
# ---------------------------------------------------------------------------
def _cart_page_admin_header_html(ctx: dict, page_post: Any, *, oob: bool = False) -> str:
"""Build the page-level admin header row."""
from quart import url_for
link_href = url_for("page_admin.admin")
return render("menu-row", id="page-admin-row", level=2, colour="sky",
link_href=link_href, link_label="admin", icon="fa fa-cog",
child_id="page-admin-header-child", oob=oob)
def _cart_payments_header_html(ctx: dict, *, oob: bool = False) -> str:
"""Build the payments section header row."""
from quart import url_for
link_href = url_for("page_admin.payments")
return render("menu-row", id="payments-row", level=3, colour="sky",
link_href=link_href, link_label="Payments",
icon="fa fa-credit-card",
child_id="payments-header-child", oob=oob)
def _cart_page_admin_header_html(ctx: dict, page_post: Any, *, oob: bool = False,
selected: str = "") -> str:
"""Build the page-level admin header row — delegates to shared helper."""
slug = page_post.slug if page_post else ""
ctx = _ensure_post_ctx(ctx, page_post)
return post_admin_header_html(ctx, slug, oob=oob, selected=selected)
def _cart_admin_main_panel_html(ctx: dict) -> str:
@@ -783,24 +816,16 @@ def _cart_payments_main_panel_html(ctx: dict) -> str:
async def render_cart_admin_page(ctx: dict, page_post: Any) -> str:
"""Full page: cart page admin overview."""
content = _cart_admin_main_panel_html(ctx)
hdr = root_header_html(ctx)
child = _page_cart_header_html(ctx, page_post) + _cart_page_admin_header_html(ctx, page_post)
hdr += render("cart-header-child-nested",
outer_html=_cart_header_html(ctx), inner_html=child)
return full_page(ctx, header_rows_html=hdr, content_html=content)
root_hdr = root_header_html(ctx)
post_hdr = await _post_header_html(ctx, page_post)
admin_hdr = _cart_page_admin_header_html(ctx, page_post)
return full_page(ctx, header_rows_html=root_hdr + post_hdr + admin_hdr, content_html=content)
async def render_cart_admin_oob(ctx: dict, page_post: Any) -> str:
"""OOB response: cart page admin overview."""
content = _cart_admin_main_panel_html(ctx)
oobs = (
_cart_page_admin_header_html(ctx, page_post, oob=True)
+ render("cart-header-child-oob",
inner_html=_page_cart_header_html(ctx, page_post)
+ _cart_page_admin_header_html(ctx, page_post))
+ _cart_header_html(ctx, oob=True)
+ root_header_html(ctx, oob=True)
)
oobs = _cart_page_admin_header_html(ctx, page_post, oob=True)
return oob_page(ctx, oobs_html=oobs, content_html=content)
@@ -811,28 +836,16 @@ async def render_cart_admin_oob(ctx: dict, page_post: Any) -> str:
async def render_cart_payments_page(ctx: dict, page_post: Any) -> str:
"""Full page: payments config."""
content = _cart_payments_main_panel_html(ctx)
hdr = root_header_html(ctx)
admin_hdr = _cart_page_admin_header_html(ctx, page_post)
payments_hdr = _cart_payments_header_html(ctx)
child = _page_cart_header_html(ctx, page_post) + admin_hdr + payments_hdr
hdr += render("cart-header-child-nested",
outer_html=_cart_header_html(ctx), inner_html=child)
return full_page(ctx, header_rows_html=hdr, content_html=content)
root_hdr = root_header_html(ctx)
post_hdr = await _post_header_html(ctx, page_post)
admin_hdr = _cart_page_admin_header_html(ctx, page_post, selected="payments")
return full_page(ctx, header_rows_html=root_hdr + post_hdr + admin_hdr, content_html=content)
async def render_cart_payments_oob(ctx: dict, page_post: Any) -> str:
"""OOB response: payments config."""
content = _cart_payments_main_panel_html(ctx)
admin_hdr = _cart_page_admin_header_html(ctx, page_post)
payments_hdr = _cart_payments_header_html(ctx)
oobs = (
_cart_payments_header_html(ctx, oob=True)
+ render("cart-header-child-oob",
inner_html=_page_cart_header_html(ctx, page_post)
+ admin_hdr + payments_hdr)
+ _cart_header_html(ctx, oob=True)
+ root_header_html(ctx, oob=True)
)
oobs = _cart_page_admin_header_html(ctx, page_post, oob=True, selected="payments")
return oob_page(ctx, oobs_html=oobs, content_html=content)

0
cart/tests/__init__.py Normal file
View File

View File

@@ -0,0 +1,59 @@
"""Unit tests for calendar/ticket total functions."""
from __future__ import annotations
from decimal import Decimal
from types import SimpleNamespace
import pytest
from cart.bp.cart.services.calendar_cart import calendar_total, ticket_total
def _entry(cost):
return SimpleNamespace(cost=cost)
def _ticket(price):
return SimpleNamespace(price=price)
class TestCalendarTotal:
def test_empty(self):
assert calendar_total([]) == 0
def test_single_entry(self):
assert calendar_total([_entry(25.0)]) == Decimal("25.0")
def test_none_cost_excluded(self):
result = calendar_total([_entry(None)])
assert result == 0
def test_zero_cost(self):
# cost=0 is falsy, so it produces Decimal(0) via the else branch
result = calendar_total([_entry(0)])
assert result == Decimal("0")
def test_multiple(self):
result = calendar_total([_entry(10.0), _entry(20.0)])
assert result == Decimal("30.0")
class TestTicketTotal:
def test_empty(self):
assert ticket_total([]) == Decimal("0")
def test_single_ticket(self):
assert ticket_total([_ticket(15.0)]) == Decimal("15.0")
def test_none_price_treated_as_zero(self):
# ticket_total includes all tickets, None → Decimal(0)
result = ticket_total([_ticket(None)])
assert result == Decimal("0")
def test_multiple(self):
result = ticket_total([_ticket(5.0), _ticket(10.0)])
assert result == Decimal("15.0")
def test_mixed_with_none(self):
result = ticket_total([_ticket(10.0), _ticket(None), _ticket(5.0)])
assert result == Decimal("15.0")

139
cart/tests/test_checkout.py Normal file
View File

@@ -0,0 +1,139 @@
"""Unit tests for cart checkout helpers."""
from __future__ import annotations
from types import SimpleNamespace
from unittest.mock import patch
import pytest
from cart.bp.cart.services.checkout import (
build_sumup_description,
build_sumup_reference,
build_webhook_url,
validate_webhook_secret,
)
def _ci(title=None, qty=1):
return SimpleNamespace(product_title=title, quantity=qty)
# ---------------------------------------------------------------------------
# build_sumup_description
# ---------------------------------------------------------------------------
class TestBuildSumupDescription:
def test_empty_cart_no_tickets(self):
result = build_sumup_description([], 1)
assert "Order 1" in result
assert "0 items" in result
assert "order items" in result
def test_single_item(self):
result = build_sumup_description([_ci("Widget", 1)], 42)
assert "Order 42" in result
assert "1 item)" in result
assert "Widget" in result
def test_quantity_counted(self):
result = build_sumup_description([_ci("Widget", 3)], 1)
assert "3 items" in result
def test_three_titles(self):
items = [_ci("A"), _ci("B"), _ci("C")]
result = build_sumup_description(items, 1)
assert "A, B, C" in result
assert "more" not in result
def test_four_titles_truncated(self):
items = [_ci("A"), _ci("B"), _ci("C"), _ci("D")]
result = build_sumup_description(items, 1)
assert "A, B, C" in result
assert "+ 1 more" in result
def test_none_titles_excluded(self):
items = [_ci(None), _ci("Visible")]
result = build_sumup_description(items, 1)
assert "Visible" in result
def test_tickets_singular(self):
result = build_sumup_description([], 1, ticket_count=1)
assert "1 ticket" in result
assert "tickets" not in result
def test_tickets_plural(self):
result = build_sumup_description([], 1, ticket_count=3)
assert "3 tickets" in result
def test_mixed_items_and_tickets(self):
result = build_sumup_description([_ci("A", 2)], 1, ticket_count=1)
assert "3 items" in result # 2 + 1
# ---------------------------------------------------------------------------
# build_sumup_reference
# ---------------------------------------------------------------------------
class TestBuildSumupReference:
def test_with_page_config(self):
pc = SimpleNamespace(sumup_checkout_prefix="SHOP-")
assert build_sumup_reference(42, pc) == "SHOP-42"
def test_without_page_config(self):
with patch("cart.bp.cart.services.checkout.config",
return_value={"sumup": {"checkout_reference_prefix": "RA-"}}):
assert build_sumup_reference(99) == "RA-99"
def test_no_prefix(self):
with patch("cart.bp.cart.services.checkout.config",
return_value={"sumup": {}}):
assert build_sumup_reference(1) == "1"
# ---------------------------------------------------------------------------
# build_webhook_url
# ---------------------------------------------------------------------------
class TestBuildWebhookUrl:
def test_no_secret(self):
with patch("cart.bp.cart.services.checkout.config",
return_value={"sumup": {}}):
assert build_webhook_url("https://x.com/hook") == "https://x.com/hook"
def test_with_secret_no_query(self):
with patch("cart.bp.cart.services.checkout.config",
return_value={"sumup": {"webhook_secret": "s3cret"}}):
result = build_webhook_url("https://x.com/hook")
assert "?token=s3cret" in result
def test_with_secret_existing_query(self):
with patch("cart.bp.cart.services.checkout.config",
return_value={"sumup": {"webhook_secret": "s3cret"}}):
result = build_webhook_url("https://x.com/hook?a=1")
assert "&token=s3cret" in result
# ---------------------------------------------------------------------------
# validate_webhook_secret
# ---------------------------------------------------------------------------
class TestValidateWebhookSecret:
def test_no_secret_configured(self):
with patch("cart.bp.cart.services.checkout.config",
return_value={"sumup": {}}):
assert validate_webhook_secret(None) is True
def test_correct_token(self):
with patch("cart.bp.cart.services.checkout.config",
return_value={"sumup": {"webhook_secret": "abc"}}):
assert validate_webhook_secret("abc") is True
def test_wrong_token(self):
with patch("cart.bp.cart.services.checkout.config",
return_value={"sumup": {"webhook_secret": "abc"}}):
assert validate_webhook_secret("wrong") is False
def test_none_token_with_secret(self):
with patch("cart.bp.cart.services.checkout.config",
return_value={"sumup": {"webhook_secret": "abc"}}):
assert validate_webhook_secret(None) is False

View File

@@ -0,0 +1,77 @@
"""Unit tests for ticket grouping logic."""
from __future__ import annotations
from types import SimpleNamespace
from datetime import datetime
import pytest
from cart.bp.cart.services.ticket_groups import group_tickets
def _ticket(entry_id=1, entry_name="Event", ticket_type_id=None,
ticket_type_name=None, price=10.0,
entry_start_at=None, entry_end_at=None):
return SimpleNamespace(
entry_id=entry_id,
entry_name=entry_name,
entry_start_at=entry_start_at or datetime(2025, 6, 1, 10, 0),
entry_end_at=entry_end_at,
ticket_type_id=ticket_type_id,
ticket_type_name=ticket_type_name,
price=price,
)
class TestGroupTickets:
def test_empty(self):
assert group_tickets([]) == []
def test_single_ticket(self):
result = group_tickets([_ticket()])
assert len(result) == 1
assert result[0]["quantity"] == 1
assert result[0]["line_total"] == 10.0
def test_same_group_merged(self):
tickets = [_ticket(entry_id=1), _ticket(entry_id=1)]
result = group_tickets(tickets)
assert len(result) == 1
assert result[0]["quantity"] == 2
assert result[0]["line_total"] == 20.0
def test_different_entries_separate(self):
tickets = [_ticket(entry_id=1), _ticket(entry_id=2)]
result = group_tickets(tickets)
assert len(result) == 2
def test_different_ticket_types_separate(self):
tickets = [
_ticket(entry_id=1, ticket_type_id=1, ticket_type_name="Adult"),
_ticket(entry_id=1, ticket_type_id=2, ticket_type_name="Child"),
]
result = group_tickets(tickets)
assert len(result) == 2
def test_none_price(self):
result = group_tickets([_ticket(price=None)])
assert result[0]["line_total"] == 0.0
def test_ordering_preserved(self):
tickets = [
_ticket(entry_id=2, entry_name="Second"),
_ticket(entry_id=1, entry_name="First"),
]
result = group_tickets(tickets)
assert result[0]["entry_name"] == "Second"
assert result[1]["entry_name"] == "First"
def test_metadata_from_first_ticket(self):
tickets = [
_ticket(entry_id=1, entry_name="A", price=5.0),
_ticket(entry_id=1, entry_name="B", price=10.0),
]
result = group_tickets(tickets)
assert result[0]["entry_name"] == "A" # from first
assert result[0]["price"] == 5.0 # from first
assert result[0]["line_total"] == 15.0 # accumulated

47
cart/tests/test_total.py Normal file
View File

@@ -0,0 +1,47 @@
"""Unit tests for cart total calculations."""
from __future__ import annotations
from decimal import Decimal
from types import SimpleNamespace
import pytest
from cart.bp.cart.services.total import total
def _item(special=None, regular=None, qty=1):
return SimpleNamespace(
product_special_price=special,
product_regular_price=regular,
quantity=qty,
)
class TestTotal:
def test_empty_cart(self):
assert total([]) == 0
def test_regular_price_only(self):
result = total([_item(regular=10.0, qty=2)])
assert result == Decimal("20.0")
def test_special_price_preferred(self):
result = total([_item(special=5.0, regular=10.0, qty=1)])
assert result == Decimal("5.0")
def test_none_prices_excluded(self):
result = total([_item(special=None, regular=None, qty=1)])
assert result == 0
def test_mixed_items(self):
items = [
_item(special=5.0, qty=2), # 10
_item(regular=3.0, qty=3), # 9
_item(), # excluded
]
result = total(items)
assert result == Decimal("19.0")
def test_quantity_multiplication(self):
result = total([_item(regular=7.5, qty=4)])
assert result == Decimal("30.0")

View File

@@ -367,25 +367,16 @@ services:
- ./test/runner.py:/app/runner.py
- ./test/path_setup.py:/app/path_setup.py
- ./test/entrypoint.sh:/usr/local/bin/entrypoint.sh
# sibling models
- ./blog/__init__.py:/app/blog/__init__.py:ro
- ./blog/models:/app/blog/models:ro
- ./market/__init__.py:/app/market/__init__.py:ro
- ./market/models:/app/market/models:ro
- ./cart/__init__.py:/app/cart/__init__.py:ro
- ./cart/models:/app/cart/models:ro
- ./events/__init__.py:/app/events/__init__.py:ro
- ./events/models:/app/events/models:ro
- ./federation/__init__.py:/app/federation/__init__.py:ro
- ./federation/models:/app/federation/models:ro
- ./account/__init__.py:/app/account/__init__.py:ro
- ./account/models:/app/account/models:ro
- ./relations/__init__.py:/app/relations/__init__.py:ro
- ./relations/models:/app/relations/models:ro
- ./likes/__init__.py:/app/likes/__init__.py:ro
- ./likes/models:/app/likes/models:ro
- ./orders/__init__.py:/app/orders/__init__.py:ro
- ./orders/models:/app/orders/models:ro
# sibling service code + tests
- ./blog:/app/blog:ro
- ./market:/app/market:ro
- ./cart:/app/cart:ro
- ./events:/app/events:ro
- ./federation:/app/federation:ro
- ./account:/app/account:ro
- ./relations:/app/relations:ro
- ./likes:/app/likes:ro
- ./orders:/app/orders:ro
test-unit:
build:

View File

@@ -15,6 +15,7 @@ from shared.sexp.jinja_bridge import render, load_service_components
from shared.sexp.helpers import (
call_url, get_asset_url, root_header_html,
post_header_html as _shared_post_header_html,
post_admin_header_html,
oob_header_html,
search_mobile_html, search_desktop_html,
full_page, oob_page,
@@ -35,11 +36,55 @@ _oob_header_html = oob_header_html
# Post header helpers — thin wrapper over shared post_header_html
# ---------------------------------------------------------------------------
def _clear_oob(*ids: str) -> str:
"""Generate OOB swaps to remove orphaned header rows/children."""
return "".join(f'<div id="{i}" hx-swap-oob="outerHTML"></div>' for i in ids)
# All possible header row/child IDs at each depth (deepest first)
_EVENTS_DEEP_IDS = [
"entry-admin-row", "entry-admin-header-child",
"entry-row", "entry-header-child",
"day-admin-row", "day-admin-header-child",
"day-row", "day-header-child",
"calendar-admin-row", "calendar-admin-header-child",
"calendar-row", "calendar-header-child",
"calendars-row", "calendars-header-child",
"post-admin-row", "post-admin-header-child",
]
def _clear_deeper_oob(*keep_ids: str) -> str:
"""Clear all events header rows/children NOT in keep_ids."""
to_clear = [i for i in _EVENTS_DEEP_IDS if i not in keep_ids]
return _clear_oob(*to_clear)
async def _ensure_container_nav(ctx: dict) -> dict:
"""Fetch container_nav_html if not already present (for post header row)."""
if ctx.get("container_nav_html"):
return ctx
post = ctx.get("post") or {}
post_id = post.get("id")
slug = post.get("slug", "")
if not post_id:
return ctx
from shared.infrastructure.fragments import fetch_fragments
nav_params = {
"container_type": "page",
"container_id": str(post_id),
"post_slug": slug,
}
events_nav, market_nav = await fetch_fragments([
("events", "container-nav", nav_params),
("market", "container-nav", nav_params),
], required=False)
return {**ctx, "container_nav_html": events_nav + market_nav}
def _post_header_html(ctx: dict, *, oob: bool = False) -> str:
"""Build the post-level header row (events-specific: calendar links + admin cog)."""
admin_nav = _post_nav_html(ctx)
effective_ctx = {**ctx, "post_admin_nav_html": admin_nav} if admin_nav else ctx
return _shared_post_header_html(effective_ctx, oob=oob)
"""Build the post-level header row — delegates to shared helper."""
return _shared_post_header_html(ctx, oob=oob)
def _post_nav_html(ctx: dict) -> str:
@@ -1236,6 +1281,7 @@ async def render_page_summary_oob(ctx: dict, entries, has_more, pending_tickets,
)
oobs = _post_header_html(ctx, oob=True)
oobs += _clear_deeper_oob("post-row", "post-header-child")
return oob_page(ctx, oobs_html=oobs, content_html=content)
@@ -1264,18 +1310,22 @@ async def render_page_summary_cards(entries, has_more, pending_tickets,
async def render_calendars_page(ctx: dict) -> str:
"""Full page: calendars listing."""
content = _calendars_main_panel_html(ctx)
hdr = root_header_html(ctx)
child = _post_header_html(ctx) + _calendars_header_html(ctx)
hdr += render("header-child", inner_html=child)
return full_page(ctx, header_rows_html=hdr, content_html=content)
ctx = await _ensure_container_nav(ctx)
slug = (ctx.get("post") or {}).get("slug", "")
root_hdr = root_header_html(ctx)
post_hdr = _post_header_html(ctx)
admin_hdr = post_admin_header_html(ctx, slug, selected="calendars")
return full_page(ctx, header_rows_html=root_hdr + post_hdr + admin_hdr, content_html=content)
async def render_calendars_oob(ctx: dict) -> str:
"""OOB response: calendars listing."""
content = _calendars_main_panel_html(ctx)
oobs = _post_header_html(ctx, oob=True)
oobs += _oob_header_html("post-header-child", "calendars-header-child",
_calendars_header_html(ctx))
ctx = await _ensure_container_nav(ctx)
slug = (ctx.get("post") or {}).get("slug", "")
oobs = post_admin_header_html(ctx, slug, oob=True, selected="calendars")
oobs += _clear_deeper_oob("post-row", "post-header-child",
"post-admin-row", "post-admin-header-child")
return oob_page(ctx, oobs_html=oobs, content_html=content)
@@ -1298,6 +1348,8 @@ async def render_calendar_oob(ctx: dict) -> str:
oobs = _post_header_html(ctx, oob=True)
oobs += _oob_header_html("post-header-child", "calendar-header-child",
_calendar_header_html(ctx))
oobs += _clear_deeper_oob("post-row", "post-header-child",
"calendar-row", "calendar-header-child")
return oob_page(ctx, oobs_html=oobs, content_html=content)
@@ -1321,6 +1373,9 @@ async def render_day_oob(ctx: dict) -> str:
oobs = _calendar_header_html(ctx, oob=True)
oobs += _oob_header_html("calendar-header-child", "day-header-child",
_day_header_html(ctx))
oobs += _clear_deeper_oob("post-row", "post-header-child",
"calendar-row", "calendar-header-child",
"day-row", "day-header-child")
return oob_page(ctx, oobs_html=oobs, content_html=content)
@@ -1331,20 +1386,31 @@ async def render_day_oob(ctx: dict) -> str:
async def render_day_admin_page(ctx: dict) -> str:
"""Full page: day admin."""
content = _day_admin_main_panel_html(ctx)
hdr = root_header_html(ctx)
child = (_post_header_html(ctx)
+ _calendar_header_html(ctx) + _day_header_html(ctx)
ctx = await _ensure_container_nav(ctx)
slug = (ctx.get("post") or {}).get("slug", "")
root_hdr = root_header_html(ctx)
post_hdr = _post_header_html(ctx)
admin_hdr = post_admin_header_html(ctx, slug, selected="calendars")
child = (admin_hdr + _calendar_header_html(ctx) + _day_header_html(ctx)
+ _day_admin_header_html(ctx))
hdr += render("header-child", inner_html=child)
hdr = root_hdr + post_hdr + render("header-child", inner_html=child)
return full_page(ctx, header_rows_html=hdr, content_html=content)
async def render_day_admin_oob(ctx: dict) -> str:
"""OOB response: day admin."""
content = _day_admin_main_panel_html(ctx)
oobs = _calendar_header_html(ctx, oob=True)
ctx = await _ensure_container_nav(ctx)
slug = (ctx.get("post") or {}).get("slug", "")
oobs = (post_admin_header_html(ctx, slug, oob=True, selected="calendars")
+ _calendar_header_html(ctx, oob=True))
oobs += _oob_header_html("day-header-child", "day-admin-header-child",
_day_admin_header_html(ctx))
oobs += _clear_deeper_oob("post-row", "post-header-child",
"post-admin-row", "post-admin-header-child",
"calendar-row", "calendar-header-child",
"day-row", "day-header-child",
"day-admin-row", "day-admin-header-child")
return oob_page(ctx, oobs_html=oobs, content_html=content)
@@ -1352,22 +1418,39 @@ async def render_day_admin_oob(ctx: dict) -> str:
# Calendar admin
# ---------------------------------------------------------------------------
def _events_post_admin_header_html(ctx: dict, *, oob: bool = False,
selected: str = "") -> str:
"""Post-level admin row for events — delegates to shared helper."""
slug = (ctx.get("post") or {}).get("slug", "")
return post_admin_header_html(ctx, slug, oob=oob, selected=selected)
async def render_calendar_admin_page(ctx: dict) -> str:
"""Full page: calendar admin."""
content = _calendar_admin_main_panel_html(ctx)
hdr = root_header_html(ctx)
child = (_post_header_html(ctx)
+ _calendar_header_html(ctx) + _calendar_admin_header_html(ctx))
hdr += render("header-child", inner_html=child)
ctx = await _ensure_container_nav(ctx)
slug = (ctx.get("post") or {}).get("slug", "")
root_hdr = root_header_html(ctx)
post_hdr = _post_header_html(ctx)
admin_hdr = post_admin_header_html(ctx, slug, selected="calendars")
child = admin_hdr + _calendar_header_html(ctx) + _calendar_admin_header_html(ctx)
hdr = root_hdr + post_hdr + render("header-child", inner_html=child)
return full_page(ctx, header_rows_html=hdr, content_html=content)
async def render_calendar_admin_oob(ctx: dict) -> str:
"""OOB response: calendar admin."""
content = _calendar_admin_main_panel_html(ctx)
oobs = _calendar_header_html(ctx, oob=True)
ctx = await _ensure_container_nav(ctx)
slug = (ctx.get("post") or {}).get("slug", "")
oobs = (post_admin_header_html(ctx, slug, oob=True, selected="calendars")
+ _calendar_header_html(ctx, oob=True))
oobs += _oob_header_html("calendar-header-child", "calendar-admin-header-child",
_calendar_admin_header_html(ctx))
oobs += _clear_deeper_oob("post-row", "post-header-child",
"post-admin-row", "post-admin-header-child",
"calendar-row", "calendar-header-child",
"calendar-admin-row", "calendar-admin-header-child")
return oob_page(ctx, oobs_html=oobs, content_html=content)
@@ -1381,10 +1464,13 @@ async def render_slots_page(ctx: dict) -> str:
slots = ctx.get("slots") or []
calendar = ctx.get("calendar")
content = render_slots_table(slots, calendar)
hdr = root_header_html(ctx)
child = (_post_header_html(ctx)
+ _calendar_header_html(ctx) + _calendar_admin_header_html(ctx))
hdr += render("header-child", inner_html=child)
ctx = await _ensure_container_nav(ctx)
slug = (ctx.get("post") or {}).get("slug", "")
root_hdr = root_header_html(ctx)
post_hdr = _post_header_html(ctx)
admin_hdr = post_admin_header_html(ctx, slug, selected="calendars")
child = admin_hdr + _calendar_header_html(ctx) + _calendar_admin_header_html(ctx)
hdr = root_hdr + post_hdr + render("header-child", inner_html=child)
return full_page(ctx, header_rows_html=hdr, content_html=content)
@@ -1393,7 +1479,14 @@ async def render_slots_oob(ctx: dict) -> str:
slots = ctx.get("slots") or []
calendar = ctx.get("calendar")
content = render_slots_table(slots, calendar)
oobs = _calendar_admin_header_html(ctx, oob=True)
ctx = await _ensure_container_nav(ctx)
slug = (ctx.get("post") or {}).get("slug", "")
oobs = (post_admin_header_html(ctx, slug, oob=True, selected="calendars")
+ _calendar_admin_header_html(ctx, oob=True))
oobs += _clear_deeper_oob("post-row", "post-header-child",
"post-admin-row", "post-admin-header-child",
"calendar-row", "calendar-header-child",
"calendar-admin-row", "calendar-admin-header-child")
return oob_page(ctx, oobs_html=oobs, content_html=content)
@@ -1848,6 +1941,10 @@ async def render_entry_oob(ctx: dict) -> str:
oobs = _day_header_html(ctx, oob=True)
oobs += _oob_header_html("day-header-child", "entry-header-child",
_entry_header_html(ctx))
oobs += _clear_deeper_oob("post-row", "post-header-child",
"calendar-row", "calendar-header-child",
"day-row", "day-header-child",
"entry-row", "entry-header-child")
nav_html = _entry_nav_html(ctx)
return oob_page(ctx, oobs_html=oobs, content_html=content, menu_html=nav_html)
@@ -2846,11 +2943,14 @@ def _entry_admin_main_panel_html(ctx: dict) -> str:
async def render_entry_admin_page(ctx: dict) -> str:
"""Full page: entry admin."""
content = _entry_admin_main_panel_html(ctx)
hdr = root_header_html(ctx)
child = (_post_header_html(ctx)
+ _calendar_header_html(ctx) + _day_header_html(ctx)
ctx = await _ensure_container_nav(ctx)
slug = (ctx.get("post") or {}).get("slug", "")
root_hdr = root_header_html(ctx)
post_hdr = _post_header_html(ctx)
admin_hdr = post_admin_header_html(ctx, slug, selected="calendars")
child = (admin_hdr + _calendar_header_html(ctx) + _day_header_html(ctx)
+ _entry_header_html(ctx) + _entry_admin_header_html(ctx))
hdr += render("header-child", inner_html=child)
hdr = root_hdr + post_hdr + render("header-child", inner_html=child)
nav_html = render("events-admin-placeholder-nav")
return full_page(ctx, header_rows_html=hdr, content_html=content, menu_html=nav_html)
@@ -2858,9 +2958,18 @@ async def render_entry_admin_page(ctx: dict) -> str:
async def render_entry_admin_oob(ctx: dict) -> str:
"""OOB response: entry admin."""
content = _entry_admin_main_panel_html(ctx)
oobs = _entry_header_html(ctx, oob=True)
ctx = await _ensure_container_nav(ctx)
slug = (ctx.get("post") or {}).get("slug", "")
oobs = (post_admin_header_html(ctx, slug, oob=True, selected="calendars")
+ _entry_header_html(ctx, oob=True))
oobs += _oob_header_html("entry-header-child", "entry-admin-header-child",
_entry_admin_header_html(ctx))
oobs += _clear_deeper_oob("post-row", "post-header-child",
"post-admin-row", "post-admin-header-child",
"calendar-row", "calendar-header-child",
"day-row", "day-header-child",
"entry-row", "entry-header-child",
"entry-admin-row", "entry-admin-header-child")
nav_html = render("events-admin-placeholder-nav")
return oob_page(ctx, oobs_html=oobs, content_html=content, menu_html=nav_html)
@@ -2905,11 +3014,14 @@ async def render_slot_page(ctx: dict) -> str:
if not slot or not calendar:
return ""
content = render_slot_main_panel(slot, calendar)
hdr = root_header_html(ctx)
child = (_post_header_html(ctx)
+ _calendar_header_html(ctx) + _calendar_admin_header_html(ctx)
ctx = await _ensure_container_nav(ctx)
slug = (ctx.get("post") or {}).get("slug", "")
root_hdr = root_header_html(ctx)
post_hdr = _post_header_html(ctx)
admin_hdr = post_admin_header_html(ctx, slug, selected="calendars")
child = (admin_hdr + _calendar_header_html(ctx) + _calendar_admin_header_html(ctx)
+ _slot_header_html(ctx))
hdr += render("header-child", inner_html=child)
hdr = root_hdr + post_hdr + render("header-child", inner_html=child)
return full_page(ctx, header_rows_html=hdr, content_html=content)
@@ -2920,9 +3032,17 @@ async def render_slot_oob(ctx: dict) -> str:
if not slot or not calendar:
return ""
content = render_slot_main_panel(slot, calendar)
oobs = _calendar_admin_header_html(ctx, oob=True)
ctx = await _ensure_container_nav(ctx)
slug = (ctx.get("post") or {}).get("slug", "")
oobs = (post_admin_header_html(ctx, slug, oob=True, selected="calendars")
+ _calendar_admin_header_html(ctx, oob=True))
oobs += _oob_header_html("calendar-admin-header-child", "slot-header-child",
_slot_header_html(ctx))
oobs += _clear_deeper_oob("post-row", "post-header-child",
"post-admin-row", "post-admin-header-child",
"calendar-row", "calendar-header-child",
"calendar-admin-row", "calendar-admin-header-child",
"slot-row", "slot-header-child")
return oob_page(ctx, oobs_html=oobs, content_html=content)

0
events/tests/__init__.py Normal file
View File

30
events/tests/conftest.py Normal file
View File

@@ -0,0 +1,30 @@
"""Events test fixtures — direct module loading to avoid bp __init__ chains."""
from __future__ import annotations
import importlib.util
import sys
def _load(name: str, path: str):
"""Import a .py file directly, bypassing package __init__ chains."""
if name in sys.modules:
return sys.modules[name]
spec = importlib.util.spec_from_file_location(name, path)
mod = importlib.util.module_from_spec(spec)
sys.modules[name] = mod
spec.loader.exec_module(mod)
return mod
# Ensure events/models is importable as 'models'
sys.path.insert(0, "/app/events")
# Pre-load target modules that would fail via normal package import
_load("events.bp.calendar.services.calendar_view",
"/app/events/bp/calendar/services/calendar_view.py")
_load("events.bp.calendar.services.slots",
"/app/events/bp/calendar/services/slots.py")
_load("events.bp.calendars.services.calendars",
"/app/events/bp/calendars/services/calendars.py")
_load("events.bp.calendar_entries.routes",
"/app/events/bp/calendar_entries/routes.py")

View File

@@ -0,0 +1,82 @@
"""Unit tests for calendar view helpers."""
from __future__ import annotations
from datetime import date
import pytest
from events.bp.calendar.services.calendar_view import add_months, build_calendar_weeks
class TestAddMonths:
def test_same_month(self):
assert add_months(2025, 6, 0) == (2025, 6)
def test_forward_one(self):
assert add_months(2025, 6, 1) == (2025, 7)
def test_december_to_january(self):
assert add_months(2025, 12, 1) == (2026, 1)
def test_january_to_december(self):
assert add_months(2025, 1, -1) == (2024, 12)
def test_multi_year_forward(self):
assert add_months(2025, 1, 24) == (2027, 1)
def test_multi_year_backward(self):
assert add_months(2025, 3, -15) == (2023, 12)
def test_negative_large(self):
y, m = add_months(2025, 6, -30)
assert m >= 1 and m <= 12
assert y == 2022 # 2025-06 minus 30 months = 2022-12
def test_forward_eleven(self):
assert add_months(2025, 1, 11) == (2025, 12)
def test_forward_twelve(self):
assert add_months(2025, 1, 12) == (2026, 1)
class TestBuildCalendarWeeks:
def test_returns_weeks(self):
weeks = build_calendar_weeks(2025, 6)
assert len(weeks) >= 4
assert len(weeks) <= 6
def test_seven_days_per_week(self):
weeks = build_calendar_weeks(2025, 1)
for week in weeks:
assert len(week) == 7
def test_day_structure(self):
weeks = build_calendar_weeks(2025, 6)
day = weeks[0][0]
assert "date" in day
assert "in_month" in day
assert "is_today" in day
assert isinstance(day["date"], date)
def test_in_month_flag(self):
weeks = build_calendar_weeks(2025, 6)
# First day of first week might be in May
june_days = [
d for week in weeks for d in week if d["in_month"]
]
assert len(june_days) == 30 # June has 30 days
def test_february_leap_year(self):
weeks = build_calendar_weeks(2024, 2)
feb_days = [d for week in weeks for d in week if d["in_month"]]
assert len(feb_days) == 29
def test_february_non_leap(self):
weeks = build_calendar_weeks(2025, 2)
feb_days = [d for week in weeks for d in week if d["in_month"]]
assert len(feb_days) == 28
def test_starts_on_monday(self):
weeks = build_calendar_weeks(2025, 6)
# First day of first week should be a Monday
assert weeks[0][0]["date"].weekday() == 0 # Monday

View File

@@ -0,0 +1,57 @@
"""Unit tests for entry cost calculation."""
from __future__ import annotations
from datetime import datetime, time
from decimal import Decimal
from types import SimpleNamespace
import pytest
from events.bp.calendar_entries.routes import calculate_entry_cost
def _slot(cost=None, flexible=False, time_start=None, time_end=None):
return SimpleNamespace(
cost=cost,
flexible=flexible,
time_start=time_start or time(9, 0),
time_end=time_end or time(17, 0),
)
class TestCalculateEntryCost:
def test_no_cost(self):
assert calculate_entry_cost(_slot(cost=None),
datetime(2025, 1, 1, 9), datetime(2025, 1, 1, 17)) == Decimal("0")
def test_fixed_slot(self):
result = calculate_entry_cost(_slot(cost=100, flexible=False),
datetime(2025, 1, 1, 10), datetime(2025, 1, 1, 12))
assert result == Decimal("100")
def test_flexible_full_duration(self):
slot = _slot(cost=80, flexible=True, time_start=time(9, 0), time_end=time(17, 0))
# 8 hours slot, booking full 8 hours
result = calculate_entry_cost(slot,
datetime(2025, 1, 1, 9, 0), datetime(2025, 1, 1, 17, 0))
assert result == Decimal("80")
def test_flexible_half_duration(self):
slot = _slot(cost=80, flexible=True, time_start=time(9, 0), time_end=time(17, 0))
# 4 hours of 8-hour slot = half
result = calculate_entry_cost(slot,
datetime(2025, 1, 1, 9, 0), datetime(2025, 1, 1, 13, 0))
assert result == Decimal("40")
def test_flexible_no_time_end(self):
slot = _slot(cost=50, flexible=True, time_end=None)
result = calculate_entry_cost(slot,
datetime(2025, 1, 1, 9), datetime(2025, 1, 1, 12))
# When time_end is None, function still calculates based on booking duration
assert isinstance(result, Decimal)
def test_flexible_zero_slot_duration(self):
slot = _slot(cost=50, flexible=True, time_start=time(9, 0), time_end=time(9, 0))
result = calculate_entry_cost(slot,
datetime(2025, 1, 1, 9, 0), datetime(2025, 1, 1, 10, 0))
assert result == Decimal("0")

View File

@@ -0,0 +1,59 @@
"""Unit tests for slot helper functions."""
from __future__ import annotations
import pytest
from events.bp.calendar.services.slots import _b
class TestBoolParse:
def test_true_bool(self):
assert _b(True) is True
def test_false_bool(self):
assert _b(False) is False
def test_string_true(self):
assert _b("true") is True
def test_string_True(self):
assert _b("True") is True
def test_string_1(self):
assert _b("1") is True
def test_string_yes(self):
assert _b("yes") is True
def test_string_y(self):
assert _b("y") is True
def test_string_t(self):
assert _b("t") is True
def test_string_on(self):
assert _b("on") is True
def test_string_false(self):
assert _b("false") is False
def test_string_0(self):
assert _b("0") is False
def test_string_no(self):
assert _b("no") is False
def test_string_off(self):
assert _b("off") is False
def test_string_empty(self):
assert _b("") is False
def test_int_1(self):
assert _b(1) is True
def test_int_0(self):
assert _b(0) is False
def test_random_string(self):
assert _b("maybe") is False

View File

@@ -0,0 +1,42 @@
"""Unit tests for events slugify utility."""
from __future__ import annotations
import pytest
from events.bp.calendars.services.calendars import slugify
class TestSlugify:
def test_basic(self):
assert slugify("My Calendar") == "my-calendar"
def test_unicode(self):
assert slugify("café résumé") == "cafe-resume"
def test_slashes(self):
assert slugify("foo/bar") == "foo-bar"
def test_special_chars(self):
assert slugify("hello!!world") == "hello-world"
def test_collapse_dashes(self):
assert slugify("a---b") == "a-b"
def test_strip_dashes(self):
assert slugify("--hello--") == "hello"
def test_empty_fallback(self):
assert slugify("") == "calendar"
def test_none_fallback(self):
assert slugify(None) == "calendar"
def test_max_len(self):
result = slugify("a" * 300, max_len=10)
assert len(result) <= 10
def test_numbers(self):
assert slugify("event-2025") == "event-2025"
def test_already_clean(self):
assert slugify("my-event") == "my-event"

View File

View File

@@ -0,0 +1,137 @@
"""Unit tests for federation DTO conversion functions."""
from __future__ import annotations
from datetime import datetime
from types import SimpleNamespace
from unittest.mock import patch
import pytest
from shared.services.federation_impl import (
_actor_to_dto, _activity_to_dto, _follower_to_dto,
_remote_actor_to_dto, _remote_post_to_dto,
)
def _actor(**kwargs):
defaults = {
"id": 1, "user_id": 10,
"preferred_username": "alice",
"public_key_pem": "-----BEGIN PUBLIC KEY-----",
"display_name": "Alice", "summary": "Hello",
"created_at": datetime(2025, 1, 1),
}
defaults.update(kwargs)
return SimpleNamespace(**defaults)
def _activity(**kwargs):
defaults = {
"id": 1, "activity_id": "https://example.com/activity/1",
"activity_type": "Create", "actor_profile_id": 1,
"object_type": "Note", "object_data": {"content": "hi"},
"published": datetime(2025, 1, 1), "is_local": True,
"source_type": "post", "source_id": 42, "ipfs_cid": None,
}
defaults.update(kwargs)
return SimpleNamespace(**defaults)
def _follower(**kwargs):
defaults = {
"id": 1, "actor_profile_id": 1,
"follower_acct": "bob@remote.com",
"follower_inbox": "https://remote.com/inbox",
"follower_actor_url": "https://remote.com/users/bob",
"created_at": datetime(2025, 1, 1),
"app_domain": "federation.rose-ash.com",
}
defaults.update(kwargs)
return SimpleNamespace(**defaults)
def _remote_actor(**kwargs):
defaults = {
"id": 1, "actor_url": "https://remote.com/users/bob",
"inbox_url": "https://remote.com/inbox",
"preferred_username": "bob", "domain": "remote.com",
"display_name": "Bob", "summary": "Hi",
"icon_url": "https://remote.com/avatar.jpg",
"shared_inbox_url": None, "public_key_pem": None,
}
defaults.update(kwargs)
return SimpleNamespace(**defaults)
def _remote_post(**kwargs):
defaults = {
"id": 1, "remote_actor_id": 1,
"object_id": "https://remote.com/note/1",
"content": "<p>Hello</p>", "summary": None,
"url": "https://remote.com/note/1",
"attachment_data": [{"type": "Image", "url": "img.jpg"}],
"tag_data": [{"type": "Hashtag", "name": "#test"}],
"published": datetime(2025, 1, 1),
}
defaults.update(kwargs)
return SimpleNamespace(**defaults)
class TestActorToDto:
@patch("shared.services.federation_impl._domain", return_value="fed.example.com")
def test_basic(self, mock_domain):
dto = _actor_to_dto(_actor())
assert dto.preferred_username == "alice"
assert dto.inbox_url == "https://fed.example.com/users/alice/inbox"
assert dto.outbox_url == "https://fed.example.com/users/alice/outbox"
assert dto.user_id == 10
assert dto.display_name == "Alice"
class TestActivityToDto:
def test_fields(self):
dto = _activity_to_dto(_activity())
assert dto.activity_type == "Create"
assert dto.object_type == "Note"
assert dto.is_local is True
assert dto.ipfs_cid is None
class TestFollowerToDto:
def test_fields(self):
dto = _follower_to_dto(_follower())
assert dto.follower_acct == "bob@remote.com"
assert dto.app_domain == "federation.rose-ash.com"
class TestRemoteActorToDto:
def test_fields(self):
dto = _remote_actor_to_dto(_remote_actor())
assert dto.preferred_username == "bob"
assert dto.domain == "remote.com"
assert dto.icon_url == "https://remote.com/avatar.jpg"
class TestRemotePostToDto:
def test_with_actor(self):
actor = _remote_actor()
dto = _remote_post_to_dto(_remote_post(), actor=actor)
assert dto.content == "<p>Hello</p>"
assert dto.actor is not None
assert dto.actor.preferred_username == "bob"
def test_without_actor(self):
dto = _remote_post_to_dto(_remote_post(), actor=None)
assert dto.actor is None
def test_none_content_becomes_empty(self):
dto = _remote_post_to_dto(_remote_post(content=None))
assert dto.content == ""
def test_none_attachments_becomes_list(self):
dto = _remote_post_to_dto(_remote_post(attachment_data=None))
assert dto.attachments == []
def test_none_tags_becomes_list(self):
dto = _remote_post_to_dto(_remote_post(tag_data=None))
assert dto.tags == []

View File

@@ -0,0 +1,70 @@
"""Unit tests for federation identity validation."""
from __future__ import annotations
import pytest
from federation.bp.identity.routes import USERNAME_RE, RESERVED
class TestUsernameRegex:
def test_valid_simple(self):
assert USERNAME_RE.match("alice")
def test_valid_with_numbers(self):
assert USERNAME_RE.match("user42")
def test_valid_with_underscore(self):
assert USERNAME_RE.match("alice_bob")
def test_min_length(self):
assert USERNAME_RE.match("abc")
def test_too_short(self):
assert not USERNAME_RE.match("ab")
def test_single_char(self):
assert not USERNAME_RE.match("a")
def test_max_length(self):
assert USERNAME_RE.match("a" * 32)
def test_too_long(self):
assert not USERNAME_RE.match("a" * 33)
def test_starts_with_digit(self):
assert not USERNAME_RE.match("1abc")
def test_starts_with_underscore(self):
assert not USERNAME_RE.match("_abc")
def test_uppercase_rejected(self):
assert not USERNAME_RE.match("Alice")
def test_hyphen_rejected(self):
assert not USERNAME_RE.match("alice-bob")
def test_spaces_rejected(self):
assert not USERNAME_RE.match("alice bob")
def test_empty_rejected(self):
assert not USERNAME_RE.match("")
class TestReservedUsernames:
def test_admin_reserved(self):
assert "admin" in RESERVED
def test_root_reserved(self):
assert "root" in RESERVED
def test_api_reserved(self):
assert "api" in RESERVED
def test_inbox_reserved(self):
assert "inbox" in RESERVED
def test_normal_name_not_reserved(self):
assert "alice" not in RESERVED
def test_at_least_20_reserved(self):
assert len(RESERVED) >= 20

0
likes/tests/__init__.py Normal file
View File

View File

@@ -0,0 +1,76 @@
"""Unit tests for likes data endpoint guard logic.
The actual data handlers require Quart request context and DB,
but we can test the guard validation logic patterns used throughout.
"""
from __future__ import annotations
import pytest
class TestLikedGuardPatterns:
"""Test the validation patterns used in likes data endpoints.
The handlers return early with empty/false results when required
params are missing. These tests verify those conditions.
"""
def test_is_liked_requires_user_id(self):
"""Without user_id, is_liked returns {'liked': False}."""
# Simulates: user_id = None, target_type = "product"
user_id = None
target_type = "product"
if not user_id or not target_type:
result = {"liked": False}
else:
result = {"liked": True} # would check DB
assert result == {"liked": False}
def test_is_liked_requires_target_type(self):
user_id = 1
target_type = ""
if not user_id or not target_type:
result = {"liked": False}
else:
result = {"liked": True}
assert result == {"liked": False}
def test_is_liked_requires_slug_or_id(self):
"""Without target_slug or target_id, returns {'liked': False}."""
target_slug = None
target_id = None
if target_slug is None and target_id is None:
result = {"liked": False}
else:
result = {"liked": True}
assert result == {"liked": False}
def test_liked_slugs_empty_without_user_id(self):
user_id = None
target_type = "product"
if not user_id or not target_type:
result = []
else:
result = ["slug-1"]
assert result == []
def test_liked_ids_empty_without_target_type(self):
user_id = 1
target_type = ""
if not user_id or not target_type:
result = []
else:
result = [1, 2]
assert result == []
def test_all_params_present(self):
user_id = 1
target_type = "product"
target_slug = "my-product"
if not user_id or not target_type:
result = {"liked": False}
elif target_slug is not None:
result = {"liked": True} # would check DB
else:
result = {"liked": False}
assert result == {"liked": True}

View File

@@ -11,7 +11,7 @@ from sqlalchemy import select
from shared.infrastructure.factory import create_base_app
from shared.config import config
from bp import register_market_bp, register_all_markets, register_page_markets, register_fragments, register_actions, register_data
from bp import register_market_bp, register_all_markets, register_page_markets, register_page_admin, register_fragments, register_actions, register_data
async def market_context() -> dict:
@@ -111,6 +111,12 @@ def create_app() -> "Quart":
url_prefix="/<slug>",
)
# Page admin: /<slug>/admin/ — post-level admin for markets
app.register_blueprint(
register_page_admin(),
url_prefix="/<slug>/admin",
)
# Market blueprint nested under post slug: /<page_slug>/<market_slug>/
app.register_blueprint(
register_market_bp(

View File

@@ -2,6 +2,7 @@ from .market.routes import register as register_market_bp
from .product.routes import register as register_product
from .all_markets.routes import register as register_all_markets
from .page_markets.routes import register as register_page_markets
from .page_admin.routes import register as register_page_admin
from .fragments import register_fragments
from .actions import register_actions
from .data import register_data

View File

View File

@@ -0,0 +1,25 @@
from __future__ import annotations
from quart import make_response, Blueprint
from shared.browser.app.authz import require_admin
from shared.browser.app.utils.htmx import is_htmx_request
def register():
bp = Blueprint("page_admin", __name__)
@bp.get("/")
@require_admin
async def admin(**kwargs):
from shared.sexp.page import get_template_context
from sexp.sexp_components import render_page_admin_page, render_page_admin_oob
tctx = await get_template_context()
if not is_htmx_request():
html = await render_page_admin_page(tctx)
else:
html = await render_page_admin_oob(tctx)
return await make_response(html)
return bp

View File

@@ -15,6 +15,7 @@ from shared.sexp.jinja_bridge import render, load_service_components
from shared.sexp.helpers import (
call_url, get_asset_url, root_header_html,
post_header_html as _post_header_html,
post_admin_header_html,
oob_header_html as _oob_header_html,
search_mobile_html, search_desktop_html,
full_page, oob_page,
@@ -24,6 +25,25 @@ from shared.sexp.helpers import (
load_service_components(os.path.dirname(os.path.dirname(__file__)))
# ---------------------------------------------------------------------------
# OOB orphan cleanup
# ---------------------------------------------------------------------------
_MARKET_DEEP_IDS = [
"product-admin-row", "product-admin-header-child",
"product-row", "product-header-child",
"market-admin-row", "market-admin-header-child",
"market-row", "market-header-child",
"post-admin-row", "post-admin-header-child",
]
def _clear_deeper_oob(*keep_ids: str) -> str:
"""Clear all market header rows/children NOT in keep_ids."""
to_clear = [i for i in _MARKET_DEEP_IDS if i not in keep_ids]
return "".join(f'<div id="{i}" hx-swap-oob="outerHTML"></div>' for i in to_clear)
# ---------------------------------------------------------------------------
# Price helpers
# ---------------------------------------------------------------------------
@@ -1284,6 +1304,8 @@ async def render_market_home_oob(ctx: dict) -> str:
oobs = _oob_header_html("post-header-child", "market-header-child",
_market_header_html(ctx))
oobs += _post_header_html(ctx, oob=True)
oobs += _clear_deeper_oob("post-row", "post-header-child",
"market-row", "market-header-child")
menu = _mobile_nav_panel_html(ctx)
return oob_page(ctx, oobs_html=oobs, content_html=content, menu_html=menu)
@@ -1333,6 +1355,8 @@ async def render_browse_oob(ctx: dict) -> str:
oobs = _oob_header_html("post-header-child", "market-header-child",
_market_header_html(ctx))
oobs += _post_header_html(ctx, oob=True)
oobs += _clear_deeper_oob("post-row", "post-header-child",
"market-row", "market-header-child")
menu = _mobile_nav_panel_html(ctx)
filter_html = _mobile_filter_summary_html(ctx)
aside_html = _desktop_filter_html(ctx)
@@ -1368,6 +1392,9 @@ async def render_product_oob(ctx: dict, d: dict) -> str:
oobs = _market_header_html(ctx, oob=True)
oobs += _oob_header_html("market-header-child", "product-header-child",
_product_header_html(ctx, d))
oobs += _clear_deeper_oob("post-row", "post-header-child",
"market-row", "market-header-child",
"product-row", "product-header-child")
menu = _mobile_nav_panel_html(ctx)
return oob_page(ctx, oobs_html=oobs, content_html=content, menu_html=menu)
@@ -1394,6 +1421,10 @@ async def render_product_admin_oob(ctx: dict, d: dict) -> str:
oobs = _product_header_html(ctx, d, oob=True)
oobs += _oob_header_html("product-header-child", "product-admin-header-child",
_product_admin_header_html(ctx, d))
oobs += _clear_deeper_oob("post-row", "post-header-child",
"market-row", "market-header-child",
"product-row", "product-header-child",
"product-admin-row", "product-admin-header-child")
return oob_page(ctx, oobs_html=oobs, content_html=content)
@@ -1420,7 +1451,7 @@ async def render_market_admin_page(ctx: dict) -> str:
content = "market admin"
hdr = root_header_html(ctx)
child = _post_header_html(ctx) + _market_header_html(ctx) + _market_admin_header_html(ctx)
child = _post_header_html(ctx) + _market_header_html(ctx) + _market_admin_header_html(ctx, selected="markets")
hdr += render("header-child", inner_html=child)
return full_page(ctx, header_rows_html=hdr, content_html=content)
@@ -1431,21 +1462,42 @@ async def render_market_admin_oob(ctx: dict) -> str:
oobs = _market_header_html(ctx, oob=True)
oobs += _oob_header_html("market-header-child", "market-admin-header-child",
_market_admin_header_html(ctx))
_market_admin_header_html(ctx, selected="markets"))
oobs += _clear_deeper_oob("post-row", "post-header-child",
"market-row", "market-header-child",
"market-admin-row", "market-admin-header-child")
return oob_page(ctx, oobs_html=oobs, content_html=content)
def _market_admin_header_html(ctx: dict, *, oob: bool = False) -> str:
"""Build market admin header row."""
from quart import url_for
def _market_admin_header_html(ctx: dict, *, oob: bool = False, selected: str = "") -> str:
"""Build market admin header row — delegates to shared helper."""
slug = (ctx.get("post") or {}).get("slug", "")
return post_admin_header_html(ctx, slug, oob=oob, selected=selected)
link_href = url_for("market.admin.admin")
return render(
"menu-row",
id="market-admin-row", level=3,
link_href=link_href, link_label="admin", icon="fa fa-cog",
child_id="market-admin-header-child", oob=oob,
)
# ---------------------------------------------------------------------------
# Page admin (/<slug>/admin/) — post-level admin for markets
# ---------------------------------------------------------------------------
async def render_page_admin_page(ctx: dict) -> str:
"""Full page: page-level market admin."""
slug = (ctx.get("post") or {}).get("slug", "")
admin_hdr = post_admin_header_html(ctx, slug, selected="markets")
hdr = root_header_html(ctx)
child = _post_header_html(ctx) + admin_hdr
hdr += render("header-child", inner_html=child)
content = '<div id="main-panel"><div class="p-4 text-stone-500">Market admin</div></div>'
return full_page(ctx, header_rows_html=hdr, content_html=content)
async def render_page_admin_oob(ctx: dict) -> str:
"""OOB response: page-level market admin."""
slug = (ctx.get("post") or {}).get("slug", "")
oobs = post_admin_header_html(ctx, slug, oob=True, selected="markets")
oobs += _clear_deeper_oob("post-row", "post-header-child",
"post-admin-row", "post-admin-header-child")
content = '<div id="main-panel"><div class="p-4 text-stone-500">Market admin</div></div>'
return oob_page(ctx, oobs_html=oobs, content_html=content)
# ---------------------------------------------------------------------------

0
market/tests/__init__.py Normal file
View File

21
market/tests/conftest.py Normal file
View File

@@ -0,0 +1,21 @@
"""Market test fixtures — direct module loading to avoid bp __init__ chains."""
from __future__ import annotations
import importlib.util
import sys
def _load(name: str, path: str):
"""Import a .py file directly, bypassing package __init__ chains."""
if name in sys.modules:
return sys.modules[name]
spec = importlib.util.spec_from_file_location(name, path)
mod = importlib.util.module_from_spec(spec)
sys.modules[name] = mod
spec.loader.exec_module(mod)
return mod
sys.path.insert(0, "/app/market")
_load("market.scrape.listings", "/app/market/scrape/listings.py")

View File

@@ -0,0 +1,78 @@
"""Unit tests for listings parsing utilities."""
from __future__ import annotations
import pytest
from market.scrape.listings import (
parse_total_pages_from_text,
_first_from_srcset,
_dedupe_preserve_order_by,
_filename_key,
)
class TestParseTotalPages:
def test_standard(self):
assert parse_total_pages_from_text("Showing 36 of 120") == 4
def test_exact_fit(self):
assert parse_total_pages_from_text("Showing 36 of 36") == 1
def test_partial_page(self):
assert parse_total_pages_from_text("Showing 36 of 37") == 2
def test_no_match(self):
assert parse_total_pages_from_text("no data") is None
def test_case_insensitive(self):
# shown=12 is in {12,24,36} so per_page=36, ceil(60/36)=2
assert parse_total_pages_from_text("showing 12 of 60") == 2
def test_small_shown(self):
# shown=10, not in {12,24,36}, so per_page=10
assert parse_total_pages_from_text("Showing 10 of 100") == 10
class TestFirstFromSrcset:
def test_single_entry(self):
assert _first_from_srcset("img.jpg 1x") == "img.jpg"
def test_multiple_entries(self):
assert _first_from_srcset("a.jpg 1x, b.jpg 2x") == "a.jpg"
def test_empty(self):
assert _first_from_srcset("") is None
def test_none(self):
assert _first_from_srcset(None) is None
class TestDedupePreserveOrder:
def test_no_dupes(self):
result = _dedupe_preserve_order_by(["a", "b", "c"], key=str)
assert result == ["a", "b", "c"]
def test_removes_dupes(self):
result = _dedupe_preserve_order_by(["a", "b", "a"], key=str)
assert result == ["a", "b"]
def test_empty_strings_skipped(self):
result = _dedupe_preserve_order_by(["a", "", "b"], key=str)
assert result == ["a", "b"]
def test_preserves_order(self):
result = _dedupe_preserve_order_by(["c", "b", "a", "b"], key=str)
assert result == ["c", "b", "a"]
class TestFilenameKey:
def test_basic(self):
assert _filename_key("https://img.com/photos/pic.jpg") == "img.com:pic.jpg"
def test_trailing_slash(self):
k = _filename_key("https://img.com/photos/")
assert k == "img.com:photos"
def test_case_insensitive(self):
k = _filename_key("https://IMG.COM/PIC.JPG")
assert k == "img.com:pic.jpg"

View File

@@ -0,0 +1,96 @@
"""Unit tests for price parsing utilities."""
from __future__ import annotations
import pytest
from market.scrape.product.helpers.price import parse_price, parse_case_size
class TestParsePrice:
def test_gbp(self):
val, cur, raw = parse_price("£30.50")
assert val == 30.5
assert cur == "GBP"
def test_eur(self):
val, cur, raw = parse_price("€1,234.00")
assert val == 1234.0
assert cur == "EUR"
def test_usd(self):
val, cur, raw = parse_price("$9.99")
assert val == 9.99
assert cur == "USD"
def test_no_symbol(self):
val, cur, raw = parse_price("42.50")
assert val == 42.5
assert cur is None
def test_no_match(self):
val, cur, raw = parse_price("no price here")
assert val is None
assert cur is None
def test_empty_string(self):
val, cur, raw = parse_price("")
assert val is None
assert raw == ""
def test_none_input(self):
val, cur, raw = parse_price(None)
assert val is None
assert raw == ""
def test_thousands_comma_stripped(self):
val, cur, raw = parse_price("£1,000.50")
assert val == 1000.5
def test_whitespace_around_symbol(self):
val, cur, raw = parse_price("£ 5.00")
assert val == 5.0
assert cur == "GBP"
def test_raw_preserved(self):
_, _, raw = parse_price(" £10.00 ")
assert raw == "£10.00"
class TestParseCaseSize:
def test_standard(self):
count, qty, unit, _ = parse_case_size("6 x 500g")
assert count == 6
assert qty == 500.0
assert unit == "g"
def test_no_space(self):
count, qty, unit, _ = parse_case_size("12x1L")
assert count == 12
assert qty == 1.0
assert unit == "L"
def test_multiplication_sign(self):
count, qty, unit, _ = parse_case_size("24 × 330 ml")
assert count == 24
assert qty == 330.0
assert unit == "ml"
def test_uppercase_x(self):
count, qty, unit, _ = parse_case_size("6X500g")
assert count == 6
def test_no_match(self):
count, qty, unit, raw = parse_case_size("just text")
assert count is None
assert qty is None
assert unit is None
def test_empty(self):
count, qty, unit, raw = parse_case_size("")
assert count is None
assert raw == ""
def test_none_input(self):
count, _, _, raw = parse_case_size(None)
assert count is None
assert raw == ""

View File

@@ -0,0 +1,58 @@
"""Unit tests for product registry utilities."""
from __future__ import annotations
import pytest
from market.scrape.product.registry import merge_missing
class TestMergeMissing:
def test_fills_empty_dict(self):
dst = {}
merge_missing(dst, {"a": 1, "b": "hello"})
assert dst == {"a": 1, "b": "hello"}
def test_existing_value_not_overwritten(self):
dst = {"a": "original"}
merge_missing(dst, {"a": "new"})
assert dst["a"] == "original"
def test_none_overwritten(self):
dst = {"a": None}
merge_missing(dst, {"a": "filled"})
assert dst["a"] == "filled"
def test_empty_string_overwritten(self):
dst = {"a": ""}
merge_missing(dst, {"a": "filled"})
assert dst["a"] == "filled"
def test_empty_list_overwritten(self):
dst = {"a": []}
merge_missing(dst, {"a": [1, 2]})
assert dst["a"] == [1, 2]
def test_empty_dict_overwritten(self):
dst = {"a": {}}
merge_missing(dst, {"a": {"key": "val"}})
assert dst["a"] == {"key": "val"}
def test_zero_not_overwritten(self):
dst = {"a": 0}
merge_missing(dst, {"a": 42})
assert dst["a"] == 0
def test_false_not_overwritten(self):
dst = {"a": False}
merge_missing(dst, {"a": True})
assert dst["a"] is False
def test_none_src(self):
dst = {"a": 1}
merge_missing(dst, None)
assert dst == {"a": 1}
def test_new_keys_added(self):
dst = {"a": 1}
merge_missing(dst, {"b": 2})
assert dst == {"a": 1, "b": 2}

View File

@@ -0,0 +1,45 @@
"""Unit tests for market slug helpers."""
from __future__ import annotations
import pytest
from market.bp.browse.services.slugs import (
product_slug_from_href, canonical_html_slug,
)
class TestProductSlugFromHref:
def test_html_extension(self):
result = product_slug_from_href("https://site.com/foo/bar-thing.html")
assert result == "bar-thing-html"
def test_htm_extension(self):
result = product_slug_from_href("https://site.com/products/widget.htm")
assert result == "widget-html"
def test_no_extension(self):
result = product_slug_from_href("https://site.com/item/cool-product")
assert result == "cool-product-html"
def test_empty_path(self):
result = product_slug_from_href("https://site.com/")
assert result == ""
def test_already_has_html_suffix_in_slug(self):
result = product_slug_from_href("https://site.com/prod/item-html.html")
assert result == "item-html"
class TestCanonicalHtmlSlug:
def test_adds_html_suffix(self):
assert canonical_html_slug("product-name") == "product-name-html"
def test_idempotent(self):
assert canonical_html_slug("product-name-html") == "product-name-html"
def test_double_html_kept(self):
# canonical_html_slug only appends -html if not already present
assert canonical_html_slug("product-name-html-html") == "product-name-html-html"
def test_strips_htm(self):
assert canonical_html_slug("product-name-htm") == "product-name-html"

0
orders/tests/__init__.py Normal file
View File

View File

@@ -0,0 +1,32 @@
"""Unit tests for orders sexp component helpers."""
from __future__ import annotations
import pytest
from orders.sexp.sexp_components import _status_pill_cls
class TestStatusPillCls:
def test_paid(self):
result = _status_pill_cls("paid")
assert "emerald" in result
def test_Paid_uppercase(self):
result = _status_pill_cls("Paid")
assert "emerald" in result
def test_failed(self):
result = _status_pill_cls("failed")
assert "rose" in result
def test_cancelled(self):
result = _status_pill_cls("cancelled")
assert "rose" in result
def test_pending(self):
result = _status_pill_cls("pending")
assert "stone" in result
def test_unknown(self):
result = _status_pill_cls("refunded")
assert "stone" in result

View File

View File

@@ -0,0 +1,66 @@
"""Unit tests for relations serialization."""
from __future__ import annotations
from types import SimpleNamespace
import pytest
from relations.bp.data.routes import _serialize_rel
class TestSerializeRel:
def test_all_fields(self):
rel = SimpleNamespace(
id=1,
parent_type="page",
parent_id=100,
child_type="calendar",
child_id=200,
sort_order=0,
label="Main Calendar",
relation_type="container",
metadata_={"key": "val"},
)
result = _serialize_rel(rel)
assert result == {
"id": 1,
"parent_type": "page",
"parent_id": 100,
"child_type": "calendar",
"child_id": 200,
"sort_order": 0,
"label": "Main Calendar",
"relation_type": "container",
"metadata": {"key": "val"},
}
def test_none_optionals(self):
rel = SimpleNamespace(
id=2,
parent_type="post",
parent_id=1,
child_type="market",
child_id=1,
sort_order=1,
label=None,
relation_type=None,
metadata_=None,
)
result = _serialize_rel(rel)
assert result["label"] is None
assert result["relation_type"] is None
assert result["metadata"] is None
def test_dict_structure(self):
rel = SimpleNamespace(
id=3, parent_type="a", parent_id=1,
child_type="b", child_id=2,
sort_order=0, label="", relation_type="link",
metadata_={},
)
result = _serialize_rel(rel)
assert set(result.keys()) == {
"id", "parent_type", "parent_id",
"child_type", "child_id", "sort_order",
"label", "relation_type", "metadata",
}

View File

@@ -14,6 +14,7 @@ from quart import request, g, current_app
from shared.config import config
from shared.utils import host_url
from shared.browser.app.utils import current_route_relative_path
from shared.infrastructure.urls import blog_url, market_url, cart_url, events_url
def _qs_filter_fn():
@@ -102,4 +103,9 @@ async def base_context() -> dict:
"base_title": config()["title"],
"hx_select": hx_select,
"hx_select_search": hx_select_search,
"blog_url": blog_url,
"market_url": market_url,
"cart_url": cart_url,
"events_url": events_url,
"rights": getattr(g, "rights", {}),
}

View File

@@ -125,8 +125,9 @@ def create_base_app(
errors(app)
# Auto-register OAuth client blueprint for non-account apps
# (account is the OAuth authorization server)
if name != "account":
# (account is the OAuth authorization server; test is a public dashboard)
_NO_OAUTH = {"account", "test"}
if name not in _NO_OAUTH:
from shared.infrastructure.oauth import create_oauth_blueprint
app.register_blueprint(create_oauth_blueprint(name))
@@ -173,7 +174,7 @@ def create_base_app(
# Auth state check via grant verification + silent OAuth handshake
# MUST run before _load_user so stale sessions are cleared first
if name != "account":
if name not in _NO_OAUTH:
@app.before_request
async def _check_auth_state():
from quart import session as qs

View File

@@ -8,6 +8,8 @@ from __future__ import annotations
from typing import Any
from markupsafe import escape
from .jinja_bridge import render
from .page import SEARCH_HEADERS_MOBILE, SEARCH_HEADERS_DESKTOP
@@ -31,6 +33,9 @@ def get_asset_url(ctx: dict) -> str:
def root_header_html(ctx: dict, *, oob: bool = False) -> str:
"""Build the root header row HTML."""
rights = ctx.get("rights") or {}
is_admin = rights.get("admin") if isinstance(rights, dict) else getattr(rights, "admin", False)
settings_url = call_url(ctx, "blog_url", "/settings/") if is_admin else ""
return render(
"header-row",
cart_mini_html=ctx.get("cart_mini_html", ""),
@@ -40,6 +45,8 @@ def root_header_html(ctx: dict, *, oob: bool = False) -> str:
nav_tree_html=ctx.get("nav_tree_html", ""),
auth_menu_html=ctx.get("auth_menu_html", ""),
nav_panel_html=ctx.get("nav_panel_html", ""),
settings_url=settings_url,
is_admin=is_admin,
oob=oob,
)
@@ -87,9 +94,31 @@ def post_header_html(ctx: dict, *, oob: bool = False) -> str:
container_nav = ctx.get("container_nav_html", "")
if container_nav:
nav_parts.append(container_nav)
nav_parts.append(
'<div class="flex flex-col sm:flex-row sm:items-center gap-2'
' border-r border-stone-200 mr-2 sm:max-w-2xl"'
' id="entries-calendars-nav-wrapper">'
f'{container_nav}</div>'
)
# Admin cog — external link to blog admin (generic across all services)
admin_nav = ctx.get("post_admin_nav_html", "")
if not admin_nav:
rights = ctx.get("rights") or {}
has_admin = rights.get("admin") if isinstance(rights, dict) else getattr(rights, "admin", False)
if has_admin and slug:
from quart import request
admin_href = call_url(ctx, "blog_url", f"/{slug}/admin/")
is_admin_page = "/admin" in request.path
sel_cls = "!bg-stone-500 !text-white" if is_admin_page else ""
base_cls = ("justify-center cursor-pointer flex flex-row"
" items-center gap-2 rounded bg-stone-200 text-black p-3")
admin_nav = (
f'<div class="relative nav-group">'
f'<a href="{escape(admin_href)}"'
f' class="{base_cls} {sel_cls}">'
f'<i class="fa fa-cog" aria-hidden="true"></i></a></div>'
)
if admin_nav:
nav_parts.append(admin_nav)
@@ -104,6 +133,62 @@ def post_header_html(ctx: dict, *, oob: bool = False) -> str:
)
def post_admin_header_html(ctx: dict, slug: str, *, oob: bool = False,
selected: str = "", admin_href: str = "") -> str:
"""Shared post admin header row with unified nav across all services.
Shows: calendars | markets | payments | entries | data | edit | settings
All links are external (cross-service). The *selected* item is
highlighted on the nav and shown in white next to the admin label.
"""
# Label: shield icon + "admin" + optional selected sub-page in white
label_html = '<i class="fa fa-shield-halved" aria-hidden="true"></i> admin'
if selected:
label_html += f' <span class="text-white">{escape(selected)}</span>'
# Nav items — all external links to the appropriate service
select_colours = ctx.get("select_colours", "")
base_cls = ("justify-center cursor-pointer flex flex-row items-center"
" gap-2 rounded bg-stone-200 text-black p-3")
selected_cls = ("justify-center cursor-pointer flex flex-row items-center"
" gap-2 rounded !bg-stone-500 !text-white p-3")
nav_parts: list[str] = []
items = [
("events_url", f"/{slug}/admin/", "calendars"),
("market_url", f"/{slug}/admin/", "markets"),
("cart_url", f"/{slug}/admin/payments/", "payments"),
("blog_url", f"/{slug}/admin/entries/", "entries"),
("blog_url", f"/{slug}/admin/data/", "data"),
("blog_url", f"/{slug}/admin/edit/", "edit"),
("blog_url", f"/{slug}/admin/settings/", "settings"),
]
for url_key, path, label in items:
url_fn = ctx.get(url_key)
if not callable(url_fn):
continue
href = url_fn(path)
is_sel = label == selected
cls = selected_cls if is_sel else base_cls
aria = ' aria-selected="true"' if is_sel else ""
nav_parts.append(
f'<div class="relative nav-group">'
f'<a href="{escape(href)}"{aria}'
f' class="{cls} {escape(select_colours)}">'
f'{escape(label)}</a></div>'
)
nav_html = "".join(nav_parts)
if not admin_href:
blog_fn = ctx.get("blog_url")
admin_href = blog_fn(f"/{slug}/admin/") if callable(blog_fn) else f"/{slug}/admin/"
return render("menu-row",
id="post-admin-row", level=2,
link_href=admin_href, link_label_html=label_html,
nav_html=nav_html, child_id="post-admin-header-child", oob=oob,
)
def oob_header_html(parent_id: str, child_id: str, row_html: str) -> str:
"""Wrap a header row in an OOB swap div with child placeholder."""
return render("oob-header",

View File

@@ -107,9 +107,7 @@
(when cart-mini-html (raw! cart-mini-html))
(div :class "font-bold text-5xl flex-1"
(a :href (or blog-url "/") :class "flex justify-center md:justify-start items-baseline gap-2"
(h1 (or site-title ""))
(when app-label
(span :class "!text-2xl font-normal text-white" app-label))))
(h1 (or site-title ""))))
(nav :class "hidden md:flex gap-4 text-sm ml-2 justify-end items-center flex-0"
(when nav-tree-html (raw! nav-tree-html))
(when auth-menu-html (raw! auth-menu-html))

View File

@@ -0,0 +1,5 @@
"""Deliberate failing test to verify test dashboard shows failures."""
def test_this_should_fail():
assert 1 == 2, "Deliberate failure to test dashboard display"

View File

@@ -3,6 +3,7 @@ import path_setup # noqa: F401
import sexp.sexp_components as sexp_components # noqa: F401
from shared.infrastructure.factory import create_base_app
from shared.sexp.jinja_bridge import render
from bp import register_dashboard
from services import register_domain_services
@@ -14,7 +15,15 @@ async def test_context() -> dict:
ctx = await base_context()
ctx["menu_items"] = []
ctx["cart_mini_html"] = ""
# Render cart-mini with cart_count=0 to show the logo image
blog_url = ctx.get("blog_url", "")
if callable(blog_url):
blog_url_str = blog_url("")
else:
blog_url_str = str(blog_url or "")
ctx["cart_mini_html"] = render(
"cart-mini", cart_count=0, blog_url=blog_url_str, cart_url="",
)
ctx["auth_menu_html"] = ""
ctx["nav_tree_html"] = ""
return ctx

View File

@@ -21,8 +21,14 @@ def register(url_prefix: str = "/") -> Blueprint:
result = runner.get_results()
running = runner.is_running()
csrf = generate_csrf_token()
active_filter = request.args.get("filter")
active_service = request.args.get("service")
html = await render_dashboard_page(ctx, result, running, csrf)
html = await render_dashboard_page(
ctx, result, running, csrf,
active_filter=active_filter,
active_service=active_service,
)
return await make_response(html, 200)
@bp.post("/run")
@@ -52,8 +58,14 @@ def register(url_prefix: str = "/") -> Blueprint:
result = runner.get_results()
running = runner.is_running()
csrf = generate_csrf_token()
active_filter = request.args.get("filter")
active_service = request.args.get("service")
html = await render_results_partial(result, running, csrf)
html = await render_results_partial(
result, running, csrf,
active_filter=active_filter,
active_service=active_service,
)
resp = Response(html, status=200, content_type="text/html")
# If still running, tell HTMX to keep polling

View File

@@ -4,7 +4,9 @@ from __future__ import annotations
import asyncio
import json
import logging
import os
import time
from collections import OrderedDict
from pathlib import Path
log = logging.getLogger(__name__)
@@ -13,13 +15,81 @@ log = logging.getLogger(__name__)
_last_result: dict | None = None
_running: bool = False
# Paths to test directories (relative to /app in Docker)
_TEST_DIRS = [
"shared/tests/",
"shared/sexp/tests/",
# Each service group runs in its own pytest subprocess with its own PYTHONPATH
_SERVICE_GROUPS: list[dict] = [
{"name": "shared", "dirs": ["shared/tests/", "shared/sexp/tests/"],
"pythonpath": None},
{"name": "blog", "dirs": ["blog/tests/"], "pythonpath": "/app/blog"},
{"name": "market", "dirs": ["market/tests/"], "pythonpath": "/app/market"},
{"name": "cart", "dirs": ["cart/tests/"], "pythonpath": "/app/cart"},
{"name": "events", "dirs": ["events/tests/"], "pythonpath": "/app/events"},
{"name": "account", "dirs": ["account/tests/"], "pythonpath": "/app/account"},
{"name": "orders", "dirs": ["orders/tests/"], "pythonpath": "/app/orders"},
{"name": "federation", "dirs": ["federation/tests/"],
"pythonpath": "/app/federation"},
{"name": "relations", "dirs": ["relations/tests/"],
"pythonpath": "/app/relations"},
{"name": "likes", "dirs": ["likes/tests/"], "pythonpath": "/app/likes"},
]
_REPORT_PATH = "/tmp/test-report.json"
_SERVICE_ORDER = [g["name"] for g in _SERVICE_GROUPS]
_REPORT_PATH = "/tmp/test-report-{}.json"
def _parse_report(path: str) -> tuple[list[dict], dict]:
"""Parse a pytest-json-report file."""
rp = Path(path)
if not rp.exists():
return [], {}
try:
report = json.loads(rp.read_text())
except (json.JSONDecodeError, OSError):
return [], {}
summary = report.get("summary", {})
tests_raw = report.get("tests", [])
tests = []
for t in tests_raw:
tests.append({
"nodeid": t.get("nodeid", ""),
"outcome": t.get("outcome", "unknown"),
"duration": round(t.get("duration", 0), 4),
"longrepr": (t.get("call", {}) or {}).get("longrepr", ""),
})
return tests, summary
async def _run_group(group: dict) -> tuple[list[dict], dict, str]:
"""Run pytest for a single service group."""
existing = [d for d in group["dirs"] if Path(f"/app/{d}").is_dir()]
if not existing:
return [], {}, ""
report_file = _REPORT_PATH.format(group["name"])
cmd = [
"python3", "-m", "pytest",
*existing,
"--json-report",
f"--json-report-file={report_file}",
"-q",
"--tb=short",
]
env = {**os.environ}
if group["pythonpath"]:
env["PYTHONPATH"] = group["pythonpath"] + ":" + env.get("PYTHONPATH", "")
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
cwd="/app",
env=env,
)
stdout, _ = await proc.communicate()
stdout_str = (stdout or b"").decode("utf-8", errors="replace")
tests, summary = _parse_report(report_file)
return tests, summary, stdout_str
async def run_tests() -> dict:
@@ -33,74 +103,48 @@ async def run_tests() -> dict:
started_at = time.time()
try:
cmd = [
"python3", "-m", "pytest",
*_TEST_DIRS,
"--json-report",
f"--json-report-file={_REPORT_PATH}",
"-q",
"--tb=short",
]
tasks = [_run_group(g) for g in _SERVICE_GROUPS]
results = await asyncio.gather(*tasks, return_exceptions=True)
all_tests: list[dict] = []
total_passed = total_failed = total_errors = total_skipped = total_count = 0
all_stdout: list[str] = []
for i, res in enumerate(results):
if isinstance(res, Exception):
log.error("Group %s failed: %s", _SERVICE_GROUPS[i]["name"], res)
continue
tests, summary, stdout_str = res
all_tests.extend(tests)
total_passed += summary.get("passed", 0)
total_failed += summary.get("failed", 0)
total_errors += summary.get("error", 0)
total_skipped += summary.get("skipped", 0)
total_count += summary.get("total", len(tests))
if stdout_str.strip():
all_stdout.append(stdout_str)
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
cwd="/app",
)
stdout, _ = await proc.communicate()
finished_at = time.time()
# Parse JSON report
report_path = Path(_REPORT_PATH)
if report_path.exists():
try:
report = json.loads(report_path.read_text())
except (json.JSONDecodeError, OSError):
report = {}
else:
report = {}
summary = report.get("summary", {})
tests_raw = report.get("tests", [])
tests = []
for t in tests_raw:
tests.append({
"nodeid": t.get("nodeid", ""),
"outcome": t.get("outcome", "unknown"),
"duration": round(t.get("duration", 0), 4),
"longrepr": (t.get("call", {}) or {}).get("longrepr", ""),
})
passed = summary.get("passed", 0)
failed = summary.get("failed", 0)
errors = summary.get("error", 0)
skipped = summary.get("skipped", 0)
total = summary.get("total", len(tests))
if failed > 0 or errors > 0:
status = "failed"
else:
status = "passed"
status = "failed" if total_failed > 0 or total_errors > 0 else "passed"
_last_result = {
"status": status,
"started_at": started_at,
"finished_at": finished_at,
"duration": round(finished_at - started_at, 2),
"passed": passed,
"failed": failed,
"errors": errors,
"skipped": skipped,
"total": total,
"tests": tests,
"stdout": (stdout or b"").decode("utf-8", errors="replace")[-5000:],
"passed": total_passed,
"failed": total_failed,
"errors": total_errors,
"skipped": total_skipped,
"total": total_count,
"tests": all_tests,
"stdout": "\n".join(all_stdout)[-5000:],
}
log.info(
"Test run complete: %s (%d passed, %d failed, %d errors, %.1fs)",
status, passed, failed, errors, _last_result["duration"],
status, total_passed, total_failed, total_errors,
_last_result["duration"],
)
return _last_result
@@ -130,6 +174,49 @@ def get_results() -> dict | None:
return _last_result
def get_test(nodeid: str) -> dict | None:
"""Look up a single test by nodeid."""
if not _last_result:
return None
for t in _last_result["tests"]:
if t["nodeid"] == nodeid:
return t
return None
def is_running() -> bool:
"""Check if tests are currently running."""
return _running
def _service_from_nodeid(nodeid: str) -> str:
"""Extract service name from a test nodeid."""
parts = nodeid.split("/")
return parts[0] if len(parts) >= 2 else "other"
def group_tests_by_service(tests: list[dict]) -> list[dict]:
"""Group tests into ordered sections by service."""
buckets: dict[str, list[dict]] = OrderedDict()
for svc in _SERVICE_ORDER:
buckets[svc] = []
for t in tests:
svc = _service_from_nodeid(t["nodeid"])
if svc not in buckets:
buckets[svc] = []
buckets[svc].append(t)
sections = []
for svc, svc_tests in buckets.items():
if not svc_tests:
continue
sections.append({
"service": svc,
"tests": svc_tests,
"total": len(svc_tests),
"passed": sum(1 for t in svc_tests if t["outcome"] == "passed"),
"failed": sum(1 for t in svc_tests if t["outcome"] == "failed"),
"errors": sum(1 for t in svc_tests if t["outcome"] == "error"),
"skipped": sum(1 for t in svc_tests if t["outcome"] == "skipped"),
})
return sections

View File

@@ -17,34 +17,59 @@
:disabled (if running "true" nil)
(if running "Running..." "Run Tests"))))
(defcomp ~test-summary (&key status passed failed errors skipped total duration last-run running csrf)
(defcomp ~test-filter-card (&key href label count colour-border colour-bg colour-text active)
(a :href href
:hx-get href
:hx-target "#main-panel"
:hx-select "#main-panel"
:hx-swap "outerHTML"
:hx-push-url "true"
:class (str "block rounded border p-3 text-center transition-colors no-underline hover:opacity-80 "
colour-border " " colour-bg " "
(if active "ring-2 ring-offset-1 ring-stone-500 " ""))
(div :class (str "text-3xl font-bold " colour-text) count)
(div :class (str "text-sm " colour-text) label)))
(defcomp ~test-summary (&key status passed failed errors skipped total duration last-run running csrf active-filter)
(div :class "space-y-4"
(div :class "flex items-center justify-between flex-wrap gap-3"
(div :class "flex items-center gap-3"
(h2 :class "text-lg font-semibold text-stone-800" "Test Results")
(h2 :class "text-2xl font-semibold text-stone-800" "Test Results")
(when status (~test-status-badge :status status)))
(~test-run-button :running running :csrf csrf))
(when status
(div :class "grid grid-cols-2 sm:grid-cols-3 md:grid-cols-6 gap-3"
(div :class "rounded border border-stone-200 bg-white p-3 text-center"
(div :class "text-2xl font-bold text-stone-800" total)
(div :class "text-xs text-stone-500" "Total"))
(div :class "rounded border border-emerald-200 bg-emerald-50 p-3 text-center"
(div :class "text-2xl font-bold text-emerald-700" passed)
(div :class "text-xs text-emerald-600" "Passed"))
(div :class "rounded border border-rose-200 bg-rose-50 p-3 text-center"
(div :class "text-2xl font-bold text-rose-700" failed)
(div :class "text-xs text-rose-600" "Failed"))
(div :class "rounded border border-orange-200 bg-orange-50 p-3 text-center"
(div :class "text-2xl font-bold text-orange-700" errors)
(div :class "text-xs text-orange-600" "Errors"))
(div :class "rounded border border-sky-200 bg-sky-50 p-3 text-center"
(div :class "text-2xl font-bold text-sky-700" skipped)
(div :class "text-xs text-sky-600" "Skipped"))
(div :class "rounded border border-stone-200 bg-white p-3 text-center"
(div :class "text-2xl font-bold text-stone-800" (str duration "s"))
(div :class "text-xs text-stone-500" "Duration")))
(div :class "text-xs text-stone-400" (str "Last run: " last-run)))))
(~test-filter-card :href "/" :label "Total" :count total
:colour-border "border-stone-200" :colour-bg "bg-white"
:colour-text "text-stone-800"
:active (if (= active-filter nil) "true" nil))
(~test-filter-card :href "/?filter=passed" :label "Passed" :count passed
:colour-border "border-emerald-200" :colour-bg "bg-emerald-50"
:colour-text "text-emerald-700"
:active (if (= active-filter "passed") "true" nil))
(~test-filter-card :href "/?filter=failed" :label "Failed" :count failed
:colour-border "border-rose-200" :colour-bg "bg-rose-50"
:colour-text "text-rose-700"
:active (if (= active-filter "failed") "true" nil))
(~test-filter-card :href "/?filter=errors" :label "Errors" :count errors
:colour-border "border-orange-200" :colour-bg "bg-orange-50"
:colour-text "text-orange-700"
:active (if (= active-filter "errors") "true" nil))
(~test-filter-card :href "/?filter=skipped" :label "Skipped" :count skipped
:colour-border "border-sky-200" :colour-bg "bg-sky-50"
:colour-text "text-sky-700"
:active (if (= active-filter "skipped") "true" nil))
(~test-filter-card :href "/" :label "Duration" :count (str duration "s")
:colour-border "border-stone-200" :colour-bg "bg-white"
:colour-text "text-stone-800" :active nil))
(div :class "text-sm text-stone-400" (str "Last run: " last-run)))))
(defcomp ~test-service-header (&key service total passed failed)
(tr :class "border-b-2 border-stone-300 bg-stone-100"
(td :class "px-3 py-2 text-sm font-bold text-stone-700" :colspan "4"
(span service)
(span :class "ml-2 text-xs font-normal text-stone-500"
(str total " tests, " passed " passed, " failed " failed")))))
(defcomp ~test-row (&key nodeid outcome duration longrepr)
(tr :class (str "border-b border-stone-100 "
@@ -52,16 +77,16 @@
(if (= outcome "failed") "bg-rose-50"
(if (= outcome "skipped") "bg-sky-50"
"bg-orange-50"))))
(td :class "px-3 py-2 text-xs font-mono text-stone-700 max-w-0 truncate" :title nodeid nodeid)
(td :class "px-3 py-2 text-sm font-mono text-stone-700 max-w-0 truncate" :title nodeid nodeid)
(td :class "px-3 py-2 text-center"
(span :class (str "inline-flex items-center rounded-full border px-2 py-0.5 text-[11px] font-medium "
(span :class (str "inline-flex items-center rounded-full border px-2 py-0.5 text-xs font-medium "
(if (= outcome "passed") "border-emerald-300 bg-emerald-50 text-emerald-700"
(if (= outcome "failed") "border-rose-300 bg-rose-50 text-rose-700"
(if (= outcome "skipped") "border-sky-300 bg-sky-50 text-sky-700"
"border-orange-300 bg-orange-50 text-orange-700"))))
outcome))
(td :class "px-3 py-2 text-right text-xs text-stone-500 tabular-nums" (str duration "s"))
(td :class "px-3 py-2 text-xs text-rose-600 font-mono max-w-xs truncate" :title longrepr
(td :class "px-3 py-2 text-right text-sm text-stone-500 tabular-nums" (str duration "s"))
(td :class "px-3 py-2 text-sm text-rose-600 font-mono max-w-xs truncate" :title longrepr
(when longrepr longrepr))))
(defcomp ~test-results-table (&key rows-html has-failures)
@@ -69,11 +94,11 @@
(table :class "w-full text-left"
(thead
(tr :class "border-b border-stone-200 bg-stone-50"
(th :class "px-3 py-2 text-xs font-medium text-stone-600" "Test")
(th :class "px-3 py-2 text-sm font-medium text-stone-600" "Test")
(th :class "px-3 py-2 text-xs font-medium text-stone-600 text-center w-24" "Status")
(th :class "px-3 py-2 text-xs font-medium text-stone-600 text-right w-20" "Time")
(th :class "px-3 py-2 text-xs font-medium text-stone-600 w-48" "Error")))
(tbody rows-html))))
(tbody (raw! rows-html)))))
(defcomp ~test-running-indicator ()
(div :class "flex items-center justify-center py-12 text-stone-500"
@@ -85,4 +110,4 @@
(div :class "flex items-center justify-center py-12 text-stone-400"
(div :class "text-center"
(div :class "text-4xl mb-2" "?")
(div :class "text-sm" "No test results yet. Click \"Run Tests\" to start."))))
(div :class "text-sm" "No test results yet. Click Run Tests to start."))))

View File

@@ -18,6 +18,65 @@ def _format_time(ts: float | None) -> str:
return datetime.fromtimestamp(ts).strftime("%-d %b %Y, %H:%M:%S")
# ---------------------------------------------------------------------------
# Menu / header
# ---------------------------------------------------------------------------
_FILTER_MAP = {
"passed": "passed",
"failed": "failed",
"errors": "error",
"skipped": "skipped",
}
def _test_header_html(ctx: dict, active_service: str | None = None) -> str:
"""Build the Tests menu-row (level 1) with service nav links."""
nav = _service_nav_html(ctx, active_service)
return render(
"menu-row",
id="test-row", level=1, colour="sky",
link_href="/", link_label="Tests", icon="fa fa-flask",
nav_html=nav,
child_id="test-header-child",
)
def _service_nav_html(ctx: dict, active_service: str | None = None) -> str:
"""Render service filter nav links using ~nav-link component."""
from runner import _SERVICE_ORDER
parts = []
# "All" link
parts.append(render(
"nav-link",
href="/",
label="all",
is_selected="true" if not active_service else None,
select_colours="aria-selected:bg-sky-200 aria-selected:text-sky-900",
))
for svc in _SERVICE_ORDER:
parts.append(render(
"nav-link",
href=f"/?service={svc}",
label=svc,
is_selected="true" if active_service == svc else None,
select_colours="aria-selected:bg-sky-200 aria-selected:text-sky-900",
))
return "".join(parts)
def _header_stack_html(ctx: dict, active_service: str | None = None) -> str:
"""Full header stack: root row + tests child row."""
hdr = root_header_html(ctx)
inner = _test_header_html(ctx, active_service)
hdr += render("header-child", inner_html=inner)
return hdr
# ---------------------------------------------------------------------------
# Test rows / grouping
# ---------------------------------------------------------------------------
def _test_rows_html(tests: list[dict]) -> str:
"""Render all test result rows."""
parts = []
@@ -32,7 +91,43 @@ def _test_rows_html(tests: list[dict]) -> str:
return "".join(parts)
def _results_partial_html(result: dict | None, running: bool, csrf: str) -> str:
def _grouped_rows_html(tests: list[dict]) -> str:
"""Render test rows grouped by service with section headers."""
from runner import group_tests_by_service
sections = group_tests_by_service(tests)
parts = []
for sec in sections:
parts.append(render(
"test-service-header",
service=sec["service"],
total=str(sec["total"]),
passed=str(sec["passed"]),
failed=str(sec["failed"]),
))
parts.append(_test_rows_html(sec["tests"]))
return "".join(parts)
def _filter_tests(tests: list[dict], active_filter: str | None,
active_service: str | None) -> list[dict]:
"""Filter tests by outcome and/or service."""
from runner import _service_from_nodeid
filtered = tests
if active_filter and active_filter in _FILTER_MAP:
outcome = _FILTER_MAP[active_filter]
filtered = [t for t in filtered if t["outcome"] == outcome]
if active_service:
filtered = [t for t in filtered if _service_from_nodeid(t["nodeid"]) == active_service]
return filtered
# ---------------------------------------------------------------------------
# Results partial
# ---------------------------------------------------------------------------
def _results_partial_html(result: dict | None, running: bool, csrf: str,
active_filter: str | None = None,
active_service: str | None = None) -> str:
"""Render the results section (summary + table or running indicator)."""
if running and not result:
summary = render(
@@ -40,6 +135,7 @@ def _results_partial_html(result: dict | None, running: bool, csrf: str) -> str:
status="running", passed="0", failed="0", errors="0",
skipped="0", total="0", duration="...",
last_run="in progress", running=True, csrf=csrf,
active_filter=active_filter,
)
return summary + render("test-running-indicator")
@@ -49,6 +145,7 @@ def _results_partial_html(result: dict | None, running: bool, csrf: str) -> str:
status=None, passed="0", failed="0", errors="0",
skipped="0", total="0", duration="0",
last_run="never", running=running, csrf=csrf,
active_filter=active_filter,
)
return summary + render("test-no-results")
@@ -65,17 +162,19 @@ def _results_partial_html(result: dict | None, running: bool, csrf: str) -> str:
last_run=_format_time(result["finished_at"]) if not running else "in progress",
running=running,
csrf=csrf,
active_filter=active_filter,
)
if running:
return summary + render("test-running-indicator")
tests = result.get("tests", [])
tests = _filter_tests(tests, active_filter, active_service)
if not tests:
return summary + render("test-no-results")
has_failures = result["failed"] > 0 or result["errors"] > 0
rows = _test_rows_html(tests)
rows = _grouped_rows_html(tests)
table = render("test-results-table", rows_html=rows,
has_failures=str(has_failures).lower())
return summary + table
@@ -90,16 +189,20 @@ def _wrap_results_div(inner_html: str, running: bool) -> str:
async def render_dashboard_page(ctx: dict, result: dict | None,
running: bool, csrf: str) -> str:
running: bool, csrf: str,
active_filter: str | None = None,
active_service: str | None = None) -> str:
"""Full page: test dashboard."""
hdr = root_header_html(ctx)
inner = _results_partial_html(result, running, csrf)
hdr = _header_stack_html(ctx, active_service)
inner = _results_partial_html(result, running, csrf, active_filter, active_service)
content = _wrap_results_div(inner, running)
return full_page(ctx, header_rows_html=hdr, content_html=content)
async def render_results_partial(result: dict | None, running: bool,
csrf: str) -> str:
csrf: str,
active_filter: str | None = None,
active_service: str | None = None) -> str:
"""HTMX partial: just the results section (wrapped in polling div)."""
inner = _results_partial_html(result, running, csrf)
inner = _results_partial_html(result, running, csrf, active_filter, active_service)
return _wrap_results_div(inner, running)