Split boundary.sx: separate language contract from app-specific declarations
boundary.sx was mixing three concerns in one file:
- Core SX I/O primitives (the language contract)
- Deployment-specific layout I/O (app architecture)
- Per-service page helpers (fully app-specific)
Now split into three tiers:
1. shared/sx/ref/boundary.sx — core I/O only (frag, query, current-user, etc.)
2. shared/sx/ref/boundary-app.sx — deployment layout contexts (*-header-ctx, *-ctx)
3. {service}/sx/boundary.sx — per-service page helpers
The boundary parser loads all three tiers automatically. Validation error
messages now point to the correct file for each tier.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,14 +1,23 @@
|
||||
"""
|
||||
Parse boundary.sx and primitives.sx to extract declared names.
|
||||
Parse boundary declarations from multiple sources.
|
||||
|
||||
Three tiers of boundary files:
|
||||
1. shared/sx/ref/boundary.sx — core SX language I/O contract
|
||||
2. shared/sx/ref/boundary-app.sx — deployment-specific layout I/O
|
||||
3. {service}/sx/boundary.sx — per-service page helpers
|
||||
|
||||
Shared by both bootstrap_py.py and bootstrap_js.py, and used at runtime
|
||||
by the validation module.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import glob
|
||||
import logging
|
||||
import os
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger("sx.boundary_parser")
|
||||
|
||||
# Allow standalone use (from bootstrappers) or in-project imports
|
||||
try:
|
||||
from shared.sx.parser import parse_all
|
||||
@@ -26,12 +35,28 @@ def _ref_dir() -> str:
|
||||
return os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
|
||||
def _project_root() -> str:
|
||||
"""Return the project root (3 levels up from shared/sx/ref/)."""
|
||||
ref = _ref_dir()
|
||||
# shared/sx/ref -> shared/sx -> shared -> project root
|
||||
root = os.path.abspath(os.path.join(ref, "..", "..", ".."))
|
||||
# In Docker the layout is /app/shared/sx/ref -> /app
|
||||
if not os.path.isdir(root):
|
||||
root = os.path.abspath(os.path.join(ref, "..", ".."))
|
||||
return root
|
||||
|
||||
|
||||
def _read_file(filename: str) -> str:
|
||||
filepath = os.path.join(_ref_dir(), filename)
|
||||
with open(filepath, encoding="utf-8") as f:
|
||||
return f.read()
|
||||
|
||||
|
||||
def _read_file_path(filepath: str) -> str:
|
||||
with open(filepath, encoding="utf-8") as f:
|
||||
return f.read()
|
||||
|
||||
|
||||
def _extract_keyword_arg(expr: list, key: str) -> Any:
|
||||
"""Extract :key value from a flat keyword-arg list."""
|
||||
for i, item in enumerate(expr):
|
||||
@@ -40,6 +65,51 @@ def _extract_keyword_arg(expr: list, key: str) -> Any:
|
||||
return None
|
||||
|
||||
|
||||
def _extract_declarations(
|
||||
source: str,
|
||||
) -> tuple[set[str], dict[str, set[str]]]:
|
||||
"""Extract I/O primitive names and page helper names from boundary source.
|
||||
|
||||
Returns (io_names, {service: helper_names}).
|
||||
"""
|
||||
exprs = parse_all(source)
|
||||
io_names: set[str] = set()
|
||||
helpers: dict[str, set[str]] = {}
|
||||
|
||||
for expr in exprs:
|
||||
if not isinstance(expr, list) or not expr:
|
||||
continue
|
||||
head = expr[0]
|
||||
if not isinstance(head, Symbol):
|
||||
continue
|
||||
|
||||
if head.name == "define-io-primitive":
|
||||
name = expr[1]
|
||||
if isinstance(name, str):
|
||||
io_names.add(name)
|
||||
|
||||
elif head.name == "define-page-helper":
|
||||
name = expr[1]
|
||||
service = _extract_keyword_arg(expr, "service")
|
||||
if isinstance(name, str) and isinstance(service, str):
|
||||
helpers.setdefault(service, set()).add(name)
|
||||
|
||||
return io_names, helpers
|
||||
|
||||
|
||||
def _find_service_boundary_files() -> list[str]:
|
||||
"""Find all {service}/sx/boundary.sx files in the project."""
|
||||
root = _project_root()
|
||||
pattern = os.path.join(root, "*/sx/boundary.sx")
|
||||
files = glob.glob(pattern)
|
||||
# Exclude shared/sx/ref/ — that's the core boundary
|
||||
return [f for f in files if "/shared/" not in f]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def parse_primitives_sx() -> frozenset[str]:
|
||||
"""Parse primitives.sx and return frozenset of declared pure primitive names."""
|
||||
by_module = parse_primitives_by_module()
|
||||
@@ -50,12 +120,7 @@ def parse_primitives_sx() -> frozenset[str]:
|
||||
|
||||
|
||||
def parse_primitives_by_module() -> dict[str, frozenset[str]]:
|
||||
"""Parse primitives.sx and return primitives grouped by module.
|
||||
|
||||
Returns:
|
||||
Dict mapping module name (e.g. "core.arithmetic") to frozenset of
|
||||
primitive names declared under that module.
|
||||
"""
|
||||
"""Parse primitives.sx and return primitives grouped by module."""
|
||||
source = _read_file("primitives.sx")
|
||||
exprs = parse_all(source)
|
||||
modules: dict[str, set[str]] = {}
|
||||
@@ -83,37 +148,40 @@ def parse_primitives_by_module() -> dict[str, frozenset[str]]:
|
||||
|
||||
|
||||
def parse_boundary_sx() -> tuple[frozenset[str], dict[str, frozenset[str]]]:
|
||||
"""Parse boundary.sx and return (io_names, {service: helper_names}).
|
||||
"""Parse all boundary sources and return (io_names, {service: helper_names}).
|
||||
|
||||
Returns:
|
||||
io_names: frozenset of declared I/O primitive names
|
||||
helpers: dict mapping service name to frozenset of helper names
|
||||
Loads three tiers:
|
||||
1. boundary.sx — core language I/O
|
||||
2. boundary-app.sx — deployment-specific I/O
|
||||
3. {service}/sx/boundary.sx — per-service page helpers
|
||||
"""
|
||||
source = _read_file("boundary.sx")
|
||||
exprs = parse_all(source)
|
||||
io_names: set[str] = set()
|
||||
helpers: dict[str, set[str]] = {}
|
||||
all_io: set[str] = set()
|
||||
all_helpers: dict[str, set[str]] = {}
|
||||
|
||||
for expr in exprs:
|
||||
if not isinstance(expr, list) or not expr:
|
||||
continue
|
||||
head = expr[0]
|
||||
if not isinstance(head, Symbol):
|
||||
continue
|
||||
def _merge(source: str, label: str) -> None:
|
||||
io_names, helpers = _extract_declarations(source)
|
||||
all_io.update(io_names)
|
||||
for svc, names in helpers.items():
|
||||
all_helpers.setdefault(svc, set()).update(names)
|
||||
logger.debug("Boundary %s: %d io, %d helpers", label, len(io_names), sum(len(v) for v in helpers.values()))
|
||||
|
||||
if head.name == "define-io-primitive":
|
||||
name = expr[1]
|
||||
if isinstance(name, str):
|
||||
io_names.add(name)
|
||||
# 1. Core language contract
|
||||
_merge(_read_file("boundary.sx"), "core")
|
||||
|
||||
elif head.name == "define-page-helper":
|
||||
name = expr[1]
|
||||
service = _extract_keyword_arg(expr, "service")
|
||||
if isinstance(name, str) and isinstance(service, str):
|
||||
helpers.setdefault(service, set()).add(name)
|
||||
# 2. Deployment-specific I/O
|
||||
app_path = os.path.join(_ref_dir(), "boundary-app.sx")
|
||||
if os.path.exists(app_path):
|
||||
_merge(_read_file("boundary-app.sx"), "app")
|
||||
|
||||
frozen_helpers = {svc: frozenset(names) for svc, names in helpers.items()}
|
||||
return frozenset(io_names), frozen_helpers
|
||||
# 3. Per-service boundary files
|
||||
for filepath in _find_service_boundary_files():
|
||||
try:
|
||||
_merge(_read_file_path(filepath), filepath)
|
||||
except Exception as e:
|
||||
logger.warning("Failed to parse %s: %s", filepath, e)
|
||||
|
||||
frozen_helpers = {svc: frozenset(names) for svc, names in all_helpers.items()}
|
||||
return frozenset(all_io), frozen_helpers
|
||||
|
||||
|
||||
def parse_boundary_types() -> frozenset[str]:
|
||||
@@ -126,7 +194,6 @@ def parse_boundary_types() -> frozenset[str]:
|
||||
and expr[0].name == "define-boundary-types"):
|
||||
type_list = expr[1]
|
||||
if isinstance(type_list, list):
|
||||
# (list "number" "string" ...)
|
||||
return frozenset(
|
||||
item for item in type_list
|
||||
if isinstance(item, str)
|
||||
|
||||
Reference in New Issue
Block a user