from __future__ import annotations from quart import ( make_response, Blueprint, g, request, redirect, url_for, ) from shared.browser.app.authz import require_admin, require_post_author from markupsafe import escape from shared.sx.helpers import sx_response, sx_call from shared.sx.parser import SxExpr, serialize as sx_serialize from shared.utils import host_url def _raw_html_sx(html: str) -> str: """Wrap raw HTML in (raw! "...") so it's valid inside sx source.""" if not html: return "" return "(raw! " + sx_serialize(html) + ")" def _post_to_edit_dict(post) -> dict: """Convert an ORM Post to a dict matching the shape templates expect. The templates were written for Ghost Admin API responses, so we mimic that structure (dot-access on dicts via Jinja) from ORM columns. """ d: dict = {} for col in ( "id", "slug", "title", "html", "plaintext", "lexical", "mobiledoc", "sx_content", "feature_image", "feature_image_alt", "feature_image_caption", "excerpt", "custom_excerpt", "visibility", "status", "featured", "is_page", "email_only", "canonical_url", "meta_title", "meta_description", "og_image", "og_title", "og_description", "twitter_image", "twitter_title", "twitter_description", "custom_template", "reading_time", "comment_id", ): d[col] = getattr(post, col, None) # Timestamps as ISO strings (templates do [:16] slicing) for ts in ("published_at", "updated_at", "created_at"): val = getattr(post, ts, None) d[ts] = val.isoformat() if val else "" # Tags as list of dicts with .name (for Jinja map(attribute='name')) if hasattr(post, "tags") and post.tags: d["tags"] = [{"name": t.name, "slug": t.slug, "id": t.id} for t in post.tags] else: d["tags"] = [] # email/newsletter — not available without Ghost, set safe defaults d["email"] = None d["newsletter"] = None return d def _render_features(features, post, result): """Render features panel via .sx defcomp.""" slug = post.get("slug", "") return sx_call("blog-features-panel-content", features_url=host_url(url_for("blog.post.admin.update_features", slug=slug)), calendar_checked=bool(features.get("calendar")), market_checked=bool(features.get("market")), show_sumup=bool(features.get("calendar") or features.get("market")), sumup_url=host_url(url_for("blog.post.admin.update_sumup", slug=slug)), merchant_code=result.get("sumup_merchant_code") or "", placeholder="\u2022" * 8 if result.get("sumup_configured") else "sup_sk_...", sumup_configured=result.get("sumup_configured", False), checkout_prefix=result.get("sumup_checkout_prefix") or "", ) def _serialize_markets(markets, slug): """Serialize ORM/DTO market objects to dicts for .sx defcomp.""" result = [] for m in markets: m_name = getattr(m, "name", "") if hasattr(m, "name") else m.get("name", "") m_slug = getattr(m, "slug", "") if hasattr(m, "slug") else m.get("slug", "") result.append({ "name": m_name, "slug": m_slug, "delete_url": host_url(url_for("blog.post.admin.delete_market", slug=slug, market_slug=m_slug)), }) return result def _render_calendar_view( calendar, year, month, month_name, weekday_names, weeks, prev_month, prev_month_year, next_month, next_month_year, prev_year, next_year, month_entries, associated_entry_ids, post_slug: str, ) -> str: """Build calendar month grid HTML.""" from quart import url_for as qurl from shared.browser.app.csrf import generate_csrf_token esc = escape csrf = generate_csrf_token() cal_id = calendar.id def cal_url(y, m): return esc(host_url(qurl("blog.post.admin.calendar_view", slug=post_slug, calendar_id=cal_id, year=y, month=m))) cur_url = cal_url(year, month) toggle_url_fn = lambda eid: esc(host_url(qurl("blog.post.admin.toggle_entry", slug=post_slug, entry_id=eid))) nav = ( f'
' f'
' ) wd_cells = "".join(f'
{esc(wd)}
' for wd in weekday_names) wd_row = f'' cells: list[str] = [] for week in weeks: for day in week: extra_cls = " bg-stone-50 text-stone-400" if not day.in_month else "" day_date = day.date entry_btns: list[str] = [] for e in month_entries: e_start = getattr(e, "start_at", None) if not e_start or e_start.date() != day_date: continue e_id = getattr(e, "id", None) e_name = esc(getattr(e, "name", "")) t_url = toggle_url_fn(e_id) hx_hdrs = '{:X-CSRFToken "' + csrf + '"}' if e_id in associated_entry_ids: entry_btns.append( f'
' f'{e_name}' f'
' ) else: entry_btns.append( f'' ) entries_html = '
' + "".join(entry_btns) + '
' if entry_btns else '' cells.append( f'
' f'
{day_date.day}
{entries_html}
' ) grid = f'
{"".join(cells)}
' html = ( f'
' f'{nav}' f'
{wd_row}{grid}
' f'
' ) return _raw_html_sx(html) def _render_associated_entries(all_calendars, associated_entry_ids, post_slug: str) -> str: """Render the associated entries panel.""" from shared.browser.app.csrf import generate_csrf_token from sxc.pages.helpers import _extract_associated_entries_data csrf = generate_csrf_token() entry_data = _extract_associated_entries_data( all_calendars, associated_entry_ids, post_slug) return sx_call("blog-associated-entries-from-data", entries=entry_data, csrf=csrf) def _render_nav_entries_oob(associated_entries, calendars, post: dict) -> str: """Render the OOB nav entries swap.""" entries_list = [] if associated_entries and hasattr(associated_entries, "entries"): entries_list = associated_entries.entries or [] has_items = bool(entries_list or calendars) if not has_items: return sx_call("blog-nav-entries-empty") select_colours = ( "[.hover-capable_&]:hover:bg-yellow-300" " aria-selected:bg-stone-500 aria-selected:text-white" " [.hover-capable_&[aria-selected=true]:hover]:bg-orange-500" ) nav_cls = ( f"justify-center cursor-pointer flex flex-row items-center gap-2" f" rounded bg-stone-200 text-black {select_colours} p-2" ) post_slug = post.get("slug", "") scroll_hs = ( "on load or scroll" " if window.innerWidth >= 640 and my.scrollWidth > my.clientWidth" " remove .hidden from .entries-nav-arrow add .flex to .entries-nav-arrow" " else add .hidden to .entries-nav-arrow remove .flex from .entries-nav-arrow end" ) item_parts = [] for entry in entries_list: e_name = getattr(entry, "name", "") e_start = getattr(entry, "start_at", None) e_end = getattr(entry, "end_at", None) cal_slug = getattr(entry, "calendar_slug", "") if e_start: entry_path = ( f"/{post_slug}/{cal_slug}/" f"{e_start.year}/{e_start.month}/{e_start.day}" f"/entries/{getattr(entry, 'id', '')}/" ) date_str = e_start.strftime("%b %d, %Y at %H:%M") if e_end: date_str += f" \u2013 {e_end.strftime('%H:%M')}" else: entry_path = f"/{post_slug}/{cal_slug}/" date_str = "" item_parts.append(sx_call("calendar-entry-nav", href=entry_path, nav_class=nav_cls, name=e_name, date_str=date_str, )) for calendar in (calendars or []): cal_name = getattr(calendar, "name", "") cal_slug = getattr(calendar, "slug", "") cal_path = f"/{post_slug}/{cal_slug}/" item_parts.append(sx_call("blog-nav-calendar-item", href=cal_path, nav_cls=nav_cls, name=cal_name, )) items_sx = "(<> " + " ".join(item_parts) + ")" if item_parts else "" return sx_call("scroll-nav-wrapper", wrapper_id="entries-calendars-nav-wrapper", container_id="associated-items-container", arrow_cls="entries-nav-arrow", left_hs="on click set #associated-items-container.scrollLeft to #associated-items-container.scrollLeft - 200", scroll_hs=scroll_hs, right_hs="on click set #associated-items-container.scrollLeft to #associated-items-container.scrollLeft + 200", items=SxExpr(items_sx) if items_sx else None, oob=True, ) def register(): bp = Blueprint("admin", __name__, url_prefix='/admin') @bp.put("/features/") @require_admin async def update_features(slug: str): """Update PageConfig.features for a page (page_configs lives in db_cart).""" from shared.infrastructure.actions import call_action from quart import jsonify post = g.post_data.get("post") if not post or not post.get("is_page"): return jsonify({"error": "This is not a page."}), 400 post_id = post["id"] # Parse request body body = await request.get_json() if body is None: form = await request.form body = {} for key in ("calendar", "market"): val = form.get(key) if val is not None: body[key] = val in ("true", "1", "on") if not isinstance(body, dict): return jsonify({"error": "Expected JSON object with feature flags."}), 400 # Update via cart action (page_configs lives in db_cart) result = await call_action("blog", "update-page-config", payload={ "container_type": "page", "container_id": post_id, "features": body, }) features = result.get("features", {}) html = _render_features(features, post, result) return sx_response(html) @bp.put("/admin/sumup/") @require_admin async def update_sumup(slug: str): """Update PageConfig SumUp credentials for a page (page_configs lives in db_cart).""" from shared.infrastructure.actions import call_action from quart import jsonify post = g.post_data.get("post") if not post or not post.get("is_page"): return jsonify({"error": "This is not a page."}), 400 post_id = post["id"] form = await request.form merchant_code = (form.get("merchant_code") or "").strip() api_key = (form.get("api_key") or "").strip() checkout_prefix = (form.get("checkout_prefix") or "").strip() payload: dict = { "container_type": "page", "container_id": post_id, "sumup_merchant_code": merchant_code or None, "sumup_checkout_prefix": checkout_prefix or None, } if api_key: payload["sumup_api_key"] = api_key result = await call_action("blog", "update-page-config", payload=payload) features = result.get("features", {}) html = _render_features(features, post, result) return sx_response(html) @bp.get("/entries/calendar//") @require_admin async def calendar_view(slug: str, calendar_id: int): """Show calendar month view for browsing entries""" from shared.models.calendars import Calendar from shared.utils.calendar_helpers import parse_int_arg, add_months, build_calendar_weeks from shared.infrastructure.data_client import fetch_data from shared.contracts.dtos import CalendarEntryDTO, dto_from_dict from sqlalchemy import select from datetime import datetime, timezone import calendar as pycalendar from quart import session as qsession from ..services.entry_associations import get_post_entry_ids # Get month/year from query params today = datetime.now(timezone.utc).date() month = parse_int_arg("month") year = parse_int_arg("year") if year is None: year = today.year if month is None or not (1 <= month <= 12): month = today.month # Load calendar result = await g.s.execute( select(Calendar).where(Calendar.id == calendar_id, Calendar.deleted_at.is_(None)) ) calendar_obj = result.scalar_one_or_none() if not calendar_obj: return await make_response("Calendar not found", 404) # Build calendar data prev_month_year, prev_month = add_months(year, month, -1) next_month_year, next_month = add_months(year, month, +1) prev_year = year - 1 next_year = year + 1 weeks = build_calendar_weeks(year, month) month_name = pycalendar.month_name[month] weekday_names = [pycalendar.day_abbr[i] for i in range(7)] # Get entries for this month via events data endpoint period_start = datetime(year, month, 1, tzinfo=timezone.utc) next_y, next_m = add_months(year, month, +1) period_end = datetime(next_y, next_m, 1, tzinfo=timezone.utc) user = getattr(g, "user", None) user_id = user.id if user else None is_admin = bool(user and getattr(user, "is_admin", False)) session_id = qsession.get("calendar_sid") raw_entries = await fetch_data("events", "visible-entries-for-period", params={ "calendar_id": calendar_obj.id, "period_start": period_start.isoformat(), "period_end": period_end.isoformat(), "user_id": user_id, "is_admin": str(is_admin).lower(), "session_id": session_id, }, required=False) or [] month_entries = [dto_from_dict(CalendarEntryDTO, e) for e in raw_entries] # Get associated entry IDs for this post post_id = g.post_data["post"]["id"] associated_entry_ids = await get_post_entry_ids(post_id) html = _render_calendar_view( calendar_obj, year, month, month_name, weekday_names, weeks, prev_month, prev_month_year, next_month, next_month_year, prev_year, next_year, month_entries, associated_entry_ids, g.post_data["post"]["slug"], ) return sx_response(html) @bp.post("/entries//toggle/") @require_admin async def toggle_entry(slug: str, entry_id: int): from ..services.entry_associations import toggle_entry_association, get_post_entry_ids, get_associated_entries from shared.models.calendars import Calendar from sqlalchemy import select from quart import jsonify post_id = g.post_data["post"]["id"] is_associated, error = await toggle_entry_association(post_id, entry_id) if error: return jsonify({"message": error, "errors": {}}), 400 await g.s.flush() # Return updated association status associated_entry_ids = await get_post_entry_ids(post_id) # Load ALL calendars result = await g.s.execute( select(Calendar) .where(Calendar.deleted_at.is_(None)) .order_by(Calendar.name.asc()) ) all_calendars = result.scalars().all() # Load entries and post for each calendar for calendar in all_calendars: await g.s.refresh(calendar, ["entries", "post"]) # Fetch associated entries for nav display associated_entries = await get_associated_entries(post_id) # Load calendars for this post (for nav display) calendars = ( await g.s.execute( select(Calendar) .where(Calendar.container_type == "page", Calendar.container_id == post_id, Calendar.deleted_at.is_(None)) .order_by(Calendar.name.asc()) ) ).scalars().all() # Return the associated entries admin list + OOB update for nav entries post = g.post_data["post"] admin_list = _render_associated_entries(all_calendars, associated_entry_ids, post["slug"]) nav_entries_html = _render_nav_entries_oob(associated_entries, calendars, post) return sx_response(admin_list + nav_entries_html) @bp.post("/settings/") @require_post_author async def settings_save(slug: str): from services.post_writer import update_post_settings, OptimisticLockError from shared.browser.app.redis_cacher import invalidate_tag_cache post_id = g.post_data["post"]["id"] form = await request.form updated_at = form.get("updated_at", "") # Build kwargs — only include fields that were submitted kwargs: dict = {} # Text fields for field in ( "slug", "custom_template", "meta_title", "meta_description", "canonical_url", "og_image", "og_title", "og_description", "twitter_image", "twitter_title", "twitter_description", "feature_image_alt", ): val = form.get(field) if val is not None: kwargs[field] = val.strip() # Select fields visibility = form.get("visibility") if visibility is not None: kwargs["visibility"] = visibility # Datetime published_at = form.get("published_at", "").strip() if published_at: kwargs["published_at"] = published_at # Checkbox fields: present = True, absent = False kwargs["featured"] = form.get("featured") == "on" kwargs["email_only"] = form.get("email_only") == "on" # Tags — comma-separated string → list of names tags_str = form.get("tags", "").strip() tag_names = [t.strip() for t in tags_str.split(",") if t.strip()] if tags_str else [] try: post = await update_post_settings( g.s, post_id=post_id, expected_updated_at=updated_at, tag_names=tag_names, **kwargs, ) except OptimisticLockError: from urllib.parse import quote return redirect( host_url(url_for("defpage_post_settings", slug=slug)) + "?error=" + quote("Someone else edited this post. Please reload and try again.") ) await g.s.flush() # Clear caches await invalidate_tag_cache("blog") await invalidate_tag_cache("post.post_detail") # Redirect using the (possibly new) slug return redirect(host_url(url_for("defpage_post_settings", slug=post.slug)) + "?saved=1") @bp.post("/edit/") @require_post_author async def edit_save(slug: str): import json from ...blog.ghost.lexical_validator import validate_lexical from services.post_writer import update_post as writer_update, OptimisticLockError from shared.browser.app.redis_cacher import invalidate_tag_cache post_id = g.post_data["post"]["id"] form = await request.form title = form.get("title", "").strip() lexical_raw = form.get("lexical", "") updated_at = form.get("updated_at", "") status = form.get("status", "draft") feature_image = form.get("feature_image", "").strip() custom_excerpt = form.get("custom_excerpt", "").strip() feature_image_caption = form.get("feature_image_caption", "").strip() # Validate the lexical JSON from urllib.parse import quote try: lexical_doc = json.loads(lexical_raw) except (json.JSONDecodeError, TypeError): return redirect(host_url(url_for("defpage_post_edit", slug=slug)) + "?error=" + quote("Invalid JSON in editor content.")) ok, reason = validate_lexical(lexical_doc) if not ok: return redirect(host_url(url_for("defpage_post_edit", slug=slug)) + "?error=" + quote(reason)) # Publish workflow is_admin = bool((g.get("rights") or {}).get("admin")) publish_requested_msg = None # Determine effective status effective_status: str | None = None current_status = g.post_data["post"].get("status", "draft") if status == "published" and current_status != "published" and not is_admin: # Non-admin requesting publish: keep as draft, set local flag publish_requested_msg = "Publish requested — an admin will review." elif status and status != current_status: effective_status = status sx_content_raw = form.get("sx_content", "").strip() or None # Build optional kwargs — only pass sx_content if the form field was present extra_kw: dict = {} if "sx_content" in form: extra_kw["sx_content"] = sx_content_raw try: post = await writer_update( g.s, post_id=post_id, lexical_json=lexical_raw, title=title or None, expected_updated_at=updated_at, feature_image=feature_image or None, custom_excerpt=custom_excerpt or None, feature_image_caption=feature_image_caption or None, status=effective_status, **extra_kw, ) except OptimisticLockError: return redirect( host_url(url_for("defpage_post_edit", slug=slug)) + "?error=" + quote("Someone else edited this post. Please reload and try again.") ) # Handle publish_requested flag if publish_requested_msg: post.publish_requested = True elif status == "published" and is_admin: post.publish_requested = False await g.s.flush() # Clear caches await invalidate_tag_cache("blog") await invalidate_tag_cache("post.post_detail") # Redirect to GET (PRG pattern) — use post.slug in case it changed redirect_url = host_url(url_for("defpage_post_edit", slug=post.slug)) + "?saved=1" if publish_requested_msg: redirect_url += "&publish_requested=1" return redirect(redirect_url) async def _fetch_page_markets(post_id): """Fetch marketplaces for a page via market data endpoint.""" from shared.infrastructure.data_client import fetch_data from shared.contracts.dtos import MarketPlaceDTO, dto_from_dict raw = await fetch_data("market", "marketplaces-for-container", params={"type": "page", "id": post_id}, required=False) or [] return [dto_from_dict(MarketPlaceDTO, m) for m in raw] @bp.get("/markets/") @require_admin async def markets(slug: str): """List markets for this page.""" post = (g.post_data or {}).get("post", {}) post_id = post.get("id") if not post_id: return await make_response("Post not found", 404) page_markets = await _fetch_page_markets(post_id) slug = post.get("slug", "") create_url = host_url(url_for("blog.post.admin.create_market", slug=slug)) html = sx_call("blog-markets-panel-content", markets=_serialize_markets(page_markets, slug), create_url=create_url) return sx_response(html) @bp.post("/markets/new/") @require_admin async def create_market(slug: str): """Create a new market for this page.""" from ..services.markets import create_market as _create_market, MarketError from quart import jsonify post = (g.post_data or {}).get("post", {}) post_id = post.get("id") if not post_id: return jsonify({"error": "Post not found"}), 404 form = await request.form name = (form.get("name") or "").strip() try: await _create_market(g.s, post_id, name) except MarketError as e: return jsonify({"error": str(e)}), 400 # Return updated markets list page_markets = await _fetch_page_markets(post_id) slug = post.get("slug", "") create_url = host_url(url_for("blog.post.admin.create_market", slug=slug)) html = sx_call("blog-markets-panel-content", markets=_serialize_markets(page_markets, slug), create_url=create_url) return sx_response(html) @bp.delete("/markets//") @require_admin async def delete_market(slug: str, market_slug: str): """Soft-delete a market.""" from ..services.markets import soft_delete_market from quart import jsonify post = (g.post_data or {}).get("post", {}) post_id = post.get("id") deleted = await soft_delete_market(g.s, slug, market_slug) if not deleted: return jsonify({"error": "Market not found"}), 404 # Return updated markets list page_markets = await _fetch_page_markets(post_id) slug = post.get("slug", "") create_url = host_url(url_for("blog.post.admin.create_market", slug=slug)) html = sx_call("blog-markets-panel-content", markets=_serialize_markets(page_markets, slug), create_url=create_url) return sx_response(html) return bp