platform-core/modules/shared/httpResilience.py
ValueOn AG bc7c6fe27c
Some checks failed
Deploy Plattform-Core (Int) / test (push) Failing after 13s
Deploy Plattform-Core (Int) / deploy (push) Has been skipped
elimination of technical issues (imports)
2026-06-06 00:32:45 +02:00

241 lines
10 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Shared HTTP resilience helpers for provider connectors.
Provides a reusable session pool with concurrency limiter and retry-with-backoff
so that Google, MSFT and Infomaniak connectors do not each re-implement
per-request sessions, unbounded parallelism, and missing retry logic.
"""
import asyncio
import logging
import time
from typing import Any, Dict, Optional, Union
import aiohttp
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Default size of the per-instance semaphore (ResilientHttp ``maxConcurrent``).
_DEFAULT_MAX_CONCURRENT = 8
# Default number of attempts per call, counting the first one (``maxRetries``).
_DEFAULT_MAX_RETRIES = 3
# Default total request timeout in seconds (``defaultTimeoutS``).
_DEFAULT_TIMEOUT_S = 30
# HTTP status codes that ResilientHttp answers with wait-and-retry instead of
# returning an error immediately.
_RETRYABLE_STATUS = {429, 502, 503, 504}
# Registry of every ResilientHttp ever constructed in this process; each
# instance appends itself in __init__ and closeAllResilientHttp() walks the list.
_instances: list["ResilientHttp"] = []
class ResilientHttp:
    """Managed aiohttp.ClientSession with semaphore + retry.

    One instance owns a lazily created ``aiohttp.ClientSession`` and an
    ``asyncio.Semaphore`` that caps how many requests of this instance run at
    the same time. Responses whose status is in ``_RETRYABLE_STATUS``,
    timeouts and ``aiohttp.ClientError`` are retried until ``maxRetries``
    attempts have been made.

    Typical usage inside a connector module-level function::

        _http = ResilientHttp("Google", maxConcurrent=8)

        async def _googleGet(token, url):
            return await _http.getJson(url, headers={"Authorization": f"Bearer {token}"})

    The session is created lazily on first call, reused across requests,
    and closed via ``closeAllResilientHttp()`` at app shutdown.

    NOTE(review): back-off sleeps run while the semaphore slot is still held,
    so a request that is waiting to retry keeps occupying one of the
    ``maxConcurrent`` slots. This is unchanged from the previous behaviour;
    confirm that it is the intended throttle.
    """

    def __init__(
        self,
        providerLabel: str = "HTTP",
        maxConcurrent: int = _DEFAULT_MAX_CONCURRENT,
        maxRetries: int = _DEFAULT_MAX_RETRIES,
        defaultTimeoutS: float = _DEFAULT_TIMEOUT_S,
    ):
        """Store the configuration and register the instance for shutdown.

        Args:
            providerLabel: Prefix used in log messages.
            maxConcurrent: Size of the semaphore guarding requests.
            maxRetries: Total number of attempts per call (first try included).
            defaultTimeoutS: Total timeout in seconds used when a call does
                not pass its own ``timeout``.
        """
        self._label = providerLabel
        self._maxConcurrent = maxConcurrent
        self._maxRetries = maxRetries
        self._defaultTimeout = aiohttp.ClientTimeout(total=defaultTimeoutS)
        # Both are created on first use by _ensureReady(), not here.
        self._semaphore: Optional[asyncio.Semaphore] = None
        self._session: Optional[aiohttp.ClientSession] = None
        _instances.append(self)

    def _ensureReady(self) -> aiohttp.ClientSession:
        """Create the semaphore and session if needed and return the session.

        A session that has been closed in the meantime is replaced by a new one.
        """
        if self._semaphore is None:
            self._semaphore = asyncio.Semaphore(self._maxConcurrent)
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(timeout=self._defaultTimeout)
        return self._session

    async def close(self):
        """Close the session if it is open and drop the reference to it."""
        if self._session and not self._session.closed:
            await self._session.close()
            # Short pause after closing, kept from the original implementation.
            await asyncio.sleep(0.25)
        self._session = None

    @staticmethod
    def _retryDelay(retryAfterHeader: Optional[str], attempt: int) -> float:
        """Return the wait before the next attempt after a retryable status.

        A positive ``Retry-After`` value wins; otherwise exponential back-off
        (``2 ** attempt`` seconds) capped at 30 seconds is used.
        """
        retryAfter = _parseRetryAfter(retryAfterHeader)
        return retryAfter if retryAfter > 0 else min(2 ** attempt, 30)

    async def getJson(
        self,
        url: str,
        headers: Dict[str, str],
        timeout: Optional[aiohttp.ClientTimeout] = None,
        allowRedirects: bool = True,
    ) -> Dict[str, Any]:
        """GET request returning parsed JSON with retry + throttle.

        Args:
            url: Target URL.
            headers: Request headers.
            timeout: Per-call timeout; falls back to the instance default.
            allowRedirects: Forwarded to aiohttp as ``allow_redirects``.

        Returns:
            The parsed JSON body for status 200/201. On failure a dict
            ``{"error": "<status>: <body excerpt>"}`` (HTTP error) or
            ``{"error": "<message>"}`` (timeout / client error).
        """
        session = self._ensureReady()
        assert self._semaphore is not None
        effectiveTimeout = timeout or self._defaultTimeout
        lastError: Optional[str] = None
        for attempt in range(1, self._maxRetries + 1):
            canRetry = attempt < self._maxRetries
            async with self._semaphore:
                try:
                    async with session.get(
                        url,
                        headers=headers,
                        timeout=effectiveTimeout,
                        allow_redirects=allowRedirects,
                    ) as resp:
                        if resp.status in (200, 201):
                            return await resp.json()
                        # Only back off when another attempt will follow; the
                        # last retryable response is reported like any other
                        # HTTP error instead of sleeping and losing the status.
                        if canRetry and resp.status in _RETRYABLE_STATUS:
                            waitS = self._retryDelay(resp.headers.get("Retry-After"), attempt)
                            logger.warning(
                                f"{self._label} GET {resp.status} (attempt {attempt}/{self._maxRetries}), "
                                f"retry in {waitS:.1f}s: {url[:120]}"
                            )
                            await asyncio.sleep(waitS)
                            continue
                        errorText = await resp.text()
                        lastError = f"{resp.status}: {errorText[:200]}"
                        logger.warning(f"{self._label} GET {url[:120]} -> {lastError[:300]}")
                        return {"error": lastError}
                except asyncio.TimeoutError:
                    # Report the timeout that was actually applied to the call.
                    lastError = f"timeout after {effectiveTimeout.total}s"
                    if canRetry:
                        logger.warning(f"{self._label} GET timeout (attempt {attempt}): {url[:120]}")
                        await asyncio.sleep(min(2 ** attempt, 10))
                except aiohttp.ClientError as e:
                    lastError = str(e)
                    if canRetry:
                        logger.warning(f"{self._label} GET client error (attempt {attempt}): {e}")
                        await asyncio.sleep(min(2 ** attempt, 10))
        return {"error": lastError or "unknown error"}

    async def getBytes(
        self,
        url: str,
        headers: Dict[str, str],
        timeout: Optional[aiohttp.ClientTimeout] = None,
        allowRedirects: bool = True,
    ) -> Optional[bytes]:
        """GET request returning raw bytes (for file downloads).

        Args:
            url: Target URL.
            headers: Request headers.
            timeout: Per-call timeout; falls back to the instance default.
            allowRedirects: Forwarded to aiohttp as ``allow_redirects``.

        Returns:
            The response body for status 200, otherwise ``None`` (HTTP error,
            timeout or client error after all attempts).
        """
        session = self._ensureReady()
        assert self._semaphore is not None
        effectiveTimeout = timeout or self._defaultTimeout
        for attempt in range(1, self._maxRetries + 1):
            canRetry = attempt < self._maxRetries
            async with self._semaphore:
                try:
                    async with session.get(
                        url,
                        headers=headers,
                        timeout=effectiveTimeout,
                        allow_redirects=allowRedirects,
                    ) as resp:
                        if resp.status == 200:
                            return await resp.read()
                        # No back-off after the final attempt: fall through so
                        # the failure is logged with its status and body.
                        if canRetry and resp.status in _RETRYABLE_STATUS:
                            waitS = self._retryDelay(resp.headers.get("Retry-After"), attempt)
                            logger.warning(
                                f"{self._label} download {resp.status} (attempt {attempt}), "
                                f"retry in {waitS:.1f}s: {url[:120]}"
                            )
                            await asyncio.sleep(waitS)
                            continue
                        errorText = await resp.text()
                        logger.warning(f"{self._label} download {url[:120]} -> {resp.status}: {errorText[:200]}")
                        return None
                except asyncio.TimeoutError:
                    if canRetry:
                        logger.warning(f"{self._label} download timeout (attempt {attempt}): {url[:120]}")
                        await asyncio.sleep(min(2 ** attempt, 10))
                except aiohttp.ClientError as e:
                    if canRetry:
                        logger.warning(f"{self._label} download client error (attempt {attempt}): {e}")
                        await asyncio.sleep(min(2 ** attempt, 10))
        return None

    async def request(
        self,
        method: str,
        url: str,
        headers: Dict[str, str],
        data: Any = None,
        timeout: Optional[aiohttp.ClientTimeout] = None,
    ) -> Dict[str, Any]:
        """Generic HTTP request with retry for retryable status codes.

        Args:
            method: HTTP method name passed to ``session.request``.
            url: Target URL.
            headers: Request headers.
            data: Optional request body, forwarded as aiohttp's ``data``.
            timeout: Per-call timeout; falls back to the instance default.

        Returns:
            The parsed JSON body for status 200/201/202, ``{}`` for 204 or for
            a successful response without a body, otherwise
            ``{"error": "<message>"}``.
        """
        session = self._ensureReady()
        assert self._semaphore is not None
        effectiveTimeout = timeout or self._defaultTimeout
        lastError: Optional[str] = None
        for attempt in range(1, self._maxRetries + 1):
            canRetry = attempt < self._maxRetries
            async with self._semaphore:
                try:
                    # data=None is aiohttp's default, so passing it through is
                    # the same as omitting the argument.
                    async with session.request(
                        method,
                        url,
                        headers=headers,
                        data=data,
                        timeout=effectiveTimeout,
                    ) as resp:
                        if resp.status in (200, 201, 202, 204):
                            if resp.status == 204:
                                return {}
                            # A body-less success (e.g. 202 Accepted) must not
                            # reach resp.json(): without a JSON content type it
                            # raises ContentTypeError, a ClientError, which the
                            # handler below would answer by re-sending the
                            # already accepted request.
                            body = await resp.read()
                            if not body.strip():
                                return {}
                            # json() reuses the body cached by read().
                            return await resp.json()
                        # The last retryable response is reported with its
                        # status instead of sleeping and returning
                        # "unknown error".
                        if canRetry and resp.status in _RETRYABLE_STATUS:
                            waitS = self._retryDelay(resp.headers.get("Retry-After"), attempt)
                            logger.warning(
                                f"{self._label} {method} {resp.status} (attempt {attempt}), "
                                f"retry in {waitS:.1f}s: {url[:120]}"
                            )
                            await asyncio.sleep(waitS)
                            continue
                        errorText = await resp.text()
                        lastError = f"{resp.status}: {errorText[:200]}"
                        logger.warning(f"{self._label} {method} {url[:120]} -> {lastError[:300]}")
                        return {"error": lastError}
                except asyncio.TimeoutError:
                    lastError = f"timeout after {effectiveTimeout.total}s"
                    if canRetry:
                        logger.warning(f"{self._label} {method} timeout (attempt {attempt}): {url[:120]}")
                        await asyncio.sleep(min(2 ** attempt, 10))
                except aiohttp.ClientError as e:
                    lastError = str(e)
                    if canRetry:
                        logger.warning(f"{self._label} {method} client error (attempt {attempt}): {e}")
                        await asyncio.sleep(min(2 ** attempt, 10))
        return {"error": lastError or "unknown error"}
async def closeAllResilientHttp() -> None:
    """Close the session of every registered ResilientHttp instance.

    Meant to be awaited once at app shutdown. A failure while closing one
    instance is logged at debug level and does not stop the remaining
    instances from being closed.
    """
    for client in _instances:
        try:
            await client.close()
        except Exception as closeError:
            logger.debug(f"Error closing {client._label} session: {closeError}")
    # The summary counts registered instances, whether or not a session was open.
    registered = len(_instances)
    logger.info(f"Closed {registered} ResilientHttp session(s)")
def _parseRetryAfter(value: Optional[str]) -> float:
"""Parse Retry-After header (seconds or HTTP-date). Returns 0 if absent/unparseable."""
if not value:
return 0.0
try:
return float(value)
except ValueError:
pass
try:
from email.utils import parsedate_to_datetime
dt = parsedate_to_datetime(value)
delta = (dt.timestamp() - time.time())
return max(delta, 0.5)
except Exception:
return 0.0