Domain isolation: typed contracts, service registry, and composable wiring

Add typed service contracts (Protocols + frozen DTOs) in shared/contracts/
for cross-domain communication. Each domain exposes a service interface
(BlogService, CalendarService, MarketService, CartService) backed by SQL
implementations in shared/services/. A singleton registry with has() guards
enables composable startup — apps register their own domain service and
stubs for absent domains.

Absorbs glue layer: navigation, relationships, event handlers (login,
container, order) now live in shared/ with has()-guarded service calls.
Factory gains domain_services_fn parameter for per-app service registration.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
giles
2026-02-19 04:29:10 +00:00
parent ea7dc9723a
commit 70b1c7de10
19 changed files with 1375 additions and 5 deletions

31
contracts/__init__.py Normal file
View File

@@ -0,0 +1,31 @@
"""Typed contracts (DTOs + Protocols) for cross-domain service interfaces."""
from .dtos import (
PostDTO,
CalendarDTO,
CalendarEntryDTO,
MarketPlaceDTO,
ProductDTO,
CartItemDTO,
CartSummaryDTO,
)
from .protocols import (
BlogService,
CalendarService,
MarketService,
CartService,
)
__all__ = [
"PostDTO",
"CalendarDTO",
"CalendarEntryDTO",
"MarketPlaceDTO",
"ProductDTO",
"CartItemDTO",
"CartSummaryDTO",
"BlogService",
"CalendarService",
"MarketService",
"CartService",
]

113
contracts/dtos.py Normal file
View File

@@ -0,0 +1,113 @@
"""Frozen dataclasses for cross-domain data transfer.
These are the *only* shapes that cross domain boundaries. Consumers never
see ORM model instances from another domain — only these DTOs.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from decimal import Decimal
# ---------------------------------------------------------------------------
# Blog domain
# ---------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)
class PostDTO:
id: int
slug: str
title: str
status: str
visibility: str
is_page: bool = False
feature_image: str | None = None
html: str | None = None
excerpt: str | None = None
custom_excerpt: str | None = None
published_at: datetime | None = None
# ---------------------------------------------------------------------------
# Calendar / Events domain
# ---------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)
class CalendarDTO:
id: int
container_type: str
container_id: int
name: str
slug: str
description: str | None = None
@dataclass(frozen=True, slots=True)
class CalendarEntryDTO:
id: int
calendar_id: int
name: str
start_at: datetime
state: str
cost: Decimal
end_at: datetime | None = None
user_id: int | None = None
session_id: str | None = None
order_id: int | None = None
slot_id: int | None = None
ticket_price: Decimal | None = None
ticket_count: int | None = None
calendar_name: str | None = None
calendar_slug: str | None = None
# ---------------------------------------------------------------------------
# Market domain
# ---------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)
class MarketPlaceDTO:
id: int
container_type: str
container_id: int
name: str
slug: str
description: str | None = None
@dataclass(frozen=True, slots=True)
class ProductDTO:
id: int
slug: str
title: str | None = None
image: str | None = None
description_short: str | None = None
rrp: Decimal | None = None
regular_price: Decimal | None = None
special_price: Decimal | None = None
# ---------------------------------------------------------------------------
# Cart domain
# ---------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)
class CartItemDTO:
id: int
product_id: int
quantity: int
product_title: str | None = None
product_slug: str | None = None
product_image: str | None = None
unit_price: Decimal | None = None
market_place_id: int | None = None
@dataclass(frozen=True, slots=True)
class CartSummaryDTO:
count: int = 0
total: Decimal = Decimal("0")
calendar_count: int = 0
calendar_total: Decimal = Decimal("0")
items: list[CartItemDTO] = field(default_factory=list)

95
contracts/protocols.py Normal file
View File

@@ -0,0 +1,95 @@
"""Protocol classes defining each domain's service interface.
All cross-domain callers program against these Protocols. Concrete
implementations (Sql*Service) and no-op stubs both satisfy them.
"""
from __future__ import annotations
from typing import Protocol, runtime_checkable
from sqlalchemy.ext.asyncio import AsyncSession
from .dtos import (
PostDTO,
CalendarDTO,
CalendarEntryDTO,
MarketPlaceDTO,
ProductDTO,
CartItemDTO,
CartSummaryDTO,
)
@runtime_checkable
class BlogService(Protocol):
async def get_post_by_slug(self, session: AsyncSession, slug: str) -> PostDTO | None: ...
async def get_post_by_id(self, session: AsyncSession, id: int) -> PostDTO | None: ...
async def get_posts_by_ids(self, session: AsyncSession, ids: list[int]) -> list[PostDTO]: ...
@runtime_checkable
class CalendarService(Protocol):
async def calendars_for_container(
self, session: AsyncSession, container_type: str, container_id: int,
) -> list[CalendarDTO]: ...
async def pending_entries(
self, session: AsyncSession, *, user_id: int | None, session_id: str | None,
) -> list[CalendarEntryDTO]: ...
async def entries_for_page(
self, session: AsyncSession, page_id: int, *, user_id: int | None, session_id: str | None,
) -> list[CalendarEntryDTO]: ...
async def entry_by_id(self, session: AsyncSession, entry_id: int) -> CalendarEntryDTO | None: ...
async def associated_entries(
self, session: AsyncSession, content_type: str, content_id: int, page: int,
) -> tuple[list[CalendarEntryDTO], bool]: ...
async def toggle_entry_post(
self, session: AsyncSession, entry_id: int, content_type: str, content_id: int,
) -> bool: ...
async def adopt_entries_for_user(
self, session: AsyncSession, user_id: int, session_id: str,
) -> None: ...
async def claim_entries_for_order(
self, session: AsyncSession, order_id: int, user_id: int | None,
session_id: str | None, page_post_id: int | None,
) -> None: ...
async def confirm_entries_for_order(
self, session: AsyncSession, order_id: int, user_id: int | None,
session_id: str | None,
) -> None: ...
async def get_entries_for_order(
self, session: AsyncSession, order_id: int,
) -> list[CalendarEntryDTO]: ...
@runtime_checkable
class MarketService(Protocol):
async def marketplaces_for_container(
self, session: AsyncSession, container_type: str, container_id: int,
) -> list[MarketPlaceDTO]: ...
async def product_by_id(self, session: AsyncSession, product_id: int) -> ProductDTO | None: ...
@runtime_checkable
class CartService(Protocol):
async def cart_summary(
self, session: AsyncSession, *, user_id: int | None, session_id: str | None,
page_slug: str | None = None,
) -> CartSummaryDTO: ...
async def cart_items(
self, session: AsyncSession, *, user_id: int | None, session_id: str | None,
) -> list[CartItemDTO]: ...
async def adopt_cart_for_user(
self, session: AsyncSession, user_id: int, session_id: str,
) -> None: ...

View File

@@ -0,0 +1,8 @@
"""Shared event handlers (replaces glue.setup.register_glue_handlers)."""
def register_shared_handlers():
"""Import handler modules to trigger registration. Call at app startup."""
import shared.events.handlers.container_handlers # noqa: F401
import shared.events.handlers.login_handlers # noqa: F401
import shared.events.handlers.order_handlers # noqa: F401

View File

@@ -0,0 +1,19 @@
from __future__ import annotations
from sqlalchemy.ext.asyncio import AsyncSession
from shared.events import register_handler
from shared.models.domain_event import DomainEvent
from shared.services.navigation import rebuild_navigation
async def on_child_attached(event: DomainEvent, session: AsyncSession) -> None:
await rebuild_navigation(session)
async def on_child_detached(event: DomainEvent, session: AsyncSession) -> None:
await rebuild_navigation(session)
register_handler("container.child_attached", on_child_attached)
register_handler("container.child_detached", on_child_detached)

View File

@@ -0,0 +1,24 @@
from __future__ import annotations
from sqlalchemy.ext.asyncio import AsyncSession
from shared.events import register_handler
from shared.models.domain_event import DomainEvent
from shared.services.registry import services
async def on_user_logged_in(event: DomainEvent, session: AsyncSession) -> None:
payload = event.payload
user_id = payload["user_id"]
session_id = payload["session_id"]
# Adopt cart items (if cart service is registered)
if services.has("cart"):
await services.cart.adopt_cart_for_user(session, user_id, session_id)
# Adopt calendar entries (if calendar service is registered)
if services.has("calendar"):
await services.calendar.adopt_entries_for_user(session, user_id, session_id)
register_handler("user.logged_in", on_user_logged_in)

View File

@@ -0,0 +1,22 @@
from __future__ import annotations
import logging
from sqlalchemy.ext.asyncio import AsyncSession
from shared.events import register_handler
from shared.models.domain_event import DomainEvent
log = logging.getLogger(__name__)
async def on_order_created(event: DomainEvent, session: AsyncSession) -> None:
log.info("order.created: order_id=%s", event.payload.get("order_id"))
async def on_order_paid(event: DomainEvent, session: AsyncSession) -> None:
log.info("order.paid: order_id=%s", event.payload.get("order_id"))
register_handler("order.created", on_order_created)
register_handler("order.paid", on_order_paid)

View File

@@ -42,6 +42,7 @@ def create_base_app(
*,
context_fn: Callable[[], Awaitable[dict]] | None = None,
before_request_fns: Sequence[Callable[[], Awaitable[None]]] | None = None,
domain_services_fn: Callable[[], None] | None = None,
) -> Quart:
"""
Create a Quart app with shared infrastructure.
@@ -57,7 +58,13 @@ def create_base_app(
If not provided, a minimal default context is used.
before_request_fns:
Extra before-request hooks (e.g. cart_loader for the cart app).
domain_services_fn:
Callable that registers domain services on the shared registry.
Each app provides its own — registering real impls for owned
domains and stubs (or real impls) for others.
"""
if domain_services_fn is not None:
domain_services_fn()
app = Quart(
name,
static_folder=STATIC_DIR,
@@ -144,11 +151,8 @@ def create_base_app(
# --- startup ---
@app.before_serving
async def _startup():
try:
from glue.setup import register_glue_handlers
register_glue_handlers()
except ImportError:
pass # glue submodule not present in this build context
from shared.events.handlers import register_shared_handlers
register_shared_handlers()
await init_config()
print(pretty())
await _event_processor.start()

View File

@@ -0,0 +1,38 @@
from datetime import datetime
from typing import Optional
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import Integer, String, DateTime, Index, UniqueConstraint, func
from shared.db.base import Base
class ContainerRelation(Base):
__tablename__ = "container_relations"
__table_args__ = (
UniqueConstraint(
"parent_type", "parent_id", "child_type", "child_id",
name="uq_container_relations_parent_child",
),
Index("ix_container_relations_parent", "parent_type", "parent_id"),
Index("ix_container_relations_child", "child_type", "child_id"),
)
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
parent_type: Mapped[str] = mapped_column(String(32), nullable=False)
parent_id: Mapped[int] = mapped_column(Integer, nullable=False)
child_type: Mapped[str] = mapped_column(String(32), nullable=False)
child_id: Mapped[int] = mapped_column(Integer, nullable=False)
sort_order: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
label: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
nullable=False,
)
deleted_at: Mapped[Optional[datetime]] = mapped_column(
DateTime(timezone=True),
nullable=True,
)

50
models/menu_node.py Normal file
View File

@@ -0,0 +1,50 @@
from datetime import datetime
from typing import Optional
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import Integer, String, Text, DateTime, ForeignKey, Index, func
from shared.db.base import Base
class MenuNode(Base):
__tablename__ = "menu_nodes"
__table_args__ = (
Index("ix_menu_nodes_container", "container_type", "container_id"),
Index("ix_menu_nodes_parent_id", "parent_id"),
)
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
container_type: Mapped[str] = mapped_column(String(32), nullable=False)
container_id: Mapped[int] = mapped_column(Integer, nullable=False)
parent_id: Mapped[Optional[int]] = mapped_column(
Integer,
ForeignKey("menu_nodes.id", ondelete="SET NULL"),
nullable=True,
)
sort_order: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
depth: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
label: Mapped[str] = mapped_column(String(255), nullable=False)
slug: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
href: Mapped[Optional[str]] = mapped_column(String(1024), nullable=True)
icon: Mapped[Optional[str]] = mapped_column(String(64), nullable=True)
feature_image: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
nullable=False,
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now(),
nullable=False,
)
deleted_at: Mapped[Optional[datetime]] = mapped_column(
DateTime(timezone=True),
nullable=True,
)

5
services/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
"""Domain service implementations and registry."""
from .registry import services
__all__ = ["services"]

65
services/blog_impl.py Normal file
View File

@@ -0,0 +1,65 @@
"""SQL-backed BlogService implementation.
Queries ``shared.models.ghost_content.Post`` — only this module may read
blog-domain tables on behalf of other domains.
"""
from __future__ import annotations
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from shared.models.ghost_content import Post
from shared.contracts.dtos import PostDTO
def _post_to_dto(post: Post) -> PostDTO:
return PostDTO(
id=post.id,
slug=post.slug,
title=post.title,
status=post.status,
visibility=post.visibility,
is_page=post.is_page,
feature_image=post.feature_image,
html=post.html,
excerpt=post.excerpt,
custom_excerpt=post.custom_excerpt,
published_at=post.published_at,
)
class SqlBlogService:
async def get_post_by_slug(self, session: AsyncSession, slug: str) -> PostDTO | None:
post = (
await session.execute(select(Post).where(Post.slug == slug))
).scalar_one_or_none()
return _post_to_dto(post) if post else None
async def get_post_by_id(self, session: AsyncSession, id: int) -> PostDTO | None:
post = (
await session.execute(select(Post).where(Post.id == id))
).scalar_one_or_none()
return _post_to_dto(post) if post else None
async def get_posts_by_ids(self, session: AsyncSession, ids: list[int]) -> list[PostDTO]:
if not ids:
return []
result = await session.execute(select(Post).where(Post.id.in_(ids)))
return [_post_to_dto(p) for p in result.scalars().all()]
async def search_posts(
self, session: AsyncSession, query: str, page: int = 1, per_page: int = 10,
) -> tuple[list[PostDTO], int]:
"""Search posts by title with pagination. Not part of the Protocol
(admin-only use in events), but provided for convenience."""
if query:
count_stmt = select(func.count(Post.id)).where(Post.title.ilike(f"%{query}%"))
posts_stmt = select(Post).where(Post.title.ilike(f"%{query}%")).order_by(Post.title)
else:
count_stmt = select(func.count(Post.id))
posts_stmt = select(Post).order_by(Post.published_at.desc().nullslast())
total = (await session.execute(count_stmt)).scalar() or 0
offset = (page - 1) * per_page
result = await session.execute(posts_stmt.limit(per_page).offset(offset))
return [_post_to_dto(p) for p in result.scalars().all()], total

306
services/calendar_impl.py Normal file
View File

@@ -0,0 +1,306 @@
"""SQL-backed CalendarService implementation.
Queries ``shared.models.calendars.*`` — only this module may write to
calendar-domain tables on behalf of other domains.
"""
from __future__ import annotations
from sqlalchemy import select, update, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from shared.models.calendars import Calendar, CalendarEntry, CalendarEntryPost
from shared.contracts.dtos import CalendarDTO, CalendarEntryDTO
def _cal_to_dto(cal: Calendar) -> CalendarDTO:
return CalendarDTO(
id=cal.id,
container_type=cal.container_type,
container_id=cal.container_id,
name=cal.name,
slug=cal.slug,
description=cal.description,
)
def _entry_to_dto(entry: CalendarEntry) -> CalendarEntryDTO:
cal = getattr(entry, "calendar", None)
return CalendarEntryDTO(
id=entry.id,
calendar_id=entry.calendar_id,
name=entry.name,
start_at=entry.start_at,
state=entry.state,
cost=entry.cost,
end_at=entry.end_at,
user_id=entry.user_id,
session_id=entry.session_id,
order_id=entry.order_id,
slot_id=entry.slot_id,
ticket_price=entry.ticket_price,
ticket_count=entry.ticket_count,
calendar_name=cal.name if cal else None,
calendar_slug=cal.slug if cal else None,
)
class SqlCalendarService:
# -- reads ----------------------------------------------------------------
async def calendars_for_container(
self, session: AsyncSession, container_type: str, container_id: int,
) -> list[CalendarDTO]:
result = await session.execute(
select(Calendar).where(
Calendar.container_type == container_type,
Calendar.container_id == container_id,
Calendar.deleted_at.is_(None),
).order_by(Calendar.name.asc())
)
return [_cal_to_dto(c) for c in result.scalars().all()]
async def pending_entries(
self, session: AsyncSession, *, user_id: int | None, session_id: str | None,
) -> list[CalendarEntryDTO]:
filters = [
CalendarEntry.deleted_at.is_(None),
CalendarEntry.state == "pending",
]
if user_id is not None:
filters.append(CalendarEntry.user_id == user_id)
elif session_id is not None:
filters.append(CalendarEntry.session_id == session_id)
else:
return []
result = await session.execute(
select(CalendarEntry)
.where(*filters)
.order_by(CalendarEntry.start_at.asc())
.options(selectinload(CalendarEntry.calendar))
)
return [_entry_to_dto(e) for e in result.scalars().all()]
async def entries_for_page(
self, session: AsyncSession, page_id: int, *,
user_id: int | None, session_id: str | None,
) -> list[CalendarEntryDTO]:
cal_ids = select(Calendar.id).where(
Calendar.container_type == "page",
Calendar.container_id == page_id,
Calendar.deleted_at.is_(None),
).scalar_subquery()
filters = [
CalendarEntry.deleted_at.is_(None),
CalendarEntry.state == "pending",
CalendarEntry.calendar_id.in_(cal_ids),
]
if user_id is not None:
filters.append(CalendarEntry.user_id == user_id)
elif session_id is not None:
filters.append(CalendarEntry.session_id == session_id)
else:
return []
result = await session.execute(
select(CalendarEntry)
.where(*filters)
.order_by(CalendarEntry.start_at.asc())
.options(selectinload(CalendarEntry.calendar))
)
return [_entry_to_dto(e) for e in result.scalars().all()]
async def entry_by_id(self, session: AsyncSession, entry_id: int) -> CalendarEntryDTO | None:
entry = (
await session.execute(
select(CalendarEntry)
.where(CalendarEntry.id == entry_id, CalendarEntry.deleted_at.is_(None))
.options(selectinload(CalendarEntry.calendar))
)
).scalar_one_or_none()
return _entry_to_dto(entry) if entry else None
async def entry_ids_for_content(
self, session: AsyncSession, content_type: str, content_id: int,
) -> set[int]:
"""Get entry IDs associated with a content item (e.g. post)."""
result = await session.execute(
select(CalendarEntryPost.entry_id).where(
CalendarEntryPost.content_type == content_type,
CalendarEntryPost.content_id == content_id,
CalendarEntryPost.deleted_at.is_(None),
)
)
return set(result.scalars().all())
async def associated_entries(
self, session: AsyncSession, content_type: str, content_id: int, page: int,
) -> tuple[list[CalendarEntryDTO], bool]:
"""Get paginated confirmed entries associated with a content item."""
per_page = 10
entry_ids_result = await session.execute(
select(CalendarEntryPost.entry_id).where(
CalendarEntryPost.content_type == content_type,
CalendarEntryPost.content_id == content_id,
CalendarEntryPost.deleted_at.is_(None),
)
)
entry_ids = set(entry_ids_result.scalars().all())
if not entry_ids:
return [], False
offset = (page - 1) * per_page
result = await session.execute(
select(CalendarEntry)
.where(
CalendarEntry.id.in_(entry_ids),
CalendarEntry.deleted_at.is_(None),
CalendarEntry.state == "confirmed",
)
.order_by(CalendarEntry.start_at.desc())
.limit(per_page)
.offset(offset)
.options(selectinload(CalendarEntry.calendar))
)
entries = result.scalars().all()
has_more = len(entries) == per_page
return [_entry_to_dto(e) for e in entries], has_more
async def toggle_entry_post(
self, session: AsyncSession, entry_id: int, content_type: str, content_id: int,
) -> bool:
"""Toggle association; returns True if now associated, False if removed."""
existing = await session.scalar(
select(CalendarEntryPost).where(
CalendarEntryPost.entry_id == entry_id,
CalendarEntryPost.content_type == content_type,
CalendarEntryPost.content_id == content_id,
CalendarEntryPost.deleted_at.is_(None),
)
)
if existing:
existing.deleted_at = func.now()
await session.flush()
return False
else:
assoc = CalendarEntryPost(
entry_id=entry_id,
content_type=content_type,
content_id=content_id,
)
session.add(assoc)
await session.flush()
return True
async def get_entries_for_order(
self, session: AsyncSession, order_id: int,
) -> list[CalendarEntryDTO]:
result = await session.execute(
select(CalendarEntry)
.where(
CalendarEntry.order_id == order_id,
CalendarEntry.deleted_at.is_(None),
)
.options(selectinload(CalendarEntry.calendar))
)
return [_entry_to_dto(e) for e in result.scalars().all()]
# -- batch reads (not in protocol — convenience for blog service) ---------
async def confirmed_entries_for_posts(
self, session: AsyncSession, post_ids: list[int],
) -> dict[int, list[CalendarEntryDTO]]:
"""Return confirmed entries grouped by post_id for a batch of posts."""
if not post_ids:
return {}
result = await session.execute(
select(CalendarEntry, CalendarEntryPost.content_id)
.join(CalendarEntryPost, CalendarEntry.id == CalendarEntryPost.entry_id)
.options(selectinload(CalendarEntry.calendar))
.where(
CalendarEntryPost.content_type == "post",
CalendarEntryPost.content_id.in_(post_ids),
CalendarEntryPost.deleted_at.is_(None),
CalendarEntry.deleted_at.is_(None),
CalendarEntry.state == "confirmed",
)
.order_by(CalendarEntry.start_at.asc())
)
entries_by_post: dict[int, list[CalendarEntryDTO]] = {}
for entry, post_id in result:
entries_by_post.setdefault(post_id, []).append(_entry_to_dto(entry))
return entries_by_post
# -- writes (absorb glue lifecycle) ---------------------------------------
async def adopt_entries_for_user(
self, session: AsyncSession, user_id: int, session_id: str,
) -> None:
"""Adopt anonymous calendar entries for a logged-in user."""
await session.execute(
update(CalendarEntry)
.where(CalendarEntry.deleted_at.is_(None), CalendarEntry.user_id == user_id)
.values(deleted_at=func.now())
)
cal_result = await session.execute(
select(CalendarEntry).where(
CalendarEntry.deleted_at.is_(None),
CalendarEntry.session_id == session_id,
)
)
for entry in cal_result.scalars().all():
entry.user_id = user_id
async def claim_entries_for_order(
self, session: AsyncSession, order_id: int, user_id: int | None,
session_id: str | None, page_post_id: int | None,
) -> None:
"""Mark pending CalendarEntries as 'ordered' and set order_id."""
filters = [
CalendarEntry.deleted_at.is_(None),
CalendarEntry.state == "pending",
]
if user_id is not None:
filters.append(CalendarEntry.user_id == user_id)
elif session_id is not None:
filters.append(CalendarEntry.session_id == session_id)
if page_post_id is not None:
cal_ids = select(Calendar.id).where(
Calendar.container_type == "page",
Calendar.container_id == page_post_id,
Calendar.deleted_at.is_(None),
).scalar_subquery()
filters.append(CalendarEntry.calendar_id.in_(cal_ids))
await session.execute(
update(CalendarEntry)
.where(*filters)
.values(state="ordered", order_id=order_id)
)
async def confirm_entries_for_order(
self, session: AsyncSession, order_id: int, user_id: int | None,
session_id: str | None,
) -> None:
"""Mark ordered CalendarEntries as 'provisional'."""
filters = [
CalendarEntry.deleted_at.is_(None),
CalendarEntry.state == "ordered",
CalendarEntry.order_id == order_id,
]
if user_id is not None:
filters.append(CalendarEntry.user_id == user_id)
elif session_id is not None:
filters.append(CalendarEntry.session_id == session_id)
await session.execute(
update(CalendarEntry)
.where(*filters)
.values(state="provisional")
)

143
services/cart_impl.py Normal file
View File

@@ -0,0 +1,143 @@
"""SQL-backed CartService implementation.
Queries ``shared.models.market.CartItem`` — only this module may write
to cart-domain tables on behalf of other domains.
"""
from __future__ import annotations
from decimal import Decimal
from sqlalchemy import select, update, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from shared.models.market import CartItem
from shared.models.market_place import MarketPlace
from shared.models.calendars import CalendarEntry, Calendar
from shared.contracts.dtos import CartItemDTO, CartSummaryDTO
def _item_to_dto(ci: CartItem) -> CartItemDTO:
product = ci.product
return CartItemDTO(
id=ci.id,
product_id=ci.product_id,
quantity=ci.quantity,
product_title=product.title if product else None,
product_slug=product.slug if product else None,
product_image=product.image if product else None,
unit_price=Decimal(str(product.special_price or product.regular_price or 0)) if product else None,
market_place_id=ci.market_place_id,
)
class SqlCartService:
async def cart_summary(
self, session: AsyncSession, *,
user_id: int | None, session_id: str | None,
page_slug: str | None = None,
) -> CartSummaryDTO:
"""Build a lightweight cart summary for the current identity."""
# Resolve page filter
page_post_id: int | None = None
if page_slug:
from shared.services.registry import services
post = await services.blog.get_post_by_slug(session, page_slug)
if post and post.is_page:
page_post_id = post.id
# --- product cart ---
cart_q = select(CartItem).where(CartItem.deleted_at.is_(None))
if user_id is not None:
cart_q = cart_q.where(CartItem.user_id == user_id)
elif session_id is not None:
cart_q = cart_q.where(CartItem.session_id == session_id)
else:
return CartSummaryDTO()
if page_post_id is not None:
mp_ids = select(MarketPlace.id).where(
MarketPlace.container_type == "page",
MarketPlace.container_id == page_post_id,
MarketPlace.deleted_at.is_(None),
).scalar_subquery()
cart_q = cart_q.where(CartItem.market_place_id.in_(mp_ids))
cart_q = cart_q.options(selectinload(CartItem.product))
result = await session.execute(cart_q)
cart_items = result.scalars().all()
count = sum(ci.quantity for ci in cart_items)
total = sum(
Decimal(str(ci.product.special_price or ci.product.regular_price or 0)) * ci.quantity
for ci in cart_items
if ci.product and (ci.product.special_price or ci.product.regular_price)
)
# --- calendar entries ---
from shared.services.registry import services
if page_post_id is not None:
cal_entries = await services.calendar.entries_for_page(
session, page_post_id,
user_id=user_id,
session_id=session_id,
)
else:
cal_entries = await services.calendar.pending_entries(
session,
user_id=user_id,
session_id=session_id,
)
calendar_count = len(cal_entries)
calendar_total = sum(Decimal(str(e.cost or 0)) for e in cal_entries if e.cost is not None)
items = [_item_to_dto(ci) for ci in cart_items]
return CartSummaryDTO(
count=count,
total=total,
calendar_count=calendar_count,
calendar_total=calendar_total,
items=items,
)
async def cart_items(
self, session: AsyncSession, *,
user_id: int | None, session_id: str | None,
) -> list[CartItemDTO]:
cart_q = select(CartItem).where(CartItem.deleted_at.is_(None))
if user_id is not None:
cart_q = cart_q.where(CartItem.user_id == user_id)
elif session_id is not None:
cart_q = cart_q.where(CartItem.session_id == session_id)
else:
return []
cart_q = cart_q.options(selectinload(CartItem.product)).order_by(CartItem.created_at.desc())
result = await session.execute(cart_q)
return [_item_to_dto(ci) for ci in result.scalars().all()]
async def adopt_cart_for_user(
self, session: AsyncSession, user_id: int, session_id: str,
) -> None:
"""Adopt anonymous cart items for a logged-in user."""
anon_result = await session.execute(
select(CartItem).where(
CartItem.deleted_at.is_(None),
CartItem.user_id.is_(None),
CartItem.session_id == session_id,
)
)
anon_items = anon_result.scalars().all()
if anon_items:
# Soft-delete existing user cart
await session.execute(
update(CartItem)
.where(CartItem.deleted_at.is_(None), CartItem.user_id == user_id)
.values(deleted_at=func.now())
)
for ci in anon_items:
ci.user_id = user_id

57
services/market_impl.py Normal file
View File

@@ -0,0 +1,57 @@
"""SQL-backed MarketService implementation.
Queries ``shared.models.market.*`` and ``shared.models.market_place.*`` —
only this module may read market-domain tables on behalf of other domains.
"""
from __future__ import annotations
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from shared.models.market import Product
from shared.models.market_place import MarketPlace
from shared.contracts.dtos import MarketPlaceDTO, ProductDTO
def _mp_to_dto(mp: MarketPlace) -> MarketPlaceDTO:
return MarketPlaceDTO(
id=mp.id,
container_type=mp.container_type,
container_id=mp.container_id,
name=mp.name,
slug=mp.slug,
description=mp.description,
)
def _product_to_dto(p: Product) -> ProductDTO:
return ProductDTO(
id=p.id,
slug=p.slug,
title=p.title,
image=p.image,
description_short=p.description_short,
rrp=p.rrp,
regular_price=p.regular_price,
special_price=p.special_price,
)
class SqlMarketService:
async def marketplaces_for_container(
self, session: AsyncSession, container_type: str, container_id: int,
) -> list[MarketPlaceDTO]:
result = await session.execute(
select(MarketPlace).where(
MarketPlace.container_type == container_type,
MarketPlace.container_id == container_id,
MarketPlace.deleted_at.is_(None),
).order_by(MarketPlace.name.asc())
)
return [_mp_to_dto(mp) for mp in result.scalars().all()]
async def product_by_id(self, session: AsyncSession, product_id: int) -> ProductDTO | None:
product = (
await session.execute(select(Product).where(Product.id == product_id))
).scalar_one_or_none()
return _product_to_dto(product) if product else None

32
services/navigation.py Normal file
View File

@@ -0,0 +1,32 @@
from __future__ import annotations
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from shared.models.menu_node import MenuNode
async def get_navigation_tree(session: AsyncSession) -> list[MenuNode]:
"""
Return top-level menu nodes ordered by sort_order.
All apps call this directly (shared DB) — no more HTTP API.
"""
result = await session.execute(
select(MenuNode)
.where(MenuNode.deleted_at.is_(None), MenuNode.depth == 0)
.order_by(MenuNode.sort_order.asc(), MenuNode.id.asc())
)
return list(result.scalars().all())
async def rebuild_navigation(session: AsyncSession) -> None:
"""
Rebuild menu_nodes from container_relations.
Called by event handlers when relationships change.
Currently a no-op placeholder — menu nodes are managed directly
by the admin UI. When the full relationship-driven nav is needed,
this will sync ContainerRelation -> MenuNode.
"""
pass

92
services/registry.py Normal file
View File

@@ -0,0 +1,92 @@
"""Typed singleton registry for domain services.
Usage::
from shared.services.registry import services
# Register at app startup
services.blog = SqlBlogService()
# Query anywhere
if services.has("calendar"):
entries = await services.calendar.pending_entries(session, ...)
# Or use stubs for absent domains
summary = await services.cart.cart_summary(session, ...)
"""
from __future__ import annotations
from shared.contracts.protocols import (
BlogService,
CalendarService,
MarketService,
CartService,
)
class _ServiceRegistry:
"""Central registry holding one implementation per domain.
Properties return the registered implementation or raise
``RuntimeError`` if nothing is registered. Use ``has(name)``
to check before access when the domain might be absent.
"""
def __init__(self) -> None:
self._blog: BlogService | None = None
self._calendar: CalendarService | None = None
self._market: MarketService | None = None
self._cart: CartService | None = None
# -- blog -----------------------------------------------------------------
@property
def blog(self) -> BlogService:
if self._blog is None:
raise RuntimeError("BlogService not registered")
return self._blog
@blog.setter
def blog(self, impl: BlogService) -> None:
self._blog = impl
# -- calendar -------------------------------------------------------------
@property
def calendar(self) -> CalendarService:
if self._calendar is None:
raise RuntimeError("CalendarService not registered")
return self._calendar
@calendar.setter
def calendar(self, impl: CalendarService) -> None:
self._calendar = impl
# -- market ---------------------------------------------------------------
@property
def market(self) -> MarketService:
if self._market is None:
raise RuntimeError("MarketService not registered")
return self._market
@market.setter
def market(self, impl: MarketService) -> None:
self._market = impl
# -- cart -----------------------------------------------------------------
@property
def cart(self) -> CartService:
if self._cart is None:
raise RuntimeError("CartService not registered")
return self._cart
@cart.setter
def cart(self, impl: CartService) -> None:
self._cart = impl
# -- introspection --------------------------------------------------------
def has(self, name: str) -> bool:
"""Check whether a domain service is registered."""
return getattr(self, f"_{name}", None) is not None
# Module-level singleton — import this everywhere.
services = _ServiceRegistry()

155
services/relationships.py Normal file
View File

@@ -0,0 +1,155 @@
from __future__ import annotations
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from shared.events import emit_event
from shared.models.container_relation import ContainerRelation
async def attach_child(
session: AsyncSession,
parent_type: str,
parent_id: int,
child_type: str,
child_id: int,
label: str | None = None,
sort_order: int | None = None,
) -> ContainerRelation:
"""
Create a ContainerRelation and emit container.child_attached event.
Upsert behaviour: if a relation already exists (including soft-deleted),
revive it instead of inserting a duplicate.
"""
# Check for existing (including soft-deleted)
existing = await session.scalar(
select(ContainerRelation).where(
ContainerRelation.parent_type == parent_type,
ContainerRelation.parent_id == parent_id,
ContainerRelation.child_type == child_type,
ContainerRelation.child_id == child_id,
)
)
if existing:
if existing.deleted_at is not None:
# Revive soft-deleted relation
existing.deleted_at = None
if sort_order is not None:
existing.sort_order = sort_order
if label is not None:
existing.label = label
await session.flush()
await emit_event(
session,
event_type="container.child_attached",
aggregate_type="container_relation",
aggregate_id=existing.id,
payload={
"parent_type": parent_type,
"parent_id": parent_id,
"child_type": child_type,
"child_id": child_id,
},
)
return existing
# Already attached and active — no-op
return existing
if sort_order is None:
max_order = await session.scalar(
select(func.max(ContainerRelation.sort_order)).where(
ContainerRelation.parent_type == parent_type,
ContainerRelation.parent_id == parent_id,
ContainerRelation.deleted_at.is_(None),
)
)
sort_order = (max_order or 0) + 1
rel = ContainerRelation(
parent_type=parent_type,
parent_id=parent_id,
child_type=child_type,
child_id=child_id,
label=label,
sort_order=sort_order,
)
session.add(rel)
await session.flush()
await emit_event(
session,
event_type="container.child_attached",
aggregate_type="container_relation",
aggregate_id=rel.id,
payload={
"parent_type": parent_type,
"parent_id": parent_id,
"child_type": child_type,
"child_id": child_id,
},
)
return rel
async def get_children(
session: AsyncSession,
parent_type: str,
parent_id: int,
child_type: str | None = None,
) -> list[ContainerRelation]:
"""Query children of a container, optionally filtered by child_type."""
stmt = select(ContainerRelation).where(
ContainerRelation.parent_type == parent_type,
ContainerRelation.parent_id == parent_id,
ContainerRelation.deleted_at.is_(None),
)
if child_type is not None:
stmt = stmt.where(ContainerRelation.child_type == child_type)
stmt = stmt.order_by(
ContainerRelation.sort_order.asc(), ContainerRelation.id.asc()
)
result = await session.execute(stmt)
return list(result.scalars().all())
async def detach_child(
session: AsyncSession,
parent_type: str,
parent_id: int,
child_type: str,
child_id: int,
) -> bool:
"""Soft-delete a ContainerRelation and emit container.child_detached event."""
result = await session.execute(
select(ContainerRelation).where(
ContainerRelation.parent_type == parent_type,
ContainerRelation.parent_id == parent_id,
ContainerRelation.child_type == child_type,
ContainerRelation.child_id == child_id,
ContainerRelation.deleted_at.is_(None),
)
)
rel = result.scalar_one_or_none()
if not rel:
return False
rel.deleted_at = func.now()
await session.flush()
await emit_event(
session,
event_type="container.child_detached",
aggregate_type="container_relation",
aggregate_id=rel.id,
payload={
"parent_type": parent_type,
"parent_id": parent_id,
"child_type": child_type,
"child_id": child_id,
},
)
return True

111
services/stubs.py Normal file
View File

@@ -0,0 +1,111 @@
"""No-op stub services for absent domains.
When an app starts without a particular domain, it registers the stub
so that ``services.X.method()`` returns empty/None rather than crashing.
"""
from __future__ import annotations
from decimal import Decimal
from sqlalchemy.ext.asyncio import AsyncSession
from shared.contracts.dtos import (
PostDTO,
CalendarDTO,
CalendarEntryDTO,
MarketPlaceDTO,
ProductDTO,
CartItemDTO,
CartSummaryDTO,
)
class StubBlogService:
async def get_post_by_slug(self, session: AsyncSession, slug: str) -> PostDTO | None:
return None
async def get_post_by_id(self, session: AsyncSession, id: int) -> PostDTO | None:
return None
async def get_posts_by_ids(self, session: AsyncSession, ids: list[int]) -> list[PostDTO]:
return []
class StubCalendarService:
async def calendars_for_container(
self, session: AsyncSession, container_type: str, container_id: int,
) -> list[CalendarDTO]:
return []
async def pending_entries(
self, session: AsyncSession, *, user_id: int | None, session_id: str | None,
) -> list[CalendarEntryDTO]:
return []
async def entries_for_page(
self, session: AsyncSession, page_id: int, *, user_id: int | None, session_id: str | None,
) -> list[CalendarEntryDTO]:
return []
async def entry_by_id(self, session: AsyncSession, entry_id: int) -> CalendarEntryDTO | None:
return None
async def associated_entries(
self, session: AsyncSession, content_type: str, content_id: int, page: int,
) -> tuple[list[CalendarEntryDTO], bool]:
return [], False
async def toggle_entry_post(
self, session: AsyncSession, entry_id: int, content_type: str, content_id: int,
) -> bool:
return False
async def adopt_entries_for_user(
self, session: AsyncSession, user_id: int, session_id: str,
) -> None:
pass
async def claim_entries_for_order(
self, session: AsyncSession, order_id: int, user_id: int | None,
session_id: str | None, page_post_id: int | None,
) -> None:
pass
async def confirm_entries_for_order(
self, session: AsyncSession, order_id: int, user_id: int | None,
session_id: str | None,
) -> None:
pass
async def get_entries_for_order(
self, session: AsyncSession, order_id: int,
) -> list[CalendarEntryDTO]:
return []
class StubMarketService:
async def marketplaces_for_container(
self, session: AsyncSession, container_type: str, container_id: int,
) -> list[MarketPlaceDTO]:
return []
async def product_by_id(self, session: AsyncSession, product_id: int) -> ProductDTO | None:
return None
class StubCartService:
async def cart_summary(
self, session: AsyncSession, *, user_id: int | None, session_id: str | None,
page_slug: str | None = None,
) -> CartSummaryDTO:
return CartSummaryDTO()
async def cart_items(
self, session: AsyncSession, *, user_id: int | None, session_id: str | None,
) -> list[CartItemDTO]:
return []
async def adopt_cart_for_user(
self, session: AsyncSession, user_id: int, session_id: str,
) -> None:
pass