Compare commits
3 Commits
6d43404b12
...
1559c5c931
| Author | SHA1 | Date | |
|---|---|---|---|
| 1559c5c931 | |||
| 00efbc2a35 | |||
| 6c44a5f3d0 |
@@ -511,17 +511,13 @@ async def render_overview_page(ctx: dict, page_groups: list) -> str:
|
||||
"""Full page: cart overview."""
|
||||
main = _overview_main_panel_html(page_groups, ctx)
|
||||
hdr = root_header_html(ctx)
|
||||
hdr += render("cart-header-child", inner_html=_cart_header_html(ctx))
|
||||
return full_page(ctx, header_rows_html=hdr, content_html=main)
|
||||
|
||||
|
||||
async def render_overview_oob(ctx: dict, page_groups: list) -> str:
|
||||
"""OOB response for cart overview."""
|
||||
main = _overview_main_panel_html(page_groups, ctx)
|
||||
oobs = (
|
||||
_cart_header_html(ctx, oob=True)
|
||||
+ root_header_html(ctx, oob=True)
|
||||
)
|
||||
oobs = root_header_html(ctx, oob=True)
|
||||
return oob_page(ctx, oobs_html=oobs, content_html=main)
|
||||
|
||||
|
||||
@@ -717,7 +713,6 @@ def _checkout_error_content_html(error: str | None, order: Any | None) -> str:
|
||||
async def render_checkout_error_page(ctx: dict, error: str | None = None, order: Any | None = None) -> str:
|
||||
"""Full page: checkout error."""
|
||||
hdr = root_header_html(ctx)
|
||||
hdr += render("cart-header-child", inner_html=_cart_header_html(ctx))
|
||||
filt = _checkout_error_filter_html()
|
||||
content = _checkout_error_content_html(error, order)
|
||||
return full_page(ctx, header_rows_html=hdr, filter_html=filt, content_html=content)
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
set -euo pipefail
|
||||
|
||||
REGISTRY="registry.rose-ash.com:5000"
|
||||
APPS="blog market cart events federation account relations likes orders"
|
||||
APPS="blog market cart events federation account relations likes orders test"
|
||||
|
||||
usage() {
|
||||
echo "Usage: deploy.sh [app ...]"
|
||||
|
||||
4
dev.sh
4
dev.sh
@@ -20,8 +20,8 @@ case "${1:-up}" in
|
||||
shift
|
||||
$COMPOSE logs -f "$@"
|
||||
;;
|
||||
test)
|
||||
# One-shot: all unit tests
|
||||
test-run)
|
||||
# One-shot: all unit tests (headless, no dashboard)
|
||||
$COMPOSE run --rm test-unit python -m pytest \
|
||||
shared/ artdag/core/tests/ artdag/core/artdag/sexp/ \
|
||||
artdag/l1/tests/ artdag/l1/sexp_effects/ \
|
||||
|
||||
@@ -351,6 +351,42 @@ services:
|
||||
- ./account/__init__.py:/app/account/__init__.py:ro
|
||||
- ./account/models:/app/account/models:ro
|
||||
|
||||
test:
|
||||
restart: unless-stopped
|
||||
ports:
|
||||
- "8011:8000"
|
||||
environment:
|
||||
<<: *dev-env
|
||||
volumes:
|
||||
- /root/rose-ash/_config/app-config.yaml:/app/config/app-config.yaml:ro
|
||||
- ./shared:/app/shared
|
||||
- ./test/app.py:/app/app.py
|
||||
- ./test/sexp:/app/sexp
|
||||
- ./test/bp:/app/bp
|
||||
- ./test/services:/app/services
|
||||
- ./test/runner.py:/app/runner.py
|
||||
- ./test/path_setup.py:/app/path_setup.py
|
||||
- ./test/entrypoint.sh:/usr/local/bin/entrypoint.sh
|
||||
# sibling models
|
||||
- ./blog/__init__.py:/app/blog/__init__.py:ro
|
||||
- ./blog/models:/app/blog/models:ro
|
||||
- ./market/__init__.py:/app/market/__init__.py:ro
|
||||
- ./market/models:/app/market/models:ro
|
||||
- ./cart/__init__.py:/app/cart/__init__.py:ro
|
||||
- ./cart/models:/app/cart/models:ro
|
||||
- ./events/__init__.py:/app/events/__init__.py:ro
|
||||
- ./events/models:/app/events/models:ro
|
||||
- ./federation/__init__.py:/app/federation/__init__.py:ro
|
||||
- ./federation/models:/app/federation/models:ro
|
||||
- ./account/__init__.py:/app/account/__init__.py:ro
|
||||
- ./account/models:/app/account/models:ro
|
||||
- ./relations/__init__.py:/app/relations/__init__.py:ro
|
||||
- ./relations/models:/app/relations/models:ro
|
||||
- ./likes/__init__.py:/app/likes/__init__.py:ro
|
||||
- ./likes/models:/app/likes/models:ro
|
||||
- ./orders/__init__.py:/app/orders/__init__.py:ro
|
||||
- ./orders/models:/app/orders/models:ro
|
||||
|
||||
test-unit:
|
||||
build:
|
||||
context: .
|
||||
|
||||
@@ -35,6 +35,7 @@ x-app-env: &app-env
|
||||
APP_URL_ORDERS: https://orders.rose-ash.com
|
||||
APP_URL_RELATIONS: http://relations:8000
|
||||
APP_URL_LIKES: http://likes:8000
|
||||
APP_URL_TEST: https://test.rose-ash.com
|
||||
APP_URL_ARTDAG: https://celery-artdag.rose-ash.com
|
||||
APP_URL_ARTDAG_L2: https://artdag.rose-ash.com
|
||||
INTERNAL_URL_BLOG: http://blog:8000
|
||||
@@ -46,6 +47,7 @@ x-app-env: &app-env
|
||||
INTERNAL_URL_ORDERS: http://orders:8000
|
||||
INTERNAL_URL_RELATIONS: http://relations:8000
|
||||
INTERNAL_URL_LIKES: http://likes:8000
|
||||
INTERNAL_URL_TEST: http://test:8000
|
||||
INTERNAL_URL_ARTDAG: http://l1-server:8100
|
||||
AP_DOMAIN: federation.rose-ash.com
|
||||
AP_DOMAIN_BLOG: blog.rose-ash.com
|
||||
@@ -201,6 +203,17 @@ services:
|
||||
RUN_MIGRATIONS: "true"
|
||||
WORKERS: "1"
|
||||
|
||||
test:
|
||||
<<: *app-common
|
||||
image: registry.rose-ash.com:5000/test:latest
|
||||
build:
|
||||
context: .
|
||||
dockerfile: test/Dockerfile
|
||||
environment:
|
||||
<<: *app-env
|
||||
REDIS_URL: redis://redis:6379/9
|
||||
WORKERS: "1"
|
||||
|
||||
db:
|
||||
image: postgres:16
|
||||
environment:
|
||||
|
||||
@@ -98,6 +98,7 @@ async def base_context() -> dict:
|
||||
"qs_filter": _qs_filter_fn(),
|
||||
"print": print,
|
||||
"base_url": base_url,
|
||||
"app_label": current_app.name,
|
||||
"base_title": config()["title"],
|
||||
"hx_select": hx_select,
|
||||
"hx_select_search": hx_select_search,
|
||||
|
||||
@@ -114,6 +114,14 @@ def create_base_app(
|
||||
setup_sexp_bridge(app)
|
||||
load_shared_components()
|
||||
load_relation_registry()
|
||||
|
||||
# Dev-mode: auto-reload sexp templates when files change on disk
|
||||
if os.getenv("RELOAD") == "true":
|
||||
from shared.sexp.jinja_bridge import reload_if_changed
|
||||
|
||||
@app.before_request
|
||||
async def _sexp_hot_reload():
|
||||
reload_if_changed()
|
||||
errors(app)
|
||||
|
||||
# Auto-register OAuth client blueprint for non-account apps
|
||||
|
||||
@@ -9,10 +9,11 @@ from __future__ import annotations
|
||||
|
||||
import os
|
||||
|
||||
from .jinja_bridge import load_sexp_dir
|
||||
from .jinja_bridge import load_sexp_dir, watch_sexp_dir
|
||||
|
||||
|
||||
def load_shared_components() -> None:
|
||||
"""Register all shared s-expression components."""
|
||||
templates_dir = os.path.join(os.path.dirname(__file__), "templates")
|
||||
load_sexp_dir(templates_dir)
|
||||
watch_sexp_dir(templates_dir)
|
||||
|
||||
@@ -36,6 +36,7 @@ def root_header_html(ctx: dict, *, oob: bool = False) -> str:
|
||||
cart_mini_html=ctx.get("cart_mini_html", ""),
|
||||
blog_url=call_url(ctx, "blog_url", ""),
|
||||
site_title=ctx.get("base_title", ""),
|
||||
app_label=ctx.get("app_label", ""),
|
||||
nav_tree_html=ctx.get("nav_tree_html", ""),
|
||||
auth_menu_html=ctx.get("auth_menu_html", ""),
|
||||
nav_panel_html=ctx.get("nav_panel_html", ""),
|
||||
|
||||
@@ -53,11 +53,49 @@ def load_sexp_dir(directory: str) -> None:
|
||||
register_components(f.read())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Dev-mode auto-reload of sexp templates
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_watched_dirs: list[str] = []
|
||||
_file_mtimes: dict[str, float] = {}
|
||||
|
||||
|
||||
def watch_sexp_dir(directory: str) -> None:
|
||||
"""Register a directory for dev-mode file watching."""
|
||||
_watched_dirs.append(directory)
|
||||
# Seed mtimes
|
||||
for fp in sorted(
|
||||
glob.glob(os.path.join(directory, "*.sexp"))
|
||||
+ glob.glob(os.path.join(directory, "*.sexpr"))
|
||||
):
|
||||
_file_mtimes[fp] = os.path.getmtime(fp)
|
||||
|
||||
|
||||
def reload_if_changed() -> None:
|
||||
"""Re-read sexp files if any have changed on disk. Called per-request in dev."""
|
||||
changed = False
|
||||
for directory in _watched_dirs:
|
||||
for fp in sorted(
|
||||
glob.glob(os.path.join(directory, "*.sexp"))
|
||||
+ glob.glob(os.path.join(directory, "*.sexpr"))
|
||||
):
|
||||
mtime = os.path.getmtime(fp)
|
||||
if fp not in _file_mtimes or _file_mtimes[fp] != mtime:
|
||||
_file_mtimes[fp] = mtime
|
||||
changed = True
|
||||
if changed:
|
||||
_COMPONENT_ENV.clear()
|
||||
for directory in _watched_dirs:
|
||||
load_sexp_dir(directory)
|
||||
|
||||
|
||||
def load_service_components(service_dir: str) -> None:
|
||||
"""Load service-specific s-expression components from {service_dir}/sexp/."""
|
||||
sexp_dir = os.path.join(service_dir, "sexp")
|
||||
if os.path.isdir(sexp_dir):
|
||||
load_sexp_dir(sexp_dir)
|
||||
watch_sexp_dir(sexp_dir)
|
||||
|
||||
|
||||
def register_components(sexp_source: str) -> None:
|
||||
|
||||
@@ -96,7 +96,7 @@
|
||||
:class "w-12 h-12 rotate-180 transition-transform group-open/root:block hidden self-start"
|
||||
(path :d "M6 9l6 6 6-6" :fill "currentColor"))))
|
||||
|
||||
(defcomp ~header-row (&key cart-mini-html blog-url site-title
|
||||
(defcomp ~header-row (&key cart-mini-html blog-url site-title app-label
|
||||
nav-tree-html auth-menu-html nav-panel-html
|
||||
settings-url is-admin oob)
|
||||
(<>
|
||||
@@ -106,8 +106,10 @@
|
||||
(div :class "w-full flex flex-row items-top"
|
||||
(when cart-mini-html (raw! cart-mini-html))
|
||||
(div :class "font-bold text-5xl flex-1"
|
||||
(a :href (or blog-url "/") :class "flex justify-center md:justify-start"
|
||||
(h1 (or site-title ""))))
|
||||
(a :href (or blog-url "/") :class "flex justify-center md:justify-start items-baseline gap-2"
|
||||
(h1 (or site-title ""))
|
||||
(when app-label
|
||||
(span :class "!text-2xl font-normal text-white" app-label))))
|
||||
(nav :class "hidden md:flex gap-4 text-sm ml-2 justify-end items-center flex-0"
|
||||
(when nav-tree-html (raw! nav-tree-html))
|
||||
(when auth-menu-html (raw! auth-menu-html))
|
||||
|
||||
0
shared/tests/__init__.py
Normal file
0
shared/tests/__init__.py
Normal file
10
shared/tests/conftest.py
Normal file
10
shared/tests/conftest.py
Normal file
@@ -0,0 +1,10 @@
|
||||
"""Shared test fixtures for unit tests."""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
# Ensure project root is on sys.path so shared.* imports work
|
||||
_project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
if _project_root not in sys.path:
|
||||
sys.path.insert(0, _project_root)
|
||||
119
shared/tests/test_activity_bus.py
Normal file
119
shared/tests/test_activity_bus.py
Normal file
@@ -0,0 +1,119 @@
|
||||
"""Tests for activity bus handler registry."""
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
from collections import defaultdict
|
||||
|
||||
from shared.events.bus import (
|
||||
register_activity_handler,
|
||||
get_activity_handlers,
|
||||
_activity_handlers,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _clean_handlers():
|
||||
"""Clear handler registry before each test."""
|
||||
_activity_handlers.clear()
|
||||
yield
|
||||
_activity_handlers.clear()
|
||||
|
||||
|
||||
# Dummy handlers
|
||||
async def handler_a(activity, session):
|
||||
pass
|
||||
|
||||
async def handler_b(activity, session):
|
||||
pass
|
||||
|
||||
async def handler_c(activity, session):
|
||||
pass
|
||||
|
||||
async def handler_global(activity, session):
|
||||
pass
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# register_activity_handler
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestRegisterHandler:
|
||||
def test_register_with_type_only(self):
|
||||
register_activity_handler("Create", handler_a)
|
||||
assert ("Create", "*") in _activity_handlers
|
||||
assert handler_a in _activity_handlers[("Create", "*")]
|
||||
|
||||
def test_register_with_object_type(self):
|
||||
register_activity_handler("Create", handler_a, object_type="Note")
|
||||
assert ("Create", "Note") in _activity_handlers
|
||||
|
||||
def test_multiple_handlers_same_key(self):
|
||||
register_activity_handler("Create", handler_a)
|
||||
register_activity_handler("Create", handler_b)
|
||||
assert len(_activity_handlers[("Create", "*")]) == 2
|
||||
|
||||
def test_wildcard_type(self):
|
||||
register_activity_handler("*", handler_global)
|
||||
assert ("*", "*") in _activity_handlers
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# get_activity_handlers — cascading wildcard lookup
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestGetHandlers:
|
||||
def test_exact_match(self):
|
||||
register_activity_handler("Create", handler_a, object_type="Note")
|
||||
handlers = get_activity_handlers("Create", "Note")
|
||||
assert handler_a in handlers
|
||||
|
||||
def test_type_wildcard_match(self):
|
||||
register_activity_handler("Create", handler_a) # key: ("Create", "*")
|
||||
handlers = get_activity_handlers("Create", "Note")
|
||||
assert handler_a in handlers
|
||||
|
||||
def test_global_wildcard_match(self):
|
||||
register_activity_handler("*", handler_global) # key: ("*", "*")
|
||||
handlers = get_activity_handlers("Create", "Note")
|
||||
assert handler_global in handlers
|
||||
|
||||
def test_cascading_order(self):
|
||||
"""Handlers should come in order: exact → type-wildcard → global-wildcard."""
|
||||
register_activity_handler("Create", handler_a, object_type="Note") # exact
|
||||
register_activity_handler("Create", handler_b) # type-wildcard
|
||||
register_activity_handler("*", handler_c) # global wildcard
|
||||
|
||||
handlers = get_activity_handlers("Create", "Note")
|
||||
assert handlers == [handler_a, handler_b, handler_c]
|
||||
|
||||
def test_no_match(self):
|
||||
register_activity_handler("Create", handler_a, object_type="Note")
|
||||
handlers = get_activity_handlers("Delete", "Article")
|
||||
assert handlers == []
|
||||
|
||||
def test_no_object_type_skips_exact(self):
|
||||
register_activity_handler("Create", handler_a, object_type="Note")
|
||||
register_activity_handler("Create", handler_b)
|
||||
handlers = get_activity_handlers("Create")
|
||||
# Should get type-wildcard only (since object_type defaults to "*")
|
||||
assert handler_b in handlers
|
||||
assert handler_a not in handlers
|
||||
|
||||
def test_global_wildcard_not_duplicated(self):
|
||||
"""Global wildcard should not fire when activity_type is already '*'."""
|
||||
register_activity_handler("*", handler_global)
|
||||
handlers = get_activity_handlers("*")
|
||||
# Should not include global wildcard twice
|
||||
assert handlers.count(handler_global) == 1
|
||||
|
||||
def test_type_wildcard_plus_global(self):
|
||||
register_activity_handler("Follow", handler_a)
|
||||
register_activity_handler("*", handler_global)
|
||||
handlers = get_activity_handlers("Follow")
|
||||
assert handler_a in handlers
|
||||
assert handler_global in handlers
|
||||
|
||||
def test_only_global_wildcard(self):
|
||||
register_activity_handler("*", handler_global)
|
||||
handlers = get_activity_handlers("Like", "Post")
|
||||
assert handlers == [handler_global]
|
||||
117
shared/tests/test_calendar_helpers.py
Normal file
117
shared/tests/test_calendar_helpers.py
Normal file
@@ -0,0 +1,117 @@
|
||||
"""Tests for calendar date helper functions."""
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import date
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
from shared.utils.calendar_helpers import add_months, build_calendar_weeks
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# add_months
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestAddMonths:
|
||||
def test_same_year(self):
|
||||
assert add_months(2025, 3, 2) == (2025, 5)
|
||||
|
||||
def test_next_year(self):
|
||||
assert add_months(2025, 11, 2) == (2026, 1)
|
||||
|
||||
def test_subtract(self):
|
||||
assert add_months(2025, 3, -2) == (2025, 1)
|
||||
|
||||
def test_subtract_prev_year(self):
|
||||
assert add_months(2025, 1, -1) == (2024, 12)
|
||||
|
||||
def test_add_twelve(self):
|
||||
assert add_months(2025, 6, 12) == (2026, 6)
|
||||
|
||||
def test_subtract_twelve(self):
|
||||
assert add_months(2025, 6, -12) == (2024, 6)
|
||||
|
||||
def test_large_delta(self):
|
||||
assert add_months(2025, 1, 25) == (2027, 2)
|
||||
|
||||
def test_zero_delta(self):
|
||||
assert add_months(2025, 7, 0) == (2025, 7)
|
||||
|
||||
def test_december_to_january(self):
|
||||
assert add_months(2025, 12, 1) == (2026, 1)
|
||||
|
||||
def test_january_to_december(self):
|
||||
assert add_months(2025, 1, -1) == (2024, 12)
|
||||
|
||||
def test_subtract_large(self):
|
||||
assert add_months(2025, 3, -15) == (2023, 12)
|
||||
|
||||
def test_month_boundaries(self):
|
||||
# Every month +1
|
||||
for m in range(1, 12):
|
||||
y, nm = add_months(2025, m, 1)
|
||||
assert nm == m + 1
|
||||
assert y == 2025
|
||||
y, nm = add_months(2025, 12, 1)
|
||||
assert nm == 1
|
||||
assert y == 2026
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# build_calendar_weeks
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestBuildCalendarWeeks:
|
||||
def test_returns_list_of_weeks(self):
|
||||
weeks = build_calendar_weeks(2025, 6)
|
||||
assert isinstance(weeks, list)
|
||||
assert len(weeks) >= 4 # at least 4 weeks in any month
|
||||
assert len(weeks) <= 6 # at most 6 weeks
|
||||
|
||||
def test_each_week_has_7_days(self):
|
||||
weeks = build_calendar_weeks(2025, 6)
|
||||
for week in weeks:
|
||||
assert len(week) == 7
|
||||
|
||||
def test_day_dict_structure(self):
|
||||
weeks = build_calendar_weeks(2025, 6)
|
||||
day = weeks[0][0]
|
||||
assert "date" in day
|
||||
assert "in_month" in day
|
||||
assert "is_today" in day
|
||||
assert isinstance(day["date"], date)
|
||||
assert isinstance(day["in_month"], bool)
|
||||
assert isinstance(day["is_today"], bool)
|
||||
|
||||
def test_in_month_flag(self):
|
||||
weeks = build_calendar_weeks(2025, 6)
|
||||
june_days = [d for w in weeks for d in w if d["in_month"]]
|
||||
assert len(june_days) == 30 # June has 30 days
|
||||
|
||||
def test_february_leap_year(self):
|
||||
weeks = build_calendar_weeks(2024, 2)
|
||||
feb_days = [d for w in weeks for d in w if d["in_month"]]
|
||||
assert len(feb_days) == 29
|
||||
|
||||
def test_february_non_leap_year(self):
|
||||
weeks = build_calendar_weeks(2025, 2)
|
||||
feb_days = [d for w in weeks for d in w if d["in_month"]]
|
||||
assert len(feb_days) == 28
|
||||
|
||||
def test_starts_on_monday(self):
|
||||
"""Calendar should start on Monday (firstweekday=0)."""
|
||||
weeks = build_calendar_weeks(2025, 6)
|
||||
first_day = weeks[0][0]["date"]
|
||||
assert first_day.weekday() == 0 # Monday
|
||||
|
||||
def test_is_today_flag(self):
|
||||
"""The today flag should be True for exactly one day (or zero if not in month range)."""
|
||||
# Use a fixed known date - mock datetime.now
|
||||
from datetime import datetime, timezone
|
||||
fixed_now = datetime(2025, 6, 15, tzinfo=timezone.utc)
|
||||
with patch("shared.utils.calendar_helpers.datetime") as mock_dt:
|
||||
mock_dt.now.return_value = fixed_now
|
||||
mock_dt.side_effect = lambda *a, **kw: datetime(*a, **kw)
|
||||
weeks = build_calendar_weeks(2025, 6)
|
||||
today_days = [d for w in weeks for d in w if d["is_today"]]
|
||||
assert len(today_days) == 1
|
||||
assert today_days[0]["date"] == date(2025, 6, 15)
|
||||
152
shared/tests/test_config.py
Normal file
152
shared/tests/test_config.py
Normal file
@@ -0,0 +1,152 @@
|
||||
"""Tests for config freeze/readonly enforcement."""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import tempfile
|
||||
from types import MappingProxyType
|
||||
|
||||
import pytest
|
||||
|
||||
from shared.config import _freeze
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _freeze
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestFreeze:
|
||||
def test_freezes_dict(self):
|
||||
result = _freeze({"a": 1, "b": 2})
|
||||
assert isinstance(result, MappingProxyType)
|
||||
assert result["a"] == 1
|
||||
|
||||
def test_frozen_dict_is_immutable(self):
|
||||
result = _freeze({"a": 1})
|
||||
with pytest.raises(TypeError):
|
||||
result["a"] = 2
|
||||
with pytest.raises(TypeError):
|
||||
result["new"] = 3
|
||||
|
||||
def test_freezes_list_to_tuple(self):
|
||||
result = _freeze([1, 2, 3])
|
||||
assert isinstance(result, tuple)
|
||||
assert result == (1, 2, 3)
|
||||
|
||||
def test_freezes_set_to_frozenset(self):
|
||||
result = _freeze({1, 2, 3})
|
||||
assert isinstance(result, frozenset)
|
||||
assert result == frozenset({1, 2, 3})
|
||||
|
||||
def test_freezes_nested_dict(self):
|
||||
result = _freeze({"a": {"b": {"c": 1}}})
|
||||
assert isinstance(result, MappingProxyType)
|
||||
assert isinstance(result["a"], MappingProxyType)
|
||||
assert isinstance(result["a"]["b"], MappingProxyType)
|
||||
assert result["a"]["b"]["c"] == 1
|
||||
|
||||
def test_freezes_dict_with_list(self):
|
||||
result = _freeze({"items": [1, 2, 3]})
|
||||
assert isinstance(result["items"], tuple)
|
||||
|
||||
def test_freezes_list_of_dicts(self):
|
||||
result = _freeze([{"a": 1}, {"b": 2}])
|
||||
assert isinstance(result, tuple)
|
||||
assert isinstance(result[0], MappingProxyType)
|
||||
|
||||
def test_preserves_scalars(self):
|
||||
assert _freeze(42) == 42
|
||||
assert _freeze("hello") == "hello"
|
||||
assert _freeze(3.14) == 3.14
|
||||
assert _freeze(True) is True
|
||||
assert _freeze(None) is None
|
||||
|
||||
def test_freezes_tuple_recursively(self):
|
||||
result = _freeze(({"a": 1}, [2, 3]))
|
||||
assert isinstance(result, tuple)
|
||||
assert isinstance(result[0], MappingProxyType)
|
||||
assert isinstance(result[1], tuple)
|
||||
|
||||
def test_complex_config_structure(self):
|
||||
"""Simulates a real app-config.yaml structure."""
|
||||
raw = {
|
||||
"app_urls": {
|
||||
"blog": "https://blog.rose-ash.com",
|
||||
"market": "https://market.rose-ash.com",
|
||||
},
|
||||
"features": ["sexp", "federation"],
|
||||
"limits": {"max_upload": 10485760},
|
||||
}
|
||||
frozen = _freeze(raw)
|
||||
assert frozen["app_urls"]["blog"] == "https://blog.rose-ash.com"
|
||||
assert frozen["features"] == ("sexp", "federation")
|
||||
with pytest.raises(TypeError):
|
||||
frozen["app_urls"]["blog"] = "changed"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# init_config / config / as_plain / pretty
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestConfigInit:
|
||||
def test_init_and_read(self):
|
||||
"""Test full init_config → config() → as_plain() → pretty() cycle."""
|
||||
import shared.config as cfg
|
||||
|
||||
# Save original state
|
||||
orig_frozen = cfg._data_frozen
|
||||
orig_plain = cfg._data_plain
|
||||
|
||||
try:
|
||||
# Reset state
|
||||
cfg._data_frozen = None
|
||||
cfg._data_plain = None
|
||||
|
||||
# Write a temp YAML file
|
||||
with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
|
||||
f.write("app_urls:\n blog: https://blog.example.com\nport: 8001\n")
|
||||
path = f.name
|
||||
|
||||
try:
|
||||
asyncio.run(cfg.init_config(path, force=True))
|
||||
|
||||
c = cfg.config()
|
||||
assert c["app_urls"]["blog"] == "https://blog.example.com"
|
||||
assert c["port"] == 8001
|
||||
assert isinstance(c, MappingProxyType)
|
||||
|
||||
plain = cfg.as_plain()
|
||||
assert isinstance(plain, dict)
|
||||
assert plain["port"] == 8001
|
||||
# Modifying plain should not affect config
|
||||
plain["port"] = 9999
|
||||
assert cfg.config()["port"] == 8001
|
||||
|
||||
pretty_str = cfg.pretty()
|
||||
assert "blog" in pretty_str
|
||||
finally:
|
||||
os.unlink(path)
|
||||
finally:
|
||||
# Restore original state
|
||||
cfg._data_frozen = orig_frozen
|
||||
cfg._data_plain = orig_plain
|
||||
|
||||
def test_config_raises_before_init(self):
|
||||
import shared.config as cfg
|
||||
orig = cfg._data_frozen
|
||||
try:
|
||||
cfg._data_frozen = None
|
||||
with pytest.raises(RuntimeError, match="init_config"):
|
||||
cfg.config()
|
||||
finally:
|
||||
cfg._data_frozen = orig
|
||||
|
||||
def test_file_not_found(self):
|
||||
import shared.config as cfg
|
||||
orig = cfg._data_frozen
|
||||
try:
|
||||
cfg._data_frozen = None
|
||||
with pytest.raises(FileNotFoundError):
|
||||
asyncio.run(cfg.init_config("/nonexistent/path.yaml", force=True))
|
||||
finally:
|
||||
cfg._data_frozen = orig
|
||||
215
shared/tests/test_dtos.py
Normal file
215
shared/tests/test_dtos.py
Normal file
@@ -0,0 +1,215 @@
|
||||
"""Tests for DTO serialization helpers and dataclass contracts."""
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
from datetime import datetime
|
||||
from decimal import Decimal
|
||||
|
||||
from shared.contracts.dtos import (
|
||||
PostDTO,
|
||||
CalendarDTO,
|
||||
CalendarEntryDTO,
|
||||
TicketDTO,
|
||||
MarketPlaceDTO,
|
||||
ProductDTO,
|
||||
CartItemDTO,
|
||||
CartSummaryDTO,
|
||||
ActorProfileDTO,
|
||||
APActivityDTO,
|
||||
RemotePostDTO,
|
||||
dto_to_dict,
|
||||
dto_from_dict,
|
||||
_serialize_value,
|
||||
_unwrap_optional,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _serialize_value
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSerializeValue:
|
||||
def test_datetime(self):
|
||||
dt = datetime(2025, 6, 15, 12, 30, 0)
|
||||
assert _serialize_value(dt) == "2025-06-15T12:30:00"
|
||||
|
||||
def test_decimal(self):
|
||||
assert _serialize_value(Decimal("19.99")) == "19.99"
|
||||
|
||||
def test_set_to_list(self):
|
||||
result = _serialize_value({1, 2, 3})
|
||||
assert isinstance(result, list)
|
||||
assert set(result) == {1, 2, 3}
|
||||
|
||||
def test_string_passthrough(self):
|
||||
assert _serialize_value("hello") == "hello"
|
||||
|
||||
def test_int_passthrough(self):
|
||||
assert _serialize_value(42) == 42
|
||||
|
||||
def test_none_passthrough(self):
|
||||
assert _serialize_value(None) is None
|
||||
|
||||
def test_nested_list(self):
|
||||
dt = datetime(2025, 1, 1)
|
||||
result = _serialize_value([dt, Decimal("5")])
|
||||
assert result == ["2025-01-01T00:00:00", "5"]
|
||||
|
||||
def test_nested_dataclass(self):
|
||||
post = PostDTO(id=1, slug="test", title="Test", status="published", visibility="public")
|
||||
result = _serialize_value(post)
|
||||
assert isinstance(result, dict)
|
||||
assert result["slug"] == "test"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _unwrap_optional
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestUnwrapOptional:
|
||||
def test_optional_str(self):
|
||||
from typing import Optional
|
||||
result = _unwrap_optional(Optional[str])
|
||||
assert result is str
|
||||
|
||||
def test_optional_int(self):
|
||||
from typing import Optional
|
||||
result = _unwrap_optional(Optional[int])
|
||||
assert result is int
|
||||
|
||||
def test_plain_type(self):
|
||||
assert _unwrap_optional(str) is str
|
||||
|
||||
def test_union_with_none(self):
|
||||
from typing import Union
|
||||
result = _unwrap_optional(Union[datetime, None])
|
||||
assert result is datetime
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# dto_to_dict
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestDtoToDict:
|
||||
def test_simple_post(self):
|
||||
post = PostDTO(id=1, slug="my-post", title="My Post", status="published", visibility="public")
|
||||
d = dto_to_dict(post)
|
||||
assert d["id"] == 1
|
||||
assert d["slug"] == "my-post"
|
||||
assert d["feature_image"] is None
|
||||
|
||||
def test_with_datetime(self):
|
||||
dt = datetime(2025, 6, 15, 10, 0, 0)
|
||||
post = PostDTO(id=1, slug="s", title="T", status="published", visibility="public", published_at=dt)
|
||||
d = dto_to_dict(post)
|
||||
assert d["published_at"] == "2025-06-15T10:00:00"
|
||||
|
||||
def test_with_decimal(self):
|
||||
product = ProductDTO(id=1, slug="widget", rrp=Decimal("29.99"), regular_price=Decimal("24.99"))
|
||||
d = dto_to_dict(product)
|
||||
assert d["rrp"] == "29.99"
|
||||
assert d["regular_price"] == "24.99"
|
||||
|
||||
def test_cart_summary_with_items(self):
|
||||
item = CartItemDTO(id=1, product_id=10, quantity=2, unit_price=Decimal("5.00"))
|
||||
summary = CartSummaryDTO(count=1, total=Decimal("10.00"), items=[item])
|
||||
d = dto_to_dict(summary)
|
||||
assert d["count"] == 1
|
||||
assert d["total"] == "10.00"
|
||||
assert len(d["items"]) == 1
|
||||
assert d["items"][0]["product_id"] == 10
|
||||
|
||||
def test_remote_post_with_nested_lists(self):
|
||||
rp = RemotePostDTO(
|
||||
id=1, remote_actor_id=5, object_id="https://example.com/1",
|
||||
content="<p>Hello</p>",
|
||||
attachments=[{"type": "Image", "url": "https://img.example.com/1.jpg"}],
|
||||
tags=[{"type": "Hashtag", "name": "#test"}],
|
||||
)
|
||||
d = dto_to_dict(rp)
|
||||
assert d["attachments"] == [{"type": "Image", "url": "https://img.example.com/1.jpg"}]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# dto_from_dict
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestDtoFromDict:
|
||||
def test_none_data(self):
|
||||
assert dto_from_dict(PostDTO, None) is None
|
||||
|
||||
def test_empty_dict(self):
|
||||
assert dto_from_dict(PostDTO, {}) is None
|
||||
|
||||
def test_simple_post(self):
|
||||
d = {"id": 1, "slug": "test", "title": "Test", "status": "published", "visibility": "public"}
|
||||
post = dto_from_dict(PostDTO, d)
|
||||
assert post.slug == "test"
|
||||
assert post.feature_image is None
|
||||
|
||||
def test_datetime_coercion(self):
|
||||
d = {
|
||||
"id": 1, "slug": "s", "title": "T", "status": "published",
|
||||
"visibility": "public", "published_at": "2025-06-15T10:00:00",
|
||||
}
|
||||
post = dto_from_dict(PostDTO, d)
|
||||
assert isinstance(post.published_at, datetime)
|
||||
assert post.published_at.year == 2025
|
||||
|
||||
def test_decimal_coercion(self):
|
||||
d = {"id": 1, "slug": "w", "rrp": "29.99", "regular_price": 24.99}
|
||||
product = dto_from_dict(ProductDTO, d)
|
||||
assert isinstance(product.rrp, Decimal)
|
||||
assert product.rrp == Decimal("29.99")
|
||||
assert isinstance(product.regular_price, Decimal)
|
||||
|
||||
def test_round_trip(self):
|
||||
dt = datetime(2025, 3, 1, 9, 30, 0)
|
||||
original = PostDTO(id=1, slug="rt", title="Round Trip", status="draft", visibility="members", published_at=dt)
|
||||
d = dto_to_dict(original)
|
||||
restored = dto_from_dict(PostDTO, d)
|
||||
assert restored.id == original.id
|
||||
assert restored.slug == original.slug
|
||||
assert restored.published_at == original.published_at
|
||||
|
||||
def test_extra_keys_ignored(self):
|
||||
d = {"id": 1, "slug": "s", "title": "T", "status": "published", "visibility": "public", "extra_field": "ignored"}
|
||||
post = dto_from_dict(PostDTO, d)
|
||||
assert post.slug == "s"
|
||||
|
||||
def test_calendar_entry_decimals(self):
|
||||
d = {
|
||||
"id": 1, "calendar_id": 2, "name": "Event", "start_at": "2025-07-01T14:00:00",
|
||||
"state": "confirmed", "cost": "15.00", "ticket_price": "10.50",
|
||||
}
|
||||
entry = dto_from_dict(CalendarEntryDTO, d)
|
||||
assert isinstance(entry.cost, Decimal)
|
||||
assert entry.cost == Decimal("15.00")
|
||||
assert isinstance(entry.ticket_price, Decimal)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Frozen DTOs
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestFrozenDTOs:
|
||||
def test_post_is_frozen(self):
|
||||
post = PostDTO(id=1, slug="s", title="T", status="published", visibility="public")
|
||||
with pytest.raises(AttributeError):
|
||||
post.title = "changed"
|
||||
|
||||
def test_product_is_frozen(self):
|
||||
product = ProductDTO(id=1, slug="s")
|
||||
with pytest.raises(AttributeError):
|
||||
product.slug = "changed"
|
||||
|
||||
def test_calendar_dto_defaults(self):
|
||||
cal = CalendarDTO(id=1, container_type="page", container_id=5, name="My Cal", slug="my-cal")
|
||||
assert cal.description is None
|
||||
|
||||
def test_cart_summary_defaults(self):
|
||||
summary = CartSummaryDTO()
|
||||
assert summary.count == 0
|
||||
assert summary.total == Decimal("0")
|
||||
assert summary.items == []
|
||||
assert summary.ticket_count == 0
|
||||
96
shared/tests/test_errors.py
Normal file
96
shared/tests/test_errors.py
Normal file
@@ -0,0 +1,96 @@
|
||||
"""Tests for error classes and error page rendering."""
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from shared.browser.app.errors import AppError, _error_page
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AppError
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestAppError:
|
||||
def test_single_message(self):
|
||||
err = AppError("Something went wrong")
|
||||
assert str(err) == "Something went wrong"
|
||||
assert err.messages == ["Something went wrong"]
|
||||
assert err.status_code == 400
|
||||
|
||||
def test_custom_status_code(self):
|
||||
err = AppError("Not found", status_code=404)
|
||||
assert err.status_code == 404
|
||||
assert str(err) == "Not found"
|
||||
|
||||
def test_list_of_messages(self):
|
||||
err = AppError(["Error 1", "Error 2", "Error 3"])
|
||||
assert err.messages == ["Error 1", "Error 2", "Error 3"]
|
||||
assert str(err) == "Error 1" # first message as str
|
||||
|
||||
def test_tuple_of_messages(self):
|
||||
err = AppError(("A", "B"))
|
||||
assert err.messages == ["A", "B"]
|
||||
|
||||
def test_set_of_messages(self):
|
||||
err = AppError({"only one"})
|
||||
assert err.messages == ["only one"]
|
||||
|
||||
def test_empty_list(self):
|
||||
err = AppError([])
|
||||
assert err.messages == []
|
||||
assert str(err) == ""
|
||||
|
||||
def test_is_value_error(self):
|
||||
"""AppError should be catchable as ValueError for backwards compat."""
|
||||
err = AppError("test")
|
||||
assert isinstance(err, ValueError)
|
||||
|
||||
def test_default_status_is_400(self):
|
||||
err = AppError("test")
|
||||
assert err.status_code == 400
|
||||
|
||||
def test_integer_message_coerced(self):
|
||||
err = AppError([42, "text"])
|
||||
assert err.messages == ["42", "text"]
|
||||
|
||||
def test_status_code_override(self):
|
||||
err = AppError("conflict", status_code=409)
|
||||
assert err.status_code == 409
|
||||
|
||||
def test_messages_are_strings(self):
|
||||
err = AppError([None, 123, True])
|
||||
assert all(isinstance(m, str) for m in err.messages)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _error_page
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestErrorPage:
|
||||
def test_returns_html_string(self):
|
||||
html = _error_page("Not Found")
|
||||
assert isinstance(html, str)
|
||||
assert "<!DOCTYPE html>" in html
|
||||
|
||||
def test_contains_message(self):
|
||||
html = _error_page("Something broke")
|
||||
assert "Something broke" in html
|
||||
|
||||
def test_contains_error_gif(self):
|
||||
html = _error_page("Error")
|
||||
assert "/static/errors/error.gif" in html
|
||||
|
||||
def test_contains_reload_link(self):
|
||||
html = _error_page("Error")
|
||||
assert "Reload" in html
|
||||
|
||||
def test_html_in_message(self):
|
||||
"""Messages can contain HTML (used by fragment_error handler)."""
|
||||
html = _error_page("The <b>account</b> service is unavailable")
|
||||
assert "<b>account</b>" in html
|
||||
|
||||
def test_self_contained(self):
|
||||
"""Error page should include its own styles (no external CSS deps)."""
|
||||
html = _error_page("Error")
|
||||
assert "<style>" in html
|
||||
assert "</style>" in html
|
||||
291
shared/tests/test_filters.py
Normal file
291
shared/tests/test_filters.py
Normal file
@@ -0,0 +1,291 @@
|
||||
"""Tests for Jinja template filters (pure-logic functions)."""
|
||||
from __future__ import annotations
|
||||
|
||||
from decimal import Decimal
|
||||
|
||||
from markupsafe import Markup
|
||||
|
||||
from shared.browser.app.filters.highlight import highlight
|
||||
from shared.browser.app.filters.combine import _deep_merge
|
||||
from shared.browser.app.filters.qs_base import (
|
||||
_iterify,
|
||||
_norm,
|
||||
make_filter_set,
|
||||
build_qs,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# highlight
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestHighlight:
|
||||
def test_basic_highlight(self):
|
||||
result = highlight("Hello World", "world")
|
||||
assert isinstance(result, Markup)
|
||||
assert "<mark" in result
|
||||
assert "World" in result
|
||||
|
||||
def test_case_insensitive(self):
|
||||
result = highlight("Hello World", "HELLO")
|
||||
assert "<mark" in result
|
||||
|
||||
def test_empty_needle(self):
|
||||
result = highlight("Hello", "")
|
||||
assert result == "Hello"
|
||||
|
||||
def test_empty_text(self):
|
||||
result = highlight("", "needle")
|
||||
assert result == ""
|
||||
|
||||
def test_none_text(self):
|
||||
result = highlight(None, "needle")
|
||||
assert result == ""
|
||||
|
||||
def test_no_match(self):
|
||||
result = highlight("Hello World", "xyz")
|
||||
assert "<mark" not in result
|
||||
assert "Hello World" in result
|
||||
|
||||
def test_escapes_html(self):
|
||||
result = highlight("<script>alert('xss')</script>", "script")
|
||||
assert "<script>" not in str(result)
|
||||
assert "<script>" in str(result) or "<" in str(result)
|
||||
|
||||
def test_custom_class(self):
|
||||
result = highlight("Hello", "Hello", cls="highlight-red")
|
||||
assert "highlight-red" in result
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# truncate (tested as pure function)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestTruncate:
|
||||
"""Test the truncate logic directly."""
|
||||
|
||||
@staticmethod
|
||||
def _truncate(text, max_length=100):
|
||||
if text is None:
|
||||
return ""
|
||||
text = str(text)
|
||||
if len(text) <= max_length:
|
||||
return text
|
||||
if max_length <= 1:
|
||||
return "…"
|
||||
return text[:max_length - 1] + "…"
|
||||
|
||||
def test_short_text(self):
|
||||
assert self._truncate("hello", 10) == "hello"
|
||||
|
||||
def test_exact_length(self):
|
||||
assert self._truncate("hello", 5) == "hello"
|
||||
|
||||
def test_truncates_long(self):
|
||||
result = self._truncate("hello world", 6)
|
||||
assert result == "hello…"
|
||||
assert len(result) == 6
|
||||
|
||||
def test_none_input(self):
|
||||
assert self._truncate(None) == ""
|
||||
|
||||
def test_max_length_one(self):
|
||||
assert self._truncate("hello", 1) == "…"
|
||||
|
||||
def test_max_length_zero(self):
|
||||
assert self._truncate("hello", 0) == "…"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# currency (tested as pure function)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCurrency:
|
||||
@staticmethod
|
||||
def _currency(value, code="GBP"):
|
||||
if value is None:
|
||||
return ""
|
||||
if isinstance(value, float):
|
||||
value = Decimal(str(value))
|
||||
symbol = "£" if code == "GBP" else code
|
||||
return f"{symbol}{value:.2f}"
|
||||
|
||||
def test_gbp(self):
|
||||
assert self._currency(Decimal("19.99")) == "£19.99"
|
||||
|
||||
def test_none(self):
|
||||
assert self._currency(None) == ""
|
||||
|
||||
def test_float_conversion(self):
|
||||
assert self._currency(19.99) == "£19.99"
|
||||
|
||||
def test_non_gbp(self):
|
||||
assert self._currency(Decimal("10.00"), "EUR") == "EUR10.00"
|
||||
|
||||
def test_zero(self):
|
||||
assert self._currency(Decimal("0")) == "£0.00"
|
||||
|
||||
def test_integer_decimal(self):
|
||||
assert self._currency(Decimal("5")) == "£5.00"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# combine / _deep_merge
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestDeepMerge:
|
||||
def test_simple_merge(self):
|
||||
assert _deep_merge({"a": 1}, {"b": 2}) == {"a": 1, "b": 2}
|
||||
|
||||
def test_overwrite(self):
|
||||
assert _deep_merge({"a": 1}, {"a": 2}) == {"a": 2}
|
||||
|
||||
def test_nested_merge(self):
|
||||
a = {"x": {"a": 1, "b": 2}}
|
||||
b = {"x": {"b": 3, "c": 4}}
|
||||
result = _deep_merge(a, b)
|
||||
assert result == {"x": {"a": 1, "b": 3, "c": 4}}
|
||||
|
||||
def test_deeply_nested(self):
|
||||
a = {"x": {"y": {"z": 1}}}
|
||||
b = {"x": {"y": {"w": 2}}}
|
||||
result = _deep_merge(a, b)
|
||||
assert result == {"x": {"y": {"z": 1, "w": 2}}}
|
||||
|
||||
def test_does_not_mutate_original(self):
|
||||
a = {"a": 1}
|
||||
b = {"b": 2}
|
||||
_deep_merge(a, b)
|
||||
assert a == {"a": 1}
|
||||
|
||||
|
||||
class TestCombineFilter:
|
||||
"""Test the combine filter logic inline (it's defined inside register())."""
|
||||
from typing import Any, Mapping
|
||||
|
||||
@staticmethod
|
||||
def _combine(a, b, deep=False, drop_none=False):
|
||||
from collections.abc import Mapping
|
||||
if not isinstance(a, Mapping) or not isinstance(b, Mapping):
|
||||
return a
|
||||
b2 = {k: v for k, v in b.items() if not (drop_none and v is None)}
|
||||
return _deep_merge(a, b2) if deep else {**a, **b2}
|
||||
|
||||
def test_non_dict_returns_a(self):
|
||||
assert self._combine("hello", {"a": 1}) == "hello"
|
||||
assert self._combine(42, {"a": 1}) == 42
|
||||
|
||||
def test_shallow_merge(self):
|
||||
result = self._combine({"a": 1}, {"b": 2})
|
||||
assert result == {"a": 1, "b": 2}
|
||||
|
||||
def test_deep_merge(self):
|
||||
result = self._combine({"x": {"a": 1}}, {"x": {"b": 2}}, deep=True)
|
||||
assert result == {"x": {"a": 1, "b": 2}}
|
||||
|
||||
def test_drop_none(self):
|
||||
result = self._combine({"a": 1, "b": 2}, {"b": None, "c": 3}, drop_none=True)
|
||||
assert result == {"a": 1, "b": 2, "c": 3}
|
||||
|
||||
def test_keep_none_when_not_dropping(self):
|
||||
result = self._combine({"a": 1}, {"a": None})
|
||||
assert result == {"a": None}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# qs_base
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestIterify:
|
||||
def test_none(self):
|
||||
assert _iterify(None) == []
|
||||
|
||||
def test_scalar(self):
|
||||
assert _iterify("hello") == ["hello"]
|
||||
|
||||
def test_list(self):
|
||||
assert _iterify([1, 2]) == [1, 2]
|
||||
|
||||
def test_tuple(self):
|
||||
assert _iterify((1, 2)) == (1, 2)
|
||||
|
||||
def test_set(self):
|
||||
assert _iterify({1, 2}) == {1, 2}
|
||||
|
||||
|
||||
class TestNorm:
|
||||
def test_strips_and_lowercases(self):
|
||||
assert _norm(" Hello ") == "hello"
|
||||
|
||||
def test_already_lower(self):
|
||||
assert _norm("hello") == "hello"
|
||||
|
||||
|
||||
class TestMakeFilterSet:
|
||||
def test_add_to_empty(self):
|
||||
result = make_filter_set([], "new", None, False)
|
||||
assert result == ["new"]
|
||||
|
||||
def test_add_preserves_existing(self):
|
||||
result = make_filter_set(["a", "b"], "c", None, False)
|
||||
assert result == ["a", "b", "c"]
|
||||
|
||||
def test_remove(self):
|
||||
result = make_filter_set(["a", "b", "c"], None, "b", False)
|
||||
assert result == ["a", "c"]
|
||||
|
||||
def test_clear_filters(self):
|
||||
result = make_filter_set(["a", "b"], None, None, True)
|
||||
assert result == []
|
||||
|
||||
def test_clear_then_add(self):
|
||||
result = make_filter_set(["a", "b"], "c", None, True)
|
||||
assert result == ["c"]
|
||||
|
||||
def test_case_insensitive_dedup(self):
|
||||
result = make_filter_set(["Hello"], "hello", None, False)
|
||||
assert len(result) == 1
|
||||
|
||||
def test_sorted_output(self):
|
||||
result = make_filter_set([], ["c", "a", "b"], None, False)
|
||||
assert result == ["a", "b", "c"]
|
||||
|
||||
def test_single_select_replaces(self):
|
||||
result = make_filter_set(["old1", "old2"], "new", None, False, single_select=True)
|
||||
assert result == ["new"]
|
||||
|
||||
def test_single_select_no_add_keeps_base(self):
|
||||
result = make_filter_set(["a", "b"], None, None, False, single_select=True)
|
||||
assert result == ["a", "b"]
|
||||
|
||||
def test_remove_case_insensitive(self):
|
||||
result = make_filter_set(["Hello", "World"], None, "hello", False)
|
||||
assert result == ["World"]
|
||||
|
||||
def test_add_none_values_filtered(self):
|
||||
result = make_filter_set([], [None, "a", None], None, False)
|
||||
assert result == ["a"]
|
||||
|
||||
|
||||
class TestBuildQs:
|
||||
def test_basic(self):
|
||||
result = build_qs([("key", "value")])
|
||||
assert result == "?key=value"
|
||||
|
||||
def test_multiple_params(self):
|
||||
result = build_qs([("a", "1"), ("b", "2")])
|
||||
assert "a=1" in result
|
||||
assert "b=2" in result
|
||||
assert result.startswith("?")
|
||||
|
||||
def test_no_leading_q(self):
|
||||
result = build_qs([("key", "value")], leading_q=False)
|
||||
assert result == "key=value"
|
||||
|
||||
def test_empty_params(self):
|
||||
result = build_qs([])
|
||||
assert result == ""
|
||||
|
||||
def test_empty_params_no_leading(self):
|
||||
result = build_qs([], leading_q=False)
|
||||
assert result == ""
|
||||
140
shared/tests/test_http_signatures.py
Normal file
140
shared/tests/test_http_signatures.py
Normal file
@@ -0,0 +1,140 @@
|
||||
"""Tests for RSA key generation and HTTP Signature signing/verification."""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
|
||||
from shared.utils.http_signatures import (
|
||||
generate_rsa_keypair,
|
||||
sign_request,
|
||||
verify_request_signature,
|
||||
create_ld_signature,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Key generation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestKeyGeneration:
|
||||
def test_generates_pem_strings(self):
|
||||
private_pem, public_pem = generate_rsa_keypair()
|
||||
assert isinstance(private_pem, str)
|
||||
assert isinstance(public_pem, str)
|
||||
|
||||
def test_private_key_format(self):
|
||||
private_pem, _ = generate_rsa_keypair()
|
||||
assert "BEGIN PRIVATE KEY" in private_pem
|
||||
assert "END PRIVATE KEY" in private_pem
|
||||
|
||||
def test_public_key_format(self):
|
||||
_, public_pem = generate_rsa_keypair()
|
||||
assert "BEGIN PUBLIC KEY" in public_pem
|
||||
assert "END PUBLIC KEY" in public_pem
|
||||
|
||||
def test_keys_are_unique(self):
|
||||
priv1, pub1 = generate_rsa_keypair()
|
||||
priv2, pub2 = generate_rsa_keypair()
|
||||
assert priv1 != priv2
|
||||
assert pub1 != pub2
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Sign + verify round-trip
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSignVerify:
|
||||
def test_round_trip_no_body(self):
|
||||
private_pem, public_pem = generate_rsa_keypair()
|
||||
headers = sign_request(
|
||||
private_pem, key_id="https://example.com/users/alice#main-key",
|
||||
method="GET", path="/users/bob/inbox", host="example.com",
|
||||
date="Sat, 15 Jun 2025 12:00:00 GMT",
|
||||
)
|
||||
assert "Signature" in headers
|
||||
assert "Date" in headers
|
||||
assert "Host" in headers
|
||||
assert "Digest" not in headers
|
||||
|
||||
ok = verify_request_signature(
|
||||
public_pem, headers["Signature"], method="GET",
|
||||
path="/users/bob/inbox", headers=headers,
|
||||
)
|
||||
assert ok is True
|
||||
|
||||
def test_round_trip_with_body(self):
|
||||
private_pem, public_pem = generate_rsa_keypair()
|
||||
body = b'{"type": "Follow"}'
|
||||
headers = sign_request(
|
||||
private_pem, key_id="https://example.com/users/alice#main-key",
|
||||
method="POST", path="/users/bob/inbox", host="example.com",
|
||||
body=body, date="Sat, 15 Jun 2025 12:00:00 GMT",
|
||||
)
|
||||
assert "Digest" in headers
|
||||
assert headers["Digest"].startswith("SHA-256=")
|
||||
|
||||
ok = verify_request_signature(
|
||||
public_pem, headers["Signature"], method="POST",
|
||||
path="/users/bob/inbox", headers=headers,
|
||||
)
|
||||
assert ok is True
|
||||
|
||||
def test_wrong_key_fails(self):
|
||||
priv1, _ = generate_rsa_keypair()
|
||||
_, pub2 = generate_rsa_keypair()
|
||||
headers = sign_request(
|
||||
priv1, key_id="key1", method="GET", path="/inbox", host="a.com",
|
||||
date="Sat, 15 Jun 2025 12:00:00 GMT",
|
||||
)
|
||||
ok = verify_request_signature(pub2, headers["Signature"], "GET", "/inbox", headers)
|
||||
assert ok is False
|
||||
|
||||
def test_tampered_path_fails(self):
|
||||
private_pem, public_pem = generate_rsa_keypair()
|
||||
headers = sign_request(
|
||||
private_pem, key_id="key1", method="GET", path="/inbox", host="a.com",
|
||||
date="Sat, 15 Jun 2025 12:00:00 GMT",
|
||||
)
|
||||
ok = verify_request_signature(public_pem, headers["Signature"], "GET", "/tampered", headers)
|
||||
assert ok is False
|
||||
|
||||
def test_tampered_method_fails(self):
|
||||
private_pem, public_pem = generate_rsa_keypair()
|
||||
headers = sign_request(
|
||||
private_pem, key_id="key1", method="GET", path="/inbox", host="a.com",
|
||||
date="Sat, 15 Jun 2025 12:00:00 GMT",
|
||||
)
|
||||
ok = verify_request_signature(public_pem, headers["Signature"], "POST", "/inbox", headers)
|
||||
assert ok is False
|
||||
|
||||
def test_signature_header_contains_key_id(self):
|
||||
private_pem, _ = generate_rsa_keypair()
|
||||
headers = sign_request(
|
||||
private_pem, key_id="https://my.server/actor#main-key",
|
||||
method="POST", path="/inbox", host="remote.server",
|
||||
date="Sat, 15 Jun 2025 12:00:00 GMT",
|
||||
)
|
||||
assert 'keyId="https://my.server/actor#main-key"' in headers["Signature"]
|
||||
assert 'algorithm="rsa-sha256"' in headers["Signature"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Linked Data signature
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestLDSignature:
|
||||
def test_creates_ld_signature(self):
|
||||
private_pem, _ = generate_rsa_keypair()
|
||||
activity = {"type": "Create", "actor": "https://example.com/users/alice"}
|
||||
sig = create_ld_signature(private_pem, "https://example.com/users/alice#main-key", activity)
|
||||
assert sig["type"] == "RsaSignature2017"
|
||||
assert sig["creator"] == "https://example.com/users/alice#main-key"
|
||||
assert "signatureValue" in sig
|
||||
assert "created" in sig
|
||||
|
||||
def test_deterministic_canonical(self):
|
||||
"""Same activity always produces same canonical form (signature differs due to timestamp)."""
|
||||
private_pem, _ = generate_rsa_keypair()
|
||||
activity = {"b": 2, "a": 1}
|
||||
# The canonical form should sort keys
|
||||
canonical = json.dumps(activity, sort_keys=True, separators=(",", ":"))
|
||||
assert canonical == '{"a":1,"b":2}'
|
||||
114
shared/tests/test_internal_auth.py
Normal file
114
shared/tests/test_internal_auth.py
Normal file
@@ -0,0 +1,114 @@
|
||||
"""Tests for HMAC-based internal service-to-service authentication."""
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import hmac
|
||||
import time
|
||||
from unittest.mock import patch
|
||||
|
||||
from shared.infrastructure.internal_auth import sign_internal_headers
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# sign_internal_headers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSignInternalHeaders:
|
||||
def test_returns_required_headers(self):
|
||||
headers = sign_internal_headers("cart")
|
||||
assert "X-Internal-Timestamp" in headers
|
||||
assert "X-Internal-App" in headers
|
||||
assert "X-Internal-Signature" in headers
|
||||
|
||||
def test_app_name_in_header(self):
|
||||
headers = sign_internal_headers("blog")
|
||||
assert headers["X-Internal-App"] == "blog"
|
||||
|
||||
def test_timestamp_is_recent(self):
|
||||
headers = sign_internal_headers("events")
|
||||
ts = int(headers["X-Internal-Timestamp"])
|
||||
now = int(time.time())
|
||||
assert abs(now - ts) < 5
|
||||
|
||||
def test_signature_is_hex(self):
|
||||
headers = sign_internal_headers("cart")
|
||||
sig = headers["X-Internal-Signature"]
|
||||
# SHA-256 hex is 64 chars
|
||||
assert len(sig) == 64
|
||||
int(sig, 16) # should not raise
|
||||
|
||||
def test_different_apps_different_signatures(self):
|
||||
h1 = sign_internal_headers("cart")
|
||||
h2 = sign_internal_headers("blog")
|
||||
assert h1["X-Internal-Signature"] != h2["X-Internal-Signature"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Round-trip: sign then validate
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSignAndValidate:
|
||||
"""Test the HMAC signing logic directly without needing a Quart request context."""
|
||||
|
||||
def _validate_headers(self, headers: dict[str, str], secret: bytes, max_age: int = 300) -> bool:
|
||||
"""Replicate validate_internal_request logic without Quart request context."""
|
||||
ts = headers.get("X-Internal-Timestamp", "")
|
||||
app_name = headers.get("X-Internal-App", "")
|
||||
sig = headers.get("X-Internal-Signature", "")
|
||||
|
||||
if not ts or not app_name or not sig:
|
||||
return False
|
||||
|
||||
try:
|
||||
req_time = int(ts)
|
||||
except (ValueError, TypeError):
|
||||
return False
|
||||
|
||||
now = int(time.time())
|
||||
if abs(now - req_time) > max_age:
|
||||
return False
|
||||
|
||||
payload = f"{ts}:{app_name}".encode()
|
||||
expected = hmac.new(secret, payload, hashlib.sha256).hexdigest()
|
||||
return hmac.compare_digest(sig, expected)
|
||||
|
||||
def test_valid_signature(self):
|
||||
from shared.infrastructure.internal_auth import _get_secret
|
||||
secret = _get_secret()
|
||||
headers = sign_internal_headers("relations")
|
||||
assert self._validate_headers(headers, secret) is True
|
||||
|
||||
def test_tampered_signature_fails(self):
|
||||
from shared.infrastructure.internal_auth import _get_secret
|
||||
secret = _get_secret()
|
||||
headers = sign_internal_headers("cart")
|
||||
headers["X-Internal-Signature"] = "0" * 64
|
||||
assert self._validate_headers(headers, secret) is False
|
||||
|
||||
def test_wrong_secret_fails(self):
|
||||
headers = sign_internal_headers("cart")
|
||||
assert self._validate_headers(headers, b"wrong-secret") is False
|
||||
|
||||
def test_expired_timestamp_fails(self):
|
||||
from shared.infrastructure.internal_auth import _get_secret
|
||||
secret = _get_secret()
|
||||
headers = sign_internal_headers("cart")
|
||||
# Set timestamp to 10 minutes ago
|
||||
old_ts = str(int(time.time()) - 600)
|
||||
headers["X-Internal-Timestamp"] = old_ts
|
||||
# Re-sign with old timestamp (so the signature matches the old ts)
|
||||
payload = f"{old_ts}:cart".encode()
|
||||
headers["X-Internal-Signature"] = hmac.new(secret, payload, hashlib.sha256).hexdigest()
|
||||
assert self._validate_headers(headers, secret) is False
|
||||
|
||||
def test_missing_headers_fail(self):
|
||||
from shared.infrastructure.internal_auth import _get_secret
|
||||
secret = _get_secret()
|
||||
assert self._validate_headers({}, secret) is False
|
||||
assert self._validate_headers({"X-Internal-Timestamp": "123"}, secret) is False
|
||||
|
||||
def test_invalid_timestamp_fails(self):
|
||||
from shared.infrastructure.internal_auth import _get_secret
|
||||
secret = _get_secret()
|
||||
headers = {"X-Internal-Timestamp": "not-a-number", "X-Internal-App": "cart", "X-Internal-Signature": "abc"}
|
||||
assert self._validate_headers(headers, secret) is False
|
||||
166
shared/tests/test_jinja_bridge_render.py
Normal file
166
shared/tests/test_jinja_bridge_render.py
Normal file
@@ -0,0 +1,166 @@
|
||||
"""Tests for the render() function and component loading in jinja_bridge.
|
||||
|
||||
These test functionality added in recent commits (render() API,
|
||||
load_sexp_dir, snake→kebab conversion) that isn't covered by the existing
|
||||
shared/sexp/tests/test_jinja_bridge.py.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
|
||||
from shared.sexp.jinja_bridge import (
|
||||
render,
|
||||
register_components,
|
||||
load_sexp_dir,
|
||||
_COMPONENT_ENV,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _clean_env():
|
||||
"""Clear component env before each test."""
|
||||
_COMPONENT_ENV.clear()
|
||||
yield
|
||||
_COMPONENT_ENV.clear()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# render() — call component by name with Python kwargs
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestRender:
|
||||
def test_basic_render(self):
|
||||
register_components('(defcomp ~badge (&key label) (span :class "badge" label))')
|
||||
html = render("badge", label="New")
|
||||
assert html == '<span class="badge">New</span>'
|
||||
|
||||
def test_tilde_prefix_optional(self):
|
||||
register_components('(defcomp ~pill (&key text) (em text))')
|
||||
# Both forms should work
|
||||
assert render("pill", text="Hi") == render("~pill", text="Hi")
|
||||
|
||||
def test_snake_to_kebab_conversion(self):
|
||||
"""Python snake_case kwargs should map to sexp kebab-case params."""
|
||||
register_components('''
|
||||
(defcomp ~card (&key nav-html link-href)
|
||||
(div :class "card" (a :href link-href nav-html)))
|
||||
''')
|
||||
html = render("card", nav_html="Nav", link_href="/about")
|
||||
assert 'href="/about"' in html
|
||||
assert "Nav" in html
|
||||
|
||||
def test_multiple_kwargs(self):
|
||||
register_components('''
|
||||
(defcomp ~item (&key title price image-url)
|
||||
(div (h3 title) (span price) (img :src image-url)))
|
||||
''')
|
||||
html = render("item", title="Widget", price="£10", image_url="/img/w.jpg")
|
||||
assert "Widget" in html
|
||||
assert "£10" in html
|
||||
assert 'src="/img/w.jpg"' in html
|
||||
|
||||
def test_unknown_component_raises(self):
|
||||
with pytest.raises(ValueError, match="Unknown component"):
|
||||
render("nonexistent", label="x")
|
||||
|
||||
def test_empty_kwargs(self):
|
||||
register_components('(defcomp ~empty () (hr))')
|
||||
html = render("empty")
|
||||
assert html == "<hr>"
|
||||
|
||||
def test_html_escaping_in_values(self):
|
||||
register_components('(defcomp ~safe (&key text) (p text))')
|
||||
html = render("safe", text='<script>alert("xss")</script>')
|
||||
assert "<script>" not in html
|
||||
assert "<script>" in html
|
||||
|
||||
def test_boolean_false_value(self):
|
||||
register_components('''
|
||||
(defcomp ~toggle (&key active)
|
||||
(when active (span "ON")))
|
||||
''')
|
||||
html = render("toggle", active=False)
|
||||
assert "ON" not in html
|
||||
|
||||
def test_boolean_true_value(self):
|
||||
register_components('''
|
||||
(defcomp ~toggle (&key active)
|
||||
(when active (span "ON")))
|
||||
''')
|
||||
html = render("toggle", active=True)
|
||||
assert "ON" in html
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# load_sexp_dir
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestLoadSexpDir:
|
||||
def test_loads_sexp_files(self):
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Write a .sexp file
|
||||
with open(os.path.join(tmpdir, "components.sexp"), "w") as f:
|
||||
f.write('(defcomp ~test-comp (&key msg) (div msg))')
|
||||
|
||||
load_sexp_dir(tmpdir)
|
||||
html = render("test-comp", msg="loaded!")
|
||||
assert html == "<div>loaded!</div>"
|
||||
|
||||
def test_loads_sexpr_files(self):
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
with open(os.path.join(tmpdir, "nav.sexpr"), "w") as f:
|
||||
f.write('(defcomp ~nav-item (&key href label) (a :href href label))')
|
||||
|
||||
load_sexp_dir(tmpdir)
|
||||
html = render("nav-item", href="/about", label="About")
|
||||
assert 'href="/about"' in html
|
||||
|
||||
def test_loads_multiple_files(self):
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
with open(os.path.join(tmpdir, "a.sexp"), "w") as f:
|
||||
f.write('(defcomp ~comp-a (&key x) (b x))')
|
||||
with open(os.path.join(tmpdir, "b.sexp"), "w") as f:
|
||||
f.write('(defcomp ~comp-b (&key y) (i y))')
|
||||
|
||||
load_sexp_dir(tmpdir)
|
||||
assert render("comp-a", x="A") == "<b>A</b>"
|
||||
assert render("comp-b", y="B") == "<i>B</i>"
|
||||
|
||||
def test_empty_directory(self):
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
load_sexp_dir(tmpdir) # should not raise
|
||||
|
||||
def test_ignores_non_sexp_files(self):
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
with open(os.path.join(tmpdir, "readme.txt"), "w") as f:
|
||||
f.write("not a sexp file")
|
||||
with open(os.path.join(tmpdir, "comp.sexp"), "w") as f:
|
||||
f.write('(defcomp ~real (&key v) (span v))')
|
||||
|
||||
load_sexp_dir(tmpdir)
|
||||
assert "~real" in _COMPONENT_ENV
|
||||
# txt file should not have been loaded
|
||||
assert len([k for k in _COMPONENT_ENV if k.startswith("~")]) == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# register_components — multiple definitions in one source
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestRegisterComponents:
|
||||
def test_multiple_in_one_source(self):
|
||||
register_components('''
|
||||
(defcomp ~a (&key x) (b x))
|
||||
(defcomp ~b (&key y) (i y))
|
||||
''')
|
||||
assert "~a" in _COMPONENT_ENV
|
||||
assert "~b" in _COMPONENT_ENV
|
||||
|
||||
def test_overwrite_existing(self):
|
||||
register_components('(defcomp ~ow (&key x) (b x))')
|
||||
assert render("ow", x="v1") == "<b>v1</b>"
|
||||
register_components('(defcomp ~ow (&key x) (i x))')
|
||||
assert render("ow", x="v2") == "<i>v2</i>"
|
||||
95
shared/tests/test_parse_utils.py
Normal file
95
shared/tests/test_parse_utils.py
Normal file
@@ -0,0 +1,95 @@
|
||||
"""Tests for parse utility functions."""
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import time, datetime, timezone
|
||||
|
||||
from shared.browser.app.utils.parse import parse_time, parse_cost, parse_dt
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# parse_time
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestParseTime:
|
||||
def test_valid_time(self):
|
||||
result = parse_time("14:30")
|
||||
assert result == time(14, 30)
|
||||
|
||||
def test_midnight(self):
|
||||
result = parse_time("00:00")
|
||||
assert result == time(0, 0)
|
||||
|
||||
def test_end_of_day(self):
|
||||
result = parse_time("23:59")
|
||||
assert result == time(23, 59)
|
||||
|
||||
def test_none_input(self):
|
||||
assert parse_time(None) is None
|
||||
|
||||
def test_empty_string(self):
|
||||
assert parse_time("") is None
|
||||
|
||||
def test_invalid_format(self):
|
||||
assert parse_time("not-a-time") is None
|
||||
|
||||
def test_invalid_hours(self):
|
||||
assert parse_time("25:00") is None
|
||||
|
||||
def test_single_digit(self):
|
||||
result = parse_time("9:05")
|
||||
assert result == time(9, 5)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# parse_cost
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestParseCost:
|
||||
def test_valid_float(self):
|
||||
assert parse_cost("19.99") == 19.99
|
||||
|
||||
def test_integer_string(self):
|
||||
assert parse_cost("10") == 10.0
|
||||
|
||||
def test_zero(self):
|
||||
assert parse_cost("0") == 0.0
|
||||
|
||||
def test_none_input(self):
|
||||
assert parse_cost(None) is None
|
||||
|
||||
def test_empty_string(self):
|
||||
assert parse_cost("") is None
|
||||
|
||||
def test_invalid_string(self):
|
||||
assert parse_cost("not-a-number") is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# parse_dt
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestParseDt:
|
||||
def test_iso_format(self):
|
||||
result = parse_dt("2025-06-15T14:30:00")
|
||||
assert isinstance(result, datetime)
|
||||
assert result.year == 2025
|
||||
assert result.month == 6
|
||||
assert result.day == 15
|
||||
|
||||
def test_naive_gets_utc(self):
|
||||
result = parse_dt("2025-06-15T14:30:00")
|
||||
assert result.tzinfo == timezone.utc
|
||||
|
||||
def test_aware_preserved(self):
|
||||
result = parse_dt("2025-06-15T14:30:00+01:00")
|
||||
assert result.tzinfo is not None
|
||||
|
||||
def test_none_input(self):
|
||||
assert parse_dt(None) is None
|
||||
|
||||
def test_empty_string(self):
|
||||
assert parse_dt("") is None
|
||||
|
||||
def test_date_only(self):
|
||||
result = parse_dt("2025-06-15")
|
||||
assert result.year == 2025
|
||||
74
shared/tests/test_sexp_helpers.py
Normal file
74
shared/tests/test_sexp_helpers.py
Normal file
@@ -0,0 +1,74 @@
|
||||
"""Tests for shared sexp helper functions (call_url, get_asset_url, etc.)."""
|
||||
from __future__ import annotations
|
||||
|
||||
from shared.sexp.helpers import call_url, get_asset_url
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# call_url
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCallUrl:
|
||||
def test_callable_url_fn(self):
|
||||
ctx = {"blog_url": lambda path: f"https://blog.example.com{path}"}
|
||||
assert call_url(ctx, "blog_url", "/posts/") == "https://blog.example.com/posts/"
|
||||
|
||||
def test_callable_default_path(self):
|
||||
ctx = {"blog_url": lambda path: f"https://blog.example.com{path}"}
|
||||
assert call_url(ctx, "blog_url") == "https://blog.example.com/"
|
||||
|
||||
def test_string_url(self):
|
||||
ctx = {"blog_url": "https://blog.example.com"}
|
||||
assert call_url(ctx, "blog_url", "/posts/") == "https://blog.example.com/posts/"
|
||||
|
||||
def test_string_url_default_path(self):
|
||||
ctx = {"blog_url": "https://blog.example.com"}
|
||||
assert call_url(ctx, "blog_url") == "https://blog.example.com/"
|
||||
|
||||
def test_missing_key(self):
|
||||
ctx = {}
|
||||
assert call_url(ctx, "blog_url", "/x") == "/x"
|
||||
|
||||
def test_none_value(self):
|
||||
ctx = {"blog_url": None}
|
||||
assert call_url(ctx, "blog_url", "/x") == "/x"
|
||||
|
||||
def test_callable_with_empty_path(self):
|
||||
ctx = {"cart_url": lambda path: f"https://cart.example.com{path}"}
|
||||
assert call_url(ctx, "cart_url", "") == "https://cart.example.com"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# get_asset_url
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestGetAssetUrl:
|
||||
def test_callable_asset_url(self):
|
||||
ctx = {"asset_url": lambda path: f"https://cdn.example.com/static{path}"}
|
||||
result = get_asset_url(ctx)
|
||||
# Should strip the trailing path component
|
||||
assert "cdn.example.com" in result
|
||||
|
||||
def test_string_asset_url(self):
|
||||
ctx = {"asset_url": "https://cdn.example.com/static"}
|
||||
assert get_asset_url(ctx) == "https://cdn.example.com/static"
|
||||
|
||||
def test_missing_asset_url(self):
|
||||
ctx = {}
|
||||
assert get_asset_url(ctx) == ""
|
||||
|
||||
def test_none_asset_url(self):
|
||||
ctx = {"asset_url": None}
|
||||
assert get_asset_url(ctx) == ""
|
||||
|
||||
def test_callable_returns_path_only(self):
|
||||
# au("") → "/static", rsplit("/",1)[0] → "" (splits on leading /)
|
||||
ctx = {"asset_url": lambda path: f"/static{path}"}
|
||||
result = get_asset_url(ctx)
|
||||
assert result == ""
|
||||
|
||||
def test_callable_with_nested_path(self):
|
||||
# au("") → "/assets/static", rsplit("/",1)[0] → "/assets"
|
||||
ctx = {"asset_url": lambda path: f"/assets/static{path}"}
|
||||
result = get_asset_url(ctx)
|
||||
assert result == "/assets"
|
||||
108
shared/tests/test_url_utils.py
Normal file
108
shared/tests/test_url_utils.py
Normal file
@@ -0,0 +1,108 @@
|
||||
"""Tests for URL join utilities."""
|
||||
from __future__ import annotations
|
||||
|
||||
from shared.utils import _join_url_parts, join_url, normalize_text, soup_of
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _join_url_parts
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestJoinUrlParts:
|
||||
def test_empty_list(self):
|
||||
assert _join_url_parts([]) == ""
|
||||
|
||||
def test_single_part(self):
|
||||
assert _join_url_parts(["hello"]) == "hello"
|
||||
|
||||
def test_two_parts(self):
|
||||
assert _join_url_parts(["https://example.com", "path"]) == "https://example.com/path"
|
||||
|
||||
def test_preserves_scheme(self):
|
||||
assert _join_url_parts(["https://example.com/", "/api/", "v1"]) == "https://example.com/api/v1"
|
||||
|
||||
def test_trailing_slash_preserved(self):
|
||||
result = _join_url_parts(["https://example.com", "path/"])
|
||||
assert result.endswith("/")
|
||||
|
||||
def test_no_trailing_slash_when_last_has_none(self):
|
||||
result = _join_url_parts(["https://example.com", "path"])
|
||||
assert not result.endswith("/")
|
||||
|
||||
def test_strips_internal_slashes(self):
|
||||
result = _join_url_parts(["https://example.com/", "/api/", "/v1"])
|
||||
assert result == "https://example.com/api/v1"
|
||||
|
||||
def test_absolute_url_mid_list_replaces(self):
|
||||
result = _join_url_parts(["https://old.com/foo", "https://new.com/bar"])
|
||||
assert result == "https://new.com/bar"
|
||||
|
||||
def test_query_string_attached(self):
|
||||
result = _join_url_parts(["https://example.com/path", "?key=val"])
|
||||
assert result == "https://example.com/path?key=val"
|
||||
|
||||
def test_fragment_attached(self):
|
||||
result = _join_url_parts(["https://example.com/path", "#section"])
|
||||
assert result == "https://example.com/path#section"
|
||||
|
||||
def test_filters_none_and_empty(self):
|
||||
result = _join_url_parts(["https://example.com", None, "", "path"])
|
||||
assert result == "https://example.com/path"
|
||||
|
||||
def test_no_scheme(self):
|
||||
result = _join_url_parts(["/foo", "bar", "baz/"])
|
||||
assert result == "foo/bar/baz/"
|
||||
|
||||
def test_multiple_segments(self):
|
||||
result = _join_url_parts(["https://example.com", "a", "b", "c/"])
|
||||
assert result == "https://example.com/a/b/c/"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# join_url
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestJoinUrl:
|
||||
def test_string_input(self):
|
||||
assert join_url("https://example.com") == "https://example.com"
|
||||
|
||||
def test_list_input(self):
|
||||
assert join_url(["https://example.com", "path"]) == "https://example.com/path"
|
||||
|
||||
def test_tuple_input(self):
|
||||
assert join_url(("https://example.com", "path/")) == "https://example.com/path/"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# normalize_text
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestNormalizeText:
|
||||
def test_collapses_whitespace(self):
|
||||
assert normalize_text(" hello world ") == "hello world"
|
||||
|
||||
def test_tabs_and_newlines(self):
|
||||
assert normalize_text("hello\t\nworld") == "hello world"
|
||||
|
||||
def test_empty_string(self):
|
||||
assert normalize_text("") == ""
|
||||
|
||||
def test_none_input(self):
|
||||
assert normalize_text(None) == ""
|
||||
|
||||
def test_single_word(self):
|
||||
assert normalize_text(" word ") == "word"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# soup_of
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSoupOf:
|
||||
def test_parses_html(self):
|
||||
s = soup_of("<p>Hello <b>world</b></p>")
|
||||
assert s.find("b").text == "world"
|
||||
|
||||
def test_empty_html(self):
|
||||
s = soup_of("")
|
||||
assert s.text == ""
|
||||
60
test/Dockerfile
Normal file
60
test/Dockerfile
Normal file
@@ -0,0 +1,60 @@
|
||||
# syntax=docker/dockerfile:1
|
||||
|
||||
FROM python:3.11-slim AS base
|
||||
|
||||
ENV PYTHONDONTWRITEBYTECODE=1 \
|
||||
PYTHONUNBUFFERED=1 \
|
||||
PYTHONPATH=/app \
|
||||
PIP_NO_CACHE_DIR=1 \
|
||||
APP_PORT=8000 \
|
||||
APP_MODULE=app:app
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
ca-certificates \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
COPY shared/requirements.txt ./requirements.txt
|
||||
RUN pip install -r requirements.txt && \
|
||||
pip install pytest pytest-json-report
|
||||
|
||||
# Shared code (including tests)
|
||||
COPY shared/ ./shared/
|
||||
|
||||
# App code
|
||||
COPY test/ ./test-app-tmp/
|
||||
# Move service files into /app (flatten), but keep Dockerfile.* in place
|
||||
RUN cp -r test-app-tmp/app.py test-app-tmp/path_setup.py \
|
||||
test-app-tmp/bp test-app-tmp/sexp test-app-tmp/services \
|
||||
test-app-tmp/runner.py test-app-tmp/__init__.py ./ 2>/dev/null || true && \
|
||||
rm -rf test-app-tmp
|
||||
|
||||
# Sibling models for cross-domain SQLAlchemy imports
|
||||
COPY blog/__init__.py ./blog/__init__.py
|
||||
COPY blog/models/ ./blog/models/
|
||||
COPY market/__init__.py ./market/__init__.py
|
||||
COPY market/models/ ./market/models/
|
||||
COPY cart/__init__.py ./cart/__init__.py
|
||||
COPY cart/models/ ./cart/models/
|
||||
COPY events/__init__.py ./events/__init__.py
|
||||
COPY events/models/ ./events/models/
|
||||
COPY federation/__init__.py ./federation/__init__.py
|
||||
COPY federation/models/ ./federation/models/
|
||||
COPY account/__init__.py ./account/__init__.py
|
||||
COPY account/models/ ./account/models/
|
||||
COPY relations/__init__.py ./relations/__init__.py
|
||||
COPY relations/models/ ./relations/models/
|
||||
COPY likes/__init__.py ./likes/__init__.py
|
||||
COPY likes/models/ ./likes/models/
|
||||
COPY orders/__init__.py ./orders/__init__.py
|
||||
COPY orders/models/ ./orders/models/
|
||||
|
||||
COPY test/entrypoint.sh /usr/local/bin/entrypoint.sh
|
||||
RUN chmod +x /usr/local/bin/entrypoint.sh
|
||||
|
||||
RUN useradd -m -u 10001 appuser && chown -R appuser:appuser /app
|
||||
USER appuser
|
||||
|
||||
EXPOSE ${APP_PORT}
|
||||
ENTRYPOINT ["/usr/local/bin/entrypoint.sh"]
|
||||
0
test/__init__.py
Normal file
0
test/__init__.py
Normal file
44
test/app.py
Normal file
44
test/app.py
Normal file
@@ -0,0 +1,44 @@
|
||||
from __future__ import annotations
|
||||
import path_setup # noqa: F401
|
||||
import sexp.sexp_components as sexp_components # noqa: F401
|
||||
|
||||
from shared.infrastructure.factory import create_base_app
|
||||
|
||||
from bp import register_dashboard
|
||||
from services import register_domain_services
|
||||
|
||||
|
||||
async def test_context() -> dict:
|
||||
"""Test app context processor — minimal, no cross-service fragments."""
|
||||
from shared.infrastructure.context import base_context
|
||||
|
||||
ctx = await base_context()
|
||||
ctx["menu_items"] = []
|
||||
ctx["cart_mini_html"] = ""
|
||||
ctx["auth_menu_html"] = ""
|
||||
ctx["nav_tree_html"] = ""
|
||||
return ctx
|
||||
|
||||
|
||||
def create_app() -> "Quart":
|
||||
app = create_base_app(
|
||||
"test",
|
||||
context_fn=test_context,
|
||||
domain_services_fn=register_domain_services,
|
||||
)
|
||||
|
||||
import sexp.sexp_components # noqa: F401
|
||||
|
||||
app.register_blueprint(register_dashboard(url_prefix="/"))
|
||||
|
||||
# Run tests on startup
|
||||
@app.before_serving
|
||||
async def _run_tests_on_startup():
|
||||
import runner
|
||||
import asyncio
|
||||
asyncio.create_task(runner.run_tests())
|
||||
|
||||
return app
|
||||
|
||||
|
||||
app = create_app()
|
||||
1
test/bp/__init__.py
Normal file
1
test/bp/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .dashboard.routes import register as register_dashboard
|
||||
0
test/bp/dashboard/__init__.py
Normal file
0
test/bp/dashboard/__init__.py
Normal file
64
test/bp/dashboard/routes.py
Normal file
64
test/bp/dashboard/routes.py
Normal file
@@ -0,0 +1,64 @@
|
||||
"""Test dashboard routes."""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
|
||||
from quart import Blueprint, Response, make_response, request
|
||||
|
||||
|
||||
def register(url_prefix: str = "/") -> Blueprint:
|
||||
bp = Blueprint("dashboard", __name__, url_prefix=url_prefix)
|
||||
|
||||
@bp.get("/")
|
||||
async def index():
|
||||
"""Full page dashboard with last results."""
|
||||
from shared.sexp.page import get_template_context
|
||||
from shared.browser.app.csrf import generate_csrf_token
|
||||
from sexp.sexp_components import render_dashboard_page
|
||||
import runner
|
||||
|
||||
ctx = await get_template_context()
|
||||
result = runner.get_results()
|
||||
running = runner.is_running()
|
||||
csrf = generate_csrf_token()
|
||||
|
||||
html = await render_dashboard_page(ctx, result, running, csrf)
|
||||
return await make_response(html, 200)
|
||||
|
||||
@bp.post("/run")
|
||||
async def run():
|
||||
"""Trigger a test run, redirect to /."""
|
||||
import runner
|
||||
|
||||
if not runner.is_running():
|
||||
asyncio.create_task(runner.run_tests())
|
||||
|
||||
# HX-Redirect for HTMX, regular redirect for non-HTMX
|
||||
if request.headers.get("HX-Request"):
|
||||
resp = Response("", status=200)
|
||||
resp.headers["HX-Redirect"] = "/"
|
||||
return resp
|
||||
|
||||
from quart import redirect as qredirect
|
||||
return qredirect("/")
|
||||
|
||||
@bp.get("/results")
|
||||
async def results():
|
||||
"""HTMX partial — poll target for results table."""
|
||||
from shared.browser.app.csrf import generate_csrf_token
|
||||
from sexp.sexp_components import render_results_partial
|
||||
import runner
|
||||
|
||||
result = runner.get_results()
|
||||
running = runner.is_running()
|
||||
csrf = generate_csrf_token()
|
||||
|
||||
html = await render_results_partial(result, running, csrf)
|
||||
|
||||
resp = Response(html, status=200, content_type="text/html")
|
||||
# If still running, tell HTMX to keep polling
|
||||
if running:
|
||||
resp.headers["HX-Trigger-After-Swap"] = "test-still-running"
|
||||
return resp
|
||||
|
||||
return bp
|
||||
24
test/entrypoint.sh
Executable file
24
test/entrypoint.sh
Executable file
@@ -0,0 +1,24 @@
|
||||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
|
||||
# No database — skip DB wait and migrations
|
||||
|
||||
# Clear Redis page cache on deploy
|
||||
if [[ -n "${REDIS_URL:-}" && "${REDIS_URL}" != "no" ]]; then
|
||||
python3 -c "
|
||||
import redis, os
|
||||
r = redis.from_url(os.environ['REDIS_URL'])
|
||||
r.flushdb()
|
||||
" || echo "Redis flush failed (non-fatal), continuing..."
|
||||
fi
|
||||
|
||||
# Start the app
|
||||
RELOAD_FLAG=""
|
||||
if [[ "${RELOAD:-}" == "true" ]]; then
|
||||
RELOAD_FLAG="--reload"
|
||||
fi
|
||||
PYTHONUNBUFFERED=1 exec hypercorn "${APP_MODULE:-app:app}" \
|
||||
--bind 0.0.0.0:${PORT:-8000} \
|
||||
--workers ${WORKERS:-1} \
|
||||
--keep-alive 75 \
|
||||
${RELOAD_FLAG}
|
||||
9
test/path_setup.py
Normal file
9
test/path_setup.py
Normal file
@@ -0,0 +1,9 @@
|
||||
import sys
|
||||
import os
|
||||
|
||||
_app_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
_project_root = os.path.dirname(_app_dir)
|
||||
|
||||
for _p in (_project_root, _app_dir):
|
||||
if _p not in sys.path:
|
||||
sys.path.insert(0, _p)
|
||||
135
test/runner.py
Normal file
135
test/runner.py
Normal file
@@ -0,0 +1,135 @@
|
||||
"""Pytest subprocess runner + in-memory result storage."""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
# In-memory state
|
||||
_last_result: dict | None = None
|
||||
_running: bool = False
|
||||
|
||||
# Paths to test directories (relative to /app in Docker)
|
||||
_TEST_DIRS = [
|
||||
"shared/tests/",
|
||||
"shared/sexp/tests/",
|
||||
]
|
||||
|
||||
_REPORT_PATH = "/tmp/test-report.json"
|
||||
|
||||
|
||||
async def run_tests() -> dict:
|
||||
"""Run pytest in subprocess, parse JSON report, store results."""
|
||||
global _last_result, _running
|
||||
|
||||
if _running:
|
||||
return {"status": "already_running"}
|
||||
|
||||
_running = True
|
||||
started_at = time.time()
|
||||
|
||||
try:
|
||||
cmd = [
|
||||
"python3", "-m", "pytest",
|
||||
*_TEST_DIRS,
|
||||
"--json-report",
|
||||
f"--json-report-file={_REPORT_PATH}",
|
||||
"-q",
|
||||
"--tb=short",
|
||||
]
|
||||
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
*cmd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.STDOUT,
|
||||
cwd="/app",
|
||||
)
|
||||
stdout, _ = await proc.communicate()
|
||||
finished_at = time.time()
|
||||
|
||||
# Parse JSON report
|
||||
report_path = Path(_REPORT_PATH)
|
||||
if report_path.exists():
|
||||
try:
|
||||
report = json.loads(report_path.read_text())
|
||||
except (json.JSONDecodeError, OSError):
|
||||
report = {}
|
||||
else:
|
||||
report = {}
|
||||
|
||||
summary = report.get("summary", {})
|
||||
tests_raw = report.get("tests", [])
|
||||
|
||||
tests = []
|
||||
for t in tests_raw:
|
||||
tests.append({
|
||||
"nodeid": t.get("nodeid", ""),
|
||||
"outcome": t.get("outcome", "unknown"),
|
||||
"duration": round(t.get("duration", 0), 4),
|
||||
"longrepr": (t.get("call", {}) or {}).get("longrepr", ""),
|
||||
})
|
||||
|
||||
passed = summary.get("passed", 0)
|
||||
failed = summary.get("failed", 0)
|
||||
errors = summary.get("error", 0)
|
||||
skipped = summary.get("skipped", 0)
|
||||
total = summary.get("total", len(tests))
|
||||
|
||||
if failed > 0 or errors > 0:
|
||||
status = "failed"
|
||||
else:
|
||||
status = "passed"
|
||||
|
||||
_last_result = {
|
||||
"status": status,
|
||||
"started_at": started_at,
|
||||
"finished_at": finished_at,
|
||||
"duration": round(finished_at - started_at, 2),
|
||||
"passed": passed,
|
||||
"failed": failed,
|
||||
"errors": errors,
|
||||
"skipped": skipped,
|
||||
"total": total,
|
||||
"tests": tests,
|
||||
"stdout": (stdout or b"").decode("utf-8", errors="replace")[-5000:],
|
||||
}
|
||||
|
||||
log.info(
|
||||
"Test run complete: %s (%d passed, %d failed, %d errors, %.1fs)",
|
||||
status, passed, failed, errors, _last_result["duration"],
|
||||
)
|
||||
return _last_result
|
||||
|
||||
except Exception:
|
||||
log.exception("Test run failed")
|
||||
finished_at = time.time()
|
||||
_last_result = {
|
||||
"status": "error",
|
||||
"started_at": started_at,
|
||||
"finished_at": finished_at,
|
||||
"duration": round(finished_at - started_at, 2),
|
||||
"passed": 0,
|
||||
"failed": 0,
|
||||
"errors": 1,
|
||||
"skipped": 0,
|
||||
"total": 0,
|
||||
"tests": [],
|
||||
"stdout": "",
|
||||
}
|
||||
return _last_result
|
||||
finally:
|
||||
_running = False
|
||||
|
||||
|
||||
def get_results() -> dict | None:
|
||||
"""Return last run results."""
|
||||
return _last_result
|
||||
|
||||
|
||||
def is_running() -> bool:
|
||||
"""Check if tests are currently running."""
|
||||
return _running
|
||||
6
test/services/__init__.py
Normal file
6
test/services/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
||||
"""Test app service registration."""
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
def register_domain_services() -> None:
|
||||
"""Register services for the test app (none needed)."""
|
||||
0
test/sexp/__init__.py
Normal file
0
test/sexp/__init__.py
Normal file
88
test/sexp/dashboard.sexpr
Normal file
88
test/sexp/dashboard.sexpr
Normal file
@@ -0,0 +1,88 @@
|
||||
;; Test dashboard components
|
||||
|
||||
(defcomp ~test-status-badge (&key status)
|
||||
(span :class (str "inline-flex items-center rounded-full border px-3 py-1 text-sm font-medium "
|
||||
(if (= status "running") "border-amber-300 bg-amber-50 text-amber-700 animate-pulse"
|
||||
(if (= status "passed") "border-emerald-300 bg-emerald-50 text-emerald-700"
|
||||
(if (= status "failed") "border-rose-300 bg-rose-50 text-rose-700"
|
||||
"border-stone-300 bg-stone-50 text-stone-700"))))
|
||||
status))
|
||||
|
||||
(defcomp ~test-run-button (&key running csrf)
|
||||
(form :method "POST" :action "/run" :class "inline"
|
||||
(input :type "hidden" :name "csrf_token" :value csrf)
|
||||
(button :type "submit"
|
||||
:class (str "rounded bg-stone-800 px-4 py-2 text-sm font-medium text-white hover:bg-stone-700 "
|
||||
"disabled:opacity-50 disabled:cursor-not-allowed transition-colors")
|
||||
:disabled (if running "true" nil)
|
||||
(if running "Running..." "Run Tests"))))
|
||||
|
||||
(defcomp ~test-summary (&key status passed failed errors skipped total duration last-run running csrf)
|
||||
(div :class "space-y-4"
|
||||
(div :class "flex items-center justify-between flex-wrap gap-3"
|
||||
(div :class "flex items-center gap-3"
|
||||
(h2 :class "text-lg font-semibold text-stone-800" "Test Results")
|
||||
(when status (~test-status-badge :status status)))
|
||||
(~test-run-button :running running :csrf csrf))
|
||||
(when status
|
||||
(div :class "grid grid-cols-2 sm:grid-cols-3 md:grid-cols-6 gap-3"
|
||||
(div :class "rounded border border-stone-200 bg-white p-3 text-center"
|
||||
(div :class "text-2xl font-bold text-stone-800" total)
|
||||
(div :class "text-xs text-stone-500" "Total"))
|
||||
(div :class "rounded border border-emerald-200 bg-emerald-50 p-3 text-center"
|
||||
(div :class "text-2xl font-bold text-emerald-700" passed)
|
||||
(div :class "text-xs text-emerald-600" "Passed"))
|
||||
(div :class "rounded border border-rose-200 bg-rose-50 p-3 text-center"
|
||||
(div :class "text-2xl font-bold text-rose-700" failed)
|
||||
(div :class "text-xs text-rose-600" "Failed"))
|
||||
(div :class "rounded border border-orange-200 bg-orange-50 p-3 text-center"
|
||||
(div :class "text-2xl font-bold text-orange-700" errors)
|
||||
(div :class "text-xs text-orange-600" "Errors"))
|
||||
(div :class "rounded border border-sky-200 bg-sky-50 p-3 text-center"
|
||||
(div :class "text-2xl font-bold text-sky-700" skipped)
|
||||
(div :class "text-xs text-sky-600" "Skipped"))
|
||||
(div :class "rounded border border-stone-200 bg-white p-3 text-center"
|
||||
(div :class "text-2xl font-bold text-stone-800" (str duration "s"))
|
||||
(div :class "text-xs text-stone-500" "Duration")))
|
||||
(div :class "text-xs text-stone-400" (str "Last run: " last-run)))))
|
||||
|
||||
(defcomp ~test-row (&key nodeid outcome duration longrepr)
|
||||
(tr :class (str "border-b border-stone-100 "
|
||||
(if (= outcome "passed") "bg-white"
|
||||
(if (= outcome "failed") "bg-rose-50"
|
||||
(if (= outcome "skipped") "bg-sky-50"
|
||||
"bg-orange-50"))))
|
||||
(td :class "px-3 py-2 text-xs font-mono text-stone-700 max-w-0 truncate" :title nodeid nodeid)
|
||||
(td :class "px-3 py-2 text-center"
|
||||
(span :class (str "inline-flex items-center rounded-full border px-2 py-0.5 text-[11px] font-medium "
|
||||
(if (= outcome "passed") "border-emerald-300 bg-emerald-50 text-emerald-700"
|
||||
(if (= outcome "failed") "border-rose-300 bg-rose-50 text-rose-700"
|
||||
(if (= outcome "skipped") "border-sky-300 bg-sky-50 text-sky-700"
|
||||
"border-orange-300 bg-orange-50 text-orange-700"))))
|
||||
outcome))
|
||||
(td :class "px-3 py-2 text-right text-xs text-stone-500 tabular-nums" (str duration "s"))
|
||||
(td :class "px-3 py-2 text-xs text-rose-600 font-mono max-w-xs truncate" :title longrepr
|
||||
(when longrepr longrepr))))
|
||||
|
||||
(defcomp ~test-results-table (&key rows-html has-failures)
|
||||
(div :class "overflow-x-auto rounded border border-stone-200 bg-white"
|
||||
(table :class "w-full text-left"
|
||||
(thead
|
||||
(tr :class "border-b border-stone-200 bg-stone-50"
|
||||
(th :class "px-3 py-2 text-xs font-medium text-stone-600" "Test")
|
||||
(th :class "px-3 py-2 text-xs font-medium text-stone-600 text-center w-24" "Status")
|
||||
(th :class "px-3 py-2 text-xs font-medium text-stone-600 text-right w-20" "Time")
|
||||
(th :class "px-3 py-2 text-xs font-medium text-stone-600 w-48" "Error")))
|
||||
(tbody rows-html))))
|
||||
|
||||
(defcomp ~test-running-indicator ()
|
||||
(div :class "flex items-center justify-center py-12 text-stone-500"
|
||||
(div :class "flex items-center gap-3"
|
||||
(div :class "animate-spin h-6 w-6 border-2 border-stone-300 border-t-stone-600 rounded-full")
|
||||
(span :class "text-sm" "Running tests..."))))
|
||||
|
||||
(defcomp ~test-no-results ()
|
||||
(div :class "flex items-center justify-center py-12 text-stone-400"
|
||||
(div :class "text-center"
|
||||
(div :class "text-4xl mb-2" "?")
|
||||
(div :class "text-sm" "No test results yet. Click \"Run Tests\" to start."))))
|
||||
105
test/sexp/sexp_components.py
Normal file
105
test/sexp/sexp_components.py
Normal file
@@ -0,0 +1,105 @@
|
||||
"""Test service s-expression page components."""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
from shared.sexp.jinja_bridge import render, load_service_components
|
||||
from shared.sexp.helpers import root_header_html, full_page
|
||||
|
||||
# Load test-specific .sexpr components at import time
|
||||
load_service_components(os.path.dirname(os.path.dirname(__file__)))
|
||||
|
||||
|
||||
def _format_time(ts: float | None) -> str:
|
||||
"""Format a unix timestamp for display."""
|
||||
if not ts:
|
||||
return "never"
|
||||
return datetime.fromtimestamp(ts).strftime("%-d %b %Y, %H:%M:%S")
|
||||
|
||||
|
||||
def _test_rows_html(tests: list[dict]) -> str:
|
||||
"""Render all test result rows."""
|
||||
parts = []
|
||||
for t in tests:
|
||||
parts.append(render(
|
||||
"test-row",
|
||||
nodeid=t["nodeid"],
|
||||
outcome=t["outcome"],
|
||||
duration=str(t["duration"]),
|
||||
longrepr=t.get("longrepr", ""),
|
||||
))
|
||||
return "".join(parts)
|
||||
|
||||
|
||||
def _results_partial_html(result: dict | None, running: bool, csrf: str) -> str:
|
||||
"""Render the results section (summary + table or running indicator)."""
|
||||
if running and not result:
|
||||
summary = render(
|
||||
"test-summary",
|
||||
status="running", passed="0", failed="0", errors="0",
|
||||
skipped="0", total="0", duration="...",
|
||||
last_run="in progress", running=True, csrf=csrf,
|
||||
)
|
||||
return summary + render("test-running-indicator")
|
||||
|
||||
if not result:
|
||||
summary = render(
|
||||
"test-summary",
|
||||
status=None, passed="0", failed="0", errors="0",
|
||||
skipped="0", total="0", duration="0",
|
||||
last_run="never", running=running, csrf=csrf,
|
||||
)
|
||||
return summary + render("test-no-results")
|
||||
|
||||
status = "running" if running else result["status"]
|
||||
summary = render(
|
||||
"test-summary",
|
||||
status=status,
|
||||
passed=str(result["passed"]),
|
||||
failed=str(result["failed"]),
|
||||
errors=str(result["errors"]),
|
||||
skipped=str(result.get("skipped", 0)),
|
||||
total=str(result["total"]),
|
||||
duration=str(result["duration"]),
|
||||
last_run=_format_time(result["finished_at"]) if not running else "in progress",
|
||||
running=running,
|
||||
csrf=csrf,
|
||||
)
|
||||
|
||||
if running:
|
||||
return summary + render("test-running-indicator")
|
||||
|
||||
tests = result.get("tests", [])
|
||||
if not tests:
|
||||
return summary + render("test-no-results")
|
||||
|
||||
has_failures = result["failed"] > 0 or result["errors"] > 0
|
||||
rows = _test_rows_html(tests)
|
||||
table = render("test-results-table", rows_html=rows,
|
||||
has_failures=str(has_failures).lower())
|
||||
return summary + table
|
||||
|
||||
|
||||
def _wrap_results_div(inner_html: str, running: bool) -> str:
|
||||
"""Wrap results in a div with HTMX polling when running."""
|
||||
attrs = 'id="test-results" class="space-y-6 p-4"'
|
||||
if running:
|
||||
attrs += ' hx-get="/results" hx-trigger="every 2s" hx-swap="outerHTML"'
|
||||
return f'<div {attrs}>{inner_html}</div>'
|
||||
|
||||
|
||||
async def render_dashboard_page(ctx: dict, result: dict | None,
|
||||
running: bool, csrf: str) -> str:
|
||||
"""Full page: test dashboard."""
|
||||
hdr = root_header_html(ctx)
|
||||
inner = _results_partial_html(result, running, csrf)
|
||||
content = _wrap_results_div(inner, running)
|
||||
return full_page(ctx, header_rows_html=hdr, content_html=content)
|
||||
|
||||
|
||||
async def render_results_partial(result: dict | None, running: bool,
|
||||
csrf: str) -> str:
|
||||
"""HTMX partial: just the results section (wrapped in polling div)."""
|
||||
inner = _results_partial_html(result, running, csrf)
|
||||
return _wrap_results_div(inner, running)
|
||||
Reference in New Issue
Block a user