# suma_browser/http_client.py from __future__ import annotations import asyncio import os import secrets from typing import Optional, Dict import httpx from config import config _CLIENT: httpx.AsyncClient | None = None # ----- optional decoders -> Accept-Encoding BROTLI_OK = False ZSTD_OK = False try: import brotli # noqa: F401 BROTLI_OK = True except Exception: pass try: import zstandard as zstd # noqa: F401 ZSTD_OK = True except Exception: pass def _accept_encoding() -> str: enc = ["gzip", "deflate"] if BROTLI_OK: enc.append("br") if ZSTD_OK: enc.append("zstd") return ", ".join(enc) FIREFOX_UA = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:142.0) Gecko/20100101 Firefox/142.0" def _ff_headers(referer: Optional[str] = None) -> Dict[str, str]: h = { "User-Agent": FIREFOX_UA, "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "en-GB,en;q=0.5", "Accept-Encoding": _accept_encoding(), "Connection": "keep-alive", "Upgrade-Insecure-Requests": "1", "Sec-Fetch-Dest": "document", "Sec-Fetch-Mode": "navigate", "Sec-Fetch-Site": "none" if not referer else "same-origin", "Sec-Fetch-User": "?1", "DNT": "1", "Sec-GPC": "1", "Priority": "u=0, i", "Cache-Control": "no-cache", "Pragma": "no-cache", } if referer: h["Referer"] = referer return h def _chrome_headers(referer=None, origin=None): headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5", "Accept-Encoding": "gzip, deflate, br", "Connection": "keep-alive", "Upgrade-Insecure-Requests": "1", } if referer: headers["Referer"] = referer if origin: headers["Origin"] = origin return headers def _parse_cookie_header(cookie_header: str) -> Dict[str, str]: jar: Dict[str, str] = {} for part in cookie_header.split(";"): part = part.strip() if not part or "=" not in part: continue k, v = part.split("=", 1) jar[k.strip()] = v.strip() return jar def _looks_like_cloudflare(html: bytes) -> bool: if not html: return False s = html[:40000].lower() return ( b"please wait while your request is being verified" in s or b"/cdn-cgi/challenge-platform/scripts/jsd/main.js" in s or b"rocket-loader.min.js" in s or b"cf-ray" in s or b"challenge-platform" in s or b"cf-chl-" in s ) # -------- runtime cookie configuration (preferred over env) -------------------- _INITIAL_COOKIES: Dict[str, str] = {} _INITIAL_COOKIE_HEADER: Optional[str] = None async def configure_cookies(cookies: Dict[str, str]) -> None: """ Configure initial cookies programmatically (preferred over env). Call BEFORE the first request (i.e., before get_client()/fetch()). If a client already exists, its jar is updated immediately. """ global _INITIAL_COOKIES, _INITIAL_COOKIE_HEADER _INITIAL_COOKIE_HEADER = None _INITIAL_COOKIES = dict(cookies or {}) # If client already built, update it now if _CLIENT is not None: print('configuring cookies') host = config()["base_host"] or "wholesale.suma.coop" for k, v in _INITIAL_COOKIES.items(): _CLIENT.cookies.set(k, v, domain=host, path="/") def configure_cookies_from_header(cookie_header: str) -> None: """ Configure initial cookies from a raw 'Cookie:' header string. Preferred over env; call BEFORE the first request. """ global _INITIAL_COOKIES, _INITIAL_COOKIE_HEADER _INITIAL_COOKIE_HEADER = cookie_header or "" _INITIAL_COOKIES = _parse_cookie_header(_INITIAL_COOKIE_HEADER) if _CLIENT is not None: host = config()["base_host"] or "wholesale.suma.coop" for k, v in _INITIAL_COOKIES.items(): _CLIENT.cookies.set(k, v, domain=host, path="/") # ------------------------------------------------------------------------------ async def get_client() -> httpx.AsyncClient: """Public accessor (same as _get_client).""" return await _get_client() async def _get_client() -> httpx.AsyncClient: global _CLIENT if _CLIENT is None: timeout = httpx.Timeout(300.0, connect=150.0) limits = httpx.Limits(max_keepalive_connections=8, max_connections=16) _CLIENT = httpx.AsyncClient( follow_redirects=True, timeout=timeout, http2=True, limits=limits, headers=_chrome_headers(), trust_env=True, ) # ---- Seed cookies (priority: runtime config > env var) --------------- host = config()["base_host"] or "wholesale.suma.coop" if _INITIAL_COOKIES or _INITIAL_COOKIE_HEADER: # From runtime config if _INITIAL_COOKIE_HEADER: _CLIENT.cookies.update(_parse_cookie_header(_INITIAL_COOKIE_HEADER)) for k, v in _INITIAL_COOKIES.items(): _CLIENT.cookies.set(k, v, domain=host, path="/") else: # Fallback to environment cookie_str = os.environ.get("SUMA_COOKIES", "").strip() if cookie_str: _CLIENT.cookies.update(_parse_cookie_header(cookie_str)) # Ensure private_content_version is present if "private_content_version" not in _CLIENT.cookies: pcv = secrets.token_hex(16) _CLIENT.cookies.set("private_content_version", pcv, domain=host, path="/") # --------------------------------------------------------------------- return _CLIENT async def aclose_client() -> None: global _CLIENT if _CLIENT is not None: await _CLIENT.aclose() _CLIENT = None async def fetch(url: str, *, referer: Optional[str] = None, retries: int = 3) -> str: client = await _get_client() # Warm-up visit to look like a real session if len(client.cookies.jar) == 0: try: await client.get(config()["base_url"].rstrip("/") + "/", headers=_chrome_headers()) await asyncio.sleep(0.25) except Exception: pass last_exc: Optional[Exception] = None for attempt in range(1, retries + 1): try: h = _chrome_headers(referer=referer or (config()["base_url"].rstrip("/") + "/")) r = await client.get(url, headers=h) if _looks_like_cloudflare(r.content): if attempt < retries: await asyncio.sleep(0.9 if attempt == 1 else 1.3) try: await client.get(config()["base_url"].rstrip("/") + "/", headers=_chrome_headers()) await asyncio.sleep(0.4) except Exception: pass continue try: r.raise_for_status() except httpx.HTTPStatusError as e: print(f"Fetch failed for {url}") print("Status:", r.status_code) print("Body:", r.text[:1000]) # Trimmed raise return r.text except Exception as e: last_exc = e if attempt >= retries: raise await asyncio.sleep(0.45 * attempt + 0.25) if last_exc: raise last_exc raise RuntimeError("fetch failed unexpectedly")