- Home page: inline shared helpers, render_to_sx("blog-home-main")
- Post detail: new ~blog-post-detail-content defcomp with data from service
- Like toggle: call render_to_sx("market-like-toggle-button") directly
- Add post_meta_data() and post_detail_data() to BlogPageService
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
466 lines
19 KiB
Python
466 lines
19 KiB
Python
"""Blog page data service — provides serialized dicts for .sx defpages."""
|
|
from __future__ import annotations
|
|
|
|
|
|
class BlogPageService:
|
|
"""Service for blog page data, callable via (service "blog-page" ...)."""
|
|
|
|
async def cache_data(self, session, **kw):
|
|
from quart import url_for as qurl
|
|
from shared.browser.app.csrf import generate_csrf_token
|
|
return {
|
|
"clear_url": qurl("settings.cache_clear"),
|
|
"csrf": generate_csrf_token(),
|
|
}
|
|
|
|
async def snippets_data(self, session, **kw):
|
|
from quart import g, url_for as qurl
|
|
from sqlalchemy import select, or_
|
|
from models import Snippet
|
|
from shared.browser.app.csrf import generate_csrf_token
|
|
|
|
uid = g.user.id
|
|
is_admin = g.rights.get("admin")
|
|
csrf = generate_csrf_token()
|
|
filters = [Snippet.user_id == uid, Snippet.visibility == "shared"]
|
|
if is_admin:
|
|
filters.append(Snippet.visibility == "admin")
|
|
rows = (await session.execute(
|
|
select(Snippet).where(or_(*filters)).order_by(Snippet.name)
|
|
)).scalars().all()
|
|
|
|
snippets = []
|
|
for s in rows:
|
|
s_id = s.id
|
|
s_vis = s.visibility or "private"
|
|
s_uid = s.user_id
|
|
owner = "You" if s_uid == uid else f"User #{s_uid}"
|
|
can_delete = s_uid == uid or is_admin
|
|
d = {
|
|
"id": s_id,
|
|
"name": s.name or "",
|
|
"visibility": s_vis,
|
|
"owner": owner,
|
|
"can_delete": can_delete,
|
|
}
|
|
if is_admin:
|
|
d["patch_url"] = qurl("snippets.patch_visibility", snippet_id=s_id)
|
|
if can_delete:
|
|
d["delete_url"] = qurl("snippets.delete_snippet", snippet_id=s_id)
|
|
snippets.append(d)
|
|
return {
|
|
"snippets": snippets,
|
|
"is_admin": bool(is_admin),
|
|
"csrf": csrf,
|
|
}
|
|
|
|
async def menu_items_data(self, session, **kw):
|
|
from quart import url_for as qurl
|
|
from bp.menu_items.services.menu_items import get_all_menu_items
|
|
from shared.browser.app.csrf import generate_csrf_token
|
|
|
|
menu_items = await get_all_menu_items(session)
|
|
csrf = generate_csrf_token()
|
|
items = []
|
|
for mi in menu_items:
|
|
i_id = mi.id
|
|
label = mi.label or ""
|
|
fi = getattr(mi, "feature_image", None)
|
|
sort = mi.position or 0
|
|
items.append({
|
|
"id": i_id,
|
|
"label": label,
|
|
"url": mi.url or "",
|
|
"sort_order": sort,
|
|
"feature_image": fi,
|
|
"edit_url": qurl("menu_items.edit_menu_item", item_id=i_id),
|
|
"delete_url": qurl("menu_items.delete_menu_item_route", item_id=i_id),
|
|
})
|
|
return {
|
|
"menu_items": items,
|
|
"new_url": qurl("menu_items.new_menu_item"),
|
|
"csrf": csrf,
|
|
}
|
|
|
|
async def tag_groups_data(self, session, **kw):
|
|
from quart import url_for as qurl
|
|
from sqlalchemy import select
|
|
from models.tag_group import TagGroup
|
|
from bp.blog.admin.routes import _unassigned_tags
|
|
from shared.browser.app.csrf import generate_csrf_token
|
|
|
|
groups_rows = list(
|
|
(await session.execute(
|
|
select(TagGroup).order_by(TagGroup.sort_order, TagGroup.name)
|
|
)).scalars()
|
|
)
|
|
unassigned = await _unassigned_tags(session)
|
|
|
|
groups = []
|
|
for g in groups_rows:
|
|
groups.append({
|
|
"id": g.id,
|
|
"name": g.name or "",
|
|
"slug": getattr(g, "slug", "") or "",
|
|
"feature_image": getattr(g, "feature_image", None),
|
|
"colour": getattr(g, "colour", None),
|
|
"sort_order": getattr(g, "sort_order", 0) or 0,
|
|
"edit_href": qurl("blog.tag_groups_admin.defpage_tag_group_edit", id=g.id),
|
|
})
|
|
|
|
unassigned_tags = []
|
|
for t in unassigned:
|
|
unassigned_tags.append({
|
|
"name": getattr(t, "name", "") if hasattr(t, "name") else t.get("name", ""),
|
|
})
|
|
|
|
return {
|
|
"groups": groups,
|
|
"unassigned_tags": unassigned_tags,
|
|
"create_url": qurl("blog.tag_groups_admin.create"),
|
|
"csrf": generate_csrf_token(),
|
|
}
|
|
|
|
async def tag_group_edit_data(self, session, *, id=None, **kw):
|
|
from quart import abort, url_for as qurl
|
|
from sqlalchemy import select
|
|
from models.tag_group import TagGroup, TagGroupTag
|
|
from models.ghost_content import Tag
|
|
from shared.browser.app.csrf import generate_csrf_token
|
|
|
|
tg = await session.get(TagGroup, id)
|
|
if not tg:
|
|
abort(404)
|
|
|
|
assigned_rows = list(
|
|
(await session.execute(
|
|
select(TagGroupTag.tag_id).where(TagGroupTag.tag_group_id == id)
|
|
)).scalars()
|
|
)
|
|
assigned_set = set(assigned_rows)
|
|
|
|
all_tags_rows = list(
|
|
(await session.execute(
|
|
select(Tag).where(
|
|
Tag.deleted_at.is_(None),
|
|
(Tag.visibility == "public") | (Tag.visibility.is_(None)),
|
|
).order_by(Tag.name)
|
|
)).scalars()
|
|
)
|
|
|
|
all_tags = []
|
|
for t in all_tags_rows:
|
|
all_tags.append({
|
|
"id": t.id,
|
|
"name": getattr(t, "name", "") or "",
|
|
"feature_image": getattr(t, "feature_image", None),
|
|
"checked": t.id in assigned_set,
|
|
})
|
|
|
|
return {
|
|
"group": {
|
|
"id": tg.id,
|
|
"name": tg.name or "",
|
|
"colour": getattr(tg, "colour", "") or "",
|
|
"sort_order": getattr(tg, "sort_order", 0) or 0,
|
|
"feature_image": getattr(tg, "feature_image", "") or "",
|
|
},
|
|
"all_tags": all_tags,
|
|
"save_url": qurl("blog.tag_groups_admin.save", id=tg.id),
|
|
"delete_url": qurl("blog.tag_groups_admin.delete_group", id=tg.id),
|
|
"csrf": generate_csrf_token(),
|
|
}
|
|
|
|
async def index_data(self, session, **kw):
|
|
"""Blog index page data — posts or pages listing with filters."""
|
|
from quart import g, request, url_for as qurl
|
|
from bp.blog.services.posts_data import posts_data
|
|
from bp.blog.services.pages_data import pages_data
|
|
from bp.blog.filters.qs import decode
|
|
from shared.utils import host_url
|
|
from shared.browser.app.csrf import generate_csrf_token
|
|
|
|
q = decode()
|
|
content_type = request.args.get("type", "posts")
|
|
is_admin = bool((g.get("rights") or {}).get("admin"))
|
|
user = getattr(g, "user", None)
|
|
csrf = generate_csrf_token()
|
|
|
|
blog_url_base = host_url(qurl("blog.index")).rstrip("/index").rstrip("/")
|
|
|
|
if content_type == "pages":
|
|
data = await pages_data(session, q.page, q.search)
|
|
posts_list = data.get("pages", [])
|
|
tag_groups_raw = []
|
|
authors_raw = []
|
|
draft_count = 0
|
|
selected_tags = ()
|
|
selected_authors = ()
|
|
selected_groups = ()
|
|
else:
|
|
show_drafts = bool(q.drafts and user)
|
|
drafts_user_id = None if (not show_drafts or is_admin) else user.id
|
|
count_drafts_uid = None if (user and is_admin) else (user.id if user else False)
|
|
data = await posts_data(
|
|
session, q.page, q.search, q.sort, q.selected_tags,
|
|
q.selected_authors, q.liked,
|
|
drafts=show_drafts, drafts_user_id=drafts_user_id,
|
|
count_drafts_for_user_id=count_drafts_uid,
|
|
selected_groups=q.selected_groups,
|
|
)
|
|
posts_list = data.get("posts", [])
|
|
tag_groups_raw = data.get("tag_groups", [])
|
|
authors_raw = data.get("authors", [])
|
|
draft_count = data.get("draft_count", 0)
|
|
selected_tags = q.selected_tags
|
|
selected_authors = q.selected_authors
|
|
selected_groups = q.selected_groups
|
|
|
|
page_num = data.get("page", q.page)
|
|
total_pages = data.get("total_pages", 1)
|
|
card_widgets = data.get("card_widgets_html", {})
|
|
|
|
current_local_href = f"{blog_url_base}/index"
|
|
if content_type == "pages":
|
|
current_local_href += "?type=pages"
|
|
hx_select = "#main-panel"
|
|
|
|
# Serialize posts for cards
|
|
def _format_ts(dt):
|
|
if not dt:
|
|
return ""
|
|
return dt.strftime("%-d %b %Y at %H:%M") if hasattr(dt, "strftime") else str(dt)
|
|
|
|
cards = []
|
|
for p in posts_list:
|
|
slug = p.get("slug", "")
|
|
href = f"{blog_url_base}/{slug}/"
|
|
status = p.get("status", "published")
|
|
is_draft = status == "draft"
|
|
ts = _format_ts(p.get("updated_at") if is_draft else p.get("published_at"))
|
|
tags = []
|
|
for t in (p.get("tags") or []):
|
|
name = t.get("name") or getattr(t, "name", "")
|
|
fi = t.get("feature_image") or getattr(t, "feature_image", None)
|
|
tags.append({"name": name, "src": fi or "", "initial": name[:1] if name else ""})
|
|
authors = []
|
|
for a in (p.get("authors") or []):
|
|
name = a.get("name") or getattr(a, "name", "")
|
|
img = a.get("profile_image") or getattr(a, "profile_image", None)
|
|
authors.append({"name": name, "image": img or ""})
|
|
card = {
|
|
"slug": slug, "href": href, "hx_select": hx_select,
|
|
"title": p.get("title", ""), "feature_image": p.get("feature_image"),
|
|
"excerpt": p.get("custom_excerpt") or p.get("excerpt", ""),
|
|
"is_draft": is_draft,
|
|
"publish_requested": p.get("publish_requested", False) if is_draft else False,
|
|
"status_timestamp": ts,
|
|
"tags": tags, "authors": authors,
|
|
"has_like": bool(user),
|
|
}
|
|
if user:
|
|
card["liked"] = p.get("is_liked", False)
|
|
card["like_url"] = f"{blog_url_base}/{slug}/like/toggle/"
|
|
card["csrf_token"] = csrf
|
|
widget = card_widgets.get(str(p.get("id", "")), "")
|
|
if widget:
|
|
card["widget"] = widget
|
|
# Page-specific fields
|
|
features = p.get("features") or {}
|
|
if content_type == "pages":
|
|
card["has_calendar"] = features.get("calendar", False)
|
|
card["has_market"] = features.get("market", False)
|
|
card["pub_timestamp"] = ts
|
|
cards.append(card)
|
|
|
|
# Serialize tag groups for filter
|
|
tag_groups = []
|
|
for grp in tag_groups_raw:
|
|
g_slug = grp.get("slug", "") if isinstance(grp, dict) else getattr(grp, "slug", "")
|
|
g_name = grp.get("name", "") if isinstance(grp, dict) else getattr(grp, "name", "")
|
|
g_fi = grp.get("feature_image") if isinstance(grp, dict) else getattr(grp, "feature_image", None)
|
|
g_colour = grp.get("colour") if isinstance(grp, dict) else getattr(grp, "colour", None)
|
|
g_count = grp.get("post_count", 0) if isinstance(grp, dict) else getattr(grp, "post_count", 0)
|
|
if g_count <= 0 and g_slug not in selected_groups:
|
|
continue
|
|
tag_groups.append({
|
|
"slug": g_slug, "name": g_name, "feature_image": g_fi,
|
|
"colour": g_colour, "post_count": g_count,
|
|
"is_selected": g_slug in selected_groups,
|
|
})
|
|
|
|
# Serialize authors for filter
|
|
authors_list = []
|
|
for a in authors_raw:
|
|
a_slug = a.get("slug", "") if isinstance(a, dict) else getattr(a, "slug", "")
|
|
a_name = a.get("name", "") if isinstance(a, dict) else getattr(a, "name", "")
|
|
a_img = a.get("profile_image") if isinstance(a, dict) else getattr(a, "profile_image", None)
|
|
a_count = a.get("published_post_count", 0) if isinstance(a, dict) else getattr(a, "published_post_count", 0)
|
|
authors_list.append({
|
|
"slug": a_slug, "name": a_name, "profile_image": a_img,
|
|
"published_post_count": a_count,
|
|
"is_selected": a_slug in selected_authors,
|
|
})
|
|
|
|
# Filter summary names
|
|
tg_summary_names = [grp["name"] for grp in tag_groups if grp["is_selected"]]
|
|
au_summary_names = [a["name"] for a in authors_list if a["is_selected"]]
|
|
|
|
return {
|
|
"content_type": content_type,
|
|
"view": q.view,
|
|
"cards": cards,
|
|
"page": page_num,
|
|
"total_pages": total_pages,
|
|
"current_local_href": current_local_href,
|
|
"hx_select": hx_select,
|
|
"is_admin": is_admin,
|
|
"has_user": bool(user),
|
|
"draft_count": draft_count,
|
|
"drafts": bool(q.drafts) if user else False,
|
|
"new_post_href": f"{blog_url_base}/new/",
|
|
"new_page_href": f"{blog_url_base}/new-page/",
|
|
"tag_groups": tag_groups,
|
|
"authors": authors_list,
|
|
"is_any_group": len(selected_groups) == 0 and len(selected_tags) == 0,
|
|
"is_any_author": len(selected_authors) == 0,
|
|
"tg_summary": ", ".join(tg_summary_names) if tg_summary_names else "",
|
|
"au_summary": ", ".join(au_summary_names) if au_summary_names else "",
|
|
"blog_url_base": blog_url_base,
|
|
"csrf": csrf,
|
|
}
|
|
|
|
async def post_admin_data(self, session, *, slug=None, **kw):
|
|
"""Post admin panel — just needs post loaded into context."""
|
|
from quart import g
|
|
from sqlalchemy import select
|
|
from shared.models.page_config import PageConfig
|
|
|
|
# _ensure_post_data is called by before_request in defpage context
|
|
post = (g.post_data or {}).get("post", {})
|
|
features = {}
|
|
sumup_configured = False
|
|
if post.get("is_page"):
|
|
pc = (await session.execute(
|
|
select(PageConfig).where(
|
|
PageConfig.container_type == "page",
|
|
PageConfig.container_id == post["id"],
|
|
)
|
|
)).scalar_one_or_none()
|
|
if pc:
|
|
features = pc.features or {}
|
|
sumup_configured = bool(pc.sumup_api_key)
|
|
return {
|
|
"features": features,
|
|
"sumup_configured": sumup_configured,
|
|
}
|
|
|
|
    def post_meta_data(self, post, base_title):
        """Compute SEO meta tag values from post dict.

        Args:
            post: serialized post dict with Ghost-style fields (visibility,
                status, meta/og/twitter overrides, excerpts, html, images, ...).
            base_title: site-wide title appended after the post title.

        Returns:
            dict with robots, page_title, desc, canonical, og_* / twitter_*
            values and the social image, ready for template meta tags.
        """
        import re
        from quart import request as req

        # Only public, published, non-email-only posts are indexable.
        is_public = post.get("visibility") == "public"
        is_published = post.get("status") == "published"
        email_only = post.get("email_only", False)
        robots = "index,follow" if (is_public and is_published and not email_only) else "noindex,nofollow"

        # Description: first explicit override wins, then the excerpts.
        desc = (post.get("meta_description") or post.get("og_description") or
                post.get("twitter_description") or post.get("custom_excerpt") or
                post.get("excerpt") or "")
        if not desc and post.get("html"):
            # Fall back to body text: strip tags, collapse newlines, and clamp
            # to the conventional ~160-char search-snippet length.
            desc = re.sub(r'<[^>]+>', '', post["html"])
            desc = desc.replace("\n", " ").replace("\r", " ").strip()[:160]

        # Social image preference: og override > twitter override > feature image.
        image = (post.get("og_image") or post.get("twitter_image") or post.get("feature_image") or "")
        canonical = post.get("canonical_url") or (req.url if req else "")

        post_title = post.get("meta_title") or post.get("title") or ""
        page_title = f"{post_title} \u2014 {base_title}" if post_title else base_title
        og_title = post.get("og_title") or page_title
        tw_title = post.get("twitter_title") or page_title
        # Pages render as og:type "website"; posts as "article".
        is_article = not post.get("is_page")

        return {
            "robots": robots, "page_title": page_title, "desc": desc,
            "canonical": canonical,
            "og_type": "article" if is_article else "website",
            "og_title": og_title, "image": image,
            # Large-image Twitter card only when we actually have an image.
            "twitter_card": "summary_large_image" if image else "summary",
            "twitter_title": tw_title,
        }
|
|
|
|
def post_detail_data(self, post, user, rights, csrf, blog_url_base):
|
|
"""Serialize post detail view data for ~blog-post-detail-content defcomp."""
|
|
slug = post.get("slug", "")
|
|
is_admin = rights.get("admin") if isinstance(rights, dict) else getattr(rights, "admin", False)
|
|
user_id = getattr(user, "id", None) if user else None
|
|
|
|
# Tags and authors
|
|
tags = []
|
|
for t in (post.get("tags") or []):
|
|
name = t.get("name") or getattr(t, "name", "")
|
|
fi = t.get("feature_image") or getattr(t, "feature_image", None)
|
|
tags.append({"name": name, "src": fi or "", "initial": name[:1] if name else ""})
|
|
authors = []
|
|
for a in (post.get("authors") or []):
|
|
name = a.get("name") or getattr(a, "name", "")
|
|
img = a.get("profile_image") or getattr(a, "profile_image", None)
|
|
authors.append({"name": name, "image": img or ""})
|
|
|
|
return {
|
|
"slug": slug,
|
|
"is_draft": post.get("status") == "draft",
|
|
"publish_requested": post.get("publish_requested", False),
|
|
"can_edit": is_admin or (user_id is not None and post.get("user_id") == user_id),
|
|
"edit_href": f"{blog_url_base}/{slug}/admin/edit/",
|
|
"is_page": bool(post.get("is_page")),
|
|
"has_user": bool(user),
|
|
"liked": post.get("is_liked", False),
|
|
"like_url": f"{blog_url_base}/{slug}/like/toggle/",
|
|
"csrf": csrf,
|
|
"custom_excerpt": post.get("custom_excerpt") or "",
|
|
"tags": tags,
|
|
"authors": authors,
|
|
"feature_image": post.get("feature_image"),
|
|
"html_content": post.get("html", ""),
|
|
"sx_content": post.get("sx_content", ""),
|
|
}
|
|
|
|
async def preview_data(self, session, *, slug=None, **kw):
|
|
"""Build preview data with prettified/rendered content."""
|
|
from quart import g
|
|
from models.ghost_content import Post
|
|
from sqlalchemy import select as sa_select
|
|
|
|
post_id = g.post_data["post"]["id"]
|
|
post = (await session.execute(
|
|
sa_select(Post).where(Post.id == post_id)
|
|
)).scalar_one_or_none()
|
|
|
|
result = {}
|
|
sx_content = getattr(post, "sx_content", None) or ""
|
|
if sx_content:
|
|
from shared.sx.prettify import sx_to_pretty_sx
|
|
result["sx_pretty"] = sx_to_pretty_sx(sx_content)
|
|
lexical_raw = getattr(post, "lexical", None) or ""
|
|
if lexical_raw:
|
|
from shared.sx.prettify import json_to_pretty_sx
|
|
result["json_pretty"] = json_to_pretty_sx(lexical_raw)
|
|
if sx_content:
|
|
from shared.sx.parser import parse as sx_parse
|
|
from shared.sx.html import render as sx_html_render
|
|
from shared.sx.jinja_bridge import _COMPONENT_ENV
|
|
try:
|
|
parsed = sx_parse(sx_content)
|
|
result["sx_rendered"] = sx_html_render(parsed, dict(_COMPONENT_ENV))
|
|
except Exception:
|
|
result["sx_rendered"] = "<em>Error rendering sx</em>"
|
|
if lexical_raw:
|
|
from bp.blog.ghost.lexical_renderer import render_lexical
|
|
try:
|
|
result["lex_rendered"] = render_lexical(lexical_raw)
|
|
except Exception:
|
|
result["lex_rendered"] = "<em>Error rendering lexical</em>"
|
|
return result
|