All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m5s
Combines shared, blog, market, cart, events, federation, and account into a single repository. Eliminates submodule sync, sibling model copying at build time, and per-app CI orchestration. Changes: - Remove per-app .git, .gitmodules, .gitea, submodule shared/ dirs - Remove stale sibling model copies from each app - Update all 6 Dockerfiles for monorepo build context (root = .) - Add build directives to docker-compose.yml - Add single .gitea/workflows/ci.yml with change detection - Add .dockerignore for monorepo build context - Create __init__.py for federation and account (cross-app imports)
257 lines
8.4 KiB
Python
257 lines
8.4 KiB
Python
"""
|
||
Editor API proxy – image/media/file uploads and oembed.
|
||
|
||
Forwards requests to the Ghost Admin API with JWT auth so the browser
|
||
never needs direct Ghost access.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
import os
|
||
|
||
import httpx
|
||
from quart import Blueprint, request, jsonify, g
|
||
from sqlalchemy import select, or_
|
||
|
||
from shared.browser.app.authz import require_admin, require_login
|
||
from models import Snippet
|
||
from .ghost_admin_token import make_ghost_admin_jwt
|
||
|
||
log = logging.getLogger(__name__)
|
||
|
||
GHOST_ADMIN_API_URL = os.environ["GHOST_ADMIN_API_URL"]
|
||
MAX_IMAGE_SIZE = 10 * 1024 * 1024 # 10 MB
|
||
MAX_MEDIA_SIZE = 100 * 1024 * 1024 # 100 MB
|
||
MAX_FILE_SIZE = 50 * 1024 * 1024 # 50 MB
|
||
|
||
ALLOWED_IMAGE_MIMETYPES = frozenset({
|
||
"image/jpeg", "image/png", "image/gif", "image/webp", "image/svg+xml",
|
||
})
|
||
ALLOWED_MEDIA_MIMETYPES = frozenset({
|
||
"audio/mpeg", "audio/ogg", "audio/wav", "audio/mp4", "audio/aac",
|
||
"video/mp4", "video/webm", "video/ogg",
|
||
})
|
||
|
||
editor_api_bp = Blueprint("editor_api", __name__, url_prefix="/editor-api")
|
||
|
||
|
||
def _auth_header() -> dict[str, str]:
|
||
return {"Authorization": f"Ghost {make_ghost_admin_jwt()}"}
|
||
|
||
|
||
@editor_api_bp.post("/images/upload/")
|
||
@require_admin
|
||
async def upload_image():
|
||
"""Proxy image upload to Ghost Admin API."""
|
||
files = await request.files
|
||
uploaded = files.get("file")
|
||
if not uploaded:
|
||
return jsonify({"errors": [{"message": "No file provided"}]}), 400
|
||
|
||
content = uploaded.read()
|
||
if len(content) > MAX_IMAGE_SIZE:
|
||
return jsonify({"errors": [{"message": "File too large (max 10 MB)"}]}), 413
|
||
|
||
if uploaded.content_type not in ALLOWED_IMAGE_MIMETYPES:
|
||
return jsonify({"errors": [{"message": f"Unsupported file type: {uploaded.content_type}"}]}), 415
|
||
|
||
url = f"{GHOST_ADMIN_API_URL}/images/upload/"
|
||
async with httpx.AsyncClient(timeout=30) as client:
|
||
resp = await client.post(
|
||
url,
|
||
headers=_auth_header(),
|
||
files={"file": (uploaded.filename, content, uploaded.content_type)},
|
||
)
|
||
|
||
if not resp.is_success:
|
||
log.error("Ghost image upload failed %s: %s", resp.status_code, resp.text[:500])
|
||
|
||
return resp.json(), resp.status_code
|
||
|
||
|
||
@editor_api_bp.post("/media/upload/")
|
||
@require_admin
|
||
async def upload_media():
|
||
"""Proxy audio/video upload to Ghost Admin API."""
|
||
files = await request.files
|
||
uploaded = files.get("file")
|
||
if not uploaded:
|
||
return jsonify({"errors": [{"message": "No file provided"}]}), 400
|
||
|
||
content = uploaded.read()
|
||
if len(content) > MAX_MEDIA_SIZE:
|
||
return jsonify({"errors": [{"message": "File too large (max 100 MB)"}]}), 413
|
||
|
||
if uploaded.content_type not in ALLOWED_MEDIA_MIMETYPES:
|
||
return jsonify({"errors": [{"message": f"Unsupported media type: {uploaded.content_type}"}]}), 415
|
||
|
||
ghost_files = {"file": (uploaded.filename, content, uploaded.content_type)}
|
||
|
||
# Optional video thumbnail
|
||
thumbnail = files.get("thumbnail")
|
||
if thumbnail:
|
||
thumb_content = thumbnail.read()
|
||
ghost_files["thumbnail"] = (thumbnail.filename, thumb_content, thumbnail.content_type)
|
||
|
||
url = f"{GHOST_ADMIN_API_URL}/media/upload/"
|
||
async with httpx.AsyncClient(timeout=60) as client:
|
||
resp = await client.post(url, headers=_auth_header(), files=ghost_files)
|
||
|
||
if not resp.is_success:
|
||
log.error("Ghost media upload failed %s: %s", resp.status_code, resp.text[:500])
|
||
|
||
return resp.json(), resp.status_code
|
||
|
||
|
||
@editor_api_bp.post("/files/upload/")
|
||
@require_admin
|
||
async def upload_file():
|
||
"""Proxy file upload to Ghost Admin API."""
|
||
files = await request.files
|
||
uploaded = files.get("file")
|
||
if not uploaded:
|
||
return jsonify({"errors": [{"message": "No file provided"}]}), 400
|
||
|
||
content = uploaded.read()
|
||
if len(content) > MAX_FILE_SIZE:
|
||
return jsonify({"errors": [{"message": "File too large (max 50 MB)"}]}), 413
|
||
|
||
url = f"{GHOST_ADMIN_API_URL}/files/upload/"
|
||
async with httpx.AsyncClient(timeout=60) as client:
|
||
resp = await client.post(
|
||
url,
|
||
headers=_auth_header(),
|
||
files={"file": (uploaded.filename, content, uploaded.content_type)},
|
||
)
|
||
|
||
if not resp.is_success:
|
||
log.error("Ghost file upload failed %s: %s", resp.status_code, resp.text[:500])
|
||
|
||
return resp.json(), resp.status_code
|
||
|
||
|
||
@editor_api_bp.get("/oembed/")
|
||
@require_admin
|
||
async def oembed_proxy():
|
||
"""Proxy oembed lookups to Ghost Admin API."""
|
||
params = dict(request.args)
|
||
if not params.get("url"):
|
||
return jsonify({"errors": [{"message": "url parameter required"}]}), 400
|
||
|
||
url = f"{GHOST_ADMIN_API_URL}/oembed/"
|
||
async with httpx.AsyncClient(timeout=30) as client:
|
||
resp = await client.get(url, headers=_auth_header(), params=params)
|
||
|
||
if not resp.is_success:
|
||
log.error("Ghost oembed failed %s: %s", resp.status_code, resp.text[:500])
|
||
|
||
return resp.json(), resp.status_code
|
||
|
||
|
||
# ── Snippets ────────────────────────────────────────────────────────
|
||
|
||
VALID_VISIBILITY = frozenset({"private", "shared", "admin"})
|
||
|
||
|
||
@editor_api_bp.get("/snippets/")
|
||
@require_login
|
||
async def list_snippets():
|
||
"""Return snippets visible to the current user."""
|
||
uid = g.user.id
|
||
is_admin = g.rights.get("admin")
|
||
|
||
filters = [Snippet.user_id == uid, Snippet.visibility == "shared"]
|
||
if is_admin:
|
||
filters.append(Snippet.visibility == "admin")
|
||
|
||
rows = (await g.s.execute(
|
||
select(Snippet).where(or_(*filters)).order_by(Snippet.name)
|
||
)).scalars().all()
|
||
|
||
return jsonify([
|
||
{"id": s.id, "name": s.name, "value": s.value, "visibility": s.visibility}
|
||
for s in rows
|
||
])
|
||
|
||
|
||
@editor_api_bp.post("/snippets/")
|
||
@require_login
|
||
async def create_snippet():
|
||
"""Create or upsert a snippet by (user_id, name)."""
|
||
data = await request.get_json(force=True)
|
||
name = (data.get("name") or "").strip()
|
||
value = data.get("value")
|
||
visibility = data.get("visibility", "private")
|
||
|
||
if not name or value is None:
|
||
return jsonify({"error": "name and value are required"}), 400
|
||
if visibility not in VALID_VISIBILITY:
|
||
return jsonify({"error": f"visibility must be one of {sorted(VALID_VISIBILITY)}"}), 400
|
||
if visibility != "private" and not g.rights.get("admin"):
|
||
visibility = "private"
|
||
|
||
uid = g.user.id
|
||
|
||
existing = (await g.s.execute(
|
||
select(Snippet).where(Snippet.user_id == uid, Snippet.name == name)
|
||
)).scalar_one_or_none()
|
||
|
||
if existing:
|
||
existing.value = value
|
||
existing.visibility = visibility
|
||
snippet = existing
|
||
else:
|
||
snippet = Snippet(user_id=uid, name=name, value=value, visibility=visibility)
|
||
g.s.add(snippet)
|
||
|
||
await g.s.flush()
|
||
return jsonify({
|
||
"id": snippet.id, "name": snippet.name,
|
||
"value": snippet.value, "visibility": snippet.visibility,
|
||
}), 200 if existing else 201
|
||
|
||
|
||
@editor_api_bp.patch("/snippets/<int:snippet_id>/")
|
||
@require_login
|
||
async def patch_snippet(snippet_id: int):
|
||
"""Update snippet visibility. Only admins may set shared/admin."""
|
||
snippet = await g.s.get(Snippet, snippet_id)
|
||
if not snippet:
|
||
return jsonify({"error": "not found"}), 404
|
||
|
||
is_admin = g.rights.get("admin")
|
||
|
||
if snippet.user_id != g.user.id and not is_admin:
|
||
return jsonify({"error": "forbidden"}), 403
|
||
|
||
data = await request.get_json(force=True)
|
||
visibility = data.get("visibility")
|
||
if visibility is not None:
|
||
if visibility not in VALID_VISIBILITY:
|
||
return jsonify({"error": f"visibility must be one of {sorted(VALID_VISIBILITY)}"}), 400
|
||
if visibility != "private" and not is_admin:
|
||
return jsonify({"error": "only admins may set shared/admin visibility"}), 403
|
||
snippet.visibility = visibility
|
||
|
||
await g.s.flush()
|
||
return jsonify({
|
||
"id": snippet.id, "name": snippet.name,
|
||
"value": snippet.value, "visibility": snippet.visibility,
|
||
})
|
||
|
||
|
||
@editor_api_bp.delete("/snippets/<int:snippet_id>/")
|
||
@require_login
|
||
async def delete_snippet(snippet_id: int):
|
||
"""Delete a snippet. Owners can delete their own; admins can delete any."""
|
||
snippet = await g.s.get(Snippet, snippet_id)
|
||
if not snippet:
|
||
return jsonify({"error": "not found"}), 404
|
||
|
||
if snippet.user_id != g.user.id and not g.rights.get("admin"):
|
||
return jsonify({"error": "forbidden"}), 403
|
||
|
||
await g.s.delete(snippet)
|
||
await g.s.flush()
|
||
return jsonify({"ok": True})
|