from __future__ import annotations

import json
import os

from quart import (
    request,
    make_response,
    g,
    Blueprint,
    redirect,
    url_for,
)

from .ghost_db import DBClient  # adjust import path
from .filters.qs import makeqs_factory, decode
from .services.posts_data import posts_data
from .services.pages_data import pages_data

from shared.browser.app.redis_cacher import cache_page, invalidate_tag_cache
from shared.browser.app.utils.htmx import is_htmx_request
from shared.browser.app.authz import require_admin
from shared.sx.helpers import sx_response, sx_call
from shared.utils import host_url


def register(url_prefix, title) -> Blueprint:
    """Build and return the ``blog`` blueprint.

    The blueprint nests the Ghost webhook, editor API, post and admin
    blueprints and defines these routes:

    * ``GET /``            -- page with slug ``home`` (redirects to the index
      when that page does not exist)
    * ``GET /index[/]``    -- blog listing
    * ``POST /new/``       -- create a post (admin only)
    * ``POST /new-page/``  -- create a page (admin only)
    * ``GET /drafts/``     -- redirect to the index with ``?drafts=1``

    Args:
        url_prefix: URL prefix the blueprint is mounted under.
        title: Value exposed to templates as ``blog_title``.

    Returns:
        The configured blueprint.
    """
    # ``url_prefix`` must be passed by keyword: the third positional parameter
    # of ``Blueprint`` is ``static_folder``, so passing it positionally turned
    # the prefix into a static-files directory and left the prefix unset.
    blogs_bp = Blueprint("blog", __name__, url_prefix=url_prefix)

    from .web_hooks.routes import ghost_webhooks
    blogs_bp.register_blueprint(ghost_webhooks)

    from .ghost.editor_api import editor_api_bp
    blogs_bp.register_blueprint(editor_api_bp)

    from ..post.routes import register as register_blog
    blogs_bp.register_blueprint(register_blog())

    from .admin.routes import register as register_tag_groups_admin
    blogs_bp.register_blueprint(register_tag_groups_admin())

    @blogs_bp.before_app_serving
    async def init():
        # Ghost startup sync disabled (Phase 1) — blog service owns content
        # directly. The final_ghost_sync.py script was run before cutover.
        pass

    @blogs_bp.before_request
    async def route():
        # Expose the query-string factory on the request globals.
        g.makeqs_factory = makeqs_factory

    @blogs_bp.context_processor
    async def inject_root():
        """Provide the blog title, a query-string builder and the Unsplash key."""
        return {
            "blog_title": title,
            "qs": makeqs_factory()(),
            "unsplash_api_key": os.environ.get("UNSPLASH_ACCESS_KEY", ""),
        }

    async def _render_new_post_page(tctx):
        """Compose a full page with blog header for new post/page creation."""
        from shared.sx.helpers import root_header_sx, full_page_sx
        from shared.sx.parser import SxExpr

        root_hdr = await root_header_sx(tctx)
        blog_hdr = sx_call(
            "menu-row-sx",
            id="blog-row",
            level=1,
            link_label_content=SxExpr("(div)"),
            child_id="blog-header-child",
        )
        header_rows = "(<> " + root_hdr + " " + blog_hdr + ")"
        content = tctx.get("editor_html", "")
        return await full_page_sx(tctx, header_rows=header_rows, content=content)

    async def _editor_error_response(message, *, is_page=False):
        """Re-render the editor showing *message* and respond with HTTP 400."""
        from shared.sx.page import get_template_context
        from sxc.pages.renders import render_editor_panel

        tctx = await get_template_context()
        if is_page:
            tctx["editor_html"] = render_editor_panel(save_error=message, is_page=True)
            tctx["is_page"] = True
        else:
            tctx["editor_html"] = render_editor_panel(save_error=message)
        html = await _render_new_post_page(tctx)
        return await make_response(html, 400)

    async def _save_new_entry(create_entry, *, is_page=False):
        """Validate the submitted editor form and create a post or page.

        Args:
            create_entry: Awaitable writer called with ``g.s`` and the form
                fields as keyword arguments; its result must expose ``slug``.
            is_page: Render validation errors with the page variant of the
                editor instead of the post variant.

        Returns:
            A 400 response with the editor re-rendered when the ``lexical``
            field is not valid JSON or fails ``validate_lexical``; otherwise a
            redirect to the ``defpage_post_edit`` endpoint for the new slug.
        """
        from .ghost.lexical_validator import validate_lexical

        form = await request.form
        entry_title = form.get("title", "").strip() or "Untitled"
        lexical_raw = form.get("lexical", "")
        status = form.get("status", "draft")
        feature_image = form.get("feature_image", "").strip()
        custom_excerpt = form.get("custom_excerpt", "").strip()
        feature_image_caption = form.get("feature_image_caption", "").strip()

        # Validate
        try:
            lexical_doc = json.loads(lexical_raw)
        except (json.JSONDecodeError, TypeError):
            return await _editor_error_response(
                "Invalid JSON in editor content.", is_page=is_page
            )

        ok, reason = validate_lexical(lexical_doc)
        if not ok:
            return await _editor_error_response(reason, is_page=is_page)

        # Create directly in db_blog; empty optional fields are stored as None.
        sx_content_raw = form.get("sx_content", "").strip() or None
        entry = await create_entry(
            g.s,
            title=entry_title,
            lexical_json=lexical_raw,
            status=status,
            user_id=g.user.id,
            feature_image=feature_image or None,
            custom_excerpt=custom_excerpt or None,
            feature_image_caption=feature_image_caption or None,
            sx_content=sx_content_raw,
        )
        await g.s.flush()

        # Clear blog listing cache
        await invalidate_tag_cache("blog")

        # Redirect to the edit page
        return redirect(host_url(url_for("defpage_post_edit", slug=entry.slug)))

    # NOTE(review): SORT_MAP is not referenced anywhere in the visible code —
    # confirm whether it can be removed.
    SORT_MAP = {
        "newest": "published_at DESC",
        "oldest": "published_at ASC",
        "az": "title ASC",
        "za": "title DESC",
        "featured": "featured DESC, published_at DESC",
    }

    @blogs_bp.get("/")
    async def home():
        """Render the Ghost page with slug 'home' as the site homepage."""
        from ..post.services.post_data import post_data as _post_data
        from shared.config import config as get_config
        from shared.infrastructure.cart_identity import current_cart_identity
        from shared.infrastructure.data_client import fetch_data
        from shared.contracts.dtos import CartSummaryDTO, dto_from_dict
        from shared.infrastructure.fragments import fetch_fragment

        p_data = await _post_data("home", g.s, include_drafts=False)
        if not p_data:
            # Fall back to blog index if "home" page doesn't exist yet
            return redirect(host_url(url_for("blog.index")))

        g.post_data = p_data

        # Build the same context the post blueprint's context_processor provides
        db_post_id = p_data["post"]["id"]
        post_slug = p_data["post"]["slug"]

        # Fetch container nav from relations service
        container_nav = await fetch_fragment(
            "relations",
            "container-nav",
            params={
                "container_type": "page",
                "container_id": str(db_post_id),
                "post_slug": post_slug,
            },
        )

        ctx = {
            **p_data,
            "base_title": get_config()["title"],
            "container_nav": container_nav,
        }

        # Page cart badge via HTTP
        if p_data["post"].get("is_page"):
            ident = current_cart_identity()
            summary_params = {"page_slug": post_slug}
            # Forward only the identity parts that are actually set.
            if ident["user_id"] is not None:
                summary_params["user_id"] = ident["user_id"]
            if ident["session_id"] is not None:
                summary_params["session_id"] = ident["session_id"]
            raw_summary = await fetch_data(
                "cart", "cart-summary", params=summary_params, required=False
            )
            # An empty result yields a default summary DTO.
            page_summary = (
                dto_from_dict(CartSummaryDTO, raw_summary)
                if raw_summary
                else CartSummaryDTO()
            )
            ctx["page_cart_count"] = (
                page_summary.count
                + page_summary.calendar_count
                + page_summary.ticket_count
            )
            ctx["page_cart_total"] = float(
                page_summary.total
                + page_summary.calendar_total
                + page_summary.ticket_total
            )

        from shared.sx.page import get_template_context
        from shared.sx.helpers import (
            root_header_sx,
            full_page_sx,
            oob_page_sx,
            post_header_sx,
            oob_header_sx,
            mobile_menu_sx,
            post_mobile_nav_sx,
            mobile_root_nav_sx,
        )
        from shared.sx.parser import SxExpr
        from shared.services.registry import services

        tctx = await get_template_context()
        tctx.update(ctx)
        post = ctx.get("post", {})

        raw_sx = post.get("sx_content")
        content = sx_call(
            "blog-home-main",
            html_content=post.get("html", ""),
            sx_content=SxExpr(raw_sx) if raw_sx else None,
        )
        meta_data = services.blog_page.post_meta_data(post, ctx.get("base_title", ""))
        meta = sx_call("blog-meta", **meta_data)

        # Both response shapes use the same root + post header rows.
        root_hdr = await root_header_sx(tctx)
        post_hdr = await post_header_sx(tctx)
        header_rows = "(<> " + root_hdr + " " + post_hdr + ")"

        if not is_htmx_request():
            menu = mobile_menu_sx(
                await post_mobile_nav_sx(tctx),
                await mobile_root_nav_sx(tctx),
            )
            html = await full_page_sx(
                tctx,
                header_rows=header_rows,
                content=content,
                meta=meta,
                menu=menu,
            )
            return await make_response(html)

        # HTMX request: send the headers as an out-of-band swap plus content.
        header_oob = await oob_header_sx(
            "root-header-child", "post-header-child", header_rows
        )
        sx_src = await oob_page_sx(oobs=header_oob, content=content)
        return sx_response(sx_src)

    @blogs_bp.get("/index")
    @blogs_bp.get("/index/")
    async def index():
        """Blog listing — moved from / to /index."""
        from shared.services.registry import services
        from shared.sx.helpers import (
            root_header_sx,
            full_page_sx,
            oob_page_sx,
            oob_header_sx,
        )
        from shared.sx.parser import SxExpr
        from shared.sx.page import get_template_context

        data = await services.blog_page.index_data(g.s)

        # Render content, aside, and filter via .sx defcomps
        content = sx_call("blog-index-main-content", **data)
        aside = sx_call("blog-index-aside-content", **data)
        filter_sx = sx_call("blog-index-filter-content", **data)

        tctx = await get_template_context()
        htmx = is_htmx_request()

        if htmx and data.get("page", 1) > 1:
            # Pagination — return just the cards
            return sx_response(content)

        root_hdr = await root_header_sx(tctx)
        blog_hdr = sx_call(
            "menu-row-sx",
            id="blog-row",
            level=1,
            link_label_content=SxExpr("(div)"),
            child_id="blog-header-child",
            oob=False,
        )
        header_rows = "(<> " + root_hdr + " " + blog_hdr + ")"

        if not htmx:
            html = await full_page_sx(
                tctx,
                header_rows=header_rows,
                content=content,
                aside=aside,
                filter=filter_sx,
            )
            return await make_response(html)

        # First HTMX page: headers go out-of-band alongside the listing parts.
        header_oob = await oob_header_sx(
            "root-header-child", "blog-header-child", header_rows
        )
        sx_src = await oob_page_sx(
            oobs=header_oob, content=content, aside=aside, filter=filter_sx
        )
        return sx_response(sx_src)

    @blogs_bp.post("/new/")
    @require_admin
    async def new_post_save():
        """Create a new post from the submitted editor form."""
        from services.post_writer import create_post as writer_create

        return await _save_new_entry(writer_create)

    @blogs_bp.post("/new-page/")
    @require_admin
    async def new_page_save():
        """Create a new page from the submitted editor form."""
        from services.post_writer import create_page as writer_create_page

        return await _save_new_entry(writer_create_page, is_page=True)

    @blogs_bp.get("/drafts/")
    async def drafts():
        """Redirect to the blog index with the drafts filter enabled."""
        return redirect(host_url(url_for("blog.index")) + "?drafts=1")

    return blogs_bp