"""Blog page data service — provides serialized dicts for .sx defpages.""" from __future__ import annotations class BlogPageService: """Service for blog page data, callable via (service "blog-page" ...).""" async def cache_data(self, session, **kw): from quart import url_for as qurl from shared.browser.app.csrf import generate_csrf_token return { "clear_url": qurl("settings.cache_clear"), "csrf": generate_csrf_token(), } async def snippets_data(self, session, **kw): from quart import g, url_for as qurl from sqlalchemy import select, or_ from models import Snippet from shared.browser.app.csrf import generate_csrf_token uid = g.user.id is_admin = g.rights.get("admin") csrf = generate_csrf_token() filters = [Snippet.user_id == uid, Snippet.visibility == "shared"] if is_admin: filters.append(Snippet.visibility == "admin") rows = (await session.execute( select(Snippet).where(or_(*filters)).order_by(Snippet.name) )).scalars().all() snippets = [] for s in rows: s_id = s.id s_vis = s.visibility or "private" s_uid = s.user_id owner = "You" if s_uid == uid else f"User #{s_uid}" can_delete = s_uid == uid or is_admin d = { "id": s_id, "name": s.name or "", "visibility": s_vis, "owner": owner, "can_delete": can_delete, } if is_admin: d["patch_url"] = qurl("snippets.patch_visibility", snippet_id=s_id) if can_delete: d["delete_url"] = qurl("snippets.delete_snippet", snippet_id=s_id) snippets.append(d) return { "snippets": snippets, "is_admin": bool(is_admin), "csrf": csrf, } async def menu_items_data(self, session, **kw): from quart import url_for as qurl from bp.menu_items.services.menu_items import get_all_menu_items from shared.browser.app.csrf import generate_csrf_token menu_items = await get_all_menu_items(session) csrf = generate_csrf_token() items = [] for mi in menu_items: i_id = mi.id label = mi.label or "" fi = getattr(mi, "feature_image", None) sort = mi.position or 0 items.append({ "id": i_id, "label": label, "url": mi.url or "", "sort_order": sort, "feature_image": fi, "edit_url": qurl("menu_items.edit_menu_item", item_id=i_id), "delete_url": qurl("menu_items.delete_menu_item_route", item_id=i_id), }) return { "menu_items": items, "new_url": qurl("menu_items.new_menu_item"), "csrf": csrf, } async def tag_groups_data(self, session, **kw): from quart import url_for as qurl from sqlalchemy import select from models.tag_group import TagGroup from bp.blog.admin.routes import _unassigned_tags from shared.browser.app.csrf import generate_csrf_token groups_rows = list( (await session.execute( select(TagGroup).order_by(TagGroup.sort_order, TagGroup.name) )).scalars() ) unassigned = await _unassigned_tags(session) groups = [] for g in groups_rows: groups.append({ "id": g.id, "name": g.name or "", "slug": getattr(g, "slug", "") or "", "feature_image": getattr(g, "feature_image", None), "colour": getattr(g, "colour", None), "sort_order": getattr(g, "sort_order", 0) or 0, "edit_href": qurl("blog.tag_groups_admin.defpage_tag_group_edit", id=g.id), }) unassigned_tags = [] for t in unassigned: unassigned_tags.append({ "name": getattr(t, "name", "") if hasattr(t, "name") else t.get("name", ""), }) return { "groups": groups, "unassigned_tags": unassigned_tags, "create_url": qurl("blog.tag_groups_admin.create"), "csrf": generate_csrf_token(), } async def tag_group_edit_data(self, session, *, id=None, **kw): from quart import abort, url_for as qurl from sqlalchemy import select from models.tag_group import TagGroup, TagGroupTag from models.ghost_content import Tag from shared.browser.app.csrf import generate_csrf_token tg = await session.get(TagGroup, id) if not tg: abort(404) assigned_rows = list( (await session.execute( select(TagGroupTag.tag_id).where(TagGroupTag.tag_group_id == id) )).scalars() ) assigned_set = set(assigned_rows) all_tags_rows = list( (await session.execute( select(Tag).where( Tag.deleted_at.is_(None), (Tag.visibility == "public") | (Tag.visibility.is_(None)), ).order_by(Tag.name) )).scalars() ) all_tags = [] for t in all_tags_rows: all_tags.append({ "id": t.id, "name": getattr(t, "name", "") or "", "feature_image": getattr(t, "feature_image", None), "checked": t.id in assigned_set, }) return { "group": { "id": tg.id, "name": tg.name or "", "colour": getattr(tg, "colour", "") or "", "sort_order": getattr(tg, "sort_order", 0) or 0, "feature_image": getattr(tg, "feature_image", "") or "", }, "all_tags": all_tags, "save_url": qurl("blog.tag_groups_admin.save", id=tg.id), "delete_url": qurl("blog.tag_groups_admin.delete_group", id=tg.id), "csrf": generate_csrf_token(), } async def post_admin_data(self, session, *, slug=None, **kw): """Post admin panel — just needs post loaded into context.""" from quart import g from sqlalchemy import select from shared.models.page_config import PageConfig # _ensure_post_data is called by before_request in defpage context post = (g.post_data or {}).get("post", {}) features = {} sumup_configured = False if post.get("is_page"): pc = (await session.execute( select(PageConfig).where( PageConfig.container_type == "page", PageConfig.container_id == post["id"], ) )).scalar_one_or_none() if pc: features = pc.features or {} sumup_configured = bool(pc.sumup_api_key) return { "features": features, "sumup_configured": sumup_configured, } async def preview_data(self, session, *, slug=None, **kw): """Build preview data with prettified/rendered content.""" from quart import g from models.ghost_content import Post from sqlalchemy import select as sa_select post_id = g.post_data["post"]["id"] post = (await session.execute( sa_select(Post).where(Post.id == post_id) )).scalar_one_or_none() result = {} sx_content = getattr(post, "sx_content", None) or "" if sx_content: from shared.sx.prettify import sx_to_pretty_sx result["sx_pretty"] = sx_to_pretty_sx(sx_content) lexical_raw = getattr(post, "lexical", None) or "" if lexical_raw: from shared.sx.prettify import json_to_pretty_sx result["json_pretty"] = json_to_pretty_sx(lexical_raw) if sx_content: from shared.sx.parser import parse as sx_parse from shared.sx.html import render as sx_html_render from shared.sx.jinja_bridge import _COMPONENT_ENV try: parsed = sx_parse(sx_content) result["sx_rendered"] = sx_html_render(parsed, dict(_COMPONENT_ENV)) except Exception: result["sx_rendered"] = "Error rendering sx" if lexical_raw: from bp.blog.ghost.lexical_renderer import render_lexical try: result["lex_rendered"] = render_lexical(lexical_raw) except Exception: result["lex_rendered"] = "Error rendering lexical" return result