All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m18s
Switch all cross-service relation calls to the new registry-aware relate/unrelate/can-relate actions, and consolidate per-service container-nav fragment fetches into the generic relations handler. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
404 lines
14 KiB
Python
404 lines
14 KiB
Python
from __future__ import annotations
|
|
|
|
#from quart import Blueprint, g
|
|
|
|
import json
|
|
import os
|
|
|
|
from quart import (
|
|
request,
|
|
make_response,
|
|
g,
|
|
Blueprint,
|
|
redirect,
|
|
url_for,
|
|
)
|
|
from .ghost_db import DBClient # adjust import path
|
|
from shared.db.session import get_session
|
|
from .filters.qs import makeqs_factory, decode
|
|
from .services.posts_data import posts_data
|
|
from .services.pages_data import pages_data
|
|
|
|
from shared.browser.app.redis_cacher import cache_page, invalidate_tag_cache
|
|
from shared.browser.app.utils.htmx import is_htmx_request
|
|
from shared.browser.app.authz import require_admin
|
|
from shared.utils import host_url
|
|
|
|
def register(url_prefix, title):
|
|
blogs_bp = Blueprint("blog", __name__, url_prefix)
|
|
|
|
from .web_hooks.routes import ghost_webhooks
|
|
blogs_bp.register_blueprint(ghost_webhooks)
|
|
|
|
from .ghost.editor_api import editor_api_bp
|
|
blogs_bp.register_blueprint(editor_api_bp)
|
|
|
|
|
|
|
|
from ..post.routes import register as register_blog
|
|
blogs_bp.register_blueprint(
|
|
register_blog(),
|
|
)
|
|
|
|
from .admin.routes import register as register_tag_groups_admin
|
|
blogs_bp.register_blueprint(register_tag_groups_admin())
|
|
|
|
|
|
@blogs_bp.before_app_serving
|
|
async def init():
|
|
from .ghost.ghost_sync import sync_all_content_from_ghost
|
|
from sqlalchemy import text
|
|
import logging
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Advisory lock prevents multiple Hypercorn workers from
|
|
# running the sync concurrently (which causes PK conflicts).
|
|
async with get_session() as s:
|
|
got_lock = await s.scalar(text("SELECT pg_try_advisory_lock(900001)"))
|
|
if not got_lock:
|
|
await s.rollback() # clean up before returning connection to pool
|
|
return
|
|
try:
|
|
await sync_all_content_from_ghost(s)
|
|
await s.commit()
|
|
except Exception:
|
|
logger.exception("Ghost sync failed — will retry on next deploy")
|
|
try:
|
|
await s.rollback()
|
|
except Exception:
|
|
pass
|
|
finally:
|
|
try:
|
|
await s.execute(text("SELECT pg_advisory_unlock(900001)"))
|
|
await s.commit()
|
|
except Exception:
|
|
pass # lock auto-releases when session closes
|
|
|
|
@blogs_bp.before_request
|
|
def route():
|
|
g.makeqs_factory = makeqs_factory
|
|
|
|
|
|
@blogs_bp.context_processor
|
|
async def inject_root():
|
|
return {
|
|
"blog_title": title,
|
|
"qs": makeqs_factory()(),
|
|
"unsplash_api_key": os.environ.get("UNSPLASH_ACCESS_KEY", ""),
|
|
}
|
|
|
|
SORT_MAP = {
|
|
"newest": "published_at DESC",
|
|
"oldest": "published_at ASC",
|
|
"az": "title ASC",
|
|
"za": "title DESC",
|
|
"featured": "featured DESC, published_at DESC",
|
|
}
|
|
|
|
@blogs_bp.get("/")
|
|
async def home():
|
|
"""Render the Ghost page with slug 'home' as the site homepage."""
|
|
from ..post.services.post_data import post_data as _post_data
|
|
from shared.config import config as get_config
|
|
from shared.infrastructure.cart_identity import current_cart_identity
|
|
from shared.infrastructure.data_client import fetch_data
|
|
from shared.contracts.dtos import CartSummaryDTO, dto_from_dict
|
|
from shared.infrastructure.fragments import fetch_fragment
|
|
|
|
p_data = await _post_data("home", g.s, include_drafts=False)
|
|
if not p_data:
|
|
# Fall back to blog index if "home" page doesn't exist yet
|
|
return redirect(host_url(url_for("blog.index")))
|
|
|
|
g.post_data = p_data
|
|
|
|
# Build the same context the post blueprint's context_processor provides
|
|
db_post_id = p_data["post"]["id"]
|
|
post_slug = p_data["post"]["slug"]
|
|
|
|
# Fetch container nav from relations service
|
|
container_nav_html = await fetch_fragment("relations", "container-nav", params={
|
|
"container_type": "page",
|
|
"container_id": str(db_post_id),
|
|
"post_slug": post_slug,
|
|
})
|
|
|
|
ctx = {
|
|
**p_data,
|
|
"base_title": f"{get_config()['title']} {p_data['post']['title']}",
|
|
"container_nav_html": container_nav_html,
|
|
}
|
|
|
|
# Page cart badge via HTTP
|
|
if p_data["post"].get("is_page"):
|
|
ident = current_cart_identity()
|
|
summary_params = {"page_slug": post_slug}
|
|
if ident["user_id"] is not None:
|
|
summary_params["user_id"] = ident["user_id"]
|
|
if ident["session_id"] is not None:
|
|
summary_params["session_id"] = ident["session_id"]
|
|
raw_summary = await fetch_data("cart", "cart-summary", params=summary_params, required=False)
|
|
page_summary = dto_from_dict(CartSummaryDTO, raw_summary) if raw_summary else CartSummaryDTO()
|
|
ctx["page_cart_count"] = page_summary.count + page_summary.calendar_count + page_summary.ticket_count
|
|
ctx["page_cart_total"] = float(page_summary.total + page_summary.calendar_total + page_summary.ticket_total)
|
|
|
|
from shared.sexp.page import get_template_context
|
|
from sexp.sexp_components import render_home_page, render_home_oob
|
|
|
|
tctx = await get_template_context()
|
|
tctx.update(ctx)
|
|
if not is_htmx_request():
|
|
html = await render_home_page(tctx)
|
|
else:
|
|
html = await render_home_oob(tctx)
|
|
return await make_response(html)
|
|
|
|
@blogs_bp.get("/index")
|
|
@blogs_bp.get("/index/")
|
|
async def index():
|
|
"""Blog listing — moved from / to /index."""
|
|
|
|
q = decode()
|
|
content_type = request.args.get("type", "posts")
|
|
|
|
if content_type == "pages":
|
|
data = await pages_data(g.s, q.page, q.search)
|
|
context = {
|
|
**data,
|
|
"content_type": "pages",
|
|
"search": q.search,
|
|
"selected_tags": (),
|
|
"selected_authors": (),
|
|
"selected_groups": (),
|
|
"sort": None,
|
|
"view": None,
|
|
"drafts": None,
|
|
"draft_count": 0,
|
|
"tags": [],
|
|
"authors": [],
|
|
"tag_groups": [],
|
|
"posts": data.get("pages", []),
|
|
}
|
|
from shared.sexp.page import get_template_context
|
|
from sexp.sexp_components import render_blog_page, render_blog_oob, render_blog_page_cards
|
|
|
|
tctx = await get_template_context()
|
|
tctx.update(context)
|
|
if not is_htmx_request():
|
|
html = await render_blog_page(tctx)
|
|
elif q.page > 1:
|
|
html = await render_blog_page_cards(tctx)
|
|
else:
|
|
html = await render_blog_oob(tctx)
|
|
return await make_response(html)
|
|
|
|
# Default: posts listing
|
|
# Drafts filter requires login; ignore if not logged in
|
|
show_drafts = bool(q.drafts and g.user)
|
|
is_admin = bool((g.get("rights") or {}).get("admin"))
|
|
drafts_user_id = None if (not show_drafts or is_admin) else g.user.id
|
|
|
|
# For the draft count badge: admin sees all drafts, non-admin sees own
|
|
count_drafts_uid = None if (g.user and is_admin) else (g.user.id if g.user else False)
|
|
|
|
data = await posts_data(
|
|
g.s, q.page, q.search, q.sort, q.selected_tags, q.selected_authors, q.liked,
|
|
drafts=show_drafts, drafts_user_id=drafts_user_id,
|
|
count_drafts_for_user_id=count_drafts_uid,
|
|
selected_groups=q.selected_groups,
|
|
)
|
|
|
|
context = {
|
|
**data,
|
|
"content_type": "posts",
|
|
"selected_tags": q.selected_tags,
|
|
"selected_authors": q.selected_authors,
|
|
"selected_groups": q.selected_groups,
|
|
"sort": q.sort,
|
|
"search": q.search,
|
|
"view": q.view,
|
|
"drafts": q.drafts if show_drafts else None,
|
|
}
|
|
|
|
from shared.sexp.page import get_template_context
|
|
from sexp.sexp_components import render_blog_page, render_blog_oob, render_blog_cards
|
|
|
|
tctx = await get_template_context()
|
|
tctx.update(context)
|
|
if not is_htmx_request():
|
|
html = await render_blog_page(tctx)
|
|
elif q.page > 1:
|
|
html = await render_blog_cards(tctx)
|
|
else:
|
|
html = await render_blog_oob(tctx)
|
|
|
|
return await make_response(html)
|
|
|
|
@blogs_bp.get("/new/")
|
|
@require_admin
|
|
async def new_post():
|
|
from shared.sexp.page import get_template_context
|
|
from sexp.sexp_components import render_new_post_page, render_new_post_oob, render_editor_panel
|
|
|
|
tctx = await get_template_context()
|
|
tctx["editor_html"] = render_editor_panel()
|
|
if not is_htmx_request():
|
|
html = await render_new_post_page(tctx)
|
|
else:
|
|
html = await render_new_post_oob(tctx)
|
|
return await make_response(html)
|
|
|
|
@blogs_bp.post("/new/")
|
|
@require_admin
|
|
async def new_post_save():
|
|
from .ghost.ghost_posts import create_post
|
|
from .ghost.lexical_validator import validate_lexical
|
|
from .ghost.ghost_sync import sync_single_post
|
|
|
|
form = await request.form
|
|
title = form.get("title", "").strip() or "Untitled"
|
|
lexical_raw = form.get("lexical", "")
|
|
status = form.get("status", "draft")
|
|
feature_image = form.get("feature_image", "").strip()
|
|
custom_excerpt = form.get("custom_excerpt", "").strip()
|
|
feature_image_caption = form.get("feature_image_caption", "").strip()
|
|
|
|
# Validate
|
|
try:
|
|
lexical_doc = json.loads(lexical_raw)
|
|
except (json.JSONDecodeError, TypeError):
|
|
from shared.sexp.page import get_template_context
|
|
from sexp.sexp_components import render_new_post_page, render_editor_panel
|
|
tctx = await get_template_context()
|
|
tctx["editor_html"] = render_editor_panel(save_error="Invalid JSON in editor content.")
|
|
html = await render_new_post_page(tctx)
|
|
return await make_response(html, 400)
|
|
|
|
ok, reason = validate_lexical(lexical_doc)
|
|
if not ok:
|
|
from shared.sexp.page import get_template_context
|
|
from sexp.sexp_components import render_new_post_page, render_editor_panel
|
|
tctx = await get_template_context()
|
|
tctx["editor_html"] = render_editor_panel(save_error=reason)
|
|
html = await render_new_post_page(tctx)
|
|
return await make_response(html, 400)
|
|
|
|
# Create in Ghost
|
|
ghost_post = await create_post(
|
|
title=title,
|
|
lexical_json=lexical_raw,
|
|
status=status,
|
|
feature_image=feature_image or None,
|
|
custom_excerpt=custom_excerpt or None,
|
|
feature_image_caption=feature_image_caption or None,
|
|
)
|
|
|
|
# Sync to local DB
|
|
await sync_single_post(g.s, ghost_post["id"])
|
|
await g.s.flush()
|
|
|
|
# Set user_id on the newly created post
|
|
from models.ghost_content import Post
|
|
from sqlalchemy import select
|
|
local_post = (await g.s.execute(
|
|
select(Post).where(Post.ghost_id == ghost_post["id"])
|
|
)).scalar_one_or_none()
|
|
if local_post and local_post.user_id is None:
|
|
local_post.user_id = g.user.id
|
|
await g.s.flush()
|
|
|
|
# Clear blog listing cache
|
|
await invalidate_tag_cache("blog")
|
|
|
|
# Redirect to the edit page (post is likely a draft, so public detail would 404)
|
|
return redirect(host_url(url_for("blog.post.admin.edit", slug=ghost_post["slug"])))
|
|
|
|
|
|
@blogs_bp.get("/new-page/")
|
|
@require_admin
|
|
async def new_page():
|
|
from shared.sexp.page import get_template_context
|
|
from sexp.sexp_components import render_new_post_page, render_new_post_oob, render_editor_panel
|
|
|
|
tctx = await get_template_context()
|
|
tctx["editor_html"] = render_editor_panel(is_page=True)
|
|
tctx["is_page"] = True
|
|
if not is_htmx_request():
|
|
html = await render_new_post_page(tctx)
|
|
else:
|
|
html = await render_new_post_oob(tctx)
|
|
return await make_response(html)
|
|
|
|
@blogs_bp.post("/new-page/")
|
|
@require_admin
|
|
async def new_page_save():
|
|
from .ghost.ghost_posts import create_page
|
|
from .ghost.lexical_validator import validate_lexical
|
|
from .ghost.ghost_sync import sync_single_page
|
|
|
|
form = await request.form
|
|
title = form.get("title", "").strip() or "Untitled"
|
|
lexical_raw = form.get("lexical", "")
|
|
status = form.get("status", "draft")
|
|
feature_image = form.get("feature_image", "").strip()
|
|
custom_excerpt = form.get("custom_excerpt", "").strip()
|
|
feature_image_caption = form.get("feature_image_caption", "").strip()
|
|
|
|
# Validate
|
|
try:
|
|
lexical_doc = json.loads(lexical_raw)
|
|
except (json.JSONDecodeError, TypeError):
|
|
from shared.sexp.page import get_template_context
|
|
from sexp.sexp_components import render_new_post_page, render_editor_panel
|
|
tctx = await get_template_context()
|
|
tctx["editor_html"] = render_editor_panel(save_error="Invalid JSON in editor content.", is_page=True)
|
|
tctx["is_page"] = True
|
|
html = await render_new_post_page(tctx)
|
|
return await make_response(html, 400)
|
|
|
|
ok, reason = validate_lexical(lexical_doc)
|
|
if not ok:
|
|
from shared.sexp.page import get_template_context
|
|
from sexp.sexp_components import render_new_post_page, render_editor_panel
|
|
tctx = await get_template_context()
|
|
tctx["editor_html"] = render_editor_panel(save_error=reason, is_page=True)
|
|
tctx["is_page"] = True
|
|
html = await render_new_post_page(tctx)
|
|
return await make_response(html, 400)
|
|
|
|
# Create in Ghost (as page)
|
|
ghost_page = await create_page(
|
|
title=title,
|
|
lexical_json=lexical_raw,
|
|
status=status,
|
|
feature_image=feature_image or None,
|
|
custom_excerpt=custom_excerpt or None,
|
|
feature_image_caption=feature_image_caption or None,
|
|
)
|
|
|
|
# Sync to local DB (uses pages endpoint)
|
|
await sync_single_page(g.s, ghost_page["id"])
|
|
await g.s.flush()
|
|
|
|
# Set user_id on the newly created page
|
|
from models.ghost_content import Post
|
|
from sqlalchemy import select
|
|
local_post = (await g.s.execute(
|
|
select(Post).where(Post.ghost_id == ghost_page["id"])
|
|
)).scalar_one_or_none()
|
|
if local_post and local_post.user_id is None:
|
|
local_post.user_id = g.user.id
|
|
await g.s.flush()
|
|
|
|
# Clear blog listing cache
|
|
await invalidate_tag_cache("blog")
|
|
|
|
# Redirect to the page admin
|
|
return redirect(host_url(url_for("blog.post.admin.edit", slug=ghost_page["slug"])))
|
|
|
|
|
|
@blogs_bp.get("/drafts/")
|
|
async def drafts():
|
|
return redirect(host_url(url_for("blog.index")) + "?drafts=1")
|
|
|
|
return blogs_bp |