""" Server-side fragment composition client. Each coop app exposes HTML fragments at ``/internal/fragments/{type}``. This module provides helpers to fetch and cache those fragments so that consuming apps can compose cross-app UI without shared templates. Failures raise ``FragmentError`` by default so broken fragments are immediately visible rather than silently missing from the page. """ from __future__ import annotations import asyncio import logging import os from typing import Sequence import httpx log = logging.getLogger(__name__) # Re-usable async client (created lazily, one per process) _client: httpx.AsyncClient | None = None # Default request timeout (seconds) _DEFAULT_TIMEOUT = 2.0 # Header sent on every fragment request so providers can distinguish # fragment fetches from normal browser traffic. FRAGMENT_HEADER = "X-Fragment-Request" class FragmentError(Exception): """Raised when a fragment fetch fails.""" def _get_client() -> httpx.AsyncClient: global _client if _client is None or _client.is_closed: _client = httpx.AsyncClient( timeout=httpx.Timeout(_DEFAULT_TIMEOUT), follow_redirects=False, ) return _client def _internal_url(app_name: str) -> str: """Resolve the Docker-internal base URL for *app_name*. Looks up ``INTERNAL_URL_{APP}`` first, falls back to ``http://{app}:8000``. """ env_key = f"INTERNAL_URL_{app_name.upper()}" return os.getenv(env_key, f"http://{app_name}:8000").rstrip("/") # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ def _is_fragment_request() -> bool: """True when the current request is itself a fragment fetch.""" try: from quart import request as _req return bool(_req.headers.get(FRAGMENT_HEADER)) except Exception: return False async def fetch_fragment( app_name: str, fragment_type: str, *, params: dict | None = None, timeout: float = _DEFAULT_TIMEOUT, required: bool = True, ) -> str: """Fetch an HTML fragment from another app. Returns the raw HTML string. When *required* is True (default), raises ``FragmentError`` on network errors or non-200 responses. When *required* is False, returns ``""`` on failure. Automatically returns ``""`` when called inside a fragment request to prevent circular dependencies between apps. """ if _is_fragment_request(): return "" base = _internal_url(app_name) url = f"{base}/internal/fragments/{fragment_type}" try: resp = await _get_client().get( url, params=params, headers={FRAGMENT_HEADER: "1"}, timeout=timeout, ) if resp.status_code == 200: return resp.text msg = f"Fragment {app_name}/{fragment_type} returned {resp.status_code}" if required: log.error(msg) raise FragmentError(msg) log.warning(msg) return "" except FragmentError: raise except Exception as exc: msg = f"Fragment {app_name}/{fragment_type} failed: {exc}" if required: log.error(msg) raise FragmentError(msg) from exc log.warning(msg) return "" async def fetch_fragments( requests: Sequence[tuple[str, str, dict | None]], *, timeout: float = _DEFAULT_TIMEOUT, required: bool = True, ) -> list[str]: """Fetch multiple fragments concurrently. *requests* is a sequence of ``(app_name, fragment_type, params)`` tuples. Returns a list of HTML strings in the same order. When *required* is True, any single failure raises ``FragmentError``. """ return list(await asyncio.gather(*( fetch_fragment(app, ftype, params=params, timeout=timeout, required=required) for app, ftype, params in requests ))) async def fetch_fragment_cached( app_name: str, fragment_type: str, *, params: dict | None = None, ttl: int = 30, timeout: float = _DEFAULT_TIMEOUT, required: bool = True, ) -> str: """Fetch a fragment with a Redis cache layer. Cache key: ``frag:{app}:{type}:{sorted_params}``. """ # Build a stable cache key suffix = "" if params: sorted_items = sorted(params.items()) suffix = ":" + "&".join(f"{k}={v}" for k, v in sorted_items) cache_key = f"frag:{app_name}:{fragment_type}{suffix}" # Try Redis cache redis = _get_redis() if redis: try: cached = await redis.get(cache_key) if cached is not None: return cached.decode() if isinstance(cached, bytes) else cached except Exception: pass # Cache miss — fetch from provider html = await fetch_fragment( app_name, fragment_type, params=params, timeout=timeout, required=required, ) # Store in cache (even empty string — avoids hammering a down service) if redis and ttl > 0: try: await redis.set(cache_key, html.encode(), ex=ttl) except Exception: pass return html # ------------------------------------------------------------------ # Helpers # ------------------------------------------------------------------ def _get_redis(): """Return the current app's Redis connection, or None.""" try: from quart import current_app r = current_app.redis return r if r else None except Exception: return None