""" Async HTTP client for inter-app communication. Each app exposes internal JSON API endpoints. Other apps call them via httpx over the Docker overlay network (or localhost in dev). URLs resolved from env vars: INTERNAL_URL_COOP (default http://localhost:8000) INTERNAL_URL_MARKET (default http://localhost:8001) INTERNAL_URL_CART (default http://localhost:8002) Session cookie forwarding: when ``forward_session=True`` the current request's ``coop_session`` cookie is sent along so the target app can resolve ``g.user`` / cart identity. """ from __future__ import annotations import logging import os from typing import Any import httpx from quart import request as quart_request log = logging.getLogger("internal_api") class DictObj: """Thin wrapper so ``d.key`` works on dicts returned by JSON APIs. Jinja templates use attribute access (``item.post.slug``) which doesn't work on plain dicts. Wrapping the API response with ``dictobj()`` makes both ``item.post.slug`` and ``item["post"]["slug"]`` work identically. """ __slots__ = ("_data",) def __init__(self, data: dict): self._data = data def __getattr__(self, name: str): try: v = self._data[name] except KeyError: raise AttributeError(name) if isinstance(v, dict): return DictObj(v) return v def get(self, key, default=None): v = self._data.get(key, default) if isinstance(v, dict): return DictObj(v) return v def __repr__(self): return f"DictObj({self._data!r})" def __bool__(self): return bool(self._data) def dictobj(data): """Recursively wrap dicts (or lists of dicts) for attribute access.""" if isinstance(data, list): return [DictObj(d) if isinstance(d, dict) else d for d in data] if isinstance(data, dict): return DictObj(data) return data _DEFAULTS = { "coop": "http://localhost:8000", "market": "http://localhost:8001", "cart": "http://localhost:8002", "events": "http://localhost:8003", } _client: httpx.AsyncClient | None = None TIMEOUT = 3.0 # seconds def _base_url(app_name: str) -> str: env_key = f"INTERNAL_URL_{app_name.upper()}" return os.getenv(env_key, _DEFAULTS.get(app_name, "")) def _get_client() -> httpx.AsyncClient: global _client if _client is None or _client.is_closed: _client = httpx.AsyncClient(timeout=TIMEOUT) return _client async def close_client() -> None: """Call from ``@app.after_serving`` to cleanly close the pool.""" global _client if _client is not None and not _client.is_closed: await _client.aclose() _client = None def _session_cookies() -> dict[str, str]: """Extract the shared session cookie from the incoming request.""" cookie_name = "coop_session" try: val = quart_request.cookies.get(cookie_name) except RuntimeError: # No active request context val = None if val: return {cookie_name: val} return {} async def get( app_name: str, path: str, *, forward_session: bool = False, params: dict | None = None, ) -> dict | list | None: """GET ```` and return parsed JSON, or ``None`` on failure.""" url = _base_url(app_name).rstrip("/") + path cookies = _session_cookies() if forward_session else {} try: resp = await _get_client().get(url, params=params, cookies=cookies) resp.raise_for_status() return resp.json() except Exception as exc: log.warning("internal_api GET %s failed: %r", url, exc) return None async def post( app_name: str, path: str, *, json: Any = None, forward_session: bool = False, ) -> dict | list | None: """POST ```` and return parsed JSON, or ``None`` on failure.""" url = _base_url(app_name).rstrip("/") + path cookies = _session_cookies() if forward_session else {} try: resp = await _get_client().post(url, json=json, cookies=cookies) resp.raise_for_status() return resp.json() except Exception as exc: log.warning("internal_api POST %s failed: %r", url, exc) return None