""" Parse boundary declarations from multiple sources. Three tiers of boundary files: 1. shared/sx/ref/boundary.sx — core SX language I/O contract 2. shared/sx/ref/boundary-app.sx — deployment-specific layout I/O 3. {service}/sx/boundary.sx — per-service page helpers Shared by both bootstrap_py.py and bootstrap_js.py, and used at runtime by the validation module. """ from __future__ import annotations import glob import logging import os from typing import Any logger = logging.getLogger("sx.boundary_parser") # Allow standalone use (from bootstrappers) or in-project imports try: from shared.sx.parser import parse_all from shared.sx.types import Symbol, Keyword, NIL as SX_NIL except ImportError: import sys _HERE = os.path.dirname(os.path.abspath(__file__)) _PROJECT = os.path.abspath(os.path.join(_HERE, "..", "..", "..")) sys.path.insert(0, _PROJECT) from shared.sx.parser import parse_all from shared.sx.types import Symbol, Keyword, NIL as SX_NIL def _ref_dir() -> str: return os.path.dirname(os.path.abspath(__file__)) def _project_root() -> str: """Return the project root containing service directories. Dev: shared/sx/ref -> shared/sx -> shared -> project root Docker: /app/shared/sx/ref -> /app (shared is inside /app) """ ref = _ref_dir() # Go up 3 levels: shared/sx/ref -> project root root = os.path.abspath(os.path.join(ref, "..", "..", "..")) # Verify by checking for a known service directory or shared/ if os.path.isdir(os.path.join(root, "shared")): return root # Docker: /app/shared/sx/ref -> /app # shared is INSIDE /app, not a sibling — go up to parent of shared root = os.path.abspath(os.path.join(ref, "..", "..")) if os.path.isdir(os.path.join(root, "sx")): # /app/sx exists in Docker return root return root def _read_file(filename: str) -> str: filepath = os.path.join(_ref_dir(), filename) with open(filepath, encoding="utf-8") as f: return f.read() def _read_file_path(filepath: str) -> str: with open(filepath, encoding="utf-8") as f: return f.read() def _extract_keyword_arg(expr: list, key: str) -> Any: """Extract :key value from a flat keyword-arg list.""" for i, item in enumerate(expr): if isinstance(item, Keyword) and item.name == key and i + 1 < len(expr): return expr[i + 1] return None def _extract_declarations( source: str, ) -> tuple[set[str], dict[str, set[str]]]: """Extract I/O primitive names and page helper names from boundary source. Returns (io_names, {service: helper_names}). """ exprs = parse_all(source) io_names: set[str] = set() helpers: dict[str, set[str]] = {} for expr in exprs: if not isinstance(expr, list) or not expr: continue head = expr[0] if not isinstance(head, Symbol): continue if head.name == "define-io-primitive": name = expr[1] if isinstance(name, str): io_names.add(name) elif head.name == "define-page-helper": name = expr[1] service = _extract_keyword_arg(expr, "service") if isinstance(name, str) and isinstance(service, str): helpers.setdefault(service, set()).add(name) return io_names, helpers def _find_service_boundary_files() -> list[str]: """Find service boundary.sx files. Dev: {project}/{service}/sx/boundary.sx (e.g. blog/sx/boundary.sx) Docker: /app/sx/boundary.sx (service's sx/ dir copied directly into /app/) """ root = _project_root() files: list[str] = [] # Dev layout: {root}/{service}/sx/boundary.sx for f in glob.glob(os.path.join(root, "*/sx/boundary.sx")): if "/shared/" not in f: files.append(f) # Docker layout: service's sx/ dir is at {root}/sx/boundary.sx docker_path = os.path.join(root, "sx", "boundary.sx") if os.path.exists(docker_path) and docker_path not in files: files.append(docker_path) return files # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- def parse_primitives_sx() -> frozenset[str]: """Parse primitives.sx and return frozenset of declared pure primitive names.""" by_module = parse_primitives_by_module() all_names: set[str] = set() for names in by_module.values(): all_names.update(names) return frozenset(all_names) def parse_primitives_by_module() -> dict[str, frozenset[str]]: """Parse primitives.sx and return primitives grouped by module.""" source = _read_file("primitives.sx") exprs = parse_all(source) modules: dict[str, set[str]] = {} current_module = "_unscoped" for expr in exprs: if not isinstance(expr, list) or len(expr) < 2: continue if not isinstance(expr[0], Symbol): continue if expr[0].name == "define-module": mod_name = expr[1] if isinstance(mod_name, Keyword): current_module = mod_name.name elif isinstance(mod_name, str): current_module = mod_name elif expr[0].name == "define-primitive": name = expr[1] if isinstance(name, str): modules.setdefault(current_module, set()).add(name) return {mod: frozenset(names) for mod, names in modules.items()} def _parse_param_type(param) -> tuple[str, str | None, bool]: """Parse a single param entry from a :params list. Returns (name, type_or_none, is_rest). A bare symbol like ``x`` → ("x", None, False). A typed form ``(x :as number)`` → ("x", "number", False). The ``&rest`` marker is tracked externally. """ if isinstance(param, Symbol): return (param.name, None, False) if isinstance(param, list) and len(param) == 3: # (name :as type) name_sym, kw, type_val = param if (isinstance(name_sym, Symbol) and isinstance(kw, Keyword) and kw.name == "as"): type_str = type_val.name if isinstance(type_val, Symbol) else str(type_val) return (name_sym.name, type_str, False) return (str(param), None, False) def parse_primitive_param_types() -> dict[str, dict]: """Parse primitives.sx and extract param type info for each primitive. Returns a dict mapping primitive name to param type descriptor:: { "+": {"positional": [], "rest_type": "number"}, "/": {"positional": [("a", "number"), ("b", "number")], "rest_type": None}, "get": {"positional": [("coll", None), ("key", None)], "rest_type": None}, } Each positional entry is (name, type_or_none). rest_type is the type of the &rest parameter (or None if no &rest, or None if untyped &rest). """ source = _read_file("primitives.sx") exprs = parse_all(source) result: dict[str, dict] = {} for expr in exprs: if not isinstance(expr, list) or len(expr) < 2: continue if not isinstance(expr[0], Symbol) or expr[0].name != "define-primitive": continue name = expr[1] if not isinstance(name, str): continue params_list = _extract_keyword_arg(expr, "params") if not isinstance(params_list, list): continue positional: list[tuple[str, str | None]] = [] rest_type: str | None = None i = 0 while i < len(params_list): item = params_list[i] if isinstance(item, Symbol) and item.name == "&rest": # Next item is the rest param if i + 1 < len(params_list): rname, rtype, _ = _parse_param_type(params_list[i + 1]) rest_type = rtype i += 2 else: pname, ptype, _ = _parse_param_type(item) if pname != "&rest": positional.append((pname, ptype)) i += 1 # Only store if at least one param has a type has_types = rest_type is not None or any(t is not None for _, t in positional) if has_types: result[name] = {"positional": positional, "rest_type": rest_type} return result def parse_boundary_sx() -> tuple[frozenset[str], dict[str, frozenset[str]]]: """Parse all boundary sources and return (io_names, {service: helper_names}). Loads three tiers: 1. boundary.sx — core language I/O 2. boundary-app.sx — deployment-specific I/O 3. {service}/sx/boundary.sx — per-service page helpers """ all_io: set[str] = set() all_helpers: dict[str, set[str]] = {} def _merge(source: str, label: str) -> None: io_names, helpers = _extract_declarations(source) all_io.update(io_names) for svc, names in helpers.items(): all_helpers.setdefault(svc, set()).update(names) logger.debug("Boundary %s: %d io, %d helpers", label, len(io_names), sum(len(v) for v in helpers.values())) # 1. Core language contract _merge(_read_file("boundary.sx"), "core") # 2. Deployment-specific I/O app_path = os.path.join(_ref_dir(), "boundary-app.sx") if os.path.exists(app_path): _merge(_read_file("boundary-app.sx"), "app") # 3. Per-service boundary files for filepath in _find_service_boundary_files(): try: _merge(_read_file_path(filepath), filepath) except Exception as e: logger.warning("Failed to parse %s: %s", filepath, e) frozen_helpers = {svc: frozenset(names) for svc, names in all_helpers.items()} return frozenset(all_io), frozen_helpers def parse_boundary_types() -> frozenset[str]: """Parse boundary.sx and return the declared boundary type names.""" source = _read_file("boundary.sx") exprs = parse_all(source) for expr in exprs: if (isinstance(expr, list) and len(expr) >= 2 and isinstance(expr[0], Symbol) and expr[0].name == "define-boundary-types"): type_list = expr[1] if isinstance(type_list, list): return frozenset( item for item in type_list if isinstance(item, str) ) return frozenset()