Annotate all primitives in primitives.sx with (:as type) param types where meaningful (67/80 — 13 polymorphic ops stay untyped). Add parse_primitive_param_types() to boundary_parser.py for extraction. Implement check-primitive-call in types.sx with full positional + rest param validation, thread prim-param-types through check-body-walk, check-component, and check-all. 10 new tests (438 total, all pass). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
301 lines
10 KiB
Python
301 lines
10 KiB
Python
"""
|
|
Parse boundary declarations from multiple sources.
|
|
|
|
Three tiers of boundary files:
|
|
1. shared/sx/ref/boundary.sx — core SX language I/O contract
|
|
2. shared/sx/ref/boundary-app.sx — deployment-specific layout I/O
|
|
3. {service}/sx/boundary.sx — per-service page helpers
|
|
|
|
Shared by both bootstrap_py.py and bootstrap_js.py, and used at runtime
|
|
by the validation module.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import glob
|
|
import logging
|
|
import os
|
|
from typing import Any
|
|
|
|
logger = logging.getLogger("sx.boundary_parser")
|
|
|
|
# Allow standalone use (from bootstrappers) or in-project imports
|
|
try:
|
|
from shared.sx.parser import parse_all
|
|
from shared.sx.types import Symbol, Keyword, NIL as SX_NIL
|
|
except ImportError:
|
|
import sys
|
|
_HERE = os.path.dirname(os.path.abspath(__file__))
|
|
_PROJECT = os.path.abspath(os.path.join(_HERE, "..", "..", ".."))
|
|
sys.path.insert(0, _PROJECT)
|
|
from shared.sx.parser import parse_all
|
|
from shared.sx.types import Symbol, Keyword, NIL as SX_NIL
|
|
|
|
|
|
def _ref_dir() -> str:
    """Return the absolute path of the directory that holds this module."""
    module_path = os.path.abspath(__file__)
    return os.path.dirname(module_path)
|
|
|
|
|
|
def _project_root() -> str:
    """Return the project root containing service directories.

    Dev:    shared/sx/ref -> shared/sx -> shared -> project root
    Docker: /app/shared/sx/ref -> /app (shared is inside /app)

    The directory three levels above this module is returned when it
    contains a ``shared/`` directory. Otherwise the directory two levels
    above this module is returned as an unvalidated best-effort fallback.
    """
    ref = _ref_dir()

    # Go up 3 levels: shared/sx/ref -> project root
    candidate = os.path.abspath(os.path.join(ref, "..", "..", ".."))
    # Verify by checking for the shared/ directory
    if os.path.isdir(os.path.join(candidate, "shared")):
        return candidate

    # Fallback: two levels up from this module's directory. The previous
    # implementation probed for an sx/ directory here but returned the same
    # path whether or not it existed, so the probe was dropped.
    return os.path.abspath(os.path.join(ref, "..", ".."))
|
|
|
|
|
|
def _read_file(filename: str) -> str:
    """Return the UTF-8 text of *filename*, resolved next to this module."""
    target = os.path.join(_ref_dir(), filename)
    with open(target, encoding="utf-8") as handle:
        contents = handle.read()
    return contents
|
|
|
|
|
|
def _read_file_path(filepath: str) -> str:
    """Return the full contents of *filepath*, decoded as UTF-8."""
    with open(filepath, encoding="utf-8") as handle:
        contents = handle.read()
    return contents
|
|
|
|
|
|
def _extract_keyword_arg(expr: list, key: str) -> Any:
    """Return the value that follows ``:key`` in a flat keyword-arg list.

    The first ``Keyword`` named *key* that has a following element wins.
    ``None`` is returned when there is no such keyword, or when the keyword
    is the last element and therefore has no value.
    """
    # Pair every element with its successor; a trailing keyword has no pair.
    for marker, value in zip(expr, expr[1:]):
        if isinstance(marker, Keyword) and marker.name == key:
            return value
    return None
|
|
|
|
|
|
def _extract_declarations(
    source: str,
) -> tuple[set[str], dict[str, set[str]]]:
    """Extract I/O primitive names and page helper names from boundary source.

    Recognises two top-level forms:

    * ``(define-io-primitive "name" ...)`` — the name is added to the I/O set.
    * ``(define-page-helper "name" ... :service "svc" ...)`` — the name is
      recorded under its service.

    Forms that are not lists, have no name element, do not start with a
    ``Symbol``, or whose name (or ``:service`` value) is not a string are
    skipped.

    Returns (io_names, {service: helper_names}).
    """
    io_names: set[str] = set()
    helpers: dict[str, set[str]] = {}

    for expr in parse_all(source):
        # Both the head and the name are needed; a bare ``(define-io-primitive)``
        # used to raise IndexError on ``expr[1]`` and is now skipped, matching
        # the length guard used by the primitives parsers in this module.
        if not isinstance(expr, list) or len(expr) < 2:
            continue
        head = expr[0]
        if not isinstance(head, Symbol):
            continue

        name = expr[1]
        if not isinstance(name, str):
            continue

        if head.name == "define-io-primitive":
            io_names.add(name)
        elif head.name == "define-page-helper":
            service = _extract_keyword_arg(expr, "service")
            if isinstance(service, str):
                helpers.setdefault(service, set()).add(name)

    return io_names, helpers
|
|
|
|
|
|
def _find_service_boundary_files() -> list[str]:
    """Find service boundary.sx files.

    Dev:    {project}/{service}/sx/boundary.sx (e.g. blog/sx/boundary.sx)
    Docker: /app/sx/boundary.sx (service's sx/ dir copied directly into /app/)

    Returns the matching paths: the dev-layout matches first (excluding the
    ``shared`` directory), followed by the Docker-layout path if it exists
    and was not already collected.
    """
    root = _project_root()
    files: list[str] = []

    # Dev layout: {root}/{service}/sx/boundary.sx
    for candidate in glob.glob(os.path.join(root, "*/sx/boundary.sx")):
        # Two dirname() calls strip "sx/boundary.sx", leaving the service dir.
        # Comparing that directory's own name (rather than searching the whole
        # path for "/shared/") keeps service files when the project lives
        # under a directory called "shared" and is separator-independent.
        service_dir = os.path.dirname(os.path.dirname(candidate))
        if os.path.basename(service_dir) != "shared":
            files.append(candidate)

    # Docker layout: service's sx/ dir is at {root}/sx/boundary.sx
    docker_path = os.path.join(root, "sx", "boundary.sx")
    if os.path.exists(docker_path) and docker_path not in files:
        files.append(docker_path)

    return files
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def parse_primitives_sx() -> frozenset[str]:
    """Return every pure primitive name declared in primitives.sx.

    This is the union of all per-module name sets produced by
    ``parse_primitives_by_module``.
    """
    grouped = parse_primitives_by_module()
    # union() with no arguments yields an empty frozenset, so an empty
    # mapping is handled without a special case.
    return frozenset().union(*grouped.values())
|
|
|
|
|
|
def parse_primitives_by_module() -> dict[str, frozenset[str]]:
    """Group the primitive names declared in primitives.sx by module.

    A ``(define-module <name>)`` form (keyword or string name) switches the
    module that subsequent ``(define-primitive "name" ...)`` forms are filed
    under. Primitives seen before any module declaration are filed under
    ``"_unscoped"``.
    """
    grouped: dict[str, set[str]] = {}
    active_module = "_unscoped"

    for form in parse_all(_read_file("primitives.sx")):
        is_named_form = (
            isinstance(form, list)
            and len(form) >= 2
            and isinstance(form[0], Symbol)
        )
        if not is_named_form:
            continue

        head, target = form[0].name, form[1]
        if head == "define-module":
            # Any other name type leaves the active module unchanged.
            if isinstance(target, Keyword):
                active_module = target.name
            elif isinstance(target, str):
                active_module = target
        elif head == "define-primitive" and isinstance(target, str):
            grouped.setdefault(active_module, set()).add(target)

    return {module: frozenset(members) for module, members in grouped.items()}
|
|
|
|
|
|
def _parse_param_type(param) -> tuple[str, str | None, bool]:
    """Decode one entry of a ``:params`` list.

    Returns ``(name, type_or_none, is_rest)``:

    * bare symbol ``x``              -> ``("x", None, False)``
    * typed form ``(x :as number)``  -> ``("x", "number", False)``
    * anything else                  -> ``(str(param), None, False)``

    ``is_rest`` is always ``False`` here; the ``&rest`` marker is tracked by
    the caller.
    """
    if isinstance(param, Symbol):
        return (param.name, None, False)

    looks_typed = isinstance(param, list) and len(param) == 3
    if looks_typed:
        # (name :as type)
        ident, marker, declared = param
        has_as_marker = isinstance(marker, Keyword) and marker.name == "as"
        if isinstance(ident, Symbol) and has_as_marker:
            if isinstance(declared, Symbol):
                type_name = declared.name
            else:
                type_name = str(declared)
            return (ident.name, type_name, False)

    return (str(param), None, False)
|
|
|
|
|
|
def parse_primitive_param_types() -> dict[str, dict]:
    """Parse primitives.sx and collect param type info per primitive.

    The result maps a primitive name to a descriptor::

        {
            "+": {"positional": [], "rest_type": "number"},
            "/": {"positional": [("a", "number"), ("b", "number")], "rest_type": None},
        }

    ``positional`` is a list of ``(name, type_or_none)`` pairs in declaration
    order. ``rest_type`` is the declared type of the ``&rest`` parameter; it
    is ``None`` both when there is no ``&rest`` and when ``&rest`` is untyped.

    Primitives whose params carry no type at all (no typed positional and no
    typed rest) are left out of the result, as are ``define-primitive`` forms
    without a string name or without a list-valued ``:params``.
    """
    descriptors: dict[str, dict] = {}
    exhausted = object()  # marks "&rest was the final element"

    for form in parse_all(_read_file("primitives.sx")):
        if not isinstance(form, list) or len(form) < 2:
            continue
        head = form[0]
        if not (isinstance(head, Symbol) and head.name == "define-primitive"):
            continue

        prim_name = form[1]
        if not isinstance(prim_name, str):
            continue

        declared_params = _extract_keyword_arg(form, "params")
        if not isinstance(declared_params, list):
            continue

        positional: list[tuple[str, str | None]] = []
        rest_type: str | None = None

        cursor = iter(declared_params)
        for entry in cursor:
            if isinstance(entry, Symbol) and entry.name == "&rest":
                # Consume the element after the marker: it is the rest param.
                follower = next(cursor, exhausted)
                if follower is not exhausted:
                    rest_type = _parse_param_type(follower)[1]
                continue

            entry_name, entry_type, _ = _parse_param_type(entry)
            if entry_name != "&rest":
                positional.append((entry_name, entry_type))

        # Only store if at least one param has a type
        untyped = rest_type is None and all(t is None for _, t in positional)
        if untyped:
            continue
        descriptors[prim_name] = {"positional": positional, "rest_type": rest_type}

    return descriptors
|
|
|
|
|
|
def parse_boundary_sx() -> tuple[frozenset[str], dict[str, frozenset[str]]]:
    """Parse all boundary sources and return (io_names, {service: helper_names}).

    Sources are merged in this order:

    1. boundary.sx — core language I/O (required)
    2. boundary-app.sx — deployment-specific I/O (only if the file exists)
    3. {service}/sx/boundary.sx — per-service page helpers; a file that fails
       to load is logged as a warning and skipped
    """
    io_acc: set[str] = set()
    helper_acc: dict[str, set[str]] = {}

    def _merge(source: str, label: str) -> None:
        # Fold one source's declarations into the accumulators.
        found_io, found_helpers = _extract_declarations(source)
        io_acc.update(found_io)
        for service, helper_names in found_helpers.items():
            helper_acc.setdefault(service, set()).update(helper_names)
        helper_total = sum(map(len, found_helpers.values()))
        logger.debug("Boundary %s: %d io, %d helpers", label, len(found_io), helper_total)

    # 1. Core language contract
    core_source = _read_file("boundary.sx")
    _merge(core_source, "core")

    # 2. Deployment-specific I/O
    if os.path.exists(os.path.join(_ref_dir(), "boundary-app.sx")):
        app_source = _read_file("boundary-app.sx")
        _merge(app_source, "app")

    # 3. Per-service boundary files
    for service_file in _find_service_boundary_files():
        try:
            _merge(_read_file_path(service_file), service_file)
        except Exception as e:
            logger.warning("Failed to parse %s: %s", service_file, e)

    return (
        frozenset(io_acc),
        {service: frozenset(members) for service, members in helper_acc.items()},
    )
|
|
|
|
|
|
def parse_boundary_types() -> frozenset[str]:
    """Return the boundary type names declared in boundary.sx.

    Uses the first ``(define-boundary-types (...))`` form whose argument is a
    list and keeps only its string entries. Returns an empty frozenset when
    no such form is found.
    """
    for form in parse_all(_read_file("boundary.sx")):
        if not (isinstance(form, list) and len(form) >= 2):
            continue
        head = form[0]
        if not isinstance(head, Symbol) or head.name != "define-boundary-types":
            continue

        declared = form[1]
        # A non-list argument does not end the search; later forms may match.
        if isinstance(declared, list):
            return frozenset(entry for entry in declared if isinstance(entry, str))

    return frozenset()
|