Files
rose-ash/blog/bp/post/admin/routes.py
giles c2fe142039
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m19s
Delete blog sx_components.py — move all rendering to callers
Move remaining 19 rendering functions from the 2487-line
sx_components.py to their direct callers:

- menu_items/routes.py: menu item form, page search, nav OOB
- post/admin/routes.py: calendar view, associated entries, nav OOB
- sxc/pages/__init__.py: editor panel, post data inspector, preview,
  entries browser, settings form, edit page editor
- bp/blog/routes.py: inline new post page composition

Move load_service_components() call from sx_components module-level
to setup_blog_pages() so .sx files still load at startup.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-04 09:43:52 +00:00

746 lines
28 KiB
Python

from __future__ import annotations
from quart import (
make_response,
Blueprint,
g,
request,
redirect,
url_for,
)
from shared.browser.app.authz import require_admin, require_post_author
from markupsafe import escape
from shared.sx.helpers import sx_response, render_to_sx
from shared.sx.parser import SxExpr, serialize as sx_serialize
from shared.utils import host_url
def _raw_html_sx(html: str) -> str:
    """Return *html* wrapped as an sx ``(raw! "...")`` form.

    The markup is run through ``sx_serialize`` before being embedded so the
    result is valid inside sx source.  Falsy input (``""`` or ``None``)
    produces an empty string instead of a form.
    """
    if html:
        return "(raw! " + sx_serialize(html) + ")"
    return ""
def _post_to_edit_dict(post) -> dict:
    """Flatten an ORM Post into the dict shape the edit templates consume.

    The templates were written against Ghost Admin API responses, so the
    result mimics that structure using values read from the ORM object.

    * Plain columns are copied as-is (``None`` when the attribute is missing).
    * Timestamps become ISO-8601 strings, or ``""`` when unset, because the
      templates slice them with ``[:16]``.
    * ``tags`` is a list of ``{"name", "slug", "id"}`` dicts (possibly empty).
    * ``email`` and ``newsletter`` are always ``None``: they are not
      available without Ghost.
    """
    plain_columns = (
        "id", "slug", "title", "html", "plaintext", "lexical", "mobiledoc",
        "sx_content",
        "feature_image", "feature_image_alt", "feature_image_caption",
        "excerpt", "custom_excerpt", "visibility", "status", "featured",
        "is_page", "email_only", "canonical_url",
        "meta_title", "meta_description",
        "og_image", "og_title", "og_description",
        "twitter_image", "twitter_title", "twitter_description",
        "custom_template", "reading_time", "comment_id",
    )
    timestamp_columns = ("published_at", "updated_at", "created_at")

    data: dict = {name: getattr(post, name, None) for name in plain_columns}

    for name in timestamp_columns:
        stamp = getattr(post, name, None)
        data[name] = stamp.isoformat() if stamp else ""

    # Tags are exposed as dicts so templates can map over the "name" key.
    tags = getattr(post, "tags", None)
    data["tags"] = (
        [{"name": tag.name, "slug": tag.slug, "id": tag.id} for tag in tags]
        if tags
        else []
    )

    # email/newsletter: safe defaults, nothing to read them from.
    data["email"] = None
    data["newsletter"] = None
    return data
async def _render_features(features, post, result):
    """Render the features panel via the ``blog-features-panel-content`` defcomp.

    ``features`` supplies the ``calendar`` / ``market`` flags, ``post``
    supplies the slug used to build the form URLs, and ``result`` supplies
    the SumUp values (``sumup_merchant_code``, ``sumup_configured``,
    ``sumup_checkout_prefix``).
    """
    slug = post.get("slug", "")
    calendar_on = bool(features.get("calendar"))
    market_on = bool(features.get("market"))
    configured = result.get("sumup_configured", False)

    props = {
        "features_url": host_url(url_for("blog.post.admin.update_features", slug=slug)),
        "calendar_checked": calendar_on,
        "market_checked": market_on,
        # The SumUp section is shown as soon as either feature is enabled.
        "show_sumup": calendar_on or market_on,
        "sumup_url": host_url(url_for("blog.post.admin.update_sumup", slug=slug)),
        "merchant_code": result.get("sumup_merchant_code") or "",
        # Eight bullets when SumUp is configured, a format hint otherwise.
        "placeholder": "\u2022" * 8 if configured else "sup_sk_...",
        "sumup_configured": configured,
        "checkout_prefix": result.get("sumup_checkout_prefix") or "",
    }
    return await render_to_sx("blog-features-panel-content", **props)
def _serialize_markets(markets, slug):
    """Turn market objects (or dicts) into plain dicts for the .sx defcomp.

    Each output dict has ``name``, ``slug`` and ``delete_url``; the delete
    URL is built for the page identified by *slug*.
    """
    def _read(market, key):
        # Objects are read by attribute, anything else by mapping lookup.
        if hasattr(market, key):
            return getattr(market, key, "")
        return market.get(key, "")

    serialized = []
    for market in markets:
        market_name = _read(market, "name")
        market_slug = _read(market, "slug")
        delete_url = host_url(
            url_for("blog.post.admin.delete_market", slug=slug, market_slug=market_slug)
        )
        serialized.append(
            {"name": market_name, "slug": market_slug, "delete_url": delete_url}
        )
    return serialized
def _render_calendar_view(
    calendar, year, month, month_name, weekday_names, weeks,
    prev_month, prev_month_year, next_month, next_month_year,
    prev_year, next_year, month_entries, associated_entry_ids,
    post_slug: str,
) -> str:
    """Build the month grid for one calendar and return it as an sx ``raw!`` form.

    Args:
        calendar: Object whose ``id`` identifies the calendar in URLs and in
            the wrapper element id.
        year, month: The month being displayed.
        month_name: Heading text for the month (HTML-escaped here).
        weekday_names: Labels for the weekday header row (HTML-escaped here).
        weeks: Iterable of weeks; each week is an iterable of day objects
            exposing ``in_month`` and ``date``.
        prev_month, prev_month_year, next_month, next_month_year,
        prev_year, next_year: Targets of the four navigation links.
        month_entries: Entries to place on the grid; ``start_at``, ``id`` and
            ``name`` are read from each one.  Entries without ``start_at``
            are ignored.
        associated_entry_ids: Container of entry ids already attached to the
            post; decides whether an entry gets a "remove" or an "add" button.
        post_slug: Slug of the post, used when building URLs.

    Returns:
        The markup wrapped by ``_raw_html_sx``.
    """
    from collections import defaultdict

    from quart import url_for as qurl
    from shared.browser.app.csrf import generate_csrf_token

    esc = escape
    csrf = generate_csrf_token()
    cal_id = calendar.id
    # The CSRF header JSON is the same for every button, so build it once
    # instead of once per entry per cell.
    hx_hdrs = f'{{"X-CSRFToken": "{csrf}"}}'

    def cal_url(y, m):
        """Escaped URL of this calendar view for year *y*, month *m*."""
        return esc(host_url(qurl(
            "blog.post.admin.calendar_view",
            slug=post_slug, calendar_id=cal_id, year=y, month=m,
        )))

    def toggle_url(entry_id):
        """Escaped URL that toggles the association of *entry_id*."""
        return esc(host_url(qurl(
            "blog.post.admin.toggle_entry", slug=post_slug, entry_id=entry_id,
        )))

    def nav_link(y, m, glyph):
        """One navigation arrow that swaps the whole calendar view."""
        return (
            f'<a class="px-2 py-1 hover:bg-stone-100 rounded" sx-get="{cal_url(y, m)}"'
            f' sx-target="#calendar-view-{cal_id}" sx-swap="outerHTML">{glyph}</a>'
        )

    def confirm_attrs(title, text, icon, confirm_label, post_url):
        """Attributes shared by the add/remove buttons (confirm dialog + POST)."""
        return (
            f' data-confirm data-confirm-title="{title}"'
            f' data-confirm-text="{text}"'
            f' data-confirm-icon="{icon}" data-confirm-confirm-text="{confirm_label}"'
            ' data-confirm-cancel-text="Cancel" data-confirm-event="confirmed"'
            f' sx-post="{post_url}" sx-trigger="confirmed"'
            ' sx-target="#associated-entries-list" sx-swap="outerHTML"'
            f" sx-headers='{hx_hdrs}'"
            ' sx-on:afterSwap="document.body.dispatchEvent(new CustomEvent(\'entryToggled\'))"'
        )

    def entry_button(entry):
        """Markup for one entry: a green "remove" chip or a grey "add" button."""
        e_id = getattr(entry, "id", None)
        e_name = esc(getattr(entry, "name", ""))
        t_url = toggle_url(e_id)
        if e_id in associated_entry_ids:
            attrs = confirm_attrs(
                "Remove entry?", f"Remove {e_name} from this post?",
                "warning", "Yes, remove it", t_url,
            )
            return (
                '<div class="flex items-center gap-1 text-[10px] rounded px-1 py-0.5 bg-green-200 text-green-900">'
                f'<span class="truncate flex-1">{e_name}</span>'
                '<button type="button" class="flex-shrink-0 hover:text-red-600"'
                f'{attrs}'
                '><i class="fa fa-times"></i></button></div>'
            )
        attrs = confirm_attrs(
            "Add entry?", f"Add {e_name} to this post?",
            "question", "Yes, add it", t_url,
        )
        return (
            '<button type="button" class="w-full text-left text-[10px] rounded px-1 py-0.5 bg-stone-100 text-stone-700 hover:bg-stone-200"'
            f'{attrs}'
            f'><span class="truncate block">{e_name}</span></button>'
        )

    cur_url = cal_url(year, month)

    nav = "".join((
        '<header class="flex items-center justify-center mb-4">',
        '<nav class="flex items-center gap-2 text-xl">',
        nav_link(prev_year, month, "&laquo;"),
        nav_link(prev_month_year, prev_month, "&lsaquo;"),
        f'<div class="px-3 font-medium">{esc(month_name)} {year}</div>',
        nav_link(next_month_year, next_month, "&rsaquo;"),
        nav_link(next_year, month, "&raquo;"),
        '</nav></header>',
    ))

    wd_cells = "".join(f'<div class="py-2">{esc(wd)}</div>' for wd in weekday_names)
    wd_row = f'<div class="hidden sm:grid grid-cols-7 text-center text-xs font-semibold text-stone-700 bg-stone-50 border-b">{wd_cells}</div>'

    # Group entries by their start date once, so each day cell is a single
    # lookup rather than a scan over every entry of the month.  Entry order
    # within a day is preserved.
    entries_by_day = defaultdict(list)
    for entry in month_entries:
        e_start = getattr(entry, "start_at", None)
        if e_start:
            entries_by_day[e_start.date()].append(entry)

    cells: list[str] = []
    for week in weeks:
        for day in week:
            extra_cls = " bg-stone-50 text-stone-400" if not day.in_month else ""
            day_date = day.date
            # .get() avoids inserting empty lists for days without entries.
            entry_btns = [entry_button(e) for e in entries_by_day.get(day_date, ())]
            entries_html = (
                '<div class="space-y-0.5">' + "".join(entry_btns) + '</div>'
                if entry_btns
                else ''
            )
            cells.append(
                f'<div class="min-h-20 bg-white px-2 py-2 text-xs{extra_cls}">'
                f'<div class="font-medium mb-1">{day_date.day}</div>{entries_html}</div>'
            )

    grid = f'<div class="grid grid-cols-1 sm:grid-cols-7 gap-px bg-stone-200">{"".join(cells)}</div>'

    # The wrapper re-fetches itself whenever a toggle fires "entryToggled".
    html = (
        f'<div id="calendar-view-{cal_id}"'
        f' sx-get="{cur_url}" sx-trigger="entryToggled from:body" sx-swap="outerHTML">'
        f'{nav}'
        f'<div class="rounded border bg-white">{wd_row}{grid}</div>'
        f'</div>'
    )
    return _raw_html_sx(html)
async def _render_associated_entries(all_calendars, associated_entry_ids, post_slug: str) -> str:
    """Render the panel listing the calendar entries attached to a post.

    Walks every calendar's ``entries`` and keeps those whose id is in
    *associated_entry_ids* and whose ``deleted_at`` is ``None``.  Each kept
    entry is rendered as a ``blog-associated-entry`` item; the items (or the
    empty-state component when there are none) are wrapped in
    ``blog-associated-entries-panel``.
    """
    from quart import url_for as qurl
    from shared.browser.app.csrf import generate_csrf_token

    csrf = generate_csrf_token()
    csrf_headers = f'{{"X-CSRFToken": "{csrf}"}}'

    rendered_items: list[str] = []
    for cal in all_calendars:
        cal_entries = getattr(cal, "entries", []) or []
        cal_label = getattr(cal, "name", "")
        owner_post = getattr(cal, "post", None)
        if owner_post:
            image_src = getattr(owner_post, "feature_image", None)
            image_title = getattr(owner_post, "title", "")
        else:
            image_src, image_title = None, ""

        for item in cal_entries:
            item_id = getattr(item, "id", None)
            if item_id not in associated_entry_ids:
                continue
            if getattr(item, "deleted_at", None) is not None:
                continue

            item_name = getattr(item, "name", "")
            starts = getattr(item, "start_at", None)
            ends = getattr(item, "end_at", None)
            remove_url = host_url(
                qurl("blog.post.admin.toggle_entry", slug=post_slug, entry_id=item_id)
            )
            image_sx = await render_to_sx("blog-entry-image", src=image_src, title=image_title)

            when = starts.strftime("%A, %B %d, %Y at %H:%M") if starts else ""
            if ends:
                when += f" \u2013 {ends.strftime('%H:%M')}"

            rendered_items.append(await render_to_sx(
                "blog-associated-entry",
                confirm_text=f"This will remove {item_name} from this post",
                toggle_url=remove_url,
                hx_headers=csrf_headers,
                img=SxExpr(image_sx),
                name=item_name,
                date_str=f"{cal_label} \u2022 {when}",
            ))

    if rendered_items:
        body_sx = await render_to_sx(
            "blog-associated-entries-content",
            items=SxExpr("(<> " + " ".join(rendered_items) + ")"),
        )
    else:
        body_sx = await render_to_sx("blog-associated-entries-empty")
    return await render_to_sx("blog-associated-entries-panel", content=SxExpr(body_sx))
async def _render_nav_entries_oob(associated_entries, calendars, post: dict) -> str:
    """Render the out-of-band swap for the entries/calendars nav strip.

    Produces one ``calendar-entry-nav`` item per associated entry followed by
    one ``blog-nav-calendar-item`` per calendar, wrapped in
    ``scroll-nav-wrapper`` with ``oob=True``.  When there is nothing to show,
    the ``blog-nav-entries-empty`` component is rendered instead.
    """
    nav_entries = []
    if associated_entries and hasattr(associated_entries, "entries"):
        nav_entries = associated_entries.entries or []

    if not (nav_entries or calendars):
        return await render_to_sx("blog-nav-entries-empty")

    select_colours = (
        "[.hover-capable_&]:hover:bg-yellow-300"
        " aria-selected:bg-stone-500 aria-selected:text-white"
        " [.hover-capable_&[aria-selected=true]:hover]:bg-orange-500"
    )
    nav_cls = (
        "justify-center cursor-pointer flex flex-row items-center gap-2"
        f" rounded bg-stone-200 text-black {select_colours} p-2"
    )
    # Hyperscript that shows the scroll arrows only when the strip overflows
    # on viewports at least 640px wide.
    scroll_hs = (
        "on load or scroll"
        " if window.innerWidth >= 640 and my.scrollWidth > my.clientWidth"
        " remove .hidden from .entries-nav-arrow add .flex to .entries-nav-arrow"
        " else add .hidden to .entries-nav-arrow remove .flex from .entries-nav-arrow end"
    )
    post_slug = post.get("slug", "")

    def _entry_link(entry):
        """Return ``(path, date label)`` for one associated entry."""
        starts = getattr(entry, "start_at", None)
        ends = getattr(entry, "end_at", None)
        cal_slug = getattr(entry, "calendar_slug", "")
        if not starts:
            # Without a start time, link to the calendar itself.
            return f"/{post_slug}/{cal_slug}/", ""
        path = (
            f"/{post_slug}/{cal_slug}/"
            f"{starts.year}/{starts.month}/{starts.day}"
            f"/entries/{getattr(entry, 'id', '')}/"
        )
        label = starts.strftime("%b %d, %Y at %H:%M")
        if ends:
            label += f" \u2013 {ends.strftime('%H:%M')}"
        return path, label

    rendered = []
    for entry in nav_entries:
        entry_name = getattr(entry, "name", "")
        entry_path, when = _entry_link(entry)
        rendered.append(await render_to_sx(
            "calendar-entry-nav",
            href=entry_path, nav_class=nav_cls, name=entry_name, date_str=when,
        ))

    for cal in (calendars or []):
        cal_label = getattr(cal, "name", "")
        cal_path = f"/{post_slug}/{getattr(cal, 'slug', '')}/"
        rendered.append(await render_to_sx(
            "blog-nav-calendar-item",
            href=cal_path, nav_cls=nav_cls, name=cal_label,
        ))

    items = SxExpr("(<> " + " ".join(rendered) + ")") if rendered else None
    return await render_to_sx(
        "scroll-nav-wrapper",
        wrapper_id="entries-calendars-nav-wrapper",
        container_id="associated-items-container",
        arrow_cls="entries-nav-arrow",
        left_hs="on click set #associated-items-container.scrollLeft to #associated-items-container.scrollLeft - 200",
        scroll_hs=scroll_hs,
        right_hs="on click set #associated-items-container.scrollLeft to #associated-items-container.scrollLeft + 200",
        items=items,
        oob=True,
    )
def register():
    """Build and return the post ``admin`` blueprint.

    All routes are mounted under the ``/admin`` URL prefix and take the post
    ``slug`` from an enclosing route.  Handlers read the current post from
    ``g.post_data["post"]`` and use ``g.s`` as the database session.
    """
    bp = Blueprint("admin", __name__, url_prefix='/admin')

    @bp.put("/features/")
    @require_admin
    async def update_features(slug: str):
        """Update PageConfig.features for a page (page_configs lives in db_cart).

        Accepts either a JSON object of feature flags or a form post with
        ``calendar`` / ``market`` fields, and returns the re-rendered
        features panel.
        """
        from shared.infrastructure.actions import call_action
        from quart import jsonify

        post = g.post_data.get("post")
        if not post or not post.get("is_page"):
            return jsonify({"error": "This is not a page."}), 400
        post_id = post["id"]

        # Parse request body: JSON first, then fall back to form fields.
        body = await request.get_json()
        if body is None:
            form = await request.form
            body = {}
            for key in ("calendar", "market"):
                val = form.get(key)
                if val is not None:
                    body[key] = val in ("true", "1", "on")
        if not isinstance(body, dict):
            return jsonify({"error": "Expected JSON object with feature flags."}), 400

        # NOTE(review): the docstring says page_configs lives in db_cart, but
        # the action is dispatched to "blog" — confirm which is current.
        result = await call_action("blog", "update-page-config", payload={
            "container_type": "page",
            "container_id": post_id,
            "features": body,
        })
        features = result.get("features", {})
        html = await _render_features(features, post, result)
        return sx_response(html)

    # NOTE(review): the blueprint already has url_prefix="/admin", so this
    # route resolves to ".../admin/admin/sumup/" unlike its siblings.  URLs
    # are generated with url_for, so it works; left unchanged because the
    # path is externally visible — confirm whether the doubling is intended.
    @bp.put("/admin/sumup/")
    @require_admin
    async def update_sumup(slug: str):
        """Update PageConfig SumUp credentials for a page (page_configs lives in db_cart).

        Empty merchant code / checkout prefix are sent as ``None``; the API
        key is only sent when a non-empty value was submitted.
        """
        from shared.infrastructure.actions import call_action
        from quart import jsonify

        post = g.post_data.get("post")
        if not post or not post.get("is_page"):
            return jsonify({"error": "This is not a page."}), 400
        post_id = post["id"]

        form = await request.form
        merchant_code = (form.get("merchant_code") or "").strip()
        api_key = (form.get("api_key") or "").strip()
        checkout_prefix = (form.get("checkout_prefix") or "").strip()

        payload: dict = {
            "container_type": "page",
            "container_id": post_id,
            "sumup_merchant_code": merchant_code or None,
            "sumup_checkout_prefix": checkout_prefix or None,
        }
        if api_key:
            payload["sumup_api_key"] = api_key

        result = await call_action("blog", "update-page-config", payload=payload)
        features = result.get("features", {})
        html = await _render_features(features, post, result)
        return sx_response(html)

    @bp.get("/entries/calendar/<int:calendar_id>/")
    @require_admin
    async def calendar_view(slug: str, calendar_id: int):
        """Show calendar month view for browsing entries.

        ``month`` and ``year`` come from the query string; missing or
        out-of-range values fall back to the current UTC month/year.
        Responds 404 when the calendar does not exist or is soft-deleted.
        """
        from shared.models.calendars import Calendar
        from shared.utils.calendar_helpers import parse_int_arg, add_months, build_calendar_weeks
        from shared.infrastructure.data_client import fetch_data
        from shared.contracts.dtos import CalendarEntryDTO, dto_from_dict
        from sqlalchemy import select
        from datetime import MAXYEAR, MINYEAR, datetime, timezone
        import calendar as pycalendar
        from quart import session as qsession
        from ..services.entry_associations import get_post_entry_ids

        # Get month/year from query params
        today = datetime.now(timezone.utc).date()
        month = parse_int_arg("month")
        year = parse_int_arg("year")
        # The year comes straight from the query string.  datetime() only
        # accepts MINYEAR..MAXYEAR and raises ValueError otherwise, which
        # would surface as a 500.  The boundary years are excluded as well
        # because the period end below is the first day of the *following*
        # month (and the grid presumably includes adjacent-month days).
        if year is None or not (MINYEAR < year < MAXYEAR):
            year = today.year
        if month is None or not (1 <= month <= 12):
            month = today.month

        # Load calendar
        result = await g.s.execute(
            select(Calendar).where(Calendar.id == calendar_id, Calendar.deleted_at.is_(None))
        )
        calendar_obj = result.scalar_one_or_none()
        if not calendar_obj:
            return await make_response("Calendar not found", 404)

        # Build calendar data
        prev_month_year, prev_month = add_months(year, month, -1)
        next_month_year, next_month = add_months(year, month, +1)
        prev_year = year - 1
        next_year = year + 1
        weeks = build_calendar_weeks(year, month)
        month_name = pycalendar.month_name[month]
        weekday_names = [pycalendar.day_abbr[i] for i in range(7)]

        # Get entries for this month via events data endpoint.  The period
        # is [first of this month, first of next month).
        period_start = datetime(year, month, 1, tzinfo=timezone.utc)
        period_end = datetime(next_month_year, next_month, 1, tzinfo=timezone.utc)

        user = getattr(g, "user", None)
        user_id = user.id if user else None
        is_admin = bool(user and getattr(user, "is_admin", False))
        session_id = qsession.get("calendar_sid")

        raw_entries = await fetch_data("events", "visible-entries-for-period", params={
            "calendar_id": calendar_obj.id,
            "period_start": period_start.isoformat(),
            "period_end": period_end.isoformat(),
            "user_id": user_id,
            "is_admin": str(is_admin).lower(),
            "session_id": session_id,
        }, required=False) or []
        month_entries = [dto_from_dict(CalendarEntryDTO, e) for e in raw_entries]

        # Get associated entry IDs for this post
        post_id = g.post_data["post"]["id"]
        associated_entry_ids = await get_post_entry_ids(post_id)

        html = _render_calendar_view(
            calendar_obj, year, month, month_name, weekday_names, weeks,
            prev_month, prev_month_year, next_month, next_month_year,
            prev_year, next_year, month_entries, associated_entry_ids,
            g.post_data["post"]["slug"],
        )
        return sx_response(html)

    @bp.post("/entries/<int:entry_id>/toggle/")
    @require_admin
    async def toggle_entry(slug: str, entry_id: int):
        """Toggle the association between this post and a calendar entry.

        Returns the refreshed associated-entries list followed by the
        out-of-band nav update, or a 400 JSON error when the toggle fails.
        """
        from ..services.entry_associations import toggle_entry_association, get_post_entry_ids, get_associated_entries
        from shared.models.calendars import Calendar
        from sqlalchemy import select
        from quart import jsonify

        post_id = g.post_data["post"]["id"]
        is_associated, error = await toggle_entry_association(post_id, entry_id)
        if error:
            return jsonify({"message": error, "errors": {}}), 400
        await g.s.flush()

        # Return updated association status
        associated_entry_ids = await get_post_entry_ids(post_id)

        # Load ALL calendars
        result = await g.s.execute(
            select(Calendar)
            .where(Calendar.deleted_at.is_(None))
            .order_by(Calendar.name.asc())
        )
        all_calendars = result.scalars().all()

        # Load entries and post for each calendar
        for calendar in all_calendars:
            await g.s.refresh(calendar, ["entries", "post"])

        # Fetch associated entries for nav display
        associated_entries = await get_associated_entries(post_id)

        # Load calendars for this post (for nav display)
        calendars = (
            await g.s.execute(
                select(Calendar)
                .where(Calendar.container_type == "page", Calendar.container_id == post_id, Calendar.deleted_at.is_(None))
                .order_by(Calendar.name.asc())
            )
        ).scalars().all()

        # Return the associated entries admin list + OOB update for nav entries
        post = g.post_data["post"]
        admin_list = await _render_associated_entries(all_calendars, associated_entry_ids, post["slug"])
        nav_entries_html = await _render_nav_entries_oob(associated_entries, calendars, post)
        return sx_response(admin_list + nav_entries_html)

    @bp.post("/settings/")
    @require_post_author
    async def settings_save(slug: str):
        """Save the post settings form and redirect back to the settings page.

        Redirects with ``?error=...`` when the optimistic lock on
        ``updated_at`` fails, and with ``?saved=1`` on success.
        """
        from services.post_writer import update_post_settings, OptimisticLockError
        from shared.browser.app.redis_cacher import invalidate_tag_cache

        post_id = g.post_data["post"]["id"]
        form = await request.form
        updated_at = form.get("updated_at", "")

        # Build kwargs — only include fields that were submitted
        kwargs: dict = {}

        # Text fields
        for field in (
            "slug", "custom_template", "meta_title", "meta_description",
            "canonical_url", "og_image", "og_title", "og_description",
            "twitter_image", "twitter_title", "twitter_description",
            "feature_image_alt",
        ):
            val = form.get(field)
            if val is not None:
                kwargs[field] = val.strip()

        # Select fields
        visibility = form.get("visibility")
        if visibility is not None:
            kwargs["visibility"] = visibility

        # Datetime
        published_at = form.get("published_at", "").strip()
        if published_at:
            kwargs["published_at"] = published_at

        # Checkbox fields: present = True, absent = False
        kwargs["featured"] = form.get("featured") == "on"
        kwargs["email_only"] = form.get("email_only") == "on"

        # Tags — comma-separated string → list of names
        tags_str = form.get("tags", "").strip()
        tag_names = [t.strip() for t in tags_str.split(",") if t.strip()] if tags_str else []

        try:
            post = await update_post_settings(
                g.s,
                post_id=post_id,
                expected_updated_at=updated_at,
                tag_names=tag_names,
                **kwargs,
            )
        except OptimisticLockError:
            from urllib.parse import quote
            return redirect(
                host_url(url_for("defpage_post_settings", slug=slug))
                + "?error=" + quote("Someone else edited this post. Please reload and try again.")
            )
        await g.s.flush()

        # Clear caches
        await invalidate_tag_cache("blog")
        await invalidate_tag_cache("post.post_detail")

        # Redirect using the (possibly new) slug
        return redirect(host_url(url_for("defpage_post_settings", slug=post.slug)) + "?saved=1")

    @bp.post("/edit/")
    @require_post_author
    async def edit_save(slug: str):
        """Save the editor form (title, lexical content, status, images).

        The lexical JSON is parsed and validated first; failures redirect
        back to the editor with ``?error=...``.  A non-admin asking to
        publish an unpublished post leaves the status untouched and sets
        ``publish_requested`` on the post instead.
        """
        import json
        from urllib.parse import quote
        from ...blog.ghost.lexical_validator import validate_lexical
        from services.post_writer import update_post as writer_update, OptimisticLockError
        from shared.browser.app.redis_cacher import invalidate_tag_cache

        post_id = g.post_data["post"]["id"]
        form = await request.form
        title = form.get("title", "").strip()
        lexical_raw = form.get("lexical", "")
        updated_at = form.get("updated_at", "")
        status = form.get("status", "draft")
        feature_image = form.get("feature_image", "").strip()
        custom_excerpt = form.get("custom_excerpt", "").strip()
        feature_image_caption = form.get("feature_image_caption", "").strip()

        # Validate the lexical JSON
        try:
            lexical_doc = json.loads(lexical_raw)
        except (json.JSONDecodeError, TypeError):
            return redirect(host_url(url_for("defpage_post_edit", slug=slug)) + "?error=" + quote("Invalid JSON in editor content."))
        ok, reason = validate_lexical(lexical_doc)
        if not ok:
            return redirect(host_url(url_for("defpage_post_edit", slug=slug)) + "?error=" + quote(reason))

        # Publish workflow
        is_admin = bool((g.get("rights") or {}).get("admin"))
        publish_requested_msg = None

        # Determine effective status (None means "leave the status alone")
        effective_status: str | None = None
        current_status = g.post_data["post"].get("status", "draft")
        if status == "published" and current_status != "published" and not is_admin:
            # Non-admin requesting publish: keep as draft, set local flag
            publish_requested_msg = "Publish requested — an admin will review."
        elif status and status != current_status:
            effective_status = status

        sx_content_raw = form.get("sx_content", "").strip() or None

        # Build optional kwargs — only pass sx_content if the form field was present
        extra_kw: dict = {}
        if "sx_content" in form:
            extra_kw["sx_content"] = sx_content_raw

        try:
            post = await writer_update(
                g.s,
                post_id=post_id,
                lexical_json=lexical_raw,
                title=title or None,
                expected_updated_at=updated_at,
                feature_image=feature_image or None,
                custom_excerpt=custom_excerpt or None,
                feature_image_caption=feature_image_caption or None,
                status=effective_status,
                **extra_kw,
            )
        except OptimisticLockError:
            return redirect(
                host_url(url_for("defpage_post_edit", slug=slug))
                + "?error=" + quote("Someone else edited this post. Please reload and try again.")
            )

        # Handle publish_requested flag
        if publish_requested_msg:
            post.publish_requested = True
        elif status == "published" and is_admin:
            post.publish_requested = False
        await g.s.flush()

        # Clear caches
        await invalidate_tag_cache("blog")
        await invalidate_tag_cache("post.post_detail")

        # Redirect to GET (PRG pattern) — use post.slug in case it changed
        redirect_url = host_url(url_for("defpage_post_edit", slug=post.slug)) + "?saved=1"
        if publish_requested_msg:
            redirect_url += "&publish_requested=1"
        return redirect(redirect_url)

    async def _fetch_page_markets(post_id):
        """Fetch marketplaces for a page via market data endpoint."""
        from shared.infrastructure.data_client import fetch_data
        from shared.contracts.dtos import MarketPlaceDTO, dto_from_dict

        raw = await fetch_data("market", "marketplaces-for-container",
                               params={"type": "page", "id": post_id}, required=False) or []
        return [dto_from_dict(MarketPlaceDTO, m) for m in raw]

    async def _markets_panel_response(post, post_id):
        """Render the current markets list for *post* as an sx response.

        Shared by the list, create and delete handlers so all three return
        the panel built the same way.
        """
        page_markets = await _fetch_page_markets(post_id)
        page_slug = post.get("slug", "")
        create_url = host_url(url_for("blog.post.admin.create_market", slug=page_slug))
        html = await render_to_sx("blog-markets-panel-content",
                                  markets=_serialize_markets(page_markets, page_slug),
                                  create_url=create_url)
        return sx_response(html)

    @bp.get("/markets/")
    @require_admin
    async def markets(slug: str):
        """List markets for this page."""
        post = (g.post_data or {}).get("post", {})
        post_id = post.get("id")
        if not post_id:
            return await make_response("Post not found", 404)
        return await _markets_panel_response(post, post_id)

    @bp.post("/markets/new/")
    @require_admin
    async def create_market(slug: str):
        """Create a new market for this page."""
        from ..services.markets import create_market as _create_market, MarketError
        from quart import jsonify

        post = (g.post_data or {}).get("post", {})
        post_id = post.get("id")
        if not post_id:
            return jsonify({"error": "Post not found"}), 404

        form = await request.form
        name = (form.get("name") or "").strip()
        try:
            await _create_market(g.s, post_id, name)
        except MarketError as e:
            return jsonify({"error": str(e)}), 400

        # Return updated markets list
        return await _markets_panel_response(post, post_id)

    @bp.delete("/markets/<market_slug>/")
    @require_admin
    async def delete_market(slug: str, market_slug: str):
        """Soft-delete a market."""
        from ..services.markets import soft_delete_market
        from quart import jsonify

        post = (g.post_data or {}).get("post", {})
        post_id = post.get("id")
        # Same guard as the sibling handlers: without a post id the refreshed
        # list below would be fetched for container id None.
        if not post_id:
            return jsonify({"error": "Post not found"}), 404

        deleted = await soft_delete_market(g.s, slug, market_slug)
        if not deleted:
            return jsonify({"error": "Market not found"}), 404

        # Return updated markets list
        return await _markets_panel_response(post, post_id)

    return bp