All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m20s
Move Post/Author/Tag/PostAuthor/PostTag/PostUser models from
shared/models/ghost_content.py to blog/models/content.py so blog-domain
models no longer live in the shared layer. Replace the shared
SqlBlogService + BlogService protocol with a blog-local singleton
(blog_service), and switch entry_associations.py from direct DB access
to HTTP fetch_data("blog", "post-by-id") to respect the inter-service
boundary.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
158 lines
5.9 KiB
Python
158 lines
5.9 KiB
Python
"""Blog app fragment endpoints.
|
|
|
|
Exposes sx fragments at ``/internal/fragments/<type>`` for consumption
|
|
by other coop apps via the fragment client.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from quart import Blueprint, Response, g, render_template, request
|
|
|
|
from shared.infrastructure.fragments import FRAGMENT_HEADER
|
|
from shared.services.navigation import get_navigation_tree
|
|
|
|
|
|
def register():
    """Build and return the blog app's ``fragments`` blueprint.

    The blueprint is mounted under ``/internal/fragments``.  Every request
    must carry the ``FRAGMENT_HEADER`` header; otherwise an empty 403 is
    returned.  ``GET /<fragment_type>`` dispatches to the matching handler
    and sends its result back as ``text/sx``.  An unknown fragment type
    yields an empty 200 response rather than an error.

    Registered fragment types: ``nav-tree`` and ``link-card``.

    Returns:
        The configured ``Blueprint``.  The handler registry is also exposed
        on it as ``_fragment_handlers``.
    """
    from collections.abc import Awaitable, Callable

    bp = Blueprint("fragments", __name__, url_prefix="/internal/fragments")

    # Fragment type -> zero-argument coroutine function returning sx source.
    _handlers: dict[str, Callable[[], Awaitable[str]]] = {}

    @bp.before_request
    async def _require_fragment_header():
        """Reject any request that does not carry the fragment header."""
        if not request.headers.get(FRAGMENT_HEADER):
            return Response("", status=403)

    @bp.get("/<fragment_type>")
    async def get_fragment(fragment_type: str):
        """Serve the fragment registered under *fragment_type* as ``text/sx``."""
        handler = _handlers.get(fragment_type)
        if handler is None:
            # Unknown fragment types deliberately render as an empty fragment.
            return Response("", status=200, content_type="text/sx")
        result = await handler()
        return Response(result, status=200, content_type="text/sx")

    # --- nav-tree fragment — returns sx source ---
    async def _nav_tree_handler():
        """Render the navigation menu as sx source.

        Reads ``app_name`` and ``path`` from the query string.  A menu entry
        is marked selected when its slug equals either ``app_name`` or the
        first segment of ``path``.
        """
        from shared.sx.helpers import sx_call, SxExpr
        from shared.infrastructure.urls import (
            blog_url, cart_url, market_url, events_url,
            federation_url, account_url, artdag_url,
        )

        app_name = request.args.get("app_name", "")
        path = request.args.get("path", "/")
        first_seg = path.strip("/").split("/")[0]
        menu_items = list(await get_navigation_tree(g.s))

        # Slugs that belong to other apps link to that app's root; every
        # other slug is treated as a blog page.
        app_slugs = {
            "cart": cart_url("/"),
            "market": market_url("/"),
            "events": events_url("/"),
            "federation": federation_url("/"),
            "account": account_url("/"),
            "artdag": artdag_url("/"),
        }

        nav_cls = "whitespace-nowrap flex items-center gap-2 rounded p-2 text-sm"

        def nav_link(slug, href, src, label):
            """Return the sx source for one navigation link."""
            selected = "true" if (slug == first_seg
                                  or slug == app_name) else "false"
            img = sx_call("blog-nav-item-image", src=src, label=label)
            return sx_call(
                "blog-nav-item-link",
                href=href, hx_get=href, selected=selected, nav_cls=nav_cls,
                img=SxExpr(img), label=label,
            )

        item_sxs = []
        for item in menu_items:
            href = app_slugs.get(item.slug, blog_url(f"/{item.slug}/"))
            src = getattr(item, "feature_image", None)
            label = getattr(item, "label", item.slug)
            item_sxs.append(nav_link(item.slug, href, src, label))

        # The art-dag link is always appended, so the list is never empty
        # here and no separate "empty menu" rendering is needed.
        # NOTE(review): if the navigation tree can itself contain an item
        # with slug "artdag", this produces a second art-dag link — confirm
        # whether that can happen.
        item_sxs.append(nav_link("artdag", artdag_url("/"), None, "art-dag"))

        items_frag = "(<> " + " ".join(item_sxs) + ")"

        arrow_cls = "scrolling-menu-arrow-menu-items-container"
        container_id = "menu-items-container"
        # Hyperscript snippets handed to the wrapper component: the two
        # click handlers scroll the container by 200px, and the scroll
        # handler toggles the arrow elements' .hidden/.flex classes.
        left_hs = ("on click set #" + container_id
                   + ".scrollLeft to #" + container_id + ".scrollLeft - 200")
        scroll_hs = ("on scroll "
                     "set cls to '" + arrow_cls + "' "
                     "set arrows to document.getElementsByClassName(cls) "
                     "set show to (window.innerWidth >= 640 and "
                     "my.scrollWidth > my.clientWidth) "
                     "repeat for arrow in arrows "
                     "if show remove .hidden from arrow add .flex to arrow "
                     "else add .hidden to arrow remove .flex from arrow end "
                     "end")
        right_hs = ("on click set #" + container_id
                    + ".scrollLeft to #" + container_id + ".scrollLeft + 200")

        return sx_call("blog-nav-wrapper",
                       arrow_cls=arrow_cls,
                       container_id=container_id,
                       left_hs=left_hs,
                       scroll_hs=scroll_hs,
                       right_hs=right_hs,
                       items=SxExpr(items_frag))

    _handlers["nav-tree"] = _nav_tree_handler

    # --- link-card fragment — returns sx source ---
    def _blog_link_card_sx(post, link: str) -> str:
        """Render the ``link-card`` sx component for *post* pointing at *link*.

        The detail line is the publication date formatted as ``%d %b %Y``,
        or ``None`` when the post has no ``published_at``.
        """
        from shared.sx.helpers import sx_call

        published = (post.published_at.strftime("%d %b %Y")
                     if post.published_at else None)
        return sx_call("link-card",
                       link=link,
                       title=post.title,
                       image=post.feature_image,
                       icon="fas fa-file-alt",
                       subtitle=post.custom_excerpt or post.excerpt,
                       detail=published,
                       data_app="blog")

    async def _link_card_handler():
        """Render link cards for one post (``slug``) or several (``keys``).

        Batch mode (``keys`` is a comma-separated slug list) emits a
        ``<!-- fragment:<slug> -->`` marker line per requested slug,
        followed by the card when the post exists.  Single mode returns the
        card, or an empty string when the slug is missing or unknown.
        """
        from services import blog_service
        from shared.infrastructure.urls import blog_url

        async def card_for(post_slug: str) -> str | None:
            """Return the card for *post_slug*, or ``None`` if no post is found."""
            post = await blog_service.get_post_by_slug(g.s, post_slug)
            if not post:
                return None
            return _blog_link_card_sx(post, blog_url(f"/{post.slug}"))

        slug = request.args.get("slug", "")
        keys_raw = request.args.get("keys", "")

        # Batch mode
        if keys_raw:
            slugs = [k.strip() for k in keys_raw.split(",") if k.strip()]
            parts = []
            for s in slugs:
                # The marker is emitted even when the post is missing.
                parts.append(f"<!-- fragment:{s} -->")
                card = await card_for(s)
                if card is not None:
                    parts.append(card)
            return "\n".join(parts)

        # Single mode
        if not slug:
            return ""
        card = await card_for(slug)
        return "" if card is None else card

    _handlers["link-card"] = _link_card_handler

    bp._fragment_handlers = _handlers

    return bp
|