from __future__ import annotations from quart import ( render_template, make_response, Blueprint, g, request, redirect, url_for, ) from suma_browser.app.authz import require_admin, require_post_author from suma_browser.app.utils.htmx import is_htmx_request from utils import host_url def register(): bp = Blueprint("admin", __name__, url_prefix='/admin') @bp.get("/") @require_admin async def admin(slug: str): from suma_browser.app.utils.htmx import is_htmx_request from models.page_config import PageConfig from sqlalchemy import select as sa_select # Load features for page admin post = (g.post_data or {}).get("post", {}) features = {} sumup_configured = False sumup_merchant_code = "" sumup_checkout_prefix = "" if post.get("is_page"): pc = (await g.s.execute( sa_select(PageConfig).where(PageConfig.post_id == post["id"]) )).scalar_one_or_none() if pc: features = pc.features or {} sumup_configured = bool(pc.sumup_api_key) sumup_merchant_code = pc.sumup_merchant_code or "" sumup_checkout_prefix = pc.sumup_checkout_prefix or "" ctx = { "features": features, "sumup_configured": sumup_configured, "sumup_merchant_code": sumup_merchant_code, "sumup_checkout_prefix": sumup_checkout_prefix, } # Determine which template to use based on request type if not is_htmx_request(): # Normal browser request: full page with layout html = await render_template("_types/post/admin/index.html", **ctx) else: # HTMX request: main panel + OOB elements html = await render_template("_types/post/admin/_oob_elements.html", **ctx) return await make_response(html) @bp.put("/features/") @require_admin async def update_features(slug: str): """Update PageConfig.features for a page.""" from models.page_config import PageConfig from models.ghost_content import Post from sqlalchemy import select as sa_select from quart import jsonify import json post = g.post_data.get("post") if not post or not post.get("is_page"): return jsonify({"error": "This is not a page."}), 400 post_id = post["id"] # Load or create PageConfig pc = (await g.s.execute( sa_select(PageConfig).where(PageConfig.post_id == post_id) )).scalar_one_or_none() if pc is None: pc = PageConfig(post_id=post_id, features={}) g.s.add(pc) await g.s.flush() # Parse request body body = await request.get_json() if body is None: # Fall back to form data form = await request.form body = {} for key in ("calendar", "market"): val = form.get(key) if val is not None: body[key] = val in ("true", "1", "on") if not isinstance(body, dict): return jsonify({"error": "Expected JSON object with feature flags."}), 400 # Merge features features = dict(pc.features or {}) for key, val in body.items(): if isinstance(val, bool): features[key] = val elif val in ("true", "1", "on"): features[key] = True elif val in ("false", "0", "off", None): features[key] = False pc.features = features from sqlalchemy.orm.attributes import flag_modified flag_modified(pc, "features") await g.s.flush() # Return updated features panel html = await render_template( "_types/post/admin/_features_panel.html", features=features, post=post, sumup_configured=bool(pc.sumup_api_key), sumup_merchant_code=pc.sumup_merchant_code or "", sumup_checkout_prefix=pc.sumup_checkout_prefix or "", ) return await make_response(html) @bp.put("/admin/sumup/") @require_admin async def update_sumup(slug: str): """Update PageConfig SumUp credentials for a page.""" from models.page_config import PageConfig from sqlalchemy import select as sa_select from quart import jsonify post = g.post_data.get("post") if not post or not post.get("is_page"): return jsonify({"error": "This is not a page."}), 400 post_id = post["id"] pc = (await g.s.execute( sa_select(PageConfig).where(PageConfig.post_id == post_id) )).scalar_one_or_none() if pc is None: pc = PageConfig(post_id=post_id, features={}) g.s.add(pc) await g.s.flush() form = await request.form merchant_code = (form.get("merchant_code") or "").strip() api_key = (form.get("api_key") or "").strip() checkout_prefix = (form.get("checkout_prefix") or "").strip() pc.sumup_merchant_code = merchant_code or None pc.sumup_checkout_prefix = checkout_prefix or None # Only update API key if non-empty (allows updating other fields without re-entering key) if api_key: pc.sumup_api_key = api_key await g.s.flush() features = pc.features or {} html = await render_template( "_types/post/admin/_features_panel.html", features=features, post=post, sumup_configured=bool(pc.sumup_api_key), sumup_merchant_code=pc.sumup_merchant_code or "", sumup_checkout_prefix=pc.sumup_checkout_prefix or "", ) return await make_response(html) @bp.get("/data/") @require_admin async def data(slug: str): if not is_htmx_request(): html = await render_template( "_types/post_data/index.html", ) else: html = await render_template( "_types/post_data/_oob_elements.html", ) return await make_response(html) @bp.get("/entries/calendar//") @require_admin async def calendar_view(slug: str, calendar_id: int): """Show calendar month view for browsing entries""" from models.calendars import Calendar from sqlalchemy import select from datetime import datetime, timezone from quart import request import calendar as pycalendar from ...calendar.services.calendar_view import parse_int_arg, add_months, build_calendar_weeks from ...calendar.services import get_visible_entries_for_period from quart import session as qsession from ..services.entry_associations import get_post_entry_ids # Get month/year from query params today = datetime.now(timezone.utc).date() month = parse_int_arg("month") year = parse_int_arg("year") if year is None: year = today.year if month is None or not (1 <= month <= 12): month = today.month # Load calendar result = await g.s.execute( select(Calendar).where(Calendar.id == calendar_id, Calendar.deleted_at.is_(None)) ) calendar_obj = result.scalar_one_or_none() if not calendar_obj: return await make_response("Calendar not found", 404) # Build calendar data prev_month_year, prev_month = add_months(year, month, -1) next_month_year, next_month = add_months(year, month, +1) prev_year = year - 1 next_year = year + 1 weeks = build_calendar_weeks(year, month) month_name = pycalendar.month_name[month] weekday_names = [pycalendar.day_abbr[i] for i in range(7)] # Get entries for this month period_start = datetime(year, month, 1, tzinfo=timezone.utc) next_y, next_m = add_months(year, month, +1) period_end = datetime(next_y, next_m, 1, tzinfo=timezone.utc) user = getattr(g, "user", None) session_id = qsession.get("calendar_sid") visible = await get_visible_entries_for_period( sess=g.s, calendar_id=calendar_obj.id, period_start=period_start, period_end=period_end, user=user, session_id=session_id, ) # Get associated entry IDs for this post post_id = g.post_data["post"]["id"] associated_entry_ids = await get_post_entry_ids(g.s, post_id) html = await render_template( "_types/post/admin/_calendar_view.html", calendar=calendar_obj, year=year, month=month, month_name=month_name, weekday_names=weekday_names, weeks=weeks, prev_month=prev_month, prev_month_year=prev_month_year, next_month=next_month, next_month_year=next_month_year, prev_year=prev_year, next_year=next_year, month_entries=visible.merged_entries, associated_entry_ids=associated_entry_ids, ) return await make_response(html) @bp.get("/entries/") @require_admin async def entries(slug: str): from ..services.entry_associations import get_post_entry_ids from models.calendars import Calendar from sqlalchemy import select post_id = g.post_data["post"]["id"] associated_entry_ids = await get_post_entry_ids(g.s, post_id) # Load ALL calendars (not just this post's calendars) result = await g.s.execute( select(Calendar) .where(Calendar.deleted_at.is_(None)) .order_by(Calendar.name.asc()) ) all_calendars = result.scalars().all() # Load entries and post for each calendar for calendar in all_calendars: await g.s.refresh(calendar, ["entries", "post"]) if not is_htmx_request(): html = await render_template( "_types/post_entries/index.html", all_calendars=all_calendars, associated_entry_ids=associated_entry_ids, ) else: html = await render_template( "_types/post_entries/_oob_elements.html", all_calendars=all_calendars, associated_entry_ids=associated_entry_ids, ) return await make_response(html) @bp.post("/entries//toggle/") @require_admin async def toggle_entry(slug: str, entry_id: int): from ..services.entry_associations import toggle_entry_association, get_post_entry_ids, get_associated_entries from models.calendars import Calendar from sqlalchemy import select from quart import jsonify post_id = g.post_data["post"]["id"] is_associated, error = await toggle_entry_association(g.s, post_id, entry_id) if error: return jsonify({"message": error, "errors": {}}), 400 await g.s.flush() # Return updated association status associated_entry_ids = await get_post_entry_ids(g.s, post_id) # Load ALL calendars result = await g.s.execute( select(Calendar) .where(Calendar.deleted_at.is_(None)) .order_by(Calendar.name.asc()) ) all_calendars = result.scalars().all() # Load entries and post for each calendar for calendar in all_calendars: await g.s.refresh(calendar, ["entries", "post"]) # Fetch associated entries for nav display associated_entries = await get_associated_entries(g.s, post_id) # Load calendars for this post (for nav display) calendars = ( await g.s.execute( select(Calendar) .where(Calendar.post_id == post_id, Calendar.deleted_at.is_(None)) .order_by(Calendar.name.asc()) ) ).scalars().all() # Return the associated entries admin list + OOB update for nav entries admin_list = await render_template( "_types/post/admin/_associated_entries.html", all_calendars=all_calendars, associated_entry_ids=associated_entry_ids, ) nav_entries_oob = await render_template( "_types/post/admin/_nav_entries_oob.html", associated_entries=associated_entries, calendars=calendars, post=g.post_data["post"], ) return await make_response(admin_list + nav_entries_oob) @bp.get("/settings/") @require_post_author async def settings(slug: str): from ...blog.ghost.ghost_posts import get_post_for_edit ghost_id = g.post_data["post"]["ghost_id"] ghost_post = await get_post_for_edit(ghost_id) save_success = request.args.get("saved") == "1" if not is_htmx_request(): html = await render_template( "_types/post_settings/index.html", ghost_post=ghost_post, save_success=save_success, ) else: html = await render_template( "_types/post_settings/_oob_elements.html", ghost_post=ghost_post, save_success=save_success, ) return await make_response(html) @bp.post("/settings/") @require_post_author async def settings_save(slug: str): from ...blog.ghost.ghost_posts import update_post_settings from ...blog.ghost.ghost_sync import sync_single_post from suma_browser.app.redis_cacher import invalidate_tag_cache ghost_id = g.post_data["post"]["ghost_id"] form = await request.form updated_at = form.get("updated_at", "") # Build kwargs — only include fields that were submitted kwargs: dict = {} # Text fields for field in ( "slug", "custom_template", "meta_title", "meta_description", "canonical_url", "og_image", "og_title", "og_description", "twitter_image", "twitter_title", "twitter_description", "feature_image_alt", ): val = form.get(field) if val is not None: kwargs[field] = val.strip() # Select fields visibility = form.get("visibility") if visibility is not None: kwargs["visibility"] = visibility # Datetime published_at = form.get("published_at", "").strip() if published_at: kwargs["published_at"] = published_at # Checkbox fields: present = True, absent = False kwargs["featured"] = form.get("featured") == "on" kwargs["email_only"] = form.get("email_only") == "on" # Tags — comma-separated string → list of {"name": "..."} dicts tags_str = form.get("tags", "").strip() if tags_str: kwargs["tags"] = [{"name": t.strip()} for t in tags_str.split(",") if t.strip()] else: kwargs["tags"] = [] # Update in Ghost await update_post_settings( ghost_id=ghost_id, updated_at=updated_at, **kwargs, ) # Sync to local DB await sync_single_post(g.s, ghost_id) await g.s.flush() # Clear caches await invalidate_tag_cache("blog") await invalidate_tag_cache("post.post_detail") return redirect(host_url(url_for("blog.post.admin.settings", slug=slug)) + "?saved=1") @bp.get("/edit/") @require_post_author async def edit(slug: str): from ...blog.ghost.ghost_posts import get_post_for_edit from models.ghost_membership_entities import GhostNewsletter from sqlalchemy import select as sa_select ghost_id = g.post_data["post"]["ghost_id"] ghost_post = await get_post_for_edit(ghost_id) save_success = request.args.get("saved") == "1" newsletters = (await g.s.execute( sa_select(GhostNewsletter).order_by(GhostNewsletter.name) )).scalars().all() if not is_htmx_request(): html = await render_template( "_types/post_edit/index.html", ghost_post=ghost_post, save_success=save_success, newsletters=newsletters, ) else: html = await render_template( "_types/post_edit/_oob_elements.html", ghost_post=ghost_post, save_success=save_success, newsletters=newsletters, ) return await make_response(html) @bp.post("/edit/") @require_post_author async def edit_save(slug: str): import json from ...blog.ghost.ghost_posts import update_post from ...blog.ghost.lexical_validator import validate_lexical from ...blog.ghost.ghost_sync import sync_single_post from suma_browser.app.redis_cacher import invalidate_tag_cache ghost_id = g.post_data["post"]["ghost_id"] form = await request.form title = form.get("title", "").strip() lexical_raw = form.get("lexical", "") updated_at = form.get("updated_at", "") status = form.get("status", "draft") publish_mode = form.get("publish_mode", "web") newsletter_slug = form.get("newsletter_slug", "").strip() or None feature_image = form.get("feature_image", "").strip() custom_excerpt = form.get("custom_excerpt", "").strip() feature_image_caption = form.get("feature_image_caption", "").strip() # Validate the lexical JSON try: lexical_doc = json.loads(lexical_raw) except (json.JSONDecodeError, TypeError): from ...blog.ghost.ghost_posts import get_post_for_edit ghost_post = await get_post_for_edit(ghost_id) html = await render_template( "_types/post_edit/index.html", ghost_post=ghost_post, save_error="Invalid JSON in editor content.", ) return await make_response(html, 400) ok, reason = validate_lexical(lexical_doc) if not ok: from ...blog.ghost.ghost_posts import get_post_for_edit ghost_post = await get_post_for_edit(ghost_id) html = await render_template( "_types/post_edit/index.html", ghost_post=ghost_post, save_error=reason, ) return await make_response(html, 400) # Update in Ghost (content save — no status change yet) ghost_post = await update_post( ghost_id=ghost_id, lexical_json=lexical_raw, title=title or None, updated_at=updated_at, feature_image=feature_image, custom_excerpt=custom_excerpt, feature_image_caption=feature_image_caption, ) # Publish workflow is_admin = bool((g.get("rights") or {}).get("admin")) publish_requested_msg = None # Guard: if already emailed, force publish_mode to "web" to prevent re-send already_emailed = bool(ghost_post.get("email") and ghost_post["email"].get("status")) if already_emailed and publish_mode in ("email", "both"): publish_mode = "web" if status == "published" and ghost_post.get("status") != "published" and not is_admin: # Non-admin requesting publish: don't send status to Ghost, set local flag publish_requested_msg = "Publish requested — an admin will review." elif status and status != ghost_post.get("status"): # Status is changing — determine email params based on publish_mode email_kwargs: dict = {} if status == "published" and publish_mode in ("email", "both") and newsletter_slug: email_kwargs["newsletter_slug"] = newsletter_slug email_kwargs["email_segment"] = "all" if publish_mode == "email": email_kwargs["email_only"] = True from ...blog.ghost.ghost_posts import update_post as _up ghost_post = await _up( ghost_id=ghost_id, lexical_json=lexical_raw, title=None, updated_at=ghost_post["updated_at"], status=status, **email_kwargs, ) # Sync to local DB await sync_single_post(g.s, ghost_id) await g.s.flush() # Handle publish_requested flag on the local post from models.ghost_content import Post from sqlalchemy import select as sa_select local_post = (await g.s.execute( sa_select(Post).where(Post.ghost_id == ghost_id) )).scalar_one_or_none() if local_post: if publish_requested_msg: local_post.publish_requested = True elif status == "published" and is_admin: local_post.publish_requested = False await g.s.flush() # Clear caches await invalidate_tag_cache("blog") await invalidate_tag_cache("post.post_detail") # Redirect to GET to avoid resubmit warning on refresh (PRG pattern) redirect_url = host_url(url_for("blog.post.admin.edit", slug=slug)) + "?saved=1" if publish_requested_msg: redirect_url += "&publish_requested=1" return redirect(redirect_url) @bp.get("/markets/") @require_admin async def markets(slug: str): """List markets for this page.""" from models.market_place import MarketPlace from sqlalchemy import select as sa_select post = (g.post_data or {}).get("post", {}) post_id = post.get("id") if not post_id: return await make_response("Post not found", 404) page_markets = (await g.s.execute( sa_select(MarketPlace).where( MarketPlace.post_id == post_id, MarketPlace.deleted_at.is_(None), ).order_by(MarketPlace.name) )).scalars().all() html = await render_template( "_types/post/admin/_markets_panel.html", markets=page_markets, post=post, ) return await make_response(html) @bp.post("/markets/new/") @require_admin async def create_market(slug: str): """Create a new market for this page.""" from ..services.markets import create_market as _create_market, MarketError from models.market_place import MarketPlace from sqlalchemy import select as sa_select from quart import jsonify post = (g.post_data or {}).get("post", {}) post_id = post.get("id") if not post_id: return jsonify({"error": "Post not found"}), 404 form = await request.form name = (form.get("name") or "").strip() try: await _create_market(g.s, post_id, name) except MarketError as e: return jsonify({"error": str(e)}), 400 # Return updated markets list page_markets = (await g.s.execute( sa_select(MarketPlace).where( MarketPlace.post_id == post_id, MarketPlace.deleted_at.is_(None), ).order_by(MarketPlace.name) )).scalars().all() html = await render_template( "_types/post/admin/_markets_panel.html", markets=page_markets, post=post, ) return await make_response(html) @bp.delete("/markets//") @require_admin async def delete_market(slug: str, market_slug: str): """Soft-delete a market.""" from ..services.markets import soft_delete_market from models.market_place import MarketPlace from sqlalchemy import select as sa_select from quart import jsonify post = (g.post_data or {}).get("post", {}) post_id = post.get("id") deleted = await soft_delete_market(g.s, slug, market_slug) if not deleted: return jsonify({"error": "Market not found"}), 404 # Return updated markets list page_markets = (await g.s.execute( sa_select(MarketPlace).where( MarketPlace.post_id == post_id, MarketPlace.deleted_at.is_(None), ).order_by(MarketPlace.name) )).scalars().all() html = await render_template( "_types/post/admin/_markets_panel.html", markets=page_markets, post=post, ) return await make_response(html) return bp