This repository has been archived on 2026-02-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
blog/bp/blog/admin/routes.py
giles a01016d8d5 feat: decouple blog from shared_lib, add app-owned models
Phase 1-3 of decoupling:
- path_setup.py adds project root to sys.path
- Blog-owned models in blog/models/ (ghost_content, snippet, tag_group)
- Re-export shims for shared models (user, kv, magic_link, menu_item)
- All imports updated: shared.infrastructure, shared.db, shared.browser, etc.
- No more cross-app post_id FKs in calendar/market/page_config

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 12:46:31 +00:00

174 lines
5.2 KiB
Python

from __future__ import annotations
import re
from quart import (
render_template,
make_response,
Blueprint,
redirect,
url_for,
request,
g,
)
from sqlalchemy import select, delete
from shared.browser.app.authz import require_admin
from shared.browser.app.utils.htmx import is_htmx_request
from shared.browser.app.redis_cacher import invalidate_tag_cache
from models.tag_group import TagGroup, TagGroupTag
from models.ghost_content import Tag
def _slugify(name: str) -> str:
s = name.strip().lower()
s = re.sub(r"[^\w\s-]", "", s)
s = re.sub(r"[\s_]+", "-", s)
return s.strip("-")
async def _unassigned_tags(session):
"""Return public, non-deleted tags not assigned to any group."""
assigned_sq = select(TagGroupTag.tag_id).subquery()
q = (
select(Tag)
.where(
Tag.deleted_at.is_(None),
(Tag.visibility == "public") | (Tag.visibility.is_(None)),
Tag.id.notin_(select(assigned_sq)),
)
.order_by(Tag.name)
)
return list((await session.execute(q)).scalars())
def register():
bp = Blueprint("tag_groups_admin", __name__, url_prefix="/settings/tag-groups")
@bp.get("/")
@require_admin
async def index():
groups = list(
(await g.s.execute(
select(TagGroup).order_by(TagGroup.sort_order, TagGroup.name)
)).scalars()
)
unassigned = await _unassigned_tags(g.s)
ctx = {"groups": groups, "unassigned_tags": unassigned}
if not is_htmx_request():
return await render_template("_types/blog/admin/tag_groups/index.html", **ctx)
else:
return await render_template("_types/blog/admin/tag_groups/_oob_elements.html", **ctx)
@bp.post("/")
@require_admin
async def create():
form = await request.form
name = (form.get("name") or "").strip()
if not name:
return redirect(url_for("blog.tag_groups_admin.index"))
slug = _slugify(name)
feature_image = (form.get("feature_image") or "").strip() or None
colour = (form.get("colour") or "").strip() or None
sort_order = int(form.get("sort_order") or 0)
tg = TagGroup(
name=name, slug=slug,
feature_image=feature_image, colour=colour,
sort_order=sort_order,
)
g.s.add(tg)
await g.s.flush()
await invalidate_tag_cache("blog")
return redirect(url_for("blog.tag_groups_admin.index"))
@bp.get("/<int:id>/")
@require_admin
async def edit(id: int):
tg = await g.s.get(TagGroup, id)
if not tg:
return redirect(url_for("blog.tag_groups_admin.index"))
# Assigned tag IDs for this group
assigned_rows = list(
(await g.s.execute(
select(TagGroupTag.tag_id).where(TagGroupTag.tag_group_id == id)
)).scalars()
)
assigned_tag_ids = set(assigned_rows)
# All public, non-deleted tags
all_tags = list(
(await g.s.execute(
select(Tag).where(
Tag.deleted_at.is_(None),
(Tag.visibility == "public") | (Tag.visibility.is_(None)),
).order_by(Tag.name)
)).scalars()
)
ctx = {
"group": tg,
"all_tags": all_tags,
"assigned_tag_ids": assigned_tag_ids,
}
if not is_htmx_request():
return await render_template("_types/blog/admin/tag_groups/edit.html", **ctx)
else:
return await render_template("_types/blog/admin/tag_groups/_edit_oob.html", **ctx)
@bp.post("/<int:id>/")
@require_admin
async def save(id: int):
tg = await g.s.get(TagGroup, id)
if not tg:
return redirect(url_for("blog.tag_groups_admin.index"))
form = await request.form
name = (form.get("name") or "").strip()
if name:
tg.name = name
tg.slug = _slugify(name)
tg.feature_image = (form.get("feature_image") or "").strip() or None
tg.colour = (form.get("colour") or "").strip() or None
tg.sort_order = int(form.get("sort_order") or 0)
# Update tag assignments
selected_tag_ids = set()
for val in form.getlist("tag_ids"):
try:
selected_tag_ids.add(int(val))
except (ValueError, TypeError):
pass
# Remove old assignments
await g.s.execute(
delete(TagGroupTag).where(TagGroupTag.tag_group_id == id)
)
await g.s.flush()
# Add new assignments
for tid in selected_tag_ids:
g.s.add(TagGroupTag(tag_group_id=id, tag_id=tid))
await g.s.flush()
await invalidate_tag_cache("blog")
return redirect(url_for("blog.tag_groups_admin.edit", id=id))
@bp.post("/<int:id>/delete/")
@require_admin
async def delete_group(id: int):
tg = await g.s.get(TagGroup, id)
if tg:
await g.s.delete(tg)
await g.s.flush()
await invalidate_tag_cache("blog")
return redirect(url_for("blog.tag_groups_admin.index"))
return bp