from __future__ import annotations from typing import Any, Dict, List, Optional, Sequence, Tuple from sqlalchemy import select, func, asc, desc, and_, or_ from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload, joinedload from models.ghost_content import Post, Author, Tag, PostTag from models.tag_group import TagGroup, TagGroupTag class DBAPIError(Exception): """Raised when our local DB returns something unexpected.""" def _author_to_public(a: Optional[Author]) -> Optional[Dict[str, Any]]: if a is None: return None if a.deleted_at is not None: # treat deleted authors as missing return None return { "id": a.ghost_id, "slug": a.slug, "name": a.name, "profile_image": a.profile_image, "cover_image": a.cover_image, # expose more (bio, etc.) if needed } def _tag_to_public(t: Tag) -> Dict[str, Any]: return { "id": t.ghost_id, "slug": t.slug, "name": t.name, "description": t.description, "feature_image": t.feature_image, # fixed key "visibility": t.visibility, "deleted_at": t.deleted_at, } def _post_to_public(p: Post) -> Dict[str, Any]: """ Shape a Post to the public JSON used by the app, mirroring GhostClient._normalise_post. """ # Primary author: explicit or first available primary_author = p.primary_author or (p.authors[0] if p.authors else None) # Primary tag: prefer explicit relationship, otherwise first public/non-deleted tag primary_tag = getattr(p, "primary_tag", None) if primary_tag is None: public_tags = [ t for t in (p.tags or []) if t.deleted_at is None and (t.visibility or "public") == "public" ] primary_tag = public_tags[0] if public_tags else None return { "id": p.id, "ghost_id": p.ghost_id, "slug": p.slug, "title": p.title, "html": p.html, "is_page": p.is_page, "excerpt": p.custom_excerpt or p.excerpt, "custom_excerpt": p.custom_excerpt, "published_at": p.published_at, "updated_at": p.updated_at, "visibility": p.visibility, "status": p.status, "deleted_at": p.deleted_at, "feature_image": p.feature_image, "user_id": p.user_id, "publish_requested": p.publish_requested, "primary_author": _author_to_public(primary_author), "primary_tag": _tag_to_public(primary_tag) if primary_tag else None, "tags": [ _tag_to_public(t) for t in (p.tags or []) if t.deleted_at is None and (t.visibility or "public") == "public" ], "authors": [ _author_to_public(a) for a in (p.authors or []) if a and a.deleted_at is None ], } class DBClient: """ Drop-in replacement for GhostClient, but served from our mirrored tables. Call methods with an AsyncSession. """ def __init__(self, session: AsyncSession): self.sess = session async def list_posts( self, limit: int = 10, page: int = 1, selected_tags: Optional[Sequence[str]] = None, selected_authors: Optional[Sequence[str]] = None, search: Optional[str] = None, drafts: bool = False, drafts_user_id: Optional[int] = None, exclude_covered_tag_ids: Optional[Sequence[int]] = None, ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: """ List published posts, optionally filtered by tags/authors and a search term. When drafts=True, lists draft posts instead (filtered by drafts_user_id if given). Returns (posts, pagination). """ # ---- base visibility filters if drafts: base_filters = [ Post.deleted_at.is_(None), Post.status == "draft", Post.is_page.is_(False), ] if drafts_user_id is not None: base_filters.append(Post.user_id == drafts_user_id) else: base_filters = [ Post.deleted_at.is_(None), Post.status == "published", Post.is_page.is_(False), ] q = select(Post).where(*base_filters) # ---- TAG FILTER (matches any tag on the post) if selected_tags: tag_slugs = list(selected_tags) q = q.where( Post.tags.any( and_( Tag.slug.in_(tag_slugs), Tag.deleted_at.is_(None), ) ) ) # ---- EXCLUDE-COVERED FILTER ("etc" mode: posts NOT covered by any group) if exclude_covered_tag_ids: covered_sq = ( select(PostTag.post_id) .join(Tag, Tag.id == PostTag.tag_id) .where( Tag.id.in_(list(exclude_covered_tag_ids)), Tag.deleted_at.is_(None), ) ) q = q.where(Post.id.notin_(covered_sq)) # ---- AUTHOR FILTER (matches primary or any author) if selected_authors: author_slugs = list(selected_authors) q = q.where( or_( Post.primary_author.has( and_( Author.slug.in_(author_slugs), Author.deleted_at.is_(None), ) ), Post.authors.any( and_( Author.slug.in_(author_slugs), Author.deleted_at.is_(None), ) ), ) ) # ---- SEARCH FILTER (title OR excerpt OR plaintext contains) if search: term = f"%{search.strip().lower()}%" q = q.where( or_( func.lower(func.coalesce(Post.title, "")).like(term), func.lower(func.coalesce(Post.excerpt, "")).like(term), func.lower(func.coalesce(Post.plaintext,"")).like(term), ) ) # ---- ordering if drafts: q = q.order_by(desc(Post.updated_at)) else: q = q.order_by(desc(Post.published_at)) # ---- pagination math if page < 1: page = 1 offset_val = (page - 1) * limit # ---- total count with SAME filters (including tag/author/search) q_no_limit = q.with_only_columns(Post.id).order_by(None) count_q = select(func.count()).select_from(q_no_limit.subquery()) total = int((await self.sess.execute(count_q)).scalar() or 0) # ---- eager load relationships to avoid N+1 / greenlet issues q = ( q.options( joinedload(Post.primary_author), joinedload(Post.primary_tag), selectinload(Post.authors), selectinload(Post.tags), ) .limit(limit) .offset(offset_val) ) rows: List[Post] = list((await self.sess.execute(q)).scalars()) posts = [_post_to_public(p) for p in rows] # ---- search_count: reflect same filters + search (i.e., equals total once filters applied) search_count = total pages_total = (total + limit - 1) // limit if limit else 1 pagination = { "page": page, "limit": limit, "pages": pages_total, "total": total, "search_count": search_count, "next": page + 1 if page < pages_total else None, "prev": page - 1 if page > 1 else None, } return posts, pagination async def list_pages( self, limit: int = 10, page: int = 1, search: Optional[str] = None, ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: """ List published pages (is_page=True) with their PageConfig eagerly loaded. Returns (pages, pagination). """ base_filters = [ Post.deleted_at.is_(None), Post.status == "published", Post.is_page.is_(True), ] q = select(Post).where(*base_filters) if search: term = f"%{search.strip().lower()}%" q = q.where( or_( func.lower(func.coalesce(Post.title, "")).like(term), func.lower(func.coalesce(Post.excerpt, "")).like(term), func.lower(func.coalesce(Post.plaintext, "")).like(term), ) ) q = q.order_by(desc(Post.published_at)) if page < 1: page = 1 offset_val = (page - 1) * limit q_no_limit = q.with_only_columns(Post.id).order_by(None) count_q = select(func.count()).select_from(q_no_limit.subquery()) total = int((await self.sess.execute(count_q)).scalar() or 0) q = ( q.options( joinedload(Post.primary_author), joinedload(Post.primary_tag), selectinload(Post.authors), selectinload(Post.tags), ) .limit(limit) .offset(offset_val) ) rows: List[Post] = list((await self.sess.execute(q)).scalars()) # Load PageConfigs directly (page_configs now lives in db_blog) post_ids = [p.id for p in rows] pc_map: Dict[int, dict] = {} if post_ids: from shared.models.page_config import PageConfig try: pc_result = await self.sess.execute( select(PageConfig).where( PageConfig.container_type == "page", PageConfig.container_id.in_(post_ids), ) ) for pc in pc_result.scalars().all(): pc_map[pc.container_id] = { "id": pc.id, "container_type": pc.container_type, "container_id": pc.container_id, "features": pc.features or {}, "sumup_merchant_code": pc.sumup_merchant_code, "sumup_api_key": pc.sumup_api_key, "sumup_checkout_prefix": pc.sumup_checkout_prefix, } except Exception: pass # graceful degradation — pages render without features def _page_to_public(p: Post) -> Dict[str, Any]: d = _post_to_public(p) pc = pc_map.get(p.id) d["features"] = pc["features"] if pc else {} return d pages_list = [_page_to_public(p) for p in rows] pages_total = (total + limit - 1) // limit if limit else 1 pagination = { "page": page, "limit": limit, "pages": pages_total, "total": total, "next": page + 1 if page < pages_total else None, "prev": page - 1 if page > 1 else None, } return pages_list, pagination async def posts_by_slug( self, slug: str, include: Sequence[str] = ("tags", "authors"), fields: Sequence[str] = ( "id", "slug", "title", "html", "excerpt", "custom_excerpt", "published_at", "feature_image", ), include_drafts: bool = False, ) -> List[Dict[str, Any]]: """ Return posts (usually 1) matching this slug. Only returns published, non-deleted posts by default. When include_drafts=True, also returns draft posts (for admin access). Eager-load related objects via selectinload/joinedload so we don't N+1 when serializing in _post_to_public(). """ # Build .options(...) dynamically based on `include` load_options = [] # Tags if "tags" in include: load_options.append(selectinload(Post.tags)) if hasattr(Post, "primary_tag"): # joinedload is fine too; selectin keeps a single extra roundtrip load_options.append(selectinload(Post.primary_tag)) # Authors if "authors" in include: if hasattr(Post, "primary_author"): load_options.append(selectinload(Post.primary_author)) if hasattr(Post, "authors"): load_options.append(selectinload(Post.authors)) filters = [Post.deleted_at.is_(None), Post.slug == slug] if not include_drafts: filters.append(Post.status == "published") q = ( select(Post) .where(*filters) .order_by(desc(Post.published_at)) .options(*load_options) ) result = await self.sess.execute(q) rows: List[Post] = list(result.scalars()) return [(_post_to_public(p), p) for p in rows] async def list_tags( self, limit: int = 5000, page: int = 1, is_page=False, ) -> List[Dict[str, Any]]: """ Return public, not-soft-deleted tags. Include published_post_count = number of published (not deleted) posts using that tag. """ if page < 1: page = 1 offset_val = (page - 1) * limit # Subquery: count published posts per tag tag_post_counts_sq = ( select( PostTag.tag_id.label("tag_id"), func.count().label("published_post_count"), ) .select_from(PostTag) .join(Post, Post.id == PostTag.post_id) .where( Post.deleted_at.is_(None), Post.published_at.is_not(None), Post.is_page.is_(is_page), ) .group_by(PostTag.tag_id) .subquery() ) q = ( select( Tag, func.coalesce(tag_post_counts_sq.c.published_post_count, 0).label( "published_post_count" ), ) .outerjoin( tag_post_counts_sq, tag_post_counts_sq.c.tag_id == Tag.id, ) .where( Tag.deleted_at.is_(None), (Tag.visibility == "public") | (Tag.visibility.is_(None)), func.coalesce(tag_post_counts_sq.c.published_post_count, 0) > 0, ) .order_by(desc(func.coalesce(tag_post_counts_sq.c.published_post_count, 0)), asc(Tag.name)) .limit(limit) .offset(offset_val) ) result = await self.sess.execute(q) # result will return rows like (Tag, published_post_count) rows = list(result.all()) tags = [ { "id": tag.ghost_id, "slug": tag.slug, "name": tag.name, "description": tag.description, "feature_image": tag.feature_image, "visibility": tag.visibility, "published_post_count": count, } for (tag, count) in rows ] return tags async def list_authors( self, limit: int = 5000, page: int = 1, is_page=False, ) -> List[Dict[str, Any]]: """ Return non-deleted authors. Include published_post_count = number of published (not deleted) posts by that author (counted via Post.primary_author_id). """ if page < 1: page = 1 offset_val = (page - 1) * limit # Subquery: count published posts per primary author author_post_counts_sq = ( select( Post.primary_author_id.label("author_id"), func.count().label("published_post_count"), ) .where( Post.deleted_at.is_(None), Post.published_at.is_not(None), Post.is_page.is_(is_page), ) .group_by(Post.primary_author_id) .subquery() ) q = ( select( Author, func.coalesce(author_post_counts_sq.c.published_post_count, 0).label( "published_post_count" ), ) .outerjoin( author_post_counts_sq, author_post_counts_sq.c.author_id == Author.id, ) .where( Author.deleted_at.is_(None), ) .order_by(asc(Author.name)) .limit(limit) .offset(offset_val) ) result = await self.sess.execute(q) rows = list(result.all()) authors = [ { "id": a.ghost_id, "slug": a.slug, "name": a.name, "bio": a.bio, "profile_image": a.profile_image, "cover_image": a.cover_image, "website": a.website, "location": a.location, "facebook": a.facebook, "twitter": a.twitter, "published_post_count": count, } for (a, count) in rows ] return authors async def count_drafts(self, user_id: Optional[int] = None) -> int: """Count draft (non-page, non-deleted) posts, optionally for a single user.""" q = select(func.count()).select_from(Post).where( Post.deleted_at.is_(None), Post.status == "draft", Post.is_page.is_(False), ) if user_id is not None: q = q.where(Post.user_id == user_id) return int((await self.sess.execute(q)).scalar() or 0) async def list_tag_groups_with_counts(self) -> List[Dict[str, Any]]: """ Return all tag groups with aggregated published post counts. Each group dict includes a `tag_slugs` list and `tag_ids` list. Count = distinct published posts having ANY member tag. Ordered by sort_order, name. """ # Subquery: distinct published post IDs per tag group post_count_sq = ( select( TagGroupTag.tag_group_id.label("group_id"), func.count(func.distinct(PostTag.post_id)).label("post_count"), ) .select_from(TagGroupTag) .join(PostTag, PostTag.tag_id == TagGroupTag.tag_id) .join(Post, Post.id == PostTag.post_id) .where( Post.deleted_at.is_(None), Post.published_at.is_not(None), Post.is_page.is_(False), ) .group_by(TagGroupTag.tag_group_id) .subquery() ) q = ( select( TagGroup, func.coalesce(post_count_sq.c.post_count, 0).label("post_count"), ) .outerjoin(post_count_sq, post_count_sq.c.group_id == TagGroup.id) .order_by(asc(TagGroup.sort_order), asc(TagGroup.name)) ) rows = list((await self.sess.execute(q)).all()) groups = [] for tg, count in rows: # Fetch member tag slugs + ids for this group tag_rows = list( (await self.sess.execute( select(Tag.slug, Tag.id) .join(TagGroupTag, TagGroupTag.tag_id == Tag.id) .where( TagGroupTag.tag_group_id == tg.id, Tag.deleted_at.is_(None), (Tag.visibility == "public") | (Tag.visibility.is_(None)), ) )).all() ) groups.append({ "id": tg.id, "name": tg.name, "slug": tg.slug, "feature_image": tg.feature_image, "colour": tg.colour, "sort_order": tg.sort_order, "post_count": count, "tag_slugs": [r[0] for r in tag_rows], "tag_ids": [r[1] for r in tag_rows], }) return groups async def count_etc_posts(self, assigned_tag_ids: List[int]) -> int: """ Count published posts not covered by any tag group. Includes posts with no tags and posts whose tags are all unassigned. """ base = [ Post.deleted_at.is_(None), Post.published_at.is_not(None), Post.is_page.is_(False), ] if assigned_tag_ids: covered_sq = ( select(PostTag.post_id) .join(Tag, Tag.id == PostTag.tag_id) .where( Tag.id.in_(assigned_tag_ids), Tag.deleted_at.is_(None), ) ) base.append(Post.id.notin_(covered_sq)) q = select(func.count()).select_from(Post).where(*base) return int((await self.sess.execute(q)).scalar() or 0) async def list_drafts(self) -> List[Dict[str, Any]]: """Return all draft (non-page, non-deleted) posts, newest-updated first.""" q = ( select(Post) .where( Post.deleted_at.is_(None), Post.status == "draft", Post.is_page.is_(False), ) .order_by(desc(Post.updated_at)) .options( joinedload(Post.primary_author), joinedload(Post.primary_tag), selectinload(Post.authors), selectinload(Post.tags), ) ) rows: List[Post] = list((await self.sess.execute(q)).scalars()) return [_post_to_public(p) for p in rows]