All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 3m41s
Replace Python GET page handlers with declarative defpage definitions in .sx files across all 8 apps (sx docs, orders, account, market, cart, federation, events, blog). Each app now has sxc/pages/ with setup functions, layout registrations, page helpers, and .sx defpage declarations. Core infrastructure: add g I/O primitive, PageDef support for auth/layout/ data/content/filter/aside/menu slots, post_author auth level, and custom layout registration. Remove ~1400 lines of render_*_page/render_*_oob boilerplate. Update all endpoint references in routes, sx_components, and templates to defpage_* naming. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
638 lines
24 KiB
Python
638 lines
24 KiB
Python
from __future__ import annotations
|
|
|
|
|
|
from quart import (
|
|
make_response,
|
|
Blueprint,
|
|
g,
|
|
request,
|
|
redirect,
|
|
url_for,
|
|
)
|
|
from shared.browser.app.authz import require_admin, require_post_author
|
|
from shared.browser.app.utils.htmx import is_htmx_request
|
|
from shared.sx.helpers import sx_response
|
|
from shared.utils import host_url
|
|
|
|
def _post_to_edit_dict(post) -> dict:
|
|
"""Convert an ORM Post to a dict matching the shape templates expect.
|
|
|
|
The templates were written for Ghost Admin API responses, so we mimic
|
|
that structure (dot-access on dicts via Jinja) from ORM columns.
|
|
"""
|
|
d: dict = {}
|
|
for col in (
|
|
"id", "slug", "title", "html", "plaintext", "lexical", "mobiledoc",
|
|
"sx_content",
|
|
"feature_image", "feature_image_alt", "feature_image_caption",
|
|
"excerpt", "custom_excerpt", "visibility", "status", "featured",
|
|
"is_page", "email_only", "canonical_url",
|
|
"meta_title", "meta_description",
|
|
"og_image", "og_title", "og_description",
|
|
"twitter_image", "twitter_title", "twitter_description",
|
|
"custom_template", "reading_time", "comment_id",
|
|
):
|
|
d[col] = getattr(post, col, None)
|
|
|
|
# Timestamps as ISO strings (templates do [:16] slicing)
|
|
for ts in ("published_at", "updated_at", "created_at"):
|
|
val = getattr(post, ts, None)
|
|
d[ts] = val.isoformat() if val else ""
|
|
|
|
# Tags as list of dicts with .name (for Jinja map(attribute='name'))
|
|
if hasattr(post, "tags") and post.tags:
|
|
d["tags"] = [{"name": t.name, "slug": t.slug, "id": t.id} for t in post.tags]
|
|
else:
|
|
d["tags"] = []
|
|
|
|
# email/newsletter — not available without Ghost, set safe defaults
|
|
d["email"] = None
|
|
d["newsletter"] = None
|
|
|
|
return d
|
|
|
|
|
|
def register():
|
|
bp = Blueprint("admin", __name__, url_prefix='/admin')
|
|
|
|
@bp.before_request
|
|
async def _prepare_page_data():
|
|
ep = request.endpoint or ""
|
|
if "defpage_post_admin" in ep:
|
|
from sqlalchemy import select
|
|
from shared.models.page_config import PageConfig
|
|
post = (g.post_data or {}).get("post", {})
|
|
features = {}
|
|
sumup_configured = False
|
|
sumup_merchant_code = ""
|
|
sumup_checkout_prefix = ""
|
|
if post.get("is_page"):
|
|
pc = (await g.s.execute(
|
|
select(PageConfig).where(
|
|
PageConfig.container_type == "page",
|
|
PageConfig.container_id == post["id"],
|
|
)
|
|
)).scalar_one_or_none()
|
|
if pc:
|
|
features = pc.features or {}
|
|
sumup_configured = bool(pc.sumup_api_key)
|
|
sumup_merchant_code = pc.sumup_merchant_code or ""
|
|
sumup_checkout_prefix = pc.sumup_checkout_prefix or ""
|
|
from shared.sx.page import get_template_context
|
|
from sx.sx_components import _post_admin_main_panel_sx
|
|
tctx = await get_template_context()
|
|
tctx.update({
|
|
"features": features,
|
|
"sumup_configured": sumup_configured,
|
|
"sumup_merchant_code": sumup_merchant_code,
|
|
"sumup_checkout_prefix": sumup_checkout_prefix,
|
|
})
|
|
g.post_admin_content = _post_admin_main_panel_sx(tctx)
|
|
|
|
elif "defpage_post_data" in ep:
|
|
from shared.sx.page import get_template_context
|
|
from sx.sx_components import _post_data_content_sx
|
|
tctx = await get_template_context()
|
|
g.post_data_content = _post_data_content_sx(tctx)
|
|
|
|
elif "defpage_post_preview" in ep:
|
|
from models.ghost_content import Post
|
|
from sqlalchemy import select as sa_select
|
|
post_id = g.post_data["post"]["id"]
|
|
post = (await g.s.execute(
|
|
sa_select(Post).where(Post.id == post_id)
|
|
)).scalar_one_or_none()
|
|
preview_ctx = {}
|
|
sx_content = getattr(post, "sx_content", None) or ""
|
|
if sx_content:
|
|
from shared.sx.prettify import sx_to_pretty_sx
|
|
preview_ctx["sx_pretty"] = sx_to_pretty_sx(sx_content)
|
|
lexical_raw = getattr(post, "lexical", None) or ""
|
|
if lexical_raw:
|
|
from shared.sx.prettify import json_to_pretty_sx
|
|
preview_ctx["json_pretty"] = json_to_pretty_sx(lexical_raw)
|
|
if sx_content:
|
|
from shared.sx.parser import parse as sx_parse
|
|
from shared.sx.html import render as sx_html_render
|
|
from shared.sx.jinja_bridge import _COMPONENT_ENV
|
|
try:
|
|
parsed = sx_parse(sx_content)
|
|
preview_ctx["sx_rendered"] = sx_html_render(parsed, dict(_COMPONENT_ENV))
|
|
except Exception:
|
|
preview_ctx["sx_rendered"] = "<em>Error rendering sx</em>"
|
|
if lexical_raw:
|
|
from bp.blog.ghost.lexical_renderer import render_lexical
|
|
try:
|
|
preview_ctx["lex_rendered"] = render_lexical(lexical_raw)
|
|
except Exception:
|
|
preview_ctx["lex_rendered"] = "<em>Error rendering lexical</em>"
|
|
from shared.sx.page import get_template_context
|
|
from sx.sx_components import _preview_main_panel_sx
|
|
tctx = await get_template_context()
|
|
tctx.update(preview_ctx)
|
|
g.post_preview_content = _preview_main_panel_sx(tctx)
|
|
|
|
elif "defpage_post_entries" in ep:
|
|
from sqlalchemy import select
|
|
from shared.models.calendars import Calendar
|
|
from ..services.entry_associations import get_post_entry_ids
|
|
post_id = g.post_data["post"]["id"]
|
|
associated_entry_ids = await get_post_entry_ids(post_id)
|
|
result = await g.s.execute(
|
|
select(Calendar)
|
|
.where(Calendar.deleted_at.is_(None))
|
|
.order_by(Calendar.name.asc())
|
|
)
|
|
all_calendars = result.scalars().all()
|
|
for calendar in all_calendars:
|
|
await g.s.refresh(calendar, ["entries", "post"])
|
|
from shared.sx.page import get_template_context
|
|
from sx.sx_components import _post_entries_content_sx
|
|
tctx = await get_template_context()
|
|
tctx["all_calendars"] = all_calendars
|
|
tctx["associated_entry_ids"] = associated_entry_ids
|
|
g.post_entries_content = _post_entries_content_sx(tctx)
|
|
|
|
elif "defpage_post_settings" in ep:
|
|
from models.ghost_content import Post
|
|
from sqlalchemy import select as sa_select
|
|
from sqlalchemy.orm import selectinload
|
|
post_id = g.post_data["post"]["id"]
|
|
post = (await g.s.execute(
|
|
sa_select(Post)
|
|
.where(Post.id == post_id)
|
|
.options(selectinload(Post.tags))
|
|
)).scalar_one_or_none()
|
|
ghost_post = _post_to_edit_dict(post) if post else {}
|
|
save_success = request.args.get("saved") == "1"
|
|
from shared.sx.page import get_template_context
|
|
from sx.sx_components import _post_settings_content_sx
|
|
tctx = await get_template_context()
|
|
tctx["ghost_post"] = ghost_post
|
|
tctx["save_success"] = save_success
|
|
g.post_settings_content = _post_settings_content_sx(tctx)
|
|
|
|
elif "defpage_post_edit" in ep:
|
|
from models.ghost_content import Post
|
|
from sqlalchemy import select as sa_select
|
|
from sqlalchemy.orm import selectinload
|
|
from shared.infrastructure.data_client import fetch_data
|
|
post_id = g.post_data["post"]["id"]
|
|
post = (await g.s.execute(
|
|
sa_select(Post)
|
|
.where(Post.id == post_id)
|
|
.options(selectinload(Post.tags))
|
|
)).scalar_one_or_none()
|
|
ghost_post = _post_to_edit_dict(post) if post else {}
|
|
save_success = request.args.get("saved") == "1"
|
|
save_error = request.args.get("error", "")
|
|
raw_newsletters = await fetch_data("account", "newsletters", required=False) or []
|
|
from types import SimpleNamespace
|
|
newsletters = [SimpleNamespace(**nl) for nl in raw_newsletters]
|
|
from shared.sx.page import get_template_context
|
|
from sx.sx_components import _post_edit_content_sx
|
|
tctx = await get_template_context()
|
|
tctx["ghost_post"] = ghost_post
|
|
tctx["save_success"] = save_success
|
|
tctx["save_error"] = save_error
|
|
tctx["newsletters"] = newsletters
|
|
g.post_edit_content = _post_edit_content_sx(tctx)
|
|
|
|
from shared.sx.pages import mount_pages
|
|
mount_pages(bp, "blog", names=[
|
|
"post-admin", "post-data", "post-preview",
|
|
"post-entries", "post-settings", "post-edit",
|
|
])
|
|
|
|
@bp.put("/features/")
|
|
@require_admin
|
|
async def update_features(slug: str):
|
|
"""Update PageConfig.features for a page (page_configs lives in db_cart)."""
|
|
from shared.infrastructure.actions import call_action
|
|
from quart import jsonify
|
|
|
|
post = g.post_data.get("post")
|
|
if not post or not post.get("is_page"):
|
|
return jsonify({"error": "This is not a page."}), 400
|
|
|
|
post_id = post["id"]
|
|
|
|
# Parse request body
|
|
body = await request.get_json()
|
|
if body is None:
|
|
form = await request.form
|
|
body = {}
|
|
for key in ("calendar", "market"):
|
|
val = form.get(key)
|
|
if val is not None:
|
|
body[key] = val in ("true", "1", "on")
|
|
|
|
if not isinstance(body, dict):
|
|
return jsonify({"error": "Expected JSON object with feature flags."}), 400
|
|
|
|
# Update via cart action (page_configs lives in db_cart)
|
|
result = await call_action("blog", "update-page-config", payload={
|
|
"container_type": "page",
|
|
"container_id": post_id,
|
|
"features": body,
|
|
})
|
|
|
|
features = result.get("features", {})
|
|
|
|
from sx.sx_components import render_features_panel
|
|
html = render_features_panel(
|
|
features, post,
|
|
sumup_configured=result.get("sumup_configured", False),
|
|
sumup_merchant_code=result.get("sumup_merchant_code") or "",
|
|
sumup_checkout_prefix=result.get("sumup_checkout_prefix") or "",
|
|
)
|
|
return sx_response(html)
|
|
|
|
@bp.put("/admin/sumup/")
|
|
@require_admin
|
|
async def update_sumup(slug: str):
|
|
"""Update PageConfig SumUp credentials for a page (page_configs lives in db_cart)."""
|
|
from shared.infrastructure.actions import call_action
|
|
from quart import jsonify
|
|
|
|
post = g.post_data.get("post")
|
|
if not post or not post.get("is_page"):
|
|
return jsonify({"error": "This is not a page."}), 400
|
|
|
|
post_id = post["id"]
|
|
|
|
form = await request.form
|
|
merchant_code = (form.get("merchant_code") or "").strip()
|
|
api_key = (form.get("api_key") or "").strip()
|
|
checkout_prefix = (form.get("checkout_prefix") or "").strip()
|
|
|
|
payload: dict = {
|
|
"container_type": "page",
|
|
"container_id": post_id,
|
|
"sumup_merchant_code": merchant_code or None,
|
|
"sumup_checkout_prefix": checkout_prefix or None,
|
|
}
|
|
if api_key:
|
|
payload["sumup_api_key"] = api_key
|
|
|
|
result = await call_action("blog", "update-page-config", payload=payload)
|
|
|
|
features = result.get("features", {})
|
|
from sx.sx_components import render_features_panel
|
|
html = render_features_panel(
|
|
features, post,
|
|
sumup_configured=result.get("sumup_configured", False),
|
|
sumup_merchant_code=result.get("sumup_merchant_code") or "",
|
|
sumup_checkout_prefix=result.get("sumup_checkout_prefix") or "",
|
|
)
|
|
return sx_response(html)
|
|
|
|
@bp.get("/entries/calendar/<int:calendar_id>/")
|
|
@require_admin
|
|
async def calendar_view(slug: str, calendar_id: int):
|
|
"""Show calendar month view for browsing entries"""
|
|
from shared.models.calendars import Calendar
|
|
from shared.utils.calendar_helpers import parse_int_arg, add_months, build_calendar_weeks
|
|
from shared.infrastructure.data_client import fetch_data
|
|
from shared.contracts.dtos import CalendarEntryDTO, dto_from_dict
|
|
from sqlalchemy import select
|
|
from datetime import datetime, timezone
|
|
import calendar as pycalendar
|
|
from quart import session as qsession
|
|
from ..services.entry_associations import get_post_entry_ids
|
|
|
|
# Get month/year from query params
|
|
today = datetime.now(timezone.utc).date()
|
|
month = parse_int_arg("month")
|
|
year = parse_int_arg("year")
|
|
|
|
if year is None:
|
|
year = today.year
|
|
if month is None or not (1 <= month <= 12):
|
|
month = today.month
|
|
|
|
# Load calendar
|
|
result = await g.s.execute(
|
|
select(Calendar).where(Calendar.id == calendar_id, Calendar.deleted_at.is_(None))
|
|
)
|
|
calendar_obj = result.scalar_one_or_none()
|
|
if not calendar_obj:
|
|
return await make_response("Calendar not found", 404)
|
|
|
|
# Build calendar data
|
|
prev_month_year, prev_month = add_months(year, month, -1)
|
|
next_month_year, next_month = add_months(year, month, +1)
|
|
prev_year = year - 1
|
|
next_year = year + 1
|
|
|
|
weeks = build_calendar_weeks(year, month)
|
|
month_name = pycalendar.month_name[month]
|
|
weekday_names = [pycalendar.day_abbr[i] for i in range(7)]
|
|
|
|
# Get entries for this month via events data endpoint
|
|
period_start = datetime(year, month, 1, tzinfo=timezone.utc)
|
|
next_y, next_m = add_months(year, month, +1)
|
|
period_end = datetime(next_y, next_m, 1, tzinfo=timezone.utc)
|
|
|
|
user = getattr(g, "user", None)
|
|
user_id = user.id if user else None
|
|
is_admin = bool(user and getattr(user, "is_admin", False))
|
|
session_id = qsession.get("calendar_sid")
|
|
|
|
raw_entries = await fetch_data("events", "visible-entries-for-period", params={
|
|
"calendar_id": calendar_obj.id,
|
|
"period_start": period_start.isoformat(),
|
|
"period_end": period_end.isoformat(),
|
|
"user_id": user_id,
|
|
"is_admin": str(is_admin).lower(),
|
|
"session_id": session_id,
|
|
}, required=False) or []
|
|
month_entries = [dto_from_dict(CalendarEntryDTO, e) for e in raw_entries]
|
|
|
|
# Get associated entry IDs for this post
|
|
post_id = g.post_data["post"]["id"]
|
|
associated_entry_ids = await get_post_entry_ids(post_id)
|
|
|
|
from sx.sx_components import render_calendar_view
|
|
html = render_calendar_view(
|
|
calendar_obj, year, month, month_name, weekday_names, weeks,
|
|
prev_month, prev_month_year, next_month, next_month_year,
|
|
prev_year, next_year, month_entries, associated_entry_ids,
|
|
g.post_data["post"]["slug"],
|
|
)
|
|
return sx_response(html)
|
|
|
|
@bp.post("/entries/<int:entry_id>/toggle/")
|
|
@require_admin
|
|
async def toggle_entry(slug: str, entry_id: int):
|
|
from ..services.entry_associations import toggle_entry_association, get_post_entry_ids, get_associated_entries
|
|
from shared.models.calendars import Calendar
|
|
from sqlalchemy import select
|
|
from quart import jsonify
|
|
|
|
post_id = g.post_data["post"]["id"]
|
|
is_associated, error = await toggle_entry_association(post_id, entry_id)
|
|
|
|
if error:
|
|
return jsonify({"message": error, "errors": {}}), 400
|
|
|
|
await g.s.flush()
|
|
|
|
# Return updated association status
|
|
associated_entry_ids = await get_post_entry_ids(post_id)
|
|
|
|
# Load ALL calendars
|
|
result = await g.s.execute(
|
|
select(Calendar)
|
|
.where(Calendar.deleted_at.is_(None))
|
|
.order_by(Calendar.name.asc())
|
|
)
|
|
all_calendars = result.scalars().all()
|
|
|
|
# Load entries and post for each calendar
|
|
for calendar in all_calendars:
|
|
await g.s.refresh(calendar, ["entries", "post"])
|
|
|
|
# Fetch associated entries for nav display
|
|
associated_entries = await get_associated_entries(post_id)
|
|
|
|
# Load calendars for this post (for nav display)
|
|
calendars = (
|
|
await g.s.execute(
|
|
select(Calendar)
|
|
.where(Calendar.container_type == "page", Calendar.container_id == post_id, Calendar.deleted_at.is_(None))
|
|
.order_by(Calendar.name.asc())
|
|
)
|
|
).scalars().all()
|
|
|
|
# Return the associated entries admin list + OOB update for nav entries
|
|
from sx.sx_components import render_associated_entries, render_nav_entries_oob
|
|
|
|
post = g.post_data["post"]
|
|
admin_list = render_associated_entries(all_calendars, associated_entry_ids, post["slug"])
|
|
nav_entries_html = render_nav_entries_oob(associated_entries, calendars, post)
|
|
|
|
return sx_response(admin_list + nav_entries_html)
|
|
|
|
@bp.post("/settings/")
|
|
@require_post_author
|
|
async def settings_save(slug: str):
|
|
from services.post_writer import update_post_settings, OptimisticLockError
|
|
from shared.browser.app.redis_cacher import invalidate_tag_cache
|
|
|
|
post_id = g.post_data["post"]["id"]
|
|
form = await request.form
|
|
|
|
updated_at = form.get("updated_at", "")
|
|
|
|
# Build kwargs — only include fields that were submitted
|
|
kwargs: dict = {}
|
|
|
|
# Text fields
|
|
for field in (
|
|
"slug", "custom_template", "meta_title", "meta_description",
|
|
"canonical_url", "og_image", "og_title", "og_description",
|
|
"twitter_image", "twitter_title", "twitter_description",
|
|
"feature_image_alt",
|
|
):
|
|
val = form.get(field)
|
|
if val is not None:
|
|
kwargs[field] = val.strip()
|
|
|
|
# Select fields
|
|
visibility = form.get("visibility")
|
|
if visibility is not None:
|
|
kwargs["visibility"] = visibility
|
|
|
|
# Datetime
|
|
published_at = form.get("published_at", "").strip()
|
|
if published_at:
|
|
kwargs["published_at"] = published_at
|
|
|
|
# Checkbox fields: present = True, absent = False
|
|
kwargs["featured"] = form.get("featured") == "on"
|
|
kwargs["email_only"] = form.get("email_only") == "on"
|
|
|
|
# Tags — comma-separated string → list of names
|
|
tags_str = form.get("tags", "").strip()
|
|
tag_names = [t.strip() for t in tags_str.split(",") if t.strip()] if tags_str else []
|
|
|
|
try:
|
|
post = await update_post_settings(
|
|
g.s,
|
|
post_id=post_id,
|
|
expected_updated_at=updated_at,
|
|
tag_names=tag_names,
|
|
**kwargs,
|
|
)
|
|
except OptimisticLockError:
|
|
from urllib.parse import quote
|
|
return redirect(
|
|
host_url(url_for("blog.post.admin.defpage_post_settings", slug=slug))
|
|
+ "?error=" + quote("Someone else edited this post. Please reload and try again.")
|
|
)
|
|
|
|
await g.s.flush()
|
|
|
|
# Clear caches
|
|
await invalidate_tag_cache("blog")
|
|
await invalidate_tag_cache("post.post_detail")
|
|
|
|
# Redirect using the (possibly new) slug
|
|
return redirect(host_url(url_for("blog.post.admin.defpage_post_settings", slug=post.slug)) + "?saved=1")
|
|
|
|
@bp.post("/edit/")
|
|
@require_post_author
|
|
async def edit_save(slug: str):
|
|
import json
|
|
from ...blog.ghost.lexical_validator import validate_lexical
|
|
from services.post_writer import update_post as writer_update, OptimisticLockError
|
|
from shared.browser.app.redis_cacher import invalidate_tag_cache
|
|
|
|
post_id = g.post_data["post"]["id"]
|
|
form = await request.form
|
|
title = form.get("title", "").strip()
|
|
lexical_raw = form.get("lexical", "")
|
|
updated_at = form.get("updated_at", "")
|
|
status = form.get("status", "draft")
|
|
feature_image = form.get("feature_image", "").strip()
|
|
custom_excerpt = form.get("custom_excerpt", "").strip()
|
|
feature_image_caption = form.get("feature_image_caption", "").strip()
|
|
|
|
# Validate the lexical JSON
|
|
from urllib.parse import quote
|
|
try:
|
|
lexical_doc = json.loads(lexical_raw)
|
|
except (json.JSONDecodeError, TypeError):
|
|
return redirect(host_url(url_for("blog.post.admin.defpage_post_edit", slug=slug)) + "?error=" + quote("Invalid JSON in editor content."))
|
|
|
|
ok, reason = validate_lexical(lexical_doc)
|
|
if not ok:
|
|
return redirect(host_url(url_for("blog.post.admin.defpage_post_edit", slug=slug)) + "?error=" + quote(reason))
|
|
|
|
# Publish workflow
|
|
is_admin = bool((g.get("rights") or {}).get("admin"))
|
|
publish_requested_msg = None
|
|
|
|
# Determine effective status
|
|
effective_status: str | None = None
|
|
current_status = g.post_data["post"].get("status", "draft")
|
|
|
|
if status == "published" and current_status != "published" and not is_admin:
|
|
# Non-admin requesting publish: keep as draft, set local flag
|
|
publish_requested_msg = "Publish requested — an admin will review."
|
|
elif status and status != current_status:
|
|
effective_status = status
|
|
|
|
sx_content_raw = form.get("sx_content", "").strip() or None
|
|
# Build optional kwargs — only pass sx_content if the form field was present
|
|
extra_kw: dict = {}
|
|
if "sx_content" in form:
|
|
extra_kw["sx_content"] = sx_content_raw
|
|
try:
|
|
post = await writer_update(
|
|
g.s,
|
|
post_id=post_id,
|
|
lexical_json=lexical_raw,
|
|
title=title or None,
|
|
expected_updated_at=updated_at,
|
|
feature_image=feature_image or None,
|
|
custom_excerpt=custom_excerpt or None,
|
|
feature_image_caption=feature_image_caption or None,
|
|
status=effective_status,
|
|
**extra_kw,
|
|
)
|
|
except OptimisticLockError:
|
|
return redirect(
|
|
host_url(url_for("blog.post.admin.defpage_post_edit", slug=slug))
|
|
+ "?error=" + quote("Someone else edited this post. Please reload and try again.")
|
|
)
|
|
|
|
# Handle publish_requested flag
|
|
if publish_requested_msg:
|
|
post.publish_requested = True
|
|
elif status == "published" and is_admin:
|
|
post.publish_requested = False
|
|
await g.s.flush()
|
|
|
|
# Clear caches
|
|
await invalidate_tag_cache("blog")
|
|
await invalidate_tag_cache("post.post_detail")
|
|
|
|
# Redirect to GET (PRG pattern) — use post.slug in case it changed
|
|
redirect_url = host_url(url_for("blog.post.admin.defpage_post_edit", slug=post.slug)) + "?saved=1"
|
|
if publish_requested_msg:
|
|
redirect_url += "&publish_requested=1"
|
|
return redirect(redirect_url)
|
|
|
|
|
|
async def _fetch_page_markets(post_id):
|
|
"""Fetch marketplaces for a page via market data endpoint."""
|
|
from shared.infrastructure.data_client import fetch_data
|
|
from shared.contracts.dtos import MarketPlaceDTO, dto_from_dict
|
|
raw = await fetch_data("market", "marketplaces-for-container",
|
|
params={"type": "page", "id": post_id}, required=False) or []
|
|
return [dto_from_dict(MarketPlaceDTO, m) for m in raw]
|
|
|
|
@bp.get("/markets/")
|
|
@require_admin
|
|
async def markets(slug: str):
|
|
"""List markets for this page."""
|
|
post = (g.post_data or {}).get("post", {})
|
|
post_id = post.get("id")
|
|
if not post_id:
|
|
return await make_response("Post not found", 404)
|
|
|
|
page_markets = await _fetch_page_markets(post_id)
|
|
|
|
from sx.sx_components import render_markets_panel
|
|
return sx_response(render_markets_panel(page_markets, post))
|
|
|
|
@bp.post("/markets/new/")
|
|
@require_admin
|
|
async def create_market(slug: str):
|
|
"""Create a new market for this page."""
|
|
from ..services.markets import create_market as _create_market, MarketError
|
|
from quart import jsonify
|
|
|
|
post = (g.post_data or {}).get("post", {})
|
|
post_id = post.get("id")
|
|
if not post_id:
|
|
return jsonify({"error": "Post not found"}), 404
|
|
|
|
form = await request.form
|
|
name = (form.get("name") or "").strip()
|
|
|
|
try:
|
|
await _create_market(g.s, post_id, name)
|
|
except MarketError as e:
|
|
return jsonify({"error": str(e)}), 400
|
|
|
|
# Return updated markets list
|
|
page_markets = await _fetch_page_markets(post_id)
|
|
|
|
from sx.sx_components import render_markets_panel
|
|
return sx_response(render_markets_panel(page_markets, post))
|
|
|
|
@bp.delete("/markets/<market_slug>/")
|
|
@require_admin
|
|
async def delete_market(slug: str, market_slug: str):
|
|
"""Soft-delete a market."""
|
|
from ..services.markets import soft_delete_market
|
|
from quart import jsonify
|
|
|
|
post = (g.post_data or {}).get("post", {})
|
|
post_id = post.get("id")
|
|
|
|
deleted = await soft_delete_market(g.s, slug, market_slug)
|
|
if not deleted:
|
|
return jsonify({"error": "Market not found"}), 404
|
|
|
|
# Return updated markets list
|
|
page_markets = await _fetch_page_markets(post_id)
|
|
|
|
from sx.sx_components import render_markets_panel
|
|
return sx_response(render_markets_panel(page_markets, post))
|
|
|
|
return bp
|