platform-core/modules/dbHelpers/paginationHelpers.py
ValueOn AG 4a60086c80
Some checks failed
Deploy Plattform-Core (Int) / test (push) Failing after 15s
Deploy Plattform-Core (Int) / deploy (push) Has been skipped
cp adapted to 2026 poweron
2026-06-09 09:53:31 +02:00

543 lines
19 KiB
Python

# Copyright (c) 2026 PowerOn AG
# All rights reserved.
"""
Pagination, filtering and sorting helpers for paginated record sets.
Provides unified logic for:
- mode=filterValues: distinct column values for filter dropdowns (cross-filtered)
- mode=ids: all IDs matching current filters (for bulk selection)
- In-memory equivalents for enriched/non-SQL routes
- FK-label-aware sorting (cross-DB)
"""
import copy
import json
import logging
import math
from datetime import datetime, timezone
from typing import Any, Callable, Dict, List, Optional
from fastapi.responses import JSONResponse
from modules.datamodels.datamodelPagination import (
PaginationParams,
normalize_pagination_dict,
)
from modules.shared.i18nRegistry import resolveText
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Cross-filter pagination parsing
# ---------------------------------------------------------------------------
def parseCrossFilterPagination(
column: str,
paginationJson: Optional[str],
) -> Optional[PaginationParams]:
"""
Parse pagination JSON, remove the requested column from filters (cross-filtering),
and drop sort — used for filter-values requests.
"""
if not paginationJson:
return None
try:
paginationDict = json.loads(paginationJson)
if not paginationDict:
return None
paginationDict = normalize_pagination_dict(paginationDict)
filters = paginationDict.get("filters", {})
filters.pop(column, None)
paginationDict["filters"] = filters
paginationDict.pop("sort", None)
return PaginationParams(**paginationDict)
except (json.JSONDecodeError, ValueError, TypeError):
return None
def parsePaginationForIds(
paginationJson: Optional[str],
) -> Optional[PaginationParams]:
"""
Parse pagination JSON for mode=ids — keep filters, drop sort and page/pageSize.
"""
if not paginationJson:
return None
try:
paginationDict = json.loads(paginationJson)
if not paginationDict:
return None
paginationDict = normalize_pagination_dict(paginationDict)
paginationDict.pop("sort", None)
return PaginationParams(**paginationDict)
except (json.JSONDecodeError, ValueError, TypeError):
return None
# ---------------------------------------------------------------------------
# SQL-based helpers (delegate to DB connector)
# ---------------------------------------------------------------------------
def handleFilterValuesMode(
db,
modelClass: type,
column: str,
paginationJson: Optional[str] = None,
recordFilter: Optional[Dict[str, Any]] = None,
enrichFn: Optional[Callable[[str, Optional[PaginationParams], Optional[Dict[str, Any]]], List[str]]] = None,
) -> List[str]:
"""
SQL-based distinct column values with cross-filtering.
If enrichFn is provided and the column is enriched (computed/joined),
enrichFn(column, crossPagination, recordFilter) is called instead of SQL DISTINCT.
"""
crossPagination = parseCrossFilterPagination(column, paginationJson)
if enrichFn:
try:
result = enrichFn(column, crossPagination, recordFilter)
if result is not None:
return JSONResponse(content=result)
except Exception as e:
logger.warning(f"handleFilterValuesMode enrichFn failed for {column}: {e}")
try:
values = db.getDistinctColumnValues(
modelClass, column,
pagination=crossPagination,
recordFilter=recordFilter,
) or []
return JSONResponse(content=values)
except Exception as e:
logger.error(f"handleFilterValuesMode SQL failed for {modelClass.__name__}.{column}: {e}")
return JSONResponse(content=[])
def handleIdsMode(
db,
modelClass: type,
paginationJson: Optional[str] = None,
recordFilter: Optional[Dict[str, Any]] = None,
idField: str = "id",
) -> List[str]:
"""
Return all IDs matching the current filters (no LIMIT/OFFSET).
Uses the same WHERE clause as getRecordsetPaginated.
"""
pagination = parsePaginationForIds(paginationJson)
table = modelClass.__name__
try:
if not db._ensureTableExists(modelClass):
return JSONResponse(content=[])
where_clause, _, _, values, _ = db._buildPaginationClauses(
modelClass, pagination, recordFilter,
)
sql = f'SELECT "{idField}"::TEXT AS val FROM "{table}"{where_clause} ORDER BY "{idField}"'
with db.borrowCursor() as cursor:
cursor.execute(sql, values)
return JSONResponse(content=[row["val"] for row in cursor.fetchall()])
except Exception as e:
logger.error(f"handleIdsMode failed for {table}: {e}")
return JSONResponse(content=[])
# ---------------------------------------------------------------------------
# In-memory helpers (for enriched / non-SQL routes)
# ---------------------------------------------------------------------------
def applyFiltersAndSort(
items: List[Dict[str, Any]],
paginationParams: Optional[PaginationParams],
) -> List[Dict[str, Any]]:
"""
Apply filters and sorting to a list of dicts in-memory.
Does NOT paginate (no page/pageSize slicing).
"""
if not paginationParams:
return items
result = list(items)
if paginationParams.filters:
filters = paginationParams.filters
searchTerm = filters.get("search", "").lower() if filters.get("search") else None
if searchTerm:
result = [
item for item in result
if any(
searchTerm in str(v).lower()
for v in item.values()
if v is not None
)
]
for field, filterValue in filters.items():
if field == "search":
continue
if isinstance(filterValue, dict) and "operator" in filterValue:
operator = filterValue.get("operator", "equals")
value = filterValue.get("value")
else:
operator = "equals"
value = filterValue
if value is None:
result = [
item for item in result
if item.get(field) is None or item.get(field) == ""
]
continue
if value == "":
continue
result = [
item for item in result
if _matchesFilter(item, field, operator, value)
]
if paginationParams.sort:
for sortField in reversed(paginationParams.sort):
fieldName = sortField.field
ascending = sortField.direction == "asc"
noneItems = [item for item in result if item.get(fieldName) is None]
nonNoneItems = [item for item in result if item.get(fieldName) is not None]
def _getSortKey(item: Dict[str, Any], _fn=fieldName):
value = item.get(_fn)
if isinstance(value, bool):
return (0, int(value), "")
if isinstance(value, (int, float)):
return (0, value, "")
return (1, 0, str(value).lower())
nonNoneItems = sorted(nonNoneItems, key=_getSortKey, reverse=not ascending)
result = nonNoneItems + noneItems
return result
def _matchesFilter(item: Dict[str, Any], field: str, operator: str, value: Any) -> bool:
"""Single-field filter match for in-memory filtering."""
itemValue = item.get(field)
if itemValue is None:
return False
itemStr = str(itemValue).lower()
valueStr = str(value).lower()
if operator in ("equals", "eq"):
return itemStr == valueStr
if operator == "contains":
return valueStr in itemStr
if operator == "startsWith":
return itemStr.startswith(valueStr)
if operator == "endsWith":
return itemStr.endswith(valueStr)
if operator in ("gt", "gte", "lt", "lte"):
try:
itemNum = float(itemValue)
valueNum = float(value)
if operator == "gt":
return itemNum > valueNum
if operator == "gte":
return itemNum >= valueNum
if operator == "lt":
return itemNum < valueNum
return itemNum <= valueNum
except (ValueError, TypeError):
return False
if operator == "between":
return _matchesBetween(itemValue, itemStr, value)
if operator == "in":
if isinstance(value, list):
return itemStr in [str(x).lower() for x in value]
return False
if operator == "notIn":
if isinstance(value, list):
return itemStr not in [str(x).lower() for x in value]
return True
return True
def _matchesBetween(itemValue: Any, itemStr: str, value: Any) -> bool:
"""Handle 'between' operator for date ranges and numeric ranges."""
if not isinstance(value, dict):
return True
fromVal = value.get("from", "")
toVal = value.get("to", "")
if not fromVal and not toVal:
return True
try:
fromTs = None
toTs = None
if fromVal:
fromTs = datetime.strptime(str(fromVal), "%Y-%m-%d").replace(tzinfo=timezone.utc).timestamp()
if toVal:
toTs = datetime.strptime(str(toVal), "%Y-%m-%d").replace(
hour=23, minute=59, second=59, tzinfo=timezone.utc
).timestamp()
itemNum = float(itemValue) if not isinstance(itemValue, (int, float)) else itemValue
if itemNum > 10000000000:
itemNum = itemNum / 1000
if fromTs is not None and toTs is not None:
return fromTs <= itemNum <= toTs
if fromTs is not None:
return itemNum >= fromTs
if toTs is not None:
return itemNum <= toTs
except (ValueError, TypeError):
try:
itemNum = float(itemValue)
fromNum = float(fromVal) if fromVal not in (None, "") else None
toNum = float(toVal) if toVal not in (None, "") else None
if fromNum is not None and toNum is not None:
return fromNum <= itemNum <= toNum
if fromNum is not None:
return itemNum >= fromNum
if toNum is not None:
return itemNum <= toNum
except (ValueError, TypeError):
pass
fromStr = str(fromVal).lower() if fromVal else ""
toStr = str(toVal).lower() if toVal else ""
if fromStr and toStr:
return fromStr <= itemStr <= toStr
if fromStr:
return itemStr >= fromStr
if toStr:
return itemStr <= toStr
return True
def _extractDistinctValues(
items: List[Dict[str, Any]],
columnKey: str,
requestLang: Optional[str] = None,
) -> list:
"""Extract sorted distinct display values for a column from enriched items.
When the items contain a ``{columnKey}Label`` field (FK enrichment convention),
returns ``{value, label}`` objects so the frontend shows human-readable
labels in filter dropdowns. Otherwise returns plain strings.
Includes ``None`` as the last entry when at least one row has a null/empty
value — this enables the "(Leer)" filter option in the frontend.
"""
_MISSING = object()
labelKey = f"{columnKey}Label"
hasFkLabels = any(labelKey in item for item in items[:20])
if hasFkLabels:
byVal: Dict[str, str] = {}
hasEmpty = False
for item in items:
val = item.get(columnKey, _MISSING)
if val is _MISSING:
continue
if val is None or val == "":
hasEmpty = True
continue
strVal = str(val)
if strVal not in byVal:
label = item.get(labelKey)
byVal[strVal] = str(label) if label else f"NA({strVal[:8]})"
result: list = sorted(
[{"value": v, "label": l} for v, l in byVal.items()],
key=lambda x: x["label"].lower(),
)
if hasEmpty:
result.append(None)
return result
values = set()
hasEmpty = False
for item in items:
val = item.get(columnKey, _MISSING)
if val is _MISSING:
continue
if val is None or val == "":
hasEmpty = True
continue
if isinstance(val, bool):
values.add("true" if val else "false")
elif isinstance(val, (int, float)):
values.add(str(val))
elif isinstance(val, dict):
text = resolveText(val, requestLang)
if text:
values.add(text)
else:
values.add(str(val))
result = sorted(values, key=lambda v: v.lower())
if hasEmpty:
result.append(None)
return result
def handleFilterValuesInMemory(
items: List[Dict[str, Any]],
column: str,
paginationJson: Optional[str] = None,
requestLang: Optional[str] = None,
) -> JSONResponse:
"""
In-memory filter-values: apply cross-filters, then extract distinct values.
For routes that build enriched in-memory lists.
Returns JSONResponse to bypass FastAPI response_model validation.
"""
crossFilterParams = parseCrossFilterPagination(column, paginationJson)
crossFiltered = applyFiltersAndSort(items, crossFilterParams)
return JSONResponse(content=_extractDistinctValues(crossFiltered, column, requestLang))
def handleIdsInMemory(
items: List[Dict[str, Any]],
paginationJson: Optional[str] = None,
idField: str = "id",
) -> JSONResponse:
"""
In-memory IDs: apply filters, return all IDs.
For routes that build enriched in-memory lists.
Returns JSONResponse to bypass FastAPI response_model validation.
"""
pagination = parsePaginationForIds(paginationJson)
filtered = applyFiltersAndSort(items, pagination)
ids = []
for item in filtered:
val = item.get(idField)
if val is not None:
ids.append(str(val))
return JSONResponse(content=ids)
def getRecordsetPaginatedWithFkSort(
db,
modelClass: type,
pagination,
recordFilter: Optional[Dict[str, Any]] = None,
labelResolvers: Optional[Dict[str, Callable[[List[str]], Dict[str, str]]]] = None,
fieldFilter: Optional[List[str]] = None,
idField: str = "id",
) -> Dict[str, Any]:
"""
Wrapper around db.getRecordsetPaginated that handles FK-label sorting.
If the current sort field is a FK with a registered labelResolver, the
function fetches all filtered IDs + FK values, resolves labels cross-DB,
sorts in-memory by label, and returns only the requested page.
If no FK sort is active, delegates directly to db.getRecordsetPaginated.
"""
from modules.dbHelpers.fkLabelResolver import enrichRowsWithFkLabels, buildLabelResolversFromModel
if not pagination or not pagination.sort:
result = db.getRecordsetPaginated(modelClass, pagination, recordFilter, fieldFilter)
enrichRowsWithFkLabels(result.get("items", []), modelClass, db=db)
return result
if labelResolvers is None:
labelResolvers = buildLabelResolversFromModel(modelClass, db)
if not labelResolvers:
result = db.getRecordsetPaginated(modelClass, pagination, recordFilter, fieldFilter)
enrichRowsWithFkLabels(result.get("items", []), modelClass, db=db)
return result
fkSortField = None
fkSortDir = "asc"
for sf in pagination.sort:
sfField = sf.get("field") if isinstance(sf, dict) else getattr(sf, "field", None)
sfDir = sf.get("direction", "asc") if isinstance(sf, dict) else getattr(sf, "direction", "asc")
if sfField and sfField in labelResolvers:
fkSortField = sfField
fkSortDir = str(sfDir).lower()
break
if not fkSortField:
result = db.getRecordsetPaginated(modelClass, pagination, recordFilter, fieldFilter)
enrichRowsWithFkLabels(result.get("items", []), modelClass, db=db)
return result
try:
distinctIds = db.getDistinctColumnValues(
modelClass, fkSortField, recordFilter=recordFilter,
) or []
labelMap = {}
if distinctIds:
try:
labelMap = labelResolvers[fkSortField](distinctIds)
except Exception as e:
logger.warning(f"getRecordsetPaginatedWithFkSort: resolver for {fkSortField} failed: {e}")
filterOnlyPagination = copy.deepcopy(pagination)
filterOnlyPagination.sort = []
filterOnlyPagination.page = 1
filterOnlyPagination.pageSize = 999999
lightRows = db.getRecordsetPaginated(
modelClass, filterOnlyPagination, recordFilter,
fieldFilter=[idField, fkSortField],
)
allRows = lightRows.get("items", [])
totalItems = len(allRows)
if totalItems == 0:
return {"items": [], "totalItems": 0, "totalPages": 0}
def _sortKey(row):
fkVal = row.get(fkSortField, "") or ""
label = labelMap.get(str(fkVal), str(fkVal)).lower()
return label
reverse = fkSortDir == "desc"
allRows.sort(key=_sortKey, reverse=reverse)
pageSize = pagination.pageSize
offset = (pagination.page - 1) * pageSize
pageSlice = allRows[offset:offset + pageSize]
pageIds = [row[idField] for row in pageSlice if row.get(idField)]
if not pageIds:
return {"items": [], "totalItems": totalItems, "totalPages": math.ceil(totalItems / pageSize)}
pageItems = db.getRecordset(modelClass, recordFilter={idField: pageIds}, fieldFilter=fieldFilter)
idOrder = {pid: idx for idx, pid in enumerate(pageIds)}
pageItems.sort(key=lambda r: idOrder.get(r.get(idField), 999999))
enrichRowsWithFkLabels(pageItems, modelClass, db=db)
totalPages = math.ceil(totalItems / pageSize) if totalItems > 0 else 0
return {"items": pageItems, "totalItems": totalItems, "totalPages": totalPages}
except Exception as e:
logger.error(f"getRecordsetPaginatedWithFkSort failed for {modelClass.__name__}: {e}")
result = db.getRecordsetPaginated(modelClass, pagination, recordFilter, fieldFilter)
enrichRowsWithFkLabels(result.get("items", []), modelClass, db=db)
return result
def paginateInMemory(
items: List[Dict[str, Any]],
paginationParams: Optional[PaginationParams],
) -> tuple:
"""
Apply pagination (page/pageSize slicing) to an already-filtered+sorted list.
Returns (pageItems, totalItems).
"""
totalItems = len(items)
if not paginationParams:
return items, totalItems
offset = (paginationParams.page - 1) * paginationParams.pageSize
pageItems = items[offset:offset + paginationParams.pageSize]
return pageItems, totalItems