This repository has been archived on 2026-02-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
blog/bp/blog/ghost_db.py
giles 8b6320619e
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 45s
feat: implement Pages as Spaces Phase 1
- Add PageConfig model with feature flags (calendar, market)
- Auto-create PageConfig on Ghost page sync
- Add create_page() for Ghost /pages/ API endpoint
- Add /new-page/ route for creating pages
- Add ?type=pages blog filter with Posts|Pages tab toggle
- Add list_pages() to DBClient with PageConfig eager loading
- Add PUT /<slug>/admin/features/ route for feature toggles
- Add feature badges (calendar, market) on page cards
- Add features panel to page admin dashboard
- Update shared_lib submodule with PageConfig model

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-10 14:25:34 +00:00

633 lines
20 KiB
Python

from __future__ import annotations
from typing import Any, Dict, List, Optional, Sequence, Tuple
from sqlalchemy import select, func, asc, desc, and_, or_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload, joinedload
from models.ghost_content import Post, Author, Tag, PostTag
from models.page_config import PageConfig
from models.tag_group import TagGroup, TagGroupTag
class DBAPIError(Exception):
"""Raised when our local DB returns something unexpected."""
def _author_to_public(a: Optional[Author]) -> Optional[Dict[str, Any]]:
if a is None:
return None
if a.deleted_at is not None:
# treat deleted authors as missing
return None
return {
"id": a.ghost_id,
"slug": a.slug,
"name": a.name,
"profile_image": a.profile_image,
"cover_image": a.cover_image,
# expose more (bio, etc.) if needed
}
def _tag_to_public(t: Tag) -> Dict[str, Any]:
return {
"id": t.ghost_id,
"slug": t.slug,
"name": t.name,
"description": t.description,
"feature_image": t.feature_image, # fixed key
"visibility": t.visibility,
"deleted_at": t.deleted_at,
}
def _post_to_public(p: Post) -> Dict[str, Any]:
"""
Shape a Post to the public JSON used by the app, mirroring GhostClient._normalise_post.
"""
# Primary author: explicit or first available
primary_author = p.primary_author or (p.authors[0] if p.authors else None)
# Primary tag: prefer explicit relationship, otherwise first public/non-deleted tag
primary_tag = getattr(p, "primary_tag", None)
if primary_tag is None:
public_tags = [
t for t in (p.tags or [])
if t.deleted_at is None and (t.visibility or "public") == "public"
]
primary_tag = public_tags[0] if public_tags else None
return {
"id": p.id,
"ghost_id": p.ghost_id,
"slug": p.slug,
"title": p.title,
"html": p.html,
"is_page": p.is_page,
"excerpt": p.custom_excerpt or p.excerpt,
"custom_excerpt": p.custom_excerpt,
"published_at": p.published_at,
"updated_at": p.updated_at,
"visibility": p.visibility,
"status": p.status,
"deleted_at": p.deleted_at,
"feature_image": p.feature_image,
"user_id": p.user_id,
"publish_requested": p.publish_requested,
"primary_author": _author_to_public(primary_author),
"primary_tag": _tag_to_public(primary_tag) if primary_tag else None,
"tags": [
_tag_to_public(t)
for t in (p.tags or [])
if t.deleted_at is None and (t.visibility or "public") == "public"
],
"authors": [
_author_to_public(a)
for a in (p.authors or [])
if a and a.deleted_at is None
],
}
class DBClient:
"""
Drop-in replacement for GhostClient, but served from our mirrored tables.
Call methods with an AsyncSession.
"""
def __init__(self, session: AsyncSession):
self.sess = session
async def list_posts(
self,
limit: int = 10,
page: int = 1,
selected_tags: Optional[Sequence[str]] = None,
selected_authors: Optional[Sequence[str]] = None,
search: Optional[str] = None,
drafts: bool = False,
drafts_user_id: Optional[int] = None,
exclude_covered_tag_ids: Optional[Sequence[int]] = None,
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
"""
List published posts, optionally filtered by tags/authors and a search term.
When drafts=True, lists draft posts instead (filtered by drafts_user_id if given).
Returns (posts, pagination).
"""
# ---- base visibility filters
if drafts:
base_filters = [
Post.deleted_at.is_(None),
Post.status == "draft",
Post.is_page.is_(False),
]
if drafts_user_id is not None:
base_filters.append(Post.user_id == drafts_user_id)
else:
base_filters = [
Post.deleted_at.is_(None),
Post.status == "published",
Post.is_page.is_(False),
]
q = select(Post).where(*base_filters)
# ---- TAG FILTER (matches any tag on the post)
if selected_tags:
tag_slugs = list(selected_tags)
q = q.where(
Post.tags.any(
and_(
Tag.slug.in_(tag_slugs),
Tag.deleted_at.is_(None),
)
)
)
# ---- EXCLUDE-COVERED FILTER ("etc" mode: posts NOT covered by any group)
if exclude_covered_tag_ids:
covered_sq = (
select(PostTag.post_id)
.join(Tag, Tag.id == PostTag.tag_id)
.where(
Tag.id.in_(list(exclude_covered_tag_ids)),
Tag.deleted_at.is_(None),
)
)
q = q.where(Post.id.notin_(covered_sq))
# ---- AUTHOR FILTER (matches primary or any author)
if selected_authors:
author_slugs = list(selected_authors)
q = q.where(
or_(
Post.primary_author.has(
and_(
Author.slug.in_(author_slugs),
Author.deleted_at.is_(None),
)
),
Post.authors.any(
and_(
Author.slug.in_(author_slugs),
Author.deleted_at.is_(None),
)
),
)
)
# ---- SEARCH FILTER (title OR excerpt OR plaintext contains)
if search:
term = f"%{search.strip().lower()}%"
q = q.where(
or_(
func.lower(func.coalesce(Post.title, "")).like(term),
func.lower(func.coalesce(Post.excerpt, "")).like(term),
func.lower(func.coalesce(Post.plaintext,"")).like(term),
)
)
# ---- ordering
if drafts:
q = q.order_by(desc(Post.updated_at))
else:
q = q.order_by(desc(Post.published_at))
# ---- pagination math
if page < 1:
page = 1
offset_val = (page - 1) * limit
# ---- total count with SAME filters (including tag/author/search)
q_no_limit = q.with_only_columns(Post.id).order_by(None)
count_q = select(func.count()).select_from(q_no_limit.subquery())
total = int((await self.sess.execute(count_q)).scalar() or 0)
# ---- eager load relationships to avoid N+1 / greenlet issues
q = (
q.options(
joinedload(Post.primary_author),
joinedload(Post.primary_tag),
selectinload(Post.authors),
selectinload(Post.tags),
)
.limit(limit)
.offset(offset_val)
)
rows: List[Post] = list((await self.sess.execute(q)).scalars())
posts = [_post_to_public(p) for p in rows]
# ---- search_count: reflect same filters + search (i.e., equals total once filters applied)
search_count = total
pages_total = (total + limit - 1) // limit if limit else 1
pagination = {
"page": page,
"limit": limit,
"pages": pages_total,
"total": total,
"search_count": search_count,
"next": page + 1 if page < pages_total else None,
"prev": page - 1 if page > 1 else None,
}
return posts, pagination
async def list_pages(
self,
limit: int = 10,
page: int = 1,
search: Optional[str] = None,
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
"""
List published pages (is_page=True) with their PageConfig eagerly loaded.
Returns (pages, pagination).
"""
base_filters = [
Post.deleted_at.is_(None),
Post.status == "published",
Post.is_page.is_(True),
]
q = select(Post).where(*base_filters)
if search:
term = f"%{search.strip().lower()}%"
q = q.where(
or_(
func.lower(func.coalesce(Post.title, "")).like(term),
func.lower(func.coalesce(Post.excerpt, "")).like(term),
func.lower(func.coalesce(Post.plaintext, "")).like(term),
)
)
q = q.order_by(desc(Post.published_at))
if page < 1:
page = 1
offset_val = (page - 1) * limit
q_no_limit = q.with_only_columns(Post.id).order_by(None)
count_q = select(func.count()).select_from(q_no_limit.subquery())
total = int((await self.sess.execute(count_q)).scalar() or 0)
q = (
q.options(
joinedload(Post.primary_author),
joinedload(Post.primary_tag),
selectinload(Post.authors),
selectinload(Post.tags),
joinedload(Post.page_config),
)
.limit(limit)
.offset(offset_val)
)
rows: List[Post] = list((await self.sess.execute(q)).scalars())
def _page_to_public(p: Post) -> Dict[str, Any]:
d = _post_to_public(p)
pc = p.page_config
d["features"] = pc.features if pc else {}
return d
pages_list = [_page_to_public(p) for p in rows]
pages_total = (total + limit - 1) // limit if limit else 1
pagination = {
"page": page,
"limit": limit,
"pages": pages_total,
"total": total,
"next": page + 1 if page < pages_total else None,
"prev": page - 1 if page > 1 else None,
}
return pages_list, pagination
async def posts_by_slug(
self,
slug: str,
include: Sequence[str] = ("tags", "authors"),
fields: Sequence[str] = (
"id",
"slug",
"title",
"html",
"excerpt",
"custom_excerpt",
"published_at",
"feature_image",
),
include_drafts: bool = False,
) -> List[Dict[str, Any]]:
"""
Return posts (usually 1) matching this slug.
Only returns published, non-deleted posts by default.
When include_drafts=True, also returns draft posts (for admin access).
Eager-load related objects via selectinload/joinedload so we don't N+1 when
serializing in _post_to_public().
"""
# Build .options(...) dynamically based on `include`
load_options = []
# Tags
if "tags" in include:
load_options.append(selectinload(Post.tags))
if hasattr(Post, "primary_tag"):
# joinedload is fine too; selectin keeps a single extra roundtrip
load_options.append(selectinload(Post.primary_tag))
# Authors
if "authors" in include:
if hasattr(Post, "primary_author"):
load_options.append(selectinload(Post.primary_author))
if hasattr(Post, "authors"):
load_options.append(selectinload(Post.authors))
filters = [Post.deleted_at.is_(None), Post.slug == slug]
if not include_drafts:
filters.append(Post.status == "published")
q = (
select(Post)
.where(*filters)
.order_by(desc(Post.published_at))
.options(*load_options)
)
result = await self.sess.execute(q)
rows: List[Post] = list(result.scalars())
return [(_post_to_public(p), p) for p in rows]
async def list_tags(
self,
limit: int = 5000,
page: int = 1,
is_page=False,
) -> List[Dict[str, Any]]:
"""
Return public, not-soft-deleted tags.
Include published_post_count = number of published (not deleted) posts using that tag.
"""
if page < 1:
page = 1
offset_val = (page - 1) * limit
# Subquery: count published posts per tag
tag_post_counts_sq = (
select(
PostTag.tag_id.label("tag_id"),
func.count().label("published_post_count"),
)
.select_from(PostTag)
.join(Post, Post.id == PostTag.post_id)
.where(
Post.deleted_at.is_(None),
Post.published_at.is_not(None),
Post.is_page.is_(is_page),
)
.group_by(PostTag.tag_id)
.subquery()
)
q = (
select(
Tag,
func.coalesce(tag_post_counts_sq.c.published_post_count, 0).label(
"published_post_count"
),
)
.outerjoin(
tag_post_counts_sq,
tag_post_counts_sq.c.tag_id == Tag.id,
)
.where(
Tag.deleted_at.is_(None),
(Tag.visibility == "public") | (Tag.visibility.is_(None)),
func.coalesce(tag_post_counts_sq.c.published_post_count, 0) > 0,
)
.order_by(desc(func.coalesce(tag_post_counts_sq.c.published_post_count, 0)), asc(Tag.name))
.limit(limit)
.offset(offset_val)
)
result = await self.sess.execute(q)
# result will return rows like (Tag, published_post_count)
rows = list(result.all())
tags = [
{
"id": tag.ghost_id,
"slug": tag.slug,
"name": tag.name,
"description": tag.description,
"feature_image": tag.feature_image,
"visibility": tag.visibility,
"published_post_count": count,
}
for (tag, count) in rows
]
return tags
async def list_authors(
self,
limit: int = 5000,
page: int = 1,
is_page=False,
) -> List[Dict[str, Any]]:
"""
Return non-deleted authors.
Include published_post_count = number of published (not deleted) posts by that author
(counted via Post.primary_author_id).
"""
if page < 1:
page = 1
offset_val = (page - 1) * limit
# Subquery: count published posts per primary author
author_post_counts_sq = (
select(
Post.primary_author_id.label("author_id"),
func.count().label("published_post_count"),
)
.where(
Post.deleted_at.is_(None),
Post.published_at.is_not(None),
Post.is_page.is_(is_page),
)
.group_by(Post.primary_author_id)
.subquery()
)
q = (
select(
Author,
func.coalesce(author_post_counts_sq.c.published_post_count, 0).label(
"published_post_count"
),
)
.outerjoin(
author_post_counts_sq,
author_post_counts_sq.c.author_id == Author.id,
)
.where(
Author.deleted_at.is_(None),
)
.order_by(asc(Author.name))
.limit(limit)
.offset(offset_val)
)
result = await self.sess.execute(q)
rows = list(result.all())
authors = [
{
"id": a.ghost_id,
"slug": a.slug,
"name": a.name,
"bio": a.bio,
"profile_image": a.profile_image,
"cover_image": a.cover_image,
"website": a.website,
"location": a.location,
"facebook": a.facebook,
"twitter": a.twitter,
"published_post_count": count,
}
for (a, count) in rows
]
return authors
async def count_drafts(self, user_id: Optional[int] = None) -> int:
"""Count draft (non-page, non-deleted) posts, optionally for a single user."""
q = select(func.count()).select_from(Post).where(
Post.deleted_at.is_(None),
Post.status == "draft",
Post.is_page.is_(False),
)
if user_id is not None:
q = q.where(Post.user_id == user_id)
return int((await self.sess.execute(q)).scalar() or 0)
async def list_tag_groups_with_counts(self) -> List[Dict[str, Any]]:
"""
Return all tag groups with aggregated published post counts.
Each group dict includes a `tag_slugs` list and `tag_ids` list.
Count = distinct published posts having ANY member tag.
Ordered by sort_order, name.
"""
# Subquery: distinct published post IDs per tag group
post_count_sq = (
select(
TagGroupTag.tag_group_id.label("group_id"),
func.count(func.distinct(PostTag.post_id)).label("post_count"),
)
.select_from(TagGroupTag)
.join(PostTag, PostTag.tag_id == TagGroupTag.tag_id)
.join(Post, Post.id == PostTag.post_id)
.where(
Post.deleted_at.is_(None),
Post.published_at.is_not(None),
Post.is_page.is_(False),
)
.group_by(TagGroupTag.tag_group_id)
.subquery()
)
q = (
select(
TagGroup,
func.coalesce(post_count_sq.c.post_count, 0).label("post_count"),
)
.outerjoin(post_count_sq, post_count_sq.c.group_id == TagGroup.id)
.order_by(asc(TagGroup.sort_order), asc(TagGroup.name))
)
rows = list((await self.sess.execute(q)).all())
groups = []
for tg, count in rows:
# Fetch member tag slugs + ids for this group
tag_rows = list(
(await self.sess.execute(
select(Tag.slug, Tag.id)
.join(TagGroupTag, TagGroupTag.tag_id == Tag.id)
.where(
TagGroupTag.tag_group_id == tg.id,
Tag.deleted_at.is_(None),
(Tag.visibility == "public") | (Tag.visibility.is_(None)),
)
)).all()
)
groups.append({
"id": tg.id,
"name": tg.name,
"slug": tg.slug,
"feature_image": tg.feature_image,
"colour": tg.colour,
"sort_order": tg.sort_order,
"post_count": count,
"tag_slugs": [r[0] for r in tag_rows],
"tag_ids": [r[1] for r in tag_rows],
})
return groups
async def count_etc_posts(self, assigned_tag_ids: List[int]) -> int:
"""
Count published posts not covered by any tag group.
Includes posts with no tags and posts whose tags are all unassigned.
"""
base = [
Post.deleted_at.is_(None),
Post.published_at.is_not(None),
Post.is_page.is_(False),
]
if assigned_tag_ids:
covered_sq = (
select(PostTag.post_id)
.join(Tag, Tag.id == PostTag.tag_id)
.where(
Tag.id.in_(assigned_tag_ids),
Tag.deleted_at.is_(None),
)
)
base.append(Post.id.notin_(covered_sq))
q = select(func.count()).select_from(Post).where(*base)
return int((await self.sess.execute(q)).scalar() or 0)
async def list_drafts(self) -> List[Dict[str, Any]]:
"""Return all draft (non-page, non-deleted) posts, newest-updated first."""
q = (
select(Post)
.where(
Post.deleted_at.is_(None),
Post.status == "draft",
Post.is_page.is_(False),
)
.order_by(desc(Post.updated_at))
.options(
joinedload(Post.primary_author),
joinedload(Post.primary_tag),
selectinload(Post.authors),
selectinload(Post.tags),
)
)
rows: List[Post] = list((await self.sess.execute(q)).scalars())
return [_post_to_public(p) for p in rows]