#!/usr/bin/env python3 """MCP server for Rose Ash service introspection. Stdio JSON-RPC transport. Static analysis of the codebase — no app imports needed. """ import json, sys, os, re, subprocess, glob PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) SERVICES = { "blog": 8001, "market": 8002, "cart": 8003, "events": 8004, "federation": 8005, "account": 8006, "relations": 8008, "likes": 8009, "orders": 8010, "test": 8011, "sx_docs": 8012, } # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def run(cmd, timeout=10): try: r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=timeout, cwd=PROJECT_DIR) return r.stdout.strip() except Exception as e: return f"Error: {e}" def text_result(s): return {"content": [{"type": "text", "text": s}]} def error_result(s): return {"content": [{"type": "text", "text": s}], "isError": True} def read_file(path): full = os.path.join(PROJECT_DIR, path) if not os.path.isabs(path) else path try: with open(full) as f: return f.read() except FileNotFoundError: return None # --------------------------------------------------------------------------- # Tool implementations # --------------------------------------------------------------------------- def svc_status(): out = run("docker ps --format '{{.Names}}\t{{.Status}}\t{{.Ports}}' 2>/dev/null") if not out: return text_result("No containers running (or docker not available)") lines = [] for line in out.split("\n"): parts = line.split("\t") if len(parts) >= 2: name = parts[0] # Filter to rose-ash related containers if any(s in name.lower() for s in list(SERVICES) + ["redis", "pgbouncer", "db", "postgres", "caddy"]): lines.append(f"{name:<30} {parts[1]:<30} {parts[2] if len(parts) > 2 else ''}") if not lines: return text_result("No rose-ash containers found") header = f"{'CONTAINER':<30} {'STATUS':<30} PORTS" return text_result(header + "\n" + "\n".join(lines)) def svc_routes(service): bp_dir = os.path.join(PROJECT_DIR, service, "bp") if not os.path.isdir(bp_dir): return error_result(f"No bp/ directory for service '{service}'") route_re = re.compile(r'@\w+\.(get|post|put|delete|patch|route)\(["\']([^"\']+)["\']') prefix_re = re.compile(r'url_prefix\s*=\s*["\']([^"\']+)["\']') routes = [] for root, _, files in os.walk(bp_dir): for f in sorted(files): if not f.endswith(".py"): continue path = os.path.join(root, f) rel = os.path.relpath(path, PROJECT_DIR) src = read_file(path) if not src: continue # Detect url_prefix in this file prefix_match = prefix_re.search(src) prefix = prefix_match.group(1) if prefix_match else "" for m in route_re.finditer(src): method = m.group(1).upper() route_path = m.group(2) if method == "ROUTE": method = "ANY" full_path = prefix + route_path if not route_path.startswith(prefix) else route_path # Find the handler name (next def after decorator) pos = m.end() handler_match = re.search(r'(?:async\s+)?def\s+(\w+)', src[pos:pos+200]) handler = handler_match.group(1) if handler_match else "?" routes.append(f"{method:<8} {full_path:<40} {handler:<30} {rel}") if not routes: return text_result(f"No routes found for '{service}'") header = f"{'METHOD':<8} {'PATH':<40} {'HANDLER':<30} FILE" return text_result(header + "\n" + "\n".join(routes)) def svc_calls(service=None): patterns = { "data": re.compile(r'fetch_data\(\s*["\'](\w+)["\']\s*,\s*["\']([^"\']+)["\']'), "action": re.compile(r'call_action\(\s*["\'](\w+)["\']\s*,\s*["\']([^"\']+)["\']'), "inbox": re.compile(r'send_internal_activity\(\s*["\'](\w+)["\']'), "fragment": re.compile(r'fetch_fragment\w*\(\s*["\'](\w+)["\']\s*,\s*["\']([^"\']+)["\']'), } scan_dirs = [service] if service else list(SERVICES) calls = [] for svc in scan_dirs: svc_dir = os.path.join(PROJECT_DIR, svc) if not os.path.isdir(svc_dir): continue for root, _, files in os.walk(svc_dir): # Skip alembic/versions if "alembic/versions" in root: continue for f in sorted(files): if not f.endswith(".py"): continue path = os.path.join(root, f) rel = os.path.relpath(path, PROJECT_DIR) src = read_file(path) if not src: continue for call_type, pattern in patterns.items(): for m in pattern.finditer(src): target = m.group(1) name = m.group(2) if m.lastindex >= 2 else "" calls.append(f"{svc:<15} → {target:<15} {call_type:<10} {name:<30} {rel}") if not calls: scope = f"'{service}'" if service else "any service" return text_result(f"No inter-service calls found in {scope}") header = f"{'CALLER':<15} {'TARGET':<15} {'TYPE':<10} {'NAME':<30} FILE" return text_result(header + "\n" + "\n".join(calls)) def _extract_env_vars(compose_src, service): """Extract environment variables for a service from a compose file.""" lines = compose_src.split("\n") in_service = False in_env = False env_vars = [] service_indent = None for line in lines: stripped = line.strip() # Detect service block start (2-space indented under services:) if re.match(rf"^ {service}:\s*$", line): in_service = True service_indent = len(line) - len(line.lstrip()) continue if in_service: # Detect end of service block (same or less indent, non-empty) if stripped and not line.startswith(" " * (service_indent + 1)) and not stripped.startswith("#"): if re.match(r"^\s{1,4}\w+:", line): break if stripped in ("environment:", "environment: &env"): in_env = True continue if re.match(r"^\s+environment:", stripped): in_env = True continue if in_env: if stripped.startswith("<<:"): env_vars.append(stripped) # Show anchor reference elif stripped.startswith("- "): env_vars.append(stripped[2:].strip()) elif re.match(r"^\s+\w+.*:", stripped) and ":" in stripped: # KEY: value format env_vars.append(stripped) elif stripped and not stripped.startswith("#"): # Check if we've left the environment section line_indent = len(line) - len(line.lstrip()) if line_indent <= service_indent + 2: in_env = False return env_vars def svc_config(service): result = f"Service: {service}\nDev port: {SERVICES.get(service, '?')}\n\n" # Production compose (has actual env vars) prod = read_file("docker-compose.yml") if prod: prod_vars = _extract_env_vars(prod, service) if prod_vars: result += "Production environment:\n" + "\n".join(f" {v}" for v in prod_vars) + "\n\n" # Dev compose (usually just anchor refs) dev = read_file("docker-compose.dev.yml") if dev: dev_vars = _extract_env_vars(dev, service) if dev_vars: result += "Dev overrides:\n" + "\n".join(f" {v}" for v in dev_vars) + "\n\n" # Also extract the x-dev-env anchor anchor_match = re.search(r'x-dev-env:\s*&dev-env\n((?:\s+\w.*\n)*)', dev) if anchor_match: result += "Shared dev env (x-dev-env):\n" + anchor_match.group(1) if "environment" not in result.lower(): result += "(no environment variables found)" return text_result(result) def svc_models(service): # Read alembic/env.py to find MODELS and TABLES env_py = read_file(f"{service}/alembic/env.py") if not env_py: return error_result(f"No alembic/env.py for '{service}'") # Extract MODELS list models_match = re.search(r'MODELS\s*=\s*\[(.*?)\]', env_py, re.DOTALL) if not models_match: return error_result(f"No MODELS list found in {service}/alembic/env.py") model_modules = re.findall(r'["\']([^"\']+)["\']', models_match.group(1)) # Extract TABLES set tables_match = re.search(r'TABLES\s*=\s*frozenset\(\{(.*?)\}\)', env_py, re.DOTALL) tables = re.findall(r'["\']([^"\']+)["\']', tables_match.group(1)) if tables_match else [] result = f"Service: {service}\n\n" result += f"Tables: {', '.join(sorted(tables))}\n\n" if tables else "" result += "Models:\n" for mod in model_modules: # Convert module path to file path file_path = mod.replace(".", "/") + ".py" src = read_file(file_path) if not src: result += f"\n {mod} (file not found: {file_path})\n" continue # Extract classes classes = re.findall(r'class\s+(\w+)\(.*?Base.*?\):', src) for cls in classes: result += f"\n {cls} ({mod}):\n" # Find columns col_pattern = re.compile(r'(\w+)\s*=\s*Column\((\w+)(?:\(.*?\))?,?\s*(.*?)\)', re.DOTALL) # Also handle mapped_column mapped_pattern = re.compile(r'(\w+):\s*Mapped\[([^\]]+)\]\s*=\s*mapped_column\(', re.DOTALL) for cm in col_pattern.finditer(src): col_name = cm.group(1) col_type = cm.group(2) extras = cm.group(3).strip().rstrip(",") flags = [] if "primary_key=True" in extras: flags.append("PK") if "nullable=False" in extras: flags.append("NOT NULL") if "unique=True" in extras: flags.append("UNIQUE") flag_str = f" [{', '.join(flags)}]" if flags else "" result += f" {col_name}: {col_type}{flag_str}\n" for mm in mapped_pattern.finditer(src): result += f" {mm.group(1)}: {mm.group(2)} (mapped)\n" # Relationships rels = re.findall(r'(\w+)\s*=\s*relationship\(["\'](\w+)["\']', src) for rel_name, rel_target in rels: result += f" {rel_name} → {rel_target} (relationship)\n" return text_result(result) def svc_schema(service): port = SERVICES.get(service) if not port: return error_result(f"Unknown service '{service}'") try: out = run(f"curl -s http://localhost:{port}/internal/schema", timeout=5) if not out: return error_result(f"No response from {service}:{port}/internal/schema — is the service running?") data = json.loads(out) return text_result(json.dumps(data, indent=2)) except json.JSONDecodeError: return text_result(out) # Return raw if not JSON except Exception as e: return error_result(f"Failed to query {service} schema: {e}") def alembic_status(service=None): services_to_check = [service] if service else [s for s in SERVICES if os.path.isdir(os.path.join(PROJECT_DIR, s, "alembic"))] results = [] for svc in services_to_check: alembic_dir = os.path.join(PROJECT_DIR, svc, "alembic") if not os.path.isdir(alembic_dir): continue # Count migration files versions_dir = os.path.join(alembic_dir, "versions") if os.path.isdir(versions_dir): migrations = [f for f in os.listdir(versions_dir) if f.endswith(".py") and not f.startswith("__")] count = len(migrations) latest = sorted(migrations)[-1] if migrations else "(none)" else: count = 0 latest = "(no versions dir)" results.append(f"{svc:<15} {count:>3} migrations latest: {latest}") if not results: return text_result("No services with alembic directories found") header = f"{'SERVICE':<15} {'COUNT':>3} LATEST" return text_result(header + "\n" + "\n".join(results)) def svc_logs(service, lines=50): # Try common container name patterns for pattern in [f"rose-ash-{service}-1", f"rose-ash_{service}_1", service]: out = run(f"docker logs --tail {lines} {pattern} 2>&1", timeout=10) if out and "No such container" not in out and "Error" not in out[:10]: return text_result(f"=== {service} (last {lines} lines) ===\n{out}") return error_result(f"Container for '{service}' not found. Try: docker ps") def svc_start(services): if services == "all" or services == ["all"]: cmd = "./dev.sh" elif isinstance(services, list): cmd = f"./dev.sh {' '.join(services)}" else: cmd = f"./dev.sh {services}" out = run(cmd, timeout=60) return text_result(f"Started: {services}\n{out}") def svc_stop(): out = run("./dev.sh down", timeout=30) return text_result(f"Stopped all services\n{out}") def svc_queries(service=None): services_to_check = [service] if service else list(SERVICES) results = [] for svc in services_to_check: path = os.path.join(PROJECT_DIR, svc, "queries.sx") src = read_file(path) if not src: continue # Parse defquery forms query_re = re.compile( r'\(defquery\s+([\w-]+)\s*\(([^)]*)\)\s*"([^"]*)"', re.DOTALL ) for m in query_re.finditer(src): name = m.group(1) params = m.group(2).strip() doc = m.group(3).strip()[:100] results.append(f"{svc:<15} {name:<30} {params:<40} {doc}") if not results: scope = f"'{service}'" if service else "any service" return text_result(f"No defquery definitions found in {scope}") header = f"{'SERVICE':<15} {'QUERY':<30} {'PARAMS':<40} DESCRIPTION" return text_result(header + "\n" + "\n".join(results)) def svc_actions(service=None): services_to_check = [service] if service else list(SERVICES) results = [] for svc in services_to_check: path = os.path.join(PROJECT_DIR, svc, "actions.sx") src = read_file(path) if not src: continue action_re = re.compile( r'\(defaction\s+([\w-]+)\s*\(([^)]*)\)\s*"([^"]*)"', re.DOTALL ) for m in action_re.finditer(src): name = m.group(1) params = m.group(2).strip() doc = m.group(3).strip()[:100] results.append(f"{svc:<15} {name:<30} {params:<40} {doc}") if not results: scope = f"'{service}'" if service else "any service" return text_result(f"No defaction definitions found in {scope}") header = f"{'SERVICE':<15} {'ACTION':<30} {'PARAMS':<40} DESCRIPTION" return text_result(header + "\n" + "\n".join(results)) # --------------------------------------------------------------------------- # Tool definitions # --------------------------------------------------------------------------- def tool(name, desc, props, required): return { "name": name, "description": desc, "inputSchema": { "type": "object", "required": required, "properties": props, }, } svc_prop = {"service": {"type": "string", "description": "Service name (blog, market, cart, etc)"}} dir_prop = {"dir": {"type": "string", "description": "Directory to scan"}} TOOLS = [ tool("svc_status", "Show Docker container status for all rose-ash services.", {}, []), tool("svc_routes", "List all HTTP routes for a service by scanning blueprint files.", svc_prop, ["service"]), tool("svc_calls", "Map inter-service calls (fetch_data, call_action, send_internal_activity, fetch_fragment). Shows caller→target graph.", {"service": {"type": "string", "description": "Service to scan (omit for all)"}}, []), tool("svc_config", "Show environment variables and config for a service from docker-compose.", svc_prop, ["service"]), tool("svc_models", "Show SQLAlchemy models, columns, and relationships for a service.", svc_prop, ["service"]), tool("svc_schema", "Query a running service's /internal/schema endpoint for live defquery/defaction manifest.", svc_prop, ["service"]), tool("alembic_status", "Show Alembic migration count and latest migration for each service.", {"service": {"type": "string", "description": "Service (omit for all)"}}, []), tool("svc_logs", "Show recent Docker logs for a service.", {**svc_prop, "lines": {"type": "integer", "description": "Number of lines (default 50)"}}, ["service"]), tool("svc_start", "Start services using dev.sh. Pass service names or 'all'.", {"services": {"type": "string", "description": "Service names (space-separated) or 'all'"}}, ["services"]), tool("svc_stop", "Stop all services (dev.sh down).", {}, []), tool("svc_queries", "List all defquery definitions from queries.sx files.", {"service": {"type": "string", "description": "Service (omit for all)"}}, []), tool("svc_actions", "List all defaction definitions from actions.sx files.", {"service": {"type": "string", "description": "Service (omit for all)"}}, []), ] # --------------------------------------------------------------------------- # JSON-RPC dispatch # --------------------------------------------------------------------------- def handle_tool(name, args): match name: case "svc_status": return svc_status() case "svc_routes": return svc_routes(args["service"]) case "svc_calls": return svc_calls(args.get("service")) case "svc_config": return svc_config(args["service"]) case "svc_models": return svc_models(args["service"]) case "svc_schema": return svc_schema(args["service"]) case "alembic_status": return alembic_status(args.get("service")) case "svc_logs": return svc_logs(args["service"], args.get("lines", 50)) case "svc_start": return svc_start(args["services"]) case "svc_stop": return svc_stop() case "svc_queries": return svc_queries(args.get("service")) case "svc_actions": return svc_actions(args.get("service")) case _: return error_result(f"Unknown tool: {name}") def dispatch(method, params): match method: case "initialize": return { "protocolVersion": "2024-11-05", "capabilities": {"tools": {}}, "serverInfo": {"name": "rose-ash-services", "version": "0.1.0"}, } case "notifications/initialized": return None case "tools/list": return {"tools": TOOLS} case "tools/call": name = params["name"] args = params.get("arguments", {}) try: return handle_tool(name, args) except Exception as e: return error_result(f"Error: {e}") case _: return None def main(): for line in sys.stdin: line = line.strip() if not line: continue try: msg = json.loads(line) except json.JSONDecodeError: continue method = msg.get("method", "") params = msg.get("params", {}) msg_id = msg.get("id") result = dispatch(method, params) if msg_id is not None and result is not None: response = {"jsonrpc": "2.0", "id": msg_id, "result": result} print(json.dumps(response), flush=True) if __name__ == "__main__": main()