# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ Pagination, filtering and sorting helpers for paginated record sets. Provides unified logic for: - mode=filterValues: distinct column values for filter dropdowns (cross-filtered) - mode=ids: all IDs matching current filters (for bulk selection) - In-memory equivalents for enriched/non-SQL routes - FK-label-aware sorting (cross-DB) """ import copy import json import logging import math from datetime import datetime, timezone from typing import Any, Callable, Dict, List, Optional from fastapi.responses import JSONResponse from modules.datamodels.datamodelPagination import ( PaginationParams, normalize_pagination_dict, ) from modules.shared.i18nRegistry import resolveText logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Cross-filter pagination parsing # --------------------------------------------------------------------------- def parseCrossFilterPagination( column: str, paginationJson: Optional[str], ) -> Optional[PaginationParams]: """ Parse pagination JSON, remove the requested column from filters (cross-filtering), and drop sort — used for filter-values requests. """ if not paginationJson: return None try: paginationDict = json.loads(paginationJson) if not paginationDict: return None paginationDict = normalize_pagination_dict(paginationDict) filters = paginationDict.get("filters", {}) filters.pop(column, None) paginationDict["filters"] = filters paginationDict.pop("sort", None) return PaginationParams(**paginationDict) except (json.JSONDecodeError, ValueError, TypeError): return None def parsePaginationForIds( paginationJson: Optional[str], ) -> Optional[PaginationParams]: """ Parse pagination JSON for mode=ids — keep filters, drop sort and page/pageSize. """ if not paginationJson: return None try: paginationDict = json.loads(paginationJson) if not paginationDict: return None paginationDict = normalize_pagination_dict(paginationDict) paginationDict.pop("sort", None) return PaginationParams(**paginationDict) except (json.JSONDecodeError, ValueError, TypeError): return None # --------------------------------------------------------------------------- # SQL-based helpers (delegate to DB connector) # --------------------------------------------------------------------------- def handleFilterValuesMode( db, modelClass: type, column: str, paginationJson: Optional[str] = None, recordFilter: Optional[Dict[str, Any]] = None, enrichFn: Optional[Callable[[str, Optional[PaginationParams], Optional[Dict[str, Any]]], List[str]]] = None, ) -> List[str]: """ SQL-based distinct column values with cross-filtering. If enrichFn is provided and the column is enriched (computed/joined), enrichFn(column, crossPagination, recordFilter) is called instead of SQL DISTINCT. """ crossPagination = parseCrossFilterPagination(column, paginationJson) if enrichFn: try: result = enrichFn(column, crossPagination, recordFilter) if result is not None: return JSONResponse(content=result) except Exception as e: logger.warning(f"handleFilterValuesMode enrichFn failed for {column}: {e}") try: values = db.getDistinctColumnValues( modelClass, column, pagination=crossPagination, recordFilter=recordFilter, ) or [] return JSONResponse(content=values) except Exception as e: logger.error(f"handleFilterValuesMode SQL failed for {modelClass.__name__}.{column}: {e}") return JSONResponse(content=[]) def handleIdsMode( db, modelClass: type, paginationJson: Optional[str] = None, recordFilter: Optional[Dict[str, Any]] = None, idField: str = "id", ) -> List[str]: """ Return all IDs matching the current filters (no LIMIT/OFFSET). Uses the same WHERE clause as getRecordsetPaginated. """ pagination = parsePaginationForIds(paginationJson) table = modelClass.__name__ try: if not db._ensureTableExists(modelClass): return JSONResponse(content=[]) where_clause, _, _, values, _ = db._buildPaginationClauses( modelClass, pagination, recordFilter, ) sql = f'SELECT "{idField}"::TEXT AS val FROM "{table}"{where_clause} ORDER BY "{idField}"' with db.borrowCursor() as cursor: cursor.execute(sql, values) return JSONResponse(content=[row["val"] for row in cursor.fetchall()]) except Exception as e: logger.error(f"handleIdsMode failed for {table}: {e}") return JSONResponse(content=[]) # --------------------------------------------------------------------------- # In-memory helpers (for enriched / non-SQL routes) # --------------------------------------------------------------------------- def applyFiltersAndSort( items: List[Dict[str, Any]], paginationParams: Optional[PaginationParams], ) -> List[Dict[str, Any]]: """ Apply filters and sorting to a list of dicts in-memory. Does NOT paginate (no page/pageSize slicing). """ if not paginationParams: return items result = list(items) if paginationParams.filters: filters = paginationParams.filters searchTerm = filters.get("search", "").lower() if filters.get("search") else None if searchTerm: result = [ item for item in result if any( searchTerm in str(v).lower() for v in item.values() if v is not None ) ] for field, filterValue in filters.items(): if field == "search": continue if isinstance(filterValue, dict) and "operator" in filterValue: operator = filterValue.get("operator", "equals") value = filterValue.get("value") else: operator = "equals" value = filterValue if value is None: result = [ item for item in result if item.get(field) is None or item.get(field) == "" ] continue if value == "": continue result = [ item for item in result if _matchesFilter(item, field, operator, value) ] if paginationParams.sort: for sortField in reversed(paginationParams.sort): fieldName = sortField.field ascending = sortField.direction == "asc" noneItems = [item for item in result if item.get(fieldName) is None] nonNoneItems = [item for item in result if item.get(fieldName) is not None] def _getSortKey(item: Dict[str, Any], _fn=fieldName): value = item.get(_fn) if isinstance(value, bool): return (0, int(value), "") if isinstance(value, (int, float)): return (0, value, "") return (1, 0, str(value).lower()) nonNoneItems = sorted(nonNoneItems, key=_getSortKey, reverse=not ascending) result = nonNoneItems + noneItems return result def _matchesFilter(item: Dict[str, Any], field: str, operator: str, value: Any) -> bool: """Single-field filter match for in-memory filtering.""" itemValue = item.get(field) if itemValue is None: return False itemStr = str(itemValue).lower() valueStr = str(value).lower() if operator in ("equals", "eq"): return itemStr == valueStr if operator == "contains": return valueStr in itemStr if operator == "startsWith": return itemStr.startswith(valueStr) if operator == "endsWith": return itemStr.endswith(valueStr) if operator in ("gt", "gte", "lt", "lte"): try: itemNum = float(itemValue) valueNum = float(value) if operator == "gt": return itemNum > valueNum if operator == "gte": return itemNum >= valueNum if operator == "lt": return itemNum < valueNum return itemNum <= valueNum except (ValueError, TypeError): return False if operator == "between": return _matchesBetween(itemValue, itemStr, value) if operator == "in": if isinstance(value, list): return itemStr in [str(x).lower() for x in value] return False if operator == "notIn": if isinstance(value, list): return itemStr not in [str(x).lower() for x in value] return True return True def _matchesBetween(itemValue: Any, itemStr: str, value: Any) -> bool: """Handle 'between' operator for date ranges and numeric ranges.""" if not isinstance(value, dict): return True fromVal = value.get("from", "") toVal = value.get("to", "") if not fromVal and not toVal: return True try: fromTs = None toTs = None if fromVal: fromTs = datetime.strptime(str(fromVal), "%Y-%m-%d").replace(tzinfo=timezone.utc).timestamp() if toVal: toTs = datetime.strptime(str(toVal), "%Y-%m-%d").replace( hour=23, minute=59, second=59, tzinfo=timezone.utc ).timestamp() itemNum = float(itemValue) if not isinstance(itemValue, (int, float)) else itemValue if itemNum > 10000000000: itemNum = itemNum / 1000 if fromTs is not None and toTs is not None: return fromTs <= itemNum <= toTs if fromTs is not None: return itemNum >= fromTs if toTs is not None: return itemNum <= toTs except (ValueError, TypeError): try: itemNum = float(itemValue) fromNum = float(fromVal) if fromVal not in (None, "") else None toNum = float(toVal) if toVal not in (None, "") else None if fromNum is not None and toNum is not None: return fromNum <= itemNum <= toNum if fromNum is not None: return itemNum >= fromNum if toNum is not None: return itemNum <= toNum except (ValueError, TypeError): pass fromStr = str(fromVal).lower() if fromVal else "" toStr = str(toVal).lower() if toVal else "" if fromStr and toStr: return fromStr <= itemStr <= toStr if fromStr: return itemStr >= fromStr if toStr: return itemStr <= toStr return True def _extractDistinctValues( items: List[Dict[str, Any]], columnKey: str, requestLang: Optional[str] = None, ) -> list: """Extract sorted distinct display values for a column from enriched items. When the items contain a ``{columnKey}Label`` field (FK enrichment convention), returns ``{value, label}`` objects so the frontend shows human-readable labels in filter dropdowns. Otherwise returns plain strings. Includes ``None`` as the last entry when at least one row has a null/empty value — this enables the "(Leer)" filter option in the frontend. """ _MISSING = object() labelKey = f"{columnKey}Label" hasFkLabels = any(labelKey in item for item in items[:20]) if hasFkLabels: byVal: Dict[str, str] = {} hasEmpty = False for item in items: val = item.get(columnKey, _MISSING) if val is _MISSING: continue if val is None or val == "": hasEmpty = True continue strVal = str(val) if strVal not in byVal: label = item.get(labelKey) byVal[strVal] = str(label) if label else f"NA({strVal[:8]})" result: list = sorted( [{"value": v, "label": l} for v, l in byVal.items()], key=lambda x: x["label"].lower(), ) if hasEmpty: result.append(None) return result values = set() hasEmpty = False for item in items: val = item.get(columnKey, _MISSING) if val is _MISSING: continue if val is None or val == "": hasEmpty = True continue if isinstance(val, bool): values.add("true" if val else "false") elif isinstance(val, (int, float)): values.add(str(val)) elif isinstance(val, dict): text = resolveText(val, requestLang) if text: values.add(text) else: values.add(str(val)) result = sorted(values, key=lambda v: v.lower()) if hasEmpty: result.append(None) return result def handleFilterValuesInMemory( items: List[Dict[str, Any]], column: str, paginationJson: Optional[str] = None, requestLang: Optional[str] = None, ) -> JSONResponse: """ In-memory filter-values: apply cross-filters, then extract distinct values. For routes that build enriched in-memory lists. Returns JSONResponse to bypass FastAPI response_model validation. """ crossFilterParams = parseCrossFilterPagination(column, paginationJson) crossFiltered = applyFiltersAndSort(items, crossFilterParams) return JSONResponse(content=_extractDistinctValues(crossFiltered, column, requestLang)) def handleIdsInMemory( items: List[Dict[str, Any]], paginationJson: Optional[str] = None, idField: str = "id", ) -> JSONResponse: """ In-memory IDs: apply filters, return all IDs. For routes that build enriched in-memory lists. Returns JSONResponse to bypass FastAPI response_model validation. """ pagination = parsePaginationForIds(paginationJson) filtered = applyFiltersAndSort(items, pagination) ids = [] for item in filtered: val = item.get(idField) if val is not None: ids.append(str(val)) return JSONResponse(content=ids) def getRecordsetPaginatedWithFkSort( db, modelClass: type, pagination, recordFilter: Optional[Dict[str, Any]] = None, labelResolvers: Optional[Dict[str, Callable[[List[str]], Dict[str, str]]]] = None, fieldFilter: Optional[List[str]] = None, idField: str = "id", ) -> Dict[str, Any]: """ Wrapper around db.getRecordsetPaginated that handles FK-label sorting. If the current sort field is a FK with a registered labelResolver, the function fetches all filtered IDs + FK values, resolves labels cross-DB, sorts in-memory by label, and returns only the requested page. If no FK sort is active, delegates directly to db.getRecordsetPaginated. """ from modules.dbHelpers.fkLabelResolver import enrichRowsWithFkLabels, buildLabelResolversFromModel if not pagination or not pagination.sort: result = db.getRecordsetPaginated(modelClass, pagination, recordFilter, fieldFilter) enrichRowsWithFkLabels(result.get("items", []), modelClass, db=db) return result if labelResolvers is None: labelResolvers = buildLabelResolversFromModel(modelClass, db) if not labelResolvers: result = db.getRecordsetPaginated(modelClass, pagination, recordFilter, fieldFilter) enrichRowsWithFkLabels(result.get("items", []), modelClass, db=db) return result fkSortField = None fkSortDir = "asc" for sf in pagination.sort: sfField = sf.get("field") if isinstance(sf, dict) else getattr(sf, "field", None) sfDir = sf.get("direction", "asc") if isinstance(sf, dict) else getattr(sf, "direction", "asc") if sfField and sfField in labelResolvers: fkSortField = sfField fkSortDir = str(sfDir).lower() break if not fkSortField: result = db.getRecordsetPaginated(modelClass, pagination, recordFilter, fieldFilter) enrichRowsWithFkLabels(result.get("items", []), modelClass, db=db) return result try: distinctIds = db.getDistinctColumnValues( modelClass, fkSortField, recordFilter=recordFilter, ) or [] labelMap = {} if distinctIds: try: labelMap = labelResolvers[fkSortField](distinctIds) except Exception as e: logger.warning(f"getRecordsetPaginatedWithFkSort: resolver for {fkSortField} failed: {e}") filterOnlyPagination = copy.deepcopy(pagination) filterOnlyPagination.sort = [] filterOnlyPagination.page = 1 filterOnlyPagination.pageSize = 999999 lightRows = db.getRecordsetPaginated( modelClass, filterOnlyPagination, recordFilter, fieldFilter=[idField, fkSortField], ) allRows = lightRows.get("items", []) totalItems = len(allRows) if totalItems == 0: return {"items": [], "totalItems": 0, "totalPages": 0} def _sortKey(row): fkVal = row.get(fkSortField, "") or "" label = labelMap.get(str(fkVal), str(fkVal)).lower() return label reverse = fkSortDir == "desc" allRows.sort(key=_sortKey, reverse=reverse) pageSize = pagination.pageSize offset = (pagination.page - 1) * pageSize pageSlice = allRows[offset:offset + pageSize] pageIds = [row[idField] for row in pageSlice if row.get(idField)] if not pageIds: return {"items": [], "totalItems": totalItems, "totalPages": math.ceil(totalItems / pageSize)} pageItems = db.getRecordset(modelClass, recordFilter={idField: pageIds}, fieldFilter=fieldFilter) idOrder = {pid: idx for idx, pid in enumerate(pageIds)} pageItems.sort(key=lambda r: idOrder.get(r.get(idField), 999999)) enrichRowsWithFkLabels(pageItems, modelClass, db=db) totalPages = math.ceil(totalItems / pageSize) if totalItems > 0 else 0 return {"items": pageItems, "totalItems": totalItems, "totalPages": totalPages} except Exception as e: logger.error(f"getRecordsetPaginatedWithFkSort failed for {modelClass.__name__}: {e}") result = db.getRecordsetPaginated(modelClass, pagination, recordFilter, fieldFilter) enrichRowsWithFkLabels(result.get("items", []), modelClass, db=db) return result def paginateInMemory( items: List[Dict[str, Any]], paginationParams: Optional[PaginationParams], ) -> tuple: """ Apply pagination (page/pageSize slicing) to an already-filtered+sorted list. Returns (pageItems, totalItems). """ totalItems = len(items) if not paginationParams: return items, totalItems offset = (paginationParams.page - 1) * paginationParams.pageSize pageItems = items[offset:offset + paginationParams.pageSize] return pageItems, totalItems