Add rose-ash-services MCP server: 12 tools for service introspection

Python-based MCP server (tools/mcp_services.py) for understanding the
microservice topology via static analysis:

- svc_status: Docker container status
- svc_routes: HTTP route table from blueprint scanning
- svc_calls: inter-service dependency graph (fetch_data/call_action/etc)
- svc_config: environment variables from docker-compose
- svc_models: SQLAlchemy models, columns, relationships
- svc_schema: live defquery/defaction manifest from running services
- alembic_status: migration count per service
- svc_logs/svc_start/svc_stop: service lifecycle
- svc_queries/svc_actions: SX query and action definitions

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-25 23:34:52 +00:00
parent 6806343d0e
commit 250bee69c7
3 changed files with 549 additions and 0 deletions

View File

@@ -3,6 +3,11 @@
"sx-tree": {
"type": "stdio",
"command": "./hosts/ocaml/_build/default/bin/mcp_tree.exe"
},
"rose-ash-services": {
"type": "stdio",
"command": "python3",
"args": ["tools/mcp_services.py"]
}
}
}

View File

@@ -330,3 +330,22 @@ Dev bind mounts in `docker-compose.dev.yml` must mirror the Docker image's COPY
- Use Context7 MCP for up-to-date library documentation
- Playwright MCP is available for browser automation/testing
### Service introspection MCP (rose-ash-services)
Python-based MCP server for understanding the microservice topology. Static analysis — works without running services.
| Tool | Purpose |
|------|---------|
| `svc_status` | Docker container status for all rose-ash services |
| `svc_routes` | List all HTTP routes for a service by scanning blueprints |
| `svc_calls` | Map inter-service calls (fetch_data/call_action/send_internal_activity/fetch_fragment) |
| `svc_config` | Environment variables and config for a service |
| `svc_models` | SQLAlchemy models, columns, relationships for a service |
| `svc_schema` | Live defquery/defaction manifest from a running service |
| `alembic_status` | Migration count and latest migration per service |
| `svc_logs` | Recent Docker logs for a service |
| `svc_start` | Start services via dev.sh |
| `svc_stop` | Stop all services |
| `svc_queries` | List all defquery definitions from queries.sx files |
| `svc_actions` | List all defaction definitions from actions.sx files |

525
tools/mcp_services.py Executable file
View File

@@ -0,0 +1,525 @@
#!/usr/bin/env python3
"""MCP server for Rose Ash service introspection.
Stdio JSON-RPC transport. Static analysis of the codebase — no app imports needed.
"""
import json, sys, os, re, subprocess, glob
PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
SERVICES = {
"blog": 8001, "market": 8002, "cart": 8003, "events": 8004,
"federation": 8005, "account": 8006, "relations": 8008,
"likes": 8009, "orders": 8010, "test": 8011, "sx_docs": 8012,
}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def run(cmd, timeout=10):
try:
r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=timeout, cwd=PROJECT_DIR)
return r.stdout.strip()
except Exception as e:
return f"Error: {e}"
def text_result(s):
return {"content": [{"type": "text", "text": s}]}
def error_result(s):
return {"content": [{"type": "text", "text": s}], "isError": True}
def read_file(path):
full = os.path.join(PROJECT_DIR, path) if not os.path.isabs(path) else path
try:
with open(full) as f:
return f.read()
except FileNotFoundError:
return None
# ---------------------------------------------------------------------------
# Tool implementations
# ---------------------------------------------------------------------------
def svc_status():
out = run("docker ps --format '{{.Names}}\t{{.Status}}\t{{.Ports}}' 2>/dev/null")
if not out:
return text_result("No containers running (or docker not available)")
lines = []
for line in out.split("\n"):
parts = line.split("\t")
if len(parts) >= 2:
name = parts[0]
# Filter to rose-ash related containers
if any(s in name.lower() for s in list(SERVICES) + ["redis", "pgbouncer", "db", "postgres", "caddy"]):
lines.append(f"{name:<30} {parts[1]:<30} {parts[2] if len(parts) > 2 else ''}")
if not lines:
return text_result("No rose-ash containers found")
header = f"{'CONTAINER':<30} {'STATUS':<30} PORTS"
return text_result(header + "\n" + "\n".join(lines))
def svc_routes(service):
bp_dir = os.path.join(PROJECT_DIR, service, "bp")
if not os.path.isdir(bp_dir):
return error_result(f"No bp/ directory for service '{service}'")
route_re = re.compile(r'@\w+\.(get|post|put|delete|patch|route)\(["\']([^"\']+)["\']')
prefix_re = re.compile(r'url_prefix\s*=\s*["\']([^"\']+)["\']')
routes = []
for root, _, files in os.walk(bp_dir):
for f in sorted(files):
if not f.endswith(".py"):
continue
path = os.path.join(root, f)
rel = os.path.relpath(path, PROJECT_DIR)
src = read_file(path)
if not src:
continue
# Detect url_prefix in this file
prefix_match = prefix_re.search(src)
prefix = prefix_match.group(1) if prefix_match else ""
for m in route_re.finditer(src):
method = m.group(1).upper()
route_path = m.group(2)
if method == "ROUTE":
method = "ANY"
full_path = prefix + route_path if not route_path.startswith(prefix) else route_path
# Find the handler name (next def after decorator)
pos = m.end()
handler_match = re.search(r'(?:async\s+)?def\s+(\w+)', src[pos:pos+200])
handler = handler_match.group(1) if handler_match else "?"
routes.append(f"{method:<8} {full_path:<40} {handler:<30} {rel}")
if not routes:
return text_result(f"No routes found for '{service}'")
header = f"{'METHOD':<8} {'PATH':<40} {'HANDLER':<30} FILE"
return text_result(header + "\n" + "\n".join(routes))
def svc_calls(service=None):
patterns = {
"data": re.compile(r'fetch_data\(\s*["\'](\w+)["\']\s*,\s*["\']([^"\']+)["\']'),
"action": re.compile(r'call_action\(\s*["\'](\w+)["\']\s*,\s*["\']([^"\']+)["\']'),
"inbox": re.compile(r'send_internal_activity\(\s*["\'](\w+)["\']'),
"fragment": re.compile(r'fetch_fragment\w*\(\s*["\'](\w+)["\']\s*,\s*["\']([^"\']+)["\']'),
}
scan_dirs = [service] if service else list(SERVICES)
calls = []
for svc in scan_dirs:
svc_dir = os.path.join(PROJECT_DIR, svc)
if not os.path.isdir(svc_dir):
continue
for root, _, files in os.walk(svc_dir):
# Skip alembic/versions
if "alembic/versions" in root:
continue
for f in sorted(files):
if not f.endswith(".py"):
continue
path = os.path.join(root, f)
rel = os.path.relpath(path, PROJECT_DIR)
src = read_file(path)
if not src:
continue
for call_type, pattern in patterns.items():
for m in pattern.finditer(src):
target = m.group(1)
name = m.group(2) if m.lastindex >= 2 else ""
calls.append(f"{svc:<15}{target:<15} {call_type:<10} {name:<30} {rel}")
if not calls:
scope = f"'{service}'" if service else "any service"
return text_result(f"No inter-service calls found in {scope}")
header = f"{'CALLER':<15} {'TARGET':<15} {'TYPE':<10} {'NAME':<30} FILE"
return text_result(header + "\n" + "\n".join(calls))
def _extract_env_vars(compose_src, service):
"""Extract environment variables for a service from a compose file."""
lines = compose_src.split("\n")
in_service = False
in_env = False
env_vars = []
service_indent = None
for line in lines:
stripped = line.strip()
# Detect service block start (2-space indented under services:)
if re.match(rf"^ {service}:\s*$", line):
in_service = True
service_indent = len(line) - len(line.lstrip())
continue
if in_service:
# Detect end of service block (same or less indent, non-empty)
if stripped and not line.startswith(" " * (service_indent + 1)) and not stripped.startswith("#"):
if re.match(r"^\s{1,4}\w+:", line):
break
if stripped in ("environment:", "environment: &env"):
in_env = True
continue
if re.match(r"^\s+environment:", stripped):
in_env = True
continue
if in_env:
if stripped.startswith("<<:"):
env_vars.append(stripped) # Show anchor reference
elif stripped.startswith("- "):
env_vars.append(stripped[2:].strip())
elif re.match(r"^\s+\w+.*:", stripped) and ":" in stripped:
# KEY: value format
env_vars.append(stripped)
elif stripped and not stripped.startswith("#"):
# Check if we've left the environment section
line_indent = len(line) - len(line.lstrip())
if line_indent <= service_indent + 2:
in_env = False
return env_vars
def svc_config(service):
result = f"Service: {service}\nDev port: {SERVICES.get(service, '?')}\n\n"
# Production compose (has actual env vars)
prod = read_file("docker-compose.yml")
if prod:
prod_vars = _extract_env_vars(prod, service)
if prod_vars:
result += "Production environment:\n" + "\n".join(f" {v}" for v in prod_vars) + "\n\n"
# Dev compose (usually just anchor refs)
dev = read_file("docker-compose.dev.yml")
if dev:
dev_vars = _extract_env_vars(dev, service)
if dev_vars:
result += "Dev overrides:\n" + "\n".join(f" {v}" for v in dev_vars) + "\n\n"
# Also extract the x-dev-env anchor
anchor_match = re.search(r'x-dev-env:\s*&dev-env\n((?:\s+\w.*\n)*)', dev)
if anchor_match:
result += "Shared dev env (x-dev-env):\n" + anchor_match.group(1)
if "environment" not in result.lower():
result += "(no environment variables found)"
return text_result(result)
def svc_models(service):
# Read alembic/env.py to find MODELS and TABLES
env_py = read_file(f"{service}/alembic/env.py")
if not env_py:
return error_result(f"No alembic/env.py for '{service}'")
# Extract MODELS list
models_match = re.search(r'MODELS\s*=\s*\[(.*?)\]', env_py, re.DOTALL)
if not models_match:
return error_result(f"No MODELS list found in {service}/alembic/env.py")
model_modules = re.findall(r'["\']([^"\']+)["\']', models_match.group(1))
# Extract TABLES set
tables_match = re.search(r'TABLES\s*=\s*frozenset\(\{(.*?)\}\)', env_py, re.DOTALL)
tables = re.findall(r'["\']([^"\']+)["\']', tables_match.group(1)) if tables_match else []
result = f"Service: {service}\n\n"
result += f"Tables: {', '.join(sorted(tables))}\n\n" if tables else ""
result += "Models:\n"
for mod in model_modules:
# Convert module path to file path
file_path = mod.replace(".", "/") + ".py"
src = read_file(file_path)
if not src:
result += f"\n {mod} (file not found: {file_path})\n"
continue
# Extract classes
classes = re.findall(r'class\s+(\w+)\(.*?Base.*?\):', src)
for cls in classes:
result += f"\n {cls} ({mod}):\n"
# Find columns
col_pattern = re.compile(r'(\w+)\s*=\s*Column\((\w+)(?:\(.*?\))?,?\s*(.*?)\)', re.DOTALL)
# Also handle mapped_column
mapped_pattern = re.compile(r'(\w+):\s*Mapped\[([^\]]+)\]\s*=\s*mapped_column\(', re.DOTALL)
for cm in col_pattern.finditer(src):
col_name = cm.group(1)
col_type = cm.group(2)
extras = cm.group(3).strip().rstrip(",")
flags = []
if "primary_key=True" in extras:
flags.append("PK")
if "nullable=False" in extras:
flags.append("NOT NULL")
if "unique=True" in extras:
flags.append("UNIQUE")
flag_str = f" [{', '.join(flags)}]" if flags else ""
result += f" {col_name}: {col_type}{flag_str}\n"
for mm in mapped_pattern.finditer(src):
result += f" {mm.group(1)}: {mm.group(2)} (mapped)\n"
# Relationships
rels = re.findall(r'(\w+)\s*=\s*relationship\(["\'](\w+)["\']', src)
for rel_name, rel_target in rels:
result += f" {rel_name}{rel_target} (relationship)\n"
return text_result(result)
def svc_schema(service):
port = SERVICES.get(service)
if not port:
return error_result(f"Unknown service '{service}'")
try:
out = run(f"curl -s http://localhost:{port}/internal/schema", timeout=5)
if not out:
return error_result(f"No response from {service}:{port}/internal/schema — is the service running?")
data = json.loads(out)
return text_result(json.dumps(data, indent=2))
except json.JSONDecodeError:
return text_result(out) # Return raw if not JSON
except Exception as e:
return error_result(f"Failed to query {service} schema: {e}")
def alembic_status(service=None):
services_to_check = [service] if service else [s for s in SERVICES if os.path.isdir(os.path.join(PROJECT_DIR, s, "alembic"))]
results = []
for svc in services_to_check:
alembic_dir = os.path.join(PROJECT_DIR, svc, "alembic")
if not os.path.isdir(alembic_dir):
continue
# Count migration files
versions_dir = os.path.join(alembic_dir, "versions")
if os.path.isdir(versions_dir):
migrations = [f for f in os.listdir(versions_dir) if f.endswith(".py") and not f.startswith("__")]
count = len(migrations)
latest = sorted(migrations)[-1] if migrations else "(none)"
else:
count = 0
latest = "(no versions dir)"
results.append(f"{svc:<15} {count:>3} migrations latest: {latest}")
if not results:
return text_result("No services with alembic directories found")
header = f"{'SERVICE':<15} {'COUNT':>3} LATEST"
return text_result(header + "\n" + "\n".join(results))
def svc_logs(service, lines=50):
# Try common container name patterns
for pattern in [f"rose-ash-{service}-1", f"rose-ash_{service}_1", service]:
out = run(f"docker logs --tail {lines} {pattern} 2>&1", timeout=10)
if out and "No such container" not in out and "Error" not in out[:10]:
return text_result(f"=== {service} (last {lines} lines) ===\n{out}")
return error_result(f"Container for '{service}' not found. Try: docker ps")
def svc_start(services):
if services == "all" or services == ["all"]:
cmd = "./dev.sh"
elif isinstance(services, list):
cmd = f"./dev.sh {' '.join(services)}"
else:
cmd = f"./dev.sh {services}"
out = run(cmd, timeout=60)
return text_result(f"Started: {services}\n{out}")
def svc_stop():
out = run("./dev.sh down", timeout=30)
return text_result(f"Stopped all services\n{out}")
def svc_queries(service=None):
services_to_check = [service] if service else list(SERVICES)
results = []
for svc in services_to_check:
path = os.path.join(PROJECT_DIR, svc, "queries.sx")
src = read_file(path)
if not src:
continue
# Parse defquery forms
query_re = re.compile(
r'\(defquery\s+([\w-]+)\s*\(([^)]*)\)\s*"([^"]*)"',
re.DOTALL
)
for m in query_re.finditer(src):
name = m.group(1)
params = m.group(2).strip()
doc = m.group(3).strip()[:100]
results.append(f"{svc:<15} {name:<30} {params:<40} {doc}")
if not results:
scope = f"'{service}'" if service else "any service"
return text_result(f"No defquery definitions found in {scope}")
header = f"{'SERVICE':<15} {'QUERY':<30} {'PARAMS':<40} DESCRIPTION"
return text_result(header + "\n" + "\n".join(results))
def svc_actions(service=None):
services_to_check = [service] if service else list(SERVICES)
results = []
for svc in services_to_check:
path = os.path.join(PROJECT_DIR, svc, "actions.sx")
src = read_file(path)
if not src:
continue
action_re = re.compile(
r'\(defaction\s+([\w-]+)\s*\(([^)]*)\)\s*"([^"]*)"',
re.DOTALL
)
for m in action_re.finditer(src):
name = m.group(1)
params = m.group(2).strip()
doc = m.group(3).strip()[:100]
results.append(f"{svc:<15} {name:<30} {params:<40} {doc}")
if not results:
scope = f"'{service}'" if service else "any service"
return text_result(f"No defaction definitions found in {scope}")
header = f"{'SERVICE':<15} {'ACTION':<30} {'PARAMS':<40} DESCRIPTION"
return text_result(header + "\n" + "\n".join(results))
# ---------------------------------------------------------------------------
# Tool definitions
# ---------------------------------------------------------------------------
def tool(name, desc, props, required):
return {
"name": name,
"description": desc,
"inputSchema": {
"type": "object",
"required": required,
"properties": props,
},
}
svc_prop = {"service": {"type": "string", "description": "Service name (blog, market, cart, etc)"}}
dir_prop = {"dir": {"type": "string", "description": "Directory to scan"}}
TOOLS = [
tool("svc_status", "Show Docker container status for all rose-ash services.", {}, []),
tool("svc_routes", "List all HTTP routes for a service by scanning blueprint files.",
svc_prop, ["service"]),
tool("svc_calls", "Map inter-service calls (fetch_data, call_action, send_internal_activity, fetch_fragment). Shows caller→target graph.",
{"service": {"type": "string", "description": "Service to scan (omit for all)"}}, []),
tool("svc_config", "Show environment variables and config for a service from docker-compose.",
svc_prop, ["service"]),
tool("svc_models", "Show SQLAlchemy models, columns, and relationships for a service.",
svc_prop, ["service"]),
tool("svc_schema", "Query a running service's /internal/schema endpoint for live defquery/defaction manifest.",
svc_prop, ["service"]),
tool("alembic_status", "Show Alembic migration count and latest migration for each service.",
{"service": {"type": "string", "description": "Service (omit for all)"}}, []),
tool("svc_logs", "Show recent Docker logs for a service.",
{**svc_prop, "lines": {"type": "integer", "description": "Number of lines (default 50)"}},
["service"]),
tool("svc_start", "Start services using dev.sh. Pass service names or 'all'.",
{"services": {"type": "string", "description": "Service names (space-separated) or 'all'"}},
["services"]),
tool("svc_stop", "Stop all services (dev.sh down).", {}, []),
tool("svc_queries", "List all defquery definitions from queries.sx files.",
{"service": {"type": "string", "description": "Service (omit for all)"}}, []),
tool("svc_actions", "List all defaction definitions from actions.sx files.",
{"service": {"type": "string", "description": "Service (omit for all)"}}, []),
]
# ---------------------------------------------------------------------------
# JSON-RPC dispatch
# ---------------------------------------------------------------------------
def handle_tool(name, args):
match name:
case "svc_status":
return svc_status()
case "svc_routes":
return svc_routes(args["service"])
case "svc_calls":
return svc_calls(args.get("service"))
case "svc_config":
return svc_config(args["service"])
case "svc_models":
return svc_models(args["service"])
case "svc_schema":
return svc_schema(args["service"])
case "alembic_status":
return alembic_status(args.get("service"))
case "svc_logs":
return svc_logs(args["service"], args.get("lines", 50))
case "svc_start":
return svc_start(args["services"])
case "svc_stop":
return svc_stop()
case "svc_queries":
return svc_queries(args.get("service"))
case "svc_actions":
return svc_actions(args.get("service"))
case _:
return error_result(f"Unknown tool: {name}")
def dispatch(method, params):
match method:
case "initialize":
return {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {}},
"serverInfo": {"name": "rose-ash-services", "version": "0.1.0"},
}
case "notifications/initialized":
return None
case "tools/list":
return {"tools": TOOLS}
case "tools/call":
name = params["name"]
args = params.get("arguments", {})
try:
return handle_tool(name, args)
except Exception as e:
return error_result(f"Error: {e}")
case _:
return None
def main():
for line in sys.stdin:
line = line.strip()
if not line:
continue
try:
msg = json.loads(line)
except json.JSONDecodeError:
continue
method = msg.get("method", "")
params = msg.get("params", {})
msg_id = msg.get("id")
result = dispatch(method, params)
if msg_id is not None and result is not None:
response = {"jsonrpc": "2.0", "id": msg_id, "result": result}
print(json.dumps(response), flush=True)
if __name__ == "__main__":
main()