Fix page editing: use Ghost /pages/ API for pages, not /posts/
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 56s

Ghost has separate /posts/ and /pages/ endpoints. All admin functions
(get_post_for_edit, update_post, update_post_settings, sync) were
hardcoded to /posts/, causing 404s when editing pages. Now checks
is_page from post_data and uses the correct endpoint.

Also uses sync_single_page (not sync_single_post) after page saves
to prevent IntegrityError from mismatched fetch/upsert paths.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
giles
2026-02-21 22:03:41 +00:00
parent 7f52f59fe0
commit 23fe8c233e
2 changed files with 37 additions and 19 deletions

View File

@@ -30,10 +30,11 @@ def _check(resp: httpx.Response) -> None:
resp.raise_for_status() resp.raise_for_status()
async def get_post_for_edit(ghost_id: str) -> dict | None: async def get_post_for_edit(ghost_id: str, *, is_page: bool = False) -> dict | None:
"""Fetch a single post by Ghost ID, including lexical source.""" """Fetch a single post/page by Ghost ID, including lexical source."""
resource = "pages" if is_page else "posts"
url = ( url = (
f"{GHOST_ADMIN_API_URL}/posts/{ghost_id}/" f"{GHOST_ADMIN_API_URL}/{resource}/{ghost_id}/"
"?formats=lexical,html,mobiledoc&include=newsletters" "?formats=lexical,html,mobiledoc&include=newsletters"
) )
async with httpx.AsyncClient(timeout=30) as client: async with httpx.AsyncClient(timeout=30) as client:
@@ -41,7 +42,7 @@ async def get_post_for_edit(ghost_id: str) -> dict | None:
if resp.status_code == 404: if resp.status_code == 404:
return None return None
_check(resp) _check(resp)
return resp.json()["posts"][0] return resp.json()[resource][0]
async def create_post( async def create_post(
@@ -114,6 +115,7 @@ async def update_post(
newsletter_slug: str | None = None, newsletter_slug: str | None = None,
email_segment: str | None = None, email_segment: str | None = None,
email_only: bool | None = None, email_only: bool | None = None,
is_page: bool = False,
) -> dict: ) -> dict:
"""Update an existing Ghost post. Returns the updated post dict. """Update an existing Ghost post. Returns the updated post dict.
@@ -141,9 +143,10 @@ async def update_post(
post_body["status"] = status post_body["status"] = status
if email_only: if email_only:
post_body["email_only"] = True post_body["email_only"] = True
payload = {"posts": [post_body]} resource = "pages" if is_page else "posts"
payload = {resource: [post_body]}
url = f"{GHOST_ADMIN_API_URL}/posts/{ghost_id}/" url = f"{GHOST_ADMIN_API_URL}/{resource}/{ghost_id}/"
if newsletter_slug: if newsletter_slug:
url += f"?newsletter={newsletter_slug}" url += f"?newsletter={newsletter_slug}"
if email_segment: if email_segment:
@@ -151,7 +154,7 @@ async def update_post(
async with httpx.AsyncClient(timeout=30) as client: async with httpx.AsyncClient(timeout=30) as client:
resp = await client.put(url, json=payload, headers=_auth_header()) resp = await client.put(url, json=payload, headers=_auth_header())
_check(resp) _check(resp)
return resp.json()["posts"][0] return resp.json()[resource][0]
_SETTINGS_FIELDS = ( _SETTINGS_FIELDS = (
@@ -178,22 +181,24 @@ _SETTINGS_FIELDS = (
async def update_post_settings( async def update_post_settings(
ghost_id: str, ghost_id: str,
updated_at: str, updated_at: str,
is_page: bool = False,
**kwargs, **kwargs,
) -> dict: ) -> dict:
"""Update Ghost post settings (slug, tags, SEO, social, etc.). """Update Ghost post/page settings (slug, tags, SEO, social, etc.).
Only non-None keyword args are included in the PUT payload. Only non-None keyword args are included in the PUT payload.
Accepts any key from ``_SETTINGS_FIELDS``. Accepts any key from ``_SETTINGS_FIELDS``.
""" """
resource = "pages" if is_page else "posts"
post_body: dict = {"updated_at": updated_at} post_body: dict = {"updated_at": updated_at}
for key in _SETTINGS_FIELDS: for key in _SETTINGS_FIELDS:
val = kwargs.get(key) val = kwargs.get(key)
if val is not None: if val is not None:
post_body[key] = val post_body[key] = val
payload = {"posts": [post_body]} payload = {resource: [post_body]}
url = f"{GHOST_ADMIN_API_URL}/posts/{ghost_id}/" url = f"{GHOST_ADMIN_API_URL}/{resource}/{ghost_id}/"
async with httpx.AsyncClient(timeout=30) as client: async with httpx.AsyncClient(timeout=30) as client:
resp = await client.put(url, json=payload, headers=_auth_header()) resp = await client.put(url, json=payload, headers=_auth_header())
_check(resp) _check(resp)
return resp.json()["posts"][0] return resp.json()[resource][0]

View File

@@ -367,7 +367,8 @@ def register():
from ...blog.ghost.ghost_posts import get_post_for_edit from ...blog.ghost.ghost_posts import get_post_for_edit
ghost_id = g.post_data["post"]["ghost_id"] ghost_id = g.post_data["post"]["ghost_id"]
ghost_post = await get_post_for_edit(ghost_id) is_page = bool(g.post_data["post"].get("is_page"))
ghost_post = await get_post_for_edit(ghost_id, is_page=is_page)
save_success = request.args.get("saved") == "1" save_success = request.args.get("saved") == "1"
if not is_htmx_request(): if not is_htmx_request():
@@ -389,10 +390,11 @@ def register():
@require_post_author @require_post_author
async def settings_save(slug: str): async def settings_save(slug: str):
from ...blog.ghost.ghost_posts import update_post_settings from ...blog.ghost.ghost_posts import update_post_settings
from ...blog.ghost.ghost_sync import sync_single_post from ...blog.ghost.ghost_sync import sync_single_post, sync_single_page
from shared.browser.app.redis_cacher import invalidate_tag_cache from shared.browser.app.redis_cacher import invalidate_tag_cache
ghost_id = g.post_data["post"]["ghost_id"] ghost_id = g.post_data["post"]["ghost_id"]
is_page = bool(g.post_data["post"].get("is_page"))
form = await request.form form = await request.form
updated_at = form.get("updated_at", "") updated_at = form.get("updated_at", "")
@@ -436,11 +438,15 @@ def register():
await update_post_settings( await update_post_settings(
ghost_id=ghost_id, ghost_id=ghost_id,
updated_at=updated_at, updated_at=updated_at,
is_page=is_page,
**kwargs, **kwargs,
) )
# Sync to local DB # Sync to local DB
await sync_single_post(g.s, ghost_id) if is_page:
await sync_single_page(g.s, ghost_id)
else:
await sync_single_post(g.s, ghost_id)
await g.s.flush() await g.s.flush()
# Clear caches # Clear caches
@@ -457,7 +463,8 @@ def register():
from sqlalchemy import select as sa_select from sqlalchemy import select as sa_select
ghost_id = g.post_data["post"]["ghost_id"] ghost_id = g.post_data["post"]["ghost_id"]
ghost_post = await get_post_for_edit(ghost_id) is_page = bool(g.post_data["post"].get("is_page"))
ghost_post = await get_post_for_edit(ghost_id, is_page=is_page)
save_success = request.args.get("saved") == "1" save_success = request.args.get("saved") == "1"
newsletters = (await g.s.execute( newsletters = (await g.s.execute(
@@ -487,10 +494,11 @@ def register():
import json import json
from ...blog.ghost.ghost_posts import update_post from ...blog.ghost.ghost_posts import update_post
from ...blog.ghost.lexical_validator import validate_lexical from ...blog.ghost.lexical_validator import validate_lexical
from ...blog.ghost.ghost_sync import sync_single_post from ...blog.ghost.ghost_sync import sync_single_post, sync_single_page
from shared.browser.app.redis_cacher import invalidate_tag_cache from shared.browser.app.redis_cacher import invalidate_tag_cache
ghost_id = g.post_data["post"]["ghost_id"] ghost_id = g.post_data["post"]["ghost_id"]
is_page = bool(g.post_data["post"].get("is_page"))
form = await request.form form = await request.form
title = form.get("title", "").strip() title = form.get("title", "").strip()
lexical_raw = form.get("lexical", "") lexical_raw = form.get("lexical", "")
@@ -507,7 +515,7 @@ def register():
lexical_doc = json.loads(lexical_raw) lexical_doc = json.loads(lexical_raw)
except (json.JSONDecodeError, TypeError): except (json.JSONDecodeError, TypeError):
from ...blog.ghost.ghost_posts import get_post_for_edit from ...blog.ghost.ghost_posts import get_post_for_edit
ghost_post = await get_post_for_edit(ghost_id) ghost_post = await get_post_for_edit(ghost_id, is_page=is_page)
html = await render_template( html = await render_template(
"_types/post_edit/index.html", "_types/post_edit/index.html",
ghost_post=ghost_post, ghost_post=ghost_post,
@@ -518,7 +526,7 @@ def register():
ok, reason = validate_lexical(lexical_doc) ok, reason = validate_lexical(lexical_doc)
if not ok: if not ok:
from ...blog.ghost.ghost_posts import get_post_for_edit from ...blog.ghost.ghost_posts import get_post_for_edit
ghost_post = await get_post_for_edit(ghost_id) ghost_post = await get_post_for_edit(ghost_id, is_page=is_page)
html = await render_template( html = await render_template(
"_types/post_edit/index.html", "_types/post_edit/index.html",
ghost_post=ghost_post, ghost_post=ghost_post,
@@ -535,6 +543,7 @@ def register():
feature_image=feature_image, feature_image=feature_image,
custom_excerpt=custom_excerpt, custom_excerpt=custom_excerpt,
feature_image_caption=feature_image_caption, feature_image_caption=feature_image_caption,
is_page=is_page,
) )
# Publish workflow # Publish workflow
@@ -565,11 +574,15 @@ def register():
title=None, title=None,
updated_at=ghost_post["updated_at"], updated_at=ghost_post["updated_at"],
status=status, status=status,
is_page=is_page,
**email_kwargs, **email_kwargs,
) )
# Sync to local DB # Sync to local DB
await sync_single_post(g.s, ghost_id) if is_page:
await sync_single_page(g.s, ghost_id)
else:
await sync_single_post(g.s, ghost_id)
await g.s.flush() await g.s.flush()
# Handle publish_requested flag on the local post # Handle publish_requested flag on the local post