rose-ash/shared/infrastructure/query_blueprint.py
giles 1f36987f77
Replace inter-service _handlers dicts with declarative sx defquery/defaction
The inter-service data layer (fetch_data/call_action) was the least
structured part of the codebase — Python _handlers dicts with ad-hoc
param extraction scattered across 16 route files. This replaces them
with declarative .sx query/action definitions that make the entire
inter-service protocol self-describing and greppable.

Infrastructure:
- defquery/defaction special forms in the sx evaluator
- Query/action registry with load, lookup, and schema introspection
- Query executor using async_eval with I/O primitives
- Blueprint factories (create_data_blueprint/create_action_blueprint)
  with sx-first dispatch and Python fallback
- /internal/schema endpoint on every service
- parse-datetime and split-ids primitives for type coercion
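The registry and coercion bullets above can be pictured with a small, self-contained Python sketch. Everything in it is hypothetical and for illustration only: `QueryDef`, `register_query`, `lookup_query`, `schema`, `coerce_args`, and the `liked-ids` parameters are invented names, not the API of `shared.sx.query_registry`, and the real primitives are sx forms whose exact behaviour is not shown in this commit. It assumes split-ids turns a comma-separated string into a list of ints and parse-datetime accepts ISO 8601 strings.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable


def split_ids(raw: str) -> list[int]:
    """Assumed behaviour: "1,2,3" becomes [1, 2, 3]."""
    return [int(part) for part in raw.split(",") if part.strip()]


def parse_datetime(raw: str) -> datetime:
    """Assumed behaviour: parse an ISO 8601 timestamp."""
    return datetime.fromisoformat(raw)


@dataclass
class QueryDef:
    """Hypothetical stand-in for a declarative query definition."""
    name: str
    # Parameter name -> coercion applied to the raw string value.
    params: dict[str, Callable[[str], Any]] = field(default_factory=dict)


# Hypothetical in-memory registry keyed by (service, query name).
_REGISTRY: dict[tuple[str, str], QueryDef] = {}


def register_query(service: str, qdef: QueryDef) -> None:
    _REGISTRY[(service, qdef.name)] = qdef


def lookup_query(service: str, name: str) -> QueryDef | None:
    return _REGISTRY.get((service, name))


def schema(service: str) -> dict[str, dict[str, str]]:
    """Describe every query registered for one service."""
    return {
        qdef.name: {param: fn.__name__ for param, fn in qdef.params.items()}
        for (svc, _), qdef in _REGISTRY.items()
        if svc == service
    }


def coerce_args(qdef: QueryDef, raw: dict[str, str]) -> dict[str, Any]:
    """Apply each declared coercion; undeclared parameters stay strings."""
    return {key: qdef.params.get(key, str)(value) for key, value in raw.items()}


# Illustrative registration; the parameter names are made up.
register_query(
    "likes",
    QueryDef("liked-ids", {"ids": split_ids, "since": parse_datetime}),
)
```

Because query-string values always arrive as strings, declaring the coercion next to the parameter is what lets a schema endpoint report both the parameter names and how each one is interpreted.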

Service extractions:
- LikesService (toggle, is_liked, liked_slugs, liked_ids)
- PageConfigService (ensure, get_by_container, get_by_id, get_batch, update)
- RelationsService (wraps module-level functions)
- AccountDataService (user_by_email, newsletters)
- CartItemsService, MarketDataService (raw SQLAlchemy lookups)

50 of 54 handlers converted to sx, 4 Python fallbacks remain
(ghost-sync/push-member, clear-cart-for-order, create-order).
Net: -1,383 lines Python, +251 lines modified.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-04 08:13:50 +00:00

"""
Blueprint factories for sx-dispatched data and action routes.

Replaces per-service boilerplate in ``bp/data/routes.py`` and
``bp/actions/routes.py`` by dispatching to defquery/defaction definitions
from the sx query registry. Falls back to Python ``_handlers`` dicts
for queries/actions not yet converted.

Usage::

    from shared.infrastructure.query_blueprint import (
        create_data_blueprint, create_action_blueprint,
    )

    # In service's bp/data/routes.py:
    def register() -> Blueprint:
        bp, _handlers = create_data_blueprint("events")
        # Optional Python fallback handlers:
        # _handlers["some-query"] = _some_python_handler
        return bp
"""
from __future__ import annotations

import logging
from typing import Any, Callable, Awaitable

from quart import Blueprint, g, jsonify, request

logger = logging.getLogger("sx.query_blueprint")


def create_data_blueprint(
    service_name: str,
) -> tuple[Blueprint, dict[str, Callable[[], Awaitable[Any]]]]:
    """Create a data blueprint that dispatches to sx queries with Python fallback.

    Returns (blueprint, python_handlers_dict) so the caller can register
    Python fallback handlers for queries not yet converted to sx.
    """
    from shared.infrastructure.data_client import DATA_HEADER

    bp = Blueprint("data", __name__, url_prefix="/internal/data")
    _handlers: dict[str, Callable[[], Awaitable[Any]]] = {}

    @bp.before_request
    async def _require_data_header():
        if not request.headers.get(DATA_HEADER):
            return jsonify({"error": "forbidden"}), 403
        from shared.infrastructure.internal_auth import validate_internal_request
        if not validate_internal_request():
            return jsonify({"error": "forbidden"}), 403

    @bp.get("/<query_name>")
    async def handle_query(query_name: str):
        # 1. Check sx query registry first
        from shared.sx.query_registry import get_query
        from shared.sx.query_executor import execute_query

        qdef = get_query(service_name, query_name)
        if qdef is not None:
            result = await execute_query(qdef, dict(request.args))
            return jsonify(result)

        # 2. Fall back to Python handlers
        handler = _handlers.get(query_name)
        if handler is not None:
            result = await handler()
            return jsonify(result)

        return jsonify({"error": "unknown query"}), 404

    return bp, _handlers


def create_action_blueprint(
    service_name: str,
) -> tuple[Blueprint, dict[str, Callable[[], Awaitable[Any]]]]:
    """Create an action blueprint that dispatches to sx actions with Python fallback.

    Returns (blueprint, python_handlers_dict) so the caller can register
    Python fallback handlers for actions not yet converted to sx.
    """
    from shared.infrastructure.actions import ACTION_HEADER

    bp = Blueprint("actions", __name__, url_prefix="/internal/actions")
    _handlers: dict[str, Callable[[], Awaitable[Any]]] = {}

    @bp.before_request
    async def _require_action_header():
        if not request.headers.get(ACTION_HEADER):
            return jsonify({"error": "forbidden"}), 403
        from shared.infrastructure.internal_auth import validate_internal_request
        if not validate_internal_request():
            return jsonify({"error": "forbidden"}), 403

    @bp.post("/<action_name>")
    async def handle_action(action_name: str):
        # 1. Check sx action registry first
        from shared.sx.query_registry import get_action
        from shared.sx.query_executor import execute_action

        adef = get_action(service_name, action_name)
        if adef is not None:
            try:
                payload = await request.get_json(force=True) or {}
                result = await execute_action(adef, payload)
                return jsonify(result)
            except Exception as exc:
                logger.exception("SX action %s:%s failed", service_name, action_name)
                return jsonify({"error": str(exc)}), 500

        # 2. Fall back to Python handlers
        handler = _handlers.get(action_name)
        if handler is not None:
            try:
                result = await handler()
                return jsonify(result)
            except Exception as exc:
                logger.exception("Action %s failed", action_name)
                return jsonify({"error": str(exc)}), 500

        return jsonify({"error": "unknown action"}), 404

    return bp, _handlers
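
The lookup order used by both factories above (sx definition first, Python fallback second, 404 otherwise) can be exercised without Quart or the sx evaluator. The sketch below is a simplified stand-in under stated assumptions: `SX_QUERIES`, `make_dispatcher`, `_sx_echo`, and `_py_legacy` are hypothetical names, the sx registry is replaced by a plain dict of async callables, and responses are returned as `(body, status)` tuples instead of going through `jsonify`.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable

# Hypothetical stand-in for the sx registry: (service, name) -> async executor.
SX_QUERIES: dict[tuple[str, str], Callable[[dict], Awaitable[Any]]] = {}


def make_dispatcher(service_name: str):
    """Return (dispatch, handlers), mirroring the (blueprint, _handlers) pair."""
    handlers: dict[str, Callable[[], Awaitable[Any]]] = {}

    async def dispatch(name: str, args: dict) -> tuple[Any, int]:
        # 1. A registered sx definition wins.
        sx_def = SX_QUERIES.get((service_name, name))
        if sx_def is not None:
            return await sx_def(args), 200
        # 2. Otherwise fall back to a Python handler, if one was registered.
        handler = handlers.get(name)
        if handler is not None:
            return await handler(), 200
        # 3. Nothing matched.
        return {"error": "unknown query"}, 404

    return dispatch, handlers


async def _sx_echo(args: dict) -> dict:
    return {"source": "sx", "args": args}


async def _py_legacy() -> dict:
    return {"source": "python"}


SX_QUERIES[("events", "echo")] = _sx_echo
dispatch, handlers = make_dispatcher("events")
handlers["legacy"] = _py_legacy
handlers["echo"] = _py_legacy  # shadowed: the sx definition takes precedence


async def main() -> list:
    return [
        await dispatch("echo", {"id": "1"}),
        await dispatch("legacy", {}),
        await dispatch("missing", {}),
    ]


results = asyncio.run(main())
```

One consequence of this ordering is that a Python handler registered under the same name as an sx definition is never reached, so a fallback can be left in place during conversion and deleted afterwards.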