Files
mono/blog/bp/post/admin/routes.py
giles 22802bd36b Send all responses as sexp wire format with client-side rendering
- Server sends sexp source text, client (sexp.js) renders everything
- SexpExpr marker class for nested sexp composition in serialize()
- sexp_page() HTML shell with data-mount="body" for full page loads
- sexp_response() returns text/sexp for OOB/partial responses
- ~app-body layout component replaces ~app-layout (no raw!)
- ~rich-text is the only component using raw! (for CMS HTML content)
- Fragment endpoints return text/sexp, auto-wrapped in SexpExpr
- All _*_html() helpers converted to _*_sexp() returning sexp source
- Head auto-hoist: sexp.js moves meta/title/link/script[ld+json]
  from rendered body to document.head automatically
- Unknown components render warning box instead of crashing page
- Component kwargs preserve AST for lazy rendering (fixes <> in kwargs)
- Fix unterminated paren in events/sexp/tickets.sexpr

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-01 09:45:07 +00:00

651 lines
22 KiB
Python

from __future__ import annotations
from quart import (
render_template,
make_response,
Blueprint,
g,
request,
redirect,
url_for,
)
from shared.browser.app.authz import require_admin, require_post_author
from shared.browser.app.utils.htmx import is_htmx_request
from shared.sexp.helpers import sexp_response
from shared.utils import host_url
def register():
bp = Blueprint("admin", __name__, url_prefix='/admin')
@bp.get("/")
@require_admin
async def admin(slug: str):
from shared.browser.app.utils.htmx import is_htmx_request
from sqlalchemy import select
from shared.models.page_config import PageConfig
# Load features for page admin (page_configs now lives in db_blog)
post = (g.post_data or {}).get("post", {})
features = {}
sumup_configured = False
sumup_merchant_code = ""
sumup_checkout_prefix = ""
if post.get("is_page"):
pc = (await g.s.execute(
select(PageConfig).where(
PageConfig.container_type == "page",
PageConfig.container_id == post["id"],
)
)).scalar_one_or_none()
if pc:
features = pc.features or {}
sumup_configured = bool(pc.sumup_api_key)
sumup_merchant_code = pc.sumup_merchant_code or ""
sumup_checkout_prefix = pc.sumup_checkout_prefix or ""
ctx = {
"features": features,
"sumup_configured": sumup_configured,
"sumup_merchant_code": sumup_merchant_code,
"sumup_checkout_prefix": sumup_checkout_prefix,
}
from shared.sexp.page import get_template_context
from sexp.sexp_components import render_post_admin_page, render_post_admin_oob
tctx = await get_template_context()
tctx.update(ctx)
if not is_htmx_request():
html = await render_post_admin_page(tctx)
return await make_response(html)
else:
sexp_src = await render_post_admin_oob(tctx)
return sexp_response(sexp_src)
@bp.put("/features/")
@require_admin
async def update_features(slug: str):
"""Update PageConfig.features for a page (page_configs lives in db_cart)."""
from shared.infrastructure.actions import call_action
from quart import jsonify
post = g.post_data.get("post")
if not post or not post.get("is_page"):
return jsonify({"error": "This is not a page."}), 400
post_id = post["id"]
# Parse request body
body = await request.get_json()
if body is None:
form = await request.form
body = {}
for key in ("calendar", "market"):
val = form.get(key)
if val is not None:
body[key] = val in ("true", "1", "on")
if not isinstance(body, dict):
return jsonify({"error": "Expected JSON object with feature flags."}), 400
# Update via cart action (page_configs lives in db_cart)
result = await call_action("blog", "update-page-config", payload={
"container_type": "page",
"container_id": post_id,
"features": body,
})
features = result.get("features", {})
from sexp.sexp_components import render_features_panel
html = render_features_panel(
features, post,
sumup_configured=result.get("sumup_configured", False),
sumup_merchant_code=result.get("sumup_merchant_code") or "",
sumup_checkout_prefix=result.get("sumup_checkout_prefix") or "",
)
return sexp_response(html)
@bp.put("/admin/sumup/")
@require_admin
async def update_sumup(slug: str):
"""Update PageConfig SumUp credentials for a page (page_configs lives in db_cart)."""
from shared.infrastructure.actions import call_action
from quart import jsonify
post = g.post_data.get("post")
if not post or not post.get("is_page"):
return jsonify({"error": "This is not a page."}), 400
post_id = post["id"]
form = await request.form
merchant_code = (form.get("merchant_code") or "").strip()
api_key = (form.get("api_key") or "").strip()
checkout_prefix = (form.get("checkout_prefix") or "").strip()
payload: dict = {
"container_type": "page",
"container_id": post_id,
"sumup_merchant_code": merchant_code or None,
"sumup_checkout_prefix": checkout_prefix or None,
}
if api_key:
payload["sumup_api_key"] = api_key
result = await call_action("blog", "update-page-config", payload=payload)
features = result.get("features", {})
from sexp.sexp_components import render_features_panel
html = render_features_panel(
features, post,
sumup_configured=result.get("sumup_configured", False),
sumup_merchant_code=result.get("sumup_merchant_code") or "",
sumup_checkout_prefix=result.get("sumup_checkout_prefix") or "",
)
return sexp_response(html)
@bp.get("/data/")
@require_admin
async def data(slug: str):
from shared.sexp.page import get_template_context
from sexp.sexp_components import render_post_data_page, render_post_data_oob
data_html = await render_template("_types/post_data/_main_panel.html")
tctx = await get_template_context()
tctx["data_html"] = data_html
if not is_htmx_request():
html = await render_post_data_page(tctx)
return await make_response(html)
else:
sexp_src = await render_post_data_oob(tctx)
return sexp_response(sexp_src)
@bp.get("/entries/calendar/<int:calendar_id>/")
@require_admin
async def calendar_view(slug: str, calendar_id: int):
"""Show calendar month view for browsing entries"""
from shared.models.calendars import Calendar
from shared.utils.calendar_helpers import parse_int_arg, add_months, build_calendar_weeks
from shared.infrastructure.data_client import fetch_data
from shared.contracts.dtos import CalendarEntryDTO, dto_from_dict
from sqlalchemy import select
from datetime import datetime, timezone
import calendar as pycalendar
from quart import session as qsession
from ..services.entry_associations import get_post_entry_ids
# Get month/year from query params
today = datetime.now(timezone.utc).date()
month = parse_int_arg("month")
year = parse_int_arg("year")
if year is None:
year = today.year
if month is None or not (1 <= month <= 12):
month = today.month
# Load calendar
result = await g.s.execute(
select(Calendar).where(Calendar.id == calendar_id, Calendar.deleted_at.is_(None))
)
calendar_obj = result.scalar_one_or_none()
if not calendar_obj:
return await make_response("Calendar not found", 404)
# Build calendar data
prev_month_year, prev_month = add_months(year, month, -1)
next_month_year, next_month = add_months(year, month, +1)
prev_year = year - 1
next_year = year + 1
weeks = build_calendar_weeks(year, month)
month_name = pycalendar.month_name[month]
weekday_names = [pycalendar.day_abbr[i] for i in range(7)]
# Get entries for this month via events data endpoint
period_start = datetime(year, month, 1, tzinfo=timezone.utc)
next_y, next_m = add_months(year, month, +1)
period_end = datetime(next_y, next_m, 1, tzinfo=timezone.utc)
user = getattr(g, "user", None)
user_id = user.id if user else None
is_admin = bool(user and getattr(user, "is_admin", False))
session_id = qsession.get("calendar_sid")
raw_entries = await fetch_data("events", "visible-entries-for-period", params={
"calendar_id": calendar_obj.id,
"period_start": period_start.isoformat(),
"period_end": period_end.isoformat(),
"user_id": user_id,
"is_admin": str(is_admin).lower(),
"session_id": session_id,
}, required=False) or []
month_entries = [dto_from_dict(CalendarEntryDTO, e) for e in raw_entries]
# Get associated entry IDs for this post
post_id = g.post_data["post"]["id"]
associated_entry_ids = await get_post_entry_ids(g.s, post_id)
html = await render_template(
"_types/post/admin/_calendar_view.html",
calendar=calendar_obj,
year=year,
month=month,
month_name=month_name,
weekday_names=weekday_names,
weeks=weeks,
prev_month=prev_month,
prev_month_year=prev_month_year,
next_month=next_month,
next_month_year=next_month_year,
prev_year=prev_year,
next_year=next_year,
month_entries=month_entries,
associated_entry_ids=associated_entry_ids,
)
return await make_response(html)
@bp.get("/entries/")
@require_admin
async def entries(slug: str):
from ..services.entry_associations import get_post_entry_ids
from shared.models.calendars import Calendar
from sqlalchemy import select
post_id = g.post_data["post"]["id"]
associated_entry_ids = await get_post_entry_ids(g.s, post_id)
# Load ALL calendars (not just this post's calendars)
result = await g.s.execute(
select(Calendar)
.where(Calendar.deleted_at.is_(None))
.order_by(Calendar.name.asc())
)
all_calendars = result.scalars().all()
# Load entries and post for each calendar
for calendar in all_calendars:
await g.s.refresh(calendar, ["entries", "post"])
from shared.sexp.page import get_template_context
from sexp.sexp_components import render_post_entries_page, render_post_entries_oob
entries_html = await render_template(
"_types/post_entries/_main_panel.html",
all_calendars=all_calendars,
associated_entry_ids=associated_entry_ids,
)
tctx = await get_template_context()
tctx["entries_html"] = entries_html
if not is_htmx_request():
html = await render_post_entries_page(tctx)
return await make_response(html)
else:
sexp_src = await render_post_entries_oob(tctx)
return sexp_response(sexp_src)
@bp.post("/entries/<int:entry_id>/toggle/")
@require_admin
async def toggle_entry(slug: str, entry_id: int):
from ..services.entry_associations import toggle_entry_association, get_post_entry_ids, get_associated_entries
from shared.models.calendars import Calendar
from sqlalchemy import select
from quart import jsonify
post_id = g.post_data["post"]["id"]
is_associated, error = await toggle_entry_association(g.s, post_id, entry_id)
if error:
return jsonify({"message": error, "errors": {}}), 400
await g.s.flush()
# Return updated association status
associated_entry_ids = await get_post_entry_ids(g.s, post_id)
# Load ALL calendars
result = await g.s.execute(
select(Calendar)
.where(Calendar.deleted_at.is_(None))
.order_by(Calendar.name.asc())
)
all_calendars = result.scalars().all()
# Load entries and post for each calendar
for calendar in all_calendars:
await g.s.refresh(calendar, ["entries", "post"])
# Fetch associated entries for nav display
associated_entries = await get_associated_entries(g.s, post_id)
# Load calendars for this post (for nav display)
calendars = (
await g.s.execute(
select(Calendar)
.where(Calendar.container_type == "page", Calendar.container_id == post_id, Calendar.deleted_at.is_(None))
.order_by(Calendar.name.asc())
)
).scalars().all()
# Return the associated entries admin list + OOB update for nav entries
from sexp.sexp_components import render_associated_entries, render_nav_entries_oob
post = g.post_data["post"]
admin_list = render_associated_entries(all_calendars, associated_entry_ids, post["slug"])
nav_entries_html = render_nav_entries_oob(associated_entries, calendars, post)
return sexp_response(admin_list + nav_entries_html)
@bp.get("/settings/")
@require_post_author
async def settings(slug: str):
from ...blog.ghost.ghost_posts import get_post_for_edit
ghost_id = g.post_data["post"]["ghost_id"]
is_page = bool(g.post_data["post"].get("is_page"))
ghost_post = await get_post_for_edit(ghost_id, is_page=is_page)
save_success = request.args.get("saved") == "1"
from shared.sexp.page import get_template_context
from sexp.sexp_components import render_post_settings_page, render_post_settings_oob
settings_html = await render_template(
"_types/post_settings/_main_panel.html",
ghost_post=ghost_post,
save_success=save_success,
)
tctx = await get_template_context()
tctx["settings_html"] = settings_html
if not is_htmx_request():
html = await render_post_settings_page(tctx)
return await make_response(html)
else:
sexp_src = await render_post_settings_oob(tctx)
return sexp_response(sexp_src)
@bp.post("/settings/")
@require_post_author
async def settings_save(slug: str):
from ...blog.ghost.ghost_posts import update_post_settings
from ...blog.ghost.ghost_sync import sync_single_post, sync_single_page
from shared.browser.app.redis_cacher import invalidate_tag_cache
ghost_id = g.post_data["post"]["ghost_id"]
is_page = bool(g.post_data["post"].get("is_page"))
form = await request.form
updated_at = form.get("updated_at", "")
# Build kwargs — only include fields that were submitted
kwargs: dict = {}
# Text fields
for field in (
"slug", "custom_template", "meta_title", "meta_description",
"canonical_url", "og_image", "og_title", "og_description",
"twitter_image", "twitter_title", "twitter_description",
"feature_image_alt",
):
val = form.get(field)
if val is not None:
kwargs[field] = val.strip()
# Select fields
visibility = form.get("visibility")
if visibility is not None:
kwargs["visibility"] = visibility
# Datetime
published_at = form.get("published_at", "").strip()
if published_at:
kwargs["published_at"] = published_at
# Checkbox fields: present = True, absent = False
kwargs["featured"] = form.get("featured") == "on"
kwargs["email_only"] = form.get("email_only") == "on"
# Tags — comma-separated string → list of {"name": "..."} dicts
tags_str = form.get("tags", "").strip()
if tags_str:
kwargs["tags"] = [{"name": t.strip()} for t in tags_str.split(",") if t.strip()]
else:
kwargs["tags"] = []
# Update in Ghost
await update_post_settings(
ghost_id=ghost_id,
updated_at=updated_at,
is_page=is_page,
**kwargs,
)
# Sync to local DB
if is_page:
await sync_single_page(g.s, ghost_id)
else:
await sync_single_post(g.s, ghost_id)
await g.s.flush()
# Clear caches
await invalidate_tag_cache("blog")
await invalidate_tag_cache("post.post_detail")
return redirect(host_url(url_for("blog.post.admin.settings", slug=slug)) + "?saved=1")
@bp.get("/edit/")
@require_post_author
async def edit(slug: str):
from ...blog.ghost.ghost_posts import get_post_for_edit
from shared.infrastructure.data_client import fetch_data
ghost_id = g.post_data["post"]["ghost_id"]
is_page = bool(g.post_data["post"].get("is_page"))
ghost_post = await get_post_for_edit(ghost_id, is_page=is_page)
save_success = request.args.get("saved") == "1"
save_error = request.args.get("error", "")
# Newsletters live in db_account — fetch via HTTP
raw_newsletters = await fetch_data("account", "newsletters", required=False) or []
# Convert dicts to objects with .name/.ghost_id attributes for template compat
from types import SimpleNamespace
newsletters = [SimpleNamespace(**nl) for nl in raw_newsletters]
from shared.sexp.page import get_template_context
from sexp.sexp_components import render_post_edit_page, render_post_edit_oob
edit_html = await render_template(
"_types/post_edit/_main_panel.html",
ghost_post=ghost_post,
save_success=save_success,
save_error=save_error,
newsletters=newsletters,
)
tctx = await get_template_context()
tctx["edit_html"] = edit_html
if not is_htmx_request():
html = await render_post_edit_page(tctx)
return await make_response(html)
else:
sexp_src = await render_post_edit_oob(tctx)
return sexp_response(sexp_src)
@bp.post("/edit/")
@require_post_author
async def edit_save(slug: str):
import json
from ...blog.ghost.ghost_posts import update_post
from ...blog.ghost.lexical_validator import validate_lexical
from ...blog.ghost.ghost_sync import sync_single_post, sync_single_page
from shared.browser.app.redis_cacher import invalidate_tag_cache
ghost_id = g.post_data["post"]["ghost_id"]
is_page = bool(g.post_data["post"].get("is_page"))
form = await request.form
title = form.get("title", "").strip()
lexical_raw = form.get("lexical", "")
updated_at = form.get("updated_at", "")
status = form.get("status", "draft")
publish_mode = form.get("publish_mode", "web")
newsletter_slug = form.get("newsletter_slug", "").strip() or None
feature_image = form.get("feature_image", "").strip()
custom_excerpt = form.get("custom_excerpt", "").strip()
feature_image_caption = form.get("feature_image_caption", "").strip()
# Validate the lexical JSON
from urllib.parse import quote
try:
lexical_doc = json.loads(lexical_raw)
except (json.JSONDecodeError, TypeError):
return redirect(host_url(url_for("blog.post.admin.edit", slug=slug)) + "?error=" + quote("Invalid JSON in editor content."))
ok, reason = validate_lexical(lexical_doc)
if not ok:
return redirect(host_url(url_for("blog.post.admin.edit", slug=slug)) + "?error=" + quote(reason))
# Update in Ghost (content save — no status change yet)
ghost_post = await update_post(
ghost_id=ghost_id,
lexical_json=lexical_raw,
title=title or None,
updated_at=updated_at,
feature_image=feature_image,
custom_excerpt=custom_excerpt,
feature_image_caption=feature_image_caption,
is_page=is_page,
)
# Publish workflow
is_admin = bool((g.get("rights") or {}).get("admin"))
publish_requested_msg = None
# Guard: if already emailed, force publish_mode to "web" to prevent re-send
already_emailed = bool(ghost_post.get("email") and ghost_post["email"].get("status"))
if already_emailed and publish_mode in ("email", "both"):
publish_mode = "web"
if status == "published" and ghost_post.get("status") != "published" and not is_admin:
# Non-admin requesting publish: don't send status to Ghost, set local flag
publish_requested_msg = "Publish requested — an admin will review."
elif status and status != ghost_post.get("status"):
# Status is changing — determine email params based on publish_mode
email_kwargs: dict = {}
if status == "published" and publish_mode in ("email", "both") and newsletter_slug:
email_kwargs["newsletter_slug"] = newsletter_slug
email_kwargs["email_segment"] = "all"
if publish_mode == "email":
email_kwargs["email_only"] = True
from ...blog.ghost.ghost_posts import update_post as _up
ghost_post = await _up(
ghost_id=ghost_id,
lexical_json=lexical_raw,
title=None,
updated_at=ghost_post["updated_at"],
status=status,
is_page=is_page,
**email_kwargs,
)
# Sync to local DB
if is_page:
await sync_single_page(g.s, ghost_id)
else:
await sync_single_post(g.s, ghost_id)
await g.s.flush()
# Handle publish_requested flag on the local post
from models.ghost_content import Post
from sqlalchemy import select as sa_select
local_post = (await g.s.execute(
sa_select(Post).where(Post.ghost_id == ghost_id)
)).scalar_one_or_none()
if local_post:
if publish_requested_msg:
local_post.publish_requested = True
elif status == "published" and is_admin:
local_post.publish_requested = False
await g.s.flush()
# Clear caches
await invalidate_tag_cache("blog")
await invalidate_tag_cache("post.post_detail")
# Redirect to GET to avoid resubmit warning on refresh (PRG pattern)
redirect_url = host_url(url_for("blog.post.admin.edit", slug=slug)) + "?saved=1"
if publish_requested_msg:
redirect_url += "&publish_requested=1"
return redirect(redirect_url)
async def _fetch_page_markets(post_id):
"""Fetch marketplaces for a page via market data endpoint."""
from shared.infrastructure.data_client import fetch_data
from shared.contracts.dtos import MarketPlaceDTO, dto_from_dict
raw = await fetch_data("market", "marketplaces-for-container",
params={"type": "page", "id": post_id}, required=False) or []
return [dto_from_dict(MarketPlaceDTO, m) for m in raw]
@bp.get("/markets/")
@require_admin
async def markets(slug: str):
"""List markets for this page."""
post = (g.post_data or {}).get("post", {})
post_id = post.get("id")
if not post_id:
return await make_response("Post not found", 404)
page_markets = await _fetch_page_markets(post_id)
from sexp.sexp_components import render_markets_panel
return sexp_response(render_markets_panel(page_markets, post))
@bp.post("/markets/new/")
@require_admin
async def create_market(slug: str):
"""Create a new market for this page."""
from ..services.markets import create_market as _create_market, MarketError
from quart import jsonify
post = (g.post_data or {}).get("post", {})
post_id = post.get("id")
if not post_id:
return jsonify({"error": "Post not found"}), 404
form = await request.form
name = (form.get("name") or "").strip()
try:
await _create_market(g.s, post_id, name)
except MarketError as e:
return jsonify({"error": str(e)}), 400
# Return updated markets list
page_markets = await _fetch_page_markets(post_id)
from sexp.sexp_components import render_markets_panel
return sexp_response(render_markets_panel(page_markets, post))
@bp.delete("/markets/<market_slug>/")
@require_admin
async def delete_market(slug: str, market_slug: str):
"""Soft-delete a market."""
from ..services.markets import soft_delete_market
from quart import jsonify
post = (g.post_data or {}).get("post", {})
post_id = post.get("id")
deleted = await soft_delete_market(g.s, slug, market_slug)
if not deleted:
return jsonify({"error": "Market not found"}), 404
# Return updated markets list
page_markets = await _fetch_page_markets(post_id)
from sexp.sexp_components import render_markets_panel
return sexp_response(render_markets_panel(page_markets, post))
return bp