Files
rose-ash/shared/sx/ref/boundary_parser.py
giles 07a73821e7 Fix boundary parser Docker path: handle /app/sx/boundary.sx layout
In Docker, each service's sx/ dir is copied directly to /app/sx/,
not /app/{service}/sx/. Add fallback search for /app/sx/boundary.sx
alongside the dev glob pattern.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 12:47:50 +00:00

224 lines
7.4 KiB
Python

"""
Parse boundary declarations from multiple sources.
Three tiers of boundary files:
1. shared/sx/ref/boundary.sx — core SX language I/O contract
2. shared/sx/ref/boundary-app.sx — deployment-specific layout I/O
3. {service}/sx/boundary.sx — per-service page helpers
Shared by both bootstrap_py.py and bootstrap_js.py, and used at runtime
by the validation module.
"""
from __future__ import annotations
import glob
import logging
import os
from typing import Any
logger = logging.getLogger("sx.boundary_parser")
# Allow standalone use (from bootstrappers) or in-project imports
try:
from shared.sx.parser import parse_all
from shared.sx.types import Symbol, Keyword, NIL as SX_NIL
except ImportError:
import sys
_HERE = os.path.dirname(os.path.abspath(__file__))
_PROJECT = os.path.abspath(os.path.join(_HERE, "..", "..", ".."))
sys.path.insert(0, _PROJECT)
from shared.sx.parser import parse_all
from shared.sx.types import Symbol, Keyword, NIL as SX_NIL
def _ref_dir() -> str:
return os.path.dirname(os.path.abspath(__file__))
def _project_root() -> str:
"""Return the project root containing service directories.
Dev: shared/sx/ref -> shared/sx -> shared -> project root
Docker: /app/shared/sx/ref -> /app (shared is inside /app)
"""
ref = _ref_dir()
# Go up 3 levels: shared/sx/ref -> project root
root = os.path.abspath(os.path.join(ref, "..", "..", ".."))
# Verify by checking for a known service directory or shared/
if os.path.isdir(os.path.join(root, "shared")):
return root
# Docker: /app/shared/sx/ref -> /app
# shared is INSIDE /app, not a sibling — go up to parent of shared
root = os.path.abspath(os.path.join(ref, "..", ".."))
if os.path.isdir(os.path.join(root, "sx")): # /app/sx exists in Docker
return root
return root
def _read_file(filename: str) -> str:
filepath = os.path.join(_ref_dir(), filename)
with open(filepath, encoding="utf-8") as f:
return f.read()
def _read_file_path(filepath: str) -> str:
with open(filepath, encoding="utf-8") as f:
return f.read()
def _extract_keyword_arg(expr: list, key: str) -> Any:
"""Extract :key value from a flat keyword-arg list."""
for i, item in enumerate(expr):
if isinstance(item, Keyword) and item.name == key and i + 1 < len(expr):
return expr[i + 1]
return None
def _extract_declarations(
source: str,
) -> tuple[set[str], dict[str, set[str]]]:
"""Extract I/O primitive names and page helper names from boundary source.
Returns (io_names, {service: helper_names}).
"""
exprs = parse_all(source)
io_names: set[str] = set()
helpers: dict[str, set[str]] = {}
for expr in exprs:
if not isinstance(expr, list) or not expr:
continue
head = expr[0]
if not isinstance(head, Symbol):
continue
if head.name == "define-io-primitive":
name = expr[1]
if isinstance(name, str):
io_names.add(name)
elif head.name == "define-page-helper":
name = expr[1]
service = _extract_keyword_arg(expr, "service")
if isinstance(name, str) and isinstance(service, str):
helpers.setdefault(service, set()).add(name)
return io_names, helpers
def _find_service_boundary_files() -> list[str]:
"""Find service boundary.sx files.
Dev: {project}/{service}/sx/boundary.sx (e.g. blog/sx/boundary.sx)
Docker: /app/sx/boundary.sx (service's sx/ dir copied directly into /app/)
"""
root = _project_root()
files: list[str] = []
# Dev layout: {root}/{service}/sx/boundary.sx
for f in glob.glob(os.path.join(root, "*/sx/boundary.sx")):
if "/shared/" not in f:
files.append(f)
# Docker layout: service's sx/ dir is at {root}/sx/boundary.sx
docker_path = os.path.join(root, "sx", "boundary.sx")
if os.path.exists(docker_path) and docker_path not in files:
files.append(docker_path)
return files
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def parse_primitives_sx() -> frozenset[str]:
"""Parse primitives.sx and return frozenset of declared pure primitive names."""
by_module = parse_primitives_by_module()
all_names: set[str] = set()
for names in by_module.values():
all_names.update(names)
return frozenset(all_names)
def parse_primitives_by_module() -> dict[str, frozenset[str]]:
"""Parse primitives.sx and return primitives grouped by module."""
source = _read_file("primitives.sx")
exprs = parse_all(source)
modules: dict[str, set[str]] = {}
current_module = "_unscoped"
for expr in exprs:
if not isinstance(expr, list) or len(expr) < 2:
continue
if not isinstance(expr[0], Symbol):
continue
if expr[0].name == "define-module":
mod_name = expr[1]
if isinstance(mod_name, Keyword):
current_module = mod_name.name
elif isinstance(mod_name, str):
current_module = mod_name
elif expr[0].name == "define-primitive":
name = expr[1]
if isinstance(name, str):
modules.setdefault(current_module, set()).add(name)
return {mod: frozenset(names) for mod, names in modules.items()}
def parse_boundary_sx() -> tuple[frozenset[str], dict[str, frozenset[str]]]:
"""Parse all boundary sources and return (io_names, {service: helper_names}).
Loads three tiers:
1. boundary.sx — core language I/O
2. boundary-app.sx — deployment-specific I/O
3. {service}/sx/boundary.sx — per-service page helpers
"""
all_io: set[str] = set()
all_helpers: dict[str, set[str]] = {}
def _merge(source: str, label: str) -> None:
io_names, helpers = _extract_declarations(source)
all_io.update(io_names)
for svc, names in helpers.items():
all_helpers.setdefault(svc, set()).update(names)
logger.debug("Boundary %s: %d io, %d helpers", label, len(io_names), sum(len(v) for v in helpers.values()))
# 1. Core language contract
_merge(_read_file("boundary.sx"), "core")
# 2. Deployment-specific I/O
app_path = os.path.join(_ref_dir(), "boundary-app.sx")
if os.path.exists(app_path):
_merge(_read_file("boundary-app.sx"), "app")
# 3. Per-service boundary files
for filepath in _find_service_boundary_files():
try:
_merge(_read_file_path(filepath), filepath)
except Exception as e:
logger.warning("Failed to parse %s: %s", filepath, e)
frozen_helpers = {svc: frozenset(names) for svc, names in all_helpers.items()}
return frozenset(all_io), frozen_helpers
def parse_boundary_types() -> frozenset[str]:
"""Parse boundary.sx and return the declared boundary type names."""
source = _read_file("boundary.sx")
exprs = parse_all(source)
for expr in exprs:
if (isinstance(expr, list) and len(expr) >= 2
and isinstance(expr[0], Symbol)
and expr[0].name == "define-boundary-types"):
type_list = expr[1]
if isinstance(type_list, list):
return frozenset(
item for item in type_list
if isinstance(item, str)
)
return frozenset()