"""Protocol manifest — aggregates /internal/schema from all services. Can be used as a CLI tool or imported for dev-mode inspection. Usage:: python -m shared.infrastructure.protocol_manifest """ from __future__ import annotations import asyncio import json from typing import Any from shared.infrastructure.data_client import fetch_data # Service names that have inter-service protocols _SERVICES = [ "blog", "market", "cart", "events", "account", "likes", "relations", "orders", ] async def fetch_service_schema(service: str) -> dict[str, Any] | None: """Fetch /internal/schema from a single service.""" try: from shared.infrastructure.urls import service_url import aiohttp url = service_url(service, "/internal/schema") async with aiohttp.ClientSession() as session: async with session.get(url, timeout=aiohttp.ClientTimeout(total=3)) as resp: if resp.status == 200: return await resp.json() except Exception: return None return None async def generate_manifest() -> dict[str, Any]: """Fetch schemas from all services and produce a unified protocol map.""" results = await asyncio.gather( *(fetch_service_schema(s) for s in _SERVICES), return_exceptions=True, ) manifest = {"services": {}} for service, result in zip(_SERVICES, results): if isinstance(result, dict): manifest["services"][service] = result else: manifest["services"][service] = {"error": str(result) if isinstance(result, Exception) else "unavailable"} return manifest if __name__ == "__main__": m = asyncio.run(generate_manifest()) print(json.dumps(m, indent=2))