This repository has been archived on 2026-02-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
blog/bp/blog/routes.py
giles 1f697b2961
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m3s
Phase 4: replace container widgets with fragment fetches
Blog post page and home route fetch container-nav from events + market
concurrently via fetch_fragments(). Blog listing fetches container-cards
from events and parses per-post HTML via comment markers.
widget_paginate proxies calendar pagination to events fragment.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 13:33:18 +00:00

369 lines
12 KiB
Python

from __future__ import annotations
#from quart import Blueprint, g
import json
import os
from quart import (
request,
render_template,
make_response,
g,
Blueprint,
redirect,
url_for,
)
from .ghost_db import DBClient # adjust import path
from shared.db.session import get_session
from .filters.qs import makeqs_factory, decode
from .services.posts_data import posts_data
from .services.pages_data import pages_data
from shared.browser.app.redis_cacher import cache_page, invalidate_tag_cache
from shared.browser.app.utils.htmx import is_htmx_request
from shared.browser.app.authz import require_admin
from shared.utils import host_url
def register(url_prefix, title):
    """Build the ``blog`` blueprint and attach its child blueprints.

    Args:
        url_prefix: URL prefix under which the blog routes are mounted.
        title: Blog title; exposed to templates as ``blog_title`` by the
            ``inject_root`` context processor defined further down.

    Returns:
        The configured blueprint. The rest of this function registers the
        lifecycle hooks and route handlers on it and then returns it.
    """
    # ``url_prefix`` has to be passed by keyword: the third positional
    # parameter of ``Blueprint`` is ``static_folder``, so the previous
    # positional call never applied the prefix and instead treated the
    # value as a static-files directory.
    # NOTE(review): if the application also passes ``url_prefix`` when it
    # registers this blueprint, that registration-time value still wins.
    blogs_bp = Blueprint("blog", __name__, url_prefix=url_prefix)

    # Child blueprints are imported lazily, right before they are attached.
    from .web_hooks.routes import ghost_webhooks
    blogs_bp.register_blueprint(ghost_webhooks)

    from .ghost.editor_api import editor_api_bp
    blogs_bp.register_blueprint(editor_api_bp)

    from ..post.routes import register as register_blog
    blogs_bp.register_blueprint(
        register_blog(),
    )

    from .admin.routes import register as register_tag_groups_admin
    blogs_bp.register_blueprint(register_tag_groups_admin())
@blogs_bp.before_app_serving
async def init():
    """Sync Ghost content, then Ghost membership, and commit the result.

    Registered through ``before_app_serving``. Both syncs share one
    session, which is committed once after the second sync finishes.
    """
    from .ghost.ghost_sync import (
        sync_all_content_from_ghost,
        sync_all_membership_from_ghost,
    )

    async with get_session() as session:
        # Content first, membership second -- same order as before.
        await sync_all_content_from_ghost(session)
        await sync_all_membership_from_ghost(session)
        await session.commit()
@blogs_bp.before_request
def route():
    """Store ``makeqs_factory`` on ``g`` before each request is handled."""
    # Only the factory itself is stored; it is not called here.
    g.makeqs_factory = makeqs_factory
@blogs_bp.context_processor
async def inject_root():
    """Return the extra template variables contributed by this blueprint.

    Provides ``blog_title`` (the ``title`` given to ``register``), ``qs``
    (the result of calling the object returned by ``makeqs_factory()``) and
    ``unsplash_api_key`` (``UNSPLASH_ACCESS_KEY`` from the environment, or
    an empty string when it is unset).
    """
    build_qs = makeqs_factory()
    injected = {"blog_title": title}
    injected["qs"] = build_qs()
    injected["unsplash_api_key"] = os.environ.get("UNSPLASH_ACCESS_KEY", "")
    return injected
# Sort key -> ordering expression.
# NOTE(review): the values look like SQL ORDER BY fragments, and nothing in
# the visible part of this file reads this mapping -- confirm it is still
# needed.
SORT_MAP = {
    "newest": "published_at DESC",
    "oldest": "published_at ASC",
    "az": "title ASC",
    "za": "title DESC",
    "featured": "featured DESC, published_at DESC",
}
@blogs_bp.get("/")
async def home():
    """Render the Ghost page with slug 'home' as the site homepage.

    Redirects to ``blog.index`` when no data is found for the ``home`` slug.
    Otherwise fetches the ``container-nav`` fragments from ``events`` and
    ``market`` in a single ``fetch_fragments`` call, concatenates them, and
    renders either the full page or, for HTMX requests, the OOB elements.
    When the post is flagged ``is_page`` the context also carries the cart
    badge count and total for that page.

    Returns:
        A redirect response, or a response wrapping the rendered HTML.
    """
    from ..post.services.post_data import post_data as _post_data
    from shared.config import config as get_config
    from shared.infrastructure.cart_identity import current_cart_identity
    from shared.services.registry import services as svc
    # ``fetch_fragment`` used to be imported here as well but was never used.
    from shared.infrastructure.fragments import fetch_fragments

    p_data = await _post_data("home", g.s, include_drafts=False)
    if not p_data:
        # Fall back to blog index if "home" page doesn't exist yet
        return redirect(host_url(url_for("blog.index")))

    g.post_data = p_data

    # Build the same context the post blueprint's context_processor provides
    post = p_data["post"]
    db_post_id = post["id"]
    post_slug = post["slug"]

    # Fetch container nav fragments from events + market
    paginate_url = url_for(
        "blog.post.widget_paginate",
        slug=post_slug,
        widget_domain="calendar",
    )
    nav_params = {
        "container_type": "page",
        "container_id": str(db_post_id),
        "post_slug": post_slug,
        "paginate_url": paginate_url,
    }
    events_nav_html, market_nav_html = await fetch_fragments([
        ("events", "container-nav", nav_params),
        ("market", "container-nav", nav_params),
    ])
    container_nav_html = events_nav_html + market_nav_html

    ctx = {
        **p_data,
        "base_title": f"{get_config()['title']} {post['title']}",
        "container_nav_html": container_nav_html,
    }

    # Page cart badge
    if post.get("is_page"):
        ident = current_cart_identity()
        page_summary = await svc.cart.cart_summary(
            g.s,
            user_id=ident["user_id"],
            session_id=ident["session_id"],
            page_slug=post_slug,
        )
        ctx["page_cart_count"] = (
            page_summary.count
            + page_summary.calendar_count
            + page_summary.ticket_count
        )
        ctx["page_cart_total"] = float(
            page_summary.total
            + page_summary.calendar_total
            + page_summary.ticket_total
        )

    if not is_htmx_request():
        html = await render_template("_types/home/index.html", **ctx)
    else:
        html = await render_template("_types/home/_oob_elements.html", **ctx)
    return await make_response(html)
@blogs_bp.get("/index")
@blogs_bp.get("/index/")
async def index():
    """Blog listing — moved from / to /index.

    The ``type`` query argument picks the listing: ``pages`` lists pages,
    anything else (default ``posts``) lists posts. In both cases the
    template depends on the request: a full page for non-HTMX requests, a
    cards partial for HTMX requests past page 1, and the OOB elements for
    HTMX requests on page 1.
    """
    q = decode()

    if request.args.get("type", "posts") == "pages":
        data = await pages_data(g.s, q.page, q.search)
        # The posts-only filters are blanked out so the shared templates
        # still receive every variable they are given for the posts listing.
        context = {
            **data,
            "content_type": "pages",
            "search": q.search,
            "selected_tags": (),
            "selected_authors": (),
            "selected_groups": (),
            "sort": None,
            "view": None,
            "drafts": None,
            "draft_count": 0,
            "tags": [],
            "authors": [],
            "tag_groups": [],
            "posts": data.get("pages", []),
        }
        if not is_htmx_request():
            template = "_types/blog/index.html"
        elif q.page > 1:
            template = "_types/blog/_page_cards.html"
        else:
            template = "_types/blog/_oob_elements.html"
        return await make_response(await render_template(template, **context))

    # Default: posts listing.
    # The drafts filter requires a logged-in user; it is ignored otherwise.
    show_drafts = bool(q.drafts and g.user)
    rights = g.get("rights") or {}
    is_admin = bool(rights.get("admin"))

    # A non-admin who asked for drafts is restricted to their own drafts.
    if show_drafts and not is_admin:
        drafts_user_id = g.user.id
    else:
        drafts_user_id = None

    # Draft count badge: None for a logged-in admin, the user's own id for
    # any other logged-in user, False when nobody is logged in.
    if g.user and is_admin:
        count_drafts_uid = None
    elif g.user:
        count_drafts_uid = g.user.id
    else:
        count_drafts_uid = False

    data = await posts_data(
        g.s,
        q.page,
        q.search,
        q.sort,
        q.selected_tags,
        q.selected_authors,
        q.liked,
        drafts=show_drafts,
        drafts_user_id=drafts_user_id,
        count_drafts_for_user_id=count_drafts_uid,
        selected_groups=q.selected_groups,
    )

    context = {
        **data,
        "content_type": "posts",
        "selected_tags": q.selected_tags,
        "selected_authors": q.selected_authors,
        "selected_groups": q.selected_groups,
        "sort": q.sort,
        "search": q.search,
        "view": q.view,
        "drafts": q.drafts if show_drafts else None,
    }

    if not is_htmx_request():
        # Normal browser request: full page with layout
        template = "_types/blog/index.html"
    elif q.page > 1:
        # HTMX pagination: just blog cards + sentinel
        template = "_types/blog/_cards.html"
    else:
        # HTMX navigation (page 1): main panel + OOB elements
        template = "_types/blog/_oob_elements.html"
    return await make_response(await render_template(template, **context))
@blogs_bp.get("/new/")
@require_admin
async def new_post():
    """Show the new-post editor (admin only).

    HTMX requests receive the OOB elements partial; all other requests
    receive the full page.
    """
    template = (
        "_types/blog_new/_oob_elements.html"
        if is_htmx_request()
        else "_types/blog_new/index.html"
    )
    return await make_response(await render_template(template))
@blogs_bp.post("/new/")
@require_admin
async def new_post_save():
    """Create a post in Ghost from the submitted editor form (admin only).

    The ``lexical`` field must parse as JSON and pass ``validate_lexical``;
    otherwise the editor page is re-rendered with ``save_error`` and a 400
    status. On success the post is created in Ghost, synced to the local
    DB, attributed to the current user if it has no ``user_id`` yet, the
    ``blog`` tag cache is invalidated, and the client is redirected to the
    post's admin edit page.
    """
    from .ghost.ghost_posts import create_post
    from .ghost.lexical_validator import validate_lexical
    from .ghost.ghost_sync import sync_single_post

    form = await request.form

    def _stripped(name):
        # Missing fields read as an empty string.
        return form.get(name, "").strip()

    async def _reject(message):
        # Re-render the editor with the error and answer 400.
        page = await render_template(
            "_types/blog_new/index.html",
            save_error=message,
        )
        return await make_response(page, 400)

    title = _stripped("title") or "Untitled"
    lexical_raw = form.get("lexical", "")
    status = form.get("status", "draft")
    feature_image = _stripped("feature_image")
    custom_excerpt = _stripped("custom_excerpt")
    feature_image_caption = _stripped("feature_image_caption")

    # Validate
    try:
        lexical_doc = json.loads(lexical_raw)
    except (json.JSONDecodeError, TypeError):
        return await _reject("Invalid JSON in editor content.")

    ok, reason = validate_lexical(lexical_doc)
    if not ok:
        return await _reject(reason)

    # Create in Ghost; blank optional fields are sent as None.
    ghost_post = await create_post(
        title=title,
        lexical_json=lexical_raw,
        status=status,
        feature_image=feature_image or None,
        custom_excerpt=custom_excerpt or None,
        feature_image_caption=feature_image_caption or None,
    )
    ghost_id = ghost_post["id"]

    # Sync to local DB
    await sync_single_post(g.s, ghost_id)
    await g.s.flush()

    # Set user_id on the newly created post
    from models.ghost_content import Post
    from sqlalchemy import select

    lookup = select(Post).where(Post.ghost_id == ghost_id)
    local_post = (await g.s.execute(lookup)).scalar_one_or_none()
    if local_post and local_post.user_id is None:
        local_post.user_id = g.user.id
        await g.s.flush()

    # Clear blog listing cache
    await invalidate_tag_cache("blog")

    # Redirect to the edit page (post is likely a draft, so public detail would 404)
    edit_url = url_for("blog.post.admin.edit", slug=ghost_post["slug"])
    return redirect(host_url(edit_url))
@blogs_bp.get("/new-page/")
@require_admin
async def new_page():
    """Show the new-page editor (admin only).

    Uses the same templates as the new-post editor, rendered with
    ``is_page=True``. HTMX requests receive the OOB elements partial; all
    other requests receive the full page.
    """
    template = (
        "_types/blog_new/_oob_elements.html"
        if is_htmx_request()
        else "_types/blog_new/index.html"
    )
    return await make_response(await render_template(template, is_page=True))
@blogs_bp.post("/new-page/")
@require_admin
async def new_page_save():
    """Create a page in Ghost from the submitted editor form (admin only).

    The ``lexical`` field must parse as JSON and pass ``validate_lexical``;
    otherwise the editor page is re-rendered with ``save_error`` and
    ``is_page=True`` and a 400 status. On success the page is created in
    Ghost, synced to the local DB, attributed to the current user if it has
    no ``user_id`` yet, the ``blog`` tag cache is invalidated, and the
    client is redirected to the admin edit page.
    """
    from .ghost.ghost_posts import create_page
    from .ghost.lexical_validator import validate_lexical
    from .ghost.ghost_sync import sync_single_page

    form = await request.form

    def _stripped(name):
        # Missing fields read as an empty string.
        return form.get(name, "").strip()

    async def _reject(message):
        # Re-render the page editor with the error and answer 400.
        page = await render_template(
            "_types/blog_new/index.html",
            save_error=message,
            is_page=True,
        )
        return await make_response(page, 400)

    title = _stripped("title") or "Untitled"
    lexical_raw = form.get("lexical", "")
    status = form.get("status", "draft")
    feature_image = _stripped("feature_image")
    custom_excerpt = _stripped("custom_excerpt")
    feature_image_caption = _stripped("feature_image_caption")

    # Validate
    try:
        lexical_doc = json.loads(lexical_raw)
    except (json.JSONDecodeError, TypeError):
        return await _reject("Invalid JSON in editor content.")

    ok, reason = validate_lexical(lexical_doc)
    if not ok:
        return await _reject(reason)

    # Create in Ghost (as page); blank optional fields are sent as None.
    ghost_page = await create_page(
        title=title,
        lexical_json=lexical_raw,
        status=status,
        feature_image=feature_image or None,
        custom_excerpt=custom_excerpt or None,
        feature_image_caption=feature_image_caption or None,
    )
    ghost_id = ghost_page["id"]

    # Sync to local DB (uses pages endpoint)
    await sync_single_page(g.s, ghost_id)
    await g.s.flush()

    # Set user_id on the newly created page
    from models.ghost_content import Post
    from sqlalchemy import select

    lookup = select(Post).where(Post.ghost_id == ghost_id)
    local_post = (await g.s.execute(lookup)).scalar_one_or_none()
    if local_post and local_post.user_id is None:
        local_post.user_id = g.user.id
        await g.s.flush()

    # Clear blog listing cache
    await invalidate_tag_cache("blog")

    # Redirect to the page admin
    edit_url = url_for("blog.post.admin.edit", slug=ghost_page["slug"])
    return redirect(host_url(edit_url))
@blogs_bp.get("/drafts/")
async def drafts():
    """Redirect to the blog listing with ``?drafts=1`` appended."""
    listing_url = host_url(url_for("blog.index"))
    return redirect(listing_url + "?drafts=1")
# End of register(): hand the configured blueprint back to the caller.
return blogs_bp