This repository was archived on 2026-02-24. You can view and clone its files, but you cannot open issues, create pull requests, or push commits.
Files
shared/infrastructure/fragments.py
giles 20d3ff8425 Make fragment failures raise by default instead of silent degradation
FragmentError raised on network errors or non-200 responses when
required=True (default). Logs at ERROR level. Pass required=False
for optional fragments that should degrade gracefully.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 18:03:04 +00:00

179 lines
5.0 KiB
Python

"""
Server-side fragment composition client.
Each coop app exposes HTML fragments at ``/internal/fragments/{type}``.
This module provides helpers to fetch and cache those fragments so that
consuming apps can compose cross-app UI without shared templates.
Failures raise ``FragmentError`` by default so broken fragments are
immediately visible rather than silently missing from the page.
"""
from __future__ import annotations
import asyncio
import logging
import os
from typing import Sequence
import httpx
log = logging.getLogger(__name__)
# Shared async HTTP client, created lazily by _get_client and reused for the
# lifetime of the process (recreated only if it has been closed).
_client: httpx.AsyncClient | None = None
# Default request timeout in seconds; used both for the client-wide timeout
# and as the per-call default in the fetch helpers below.
_DEFAULT_TIMEOUT = 2.0
# Header sent on every fragment request so providers can distinguish
# fragment fetches from normal browser traffic.
FRAGMENT_HEADER = "X-Fragment-Request"
class FragmentError(Exception):
    """Raised when a fragment fetch fails.

    Raised by :func:`fetch_fragment` (and the helpers built on it) when a
    provider returns a non-200 response or the request errors out and the
    fetch was made with ``required=True``.
    """
def _get_client() -> httpx.AsyncClient:
    """Return the process-wide shared ``httpx.AsyncClient``.

    The client is built on first use and rebuilt whenever the previous
    instance has been closed. Redirects are disabled because fragment
    endpoints are expected to answer directly.
    """
    global _client
    client = _client
    if client is None or client.is_closed:
        client = httpx.AsyncClient(
            timeout=httpx.Timeout(_DEFAULT_TIMEOUT),
            follow_redirects=False,
        )
        _client = client
    return client
def _internal_url(app_name: str) -> str:
"""Resolve the Docker-internal base URL for *app_name*.
Looks up ``INTERNAL_URL_{APP}`` first, falls back to
``http://{app}:8000``.
"""
env_key = f"INTERNAL_URL_{app_name.upper()}"
return os.getenv(env_key, f"http://{app_name}:8000").rstrip("/")
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
async def fetch_fragment(
    app_name: str,
    fragment_type: str,
    *,
    params: dict | None = None,
    timeout: float = _DEFAULT_TIMEOUT,
    required: bool = True,
) -> str:
    """Fetch an HTML fragment from another app.

    Returns the raw HTML string. When *required* is True (default),
    raises ``FragmentError`` on network errors or non-200 responses.
    When *required* is False, failures are logged as warnings and an
    empty string is returned instead.
    """
    url = f"{_internal_url(app_name)}/internal/fragments/{fragment_type}"
    try:
        response = await _get_client().get(
            url,
            params=params,
            headers={FRAGMENT_HEADER: "1"},
            timeout=timeout,
        )
    except Exception as exc:
        # Transport-level failure (connect error, timeout, etc.).
        msg = f"Fragment {app_name}/{fragment_type} failed: {exc}"
        if not required:
            log.warning(msg)
            return ""
        log.error(msg)
        raise FragmentError(msg) from exc
    if response.status_code == 200:
        return response.text
    # Provider answered, but not with a usable fragment.
    msg = f"Fragment {app_name}/{fragment_type} returned {response.status_code}"
    if not required:
        log.warning(msg)
        return ""
    log.error(msg)
    raise FragmentError(msg)
async def fetch_fragments(
    requests: Sequence[tuple[str, str, dict | None]],
    *,
    timeout: float = _DEFAULT_TIMEOUT,
    required: bool = True,
) -> list[str]:
    """Fetch multiple fragments concurrently.

    *requests* is a sequence of ``(app_name, fragment_type, params)``
    tuples. The resulting HTML strings are returned in the same order.
    When *required* is True, any single failure raises ``FragmentError``.
    """
    coros = [
        fetch_fragment(app, ftype, params=params, timeout=timeout, required=required)
        for app, ftype, params in requests
    ]
    results = await asyncio.gather(*coros)
    return list(results)
async def fetch_fragment_cached(
app_name: str,
fragment_type: str,
*,
params: dict | None = None,
ttl: int = 30,
timeout: float = _DEFAULT_TIMEOUT,
required: bool = True,
) -> str:
"""Fetch a fragment with a Redis cache layer.
Cache key: ``frag:{app}:{type}:{sorted_params}``.
"""
# Build a stable cache key
suffix = ""
if params:
sorted_items = sorted(params.items())
suffix = ":" + "&".join(f"{k}={v}" for k, v in sorted_items)
cache_key = f"frag:{app_name}:{fragment_type}{suffix}"
# Try Redis cache
redis = _get_redis()
if redis:
try:
cached = await redis.get(cache_key)
if cached is not None:
return cached.decode() if isinstance(cached, bytes) else cached
except Exception:
pass
# Cache miss — fetch from provider
html = await fetch_fragment(
app_name, fragment_type, params=params, timeout=timeout, required=required,
)
# Store in cache (even empty string — avoids hammering a down service)
if redis and ttl > 0:
try:
await redis.set(cache_key, html.encode(), ex=ttl)
except Exception:
pass
return html
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _get_redis():
"""Return the current app's Redis connection, or None."""
try:
from quart import current_app
r = current_app.redis
return r if r else None
except Exception:
return None