gateway/modules/features/redmine/serviceRedmineStatsCache.py
2026-04-21 21:30:11 +02:00

131 lines
4 KiB
Python

# Copyright (c) 2026 Patrick Motsch
# All rights reserved.
"""TTL-based in-memory cache for ``serviceRedmineStats`` results.
The cache key is ``(featureInstanceId, dateFrom, dateTo, bucket, sorted(trackerIds))``.
Any write through ``serviceRedmine`` (createIssue, updateIssue, deleteIssue,
addRelation, deleteRelation) MUST call :func:`invalidateInstance` to drop
all cached entries for that feature instance.
Default TTL: 90 seconds. Override at construction or via ``setTtl``.
"""
from __future__ import annotations
import threading
import time
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Optional, Tuple
_DEFAULT_TTL_SECONDS = 90.0
def _freeze(value: Any) -> Any:
"""Make ``value`` hashable so it can live in a tuple cache key.
Lists / sets become sorted tuples; dicts become sorted item tuples;
everything else is returned untouched.
"""
if isinstance(value, (list, set, tuple)):
try:
return tuple(sorted(value))
except TypeError:
return tuple(value)
if isinstance(value, dict):
return tuple(sorted(value.items()))
return value
@dataclass
class _CacheEntry:
value: Any
expiresAt: float
CacheKey = Tuple[str, Optional[str], Optional[str], str, Tuple[int, ...], Tuple[Any, ...]]
class RedmineStatsCache:
"""Thread-safe TTL cache."""
def __init__(self, ttlSeconds: float = _DEFAULT_TTL_SECONDS) -> None:
self._ttlSeconds = float(ttlSeconds)
self._store: Dict[CacheKey, _CacheEntry] = {}
self._lock = threading.Lock()
def setTtl(self, ttlSeconds: float) -> None:
self._ttlSeconds = float(ttlSeconds)
@staticmethod
def buildKey(
featureInstanceId: str,
dateFrom: Optional[str],
dateTo: Optional[str],
bucket: str,
trackerIds: Iterable[int],
*extraDims: Any,
) -> CacheKey:
"""Build a cache key for the given query.
``extraDims`` is an open-ended tail so callers can add more filter
dimensions (e.g. ``categoryIds``, ``statusFilter``) without forcing
a signature break here. Pass them as already-canonicalised values
(sorted lists, normalised strings, ...) so the same query always
produces the same key.
"""
return (
str(featureInstanceId),
dateFrom or None,
dateTo or None,
(bucket or "week").lower(),
tuple(sorted(int(t) for t in trackerIds or [])),
tuple(_freeze(d) for d in extraDims),
)
def get(self, key: CacheKey) -> Optional[Any]:
now = time.monotonic()
with self._lock:
entry = self._store.get(key)
if not entry:
return None
if entry.expiresAt < now:
self._store.pop(key, None)
return None
return entry.value
def set(self, key: CacheKey, value: Any, *, ttlSeconds: Optional[float] = None) -> None:
ttl = float(ttlSeconds) if ttlSeconds is not None else self._ttlSeconds
with self._lock:
self._store[key] = _CacheEntry(value=value, expiresAt=time.monotonic() + ttl)
def invalidateInstance(self, featureInstanceId: str) -> int:
"""Drop every entry whose key starts with ``featureInstanceId``.
Returns the number of entries dropped.
"""
target = str(featureInstanceId)
with self._lock:
to_drop = [k for k in self._store.keys() if k[0] == target]
for k in to_drop:
self._store.pop(k, None)
return len(to_drop)
def clear(self) -> None:
with self._lock:
self._store.clear()
def size(self) -> int:
with self._lock:
return len(self._store)
_globalCache: Optional[RedmineStatsCache] = None
def _getStatsCache() -> RedmineStatsCache:
"""Process-wide singleton."""
global _globalCache
if _globalCache is None:
_globalCache = RedmineStatsCache()
return _globalCache