""" Registry for defquery / defaction definitions. Mirrors the pattern in ``handlers.py`` but for inter-service data queries and action endpoints. Each service loads its ``.sx`` files at startup, and the registry makes them available for dispatch by the query blueprint. Usage:: from shared.sx.query_registry import load_query_file, get_query load_query_file("events/queries.sx", "events") qdef = get_query("events", "pending-entries") """ from __future__ import annotations import logging import os from typing import Any from .types import QueryDef, ActionDef logger = logging.getLogger("sx.query_registry") # --------------------------------------------------------------------------- # Registry — service → name → QueryDef / ActionDef # --------------------------------------------------------------------------- _QUERY_REGISTRY: dict[str, dict[str, QueryDef]] = {} _ACTION_REGISTRY: dict[str, dict[str, ActionDef]] = {} def register_query(service: str, qdef: QueryDef) -> None: if service not in _QUERY_REGISTRY: _QUERY_REGISTRY[service] = {} _QUERY_REGISTRY[service][qdef.name] = qdef logger.debug("Registered query %s:%s", service, qdef.name) def register_action(service: str, adef: ActionDef) -> None: if service not in _ACTION_REGISTRY: _ACTION_REGISTRY[service] = {} _ACTION_REGISTRY[service][adef.name] = adef logger.debug("Registered action %s:%s", service, adef.name) def get_query(service: str, name: str) -> QueryDef | None: return _QUERY_REGISTRY.get(service, {}).get(name) def get_action(service: str, name: str) -> ActionDef | None: return _ACTION_REGISTRY.get(service, {}).get(name) def get_all_queries(service: str) -> dict[str, QueryDef]: return dict(_QUERY_REGISTRY.get(service, {})) def get_all_actions(service: str) -> dict[str, ActionDef]: return dict(_ACTION_REGISTRY.get(service, {})) def clear(service: str | None = None) -> None: if service is None: _QUERY_REGISTRY.clear() _ACTION_REGISTRY.clear() else: _QUERY_REGISTRY.pop(service, None) _ACTION_REGISTRY.pop(service, None) # --------------------------------------------------------------------------- # Loading — parse .sx files and collect QueryDef / ActionDef instances # --------------------------------------------------------------------------- def load_query_file(filepath: str, service_name: str) -> list[QueryDef]: """Parse an .sx file and register any defquery definitions.""" from .parser import parse_all from .jinja_bridge import get_component_env with open(filepath, encoding="utf-8") as f: source = f.read() # Use the jinja_bridge register_components path which handles # defquery/defaction via the OCaml kernel from .jinja_bridge import register_components register_components(source, _defer_postprocess=True) env = get_component_env() queries: list[QueryDef] = [] for val in env.values(): if isinstance(val, QueryDef): register_query(service_name, val) queries.append(val) return queries def load_action_file(filepath: str, service_name: str) -> list[ActionDef]: """Parse an .sx file and register any defaction definitions.""" from .jinja_bridge import get_component_env, register_components with open(filepath, encoding="utf-8") as f: source = f.read() register_components(source, _defer_postprocess=True) env = get_component_env() actions: list[ActionDef] = [] for val in env.values(): if isinstance(val, ActionDef): register_action(service_name, val) actions.append(val) return actions def load_query_dir(directory: str, service_name: str) -> list[QueryDef]: """Load all .sx files from a directory and register queries.""" import glob as glob_mod queries: list[QueryDef] = [] for filepath in sorted(glob_mod.glob(os.path.join(directory, "*.sx"))): queries.extend(load_query_file(filepath, service_name)) return queries def load_action_dir(directory: str, service_name: str) -> list[ActionDef]: """Load all .sx files from a directory and register actions.""" import glob as glob_mod actions: list[ActionDef] = [] for filepath in sorted(glob_mod.glob(os.path.join(directory, "*.sx"))): actions.extend(load_action_file(filepath, service_name)) return actions def load_service_protocols(service_name: str, base_dir: str) -> None: """Load queries.sx and actions.sx from a service's base directory.""" queries_path = os.path.join(base_dir, "queries.sx") actions_path = os.path.join(base_dir, "actions.sx") if os.path.exists(queries_path): load_query_file(queries_path, service_name) logger.info("Loaded queries for %s from %s", service_name, queries_path) if os.path.exists(actions_path): load_action_file(actions_path, service_name) logger.info("Loaded actions for %s from %s", service_name, actions_path) # --------------------------------------------------------------------------- # Schema — introspection for /internal/schema # --------------------------------------------------------------------------- def schema_for_service(service: str) -> dict[str, Any]: """Return a JSON-serializable schema of all queries and actions.""" queries = [] for qdef in _QUERY_REGISTRY.get(service, {}).values(): queries.append({ "name": qdef.name, "params": list(qdef.params), "doc": qdef.doc, }) actions = [] for adef in _ACTION_REGISTRY.get(service, {}).values(): actions.append({ "name": adef.name, "params": list(adef.params), "doc": adef.doc, }) return { "service": service, "queries": sorted(queries, key=lambda q: q["name"]), "actions": sorted(actions, key=lambda a: a["name"]), }