Add unit test coverage for shared pure-logic modules (240 tests)

Track 1.1 of master plan: expand from sexp-only tests to cover
DTOs, HTTP signatures, HMAC auth, URL utilities, Jinja filters,
calendar helpers, config freeze, activity bus registry, parse
utilities, sexp helpers, error classes, and jinja bridge render API.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-28 19:34:37 +00:00
parent 6c44a5f3d0
commit 00efbc2a35
14 changed files with 1697 additions and 0 deletions

0
shared/tests/__init__.py Normal file
View File

10
shared/tests/conftest.py Normal file
View File

@@ -0,0 +1,10 @@
"""Shared test fixtures for unit tests."""
from __future__ import annotations
import os
import sys
# Ensure project root is on sys.path so shared.* imports work
_project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if _project_root not in sys.path:
sys.path.insert(0, _project_root)

View File

@@ -0,0 +1,119 @@
"""Tests for activity bus handler registry."""
from __future__ import annotations
import pytest
from collections import defaultdict
from shared.events.bus import (
register_activity_handler,
get_activity_handlers,
_activity_handlers,
)
@pytest.fixture(autouse=True)
def _clean_handlers():
"""Clear handler registry before each test."""
_activity_handlers.clear()
yield
_activity_handlers.clear()
# Dummy handlers
async def handler_a(activity, session):
pass
async def handler_b(activity, session):
pass
async def handler_c(activity, session):
pass
async def handler_global(activity, session):
pass
# ---------------------------------------------------------------------------
# register_activity_handler
# ---------------------------------------------------------------------------
class TestRegisterHandler:
def test_register_with_type_only(self):
register_activity_handler("Create", handler_a)
assert ("Create", "*") in _activity_handlers
assert handler_a in _activity_handlers[("Create", "*")]
def test_register_with_object_type(self):
register_activity_handler("Create", handler_a, object_type="Note")
assert ("Create", "Note") in _activity_handlers
def test_multiple_handlers_same_key(self):
register_activity_handler("Create", handler_a)
register_activity_handler("Create", handler_b)
assert len(_activity_handlers[("Create", "*")]) == 2
def test_wildcard_type(self):
register_activity_handler("*", handler_global)
assert ("*", "*") in _activity_handlers
# ---------------------------------------------------------------------------
# get_activity_handlers — cascading wildcard lookup
# ---------------------------------------------------------------------------
class TestGetHandlers:
def test_exact_match(self):
register_activity_handler("Create", handler_a, object_type="Note")
handlers = get_activity_handlers("Create", "Note")
assert handler_a in handlers
def test_type_wildcard_match(self):
register_activity_handler("Create", handler_a) # key: ("Create", "*")
handlers = get_activity_handlers("Create", "Note")
assert handler_a in handlers
def test_global_wildcard_match(self):
register_activity_handler("*", handler_global) # key: ("*", "*")
handlers = get_activity_handlers("Create", "Note")
assert handler_global in handlers
def test_cascading_order(self):
"""Handlers should come in order: exact → type-wildcard → global-wildcard."""
register_activity_handler("Create", handler_a, object_type="Note") # exact
register_activity_handler("Create", handler_b) # type-wildcard
register_activity_handler("*", handler_c) # global wildcard
handlers = get_activity_handlers("Create", "Note")
assert handlers == [handler_a, handler_b, handler_c]
def test_no_match(self):
register_activity_handler("Create", handler_a, object_type="Note")
handlers = get_activity_handlers("Delete", "Article")
assert handlers == []
def test_no_object_type_skips_exact(self):
register_activity_handler("Create", handler_a, object_type="Note")
register_activity_handler("Create", handler_b)
handlers = get_activity_handlers("Create")
# Should get type-wildcard only (since object_type defaults to "*")
assert handler_b in handlers
assert handler_a not in handlers
def test_global_wildcard_not_duplicated(self):
"""Global wildcard should not fire when activity_type is already '*'."""
register_activity_handler("*", handler_global)
handlers = get_activity_handlers("*")
# Should not include global wildcard twice
assert handlers.count(handler_global) == 1
def test_type_wildcard_plus_global(self):
register_activity_handler("Follow", handler_a)
register_activity_handler("*", handler_global)
handlers = get_activity_handlers("Follow")
assert handler_a in handlers
assert handler_global in handlers
def test_only_global_wildcard(self):
register_activity_handler("*", handler_global)
handlers = get_activity_handlers("Like", "Post")
assert handlers == [handler_global]

View File

@@ -0,0 +1,117 @@
"""Tests for calendar date helper functions."""
from __future__ import annotations
from datetime import date
from unittest.mock import patch, MagicMock
from shared.utils.calendar_helpers import add_months, build_calendar_weeks
# ---------------------------------------------------------------------------
# add_months
# ---------------------------------------------------------------------------
class TestAddMonths:
def test_same_year(self):
assert add_months(2025, 3, 2) == (2025, 5)
def test_next_year(self):
assert add_months(2025, 11, 2) == (2026, 1)
def test_subtract(self):
assert add_months(2025, 3, -2) == (2025, 1)
def test_subtract_prev_year(self):
assert add_months(2025, 1, -1) == (2024, 12)
def test_add_twelve(self):
assert add_months(2025, 6, 12) == (2026, 6)
def test_subtract_twelve(self):
assert add_months(2025, 6, -12) == (2024, 6)
def test_large_delta(self):
assert add_months(2025, 1, 25) == (2027, 2)
def test_zero_delta(self):
assert add_months(2025, 7, 0) == (2025, 7)
def test_december_to_january(self):
assert add_months(2025, 12, 1) == (2026, 1)
def test_january_to_december(self):
assert add_months(2025, 1, -1) == (2024, 12)
def test_subtract_large(self):
assert add_months(2025, 3, -15) == (2023, 12)
def test_month_boundaries(self):
# Every month +1
for m in range(1, 12):
y, nm = add_months(2025, m, 1)
assert nm == m + 1
assert y == 2025
y, nm = add_months(2025, 12, 1)
assert nm == 1
assert y == 2026
# ---------------------------------------------------------------------------
# build_calendar_weeks
# ---------------------------------------------------------------------------
class TestBuildCalendarWeeks:
def test_returns_list_of_weeks(self):
weeks = build_calendar_weeks(2025, 6)
assert isinstance(weeks, list)
assert len(weeks) >= 4 # at least 4 weeks in any month
assert len(weeks) <= 6 # at most 6 weeks
def test_each_week_has_7_days(self):
weeks = build_calendar_weeks(2025, 6)
for week in weeks:
assert len(week) == 7
def test_day_dict_structure(self):
weeks = build_calendar_weeks(2025, 6)
day = weeks[0][0]
assert "date" in day
assert "in_month" in day
assert "is_today" in day
assert isinstance(day["date"], date)
assert isinstance(day["in_month"], bool)
assert isinstance(day["is_today"], bool)
def test_in_month_flag(self):
weeks = build_calendar_weeks(2025, 6)
june_days = [d for w in weeks for d in w if d["in_month"]]
assert len(june_days) == 30 # June has 30 days
def test_february_leap_year(self):
weeks = build_calendar_weeks(2024, 2)
feb_days = [d for w in weeks for d in w if d["in_month"]]
assert len(feb_days) == 29
def test_february_non_leap_year(self):
weeks = build_calendar_weeks(2025, 2)
feb_days = [d for w in weeks for d in w if d["in_month"]]
assert len(feb_days) == 28
def test_starts_on_monday(self):
"""Calendar should start on Monday (firstweekday=0)."""
weeks = build_calendar_weeks(2025, 6)
first_day = weeks[0][0]["date"]
assert first_day.weekday() == 0 # Monday
def test_is_today_flag(self):
"""The today flag should be True for exactly one day (or zero if not in month range)."""
# Use a fixed known date - mock datetime.now
from datetime import datetime, timezone
fixed_now = datetime(2025, 6, 15, tzinfo=timezone.utc)
with patch("shared.utils.calendar_helpers.datetime") as mock_dt:
mock_dt.now.return_value = fixed_now
mock_dt.side_effect = lambda *a, **kw: datetime(*a, **kw)
weeks = build_calendar_weeks(2025, 6)
today_days = [d for w in weeks for d in w if d["is_today"]]
assert len(today_days) == 1
assert today_days[0]["date"] == date(2025, 6, 15)

152
shared/tests/test_config.py Normal file
View File

@@ -0,0 +1,152 @@
"""Tests for config freeze/readonly enforcement."""
from __future__ import annotations
import asyncio
import os
import tempfile
from types import MappingProxyType
import pytest
from shared.config import _freeze
# ---------------------------------------------------------------------------
# _freeze
# ---------------------------------------------------------------------------
class TestFreeze:
def test_freezes_dict(self):
result = _freeze({"a": 1, "b": 2})
assert isinstance(result, MappingProxyType)
assert result["a"] == 1
def test_frozen_dict_is_immutable(self):
result = _freeze({"a": 1})
with pytest.raises(TypeError):
result["a"] = 2
with pytest.raises(TypeError):
result["new"] = 3
def test_freezes_list_to_tuple(self):
result = _freeze([1, 2, 3])
assert isinstance(result, tuple)
assert result == (1, 2, 3)
def test_freezes_set_to_frozenset(self):
result = _freeze({1, 2, 3})
assert isinstance(result, frozenset)
assert result == frozenset({1, 2, 3})
def test_freezes_nested_dict(self):
result = _freeze({"a": {"b": {"c": 1}}})
assert isinstance(result, MappingProxyType)
assert isinstance(result["a"], MappingProxyType)
assert isinstance(result["a"]["b"], MappingProxyType)
assert result["a"]["b"]["c"] == 1
def test_freezes_dict_with_list(self):
result = _freeze({"items": [1, 2, 3]})
assert isinstance(result["items"], tuple)
def test_freezes_list_of_dicts(self):
result = _freeze([{"a": 1}, {"b": 2}])
assert isinstance(result, tuple)
assert isinstance(result[0], MappingProxyType)
def test_preserves_scalars(self):
assert _freeze(42) == 42
assert _freeze("hello") == "hello"
assert _freeze(3.14) == 3.14
assert _freeze(True) is True
assert _freeze(None) is None
def test_freezes_tuple_recursively(self):
result = _freeze(({"a": 1}, [2, 3]))
assert isinstance(result, tuple)
assert isinstance(result[0], MappingProxyType)
assert isinstance(result[1], tuple)
def test_complex_config_structure(self):
"""Simulates a real app-config.yaml structure."""
raw = {
"app_urls": {
"blog": "https://blog.rose-ash.com",
"market": "https://market.rose-ash.com",
},
"features": ["sexp", "federation"],
"limits": {"max_upload": 10485760},
}
frozen = _freeze(raw)
assert frozen["app_urls"]["blog"] == "https://blog.rose-ash.com"
assert frozen["features"] == ("sexp", "federation")
with pytest.raises(TypeError):
frozen["app_urls"]["blog"] = "changed"
# ---------------------------------------------------------------------------
# init_config / config / as_plain / pretty
# ---------------------------------------------------------------------------
class TestConfigInit:
def test_init_and_read(self):
"""Test full init_config → config() → as_plain() → pretty() cycle."""
import shared.config as cfg
# Save original state
orig_frozen = cfg._data_frozen
orig_plain = cfg._data_plain
try:
# Reset state
cfg._data_frozen = None
cfg._data_plain = None
# Write a temp YAML file
with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
f.write("app_urls:\n blog: https://blog.example.com\nport: 8001\n")
path = f.name
try:
asyncio.run(cfg.init_config(path, force=True))
c = cfg.config()
assert c["app_urls"]["blog"] == "https://blog.example.com"
assert c["port"] == 8001
assert isinstance(c, MappingProxyType)
plain = cfg.as_plain()
assert isinstance(plain, dict)
assert plain["port"] == 8001
# Modifying plain should not affect config
plain["port"] = 9999
assert cfg.config()["port"] == 8001
pretty_str = cfg.pretty()
assert "blog" in pretty_str
finally:
os.unlink(path)
finally:
# Restore original state
cfg._data_frozen = orig_frozen
cfg._data_plain = orig_plain
def test_config_raises_before_init(self):
import shared.config as cfg
orig = cfg._data_frozen
try:
cfg._data_frozen = None
with pytest.raises(RuntimeError, match="init_config"):
cfg.config()
finally:
cfg._data_frozen = orig
def test_file_not_found(self):
import shared.config as cfg
orig = cfg._data_frozen
try:
cfg._data_frozen = None
with pytest.raises(FileNotFoundError):
asyncio.run(cfg.init_config("/nonexistent/path.yaml", force=True))
finally:
cfg._data_frozen = orig

215
shared/tests/test_dtos.py Normal file
View File

@@ -0,0 +1,215 @@
"""Tests for DTO serialization helpers and dataclass contracts."""
from __future__ import annotations
import pytest
from datetime import datetime
from decimal import Decimal
from shared.contracts.dtos import (
PostDTO,
CalendarDTO,
CalendarEntryDTO,
TicketDTO,
MarketPlaceDTO,
ProductDTO,
CartItemDTO,
CartSummaryDTO,
ActorProfileDTO,
APActivityDTO,
RemotePostDTO,
dto_to_dict,
dto_from_dict,
_serialize_value,
_unwrap_optional,
)
# ---------------------------------------------------------------------------
# _serialize_value
# ---------------------------------------------------------------------------
class TestSerializeValue:
def test_datetime(self):
dt = datetime(2025, 6, 15, 12, 30, 0)
assert _serialize_value(dt) == "2025-06-15T12:30:00"
def test_decimal(self):
assert _serialize_value(Decimal("19.99")) == "19.99"
def test_set_to_list(self):
result = _serialize_value({1, 2, 3})
assert isinstance(result, list)
assert set(result) == {1, 2, 3}
def test_string_passthrough(self):
assert _serialize_value("hello") == "hello"
def test_int_passthrough(self):
assert _serialize_value(42) == 42
def test_none_passthrough(self):
assert _serialize_value(None) is None
def test_nested_list(self):
dt = datetime(2025, 1, 1)
result = _serialize_value([dt, Decimal("5")])
assert result == ["2025-01-01T00:00:00", "5"]
def test_nested_dataclass(self):
post = PostDTO(id=1, slug="test", title="Test", status="published", visibility="public")
result = _serialize_value(post)
assert isinstance(result, dict)
assert result["slug"] == "test"
# ---------------------------------------------------------------------------
# _unwrap_optional
# ---------------------------------------------------------------------------
class TestUnwrapOptional:
def test_optional_str(self):
from typing import Optional
result = _unwrap_optional(Optional[str])
assert result is str
def test_optional_int(self):
from typing import Optional
result = _unwrap_optional(Optional[int])
assert result is int
def test_plain_type(self):
assert _unwrap_optional(str) is str
def test_union_with_none(self):
from typing import Union
result = _unwrap_optional(Union[datetime, None])
assert result is datetime
# ---------------------------------------------------------------------------
# dto_to_dict
# ---------------------------------------------------------------------------
class TestDtoToDict:
def test_simple_post(self):
post = PostDTO(id=1, slug="my-post", title="My Post", status="published", visibility="public")
d = dto_to_dict(post)
assert d["id"] == 1
assert d["slug"] == "my-post"
assert d["feature_image"] is None
def test_with_datetime(self):
dt = datetime(2025, 6, 15, 10, 0, 0)
post = PostDTO(id=1, slug="s", title="T", status="published", visibility="public", published_at=dt)
d = dto_to_dict(post)
assert d["published_at"] == "2025-06-15T10:00:00"
def test_with_decimal(self):
product = ProductDTO(id=1, slug="widget", rrp=Decimal("29.99"), regular_price=Decimal("24.99"))
d = dto_to_dict(product)
assert d["rrp"] == "29.99"
assert d["regular_price"] == "24.99"
def test_cart_summary_with_items(self):
item = CartItemDTO(id=1, product_id=10, quantity=2, unit_price=Decimal("5.00"))
summary = CartSummaryDTO(count=1, total=Decimal("10.00"), items=[item])
d = dto_to_dict(summary)
assert d["count"] == 1
assert d["total"] == "10.00"
assert len(d["items"]) == 1
assert d["items"][0]["product_id"] == 10
def test_remote_post_with_nested_lists(self):
rp = RemotePostDTO(
id=1, remote_actor_id=5, object_id="https://example.com/1",
content="<p>Hello</p>",
attachments=[{"type": "Image", "url": "https://img.example.com/1.jpg"}],
tags=[{"type": "Hashtag", "name": "#test"}],
)
d = dto_to_dict(rp)
assert d["attachments"] == [{"type": "Image", "url": "https://img.example.com/1.jpg"}]
# ---------------------------------------------------------------------------
# dto_from_dict
# ---------------------------------------------------------------------------
class TestDtoFromDict:
def test_none_data(self):
assert dto_from_dict(PostDTO, None) is None
def test_empty_dict(self):
assert dto_from_dict(PostDTO, {}) is None
def test_simple_post(self):
d = {"id": 1, "slug": "test", "title": "Test", "status": "published", "visibility": "public"}
post = dto_from_dict(PostDTO, d)
assert post.slug == "test"
assert post.feature_image is None
def test_datetime_coercion(self):
d = {
"id": 1, "slug": "s", "title": "T", "status": "published",
"visibility": "public", "published_at": "2025-06-15T10:00:00",
}
post = dto_from_dict(PostDTO, d)
assert isinstance(post.published_at, datetime)
assert post.published_at.year == 2025
def test_decimal_coercion(self):
d = {"id": 1, "slug": "w", "rrp": "29.99", "regular_price": 24.99}
product = dto_from_dict(ProductDTO, d)
assert isinstance(product.rrp, Decimal)
assert product.rrp == Decimal("29.99")
assert isinstance(product.regular_price, Decimal)
def test_round_trip(self):
dt = datetime(2025, 3, 1, 9, 30, 0)
original = PostDTO(id=1, slug="rt", title="Round Trip", status="draft", visibility="members", published_at=dt)
d = dto_to_dict(original)
restored = dto_from_dict(PostDTO, d)
assert restored.id == original.id
assert restored.slug == original.slug
assert restored.published_at == original.published_at
def test_extra_keys_ignored(self):
d = {"id": 1, "slug": "s", "title": "T", "status": "published", "visibility": "public", "extra_field": "ignored"}
post = dto_from_dict(PostDTO, d)
assert post.slug == "s"
def test_calendar_entry_decimals(self):
d = {
"id": 1, "calendar_id": 2, "name": "Event", "start_at": "2025-07-01T14:00:00",
"state": "confirmed", "cost": "15.00", "ticket_price": "10.50",
}
entry = dto_from_dict(CalendarEntryDTO, d)
assert isinstance(entry.cost, Decimal)
assert entry.cost == Decimal("15.00")
assert isinstance(entry.ticket_price, Decimal)
# ---------------------------------------------------------------------------
# Frozen DTOs
# ---------------------------------------------------------------------------
class TestFrozenDTOs:
def test_post_is_frozen(self):
post = PostDTO(id=1, slug="s", title="T", status="published", visibility="public")
with pytest.raises(AttributeError):
post.title = "changed"
def test_product_is_frozen(self):
product = ProductDTO(id=1, slug="s")
with pytest.raises(AttributeError):
product.slug = "changed"
def test_calendar_dto_defaults(self):
cal = CalendarDTO(id=1, container_type="page", container_id=5, name="My Cal", slug="my-cal")
assert cal.description is None
def test_cart_summary_defaults(self):
summary = CartSummaryDTO()
assert summary.count == 0
assert summary.total == Decimal("0")
assert summary.items == []
assert summary.ticket_count == 0

View File

@@ -0,0 +1,96 @@
"""Tests for error classes and error page rendering."""
from __future__ import annotations
import pytest
from shared.browser.app.errors import AppError, _error_page
# ---------------------------------------------------------------------------
# AppError
# ---------------------------------------------------------------------------
class TestAppError:
def test_single_message(self):
err = AppError("Something went wrong")
assert str(err) == "Something went wrong"
assert err.messages == ["Something went wrong"]
assert err.status_code == 400
def test_custom_status_code(self):
err = AppError("Not found", status_code=404)
assert err.status_code == 404
assert str(err) == "Not found"
def test_list_of_messages(self):
err = AppError(["Error 1", "Error 2", "Error 3"])
assert err.messages == ["Error 1", "Error 2", "Error 3"]
assert str(err) == "Error 1" # first message as str
def test_tuple_of_messages(self):
err = AppError(("A", "B"))
assert err.messages == ["A", "B"]
def test_set_of_messages(self):
err = AppError({"only one"})
assert err.messages == ["only one"]
def test_empty_list(self):
err = AppError([])
assert err.messages == []
assert str(err) == ""
def test_is_value_error(self):
"""AppError should be catchable as ValueError for backwards compat."""
err = AppError("test")
assert isinstance(err, ValueError)
def test_default_status_is_400(self):
err = AppError("test")
assert err.status_code == 400
def test_integer_message_coerced(self):
err = AppError([42, "text"])
assert err.messages == ["42", "text"]
def test_status_code_override(self):
err = AppError("conflict", status_code=409)
assert err.status_code == 409
def test_messages_are_strings(self):
err = AppError([None, 123, True])
assert all(isinstance(m, str) for m in err.messages)
# ---------------------------------------------------------------------------
# _error_page
# ---------------------------------------------------------------------------
class TestErrorPage:
def test_returns_html_string(self):
html = _error_page("Not Found")
assert isinstance(html, str)
assert "<!DOCTYPE html>" in html
def test_contains_message(self):
html = _error_page("Something broke")
assert "Something broke" in html
def test_contains_error_gif(self):
html = _error_page("Error")
assert "/static/errors/error.gif" in html
def test_contains_reload_link(self):
html = _error_page("Error")
assert "Reload" in html
def test_html_in_message(self):
"""Messages can contain HTML (used by fragment_error handler)."""
html = _error_page("The <b>account</b> service is unavailable")
assert "<b>account</b>" in html
def test_self_contained(self):
"""Error page should include its own styles (no external CSS deps)."""
html = _error_page("Error")
assert "<style>" in html
assert "</style>" in html

View File

@@ -0,0 +1,291 @@
"""Tests for Jinja template filters (pure-logic functions)."""
from __future__ import annotations
from decimal import Decimal
from markupsafe import Markup
from shared.browser.app.filters.highlight import highlight
from shared.browser.app.filters.combine import _deep_merge
from shared.browser.app.filters.qs_base import (
_iterify,
_norm,
make_filter_set,
build_qs,
)
# ---------------------------------------------------------------------------
# highlight
# ---------------------------------------------------------------------------
class TestHighlight:
def test_basic_highlight(self):
result = highlight("Hello World", "world")
assert isinstance(result, Markup)
assert "<mark" in result
assert "World" in result
def test_case_insensitive(self):
result = highlight("Hello World", "HELLO")
assert "<mark" in result
def test_empty_needle(self):
result = highlight("Hello", "")
assert result == "Hello"
def test_empty_text(self):
result = highlight("", "needle")
assert result == ""
def test_none_text(self):
result = highlight(None, "needle")
assert result == ""
def test_no_match(self):
result = highlight("Hello World", "xyz")
assert "<mark" not in result
assert "Hello World" in result
def test_escapes_html(self):
result = highlight("<script>alert('xss')</script>", "script")
assert "<script>" not in str(result)
assert "&lt;script&gt;" in str(result) or "&lt;" in str(result)
def test_custom_class(self):
result = highlight("Hello", "Hello", cls="highlight-red")
assert "highlight-red" in result
# ---------------------------------------------------------------------------
# truncate (tested as pure function)
# ---------------------------------------------------------------------------
class TestTruncate:
"""Test the truncate logic directly."""
@staticmethod
def _truncate(text, max_length=100):
if text is None:
return ""
text = str(text)
if len(text) <= max_length:
return text
if max_length <= 1:
return ""
return text[:max_length - 1] + ""
def test_short_text(self):
assert self._truncate("hello", 10) == "hello"
def test_exact_length(self):
assert self._truncate("hello", 5) == "hello"
def test_truncates_long(self):
result = self._truncate("hello world", 6)
assert result == "hello…"
assert len(result) == 6
def test_none_input(self):
assert self._truncate(None) == ""
def test_max_length_one(self):
assert self._truncate("hello", 1) == ""
def test_max_length_zero(self):
assert self._truncate("hello", 0) == ""
# ---------------------------------------------------------------------------
# currency (tested as pure function)
# ---------------------------------------------------------------------------
class TestCurrency:
@staticmethod
def _currency(value, code="GBP"):
if value is None:
return ""
if isinstance(value, float):
value = Decimal(str(value))
symbol = "£" if code == "GBP" else code
return f"{symbol}{value:.2f}"
def test_gbp(self):
assert self._currency(Decimal("19.99")) == "£19.99"
def test_none(self):
assert self._currency(None) == ""
def test_float_conversion(self):
assert self._currency(19.99) == "£19.99"
def test_non_gbp(self):
assert self._currency(Decimal("10.00"), "EUR") == "EUR10.00"
def test_zero(self):
assert self._currency(Decimal("0")) == "£0.00"
def test_integer_decimal(self):
assert self._currency(Decimal("5")) == "£5.00"
# ---------------------------------------------------------------------------
# combine / _deep_merge
# ---------------------------------------------------------------------------
class TestDeepMerge:
def test_simple_merge(self):
assert _deep_merge({"a": 1}, {"b": 2}) == {"a": 1, "b": 2}
def test_overwrite(self):
assert _deep_merge({"a": 1}, {"a": 2}) == {"a": 2}
def test_nested_merge(self):
a = {"x": {"a": 1, "b": 2}}
b = {"x": {"b": 3, "c": 4}}
result = _deep_merge(a, b)
assert result == {"x": {"a": 1, "b": 3, "c": 4}}
def test_deeply_nested(self):
a = {"x": {"y": {"z": 1}}}
b = {"x": {"y": {"w": 2}}}
result = _deep_merge(a, b)
assert result == {"x": {"y": {"z": 1, "w": 2}}}
def test_does_not_mutate_original(self):
a = {"a": 1}
b = {"b": 2}
_deep_merge(a, b)
assert a == {"a": 1}
class TestCombineFilter:
"""Test the combine filter logic inline (it's defined inside register())."""
from typing import Any, Mapping
@staticmethod
def _combine(a, b, deep=False, drop_none=False):
from collections.abc import Mapping
if not isinstance(a, Mapping) or not isinstance(b, Mapping):
return a
b2 = {k: v for k, v in b.items() if not (drop_none and v is None)}
return _deep_merge(a, b2) if deep else {**a, **b2}
def test_non_dict_returns_a(self):
assert self._combine("hello", {"a": 1}) == "hello"
assert self._combine(42, {"a": 1}) == 42
def test_shallow_merge(self):
result = self._combine({"a": 1}, {"b": 2})
assert result == {"a": 1, "b": 2}
def test_deep_merge(self):
result = self._combine({"x": {"a": 1}}, {"x": {"b": 2}}, deep=True)
assert result == {"x": {"a": 1, "b": 2}}
def test_drop_none(self):
result = self._combine({"a": 1, "b": 2}, {"b": None, "c": 3}, drop_none=True)
assert result == {"a": 1, "b": 2, "c": 3}
def test_keep_none_when_not_dropping(self):
result = self._combine({"a": 1}, {"a": None})
assert result == {"a": None}
# ---------------------------------------------------------------------------
# qs_base
# ---------------------------------------------------------------------------
class TestIterify:
def test_none(self):
assert _iterify(None) == []
def test_scalar(self):
assert _iterify("hello") == ["hello"]
def test_list(self):
assert _iterify([1, 2]) == [1, 2]
def test_tuple(self):
assert _iterify((1, 2)) == (1, 2)
def test_set(self):
assert _iterify({1, 2}) == {1, 2}
class TestNorm:
def test_strips_and_lowercases(self):
assert _norm(" Hello ") == "hello"
def test_already_lower(self):
assert _norm("hello") == "hello"
class TestMakeFilterSet:
def test_add_to_empty(self):
result = make_filter_set([], "new", None, False)
assert result == ["new"]
def test_add_preserves_existing(self):
result = make_filter_set(["a", "b"], "c", None, False)
assert result == ["a", "b", "c"]
def test_remove(self):
result = make_filter_set(["a", "b", "c"], None, "b", False)
assert result == ["a", "c"]
def test_clear_filters(self):
result = make_filter_set(["a", "b"], None, None, True)
assert result == []
def test_clear_then_add(self):
result = make_filter_set(["a", "b"], "c", None, True)
assert result == ["c"]
def test_case_insensitive_dedup(self):
result = make_filter_set(["Hello"], "hello", None, False)
assert len(result) == 1
def test_sorted_output(self):
result = make_filter_set([], ["c", "a", "b"], None, False)
assert result == ["a", "b", "c"]
def test_single_select_replaces(self):
result = make_filter_set(["old1", "old2"], "new", None, False, single_select=True)
assert result == ["new"]
def test_single_select_no_add_keeps_base(self):
result = make_filter_set(["a", "b"], None, None, False, single_select=True)
assert result == ["a", "b"]
def test_remove_case_insensitive(self):
result = make_filter_set(["Hello", "World"], None, "hello", False)
assert result == ["World"]
def test_add_none_values_filtered(self):
result = make_filter_set([], [None, "a", None], None, False)
assert result == ["a"]
class TestBuildQs:
def test_basic(self):
result = build_qs([("key", "value")])
assert result == "?key=value"
def test_multiple_params(self):
result = build_qs([("a", "1"), ("b", "2")])
assert "a=1" in result
assert "b=2" in result
assert result.startswith("?")
def test_no_leading_q(self):
result = build_qs([("key", "value")], leading_q=False)
assert result == "key=value"
def test_empty_params(self):
result = build_qs([])
assert result == ""
def test_empty_params_no_leading(self):
result = build_qs([], leading_q=False)
assert result == ""

View File

@@ -0,0 +1,140 @@
"""Tests for RSA key generation and HTTP Signature signing/verification."""
from __future__ import annotations
import json
from shared.utils.http_signatures import (
generate_rsa_keypair,
sign_request,
verify_request_signature,
create_ld_signature,
)
# ---------------------------------------------------------------------------
# Key generation
# ---------------------------------------------------------------------------
class TestKeyGeneration:
def test_generates_pem_strings(self):
private_pem, public_pem = generate_rsa_keypair()
assert isinstance(private_pem, str)
assert isinstance(public_pem, str)
def test_private_key_format(self):
private_pem, _ = generate_rsa_keypair()
assert "BEGIN PRIVATE KEY" in private_pem
assert "END PRIVATE KEY" in private_pem
def test_public_key_format(self):
_, public_pem = generate_rsa_keypair()
assert "BEGIN PUBLIC KEY" in public_pem
assert "END PUBLIC KEY" in public_pem
def test_keys_are_unique(self):
priv1, pub1 = generate_rsa_keypair()
priv2, pub2 = generate_rsa_keypair()
assert priv1 != priv2
assert pub1 != pub2
# ---------------------------------------------------------------------------
# Sign + verify round-trip
# ---------------------------------------------------------------------------
class TestSignVerify:
def test_round_trip_no_body(self):
private_pem, public_pem = generate_rsa_keypair()
headers = sign_request(
private_pem, key_id="https://example.com/users/alice#main-key",
method="GET", path="/users/bob/inbox", host="example.com",
date="Sat, 15 Jun 2025 12:00:00 GMT",
)
assert "Signature" in headers
assert "Date" in headers
assert "Host" in headers
assert "Digest" not in headers
ok = verify_request_signature(
public_pem, headers["Signature"], method="GET",
path="/users/bob/inbox", headers=headers,
)
assert ok is True
def test_round_trip_with_body(self):
private_pem, public_pem = generate_rsa_keypair()
body = b'{"type": "Follow"}'
headers = sign_request(
private_pem, key_id="https://example.com/users/alice#main-key",
method="POST", path="/users/bob/inbox", host="example.com",
body=body, date="Sat, 15 Jun 2025 12:00:00 GMT",
)
assert "Digest" in headers
assert headers["Digest"].startswith("SHA-256=")
ok = verify_request_signature(
public_pem, headers["Signature"], method="POST",
path="/users/bob/inbox", headers=headers,
)
assert ok is True
def test_wrong_key_fails(self):
priv1, _ = generate_rsa_keypair()
_, pub2 = generate_rsa_keypair()
headers = sign_request(
priv1, key_id="key1", method="GET", path="/inbox", host="a.com",
date="Sat, 15 Jun 2025 12:00:00 GMT",
)
ok = verify_request_signature(pub2, headers["Signature"], "GET", "/inbox", headers)
assert ok is False
def test_tampered_path_fails(self):
private_pem, public_pem = generate_rsa_keypair()
headers = sign_request(
private_pem, key_id="key1", method="GET", path="/inbox", host="a.com",
date="Sat, 15 Jun 2025 12:00:00 GMT",
)
ok = verify_request_signature(public_pem, headers["Signature"], "GET", "/tampered", headers)
assert ok is False
def test_tampered_method_fails(self):
private_pem, public_pem = generate_rsa_keypair()
headers = sign_request(
private_pem, key_id="key1", method="GET", path="/inbox", host="a.com",
date="Sat, 15 Jun 2025 12:00:00 GMT",
)
ok = verify_request_signature(public_pem, headers["Signature"], "POST", "/inbox", headers)
assert ok is False
def test_signature_header_contains_key_id(self):
private_pem, _ = generate_rsa_keypair()
headers = sign_request(
private_pem, key_id="https://my.server/actor#main-key",
method="POST", path="/inbox", host="remote.server",
date="Sat, 15 Jun 2025 12:00:00 GMT",
)
assert 'keyId="https://my.server/actor#main-key"' in headers["Signature"]
assert 'algorithm="rsa-sha256"' in headers["Signature"]
# ---------------------------------------------------------------------------
# Linked Data signature
# ---------------------------------------------------------------------------
class TestLDSignature:
def test_creates_ld_signature(self):
private_pem, _ = generate_rsa_keypair()
activity = {"type": "Create", "actor": "https://example.com/users/alice"}
sig = create_ld_signature(private_pem, "https://example.com/users/alice#main-key", activity)
assert sig["type"] == "RsaSignature2017"
assert sig["creator"] == "https://example.com/users/alice#main-key"
assert "signatureValue" in sig
assert "created" in sig
def test_deterministic_canonical(self):
"""Same activity always produces same canonical form (signature differs due to timestamp)."""
private_pem, _ = generate_rsa_keypair()
activity = {"b": 2, "a": 1}
# The canonical form should sort keys
canonical = json.dumps(activity, sort_keys=True, separators=(",", ":"))
assert canonical == '{"a":1,"b":2}'

View File

@@ -0,0 +1,114 @@
"""Tests for HMAC-based internal service-to-service authentication."""
from __future__ import annotations
import hashlib
import hmac
import time
from unittest.mock import patch
from shared.infrastructure.internal_auth import sign_internal_headers
# ---------------------------------------------------------------------------
# sign_internal_headers
# ---------------------------------------------------------------------------
class TestSignInternalHeaders:
def test_returns_required_headers(self):
headers = sign_internal_headers("cart")
assert "X-Internal-Timestamp" in headers
assert "X-Internal-App" in headers
assert "X-Internal-Signature" in headers
def test_app_name_in_header(self):
headers = sign_internal_headers("blog")
assert headers["X-Internal-App"] == "blog"
def test_timestamp_is_recent(self):
headers = sign_internal_headers("events")
ts = int(headers["X-Internal-Timestamp"])
now = int(time.time())
assert abs(now - ts) < 5
def test_signature_is_hex(self):
headers = sign_internal_headers("cart")
sig = headers["X-Internal-Signature"]
# SHA-256 hex is 64 chars
assert len(sig) == 64
int(sig, 16) # should not raise
def test_different_apps_different_signatures(self):
h1 = sign_internal_headers("cart")
h2 = sign_internal_headers("blog")
assert h1["X-Internal-Signature"] != h2["X-Internal-Signature"]
# ---------------------------------------------------------------------------
# Round-trip: sign then validate
# ---------------------------------------------------------------------------
class TestSignAndValidate:
"""Test the HMAC signing logic directly without needing a Quart request context."""
def _validate_headers(self, headers: dict[str, str], secret: bytes, max_age: int = 300) -> bool:
"""Replicate validate_internal_request logic without Quart request context."""
ts = headers.get("X-Internal-Timestamp", "")
app_name = headers.get("X-Internal-App", "")
sig = headers.get("X-Internal-Signature", "")
if not ts or not app_name or not sig:
return False
try:
req_time = int(ts)
except (ValueError, TypeError):
return False
now = int(time.time())
if abs(now - req_time) > max_age:
return False
payload = f"{ts}:{app_name}".encode()
expected = hmac.new(secret, payload, hashlib.sha256).hexdigest()
return hmac.compare_digest(sig, expected)
def test_valid_signature(self):
from shared.infrastructure.internal_auth import _get_secret
secret = _get_secret()
headers = sign_internal_headers("relations")
assert self._validate_headers(headers, secret) is True
def test_tampered_signature_fails(self):
from shared.infrastructure.internal_auth import _get_secret
secret = _get_secret()
headers = sign_internal_headers("cart")
headers["X-Internal-Signature"] = "0" * 64
assert self._validate_headers(headers, secret) is False
def test_wrong_secret_fails(self):
headers = sign_internal_headers("cart")
assert self._validate_headers(headers, b"wrong-secret") is False
def test_expired_timestamp_fails(self):
from shared.infrastructure.internal_auth import _get_secret
secret = _get_secret()
headers = sign_internal_headers("cart")
# Set timestamp to 10 minutes ago
old_ts = str(int(time.time()) - 600)
headers["X-Internal-Timestamp"] = old_ts
# Re-sign with old timestamp (so the signature matches the old ts)
payload = f"{old_ts}:cart".encode()
headers["X-Internal-Signature"] = hmac.new(secret, payload, hashlib.sha256).hexdigest()
assert self._validate_headers(headers, secret) is False
def test_missing_headers_fail(self):
from shared.infrastructure.internal_auth import _get_secret
secret = _get_secret()
assert self._validate_headers({}, secret) is False
assert self._validate_headers({"X-Internal-Timestamp": "123"}, secret) is False
def test_invalid_timestamp_fails(self):
from shared.infrastructure.internal_auth import _get_secret
secret = _get_secret()
headers = {"X-Internal-Timestamp": "not-a-number", "X-Internal-App": "cart", "X-Internal-Signature": "abc"}
assert self._validate_headers(headers, secret) is False

View File

@@ -0,0 +1,166 @@
"""Tests for the render() function and component loading in jinja_bridge.
These test functionality added in recent commits (render() API,
load_sexp_dir, snake→kebab conversion) that isn't covered by the existing
shared/sexp/tests/test_jinja_bridge.py.
"""
from __future__ import annotations
import os
import tempfile
import pytest
from shared.sexp.jinja_bridge import (
render,
register_components,
load_sexp_dir,
_COMPONENT_ENV,
)
@pytest.fixture(autouse=True)
def _clean_env():
"""Clear component env before each test."""
_COMPONENT_ENV.clear()
yield
_COMPONENT_ENV.clear()
# ---------------------------------------------------------------------------
# render() — call component by name with Python kwargs
# ---------------------------------------------------------------------------
class TestRender:
def test_basic_render(self):
register_components('(defcomp ~badge (&key label) (span :class "badge" label))')
html = render("badge", label="New")
assert html == '<span class="badge">New</span>'
def test_tilde_prefix_optional(self):
register_components('(defcomp ~pill (&key text) (em text))')
# Both forms should work
assert render("pill", text="Hi") == render("~pill", text="Hi")
def test_snake_to_kebab_conversion(self):
"""Python snake_case kwargs should map to sexp kebab-case params."""
register_components('''
(defcomp ~card (&key nav-html link-href)
(div :class "card" (a :href link-href nav-html)))
''')
html = render("card", nav_html="Nav", link_href="/about")
assert 'href="/about"' in html
assert "Nav" in html
def test_multiple_kwargs(self):
register_components('''
(defcomp ~item (&key title price image-url)
(div (h3 title) (span price) (img :src image-url)))
''')
html = render("item", title="Widget", price="£10", image_url="/img/w.jpg")
assert "Widget" in html
assert "£10" in html
assert 'src="/img/w.jpg"' in html
def test_unknown_component_raises(self):
with pytest.raises(ValueError, match="Unknown component"):
render("nonexistent", label="x")
def test_empty_kwargs(self):
register_components('(defcomp ~empty () (hr))')
html = render("empty")
assert html == "<hr>"
def test_html_escaping_in_values(self):
register_components('(defcomp ~safe (&key text) (p text))')
html = render("safe", text='<script>alert("xss")</script>')
assert "<script>" not in html
assert "&lt;script&gt;" in html
def test_boolean_false_value(self):
register_components('''
(defcomp ~toggle (&key active)
(when active (span "ON")))
''')
html = render("toggle", active=False)
assert "ON" not in html
def test_boolean_true_value(self):
register_components('''
(defcomp ~toggle (&key active)
(when active (span "ON")))
''')
html = render("toggle", active=True)
assert "ON" in html
# ---------------------------------------------------------------------------
# load_sexp_dir
# ---------------------------------------------------------------------------
class TestLoadSexpDir:
def test_loads_sexp_files(self):
with tempfile.TemporaryDirectory() as tmpdir:
# Write a .sexp file
with open(os.path.join(tmpdir, "components.sexp"), "w") as f:
f.write('(defcomp ~test-comp (&key msg) (div msg))')
load_sexp_dir(tmpdir)
html = render("test-comp", msg="loaded!")
assert html == "<div>loaded!</div>"
def test_loads_sexpr_files(self):
with tempfile.TemporaryDirectory() as tmpdir:
with open(os.path.join(tmpdir, "nav.sexpr"), "w") as f:
f.write('(defcomp ~nav-item (&key href label) (a :href href label))')
load_sexp_dir(tmpdir)
html = render("nav-item", href="/about", label="About")
assert 'href="/about"' in html
def test_loads_multiple_files(self):
with tempfile.TemporaryDirectory() as tmpdir:
with open(os.path.join(tmpdir, "a.sexp"), "w") as f:
f.write('(defcomp ~comp-a (&key x) (b x))')
with open(os.path.join(tmpdir, "b.sexp"), "w") as f:
f.write('(defcomp ~comp-b (&key y) (i y))')
load_sexp_dir(tmpdir)
assert render("comp-a", x="A") == "<b>A</b>"
assert render("comp-b", y="B") == "<i>B</i>"
def test_empty_directory(self):
with tempfile.TemporaryDirectory() as tmpdir:
load_sexp_dir(tmpdir) # should not raise
def test_ignores_non_sexp_files(self):
with tempfile.TemporaryDirectory() as tmpdir:
with open(os.path.join(tmpdir, "readme.txt"), "w") as f:
f.write("not a sexp file")
with open(os.path.join(tmpdir, "comp.sexp"), "w") as f:
f.write('(defcomp ~real (&key v) (span v))')
load_sexp_dir(tmpdir)
assert "~real" in _COMPONENT_ENV
# txt file should not have been loaded
assert len([k for k in _COMPONENT_ENV if k.startswith("~")]) == 1
# ---------------------------------------------------------------------------
# register_components — multiple definitions in one source
# ---------------------------------------------------------------------------
class TestRegisterComponents:
def test_multiple_in_one_source(self):
register_components('''
(defcomp ~a (&key x) (b x))
(defcomp ~b (&key y) (i y))
''')
assert "~a" in _COMPONENT_ENV
assert "~b" in _COMPONENT_ENV
def test_overwrite_existing(self):
register_components('(defcomp ~ow (&key x) (b x))')
assert render("ow", x="v1") == "<b>v1</b>"
register_components('(defcomp ~ow (&key x) (i x))')
assert render("ow", x="v2") == "<i>v2</i>"

View File

@@ -0,0 +1,95 @@
"""Tests for parse utility functions."""
from __future__ import annotations
from datetime import time, datetime, timezone
from shared.browser.app.utils.parse import parse_time, parse_cost, parse_dt
# ---------------------------------------------------------------------------
# parse_time
# ---------------------------------------------------------------------------
class TestParseTime:
def test_valid_time(self):
result = parse_time("14:30")
assert result == time(14, 30)
def test_midnight(self):
result = parse_time("00:00")
assert result == time(0, 0)
def test_end_of_day(self):
result = parse_time("23:59")
assert result == time(23, 59)
def test_none_input(self):
assert parse_time(None) is None
def test_empty_string(self):
assert parse_time("") is None
def test_invalid_format(self):
assert parse_time("not-a-time") is None
def test_invalid_hours(self):
assert parse_time("25:00") is None
def test_single_digit(self):
result = parse_time("9:05")
assert result == time(9, 5)
# ---------------------------------------------------------------------------
# parse_cost
# ---------------------------------------------------------------------------
class TestParseCost:
def test_valid_float(self):
assert parse_cost("19.99") == 19.99
def test_integer_string(self):
assert parse_cost("10") == 10.0
def test_zero(self):
assert parse_cost("0") == 0.0
def test_none_input(self):
assert parse_cost(None) is None
def test_empty_string(self):
assert parse_cost("") is None
def test_invalid_string(self):
assert parse_cost("not-a-number") is None
# ---------------------------------------------------------------------------
# parse_dt
# ---------------------------------------------------------------------------
class TestParseDt:
def test_iso_format(self):
result = parse_dt("2025-06-15T14:30:00")
assert isinstance(result, datetime)
assert result.year == 2025
assert result.month == 6
assert result.day == 15
def test_naive_gets_utc(self):
result = parse_dt("2025-06-15T14:30:00")
assert result.tzinfo == timezone.utc
def test_aware_preserved(self):
result = parse_dt("2025-06-15T14:30:00+01:00")
assert result.tzinfo is not None
def test_none_input(self):
assert parse_dt(None) is None
def test_empty_string(self):
assert parse_dt("") is None
def test_date_only(self):
result = parse_dt("2025-06-15")
assert result.year == 2025

View File

@@ -0,0 +1,74 @@
"""Tests for shared sexp helper functions (call_url, get_asset_url, etc.)."""
from __future__ import annotations
from shared.sexp.helpers import call_url, get_asset_url
# ---------------------------------------------------------------------------
# call_url
# ---------------------------------------------------------------------------
class TestCallUrl:
def test_callable_url_fn(self):
ctx = {"blog_url": lambda path: f"https://blog.example.com{path}"}
assert call_url(ctx, "blog_url", "/posts/") == "https://blog.example.com/posts/"
def test_callable_default_path(self):
ctx = {"blog_url": lambda path: f"https://blog.example.com{path}"}
assert call_url(ctx, "blog_url") == "https://blog.example.com/"
def test_string_url(self):
ctx = {"blog_url": "https://blog.example.com"}
assert call_url(ctx, "blog_url", "/posts/") == "https://blog.example.com/posts/"
def test_string_url_default_path(self):
ctx = {"blog_url": "https://blog.example.com"}
assert call_url(ctx, "blog_url") == "https://blog.example.com/"
def test_missing_key(self):
ctx = {}
assert call_url(ctx, "blog_url", "/x") == "/x"
def test_none_value(self):
ctx = {"blog_url": None}
assert call_url(ctx, "blog_url", "/x") == "/x"
def test_callable_with_empty_path(self):
ctx = {"cart_url": lambda path: f"https://cart.example.com{path}"}
assert call_url(ctx, "cart_url", "") == "https://cart.example.com"
# ---------------------------------------------------------------------------
# get_asset_url
# ---------------------------------------------------------------------------
class TestGetAssetUrl:
def test_callable_asset_url(self):
ctx = {"asset_url": lambda path: f"https://cdn.example.com/static{path}"}
result = get_asset_url(ctx)
# Should strip the trailing path component
assert "cdn.example.com" in result
def test_string_asset_url(self):
ctx = {"asset_url": "https://cdn.example.com/static"}
assert get_asset_url(ctx) == "https://cdn.example.com/static"
def test_missing_asset_url(self):
ctx = {}
assert get_asset_url(ctx) == ""
def test_none_asset_url(self):
ctx = {"asset_url": None}
assert get_asset_url(ctx) == ""
def test_callable_returns_path_only(self):
# au("") → "/static", rsplit("/",1)[0] → "" (splits on leading /)
ctx = {"asset_url": lambda path: f"/static{path}"}
result = get_asset_url(ctx)
assert result == ""
def test_callable_with_nested_path(self):
# au("") → "/assets/static", rsplit("/",1)[0] → "/assets"
ctx = {"asset_url": lambda path: f"/assets/static{path}"}
result = get_asset_url(ctx)
assert result == "/assets"

View File

@@ -0,0 +1,108 @@
"""Tests for URL join utilities."""
from __future__ import annotations
from shared.utils import _join_url_parts, join_url, normalize_text, soup_of
# ---------------------------------------------------------------------------
# _join_url_parts
# ---------------------------------------------------------------------------
class TestJoinUrlParts:
def test_empty_list(self):
assert _join_url_parts([]) == ""
def test_single_part(self):
assert _join_url_parts(["hello"]) == "hello"
def test_two_parts(self):
assert _join_url_parts(["https://example.com", "path"]) == "https://example.com/path"
def test_preserves_scheme(self):
assert _join_url_parts(["https://example.com/", "/api/", "v1"]) == "https://example.com/api/v1"
def test_trailing_slash_preserved(self):
result = _join_url_parts(["https://example.com", "path/"])
assert result.endswith("/")
def test_no_trailing_slash_when_last_has_none(self):
result = _join_url_parts(["https://example.com", "path"])
assert not result.endswith("/")
def test_strips_internal_slashes(self):
result = _join_url_parts(["https://example.com/", "/api/", "/v1"])
assert result == "https://example.com/api/v1"
def test_absolute_url_mid_list_replaces(self):
result = _join_url_parts(["https://old.com/foo", "https://new.com/bar"])
assert result == "https://new.com/bar"
def test_query_string_attached(self):
result = _join_url_parts(["https://example.com/path", "?key=val"])
assert result == "https://example.com/path?key=val"
def test_fragment_attached(self):
result = _join_url_parts(["https://example.com/path", "#section"])
assert result == "https://example.com/path#section"
def test_filters_none_and_empty(self):
result = _join_url_parts(["https://example.com", None, "", "path"])
assert result == "https://example.com/path"
def test_no_scheme(self):
result = _join_url_parts(["/foo", "bar", "baz/"])
assert result == "foo/bar/baz/"
def test_multiple_segments(self):
result = _join_url_parts(["https://example.com", "a", "b", "c/"])
assert result == "https://example.com/a/b/c/"
# ---------------------------------------------------------------------------
# join_url
# ---------------------------------------------------------------------------
class TestJoinUrl:
def test_string_input(self):
assert join_url("https://example.com") == "https://example.com"
def test_list_input(self):
assert join_url(["https://example.com", "path"]) == "https://example.com/path"
def test_tuple_input(self):
assert join_url(("https://example.com", "path/")) == "https://example.com/path/"
# ---------------------------------------------------------------------------
# normalize_text
# ---------------------------------------------------------------------------
class TestNormalizeText:
def test_collapses_whitespace(self):
assert normalize_text(" hello world ") == "hello world"
def test_tabs_and_newlines(self):
assert normalize_text("hello\t\nworld") == "hello world"
def test_empty_string(self):
assert normalize_text("") == ""
def test_none_input(self):
assert normalize_text(None) == ""
def test_single_word(self):
assert normalize_text(" word ") == "word"
# ---------------------------------------------------------------------------
# soup_of
# ---------------------------------------------------------------------------
class TestSoupOf:
def test_parses_html(self):
s = soup_of("<p>Hello <b>world</b></p>")
assert s.find("b").text == "world"
def test_empty_html(self):
s = soup_of("")
assert s.text == ""