Files
rose-ash/blog/bp/blog/routes.py
giles 959e63d440
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m44s
Remove render_to_sx from public API: enforce sx_call for all service code
Replace ~250 render_to_sx calls across all services with sync sx_call,
converting many async functions to sync where no other awaits remained.
Make render_to_sx/render_to_sx_with_env private (_render_to_sx).
Add (post-header-ctx) IO primitive and shared post/post-admin defmacros.
Convert built-in post/post-admin layouts from Python to register_sx_layout
with .sx defcomps. Remove dead post_admin_mobile_nav_sx.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-04 19:30:45 +00:00

331 lines
12 KiB
Python

from __future__ import annotations
#from quart import Blueprint, g
import json
import os
from quart import (
request,
make_response,
g,
Blueprint,
redirect,
url_for,
)
from .ghost_db import DBClient # adjust import path
from .filters.qs import makeqs_factory, decode
from .services.posts_data import posts_data
from .services.pages_data import pages_data
from shared.browser.app.redis_cacher import cache_page, invalidate_tag_cache
from shared.browser.app.utils.htmx import is_htmx_request
from shared.browser.app.authz import require_admin
from shared.sx.helpers import sx_response, sx_call
from shared.utils import host_url
def register(url_prefix, title):
blogs_bp = Blueprint("blog", __name__, url_prefix)
from .web_hooks.routes import ghost_webhooks
blogs_bp.register_blueprint(ghost_webhooks)
from .ghost.editor_api import editor_api_bp
blogs_bp.register_blueprint(editor_api_bp)
from ..post.routes import register as register_blog
blogs_bp.register_blueprint(
register_blog(),
)
from .admin.routes import register as register_tag_groups_admin
blogs_bp.register_blueprint(register_tag_groups_admin())
@blogs_bp.before_app_serving
async def init():
# Ghost startup sync disabled (Phase 1) — blog service owns content
# directly. The final_ghost_sync.py script was run before cutover.
pass
@blogs_bp.before_request
async def route():
g.makeqs_factory = makeqs_factory
@blogs_bp.context_processor
async def inject_root():
return {
"blog_title": title,
"qs": makeqs_factory()(),
"unsplash_api_key": os.environ.get("UNSPLASH_ACCESS_KEY", ""),
}
async def _render_new_post_page(tctx):
"""Compose a full page with blog header for new post/page creation."""
from shared.sx.helpers import root_header_sx, full_page_sx
from shared.sx.parser import SxExpr
root_hdr = await root_header_sx(tctx)
blog_hdr = sx_call("menu-row-sx",
id="blog-row", level=1,
link_label_content=SxExpr("(div)"),
child_id="blog-header-child")
header_rows = "(<> " + root_hdr + " " + blog_hdr + ")"
content = tctx.get("editor_html", "")
return await full_page_sx(tctx, header_rows=header_rows, content=content)
SORT_MAP = {
"newest": "published_at DESC",
"oldest": "published_at ASC",
"az": "title ASC",
"za": "title DESC",
"featured": "featured DESC, published_at DESC",
}
@blogs_bp.get("/")
async def home():
"""Render the Ghost page with slug 'home' as the site homepage."""
from ..post.services.post_data import post_data as _post_data
from shared.config import config as get_config
from shared.infrastructure.cart_identity import current_cart_identity
from shared.infrastructure.data_client import fetch_data
from shared.contracts.dtos import CartSummaryDTO, dto_from_dict
from shared.infrastructure.fragments import fetch_fragment
p_data = await _post_data("home", g.s, include_drafts=False)
if not p_data:
# Fall back to blog index if "home" page doesn't exist yet
return redirect(host_url(url_for("blog.index")))
g.post_data = p_data
# Build the same context the post blueprint's context_processor provides
db_post_id = p_data["post"]["id"]
post_slug = p_data["post"]["slug"]
# Fetch container nav from relations service
container_nav = await fetch_fragment("relations", "container-nav", params={
"container_type": "page",
"container_id": str(db_post_id),
"post_slug": post_slug,
})
ctx = {
**p_data,
"base_title": get_config()["title"],
"container_nav": container_nav,
}
# Page cart badge via HTTP
if p_data["post"].get("is_page"):
ident = current_cart_identity()
summary_params = {"page_slug": post_slug}
if ident["user_id"] is not None:
summary_params["user_id"] = ident["user_id"]
if ident["session_id"] is not None:
summary_params["session_id"] = ident["session_id"]
raw_summary = await fetch_data("cart", "cart-summary", params=summary_params, required=False)
page_summary = dto_from_dict(CartSummaryDTO, raw_summary) if raw_summary else CartSummaryDTO()
ctx["page_cart_count"] = page_summary.count + page_summary.calendar_count + page_summary.ticket_count
ctx["page_cart_total"] = float(page_summary.total + page_summary.calendar_total + page_summary.ticket_total)
from shared.sx.page import get_template_context
from shared.sx.helpers import (
sx_call, root_header_sx, full_page_sx, oob_page_sx,
post_header_sx, oob_header_sx, mobile_menu_sx,
post_mobile_nav_sx, mobile_root_nav_sx,
)
from shared.sx.parser import SxExpr
from shared.services.registry import services
tctx = await get_template_context()
tctx.update(ctx)
post = ctx.get("post", {})
content = sx_call("blog-home-main",
html_content=post.get("html", ""),
sx_content=SxExpr(post.get("sx_content", "")) if post.get("sx_content") else None)
meta_data = services.blog_page.post_meta_data(post, ctx.get("base_title", ""))
meta = sx_call("blog-meta", **meta_data)
if not is_htmx_request():
root_hdr = await root_header_sx(tctx)
post_hdr = await post_header_sx(tctx)
header_rows = "(<> " + root_hdr + " " + post_hdr + ")"
menu = mobile_menu_sx(await post_mobile_nav_sx(tctx), await mobile_root_nav_sx(tctx))
html = await full_page_sx(tctx, header_rows=header_rows, content=content,
meta=meta, menu=menu)
return await make_response(html)
else:
root_hdr = await root_header_sx(tctx)
post_hdr = await post_header_sx(tctx)
rows = "(<> " + root_hdr + " " + post_hdr + ")"
header_oob = await oob_header_sx("root-header-child", "post-header-child", rows)
sx_src = await oob_page_sx(oobs=header_oob, content=content)
return sx_response(sx_src)
@blogs_bp.get("/index")
@blogs_bp.get("/index/")
async def index():
"""Blog listing — moved from / to /index."""
from shared.services.registry import services
from shared.sx.helpers import (
sx_call, root_header_sx, full_page_sx, oob_page_sx, oob_header_sx,
)
from shared.sx.parser import SxExpr
def _blog_hdr(ctx, oob=False):
return sx_call("menu-row-sx",
id="blog-row", level=1,
link_label_content=SxExpr("(div)"),
child_id="blog-header-child", oob=oob)
data = await services.blog_page.index_data(g.s)
# Render content, aside, and filter via .sx defcomps
content = sx_call("blog-index-main-content", **data)
aside = sx_call("blog-index-aside-content", **data)
filter_sx = sx_call("blog-index-filter-content", **data)
from shared.sx.page import get_template_context
tctx = await get_template_context()
if not is_htmx_request():
root_hdr = await root_header_sx(tctx)
blog_hdr = _blog_hdr(tctx)
header_rows = "(<> " + root_hdr + " " + blog_hdr + ")"
html = await full_page_sx(tctx, header_rows=header_rows,
content=content, aside=aside, filter=filter_sx)
return await make_response(html)
elif data.get("page", 1) > 1:
# Pagination — return just the cards
return sx_response(content)
else:
root_hdr = await root_header_sx(tctx)
blog_hdr = _blog_hdr(tctx)
rows = "(<> " + root_hdr + " " + blog_hdr + ")"
header_oob = await oob_header_sx("root-header-child", "blog-header-child", rows)
sx_src = await oob_page_sx(oobs=header_oob, content=content,
aside=aside, filter=filter_sx)
return sx_response(sx_src)
@blogs_bp.post("/new/")
@require_admin
async def new_post_save():
from .ghost.lexical_validator import validate_lexical
from services.post_writer import create_post as writer_create
form = await request.form
title = form.get("title", "").strip() or "Untitled"
lexical_raw = form.get("lexical", "")
status = form.get("status", "draft")
feature_image = form.get("feature_image", "").strip()
custom_excerpt = form.get("custom_excerpt", "").strip()
feature_image_caption = form.get("feature_image_caption", "").strip()
# Validate
try:
lexical_doc = json.loads(lexical_raw)
except (json.JSONDecodeError, TypeError):
from shared.sx.page import get_template_context
from sxc.pages.renders import render_editor_panel
tctx = await get_template_context()
tctx["editor_html"] = render_editor_panel(save_error="Invalid JSON in editor content.")
html = await _render_new_post_page(tctx)
return await make_response(html, 400)
ok, reason = validate_lexical(lexical_doc)
if not ok:
from shared.sx.page import get_template_context
from sxc.pages.renders import render_editor_panel
tctx = await get_template_context()
tctx["editor_html"] = render_editor_panel(save_error=reason)
html = await _render_new_post_page(tctx)
return await make_response(html, 400)
# Create directly in db_blog
sx_content_raw = form.get("sx_content", "").strip() or None
post = await writer_create(
g.s,
title=title,
lexical_json=lexical_raw,
status=status,
user_id=g.user.id,
feature_image=feature_image or None,
custom_excerpt=custom_excerpt or None,
feature_image_caption=feature_image_caption or None,
sx_content=sx_content_raw,
)
await g.s.flush()
# Clear blog listing cache
await invalidate_tag_cache("blog")
# Redirect to the edit page
return redirect(host_url(url_for("defpage_post_edit", slug=post.slug)))
@blogs_bp.post("/new-page/")
@require_admin
async def new_page_save():
from .ghost.lexical_validator import validate_lexical
from services.post_writer import create_page as writer_create_page
form = await request.form
title = form.get("title", "").strip() or "Untitled"
lexical_raw = form.get("lexical", "")
status = form.get("status", "draft")
feature_image = form.get("feature_image", "").strip()
custom_excerpt = form.get("custom_excerpt", "").strip()
feature_image_caption = form.get("feature_image_caption", "").strip()
# Validate
try:
lexical_doc = json.loads(lexical_raw)
except (json.JSONDecodeError, TypeError):
from shared.sx.page import get_template_context
from sxc.pages.renders import render_editor_panel
tctx = await get_template_context()
tctx["editor_html"] = render_editor_panel(save_error="Invalid JSON in editor content.", is_page=True)
tctx["is_page"] = True
html = await _render_new_post_page(tctx)
return await make_response(html, 400)
ok, reason = validate_lexical(lexical_doc)
if not ok:
from shared.sx.page import get_template_context
from sxc.pages.renders import render_editor_panel
tctx = await get_template_context()
tctx["editor_html"] = render_editor_panel(save_error=reason, is_page=True)
tctx["is_page"] = True
html = await _render_new_post_page(tctx)
return await make_response(html, 400)
# Create directly in db_blog
sx_content_raw = form.get("sx_content", "").strip() or None
page = await writer_create_page(
g.s,
title=title,
lexical_json=lexical_raw,
status=status,
user_id=g.user.id,
feature_image=feature_image or None,
custom_excerpt=custom_excerpt or None,
feature_image_caption=feature_image_caption or None,
sx_content=sx_content_raw,
)
await g.s.flush()
# Clear blog listing cache
await invalidate_tag_cache("blog")
# Redirect to the page admin
return redirect(host_url(url_for("defpage_post_edit", slug=page.slug)))
@blogs_bp.get("/drafts/")
async def drafts():
return redirect(host_url(url_for("blog.index")) + "?drafts=1")
return blogs_bp